1__author__ = "Johannes Köster"
2__copyright__ = "Copyright 2021, Johannes Köster"
3__email__ = "johannes.koester@protonmail.com"
4__license__ = "MIT"
5
6from functools import update_wrapper
7import itertools
8import platform
9import hashlib
10import inspect
11import uuid
12import os
13import asyncio
14import sys
15import collections
16from pathlib import Path
17
18from snakemake._version import get_versions
19
20__version__ = get_versions()["version"]
21del get_versions
22
23
24MIN_PY_VERSION = (3, 5)
25DYNAMIC_FILL = "__snakemake_dynamic__"
26SNAKEMAKE_SEARCHPATH = str(Path(__file__).parent.parent.parent)
27UUID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "https://snakemake.readthedocs.io")
28NOTHING_TO_BE_DONE_MSG = (
29    "Nothing to be done (all requested files are present and up to date)."
30)
31
32ON_WINDOWS = platform.system() == "Windows"
33
34
35if sys.version_info < (3, 7):
36
37    def async_run(coroutine):
38        loop = asyncio.get_event_loop()
39        return loop.run_until_complete(coroutine)
40
41
42else:
43
44    def async_run(coroutine):
45        """Attaches to running event loop or creates a new one to execute a
46        coroutine.
47
48        .. seealso::
49
50             https://github.com/snakemake/snakemake/issues/1105
51             https://stackoverflow.com/a/65696398
52
53        """
54        try:
55            _ = asyncio.get_running_loop()
56        except RuntimeError:
57            asyncio.run(coroutine)
58        else:
59            asyncio.create_task(coroutine)
60
61
62# A string that prints as TBD
63class TBDString(str):
64    # the second arg is necessary to avoid problems when pickling
65    def __new__(cls, _=None):
66        return str.__new__(cls, "<TBD>")
67
68
69APPDIRS = None
70
71
72RULEFUNC_CONTEXT_MARKER = "__is_snakemake_rule_func"
73
74
75def get_appdirs():
76    global APPDIRS
77    if APPDIRS is None:
78        from appdirs import AppDirs
79
80        APPDIRS = AppDirs("snakemake", "snakemake")
81    return APPDIRS
82
83
84def is_local_file(path_or_uri):
85    return parse_uri(path_or_uri).scheme == "file"
86
87
88def parse_uri(path_or_uri):
89    from smart_open import parse_uri
90
91    try:
92        return parse_uri(path_or_uri)
93    except NotImplementedError as e:
94        # Snakemake sees a lot of URIs which are not supported by smart_open yet
95        # "docker", "git+file", "shub", "ncbi","root","roots","rootk", "gsiftp",
96        # "srm","ega","ab","dropbox"
97        # Fall back to a simple split if we encounter something which isn't supported.
98        scheme, _, uri_path = path_or_uri.partition("://")
99        if scheme and uri_path:
100            uri = collections.namedtuple("Uri", ["scheme", "uri_path"])
101            return uri(scheme, uri_path)
102        else:
103            raise e
104
105
106def smart_join(base, path, abspath=False):
107    if is_local_file(base):
108        full = os.path.join(base, path)
109        if abspath:
110            return os.path.abspath(full)
111        return full
112    else:
113        from smart_open import parse_uri
114
115        uri = parse_uri("{}/{}".format(base, path))
116        if not ON_WINDOWS:
117            # Norm the path such that it does not contain any ../,
118            # which is invalid in an URL.
119            assert uri.uri_path[0] == "/"
120            uri_path = os.path.normpath(uri.uri_path)
121        else:
122            uri_path = uri.uri_path
123        return "{scheme}:/{uri_path}".format(scheme=uri.scheme, uri_path=uri_path)
124
125
126def num_if_possible(s):
127    """Convert string to number if possible, otherwise return string."""
128    try:
129        return int(s)
130    except ValueError:
131        try:
132            return float(s)
133        except ValueError:
134            return s
135
136
137def get_last_stable_version():
138    return __version__.split("+")[0]
139
140
141def get_container_image():
142    return "snakemake/snakemake:v{}".format(get_last_stable_version())
143
144
145def get_uuid(name):
146    return uuid.uuid5(UUID_NAMESPACE, name)
147
148
149def get_file_hash(filename, algorithm="sha256"):
150    """find the SHA256 hash string of a file. We use this so that the
151    user can choose to cache working directories in storage.
152    """
153    from snakemake.logging import logger
154
155    # The algorithm must be available
156    try:
157        hasher = hashlib.new(algorithm)
158    except ValueError as ex:
159        logger.error("%s is not an available algorithm." % algorithm)
160        raise ex
161
162    with open(filename, "rb") as f:
163        for chunk in iter(lambda: f.read(4096), b""):
164            hasher.update(chunk)
165    return hasher.hexdigest()
166
167
168def bytesto(bytes, to, bsize=1024):
169    """convert bytes to megabytes.
170    bytes to mb: bytesto(bytes, 'm')
171    bytes to gb: bytesto(bytes, 'g' etc.
172    From https://gist.github.com/shawnbutts/3906915
173    """
174    levels = {"k": 1, "m": 2, "g": 3, "t": 4, "p": 5, "e": 6}
175    answer = float(bytes)
176    for _ in range(levels[to]):
177        answer = answer / bsize
178    return answer
179
180
181class Mode:
182    """
183    Enum for execution mode of Snakemake.
184    This handles the behavior of e.g. the logger.
185    """
186
187    default = 0
188    subprocess = 1
189    cluster = 2
190
191
192class lazy_property(property):
193    __slots__ = ["method", "cached", "__doc__"]
194
195    @staticmethod
196    def clean(instance, method):
197        delattr(instance, method)
198
199    def __init__(self, method):
200        self.method = method
201        self.cached = "_{}".format(method.__name__)
202        super().__init__(method, doc=method.__doc__)
203
204    def __get__(self, instance, owner):
205        cached = (
206            getattr(instance, self.cached) if hasattr(instance, self.cached) else None
207        )
208        if cached is not None:
209            return cached
210        value = self.method(instance)
211        setattr(instance, self.cached, value)
212        return value
213
214
215def strip_prefix(text, prefix):
216    if text.startswith(prefix):
217        return text[len(prefix) :]
218    return text
219
220
221def log_location(msg):
222    from snakemake.logging import logger
223
224    callerframerecord = inspect.stack()[1]
225    frame = callerframerecord[0]
226    info = inspect.getframeinfo(frame)
227    logger.debug(
228        "{}: {info.filename}, {info.function}, {info.lineno}".format(msg, info=info)
229    )
230
231
232def group_into_chunks(n, iterable):
233    """Group iterable into chunks of size at most n.
234
235    See https://stackoverflow.com/a/8998040.
236    """
237    it = iter(iterable)
238    while True:
239        chunk = tuple(itertools.islice(it, n))
240        if not chunk:
241            return
242        yield chunk
243
244
245class Rules:
246    """A namespace for rules so that they can be accessed via dot notation."""
247
248    pass
249
250
251class Scatter:
252    """A namespace for scatter to allow items to be accessed via dot notation."""
253
254    pass
255
256
257class Gather:
258    """A namespace for gather to allow items to be accessed via dot notation."""
259
260    pass
261