1#!/usr/local/bin/python3.8 2from __future__ import print_function 3import shutil 4import unittest 5from ruffus import pipeline_run, pipeline_printout, Pipeline, formatter, originate, subdivide, split 6import time 7import sys 8 9""" 10 11 test_split_subdivide_checkpointing.py 12 13 14""" 15import os 16tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/" 17 18# add grandparent to search path for testing 19grandparent_dir = os.path.abspath( 20 os.path.join(os.path.dirname(__file__), "..", "..")) 21sys.path.insert(0, grandparent_dir) 22 23# module name = script name without extension 24module_name = os.path.splitext(os.path.basename(__file__))[0] 25 26 27# funky code to import by file name 28parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) 29 30 31# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 32 33# imports 34 35 36# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 37 38 39# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 40 41# Each time the pipeline is FORCED to rerun, 42# More files are created for each task 43 44 45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 46 47 48@originate([tempdir + 'start']) 49def make_start(outfile): 50 """ 51 -> start 52 """ 53 open(outfile, 'w').close() 54 55 56@split(make_start, tempdir + '*.split') 57def split_start(infiles, outfiles): 58 """ 59 -> XXX.split 60 where XXX = 0 .. N, 61 N = previous N + 1 62 """ 63 64 # split always runs exactly one job (unlike @subdivide) 65 # So it implicitly combines all its inputs before running and generating multiple output 66 # @originate generates multiple output so the input for @split is a list... 67 infile = infiles[0] 68 69 # clean up previous 70 for f in outfiles: 71 os.unlink(f) 72 73 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 74 # 75 # Create more files than the previous invocation 76 # 77 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 78 n_to_produce = len(outfiles) + 1 79 for i in range(n_to_produce): 80 f = '{}{}.split'.format(tempdir, i) 81 open(f, 'a').close() 82 83 84@subdivide(split_start, formatter(), tempdir + '{basename[0]}_*.subdivided', tempdir + '{basename[0]}') 85def subdivide_start(infile, outfiles, infile_basename): 86 # cleanup existing 87 for f in outfiles: 88 os.unlink(f) 89 90 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 91 # 92 # Create more files than the previous invocation 93 # 94 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 95 n_to_produce = len(outfiles) + 1 96 output_file_names = [] 97 for i in range(n_to_produce): 98 output_file_name = '{}_{}.subdivided'.format(infile_basename, i) 99 open(output_file_name, 'a').close() 100 output_file_names.append(output_file_name) 101 102 103TEST_VERBOSITY = 0 104 105 106class Test_ruffus(unittest.TestCase): 107 108 def tearDown(self): 109 # only tear down if not throw exception so we can debug? 110 try: 111 shutil.rmtree(tempdir) 112 except: 113 pass 114 115 def setUp(self): 116 try: 117 shutil.rmtree(tempdir) 118 except: 119 pass 120 os.makedirs(tempdir) 121 122 def check_file_exists_or_not_as_expected(self, expected_files, not_expected_files): 123 """ 124 Check if files exist / not exist 125 """ 126 for ee in expected_files: 127 if not os.path.exists(tempdir + ee): 128 raise Exception("Expected file %s" % (tempdir + ee)) 129 for ne in not_expected_files: 130 if os.path.exists(tempdir + ne): 131 raise Exception("Unexpected file %s" % (tempdir + ne)) 132 133 def test_newstyle_ruffus(self): 134 135 test_pipeline = Pipeline("test") 136 test_pipeline.originate(task_func=make_start, 137 output=[tempdir + 'start']) 138 test_pipeline.split(task_func=split_start, 139 input=make_start, output=tempdir + '*.split') 140 test_pipeline.subdivide(task_func=subdivide_start, input=split_start, filter=formatter( 141 ), output=tempdir + '{basename[0]}_*.subdivided', extras=[tempdir + '{basename[0]}']) 142 143 expected_files_after_1_runs = ["start", "0.split", "0_0.subdivided"] 144 expected_files_after_2_runs = [ 145 "1.split", "0_1.subdivided", "1_0.subdivided"] 146 expected_files_after_3_runs = [ 147 "2.split", "0_2.subdivided", "1_1.subdivided", "2_0.subdivided"] 148 expected_files_after_4_runs = [ 149 "3.split", "0_3.subdivided", "1_2.subdivided", "2_1.subdivided", "3_0.subdivided"] 150 151 print(" 1 Run pipeline normally...") 152 test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY) 153 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs, 154 expected_files_after_2_runs) 155 print(" 2 Check that running again does nothing. (All up to date).") 156 test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY) 157 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs, 158 expected_files_after_2_runs) 159 time.sleep(2) 160 161 print(" 3 Running again with forced tasks to generate more files...") 162 test_pipeline.run(forcedtorun_tasks=[ 163 "test::make_start"], multiprocess=10, verbose=TEST_VERBOSITY) 164 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 165 + expected_files_after_2_runs, 166 expected_files_after_3_runs) 167 print(" 4 Check that running again does nothing. (All up to date).") 168 test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY) 169 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 170 + expected_files_after_2_runs, 171 expected_files_after_3_runs) 172 time.sleep(2) 173 174 print(" 5 Running again with forced tasks to generate even more files...") 175 test_pipeline.run(forcedtorun_tasks=make_start, 176 multiprocess=10, verbose=TEST_VERBOSITY) 177 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 178 + expected_files_after_2_runs 179 + expected_files_after_3_runs, 180 expected_files_after_4_runs) 181 print(" 6 Check that running again does nothing. (All up to date).") 182 test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY) 183 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 184 + expected_files_after_2_runs 185 + expected_files_after_3_runs, 186 expected_files_after_4_runs) 187 188 def test_ruffus(self): 189 190 expected_files_after_1_runs = ["start", "0.split", "0_0.subdivided"] 191 expected_files_after_2_runs = [ 192 "1.split", "0_1.subdivided", "1_0.subdivided"] 193 expected_files_after_3_runs = [ 194 "2.split", "0_2.subdivided", "1_1.subdivided", "2_0.subdivided"] 195 expected_files_after_4_runs = [ 196 "3.split", "0_3.subdivided", "1_2.subdivided", "2_1.subdivided", "3_0.subdivided"] 197 198 print(" 1 Run pipeline normally...") 199 pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 200 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs, 201 expected_files_after_2_runs) 202 print(" 2 Check that running again does nothing. (All up to date).") 203 pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 204 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs, 205 expected_files_after_2_runs) 206 207 time.sleep(2) 208 209 print(" 3 Running again with forced tasks to generate more files...") 210 pipeline_run(forcedtorun_tasks=[ 211 make_start], multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 212 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 213 + expected_files_after_2_runs, 214 expected_files_after_3_runs) 215 print(" 4 Check that running again does nothing. (All up to date).") 216 pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 217 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 218 + expected_files_after_2_runs, 219 expected_files_after_3_runs) 220 time.sleep(2) 221 222 print(" 5 Running again with forced tasks to generate even more files...") 223 pipeline_run(forcedtorun_tasks=[ 224 make_start], multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 225 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 226 + expected_files_after_2_runs 227 + expected_files_after_3_runs, 228 expected_files_after_4_runs) 229 time.sleep(2) 230 print(" 6 Check that running again does nothing. (All up to date).") 231 pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main") 232 self.check_file_exists_or_not_as_expected(expected_files_after_1_runs 233 + expected_files_after_2_runs 234 + expected_files_after_3_runs, 235 expected_files_after_4_runs) 236 237 238if __name__ == '__main__': 239 if len(sys.argv) > 1 and "-v" == sys.argv.pop(): 240 TEST_VERBOSITY = 7 241 unittest.main() 242