1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import unittest
4import multiprocessing.managers
5from ruffus import Pipeline, suffix, pipeline_run
6
7"""
8
9    test_softlink_uptodate.py
10
11"""
12
13import os
14import sys
15
# Scratch directory for this test, named after the script itself
# (e.g. "test_softlink_uptodate/"); created/removed per test run.
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
# NOTE(review): ruffus is already imported at the top of the file, so this
# insert only affects later imports — confirm the intended import order.
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]
25
26
27# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
28
29#   Tasks
30
31
32# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
33
34
35# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
36
37#   Tasks
38
39
40# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
41
42#
43#   First task
44#
def start_task(output_file_name, executed_tasks_proxy, mutex_proxy):
    """Create an empty output file and record that start_task executed.

    The shared dict/lock proxies come from a multiprocessing SyncManager,
    so the tally survives across worker processes.
    """
    # Touch the file; its contents are irrelevant to the test.
    open(output_file_name, "w").close()
    # Record the run under the shared lock.
    with mutex_proxy:
        executed_tasks_proxy["start_task"] = 1
50
51#
52#   Forwards file names, is always as up to date as its input files...
53#
54
55
def same_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Forward the file name unchanged; only tally how many times we ran.

    Produces no output file, so the job is as up to date as its input.
    """
    with mutex_proxy:
        runs_so_far = executed_tasks_proxy.get("same_file_name_task", 0)
        executed_tasks_proxy["same_file_name_task"] = runs_so_far + 1
60
61#
62#   Links file names, is always as up to date if links are not missing
63#
64
65
def linked_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Symlink the output name to the input file and tally the run.

    A symlinked output stays up to date as long as the link target exists.
    """
    source = os.path.abspath(input_file_name)
    link_name = os.path.abspath(output_file_name)
    os.symlink(source, link_name)
    # Count this execution under the shared lock.
    with mutex_proxy:
        runs_so_far = executed_tasks_proxy.get("linked_file_name_task", 0)
        executed_tasks_proxy["linked_file_name_task"] = runs_so_far + 1
72
73
74#
75#   Final task linking everything
76#
def final_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Create an empty output file and tally this job's execution."""
    # Touch the output file; contents do not matter for the test.
    open(output_file_name, "w").close()
    with mutex_proxy:
        executed_tasks_proxy["final_task"] = 1 + executed_tasks_proxy.get("final_task", 0)
83
84# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
85
86#   Run pipeline
87
88
89# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
90
91
# StringIO lived in its own module on Python 2 and moved to `io` in
# Python 3.  Catch only ImportError: a bare `except:` would also swallow
# KeyboardInterrupt/SystemExit raised during import.
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
96
97
class Test_ruffus(unittest.TestCase):
    """Verify that symlinked / forwarded outputs are treated as up to date.

    Builds a four-task pipeline on the shared "main" Pipeline, runs it
    three times, and checks the per-task execution tallies recorded in a
    manager-shared dict.
    """

    def setUp(self):
        """Build the pipeline and share a run-tally dict across processes."""

        # list of executed tasks
        # SyncManager proxies let pipeline jobs (which may run in worker
        # processes) update one shared tally dict under one shared lock.
        manager = multiprocessing.managers.SyncManager()
        manager.start()
        # Globals because the task functions reference these proxies
        # directly via their `extras` arguments set up below.
        global mutex_proxy
        global executed_tasks_proxy
        mutex_proxy = manager.Lock()
        executed_tasks_proxy = manager.dict()

        pipeline = Pipeline.pipelines["main"]
        # start_task: creates tempdir/a.1 and tempdir/b.1 from nothing.
        pipeline.originate(task_func=start_task,
                           output=[tempdir + "a.1", tempdir + "b.1"],
                           extras=[executed_tasks_proxy, mutex_proxy])\
            .mkdir(tempdir)
        # same_file_name_task: output name equals input name (".1" -> ".1"),
        # so it should always look up to date and never run.
        pipeline.transform(task_func=same_file_name_task,
                           input=start_task,
                           filter=suffix(".1"),
                           output=".1",
                           extras=[executed_tasks_proxy, mutex_proxy])
        # linked_file_name_task: symlinks ".1" -> ".linked.1".
        pipeline.transform(task_func=linked_file_name_task,
                           input=start_task,
                           filter=suffix(".1"),
                           output=".linked.1",
                           extras=[executed_tasks_proxy, mutex_proxy])
        # final_task: consumes both branches, producing four ".3" files.
        pipeline.transform(task_func=final_task,
                           input=[linked_file_name_task, same_file_name_task],
                           filter=suffix(".1"),
                           output=".3",
                           extras=[executed_tasks_proxy, mutex_proxy])
        # Start from a clean slate (no leftover files from earlier runs).
        self.cleanUp()

    def cleanUp(self, check_expected=False):
        """Remove all pipeline outputs and the temp directory.

        With check_expected=True, raise if any expected output (or the
        directory itself) is missing — used by tearDown to assert the
        pipeline actually produced everything.
        """
        for f in ["a.1", "b.1", "a.linked.1", "b.linked.1", "a.3", "b.3", "a.linked.3", "b.linked.3"]:
            # lexists/unlink so dangling symlinks are still removed.
            if os.path.lexists(tempdir + f):
                os.unlink(tempdir + f)
            elif check_expected:
                raise Exception("Expected %s missing" % (tempdir + f))
        if os.path.lexists(tempdir):
            os.rmdir(tempdir)
        elif check_expected:
            raise Exception("Expected %s missing" % (tempdir))

    def tearDown(self):
        # Also verifies every expected output file was produced.
        self.cleanUp(True)

    def test_ruffus(self):
        """Run the pipeline three times and check per-task run counts."""
        #
        #   Run task 1 only
        #
        print("    Run start_task only", file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Run task 3 only
        #
        print(
            "    Run final_task: linked_file_name_task should run as well", file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Run task 3 again:
        #
        #       All jobs should be up to date
        #
        print("    Run final_task again: All jobs should be up to date",
              file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Make sure right number of jobs / tasks ran
        #
        # Expected: start_task once, linked_file_name_task once per input
        # (2), final_task once per ".1"/".linked.1" file (4), and
        # same_file_name_task never (its output == its input).
        for task_name, jobs_count in ({'start_task': 1, 'final_task': 4, 'linked_file_name_task': 2}).items():
            if task_name not in executed_tasks_proxy:
                raise Exception("Error: %s did not run!!" % task_name)
            if executed_tasks_proxy[task_name] != jobs_count:
                raise Exception("Error: %s did not have %d jobs!!" %
                                (task_name, jobs_count))
        if "same_file_name_task" in executed_tasks_proxy:
            raise Exception("Error: %s should not have run!!" %
                            "same_file_name_task")
180
181
# Allow running this test module directly from the command line.
if __name__ == '__main__':
    unittest.main()
184