summaryrefslogtreecommitdiffstats
path: root/tests/roof/graph.py
blob: c34a3ed61059feb94c1c280ab470bcf153f12906 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import re
import gi

gi.require_version('Ufo', '0.0') 
from gi.repository import Ufo
from gi.repository import GObject

from roof.config import RoofConfig
from roof.defaults import roof_filters, roof_data_types, roof_raw_data_types, roof_aux_data_types
from roof.utils import get_filenames

class RoofGraph(RoofConfig):
    def __init__(self, config=None):
        """Create the UFO objects used to assemble and execute the pipeline.

        A plugin manager, an empty task graph and a scheduler are instantiated,
        the registry of saved tasks starts out empty, and finally the
        base-class initialiser is run without arguments.

        NOTE(review): ``config`` is accepted but neither used nor forwarded to
        the base class here -- confirm whether that is intentional.
        """
        self.pm, self.graph, self.scheduler = Ufo.PluginManager(), Ufo.TaskGraph(), Ufo.Scheduler()
        # Registry filled by save_task(): {stage: {alias: task}}
        self.tasks = {}

        super().__init__()

    def get_task(self, name, **kwargs):
        task = self.pm.get_task(name)
        task.set_properties(name, **kwargs)
        return task

    def save_task(self, stage, alias, task):
        if stage is None: stage = "general"
        if stage not in self.tasks: self.tasks[stage] = {}
        self.tasks[stage][alias if alias is not None else name] = task
        return task

    def get_roof_task(self, name, **kwargs):
        kwargs.update(config = self.config_file)
        return self.get_task(name, **kwargs)
    
    def get_processor_task(self, stage, name, **kwargs):
        extra_args = self.get_opt(stage, name + '-options')
        if extra_args is not None: kwargs.update(extra_args)
        if (re.compile('roof').match(name)): kwargs.update(config = self.config_file)
        return self.save_task(stage, name, self.get_task(name, **kwargs))
    
    def get_reader(self):
        """Create the source end of the pipeline and return its last task.

        When ``self.args.read`` is set, a single ``read`` task over the path
        returned by ``get_roof_path`` is created and returned.  Otherwise a
        ``roof-build`` task is created, one ``roof-read`` task per stream is
        connected to it, and the ``roof-build`` task is returned.
        """
        first_file = self.get_opt('data', 'first_file_number', 1)

        if self.args.read:
            # Reconstruction from standard UFO files
            source = self.get_roof_path(self.args.read)
            stride = 1
            plane = self.args.plane
            if plane is not None and plane > 0:
                # Offset the first file by (plane - 1) and step by self.planes
                first_file += plane - 1
                stride = self.planes

            reader_opts = {'path': source, 'first': first_file, 'step': stride}
            if self.args.number:
                reader_opts['number'] = self.args.number

            print ("Reading {} data from {}".format(self.args.read, source))
            return self.get_task('read', **reader_opts)

        source = None
        if self.args.simulate:
            first_file = self.get_opt('simulation', 'first_file_number', first_file)
            base_path = self.get_opt('simulation', 'base_path', self.path)
            requested = self.args.write
            # Auxiliary data types have their own option key; everything else uses 'data'
            option = requested if requested and requested in roof_aux_data_types else 'data'
            read_path = self.get_opt('simulation', option)
            if read_path.startswith('/'):
                source = read_path
            else:
                source = base_path + '/' + read_path
            print ("Simulating packets from {}".format(source))

        # Reconstruction from network or simulated data (also generation of flat/dark-fields)
        if self.args.noroof:
            build_type = "raw"
        elif self.check_writer_type_is_raw():
            build_type = "sino"
        else:
            build_type = "ufo"

        build = self.get_roof_task('roof-build', simulate = self.args.simulate, number = self.args.number, build = build_type)
        for stream in range(self.streams):
            read = self.get_roof_task('roof-read', id = stream, simulate = self.args.simulate, path = source, first_file_number = first_file)
            self.graph.connect_nodes(read, build)
            # Bind the 'stop' property of the builder to the 'stop' property of the reader
            build.bind_property('stop', read, 'stop', GObject.BindingFlags.DEFAULT)

        return build

    def get_writer(self):
        path = self.get_writer_path()
        if path is None:
            print ("Starting ROOF using NULL writter")
            write = self.get_task('null')
        else:
            # FIXME: If writting non raw data, we may need to generate all-0-frames if something broken/corrupted.
            print ("Starting ROOF streaming to {}".format(path))
            write = self.get_task('write', filename=path)
        return write

    def get_correction_flat_field_correct(self, head):
        """Attach a ``flat-field-correct`` task behind ``head`` and return it.

        Dark and flat fields are read from the locations returned by
        ``get_roof_path`` and reduced to a single frame each, either by
        averaging or by stacking and taking the median, as selected by the
        ``aggregation`` option of the ``correction`` section (default
        ``'average'``).

        The standard UFO reconstruction stack distinguishes flat/dark fields
        recorded before and after the experiment; only the 'before experiment'
        part is done here.

        Args:
            head: Task whose output is connected to input 0 of the correction.

        Returns:
            The ``flat-field-correct`` task (input 1: reduced darks,
            input 2: reduced flats).

        Raises:
            FileNotFoundError: If no dark-field or no flat-field files exist.
            ValueError: If the aggregation mode is neither ``'median'`` nor
                ``'average'``.
        """
        darks = self.get_roof_path('dark_fields')
        n_darks = len(get_filenames(darks))
        if n_darks == 0:
            raise FileNotFoundError("Dark fields are not found in {}".format(darks))

        # Was misspelled as 'falt_fields', which does not match the 'dark_fields' naming
        flats = self.get_roof_path('flat_fields')
        n_flats = len(get_filenames(flats))
        if n_flats == 0:
            raise FileNotFoundError("Flat fields are not found in {}".format(flats))

        dark_reader = self.get_task('read', path = darks)
        flat_reader = self.get_task('read', path = flats)

        # We are using standard get_task here because these plugins are too generic to allow config-based customization
        mode = self.get_opt('correction', 'aggregation', 'average')
        if mode not in ('median', 'average'):
            raise ValueError('Invalid reduction mode')

        def reduce_frames(reader, count):
            # Collapse all frames delivered by `reader` into a single one
            if mode == 'median':
                stack = self.get_task('stack', number = count)
                reduced = self.get_task('flatten', mode = 'median')
                self.graph.connect_nodes(reader, stack)
                self.graph.connect_nodes(stack, reduced)
            else:
                reduced = self.get_task('average')
                self.graph.connect_nodes(reader, reduced)
            return reduced

        dark_reduced = reduce_frames(dark_reader, n_darks)
        flat_reduced = reduce_frames(flat_reader, n_flats)

        # NOTE: dark_scale, absorption_correct and fix_nan_and_inf are not set here
        ffc = self.get_task('flat-field-correct')
        self.graph.connect_nodes_full(head, ffc, 0)
        self.graph.connect_nodes_full(dark_reduced, ffc, 1)
        self.graph.connect_nodes_full(flat_reduced, ffc, 2)
        return ffc

    def get_processor(self, head, stage, writer = None):
        """Append the filters of ``stage`` to the graph, starting at ``head``.

        The list of filters comes from the ``filters`` option of the stage
        (default ``roof_filters[stage]``).  When the data type being read
        belongs to this stage, processing starts at the filter mapped to that
        type; when the data type being written belongs to this stage,
        processing stops before the filter mapped to that type and the last
        task is connected to ``writer`` (if one is given).

        Each filter is built by a dedicated ``get_<stage>_<filter>`` method
        when the class provides one (dashes in the filter name become
        underscores); otherwise a generic task is created through
        ``get_processor_task`` and connected behind the current head.

        Args:
            head: Task the first filter is attached to.
            stage: Stage name, used as key into ``roof_filters`` and
                ``roof_data_types``.
            writer: Optional task receiving the output if writing happens in
                this stage.

        Returns:
            ``None`` if the data is written in this stage, otherwise the last
            task of the stage (``head`` itself when no filter was added).

        Raises:
            ValueError: From ``list.index`` when the start/stop filter is not
                in the configured filter list.
        """
        # TODO: skip filters (but not if already skipped in a previous processor)
        # TODO: how to connect readers to ffc?

        filters = self.get_opt(stage, 'filters', roof_filters[stage])
        stage_types = roof_data_types[stage]
        read_here = self.args.read and self.args.read in stage_types
        write_here = self.args.write and self.args.write in stage_types

        start_pos = 0
        if read_here:
            start_pos = filters.index(stage_types[self.args.read])

        last_pos = len(filters)
        if write_here:
            stop_filter = stage_types[self.args.write]
            if stop_filter:
                last_pos = filters.index(stop_filter)

        # Will just execute empty range if we start reading from the end (e.g. 'fan-sinograms' in correction)
        for filter_name in filters[start_pos:last_pos]:
            builder = getattr(self, 'get_' + stage + '_' + filter_name.replace('-', '_'), None)
            if builder is not None:
                # Dedicated builders wire themselves to the current head
                task = builder(head)
            else:
                # Previously used undefined names `pos` and `graph` here (NameError)
                task = self.get_processor_task(stage, filter_name)
                self.graph.connect_nodes(head, task)
            head = task

        if write_here and writer:
            self.graph.connect_nodes(head, writer)

        return None if write_here else head


    def get(self):
        """Assemble the task graph between the reader and the writer.

        Creates the reader and writer, optionally inserts a copy task that
        also feeds the writer, optionally inserts a ``roof-filter`` task, and
        then runs the 'correction' and 'reconstruction' processors in turn.
        Assembly stops as soon as a processor returns a false value.  Nothing
        is returned.
        """
        reader = self.get_reader()
        writer = self.get_writer()

        # We support the following operation modes (defined by modifiers -w -c -g)
        # - Record mode: Writing raw data (raw-sinograms, flat-fields, dark-fields)                                                  [ no modifier or -w <...> ]
        # - Write mode: The reconstruction is performed and data is written after the specified step (default)                       [ -w <all other data types> ]
        # - Control mode: Control branch and raw data writing                                                                        [ -c ]
        # - GUI mode: Visualization in GUI + raw_sinograms are written when enabled in GUI + some control tasks (also when enabled)  [ -g ]

        head = reader

        # Check if we are branching here
        branching = (self.args.track or self.args.gui) and (self.get_data_type() is not None)
        if branching:
            # FIXME: In GUI mode we can add a 'write filter' here to pause/resume writing. The alternative is to pass a GObject flag to fastwriter (limited to fastwriter then, which is likely OK)
            # FIXME: We may need to convert in the end if we are writing raw data and the data is coming from net/simulation
            # Otherwise (no branch), either we have already converted (in the reader) or we don't need to convert (writing raw data). Small performance penalty if we convert before the filter, but ....
            copy = Ufo.CopyTask()
            self.graph.connect_nodes(reader, copy)
            self.graph.connect_nodes(copy, writer)
            head = copy

        # Sinograms are already filtered in the reader
        if not self.args.read and self.args.plane:
            main_filter = self.get_task('roof-filter', plane = self.args.plane)
            if main_filter:
                self.graph.connect_nodes(head, main_filter)
                head = main_filter

        if not self.args.read or self.args.read in roof_data_types['correction'].keys():
            head = self.get_processor(head, 'correction', writer)
            if not head:
                # Nothing left to extend after this stage
                return

        if head != reader or self.args.read in roof_data_types['reconstruction'].keys():
            head = self.get_processor(head, 'reconstruction', writer)
            if not head:
                return

        # TODO: split head into 3 branches here.... Otherwise, continue with the control branch...
        
    def run(self):
        self.scheduler.run(self.graph)