1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import shutil
4import unittest
5from ruffus import pipeline_run, pipeline_printout, Pipeline, formatter, originate, subdivide, split
6import time
7import sys
8
9"""
10
11    test_split_subdivide_checkpointing.py
12
13
14"""
15import os
# Scratch directory for every file the pipelines below create: this script's
# path without its extension, made relative to the current working directory,
# with a trailing "/" so file names can be appended by plain concatenation.
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
# NOTE(review): this runs after `from ruffus import ...` at the top of the file,
# so it cannot influence which ruffus that import resolves to — confirm whether
# it is still needed or should move above the ruffus import.
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
# NOTE(review): module_name is not referenced elsewhere in this file.
module_name = os.path.splitext(os.path.basename(__file__))[0]


# funky code to import by file name
# NOTE(review): parent_dir is computed but not used in this file; the
# import-by-file-name code this comment refers to appears to have been removed.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
29
30
31# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
32
33#   imports
34
35
36# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
37
38
39# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
40
#   Each time the pipeline is FORCED to rerun,
#       more files are created by each task
43
44
45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
46
47
@originate([tempdir + 'start'])
def make_start(outfile):
    """
    -> start

    Seed task: create the empty ``start`` file (truncating it if it already
    exists) that feeds ``split_start``.
    """
    with open(outfile, 'w'):
        pass
54
55
@split(make_start, tempdir + '*.split')
def split_start(infiles, outfiles):
    """
    -> XXX.split
        where XXX = 0 .. N,
              N   = previous N + 1

    Delete the ``*.split`` files passed in as ``outfiles``, then create one
    more empty file than there were before, named ``0.split`` .. ``N.split``
    inside ``tempdir``.

    Args:
        infiles: upstream outputs of ``make_start``. @split always runs exactly
            one job (unlike @subdivide), so it implicitly combines all of its
            inputs into this list. The contents are not read here.
        outfiles: the existing ``*.split`` files for the output glob; an empty
            list on the very first run.
    """
    # Clean up the outputs of the previous invocation.
    for f in outfiles:
        os.unlink(f)

    #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    #
    #   Create more files than the previous invocation
    #
    #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    n_to_produce = len(outfiles) + 1
    for i in range(n_to_produce):
        # Append mode only creates the (empty) file; nothing is written.
        with open('{}{}.split'.format(tempdir, i), 'a'):
            pass
82
83
@subdivide(split_start, formatter(), tempdir + '{basename[0]}_*.subdivided', tempdir + '{basename[0]}')
def subdivide_start(infile, outfiles, infile_basename):
    """
    XXX.split -> XXX_0.subdivided .. XXX_M.subdivided
        where M = previous M + 1

    Runs once per ``*.split`` input. Deletes the outputs this job produced
    previously, then creates one more empty file than there were before.

    Args:
        infile: one ``*.split`` file produced by ``split_start`` (not read).
        outfiles: the existing ``XXX_*.subdivided`` files for this input; an
            empty list the first time the input is processed.
        infile_basename: the extra argument ``tempdir + '{basename[0]}'`` as
            filled in by ``formatter()``; used as the output file-name prefix.
    """
    # Clean up the outputs of the previous invocation.
    for f in outfiles:
        os.unlink(f)

    #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    #
    #   Create more files than the previous invocation
    #
    #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
    n_to_produce = len(outfiles) + 1
    for i in range(n_to_produce):
        # Append mode only creates the (empty) file; nothing is written.
        with open('{}_{}.subdivided'.format(infile_basename, i), 'a'):
            pass
