summaryrefslogtreecommitdiffstats
path: root/src/ReceiverThreads/ReceiverThreads.cpp
blob: 688de00af99908fdecc86e5d86a3fadfab0ca6d6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
 * Copyright 2016
 * 
 * ReceiverThreads.cpp
 *
 *  Created on: 21.07.2016
 *      Author: Tobias Frust
 */

#include <iostream>

#include "ReceiverThreads.h"
#include "../UDPServer/UDPServer.h"

#include <sys/epoll.h>
#include <vma_extra.h>

//#define USE_VMA

ReceiverThreads::ReceiverThreads(const std::string& address, const int timeIntervall, const int numberOfDetectorModules, const int firstPort)
   : timeIntervall_{timeIntervall}, firstPort_{firstPort}, numberOfDetectorModules_{numberOfDetectorModules}, address_{address}, loss_{0}, nbufs_(100) {

   int max_packet_size = 65535;

#ifdef USE_VMA
   vma_ = vma_get_api();
	// First call fails?
   if (!vma_) vma_ = vma_get_api();
#else 
   vma_ =  NULL;
#endif

   ringbuf_.resize(nbufs_);
    for (int i = 0; i < nbufs_; i++)
	ringbuf_[i].resize(numberOfDetectorModules * max_packet_size);

   for(auto i = 0; i < numberOfDetectorModules; i++){
      receiverModules_.emplace_back(&ReceiverThreads::receiverThread, this, firstPort+i);
   }

   for(auto i = 0; i < numberOfDetectorModules; i++){
      receiverModules_[i].join();
   }
}

auto ReceiverThreads::receiverThread(const int port) -> void {
    int vma_ring_fd;

    int max_packets = 100;
    int max_packet_size = 65535;
    int id = port - firstPort_;

    UDPServer server = UDPServer(address_, port);
    std::vector<std::vector<char>> buffers;

    std::size_t rcv_index = 0;
    std::size_t rcv_packets = 0;
    std::size_t rcv_size  = 0;
    
    std::size_t lastIndex{0};
    std::size_t loss = 0;
   
    struct mmsghdr msg[max_packets];
    struct iovec msgvec[max_packets];

    buffers.resize(max_packets);

    memset(msg, 0, sizeof(msg));
    memset(msgvec, 0, sizeof(msgvec));
    for (int i = 0; i < max_packets; i++) {
	buffers[i].resize(max_packet_size);

	msgvec[i].iov_base = buffers[i].data();
	msgvec[i].iov_len = buffers[i].size();
	msg[i].msg_hdr.msg_iov = &msgvec[i];
	msg[i].msg_hdr.msg_iovlen = 1;
    }

    if (vma_) {
	vma_->get_socket_rings_fds(server.f_socket, &vma_ring_fd, 1);
	if (vma_ring_fd < 0)  throw "Can't get ring fds";
    }

   printf("ID %d listening %d\n", id, port);

   double coef = 1000. * 1000. * 1000. / 1024. / 1024. / 1024.;
   auto ts_last = std::chrono::high_resolution_clock::now();
   int rbuf = 0;
   while(true){
	int packets;
	struct vma_completion_t vma_comps[max_packets];
	struct vma_packet_desc_t vma_packs[max_packets];

	if (vma_) {
		// Seems crashes on ConnectX-3, requires later cards according to documentation (section 8.2)
	    packets = vma_->socketxtreme_poll(vma_ring_fd, vma_comps, max_packets, 0);
	} else {
	    packets = server.mrecv(max_packets, msg, 1); //timeIntervall_);
	}

	if (packets >= 0) {
	  for (int i = 0; i < packets; i++) {
	    int bytes;
	    unsigned short *buf;
    	    char *ringptr = ringbuf_[rbuf++].data() + id * max_packet_size;
	    if (rbuf == nbufs_) rbuf = 0;
	
	    if (vma_) {
		vma_packs[i] = vma_comps[i].packet;

		switch (vma_comps[i].events) {
                 case VMA_SOCKETXTREME_PACKET:
		    break;
                 case EPOLLERR:
                 case EPOLLRDHUP:
		    printf("Polling error event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
		    throw "Polling error";
                 default:
            	    printf("Unsupported event=0x%lx user_data=%ld\n", vma_comps[i].events, vma_comps[i].user_data);
            	    throw "Polling error";
		}
		
		bytes = vma_comps[i].packet.total_len;
		buf = reinterpret_cast<unsigned short*>(vma_comps[i].packet.buff_lst->payload);

        	int n_bufs = vma_comps[i].packet.num_bufs;
		struct vma_buff_t *vma_buf = vma_comps[i].packet.buff_lst;
		
		for (int j = 0; j < n_bufs; j++) {
		    memcpy(ringptr, vma_buf->payload, vma_buf->len);
		    ringptr += vma_buf->len;
		    vma_buf = vma_buf->next;
		}
	    } else {
		bytes = msg[i].msg_len;
		buf =  reinterpret_cast<unsigned short*>(msgvec[i].iov_base);

		memcpy(ringptr, buf, bytes);
	    }

	    rcv_packets++;
	    rcv_size += bytes;

    	    std::size_t index =*((std::size_t *)buf);
    	    int diff = index - lastIndex - 1;
    	    if(diff > 0){
        	loss += diff;
    	    }

/*    	    if (port == 4000) {
    		printf("%i:%i:%i:%i,", index, diff, loss, i);
    	    }*/

    	    lastIndex = index;
    	  }
    	  
    	  if (vma_) {
	    vma_->socketxtreme_free_vma_packets(vma_packs, packets);
	  }
    	}
     
	auto ts = std::chrono::high_resolution_clock::now();
	std::chrono::nanoseconds d = ts - ts_last;
	if (d.count() >= 1000000000) {
	    printf("Lost %.2lf%, Received: %i (%zu bytes, %.3lf GBit/s) in %.3lf ms [VMA: %i, port: %i]\n", loss / (double)(lastIndex - rcv_index)*100.0, rcv_packets, rcv_size, 8. * rcv_size * coef / d.count() , 1. * d.count() / 1000000, (vma_?1:0), port);
	    rcv_packets = 0;
	    rcv_size = 0;
	    rcv_index = lastIndex;
	    loss = 0;
	    ts_last = ts;
	}
   }
   
   std::cout << "Lost " << loss << " from " << lastIndex << " packets; (" << loss/(double)lastIndex*100.0 << "%)";
   loss_ += loss;
}