summaryrefslogtreecommitdiffstats
path: root/src/ufo-roof-buffer.c
blob: f071481a964dd7f18364ca0e2720d9edb2a1d3ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <stdint.h>

#include "glib.h"

#include "ufo-roof.h"
#include "ufo-roof-buffer.h"

// This is currently not thread safe. With dual-filter architecture this will be called sequentially.

/*
 * Allocates a ring buffer able to hold cfg->buffer_size datasets of
 * cfg->dataset_size bytes each, together with the per-slot fragment counters
 * and the per-stream automatic fragment counters.
 *
 * cfg   - configuration; buffer_size, payload_size, dataset_size and n_streams are read
 * error - receives a description of the failure (may be NULL)
 *
 * Returns the new buffer, to be released with ufo_roof_buffer_free(), or NULL on failure.
 *
 * NOTE(review): roof_new_error() is a macro defined outside this file; this function
 * relies on it setting *error and returning NULL from the caller - confirm in ufo-roof.h.
 */
UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
        // These values are used as divisors (here and when fragments are placed), so zero would be a division by zero
    if ((!cfg->buffer_size)||(!cfg->payload_size)||(!cfg->n_streams))
        roof_new_error(error, "Invalid ROOF buffer configuration: buffer size, payload size and number of streams must be non-zero");

    UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
    if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");

    buffer->ring_size = cfg->buffer_size;
    buffer->fragment_size = cfg->payload_size;
    buffer->dataset_size = cfg->dataset_size;
    buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
    buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;
//    printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size);

        // fragments_per_stream is the divisor used to map fragment ids to datasets
    if (!buffer->fragments_per_stream) {
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Invalid ROOF buffer configuration: dataset is too small to hold one fragment per stream");
    }

        // Compute the size in gsize so the product does not wrap in 32-bit arithmetic,
        // and keep it in a local so it is still available after the buffer is freed.
    gsize total_size = (gsize)buffer->ring_size * buffer->dataset_size;

    buffer->ring_buffer = malloc(total_size);
    buffer->n_fragments = (_Atomic int*)calloc(buffer->ring_size, sizeof(_Atomic int));
    buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));

    if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
            // ufo_roof_buffer_free() releases whichever of the arrays were allocated
        ufo_roof_buffer_free(buffer);
        roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %" G_GSIZE_FORMAT, total_size);
    }

    return buffer;
}

/*
 * Releases a buffer created by ufo_roof_buffer_new(): the dataset ring, the
 * per-slot fragment counters, the per-stream counters and the structure itself.
 * Passing NULL is allowed and does nothing.
 */
void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
    if (!buffer)
        return;

        // free(NULL) is a no-op, so members that were never allocated need no guard
    free(buffer->ring_buffer);
    free(buffer->n_fragments);
    free(buffer->stream_fragment);
    free(buffer);
}

    // fragment_id is numbered from 1 (0 - means auto)
/*
 * Copies one received fragment into its place in the dataset ring buffer.
 *
 * buffer      - ring buffer created by ufo_roof_buffer_new()
 * stream_id   - stream the fragment belongs to; indexes stream_fragment[] and selects the
 *               stream's region inside the dataset (not range-checked here)
 * fragment_id - 1-based sequence number of the fragment within its stream; 0 takes the next
 *               value of the per-stream counter
 * fragment    - payload, buffer->fragment_size bytes are copied
 * error       - receives details about late packets and ring overruns (may be NULL)
 *
 * Returns TRUE if this fragment completed the dataset at buffer->current_id, FALSE otherwise.
 *
 * NOTE(review): roof_network_error_with_retval() and root_set_network_error() are macros
 * defined outside this file; the former is expected to return its second argument from
 * this function - confirm in ufo-roof.h.
 */
gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
    guint buffer_id;
    guint dataset_id;

    if (!fragment_id) {
        fragment_id = ++buffer->stream_fragment[stream_id];
    }

        // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
    dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
    fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
    buffer_id = dataset_id % buffer->ring_size;

        // Late arrived packet
//    printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
    if (dataset_id < buffer->current_id)
        roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);

        // We are not fast enough, new packets are arriving too fast
    if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
            // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?

            // Datasets current_id .. dataset_id - ring_size are dropped; first_kept becomes the new current_id
        guint first_kept = dataset_id - buffer->ring_size + 1;
        guint n_dropped = first_kept - buffer->current_id;
        guint slot = buffer->current_id % buffer->ring_size;

            // Report the fill level of the current dataset's slot (the message is about current_id, not the incoming dataset)
        if (error) {
            root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed", 
                buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[slot], buffer->fragments_per_dataset);
        }

            // FIXME: Send semi-complete buffers further?
            // FIXME: Or shall we drop more if larger buffers are allocated?

            // Only ring_size slots exist: once that many consecutive datasets are dropped every slot
            // has been reset, so there is no need to iterate over a larger jump in dataset ids.
        if (n_dropped > buffer->ring_size)
            n_dropped = buffer->ring_size;
        for (guint i = 0; i < n_dropped; i++) {
            buffer->n_fragments[slot] = 0;
            slot = (slot + 1) % buffer->ring_size;
        }
        buffer->current_id = first_kept;

            // FIXME: In multi-threaded case, we need to ensure that all threads have stopped writing here (and generator is not reading) before we can reassign buffer to the new dataset.
            // To avoid locking, we can store per-thread 'current_id' and only proceed to writing when all per-thread current_ids are equal or above the global value
            // The updates may happen after writing/reading is finished.
    }

        // FIXME: This builds events as they were read from file in roof v.1 code. We can assemble fan projections directly here.
        // Byte pointers and size_t offsets: arithmetic on void* is a compiler extension and 32-bit products may wrap for large rings
    uint8_t *dataset_buffer = (uint8_t*)buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;
    uint8_t *fragment_buffer = dataset_buffer + ((size_t)stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;

/*    printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n", 
        buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size,
        ((uint32_t*)fragment)[0]
    );*/
    memcpy(fragment_buffer, fragment, buffer->fragment_size);

        // FIXME: Sanity checks: verify it is not a duplicate fragment?
        // atomic_fetch_add returns the previous value; use it instead of re-reading the counter
    int completed = atomic_fetch_add(&buffer->n_fragments[buffer_id], 1) + 1;

    if (completed == buffer->fragments_per_dataset) {
            // FIXME: what about a complete dataset blocked by earlier one with only a few fragments missing?
        if (dataset_id == buffer->current_id) {
            return TRUE;
        }
    }

    return FALSE;
}



/*
 * Copies the dataset at buffer->current_id out of the ring, if all its fragments have arrived.
 *
 * buffer        - ring buffer created by ufo_roof_buffer_new()
 * output_buffer - destination, buffer->dataset_size bytes are written
 * error         - not used by this function
 *
 * Returns TRUE if a dataset was copied; its slot counter is then reset and current_id
 * advances by one. Returns FALSE, changing nothing, if the current dataset is incomplete.
 */
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) {
    (void)error;

    guint buffer_id = buffer->current_id % buffer->ring_size;

        // FIXME: what about a complete dataset blocked by earlier one with only a few fragments missing?
    if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;

        // Byte pointer and size_t offset: avoids void* arithmetic and a 32-bit product wrapping for large rings
    const uint8_t *dataset_buffer = (const uint8_t*)buffer->ring_buffer + (size_t)buffer_id * buffer->dataset_size;

    memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
    buffer->n_fragments[buffer_id] = 0;
    buffer->current_id += 1;

    return TRUE;
}