From 254f1dc9e629e9de818672174a6614c6595fb11a Mon Sep 17 00:00:00 2001 From: "Suren A. Chilingaryan" Date: Mon, 6 Aug 2018 20:20:46 +0200 Subject: Support arbitrary ports and port-range splitting --- analyze.sh | 6 +- cfg/ipecamera0.sh | 3 + cfg/ipecamera3.sh | 3 + cfg/ipecamera4.sh | 3 + cfg/ipecamera5.sh | 3 + cfg/ipecamera6.sh | 3 + config.sh | 2 + run-client.sh | 12 +- run-server.sh | 10 +- run.sh | 4 + src/Detector/Detector.cpp | 6 +- src/Detector/Detector.h | 2 +- src/DetectorModule/DetectorModule.cpp | 6 +- src/DetectorModule/DetectorModule.h | 2 +- src/ReceiverThreads.vma/ReceiverThreads.cpp | 166 ++++++++++++++++++++++++++++ src/ReceiverThreads.vma/ReceiverThreads.h | 34 ++++++ src/ReceiverThreads/ReceiverThreads.cpp | 5 +- src/ReceiverThreads/ReceiverThreads.h | 2 +- src/main_client.cpp | 15 ++- src/main_server.cpp | 14 ++- 20 files changed, 270 insertions(+), 31 deletions(-) create mode 100644 cfg/ipecamera0.sh create mode 100644 cfg/ipecamera3.sh create mode 100644 cfg/ipecamera4.sh create mode 100644 cfg/ipecamera5.sh create mode 100644 cfg/ipecamera6.sh create mode 100644 config.sh create mode 100755 run.sh create mode 100644 src/ReceiverThreads.vma/ReceiverThreads.cpp create mode 100644 src/ReceiverThreads.vma/ReceiverThreads.h diff --git a/analyze.sh b/analyze.sh index 9c11bd6..33799cc 100755 --- a/analyze.sh +++ b/analyze.sh @@ -2,9 +2,11 @@ sleep=1 -stats1=($(ethtool -S ens11 | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) +int=$(ip addr show | grep "192\.168\.2\." | awk '{ print $NF }') + +stats1=($(ethtool -S $int | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) sleep $sleep -stats2=($(ethtool -S ens11 | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) +stats2=($(ethtool -S $int | grep -P "rx\d?_(packets|bytes)" | awk '{ print $2 }')) for i in "${!stats1[@]}"; do diff=$(bc <<< "(${stats2[$i]} - ${stats1[$i]}) / $sleep") diff --git a/cfg/ipecamera0.sh b/cfg/ipecamera0.sh new file mode 100644 index 0000000..a93d716 --- /dev/null +++ b/cfg/ipecamera0.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +./run-client.sh 192.168.2.86 4022 5 1500 diff --git a/cfg/ipecamera3.sh b/cfg/ipecamera3.sh new file mode 100644 index 0000000..28b2ca9 --- /dev/null +++ b/cfg/ipecamera3.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +./run-client.sh 192.168.2.86 4000 8 1500 diff --git a/cfg/ipecamera4.sh b/cfg/ipecamera4.sh new file mode 100644 index 0000000..f1ce0b0 --- /dev/null +++ b/cfg/ipecamera4.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +./run-client.sh 192.168.2.86 4008 8 1500 diff --git a/cfg/ipecamera5.sh b/cfg/ipecamera5.sh new file mode 100644 index 0000000..cc9e34b --- /dev/null +++ b/cfg/ipecamera5.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +./run-client.sh 192.168.2.86 4016 6 1500 diff --git a/cfg/ipecamera6.sh b/cfg/ipecamera6.sh new file mode 100644 index 0000000..25528fd --- /dev/null +++ b/cfg/ipecamera6.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +./run-server.sh 192.168.2.86 4000 27 1500 diff --git a/config.sh b/config.sh new file mode 100644 index 0000000..b43906c --- /dev/null +++ b/config.sh @@ -0,0 +1,2 @@ +ods_path=/mnt/ands/ods/bin/ +vma_path=/mnt/ands diff --git a/run-client.sh b/run-client.sh index e4b0f8f..7ed9967 100755 --- a/run-client.sh +++ b/run-client.sh @@ -8,8 +8,12 @@ function run { ip=$1 mtu=1500 #max 9000 -[ -n "$1" ] || { echo "Usage: run [mtu]" ; exit 1 ; } -[ -n "$2" ] && mtu=$2 +first_port=4000 +num_ports=27 +[ -n "$1" ] || { echo "Usage: run [first_port] [num_ports] [mtu]" ; exit 1 ; } +[ -n "$2" ] && first_port=$2 +[ -n "$3" ] && num_ports=$3 +[ -n "$4" ] && mtu=$4 int=$(ip route show to match "$ip" | grep src | awk '{ print $3 }') @@ -17,10 +21,10 @@ ip link set $int mtu $mtu if [ $mtu -le 1500 ]; then sed -i '' -e 's/numberOfProjectionsPerPacket.*/numberOfProjectionsPerPacket = 40/' config.cfg - run 80000 "$ip" + run "$ip" $first_port $num_ports 120000 elif [ $mtu -eq 9000 ]; then sed -i '' -e 's/numberOfProjectionsPerPacket.*/numberOfProjectionsPerPacket = 250/' config.cfg - run 20000 "$ip" + run "$ip" $first_port $num_ports 20000 fi diff --git a/run-server.sh b/run-server.sh index 7d29c10..99330c2 100755 --- a/run-server.sh +++ b/run-server.sh @@ -8,9 +8,13 @@ function run { ip=$1 +first_port=4000 +num_ports=27 mtu=1500 #max 9000 -[ -n "$1" ] || { echo "Usage: run [mtu]" ; exit 1 ; } -[ -n "$2" ] && mtu=$2 +[ -n "$1" ] || { echo "Usage: run [first_port] [num_ports] [mtu]" ; exit 1 ; } +[ -n "$2" ] && first_port=$2 +[ -n "$3" ] && num_ports=$3 +[ -n "$4" ] && mtu=$4 ipinfo=$(ip addr show | grep $ip) [ $? -eq 0 ] || { echo "Specified IP $ip is not found" ; exit 1 ; } @@ -22,4 +26,4 @@ echo 8000 > /proc/sys/vm/nr_hugepages # 0 ip link set $int mtu $mtu -run $ip +run $ip $first_port $num_ports diff --git a/run.sh b/run.sh new file mode 100755 index 0000000..7f6846a --- /dev/null +++ b/run.sh @@ -0,0 +1,4 @@ +#! /bin/bash + +name=$(uname -n | cut -d '.' -f 1) +bash cfg/$name.sh diff --git a/src/Detector/Detector.cpp b/src/Detector/Detector.cpp index 5dde6d1..43402ee 100644 --- a/src/Detector/Detector.cpp +++ b/src/Detector/Detector.cpp @@ -10,12 +10,12 @@ #include "Detector.h" -Detector::Detector(const std::string& address, const std::string& configPath, const unsigned int timeIntervall) : +Detector::Detector(const std::string& address, const std::string& configPath, const int firstPort, const int numPorts, const unsigned int timeIntervall) : timeIntervall_{timeIntervall}, - numberOfDetectorModules_{27} { + numberOfDetectorModules_{numPorts} { modules_.reserve(numberOfDetectorModules_); for(auto i = 0; i < numberOfDetectorModules_; i++){ - modules_.emplace_back(i, address, configPath); + modules_.emplace_back(i, address, firstPort + i, configPath); } } diff --git a/src/Detector/Detector.h b/src/Detector/Detector.h index 1969dbd..4295e63 100644 --- a/src/Detector/Detector.h +++ b/src/Detector/Detector.h @@ -17,7 +17,7 @@ class Detector { public: - Detector(const std::string& address, const std::string& configPath, const unsigned int timeIntervall); + Detector(const std::string& address, const std::string& configPath, const int firstPort, const int numPorts, const unsigned int timeIntervall); auto run() -> void; private: diff --git a/src/DetectorModule/DetectorModule.cpp b/src/DetectorModule/DetectorModule.cpp index e7e0272..6767eb9 100644 --- a/src/DetectorModule/DetectorModule.cpp +++ b/src/DetectorModule/DetectorModule.cpp @@ -40,15 +40,13 @@ void timer_start(std::function func, unsigned int interval, unsigned }).detach(); } -DetectorModule::DetectorModule(const int detectorID, const std::string& address, const std::string& configPath) : +DetectorModule::DetectorModule(const int detectorID, const std::string& address, const int port, const std::string& configPath) : detectorID_{detectorID}, numberOfDetectorsPerModule_{16}, index_{0u}, - client_{address, detectorID+4000}, + client_{address, port}, max_packets_{1000u}{ - printf("Creating %d\n", detectorID); - if (readConfig(configPath)) { throw std::runtime_error("DetectorModule: Configuration file could not be loaded successfully. Please check!"); } diff --git a/src/DetectorModule/DetectorModule.h b/src/DetectorModule/DetectorModule.h index f959857..fe25727 100644 --- a/src/DetectorModule/DetectorModule.h +++ b/src/DetectorModule/DetectorModule.h @@ -21,7 +21,7 @@ class DetectorModule { public: - DetectorModule(const int detectorID, const std::string& address, const std::string& configPath); + DetectorModule(const int detectorID, const std::string& address, const int port, const std::string& configPath); auto sendPeriodically(unsigned int timeIntervall) -> void; diff --git a/src/ReceiverThreads.vma/ReceiverThreads.cpp b/src/ReceiverThreads.vma/ReceiverThreads.cpp new file mode 100644 index 0000000..650a840 --- /dev/null +++ b/src/ReceiverThreads.vma/ReceiverThreads.cpp @@ -0,0 +1,166 @@ +/* + * Copyright 2016 + * + * ReceiverThreads.cpp + * + * Created on: 21.07.2016 + * Author: Tobias Frust + */ + +#include "ReceiverThreads.h" +#include "../UDPServer/UDPServer.h" + +#include + +ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules) + : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i); + } + + for(auto i = 0; i < numberOfDetectorModules; i++){ + receiverModules_[i].join(); + } + +} + +auto ReceiverThreads::receiverThread(const int port) -> void { + int max_packets = 100; + int max_packet_size = 65535; + + + struct vma_ap_t *vma = vma_get_api(); + if (!vma) throw "Can't get LibVMA API" + + struct vma_packet_desc_t vma_packet; + struct vma_buff_t *vma_buf; + int vma_buf_offset; + + int vma_ring_fd; + vma->get_socket_rings_fds(fd?, &vma_ring_fd, 1); + if (vma_ring_fd < 0) throw "Can't get ring fds"; + + while (true) { + // free vma packets + + + struct vma_completion_t vma_comps[max_packets]; + int n_packets = vma->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0); + if (!n_packets) continue; + + for (int i = 0; i < n_packets; i++) { + switch (vma_comps[i].events) { + case VMA_SOCKETXTREME_PACKET: + break; + case EPOLLERR: + case EPOLLRDHUP: + printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + default: + printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data); + throw "Polling error"; + } + + int packet_len = vma_comps[i].packet.total_len; + int n_buffs = vma_comps[i].packet.num_bufs; + + vma_comps.packet.buff_lst + + vma_comps[i].packet.buff_lst->payload +/* +<------><------><------><------><------>conn = conns_in[vma_comps.user_data] vma_comps.packet.total_len; +<------><------><------><------><------>conn->vma_packet.num_bufs = vma_comps.packet.num_bufs; +<------><------><------><------><------>conn->vma_packet.total_len = vma_comps.packet.total_len; +<------><------><------><------><------>conn->vma_packet.buff_lst = vma_comps.packet.buff_lst; +<------><------><------><------><------>conn->vma_buf = conn->vma_packet.buff_lst; +<------><------><------><------><------>conn->vma_buf_offset = 0; +*/ +/* + ret = _min((_config.msg_size - conn->msg_len), (conn->vma_buf->len - conn->vma_buf_offset)); + memcpy(((uint8_t *)msg_hdr) + conn->msg_len, + ((uint8_t *)conn->vma_buf->payload) + conn->vma_buf_offset, + ret); +*/ + _vma_api->socketxtreme_free_vma_packets(&conn->vma_packet, 1); + } + + +auto ReceiverThreads::receiverThread(const int port) -> void { + int max_packets = 100; + int max_packet_size = 65535; + + UDPServer server = UDPServer(address_, port); + std::vector> buffers; + + std::size_t rcv_index = 0; + std::size_t rcv_packets = 0; + std::size_t rcv_size = 0; + + std::size_t lastIndex{0}; + std::size_t loss = 0; + + struct mmsghdr msg[max_packets]; + struct iovec msgvec[max_packets]; + + buffers.resize(max_packets); + + memset(msg, 0, sizeof(msg)); + memset(msgvec, 0, sizeof(msgvec)); + for (int i = 0; i < max_packets; i++) { + buffers[i].resize(max_packet_size); + + msgvec[i].iov_base = buffers[i].data(); + msgvec[i].iov_len = buffers[i].size(); + msg[i].msg_hdr.msg_iov = &msgvec[i]; + msg[i].msg_hdr.msg_iovlen = 1; + } + + + BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; + + double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; + auto ts_last = std::chrono::high_resolution_clock::now(); + while(true){ + int packets = server.mrecv(max_packets, msg, 1); //timeIntervall_); + + if (packets >= 0) { + for (int i = 0; i < packets; i++) { + int bytes = msg[i].msg_len; + unsigned short *buf = reinterpret_cast(msgvec[i].iov_base); + + rcv_packets++; + rcv_size += bytes; + +// BOOST_LOG_TRIVIAL(debug) << "Received " << bytes << " Bytes."; + std::size_t index =*((std::size_t *)buf); + int diff = index - lastIndex - 1; + if(diff > 0){ + loss += diff; + BOOST_LOG_TRIVIAL(debug) << "Packet loss or wrong order! new: " << index << " old: " << lastIndex; + } + +/* if (port == 4000) { + printf("%i:%i:%i:%i,", index, diff, loss, i); + }*/ + + lastIndex = index; + } + } + + auto ts = std::chrono::high_resolution_clock::now(); + std::chrono::nanoseconds d = ts - ts_last; + if (d.count() >= 1000000000) { + printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000); + rcv_packets = 0; + rcv_size = 0; + rcv_index = lastIndex; + loss = 0; + ts_last = ts; + } + } + + BOOST_LOG_TRIVIAL(info) << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)"; + loss_ += loss; +} + diff --git a/src/ReceiverThreads.vma/ReceiverThreads.h b/src/ReceiverThreads.vma/ReceiverThreads.h new file mode 100644 index 0000000..3cc986c --- /dev/null +++ b/src/ReceiverThreads.vma/ReceiverThreads.h @@ -0,0 +1,34 @@ +/* + * Copyright 2016 + * + * ReceiverThreads.h + * + * Created on: 21.07.2016 + * Author: Tobias Frust + */ + +#ifndef RECEIVERTHREADS_H_ +#define RECEIVERTHREADS_H_ + +#include +#include + +class ReceiverThreads { +public: + ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules); + + auto run() -> void; +private: + auto receiverThread(const int port) -> void; + + std::vector receiverModules_; + + std::size_t loss_; + + int timeIntervall_; + int numberOfDetectorModules_; + + std::string address_; +}; + +#endif /* RECEIVERTHREADS_H_ */ diff --git a/src/ReceiverThreads/ReceiverThreads.cpp b/src/ReceiverThreads/ReceiverThreads.cpp index e60971e..e5c339b 100644 --- a/src/ReceiverThreads/ReceiverThreads.cpp +++ b/src/ReceiverThreads/ReceiverThreads.cpp @@ -12,11 +12,11 @@ #include -ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules) +ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort) : timeIntervall_{timeIntervall}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0} { for(auto i = 0; i < numberOfDetectorModules; i++){ - receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, 4000+i); + receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i); } for(auto i = 0; i < numberOfDetectorModules; i++){ @@ -56,6 +56,7 @@ auto ReceiverThreads::receiverThread(const int port) -> void { } + printf("Listening %d\n", port); BOOST_LOG_TRIVIAL(info) << "Address: " << address_ << " port: " << port << " timeout: " << timeIntervall_; double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.; diff --git a/src/ReceiverThreads/ReceiverThreads.h b/src/ReceiverThreads/ReceiverThreads.h index 7cb04c0..094652a 100644 --- a/src/ReceiverThreads/ReceiverThreads.h +++ b/src/ReceiverThreads/ReceiverThreads.h @@ -15,7 +15,7 @@ class ReceiverThreads { public: - ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules); + ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort = 4000); auto run() -> void; private: diff --git a/src/main_client.cpp b/src/main_client.cpp index 01b3aad..fb0c1e3 100644 --- a/src/main_client.cpp +++ b/src/main_client.cpp @@ -10,23 +10,28 @@ #include void initLog() { +/* #ifndef NDEBUG boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::debug); #else boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info); #endif +*/ + boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info); } int main (int argc, char *argv[]){ - if(argc < 3){ - BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorClient
!"; + if(argc < 5){ + BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorClient
"; return 0; } - int imagesPerSec = std::stoi(argv[1]); + std::string address = argv[1]; + int firstPort = std::stoi(argv[2]); + int numPorts = std::stoi(argv[3]); + int imagesPerSec = std::stoi(argv[4]); - std::string address = argv[2]; double timegap = 1./(double)imagesPerSec; unsigned int intervall = timegap*1000*1000; @@ -37,7 +42,7 @@ int main (int argc, char *argv[]){ auto configPath = std::string { "config.cfg" }; - Detector detector{address, configPath, intervall}; + Detector detector{address, configPath, firstPort, numPorts, intervall}; //DetectorModule detModule0 = DetectorModule(1, address, configPath); diff --git a/src/main_server.cpp b/src/main_server.cpp index cd84cb9..b2f9425 100644 --- a/src/main_server.cpp +++ b/src/main_server.cpp @@ -10,11 +10,14 @@ #include void initLog() { +/* #ifndef NDEBUG boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::debug); #else boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info); #endif +*/ + boost::log::core::get()->set_filter(boost::log::trivial::severity >= boost::log::trivial::info); } void start(std::function func){ @@ -30,12 +33,14 @@ int main (int argc, char *argv[]){ initLog(); - if(argc < 2){ - BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorServer
!"; + if(argc < 4){ + BOOST_LOG_TRIVIAL(error) << "Program usage: ./onlineDetectorSimulatorServer
"; return 0; } std::string address = argv[1]; + int firstPort = std::stoi(argv[2]); + int numPorts = std::stoi(argv[3]); // int port = 4002; // @@ -46,9 +51,8 @@ int main (int argc, char *argv[]){ std::vector buf(16000); - std::cout << "Receiving UDP packages: " << std::endl; - - ReceiverThreads(address, 10, 27); + printf("Receving udp packets on ports %u - %u\n", firstPort, firstPort + numPorts); + ReceiverThreads(address, 10, numPorts, firstPort); // for(auto i = 0; i < 27; i++){ // std::function f = [=]() { -- cgit v1.2.1