summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-buffer.c
blob: f83885e847c1c0a754eaa9ad126218394104a0d7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <stdio.h>
#include <stdint.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-buffer.h"

// NOTE: The functions in this file are currently not thread-safe. With the dual-filter architecture they are expected to be called sequentially.

/*
 * Allocate a ring buffer able to hold cfg->buffer_size datasets.
 *
 * cfg   - configuration; buffer_size, drop_buffers, payload_size, dataset_size
 *         and n_streams are read from it
 * error - passed to roof_new_error() on failure
 *
 * Returns the new buffer (release it with ufo_roof_buffer_free()). On failure
 * the error is reported through roof_new_error(), which is defined elsewhere
 * and is expected to set *error and return NULL from this function
 * (NOTE(review): confirm against the macro definition).
 */
UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
        // Reject configurations that would divide by zero here or make every later
        // '%' / '/' on ring_size and fragments_per_stream divide by zero.
    if ((!cfg->buffer_size)||(!cfg->payload_size)||(!cfg->n_streams)||((cfg->dataset_size / cfg->payload_size) < cfg->n_streams)) {
        roof_new_error(error, "Invalid ROOF buffer configuration: buffer size, payload size and number of streams must be non-zero and a dataset must hold at least one fragment per stream");
    }

    UfoRoofBuffer *buffer = calloc(1, sizeof *buffer);
    if (!buffer) {
        roof_new_error(error, "Can't allocate UfoRoofBuffer");
    }

    buffer->ring_size = cfg->buffer_size;
    buffer->drop_buffers = cfg->drop_buffers;
    buffer->fragment_size = cfg->payload_size;
    buffer->dataset_size = cfg->dataset_size;
    buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
    buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;

        // Widen before multiplying so that a large ring does not wrap if the fields are 32-bit.
    gsize ring_bytes = (gsize)buffer->ring_size * buffer->dataset_size;

    buffer->ring_buffer = malloc(ring_bytes);
        // Both counters start zeroed: no fragments received yet.
    buffer->n_fragments = calloc(buffer->ring_size, sizeof *buffer->n_fragments);
    buffer->stream_fragment = calloc(cfg->n_streams, sizeof *buffer->stream_fragment);

    if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
            // 'buffer' must not be touched after it is freed; the size for the message was saved in ring_bytes above.
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %" G_GSIZE_FORMAT, ring_bytes);
    }

    return buffer;
}

/*
 * Release a buffer created by ufo_roof_buffer_new() together with the ring
 * storage and the counter arrays it owns. Passing NULL is allowed and does
 * nothing; partially allocated buffers (NULL members) are handled as well.
 */
void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
    if (!buffer)
        return;

        // free(NULL) is a no-op, so members that were never allocated need no guard.
    free(buffer->ring_buffer);
    free(buffer->n_fragments);
    free(buffer->stream_fragment);

    free(buffer);
}

    // fragment_id is numbered from 1 (0 - means auto)
