summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSuren A. Chilingaryan <csa@suren.me>2019-11-17 09:16:57 +0100
committerSuren A. Chilingaryan <csa@suren.me>2019-11-17 09:16:57 +0100
commit3d93df54d024f49895db6277e873dccd10b5baec (patch)
treec664797c69e4b4680d04aee7669da03e452e0c5d
downloadufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.gz
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.bz2
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.tar.xz
ufo-roof-3d93df54d024f49895db6277e873dccd10b5baec.zip
The first test (file file-base simmulation)
-rw-r--r--CMakeLists.txt56
-rw-r--r--build.sh1
-rw-r--r--meson.build52
-rw-r--r--src/CMakeLists.txt82
-rw-r--r--src/DEVELOPMENT26
-rw-r--r--src/kernels/CMakeLists.txt8
-rw-r--r--src/kernels/meson.build7
-rw-r--r--src/meson.build45
-rw-r--r--src/ufo-roof-buffer.c122
-rw-r--r--src/ufo-roof-buffer.h31
-rw-r--r--src/ufo-roof-build-task.c325
-rw-r--r--src/ufo-roof-build-task.h53
-rw-r--r--src/ufo-roof-config.c172
-rw-r--r--src/ufo-roof-config.h37
-rw-r--r--src/ufo-roof-error.h58
-rw-r--r--src/ufo-roof-read-file.c84
-rw-r--r--src/ufo-roof-read-file.h8
-rw-r--r--src/ufo-roof-read-socket.c118
-rw-r--r--src/ufo-roof-read-socket.h8
-rw-r--r--src/ufo-roof-read-task.c289
-rw-r--r--src/ufo-roof-read-task.h53
-rw-r--r--src/ufo-roof-read.h17
-rw-r--r--src/ufo-roof.h20
-rw-r--r--tests/roof.json31
-rw-r--r--tests/roof.py46
-rwxr-xr-xtests/roof.sh3
-rw-r--r--tests/roof.yaml26
-rw-r--r--tests/test_file.sh11
28 files changed, 1789 insertions, 0 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..37c38fc
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,56 @@
+cmake_minimum_required(VERSION 2.8)
+
+project(ufo C CXX)
+
+set(TARNAME "ufo-roof")
+
+set(UFO_ROOF_VERSION_MAJOR "0")
+set(UFO_ROOF_VERSION_MINOR "0")
+set(UFO_ROOF_VERSION_PATCH "1")
+set(UFO_ROOF_VERSION_STRING_LONG "${UFO_ROOF_VERSION_MAJOR}.${UFO_ROOF_VERSION_MINOR}.${UFO_ROOF_VERSION_PATCH}")
+set(UFO_ROOF_VERSION_STRING_SHORT "${UFO_ROOF_VERSION_MAJOR}.${UFO_ROOF_VERSION_MINOR}")
+
+set(UFO_DESCRIPTION "UFO roof filters")
+set(UFO_DESCRIPTION_SUMMARY "UFO roof filters")
+
+list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/common/cmake")
+
+if (APPLE)
+ set(CMAKE_MACOSX_RPATH "ON")
+ set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib")
+endif()
+
+include(GNUInstallDirs)
+include(PkgConfigVars)
+
+set(PKG_UFO_CORE_MIN_REQUIRED "0.16")
+
+
+option(WITH_PROFILING "Enable profiling" OFF)
+
+if (WITH_PROFILING)
+ add_definitions("-pg")
+ set(CMAKE_C_FLAGS "-pg")
+endif ()
+
+find_package(OpenCL REQUIRED)
+find_package(PkgConfig REQUIRED)
+
+pkg_check_modules(UFO ufo>=${PKG_UFO_CORE_MIN_REQUIRED} REQUIRED)
+
+pkg_check_variable(ufo plugindir)
+pkg_check_variable(ufo kerneldir)
+
+link_directories(${UFO_LIBRARY_DIRS})
+add_definitions("-Wall -Wextra -fPIC")
+add_definitions(-DG_LOG_DOMAIN="Ufo")
+
+if (CMAKE_COMPILER_IS_GNUCC OR ("${CMAKE_C_COMPILER_ID}" STREQUAL "Clang"))
+ add_definitions("-Wno-unused-parameter")
+endif ()
+enable_testing()
+
+#add_subdirectory(docs)
+#add_subdirectory(deps)
+add_subdirectory(src)
+#add_subdirectory(tests)
diff --git a/build.sh b/build.sh
new file mode 100644
index 0000000..050ef22
--- /dev/null
+++ b/build.sh
@@ -0,0 +1 @@
+PKG_CONFIG_PATH="/usr/local/lib64/pkgconfig/" meson build
diff --git a/meson.build b/meson.build
new file mode 100644
index 0000000..b2b5f3a
--- /dev/null
+++ b/meson.build
@@ -0,0 +1,52 @@
+project('ufo-roof',
+ ['c', 'cpp'],
+ version: '0.0.1'
+)
+
+version = meson.project_version()
+components = version.split('.')
+version_major = components[0]
+version_minor = components[1]
+version_patch = components[2]
+
+cc = meson.get_compiler('c')
+
+add_global_arguments(
+ '-DGLIB_DISABLE_DEPRECATION_WARNINGS',
+ '-DCL_USE_DEPRECATED_OPENCL_1_1_APIS',
+ '-DCL_USE_DEPRECATED_OPENCL_1_2_APIS',
+ language: 'c'
+)
+
+if cc.get_id() == 'gcc'
+ add_global_arguments(
+ '-Wno-unused-parameter',
+ '-fopenmp',
+ language: ['c', 'cpp'])
+ add_global_link_arguments('-fopenmp', language: 'c')
+endif
+
+opencl_dep = declare_dependency(dependencies: cc.find_library('OpenCL'))
+ufo_dep = dependency('ufo', version: '>= 0.16')
+
+m_dep = declare_dependency(
+ dependencies: cc.find_library('m')
+)
+
+plugin_install_dir = ufo_dep.get_pkgconfig_variable('plugindir')
+kernel_install_dir = ufo_dep.get_pkgconfig_variable('kerneldir')
+
+prefixdir = get_option('prefix')
+datadir = join_paths(prefixdir, get_option('datadir'))
+docdir = join_paths(datadir, 'doc', 'ufo-filters')
+
+deps = [
+ ufo_dep,
+ opencl_dep,
+ m_dep,
+]
+
+#subdir('deps')
+#subdir('docs')
+subdir('src')
+#subdir('tests')
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
new file mode 100644
index 0000000..0913528
--- /dev/null
+++ b/src/CMakeLists.txt
@@ -0,0 +1,82 @@
+cmake_minimum_required(VERSION 2.6)
+
+#{{{ Sources
+set(ufofilter_SRCS
+ ufo-roof-read-task.c
+ ufo-roof-build-task.c
+ )
+
+set(common_SRCS
+ ufo-roof-config.c
+ )
+
+set(roof_read_aux_SRCS
+ ufo-roof-read-socket.c
+ ufo-roof-read-file.c
+ )
+
+set(roof_build_aux_SRCS
+ ufo-roof-buffer.c
+ )
+
+
+file(GLOB ufofilter_KERNELS "kernels/*.cl")
+#}}}
+#{{{ Variables
+set(ufofilter_LIBS
+ m
+ ${UFO_LIBRARIES}
+ ${OpenCL_LIBRARIES})
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=gnu99 -pedantic -Wall -Wextra -fPIC -Wno-unused-parameter -Wno-deprecated-declarations")
+
+add_definitions(-D_FILE_OFFSET_BITS=64 -D_LARGE_FILES)
+#}}}
+#{{{ Dependency checks
+
+
+#}}}
+#{{{ Plugin targets
+include_directories(${CMAKE_CURRENT_BINARY_DIR}
+ ${CMAKE_CURRENT_SOURCE_DIR}
+ ${OpenCL_INCLUDE_DIRS}
+ ${UFO_INCLUDE_DIRS})
+
+configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config.h.in
+ ${CMAKE_CURRENT_BINARY_DIR}/config.h)
+
+
+foreach(_src ${ufofilter_SRCS})
+ # find plugin suffix
+ string(REGEX REPLACE "ufo-([^ \\.]+)-task.*" "\\1" task "${_src}")
+
+ # build string to get auxalleanous sources
+ string(REPLACE "-" "_" _aux ${task})
+ string(TOUPPER ${_aux} _aux_upper)
+
+ # create an option name and add this to disable filters
+ set(_aux_src "${_aux}_aux_SRCS")
+ set(_aux_libs "${_aux}_aux_LIBS")
+
+ string(REPLACE "-" "" _targetname ${task})
+ set(target "ufofilter${_targetname}")
+
+ # build single shared library per filter
+ if (${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
+ add_library(${target} MODULE ${_src} ${common_SRCS} ${${_aux_src}})
+ else()
+ add_library(${target} SHARED ${_src} ${common_SRCS} ${${_aux_src}})
+ endif()
+
+ target_link_libraries(${target} ${ufofilter_LIBS} ${${_aux_libs}} ufoaux)
+
+ list(APPEND all_targets ${target})
+
+ install(TARGETS ${target}
+ ARCHIVE DESTINATION ${UFO_PLUGINDIR}
+ LIBRARY DESTINATION ${UFO_PLUGINDIR})
+endforeach()
+#}}}
+#{{{ Subdirectories
+add_subdirectory(kernels)
+#}}}
diff --git a/src/DEVELOPMENT b/src/DEVELOPMENT
new file mode 100644
index 0000000..f3381fb
--- /dev/null
+++ b/src/DEVELOPMENT
@@ -0,0 +1,26 @@
+Architecture
+===========
+ - Current implementation follows UFO architecture: reader and dataset-builder are split in two filters.
+ * The reader is multi-threaded. However, only a single instance of the builder is possible to schedule.
+ This could limit maximum throughput on dual-head or even signle-head, but many-core systems.
+ * Another problem here is timing. All events in the builder are initiaded from the reader. Consequently,
+ as it seems we can't timeout on semi-complete dataset if no new data is arriving.
+ * Besides, performance this is also critical for stability. With continuous streaming there is no problem,
+ however, if a finite number of frames requested and some packets are lost, the software will wait forever
+ for missing bits.
+
+
+Questions
+=========
+ - Can we pre-allocate several UFO buffers for forth-comming events. Currently, we need to buffer out-of-order
+ packets and copy them later (or buffer everything for simplicity). We can avoid this data copy if we can get
+ at least one packet in advance.
+
+ - How I can execute 'generate' method on 'reductor' filter if no new data on the input for the specified
+ amount of time. One option is sending empty buffer with metadata indicating timeout. But this is again
+ hackish.
+
+ - Can we use 16-bit buffers? I can set dimmensions to 1/4 of the correct value to address this. But is it
+ possible to do in a clean way?
+
+ - What is 'ufotools' python package mentioned in documentation? Just a typo?
diff --git a/src/kernels/CMakeLists.txt b/src/kernels/CMakeLists.txt
new file mode 100644
index 0000000..e7df764
--- /dev/null
+++ b/src/kernels/CMakeLists.txt
@@ -0,0 +1,8 @@
+cmake_minimum_required(VERSION 2.6)
+
+# copy kernels
+file(GLOB ufofilter_KERNELS "*.cl")
+
+foreach(_kernel ${ufofilter_KERNELS})
+ install(FILES ${_kernel} DESTINATION ${UFO_KERNELDIR})
+endforeach()
diff --git a/src/kernels/meson.build b/src/kernels/meson.build
new file mode 100644
index 0000000..9803fb9
--- /dev/null
+++ b/src/kernels/meson.build
@@ -0,0 +1,7 @@
+kernel_files = [
+]
+
+install_data(kernel_files,
+ install_dir: kernel_install_dir,
+)
+
diff --git a/src/meson.build b/src/meson.build
new file mode 100644
index 0000000..8f3529e
--- /dev/null
+++ b/src/meson.build
@@ -0,0 +1,45 @@
+plugins = [
+ 'roof-read',
+ 'roof-build',
+]
+
+roof_common_src = [
+ 'ufo-roof-config.c',
+]
+
+roof_plugin_src = {
+ 'roof-read': [
+ 'ufo-roof-read-socket.c',
+ 'ufo-roof-read-file.c',
+ ],
+ 'roof-build': [
+ 'ufo-roof-buffer.c',
+ ],
+}
+
+# standard plugins
+
+foreach plugin: plugins
+ name = ''.join(plugin.split('-'))
+
+ sources = roof_common_src + [
+ 'ufo-@0@-task.c'.format(plugin),
+ ]
+
+ if plugin in roof_plugin_src
+ sources += roof_plugin_src[plugin]
+ endif
+
+ shared_module(name,
+ 'ufo-@0@-task.c'.format(plugin),
+ dependencies: deps,
+ name_prefix: 'libufofilter',
+ install: true,
+ install_dir: plugin_install_dir,
+ sources: sources
+ )
+endforeach
+
+
+
+subdir('kernels')
diff --git a/src/ufo-roof-buffer.c b/src/ufo-roof-buffer.c
new file mode 100644
index 0000000..f071481
--- /dev/null
+++ b/src/ufo-roof-buffer.c
@@ -0,0 +1,122 @@
+#include <stdio.h>
+#include <stdint.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+
+// This is currently not thread safe. With dual-filter architecture this will be called sequentially.
+
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error) {
+ UfoRoofBuffer *buffer = (UfoRoofBuffer*)calloc(1, sizeof(UfoRoofBuffer));
+ if (!buffer) roof_new_error(error, "Can't allocate UfoRoofBuffer");
+
+ buffer->ring_size = cfg->buffer_size;
+ buffer->fragment_size = cfg->payload_size;
+ buffer->dataset_size = cfg->dataset_size;
+ buffer->fragments_per_dataset = buffer->dataset_size / buffer->fragment_size;
+ buffer->fragments_per_stream = buffer->fragments_per_dataset / cfg->n_streams;
+// printf("Configuration: dataset: %u - %u fragments (%u streams x %u) x %u bytes\n", buffer->dataset_size, buffer->fragments_per_dataset, cfg->n_streams, buffer->fragments_per_stream, buffer->fragment_size);
+
+ buffer->ring_buffer = malloc(buffer->ring_size * buffer->dataset_size);
+ buffer->n_fragments = (_Atomic int*)calloc(buffer->ring_size, sizeof(_Atomic int));
+ buffer->stream_fragment = (guint*)calloc(cfg->n_streams, sizeof(guint));
+
+ if ((!buffer->ring_buffer)||(!buffer->n_fragments)||(!buffer->stream_fragment)) {
+ ufo_roof_buffer_free(buffer);
+ roof_new_error(error, "Can't allocate ring buffer for ROOF datasets, total size %u", buffer->ring_size * buffer->dataset_size);
+ }
+
+ return buffer;
+}
+
+void ufo_roof_buffer_free(UfoRoofBuffer *buffer) {
+ if (buffer) {
+ if (buffer->ring_buffer)
+ free(buffer->ring_buffer);
+ if (buffer->n_fragments)
+ free(buffer->n_fragments);
+ if (buffer->stream_fragment)
+ free(buffer->stream_fragment);
+
+ free(buffer);
+ }
+}
+
+ // fragment_id is numbered from 1 (0 - means auto)
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error) {
+ guint buffer_id;
+ guint dataset_id;
+
+ if (!fragment_id) {
+ fragment_id = ++buffer->stream_fragment[stream_id];
+ }
+
+ // If we have packets of arbitrary size, we would need dataset_id transferred along with packet_id (otherwise complex guessing is required)
+ dataset_id = (fragment_id - 1) / buffer->fragments_per_stream;
+ fragment_id = (fragment_id - 1) % buffer->fragments_per_stream;
+ buffer_id = dataset_id % buffer->ring_size;
+
+ // Late arrived packed
+// printf("data set: %i, channel: %i, fragment: %i (buffer: %i)\n", dataset_id, stream_id, fragment_id, buffer_id);
+ if (dataset_id < buffer->current_id)
+ roof_network_error_with_retval(error, FALSE, "Late arrived packet for dataset %i, currently processing %i", dataset_id, buffer->current_id);
+
+ // We are not fast enough, new packets are arrvining to fast
+ if (dataset_id >= (buffer->current_id + buffer->ring_size)) {
+ // FIXME: Broken packets sanity checks? Allocate additional buffers on demand?
+
+ if (error)
+ root_set_network_error(error, "Ring buffer exhausted. Dropping datasets from %i to %i, current dataset has %i parts of %i completed",
+ buffer->current_id, dataset_id - buffer->ring_size, buffer->n_fragments[buffer_id], buffer->fragments_per_dataset);
+
+ // FIXME: Send semi-complete buffers further?
+ // FIXME: Or shall we drop more if larger buffers are allocated?
+ for (int i = buffer->current_id; i <= (dataset_id - buffer->ring_size); i++)
+ buffer->n_fragments[i%buffer->ring_size] = 0;
+ buffer->current_id = dataset_id - buffer->ring_size + 1;
+
+ // FIXME: In mult-threaded case, we need to ensure that all threads are stopped writting here (and generator is not reading) before we can reassign buffer to the new dataset.
+ // To avoid locking, we can store per-thread 'current_id' and only proceed to writting when all per-threads current_ids are equal or above the global value
+ // The updates may happen after writting/reading is finished.
+ }
+
+ // FIXME: This is builds events as it read from file in roof v.1 code. We can assemble fan projections directly here.
+ void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+ void *fragment_buffer = dataset_buffer + (stream_id * buffer->fragments_per_stream + fragment_id) * buffer->fragment_size;
+
+/* printf("buffer: %u (%u), packet: %u (%ux%u %u), packet_size: %u [%x]\n",
+ buffer_id, dataset_id, stream_id * buffer->fragments_per_stream + fragment_id, stream_id, buffer->fragments_per_stream, fragment_id, buffer->fragment_size,
+ ((uint32_t*)fragment)[0]
+ );*/
+ memcpy(fragment_buffer, fragment, buffer->fragment_size);
+
+ // FIXME: Sanity checks: verify is not a dublicate fragment?
+ atomic_fetch_add(&buffer->n_fragments[buffer_id], 1);
+
+ if (buffer->n_fragments[buffer_id] == buffer->fragments_per_dataset) {
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ if (dataset_id == buffer->current_id) {
+ return TRUE;
+ }
+ }
+
+ return FALSE;
+}
+
+
+
+gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error) {
+ guint buffer_id = buffer->current_id % buffer->ring_size;
+ void *dataset_buffer = buffer->ring_buffer + buffer_id * buffer->dataset_size;
+
+ // FIXME: what about a complete dataset blocked by earlier one with only a few framents missing?
+ if (buffer->n_fragments[buffer_id] < buffer->fragments_per_dataset) return FALSE;
+
+ memcpy(output_buffer, dataset_buffer, buffer->dataset_size);
+ buffer->n_fragments[buffer_id] = 0;
+ buffer->current_id += 1;
+
+ return TRUE;
+}
diff --git a/src/ufo-roof-buffer.h b/src/ufo-roof-buffer.h
new file mode 100644
index 0000000..bb71791
--- /dev/null
+++ b/src/ufo-roof-buffer.h
@@ -0,0 +1,31 @@
+#ifndef __UFO_ROOF_BUFFER_H
+#define __UFO_ROOF_BUUFER_H
+
+#include <stdatomic.h>
+
+struct _UfoRoofBuffer {
+ guint current_id; // The ID of the first (active) dataset in the buffer
+
+ guint ring_size; // Number of datasets to buffer
+ void *ring_buffer; // The ring buffer
+ _Atomic int *n_fragments; // Number of completed fragments in each buffer
+ guint *stream_fragment; // Currently processed fragment in the stream (for ordered streams)
+// int *fragments; // Mark individual completed fragments (if we care for partial data)
+
+
+ guint dataset_size; // Size (in bytes) of a full dataset
+ guint fragment_size; // Size (in bytes) of a single fragment (we expect fixed-size fragments at the moment)
+
+ guint fragments_per_dataset; // Number of packets in dataset (used to compute when dataset is ready)
+ guint fragments_per_stream; // Number of packets in each of data streams (used to compute when dataset is ready)
+};
+
+typedef struct _UfoRoofBuffer UfoRoofBuffer;
+
+UfoRoofBuffer *ufo_roof_buffer_new(UfoRoofConfig *cfg, GError **error);
+void ufo_roof_buffer_free(UfoRoofBuffer *buf);
+
+gboolean ufo_roof_buffer_set_fragment(UfoRoofBuffer *buffer, guint stream_id, guint fragment_id, gconstpointer fragment, GError **error);
+gboolean ufo_roof_buffer_get_dataset(UfoRoofBuffer *buffer, gpointer output_buffer, GError **error);
+
+#endif
diff --git a/src/ufo-roof-build-task.c b/src/ufo-roof-build-task.c
new file mode 100644
index 0000000..81c84ce
--- /dev/null
+++ b/src/ufo-roof-build-task.c
@@ -0,0 +1,325 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <stdio.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-buffer.h"
+#include "ufo-roof-build-task.h"
+
+
+struct _UfoRoofBuildTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofBuffer *buf; // Ring buffer for incomming UDP packet
+
+ guint number; // Number of datasets to read
+ gboolean stop; // Stop flag
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofBuildTask, ufo_roof_build_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_BUILD_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_STOP,
+ PROP_NUMBER,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_build_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_BUILD_TASK, NULL));
+}
+
+static void
+ufo_roof_build_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, &gerr);
+ if (!priv->cfg)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+
+
+ priv->buf = ufo_roof_buffer_new(priv->cfg, &gerr);
+ if (!priv->buf)
+ roof_propagate_error(error, gerr, "roof-build-setup: ");
+}
+
+static void
+ufo_roof_build_task_finalize (GObject *object)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ if (priv->buf) {
+ ufo_roof_buffer_free(priv->buf);
+ priv->buf = NULL;
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+
+ G_OBJECT_CLASS (ufo_roof_build_task_parent_class)->finalize (object);
+}
+
+
+
+static void
+ufo_roof_build_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+
+ guint bytes = priv->cfg->dataset_size;
+
+ // FIXME: Can this be made more elegant?
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+
+}
+
+static guint
+ufo_roof_build_task_get_num_inputs (UfoTask *task)
+{
+ return 1;
+}
+
+static guint
+ufo_roof_build_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 1;
+}
+
+static UfoTaskMode
+ufo_roof_build_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_REDUCTOR;
+}
+
+static gboolean
+ufo_roof_build_task_process (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+ gboolean ready = FALSE;
+
+// UfoRequisition in_req;
+// ufo_buffer_get_requisition (inputs[0], &in_req);
+
+ void *data = ufo_buffer_get_host_array(inputs[0], NULL);
+ UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(data, cfg);
+
+ if (priv->stop)
+ return FALSE;
+
+ for (int i = 0; i < header->n_packets; i++) {
+ int packet_id = 0;
+
+ // Otherwise considered consecutive and handled by the buffer
+ if (cfg->header_size >= sizeof(UfoRoofPacketHeader)) {
+ UfoRoofPacketHeader *pheader = UFO_ROOF_PACKET_HEADER(data);
+ packet_id = pheader->packet_id + 1;
+ }
+
+ ready |= ufo_roof_buffer_set_fragment(buf, header->channel_id, packet_id, data, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ data += cfg->max_packet_size;
+ }
+
+ // FIXME: if 2nd dataset is ready (2nd and 3rd?), skip the first one?
+
+// printf("proc (%s) - channel: %i, packets: %i\n", ready?"yes":" no", header->channel_id, header->n_packets);
+
+ return !ready;
+}
+
+static gboolean
+ufo_roof_build_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ gboolean ready = FALSE;
+ GError *gerr = NULL;
+
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (task);
+// UfoRoofConfig *cfg = priv->cfg;
+ UfoRoofBuffer *buf = priv->buf;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+
+ if (priv->stop)
+ return FALSE;
+
+ ready = ufo_roof_buffer_get_dataset(buf, output_buffer, &gerr);
+ if (gerr) roof_print_error(gerr);
+
+ if ((priv->number)&&(buf->current_id >= priv->number)) {
+ priv->stop = TRUE;
+ g_object_notify_by_pspec (G_OBJECT(task), properties[PROP_STOP]);
+ }
+
+// printf("gen(%s) %i\n", ready?"yes":" no", buf->current_id);
+
+ return ready;
+}
+
+static void
+ufo_roof_build_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ case PROP_NUMBER:
+ priv->number = g_value_get_uint (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_build_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofBuildTaskPrivate *priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ case PROP_NUMBER:
+ g_value_set_uint (value, priv->number);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_build_task_setup;
+ iface->get_num_inputs = ufo_roof_build_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_build_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_build_task_get_mode;
+ iface->get_requisition = ufo_roof_build_task_get_requisition;
+ iface->process = ufo_roof_build_task_process;
+ iface->generate = ufo_roof_build_task_generate;
+}
+
+static void
+ufo_roof_build_task_class_init (UfoRoofBuildTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_build_task_set_property;
+ oclass->get_property = ufo_roof_build_task_get_property;
+ oclass->finalize = ufo_roof_build_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ properties[PROP_NUMBER] =
+ g_param_spec_uint("number",
+ "Number of datasets to receive",
+ "Number of datasets to receive",
+ 0, G_MAXUINT, 0,
+ G_PARAM_READWRITE);
+
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofBuildTaskPrivate));
+}
+
+static void
+ufo_roof_build_task_init(UfoRoofBuildTask *self)
+{
+ self->priv = UFO_ROOF_BUILD_TASK_GET_PRIVATE(self);
+}
diff --git a/src/ufo-roof-build-task.h b/src/ufo-roof-build-task.h
new file mode 100644
index 0000000..cefb7ab
--- /dev/null
+++ b/src/ufo-roof-build-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ROOF_BUILD_TASK_H
+#define __UFO_ROOF_BUILD_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ROOF_BUILD_TASK (ufo_roof_build_task_get_type())
+#define UFO_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTask))
+#define UFO_IS_ROOF_BUILD_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_BUILD_TASK))
+#define UFO_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass))
+#define UFO_IS_ROOF_BUILD_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_BUILD_TASK))
+#define UFO_ROOF_BUILD_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_BUILD_TASK, UfoRoofBuildTaskClass))
+
+typedef struct _UfoRoofBuildTask UfoRoofBuildTask;
+typedef struct _UfoRoofBuildTaskClass UfoRoofBuildTaskClass;
+typedef struct _UfoRoofBuildTaskPrivate UfoRoofBuildTaskPrivate;
+
+struct _UfoRoofBuildTask {
+ UfoTaskNode parent_instance;
+
+ UfoRoofBuildTaskPrivate *priv;
+};
+
+struct _UfoRoofBuildTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_roof_build_task_new (void);
+GType ufo_roof_build_task_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ufo-roof-config.c b/src/ufo-roof-config.c
new file mode 100644
index 0000000..11f8bd4
--- /dev/null
+++ b/src/ufo-roof-config.c
@@ -0,0 +1,172 @@
+#include <stdio.h>
+#include <stdint.h>
+
+#include <json-glib/json-glib.h>
+
+#include <ufo/ufo.h>
+
+#include "ufo-roof-error.h"
+#include "ufo-roof-config.h"
+
+#define roof_config_node_get_with_default(var, parent, type, name, default) do { \
+ JsonNode *node = json_object_get_member(parent, name); \
+ if (node) var = json_node_get_##type(node); \
+ else var = default; \
+ } while(0)
+
+#define roof_config_node_get_string_with_default(var, parent, name, default) do { \
+ const gchar *str; \
+ JsonNode *node = json_object_get_member(parent, name); \
+ if (node) str = json_node_get_string(node); \
+ else str = default; \
+ if (var != str) { \
+ if (var) g_free(var); \
+ var = g_strdup(str); \
+ } \
+ } while(0)
+
+#define roof_config_node_get(var, parent, type, name) \
+ roof_config_node_get_with_default(var, parent, type, name, var)
+
+#define roof_config_node_get_string(var, parent, name) \
+ roof_config_node_get_string_with_default(var, parent, name, var)
+
+
+typedef struct {
+ UfoRoofConfig cfg;
+
+ JsonParser *parser;
+} UfoRoofConfigPrivate;
+
+void ufo_roof_config_free(UfoRoofConfig *cfg) {
+ if (cfg) {
+ UfoRoofConfigPrivate *priv = (UfoRoofConfigPrivate*)cfg;
+
+ if (cfg->path)
+ g_free(cfg->path);
+
+ if (priv->parser)
+ g_object_unref (priv->parser);
+
+ free(cfg);
+ }
+}
+
+UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error) {
+ UfoRoofConfigPrivate *priv;
+ UfoRoofConfig *cfg;
+
+// JsonNode *node;
+ JsonObject *root = NULL;
+ JsonObject *hardware = NULL;
+ JsonObject *network = NULL;
+ JsonObject *performance = NULL;
+ JsonObject *simulation = NULL;
+
+ GError *gerr = NULL;
+
+ priv = (UfoRoofConfigPrivate*)malloc(sizeof(UfoRoofConfigPrivate));
+ if (!priv) roof_new_error(error, "Can't allocate UfoRoofConfig");
+
+ memset(priv, 0, sizeof(UfoRoofConfigPrivate));
+
+ // Set defaults
+ cfg = &priv->cfg;
+
+ cfg->port = 4000;
+ cfg->n_streams = 1;
+ cfg->protocol = "udp";
+ cfg->network_timeout = 10000000;
+ cfg->header_size = 0;
+ cfg->payload_size = 0;
+ cfg->max_packet_size = 0;
+ cfg->max_packets = 100;
+ cfg->dataset_size = 0;
+ cfg->buffer_size = 2;
+ cfg->path = NULL;
+
+ // Read configuration
+ priv->parser = json_parser_new_immutable ();
+ json_parser_load_from_file (priv->parser, config, &gerr);
+
+ if (gerr != NULL) {
+ g_propagate_prefixed_error(error, gerr, "Error parsing JSON file (%s) with ROOF configuration: ", config);
+ ufo_roof_config_free(cfg);
+ return NULL;
+ }
+
+ root = json_node_get_object (json_parser_get_root (priv->parser));
+
+ if (root) {
+ roof_config_node_get(hardware, root, object, "hardware");
+ roof_config_node_get(network, root, object, "network");
+ roof_config_node_get(simulation, root, object, "simulation");
+ }
+
+ if (hardware) {
+ // FIXME: Compute dataset size based on roof hardware
+ }
+
+ if (network) {
+// int max_packet_size = 0;
+
+ roof_config_node_get(cfg->port, network, int, "port");
+ roof_config_node_get(cfg->n_streams, network, int, "streams");
+
+ roof_config_node_get(cfg->max_packet_size, network, int, "max_packet_size");
+ // FIXME: compute payload_size based on sample_size
+ roof_config_node_get(cfg->payload_size, network, int, "payload_size");
+ roof_config_node_get(cfg->header_size, network, int, "header_size");
+ roof_config_node_get(cfg->dataset_size, network, int, "dataset_size");
+ }
+
+ if (performance) {
+ roof_config_node_get(cfg->max_packets, performance, int, "packets_at_once");
+ roof_config_node_get(cfg->buffer_size, performance, int, "buffer_size");
+ }
+
+ if (simulation) {
+ roof_config_node_get_string(cfg->path, simulation, "path");
+ roof_config_node_get(cfg->first_file_number, simulation, int, "first_file_number");
+ }
+
+ // Check configuration consistency
+ if (!cfg->payload_size) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "Packet size is not set");
+ }
+
+ if ((!cfg->header_size)&&(!cfg->path)) {
+ if (!strncmp(cfg->protocol, "udp", 3)) {
+ // Error if 0 implicitely set, use default value otherwise
+ if ((network)&&(json_object_get_member(network, "header_size"))) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "The header with packet ids is required for un-ordered protocols");
+ } else {
+ cfg->header_size = sizeof(uint32_t);
+ }
+ }
+ }
+
+ guint fragments_per_dataset = cfg->dataset_size / cfg->payload_size;
+ guint fragments_per_stream = fragments_per_dataset / cfg->n_streams;
+
+ if ((cfg->dataset_size % cfg->payload_size)||(fragments_per_dataset%cfg->n_streams)) {
+ ufo_roof_config_free(cfg);
+ roof_new_error(error, "Inconsistent ROOF configuration: dataset_size=%u, packet_size=%u, data_streams=%u", cfg->dataset_size, cfg->payload_size, cfg->n_streams);
+ }
+
+ if (cfg->buffer_size * fragments_per_stream < cfg->max_packets) {
+ cfg->max_packets = cfg->buffer_size * fragments_per_stream / 2;
+ }
+
+ // Finalize configuration
+ if (!cfg->max_packet_size)
+ cfg->max_packet_size = cfg->header_size + cfg->payload_size;
+
+ if (!cfg->dataset_size)
+ cfg->dataset_size = cfg->payload_size;
+
+
+ return cfg;
+}
diff --git a/src/ufo-roof-config.h b/src/ufo-roof-config.h
new file mode 100644
index 0000000..a22c84f
--- /dev/null
+++ b/src/ufo-roof-config.h
@@ -0,0 +1,37 @@
+#ifndef __UFO_ROOF_CONFIG_H
+#define __UFO_ROOF_CONFIG_H
+
+#include <glib.h>
+
+typedef struct {
+ gchar *path; // Location of data files for simmulation purposes (i.e. reading a sequence of files instead listening on the corresponding ports)
+ guint first_file_number; // Indicates if the numbering of files starts at 0 or 1
+ gchar *protocol; // Protocols: tcp, udp, tcp6, udp6, ...
+ guint port; // First port
+ guint n_streams; // Number of independent data streams (expected on sequential ports)
+ guint header_size; // Expected size of the packet header, for dgram protocols we need at least 32-bit sequence number. Defaults to uint32_t for udp* and 0 - otherwise
+ guint payload_size; // Expected size of TCP/UDP packet (without header)
+ guint dataset_size; // Size of a single dataset (image, sinogram, etc.). This is real size in bytes, excluding all technical headers used in communication protocol.
+
+//?
+
+
+/*
+ guint pixels_per_module;
+ guint planes_per_module;
+ guint samples_per_dataset;
+*/
+
+ guint max_packets; // limits maximum number of packets which are read at once
+ guint max_packet_size; // payload_size + header_size + ...?
+ guint buffer_size; // How many datasets we can buffer. There is no sense to have more than 2 for odered protocols (default), but having larger number could help for UDP if significant order disturbances are expected
+ guint network_timeout; // Maximum time (us) to wait for data on the socket
+
+
+
+} UfoRoofConfig;
+
+UfoRoofConfig *ufo_roof_config_new(const char *config, GError **error);
+void ufo_roof_config_free(UfoRoofConfig *cfg);
+
+#endif /* __UFO_ROOF_CONFIG_H */
diff --git a/src/ufo-roof-error.h b/src/ufo-roof-error.h
new file mode 100644
index 0000000..ed0ae2b
--- /dev/null
+++ b/src/ufo-roof-error.h
@@ -0,0 +1,58 @@
+#ifndef __UFO_ROOF_ERROR_H
+#define __UFO_ROOF_ERROR_H
+
+#include <ufo/ufo.h>
+
+
+#define roof_print_error(error) do { \
+ g_warning("%s", error->message); \
+ g_error_free(error); \
+ error = NULL; \
+ } while (0)
+
+#define roof_set_error(error, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ } while (0)
+
+#define roof_error(error, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ return; \
+ } while (0)
+
+#define roof_propagate_error(error, err, msg...) do { \
+ g_propagate_prefixed_error(error, err, msg); \
+ return; \
+ } while (0)
+
+#define roof_error_with_retval(error, retval, type, msg...) do { \
+ if (error) g_set_error(error, UFO_TASK_ERROR, UFO_TASK_ERROR_##type, msg); \
+ return retval; \
+ } while (0)
+
+#define roof_propagate_error_with_retval(error, retval, err, msg...) do { \
+ g_propagate_prefixed_error(error, err, msg); \
+ return retval; \
+ } while (0)
+
+
+#define roof_setup_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+#define roof_new_error(error, msg...) \
+ roof_error_with_retval(error, NULL, SETUP, msg)
+
+
+#define roof_network_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+#define root_set_network_error(error, msg...) \
+ roof_set_error(error, SETUP, msg)
+
+#define roof_network_error_with_retval(error, retval, msg...) \
+ roof_error_with_retval(error, retval, SETUP, msg)
+
+#define roof_memory_error(error, msg...) \
+ roof_error(error, SETUP, msg)
+
+
+#endif /* __UFO_ROOF_ERROR_H */
diff --git a/src/ufo-roof-read-file.c b/src/ufo-roof-read-file.c
new file mode 100644
index 0000000..de8391e
--- /dev/null
+++ b/src/ufo-roof-read-file.c
@@ -0,0 +1,84 @@
+#include <stdio.h>
+#include <errno.h>
+#include <stdint.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-file.h"
+
+typedef struct {
+ UfoRoofReadInterface iface;
+
+ UfoRoofConfig *cfg;
+
+ gchar *fname;
+ FILE *fd;
+} UfoRoofReadFile;
+
+static void ufo_roof_read_file_free(UfoRoofReadInterface *iface) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
+
+ if (reader) {
+ if (reader->fname)
+ g_free(reader->fname);
+
+ if (reader->fd >= 0)
+ fclose(reader->fd);
+
+ free(reader);
+ }
+}
+
+static guint ufo_roof_read_file(UfoRoofReadInterface *iface, void *buffers, GError **error) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)iface;
+ UfoRoofConfig *cfg = reader->cfg;
+
+ size_t bytes = 0;
+ size_t packet_size = cfg->header_size + cfg->payload_size;
+ size_t expected = cfg->max_packets * packet_size;
+
+ while ((!feof(reader->fd))&&(!ferror(reader->fd))&&(bytes < expected)) {
+ size_t ret = fread(buffers + bytes, 1, expected - bytes, reader->fd);
+ bytes += ret;
+ }
+
+ guint packets = bytes / packet_size;
+
+ if (ferror(reader->fd)) {
+ roof_network_error_with_retval(error, 0, "read failed, error %i", ferror(reader->fd));
+ } else if ((feof(reader->fd))&&(bytes % packet_size)) {
+ roof_network_error_with_retval(error, packets, "extra data in the end of input");
+ }
+
+ return packets;
+}
+
+
+UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error) {
+ UfoRoofReadFile *reader = (UfoRoofReadFile*)calloc(1, sizeof(UfoRoofReadFile));
+ if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadFile");
+
+ // FIXME: Shall we jump if max_packet_size > header+payload (or will be extra data included in the data files)? Report error for now.
+ if ((cfg->header_size + cfg->payload_size) != cfg->max_packet_size)
+ roof_new_error(error, "packet_size (%u) should be equal to max_packet_size (%u) if UfoRoofReadFile is used", cfg->header_size + cfg->payload_size, cfg->max_packet_size);
+
+ reader->cfg = cfg;
+ reader->iface.close = ufo_roof_read_file_free;
+ reader->iface.read =ufo_roof_read_file;
+
+ reader->fname = g_strdup_printf(cfg->path, id + cfg->first_file_number);
+ if (!reader->fname) {
+ free(reader);
+ roof_new_error(error, "Can't build file name");
+ }
+
+ reader->fd = fopen(reader->fname, "rb");
+ if (!reader->fd) {
+ g_free(reader->fname);
+ g_free(reader);
+ roof_new_error(error, "Can't open file %s", reader->fname);
+ }
+
+ return (UfoRoofReadInterface*)reader;
+}
diff --git a/src/ufo-roof-read-file.h b/src/ufo-roof-read-file.h
new file mode 100644
index 0000000..54bcf49
--- /dev/null
+++ b/src/ufo-roof-read-file.h
@@ -0,0 +1,8 @@
+#ifndef __UFO_ROOF_READ_FILE_H
+#define __UFO_ROOF_READ_FILE_H
+
+#include "ufo-roof-read.h"
+
+UfoRoofReadInterface *ufo_roof_read_file_new(UfoRoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-socket.c b/src/ufo-roof-read-socket.c
new file mode 100644
index 0000000..b72e9d0
--- /dev/null
+++ b/src/ufo-roof-read-socket.c
@@ -0,0 +1,118 @@
+#define _GNU_SOURCE
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+
+#include "glib.h"
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-socket.h"
+
+typedef struct {
+ UfoRoofReadInterface iface;
+
+ UfoRoofConfig *cfg;
+ int socket;
+} UfoRoofReadSocket;
+
+static void ufo_roof_read_socket_free(UfoRoofReadInterface *iface) {
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
+
+ if (reader) {
+ if (reader->socket >= 0)
+ close(reader->socket);
+ free(reader);
+ }
+}
+
+static guint ufo_roof_read_socket(UfoRoofReadInterface *iface, void *buf, GError **error) {
+ struct timespec timeout_ts;
+
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)iface;
+ UfoRoofConfig *cfg = reader->cfg;
+
+ struct mmsghdr msg[cfg->max_packets];
+ struct iovec msgvec[cfg->max_packets];
+
+ timeout_ts.tv_sec = cfg->network_timeout / 1000000;
+ timeout_ts.tv_nsec = 1000 * (cfg->network_timeout % 1000000);
+
+ // FIXME: Is it optimal? Auto-tune max_packets? Combine read & build?
+ memset(msg, 0, sizeof(msg));
+ memset(msgvec, 0, sizeof(msgvec));
+ for (int i = 0; i < cfg->max_packets; i++) {
+ msgvec[i].iov_base = buf + i * cfg->max_packet_size;
+ msgvec[i].iov_len = cfg->max_packet_size;
+ msg[i].msg_hdr.msg_iov = &msgvec[i];
+ msg[i].msg_hdr.msg_iovlen = 1;
+ }
+
+ int packets = recvmmsg(reader->socket, msg, reader->cfg->max_packets, MSG_WAITFORONE, &timeout_ts);
+ if (packets < 0) roof_network_error_with_retval(error, 0, "recvmmsg failed, error %i", errno);
+
+ // FIXME: Shall we verify packets consistency here? We can check at least the sizes...
+
+ return (guint)packets;
+}
+
+
+UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error) {
+ int err;
+ int port = cfg->port + id;
+ char port_str[16];
+ const char *addr_str = "0.0.0.0";
+ struct addrinfo sockaddr_hints;
+ struct addrinfo *sockaddr_info;
+
+ UfoRoofReadSocket *reader = (UfoRoofReadSocket*)calloc(1, sizeof(UfoRoofReadSocket));
+ if (!reader) roof_new_error(error, "Can't allocate UfoRoofReadSocket");
+
+ reader->cfg = cfg;
+ reader->iface.close = ufo_roof_read_socket_free;
+ reader->iface.read =ufo_roof_read_socket;
+
+ snprintf(port_str, sizeof(port_str), "%d", port);
+ port_str[sizeof(port_str) / sizeof(port_str[0]) - 1] = '\0';
+
+ memset(&sockaddr_hints, 0, sizeof(sockaddr_hints));
+ if (!strncmp(cfg->protocol, "udp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_DGRAM;
+ sockaddr_hints.ai_protocol = IPPROTO_UDP;
+ } else if (!strncmp(cfg->protocol, "tcp", 3)) {
+ sockaddr_hints.ai_family = AF_UNSPEC;
+ sockaddr_hints.ai_socktype = SOCK_STREAM;
+ sockaddr_hints.ai_protocol = IPPROTO_TCP;
+ } else {
+ roof_new_error(error, "Unsupported protocol (%s)", cfg->protocol);
+ }
+
+ err = getaddrinfo(addr_str, port_str, &sockaddr_hints, &sockaddr_info);
+ if (err || !sockaddr_info) {
+ free(reader);
+ roof_new_error(error, "Invalid address (%s) or port (%s)", addr_str, port_str);
+ }
+
+ reader->socket = socket(sockaddr_info->ai_family, sockaddr_info->ai_socktype | SOCK_CLOEXEC, sockaddr_info->ai_protocol);
+ if(reader->socket == -1) {
+ freeaddrinfo(sockaddr_info);
+ free(reader);
+ roof_new_error(error, "Can't create socket (%s) for address (%s) on port (%s)", cfg->protocol, addr_str, port_str);
+ }
+
+ err = bind(reader->socket, sockaddr_info->ai_addr, sockaddr_info->ai_addrlen);
+ if(err != 0) {
+ freeaddrinfo(sockaddr_info);
+ close(reader->socket);
+ free(reader);
+ roof_new_error(error, "Error (%i) binding socket (%s) for address (%s) on port (%s)", err, cfg->protocol, addr_str, port_str);
+ }
+
+ freeaddrinfo(sockaddr_info);
+
+ return (UfoRoofReadInterface*)reader;
+}
diff --git a/src/ufo-roof-read-socket.h b/src/ufo-roof-read-socket.h
new file mode 100644
index 0000000..74b0742
--- /dev/null
+++ b/src/ufo-roof-read-socket.h
@@ -0,0 +1,8 @@
+#ifndef __UFO_ROOF_READ_SOCKET_H
+#define __UFO_ROOF_READ_SOCKET_H
+
+#include "ufo-roof-read.h"
+
+UfoRoofReadInterface *ufo_roof_read_socket_new(UfoRoofConfig *cfg, guint id, GError **error);
+
+#endif
diff --git a/src/ufo-roof-read-task.c b/src/ufo-roof-read-task.c
new file mode 100644
index 0000000..ebff9de
--- /dev/null
+++ b/src/ufo-roof-read-task.c
@@ -0,0 +1,289 @@
+/*
+ * Copyright (C) 2011-2015 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _GNU_SOURCE
+
+#include <stdio.h>
+
+#ifdef __APPLE__
+#include <OpenCL/cl.h>
+#else
+#include <CL/cl.h>
+#endif
+
+#include "ufo-roof.h"
+#include "ufo-roof-read-socket.h"
+#include "ufo-roof-read-file.h"
+#include "ufo-roof-read-task.h"
+
+struct _UfoRoofReadTaskPrivate {
+ gchar *config; // ROOF configuration file name
+ UfoRoofConfig *cfg; // Parsed ROOF parameters
+ UfoRoofReadInterface *reader;
+
+ guint id; // Reader ID (defince sequential port number)
+ gboolean stop; // Flag requiring termination
+};
+
+static void ufo_task_interface_init (UfoTaskIface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (UfoRoofReadTask, ufo_roof_read_task, UFO_TYPE_TASK_NODE,
+ G_IMPLEMENT_INTERFACE (UFO_TYPE_TASK,
+ ufo_task_interface_init))
+
+#define UFO_ROOF_READ_TASK_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskPrivate))
+
+enum {
+ PROP_0,
+ PROP_ID,
+ PROP_STOP,
+ PROP_CONFIG,
+ N_PROPERTIES
+};
+
+static GParamSpec *properties[N_PROPERTIES] = { NULL, };
+
+UfoNode *
+ufo_roof_read_task_new (void)
+{
+ return UFO_NODE (g_object_new (UFO_TYPE_ROOF_READ_TASK, NULL));
+}
+
+static void
+ufo_roof_read_task_setup (UfoTask *task,
+ UfoResources *resources,
+ GError **error)
+{
+ GError *gerr = NULL;
+
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+
+ if (!priv->config)
+ roof_setup_error(error, "ROOF configuration is not specified");
+
+ priv->cfg = ufo_roof_config_new(priv->config, &gerr);
+ if (!priv->cfg) roof_propagate_error(error, gerr, "roof_config_new: ");
+
+ // Consistency checks
+ if (priv->id >= priv->cfg->n_streams)
+ roof_setup_error(error, "Specified Stream ID is %u, but only %u data streams is configured", priv->id, priv->cfg->n_streams);
+
+ // Start actual reader
+ if (priv->cfg->path)
+ priv->reader = ufo_roof_read_file_new(priv->cfg, priv->id, &gerr);
+ else
+ priv->reader = ufo_roof_read_socket_new(priv->cfg, priv->id, &gerr);
+
+ if (!priv->reader)
+ roof_propagate_error(error, gerr, "roof_read_new: ");
+
+}
+
+
+static void
+ufo_roof_read_task_finalize (GObject *object)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ if (priv->reader) {
+ priv->reader->close(priv->reader);
+ }
+
+ if (priv->cfg) {
+ ufo_roof_config_free(priv->cfg);
+ priv->cfg = NULL;
+ }
+
+ if (priv->config) {
+ g_free(priv->config);
+ priv->config = NULL;
+ }
+
+ G_OBJECT_CLASS (ufo_roof_read_task_parent_class)->finalize (object);
+}
+
+static void
+ufo_roof_read_task_get_requisition (UfoTask *task,
+ UfoBuffer **inputs,
+ UfoRequisition *requisition,
+ GError **error)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+
+ guint bytes = cfg->max_packets * cfg->max_packet_size + sizeof(UfoRoofPacketBlockHeader);
+
+ // FIXME: Can this be made more elegant?
+ requisition->n_dims = 1;
+ requisition->dims[0] = bytes / sizeof(float) + ((bytes%sizeof(float))?1:0);
+}
+
+static guint
+ufo_roof_read_task_get_num_inputs (UfoTask *task)
+{
+ return 0;
+}
+
+static guint
+ufo_roof_read_task_get_num_dimensions (UfoTask *task,
+ guint input)
+{
+ return 0;
+}
+
+static UfoTaskMode
+ufo_roof_read_task_get_mode (UfoTask *task)
+{
+ return UFO_TASK_MODE_CPU | UFO_TASK_MODE_GENERATOR;
+}
+
+
+static gboolean
+ufo_roof_read_task_generate (UfoTask *task,
+ UfoBuffer *output,
+ UfoRequisition *requisition)
+{
+ GError *gerr = NULL;
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (task);
+ UfoRoofConfig *cfg = priv->cfg;
+
+ void *output_buffer = ufo_buffer_get_host_array(output, NULL);
+ UfoRoofPacketBlockHeader *header = UFO_ROOF_PACKET_BLOCK_HEADER(output_buffer, cfg);
+
+ if (priv->stop)
+ return FALSE;
+
+ guint packets = priv->reader->read(priv->reader, output_buffer, &gerr);
+ if (gerr) {
+ g_warning("Error reciving data: %s", gerr->message);
+ return FALSE;
+ }
+
+ // FIXME: End of data (shall we restart in the network case?)
+ if (!packets)
+ return FALSE;
+
+ // Shall I use UFO metadata (ufo_buffer_set_metadata) insead?
+ header->channel_id = priv->id;
+ header->n_packets = packets;
+
+ return TRUE;
+}
+
+static void
+ufo_roof_read_task_set_property (GObject *object,
+ guint property_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ if (priv->config) g_free(priv->config);
+ priv->config = g_value_dup_string(value);
+ break;
+ case PROP_ID:
+ priv->id = g_value_get_uint (value);
+ break;
+ case PROP_STOP:
+ priv->stop = g_value_get_boolean (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_roof_read_task_get_property (GObject *object,
+ guint property_id,
+ GValue *value,
+ GParamSpec *pspec)
+{
+ UfoRoofReadTaskPrivate *priv = UFO_ROOF_READ_TASK_GET_PRIVATE (object);
+
+ switch (property_id) {
+ case PROP_CONFIG:
+ g_value_set_string(value, priv->config);
+ break;
+ case PROP_ID:
+ g_value_set_uint (value, priv->id);
+ break;
+ case PROP_STOP:
+ g_value_set_boolean (value, priv->stop);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+ break;
+ }
+}
+
+static void
+ufo_task_interface_init (UfoTaskIface *iface)
+{
+ iface->setup = ufo_roof_read_task_setup;
+ iface->get_num_inputs = ufo_roof_read_task_get_num_inputs;
+ iface->get_num_dimensions = ufo_roof_read_task_get_num_dimensions;
+ iface->get_mode = ufo_roof_read_task_get_mode;
+ iface->get_requisition = ufo_roof_read_task_get_requisition;
+ iface->generate = ufo_roof_read_task_generate;
+}
+
+static void
+ufo_roof_read_task_class_init (UfoRoofReadTaskClass *klass)
+{
+ GObjectClass *oclass = G_OBJECT_CLASS (klass);
+
+ oclass->set_property = ufo_roof_read_task_set_property;
+ oclass->get_property = ufo_roof_read_task_get_property;
+ oclass->finalize = ufo_roof_read_task_finalize;
+
+ properties[PROP_CONFIG] =
+ g_param_spec_string ("config",
+ "ROOF configuration",
+ "Path to ROOF configuration file",
+ "",
+ G_PARAM_READWRITE);
+
+ properties[PROP_ID] =
+ g_param_spec_uint ("id",
+ "Reader ID",
+ "ID for multi-port servers",
+ 0, G_MAXUINT, 1,
+ G_PARAM_READWRITE);
+
+ properties[PROP_STOP] =
+ g_param_spec_boolean ("stop",
+ "Stop flag",
+ "Stop socket servers and terminates filter execution",
+ FALSE,
+ G_PARAM_READWRITE);
+
+ for (guint i = PROP_0 + 1; i < N_PROPERTIES; i++)
+ g_object_class_install_property (oclass, i, properties[i]);
+
+ g_type_class_add_private (oclass, sizeof(UfoRoofReadTaskPrivate));
+}
+
+static void
+ufo_roof_read_task_init(UfoRoofReadTask *self)
+{
+ self->priv = UFO_ROOF_READ_TASK_GET_PRIVATE(self);
+}
diff --git a/src/ufo-roof-read-task.h b/src/ufo-roof-read-task.h
new file mode 100644
index 0000000..2fe45c4
--- /dev/null
+++ b/src/ufo-roof-read-task.h
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2011-2013 Karlsruhe Institute of Technology
+ *
+ * This file is part of Ufo.
+ *
+ * This library is free software: you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UFO_ROOF_READ_TASK_H
+#define __UFO_ROOF_READ_TASK_H
+
+#include <ufo/ufo.h>
+
+G_BEGIN_DECLS
+
+#define UFO_TYPE_ROOF_READ_TASK (ufo_roof_read_task_get_type())
+#define UFO_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTask))
+#define UFO_IS_ROOF_READ_TASK(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), UFO_TYPE_ROOF_READ_TASK))
+#define UFO_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass))
+#define UFO_IS_ROOF_READ_TASK_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), UFO_TYPE_ROOF_READ_TASK))
+#define UFO_ROOF_READ_TASK_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), UFO_TYPE_ROOF_READ_TASK, UfoRoofReadTaskClass))
+
+typedef struct _UfoRoofReadTask UfoRoofReadTask;
+typedef struct _UfoRoofReadTaskClass UfoRoofReadTaskClass;
+typedef struct _UfoRoofReadTaskPrivate UfoRoofReadTaskPrivate;
+
+struct _UfoRoofReadTask {
+ UfoTaskNode parent_instance;
+
+ UfoRoofReadTaskPrivate *priv;
+};
+
+struct _UfoRoofReadTaskClass {
+ UfoTaskNodeClass parent_class;
+};
+
+UfoNode *ufo_roof_read_task_new (void);
+GType ufo_roof_read_task_get_type (void);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ufo-roof-read.h b/src/ufo-roof-read.h
new file mode 100644
index 0000000..50dbdf3
--- /dev/null
+++ b/src/ufo-roof-read.h
@@ -0,0 +1,17 @@
+#ifndef __UFO_ROOF_READ_H
+#define __UFO_ROOF_READ_H
+
+#include "ufo-roof-config.h"
+
+typedef struct _UfoRoofReadInterface UfoRoofReadInterface;
+
+typedef guint (*UfoRoofReaderRead)(UfoRoofReadInterface *reader, void *buf, GError **error);
+typedef void (*UfoRoofReaderClose)(UfoRoofReadInterface *reader);
+
+struct _UfoRoofReadInterface {
+ UfoRoofReaderRead read;
+ UfoRoofReaderClose close;
+};
+
+
+#endif /* __UFO_ROOF_READ_H */
diff --git a/src/ufo-roof.h b/src/ufo-roof.h
new file mode 100644
index 0000000..c3edda2
--- /dev/null
+++ b/src/ufo-roof.h
@@ -0,0 +1,20 @@
+#ifndef __UFO_ROOF_H
+#define __UFO_ROOF_H
+
+#include "ufo-roof-config.h"
+#include "ufo-roof-error.h"
+
+#define UFO_ROOF_PACKET_HEADER(buf) ((UfoRoofPacketHeader*)(buf))
+#define UFO_ROOF_PACKET_BLOCK_HEADER(buf, cfg) ((UfoRoofPacketBlockHeader*)(((void*)buf) + cfg->max_packets * cfg->max_packet_size))
+
+typedef struct {
+ uint32_t packet_id; // Sequential Packet ID (numbered from 0)
+} UfoRoofPacketHeader;
+
+typedef struct {
+ uint32_t channel_id; // Specifies channel on which the data were received (numbered from 0)
+ uint32_t n_packets; // Number of packets
+} UfoRoofPacketBlockHeader;
+
+
+#endif /* __UFO_ROOF_H */
diff --git a/tests/roof.json b/tests/roof.json
new file mode 100644
index 0000000..0dcd1e7
--- /dev/null
+++ b/tests/roof.json
@@ -0,0 +1,31 @@
+{
+ "network": {
+ "protocol": "udp",
+ "port": 4000,
+ "streams": 16,
+ "payload_size": 1280,
+ "dataset_size": 1024000
+ },
+ "performance": {
+ "buffer_size": 2,
+ "packets_at_once": 100
+ },
+ "simulation": {
+ "first_file_number": 1,
+ "path": "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat"
+ },
+ "setup": {
+ "planes": 2,
+ "modules": 16,
+ "bit_depth": 16,
+ "pixels_per_module": 16,
+ "samples_per_rotation": 2000000
+ },
+ "geometry": {
+ "fan_projections": 1000,
+ "fan_detectors": 432,
+ "parallel_projections": 512,
+ "parallel_detectors": 256,
+ "detector_diameter": 216
+ }
+}
diff --git a/tests/roof.py b/tests/roof.py
new file mode 100644
index 0000000..2138931
--- /dev/null
+++ b/tests/roof.py
@@ -0,0 +1,46 @@
+import gi
+import sys
+import json
+import gobject
+
+from gi.repository import Ufo
+from gi.repository import GObject
+
+class RoofConfig:
+ def __init__(self, config="roof.json"):
+ self.streams = 1
+
+ with open(config) as json_file:
+ cfg = json.load(json_file)
+ if cfg.get("network", {}).get("streams") != None:
+ self.streams = cfg["network"]["streams"]
+ elif cfg.get("setup", {}).get("modules") != None:
+ self.streams = cfg["setup"]["modules"]
+
+config = "roof.json"
+cfg = RoofConfig(config)
+
+pm = Ufo.PluginManager()
+graph = Ufo.TaskGraph()
+scheduler = Ufo.Scheduler()
+
+
+build = pm.get_task('roof-build')
+build.set_properties(config=config, number=0)
+
+write = pm.get_task('write')
+write.set_properties(filename="test.raw")
+
+for id in range(cfg.streams):
+ read = pm.get_task('roof-read')
+ read.set_properties(config=config, id=id)
+ graph.connect_nodes(read, build)
+ build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT)
+
+#read_task.set_properties(path='/home/data/*.tif', start=10, number=100)
+#graph.connect_nodes_full(read, write, 0)
+graph.connect_nodes(build, write)
+
+
+
+scheduler.run(graph)
diff --git a/tests/roof.sh b/tests/roof.sh
new file mode 100755
index 0000000..d0ec30a
--- /dev/null
+++ b/tests/roof.sh
@@ -0,0 +1,3 @@
+cat roof.yaml | yq . > roof.json
+
+GI_TYPELIB_PATH="/usr/local/lib64/girepository-1.0/" python roof.py
diff --git a/tests/roof.yaml b/tests/roof.yaml
new file mode 100644
index 0000000..a49d53b
--- /dev/null
+++ b/tests/roof.yaml
@@ -0,0 +1,26 @@
+network:
+ protocol: udp
+ port: 4000
+ streams: 16
+# header_size: 0
+ payload_size: 1280
+# max_packet_size: 1284
+ dataset_size: 1024000
+performance:
+ buffer_size: 2
+ packets_at_once: 100
+simulation:
+ first_file_number: 1
+ path: "/mnt/fast/ROOF2/roof2-data.pumpe256/meas/data_pumpe_dyn_192.168.100_%02u.dat"
+setup:
+ planes: 2
+ modules: 16
+ bit_depth: 16
+ pixels_per_module: 16
+ samples_per_rotation: 2000000
+geometry:
+ fan_projections: 1000
+ fan_detectors: 432
+ parallel_projections: 512
+ parallel_detectors: 256
+ detector_diameter: 216
diff --git a/tests/test_file.sh b/tests/test_file.sh
new file mode 100644
index 0000000..5f47e45
--- /dev/null
+++ b/tests/test_file.sh
@@ -0,0 +1,11 @@
+#! /bin/bash
+
+packet_size=1280
+packets_per_dataset=50
+
+for packet in $(seq 0 24); do
+ for id in $(seq 0 15); do
+ name=$(ls *$id.dat | grep -P "_0?$id.dat")
+ dd if=$name of="roof_test.raw" bs=$packet_size count=$packets_per_dataset skip=$((packet * $packets_per_dataset)) oflag=append conv=notrunc
+ done
+done