from __future__ import print_function
import re
from . import dbdict
from .ruffus_utility import *
from .ruffus_utility import shorten_filenames_encoder, FILE_CHECK_RETRY, FILE_CHECK_SLEEP
from .ruffus_exceptions import *

################################################################################
#
#   file_name_parameters
#
#
#   Copyright (c) 10/9/2009 Leo Goodstadt
#
#   Permission is hereby granted, free of charge, to any person obtaining a copy
#   of this software and associated documentation files (the "Software"), to deal
#   in the Software without restriction, including without limitation the rights
#   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the Software is
#   furnished to do so, subject to the following conditions:
#
#   The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#   THE SOFTWARE.
################################################################################


"""

********************************************
:mod:`file_name_parameters` -- Overview
********************************************


.. moduleauthor:: Leo Goodstadt <ruffus@llew.org.uk>

    Handles file names for ruffus


"""


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   imports


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import os
import sys
import time
import glob
from itertools import groupby
import itertools
from collections import defaultdict
from time import strftime, gmtime

if sys.hexversion >= 0x03000000:
    # everything is unicode in python3
    path_str_type = str
else:
    path_str_type = basestring


class t_combinatorics_type:
    (COMBINATORICS_PRODUCT, COMBINATORICS_PERMUTATIONS,
     COMBINATORICS_COMBINATIONS, COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT) = list(range(4))


def get_readable_path_str(original_path, max_len):
    """
    Truncates path to max_len characters if necessary.
    If the result is a path within a nested directory, also removes the
    partially truncated leading directory name.
    """
    if len(original_path) < max_len:
        return original_path
    truncated_name = original_path[-(max_len - 5):]
    if "/" not in truncated_name:
        return "[...]" + truncated_name
    return "[...]" + re.sub("^[^/]+", "", truncated_name)


def epoch_seconds_to_str(epoch_seconds):
    """
    Converts seconds since epoch into a nice string with date and time,
    with seconds given to 2 decimal places
    """
    #   returns 24 char long  25 May 2011 23:37:40.12
    time_str = strftime("%d %b %Y %H:%M:%S", gmtime(epoch_seconds))
    #
    fraction_of_second_as_str = (
        "%.2f" % (epoch_seconds - int(epoch_seconds)))[1:]
    # or fraction = ("%.2f" % (divmod(epoch_seconds, 1)[1]))[1:]
    return (time_str + fraction_of_second_as_str)
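
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): the two helpers above are used when
#   formatting log / error messages.  For a hypothetical long path:
#
#       get_readable_path_str("/very/long/path/to/some/file.txt", 20)
#           -> "[...]/some/file.txt"        # truncated; partially cut directory name removed
#
#   epoch_seconds_to_str() formats seconds-since-epoch in the style of the example in the
#   comment above, e.g. "25 May 2011 23:37:40.12" (exact output depends on the value).
# -------------------------------------------------------------------------------------------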


err_msg_no_regex_match = ("No jobs were run because no file names matched.\n"
                          "Please make sure that the regular expression is correctly specified.")
err_msg_empty_files_parameter = ("@files() was empty, i.e. no files were specified.\n"
                                 "Please make sure this is by design.")


class t_file_names_transform(object):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        input
            - a set of file names (derived from tasks, globs, hard coded file names)
            - a specification (e.g. a new suffix, a regular expression substitution pattern)
        output
            - a new file name

    N.B. Is this level of abstraction adequate?
        1) On one hand, this is a simple extension of the current working design
        2) On the other, we throw away the nested structure of tasks / globs
           as well as the nested structure of the outputs
    """

    def substitute(self, starting_file_names, pattern):
        pass

    # overridden only in t_suffix_file_names_transform
    # only suffix() behaves differently for output and extra files...
    def substitute_output_files(self, starting_file_names, pattern):
        return self.substitute(starting_file_names, pattern)


class t_suffix_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        replacing a specified suffix
    """

    def __init__(self, enclosing_task, suffix_object, error_type, descriptor_string, output_dir):
        self.matching_regex = compile_suffix(
            enclosing_task, suffix_object, error_type, descriptor_string)
        self.matching_regex_str = suffix_object.args[0]
        self.output_dir = output_dir

    def substitute(self, starting_file_names, pattern):
        if self.output_dir == []:
            return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern)
        else:
            # change directory of starting file and return substitution
            starting_file_name = os.path.join(
                self.output_dir, os.path.split(starting_file_names[0])[1])
            return regex_replace(starting_file_name, self.matching_regex_str, self.matching_regex, pattern)

    def substitute_output_files(self, starting_file_names, pattern):
        if self.output_dir == []:
            return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern, SUFFIX_SUBSTITUTE)
        else:
            # change directory of starting file and return substitution
            starting_file_name = os.path.join(
                self.output_dir, os.path.split(starting_file_names[0])[1])
            return regex_replace(starting_file_name, self.matching_regex_str, self.matching_regex, pattern, SUFFIX_SUBSTITUTE)


class t_regex_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        replacing a specified regular expression
    """

    def __init__(self, enclosing_task, regex_object, error_type, descriptor_string):
        self.matching_regex = compile_regex(
            enclosing_task, regex_object, error_type, descriptor_string)
        self.matching_regex_str = regex_object.args[0]

    def substitute(self, starting_file_names, pattern):
        return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern)
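
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): how the suffix() and regex()
#   transforms above differ, using hypothetical file names.  suffix() only ever matches the
#   *first* input file name (starting_file_names[0]) and, for output files, substitutes the
#   suffix itself (SUFFIX_SUBSTITUTE):
#
#       suffix(".txt")            + output ".out"        :  "data/a.txt" -> "data/a.out"
#       regex(r"data/(.+)\.txt")  + output r"res/\1.out" :  "data/a.txt" -> "res/a.out"
#
#   When an output_dir is given, the suffix transform first moves the file name into that
#   directory (os.path.join(output_dir, basename)) before substituting.
# -------------------------------------------------------------------------------------------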


class t_formatter_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        using formatter() style (regular expression + file path component) substitution
    """

    def __init__(self, enclosing_task, format_object, error_type, descriptor_string):
        self.matching_regexes = []
        self.matching_regex_strs = []
        if len(format_object.args):
            self.matching_regexes = compile_formatter(
                enclosing_task, format_object, error_type, descriptor_string)
            self.matching_regex_strs = list(format_object.args)

    def substitute(self, starting_file_names, pattern):
        # note: uses all file names
        return formatter_replace(starting_file_names, self.matching_regex_strs, self.matching_regexes, pattern)


class t_nested_formatter_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        applying a whole series of regular expressions to a whole series of inputs
    """

    def __init__(self, enclosing_task, format_objects, error_type, descriptor_string):
        self.list_matching_regex = []
        self.list_matching_regex_str = []

        for format_object in format_objects:
            if len(format_object.args):
                self.list_matching_regex.append(compile_formatter(
                    enclosing_task, format_object, error_type, descriptor_string))
                self.list_matching_regex_str.append(list(format_object.args))
            else:
                self.list_matching_regex.append([])
                self.list_matching_regex_str.append([])

    def substitute(self, starting_file_names, pattern):
        # note: uses all file names
        return nested_formatter_replace(starting_file_names, self.list_matching_regex_str, self.list_matching_regex, pattern)


# _________________________________________________________________________________________

#   t_params_tasks_globs_run_time_data

# _________________________________________________________________________________________
class t_params_tasks_globs_run_time_data(object):
    """
    After parameters are parsed into tasks, globs, runtime data
    """

    def __init__(self, params, tasks, globs, runtime_data_names):
        self.params = params
        self.tasks = tasks
        self.globs = globs
        self.runtime_data_names = runtime_data_names

    def __str__(self):
        return str(self.params)

    def param_iter(self):
        for p in self.params:
            yield t_params_tasks_globs_run_time_data(p, self.tasks, self.globs,
                                                     self.runtime_data_names)

    def unexpanded_globs(self):
        """
        do not expand globs
        """
        return t_params_tasks_globs_run_time_data(self.params, self.tasks, [],
                                                  self.runtime_data_names)

    def single_file_to_list(self):
        """
        if the parameter is a simple string, wrap it in a list unless it is a glob
        Useful for simple @transform cases
        """
        if isinstance(self.params, path_str_type) and not is_glob(self.params):
            self.params = [self.params]
            return True
        return False

    def file_names_transformed(self, filenames, file_names_transform):
        """
        return clone with the filenames / globs transformed by the supplied transform object
        """
        output_glob = file_names_transform.substitute(filenames, self.globs)
        output_param = file_names_transform.substitute(filenames, self.params)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)

    def output_file_names_transformed(self, filenames, file_names_transform):
        """
        return clone with the filenames / globs transformed by the supplied transform object
        """
        output_glob = file_names_transform.substitute_output_files(
            filenames, self.globs)
        output_param = file_names_transform.substitute_output_files(
            filenames, self.params)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)
    #
    #   deprecated
    #

    def regex_replaced(self, filename, regex, regex_or_suffix=REGEX_SUBSTITUTE):
        output_glob = regex_replace(
            filename, regex, self.globs, regex_or_suffix)
        output_param = regex_replace(
            filename, regex, self.params, regex_or_suffix)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   needs_update_func

#       functions which are called to see if a job needs to be updated
#
#   Each task is a series of parallel jobs
#       each of which has the following pseudo-code
#
#   for param in param_generator_func():
#       if needs_update_func(*param):
#           job_wrapper(*param)
#
#   N.B. param_generator_func yields iterators of *sequences*
#   if you are generating single parameters, turn them into lists:
#
#       for a in alist:
#           yield (a,)
#
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
# _________________________________________________________________________________________

#   needs_update_check_directory_missing

#       N.B. throws an exception if this is an ordinary file, not a directory


# _________________________________________________________________________________________
def needs_update_check_directory_missing(*params, **kwargs):
    """
    Called per directory:
        Does it exist?
        Is it an ordinary file, not a directory? (throws an exception)
    """
    if len(params) == 1:
        dirs = params[0]
    elif len(params) == 2:
        dirs = params[1]
    else:
        raise Exception(
            "Wrong number of arguments in mkdir check %s" % (params,))

    missing_directories = []
    for d in get_strings_in_flattened_sequence(dirs):
        # print >>sys.stderr, "check directory missing %d " % os.path.exists(d) # DEBUG
        if not os.path.exists(d):
            missing_directories.append(d)
            continue
            # return True, "Directory [%s] is missing" % d
        if not os.path.isdir(d):
            raise error_not_a_directory(
                "%s already exists but as a file, not a directory" % d)

    if len(missing_directories):
        if len(missing_directories) > 1:
            return True, ": Directories %r are missing" % (", ".join(missing_directories))
        else:
            return True, ": Directory %r is missing" % (missing_directories[0])
    return False, "All directories exist"


def check_input_files_exist(*params):
    """If inputs are missing then there is no way a job can run
    successfully. Must throw exception.

    This extra function is a hack to make sure input files exist
    right before the job is called, for better error messages, and to
    save things from blowing up inside the task function.

    In practice, we have observed a sporadic time-lag between a task
    completing on a remote node and an output file appearing in the
    expected location on the host running the ruffus pipeline. This
    causes a subsequent task to fail with a missing input file even
    though the file will appear a few seconds later. It is not clear
    if this is an issue of poorly configured storage, but a retry
    behaviour has been implemented to work around such issues.

    """
    if len(params):
        input_files = params[0]

        for f in get_strings_in_flattened_sequence(input_files):
            tries = FILE_CHECK_RETRY
            while tries > 0:
                if not os.path.exists(f):
                    if os.path.lexists(f):
                        raise MissingInputFileError("No way to run job: " +
                                                    "Input file '%s' is a broken symbolic link." % f)
                    tries -= 1
                    time.sleep(FILE_CHECK_SLEEP)
                    continue
                break
            if tries <= 0:
                raise MissingInputFileError("No way to run job: " +
                                            "Input file '%s' does not exist" % f)


def needs_update_check_exist(*params, **kwargs):
    """
    Given input and output files, see if all exist
    Each can be

        #. string: assumed to be a filename "file1"
        #. any other type
        #. arbitrary nested sequence of (1) and (2)

    """
    if "verbose_abbreviated_path" in kwargs:
        verbose_abbreviated_path = kwargs["verbose_abbreviated_path"]
    else:
        verbose_abbreviated_path = -55

    # missing output means build
    if len(params) < 2:
        return True, "i/o files not specified"

    i, o = params[0:2]
    i = get_strings_in_flattened_sequence(i)
    o = get_strings_in_flattened_sequence(o)

    #
    #   build: missing output file
    #
    if len(o) == 0:
        return True, "Missing output file"

    # missing input / output file means always build
    missing_files = []
    for io in (i, o):
        for p in io:
            if not os.path.exists(p):
                missing_files.append(p)
    if len(missing_files):
        return True, "...\n    Missing file%s %s" % ("s" if len(missing_files) > 1 else "",
                                                     shorten_filenames_encoder(missing_files,
                                                                               verbose_abbreviated_path))

    #
    #   missing input -> build only if output absent
    #
    if len(i) == 0:
        return False, "Missing input files"

    return False, "Up to date"
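
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): how the needs_update_* functions are
#   typically called (see the pseudo-code banner above).  File names are hypothetical:
#
#       needs_update, msg = needs_update_check_exist(["a.input"], ["a.output"])
#
#       * if "a.input" or "a.output" does not exist  ->  (True,  "...Missing file(s)...")
#       * if both files exist                        ->  (False, "Up to date")
#
#   The (bool, message) pair decides whether the job is rerun and, at higher verbosity,
#   what is reported.
# -------------------------------------------------------------------------------------------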


# _________________________________________________________________________________________

#   needs_update_check_modify_time

# _________________________________________________________________________________________
def needs_update_check_modify_time(*params, **kwargs):
    """
    Given input and output files, see if all exist and whether output files are later than input files
    Each can be

        #. string: assumed to be a filename "file1"
        #. any other type
        #. arbitrary nested sequence of (1) and (2)

    """
    # conditions for rerunning a job:
    #   1. forced to rerun entire taskset
    #   2. 1+ output files don't exist
    #   3. 1+ of input files is newer than 1+ output files  -- ruffus does this level right now...
    #   4. internal completion time for that file is out of date   # incomplete runs will be rerun automatically
    #   5. checksum of code that ran the file is out of date       # changes to function body result in rerun
    #   6. checksum of the args that ran the file are out of date  # appropriate config file changes result in rerun
    try:
        task = kwargs['task']
    except KeyError:
        # allow the task not to be specified and fall back to classic
        # file timestamp behavior (either this or fix all the test cases,
        # which often don't have proper tasks)
        class Namespace:
            pass
        task = Namespace()
        task.checksum_level = CHECKSUM_FILE_TIMESTAMPS

    if "verbose_abbreviated_path" in kwargs:
        verbose_abbreviated_path = kwargs["verbose_abbreviated_path"]
    else:
        verbose_abbreviated_path = -55

    try:
        job_history = kwargs['job_history']
    except KeyError:
        # allow job_history not to be specified and reopen dbdict file redundantly...
        # Either this or fix all the test cases
        # job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True)
        print("Oops: Should only appear in test code", file=sys.stderr)
        job_history = open_job_history(None)

    # missing output means build
    if len(params) < 2:
        return True, ""

    i, o = params[0:2]
    i = get_strings_in_flattened_sequence(i)
    o = get_strings_in_flattened_sequence(o)

    #
    #   build: missing output file
    #
    if len(o) == 0:
        return True, "Missing output file"

    # missing input / output file means always build
    missing_files = []
    for io in (i, o):
        for p in io:
            if not os.path.exists(p):
                missing_files.append(p)
    if len(missing_files):
        return True, "...\n    Missing file%s %s" % ("s" if len(missing_files) > 1 else "",
                                                     shorten_filenames_encoder(missing_files,
                                                                               verbose_abbreviated_path))
    #
    #   N.B. Checkpointing uses relative paths
    #

    # existing files, but from previous interrupted runs
    if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS:
        incomplete_files = []
        set_incomplete_files = set()
        func_changed_files = []
        set_func_changed_files = set()
        param_changed_files = []
        set_param_changed_files = set()
        # for io in (i, o):
        #    for p in io:
        #        if p not in job_history:
        #            incomplete_files.append(p)
        for p in o:
            if os.path.relpath(p) not in job_history and p not in set_incomplete_files:
                incomplete_files.append(p)
                set_incomplete_files.add(p)
        if len(incomplete_files):
            return True, "Uncheckpointed file%s (left over from a failed run?):\n    %s" % ("s" if len(incomplete_files) > 1 else "",
                                                                                            shorten_filenames_encoder(incomplete_files,
                                                                                                                      verbose_abbreviated_path))
        # check if the function that generated our output file has changed
        for o_f_n in o:
            rel_o_f_n = os.path.relpath(o_f_n)
            old_chksum = job_history[rel_o_f_n]
            new_chksum = JobHistoryChecksum(rel_o_f_n, None, params[2:], task)
            if task.checksum_level >= CHECKSUM_FUNCTIONS_AND_PARAMS and \
                    new_chksum.chksum_params != old_chksum.chksum_params and \
                    o_f_n not in set_func_changed_files:
                param_changed_files.append(o_f_n)
                set_param_changed_files.add(o_f_n)
            elif task.checksum_level >= CHECKSUM_FUNCTIONS and \
                    new_chksum.chksum_func != old_chksum.chksum_func and \
                    o_f_n not in set_func_changed_files:
                func_changed_files.append(o_f_n)
                set_func_changed_files.add(o_f_n)

        if len(func_changed_files):
            return True, "Pipeline function has changed:\n    %s" % (shorten_filenames_encoder(func_changed_files,
                                                                                               verbose_abbreviated_path))
        if len(param_changed_files):
            return True, "Pipeline parameters have changed:\n    %s" % (shorten_filenames_encoder(param_changed_files,
                                                                                                  verbose_abbreviated_path))

    #
    #   missing input -> build only if output absent or function is out of date
    #
    if len(i) == 0:
        return False, "Missing input files"

    #
    #   get sorted modified times for all input and output files
    #
    filename_to_times = [[], []]
    file_times = [[], []]

    # _____________________________________________________________________________________

    #   pretty_io_with_date_times

    # _____________________________________________________________________________________
    def pretty_io_with_date_times(filename_to_times):

        # sort
        for io in range(2):
            filename_to_times[io].sort()

        #
        #   add asterisk for all files which are causing this job to be out of date
        #
        file_name_to_asterisk = dict()
        oldest_output_mtime = filename_to_times[1][0][0]
        for mtime, file_name in filename_to_times[0]:
            file_name_to_asterisk[file_name] = "*" if mtime >= oldest_output_mtime else " "
        # N.B. this is the newest *input* modification time: outputs older than it are stale
        newest_input_mtime = filename_to_times[0][-1][0]
        for mtime, file_name in filename_to_times[1]:
            file_name_to_asterisk[file_name] = "*" if mtime <= newest_input_mtime else " "

        #
        #   try to fit in 100 - 15 = 85 char lines
        #   date time ~ 25 characters so limit file name to 55 characters
        #
        msg = "\n"
        category_names = "Input", "Output"
        for io in range(2):
            msg += "  %s files:\n" % category_names[io]
            for mtime, file_name in filename_to_times[io]:
                file_datetime_str = epoch_seconds_to_str(mtime)
                msg += ("   " +                                    # indent
                        # asterisked out of date files
                        file_name_to_asterisk[file_name] + " " +
                        file_datetime_str + ": " +                 # date time of file
                        shorten_filenames_encoder(file_name,
                                                  verbose_abbreviated_path) + "\n")  # file name truncated to 55
        return msg

    #
    #   Ignore output file if it is found in the list of input files
    #       By definition they have the same timestamp,
    #       and the job will otherwise appear to be out of date
    #
    #   Symbolic links followed
    real_input_file_names = set()
    for input_file_name in i:
        rel_input_file_name = os.path.relpath(input_file_name)
        real_input_file_names.add(os.path.realpath(input_file_name))
        file_timestamp = os.path.getmtime(input_file_name)
        if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS and rel_input_file_name in job_history:
            old_chksum = job_history[rel_input_file_name]
            mtime = max(file_timestamp, old_chksum.mtime)
        else:
            mtime = file_timestamp
        filename_to_times[0].append((mtime, input_file_name))
        file_times[0].append(mtime)

    # for output files, we need to check modification time *in addition* to
    # function and argument checksums...
    for output_file_name in o:
        #
        #   Ignore output files which are just symbolic links to input files or passed through
        #       from input to output
        #
        real_file_name = os.path.realpath(output_file_name)
        if real_file_name in real_input_file_names:
            continue

        rel_output_file_name = os.path.relpath(output_file_name)
        file_timestamp = os.path.getmtime(output_file_name)
        if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS:
            old_chksum = job_history[rel_output_file_name]
            if old_chksum.mtime > file_timestamp and old_chksum.mtime - file_timestamp > 1.1:
                mtime = file_timestamp
            # use the checksum time in preference if both are within one second
            #   (suggesting higher resolution)
            else:
                mtime = old_chksum.mtime
        else:
            mtime = file_timestamp
        file_times[1].append(mtime)
        filename_to_times[1].append((mtime, output_file_name))

    #
    #   Debug: Force print modified file names and times
    #
    # if len(file_times[0]) and len(file_times[1]):
    #    print >>sys.stderr, pretty_io_with_date_times(filename_to_times), file_times, (max(file_times[0]) >= min(file_times[1]))
    # else:
    #    print >>sys.stderr, i, o

    #
    #   update if any input file >= (is more recent than) any output file
    #
    if len(file_times[0]) and len(file_times[1]) and max(file_times[0]) >= min(file_times[1]):
        return True, pretty_io_with_date_times(filename_to_times)

    if "return_file_dates_when_uptodate" in kwargs and kwargs["return_file_dates_when_uptodate"]:
        return False, "Up to date\n" + pretty_io_with_date_times(filename_to_times)

    return False, "Up to date"
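
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): the core timestamp rule above reduces to
#
#       rerun = max(input modification times) >= min(output modification times)
#
#   i.e. a job is rerun as soon as *any* input is as new as (or newer than) *any* output.
#   With checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS, the modification time recorded in the
#   job history (dbdict) can stand in for the on-disk timestamp, and un-checkpointed outputs
#   or changed function / parameter checksums force a rerun before timestamps are compared.
# -------------------------------------------------------------------------------------------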


# _________________________________________________________________________________________
#
#   is_file_re_combining
#
# _________________________________________________________________________________________
def is_file_re_combining(old_args):
    """
    Helper function for @files_re
    check if parameters are wrapped in combine()
    """
    combining_all_jobs = False
    orig_args = []
    for arg in old_args:
        if isinstance(arg, combine):
            combining_all_jobs = True
            if len(arg.args) == 1:
                orig_args.append(arg.args[0])
            else:
                orig_args.append(arg[0].args)
        else:
            orig_args.append(arg)
    return combining_all_jobs, orig_args


# _________________________________________________________________________________________

#   file_names_from_tasks_globs

# _________________________________________________________________________________________
def file_names_from_tasks_globs(files_task_globs,
                                runtime_data, do_not_expand_single_job_tasks=False):
    """
    Replaces glob specifications and tasks with actual files / task output
    """

    # special handling for chaining tasks which conceptually have a single job
    #   i.e. @merge and @files / @parallel with single job parameters
    if files_task_globs.params.__class__.__name__ == 'Task' and do_not_expand_single_job_tasks:
        return files_task_globs.params._get_output_files(True, runtime_data)

    task_or_glob_to_files = dict()

    # look up globs and tasks
    for g in files_task_globs.globs:
        # check whether this is still a glob pattern after the transform
        #   {} are particularly suspicious...
        if is_glob(g):
            task_or_glob_to_files[g] = sorted(glob.glob(g))
    for t in files_task_globs.tasks:
        of = t._get_output_files(False, runtime_data)
        task_or_glob_to_files[t] = of
    for n in files_task_globs.runtime_data_names:
        data_name = n.args[0]
        if data_name in runtime_data:
            task_or_glob_to_files[n] = runtime_data[data_name]
        else:
            raise error_missing_runtime_parameter("The inputs of this task depend on " +
                                                  "the runtime parameter " +
                                                  "'%s' which is missing " % data_name)

    return expand_nested_tasks_or_globs(files_task_globs.params, task_or_glob_to_files)


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   param_factories

#       makes python generators which yield parameters for
#
#           A) needs_update_func
#           B) job_wrapper

#       Each task is a series of parallel jobs
#           each of which has the following pseudo-code
#
#       for param in param_generator_func():
#           if needs_update_func(*param):
#               act_func(*param)
#
#       Test Usage:
#
#
#       param_func = xxx_factory(tasks, globs, orig_input_params, ...)
#
#       for params in param_func():
#           i, o = params[0:2]
#           print " input_params = ", i
#           print "output        = ", o
#
#
#
#
#
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

# _________________________________________________________________________________________

#   touch_file_factory

# _________________________________________________________________________________________
def touch_file_factory(orig_args, register_cleanup):
    """
    Creates a function, which when called, will touch files
    """
    file_names = orig_args
    # accepts unicode
    if isinstance(orig_args, path_str_type):
        file_names = [orig_args]
    else:
        # make a copy so that when the original is modified, we don't get confused!
        file_names = list(orig_args)

    def do_touch_file():
        for f in file_names:
            if not os.path.exists(f):
                # create empty file
                with open(f, 'w') as ff:
                    pass
            else:
                os.utime(f, None)
            register_cleanup(f, "touch")
    return do_touch_file


# _________________________________________________________________________________________

#   args_param_factory

#       orig_args = ["input", "output", 1, 2, ...]
#       orig_args = [
#                       ["input0",               "output0",                1, 2, ...]   # job 1
#                       [["input1a", "input1b"], "output1",                1, 2, ...]   # job 2
#                       ["input2",               ["output2a", "output2b"], 1, 2, ...]   # job 3
#                       ["input3",               "output3",                1, 2, ...]   # job 4
#                   ]
#
# _________________________________________________________________________________________
def args_param_factory(orig_args):
    """
    Factory for functions which
        yield tuples of inputs, outputs / extras

    ..Note::

        1. Each job requires input/output file names
        2. Input/output file names can be a string, or an arbitrarily nested sequence
        3. Non-string types are ignored
        4. Either the input or the output file names must contain at least one string

    """
    def iterator(runtime_data):
        for job_param in orig_args:
            yield job_param, job_param
    return iterator
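
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): given hard coded @files style
#   parameters (hypothetical file names), the iterator simply yields each job's parameters
#   twice -- once for the actual call and once for display / logging:
#
#       param_func = args_param_factory([["a.input", "a.output", 1],
#                                        ["b.input", "b.output", 2]])
#       for params, display_params in param_func(runtime_data={}):
#           pass   # job 1: params == display_params == ["a.input", "a.output", 1], etc.
# -------------------------------------------------------------------------------------------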

# _________________________________________________________________________________________

#   files_param_factory

#       orig_args = ["input", "output", 1, 2, ...]
#       orig_args = [
#                       ["input0",               "output0",                1, 2, ...]   # job 1
#                       [["input1a", "input1b"], "output1",                1, 2, ...]   # job 2
#                       ["input2",               ["output2a", "output2b"], 1, 2, ...]   # job 3
#                       ["input3",               "output3",                1, 2, ...]   # job 4
#                   ]
#
# _________________________________________________________________________________________


def files_param_factory(input_files_task_globs,
                        do_not_expand_single_job_tasks, output_extras):
    """
    Factory for functions which
        yield tuples of inputs, outputs / extras

    ..Note::

        1. Each job requires input/output file names
        2. Input/output file names can be a string, or an arbitrarily nested sequence
        3. Non-string types are ignored
        4. Either the input or the output file names must contain at least one string

    """
    def iterator(runtime_data):
        # substitute inputs
        # input_params = file_names_from_tasks_globs(input_files_task_globs, runtime_data, False)

        if input_files_task_globs.params == []:
            if "ruffus_WARNING" not in runtime_data:
                runtime_data["ruffus_WARNING"] = defaultdict(set)
            runtime_data["ruffus_WARNING"][iterator].add(
                err_msg_empty_files_parameter)
            return

        for input_spec, output_extra_param in zip(input_files_task_globs.param_iter(), output_extras):
            input_param = file_names_from_tasks_globs(
                input_spec, runtime_data, do_not_expand_single_job_tasks)
            yield_param = (input_param, ) + output_extra_param
            yield yield_param, yield_param
    return iterator


def files_custom_generator_param_factory(generator):
    """
    Factory for @files taking custom generators
        wraps so that the generator swallows the extra runtime_data argument

    """
    def iterator(runtime_data):
        for params in generator():
            yield params, params
    return iterator
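
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): @files can also take a custom
#   generator.  The wrapper above merely discards the runtime_data argument which the other
#   param factories accept (generator and file names below are hypothetical):
#
#       def my_param_generator():
#           for n in range(3):
#               yield ["%d.input" % n, "%d.output" % n]
#
#       param_func = files_custom_generator_param_factory(my_param_generator)
#       # param_func(runtime_data) now yields (params, params) for each job yielded above
# -------------------------------------------------------------------------------------------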

# _________________________________________________________________________________________

#   split_param_factory

# _________________________________________________________________________________________


def split_param_factory(input_files_task_globs, output_files_task_globs, *extra_params):
    """
    Factory for task_split
    """
    def iterator(runtime_data):
        # do_not_expand_single_job_tasks = True

        #
        #   substitute tasks / globs at run time.
        #       No glob substitution for logging
        #
        input_param = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data, True)
        output_param = file_names_from_tasks_globs(
            output_files_task_globs, runtime_data)
        output_param_logging = file_names_from_tasks_globs(
            output_files_task_globs.unexpanded_globs(), runtime_data)

        yield (input_param, output_param) + extra_params, (input_param, output_param_logging) + extra_params

    return iterator


# _________________________________________________________________________________________

#   merge_param_factory

# _________________________________________________________________________________________
def merge_param_factory(input_files_task_globs,
                        output_param,
                        *extra_params):
    """
    Factory for task_merge
    """
    #
    def iterator(runtime_data):
        # do_not_expand_single_job_tasks = True
        input_param = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data, True)
        yield_param = (input_param, output_param) + extra_params
        yield yield_param, yield_param

    return iterator


# _________________________________________________________________________________________

#   originate_param_factory

# _________________________________________________________________________________________
def originate_param_factory(list_output_files_task_globs,
                            *extra_params):
    """
    Factory for task_originate
    """
    #
    def iterator(runtime_data):
        for output_files_task_globs in list_output_files_task_globs:
            output_param = file_names_from_tasks_globs(
                output_files_task_globs, runtime_data)
            output_param_logging = file_names_from_tasks_globs(
                output_files_task_globs.unexpanded_globs(), runtime_data)
            yield (None, output_param) + tuple(extra_params), (None, output_param_logging) + tuple(extra_params)

    return iterator
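
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): @split, @merge and @originate each
#   produce conceptually single jobs (one per output set for @originate).  For example, a
#   merge of all files matching a hypothetical glob into one summary file yields one
#   parameter tuple:
#
#       param_func = merge_param_factory(input_files_task_globs,   # e.g. wraps "*.count"
#                                        "summary.total")
#       # param_func(runtime_data) yields a single job:
#       #     ((["1.count", "2.count", ...], "summary.total"),  <the same again for display>)
# -------------------------------------------------------------------------------------------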

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   param_factories

#       ... which take inputs(), add_inputs(), suffix(), regex(), formatter()

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888


# _________________________________________________________________________________________

#   input_param_to_file_name_list

# _________________________________________________________________________________________
def input_param_to_file_name_list(input_params):
    """
    Common function for
        collate_param_factory
        transform_param_factory
        subdivide_param_factory
    Creates an adapter generator
    Converts (on the fly) a collection / iterator of input params
        ==> generator of (input param, flat list of strings (file_names)) pairs
    """
    for per_job_input_param in input_params:
        flattened_list_of_file_names = get_strings_in_flattened_sequence(
            per_job_input_param)
        yield per_job_input_param, flattened_list_of_file_names


# _________________________________________________________________________________________

#   list_input_param_to_file_name_list

# _________________________________________________________________________________________
def list_input_param_to_file_name_list(input_params):
    """
    Common function for
        product_param_factory
    Creates an adapter generator
    Converts (on the fly) a collection / iterator of nested (input params)
        ==> generator of (input params, list of flat lists of strings (file_names)) pairs
    """
    for per_job_input_param_list in input_params:
        list_of_flattened_list_of_file_names = [
            get_strings_in_flattened_sequence(ii) for ii in per_job_input_param_list]
        yield per_job_input_param_list, list_of_flattened_list_of_file_names
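
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): what the adapter generators above
#   yield, for hypothetical nested input parameters (assuming
#   get_strings_in_flattened_sequence() collects strings in order from nested sequences):
#
#       list(input_param_to_file_name_list([("a.txt", ["b.txt", 3])]))
#           -> [ (("a.txt", ["b.txt", 3]), ["a.txt", "b.txt"]) ]
#
#   i.e. the original (possibly nested) parameter is passed through untouched, while the
#   second element is the flat list of strings that the file name transforms (suffix(),
#   regex(), formatter()) will match against.
# -------------------------------------------------------------------------------------------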


# _________________________________________________________________________________________

#   yield_io_params_per_job

# _________________________________________________________________________________________
def yield_io_params_per_job(input_params,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_pattern,
                            extra_specs,
                            runtime_data,
                            iterator,
                            expand_globs_in_output=False):
    """
    Helper function for
        transform_param_factory and
        collate_param_factory and
        subdivide_param_factory and
        combinatorics_param_factory and
        product_param_factory


    *********************************************************
    *                                                       *
    *   Bad (non-orthogonal) design here. Needs refactoring *
    *                                                       *
    *********************************************************

    subdivide_param_factory requires glob patterns to be expanded, so it

        yields (function call parameters, display parameters)

    all others

        yield function call parameters


    This means that

        all but @subdivide have

            for y in yield_io_params_per_job (...):
                yield y, y

        subdivide_param_factory has:

            return yield_io_params_per_job

    We would make everything more orthogonal but the current code makes collate easier to write...

        collate_param_factory

            for output_extra_params, grouped_params in groupby(sorted(io_params_iter, key = get_output_extras), key = get_output_extras):


    """
    #
    #   Add an extra warning if no regular expressions match:
    #   This is a common class of frustrating errors
    #
    no_regular_expression_matches = True

    for orig_input_param, filenames in input_params:
        try:

            #
            #   Should run the job even if there are no file names, so long as there are input parameters...??
            #
            # if not orig_input_param:
            if not filenames:
                continue

            #
            #   "extra input" has a mixture of input and output parameter behaviours:
            #       1) If it contains tasks, the files from these are passed through unchanged
            #       2) If it contains strings (file names / glob patterns),
            #          these are transformed using regular expression, file component substitution etc.
            #          just like output params
            #
            #   So we do (2) first, ignoring tasks, then (1)
            if extra_input_files_task_globs:
                extra_inputs = extra_input_files_task_globs.file_names_transformed(
                    filenames, file_names_transform)

                #
                #   add or replace existing input parameters
                #
                if replace_inputs == t_extra_inputs.REPLACE_INPUTS:
                    input_param = file_names_from_tasks_globs(
                        extra_inputs, runtime_data)
                elif replace_inputs == t_extra_inputs.ADD_TO_INPUTS:
                    input_param = (
                        orig_input_param,) + file_names_from_tasks_globs(extra_inputs, runtime_data)
                else:
                    input_param = orig_input_param
            else:
                input_param = orig_input_param

            # extras
            extra_params = tuple(file_names_transform.substitute(
                filenames, p) for p in extra_specs)

            if expand_globs_in_output:
                #
                #   do regex substitution to complete the glob pattern
                #       before glob matching
                #
                output_pattern_transformed = output_pattern.output_file_names_transformed(
                    filenames, file_names_transform)
                output_param = file_names_from_tasks_globs(
                    output_pattern_transformed, runtime_data)
                output_param_unglobbed = file_names_from_tasks_globs(
                    output_pattern_transformed.unexpanded_globs(), runtime_data)
                yield ((input_param, output_param) + extra_params,
                       (input_param, output_param_unglobbed) + extra_params)
            else:

                # output
                output_param = file_names_transform.substitute_output_files(
                    filenames, output_pattern)
                yield (input_param, output_param) + extra_params

            no_regular_expression_matches = False

        # match failures are ignored
        except error_input_file_does_not_match:
            if runtime_data is not None:
                if "MATCH_FAILURE" not in runtime_data:
                    runtime_data["MATCH_FAILURE"] = defaultdict(set)
                runtime_data["MATCH_FAILURE"][iterator].add(
                    str(sys.exc_info()[1]).strip())
            continue

        # all other exceptions including malformed regexes are raised
        except Exception:
            # print sys.exc_info()
            raise

    #
    #   Add an extra warning if no regular expressions match:
    #   This is a common class of frustrating errors
    #
    if no_regular_expression_matches:
        if runtime_data is not None:
            if "ruffus_WARNING" not in runtime_data:
                runtime_data["ruffus_WARNING"] = defaultdict(set)
            runtime_data["ruffus_WARNING"][iterator].add(
                err_msg_no_regex_match)


# _________________________________________________________________________________________

#   subdivide_param_factory

# _________________________________________________________________________________________
def subdivide_param_factory(input_files_task_globs,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_files_task_globs,
                            *extra_specs):
    """
    Factory for task_split (advanced form)
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return []

        return yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                       file_names_transform,
                                       extra_input_files_task_globs,
                                       replace_inputs,
                                       output_files_task_globs,
                                       extra_specs,
                                       runtime_data,
                                       iterator,
                                       True)

    return iterator


# _________________________________________________________________________________________

#   combinatorics_param_factory

# _________________________________________________________________________________________
def combinatorics_param_factory(input_files_task_globs,
                                combinatorics_type,
                                k_tuple,
                                file_names_transform,
                                extra_input_files_task_globs,
                                replace_inputs,
                                output_pattern,
                                *extra_specs):
    """
    Factory for task_combinations_with_replacement, task_combinations, task_permutations
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        if combinatorics_type == t_combinatorics_type.COMBINATORICS_PERMUTATIONS:
            combinatoric_iter = itertools.permutations(input_params, k_tuple)
        elif combinatorics_type == t_combinatorics_type.COMBINATORICS_COMBINATIONS:
            combinatoric_iter = itertools.combinations(input_params, k_tuple)
        elif combinatorics_type == t_combinatorics_type.COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT:
            combinatoric_iter = itertools.combinations_with_replacement(
                input_params, k_tuple)
        else:
            raise Exception("Unknown combinatorics type %d" %
                            combinatorics_type)

        for y in yield_io_params_per_job(list_input_param_to_file_name_list(combinatoric_iter),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator
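
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): with hypothetical inputs
#   ["a.txt", "b.txt"] and k_tuple = 2, the combinatorics factory generates one job per
#   element of the chosen itertools iterator, before file name substitution is applied:
#
#       permutations                    -> ("a.txt", "b.txt"), ("b.txt", "a.txt")
#       combinations                    -> ("a.txt", "b.txt")
#       combinations_with_replacement   -> ("a.txt", "a.txt"), ("a.txt", "b.txt"), ("b.txt", "b.txt")
# -------------------------------------------------------------------------------------------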

# _________________________________________________________________________________________

#   product_param_factory

# _________________________________________________________________________________________
def product_param_factory(list_input_files_task_globs,
                          file_names_transform,
                          extra_input_files_task_globs,
                          replace_inputs,
                          output_pattern,
                          *extra_specs):
    """
    Factory for task_product
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params_list = [file_names_from_tasks_globs(
            ftg, runtime_data) for ftg in list_input_files_task_globs]

        #
        #   ignore if empty list in any of "all versus all"
        #
        if not len(input_params_list):
            return

        for input_params in input_params_list:
            if not len(input_params):
                return

        for y in yield_io_params_per_job(list_input_param_to_file_name_list(itertools.product(*input_params_list)),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator


# _________________________________________________________________________________________

#   transform_param_factory

# _________________________________________________________________________________________
def transform_param_factory(input_files_task_globs,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_pattern,
                            *extra_specs):
    """
    Factory for task_transform
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        for y in yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator
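
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): for a @transform style task with a
#   suffix() transform (hypothetical file names), each input produces one job:
#
#       inputs            "a.fastq", "b.fastq"
#       suffix(".fastq")  -> ".bam"
#
#       job 1:  ("a.fastq", "a.bam")
#       job 2:  ("b.fastq", "b.bam")
#
#   Any extra_specs are substituted with the same transform and appended to each tuple.
# -------------------------------------------------------------------------------------------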

# _________________________________________________________________________________________

#   collate_param_factory

# _________________________________________________________________________________________
def collate_param_factory(input_files_task_globs,
                          file_names_transform,
                          extra_input_files_task_globs,
                          replace_inputs,
                          output_pattern,
                          *extra_specs):
    """
    Factory for task_collate

    Looks exactly like @transform except that all [input] which lead to the same [output / extra] are combined together
    """
    #
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        io_params_iter = yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                                 file_names_transform,
                                                 extra_input_files_task_globs,
                                                 replace_inputs,
                                                 output_pattern,
                                                 extra_specs,
                                                 runtime_data,
                                                 iterator)

        #
        #   group job params if their output/extra params are identical
        #
        #   sort by the output/extra params converted to strings, then group by the
        #   (unconverted) output/extra params: groupby needs identical items to be adjacent,
        #   and sorting on the string form guarantees that even for unhashable items
        def get_output_extras(x): return x[1:]

        def get_output_extras_str(x): return str(x[1:])
        for output_extra_params, grouped_params in groupby(sorted(io_params_iter, key=get_output_extras_str), key=get_output_extras):
            #
            #   yield the different input params grouped into a tuple, followed by all the common params
            #   i.e. (input1, input2, input3), common_output, common_extra1, common_extra2...
            #

            #   Use groupby to drop successive duplicate input_param (remember we have sorted)
            #       This works even with unhashable items!

            params = (tuple(input_param for input_param, ignore in
                            groupby(g[0] for g in grouped_params)),) + output_extra_params

            # the same params twice, once for use, once for display; identical in this case
            yield params, params

    return iterator
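
# -------------------------------------------------------------------------------------------
#   Illustrative sketch (not part of the library API): how collation groups jobs, using
#   hypothetical file names and a regex() transform:
#
#       inputs:   "a.1.txt", "a.2.txt", "b.1.txt"
#       regex(r"(.+)\.\d+\.txt")  ->  r"\1.summary"
#
#   All inputs whose substituted output is identical end up in one job:
#
#       job 1:  (("a.1.txt", "a.2.txt"), "a.summary")
#       job 2:  (("b.1.txt",),           "b.summary")
# -------------------------------------------------------------------------------------------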