from __future__ import unicode_literals

from dvc.utils.compat import str

import re
import posixpath
from multiprocessing import cpu_count

import dvc.prompt as prompt
import dvc.logger as logger
from dvc.config import Config
from dvc.exceptions import DvcException, ConfirmRemoveError


STATUS_OK = 1
STATUS_NEW = 3
STATUS_DELETED = 4


STATUS_MAP = {
    # (local_exists, remote_exists)
    (True, True): STATUS_OK,
    (False, False): STATUS_OK,
    (True, False): STATUS_NEW,
    (False, True): STATUS_DELETED,
}


class DataCloudError(DvcException):
    """ Data Cloud exception """

    def __init__(self, msg):
        super(DataCloudError, self).__init__("Data sync error: {}".format(msg))


class RemoteCmdError(DvcException):
    """Raised to report a remote command that returned a non-zero code."""

    def __init__(self, remote, cmd, ret, err):
        # NOTE: the original message carried a stray, unbalanced quote after
        # the return code; it is removed here.
        super(RemoteCmdError, self).__init__(
            "{remote} command '{cmd}' finished with non-zero return code"
            " {ret}: {err}".format(remote=remote, cmd=cmd, ret=ret, err=err)
        )


class RemoteActionNotImplemented(DvcException):
    """Raised when an action is not supported by a remote scheme."""

    def __init__(self, action, scheme):
        m = "{} is not supported by {} remote".format(action, scheme)
        super(RemoteActionNotImplemented, self).__init__(m)


class RemoteMissingDepsError(DvcException):
    """Raised when a remote's entries in REQUIRES are not all available."""


