from __future__ import print_function
import re
from . import dbdict
from .ruffus_utility import *
from .ruffus_utility import shorten_filenames_encoder, FILE_CHECK_RETRY, FILE_CHECK_SLEEP
from .ruffus_exceptions import *

################################################################################
#
#   file_name_parameters
#
#
#   Copyright (c) 10/9/2009 Leo Goodstadt
#
#   Permission is hereby granted, free of charge, to any person obtaining a copy
#   of this software and associated documentation files (the "Software"), to deal
#   in the Software without restriction, including without limitation the rights
#   to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the Software is
#   furnished to do so, subject to the following conditions:
#
#   The above copyright notice and this permission notice shall be included in
#   all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
#   IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
#   FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
#   AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
#   LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
#   OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
#   THE SOFTWARE.
#################################################################################


"""

********************************************
:mod:`file_name_parameters` -- Overview
********************************************


.. moduleauthor:: Leo Goodstadt <ruffus@llew.org.uk>

    Handles file names for ruffus


"""


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   imports


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
import os
import sys
import time
import glob
from itertools import groupby
import itertools
from collections import defaultdict
from time import strftime, gmtime

if sys.hexversion >= 0x03000000:
    # everything is unicode in python3
    path_str_type = str
else:
    path_str_type = basestring


class t_combinatorics_type:
    (COMBINATORICS_PRODUCT, COMBINATORICS_PERMUTATIONS,
        COMBINATORICS_COMBINATIONS, COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT) = list(range(4))

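# These values tag the combinatorics decorators; combinatorics_param_factory
# below dispatches the last three to the matching itertools function, while
# COMBINATORICS_PRODUCT jobs are built by product_param_factory.
# Minimal illustration of the itertools calls involved (a sketch, not ruffus API):
#
#   import itertools
#   list(itertools.product("AB", "xy"))                      # all vs all
#   list(itertools.permutations("ABC", 2))                   # ordered pairs
#   list(itertools.combinations("ABC", 2))                   # unordered pairs
#   list(itertools.combinations_with_replacement("AB", 2))   # pairs, self allowed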

def get_readable_path_str(original_path, max_len):
    """
    Truncates the path to max_len characters if necessary
    If the result is a path within a nested directory, removes partially
        truncated directory names
    """
    if len(original_path) < max_len:
        return original_path
    truncated_name = original_path[-(max_len - 5):]
    if "/" not in truncated_name:
        return "[...]" + truncated_name
    return "[...]" + re.sub("^[^/]+", "", truncated_name)

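# Worked example (computed from the rules above):
#   get_readable_path_str("/very/long/path/to/some/file.txt", 20)
#   keeps the last 15 characters ("o/some/file.txt"), strips the partially
#   truncated leading component, and returns "[...]/some/file.txt"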

def epoch_seconds_to_str(epoch_seconds):
    """
    Converts seconds since epoch into a human-readable date / time string,
        with seconds given to 2 decimal places
    """
    #   returns a 24 char string, e.g.  25 May 2011 23:37:40.12
    time_str = strftime("%d %b %Y %H:%M:%S", gmtime(epoch_seconds))
    #
    fraction_of_second_as_str = (
        "%.2f" % (epoch_seconds - int(epoch_seconds)))[1:]
    #   or fraction = ("%.2f" % (divmod(epoch_seconds, 1)[1]))[1:]
    return (time_str + fraction_of_second_as_str)

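# Worked example (matches the sample date in the comment above; gmtime() is UTC):
#   epoch_seconds_to_str(1306366660.12)  ==  "25 May 2011 23:37:40.12"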

err_msg_no_regex_match = ("No jobs were run because no file names matched.\n"
                          "Please make sure that the regular expression is correctly specified.")
err_msg_empty_files_parameter = ("@files() was empty, i.e. no files were specified.\n"
                                 "Please make sure this is by design.")


class t_file_names_transform(object):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        input
            - a set of file names (derived from tasks, globs, hard coded file names)
            - a specification (e.g. a new suffix, a regular expression substitution pattern)
        output
            - a new file name

    N.B. Is this level of abstraction adequate?
        1) On one hand, this is a simple extension of the current working design
        2) On the other, it throws away both the nested structure of tasks / globs
           on the input side and the nested structure of the outputs
    """

    def substitute(self, starting_file_names, pattern):
        pass

    # overridden only in t_suffix_file_names_transform:
    # only suffix() behaves differently for output and extra files...
    def substitute_output_files(self, starting_file_names, pattern):
        return self.substitute(starting_file_names, pattern)


class t_suffix_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        replacing a specified suffix
    """

    def __init__(self, enclosing_task, suffix_object, error_type, descriptor_string, output_dir):
        self.matching_regex = compile_suffix(
            enclosing_task, suffix_object, error_type, descriptor_string)
        self.matching_regex_str = suffix_object.args[0]
        self.output_dir = output_dir

    def substitute(self, starting_file_names, pattern):
        if self.output_dir == []:
            return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern)
        else:
            # change directory of starting file and return substitution
            starting_file_name = os.path.join(
                self.output_dir, os.path.split(starting_file_names[0])[1])
            return regex_replace(starting_file_name, self.matching_regex_str, self.matching_regex, pattern)

    def substitute_output_files(self, starting_file_names, pattern):
        if self.output_dir == []:
            return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern, SUFFIX_SUBSTITUTE)
        else:
            # change directory of starting file and return substitution
            starting_file_name = os.path.join(
                self.output_dir, os.path.split(starting_file_names[0])[1])
            return regex_replace(starting_file_name, self.matching_regex_str, self.matching_regex, pattern, SUFFIX_SUBSTITUTE)


class t_regex_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        replacing a specified regular expression
    """

    def __init__(self, enclosing_task, regex_object, error_type, descriptor_string):
        self.matching_regex = compile_regex(
            enclosing_task, regex_object, error_type, descriptor_string)
        self.matching_regex_str = regex_object.args[0]

    def substitute(self, starting_file_names, pattern):
        return regex_replace(starting_file_names[0], self.matching_regex_str, self.matching_regex, pattern)


class t_formatter_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        using formatter() style string substitution
    """

    def __init__(self, enclosing_task, format_object, error_type, descriptor_string):
        self.matching_regexes = []
        self.matching_regex_strs = []
        if len(format_object.args):
            self.matching_regexes = compile_formatter(
                enclosing_task, format_object, error_type, descriptor_string)
            self.matching_regex_strs = list(format_object.args)

    def substitute(self, starting_file_names, pattern):
        # note: uses all file names
        return formatter_replace(starting_file_names, self.matching_regex_strs, self.matching_regexes, pattern)


class t_nested_formatter_file_names_transform(t_file_names_transform):
    """
    Does the work for generating output / "extra input" / "extra" filenames
        applying a whole series of format specifications to a whole series of inputs
    """

    def __init__(self, enclosing_task, format_objects, error_type, descriptor_string):
        self.list_matching_regex = []
        self.list_matching_regex_str = []

        for format_object in format_objects:
            if len(format_object.args):
                self.list_matching_regex.append(compile_formatter(
                    enclosing_task, format_object, error_type, descriptor_string))
                self.list_matching_regex_str.append(list(format_object.args))
            else:
                self.list_matching_regex.append([])
                self.list_matching_regex_str.append([])

    def substitute(self, starting_file_names, pattern):
        # note: uses all file names
        return nested_formatter_replace(starting_file_names, self.list_matching_regex_str, self.list_matching_regex, pattern)


# _________________________________________________________________________________________

#   t_params_tasks_globs_run_time_data

# _________________________________________________________________________________________
class t_params_tasks_globs_run_time_data(object):
    """
    After parameters are parsed into tasks, globs, runtime data
    """

    def __init__(self, params, tasks, globs, runtime_data_names):
        self.params = params
        self.tasks = tasks
        self.globs = globs
        self.runtime_data_names = runtime_data_names

    def __str__(self):
        return str(self.params)

    def param_iter(self):
        for p in self.params:
            yield t_params_tasks_globs_run_time_data(p, self.tasks, self.globs,
                                                     self.runtime_data_names)

    def unexpanded_globs(self):
        """
        do not expand globs
        """
        return t_params_tasks_globs_run_time_data(self.params, self.tasks, [],
                                                  self.runtime_data_names)

    def single_file_to_list(self):
        """
        if the parameter is a simple string, wrap it in a list unless it is a glob
        Useful for simple @transform cases
        """
        if isinstance(self.params, path_str_type) and not is_glob(self.params):
            self.params = [self.params]
            return True
        return False

    def file_names_transformed(self, filenames, file_names_transform):
        """
        return clone with the filenames / globs transformed by the supplied transform object
        """
        output_glob = file_names_transform.substitute(filenames, self.globs)
        output_param = file_names_transform.substitute(filenames, self.params)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)

    def output_file_names_transformed(self, filenames, file_names_transform):
        """
        return clone with the filenames / globs transformed by the supplied transform object
        """
        output_glob = file_names_transform.substitute_output_files(
            filenames, self.globs)
        output_param = file_names_transform.substitute_output_files(
            filenames, self.params)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)
    #
    #   deprecated
    #

    def regex_replaced(self, filename, regex, regex_or_suffix=REGEX_SUBSTITUTE):
        output_glob = regex_replace(
            filename, regex, self.globs, regex_or_suffix)
        output_param = regex_replace(
            filename, regex, self.params, regex_or_suffix)
        return t_params_tasks_globs_run_time_data(output_param, self.tasks, output_glob,
                                                  self.runtime_data_names)


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   needs_update_func

#       functions which are called to see if a job needs to be updated
#
#   Each task is a series of parallel jobs
#           each of which has the following pseudo-code
#
#   for param in param_generator_func():
#       if needs_update_func(*param):
#           job_wrapper(*param)
#
#   N.B. param_generator_func yields iterators of *sequences*
#   if you are generating single parameters, turn them into lists:
#
#       for a in alist:
#           yield (a,)
#
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888
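# A minimal, self-contained sketch of the contract above (all names here are
# invented for illustration; needs_update_func stands for any of the
# needs_update_* functions below, which return a (needs_update, reason) pair):
#
#   def param_generator_func():
#       for input_file in ["a.txt", "b.txt"]:
#           yield (input_file, input_file + ".out")    # (input, output) sequence
#
#   for param in param_generator_func():
#       needs_update, reason = needs_update_func(*param)
#       if needs_update:
#           job_wrapper(*param)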
# _________________________________________________________________________________________

#   needs_update_check_directory_missing

#       N.B. throws exception if this is an ordinary file, not a directory


# _________________________________________________________________________________________
def needs_update_check_directory_missing(*params, **kwargs):
    """
    Called per directory:
        Does it exist?
        Is it an ordinary file, not a directory? (throws an exception)
    """
    if len(params) == 1:
        dirs = params[0]
    elif len(params) == 2:
        dirs = params[1]
    else:
        raise Exception(
            "Wrong number of arguments in mkdir check %s" % (params,))

    missing_directories = []
    for d in get_strings_in_flattened_sequence(dirs):
        # print >>sys.stderr, "check directory missing %d " % os.path.exists(d) # DEBUG
        if not os.path.exists(d):
            missing_directories.append(d)
            continue
            # return True, "Directory [%s] is missing" % d
        if not os.path.isdir(d):
            raise error_not_a_directory(
                "%s already exists but as a file, not a directory" % d)

    if len(missing_directories):
        if len(missing_directories) > 1:
            return True, ": Directories %r are missing" % (", ".join(missing_directories))
        else:
            return True, ": Directory %r is missing" % (missing_directories[0])
    return False, "All directories exist"

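# Example of the (needs_update, reason) contract (illustrative paths):
#   needs_update_check_directory_missing(["/tmp"])
#       -> (False, "All directories exist")      if /tmp exists
#   needs_update_check_directory_missing(["/no/such/dir"])
#       -> (True, ": Directory '/no/such/dir' is missing")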

def check_input_files_exist(*params):
    """If inputs are missing then there is no way a job can run
    successfully. Must throw exception.

    This extra function is a hack to make sure input files exist
    right before the job is called, for better error messages, and to save
    things from blowing up inside the task function.

    In practice, we have observed a sporadic time-lag between a task
    completing on a remote node and an output file appearing in the
    expected location on the host running the ruffus pipeline. This
    causes a subsequent task to fail with a missing input file even
    though the file will appear a few seconds later. It is not clear
    if this is an issue of poorly configured storage, but a retry
    behaviour has been implemented to work around such issues.

    """
    if len(params):
        input_files = params[0]

        for f in get_strings_in_flattened_sequence(input_files):
            tries = FILE_CHECK_RETRY
            while tries > 0:
                if not os.path.exists(f):
                    if os.path.lexists(f):
                        raise MissingInputFileError("No way to run job: " +
                                                    "Input file '%s' is a broken symbolic link." % f)
                    tries -= 1
                    time.sleep(FILE_CHECK_SLEEP)
                    continue
                break
            if tries <= 0:
                raise MissingInputFileError("No way to run job: " +
                                            "Input file '%s' does not exist" % f)


def needs_update_check_exist(*params, **kwargs):
    """
    Given input and output files, see if all exist
    Each can be

        #. string: assumed to be a filename "file1"
        #. any other type
        #. arbitrary nested sequence of (1) and (2)

    """
    if "verbose_abbreviated_path" in kwargs:
        verbose_abbreviated_path = kwargs["verbose_abbreviated_path"]
    else:
        verbose_abbreviated_path = -55

    # missing output means build
    if len(params) < 2:
        return True, "i/o files not specified"

    i, o = params[0:2]
    i = get_strings_in_flattened_sequence(i)
    o = get_strings_in_flattened_sequence(o)

    #
    # build: missing output file
    #
    if len(o) == 0:
        return True, "Missing output file"

    # missing input / output file means always build
    missing_files = []
    for io in (i, o):
        for p in io:
            if not os.path.exists(p):
                missing_files.append(p)
    if len(missing_files):
        return True, "...\n        Missing file%s %s" % ("s" if len(missing_files) > 1 else "",
                                                         shorten_filenames_encoder(missing_files,
                                                                                   verbose_abbreviated_path))

    #
    #   missing input -> build only if output absent
    #
    if len(i) == 0:
        return False, "Missing input files"

    return False, "Up to date"

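# Illustrative traces (the exact reason string is abbreviated via
# shorten_filenames_encoder, so only its gist is shown here):
#
#   needs_update_check_exist("in.txt", "out.txt")
#   #   -> roughly (True, "... Missing file out.txt")   when out.txt is absent
#   #   -> (False, "Up to date")                        when both files exist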

# _________________________________________________________________________________________

#   needs_update_check_modify_time

# _________________________________________________________________________________________
def needs_update_check_modify_time(*params, **kwargs):
    """
    Given input and output files, see if all exist and whether output files are later than input files
    Each can be

        #. string: assumed to be a filename "file1"
        #. any other type
        #. arbitrary nested sequence of (1) and (2)

    """
    # conditions for rerunning a job:
    #   1. forced to rerun entire taskset
    #   2. 1+ output files don't exist
    #   3. 1+ of input files is newer than 1+ output files  -- ruffus does this level right now...
    #   4. internal completion time for that file is out of date   # incomplete runs will be rerun automatically
    #   5. checksum of code that ran the file is out of date       # changes to function body result in rerun
    #   6. checksum of the args that ran the file are out of date  # appropriate config file changes result in rerun
    try:
        task = kwargs['task']
    except KeyError:
        # allow the task not to be specified and fall back to classic
        # file timestamp behavior (either this or fix all the test cases,
        # which often don't have proper tasks)
        class Namespace:
            pass
        task = Namespace()
        task.checksum_level = CHECKSUM_FILE_TIMESTAMPS

    if "verbose_abbreviated_path" in kwargs:
        verbose_abbreviated_path = kwargs["verbose_abbreviated_path"]
    else:
        verbose_abbreviated_path = -55

    try:
        job_history = kwargs['job_history']
    except KeyError:
        # allow job_history not to be specified and reopen dbdict file redundantly...
        #   Either this or fix all the test cases
        #job_history = dbdict.open(RUFFUS_HISTORY_FILE, picklevalues=True)
        print("Oops: Should only appear in test code", file=sys.stderr)
        job_history = open_job_history(None)

    # missing output means build
    if len(params) < 2:
        return True, ""

    i, o = params[0:2]
    i = get_strings_in_flattened_sequence(i)
    o = get_strings_in_flattened_sequence(o)

    #
    # build: missing output file
    #
    if len(o) == 0:
        return True, "Missing output file"

    # missing input / output file means always build
    missing_files = []
    for io in (i, o):
        for p in io:
            if not os.path.exists(p):
                missing_files.append(p)
    if len(missing_files):
        return True, "...\n        Missing file%s %s" % ("s" if len(missing_files) > 1 else "",
                                                         shorten_filenames_encoder(missing_files,
                                                                                   verbose_abbreviated_path))
    #
    #   N.B. Checkpointing uses relative paths
    #

    # existing files, but from previous interrupted runs
    if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS:
        incomplete_files = []
        set_incomplete_files = set()
        func_changed_files = []
        set_func_changed_files = set()
        param_changed_files = []
        set_param_changed_files = set()
        # for io in (i, o):
        #    for p in io:
        #        if p not in job_history:
        #            incomplete_files.append(p)
        for p in o:
            if os.path.relpath(p) not in job_history and p not in set_incomplete_files:
                incomplete_files.append(p)
                set_incomplete_files.add(p)
        if len(incomplete_files):
            return True, "Uncheckpointed file%s (left over from a failed run?):\n        %s" % ("s" if len(incomplete_files) > 1 else "",
                                                                                                shorten_filenames_encoder(incomplete_files,
                                                                                                                          verbose_abbreviated_path))
        # check if function that generated our output file has changed
        for o_f_n in o:
            rel_o_f_n = os.path.relpath(o_f_n)
            old_chksum = job_history[rel_o_f_n]
            new_chksum = JobHistoryChecksum(rel_o_f_n, None, params[2:], task)
            if task.checksum_level >= CHECKSUM_FUNCTIONS_AND_PARAMS and \
                    new_chksum.chksum_params != old_chksum.chksum_params and \
                    o_f_n not in set_func_changed_files:
                param_changed_files.append(o_f_n)
                set_param_changed_files.add(o_f_n)
            elif task.checksum_level >= CHECKSUM_FUNCTIONS and \
                    new_chksum.chksum_func != old_chksum.chksum_func and \
                    o_f_n not in set_func_changed_files:
                func_changed_files.append(o_f_n)
                set_func_changed_files.add(o_f_n)

        if len(func_changed_files):
            return True, "Pipeline function has changed:\n        %s" % (shorten_filenames_encoder(func_changed_files,
                                                                                                   verbose_abbreviated_path))
        if len(param_changed_files):
            return True, "Pipeline parameters have changed:\n        %s" % (shorten_filenames_encoder(param_changed_files,
                                                                                                      verbose_abbreviated_path))

    #
    #   missing input -> build only if output absent or function is out of date
    #
    if len(i) == 0:
        return False, "Missing input files"

    #
    #   get sorted modified times for all input and output files
    #
    filename_to_times = [[], []]
    file_times = [[], []]

    # _____________________________________________________________________________________

    #   pretty_io_with_date_times

    # _____________________________________________________________________________________

    def pretty_io_with_date_times(filename_to_times):

        # sort
        for io in range(2):
            filename_to_times[io].sort()

        #
        #   add asterisk for all files which are causing this job to be out of date
        #
        file_name_to_asterisk = dict()
        oldest_output_mtime = filename_to_times[1][0][0]
        for mtime, file_name in filename_to_times[0]:
            file_name_to_asterisk[file_name] = "*" if mtime >= oldest_output_mtime else " "
        newest_input_mtime = filename_to_times[0][-1][0]
        for mtime, file_name in filename_to_times[1]:
            file_name_to_asterisk[file_name] = "*" if mtime <= newest_input_mtime else " "

        #
        #   try to fit in 100 - 15 = 85 char lines
        #   date time ~ 25 characters so limit file name to 55 characters
        #
        msg = "\n"
        category_names = "Input", "Output"
        for io in range(2):
            msg += "  %s files:\n" % category_names[io]
            for mtime, file_name in filename_to_times[io]:
                file_datetime_str = epoch_seconds_to_str(mtime)
                msg += ("   " +                                         # indent
                        # asterisked out of date files
                        file_name_to_asterisk[file_name] + " " +
                        file_datetime_str + ": " +                      # date time of file
                        shorten_filenames_encoder(file_name,
                                                  verbose_abbreviated_path) + "\n")    # file name truncated to 55
        return msg

    #
    #   Ignore output file if it is found in the list of input files
    #       By definition they have the same timestamp,
    #       and the job will otherwise appear to be out of date
    #
    #   Symbolic links followed
    real_input_file_names = set()
    for input_file_name in i:
        rel_input_file_name = os.path.relpath(input_file_name)
        real_input_file_names.add(os.path.realpath(input_file_name))
        file_timestamp = os.path.getmtime(input_file_name)
        if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS and rel_input_file_name in job_history:
            old_chksum = job_history[rel_input_file_name]
            mtime = max(file_timestamp, old_chksum.mtime)
        else:
            mtime = file_timestamp
        filename_to_times[0].append((mtime, input_file_name))
        file_times[0].append(mtime)

    # for output files, we need to check modification time *in addition* to
    # function and argument checksums...
    for output_file_name in o:
        #
        #   Ignore output files which are just symbolic links to input files or passed through
        #       from input to output
        #
        real_file_name = os.path.realpath(output_file_name)
        if real_file_name in real_input_file_names:
            continue

        rel_output_file_name = os.path.relpath(output_file_name)
        file_timestamp = os.path.getmtime(output_file_name)
        if task.checksum_level >= CHECKSUM_HISTORY_TIMESTAMPS:
            old_chksum = job_history[rel_output_file_name]
            if old_chksum.mtime > file_timestamp and old_chksum.mtime - file_timestamp > 1.1:
                mtime = file_timestamp
            # use the checksum time in preference if both are within one second
            #   (suggesting higher resolution)
            else:
                mtime = old_chksum.mtime
        else:
            mtime = file_timestamp
        file_times[1].append(mtime)
        filename_to_times[1].append((mtime, output_file_name))

    #
    #   Debug: Force print modified file names and times
    #
    # if len(file_times[0]) and len (file_times[1]):
    #    print >>sys.stderr, pretty_io_with_date_times(filename_to_times), file_times, (max(file_times[0]) >= min(file_times[1]))
    # else:
    #    print >>sys.stderr, i, o

    #
    #   update if any input file >= (more recent) output file
    #
    if len(file_times[0]) and len(file_times[1]) and max(file_times[0]) >= min(file_times[1]):
        return True, pretty_io_with_date_times(filename_to_times)

    if "return_file_dates_when_uptodate" in kwargs and kwargs["return_file_dates_when_uptodate"]:
        return False, "Up to date\n" + pretty_io_with_date_times(filename_to_times)

    return False, "Up to date"


# _________________________________________________________________________________________
#
#   is_file_re_combining
#
# _________________________________________________________________________________________
def is_file_re_combining(old_args):
    """
    Helper function for @files_re
    Checks if parameters are wrapped in combine()
    """
    combining_all_jobs = False
    orig_args = []
    for arg in old_args:
        if isinstance(arg, combine):
            combining_all_jobs = True
            if len(arg.args) == 1:
                orig_args.append(arg.args[0])
            else:
                orig_args.append(arg.args)
        else:
            orig_args.append(arg)
    return combining_all_jobs, orig_args


# _________________________________________________________________________________________

#   file_names_from_tasks_globs

# _________________________________________________________________________________________
def file_names_from_tasks_globs(files_task_globs,
                                runtime_data, do_not_expand_single_job_tasks=False):
    """
    Replaces glob specifications and tasks with actual files / task output
    """

    # special handling for chaining tasks which conceptually have a single job
    #       i.e. @merge and @files/@parallel with single job parameters
    if files_task_globs.params.__class__.__name__ == 'Task' and do_not_expand_single_job_tasks:
        return files_task_globs.params._get_output_files(True, runtime_data)

    task_or_glob_to_files = dict()

    # look up globs and tasks
    for g in files_task_globs.globs:
        # check whether this is still a glob pattern after the transform
        # {} are particularly suspicious...
        if is_glob(g):
            task_or_glob_to_files[g] = sorted(glob.glob(g))
    for t in files_task_globs.tasks:
        of = t._get_output_files(False, runtime_data)
        task_or_glob_to_files[t] = of
    for n in files_task_globs.runtime_data_names:
        data_name = n.args[0]
        if data_name in runtime_data:
            task_or_glob_to_files[n] = runtime_data[data_name]
        else:
            raise error_missing_runtime_parameter("The inputs of this task depend on " +
                                                  "the runtime parameter " +
                                                  "'%s' which is missing " % data_name)

    return expand_nested_tasks_or_globs(files_task_globs.params, task_or_glob_to_files)

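# Hedged sketch of the behaviour (hypothetical values): given
# files_task_globs.params == ["start.txt", "*.log"] with "*.log" registered as
# a glob that matches a.log and b.log on disk, the glob is looked up via
# glob.glob() above and expand_nested_tasks_or_globs() substitutes the sorted
# matches into the parameters in its place.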

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   param_factories

#       makes python generators which yield parameters for
#
#           A) needs_update_func
#           B) job_wrapper

#       Each task is a series of parallel jobs
#           each of which has the following pseudo-code
#
#       for param in param_generator_func():
#           if needs_update_func(*param):
#               act_func(*param)
#
#       Test Usage:
#
#
#       param_func = xxx_factory(tasks, globs, orig_input_params, ...)
#
#       for params in param_func():
#           i, o = params[0:2]
#           print("input_params = ", i)
#           print("output = ", o)
#
#
#
#
#
# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

# _________________________________________________________________________________________

#   touch_file_factory

# _________________________________________________________________________________________
def touch_file_factory(orig_args, register_cleanup):
    """
    Creates function, which when called, will touch files
    """
    file_names = orig_args
    # accepts unicode
    if isinstance(orig_args, path_str_type):
        file_names = [orig_args]
    else:
        # make a copy so that when the original is modified, we don't get confused!
        file_names = list(orig_args)

    def do_touch_file():
        for f in file_names:
            if not os.path.exists(f):
                with open(f, 'w') as ff:
                    pass
            else:
                os.utime(f, None)
            register_cleanup(f, "touch")
    return do_touch_file

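# Hedged usage sketch (the lambda cleanup callback is a stand-in for the real
# registration function that ruffus passes in):
#
#   touch = touch_file_factory(["a.flag", "b.flag"], lambda f, op: None)
#   touch()   # creates a.flag / b.flag if absent, otherwise updates their mtimes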

# _________________________________________________________________________________________

#   args_param_factory

#       orig_args = ["input", "output", 1, 2, ...]
#       orig_args = [
#                       ["input0",               "output0",                1, 2, ...]   # job 1
#                       [["input1a", "input1b"], "output1",                1, 2, ...]   # job 2
#                       ["input2",               ["output2a", "output2b"], 1, 2, ...]   # job 3
#                       ["input3",               "output3",                1, 2, ...]   # job 4
#                   ]
#
# _________________________________________________________________________________________
def args_param_factory(orig_args):
    """
    Factory for functions which
        yield tuples of inputs, outputs / extras

    .. note::

        1. Each job requires input/output file names
        2. Input/output file names can be a string, an arbitrarily nested sequence
        3. Non-string types are ignored
        4. Either input or output file names must contain at least one string

    """
    def iterator(runtime_data):
        for job_param in orig_args:
            yield job_param, job_param
    return iterator

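# Example of the pair-per-job protocol (each job's params are yielded twice,
# once for the actual call and once for display; illustrative values):
#
#   it = args_param_factory([["in0.txt", "out0.txt"], ["in1.txt", "out1.txt"]])
#   list(it(runtime_data={}))
#   # [(['in0.txt', 'out0.txt'], ['in0.txt', 'out0.txt']),
#   #  (['in1.txt', 'out1.txt'], ['in1.txt', 'out1.txt'])]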
# _________________________________________________________________________________________

#   files_param_factory

#       orig_args = ["input", "output", 1, 2, ...]
#       orig_args = [
#                       ["input0",               "output0",                1, 2, ...]   # job 1
#                       [["input1a", "input1b"], "output1",                1, 2, ...]   # job 2
#                       ["input2",               ["output2a", "output2b"], 1, 2, ...]   # job 3
#                       ["input3",               "output3",                1, 2, ...]   # job 4
#                   ]
#
# _________________________________________________________________________________________


def files_param_factory(input_files_task_globs,
                        do_not_expand_single_job_tasks, output_extras):
    """
    Factory for functions which
        yield tuples of inputs, outputs / extras

    .. note::

        1. Each job requires input/output file names
        2. Input/output file names can be a string, an arbitrarily nested sequence
        3. Non-string types are ignored
        4. Either input or output file names must contain at least one string

    """
    def iterator(runtime_data):
        # substitute inputs
        # input_params = file_names_from_tasks_globs(input_files_task_globs, runtime_data, False)

        if input_files_task_globs.params == []:
            if "ruffus_WARNING" not in runtime_data:
                runtime_data["ruffus_WARNING"] = defaultdict(set)
            runtime_data["ruffus_WARNING"][iterator].add(
                err_msg_empty_files_parameter)
            return

        for input_spec, output_extra_param in zip(input_files_task_globs.param_iter(), output_extras):
            input_param = file_names_from_tasks_globs(
                input_spec, runtime_data, do_not_expand_single_job_tasks)
            yield_param = (input_param, ) + output_extra_param
            yield yield_param, yield_param
    return iterator


def files_custom_generator_param_factory(generator):
    """
    Factory for @files taking custom generators,
        wrapped so that the user's generator ignores the extra runtime_data argument

    """
    def iterator(runtime_data):
        for params in generator():
            yield params, params
    return iterator

# _________________________________________________________________________________________

#   split_param_factory

# _________________________________________________________________________________________


def split_param_factory(input_files_task_globs, output_files_task_globs, *extra_params):
    """
    Factory for task_split
    """
    def iterator(runtime_data):
        # do_not_expand_single_job_tasks = True

        #
        #   substitute tasks / globs at runtime. No glob substitution for logging
        #
        input_param = file_names_from_tasks_globs(
            input_files_task_globs,                     runtime_data, True)
        output_param = file_names_from_tasks_globs(
            output_files_task_globs,                    runtime_data)
        output_param_logging = file_names_from_tasks_globs(
            output_files_task_globs.unexpanded_globs(), runtime_data)

        yield (input_param, output_param) + extra_params, (input_param, output_param_logging) + extra_params

    return iterator


# _________________________________________________________________________________________

#   merge_param_factory

# _________________________________________________________________________________________
def merge_param_factory(input_files_task_globs,
                        output_param,
                        *extra_params):
    """
    Factory for task_merge
    """
    #
    def iterator(runtime_data):
        # do_not_expand_single_job_tasks = True
        input_param = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data, True)
        yield_param = (input_param, output_param) + extra_params
        yield yield_param, yield_param

    return iterator


# _________________________________________________________________________________________

#   originate_param_factory

# _________________________________________________________________________________________
def originate_param_factory(list_output_files_task_globs,
                            *extra_params):
    """
    Factory for task_originate
    """
    #
    def iterator(runtime_data):
        for output_files_task_globs in list_output_files_task_globs:
            output_param = file_names_from_tasks_globs(
                output_files_task_globs,                    runtime_data)
            output_param_logging = file_names_from_tasks_globs(
                output_files_task_globs.unexpanded_globs(), runtime_data)
            yield (None, output_param) + tuple(extra_params), (None, output_param_logging) + tuple(extra_params)

    return iterator


# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888

#   param_factories

#       ... which take inputs(), add_inputs(), suffix(), regex(), formatter()

# 88888888888888888888888888888888888888888888888888888888888888888888888888888888888888888


# _________________________________________________________________________________________

#   input_param_to_file_name_list

# _________________________________________________________________________________________
def input_param_to_file_name_list(input_params):
    """
    Common function for
            collate_param_factory
            transform_param_factory
            subdivide_param_factory
        Creates an adapter:
        Converts (on the fly) a collection / iterator of input params into a
            generator of (input param, flattened list of file name strings) pairs
    """
    for per_job_input_param in input_params:
        flattened_list_of_file_names = get_strings_in_flattened_sequence(
            per_job_input_param)
        yield per_job_input_param, flattened_list_of_file_names

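# Example of the adapter's output (illustrative; get_strings_in_flattened_sequence
# keeps only the strings, in order):
#
#   list(input_param_to_file_name_list([("a.txt", ["b.txt", 3])]))
#   # [(('a.txt', ['b.txt', 3]), ['a.txt', 'b.txt'])]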

# _________________________________________________________________________________________

#   list_input_param_to_file_name_list

# _________________________________________________________________________________________
def list_input_param_to_file_name_list(input_params):
    """
    Common function for
            product_param_factory
        Creates an adapter:
        Converts (on the fly) a collection / iterator of nested (input params) into a
            generator of (input param list, list of flattened lists of file name strings) pairs
    """
    for per_job_input_param_list in input_params:
        list_of_flattened_list_of_file_names = [
            get_strings_in_flattened_sequence(ii) for ii in per_job_input_param_list]
        yield per_job_input_param_list, list_of_flattened_list_of_file_names

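# Example (illustrative) - one flattened string list per input in the tuple:
#
#   list(list_input_param_to_file_name_list([("a.txt", ["b.txt", "c.txt"])]))
#   # [(('a.txt', ['b.txt', 'c.txt']), [['a.txt'], ['b.txt', 'c.txt']])]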

# _________________________________________________________________________________________

#   yield_io_params_per_job

# _________________________________________________________________________________________
def yield_io_params_per_job(input_params,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_pattern,
                            extra_specs,
                            runtime_data,
                            iterator,
                            expand_globs_in_output=False):
    """
    Helper function for
        transform_param_factory and
        collate_param_factory and
        subdivide_param_factory and
        combinatorics_param_factory and
        product_param_factory


    *********************************************************
    *                                                       *
    *  Bad (non-orthogonal) design here. Needs refactoring  *
    *                                                       *
    *********************************************************

        subdivide_param_factory requires glob patterns to be expanded

            yield (function call parameters, display parameters)

        all others

            yield function call parameters


        This means that

            all but @subdivide have

                for y in yield_io_params_per_job (...):
                    yield y, y

            subdivide_param_factory has:

                return yield_io_params_per_job

        We could make everything more orthogonal, but the current code makes collate easier to write...

            collate_param_factory

                for output_extra_params, grouped_params in groupby(sorted(io_params_iter, key = get_output_extras), key = get_output_extras):

    """
    #
    #   Add extra warning if no regular expressions match:
    #   This is a common class of frustrating errors
    #
    no_regular_expression_matches = True

    for orig_input_param, filenames in input_params:
        try:

            #
            #   Should run job even if there are no file names, so long as there are input parameters...??
            #
            # if not orig_input_param:
            if not filenames:
                continue

            #
            #   extra input has a mixture of input and output parameter behaviours:
            #       1) If it contains tasks, the files from these are passed through unchanged
            #       2) If it contains strings which look like file names,
            #          these are transformed using regular expression, file component substitution etc.
            #          just like output params
            #
            #       So we do (2) first, ignoring tasks, then (1)
            if extra_input_files_task_globs:
                extra_inputs = extra_input_files_task_globs.file_names_transformed(
                    filenames, file_names_transform)

                #
                # add or replace existing input parameters
                #
                if replace_inputs == t_extra_inputs.REPLACE_INPUTS:
                    input_param = file_names_from_tasks_globs(
                        extra_inputs, runtime_data)
                elif replace_inputs == t_extra_inputs.ADD_TO_INPUTS:
                    input_param = (
                        orig_input_param,) + file_names_from_tasks_globs(extra_inputs, runtime_data)
                else:
                    input_param = orig_input_param
            else:
                input_param = orig_input_param

            # extras
            extra_params = tuple(file_names_transform.substitute(
                filenames, p) for p in extra_specs)

            if expand_globs_in_output:
                #
                #   do regex substitution to complete glob pattern
                #       before glob matching
                #
                output_pattern_transformed = output_pattern.output_file_names_transformed(
                    filenames, file_names_transform)
                output_param = file_names_from_tasks_globs(
                    output_pattern_transformed, runtime_data)
                output_param_unglobbed = file_names_from_tasks_globs(
                    output_pattern_transformed.unexpanded_globs(), runtime_data)
                yield ((input_param, output_param) + extra_params,
                       (input_param, output_param_unglobbed) + extra_params)
            else:

                # output
                output_param = file_names_transform.substitute_output_files(
                    filenames, output_pattern)
                yield (input_param, output_param) + extra_params

            no_regular_expression_matches = False

        # match failures are ignored
        except error_input_file_does_not_match:
            if runtime_data is not None:
                if "MATCH_FAILURE" not in runtime_data:
                    runtime_data["MATCH_FAILURE"] = defaultdict(set)
                runtime_data["MATCH_FAILURE"][iterator].add(
                    str(sys.exc_info()[1]).strip())
            continue

        # all other exceptions including malformed regexes are raised
        except Exception:
            # print sys.exc_info()
            raise

    #
    #   Add extra warning if no regular expressions match:
    #   This is a common class of frustrating errors
    #
    if no_regular_expression_matches:
        if runtime_data is not None:
            if "ruffus_WARNING" not in runtime_data:
                runtime_data["ruffus_WARNING"] = defaultdict(set)
            runtime_data["ruffus_WARNING"][iterator].add(
                err_msg_no_regex_match)


# _________________________________________________________________________________________

#   subdivide_param_factory

# _________________________________________________________________________________________
def subdivide_param_factory(input_files_task_globs,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_files_task_globs,
                            *extra_specs):
    """
    Factory for task_split (advanced form)
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return []

        return yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                       file_names_transform,
                                       extra_input_files_task_globs,
                                       replace_inputs,
                                       output_files_task_globs,
                                       extra_specs,
                                       runtime_data,
                                       iterator,
                                       True)

    return iterator


# _________________________________________________________________________________________

#   combinatorics_param_factory

# _________________________________________________________________________________________
def combinatorics_param_factory(input_files_task_globs,
                                combinatorics_type,
                                k_tuple,
                                file_names_transform,
                                extra_input_files_task_globs,
                                replace_inputs,
                                output_pattern,
                                *extra_specs):
    """
    Factory for task_combinations_with_replacement, task_combinations, task_permutations
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        if combinatorics_type == t_combinatorics_type.COMBINATORICS_PERMUTATIONS:
            combinatoric_iter = itertools.permutations(input_params, k_tuple)
        elif combinatorics_type == t_combinatorics_type.COMBINATORICS_COMBINATIONS:
            combinatoric_iter = itertools.combinations(input_params, k_tuple)
        elif combinatorics_type == t_combinatorics_type.COMBINATORICS_COMBINATIONS_WITH_REPLACEMENT:
            combinatoric_iter = itertools.combinations_with_replacement(
                input_params, k_tuple)
        else:
            raise Exception("Unknown combinatorics type %d" %
                            combinatorics_type)

        for y in yield_io_params_per_job(list_input_param_to_file_name_list(combinatoric_iter),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator


# _________________________________________________________________________________________

#   product_param_factory

# _________________________________________________________________________________________
def product_param_factory(list_input_files_task_globs,
                          file_names_transform,
                          extra_input_files_task_globs,
                          replace_inputs,
                          output_pattern,
                          *extra_specs):
    """
    Factory for task_product
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params_list = [file_names_from_tasks_globs(
            ftg, runtime_data) for ftg in list_input_files_task_globs]

        #
        #   ignore if empty list in any of all versus all
        #
        if not len(input_params_list):
            return

        for input_params in input_params_list:
            if not len(input_params):
                return

        for y in yield_io_params_per_job(list_input_param_to_file_name_list(itertools.product(*input_params_list)),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator


# _________________________________________________________________________________________

#   transform_param_factory

# _________________________________________________________________________________________
def transform_param_factory(input_files_task_globs,
                            file_names_transform,
                            extra_input_files_task_globs,
                            replace_inputs,
                            output_pattern,
                            *extra_specs):
    """
    Factory for task_transform
    """
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        for y in yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                         file_names_transform,
                                         extra_input_files_task_globs,
                                         replace_inputs,
                                         output_pattern,
                                         extra_specs,
                                         runtime_data,
                                         iterator):
            yield y, y

    return iterator


# _________________________________________________________________________________________

#   collate_param_factory

# _________________________________________________________________________________________
def collate_param_factory(input_files_task_globs,
                          file_names_transform,
                          extra_input_files_task_globs,
                          replace_inputs,
                          output_pattern,
                          *extra_specs):
    """
    Factory for task_collate

    Looks exactly like @transform except that all [input] which lead to the same [output / extra] are combined together
    """
    #
    def iterator(runtime_data):

        #
        #   Convert input file names, globs, and tasks -> a list of (nested) file names
        #       Each element of the list corresponds to the input parameters of a single job
        #
        input_params = file_names_from_tasks_globs(
            input_files_task_globs, runtime_data)

        if not len(input_params):
            return

        io_params_iter = yield_io_params_per_job(input_param_to_file_name_list(sorted(input_params, key=lambda x: str(x))),
                                                 file_names_transform,
                                                 extra_input_files_task_globs,
                                                 replace_inputs,
                                                 output_pattern,
                                                 extra_specs,
                                                 runtime_data,
                                                 iterator)

        #
        #   group job params if their output/extra params are identical
        #
        # sort by the key converted to a string, then group by the key itself:
        # groupby() needs identical keys to be adjacent, and sorting by the
        # string representation guarantees that even for unhashable params
        def get_output_extras(x): return x[1:]

        def get_output_extras_str(x): return str(x[1:])
        for output_extra_params, grouped_params in groupby(sorted(io_params_iter, key=get_output_extras_str), key=get_output_extras):
            #
            #   yield the different input params grouped into a tuple, followed by all the common params
            #   i.e. (input1, input2, input3), common_output, common_extra1, common_extra2...
            #

            #   Use groupby to avoid successive duplicate input_param (remember we have sorted)
            #       This works even with unhashable items!

            params = (tuple(input_param for input_param, ignore in
                            groupby(g[0] for g in grouped_params)),) + output_extra_params

            # the same params twice, once for use, once for display, identical in this case
            yield params, params

    return iterator

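# A standalone sketch of the sorted()+groupby() grouping trick used above
# (values invented for illustration):
#
#   from itertools import groupby
#   jobs = [(["a.1"], "out.x"), (["b.1"], "out.y"), (["c.1"], "out.x")]
#   for output, grouped in groupby(sorted(jobs, key=lambda j: str(j[1:])),
#                                  key=lambda j: j[1:]):
#       print(tuple(j[0] for j in grouped), output)
#   # prints:
#   #   (['a.1'], ['c.1']) ('out.x',)
#   #   (['b.1'],) ('out.y',)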