From 09b7e077f10fe324774f759086f3596947d69b19 Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Mon, 6 Aug 2018 18:35:13 +0200 Subject: Send multiple packets in one system call --- config.cfg | 4 +- src/DetectorModule/DetectorModule.cpp | 92 +++++++++++++++++++++++++++-------- src/DetectorModule/DetectorModule.h | 8 ++- src/UDPClient/UDPClient.cpp | 13 +++++ src/UDPClient/UDPClient.h | 1 + 5 files changed, 94 insertions(+), 24 deletions(-) diff --git a/config.cfg b/config.cfg index 87bdb78..d5f6978 100644 --- a/config.cfg +++ b/config.cfg @@ -1,10 +1,10 @@ numberOfFanDetectors = 432 -dataInputPath = "/DetModStream/data_pumpe_repaired/" +dataInputPath = "/mnt/ands/testdata/data_pumpe_repaired/" dataFileName = "data_pumpe_repaired_DetModNr_" dataFileEnding = ".fx" numberOfPlanes = 2 samplingRate = 1 scanRate = 2000 numberOfDataFrames = 500 -numberOfProjectionsPerPacket = 500 +numberOfProjectionsPerPacket = 40 numberOfDetectorsPerModule = 16 diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp index 169c5a5..e7e0272 100644 --- a/src/DetectorModule/DetectorModule.cpp +++ b/src/DetectorModule/DetectorModule.cpp @@ -15,12 +15,27 @@ #include #include -void timer_start(std::function func, unsigned int interval){ - std::thread([func, interval]() { +void timer_start(std::function func, unsigned int interval, unsigned int max_packets){ + std::thread([func, interval, max_packets]() { + int packets = 1; + auto next = std::chrono::high_resolution_clock::now(); while (true) { - func(); - std::this_thread::sleep_for(std::chrono::microseconds(interval)); + func(packets); + + next += std::chrono::microseconds(packets * interval); + auto now = std::chrono::high_resolution_clock::now(); + if (now > next) { + std::chrono::nanoseconds late = now - next; + packets = 1 + (late.count() / interval / 1000); + if (packets > max_packets) + packets = max_packets; + } else { + packets = 1; + } + + std::this_thread::sleep_until(next); +// std::this_thread::sleep_for(std::chrono::microseconds(interval)); } }).detach(); } @@ -29,7 +44,8 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address, detectorID_{detectorID}, numberOfDetectorsPerModule_{16}, index_{0u}, - client_{address, detectorID+4000}{ + client_{address, detectorID+4000}, + max_packets_{1000u}{ printf("Creating %d\n", detectorID); @@ -37,7 +53,10 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address, throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!"); } - sendBuffer_.resize(numberOfProjectionsPerPacket_*numberOfDetectorsPerModule_*sizeof(unsigned short)+sizeof(std::size_t)); + sendBuffer_.resize(max_packets_); + for(auto &it: sendBuffer_) { + it.resize(numberOfProjectionsPerPacket_ * numberOfDetectorsPerModule_ * sizeof(unsigned short) + sizeof(size_t) + sizeof(short int)); + } //read the input data from the file corresponding to the detectorModuleID readInput(); @@ -46,7 +65,7 @@ DetectorModule::DetectorModule(const int detectorID, const std::string& address, printf("Created %d\n", detectorID); } -auto DetectorModule::send() -> void{ +auto DetectorModule::send(int packets = 1) -> void{ BOOST_LOG_TRIVIAL(debug) << "Detectormodule " << detectorID_ << " :sending udp packet with index " << index_ << "."; int numberOfParts = numberOfProjections_/numberOfProjectionsPerPacket_; // sendBuffer_[0] = (sizeof(std::size_t)) & 0xff; @@ -57,23 +76,56 @@ auto DetectorModule::send() -> void{ // sendBuffer_[5] = (sizeof(std::size_t) >> 40) & 0xff; // sendBuffer_[6] = (sizeof(std::size_t) >> 48) & 0xff; // sendBuffer_[7] = (sizeof(std::size_t) >> 56) & 0xff; - unsigned int bufferSizeIndex = index_ % 1000; - unsigned int sinoSize = numberOfDetectorsPerModule_*numberOfProjectionsPerPacket_; - *reinterpret_cast(sendBuffer_.data()) = index_; - *reinterpret_cast(sendBuffer_.data()+sizeof(std::size_t)) = partID_; - std::copy(((char*)buffer_.data())+sinoSize*(bufferSizeIndex*numberOfParts+partID_)*sizeof(unsigned short), ((char*)buffer_.data())+(sinoSize*(1+bufferSizeIndex*numberOfParts+partID_))*sizeof(unsigned short), sendBuffer_.begin()+sizeof(std::size_t)+sizeof(unsigned short)); - BOOST_LOG_TRIVIAL(debug) << "INDEX: " << (bufferSizeIndex*numberOfParts+partID_); - client_.send(sendBuffer_.data(), sendBuffer_.size()); - partID_ = (partID_+1) % numberOfParts; - if(partID_ == 0) - ++index_; + + + struct mmsghdr msg[packets]; + struct iovec msgvec[packets]; + + unsigned int hdrSize = sizeof(size_t) + sizeof(short int); + unsigned int sinoSize = numberOfDetectorsPerModule_ * numberOfProjectionsPerPacket_; + + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < packets; i++) { + unsigned int bufferSizeIndex = index_ % 1000; + + char *ptr = sendBuffer_[i].data(); + + msgvec[i].iov_base = sendBuffer_[i].data(); + msgvec[i].iov_len = sendBuffer_[i].size(); + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + + + *reinterpret_cast(ptr) = index_ * numberOfParts + partID_; + *reinterpret_cast(ptr + sizeof(size_t)) = partID_; + memcpy(ptr + hdrSize, buffer_.data() + sinoSize * (bufferSizeIndex * numberOfParts + partID_), sinoSize * sizeof(unsigned short)); + + partID_ = (partID_ + 1) % numberOfParts; + if (partID_ == 0) ++index_; + } + + client_.msend(packets, msg); + + auto ts = std::chrono::high_resolution_clock::now(); + std::chrono::nanoseconds d = ts - ts_; + counter_ += packets; + if (d.count() >= 1000000000) { + printf("Packets %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", counter_, sendBuffer_[0].size(), 8. * counter_ * sendBuffer_[0].size() / 1024 / 1024 / 1024, 1. * d.count() / 1000000); + counter_ = 0; + ts_ = ts; + } } auto DetectorModule::sendPeriodically(unsigned int timeIntervall) -> void { - std::function f = [=]() { - this->send(); + counter_ = 0; + ips_ = 1000000. / ((double)timeIntervall); + ts_ = std::chrono::high_resolution_clock::now(); + + std::function f = [=](int packets = 1) { + this->send(packets); }; - timer_start(f, timeIntervall); + timer_start(f, timeIntervall, max_packets_); } auto DetectorModule::readInput() -> void { diff --git a/src/DetectorModule/DetectorModule.h b/src/DetectorModule/DetectorModule.h index afe4d04..f959857 100644 --- a/src/DetectorModule/DetectorModule.h +++ b/src/DetectorModule/DetectorModule.h @@ -27,7 +27,7 @@ public: private: std::vector buffer_; - std::vector sendBuffer_; + std::vector> sendBuffer_; int detectorID_; UDPClient client_; @@ -40,12 +40,16 @@ private: unsigned int numberOfFrames_; std::string path_, fileName_, fileEnding_; + unsigned int max_packets_; std::size_t index_; + std::size_t ips_; + std::size_t counter_; + std::chrono::high_resolution_clock::time_point ts_; unsigned short partID_{0}; auto readConfig(const std::string& configFile) -> bool; auto readInput() -> void; - auto send() -> void; + auto send(int packets) -> void; }; diff --git a/src/UDPClient/UDPClient.cpp b/src/UDPClient/UDPClient.cpp index 1d427ba..b9d55d0 100644 --- a/src/UDPClient/UDPClient.cpp +++ b/src/UDPClient/UDPClient.cpp @@ -64,12 +64,21 @@ { throw udp_client_server_runtime_error(("invalid address or port: \"" + addr + ":" + decimal_port + "\"").c_str()); } + f_socket = socket(f_addrinfo->ai_family, SOCK_DGRAM | SOCK_CLOEXEC, IPPROTO_UDP); if(f_socket == -1) { freeaddrinfo(f_addrinfo); throw udp_client_server_runtime_error(("could not create socket for: \"" + addr + ":" + decimal_port + "\"").c_str()); } + + if (connect(f_socket, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen)) + { + close(f_socket); + freeaddrinfo(f_addrinfo); + throw udp_client_server_runtime_error(("could not connect socket for: \"" + addr + ":" + decimal_port + "\"").c_str()); + } + printf("Created client %d\n", f_port); } @@ -140,3 +149,7 @@ int UDPClient::send(const char *msg, std::size_t size){ return sendto(f_socket, msg, size, 0, f_addrinfo->ai_addr, f_addrinfo->ai_addrlen); } + + int UDPClient::msend(int n, struct mmsghdr *msg){ + return sendmmsg(f_socket, msg, n, 0); + } diff --git a/src/UDPClient/UDPClient.h b/src/UDPClient/UDPClient.h index f6cf0d6..f1c8a8d 100644 --- a/src/UDPClient/UDPClient.h +++ b/src/UDPClient/UDPClient.h @@ -33,6 +33,7 @@ public: std::string get_addr() const; int send(const char *msg, size_t size); + int msend(int n, struct mmsghdr *msg); private: int f_socket; -- cgit v1.2.1