summaryrefslogtreecommitdiffstats
path: root/src/DetectorModule/DetectorModule.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/DetectorModule/DetectorModule.cpp')
-rw-r--r--src/DetectorModule/DetectorModule.cpp92
1 files changed, 72 insertions, 20 deletions
diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp
index 169c5a5..e7e0272 100644
--- a/src/DetectorModule/DetectorModule.cpp
+++ b/src/DetectorModule/DetectorModule.cpp
@@ -15,12 +15,27 @@
#include <exception>
#include <fstream>
-void timer_start(std::function<void(void)> func, unsigned int interval){
- std::thread([func, interval]() {
+void timer_start(std::function<void(int)> func, unsigned int interval, unsigned int max_packets){
+ std::thread([func, interval, max_packets]() {
+ int packets = 1;
+ auto next = std::chrono::high_resolution_clock::now();
while (true)
{
- func();
- std::this_thread::sleep_for(std::chrono::microseconds(interval));
+ func(packets);
+
+ next += std::chrono::microseconds(packets * interval);
+ auto now = std::chrono::high_resolution_clock::now();
+ if (now > next) {
+ std::chrono::nanoseconds late = now - next;
+ packets = 1 + (late.count() / interval / 1000);
+ if (packets > max_packets)
+ packets = max_packets;
+ } else {
+ packets = 1;
+ }
+
+ std::this_thread::sleep_until(next);
+// std::this_thread::sleep_for(std::chrono::microseconds(interval));
}
}).detach();
}
@@ -29,7 +44,8 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
detectorID_{detectorID},
numberOfDetectorsPerModule_{16},
index_{0u},
- client_{address, detectorID+4000}{
+ client_{address, detectorID+4000},
+ max_packets_{1000u}{
printf("Creating %d\n", detectorID);
@@ -37,7 +53,10 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!");
}
- sendBuffer_.resize(numberOfProjectionsPerPacket_*numberOfDetectorsPerModule_*sizeof(unsigned short)+sizeof(std::size_t));
+ sendBuffer_.resize(max_packets_);
+ for(auto &it: sendBuffer_) {
+ it.resize(numberOfProjectionsPerPacket_ * numberOfDetectorsPerModule_ * sizeof(unsigned short) + sizeof(size_t) + sizeof(short int));
+ }
//read the input data from the file corresponding to the detectorModuleID
readInput();
@@ -46,7 +65,7 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address,
printf("Created %d\n", detectorID);
}
-auto DetectorModule::send() -> void{
+auto DetectorModule::send(int packets = 1) -> void{
BOOST_LOG_TRIVIAL(debug) << "Detectormodule " << detectorID_ << " :sending udp packet with index " << index_ << ".";
int numberOfParts = numberOfProjections_/numberOfProjectionsPerPacket_;
// sendBuffer_[0] = (sizeof(std::size_t)) & 0xff;
@@ -57,23 +76,56 @@ auto DetectorModule::send() -> void{
// sendBuffer_[5] = (sizeof(std::size_t) >> 40) & 0xff;
// sendBuffer_[6] = (sizeof(std::size_t) >> 48) & 0xff;
// sendBuffer_[7] = (sizeof(std::size_t) >> 56) & 0xff;
- unsigned int bufferSizeIndex = index_ % 1000;
- unsigned int sinoSize = numberOfDetectorsPerModule_*numberOfProjectionsPerPacket_;
- *reinterpret_cast<int*>(sendBuffer_.data()) = index_;
- *reinterpret_cast<unsigned short*>(sendBuffer_.data()+sizeof(std::size_t)) = partID_;
- std::copy(((char*)buffer_.data())+sinoSize*(bufferSizeIndex*numberOfParts+partID_)*sizeof(unsigned short), ((char*)buffer_.data())+(sinoSize*(1+bufferSizeIndex*numberOfParts+partID_))*sizeof(unsigned short), sendBuffer_.begin()+sizeof(std::size_t)+sizeof(unsigned short));
- BOOST_LOG_TRIVIAL(debug) << "INDEX: " << (bufferSizeIndex*numberOfParts+partID_);
- client_.send(sendBuffer_.data(), sendBuffer_.size());
- partID_ = (partID_+1) % numberOfParts;
- if(partID_ == 0)
- ++index_;
+
+
+ struct mmsghdr msg[packets];
+ struct iovec msgvec[packets];
+
+ unsigned int hdrSize = sizeof(size_t) + sizeof(short int);
+ unsigned int sinoSize = numberOfDetectorsPerModule_ * numberOfProjectionsPerPacket_;
+
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < packets; i++) {
+ unsigned int bufferSizeIndex = index_ % 1000;
+
+ char *ptr = sendBuffer_[i].data();
+
+ msgvec[i].iov_base = sendBuffer_[i].data();
+ msgvec[i].iov_len = sendBuffer_[i].size();
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+
+
+ *reinterpret_cast<size_t*>(ptr) = index_ * numberOfParts + partID_;
+ *reinterpret_cast<unsigned short*>(ptr + sizeof(size_t)) = partID_;
+ memcpy(ptr + hdrSize, buffer_.data() + sinoSize * (bufferSizeIndex * numberOfParts + partID_), sinoSize * sizeof(unsigned short));
+
+ partID_ = (partID_ + 1) % numberOfParts;
+ if (partID_ == 0) ++index_;
+ }
+
+ client_.msend(packets, msg);
+
+ auto ts = std::chrono::high_resolution_clock::now();
+ std::chrono::nanoseconds d = ts - ts_;
+ counter_ += packets;
+ if (d.count() >= 1000000000) {
+ printf("Packets %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", counter_, sendBuffer_[0].size(), 8. * counter_ * sendBuffer_[0].size() / 1024 / 1024 / 1024, 1. * d.count() / 1000000);
+ counter_ = 0;
+ ts_ = ts;
+ }
}
auto DetectorModule::sendPeriodically(unsigned int timeIntervall) -> void {
- std::function<void(void)> f = [=]() {
- this->send();
+ counter_ = 0;
+ ips_ = 1000000. / ((double)timeIntervall);
+ ts_ = std::chrono::high_resolution_clock::now();
+
+ std::function<void(int)> f = [=](int packets = 1) {
+ this->send(packets);
};
- timer_start(f, timeIntervall);
+ timer_start(f, timeIntervall, max_packets_);
}
auto DetectorModule::readInput() -> void {