1// Package chunker provides wrappers for Fs and Object which split large files in chunks 2package chunker 3 4import ( 5 "bytes" 6 "context" 7 "crypto/md5" 8 "crypto/sha1" 9 "encoding/hex" 10 "encoding/json" 11 "fmt" 12 gohash "hash" 13 "io" 14 "io/ioutil" 15 "math/rand" 16 "path" 17 "regexp" 18 "sort" 19 "strconv" 20 "strings" 21 "sync" 22 "time" 23 24 "github.com/pkg/errors" 25 "github.com/rclone/rclone/fs" 26 "github.com/rclone/rclone/fs/accounting" 27 "github.com/rclone/rclone/fs/cache" 28 "github.com/rclone/rclone/fs/config/configmap" 29 "github.com/rclone/rclone/fs/config/configstruct" 30 "github.com/rclone/rclone/fs/fspath" 31 "github.com/rclone/rclone/fs/hash" 32 "github.com/rclone/rclone/fs/operations" 33) 34 35// 36// Chunker's composite files have one or more chunks 37// and optional metadata object. If it's present, 38// meta object is named after the original file. 39// 40// The only supported metadata format is simplejson atm. 41// It supports only per-file meta objects that are rudimentary, 42// used mostly for consistency checks (lazily for performance reasons). 43// Other formats can be developed that use an external meta store 44// free of these limitations, but this needs some support from 45// rclone core (e.g. metadata store interfaces). 46// 47// The following types of chunks are supported: 48// data and control, active and temporary. 49// Chunk type is identified by matching chunk file name 50// based on the chunk name format configured by user and transaction 51// style being used. 52// 53// Both data and control chunks can be either temporary (aka hidden) 54// or active (non-temporary aka normal aka permanent). 55// An operation creates temporary chunks while it runs. 56// By completion it removes temporary and leaves active chunks. 57// 58// Temporary chunks have a special hardcoded suffix in addition 59// to the configured name pattern. 
// Temporary suffix includes so called transaction identifier
// (abbreviated as `xactID` below), a generic non-negative base-36 "number"
// used by parallel operations to share a composite object.
// Chunker also accepts the longer decimal temporary suffix (obsolete),
// which is transparently converted to the new format. In its maximum
// length of 13 decimals it makes a 9-digit base-36 number.
//
// When transactions is set to the norename style, data chunks will
// keep their temporary chunk names (with the transaction identifier
// suffix). To distinguish them from temporary chunks, the txn field
// of the metadata file is set to match the transaction identifier of
// the data chunks.
//
// Chunker can tell data chunks from control chunks by the characters
// located in the "hash placeholder" position of configured format.
// Data chunks have decimal digits there.
// Control chunks have in that position a short lowercase alphanumeric
// string (starting with a letter) prepended by underscore.
//
// Metadata format v1 does not define any control chunk types,
// they are currently ignored aka reserved.
// In future they can be used to implement resumable uploads etc.
//
const (
	// ctrlTypeRegStr matches a control chunk type: one lowercase letter
	// followed by 2 to 6 lowercase letters or digits.
	ctrlTypeRegStr = `[a-z][a-z0-9]{2,6}`

	// tempSuffixFormat renders the temporary suffix for a transaction ID:
	// an underscore followed by the ID formatted to a width of at least 4.
	tempSuffixFormat = `_%04s`

	// tempSuffixRegStr matches the current temporary suffix and captures
	// its 4 to 9 character base-36 transaction ID.
	tempSuffixRegStr = `_([0-9a-z]{4,9})`

	// tempSuffixRegOld matches the obsolete decimal temporary suffix and
	// captures its 10 to 13 digit transaction number.
	tempSuffixRegOld = `\.\.tmp_([0-9]{10,13})`
)

// Anchored regular expressions that validate a whole string as
// a control chunk type or as a temporary suffix.
var (
	ctrlTypeRegexp   = regexp.MustCompile("^" + ctrlTypeRegStr + "$")
	tempSuffixRegexp = regexp.MustCompile("^" + tempSuffixRegStr + "$")
)

// Normally metadata is a small piece of JSON (about 100-300 bytes).
// The size of valid metadata must never exceed this limit.
// Current maximum provides a reasonable room for future extensions.
//
// Please refrain from increasing it, this can cause old rclone versions
// to fail, or worse, treat meta object as a normal file (see NewObject).
// If more room is needed please bump metadata version forcing previous
// releases to ask for upgrade, and offload extra info to a control chunk.
//
// And still chunker's primary function is to chunk large files
// rather than serve as a generic metadata container.
const maxMetadataSize = 1023

// NOTE(review): presumably the tighter size budget for metadata written by
// this version, leaving headroom below maxMetadataSize - confirm against
// the metadata marshalling code.
const maxMetadataSizeWritten = 255

// Current/highest supported metadata format.
const metadataVersion = 2

// optimizeFirstChunk enables the following optimization in the Put:
// If a single chunk is expected, put the first chunk using the
// base target name instead of a temporary name, thus avoiding
// extra rename operation.
// Warning: this optimization is not transaction safe.
const optimizeFirstChunk = false

// revealHidden is a stub until chunker lands the `reveal hidden` option.
const revealHidden = false

// Prevent memory overflow due to specially crafted chunk name
const maxSafeChunkNumber = 10000000

// Number of attempts to find unique transaction identifier
const maxTransactionProbes = 100

// standard chunker errors
var (
	ErrChunkOverflow = errors.New("chunk number overflow")
	ErrMetaTooBig    = errors.New("metadata is too big")
	ErrMetaUnknown   = errors.New("unknown metadata, please upgrade rclone")
)

// variants of baseMove's parameter delMode
const (
	delNever  = 0 // don't delete, just move
	delAlways = 1 // delete destination before moving
	delFailed = 2 // move, then delete and try again if failed
)

// init registers the chunker backend, its constructor NewFs and all of
// its configuration options with rclone.
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "chunker",
		Description: "Transparently chunk/split large files",
		NewFs:       NewFs,
		Options: []fs.Option{{
			Name:     "remote",
			Required: true,
			Help: `Remote to chunk/unchunk.

Normally should contain a ':' and a path, e.g. "myremote:path/to/dir",
"myremote:bucket" or maybe "myremote:" (not recommended).`,
		}, {
			Name:     "chunk_size",
			Advanced: false,
			Default:  fs.SizeSuffix(2147483648), // 2 GiB
			Help:     `Files larger than chunk size will be split in chunks.`,
		}, {
			Name:     "name_format",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  `*.rclone_chunk.###`,
			Help: `String format of chunk file names.

The two placeholders are: base file name (*) and chunk number (#...).
There must be one and only one asterisk and one or more consecutive hash characters.
If chunk number has less digits than the number of hashes, it is left-padded by zeros.
If there are more digits in the number, they are left as is.
Possible chunk files are ignored if their name does not match given format.`,
		}, {
			Name:     "start_from",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  1,
			Help: `Minimum valid chunk number. Usually 0 or 1.

By default chunk numbers start from 1.`,
		}, {
			Name:     "meta_format",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  "simplejson",
			Help: `Format of the metadata object or "none".

By default "simplejson".
Metadata is a small JSON file named after the composite file.`,
			Examples: []fs.OptionExample{{
				Value: "none",
				Help: `Do not use metadata files at all.
Requires hash type "none".`,
			}, {
				Value: "simplejson",
				Help: `Simple JSON supports hash sums and chunk validation.

It has the following fields: ver, size, nchunks, md5, sha1.`,
			}},
		}, {
			Name:     "hash_type",
			Advanced: false,
			Default:  "md5",
			Help: `Choose how chunker handles hash sums.

All modes but "none" require metadata.`,
			Examples: []fs.OptionExample{{
				Value: "none",
				Help: `Pass any hash supported by wrapped remote for non-chunked files.
Return nothing otherwise.`,
			}, {
				Value: "md5",
				Help:  `MD5 for composite files.`,
			}, {
				Value: "sha1",
				Help:  `SHA1 for composite files.`,
			}, {
				Value: "md5all",
				Help:  `MD5 for all files.`,
			}, {
				Value: "sha1all",
				Help:  `SHA1 for all files.`,
			}, {
				Value: "md5quick",
				Help: `Copying a file to chunker will request MD5 from the source.
Falling back to SHA1 if unsupported.`,
			}, {
				Value: "sha1quick",
				Help:  `Similar to "md5quick" but prefers SHA1 over MD5.`,
			}},
		}, {
			Name:     "fail_hard",
			Advanced: true,
			Default:  false,
			Help:     `Choose how chunker should handle files with missing or invalid chunks.`,
			Examples: []fs.OptionExample{
				{
					Value: "true",
					Help:  "Report errors and abort current command.",
				}, {
					Value: "false",
					Help:  "Warn user, skip incomplete file and proceed.",
				},
			},
		}, {
			Name:     "transactions",
			Advanced: true,
			Default:  "rename",
			Help:     `Choose how chunker should handle temporary files during transactions.`,
			Hide:     fs.OptionHideCommandLine,
			Examples: []fs.OptionExample{
				{
					Value: "rename",
					Help:  "Rename temporary files after a successful transaction.",
				}, {
					Value: "norename",
					Help: `Leave temporary file names and write transaction ID to metadata file.
Metadata is required for no rename transactions (meta format cannot be "none").
If you are using norename transactions you should be careful not to downgrade Rclone
as older versions of Rclone don't support this transaction style and will misinterpret
files manipulated by norename transactions.
This method is EXPERIMENTAL, don't use on production systems.`,
				}, {
					Value: "auto",
					Help: `Rename or norename will be used depending on capabilities of the backend.
If meta format is set to "none", rename transactions will always be used.
This method is EXPERIMENTAL, don't use on production systems.`,
				},
			},
		}},
	})
}

// NewFs constructs an Fs from the path, container:path
//
// It parses the chunker options, rejects a negative start_from and a
// "remote" setting that points back at this chunker remote, then wraps
// the remote named by the "remote" option joined with rpath. The wrapped
// backend must support server-side move or copy.
//
// When rpath points at a file (or, without metadata, at the first chunk
// of a composite file) the returned error is fs.ErrorIsFile and the
// returned Fs is still valid.
func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	if opt.StartFrom < 0 {
		return nil, errors.New("start_from must be non-negative")
	}

	remote := opt.Remote
	if strings.HasPrefix(remote, name+":") {
		return nil, errors.New("can't point remote at itself - check the value of the remote setting")
	}

	baseName, basePath, err := fspath.SplitFs(remote)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote)
	}
	// Look for a file first
	remotePath := fspath.JoinRootPath(basePath, rpath)
	baseFs, err := cache.Get(ctx, baseName+remotePath)
	// fs.ErrorIsFile is not fatal: it is kept in err and returned at the end.
	if err != fs.ErrorIsFile && err != nil {
		return nil, errors.Wrapf(err, "failed to make remote %q to wrap", baseName+remotePath)
	}
	if !operations.CanServerSideMove(baseFs) {
		return nil, errors.New("can't use chunker on a backend which doesn't support server-side move or copy")
	}

	f := &Fs{
		base: baseFs,
		name: name,
		root: rpath,
		opt:  *opt,
	}
	// NOTE(review): presumably keeps baseFs pinned in the fs cache while f
	// is reachable - confirm against fs/cache.
	cache.PinUntilFinalized(f.base, f)
	f.dirSort = true // processEntries requires that meta Objects prerun data chunks atm.

	// This err is scoped to the if statement; the outer err (the result of
	// cache.Get above) is what the code below inspects and returns.
	if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType, opt.Transactions); err != nil {
		return nil, err
	}

	// Handle the tricky case detected by FsMkdir/FsPutFiles/FsIsFile
	// when `rpath` points to a composite multi-chunk file without metadata,
	// i.e. `rpath` does not exist in the wrapped remote, but chunker
	// detects a composite file because it finds the first chunk!
	// (yet can't satisfy fstest.CheckListing, will ignore)
	if err == nil && !f.useMeta && strings.Contains(rpath, "/") {
		firstChunkPath := f.makeChunkName(remotePath, 0, "", "")
		_, testErr := cache.Get(ctx, baseName+firstChunkPath)
		if testErr == fs.ErrorIsFile {
			err = testErr
		}
	}

	// Note 1: the features here are ones we could support, and they are
	// ANDed with the ones from wrappedFs.
	// Note 2: features.Fill() points features.PutStream to our PutStream,
	// but features.Mask() will nullify it if wrappedFs does not have it.
	f.features = (&fs.Features{
		CaseInsensitive:         true,
		DuplicateFiles:          true,
		ReadMimeType:            false, // Object.MimeType not supported
		WriteMimeType:           true,
		BucketBased:             true,
		CanHaveEmptyDirectories: true,
		ServerSideAcrossConfigs: true,
	}).Fill(ctx, f).Mask(ctx, baseFs).WrapsFs(f, baseFs)

	f.features.Disable("ListR") // Recursive listing may cause chunker skip files

	// err is nil or fs.ErrorIsFile here
	return f, err
}

