1#!/usr/local/bin/python3.8
2from __future__ import print_function
3import shutil
4import logging
5import unittest
6from ruffus.proxy_logger import make_shared_logger_and_proxy, setup_std_shared_logger
7from ruffus import originate, transform, suffix, merge, pipeline_run, Pipeline
8import sys
9
10"""
11
12    test_with_logger.py
13
14"""
15
16
17import os
# Everything this test writes lives in a directory named after this script
# (the script path without its extension), expressed relative to the current
# working directory and terminated with "/".
tempdir = os.path.relpath(os.path.abspath(os.path.splitext(__file__)[0])) + "/"
# Twenty starting files: <tempdir>/0.1 ... <tempdir>/19.1
input_file_names = [os.path.join(tempdir, "%d.1" % fn) for fn in range(20)]
# Single file produced by the final (merge) step
final_file_name = os.path.join(tempdir, "final.result")
try:
    os.makedirs(tempdir)
except OSError:
    # Best effort: the directory usually exists already from an earlier run.
    # Only file-system errors are ignored, so KeyboardInterrupt / SystemExit
    # and genuine programming errors are no longer silently swallowed.
    pass
25
26
# Put the directory two levels above this script at the front of the module
# search path.
# NOTE(review): ruffus is already imported at the top of this file, before
# this insertion runs, so it does not influence which ruffus was picked up
# by those imports -- confirm whether that is intended.
grandparent_dir = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))
sys.path.insert(0, grandparent_dir)

# Name of this script: last path component with the extension removed
module_name = os.path.basename(os.path.splitext(__file__)[0])
34
35
36# funky code to import by file name
37# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
38
39#   imports
40
41
42# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
43
44
45# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
46
47#   Tasks
48
49
50# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
51
def write_input_output_filenames_to_output(infiles, outfile, logger_proxy, logging_mutex):
    """
    Helper function shared by all tasks.

    Copies every line of every input file into ``outfile``, then appends one
    line of the form ``"<infiles> -> <outfile>"``. That last line is indented
    two spaces deeper than the most indented line copied (no indent at all
    when nothing was copied). The same ``"<infiles> -> <outfile>"`` text is
    finally logged at INFO level through ``logger_proxy`` while holding
    ``logging_mutex``.

    ``infiles`` may be None (no inputs), a list of file names, or any other
    single value, which is treated as one file name.
    """
    # Normalise the inputs into a list without touching ``infiles`` itself,
    # because the untouched value is what gets printed in the summary line.
    if infiles is None:
        sources = []
    elif isinstance(infiles, list):
        sources = infiles
    else:
        sources = [infiles]

    with open(outfile, "w") as out_handle:
        summary = "%s -> %s" % (infiles, outfile)

        # -2 so that the "+ 2" below yields no indent when nothing is copied
        deepest_indent = -2
        for source in sources:
            with open(source) as in_handle:
                for text in in_handle:
                    out_handle.write(text)
                    leading = len(text) - len(text.lstrip())
                    if leading > deepest_indent:
                        deepest_indent = leading

        # summary line sits two spaces to the right of the deepest copied line
        out_handle.write("%s%s\n" % (" " * (deepest_indent + 2), summary))

    with logging_mutex:
        logger_proxy.info(summary)
80
81
82#
83#   Make logger
84#
85#import logging
# Settings passed further down to make_shared_logger_and_proxy(): a DEBUG-level
# rotating log file kept inside tempdir and named after this script.
args = {
    "file_name": os.path.join(tempdir, module_name + ".log"),
    "level": logging.DEBUG,
    "rotating": True,
    "maxBytes": 20000,
    "backupCount": 10,
    "formatter": "%(asctime)s - %(name)s - %(levelname)6s - %(message)s",
}
93
# Guard: when this file is imported as a module (rather than run as a script)
# under Python 3.2, print the explanation below and stop before the shared
# logger is created at import time.
if __name__ != "__main__" and sys.version_info[:2] == (3, 2):
    print("""
    888888888888888888888888888888888888888888888888888888888888888888888888888

        ERROR:

    This unit test can not be run as a python module (python -m unittest xxx)
    due to the interaction of bugs / misfeatures in the multiprocessing module
    and python3.2

        See http://bugs.python.org/issue15914
            http://bugs.python.org/issue9573

    In detail:

    Making a shared logger calls code within the multiprocessing module.
    This in turn tries to import the hmac module inside deliver_challenge().
    This hangs if it happens after a module fork.

    The only way around this is to only make calls to multiprocessing
    (i.e. make_shared_logger_and_proxy(...)) after the import phase of
    module loading.

    This python bug will be triggered if your make_shared_logger_and_proxy()
    call is at global scope in a module (i.e. not __main__) and only for
    python version 3.2

    888888888888888888888888888888888888888888888888888888888888888888888888888

""")
    sys.exit()
