1.. include:: ../../global.inc
2.. include:: manual_chapter_numbers.inc
3
4.. _new_manual.transform_in_parallel.code:
5
6######################################################################################################
7|new_manual.transform_in_parallel.chapter_num|: Python Code for More on ``@transform``-ing data
8######################################################################################################
9
10.. seealso::
11
12
13    * :ref:`Manual Table of Contents <new_manual.table_of_contents>`
14    * :ref:`@transform syntax in detail <decorators.transform>`
15    * Back to |new_manual.transform_in_parallel.chapter_num|: :ref:`More on @transform-ing data and @originate <new_manual.transform_in_parallel>`
16
17*******************************************
18Producing several items / files per job
19*******************************************
20
21    ::
22
23        from ruffus import *
24
25        #---------------------------------------------------------------
26        #   Create pairs of input files
27        #
28        first_task_params = [
29                             ['job1.a.start', 'job1.b.start'],
30                             ['job2.a.start', 'job2.b.start'],
31                             ['job3.a.start', 'job3.b.start'],
32                            ]
33
34        for input_file_pairs in first_task_params:
35            for input_file in input_file_pairs:
36                open(input_file, "w")
37
38
39        #---------------------------------------------------------------
40        #
41        #   first task
42        #
43        @transform(first_task_params, suffix(".start"),
44                                [".output.1",
45                                 ".output.extra.1"],
46                               "some_extra.string.for_example", 14)
47        def first_task(input_files, output_file_pair,
48                        extra_parameter_str, extra_parameter_num):
49            for output_file in output_file_pair:
50                with open(output_file, "w"):
51                    pass
52
53
54        #---------------------------------------------------------------
55        #
56        #   second task
57        #
58        @transform(first_task, suffix(".output.1"), ".output2")
59        def second_task(input_files, output_file):
60            with open(output_file, "w"): pass
61
62
63        #---------------------------------------------------------------
64        #
65        #       Run
66        #
67        pipeline_run([second_task])
68
69=============================
70Resulting Output
71=============================
72
73        ::
74
75            >>> pipeline_run([second_task])
76                Job  = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed
77                Job  = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed
78                Job  = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed
79            Completed Task = first_task
80                Job  = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed
81                Job  = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed
82                Job  = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed
83            Completed Task = second_task
84
85
86
87*******************************************
Defining task functions out of order
89*******************************************
90
91    .. code-block:: python
       :emphasize-lines: 23
93
94        from ruffus import *
95
96        #---------------------------------------------------------------
97        #   Create pairs of input files
98        #
99        first_task_params = [
100                             ['job1.a.start', 'job1.b.start'],
101                             ['job2.a.start', 'job2.b.start'],
102                             ['job3.a.start', 'job3.b.start'],
103                            ]
104
105        for input_file_pairs in first_task_params:
106            for input_file in input_file_pairs:
107                open(input_file, "w")
108
109
110
111        #---------------------------------------------------------------
112        #
113        #   second task defined first
114        #
115        #   task name string wrapped in output_from(...)
116        @transform(output_from("first_task"), suffix(".output.1"), ".output2")
117        def second_task(input_files, output_file):
118            with open(output_file, "w"): pass
119
120
121        #---------------------------------------------------------------
122        #
123        #   first task
124        #
125        @transform(first_task_params, suffix(".start"),
126                                [".output.1",
127                                 ".output.extra.1"],
128                               "some_extra.string.for_example", 14)
129        def first_task(input_files, output_file_pair,
130                        extra_parameter_str, extra_parameter_num):
131            for output_file in output_file_pair:
132                with open(output_file, "w"):
133                    pass
134
135
136        #---------------------------------------------------------------
137        #
138        #       Run
139        #
140        pipeline_run([second_task])
141
142
143=============================
144Resulting Output
145=============================
146
147        .. code-block:: pycon
148
149            >>> pipeline_run([second_task])
150                Job  = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed
151                Job  = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed
152                Job  = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed
153            Completed Task = first_task
154                Job  = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed
155                Job  = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed
156                Job  = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed
157            Completed Task = second_task
158
159.. _new_manual.transform.multiple_dependencies.code:
160
161*******************************************
162Multiple dependencies
163*******************************************
164
165    .. code-block:: python
       :emphasize-lines: 63
