1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import json
4from collections import defaultdict
5import re
6import time
7import shutil
8import unittest
9from ruffus import pipeline_run, pipeline_printout, Pipeline, transform, follows, posttask, merge, \
10    mkdir, suffix, originate, regex, inputs, jobs_limit, files
11import sys
12
13"""
14
15    branching.py
16
17        test branching dependencies
18
19"""
20
21
22import os
# Working directory for all test artefacts: "<script name without extension>/"
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
# (lets the test import the in-tree ruffus package rather than an installed one)
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]


# funky code to import by file name
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
36
37
38# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
39
40#   imports
41
42
43# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
44
45try:
46    from StringIO import StringIO
47except:
48    from io import StringIO
49
50# use simplejson in place of json for python < 2.6
51# try:
52#    import json
53# except ImportError:
54#    import simplejson
55#    json = simplejson
56
57# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
58
59#   Functions
60
61
62# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
63
def check_job_io(infiles, outfiles, extra_params):
    """
    Concatenate the (sorted) contents of ``infiles`` into every file in
    ``outfiles``, followed by one line recording the job's parameters:
    ``<json infiles> -> <json outfiles>``.

    infiles  -- a filename, a list of filenames, or None (treated as no inputs)
    outfiles -- a filename or a list of filenames; each is overwritten
    extra_params -- accepted for compatibility with ruffus job signatures;
                    not used by this function
    """
    # Normalise both arguments to lists of filenames.
    if isinstance(infiles, str):
        infiles = [infiles]
    elif infiles is None:
        infiles = []
    if isinstance(outfiles, str):
        outfiles = [outfiles]

    # Sort input contents so the result is independent of job scheduling order.
    output_text = list()
    for fname in infiles:
        with open(fname) as infile:
            output_text.append(infile.read())
    output_text = "".join(sorted(output_text))
    output_text += json.dumps(infiles) + " -> " + json.dumps(outfiles) + "\n"

    for fname in outfiles:
        with open(fname, "w") as outfile:
            outfile.write(output_text)
87
88
89# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
90
91#   Main logic
92
93
94# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
95
96
97# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
98
99#   Tasks
100
101
102# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
103#
104#   1   ->  2   ->  3   ->
105#       ->  4           ->
106#                   5   ->    6
107#
108
def do_write(file_name, what):
    """Append the string *what* to *file_name* (creating it if absent)."""
    with open(file_name, "a") as handle:
        handle.write(what)
