1#!/usr/bin/env python
2from __future__ import print_function
3
4from collections import defaultdict, deque
5from collections import namedtuple
6from contextlib import contextmanager
7from multiprocessing import Pool as ProcessPool
8from multiprocessing.pool import ThreadPool
9import copy
10import functools
11import multiprocessing
12import os
13import glob
14import re
15import signal
16import subprocess
17import sys
18import textwrap
19import time
20import traceback
21
22from .file_name_parameters import \
23    args_param_factory, \
24    check_files_io_parameters, \
25    check_input_files_exist, \
26    check_parallel_parameters, \
27    collate_param_factory, \
28    combinatorics_param_factory, \
29    files_custom_generator_param_factory, \
30    files_param_factory, \
31    get_nested_tasks_or_globs, \
32    is_file_re_combining, \
33    merge_param_factory, \
34    needs_update_check_directory_missing, \
35    needs_update_check_modify_time, \
36    originate_param_factory, \
37    product_param_factory, \
38    regex, suffix, formatter, inputs, \
39    split_param_factory, \
40    subdivide_param_factory, \
41    t_combinatorics_type, \
42    t_extra_inputs, \
43    t_formatter_file_names_transform, \
44    t_nested_formatter_file_names_transform, \
45    t_params_tasks_globs_run_time_data, \
46    t_regex_file_names_transform, \
47    t_suffix_file_names_transform, \
48    transform_param_factory, \
49    touch_file_factory
50from .ruffus_utility import shorten_filenames_encoder, \
51    ignore_unknown_encoder, \
52    get_strings_in_flattened_sequence, \
53    JobHistoryChecksum, \
54    CHECKSUM_FILE_TIMESTAMPS, \
55    parse_task_arguments, \
56    replace_placeholders_with_tasks_in_input_params, \
57    get_default_checksum_level, \
58    open_job_history, \
59    non_str_sequence
60import ruffus.ruffus_exceptions as ruffus_exceptions
61from .print_dependencies import attributes_to_str
62from .graph import node, topologically_sorted_nodes, graph_printout
63
64
65if sys.hexversion < 0x03000000:
66    from future_builtins import zip, map
67
68################################################################################
69#
70#
71#   task.py
72#
73#   Copyright (c) 10/9/2009 Leo Goodstadt
74#
75#   Permission is hereby granted, free of charge, to any person obtaining a copy
76#   of this software and associated documentation files (the "Software"), to deal
77#   in the Software without restriction, including without limitation the rights
78#   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
79#   copies of the Software, and to permit persons to whom the Software is
80#   furnished to do so, subject to the following conditions:
81#
82#   The above copyright notice and this permission notice shall be included in
83#   all copies or substantial portions of the Software.
84#
85#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
86#   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
87#   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
88#   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
89#   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
90#   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
91#   THE SOFTWARE.
92#################################################################################
93"""
94
95********************************************
96:mod:`ruffus.task` -- Overview
97********************************************
98
99.. moduleauthor:: Leo Goodstadt <ruffus@llew.org.uk>
100
101Initial implementation of @active_if by Jacob Biesinger
102
103============================
104Decorator syntax:
105============================
106
107    Pipelined tasks are created by "decorating" a function with the following syntax::
108
109        def func_a():
110            pass
111
112        @follows(func_a)
113        def func_b ():
114            pass
115
116
117    Each task is a single function which is applied one or more times to a list of parameters
118    (typically input files to produce a list of output files).
119
120    Each of these is a separate, independent job (sharing the same code) which can be
121    run in parallel.
122
123
124============================
125Running the pipeline
126============================
127    To run the pipeline::
128
129            pipeline_run(target_tasks, forcedtorun_tasks = [], multiprocess = 1,
130                            logger = stderr_logger,
131                            gnu_make_maximal_rebuild_mode  = True,
132                            cleanup_log = "../cleanup.log")
133
134            pipeline_cleanup(cleanup_log = "../cleanup.log")
135
136
137
138
139
140
141"""
142
143try:
144    from collections.abc import Callable
145except ImportError:
146    from collections import Callable
147
148# 88888888888888888888888888888888888888888888888888888888888888888888888888888
149
150#   imports
151
152
153# 88888888888888888888888888888888888888888888888888888888888888888888888888888
154if sys.hexversion >= 0x03000000:
155    # everything is unicode in python3
156    from functools import reduce
157
158
159try:
160    import cPickle as pickle
161except:
162    import pickle as pickle
163
164
165if __name__ == '__main__':
166    import sys
167    sys.path.insert(0, ".")
168
169
170if sys.hexversion >= 0x03000000:
171    # everything is unicode in python3
172    path_str_type = str
173else:
174    path_str_type = basestring
175
176
177#
178# use simplejson in place of json for python < 2.6
179#
180try:
181    import json
182except ImportError:
183    import simplejson
184    json = simplejson
185dumps = json.dumps
186
187if sys.hexversion >= 0x03000000:
188    import queue as queue
189else:
190    import Queue as queue
191
192
193class Ruffus_Keyboard_Interrupt_Exception (Exception):
194    pass
195
196# 88888888888888888888888888888888888888888888888888888888888888888888888888888
197
198#
199#   light weight logging objects
200#
201#
202# 88888888888888888888888888888888888888888888888888888888888888888888888888888
203
204
205class t_black_hole_logger:
206
207    """
208    Does nothing!
209    """
210
211    def info(self, message, *args, **kwargs):
212        pass
213
214    def debug(self, message, *args, **kwargs):
215        pass
216
217    def warning(self, message, *args, **kwargs):
218        pass
219
220    def error(self, message, *args, **kwargs):
221        pass
222
223
224class t_stderr_logger:
225
226    """
227    Everything to stderr
228    """
229
230    def __init__(self):
231        self.unique_prefix = ""
232
233    def add_unique_prefix(self):
234        import random
235        random.seed()
236        self.unique_prefix = str(random.randint(0, 1000)) + " "
237
238    def info(self, message):
239        sys.stderr.write(self.unique_prefix + message + "\n")
240
241    def warning(self, message):
242        sys.stderr.write("\n\n" + self.unique_prefix +
243                         "WARNING:\n    " + message + "\n\n")
244
245    def error(self, message):
246        sys.stderr.write("\n\n" + self.unique_prefix +
247                         "ERROR:\n    " + message + "\n\n")
248
249    def debug(self, message):
250        sys.stderr.write(self.unique_prefix + message + "\n")
251
252
253class t_stream_logger:
254
255    """
256    Everything to stderr
257    """
258
259    def __init__(self, stream):
260        self.stream = stream
261
262    def info(self, message):
263        self.stream.write(message + "\n")
264
265    def warning(self, message):
266        self.stream.write("\n\nWARNING:\n    " + message + "\n\n")
267
268    def error(self, message):
269        self.stream.write("\n\nERROR:\n    " + message + "\n\n")
270
271    def debug(self, message):
272        self.stream.write(message + "\n")
273
274
275black_hole_logger = t_black_hole_logger()
276stderr_logger = t_stderr_logger()
277
278
279class t_verbose_logger:
280
281    def __init__(self, verbose, verbose_abbreviated_path, logger, runtime_data):
282        self.verbose = verbose
283        self.logger = logger
284        self.runtime_data = runtime_data
285        self.verbose_abbreviated_path = verbose_abbreviated_path
286
287
288def log_at_level(logger, message_level, verbose_level, msg):
289    """
290    writes to log if message_level > verbose level
291    Returns anything written in case we might want to drop down and output at a
292    lower log level
293    """
294    if message_level <= verbose_level:
295        logger.info(msg)
296        return True
297    return False
298
299
300#   queue management objects
301#       inserted into queue like job parameters to control multi-processing queue
302# fake parameters to signal in queue
303class all_tasks_complete:
304    pass
305
306
307class waiting_for_more_tasks_to_complete:
308    pass
309
310
311# synchronisation data
312#
313# SyncManager()
314# syncmanager.start()
315
316@contextmanager
317def do_nothing_semaphore():
318    yield
319
320
321# option to turn on EXTRA pipeline_run DEBUGGING
322EXTRA_PIPELINERUN_DEBUGGING = False
323
324
325class task_decorator(object):
326
327    """
328        Forwards to functions within Task
329    """
330
331    def __init__(self, *decoratorArgs, **decoratorNamedArgs):
332        """
333            saves decorator arguments
334        """
335        self.args = decoratorArgs
336        self.named_args = decoratorNamedArgs
337
338    def __call__(self, task_func):
339        """
340            calls func in task with the same name as the class
341        """
342        # add task to main pipeline
343        # check for duplicate tasks inside _create_task
344        task = main_pipeline._create_task(task_func)
345
346        # call the method called
347        #   task.decorator_xxxx
348        #   where xxxx = transform subdivide etc
349        task_decorator_function = getattr(
350            task, "_decorator_" + self.__class__.__name__)
351        task.created_via_decorator = True
352        # create empty placeholder with the args %s actually inside the task function
353        task.description_with_args_placeholder = task._get_decorated_function(
354        ).replace("...", "%s", 1)
355        task_decorator_function(*self.args, **self.named_args)
356
357        #
358        #   don't change the function so we can call it unaltered
359        #
360        return task_func
361
362
363class follows(task_decorator):
364    pass
365
366
367class files(task_decorator):
368    pass
369
370
371class split(task_decorator):
372    pass
373
374
375class transform(task_decorator):
376    pass
377
378
379class subdivide(task_decorator):
380
381    """
382    Splits a each set of input files into multiple output file names,
383        where the number of output files may not be known beforehand.
384    """
385    pass
386
387
388class originate(task_decorator):
389    pass
390
391
392class merge(task_decorator):
393    pass
394
395
396class posttask(task_decorator):
397    pass
398
399
400class jobs_limit(task_decorator):
401    pass
402
403
404class collate(task_decorator):
405    pass
406
407
408class active_if(task_decorator):
409    pass
410
411
412class check_if_uptodate(task_decorator):
413    pass
414
415
416class parallel(task_decorator):
417    pass
418
419
420class graphviz(task_decorator):
421    pass
422
423
424class files_re(task_decorator):
425    """obsolete"""
426    pass
427
428
429#  indicator objects
430class mkdir(task_decorator):
431    # def __init__ (self, *args):
432    #    self.args = args
433    pass
434
435
436#   touch_file
437class touch_file(object):
438
439    def __init__(self, *args):
440        self.args = args
441
442
443#       job descriptors
444#           given parameters, returns strings describing job
445#           First returned parameter is string in strong form
446#           Second returned parameter is a list of strings for input,
447#               output and extra parameters
448#               intended to be reformatted with indentation
449#           main use in error logging
450def generic_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data):
451    if unglobbed_params in ([], None):
452        m = "Job"
453    else:
454        m = "Job  = %s" % ignore_unknown_encoder(unglobbed_params)
455    return m, [m]
456
457
458def io_files_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data):
459    extra_param = ", " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1] \
460        if len(unglobbed_params) > 2 else ""
461    out_param = shorten_filenames_encoder(unglobbed_params[1], verbose_abbreviated_path) \
462        if len(unglobbed_params) > 1 else "??"
463    in_param = shorten_filenames_encoder(unglobbed_params[0], verbose_abbreviated_path) \
464        if len(unglobbed_params) > 0 else "??"
465
466    return ("Job  = [%s -> %s%s]" % (in_param, out_param, extra_param),
467            ["Job  = [%s" % in_param, "-> " + out_param + extra_param + "]"])
468
469
470def io_files_one_to_many_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data):
471
472    extra_param = ", " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1] \
473        if len(unglobbed_params) > 2 else ""
474    out_param = shorten_filenames_encoder(unglobbed_params[1], verbose_abbreviated_path) \
475        if len(unglobbed_params) > 1 else "??"
476    in_param = shorten_filenames_encoder(unglobbed_params[0], verbose_abbreviated_path) \
477        if len(unglobbed_params) > 0 else "??"
478
479    # start with input parameter
480    ret_params = ["Job  = [%s" % in_param]
481
482    # add output parameter to list,
483    #   processing one by one if multiple output parameters
484    if len(unglobbed_params) > 1:
485        if isinstance(unglobbed_params[1], (list, tuple)):
486            ret_params.extend(
487                "-> " + shorten_filenames_encoder(p, verbose_abbreviated_path) for p in unglobbed_params[1])
488        else:
489            ret_params.append("-> " + out_param)
490
491    # add extra
492    if len(unglobbed_params) > 2:
493        ret_params.append(
494            " , " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1])
495
496    # add closing bracket
497    ret_params[-1] += "]"
498
499    return ("Job  = [%s -> %s%s]" % (in_param, out_param, extra_param), ret_params)
500
501
502def mkdir_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data):
503    # input, output and parameters
504    if len(unglobbed_params) == 1:
505        m = "Make directories %s" % (shorten_filenames_encoder(
506            unglobbed_params[0], verbose_abbreviated_path))
507    elif len(unglobbed_params) == 2:
508        m = "Make directories %s" % (shorten_filenames_encoder(
509            unglobbed_params[1], verbose_abbreviated_path))
510    else:
511        return [], []
512    return m, [m]
513
514
515#       job wrappers
516#           registers files/directories for cleanup
517def job_wrapper_generic(params, user_defined_work_func, register_cleanup, touch_files_only):
518    """
519    run func
520    """
521    assert(user_defined_work_func)
522    return user_defined_work_func(*params)
523
524
525def job_wrapper_io_files(params, user_defined_work_func, register_cleanup, touch_files_only,
526                         output_files_only=False):
527    """
528    job wrapper for all that deal with i/o files
529    run func on any i/o if not up to date
530    """
531    assert(user_defined_work_func)
532
533    i, o = params[0:2]
534
535    if touch_files_only == 0:
536        # @originate only uses output files
537        if output_files_only:
538            # TODOOO extra and named extras
539            ret_val = user_defined_work_func(*(params[1:]))
540        # all other decorators
541        else:
542            try:
543                # TODOOO extra and named extras
544                ret_val = user_defined_work_func(*params)
545                # EXTRA pipeline_run DEBUGGING
546                if EXTRA_PIPELINERUN_DEBUGGING:
547                    sys.stderr.write(
548                        "w" * 36 + "[[ task() done ]]" + "w" * 27 + "\n")
549            except KeyboardInterrupt as e:
550                # Reraise KeyboardInterrupt as a normal Exception
551                # EXTRA pipeline_run DEBUGGING
552                if EXTRA_PIPELINERUN_DEBUGGING:
553                    sys.stderr.write("E" * 36 + "[[ KeyboardInterrupt from task() ]]" +
554                                     "E" * 9 + "\n")
555                raise Ruffus_Keyboard_Interrupt_Exception("KeyboardInterrupt")
556            except:
557                # sys.stderr.write("?? %s ??" % (tuple(params),))
558                raise
559    elif touch_files_only == 1:
560        # job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True)
561
562        #
563        #   Do not touch any output files which are the same as any input
564        #       i.e. which are just being passed through
565        #
566        # list of input files
567        real_input_file_names = set()
568        for f in get_strings_in_flattened_sequence(i):
569            real_input_file_names.add(os.path.realpath(f))
570
571        #
572        #   touch files only
573        #
574        for f in get_strings_in_flattened_sequence(o):
575
576            if os.path.realpath(f) in real_input_file_names:
577                continue
578
579            #
580            #   race condition still possible...
581            #
582            with open(f, 'a') as ff:
583                os.utime(f, None)
584            # if not os.path.exists(f):
585            #    open(f, 'w')
586            #    mtime = os.path.getmtime(f)
587            # else:
588            #    os.utime(f, None)
589            #    mtime = os.path.getmtime(f)
590
591            # job_history[f] = chksum  # update file times and job details in
592            # history
593
594    #
595    # register strings in output file for cleanup
596    #
597    for f in get_strings_in_flattened_sequence(o):
598        register_cleanup(f, "file")
599
600
601def job_wrapper_output_files(params, user_defined_work_func, register_cleanup, touch_files_only):
602    """
603    job wrapper for all that only deals with output files.
604
605    run func on any output file if not up to date
606    """
607    job_wrapper_io_files(params, user_defined_work_func, register_cleanup, touch_files_only,
608                         output_files_only=True)
609
610
611def job_wrapper_mkdir(params, user_defined_work_func, register_cleanup, touch_files_only):
612    """
613    Make missing directories including any intermediate directories on the specified path(s)
614    """
615    #
616    #   Just in case, swallow file exist errors because some other makedirs
617    #       might be subpath of this directory
618    #   Should not be necessary because of "sorted" in task_mkdir
619    #
620    #
621    if len(params) == 1:
622        dirs = params[0]
623
624    # if there are two parameters, they are i/o, and the directories to be
625    # created are the output
626    elif len(params) >= 2:
627        dirs = params[1]
628    else:
629        raise Exception("No arguments in mkdir check %s" % (params,))
630
631    # get all file names in flat list
632    dirs = get_strings_in_flattened_sequence(dirs)
633
634    for d in dirs:
635        try:
636            # Please email the authors if an uncaught exception is raised here
637            os.makedirs(d)
638            register_cleanup(d, "makedirs")
639        except:
640            #
641            #   ignore exception if
642            #      exception == OSError      + "File exists" or      // Linux
643            #      exception == WindowsError + "file already exists" // Windows
644            #   Are other exceptions raised by other OS?
645            #
646            #
647            exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
648            # exceptionType == OSError and
649            if "File exists" in str(exceptionValue):
650                continue
651            # exceptionType == WindowsError and
652            elif "file already exists" in str(exceptionValue):
653                continue
654            raise
655
656        #   changed for compatibility with python 3.x
657        # except OSError, e:
658        #    if "File exists" not in e:
659        #        raise
660
661
662JOB_ERROR = 0
663JOB_SIGNALLED_BREAK = 1
664JOB_UP_TO_DATE = 2
665JOB_COMPLETED = 3
666
667#   t_job_result
668#       Previously a collections.namedtuple (introduced in python 2.6)
669#       Now using implementation from running
670#           t_job_result = namedtuple('t_job_result',
671#                'task_name state job_name return_value exception', verbose =1)
672#           for compatibility with python 2.5
673t_job_result = namedtuple('t_job_result',
674                          'task_name '
675                          'node_index state '
676                          'job_name '
677                          'return_value '
678                          'exception '
679                          'params '
680                          'unglobbed_params ')
681
682
683def run_pooled_job_without_exceptions(process_parameters):
684    """
685    handles running jobs in parallel
686    Make sure exceptions are caught here:
687        Otherwise, these will kill the thread/process
688        return any exceptions which will be rethrown at the other end:
689        See ruffus_exceptions.RethrownJobError /  run_all_jobs_in_task
690    """
691    # signal.signal(signal.SIGINT, signal.SIG_IGN)
692    (params, unglobbed_params, task_name, node_index, job_name, job_wrapper, user_defined_work_func,
693     job_limit_semaphore, death_event, touch_files_only) = process_parameters
694
695    # #job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True)
696    #  outfile = params[1] if len(params) > 1 else None   # mkdir has no output
697    #  if not isinstance(outfile, list):
698    # #    outfile = [outfile]
699    #  for o in outfile:
700    #  job_history.pop(o, None)  # remove outfile from history if it exists
701
702    if job_limit_semaphore is None:
703        job_limit_semaphore = do_nothing_semaphore()
704
705    try:
706        with job_limit_semaphore:
707            # EXTRA pipeline_run DEBUGGING
708            if EXTRA_PIPELINERUN_DEBUGGING:
709                sys.stderr.write(
710                    ">" * 36 + "[[ job_wrapper ]]" + ">" * 27 + "\n")
711            return_value = job_wrapper(params, user_defined_work_func,
712                                       register_cleanup, touch_files_only)
713
714            #
715            #   ensure one second between jobs
716            #
717            # if one_second_per_job:
718            #    time.sleep(1.01)
719            # EXTRA pipeline_run DEBUGGING
720            if EXTRA_PIPELINERUN_DEBUGGING:
721                sys.stderr.write(
722                    "<" * 36 + "[[ job_wrapper done ]]" + "<" * 22 + "\n")
723            return t_job_result(task_name, node_index, JOB_COMPLETED, job_name, return_value, None,
724                                params, unglobbed_params)
725    except KeyboardInterrupt as e:
726        # Reraise KeyboardInterrupt as a normal Exception.
727        #   Should never be necessary here
728        # EXTRA pipeline_run DEBUGGING
729        if EXTRA_PIPELINERUN_DEBUGGING:
730            sys.stderr.write(
731                "E" * 36 + "[[ KeyboardInterrupt ]]" + "E" * 21 + "\n")
732        death_event.set()
733        raise Ruffus_Keyboard_Interrupt_Exception("KeyboardInterrupt")
734    except:
735        # EXTRA pipeline_run DEBUGGING
736        if EXTRA_PIPELINERUN_DEBUGGING:
737            sys.stderr.write(
738                "E" * 36 + "[[ Other Interrupt ]]" + "E" * 23 + "\n")
739        #   Wrap up one or more exceptions rethrown across process boundaries
740        #
741        # See multiprocessor.Server.handle_request/serve_client for an
742        # analogous function
743        exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
744        exception_stack = traceback.format_exc()
745        exception_name = exceptionType.__module__ + '.' + exceptionType.__name__
746        exception_value = str(exceptionValue)
747        if len(exception_value):
748            exception_value = "(%s)" % exception_value
749
750        if exceptionType == Ruffus_Keyboard_Interrupt_Exception:
751            death_event.set()
752            job_state = JOB_SIGNALLED_BREAK
753        elif exceptionType == ruffus_exceptions.JobSignalledBreak:
754            job_state = JOB_SIGNALLED_BREAK
755        else:
756            job_state = JOB_ERROR
757        return t_job_result(task_name, node_index, job_state, job_name, None,
758                            [task_name,
759                             job_name,
760                             exception_name,
761                             exception_value,
762                             exception_stack], params, unglobbed_params)
763
764
765def subprocess_checkcall_wrapper(**named_args):
766    """
767    Splits string at semicolons and runs with subprocess.check_call
768    """
769    for cmd in named_args["command_str"].split(";"):
770        cmd = cmd.replace("\n", " ").strip()
771        if not len(cmd):
772            continue
773        cmd = cmd.format(**named_args)
774        subprocess.check_call(cmd, shell=True)
775
776
777def exec_string_as_task_func(input_args, output_args, **named_args):
778    """
779    Ruffus provided function for tasks which are just strings
780        (no Python function provided)
781    The task executor function is given as a paramter which is
782        then called with the arguments.
783    Convoluted but avoids special casing too much
784    """
785    if not "__RUFFUS_TASK_CALLBACK__" in named_args or \
786            not callable(named_args["__RUFFUS_TASK_CALLBACK__"]):
787        raise Exception("Missing call back function")
788    if not "command_str" in named_args or \
789            not isinstance(named_args["command_str"], (path_str_type,)):
790        raise Exception("Missing call back function string")
791
792    callback = named_args["__RUFFUS_TASK_CALLBACK__"]
793    del named_args["__RUFFUS_TASK_CALLBACK__"]
794
795    named_args["input"] = input_args
796    named_args["output"] = output_args
797    callback(**named_args)
798
799
800# todo
801def register_cleanup(file_name, operation):
802    pass
803
804
805# pipeline functions only have "name" as a named parameter
806def get_name_from_args(named_args):
807    if "name" in named_args:
808        name = named_args["name"]
809        del named_args["name"]
810        return name
811    else:
812        return None
813
814
815class Pipeline(dict):
816    """
817
818    Each Ruffus Pipeline object has to have a unique name.  "main" is
819    reserved for "main_pipeline", the default pipeline for all Ruffus
820    decorators.
821    """
822
823    # dictionary of all pipelines
824    pipelines = dict()
825    cnt_mkdir = 0
826
827    def __init__(self, name, *arg, **kw):
828        # initialise dict
829        super(Pipeline, self).__init__(*arg, **kw)
830
831        # set of tasks
832        self.tasks = set()
833        self.task_names = set()
834
835        # add self to list of all pipelines
836        self.name = name
837        self.original_name = name
838        if name in Pipeline.pipelines:
839            raise Exception("Error:\nDuplicate pipeline. "
840                            "A pipeline named '%s' already exists.\n" % name)
841        Pipeline.pipelines[name] = self
842        self.head_tasks = []
843        self.tail_tasks = []
844        self.lookup = dict()
845
846        self.command_str_callback = subprocess_checkcall_wrapper
847        self.job_state = "active"
848
849    @classmethod
850    def clear_all(cls):
851        """clear all pipelines.
852        """
853        cls.pipelines = dict()
854
855    def _create_task(self, task_func, task_name=None):
856        """
857        Create task with a function
858        """
859
860        #
861        #   If string, this is a command to be executed later
862        #   Derive task name from command
863        #
864        #
865        if isinstance(task_func, (path_str_type,)):
866            task_str = task_func
867            task_func = exec_string_as_task_func
868            if not task_name:
869                elements = task_str.split()
870                use_n_elements = 1
871                while use_n_elements < len(elements):
872                    task_name = " ".join(elements[0:use_n_elements])
873                    if task_name not in self.task_names:
874                        break
875                else:
876                    raise ruffus_exceptions.error_duplicate_task_name("The task string '%s' is ambiguous for "
877                                                    "Pipeline '%s'. You must disambiguate "
878                                                    "explicitly with different task names "
879                                                    % (task_str, self.name))
880            return Task(task_func, task_name, self)
881
882        #
883        #   Derive task name from Python Task function name
884        #
885        if not task_name:
886            if task_func.__module__ == "__main__":
887                task_name = task_func.__name__
888            else:
889                task_name = str(task_func.__module__) + \
890                    "." + task_func.__name__
891
892        if task_name not in self:
893            return Task(task_func, task_name, self)
894
895        # task_name already there as the identifying task_name.
896        # If the task_func also matches everything is fine
897        elif (task_name in self.task_names and
898              self[task_name].user_defined_work_func == task_func):
899            return self[task_name]
900
901        # If the task name is already taken but with a different function,
902        #   this will blow up
903        # But if the function is being reused and with a previously different
904        # task name then OK
905        else:
906            return Task(task_func, task_name, self)
907
908    def _complete_task_setup(self, processed_tasks):
909        """
910        Finishes initialising all tasks
911        Make sure all tasks in dependency list are linked to real functions
912        """
913
914        processed_pipelines = set([self.name])
915        unprocessed_tasks = deque(self.tasks)
916        while len(unprocessed_tasks):
917            task = unprocessed_tasks.popleft()
918            if task in processed_tasks:
919                continue
920            processed_tasks.add(task)
921            for ancestral_task in task._complete_setup():
922                if ancestral_task not in processed_tasks:
923                    unprocessed_tasks.append(ancestral_task)
924                    processed_pipelines.add(ancestral_task.pipeline.name)
925            #
926            #   some jobs single state status mirrors parent's state
927            #       and parent task not known until dependencies resolved
928            #   Is this legacy code?
929            #       Breaks @merge otherwise
930            #
931            if isinstance(task._is_single_job_single_output, Task):
932                task._is_single_job_single_output = \
933                    task._is_single_job_single_output._is_single_job_single_output
934
935        for pipeline_name in list(processed_pipelines):
936            if pipeline_name != self.name:
937                processed_pipelines |= self.pipelines[pipeline_name]._complete_task_setup(
938                    processed_tasks)
939
940        return processed_pipelines
941
942    def set_command_str_callback(self, command_str_callback):
943        if not callable(command_str_callback):
944            raise Exception(
945                "set_command_str_callback() takes a python function or a callable object.")
946        self.command_str_callback = command_str_callback
947
948    def get_head_tasks(self):
949        """
950        Return tasks at the head of the pipeline,
951            i.e. with only descendants/dependants
952        N.B. Head and Tail sets can overlap
953
954        Most of the time when self.head_tasks == [], it has been left undefined by mistake.
955            So we usually throw an exception at the point of use
956        """
957        return self.head_tasks
958
959    def set_head_tasks(self, head_tasks):
960        """
961        Specifies tasks at the head of the pipeline,
962            i.e. with only descendants/dependants
963        """
964        if not isinstance(head_tasks, (list,)):
965            raise Exception("Pipelines['{pipeline_name}'].set_head_tasks() expects a "
966                            "list not {head_tasks_type}".format(pipeline_name=self.name,
967                                                                head_tasks_type=type(head_tasks)))
968
969        for tt in head_tasks:
970            if not isinstance(tt, (Task,)):
971                raise Exception("Pipelines['{pipeline_name}'].set_head_tasks() expects a "
972                                "list of tasks not {task_type} {task}".format(pipeline_name=self.name,
973                                                                              task_type=type(
974                                                                                  tt),
975                                                                              task=1))
976        self.head_tasks = head_tasks
977
978    def get_tail_tasks(self):
979        """
980        Return tasks at the tail of the pipeline,
981            i.e. without descendants/dependants
982        N.B. Head and Tail sets can overlap
983
984        Most of the time when self.tail_tasks == [],
985            it has been left undefined by mistake.
986            So we usually throw an exception at the point of use
987        """
988        return self.tail_tasks
989
990    def set_tail_tasks(self, tail_tasks):
991        """
992        Specifies tasks at the tail of the pipeline,
993            i.e. with only descendants/dependants
994        """
995        self.tail_tasks = tail_tasks
996
997    def set_input(self, **args):
998        """
999        Change the input parameter(s) of the designated "head" tasks of the pipeline
1000        """
1001        if not len(self.get_head_tasks()):
1002            raise ruffus_exceptions.error_no_head_tasks("Pipeline '{pipeline_name}' has no head tasks defined.\n"
1003                                      "Which task in '{pipeline_name}' do you want "
1004                                      "to set_input() for?".format(pipeline_name=self.name))
1005
1006        for tt in self.get_head_tasks():
1007            tt.set_input(**args)
1008
1009    def set_output(self, **args):
1010        """
1011        Change the output parameter(s) of the designated "head" tasks of the pipeline
1012        """
1013        if not len(self.get_head_tasks()):
1014            raise ruffus_exceptions.error_no_head_tasks("Pipeline '{pipeline_name}' has no head tasks defined.\n"
1015                                      "Which task in '{pipeline_name}' do you want "
1016                                      "to set_output() for?".format(pipeline_name=self.name))
1017
1018        for tt in self.get_head_tasks():
1019            tt.set_output(**args)
1020
1021    def suspend_jobs(self):
1022        self.job_state = "suspended"
1023
1024    def resume_jobs(self):
1025        self.job_state = "active"
1026
1027    def is_job_suspended(self):
1028        return self.job_state == "suspended"
1029
1030    def clone(self, new_name, *arg, **kw):
1031        """
1032        Make a deep copy of the pipeline
1033        """
1034
1035        # setup new pipeline
1036        new_pipeline = Pipeline(new_name, *arg, **kw)
1037
1038        # set of tasks
1039        new_pipeline.tasks = set(task._clone(new_pipeline)
1040                                 for task in self.tasks)
1041        new_pipeline.task_names = set(self.task_names)
1042
1043        # so keep original name after a series of cloning operations
1044        new_pipeline.original_name = self.original_name
1045
1046        # lookup tasks in new pipeline
1047        new_pipeline.head_tasks = [new_pipeline[t._name]
1048                                   for t in self.head_tasks]
1049        new_pipeline.tail_tasks = [new_pipeline[t._name]
1050                                   for t in self.tail_tasks]
1051
1052        # do not copy a suspended state, but always set to active
1053        new_pipeline.state = "active"
1054        return new_pipeline
1055
1056    def mkdir(self, *unnamed_args, **named_args):
1057        """
1058        Makes directories each incoming input to a corresponding output
1059        This is a One to One operation
1060        """
1061        name = get_name_from_args(named_args)
1062        # func is a placeholder...
1063        if name is None:
1064            self.cnt_mkdir += 1
1065            if self.cnt_mkdir == 1:
1066                name = "mkdir"
1067            else:
1068                name = "mkdir # %d" % self.cnt_mkdir
1069        task = self._create_task(task_func=job_wrapper_mkdir, task_name=name)
1070        task.created_via_decorator = False
1071        task.syntax = "pipeline.mkdir"
1072        task.description_with_args_placeholder = "%s(name = %r, %%s)" % (
1073            task.syntax, task._get_display_name())
1074        task._prepare_mkdir(unnamed_args, named_args,
1075                            task.description_with_args_placeholder)
1076        return task
1077
1078    def _do_create_task_by_OOP(self, task_func, named_args, syntax):
1079        """
1080        Helper function for
1081            Pipeline.transform
1082            Pipeline.originate
1083            pipeline.split
1084            pipeline.subdivide
1085            pipeline.parallel
1086            pipeline.files
1087            pipeline.combinations_with_replacement
1088            pipeline.combinations
1089            pipeline.permutations
1090            pipeline.product
1091            pipeline.collate
1092            pipeline.merge
1093        """
1094        name = get_name_from_args(named_args)
1095
1096        #   if task_func is a string, will
1097        #       1) set self.task_func = exec_string_as_task_func
1098        #       2) set self.name if necessary to the first unambigous words of the the command_str
1099        #       2) set self.func_description to the command_str
1100        task = self._create_task(task_func, name)
1101
1102        task.created_via_decorator = False
1103        task.syntax = syntax
1104        if isinstance(task_func, (path_str_type,)):
1105            task_func_name = task._name
1106        else:
1107            task_func_name = task_func.__name__
1108
1109        task.description_with_args_placeholder = "{syntax}(name = {task_display_name!r}, task_func = {task_func_name}, %s)" \
1110            .format(syntax=syntax,
1111                    task_display_name=task._get_display_name(),
1112                    task_func_name=task_func_name,)
1113
1114        if isinstance(task_func, (path_str_type,)):
1115            #
1116            #   Make sure extras is  dict
1117            #
1118            if "extras" in named_args:
1119                if not isinstance(named_args["extras"], dict):
1120                    raise ruffus_exceptions.error_executable_str((task.description_with_args_placeholder % "...") +
1121                                               "\n requires a dictionary for named parameters. " +
1122                                               "For example:\n" +
1123                                               task.description_with_args_placeholder %
1124                                               "extras = {my_param = 45, her_param = 'whatever'}")
1125            else:
1126                named_args["extras"] = dict()
1127            named_args["extras"]["command_str"] = task_func
1128            # named_args["extras"]["__RUFFUS_TASK_CALLBACK__"] = pipeline.command_str_callback
1129
1130        return task
1131
1132    def lookup_task_from_name(self, task_name, default_module_name):
1133        """
1134        If lookup returns None, means ambiguous: do nothing
1135        Only ever returns a list of one
1136        """
1137        multiple_tasks = []
1138
1139        #   Does the unqualified name uniquely identify?
1140        if task_name in self.lookup:
1141            if len(self.lookup[task_name]) == 1:
1142                return self.lookup[task_name]
1143            else:
1144                multiple_tasks = self.lookup[task_name]
1145
1146        #   Even if the unqualified name does not uniquely identify,
1147        #       maybe the qualified name does
1148        full_qualified_name = default_module_name + "." + task_name
1149        if full_qualified_name in self.lookup:
1150            if len(self.lookup[full_qualified_name]) == 1:
1151                return self.lookup[full_qualified_name]
1152            else:
1153                multiple_tasks = self.lookup[task_name]
1154
1155        #   Nothing matched
1156        if not multiple_tasks:
1157            return []
1158
1159        #   If either the qualified or unqualified name is ambiguous, throw...
1160        task_names = ",".join(t._name for t in multiple_tasks)
1161        raise ruffus_exceptions.error_ambiguous_task("%s is ambiguous. Which do you mean? (%s)."
1162                                   % (task_name, task_names))
1163
1164    def follows(self, task_func, *unnamed_args, **named_args):
1165        """
1166        Transforms each incoming input to a corresponding output
1167        This is a One to One operation
1168        """
1169        task = self._do_create_task_by_OOP(
1170            task_func, named_args, "pipeline.follows")
1171        task.deferred_follow_params.append([task.description_with_args_placeholder, False,
1172                                            unnamed_args])
1173        # task._connect_parents(task.description_with_args_placeholder, False,
1174        #                 unnamed_args)
1175        return task
1176
1177    def check_if_uptodate(self, task_func, func, **named_args):
1178        """
1179        Specifies how a task is to be checked if it needs to be rerun (i.e. is
1180        up-to-date).
1181        func returns true if input / output files are up to date
1182        func takes as many arguments as the task function
1183        """
1184        task = self._do_create_task_by_OOP(
1185            task_func, named_args, "check_if_uptodate")
1186        return task.check_if_uptodate(func)
1187
1188    def graphviz(self, task_func, *unnamed_args, **named_args):
1189        """
1190        Transforms each incoming input to a corresponding output
1191        This is a One to One operation
1192        """
1193        task = self._do_create_task_by_OOP(
1194            task_func, named_args, "pipeline.graphviz")
1195        task.graphviz_attributes = named_args
1196        if len(unnamed_args):
1197            raise TypeError("Only named arguments expected in :" +
1198                            task.description_with_args_placeholder % unnamed_args)
1199        return task
1200
1201    def transform(self, task_func, *unnamed_args, **named_args):
1202        """
1203        Transforms each incoming input to a corresponding output
1204        This is a One to One operation
1205        """
1206        task = self._do_create_task_by_OOP(
1207            task_func, named_args, "pipeline.transform")
1208        task._prepare_transform(unnamed_args, named_args)
1209        return task
1210
1211    def originate(self, task_func, *unnamed_args, **named_args):
1212        """
1213        Originates a new set of output files,
1214            one output per call to the task function
1215        """
1216        task = self._do_create_task_by_OOP(
1217            task_func, named_args, "pipeline.originate")
1218        task._prepare_originate(unnamed_args, named_args)
1219        return task
1220
1221    def split(self, task_func, *unnamed_args, **named_args):
1222        """
1223        Splits a single set of input files into multiple output file names,
1224            where the number of output files may not be known beforehand.
1225        This is a One to Many operation
1226        """
1227        task = self._do_create_task_by_OOP(
1228            task_func, named_args, "pipeline.split")
1229        task._prepare_split(unnamed_args, named_args)
1230        return task
1231
1232    def subdivide(self, task_func, *unnamed_args, **named_args):
1233        """
1234        Subdivides a each set of input files into multiple output file names,
1235            where the number of output files may not be known beforehand.
1236        This is a Many to Even More operation
1237        """
1238        task = self._do_create_task_by_OOP(
1239            task_func, named_args, "pipeline.subdivide")
1240        task._prepare_subdivide(unnamed_args, named_args)
1241        return task
1242
1243    def merge(self, task_func, *unnamed_args, **named_args):
1244        """
1245        Merges multiple input files into a single output.
1246        This is a Many to One operation
1247        """
1248        task = self._do_create_task_by_OOP(
1249            task_func, named_args, "pipeline.merge")
1250        task._prepare_merge(unnamed_args, named_args)
1251        return task
1252
1253    def collate(self, task_func, *unnamed_args, **named_args):
1254        """
1255        Collates each set of multiple matching input files into an output.
1256        This is a Many to Fewer operation
1257        """
1258        task = self._do_create_task_by_OOP(
1259            task_func, named_args, "pipeline.collate")
1260        task._prepare_collate(unnamed_args, named_args)
1261        return task
1262
1263    def product(self, task_func, *unnamed_args, **named_args):
1264        """
1265        All-vs-all Product between items from each set of inputs
1266        """
1267        task = self._do_create_task_by_OOP(
1268            task_func, named_args, "pipeline.product")
1269        task._prepare_product(unnamed_args, named_args)
1270        return task
1271
1272    def permutations(self, task_func, *unnamed_args, **named_args):
1273        """
1274        Permutations between items from a set of inputs
1275        * k-length tuples
1276        * all possible orderings
1277        * no self vs self
1278        """
1279        task = self._do_create_task_by_OOP(
1280            task_func, named_args, "pipeline.permutations")
1281        task._prepare_combinatorics(
1282            unnamed_args, named_args, ruffus_exceptions.error_task_permutations)
1283        return task
1284
1285    def combinations(self, task_func, *unnamed_args, **named_args):
1286        """
1287        Combinations of items from a set of inputs
1288        * k-length tuples
1289        * Single (sorted) ordering, i.e. AB is the same as BA,
1290        * No repeats. No AA, BB
1291        For Example:
1292            combinations("ABCD", 3) = ['ABC', 'ABD', 'ACD', 'BCD']
1293            combinations("ABCD", 2) = ['AB', 'AC', 'AD', 'BC', 'BD', 'CD']
1294        """
1295        task = self._do_create_task_by_OOP(
1296            task_func, named_args, "pipeline.combinations")
1297        task._prepare_combinatorics(
1298            unnamed_args, named_args, ruffus_exceptions.error_task_combinations)
1299        return task
1300
1301    def combinations_with_replacement(self, task_func, *unnamed_args,
1302                                      **named_args):
1303        """
1304        Combinations with replacement of items from a set of inputs
1305        * k-length tuples
1306        * Single (sorted) ordering, i.e. AB is the same as BA,
1307        * Repeats. AA, BB, AAC etc.
1308        For Example:
1309            combinations_with_replacement("ABCD", 2) = [
1310                'AA', 'AB', 'AC', 'AD',
1311                'BB', 'BC', 'BD',
1312                'CC', 'CD',
1313                'DD']
1314            combinations_with_replacement("ABCD", 3) = [
1315                'AAA', 'AAB', 'AAC', 'AAD',
1316                'ABB', 'ABC', 'ABD',
1317                'ACC', 'ACD',
1318                'ADD',
1319                'BBB', 'BBC', 'BBD',
1320                'BCC', 'BCD',
1321                'BDD',
1322                'CCC', 'CCD',
1323                'CDD',
1324                'DDD']
1325        """
1326        task = self._do_create_task_by_OOP(
1327            task_func, named_args, "combinations_with_replacement")
1328        task._prepare_combinatorics(unnamed_args, named_args,
1329                                    ruffus_exceptions.error_task_combinations_with_replacement)
1330        return task
1331
1332    def files(self, task_func, *unnamed_args, **named_args):
1333        """
1334        calls user function in parallel
1335            with either each of a list of parameters
1336            or using parameters generated by a custom function
1337
1338            In the parameter list,
1339                The first two items of each set of parameters must
1340                be input/output files or lists of files or Null
1341        """
1342        task = self._do_create_task_by_OOP(
1343            task_func, named_args, "pipeline.files")
1344        task._prepare_files(unnamed_args, named_args)
1345        return task
1346
1347    def parallel(self, task_func, *unnamed_args, **named_args):
1348        """
1349        calls user function in parallel
1350            with either each of a list of parameters
1351            or using parameters generated by a custom function
1352        """
1353        task = self._do_create_task_by_OOP(
1354            task_func, named_args, "pipeline.parallel")
1355        task._prepare_parallel(unnamed_args, named_args)
1356        return task
1357
1358    # Forwarding functions. Should bring procedural function here and
1359    # forward from the other direction?
1360    def run(self, *unnamed_args, **named_args):
1361        if "pipeline" not in named_args:
1362            named_args["pipeline"] = self
1363        pipeline_run(*unnamed_args, **named_args)
1364
1365    def printout(self, *unnamed_args, **named_args):
1366        if "pipeline" not in named_args:
1367            named_args["pipeline"] = self
1368        pipeline_printout(*unnamed_args, **named_args)
1369
1370    def get_task_names(self, *unnamed_args, **named_args):
1371        if "pipeline" not in named_args:
1372            named_args["pipeline"] = self
1373        pipeline_get_task_names(*unnamed_args, **named_args)
1374
1375    def printout_graph(self, *unnamed_args, **named_args):
1376        if "pipeline" not in named_args:
1377            named_args["pipeline"] = self
1378        pipeline_printout_graph(*unnamed_args, **named_args)
1379
1380
1381# Global default shared pipeline (used for decorators)
1382main_pipeline = Pipeline(name="main")
1383
1384
1385def lookup_unique_task_from_func(task_func, default_pipeline_name="main"):
1386    """
1387    Go through all pipelines and match task_func to find a unique task
1388    Throw exception if ambiguous
1389    """
1390
1391    def unique_task_from_func_in_pipeline(task_func, pipeline):
1392        if task_func in pipeline.lookup:
1393            if len(pipeline.lookup[task_func]) == 1:
1394                # Found task!
1395                return pipeline.lookup[task_func][0]
1396
1397            # Found too many tasks! Ambiguous...
1398            task_names = ", ".join(
1399                task._name for task in pipeline.lookup[task_func])
1400            raise ruffus_exceptions.error_ambiguous_task(
1401                "Function def %s(...): is used by multiple tasks (%s). Which one do you mean?."
1402                % (task_func.__name__, task_names))
1403        return None
1404
1405    #
1406    #   Iterate through all pipelines starting with the specified pipeline
1407    #
1408    task = unique_task_from_func_in_pipeline(
1409        task_func, Pipeline.pipelines[default_pipeline_name])
1410    if task:
1411        return task
1412
1413    #
1414    #   Sees if function uniquely identifies a single task across pipelines
1415    #
1416    found_tasks = []
1417    found_pipelines = []
1418    for pipeline in Pipeline.pipelines.values():
1419        task = unique_task_from_func_in_pipeline(task_func, pipeline)
1420        if task:
1421            found_tasks.append(task)
1422            found_pipelines.append(pipeline)
1423
1424    if len(found_tasks) == 1:
1425        return found_tasks[0]
1426
1427    if len(found_tasks) > 1:
1428        raise ruffus_exceptions.error_ambiguous_task("Task Name %s is ambiguous and specifies different tasks "
1429                                   "across multiple pipelines (%s)."
1430                                   % (task_func.__name__, ",".join(found_pipelines)))
1431
1432    return None
1433
1434
1435def lookup_tasks_from_name(task_name, default_pipeline_name, default_module_name="__main__",
1436                           pipeline_names_as_alias_to_all_tasks=False):
1437    """
1438
1439        Tries:
1440            (1) Named pipeline in the format pipeline::task_name
1441            (2) tasks matching task_name in default_pipeline_name
1442            (3) pipeline names matching task_name
1443            (4) if task_name uniquely identifies any task in all other pipelines...
1444
1445        Only returns multiple tasks if (3) task_name is the name of a pipeline
1446    """
1447
1448    # Lookup the task from the function or task name
1449    pipeline_name, task_name = re.match("(?:(.+)::)?(.*)", task_name).groups()
1450
1451    #
1452    #   (1) Look in specified pipeline
1453    #      Will blow up if task_name is ambiguous
1454    #
1455    if pipeline_name:
1456        if pipeline_name not in Pipeline.pipelines:
1457            raise ruffus_exceptions.error_not_a_pipeline("%s is not a pipeline." % pipeline_name)
1458        pipeline = Pipeline.pipelines[pipeline_name]
1459        return pipeline.lookup_task_from_name(task_name, default_module_name)
1460
1461    #
1462    #   (2) Try default pipeline
1463    #      Will blow up if task_name is ambiguous
1464    #
1465    if default_pipeline_name not in Pipeline.pipelines:
1466        raise ruffus_exceptions.error_not_a_pipeline(
1467            "%s is not a pipeline." % default_pipeline_name)
1468    pipeline = Pipeline.pipelines[default_pipeline_name]
1469    tasks = pipeline.lookup_task_from_name(task_name, default_module_name)
1470    if tasks:
1471        return tasks
1472
1473    #   (3) task_name is actually the name of a pipeline
1474    #      Alias for pipeline.get_tail_tasks()
1475    #      N.B. This is the *only* time multiple tasks might be returned
1476    #
1477    if task_name in Pipeline.pipelines:
1478        if pipeline_names_as_alias_to_all_tasks:
1479            return Pipeline.pipelines[task_name].tasks
1480        elif len(Pipeline.pipelines[task_name].get_tail_tasks()):
1481            return Pipeline.pipelines[task_name].get_tail_tasks()
1482        else:
1483            raise ruffus_exceptions.error_no_tail_tasks(
1484                "Pipeline %s has no tail tasks defined. Which task do you "
1485                "mean when you specify the whole pipeline as a dependency?" % task_name)
1486
1487    #
1488    #   (4) Try all other pipelines
1489    #      Will blow up if task_name is ambiguous
1490    #
1491    found_tasks = []
1492    found_pipelines = []
1493    for pipeline_name, pipeline in Pipeline.pipelines.items():
1494        tasks = pipeline.lookup_task_from_name(task_name, default_module_name)
1495        if tasks:
1496            found_tasks.append(tasks)
1497            found_pipelines.append(pipeline_name)
1498
1499    # unambiguous: good
1500    if len(found_tasks) == 1:
1501        return found_tasks[0]
1502
1503    # ambiguous: bad
1504    if len(found_tasks) > 1:
1505        raise ruffus_exceptions.error_ambiguous_task(
1506            "Task Name %s is ambiguous and specifies different tasks across "
1507            "several pipelines (%s)." % (task_name, ",".join(found_pipelines)))
1508
1509    # Nothing found
1510    return []
1511
1512
1513def lookup_tasks_from_user_specified_names(task_description, task_names,
1514                                           default_pipeline_name="main",
1515                                           default_module_name="__main__",
1516                                           pipeline_names_as_alias_to_all_tasks=False):
1517    """
1518    Given a list of task names, look up the corresponding tasks
1519    Will just pass through if the task_name is already a task
1520    """
1521
1522    #
1523    #   In case we are given a single item instead of a list
1524    #
1525    if not isinstance(task_names, (list, tuple)):
1526        task_names = [task_names]
1527
1528    task_list = []
1529
1530    for task_name in task_names:
1531
1532        # "task_name" is a Task or pipeline, add those
1533        if isinstance(task_name, Task):
1534            task_list.append(task_name)
1535            continue
1536
1537        elif isinstance(task_name, Pipeline):
1538            if pipeline_names_as_alias_to_all_tasks:
1539                task_list.extend(task_name.tasks)
1540                continue
1541            # use tail tasks
1542            elif len(task_name.get_tail_tasks()):
1543                task_list.extend(task_name.get_tail_tasks())
1544                continue
1545            # no tail task
1546            else:
1547                raise ruffus_exceptions.error_no_tail_tasks("Pipeline %s has no 'tail tasks'. Which task do you mean"
1548                                          " when you specify the whole pipeline?" % task_name.name)
1549
1550        if isinstance(task_name, Callable):
1551            # blows up if ambiguous
1552            task = lookup_unique_task_from_func(
1553                task_name, default_pipeline_name)
1554            # blow up for unwrapped function
1555            if not task:
1556                raise ruffus_exceptions.error_function_is_not_a_task(
1557                    ("Function def %s(...): is not a Ruffus task." % task_func.__name__) +
1558                    " The function needs to have a ruffus decoration like "
1559                    "'@transform', or be a member of a ruffus.Pipeline().")
1560
1561            task_list.append(task)
1562            continue
1563
1564        # some kind of string: task or func or pipeline name?
1565        if isinstance(task_name, path_str_type):
1566
1567            # Will throw Exception if ambiguous
1568            tasks = lookup_tasks_from_name(
1569                task_name, default_pipeline_name, default_module_name,
1570                pipeline_names_as_alias_to_all_tasks)
1571            # not found
1572            if not tasks:
1573                raise ruffus_exceptions.error_node_not_task("%s task '%s' is not a pipelined task in Ruffus. Is it "
1574                                          "spelt correctly ?" % (task_description, task_name))
1575            task_list.extend(tasks)
1576            continue
1577
1578        else:
1579            raise TypeError(
1580                "Expecting a string or function, or a Ruffus Task or Pipeline object")
1581    return task_list
1582
1583
1584class Task(node):
1585
1586    """
1587
1588    * Represents each stage of a pipeline.
1589    * Associated with a single python function.
1590    * Identified uniquely within the pipeline by its name.
1591
1592    """
1593
1594    # DEBUGGG
1595    # def __str__ (self):
1596    #    return "Task = <%s>" % self._get_display_name()
1597
1598    _action_names = ["unspecified",
1599                     "task",
1600                     "task_files_re",
1601                     "task_split",
1602                     "task_merge",
1603                     "task_transform",
1604                     "task_collate",
1605                     "task_files_func",
1606                     "task_files",
1607                     "task_mkdir",
1608                     "task_parallel",
1609                     "task_active_if",
1610                     "task_product",
1611                     "task_permutations",
1612                     "task_combinations",
1613                     "task_combinations_with_replacement",
1614                     "task_subdivide",
1615                     "task_originate",
1616                     "task_graphviz",
1617                     ]
1618    # ENUMS
1619    (_action_unspecified,
1620     _action_task,
1621     _action_task_files_re,
1622     _action_task_split,
1623     _action_task_merge,
1624     _action_task_transform,
1625     _action_task_collate,
1626     _action_task_files_func,
1627     _action_task_files,
1628     _action_mkdir,
1629     _action_task_parallel,
1630     _action_active_if,
1631     _action_task_product,
1632     _action_task_permutations,
1633     _action_task_combinations,
1634     _action_task_combinations_with_replacement,
1635     _action_task_subdivide,
1636     _action_task_originate,
1637     _action_task_graphviz) = range(19)
1638
1639    (_multiple_jobs_outputs,
1640     _single_job_single_output,
1641     _job_single_matches_parent) = range(3)
1642
1643    _job_limit_semaphores = {}
1644
1645    # _________________________________________________________________________
1646
1647    #   _get_action_name
1648
1649    # _________________________________________________________________________
1650    def _get_action_name(self):
1651        return Task._action_names[self._action_type]
1652
1653    # _________________________________________________________________________
1654
1655    #   __init__
1656
1657    # _________________________________________________________________________
1658    def __init__(self, func, task_name, pipeline=None, command_str=None):
1659        """
1660        * Creates a Task object with a specified python function and task name
1661        * The type of the Task (whether it is a transform or merge or collate
1662            etc. operation) is specified subsequently. This is because Ruffus
1663            decorators do not have to be specified in order, and we don't
1664            know ahead of time.
1665        """
1666        if pipeline is None:
1667            pipeline = main_pipeline
1668        self.pipeline = pipeline
1669        # no function: just string
1670        if command_str is not None:
1671            self.func_module_name = ""
1672            self.func_name = ""
1673            self.func_description = command_str
1674        else:
1675            self.func_module_name = str(func.__module__)
1676            self.func_name = func.__name__
1677            # convert description into one line
1678            self.func_description = re.sub(
1679                r"\n\s+", " ", func.__doc__).strip() if func.__doc__ else ""
1680
1681        if not task_name:
1682            task_name = self.func_module_name + "." + self.func_name
1683
1684        node.__init__(self, task_name)
1685        self._action_type = Task._action_task
1686        self._action_type_desc = Task._action_names[self._action_type]
1687
1688        #   Each task has its own checksum level
1689        #   At the moment this is really so multiple pipelines in the same
1690        #       script can have different checksum levels
1691        # Though set by pipeline_xxxx functions, have initial valid value so
1692        # unit tests work :-|
1693        self.checksum_level = CHECKSUM_FILE_TIMESTAMPS
1694        self.param_generator_func = None
1695        self.needs_update_func = None
1696        self.job_wrapper = job_wrapper_generic
1697
1698        #
1699        self.job_descriptor = generic_job_descriptor
1700
1701        # jobs which produce a single output.
1702        # special handling for task.get_output_files for dependency chaining
1703        self._is_single_job_single_output = self._multiple_jobs_outputs
1704        self.single_multi_io = self._many_to_many
1705
1706        # function which is decorated and does the actual work
1707        self.user_defined_work_func = func
1708
1709        # functions which will be called when task completes
1710        self.posttask_functions = []
1711
1712        # give makedir automatically made parent tasks unique names
1713        self.cnt_task_mkdir = 0
1714
1715        # whether only task function itself knows what output it will produce
1716        # i.e. output is a glob or something similar
1717        self.indeterminate_output = 0
1718
1719        # cache output file names here
1720        self.output_filenames = None
1721
1722        # semaphore name must be unique
1723        self.semaphore_name = pipeline.name + ":" + task_name
1724
1725        # do not test for whether task is active
1726        self.active_if_checks = None
1727
1728        # extra flag for outputfiles
1729        self.is_active = True
1730
1731        # Created via decorator or OO interface
1732        #   so that display_name looks more natural
1733        self.created_via_decorator = False
1734
1735        # Finish setting up task
1736        self._setup_task_func = Task._do_nothing_setup
1737
1738        # Finish setting up task
1739        self.deferred_follow_params = []
1740
1741        # Finish setting up task
1742        self.parsed_args = {}
1743        self.error_type = None
1744
1745        # @split or pipeline.split etc.
1746        self.syntax = ""
1747
1748        self.description_with_args_placeholder = "%s"
1749
1750        # whether task has a (re-specifiable) input parameter
1751        self.has_input_param = True
1752        self.has_pipeline_in_input_param = False
1753
1754        # add to pipeline's lookup
1755        # this code is here rather than the pipeline so that current unittests
1756        #   do not need to bother about pipeline
1757        if task_name in self.pipeline.task_names:
1758            raise ruffus_exceptions.error_duplicate_task_name("Same task name %s specified multiple times in the "
1759                                            "same pipeline (%s)" % (task_name, self.pipeline.name))
1760
1761        self.pipeline.tasks.add(self)
1762
1763        # task_name is always a unique lookup and overrides everything else
1764        self.pipeline[task_name] = self
1765        self.pipeline.lookup[task_name] = [self]
1766        self.pipeline.task_names.add(task_name)
1767
1768        self.command_str_callback = "PIPELINE"
1769
1770        #
1771        #   Allow pipeline to lookup task by
1772        #       1) Func
1773        #       2) task name
1774        #       3) func name
1775        #
1776        #   Ambiguous func names returns an empty list []
1777        #
1778
1779        for lookup in (func, self.func_name, self.func_module_name + "." + self.func_name):
1780            # don't add to lookup if this conflicts with a task_name which is
1781            # always unique and overriding
1782            if lookup == ".":
1783                continue
1784            if lookup not in self.pipeline.task_names:
1785                # non-unique map
1786                if lookup in self.pipeline.lookup:
1787                    self.pipeline.lookup[lookup].append(self)
1788                    # remove non-uniques from Pipeline
1789                    if lookup in self.pipeline:
1790                        del self.pipeline[lookup]
1791                else:
1792                    self.pipeline.lookup[lookup] = [self]
1793                    self.pipeline[lookup] = self
1794
1795    # _________________________________________________________________________
1796
1797    #   _clone
1798
1799    # _________________________________________________________________________
1800    def _clone(self, new_pipeline):
1801        """
1802        * Clones a Task object from self
1803        """
1804        new_task = Task(self.user_defined_work_func, self._name, new_pipeline)
1805        new_task.command_str_callback = self.command_str_callback
1806        new_task._action_type = self._action_type
1807        new_task._action_type_desc = self._action_type_desc
1808        new_task.checksum_level = self.checksum_level
1809        new_task.param_generator_func = self.param_generator_func
1810        new_task.needs_update_func = self.needs_update_func
1811        new_task.job_wrapper = self.job_wrapper
1812        new_task.job_descriptor = self.job_descriptor
1813        new_task._is_single_job_single_output = self._is_single_job_single_output
1814        new_task.single_multi_io = self.single_multi_io
1815        new_task.posttask_functions = copy.deepcopy(self.posttask_functions)
1816        new_task.cnt_task_mkdir = self.cnt_task_mkdir
1817        new_task.indeterminate_output = self.indeterminate_output
1818        new_task.semaphore_name = self.semaphore_name
1819        new_task.is_active = self.is_active
1820        new_task.created_via_decorator = self.created_via_decorator
1821        new_task._setup_task_func = self._setup_task_func
1822        new_task.error_type = self.error_type
1823        new_task.syntax = self.syntax
1824        new_task.description_with_args_placeholder = \
1825            self.description_with_args_placeholder.replace(
1826                self.pipeline.name, new_pipeline.name)
1827        new_task.has_input_param = self.has_input_param
1828        new_task.has_pipeline_in_input_param = self.has_pipeline_in_input_param
1829        new_task.output_filenames = copy.deepcopy(self.output_filenames)
1830        new_task.active_if_checks = copy.deepcopy(self.active_if_checks)
1831        new_task.parsed_args = copy.deepcopy(self.parsed_args)
1832        new_task.deferred_follow_params = copy.deepcopy(
1833            self.deferred_follow_params)
1834
1835        return new_task
1836
1837    # _________________________________________________________________________
1838
1839    #   command_str_callback
1840
1841    # _________________________________________________________________________
1842    def set_command_str_callback(self, command_str_callback):
1843        if not callable(command_str_callback):
1844            raise Exception(
1845                "set_command_str_callback() takes a python function or a callable object.")
1846        self.command_str_callback = command_str_callback
1847
1848    # _________________________________________________________________________
1849
1850    #   set_output
1851
1852    # _________________________________________________________________________
1853
1854    def set_output(self, **args):
1855        """
1856        Changes output parameter(s) for originate
1857            set_input(output  = "test.txt")
1858        """
1859
1860        if self.syntax not in ("pipeline.originate", "@originate"):
1861            raise ruffus_exceptions.error_set_output("Can only set output for originate tasks")
1862        #
1863        #   For product: filter parameter is a list of formatter()
1864        #
1865        if "output" in args:
1866            self.parsed_args["output"] = args["output"]
1867            del args["output"]
1868        else:
1869            raise ruffus_exceptions.error_set_output(
1870                "Missing the output argument in set_input(output=xxx)")
1871
1872        # Non "input" arguments
1873        if len(args):
1874            raise ruffus_exceptions.error_set_output("Unexpected argument name in set_output(%s). "
1875                                   "Only expecting output=xxx." % (args,))
1876    # _________________________________________________________________________
1877
1878    #   set_input
1879
1880    # _________________________________________________________________________
1881    def set_input(self, **args):
1882        """
1883        Changes any of the input parameter(s) of the task
1884        For example:
1885            set_input(input  = "test.txt")
1886            set_input(input2 = "b.txt")
1887            set_input(input = "a.txt", input2 = "b.txt")
1888        """
1889        #
1890        #   For product: filter parameter is a list of formatter()
1891        #
1892        if ("filter" in self.parsed_args and
1893                isinstance(self.parsed_args["filter"], list)):
1894            # the number of input is the count of filter
1895            cnt_expected_input = len(self.parsed_args["filter"])
1896
1897            # make sure the parsed parameter argument is setup, with empty
1898            # lists if necessary
1899            # Should have been done already...
1900            # if self.parsed_args["input"] is None:
1901            #    self.parsed_args["input"] = [[]
1902            #       for i in range(cnt_expected_input)]
1903
1904            #   update each element of the list accordingly
1905            #   removing args so we can check if there is anything left over
1906            for inputN in range(cnt_expected_input):
1907                input_name = "input%d" % (inputN + 1) if inputN else "input"
1908                if input_name in args:
1909                    self.parsed_args["input"][inputN] = args[input_name]
1910                    del args[input_name]
1911
1912            if len(args):
1913                raise ruffus_exceptions.error_set_input("Unexpected arguments in set_input(%s). "
1914                                      "Only expecting inputN=xxx" % (args,))
1915            return
1916
1917        if "input" in args:
1918            self.parsed_args["input"] = args["input"]
1919            del args["input"]
1920        else:
1921            raise ruffus_exceptions.error_set_input(
1922                "Missing the input argument in set_input(input=xxx)")
1923
1924        # Non "input" arguments
1925        if len(args):
1926            raise ruffus_exceptions.error_set_input("Unexpected argument name in set_input(%s). "
1927                                  "Only expecting input=xxx." % (args,))
1928
1929    # _________________________________________________________________________
1930
1931    #   _init_for_pipeline
1932
1933    # _________________________________________________________________________
1934    def _init_for_pipeline(self):
1935        """
1936        Initialize variables for pipeline run / printout
1937
1938        **********
1939          BEWARE
1940        **********
1941
1942        Because state is stored, ruffus is *not* reentrant.
1943
1944        TODO: Need to create runtime DAG to mirror task DAG which holds
1945                output_filenames to be reentrant
1946
1947        **********
1948          BEWARE
1949        **********
1950        """
1951
1952        # cache output file names here
1953        self.output_filenames = None
1954
1955    # _________________________________________________________________________
1956
1957    #   _set_action_type
1958
1959    # _________________________________________________________________________
1960    def _set_action_type(self, new_action_type):
1961        """
1962        Save how this task
1963            1) tests whether it is up-to-date and
1964            2) handles input/output files
1965
1966        Checks that the task has not been defined with conflicting actions
1967
1968        """
1969        if self._action_type not in (Task._action_unspecified, Task._action_task):
1970            old_action = Task._action_names[self._action_type]
1971            new_action = Task._action_names[new_action_type]
1972            actions = " and ".join(list(set((old_action, new_action))))
1973            raise ruffus_exceptions.error_decorator_args("Duplicate task for:\n\n%s\n\n"
1974                                       "This has already been specified with a the same name "
1975                                       "or function\n"
1976                                       "(%r, %s)\n" %
1977                                       (self.description_with_args_placeholder % "...",
1978                                        self._get_display_name(),
1979                                        actions))
1980        self._action_type = new_action_type
1981        self._action_type_desc = Task._action_names[new_action_type]
1982
1983    def _get_job_name(self, descriptive_param, verbose_abbreviated_path, runtime_data):
1984        """
1985        Use job descriptor to return short name for job including any parameters
1986
1987        runtime_data is not (yet) used but may be used to add context in future
1988        """
1989        return self.job_descriptor(descriptive_param, verbose_abbreviated_path, runtime_data)[0]
1990
1991    def _get_display_name(self):
1992        """
1993        Returns task name, removing __main__. namespace or main. if present
1994        """
1995        if self.pipeline.name != "main":
1996            return "{pipeline_name}::{task_name}".format(pipeline_name=self.pipeline.name,
1997                                                         task_name=self._name.replace("__main__.", "").replace("main::", ""))
1998        else:
1999            return self._name.replace("__main__.", "").replace("main::", "")
2000
2001    def _get_decorated_function(self):
2002        """
2003        Returns name of task function, removing __main__ namespace if necessary
2004        If not specified using decorator notation, returns empty string
2005        N.B. Returns trailing new line
2006
2007        """
2008        if not self.created_via_decorator:
2009            return ""
2010
2011        func_name = (self.func_module_name + "." +
2012                     self.func_name) \
2013            if self.func_module_name != "__main__" else self.func_name
2014        return "def %s(...):\n    ...\n" % func_name
2015
2016    def _update_active_state(self):
2017        #   If has an @active_if decorator, check if the task needs to be run
2018        #       @active_if parameters may be call back functions or booleans
2019        if (self.active_if_checks is not None and
2020            any(not arg() if isinstance(arg, Callable) else not arg
2021                for arg in self.active_if_checks)):
2022                # flip is active to false.
2023                #   ( get_output_files() will return empty if inactive )
2024                #   Remember each iteration of pipeline_printout pipeline_run
2025                #   will have another bite at changing this value
2026            self.is_active = False
2027        else:
2028            # flip is active to True so that downstream dependencies will be
2029            #   correct ( get_output_files() will return empty if inactive )
2030            #   Remember each iteration of pipeline_printout pipeline_run will
2031            #   have another bite at changing this value
2032            self.is_active = True
2033
2034    # This code will look much better once we have job level
2035    # dependencies pipeline_run has dependencies percolating
2036    # up/down. Don't want to recreate all the logic here
2037    def _printout(self, runtime_data, force_rerun, job_history, task_is_out_of_date, verbose=1,
2038                  verbose_abbreviated_path=2, indent=4):
2039        """
2040        Print out all jobs for this task
2041
2042            verbose =
2043                    level 1 : logs Out-of-date Task names
2044                    level 2 : logs All Tasks (including any task function
2045                              docstrings)
2046                    level 3 : logs Out-of-date Jobs in Out-of-date Tasks, no
2047                              explanation
2048                    level 4 : logs Out-of-date Jobs in Out-of-date Tasks,
2049                              saying why they are out of date (include only
2050                              list of up-to-date tasks)
2051                    level 5 : All Jobs in Out-of-date Tasks (include only list
2052                              of up-to-date tasks)
2053                    level 6 : All jobs in All Tasks whether out of date or not
2054                    level 7 : Show file modification times for All jobs in All Tasks
2055
2056        """
2057
2058        def _get_job_names(unglobbed_params, indent_str):
2059            job_names = self.job_descriptor(
2060                unglobbed_params, verbose_abbreviated_path, runtime_data)[1]
2061            if len(job_names) > 1:
2062                job_names = ([indent_str + job_names[0]] +
2063                             [indent_str + "      " + jn for jn in job_names[1:]])
2064            else:
2065                job_names = ([indent_str + job_names[0]])
2066            return job_names
2067
2068        if not verbose:
2069            return []
2070
2071        indent_str = ' ' * indent
2072
2073        messages = []
2074
2075        # LOGGER: level 1 : logs Out-of-date Tasks (names and warnings)
2076
2077        messages.append("Task = %r %s " % (self._get_display_name(),
2078                                           ("    >>Forced to rerun<<" if force_rerun else "")))
2079        if verbose == 1:
2080            return messages
2081
2082        # LOGGER: level 2 : logs All Tasks (including any task function
2083        # docstrings)
2084        if verbose >= 2 and len(self.func_description):
2085            messages.append(indent_str + '"' + self.func_description + '"')
2086
2087        #
2088        #   single job state
2089        #
2090        if verbose >= 10:
2091            if self._is_single_job_single_output == self._single_job_single_output:
2092                messages.append("    Single job single output")
2093            elif self._is_single_job_single_output == self._multiple_jobs_outputs:
2094                messages.append("    Multiple jobs Multiple outputs")
2095            else:
2096                messages.append("    Single jobs status depends on %r" %
2097                                self._is_single_job_single_output._get_display_name())
2098
2099        # LOGGER: No job if less than 2
2100        if verbose <= 2:
2101            return messages
2102
2103        # increase indent for jobs up to date status
2104        indent_str += " " * 3
2105
2106        #
2107        #   If has an @active_if decorator, check if the task needs to be run
2108        #       @active_if parameters may be call back functions or booleans
2109        #
2110        if not self.is_active:
2111            # LOGGER
2112            if verbose <= 3:
2113                return messages
2114            messages.append(indent_str + "Task is inactive")
2115            # add spacer line
2116            messages.append("")
2117            return messages
2118
2119        #
2120        #   No parameters: just call task function
2121        #
2122        if self.param_generator_func is None:
2123            # LOGGER
2124            if verbose <= 3:
2125                return messages
2126
2127            #
2128            #   needs update func = None: always needs update
2129            #
2130            if self.needs_update_func is None:
2131                messages.append(
2132                    indent_str + "Task needs update: No func to check if up-to-date.")
2133                return messages
2134
2135            if self.needs_update_func == needs_update_check_modify_time:
2136                needs_update, msg = self.needs_update_func(
2137                    task=self, job_history=job_history,
2138                    verbose_abbreviated_path=verbose_abbreviated_path,
2139                    return_file_dates_when_uptodate=verbose > 6)
2140            else:
2141                needs_update, msg = self.needs_update_func()
2142
2143            if needs_update:
2144                messages.append(indent_str + "Task needs update: %s" % msg)
2145            elif verbose > 6:
2146                messages.append(indent_str + "Task %s" % msg)
2147            #
2148            #   Get rid of up-to-date messages:
2149            #       Superfluous for parts of the pipeline which are up-to-date
2150            #       Misleading for parts of the pipeline which require
2151            #           updating: tasks might have to run based on dependencies
2152            #           anyway
2153            #
2154            # else:
2155            #    if task_is_out_of_date:
2156            #        messages.append(indent_str + "Task appears up-to-date but
2157            #                        will rerun after its dependencies")
2158            #    else:
2159            #        messages.append(indent_str + "Task up-to-date")
2160
2161        else:
2162            runtime_data["MATCH_FAILURE"] = defaultdict(set)
2163            #
2164            #   return messages description per job if verbose > 5 else
2165            #       whether up to date or not
2166            #
2167            cnt_jobs = 0
2168            for params, unglobbed_params in self.param_generator_func(runtime_data):
2169                cnt_jobs += 1
2170
2171                #
2172                #   needs update func = None: always needs update
2173                #
2174                if self.needs_update_func is None:
2175                    if verbose >= 5:
2176                        messages.extend(_get_job_names(
2177                            unglobbed_params, indent_str))
2178                        messages.append(indent_str + "  Jobs needs update: No "
2179                                        "function to check if up-to-date or not")
2180                    continue
2181
2182                if self.needs_update_func == needs_update_check_modify_time:
2183                    needs_update, msg = self.needs_update_func(
2184                        *params, task=self, job_history=job_history,
2185                        verbose_abbreviated_path=verbose_abbreviated_path,
2186                        return_file_dates_when_uptodate=verbose > 6)
2187                else:
2188                    needs_update, msg = self.needs_update_func(*params)
2189
2190                if needs_update:
2191                    messages.extend(_get_job_names(
2192                        unglobbed_params, indent_str))
2193                    if verbose >= 4:
2194                        per_job_messages = [(indent_str + s)
2195                                            for s in ("  Job needs update: %s" % msg).split("\n")]
2196                        messages.extend(per_job_messages)
2197                    else:
2198                        messages.append(indent_str + "  Job needs update")
2199
2200                # up to date: log anyway if verbose
2201                else:
2202                    # LOGGER
2203                    if (task_is_out_of_date and verbose >= 5) or verbose >= 6:
2204                        messages.extend(_get_job_names(
2205                            unglobbed_params, indent_str))
2206                        #
2207                        #   Get rid of up-to-date messages:
2208                        #       Superfluous for parts of the pipeline which are up-to-date
2209                        #       Misleading for parts of the pipeline which require updating:
2210                        #           tasks might have to run based on dependencies anyway
2211                        #
2212                        # if not task_is_out_of_date:
2213                        #    messages.append(indent_str + "  Job up-to-date")
2214                    if verbose > 6:
2215                        messages.extend((indent_str + s)
2216                                        for s in (msg).split("\n"))
2217
2218            if cnt_jobs == 0:
2219                messages.append(indent_str + "!!! No jobs for this task.")
2220                messages.append(indent_str + "Are you sure there is "
2221                                "not a error in your code / regular expression?")
2222            # LOGGER
2223
2224            # DEBUGGGG!!
2225            if verbose >= 4 or (verbose and cnt_jobs == 0):
2226                if runtime_data and "MATCH_FAILURE" in runtime_data and\
2227                        self.param_generator_func in runtime_data["MATCH_FAILURE"]:
2228                    for job_msg in runtime_data["MATCH_FAILURE"][self.param_generator_func]:
2229                        messages.append(
2230                            indent_str + "Job Warning: Input substitution failed:")
2231                        messages.append(
2232                            indent_str + "             Do your regular expressions match the corresponding Input?")
2233                        messages.extend("  " + indent_str +
2234                                        line for line in job_msg.split("\n"))
2235
2236            runtime_data["MATCH_FAILURE"][self.param_generator_func] = set()
2237        messages.append("")
2238        return messages
2239
2240    # _________________________________________________________________________
2241
2242    #   _is_up_to_date
2243    #
2244    #       use to be named signal
2245    #       returns whether up to date
2246    #       stops recursing if true
2247    #
2248    # _________________________________________________________________________
2249    def _is_up_to_date(self, verbose_logger_job_history):
2250        """
2251        If true, depth first search will not pass through this node
2252        """
2253        if not verbose_logger_job_history:
2254            raise Exception("verbose_logger_job_history is None")
2255
2256        verbose_logger = verbose_logger_job_history[0]
2257        job_history = verbose_logger_job_history[1]
2258
2259        try:
2260            logger = verbose_logger.logger
2261            verbose = verbose_logger.verbose
2262            runtime_data = verbose_logger.runtime_data
2263            verbose_abbreviated_path = verbose_logger.verbose_abbreviated_path
2264
2265            log_at_level(logger, 10, verbose, "  Task = %r " %
2266                         self._get_display_name())
2267
2268            #
2269            #   If job is inactive, always consider it up-to-date
2270            #
2271            if (self.active_if_checks is not None and
2272                any(not arg() if isinstance(arg, Callable) else not arg
2273                    for arg in self.active_if_checks)):
2274                log_at_level(logger, 10, verbose,
2275                             "    Inactive task: treat as Up to date")
2276                # print 'signaling that the inactive task is up to date'
2277                return True
2278
2279            #
2280            #   Always needs update if no way to check if up to date
2281            #
2282            if self.needs_update_func is None:
2283                log_at_level(logger, 10, verbose,
2284                             "    No update function: treat as out of date")
2285                return False
2286
2287            #
2288            #   if no parameters, just return the results of needs update
2289            #
2290            if self.param_generator_func is None:
2291                if self.needs_update_func == needs_update_check_modify_time:
2292                    needs_update, ignore_msg = self.needs_update_func(
2293                        task=self, job_history=job_history,
2294                        verbose_abbreviated_path=verbose_abbreviated_path)
2295                else:
2296                    needs_update, ignore_msg = self.needs_update_func()
2297                log_at_level(logger, 10, verbose,
2298                             "    Needs update = %s" % needs_update)
2299                return not needs_update
2300            else:
2301                #
2302                #   return not up to date if ANY jobs needs update
2303                #
2304                for params, unglobbed_params in self.param_generator_func(runtime_data):
2305                    if self.needs_update_func == needs_update_check_modify_time:
2306                        needs_update, ignore_msg = self.needs_update_func(
2307                            *params, task=self, job_history=job_history,
2308                            verbose_abbreviated_path=verbose_abbreviated_path)
2309                    else:
2310                        needs_update, ignore_msg = self.needs_update_func(
2311                            *params)
2312                    if needs_update:
2313                        log_at_level(logger, 10, verbose, "    Needing update:\n      %s"
2314                                     % self._get_job_name(unglobbed_params,
2315                                                          verbose_abbreviated_path, runtime_data))
2316                        return False
2317
2318                #
2319                #   Percolate warnings from parameter factories
2320                #
2321                #  !!
2322                if (verbose >= 1 and "ruffus_WARNING" in runtime_data and
2323                        self.param_generator_func in runtime_data["ruffus_WARNING"]):
2324                    for msg in runtime_data["ruffus_WARNING"][self.param_generator_func]:
2325                        logger.warning("    'In Task\n%s\n%s" % (
2326                                       self.description_with_args_placeholder % "...", msg))
2327
2328                log_at_level(logger, 10, verbose, "    All jobs up to date")
2329
2330                return True
2331
2332        #
2333        # removed for compatibility with python 3.x
2334        #
2335        # rethrow exception after adding task name
2336        # except error_task, inst:
2337        #    inst.specify_task(self, "Exceptions in dependency checking")
2338        #    raise
2339
2340        except:
2341            exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
2342            #
2343            # rethrow exception after adding task name
2344            #
2345            if exceptionType == error_task:
2346                exceptionValue.specify
2347                inst.specify_task(self, "Exceptions in dependency checking")
2348                raise
2349
2350            exception_stack = traceback.format_exc()
2351            exception_name = exceptionType.__module__ + '.' + exceptionType.__name__
2352            exception_value = str(exceptionValue)
2353            if len(exception_value):
2354                exception_value = "(%s)" % exception_value
2355            errt = ruffus_exceptions.RethrownJobError([(self._name,
2356                                      "",
2357                                      exception_name,
2358                                      exception_value,
2359                                      exception_stack)])
2360            errt.specify_task(self, "Exceptions generating parameters")
2361            raise errt
2362
2363    # _________________________________________________________________________
2364
2365    #   _get_output_files
2366    #
2367    #
2368    # _________________________________________________________________________
2369    def _get_output_files(self, do_not_expand_single_job_tasks, runtime_data):
2370        """
2371        Cache output files
2372
2373            Normally returns a list with one item for each job or a just a list
2374            of file names.
2375            For "_single_job_single_output"
2376                i.e. @merge and @files with single jobs,
2377                returns the output of a single job (i.e. can be a string)
2378        """
2379
2380        #
2381        #   N.B. active_if_checks is called once per task
2382        #        in make_job_parameter_generator() for consistency
2383        #
2384        #   self.is_active can be set using self.active_if_checks in that
2385        #       function, and therefore can be changed BETWEEN invocations
2386        #       of pipeline_run
2387        #
2388        #   self.is_active is not used anywhere else
2389        #
2390        if (not self.is_active):
2391            return []
2392
2393        if self.output_filenames is None:
2394
2395            self.output_filenames = []
2396
2397            # skip tasks which don't have parameters
2398            if self.param_generator_func is not None:
2399
2400                cnt_jobs = 0
2401                for params, unglobbed_params in self.param_generator_func(runtime_data):
2402                    cnt_jobs += 1
2403                    # skip tasks which don't have output parameters
2404                    if len(params) >= 2:
2405                        # make sure each @split or @subdivide or @originate
2406                        #   returns a list of jobs
2407                        #   i.e. each @split or @subdivide or @originate is
2408                        #       always a ->many operation
2409                        #       even if len(many) can be 1 (or zero)
2410                        if self.indeterminate_output and not non_str_sequence(params[1]):
2411                            self.output_filenames.append([params[1]])
2412                        else:
2413                            self.output_filenames.append(params[1])
2414
2415                if self._is_single_job_single_output == self._single_job_single_output:
2416                    if cnt_jobs > 1:
2417                        raise ruffus_exceptions.error_task_get_output(self, "Task which is supposed to produce a "
2418                                                    "single output somehow has more than one job.")
2419
2420                #
2421                #   The output of @split should be treated as multiple jobs
2422                #
2423                #       The output of @split is always a list of lists:
2424                #         1) There is a list of @split jobs
2425                #            A) For advanced (regex) @split
2426                #               this is a many -> many more operation
2427                #               So len(list) == many (i.e. the number of jobs
2428                #            B) For normal @split
2429                #               this is a  1   -> many operation
2430                #               So len(list)  = 1
2431                #
2432                #         2) The output of each @split job is a list
2433                #            The items in this list of lists are each a job in
2434                #               subsequent tasks
2435                #
2436                #
2437                #         So we need to concatenate these separate lists into a
2438                #         single list of output
2439                #
2440                #         For example:
2441                #         @split(["a.1", "b.1"], regex(r"(.)\.1"), r"\1.*.2")
2442                #         def example(input, output):
2443                # JOB 1
2444                # a.1 -> a.i.2
2445                # -> a.j.2
2446                #
2447                # JOB 2
2448                # b.1 -> b.i.2
2449                # -> b.j.2
2450                #
2451                #         output_filenames = [ [a.i.2, a.j.2], [b.i.2, b.j.2] ]
2452                #
2453                #         we want [ a.i.2, a.j.2, b.i.2, b.j.2 ]
2454                #
2455                #         This also works for simple @split
2456                #
2457                #         @split("a.1", r"a.*.2")
2458                #         def example(input, output):
2459                # only job
2460                # a.1 -> a.i.2
2461                # -> a.j.2
2462                #
2463                #         output_filenames = [ [a.i.2, a.j.2] ]
2464                #
2465                #         we want [ a.i.2, a.j.2 ]
2466                #
2467                if len(self.output_filenames) and self.indeterminate_output:
2468                    self.output_filenames = reduce(
2469                        lambda x, y: x + y, self.output_filenames)
2470
2471        # special handling for jobs which have a single task
2472        if (do_not_expand_single_job_tasks and
2473                self._is_single_job_single_output and
2474                len(self.output_filenames)):
2475            return self.output_filenames[0]
2476
2477        #
2478        # sort by jobs so it is just a weeny little bit less deterministic
2479        #
2480        return sorted(self.output_filenames, key=lambda x: str(x))
2481
2482    # _________________________________________________________________________
2483
2484    #   _completed
2485    #
2486    #       All logging logic moved to caller site
2487    # _________________________________________________________________________
2488    def _completed(self):
2489        """
2490        called even when all jobs are up to date
2491        """
2492        if not self.is_active:
2493            self.output_filenames = None
2494            return
2495
2496        for f in self.posttask_functions:
2497            f()
2498
2499        #
2500        #   indeterminate output. Check actual output again if someother tasks
2501        #       job function depend on it
2502        #       used for @split
2503        #
2504        if self.indeterminate_output:
2505            self.output_filenames = None
2506
2507    # _________________________________________________________________________
2508
2509    #   _handle_tasks_globs_in_inputs
2510
2511    # _________________________________________________________________________
2512    def _handle_tasks_globs_in_inputs(self, input_params, modify_inputs_mode):
2513        """
2514        Helper function for tasks which
2515            1) Notes globs and tasks
2516            2) Replaces tasks names and functions with actual tasks
2517            3) Adds task dependencies automatically via task_follows
2518
2519            modify_inputs_mode = results["modify_inputs_mode"] =
2520                t_extra_inputs.ADD_TO_INPUTS | REPLACE_INPUTS |
2521                               KEEP_INPUTS | KEEP_OUTPUTS
2522        """
2523        # DEBUGGG
2524        # print("    task._handle_tasks_globs_in_inputs start %s" % (self._get_display_name(), ), file = sys.stderr)
2525        #
2526        # get list of function/function names and globs
2527        #
2528        function_or_func_names, globs, runtime_data_names = get_nested_tasks_or_globs(input_params)
2529
2530        #
2531        # replace function / function names with tasks
2532        #
2533        if modify_inputs_mode == t_extra_inputs.ADD_TO_INPUTS:
2534            description_with_args_placeholder = \
2535                self.description_with_args_placeholder % "add_inputs = add_inputs(%r)"
2536        elif modify_inputs_mode == t_extra_inputs.REPLACE_INPUTS:
2537            description_with_args_placeholder = \
2538                self.description_with_args_placeholder % "replace_inputs = add_inputs(%r)"
2539        elif modify_inputs_mode == t_extra_inputs.KEEP_OUTPUTS:
2540            description_with_args_placeholder = \
2541                self.description_with_args_placeholder % "output =%r"
2542        else:  # t_extra_inputs.KEEP_INPUTS
2543            description_with_args_placeholder = \
2544                self.description_with_args_placeholder % "input =%r"
2545
2546        tasks = self._connect_parents(
2547            description_with_args_placeholder, True, function_or_func_names)
2548        functions_to_tasks = dict()
2549        for funct_name_task_or_pipeline, task in zip(function_or_func_names, tasks):
2550            if isinstance(funct_name_task_or_pipeline, Pipeline):
2551                functions_to_tasks["PIPELINE=%s=PIPELINE" %
2552                                   funct_name_task_or_pipeline.name] = task
2553            else:
2554                functions_to_tasks[funct_name_task_or_pipeline] = task
2555
2556        # replace strings, tasks, pipelines with tasks
2557        input_params = replace_placeholders_with_tasks_in_input_params(
2558            input_params, functions_to_tasks)
2559        # DEBUGGG
2560        #print("    task._handle_tasks_globs_in_inputs finish %s" % (self._get_display_name(), ), file = sys.stderr)
2561        return t_params_tasks_globs_run_time_data(input_params, tasks, globs, runtime_data_names)
2562
2563    def _choose_file_names_transform(self, parsed_args,
2564                                     valid_tags=(regex, suffix, formatter)):
2565        """
2566        shared code for subdivide, transform, product etc for choosing method
2567        for transform input file to output files
2568        """
2569        file_name_transform_tag = parsed_args["filter"]
2570        valid_tag_names = []
2571        # regular expression match
2572        if (regex in valid_tags):
2573            valid_tag_names.append("regex()")
2574            if isinstance(file_name_transform_tag, regex):
2575                return t_regex_file_names_transform(self,
2576                                                    file_name_transform_tag,
2577                                                    self.error_type,
2578                                                    self.syntax)
2579
2580        # simulate end of string (suffix) match
2581        if (suffix in valid_tags):
2582            valid_tag_names.append("suffix()")
2583            if isinstance(file_name_transform_tag, suffix):
2584                output_dir = parsed_args["output_dir"] if "output_dir" in parsed_args else [
2585                ]
2586                return t_suffix_file_names_transform(self,
2587                                                     file_name_transform_tag,
2588                                                     self.error_type,
2589                                                     self.syntax,
2590                                                     output_dir)
2591        # new style string.format()
2592        if (formatter in valid_tags):
2593            valid_tag_names.append("formatter()")
2594            if isinstance(file_name_transform_tag, formatter):
2595                return t_formatter_file_names_transform(self,
2596                                                        file_name_transform_tag,
2597                                                        self.error_type,
2598                                                        self.syntax)
2599
2600        raise self.error_type(self,
2601                              "%s expects one of %s as the second argument"
2602                              % (self.syntax, ", ".join(valid_tag_names)))
2603
2604    #       task handlers
2605    #         sets
2606    #               1) action_type
2607    #               2) param_generator_func
2608    #               3) needs_update_func
2609    #               4) job wrapper
2610    def _do_nothing_setup(self):
2611        """
2612        Task is already set up: do nothing
2613        """
2614        return set()
2615
2616    #       originate does have an Input param.
2617    #       It is just None (and not set-able)
2618    def _decorator_originate(self, *unnamed_args, **named_args):
2619        """
2620        @originate
2621        """
2622        self.syntax = "@originate"
2623        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
2624                                                                  self._get_decorated_function())
2625        self._prepare_originate(unnamed_args, named_args)
2626
2627        # originate
2628        # self.has_input_param        = True
2629
2630    def _prepare_originate(self, unnamed_args, named_args):
2631        """
2632        Common function for pipeline.originate and @originate
2633        """
2634        self.error_type = ruffus_exceptions.error_task_originate
2635        self._set_action_type(Task._action_task_originate)
2636        self._setup_task_func = Task._originate_setup
2637        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2638        self.job_wrapper = job_wrapper_output_files
2639        self.job_descriptor = io_files_one_to_many_job_descriptor
2640        self.single_multi_io = self._many_to_many
2641        # output is not a glob
2642        self.indeterminate_output = 0
2643
2644        self.parsed_args = parse_task_arguments(unnamed_args, named_args, ["output", "extras"],
2645                                                self.description_with_args_placeholder)
2646
2647    def _originate_setup(self):
2648        """
2649        Finish setting up originate
2650        """
2651        #
2652        # If self.parsed_args["output"] is a single item (e.g. file name),
2653        # that will be treated as a list
2654        # Each item in the list of these will be called as an output in a
2655        #   separate function call
2656        #
2657        output_params = self.parsed_args["output"]
2658        if not non_str_sequence(output_params):
2659            output_params = [output_params]
2660
2661        #
2662        #   output globs will be replaced with files. But there should not be
2663        #       tasks here!
2664        #
2665        list_output_files_task_globs = [self._handle_tasks_globs_in_inputs(
2666                                        oo, t_extra_inputs.KEEP_INPUTS) for oo in output_params]
2667        for oftg in list_output_files_task_globs:
2668            if len(oftg.tasks):
2669                raise self.error_type(self, "%s cannot output to another "
2670                                      "task. Do not include tasks in "
2671                                      "output parameters." % self.syntax)
2672
2673        self.param_generator_func = originate_param_factory(list_output_files_task_globs,
2674                                                            *self.parsed_args["extras"])
2675        return set()
2676
2677    def _decorator_transform(self, *unnamed_args, **named_args):
2678        """
2679        @originate
2680        """
2681        self.syntax = "@transform"
2682        self.description_with_args_placeholder = "%s(%%s)\n%s" % (
2683            self.syntax, self._get_decorated_function())
2684        self._prepare_transform(unnamed_args, named_args)
2685
2686    def _prepare_transform(self, unnamed_args, named_args):
2687        """
2688        Common function for pipeline.transform and @transform
2689        """
2690        self.error_type = ruffus_exceptions.error_task_transform
2691        self._set_action_type(Task._action_task_transform)
2692        self._setup_task_func = Task._transform_setup
2693        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2694        self.job_wrapper = job_wrapper_io_files
2695        self.job_descriptor = io_files_job_descriptor
2696        self.single_multi_io = self._many_to_many
2697
2698        #   Parse named and unnamed arguments
2699        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
2700                                                ["input", "filter", "modify_inputs",
2701                                                 "output", "extras", "output_dir"],
2702                                                self.description_with_args_placeholder)
2703
2704    def _transform_setup(self):
2705        """
2706        Finish setting up transform
2707        """
2708        # DEBUGGG
2709        # print("   task._transform_setup start %s" % (self._get_display_name(), ), file = sys.stderr)
2710        # replace function / function names with tasks
2711        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
2712                                                                    t_extra_inputs.KEEP_INPUTS)
2713        ancestral_tasks = set(input_files_task_globs.tasks)
2714
2715        #       _single_job_single_output is bad policy. Can we remove it?
2716        #       What does this actually mean in Ruffus semantics?
2717        #
2718        #   allows transform to take a single file or task
2719        if input_files_task_globs.single_file_to_list():
2720            self._is_single_job_single_output = self._single_job_single_output
2721
2722        #
2723        #   whether transform generates a list of jobs or not will depend on
2724        #       the parent task
2725        #
2726        elif isinstance(input_files_task_globs.params, Task):
2727            self._is_single_job_single_output = input_files_task_globs.params
2728
2729        # how to transform input to output file name
2730        file_names_transform = self._choose_file_names_transform(
2731            self.parsed_args)
2732
2733        modify_inputs = self.parsed_args["modify_inputs"]
2734        if modify_inputs is not None:
2735            modify_inputs = self._handle_tasks_globs_in_inputs(
2736                modify_inputs, self.parsed_args["modify_inputs_mode"])
2737            ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks)
2738
2739        self.param_generator_func = transform_param_factory(input_files_task_globs,
2740                                                            file_names_transform,
2741                                                            modify_inputs,
2742                                                            self.parsed_args["modify_inputs_mode"],
2743                                                            self.parsed_args["output"],
2744                                                            *self.parsed_args["extras"])
2745
2746        # DEBUGGG
2747        #print("   task._transform_setup finish %s" % (self._get_display_name(), ), file = sys.stderr)
2748        return ancestral_tasks
2749
2750    def _decorator_subdivide(self, *unnamed_args, **named_args):
2751        """
2752        @subdivide
2753        """
2754        self.syntax = "@subdivide"
2755        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
2756                                                                  self._get_decorated_function())
2757        self._prepare_subdivide(unnamed_args, named_args)
2758
2759    def _prepare_subdivide(self, unnamed_args, named_args):
2760        """
2761            Common code for @subdivide and pipeline.subdivide
2762            @split can also end up here
2763        """
2764        self.error_type = ruffus_exceptions.error_task_subdivide
2765        self._set_action_type(Task._action_task_subdivide)
2766        self._setup_task_func = Task._subdivide_setup
2767        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2768        self.job_wrapper = job_wrapper_io_files
2769        self.job_descriptor = io_files_one_to_many_job_descriptor
2770        self.single_multi_io = self._many_to_many
2771        # output is a glob
2772        self.indeterminate_output = 2
2773
2774        #
2775        #   Parse named and unnamed arguments
2776        #
2777        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
2778                                                ["input", "filter", "modify_inputs",
2779                                                 "output", "extras", "output_dir"],
2780                                                self.description_with_args_placeholder)
2781
2782    def _subdivide_setup(self):
2783        """
2784        Finish setting up subdivide
2785        """
2786
2787        #
2788        # replace function / function names with tasks
2789        #
2790        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
2791                                                                    t_extra_inputs.KEEP_INPUTS)
2792
2793        #   allows split to take a single file or task
2794        input_files_task_globs.single_file_to_list()
2795
2796        ancestral_tasks = set(input_files_task_globs.tasks)
2797
2798        # how to transform input to output file name
2799        file_names_transform = self._choose_file_names_transform(
2800            self.parsed_args)
2801
2802        modify_inputs = self.parsed_args["modify_inputs"]
2803        if modify_inputs is not None:
2804            modify_inputs = self._handle_tasks_globs_in_inputs(
2805                modify_inputs, self.parsed_args["modify_inputs_mode"])
2806            ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks)
2807
2808        #
2809        #   output globs will be replaced with files.
2810        #       But there should not be tasks here!
2811        #
2812        output_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["output"],
2813                                                                     t_extra_inputs.KEEP_OUTPUTS)
2814        if len(output_files_task_globs.tasks):
2815            raise self.error_type(self, ("%s cannot output to another task. Do not include tasks "
2816                                         "in output parameters.") % self.syntax)
2817
2818        self.param_generator_func = subdivide_param_factory(input_files_task_globs,
2819                                                            # False, #
2820                                                            # flatten input
2821                                                            # removed
2822                                                            file_names_transform,
2823                                                            modify_inputs,
2824                                                            self.parsed_args["modify_inputs_mode"],
2825                                                            output_files_task_globs,
2826                                                            *self.parsed_args["extras"])
2827        return ancestral_tasks
2828
2829    def _decorator_split(self, *unnamed_args, **named_args):
2830        """
2831        @split
2832        """
2833        self.syntax = "@split"
2834        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
2835                                                                  self._get_decorated_function())
2836
2837        #
2838        #   This is actually @subdivide
2839        #
2840        if isinstance(unnamed_args[1], regex):
2841            self._prepare_subdivide(unnamed_args, named_args,
2842                                    self.description_with_args_placeholder)
2843
2844        #
2845        #   This is actually @split
2846        #
2847        else:
2848            self._prepare_split(unnamed_args, named_args)
2849
2850    def _prepare_split(self, unnamed_args, named_args):
2851        """
2852        Common code for @split and pipeline.split
2853        """
2854        self.error_type = ruffus_exceptions.error_task_split
2855        self._set_action_type(Task._action_task_split)
2856        self._setup_task_func = Task._split_setup
2857        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2858        self.job_wrapper = job_wrapper_io_files
2859        self.job_descriptor = io_files_one_to_many_job_descriptor
2860        self.single_multi_io = self._one_to_many
2861        # output is a glob
2862        self.indeterminate_output = 1
2863
2864        #
2865        #   Parse named and unnamed arguments
2866        #
2867        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
2868                                                ["input", "output", "extras"],
2869                                                self.description_with_args_placeholder)
2870
2871    def _split_setup(self):
2872        """
2873        Finish setting up split
2874        """
2875
2876        #
2877        # replace function / function names with tasks
2878        #
2879        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
2880                                                                    t_extra_inputs.KEEP_INPUTS)
2881
2882        #
2883        #   output globs will be replaced with files.
2884        #       But there should not be tasks here!
2885        #
2886        output_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["output"],
2887                                                                     t_extra_inputs.KEEP_OUTPUTS)
2888        if len(output_files_task_globs.tasks):
2889            raise self.error_type(self, "%s cannot output to another task. "
2890                                        "Do not include tasks in output "
2891                                        "parameters." % self.syntax)
2892
2893        self.param_generator_func = split_param_factory(input_files_task_globs,
2894                                                        output_files_task_globs,
2895                                                        *self.parsed_args["extras"])
2896        return set(input_files_task_globs.tasks)
2897
2898    def _decorator_merge(self, *unnamed_args, **named_args):
2899        """
2900        @merge
2901        """
2902        self.syntax = "@merge"
2903        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
2904                                                                  self._get_decorated_function())
2905        self._prepare_merge(unnamed_args, named_args)
2906
2907    def _prepare_merge(self, unnamed_args, named_args):
2908        """
2909        Common code for @merge and pipeline.merge
2910        """
2911        self.error_type = ruffus_exceptions.error_task_merge
2912        self._set_action_type(Task._action_task_merge)
2913        self._setup_task_func = Task._merge_setup
2914        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2915        self.job_wrapper = job_wrapper_io_files
2916        self.job_descriptor = io_files_job_descriptor
2917        self.single_multi_io = self._many_to_one
2918        self._is_single_job_single_output = self._single_job_single_output
2919
2920        #
2921        #   Parse named and unnamed arguments
2922        #
2923        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
2924                                                ["input", "output", "extras"],
2925                                                self.description_with_args_placeholder)
2926
2927    def _merge_setup(self):
2928        """
2929        Finish setting up merge
2930        """
2931        #
2932        # replace function / function names with tasks
2933        #
2934        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
2935                                                                    t_extra_inputs.KEEP_INPUTS)
2936
2937        self.param_generator_func = merge_param_factory(input_files_task_globs,
2938                                                        self.parsed_args["output"],
2939                                                        *self.parsed_args["extras"])
2940        return set(input_files_task_globs.tasks)
2941
2942    def _decorator_collate(self, *unnamed_args, **named_args):
2943        """
2944        @collate
2945        """
2946        self.syntax = "@collate"
2947        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
2948                                                                  self._get_decorated_function())
2949        self._prepare_collate(unnamed_args, named_args)
2950
2951    def _prepare_collate(self, unnamed_args, named_args):
2952        """
2953        Common code for @collate and pipeline.collate
2954        """
2955        self.error_type = ruffus_exceptions.error_task_collate
2956        self._set_action_type(Task._action_task_collate)
2957        self._setup_task_func = Task._collate_setup
2958        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
2959        self.job_wrapper = job_wrapper_io_files
2960        self.job_descriptor = io_files_job_descriptor
2961        self.single_multi_io = self._many_to_many
2962
2963        #
2964        #   Parse named and unnamed arguments
2965        #
2966        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
2967                                                ["input", "filter", "modify_inputs",
2968                                                 "output", "extras"],
2969                                                self.description_with_args_placeholder)
2970
2971    def _collate_setup(self):
2972        """
2973        Finish setting up collate
2974        """
2975
2976        #
2977        # replace function / function names with tasks
2978        #
2979        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
2980                                                                    t_extra_inputs.KEEP_INPUTS)
2981        ancestral_tasks = set(input_files_task_globs.tasks)
2982
2983        # how to transform input to output file name
2984        file_names_transform = self._choose_file_names_transform(self.parsed_args,
2985                                                                 (regex, formatter))
2986
2987        modify_inputs = self.parsed_args["modify_inputs"]
2988        if modify_inputs is not None:
2989            modify_inputs = self._handle_tasks_globs_in_inputs(
2990                modify_inputs, self.parsed_args["modify_inputs_mode"])
2991            ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks)
2992
2993        self.param_generator_func = collate_param_factory(input_files_task_globs,
2994                                                          # False, #
2995                                                          # flatten input
2996                                                          # removed
2997                                                          file_names_transform,
2998                                                          modify_inputs,
2999                                                          self.parsed_args["modify_inputs_mode"],
3000                                                          self.parsed_args["output"],
3001                                                          *self.parsed_args["extras"])
3002
3003        return ancestral_tasks
3004
3005    def _decorator_mkdir(self, *unnamed_args, **named_args):
3006        """
3007        @mkdir
3008        """
3009        syntax = "@mkdir"
3010        description_with_args_placeholder = "%s(%%s)\n%s" % (
3011            self.syntax, (self.description_with_args_placeholder % "..."))
3012        self._prepare_preceding_mkdir(unnamed_args, named_args, syntax,
3013                                      description_with_args_placeholder)
3014
3015    def mkdir(self, *unnamed_args, **named_args):
3016        """
3017        Make missing directories, including intermediates, before this task
3018        """
3019        syntax = "Task(name = %s).mkdir" % self._name
3020        description_with_args_placeholder = "%s(%%s)" % (self.syntax)
3021        self._prepare_preceding_mkdir(unnamed_args, named_args, syntax,
3022                                      description_with_args_placeholder)
3023        return self
3024
3025    def _prepare_preceding_mkdir(self, unnamed_args, named_args, syntax,
3026                                 task_description, defer=True):
3027        """
3028        Add mkdir Task to run before self
3029            Common to
3030                Task.mkdir
3031                @mkdir
3032                @follows(..., mkdir())
3033        """
3034        #
3035        #   Create a new Task with a unique name to this instance of mkdir
3036        #
3037        self.cnt_task_mkdir += 1
3038        cnt_task_mkdir_str = (
3039            " #%d" % self.cnt_task_mkdir) if self.cnt_task_mkdir > 1 else ""
3040        task_name = r"mkdir%r%s   before %s " % (
3041            unnamed_args, cnt_task_mkdir_str, self._name)
3042        task_name = task_name.replace(",)", ")").replace(",", ",  ")
3043        new_task = self.pipeline._create_task(
3044            task_func=job_wrapper_mkdir, task_name=task_name)
3045
3046        #   defer _add_parent so we can clone unless we are already
3047        #       calling add_parent (from _connect_parents())
3048        if defer:
3049            self.deferred_follow_params.append(
3050                [task_description, False, [new_task]])
3051
3052        #
3053        #   Prepare new node
3054        #
3055        new_task.syntax = syntax
3056        new_task._prepare_mkdir(unnamed_args, named_args, task_description)
3057
3058        #
3059        #   Hack:
3060        #       If the task name is too ugly,
3061        #       we can override it for flowchart printing using the
3062        #       display_name
3063        #
3064        # new_node.display_name = ??? new_node.func_description
3065        return new_task
3066
3067    def _prepare_mkdir(self, unnamed_args, named_args, task_description):
3068
3069        self.error_type = ruffus_exceptions.error_task_mkdir
3070        self._set_action_type(Task._action_mkdir)
3071        self.needs_update_func = self.needs_update_func or needs_update_check_directory_missing
3072        self.job_wrapper = job_wrapper_mkdir
3073        self.job_descriptor = mkdir_job_descriptor
3074
3075        # doesn't have a real function
3076        #  use job_wrapper just so it is not None
3077        self.user_defined_work_func = self.job_wrapper
3078
3079        #
3080        # @transform like behaviour with regex / suffix or formatter
3081        #
3082        if (len(unnamed_args) > 1 and
3083                isinstance(unnamed_args[1], (formatter, suffix, regex))) or "filter" in named_args:
3084            self.single_multi_io = self._many_to_many
3085            self._setup_task_func = Task._transform_setup
3086
3087            #
3088            #   Parse named and unnamed arguments
3089            #
3090            self.parsed_args = parse_task_arguments(unnamed_args, named_args,
3091                                                    ["input", "filter", "modify_inputs",
3092                                                     "output", "output_dir", "extras"], task_description)
3093
3094        #
3095        # simple behaviour: just make directories in list of strings
3096        #
3097        # the mkdir decorator accepts one string, multiple strings or a list of strings
3098        else:
3099
3100            #
3101            # override funct description normally parsed from func.__doc__
3102            #   "Make missing directories including any intermediate
3103            #   directories on the specified path(s)"
3104            #
3105            self.func_description = "Make missing directories %s" % (
3106                shorten_filenames_encoder(unnamed_args, 0))
3107
3108            self.single_multi_io = self._one_to_one
3109            self._setup_task_func = Task._do_nothing_setup
3110            self.has_input_param = False
3111
3112            #
3113            #
3114            #
3115            # if a single argument collection of parameters, keep that as is
3116            if len(unnamed_args) == 0:
3117                self.parsed_args["output"] = []
3118            elif len(unnamed_args) > 1:
3119                self.parsed_args["output"] = unnamed_args
3120            # len(unnamed_args) == 1: unpack unnamed_args[0]
3121            elif non_str_sequence(unnamed_args[0]):
3122                self.parsed_args["output"] = unnamed_args[0]
3123            # single string or other non collection types
3124            else:
3125                self.parsed_args["output"] = unnamed_args
3126
3127            #   all directories created in one job to reduce race conditions
3128            #    so we are converting [a,b,c] into [   [(a, b,c)]   ]
3129            #    where unnamed_args = (a,b,c)
3130            # i.e. one job whose solitory argument is a tuple/list of directory
3131            # names
3132            self.param_generator_func = args_param_factory(
3133                [[sorted(self.parsed_args["output"], key=lambda x: str(x))]])
3134
3135            # print ("mkdir %s" % (self.func_description), file = sys.stderr)
3136
3137    def _decorator_product(self, *unnamed_args, **named_args):
3138        """
3139        @product
3140        """
3141        self.syntax = "@product"
3142        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3143                                                                  self._get_decorated_function())
3144        self._prepare_product(unnamed_args, named_args)
3145
3146    def _prepare_product(self, unnamed_args, named_args):
3147        """
3148        Common code for @product and pipeline.product
3149        """
3150        self.error_type = ruffus_exceptions.error_task_product
3151        self._set_action_type(Task._action_task_product)
3152        self._setup_task_func = Task._product_setup
3153        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
3154        self.job_wrapper = job_wrapper_io_files
3155        self.job_descriptor = io_files_job_descriptor
3156        self.single_multi_io = self._many_to_many
3157
3158        #
3159        #   Parse named and unnamed arguments
3160        #
3161        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
3162                                                ["input", "filter", "inputN", "modify_inputs",
3163                                                 "output", "extras"],
3164                                                self.description_with_args_placeholder)
3165
3166    def _product_setup(self):
3167        """
3168        Finish setting up product
3169        """
3170        #
3171        # replace function / function names with tasks
3172        #
3173        list_input_files_task_globs = [self._handle_tasks_globs_in_inputs(ii,
3174                                                                          t_extra_inputs.KEEP_INPUTS)
3175                                       for ii in self.parsed_args["input"]]
3176        ancestral_tasks = set()
3177        for input_files_task_globs in list_input_files_task_globs:
3178            ancestral_tasks = ancestral_tasks.union(
3179                input_files_task_globs.tasks)
3180
3181        # how to transform input to output file name
3182        file_names_transform = t_nested_formatter_file_names_transform(self,
3183                                                                       self.parsed_args["filter"],
3184                                                                       self.error_type,
3185                                                                       self.syntax)
3186
3187        modify_inputs = self.parsed_args["modify_inputs"]
3188        if modify_inputs is not None:
3189            modify_inputs = self._handle_tasks_globs_in_inputs(
3190                modify_inputs, self.parsed_args["modify_inputs_mode"])
3191            ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks)
3192
3193        self.param_generator_func = product_param_factory(list_input_files_task_globs,
3194                                                          # False, #
3195                                                          # flatten input
3196                                                          # removed
3197                                                          file_names_transform,
3198                                                          modify_inputs,
3199                                                          self.parsed_args["modify_inputs_mode"],
3200                                                          self.parsed_args["output"],
3201                                                          *self.parsed_args["extras"])
3202
3203        return ancestral_tasks
3204
3205    def _decorator_permutations(self, *unnamed_args, **named_args):
3206        """
3207        @permutations
3208        """
3209        self.syntax = "@permutations"
3210        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3211                                                                  self._get_decorated_function())
3212        self._prepare_combinatorics(
3213            unnamed_args, named_args, ruffus_exceptions.error_task_permutations)
3214
3215    def _decorator_combinations(self, *unnamed_args, **named_args):
3216        """
3217        @combinations
3218        """
3219        self.syntax = "@combinations"
3220        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3221                                                                  self._get_decorated_function())
3222        self._prepare_combinatorics(
3223            unnamed_args, named_args, ruffus_exceptions.error_task_combinations)
3224
3225    def _decorator_combinations_with_replacement(self, *unnamed_args,
3226                                                 **named_args):
3227        """
3228        @combinations_with_replacement
3229        """
3230        self.syntax = "@combinations_with_replacement"
3231        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3232                                                                  self._get_decorated_function())
3233        self._prepare_combinatorics(unnamed_args, named_args,
3234                                    ruffus_exceptions.error_task_combinations_with_replacement)
3235
3236    def _prepare_combinatorics(self, unnamed_args, named_args, error_type):
3237        """
3238        Common code for
3239            @permutations and pipeline.permutations
3240            @combinations and pipeline.combinations
3241            @combinations_with_replacement and
3242                pipeline.combinations_with_replacement
3243        """
3244        self.error_type = error_type
3245        self._setup_task_func = Task._combinatorics_setup
3246        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
3247        self.job_wrapper = job_wrapper_io_files
3248        self.job_descriptor = io_files_job_descriptor
3249        self.single_multi_io = self._many_to_many
3250
3251        #
3252        #   Parse named and unnamed arguments
3253        #
3254        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
3255                                                ["input", "filter", "tuple_size",
3256                                                 "modify_inputs", "output", "extras"],
3257                                                self.description_with_args_placeholder)
3258
3259    def _combinatorics_setup(self):
3260        """
3261            Finish setting up combinatorics
3262        """
3263        #
3264        # replace function / function names with tasks
3265        #
3266        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
3267                                                                    t_extra_inputs.KEEP_INPUTS)
3268        ancestral_tasks = set(input_files_task_globs.tasks)
3269
3270        # how to transform input to output file name: len(k-tuples) of
3271        # (identical) formatters
3272        file_names_transform = t_nested_formatter_file_names_transform(
3273            self, [self.parsed_args["filter"]] *
3274            self.parsed_args["tuple_size"],
3275            self.error_type, self.syntax)
3276
3277        modify_inputs = self.parsed_args["modify_inputs"]
3278        if modify_inputs is not None:
3279            modify_inputs = self._handle_tasks_globs_in_inputs(
3280                modify_inputs, self.parsed_args["modify_inputs_mode"])
3281            ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks)
3282
3283        # we are not going to specify what type of combinatorics this is twice:
3284        #       just look up from our error type
3285        error_type_to_combinatorics_type = {
3286            ruffus_exceptions.error_task_combinations_with_replacement:
3287            t_combinatorics_type.COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT,
3288            ruffus_exceptions.error_task_combinations:
3289            t_combinatorics_type.COMBINATORICS_COMBINATIONS,
3290            ruffus_exceptions.error_task_permutations:
3291            t_combinatorics_type.COMBINATORICS_PERMUTATIONS
3292        }
3293
3294        self.param_generator_func = \
3295            combinatorics_param_factory(input_files_task_globs,
3296                                        # False, #
3297                                        # flatten
3298                                        # input
3299                                        # removed
3300                                        error_type_to_combinatorics_type[
3301                                            self.error_type],
3302                                        self.parsed_args["tuple_size"],
3303                                        file_names_transform,
3304                                        modify_inputs,
3305                                        self.parsed_args["modify_inputs_mode"],
3306                                        self.parsed_args["output"],
3307                                        *self.parsed_args["extras"])
3308
3309        return ancestral_tasks
3310
3311    def _decorator_files(self, *unnamed_args, **named_args):
3312        """
3313        @files
3314        """
3315        self.syntax = "@files"
3316        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3317                                                                  self._get_decorated_function())
3318        self._prepare_files(unnamed_args, named_args)
3319
3320    def _prepare_files(self, unnamed_args, named_args):
3321        """
3322        Common code for @files and pipeline.files
3323        """
3324        self.error_type = ruffus_exceptions.error_task_files
3325        self._setup_task_func = Task._do_nothing_setup
3326        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
3327        self.job_wrapper = job_wrapper_io_files
3328        self.job_descriptor = io_files_job_descriptor
3329
3330        if len(unnamed_args) == 0:
3331            raise ruffus_exceptions.error_task_files(self, "Too few arguments for @files")
3332
3333        #   Use parameters generated by a custom function
3334        if len(unnamed_args) == 1 and isinstance(unnamed_args[0],
3335                                                 Callable):
3336
3337            self._set_action_type(Task._action_task_files_func)
3338            self.param_generator_func = files_custom_generator_param_factory(
3339                unnamed_args[0])
3340
3341            # assume
3342            self.single_multi_io = self._many_to_many
3343
3344        #   Use parameters in supplied list
3345        else:
3346            self._set_action_type(Task._action_task_files)
3347
3348            if len(unnamed_args) > 1:
3349
3350                # single jobs
3351                # This is true even if the previous task has multiple output
3352                # These will all be joined together at the hip (like @merge)
3353                # If you want different behavior, use @transform
3354                params = copy.copy([unnamed_args])
3355                self._is_single_job_single_output = self._single_job_single_output
3356                self.single_multi_io = self._one_to_one
3357
3358            else:
3359
3360                # multiple jobs with input/output parameters etc.
3361                params = copy.copy(unnamed_args[0])
3362                self._is_single_job_single_output = self._multiple_jobs_outputs
3363                self.single_multi_io = self._many_to_many
3364
3365            check_files_io_parameters(self, params, ruffus_exceptions.error_task_files)
3366
3367            self.parsed_args["input"] = [pp[0] for pp in params]
3368            self.parsed_args["output"] = [tuple(pp[1:]) for pp in params]
3369            self._setup_task_func = Task._files_setup
3370
3371    def _files_setup(self):
3372        """
3373            Finish setting up @files
3374        """
3375        #
3376        # replace function / function names with tasks
3377        #
3378        input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"],
3379                                                                    t_extra_inputs.KEEP_INPUTS)
3380
3381        self.param_generator_func = files_param_factory(input_files_task_globs,
3382                                                        True,
3383                                                        self.parsed_args["output"])
3384        return set(input_files_task_globs.tasks)
3385
3386    def _decorator_parallel(self, *unnamed_args, **named_args):
3387        """
3388        @parallel
3389        """
3390        self.syntax = "@parallel"
3391        self._prepare_parallel(unnamed_args, named_args)
3392
3393    def _prepare_parallel(self, unnamed_args, named_args):
3394        """
3395        Common code for @parallel and pipeline.parallel
3396        """
3397        self.error_type = ruffus_exceptions.error_task_parallel
3398        self._set_action_type(Task._action_task_parallel)
3399        self._setup_task_func = Task._do_nothing_setup
3400        # self.needs_update_func = None
3401        self.job_wrapper = job_wrapper_generic
3402        self.job_descriptor = io_files_job_descriptor
3403
3404        if len(unnamed_args) == 0:
3405            raise ruffus_exceptions.error_task_parallel(self, "Too few arguments for @parallel")
3406
3407        #   Use parameters generated by a custom function
3408        if len(unnamed_args) == 1 and isinstance(unnamed_args[0],
3409                                                 Callable):
3410            self.param_generator_func = args_param_factory(unnamed_args[0]())
3411
3412        # list of  params
3413        else:
3414            if len(unnamed_args) > 1:
3415                # single jobs
3416                params = copy.copy([unnamed_args])
3417                self._is_single_job_single_output = self._single_job_single_output
3418            else:
3419                # multiple jobs with input/output parameters etc.
3420                params = copy.copy(unnamed_args[0])
3421                check_parallel_parameters(self, params, ruffus_exceptions.error_task_parallel)
3422
3423            self.param_generator_func = args_param_factory(params)
3424
3425    def _decorator_files_re(self, *unnamed_args, **named_args):
3426        """
3427        @files_re
3428
3429        calls user function in parallel
3430            with input_files, output_files, parameters
3431            These needed to be generated on the fly by
3432                getting all file names in the supplied list/glob pattern
3433            There are two variations:
3434
3435            1)    inputfiles = all files in glob which match the regular
3436                                expression
3437                  outputfile = generated from the replacement string
3438
3439            2)    inputfiles = all files in glob which match the regular
3440                                    expression and generated from the "from"
3441                                    replacement string
3442                  outputfiles = all files in glob which match the regular
3443                                    expression and generated from the "to"
3444                                    replacement string
3445        """
3446        self.syntax = "@files_re"
3447        self.error_type = ruffus_exceptions.error_task_files_re
3448        self._set_action_type(Task._action_task_files_re)
3449        self.needs_update_func = self.needs_update_func or needs_update_check_modify_time
3450        self.job_wrapper = job_wrapper_io_files
3451        self.job_descriptor = io_files_job_descriptor
3452        self.single_multi_io = self._many_to_many
3453
3454        if len(unnamed_args) < 3:
3455            raise self.error_type(self, "Too few arguments for @files_re")
3456
3457        # 888888888888888888888888888888888888888888888888888888888888888888888
3458
3459        #   !! HERE BE DRAGONS !!
3460
3461        #       Legacy, deprecated parameter handling depending on positions
3462        #           and not even on type
3463
3464        # check if parameters wrapped in combine
3465        combining_all_jobs, unnamed_args = is_file_re_combining(unnamed_args)
3466
3467        # second parameter is always regex()
3468        unnamed_args[1] = regex(unnamed_args[1])
3469
3470        # third parameter is inputs() if there is a four and fifth parameter...
3471        # That means if you want "extra" parameters, you always need inputs()
3472        if len(unnamed_args) > 3:
3473            unnamed_args[2] = inputs(unnamed_args[2])
3474
3475        # 888888888888888888888888888888888888888888888888888888888888888888888
3476
3477        self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax,
3478                                                                  self._get_decorated_function())
3479        self.parsed_args = parse_task_arguments(unnamed_args, named_args,
3480                                                ["input", "filter", "modify_inputs",
3481                                                 "output", "extras"],
3482                                                self.description_with_args_placeholder)
3483
3484        if combining_all_jobs:
3485            self._setup_task_func = Task._collate_setup
3486        else:
3487            self._setup_task_func = Task._transform_setup
3488
3489    # 8888888888888888888888888888888888888888888888888888888888888888888888888
3490
3491    #   Task functions
3492
3493    #       follows
3494    #       check_if_uptodate
3495    #       posttask
3496    #       jobs_limit
3497    #       active_if
3498    #       graphviz
3499
3500    # 8888888888888888888888888888888888888888888888888888888888888888888888888
3501    def follows(self, *unnamed_args, **named_args):
3502        """
3503        Specifies a preceding task / action which this task will follow.
3504        The preceding task can be specified as a string or function or Task
3505        object.
3506        A task can also follow the making of one or more directories:
3507
3508        task.follows(mkdir("my_dir"))
3509
3510        """
3511        description_with_args_placeholder = (
3512            self.description_with_args_placeholder % "...") + ".follows(%r)"
3513
3514        self.deferred_follow_params.append([description_with_args_placeholder, False,
3515                                            unnamed_args])
3516        # self._connect_parents(description_with_args_placeholder, False,
3517        #                 unnamed_args)
3518        return self
3519
3520    def _decorator_follows(self, *unnamed_args, **named_args):
3521        """
3522        unnamed_args can be string or function or Task
3523        For strings, if lookup fails, will defer.
3524        """
3525        description_with_args_placeholder = "@follows(%r)\n" + (
3526            self.description_with_args_placeholder % "...")
3527        self.deferred_follow_params.append([description_with_args_placeholder, False,
3528                                            unnamed_args])
3529        # self._connect_parents(description_with_args_placeholder, False, unnamed_args)
3530
3531    def _complete_setup(self):
3532        """
3533        Connect up parents if follows was specified and setups up task functions
3534        Returns a set of parent tasks
3535
3536        Note will tear down previous parental links before doing anything
3537        """
3538        # DEBUGGG
3539        # print("  task._complete_setup start %s" % (self._get_display_name(), ), file = sys.stderr)
3540        self._remove_all_parents()
3541        ancestral_tasks = self._deferred_connect_parents()
3542        ancestral_tasks |= self._setup_task_func(self)
3543        if "named_extras" in self.parsed_args:
3544            if self.command_str_callback == "PIPELINE":
3545                self.parsed_args["named_extras"]["__RUFFUS_TASK_CALLBACK__"] = self.pipeline.command_str_callback
3546            else:
3547                self.parsed_args["named_extras"]["__RUFFUS_TASK_CALLBACK__"] = self.command_str_callback
3548        # DEBUGGG
3549        # print("  task._complete_setup finish %s\n" % (self._get_display_name(), ), file = sys.stderr)
3550        return ancestral_tasks
3551
3552    def _deferred_connect_parents(self):
3553        """
3554        Called by _complete_task_setup() from pipeline_run, pipeline_printout etc.
3555        returns a non-redundant list of all the ancestral tasks
3556        """
3557        # DEBUGGG
3558        # print("   task._deferred_connect_parents start %s (%d to do)" % (self._get_display_name(),
3559        # len(self.deferred_follow_params)), file = sys.stderr)
3560        parent_tasks = set()
3561
3562        for ii, deferred_follow_params in enumerate(self.deferred_follow_params):
3563            # DEBUGGG
3564            # print("   task._deferred_connect_parents %s %d out of %d " % (self._get_display_name(),
3565            # ii, len(self.deferred_follow_params)), file = sys.stderr)
3566            new_tasks = self._connect_parents(*deferred_follow_params)
3567            # convert to mkdir and dynamically created tasks from follows into the actual created tasks
3568            # otherwise each time we redo this, we will have a sorceror's apprentice situation!
3569            deferred_follow_params[2] = new_tasks
3570            parent_tasks.update(new_tasks)
3571
3572        # DEBUGGG
3573        # print("   task._deferred_connect_parents finish %s" % self._get_display_name(), file = sys.stderr)
3574        return parent_tasks
3575
3576    #       Deferred tasks will need to be resolved later
3577    #       Because deferred tasks can belong to other pipelines
3578    def _connect_parents(self, description_with_args_placeholder, no_mkdir,
3579                         unnamed_args):
3580        """
3581        unnamed_args can be string or function or Task
3582        For strings, if lookup fails, will defer.
3583
3584        Called from
3585            * task.follows
3586            * @follows
3587            * decorators, e.g. @transform _handle_tasks_globs_in_inputs
3588              (input dependencies)
3589            * pipeline.transform etc. _handle_tasks_globs_in_inputs
3590              (input dependencies)
3591            * @split / pipeline.split _handle_tasks_globs_in_inputs
3592              (output dependencies)
3593        """
3594        # DEBUGGG
3595        #print("      _connect_parents start %s" % self._get_display_name(), file = sys.stderr)
3596        new_tasks = []
3597        for arg in unnamed_args:
3598            #
3599            #   Task
3600            #
3601            if isinstance(arg, Task):
3602                if arg == self:
3603                    raise ruffus_exceptions.error_decorator_args(
3604                        "Cannot have a task as its own (circular) dependency:\n"
3605                        % description_with_args_placeholder % (arg,))
3606
3607                #
3608                #   re-lookup from task name to handle cloning
3609                #
3610                if arg.pipeline.name == self.pipeline.original_name and \
3611                        self.pipeline.original_name != self.pipeline.name:
3612                    tasks = lookup_tasks_from_name(arg._name,
3613                                                   default_pipeline_name=self.pipeline.name,
3614                                                   default_module_name=self.func_module_name)
3615                    new_tasks.extend(tasks)
3616
3617                    if not tasks:
3618                        raise ruffus_exceptions.error_node_not_task(
3619                            "task '%s' '%s::%s' is somehow absent in the cloned pipeline (%s)!%s"
3620                            % (self.pipeline.original_name, arg._name, self.pipeline.name,
3621                               description_with_args_placeholder % (arg._name,)))
3622                else:
3623                    new_tasks.append(arg)
3624
3625            #
3626            #   Pipeline: defer
3627            #
3628            elif isinstance(arg, Pipeline):
3629                if arg == self.pipeline:
3630                    raise ruffus_exceptions.error_decorator_args(
3631                        "Cannot have your own pipeline as a (circular) "
3632                        "dependency of a Task:\n" +
3633                        description_with_args_placeholder % (arg,))
3634
3635                if not len(arg.get_tail_tasks()):
3636                    raise ruffus_exceptions.error_no_tail_tasks(
3637                        "Pipeline '{pipeline_name}' has no 'tail' tasks defined.\nWhich task "
3638                        "in '{pipeline_name}' are you referring to?"
3639                        .format(pipeline_name=arg.name))
3640                new_tasks.extend(arg.get_tail_tasks())
3641
3642            #   specified by string: unicode or otherwise
3643            elif isinstance(arg, path_str_type):
3644                # handle pipeline cloning
3645                task_name = arg.replace(self.pipeline.original_name + "::",
3646                                        self.pipeline.name + "::")
3647
3648                tasks = lookup_tasks_from_name(arg,
3649                                               default_pipeline_name=self.pipeline.name,
3650                                               default_module_name=self.func_module_name)
3651                new_tasks.extend(tasks)
3652
3653                if not tasks:
3654                    raise ruffus_exceptions.error_node_not_task("task '%s' is not a pipelined task in Ruffus. "
3655                                              "Have you mis-spelt the function or task name?\n%s"
3656                                              % (arg, description_with_args_placeholder % (arg,)))
3657
3658            #   for mkdir, automatically generate task with unique name
3659            elif isinstance(arg, mkdir):
3660                if no_mkdir:
3661                    raise ruffus_exceptions.error_decorator_args("Unexpected mkdir() found.\n" +
3662                                                                 description_with_args_placeholder % (arg,))
3663
3664                # syntax for new task doing the mkdir
3665                if self.created_via_decorator:
3666                    mkdir_task_syntax = "@follows(mkdir())"
3667                else:
3668                    mkdir_task_syntax = "Task(name=%r).follows(mkdir())" % self._get_display_name(
3669                    )
3670                mkdir_description_with_args_placeholder = \
3671                    description_with_args_placeholder % "mkdir(%s)"
3672                new_tasks.append(self._prepare_preceding_mkdir(arg.args, {}, mkdir_task_syntax,
3673                                                               mkdir_description_with_args_placeholder, False))
3674
3675            #   Is this a function?
3676            #       Turn this function into a task
3677            #           (add task as attribute of this function)
3678            #       Add self as dependent
3679            elif isinstance(arg, Callable):
3680                task = lookup_unique_task_from_func(
3681                    arg, default_pipeline_name=self.pipeline.name)
3682
3683                # add new task to pipeline if necessary
3684                if not task:
3685                    task = main_pipeline._create_task(task_func=arg)
3686                new_tasks.append(task)
3687
3688            else:
3689                raise ruffus_exceptions.error_decorator_args(
3690                    "Expecting a function or function name or task name or "
3691                    "Task or Pipeline.\n" +
3692                    description_with_args_placeholder % (arg,))
3693
3694        #   add dependency
3695        #       duplicate dependencies are ignore automatically
3696        #
3697        for task in new_tasks:
3698            self._add_parent(task)
3699
3700        # DEBUGGG
3701        # print("      _connect_parents finish %s" % self._get_display_name(), file = sys.stderr)
3702        return new_tasks
3703
3704    def check_if_uptodate(self, func):
3705        """
3706        Specifies how a task is to be checked if it needs to be rerun (i.e. is
3707        up-to-date).
3708        func returns true if input / output files are up to date
3709        func takes as many arguments as the task function
3710        """
3711        if not isinstance(func, Callable):
3712            description_with_args_placeholder = \
3713                (self.description_with_args_placeholder %
3714                 "...") + ".check_if_uptodate(%r)"
3715            raise ruffus_exceptions.error_decorator_args(
3716                "Expected a single function or Callable object in \n" +
3717                description_with_args_placeholder % (func,))
3718        self.needs_update_func = func
3719        return self
3720
3721    def _decorator_check_if_uptodate(self, *args):
3722        """
3723        @check_if_uptodate
3724        """
3725        if len(args) != 1 or not isinstance(args[0], Callable):
3726            description_with_args_placeholder = "@check_if_uptodate(%r)\n" + (
3727                                                self.description_with_args_placeholder % "...")
3728            raise ruffus_exceptions.error_decorator_args(
3729                "Expected a single function or Callable object in \n" +
3730                description_with_args_placeholder % (args,))
3731        self.needs_update_func = args[0]
3732
3733    def posttask(self, *funcs):
3734        """
3735        Takes one or more functions which will be called if the task completes
3736        """
3737        description_with_args_placeholder = ("Expecting simple functions or touch_file() in \n" +
3738                                             (self.description_with_args_placeholder % "...") +
3739                                             ".posttask(%r)")
3740        self._set_posttask(description_with_args_placeholder, *funcs)
3741        return self
3742
3743    def _decorator_posttask(self, *funcs):
3744        """
3745        @posttask
3746        """
3747        description_with_args_placeholder = ("Expecting simple functions or touch_file() in \n" +
3748                                             "@posttask(%r)\n" +
3749                                             (self.description_with_args_placeholder % "..."))
3750        self._set_posttask(description_with_args_placeholder, *funcs)
3751
3752    def _set_posttask(self, description_with_args_placeholder, *funcs):
3753        """
3754        Takes one or more functions which will be called if the task completes
3755        """
3756        for arg in funcs:
3757            if isinstance(arg, touch_file):
3758                self.posttask_functions.append(
3759                    touch_file_factory(arg.args, register_cleanup))
3760            elif isinstance(arg, Callable):
3761                self.posttask_functions.append(arg)
3762            else:
3763                raise ruffus_exceptions.PostTaskArgumentError(
3764                    description_with_args_placeholder % (arg,))
3765
3766    def jobs_limit(self, maximum_jobs_in_parallel, limit_name=None):
3767        """
3768        Limit the number of concurrent jobs
3769        """
3770        description_with_args_placeholder = ((self.description_with_args_placeholder % "...") +
3771                                             ".jobs_limit(%r%s)")
3772        self._set_jobs_limit(description_with_args_placeholder,
3773                             maximum_jobs_in_parallel, limit_name)
3774        return self
3775
3776    def _decorator_jobs_limit(self, maximum_jobs_in_parallel, limit_name=None):
3777        """
3778        @jobs_limit
3779        """
3780        description_with_args_placeholder = ("@jobs_limit(%r%s)\n" +
3781                                             (self.description_with_args_placeholder % "..."))
3782        self._set_jobs_limit(description_with_args_placeholder,
3783                             maximum_jobs_in_parallel, limit_name)
3784
3785    def _set_jobs_limit(self, description_with_args_placeholder,
3786                        maximum_jobs_in_parallel, limit_name=None):
3787        try:
3788            maximum_jobs_in_parallel = int(maximum_jobs_in_parallel)
3789            assert(maximum_jobs_in_parallel >= 1)
3790        except:
3791            limit_name = ", " + limit_name if limit_name else ""
3792            raise ruffus_exceptions.JobsLimitArgumentError(
3793                "Expecting a positive integer > 1 in \n" +
3794                description_with_args_placeholder % (maximum_jobs_in_parallel, limit_name))
3795
3796        # set semaphore name to other than the "pipeline.name:task name"
3797        if limit_name is not None:
3798            self.semaphore_name = limit_name
3799        if self.semaphore_name in self._job_limit_semaphores:
3800            prev_maximum_jobs = self._job_limit_semaphores[self.semaphore_name]
3801            if prev_maximum_jobs != maximum_jobs_in_parallel:
3802                limit_name = ", " + limit_name if limit_name else ""
3803                raise ruffus_exceptions.JobsLimitArgumentError(
3804                    'The job limit %r cannot re-defined from the former '
3805                    'limit of %d in \n'
3806                    % (self.semaphore_name, prev_maximum_jobs) +
3807                    description_with_args_placeholder
3808                    % (maximum_jobs_in_parallel, limit_name))
3809        else:
3810            #
3811            #   save semaphore and limit
3812            #
3813            self._job_limit_semaphores[
3814                self.semaphore_name] = maximum_jobs_in_parallel
3815
3816    def active_if(self, *active_if_checks):
3817        """
3818        If any of active_checks is False or returns False, then the task is
3819        marked as "inactive" and its outputs removed.
3820        """
3821        # print 'job is active:', active_checks, [
3822        #             arg() if isinstance(arg, Callable) else arg
3823        #             for arg in active_checks]
3824        if self.active_if_checks is None:
3825            self.active_if_checks = []
3826        self.active_if_checks.extend(active_if_checks)
3827        # print(self.active_if_checks)
3828        return self
3829
3830    def _decorator_active_if(self, *active_if_checks):
3831        """
3832        @active_if
3833        """
3834        self.active_if(*active_if_checks)
3835
3836    def graphviz(self, *unnamed_args, **named_args):
3837        """
3838        Sets graphviz (e.g. `dot`) attributes used to draw this Task
3839        """
3840        self.graphviz_attributes = named_args
3841        if len(unnamed_args):
3842            raise TypeError("Only named arguments expected in :" +
3843                            self.description_with_args_placeholder % "..." +
3844                            ".graphviz(%r)\n" % unnamed_args)
3845        return self
3846
3847    def _decorator_graphviz(self, *unnamed_args, **named_args):
3848        self.graphviz_attributes = named_args
3849        if len(unnamed_args):
3850            raise TypeError("Only named arguments expected in :" +
3851                            "@graphviz(%r)\n" % unnamed_args +
3852                            self.description_with_args_placeholder % "...")
3853
3854
3855class task_encoder(json.JSONEncoder):
3856
3857    def default(self, obj):
3858        if isinstance(obj, set):
3859            return list(obj)
3860        if isinstance(obj, defaultdict):
3861            return dict(obj)
3862        if isinstance(obj, Task):
3863            # , Task._action_names[obj._action_task], obj.func_description]
3864            return obj._name
3865        return json.JSONEncoder.default(self, obj)
3866
3867
3868def is_node_up_to_date(node, extra_data):
3869    """
3870    Forwards tree depth first search "signalling" mechanism to
3871        node _is_up_to_date method
3872    Depth first search stops when node._is_up_to_date return True
3873    """
3874    return node._is_up_to_date(extra_data)
3875
3876
3877def update_checksum_level_on_tasks(checksum_level):
3878    """Reset the checksum level for all tasks"""
3879    for n in node._all_nodes:
3880        n.checksum_level = checksum_level
3881
3882
3883def update_active_states_for_all_tasks():
3884    """
3885
3886    @active_if decorated tasks can change their active state every time
3887      pipeline_run / pipeline_printout / pipeline_printout_graph is called
3888
3889    update_active_states_for_all_tasks ()
3890
3891    """
3892    for n in node._all_nodes:
3893        n._update_active_state()
3894
3895
3896def lookup_pipeline(pipeline):
3897    """
3898    If pipeline is
3899        None                : main_pipeline
3900        string              : lookup name in pipelines
3901    """
3902    if pipeline is None:
3903        return main_pipeline
3904
3905    # Pipeline object pass through unchanged
3906    if isinstance(pipeline, Pipeline):
3907        return pipeline
3908
3909    # strings: lookup from name
3910    if isinstance(pipeline, str) and pipeline in Pipeline.pipelines:
3911        return Pipeline.pipelines[pipeline]
3912
3913    raise ruffus_exceptions.error_not_a_pipeline("%s does not name a pipeline." % pipeline)
3914
3915
3916def _pipeline_prepare_to_run(checksum_level, history_file, pipeline, runtime_data, target_tasks, forcedtorun_tasks):
3917    """
3918    Common function to setup pipeline, check parameters
3919        before pipeline_run, pipeline_printout, pipeline_printout_graph
3920    """
3921
3922    if checksum_level is None:
3923        checksum_level = get_default_checksum_level()
3924
3925    update_checksum_level_on_tasks(checksum_level)
3926
3927    #
3928    #   If we aren't using checksums, and history file hasn't been specified,
3929    #       we might be a bit surprised to find Ruffus writing to a
3930    #       sqlite db anyway.
3931    #   Let us just dump to a placeholder memory db that can then be discarded
3932    #   Of course, if history_file is specified, we presume you know what
3933    #       you are doing
3934    #
3935    if checksum_level == CHECKSUM_FILE_TIMESTAMPS and history_file is None:
3936        history_file = ':memory:'
3937    #
3938    # load previous job history if it exists, otherwise create an empty history
3939    #
3940    job_history = open_job_history(history_file)
3941
3942    #
3943    # @active_if decorated tasks can change their active state every time
3944    #   pipeline_run / pipeline_printout / pipeline_printout_graph is called
3945    #
3946    update_active_states_for_all_tasks()
3947
3948    #
3949    #   run time data
3950    #
3951    if runtime_data is None:
3952        runtime_data = {}
3953    if not isinstance(runtime_data, dict):
3954        raise Exception("Parameter runtime_data should be a "
3955                        "dictionary of values passes to jobs at run time.")
3956
3957    #
3958    #   This is the default namespace for looking for tasks
3959    #
3960    #   pipeline must be a Pipeline or a string naming a pipeline
3961    #
3962    #   Keep pipeline
3963    #
3964    if pipeline is not None:
3965        pipeline = lookup_pipeline(pipeline)
3966        default_pipeline_name = pipeline.name
3967    else:
3968        default_pipeline_name = "main"
3969
3970    #
3971    #   Lookup target jobs
3972    #
3973    if target_tasks is None:
3974        target_tasks = []
3975    if forcedtorun_tasks is None:
3976        forcedtorun_tasks = []
3977    # lookup names, prioritise the specified pipeline or "main"
3978    target_tasks = lookup_tasks_from_user_specified_names(
3979        "Target", target_tasks, default_pipeline_name, "__main__", True)
3980    forcedtorun_tasks = lookup_tasks_from_user_specified_names("Forced to run", forcedtorun_tasks,
3981                                                               default_pipeline_name, "__main__", True)
3982
3983    #
3984    #   Empty target, either run the specified tasks from the pipeline
3985    #   or will run every single task under the sun
3986    #
3987    if not target_tasks:
3988        if pipeline:
3989            target_tasks.extend(list(pipeline.tasks))
3990        if not target_tasks:
3991            for pipeline_name in Pipeline.pipelines.keys():
3992                target_tasks.extend(
3993                    list(Pipeline.pipelines[pipeline_name].tasks))
3994
3995    # make sure pipeline is defined
3996    pipeline = lookup_pipeline(pipeline)
3997
3998    # Unique task list
3999    target_tasks = list(set(target_tasks))
4000
4001    #
4002    #   Make sure all tasks in dependency list from (forcedtorun_tasks and target_tasks)
4003    #       are setup and linked to real functions
4004    #
4005    processed_tasks = set()
4006    completed_pipeline_names = set()
4007    incomplete_pipeline_names = set()
4008
4009    # get list of all involved pipelines
4010    for task in forcedtorun_tasks + target_tasks:
4011        if task.pipeline.name not in completed_pipeline_names:
4012            incomplete_pipeline_names.add(task.pipeline.name)
4013
4014    # set up each pipeline.
4015    # These will in turn lookup up their antecedents (even in another pipeline) and
4016    #   set them up as well.
4017    for pipeline_name in incomplete_pipeline_names:
4018        if pipeline_name in completed_pipeline_names:
4019            continue
4020        completed_pipeline_names = completed_pipeline_names.union(
4021            pipeline.pipelines[pipeline_name]._complete_task_setup(processed_tasks))
4022
4023    return checksum_level, job_history, pipeline, runtime_data, target_tasks, forcedtorun_tasks
4024
4025
4026def pipeline_printout_graph(stream,
4027                            output_format=None,
4028                            target_tasks=[],
4029                            forcedtorun_tasks=[],
4030                            draw_vertically=True,
4031                            ignore_upstream_of_target=False,
4032                            skip_uptodate_tasks=False,
4033                            gnu_make_maximal_rebuild_mode=True,
4034                            test_all_task_for_update=True,
4035                            no_key_legend=False,
4036                            minimal_key_legend=True,
4037                            user_colour_scheme=None,
4038                            pipeline_name="Pipeline:",
4039                            size=(11, 8),
4040                            dpi=120,
4041                            runtime_data=None,
4042                            checksum_level=None,
4043                            history_file=None,
4044                            pipeline=None):
4045    # Remember to add further extra parameters here to
4046    #   "extra_pipeline_printout_graph_options" inside cmdline.py
4047    # This will forward extra parameters from the
4048    # command line to pipeline_printout_graph
4049    """
4050    print out pipeline dependencies in various formats
4051
4052    :param stream: where to print to
4053    :type stream: file-like object with ``write()`` function
4054    :param output_format: ["dot", "jpg", "svg", "ps", "png"]. All but the
4055                          first depends on the
4056                          `dot <http://www.graphviz.org>`_ program.
4057    :param target_tasks: targets task functions which will be run if they are
4058                         out-of-date.
4059    :param forcedtorun_tasks: task functions which will be run whether or not
4060                              they are out-of-date.
4061    :param draw_vertically: Top to bottom instead of left to right.
4062    :param ignore_upstream_of_target: Don't draw upstream tasks of targets.
4063    :param skip_uptodate_tasks: Don't draw up-to-date tasks if possible.
4064    :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all*
4065                                          out-of-date tasks. Runs minimal
4066                                          set to build targets if set to
4067                                          ``True``. Use with caution.
4068    :param test_all_task_for_update: Ask all task functions if they are
4069                                     up-to-date.
4070    :param no_key_legend: Don't draw key/legend for graph.
4071    :param minimal_key_legend: Only legend entries for used task types
4072    :param user_colour_scheme: Dictionary specifying flowchart colour scheme
4073    :param pipeline_name: Pipeline Title
4074    :param size: tuple of x and y dimensions
4075    :param dpi: print resolution
4076    :param runtime_data: Experimental feature: pass data to tasks at run time
4077    :param history_file: Database file storing checksums and file timestamps
4078                         for input/output files.
4079    :param checksum_level: Several options for checking up-to-dateness are
4080                           available: Default is level 1.
4081                           level 0 : Use only file timestamps
4082                           level 1 : above, plus timestamp of successful job completion
4083                           level 2 : above, plus a checksum of the pipeline function body
4084                           level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators
4085    """
4086
4087    # EXTRA pipeline_run DEBUGGING
4088    global EXTRA_PIPELINERUN_DEBUGGING
4089    EXTRA_PIPELINERUN_DEBUGGING = False
4090
4091    (checksum_level,
4092     job_history,
4093     pipeline,
4094     runtime_data,
4095     target_tasks,
4096     forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level, history_file,
4097                                                   pipeline, runtime_data,
4098                                                   target_tasks, forcedtorun_tasks)
4099
4100    (topological_sorted, ignore_param1, ignore_param2, ignore_param3) = \
4101        topologically_sorted_nodes(target_tasks, forcedtorun_tasks,
4102                                   gnu_make_maximal_rebuild_mode,
4103                                   extra_data_for_signal=[
4104                                       t_verbose_logger(0, 0, None, runtime_data), job_history],
4105                                   signal_callback=is_node_up_to_date)
4106    if not len(target_tasks):
4107        target_tasks = topological_sorted[-1:]
4108
4109    # open file if (unicode?) string
4110    close_stream = False
4111    if isinstance(stream, path_str_type):
4112        stream = open(stream, "wb")
4113        close_stream = True
4114
4115    # derive format automatically from name
4116    if output_format is None:
4117        output_format = os.path.splitext(stream.name)[1].lstrip(".")
4118
4119    try:
4120        graph_printout(stream,
4121                       output_format,
4122                       target_tasks,
4123                       forcedtorun_tasks,
4124                       draw_vertically,
4125                       ignore_upstream_of_target,
4126                       skip_uptodate_tasks,
4127                       gnu_make_maximal_rebuild_mode,
4128                       test_all_task_for_update,
4129                       no_key_legend,
4130                       minimal_key_legend,
4131                       user_colour_scheme,
4132                       pipeline_name,
4133                       size,
4134                       dpi,
4135                       extra_data_for_signal=[t_verbose_logger(
4136                           0, 0, None, runtime_data), job_history],
4137                       signal_callback=is_node_up_to_date)
4138    finally:
4139        # if this is a stream we opened, we have to close it ourselves
4140        if close_stream:
4141            stream.close()
4142
4143
4144def get_completed_task_strings(incomplete_tasks, all_tasks, forcedtorun_tasks, verbose,
4145                               verbose_abbreviated_path, indent, runtime_data, job_history):
4146    """
4147    Printout list of completed tasks
4148    """
4149    completed_task_strings = []
4150    if len(all_tasks) > len(incomplete_tasks):
4151        completed_task_strings.append("")
4152        completed_task_strings.append("_" * 40)
4153        completed_task_strings.append("Tasks which are up-to-date:")
4154        completed_task_strings.append("")
4155        completed_task_strings.append("")
4156        set_of_incomplete_tasks = set(incomplete_tasks)
4157
4158        for t in all_tasks:
4159            # Only print Up to date tasks
4160            if t in set_of_incomplete_tasks:
4161                continue
4162            # LOGGER
4163            completed_task_strings.extend(t._printout(runtime_data,
4164                                                      t in forcedtorun_tasks, job_history, False,
4165                                                      verbose, verbose_abbreviated_path, indent))
4166
4167        completed_task_strings.append("_" * 40)
4168        completed_task_strings.append("")
4169        completed_task_strings.append("")
4170
4171    return completed_task_strings
4172
4173
4174def pipeline_printout(output_stream=None,
4175                      target_tasks=[],
4176                      forcedtorun_tasks=[],
4177                      # verbose defaults to 4 if None
4178                      verbose=None,
4179                      indent=4,
4180                      gnu_make_maximal_rebuild_mode=True,
4181                      wrap_width=100,
4182                      runtime_data=None,
4183                      checksum_level=None,
4184                      history_file=None,
4185                      verbose_abbreviated_path=None,
4186                      pipeline=None):
4187    # Remember to add further extra parameters here to
4188    #   "extra_pipeline_printout_options" inside cmdline.py
4189    # This will forward extra parameters from the command
4190    # line to pipeline_printout
4191    """
4192    Printouts the parts of the pipeline which will be run
4193
4194    Because the parameters of some jobs depend on the results of previous
4195    tasks, this function produces only the current snap-shot of task jobs.
4196    In particular, tasks which generate variable number of inputs into
4197    following tasks will not produce the full range of jobs.
4198
4199    ::
4200        verbose = 0 : Nothing
4201        verbose = 1 : All Tasks names
4202        verbose = 2 : All Tasks (including any task function docstrings)
4203        verbose = 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation
4204        verbose = 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings
4205        verbose = 5 : All Jobs in Out-of-date Tasks,  (include only list of up-to-date tasks)
4206        verbose = 6 : All jobs in All Tasks whether out of date or not
4207
4208    :param output_stream: where to print to
4209    :type output_stream: file-like object with ``write()`` function
4210    :param target_tasks: targets task functions which will be run if they are
4211                         out-of-date
4212    :param forcedtorun_tasks: task functions which will be run whether or not
4213                              they are out-of-date
4214    :param verbose: level 0 : nothing
4215                    level 1 : Out-of-date Task names
4216                    level 2 : All Tasks (including any task function docstrings)
4217                    level 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation
4218                    level 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings
4219                    level 5 : All Jobs in Out-of-date Tasks,  (include only list of up-to-date tasks)
4220                    level 6 : All jobs in All Tasks whether out of date or not
4221                    level 7 : Show file modification times for All jobs in All Tasks
4222                    level 10: logs messages useful only for debugging ruffus pipeline code
4223    :param indent: How much indentation for pretty format.
4224    :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all*
4225                                          out-of-date tasks. Runs minimal
4226                                          set to build targets if set to
4227                                          ``True``. Use with caution.
4228    :param wrap_width: The maximum length of each line
4229    :param runtime_data: Experimental feature: pass data to tasks at run time
4230    :param checksum_level: Several options for checking up-to-dateness are
4231                           available: Default is level 1.
4232                           level 0 : Use only file timestamps
4233                           level 1 : above, plus timestamp of successful job completion
4234                           level 2 : above, plus a checksum of the pipeline function body
4235                           level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators
4236    :param history_file: Database file storing checksums and file timestamps for input/output files.
4237    :param verbose_abbreviated_path: whether input and output paths are abbreviated.
4238        level 0: The full (expanded, abspath) input or output path
4239        level > 1: The number of subdirectories to include. Abbreviated paths are prefixed with ``[,,,]/``
4240        level < 0: Input / Output parameters are truncated to ``MMM`` letters where ``verbose_abbreviated_path ==-MMM``. Subdirectories are first removed to see if this allows the paths to fit in the specified limit. Otherwise abbreviated paths are prefixed by ``<???>``
4241    """
4242    # do nothing!
4243    if verbose == 0:
4244        return
4245
4246    #
4247    # default values
4248    #
4249    if verbose_abbreviated_path is None:
4250        verbose_abbreviated_path = 2
4251    if verbose is None:
4252        verbose = 4
4253
4254    # EXTRA pipeline_run DEBUGGING
4255    global EXTRA_PIPELINERUN_DEBUGGING
4256    EXTRA_PIPELINERUN_DEBUGGING = False
4257
4258    if output_stream is None:
4259        output_stream = sys.stdout
4260
4261    if not hasattr(output_stream, "write"):
4262        raise Exception("The first parameter to pipeline_printout needs to be "
4263                        "an output file, e.g. sys.stdout and not %s"
4264                        % str(output_stream))
4265
4266    logging_strm = t_verbose_logger(verbose, verbose_abbreviated_path,
4267                                    t_stream_logger(output_stream), runtime_data)
4268
4269    (checksum_level,
4270     job_history,
4271     pipeline,
4272     runtime_data,
4273     target_tasks,
4274     forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level, history_file,
4275                                                   pipeline, runtime_data,
4276                                                   target_tasks, forcedtorun_tasks)
4277
4278    (incomplete_tasks,
4279     self_terminated_nodes,
4280     dag_violating_edges,
4281     dag_violating_nodes) = \
4282        topologically_sorted_nodes(target_tasks, forcedtorun_tasks,
4283                                   gnu_make_maximal_rebuild_mode,
4284                                   extra_data_for_signal=[
4285                                       t_verbose_logger(0, 0, None, runtime_data), job_history],
4286                                   signal_callback=is_node_up_to_date)
4287
4288    #
4289    #   raise error if DAG violating nodes
4290    #
4291    if len(dag_violating_nodes):
4292        dag_violating_tasks = ", ".join(t._name for t in dag_violating_nodes)
4293
4294        e = ruffus_exceptions.error_circular_dependencies("Circular dependencies found in the pipeline involving "
4295                                        "one or more of (%s)" % (dag_violating_tasks,))
4296        raise e
4297
4298    wrap_indent = " " * (indent + 11)
4299
4300    #
4301    #   Get updated nodes as all_nodes - nodes_to_run
4302    #
4303    #   LOGGER level 6 : All jobs in All Tasks whether out of date or not
4304    if verbose in [1, 2] or verbose >= 5:
4305        (all_tasks, ignore_param1, ignore_param2, ignore_param3) = \
4306            topologically_sorted_nodes(target_tasks, True, gnu_make_maximal_rebuild_mode,
4307                                       extra_data_for_signal=[
4308                                           t_verbose_logger(
4309                                               0, 0, None, runtime_data),
4310                                           job_history],
4311                                       signal_callback=is_node_up_to_date)
4312        for m in get_completed_task_strings(incomplete_tasks, all_tasks, forcedtorun_tasks,
4313                                            verbose, verbose_abbreviated_path, indent,
4314                                            runtime_data, job_history):
4315            output_stream.write(textwrap.fill(m, subsequent_indent=wrap_indent,
4316                                              width=wrap_width) + "\n")
4317
4318    output_stream.write("\n" + "_" * 40 + "\nTasks which will be run:\n\n")
4319    for t in incomplete_tasks:
4320        # LOGGER
4321        messages = t._printout(runtime_data, t in forcedtorun_tasks,
4322                               job_history, True, verbose,
4323                               verbose_abbreviated_path, indent)
4324        for m in messages:
4325            output_stream.write(textwrap.fill(m, subsequent_indent=wrap_indent,
4326                                              width=wrap_width) + "\n")
4327
4328    if verbose:
4329        # LOGGER
4330        output_stream.write("_" * 40 + "\n")
4331
4332
4333def get_semaphore(t, _job_limit_semaphores, syncmanager):
4334    """
4335    return semaphore to limit the number of concurrent jobs
4336    """
4337    #
4338    #   Is this task limited in the number of jobs?
4339    #
4340    if t.semaphore_name not in t._job_limit_semaphores:
4341        return None
4342
4343    #
4344    #   create semaphore if not yet created
4345    #
4346    if t.semaphore_name not in _job_limit_semaphores:
4347        maximum_jobs_num = t._job_limit_semaphores[t.semaphore_name]
4348        _job_limit_semaphores[t.semaphore_name] = syncmanager.BoundedSemaphore(
4349            maximum_jobs_num)
4350    return _job_limit_semaphores[t.semaphore_name]
4351
4352
4353
4354def job_needs_to_run(task, params, force_rerun, logger, verbose, job_name,
4355                     job_history, verbose_abbreviated_path):
4356    """
4357    Check if job parameters out of date / needs to rerun
4358    Also logs why things are up to date or not
4359
4360    TODO Is this a duplicate of logic in is_up_to_date??
4361    TODO Is this a duplicate of logic in _printout??
4362    TODO Ignores is_active
4363    """
4364
4365    # Out of date because forced to run
4366    if force_rerun:
4367        # LOGGER: Out-of-date Jobs in Out-of-date Tasks
4368        log_at_level(logger, 3, verbose, "    force task %s to rerun "
4369                     % job_name)
4370        return True
4371
4372    if task.needs_update_func is None:
4373        # LOGGER: Out-of-date Jobs in Out-of-date Tasks
4374        log_at_level(logger, 3, verbose, "    %s no function to check "
4375                     "if up-to-date " % job_name)
4376        return True
4377
4378    # extra clunky hack to also pass task info--
4379    # makes sure that there haven't been code or
4380    # arg changes
4381    if task.needs_update_func == needs_update_check_modify_time:
4382        needs_update, msg = task.needs_update_func(
4383            *params, task=task, job_history=job_history,
4384            verbose_abbreviated_path=verbose_abbreviated_path,
4385            return_file_dates_when_uptodate=verbose > 6)
4386    else:
4387        needs_update, msg = task.needs_update_func(*params)
4388
4389    if not needs_update:
4390        # LOGGER: All Jobs in Out-of-date Tasks
4391        log_at_level(logger, 5, verbose,
4392                     "    %s unnecessary: already %s" % (job_name, msg))
4393        return False
4394
4395
4396    # LOGGER: Out-of-date Jobs in Out-of-date
4397    # Tasks: Why out of date
4398    if not log_at_level(logger, 4, verbose, "    %s %s " % (job_name, msg)):
4399        # LOGGER: Out-of-date Jobs in
4400        # Out-of-date Tasks: No explanation
4401        log_at_level(logger, 3, verbose, "    %s" % (job_name))
4402
4403    #
4404    #   Clunky hack to make sure input files exists right
4405    #       before job is called for better error messages
4406    #
4407    if task.needs_update_func == needs_update_check_modify_time:
4408        check_input_files_exist(*params)
4409
4410    return True
4411
4412
4413def remove_completed_tasks(task_with_completed_job_q, incomplete_tasks,
4414                           count_remaining_jobs, logger, verbose):
4415    """
4416    Remove completed tasks in same thread as job parameters generation to
4417        prevent race conditions
4418    Task completion is usually signalled from pipeline_run
4419    """
4420    while True:
4421        try:
4422            (job_completed_task,
4423             job_completed_task_name,
4424             job_completed_node_index,
4425             job_completed_name) = task_with_completed_job_q.get_nowait()
4426
4427            if job_completed_task not in incomplete_tasks:
4428                raise Exception("Last job %s for %s. Missing from "
4429                                "incomplete tasks in make_job_parameter_generator"
4430                                % (job_completed_name, job_completed_task_name))
4431            count_remaining_jobs[job_completed_task] -= 1
4432            #
4433            #   Negative job count : something has gone very wrong
4434            #
4435            if count_remaining_jobs[job_completed_task] < 0:
4436                raise Exception("job %s for %s causes job count < 0."
4437                                % (job_completed_name,
4438                                   job_completed_task_name))
4439
4440            #
4441            #   This Task completed
4442            #
4443            if count_remaining_jobs[job_completed_task] == 0:
4444                log_at_level(logger, 10, verbose, "   Last job for %r. "
4445                             "Retired from incomplete tasks in pipeline_run "
4446                             % job_completed_task._get_display_name())
4447                incomplete_tasks.remove(job_completed_task)
4448                job_completed_task._completed()
4449                log_at_level(logger, 1, verbose, "Completed Task = %r "
4450                             % job_completed_task._get_display_name())
4451
4452        except queue.Empty:
4453            break
4454
4455
4456def make_job_parameter_generator(incomplete_tasks, task_parents, logger,
4457                                 forcedtorun_tasks, task_with_completed_job_q,
4458                                 runtime_data, verbose,
4459                                 verbose_abbreviated_path,
4460                                 syncmanager,
4461                                 death_event,
4462                                 touch_files_only, job_history):
4463    """
4464    Parameter generator factory for all jobs / tasks
4465    """
4466
4467    inprogress_tasks = set()
4468    _job_limit_semaphores = dict()
4469
4470    # _________________________________________________________________________
4471    #
4472    #   Parameter generator returned by factory
4473    #
4474    # _________________________________________________________________________
4475    def parameter_generator():
4476        count_remaining_jobs = defaultdict(int)
4477        log_at_level(logger, 10, verbose, "   job_parameter_generator BEGIN")
4478        while len(incomplete_tasks):
4479            cnt_jobs_created_for_all_tasks = 0
4480            cnt_tasks_processed = 0
4481
4482            #
4483            #   get rid of all completed tasks first
4484            #       Completion is signalled from pipeline_run
4485            #
4486            remove_completed_tasks(task_with_completed_job_q, incomplete_tasks,
4487                                   count_remaining_jobs, logger, verbose)
4488
4489            for t in list(incomplete_tasks):
4490                #
4491                #   wrap in execption handler so that we know
4492                #       which task the original exception came from
4493                #
4494                try:
4495                    log_at_level(logger, 10, verbose, "   job_parameter_generator consider "
4496                                 "task = %r" % t._get_display_name())
4497
4498                    # ignore tasks in progress
4499                    if t in inprogress_tasks:
4500                        continue
4501                    log_at_level(logger, 10, verbose, "   job_parameter_generator task %r not in "
4502                                 "progress" % t._get_display_name())
4503
4504                    # ignore tasks with incomplete dependencies
4505                    incomplete_parent = False
4506                    for parent in task_parents[t]:
4507                        if parent in incomplete_tasks:
4508                            incomplete_parent = True
4509                            break
4510                    if incomplete_parent:
4511                        continue
4512
4513                    log_at_level(logger, 10, verbose, "   job_parameter_generator start task %r "
4514                                 "(parents completed)" % t._get_display_name())
4515                    force_rerun = t in forcedtorun_tasks
4516                    inprogress_tasks.add(t)
4517                    cnt_tasks_processed += 1
4518
4519                    #
4520                    # Log active task
4521                    #
4522                    if t.is_active:
4523                        forced_msg = ": Forced to rerun" if force_rerun else ""
4524                        log_at_level(logger, 1, verbose, "Task enters queue = %r %s"
4525                                     % (t._get_display_name(), forced_msg))
4526                        if len(t.func_description):
4527                            log_at_level(logger, 2, verbose,
4528                                         "    " + t.func_description)
4529                    #
4530                    #   Inactive skip loop
4531                    #
4532                    else:
4533                        incomplete_tasks.remove(t)
4534                        # N.B. inactive tasks are not _completed()
4535                        # t._completed()
4536                        t.output_filenames = None
4537                        log_at_level(logger, 2, verbose, "Inactive Task = %r"
4538                                     % t._get_display_name())
4539                        continue
4540
4541                    # use output parameters generated by running task
4542                    t.output_filenames = []
4543
4544                    # If no parameters: just call task function (empty list)
4545                    if t.param_generator_func is None:
4546                        task_parameters = ([[], []],)
4547                    else:
4548                        task_parameters = t.param_generator_func(runtime_data)
4549
4550                    #
4551                    #   iterate through jobs
4552                    #
4553                    cnt_jobs_created = 0
4554                    for params, unglobbed_params in task_parameters:
4555
4556                        #
4557                        #   save output even if uptodate
4558                        #
4559                        if len(params) >= 2:
4560                            # To do: In the case of split subdivide, we should be doing this after
4561                            #       The job finishes
4562                            t.output_filenames.append(params[1])
4563
4564                        job_name = t._get_job_name(unglobbed_params,
4565                                                   verbose_abbreviated_path,
4566                                                   runtime_data)
4567
4568                        if not job_needs_to_run(t, params, force_rerun, logger, verbose, job_name,
4569                                                job_history, verbose_abbreviated_path):
4570                            continue
4571
4572                        # pause for one second before first job of each tasks
4573                        # @originate tasks do not need to pause,
4574                        #   because they depend on nothing!
4575                        if cnt_jobs_created == 0 and touch_files_only < 2:
4576                            if "ONE_SECOND_PER_JOB" in runtime_data and \
4577                                    runtime_data["ONE_SECOND_PER_JOB"] and \
4578                                    t._action_type != Task._action_task_originate:
4579                                log_at_level(logger, 10, verbose,
4580                                             "   1 second PAUSE in job_parameter_generator\n\n\n")
4581                                time.sleep(1.01)
4582                            else:
4583                                time.sleep(0.1)
4584
4585                        count_remaining_jobs[t] += 1
4586                        cnt_jobs_created += 1
4587                        cnt_jobs_created_for_all_tasks += 1
4588
4589                        yield (params,
4590                               unglobbed_params,
4591                               t._name,
4592                               t._node_index,
4593                               job_name,
4594                               t.job_wrapper,
4595                               t.user_defined_work_func,
4596                               get_semaphore(
4597                                   t, _job_limit_semaphores, syncmanager),
4598                               death_event,
4599                               touch_files_only)
4600
4601                    # if no job came from this task, this task is complete
4602                    #   we need to retire it here instead of normal completion
4603                    #       at end of job tasks precisely
4604                    #       because it created no jobs
4605                    if cnt_jobs_created == 0:
4606                        incomplete_tasks.remove(t)
4607                        t._completed()
4608                        log_at_level(logger, 1, verbose,
4609                                     "Uptodate Task = %r" % t._get_display_name())
4610                        # LOGGER: logs All Tasks (including any task function docstrings)
4611                        log_at_level(logger, 10, verbose, "   No jobs created for %r. Retired "
4612                                     "in parameter_generator " % t._get_display_name())
4613
4614                        #
4615                        #   Add extra warning if no regular expressions match:
4616                        #   This is a common class of frustrating errors
4617                        #
4618                        # DEBUGGGG!!
4619                        if verbose >= 1 and \
4620                                "ruffus_WARNING" in runtime_data and \
4621                                t.param_generator_func in runtime_data["ruffus_WARNING"]:
4622                            indent_str = " " * 8
4623                            for msg in runtime_data["ruffus_WARNING"][t.param_generator_func]:
4624                                messages = [msg.replace(
4625                                    "\n", "\n" + indent_str)]
4626                                if verbose >= 4 and runtime_data and \
4627                                    "MATCH_FAILURE" in runtime_data and \
4628                                        t.param_generator_func in runtime_data["MATCH_FAILURE"]:
4629                                    for job_msg in runtime_data["MATCH_FAILURE"][t.param_generator_func]:
4630                                        messages.append(
4631                                            indent_str + "Job Warning: Input substitution failed:")
4632                                        messages.append(
4633                                            indent_str + "  " + job_msg.replace("\n", "\n" + indent_str + "  "))
4634                                logger.warning("    In Task %r:\n%s%s "
4635                                               % (t._get_display_name(), indent_str, "\n".join(messages)))
4636
4637                #
4638                #   GeneratorExit thrown when generator doesn't complete.
4639                #       I.e. there is a break in the pipeline_run loop.
4640                #       This happens where there are exceptions
4641                #           signalled from within a job
4642                #
4643                #   This is not really an exception, more a way to exit the
4644                #       generator loop asynchrononously so that cleanups can
4645                #       happen (e.g. the "with" statement or finally.)
4646                #
4647                #   We could write except Exception: below which will catch
4648                #       everything but KeyboardInterrupt and StopIteration
4649                #       and GeneratorExit in python 2.6
4650                #
4651                #   However, in python 2.5, GeneratorExit inherits from
4652                #       Exception. So we explicitly catch and rethrow
4653                #       GeneratorExit.
4654                except GeneratorExit:
4655                    raise
4656                except:
4657                    exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
4658                    exception_stack = traceback.format_exc()
4659                    exception_name = exceptionType.__module__ + '.' + exceptionType.__name__
4660                    exception_value = str(exceptionValue)
4661                    if len(exception_value):
4662                        exception_value = "(%s)" % exception_value
4663                    errt = ruffus_exceptions.RethrownJobError([(t._name,
4664                                              "",
4665                                              exception_name,
4666                                              exception_value,
4667                                              exception_stack)])
4668                    errt.specify_task(t, "Exceptions generating parameters")
4669                    raise errt
4670
4671            # extra tests in case final tasks do not result in jobs
4672            if len(incomplete_tasks) and \
4673                    (not cnt_tasks_processed or cnt_jobs_created_for_all_tasks):
4674                log_at_level(logger, 10, verbose, "    incomplete tasks = " +
4675                             ",".join([t._name for t in incomplete_tasks]))
4676                yield waiting_for_more_tasks_to_complete()
4677
4678        yield all_tasks_complete()
4679        # This function is done
4680        log_at_level(logger, 10, verbose, "   job_parameter_generator END")
4681
4682    return parameter_generator
4683
4684
4685def feed_job_params_to_process_pool_factory(parameter_q, death_event, logger,
4686                                            verbose):
4687    """
4688    Process pool gets its parameters from this generator
4689    Use factory function to save parameter_queue
4690    """
4691    def feed_job_params_to_process_pool():
4692        log_at_level(logger, 10, verbose,
4693                     "   Send params to Pooled Process START")
4694        while 1:
4695            log_at_level(logger, 10, verbose,
4696                         "   Get next parameter size = %d" % parameter_q.qsize())
4697            if not parameter_q.qsize():
4698                time.sleep(0.1)
4699            params = parameter_q.get()
4700            log_at_level(logger, 10, verbose, "   Get next parameter done")
4701
4702            # all tasks done
4703            if isinstance(params, all_tasks_complete):
4704                break
4705
4706            if death_event.is_set():
4707                death_event.clear()
4708                break
4709
4710            log_at_level(logger, 10, verbose,
4711                         "   Send params to Pooled Process=>" + str(params[0]))
4712            yield params
4713
4714        log_at_level(logger, 10, verbose,
4715                     "   Send params to Pooled Process END")
4716
4717    # return generator
4718    return feed_job_params_to_process_pool
4719
4720
4721def fill_queue_with_job_parameters(job_parameters, parameter_q, POOL_SIZE,
4722                                   logger, verbose):
4723    """
4724    Ensures queue filled with number of parameters > jobs / slots (POOL_SIZE)
4725    """
4726    log_at_level(logger, 10, verbose,
4727                 "    fill_queue_with_job_parameters START")
4728
4729    for params in job_parameters:
4730
4731        # stop if no more jobs available
4732        if isinstance(params, waiting_for_more_tasks_to_complete):
4733            log_at_level(logger, 10, verbose,
4734                         "    fill_queue_with_job_parameters WAITING for task to complete")
4735            break
4736
4737        if not isinstance(params, all_tasks_complete):
4738            log_at_level(logger, 10, verbose, "    fill_queue_with_job_parameters=>" +
4739                         str(params[0]))
4740
4741        # put into queue
4742        parameter_q.put(params)
4743
4744        # queue size needs to be at least 2 so that the parameter queue never
4745        #   consists of a singlewaiting_for_task_to_complete entry which will
4746        #   cause a loop and everything to hang!
4747        if parameter_q.qsize() > POOL_SIZE + 1:
4748            break
4749    log_at_level(logger, 10, verbose, "    fill_queue_with_job_parameters END")
4750
4751
4752def pipeline_get_task_names(pipeline=None):
4753    """
4754    Get all task names in a pipeline
4755    Not that does not check if pipeline is wired up properly
4756    """
4757
4758    # EXTRA pipeline_run DEBUGGING
4759    global EXTRA_PIPELINERUN_DEBUGGING
4760    EXTRA_PIPELINERUN_DEBUGGING = False
4761
4762    #
4763    #   pipeline must be a Pipeline or a string naming a pipeline
4764    #
4765    pipeline = lookup_pipeline(pipeline)
4766
4767    #
4768    #   Make sure all tasks in dependency list are linked to real functions
4769    #
4770    processed_tasks = set()
4771    completed_pipeline_names = pipeline._complete_task_setup(processed_tasks)
4772
4773    #
4774    #   Return task names for all nodes willy nilly
4775    #
4776
4777    return [n._name for n in node._all_nodes]
4778
4779
4780def get_job_result_output_file_names(job_result):
4781    """
4782    Excludes input file names being passed through
4783    """
4784    if len(job_result.unglobbed_params) <= 1:  # some jobs have no outputs
4785        return
4786
4787    unglobbed_input_params = job_result.unglobbed_params[0]
4788    unglobbed_output_params = job_result.unglobbed_params[1]
4789
4790    # some have multiple outputs from one job
4791    if not isinstance(unglobbed_output_params, list):
4792        unglobbed_output_params = [unglobbed_output_params]
4793
4794    # canonical path of input files, retaining any symbolic links:
4795    #   symbolic links have their own checksumming
4796    input_file_names = set()
4797    for i_f_n in get_strings_in_flattened_sequence([unglobbed_input_params]):
4798        input_file_names.add(os.path.abspath(i_f_n))
4799
4800    #
4801    # N.B. output parameters are not necessary all strings
4802    #   and not all files have been successfully created,
4803    #   even though the task apparently completed properly!
4804    # Remember to re-expand globs (from unglobbed paramters)
4805    #   after the job has run successfully
4806    #
4807    for possible_glob_str in get_strings_in_flattened_sequence(unglobbed_output_params):
4808        for o_f_n in glob.glob(possible_glob_str):
4809            #
4810            # Exclude output files if they are input files "passed through"
4811            #
4812            if os.path.abspath(o_f_n) in input_file_names:
4813                continue
4814
4815            #
4816            # use paths relative to working directory
4817            #
4818            yield os.path.relpath(o_f_n)
4819
4820    return
4821
4822
4823def handle_sigint(pool, pipeline):
4824    pool.kill(ruffus_exceptions.JobSignalledBreak)
4825
4826
4827def handle_sigusr1(pool, pipeline):
4828    pipeline.suspend_jobs()
4829
4830
4831def handle_sigusr2(pool, pipeline):
4832    pipeline.resume_jobs()
4833
4834
4835#   How the job queue works:
4836#   Main loop
4837#
4838#       iterates pool.map using feed_job_params_to_process_pool()
4839#       (calls parameter_q.get() until all_tasks_complete)
4840#
4841#           if errors but want to finish tasks already in pipeine:
4842#               parameter_q.put(all_tasks_complete())
4843#               keep going
4844#        else:
4845#
4846#            loops through jobs until no more jobs in non-dependent tasks
4847#               separate loop in generator so that list of incomplete_tasks
4848#               does not get updated half way through
4849#               causing race conditions
4850#
4851#               parameter_q.put(params)
4852#               until waiting_for_more_tasks_to_complete
4853#               until queue is full (check *after*)
4854#
4855def pipeline_run(target_tasks=[],
4856                 forcedtorun_tasks=[],
4857                 multiprocess=1,
4858                 logger=stderr_logger,
4859                 gnu_make_maximal_rebuild_mode=True,
4860                 # verbose defaults to 1 if None
4861                 verbose=None,
4862                 runtime_data=None,
4863                 one_second_per_job=None,
4864                 touch_files_only=False,
4865                 exceptions_terminate_immediately=False,
4866                 log_exceptions=False,
4867                 checksum_level=None,
4868                 multithread=0,
4869                 history_file=None,
4870                 # defaults to 2 if None
4871                 verbose_abbreviated_path=None,
4872                 pipeline=None,
4873                 pool_manager="multiprocessing"):
4874    # Remember to add further extra parameters here to
4875    #   "extra_pipeline_run_options" inside cmdline.py
4876    # This will forward extra parameters from the command line to
4877    # pipeline_run
4878    """Run pipelines.
4879
4880    :param target_tasks: targets task functions which will be run if they are
4881                         out-of-date
4882    :param forcedtorun_tasks: task functions which will be run whether or not
4883                              they are out-of-date
4884    :param multiprocess: The number of concurrent jobs running on different
4885                         processes.
4886    :param multithread: The number of concurrent jobs running as different
4887                        threads. If > 1, ruffus will use multithreading
4888                        *instead of* multiprocessing (and ignore the
4889                        multiprocess parameter). Using multi threading
4890                        is particularly useful to manage high performance
4891                        clusters which otherwise are prone to
4892                        "processor storms" when large number of cores finish
4893                        jobs at the same time.
4894    :param logger: Where progress will be logged. Defaults to stderr output.
4895    :type logger: `logging <http://docs.python.org/library/logging.html>`_
4896                  objects
4897    :param verbose:
4898
4899                    * level 0 : nothing
4900                    * level 1 : All Task names
4901                    * level 2 : All Tasks names any task function docstrings
4902                    * level 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation
4903                    * level 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings
4904                    * level 5 : All Jobs in Out-of-date Tasks,  (include only list of up-to-date
4905                      tasks)
4906                    * level 6 : All jobs in All Tasks whether out of date or not
4907                    * level 7 : Show file modification times for All jobs in All Tasks
4908                    * level 10: logs messages useful only for debugging ruffus pipeline code
4909    :param touch_files_only: Create or update input/output files only to
4910                             simulate running the pipeline. Do not run jobs.
4911                             If set to CHECKSUM_REGENERATE, will regenerate
4912                             the checksum history file to reflect the existing
4913                             i/o files on disk.
4914    :param exceptions_terminate_immediately: Exceptions cause immediate
4915                                             termination rather than waiting
4916                                             for N jobs to finish where
4917                                             N = multiprocess
4918    :param log_exceptions: Print exceptions to logger as soon as they occur.
4919    :param checksum_level: Several options for checking up-to-dateness are
4920                           available: Default is level 1.
4921
4922                           * level 0 : Use only file timestamps
4923                           * level 1 : above, plus timestamp of successful job completion
4924                           * level 2 : above, plus a checksum of the pipeline function body
4925                           * level 3 : above, plus a checksum of the pipeline
4926                             function default arguments and the
4927                             additional arguments passed in by task
4928                             decorators
4929    :param one_second_per_job: To work around poor file timepstamp resolution
4930                               for some file systems. Defaults to True if
4931                               checksum_level is 0 forcing Tasks to take a
4932                               minimum of 1 second to complete.
4933    :param runtime_data: Experimental feature: pass data to tasks at run time
4934    :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all*
4935                                          out-of-date tasks. Runs minimal
4936                                          set to build targets if set to
4937                                          ``True``. Use with caution.
4938    :param history_file: Database file storing checksums and file timestamps
4939                         for input/output files.
4940    :param verbose_abbreviated_path: whether input and output paths are abbreviated.
4941
4942                                     * level 0: The full (expanded, abspath) input or output path
4943                                     * level > 1: The number of subdirectories to include.
4944                                       Abbreviated paths are prefixed with ``[,,,]/``
4945                                     * level < 0: Input / Output parameters are truncated
4946                                       to ``MMM`` letters where ``verbose_abbreviated_path
4947                                       ==-MMM``. Subdirectories are first removed to see
4948                                       if this allows the paths to fit in the specified
4949                                       limit. Otherwise abbreviated paths are prefixed by
4950                                       ``<???>``
4951    """
4952    # DEBUGGG
4953    #print("pipeline_run start", file = sys.stderr)
4954
4955    #
4956    # default values
4957    #
4958    if touch_files_only is False:
4959        touch_files_only = 0
4960    elif touch_files_only is True:
4961        touch_files_only = 1
4962    else:
4963        touch_files_only = 2
4964        # we are not running anything so do it as quickly as possible
4965        one_second_per_job = False
4966    if verbose is None:
4967        verbose = 1
4968    if verbose_abbreviated_path is None:
4969        verbose_abbreviated_path = 2
4970
4971    # EXTRA pipeline_run DEBUGGING
4972    global EXTRA_PIPELINERUN_DEBUGGING
4973    if verbose >= 10:
4974        EXTRA_PIPELINERUN_DEBUGGING = True
4975    else:
4976        EXTRA_PIPELINERUN_DEBUGGING = False
4977
4978    if verbose == 0:
4979        logger = black_hole_logger
4980    elif verbose >= 11:
4981        #   debugging aid: See t_stderr_logger
4982        #   Each invocation of add_unique_prefix adds a unique prefix to
4983        #       all subsequent output So that individual runs of pipeline run
4984        #       are tagged
4985        if hasattr(logger, "add_unique_prefix"):
4986            logger.add_unique_prefix()
4987
4988    (checksum_level,
4989     job_history,
4990     pipeline,
4991     runtime_data,
4992     target_tasks,
4993     forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level,
4994                                                   history_file,
4995                                                   pipeline,
4996                                                   runtime_data,
4997                                                   target_tasks,
4998                                                   forcedtorun_tasks)
4999
5000    # select pool and queue type. Selection is convoluted
5001    # or backwards compatibility.
5002    itr_kwargs = {}
5003    if multiprocess is None:
5004        multiprocess = 0
5005    if multithread is None:
5006        multithread = 0
5007    parallelism = max(multiprocess, multithread)
5008
5009    if parallelism > 1:
5010        if pool_manager == "multiprocessing":
5011            syncmanager = multiprocessing.Manager()
5012            death_event = syncmanager.Event()
5013            if multithread:
5014                pool_t = ThreadPool
5015                queue_t = queue.Queue
5016            elif multiprocess > 1:
5017                pool_t = ProcessPool
5018                queue_t = queue.Queue
5019                #   Use a timeout of 3 years per job..., so that the condition
5020                #       we are waiting for in the thread can be interrupted by
5021                #       signals... In other words, so that Ctrl-C works
5022                #   Yucky part is that timeout is an extra parameter to
5023                #       IMapIterator.next(timeout=None) but next() for normal
5024                #       iterators do not take any extra parameters.
5025                itr_kwargs = dict(timeout=99999999)
5026            pool = pool_t(parallelism)
5027        elif pool_manager == "gevent":
5028            import gevent.event
5029            import gevent.queue
5030            import gevent.pool
5031            import gevent.signal
5032            try:
5033                import gevent.lock as gevent_lock
5034            except:
5035                import gevent.coros as gevent_lock
5036            syncmanager = gevent_lock
5037            death_event = gevent.event.Event()
5038            pool_t = gevent.pool.Pool
5039            pool = pool_t(parallelism)
5040            queue_t = gevent.queue.Queue
5041            gevent.signal(signal.SIGINT, functools.partial(handle_sigint, pool=pool, pipeline=pipeline))
5042            gevent.signal(signal.SIGUSR1, functools.partial(handle_sigusr1, pool=pool, pipeline=pipeline))
5043            gevent.signal(signal.SIGUSR2, functools.partial(handle_sigusr2, pool=pool, pipeline=pipeline))
5044        else:
5045            raise ValueError("unknown pool manager '{}'".format(pool_manager))
5046
5047    else:
5048        syncmanager = multiprocessing.Manager()
5049        death_event = syncmanager.Event()
5050        pool = None
5051        queue_t = queue.Queue
5052
5053    # Supplement mtime with system clock if using
5054    # CHECKSUM_HISTORY_TIMESTAMPS we don't need to default to adding 1
5055    # second delays between jobs
5056    if one_second_per_job is None:
5057        if checksum_level == CHECKSUM_FILE_TIMESTAMPS:
5058            log_at_level(logger, 10, verbose,
5059                         "   Checksums rely on FILE TIMESTAMPS only and we don't know if the "
5060                         "system file time resolution: Pause 1 second...")
5061            runtime_data["ONE_SECOND_PER_JOB"] = True
5062        else:
5063            log_at_level(logger, 10, verbose, "   Checksum use calculated time as well: "
5064                         "No 1 second pause...")
5065            runtime_data["ONE_SECOND_PER_JOB"] = False
5066    else:
5067        log_at_level(logger, 10, verbose, "   One second per job specified to be %s"
5068                     % one_second_per_job)
5069        runtime_data["ONE_SECOND_PER_JOB"] = one_second_per_job
5070
5071    if touch_files_only and verbose >= 1:
5072        logger.info("Touch output files instead of remaking them.")
5073
5074
5075    # To update the checksum file, we force all tasks to rerun but
5076    # then don't actually call the task function...
5077    # So starting with target_tasks and forcedtorun_tasks,
5078    # we harvest all upstream dependencies willy, nilly
5079    # and assign the results to forcedtorun_tasks
5080    if touch_files_only == 2:
5081        (forcedtorun_tasks, ignore_param1, ignore_param2, ignore_param3) = \
5082            topologically_sorted_nodes(target_tasks + forcedtorun_tasks, True,
5083                                       gnu_make_maximal_rebuild_mode,
5084                                       extra_data_for_signal=[t_verbose_logger(0, 0, None,
5085                                                                               runtime_data),
5086                                                              job_history],
5087                                       signal_callback=is_node_up_to_date)
5088
5089    # If verbose >=10, for debugging:
5090    #   Prints which tasks trigger the pipeline rerun...
5091    #   i.e. start from the farthest task, prints out all the up to date
5092    #   tasks, and the first out of date task
5093    (incomplete_tasks, self_terminated_nodes,
5094     dag_violating_edges, dag_violating_nodes) = \
5095        topologically_sorted_nodes(target_tasks, forcedtorun_tasks,
5096                                   gnu_make_maximal_rebuild_mode,
5097                                   extra_data_for_signal=[
5098                                       t_verbose_logger(verbose, verbose_abbreviated_path,
5099                                                        logger, runtime_data),
5100                                       job_history],
5101                                   signal_callback=is_node_up_to_date)
5102
5103    if len(dag_violating_nodes):
5104        dag_violating_tasks = ", ".join(t._name for t in dag_violating_nodes)
5105
5106        e = ruffus_exceptions.error_circular_dependencies("Circular dependencies found in the "
5107                                                          "pipeline involving one or more of "
5108                                                          "(%s)" % (dag_violating_tasks))
5109        raise e
5110
5111    # get dependencies. Only include tasks which will be run
5112    set_of_incomplete_tasks = set(incomplete_tasks)
5113    task_parents = defaultdict(set)
5114    for t in set_of_incomplete_tasks:
5115        task_parents[t] = set()
5116        for parent in t._get_inward():
5117            if parent in set_of_incomplete_tasks:
5118                task_parents[t].add(parent)
5119
5120    # Print Complete tasks
5121    # LOGGER level 5 : All jobs in All Tasks whether out of date or not
5122    if verbose in [1, 2] or verbose >= 5:
5123        (all_tasks, ignore_param1, ignore_param2, ignore_param3) = topologically_sorted_nodes(
5124            target_tasks, True,
5125            gnu_make_maximal_rebuild_mode,
5126            extra_data_for_signal=[t_verbose_logger(0, 0, None,
5127                                                    runtime_data),
5128                                   job_history],
5129            signal_callback=is_node_up_to_date)
5130        # indent hardcoded to 4
5131        for m in get_completed_task_strings(incomplete_tasks, all_tasks,
5132                                            forcedtorun_tasks, verbose,
5133                                            verbose_abbreviated_path, 4,
5134                                            runtime_data, job_history):
5135            logger.info(m)
5136
5137    # print json.dumps(task_parents.items(), indent=4, cls=task_encoder)
5138    logger.info("")
5139    logger.info("_" * 40)
5140    logger.info("Tasks which will be run:")
5141    logger.info("")
5142    logger.info("")
5143
5144    # prepare tasks for pipeline run:
5145    #
5146    #   clear task outputs
5147    #       task.output_filenames = None
5148    #
5149    #    **********
5150    #      BEWARE
5151    #    **********
5152    #
5153    #    Because state is stored, ruffus is *not* reentrant.
5154    #
5155    #    **********
5156    #      BEWARE
5157    #    **********
5158    for t in incomplete_tasks:
5159        t._init_for_pipeline()
5160
5161    #
5162    # prime queue with initial set of job parameters
5163    #
5164    parameter_q = queue_t()
5165    task_with_completed_job_q = queue_t()
5166
5167    parameter_generator = make_job_parameter_generator(incomplete_tasks,
5168                                                       task_parents,
5169                                                       logger, forcedtorun_tasks,
5170                                                       task_with_completed_job_q,
5171                                                       runtime_data, verbose,
5172                                                       verbose_abbreviated_path,
5173                                                       syncmanager, death_event,
5174                                                       touch_files_only, job_history)
5175    job_parameters = parameter_generator()
5176    fill_queue_with_job_parameters(
5177        job_parameters, parameter_q, parallelism, logger, verbose)
5178
5179    #
5180    #   N.B.
5181    #   Handling keyboard shortcuts may require
5182    #       See http://stackoverflow.com/questions/1408356/
5183    #           keyboard-interrupts-with-pythons-multiprocessing-pool
5184    #
5185    #   When waiting for a condition in threading.Condition.wait(),
5186    #       KeyboardInterrupt is never sent
5187    #       unless a timeout is specified
5188    #
5189    #
5190    #
5191    #   #
5192    # whether using multiprocessing
5193    #   #
5194    #   pool = Pool(parallelism) if multiprocess > 1 else None
5195    #   if pool:
5196    #       pool_func = pool.imap_unordered
5197    #       job_iterator_timeout = []
5198    #   else:
5199    #       pool_func = imap
5200    #       job_iterator_timeout = [999999999999]
5201    #
5202    #
5203    #   ....
5204    #
5205    #
5206    #   it = pool_func(run_pooled_job_without_exceptions,
5207    #                  feed_job_params_to_process_pool())
5208    #   while 1:
5209    #      try:
5210    #          job_result = it.next(*job_iterator_timeout)
5211    #
5212    #          ...
5213    #
5214    #      except StopIteration:
5215    #          break
5216
5217    if pool is not None:
5218        pool_func = pool.imap_unordered
5219    else:
5220        pool_func = map
5221
5222    feed_job_params_to_process_pool = feed_job_params_to_process_pool_factory(
5223        parameter_q, death_event, logger, verbose)
5224
5225    #
5226    #   for each result from job
5227    #
5228    job_errors = ruffus_exceptions.RethrownJobError()
5229    tasks_with_errors = set()
5230
5231    #
5232    #   job_result.job_name / job_result.return_value
5233    #       Reserved for returning result from job...
5234    #       How?
5235    #
5236    #   Rewrite for loop so we can call iter.next() with a timeout
5237    try:
5238
5239        # for job_result in pool_func(run_pooled_job_without_exceptions,
5240        # feed_job_params_to_process_pool()):
5241        ii = iter(pool_func(run_pooled_job_without_exceptions,
5242                            feed_job_params_to_process_pool()))
5243        while 1:
5244            if pool is not None:
5245                job_result = ii.next(**itr_kwargs)
5246            else:
5247                job_result = next(ii)
5248            # run next task
5249            log_at_level(logger, 11, verbose, "r" * 80 + "\n")
5250            t = node._lookup_node_from_index(job_result.node_index)
5251
5252            # remove failed jobs from history-- their output is bogus now!
5253            if job_result.state in (JOB_ERROR, JOB_SIGNALLED_BREAK):
5254                log_at_level(
5255                    logger, 10, verbose, "   JOB ERROR / JOB_SIGNALLED_BREAK: " + job_result.job_name)
5256                # remove outfile from history if it exists
5257                for o_f_n in get_job_result_output_file_names(job_result):
5258                    job_history.pop(o_f_n, None)
5259
5260            # only save poolsize number of errors
5261            if job_result.state == JOB_ERROR:
5262                log_at_level(logger, 10, verbose, "   Exception caught for %s"
5263                             % job_result.job_name)
5264                job_errors.append(job_result.exception)
5265                tasks_with_errors.add(t)
5266
5267                #
5268                # print to logger immediately
5269                #
5270                if log_exceptions:
5271                    log_at_level(logger, 10, verbose, "   Log Exception")
5272                    logger.error(job_errors.get_nth_exception_str())
5273
5274                #
5275                # break if too many errors
5276                #
5277                if len(job_errors) >= parallelism or exceptions_terminate_immediately:
5278                    log_at_level(logger, 10, verbose, "   Break loop %s %s %s "
5279                                 % (exceptions_terminate_immediately,
5280                                    len(job_errors), parallelism))
5281                    parameter_q.put(all_tasks_complete())
5282                    break
5283
5284            # break immediately if the user says stop
5285            elif job_result.state == JOB_SIGNALLED_BREAK:
5286                job_errors.append(job_result.exception)
5287                job_errors.specify_task(t, "Exceptions running jobs")
5288                log_at_level(logger, 10, verbose, "   Break loop JOB_SIGNALLED_BREAK %s %s "
5289                             % (len(job_errors), parallelism))
5290                parameter_q.put(all_tasks_complete())
5291                break
5292
5293            else:
5294                if job_result.state == JOB_UP_TO_DATE:
5295                    # LOGGER: All Jobs in Out-of-date Tasks
5296                    log_at_level(logger, 5, verbose, "    %s unnecessary: already up to date"
5297                                 % job_result.job_name)
5298                else:
5299                    # LOGGER: Out-of-date Jobs in Out-of-date Tasks
5300                    log_at_level(logger, 3, verbose,
5301                                 "    %s completed" % job_result.job_name)
5302                    # save this task name and the job (input and output files)
5303                    # alternatively, we could just save the output file and its
5304                    # completion time, or on the other end of the spectrum,
5305                    # we could save a checksum of the function that generated
5306                    # this file, something akin to:
5307                    # chksum = md5.md5(marshal.dumps(t.user_defined_work_func.func_code.co_code))
5308                    # we could even checksum the arguments to the function that
5309                    # generated this file:
5310                    # chksum2 = md5.md5(marshal.dumps(t.user_defined_work_func.func_defaults) +
5311                    #                   marshal.dumps(t.args))
5312
5313                    for o_f_n in get_job_result_output_file_names(job_result):
5314                        try:
5315                            log_at_level(logger, 10, verbose,
5316                                         "   Job History : " + o_f_n)
5317                            mtime = os.path.getmtime(o_f_n)
5318                            #
5319                            #   use probably higher resolution
5320                            #       time.time() over mtime which might have 1 or 2s
5321                            #       resolutions, unless there is clock skew and the
5322                            #       filesystem time > system time (e.g. for networks)
5323                            #
5324                            epoch_seconds = time.time()
5325                            # Aargh. go back to insert one second between jobs
5326                            if epoch_seconds < mtime:
5327                                if one_second_per_job is None and \
5328                                        not runtime_data["ONE_SECOND_PER_JOB"]:
5329                                    log_at_level(logger, 10, verbose,
5330                                                 "   Switch to 1s per job")
5331                                    runtime_data["ONE_SECOND_PER_JOB"] = True
5332                            elif epoch_seconds - mtime < 1.1:
5333                                mtime = epoch_seconds
5334                            chksum = JobHistoryChecksum(o_f_n, mtime,
5335                                                        job_result.unglobbed_params[2:], t)
5336                            job_history[o_f_n] = chksum
5337                            log_at_level(logger, 10, verbose,
5338                                         "   Job History Saved: " + o_f_n)
5339                        except:
5340                            pass
5341
5342            log_at_level(logger, 10, verbose,
5343                         "   _is_up_to_date completed task & checksum...")
5344            #
5345            #   _is_up_to_date completed task after checksumming
5346            #
5347            task_with_completed_job_q.put((t,
5348                                           job_result.task_name,
5349                                           job_result.node_index,
5350                                           job_result.job_name))
5351
5352            # make sure queue is still full after each job is retired
5353            # do this after undating which jobs are incomplete
5354            log_at_level(logger, 10, verbose, "   job errors?")
5355            if len(job_errors):
5356                # parameter_q.clear()
5357                # if len(job_errors) == 1 and not parameter_q._closed:
5358                log_at_level(logger, 10, verbose, "   all tasks completed...")
5359                parameter_q.put(all_tasks_complete())
5360            else:
5361                log_at_level(logger, 10, verbose,
5362                             "   Fill queue with more parameter...")
5363                fill_queue_with_job_parameters(job_parameters, parameter_q, parallelism, logger,
5364                                               verbose)
5365    # The equivalent of the normal end of a fall loop
5366    except StopIteration as e:
5367        pass
5368    except:
5369        exception_name, exception_value, exception_Traceback = sys.exc_info()
5370        exception_stack = traceback.format_exc()
5371        # save exception to rethrow later
5372        job_errors.append((None, None, exception_name,
5373                           exception_value, exception_stack))
5374        for ee in exception_value, exception_name, exception_stack:
5375            log_at_level(logger, 10, verbose,
5376                         "       Exception caught %s" % (ee,))
5377        log_at_level(logger, 10, verbose,
5378                     "   Get next parameter size = %d" % parameter_q.qsize())
5379        log_at_level(logger, 10, verbose, "   Task with completed "
5380                     "jobs size = %d" % task_with_completed_job_q.qsize())
5381        parameter_q.put(all_tasks_complete())
5382        try:
5383            death_event.clear()
5384        except:
5385            pass
5386
5387        if pool is not None:
5388            if hasattr(pool, "close"):
5389                log_at_level(logger, 10, verbose, "       pool.close")
5390                pool.close()
5391            log_at_level(logger, 10, verbose, "       pool.terminate")
5392            try:
5393                pool.terminate()
5394            except:
5395                pass
5396            log_at_level(logger, 10, verbose, "       pool.terminated")
5397        raise job_errors
5398
5399    # log_at_level (logger, 10, verbose, "       syncmanager.shutdown")
5400    # syncmanager.shutdown()
5401
5402    if pool is not None:
5403        log_at_level(logger, 10, verbose, "       pool.close")
5404        # pool.join()
5405        try:
5406            pool.close()
5407        except AttributeError:
5408            pass
5409        log_at_level(logger, 10, verbose, "       pool.terminate")
5410        try:
5411            pool.terminate()
5412        except AttributeError:
5413            pass
5414        except Exception:
5415            # an exception may be thrown after a signal is caught (Ctrl-C)
5416            #   when the EventProxy(s) for death_event might be left hanging
5417            pass
5418        log_at_level(logger, 10, verbose, "       pool.terminated")
5419
5420    # Switch back off EXTRA pipeline_run DEBUGGING
5421    EXTRA_PIPELINERUN_DEBUGGING = False
5422
5423    if len(job_errors):
5424        raise job_errors
5425
5426
5427if __name__ == '__main__':
5428    import unittest
5429
5430    #
5431    #   debug parameter ignored if called as a module
5432    #
5433    if sys.argv.count("--debug"):
5434        sys.argv.remove("--debug")
5435    unittest.main()
5436