#!/usr/bin/env python3
from __future__ import print_function
import glob
import os
import re
import shutil
import sys
import unittest
from ruffus.ruffus_utility import CHECKSUM_FILE_TIMESTAMPS, RUFFUS_HISTORY_FILE, get_default_history_file_name
from ruffus import pipeline_run, pipeline_printout, Pipeline, collate, \
    suffix, formatter, originate, transform

"""

    test_job_history_with_exceptions.py

        Make sure that when an exception is thrown, only the current and following tasks fail.

"""

# set to True if the file system lacks sub-second timestamp resolution
one_second_per_job = None
throw_exception = False

tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
#
#   imports
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

try:
    from StringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3


# ___________________________________________________________________________
#
#   generate_initial_files1
# ___________________________________________________________________________
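# @originate creates the pipeline's starting files from nothing:
# here, four empty files a_name.tmp1 .. d_name.tmp1 under tempdir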
@originate([tempdir + prefix + "_name.tmp1" for prefix in "abcd"])
def generate_initial_files1(outfile):
    with open(outfile, 'w'):
        pass

# ___________________________________________________________________________
#
#   generate_initial_files2
# ___________________________________________________________________________


@originate([tempdir + "e_name.tmp1", tempdir + "f_name.tmp1"])
def generate_initial_files2(outfile):
    with open(outfile, 'w'):
        pass

# ___________________________________________________________________________
#
#   generate_initial_files3
# ___________________________________________________________________________


@originate([tempdir + "g_name.tmp1", tempdir + "h_name.tmp1"])
def generate_initial_files3(outfile):
    with open(outfile, 'w'):
        pass

# ___________________________________________________________________________
#
#   generate_initial_files4
# ___________________________________________________________________________


@originate(tempdir + "i_name.tmp1")
def generate_initial_files4(outfile):
    with open(outfile, 'w'):
        pass

# ___________________________________________________________________________
#
#   check_task2
# ___________________________________________________________________________

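# collate() groups every .tmp1 file that shares the same directory
# ({path[0]}) into a single output file, all.tmp2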
@collate([generate_initial_files1, generate_initial_files2, generate_initial_files3,
          generate_initial_files4],
         formatter(),
         "{path[0]}/all.tmp2")
# @transform([generate_initial_files1, generate_initial_files2, generate_initial_files3,
#            generate_initial_files4],
#            formatter( ),
#            "{path[0]}/{basename[0]}.tmp2")
def check_task2(infiles, outfile):
    with open(outfile, "w"):
        pass
    #print("8" * 80, "\n", "    task2 :%s %s " % (infiles, outfile), file=sys.stderr)

# ___________________________________________________________________________
#
#   check_task3
# ___________________________________________________________________________


@transform(check_task2, suffix(".tmp2"), ".tmp3")
def check_task3(infile, outfile):
    global throw_exception
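    # throw_exception is a tri-state flag: None disables exceptions entirely;
    # otherwise it alternates, so every other invocation of this task raises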
    if throw_exception is not None:
        throw_exception = not throw_exception
    if throw_exception:
        #print("Throw exception for ", infile, outfile, file=sys.stderr)
        raise Exception("oops")
    else:
        #print("No throw exception for ", infile, outfile, file=sys.stderr)
        pass
    with open(outfile, "w"):
        pass
    #print("8" * 80, "\n", "    task3 :%s %s " % (infile, outfile), file=sys.stderr)

# ___________________________________________________________________________
#
#   check_task4
# ___________________________________________________________________________


@transform(check_task3, suffix(".tmp3"), ".tmp4")
def check_task4(infile, outfile):
    with open(outfile, "w"):
        pass
    #print("8" * 80, "\n", "    task4 :%s %s " % (infile, outfile), file=sys.stderr)


def cleanup_tmpdir():
    # portable replacement for "rm -f tempdir/* RUFFUS_HISTORY_FILE"
    for file_name in glob.glob(os.path.join(tempdir, "*")) + [RUFFUS_HISTORY_FILE]:
        if os.path.exists(file_name):
            os.unlink(file_name)

VERBOSITY = 11

cnt_pipelines = 0


class Test_job_history_with_exceptions(unittest.TestCase):
    def setUp(self):
        try:
            os.mkdir(tempdir)
        except OSError:
            pass

    # ___________________________________________________________________________
    #
    #   test pipeline_printout on the decorated ("main") pipeline
    # ___________________________________________________________________________
    def test_job_history_with_exceptions(self):
        cleanup_tmpdir()
        s = StringIO()
        pipeline_printout(s, [check_task4], verbose=VERBOSITY,
                          wrap_width=10000, pipeline="main")
        # print(s.getvalue())

    def create_pipeline(self):
        # each pipeline needs a distinct name
        global cnt_pipelines
        cnt_pipelines += 1
        test_pipeline = Pipeline("test %d" % cnt_pipelines)
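        # rebuild the same task graph through the object-oriented Pipeline
        # API, reusing the task functions defined at module level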

        test_pipeline.originate(task_func=generate_initial_files1,
                                output=[tempdir + prefix + "_name.tmp1" for prefix in "abcd"])

        test_pipeline.originate(task_func=generate_initial_files2,
                                output=[tempdir + "e_name.tmp1", tempdir + "f_name.tmp1"])

        test_pipeline.originate(task_func=generate_initial_files3,
                                output=[tempdir + "g_name.tmp1", tempdir + "h_name.tmp1"])

        test_pipeline.originate(task_func=generate_initial_files4,
                                output=tempdir + "i_name.tmp1")

        test_pipeline.collate(task_func=check_task2,
                              input=[generate_initial_files1,
                                     generate_initial_files2,
                                     generate_initial_files3,
                                     generate_initial_files4],
                              filter=formatter(),
                              output="{path[0]}/all.tmp2")

        test_pipeline.transform(task_func=check_task3,
                                input=check_task2,
                                filter=suffix(".tmp2"),
                                output=".tmp3")

        test_pipeline.transform(task_func=check_task4,
                                input=check_task3,
                                filter=suffix(".tmp3"),
                                output=".tmp4")
        return test_pipeline

    def test_job_history_with_exceptions_run(self):
        """Run the decorated pipeline: an exception in check_task3 must leave check_task2 up to date"""
        for i in range(1):
            cleanup_tmpdir()
            try:
                pipeline_run([check_task4], verbose=0,
                             #multithread = 2,
                             one_second_per_job=one_second_per_job, pipeline="main")
            except Exception:
                # the exception raised in check_task3 is expected
                pass
            s = StringIO()
            pipeline_printout(
                s, [check_task4], verbose=VERBOSITY, wrap_width=10000, pipeline="main")
            #
            # task 2 should be up to date because the exception was thrown in task 3
            #
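            # if check_task2 appears below "Tasks which will be run", the job
            # history wrongly marks it out of date, so this search must fail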
            pipeline_printout_str = s.getvalue()
            correct_order = not re.search(
                'Tasks which will be run:.*\n(.*\n)*Task = check_task2', pipeline_printout_str)
            if not correct_order:
                print(pipeline_printout_str)
            self.assertTrue(correct_order)
            sys.stderr.write(".")
        print()

    def test_newstyle_recreate_job_history(self):
        """Recreate the job-history (sqlite) file for a new-style pipeline without re-running jobs"""
        test_pipeline = self.create_pipeline()
        global throw_exception
        throw_exception = None      # disable the alternating exception in check_task3
        cleanup_tmpdir()

        #
        #   Initial run without creating the sqlite history file
        #
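        # with checksum_level=CHECKSUM_FILE_TIMESTAMPS, ruffus relies on file
        # timestamps alone, so the sqlite history file is never created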
        test_pipeline.run([check_task4], verbose=0,
                          checksum_level=CHECKSUM_FILE_TIMESTAMPS,
                          multithread=10,
                          one_second_per_job=one_second_per_job)

        #
        #   printout without sqlite: everything is up to date by timestamp
        #
        s = StringIO()
        test_pipeline.printout(
            s, [check_task4], checksum_level=CHECKSUM_FILE_TIMESTAMPS)
        self.assertTrue(not re.search(
            'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()))
        #
        #   printout expecting the sqlite file: tasks now look out of date
        #
        s = StringIO()
        test_pipeline.printout(s, [check_task4])
        self.assertTrue(
            re.search('Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()))
        #
        #   Regenerate the sqlite history file
        #
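        # touch_files_only=2 asks ruffus to rebuild the history file from the
        # existing output files without re-running any jobs (CHECKSUM_REGENERATE)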
        test_pipeline.run([check_task4],
                          checksum_level=CHECKSUM_FILE_TIMESTAMPS,
                          history_file=get_default_history_file_name(),
                          multithread=1,
                          verbose=0,
                          touch_files_only=2,
                          one_second_per_job=one_second_per_job)
        #
        #   printout with the regenerated sqlite file: up to date again
        #
        s = StringIO()
        test_pipeline.printout(s, [check_task4], verbose=VERBOSITY)
        succeed = not re.search(
            'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())
        if not succeed:
            print(s.getvalue(), file=sys.stderr)
        self.assertTrue(succeed)

        throw_exception = False     # restore the default for the other tests

    def test_newstyle_job_history_with_exceptions_run(self):
        """Run the new-style pipeline: an exception in check_task3 must leave check_task2 up to date"""
        test_pipeline = self.create_pipeline()
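        # same scenario as test_job_history_with_exceptions_run above, but
        # driven through the explicitly constructed Pipeline object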
        for i in range(1):
            cleanup_tmpdir()
            try:
                test_pipeline.run([check_task4], verbose=0,
                                  #multithread = 2,
                                  one_second_per_job=one_second_per_job)
            except Exception:
                # the exception raised in check_task3 is expected
                pass
            s = StringIO()
            test_pipeline.printout(
                s, [check_task4], verbose=VERBOSITY, wrap_width=10000)
            #
            # task 2 should be up to date because the exception was thrown in task 3
            #
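            # same up-to-date check as in test_job_history_with_exceptions_run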
            pipeline_printout_str = s.getvalue()
            correct_order = not re.search(
                'Tasks which will be run:.*\n(.*\n)*Task = check_task2', pipeline_printout_str)
            if not correct_order:
                print(pipeline_printout_str)
            self.assertTrue(correct_order)
            sys.stderr.write(".")
        print()

    def test_recreate_job_history(self):
        """Recreate the job-history (sqlite) file for the decorated pipeline without re-running jobs"""
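        # same scenario as test_newstyle_recreate_job_history, but driven
        # through the decorator-defined ("main") pipeline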
        global throw_exception
        throw_exception = None      # disable the alternating exception in check_task3
        cleanup_tmpdir()

        #
        #   Initial run without creating the sqlite history file
        #
        pipeline_run([check_task4], verbose=0,
                     checksum_level=CHECKSUM_FILE_TIMESTAMPS,
                     multithread=10,
                     one_second_per_job=one_second_per_job, pipeline="main")

        #
        #   printout without sqlite: everything is up to date by timestamp
        #
        s = StringIO()
        pipeline_printout(
            s, [check_task4], checksum_level=CHECKSUM_FILE_TIMESTAMPS, pipeline="main")
        self.assertTrue(not re.search(
            'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()))
        #
        #   printout expecting the sqlite file: tasks now look out of date
        #
        s = StringIO()
        pipeline_printout(s, [check_task4], pipeline="main")
        self.assertTrue(
            re.search('Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue()))
        #
        #   Regenerate the sqlite history file (touch_files_only=2, no jobs re-run)
        #
        pipeline_run([check_task4],
                     checksum_level=CHECKSUM_FILE_TIMESTAMPS,
                     history_file=get_default_history_file_name(),
                     multithread=1,
                     verbose=0,
                     touch_files_only=2,
                     one_second_per_job=one_second_per_job, pipeline="main")
        #
        #   printout with the regenerated sqlite file: up to date again
        #
        s = StringIO()
        pipeline_printout(s, [check_task4], verbose=VERBOSITY, pipeline="main")
        succeed = not re.search(
            'Tasks which will be run:.*\n(.*\n)*Task = ', s.getvalue())
        if not succeed:
            print(s.getvalue(), file=sys.stderr)
        self.assertTrue(succeed)

        throw_exception = False     # restore the default for the other tests

    # ___________________________________________________________________________
    #
    #   cleanup
    # ___________________________________________________________________________
    def tearDown(self):
        shutil.rmtree(tempdir)


#
#   Necessary to protect the "entry point" of the program under Windows.
#       see: http://docs.python.org/library/multiprocessing.html#multiprocessing-programming
#
if __name__ == '__main__':
    #pipeline_printout(sys.stdout, [check_task4], verbose=VERBOSITY, pipeline="main")
    unittest.main()