Diffstat (limited to 'tests/roof/graph.py')
-rw-r--r--  tests/roof/graph.py  203
1 file changed, 203 insertions(+), 0 deletions(-)
diff --git a/tests/roof/graph.py b/tests/roof/graph.py
new file mode 100644
index 0000000..c34a3ed
--- /dev/null
+++ b/tests/roof/graph.py
@@ -0,0 +1,203 @@
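+"""Build and run the ROOF task graph on top of the UFO framework.
+
+RoofGraph assembles a Ufo.TaskGraph from the ROOF configuration: a reader
+(standard UFO files, network streams or simulated packets), optional
+flat/dark-field correction and reconstruction stages, and a writer.
+
+Minimal usage sketch (assuming RoofConfig parses the ROOF command line):
+
+    graph = RoofGraph()
+    graph.get()
+    graph.run()
+"""
+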
+import gi
+
+gi.require_version('Ufo', '0.0')
+from gi.repository import Ufo
+from gi.repository import GObject
+
+from roof.config import RoofConfig
+from roof.defaults import roof_filters, roof_data_types, roof_raw_data_types, roof_aux_data_types
+from roof.utils import get_filenames
+
+class RoofGraph(RoofConfig):
+ def __init__(self, config=None):
+ self.pm = Ufo.PluginManager()
+ self.graph = Ufo.TaskGraph()
+ self.scheduler = Ufo.Scheduler()
+ self.tasks = {}
+
+ super(RoofGraph, self).__init__()
+
+ def get_task(self, name, **kwargs):
+ task = self.pm.get_task(name)
+        task.set_properties(**kwargs)
+ return task
+
+ def save_task(self, stage, alias, task):
+ if stage is None: stage = "general"
+ if stage not in self.tasks: self.tasks[stage] = {}
+        self.tasks[stage][alias] = task
+ return task
+
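+    # ROOF-specific plugins (roof-read, roof-build, ...) receive the path of the
+    # ROOF configuration file so they can configure themselves from it.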
+ def get_roof_task(self, name, **kwargs):
+ kwargs.update(config = self.config_file)
+ return self.get_task(name, **kwargs)
+
+ def get_processor_task(self, stage, name, **kwargs):
+ extra_args = self.get_opt(stage, name + '-options')
+ if extra_args is not None: kwargs.update(extra_args)
+        if name.startswith('roof'): kwargs.update(config = self.config_file)
+ return self.save_task(stage, name, self.get_task(name, **kwargs))
+
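+    # The reader is either a standard UFO 'read' task (reconstruction from files on
+    # disk, optionally restricted to a single detector plane) or a fan-in of one
+    # 'roof-read' task per network stream feeding a single 'roof-build' task
+    # (live or simulated data).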
+ def get_reader(self):
+ first = self.get_opt('data', 'first_file_number', 1)
+ if self.args.read:
+ # Reconstruction from standard UFO files
+ path = self.get_roof_path(self.args.read)
+ step = 1
+ if (self.args.plane is not None) and (self.args.plane > 0):
+                first += self.args.plane - 1
+ step = self.planes
+
+ params = { 'path': path, 'first': first, 'step': step }
+ if self.args.number:
+ params['number'] = self.args.number
+
+ print ("Reading {} data from {}".format(self.args.read,path))
+ return self.get_task('read', **params)
+ else:
+ path = None
+ if self.args.simulate:
+ first = self.get_opt('simulation', 'first_file_number', first)
+ base_path = self.get_opt('simulation', 'base_path', self.path)
+ read_path = self.get_opt('simulation', self.args.write if self.args.write and self.args.write in roof_aux_data_types else 'data')
+ path = read_path if read_path.startswith('/') else base_path + '/' + read_path
+ print ("Simulating packets from {}".format(path))
+
+ # Reconstruction from network or simulated data (also generation of flat/dark-fields)
+ build_type = "raw" if self.args.noroof else "sino" if self.check_writer_type_is_raw() else "ufo"
+ build = self.get_roof_task('roof-build', simulate = self.args.simulate, number = self.args.number, build = build_type)
+            for stream_id in range(self.streams):
+                read = self.get_roof_task('roof-read', id = stream_id, simulate = self.args.simulate, path = path, first_file_number = first)
+ self.graph.connect_nodes(read, build)
+ build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT)
+
+ return build
+
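+    # The writer is a 'null' sink when no output path is configured, otherwise a
+    # standard UFO 'write' task streaming to the configured path.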
+ def get_writer(self):
+ path = self.get_writer_path()
+ if path is None:
+ print ("Starting ROOF using NULL writter")
+ write = self.get_task('null')
+ else:
+            # FIXME: If writing non-raw data, we may need to generate all-zero frames if something is broken/corrupted.
+ print ("Starting ROOF streaming to {}".format(path))
+ write = self.get_task('write', filename=path)
+ return write
+
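+    # Flat-field correction: dark and flat frames are read from disk, reduced to a
+    # single frame each (average, or median via stack + flatten) and wired to
+    # inputs 1 and 2 of 'flat-field-correct'; the projections enter on input 0.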
+ def get_correction_flat_field_correct(self, head):
+        # The standard UFO reconstruction stack distinguishes flat/dark-fields recorded before and after the experiment. We only handle the 'before experiment' part.
+ darks = self.get_roof_path('dark_fields')
+ n_darks = len(get_filenames(darks))
+ if n_darks == 0: raise FileNotFoundError("Dark fields are not found in {}".format(darks))
+        flats = self.get_roof_path('flat_fields')
+ n_flats = len(get_filenames(flats))
+ if n_flats == 0: raise FileNotFoundError("Flat fields are not found in {}".format(flats))
+ dark_reader = self.get_task('read', path = darks)
+ flat_reader = self.get_task('read', path = flats)
+
+        # We use the standard get_task here because this plugin is too generic to allow config-based customization
+ mode = self.get_opt('correction', 'aggregation', 'average')
+ if mode == 'median':
+ dark_stack = self.get_task('stack', number = n_darks)
+ dark_reduced = self.get_task('flatten', mode = 'median')
+ flat_stack = self.get_task('stack', number = n_flats)
+ flat_reduced = self.get_task('flatten', mode = 'median')
+
+ self.graph.connect_nodes(dark_reader, dark_stack)
+ self.graph.connect_nodes(dark_stack, dark_reduced)
+ self.graph.connect_nodes(flat_reader, flat_stack)
+ self.graph.connect_nodes(flat_stack, flat_reduced)
+ elif mode == 'average':
+ dark_reduced = self.get_task('average')
+ flat_reduced = self.get_task('average')
+ self.graph.connect_nodes(dark_reader, dark_reduced)
+ self.graph.connect_nodes(flat_reader, flat_reduced)
+ else:
+ raise ValueError('Invalid reduction mode')
+
+ ffc = self.get_task('flat-field-correct') # dark_scale=args.dark_scale, absorption_correct=args.absorptivity, fix_nan_and_inf=args.fix_nan_and_inf)
+ self.graph.connect_nodes_full(head, ffc, 0)
+ self.graph.connect_nodes_full(dark_reduced, ffc, 1)
+ self.graph.connect_nodes_full(flat_reduced, ffc, 2)
+ return ffc
+
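+    # Build the filter chain of one processing stage. The filters configured for the
+    # stage are instantiated in order; a filter with a matching get_<stage>_<filter>
+    # method is built by that method instead of a plain plugin. The chain is trimmed
+    # at the data type we read from (start) and the data type we write (end).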
+ def get_processor(self, head, stage, writer = None):
+ # skip (but not if not already skipped in previous processor)
+ # how to connect readers to ffc?
+
+ filters = self.get_opt(stage, 'filters', roof_filters[stage])
+ read_here = self.args.read and self.args.read in roof_data_types[stage].keys()
+ write_here = self.args.write and self.args.write in roof_data_types[stage].keys()
+
+ start_pos = 0
+ if read_here:
+ start_filter = roof_data_types[stage][self.args.read]
+ start_pos = filters.index(start_filter)
+
+ last_pos = len(filters)
+ if write_here:
+ stop_filter = roof_data_types[stage][self.args.write]
+ if stop_filter: last_pos = filters.index(stop_filter)
+
+        # Will just execute an empty range if we start reading from the end (e.g. 'fan-sinograms' in correction)
+        for i in range(start_pos, last_pos):
+            method = 'get_' + stage + '_' + filters[i].replace('-', '_')
+            if method in dir(self):
+                # Custom builders (e.g. get_correction_flat_field_correct) connect themselves to 'head'
+                f = getattr(self, method)(head)
+            else:
+                f = self.get_processor_task(stage, filters[i])
+                self.graph.connect_nodes(head, f)
+            head = f
+
+ if write_here and writer:
+ self.graph.connect_nodes(head, writer)
+
+ return None if write_here else head
+
+
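+    # Assemble the full graph: reader -> (optional copy branch to the writer for the
+    # control/GUI modes) -> optional plane filter -> correction -> reconstruction.
+    # Processing stops early once the requested data type has been written.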
+ def get(self):
+ reader = self.get_reader()
+ writer = self.get_writer()
+
+        # We support the following operation modes (selected by the modifiers -w, -c, -g):
+        #  - Record mode: Writing raw data (raw-sinograms, flat-fields, dark-fields) [ no modifier or -w <...> ]
+        #  - Write mode: The reconstruction is performed and data is written after the specified step (default) [ -w <all other data types> ]
+        #  - Control mode: Control branch and raw data writing [ -c ]
+        #  - GUI mode: Visualization in the GUI + raw sinograms are written when enabled in the GUI + some control tasks (also when enabled) [ -g ]
+
+ head = reader
+ # Check if we are branching here
+ if (self.args.track or self.args.gui) and (self.get_data_type() is not None):
+            # FIXME: In GUI mode we could add a 'write filter' here to pause/resume writing. An alternative is to pass a gobject flag to fastwriter (this would be limited to fastwriter then, which is likely OK)
+            # FIXME: we may need to convert at the end if we are writing raw data and the data is coming from net/simulation
+            # Otherwise (no branching), either we have already converted (in the reader) or we don't need to convert (writing raw data). Small performance penalty if we convert before the filter, but ....
+ copy = Ufo.CopyTask()
+ self.graph.connect_nodes(reader, copy)
+ self.graph.connect_nodes(copy, writer)
+ head = copy
+
+ # Sinograms are already filtered in the reader
+ if not self.args.read:
+ main_filter = self.get_task('roof-filter', plane = self.args.plane) if self.args.plane else None
+ if main_filter:
+ self.graph.connect_nodes(head, main_filter)
+ head = main_filter
+
+ class finish(Exception): pass
+ try:
+ if not self.args.read or self.args.read in roof_data_types['correction'].keys():
+ head = self.get_processor(head, 'correction', writer)
+ if not head: raise finish()
+
+ if head != reader or self.args.read in roof_data_types['reconstruction'].keys():
+ head = self.get_processor(head, 'reconstruction', writer)
+ if not head: raise finish()
+
+            # TODO: if head is still set, split it into 3 branches... Otherwise, continue with the control branch...
+ except finish:
+ pass
+
+ def run(self):
+ self.scheduler.run(self.graph)
+ \ No newline at end of file