1#!/usr/local/bin/python3.8 2from __future__ import print_function 3import shutil 4import logging 5import unittest 6from ruffus.proxy_logger import make_shared_logger_and_proxy, setup_std_shared_logger 7from ruffus import originate, transform, suffix, merge, pipeline_run, Pipeline 8import sys 9 10""" 11 12 test_with_logger.py 13 14""" 15 16 17import os 18tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/" 19input_file_names = [os.path.join(tempdir, "%d.1" % fn) for fn in range(20)] 20final_file_name = os.path.join(tempdir, "final.result") 21try: 22 os.makedirs(tempdir) 23except: 24 pass 25 26 27# add grandparent to search path for testing 28grandparent_dir = os.path.abspath( 29 os.path.join(os.path.dirname(__file__), "..", "..")) 30sys.path.insert(0, grandparent_dir) 31 32# module name = script name without extension 33module_name = os.path.splitext(os.path.basename(__file__))[0] 34 35 36# funky code to import by file name 37# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 38 39# imports 40 41 42# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 43 44 45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 46 47# Tasks 48 49 50# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888 51 52def write_input_output_filenames_to_output(infiles, outfile, logger_proxy, logging_mutex): 53 """ 54 Helper function: Writes input output file names and input contents to outfile 55 """ 56 with open(outfile, "w") as oo: 57 # save file name strings before we turn infiles into a list 58 fn_str = "%s -> %s" % (infiles, outfile) 59 60 # None = [] 61 if infiles is None: 62 infiles = [] 63 # str = [str] 64 if not isinstance(infiles, list): 65 infiles = [infiles] 66 67 max_white_space = -2 68 # write content of infiles indented 69 for infile in infiles: 70 with open(infile) as ii: 71 for line in ii: 72 oo.write(line) 73 max_white_space = max( 74 [max_white_space, len(line) - len(line.lstrip())]) 75 # add extra spaces before filenames 76 oo.write(" " * (max_white_space + 2) + fn_str + "\n") 77 78 with logging_mutex: 79 logger_proxy.info(fn_str) 80 81 82# 83# Make logger 84# 85#import logging 86args = dict() 87args["file_name"] = os.path.join(tempdir, module_name + ".log") 88args["level"] = logging.DEBUG 89args["rotating"] = True 90args["maxBytes"] = 20000 91args["backupCount"] = 10 92args["formatter"] = "%(asctime)s - %(name)s - %(levelname)6s - %(message)s" 93 94if sys.version_info[0] == 3 and sys.version_info[1] == 2 and __name__ != "__main__": 95 print( 96 """ 97 888888888888888888888888888888888888888888888888888888888888888888888888888 98 99 ERROR: 100 101 This unit test can not be run as a python module (python -m unittest xxx) 102 due to the interaction of bugs / misfeatures in the multiprocessing module 103 and python3.2 104 105 See http://bugs.python.org/issue15914 106 http://bugs.python.org/issue9573 107 108 In detail: 109 110 Making a shared logger calls code within the multiprocessing module. 111 This in turn tries to import the hmac module inside deliver_challenge(). 112 This hangs if it happens after a module fork. 113 114 The only way around this is to only make calls to multiprocessing 115 (i.e. make_shared_logger_and_proxy(...)) after the import phase of 116 module loading. 117 118 This python bug will be triggered if your make_shared_logger_and_proxy() 119 call is at global scope in a module (i.e. not __main__) and only for 120 python version 3.2 121 122 888888888888888888888888888888888888888888888888888888888888888888888888888 123 124""") 125 sys.exit() 126 127(logger_proxy, 128 logging_mutex) = make_shared_logger_and_proxy(setup_std_shared_logger, 129 "my_logger", args) 130 131 132# 133# task1 134# 135@originate(input_file_names, logger_proxy, logging_mutex) 136def task1(outfile, logger_proxy, logging_mutex): 137 write_input_output_filenames_to_output( 138 None, outfile, logger_proxy, logging_mutex) 139 140 141# 142# task2 143# 144@transform(task1, suffix(".1"), ".2", logger_proxy, logging_mutex) 145def task2(infile, outfile, logger_proxy, logging_mutex): 146 write_input_output_filenames_to_output( 147 infile, outfile, logger_proxy, logging_mutex) 148 149 150# 151# task3 152# 153@transform(task2, suffix(".2"), ".3", logger_proxy, logging_mutex) 154def task3(infile, outfile, logger_proxy, logging_mutex): 155 """ 156 Third task 157 """ 158 write_input_output_filenames_to_output( 159 infile, outfile, logger_proxy, logging_mutex) 160 161 162# 163# task4 164# 165@merge(task3, final_file_name, logger_proxy, logging_mutex) 166def task4(infile, outfile, logger_proxy, logging_mutex): 167 """ 168 Fourth task 169 """ 170 write_input_output_filenames_to_output( 171 infile, outfile, logger_proxy, logging_mutex) 172 173 174class Test_ruffus(unittest.TestCase): 175 def setUp(self): 176 self.tearDown() 177 try: 178 os.makedirs(tempdir) 179 #sys.stderr.write(" Created %s\n" % tempdir) 180 except: 181 pass 182 183 def tearDown(self): 184 try: 185 shutil.rmtree(tempdir) 186 #sys.stderr.write(" Removed %s\n" % tempdir) 187 pass 188 except: 189 pass 190 191 def test_simpler(self): 192 pipeline_run(multiprocess=500, verbose=0, pipeline="main") 193 194 def test_newstyle_simpler(self): 195 test_pipeline = Pipeline("test") 196 test_pipeline.originate(task1, input_file_names, extras=[ 197 logger_proxy, logging_mutex]) 198 test_pipeline.transform(task2, task1, suffix( 199 ".1"), ".2", extras=[logger_proxy, logging_mutex]) 200 test_pipeline.transform(task3, task2, suffix( 201 ".2"), ".3", extras=[logger_proxy, logging_mutex]) 202 test_pipeline.merge(task4, task3, final_file_name, 203 extras=[logger_proxy, logging_mutex]) 204 #test_pipeline.merge(task4, task3, final_file_name, extras = {"logger_proxy": logger_proxy, "logging_mutex": logging_mutex}) 205 test_pipeline.run(multiprocess=500, verbose=0) 206 207 208if __name__ == '__main__': 209 unittest.main() 210