1import errno
2import fcntl
3import functools
4import inspect
5import json
6import logging
7import os
8import select
9import shlex
10import shutil
11import struct
12import sys
13import tempfile
14import textwrap
15import time
16import traceback
17from subprocess import Popen, PIPE
18
19from . import __version__
20from .compress import LZ4
21from .constants import *  # NOQA
22from .helpers import Error, IntegrityError
23from .helpers import bin_to_hex
24from .helpers import get_base_dir
25from .helpers import get_limited_unpacker
26from .helpers import hostname_is_unique
27from .helpers import replace_placeholders
28from .helpers import sysinfo
29from .helpers import format_file_size
30from .helpers import truncate_and_unlink
31from .helpers import prepare_subprocess_env
32from .logger import create_logger, setup_logging
33from .helpers import msgpack
34from .repository import Repository
35from .version import parse_version, format_version
36from .algorithms.checksums import xxh64
37
logger = create_logger(__name__)

# protocol version of the legacy (borg 1.0.x) positional RPC protocol
RPC_PROTOCOL_VERSION = 2
# this client/server's own version, as a parsed version tuple
BORG_VERSION = parse_version(__version__)
# short msgpack dict keys used by the borg 1.1+ dict-based RPC protocol
# (kept to one byte to save space on the wire)
MSGID, MSG, ARGS, RESULT = b'i', b'm', b'a', b'r'

# maximum number of in-flight RPC calls -- presumably consumed by call_many
# (not visible in this chunk); TODO confirm against the rest of the module
MAX_INFLIGHT = 100

# seconds: quota refill period used by SleepingBandwidthLimiter
RATELIMIT_PERIOD = 0.1
47
48
def os_write(fd, data):
    """os.write wrapper so we do not lose data for partial writes."""
    # TODO: this issue is fixed in cygwin since at least 2.8.0, remove this
    #       wrapper / workaround when this version is considered ancient.
    # Partial writes happen frequently on cygwin due to its small pipe buffer
    # size of only 64kiB and its blocking-pipe behaviour differing from
    # Linux/*BSD. Neither Linux nor *BSD ever do partial writes on blocking
    # pipes, unless interrupted by a signal, in which case serve() would
    # terminate.
    total = len(data)
    view = memoryview(data)
    while len(view):
        written = os.write(fd, view)
        view = view[written:]
        if len(view):
            # tiny pause (proportional to what just went through) before
            # retrying the rest, to let the reader drain the pipe
            time.sleep(written * 1e-09)
    return total
66
67
# NOTE: for these Error subclasses, the docstring doubles as the user-visible
# message template (see helpers.Error) -- do not edit it casually.
class ConnectionClosed(Error):
    """Connection closed by remote host"""
70
71
# ConnectionClosed variant carrying an extra hint for the user, filled into {}
# (e.g. 'Is borg working on the server?' in RemoteRepository.__init__).
class ConnectionClosedWithHint(ConnectionClosed):
    """Connection closed by remote host. {}"""
74
75
# raised by RepositoryServer.open when the requested path fails the
# --restrict-to-path / --restrict-to-repository checks
class PathNotAllowed(Error):
    """Repository path not allowed: {}"""
78
79
# raised by RepositoryServer.serve when a client requests a method that is
# not in RepositoryServer.rpc_methods
class InvalidRPCMethod(Error):
    """RPC method {} is not valid"""
82
83
# raised by RepositoryServer.serve when a request is neither the dict format
# (borg 1.1+) nor the 4-tuple format (borg 1.0.x)
class UnexpectedRPCDataFormatFromClient(Error):
    """Borg {}: Got unexpected RPC data format from client."""
86
87
class UnexpectedRPCDataFormatFromServer(Error):
    """Got unexpected RPC data format from server:\n{}"""

    def __init__(self, data):
        # Try to present the offending data as text (truncated to 128 chars);
        # if it is not valid UTF-8, fall back to a hex dump of the first 128
        # bytes, wrapped at 16 bytes (3 chars each) per line.
        try:
            data = data.decode()[:128]
        except UnicodeDecodeError:
            data = data[:128]
            data = ['%02X' % byte for byte in data]
            data = textwrap.fill(' '.join(data), 16 * 3)
        super().__init__(data)
99
100
101# Protocol compatibility:
# In general the server is responsible for rejecting too old clients and the client is responsible for rejecting
103# too old servers. This ensures that the knowledge what is compatible is always held by the newer component.
104#
105# The server can do checks for the client version in RepositoryServer.negotiate. If the client_data is 2 then
106# client is in the version range [0.29.0, 1.0.x] inclusive. For newer clients client_data is a dict which contains
107# client_version.
108#
109# For the client the return of the negotiate method is either 2 if the server is in the version range [0.29.0, 1.0.x]
110# inclusive, or it is a dict which includes the server version.
111#
112# All method calls on the remote repository object must be allowlisted in RepositoryServer.rpc_methods and have api
113# stubs in RemoteRepository. The @api decorator on these stubs is used to set server version requirements.
114#
115# Method parameters are identified only by name and never by position. Unknown parameters are ignored by the server side.
116# If a new parameter is important and may not be ignored, on the client a parameter specific version requirement needs
117# to be added.
118# When parameters are removed, they need to be preserved as defaulted parameters on the client stubs so that older
119# servers still get compatible input.
120
121
# Positional-argument order for each RPC method in the legacy (borg < 1.1)
# positional protocol. Used by RepositoryServer.positional_to_named and
# RemoteRepository.named_to_positional to translate between the two protocols.
compatMap = {
    'check': ('repair', 'save_space', ),
    'commit': ('save_space', ),
    'rollback': (),
    'destroy': (),
    '__len__': (),
    'list': ('limit', 'marker', ),
    'put': ('id', 'data', ),
    'get': ('id', ),
    'delete': ('id', ),
    'save_key': ('keydata', ),
    'load_key': (),
    'break_lock': (),
    'negotiate': ('client_data', ),
    'open': ('path', 'create', 'lock_wait', 'lock', 'exclusive', 'append_only', ),
    'get_free_nonce': (),
    'commit_nonce_reservation': ('next_unreserved', 'start_nonce', ),
}
140
141
def decode_keys(d):
    """Return a copy of mapping *d* with every (bytes) key decoded to str."""
    return {key.decode(): value for key, value in d.items()}
