summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2018-07-25 15:57:55 +0200
committerSuren A. Chilingaryan <csa@suren.me>2018-07-25 15:57:55 +0200
commit76affa8334acbd21f3a1186fdaace1efe93e2e31 (patch)
tree32065ef3866c4759f00f86e91cc01a6e24134269 /src
parent41d41b7deb416167da5a0de2638ecea078c13ea6 (diff)
downloadods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.gz
ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.bz2
ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.tar.xz
ods-76affa8334acbd21f3a1186fdaace1efe93e2e31.zip
Recieve multiple packets in one system call
Diffstat (limited to 'src')
-rw-r--r--src/ReceiverThreads/ReceiverThreads.cpp88
-rw-r--r--src/UDPServer/UDPServer.cpp16
-rw-r--r--src/UDPServer/UDPServer.h1
3 files changed, 89 insertions, 16 deletions
diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp
index 3d22c66..e60971e 100644
--- a/src/ReceiverThreads/ReceiverThreads.cpp
+++ b/src/ReceiverThreads/ReceiverThreads.cpp
@@ -26,24 +26,80 @@ ReceiverThreads::ReceiverThreads(const std::string& address, const int timeInter
}
auto ReceiverThreads::receiverThread(const int port) -> void {
- UDPServer server = UDPServer(address_, port);
- std::vector<unsigned short> buf(33000);
- std::size_t lastIndex{0};
+ int max_packets = 100;
+ int max_packet_size = 65535;
+
+ UDPServer server = UDPServer(address_, port);
+ std::vector<std::vector<char>> buffers;
+
+ std::size_t rcv_index = 0;
+ std::size_t rcv_packets = 0;
+ std::size_t rcv_size = 0;
+
+ std::size_t lastIndex{0};
+ std::size_t loss = 0;
+
+ struct mmsghdr msg[max_packets];
+ struct iovec msgvec[max_packets];
+
+ buffers.resize(max_packets);
+
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < max_packets; i++) {
+ buffers[i].resize(max_packet_size);
+
+ msgvec[i].iov_base = buffers[i].data();
+ msgvec[i].iov_len = buffers[i].size();
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+
BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_;
+
+ double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
+ auto ts_last = std::chrono::high_resolution_clock::now();
while(true){
- int bytes = server.timed_recv((char*)buf.data(), 65536, timeIntervall_);
- if(bytes < 0){
- break;
- }
- BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
- std::size_t index = *((std::size_t *)buf.data());
- int diff = index - lastIndex - 1;
- if(diff > 0){
- loss_ += diff;
- BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
- }
- lastIndex = index;
+ int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
+
+ if (packets >= 0) {
+ for (int i = 0; i < packets; i++) {
+ int bytes = msg[i].msg_len;
+ unsigned short *buf = reinterpret_cast<unsigned short*>(msgvec[i].iov_base);
+
+ rcv_packets++;
+ rcv_size += bytes;
+
+// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes.";
+ std::size_t index =*((std::size_t *)buf);
+ int diff = index - lastIndex - 1;
+ if(diff > 0){
+ loss += diff;
+ BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex;
+ }
+
+/* if (port == 4000) {
+ printf("%i:%i:%i:%i,", index, diff, loss, i);
+ }*/
+
+ lastIndex = index;
+ }
+ }
+
+ auto ts = std::chrono::high_resolution_clock::now();
+ std::chrono::nanoseconds d = ts - ts_last;
+ if (d.count() >= 1000000000) {
+ printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000);
+ rcv_packets = 0;
+ rcv_size = 0;
+ rcv_index = lastIndex;
+ loss = 0;
+ ts_last = ts;
+ }
}
- BOOST_LOG_TRIVIAL(info) << "Lost " << loss_ << " from " << lastIndex << " packets; (" << loss_/(double)lastIndex*100.0 << "%)";
+
+ BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
+ loss_ += loss;
}
diff --git a/src/UDPServer/UDPServer.cpp b/src/UDPServer/UDPServer.cpp
index 8c9decf..42166b4 100644
--- a/src/UDPServer/UDPServer.cpp
+++ b/src/UDPServer/UDPServer.cpp
@@ -81,6 +81,13 @@ UDPServer::UDPServer(const std::string& addr, int port)
close(f_socket);
throw udp_client_server_runtime_error(("could not bind UDP socket with: \"" + addr + ":" + decimal_port + "\"").c_str());
}
+
+/*
+ int a = 134217728;
+ if (setsockopt(f_socket, SOL_SOCKET, SO_RCVBUF, &a, sizeof(int)) == -1) {
+ fprintf(stderr, "Error setting socket opts: %s\n", strerror(errno));
+ }
+*/
}
/** \brief Clean up the UDP server.
@@ -184,3 +191,12 @@ int UDPServer::timed_recv(char *msg, size_t max_size, int max_wait_s)
// our socket has data
return ::recv(f_socket, msg, max_size, 0);
}
+
+int UDPServer::mrecv(int n, struct mmsghdr *msg, int max_wait_s)
+{
+ struct timespec timeout;
+ timeout.tv_sec = max_wait_s;
+ timeout.tv_nsec = 0;
+
+ return recvmmsg(f_socket, msg, n, MSG_WAITFORONE, &timeout);
+}
diff --git a/src/UDPServer/UDPServer.h b/src/UDPServer/UDPServer.h
index 22f33b3..ed0e033 100644
--- a/src/UDPServer/UDPServer.h
+++ b/src/UDPServer/UDPServer.h
@@ -34,6 +34,7 @@ public:
int recv(char *msg, size_t max_size);
int timed_recv(char *msg, size_t max_size, int max_wait_ms);
+ int mrecv(int n, struct mmsghdr *msg, int max_wait_s);
private:
int f_socket;