import re
import tempfile

from snakemake.exceptions import WorkflowError


class DefaultResources:
    """Container for default resource definitions given as ``name=value`` strings.

    The raw string definitions are kept in ``self._args``; their parsed form
    (ints, or callables that evaluate the expression lazily) is kept in
    ``self.parsed``.
    """

    # Defaults used when mode == "full".
    defaults = {
        "mem_mb": "max(2*input.size_mb, 1000)",
        "disk_mb": "max(2*input.size_mb, 1000)",
        "tmpdir": "system_tmpdir",
    }

    # Defaults used when mode == "bare".
    bare_defaults = {
        "tmpdir": "system_tmpdir",
    }

    @classmethod
    def decode_arg(cls, arg):
        """Split a ``name=value`` string into ``[name, value]``.

        Only the first ``=`` separates name from value, so the value itself
        may contain ``=`` (e.g. ``"mem_mb=1000 if attempt==1 else 2000"``).

        Raises:
            ValueError: if ``arg`` contains no ``=`` at all.
        """
        name, separator, value = arg.partition("=")
        if not separator:
            raise ValueError("Resources have to be defined as name=value pairs.")
        return [name, value]

    @classmethod
    def encode_arg(cls, name, value):
        """Return the ``name=value`` string form of a resource definition."""
        return "{}={}".format(name, value)

    def __init__(self, args=None, from_other=None, mode="full"):
        """Build the resource set.

        Args:
            args: optional iterable of ``name=value`` strings that override
                the mode's defaults. Ignored when ``from_other`` is given.
            from_other: optional ``DefaultResources`` instance whose raw and
                parsed definitions are copied.
            mode: ``"full"`` or ``"bare"``; selects the set of defaults.

        Raises:
            ValueError: on an unknown ``mode`` or a malformed definition.
        """
        # The mode is validated even when from_other is given.
        if mode == "full":
            self._args = dict(DefaultResources.defaults)
        elif mode == "bare":
            self._args = dict(DefaultResources.bare_defaults)
        else:
            raise ValueError("Unexpected mode for DefaultResources: {}".format(mode))

        if from_other is not None:
            self._args = dict(from_other._args)
            self.parsed = dict(from_other.parsed)
            return

        if args is None:
            args = []

        self._args.update(
            {name: value for name, value in map(self.decode_arg, args)}
        )

        def fallback(val):
            """Wrap a non-integer value in a callable that evaluates it lazily."""

            def evaluate(wildcards, input, attempt, threads, rulename):
                try:
                    # NOTE(review): eval() runs a user-supplied expression
                    # string; it must never be fed untrusted input.
                    value = eval(
                        val,
                        {
                            "input": input,
                            "attempt": attempt,
                            "threads": threads,
                            "system_tmpdir": tempfile.gettempdir(),
                        },
                    )
                # Triggers for string arguments like n1-standard-4
                except NameError:
                    return val
                except Exception as e:
                    missing_input = (
                        isinstance(e, FileNotFoundError) and e.filename in input
                    )
                    if not missing_input:
                        raise WorkflowError(
                            "Failed to evaluate DefaultResources value "
                            "'{}'.\n"
                            " String arguments may need additional "
                            "quoting. Ex: --default-resources "
                            "\"tmpdir='/home/user/tmp'\".".format(val)
                        )
                    # Missing input files are handled by the caller
                    raise
                return value

            return evaluate

        self.parsed = dict(_cores=1, _nodes=1)
        self.parsed.update(parse_resources(self._args, fallback=fallback))

    def set_resource(self, name, value):
        """Set resource ``name`` to ``value`` in both the raw and parsed maps."""
        self._args[name] = "{}".format(value)
        self.parsed[name] = value

    @property
    def args(self):
        """List of all raw definitions encoded as ``name=value`` strings."""
        return [self.encode_arg(name, value) for name, value in self._args.items()]

    def __bool__(self):
        return bool(self.parsed)


def parse_resources(resources_args, fallback=None):
    """Parse resources from args.

    Args:
        resources_args: ``None``, a list of ``name=value`` strings, or a
            mapping of resource name to value.
        fallback: optional function applied to any value that cannot be
            converted with ``int()``; its result is stored instead.

    Returns:
        dict mapping each resource name to its int value or fallback result.

    Raises:
        ValueError: if a name is not a valid identifier, if a value is not an
            integer and no ``fallback`` is given, or if the name is ``_cores``.
    """
    resources = dict()
    if resources_args is None:
        return resources

    valid = re.compile(r"[a-zA-Z_]\w*$")

    if isinstance(resources_args, list):
        pairs = map(DefaultResources.decode_arg, resources_args)
    else:
        pairs = resources_args.items()

    for res, val in pairs:
        if not valid.match(res):
            raise ValueError(
                "Resource definition must start with a valid identifier, but found {}.".format(
                    res
                )
            )
        try:
            val = int(val)
        except ValueError:
            if fallback is None:
                raise ValueError(
                    "Resource definiton must contain an integer after the identifier."
                )
            val = fallback(val)
        if res == "_cores":
            raise ValueError(
                "Resource _cores is already defined internally. Use a different name."
            )
        resources[res] = val
    return resources