144
145
class RepositoryServer:  # pragma: no cover
    """Server side of the RPC protocol.

    Reads msgpacked requests from stdin, dispatches them to itself or to the
    local Repository instance, and writes msgpacked replies to stdout.
    stderr carries log/error text back to the client.
    """

    # allowlist of methods a client may invoke remotely
    rpc_methods = (
        '__len__',
        'check',
        'commit',
        'delete',
        'destroy',
        'get',
        'list',
        'scan',
        'negotiate',
        'open',
        'put',
        'rollback',
        'save_key',
        'load_key',
        'break_lock',
        'get_free_nonce',
        'commit_nonce_reservation',
        'inject_exception',
    )

    def __init__(self, restrict_to_paths, restrict_to_repositories, append_only, storage_quota):
        self.repository = None
        self.restrict_to_paths = restrict_to_paths
        self.restrict_to_repositories = restrict_to_repositories
        # This flag is parsed from the serve command line via Archiver.do_serve,
        # i.e. it reflects local system policy and generally ranks higher than
        # whatever the client wants, except when initializing a new repository
        # (see RepositoryServer.open below).
        self.append_only = append_only
        self.storage_quota = storage_quota
        self.client_version = parse_version('1.0.8')  # fallback version if client is too old to send version information

    def positional_to_named(self, method, argv):
        """Translate from positional protocol to named protocol."""
        try:
            return {name: argv[pos] for pos, name in enumerate(compatMap[method])}
        except IndexError:
            if method == 'open' and len(argv) == 4:
                # borg clients < 1.0.7 use open() with 4 args
                mapping = compatMap[method][:4]
            else:
                raise
            return {name: argv[pos] for pos, name in enumerate(mapping)}

    def filter_args(self, f, kwargs):
        """Remove unknown named parameters from call, because client did (implicitly) say it's ok."""
        known = set(inspect.signature(f).parameters)
        return {name: kwargs[name] for name in kwargs if name in known}

    def serve(self):
        """Main loop: read requests from stdin, dispatch, write replies to stdout."""
        stdin_fd = sys.stdin.fileno()
        stdout_fd = sys.stdout.fileno()
        # BUGFIX: stderr_fd was previously taken from sys.stdout, which would
        # have interleaved error text with the binary msgpack reply stream on
        # stdout and corrupted the protocol; use the real stderr fd, matching
        # the "Make stderr blocking" setup and the error writes below.
        stderr_fd = sys.stderr.fileno()
        # Make stdin non-blocking
        fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
        fcntl.fcntl(stdin_fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        # Make stdout blocking
        fl = fcntl.fcntl(stdout_fd, fcntl.F_GETFL)
        fcntl.fcntl(stdout_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
        # Make stderr blocking
        fl = fcntl.fcntl(stderr_fd, fcntl.F_GETFL)
        fcntl.fcntl(stderr_fd, fcntl.F_SETFL, fl & ~os.O_NONBLOCK)
        unpacker = get_limited_unpacker('server')
        while True:
            r, w, es = select.select([stdin_fd], [], [], 10)
            if r:
                data = os.read(stdin_fd, BUFSIZE)
                if not data:
                    # EOF on stdin: client hung up; clean up and exit
                    if self.repository is not None:
                        self.repository.close()
                    else:
                        os_write(stderr_fd, 'Borg {}: Got connection close before repository was opened.\n'
                                 .format(__version__).encode())
                    return
                unpacker.feed(data)
                for unpacked in unpacker:
                    if isinstance(unpacked, dict):
                        # borg 1.1+ dict-based protocol
                        dictFormat = True
                        msgid = unpacked[MSGID]
                        method = unpacked[MSG].decode()
                        args = decode_keys(unpacked[ARGS])
                    elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                        # legacy borg 1.0.x positional protocol
                        dictFormat = False
                        # The first field 'type' was always 1 and has always been ignored
                        _, msgid, method, args = unpacked
                        method = method.decode()
                        args = self.positional_to_named(method, args)
                    else:
                        if self.repository is not None:
                            self.repository.close()
                        raise UnexpectedRPCDataFormatFromClient(__version__)
                    try:
                        if method not in self.rpc_methods:
                            raise InvalidRPCMethod(method)
                        try:
                            f = getattr(self, method)
                        except AttributeError:
                            f = getattr(self.repository, method)
                        args = self.filter_args(f, args)
                        res = f(**args)
                    except BaseException as e:
                        if dictFormat:
                            ex_short = traceback.format_exception_only(e.__class__, e)
                            ex_full = traceback.format_exception(*sys.exc_info())
                            ex_trace = True
                            if isinstance(e, Error):
                                ex_short = [e.get_message()]
                                ex_trace = e.traceback
                            if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                                # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                                # and will be handled just like locally raised exceptions. Suppress the remote traceback
                                # for these, except ErrorWithTraceback, which should always display a traceback.
                                pass
                            else:
                                logging.debug('\n'.join(ex_full))

                            try:
                                msg = msgpack.packb({MSGID: msgid,
                                                    b'exception_class': e.__class__.__name__,
                                                    b'exception_args': e.args,
                                                    b'exception_full': ex_full,
                                                    b'exception_short': ex_short,
                                                    b'exception_trace': ex_trace,
                                                    b'sysinfo': sysinfo()})
                            except TypeError:
                                # exception args may not be msgpack-serializable;
                                # retry with non-trivial args replaced by None
                                msg = msgpack.packb({MSGID: msgid,
                                                    b'exception_class': e.__class__.__name__,
                                                    b'exception_args': [x if isinstance(x, (str, bytes, int)) else None
                                                                        for x in e.args],
                                                    b'exception_full': ex_full,
                                                    b'exception_short': ex_short,
                                                    b'exception_trace': ex_trace,
                                                    b'sysinfo': sysinfo()})

                            os_write(stdout_fd, msg)
                        else:
                            if isinstance(e, (Repository.DoesNotExist, Repository.AlreadyExists, PathNotAllowed)):
                                # These exceptions are reconstructed on the client end in RemoteRepository.call_many(),
                                # and will be handled just like locally raised exceptions. Suppress the remote traceback
                                # for these, except ErrorWithTraceback, which should always display a traceback.
                                pass
                            else:
                                if isinstance(e, Error):
                                    tb_log_level = logging.ERROR if e.traceback else logging.DEBUG
                                    msg = e.get_message()
                                else:
                                    tb_log_level = logging.ERROR
                                    msg = '%s Exception in RPC call' % e.__class__.__name__
                                tb = '%s\n%s' % (traceback.format_exc(), sysinfo())
                                logging.error(msg)
                                logging.log(tb_log_level, tb)
                            exc = 'Remote Exception (see remote log for the traceback)'
                            os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
                    else:
                        if dictFormat:
                            os_write(stdout_fd, msgpack.packb({MSGID: msgid, RESULT: res}))
                        else:
                            os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
            if es:
                self.repository.close()
                return

    def negotiate(self, client_data):
        """Negotiate protocol/version with the client; return server version info."""
        # old format used in 1.0.x
        if client_data == RPC_PROTOCOL_VERSION:
            return RPC_PROTOCOL_VERSION
        # clients since 1.1.0b3 use a dict as client_data
        # clients since 1.1.0b6 support json log format from server
        if isinstance(client_data, dict):
            self.client_version = client_data[b'client_version']
            level = logging.getLevelName(logging.getLogger('').level)
            setup_logging(is_serve=True, json=True, level=level)
            logger.debug('Initialized logging system for JSON-based protocol')
        else:
            self.client_version = BORG_VERSION  # seems to be newer than current version (no known old format)

        # not a known old format, send newest negotiate this version knows
        return {'server_version': BORG_VERSION}

    def _resolve_path(self, path):
        """Resolve home-dir/cwd abbreviations and symlinks to a canonical real path."""
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Leading slash is always present with URI (ssh://), but not with short-form (who@host:path).
        if path.startswith('/~/'):  # /~/x = path x relative to home dir
            path = os.path.join(get_base_dir(), path[3:])
        elif path.startswith('~/'):
            path = os.path.join(get_base_dir(), path[2:])
        elif path.startswith('/~'):  # /~username/x = relative to "user" home dir
            path = os.path.expanduser(path[1:])
        elif path.startswith('~'):
            path = os.path.expanduser(path)
        elif path.startswith('/./'):  # /./x = path x relative to cwd
            path = path[3:]
        return os.path.realpath(path)

    def open(self, path, create=False, lock_wait=None, lock=True, exclusive=None, append_only=False,
             make_parent_dirs=False):
        """Open (or create) the repository at *path*, enforcing server-side restrictions.

        Raises PathNotAllowed if the resolved path violates --restrict-to-path /
        --restrict-to-repository. Returns the repository id.
        """
        logging.debug('Resolving repository path %r', path)
        path = self._resolve_path(path)
        logging.debug('Resolved repository path to %r', path)
        path_with_sep = os.path.join(path, '')  # make sure there is a trailing slash (os.sep)
        if self.restrict_to_paths:
            # if --restrict-to-path P is given, we make sure that we only operate in/below path P.
            # for the prefix check, it is important that the compared paths both have trailing slashes,
            # so that a path /foobar will NOT be accepted with --restrict-to-path /foo option.
            for restrict_to_path in self.restrict_to_paths:
                restrict_to_path_with_sep = os.path.join(os.path.realpath(restrict_to_path), '')  # trailing slash
                if path_with_sep.startswith(restrict_to_path_with_sep):
                    break
            else:
                raise PathNotAllowed(path)
        if self.restrict_to_repositories:
            for restrict_to_repository in self.restrict_to_repositories:
                restrict_to_repository_with_sep = os.path.join(os.path.realpath(restrict_to_repository), '')
                if restrict_to_repository_with_sep == path_with_sep:
                    break
            else:
                raise PathNotAllowed(path)
        # "borg init" on "borg serve --append-only" (=self.append_only) does not create an append only repo,
        # while "borg init --append-only" (=append_only) does, regardless of the --append-only (self.append_only)
        # flag for serve.
        append_only = (not create and self.append_only) or append_only
        self.repository = Repository(path, create, lock_wait=lock_wait, lock=lock,
                                     append_only=append_only,
                                     storage_quota=self.storage_quota,
                                     exclusive=exclusive,
                                     make_parent_dirs=make_parent_dirs)
        self.repository.__enter__()  # clean exit handled by serve() method
        return self.repository.id

    def inject_exception(self, kind):
        """Raise a well-known exception type, for testing exception transport over RPC."""
        kind = kind.decode()
        s1 = 'test string'
        s2 = 'test string2'
        if kind == 'DoesNotExist':
            raise Repository.DoesNotExist(s1)
        elif kind == 'AlreadyExists':
            raise Repository.AlreadyExists(s1)
        elif kind == 'CheckNeeded':
            raise Repository.CheckNeeded(s1)
        elif kind == 'IntegrityError':
            raise IntegrityError(s1)
        elif kind == 'PathNotAllowed':
            raise PathNotAllowed('foo')
        elif kind == 'ObjectNotFound':
            raise Repository.ObjectNotFound(s1, s2)
        elif kind == 'InvalidRPCMethod':
            raise InvalidRPCMethod(s1)
        elif kind == 'divide':
            0 // 0
398
399
class SleepingBandwidthLimiter:
    """Throttle os.write() calls to roughly *limit* bytes per second by sleeping.

    Works like a token bucket: every RATELIMIT_PERIOD seconds the quota is
    topped up by one period's worth of bytes, capped at two periods' worth.
    A falsy *limit* disables throttling completely.
    """

    def __init__(self, limit):
        if not limit:
            self.ratelimit = None
            return
        # bytes allowed per RATELIMIT_PERIOD
        self.ratelimit = int(limit * RATELIMIT_PERIOD)
        self.ratelimit_last = time.monotonic()
        self.ratelimit_quota = self.ratelimit

    def write(self, fd, to_send):
        """Write to fd (possibly only a quota-limited prefix); return bytes written."""
        if self.ratelimit:
            now = time.monotonic()
            # periodic refill, capped at two periods' worth of bytes
            if now >= self.ratelimit_last + RATELIMIT_PERIOD:
                self.ratelimit_quota = min(self.ratelimit_quota + self.ratelimit,
                                           2 * self.ratelimit)
                self.ratelimit_last = now
            # quota exhausted: sleep out the rest of the period, then refill
            if self.ratelimit_quota == 0:
                time.sleep(self.ratelimit_last + RATELIMIT_PERIOD - now)
                self.ratelimit_quota += self.ratelimit
                self.ratelimit_last = time.monotonic()
            if len(to_send) > self.ratelimit_quota:
                to_send = to_send[:self.ratelimit_quota]
        written = os.write(fd, to_send)
        if self.ratelimit:
            self.ratelimit_quota -= written
        return written
428
429
def api(*, since, **kwargs_decorator):
    """Check version requirements and use self.call to do the remote method call.

    <since> specifies the version in which borg introduced this method;
    calling this method when connected to an older server fails without
    transmitting anything to the server.

    Further kwargs can be used to encode version specific restrictions:
    if a previously hardcoded behaviour became a parameter in some version,
    calls relying on the old hardcoded behaviour pass through, while calls
    requesting any other behaviour raise an error on older servers.

    e.g. when 'append_only' was introduced in 1.0.7 the previous behaviour was what now is append_only=False.
    Thus @api(..., append_only={'since': parse_version('1.0.7'), 'previously': False}) allows calls
    with append_only=False for all versions but rejects calls using append_only=True on servers older than 1.0.7.
    """
    def decorator(f):
        @functools.wraps(f)
        def do_rpc(self, *args, **kwargs):
            sig = inspect.signature(f)
            bound = sig.bind(self, *args, **kwargs)
            remote_args = {}  # arguments sent to the remote process
            local_args = {}   # arguments consumed locally (currently only 'wait')
            for name, param in sig.parameters.items():
                if name == 'self':
                    continue
                if name in bound.arguments:
                    target = local_args if name == 'wait' else remote_args
                    target[name] = bound.arguments[name]
                elif param.default is not param.empty:
                    # unsupplied parameters are sent with their stub defaults,
                    # so older servers get compatible input
                    remote_args[name] = param.default

            if self.server_version < since:
                raise self.RPCServerOutdated(f.__name__, format_version(since))

            for name, restriction in kwargs_decorator.items():
                if restriction['since'] <= self.server_version:
                    continue
                if 'previously' in restriction and remote_args[name] == restriction['previously']:
                    continue
                raise self.RPCServerOutdated("{0} {1}={2!s}".format(f.__name__, name, remote_args[name]),
                                             format_version(restriction['since']))

            return self.call(f.__name__, remote_args, **local_args)
        return do_rpc
    return decorator
480
481
482class RemoteRepository:
483    extra_test_args = []
484
    class RPCError(Exception):
        """Wraps an exception that occurred on the server and was transported over RPC."""
        def __init__(self, unpacked):
            # for borg < 1.1: unpacked only has b'exception_class' as key
            # for borg 1.1+: unpacked has keys: b'exception_args', b'exception_full', b'exception_short', b'sysinfo'
            self.unpacked = unpacked

        def get_message(self):
            # prefer the server-provided short message (1.1+ servers),
            # fall back to the bare exception class name for old servers
            if b'exception_short' in self.unpacked:
                return b'\n'.join(self.unpacked[b'exception_short']).decode()
            else:
                return self.exception_class

        @property
        def traceback(self):
            # whether a traceback should be shown; old servers do not send
            # this flag, so default to True
            return self.unpacked.get(b'exception_trace', True)

        @property
        def exception_class(self):
            # class name of the exception raised on the server
            return self.unpacked[b'exception_class'].decode()

        @property
        def exception_full(self):
            # full (multi-line) server-side traceback text, if transmitted
            if b'exception_full' in self.unpacked:
                return b'\n'.join(self.unpacked[b'exception_full']).decode()
            else:
                return self.get_message() + '\nRemote Exception (see remote log for the traceback)'

        @property
        def sysinfo(self):
            # server system information for diagnostics, if transmitted
            if b'sysinfo' in self.unpacked:
                return self.unpacked[b'sysinfo'].decode()
            else:
                return ''
518
    # NOTE: the docstring is the user-visible message template (see helpers.Error);
    # args[0] / args[1] fill its two {} placeholders.
    class RPCServerOutdated(Error):
        """Borg server is too old for {}. Required version {}"""

        @property
        def method(self):
            # RPC method name (possibly including the offending argument)
            return self.args[0]

        @property
        def required_version(self):
            # minimum server version needed, as a formatted version string
            return self.args[1]
529
    # If compatibility with 1.0.x is no longer needed, replace all checks of
    # this with True and simplify the code
    dictFormat = False  # kept at class level (outside __init__) so tests can force the legacy (non-dict) protocol
532
    def __init__(self, location, create=False, exclusive=False, lock_wait=None, lock=True, append_only=False,
                 make_parent_dirs=False, args=None):
        """Spawn a 'borg serve' subprocess (via ssh unless testing), negotiate the
        protocol version and open the repository at *location*.

        Raises ConnectionClosedWithHint if the server does not answer the initial
        negotiate call; any other failure closes the subprocess and re-raises.
        """
        self.location = self._location = location
        self.preload_ids = []
        self.msgid = 0  # sequence number for RPC messages
        self.rx_bytes = 0  # total bytes received (for the __exit__ debug log)
        self.tx_bytes = 0  # total bytes sent
        self.to_send = b''
        self.stderr_received = b''  # incomplete stderr line bytes received (no \n yet)
        self.chunkid_to_msgids = {}
        self.ignore_responses = set()
        self.responses = {}
        self.async_responses = {}
        self.shutdown_time = None
        # --remote-ratelimit is given in kiB/s; 0 disables the limiter
        self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
        self.unpacker = get_limited_unpacker('client')
        self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
        self.p = None
        self._args = args
        testing = location.host == '__testsuite__'
        # when testing, we invoke and talk to a borg process directly (no ssh).
        # when not testing, we invoke the system-installed ssh binary to talk to a remote borg.
        env = prepare_subprocess_env(system=not testing)
        borg_cmd = self.borg_cmd(args, testing)
        if not testing:
            borg_cmd = self.ssh_cmd(location) + borg_cmd
        logger.debug('SSH command line: %s', borg_cmd)
        self.p = Popen(borg_cmd, bufsize=0, stdin=PIPE, stdout=PIPE, stderr=PIPE, env=env)
        self.stdin_fd = self.p.stdin.fileno()
        self.stdout_fd = self.p.stdout.fileno()
        self.stderr_fd = self.p.stderr.fileno()
        # all subprocess pipes are driven via non-blocking I/O + select()
        fcntl.fcntl(self.stdin_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdin_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
        fcntl.fcntl(self.stdout_fd, fcntl.F_SETFL, fcntl.fcntl(self.stdout_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
        fcntl.fcntl(self.stderr_fd, fcntl.F_SETFL, fcntl.fcntl(self.stderr_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
        self.r_fds = [self.stdout_fd, self.stderr_fd]
        self.x_fds = [self.stdin_fd, self.stdout_fd, self.stderr_fd]

        try:
            try:
                version = self.call('negotiate', {'client_data': {
                    b'client_version': BORG_VERSION,
                }})
            except ConnectionClosed:
                raise ConnectionClosedWithHint('Is borg working on the server?') from None
            # a plain RPC_PROTOCOL_VERSION reply means a 1.0.x server (positional
            # protocol); 1.1+ servers reply with a dict containing their version
            if version == RPC_PROTOCOL_VERSION:
                self.dictFormat = False
            elif isinstance(version, dict) and b'server_version' in version:
                self.dictFormat = True
                self.server_version = version[b'server_version']
            else:
                raise Exception('Server insisted on using unsupported protocol version %s' % version)

            def do_open():
                # remote open(); sets self.id to the repository id
                self.id = self.open(path=self.location.path, create=create, lock_wait=lock_wait,
                                    lock=lock, exclusive=exclusive, append_only=append_only,
                                    make_parent_dirs=make_parent_dirs)

            if self.dictFormat:
                do_open()
            else:
                # Ugly detection of versions prior to 1.0.7: If open throws it has to be 1.0.6 or lower
                try:
                    do_open()
                except self.RPCError as err:
                    if err.exception_class != 'TypeError':
                        raise
                    msg = """\
Please note:
If you see a TypeError complaining about the number of positional arguments
given to open(), you can ignore it if it comes from a borg version < 1.0.7.
This TypeError is a cosmetic side effect of the compatibility code borg
clients >= 1.0.7 have to support older borg servers.
This problem will go away as soon as the server has been upgraded to 1.0.7+.
"""
                    # emit this msg in the same way as the 'Remote: ...' lines that show the remote TypeError
                    sys.stderr.write(msg)
                    self.server_version = parse_version('1.0.6')
                    compatMap['open'] = ('path', 'create', 'lock_wait', 'lock', )
                    # try again with corrected version and compatMap
                    do_open()
        except Exception:
            self.close()
            raise
616
617    def __del__(self):
618        if len(self.responses):
619            logging.debug('still %d cached responses left in RemoteRepository' % (len(self.responses),))
620        if self.p:
621            self.close()
622            assert False, 'cleanup happened in Repository.__del__'
623
624    def __repr__(self):
625        return '<%s %s>' % (self.__class__.__name__, self.location.canonical_path())
626
627    def __enter__(self):
628        return self
629
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context-manager exit: roll back on error, always close the connection."""
        try:
            if exc_type is not None:
                # give pending operations up to 30s to finish before forcing shutdown
                self.shutdown_time = time.monotonic() + 30
                self.rollback()
        finally:
            # in any case, we want to cleanly close the repo, even if the
            # rollback can not succeed (e.g. because the connection was
            # already closed) and raised another exception:
            logger.debug('RemoteRepository: %s bytes sent, %s bytes received, %d messages sent',
                         format_file_size(self.tx_bytes), format_file_size(self.rx_bytes), self.msgid)
            self.close()
642
    @property
    def id_str(self):
        # repository id as a hex string (for display/logging)
        return bin_to_hex(self.id)
646
647    def borg_cmd(self, args, testing):
648        """return a borg serve command line"""
649        # give some args/options to 'borg serve' process as they were given to us
650        opts = []
651        if args is not None:
652            opts.append('--umask=%03o' % args.umask)
653            root_logger = logging.getLogger()
654            if root_logger.isEnabledFor(logging.DEBUG):
655                opts.append('--debug')
656            elif root_logger.isEnabledFor(logging.INFO):
657                opts.append('--info')
658            elif root_logger.isEnabledFor(logging.WARNING):
659                pass  # warning is default
660            elif root_logger.isEnabledFor(logging.ERROR):
661                opts.append('--error')
662            elif root_logger.isEnabledFor(logging.CRITICAL):
663                opts.append('--critical')
664            else:
665                raise ValueError('log level missing, fix this code')
666
667            # Tell the remote server about debug topics it may need to consider.
668            # Note that debug topics are usable for "spew" or "trace" logs which would
669            # be too plentiful to transfer for normal use, so the server doesn't send
670            # them unless explicitly enabled.
671            #
672            # Needless to say, if you do --debug-topic=repository.compaction, for example,
673            # with a 1.0.x server it won't work, because the server does not recognize the
674            # option.
675            #
676            # This is not considered a problem, since this is a debugging feature that
677            # should not be used for regular use.
678            for topic in args.debug_topics:
679                if '.' not in topic:
680                    topic = 'borg.debug.' + topic
681                if 'repository' in topic:
682                    opts.append('--debug-topic=%s' % topic)
683
684            if 'storage_quota' in args and args.storage_quota:
685                opts.append('--storage-quota=%s' % args.storage_quota)
686        env_vars = []
687        if not hostname_is_unique():
688            env_vars.append('BORG_HOSTNAME_IS_UNIQUE=no')
689        if testing:
690            return env_vars + [sys.executable, '-m', 'borg.archiver', 'serve'] + opts + self.extra_test_args
691        else:  # pragma: no cover
692            remote_path = args.remote_path or os.environ.get('BORG_REMOTE_PATH', 'borg')
693            remote_path = replace_placeholders(remote_path)
694            return env_vars + [remote_path, 'serve'] + opts
695
696    def ssh_cmd(self, location):
697        """return a ssh command line that can be prefixed to a borg command line"""
698        rsh = self._args.rsh or os.environ.get('BORG_RSH', 'ssh')
699        args = shlex.split(rsh)
700        if location.port:
701            args += ['-p', str(location.port)]
702        if location.user:
703            args.append('%s@%s' % (location.user, location.host))
704        else:
705            args.append('%s' % location.host)
706        return args
707
708    def named_to_positional(self, method, kwargs):
709        return [kwargs[name] for name in compatMap[method]]
710
711    def call(self, cmd, args, **kw):
712        for resp in self.call_many(cmd, [args], **kw):
713            return resp
714
    def call_many(self, cmd, calls, wait=True, is_preloaded=False, async_wait=True):
        """Send RPC method *cmd* once per argument dict in *calls*, yield results.

        This is the central client-side RPC loop: it multiplexes sending
        requests, receiving responses and server stderr output over a
        select() loop, keeping at most MAX_INFLIGHT requests outstanding.

        :param cmd: RPC method name; the special name 'async_responses' sends
                    nothing and only collects responses of earlier async calls.
        :param calls: iterable of argument dicts, one per request
        :param wait: if True, keep looping until all responses have arrived;
                     if False, unanswered msgids are moved to self.ignore_responses.
        :param is_preloaded: all *calls* are 'get's already sent via preload()
        :param async_wait: for cmd == 'async_responses': keep waiting until all
                           pending async responses arrived instead of returning early
        """
        if not calls and cmd != 'async_responses':
            return

        def pop_preload_msgid(chunkid):
            # take the oldest in-flight preload request for this chunk id,
            # dropping the bookkeeping entry when it was the last one
            msgid = self.chunkid_to_msgids[chunkid].pop(0)
            if not self.chunkid_to_msgids[chunkid]:
                del self.chunkid_to_msgids[chunkid]
            return msgid

        def handle_error(unpacked):
            # Translate an error response into the local exception callers expect.
            # "old_server" (pre-1.1 protocol) responses carry no exception args,
            # so placeholder strings are substituted.
            error = unpacked[b'exception_class'].decode()
            old_server = b'exception_args' not in unpacked
            args = unpacked.get(b'exception_args')

            if error == 'DoesNotExist':
                raise Repository.DoesNotExist(self.location.orig)
            elif error == 'AlreadyExists':
                raise Repository.AlreadyExists(self.location.orig)
            elif error == 'CheckNeeded':
                raise Repository.CheckNeeded(self.location.orig)
            elif error == 'IntegrityError':
                if old_server:
                    raise IntegrityError('(not available)')
                else:
                    raise IntegrityError(args[0].decode())
            elif error == 'AtticRepository':
                if old_server:
                    raise Repository.AtticRepository('(not available)')
                else:
                    raise Repository.AtticRepository(args[0].decode())
            elif error == 'PathNotAllowed':
                if old_server:
                    raise PathNotAllowed('(unknown)')
                else:
                    raise PathNotAllowed(args[0].decode())
            elif error == 'ParentPathDoesNotExist':
                raise Repository.ParentPathDoesNotExist(args[0].decode())
            elif error == 'ObjectNotFound':
                if old_server:
                    raise Repository.ObjectNotFound('(not available)', self.location.orig)
                else:
                    raise Repository.ObjectNotFound(args[0].decode(), self.location.orig)
            elif error == 'InvalidRPCMethod':
                if old_server:
                    raise InvalidRPCMethod('(not available)')
                else:
                    raise InvalidRPCMethod(args[0].decode())
            else:
                # anything unknown is surfaced generically, with the raw response attached
                raise self.RPCError(unpacked)

        calls = list(calls)
        waiting_for = []  # msgids of sent-but-unanswered requests, in send order
        while wait or calls:
            if self.shutdown_time and time.monotonic() > self.shutdown_time:
                # we are shutting this RemoteRepository down already, make sure we do not waste
                # a lot of time in case a lot of async stuff is coming in or remote is gone or slow.
                logger.debug('shutdown_time reached, shutting down with %d waiting_for and %d async_responses.',
                             len(waiting_for), len(self.async_responses))
                return
            # yield any responses that arrived, strictly in request order
            while waiting_for:
                try:
                    unpacked = self.responses.pop(waiting_for[0])
                    waiting_for.pop(0)
                    if b'exception_class' in unpacked:
                        handle_error(unpacked)
                    else:
                        yield unpacked[RESULT]
                        if not waiting_for and not calls:
                            return
                except KeyError:
                    # response for the oldest outstanding request not here yet
                    break
            if cmd == 'async_responses':
                while True:
                    try:
                        msgid, unpacked = self.async_responses.popitem()
                    except KeyError:
                        # there is nothing left what we already have received
                        if async_wait and self.ignore_responses:
                            # but do not return if we shall wait and there is something left to wait for:
                            break
                        else:
                            return
                    else:
                        if b'exception_class' in unpacked:
                            handle_error(unpacked)
                        else:
                            yield unpacked[RESULT]
            # only ask select() for writability if we have (or could build) something to send
            if self.to_send or ((calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT):
                w_fds = [self.stdin_fd]
            else:
                w_fds = []
            r, w, x = select.select(self.r_fds, w_fds, self.x_fds, 1)
            if x:
                raise Exception('FD exception occurred')
            for fd in r:
                if fd is self.stdout_fd:
                    # RPC response channel
                    data = os.read(fd, BUFSIZE)
                    if not data:
                        raise ConnectionClosed()
                    self.rx_bytes += len(data)
                    self.unpacker.feed(data)
                    for unpacked in self.unpacker:
                        # two wire formats: dict (new protocol) or 4-tuple (old protocol)
                        if isinstance(unpacked, dict):
                            msgid = unpacked[MSGID]
                        elif isinstance(unpacked, tuple) and len(unpacked) == 4:
                            # The first field 'type' was always 1 and has always been ignored
                            _, msgid, error, res = unpacked
                            if error:
                                # ignore res, because it is only a fixed string anyway.
                                unpacked = {MSGID: msgid, b'exception_class': error}
                            else:
                                unpacked = {MSGID: msgid, RESULT: res}
                        else:
                            raise UnexpectedRPCDataFormatFromServer(data)
                        if msgid in self.ignore_responses:
                            self.ignore_responses.remove(msgid)
                            # async methods never return values, but may raise exceptions.
                            if b'exception_class' in unpacked:
                                self.async_responses[msgid] = unpacked
                            else:
                                # we currently do not have async result values except "None",
                                # so we do not add them into async_responses.
                                if unpacked[RESULT] is not None:
                                    self.async_responses[msgid] = unpacked
                        else:
                            self.responses[msgid] = unpacked
                elif fd is self.stderr_fd:
                    # server-side log output / progress channel
                    data = os.read(fd, 32768)
                    if not data:
                        raise ConnectionClosed()
                    self.rx_bytes += len(data)
                    # deal with incomplete lines (may appear due to block buffering)
                    if self.stderr_received:
                        data = self.stderr_received + data
                        self.stderr_received = b''
                    lines = data.splitlines(keepends=True)
                    if lines and not lines[-1].endswith((b'\r', b'\n')):
                        self.stderr_received = lines.pop()
                    # now we have complete lines in <lines> and any partial line in self.stderr_received.
                    for line in lines:
                        handle_remote_line(line.decode('utf-8'))  # decode late, avoid partial utf-8 sequences
            if w:
                # build the next request to send, if any (preload answers take priority
                # over sending new 'get's for chunk ids that were already preloaded)
                while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
                    if calls:
                        if is_preloaded:
                            assert cmd == 'get', "is_preload is only supported for 'get'"
                            if calls[0]['id'] in self.chunkid_to_msgids:
                                waiting_for.append(pop_preload_msgid(calls.pop(0)['id']))
                        else:
                            args = calls.pop(0)
                            if cmd == 'get' and args['id'] in self.chunkid_to_msgids:
                                waiting_for.append(pop_preload_msgid(args['id']))
                            else:
                                self.msgid += 1
                                waiting_for.append(self.msgid)
                                if self.dictFormat:
                                    self.to_send = msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})
                                else:
                                    self.to_send = msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
                    if not self.to_send and self.preload_ids:
                        # nothing regular to send: issue a preload 'get' and remember its msgid
                        chunk_id = self.preload_ids.pop(0)
                        args = {'id': chunk_id}
                        self.msgid += 1
                        self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
                        if self.dictFormat:
                            self.to_send = msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})
                        else:
                            self.to_send = msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args)))

                if self.to_send:
                    try:
                        written = self.ratelimit.write(self.stdin_fd, self.to_send)
                        self.tx_bytes += written
                        self.to_send = self.to_send[written:]
                    except OSError as e:
                        # io.write might raise EAGAIN even though select indicates
                        # that the fd should be writable
                        if e.errno != errno.EAGAIN:
                            raise
        # wait=False: whatever is still outstanding will be collected later (async)
        self.ignore_responses |= set(waiting_for)  # we lose order here
896
    @api(since=parse_version('1.0.0'),
         append_only={'since': parse_version('1.0.7'), 'previously': False},
         make_parent_dirs={'since': parse_version('1.1.9'), 'previously': False})
    def open(self, path, create=False, lock_wait=None, lock=True, exclusive=False, append_only=False,
             make_parent_dirs=False):
        """Remote counterpart of Repository.open.

        The body is intentionally empty: the @api decorator performs the actual
        remoting via self.call. The per-argument 'since' declarations above state
        when the server side learned each newer argument and what it previously
        assumed for it.
        """
902        """actual remoting is done via self.call in the @api decorator"""
903
    @api(since=parse_version('1.0.0'))
    def check(self, repair=False, save_space=False):
        """Remote counterpart of Repository.check; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
907
    @api(since=parse_version('1.0.0'))
    def commit(self, save_space=False):
        """Remote counterpart of Repository.commit; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
911
    @api(since=parse_version('1.0.0'))
    def rollback(self):
        """Remote counterpart of Repository.rollback; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
915
    @api(since=parse_version('1.0.0'))
    def destroy(self):
        """Remote counterpart of Repository.destroy; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
919
    @api(since=parse_version('1.0.0'))
    def __len__(self):
        """Remote counterpart of Repository.__len__; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
923
    @api(since=parse_version('1.0.0'))
    def list(self, limit=None, marker=None):
        """Remote counterpart of Repository.list; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
927
    @api(since=parse_version('1.1.0b3'))
    def scan(self, limit=None, marker=None):
        """Remote counterpart of Repository.scan; body intentionally empty,
        the @api decorator does the actual remoting via self.call.
        Note the 'since' above: only servers >= 1.1.0b3 know this method."""
931
932    def get(self, id):
933        for resp in self.get_many([id]):
934            return resp
935
936    def get_many(self, ids, is_preloaded=False):
937        for resp in self.call_many('get', [{'id': id} for id in ids], is_preloaded=is_preloaded):
938            yield resp
939
    @api(since=parse_version('1.0.0'))
    def put(self, id, data, wait=True):
        """Remote counterpart of Repository.put; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
943
    @api(since=parse_version('1.0.0'))
    def delete(self, id, wait=True):
        """Remote counterpart of Repository.delete; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
947
    @api(since=parse_version('1.0.0'))
    def save_key(self, keydata):
        """Remote counterpart of Repository.save_key; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
951
    @api(since=parse_version('1.0.0'))
    def load_key(self):
        """Remote counterpart of Repository.load_key; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
955
    @api(since=parse_version('1.0.0'))
    def get_free_nonce(self):
        """Remote counterpart of Repository.get_free_nonce; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
959
    @api(since=parse_version('1.0.0'))
    def commit_nonce_reservation(self, next_unreserved, start_nonce):
        """Remote counterpart of Repository.commit_nonce_reservation; body intentionally
        empty, the @api decorator does the actual remoting via self.call."""
963
    @api(since=parse_version('1.0.0'))
    def break_lock(self):
        """Remote counterpart of Repository.break_lock; body intentionally empty,
        the @api decorator does the actual remoting via self.call."""
967
968    def close(self):
969        if self.p:
970            self.p.stdin.close()
971            self.p.stdout.close()
972            self.p.wait()
973            self.p = None
974
975    def async_response(self, wait=True):
976        for resp in self.call_many('async_responses', calls=[], wait=True, async_wait=wait):
977            return resp
978
979    def preload(self, ids):
980        self.preload_ids += ids
981
982
def handle_remote_line(line):
    """
    Process one log line received from the remote 'borg serve' process.

    Three wire formats are supported, written by different server generations:
    JSON lines (since 1.1.0b6, same format as --log-json), '$LOG '-prefixed
    lines (0.xx, 1.0.x and 1.1.0b1..b5) and plain text (1.0.x and older).
    """
    assert line.endswith(('\r', '\n'))
    if line.startswith('{'):
        # This format is used by Borg since 1.1.0b6 for new-protocol clients.
        # It is the same format that is exposed by --log-json.
        msg = json.loads(line)
        msg_type = msg['type']

        if msg_type not in ('progress_message', 'progress_percent', 'log_message'):
            logger.warning('Dropped remote log message with unknown type %r: %s', msg_type, line)
            return

        if msg_type == 'log_message':
            # Re-emit log messages on the same level as the remote to get correct log suppression and verbosity.
            dest = logging.getLogger(msg['name'])
            level = getattr(logging, msg['levelname'], logging.CRITICAL)
            assert isinstance(level, int)
            msg['message'] = 'Remote: ' + msg['message']
            # In JSON mode, we manually check whether the log message should be propagated.
            if logging.getLogger('borg').json and level >= dest.getEffectiveLevel():
                sys.stderr.write(json.dumps(msg) + '\n')
            else:
                dest.log(level, '%s', msg['message'])
        elif msg_type.startswith('progress_'):
            # Progress output is enabled iff the borg.output.progress logger is at
            # INFO level (also see ProgressIndicatorBase in borg.helpers).
            if logging.getLogger('borg.output.progress').getEffectiveLevel() != logging.INFO:
                return
            # With progress enabled, check whether the client runs in --log-json mode,
            # as signalled by the "json" attribute on the "borg" logger.
            if logging.getLogger('borg').json:
                # Re-emit the progress JSON line as sent by the server,
                # with the message, if any, prefixed with "Remote: ".
                if 'message' in msg:
                    msg['message'] = 'Remote: ' + msg['message']
                sys.stderr.write(json.dumps(msg) + '\n')
            elif 'message' in msg:
                # In text log mode write only the message to stderr, terminated with \r
                # (carriage return) so the next message, progress or not, overwrites it —
                # mirroring the behaviour of local progress displays.
                sys.stderr.write('Remote: ' + msg['message'] + '\r')
    elif line.startswith('$LOG '):
        # Format of borg serve 0.xx, 1.0.x and 1.1.0b1..b5: "$LOG", the log level,
        # optionally a logger name, then "Remote:" as separator and the message.
        _, level_name, msg = line.split(' ', 2)
        level = getattr(logging, level_name, logging.CRITICAL)  # str -> int
        if msg.startswith('Remote:'):
            # server format: '$LOG <level> Remote: <msg>'
            logging.log(level, msg.rstrip())
        else:
            # server format '$LOG <level> <logname> Remote: <msg>'
            logname, msg = msg.split(' ', 1)
            logging.getLogger(logname).log(level, msg.rstrip())
    else:
        # Plain 1.0.x and older format with unknown priority - re-emit to stderr
        # (mirroring what the 1.0.x client did) or as a generic log message.
        if logging.getLogger('borg').json:
            logging.getLogger('').warning('Remote: ' + line.strip())
        else:
            # In non-JSON mode we circumvent logging to preserve carriage returns (\r)
            # which are generated by remote progress displays.
            sys.stderr.write('Remote: ' + line)
1055
1056
class RepositoryNoCache:
    """A non-caching wrapper that passes everything through to *repository*.

    Exists so callers can use the same API (including the context manager
    protocol) regardless of whether they got a RepositoryCache.

    *transform* is a callable taking two arguments, key and raw repository
    data; whatever it returns is what get()/get_many() produce. By default
    the raw repository data is returned unchanged.
    """
    def __init__(self, repository, transform=None):
        self.repository = repository
        self.transform = transform or (lambda key, data: data)

    def close(self):
        # nothing to clean up in the non-caching variant
        pass

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def get(self, key):
        # single-key convenience wrapper
        return next(self.get_many([key], cache=False))

    def get_many(self, keys, cache=True):
        # *cache* is accepted only for interface compatibility and ignored here
        data_iter = self.repository.get_many(keys)
        for key, data in zip(keys, data_iter):
            yield self.transform(key, data)

    def log_instrumentation(self):
        # no statistics are collected in the non-caching variant
        pass
1088
1089
class RepositoryCache(RepositoryNoCache):
    """
    A caching Repository wrapper.

    Caches Repository GET operations locally, one file per cached key, in a
    temporary directory that is removed on close().

    *pack* and *unpack* complement *transform* of the base class.
    *pack* receives the output of *transform* and should return bytes,
    which are stored in the cache. *unpack* receives these bytes and
    should return the initial data (as returned by *transform*).
    """

    def __init__(self, repository, pack=None, unpack=None, transform=None):
        super().__init__(repository, transform)
        self.pack = pack or (lambda data: data)
        self.unpack = unpack or (lambda data: data)
        self.cache = set()  # keys currently stored in the cache directory
        self.basedir = tempfile.mkdtemp(prefix='borg-cache-')
        self.query_size_limit()
        self.size = 0  # total bytes of packed data currently cached
        # Instrumentation
        self.hits = 0
        self.misses = 0
        self.slow_misses = 0  # misses that needed an extra single-key get (see get_many)
        self.slow_lat = 0.0  # accumulated latency of those extra gets
        self.evictions = 0
        self.enospc = 0  # number of ENOSPC errors hit while writing cache files

    def query_size_limit(self):
        """Set self.size_limit to 25 % of the free space in basedir, capped at 2 GiB."""
        stat_fs = os.statvfs(self.basedir)
        available_space = stat_fs.f_bavail * stat_fs.f_frsize
        self.size_limit = int(min(available_space * 0.25, 2**31))

    def key_filename(self, key):
        """Return the cache file path for *key* (hex-encoded binary key)."""
        return os.path.join(self.basedir, bin_to_hex(key))

    def backoff(self):
        """Evict (arbitrarily chosen) entries until size <= 90 % of a re-queried limit."""
        self.query_size_limit()
        target_size = int(0.9 * self.size_limit)
        while self.size > target_size and self.cache:
            key = self.cache.pop()  # set.pop: eviction order is arbitrary
            file = self.key_filename(key)
            self.size -= os.stat(file).st_size
            os.unlink(file)
            self.evictions += 1

    def add_entry(self, key, data, cache):
        """Transform *data* and, if *cache* is true, pack and store it on disk.

        Returns the transformed data. ENOSPC while writing triggers eviction
        (backoff); any other write error is re-raised.
        """
        transformed = self.transform(key, data)
        if not cache:
            return transformed
        packed = self.pack(transformed)
        file = self.key_filename(key)
        try:
            with open(file, 'wb') as fd:
                fd.write(packed)
        except OSError as os_error:
            # remove a possibly partially written cache file
            try:
                truncate_and_unlink(file)
            except FileNotFoundError:
                pass  # open() could have failed as well
            if os_error.errno == errno.ENOSPC:
                self.enospc += 1
                self.backoff()
            else:
                raise
        else:
            self.size += len(packed)
            self.cache.add(key)
            if self.size > self.size_limit:
                self.backoff()
        return transformed

    def log_instrumentation(self):
        """Log the collected cache statistics at debug level."""
        logger.debug('RepositoryCache: current items %d, size %s / %s, %d hits, %d misses, %d slow misses (+%.1fs), '
                     '%d evictions, %d ENOSPC hit',
                     len(self.cache), format_file_size(self.size), format_file_size(self.size_limit),
                     self.hits, self.misses, self.slow_misses, self.slow_lat,
                     self.evictions, self.enospc)

    def close(self):
        """Log statistics, then delete the cache directory and all cached files."""
        self.log_instrumentation()
        self.cache.clear()
        shutil.rmtree(self.basedir)

    def get_many(self, keys, cache=True):
        """Yield (transformed) data for *keys*, in order, using the disk cache.

        Keys not yet cached are requested from the repository in one batch up
        front and consumed from that iterator as their keys come up. If an
        eviction during this very call removed a key again, it is re-fetched
        with an individual get (slow path, counted in slow_misses/slow_lat).
        """
        unknown_keys = [key for key in keys if key not in self.cache]
        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys))
        for key in keys:
            if key in self.cache:
                file = self.key_filename(key)
                with open(file, 'rb') as fd:
                    self.hits += 1
                    yield self.unpack(fd.read())
            else:
                for key_, data in repository_iterator:
                    if key_ == key:
                        transformed = self.add_entry(key, data, cache)
                        self.misses += 1
                        yield transformed
                        break
                else:
                    # slow path: eviction during this get_many removed this key from the cache
                    t0 = time.perf_counter()
                    data = self.repository.get(key)
                    self.slow_lat += time.perf_counter() - t0
                    transformed = self.add_entry(key, data, cache)
                    self.slow_misses += 1
                    yield transformed
        # Consume any pending requests
        for _ in repository_iterator:
            pass
1201
1202
def cache_if_remote(repository, *, decrypted_cache=False, pack=None, unpack=None, transform=None, force_cache=False):
    """
    Return a Repository(No)Cache for *repository*: a RepositoryCache for
    RemoteRepository instances (or when *force_cache* is set), a pass-through
    RepositoryNoCache otherwise.

    If *decrypted_cache* is a key object, then get and get_many will return a tuple
    (csize, plaintext) instead of the actual data in the repository. The cache will
    store decrypted data, which increases CPU efficiency (by avoiding repeatedly decrypting
    and more importantly MAC and ID checking cached objects).
    Internally, objects are compressed with LZ4.
    """
    if decrypted_cache and (pack or unpack or transform):
        raise ValueError('decrypted_cache and pack/unpack/transform are incompatible')
    elif decrypted_cache:
        key = decrypted_cache
        # cache entry layout: 32 bit csize, 64 bit (8 byte) xxh64, then LZ4 data
        header = struct.Struct('=I8s')
        lz4 = LZ4()

        def pack(data):
            csize, plaintext = data
            compressed = lz4.compress(plaintext)
            return header.pack(csize, xxh64(compressed)) + compressed

        def unpack(data):
            buf = memoryview(data)
            csize, checksum = header.unpack(buf[:header.size])
            compressed = buf[header.size:]
            if checksum != xxh64(compressed):
                raise IntegrityError('detected corrupted data in metadata cache')
            return csize, lz4.decompress(compressed)

        def transform(id_, data):
            # csize is the size of the encrypted/compressed repo object
            return len(data), key.decrypt(id_, data)

    if isinstance(repository, RemoteRepository) or force_cache:
        return RepositoryCache(repository, pack, unpack, transform)
    return RepositoryNoCache(repository, transform)
1243