1#!/usr/local/bin/python3.8 2from __future__ import print_function 3import json 4from collections import defaultdict 5import re 6import time 7import shutil 8import unittest 9from ruffus import pipeline_run, pipeline_printout, Pipeline, transform, follows, posttask, merge, \ 10 mkdir, suffix, originate, regex, inputs, jobs_limit, files 11import sys 12 13""" 14 15 branching.py 16 17 test branching dependencies 18 19""" 20 21 22import os 23tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/" 24 25# add grandparent to search path for testing 26grandparent_dir = os.path.abspath( 27 os.path.join(os.path.dirname(__file__), "..", "..")) 28sys.path.insert(0, grandparent_dir) 29 30# module name = script name without extension 31module_name = os.path.splitext(os.path.basename(__file__))[0] 32 33 34# funky code to import by file name 35parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) 36 37 38# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 39 40# imports 41 42 43# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 44 45try: 46 from StringIO import StringIO 47except: 48 from io import StringIO 49 50# use simplejson in place of json for python < 2.6 51# try: 52# import json 53# except ImportError: 54# import simplejson 55# json = simplejson 56 57# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 58 59# Functions 60 61 62# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 63 64def check_job_io(infiles, outfiles, extra_params): 65 """ 66 cat input files content to output files 67 after writing out job parameters 68 """ 69 # dump parameters 70 params = (infiles, outfiles) + extra_params 71 72 if isinstance(infiles, str): 73 infiles = [infiles] 74 elif infiles is None: 75 infiles = [] 76 if isinstance(outfiles, str): 77 outfiles = [outfiles] 78 output_text = list() 79 for f in infiles: 80 with open(f) as ii: 81 output_text.append(ii.read()) 82 output_text = "".join(sorted(output_text)) 83 output_text += json.dumps(infiles) + " -> " + json.dumps(outfiles) + "\n" 84 for f in outfiles: 85 with open(f, "w") as oo: 86 oo.write(output_text) 87 88 89# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 90 91# Main logic 92 93 94# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 95 96 97# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 98 99# Tasks 100 101 102# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 103# 104# 1 -> 2 -> 3 -> 105# -> 4 -> 106# 5 -> 6 107# 108 109def do_write(file_name, what): 110 with open(file_name, "a") as oo: 111 oo.write(what) 112 113 114test_file = tempdir + "task.done" 115# 116# task1 117# 118 119 120@originate([tempdir + d for d in ('a.1', 'b.1', 'c.1')]) 121@follows(mkdir(tempdir)) 122@posttask(lambda: do_write(test_file, "Task 1 Done\n")) 123def task1(outfile, *extra_params): 124 """ 125 First task 126 """ 127 with open(tempdir + "jobs.start", "a") as oo: 128 oo.write('job = %s\n' % json.dumps([None, outfile])) 129 check_job_io(None, outfile, extra_params) 130 with open(tempdir + "jobs.finish", "a") as oo: 131 oo.write('job = %s\n' % json.dumps([None, outfile])) 132 133 134# 135# task2 136# 137@posttask(lambda: do_write(test_file, "Task 2 Done\n")) 138@transform(task1, suffix(".1"), ".2") 139def task2(infiles, outfiles, *extra_params): 140 """ 141 Second task 142 """ 143 with open(tempdir + "jobs.start", "a") as oo: 144 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 145 check_job_io(infiles, outfiles, extra_params) 146 with open(tempdir + "jobs.finish", "a") as oo: 147 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 148 149 150# 151# task3 152# 153@transform(task2, regex('(.*).2'), inputs([r"\1.2", tempdir + "a.1"]), r'\1.3') 154@posttask(lambda: do_write(test_file, "Task 3 Done\n")) 155def task3(infiles, outfiles, *extra_params): 156 """ 157 Third task 158 """ 159 with open(tempdir + "jobs.start", "a") as oo: 160 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 161 check_job_io(infiles, outfiles, extra_params) 162 with open(tempdir + "jobs.finish", "a") as oo: 163 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 164 165 166# 167# task4 168# 169@jobs_limit(1) 170@transform(tempdir + "*.1", suffix(".1"), ".4") 171@follows(task1) 172@posttask(lambda: do_write(test_file, "Task 4 Done\n")) 173def task4(infiles, outfiles, *extra_params): 174 """ 175 Fourth task is extra slow 176 """ 177 with open(tempdir + "jobs.start", "a") as oo: 178 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 179 time.sleep(0.1) 180 check_job_io(infiles, outfiles, extra_params) 181 with open(tempdir + "jobs.finish", "a") as oo: 182 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 183 184# 185# task5 186# 187 188 189@files(None, tempdir + 'a.5') 190@follows(mkdir(tempdir)) 191@posttask(lambda: do_write(test_file, "Task 5 Done\n")) 192def task5(infiles, outfiles, *extra_params): 193 """ 194 Fifth task is extra slow 195 """ 196 with open(tempdir + "jobs.start", "a") as oo: 197 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 198 time.sleep(1) 199 check_job_io(infiles, outfiles, extra_params) 200 with open(tempdir + "jobs.finish", "a") as oo: 201 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 202 203# 204# task6 205# 206# @files([[[tempdir + d for d in 'a.3', 'b.3', 'c.3', 'a.4', 'b.4', 'c.4', 'a.5'], tempdir + 'final.6']]) 207 208 209@merge([task3, task4, task5], tempdir + "final.6") 210@follows(task3, task4, task5, ) 211@posttask(lambda: do_write(test_file, "Task 6 Done\n")) 212def task6(infiles, outfiles, *extra_params): 213 """ 214 final task 215 """ 216 with open(tempdir + "jobs.start", "a") as oo: 217 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 218 check_job_io(infiles, outfiles, extra_params) 219 with open(tempdir + "jobs.finish", "a") as oo: 220 oo.write('job = %s\n' % json.dumps([infiles, outfiles])) 221 222 223# 224# Use equivalent but new sytle syntax 225# 226test_pipeline = Pipeline("test") 227 228test_pipeline.originate(task_func=task1, 229 output=[tempdir + d for d in ('a.1', 'b.1', 'c.1')])\ 230 .follows(mkdir(tempdir))\ 231 .posttask(lambda: do_write(test_file, "Task 1 Done\n")) 232 233test_pipeline.transform(task_func=task2, 234 input=task1, 235 filter=suffix(".1"), 236 output=".2") \ 237 .posttask(lambda: do_write(test_file, "Task 2 Done\n")) 238 239test_pipeline.transform(task3, task2, regex('(.*).2'), inputs([r"\1.2", tempdir + "a.1"]), r'\1.3')\ 240 .posttask(lambda: do_write(test_file, "Task 3 Done\n")) 241 242 243test_pipeline.transform(task4, tempdir + "*.1", suffix(".1"), ".4")\ 244 .follows(task1)\ 245 .posttask(lambda: do_write(test_file, "Task 4 Done\n"))\ 246 .jobs_limit(1) 247 248test_pipeline.files(task5, None, tempdir + 'a.5')\ 249 .follows(mkdir(tempdir))\ 250 .posttask(lambda: do_write(test_file, "Task 5 Done\n")) 251 252test_pipeline.merge(task_func=task6, 253 input=[task3, task4, task5], 254 output=tempdir + "final.6")\ 255 .follows(task3, task4, task5, ) \ 256 .posttask(lambda: do_write(test_file, "Task 6 Done\n")) 257 258 259def check_job_order_correct(filename): 260 """ 261 1 -> 2 -> 3 -> 262 -> 4 -> 263 5 -> 6 264 """ 265 266 precedence_rules = [[1, 2], 267 [2, 3], 268 [1, 4], 269 [5, 6], 270 [3, 6], 271 [4, 6]] 272 273 index_re = re.compile(r'.*\.([0-9])["\]\n]*$') 274 job_indices = defaultdict(list) 275 with open(filename) as ii: 276 for linenum, l in enumerate(ii): 277 m = index_re.search(l) 278 if not m: 279 raise "Non-matching line in [%s]" % filename 280 job_indices[int(m.group(1))].append(linenum) 281 282 for job_index in job_indices: 283 job_indices[job_index].sort() 284 285 for before, after in precedence_rules: 286 if before not in job_indices or after not in job_indices: 287 continue 288 if job_indices[before][-1] >= job_indices[after][0]: 289 raise Exception("Precedence violated for job %d [line %d] and job %d [line %d] of [%s]" 290 % (before, job_indices[before][-1], 291 after, job_indices[after][0], 292 filename)) 293 294 295def check_final_output_correct(after_touch_files=False): 296 """ 297 check if the final output in final.6 is as expected 298 """ 299 expected_output = \ 300 """ ["DIR/a.1"] -> ["DIR/a.2"] 301 ["DIR/a.1"] -> ["DIR/a.4"] 302 ["DIR/a.2", "DIR/a.1"] -> ["DIR/a.3"] 303 ["DIR/a.3", "DIR/b.3", "DIR/c.3", "DIR/a.4", "DIR/b.4", "DIR/c.4", "DIR/a.5"] -> ["DIR/final.6"] 304 ["DIR/b.1"] -> ["DIR/b.2"] 305 ["DIR/b.1"] -> ["DIR/b.4"] 306 ["DIR/b.2", "DIR/a.1"] -> ["DIR/b.3"] 307 ["DIR/c.1"] -> ["DIR/c.2"] 308 ["DIR/c.1"] -> ["DIR/c.4"] 309 ["DIR/c.2", "DIR/a.1"] -> ["DIR/c.3"] 310 [] -> ["DIR/a.1"] 311 [] -> ["DIR/a.1"] 312 [] -> ["DIR/a.1"] 313 [] -> ["DIR/a.1"] 314 [] -> ["DIR/a.1"] 315 [] -> ["DIR/a.5"] 316 [] -> ["DIR/b.1"] 317 [] -> ["DIR/b.1"] 318 [] -> ["DIR/c.1"] 319 [] -> ["DIR/c.1"]""" 320 321 expected_output = expected_output.replace( 322 " ", "").replace("DIR/", tempdir).split("\n") 323 orig_expected_output = expected_output 324 if after_touch_files: 325 expected_output.pop(-3) 326 with open(tempdir + "final.6", "r") as ii: 327 final_6_contents = sorted([l.rstrip() for l in ii.readlines()]) 328 if final_6_contents != expected_output: 329 print("Actual:", file=sys.stderr) 330 for ll in final_6_contents: 331 print(ll, file=sys.stderr) 332 print("_" * 80, file=sys.stderr) 333 print("Expected:", file=sys.stderr) 334 for ll in orig_expected_output: 335 print(ll, file=sys.stderr) 336 print("_" * 80, file=sys.stderr) 337 for i, (l1, l2) in enumerate(zip(final_6_contents, expected_output)): 338 if l1 != l2: 339 sys.stderr.write( 340 "%d\nActual:\n >%s<\nExpected:\n >%s<\n" % (i, l1, l2)) 341 raise Exception("Final.6 output is not as expected\n") 342 343 344class Test_ruffus(unittest.TestCase): 345 346 def tearDown(self): 347 try: 348 shutil.rmtree(tempdir) 349 except: 350 pass 351 352 def setUp(self): 353 try: 354 shutil.rmtree(tempdir) 355 except: 356 pass 357 os.makedirs(tempdir) 358 359 def test_ruffus(self): 360 print("\n\n Run pipeline normally...") 361 pipeline_run(multiprocess=10, verbose=0, pipeline="main") 362 check_final_output_correct() 363 check_job_order_correct(tempdir + "jobs.start") 364 check_job_order_correct(tempdir + "jobs.finish") 365 print(" OK") 366 367 print("\n\n Touch task2 only:") 368 os.unlink(os.path.join(tempdir, "jobs.start")) 369 os.unlink(os.path.join(tempdir, "jobs.finish")) 370 print(" First delete b.1 for task2...") 371 os.unlink(os.path.join(tempdir, "b.1")) 372 print(" Then run with touch_file_only...") 373 pipeline_run([task2], multiprocess=10, 374 touch_files_only=True, verbose=0, pipeline="main") 375 376 # check touching has made task2 up to date 377 s = StringIO() 378 pipeline_printout(s, [task2], verbose=4, 379 wrap_width=10000, pipeline="main") 380 output_str = s.getvalue() 381 #print (">>>\n", output_str, "<<<\n", file=sys.stderr) 382 if "b.1" in output_str: 383 raise Exception("Expected b.1 created by touching...") 384 if "b.2" in output_str: 385 raise Exception("Expected b.2 created by touching...") 386 print(" Touching has made task2 up to date...\n") 387 388 print(" Then run normally again...") 389 pipeline_run(multiprocess=10, verbose=0, pipeline="main") 390 check_final_output_correct(True) 391 check_job_order_correct(tempdir + "jobs.start") 392 check_job_order_correct(tempdir + "jobs.finish") 393 394 def test_ruffus_new_syntax(self): 395 print("\n\n Run pipeline normally...") 396 test_pipeline.run(multiprocess=10, verbose=0) 397 check_final_output_correct() 398 check_job_order_correct(tempdir + "jobs.start") 399 check_job_order_correct(tempdir + "jobs.finish") 400 print(" OK") 401 402 print("\n\n Touch task2 only:") 403 os.unlink(os.path.join(tempdir, "jobs.start")) 404 os.unlink(os.path.join(tempdir, "jobs.finish")) 405 print(" First delete b.1 for task2...") 406 os.unlink(os.path.join(tempdir, "b.1")) 407 print(" Then run with touch_file_only...") 408 test_pipeline.run([task2], multiprocess=10, 409 touch_files_only=True, verbose=0) 410 411 # check touching has made task2 up to date 412 s = StringIO() 413 test_pipeline.printout(s, [task2], verbose=4, wrap_width=10000) 414 output_str = s.getvalue() 415 #print (">>>\n", output_str, "<<<\n", file=sys.stderr) 416 if "b.1" in output_str: 417 raise Exception("Expected b.1 created by touching...") 418 if "b.2" in output_str: 419 raise Exception("Expected b.2 created by touching...") 420 print(" Touching has made task2 up to date...\n") 421 422 print(" Then run normally again...") 423 test_pipeline.run(multiprocess=10, verbose=0) 424 check_final_output_correct(True) 425 check_job_order_correct(tempdir + "jobs.start") 426 check_job_order_correct(tempdir + "jobs.finish") 427 428 429if __name__ == '__main__': 430 unittest.main() 431