1from __future__ import unicode_literals 2 3from dvc.utils.compat import str, makedirs 4 5import os 6import stat 7import uuid 8import json 9import ntpath 10import shutil 11import posixpath 12from operator import itemgetter 13 14import dvc.logger as logger 15from dvc.system import System 16from dvc.remote.base import RemoteBase, STATUS_MAP, STATUS_NEW, STATUS_DELETED 17from dvc.utils import remove, move, copyfile, dict_md5, to_chunks, tmp_fname 18from dvc.utils import LARGE_DIR_SIZE 19from dvc.config import Config 20from dvc.exceptions import DvcException 21from dvc.progress import progress 22from concurrent.futures import ThreadPoolExecutor 23 24from dvc.utils.fs import get_mtime_and_size, get_inode 25 26 27class RemoteLOCAL(RemoteBase): 28 scheme = "local" 29 REGEX = r"^(?P<path>.*)$" 30 PARAM_CHECKSUM = "md5" 31 PARAM_PATH = "path" 32 PARAM_RELPATH = "relpath" 33 MD5_DIR_SUFFIX = ".dir" 34 35 CACHE_TYPES = ["reflink", "hardlink", "symlink", "copy"] 36 CACHE_TYPE_MAP = { 37 "copy": shutil.copyfile, 38 "symlink": System.symlink, 39 "hardlink": System.hardlink, 40 "reflink": System.reflink, 41 } 42 43 def __init__(self, repo, config): 44 super(RemoteLOCAL, self).__init__(repo, config) 45 self.state = self.repo.state if self.repo else None 46 self.protected = config.get(Config.SECTION_CACHE_PROTECTED, False) 47 storagepath = config.get(Config.SECTION_AWS_STORAGEPATH, None) 48 self.cache_dir = config.get(Config.SECTION_REMOTE_URL, storagepath) 49 50 if self.cache_dir is not None and not os.path.isabs(self.cache_dir): 51 cwd = config[Config.PRIVATE_CWD] 52 self.cache_dir = os.path.abspath(os.path.join(cwd, self.cache_dir)) 53 54 types = config.get(Config.SECTION_CACHE_TYPE, None) 55 if types: 56 if isinstance(types, str): 57 types = [t.strip() for t in types.split(",")] 58 self.cache_types = types 59 else: 60 self.cache_types = self.CACHE_TYPES 61 62 if self.cache_dir is not None and not os.path.exists(self.cache_dir): 63 os.mkdir(self.cache_dir) 64 65 self.path_info = {"scheme": "local"} 66 67 @staticmethod 68 def compat_config(config): 69 ret = config.copy() 70 url = ret.pop(Config.SECTION_AWS_STORAGEPATH, "") 71 ret[Config.SECTION_REMOTE_URL] = url 72 return ret 73 74 @property 75 def url(self): 76 return self.cache_dir 77 78 @property 79 def prefix(self): 80 return self.cache_dir 81 82 def list_cache_paths(self): 83 clist = [] 84 for entry in os.listdir(self.cache_dir): 85 subdir = os.path.join(self.cache_dir, entry) 86 if not os.path.isdir(subdir): 87 continue 88 89 for cache in os.listdir(subdir): 90 clist.append(os.path.join(subdir, cache)) 91 92 return clist 93 94 def get(self, md5): 95 if not md5: 96 return None 97 98 return os.path.join(self.cache_dir, md5[0:2], md5[2:]) 99 100 def changed_cache_file(self, md5): 101 cache = self.get(md5) 102 if self.state.changed(cache, md5=md5): 103 if os.path.exists(cache): 104 msg = "Corrupted cache file {}." 105 logger.warning(msg.format(os.path.relpath(cache))) 106 remove(cache) 107 return True 108 return False 109 110 def exists(self, path_info): 111 assert not isinstance(path_info, list) 112 assert path_info["scheme"] == "local" 113 return os.path.exists(path_info["path"]) 114 115 def changed_cache(self, md5): 116 cache = self.get(md5) 117 clist = [(cache, md5)] 118 119 while True: 120 if len(clist) == 0: 121 break 122 123 cache, md5 = clist.pop() 124 if self.changed_cache_file(md5): 125 return True 126 127 if self.is_dir_cache(cache) and self._cache_metadata_changed(): 128 for entry in self.load_dir_cache(md5): 129 md5 = entry[self.PARAM_CHECKSUM] 130 cache = self.get(md5) 131 clist.append((cache, md5)) 132 133 return False 134 135 def link(self, cache, path): 136 assert os.path.isfile(cache) 137 138 dname = os.path.dirname(path) 139 if not os.path.exists(dname): 140 os.makedirs(dname) 141 142 # NOTE: just create an empty file for an empty cache 143 if os.path.getsize(cache) == 0: 144 open(path, "w+").close() 145 146 msg = "Created empty file: {} -> {}".format(cache, path) 147 logger.debug(msg) 148 return 149 150 i = len(self.cache_types) 151 while i > 0: 152 try: 153 self.CACHE_TYPE_MAP[self.cache_types[0]](cache, path) 154 155 if self.protected: 156 os.chmod(path, stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH) 157 158 msg = "Created {}'{}': {} -> {}".format( 159 "protected " if self.protected else "", 160 self.cache_types[0], 161 cache, 162 path, 163 ) 164 165 logger.debug(msg) 166 return 167 168 except DvcException as exc: 169 msg = "Cache type '{}' is not supported: {}" 170 logger.debug(msg.format(self.cache_types[0], str(exc))) 171 del self.cache_types[0] 172 i -= 1 173 174 raise DvcException("no possible cache types left to try out.") 175 176 @property 177 def ospath(self): 178 if os.name == "nt": 179 return ntpath 180 return posixpath 181 182 @classmethod 183 def to_ospath(cls, path): 184 if os.name == "nt": 185 return cls.ntpath(path) 186 return cls.unixpath(path) 187 188 @staticmethod 189 def unixpath(path): 190 assert not ntpath.isabs(path) 191 assert not posixpath.isabs(path) 192 return path.replace("\\", "/") 193 194 @staticmethod 195 def ntpath(path): 196 assert not ntpath.isabs(path) 197 assert not posixpath.isabs(path) 198 return path.replace("/", "\\") 199 200 def collect_dir_cache(self, dname): 201 dir_info = [] 202 203 for root, dirs, files in os.walk(str(dname)): 204 bar = False 205 206 if len(files) > LARGE_DIR_SIZE: 207 msg = ( 208 "Computing md5 for a large directory {}. " 209 "This is only done once." 210 ) 211 logger.info(msg.format(os.path.relpath(root))) 212 bar = True 213 title = os.path.relpath(root) 214 processed = 0 215 total = len(files) 216 progress.update_target(title, 0, total) 217 218 for fname in files: 219 path = os.path.join(root, fname) 220 relpath = self.unixpath(os.path.relpath(path, dname)) 221 222 if bar: 223 progress.update_target(title, processed, total) 224 processed += 1 225 226 md5 = self.state.update(path) 227 dir_info.append( 228 {self.PARAM_RELPATH: relpath, self.PARAM_CHECKSUM: md5} 229 ) 230 231 if bar: 232 progress.finish_target(title) 233 234 # NOTE: sorting the list by path to ensure reproducibility 235 dir_info = sorted(dir_info, key=itemgetter(self.PARAM_RELPATH)) 236 237 md5 = dict_md5(dir_info) + self.MD5_DIR_SUFFIX 238 if self.changed_cache_file(md5): 239 self.dump_dir_cache(md5, dir_info) 240 241 return (md5, dir_info) 242 243 def load_dir_cache(self, md5): 244 path = self.get(md5) 245 246 assert self.is_dir_cache(path) 247 248 try: 249 with open(path, "r") as fd: 250 d = json.load(fd) 251 except Exception: 252 msg = "Failed to load dir cache '{}'" 253 logger.error(msg.format(os.path.relpath(path))) 254 return [] 255 256 if not isinstance(d, list): 257 msg = "dir cache file format error '{}' [skipping the file]" 258 logger.error(msg.format(os.path.relpath(path))) 259 return [] 260 261 for info in d: 262 info["relpath"] = self.to_ospath(info["relpath"]) 263 264 return d 265 266 def dump_dir_cache(self, md5, dir_info): 267 path = self.get(md5) 268 dname = os.path.dirname(path) 269 270 assert self.is_dir_cache(path) 271 assert isinstance(dir_info, list) 272 273 if not os.path.isdir(dname): 274 os.makedirs(dname) 275 276 # NOTE: Writing first and renaming after that 277 # to make sure that the operation is atomic. 278 tmp = "{}.{}".format(path, str(uuid.uuid4())) 279 280 with open(tmp, "w+") as fd: 281 json.dump(dir_info, fd, sort_keys=True) 282 move(tmp, path) 283 284 @classmethod 285 def is_dir_cache(cls, cache): 286 return cache.endswith(cls.MD5_DIR_SUFFIX) 287 288 def do_checkout(self, path_info, checksum, force=False): 289 path = path_info["path"] 290 md5 = checksum 291 cache = self.get(md5) 292 293 if not self.is_dir_cache(cache): 294 if os.path.exists(path): 295 self.safe_remove(path_info, force=force) 296 297 self.link(cache, path) 298 self.state.update_link(path) 299 return 300 301 # Create dir separately so that dir is created 302 # even if there are no files in it 303 if not os.path.exists(path): 304 os.makedirs(path) 305 306 dir_info = self.load_dir_cache(md5) 307 dir_relpath = os.path.relpath(path) 308 dir_size = len(dir_info) 309 bar = dir_size > LARGE_DIR_SIZE 310 311 logger.info("Linking directory '{}'.".format(dir_relpath)) 312 313 for processed, entry in enumerate(dir_info): 314 relpath = entry[self.PARAM_RELPATH] 315 m = entry[self.PARAM_CHECKSUM] 316 p = os.path.join(path, relpath) 317 c = self.get(m) 318 319 entry_info = {"scheme": path_info["scheme"], self.PARAM_PATH: p} 320 321 entry_checksum_info = {self.PARAM_CHECKSUM: m} 322 323 if self.changed(entry_info, entry_checksum_info): 324 if os.path.exists(p): 325 self.safe_remove(entry_info, force=force) 326 327 self.link(c, p) 328 329 if bar: 330 progress.update_target(dir_relpath, processed, dir_size) 331 332 self._discard_working_directory_changes(path, dir_info, force=force) 333 334 self.state.update_link(path) 335 336 if bar: 337 progress.finish_target(dir_relpath) 338 339 def already_cached(self, path_info): 340 assert path_info["scheme"] in ["", "local"] 341 342 current_md5 = self.state.update(path_info["path"]) 343 344 if not current_md5: 345 return False 346 347 return not self.changed_cache(current_md5) 348 349 def _discard_working_directory_changes(self, path, dir_info, force=False): 350 working_dir_files = set( 351 os.path.join(root, file) 352 for root, _, files in os.walk(str(path)) 353 for file in files 354 ) 355 356 cached_files = set( 357 os.path.join(path, file["relpath"]) for file in dir_info 358 ) 359 360 delta = working_dir_files - cached_files 361 362 for file in delta: 363 self.safe_remove({"scheme": "local", "path": file}, force=force) 364 365 def _move(self, inp, outp): 366 # moving in two stages to make the whole operation atomic in 367 # case inp and outp are in different filesystems and actual 368 # physical copying of data is happening 369 tmp = "{}.{}".format(outp, str(uuid.uuid4())) 370 move(inp, tmp) 371 move(tmp, outp) 372 373 def _save_file(self, path, md5): 374 assert md5 is not None 375 376 cache = self.get(md5) 377 378 if self.changed_cache(md5): 379 self._move(path, cache) 380 else: 381 remove(path) 382 383 self.link(cache, path) 384 self.state.update_link(path) 385 386 # we need to update path and cache, since in case of reflink, 387 # or copy cache type moving original file results in updates on 388 # next executed command, which causes md5 recalculation 389 self.state.update(path, md5) 390 self.state.update(cache, md5) 391 392 return {self.PARAM_CHECKSUM: md5} 393 394 def _save_dir(self, path, md5): 395 dir_info = self.load_dir_cache(md5) 396 dir_relpath = os.path.relpath(path) 397 dir_size = len(dir_info) 398 bar = dir_size > LARGE_DIR_SIZE 399 400 logger.info("Linking directory '{}'.".format(dir_relpath)) 401 402 for processed, entry in enumerate(dir_info): 403 relpath = entry[self.PARAM_RELPATH] 404 m = entry[self.PARAM_CHECKSUM] 405 p = os.path.join(path, relpath) 406 c = self.get(m) 407 408 if self.changed_cache(m): 409 self._move(p, c) 410 else: 411 remove(p) 412 413 self.link(c, p) 414 415 self.state.update(p, m) 416 self.state.update(c, m) 417 418 if bar: 419 progress.update_target(dir_relpath, processed, dir_size) 420 421 self.state.update_link(path) 422 423 cache = self.get(md5) 424 self.state.update(cache) 425 self.state.update(path, md5) 426 427 if bar: 428 progress.finish_target(dir_relpath) 429 430 def save(self, path_info, checksum_info): 431 if path_info["scheme"] != "local": 432 raise NotImplementedError 433 434 path = path_info["path"] 435 436 msg = "Saving '{}' to cache '{}'." 437 logger.info( 438 msg.format(os.path.relpath(path), os.path.relpath(self.cache_dir)) 439 ) 440 441 md5 = checksum_info[self.PARAM_CHECKSUM] 442 if os.path.isdir(path): 443 self._save_dir(path, md5) 444 else: 445 self._save_file(path, md5) 446 447 def save_info(self, path_info): 448 if path_info["scheme"] != "local": 449 raise NotImplementedError 450 451 return {self.PARAM_CHECKSUM: self.state.update(path_info["path"])} 452 453 def remove(self, path_info): 454 if path_info["scheme"] != "local": 455 raise NotImplementedError 456 457 remove(path_info["path"]) 458 459 def move(self, from_info, to_info): 460 if from_info["scheme"] != "local" or to_info["scheme"] != "local": 461 raise NotImplementedError 462 463 move(from_info["path"], to_info["path"]) 464 465 def cache_exists(self, md5s): 466 assert isinstance(md5s, list) 467 return list(filter(lambda md5: not self.changed_cache_file(md5), md5s)) 468 469 def upload(self, from_infos, to_infos, names=None): 470 names = self._verify_path_args(to_infos, from_infos, names) 471 472 for from_info, to_info, name in zip(from_infos, to_infos, names): 473 if to_info["scheme"] != "local": 474 raise NotImplementedError 475 476 if from_info["scheme"] != "local": 477 raise NotImplementedError 478 479 logger.debug( 480 "Uploading '{}' to '{}'".format( 481 from_info["path"], to_info["path"] 482 ) 483 ) 484 485 if not name: 486 name = os.path.basename(from_info["path"]) 487 488 makedirs(os.path.dirname(to_info["path"]), exist_ok=True) 489 490 try: 491 copyfile(from_info["path"], to_info["path"], name=name) 492 except Exception: 493 logger.error( 494 "failed to upload '{}' to '{}'".format( 495 from_info["path"], to_info["path"] 496 ) 497 ) 498 499 def download( 500 self, 501 from_infos, 502 to_infos, 503 no_progress_bar=False, 504 names=None, 505 resume=False, 506 ): 507 names = self._verify_path_args(from_infos, to_infos, names) 508 509 for to_info, from_info, name in zip(to_infos, from_infos, names): 510 if from_info["scheme"] != "local": 511 raise NotImplementedError 512 513 if to_info["scheme"] != "local": 514 raise NotImplementedError 515 516 logger.debug( 517 "Downloading '{}' to '{}'".format( 518 from_info["path"], to_info["path"] 519 ) 520 ) 521 522 if not name: 523 name = os.path.basename(to_info["path"]) 524 525 makedirs(os.path.dirname(to_info["path"]), exist_ok=True) 526 tmp_file = tmp_fname(to_info["path"]) 527 try: 528 copyfile( 529 from_info["path"], 530 tmp_file, 531 no_progress_bar=no_progress_bar, 532 name=name, 533 ) 534 except Exception: 535 logger.error( 536 "failed to download '{}' to '{}'".format( 537 from_info["path"], to_info["path"] 538 ) 539 ) 540 541 continue 542 543 os.rename(tmp_file, to_info["path"]) 544 545 def _group(self, checksum_infos, show_checksums=False): 546 by_md5 = {} 547 548 for info in checksum_infos: 549 md5 = info[self.PARAM_CHECKSUM] 550 551 if show_checksums: 552 by_md5[md5] = {"name": md5} 553 continue 554 555 name = info[self.PARAM_PATH] 556 branch = info.get("branch") 557 if branch: 558 name += "({})".format(branch) 559 560 if md5 not in by_md5.keys(): 561 by_md5[md5] = {"name": name} 562 else: 563 by_md5[md5]["name"] += " " + name 564 565 return by_md5 566 567 def status(self, checksum_infos, remote, jobs=None, show_checksums=False): 568 logger.info("Preparing to collect status from {}".format(remote.url)) 569 title = "Collecting information" 570 571 ret = {} 572 573 progress.set_n_total(1) 574 progress.update_target(title, 0, 100) 575 576 progress.update_target(title, 10, 100) 577 578 ret = self._group(checksum_infos, show_checksums=show_checksums) 579 md5s = list(ret.keys()) 580 581 progress.update_target(title, 30, 100) 582 583 remote_exists = list(remote.cache_exists(md5s)) 584 585 progress.update_target(title, 90, 100) 586 587 local_exists = self.cache_exists(md5s) 588 589 progress.finish_target(title) 590 591 for md5, info in ret.items(): 592 info["status"] = STATUS_MAP[ 593 (md5 in local_exists, md5 in remote_exists) 594 ] 595 596 return ret 597 598 def _get_chunks(self, download, remote, status_info, status, jobs): 599 title = "Analysing status." 600 601 progress.set_n_total(1) 602 total = len(status_info) 603 current = 0 604 605 cache = [] 606 path_infos = [] 607 names = [] 608 for md5, info in status_info.items(): 609 if info["status"] == status: 610 cache.append(self.checksum_to_path_info(md5)) 611 path_infos.append(remote.checksum_to_path_info(md5)) 612 names.append(info["name"]) 613 current += 1 614 progress.update_target(title, current, total) 615 616 progress.finish_target(title) 617 618 progress.set_n_total(len(names)) 619 620 if download: 621 to_infos = cache 622 from_infos = path_infos 623 else: 624 to_infos = path_infos 625 from_infos = cache 626 627 return list( 628 zip( 629 to_chunks(from_infos, jobs), 630 to_chunks(to_infos, jobs), 631 to_chunks(names, jobs), 632 ) 633 ) 634 635 def _process( 636 self, 637 checksum_infos, 638 remote, 639 jobs=None, 640 show_checksums=False, 641 download=False, 642 ): 643 msg = "Preparing to {} data {} '{}'" 644 logger.info( 645 msg.format( 646 "download" if download else "upload", 647 "from" if download else "to", 648 remote.url, 649 ) 650 ) 651 652 if download: 653 func = remote.download 654 status = STATUS_DELETED 655 else: 656 func = remote.upload 657 status = STATUS_NEW 658 659 if jobs is None: 660 jobs = remote.JOBS 661 662 status_info = self.status( 663 checksum_infos, remote, jobs=jobs, show_checksums=show_checksums 664 ) 665 666 chunks = self._get_chunks(download, remote, status_info, status, jobs) 667 668 if len(chunks) == 0: 669 return 0 670 671 futures = [] 672 with ThreadPoolExecutor(max_workers=jobs) as executor: 673 for from_infos, to_infos, names in chunks: 674 res = executor.submit(func, from_infos, to_infos, names=names) 675 futures.append(res) 676 677 for f in futures: 678 f.result() 679 680 return len(chunks) 681 682 def push(self, checksum_infos, remote, jobs=None, show_checksums=False): 683 return self._process( 684 checksum_infos, 685 remote, 686 jobs=jobs, 687 show_checksums=show_checksums, 688 download=False, 689 ) 690 691 def pull(self, checksum_infos, remote, jobs=None, show_checksums=False): 692 return self._process( 693 checksum_infos, 694 remote, 695 jobs=jobs, 696 show_checksums=show_checksums, 697 download=True, 698 ) 699 700 def _cache_metadata_changed(self): 701 mtime, size = get_mtime_and_size(self.cache_dir) 702 inode = get_inode(self.cache_dir) 703 704 existing_record = self.state.get_state_record_for_inode(inode) 705 706 if existing_record: 707 cached_mtime, cached_size, _, _ = existing_record 708 return not (mtime == cached_mtime and size == cached_size) 709 710 return True 711