1#!/usr/local/bin/python3.8 2from __future__ import print_function 3import shutil 4import unittest 5from subprocess import * 6from ruffus import pipeline_run, pipeline_printout_graph, originate, split, transform, \ 7 subdivide, formatter, Pipeline 8import sys 9""" 10 11 test_split_subdivide_checkpointing.py 12 13 14""" 15import os 16tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/" 17 18# add grandparent to search path for testing 19grandparent_dir = os.path.abspath( 20 os.path.join(os.path.dirname(__file__), "..", "..")) 21sys.path.insert(0, grandparent_dir) 22 23# module name = script name without extension 24module_name = os.path.splitext(os.path.basename(__file__))[0] 25 26 27# funky code to import by file name 28parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) 29 30 31# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 32 33# imports 34 35 36# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 37 38 39# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 40 41# Each time the pipeline is FORCED to rerun, 42# More files are created for each task 43 44 45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 46 47 48@originate([tempdir + 'start']) 49def make_start(outfile): 50 """ 51 -> start 52 """ 53 open(outfile, 'w').close() 54 55 56@split(make_start, tempdir + '*.split') 57def split_start(infiles, outfiles): 58 """ 59 -> XXX.split 60 where XXX = 0 .. N, 61 N = previous N + 1 62 """ 63 64 # split always runs exactly one job (unlike @subdivide) 65 # So it implicitly combines all its inputs before running and generating multiple output 66 # @originate generates multiple output so the input for @split is a list... 67 infile = infiles[0] 68 69 # clean up previous 70 for f in outfiles: 71 os.unlink(f) 72 73 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 74 # 75 # Create more files than the previous invocation 76 # 77 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 78 n_to_produce = len(outfiles) + 1 79 for i in range(n_to_produce): 80 f = '{}{}.split'.format(tempdir, i) 81 open(f, 'a').close() 82 83 84@subdivide(split_start, formatter(), tempdir + '{basename[0]}_*.subdivided', tempdir + '{basename[0]}') 85def subdivide_start(infile, outfiles, infile_basename): 86 # cleanup existing 87 for f in outfiles: 88 os.unlink(f) 89 90 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 91 # 92 # Create more files than the previous invocation 93 # 94 #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! 95 n_to_produce = len(outfiles) + 1 96 for i in range(n_to_produce): 97 open('{}_{}.subdivided'.format(infile_basename, i), 'a').close() 98 99 100class Test_ruffus(unittest.TestCase): 101 102 def tearDown(self): 103 # only tear down if not throw exception so we can debug? 104 try: 105 shutil.rmtree(tempdir) 106 except: 107 pass 108 109 def setUp(self): 110 try: 111 shutil.rmtree(tempdir) 112 except: 113 pass 114 os.makedirs(tempdir) 115 116 # 117 # check graphviz exists for turning dot files into jpg, svg etc 118 # 119 try: 120 process = Popen("echo what | dot", stdout=PIPE, 121 stderr=STDOUT, shell=True) 122 output, unused_err = process.communicate() 123 retcode = process.poll() 124 if retcode: 125 raise CalledProcessError( 126 retcode, "echo what | dot", output=output) 127 except CalledProcessError as err: 128 output_str = str(err.output) 129 if "No such file or directory" in output_str or "not found" in output_str or "Unable to access jarfile" in output_str: 130 self.graph_viz_present = False 131 return 132 self.graph_viz_present = True 133 134 def test_ruffus(self): 135 136 print(" Run pipeline normally...") 137 if self.graph_viz_present: 138 pipeline_printout_graph(tempdir + "flowchart.dot", pipeline="main") 139 pipeline_printout_graph(tempdir + "flowchart.jpg", 140 target_tasks=[subdivide_start], 141 forcedtorun_tasks=[split_start], 142 no_key_legend=True) 143 pipeline_printout_graph( 144 tempdir + "flowchart.svg", no_key_legend=False, pipeline="main") 145 # Unknown format 146 try: 147 pipeline_printout_graph( 148 tempdir + "flowchart.unknown", no_key_legend=False, pipeline="main") 149 raise Exception( 150 "Failed to throw exception for pipeline_printout_graph unknown extension ") 151 except CalledProcessError as err: 152 pass 153 pipeline_printout_graph( 154 tempdir + "flowchart.unknown", "svg", no_key_legend=False, pipeline="main") 155 156 else: 157 pipeline_printout_graph(tempdir + "flowchart.dot", 158 target_tasks=[subdivide_start], 159 forcedtorun_tasks=[split_start], 160 no_key_legend=True, 161 pipeline="main") 162 163 def test_newstyle_ruffus(self): 164 165 print(" Run pipeline normally...") 166 test_pipeline = Pipeline("test") 167 test_pipeline.originate(make_start, [tempdir + 'start']) 168 169 test_pipeline.split(split_start, make_start, tempdir + '*.split') 170 171 test_pipeline.subdivide(subdivide_start, split_start, formatter( 172 ), tempdir + '{basename[0]}_*.subdivided', tempdir + '{basename[0]}') 173 if self.graph_viz_present: 174 test_pipeline.printout_graph(tempdir + "flowchart.dot") 175 test_pipeline.printout_graph(tempdir + "flowchart.jpg", 176 target_tasks=[subdivide_start], 177 forcedtorun_tasks=[split_start], 178 no_key_legend=True) 179 test_pipeline.printout_graph( 180 tempdir + "flowchart.svg", no_key_legend=False) 181 # Unknown format 182 try: 183 test_pipeline.printout_graph( 184 tempdir + "flowchart.unknown", no_key_legend=False) 185 raise Exception( 186 "Failed to throw exception for test_pipeline.printout_graph unknown extension ") 187 except CalledProcessError as err: 188 pass 189 test_pipeline.printout_graph( 190 tempdir + "flowchart.unknown", "svg", no_key_legend=False) 191 192 else: 193 test_pipeline.printout_graph(tempdir + "flowchart.dot", 194 target_tasks=[subdivide_start], 195 forcedtorun_tasks=[split_start], 196 no_key_legend=True) 197 198 199if __name__ == '__main__': 200 unittest.main() 201