101
102
# Verbosity passed to every pipeline run in the tests below; the __main__
# block raises it to 7 when the script is run with a trailing "-v".
TEST_VERBOSITY = 0
104
105
class Test_ruffus(unittest.TestCase):
    """
    Checkpointing tests for @split / @subdivide under forced reruns.

    Each forced rerun of ``make_start`` should make ``split_start`` and
    ``subdivide_start`` write more files than before, while an unforced rerun
    should find everything up to date and leave the outputs unchanged. The
    same scenario is run through a new-style ``Pipeline`` object
    (``test_newstyle_ruffus``) and through the decorator-based "main"
    pipeline (``test_ruffus``).
    """

    # Files (relative to tempdir) that first appear after each effective run:
    # index 0 = the first normal run, index k = the k-th forced rerun.
    _EXPECTED_FILES_BY_RUN = (
        ["start", "0.split", "0_0.subdivided"],
        ["1.split", "0_1.subdivided", "1_0.subdivided"],
        ["2.split", "0_2.subdivided", "1_1.subdivided", "2_0.subdivided"],
        ["3.split", "0_3.subdivided", "1_2.subdivided", "2_1.subdivided",
         "3_0.subdivided"],
    )

    @staticmethod
    def _remove_tempdir():
        """Delete ``tempdir`` and its contents, ignoring filesystem errors."""
        try:
            shutil.rmtree(tempdir)
        except OSError:
            # Best-effort cleanup, e.g. when tempdir does not exist yet.
            pass

    def tearDown(self):
        """Remove everything the pipeline wrote during the test."""
        self._remove_tempdir()

    def setUp(self):
        """Start each test from a fresh, empty ``tempdir``."""
        self._remove_tempdir()
        os.makedirs(tempdir)

    def check_file_exists_or_not_as_expected(self, expected_files, not_expected_files):
        """
        Assert that every file in ``expected_files`` exists and that no file
        in ``not_expected_files`` does. Names are relative to ``tempdir``.

        Raises:
            AssertionError: on the first missing or unexpected file, so the
                problem is reported as a test failure rather than an error.
        """
        for ee in expected_files:
            self.assertTrue(os.path.exists(tempdir + ee),
                            "Expected file %s" % (tempdir + ee))
        for ne in not_expected_files:
            self.assertFalse(os.path.exists(tempdir + ne),
                             "Unexpected file %s" % (tempdir + ne))

    def _check_after_runs(self, n_runs):
        """
        Assert that the outputs of the first ``n_runs`` effective runs exist
        and that the outputs of run ``n_runs + 1`` do not exist yet.
        """
        expected = [name
                    for run_files in self._EXPECTED_FILES_BY_RUN[:n_runs]
                    for name in run_files]
        self.check_file_exists_or_not_as_expected(
            expected, self._EXPECTED_FILES_BY_RUN[n_runs])

    def test_newstyle_ruffus(self):
        """Run the scenario through a new-style ``Pipeline("test")`` object."""
        test_pipeline = Pipeline("test")
        test_pipeline.originate(task_func=make_start,
                                output=[tempdir + 'start'])
        test_pipeline.split(task_func=split_start,
                            input=make_start, output=tempdir + '*.split')
        test_pipeline.subdivide(task_func=subdivide_start, input=split_start,
                                filter=formatter(),
                                output=tempdir + '{basename[0]}_*.subdivided',
                                extras=[tempdir + '{basename[0]}'])

        print("     1 Run pipeline normally...")
        test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(1)
        print("     2 Check that running again does nothing. (All up to date).")
        test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(1)
        # Presumably lets file modification times advance before the forced
        # rerun (coarse mtime resolution) — confirm.
        time.sleep(2)

        print("     3 Running again with forced tasks to generate more files...")
        # Forced task named by its qualified "pipeline::task" name here...
        test_pipeline.run(forcedtorun_tasks=["test::make_start"],
                          multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(2)
        print("     4 Check that running again does nothing. (All up to date).")
        test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(2)
        time.sleep(2)

        print("     5 Running again with forced tasks to generate even more files...")
        # ...and by the bare task function object here.
        test_pipeline.run(forcedtorun_tasks=make_start,
                          multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(3)
        print("     6 Check that running again does nothing. (All up to date).")
        test_pipeline.run(multiprocess=10, verbose=TEST_VERBOSITY)
        self._check_after_runs(3)

    def test_ruffus(self):
        """Run the scenario through the decorator-built "main" pipeline."""
        print("     1 Run pipeline normally...")
        pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(1)
        print("     2 Check that running again does nothing. (All up to date).")
        pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(1)

        # Presumably lets file modification times advance before the forced
        # rerun (coarse mtime resolution) — confirm.
        time.sleep(2)

        print("     3 Running again with forced tasks to generate more files...")
        pipeline_run(forcedtorun_tasks=[make_start],
                     multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(2)
        print("     4 Check that running again does nothing. (All up to date).")
        pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(2)
        time.sleep(2)

        print("     5 Running again with forced tasks to generate even more files...")
        pipeline_run(forcedtorun_tasks=[make_start],
                     multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(3)
        time.sleep(2)
        print("     6 Check that running again does nothing. (All up to date).")
        pipeline_run(multiprocess=10, verbose=TEST_VERBOSITY, pipeline="main")
        self._check_after_runs(3)
236
237
if __name__ == '__main__':
    # A trailing "-v" switches on verbose ruffus output and is consumed here
    # so unittest does not also see it. The last argument is only removed when
    # it actually is "-v"; otherwise arguments meant for unittest (e.g. a test
    # name) would be silently discarded.
    if len(sys.argv) > 1 and sys.argv[-1] == "-v":
        sys.argv.pop()
        TEST_VERBOSITY = 7
    unittest.main()
242