126
# Create the logger named "my_logger" from the settings in ``args``; the
# returned proxy and mutex are handed to every task as extra parameters.
logger_proxy, logging_mutex = make_shared_logger_and_proxy(
    setup_std_shared_logger, "my_logger", args)
130
131
132#
133#    task1
134#
@originate(input_file_names, logger_proxy, logging_mutex)
def task1(outfile, logger_proxy, logging_mutex):
    """
    First task: writes one starting file. There is no input, so the helper
    is called with None in place of input file names.
    """
    write_input_output_filenames_to_output(
        infiles=None,
        outfile=outfile,
        logger_proxy=logger_proxy,
        logging_mutex=logging_mutex)
139
140
141#
142#    task2
143#
@transform(task1, suffix(".1"), ".2", logger_proxy, logging_mutex)
def task2(infile, outfile, logger_proxy, logging_mutex):
    """
    Second task: copies a ".1" file into the matching ".2" file and appends
    the "input -> output" line.
    """
    write_input_output_filenames_to_output(
        infiles=infile,
        outfile=outfile,
        logger_proxy=logger_proxy,
        logging_mutex=logging_mutex)
148
149
150#
151#    task3
152#
@transform(task2, suffix(".2"), ".3", logger_proxy, logging_mutex)
def task3(infile, outfile, logger_proxy, logging_mutex):
    """
    Third task: copies a ".2" file into the matching ".3" file and appends
    the "input -> output" line.
    """
    write_input_output_filenames_to_output(
        infiles=infile,
        outfile=outfile,
        logger_proxy=logger_proxy,
        logging_mutex=logging_mutex)
160
161
162#
163#    task4
164#
@merge(task3, final_file_name, logger_proxy, logging_mutex)
def task4(infile, outfile, logger_proxy, logging_mutex):
    """
    Fourth task: combines the outputs of task3 into the single final file.

    NOTE(review): despite its singular name, ``infile`` presumably receives
    all of task3's outputs at once because of @merge -- confirm against the
    ruffus documentation.
    """
    write_input_output_filenames_to_output(
        infiles=infile,
        outfile=outfile,
        logger_proxy=logger_proxy,
        logging_mutex=logging_mutex)
172
173
class Test_ruffus(unittest.TestCase):
    """
    Runs the four logging tasks twice: once through the pipeline built by the
    decorators above ("main"), and once through an explicitly constructed
    Pipeline object.
    """

    def setUp(self):
        """Start each test from an empty, freshly created tempdir."""
        # remove whatever a previous test (or run) left behind
        self.tearDown()
        try:
            os.makedirs(tempdir)
        except OSError:
            # Best effort: e.g. the directory could not be removed above and
            # therefore still exists. Other exception types now propagate.
            pass

    def tearDown(self):
        """Delete tempdir together with everything written into it."""
        try:
            shutil.rmtree(tempdir)
        except OSError:
            # Best effort: nothing to delete, or files that cannot be removed.
            # Only file-system errors are ignored.
            pass

    def test_simpler(self):
        """Run the decorator-defined pipeline."""
        pipeline_run(multiprocess=500, verbose=0, pipeline="main")

    def test_newstyle_simpler(self):
        """Register the same task functions on a new Pipeline and run it."""
        test_pipeline = Pipeline("test")
        test_pipeline.originate(task1, input_file_names, extras=[
                                logger_proxy, logging_mutex])
        test_pipeline.transform(task2, task1, suffix(
            ".1"), ".2", extras=[logger_proxy, logging_mutex])
        test_pipeline.transform(task3, task2, suffix(
            ".2"), ".3", extras=[logger_proxy, logging_mutex])
        test_pipeline.merge(task4, task3, final_file_name,
                            extras=[logger_proxy, logging_mutex])
        #test_pipeline.merge(task4, task3, final_file_name, extras = {"logger_proxy": logger_proxy, "logging_mutex": logging_mutex})
        test_pipeline.run(multiprocess=500, verbose=0)
206
207
208if __name__ == '__main__':
209    unittest.main()
210