1from __future__ import unicode_literals
2
3from dvc.utils.compat import str
4
5import re
6import posixpath
7from multiprocessing import cpu_count
8
9import dvc.prompt as prompt
10import dvc.logger as logger
11from dvc.config import Config
12from dvc.exceptions import DvcException, ConfirmRemoveError
13
14
# Sync status codes; the numeric values are kept exactly as they were.
STATUS_OK, STATUS_NEW, STATUS_DELETED = 1, 3, 4


# Status lookup keyed by a (local_exists, remote_exists) pair of booleans.
STATUS_MAP = {
    # present on both sides, or on neither: nothing to do
    (True, True): STATUS_OK,
    (False, False): STATUS_OK,
    # present locally only
    (True, False): STATUS_NEW,
    # present remotely only
    (False, True): STATUS_DELETED,
}
27
28
class DataCloudError(DvcException):
    """Data cloud exception.

    The given message is reported prefixed with "Data sync error: ".
    """

    def __init__(self, msg):
        message = "Data sync error: {}".format(msg)
        super(DataCloudError, self).__init__(message)
34
35
class RemoteCmdError(DvcException):
    """Error for a remote command that finished with a non-zero code.

    Args:
        remote: name of the remote, placed at the start of the message.
        cmd: the command that was run.
        ret: the non-zero return code.
        err: error output to append to the message.
    """

    def __init__(self, remote, cmd, ret, err):
        # The previous message had a stray, unbalanced quote right after
        # the return code ("... return code 1': <err>"); it is dropped here.
        msg = (
            "{remote} command '{cmd}' finished with non-zero return code"
            " {ret}: {err}"
        ).format(remote=remote, cmd=cmd, ret=ret, err=err)
        super(RemoteCmdError, self).__init__(msg)
42
43
class RemoteActionNotImplemented(DvcException):
    """Error for an action that a remote of the given scheme lacks.

    Args:
        action: name of the unsupported action (e.g. "download").
        scheme: scheme of the remote that does not support it.
    """

    def __init__(self, action, scheme):
        super(RemoteActionNotImplemented, self).__init__(
            "{} is not supported by {} remote".format(action, scheme)
        )
48
49
class RemoteMissingDepsError(DvcException):
    """Error for a remote whose required dependencies are missing."""
