1.. include:: ../../global.inc 2.. include:: manual_chapter_numbers.inc 3 4.. _new_manual.transform_in_parallel.code: 5 6###################################################################################################### 7|new_manual.transform_in_parallel.chapter_num|: Python Code for More on ``@transform``-ing data 8###################################################################################################### 9 10.. seealso:: 11 12 13 * :ref:`Manual Table of Contents <new_manual.table_of_contents>` 14 * :ref:`@transform syntax in detail <decorators.transform>` 15 * Back to |new_manual.transform_in_parallel.chapter_num|: :ref:`More on @transform-ing data and @originate <new_manual.transform_in_parallel>` 16 17******************************************* 18Producing several items / files per job 19******************************************* 20 21 :: 22 23 from ruffus import * 24 25 #--------------------------------------------------------------- 26 # Create pairs of input files 27 # 28 first_task_params = [ 29 ['job1.a.start', 'job1.b.start'], 30 ['job2.a.start', 'job2.b.start'], 31 ['job3.a.start', 'job3.b.start'], 32 ] 33 34 for input_file_pairs in first_task_params: 35 for input_file in input_file_pairs: 36 open(input_file, "w") 37 38 39 #--------------------------------------------------------------- 40 # 41 # first task 42 # 43 @transform(first_task_params, suffix(".start"), 44 [".output.1", 45 ".output.extra.1"], 46 "some_extra.string.for_example", 14) 47 def first_task(input_files, output_file_pair, 48 extra_parameter_str, extra_parameter_num): 49 for output_file in output_file_pair: 50 with open(output_file, "w"): 51 pass 52 53 54 #--------------------------------------------------------------- 55 # 56 # second task 57 # 58 @transform(first_task, suffix(".output.1"), ".output2") 59 def second_task(input_files, output_file): 60 with open(output_file, "w"): pass 61 62 63 #--------------------------------------------------------------- 64 # 65 # Run 
66 # 67 pipeline_run([second_task]) 68 69============================= 70Resulting Output 71============================= 72 73 .. code-block:: pycon 74 75 >>> pipeline_run([second_task]) 76 Job = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed 77 Job = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed 78 Job = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed 79 Completed Task = first_task 80 Job = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed 81 Job = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed 82 Job = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed 83 Completed Task = second_task 84 85 86 87******************************************* 88Defining task functions out of order 89******************************************* 90 91 .. code-block:: python 92 :emphasize-lines: 22 93 94 from ruffus import * 95 96 #--------------------------------------------------------------- 97 # Create pairs of input files 98 # 99 first_task_params = [ 100 ['job1.a.start', 'job1.b.start'], 101 ['job2.a.start', 'job2.b.start'], 102 ['job3.a.start', 'job3.b.start'], 103 ] 104 105 for input_file_pairs in first_task_params: 106 for input_file in input_file_pairs: 107 open(input_file, "w") 108 109 110 111 #--------------------------------------------------------------- 112 # 113 # second task defined first 114 # 115 # task name string wrapped in output_from(...)
116 @transform(output_from("first_task"), suffix(".output.1"), ".output2") 117 def second_task(input_files, output_file): 118 with open(output_file, "w"): pass 119 120 121 #--------------------------------------------------------------- 122 # 123 # first task 124 # 125 @transform(first_task_params, suffix(".start"), 126 [".output.1", 127 ".output.extra.1"], 128 "some_extra.string.for_example", 14) 129 def first_task(input_files, output_file_pair, 130 extra_parameter_str, extra_parameter_num): 131 for output_file in output_file_pair: 132 with open(output_file, "w"): 133 pass 134 135 136 #--------------------------------------------------------------- 137 # 138 # Run 139 # 140 pipeline_run([second_task]) 141 142 143============================= 144Resulting Output 145============================= 146 147 .. code-block:: pycon 148 149 >>> pipeline_run([second_task]) 150 Job = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed 151 Job = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed 152 Job = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed 153 Completed Task = first_task 154 Job = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed 155 Job = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed 156 Job = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed 157 Completed Task = second_task 158 159.. _new_manual.transform.multiple_dependencies.code: 160 161******************************************* 162Multiple dependencies 163******************************************* 164 165 .. 
code-block:: python 166 :emphasize-lines: 63 167 168 from ruffus import * 169 import time 170 import random 171 172 #--------------------------------------------------------------- 173 # Create pairs of input files 174 # 175 first_task_params = [ 176 ['job1.a.start', 'job1.b.start'], 177 ['job2.a.start', 'job2.b.start'], 178 ['job3.a.start', 'job3.b.start'], 179 ] 180 second_task_params = [ 181 ['job4.a.start', 'job4.b.start'], 182 ['job5.a.start', 'job5.b.start'], 183 ['job6.a.start', 'job6.b.start'], 184 ] 185 186 for input_file_pairs in first_task_params + second_task_params: 187 for input_file in input_file_pairs: 188 open(input_file, "w") 189 190 191 192 #--------------------------------------------------------------- 193 # 194 # first task 195 # 196 @transform(first_task_params, suffix(".start"), 197 [".output.1", 198 ".output.extra.1"], 199 "some_extra.string.for_example", 14) 200 def first_task(input_files, output_file_pair, 201 extra_parameter_str, extra_parameter_num): 202 for output_file in output_file_pair: 203 with open(output_file, "w"): 204 pass 205 time.sleep(random.random()) 206 207 208 209 #--------------------------------------------------------------- 210 # 211 # second task 212 # 213 @transform(second_task_params, suffix(".start"), 214 [".output.1", 215 ".output.extra.1"], 216 "some_extra.string.for_example", 14) 217 def second_task(input_files, output_file_pair, 218 extra_parameter_str, extra_parameter_num): 219 for output_file in output_file_pair: 220 with open(output_file, "w"): 221 pass 222 time.sleep(random.random()) 223 224 225 #--------------------------------------------------------------- 226 # 227 # third task 228 # 229 # depends on both first_task() and second_task() 230 @transform([first_task, second_task], suffix(".output.1"), ".output2") 231 def third_task(input_files, output_file): 232 with open(output_file, "w"): pass 233 234 235 #--------------------------------------------------------------- 236 # 237 # Run 238 # 239
pipeline_run([third_task], multiprocess = 6) 240 241============================= 242Resulting Output 243============================= 244 245 .. code-block:: pycon 246 247 >>> pipeline_run([third_task], multiprocess = 6) 248 Job = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed 249 Job = [[job6.a.start, job6.b.start] -> [job6.a.output.1, job6.a.output.extra.1], some_extra.string.for_example, 14] completed 250 Job = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed 251 Job = [[job4.a.start, job4.b.start] -> [job4.a.output.1, job4.a.output.extra.1], some_extra.string.for_example, 14] completed 252 Job = [[job5.a.start, job5.b.start] -> [job5.a.output.1, job5.a.output.extra.1], some_extra.string.for_example, 14] completed 253 Completed Task = second_task 254 Job = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed 255 Completed Task = first_task 256 Job = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed 257 Job = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed 258 Job = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed 259 Job = [[job4.a.output.1, job4.a.output.extra.1] -> job4.a.output2] completed 260 Job = [[job5.a.output.1, job5.a.output.extra.1] -> job5.a.output2] completed 261 Job = [[job6.a.output.1, job6.a.output.extra.1] -> job6.a.output2] completed 262 Completed Task = third_task 263 264 265******************************************* 266Multiple dependencies after @follows 267******************************************* 268 269 .. 
code-block:: python 270 :emphasize-lines: 46 271 272 from ruffus import * 273 import time 274 import random 275 276 #--------------------------------------------------------------- 277 # Create pairs of input files 278 # 279 first_task_params = [ 280 ['job1.a.start', 'job1.b.start'], 281 ['job2.a.start', 'job2.b.start'], 282 ['job3.a.start', 'job3.b.start'], 283 ] 284 second_task_params = [ 285 ['job4.a.start', 'job4.b.start'], 286 ['job5.a.start', 'job5.b.start'], 287 ['job6.a.start', 'job6.b.start'], 288 ] 289 290 for input_file_pairs in first_task_params + second_task_params: 291 for input_file in input_file_pairs: 292 open(input_file, "w") 293 294 295 296 #--------------------------------------------------------------- 297 # 298 # first task 299 # 300 @transform(first_task_params, suffix(".start"), 301 [".output.1", 302 ".output.extra.1"], 303 "some_extra.string.for_example", 14) 304 def first_task(input_files, output_file_pair, 305 extra_parameter_str, extra_parameter_num): 306 for output_file in output_file_pair: 307 with open(output_file, "w"): 308 pass 309 time.sleep(random.random()) 310 311 312 313 #--------------------------------------------------------------- 314 # 315 # second task 316 # 317 @follows("first_task") 318 @transform(second_task_params, suffix(".start"), 319 [".output.1", 320 ".output.extra.1"], 321 "some_extra.string.for_example", 14) 322 def second_task(input_files, output_file_pair, 323 extra_parameter_str, extra_parameter_num): 324 for output_file in output_file_pair: 325 with open(output_file, "w"): 326 pass 327 time.sleep(random.random()) 328 329 330 #--------------------------------------------------------------- 331 # 332 # third task 333 # 334 # depends on both first_task() and second_task() 335 @transform([first_task, second_task], suffix(".output.1"), ".output2") 336 def third_task(input_files, output_file): 337 with open(output_file, "w"): pass 338 339 340 #--------------------------------------------------------------- 341 #
342 # Run 343 # 344 pipeline_run([third_task], multiprocess = 6) 345 346======================================================================================= 347Resulting Output: ``first_task`` completes before ``second_task`` 348======================================================================================= 349 350 .. code-block:: pycon 351 352 >>> pipeline_run([third_task], multiprocess = 6) 353 Job = [[job2.a.start, job2.b.start] -> [job2.a.output.1, job2.a.output.extra.1], some_extra.string.for_example, 14] completed 354 Job = [[job3.a.start, job3.b.start] -> [job3.a.output.1, job3.a.output.extra.1], some_extra.string.for_example, 14] completed 355 Job = [[job1.a.start, job1.b.start] -> [job1.a.output.1, job1.a.output.extra.1], some_extra.string.for_example, 14] completed 356 Completed Task = first_task 357 Job = [[job4.a.start, job4.b.start] -> [job4.a.output.1, job4.a.output.extra.1], some_extra.string.for_example, 14] completed 358 Job = [[job6.a.start, job6.b.start] -> [job6.a.output.1, job6.a.output.extra.1], some_extra.string.for_example, 14] completed 359 Job = [[job5.a.start, job5.b.start] -> [job5.a.output.1, job5.a.output.extra.1], some_extra.string.for_example, 14] completed 360 Completed Task = second_task 361 Job = [[job1.a.output.1, job1.a.output.extra.1] -> job1.a.output2] completed 362 Job = [[job2.a.output.1, job2.a.output.extra.1] -> job2.a.output2] completed 363 Job = [[job3.a.output.1, job3.a.output.extra.1] -> job3.a.output2] completed 364 Job = [[job4.a.output.1, job4.a.output.extra.1] -> job4.a.output2] completed 365 Job = [[job5.a.output.1, job5.a.output.extra.1] -> job5.a.output2] completed 366 Job = [[job6.a.output.1, job6.a.output.extra.1] -> job6.a.output2] completed 367 Completed Task = third_task 368