167
168        from ruffus import *
169        import time
170        import random
171
172        #---------------------------------------------------------------
173        #   Create pairs of input files
174        #
175        first_task_params = [
176                             ['job1.a.start', 'job1.b.start'],
177                             ['job2.a.start', 'job2.b.start'],
178                             ['job3.a.start', 'job3.b.start'],
179                            ]
180        second_task_params = [
181                             ['job4.a.start', 'job4.b.start'],
182                             ['job5.a.start', 'job5.b.start'],
183                             ['job6.a.start', 'job6.b.start'],
184                            ]
185
186        for input_file_pairs in first_task_params + second_task_params:
187            for input_file in input_file_pairs:
188                open(input_file, "w")
189
190
191
192        #---------------------------------------------------------------
193        #
194        #   first task
195        #
196        @transform(first_task_params, suffix(".start"),
197                                [".output.1",
198                                 ".output.extra.1"],
199                               "some_extra.string.for_example", 14)
200        def first_task(input_files, output_file_pair,
201                        extra_parameter_str, extra_parameter_num):
202            for output_file in output_file_pair:
203                with open(output_file, "w"):
204                    pass
205            time.sleep(random.random())
206
207
208
209        #---------------------------------------------------------------
210        #
211        #   second task
212        #
213        @transform(second_task_params, suffix(".start"),
214                                [".output.1",
215                                 ".output.extra.1"],
216                               "some_extra.string.for_example", 14)
217        def second_task(input_files, output_file_pair,
218                        extra_parameter_str, extra_parameter_num):
219            for output_file in output_file_pair:
220                with open(output_file, "w"):
221                    pass
222            time.sleep(random.random())
223
224
225        #---------------------------------------------------------------
226        #
227        #   third task
228        #
229        #       depends on both first_task() and second_task()
230        @transform([first_task, second_task], suffix(".output.1"), ".output2")
231        def third_task(input_files, output_file):
232            with open(output_file, "w"): pass
233
234
235        #---------------------------------------------------------------
236        #
237        #       Run
238        #
239        pipeline_run([third_task], multiprocess = 6)
240
241=============================
242Resulting Output
243=============================
244
245    .. code-block:: pycon
246
247        >>> pipeline_run([third_task], multiprocess = 6)
248            Job  = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed
249            Job  = [[job6.a.start, job6.b.start] -> [job6.a.output.1, job6.a.output.extra.1], some_extra.string.for_example, 14] completed
250            Job  = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed
251            Job  = [[job4.a.start, job4.b.start] -> [job4.a.output.1, job4.a.output.extra.1], some_extra.string.for_example, 14] completed
252            Job  = [[job5.a.start, job5.b.start] -> [job5.a.output.1, job5.a.output.extra.1], some_extra.string.for_example, 14] completed
253        Completed Task = second_task
254            Job  = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed
255        Completed Task = first_task
256            Job  = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed
257            Job  = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed
258            Job  = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed
259            Job  = [[job4.a.output.1, job4.a.output.extra.1] -> job4.a.output2] completed
260            Job  = [[job5.a.output.1, job5.a.output.extra.1] -> job5.a.output2] completed
261            Job  = [[job6.a.output.1, job6.a.output.extra.1] -> job6.a.output2] completed
262        Completed Task = third_task
263
264
265*******************************************
266Multiple dependencies after @follows
267*******************************************
268
269    .. code-block:: python
       :emphasize-lines: 46
271
272        from ruffus import *
273        import time
274        import random
275
276        #---------------------------------------------------------------
277        #   Create pairs of input files
278        #
279        first_task_params = [
280                             ['job1.a.start', 'job1.b.start'],
281                             ['job2.a.start', 'job2.b.start'],
282                             ['job3.a.start', 'job3.b.start'],
283                            ]
284        second_task_params = [
285                             ['job4.a.start', 'job4.b.start'],
286                             ['job5.a.start', 'job5.b.start'],
287                             ['job6.a.start', 'job6.b.start'],
288                            ]
289
290        for input_file_pairs in first_task_params + second_task_params:
291            for input_file in input_file_pairs:
292                open(input_file, "w")
293
294
295
296        #---------------------------------------------------------------
297        #
298        #   first task
299        #
300        @transform(first_task_params, suffix(".start"),
301                                [".output.1",
302                                 ".output.extra.1"],
303                               "some_extra.string.for_example", 14)
304        def first_task(input_files, output_file_pair,
305                        extra_parameter_str, extra_parameter_num):
306            for output_file in output_file_pair:
307                with open(output_file, "w"):
308                    pass
309            time.sleep(random.random())
310
311
312
313        #---------------------------------------------------------------
314        #
315        #   second task
316        #
317        @follows("first_task")
318        @transform(second_task_params, suffix(".start"),
319                                [".output.1",
320                                 ".output.extra.1"],
321                               "some_extra.string.for_example", 14)
322        def second_task(input_files, output_file_pair,
323                        extra_parameter_str, extra_parameter_num):
324            for output_file in output_file_pair:
325                with open(output_file, "w"):
326                    pass
327            time.sleep(random.random())
328
329
330        #---------------------------------------------------------------
331        #
332        #   third task
333        #
334        #       depends on both first_task() and second_task()
335        @transform([first_task, second_task], suffix(".output.1"), ".output2")
336        def third_task(input_files, output_file):
337            with open(output_file, "w"): pass
338
339
340        #---------------------------------------------------------------
341        #
342        #       Run
343        #
344        pipeline_run([third_task], multiprocess = 6)
345
346=======================================================================================
347Resulting Output: ``first_task`` completes before ``second_task``
348=======================================================================================
349
350    .. code-block:: pycon
351
352        >>> pipeline_run([third_task], multiprocess = 6)
353            Job  = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed
354            Job  = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed
355            Job  = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed
356        Completed Task = first_task
357            Job  = [[job4.a.start, job4.b.start] -> [job4.a.output.1, job4.a.output.extra.1], some_extra.string.for_example, 14] completed
358            Job  = [[job6.a.start, job6.b.start] -> [job6.a.output.1, job6.a.output.extra.1], some_extra.string.for_example, 14] completed
359            Job  = [[job5.a.start, job5.b.start] -> [job5.a.output.1, job5.a.output.extra.1], some_extra.string.for_example, 14] completed
360        Completed Task = second_task
361            Job  = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed
362            Job  = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed
363            Job  = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed
364            Job  = [[job4.a.output.1, job4.a.output.extra.1] -> job4.a.output2] completed
365            Job  = [[job5.a.output.1, job5.a.output.extra.1] -> job5.a.output2] completed
            Job  = [[job6.a.output.1, job6.a.output.extra.1] -> job6.a.output2] completed
        Completed Task = third_task
367