#!/usr/local/bin/python3.8
"""

    test_softlink_uptodate.py

"""
from __future__ import print_function

import multiprocessing.managers
import os
import sys
import unittest

tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"

# add grandparent to search path for testing
# NOTE: this must run before ruffus is imported below; inserting the path
# after the import cannot influence which copy of ruffus gets loaded.
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".."))
sys.path.insert(0, grandparent_dir)

from ruffus import Pipeline, suffix, pipeline_run  # noqa: E402

# module name = script name without extension
module_name = os.path.splitext(os.path.basename(__file__))[0]


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Tasks


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#
#   First task
#
def start_task(output_file_name, executed_tasks_proxy, mutex_proxy):
    """Create an empty output file and record that start_task ran.

    output_file_name:     path of the file to create (truncated if present).
    executed_tasks_proxy: shared mapping of task name -> count; the entry
                          "start_task" is set to 1 (assigned, not
                          incremented, so it stays 1 however many jobs run).
    mutex_proxy:          context manager guarding executed_tasks_proxy.
    """
    # Opening in "w" mode is enough to create / empty the file.
    with open(output_file_name, "w"):
        pass
    with mutex_proxy:
        executed_tasks_proxy["start_task"] = 1

#
#   Forwards file names, is always as up to date as its input files...
#


def same_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Record one same_file_name_task job; no file is read or written.

    input_file_name / output_file_name: unused by the body. setUp registers
        this task with output ".1" for suffix ".1", i.e. the same suffix on
        both sides.
    executed_tasks_proxy: shared mapping of task name -> count; the entry
        "same_file_name_task" is incremented (starting from 0).
    mutex_proxy: context manager guarding executed_tasks_proxy.
    """
    with mutex_proxy:
        executed_tasks_proxy["same_file_name_task"] = executed_tasks_proxy.get(
            "same_file_name_task", 0) + 1

#
#   Links file names, is always as up to date if links are not missing
#


def linked_file_name_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Symlink output_file_name to input_file_name and record one job.

    Both paths are made absolute before the link is created.
    The "linked_file_name_task" entry of executed_tasks_proxy is incremented
    (starting from 0) while mutex_proxy is held.
    """
    os.symlink(os.path.abspath(input_file_name),
               os.path.abspath(output_file_name))
    with mutex_proxy:
        executed_tasks_proxy["linked_file_name_task"] = executed_tasks_proxy.get(
            "linked_file_name_task", 0) + 1


#
#   Final task linking everything
#
def final_task(input_file_name, output_file_name, executed_tasks_proxy, mutex_proxy):
    """Create an empty output file and record one final_task job.

    input_file_name is unused by the body. The "final_task" entry of
    executed_tasks_proxy is incremented (starting from 0) while mutex_proxy
    is held.
    """
    # Opening in "w" mode is enough to create / empty the file.
    with open(output_file_name, "w"):
        pass
    with mutex_proxy:
        executed_tasks_proxy["final_task"] = executed_tasks_proxy.get(
            "final_task", 0) + 1

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   Run pipeline


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888


# Python 2 / 3 compatible import; only a missing module should trigger the
# fallback, so catch ImportError rather than everything.
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO


class Test_ruffus(unittest.TestCase):
    """Runs the "main" pipeline three times and checks how often each task ran."""

    def setUp(self):
        """Start a manager, publish the shared proxies and build the pipeline."""
        global mutex_proxy
        global executed_tasks_proxy

        # list of executed tasks
        manager = multiprocessing.managers.SyncManager()
        manager.start()
        # Stop the manager's server process when the test finishes. Cleanups
        # run after tearDown, and also when the rest of setUp raises, so the
        # process is no longer left behind.
        self.addCleanup(manager.shutdown)
        mutex_proxy = manager.Lock()
        executed_tasks_proxy = manager.dict()

        pipeline = Pipeline.pipelines["main"]
        pipeline.originate(task_func=start_task,
                           output=[tempdir + "a.1", tempdir + "b.1"],
                           extras=[executed_tasks_proxy, mutex_proxy])\
            .mkdir(tempdir)
        # Output suffix equals the input suffix here.
        pipeline.transform(task_func=same_file_name_task,
                           input=start_task,
                           filter=suffix(".1"),
                           output=".1",
                           extras=[executed_tasks_proxy, mutex_proxy])
        pipeline.transform(task_func=linked_file_name_task,
                           input=start_task,
                           filter=suffix(".1"),
                           output=".linked.1",
                           extras=[executed_tasks_proxy, mutex_proxy])
        pipeline.transform(task_func=final_task,
                           input=[linked_file_name_task, same_file_name_task],
                           filter=suffix(".1"),
                           output=".3",
                           extras=[executed_tasks_proxy, mutex_proxy])
        # Start from a clean slate: remove leftovers of a previous run.
        self.cleanUp()

    def cleanUp(self, check_expected=False):
        """Remove the pipeline's files from tempdir, then tempdir itself.

        check_expected: when true, raise Exception as soon as one of the
            listed files, or tempdir, does not exist; when false, missing
            entries are silently skipped.
        """
        for f in ["a.1", "b.1", "a.linked.1", "b.linked.1", "a.3", "b.3", "a.linked.3", "b.linked.3"]:
            # lexists() is also true for a symlink whose target is gone.
            if os.path.lexists(tempdir + f):
                os.unlink(tempdir + f)
            elif check_expected:
                raise Exception("Expected %s missing" % (tempdir + f))
        if os.path.lexists(tempdir):
            os.rmdir(tempdir)
        elif check_expected:
            raise Exception("Expected %s missing" % (tempdir))

    def tearDown(self):
        """Remove all outputs, failing if any expected file or tempdir is missing."""
        self.cleanUp(True)

    def test_ruffus(self):
        """Run the pipeline three times, then verify the per-task job counters."""
        #
        #   First run: no target tasks are passed, so the whole "main"
        #   pipeline is requested (the printed label is historical).
        #
        print(" Run start_task only", file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Second run: identical call to the first one.
        #
        print(
            " Run final_task: linked_file_name_task should run as well", file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Third run: identical call again.
        #
        #       All jobs should be up to date
        #
        print(" Run final_task again: All jobs should be up to date",
              file=sys.stderr)
        pipeline_run(log_exceptions=True, verbose=0, pipeline="main")

        #
        #   Make sure right number of jobs / tasks ran
        #
        #   The counters accumulate over all three runs. start_task assigns 1
        #   instead of incrementing; the other two values are presumably the
        #   job totals of a single full run -- verify against ruffus'
        #   up-to-date checking.
        #
        for task_name, jobs_count in ({'start_task': 1, 'final_task': 4, 'linked_file_name_task': 2}).items():
            if task_name not in executed_tasks_proxy:
                raise Exception("Error: %s did not run!!" % task_name)
            if executed_tasks_proxy[task_name] != jobs_count:
                raise Exception("Error: %s did not have %d jobs!!" %
                                (task_name, jobs_count))
        # same_file_name_task must never have recorded a job.
        if "same_file_name_task" in executed_tasks_proxy:
            raise Exception("Error: %s should not have run!!" %
                            "same_file_name_task")


if __name__ == '__main__':
    unittest.main()