1// Package sync is the implementation of sync/copy/move 2package sync 3 4import ( 5 "context" 6 "fmt" 7 "path" 8 "sort" 9 "strings" 10 "sync" 11 "time" 12 13 "github.com/pkg/errors" 14 "github.com/rclone/rclone/fs" 15 "github.com/rclone/rclone/fs/accounting" 16 "github.com/rclone/rclone/fs/filter" 17 "github.com/rclone/rclone/fs/fserrors" 18 "github.com/rclone/rclone/fs/hash" 19 "github.com/rclone/rclone/fs/march" 20 "github.com/rclone/rclone/fs/operations" 21) 22 23type syncCopyMove struct { 24 // parameters 25 fdst fs.Fs 26 fsrc fs.Fs 27 deleteMode fs.DeleteMode // how we are doing deletions 28 DoMove bool 29 copyEmptySrcDirs bool 30 deleteEmptySrcDirs bool 31 dir string 32 // internal state 33 ci *fs.ConfigInfo // global config 34 fi *filter.Filter // filter config 35 ctx context.Context // internal context for controlling go-routines 36 cancel func() // cancel the context 37 inCtx context.Context // internal context for controlling march 38 inCancel func() // cancel the march context 39 noTraverse bool // if set don't traverse the dst 40 noCheckDest bool // if set transfer all objects regardless without checking dst 41 noUnicodeNormalization bool // don't normalize unicode characters in filenames 42 deletersWg sync.WaitGroup // for delete before go routine 43 deleteFilesCh chan fs.Object // channel to receive deletes if delete before 44 trackRenames bool // set if we should do server-side renames 45 trackRenamesStrategy trackRenamesStrategy // strategies used for tracking renames 46 dstFilesMu sync.Mutex // protect dstFiles 47 dstFiles map[string]fs.Object // dst files, always filled 48 srcFiles map[string]fs.Object // src files, only used if deleteBefore 49 srcFilesChan chan fs.Object // passes src objects 50 srcFilesResult chan error // error result of src listing 51 dstFilesResult chan error // error result of dst listing 52 dstEmptyDirsMu sync.Mutex // protect dstEmptyDirs 53 dstEmptyDirs map[string]fs.DirEntry // potentially empty directories 54 srcEmptyDirsMu sync.Mutex // protect srcEmptyDirs 55 srcEmptyDirs map[string]fs.DirEntry // potentially empty directories 56 checkerWg sync.WaitGroup // wait for checkers 57 toBeChecked *pipe // checkers channel 58 transfersWg sync.WaitGroup // wait for transfers 59 toBeUploaded *pipe // copiers channel 60 errorMu sync.Mutex // Mutex covering the errors variables 61 err error // normal error from copy process 62 noRetryErr error // error with NoRetry set 63 fatalErr error // fatal error 64 commonHash hash.Type // common hash type between src and dst 65 modifyWindow time.Duration // modify window between fsrc, fdst 66 renameMapMu sync.Mutex // mutex to protect the below 67 renameMap map[string][]fs.Object // dst files by hash - only used by trackRenames 68 renamerWg sync.WaitGroup // wait for renamers 69 toBeRenamed *pipe // renamers channel 70 trackRenamesWg sync.WaitGroup // wg for background track renames 71 trackRenamesCh chan fs.Object // objects are pumped in here 72 renameCheck []fs.Object // accumulate files to check for rename here 73 compareCopyDest []fs.Fs // place to check for files to server side copy 74 backupDir fs.Fs // place to store overwrites/deletes 75 checkFirst bool // if set run all the checkers before starting transfers 76} 77 78type trackRenamesStrategy byte 79 80const ( 81 trackRenamesStrategyHash trackRenamesStrategy = 1 << iota 82 trackRenamesStrategyModtime 83 trackRenamesStrategyLeaf 84) 85 86func (strategy trackRenamesStrategy) hash() bool { 87 return (strategy & trackRenamesStrategyHash) != 0 88} 89 90func (strategy trackRenamesStrategy) modTime() bool { 91 return (strategy & trackRenamesStrategyModtime) != 0 92} 93 94func (strategy trackRenamesStrategy) leaf() bool { 95 return (strategy & trackRenamesStrategyLeaf) != 0 96} 97 98func newSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, deleteEmptySrcDirs bool, copyEmptySrcDirs bool) (*syncCopyMove, error) { 99 if (deleteMode != fs.DeleteModeOff || DoMove) && operations.Overlapping(fdst, fsrc) { 100 return nil, fserrors.FatalError(fs.ErrorOverlapping) 101 } 102 ci := fs.GetConfig(ctx) 103 fi := filter.GetConfig(ctx) 104 s := &syncCopyMove{ 105 ci: ci, 106 fi: fi, 107 fdst: fdst, 108 fsrc: fsrc, 109 deleteMode: deleteMode, 110 DoMove: DoMove, 111 copyEmptySrcDirs: copyEmptySrcDirs, 112 deleteEmptySrcDirs: deleteEmptySrcDirs, 113 dir: "", 114 srcFilesChan: make(chan fs.Object, ci.Checkers+ci.Transfers), 115 srcFilesResult: make(chan error, 1), 116 dstFilesResult: make(chan error, 1), 117 dstEmptyDirs: make(map[string]fs.DirEntry), 118 srcEmptyDirs: make(map[string]fs.DirEntry), 119 noTraverse: ci.NoTraverse, 120 noCheckDest: ci.NoCheckDest, 121 noUnicodeNormalization: ci.NoUnicodeNormalization, 122 deleteFilesCh: make(chan fs.Object, ci.Checkers), 123 trackRenames: ci.TrackRenames, 124 commonHash: fsrc.Hashes().Overlap(fdst.Hashes()).GetOne(), 125 modifyWindow: fs.GetModifyWindow(ctx, fsrc, fdst), 126 trackRenamesCh: make(chan fs.Object, ci.Checkers), 127 checkFirst: ci.CheckFirst, 128 } 129 backlog := ci.MaxBacklog 130 if s.checkFirst { 131 fs.Infof(s.fdst, "Running all checks before starting transfers") 132 backlog = -1 133 } 134 var err error 135 s.toBeChecked, err = newPipe(ci.OrderBy, accounting.Stats(ctx).SetCheckQueue, backlog) 136 if err != nil { 137 return nil, err 138 } 139 s.toBeUploaded, err = newPipe(ci.OrderBy, accounting.Stats(ctx).SetTransferQueue, backlog) 140 if err != nil { 141 return nil, err 142 } 143 s.toBeRenamed, err = newPipe(ci.OrderBy, accounting.Stats(ctx).SetRenameQueue, backlog) 144 if err != nil { 145 return nil, err 146 } 147 // If a max session duration has been defined add a deadline to the context 148 if ci.MaxDuration > 0 { 149 endTime := time.Now().Add(ci.MaxDuration) 150 fs.Infof(s.fdst, "Transfer session deadline: %s", endTime.Format("2006/01/02 15:04:05")) 151 s.ctx, s.cancel = context.WithDeadline(ctx, endTime) 152 } else { 153 s.ctx, s.cancel = context.WithCancel(ctx) 154 } 155 // Input context - cancel this for graceful stop 156 s.inCtx, s.inCancel = context.WithCancel(s.ctx) 157 if s.noTraverse && s.deleteMode != fs.DeleteModeOff { 158 if !fi.HaveFilesFrom() { 159 fs.Errorf(nil, "Ignoring --no-traverse with sync") 160 } 161 s.noTraverse = false 162 } 163 s.trackRenamesStrategy, err = parseTrackRenamesStrategy(ci.TrackRenamesStrategy) 164 if err != nil { 165 return nil, err 166 } 167 if s.noCheckDest { 168 if s.deleteMode != fs.DeleteModeOff { 169 return nil, errors.New("can't use --no-check-dest with sync: use copy instead") 170 } 171 if ci.Immutable { 172 return nil, errors.New("can't use --no-check-dest with --immutable") 173 } 174 if s.backupDir != nil { 175 return nil, errors.New("can't use --no-check-dest with --backup-dir") 176 } 177 } 178 if s.trackRenames { 179 // Don't track renames for remotes without server-side move support. 180 if !operations.CanServerSideMove(fdst) { 181 fs.Errorf(fdst, "Ignoring --track-renames as the destination does not support server-side move or copy") 182 s.trackRenames = false 183 } 184 if s.trackRenamesStrategy.hash() && s.commonHash == hash.None { 185 fs.Errorf(fdst, "Ignoring --track-renames as the source and destination do not have a common hash") 186 s.trackRenames = false 187 } 188 189 if s.trackRenamesStrategy.modTime() && s.modifyWindow == fs.ModTimeNotSupported { 190 fs.Errorf(fdst, "Ignoring --track-renames as either the source or destination do not support modtime") 191 s.trackRenames = false 192 } 193 194 if s.deleteMode == fs.DeleteModeOff { 195 fs.Errorf(fdst, "Ignoring --track-renames as it doesn't work with copy or move, only sync") 196 s.trackRenames = false 197 } 198 } 199 if s.trackRenames { 200 // track renames needs delete after 201 if s.deleteMode != fs.DeleteModeOff { 202 s.deleteMode = fs.DeleteModeAfter 203 } 204 if s.noTraverse { 205 fs.Errorf(nil, "Ignoring --no-traverse with --track-renames") 206 s.noTraverse = false 207 } 208 } 209 // Make Fs for --backup-dir if required 210 if ci.BackupDir != "" || ci.Suffix != "" { 211 var err error 212 s.backupDir, err = operations.BackupDir(ctx, fdst, fsrc, "") 213 if err != nil { 214 return nil, err 215 } 216 } 217 if len(ci.CompareDest) > 0 { 218 var err error 219 s.compareCopyDest, err = operations.GetCompareDest(ctx) 220 if err != nil { 221 return nil, err 222 } 223 } else if len(ci.CopyDest) > 0 { 224 var err error 225 s.compareCopyDest, err = operations.GetCopyDest(ctx, fdst) 226 if err != nil { 227 return nil, err 228 } 229 } 230 return s, nil 231} 232 233// Check to see if the context has been cancelled 234func (s *syncCopyMove) aborting() bool { 235 return s.ctx.Err() != nil 236} 237 238// This reads the map and pumps it into the channel passed in, closing 239// the channel at the end 240func (s *syncCopyMove) pumpMapToChan(files map[string]fs.Object, out chan<- fs.Object) { 241outer: 242 for _, o := range files { 243 if s.aborting() { 244 break outer 245 } 246 select { 247 case out <- o: 248 case <-s.ctx.Done(): 249 break outer 250 } 251 } 252 close(out) 253 s.srcFilesResult <- nil 254} 255 256// This checks the types of errors returned while copying files 257func (s *syncCopyMove) processError(err error) { 258 if err == nil { 259 return 260 } 261 if err == context.DeadlineExceeded { 262 err = fserrors.NoRetryError(err) 263 } else if err == accounting.ErrorMaxTransferLimitReachedGraceful { 264 if s.inCtx.Err() == nil { 265 fs.Logf(nil, "%v - stopping transfers", err) 266 // Cancel the march and stop the pipes 267 s.inCancel() 268 } 269 } else if err == context.Canceled && s.inCtx.Err() != nil { 270 // Ignore context Canceled if we have called s.inCancel() 271 return 272 } 273 s.errorMu.Lock() 274 defer s.errorMu.Unlock() 275 switch { 276 case fserrors.IsFatalError(err): 277 if !s.aborting() { 278 fs.Errorf(nil, "Cancelling sync due to fatal error: %v", err) 279 s.cancel() 280 } 281 s.fatalErr = err 282 case fserrors.IsNoRetryError(err): 283 s.noRetryErr = err 284 default: 285 s.err = err 286 } 287} 288 289// Returns the current error (if any) in the order of precedence 290// fatalErr 291// normal error 292// noRetryErr 293func (s *syncCopyMove) currentError() error { 294 s.errorMu.Lock() 295 defer s.errorMu.Unlock() 296 if s.fatalErr != nil { 297 return s.fatalErr 298 } 299 if s.err != nil { 300 return s.err 301 } 302 return s.noRetryErr 303} 304 305// pairChecker reads Objects~s on in send to out if they need transferring. 306// 307// FIXME potentially doing lots of hashes at once 308func (s *syncCopyMove) pairChecker(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { 309 defer wg.Done() 310 for { 311 pair, ok := in.GetMax(s.inCtx, fraction) 312 if !ok { 313 return 314 } 315 src := pair.Src 316 var err error 317 tr := accounting.Stats(s.ctx).NewCheckingTransfer(src) 318 // Check to see if can store this 319 if src.Storable() { 320 NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, pair.Dst, pair.Src, s.compareCopyDest, s.backupDir) 321 if err != nil { 322 s.processError(err) 323 } 324 if !NoNeedTransfer && operations.NeedTransfer(s.ctx, pair.Dst, pair.Src) { 325 // If files are treated as immutable, fail if destination exists and does not match 326 if s.ci.Immutable && pair.Dst != nil { 327 err := fs.CountError(fserrors.NoRetryError(fs.ErrorImmutableModified)) 328 fs.Errorf(pair.Dst, "Source and destination exist but do not match: %v", err) 329 s.processError(err) 330 } else { 331 // If destination already exists, then we must move it into --backup-dir if required 332 if pair.Dst != nil && s.backupDir != nil { 333 err := operations.MoveBackupDir(s.ctx, s.backupDir, pair.Dst) 334 if err != nil { 335 s.processError(err) 336 } else { 337 // If successful zero out the dst as it is no longer there and copy the file 338 pair.Dst = nil 339 ok = out.Put(s.ctx, pair) 340 if !ok { 341 return 342 } 343 } 344 } else { 345 ok = out.Put(s.ctx, pair) 346 if !ok { 347 return 348 } 349 } 350 } 351 } else { 352 // If moving need to delete the files we don't need to copy 353 if s.DoMove { 354 // Delete src if no error on copy 355 if operations.SameObject(src, pair.Dst) { 356 fs.Logf(src, "Not removing source file as it is the same file as the destination") 357 } else if s.ci.IgnoreExisting { 358 fs.Debugf(src, "Not removing source file as destination file exists and --ignore-existing is set") 359 } else { 360 s.processError(operations.DeleteFile(s.ctx, src)) 361 } 362 } 363 } 364 } 365 tr.Done(s.ctx, err) 366 } 367} 368 369// pairRenamer reads Objects~s on in and attempts to rename them, 370// otherwise it sends them out if they need transferring. 371func (s *syncCopyMove) pairRenamer(in *pipe, out *pipe, fraction int, wg *sync.WaitGroup) { 372 defer wg.Done() 373 for { 374 pair, ok := in.GetMax(s.inCtx, fraction) 375 if !ok { 376 return 377 } 378 src := pair.Src 379 if !s.tryRename(src) { 380 // pass on if not renamed 381 ok = out.Put(s.ctx, pair) 382 if !ok { 383 return 384 } 385 } 386 } 387} 388 389// pairCopyOrMove reads Objects on in and moves or copies them. 390func (s *syncCopyMove) pairCopyOrMove(ctx context.Context, in *pipe, fdst fs.Fs, fraction int, wg *sync.WaitGroup) { 391 defer wg.Done() 392 var err error 393 for { 394 pair, ok := in.GetMax(s.inCtx, fraction) 395 if !ok { 396 return 397 } 398 src := pair.Src 399 if s.DoMove { 400 _, err = operations.Move(ctx, fdst, pair.Dst, src.Remote(), src) 401 } else { 402 _, err = operations.Copy(ctx, fdst, pair.Dst, src.Remote(), src) 403 } 404 s.processError(err) 405 } 406} 407 408// This starts the background checkers. 409func (s *syncCopyMove) startCheckers() { 410 s.checkerWg.Add(s.ci.Checkers) 411 for i := 0; i < s.ci.Checkers; i++ { 412 fraction := (100 * i) / s.ci.Checkers 413 go s.pairChecker(s.toBeChecked, s.toBeUploaded, fraction, &s.checkerWg) 414 } 415} 416 417// This stops the background checkers 418func (s *syncCopyMove) stopCheckers() { 419 s.toBeChecked.Close() 420 fs.Debugf(s.fdst, "Waiting for checks to finish") 421 s.checkerWg.Wait() 422} 423 424// This starts the background transfers 425func (s *syncCopyMove) startTransfers() { 426 s.transfersWg.Add(s.ci.Transfers) 427 for i := 0; i < s.ci.Transfers; i++ { 428 fraction := (100 * i) / s.ci.Transfers 429 go s.pairCopyOrMove(s.ctx, s.toBeUploaded, s.fdst, fraction, &s.transfersWg) 430 } 431} 432 433// This stops the background transfers 434func (s *syncCopyMove) stopTransfers() { 435 s.toBeUploaded.Close() 436 fs.Debugf(s.fdst, "Waiting for transfers to finish") 437 s.transfersWg.Wait() 438} 439 440// This starts the background renamers. 441func (s *syncCopyMove) startRenamers() { 442 if !s.trackRenames { 443 return 444 } 445 s.renamerWg.Add(s.ci.Checkers) 446 for i := 0; i < s.ci.Checkers; i++ { 447 fraction := (100 * i) / s.ci.Checkers 448 go s.pairRenamer(s.toBeRenamed, s.toBeUploaded, fraction, &s.renamerWg) 449 } 450} 451 452// This stops the background renamers 453func (s *syncCopyMove) stopRenamers() { 454 if !s.trackRenames { 455 return 456 } 457 s.toBeRenamed.Close() 458 fs.Debugf(s.fdst, "Waiting for renames to finish") 459 s.renamerWg.Wait() 460} 461 462// This starts the collection of possible renames 463func (s *syncCopyMove) startTrackRenames() { 464 if !s.trackRenames { 465 return 466 } 467 s.trackRenamesWg.Add(1) 468 go func() { 469 defer s.trackRenamesWg.Done() 470 for o := range s.trackRenamesCh { 471 s.renameCheck = append(s.renameCheck, o) 472 } 473 }() 474} 475 476// This stops the background rename collection 477func (s *syncCopyMove) stopTrackRenames() { 478 if !s.trackRenames { 479 return 480 } 481 close(s.trackRenamesCh) 482 s.trackRenamesWg.Wait() 483} 484 485// This starts the background deletion of files for --delete-during 486func (s *syncCopyMove) startDeleters() { 487 if s.deleteMode != fs.DeleteModeDuring && s.deleteMode != fs.DeleteModeOnly { 488 return 489 } 490 s.deletersWg.Add(1) 491 go func() { 492 defer s.deletersWg.Done() 493 err := operations.DeleteFilesWithBackupDir(s.ctx, s.deleteFilesCh, s.backupDir) 494 s.processError(err) 495 }() 496} 497 498// This stops the background deleters 499func (s *syncCopyMove) stopDeleters() { 500 if s.deleteMode != fs.DeleteModeDuring && s.deleteMode != fs.DeleteModeOnly { 501 return 502 } 503 close(s.deleteFilesCh) 504 s.deletersWg.Wait() 505} 506 507// This deletes the files in the dstFiles map. If checkSrcMap is set 508// then it checks to see if they exist first in srcFiles the source 509// file map, otherwise it unconditionally deletes them. If 510// checkSrcMap is clear then it assumes that the any source files that 511// have been found have been removed from dstFiles already. 512func (s *syncCopyMove) deleteFiles(checkSrcMap bool) error { 513 if accounting.Stats(s.ctx).Errored() && !s.ci.IgnoreErrors { 514 fs.Errorf(s.fdst, "%v", fs.ErrorNotDeleting) 515 return fs.ErrorNotDeleting 516 } 517 518 // Delete the spare files 519 toDelete := make(fs.ObjectsChan, s.ci.Transfers) 520 go func() { 521 outer: 522 for remote, o := range s.dstFiles { 523 if checkSrcMap { 524 _, exists := s.srcFiles[remote] 525 if exists { 526 continue 527 } 528 } 529 if s.aborting() { 530 break 531 } 532 select { 533 case <-s.ctx.Done(): 534 break outer 535 case toDelete <- o: 536 } 537 } 538 close(toDelete) 539 }() 540 return operations.DeleteFilesWithBackupDir(s.ctx, toDelete, s.backupDir) 541} 542 543// This deletes the empty directories in the slice passed in. It 544// ignores any errors deleting directories 545func (s *syncCopyMove) deleteEmptyDirectories(ctx context.Context, f fs.Fs, entriesMap map[string]fs.DirEntry) error { 546 if len(entriesMap) == 0 { 547 return nil 548 } 549 if accounting.Stats(ctx).Errored() && !s.ci.IgnoreErrors { 550 fs.Errorf(f, "%v", fs.ErrorNotDeletingDirs) 551 return fs.ErrorNotDeletingDirs 552 } 553 554 var entries fs.DirEntries 555 for _, entry := range entriesMap { 556 entries = append(entries, entry) 557 } 558 // Now delete the empty directories starting from the longest path 559 sort.Sort(entries) 560 var errorCount int 561 var okCount int 562 for i := len(entries) - 1; i >= 0; i-- { 563 entry := entries[i] 564 dir, ok := entry.(fs.Directory) 565 if ok { 566 // TryRmdir only deletes empty directories 567 err := operations.TryRmdir(ctx, f, dir.Remote()) 568 if err != nil { 569 fs.Debugf(fs.LogDirName(f, dir.Remote()), "Failed to Rmdir: %v", err) 570 errorCount++ 571 } else { 572 okCount++ 573 } 574 } else { 575 fs.Errorf(f, "Not a directory: %v", entry) 576 } 577 } 578 if errorCount > 0 { 579 fs.Debugf(f, "failed to delete %d directories", errorCount) 580 } 581 if okCount > 0 { 582 fs.Debugf(f, "deleted %d directories", okCount) 583 } 584 return nil 585} 586 587// This copies the empty directories in the slice passed in and logs 588// any errors copying the directories 589func copyEmptyDirectories(ctx context.Context, f fs.Fs, entries map[string]fs.DirEntry) error { 590 if len(entries) == 0 { 591 return nil 592 } 593 594 var okCount int 595 for _, entry := range entries { 596 dir, ok := entry.(fs.Directory) 597 if ok { 598 err := operations.Mkdir(ctx, f, dir.Remote()) 599 if err != nil { 600 fs.Errorf(fs.LogDirName(f, dir.Remote()), "Failed to Mkdir: %v", err) 601 } else { 602 okCount++ 603 } 604 } else { 605 fs.Errorf(f, "Not a directory: %v", entry) 606 } 607 } 608 609 if accounting.Stats(ctx).Errored() { 610 fs.Debugf(f, "failed to copy %d directories", accounting.Stats(ctx).GetErrors()) 611 } 612 613 if okCount > 0 { 614 fs.Debugf(f, "copied %d directories", okCount) 615 } 616 return nil 617} 618 619func (s *syncCopyMove) srcParentDirCheck(entry fs.DirEntry) { 620 // If we are moving files then we don't want to remove directories with files in them 621 // from the srcEmptyDirs as we are about to move them making the directory empty. 622 if s.DoMove { 623 return 624 } 625 parentDir := path.Dir(entry.Remote()) 626 if parentDir == "." { 627 parentDir = "" 628 } 629 delete(s.srcEmptyDirs, parentDir) 630} 631 632// parseTrackRenamesStrategy turns a config string into a trackRenamesStrategy 633func parseTrackRenamesStrategy(strategies string) (strategy trackRenamesStrategy, err error) { 634 if len(strategies) == 0 { 635 return strategy, nil 636 } 637 for _, s := range strings.Split(strategies, ",") { 638 switch s { 639 case "hash": 640 strategy |= trackRenamesStrategyHash 641 case "modtime": 642 strategy |= trackRenamesStrategyModtime 643 case "leaf": 644 strategy |= trackRenamesStrategyLeaf 645 case "size": 646 // ignore 647 default: 648 return strategy, errors.Errorf("unknown track renames strategy %q", s) 649 } 650 } 651 return strategy, nil 652} 653 654// renameID makes a string with the size and the other identifiers of the requested rename strategies 655// 656// it may return an empty string in which case no hash could be made 657func (s *syncCopyMove) renameID(obj fs.Object, renamesStrategy trackRenamesStrategy, precision time.Duration) string { 658 var builder strings.Builder 659 660 fmt.Fprintf(&builder, "%d", obj.Size()) 661 662 if renamesStrategy.hash() { 663 var err error 664 hash, err := obj.Hash(s.ctx, s.commonHash) 665 666 if err != nil { 667 fs.Debugf(obj, "Hash failed: %v", err) 668 return "" 669 } 670 if hash == "" { 671 return "" 672 } 673 674 builder.WriteRune(',') 675 builder.WriteString(hash) 676 } 677 678 // for renamesStrategy.modTime() we don't add to the hash but we check the times in 679 // popRenameMap 680 681 if renamesStrategy.leaf() { 682 builder.WriteRune(',') 683 builder.WriteString(path.Base(obj.Remote())) 684 } 685 686 return builder.String() 687} 688 689// pushRenameMap adds the object with hash to the rename map 690func (s *syncCopyMove) pushRenameMap(hash string, obj fs.Object) { 691 s.renameMapMu.Lock() 692 s.renameMap[hash] = append(s.renameMap[hash], obj) 693 s.renameMapMu.Unlock() 694} 695 696// popRenameMap finds the object with hash and pop the first match from 697// renameMap or returns nil if not found. 698func (s *syncCopyMove) popRenameMap(hash string, src fs.Object) (dst fs.Object) { 699 s.renameMapMu.Lock() 700 defer s.renameMapMu.Unlock() 701 dsts, ok := s.renameMap[hash] 702 if ok && len(dsts) > 0 { 703 // Element to remove 704 i := 0 705 706 // If using track renames strategy modtime then we need to check the modtimes here 707 if s.trackRenamesStrategy.modTime() { 708 i = -1 709 srcModTime := src.ModTime(s.ctx) 710 for j, dst := range dsts { 711 dstModTime := dst.ModTime(s.ctx) 712 dt := dstModTime.Sub(srcModTime) 713 if dt < s.modifyWindow && dt > -s.modifyWindow { 714 i = j 715 break 716 } 717 } 718 // If nothing matched then return nil 719 if i < 0 { 720 return nil 721 } 722 } 723 724 // Remove the entry and return it 725 dst = dsts[i] 726 dsts = append(dsts[:i], dsts[i+1:]...) 727 if len(dsts) > 0 { 728 s.renameMap[hash] = dsts 729 } else { 730 delete(s.renameMap, hash) 731 } 732 } 733 return dst 734} 735 736// makeRenameMap builds a map of the destination files by hash that 737// match sizes in the slice of objects in s.renameCheck 738func (s *syncCopyMove) makeRenameMap() { 739 fs.Infof(s.fdst, "Making map for --track-renames") 740 741 // first make a map of possible sizes we need to check 742 possibleSizes := map[int64]struct{}{} 743 for _, obj := range s.renameCheck { 744 possibleSizes[obj.Size()] = struct{}{} 745 } 746 747 // pump all the dstFiles into in 748 in := make(chan fs.Object, s.ci.Checkers) 749 go s.pumpMapToChan(s.dstFiles, in) 750 751 // now make a map of size,hash for all dstFiles 752 s.renameMap = make(map[string][]fs.Object) 753 var wg sync.WaitGroup 754 wg.Add(s.ci.Transfers) 755 for i := 0; i < s.ci.Transfers; i++ { 756 go func() { 757 defer wg.Done() 758 for obj := range in { 759 // only create hash for dst fs.Object if its size could match 760 if _, found := possibleSizes[obj.Size()]; found { 761 tr := accounting.Stats(s.ctx).NewCheckingTransfer(obj) 762 hash := s.renameID(obj, s.trackRenamesStrategy, s.modifyWindow) 763 764 if hash != "" { 765 s.pushRenameMap(hash, obj) 766 } 767 768 tr.Done(s.ctx, nil) 769 } 770 } 771 }() 772 } 773 wg.Wait() 774 fs.Infof(s.fdst, "Finished making map for --track-renames") 775} 776 777// tryRename renames an src object when doing track renames if 778// possible, it returns true if the object was renamed. 779func (s *syncCopyMove) tryRename(src fs.Object) bool { 780 // Calculate the hash of the src object 781 hash := s.renameID(src, s.trackRenamesStrategy, fs.GetModifyWindow(s.ctx, s.fsrc, s.fdst)) 782 783 if hash == "" { 784 return false 785 } 786 787 // Get a match on fdst 788 dst := s.popRenameMap(hash, src) 789 if dst == nil { 790 return false 791 } 792 793 // Find dst object we are about to overwrite if it exists 794 dstOverwritten, _ := s.fdst.NewObject(s.ctx, src.Remote()) 795 796 // Rename dst to have name src.Remote() 797 _, err := operations.Move(s.ctx, s.fdst, dstOverwritten, src.Remote(), dst) 798 if err != nil { 799 fs.Debugf(src, "Failed to rename to %q: %v", dst.Remote(), err) 800 return false 801 } 802 803 // remove file from dstFiles if present 804 s.dstFilesMu.Lock() 805 delete(s.dstFiles, dst.Remote()) 806 s.dstFilesMu.Unlock() 807 808 fs.Infof(src, "Renamed from %q", dst.Remote()) 809 return true 810} 811 812// Syncs fsrc into fdst 813// 814// If Delete is true then it deletes any files in fdst that aren't in fsrc 815// 816// If DoMove is true then files will be moved instead of copied 817// 818// dir is the start directory, "" for root 819func (s *syncCopyMove) run() error { 820 if operations.Same(s.fdst, s.fsrc) { 821 fs.Errorf(s.fdst, "Nothing to do as source and destination are the same") 822 return nil 823 } 824 825 // Start background checking and transferring pipeline 826 s.startCheckers() 827 s.startRenamers() 828 if !s.checkFirst { 829 s.startTransfers() 830 } 831 s.startDeleters() 832 s.dstFiles = make(map[string]fs.Object) 833 834 s.startTrackRenames() 835 836 // set up a march over fdst and fsrc 837 m := &march.March{ 838 Ctx: s.inCtx, 839 Fdst: s.fdst, 840 Fsrc: s.fsrc, 841 Dir: s.dir, 842 NoTraverse: s.noTraverse, 843 Callback: s, 844 DstIncludeAll: s.fi.Opt.DeleteExcluded, 845 NoCheckDest: s.noCheckDest, 846 NoUnicodeNormalization: s.noUnicodeNormalization, 847 } 848 s.processError(m.Run(s.ctx)) 849 850 s.stopTrackRenames() 851 if s.trackRenames { 852 // Build the map of the remaining dstFiles by hash 853 s.makeRenameMap() 854 // Attempt renames for all the files which don't have a matching dst 855 for _, src := range s.renameCheck { 856 ok := s.toBeRenamed.Put(s.ctx, fs.ObjectPair{Src: src, Dst: nil}) 857 if !ok { 858 break 859 } 860 } 861 } 862 863 // Stop background checking and transferring pipeline 864 s.stopCheckers() 865 if s.checkFirst { 866 fs.Infof(s.fdst, "Checks finished, now starting transfers") 867 s.startTransfers() 868 } 869 s.stopRenamers() 870 s.stopTransfers() 871 s.stopDeleters() 872 873 if s.copyEmptySrcDirs { 874 s.processError(copyEmptyDirectories(s.ctx, s.fdst, s.srcEmptyDirs)) 875 } 876 877 // Delete files after 878 if s.deleteMode == fs.DeleteModeAfter { 879 if s.currentError() != nil && !s.ci.IgnoreErrors { 880 fs.Errorf(s.fdst, "%v", fs.ErrorNotDeleting) 881 } else { 882 s.processError(s.deleteFiles(false)) 883 } 884 } 885 886 // Prune empty directories 887 if s.deleteMode != fs.DeleteModeOff { 888 if s.currentError() != nil && !s.ci.IgnoreErrors { 889 fs.Errorf(s.fdst, "%v", fs.ErrorNotDeletingDirs) 890 } else { 891 s.processError(s.deleteEmptyDirectories(s.ctx, s.fdst, s.dstEmptyDirs)) 892 } 893 } 894 895 // Delete empty fsrc subdirectories 896 // if DoMove and --delete-empty-src-dirs flag is set 897 if s.DoMove && s.deleteEmptySrcDirs { 898 // delete empty subdirectories that were part of the move 899 s.processError(s.deleteEmptyDirectories(s.ctx, s.fsrc, s.srcEmptyDirs)) 900 } 901 902 // Read the error out of the context if there is one 903 s.processError(s.ctx.Err()) 904 905 // Print nothing to transfer message if there were no transfers and no errors 906 if s.deleteMode != fs.DeleteModeOnly && accounting.Stats(s.ctx).GetTransfers() == 0 && s.currentError() == nil { 907 fs.Infof(nil, "There was nothing to transfer") 908 } 909 910 // cancel the context to free resources 911 s.cancel() 912 return s.currentError() 913} 914 915// DstOnly have an object which is in the destination only 916func (s *syncCopyMove) DstOnly(dst fs.DirEntry) (recurse bool) { 917 if s.deleteMode == fs.DeleteModeOff { 918 return false 919 } 920 switch x := dst.(type) { 921 case fs.Object: 922 switch s.deleteMode { 923 case fs.DeleteModeAfter: 924 // record object as needs deleting 925 s.dstFilesMu.Lock() 926 s.dstFiles[x.Remote()] = x 927 s.dstFilesMu.Unlock() 928 case fs.DeleteModeDuring, fs.DeleteModeOnly: 929 select { 930 case <-s.ctx.Done(): 931 return 932 case s.deleteFilesCh <- x: 933 } 934 default: 935 panic(fmt.Sprintf("unexpected delete mode %d", s.deleteMode)) 936 } 937 case fs.Directory: 938 // Do the same thing to the entire contents of the directory 939 // Record directory as it is potentially empty and needs deleting 940 if s.fdst.Features().CanHaveEmptyDirectories { 941 s.dstEmptyDirsMu.Lock() 942 s.dstEmptyDirs[dst.Remote()] = dst 943 s.dstEmptyDirsMu.Unlock() 944 } 945 return true 946 default: 947 panic("Bad object in DirEntries") 948 949 } 950 return false 951} 952 953// SrcOnly have an object which is in the source only 954func (s *syncCopyMove) SrcOnly(src fs.DirEntry) (recurse bool) { 955 if s.deleteMode == fs.DeleteModeOnly { 956 return false 957 } 958 switch x := src.(type) { 959 case fs.Object: 960 // If it's a copy operation, 961 // remove parent directory from srcEmptyDirs 962 // since it's not really empty 963 s.srcEmptyDirsMu.Lock() 964 s.srcParentDirCheck(src) 965 s.srcEmptyDirsMu.Unlock() 966 967 if s.trackRenames { 968 // Save object to check for a rename later 969 select { 970 case <-s.ctx.Done(): 971 return 972 case s.trackRenamesCh <- x: 973 } 974 } else { 975 // Check CompareDest && CopyDest 976 NoNeedTransfer, err := operations.CompareOrCopyDest(s.ctx, s.fdst, nil, x, s.compareCopyDest, s.backupDir) 977 if err != nil { 978 s.processError(err) 979 } 980 if !NoNeedTransfer { 981 // No need to check since doesn't exist 982 ok := s.toBeUploaded.Put(s.ctx, fs.ObjectPair{Src: x, Dst: nil}) 983 if !ok { 984 return 985 } 986 } 987 } 988 case fs.Directory: 989 // Do the same thing to the entire contents of the directory 990 // Record the directory for deletion 991 s.srcEmptyDirsMu.Lock() 992 s.srcParentDirCheck(src) 993 s.srcEmptyDirs[src.Remote()] = src 994 s.srcEmptyDirsMu.Unlock() 995 return true 996 default: 997 panic("Bad object in DirEntries") 998 } 999 return false 1000} 1001 1002// Match is called when src and dst are present, so sync src to dst 1003func (s *syncCopyMove) Match(ctx context.Context, dst, src fs.DirEntry) (recurse bool) { 1004 switch srcX := src.(type) { 1005 case fs.Object: 1006 s.srcEmptyDirsMu.Lock() 1007 s.srcParentDirCheck(src) 1008 s.srcEmptyDirsMu.Unlock() 1009 1010 if s.deleteMode == fs.DeleteModeOnly { 1011 return false 1012 } 1013 dstX, ok := dst.(fs.Object) 1014 if ok { 1015 ok = s.toBeChecked.Put(s.ctx, fs.ObjectPair{Src: srcX, Dst: dstX}) 1016 if !ok { 1017 return false 1018 } 1019 } else { 1020 // FIXME src is file, dst is directory 1021 err := errors.New("can't overwrite directory with file") 1022 fs.Errorf(dst, "%v", err) 1023 s.processError(err) 1024 } 1025 case fs.Directory: 1026 // Do the same thing to the entire contents of the directory 1027 _, ok := dst.(fs.Directory) 1028 if ok { 1029 // Only record matched (src & dst) empty dirs when performing move 1030 if s.DoMove { 1031 // Record the src directory for deletion 1032 s.srcEmptyDirsMu.Lock() 1033 s.srcParentDirCheck(src) 1034 s.srcEmptyDirs[src.Remote()] = src 1035 s.srcEmptyDirsMu.Unlock() 1036 } 1037 1038 return true 1039 } 1040 // FIXME src is dir, dst is file 1041 err := errors.New("can't overwrite file with directory") 1042 fs.Errorf(dst, "%v", err) 1043 s.processError(err) 1044 default: 1045 panic("Bad object in DirEntries") 1046 } 1047 return false 1048} 1049 1050// Syncs fsrc into fdst 1051// 1052// If Delete is true then it deletes any files in fdst that aren't in fsrc 1053// 1054// If DoMove is true then files will be moved instead of copied 1055// 1056// dir is the start directory, "" for root 1057func runSyncCopyMove(ctx context.Context, fdst, fsrc fs.Fs, deleteMode fs.DeleteMode, DoMove bool, deleteEmptySrcDirs bool, copyEmptySrcDirs bool) error { 1058 ci := fs.GetConfig(ctx) 1059 if deleteMode != fs.DeleteModeOff && DoMove { 1060 return fserrors.FatalError(errors.New("can't delete and move at the same time")) 1061 } 1062 // Run an extra pass to delete only 1063 if deleteMode == fs.DeleteModeBefore { 1064 if ci.TrackRenames { 1065 return fserrors.FatalError(errors.New("can't use --delete-before with --track-renames")) 1066 } 1067 // only delete stuff during in this pass 1068 do, err := newSyncCopyMove(ctx, fdst, fsrc, fs.DeleteModeOnly, false, deleteEmptySrcDirs, copyEmptySrcDirs) 1069 if err != nil { 1070 return err 1071 } 1072 err = do.run() 1073 if err != nil { 1074 return err 1075 } 1076 // Next pass does a copy only 1077 deleteMode = fs.DeleteModeOff 1078 } 1079 do, err := newSyncCopyMove(ctx, fdst, fsrc, deleteMode, DoMove, deleteEmptySrcDirs, copyEmptySrcDirs) 1080 if err != nil { 1081 return err 1082 } 1083 return do.run() 1084} 1085 1086// Sync fsrc into fdst 1087func Sync(ctx context.Context, fdst, fsrc fs.Fs, copyEmptySrcDirs bool) error { 1088 ci := fs.GetConfig(ctx) 1089 return runSyncCopyMove(ctx, fdst, fsrc, ci.DeleteMode, false, false, copyEmptySrcDirs) 1090} 1091 1092// CopyDir copies fsrc into fdst 1093func CopyDir(ctx context.Context, fdst, fsrc fs.Fs, copyEmptySrcDirs bool) error { 1094 return runSyncCopyMove(ctx, fdst, fsrc, fs.DeleteModeOff, false, false, copyEmptySrcDirs) 1095} 1096 1097// moveDir moves fsrc into fdst 1098func moveDir(ctx context.Context, fdst, fsrc fs.Fs, deleteEmptySrcDirs bool, copyEmptySrcDirs bool) error { 1099 return runSyncCopyMove(ctx, fdst, fsrc, fs.DeleteModeOff, true, deleteEmptySrcDirs, copyEmptySrcDirs) 1100} 1101 1102// MoveDir moves fsrc into fdst 1103func MoveDir(ctx context.Context, fdst, fsrc fs.Fs, deleteEmptySrcDirs bool, copyEmptySrcDirs bool) error { 1104 fi := filter.GetConfig(ctx) 1105 if operations.Same(fdst, fsrc) { 1106 fs.Errorf(fdst, "Nothing to do as source and destination are the same") 1107 return nil 1108 } 1109 1110 // First attempt to use DirMover if exists, same Fs and no filters are active 1111 if fdstDirMove := fdst.Features().DirMove; fdstDirMove != nil && operations.SameConfig(fsrc, fdst) && fi.InActive() { 1112 if operations.SkipDestructive(ctx, fdst, "server-side directory move") { 1113 return nil 1114 } 1115 fs.Debugf(fdst, "Using server-side directory move") 1116 err := fdstDirMove(ctx, fsrc, "", "") 1117 switch err { 1118 case fs.ErrorCantDirMove, fs.ErrorDirExists: 1119 fs.Infof(fdst, "Server side directory move failed - fallback to file moves: %v", err) 1120 case nil: 1121 fs.Infof(fdst, "Server side directory move succeeded") 1122 return nil 1123 default: 1124 err = fs.CountError(err) 1125 fs.Errorf(fdst, "Server side directory move failed: %v", err) 1126 return err 1127 } 1128 } 1129 1130 // Otherwise move the files one by one 1131 return moveDir(ctx, fdst, fsrc, deleteEmptySrcDirs, copyEmptySrcDirs) 1132} 1133