class RemoteBase(object):
    """Base class with the logic shared by remote implementations.

    The methods below read `self.url`, `self.prefix`, `self.path_info` and
    `self.PARAM_CHECKSUM`, none of which are defined in this class;
    presumably subclasses provide them (TODO confirm against subclasses).
    """

    scheme = None
    REGEX = None
    # Maps a dependency name to a value that is falsy when the dependency
    # is unavailable (see __init__).
    REQUIRES = {}
    JOBS = 4 * cpu_count()

    def __init__(self, repo, config):
        """Store `repo` and verify that all entries of REQUIRES are truthy.

        Raises:
            RemoteMissingDepsError: if any value in REQUIRES is falsy.
        """
        self.repo = repo
        deps_ok = all(self.REQUIRES.values())
        if not deps_ok:
            # Use the same truthiness test as `deps_ok` so that the list of
            # missing dependencies can never come out empty here.
            missing = [k for k, v in self.REQUIRES.items() if not v]
            url = config.get(
                Config.SECTION_REMOTE_URL, "{}://".format(self.scheme)
            )
            msg = (
                "URL '{}' is supported but requires these missing "
                "dependencies: {}. If you have installed dvc using pip, "
                "choose one of these options to proceed: \n"
                "\n"
                " 1) Install specific missing dependencies:\n"
                " pip install {}\n"
                " 2) Install dvc package that includes those missing "
                "dependencies: \n"
                " pip install dvc[{}]\n"
                " 3) Install dvc package with all possible "
                "dependencies included: \n"
                " pip install dvc[all]\n"
                "\n"
                "If you have installed dvc from a binary package and you "
                "are still seeing this message, please report it to us "
                "using https://github.com/iterative/dvc/issues. Thank you!"
            ).format(url, missing, " ".join(missing), self.scheme)
            raise RemoteMissingDepsError(msg)

    def __repr__(self):
        # `url` is not set by this class; fall back instead of raising
        # AttributeError from inside repr().
        return "{class_name}: '{url}'".format(
            class_name=type(self).__name__,
            url=(getattr(self, "url", None) or "No url"),
        )

    @staticmethod
    def compat_config(config):
        """Return a copy of `config`.

        Declared as a staticmethod because the function takes no `self`;
        without it, calling it on an instance raises TypeError.
        """
        return config.copy()

    @classmethod
    def supported(cls, config):
        """Return True if the remote URL in `config` matches `cls.REGEX`."""
        url = config[Config.SECTION_REMOTE_URL]
        return cls.match(url) is not None

    @classmethod
    def match(cls, url):
        """Return the result of `re.match(cls.REGEX, url)`."""
        return re.match(cls.REGEX, url)

    def group(self, name):
        """Return group `name` of `self.url` matched against REGEX.

        Returns None when the url does not match.
        """
        m = self.match(self.url)
        if not m:
            return None
        return m.group(name)

    def save_info(self, path_info):
        """Not implemented here.

        The callers in this class index or compare the result using
        `self.PARAM_CHECKSUM`, so implementations presumably return a dict
        keyed by it (TODO confirm).
        """
        raise NotImplementedError

    def changed(self, path_info, checksum_info):
        """Checks if data has changed.

        A file is considered changed if:
            - It doesn't exist (`self.exists(path_info)` is false)
            - Checksum is not present in `checksum_info`
            - There's no valid cache entry for the checksum
            - The checksum reported by `save_info` is different from the
              given one

        Args:
            path_info: dict with path information.
            checksum_info: dict expected to hold the checksum for this data
                under `self.PARAM_CHECKSUM`.

        Returns:
            bool: True if data has changed, False otherwise.
        """

        logger.debug(
            "checking if '{}'('{}') has changed.".format(
                path_info, checksum_info
            )
        )

        if not self.exists(path_info):
            logger.debug("'{}' doesn't exist.".format(path_info))
            return True

        checksum = checksum_info.get(self.PARAM_CHECKSUM)
        if checksum is None:
            logger.debug("checksum for '{}' is missing.".format(path_info))
            return True

        if self.changed_cache(checksum):
            logger.debug(
                "cache for '{}'('{}') has changed.".format(path_info, checksum)
            )
            return True

        actual = self.save_info(path_info)[self.PARAM_CHECKSUM]
        if checksum != actual:
            logger.debug(
                "checksum '{}'(actual '{}') for '{}' has changed.".format(
                    checksum, actual, path_info
                )
            )
            return True

        logger.debug("'{}' hasn't changed.".format(path_info))
        return False

    def save(self, path_info, checksum_info):
        """Copy `path_info` into the cache unless the cache entry is valid.

        Raises:
            RemoteActionNotImplemented: if `path_info["scheme"]` differs
                from `self.scheme`.
        """
        if path_info["scheme"] != self.scheme:
            raise RemoteActionNotImplemented(
                "save {} -> {}".format(path_info["scheme"], self.scheme),
                self.scheme,
            )

        checksum = checksum_info[self.PARAM_CHECKSUM]
        if not self.changed_cache(checksum):
            return

        to_info = self.checksum_to_path_info(checksum)

        logger.info("Saving '{}' to '{}'.".format(path_info, to_info))

        self.copy(path_info, to_info)

    def download(
        self,
        from_infos,
        to_infos,
        no_progress_bar=False,
        name=None,
        resume=False,
    ):
        """Not supported by the base class."""
        raise RemoteActionNotImplemented("download", self.scheme)

    def upload(self, from_infos, to_infos, names=None):
        """Not supported by the base class."""
        raise RemoteActionNotImplemented("upload", self.scheme)

    def remove(self, path_info):
        """Not supported by the base class."""
        raise RemoteActionNotImplemented("remove", self.scheme)

    def move(self, path_info):
        """Not supported by the base class."""
        raise RemoteActionNotImplemented("move", self.scheme)

    def copy(self, from_info, to_info):
        """Not supported by the base class."""
        raise RemoteActionNotImplemented("copy", self.scheme)

    def exists(self, path_infos):
        """Not implemented here; used in this class as a truthy check."""
        raise NotImplementedError

    @classmethod
    def _verify_path_args(cls, from_infos, to_infos, names=None):
        """Check that the arguments are lists of equal length.

        Returns:
            list: `names`, or a list of None as long as `to_infos` when
            `names` is empty or None.
        """
        assert isinstance(from_infos, list)
        assert isinstance(to_infos, list)
        assert len(from_infos) == len(to_infos)

        if not names:
            names = len(to_infos) * [None]
        else:
            assert isinstance(names, list)
            assert len(names) == len(to_infos)

        return names

    @property
    def ospath(self):
        """Path module used to build cache paths (posixpath here)."""
        return posixpath

    def checksum_to_path(self, checksum):
        """Return `<prefix>/<first two chars>/<rest>` for `checksum`."""
        return self.ospath.join(self.prefix, checksum[0:2], checksum[2:])

    def path_to_checksum(self, path):
        """Inverse of `checksum_to_path`: join dirname and basename of the
        path relative to `self.prefix`.
        """
        relpath = self.ospath.relpath(path, self.prefix)
        return self.ospath.dirname(relpath) + self.ospath.basename(relpath)

    def checksum_to_path_info(self, checksum):
        """Return a copy of `self.path_info` pointing at the cache path of
        `checksum`.
        """
        path_info = self.path_info.copy()
        path_info["path"] = self.checksum_to_path(checksum)
        return path_info

    def md5s_to_path_infos(self, md5s):
        """Map each item of `md5s` through `checksum_to_path_info`."""
        return [self.checksum_to_path_info(md5) for md5 in md5s]

    def list_cache_paths(self):
        """Not implemented here; `all()` iterates over its result."""
        raise NotImplementedError

    def all(self):
        """Return a generator over the checksums of all cache paths."""
        # NOTE: The list might be way too big(e.g. 100M entries, md5 for each
        # is 32 bytes, so ~3200Mb list) and we don't really need all of it at
        # the same time, so it makes sense to use a generator to gradually
        # iterate over it, without keeping all of it in memory.
        return (
            self.path_to_checksum(path) for path in self.list_cache_paths()
        )

    def gc(self, cinfos):
        """Remove every cache entry whose checksum is not listed in `cinfos`.

        Args:
            cinfos: mapping read under the "local" key and, when
                `self.scheme` is not empty, under `self.scheme`; each value
                is iterated for checksum-info dicts.

        Returns:
            bool: True if at least one entry was removed.
        """
        from dvc.remote.local import RemoteLOCAL

        # A set keeps the membership test below O(1) per cache entry; with
        # a list the whole loop was quadratic.
        used = {info[RemoteLOCAL.PARAM_CHECKSUM] for info in cinfos["local"]}

        if self.scheme != "":
            used.update(
                info[self.PARAM_CHECKSUM] for info in cinfos[self.scheme]
            )

        removed = False
        for checksum in self.all():
            if checksum in used:
                continue
            path_info = self.checksum_to_path_info(checksum)
            self.remove(path_info)
            removed = True
        return removed

    def changed_cache(self, checksum):
        """Return True if the cache entry for `checksum` is missing or does
        not match; a mismatching entry is removed.
        """
        cache = self.checksum_to_path_info(checksum)
        expected = {self.PARAM_CHECKSUM: checksum}

        if not self.exists(cache):
            return True

        actual = self.save_info(cache)

        logger.debug(
            "Cache '{}' actual '{}'.".format(str(expected), str(actual))
        )

        if expected != actual:
            # NOTE(review): existence was already checked above; this
            # re-check presumably guards against the entry disappearing in
            # the meantime -- confirm.
            if self.exists(cache):
                msg = "corrupted cache file {}"
                logger.warning(msg.format(str(cache)))
                self.remove(cache)
            return True

        return False

    def cache_exists(self, checksums):
        """Return the checksums from `self.all()` that are in `checksums`."""
        # NOTE: The reason for such an odd logic is that most of the remotes
        # take much shorter time to just retrieve everything they have under
        # a certain prefix(e.g. s3, gs, ssh, hdfs). Other remotes that can
        # check if particular file exists much quicker, use their own
        # implementation of cache_exists(see http, local).
        #
        # Result of all() might be way too big, so we should walk through
        # it in one pass.
        #
        # Materialize the wanted checksums once: membership is then O(1)
        # and a one-shot iterable is not silently consumed by `in`.
        wanted = set(checksums)
        return [checksum for checksum in self.all() if checksum in wanted]

    def already_cached(self, path_info):
        """Return True if `path_info` has a checksum with a valid cache
        entry.
        """
        current = self.save_info(path_info)[self.PARAM_CHECKSUM]

        if not current:
            return False

        return not self.changed_cache(current)

    def safe_remove(self, path_info, force=False):
        """Remove `path_info`, asking for confirmation if it is not cached.

        Raises:
            ConfirmRemoveError: if the confirmation prompt is declined.
        """
        if not self.exists(path_info):
            return

        if not force and not self.already_cached(path_info):
            msg = (
                "file '{}' is going to be removed."
                " Are you sure you want to proceed?".format(str(path_info))
            )

            if not prompt.confirm(msg):
                raise ConfirmRemoveError(str(path_info))

        self.remove(path_info)

    def do_checkout(self, path_info, checksum, force=False):
        """Replace `path_info` with a copy of the cache entry for
        `checksum`.
        """
        if self.exists(path_info):
            msg = "data '{}' exists. Removing before checkout."
            logger.warning(msg.format(str(path_info)))
            self.safe_remove(path_info, force=force)

        from_info = self.checksum_to_path_info(checksum)
        self.copy(from_info, path_info)

    def checkout(self, path_info, checksum_info, force=False):
        """Check out `path_info` from the cache when it has changed.

        Does nothing (besides logging) when there is no checksum or the
        data did not change; when the cache entry is missing or invalid,
        `path_info` is removed via `safe_remove` and not recreated.

        Raises:
            NotImplementedError: if the scheme of `path_info` is neither
                "", "local" nor `self.scheme`.
        """
        scheme = path_info["scheme"]
        if scheme not in ["", "local"] and scheme != self.scheme:
            raise NotImplementedError

        checksum = checksum_info.get(self.PARAM_CHECKSUM)
        if not checksum:
            msg = "No checksum info for '{}'."
            logger.info(msg.format(str(path_info)))
            return

        if not self.changed(path_info, checksum_info):
            msg = "Data '{}' didn't change."
            logger.info(msg.format(str(path_info)))
            return

        if self.changed_cache(checksum):
            msg = "Cache '{}' not found. File '{}' won't be created."
            logger.warning(msg.format(checksum, str(path_info)))
            self.safe_remove(path_info, force=force)
            return

        msg = "Checking out '{}' with cache '{}'."
        logger.info(msg.format(str(path_info), checksum))

        self.do_checkout(path_info, checksum, force=force)