/*
 * Copy one received fragment into the ring-buffer slot of the dataset it belongs to.
 *
 * buffer      - ring buffer created by ufo_roof_buffer_new()
 * stream_id   - stream the fragment arrived on; used without range checking to
 *               index stream_fragment[] and to position the fragment in the dataset
 * fragment_id - 1-based sequence number of the fragment within its stream;
 *               0 means "auto": the per-stream counter is incremented and used
 * fragment    - payload; buffer->fragment_size bytes are copied from it
 * error       - passed to root_set_network_error() when the ring buffer is exhausted
 *
 * Returns TRUE when the dataset at buffer->current_id is complete (either the
 * stored fragment completed it, or datasets were dropped and the new current
 * dataset is already complete). Returns FALSE otherwise, including for
 * fragments of datasets older than current_id, which are dropped silently.
 */
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
    gboolean ready = FALSE;
    guint buffer_id;
    guint dataset_id;

    if (!fragment_id) {
        fragment_id = ++buffer->stream_fragment[stream_id];
    }

        // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
        // From here on fragment_id is 0-based and relative to the dataset.
    dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
    fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
    buffer_id = dataset_id % buffer->ring_size;

        // Late fragment of a dataset that was already delivered or dropped.
        // FIXME: Reporting it (roof_network_error_with_retval) currently produces too much output. Introduce some kind of debugging mode?
    if (dataset_id < buffer->current_id)
        return FALSE;

        // We are not fast enough, new packets are arriving too fast
    if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
            // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?

            // Report the fill level of the slot that really holds dataset 'current_id';
            // 'buffer_id' is the slot of the incoming dataset and in general a different one.
        guint current_slot = buffer->current_id % buffer->ring_size;

            // NOTE(review): assumes root_set_network_error() only records the error and does not return - confirm.
        root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed",
            dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[current_slot], buffer->fragments_per_dataset);

            // FIXME: Send semi-complete buffers further?
            // FIXME: Or shall we drop more if larger buffers are allocated?
        if ((dataset_id - buffer->current_id) > 2 * buffer->ring_size) {
                // Too far behind: discard everything buffered and restart at the incoming dataset.
            memset(buffer->n_fragments, 0, buffer->ring_size * sizeof *buffer->n_fragments);
            buffer->current_id = dataset_id;
        } else {
                // Drop just enough of the oldest datasets so that the incoming one fits,
                // leaving 'drop_buffers' slots of head room.
            for (guint i = buffer->current_id; i <= (dataset_id - (buffer->ring_size - buffer->drop_buffers)); i++)
                buffer->n_fragments[i%buffer->ring_size] = 0;
            buffer->current_id = dataset_id - (buffer->ring_size - buffer->drop_buffers) + 1;
        }

            // The new current dataset may already be complete and waiting.
        if (buffer->n_fragments[buffer->current_id%buffer->ring_size] == buffer->fragments_per_dataset)
            ready = TRUE;

            // FIXME: In multi-threaded case, we need to ensure that all threads have stopped writing here (and generator is not reading) before we can reassign buffer to the new dataset.
            // To avoid locking, we can store per-thread 'current_id' and only proceed to writing when all per-thread current_ids are equal or above the global value
            // The updates may happen after writing/reading is finished.
    }

        // FIXME: This builds events as they were read from file in roof v.1 code. We can assemble fan projections directly here.
        // Offsets are computed in size_t so that large rings do not wrap if the fields are 32-bit.
    uint8_t *dataset_buffer = buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;
    uint8_t *fragment_buffer = dataset_buffer + ((size_t)stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;

    memcpy(fragment_buffer, fragment, buffer->fragment_size);

        // FIXME: Sanity checks: verify it is not a duplicate fragment?
    atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);

    if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
            // FIXME: what about a complete dataset blocked by earlier one with only a few fragments missing?
        if (dataset_id == buffer->current_id)
            ready = TRUE;
    }

    return ready;
}



/*
 * Copy the dataset at buffer->current_id into output_buffer if it is complete.
 *
 * buffer        - ring buffer created by ufo_roof_buffer_new()
 * output_buffer - destination; buffer->dataset_size bytes are written to it
 * error         - currently unused
 *
 * Returns TRUE if a dataset was copied; its slot is then reset and current_id
 * advances to the next dataset. Returns FALSE, leaving everything untouched,
 * while the current dataset still has fewer than fragments_per_dataset fragments.
 */
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) {
    (void)error;

    guint buffer_id = buffer->current_id % buffer->ring_size;
        // Offset is computed in size_t so that large rings do not wrap if the fields are 32-bit.
    void *dataset_buffer = buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few fragments missing?
    if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;

    memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
        // Free the slot for reuse and move on to the next dataset.
    buffer->n_fragments[buffer_id] = 0;
    buffer->current_id += 1;

    return TRUE;
}