summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/hardware.txt2
-rw-r--r--src/CMakeLists.txt2
-rw-r--r--src/ufo-roof-buffer.c79
-rw-r--r--src/ufo-roof-buffer.h17
-rw-r--r--src/ufo-roof-build-task.c21
-rw-r--r--src/ufo-roof-config.c4
-rw-r--r--src/ufo-roof-config.h1
-rw-r--r--src/ufo-roof-filter-task.c2
-rw-r--r--tests/roof.yaml1
9 files changed, 102 insertions, 27 deletions
diff --git a/docs/hardware.txt b/docs/hardware.txt
index 50c3a0c..006829d 100644
--- a/docs/hardware.txt
+++ b/docs/hardware.txt
@@ -1,3 +1,4 @@
+ - The packet ids are 64-bit and in big-endian data format
- Jumbo frames are not currently supported, max packet size is 1500 bytes.
* The maximum number of samples per packet can be computed as
n = (1500 - header_size) / sample_size (pixels_per_module * bpp) i.e. 46 = | 1492 / 32 |
@@ -9,3 +10,4 @@
Questions
=========
- Do we need to compute 'flats' and 'darks' for each plane separately? Or just one set will work for all?
+ - How do FPGA syncrhonize IDs? Is it reliable. I currently have desync in receiving (but this could be caused by networking stack problems) \ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index bbbc5fb..720c18f 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -5,7 +5,7 @@ set(ufofilter_SRCS
ufo-roof-read-task.c
ufo-roof-build-task.c
ufo-roof-filter-task.c
- ufo-roof-flat-field-correct.c
+ ufo-roof-flat-field-correct-task.c
)
set(common_SRCS
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
index 32598c9..0e0a890 100644
--- a/src/ufo-roof-buffer.c
+++ b/src/ufo-roof-buffer.c
@@ -19,6 +19,7 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d
buffer->max_datasets = max_datasets;
buffer->ring_size = cfg->buffer_size;
buffer->drop_buffers = cfg->drop_buffers;
+ buffer->latency_buffers = cfg->latency_buffers;
buffer->n_dims = n_dims;
buffer->dataset_size = cfg->dataset_size;
buffer->dataset_dims[0] = cfg->fan_bins * cfg->bit_depth / 8;
@@ -35,6 +36,15 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d
buffer->n_fragments = (_Atomic guint*)calloc(buffer->ring_size, sizeof(_Atomic int));
buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
+#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+ buffer->first_id = malloc(buffer->ring_size * sizeof(guint64));
+ if (!buffer->first_id) roof_new_error(error, "Can't allocate first_id buffer for ROOF datasets");
+ for (guint i = 0; i < buffer->ring_size; i++)
+ buffer->first_id[i] = (guint64)-1;
+#else
+ buffer->first_id = (guint64)-1;
+#endif
+
if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
ufo_roof_buffer_free(buffer);
roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
@@ -45,6 +55,10 @@ UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_d
void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
if (buffer) {
+#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+ if (buffer->first_id)
+ free(buffer->first_id);
+#endif
if (buffer->ring_buffer)
free(buffer->ring_buffer);
if (buffer->n_fragments)
@@ -57,10 +71,11 @@ void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
}
// fragment_id is numbered from 1 (0 - means auto)
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error) {
gboolean ready = FALSE;
guint buffer_id;
- guint dataset_id;
+ guint64 first_id;
+ guint64 dataset_id;
if (!fragment_id) {
fragment_id = ++buffer->stream_fragment[stream_id];
@@ -69,21 +84,37 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
+
+#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+ if (buffer->first_id[stream_id] == (guint64)-1)
+ buffer->first_id[stream_id] = dataset_id;
+ first_id = buffer->first_id[stream_id];
+#else
+ if (buffer->first_id == (guint64)-1)
+ buffer->first_id = dataset_id;
+ first_id = buffer->first_id;
+#endif
+ if (dataset_id < first_id)
+ return FALSE;
+
+ dataset_id -= first_id;
buffer_id = dataset_id % buffer->ring_size;
// FIXME: Currently, this produces too much output. Introduce some kind of debugging mode?
- if (dataset_id < buffer->current_id)
+ if (dataset_id < buffer->current_id) {
+ roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %li, currently processing %li", dataset_id, buffer->current_id);
return FALSE;
-// roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+ }
- if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets))
+ if ((buffer->max_datasets)&&(dataset_id >= buffer->max_datasets)) {
+// printf("Stream %i: dataset %li < %li, first_id: %li\n", stream_id, dataset_id, buffer->max_datasets, first_id);
return FALSE;
-
+ }
// We are not fast enough, new packets are arrvining to fast
if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
// FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
- root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %i. Dropping datasets from %i to %i, dataset %i has %i parts of %i completed",
+ root_set_network_error(error, "Ring buffer exhausted. Get packet for dataset %li. Dropping datasets from %li to %li, dataset %li has %i parts of %i completed",
dataset_id, buffer->current_id, dataset_id - (buffer->ring_size - buffer->drop_buffers), buffer->current_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
// FIXME: Send semi-complete buffers further?
@@ -105,10 +136,14 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// The updates may happen after writting/reading is finished.
}
-/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n",
- buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size,
- ((uint32_t*)fragment)[0]
- );*/
+/*
+ printf("dataset: %lu (%u) is %u of %u complete, new packet: %lu (%ux%u %lu), packet_size: %u [%x]\n",
+ dataset_id, buffer_id,
+ buffer->n_fragments[buffer_id] + 1, buffer->fragments_per_dataset,
+ stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id,
+ buffer->fragment_size, ((uint32_t*)fragment)[0]
+ );
+*/
uint8_t *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
if (buffer->n_dims == 2) {
@@ -116,8 +151,8 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
stream_id * buffer->fragment_dims[0] + // x-coordinate
(fragment_id * buffer->fragment_dims[1]) * buffer->dataset_dims[0]; // y-coordinate
- for (int i = 0; i < buffer->fragment_dims[1]; ++i) {
- memcpy(fragment_buffer + i * buffer->dataset_dims[0], fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
+ for (guint i = 0; i < buffer->fragment_dims[1]; ++i) {
+ memcpy(fragment_buffer + i * buffer->dataset_dims[0], (uint8_t*)fragment + i * buffer->fragment_dims[0], buffer->fragment_dims[0]);
}
} else {
// 1D stracture, simply putting fragment at the appropriate position in the stream
@@ -132,11 +167,29 @@ gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, gu
// FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
if (dataset_id == buffer->current_id)
ready = TRUE;
+ else if ((buffer->latency_buffers)&&(dataset_id >= (buffer->current_id + buffer->latency_buffers)))
+ ready = ufo_roof_buffer_skip_to_ready(buffer);
}
return ready;
}
+gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer) {
+ for (guint i = 0; i < buffer->ring_size; i++) {
+ guint64 id = buffer->current_id + i;
+ guint buffer_id = id % buffer->ring_size;
+
+ if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
+ buffer->current_id = id;
+ return TRUE;
+ }
+
+// printf("Skipping event %lu (%u), only %u of %u fragments are ready\n", id, buffer_id, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+ buffer->n_fragments[buffer_id] = 0;
+ }
+
+ return FALSE;
+}
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error) {
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
index 3d7ad2d..8e9c00b 100644
--- a/src/ufo-roof-buffer.h
+++ b/src/ufo-roof-buffer.h
@@ -1,20 +1,30 @@
#ifndef __UFO_ROOF_BUFFER_H
#define __UFO_ROOF_BUUFER_H
+ // This IS harmful! Just for testing
+//#define UFO_ROOF_INDEPENDENT_STREAMS
+
#include <stdatomic.h>
+
struct _UfoRoofBuffer {
- guint current_id; // The ID of the first (active) dataset in the buffer
+ guint64 current_id; // The ID of the first (active) dataset in the buffer
+#ifdef UFO_ROOF_INDEPENDENT_STREAMS
+ guint64 *first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
+#else
+ guint64 first_id; // The ID of the first received dataset (used for numbering), -1 means not yet known
+#endif
guint ring_size; // Number of datasets to buffer
guint drop_buffers; // If we need to catch up
+ guint latency_buffers; // we skip incomplete buffers if current_id + latency_buffers is ready
uint8_t *ring_buffer; // The ring buffer
_Atomic guint *n_fragments; // Number of completed fragments in each buffer
guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams)
// int *fragments; // Mark individual completed fragments (if we care for partial data)
- guint max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped
+ guint64 max_datasets; // Only the specified number of datasets will be buffered, the rest will be silently dropped
guint n_dims; // Indicates if we just assemble one fragment after another or there is 2D/3D data structure (ROOF)
guint dataset_size; // Size (in bytes) of a full dataset
guint dataset_dims[2]; // x (in bytes), y (in rows)
@@ -30,7 +40,8 @@ typedef struct _UfoRoofBuffer UfoRoofBuffer;
UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, guint n_dims, guint max_datasets, GError **error);
void ufo_roof_buffer_free(UfoRoofBuffer *buf);
-gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error);
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint64 fragment_id, gconstpointer fragment, GError **error);
+gboolean ufo_roof_buffer_skip_to_ready(UfoRoofBuffer *buffer);
gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, gulong *seqid, GError **error);
#endif
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
index 9d55d38..a39ed11 100644
--- a/src/ufo-roof-build-task.c
+++ b/src/ufo-roof-build-task.c
@@ -18,6 +18,7 @@
*/
#include <stdio.h>
+#include <endian.h>
#ifdef __APPLE__
#include <OpenCL/cl.h>
@@ -46,7 +47,7 @@ struct _UfoRoofBuildTaskPrivate {
gboolean stop; // Stop flag
gboolean simulate; // Indicates if we are running in network or simulation modes
- guint announced; // For debugging
+ guint64 announced; // For debugging
struct timespec last_fragment_timestamp;
};
@@ -210,12 +211,12 @@ ufo_roof_build_task_process (UfoTask *task,
const uint8_t *fragment = data;
for (guint i = 0; i < header->n_packets; i++) {
- guint packet_id = 0;
+ guint64 packet_id = 0;
// Otherwise considered consecutive and handled by the buffer
if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(fragment);
- packet_id = pheader->packet_id + 1;
+ packet_id = be64toh(pheader->packet_id) + 1;
}
// FIXME: Can we kill here the dataset finished during the previous step of iteration
@@ -235,8 +236,14 @@ ufo_roof_build_task_process (UfoTask *task,
// No new accepted events within timeout
if (((current_time.tv_sec - priv->last_fragment_timestamp.tv_sec) * 1000000 + (current_time.tv_nsec - priv->last_fragment_timestamp.tv_nsec) / 1000) > cfg->network_timeout) {
- priv->stop = TRUE;
- g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ ready = ufo_roof_buffer_skip_to_ready(buf);
+ if (ready) {
+ // FIXME: shall we really reset timer here?
+ clock_gettime(CLOCK_REALTIME, &priv->last_fragment_timestamp);
+ } else {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
}
}
@@ -306,13 +313,13 @@ ufo_roof_build_task_generate (UfoTask *task,
// FIXME: Or shall we start from counting from the ID of the first registerd dataset
if ((priv->number)&&(buf->current_id >= priv->number)) {
-// printf("%u datasets processed, stopping\n", buf->current_id);
+// printf("%lu datasets processed, stopping\n", buf->current_id);
priv->stop = TRUE;
g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
}
if (((priv->number > 0)&&(priv->number <= 100))||((buf->current_id - priv->announced) > 1000)) {
- printf("Generating dataset %i (%s), next: %u out of %u)\n", buf->current_id, ready?"yes":" no", buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
+ printf("Processing dataset %li (%s), next: %u out of %u\n", buf->current_id + (ready?0:1), (ready?"ready ":"timeout "), buf->n_fragments[buf->current_id%buf->ring_size], buf->fragments_per_dataset);
priv->announced = buf->current_id;
}
diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c
index 17f4b30..944ee31 100644
--- a/src/ufo-roof-config.c
+++ b/src/ufo-roof-config.c
@@ -94,6 +94,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->max_packets = 100;
cfg->dataset_size = 0;
cfg->buffer_size = 2;
+ cfg->latency_buffers = 0;
cfg->drop_buffers = 0;
@@ -193,6 +194,7 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once");
roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
roof_config_node_get(cfg->drop_buffers, performance, int, "drop_buffers");
+ roof_config_node_get(cfg->latency_buffers, performance, int, "latency_buffers");
}
@@ -237,7 +239,5 @@ UfoRoofConfig *ufo_roof_config_new(const char *config, UfoRoofConfigFlags flags,
cfg->drop_buffers = cfg->buffer_size / 2;
}
- printf("dataset size: %i\n", cfg->dataset_size);
-
return cfg;
}
diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h
index 34bef0b..8718381 100644
--- a/src/ufo-roof-config.h
+++ b/src/ufo-roof-config.h
@@ -37,6 +37,7 @@ typedef struct {
guint max_packet_size; // payload_size + header_size + ... (we don't care if tail is variable length provided that the complete packet does not exceed max_packet_size bytes)
guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected
guint drop_buffers; // If we are slow and lost some buffers, we may drop more than minimally necessary to catch up.
+ guint latency_buffers; // We skip incomplete buffers if later (at least latency_buffer in future) dataset is already ready, 0 - never skip
guint network_timeout; // Maximum time (us) to wait for data on the socket
diff --git a/src/ufo-roof-filter-task.c b/src/ufo-roof-filter-task.c
index 01bc742..3cee601 100644
--- a/src/ufo-roof-filter-task.c
+++ b/src/ufo-roof-filter-task.c
@@ -103,7 +103,7 @@ ufo_roof_filter_task_process (UfoTask *task,
priv = UFO_ROOF_FILTER_TASK_GET_PRIVATE (task);
if (priv->plane) {
- int buf_plane;
+ guint buf_plane;
GValue *value;
value = ufo_buffer_get_metadata(inputs[0], "plane");
diff --git a/tests/roof.yaml b/tests/roof.yaml
index 0a0ce1d..c9754ec 100644
--- a/tests/roof.yaml
+++ b/tests/roof.yaml
@@ -26,6 +26,7 @@ network:
performance:
buffer_size: 10
# drop_buffers: 0
+# latency_buffers: 0
packets_at_once: 100
data:
base_path: "/home/csa/roof2_data/test_data"