from __future__ import unicode_literals

from dvc.utils.compat import str, makedirs

import os
import stat
import uuid
import json
import ntpath
import shutil
import posixpath
from operator import itemgetter

import dvc.logger as logger
from dvc.system import System
from dvc.remote.base import RemoteBase, STATUS_MAP, STATUS_NEW, STATUS_DELETED
from dvc.utils import remove, move, copyfile, dict_md5, to_chunks, tmp_fname
from dvc.utils import LARGE_DIR_SIZE
from dvc.config import Config
from dvc.exceptions import DvcException
from dvc.progress import progress
from concurrent.futures import ThreadPoolExecutor

from dvc.utils.fs import get_mtime_and_size, get_inode


class RemoteLOCAL(RemoteBase):
    scheme = "local"
    REGEX = r"^(?P<path>.*)$"
    PARAM_CHECKSUM = "md5"
    PARAM_PATH = "path"
    PARAM_RELPATH = "relpath"
    MD5_DIR_SUFFIX = ".dir"

    CACHE_TYPES = ["reflink", "hardlink", "symlink", "copy"]
    CACHE_TYPE_MAP = {
        "copy": shutil.copyfile,
        "symlink": System.symlink,
        "hardlink": System.hardlink,
        "reflink": System.reflink,
    }

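    # NOTE: cache types above are tried in CACHE_TYPES order until one
    # succeeds (see link() below). The order can be overridden through
    # the cache.type config option, e.g. (illustrative config snippet):
    #
    #   [cache]
    #       type = "reflink,copy"
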
    def __init__(self, repo, config):
        super(RemoteLOCAL, self).__init__(repo, config)
        self.state = self.repo.state if self.repo else None
        self.protected = config.get(Config.SECTION_CACHE_PROTECTED, False)
        storagepath = config.get(Config.SECTION_AWS_STORAGEPATH, None)
        self.cache_dir = config.get(Config.SECTION_REMOTE_URL, storagepath)

        if self.cache_dir is not None and not os.path.isabs(self.cache_dir):
            cwd = config[Config.PRIVATE_CWD]
            self.cache_dir = os.path.abspath(os.path.join(cwd, self.cache_dir))

        types = config.get(Config.SECTION_CACHE_TYPE, None)
        if types:
            if isinstance(types, str):
                types = [t.strip() for t in types.split(",")]
            self.cache_types = types
        else:
            self.cache_types = self.CACHE_TYPES

        if self.cache_dir is not None and not os.path.exists(self.cache_dir):
            os.mkdir(self.cache_dir)

        self.path_info = {"scheme": "local"}

    @staticmethod
    def compat_config(config):
        ret = config.copy()
        url = ret.pop(Config.SECTION_AWS_STORAGEPATH, "")
        ret[Config.SECTION_REMOTE_URL] = url
        return ret

    @property
    def url(self):
        return self.cache_dir

    @property
    def prefix(self):
        return self.cache_dir

    def list_cache_paths(self):
        clist = []
        for entry in os.listdir(self.cache_dir):
            subdir = os.path.join(self.cache_dir, entry)
            if not os.path.isdir(subdir):
                continue

            for cache in os.listdir(subdir):
                clist.append(os.path.join(subdir, cache))

        return clist

    def get(self, md5):
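        """Return the cache file path for the given md5 checksum.

        The cache uses a two-level layout: the first two characters of
        the checksum name a subdirectory and the rest name the file
        itself. A minimal sketch with an illustrative checksum:

            self.get("d41d8cd98f00b204e9800998ecf8427e")
            # -> <cache_dir>/d4/1d8cd98f00b204e9800998ecf8427e
        """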
        if not md5:
            return None

        return os.path.join(self.cache_dir, md5[0:2], md5[2:])

    def changed_cache_file(self, md5):
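        """Check a single cache file against the state database.

        Returns True if the file is missing or its checksum no longer
        matches md5; a mismatching file is treated as corrupted and is
        removed from the cache.
        """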
        cache = self.get(md5)
        if self.state.changed(cache, md5=md5):
            if os.path.exists(cache):
                msg = "Corrupted cache file {}."
                logger.warning(msg.format(os.path.relpath(cache)))
                remove(cache)
            return True
        return False

    def exists(self, path_info):
        assert not isinstance(path_info, list)
        assert path_info["scheme"] == "local"
        return os.path.exists(path_info["path"])

    def changed_cache(self, md5):
        cache = self.get(md5)
        clist = [(cache, md5)]

        while clist:
            cache, md5 = clist.pop()
            if self.changed_cache_file(md5):
                return True

            if self.is_dir_cache(cache) and self._cache_metadata_changed():
                for entry in self.load_dir_cache(md5):
                    md5 = entry[self.PARAM_CHECKSUM]
                    cache = self.get(md5)
                    clist.append((cache, md5))

        return False

    def link(self, cache, path):
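        """Link cache into path using the first supported cache type.

        Cache types are tried in self.cache_types order; a type that
        raises DvcException is dropped from the list, so later links
        skip it. Empty cache files are special-cased: an empty file is
        created at path instead of a link.
        """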
        assert os.path.isfile(cache)

        dname = os.path.dirname(path)
        if not os.path.exists(dname):
            os.makedirs(dname)

        # NOTE: just create an empty file for an empty cache
        if os.path.getsize(cache) == 0:
            open(path, "w+").close()

            msg = "Created empty file: {} -> {}".format(cache, path)
            logger.debug(msg)
            return

        while self.cache_types:
            try:
                self.CACHE_TYPE_MAP[self.cache_types[0]](cache, path)

                if self.protected:
                    os.chmod(path, stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH)

                msg = "Created {}'{}': {} -> {}".format(
                    "protected " if self.protected else "",
                    self.cache_types[0],
                    cache,
                    path,
                )

                logger.debug(msg)
                return

            except DvcException as exc:
                msg = "Cache type '{}' is not supported: {}"
                logger.debug(msg.format(self.cache_types[0], str(exc)))
                del self.cache_types[0]

        raise DvcException("no possible cache types left to try out.")

    @property
    def ospath(self):
        if os.name == "nt":
            return ntpath
        return posixpath

    @classmethod
    def to_ospath(cls, path):
        if os.name == "nt":
            return cls.ntpath(path)
        return cls.unixpath(path)

    @staticmethod
    def unixpath(path):
        assert not ntpath.isabs(path)
        assert not posixpath.isabs(path)
        return path.replace("\\", "/")

    @staticmethod
    def ntpath(path):
        assert not ntpath.isabs(path)
        assert not posixpath.isabs(path)
        return path.replace("/", "\\")

    def collect_dir_cache(self, dname):
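        """Walk dname, checksum every file, and build the dir cache.

        Returns a (md5, dir_info) tuple, where md5 carries
        MD5_DIR_SUFFIX and dir_info is a list of per-file entries
        sorted by relative path, e.g. (values illustrative):

            [{"relpath": "subdir/file.txt", "md5": "..."}]
        """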
        dir_info = []

        for root, dirs, files in os.walk(str(dname)):
            bar = False

            if len(files) > LARGE_DIR_SIZE:
                msg = (
                    "Computing md5 for a large directory {}. "
                    "This is only done once."
                )
                logger.info(msg.format(os.path.relpath(root)))
                bar = True
                title = os.path.relpath(root)
                processed = 0
                total = len(files)
                progress.update_target(title, 0, total)

            for fname in files:
                path = os.path.join(root, fname)
                relpath = self.unixpath(os.path.relpath(path, dname))

                if bar:
                    progress.update_target(title, processed, total)
                    processed += 1

                md5 = self.state.update(path)
                dir_info.append(
                    {self.PARAM_RELPATH: relpath, self.PARAM_CHECKSUM: md5}
                )

            if bar:
                progress.finish_target(title)

        # NOTE: sorting the list by path to ensure reproducibility
        dir_info = sorted(dir_info, key=itemgetter(self.PARAM_RELPATH))

        md5 = dict_md5(dir_info) + self.MD5_DIR_SUFFIX
        if self.changed_cache_file(md5):
            self.dump_dir_cache(md5, dir_info)

        return (md5, dir_info)

    def load_dir_cache(self, md5):
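        """Load and validate the dir cache JSON for the given md5.

        Returns a list of {relpath, md5} entries with relpaths
        converted to the current OS path flavor, or an empty list if
        the file cannot be parsed.
        """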
        path = self.get(md5)

        assert self.is_dir_cache(path)

        try:
            with open(path, "r") as fd:
                d = json.load(fd)
        except Exception:
            msg = "Failed to load dir cache '{}'"
            logger.error(msg.format(os.path.relpath(path)))
            return []

        if not isinstance(d, list):
            msg = "dir cache file format error '{}' [skipping the file]"
            logger.error(msg.format(os.path.relpath(path)))
            return []

        for info in d:
            relpath = info[self.PARAM_RELPATH]
            info[self.PARAM_RELPATH] = self.to_ospath(relpath)

        return d

    def dump_dir_cache(self, md5, dir_info):
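        """Serialize dir_info as JSON into the cache file for md5.

        The file is written under a temporary name first and then moved
        into place, so readers never observe a partially written dir
        cache.
        """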
        path = self.get(md5)
        dname = os.path.dirname(path)

        assert self.is_dir_cache(path)
        assert isinstance(dir_info, list)

        if not os.path.isdir(dname):
            os.makedirs(dname)

        # NOTE: Writing first and renaming after that
        # to make sure that the operation is atomic.
        tmp = "{}.{}".format(path, str(uuid.uuid4()))

        with open(tmp, "w+") as fd:
            json.dump(dir_info, fd, sort_keys=True)
        move(tmp, path)

    @classmethod
    def is_dir_cache(cls, cache):
        return cache.endswith(cls.MD5_DIR_SUFFIX)

    def do_checkout(self, path_info, checksum, force=False):
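        """Link the cache entry for checksum into the working path.

        A plain file cache is linked directly. A dir cache is expanded
        entry by entry: unchanged files are left alone, changed ones
        are re-linked, and files not listed in the dir cache are
        removed from the working directory.
        """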
        path = path_info["path"]
        md5 = checksum
        cache = self.get(md5)

        if not self.is_dir_cache(cache):
            if os.path.exists(path):
                self.safe_remove(path_info, force=force)

            self.link(cache, path)
            self.state.update_link(path)
            return

        # Create dir separately so that dir is created
        # even if there are no files in it
        if not os.path.exists(path):
            os.makedirs(path)

        dir_info = self.load_dir_cache(md5)
        dir_relpath = os.path.relpath(path)
        dir_size = len(dir_info)
        bar = dir_size > LARGE_DIR_SIZE

        logger.info("Linking directory '{}'.".format(dir_relpath))

        for processed, entry in enumerate(dir_info):
            relpath = entry[self.PARAM_RELPATH]
            m = entry[self.PARAM_CHECKSUM]
            p = os.path.join(path, relpath)
            c = self.get(m)

            entry_info = {"scheme": path_info["scheme"], self.PARAM_PATH: p}

            entry_checksum_info = {self.PARAM_CHECKSUM: m}

            if self.changed(entry_info, entry_checksum_info):
                if os.path.exists(p):
                    self.safe_remove(entry_info, force=force)

                self.link(c, p)

            if bar:
                progress.update_target(dir_relpath, processed, dir_size)

        self._discard_working_directory_changes(path, dir_info, force=force)

        self.state.update_link(path)

        if bar:
            progress.finish_target(dir_relpath)

    def already_cached(self, path_info):
        assert path_info["scheme"] in ["", "local"]

        current_md5 = self.state.update(path_info["path"])

        if not current_md5:
            return False

        return not self.changed_cache(current_md5)

    def _discard_working_directory_changes(self, path, dir_info, force=False):
        working_dir_files = set(
            os.path.join(root, fname)
            for root, _, files in os.walk(str(path))
            for fname in files
        )

        cached_files = set(
            os.path.join(path, entry[self.PARAM_RELPATH])
            for entry in dir_info
        )

        delta = working_dir_files - cached_files

        for fname in delta:
            self.safe_remove({"scheme": "local", "path": fname}, force=force)

    def _move(self, inp, outp):
        # Moving in two stages to make the whole operation atomic in
        # case inp and outp are on different filesystems and actual
        # physical copying of data is happening.
        tmp = "{}.{}".format(outp, str(uuid.uuid4()))
        move(inp, tmp)
        move(tmp, outp)

    def _save_file(self, path, md5):
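        """Move path into the cache (if not cached yet) and link back.

        The state entries for both the workspace path and the cache
        file are refreshed afterwards so that reflink/copy links do
        not trigger a needless md5 recomputation on the next command.
        """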
        assert md5 is not None

        cache = self.get(md5)

        if self.changed_cache(md5):
            self._move(path, cache)
        else:
            remove(path)

        self.link(cache, path)
        self.state.update_link(path)

        # NOTE: we need to update both the workspace path and the cache
        # file in the state db: with reflink or copy cache types, moving
        # the original file updates its metadata, and a stale entry would
        # force a needless md5 recalculation on the next command.
        self.state.update(path, md5)
        self.state.update(cache, md5)

        return {self.PARAM_CHECKSUM: md5}

    def _save_dir(self, path, md5):
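        """Save a directory whose dir cache was already collected.

        Each entry is moved into the cache (or removed, if already
        cached) and linked back, after which the state entries for the
        directory and its dir cache file are refreshed.
        """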
        dir_info = self.load_dir_cache(md5)
        dir_relpath = os.path.relpath(path)
        dir_size = len(dir_info)
        bar = dir_size > LARGE_DIR_SIZE

        logger.info("Linking directory '{}'.".format(dir_relpath))

        for processed, entry in enumerate(dir_info):
            relpath = entry[self.PARAM_RELPATH]
            m = entry[self.PARAM_CHECKSUM]
            p = os.path.join(path, relpath)
            c = self.get(m)

            if self.changed_cache(m):
                self._move(p, c)
            else:
                remove(p)

            self.link(c, p)

            self.state.update(p, m)
            self.state.update(c, m)

            if bar:
                progress.update_target(dir_relpath, processed, dir_size)

        self.state.update_link(path)

        cache = self.get(md5)
        self.state.update(cache)
        self.state.update(path, md5)

        if bar:
            progress.finish_target(dir_relpath)

    def save(self, path_info, checksum_info):
        if path_info["scheme"] != "local":
            raise NotImplementedError

        path = path_info["path"]

        msg = "Saving '{}' to cache '{}'."
        logger.info(
            msg.format(os.path.relpath(path), os.path.relpath(self.cache_dir))
        )

        md5 = checksum_info[self.PARAM_CHECKSUM]
        if os.path.isdir(path):
            self._save_dir(path, md5)
        else:
            self._save_file(path, md5)

    def save_info(self, path_info):
        if path_info["scheme"] != "local":
            raise NotImplementedError

        return {self.PARAM_CHECKSUM: self.state.update(path_info["path"])}

    def remove(self, path_info):
        if path_info["scheme"] != "local":
            raise NotImplementedError

        remove(path_info["path"])

    def move(self, from_info, to_info):
        if from_info["scheme"] != "local" or to_info["scheme"] != "local":
            raise NotImplementedError

        move(from_info["path"], to_info["path"])

    def cache_exists(self, md5s):
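        """Filter md5s down to checksums already present in the cache.

        A minimal usage sketch (checksums illustrative):

            existing = self.cache_exists(["d41d8cd9...", "90015098..."])
        """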
        assert isinstance(md5s, list)
        return list(filter(lambda md5: not self.changed_cache_file(md5), md5s))

    def upload(self, from_infos, to_infos, names=None):
        names = self._verify_path_args(to_infos, from_infos, names)

        for from_info, to_info, name in zip(from_infos, to_infos, names):
            if to_info["scheme"] != "local":
                raise NotImplementedError

            if from_info["scheme"] != "local":
                raise NotImplementedError

            logger.debug(
                "Uploading '{}' to '{}'".format(
                    from_info["path"], to_info["path"]
                )
            )

            if not name:
                name = os.path.basename(from_info["path"])

            makedirs(os.path.dirname(to_info["path"]), exist_ok=True)

            try:
                copyfile(from_info["path"], to_info["path"], name=name)
            except Exception:
                logger.error(
                    "failed to upload '{}' to '{}'".format(
                        from_info["path"], to_info["path"]
                    )
                )

    def download(
        self,
        from_infos,
        to_infos,
        no_progress_bar=False,
        names=None,
        resume=False,
    ):
        names = self._verify_path_args(from_infos, to_infos, names)

        for to_info, from_info, name in zip(to_infos, from_infos, names):
            if from_info["scheme"] != "local":
                raise NotImplementedError

            if to_info["scheme"] != "local":
                raise NotImplementedError

            logger.debug(
                "Downloading '{}' to '{}'".format(
                    from_info["path"], to_info["path"]
                )
            )

            if not name:
                name = os.path.basename(to_info["path"])

            makedirs(os.path.dirname(to_info["path"]), exist_ok=True)
            tmp_file = tmp_fname(to_info["path"])
            try:
                copyfile(
                    from_info["path"],
                    tmp_file,
                    no_progress_bar=no_progress_bar,
                    name=name,
                )
            except Exception:
                logger.error(
                    "failed to download '{}' to '{}'".format(
                        from_info["path"], to_info["path"]
                    )
                )

                continue

            os.rename(tmp_file, to_info["path"])

    def _group(self, checksum_infos, show_checksums=False):
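        """Group checksum_infos by md5 for status reporting.

        Returns a dict keyed by checksum. With show_checksums the name
        is the checksum itself; otherwise it is the path (plus branch,
        if any), and duplicate checksums get their names joined, e.g.
        (values illustrative):

            {"d41d8cd9...": {"name": "data.csv(master)"}}
        """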
        by_md5 = {}

        for info in checksum_infos:
            md5 = info[self.PARAM_CHECKSUM]

            if show_checksums:
                by_md5[md5] = {"name": md5}
                continue

            name = info[self.PARAM_PATH]
            branch = info.get("branch")
            if branch:
                name += "({})".format(branch)

            if md5 not in by_md5:
                by_md5[md5] = {"name": name}
            else:
                by_md5[md5]["name"] += " " + name

        return by_md5

    def status(self, checksum_infos, remote, jobs=None, show_checksums=False):
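        """Compare local and remote cache presence for checksum_infos.

        Returns the _group() mapping with a "status" entry added to
        every checksum, taken from STATUS_MAP based on the
        (exists locally, exists remotely) pair.
        """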
        logger.info("Preparing to collect status from {}".format(remote.url))
        title = "Collecting information"

        progress.set_n_total(1)
        progress.update_target(title, 0, 100)

        ret = self._group(checksum_infos, show_checksums=show_checksums)
        md5s = list(ret.keys())

        progress.update_target(title, 30, 100)

        remote_exists = list(remote.cache_exists(md5s))

        progress.update_target(title, 90, 100)

        local_exists = self.cache_exists(md5s)

        progress.finish_target(title)

        for md5, info in ret.items():
            info["status"] = STATUS_MAP[
                (md5 in local_exists, md5 in remote_exists)
            ]

        return ret

    def _get_chunks(self, download, remote, status_info, status, jobs):
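        """Select entries matching status and split them into chunks.

        Builds parallel (from_infos, to_infos, names) lists, with the
        direction depending on download, and splits them with
        to_chunks so the jobs workers each get a slice.
        """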
        title = "Analysing status."

        progress.set_n_total(1)
        total = len(status_info)
        current = 0

        cache = []
        path_infos = []
        names = []
        for md5, info in status_info.items():
            if info["status"] == status:
                cache.append(self.checksum_to_path_info(md5))
                path_infos.append(remote.checksum_to_path_info(md5))
                names.append(info["name"])
            current += 1
            progress.update_target(title, current, total)

        progress.finish_target(title)

        progress.set_n_total(len(names))

        if download:
            to_infos = cache
            from_infos = path_infos
        else:
            to_infos = path_infos
            from_infos = cache

        return list(
            zip(
                to_chunks(from_infos, jobs),
                to_chunks(to_infos, jobs),
                to_chunks(names, jobs),
            )
        )

    def _process(
        self,
        checksum_infos,
        remote,
        jobs=None,
        show_checksums=False,
        download=False,
    ):
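        """Transfer data between the local cache and remote.

        Computes status first, then uploads STATUS_NEW entries or
        downloads STATUS_DELETED ones, fanning the chunks out over a
        ThreadPoolExecutor with jobs workers. Returns the number of
        chunks processed.
        """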
        msg = "Preparing to {} data {} '{}'"
        logger.info(
            msg.format(
                "download" if download else "upload",
                "from" if download else "to",
                remote.url,
            )
        )

        if download:
            func = remote.download
            status = STATUS_DELETED
        else:
            func = remote.upload
            status = STATUS_NEW

        if jobs is None:
            jobs = remote.JOBS

        status_info = self.status(
            checksum_infos, remote, jobs=jobs, show_checksums=show_checksums
        )

        chunks = self._get_chunks(download, remote, status_info, status, jobs)

        if len(chunks) == 0:
            return 0

        futures = []
        with ThreadPoolExecutor(max_workers=jobs) as executor:
            for from_infos, to_infos, names in chunks:
                res = executor.submit(func, from_infos, to_infos, names=names)
                futures.append(res)

        for f in futures:
            f.result()

        return len(chunks)

    def push(self, checksum_infos, remote, jobs=None, show_checksums=False):
        return self._process(
            checksum_infos,
            remote,
            jobs=jobs,
            show_checksums=show_checksums,
            download=False,
        )

    def pull(self, checksum_infos, remote, jobs=None, show_checksums=False):
        return self._process(
            checksum_infos,
            remote,
            jobs=jobs,
            show_checksums=show_checksums,
            download=True,
        )

    def _cache_metadata_changed(self):
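        """Check whether the cache dir changed since the last run.

        Compares the current mtime and size of the cache directory
        against the record stored in the state database for its inode;
        an unknown inode counts as changed.
        """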
        mtime, size = get_mtime_and_size(self.cache_dir)
        inode = get_inode(self.cache_dir)

        existing_record = self.state.get_state_record_for_inode(inode)

        if existing_record:
            cached_mtime, cached_size, _, _ = existing_record
            return not (mtime == cached_mtime and size == cached_size)

        return True