52
53
class RemoteBase(object):
    """Base class holding the behaviour shared by the remotes.

    Subclasses override the class attributes below and implement the
    methods that raise ``NotImplementedError`` or
    ``RemoteActionNotImplemented`` here.

    Several methods read ``self.url``, ``self.prefix``, ``self.path_info``
    and ``self.PARAM_CHECKSUM``. None of them is defined in this class, so
    subclasses have to provide them.
    """

    # Scheme of the remote; compared against path_info["scheme"] and used
    # in error messages.
    scheme = None
    # Pattern handed to re.match() by match().
    REGEX = None
    # Maps a dependency name to a value that is falsy when it is missing.
    REQUIRES = {}
    # Not used inside this class.
    # NOTE(review): presumably the default number of parallel jobs - confirm.
    JOBS = 4 * cpu_count()

    def __init__(self, repo, config):
        """Store the repo and check that required dependencies are present.

        Args:
            repo: repo object this remote belongs to.
            config: remote config; only read (for the url) when building
                the missing-dependencies message.

        Raises:
            RemoteMissingDepsError: if any value in ``REQUIRES`` is falsy.
        """
        self.repo = repo

        if all(self.REQUIRES.values()):
            return

        # Use the same truthiness test as the check above, so that every
        # dependency that made the check fail is listed (previously only
        # `None` values were, which could produce an empty list). Sorted
        # to keep the message deterministic.
        missing = sorted(k for k, v in self.REQUIRES.items() if not v)
        url = config.get(
            Config.SECTION_REMOTE_URL, "{}://".format(self.scheme)
        )
        msg = (
            "URL '{}' is supported but requires these missing "
            "dependencies: {}. If you have installed dvc using pip, "
            "choose one of these options to proceed: \n"
            "\n"
            "    1) Install specific missing dependencies:\n"
            "        pip install {}\n"
            "    2) Install dvc package that includes those missing "
            "dependencies: \n"
            "        pip install dvc[{}]\n"
            "    3) Install dvc package with all possible "
            "dependencies included: \n"
            "        pip install dvc[all]\n"
            "\n"
            "If you have installed dvc from a binary package and you "
            "are still seeing this message, please report it to us "
            "using https://github.com/iterative/dvc/issues. Thank you!"
        ).format(url, missing, " ".join(missing), self.scheme)
        raise RemoteMissingDepsError(msg)

    def __repr__(self):
        """Return "<class name>: '<url>'", or 'No url' when there is none."""
        # `url` is not set by this class, so don't assume it exists.
        url = getattr(self, "url", None)
        return "{class_name}: '{url}'".format(
            class_name=type(self).__name__, url=(url or "No url")
        )

    @staticmethod
    def compat_config(config):
        """Return ``config.copy()``.

        Declared as a static method because it takes no ``self``; without
        the decorator it could not be called on an instance.
        """
        return config.copy()

    @classmethod
    def supported(cls, config):
        """Tell whether the url stored in `config` matches this remote."""
        url = config[Config.SECTION_REMOTE_URL]
        return cls.match(url) is not None

    @classmethod
    def match(cls, url):
        """Match `url` against ``REGEX``; return the match object or None."""
        return re.match(cls.REGEX, url)

    def group(self, name):
        """Return group `name` of ``self.url`` matched against ``REGEX``.

        Returns:
            The matched group, or None if the url doesn't match.
        """
        m = self.match(self.url)
        if not m:
            return None
        return m.group(name)

    def save_info(self, path_info):
        """Return checksum info for `path_info`; must be overridden.

        Callers in this class index the result with ``PARAM_CHECKSUM``.
        """
        raise NotImplementedError

    def changed(self, path_info, checksum_info):
        """Checks if data has changed.

        A file is considered changed if:
            - It doesn't exist on the working directory (was unlinked)
            - Checksum is not computed (saving a new file)
            - There's no valid file for that checksum in the cache
            - The actual checksum is different from the given one

        Args:
            path_info: dict with path information.
            checksum_info: dict with the expected checksum for this data,
                stored under ``PARAM_CHECKSUM``.

        Returns:
            bool: True if data has changed, False otherwise.
        """

        logger.debug(
            "checking if '{}'('{}') has changed.".format(
                path_info, checksum_info
            )
        )

        if not self.exists(path_info):
            logger.debug("'{}' doesn't exist.".format(path_info))
            return True

        checksum = checksum_info.get(self.PARAM_CHECKSUM)
        if checksum is None:
            logger.debug("checksum for '{}' is missing.".format(path_info))
            return True

        if self.changed_cache(checksum):
            logger.debug(
                "cache for '{}'('{}') has changed.".format(path_info, checksum)
            )
            return True

        actual = self.save_info(path_info)[self.PARAM_CHECKSUM]
        if checksum != actual:
            logger.debug(
                "checksum '{}'(actual '{}') for '{}' has changed.".format(
                    checksum, actual, path_info
                )
            )
            return True

        logger.debug("'{}' hasn't changed.".format(path_info))
        return False

    def save(self, path_info, checksum_info):
        """Copy `path_info` into the cache unless it is already there.

        Args:
            path_info: dict with path information; its "scheme" has to be
                the scheme of this remote.
            checksum_info: dict holding the checksum under
                ``PARAM_CHECKSUM``.

        Raises:
            RemoteActionNotImplemented: if the schemes differ.
        """
        if path_info["scheme"] != self.scheme:
            raise RemoteActionNotImplemented(
                "save {} -> {}".format(path_info["scheme"], self.scheme),
                self.scheme,
            )

        checksum = checksum_info[self.PARAM_CHECKSUM]
        if not self.changed_cache(checksum):
            # A valid cache entry already exists, nothing to copy.
            return

        to_info = self.checksum_to_path_info(checksum)

        logger.info("Saving '{}' to '{}'.".format(path_info, to_info))

        self.copy(path_info, to_info)

    def download(
        self,
        from_infos,
        to_infos,
        no_progress_bar=False,
        name=None,
        resume=False,
    ):
        """Not supported by the base remote."""
        raise RemoteActionNotImplemented("download", self.scheme)

    def upload(self, from_infos, to_infos, names=None):
        """Not supported by the base remote."""
        raise RemoteActionNotImplemented("upload", self.scheme)

    def remove(self, path_info):
        """Not supported by the base remote."""
        raise RemoteActionNotImplemented("remove", self.scheme)

    def move(self, path_info):
        """Not supported by the base remote."""
        raise RemoteActionNotImplemented("move", self.scheme)

    def copy(self, from_info, to_info):
        """Not supported by the base remote."""
        raise RemoteActionNotImplemented("copy", self.scheme)

    def exists(self, path_infos):
        """Tell whether the given path exists; must be overridden.

        Despite the plural parameter name, the callers in this class pass
        a single path info dict.
        """
        raise NotImplementedError

    @classmethod
    def _verify_path_args(cls, from_infos, to_infos, names=None):
        """Sanity-check transfer arguments and normalise `names`.

        Args:
            from_infos: list of source path infos.
            to_infos: list of destination path infos, of the same length.
            names: optional list with one name per transfer.

        Returns:
            list: `names` itself, or a list of ``None`` of matching length
            when `names` is empty or not given.
        """
        assert isinstance(from_infos, list)
        assert isinstance(to_infos, list)
        assert len(from_infos) == len(to_infos)

        if not names:
            names = len(to_infos) * [None]
        else:
            assert isinstance(names, list)
            assert len(names) == len(to_infos)

        return names

    @property
    def ospath(self):
        """Path module used to build cache paths (``posixpath`` here)."""
        return posixpath

    def checksum_to_path(self, checksum):
        """Return ``<prefix>/<first 2 chars>/<remaining chars>``."""
        return self.ospath.join(self.prefix, checksum[0:2], checksum[2:])

    def path_to_checksum(self, path):
        """Inverse of `checksum_to_path`: rebuild a checksum from `path`."""
        relpath = self.ospath.relpath(path, self.prefix)
        return self.ospath.dirname(relpath) + self.ospath.basename(relpath)

    def checksum_to_path_info(self, checksum):
        """Return a copy of ``self.path_info`` pointing at the cache path."""
        path_info = self.path_info.copy()
        path_info["path"] = self.checksum_to_path(checksum)
        return path_info

    def md5s_to_path_infos(self, md5s):
        """Map every checksum in `md5s` to its cache path info."""
        return [self.checksum_to_path_info(md5) for md5 in md5s]

    def list_cache_paths(self):
        """Return an iterable of all cache paths; must be overridden."""
        raise NotImplementedError

    def all(self):
        """Lazily yield the checksum of every path in the cache."""
        # NOTE: The list might be way too big(e.g. 100M entries, md5 for each
        # is 32 bytes, so ~3200Mb list) and we don't really need all of it at
        # the same time, so it makes sense to use a generator to gradually
        # iterate over it, without keeping all of it in memory.
        return (
            self.path_to_checksum(path) for path in self.list_cache_paths()
        )

    def gc(self, cinfos):
        """Remove every cache entry that is not referenced in `cinfos`.

        Args:
            cinfos: dict mapping "local" (and this remote's scheme) to a
                list of checksum info dicts that are still in use.

        Returns:
            bool: True if at least one entry was removed.
        """
        from dvc.remote.local import RemoteLOCAL

        # A set keeps the per-checksum membership test below O(1).
        used = {info[RemoteLOCAL.PARAM_CHECKSUM] for info in cinfos["local"]}

        if self.scheme != "":
            used.update(
                info[self.PARAM_CHECKSUM] for info in cinfos[self.scheme]
            )

        removed = False
        for checksum in self.all():
            if checksum in used:
                continue
            path_info = self.checksum_to_path_info(checksum)
            self.remove(path_info)
            removed = True
        return removed

    def changed_cache(self, checksum):
        """Tell whether the cache entry for `checksum` is missing or invalid.

        An existing entry whose actual checksum info differs from the
        expected one is reported as corrupted and removed.

        Returns:
            bool: True if there is no valid cache entry, False otherwise.
        """
        cache = self.checksum_to_path_info(checksum)
        expected = {self.PARAM_CHECKSUM: checksum}

        if not self.exists(cache):
            return True

        actual = self.save_info(cache)

        logger.debug(
            "Cache '{}' actual '{}'.".format(str(expected), str(actual))
        )

        if expected != actual:
            # Existence is checked again right before removing the entry.
            if self.exists(cache):
                msg = "corrupted cache file {}"
                logger.warning(msg.format(str(cache)))
                self.remove(cache)
            return True

        return False

    def cache_exists(self, checksums):
        """Return the given checksums that are present in the cache.

        Args:
            checksums: iterable of checksums to look for.

        Returns:
            list: matching checksums, in the order `all()` yields them.
        """
        # NOTE: The reason for such an odd logic is that most of the remotes
        # take much shorter time to just retrieve everything they have under
        # a certain prefix(e.g. s3, gs, ssh, hdfs). Other remotes that can
        # check if particular file exists much quicker, use their own
        # implementation of cache_exists(see http, local).
        #
        # Result of all() might be way too big, so we should walk through
        # it in one pass.
        #
        # A set gives O(1) lookups and also makes one-shot iterators safe
        # to pass in (`in` would otherwise consume them).
        wanted = set(checksums)
        return [checksum for checksum in self.all() if checksum in wanted]

    def already_cached(self, path_info):
        """Tell whether the data at `path_info` has a valid cache entry."""
        current = self.save_info(path_info)[self.PARAM_CHECKSUM]

        if not current:
            return False

        return not self.changed_cache(current)

    def safe_remove(self, path_info, force=False):
        """Remove `path_info`, asking first if its data isn't cached.

        Args:
            path_info: dict with path information.
            force: skip the confirmation prompt.

        Raises:
            ConfirmRemoveError: if the user declines the removal.
        """
        if not self.exists(path_info):
            return

        if not force and not self.already_cached(path_info):
            msg = (
                "file '{}' is going to be removed."
                " Are you sure you want to proceed?".format(str(path_info))
            )

            if not prompt.confirm(msg):
                raise ConfirmRemoveError(str(path_info))

        self.remove(path_info)

    def do_checkout(self, path_info, checksum, force=False):
        """Replace `path_info` with a copy of the cache entry for `checksum`.

        Existing data is removed through `safe_remove` first.
        """
        if self.exists(path_info):
            msg = "data '{}' exists. Removing before checkout."
            logger.warning(msg.format(str(path_info)))
            self.safe_remove(path_info, force=force)

        from_info = self.checksum_to_path_info(checksum)
        self.copy(from_info, path_info)

    def checkout(self, path_info, checksum_info, force=False):
        """Check out the data described by `checksum_info` to `path_info`.

        Nothing is done when there is no checksum or the data didn't
        change. When the cache entry is missing or invalid, the existing
        data is removed (via `safe_remove`) and no file is created.

        Args:
            path_info: dict with path information.
            checksum_info: dict holding the checksum under
                ``PARAM_CHECKSUM``.
            force: passed on to `safe_remove` to skip confirmation.

        Raises:
            NotImplementedError: if the scheme of `path_info` is neither
                "", "local" nor the scheme of this remote.
        """
        scheme = path_info["scheme"]
        if scheme not in ["", "local"] and scheme != self.scheme:
            raise NotImplementedError

        checksum = checksum_info.get(self.PARAM_CHECKSUM)
        if not checksum:
            msg = "No checksum info for '{}'."
            logger.info(msg.format(str(path_info)))
            return

        if not self.changed(path_info, checksum_info):
            msg = "Data '{}' didn't change."
            logger.info(msg.format(str(path_info)))
            return

        if self.changed_cache(checksum):
            msg = "Cache '{}' not found. File '{}' won't be created."
            logger.warning(msg.format(checksum, str(path_info)))
            self.safe_remove(path_info, force=force)
            return

        msg = "Checking out '{}' with cache '{}'."
        logger.info(msg.format(str(path_info), checksum))

        self.do_checkout(path_info, checksum, force=force)
359