1#!/usr/bin/env python 2from __future__ import print_function 3 4from collections import defaultdict, deque 5from collections import namedtuple 6from contextlib import contextmanager 7from multiprocessing import Pool as ProcessPool 8from multiprocessing.pool import ThreadPool 9import copy 10import functools 11import multiprocessing 12import os 13import glob 14import re 15import signal 16import subprocess 17import sys 18import textwrap 19import time 20import traceback 21 22from .file_name_parameters import \ 23 args_param_factory, \ 24 check_files_io_parameters, \ 25 check_input_files_exist, \ 26 check_parallel_parameters, \ 27 collate_param_factory, \ 28 combinatorics_param_factory, \ 29 files_custom_generator_param_factory, \ 30 files_param_factory, \ 31 get_nested_tasks_or_globs, \ 32 is_file_re_combining, \ 33 merge_param_factory, \ 34 needs_update_check_directory_missing, \ 35 needs_update_check_modify_time, \ 36 originate_param_factory, \ 37 product_param_factory, \ 38 regex, suffix, formatter, inputs, \ 39 split_param_factory, \ 40 subdivide_param_factory, \ 41 t_combinatorics_type, \ 42 t_extra_inputs, \ 43 t_formatter_file_names_transform, \ 44 t_nested_formatter_file_names_transform, \ 45 t_params_tasks_globs_run_time_data, \ 46 t_regex_file_names_transform, \ 47 t_suffix_file_names_transform, \ 48 transform_param_factory, \ 49 touch_file_factory 50from .ruffus_utility import shorten_filenames_encoder, \ 51 ignore_unknown_encoder, \ 52 get_strings_in_flattened_sequence, \ 53 JobHistoryChecksum, \ 54 CHECKSUM_FILE_TIMESTAMPS, \ 55 parse_task_arguments, \ 56 replace_placeholders_with_tasks_in_input_params, \ 57 get_default_checksum_level, \ 58 open_job_history, \ 59 non_str_sequence 60import ruffus.ruffus_exceptions as ruffus_exceptions 61from .print_dependencies import attributes_to_str 62from .graph import node, topologically_sorted_nodes, graph_printout 63 64 65if sys.hexversion < 0x03000000: 66 from future_builtins import zip, map 67 68################################################################################ 69# 70# 71# task.py 72# 73# Copyright (c) 10/9/2009 Leo Goodstadt 74# 75# Permission is hereby granted, free of charge, to any person obtaining a copy 76# of this software and associated documentation files (the "Software"), to deal 77# in the Software without restriction, including without limitation the rights 78# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 79# copies of the Software, and to permit persons to whom the Software is 80# furnished to do so, subject to the following conditions: 81# 82# The above copyright notice and this permission notice shall be included in 83# all copies or substantial portions of the Software. 84# 85# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 86# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 87# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 88# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 89# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 90# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 91# THE SOFTWARE. 92################################################################################# 93""" 94 95******************************************** 96:mod:`ruffus.task` -- Overview 97******************************************** 98 99.. moduleauthor:: Leo Goodstadt <ruffus@llew.org.uk> 100 101Initial implementation of @active_if by Jacob Biesinger 102 103============================ 104Decorator syntax: 105============================ 106 107 Pipelined tasks are created by "decorating" a function with the following syntax:: 108 109 def func_a(): 110 pass 111 112 @follows(func_a) 113 def func_b (): 114 pass 115 116 117 Each task is a single function which is applied one or more times to a list of parameters 118 (typically input files to produce a list of output files). 119 120 Each of these is a separate, independent job (sharing the same code) which can be 121 run in parallel. 122 123 124============================ 125Running the pipeline 126============================ 127 To run the pipeline:: 128 129 pipeline_run(target_tasks, forcedtorun_tasks = [], multiprocess = 1, 130 logger = stderr_logger, 131 gnu_make_maximal_rebuild_mode = True, 132 cleanup_log = "../cleanup.log") 133 134 pipeline_cleanup(cleanup_log = "../cleanup.log") 135 136 137 138 139 140 141""" 142 143try: 144 from collections.abc import Callable 145except ImportError: 146 from collections import Callable 147 148# 88888888888888888888888888888888888888888888888888888888888888888888888888888 149 150# imports 151 152 153# 88888888888888888888888888888888888888888888888888888888888888888888888888888 154if sys.hexversion >= 0x03000000: 155 # everything is unicode in python3 156 from functools import reduce 157 158 159try: 160 import cPickle as pickle 161except: 162 import pickle as pickle 163 164 165if __name__ == '__main__': 166 import sys 167 sys.path.insert(0, ".") 168 169 170if sys.hexversion >= 0x03000000: 171 # everything is unicode in python3 172 path_str_type = str 173else: 174 path_str_type = basestring 175 176 177# 178# use simplejson in place of json for python < 2.6 179# 180try: 181 import json 182except ImportError: 183 import simplejson 184 json = simplejson 185dumps = json.dumps 186 187if sys.hexversion >= 0x03000000: 188 import queue as queue 189else: 190 import Queue as queue 191 192 193class Ruffus_Keyboard_Interrupt_Exception (Exception): 194 pass 195 196# 88888888888888888888888888888888888888888888888888888888888888888888888888888 197 198# 199# light weight logging objects 200# 201# 202# 88888888888888888888888888888888888888888888888888888888888888888888888888888 203 204 205class t_black_hole_logger: 206 207 """ 208 Does nothing! 209 """ 210 211 def info(self, message, *args, **kwargs): 212 pass 213 214 def debug(self, message, *args, **kwargs): 215 pass 216 217 def warning(self, message, *args, **kwargs): 218 pass 219 220 def error(self, message, *args, **kwargs): 221 pass 222 223 224class t_stderr_logger: 225 226 """ 227 Everything to stderr 228 """ 229 230 def __init__(self): 231 self.unique_prefix = "" 232 233 def add_unique_prefix(self): 234 import random 235 random.seed() 236 self.unique_prefix = str(random.randint(0, 1000)) + " " 237 238 def info(self, message): 239 sys.stderr.write(self.unique_prefix + message + "\n") 240 241 def warning(self, message): 242 sys.stderr.write("\n\n" + self.unique_prefix + 243 "WARNING:\n " + message + "\n\n") 244 245 def error(self, message): 246 sys.stderr.write("\n\n" + self.unique_prefix + 247 "ERROR:\n " + message + "\n\n") 248 249 def debug(self, message): 250 sys.stderr.write(self.unique_prefix + message + "\n") 251 252 253class t_stream_logger: 254 255 """ 256 Everything to stderr 257 """ 258 259 def __init__(self, stream): 260 self.stream = stream 261 262 def info(self, message): 263 self.stream.write(message + "\n") 264 265 def warning(self, message): 266 self.stream.write("\n\nWARNING:\n " + message + "\n\n") 267 268 def error(self, message): 269 self.stream.write("\n\nERROR:\n " + message + "\n\n") 270 271 def debug(self, message): 272 self.stream.write(message + "\n") 273 274 275black_hole_logger = t_black_hole_logger() 276stderr_logger = t_stderr_logger() 277 278 279class t_verbose_logger: 280 281 def __init__(self, verbose, verbose_abbreviated_path, logger, runtime_data): 282 self.verbose = verbose 283 self.logger = logger 284 self.runtime_data = runtime_data 285 self.verbose_abbreviated_path = verbose_abbreviated_path 286 287 288def log_at_level(logger, message_level, verbose_level, msg): 289 """ 290 writes to log if message_level > verbose level 291 Returns anything written in case we might want to drop down and output at a 292 lower log level 293 """ 294 if message_level <= verbose_level: 295 logger.info(msg) 296 return True 297 return False 298 299 300# queue management objects 301# inserted into queue like job parameters to control multi-processing queue 302# fake parameters to signal in queue 303class all_tasks_complete: 304 pass 305 306 307class waiting_for_more_tasks_to_complete: 308 pass 309 310 311# synchronisation data 312# 313# SyncManager() 314# syncmanager.start() 315 316@contextmanager 317def do_nothing_semaphore(): 318 yield 319 320 321# option to turn on EXTRA pipeline_run DEBUGGING 322EXTRA_PIPELINERUN_DEBUGGING = False 323 324 325class task_decorator(object): 326 327 """ 328 Forwards to functions within Task 329 """ 330 331 def __init__(self, *decoratorArgs, **decoratorNamedArgs): 332 """ 333 saves decorator arguments 334 """ 335 self.args = decoratorArgs 336 self.named_args = decoratorNamedArgs 337 338 def __call__(self, task_func): 339 """ 340 calls func in task with the same name as the class 341 """ 342 # add task to main pipeline 343 # check for duplicate tasks inside _create_task 344 task = main_pipeline._create_task(task_func) 345 346 # call the method called 347 # task.decorator_xxxx 348 # where xxxx = transform subdivide etc 349 task_decorator_function = getattr( 350 task, "_decorator_" + self.__class__.__name__) 351 task.created_via_decorator = True 352 # create empty placeholder with the args %s actually inside the task function 353 task.description_with_args_placeholder = task._get_decorated_function( 354 ).replace("...", "%s", 1) 355 task_decorator_function(*self.args, **self.named_args) 356 357 # 358 # don't change the function so we can call it unaltered 359 # 360 return task_func 361 362 363class follows(task_decorator): 364 pass 365 366 367class files(task_decorator): 368 pass 369 370 371class split(task_decorator): 372 pass 373 374 375class transform(task_decorator): 376 pass 377 378 379class subdivide(task_decorator): 380 381 """ 382 Splits a each set of input files into multiple output file names, 383 where the number of output files may not be known beforehand. 384 """ 385 pass 386 387 388class originate(task_decorator): 389 pass 390 391 392class merge(task_decorator): 393 pass 394 395 396class posttask(task_decorator): 397 pass 398 399 400class jobs_limit(task_decorator): 401 pass 402 403 404class collate(task_decorator): 405 pass 406 407 408class active_if(task_decorator): 409 pass 410 411 412class check_if_uptodate(task_decorator): 413 pass 414 415 416class parallel(task_decorator): 417 pass 418 419 420class graphviz(task_decorator): 421 pass 422 423 424class files_re(task_decorator): 425 """obsolete""" 426 pass 427 428 429# indicator objects 430class mkdir(task_decorator): 431 # def __init__ (self, *args): 432 # self.args = args 433 pass 434 435 436# touch_file 437class touch_file(object): 438 439 def __init__(self, *args): 440 self.args = args 441 442 443# job descriptors 444# given parameters, returns strings describing job 445# First returned parameter is string in strong form 446# Second returned parameter is a list of strings for input, 447# output and extra parameters 448# intended to be reformatted with indentation 449# main use in error logging 450def generic_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data): 451 if unglobbed_params in ([], None): 452 m = "Job" 453 else: 454 m = "Job = %s" % ignore_unknown_encoder(unglobbed_params) 455 return m, [m] 456 457 458def io_files_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data): 459 extra_param = ", " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1] \ 460 if len(unglobbed_params) > 2 else "" 461 out_param = shorten_filenames_encoder(unglobbed_params[1], verbose_abbreviated_path) \ 462 if len(unglobbed_params) > 1 else "??" 463 in_param = shorten_filenames_encoder(unglobbed_params[0], verbose_abbreviated_path) \ 464 if len(unglobbed_params) > 0 else "??" 465 466 return ("Job = [%s -> %s%s]" % (in_param, out_param, extra_param), 467 ["Job = [%s" % in_param, "-> " + out_param + extra_param + "]"]) 468 469 470def io_files_one_to_many_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data): 471 472 extra_param = ", " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1] \ 473 if len(unglobbed_params) > 2 else "" 474 out_param = shorten_filenames_encoder(unglobbed_params[1], verbose_abbreviated_path) \ 475 if len(unglobbed_params) > 1 else "??" 476 in_param = shorten_filenames_encoder(unglobbed_params[0], verbose_abbreviated_path) \ 477 if len(unglobbed_params) > 0 else "??" 478 479 # start with input parameter 480 ret_params = ["Job = [%s" % in_param] 481 482 # add output parameter to list, 483 # processing one by one if multiple output parameters 484 if len(unglobbed_params) > 1: 485 if isinstance(unglobbed_params[1], (list, tuple)): 486 ret_params.extend( 487 "-> " + shorten_filenames_encoder(p, verbose_abbreviated_path) for p in unglobbed_params[1]) 488 else: 489 ret_params.append("-> " + out_param) 490 491 # add extra 492 if len(unglobbed_params) > 2: 493 ret_params.append( 494 " , " + shorten_filenames_encoder(unglobbed_params[2:], verbose_abbreviated_path)[1:-1]) 495 496 # add closing bracket 497 ret_params[-1] += "]" 498 499 return ("Job = [%s -> %s%s]" % (in_param, out_param, extra_param), ret_params) 500 501 502def mkdir_job_descriptor(unglobbed_params, verbose_abbreviated_path, runtime_data): 503 # input, output and parameters 504 if len(unglobbed_params) == 1: 505 m = "Make directories %s" % (shorten_filenames_encoder( 506 unglobbed_params[0], verbose_abbreviated_path)) 507 elif len(unglobbed_params) == 2: 508 m = "Make directories %s" % (shorten_filenames_encoder( 509 unglobbed_params[1], verbose_abbreviated_path)) 510 else: 511 return [], [] 512 return m, [m] 513 514 515# job wrappers 516# registers files/directories for cleanup 517def job_wrapper_generic(params, user_defined_work_func, register_cleanup, touch_files_only): 518 """ 519 run func 520 """ 521 assert(user_defined_work_func) 522 return user_defined_work_func(*params) 523 524 525def job_wrapper_io_files(params, user_defined_work_func, register_cleanup, touch_files_only, 526 output_files_only=False): 527 """ 528 job wrapper for all that deal with i/o files 529 run func on any i/o if not up to date 530 """ 531 assert(user_defined_work_func) 532 533 i, o = params[0:2] 534 535 if touch_files_only == 0: 536 # @originate only uses output files 537 if output_files_only: 538 # TODOOO extra and named extras 539 ret_val = user_defined_work_func(*(params[1:])) 540 # all other decorators 541 else: 542 try: 543 # TODOOO extra and named extras 544 ret_val = user_defined_work_func(*params) 545 # EXTRA pipeline_run DEBUGGING 546 if EXTRA_PIPELINERUN_DEBUGGING: 547 sys.stderr.write( 548 "w" * 36 + "[[ task() done ]]" + "w" * 27 + "\n") 549 except KeyboardInterrupt as e: 550 # Reraise KeyboardInterrupt as a normal Exception 551 # EXTRA pipeline_run DEBUGGING 552 if EXTRA_PIPELINERUN_DEBUGGING: 553 sys.stderr.write("E" * 36 + "[[ KeyboardInterrupt from task() ]]" + 554 "E" * 9 + "\n") 555 raise Ruffus_Keyboard_Interrupt_Exception("KeyboardInterrupt") 556 except: 557 # sys.stderr.write("?? %s ??" % (tuple(params),)) 558 raise 559 elif touch_files_only == 1: 560 # job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True) 561 562 # 563 # Do not touch any output files which are the same as any input 564 # i.e. which are just being passed through 565 # 566 # list of input files 567 real_input_file_names = set() 568 for f in get_strings_in_flattened_sequence(i): 569 real_input_file_names.add(os.path.realpath(f)) 570 571 # 572 # touch files only 573 # 574 for f in get_strings_in_flattened_sequence(o): 575 576 if os.path.realpath(f) in real_input_file_names: 577 continue 578 579 # 580 # race condition still possible... 581 # 582 with open(f, 'a') as ff: 583 os.utime(f, None) 584 # if not os.path.exists(f): 585 # open(f, 'w') 586 # mtime = os.path.getmtime(f) 587 # else: 588 # os.utime(f, None) 589 # mtime = os.path.getmtime(f) 590 591 # job_history[f] = chksum # update file times and job details in 592 # history 593 594 # 595 # register strings in output file for cleanup 596 # 597 for f in get_strings_in_flattened_sequence(o): 598 register_cleanup(f, "file") 599 600 601def job_wrapper_output_files(params, user_defined_work_func, register_cleanup, touch_files_only): 602 """ 603 job wrapper for all that only deals with output files. 604 605 run func on any output file if not up to date 606 """ 607 job_wrapper_io_files(params, user_defined_work_func, register_cleanup, touch_files_only, 608 output_files_only=True) 609 610 611def job_wrapper_mkdir(params, user_defined_work_func, register_cleanup, touch_files_only): 612 """ 613 Make missing directories including any intermediate directories on the specified path(s) 614 """ 615 # 616 # Just in case, swallow file exist errors because some other makedirs 617 # might be subpath of this directory 618 # Should not be necessary because of "sorted" in task_mkdir 619 # 620 # 621 if len(params) == 1: 622 dirs = params[0] 623 624 # if there are two parameters, they are i/o, and the directories to be 625 # created are the output 626 elif len(params) >= 2: 627 dirs = params[1] 628 else: 629 raise Exception("No arguments in mkdir check %s" % (params,)) 630 631 # get all file names in flat list 632 dirs = get_strings_in_flattened_sequence(dirs) 633 634 for d in dirs: 635 try: 636 # Please email the authors if an uncaught exception is raised here 637 os.makedirs(d) 638 register_cleanup(d, "makedirs") 639 except: 640 # 641 # ignore exception if 642 # exception == OSError + "File exists" or // Linux 643 # exception == WindowsError + "file already exists" // Windows 644 # Are other exceptions raised by other OS? 645 # 646 # 647 exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() 648 # exceptionType == OSError and 649 if "File exists" in str(exceptionValue): 650 continue 651 # exceptionType == WindowsError and 652 elif "file already exists" in str(exceptionValue): 653 continue 654 raise 655 656 # changed for compatibility with python 3.x 657 # except OSError, e: 658 # if "File exists" not in e: 659 # raise 660 661 662JOB_ERROR = 0 663JOB_SIGNALLED_BREAK = 1 664JOB_UP_TO_DATE = 2 665JOB_COMPLETED = 3 666 667# t_job_result 668# Previously a collections.namedtuple (introduced in python 2.6) 669# Now using implementation from running 670# t_job_result = namedtuple('t_job_result', 671# 'task_name state job_name return_value exception', verbose =1) 672# for compatibility with python 2.5 673t_job_result = namedtuple('t_job_result', 674 'task_name ' 675 'node_index state ' 676 'job_name ' 677 'return_value ' 678 'exception ' 679 'params ' 680 'unglobbed_params ') 681 682 683def run_pooled_job_without_exceptions(process_parameters): 684 """ 685 handles running jobs in parallel 686 Make sure exceptions are caught here: 687 Otherwise, these will kill the thread/process 688 return any exceptions which will be rethrown at the other end: 689 See ruffus_exceptions.RethrownJobError / run_all_jobs_in_task 690 """ 691 # signal.signal(signal.SIGINT, signal.SIG_IGN) 692 (params, unglobbed_params, task_name, node_index, job_name, job_wrapper, user_defined_work_func, 693 job_limit_semaphore, death_event, touch_files_only) = process_parameters 694 695 # #job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True) 696 # outfile = params[1] if len(params) > 1 else None # mkdir has no output 697 # if not isinstance(outfile, list): 698 # # outfile = [outfile] 699 # for o in outfile: 700 # job_history.pop(o, None) # remove outfile from history if it exists 701 702 if job_limit_semaphore is None: 703 job_limit_semaphore = do_nothing_semaphore() 704 705 try: 706 with job_limit_semaphore: 707 # EXTRA pipeline_run DEBUGGING 708 if EXTRA_PIPELINERUN_DEBUGGING: 709 sys.stderr.write( 710 ">" * 36 + "[[ job_wrapper ]]" + ">" * 27 + "\n") 711 return_value = job_wrapper(params, user_defined_work_func, 712 register_cleanup, touch_files_only) 713 714 # 715 # ensure one second between jobs 716 # 717 # if one_second_per_job: 718 # time.sleep(1.01) 719 # EXTRA pipeline_run DEBUGGING 720 if EXTRA_PIPELINERUN_DEBUGGING: 721 sys.stderr.write( 722 "<" * 36 + "[[ job_wrapper done ]]" + "<" * 22 + "\n") 723 return t_job_result(task_name, node_index, JOB_COMPLETED, job_name, return_value, None, 724 params, unglobbed_params) 725 except KeyboardInterrupt as e: 726 # Reraise KeyboardInterrupt as a normal Exception. 727 # Should never be necessary here 728 # EXTRA pipeline_run DEBUGGING 729 if EXTRA_PIPELINERUN_DEBUGGING: 730 sys.stderr.write( 731 "E" * 36 + "[[ KeyboardInterrupt ]]" + "E" * 21 + "\n") 732 death_event.set() 733 raise Ruffus_Keyboard_Interrupt_Exception("KeyboardInterrupt") 734 except: 735 # EXTRA pipeline_run DEBUGGING 736 if EXTRA_PIPELINERUN_DEBUGGING: 737 sys.stderr.write( 738 "E" * 36 + "[[ Other Interrupt ]]" + "E" * 23 + "\n") 739 # Wrap up one or more exceptions rethrown across process boundaries 740 # 741 # See multiprocessor.Server.handle_request/serve_client for an 742 # analogous function 743 exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() 744 exception_stack = traceback.format_exc() 745 exception_name = exceptionType.__module__ + '.' + exceptionType.__name__ 746 exception_value = str(exceptionValue) 747 if len(exception_value): 748 exception_value = "(%s)" % exception_value 749 750 if exceptionType == Ruffus_Keyboard_Interrupt_Exception: 751 death_event.set() 752 job_state = JOB_SIGNALLED_BREAK 753 elif exceptionType == ruffus_exceptions.JobSignalledBreak: 754 job_state = JOB_SIGNALLED_BREAK 755 else: 756 job_state = JOB_ERROR 757 return t_job_result(task_name, node_index, job_state, job_name, None, 758 [task_name, 759 job_name, 760 exception_name, 761 exception_value, 762 exception_stack], params, unglobbed_params) 763 764 765def subprocess_checkcall_wrapper(**named_args): 766 """ 767 Splits string at semicolons and runs with subprocess.check_call 768 """ 769 for cmd in named_args["command_str"].split(";"): 770 cmd = cmd.replace("\n", " ").strip() 771 if not len(cmd): 772 continue 773 cmd = cmd.format(**named_args) 774 subprocess.check_call(cmd, shell=True) 775 776 777def exec_string_as_task_func(input_args, output_args, **named_args): 778 """ 779 Ruffus provided function for tasks which are just strings 780 (no Python function provided) 781 The task executor function is given as a paramter which is 782 then called with the arguments. 783 Convoluted but avoids special casing too much 784 """ 785 if not "__RUFFUS_TASK_CALLBACK__" in named_args or \ 786 not callable(named_args["__RUFFUS_TASK_CALLBACK__"]): 787 raise Exception("Missing call back function") 788 if not "command_str" in named_args or \ 789 not isinstance(named_args["command_str"], (path_str_type,)): 790 raise Exception("Missing call back function string") 791 792 callback = named_args["__RUFFUS_TASK_CALLBACK__"] 793 del named_args["__RUFFUS_TASK_CALLBACK__"] 794 795 named_args["input"] = input_args 796 named_args["output"] = output_args 797 callback(**named_args) 798 799 800# todo 801def register_cleanup(file_name, operation): 802 pass 803 804 805# pipeline functions only have "name" as a named parameter 806def get_name_from_args(named_args): 807 if "name" in named_args: 808 name = named_args["name"] 809 del named_args["name"] 810 return name 811 else: 812 return None 813 814 815class Pipeline(dict): 816 """ 817 818 Each Ruffus Pipeline object has to have a unique name. "main" is 819 reserved for "main_pipeline", the default pipeline for all Ruffus 820 decorators. 821 """ 822 823 # dictionary of all pipelines 824 pipelines = dict() 825 cnt_mkdir = 0 826 827 def __init__(self, name, *arg, **kw): 828 # initialise dict 829 super(Pipeline, self).__init__(*arg, **kw) 830 831 # set of tasks 832 self.tasks = set() 833 self.task_names = set() 834 835 # add self to list of all pipelines 836 self.name = name 837 self.original_name = name 838 if name in Pipeline.pipelines: 839 raise Exception("Error:\nDuplicate pipeline. " 840 "A pipeline named '%s' already exists.\n" % name) 841 Pipeline.pipelines[name] = self 842 self.head_tasks = [] 843 self.tail_tasks = [] 844 self.lookup = dict() 845 846 self.command_str_callback = subprocess_checkcall_wrapper 847 self.job_state = "active" 848 849 @classmethod 850 def clear_all(cls): 851 """clear all pipelines. 852 """ 853 cls.pipelines = dict() 854 855 def _create_task(self, task_func, task_name=None): 856 """ 857 Create task with a function 858 """ 859 860 # 861 # If string, this is a command to be executed later 862 # Derive task name from command 863 # 864 # 865 if isinstance(task_func, (path_str_type,)): 866 task_str = task_func 867 task_func = exec_string_as_task_func 868 if not task_name: 869 elements = task_str.split() 870 use_n_elements = 1 871 while use_n_elements < len(elements): 872 task_name = " ".join(elements[0:use_n_elements]) 873 if task_name not in self.task_names: 874 break 875 else: 876 raise ruffus_exceptions.error_duplicate_task_name("The task string '%s' is ambiguous for " 877 "Pipeline '%s'. You must disambiguate " 878 "explicitly with different task names " 879 % (task_str, self.name)) 880 return Task(task_func, task_name, self) 881 882 # 883 # Derive task name from Python Task function name 884 # 885 if not task_name: 886 if task_func.__module__ == "__main__": 887 task_name = task_func.__name__ 888 else: 889 task_name = str(task_func.__module__) + \ 890 "." + task_func.__name__ 891 892 if task_name not in self: 893 return Task(task_func, task_name, self) 894 895 # task_name already there as the identifying task_name. 896 # If the task_func also matches everything is fine 897 elif (task_name in self.task_names and 898 self[task_name].user_defined_work_func == task_func): 899 return self[task_name] 900 901 # If the task name is already taken but with a different function, 902 # this will blow up 903 # But if the function is being reused and with a previously different 904 # task name then OK 905 else: 906 return Task(task_func, task_name, self) 907 908 def _complete_task_setup(self, processed_tasks): 909 """ 910 Finishes initialising all tasks 911 Make sure all tasks in dependency list are linked to real functions 912 """ 913 914 processed_pipelines = set([self.name]) 915 unprocessed_tasks = deque(self.tasks) 916 while len(unprocessed_tasks): 917 task = unprocessed_tasks.popleft() 918 if task in processed_tasks: 919 continue 920 processed_tasks.add(task) 921 for ancestral_task in task._complete_setup(): 922 if ancestral_task not in processed_tasks: 923 unprocessed_tasks.append(ancestral_task) 924 processed_pipelines.add(ancestral_task.pipeline.name) 925 # 926 # some jobs single state status mirrors parent's state 927 # and parent task not known until dependencies resolved 928 # Is this legacy code? 929 # Breaks @merge otherwise 930 # 931 if isinstance(task._is_single_job_single_output, Task): 932 task._is_single_job_single_output = \ 933 task._is_single_job_single_output._is_single_job_single_output 934 935 for pipeline_name in list(processed_pipelines): 936 if pipeline_name != self.name: 937 processed_pipelines |= self.pipelines[pipeline_name]._complete_task_setup( 938 processed_tasks) 939 940 return processed_pipelines 941 942 def set_command_str_callback(self, command_str_callback): 943 if not callable(command_str_callback): 944 raise Exception( 945 "set_command_str_callback() takes a python function or a callable object.") 946 self.command_str_callback = command_str_callback 947 948 def get_head_tasks(self): 949 """ 950 Return tasks at the head of the pipeline, 951 i.e. with only descendants/dependants 952 N.B. Head and Tail sets can overlap 953 954 Most of the time when self.head_tasks == [], it has been left undefined by mistake. 955 So we usually throw an exception at the point of use 956 """ 957 return self.head_tasks 958 959 def set_head_tasks(self, head_tasks): 960 """ 961 Specifies tasks at the head of the pipeline, 962 i.e. with only descendants/dependants 963 """ 964 if not isinstance(head_tasks, (list,)): 965 raise Exception("Pipelines['{pipeline_name}'].set_head_tasks() expects a " 966 "list not {head_tasks_type}".format(pipeline_name=self.name, 967 head_tasks_type=type(head_tasks))) 968 969 for tt in head_tasks: 970 if not isinstance(tt, (Task,)): 971 raise Exception("Pipelines['{pipeline_name}'].set_head_tasks() expects a " 972 "list of tasks not {task_type} {task}".format(pipeline_name=self.name, 973 task_type=type( 974 tt), 975 task=1)) 976 self.head_tasks = head_tasks 977 978 def get_tail_tasks(self): 979 """ 980 Return tasks at the tail of the pipeline, 981 i.e. without descendants/dependants 982 N.B. Head and Tail sets can overlap 983 984 Most of the time when self.tail_tasks == [], 985 it has been left undefined by mistake. 986 So we usually throw an exception at the point of use 987 """ 988 return self.tail_tasks 989 990 def set_tail_tasks(self, tail_tasks): 991 """ 992 Specifies tasks at the tail of the pipeline, 993 i.e. with only descendants/dependants 994 """ 995 self.tail_tasks = tail_tasks 996 997 def set_input(self, **args): 998 """ 999 Change the input parameter(s) of the designated "head" tasks of the pipeline 1000 """ 1001 if not len(self.get_head_tasks()): 1002 raise ruffus_exceptions.error_no_head_tasks("Pipeline '{pipeline_name}' has no head tasks defined.\n" 1003 "Which task in '{pipeline_name}' do you want " 1004 "to set_input() for?".format(pipeline_name=self.name)) 1005 1006 for tt in self.get_head_tasks(): 1007 tt.set_input(**args) 1008 1009 def set_output(self, **args): 1010 """ 1011 Change the output parameter(s) of the designated "head" tasks of the pipeline 1012 """ 1013 if not len(self.get_head_tasks()): 1014 raise ruffus_exceptions.error_no_head_tasks("Pipeline '{pipeline_name}' has no head tasks defined.\n" 1015 "Which task in '{pipeline_name}' do you want " 1016 "to set_output() for?".format(pipeline_name=self.name)) 1017 1018 for tt in self.get_head_tasks(): 1019 tt.set_output(**args) 1020 1021 def suspend_jobs(self): 1022 self.job_state = "suspended" 1023 1024 def resume_jobs(self): 1025 self.job_state = "active" 1026 1027 def is_job_suspended(self): 1028 return self.job_state == "suspended" 1029 1030 def clone(self, new_name, *arg, **kw): 1031 """ 1032 Make a deep copy of the pipeline 1033 """ 1034 1035 # setup new pipeline 1036 new_pipeline = Pipeline(new_name, *arg, **kw) 1037 1038 # set of tasks 1039 new_pipeline.tasks = set(task._clone(new_pipeline) 1040 for task in self.tasks) 1041 new_pipeline.task_names = set(self.task_names) 1042 1043 # so keep original name after a series of cloning operations 1044 new_pipeline.original_name = self.original_name 1045 1046 # lookup tasks in new pipeline 1047 new_pipeline.head_tasks = [new_pipeline[t._name] 1048 for t in self.head_tasks] 1049 new_pipeline.tail_tasks = [new_pipeline[t._name] 1050 for t in self.tail_tasks] 1051 1052 # do not copy a suspended state, but always set to active 1053 new_pipeline.state = "active" 1054 return new_pipeline 1055 1056 def mkdir(self, *unnamed_args, **named_args): 1057 """ 1058 Makes directories each incoming input to a corresponding output 1059 This is a One to One operation 1060 """ 1061 name = get_name_from_args(named_args) 1062 # func is a placeholder... 1063 if name is None: 1064 self.cnt_mkdir += 1 1065 if self.cnt_mkdir == 1: 1066 name = "mkdir" 1067 else: 1068 name = "mkdir # %d" % self.cnt_mkdir 1069 task = self._create_task(task_func=job_wrapper_mkdir, task_name=name) 1070 task.created_via_decorator = False 1071 task.syntax = "pipeline.mkdir" 1072 task.description_with_args_placeholder = "%s(name = %r, %%s)" % ( 1073 task.syntax, task._get_display_name()) 1074 task._prepare_mkdir(unnamed_args, named_args, 1075 task.description_with_args_placeholder) 1076 return task 1077 1078 def _do_create_task_by_OOP(self, task_func, named_args, syntax): 1079 """ 1080 Helper function for 1081 Pipeline.transform 1082 Pipeline.originate 1083 pipeline.split 1084 pipeline.subdivide 1085 pipeline.parallel 1086 pipeline.files 1087 pipeline.combinations_with_replacement 1088 pipeline.combinations 1089 pipeline.permutations 1090 pipeline.product 1091 pipeline.collate 1092 pipeline.merge 1093 """ 1094 name = get_name_from_args(named_args) 1095 1096 # if task_func is a string, will 1097 # 1) set self.task_func = exec_string_as_task_func 1098 # 2) set self.name if necessary to the first unambigous words of the the command_str 1099 # 2) set self.func_description to the command_str 1100 task = self._create_task(task_func, name) 1101 1102 task.created_via_decorator = False 1103 task.syntax = syntax 1104 if isinstance(task_func, (path_str_type,)): 1105 task_func_name = task._name 1106 else: 1107 task_func_name = task_func.__name__ 1108 1109 task.description_with_args_placeholder = "{syntax}(name = {task_display_name!r}, task_func = {task_func_name}, %s)" \ 1110 .format(syntax=syntax, 1111 task_display_name=task._get_display_name(), 1112 task_func_name=task_func_name,) 1113 1114 if isinstance(task_func, (path_str_type,)): 1115 # 1116 # Make sure extras is dict 1117 # 1118 if "extras" in named_args: 1119 if not isinstance(named_args["extras"], dict): 1120 raise ruffus_exceptions.error_executable_str((task.description_with_args_placeholder % "...") + 1121 "\n requires a dictionary for named parameters. " + 1122 "For example:\n" + 1123 task.description_with_args_placeholder % 1124 "extras = {my_param = 45, her_param = 'whatever'}") 1125 else: 1126 named_args["extras"] = dict() 1127 named_args["extras"]["command_str"] = task_func 1128 # named_args["extras"]["__RUFFUS_TASK_CALLBACK__"] = pipeline.command_str_callback 1129 1130 return task 1131 1132 def lookup_task_from_name(self, task_name, default_module_name): 1133 """ 1134 If lookup returns None, means ambiguous: do nothing 1135 Only ever returns a list of one 1136 """ 1137 multiple_tasks = [] 1138 1139 # Does the unqualified name uniquely identify? 1140 if task_name in self.lookup: 1141 if len(self.lookup[task_name]) == 1: 1142 return self.lookup[task_name] 1143 else: 1144 multiple_tasks = self.lookup[task_name] 1145 1146 # Even if the unqualified name does not uniquely identify, 1147 # maybe the qualified name does 1148 full_qualified_name = default_module_name + "." + task_name 1149 if full_qualified_name in self.lookup: 1150 if len(self.lookup[full_qualified_name]) == 1: 1151 return self.lookup[full_qualified_name] 1152 else: 1153 multiple_tasks = self.lookup[task_name] 1154 1155 # Nothing matched 1156 if not multiple_tasks: 1157 return [] 1158 1159 # If either the qualified or unqualified name is ambiguous, throw... 1160 task_names = ",".join(t._name for t in multiple_tasks) 1161 raise ruffus_exceptions.error_ambiguous_task("%s is ambiguous. Which do you mean? (%s)." 1162 % (task_name, task_names)) 1163 1164 def follows(self, task_func, *unnamed_args, **named_args): 1165 """ 1166 Transforms each incoming input to a corresponding output 1167 This is a One to One operation 1168 """ 1169 task = self._do_create_task_by_OOP( 1170 task_func, named_args, "pipeline.follows") 1171 task.deferred_follow_params.append([task.description_with_args_placeholder, False, 1172 unnamed_args]) 1173 # task._connect_parents(task.description_with_args_placeholder, False, 1174 # unnamed_args) 1175 return task 1176 1177 def check_if_uptodate(self, task_func, func, **named_args): 1178 """ 1179 Specifies how a task is to be checked if it needs to be rerun (i.e. is 1180 up-to-date). 1181 func returns true if input / output files are up to date 1182 func takes as many arguments as the task function 1183 """ 1184 task = self._do_create_task_by_OOP( 1185 task_func, named_args, "check_if_uptodate") 1186 return task.check_if_uptodate(func) 1187 1188 def graphviz(self, task_func, *unnamed_args, **named_args): 1189 """ 1190 Transforms each incoming input to a corresponding output 1191 This is a One to One operation 1192 """ 1193 task = self._do_create_task_by_OOP( 1194 task_func, named_args, "pipeline.graphviz") 1195 task.graphviz_attributes = named_args 1196 if len(unnamed_args): 1197 raise TypeError("Only named arguments expected in :" + 1198 task.description_with_args_placeholder % unnamed_args) 1199 return task 1200 1201 def transform(self, task_func, *unnamed_args, **named_args): 1202 """ 1203 Transforms each incoming input to a corresponding output 1204 This is a One to One operation 1205 """ 1206 task = self._do_create_task_by_OOP( 1207 task_func, named_args, "pipeline.transform") 1208 task._prepare_transform(unnamed_args, named_args) 1209 return task 1210 1211 def originate(self, task_func, *unnamed_args, **named_args): 1212 """ 1213 Originates a new set of output files, 1214 one output per call to the task function 1215 """ 1216 task = self._do_create_task_by_OOP( 1217 task_func, named_args, "pipeline.originate") 1218 task._prepare_originate(unnamed_args, named_args) 1219 return task 1220 1221 def split(self, task_func, *unnamed_args, **named_args): 1222 """ 1223 Splits a single set of input files into multiple output file names, 1224 where the number of output files may not be known beforehand. 1225 This is a One to Many operation 1226 """ 1227 task = self._do_create_task_by_OOP( 1228 task_func, named_args, "pipeline.split") 1229 task._prepare_split(unnamed_args, named_args) 1230 return task 1231 1232 def subdivide(self, task_func, *unnamed_args, **named_args): 1233 """ 1234 Subdivides a each set of input files into multiple output file names, 1235 where the number of output files may not be known beforehand. 1236 This is a Many to Even More operation 1237 """ 1238 task = self._do_create_task_by_OOP( 1239 task_func, named_args, "pipeline.subdivide") 1240 task._prepare_subdivide(unnamed_args, named_args) 1241 return task 1242 1243 def merge(self, task_func, *unnamed_args, **named_args): 1244 """ 1245 Merges multiple input files into a single output. 1246 This is a Many to One operation 1247 """ 1248 task = self._do_create_task_by_OOP( 1249 task_func, named_args, "pipeline.merge") 1250 task._prepare_merge(unnamed_args, named_args) 1251 return task 1252 1253 def collate(self, task_func, *unnamed_args, **named_args): 1254 """ 1255 Collates each set of multiple matching input files into an output. 1256 This is a Many to Fewer operation 1257 """ 1258 task = self._do_create_task_by_OOP( 1259 task_func, named_args, "pipeline.collate") 1260 task._prepare_collate(unnamed_args, named_args) 1261 return task 1262 1263 def product(self, task_func, *unnamed_args, **named_args): 1264 """ 1265 All-vs-all Product between items from each set of inputs 1266 """ 1267 task = self._do_create_task_by_OOP( 1268 task_func, named_args, "pipeline.product") 1269 task._prepare_product(unnamed_args, named_args) 1270 return task 1271 1272 def permutations(self, task_func, *unnamed_args, **named_args): 1273 """ 1274 Permutations between items from a set of inputs 1275 * k-length tuples 1276 * all possible orderings 1277 * no self vs self 1278 """ 1279 task = self._do_create_task_by_OOP( 1280 task_func, named_args, "pipeline.permutations") 1281 task._prepare_combinatorics( 1282 unnamed_args, named_args, ruffus_exceptions.error_task_permutations) 1283 return task 1284 1285 def combinations(self, task_func, *unnamed_args, **named_args): 1286 """ 1287 Combinations of items from a set of inputs 1288 * k-length tuples 1289 * Single (sorted) ordering, i.e. AB is the same as BA, 1290 * No repeats. No AA, BB 1291 For Example: 1292 combinations("ABCD", 3) = ['ABC', 'ABD', 'ACD', 'BCD'] 1293 combinations("ABCD", 2) = ['AB', 'AC', 'AD', 'BC', 'BD', 'CD'] 1294 """ 1295 task = self._do_create_task_by_OOP( 1296 task_func, named_args, "pipeline.combinations") 1297 task._prepare_combinatorics( 1298 unnamed_args, named_args, ruffus_exceptions.error_task_combinations) 1299 return task 1300 1301 def combinations_with_replacement(self, task_func, *unnamed_args, 1302 **named_args): 1303 """ 1304 Combinations with replacement of items from a set of inputs 1305 * k-length tuples 1306 * Single (sorted) ordering, i.e. AB is the same as BA, 1307 * Repeats. AA, BB, AAC etc. 1308 For Example: 1309 combinations_with_replacement("ABCD", 2) = [ 1310 'AA', 'AB', 'AC', 'AD', 1311 'BB', 'BC', 'BD', 1312 'CC', 'CD', 1313 'DD'] 1314 combinations_with_replacement("ABCD", 3) = [ 1315 'AAA', 'AAB', 'AAC', 'AAD', 1316 'ABB', 'ABC', 'ABD', 1317 'ACC', 'ACD', 1318 'ADD', 1319 'BBB', 'BBC', 'BBD', 1320 'BCC', 'BCD', 1321 'BDD', 1322 'CCC', 'CCD', 1323 'CDD', 1324 'DDD'] 1325 """ 1326 task = self._do_create_task_by_OOP( 1327 task_func, named_args, "combinations_with_replacement") 1328 task._prepare_combinatorics(unnamed_args, named_args, 1329 ruffus_exceptions.error_task_combinations_with_replacement) 1330 return task 1331 1332 def files(self, task_func, *unnamed_args, **named_args): 1333 """ 1334 calls user function in parallel 1335 with either each of a list of parameters 1336 or using parameters generated by a custom function 1337 1338 In the parameter list, 1339 The first two items of each set of parameters must 1340 be input/output files or lists of files or Null 1341 """ 1342 task = self._do_create_task_by_OOP( 1343 task_func, named_args, "pipeline.files") 1344 task._prepare_files(unnamed_args, named_args) 1345 return task 1346 1347 def parallel(self, task_func, *unnamed_args, **named_args): 1348 """ 1349 calls user function in parallel 1350 with either each of a list of parameters 1351 or using parameters generated by a custom function 1352 """ 1353 task = self._do_create_task_by_OOP( 1354 task_func, named_args, "pipeline.parallel") 1355 task._prepare_parallel(unnamed_args, named_args) 1356 return task 1357 1358 # Forwarding functions. Should bring procedural function here and 1359 # forward from the other direction? 1360 def run(self, *unnamed_args, **named_args): 1361 if "pipeline" not in named_args: 1362 named_args["pipeline"] = self 1363 pipeline_run(*unnamed_args, **named_args) 1364 1365 def printout(self, *unnamed_args, **named_args): 1366 if "pipeline" not in named_args: 1367 named_args["pipeline"] = self 1368 pipeline_printout(*unnamed_args, **named_args) 1369 1370 def get_task_names(self, *unnamed_args, **named_args): 1371 if "pipeline" not in named_args: 1372 named_args["pipeline"] = self 1373 pipeline_get_task_names(*unnamed_args, **named_args) 1374 1375 def printout_graph(self, *unnamed_args, **named_args): 1376 if "pipeline" not in named_args: 1377 named_args["pipeline"] = self 1378 pipeline_printout_graph(*unnamed_args, **named_args) 1379 1380 1381# Global default shared pipeline (used for decorators) 1382main_pipeline = Pipeline(name="main") 1383 1384 1385def lookup_unique_task_from_func(task_func, default_pipeline_name="main"): 1386 """ 1387 Go through all pipelines and match task_func to find a unique task 1388 Throw exception if ambiguous 1389 """ 1390 1391 def unique_task_from_func_in_pipeline(task_func, pipeline): 1392 if task_func in pipeline.lookup: 1393 if len(pipeline.lookup[task_func]) == 1: 1394 # Found task! 1395 return pipeline.lookup[task_func][0] 1396 1397 # Found too many tasks! Ambiguous... 1398 task_names = ", ".join( 1399 task._name for task in pipeline.lookup[task_func]) 1400 raise ruffus_exceptions.error_ambiguous_task( 1401 "Function def %s(...): is used by multiple tasks (%s). Which one do you mean?." 1402 % (task_func.__name__, task_names)) 1403 return None 1404 1405 # 1406 # Iterate through all pipelines starting with the specified pipeline 1407 # 1408 task = unique_task_from_func_in_pipeline( 1409 task_func, Pipeline.pipelines[default_pipeline_name]) 1410 if task: 1411 return task 1412 1413 # 1414 # Sees if function uniquely identifies a single task across pipelines 1415 # 1416 found_tasks = [] 1417 found_pipelines = [] 1418 for pipeline in Pipeline.pipelines.values(): 1419 task = unique_task_from_func_in_pipeline(task_func, pipeline) 1420 if task: 1421 found_tasks.append(task) 1422 found_pipelines.append(pipeline) 1423 1424 if len(found_tasks) == 1: 1425 return found_tasks[0] 1426 1427 if len(found_tasks) > 1: 1428 raise ruffus_exceptions.error_ambiguous_task("Task Name %s is ambiguous and specifies different tasks " 1429 "across multiple pipelines (%s)." 1430 % (task_func.__name__, ",".join(found_pipelines))) 1431 1432 return None 1433 1434 1435def lookup_tasks_from_name(task_name, default_pipeline_name, default_module_name="__main__", 1436 pipeline_names_as_alias_to_all_tasks=False): 1437 """ 1438 1439 Tries: 1440 (1) Named pipeline in the format pipeline::task_name 1441 (2) tasks matching task_name in default_pipeline_name 1442 (3) pipeline names matching task_name 1443 (4) if task_name uniquely identifies any task in all other pipelines... 1444 1445 Only returns multiple tasks if (3) task_name is the name of a pipeline 1446 """ 1447 1448 # Lookup the task from the function or task name 1449 pipeline_name, task_name = re.match("(?:(.+)::)?(.*)", task_name).groups() 1450 1451 # 1452 # (1) Look in specified pipeline 1453 # Will blow up if task_name is ambiguous 1454 # 1455 if pipeline_name: 1456 if pipeline_name not in Pipeline.pipelines: 1457 raise ruffus_exceptions.error_not_a_pipeline("%s is not a pipeline." % pipeline_name) 1458 pipeline = Pipeline.pipelines[pipeline_name] 1459 return pipeline.lookup_task_from_name(task_name, default_module_name) 1460 1461 # 1462 # (2) Try default pipeline 1463 # Will blow up if task_name is ambiguous 1464 # 1465 if default_pipeline_name not in Pipeline.pipelines: 1466 raise ruffus_exceptions.error_not_a_pipeline( 1467 "%s is not a pipeline." % default_pipeline_name) 1468 pipeline = Pipeline.pipelines[default_pipeline_name] 1469 tasks = pipeline.lookup_task_from_name(task_name, default_module_name) 1470 if tasks: 1471 return tasks 1472 1473 # (3) task_name is actually the name of a pipeline 1474 # Alias for pipeline.get_tail_tasks() 1475 # N.B. This is the *only* time multiple tasks might be returned 1476 # 1477 if task_name in Pipeline.pipelines: 1478 if pipeline_names_as_alias_to_all_tasks: 1479 return Pipeline.pipelines[task_name].tasks 1480 elif len(Pipeline.pipelines[task_name].get_tail_tasks()): 1481 return Pipeline.pipelines[task_name].get_tail_tasks() 1482 else: 1483 raise ruffus_exceptions.error_no_tail_tasks( 1484 "Pipeline %s has no tail tasks defined. Which task do you " 1485 "mean when you specify the whole pipeline as a dependency?" % task_name) 1486 1487 # 1488 # (4) Try all other pipelines 1489 # Will blow up if task_name is ambiguous 1490 # 1491 found_tasks = [] 1492 found_pipelines = [] 1493 for pipeline_name, pipeline in Pipeline.pipelines.items(): 1494 tasks = pipeline.lookup_task_from_name(task_name, default_module_name) 1495 if tasks: 1496 found_tasks.append(tasks) 1497 found_pipelines.append(pipeline_name) 1498 1499 # unambiguous: good 1500 if len(found_tasks) == 1: 1501 return found_tasks[0] 1502 1503 # ambiguous: bad 1504 if len(found_tasks) > 1: 1505 raise ruffus_exceptions.error_ambiguous_task( 1506 "Task Name %s is ambiguous and specifies different tasks across " 1507 "several pipelines (%s)." % (task_name, ",".join(found_pipelines))) 1508 1509 # Nothing found 1510 return [] 1511 1512 1513def lookup_tasks_from_user_specified_names(task_description, task_names, 1514 default_pipeline_name="main", 1515 default_module_name="__main__", 1516 pipeline_names_as_alias_to_all_tasks=False): 1517 """ 1518 Given a list of task names, look up the corresponding tasks 1519 Will just pass through if the task_name is already a task 1520 """ 1521 1522 # 1523 # In case we are given a single item instead of a list 1524 # 1525 if not isinstance(task_names, (list, tuple)): 1526 task_names = [task_names] 1527 1528 task_list = [] 1529 1530 for task_name in task_names: 1531 1532 # "task_name" is a Task or pipeline, add those 1533 if isinstance(task_name, Task): 1534 task_list.append(task_name) 1535 continue 1536 1537 elif isinstance(task_name, Pipeline): 1538 if pipeline_names_as_alias_to_all_tasks: 1539 task_list.extend(task_name.tasks) 1540 continue 1541 # use tail tasks 1542 elif len(task_name.get_tail_tasks()): 1543 task_list.extend(task_name.get_tail_tasks()) 1544 continue 1545 # no tail task 1546 else: 1547 raise ruffus_exceptions.error_no_tail_tasks("Pipeline %s has no 'tail tasks'. Which task do you mean" 1548 " when you specify the whole pipeline?" % task_name.name) 1549 1550 if isinstance(task_name, Callable): 1551 # blows up if ambiguous 1552 task = lookup_unique_task_from_func( 1553 task_name, default_pipeline_name) 1554 # blow up for unwrapped function 1555 if not task: 1556 raise ruffus_exceptions.error_function_is_not_a_task( 1557 ("Function def %s(...): is not a Ruffus task." % task_func.__name__) + 1558 " The function needs to have a ruffus decoration like " 1559 "'@transform', or be a member of a ruffus.Pipeline().") 1560 1561 task_list.append(task) 1562 continue 1563 1564 # some kind of string: task or func or pipeline name? 1565 if isinstance(task_name, path_str_type): 1566 1567 # Will throw Exception if ambiguous 1568 tasks = lookup_tasks_from_name( 1569 task_name, default_pipeline_name, default_module_name, 1570 pipeline_names_as_alias_to_all_tasks) 1571 # not found 1572 if not tasks: 1573 raise ruffus_exceptions.error_node_not_task("%s task '%s' is not a pipelined task in Ruffus. Is it " 1574 "spelt correctly ?" % (task_description, task_name)) 1575 task_list.extend(tasks) 1576 continue 1577 1578 else: 1579 raise TypeError( 1580 "Expecting a string or function, or a Ruffus Task or Pipeline object") 1581 return task_list 1582 1583 1584class Task(node): 1585 1586 """ 1587 1588 * Represents each stage of a pipeline. 1589 * Associated with a single python function. 1590 * Identified uniquely within the pipeline by its name. 1591 1592 """ 1593 1594 # DEBUGGG 1595 # def __str__ (self): 1596 # return "Task = <%s>" % self._get_display_name() 1597 1598 _action_names = ["unspecified", 1599 "task", 1600 "task_files_re", 1601 "task_split", 1602 "task_merge", 1603 "task_transform", 1604 "task_collate", 1605 "task_files_func", 1606 "task_files", 1607 "task_mkdir", 1608 "task_parallel", 1609 "task_active_if", 1610 "task_product", 1611 "task_permutations", 1612 "task_combinations", 1613 "task_combinations_with_replacement", 1614 "task_subdivide", 1615 "task_originate", 1616 "task_graphviz", 1617 ] 1618 # ENUMS 1619 (_action_unspecified, 1620 _action_task, 1621 _action_task_files_re, 1622 _action_task_split, 1623 _action_task_merge, 1624 _action_task_transform, 1625 _action_task_collate, 1626 _action_task_files_func, 1627 _action_task_files, 1628 _action_mkdir, 1629 _action_task_parallel, 1630 _action_active_if, 1631 _action_task_product, 1632 _action_task_permutations, 1633 _action_task_combinations, 1634 _action_task_combinations_with_replacement, 1635 _action_task_subdivide, 1636 _action_task_originate, 1637 _action_task_graphviz) = range(19) 1638 1639 (_multiple_jobs_outputs, 1640 _single_job_single_output, 1641 _job_single_matches_parent) = range(3) 1642 1643 _job_limit_semaphores = {} 1644 1645 # _________________________________________________________________________ 1646 1647 # _get_action_name 1648 1649 # _________________________________________________________________________ 1650 def _get_action_name(self): 1651 return Task._action_names[self._action_type] 1652 1653 # _________________________________________________________________________ 1654 1655 # __init__ 1656 1657 # _________________________________________________________________________ 1658 def __init__(self, func, task_name, pipeline=None, command_str=None): 1659 """ 1660 * Creates a Task object with a specified python function and task name 1661 * The type of the Task (whether it is a transform or merge or collate 1662 etc. operation) is specified subsequently. This is because Ruffus 1663 decorators do not have to be specified in order, and we don't 1664 know ahead of time. 1665 """ 1666 if pipeline is None: 1667 pipeline = main_pipeline 1668 self.pipeline = pipeline 1669 # no function: just string 1670 if command_str is not None: 1671 self.func_module_name = "" 1672 self.func_name = "" 1673 self.func_description = command_str 1674 else: 1675 self.func_module_name = str(func.__module__) 1676 self.func_name = func.__name__ 1677 # convert description into one line 1678 self.func_description = re.sub( 1679 r"\n\s+", " ", func.__doc__).strip() if func.__doc__ else "" 1680 1681 if not task_name: 1682 task_name = self.func_module_name + "." + self.func_name 1683 1684 node.__init__(self, task_name) 1685 self._action_type = Task._action_task 1686 self._action_type_desc = Task._action_names[self._action_type] 1687 1688 # Each task has its own checksum level 1689 # At the moment this is really so multiple pipelines in the same 1690 # script can have different checksum levels 1691 # Though set by pipeline_xxxx functions, have initial valid value so 1692 # unit tests work :-| 1693 self.checksum_level = CHECKSUM_FILE_TIMESTAMPS 1694 self.param_generator_func = None 1695 self.needs_update_func = None 1696 self.job_wrapper = job_wrapper_generic 1697 1698 # 1699 self.job_descriptor = generic_job_descriptor 1700 1701 # jobs which produce a single output. 1702 # special handling for task.get_output_files for dependency chaining 1703 self._is_single_job_single_output = self._multiple_jobs_outputs 1704 self.single_multi_io = self._many_to_many 1705 1706 # function which is decorated and does the actual work 1707 self.user_defined_work_func = func 1708 1709 # functions which will be called when task completes 1710 self.posttask_functions = [] 1711 1712 # give makedir automatically made parent tasks unique names 1713 self.cnt_task_mkdir = 0 1714 1715 # whether only task function itself knows what output it will produce 1716 # i.e. output is a glob or something similar 1717 self.indeterminate_output = 0 1718 1719 # cache output file names here 1720 self.output_filenames = None 1721 1722 # semaphore name must be unique 1723 self.semaphore_name = pipeline.name + ":" + task_name 1724 1725 # do not test for whether task is active 1726 self.active_if_checks = None 1727 1728 # extra flag for outputfiles 1729 self.is_active = True 1730 1731 # Created via decorator or OO interface 1732 # so that display_name looks more natural 1733 self.created_via_decorator = False 1734 1735 # Finish setting up task 1736 self._setup_task_func = Task._do_nothing_setup 1737 1738 # Finish setting up task 1739 self.deferred_follow_params = [] 1740 1741 # Finish setting up task 1742 self.parsed_args = {} 1743 self.error_type = None 1744 1745 # @split or pipeline.split etc. 1746 self.syntax = "" 1747 1748 self.description_with_args_placeholder = "%s" 1749 1750 # whether task has a (re-specifiable) input parameter 1751 self.has_input_param = True 1752 self.has_pipeline_in_input_param = False 1753 1754 # add to pipeline's lookup 1755 # this code is here rather than the pipeline so that current unittests 1756 # do not need to bother about pipeline 1757 if task_name in self.pipeline.task_names: 1758 raise ruffus_exceptions.error_duplicate_task_name("Same task name %s specified multiple times in the " 1759 "same pipeline (%s)" % (task_name, self.pipeline.name)) 1760 1761 self.pipeline.tasks.add(self) 1762 1763 # task_name is always a unique lookup and overrides everything else 1764 self.pipeline[task_name] = self 1765 self.pipeline.lookup[task_name] = [self] 1766 self.pipeline.task_names.add(task_name) 1767 1768 self.command_str_callback = "PIPELINE" 1769 1770 # 1771 # Allow pipeline to lookup task by 1772 # 1) Func 1773 # 2) task name 1774 # 3) func name 1775 # 1776 # Ambiguous func names returns an empty list [] 1777 # 1778 1779 for lookup in (func, self.func_name, self.func_module_name + "." + self.func_name): 1780 # don't add to lookup if this conflicts with a task_name which is 1781 # always unique and overriding 1782 if lookup == ".": 1783 continue 1784 if lookup not in self.pipeline.task_names: 1785 # non-unique map 1786 if lookup in self.pipeline.lookup: 1787 self.pipeline.lookup[lookup].append(self) 1788 # remove non-uniques from Pipeline 1789 if lookup in self.pipeline: 1790 del self.pipeline[lookup] 1791 else: 1792 self.pipeline.lookup[lookup] = [self] 1793 self.pipeline[lookup] = self 1794 1795 # _________________________________________________________________________ 1796 1797 # _clone 1798 1799 # _________________________________________________________________________ 1800 def _clone(self, new_pipeline): 1801 """ 1802 * Clones a Task object from self 1803 """ 1804 new_task = Task(self.user_defined_work_func, self._name, new_pipeline) 1805 new_task.command_str_callback = self.command_str_callback 1806 new_task._action_type = self._action_type 1807 new_task._action_type_desc = self._action_type_desc 1808 new_task.checksum_level = self.checksum_level 1809 new_task.param_generator_func = self.param_generator_func 1810 new_task.needs_update_func = self.needs_update_func 1811 new_task.job_wrapper = self.job_wrapper 1812 new_task.job_descriptor = self.job_descriptor 1813 new_task._is_single_job_single_output = self._is_single_job_single_output 1814 new_task.single_multi_io = self.single_multi_io 1815 new_task.posttask_functions = copy.deepcopy(self.posttask_functions) 1816 new_task.cnt_task_mkdir = self.cnt_task_mkdir 1817 new_task.indeterminate_output = self.indeterminate_output 1818 new_task.semaphore_name = self.semaphore_name 1819 new_task.is_active = self.is_active 1820 new_task.created_via_decorator = self.created_via_decorator 1821 new_task._setup_task_func = self._setup_task_func 1822 new_task.error_type = self.error_type 1823 new_task.syntax = self.syntax 1824 new_task.description_with_args_placeholder = \ 1825 self.description_with_args_placeholder.replace( 1826 self.pipeline.name, new_pipeline.name) 1827 new_task.has_input_param = self.has_input_param 1828 new_task.has_pipeline_in_input_param = self.has_pipeline_in_input_param 1829 new_task.output_filenames = copy.deepcopy(self.output_filenames) 1830 new_task.active_if_checks = copy.deepcopy(self.active_if_checks) 1831 new_task.parsed_args = copy.deepcopy(self.parsed_args) 1832 new_task.deferred_follow_params = copy.deepcopy( 1833 self.deferred_follow_params) 1834 1835 return new_task 1836 1837 # _________________________________________________________________________ 1838 1839 # command_str_callback 1840 1841 # _________________________________________________________________________ 1842 def set_command_str_callback(self, command_str_callback): 1843 if not callable(command_str_callback): 1844 raise Exception( 1845 "set_command_str_callback() takes a python function or a callable object.") 1846 self.command_str_callback = command_str_callback 1847 1848 # _________________________________________________________________________ 1849 1850 # set_output 1851 1852 # _________________________________________________________________________ 1853 1854 def set_output(self, **args): 1855 """ 1856 Changes output parameter(s) for originate 1857 set_input(output = "test.txt") 1858 """ 1859 1860 if self.syntax not in ("pipeline.originate", "@originate"): 1861 raise ruffus_exceptions.error_set_output("Can only set output for originate tasks") 1862 # 1863 # For product: filter parameter is a list of formatter() 1864 # 1865 if "output" in args: 1866 self.parsed_args["output"] = args["output"] 1867 del args["output"] 1868 else: 1869 raise ruffus_exceptions.error_set_output( 1870 "Missing the output argument in set_input(output=xxx)") 1871 1872 # Non "input" arguments 1873 if len(args): 1874 raise ruffus_exceptions.error_set_output("Unexpected argument name in set_output(%s). " 1875 "Only expecting output=xxx." % (args,)) 1876 # _________________________________________________________________________ 1877 1878 # set_input 1879 1880 # _________________________________________________________________________ 1881 def set_input(self, **args): 1882 """ 1883 Changes any of the input parameter(s) of the task 1884 For example: 1885 set_input(input = "test.txt") 1886 set_input(input2 = "b.txt") 1887 set_input(input = "a.txt", input2 = "b.txt") 1888 """ 1889 # 1890 # For product: filter parameter is a list of formatter() 1891 # 1892 if ("filter" in self.parsed_args and 1893 isinstance(self.parsed_args["filter"], list)): 1894 # the number of input is the count of filter 1895 cnt_expected_input = len(self.parsed_args["filter"]) 1896 1897 # make sure the parsed parameter argument is setup, with empty 1898 # lists if necessary 1899 # Should have been done already... 1900 # if self.parsed_args["input"] is None: 1901 # self.parsed_args["input"] = [[] 1902 # for i in range(cnt_expected_input)] 1903 1904 # update each element of the list accordingly 1905 # removing args so we can check if there is anything left over 1906 for inputN in range(cnt_expected_input): 1907 input_name = "input%d" % (inputN + 1) if inputN else "input" 1908 if input_name in args: 1909 self.parsed_args["input"][inputN] = args[input_name] 1910 del args[input_name] 1911 1912 if len(args): 1913 raise ruffus_exceptions.error_set_input("Unexpected arguments in set_input(%s). " 1914 "Only expecting inputN=xxx" % (args,)) 1915 return 1916 1917 if "input" in args: 1918 self.parsed_args["input"] = args["input"] 1919 del args["input"] 1920 else: 1921 raise ruffus_exceptions.error_set_input( 1922 "Missing the input argument in set_input(input=xxx)") 1923 1924 # Non "input" arguments 1925 if len(args): 1926 raise ruffus_exceptions.error_set_input("Unexpected argument name in set_input(%s). " 1927 "Only expecting input=xxx." % (args,)) 1928 1929 # _________________________________________________________________________ 1930 1931 # _init_for_pipeline 1932 1933 # _________________________________________________________________________ 1934 def _init_for_pipeline(self): 1935 """ 1936 Initialize variables for pipeline run / printout 1937 1938 ********** 1939 BEWARE 1940 ********** 1941 1942 Because state is stored, ruffus is *not* reentrant. 1943 1944 TODO: Need to create runtime DAG to mirror task DAG which holds 1945 output_filenames to be reentrant 1946 1947 ********** 1948 BEWARE 1949 ********** 1950 """ 1951 1952 # cache output file names here 1953 self.output_filenames = None 1954 1955 # _________________________________________________________________________ 1956 1957 # _set_action_type 1958 1959 # _________________________________________________________________________ 1960 def _set_action_type(self, new_action_type): 1961 """ 1962 Save how this task 1963 1) tests whether it is up-to-date and 1964 2) handles input/output files 1965 1966 Checks that the task has not been defined with conflicting actions 1967 1968 """ 1969 if self._action_type not in (Task._action_unspecified, Task._action_task): 1970 old_action = Task._action_names[self._action_type] 1971 new_action = Task._action_names[new_action_type] 1972 actions = " and ".join(list(set((old_action, new_action)))) 1973 raise ruffus_exceptions.error_decorator_args("Duplicate task for:\n\n%s\n\n" 1974 "This has already been specified with a the same name " 1975 "or function\n" 1976 "(%r, %s)\n" % 1977 (self.description_with_args_placeholder % "...", 1978 self._get_display_name(), 1979 actions)) 1980 self._action_type = new_action_type 1981 self._action_type_desc = Task._action_names[new_action_type] 1982 1983 def _get_job_name(self, descriptive_param, verbose_abbreviated_path, runtime_data): 1984 """ 1985 Use job descriptor to return short name for job including any parameters 1986 1987 runtime_data is not (yet) used but may be used to add context in future 1988 """ 1989 return self.job_descriptor(descriptive_param, verbose_abbreviated_path, runtime_data)[0] 1990 1991 def _get_display_name(self): 1992 """ 1993 Returns task name, removing __main__. namespace or main. if present 1994 """ 1995 if self.pipeline.name != "main": 1996 return "{pipeline_name}::{task_name}".format(pipeline_name=self.pipeline.name, 1997 task_name=self._name.replace("__main__.", "").replace("main::", "")) 1998 else: 1999 return self._name.replace("__main__.", "").replace("main::", "") 2000 2001 def _get_decorated_function(self): 2002 """ 2003 Returns name of task function, removing __main__ namespace if necessary 2004 If not specified using decorator notation, returns empty string 2005 N.B. Returns trailing new line 2006 2007 """ 2008 if not self.created_via_decorator: 2009 return "" 2010 2011 func_name = (self.func_module_name + "." + 2012 self.func_name) \ 2013 if self.func_module_name != "__main__" else self.func_name 2014 return "def %s(...):\n ...\n" % func_name 2015 2016 def _update_active_state(self): 2017 # If has an @active_if decorator, check if the task needs to be run 2018 # @active_if parameters may be call back functions or booleans 2019 if (self.active_if_checks is not None and 2020 any(not arg() if isinstance(arg, Callable) else not arg 2021 for arg in self.active_if_checks)): 2022 # flip is active to false. 2023 # ( get_output_files() will return empty if inactive ) 2024 # Remember each iteration of pipeline_printout pipeline_run 2025 # will have another bite at changing this value 2026 self.is_active = False 2027 else: 2028 # flip is active to True so that downstream dependencies will be 2029 # correct ( get_output_files() will return empty if inactive ) 2030 # Remember each iteration of pipeline_printout pipeline_run will 2031 # have another bite at changing this value 2032 self.is_active = True 2033 2034 # This code will look much better once we have job level 2035 # dependencies pipeline_run has dependencies percolating 2036 # up/down. Don't want to recreate all the logic here 2037 def _printout(self, runtime_data, force_rerun, job_history, task_is_out_of_date, verbose=1, 2038 verbose_abbreviated_path=2, indent=4): 2039 """ 2040 Print out all jobs for this task 2041 2042 verbose = 2043 level 1 : logs Out-of-date Task names 2044 level 2 : logs All Tasks (including any task function 2045 docstrings) 2046 level 3 : logs Out-of-date Jobs in Out-of-date Tasks, no 2047 explanation 2048 level 4 : logs Out-of-date Jobs in Out-of-date Tasks, 2049 saying why they are out of date (include only 2050 list of up-to-date tasks) 2051 level 5 : All Jobs in Out-of-date Tasks (include only list 2052 of up-to-date tasks) 2053 level 6 : All jobs in All Tasks whether out of date or not 2054 level 7 : Show file modification times for All jobs in All Tasks 2055 2056 """ 2057 2058 def _get_job_names(unglobbed_params, indent_str): 2059 job_names = self.job_descriptor( 2060 unglobbed_params, verbose_abbreviated_path, runtime_data)[1] 2061 if len(job_names) > 1: 2062 job_names = ([indent_str + job_names[0]] + 2063 [indent_str + " " + jn for jn in job_names[1:]]) 2064 else: 2065 job_names = ([indent_str + job_names[0]]) 2066 return job_names 2067 2068 if not verbose: 2069 return [] 2070 2071 indent_str = ' ' * indent 2072 2073 messages = [] 2074 2075 # LOGGER: level 1 : logs Out-of-date Tasks (names and warnings) 2076 2077 messages.append("Task = %r %s " % (self._get_display_name(), 2078 (" >>Forced to rerun<<" if force_rerun else ""))) 2079 if verbose == 1: 2080 return messages 2081 2082 # LOGGER: level 2 : logs All Tasks (including any task function 2083 # docstrings) 2084 if verbose >= 2 and len(self.func_description): 2085 messages.append(indent_str + '"' + self.func_description + '"') 2086 2087 # 2088 # single job state 2089 # 2090 if verbose >= 10: 2091 if self._is_single_job_single_output == self._single_job_single_output: 2092 messages.append(" Single job single output") 2093 elif self._is_single_job_single_output == self._multiple_jobs_outputs: 2094 messages.append(" Multiple jobs Multiple outputs") 2095 else: 2096 messages.append(" Single jobs status depends on %r" % 2097 self._is_single_job_single_output._get_display_name()) 2098 2099 # LOGGER: No job if less than 2 2100 if verbose <= 2: 2101 return messages 2102 2103 # increase indent for jobs up to date status 2104 indent_str += " " * 3 2105 2106 # 2107 # If has an @active_if decorator, check if the task needs to be run 2108 # @active_if parameters may be call back functions or booleans 2109 # 2110 if not self.is_active: 2111 # LOGGER 2112 if verbose <= 3: 2113 return messages 2114 messages.append(indent_str + "Task is inactive") 2115 # add spacer line 2116 messages.append("") 2117 return messages 2118 2119 # 2120 # No parameters: just call task function 2121 # 2122 if self.param_generator_func is None: 2123 # LOGGER 2124 if verbose <= 3: 2125 return messages 2126 2127 # 2128 # needs update func = None: always needs update 2129 # 2130 if self.needs_update_func is None: 2131 messages.append( 2132 indent_str + "Task needs update: No func to check if up-to-date.") 2133 return messages 2134 2135 if self.needs_update_func == needs_update_check_modify_time: 2136 needs_update, msg = self.needs_update_func( 2137 task=self, job_history=job_history, 2138 verbose_abbreviated_path=verbose_abbreviated_path, 2139 return_file_dates_when_uptodate=verbose > 6) 2140 else: 2141 needs_update, msg = self.needs_update_func() 2142 2143 if needs_update: 2144 messages.append(indent_str + "Task needs update: %s" % msg) 2145 elif verbose > 6: 2146 messages.append(indent_str + "Task %s" % msg) 2147 # 2148 # Get rid of up-to-date messages: 2149 # Superfluous for parts of the pipeline which are up-to-date 2150 # Misleading for parts of the pipeline which require 2151 # updating: tasks might have to run based on dependencies 2152 # anyway 2153 # 2154 # else: 2155 # if task_is_out_of_date: 2156 # messages.append(indent_str + "Task appears up-to-date but 2157 # will rerun after its dependencies") 2158 # else: 2159 # messages.append(indent_str + "Task up-to-date") 2160 2161 else: 2162 runtime_data["MATCH_FAILURE"] = defaultdict(set) 2163 # 2164 # return messages description per job if verbose > 5 else 2165 # whether up to date or not 2166 # 2167 cnt_jobs = 0 2168 for params, unglobbed_params in self.param_generator_func(runtime_data): 2169 cnt_jobs += 1 2170 2171 # 2172 # needs update func = None: always needs update 2173 # 2174 if self.needs_update_func is None: 2175 if verbose >= 5: 2176 messages.extend(_get_job_names( 2177 unglobbed_params, indent_str)) 2178 messages.append(indent_str + " Jobs needs update: No " 2179 "function to check if up-to-date or not") 2180 continue 2181 2182 if self.needs_update_func == needs_update_check_modify_time: 2183 needs_update, msg = self.needs_update_func( 2184 *params, task=self, job_history=job_history, 2185 verbose_abbreviated_path=verbose_abbreviated_path, 2186 return_file_dates_when_uptodate=verbose > 6) 2187 else: 2188 needs_update, msg = self.needs_update_func(*params) 2189 2190 if needs_update: 2191 messages.extend(_get_job_names( 2192 unglobbed_params, indent_str)) 2193 if verbose >= 4: 2194 per_job_messages = [(indent_str + s) 2195 for s in (" Job needs update: %s" % msg).split("\n")] 2196 messages.extend(per_job_messages) 2197 else: 2198 messages.append(indent_str + " Job needs update") 2199 2200 # up to date: log anyway if verbose 2201 else: 2202 # LOGGER 2203 if (task_is_out_of_date and verbose >= 5) or verbose >= 6: 2204 messages.extend(_get_job_names( 2205 unglobbed_params, indent_str)) 2206 # 2207 # Get rid of up-to-date messages: 2208 # Superfluous for parts of the pipeline which are up-to-date 2209 # Misleading for parts of the pipeline which require updating: 2210 # tasks might have to run based on dependencies anyway 2211 # 2212 # if not task_is_out_of_date: 2213 # messages.append(indent_str + " Job up-to-date") 2214 if verbose > 6: 2215 messages.extend((indent_str + s) 2216 for s in (msg).split("\n")) 2217 2218 if cnt_jobs == 0: 2219 messages.append(indent_str + "!!! No jobs for this task.") 2220 messages.append(indent_str + "Are you sure there is " 2221 "not a error in your code / regular expression?") 2222 # LOGGER 2223 2224 # DEBUGGGG!! 2225 if verbose >= 4 or (verbose and cnt_jobs == 0): 2226 if runtime_data and "MATCH_FAILURE" in runtime_data and\ 2227 self.param_generator_func in runtime_data["MATCH_FAILURE"]: 2228 for job_msg in runtime_data["MATCH_FAILURE"][self.param_generator_func]: 2229 messages.append( 2230 indent_str + "Job Warning: Input substitution failed:") 2231 messages.append( 2232 indent_str + " Do your regular expressions match the corresponding Input?") 2233 messages.extend(" " + indent_str + 2234 line for line in job_msg.split("\n")) 2235 2236 runtime_data["MATCH_FAILURE"][self.param_generator_func] = set() 2237 messages.append("") 2238 return messages 2239 2240 # _________________________________________________________________________ 2241 2242 # _is_up_to_date 2243 # 2244 # use to be named signal 2245 # returns whether up to date 2246 # stops recursing if true 2247 # 2248 # _________________________________________________________________________ 2249 def _is_up_to_date(self, verbose_logger_job_history): 2250 """ 2251 If true, depth first search will not pass through this node 2252 """ 2253 if not verbose_logger_job_history: 2254 raise Exception("verbose_logger_job_history is None") 2255 2256 verbose_logger = verbose_logger_job_history[0] 2257 job_history = verbose_logger_job_history[1] 2258 2259 try: 2260 logger = verbose_logger.logger 2261 verbose = verbose_logger.verbose 2262 runtime_data = verbose_logger.runtime_data 2263 verbose_abbreviated_path = verbose_logger.verbose_abbreviated_path 2264 2265 log_at_level(logger, 10, verbose, " Task = %r " % 2266 self._get_display_name()) 2267 2268 # 2269 # If job is inactive, always consider it up-to-date 2270 # 2271 if (self.active_if_checks is not None and 2272 any(not arg() if isinstance(arg, Callable) else not arg 2273 for arg in self.active_if_checks)): 2274 log_at_level(logger, 10, verbose, 2275 " Inactive task: treat as Up to date") 2276 # print 'signaling that the inactive task is up to date' 2277 return True 2278 2279 # 2280 # Always needs update if no way to check if up to date 2281 # 2282 if self.needs_update_func is None: 2283 log_at_level(logger, 10, verbose, 2284 " No update function: treat as out of date") 2285 return False 2286 2287 # 2288 # if no parameters, just return the results of needs update 2289 # 2290 if self.param_generator_func is None: 2291 if self.needs_update_func == needs_update_check_modify_time: 2292 needs_update, ignore_msg = self.needs_update_func( 2293 task=self, job_history=job_history, 2294 verbose_abbreviated_path=verbose_abbreviated_path) 2295 else: 2296 needs_update, ignore_msg = self.needs_update_func() 2297 log_at_level(logger, 10, verbose, 2298 " Needs update = %s" % needs_update) 2299 return not needs_update 2300 else: 2301 # 2302 # return not up to date if ANY jobs needs update 2303 # 2304 for params, unglobbed_params in self.param_generator_func(runtime_data): 2305 if self.needs_update_func == needs_update_check_modify_time: 2306 needs_update, ignore_msg = self.needs_update_func( 2307 *params, task=self, job_history=job_history, 2308 verbose_abbreviated_path=verbose_abbreviated_path) 2309 else: 2310 needs_update, ignore_msg = self.needs_update_func( 2311 *params) 2312 if needs_update: 2313 log_at_level(logger, 10, verbose, " Needing update:\n %s" 2314 % self._get_job_name(unglobbed_params, 2315 verbose_abbreviated_path, runtime_data)) 2316 return False 2317 2318 # 2319 # Percolate warnings from parameter factories 2320 # 2321 # !! 2322 if (verbose >= 1 and "ruffus_WARNING" in runtime_data and 2323 self.param_generator_func in runtime_data["ruffus_WARNING"]): 2324 for msg in runtime_data["ruffus_WARNING"][self.param_generator_func]: 2325 logger.warning(" 'In Task\n%s\n%s" % ( 2326 self.description_with_args_placeholder % "...", msg)) 2327 2328 log_at_level(logger, 10, verbose, " All jobs up to date") 2329 2330 return True 2331 2332 # 2333 # removed for compatibility with python 3.x 2334 # 2335 # rethrow exception after adding task name 2336 # except error_task, inst: 2337 # inst.specify_task(self, "Exceptions in dependency checking") 2338 # raise 2339 2340 except: 2341 exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() 2342 # 2343 # rethrow exception after adding task name 2344 # 2345 if exceptionType == error_task: 2346 exceptionValue.specify 2347 inst.specify_task(self, "Exceptions in dependency checking") 2348 raise 2349 2350 exception_stack = traceback.format_exc() 2351 exception_name = exceptionType.__module__ + '.' + exceptionType.__name__ 2352 exception_value = str(exceptionValue) 2353 if len(exception_value): 2354 exception_value = "(%s)" % exception_value 2355 errt = ruffus_exceptions.RethrownJobError([(self._name, 2356 "", 2357 exception_name, 2358 exception_value, 2359 exception_stack)]) 2360 errt.specify_task(self, "Exceptions generating parameters") 2361 raise errt 2362 2363 # _________________________________________________________________________ 2364 2365 # _get_output_files 2366 # 2367 # 2368 # _________________________________________________________________________ 2369 def _get_output_files(self, do_not_expand_single_job_tasks, runtime_data): 2370 """ 2371 Cache output files 2372 2373 Normally returns a list with one item for each job or a just a list 2374 of file names. 2375 For "_single_job_single_output" 2376 i.e. @merge and @files with single jobs, 2377 returns the output of a single job (i.e. can be a string) 2378 """ 2379 2380 # 2381 # N.B. active_if_checks is called once per task 2382 # in make_job_parameter_generator() for consistency 2383 # 2384 # self.is_active can be set using self.active_if_checks in that 2385 # function, and therefore can be changed BETWEEN invocations 2386 # of pipeline_run 2387 # 2388 # self.is_active is not used anywhere else 2389 # 2390 if (not self.is_active): 2391 return [] 2392 2393 if self.output_filenames is None: 2394 2395 self.output_filenames = [] 2396 2397 # skip tasks which don't have parameters 2398 if self.param_generator_func is not None: 2399 2400 cnt_jobs = 0 2401 for params, unglobbed_params in self.param_generator_func(runtime_data): 2402 cnt_jobs += 1 2403 # skip tasks which don't have output parameters 2404 if len(params) >= 2: 2405 # make sure each @split or @subdivide or @originate 2406 # returns a list of jobs 2407 # i.e. each @split or @subdivide or @originate is 2408 # always a ->many operation 2409 # even if len(many) can be 1 (or zero) 2410 if self.indeterminate_output and not non_str_sequence(params[1]): 2411 self.output_filenames.append([params[1]]) 2412 else: 2413 self.output_filenames.append(params[1]) 2414 2415 if self._is_single_job_single_output == self._single_job_single_output: 2416 if cnt_jobs > 1: 2417 raise ruffus_exceptions.error_task_get_output(self, "Task which is supposed to produce a " 2418 "single output somehow has more than one job.") 2419 2420 # 2421 # The output of @split should be treated as multiple jobs 2422 # 2423 # The output of @split is always a list of lists: 2424 # 1) There is a list of @split jobs 2425 # A) For advanced (regex) @split 2426 # this is a many -> many more operation 2427 # So len(list) == many (i.e. the number of jobs 2428 # B) For normal @split 2429 # this is a 1 -> many operation 2430 # So len(list) = 1 2431 # 2432 # 2) The output of each @split job is a list 2433 # The items in this list of lists are each a job in 2434 # subsequent tasks 2435 # 2436 # 2437 # So we need to concatenate these separate lists into a 2438 # single list of output 2439 # 2440 # For example: 2441 # @split(["a.1", "b.1"], regex(r"(.)\.1"), r"\1.*.2") 2442 # def example(input, output): 2443 # JOB 1 2444 # a.1 -> a.i.2 2445 # -> a.j.2 2446 # 2447 # JOB 2 2448 # b.1 -> b.i.2 2449 # -> b.j.2 2450 # 2451 # output_filenames = [ [a.i.2, a.j.2], [b.i.2, b.j.2] ] 2452 # 2453 # we want [ a.i.2, a.j.2, b.i.2, b.j.2 ] 2454 # 2455 # This also works for simple @split 2456 # 2457 # @split("a.1", r"a.*.2") 2458 # def example(input, output): 2459 # only job 2460 # a.1 -> a.i.2 2461 # -> a.j.2 2462 # 2463 # output_filenames = [ [a.i.2, a.j.2] ] 2464 # 2465 # we want [ a.i.2, a.j.2 ] 2466 # 2467 if len(self.output_filenames) and self.indeterminate_output: 2468 self.output_filenames = reduce( 2469 lambda x, y: x + y, self.output_filenames) 2470 2471 # special handling for jobs which have a single task 2472 if (do_not_expand_single_job_tasks and 2473 self._is_single_job_single_output and 2474 len(self.output_filenames)): 2475 return self.output_filenames[0] 2476 2477 # 2478 # sort by jobs so it is just a weeny little bit less deterministic 2479 # 2480 return sorted(self.output_filenames, key=lambda x: str(x)) 2481 2482 # _________________________________________________________________________ 2483 2484 # _completed 2485 # 2486 # All logging logic moved to caller site 2487 # _________________________________________________________________________ 2488 def _completed(self): 2489 """ 2490 called even when all jobs are up to date 2491 """ 2492 if not self.is_active: 2493 self.output_filenames = None 2494 return 2495 2496 for f in self.posttask_functions: 2497 f() 2498 2499 # 2500 # indeterminate output. Check actual output again if someother tasks 2501 # job function depend on it 2502 # used for @split 2503 # 2504 if self.indeterminate_output: 2505 self.output_filenames = None 2506 2507 # _________________________________________________________________________ 2508 2509 # _handle_tasks_globs_in_inputs 2510 2511 # _________________________________________________________________________ 2512 def _handle_tasks_globs_in_inputs(self, input_params, modify_inputs_mode): 2513 """ 2514 Helper function for tasks which 2515 1) Notes globs and tasks 2516 2) Replaces tasks names and functions with actual tasks 2517 3) Adds task dependencies automatically via task_follows 2518 2519 modify_inputs_mode = results["modify_inputs_mode"] = 2520 t_extra_inputs.ADD_TO_INPUTS | REPLACE_INPUTS | 2521 KEEP_INPUTS | KEEP_OUTPUTS 2522 """ 2523 # DEBUGGG 2524 # print(" task._handle_tasks_globs_in_inputs start %s" % (self._get_display_name(), ), file = sys.stderr) 2525 # 2526 # get list of function/function names and globs 2527 # 2528 function_or_func_names, globs, runtime_data_names = get_nested_tasks_or_globs(input_params) 2529 2530 # 2531 # replace function / function names with tasks 2532 # 2533 if modify_inputs_mode == t_extra_inputs.ADD_TO_INPUTS: 2534 description_with_args_placeholder = \ 2535 self.description_with_args_placeholder % "add_inputs = add_inputs(%r)" 2536 elif modify_inputs_mode == t_extra_inputs.REPLACE_INPUTS: 2537 description_with_args_placeholder = \ 2538 self.description_with_args_placeholder % "replace_inputs = add_inputs(%r)" 2539 elif modify_inputs_mode == t_extra_inputs.KEEP_OUTPUTS: 2540 description_with_args_placeholder = \ 2541 self.description_with_args_placeholder % "output =%r" 2542 else: # t_extra_inputs.KEEP_INPUTS 2543 description_with_args_placeholder = \ 2544 self.description_with_args_placeholder % "input =%r" 2545 2546 tasks = self._connect_parents( 2547 description_with_args_placeholder, True, function_or_func_names) 2548 functions_to_tasks = dict() 2549 for funct_name_task_or_pipeline, task in zip(function_or_func_names, tasks): 2550 if isinstance(funct_name_task_or_pipeline, Pipeline): 2551 functions_to_tasks["PIPELINE=%s=PIPELINE" % 2552 funct_name_task_or_pipeline.name] = task 2553 else: 2554 functions_to_tasks[funct_name_task_or_pipeline] = task 2555 2556 # replace strings, tasks, pipelines with tasks 2557 input_params = replace_placeholders_with_tasks_in_input_params( 2558 input_params, functions_to_tasks) 2559 # DEBUGGG 2560 #print(" task._handle_tasks_globs_in_inputs finish %s" % (self._get_display_name(), ), file = sys.stderr) 2561 return t_params_tasks_globs_run_time_data(input_params, tasks, globs, runtime_data_names) 2562 2563 def _choose_file_names_transform(self, parsed_args, 2564 valid_tags=(regex, suffix, formatter)): 2565 """ 2566 shared code for subdivide, transform, product etc for choosing method 2567 for transform input file to output files 2568 """ 2569 file_name_transform_tag = parsed_args["filter"] 2570 valid_tag_names = [] 2571 # regular expression match 2572 if (regex in valid_tags): 2573 valid_tag_names.append("regex()") 2574 if isinstance(file_name_transform_tag, regex): 2575 return t_regex_file_names_transform(self, 2576 file_name_transform_tag, 2577 self.error_type, 2578 self.syntax) 2579 2580 # simulate end of string (suffix) match 2581 if (suffix in valid_tags): 2582 valid_tag_names.append("suffix()") 2583 if isinstance(file_name_transform_tag, suffix): 2584 output_dir = parsed_args["output_dir"] if "output_dir" in parsed_args else [ 2585 ] 2586 return t_suffix_file_names_transform(self, 2587 file_name_transform_tag, 2588 self.error_type, 2589 self.syntax, 2590 output_dir) 2591 # new style string.format() 2592 if (formatter in valid_tags): 2593 valid_tag_names.append("formatter()") 2594 if isinstance(file_name_transform_tag, formatter): 2595 return t_formatter_file_names_transform(self, 2596 file_name_transform_tag, 2597 self.error_type, 2598 self.syntax) 2599 2600 raise self.error_type(self, 2601 "%s expects one of %s as the second argument" 2602 % (self.syntax, ", ".join(valid_tag_names))) 2603 2604 # task handlers 2605 # sets 2606 # 1) action_type 2607 # 2) param_generator_func 2608 # 3) needs_update_func 2609 # 4) job wrapper 2610 def _do_nothing_setup(self): 2611 """ 2612 Task is already set up: do nothing 2613 """ 2614 return set() 2615 2616 # originate does have an Input param. 2617 # It is just None (and not set-able) 2618 def _decorator_originate(self, *unnamed_args, **named_args): 2619 """ 2620 @originate 2621 """ 2622 self.syntax = "@originate" 2623 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 2624 self._get_decorated_function()) 2625 self._prepare_originate(unnamed_args, named_args) 2626 2627 # originate 2628 # self.has_input_param = True 2629 2630 def _prepare_originate(self, unnamed_args, named_args): 2631 """ 2632 Common function for pipeline.originate and @originate 2633 """ 2634 self.error_type = ruffus_exceptions.error_task_originate 2635 self._set_action_type(Task._action_task_originate) 2636 self._setup_task_func = Task._originate_setup 2637 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2638 self.job_wrapper = job_wrapper_output_files 2639 self.job_descriptor = io_files_one_to_many_job_descriptor 2640 self.single_multi_io = self._many_to_many 2641 # output is not a glob 2642 self.indeterminate_output = 0 2643 2644 self.parsed_args = parse_task_arguments(unnamed_args, named_args, ["output", "extras"], 2645 self.description_with_args_placeholder) 2646 2647 def _originate_setup(self): 2648 """ 2649 Finish setting up originate 2650 """ 2651 # 2652 # If self.parsed_args["output"] is a single item (e.g. file name), 2653 # that will be treated as a list 2654 # Each item in the list of these will be called as an output in a 2655 # separate function call 2656 # 2657 output_params = self.parsed_args["output"] 2658 if not non_str_sequence(output_params): 2659 output_params = [output_params] 2660 2661 # 2662 # output globs will be replaced with files. But there should not be 2663 # tasks here! 2664 # 2665 list_output_files_task_globs = [self._handle_tasks_globs_in_inputs( 2666 oo, t_extra_inputs.KEEP_INPUTS) for oo in output_params] 2667 for oftg in list_output_files_task_globs: 2668 if len(oftg.tasks): 2669 raise self.error_type(self, "%s cannot output to another " 2670 "task. Do not include tasks in " 2671 "output parameters." % self.syntax) 2672 2673 self.param_generator_func = originate_param_factory(list_output_files_task_globs, 2674 *self.parsed_args["extras"]) 2675 return set() 2676 2677 def _decorator_transform(self, *unnamed_args, **named_args): 2678 """ 2679 @originate 2680 """ 2681 self.syntax = "@transform" 2682 self.description_with_args_placeholder = "%s(%%s)\n%s" % ( 2683 self.syntax, self._get_decorated_function()) 2684 self._prepare_transform(unnamed_args, named_args) 2685 2686 def _prepare_transform(self, unnamed_args, named_args): 2687 """ 2688 Common function for pipeline.transform and @transform 2689 """ 2690 self.error_type = ruffus_exceptions.error_task_transform 2691 self._set_action_type(Task._action_task_transform) 2692 self._setup_task_func = Task._transform_setup 2693 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2694 self.job_wrapper = job_wrapper_io_files 2695 self.job_descriptor = io_files_job_descriptor 2696 self.single_multi_io = self._many_to_many 2697 2698 # Parse named and unnamed arguments 2699 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 2700 ["input", "filter", "modify_inputs", 2701 "output", "extras", "output_dir"], 2702 self.description_with_args_placeholder) 2703 2704 def _transform_setup(self): 2705 """ 2706 Finish setting up transform 2707 """ 2708 # DEBUGGG 2709 # print(" task._transform_setup start %s" % (self._get_display_name(), ), file = sys.stderr) 2710 # replace function / function names with tasks 2711 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 2712 t_extra_inputs.KEEP_INPUTS) 2713 ancestral_tasks = set(input_files_task_globs.tasks) 2714 2715 # _single_job_single_output is bad policy. Can we remove it? 2716 # What does this actually mean in Ruffus semantics? 2717 # 2718 # allows transform to take a single file or task 2719 if input_files_task_globs.single_file_to_list(): 2720 self._is_single_job_single_output = self._single_job_single_output 2721 2722 # 2723 # whether transform generates a list of jobs or not will depend on 2724 # the parent task 2725 # 2726 elif isinstance(input_files_task_globs.params, Task): 2727 self._is_single_job_single_output = input_files_task_globs.params 2728 2729 # how to transform input to output file name 2730 file_names_transform = self._choose_file_names_transform( 2731 self.parsed_args) 2732 2733 modify_inputs = self.parsed_args["modify_inputs"] 2734 if modify_inputs is not None: 2735 modify_inputs = self._handle_tasks_globs_in_inputs( 2736 modify_inputs, self.parsed_args["modify_inputs_mode"]) 2737 ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks) 2738 2739 self.param_generator_func = transform_param_factory(input_files_task_globs, 2740 file_names_transform, 2741 modify_inputs, 2742 self.parsed_args["modify_inputs_mode"], 2743 self.parsed_args["output"], 2744 *self.parsed_args["extras"]) 2745 2746 # DEBUGGG 2747 #print(" task._transform_setup finish %s" % (self._get_display_name(), ), file = sys.stderr) 2748 return ancestral_tasks 2749 2750 def _decorator_subdivide(self, *unnamed_args, **named_args): 2751 """ 2752 @subdivide 2753 """ 2754 self.syntax = "@subdivide" 2755 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 2756 self._get_decorated_function()) 2757 self._prepare_subdivide(unnamed_args, named_args) 2758 2759 def _prepare_subdivide(self, unnamed_args, named_args): 2760 """ 2761 Common code for @subdivide and pipeline.subdivide 2762 @split can also end up here 2763 """ 2764 self.error_type = ruffus_exceptions.error_task_subdivide 2765 self._set_action_type(Task._action_task_subdivide) 2766 self._setup_task_func = Task._subdivide_setup 2767 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2768 self.job_wrapper = job_wrapper_io_files 2769 self.job_descriptor = io_files_one_to_many_job_descriptor 2770 self.single_multi_io = self._many_to_many 2771 # output is a glob 2772 self.indeterminate_output = 2 2773 2774 # 2775 # Parse named and unnamed arguments 2776 # 2777 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 2778 ["input", "filter", "modify_inputs", 2779 "output", "extras", "output_dir"], 2780 self.description_with_args_placeholder) 2781 2782 def _subdivide_setup(self): 2783 """ 2784 Finish setting up subdivide 2785 """ 2786 2787 # 2788 # replace function / function names with tasks 2789 # 2790 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 2791 t_extra_inputs.KEEP_INPUTS) 2792 2793 # allows split to take a single file or task 2794 input_files_task_globs.single_file_to_list() 2795 2796 ancestral_tasks = set(input_files_task_globs.tasks) 2797 2798 # how to transform input to output file name 2799 file_names_transform = self._choose_file_names_transform( 2800 self.parsed_args) 2801 2802 modify_inputs = self.parsed_args["modify_inputs"] 2803 if modify_inputs is not None: 2804 modify_inputs = self._handle_tasks_globs_in_inputs( 2805 modify_inputs, self.parsed_args["modify_inputs_mode"]) 2806 ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks) 2807 2808 # 2809 # output globs will be replaced with files. 2810 # But there should not be tasks here! 2811 # 2812 output_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["output"], 2813 t_extra_inputs.KEEP_OUTPUTS) 2814 if len(output_files_task_globs.tasks): 2815 raise self.error_type(self, ("%s cannot output to another task. Do not include tasks " 2816 "in output parameters.") % self.syntax) 2817 2818 self.param_generator_func = subdivide_param_factory(input_files_task_globs, 2819 # False, # 2820 # flatten input 2821 # removed 2822 file_names_transform, 2823 modify_inputs, 2824 self.parsed_args["modify_inputs_mode"], 2825 output_files_task_globs, 2826 *self.parsed_args["extras"]) 2827 return ancestral_tasks 2828 2829 def _decorator_split(self, *unnamed_args, **named_args): 2830 """ 2831 @split 2832 """ 2833 self.syntax = "@split" 2834 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 2835 self._get_decorated_function()) 2836 2837 # 2838 # This is actually @subdivide 2839 # 2840 if isinstance(unnamed_args[1], regex): 2841 self._prepare_subdivide(unnamed_args, named_args, 2842 self.description_with_args_placeholder) 2843 2844 # 2845 # This is actually @split 2846 # 2847 else: 2848 self._prepare_split(unnamed_args, named_args) 2849 2850 def _prepare_split(self, unnamed_args, named_args): 2851 """ 2852 Common code for @split and pipeline.split 2853 """ 2854 self.error_type = ruffus_exceptions.error_task_split 2855 self._set_action_type(Task._action_task_split) 2856 self._setup_task_func = Task._split_setup 2857 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2858 self.job_wrapper = job_wrapper_io_files 2859 self.job_descriptor = io_files_one_to_many_job_descriptor 2860 self.single_multi_io = self._one_to_many 2861 # output is a glob 2862 self.indeterminate_output = 1 2863 2864 # 2865 # Parse named and unnamed arguments 2866 # 2867 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 2868 ["input", "output", "extras"], 2869 self.description_with_args_placeholder) 2870 2871 def _split_setup(self): 2872 """ 2873 Finish setting up split 2874 """ 2875 2876 # 2877 # replace function / function names with tasks 2878 # 2879 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 2880 t_extra_inputs.KEEP_INPUTS) 2881 2882 # 2883 # output globs will be replaced with files. 2884 # But there should not be tasks here! 2885 # 2886 output_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["output"], 2887 t_extra_inputs.KEEP_OUTPUTS) 2888 if len(output_files_task_globs.tasks): 2889 raise self.error_type(self, "%s cannot output to another task. " 2890 "Do not include tasks in output " 2891 "parameters." % self.syntax) 2892 2893 self.param_generator_func = split_param_factory(input_files_task_globs, 2894 output_files_task_globs, 2895 *self.parsed_args["extras"]) 2896 return set(input_files_task_globs.tasks) 2897 2898 def _decorator_merge(self, *unnamed_args, **named_args): 2899 """ 2900 @merge 2901 """ 2902 self.syntax = "@merge" 2903 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 2904 self._get_decorated_function()) 2905 self._prepare_merge(unnamed_args, named_args) 2906 2907 def _prepare_merge(self, unnamed_args, named_args): 2908 """ 2909 Common code for @merge and pipeline.merge 2910 """ 2911 self.error_type = ruffus_exceptions.error_task_merge 2912 self._set_action_type(Task._action_task_merge) 2913 self._setup_task_func = Task._merge_setup 2914 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2915 self.job_wrapper = job_wrapper_io_files 2916 self.job_descriptor = io_files_job_descriptor 2917 self.single_multi_io = self._many_to_one 2918 self._is_single_job_single_output = self._single_job_single_output 2919 2920 # 2921 # Parse named and unnamed arguments 2922 # 2923 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 2924 ["input", "output", "extras"], 2925 self.description_with_args_placeholder) 2926 2927 def _merge_setup(self): 2928 """ 2929 Finish setting up merge 2930 """ 2931 # 2932 # replace function / function names with tasks 2933 # 2934 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 2935 t_extra_inputs.KEEP_INPUTS) 2936 2937 self.param_generator_func = merge_param_factory(input_files_task_globs, 2938 self.parsed_args["output"], 2939 *self.parsed_args["extras"]) 2940 return set(input_files_task_globs.tasks) 2941 2942 def _decorator_collate(self, *unnamed_args, **named_args): 2943 """ 2944 @collate 2945 """ 2946 self.syntax = "@collate" 2947 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 2948 self._get_decorated_function()) 2949 self._prepare_collate(unnamed_args, named_args) 2950 2951 def _prepare_collate(self, unnamed_args, named_args): 2952 """ 2953 Common code for @collate and pipeline.collate 2954 """ 2955 self.error_type = ruffus_exceptions.error_task_collate 2956 self._set_action_type(Task._action_task_collate) 2957 self._setup_task_func = Task._collate_setup 2958 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 2959 self.job_wrapper = job_wrapper_io_files 2960 self.job_descriptor = io_files_job_descriptor 2961 self.single_multi_io = self._many_to_many 2962 2963 # 2964 # Parse named and unnamed arguments 2965 # 2966 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 2967 ["input", "filter", "modify_inputs", 2968 "output", "extras"], 2969 self.description_with_args_placeholder) 2970 2971 def _collate_setup(self): 2972 """ 2973 Finish setting up collate 2974 """ 2975 2976 # 2977 # replace function / function names with tasks 2978 # 2979 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 2980 t_extra_inputs.KEEP_INPUTS) 2981 ancestral_tasks = set(input_files_task_globs.tasks) 2982 2983 # how to transform input to output file name 2984 file_names_transform = self._choose_file_names_transform(self.parsed_args, 2985 (regex, formatter)) 2986 2987 modify_inputs = self.parsed_args["modify_inputs"] 2988 if modify_inputs is not None: 2989 modify_inputs = self._handle_tasks_globs_in_inputs( 2990 modify_inputs, self.parsed_args["modify_inputs_mode"]) 2991 ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks) 2992 2993 self.param_generator_func = collate_param_factory(input_files_task_globs, 2994 # False, # 2995 # flatten input 2996 # removed 2997 file_names_transform, 2998 modify_inputs, 2999 self.parsed_args["modify_inputs_mode"], 3000 self.parsed_args["output"], 3001 *self.parsed_args["extras"]) 3002 3003 return ancestral_tasks 3004 3005 def _decorator_mkdir(self, *unnamed_args, **named_args): 3006 """ 3007 @mkdir 3008 """ 3009 syntax = "@mkdir" 3010 description_with_args_placeholder = "%s(%%s)\n%s" % ( 3011 self.syntax, (self.description_with_args_placeholder % "...")) 3012 self._prepare_preceding_mkdir(unnamed_args, named_args, syntax, 3013 description_with_args_placeholder) 3014 3015 def mkdir(self, *unnamed_args, **named_args): 3016 """ 3017 Make missing directories, including intermediates, before this task 3018 """ 3019 syntax = "Task(name = %s).mkdir" % self._name 3020 description_with_args_placeholder = "%s(%%s)" % (self.syntax) 3021 self._prepare_preceding_mkdir(unnamed_args, named_args, syntax, 3022 description_with_args_placeholder) 3023 return self 3024 3025 def _prepare_preceding_mkdir(self, unnamed_args, named_args, syntax, 3026 task_description, defer=True): 3027 """ 3028 Add mkdir Task to run before self 3029 Common to 3030 Task.mkdir 3031 @mkdir 3032 @follows(..., mkdir()) 3033 """ 3034 # 3035 # Create a new Task with a unique name to this instance of mkdir 3036 # 3037 self.cnt_task_mkdir += 1 3038 cnt_task_mkdir_str = ( 3039 " #%d" % self.cnt_task_mkdir) if self.cnt_task_mkdir > 1 else "" 3040 task_name = r"mkdir%r%s before %s " % ( 3041 unnamed_args, cnt_task_mkdir_str, self._name) 3042 task_name = task_name.replace(",)", ")").replace(",", ", ") 3043 new_task = self.pipeline._create_task( 3044 task_func=job_wrapper_mkdir, task_name=task_name) 3045 3046 # defer _add_parent so we can clone unless we are already 3047 # calling add_parent (from _connect_parents()) 3048 if defer: 3049 self.deferred_follow_params.append( 3050 [task_description, False, [new_task]]) 3051 3052 # 3053 # Prepare new node 3054 # 3055 new_task.syntax = syntax 3056 new_task._prepare_mkdir(unnamed_args, named_args, task_description) 3057 3058 # 3059 # Hack: 3060 # If the task name is too ugly, 3061 # we can override it for flowchart printing using the 3062 # display_name 3063 # 3064 # new_node.display_name = ??? new_node.func_description 3065 return new_task 3066 3067 def _prepare_mkdir(self, unnamed_args, named_args, task_description): 3068 3069 self.error_type = ruffus_exceptions.error_task_mkdir 3070 self._set_action_type(Task._action_mkdir) 3071 self.needs_update_func = self.needs_update_func or needs_update_check_directory_missing 3072 self.job_wrapper = job_wrapper_mkdir 3073 self.job_descriptor = mkdir_job_descriptor 3074 3075 # doesn't have a real function 3076 # use job_wrapper just so it is not None 3077 self.user_defined_work_func = self.job_wrapper 3078 3079 # 3080 # @transform like behaviour with regex / suffix or formatter 3081 # 3082 if (len(unnamed_args) > 1 and 3083 isinstance(unnamed_args[1], (formatter, suffix, regex))) or "filter" in named_args: 3084 self.single_multi_io = self._many_to_many 3085 self._setup_task_func = Task._transform_setup 3086 3087 # 3088 # Parse named and unnamed arguments 3089 # 3090 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 3091 ["input", "filter", "modify_inputs", 3092 "output", "output_dir", "extras"], task_description) 3093 3094 # 3095 # simple behaviour: just make directories in list of strings 3096 # 3097 # the mkdir decorator accepts one string, multiple strings or a list of strings 3098 else: 3099 3100 # 3101 # override funct description normally parsed from func.__doc__ 3102 # "Make missing directories including any intermediate 3103 # directories on the specified path(s)" 3104 # 3105 self.func_description = "Make missing directories %s" % ( 3106 shorten_filenames_encoder(unnamed_args, 0)) 3107 3108 self.single_multi_io = self._one_to_one 3109 self._setup_task_func = Task._do_nothing_setup 3110 self.has_input_param = False 3111 3112 # 3113 # 3114 # 3115 # if a single argument collection of parameters, keep that as is 3116 if len(unnamed_args) == 0: 3117 self.parsed_args["output"] = [] 3118 elif len(unnamed_args) > 1: 3119 self.parsed_args["output"] = unnamed_args 3120 # len(unnamed_args) == 1: unpack unnamed_args[0] 3121 elif non_str_sequence(unnamed_args[0]): 3122 self.parsed_args["output"] = unnamed_args[0] 3123 # single string or other non collection types 3124 else: 3125 self.parsed_args["output"] = unnamed_args 3126 3127 # all directories created in one job to reduce race conditions 3128 # so we are converting [a,b,c] into [ [(a, b,c)] ] 3129 # where unnamed_args = (a,b,c) 3130 # i.e. one job whose solitory argument is a tuple/list of directory 3131 # names 3132 self.param_generator_func = args_param_factory( 3133 [[sorted(self.parsed_args["output"], key=lambda x: str(x))]]) 3134 3135 # print ("mkdir %s" % (self.func_description), file = sys.stderr) 3136 3137 def _decorator_product(self, *unnamed_args, **named_args): 3138 """ 3139 @product 3140 """ 3141 self.syntax = "@product" 3142 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3143 self._get_decorated_function()) 3144 self._prepare_product(unnamed_args, named_args) 3145 3146 def _prepare_product(self, unnamed_args, named_args): 3147 """ 3148 Common code for @product and pipeline.product 3149 """ 3150 self.error_type = ruffus_exceptions.error_task_product 3151 self._set_action_type(Task._action_task_product) 3152 self._setup_task_func = Task._product_setup 3153 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 3154 self.job_wrapper = job_wrapper_io_files 3155 self.job_descriptor = io_files_job_descriptor 3156 self.single_multi_io = self._many_to_many 3157 3158 # 3159 # Parse named and unnamed arguments 3160 # 3161 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 3162 ["input", "filter", "inputN", "modify_inputs", 3163 "output", "extras"], 3164 self.description_with_args_placeholder) 3165 3166 def _product_setup(self): 3167 """ 3168 Finish setting up product 3169 """ 3170 # 3171 # replace function / function names with tasks 3172 # 3173 list_input_files_task_globs = [self._handle_tasks_globs_in_inputs(ii, 3174 t_extra_inputs.KEEP_INPUTS) 3175 for ii in self.parsed_args["input"]] 3176 ancestral_tasks = set() 3177 for input_files_task_globs in list_input_files_task_globs: 3178 ancestral_tasks = ancestral_tasks.union( 3179 input_files_task_globs.tasks) 3180 3181 # how to transform input to output file name 3182 file_names_transform = t_nested_formatter_file_names_transform(self, 3183 self.parsed_args["filter"], 3184 self.error_type, 3185 self.syntax) 3186 3187 modify_inputs = self.parsed_args["modify_inputs"] 3188 if modify_inputs is not None: 3189 modify_inputs = self._handle_tasks_globs_in_inputs( 3190 modify_inputs, self.parsed_args["modify_inputs_mode"]) 3191 ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks) 3192 3193 self.param_generator_func = product_param_factory(list_input_files_task_globs, 3194 # False, # 3195 # flatten input 3196 # removed 3197 file_names_transform, 3198 modify_inputs, 3199 self.parsed_args["modify_inputs_mode"], 3200 self.parsed_args["output"], 3201 *self.parsed_args["extras"]) 3202 3203 return ancestral_tasks 3204 3205 def _decorator_permutations(self, *unnamed_args, **named_args): 3206 """ 3207 @permutations 3208 """ 3209 self.syntax = "@permutations" 3210 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3211 self._get_decorated_function()) 3212 self._prepare_combinatorics( 3213 unnamed_args, named_args, ruffus_exceptions.error_task_permutations) 3214 3215 def _decorator_combinations(self, *unnamed_args, **named_args): 3216 """ 3217 @combinations 3218 """ 3219 self.syntax = "@combinations" 3220 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3221 self._get_decorated_function()) 3222 self._prepare_combinatorics( 3223 unnamed_args, named_args, ruffus_exceptions.error_task_combinations) 3224 3225 def _decorator_combinations_with_replacement(self, *unnamed_args, 3226 **named_args): 3227 """ 3228 @combinations_with_replacement 3229 """ 3230 self.syntax = "@combinations_with_replacement" 3231 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3232 self._get_decorated_function()) 3233 self._prepare_combinatorics(unnamed_args, named_args, 3234 ruffus_exceptions.error_task_combinations_with_replacement) 3235 3236 def _prepare_combinatorics(self, unnamed_args, named_args, error_type): 3237 """ 3238 Common code for 3239 @permutations and pipeline.permutations 3240 @combinations and pipeline.combinations 3241 @combinations_with_replacement and 3242 pipeline.combinations_with_replacement 3243 """ 3244 self.error_type = error_type 3245 self._setup_task_func = Task._combinatorics_setup 3246 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 3247 self.job_wrapper = job_wrapper_io_files 3248 self.job_descriptor = io_files_job_descriptor 3249 self.single_multi_io = self._many_to_many 3250 3251 # 3252 # Parse named and unnamed arguments 3253 # 3254 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 3255 ["input", "filter", "tuple_size", 3256 "modify_inputs", "output", "extras"], 3257 self.description_with_args_placeholder) 3258 3259 def _combinatorics_setup(self): 3260 """ 3261 Finish setting up combinatorics 3262 """ 3263 # 3264 # replace function / function names with tasks 3265 # 3266 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 3267 t_extra_inputs.KEEP_INPUTS) 3268 ancestral_tasks = set(input_files_task_globs.tasks) 3269 3270 # how to transform input to output file name: len(k-tuples) of 3271 # (identical) formatters 3272 file_names_transform = t_nested_formatter_file_names_transform( 3273 self, [self.parsed_args["filter"]] * 3274 self.parsed_args["tuple_size"], 3275 self.error_type, self.syntax) 3276 3277 modify_inputs = self.parsed_args["modify_inputs"] 3278 if modify_inputs is not None: 3279 modify_inputs = self._handle_tasks_globs_in_inputs( 3280 modify_inputs, self.parsed_args["modify_inputs_mode"]) 3281 ancestral_tasks = ancestral_tasks.union(modify_inputs.tasks) 3282 3283 # we are not going to specify what type of combinatorics this is twice: 3284 # just look up from our error type 3285 error_type_to_combinatorics_type = { 3286 ruffus_exceptions.error_task_combinations_with_replacement: 3287 t_combinatorics_type.COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT, 3288 ruffus_exceptions.error_task_combinations: 3289 t_combinatorics_type.COMBINATORICS_COMBINATIONS, 3290 ruffus_exceptions.error_task_permutations: 3291 t_combinatorics_type.COMBINATORICS_PERMUTATIONS 3292 } 3293 3294 self.param_generator_func = \ 3295 combinatorics_param_factory(input_files_task_globs, 3296 # False, # 3297 # flatten 3298 # input 3299 # removed 3300 error_type_to_combinatorics_type[ 3301 self.error_type], 3302 self.parsed_args["tuple_size"], 3303 file_names_transform, 3304 modify_inputs, 3305 self.parsed_args["modify_inputs_mode"], 3306 self.parsed_args["output"], 3307 *self.parsed_args["extras"]) 3308 3309 return ancestral_tasks 3310 3311 def _decorator_files(self, *unnamed_args, **named_args): 3312 """ 3313 @files 3314 """ 3315 self.syntax = "@files" 3316 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3317 self._get_decorated_function()) 3318 self._prepare_files(unnamed_args, named_args) 3319 3320 def _prepare_files(self, unnamed_args, named_args): 3321 """ 3322 Common code for @files and pipeline.files 3323 """ 3324 self.error_type = ruffus_exceptions.error_task_files 3325 self._setup_task_func = Task._do_nothing_setup 3326 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 3327 self.job_wrapper = job_wrapper_io_files 3328 self.job_descriptor = io_files_job_descriptor 3329 3330 if len(unnamed_args) == 0: 3331 raise ruffus_exceptions.error_task_files(self, "Too few arguments for @files") 3332 3333 # Use parameters generated by a custom function 3334 if len(unnamed_args) == 1 and isinstance(unnamed_args[0], 3335 Callable): 3336 3337 self._set_action_type(Task._action_task_files_func) 3338 self.param_generator_func = files_custom_generator_param_factory( 3339 unnamed_args[0]) 3340 3341 # assume 3342 self.single_multi_io = self._many_to_many 3343 3344 # Use parameters in supplied list 3345 else: 3346 self._set_action_type(Task._action_task_files) 3347 3348 if len(unnamed_args) > 1: 3349 3350 # single jobs 3351 # This is true even if the previous task has multiple output 3352 # These will all be joined together at the hip (like @merge) 3353 # If you want different behavior, use @transform 3354 params = copy.copy([unnamed_args]) 3355 self._is_single_job_single_output = self._single_job_single_output 3356 self.single_multi_io = self._one_to_one 3357 3358 else: 3359 3360 # multiple jobs with input/output parameters etc. 3361 params = copy.copy(unnamed_args[0]) 3362 self._is_single_job_single_output = self._multiple_jobs_outputs 3363 self.single_multi_io = self._many_to_many 3364 3365 check_files_io_parameters(self, params, ruffus_exceptions.error_task_files) 3366 3367 self.parsed_args["input"] = [pp[0] for pp in params] 3368 self.parsed_args["output"] = [tuple(pp[1:]) for pp in params] 3369 self._setup_task_func = Task._files_setup 3370 3371 def _files_setup(self): 3372 """ 3373 Finish setting up @files 3374 """ 3375 # 3376 # replace function / function names with tasks 3377 # 3378 input_files_task_globs = self._handle_tasks_globs_in_inputs(self.parsed_args["input"], 3379 t_extra_inputs.KEEP_INPUTS) 3380 3381 self.param_generator_func = files_param_factory(input_files_task_globs, 3382 True, 3383 self.parsed_args["output"]) 3384 return set(input_files_task_globs.tasks) 3385 3386 def _decorator_parallel(self, *unnamed_args, **named_args): 3387 """ 3388 @parallel 3389 """ 3390 self.syntax = "@parallel" 3391 self._prepare_parallel(unnamed_args, named_args) 3392 3393 def _prepare_parallel(self, unnamed_args, named_args): 3394 """ 3395 Common code for @parallel and pipeline.parallel 3396 """ 3397 self.error_type = ruffus_exceptions.error_task_parallel 3398 self._set_action_type(Task._action_task_parallel) 3399 self._setup_task_func = Task._do_nothing_setup 3400 # self.needs_update_func = None 3401 self.job_wrapper = job_wrapper_generic 3402 self.job_descriptor = io_files_job_descriptor 3403 3404 if len(unnamed_args) == 0: 3405 raise ruffus_exceptions.error_task_parallel(self, "Too few arguments for @parallel") 3406 3407 # Use parameters generated by a custom function 3408 if len(unnamed_args) == 1 and isinstance(unnamed_args[0], 3409 Callable): 3410 self.param_generator_func = args_param_factory(unnamed_args[0]()) 3411 3412 # list of params 3413 else: 3414 if len(unnamed_args) > 1: 3415 # single jobs 3416 params = copy.copy([unnamed_args]) 3417 self._is_single_job_single_output = self._single_job_single_output 3418 else: 3419 # multiple jobs with input/output parameters etc. 3420 params = copy.copy(unnamed_args[0]) 3421 check_parallel_parameters(self, params, ruffus_exceptions.error_task_parallel) 3422 3423 self.param_generator_func = args_param_factory(params) 3424 3425 def _decorator_files_re(self, *unnamed_args, **named_args): 3426 """ 3427 @files_re 3428 3429 calls user function in parallel 3430 with input_files, output_files, parameters 3431 These needed to be generated on the fly by 3432 getting all file names in the supplied list/glob pattern 3433 There are two variations: 3434 3435 1) inputfiles = all files in glob which match the regular 3436 expression 3437 outputfile = generated from the replacement string 3438 3439 2) inputfiles = all files in glob which match the regular 3440 expression and generated from the "from" 3441 replacement string 3442 outputfiles = all files in glob which match the regular 3443 expression and generated from the "to" 3444 replacement string 3445 """ 3446 self.syntax = "@files_re" 3447 self.error_type = ruffus_exceptions.error_task_files_re 3448 self._set_action_type(Task._action_task_files_re) 3449 self.needs_update_func = self.needs_update_func or needs_update_check_modify_time 3450 self.job_wrapper = job_wrapper_io_files 3451 self.job_descriptor = io_files_job_descriptor 3452 self.single_multi_io = self._many_to_many 3453 3454 if len(unnamed_args) < 3: 3455 raise self.error_type(self, "Too few arguments for @files_re") 3456 3457 # 888888888888888888888888888888888888888888888888888888888888888888888 3458 3459 # !! HERE BE DRAGONS !! 3460 3461 # Legacy, deprecated parameter handling depending on positions 3462 # and not even on type 3463 3464 # check if parameters wrapped in combine 3465 combining_all_jobs, unnamed_args = is_file_re_combining(unnamed_args) 3466 3467 # second parameter is always regex() 3468 unnamed_args[1] = regex(unnamed_args[1]) 3469 3470 # third parameter is inputs() if there is a four and fifth parameter... 3471 # That means if you want "extra" parameters, you always need inputs() 3472 if len(unnamed_args) > 3: 3473 unnamed_args[2] = inputs(unnamed_args[2]) 3474 3475 # 888888888888888888888888888888888888888888888888888888888888888888888 3476 3477 self.description_with_args_placeholder = "%s(%%s)\n%s" % (self.syntax, 3478 self._get_decorated_function()) 3479 self.parsed_args = parse_task_arguments(unnamed_args, named_args, 3480 ["input", "filter", "modify_inputs", 3481 "output", "extras"], 3482 self.description_with_args_placeholder) 3483 3484 if combining_all_jobs: 3485 self._setup_task_func = Task._collate_setup 3486 else: 3487 self._setup_task_func = Task._transform_setup 3488 3489 # 8888888888888888888888888888888888888888888888888888888888888888888888888 3490 3491 # Task functions 3492 3493 # follows 3494 # check_if_uptodate 3495 # posttask 3496 # jobs_limit 3497 # active_if 3498 # graphviz 3499 3500 # 8888888888888888888888888888888888888888888888888888888888888888888888888 3501 def follows(self, *unnamed_args, **named_args): 3502 """ 3503 Specifies a preceding task / action which this task will follow. 3504 The preceding task can be specified as a string or function or Task 3505 object. 3506 A task can also follow the making of one or more directories: 3507 3508 task.follows(mkdir("my_dir")) 3509 3510 """ 3511 description_with_args_placeholder = ( 3512 self.description_with_args_placeholder % "...") + ".follows(%r)" 3513 3514 self.deferred_follow_params.append([description_with_args_placeholder, False, 3515 unnamed_args]) 3516 # self._connect_parents(description_with_args_placeholder, False, 3517 # unnamed_args) 3518 return self 3519 3520 def _decorator_follows(self, *unnamed_args, **named_args): 3521 """ 3522 unnamed_args can be string or function or Task 3523 For strings, if lookup fails, will defer. 3524 """ 3525 description_with_args_placeholder = "@follows(%r)\n" + ( 3526 self.description_with_args_placeholder % "...") 3527 self.deferred_follow_params.append([description_with_args_placeholder, False, 3528 unnamed_args]) 3529 # self._connect_parents(description_with_args_placeholder, False, unnamed_args) 3530 3531 def _complete_setup(self): 3532 """ 3533 Connect up parents if follows was specified and setups up task functions 3534 Returns a set of parent tasks 3535 3536 Note will tear down previous parental links before doing anything 3537 """ 3538 # DEBUGGG 3539 # print(" task._complete_setup start %s" % (self._get_display_name(), ), file = sys.stderr) 3540 self._remove_all_parents() 3541 ancestral_tasks = self._deferred_connect_parents() 3542 ancestral_tasks |= self._setup_task_func(self) 3543 if "named_extras" in self.parsed_args: 3544 if self.command_str_callback == "PIPELINE": 3545 self.parsed_args["named_extras"]["__RUFFUS_TASK_CALLBACK__"] = self.pipeline.command_str_callback 3546 else: 3547 self.parsed_args["named_extras"]["__RUFFUS_TASK_CALLBACK__"] = self.command_str_callback 3548 # DEBUGGG 3549 # print(" task._complete_setup finish %s\n" % (self._get_display_name(), ), file = sys.stderr) 3550 return ancestral_tasks 3551 3552 def _deferred_connect_parents(self): 3553 """ 3554 Called by _complete_task_setup() from pipeline_run, pipeline_printout etc. 3555 returns a non-redundant list of all the ancestral tasks 3556 """ 3557 # DEBUGGG 3558 # print(" task._deferred_connect_parents start %s (%d to do)" % (self._get_display_name(), 3559 # len(self.deferred_follow_params)), file = sys.stderr) 3560 parent_tasks = set() 3561 3562 for ii, deferred_follow_params in enumerate(self.deferred_follow_params): 3563 # DEBUGGG 3564 # print(" task._deferred_connect_parents %s %d out of %d " % (self._get_display_name(), 3565 # ii, len(self.deferred_follow_params)), file = sys.stderr) 3566 new_tasks = self._connect_parents(*deferred_follow_params) 3567 # convert to mkdir and dynamically created tasks from follows into the actual created tasks 3568 # otherwise each time we redo this, we will have a sorceror's apprentice situation! 3569 deferred_follow_params[2] = new_tasks 3570 parent_tasks.update(new_tasks) 3571 3572 # DEBUGGG 3573 # print(" task._deferred_connect_parents finish %s" % self._get_display_name(), file = sys.stderr) 3574 return parent_tasks 3575 3576 # Deferred tasks will need to be resolved later 3577 # Because deferred tasks can belong to other pipelines 3578 def _connect_parents(self, description_with_args_placeholder, no_mkdir, 3579 unnamed_args): 3580 """ 3581 unnamed_args can be string or function or Task 3582 For strings, if lookup fails, will defer. 3583 3584 Called from 3585 * task.follows 3586 * @follows 3587 * decorators, e.g. @transform _handle_tasks_globs_in_inputs 3588 (input dependencies) 3589 * pipeline.transform etc. _handle_tasks_globs_in_inputs 3590 (input dependencies) 3591 * @split / pipeline.split _handle_tasks_globs_in_inputs 3592 (output dependencies) 3593 """ 3594 # DEBUGGG 3595 #print(" _connect_parents start %s" % self._get_display_name(), file = sys.stderr) 3596 new_tasks = [] 3597 for arg in unnamed_args: 3598 # 3599 # Task 3600 # 3601 if isinstance(arg, Task): 3602 if arg == self: 3603 raise ruffus_exceptions.error_decorator_args( 3604 "Cannot have a task as its own (circular) dependency:\n" 3605 % description_with_args_placeholder % (arg,)) 3606 3607 # 3608 # re-lookup from task name to handle cloning 3609 # 3610 if arg.pipeline.name == self.pipeline.original_name and \ 3611 self.pipeline.original_name != self.pipeline.name: 3612 tasks = lookup_tasks_from_name(arg._name, 3613 default_pipeline_name=self.pipeline.name, 3614 default_module_name=self.func_module_name) 3615 new_tasks.extend(tasks) 3616 3617 if not tasks: 3618 raise ruffus_exceptions.error_node_not_task( 3619 "task '%s' '%s::%s' is somehow absent in the cloned pipeline (%s)!%s" 3620 % (self.pipeline.original_name, arg._name, self.pipeline.name, 3621 description_with_args_placeholder % (arg._name,))) 3622 else: 3623 new_tasks.append(arg) 3624 3625 # 3626 # Pipeline: defer 3627 # 3628 elif isinstance(arg, Pipeline): 3629 if arg == self.pipeline: 3630 raise ruffus_exceptions.error_decorator_args( 3631 "Cannot have your own pipeline as a (circular) " 3632 "dependency of a Task:\n" + 3633 description_with_args_placeholder % (arg,)) 3634 3635 if not len(arg.get_tail_tasks()): 3636 raise ruffus_exceptions.error_no_tail_tasks( 3637 "Pipeline '{pipeline_name}' has no 'tail' tasks defined.\nWhich task " 3638 "in '{pipeline_name}' are you referring to?" 3639 .format(pipeline_name=arg.name)) 3640 new_tasks.extend(arg.get_tail_tasks()) 3641 3642 # specified by string: unicode or otherwise 3643 elif isinstance(arg, path_str_type): 3644 # handle pipeline cloning 3645 task_name = arg.replace(self.pipeline.original_name + "::", 3646 self.pipeline.name + "::") 3647 3648 tasks = lookup_tasks_from_name(arg, 3649 default_pipeline_name=self.pipeline.name, 3650 default_module_name=self.func_module_name) 3651 new_tasks.extend(tasks) 3652 3653 if not tasks: 3654 raise ruffus_exceptions.error_node_not_task("task '%s' is not a pipelined task in Ruffus. " 3655 "Have you mis-spelt the function or task name?\n%s" 3656 % (arg, description_with_args_placeholder % (arg,))) 3657 3658 # for mkdir, automatically generate task with unique name 3659 elif isinstance(arg, mkdir): 3660 if no_mkdir: 3661 raise ruffus_exceptions.error_decorator_args("Unexpected mkdir() found.\n" + 3662 description_with_args_placeholder % (arg,)) 3663 3664 # syntax for new task doing the mkdir 3665 if self.created_via_decorator: 3666 mkdir_task_syntax = "@follows(mkdir())" 3667 else: 3668 mkdir_task_syntax = "Task(name=%r).follows(mkdir())" % self._get_display_name( 3669 ) 3670 mkdir_description_with_args_placeholder = \ 3671 description_with_args_placeholder % "mkdir(%s)" 3672 new_tasks.append(self._prepare_preceding_mkdir(arg.args, {}, mkdir_task_syntax, 3673 mkdir_description_with_args_placeholder, False)) 3674 3675 # Is this a function? 3676 # Turn this function into a task 3677 # (add task as attribute of this function) 3678 # Add self as dependent 3679 elif isinstance(arg, Callable): 3680 task = lookup_unique_task_from_func( 3681 arg, default_pipeline_name=self.pipeline.name) 3682 3683 # add new task to pipeline if necessary 3684 if not task: 3685 task = main_pipeline._create_task(task_func=arg) 3686 new_tasks.append(task) 3687 3688 else: 3689 raise ruffus_exceptions.error_decorator_args( 3690 "Expecting a function or function name or task name or " 3691 "Task or Pipeline.\n" + 3692 description_with_args_placeholder % (arg,)) 3693 3694 # add dependency 3695 # duplicate dependencies are ignore automatically 3696 # 3697 for task in new_tasks: 3698 self._add_parent(task) 3699 3700 # DEBUGGG 3701 # print(" _connect_parents finish %s" % self._get_display_name(), file = sys.stderr) 3702 return new_tasks 3703 3704 def check_if_uptodate(self, func): 3705 """ 3706 Specifies how a task is to be checked if it needs to be rerun (i.e. is 3707 up-to-date). 3708 func returns true if input / output files are up to date 3709 func takes as many arguments as the task function 3710 """ 3711 if not isinstance(func, Callable): 3712 description_with_args_placeholder = \ 3713 (self.description_with_args_placeholder % 3714 "...") + ".check_if_uptodate(%r)" 3715 raise ruffus_exceptions.error_decorator_args( 3716 "Expected a single function or Callable object in \n" + 3717 description_with_args_placeholder % (func,)) 3718 self.needs_update_func = func 3719 return self 3720 3721 def _decorator_check_if_uptodate(self, *args): 3722 """ 3723 @check_if_uptodate 3724 """ 3725 if len(args) != 1 or not isinstance(args[0], Callable): 3726 description_with_args_placeholder = "@check_if_uptodate(%r)\n" + ( 3727 self.description_with_args_placeholder % "...") 3728 raise ruffus_exceptions.error_decorator_args( 3729 "Expected a single function or Callable object in \n" + 3730 description_with_args_placeholder % (args,)) 3731 self.needs_update_func = args[0] 3732 3733 def posttask(self, *funcs): 3734 """ 3735 Takes one or more functions which will be called if the task completes 3736 """ 3737 description_with_args_placeholder = ("Expecting simple functions or touch_file() in \n" + 3738 (self.description_with_args_placeholder % "...") + 3739 ".posttask(%r)") 3740 self._set_posttask(description_with_args_placeholder, *funcs) 3741 return self 3742 3743 def _decorator_posttask(self, *funcs): 3744 """ 3745 @posttask 3746 """ 3747 description_with_args_placeholder = ("Expecting simple functions or touch_file() in \n" + 3748 "@posttask(%r)\n" + 3749 (self.description_with_args_placeholder % "...")) 3750 self._set_posttask(description_with_args_placeholder, *funcs) 3751 3752 def _set_posttask(self, description_with_args_placeholder, *funcs): 3753 """ 3754 Takes one or more functions which will be called if the task completes 3755 """ 3756 for arg in funcs: 3757 if isinstance(arg, touch_file): 3758 self.posttask_functions.append( 3759 touch_file_factory(arg.args, register_cleanup)) 3760 elif isinstance(arg, Callable): 3761 self.posttask_functions.append(arg) 3762 else: 3763 raise ruffus_exceptions.PostTaskArgumentError( 3764 description_with_args_placeholder % (arg,)) 3765 3766 def jobs_limit(self, maximum_jobs_in_parallel, limit_name=None): 3767 """ 3768 Limit the number of concurrent jobs 3769 """ 3770 description_with_args_placeholder = ((self.description_with_args_placeholder % "...") + 3771 ".jobs_limit(%r%s)") 3772 self._set_jobs_limit(description_with_args_placeholder, 3773 maximum_jobs_in_parallel, limit_name) 3774 return self 3775 3776 def _decorator_jobs_limit(self, maximum_jobs_in_parallel, limit_name=None): 3777 """ 3778 @jobs_limit 3779 """ 3780 description_with_args_placeholder = ("@jobs_limit(%r%s)\n" + 3781 (self.description_with_args_placeholder % "...")) 3782 self._set_jobs_limit(description_with_args_placeholder, 3783 maximum_jobs_in_parallel, limit_name) 3784 3785 def _set_jobs_limit(self, description_with_args_placeholder, 3786 maximum_jobs_in_parallel, limit_name=None): 3787 try: 3788 maximum_jobs_in_parallel = int(maximum_jobs_in_parallel) 3789 assert(maximum_jobs_in_parallel >= 1) 3790 except: 3791 limit_name = ", " + limit_name if limit_name else "" 3792 raise ruffus_exceptions.JobsLimitArgumentError( 3793 "Expecting a positive integer > 1 in \n" + 3794 description_with_args_placeholder % (maximum_jobs_in_parallel, limit_name)) 3795 3796 # set semaphore name to other than the "pipeline.name:task name" 3797 if limit_name is not None: 3798 self.semaphore_name = limit_name 3799 if self.semaphore_name in self._job_limit_semaphores: 3800 prev_maximum_jobs = self._job_limit_semaphores[self.semaphore_name] 3801 if prev_maximum_jobs != maximum_jobs_in_parallel: 3802 limit_name = ", " + limit_name if limit_name else "" 3803 raise ruffus_exceptions.JobsLimitArgumentError( 3804 'The job limit %r cannot re-defined from the former ' 3805 'limit of %d in \n' 3806 % (self.semaphore_name, prev_maximum_jobs) + 3807 description_with_args_placeholder 3808 % (maximum_jobs_in_parallel, limit_name)) 3809 else: 3810 # 3811 # save semaphore and limit 3812 # 3813 self._job_limit_semaphores[ 3814 self.semaphore_name] = maximum_jobs_in_parallel 3815 3816 def active_if(self, *active_if_checks): 3817 """ 3818 If any of active_checks is False or returns False, then the task is 3819 marked as "inactive" and its outputs removed. 3820 """ 3821 # print 'job is active:', active_checks, [ 3822 # arg() if isinstance(arg, Callable) else arg 3823 # for arg in active_checks] 3824 if self.active_if_checks is None: 3825 self.active_if_checks = [] 3826 self.active_if_checks.extend(active_if_checks) 3827 # print(self.active_if_checks) 3828 return self 3829 3830 def _decorator_active_if(self, *active_if_checks): 3831 """ 3832 @active_if 3833 """ 3834 self.active_if(*active_if_checks) 3835 3836 def graphviz(self, *unnamed_args, **named_args): 3837 """ 3838 Sets graphviz (e.g. `dot`) attributes used to draw this Task 3839 """ 3840 self.graphviz_attributes = named_args 3841 if len(unnamed_args): 3842 raise TypeError("Only named arguments expected in :" + 3843 self.description_with_args_placeholder % "..." + 3844 ".graphviz(%r)\n" % unnamed_args) 3845 return self 3846 3847 def _decorator_graphviz(self, *unnamed_args, **named_args): 3848 self.graphviz_attributes = named_args 3849 if len(unnamed_args): 3850 raise TypeError("Only named arguments expected in :" + 3851 "@graphviz(%r)\n" % unnamed_args + 3852 self.description_with_args_placeholder % "...") 3853 3854 3855class task_encoder(json.JSONEncoder): 3856 3857 def default(self, obj): 3858 if isinstance(obj, set): 3859 return list(obj) 3860 if isinstance(obj, defaultdict): 3861 return dict(obj) 3862 if isinstance(obj, Task): 3863 # , Task._action_names[obj._action_task], obj.func_description] 3864 return obj._name 3865 return json.JSONEncoder.default(self, obj) 3866 3867 3868def is_node_up_to_date(node, extra_data): 3869 """ 3870 Forwards tree depth first search "signalling" mechanism to 3871 node _is_up_to_date method 3872 Depth first search stops when node._is_up_to_date return True 3873 """ 3874 return node._is_up_to_date(extra_data) 3875 3876 3877def update_checksum_level_on_tasks(checksum_level): 3878 """Reset the checksum level for all tasks""" 3879 for n in node._all_nodes: 3880 n.checksum_level = checksum_level 3881 3882 3883def update_active_states_for_all_tasks(): 3884 """ 3885 3886 @active_if decorated tasks can change their active state every time 3887 pipeline_run / pipeline_printout / pipeline_printout_graph is called 3888 3889 update_active_states_for_all_tasks () 3890 3891 """ 3892 for n in node._all_nodes: 3893 n._update_active_state() 3894 3895 3896def lookup_pipeline(pipeline): 3897 """ 3898 If pipeline is 3899 None : main_pipeline 3900 string : lookup name in pipelines 3901 """ 3902 if pipeline is None: 3903 return main_pipeline 3904 3905 # Pipeline object pass through unchanged 3906 if isinstance(pipeline, Pipeline): 3907 return pipeline 3908 3909 # strings: lookup from name 3910 if isinstance(pipeline, str) and pipeline in Pipeline.pipelines: 3911 return Pipeline.pipelines[pipeline] 3912 3913 raise ruffus_exceptions.error_not_a_pipeline("%s does not name a pipeline." % pipeline) 3914 3915 3916def _pipeline_prepare_to_run(checksum_level, history_file, pipeline, runtime_data, target_tasks, forcedtorun_tasks): 3917 """ 3918 Common function to setup pipeline, check parameters 3919 before pipeline_run, pipeline_printout, pipeline_printout_graph 3920 """ 3921 3922 if checksum_level is None: 3923 checksum_level = get_default_checksum_level() 3924 3925 update_checksum_level_on_tasks(checksum_level) 3926 3927 # 3928 # If we aren't using checksums, and history file hasn't been specified, 3929 # we might be a bit surprised to find Ruffus writing to a 3930 # sqlite db anyway. 3931 # Let us just dump to a placeholder memory db that can then be discarded 3932 # Of course, if history_file is specified, we presume you know what 3933 # you are doing 3934 # 3935 if checksum_level == CHECKSUM_FILE_TIMESTAMPS and history_file is None: 3936 history_file = ':memory:' 3937 # 3938 # load previous job history if it exists, otherwise create an empty history 3939 # 3940 job_history = open_job_history(history_file) 3941 3942 # 3943 # @active_if decorated tasks can change their active state every time 3944 # pipeline_run / pipeline_printout / pipeline_printout_graph is called 3945 # 3946 update_active_states_for_all_tasks() 3947 3948 # 3949 # run time data 3950 # 3951 if runtime_data is None: 3952 runtime_data = {} 3953 if not isinstance(runtime_data, dict): 3954 raise Exception("Parameter runtime_data should be a " 3955 "dictionary of values passes to jobs at run time.") 3956 3957 # 3958 # This is the default namespace for looking for tasks 3959 # 3960 # pipeline must be a Pipeline or a string naming a pipeline 3961 # 3962 # Keep pipeline 3963 # 3964 if pipeline is not None: 3965 pipeline = lookup_pipeline(pipeline) 3966 default_pipeline_name = pipeline.name 3967 else: 3968 default_pipeline_name = "main" 3969 3970 # 3971 # Lookup target jobs 3972 # 3973 if target_tasks is None: 3974 target_tasks = [] 3975 if forcedtorun_tasks is None: 3976 forcedtorun_tasks = [] 3977 # lookup names, prioritise the specified pipeline or "main" 3978 target_tasks = lookup_tasks_from_user_specified_names( 3979 "Target", target_tasks, default_pipeline_name, "__main__", True) 3980 forcedtorun_tasks = lookup_tasks_from_user_specified_names("Forced to run", forcedtorun_tasks, 3981 default_pipeline_name, "__main__", True) 3982 3983 # 3984 # Empty target, either run the specified tasks from the pipeline 3985 # or will run every single task under the sun 3986 # 3987 if not target_tasks: 3988 if pipeline: 3989 target_tasks.extend(list(pipeline.tasks)) 3990 if not target_tasks: 3991 for pipeline_name in Pipeline.pipelines.keys(): 3992 target_tasks.extend( 3993 list(Pipeline.pipelines[pipeline_name].tasks)) 3994 3995 # make sure pipeline is defined 3996 pipeline = lookup_pipeline(pipeline) 3997 3998 # Unique task list 3999 target_tasks = list(set(target_tasks)) 4000 4001 # 4002 # Make sure all tasks in dependency list from (forcedtorun_tasks and target_tasks) 4003 # are setup and linked to real functions 4004 # 4005 processed_tasks = set() 4006 completed_pipeline_names = set() 4007 incomplete_pipeline_names = set() 4008 4009 # get list of all involved pipelines 4010 for task in forcedtorun_tasks + target_tasks: 4011 if task.pipeline.name not in completed_pipeline_names: 4012 incomplete_pipeline_names.add(task.pipeline.name) 4013 4014 # set up each pipeline. 4015 # These will in turn lookup up their antecedents (even in another pipeline) and 4016 # set them up as well. 4017 for pipeline_name in incomplete_pipeline_names: 4018 if pipeline_name in completed_pipeline_names: 4019 continue 4020 completed_pipeline_names = completed_pipeline_names.union( 4021 pipeline.pipelines[pipeline_name]._complete_task_setup(processed_tasks)) 4022 4023 return checksum_level, job_history, pipeline, runtime_data, target_tasks, forcedtorun_tasks 4024 4025 4026def pipeline_printout_graph(stream, 4027 output_format=None, 4028 target_tasks=[], 4029 forcedtorun_tasks=[], 4030 draw_vertically=True, 4031 ignore_upstream_of_target=False, 4032 skip_uptodate_tasks=False, 4033 gnu_make_maximal_rebuild_mode=True, 4034 test_all_task_for_update=True, 4035 no_key_legend=False, 4036 minimal_key_legend=True, 4037 user_colour_scheme=None, 4038 pipeline_name="Pipeline:", 4039 size=(11, 8), 4040 dpi=120, 4041 runtime_data=None, 4042 checksum_level=None, 4043 history_file=None, 4044 pipeline=None): 4045 # Remember to add further extra parameters here to 4046 # "extra_pipeline_printout_graph_options" inside cmdline.py 4047 # This will forward extra parameters from the 4048 # command line to pipeline_printout_graph 4049 """ 4050 print out pipeline dependencies in various formats 4051 4052 :param stream: where to print to 4053 :type stream: file-like object with ``write()`` function 4054 :param output_format: ["dot", "jpg", "svg", "ps", "png"]. All but the 4055 first depends on the 4056 `dot <http://www.graphviz.org>`_ program. 4057 :param target_tasks: targets task functions which will be run if they are 4058 out-of-date. 4059 :param forcedtorun_tasks: task functions which will be run whether or not 4060 they are out-of-date. 4061 :param draw_vertically: Top to bottom instead of left to right. 4062 :param ignore_upstream_of_target: Don't draw upstream tasks of targets. 4063 :param skip_uptodate_tasks: Don't draw up-to-date tasks if possible. 4064 :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all* 4065 out-of-date tasks. Runs minimal 4066 set to build targets if set to 4067 ``True``. Use with caution. 4068 :param test_all_task_for_update: Ask all task functions if they are 4069 up-to-date. 4070 :param no_key_legend: Don't draw key/legend for graph. 4071 :param minimal_key_legend: Only legend entries for used task types 4072 :param user_colour_scheme: Dictionary specifying flowchart colour scheme 4073 :param pipeline_name: Pipeline Title 4074 :param size: tuple of x and y dimensions 4075 :param dpi: print resolution 4076 :param runtime_data: Experimental feature: pass data to tasks at run time 4077 :param history_file: Database file storing checksums and file timestamps 4078 for input/output files. 4079 :param checksum_level: Several options for checking up-to-dateness are 4080 available: Default is level 1. 4081 level 0 : Use only file timestamps 4082 level 1 : above, plus timestamp of successful job completion 4083 level 2 : above, plus a checksum of the pipeline function body 4084 level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators 4085 """ 4086 4087 # EXTRA pipeline_run DEBUGGING 4088 global EXTRA_PIPELINERUN_DEBUGGING 4089 EXTRA_PIPELINERUN_DEBUGGING = False 4090 4091 (checksum_level, 4092 job_history, 4093 pipeline, 4094 runtime_data, 4095 target_tasks, 4096 forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level, history_file, 4097 pipeline, runtime_data, 4098 target_tasks, forcedtorun_tasks) 4099 4100 (topological_sorted, ignore_param1, ignore_param2, ignore_param3) = \ 4101 topologically_sorted_nodes(target_tasks, forcedtorun_tasks, 4102 gnu_make_maximal_rebuild_mode, 4103 extra_data_for_signal=[ 4104 t_verbose_logger(0, 0, None, runtime_data), job_history], 4105 signal_callback=is_node_up_to_date) 4106 if not len(target_tasks): 4107 target_tasks = topological_sorted[-1:] 4108 4109 # open file if (unicode?) string 4110 close_stream = False 4111 if isinstance(stream, path_str_type): 4112 stream = open(stream, "wb") 4113 close_stream = True 4114 4115 # derive format automatically from name 4116 if output_format is None: 4117 output_format = os.path.splitext(stream.name)[1].lstrip(".") 4118 4119 try: 4120 graph_printout(stream, 4121 output_format, 4122 target_tasks, 4123 forcedtorun_tasks, 4124 draw_vertically, 4125 ignore_upstream_of_target, 4126 skip_uptodate_tasks, 4127 gnu_make_maximal_rebuild_mode, 4128 test_all_task_for_update, 4129 no_key_legend, 4130 minimal_key_legend, 4131 user_colour_scheme, 4132 pipeline_name, 4133 size, 4134 dpi, 4135 extra_data_for_signal=[t_verbose_logger( 4136 0, 0, None, runtime_data), job_history], 4137 signal_callback=is_node_up_to_date) 4138 finally: 4139 # if this is a stream we opened, we have to close it ourselves 4140 if close_stream: 4141 stream.close() 4142 4143 4144def get_completed_task_strings(incomplete_tasks, all_tasks, forcedtorun_tasks, verbose, 4145 verbose_abbreviated_path, indent, runtime_data, job_history): 4146 """ 4147 Printout list of completed tasks 4148 """ 4149 completed_task_strings = [] 4150 if len(all_tasks) > len(incomplete_tasks): 4151 completed_task_strings.append("") 4152 completed_task_strings.append("_" * 40) 4153 completed_task_strings.append("Tasks which are up-to-date:") 4154 completed_task_strings.append("") 4155 completed_task_strings.append("") 4156 set_of_incomplete_tasks = set(incomplete_tasks) 4157 4158 for t in all_tasks: 4159 # Only print Up to date tasks 4160 if t in set_of_incomplete_tasks: 4161 continue 4162 # LOGGER 4163 completed_task_strings.extend(t._printout(runtime_data, 4164 t in forcedtorun_tasks, job_history, False, 4165 verbose, verbose_abbreviated_path, indent)) 4166 4167 completed_task_strings.append("_" * 40) 4168 completed_task_strings.append("") 4169 completed_task_strings.append("") 4170 4171 return completed_task_strings 4172 4173 4174def pipeline_printout(output_stream=None, 4175 target_tasks=[], 4176 forcedtorun_tasks=[], 4177 # verbose defaults to 4 if None 4178 verbose=None, 4179 indent=4, 4180 gnu_make_maximal_rebuild_mode=True, 4181 wrap_width=100, 4182 runtime_data=None, 4183 checksum_level=None, 4184 history_file=None, 4185 verbose_abbreviated_path=None, 4186 pipeline=None): 4187 # Remember to add further extra parameters here to 4188 # "extra_pipeline_printout_options" inside cmdline.py 4189 # This will forward extra parameters from the command 4190 # line to pipeline_printout 4191 """ 4192 Printouts the parts of the pipeline which will be run 4193 4194 Because the parameters of some jobs depend on the results of previous 4195 tasks, this function produces only the current snap-shot of task jobs. 4196 In particular, tasks which generate variable number of inputs into 4197 following tasks will not produce the full range of jobs. 4198 4199 :: 4200 verbose = 0 : Nothing 4201 verbose = 1 : All Tasks names 4202 verbose = 2 : All Tasks (including any task function docstrings) 4203 verbose = 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation 4204 verbose = 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings 4205 verbose = 5 : All Jobs in Out-of-date Tasks, (include only list of up-to-date tasks) 4206 verbose = 6 : All jobs in All Tasks whether out of date or not 4207 4208 :param output_stream: where to print to 4209 :type output_stream: file-like object with ``write()`` function 4210 :param target_tasks: targets task functions which will be run if they are 4211 out-of-date 4212 :param forcedtorun_tasks: task functions which will be run whether or not 4213 they are out-of-date 4214 :param verbose: level 0 : nothing 4215 level 1 : Out-of-date Task names 4216 level 2 : All Tasks (including any task function docstrings) 4217 level 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation 4218 level 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings 4219 level 5 : All Jobs in Out-of-date Tasks, (include only list of up-to-date tasks) 4220 level 6 : All jobs in All Tasks whether out of date or not 4221 level 7 : Show file modification times for All jobs in All Tasks 4222 level 10: logs messages useful only for debugging ruffus pipeline code 4223 :param indent: How much indentation for pretty format. 4224 :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all* 4225 out-of-date tasks. Runs minimal 4226 set to build targets if set to 4227 ``True``. Use with caution. 4228 :param wrap_width: The maximum length of each line 4229 :param runtime_data: Experimental feature: pass data to tasks at run time 4230 :param checksum_level: Several options for checking up-to-dateness are 4231 available: Default is level 1. 4232 level 0 : Use only file timestamps 4233 level 1 : above, plus timestamp of successful job completion 4234 level 2 : above, plus a checksum of the pipeline function body 4235 level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators 4236 :param history_file: Database file storing checksums and file timestamps for input/output files. 4237 :param verbose_abbreviated_path: whether input and output paths are abbreviated. 4238 level 0: The full (expanded, abspath) input or output path 4239 level > 1: The number of subdirectories to include. Abbreviated paths are prefixed with ``[,,,]/`` 4240 level < 0: Input / Output parameters are truncated to ``MMM`` letters where ``verbose_abbreviated_path ==-MMM``. Subdirectories are first removed to see if this allows the paths to fit in the specified limit. Otherwise abbreviated paths are prefixed by ``<???>`` 4241 """ 4242 # do nothing! 4243 if verbose == 0: 4244 return 4245 4246 # 4247 # default values 4248 # 4249 if verbose_abbreviated_path is None: 4250 verbose_abbreviated_path = 2 4251 if verbose is None: 4252 verbose = 4 4253 4254 # EXTRA pipeline_run DEBUGGING 4255 global EXTRA_PIPELINERUN_DEBUGGING 4256 EXTRA_PIPELINERUN_DEBUGGING = False 4257 4258 if output_stream is None: 4259 output_stream = sys.stdout 4260 4261 if not hasattr(output_stream, "write"): 4262 raise Exception("The first parameter to pipeline_printout needs to be " 4263 "an output file, e.g. sys.stdout and not %s" 4264 % str(output_stream)) 4265 4266 logging_strm = t_verbose_logger(verbose, verbose_abbreviated_path, 4267 t_stream_logger(output_stream), runtime_data) 4268 4269 (checksum_level, 4270 job_history, 4271 pipeline, 4272 runtime_data, 4273 target_tasks, 4274 forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level, history_file, 4275 pipeline, runtime_data, 4276 target_tasks, forcedtorun_tasks) 4277 4278 (incomplete_tasks, 4279 self_terminated_nodes, 4280 dag_violating_edges, 4281 dag_violating_nodes) = \ 4282 topologically_sorted_nodes(target_tasks, forcedtorun_tasks, 4283 gnu_make_maximal_rebuild_mode, 4284 extra_data_for_signal=[ 4285 t_verbose_logger(0, 0, None, runtime_data), job_history], 4286 signal_callback=is_node_up_to_date) 4287 4288 # 4289 # raise error if DAG violating nodes 4290 # 4291 if len(dag_violating_nodes): 4292 dag_violating_tasks = ", ".join(t._name for t in dag_violating_nodes) 4293 4294 e = ruffus_exceptions.error_circular_dependencies("Circular dependencies found in the pipeline involving " 4295 "one or more of (%s)" % (dag_violating_tasks,)) 4296 raise e 4297 4298 wrap_indent = " " * (indent + 11) 4299 4300 # 4301 # Get updated nodes as all_nodes - nodes_to_run 4302 # 4303 # LOGGER level 6 : All jobs in All Tasks whether out of date or not 4304 if verbose in [1, 2] or verbose >= 5: 4305 (all_tasks, ignore_param1, ignore_param2, ignore_param3) = \ 4306 topologically_sorted_nodes(target_tasks, True, gnu_make_maximal_rebuild_mode, 4307 extra_data_for_signal=[ 4308 t_verbose_logger( 4309 0, 0, None, runtime_data), 4310 job_history], 4311 signal_callback=is_node_up_to_date) 4312 for m in get_completed_task_strings(incomplete_tasks, all_tasks, forcedtorun_tasks, 4313 verbose, verbose_abbreviated_path, indent, 4314 runtime_data, job_history): 4315 output_stream.write(textwrap.fill(m, subsequent_indent=wrap_indent, 4316 width=wrap_width) + "\n") 4317 4318 output_stream.write("\n" + "_" * 40 + "\nTasks which will be run:\n\n") 4319 for t in incomplete_tasks: 4320 # LOGGER 4321 messages = t._printout(runtime_data, t in forcedtorun_tasks, 4322 job_history, True, verbose, 4323 verbose_abbreviated_path, indent) 4324 for m in messages: 4325 output_stream.write(textwrap.fill(m, subsequent_indent=wrap_indent, 4326 width=wrap_width) + "\n") 4327 4328 if verbose: 4329 # LOGGER 4330 output_stream.write("_" * 40 + "\n") 4331 4332 4333def get_semaphore(t, _job_limit_semaphores, syncmanager): 4334 """ 4335 return semaphore to limit the number of concurrent jobs 4336 """ 4337 # 4338 # Is this task limited in the number of jobs? 4339 # 4340 if t.semaphore_name not in t._job_limit_semaphores: 4341 return None 4342 4343 # 4344 # create semaphore if not yet created 4345 # 4346 if t.semaphore_name not in _job_limit_semaphores: 4347 maximum_jobs_num = t._job_limit_semaphores[t.semaphore_name] 4348 _job_limit_semaphores[t.semaphore_name] = syncmanager.BoundedSemaphore( 4349 maximum_jobs_num) 4350 return _job_limit_semaphores[t.semaphore_name] 4351 4352 4353 4354def job_needs_to_run(task, params, force_rerun, logger, verbose, job_name, 4355 job_history, verbose_abbreviated_path): 4356 """ 4357 Check if job parameters out of date / needs to rerun 4358 Also logs why things are up to date or not 4359 4360 TODO Is this a duplicate of logic in is_up_to_date?? 4361 TODO Is this a duplicate of logic in _printout?? 4362 TODO Ignores is_active 4363 """ 4364 4365 # Out of date because forced to run 4366 if force_rerun: 4367 # LOGGER: Out-of-date Jobs in Out-of-date Tasks 4368 log_at_level(logger, 3, verbose, " force task %s to rerun " 4369 % job_name) 4370 return True 4371 4372 if task.needs_update_func is None: 4373 # LOGGER: Out-of-date Jobs in Out-of-date Tasks 4374 log_at_level(logger, 3, verbose, " %s no function to check " 4375 "if up-to-date " % job_name) 4376 return True 4377 4378 # extra clunky hack to also pass task info-- 4379 # makes sure that there haven't been code or 4380 # arg changes 4381 if task.needs_update_func == needs_update_check_modify_time: 4382 needs_update, msg = task.needs_update_func( 4383 *params, task=task, job_history=job_history, 4384 verbose_abbreviated_path=verbose_abbreviated_path, 4385 return_file_dates_when_uptodate=verbose > 6) 4386 else: 4387 needs_update, msg = task.needs_update_func(*params) 4388 4389 if not needs_update: 4390 # LOGGER: All Jobs in Out-of-date Tasks 4391 log_at_level(logger, 5, verbose, 4392 " %s unnecessary: already %s" % (job_name, msg)) 4393 return False 4394 4395 4396 # LOGGER: Out-of-date Jobs in Out-of-date 4397 # Tasks: Why out of date 4398 if not log_at_level(logger, 4, verbose, " %s %s " % (job_name, msg)): 4399 # LOGGER: Out-of-date Jobs in 4400 # Out-of-date Tasks: No explanation 4401 log_at_level(logger, 3, verbose, " %s" % (job_name)) 4402 4403 # 4404 # Clunky hack to make sure input files exists right 4405 # before job is called for better error messages 4406 # 4407 if task.needs_update_func == needs_update_check_modify_time: 4408 check_input_files_exist(*params) 4409 4410 return True 4411 4412 4413def remove_completed_tasks(task_with_completed_job_q, incomplete_tasks, 4414 count_remaining_jobs, logger, verbose): 4415 """ 4416 Remove completed tasks in same thread as job parameters generation to 4417 prevent race conditions 4418 Task completion is usually signalled from pipeline_run 4419 """ 4420 while True: 4421 try: 4422 (job_completed_task, 4423 job_completed_task_name, 4424 job_completed_node_index, 4425 job_completed_name) = task_with_completed_job_q.get_nowait() 4426 4427 if job_completed_task not in incomplete_tasks: 4428 raise Exception("Last job %s for %s. Missing from " 4429 "incomplete tasks in make_job_parameter_generator" 4430 % (job_completed_name, job_completed_task_name)) 4431 count_remaining_jobs[job_completed_task] -= 1 4432 # 4433 # Negative job count : something has gone very wrong 4434 # 4435 if count_remaining_jobs[job_completed_task] < 0: 4436 raise Exception("job %s for %s causes job count < 0." 4437 % (job_completed_name, 4438 job_completed_task_name)) 4439 4440 # 4441 # This Task completed 4442 # 4443 if count_remaining_jobs[job_completed_task] == 0: 4444 log_at_level(logger, 10, verbose, " Last job for %r. " 4445 "Retired from incomplete tasks in pipeline_run " 4446 % job_completed_task._get_display_name()) 4447 incomplete_tasks.remove(job_completed_task) 4448 job_completed_task._completed() 4449 log_at_level(logger, 1, verbose, "Completed Task = %r " 4450 % job_completed_task._get_display_name()) 4451 4452 except queue.Empty: 4453 break 4454 4455 4456def make_job_parameter_generator(incomplete_tasks, task_parents, logger, 4457 forcedtorun_tasks, task_with_completed_job_q, 4458 runtime_data, verbose, 4459 verbose_abbreviated_path, 4460 syncmanager, 4461 death_event, 4462 touch_files_only, job_history): 4463 """ 4464 Parameter generator factory for all jobs / tasks 4465 """ 4466 4467 inprogress_tasks = set() 4468 _job_limit_semaphores = dict() 4469 4470 # _________________________________________________________________________ 4471 # 4472 # Parameter generator returned by factory 4473 # 4474 # _________________________________________________________________________ 4475 def parameter_generator(): 4476 count_remaining_jobs = defaultdict(int) 4477 log_at_level(logger, 10, verbose, " job_parameter_generator BEGIN") 4478 while len(incomplete_tasks): 4479 cnt_jobs_created_for_all_tasks = 0 4480 cnt_tasks_processed = 0 4481 4482 # 4483 # get rid of all completed tasks first 4484 # Completion is signalled from pipeline_run 4485 # 4486 remove_completed_tasks(task_with_completed_job_q, incomplete_tasks, 4487 count_remaining_jobs, logger, verbose) 4488 4489 for t in list(incomplete_tasks): 4490 # 4491 # wrap in execption handler so that we know 4492 # which task the original exception came from 4493 # 4494 try: 4495 log_at_level(logger, 10, verbose, " job_parameter_generator consider " 4496 "task = %r" % t._get_display_name()) 4497 4498 # ignore tasks in progress 4499 if t in inprogress_tasks: 4500 continue 4501 log_at_level(logger, 10, verbose, " job_parameter_generator task %r not in " 4502 "progress" % t._get_display_name()) 4503 4504 # ignore tasks with incomplete dependencies 4505 incomplete_parent = False 4506 for parent in task_parents[t]: 4507 if parent in incomplete_tasks: 4508 incomplete_parent = True 4509 break 4510 if incomplete_parent: 4511 continue 4512 4513 log_at_level(logger, 10, verbose, " job_parameter_generator start task %r " 4514 "(parents completed)" % t._get_display_name()) 4515 force_rerun = t in forcedtorun_tasks 4516 inprogress_tasks.add(t) 4517 cnt_tasks_processed += 1 4518 4519 # 4520 # Log active task 4521 # 4522 if t.is_active: 4523 forced_msg = ": Forced to rerun" if force_rerun else "" 4524 log_at_level(logger, 1, verbose, "Task enters queue = %r %s" 4525 % (t._get_display_name(), forced_msg)) 4526 if len(t.func_description): 4527 log_at_level(logger, 2, verbose, 4528 " " + t.func_description) 4529 # 4530 # Inactive skip loop 4531 # 4532 else: 4533 incomplete_tasks.remove(t) 4534 # N.B. inactive tasks are not _completed() 4535 # t._completed() 4536 t.output_filenames = None 4537 log_at_level(logger, 2, verbose, "Inactive Task = %r" 4538 % t._get_display_name()) 4539 continue 4540 4541 # use output parameters generated by running task 4542 t.output_filenames = [] 4543 4544 # If no parameters: just call task function (empty list) 4545 if t.param_generator_func is None: 4546 task_parameters = ([[], []],) 4547 else: 4548 task_parameters = t.param_generator_func(runtime_data) 4549 4550 # 4551 # iterate through jobs 4552 # 4553 cnt_jobs_created = 0 4554 for params, unglobbed_params in task_parameters: 4555 4556 # 4557 # save output even if uptodate 4558 # 4559 if len(params) >= 2: 4560 # To do: In the case of split subdivide, we should be doing this after 4561 # The job finishes 4562 t.output_filenames.append(params[1]) 4563 4564 job_name = t._get_job_name(unglobbed_params, 4565 verbose_abbreviated_path, 4566 runtime_data) 4567 4568 if not job_needs_to_run(t, params, force_rerun, logger, verbose, job_name, 4569 job_history, verbose_abbreviated_path): 4570 continue 4571 4572 # pause for one second before first job of each tasks 4573 # @originate tasks do not need to pause, 4574 # because they depend on nothing! 4575 if cnt_jobs_created == 0 and touch_files_only < 2: 4576 if "ONE_SECOND_PER_JOB" in runtime_data and \ 4577 runtime_data["ONE_SECOND_PER_JOB"] and \ 4578 t._action_type != Task._action_task_originate: 4579 log_at_level(logger, 10, verbose, 4580 " 1 second PAUSE in job_parameter_generator\n\n\n") 4581 time.sleep(1.01) 4582 else: 4583 time.sleep(0.1) 4584 4585 count_remaining_jobs[t] += 1 4586 cnt_jobs_created += 1 4587 cnt_jobs_created_for_all_tasks += 1 4588 4589 yield (params, 4590 unglobbed_params, 4591 t._name, 4592 t._node_index, 4593 job_name, 4594 t.job_wrapper, 4595 t.user_defined_work_func, 4596 get_semaphore( 4597 t, _job_limit_semaphores, syncmanager), 4598 death_event, 4599 touch_files_only) 4600 4601 # if no job came from this task, this task is complete 4602 # we need to retire it here instead of normal completion 4603 # at end of job tasks precisely 4604 # because it created no jobs 4605 if cnt_jobs_created == 0: 4606 incomplete_tasks.remove(t) 4607 t._completed() 4608 log_at_level(logger, 1, verbose, 4609 "Uptodate Task = %r" % t._get_display_name()) 4610 # LOGGER: logs All Tasks (including any task function docstrings) 4611 log_at_level(logger, 10, verbose, " No jobs created for %r. Retired " 4612 "in parameter_generator " % t._get_display_name()) 4613 4614 # 4615 # Add extra warning if no regular expressions match: 4616 # This is a common class of frustrating errors 4617 # 4618 # DEBUGGGG!! 4619 if verbose >= 1 and \ 4620 "ruffus_WARNING" in runtime_data and \ 4621 t.param_generator_func in runtime_data["ruffus_WARNING"]: 4622 indent_str = " " * 8 4623 for msg in runtime_data["ruffus_WARNING"][t.param_generator_func]: 4624 messages = [msg.replace( 4625 "\n", "\n" + indent_str)] 4626 if verbose >= 4 and runtime_data and \ 4627 "MATCH_FAILURE" in runtime_data and \ 4628 t.param_generator_func in runtime_data["MATCH_FAILURE"]: 4629 for job_msg in runtime_data["MATCH_FAILURE"][t.param_generator_func]: 4630 messages.append( 4631 indent_str + "Job Warning: Input substitution failed:") 4632 messages.append( 4633 indent_str + " " + job_msg.replace("\n", "\n" + indent_str + " ")) 4634 logger.warning(" In Task %r:\n%s%s " 4635 % (t._get_display_name(), indent_str, "\n".join(messages))) 4636 4637 # 4638 # GeneratorExit thrown when generator doesn't complete. 4639 # I.e. there is a break in the pipeline_run loop. 4640 # This happens where there are exceptions 4641 # signalled from within a job 4642 # 4643 # This is not really an exception, more a way to exit the 4644 # generator loop asynchrononously so that cleanups can 4645 # happen (e.g. the "with" statement or finally.) 4646 # 4647 # We could write except Exception: below which will catch 4648 # everything but KeyboardInterrupt and StopIteration 4649 # and GeneratorExit in python 2.6 4650 # 4651 # However, in python 2.5, GeneratorExit inherits from 4652 # Exception. So we explicitly catch and rethrow 4653 # GeneratorExit. 4654 except GeneratorExit: 4655 raise 4656 except: 4657 exceptionType, exceptionValue, exceptionTraceback = sys.exc_info() 4658 exception_stack = traceback.format_exc() 4659 exception_name = exceptionType.__module__ + '.' + exceptionType.__name__ 4660 exception_value = str(exceptionValue) 4661 if len(exception_value): 4662 exception_value = "(%s)" % exception_value 4663 errt = ruffus_exceptions.RethrownJobError([(t._name, 4664 "", 4665 exception_name, 4666 exception_value, 4667 exception_stack)]) 4668 errt.specify_task(t, "Exceptions generating parameters") 4669 raise errt 4670 4671 # extra tests in case final tasks do not result in jobs 4672 if len(incomplete_tasks) and \ 4673 (not cnt_tasks_processed or cnt_jobs_created_for_all_tasks): 4674 log_at_level(logger, 10, verbose, " incomplete tasks = " + 4675 ",".join([t._name for t in incomplete_tasks])) 4676 yield waiting_for_more_tasks_to_complete() 4677 4678 yield all_tasks_complete() 4679 # This function is done 4680 log_at_level(logger, 10, verbose, " job_parameter_generator END") 4681 4682 return parameter_generator 4683 4684 4685def feed_job_params_to_process_pool_factory(parameter_q, death_event, logger, 4686 verbose): 4687 """ 4688 Process pool gets its parameters from this generator 4689 Use factory function to save parameter_queue 4690 """ 4691 def feed_job_params_to_process_pool(): 4692 log_at_level(logger, 10, verbose, 4693 " Send params to Pooled Process START") 4694 while 1: 4695 log_at_level(logger, 10, verbose, 4696 " Get next parameter size = %d" % parameter_q.qsize()) 4697 if not parameter_q.qsize(): 4698 time.sleep(0.1) 4699 params = parameter_q.get() 4700 log_at_level(logger, 10, verbose, " Get next parameter done") 4701 4702 # all tasks done 4703 if isinstance(params, all_tasks_complete): 4704 break 4705 4706 if death_event.is_set(): 4707 death_event.clear() 4708 break 4709 4710 log_at_level(logger, 10, verbose, 4711 " Send params to Pooled Process=>" + str(params[0])) 4712 yield params 4713 4714 log_at_level(logger, 10, verbose, 4715 " Send params to Pooled Process END") 4716 4717 # return generator 4718 return feed_job_params_to_process_pool 4719 4720 4721def fill_queue_with_job_parameters(job_parameters, parameter_q, POOL_SIZE, 4722 logger, verbose): 4723 """ 4724 Ensures queue filled with number of parameters > jobs / slots (POOL_SIZE) 4725 """ 4726 log_at_level(logger, 10, verbose, 4727 " fill_queue_with_job_parameters START") 4728 4729 for params in job_parameters: 4730 4731 # stop if no more jobs available 4732 if isinstance(params, waiting_for_more_tasks_to_complete): 4733 log_at_level(logger, 10, verbose, 4734 " fill_queue_with_job_parameters WAITING for task to complete") 4735 break 4736 4737 if not isinstance(params, all_tasks_complete): 4738 log_at_level(logger, 10, verbose, " fill_queue_with_job_parameters=>" + 4739 str(params[0])) 4740 4741 # put into queue 4742 parameter_q.put(params) 4743 4744 # queue size needs to be at least 2 so that the parameter queue never 4745 # consists of a singlewaiting_for_task_to_complete entry which will 4746 # cause a loop and everything to hang! 4747 if parameter_q.qsize() > POOL_SIZE + 1: 4748 break 4749 log_at_level(logger, 10, verbose, " fill_queue_with_job_parameters END") 4750 4751 4752def pipeline_get_task_names(pipeline=None): 4753 """ 4754 Get all task names in a pipeline 4755 Not that does not check if pipeline is wired up properly 4756 """ 4757 4758 # EXTRA pipeline_run DEBUGGING 4759 global EXTRA_PIPELINERUN_DEBUGGING 4760 EXTRA_PIPELINERUN_DEBUGGING = False 4761 4762 # 4763 # pipeline must be a Pipeline or a string naming a pipeline 4764 # 4765 pipeline = lookup_pipeline(pipeline) 4766 4767 # 4768 # Make sure all tasks in dependency list are linked to real functions 4769 # 4770 processed_tasks = set() 4771 completed_pipeline_names = pipeline._complete_task_setup(processed_tasks) 4772 4773 # 4774 # Return task names for all nodes willy nilly 4775 # 4776 4777 return [n._name for n in node._all_nodes] 4778 4779 4780def get_job_result_output_file_names(job_result): 4781 """ 4782 Excludes input file names being passed through 4783 """ 4784 if len(job_result.unglobbed_params) <= 1: # some jobs have no outputs 4785 return 4786 4787 unglobbed_input_params = job_result.unglobbed_params[0] 4788 unglobbed_output_params = job_result.unglobbed_params[1] 4789 4790 # some have multiple outputs from one job 4791 if not isinstance(unglobbed_output_params, list): 4792 unglobbed_output_params = [unglobbed_output_params] 4793 4794 # canonical path of input files, retaining any symbolic links: 4795 # symbolic links have their own checksumming 4796 input_file_names = set() 4797 for i_f_n in get_strings_in_flattened_sequence([unglobbed_input_params]): 4798 input_file_names.add(os.path.abspath(i_f_n)) 4799 4800 # 4801 # N.B. output parameters are not necessary all strings 4802 # and not all files have been successfully created, 4803 # even though the task apparently completed properly! 4804 # Remember to re-expand globs (from unglobbed paramters) 4805 # after the job has run successfully 4806 # 4807 for possible_glob_str in get_strings_in_flattened_sequence(unglobbed_output_params): 4808 for o_f_n in glob.glob(possible_glob_str): 4809 # 4810 # Exclude output files if they are input files "passed through" 4811 # 4812 if os.path.abspath(o_f_n) in input_file_names: 4813 continue 4814 4815 # 4816 # use paths relative to working directory 4817 # 4818 yield os.path.relpath(o_f_n) 4819 4820 return 4821 4822 4823def handle_sigint(pool, pipeline): 4824 pool.kill(ruffus_exceptions.JobSignalledBreak) 4825 4826 4827def handle_sigusr1(pool, pipeline): 4828 pipeline.suspend_jobs() 4829 4830 4831def handle_sigusr2(pool, pipeline): 4832 pipeline.resume_jobs() 4833 4834 4835# How the job queue works: 4836# Main loop 4837# 4838# iterates pool.map using feed_job_params_to_process_pool() 4839# (calls parameter_q.get() until all_tasks_complete) 4840# 4841# if errors but want to finish tasks already in pipeine: 4842# parameter_q.put(all_tasks_complete()) 4843# keep going 4844# else: 4845# 4846# loops through jobs until no more jobs in non-dependent tasks 4847# separate loop in generator so that list of incomplete_tasks 4848# does not get updated half way through 4849# causing race conditions 4850# 4851# parameter_q.put(params) 4852# until waiting_for_more_tasks_to_complete 4853# until queue is full (check *after*) 4854# 4855def pipeline_run(target_tasks=[], 4856 forcedtorun_tasks=[], 4857 multiprocess=1, 4858 logger=stderr_logger, 4859 gnu_make_maximal_rebuild_mode=True, 4860 # verbose defaults to 1 if None 4861 verbose=None, 4862 runtime_data=None, 4863 one_second_per_job=None, 4864 touch_files_only=False, 4865 exceptions_terminate_immediately=False, 4866 log_exceptions=False, 4867 checksum_level=None, 4868 multithread=0, 4869 history_file=None, 4870 # defaults to 2 if None 4871 verbose_abbreviated_path=None, 4872 pipeline=None, 4873 pool_manager="multiprocessing"): 4874 # Remember to add further extra parameters here to 4875 # "extra_pipeline_run_options" inside cmdline.py 4876 # This will forward extra parameters from the command line to 4877 # pipeline_run 4878 """Run pipelines. 4879 4880 :param target_tasks: targets task functions which will be run if they are 4881 out-of-date 4882 :param forcedtorun_tasks: task functions which will be run whether or not 4883 they are out-of-date 4884 :param multiprocess: The number of concurrent jobs running on different 4885 processes. 4886 :param multithread: The number of concurrent jobs running as different 4887 threads. If > 1, ruffus will use multithreading 4888 *instead of* multiprocessing (and ignore the 4889 multiprocess parameter). Using multi threading 4890 is particularly useful to manage high performance 4891 clusters which otherwise are prone to 4892 "processor storms" when large number of cores finish 4893 jobs at the same time. 4894 :param logger: Where progress will be logged. Defaults to stderr output. 4895 :type logger: `logging <http://docs.python.org/library/logging.html>`_ 4896 objects 4897 :param verbose: 4898 4899 * level 0 : nothing 4900 * level 1 : All Task names 4901 * level 2 : All Tasks names any task function docstrings 4902 * level 3 : Out-of-date Jobs in Out-of-date Tasks, no explanation 4903 * level 4 : Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings 4904 * level 5 : All Jobs in Out-of-date Tasks, (include only list of up-to-date 4905 tasks) 4906 * level 6 : All jobs in All Tasks whether out of date or not 4907 * level 7 : Show file modification times for All jobs in All Tasks 4908 * level 10: logs messages useful only for debugging ruffus pipeline code 4909 :param touch_files_only: Create or update input/output files only to 4910 simulate running the pipeline. Do not run jobs. 4911 If set to CHECKSUM_REGENERATE, will regenerate 4912 the checksum history file to reflect the existing 4913 i/o files on disk. 4914 :param exceptions_terminate_immediately: Exceptions cause immediate 4915 termination rather than waiting 4916 for N jobs to finish where 4917 N = multiprocess 4918 :param log_exceptions: Print exceptions to logger as soon as they occur. 4919 :param checksum_level: Several options for checking up-to-dateness are 4920 available: Default is level 1. 4921 4922 * level 0 : Use only file timestamps 4923 * level 1 : above, plus timestamp of successful job completion 4924 * level 2 : above, plus a checksum of the pipeline function body 4925 * level 3 : above, plus a checksum of the pipeline 4926 function default arguments and the 4927 additional arguments passed in by task 4928 decorators 4929 :param one_second_per_job: To work around poor file timepstamp resolution 4930 for some file systems. Defaults to True if 4931 checksum_level is 0 forcing Tasks to take a 4932 minimum of 1 second to complete. 4933 :param runtime_data: Experimental feature: pass data to tasks at run time 4934 :param gnu_make_maximal_rebuild_mode: Defaults to re-running *all* 4935 out-of-date tasks. Runs minimal 4936 set to build targets if set to 4937 ``True``. Use with caution. 4938 :param history_file: Database file storing checksums and file timestamps 4939 for input/output files. 4940 :param verbose_abbreviated_path: whether input and output paths are abbreviated. 4941 4942 * level 0: The full (expanded, abspath) input or output path 4943 * level > 1: The number of subdirectories to include. 4944 Abbreviated paths are prefixed with ``[,,,]/`` 4945 * level < 0: Input / Output parameters are truncated 4946 to ``MMM`` letters where ``verbose_abbreviated_path 4947 ==-MMM``. Subdirectories are first removed to see 4948 if this allows the paths to fit in the specified 4949 limit. Otherwise abbreviated paths are prefixed by 4950 ``<???>`` 4951 """ 4952 # DEBUGGG 4953 #print("pipeline_run start", file = sys.stderr) 4954 4955 # 4956 # default values 4957 # 4958 if touch_files_only is False: 4959 touch_files_only = 0 4960 elif touch_files_only is True: 4961 touch_files_only = 1 4962 else: 4963 touch_files_only = 2 4964 # we are not running anything so do it as quickly as possible 4965 one_second_per_job = False 4966 if verbose is None: 4967 verbose = 1 4968 if verbose_abbreviated_path is None: 4969 verbose_abbreviated_path = 2 4970 4971 # EXTRA pipeline_run DEBUGGING 4972 global EXTRA_PIPELINERUN_DEBUGGING 4973 if verbose >= 10: 4974 EXTRA_PIPELINERUN_DEBUGGING = True 4975 else: 4976 EXTRA_PIPELINERUN_DEBUGGING = False 4977 4978 if verbose == 0: 4979 logger = black_hole_logger 4980 elif verbose >= 11: 4981 # debugging aid: See t_stderr_logger 4982 # Each invocation of add_unique_prefix adds a unique prefix to 4983 # all subsequent output So that individual runs of pipeline run 4984 # are tagged 4985 if hasattr(logger, "add_unique_prefix"): 4986 logger.add_unique_prefix() 4987 4988 (checksum_level, 4989 job_history, 4990 pipeline, 4991 runtime_data, 4992 target_tasks, 4993 forcedtorun_tasks) = _pipeline_prepare_to_run(checksum_level, 4994 history_file, 4995 pipeline, 4996 runtime_data, 4997 target_tasks, 4998 forcedtorun_tasks) 4999 5000 # select pool and queue type. Selection is convoluted 5001 # or backwards compatibility. 5002 itr_kwargs = {} 5003 if multiprocess is None: 5004 multiprocess = 0 5005 if multithread is None: 5006 multithread = 0 5007 parallelism = max(multiprocess, multithread) 5008 5009 if parallelism > 1: 5010 if pool_manager == "multiprocessing": 5011 syncmanager = multiprocessing.Manager() 5012 death_event = syncmanager.Event() 5013 if multithread: 5014 pool_t = ThreadPool 5015 queue_t = queue.Queue 5016 elif multiprocess > 1: 5017 pool_t = ProcessPool 5018 queue_t = queue.Queue 5019 # Use a timeout of 3 years per job..., so that the condition 5020 # we are waiting for in the thread can be interrupted by 5021 # signals... In other words, so that Ctrl-C works 5022 # Yucky part is that timeout is an extra parameter to 5023 # IMapIterator.next(timeout=None) but next() for normal 5024 # iterators do not take any extra parameters. 5025 itr_kwargs = dict(timeout=99999999) 5026 pool = pool_t(parallelism) 5027 elif pool_manager == "gevent": 5028 import gevent.event 5029 import gevent.queue 5030 import gevent.pool 5031 import gevent.signal 5032 try: 5033 import gevent.lock as gevent_lock 5034 except: 5035 import gevent.coros as gevent_lock 5036 syncmanager = gevent_lock 5037 death_event = gevent.event.Event() 5038 pool_t = gevent.pool.Pool 5039 pool = pool_t(parallelism) 5040 queue_t = gevent.queue.Queue 5041 gevent.signal(signal.SIGINT, functools.partial(handle_sigint, pool=pool, pipeline=pipeline)) 5042 gevent.signal(signal.SIGUSR1, functools.partial(handle_sigusr1, pool=pool, pipeline=pipeline)) 5043 gevent.signal(signal.SIGUSR2, functools.partial(handle_sigusr2, pool=pool, pipeline=pipeline)) 5044 else: 5045 raise ValueError("unknown pool manager '{}'".format(pool_manager)) 5046 5047 else: 5048 syncmanager = multiprocessing.Manager() 5049 death_event = syncmanager.Event() 5050 pool = None 5051 queue_t = queue.Queue 5052 5053 # Supplement mtime with system clock if using 5054 # CHECKSUM_HISTORY_TIMESTAMPS we don't need to default to adding 1 5055 # second delays between jobs 5056 if one_second_per_job is None: 5057 if checksum_level == CHECKSUM_FILE_TIMESTAMPS: 5058 log_at_level(logger, 10, verbose, 5059 " Checksums rely on FILE TIMESTAMPS only and we don't know if the " 5060 "system file time resolution: Pause 1 second...") 5061 runtime_data["ONE_SECOND_PER_JOB"] = True 5062 else: 5063 log_at_level(logger, 10, verbose, " Checksum use calculated time as well: " 5064 "No 1 second pause...") 5065 runtime_data["ONE_SECOND_PER_JOB"] = False 5066 else: 5067 log_at_level(logger, 10, verbose, " One second per job specified to be %s" 5068 % one_second_per_job) 5069 runtime_data["ONE_SECOND_PER_JOB"] = one_second_per_job 5070 5071 if touch_files_only and verbose >= 1: 5072 logger.info("Touch output files instead of remaking them.") 5073 5074 5075 # To update the checksum file, we force all tasks to rerun but 5076 # then don't actually call the task function... 5077 # So starting with target_tasks and forcedtorun_tasks, 5078 # we harvest all upstream dependencies willy, nilly 5079 # and assign the results to forcedtorun_tasks 5080 if touch_files_only == 2: 5081 (forcedtorun_tasks, ignore_param1, ignore_param2, ignore_param3) = \ 5082 topologically_sorted_nodes(target_tasks + forcedtorun_tasks, True, 5083 gnu_make_maximal_rebuild_mode, 5084 extra_data_for_signal=[t_verbose_logger(0, 0, None, 5085 runtime_data), 5086 job_history], 5087 signal_callback=is_node_up_to_date) 5088 5089 # If verbose >=10, for debugging: 5090 # Prints which tasks trigger the pipeline rerun... 5091 # i.e. start from the farthest task, prints out all the up to date 5092 # tasks, and the first out of date task 5093 (incomplete_tasks, self_terminated_nodes, 5094 dag_violating_edges, dag_violating_nodes) = \ 5095 topologically_sorted_nodes(target_tasks, forcedtorun_tasks, 5096 gnu_make_maximal_rebuild_mode, 5097 extra_data_for_signal=[ 5098 t_verbose_logger(verbose, verbose_abbreviated_path, 5099 logger, runtime_data), 5100 job_history], 5101 signal_callback=is_node_up_to_date) 5102 5103 if len(dag_violating_nodes): 5104 dag_violating_tasks = ", ".join(t._name for t in dag_violating_nodes) 5105 5106 e = ruffus_exceptions.error_circular_dependencies("Circular dependencies found in the " 5107 "pipeline involving one or more of " 5108 "(%s)" % (dag_violating_tasks)) 5109 raise e 5110 5111 # get dependencies. Only include tasks which will be run 5112 set_of_incomplete_tasks = set(incomplete_tasks) 5113 task_parents = defaultdict(set) 5114 for t in set_of_incomplete_tasks: 5115 task_parents[t] = set() 5116 for parent in t._get_inward(): 5117 if parent in set_of_incomplete_tasks: 5118 task_parents[t].add(parent) 5119 5120 # Print Complete tasks 5121 # LOGGER level 5 : All jobs in All Tasks whether out of date or not 5122 if verbose in [1, 2] or verbose >= 5: 5123 (all_tasks, ignore_param1, ignore_param2, ignore_param3) = topologically_sorted_nodes( 5124 target_tasks, True, 5125 gnu_make_maximal_rebuild_mode, 5126 extra_data_for_signal=[t_verbose_logger(0, 0, None, 5127 runtime_data), 5128 job_history], 5129 signal_callback=is_node_up_to_date) 5130 # indent hardcoded to 4 5131 for m in get_completed_task_strings(incomplete_tasks, all_tasks, 5132 forcedtorun_tasks, verbose, 5133 verbose_abbreviated_path, 4, 5134 runtime_data, job_history): 5135 logger.info(m) 5136 5137 # print json.dumps(task_parents.items(), indent=4, cls=task_encoder) 5138 logger.info("") 5139 logger.info("_" * 40) 5140 logger.info("Tasks which will be run:") 5141 logger.info("") 5142 logger.info("") 5143 5144 # prepare tasks for pipeline run: 5145 # 5146 # clear task outputs 5147 # task.output_filenames = None 5148 # 5149 # ********** 5150 # BEWARE 5151 # ********** 5152 # 5153 # Because state is stored, ruffus is *not* reentrant. 5154 # 5155 # ********** 5156 # BEWARE 5157 # ********** 5158 for t in incomplete_tasks: 5159 t._init_for_pipeline() 5160 5161 # 5162 # prime queue with initial set of job parameters 5163 # 5164 parameter_q = queue_t() 5165 task_with_completed_job_q = queue_t() 5166 5167 parameter_generator = make_job_parameter_generator(incomplete_tasks, 5168 task_parents, 5169 logger, forcedtorun_tasks, 5170 task_with_completed_job_q, 5171 runtime_data, verbose, 5172 verbose_abbreviated_path, 5173 syncmanager, death_event, 5174 touch_files_only, job_history) 5175 job_parameters = parameter_generator() 5176 fill_queue_with_job_parameters( 5177 job_parameters, parameter_q, parallelism, logger, verbose) 5178 5179 # 5180 # N.B. 5181 # Handling keyboard shortcuts may require 5182 # See http://stackoverflow.com/questions/1408356/ 5183 # keyboard-interrupts-with-pythons-multiprocessing-pool 5184 # 5185 # When waiting for a condition in threading.Condition.wait(), 5186 # KeyboardInterrupt is never sent 5187 # unless a timeout is specified 5188 # 5189 # 5190 # 5191 # # 5192 # whether using multiprocessing 5193 # # 5194 # pool = Pool(parallelism) if multiprocess > 1 else None 5195 # if pool: 5196 # pool_func = pool.imap_unordered 5197 # job_iterator_timeout = [] 5198 # else: 5199 # pool_func = imap 5200 # job_iterator_timeout = [999999999999] 5201 # 5202 # 5203 # .... 5204 # 5205 # 5206 # it = pool_func(run_pooled_job_without_exceptions, 5207 # feed_job_params_to_process_pool()) 5208 # while 1: 5209 # try: 5210 # job_result = it.next(*job_iterator_timeout) 5211 # 5212 # ... 5213 # 5214 # except StopIteration: 5215 # break 5216 5217 if pool is not None: 5218 pool_func = pool.imap_unordered 5219 else: 5220 pool_func = map 5221 5222 feed_job_params_to_process_pool = feed_job_params_to_process_pool_factory( 5223 parameter_q, death_event, logger, verbose) 5224 5225 # 5226 # for each result from job 5227 # 5228 job_errors = ruffus_exceptions.RethrownJobError() 5229 tasks_with_errors = set() 5230 5231 # 5232 # job_result.job_name / job_result.return_value 5233 # Reserved for returning result from job... 5234 # How? 5235 # 5236 # Rewrite for loop so we can call iter.next() with a timeout 5237 try: 5238 5239 # for job_result in pool_func(run_pooled_job_without_exceptions, 5240 # feed_job_params_to_process_pool()): 5241 ii = iter(pool_func(run_pooled_job_without_exceptions, 5242 feed_job_params_to_process_pool())) 5243 while 1: 5244 if pool is not None: 5245 job_result = ii.next(**itr_kwargs) 5246 else: 5247 job_result = next(ii) 5248 # run next task 5249 log_at_level(logger, 11, verbose, "r" * 80 + "\n") 5250 t = node._lookup_node_from_index(job_result.node_index) 5251 5252 # remove failed jobs from history-- their output is bogus now! 5253 if job_result.state in (JOB_ERROR, JOB_SIGNALLED_BREAK): 5254 log_at_level( 5255 logger, 10, verbose, " JOB ERROR / JOB_SIGNALLED_BREAK: " + job_result.job_name) 5256 # remove outfile from history if it exists 5257 for o_f_n in get_job_result_output_file_names(job_result): 5258 job_history.pop(o_f_n, None) 5259 5260 # only save poolsize number of errors 5261 if job_result.state == JOB_ERROR: 5262 log_at_level(logger, 10, verbose, " Exception caught for %s" 5263 % job_result.job_name) 5264 job_errors.append(job_result.exception) 5265 tasks_with_errors.add(t) 5266 5267 # 5268 # print to logger immediately 5269 # 5270 if log_exceptions: 5271 log_at_level(logger, 10, verbose, " Log Exception") 5272 logger.error(job_errors.get_nth_exception_str()) 5273 5274 # 5275 # break if too many errors 5276 # 5277 if len(job_errors) >= parallelism or exceptions_terminate_immediately: 5278 log_at_level(logger, 10, verbose, " Break loop %s %s %s " 5279 % (exceptions_terminate_immediately, 5280 len(job_errors), parallelism)) 5281 parameter_q.put(all_tasks_complete()) 5282 break 5283 5284 # break immediately if the user says stop 5285 elif job_result.state == JOB_SIGNALLED_BREAK: 5286 job_errors.append(job_result.exception) 5287 job_errors.specify_task(t, "Exceptions running jobs") 5288 log_at_level(logger, 10, verbose, " Break loop JOB_SIGNALLED_BREAK %s %s " 5289 % (len(job_errors), parallelism)) 5290 parameter_q.put(all_tasks_complete()) 5291 break 5292 5293 else: 5294 if job_result.state == JOB_UP_TO_DATE: 5295 # LOGGER: All Jobs in Out-of-date Tasks 5296 log_at_level(logger, 5, verbose, " %s unnecessary: already up to date" 5297 % job_result.job_name) 5298 else: 5299 # LOGGER: Out-of-date Jobs in Out-of-date Tasks 5300 log_at_level(logger, 3, verbose, 5301 " %s completed" % job_result.job_name) 5302 # save this task name and the job (input and output files) 5303 # alternatively, we could just save the output file and its 5304 # completion time, or on the other end of the spectrum, 5305 # we could save a checksum of the function that generated 5306 # this file, something akin to: 5307 # chksum = md5.md5(marshal.dumps(t.user_defined_work_func.func_code.co_code)) 5308 # we could even checksum the arguments to the function that 5309 # generated this file: 5310 # chksum2 = md5.md5(marshal.dumps(t.user_defined_work_func.func_defaults) + 5311 # marshal.dumps(t.args)) 5312 5313 for o_f_n in get_job_result_output_file_names(job_result): 5314 try: 5315 log_at_level(logger, 10, verbose, 5316 " Job History : " + o_f_n) 5317 mtime = os.path.getmtime(o_f_n) 5318 # 5319 # use probably higher resolution 5320 # time.time() over mtime which might have 1 or 2s 5321 # resolutions, unless there is clock skew and the 5322 # filesystem time > system time (e.g. for networks) 5323 # 5324 epoch_seconds = time.time() 5325 # Aargh. go back to insert one second between jobs 5326 if epoch_seconds < mtime: 5327 if one_second_per_job is None and \ 5328 not runtime_data["ONE_SECOND_PER_JOB"]: 5329 log_at_level(logger, 10, verbose, 5330 " Switch to 1s per job") 5331 runtime_data["ONE_SECOND_PER_JOB"] = True 5332 elif epoch_seconds - mtime < 1.1: 5333 mtime = epoch_seconds 5334 chksum = JobHistoryChecksum(o_f_n, mtime, 5335 job_result.unglobbed_params[2:], t) 5336 job_history[o_f_n] = chksum 5337 log_at_level(logger, 10, verbose, 5338 " Job History Saved: " + o_f_n) 5339 except: 5340 pass 5341 5342 log_at_level(logger, 10, verbose, 5343 " _is_up_to_date completed task & checksum...") 5344 # 5345 # _is_up_to_date completed task after checksumming 5346 # 5347 task_with_completed_job_q.put((t, 5348 job_result.task_name, 5349 job_result.node_index, 5350 job_result.job_name)) 5351 5352 # make sure queue is still full after each job is retired 5353 # do this after undating which jobs are incomplete 5354 log_at_level(logger, 10, verbose, " job errors?") 5355 if len(job_errors): 5356 # parameter_q.clear() 5357 # if len(job_errors) == 1 and not parameter_q._closed: 5358 log_at_level(logger, 10, verbose, " all tasks completed...") 5359 parameter_q.put(all_tasks_complete()) 5360 else: 5361 log_at_level(logger, 10, verbose, 5362 " Fill queue with more parameter...") 5363 fill_queue_with_job_parameters(job_parameters, parameter_q, parallelism, logger, 5364 verbose) 5365 # The equivalent of the normal end of a fall loop 5366 except StopIteration as e: 5367 pass 5368 except: 5369 exception_name, exception_value, exception_Traceback = sys.exc_info() 5370 exception_stack = traceback.format_exc() 5371 # save exception to rethrow later 5372 job_errors.append((None, None, exception_name, 5373 exception_value, exception_stack)) 5374 for ee in exception_value, exception_name, exception_stack: 5375 log_at_level(logger, 10, verbose, 5376 " Exception caught %s" % (ee,)) 5377 log_at_level(logger, 10, verbose, 5378 " Get next parameter size = %d" % parameter_q.qsize()) 5379 log_at_level(logger, 10, verbose, " Task with completed " 5380 "jobs size = %d" % task_with_completed_job_q.qsize()) 5381 parameter_q.put(all_tasks_complete()) 5382 try: 5383 death_event.clear() 5384 except: 5385 pass 5386 5387 if pool is not None: 5388 if hasattr(pool, "close"): 5389 log_at_level(logger, 10, verbose, " pool.close") 5390 pool.close() 5391 log_at_level(logger, 10, verbose, " pool.terminate") 5392 try: 5393 pool.terminate() 5394 except: 5395 pass 5396 log_at_level(logger, 10, verbose, " pool.terminated") 5397 raise job_errors 5398 5399 # log_at_level (logger, 10, verbose, " syncmanager.shutdown") 5400 # syncmanager.shutdown() 5401 5402 if pool is not None: 5403 log_at_level(logger, 10, verbose, " pool.close") 5404 # pool.join() 5405 try: 5406 pool.close() 5407 except AttributeError: 5408 pass 5409 log_at_level(logger, 10, verbose, " pool.terminate") 5410 try: 5411 pool.terminate() 5412 except AttributeError: 5413 pass 5414 except Exception: 5415 # an exception may be thrown after a signal is caught (Ctrl-C) 5416 # when the EventProxy(s) for death_event might be left hanging 5417 pass 5418 log_at_level(logger, 10, verbose, " pool.terminated") 5419 5420 # Switch back off EXTRA pipeline_run DEBUGGING 5421 EXTRA_PIPELINERUN_DEBUGGING = False 5422 5423 if len(job_errors): 5424 raise job_errors 5425 5426 5427if __name__ == '__main__': 5428 import unittest 5429 5430 # 5431 # debug parameter ignored if called as a module 5432 # 5433 if sys.argv.count("--debug"): 5434 sys.argv.remove("--debug") 5435 unittest.main() 5436