// Options defines the configuration for this backend.
// Each field is filled by configstruct.Set from the config key named
// in its `config` tag; the keys are registered in init.
type Options struct {
	Remote       string        `config:"remote"`
	ChunkSize    fs.SizeSuffix `config:"chunk_size"`
	NameFormat   string        `config:"name_format"`
	StartFrom    int           `config:"start_from"`
	MetaFormat   string        `config:"meta_format"`
	HashType     string        `config:"hash_type"`
	FailHard     bool          `config:"fail_hard"`
	Transactions string        `config:"transactions"`
}

// Fs represents a wrapped fs.Fs
//
// It is the chunker overlay: files stored through it are kept in the
// wrapped remote `base` as data chunks, optionally with a meta object.
type Fs struct {
	name         string         // name of this chunker remote
	root         string         // root path within this chunker remote
	base         fs.Fs          // remote wrapped by chunker overlay
	wrapper      fs.Fs          // wrapper is used by SetWrapper
	useMeta      bool           // false if metadata format is 'none'
	useMD5       bool           // mutually exclusive with useSHA1
	useSHA1      bool           // mutually exclusive with useMD5
	hashFallback bool           // allows fallback from MD5 to SHA1 and vice versa
	hashAll      bool           // hash all files, mutually exclusive with hashFallback
	dataNameFmt  string         // name format of data chunks
	ctrlNameFmt  string         // name format of control chunks
	nameRegexp   *regexp.Regexp // regular expression to match chunk names
	xactIDRand   *rand.Rand     // generator of random transaction identifiers
	xactIDMutex  sync.Mutex     // mutex for the source of randomness
	opt          Options        // copy of Options
	features     *fs.Features   // optional features
	dirSort      bool           // processEntries sorts entries so meta objects precede chunks (set by NewFs)
	useNoRename  bool           // can be set with the transactions option
}

// configure sets up chunker for given name format, meta format and hash type.
// It also seeds the source of random transaction identifiers.
// configure must be called only from NewFs or by unit tests.
func (f *Fs) configure(nameFormat, metaFormat, hashType, transactionMode string) error {
	if err := f.setChunkNameFormat(nameFormat); err != nil {
		return errors.Wrapf(err, "invalid name format '%s'", nameFormat)
	}
	// setMetaFormat must run first: setHashType and setTransactionMode
	// both depend on f.useMeta.
	if err := f.setMetaFormat(metaFormat); err != nil {
		return err
	}
	if err := f.setHashType(hashType); err != nil {
		return err
	}
	if err := f.setTransactionMode(transactionMode); err != nil {
		return err
	}

	randomSeed := time.Now().UnixNano()
	f.xactIDRand = rand.New(rand.NewSource(randomSeed))

	return nil
}

// setMetaFormat enables or disables meta objects for the meta_format
// option: "none" disables them, "simplejson" enables them, and any other
// value is rejected with an error.
func (f *Fs) setMetaFormat(metaFormat string) error {
	switch metaFormat {
	case "none":
		f.useMeta = false
	case "simplejson":
		f.useMeta = true
	default:
		return fmt.Errorf("unsupported meta format '%s'", metaFormat)
	}
	return nil
}

