1#!/usr/local/bin/python3.8 2from __future__ import print_function 3import re 4import shutil 5import unittest 6from ruffus.ruffus_utility import CHECKSUM_FILE_TIMESTAMPS, RUFFUS_HISTORY_FILE, get_default_history_file_name 7from ruffus.ruffus_exceptions import RethrownJobError 8from ruffus import pipeline_run, pipeline_printout, Pipeline, collate, mkdir, regex, \ 9 suffix, formatter, originate, transform 10import sys 11import os 12 13""" 14 15 test_job_history_with_exceptions.py 16 17 Make sure that when an exception is thrown only the current and following tasks fail 18 19""" 20 21# sub-1s resolution in system? 22one_second_per_job = None 23throw_exception = False 24 25tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/" 26 27# add grandparent to search path for testing 28grandparent_dir = os.path.abspath( 29 os.path.join(os.path.dirname(__file__), "..", "..")) 30sys.path.insert(0, grandparent_dir) 31 32# module name = script name without extension 33module_name = os.path.splitext(os.path.basename(__file__))[0] 34 35 36# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 37# 38# imports 39# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 40 41try: 42 from StringIO import StringIO 43except: 44 from io import StringIO 45 46 47# ___________________________________________________________________________ 48# 49# generate_initial_files1 50# ___________________________________________________________________________ 51@originate([tempdir + prefix + "_name.tmp1" for prefix in "abcd"]) 52def generate_initial_files1(on): 53 with open(on, 'w') as outfile: 54 pass 55 56# ___________________________________________________________________________ 57# 58# generate_initial_files2 59# ___________________________________________________________________________ 60 61 62@originate([tempdir + "e_name.tmp1", tempdir + "f_name.tmp1"]) 63def generate_initial_files2(on): 64 with open(on, 'w') as outfile: 65 pass 66 67# ___________________________________________________________________________ 68# 69# generate_initial_files3 70# ___________________________________________________________________________ 71 72 73@originate([tempdir + "/g_name.tmp1", tempdir + "/h_name.tmp1"]) 74def generate_initial_files3(on): 75 with open(on, 'w') as outfile: 76 pass 77 78# ___________________________________________________________________________ 79# 80# generate_initial_files1 81# ___________________________________________________________________________ 82 83 84@originate(tempdir + "i_name.tmp1") 85def generate_initial_files4(on): 86 with open(on, 'w') as outfile: 87 pass 88 89# ___________________________________________________________________________ 90# 91# check_task2 92# ___________________________________________________________________________ 93 94 95@collate([generate_initial_files1, generate_initial_files2, generate_initial_files3, 96 generate_initial_files4], 97 formatter(), 98 "{path[0]}/all.tmp2") 99# @transform([generate_initial_files1, generate_initial_files2, generate_initial_files3, 100# generate_initial_files4], 101# formatter( ), 102# "{path[0]}/{basename[0]}.tmp2") 103def check_task2(infiles, outfile): 104 with open(outfile, "w") as p: 105 pass 106 #print >>sys.stderr, "8" * 80, "\n", " task2 :%s %s " % (infiles, outfile) 107 108# ___________________________________________________________________________ 109# 110# check_task3 111# ___________________________________________________________________________ 112 113 114@transform(check_task2, suffix(".tmp2"), ".tmp3") 115def check_task3(infile, outfile): 116 global throw_exception 117 if throw_exception != None: 118 throw_exception = not throw_exception 119 if throw_exception: 120 #print >>sys.stderr, "Throw exception for ", infile, outfile 121 raise Exception("oops") 122 else: 123 #print >>sys.stderr, "No throw exception for ", infile, outfile 124 pass 125 with open(outfile, "w") as p: 126 pass 127 #print >>sys.stderr, "8" * 80, "\n", " task3 :%s %s " % (infile, outfile) 128 129# ___________________________________________________________________________ 130# 131# check_task4 132# ___________________________________________________________________________ 133 134 135@transform(check_task3, suffix(".tmp3"), ".tmp4") 136def check_task4(infile, outfile): 137 with open(outfile, "w") as p: 138 pass 139 #print >>sys.stderr, "8" * 80, "\n", " task4 :%s %s " % (infile, outfile) 140 141 142def cleanup_tmpdir(): 143 os.system('rm -f %s %s' % 144 (os.path.join(tempdir, '*'), RUFFUS_HISTORY_FILE)) 145 146 147VERBOSITY = 5 148VERBOSITY = 11 149 150cnt_pipelines = 0 151 152 153class Test_job_history_with_exceptions(unittest.TestCase): 154 def setUp(self): 155 try: 156 os.mkdir(tempdir) 157 except OSError: 158 pass 159 160 # ___________________________________________________________________________ 161 # 162 # test product() pipeline_printout and pipeline_run 163 # ___________________________________________________________________________ 164 def test_job_history_with_exceptions(self): 165 cleanup_tmpdir() 166 s = StringIO() 167 pipeline_printout(s, [check_task4], verbose=VERBOSITY, 168 wrap_width=10000, pipeline="main") 169 # print s.getvalue() 170 171 def create_pipeline(self): 172 # each pipeline has a different name 173 global cnt_pipelines 174 cnt_pipelines = cnt_pipelines + 1 175 test_pipeline = Pipeline("test %d" % cnt_pipelines) 176 177 test_pipeline.originate(task_func=generate_initial_files1, 178 output=[tempdir + prefix + "_name.tmp1" for prefix in "abcd"]) 179 180 test_pipeline.originate(task_func=generate_initial_files2, 181 output=[tempdir + "e_name.tmp1", tempdir + "f_name.tmp1"]) 182 183 test_pipeline.originate(task_func=generate_initial_files3, 184 output=[tempdir + "g_name.tmp1", tempdir + "h_name.tmp1"]) 185 186 test_pipeline.originate(task_func=generate_initial_files4, 187 output=tempdir + "i_name.tmp1") 188 189 test_pipeline.collate(task_func=check_task2, 190 input=[generate_initial_files1, 191 generate_initial_files2, 192 generate_initial_files3, 193 generate_initial_files4], 194 filter=formatter(), 195 output="{path[0]}/all.tmp2") 196 197 test_pipeline.transform(task_func=check_task3, 198 input=check_task2, 199 filter=suffix(".tmp2"), 200 output=".tmp3") 201 202 test_pipeline.transform(task_func=check_task4, 203 input=check_task3, 204 filter=suffix(".tmp3"), 205 output=".tmp4") 206 return test_pipeline 207 208 def test_job_history_with_exceptions_run(self): 209 """Run""" 210 for i in range(1): 211 cleanup_tmpdir() 212 try: 213 pipeline_run([check_task4], verbose=0, 214 #multithread = 2, 215 one_second_per_job=one_second_per_job, pipeline="main") 216 except: 217 pass 218 s = StringIO() 219 pipeline_printout( 220 s, [check_task4], verbose=VERBOSITY, wrap_width=10000, pipeline="main") 221 # 222 # task 2 should be up to date because exception was throw in task 3 223 # 224 pipeline_printout_str = s.getvalue() 225 correct_order = not re.search( 226 'Tasks which will be run:.*\n(.*\n)*Task = check_task2', pipeline_printout_str) 227 if not correct_order: 228 print(pipeline_printout_str) 229 self.assertTrue(correct_order) 230 sys.stderr.write(".") 231 print() 232 233 def test_newstyle_recreate_job_history(self): 234 """Run""" 235 test_pipeline = self.create_pipeline() 236 global throw_exception 237 throw_exception = None 238 cleanup_tmpdir() 239 240 # 241 # print "Initial run without creating sqlite file" 242 # 243 test_pipeline.run([check_task4], verbose=0, 244 checksum_level=CHECKSUM_FILE_TIMESTAMPS, 245 multithread=10, 246 one_second_per_job=one_second_per_job) 247 248 # 249 # print "printout without sqlite" 250 # 251 s = StringIO() 252 test_pipeline.printout( 253 s, [check_task4], checksum_level=CHECKSUM_FILE_TIMESTAMPS) 254 self.assertTrue(not re.search( 255 'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())) 256 # 257 # print "printout expecting sqlite file" 258 # 259 s = StringIO() 260 test_pipeline.printout(s, [check_task4]) 261 self.assertTrue( 262 re.search('Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())) 263 # 264 # print "Regenerate sqlite file" 265 # 266 test_pipeline.run([check_task4], 267 checksum_level=CHECKSUM_FILE_TIMESTAMPS, 268 history_file=get_default_history_file_name(), 269 multithread=1, 270 verbose=0, 271 touch_files_only=2, 272 one_second_per_job=one_second_per_job) 273 # 274 # print "printout expecting sqlite file" 275 # 276 s = StringIO() 277 test_pipeline.printout(s, [check_task4], verbose=VERBOSITY) 278 succeed = not re.search( 279 'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()) 280 if not succeed: 281 print(s.getvalue(), file=sys.stderr) 282 self.assertTrue(succeed) 283 284 throw_exception = False 285 286 # 287 def test_newstyle_job_history_with_exceptions_run(self): 288 """Run""" 289 test_pipeline = self.create_pipeline() 290 for i in range(1): 291 cleanup_tmpdir() 292 try: 293 test_pipeline.run([check_task4], verbose=0, 294 #multithread = 2, 295 one_second_per_job=one_second_per_job) 296 except: 297 pass 298 s = StringIO() 299 test_pipeline.printout( 300 s, [check_task4], verbose=VERBOSITY, wrap_width=10000) 301 # 302 # task 2 should be up to date because exception was throw in task 3 303 # 304 pipeline_printout_str = s.getvalue() 305 correct_order = not re.search( 306 'Tasks which will be run:.*\n(.*\n)*Task = check_task2', pipeline_printout_str) 307 if not correct_order: 308 print(pipeline_printout_str) 309 self.assertTrue(correct_order) 310 sys.stderr.write(".") 311 print() 312 313 def test_recreate_job_history(self): 314 """Run""" 315 global throw_exception 316 throw_exception = None 317 cleanup_tmpdir() 318 319 # 320 # print "Initial run without creating sqlite file" 321 # 322 pipeline_run([check_task4], verbose=0, 323 checksum_level=CHECKSUM_FILE_TIMESTAMPS, 324 multithread=10, 325 one_second_per_job=one_second_per_job, pipeline="main") 326 327 # 328 # print "printout without sqlite" 329 # 330 s = StringIO() 331 pipeline_printout( 332 s, [check_task4], checksum_level=CHECKSUM_FILE_TIMESTAMPS, pipeline="main") 333 self.assertTrue(not re.search( 334 'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())) 335 # 336 # print "printout expecting sqlite file" 337 # 338 s = StringIO() 339 pipeline_printout(s, [check_task4], pipeline="main") 340 self.assertTrue( 341 re.search('Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())) 342 # 343 # print "Regenerate sqlite file" 344 # 345 pipeline_run([check_task4], 346 checksum_level=CHECKSUM_FILE_TIMESTAMPS, 347 history_file=get_default_history_file_name(), 348 multithread=1, 349 verbose=0, 350 touch_files_only=2, 351 one_second_per_job=one_second_per_job, pipeline="main") 352 # 353 # print "printout expecting sqlite file" 354 # 355 s = StringIO() 356 pipeline_printout(s, [check_task4], verbose=VERBOSITY, pipeline="main") 357 succeed = not re.search( 358 'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()) 359 if not succeed: 360 print(s.getvalue(), file=sys.stderr) 361 self.assertTrue(succeed) 362 363 throw_exception = False 364 365 # ___________________________________________________________________________ 366 # 367 # cleanup 368 # ___________________________________________________________________________ 369 def tearDown(self): 370 shutil.rmtree(tempdir) 371 pass 372 373 374# 375# Necessary to protect the "entry point" of the program under windows. 376# see: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming 377# 378if __name__ == '__main__': 379 #pipeline_printout(sys.stdout, [check_product_task], verbose = VERBOSITY, pipeline= "main") 380 unittest.main() 381