1#!/usr/local/bin/python3.8 2 3from __future__ import print_function 4from ruffus.ruffus_utility import RUFFUS_HISTORY_FILE, CHECKSUM_FILE_TIMESTAMPS, get_default_history_file_name 5from ruffus.drmaa_wrapper import run_job 6from ruffus.ruffus_exceptions import RethrownJobError 7import ruffus 8 9import unittest 10import os 11import sys 12import shutil 13try: 14 from StringIO import StringIO 15except: 16 from io import StringIO 17import time 18import re 19import subprocess 20 21exe_path = os.path.split(os.path.abspath(sys.argv[0]))[0] 22sys.path.insert(0, os.path.abspath(os.path.join(exe_path, "..", ".."))) 23 24# from ruffus.combinatorics import * 25 26 27workdir = 'tmp_test_job_history_with_exceptions' 28# sub-1s resolution in system? 29one_second_per_job = None 30throw_exception = False 31 32@ruffus.mkdir(workdir) 33@ruffus.originate([workdir + "/" + prefix + "_name.tmp1" for prefix in "abcdefghijk"]) 34def generate_initial_files1(on): 35 with open(on, 'w') as outfile: 36 pass 37 38 39@ruffus.transform(generate_initial_files1, 40 ruffus.suffix(".tmp1"), ".tmp2") 41def test_task2(infile, outfile): 42 print("%s start to run " % infile) 43 run_job("./five_second.py", run_locally=True) 44 print("%s wake up " % infile) 45 with open(outfile, "w") as p: 46 pass 47 48 49@ruffus.transform(test_task2, ruffus.suffix(".tmp2"), ".tmp3") 50def test_task3(infile, outfile): 51 print("%s start to run " % infile) 52 # subprocess.check_call("./five_second.py") 53 run_job("./five_second.py", run_locally=True) 54 print("%s wake up " % infile) 55 with open(outfile, "w") as p: 56 pass 57 58 59def cleanup_tmpdir(): 60 os.system('rm -f %s %s' % 61 (os.path.join(workdir, '*'), RUFFUS_HISTORY_FILE)) 62 63 64def do_main(): 65 print("Press Ctrl-C Now!!", file=sys.stdout) 66 sys.stdout.flush() 67 time.sleep(2) 68 print("Start....", file=sys.stdout) 69 sys.stdout.flush() 70 ruffus.pipeline_run(verbose=11, 71 multiprocess=5, pipeline="main") 72 print("too late!!", file=sys.stdout) 73 sys.stdout.flush() 74 cleanup_tmpdir() 75 76 77do_main() 78