// setHashType configures which hash sum chunker maintains for hash_type.
// It must be called *after* setMetaFormat, because every mode except
// "none" requires metadata.
//
// In the "All" mode chunker will force metadata on all files
// if the wrapped remote can't provide given hashsum (or computes it slowly).
424func (f *Fs) setHashType(hashType string) error { 425 f.useMD5 = false 426 f.useSHA1 = false 427 f.hashFallback = false 428 f.hashAll = false 429 requireMetaHash := true 430 431 switch hashType { 432 case "none": 433 requireMetaHash = false 434 case "md5": 435 f.useMD5 = true 436 case "sha1": 437 f.useSHA1 = true 438 case "md5quick": 439 f.useMD5 = true 440 f.hashFallback = true 441 case "sha1quick": 442 f.useSHA1 = true 443 f.hashFallback = true 444 case "md5all": 445 f.useMD5 = true 446 f.hashAll = !f.base.Hashes().Contains(hash.MD5) || f.base.Features().SlowHash 447 case "sha1all": 448 f.useSHA1 = true 449 f.hashAll = !f.base.Hashes().Contains(hash.SHA1) || f.base.Features().SlowHash 450 default: 451 return fmt.Errorf("unsupported hash type '%s'", hashType) 452 } 453 if requireMetaHash && !f.useMeta { 454 return fmt.Errorf("hash type '%s' requires compatible meta format", hashType) 455 } 456 return nil 457} 458 459func (f *Fs) setTransactionMode(transactionMode string) error { 460 switch transactionMode { 461 case "rename": 462 f.useNoRename = false 463 case "norename": 464 if !f.useMeta { 465 return errors.New("incompatible transaction options") 466 } 467 f.useNoRename = true 468 case "auto": 469 f.useNoRename = !f.CanQuickRename() 470 if f.useNoRename && !f.useMeta { 471 f.useNoRename = false 472 return errors.New("using norename transactions requires metadata") 473 } 474 default: 475 return fmt.Errorf("unsupported transaction mode '%s'", transactionMode) 476 } 477 return nil 478} 479 480// setChunkNameFormat converts pattern based chunk name format 481// into Printf format and Regular expressions for data and 482// control chunks. 
483func (f *Fs) setChunkNameFormat(pattern string) error { 484 // validate pattern 485 if strings.Count(pattern, "*") != 1 { 486 return errors.New("pattern must have exactly one asterisk (*)") 487 } 488 numDigits := strings.Count(pattern, "#") 489 if numDigits < 1 { 490 return errors.New("pattern must have a hash character (#)") 491 } 492 if strings.Index(pattern, "*") > strings.Index(pattern, "#") { 493 return errors.New("asterisk (*) in pattern must come before hashes (#)") 494 } 495 if ok, _ := regexp.MatchString("^[^#]*[#]+[^#]*$", pattern); !ok { 496 return errors.New("hashes (#) in pattern must be consecutive") 497 } 498 if dir, _ := path.Split(pattern); dir != "" { 499 return errors.New("directory separator prohibited") 500 } 501 if pattern[0] != '*' { 502 return errors.New("pattern must start with asterisk") // to be lifted later 503 } 504 505 // craft a unified regular expression for all types of chunks 506 reHashes := regexp.MustCompile("[#]+") 507 reDigits := "[0-9]+" 508 if numDigits > 1 { 509 reDigits = fmt.Sprintf("[0-9]{%d,}", numDigits) 510 } 511 reDataOrCtrl := fmt.Sprintf("(?:(%s)|_(%s))", reDigits, ctrlTypeRegStr) 512 513 // this must be non-greedy or else it could eat up temporary suffix 514 const mainNameRegStr = "(.+?)" 515 516 strRegex := regexp.QuoteMeta(pattern) 517 strRegex = reHashes.ReplaceAllLiteralString(strRegex, reDataOrCtrl) 518 strRegex = strings.Replace(strRegex, "\\*", mainNameRegStr, -1) 519 strRegex = fmt.Sprintf("^%s(?:%s|%s)?$", strRegex, tempSuffixRegStr, tempSuffixRegOld) 520 f.nameRegexp = regexp.MustCompile(strRegex) 521 522 // craft printf formats for active data/control chunks 523 fmtDigits := "%d" 524 if numDigits > 1 { 525 fmtDigits = fmt.Sprintf("%%0%dd", numDigits) 526 } 527 strFmt := strings.Replace(pattern, "%", "%%", -1) 528 strFmt = strings.Replace(strFmt, "*", "%s", 1) 529 f.dataNameFmt = reHashes.ReplaceAllLiteralString(strFmt, fmtDigits) 530 f.ctrlNameFmt = reHashes.ReplaceAllLiteralString(strFmt, "_%s") 531 
return nil 532} 533 534// makeChunkName produces chunk name (or path) for a given file. 535// 536// filePath can be name, relative or absolute path of main file. 537// 538// chunkNo must be a zero based index of data chunk. 539// Negative chunkNo e.g. -1 indicates a control chunk. 540// ctrlType is type of control chunk (must be valid). 541// ctrlType must be "" for data chunks. 542// 543// xactID is a transaction identifier. Empty xactID denotes active chunk, 544// otherwise temporary chunk name is produced. 545// 546func (f *Fs) makeChunkName(filePath string, chunkNo int, ctrlType, xactID string) string { 547 dir, parentName := path.Split(filePath) 548 var name, tempSuffix string 549 switch { 550 case chunkNo >= 0 && ctrlType == "": 551 name = fmt.Sprintf(f.dataNameFmt, parentName, chunkNo+f.opt.StartFrom) 552 case chunkNo < 0 && ctrlTypeRegexp.MatchString(ctrlType): 553 name = fmt.Sprintf(f.ctrlNameFmt, parentName, ctrlType) 554 default: 555 panic("makeChunkName: invalid argument") // must not produce something we can't consume 556 } 557 if xactID != "" { 558 tempSuffix = fmt.Sprintf(tempSuffixFormat, xactID) 559 if !tempSuffixRegexp.MatchString(tempSuffix) { 560 panic("makeChunkName: invalid argument") 561 } 562 } 563 return dir + name + tempSuffix 564} 565 566// parseChunkName checks whether given file path belongs to 567// a chunk and extracts chunk name parts. 568// 569// filePath can be name, relative or absolute path of a file. 570// 571// Returned parentPath is path of the composite file owning the chunk. 572// It's a non-empty string if valid chunk name is detected 573// or "" if it's not a chunk. 
574// Other returned values depend on detected chunk type: 575// data or control, active or temporary: 576// 577// data chunk - the returned chunkNo is non-negative and ctrlType is "" 578// control chunk - the chunkNo is -1 and ctrlType is a non-empty string 579// active chunk - the returned xactID is "" 580// temporary chunk - the xactID is a non-empty string 581func (f *Fs) parseChunkName(filePath string) (parentPath string, chunkNo int, ctrlType, xactID string) { 582 dir, name := path.Split(filePath) 583 match := f.nameRegexp.FindStringSubmatch(name) 584 if match == nil || match[1] == "" { 585 return "", -1, "", "" 586 } 587 var err error 588 589 chunkNo = -1 590 if match[2] != "" { 591 if chunkNo, err = strconv.Atoi(match[2]); err != nil { 592 chunkNo = -1 593 } 594 if chunkNo -= f.opt.StartFrom; chunkNo < 0 { 595 fs.Infof(f, "invalid data chunk number in file %q", name) 596 return "", -1, "", "" 597 } 598 } 599 600 if match[4] != "" { 601 xactID = match[4] 602 } 603 if match[5] != "" { 604 // old-style temporary suffix 605 number, err := strconv.ParseInt(match[5], 10, 64) 606 if err != nil || number < 0 { 607 fs.Infof(f, "invalid old-style transaction number in file %q", name) 608 return "", -1, "", "" 609 } 610 // convert old-style transaction number to base-36 transaction ID 611 xactID = fmt.Sprintf(tempSuffixFormat, strconv.FormatInt(number, 36)) 612 xactID = xactID[1:] // strip leading underscore 613 } 614 615 parentPath = dir + match[1] 616 ctrlType = match[3] 617 return 618} 619 620// forbidChunk prints error message or raises error if file is chunk. 621// First argument sets log prefix, use `false` to suppress message. 
622func (f *Fs) forbidChunk(o interface{}, filePath string) error { 623 if parentPath, _, _, _ := f.parseChunkName(filePath); parentPath != "" { 624 if f.opt.FailHard { 625 return fmt.Errorf("chunk overlap with %q", parentPath) 626 } 627 if boolVal, isBool := o.(bool); !isBool || boolVal { 628 fs.Errorf(o, "chunk overlap with %q", parentPath) 629 } 630 } 631 return nil 632} 633 634// newXactID produces a sufficiently random transaction identifier. 635// 636// The temporary suffix mask allows identifiers consisting of 4-9 637// base-36 digits (ie. digits 0-9 or lowercase letters a-z). 638// The identifiers must be unique between transactions running on 639// the single file in parallel. 640// 641// Currently the function produces 6-character identifiers. 642// Together with underscore this makes a 7-character temporary suffix. 643// 644// The first 4 characters isolate groups of transactions by time intervals. 645// The maximum length of interval is base-36 "zzzz" ie. 1,679,615 seconds. 646// The function rather takes a maximum prime closest to this number 647// (see https://primes.utm.edu) as the interval length to better safeguard 648// against repeating pseudo-random sequences in cases when rclone is 649// invoked from a periodic scheduler like unix cron. 650// Thus, the interval is slightly more than 19 days 10 hours 33 minutes. 651// 652// The remaining 2 base-36 digits (in the range from 0 to 1295 inclusive) 653// are taken from the local random source. 654// This provides about 0.1% collision probability for two parallel 655// operations started at the same second and working on the same file. 656// 657// Non-empty filePath argument enables probing for existing temporary chunk 658// to further eliminate collisions. 
659func (f *Fs) newXactID(ctx context.Context, filePath string) (xactID string, err error) { 660 const closestPrimeZzzzSeconds = 1679609 661 const maxTwoBase36Digits = 1295 662 663 unixSec := time.Now().Unix() 664 if unixSec < 0 { 665 unixSec = -unixSec // unlikely but the number must be positive 666 } 667 circleSec := unixSec % closestPrimeZzzzSeconds 668 first4chars := strconv.FormatInt(circleSec, 36) 669 670 for tries := 0; tries < maxTransactionProbes; tries++ { 671 f.xactIDMutex.Lock() 672 randomness := f.xactIDRand.Int63n(maxTwoBase36Digits + 1) 673 f.xactIDMutex.Unlock() 674 675 last2chars := strconv.FormatInt(randomness, 36) 676 xactID = fmt.Sprintf("%04s%02s", first4chars, last2chars) 677 678 if filePath == "" { 679 return 680 } 681 probeChunk := f.makeChunkName(filePath, 0, "", xactID) 682 _, probeErr := f.base.NewObject(ctx, probeChunk) 683 if probeErr != nil { 684 return 685 } 686 } 687 688 return "", fmt.Errorf("can't setup transaction for %s", filePath) 689} 690 691// List the objects and directories in dir into entries. 692// The entries can be returned in any order but should be 693// for a complete directory. 694// 695// dir should be "" to list the root, and should not have 696// trailing slashes. 697// 698// This should return ErrDirNotFound if the directory isn't found. 699// 700// Commands normally cleanup all temporary chunks in case of a failure. 701// However, if rclone dies unexpectedly, it can leave behind a bunch of 702// hidden temporary chunks. List and its underlying chunkEntries() 703// silently skip all temporary chunks in the directory. It's okay if 704// they belong to an unfinished command running in parallel. 705// 706// However, there is no way to discover dead temporary chunks atm. 707// As a workaround users can use `purge` to forcibly remove the whole 708// directory together with dead chunks. 709// In future a flag named like `--chunker-list-hidden` may be added to 710// rclone that will tell List to reveal hidden chunks. 
711// 712func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) { 713 entries, err = f.base.List(ctx, dir) 714 if err != nil { 715 return nil, err 716 } 717 return f.processEntries(ctx, entries, dir) 718} 719 720// ListR lists the objects and directories of the Fs starting 721// from dir recursively into out. 722// 723// dir should be "" to start from the root, and should not 724// have trailing slashes. 725// 726// This should return ErrDirNotFound if the directory isn't 727// found. 728// 729// It should call callback for each tranche of entries read. 730// These need not be returned in any particular order. If 731// callback returns an error then the listing will stop 732// immediately. 733// 734// Don't implement this unless you have a more efficient way 735// of listing recursively than doing a directory traversal. 736func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) { 737 do := f.base.Features().ListR 738 return do(ctx, dir, func(entries fs.DirEntries) error { 739 newEntries, err := f.processEntries(ctx, entries, dir) 740 if err != nil { 741 return err 742 } 743 return callback(newEntries) 744 }) 745} 746 747// processEntries assembles chunk entries into composite entries 748func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirPath string) (newEntries fs.DirEntries, err error) { 749 var sortedEntries fs.DirEntries 750 if f.dirSort { 751 // sort entries so that meta objects go before their chunks 752 sortedEntries = make(fs.DirEntries, len(origEntries)) 753 copy(sortedEntries, origEntries) 754 sort.Sort(sortedEntries) 755 } else { 756 sortedEntries = origEntries 757 } 758 759 byRemote := make(map[string]*Object) 760 badEntry := make(map[string]bool) 761 isSubdir := make(map[string]bool) 762 txnByRemote := map[string]string{} 763 764 var tempEntries fs.DirEntries 765 for _, dirOrObject := range sortedEntries { 766 switch entry := dirOrObject.(type) { 767 case 
fs.Object: 768 remote := entry.Remote() 769 mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote) 770 if mainRemote == "" { 771 // this is meta object or standalone file 772 object := f.newObject("", entry, nil) 773 byRemote[remote] = object 774 tempEntries = append(tempEntries, object) 775 if f.useNoRename { 776 txnByRemote[remote], err = object.readXactID(ctx) 777 if err != nil { 778 return nil, err 779 } 780 } 781 break 782 } 783 // this is some kind of chunk 784 // metobject should have been created above if present 785 mainObject := byRemote[mainRemote] 786 isSpecial := xactID != txnByRemote[mainRemote] || ctrlType != "" 787 if mainObject == nil && f.useMeta && !isSpecial { 788 fs.Debugf(f, "skip orphan data chunk %q", remote) 789 break 790 } 791 if mainObject == nil && !f.useMeta { 792 // this is the "nometa" case 793 // create dummy chunked object without metadata 794 mainObject = f.newObject(mainRemote, nil, nil) 795 byRemote[mainRemote] = mainObject 796 if !badEntry[mainRemote] { 797 tempEntries = append(tempEntries, mainObject) 798 } 799 } 800 if isSpecial { 801 if revealHidden { 802 fs.Infof(f, "ignore non-data chunk %q", remote) 803 } 804 // need to read metadata to ensure actual object type 805 // no need to read if metaobject is too big or absent, 806 // use the fact that before calling validate() 807 // the `size` field caches metaobject size, if any 808 if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize { 809 mainObject.unsure = true 810 } 811 break 812 } 813 if err := mainObject.addChunk(entry, chunkNo); err != nil { 814 if f.opt.FailHard { 815 return nil, err 816 } 817 badEntry[mainRemote] = true 818 } 819 case fs.Directory: 820 isSubdir[entry.Remote()] = true 821 wrapDir := fs.NewDirCopy(ctx, entry) 822 wrapDir.SetRemote(entry.Remote()) 823 tempEntries = append(tempEntries, wrapDir) 824 default: 825 if f.opt.FailHard { 826 return nil, fmt.Errorf("unknown object type %T", entry) 827 } 828 fs.Debugf(f, "unknown 
object type %T", entry) 829 } 830 } 831 832 for _, entry := range tempEntries { 833 if object, ok := entry.(*Object); ok { 834 remote := object.Remote() 835 if isSubdir[remote] { 836 if f.opt.FailHard { 837 return nil, fmt.Errorf("%q is both meta object and directory", remote) 838 } 839 badEntry[remote] = true // fall thru 840 } 841 if badEntry[remote] { 842 fs.Debugf(f, "invalid directory entry %q", remote) 843 continue 844 } 845 if err := object.validate(); err != nil { 846 if f.opt.FailHard { 847 return nil, err 848 } 849 fs.Debugf(f, "invalid chunks in object %q", remote) 850 continue 851 } 852 } 853 newEntries = append(newEntries, entry) 854 } 855 856 if f.dirSort { 857 sort.Sort(newEntries) 858 } 859 return newEntries, nil 860} 861 862// NewObject finds the Object at remote. 863// 864// Please note that every NewObject invocation will scan the whole directory. 865// Using here something like fs.DirCache might improve performance 866// (yet making the logic more complex). 867// 868// Note that chunker prefers analyzing file names rather than reading 869// the content of meta object assuming that directory scans are fast 870// but opening even a small file can be slow on some backends. 871// 872func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) { 873 return f.scanObject(ctx, remote, false) 874} 875 876// scanObject is like NewObject with optional quick scan mode. 877// The quick mode avoids directory requests other than `List`, 878// ignores non-chunked objects and skips chunk size checks. 
func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.Object, error) {
	// A remote that parses as a chunk name is an error only with fail_hard;
	// the false argument keeps forbidChunk from logging otherwise.
	if err := f.forbidChunk(false, remote); err != nil {
		return nil, errors.Wrap(err, "can't access")
	}

	var (
		o             *Object
		baseObj       fs.Object
		currentXactID string
		err           error
		sameMain      bool
	)

	if f.useMeta {
		// With metadata enabled the object at remote must exist:
		// it is either a meta object or a standalone file.
		baseObj, err = f.base.NewObject(ctx, remote)
		if err != nil {
			return nil, err
		}
		// NOTE(review): presumably adopts the exact name reported by the
		// wrapped remote (e.g. case differences) - confirm.
		remote = baseObj.Remote()

		// Chunker's meta object cannot be large and maxMetadataSize acts
		// as a hard limit. Anything larger than that is treated as a
		// non-chunked file without even checking its contents, so it's
		// paramount to prevent metadata from exceeding the maximum size.
		// Anything smaller is additionally checked for format.
		o = f.newObject("", baseObj, nil)
		if o.size > maxMetadataSize {
			return o, nil
		}
	} else {
		// Metadata is disabled, hence this is either a multi-chunk
		// composite file without meta object or a non-chunked file.
		// Create an empty wrapper here, scan directory to determine
		// which case it is and postpone reading if it's the latter one.
		o = f.newObject(remote, nil, nil)
	}

	// If the object is small, it's probably a meta object.
	// However, composite file must have data chunks besides it.
	// Scan directory for possible data chunks now and decide later on.
	dir := path.Dir(strings.TrimRight(remote, "/"))
	if dir == "." {
		dir = ""
	}
	entries, err := f.base.List(ctx, dir)
	switch err {
	case nil:
		// OK, fall thru
	case fs.ErrorDirNotFound:
		// a missing directory simply holds no chunks
		entries = nil
	default:
		return nil, errors.Wrap(err, "can't detect composite file")
	}

	// With norename transactions only chunks carrying the transaction ID
	// recorded in the metadata are active data chunks.
	if f.useNoRename {
		currentXactID, err = o.readXactID(ctx)
		if err != nil {
			return nil, err
		}
	}
	caseInsensitive := f.features.CaseInsensitive

	for _, dirOrObject := range entries {
		entry, ok := dirOrObject.(fs.Object)
		if !ok {
			continue
		}
		entryRemote := entry.Remote()
		if !caseInsensitive && !strings.Contains(entryRemote, remote) {
			continue // bypass regexp to save cpu
		}
		mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
		if mainRemote == "" {
			continue // skip non-chunks
		}
		if caseInsensitive {
			sameMain = strings.EqualFold(mainRemote, remote)
		} else {
			sameMain = mainRemote == remote
		}
		if !sameMain {
			continue // skip alien chunks
		}
		if ctrlType != "" || xactID != currentXactID {
			if f.useMeta {
				// temporary/control chunk calls for lazy metadata read
				o.unsure = true
			}
			continue
		}
		//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
		if err := o.addChunk(entry, chunkNo); err != nil {
			return nil, err
		}
	}

	if o.main == nil && (o.chunks == nil || len(o.chunks) == 0) {
		// Scanning hasn't found data chunks with conforming names.
		if f.useMeta || quickScan {
			// Metadata is required but absent and there are no chunks.
			return nil, fs.ErrorObjectNotFound
		}

		// Data chunks are not found and metadata is disabled.
		// Thus, we are in the "latter case" from above.
		// Let's try the postponed reading of a non-chunked file and add it
		// as a single chunk to the empty composite wrapper created above
		// with nil metadata.
		baseObj, err = f.base.NewObject(ctx, remote)
		if err == nil {
			err = o.addChunk(baseObj, 0)
		}
		if err != nil {
			return nil, err
		}
	}

	// This is either a composite object with metadata or a non-chunked
	// file without metadata. Validate it and update the total data size.
	// As an optimization, skip metadata reading here - we will call
	// readMetadata lazily when needed (reading can be expensive).
	if !quickScan {
		if err := o.validate(); err != nil {
			return nil, err
		}
	}
	return o, nil
}

// readMetadata reads composite object metadata and caches results,
// in case of critical errors metadata is not cached.
// Returns ErrMetaUnknown if an unsupported metadata format is detected.
// If object is not chunked but marked by List or NewObject for recheck,
// readMetadata will attempt to parse object as composite with fallback
// to non-chunked representation if the attempt fails.
func (o *Object) readMetadata(ctx context.Context) error {
	// return quickly if metadata is absent or has been already cached
	if !o.f.useMeta {
		o.isFull = true
	}
	if o.isFull {
		return nil
	}
	if !o.isComposite() && !o.unsure {
		// this for sure is a non-chunked standalone file
		o.isFull = true
		return nil
	}

	// validate metadata
	metaObject := o.main
	if metaObject.Size() > maxMetadataSize {
		if o.unsure {
			// this is not metadata but a foreign object
			o.unsure = false
			o.chunks = nil  // make isComposite return false
			o.isFull = true // cache results
			return nil
		}
		return ErrMetaTooBig
	}

	// size is within limits, perform consistency checks
	reader, err := metaObject.Open(ctx)
	if err != nil {
		return err
	}
	metadata, err := ioutil.ReadAll(reader)
	_ = reader.Close() // ensure file handle is freed on windows
	if err != nil {
		return err
1050 } 1051 1052 switch o.f.opt.MetaFormat { 1053 case "simplejson": 1054 metaInfo, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, metadata) 1055 if o.unsure { 1056 o.unsure = false 1057 if !madeByChunker { 1058 // this is not metadata but a foreign object 1059 o.chunks = nil // make isComposite return false 1060 o.isFull = true // cache results 1061 return nil 1062 } 1063 } 1064 switch err { 1065 case nil: 1066 // fall thru 1067 case ErrMetaTooBig, ErrMetaUnknown: 1068 return err // return these errors unwrapped for unit tests 1069 default: 1070 return errors.Wrap(err, "invalid metadata") 1071 } 1072 if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks { 1073 return errors.New("metadata doesn't match file size") 1074 } 1075 o.md5 = metaInfo.md5 1076 o.sha1 = metaInfo.sha1 1077 o.xactID = metaInfo.xactID 1078 } 1079 1080 o.isFull = true // cache results 1081 o.xIDCached = true 1082 return nil 1083} 1084 1085// readXactID returns the transaction ID stored in the passed metadata object 1086func (o *Object) readXactID(ctx context.Context) (xactID string, err error) { 1087 // if xactID has already been read and cahced return it now 1088 if o.xIDCached { 1089 return o.xactID, nil 1090 } 1091 // Avoid reading metadata for backends that don't use xactID to identify permanent chunks 1092 if !o.f.useNoRename { 1093 return "", errors.New("readXactID requires norename transactions") 1094 } 1095 if o.main == nil { 1096 return "", errors.New("readXactID requires valid metaobject") 1097 } 1098 if o.main.Size() > maxMetadataSize { 1099 return "", nil // this was likely not a metadata object, return empty xactID but don't throw error 1100 } 1101 reader, err := o.main.Open(ctx) 1102 if err != nil { 1103 return "", err 1104 } 1105 data, err := ioutil.ReadAll(reader) 1106 _ = reader.Close() // ensure file handle is freed on windows 1107 if err != nil { 1108 return "", err 1109 } 1110 1111 switch o.f.opt.MetaFormat { 1112 case "simplejson": 1113 if len(data) > 
maxMetadataSizeWritten { 1114 return "", nil // this was likely not a metadata object, return empty xactID but don't throw error 1115 } 1116 var metadata metaSimpleJSON 1117 err = json.Unmarshal(data, &metadata) 1118 if err != nil { 1119 return "", nil // this was likely not a metadata object, return empty xactID but don't throw error 1120 } 1121 xactID = metadata.XactID 1122 } 1123 o.xactID = xactID 1124 o.xIDCached = true 1125 return xactID, nil 1126} 1127 1128// put implements Put, PutStream, PutUnchecked, Update 1129func (f *Fs) put( 1130 ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption, 1131 basePut putFn, action string, target fs.Object) (obj fs.Object, err error) { 1132 1133 // Perform consistency checks 1134 if err := f.forbidChunk(src, remote); err != nil { 1135 return nil, errors.Wrap(err, action+" refused") 1136 } 1137 if target == nil { 1138 // Get target object with a quick directory scan 1139 // skip metadata check if target object does not exist. 1140 // ignore not-chunked objects, skip chunk size checks. 
1141 if obj, err := f.scanObject(ctx, remote, true); err == nil { 1142 target = obj 1143 } 1144 } 1145 if target != nil { 1146 obj := target.(*Object) 1147 if err := obj.readMetadata(ctx); err == ErrMetaUnknown { 1148 // refuse to update a file of unsupported format 1149 return nil, errors.Wrap(err, "refusing to "+action) 1150 } 1151 } 1152 1153 // Prepare to upload 1154 c := f.newChunkingReader(src) 1155 wrapIn := c.wrapStream(ctx, in, src) 1156 1157 var metaObject fs.Object 1158 defer func() { 1159 if err != nil { 1160 c.rollback(ctx, metaObject) 1161 } 1162 }() 1163 1164 baseRemote := remote 1165 xactID, errXact := f.newXactID(ctx, baseRemote) 1166 if errXact != nil { 1167 return nil, errXact 1168 } 1169 1170 // Transfer chunks data 1171 for c.chunkNo = 0; !c.done; c.chunkNo++ { 1172 if c.chunkNo > maxSafeChunkNumber { 1173 return nil, ErrChunkOverflow 1174 } 1175 1176 tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactID) 1177 size := c.sizeLeft 1178 if size > c.chunkSize { 1179 size = c.chunkSize 1180 } 1181 savedReadCount := c.readCount 1182 1183 // If a single chunk is expected, avoid the extra rename operation 1184 chunkRemote := tempRemote 1185 if c.expectSingle && c.chunkNo == 0 && optimizeFirstChunk { 1186 chunkRemote = baseRemote 1187 } 1188 info := f.wrapInfo(src, chunkRemote, size) 1189 1190 // Refill chunkLimit and let basePut repeatedly call chunkingReader.Read() 1191 c.chunkLimit = c.chunkSize 1192 // TODO: handle range/limit options 1193 chunk, errChunk := basePut(ctx, wrapIn, info, options...) 1194 if errChunk != nil { 1195 return nil, errChunk 1196 } 1197 1198 if size > 0 && c.readCount == savedReadCount && c.expectSingle { 1199 // basePut returned success but didn't call chunkingReader's Read. 1200 // This is possible if wrapped remote has performed the put by hash 1201 // because chunker bridges Hash from source for non-chunked files. 1202 // Hence, force Read here to update accounting and hashsums. 
1203 if err := c.dummyRead(wrapIn, size); err != nil { 1204 return nil, err 1205 } 1206 } 1207 if c.sizeLeft == 0 && !c.done { 1208 // The file has been apparently put by hash, force completion. 1209 c.done = true 1210 } 1211 1212 // Expected a single chunk but more to come, so name it as usual. 1213 if !c.done && chunkRemote != tempRemote { 1214 fs.Infof(chunk, "Expected single chunk, got more") 1215 chunkMoved, errMove := f.baseMove(ctx, chunk, tempRemote, delFailed) 1216 if errMove != nil { 1217 silentlyRemove(ctx, chunk) 1218 return nil, errMove 1219 } 1220 chunk = chunkMoved 1221 } 1222 1223 // Wrapped remote may or may not have seen EOF from chunking reader, 1224 // e.g. the box multi-uploader reads exactly the chunk size specified 1225 // and skips the "EOF" read. Hence, switch to next limit here. 1226 if !(c.chunkLimit == 0 || c.chunkLimit == c.chunkSize || c.sizeTotal == -1 || c.done) { 1227 silentlyRemove(ctx, chunk) 1228 return nil, fmt.Errorf("destination ignored %d data bytes", c.chunkLimit) 1229 } 1230 c.chunkLimit = c.chunkSize 1231 1232 c.chunks = append(c.chunks, chunk) 1233 } 1234 1235 // Validate uploaded size 1236 if c.sizeTotal != -1 && c.readCount != c.sizeTotal { 1237 return nil, fmt.Errorf("incorrect upload size %d != %d", c.readCount, c.sizeTotal) 1238 } 1239 1240 // Check for input that looks like valid metadata 1241 needMeta := len(c.chunks) > 1 1242 if c.readCount <= maxMetadataSize && len(c.chunks) == 1 { 1243 _, madeByChunker, _ := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead) 1244 needMeta = madeByChunker 1245 } 1246 1247 // Finalize small object as non-chunked. 1248 // This can be bypassed, and single chunk with metadata will be 1249 // created if forced by consistent hashing or due to unsafe input. 
1250 if !needMeta && !f.hashAll && f.useMeta { 1251 // If previous object was chunked, remove its chunks 1252 f.removeOldChunks(ctx, baseRemote) 1253 1254 // Rename single data chunk in place 1255 chunk := c.chunks[0] 1256 if chunk.Remote() != baseRemote { 1257 chunkMoved, errMove := f.baseMove(ctx, chunk, baseRemote, delAlways) 1258 if errMove != nil { 1259 silentlyRemove(ctx, chunk) 1260 return nil, errMove 1261 } 1262 chunk = chunkMoved 1263 } 1264 1265 return f.newObject("", chunk, nil), nil 1266 } 1267 1268 // Validate total size of data chunks 1269 var sizeTotal int64 1270 for _, chunk := range c.chunks { 1271 sizeTotal += chunk.Size() 1272 } 1273 if sizeTotal != c.readCount { 1274 return nil, fmt.Errorf("incorrect chunks size %d != %d", sizeTotal, c.readCount) 1275 } 1276 1277 // If previous object was chunked, remove its chunks 1278 f.removeOldChunks(ctx, baseRemote) 1279 1280 if !f.useNoRename { 1281 // The transaction suffix will be removed for backends with quick rename operations 1282 for chunkNo, chunk := range c.chunks { 1283 chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "") 1284 chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed) 1285 if errMove != nil { 1286 return nil, errMove 1287 } 1288 c.chunks[chunkNo] = chunkMoved 1289 } 1290 xactID = "" 1291 } 1292 1293 if !f.useMeta { 1294 // Remove stale metadata, if any 1295 oldMeta, errOldMeta := f.base.NewObject(ctx, baseRemote) 1296 if errOldMeta == nil { 1297 silentlyRemove(ctx, oldMeta) 1298 } 1299 1300 o := f.newObject(baseRemote, nil, c.chunks) 1301 o.size = sizeTotal 1302 return o, nil 1303 } 1304 1305 // Update meta object 1306 var metadata []byte 1307 switch f.opt.MetaFormat { 1308 case "simplejson": 1309 c.updateHashes() 1310 metadata, err = marshalSimpleJSON(ctx, sizeTotal, len(c.chunks), c.md5, c.sha1, xactID) 1311 } 1312 if err == nil { 1313 metaInfo := f.wrapInfo(src, baseRemote, int64(len(metadata))) 1314 metaObject, err = basePut(ctx, bytes.NewReader(metadata), 
metaInfo) 1315 } 1316 if err != nil { 1317 return nil, err 1318 } 1319 1320 o := f.newObject("", metaObject, c.chunks) 1321 o.size = sizeTotal 1322 o.xactID = xactID 1323 return o, nil 1324} 1325 1326type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) 1327 1328type chunkingReader struct { 1329 baseReader io.Reader 1330 sizeTotal int64 1331 sizeLeft int64 1332 readCount int64 1333 chunkSize int64 1334 chunkLimit int64 1335 chunkNo int 1336 err error 1337 done bool 1338 chunks []fs.Object 1339 expectSingle bool 1340 smallHead []byte 1341 fs *Fs 1342 hasher gohash.Hash 1343 md5 string 1344 sha1 string 1345} 1346 1347func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader { 1348 c := &chunkingReader{ 1349 fs: f, 1350 chunkSize: int64(f.opt.ChunkSize), 1351 sizeTotal: src.Size(), 1352 } 1353 c.chunkLimit = c.chunkSize 1354 c.sizeLeft = c.sizeTotal 1355 c.expectSingle = c.sizeTotal >= 0 && c.sizeTotal <= c.chunkSize 1356 return c 1357} 1358 1359func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader { 1360 baseIn, wrapBack := accounting.UnWrap(in) 1361 1362 switch { 1363 case c.fs.useMD5: 1364 srcObj := fs.UnWrapObjectInfo(src) 1365 if srcObj != nil && srcObj.Fs().Features().SlowHash { 1366 fs.Debugf(src, "skip slow MD5 on source file, hashing in-transit") 1367 c.hasher = md5.New() 1368 break 1369 } 1370 if c.md5, _ = src.Hash(ctx, hash.MD5); c.md5 == "" { 1371 if c.fs.hashFallback { 1372 c.sha1, _ = src.Hash(ctx, hash.SHA1) 1373 } else { 1374 c.hasher = md5.New() 1375 } 1376 } 1377 case c.fs.useSHA1: 1378 srcObj := fs.UnWrapObjectInfo(src) 1379 if srcObj != nil && srcObj.Fs().Features().SlowHash { 1380 fs.Debugf(src, "skip slow SHA1 on source file, hashing in-transit") 1381 c.hasher = sha1.New() 1382 break 1383 } 1384 if c.sha1, _ = src.Hash(ctx, hash.SHA1); c.sha1 == "" { 1385 if c.fs.hashFallback { 1386 c.md5, _ = src.Hash(ctx, hash.MD5) 1387 } else { 
1388 c.hasher = sha1.New() 1389 } 1390 } 1391 } 1392 1393 if c.hasher != nil { 1394 baseIn = io.TeeReader(baseIn, c.hasher) 1395 } 1396 c.baseReader = baseIn 1397 return wrapBack(c) 1398} 1399 1400func (c *chunkingReader) updateHashes() { 1401 if c.hasher == nil { 1402 return 1403 } 1404 switch { 1405 case c.fs.useMD5: 1406 c.md5 = hex.EncodeToString(c.hasher.Sum(nil)) 1407 case c.fs.useSHA1: 1408 c.sha1 = hex.EncodeToString(c.hasher.Sum(nil)) 1409 } 1410} 1411 1412// Note: Read is not called if wrapped remote performs put by hash. 1413func (c *chunkingReader) Read(buf []byte) (bytesRead int, err error) { 1414 if c.chunkLimit <= 0 { 1415 // Chunk complete - switch to next one. 1416 // Note #1: 1417 // We might not get here because some remotes (e.g. box multi-uploader) 1418 // read the specified size exactly and skip the concluding EOF Read. 1419 // Then a check in the put loop will kick in. 1420 // Note #2: 1421 // The crypt backend after receiving EOF here will call Read again 1422 // and we must insist on returning EOF, so we postpone refilling 1423 // chunkLimit to the main loop. 1424 return 0, io.EOF 1425 } 1426 if int64(len(buf)) > c.chunkLimit { 1427 buf = buf[0:c.chunkLimit] 1428 } 1429 bytesRead, err = c.baseReader.Read(buf) 1430 if err != nil && err != io.EOF { 1431 c.err = err 1432 c.done = true 1433 return 1434 } 1435 c.accountBytes(int64(bytesRead)) 1436 if c.chunkNo == 0 && c.expectSingle && bytesRead > 0 && c.readCount <= maxMetadataSize { 1437 c.smallHead = append(c.smallHead, buf[:bytesRead]...) 1438 } 1439 if bytesRead == 0 && c.sizeLeft == 0 { 1440 err = io.EOF // Force EOF when no data left. 1441 } 1442 if err == io.EOF { 1443 c.done = true 1444 } 1445 return 1446} 1447 1448func (c *chunkingReader) accountBytes(bytesRead int64) { 1449 c.readCount += bytesRead 1450 c.chunkLimit -= bytesRead 1451 if c.sizeLeft != -1 { 1452 c.sizeLeft -= bytesRead 1453 } 1454} 1455 1456// dummyRead updates accounting, hashsums, etc. 
by simulating reads 1457func (c *chunkingReader) dummyRead(in io.Reader, size int64) error { 1458 if c.hasher == nil && c.readCount+size > maxMetadataSize { 1459 c.accountBytes(size) 1460 return nil 1461 } 1462 const bufLen = 1048576 // 1 MiB 1463 buf := make([]byte, bufLen) 1464 for size > 0 { 1465 n := size 1466 if n > bufLen { 1467 n = bufLen 1468 } 1469 if _, err := io.ReadFull(in, buf[0:n]); err != nil { 1470 return err 1471 } 1472 size -= n 1473 } 1474 return nil 1475} 1476 1477// rollback removes uploaded temporary chunks 1478func (c *chunkingReader) rollback(ctx context.Context, metaObject fs.Object) { 1479 if metaObject != nil { 1480 c.chunks = append(c.chunks, metaObject) 1481 } 1482 for _, chunk := range c.chunks { 1483 if err := chunk.Remove(ctx); err != nil { 1484 fs.Errorf(chunk, "Failed to remove temporary chunk: %v", err) 1485 } 1486 } 1487} 1488 1489func (f *Fs) removeOldChunks(ctx context.Context, remote string) { 1490 oldFsObject, err := f.NewObject(ctx, remote) 1491 if err == nil { 1492 oldObject := oldFsObject.(*Object) 1493 for _, chunk := range oldObject.chunks { 1494 if err := chunk.Remove(ctx); err != nil { 1495 fs.Errorf(chunk, "Failed to remove old chunk: %v", err) 1496 } 1497 } 1498 } 1499} 1500 1501// Put into the remote path with the given modTime and size. 
1502// 1503// May create the object even if it returns an error - if so 1504// will return the object and the error, otherwise will return 1505// nil and the error 1506func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { 1507 return f.put(ctx, in, src, src.Remote(), options, f.base.Put, "put", nil) 1508} 1509 1510// PutStream uploads to the remote path with the modTime given of indeterminate size 1511func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { 1512 return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream, "upload", nil) 1513} 1514 1515// Update in to the object with the modTime given of the given size 1516func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error { 1517 basePut := o.f.base.Put 1518 if src.Size() < 0 { 1519 basePut = o.f.base.Features().PutStream 1520 if basePut == nil { 1521 return errors.New("wrapped file system does not support streaming uploads") 1522 } 1523 } 1524 oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut, "update", o) 1525 if err == nil { 1526 *o = *oNew.(*Object) 1527 } 1528 return err 1529} 1530 1531// PutUnchecked uploads the object 1532// 1533// This will create a duplicate if we upload a new file without 1534// checking to see if there is one already - use Put() for that. 1535func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) { 1536 do := f.base.Features().PutUnchecked 1537 if do == nil { 1538 return nil, errors.New("can't PutUnchecked") 1539 } 1540 // TODO: handle range/limit options and really chunk stream here! 1541 o, err := do(ctx, in, f.wrapInfo(src, "", -1)) 1542 if err != nil { 1543 return nil, err 1544 } 1545 return f.newObject("", o, nil), nil 1546} 1547 1548// Hashes returns the supported hash sets. 
1549// Chunker advertises a hash type if and only if it can be calculated 1550// for files of any size, non-chunked or composite. 1551func (f *Fs) Hashes() hash.Set { 1552 // composites AND no fallback AND (chunker OR wrapped Fs will hash all non-chunked's) 1553 if f.useMD5 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.MD5)) { 1554 return hash.NewHashSet(hash.MD5) 1555 } 1556 if f.useSHA1 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.SHA1)) { 1557 return hash.NewHashSet(hash.SHA1) 1558 } 1559 return hash.NewHashSet() // can't provide strong guarantees 1560} 1561 1562// Mkdir makes the directory (container, bucket) 1563// 1564// Shouldn't return an error if it already exists 1565func (f *Fs) Mkdir(ctx context.Context, dir string) error { 1566 if err := f.forbidChunk(dir, dir); err != nil { 1567 return errors.Wrap(err, "can't mkdir") 1568 } 1569 return f.base.Mkdir(ctx, dir) 1570} 1571 1572// Rmdir removes the directory (container, bucket) if empty 1573// 1574// Return an error if it doesn't exist or isn't empty 1575func (f *Fs) Rmdir(ctx context.Context, dir string) error { 1576 return f.base.Rmdir(ctx, dir) 1577} 1578 1579// Purge all files in the directory 1580// 1581// Implement this if you have a way of deleting all the files 1582// quicker than just running Remove() on the result of List() 1583// 1584// Return an error if it doesn't exist. 1585// 1586// This command will chain to `purge` from wrapped remote. 1587// As a result it removes not only composite chunker files with their 1588// active chunks but also all hidden temporary chunks in the directory. 1589// 1590func (f *Fs) Purge(ctx context.Context, dir string) error { 1591 do := f.base.Features().Purge 1592 if do == nil { 1593 return fs.ErrorCantPurge 1594 } 1595 return do(ctx, dir) 1596} 1597 1598// Remove an object (chunks and metadata, if any) 1599// 1600// Remove deletes only active chunks of the composite object. 
1601// It does not try to look for temporary chunks because they could belong 1602// to another command modifying this composite file in parallel. 1603// 1604// Commands normally cleanup all temporary chunks in case of a failure. 1605// However, if rclone dies unexpectedly, it can leave hidden temporary 1606// chunks, which cannot be discovered using the `list` command. 1607// Remove does not try to search for such chunks or to delete them. 1608// Sometimes this can lead to strange results e.g. when `list` shows that 1609// directory is empty but `rmdir` refuses to remove it because on the 1610// level of wrapped remote it's actually *not* empty. 1611// As a workaround users can use `purge` to forcibly remove it. 1612// 1613// In future, a flag `--chunker-delete-hidden` may be added which tells 1614// Remove to search directory for hidden chunks and remove them too 1615// (at the risk of breaking parallel commands). 1616// 1617// Remove is the only operation allowed on the composite files with 1618// invalid or future metadata format. 1619// We don't let user copy/move/update unsupported composite files. 1620// Let's at least let her get rid of them, just complain loudly. 1621// 1622// This can litter directory with orphan chunks of unsupported types, 1623// but as long as we remove meta object, even future releases will 1624// treat the composite file as removed and refuse to act upon it. 1625// 1626// Disclaimer: corruption can still happen if unsupported file is removed 1627// and then recreated with the same name. 1628// Unsupported control chunks will get re-picked by a more recent 1629// rclone version with unexpected results. This can be helped by 1630// the `delete hidden` flag above or at least the user has been warned. 1631// 1632func (o *Object) Remove(ctx context.Context) (err error) { 1633 if err := o.f.forbidChunk(o, o.Remote()); err != nil { 1634 // operations.Move can still call Remove if chunker's Move refuses 1635 // to corrupt file in hard mode. 
Hence, refuse to Remove, too. 1636 return errors.Wrap(err, "refuse to corrupt") 1637 } 1638 if err := o.readMetadata(ctx); err == ErrMetaUnknown { 1639 // Proceed but warn user that unexpected things can happen. 1640 fs.Errorf(o, "Removing a file with unsupported metadata: %v", err) 1641 } 1642 1643 // Remove non-chunked file or meta object of a composite file. 1644 if o.main != nil { 1645 err = o.main.Remove(ctx) 1646 } 1647 1648 // Remove only active data chunks, ignore any temporary chunks that 1649 // might probably be created in parallel by other transactions. 1650 for _, chunk := range o.chunks { 1651 chunkErr := chunk.Remove(ctx) 1652 if err == nil { 1653 err = chunkErr 1654 } 1655 } 1656 1657 // There are no known control chunks to remove atm. 1658 return err 1659} 1660 1661// copyOrMove implements copy or move 1662func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMoveFn, md5, sha1, opName string) (fs.Object, error) { 1663 if err := f.forbidChunk(o, remote); err != nil { 1664 return nil, errors.Wrapf(err, "can't %s", opName) 1665 } 1666 if err := o.readMetadata(ctx); err != nil { 1667 // Refuse to copy/move composite files with invalid or future 1668 // metadata format which might involve unsupported chunk types. 1669 return nil, errors.Wrapf(err, "can't %s this file", opName) 1670 } 1671 if !o.isComposite() { 1672 fs.Debugf(o, "%s non-chunked object...", opName) 1673 oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk 1674 if err != nil { 1675 return nil, err 1676 } 1677 return f.newObject("", oResult, nil), nil 1678 } 1679 1680 fs.Debugf(o, "%s %d data chunks...", opName, len(o.chunks)) 1681 mainRemote := o.remote 1682 var newChunks []fs.Object 1683 var err error 1684 1685 // Copy/move active data chunks. 1686 // Ignore possible temporary chunks being created by parallel operations. 
1687 for _, chunk := range o.chunks { 1688 chunkRemote := chunk.Remote() 1689 if !strings.HasPrefix(chunkRemote, mainRemote) { 1690 err = fmt.Errorf("invalid chunk name %q", chunkRemote) 1691 break 1692 } 1693 chunkSuffix := chunkRemote[len(mainRemote):] 1694 chunkResult, err := do(ctx, chunk, remote+chunkSuffix) 1695 if err != nil { 1696 break 1697 } 1698 newChunks = append(newChunks, chunkResult) 1699 } 1700 1701 // Copy or move old metadata. 1702 // There are no known control chunks to move/copy atm. 1703 var metaObject fs.Object 1704 if err == nil && o.main != nil { 1705 metaObject, err = do(ctx, o.main, remote) 1706 } 1707 if err != nil { 1708 for _, chunk := range newChunks { 1709 silentlyRemove(ctx, chunk) 1710 } 1711 return nil, err 1712 } 1713 1714 // Create wrapping object, calculate and validate total size 1715 newObj := f.newObject(remote, metaObject, newChunks) 1716 err = newObj.validate() 1717 if err != nil { 1718 silentlyRemove(ctx, newObj) 1719 return nil, err 1720 } 1721 1722 // Update metadata 1723 var metadata []byte 1724 switch f.opt.MetaFormat { 1725 case "simplejson": 1726 metadata, err = marshalSimpleJSON(ctx, newObj.size, len(newChunks), md5, sha1, o.xactID) 1727 if err == nil { 1728 metaInfo := f.wrapInfo(metaObject, "", int64(len(metadata))) 1729 err = newObj.main.Update(ctx, bytes.NewReader(metadata), metaInfo) 1730 } 1731 case "none": 1732 if newObj.main != nil { 1733 err = newObj.main.Remove(ctx) 1734 } 1735 } 1736 1737 // Return the composite object 1738 if err != nil { 1739 silentlyRemove(ctx, newObj) 1740 return nil, err 1741 } 1742 return newObj, nil 1743} 1744 1745type copyMoveFn func(context.Context, fs.Object, string) (fs.Object, error) 1746 1747func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string) (obj *Object, md5, sha1 string, ok bool) { 1748 var diff string 1749 obj, ok = src.(*Object) 1750 1751 switch { 1752 case !ok: 1753 diff = "remote types" 1754 case !operations.SameConfig(f.base, obj.f.base): 
1755 diff = "wrapped remotes" 1756 case f.opt.ChunkSize != obj.f.opt.ChunkSize: 1757 diff = "chunk sizes" 1758 case f.opt.NameFormat != obj.f.opt.NameFormat: 1759 diff = "chunk name formats" 1760 case f.opt.StartFrom != obj.f.opt.StartFrom: 1761 diff = "chunk numbering" 1762 case f.opt.MetaFormat != obj.f.opt.MetaFormat: 1763 diff = "meta formats" 1764 } 1765 if diff != "" { 1766 fs.Debugf(src, "Can't %s - different %s", opName, diff) 1767 ok = false 1768 return 1769 } 1770 1771 if obj.unsure { 1772 // ensure object is composite if need to re-read metadata 1773 _ = obj.readMetadata(ctx) 1774 } 1775 requireMetaHash := obj.isComposite() && f.opt.MetaFormat == "simplejson" 1776 if !requireMetaHash && !f.hashAll { 1777 ok = true // hash is not required for metadata 1778 return 1779 } 1780 1781 switch { 1782 case f.useMD5: 1783 md5, _ = obj.Hash(ctx, hash.MD5) 1784 ok = md5 != "" 1785 if !ok && f.hashFallback { 1786 sha1, _ = obj.Hash(ctx, hash.SHA1) 1787 ok = sha1 != "" 1788 } 1789 case f.useSHA1: 1790 sha1, _ = obj.Hash(ctx, hash.SHA1) 1791 ok = sha1 != "" 1792 if !ok && f.hashFallback { 1793 md5, _ = obj.Hash(ctx, hash.MD5) 1794 ok = md5 != "" 1795 } 1796 default: 1797 ok = false 1798 } 1799 if !ok { 1800 fs.Debugf(src, "Can't %s - required hash not found", opName) 1801 } 1802 return 1803} 1804 1805// Copy src to this remote using server-side copy operations. 
1806// 1807// This is stored with the remote path given 1808// 1809// It returns the destination Object and a possible error 1810// 1811// Will only be called if src.Fs().Name() == f.Name() 1812// 1813// If it isn't possible then return fs.ErrorCantCopy 1814func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { 1815 baseCopy := f.base.Features().Copy 1816 if baseCopy == nil { 1817 return nil, fs.ErrorCantCopy 1818 } 1819 obj, md5, sha1, ok := f.okForServerSide(ctx, src, "copy") 1820 if !ok { 1821 return nil, fs.ErrorCantCopy 1822 } 1823 return f.copyOrMove(ctx, obj, remote, baseCopy, md5, sha1, "copy") 1824} 1825 1826// Move src to this remote using server-side move operations. 1827// 1828// This is stored with the remote path given 1829// 1830// It returns the destination Object and a possible error 1831// 1832// Will only be called if src.Fs().Name() == f.Name() 1833// 1834// If it isn't possible then return fs.ErrorCantMove 1835func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { 1836 baseMove := func(ctx context.Context, src fs.Object, remote string) (fs.Object, error) { 1837 return f.baseMove(ctx, src, remote, delNever) 1838 } 1839 obj, md5, sha1, ok := f.okForServerSide(ctx, src, "move") 1840 if !ok { 1841 return nil, fs.ErrorCantMove 1842 } 1843 return f.copyOrMove(ctx, obj, remote, baseMove, md5, sha1, "move") 1844} 1845 1846// baseMove chains to the wrapped Move or simulates it by Copy+Delete 1847func (f *Fs) baseMove(ctx context.Context, src fs.Object, remote string, delMode int) (fs.Object, error) { 1848 var ( 1849 dest fs.Object 1850 err error 1851 ) 1852 switch delMode { 1853 case delAlways: 1854 dest, err = f.base.NewObject(ctx, remote) 1855 case delFailed: 1856 dest, err = operations.Move(ctx, f.base, nil, remote, src) 1857 if err == nil { 1858 return dest, err 1859 } 1860 dest, err = f.base.NewObject(ctx, remote) 1861 case delNever: 1862 // fall thru, the default 1863 } 1864 if 
err != nil { 1865 dest = nil 1866 } 1867 return operations.Move(ctx, f.base, dest, remote, src) 1868} 1869 1870// DirMove moves src, srcRemote to this remote at dstRemote 1871// using server-side move operations. 1872// 1873// Will only be called if src.Fs().Name() == f.Name() 1874// 1875// If it isn't possible then return fs.ErrorCantDirMove 1876// 1877// If destination exists then return fs.ErrorDirExists 1878func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error { 1879 do := f.base.Features().DirMove 1880 if do == nil { 1881 return fs.ErrorCantDirMove 1882 } 1883 srcFs, ok := src.(*Fs) 1884 if !ok { 1885 fs.Debugf(srcFs, "Can't move directory - not same remote type") 1886 return fs.ErrorCantDirMove 1887 } 1888 return do(ctx, srcFs.base, srcRemote, dstRemote) 1889} 1890 1891// CleanUp the trash in the Fs 1892// 1893// Implement this if you have a way of emptying the trash or 1894// otherwise cleaning up old versions of files. 1895func (f *Fs) CleanUp(ctx context.Context) error { 1896 do := f.base.Features().CleanUp 1897 if do == nil { 1898 return errors.New("can't CleanUp") 1899 } 1900 return do(ctx) 1901} 1902 1903// About gets quota information from the Fs 1904func (f *Fs) About(ctx context.Context) (*fs.Usage, error) { 1905 do := f.base.Features().About 1906 if do == nil { 1907 return nil, errors.New("About not supported") 1908 } 1909 return do(ctx) 1910} 1911 1912// UnWrap returns the Fs that this Fs is wrapping 1913func (f *Fs) UnWrap() fs.Fs { 1914 return f.base 1915} 1916 1917// WrapFs returns the Fs that is wrapping this Fs 1918func (f *Fs) WrapFs() fs.Fs { 1919 return f.wrapper 1920} 1921 1922// SetWrapper sets the Fs that is wrapping this Fs 1923func (f *Fs) SetWrapper(wrapper fs.Fs) { 1924 f.wrapper = wrapper 1925} 1926 1927// ChangeNotify calls the passed function with a path 1928// that has had changes. If the implementation 1929// uses polling, it should adhere to the given interval. 
1930// 1931// Replace data chunk names by the name of composite file. 1932// Ignore temporary and control chunks. 1933func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) { 1934 do := f.base.Features().ChangeNotify 1935 if do == nil { 1936 return 1937 } 1938 wrappedNotifyFunc := func(path string, entryType fs.EntryType) { 1939 //fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType) 1940 if entryType == fs.EntryObject { 1941 mainPath, _, _, xactID := f.parseChunkName(path) 1942 metaXactID := "" 1943 if f.useNoRename { 1944 metaObject, _ := f.base.NewObject(ctx, mainPath) 1945 dummyObject := f.newObject("", metaObject, nil) 1946 metaXactID, _ = dummyObject.readXactID(ctx) 1947 } 1948 if mainPath != "" && xactID == metaXactID { 1949 path = mainPath 1950 } 1951 } 1952 notifyFunc(path, entryType) 1953 } 1954 do(ctx, wrappedNotifyFunc, pollIntervalChan) 1955} 1956 1957// Shutdown the backend, closing any background tasks and any 1958// cached connections. 
1959func (f *Fs) Shutdown(ctx context.Context) error { 1960 do := f.base.Features().Shutdown 1961 if do == nil { 1962 return nil 1963 } 1964 return do(ctx) 1965} 1966 1967// Object represents a composite file wrapping one or more data chunks 1968type Object struct { 1969 remote string 1970 main fs.Object // meta object if file is composite, or wrapped non-chunked file, nil if meta format is 'none' 1971 chunks []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none' 1972 size int64 // cached total size of chunks in a composite file or -1 for non-chunked files 1973 isFull bool // true if metadata has been read 1974 xIDCached bool // true if xactID has been read 1975 unsure bool // true if need to read metadata to detect object type 1976 xactID string // transaction ID for "norename" or empty string for "renamed" chunks 1977 md5 string 1978 sha1 string 1979 f *Fs 1980} 1981 1982func (o *Object) addChunk(chunk fs.Object, chunkNo int) error { 1983 if chunkNo < 0 { 1984 return fmt.Errorf("invalid chunk number %d", chunkNo+o.f.opt.StartFrom) 1985 } 1986 if chunkNo == len(o.chunks) { 1987 o.chunks = append(o.chunks, chunk) 1988 return nil 1989 } 1990 if chunkNo > maxSafeChunkNumber { 1991 return ErrChunkOverflow 1992 } 1993 if chunkNo > len(o.chunks) { 1994 newChunks := make([]fs.Object, (chunkNo + 1), (chunkNo+1)*2) 1995 copy(newChunks, o.chunks) 1996 o.chunks = newChunks 1997 } 1998 if o.chunks[chunkNo] != nil { 1999 return fmt.Errorf("duplicate chunk number %d", chunkNo+o.f.opt.StartFrom) 2000 } 2001 o.chunks[chunkNo] = chunk 2002 return nil 2003} 2004 2005// validate verifies the object internals and updates total size 2006func (o *Object) validate() error { 2007 if !o.isComposite() { 2008 _ = o.mainChunk() // verify that single wrapped chunk exists 2009 return nil 2010 } 2011 2012 metaObject := o.main // this file is composite - o.main refers to meta object (or nil if meta format is 'none') 2013 if metaObject 
!= nil && metaObject.Size() > maxMetadataSize { 2014 // metadata of a chunked file must be a tiny piece of json 2015 o.size = -1 2016 return fmt.Errorf("%q metadata is too large", o.remote) 2017 } 2018 2019 var totalSize int64 2020 for _, chunk := range o.chunks { 2021 if chunk == nil { 2022 o.size = -1 2023 return fmt.Errorf("%q has missing chunks", o) 2024 } 2025 totalSize += chunk.Size() 2026 } 2027 o.size = totalSize // cache up the total data size 2028 return nil 2029} 2030 2031func (f *Fs) newObject(remote string, main fs.Object, chunks []fs.Object) *Object { 2032 var size int64 = -1 2033 if main != nil { 2034 size = main.Size() 2035 if remote == "" { 2036 remote = main.Remote() 2037 } 2038 } 2039 return &Object{ 2040 remote: remote, 2041 main: main, 2042 size: size, 2043 f: f, 2044 chunks: chunks, 2045 } 2046} 2047 2048// mainChunk returns: 2049// - a wrapped object for non-chunked files 2050// - meta object for chunked files with metadata 2051// - first chunk for chunked files without metadata 2052// Never returns nil. 
2053func (o *Object) mainChunk() fs.Object { 2054 if o.main != nil { 2055 return o.main // meta object or non-chunked wrapped file 2056 } 2057 if o.chunks != nil { 2058 return o.chunks[0] // first chunk of a chunked composite file 2059 } 2060 panic("invalid chunked object") // very unlikely 2061} 2062 2063func (o *Object) isComposite() bool { 2064 return o.chunks != nil 2065} 2066 2067// Fs returns read only access to the Fs that this object is part of 2068func (o *Object) Fs() fs.Info { 2069 return o.f 2070} 2071 2072// Return a string version 2073func (o *Object) String() string { 2074 if o == nil { 2075 return "<nil>" 2076 } 2077 return o.remote 2078} 2079 2080// Remote returns the remote path 2081func (o *Object) Remote() string { 2082 return o.remote 2083} 2084 2085// Size returns the size of the file 2086func (o *Object) Size() int64 { 2087 if o.isComposite() { 2088 return o.size // total size of data chunks in a composite file 2089 } 2090 return o.mainChunk().Size() // size of wrapped non-chunked file 2091} 2092 2093// Storable returns whether object is storable 2094func (o *Object) Storable() bool { 2095 return o.mainChunk().Storable() 2096} 2097 2098// ModTime returns the modification time of the file 2099func (o *Object) ModTime(ctx context.Context) time.Time { 2100 return o.mainChunk().ModTime(ctx) 2101} 2102 2103// SetModTime sets the modification time of the file 2104func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error { 2105 if err := o.readMetadata(ctx); err != nil { 2106 return err // refuse to act on unsupported format 2107 } 2108 return o.mainChunk().SetModTime(ctx, mtime) 2109} 2110 2111// Hash returns the selected checksum of the file. 2112// If no checksum is available it returns "". 
2113// 2114// Hash won't fail with `unsupported` error but return empty 2115// hash string if a particular hashsum type is not supported 2116// 2117// Hash takes hashsum from metadata if available or requests it 2118// from wrapped remote for non-chunked files. 2119// Metadata (if meta format is not 'none') is by default kept 2120// only for composite files. In the "All" hashing mode chunker 2121// will force metadata on all files if particular hashsum type 2122// is not supported by wrapped remote. 2123// 2124// Note that Hash prefers the wrapped hashsum for non-chunked 2125// file, then tries to read it from metadata. This in theory 2126// handles the unusual case when a small file has been tampered 2127// on the level of wrapped remote but chunker is unaware of that. 2128// 2129func (o *Object) Hash(ctx context.Context, hashType hash.Type) (string, error) { 2130 if err := o.readMetadata(ctx); err != nil { 2131 return "", err // valid metadata is required to get hash, abort 2132 } 2133 if !o.isComposite() { 2134 // First, chain to the wrapped non-chunked file if possible. 2135 if value, err := o.mainChunk().Hash(ctx, hashType); err == nil && value != "" { 2136 return value, nil 2137 } 2138 } 2139 2140 // Try hash from metadata if the file is composite or if wrapped remote fails. 2141 switch hashType { 2142 case hash.MD5: 2143 if o.md5 == "" { 2144 return "", nil 2145 } 2146 return o.md5, nil 2147 case hash.SHA1: 2148 if o.sha1 == "" { 2149 return "", nil 2150 } 2151 return o.sha1, nil 2152 default: 2153 return "", hash.ErrUnsupported 2154 } 2155} 2156 2157// UnWrap returns the wrapped Object 2158func (o *Object) UnWrap() fs.Object { 2159 return o.mainChunk() 2160} 2161 2162// Open opens the file for read. 
Call Close() on the returned io.ReadCloser 2163func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) { 2164 if err := o.readMetadata(ctx); err != nil { 2165 // refuse to open unsupported format 2166 return nil, errors.Wrap(err, "can't open") 2167 } 2168 if !o.isComposite() { 2169 return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file 2170 } 2171 2172 var openOptions []fs.OpenOption 2173 var offset, limit int64 = 0, -1 2174 2175 for _, option := range options { 2176 switch opt := option.(type) { 2177 case *fs.SeekOption: 2178 offset = opt.Offset 2179 case *fs.RangeOption: 2180 offset, limit = opt.Decode(o.size) 2181 default: 2182 // pass Options on to the wrapped open, if appropriate 2183 openOptions = append(openOptions, option) 2184 } 2185 } 2186 2187 if offset < 0 { 2188 return nil, errors.New("invalid offset") 2189 } 2190 if limit < 0 { 2191 limit = o.size - offset 2192 } 2193 2194 return o.newLinearReader(ctx, offset, limit, openOptions) 2195} 2196 2197// linearReader opens and reads file chunks sequentially, without read-ahead 2198type linearReader struct { 2199 ctx context.Context 2200 chunks []fs.Object 2201 options []fs.OpenOption 2202 limit int64 2203 count int64 2204 pos int 2205 reader io.ReadCloser 2206 err error 2207} 2208 2209func (o *Object) newLinearReader(ctx context.Context, offset, limit int64, options []fs.OpenOption) (io.ReadCloser, error) { 2210 r := &linearReader{ 2211 ctx: ctx, 2212 chunks: o.chunks, 2213 options: options, 2214 limit: limit, 2215 } 2216 2217 // skip to chunk for given offset 2218 err := io.EOF 2219 for offset >= 0 && err != nil { 2220 offset, err = r.nextChunk(offset) 2221 } 2222 if err == nil || err == io.EOF { 2223 r.err = err 2224 return r, nil 2225 } 2226 return nil, err 2227} 2228 2229func (r *linearReader) nextChunk(offset int64) (int64, error) { 2230 if r.err != nil { 2231 return -1, r.err 2232 } 2233 if r.pos >= len(r.chunks) || r.limit <= 0 || 
offset < 0 { 2234 return -1, io.EOF 2235 } 2236 2237 chunk := r.chunks[r.pos] 2238 count := chunk.Size() 2239 r.pos++ 2240 2241 if offset >= count { 2242 return offset - count, io.EOF 2243 } 2244 count -= offset 2245 if r.limit < count { 2246 count = r.limit 2247 } 2248 options := append(r.options, &fs.RangeOption{Start: offset, End: offset + count - 1}) 2249 2250 if err := r.Close(); err != nil { 2251 return -1, err 2252 } 2253 2254 reader, err := chunk.Open(r.ctx, options...) 2255 if err != nil { 2256 return -1, err 2257 } 2258 2259 r.reader = reader 2260 r.count = count 2261 return offset, nil 2262} 2263 2264func (r *linearReader) Read(p []byte) (n int, err error) { 2265 if r.err != nil { 2266 return 0, r.err 2267 } 2268 if r.limit <= 0 { 2269 r.err = io.EOF 2270 return 0, io.EOF 2271 } 2272 2273 for r.count <= 0 { 2274 // current chunk has been read completely or its size is zero 2275 off, err := r.nextChunk(0) 2276 if off < 0 { 2277 r.err = err 2278 return 0, err 2279 } 2280 } 2281 2282 n, err = r.reader.Read(p) 2283 if err == nil || err == io.EOF { 2284 r.count -= int64(n) 2285 r.limit -= int64(n) 2286 if r.limit > 0 { 2287 err = nil // more data to read 2288 } 2289 } 2290 r.err = err 2291 return 2292} 2293 2294func (r *linearReader) Close() (err error) { 2295 if r.reader != nil { 2296 err = r.reader.Close() 2297 r.reader = nil 2298 } 2299 return 2300} 2301 2302// ObjectInfo describes a wrapped fs.ObjectInfo for being the source 2303type ObjectInfo struct { 2304 src fs.ObjectInfo 2305 fs *Fs 2306 nChunks int // number of data chunks 2307 xactID string // transaction ID for "norename" or empty string for "renamed" chunks 2308 size int64 // overrides source size by the total size of data chunks 2309 remote string // overrides remote name 2310 md5 string // overrides MD5 checksum 2311 sha1 string // overrides SHA1 checksum 2312} 2313 2314func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, totalSize int64) *ObjectInfo { 2315 return &ObjectInfo{ 2316 src: 
src, 2317 fs: f, 2318 size: totalSize, 2319 remote: newRemote, 2320 } 2321} 2322 2323// Fs returns read only access to the Fs that this object is part of 2324func (oi *ObjectInfo) Fs() fs.Info { 2325 if oi.fs == nil { 2326 panic("stub ObjectInfo") 2327 } 2328 return oi.fs 2329} 2330 2331// String returns string representation 2332func (oi *ObjectInfo) String() string { 2333 return oi.src.String() 2334} 2335 2336// Storable returns whether object is storable 2337func (oi *ObjectInfo) Storable() bool { 2338 return oi.src.Storable() 2339} 2340 2341// Remote returns the remote path 2342func (oi *ObjectInfo) Remote() string { 2343 if oi.remote != "" { 2344 return oi.remote 2345 } 2346 return oi.src.Remote() 2347} 2348 2349// Size returns the size of the file 2350func (oi *ObjectInfo) Size() int64 { 2351 if oi.size != -1 { 2352 return oi.size 2353 } 2354 return oi.src.Size() 2355} 2356 2357// ModTime returns the modification time 2358func (oi *ObjectInfo) ModTime(ctx context.Context) time.Time { 2359 return oi.src.ModTime(ctx) 2360} 2361 2362// Hash returns the selected checksum of the wrapped file 2363// It returns "" if no checksum is available or if this 2364// info doesn't wrap the complete file. 
2365func (oi *ObjectInfo) Hash(ctx context.Context, hashType hash.Type) (string, error) { 2366 var errUnsupported error 2367 switch hashType { 2368 case hash.MD5: 2369 if oi.md5 != "" { 2370 return oi.md5, nil 2371 } 2372 case hash.SHA1: 2373 if oi.sha1 != "" { 2374 return oi.sha1, nil 2375 } 2376 default: 2377 errUnsupported = hash.ErrUnsupported 2378 } 2379 if oi.Size() != oi.src.Size() { 2380 // fail if this info wraps only a part of the file 2381 return "", errUnsupported 2382 } 2383 // chain to full source if possible 2384 value, err := oi.src.Hash(ctx, hashType) 2385 if err == hash.ErrUnsupported { 2386 return "", errUnsupported 2387 } 2388 return value, err 2389} 2390 2391// ID returns the ID of the Object if known, or "" if not 2392func (o *Object) ID() string { 2393 if doer, ok := o.mainChunk().(fs.IDer); ok { 2394 return doer.ID() 2395 } 2396 return "" 2397} 2398 2399// Meta format `simplejson` 2400type metaSimpleJSON struct { 2401 // required core fields 2402 Version *int `json:"ver"` 2403 Size *int64 `json:"size"` // total size of data chunks 2404 ChunkNum *int `json:"nchunks"` // number of data chunks 2405 // optional extra fields 2406 MD5 string `json:"md5,omitempty"` 2407 SHA1 string `json:"sha1,omitempty"` 2408 XactID string `json:"txn,omitempty"` // transaction ID for norename transactions 2409} 2410 2411// marshalSimpleJSON 2412// 2413// Current implementation creates metadata in three cases: 2414// - for files larger than chunk size 2415// - if file contents can be mistaken as meta object 2416// - if consistent hashing is On but wrapped remote can't provide given hash 2417// 2418func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1, xactID string) ([]byte, error) { 2419 version := metadataVersion 2420 if xactID == "" && version == 2 { 2421 version = 1 2422 } 2423 metadata := metaSimpleJSON{ 2424 // required core fields 2425 Version: &version, 2426 Size: &size, 2427 ChunkNum: &nChunks, 2428 // optional extra fields 2429 
MD5: md5, 2430 SHA1: sha1, 2431 XactID: xactID, 2432 } 2433 data, err := json.Marshal(&metadata) 2434 if err == nil && data != nil && len(data) >= maxMetadataSizeWritten { 2435 // be a nitpicker, never produce something you can't consume 2436 return nil, errors.New("metadata can't be this big, please report to rclone developers") 2437 } 2438 return data, err 2439} 2440 2441// unmarshalSimpleJSON parses metadata. 2442// 2443// In case of errors returns a flag telling whether input has been 2444// produced by incompatible version of rclone vs wasn't metadata at all. 2445// Only metadata format version 1 is supported atm. 2446// Future releases will transparently migrate older metadata objects. 2447// New format will have a higher version number and cannot be correctly 2448// handled by current implementation. 2449// The version check below will then explicitly ask user to upgrade rclone. 2450// 2451func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) { 2452 // Be strict about JSON format 2453 // to reduce possibility that a random small file resembles metadata. 2454 if len(data) > maxMetadataSizeWritten { 2455 return nil, false, ErrMetaTooBig 2456 } 2457 if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' { 2458 return nil, false, errors.New("invalid json") 2459 } 2460 var metadata metaSimpleJSON 2461 err = json.Unmarshal(data, &metadata) 2462 if err != nil { 2463 return nil, false, err 2464 } 2465 // Basic fields are strictly required 2466 // to reduce possibility that a random small file resembles metadata. 2467 if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil { 2468 return nil, false, errors.New("missing required field") 2469 } 2470 // Perform strict checks, avoid corruption of future metadata formats. 
2471 if *metadata.Version < 1 { 2472 return nil, false, errors.New("wrong version") 2473 } 2474 if *metadata.Size < 0 { 2475 return nil, false, errors.New("negative file size") 2476 } 2477 if *metadata.ChunkNum < 0 { 2478 return nil, false, errors.New("negative number of chunks") 2479 } 2480 if *metadata.ChunkNum > maxSafeChunkNumber { 2481 return nil, true, ErrChunkOverflow // produced by incompatible version of rclone 2482 } 2483 if metadata.MD5 != "" { 2484 _, err = hex.DecodeString(metadata.MD5) 2485 if len(metadata.MD5) != 32 || err != nil { 2486 return nil, false, errors.New("wrong md5 hash") 2487 } 2488 } 2489 if metadata.SHA1 != "" { 2490 _, err = hex.DecodeString(metadata.SHA1) 2491 if len(metadata.SHA1) != 40 || err != nil { 2492 return nil, false, errors.New("wrong sha1 hash") 2493 } 2494 } 2495 // ChunkNum is allowed to be 0 in future versions 2496 if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion { 2497 return nil, false, errors.New("wrong number of chunks") 2498 } 2499 // Non-strict mode also accepts future metadata versions 2500 if *metadata.Version > metadataVersion { 2501 return nil, true, ErrMetaUnknown // produced by incompatible version of rclone 2502 } 2503 2504 var nilFs *Fs // nil object triggers appropriate type method 2505 info = nilFs.wrapInfo(metaObject, "", *metadata.Size) 2506 info.nChunks = *metadata.ChunkNum 2507 info.md5 = metadata.MD5 2508 info.sha1 = metadata.SHA1 2509 info.xactID = metadata.XactID 2510 return info, true, nil 2511} 2512 2513func silentlyRemove(ctx context.Context, o fs.Object) { 2514 _ = o.Remove(ctx) // ignore error 2515} 2516 2517// Name of the remote (as passed into NewFs) 2518func (f *Fs) Name() string { 2519 return f.name 2520} 2521 2522// Root of the remote (as passed into NewFs) 2523func (f *Fs) Root() string { 2524 return f.root 2525} 2526 2527// Features returns the optional features of this Fs 2528func (f *Fs) Features() *fs.Features { 2529 return f.features 2530} 2531 2532// String 
returns a description of the FS 2533func (f *Fs) String() string { 2534 return fmt.Sprintf("Chunked '%s:%s'", f.name, f.root) 2535} 2536 2537// Precision returns the precision of this Fs 2538func (f *Fs) Precision() time.Duration { 2539 return f.base.Precision() 2540} 2541 2542// CanQuickRename returns true if the Fs supports a quick rename operation 2543func (f *Fs) CanQuickRename() bool { 2544 return f.base.Features().Move != nil 2545} 2546 2547// Check the interfaces are satisfied 2548var ( 2549 _ fs.Fs = (*Fs)(nil) 2550 _ fs.Purger = (*Fs)(nil) 2551 _ fs.Copier = (*Fs)(nil) 2552 _ fs.Mover = (*Fs)(nil) 2553 _ fs.DirMover = (*Fs)(nil) 2554 _ fs.PutUncheckeder = (*Fs)(nil) 2555 _ fs.PutStreamer = (*Fs)(nil) 2556 _ fs.CleanUpper = (*Fs)(nil) 2557 _ fs.UnWrapper = (*Fs)(nil) 2558 _ fs.ListRer = (*Fs)(nil) 2559 _ fs.Abouter = (*Fs)(nil) 2560 _ fs.Wrapper = (*Fs)(nil) 2561 _ fs.ChangeNotifier = (*Fs)(nil) 2562 _ fs.Shutdowner = (*Fs)(nil) 2563 _ fs.ObjectInfo = (*ObjectInfo)(nil) 2564 _ fs.Object = (*Object)(nil) 2565 _ fs.ObjectUnWrapper = (*Object)(nil) 2566 _ fs.IDer = (*Object)(nil) 2567) 2568