1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import shutil
4import unittest
5from subprocess import *
6from ruffus import pipeline_run, pipeline_printout_graph, originate, split, transform, \
7    subdivide, formatter, Pipeline
8import sys
9"""
10
11    test_split_subdivide_checkpointing.py
12
13
14"""
15import os
# Scratch directory named after this script (file name without extension),
# expressed relative to the current working directory; the trailing "/" lets
# callers build paths by plain string concatenation.
tempdir = os.path.relpath(
    os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# Module name = this script's file name with its extension stripped.
module_name = os.path.splitext(os.path.basename(__file__))[0]

# Put the directory two levels above this file first on the import path
# for testing.
# NOTE(review): this runs after the ruffus import at the top of the file,
# so it cannot influence which ruffus gets imported -- confirm intent.
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, grandparent_dir)

# Directory one level above this file (nothing visible here reads it).
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir))
29
30
31# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
32
33#   imports
34
35
36# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
37
38
39# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
40
#   Each time the pipeline is FORCED to rerun,
#       more files are created for each task.
43
44
45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
46
47
@originate([tempdir + 'start'])
def make_start(outfile):
    """
    Create the empty ``start`` file that seeds the pipeline.

    -> start
    """
    # Opening in 'w' mode creates (or truncates) the file; nothing is written.
    with open(outfile, 'w'):
        pass
54
55
@split(make_start, tempdir + '*.split')
def split_start(infiles, outfiles):
    """
    Replace the existing ``.split`` files with one more file than before.

    -> XXX.split
        where XXX = 0 .. N,
              N   = len(outfiles), i.e. previous N + 1
    """
    # @split always runs exactly one job (unlike @subdivide), so all of its
    # inputs arrive together as a single list (here, the list @originate
    # produced).  The value is not used below; indexing it keeps the
    # expectation that the list is non-empty.
    first_input = infiles[0]

    # Remove whatever the previous invocation produced.
    for stale_path in outfiles:
        os.unlink(stale_path)

    # Deliberately create one more file than existed before, so that every
    # forced rerun visibly grows the set of outputs.
    for index in range(len(outfiles) + 1):
        with open('{}{}.split'.format(tempdir, index), 'a'):
            pass
82
83
@subdivide(split_start, formatter(), tempdir + '{basename[0]}_*.subdivided', tempdir + '{basename[0]}')
def subdivide_start(infile, outfiles, infile_basename):
    """
    Replace this job's ``*.subdivided`` files with one more file than before.

    ``infile_basename`` is the extra decorator argument
    (``tempdir + '{basename[0]}'``); outputs are named
    ``<infile_basename>_0.subdivided`` .. ``<infile_basename>_N.subdivided``
    with N = len(outfiles).
    """
    # Remove whatever the previous invocation produced for this job.
    for stale_path in outfiles:
        os.unlink(stale_path)

    # Deliberately create one more file than existed before.
    for index in range(len(outfiles) + 1):
        with open('{}_{}.subdivided'.format(infile_basename, index), 'a'):
            pass
98
99
class Test_ruffus(unittest.TestCase):
    """Print pipeline flowcharts for the decorator-style and the Pipeline-object-style pipeline."""

    # Fragments of the probe's output meaning the ``dot`` command itself could
    # not be launched (shell "not found" / "No such file" errors, or a wrapper
    # script that cannot find its jar).
    _GRAPHVIZ_MISSING_MARKERS = ("No such file or directory",
                                 "not found",
                                 "Unable to access jarfile")

    def tearDown(self):
        # Always remove the scratch directory.  Errors (e.g. it was never
        # created) are ignored so they cannot mask the test's own outcome.
        shutil.rmtree(tempdir, ignore_errors=True)

    def setUp(self):
        # Start every test from a fresh, empty scratch directory.
        shutil.rmtree(tempdir, ignore_errors=True)
        os.makedirs(tempdir)

        # Graphviz is needed to turn dot files into jpg, svg etc.
        self.graph_viz_present = self._graphviz_available()

    def _graphviz_available(self):
        """
        Return False only when the ``dot`` command demonstrably could not run.

        A non-zero exit status alone does not count as "missing": the output
        must also contain one of ``_GRAPHVIZ_MISSING_MARKERS``.
        """
        process = Popen("echo what | dot", stdout=PIPE,
                        stderr=STDOUT, shell=True)
        output, _ = process.communicate()
        if not process.returncode:
            return True
        output_str = output.decode("utf-8", "replace")
        return not any(marker in output_str
                       for marker in self._GRAPHVIZ_MISSING_MARKERS)

    def test_ruffus(self):

        print("     Run pipeline normally...")
        if self.graph_viz_present:
            pipeline_printout_graph(tempdir + "flowchart.dot", pipeline="main")
            pipeline_printout_graph(tempdir + "flowchart.jpg",
                                    target_tasks=[subdivide_start],
                                    forcedtorun_tasks=[split_start],
                                    no_key_legend=True)
            pipeline_printout_graph(
                tempdir + "flowchart.svg", no_key_legend=False, pipeline="main")
            # An unrecognised extension with no explicit format must fail...
            with self.assertRaises(
                    CalledProcessError,
                    msg="Failed to throw exception for pipeline_printout_graph unknown extension "):
                pipeline_printout_graph(
                    tempdir + "flowchart.unknown", no_key_legend=False, pipeline="main")
            # ...but succeeds once the output format is given explicitly.
            pipeline_printout_graph(
                tempdir + "flowchart.unknown", "svg", no_key_legend=False, pipeline="main")

        else:
            pipeline_printout_graph(tempdir + "flowchart.dot",
                                    target_tasks=[subdivide_start],
                                    forcedtorun_tasks=[split_start],
                                    no_key_legend=True,
                                    pipeline="main")

    def test_newstyle_ruffus(self):

        print("     Run pipeline normally...")
        test_pipeline = Pipeline("test")
        test_pipeline.originate(make_start, [tempdir + 'start'])

        test_pipeline.split(split_start, make_start, tempdir + '*.split')

        test_pipeline.subdivide(subdivide_start, split_start, formatter(),
                                tempdir + '{basename[0]}_*.subdivided',
                                tempdir + '{basename[0]}')
        if self.graph_viz_present:
            test_pipeline.printout_graph(tempdir + "flowchart.dot")
            test_pipeline.printout_graph(tempdir + "flowchart.jpg",
                                         target_tasks=[subdivide_start],
                                         forcedtorun_tasks=[split_start],
                                         no_key_legend=True)
            test_pipeline.printout_graph(
                tempdir + "flowchart.svg", no_key_legend=False)
            # An unrecognised extension with no explicit format must fail...
            with self.assertRaises(
                    CalledProcessError,
                    msg="Failed to throw exception for test_pipeline.printout_graph unknown extension "):
                test_pipeline.printout_graph(
                    tempdir + "flowchart.unknown", no_key_legend=False)
            # ...but succeeds once the output format is given explicitly.
            test_pipeline.printout_graph(
                tempdir + "flowchart.unknown", "svg", no_key_legend=False)

        else:
            test_pipeline.printout_graph(tempdir + "flowchart.dot",
                                         target_tasks=[subdivide_start],
                                         forcedtorun_tasks=[split_start],
                                         no_key_legend=True)
197
198
if __name__ == "__main__":
    # Allow this file to be executed directly as a unittest script.
    unittest.main()
201