112
113
# Sentinel log file: each task's posttask callback appends "Task N Done" here.
test_file = tempdir + "task.done"
115#
116#    task1
117#
118
119
@originate([tempdir + d for d in ('a.1', 'b.1', 'c.1')])
@follows(mkdir(tempdir))
@posttask(lambda: do_write(test_file, "Task 1 Done\n"))
def task1(outfile, *extra_params):
    """
    First task: originate the three starting .1 files.
    """
    job_record = 'job = %s\n' % json.dumps([None, outfile])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    check_job_io(None, outfile, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
132
133
134#
135#    task2
136#
@posttask(lambda: do_write(test_file, "Task 2 Done\n"))
@transform(task1, suffix(".1"), ".2")
def task2(infiles, outfiles, *extra_params):
    """
    Second task: transform each .1 file into a .2 file.
    """
    job_record = 'job = %s\n' % json.dumps([infiles, outfiles])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    check_job_io(infiles, outfiles, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
148
149
150#
151#    task3
152#
@transform(task2, regex('(.*).2'), inputs([r"\1.2", tempdir + "a.1"]), r'\1.3')
@posttask(lambda: do_write(test_file, "Task 3 Done\n"))
def task3(infiles, outfiles, *extra_params):
    """
    Third task: combine each .2 file with a.1 into a .3 file.
    """
    job_record = 'job = %s\n' % json.dumps([infiles, outfiles])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    check_job_io(infiles, outfiles, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
164
165
166#
167#    task4
168#
@jobs_limit(1)
@transform(tempdir + "*.1", suffix(".1"), ".4")
@follows(task1)
@posttask(lambda: do_write(test_file, "Task 4 Done\n"))
def task4(infiles, outfiles, *extra_params):
    """
    Fourth task is extra slow: .1 -> .4, one job at a time.
    """
    job_record = 'job = %s\n' % json.dumps([infiles, outfiles])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    # Deliberate delay so job-ordering checks have something to observe.
    time.sleep(0.1)
    check_job_io(infiles, outfiles, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
183
184#
185#    task5
186#
187
188
@files(None, tempdir + 'a.5')
@follows(mkdir(tempdir))
@posttask(lambda: do_write(test_file, "Task 5 Done\n"))
def task5(infiles, outfiles, *extra_params):
    """
    Fifth task is extra slow: originate a.5 after a deliberate delay.
    """
    job_record = 'job = %s\n' % json.dumps([infiles, outfiles])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    # Deliberate delay so job-ordering checks have something to observe.
    time.sleep(1)
    check_job_io(infiles, outfiles, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
202
203#
204#    task6
205#
206# @files([[[tempdir + d for d in 'a.3', 'b.3', 'c.3', 'a.4', 'b.4', 'c.4', 'a.5'], tempdir + 'final.6']])
207
208
@merge([task3, task4, task5], tempdir + "final.6")
@follows(task3, task4, task5, )
@posttask(lambda: do_write(test_file, "Task 6 Done\n"))
def task6(infiles, outfiles, *extra_params):
    """
    final task: merge every .3/.4/.5 output into final.6
    """
    job_record = 'job = %s\n' % json.dumps([infiles, outfiles])
    with open(tempdir + "jobs.start", "a") as log:
        log.write(job_record)
    check_job_io(infiles, outfiles, extra_params)
    with open(tempdir + "jobs.finish", "a") as log:
        log.write(job_record)
221
222
223#
#   Use equivalent but new style syntax
225#
# The same six tasks registered a second time, on a named Pipeline object,
# so both APIs can be exercised by the unit tests below.
test_pipeline = Pipeline("test")

# task1: originate the three .1 files (keyword-argument form).
test_pipeline.originate(task_func=task1,
                        output=[tempdir + d for d in ('a.1', 'b.1', 'c.1')])\
    .follows(mkdir(tempdir))\
    .posttask(lambda: do_write(test_file, "Task 1 Done\n"))

# task2: .1 -> .2
test_pipeline.transform(task_func=task2,
                        input=task1,
                        filter=suffix(".1"),
                        output=".2") \
    .posttask(lambda: do_write(test_file, "Task 2 Done\n"))

# task3: each .2 plus a.1 -> .3 (positional-argument form).
test_pipeline.transform(task3, task2, regex('(.*).2'), inputs([r"\1.2", tempdir + "a.1"]), r'\1.3')\
    .posttask(lambda: do_write(test_file, "Task 3 Done\n"))


# task4: .1 -> .4, serialised to one concurrent job via jobs_limit(1).
test_pipeline.transform(task4, tempdir + "*.1", suffix(".1"), ".4")\
    .follows(task1)\
    .posttask(lambda: do_write(test_file, "Task 4 Done\n"))\
    .jobs_limit(1)

# task5: originate a.5.
test_pipeline.files(task5, None, tempdir + 'a.5')\
    .follows(mkdir(tempdir))\
    .posttask(lambda: do_write(test_file, "Task 5 Done\n"))

# task6: merge all .3/.4/.5 outputs into final.6.
test_pipeline.merge(task_func=task6,
                    input=[task3, task4, task5],
                    output=tempdir + "final.6")\
    .follows(task3, task4, task5, ) \
    .posttask(lambda: do_write(test_file, "Task 6 Done\n"))
257
258
def check_job_order_correct(filename):
    """
    Check that the job log in ``filename`` respects the dependency graph:

       1   ->  2   ->  3   ->
           ->  4           ->
                       5   ->    6

    Every line must end in ".<digit>" (optionally followed by quote /
    bracket characters); the digit identifies the task the job belongs to.
    For each (before, after) precedence rule, every job of ``before`` must
    appear on an earlier line than any job of ``after``.

    Raises ValueError for an unparseable line, and Exception when a
    precedence rule is violated.
    """
    # (before, after): all `before` jobs must precede all `after` jobs.
    precedence_rules = [[1, 2],
                        [2, 3],
                        [1, 4],
                        [5, 6],
                        [3, 6],
                        [4, 6]]

    # Capture the digit after the last "." at end of line (allowing trailing
    # quotes/brackets from the JSON-ish log format).
    index_re = re.compile(r'.*\.([0-9])["\]\n]*$')
    job_indices = defaultdict(list)
    with open(filename) as ii:
        for linenum, l in enumerate(ii):
            m = index_re.search(l)
            if not m:
                # Bug fix: the original raised a plain string, which is a
                # TypeError in Python 3; raise a real exception instead.
                raise ValueError("Non-matching line in [%s]" % filename)
            job_indices[int(m.group(1))].append(linenum)

    for job_index in job_indices:
        job_indices[job_index].sort()

    for before, after in precedence_rules:
        # A rule is only checkable if both tasks logged at least one job.
        if before not in job_indices or after not in job_indices:
            continue
        if job_indices[before][-1] >= job_indices[after][0]:
            raise Exception("Precedence violated for job %d [line %d] and job %d [line %d] of [%s]"
                            % (before, job_indices[before][-1],
                               after,  job_indices[after][0],
                               filename))
293
294
def check_final_output_correct(after_touch_files=False):
    """
    Check that tempdir + "final.6" contains exactly the expected merged
    job records (one "<json inputs> -> <json outputs>" line per job,
    compared as sorted lines).

    after_touch_files -- when True, drop one "[] -> b.1" record from the
        expectation, presumably because b.1 was recreated by the
        touch_files_only pass rather than a full task1 rerun
        # NOTE(review): inferred from the test flow -- confirm.

    Raises Exception (after printing an actual-vs-expected report to
    stderr) when the file does not match.
    """
    expected_output = \
        """        ["DIR/a.1"] -> ["DIR/a.2"]
        ["DIR/a.1"] -> ["DIR/a.4"]
        ["DIR/a.2", "DIR/a.1"] -> ["DIR/a.3"]
        ["DIR/a.3", "DIR/b.3", "DIR/c.3", "DIR/a.4", "DIR/b.4", "DIR/c.4", "DIR/a.5"] -> ["DIR/final.6"]
        ["DIR/b.1"] -> ["DIR/b.2"]
        ["DIR/b.1"] -> ["DIR/b.4"]
        ["DIR/b.2", "DIR/a.1"] -> ["DIR/b.3"]
        ["DIR/c.1"] -> ["DIR/c.2"]
        ["DIR/c.1"] -> ["DIR/c.4"]
        ["DIR/c.2", "DIR/a.1"] -> ["DIR/c.3"]
        [] -> ["DIR/a.1"]
        [] -> ["DIR/a.1"]
        [] -> ["DIR/a.1"]
        [] -> ["DIR/a.1"]
        [] -> ["DIR/a.1"]
        [] -> ["DIR/a.5"]
        [] -> ["DIR/b.1"]
        [] -> ["DIR/b.1"]
        [] -> ["DIR/c.1"]
        [] -> ["DIR/c.1"]"""

    # Strip the literal's indentation and substitute the real directory.
    expected_output = expected_output.replace(
        "        ", "").replace("DIR/", tempdir).split("\n")
    # Bug fix: take a real copy before mutating.  The original bound an
    # alias, so the pop() below also shrank the list printed as
    # "Expected:" in the mismatch report.
    orig_expected_output = list(expected_output)
    if after_touch_files:
        expected_output.pop(-3)
    with open(tempdir + "final.6", "r") as ii:
        final_6_contents = sorted([l.rstrip() for l in ii.readlines()])
    if final_6_contents != expected_output:
        print("Actual:", file=sys.stderr)
        for ll in final_6_contents:
            print(ll, file=sys.stderr)
        print("_" * 80, file=sys.stderr)
        print("Expected:", file=sys.stderr)
        for ll in orig_expected_output:
            print(ll, file=sys.stderr)
        print("_" * 80, file=sys.stderr)
        # Show the first differing pairs line by line.
        for i, (l1, l2) in enumerate(zip(final_6_contents, expected_output)):
            if l1 != l2:
                sys.stderr.write(
                    "%d\nActual:\n  >%s<\nExpected:\n  >%s<\n" % (i, l1, l2))
        raise Exception("Final.6 output is not as expected\n")
342
343
class Test_ruffus(unittest.TestCase):
    """End-to-end checks of the branching pipeline, run twice: once via
    the decorator-defined ("main") pipeline and once via the Pipeline
    object API.  Each test runs the pipeline, deletes an intermediate,
    reruns with touch_files_only, then reruns normally."""

    def tearDown(self):
        # Best-effort cleanup; the directory may already be gone.
        try:
            shutil.rmtree(tempdir)
        except:
            pass

    def setUp(self):
        # Start each test from an empty working directory.
        try:
            shutil.rmtree(tempdir)
        except:
            pass
        os.makedirs(tempdir)

    def test_ruffus(self):
        # Full run of the decorator-defined pipeline, then verify output
        # contents and that job start/finish order obeyed the dependencies.
        print("\n\n     Run pipeline normally...")
        pipeline_run(multiprocess=10, verbose=0, pipeline="main")
        check_final_output_correct()
        check_job_order_correct(tempdir + "jobs.start")
        check_job_order_correct(tempdir + "jobs.finish")
        print("     OK")

        # Delete one task1 output and rerun up to task2 with
        # touch_files_only: files are recreated empty-but-up-to-date.
        print("\n\n     Touch task2 only:")
        os.unlink(os.path.join(tempdir, "jobs.start"))
        os.unlink(os.path.join(tempdir, "jobs.finish"))
        print("       First delete b.1 for task2...")
        os.unlink(os.path.join(tempdir, "b.1"))
        print("       Then run with touch_file_only...")
        pipeline_run([task2], multiprocess=10,
                     touch_files_only=True, verbose=0, pipeline="main")

        # check touching has made task2 up to date
        s = StringIO()
        pipeline_printout(s, [task2], verbose=4,
                          wrap_width=10000, pipeline="main")
        output_str = s.getvalue()
        #print (">>>\n", output_str, "<<<\n", file=sys.stderr)
        # If b.1/b.2 still appear in the printout, they would be rerun,
        # i.e. touching failed to bring task2 up to date.
        if "b.1" in output_str:
            raise Exception("Expected b.1 created by touching...")
        if "b.2" in output_str:
            raise Exception("Expected b.2 created by touching...")
        print("     Touching has made task2 up to date...\n")

        # Final normal rerun; expectation differs by one record because
        # b.1 was recreated by touching (hence the True argument).
        print("     Then run normally again...")
        pipeline_run(multiprocess=10, verbose=0, pipeline="main")
        check_final_output_correct(True)
        check_job_order_correct(tempdir + "jobs.start")
        check_job_order_correct(tempdir + "jobs.finish")

    def test_ruffus_new_syntax(self):
        # Identical scenario, driven through the Pipeline object API.
        print("\n\n     Run pipeline normally...")
        test_pipeline.run(multiprocess=10, verbose=0)
        check_final_output_correct()
        check_job_order_correct(tempdir + "jobs.start")
        check_job_order_correct(tempdir + "jobs.finish")
        print("     OK")

        print("\n\n     Touch task2 only:")
        os.unlink(os.path.join(tempdir, "jobs.start"))
        os.unlink(os.path.join(tempdir, "jobs.finish"))
        print("       First delete b.1 for task2...")
        os.unlink(os.path.join(tempdir, "b.1"))
        print("       Then run with touch_file_only...")
        test_pipeline.run([task2], multiprocess=10,
                          touch_files_only=True, verbose=0)

        # check touching has made task2 up to date
        s = StringIO()
        test_pipeline.printout(s, [task2], verbose=4, wrap_width=10000)
        output_str = s.getvalue()
        #print (">>>\n", output_str, "<<<\n", file=sys.stderr)
        if "b.1" in output_str:
            raise Exception("Expected b.1 created by touching...")
        if "b.2" in output_str:
            raise Exception("Expected b.2 created by touching...")
        print("     Touching has made task2 up to date...\n")

        print("     Then run normally again...")
        test_pipeline.run(multiprocess=10, verbose=0)
        check_final_output_correct(True)
        check_job_order_correct(tempdir + "jobs.start")
        check_job_order_correct(tempdir + "jobs.finish")
427
428
# Run both the decorator-style and Pipeline-object-style tests.
if __name__ == '__main__':
    unittest.main()
431