summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2019-11-17 09:16:57 +0100
committerSuren A. Chilingaryan <csa@suren.me>2019-11-17 09:16:57 +0100
commit3d93df54d024f49895db6277e873dccd10b5baec (patch)
treec664797c69e4b4680d04aee7669da03e452e0c5d /src
downloadufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.gz
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.bz2
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.xz
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.zip
The first test (file file-base simmulation)
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt82
-rw-r--r--src/DEVELOPMENT26
-rw-r--r--src/kernels/CMakeLists.txt8
-rw-r--r--src/kernels/meson.build7
-rw-r--r--src/meson.build45
-rw-r--r--src/ufo-roof-buffer.c122
-rw-r--r--src/ufo-roof-buffer.h31
-rw-r--r--src/ufo-roof-build-task.c325
-rw-r--r--src/ufo-roof-build-task.h53
-rw-r--r--src/ufo-roof-config.c172
-rw-r--r--src/ufo-roof-config.h37
-rw-r--r--src/ufo-roof-error.h58
-rw-r--r--src/ufo-roof-read-file.c84
-rw-r--r--src/ufo-roof-read-file.h8
-rw-r--r--src/ufo-roof-read-socket.c118
-rw-r--r--src/ufo-roof-read-socket.h8
-rw-r--r--src/ufo-roof-read-task.c289
-rw-r--r--src/ufo-roof-read-task.h53
-rw-r--r--src/ufo-roof-read.h17
-rw-r--r--src/ufo-roof.h20
20 files changed, 1563 insertions, 0 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..0913528
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,82 @@
+cmake_minimum_required(VERSION 2.6)
+
+#{{{ Sources
+set(ufofilter_SRCS
+ ufo-roof-read-task.c
+ ufo-roof-build-task.c
+ )
+
+set(common_SRCS
+ ufo-roof-config.c
+ )
+
+set(roof_read_aux_SRCS
+ ufo-roof-read-socket.c
+ ufo-roof-read-file.c
+ )
+
+set(roof_build_aux_SRCS
+ ufo-roof-buffer.c
+ )
+
+
+file(GLOB ufofilter_KERNELS "kernels/*.cl")
+#}}}
+#{{{ Variables
+set(ufofilter_LIBS
+ m
+ ${UFO_LIBRARIES}
+ ${OpenCL_LIBRARIES})
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations")
+
+add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES)
+#}}}
+#{{{ Dependency checks
+
+
+#}}}
+#{{{ Plugin targets
+include_directories(${CMAKE_CURRENT_BINARY_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${OpenCL_INCLUDE_DIRS}
+ ${UFO_INCLUDE_DIRS})
+
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in
+ ${CMAKE_CURRENT_BINARY_DIR}/config.h)
+
+
+foreach(_src ${ufofilter_SRCS})
+ # find plugin suffix
+ string(REGEX REPLACE "ufo-([^ \\.]+)-task.*" "\\1" task "${_src}")
+
+ # build string to get auxalleanous sources
+ string(REPLACE "-" "_" _aux ${task})
+ string(TOUPPER ${_aux} _aux_upper)
+
+ # create an option name and add this to disable filters
+ set(_aux_src "${_aux}_aux_SRCS")
+ set(_aux_libs "${_aux}_aux_LIBS")
+
+ string(REPLACE "-" "" _targetname ${task})
+ set(target "ufofilter${_targetname}")
+
+ # build single shared library per filter
+ if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+ add_library(${target} MODULE ${_src} ${common_SRCS} ${${_aux_src}})
+ else()
+ add_library(${target} SHARED ${_src} ${common_SRCS} ${${_aux_src}})
+ endif()
+
+ target_link_libraries(${target} ${ufofilter_LIBS} ${${_aux_libs}} ufoaux)
+
+ list(APPEND all_targets ${target})
+
+ install(TARGETS ${target}
+ ARCHIVE DESTINATION ${UFO_PLUGINDIR}
+ LIBRARY DESTINATION ${UFO_PLUGINDIR})
+endforeach()
+#}}}
+#{{{ Subdirectories
+add_subdirectory(kernels)
+#}}}
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
new file mode 100644
index 0000000..f3381fb
--- /dev/null
+++ b/src/DEVELOPMENT
@@ -0,0 +1,26 @@
+Architecture
+===========
+ - Current implementation follows UFO architecture: reader and dataset-builder are split in two filters.
+ * The reader is multi-threaded. However, only a single instance of the builder is possible to schedule.
+ This could limit maximum throughput on dual-head or even signle-head, but many-core systems.
+ * Another problem here is timing. All events in the builder are initiaded from the reader. Consequently,
+ as it seems we can't timeout on semi-complete dataset if no new data is arriving.
+ * Besides, performance this is also critical for stability. With continuous streaming there is no problem,
+ however, if a finite number of frames requested and some packets are lost, the software will wait forever
+ for missing bits.
+
+
+Questions
+=========
+ - Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order
+ packets and copy them later (or buffer everything for simplicity). We can avoid this data copy if we can get
+ at least one packet in advance.
+
+ - How I can execute 'generate' method on 'reductor' filter if no new data on the input for the specified
+ amount of time. One option is sending empty buffer with metadata indicating timeout. But this is again
+ hackish.
+
+ - Can we use 16-bit buffers? I can set dimmensions to 1/4 of the correct value to address this. But is it
+ possible to do in a clean way?
+
+ - What is 'ufotools' python package mentioned in documentation? Just a typo?
diff --git a/src/kernels/CMakeLists.txt b/src/kernels/CMakeLists.txt
new file mode 100644
index 0000000..e7df764
--- /dev/null
+++ b/src/kernels/CMakeLists.txt
@@ -0,0 +1,8 @@
+cmake_minimum_required(VERSION 2.6)
+
+# copy kernels
+file(GLOB ufofilter_KERNELS "*.cl")
+
+foreach(_kernel ${ufofilter_KERNELS})
+ install(FILES ${_kernel} DESTINATION ${UFO_KERNELDIR})
+endforeach()
diff --git a/src/kernels/meson.build b/src/kernels/meson.build
new file mode 100644
index 0000000..9803fb9
--- /dev/null
+++ b/src/kernels/meson.build
@@ -0,0 +1,7 @@
+kernel_files = [
+]
+
+install_data(kernel_files,
+ install_dir: kernel_install_dir,
+)
+
diff --git a/src/meson.build b/src/meson.build
new file mode 100644
index 0000000..8f3529e
--- /dev/null
+++ b/src/meson.build
@@ -0,0 +1,45 @@
+plugins = [
+ 'roof-read',
+ 'roof-build',
+]
+
+roof_common_src = [
+ 'ufo-roof-config.c',
+]
+
+roof_plugin_src = {
+ 'roof-read': [
+ 'ufo-roof-read-socket.c',
+ 'ufo-roof-read-file.c',
+ ],
+ 'roof-build': [
+ 'ufo-roof-buffer.c',
+ ],
+}
+
+# standard plugins
+
+foreach plugin: plugins
+ name = ''.join(plugin.split('-'))
+
+ sources = roof_common_src + [
+ 'ufo-@0@-task.c'.format(plugin),
+ ]
+
+ if plugin in roof_plugin_src
+ sources += roof_plugin_src[plugin]
+ endif
+
+ shared_module(name,
+ 'ufo-@0@-task.c'.format(plugin),
+ dependencies: deps,
+ name_prefix: 'libufofilter',
+ install: true,
+ install_dir: plugin_install_dir,
+ sources: sources
+ )
+endforeach
+
+
+
+subdir('kernels')
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
new file mode 100644
index 0000000..f071481
--- /dev/null
+++ b/src/ufo-roof-buffer.c
@@ -0,0 +1,122 @@
+#include <stdio.h>
+#include <stdint.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+
+// This is currently not thread safe. With dual-filter architecture this will be called sequentially.
+
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
+ UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
+ if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
+
+ buffer->ring_size = cfg->buffer_size;
+ buffer->fragment_size = cfg->payload_size;
+ buffer->dataset_size = cfg->dataset_size;
+ buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
+ buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;
+// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size);
+
+ buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size);
+ buffer->n_fragments = (_Atomic int*)calloc(buffer->ring_size, sizeof(_Atomic int));
+ buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
+
+ if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
+ ufo_roof_buffer_free(buffer);
+ roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
+ }
+
+ return buffer;
+}
+
+void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
+ if (buffer) {
+ if (buffer->ring_buffer)
+ free(buffer->ring_buffer);
+ if (buffer->n_fragments)
+ free(buffer->n_fragments);
+ if (buffer->stream_fragment)
+ free(buffer->stream_fragment);
+
+ free(buffer);
+ }
+}
+
+ // fragment_id is numbered from 1 (0 - means auto)
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+ guint buffer_id;
+ guint dataset_id;
+
+ if (!fragment_id) {
+ fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
+ dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
+ fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
+ buffer_id = dataset_id % buffer->ring_size;
+
+ // Late arrived packed
+// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
+ if (dataset_id < buffer->current_id)
+ roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+
+ // We are not fast enough, new packets are arrvining to fast
+ if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
+ // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
+
+ if (error)
+ root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed",
+ buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+
+ // FIXME: Send semi-complete buffers further?
+ // FIXME: Or shall we drop more if larger buffers are allocated?
+ for (int i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ buffer->current_id = dataset_id - buffer->ring_size + 1;
+
+ // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
+ // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
+ // The updates may happen after writting/reading is finished.
+ }
+
+ // FIXME: This is builds events as it read from file in roof v.1 code. We can assemble fan projections directly here.
+ void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+ void *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
+
+/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n",
+ buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size,
+ ((uint32_t*)fragment)[0]
+ );*/
+ memcpy(fragment_buffer, fragment, buffer->fragment_size);
+
+ // FIXME: Sanity checks: verify is not a dublicate fragment?
+ atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+
+ if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ if (dataset_id == buffer->current_id) {
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+
+
+gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) {
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+ void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;
+
+ memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
+ buffer->n_fragments[buffer_id] = 0;
+ buffer->current_id += 1;
+
+ return TRUE;
+}
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
new file mode 100644
index 0000000..bb71791
--- /dev/null
+++ b/src/ufo-roof-buffer.h
@@ -0,0 +1,31 @@
+#ifndef __UFO_ROOF_BUFFER_H
+#define __UFO_ROOF_BUUFER_H
+
+#include <stdatomic.h>
+
+struct _UfoRoofBuffer {
+ guint current_id; // The ID of the first (active) dataset in the buffer
+
+ guint ring_size; // Number of datasets to buffer
+ void *ring_buffer; // The ring buffer
+ _Atomic int *n_fragments; // Number of completed fragments in each buffer
+ guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams)
+// int *fragments; // Mark individual completed fragments (if we care for partial data)
+
+
+ guint dataset_size; // Size (in bytes) of a full dataset
+ guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment)
+
+ guint fragments_per_dataset; // Number of packets in dataset (used to compute when dataset is ready)
+ guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready)
+};
+
+typedef struct _UfoRoofBuffer UfoRoofBuffer;
+
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error);
+void ufo_roof_buffer_free(UfoRoofBuffer *buf);
+
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error);
+gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error);
+
+#endif
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
new file mode 100644
index 0000000..81c84ce
--- /dev/null
+++ b/src/ufo-roof-build-task.c
@@ -0,0 +1,325 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-build-task.h"
+
+
+struct _UfoRoofBuildTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+
+ guint number; // Number of datasets to read
+ gboolean stop; // Stop flag
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_STOP,
+ PROP_NUMBER,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_build_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL));
+}
+
+static void
+ufo_roof_build_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, &gerr);
+ if (!priv->cfg)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+
+ priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr);
+ if (!priv->buf)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+}
+
+static void
+ufo_roof_build_task_finalize (GObject *object)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->buf) {
+ ufo_roof_buffer_free(priv->buf);
+ priv->buf = NULL;
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+
+ G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
+}
+
+
+
+static void
+ufo_roof_build_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ guint bytes = priv->cfg->dataset_size;
+
+ // FIXME: Can this be made more elegant?
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+
+}
+
+static guint
+ufo_roof_build_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_roof_build_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 1;
+}
+
+static UfoTaskMode
+ufo_roof_build_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR;
+}
+
+static gboolean
+ufo_roof_build_task_process (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+ gboolean ready = FALSE;
+
+// UfoRequisition in_req;
+// ufo_buffer_get_requisition (inputs[0], &in_req);
+
+ void *data = ufo_buffer_get_host_array(inputs[0], NULL);
+ UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
+
+ if (priv->stop)
+ return FALSE;
+
+ for (int i = 0; i < header->n_packets; i++) {
+ int packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data);
+ packet_id = pheader->packet_id + 1;
+ }
+
+ ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ data += cfg->max_packet_size;
+ }
+
+ // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+
+// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets);
+
+ return !ready;
+}
+
+static gboolean
+ufo_roof_build_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ gboolean ready = FALSE;
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+// UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+
+ if (priv->stop)
+ return FALSE;
+
+ ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if ((priv->number)&&(buf->current_id >= priv->number)) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+
+// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id);
+
+ return ready;
+}
+
+static void
+ufo_roof_build_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ case PROP_NUMBER:
+ priv->number = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_build_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ case PROP_NUMBER:
+ g_value_set_uint (value, priv->number);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_build_task_setup;
+ iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_build_task_get_mode;
+ iface->get_requisition = ufo_roof_build_task_get_requisition;
+ iface->process = ufo_roof_build_task_process;
+ iface->generate = ufo_roof_build_task_generate;
+}
+
+static void
+ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_build_task_set_property;
+ oclass->get_property = ufo_roof_build_task_get_property;
+ oclass->finalize = ufo_roof_build_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_NUMBER] =
+ g_param_spec_uint("number",
+ "Number of datasets to receive",
+ "Number of datasets to receive",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+}
+
+static void
+ufo_roof_build_task_init(UfoRoofBuildTask *self)
+{
+ self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
+}
diff --git a/src/ufo-roof-build-task.h b/src/ufo-roof-build-task.h
new file mode 100644
index 0000000..cefb7ab
--- /dev/null
+++ b/src/ufo-roof-build-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ROOF_BUILD_TASK_H
+#define __UFO_ROOF_BUILD_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ROOF_BUILD_TASK (ufo_roof_build_task_get_type())
+#define UFO_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTask))
+#define UFO_IS_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_BUILD_TASK))
+#define UFO_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass))
+#define UFO_IS_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_BUILD_TASK))
+#define UFO_ROOF_BUILD_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass))
+
+typedef struct _UfoRoofBuildTask UfoRoofBuildTask;
+typedef struct _UfoRoofBuildTaskClass UfoRoofBuildTaskClass;
+typedef struct _UfoRoofBuildTaskPrivate UfoRoofBuildTaskPrivate;
+
+struct _UfoRoofBuildTask {
+ UfoTaskNode parent_instance;
+
+ UfoRoofBuildTaskPrivate *priv;
+};
+
+struct _UfoRoofBuildTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_roof_build_task_new (void);
+GType ufo_roof_build_task_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c
new file mode 100644
index 0000000..11f8bd4
--- /dev/null
+++ b/src/ufo-roof-config.c
@@ -0,0 +1,172 @@
+#include <stdio.h>
+#include <stdint.h>
+
+#include <json-glib/json-glib.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-error.h"
+#include "ufo-roof-config.h"
+
+#define roof_config_node_get_with_default(var, parent, type, name, default) do { \
+ JsonNode *node = json_object_get_member(parent, name); \
+ if (node) var = json_node_get_##type(node); \
+ else var = default; \
+ } while(0)
+
+#define roof_config_node_get_string_with_default(var, parent, name, default) do { \
+ const gchar *str; \
+ JsonNode *node = json_object_get_member(parent, name); \
+ if (node) str = json_node_get_string(node); \
+ else str = default; \
+ if (var != str) { \
+ if (var) g_free(var); \
+ var = g_strdup(str); \
+ } \
+ } while(0)
+
+#define roof_config_node_get(var, parent, type, name) \
+ roof_config_node_get_with_default(var, parent, type, name, var)
+
+#define roof_config_node_get_string(var, parent, name) \
+ roof_config_node_get_string_with_default(var, parent, name, var)
+
+
+typedef struct {
+ UfoRoofConfig cfg;
+
+ JsonParser *parser;
+} UfoRoofConfigPrivate;
+
+void ufo_roof_config_free(UfoRoofConfig *cfg) {
+ if (cfg) {
+ UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg;
+
+ if (cfg->path)
+ g_free(cfg->path);
+
+ if (priv->parser)
+ g_object_unref (priv->parser);
+
+ free(cfg);
+ }
+}
+
+UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
+ UfoRoofConfigPrivate *priv;
+ UfoRoofConfig *cfg;
+
+// JsonNode *node;
+ JsonObject *root = NULL;
+ JsonObject *hardware = NULL;
+ JsonObject *network = NULL;
+ JsonObject *performance = NULL;
+ JsonObject *simulation = NULL;
+
+ GError *gerr = NULL;
+
+ priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate));
+ if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig");
+
+ memset(priv, 0, sizeof(UfoRoofConfigPrivate));
+
+ // Set defaults
+ cfg = &priv->cfg;
+
+ cfg->port = 4000;
+ cfg->n_streams = 1;
+ cfg->protocol = "udp";
+ cfg->network_timeout = 10000000;
+ cfg->header_size = 0;
+ cfg->payload_size = 0;
+ cfg->max_packet_size = 0;
+ cfg->max_packets = 100;
+ cfg->dataset_size = 0;
+ cfg->buffer_size = 2;
+ cfg->path = NULL;
+
+ // Read configuration
+ priv->parser = json_parser_new_immutable ();
+ json_parser_load_from_file (priv->parser, config, &gerr);
+
+ if (gerr != NULL) {
+ g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config);
+ ufo_roof_config_free(cfg);
+ return NULL;
+ }
+
+ root = json_node_get_object (json_parser_get_root (priv->parser));
+
+ if (root) {
+ roof_config_node_get(hardware, root, object, "hardware");
+ roof_config_node_get(network, root, object, "network");
+ roof_config_node_get(simulation, root, object, "simulation");
+ }
+
+ if (hardware) {
+ // FIXME: Compute dataset size based on roof hardware
+ }
+
+ if (network) {
+// int max_packet_size = 0;
+
+ roof_config_node_get(cfg->port, network, int, "port");
+ roof_config_node_get(cfg->n_streams, network, int, "streams");
+
+ roof_config_node_get(cfg->max_packet_size, network, int, "max_packet_size");
+ // FIXME: compute payload_size based on sample_size
+ roof_config_node_get(cfg->payload_size, network, int, "payload_size");
+ roof_config_node_get(cfg->header_size, network, int, "header_size");
+ roof_config_node_get(cfg->dataset_size, network, int, "dataset_size");
+ }
+
+ if (performance) {
+ roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once");
+ roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
+ }
+
+ if (simulation) {
+ roof_config_node_get_string(cfg->path, simulation, "path");
+ roof_config_node_get(cfg->first_file_number, simulation, int, "first_file_number");
+ }
+
+ // Check configuration consistency
+ if (!cfg->payload_size) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "Packet size is not set");
+ }
+
+ if ((!cfg->header_size)&&(!cfg->path)) {
+ if (!strncmp(cfg->protocol, "udp", 3)) {
+ // Error if 0 implicitely set, use default value otherwise
+ if ((network)&&(json_object_get_member(network, "header_size"))) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "The header with packet ids is required for un-ordered protocols");
+ } else {
+ cfg->header_size = sizeof(uint32_t);
+ }
+ }
+ }
+
+ guint fragments_per_dataset = cfg->dataset_size / cfg->payload_size;
+ guint fragments_per_stream = fragments_per_dataset / cfg->n_streams;
+
+ if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams);
+ }
+
+ if (cfg->buffer_size * fragments_per_stream < cfg->max_packets) {
+ cfg->max_packets = cfg->buffer_size * fragments_per_stream / 2;
+ }
+
+ // Finalize configuration
+ if (!cfg->max_packet_size)
+ cfg->max_packet_size = cfg->header_size + cfg->payload_size;
+
+ if (!cfg->dataset_size)
+ cfg->dataset_size = cfg->payload_size;
+
+
+ return cfg;
+}
diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h
new file mode 100644
index 0000000..a22c84f
--- /dev/null
+++ b/src/ufo-roof-config.h
@@ -0,0 +1,37 @@
+#ifndef __UFO_ROOF_CONFIG_H
+#define __UFO_ROOF_CONFIG_H
+
+#include <glib.h>
+
+typedef struct {
+ gchar *path; // Location of data files for simmulation purposes (i.e. reading a sequence of files instead listening on the corresponding ports)
+ guint first_file_number; // Indicates if the numbering of files starts at 0 or 1
+ gchar *protocol; // Protocols: tcp, udp, tcp6, udp6, ...
+ guint port; // First port
+ guint n_streams; // Number of independent data streams (expected on sequential ports)
+ guint header_size; // Expected size of the packet header, for dgram protocols we need at least 32-bit sequence number. Defaults to uint32_t for udp* and 0 - otherwise
+ guint payload_size; // Expected size of TCP/UDP packet (without header)
+ guint dataset_size; // Size of a single dataset (image, sinogram, etc.). This is real size in bytes, excluding all technical headers used in communication protocol.
+
+//?
+
+
+/*
+ guint pixels_per_module;
+ guint planes_per_module;
+ guint samples_per_dataset;
+*/
+
+ guint max_packets; // limits maximum number of packets which are read at once
+ guint max_packet_size; // payload_size + header_size + ...?
+ guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected
+ guint network_timeout; // Maximum time (us) to wait for data on the socket
+
+
+
+} UfoRoofConfig;
+
+UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error);
+void ufo_roof_config_free(UfoRoofConfig *cfg);
+
+#endif /* __UFO_ROOF_CONFIG_H */
diff --git a/src/ufo-roof-error.h b/src/ufo-roof-error.h
new file mode 100644
index 0000000..ed0ae2b
--- /dev/null
+++ b/src/ufo-roof-error.h
@@ -0,0 +1,58 @@
+#ifndef __UFO_ROOF_ERROR_H
+#define __UFO_ROOF_ERROR_H
+
+#include <ufo/ufo.h>
+
+
+#define roof_print_error(error) do { \
+ g_warning("%s", error->message); \
+ g_error_free(error); \
+ error = NULL; \
+ } while (0)
+
+#define roof_set_error(error, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ } while (0)
+
+#define roof_error(error, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ return; \
+ } while (0)
+
+#define roof_propagate_error(error, err, msg...) do { \
+ g_propagate_prefixed_error(error, err, msg); \
+ return; \
+ } while (0)
+
+#define roof_error_with_retval(error, retval, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ return retval; \
+ } while (0)
+
+#define roof_propagate_error_with_retval(error, retval, err, msg...) do { \
+ g_propagate_prefixed_error(error, err, msg); \
+ return retval; \
+ } while (0)
+
+
+#define roof_setup_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+#define roof_new_error(error, msg...) \
+ roof_error_with_retval(error, NULL, SETUP, msg)
+
+
+#define roof_network_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+#define root_set_network_error(error, msg...) \
+ roof_set_error(error, SETUP, msg)
+
+#define roof_network_error_with_retval(error, retval, msg...) \
+ roof_error_with_retval(error, retval, SETUP, msg)
+
+#define roof_memory_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+
+#endif /* __UFO_ROOF_ERROR_H */
diff --git a/src/ufo-roof-read-file.c b/src/ufo-roof-read-file.c
new file mode 100644
index 0000000..de8391e
--- /dev/null
+++ b/src/ufo-roof-read-file.c
@@ -0,0 +1,84 @@
+#include <stdio.h>
+#include <errno.h>
+#include <stdint.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-file.h"
+
+typedef struct {
+ UfoRoofReadInterface iface;
+
+ UfoRoofConfig *cfg;
+
+ gchar *fname;
+ FILE *fd;
+} UfoRoofReadFile;
+
+static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
+
+ if (reader) {
+ if (reader->fname)
+ g_free(reader->fname);
+
+ if (reader->fd >= 0)
+ fclose(reader->fd);
+
+ free(reader);
+ }
+}
+
+static guint ufo_roof_read_file(UfoRoofReadInterface *iface, void *buffers, GError **error) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
+ UfoRoofConfig *cfg = reader->cfg;
+
+ size_t bytes = 0;
+ size_t packet_size = cfg->header_size + cfg->payload_size;
+ size_t expected = cfg->max_packets * packet_size;
+
+ while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) {
+ size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd);
+ bytes += ret;
+ }
+
+ guint packets = bytes / packet_size;
+
+ if (ferror(reader->fd)) {
+ roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd));
+ } else if ((feof(reader->fd))&&(bytes % packet_size)) {
+ roof_network_error_with_retval(error, packets, "extra data in the end of input");
+ }
+
+ return packets;
+}
+
+
+UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile));
+ if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile");
+
+ // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now.
+ if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size)
+ roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
+
+ reader->cfg = cfg;
+ reader->iface.close = ufo_roof_read_file_free;
+ reader->iface.read =ufo_roof_read_file;
+
+ reader->fname = g_strdup_printf(cfg->path, id + cfg->first_file_number);
+ if (!reader->fname) {
+ free(reader);
+ roof_new_error(error, "Can't build file name");
+ }
+
+ reader->fd = fopen(reader->fname, "rb");
+ if (!reader->fd) {
+ g_free(reader->fname);
+ g_free(reader);
+ roof_new_error(error, "Can't open file %s", reader->fname);
+ }
+
+ return (UfoRoofReadInterface*)reader;
+}
diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h
new file mode 100644
index 0000000..54bcf49
--- /dev/null
+++ b/src/ufo-roof-read-file.h
@@ -0,0 +1,8 @@
+#ifndef __UFO_ROOF_READ_FILE_H
+#define __UFO_ROOF_READ_FILE_H
+
+#include "ufo-roof-read.h"
+
+UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
new file mode 100644
index 0000000..b72e9d0
--- /dev/null
+++ b/src/ufo-roof-read-socket.c
@@ -0,0 +1,118 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-socket.h"
+
+typedef struct {
+ UfoRoofReadInterface iface;
+
+ UfoRoofConfig *cfg;
+ int socket;
+} UfoRoofReadSocket;
+
+static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
+
+ if (reader) {
+ if (reader->socket >= 0)
+ close(reader->socket);
+ free(reader);
+ }
+}
+
+static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, void *buf, GError **error) {
+ struct timespec timeout_ts;
+
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
+ UfoRoofConfig *cfg = reader->cfg;
+
+ struct mmsghdr msg[cfg->max_packets];
+ struct iovec msgvec[cfg->max_packets];
+
+ timeout_ts.tv_sec = cfg->network_timeout / 1000000;
+ timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);
+
+ // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < cfg->max_packets; i++) {
+ msgvec[i].iov_base = buf + i * cfg->max_packet_size;
+ msgvec[i].iov_len = cfg->max_packet_size;
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
+
+ // FIXME: Shall we verify packets consistency here? We can check at least the sizes...
+
+ return (guint)packets;
+}
+
+
+UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
+ int err;
+ int port = cfg->port + id;
+ char port_str[16];
+ const char *addr_str = "0.0.0.0";
+ struct addrinfo sockaddr_hints;
+ struct addrinfo *sockaddr_info;
+
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket));
+ if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket");
+
+ reader->cfg = cfg;
+ reader->iface.close = ufo_roof_read_socket_free;
+ reader->iface.read =ufo_roof_read_socket;
+
+ snprintf(port_str, sizeof(port_str), "%d", port);
+ port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';
+
+ memset(&sockaddr_hints, 0, sizeof(sockaddr_hints));
+ if (!strncmp(cfg->protocol, "udp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_DGRAM;
+ sockaddr_hints.ai_protocol = IPPROTO_UDP;
+ } else if (!strncmp(cfg->protocol, "tcp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_STREAM;
+ sockaddr_hints.ai_protocol = IPPROTO_TCP;
+ } else {
+ roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol);
+ }
+
+ err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+ if (err || !sockaddr_info) {
+ free(reader);
+ roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str);
+ }
+
+ reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol);
+ if(reader->socket == -1) {
+ freeaddrinfo(sockaddr_info);
+ free(reader);
+ roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str);
+ }
+
+ err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+ if(err != 0) {
+ freeaddrinfo(sockaddr_info);
+ close(reader->socket);
+ free(reader);
+ roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
+ }
+
+ freeaddrinfo(sockaddr_info);
+
+ return (UfoRoofReadInterface*)reader;
+}
diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h
new file mode 100644
index 0000000..74b0742
--- /dev/null
+++ b/src/ufo-roof-read-socket.h
@@ -0,0 +1,8 @@
+#ifndef __UFO_ROOF_READ_SOCKET_H
+#define __UFO_ROOF_READ_SOCKET_H
+
+#include "ufo-roof-read.h"
+
+UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
new file mode 100644
index 0000000..ebff9de
--- /dev/null
+++ b/src/ufo-roof-read-task.c
@@ -0,0 +1,289 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-socket.h"
+#include "ufo-roof-read-file.h"
+#include "ufo-roof-read-task.h"
+
+struct _UfoRoofReadTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofReadInterface *reader;
+
+ guint id; // Reader ID (defince sequential port number)
+ gboolean stop; // Flag requiring termination
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofReadTask, ufo_roof_read_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_READ_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_ID,
+ PROP_STOP,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_read_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_READ_TASK, NULL));
+}
+
+static void
+ufo_roof_read_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ GError *gerr = NULL;
+
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, &gerr);
+ if (!priv->cfg) roof_propagate_error(error, gerr, "roof_config_new: ");
+
+ // Consistency checks
+ if (priv->id >= priv->cfg->n_streams)
+ roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams);
+
+ // Start actual reader
+ if (priv->cfg->path)
+ priv->reader = ufo_roof_read_file_new(priv->cfg, priv->id, &gerr);
+ else
+ priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr);
+
+ if (!priv->reader)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+
+}
+
+
+static void
+ufo_roof_read_task_finalize (GObject *object)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ if (priv->reader) {
+ priv->reader->close(priv->reader);
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+ G_OBJECT_CLASS (ufo_roof_read_task_parent_class)->finalize (object);
+}
+
+static void
+ufo_roof_read_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+
+ guint bytes = cfg->max_packets * cfg->max_packet_size + sizeof(UfoRoofPacketBlockHeader);
+
+ // FIXME: Can this be made more elegant?
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+}
+
+static guint
+ufo_roof_read_task_get_num_inputs (UfoTask *task)
+{
+ return 0;
+}
+
+static guint
+ufo_roof_read_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 0;
+}
+
+static UfoTaskMode
+ufo_roof_read_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_roof_read_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ GError *gerr = NULL;
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+ UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg);
+
+ if (priv->stop)
+ return FALSE;
+
+ guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
+ if (gerr) {
+ g_warning("Error reciving data: %s", gerr->message);
+ return FALSE;
+ }
+
+ // FIXME: End of data (shall we restart in the network case?)
+ if (!packets)
+ return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+ header->n_packets = packets;
+
+ return TRUE;
+}
+
+static void
+ufo_roof_read_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_ID:
+ priv->id = g_value_get_uint (value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_read_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_ID:
+ g_value_set_uint (value, priv->id);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_read_task_setup;
+ iface->get_num_inputs = ufo_roof_read_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_read_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_read_task_get_mode;
+ iface->get_requisition = ufo_roof_read_task_get_requisition;
+ iface->generate = ufo_roof_read_task_generate;
+}
+
+static void
+ufo_roof_read_task_class_init (UfoRoofReadTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_read_task_set_property;
+ oclass->get_property = ufo_roof_read_task_get_property;
+ oclass->finalize = ufo_roof_read_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_ID] =
+ g_param_spec_uint ("id",
+ "Reader ID",
+ "ID for multi-port servers",
+ 0, G_MAXUINT, 1,
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofReadTaskPrivate));
+}
+
+static void
+ufo_roof_read_task_init(UfoRoofReadTask *self)
+{
+ self->priv = UFO_ROOF_READ_TASK_GET_PRIVATE(self);
+}
diff --git a/src/ufo-roof-read-task.h b/src/ufo-roof-read-task.h
new file mode 100644
index 0000000..2fe45c4
--- /dev/null
+++ b/src/ufo-roof-read-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ROOF_READ_TASK_H
+#define __UFO_ROOF_READ_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ROOF_READ_TASK (ufo_roof_read_task_get_type())
+#define UFO_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTask))
+#define UFO_IS_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_READ_TASK))
+#define UFO_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass))
+#define UFO_IS_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_READ_TASK))
+#define UFO_ROOF_READ_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass))
+
+typedef struct _UfoRoofReadTask UfoRoofReadTask;
+typedef struct _UfoRoofReadTaskClass UfoRoofReadTaskClass;
+typedef struct _UfoRoofReadTaskPrivate UfoRoofReadTaskPrivate;
+
+struct _UfoRoofReadTask {
+ UfoTaskNode parent_instance;
+
+ UfoRoofReadTaskPrivate *priv;
+};
+
+struct _UfoRoofReadTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_roof_read_task_new (void);
+GType ufo_roof_read_task_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h
new file mode 100644
index 0000000..50dbdf3
--- /dev/null
+++ b/src/ufo-roof-read.h
@@ -0,0 +1,17 @@
+#ifndef __UFO_ROOF_READ_H
+#define __UFO_ROOF_READ_H
+
+#include "ufo-roof-config.h"
+
+typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
+
+typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, void *buf, GError **error);
+typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
+
+struct _UfoRoofReadInterface {
+ UfoRoofReaderRead read;
+ UfoRoofReaderClose close;
+};
+
+
+#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof.h b/src/ufo-roof.h
new file mode 100644
index 0000000..c3edda2
--- /dev/null
+++ b/src/ufo-roof.h
@@ -0,0 +1,20 @@
+#ifndef __UFO_ROOF_H
+#define __UFO_ROOF_H
+
+#include "ufo-roof-config.h"
+#include "ufo-roof-error.h"
+
+#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf))
+#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((void*)buf) + cfg->max_packets * cfg->max_packet_size))
+
+typedef struct {
+ uint32_t packet_id; // Sequential Packet ID (numbered from 0)
+} UfoRoofPacketHeader;
+
+typedef struct {
+ uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0)
+ uint32_t n_packets; // Number of packets
+} UfoRoofPacketBlockHeader;
+
+
+#endif /* __UFO_ROOF_H */