// Package chunker provides wrappers for Fs and Object which split large files into chunks
package chunker

import (
	"bytes"
	"context"
	"crypto/md5"
	"crypto/sha1"
	"encoding/hex"
	"encoding/json"
	"fmt"
	gohash "hash"
	"io"
	"io/ioutil"
	"math/rand"
	"path"
	"regexp"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pkg/errors"
	"github.com/rclone/rclone/fs"
	"github.com/rclone/rclone/fs/accounting"
	"github.com/rclone/rclone/fs/cache"
	"github.com/rclone/rclone/fs/config/configmap"
	"github.com/rclone/rclone/fs/config/configstruct"
	"github.com/rclone/rclone/fs/fspath"
	"github.com/rclone/rclone/fs/hash"
	"github.com/rclone/rclone/fs/operations"
)

//
// Chunker's composite files have one or more chunks
// and an optional metadata object. If present,
// the meta object is named after the original file.
//
// The only supported metadata format is simplejson at the moment.
// It supports only rudimentary per-file meta objects,
// used mostly for consistency checks (read lazily for performance reasons).
// Other formats can be developed that use an external meta store
// free of these limitations, but this needs some support from
// rclone core (e.g. metadata store interfaces).
//
// The following types of chunks are supported:
// data and control, active and temporary.
// Chunk type is identified by matching the chunk file name
// against the chunk name format configured by the user and the
// transaction style being used.
//
// Both data and control chunks can be either temporary (aka hidden)
// or active (non-temporary aka normal aka permanent).
// An operation creates temporary chunks while it runs.
// On completion it removes the temporary chunks and leaves the active ones.
//
// Temporary chunks have a special hardcoded suffix in addition
// to the configured name pattern.
// The temporary suffix includes a so-called transaction identifier
// (abbreviated as `xactID` below), a generic non-negative base-36 "number"
// used by parallel operations to share a composite object.
// Chunker also accepts the longer decimal temporary suffix (obsolete),
// which is transparently converted to the new format. At its maximum
// length of 13 decimal digits it makes a 7-digit base-36 number.
//
// When transactions is set to the norename style, data chunks will
// keep their temporary chunk names (with the transaction identifier
// suffix). To distinguish them from temporary chunks, the txn field
// of the metadata file is set to match the transaction identifier of
// the data chunks.
//
// Chunker can tell data chunks from control chunks by the characters
// located in the "hash placeholder" position of the configured format.
// Data chunks have decimal digits there.
// Control chunks have in that position a short lowercase alphanumeric
// string (starting with a letter) preceded by an underscore.
//
// Metadata format v1 does not define any control chunk types;
// they are currently ignored (reserved).
// In future they can be used to implement resumable uploads etc.
//
const (
	ctrlTypeRegStr   = `[a-z][a-z0-9]{2,6}`
	tempSuffixFormat = `_%04s`
	tempSuffixRegStr = `_([0-9a-z]{4,9})`
	tempSuffixRegOld = `\.\.tmp_([0-9]{10,13})`
)
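
// For illustration only (names assumed, not taken from a real run): with the
// default name format `*.rclone_chunk.###` and start_from=1, the formats above
// yield chunk names like these:
//
//	file.txt.rclone_chunk.001                  active data chunk #1
//	file.txt.rclone_chunk._resume              active control chunk (hypothetical type "resume")
//	file.txt.rclone_chunk.001_0mg3a5           temporary data chunk, xactID "0mg3a5"
//	file.txt.rclone_chunk.001..tmp_0123456789  obsolete decimal temporary suffix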

var (
	// regular expressions to validate control type and temporary suffix
	ctrlTypeRegexp   = regexp.MustCompile(`^` + ctrlTypeRegStr + `$`)
	tempSuffixRegexp = regexp.MustCompile(`^` + tempSuffixRegStr + `$`)
)

// Normally metadata is a small piece of JSON (about 100-300 bytes).
// The size of valid metadata must never exceed this limit.
// The current maximum provides reasonable room for future extensions.
//
// Please refrain from increasing it, as this can cause old rclone versions
// to fail, or worse, treat a meta object as a normal file (see NewObject).
// If more room is needed, please bump the metadata version forcing previous
// releases to ask for an upgrade, and offload extra info to a control chunk.
//
// Still, chunker's primary function is to chunk large files
// rather than serve as a generic metadata container.
const maxMetadataSize = 1023
const maxMetadataSizeWritten = 255

// Current/highest supported metadata format.
const metadataVersion = 2

// optimizeFirstChunk enables the following optimization in Put:
// if a single chunk is expected, put the first chunk using the
// base target name instead of a temporary name, thus avoiding
// an extra rename operation.
// Warning: this optimization is not transaction safe.
const optimizeFirstChunk = false

// revealHidden is a stub until chunker lands the `reveal hidden` option.
const revealHidden = false

// Prevent memory overflow due to a specially crafted chunk name
const maxSafeChunkNumber = 10000000

// Number of attempts to find a unique transaction identifier
const maxTransactionProbes = 100

// standard chunker errors
var (
	ErrChunkOverflow = errors.New("chunk number overflow")
	ErrMetaTooBig    = errors.New("metadata is too big")
	ErrMetaUnknown   = errors.New("unknown metadata, please upgrade rclone")
)

// variants of baseMove's parameter delMode
const (
	delNever  = 0 // don't delete, just move
	delAlways = 1 // delete destination before moving
	delFailed = 2 // move, then delete and try again if failed
)

// Register with Fs
func init() {
	fs.Register(&fs.RegInfo{
		Name:        "chunker",
		Description: "Transparently chunk/split large files",
		NewFs:       NewFs,
		Options: []fs.Option{{
			Name:     "remote",
			Required: true,
			Help: `Remote to chunk/unchunk.

Normally should contain a ':' and a path, e.g. "myremote:path/to/dir",
"myremote:bucket" or maybe "myremote:" (not recommended).`,
		}, {
			Name:     "chunk_size",
			Advanced: false,
			Default:  fs.SizeSuffix(2147483648), // 2 GiB
			Help:     `Files larger than chunk size will be split into chunks.`,
		}, {
			Name:     "name_format",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  `*.rclone_chunk.###`,
			Help: `String format of chunk file names.

The two placeholders are: base file name (*) and chunk number (#...).
There must be one and only one asterisk and one or more consecutive hash characters.
If the chunk number has fewer digits than the number of hashes, it is left-padded by zeros.
If there are more digits in the number, they are left as is.
Possible chunk files are ignored if their name does not match the given format.`,
		}, {
			Name:     "start_from",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  1,
			Help: `Minimum valid chunk number. Usually 0 or 1.

By default chunk numbers start from 1.`,
		}, {
			Name:     "meta_format",
			Advanced: true,
			Hide:     fs.OptionHideCommandLine,
			Default:  "simplejson",
			Help: `Format of the metadata object or "none".

By default "simplejson".
Metadata is a small JSON file named after the composite file.`,
			Examples: []fs.OptionExample{{
				Value: "none",
				Help: `Do not use metadata files at all.
Requires hash type "none".`,
			}, {
				Value: "simplejson",
				Help: `Simple JSON supports hash sums and chunk validation.

It has the following fields: ver, size, nchunks, md5, sha1.`,
			}},
		}, {
			Name:     "hash_type",
			Advanced: false,
			Default:  "md5",
			Help: `Choose how chunker handles hash sums.

All modes but "none" require metadata.`,
			Examples: []fs.OptionExample{{
				Value: "none",
				Help: `Pass any hash supported by wrapped remote for non-chunked files.
Return nothing otherwise.`,
			}, {
				Value: "md5",
				Help:  `MD5 for composite files.`,
			}, {
				Value: "sha1",
				Help:  `SHA1 for composite files.`,
			}, {
				Value: "md5all",
				Help:  `MD5 for all files.`,
			}, {
				Value: "sha1all",
				Help:  `SHA1 for all files.`,
			}, {
				Value: "md5quick",
				Help: `Copying a file to chunker will request MD5 from the source.
Falls back to SHA1 if unsupported.`,
			}, {
				Value: "sha1quick",
				Help:  `Similar to "md5quick" but prefers SHA1 over MD5.`,
			}},
		}, {
			Name:     "fail_hard",
			Advanced: true,
			Default:  false,
			Help:     `Choose how chunker should handle files with missing or invalid chunks.`,
			Examples: []fs.OptionExample{
				{
					Value: "true",
					Help:  "Report errors and abort current command.",
				}, {
					Value: "false",
					Help:  "Warn user, skip incomplete file and proceed.",
				},
			},
		}, {
			Name:     "transactions",
			Advanced: true,
			Default:  "rename",
			Help:     `Choose how chunker should handle temporary files during transactions.`,
			Hide:     fs.OptionHideCommandLine,
			Examples: []fs.OptionExample{
				{
					Value: "rename",
					Help:  "Rename temporary files after a successful transaction.",
				}, {
					Value: "norename",
					Help: `Leave temporary file names and write transaction ID to metadata file.
Metadata is required for norename transactions (meta format cannot be "none").
If you are using norename transactions you should be careful not to downgrade Rclone
as older versions of Rclone don't support this transaction style and will misinterpret
files manipulated by norename transactions.
This method is EXPERIMENTAL, don't use on production systems.`,
				}, {
					Value: "auto",
					Help: `Rename or norename will be used depending on capabilities of the backend.
If meta format is set to "none", rename transactions will always be used.
This method is EXPERIMENTAL, don't use on production systems.`,
				},
			},
		}},
	})
}

// NewFs constructs an Fs from the path, container:path
func NewFs(ctx context.Context, name, rpath string, m configmap.Mapper) (fs.Fs, error) {
	// Parse config into Options struct
	opt := new(Options)
	err := configstruct.Set(m, opt)
	if err != nil {
		return nil, err
	}
	if opt.StartFrom < 0 {
		return nil, errors.New("start_from must be non-negative")
	}

	remote := opt.Remote
	if strings.HasPrefix(remote, name+":") {
		return nil, errors.New("can't point remote at itself - check the value of the remote setting")
	}

	baseName, basePath, err := fspath.SplitFs(remote)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse remote %q to wrap", remote)
	}
	// Look for a file first
	remotePath := fspath.JoinRootPath(basePath, rpath)
	baseFs, err := cache.Get(ctx, baseName+remotePath)
	if err != fs.ErrorIsFile && err != nil {
		return nil, errors.Wrapf(err, "failed to make remote %q to wrap", baseName+remotePath)
	}
	if !operations.CanServerSideMove(baseFs) {
		return nil, errors.New("can't use chunker on a backend which doesn't support server-side move or copy")
	}

	f := &Fs{
		base: baseFs,
		name: name,
		root: rpath,
		opt:  *opt,
	}
	cache.PinUntilFinalized(f.base, f)
	f.dirSort = true // processEntries currently requires that meta objects come before their data chunks

	if err := f.configure(opt.NameFormat, opt.MetaFormat, opt.HashType, opt.Transactions); err != nil {
		return nil, err
	}

	// Handle the tricky case detected by FsMkdir/FsPutFiles/FsIsFile
	// when `rpath` points to a composite multi-chunk file without metadata,
	// i.e. `rpath` does not exist in the wrapped remote, but chunker
	// detects a composite file because it finds the first chunk!
	// (yet can't satisfy fstest.CheckListing, will ignore)
	if err == nil && !f.useMeta && strings.Contains(rpath, "/") {
		firstChunkPath := f.makeChunkName(remotePath, 0, "", "")
		_, testErr := cache.Get(ctx, baseName+firstChunkPath)
		if testErr == fs.ErrorIsFile {
			err = testErr
		}
	}

	// Note 1: the features here are ones we could support, and they are
	// ANDed with the ones from wrappedFs.
	// Note 2: features.Fill() points features.PutStream to our PutStream,
	// but features.Mask() will nullify it if wrappedFs does not have it.
	f.features = (&fs.Features{
		CaseInsensitive:         true,
		DuplicateFiles:          true,
		ReadMimeType:            false, // Object.MimeType not supported
		WriteMimeType:           true,
		BucketBased:             true,
		CanHaveEmptyDirectories: true,
		ServerSideAcrossConfigs: true,
	}).Fill(ctx, f).Mask(ctx, baseFs).WrapsFs(f, baseFs)

	f.features.Disable("ListR") // Recursive listing may cause chunker to skip files

	return f, err
}

// Options defines the configuration for this backend
type Options struct {
	Remote       string        `config:"remote"`
	ChunkSize    fs.SizeSuffix `config:"chunk_size"`
	NameFormat   string        `config:"name_format"`
	StartFrom    int           `config:"start_from"`
	MetaFormat   string        `config:"meta_format"`
	HashType     string        `config:"hash_type"`
	FailHard     bool          `config:"fail_hard"`
	Transactions string        `config:"transactions"`
}

// Fs represents a wrapped fs.Fs
type Fs struct {
	name         string
	root         string
	base         fs.Fs          // remote wrapped by chunker overlay
	wrapper      fs.Fs          // wrapper is used by SetWrapper
	useMeta      bool           // false if metadata format is 'none'
	useMD5       bool           // mutually exclusive with useSHA1
	useSHA1      bool           // mutually exclusive with useMD5
	hashFallback bool           // allows fallback from MD5 to SHA1 and vice versa
	hashAll      bool           // hash all files, mutually exclusive with hashFallback
	dataNameFmt  string         // name format of data chunks
	ctrlNameFmt  string         // name format of control chunks
	nameRegexp   *regexp.Regexp // regular expression to match chunk names
	xactIDRand   *rand.Rand     // generator of random transaction identifiers
	xactIDMutex  sync.Mutex     // mutex for the source of randomness
	opt          Options        // copy of Options
	features     *fs.Features   // optional features
	dirSort      bool           // reserved for future, ignored
	useNoRename  bool           // can be set with the transactions option
}

// configure sets up chunker for given name format, meta format and hash type.
// It also seeds the source of random transaction identifiers.
// configure must be called only from NewFs or by unit tests.
func (f *Fs) configure(nameFormat, metaFormat, hashType, transactionMode string) error {
	if err := f.setChunkNameFormat(nameFormat); err != nil {
		return errors.Wrapf(err, "invalid name format '%s'", nameFormat)
	}
	if err := f.setMetaFormat(metaFormat); err != nil {
		return err
	}
	if err := f.setHashType(hashType); err != nil {
		return err
	}
	if err := f.setTransactionMode(transactionMode); err != nil {
		return err
	}

	randomSeed := time.Now().UnixNano()
	f.xactIDRand = rand.New(rand.NewSource(randomSeed))

	return nil
}

func (f *Fs) setMetaFormat(metaFormat string) error {
	switch metaFormat {
	case "none":
		f.useMeta = false
	case "simplejson":
		f.useMeta = true
	default:
		return fmt.Errorf("unsupported meta format '%s'", metaFormat)
	}
	return nil
}

// setHashType sets up chunker for the given hash type.
// It must be called *after* setMetaFormat.
//
// In the "All" mode chunker will force metadata on all files
// if the wrapped remote can't provide the given hashsum.
func (f *Fs) setHashType(hashType string) error {
	f.useMD5 = false
	f.useSHA1 = false
	f.hashFallback = false
	f.hashAll = false
	requireMetaHash := true

	switch hashType {
	case "none":
		requireMetaHash = false
	case "md5":
		f.useMD5 = true
	case "sha1":
		f.useSHA1 = true
	case "md5quick":
		f.useMD5 = true
		f.hashFallback = true
	case "sha1quick":
		f.useSHA1 = true
		f.hashFallback = true
	case "md5all":
		f.useMD5 = true
		f.hashAll = !f.base.Hashes().Contains(hash.MD5) || f.base.Features().SlowHash
	case "sha1all":
		f.useSHA1 = true
		f.hashAll = !f.base.Hashes().Contains(hash.SHA1) || f.base.Features().SlowHash
	default:
		return fmt.Errorf("unsupported hash type '%s'", hashType)
	}
	if requireMetaHash && !f.useMeta {
		return fmt.Errorf("hash type '%s' requires compatible meta format", hashType)
	}
	return nil
}

func (f *Fs) setTransactionMode(transactionMode string) error {
	switch transactionMode {
	case "rename":
		f.useNoRename = false
	case "norename":
		if !f.useMeta {
			return errors.New("incompatible transaction options")
		}
		f.useNoRename = true
	case "auto":
		f.useNoRename = !f.CanQuickRename()
		if f.useNoRename && !f.useMeta {
			f.useNoRename = false
			return errors.New("using norename transactions requires metadata")
		}
	default:
		return fmt.Errorf("unsupported transaction mode '%s'", transactionMode)
	}
	return nil
}

// setChunkNameFormat converts the pattern-based chunk name format
// into printf formats and regular expressions for data and
// control chunks.
func (f *Fs) setChunkNameFormat(pattern string) error {
	// validate pattern
	if strings.Count(pattern, "*") != 1 {
		return errors.New("pattern must have exactly one asterisk (*)")
	}
	numDigits := strings.Count(pattern, "#")
	if numDigits < 1 {
		return errors.New("pattern must have a hash character (#)")
	}
	if strings.Index(pattern, "*") > strings.Index(pattern, "#") {
		return errors.New("asterisk (*) in pattern must come before hashes (#)")
	}
	if ok, _ := regexp.MatchString("^[^#]*[#]+[^#]*$", pattern); !ok {
		return errors.New("hashes (#) in pattern must be consecutive")
	}
	if dir, _ := path.Split(pattern); dir != "" {
		return errors.New("directory separator prohibited")
	}
	if pattern[0] != '*' {
		return errors.New("pattern must start with asterisk") // to be lifted later
	}

	// craft a unified regular expression for all types of chunks
	reHashes := regexp.MustCompile("[#]+")
	reDigits := "[0-9]+"
	if numDigits > 1 {
		reDigits = fmt.Sprintf("[0-9]{%d,}", numDigits)
	}
	reDataOrCtrl := fmt.Sprintf("(?:(%s)|_(%s))", reDigits, ctrlTypeRegStr)

	// this must be non-greedy or else it could eat up the temporary suffix
	const mainNameRegStr = "(.+?)"

	strRegex := regexp.QuoteMeta(pattern)
	strRegex = reHashes.ReplaceAllLiteralString(strRegex, reDataOrCtrl)
	strRegex = strings.Replace(strRegex, "\\*", mainNameRegStr, -1)
	strRegex = fmt.Sprintf("^%s(?:%s|%s)?$", strRegex, tempSuffixRegStr, tempSuffixRegOld)
	f.nameRegexp = regexp.MustCompile(strRegex)

	// craft printf formats for active data/control chunks
	fmtDigits := "%d"
	if numDigits > 1 {
		fmtDigits = fmt.Sprintf("%%0%dd", numDigits)
	}
	strFmt := strings.Replace(pattern, "%", "%%", -1)
	strFmt = strings.Replace(strFmt, "*", "%s", 1)
	f.dataNameFmt = reHashes.ReplaceAllLiteralString(strFmt, fmtDigits)
	f.ctrlNameFmt = reHashes.ReplaceAllLiteralString(strFmt, "_%s")
	return nil
}
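
// A sketch of what the default pattern compiles to, derived by tracing the
// steps above (worth double-checking against a real run): for the default
// `*.rclone_chunk.###` setChunkNameFormat produces
//
//	dataNameFmt: "%s.rclone_chunk.%03d"
//	ctrlNameFmt: "%s.rclone_chunk._%s"
//	nameRegexp:  ^(.+?)\.rclone_chunk\.(?:([0-9]{3,})|_([a-z][a-z0-9]{2,6}))(?:_([0-9a-z]{4,9})|\.\.tmp_([0-9]{10,13}))?$
//
// with capture groups: 1 = main name, 2 = chunk number, 3 = control type,
// 4 = new-style xactID, 5 = old-style decimal transaction number.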

// makeChunkName produces a chunk name (or path) for a given file.
//
// filePath can be the name, relative or absolute path of the main file.
//
// chunkNo must be a zero-based index of the data chunk.
// A negative chunkNo, e.g. -1, indicates a control chunk.
// ctrlType is the type of the control chunk (must be valid).
// ctrlType must be "" for data chunks.
//
// xactID is a transaction identifier. An empty xactID denotes an active
// chunk, otherwise a temporary chunk name is produced.
//
func (f *Fs) makeChunkName(filePath string, chunkNo int, ctrlType, xactID string) string {
	dir, parentName := path.Split(filePath)
	var name, tempSuffix string
	switch {
	case chunkNo >= 0 && ctrlType == "":
		name = fmt.Sprintf(f.dataNameFmt, parentName, chunkNo+f.opt.StartFrom)
	case chunkNo < 0 && ctrlTypeRegexp.MatchString(ctrlType):
		name = fmt.Sprintf(f.ctrlNameFmt, parentName, ctrlType)
	default:
		panic("makeChunkName: invalid argument") // must not produce something we can't consume
	}
	if xactID != "" {
		tempSuffix = fmt.Sprintf(tempSuffixFormat, xactID)
		if !tempSuffixRegexp.MatchString(tempSuffix) {
			panic("makeChunkName: invalid argument")
		}
	}
	return dir + name + tempSuffix
}
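
// Illustrative calls, assuming the default configuration (name format
// `*.rclone_chunk.###`, start_from=1); the "meta" control type is made up
// here since metadata format v1 defines no control chunk types:
//
//	f.makeChunkName("dir/file.txt", 0, "", "")       // "dir/file.txt.rclone_chunk.001"
//	f.makeChunkName("dir/file.txt", 1, "", "0mg3a5") // "dir/file.txt.rclone_chunk.002_0mg3a5"
//	f.makeChunkName("dir/file.txt", -1, "meta", "")  // "dir/file.txt.rclone_chunk._meta"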

// parseChunkName checks whether given file path belongs to
// a chunk and extracts chunk name parts.
//
// filePath can be name, relative or absolute path of a file.
//
// Returned parentPath is path of the composite file owning the chunk.
// It's a non-empty string if valid chunk name is detected
// or "" if it's not a chunk.
// Other returned values depend on detected chunk type:
// data or control, active or temporary:
//
// data chunk - the returned chunkNo is non-negative and ctrlType is ""
// control chunk - the chunkNo is -1 and ctrlType is a non-empty string
// active chunk - the returned xactID is ""
// temporary chunk - the xactID is a non-empty string
func (f *Fs) parseChunkName(filePath string) (parentPath string, chunkNo int, ctrlType, xactID string) {
	dir, name := path.Split(filePath)
	match := f.nameRegexp.FindStringSubmatch(name)
	if match == nil || match[1] == "" {
		return "", -1, "", ""
	}
	var err error

	chunkNo = -1
	if match[2] != "" {
		if chunkNo, err = strconv.Atoi(match[2]); err != nil {
			chunkNo = -1
		}
		if chunkNo -= f.opt.StartFrom; chunkNo < 0 {
			fs.Infof(f, "invalid data chunk number in file %q", name)
			return "", -1, "", ""
		}
	}

	if match[4] != "" {
		xactID = match[4]
	}
	if match[5] != "" {
		// old-style temporary suffix
		number, err := strconv.ParseInt(match[5], 10, 64)
		if err != nil || number < 0 {
			fs.Infof(f, "invalid old-style transaction number in file %q", name)
			return "", -1, "", ""
		}
		// convert old-style transaction number to base-36 transaction ID
		xactID = fmt.Sprintf(tempSuffixFormat, strconv.FormatInt(number, 36))
		xactID = xactID[1:] // strip leading underscore
	}

	parentPath = dir + match[1]
	ctrlType = match[3]
	return
}
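
// And the inverse of the makeChunkName examples above, again assuming the
// default configuration:
//
//	f.parseChunkName("dir/file.txt.rclone_chunk.002_0mg3a5")
//	// parentPath "dir/file.txt", chunkNo 1, ctrlType "", xactID "0mg3a5"
//	f.parseChunkName("dir/file.txt")
//	// parentPath "", chunkNo -1, ctrlType "", xactID "" (not a chunk)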

// forbidChunk prints an error message or returns an error if the file is a chunk.
// The first argument sets the log prefix; use `false` to suppress the message.
func (f *Fs) forbidChunk(o interface{}, filePath string) error {
	if parentPath, _, _, _ := f.parseChunkName(filePath); parentPath != "" {
		if f.opt.FailHard {
			return fmt.Errorf("chunk overlap with %q", parentPath)
		}
		if boolVal, isBool := o.(bool); !isBool || boolVal {
			fs.Errorf(o, "chunk overlap with %q", parentPath)
		}
	}
	return nil
}

// newXactID produces a sufficiently random transaction identifier.
//
// The temporary suffix mask allows identifiers consisting of 4-9
// base-36 digits (i.e. digits 0-9 or lowercase letters a-z).
// The identifiers must be unique between transactions running on
// a single file in parallel.
//
// Currently the function produces 6-character identifiers.
// Together with the underscore this makes a 7-character temporary suffix.
//
// The first 4 characters isolate groups of transactions by time intervals.
// The maximum length of an interval is base-36 "zzzz", i.e. 1,679,615 seconds.
// The function instead takes the largest prime below this number
// (see https://primes.utm.edu) as the interval length to better safeguard
// against repeating pseudo-random sequences in cases when rclone is
// invoked from a periodic scheduler like unix cron.
// Thus, the interval is slightly more than 19 days 10 hours 33 minutes.
//
// The remaining 2 base-36 digits (in the range from 0 to 1295 inclusive)
// are taken from the local random source.
// This provides about 0.1% collision probability for two parallel
// operations started at the same second and working on the same file.
//
// A non-empty filePath argument enables probing for an existing temporary
// chunk to further eliminate collisions.
func (f *Fs) newXactID(ctx context.Context, filePath string) (xactID string, err error) {
	const closestPrimeZzzzSeconds = 1679609
	const maxTwoBase36Digits = 1295

	unixSec := time.Now().Unix()
	if unixSec < 0 {
		unixSec = -unixSec // unlikely but the number must be positive
	}
	circleSec := unixSec % closestPrimeZzzzSeconds
	first4chars := strconv.FormatInt(circleSec, 36)

	for tries := 0; tries < maxTransactionProbes; tries++ {
		f.xactIDMutex.Lock()
		randomness := f.xactIDRand.Int63n(maxTwoBase36Digits + 1)
		f.xactIDMutex.Unlock()

		last2chars := strconv.FormatInt(randomness, 36)
		xactID = fmt.Sprintf("%04s%02s", first4chars, last2chars)

		if filePath == "" {
			return
		}
		probeChunk := f.makeChunkName(filePath, 0, "", xactID)
		_, probeErr := f.base.NewObject(ctx, probeChunk)
		if probeErr != nil {
			return
		}
	}

	return "", fmt.Errorf("can't setup transaction for %s", filePath)
}
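
// A worked example of the scheme above (the timestamp is assumed, purely for
// illustration): for unixSec = 1700000000, circleSec = 1700000000 mod 1679609
// = 235692, which is "51v0" in base-36; if the random draw is, say, 10 ("a"
// in base-36, zero-padded to "0a"), the resulting xactID is "51v00a" and the
// temporary chunk suffix becomes "_51v00a".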

// List the objects and directories in dir into entries.
// The entries can be returned in any order but should be
// for a complete directory.
//
// dir should be "" to list the root, and should not have
// trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't found.
//
// Commands normally clean up all temporary chunks in case of a failure.
// However, if rclone dies unexpectedly, it can leave behind a bunch of
// hidden temporary chunks. List and its underlying processEntries()
// silently skip all temporary chunks in the directory. It's okay if
// they belong to an unfinished command running in parallel.
//
// However, there is no way to discover dead temporary chunks at the moment.
// As a workaround users can use `purge` to forcibly remove the whole
// directory together with dead chunks.
// In future a flag named like `--chunker-list-hidden` may be added to
// rclone that will tell List to reveal hidden chunks.
//
func (f *Fs) List(ctx context.Context, dir string) (entries fs.DirEntries, err error) {
	entries, err = f.base.List(ctx, dir)
	if err != nil {
		return nil, err
	}
	return f.processEntries(ctx, entries, dir)
}

// ListR lists the objects and directories of the Fs starting
// from dir recursively into out.
//
// dir should be "" to start from the root, and should not
// have trailing slashes.
//
// This should return ErrDirNotFound if the directory isn't
// found.
//
// It should call callback for each tranche of entries read.
// These need not be returned in any particular order.  If
// callback returns an error then the listing will stop
// immediately.
//
// Don't implement this unless you have a more efficient way
// of listing recursively than doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
	do := f.base.Features().ListR
	return do(ctx, dir, func(entries fs.DirEntries) error {
		newEntries, err := f.processEntries(ctx, entries, dir)
		if err != nil {
			return err
		}
		return callback(newEntries)
	})
}

// processEntries assembles chunk entries into composite entries
func (f *Fs) processEntries(ctx context.Context, origEntries fs.DirEntries, dirPath string) (newEntries fs.DirEntries, err error) {
	var sortedEntries fs.DirEntries
	if f.dirSort {
		// sort entries so that meta objects go before their chunks
		sortedEntries = make(fs.DirEntries, len(origEntries))
		copy(sortedEntries, origEntries)
		sort.Sort(sortedEntries)
	} else {
		sortedEntries = origEntries
	}

	byRemote := make(map[string]*Object)
	badEntry := make(map[string]bool)
	isSubdir := make(map[string]bool)
	txnByRemote := map[string]string{}

	var tempEntries fs.DirEntries
	for _, dirOrObject := range sortedEntries {
		switch entry := dirOrObject.(type) {
		case fs.Object:
			remote := entry.Remote()
			mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(remote)
			if mainRemote == "" {
				// this is a meta object or standalone file
				object := f.newObject("", entry, nil)
				byRemote[remote] = object
				tempEntries = append(tempEntries, object)
				if f.useNoRename {
					txnByRemote[remote], err = object.readXactID(ctx)
					if err != nil {
						return nil, err
					}
				}
				break
			}
			// this is some kind of chunk
			// the meta object should have been created above if present
			mainObject := byRemote[mainRemote]
			isSpecial := xactID != txnByRemote[mainRemote] || ctrlType != ""
			if mainObject == nil && f.useMeta && !isSpecial {
				fs.Debugf(f, "skip orphan data chunk %q", remote)
				break
			}
			if mainObject == nil && !f.useMeta {
				// this is the "nometa" case
				// create a dummy chunked object without metadata
				mainObject = f.newObject(mainRemote, nil, nil)
				byRemote[mainRemote] = mainObject
				if !badEntry[mainRemote] {
					tempEntries = append(tempEntries, mainObject)
				}
			}
			if isSpecial {
				if revealHidden {
					fs.Infof(f, "ignore non-data chunk %q", remote)
				}
				// need to read metadata to ensure actual object type;
				// no need to read if the meta object is too big or absent,
				// use the fact that before calling validate()
				// the `size` field caches the meta object size, if any
				if f.useMeta && mainObject != nil && mainObject.size <= maxMetadataSize {
					mainObject.unsure = true
				}
				break
			}
			if err := mainObject.addChunk(entry, chunkNo); err != nil {
				if f.opt.FailHard {
					return nil, err
				}
				badEntry[mainRemote] = true
			}
		case fs.Directory:
			isSubdir[entry.Remote()] = true
			wrapDir := fs.NewDirCopy(ctx, entry)
			wrapDir.SetRemote(entry.Remote())
			tempEntries = append(tempEntries, wrapDir)
		default:
			if f.opt.FailHard {
				return nil, fmt.Errorf("unknown object type %T", entry)
			}
			fs.Debugf(f, "unknown object type %T", entry)
		}
	}

	for _, entry := range tempEntries {
		if object, ok := entry.(*Object); ok {
			remote := object.Remote()
			if isSubdir[remote] {
				if f.opt.FailHard {
					return nil, fmt.Errorf("%q is both meta object and directory", remote)
				}
				badEntry[remote] = true // fall thru
			}
			if badEntry[remote] {
				fs.Debugf(f, "invalid directory entry %q", remote)
				continue
			}
			if err := object.validate(); err != nil {
				if f.opt.FailHard {
					return nil, err
				}
				fs.Debugf(f, "invalid chunks in object %q", remote)
				continue
			}
		}
		newEntries = append(newEntries, entry)
	}

	if f.dirSort {
		sort.Sort(newEntries)
	}
	return newEntries, nil
}

// NewObject finds the Object at remote.
//
// Please note that every NewObject invocation will scan the whole directory.
// Using something like fs.DirCache here might improve performance
// (at the cost of more complex logic).
//
// Note that chunker prefers analyzing file names rather than reading
// the content of the meta object, assuming that directory scans are fast
// but opening even a small file can be slow on some backends.
//
func (f *Fs) NewObject(ctx context.Context, remote string) (fs.Object, error) {
	return f.scanObject(ctx, remote, false)
}

// scanObject is like NewObject with optional quick scan mode.
// The quick mode avoids directory requests other than `List`,
// ignores non-chunked objects and skips chunk size checks.
func (f *Fs) scanObject(ctx context.Context, remote string, quickScan bool) (fs.Object, error) {
	if err := f.forbidChunk(false, remote); err != nil {
		return nil, errors.Wrap(err, "can't access")
	}

	var (
		o             *Object
		baseObj       fs.Object
		currentXactID string
		err           error
		sameMain      bool
	)

	if f.useMeta {
		baseObj, err = f.base.NewObject(ctx, remote)
		if err != nil {
			return nil, err
		}
		remote = baseObj.Remote()

		// Chunker's meta object cannot be large and maxMetadataSize acts
		// as a hard limit. Anything larger than that is treated as a
		// non-chunked file without even checking its contents, so it's
		// paramount to prevent metadata from exceeding the maximum size.
		// Anything smaller is additionally checked for format.
		o = f.newObject("", baseObj, nil)
		if o.size > maxMetadataSize {
			return o, nil
		}
	} else {
		// Metadata is disabled, hence this is either a multi-chunk
		// composite file without meta object or a non-chunked file.
		// Create an empty wrapper here, scan directory to determine
		// which case it is and postpone reading if it's the latter one.
		o = f.newObject(remote, nil, nil)
	}

	// If the object is small, it's probably a meta object.
	// However, composite file must have data chunks besides it.
	// Scan directory for possible data chunks now and decide later on.
	dir := path.Dir(strings.TrimRight(remote, "/"))
	if dir == "." {
		dir = ""
	}
	entries, err := f.base.List(ctx, dir)
	switch err {
	case nil:
		// OK, fall thru
	case fs.ErrorDirNotFound:
		entries = nil
	default:
		return nil, errors.Wrap(err, "can't detect composite file")
	}

	if f.useNoRename {
		currentXactID, err = o.readXactID(ctx)
		if err != nil {
			return nil, err
		}
	}
	caseInsensitive := f.features.CaseInsensitive

	for _, dirOrObject := range entries {
		entry, ok := dirOrObject.(fs.Object)
		if !ok {
			continue
		}
		entryRemote := entry.Remote()
		if !caseInsensitive && !strings.Contains(entryRemote, remote) {
			continue // bypass regexp to save cpu
		}
		mainRemote, chunkNo, ctrlType, xactID := f.parseChunkName(entryRemote)
		if mainRemote == "" {
			continue // skip non-chunks
		}
		if caseInsensitive {
			sameMain = strings.EqualFold(mainRemote, remote)
		} else {
			sameMain = mainRemote == remote
		}
		if !sameMain {
			continue // skip alien chunks
		}
		if ctrlType != "" || xactID != currentXactID {
			if f.useMeta {
				// temporary/control chunk calls for lazy metadata read
				o.unsure = true
			}
			continue
		}
		//fs.Debugf(f, "%q belongs to %q as chunk %d", entryRemote, mainRemote, chunkNo)
		if err := o.addChunk(entry, chunkNo); err != nil {
			return nil, err
		}
	}

	if o.main == nil && (o.chunks == nil || len(o.chunks) == 0) {
		// Scanning hasn't found data chunks with conforming names.
		if f.useMeta || quickScan {
			// Metadata is required but absent and there are no chunks.
			return nil, fs.ErrorObjectNotFound
		}

		// Data chunks are not found and metadata is disabled.
		// Thus, we are in the "latter case" from above.
		// Let's try the postponed reading of a non-chunked file and add it
		// as a single chunk to the empty composite wrapper created above
		// with nil metadata.
		baseObj, err = f.base.NewObject(ctx, remote)
		if err == nil {
			err = o.addChunk(baseObj, 0)
		}
		if err != nil {
			return nil, err
		}
	}

	// This is either a composite object with metadata or a non-chunked
	// file without metadata. Validate it and update the total data size.
	// As an optimization, skip metadata reading here - we will call
	// readMetadata lazily when needed (reading can be expensive).
	if !quickScan {
		if err := o.validate(); err != nil {
			return nil, err
		}
	}
	return o, nil
}

// readMetadata reads composite object metadata and caches results,
// in case of critical errors metadata is not cached.
// Returns ErrMetaUnknown if an unsupported metadata format is detected.
// If object is not chunked but marked by List or NewObject for recheck,
// readMetadata will attempt to parse object as composite with fallback
// to non-chunked representation if the attempt fails.
func (o *Object) readMetadata(ctx context.Context) error {
	// return quickly if metadata is absent or has been already cached
	if !o.f.useMeta {
		o.isFull = true
	}
	if o.isFull {
		return nil
	}
	if !o.isComposite() && !o.unsure {
		// this for sure is a non-chunked standalone file
		o.isFull = true
		return nil
	}

	// validate metadata
	metaObject := o.main
	if metaObject.Size() > maxMetadataSize {
		if o.unsure {
			// this is not metadata but a foreign object
			o.unsure = false
			o.chunks = nil  // make isComposite return false
			o.isFull = true // cache results
			return nil
		}
		return ErrMetaTooBig
	}

	// size is within limits, perform consistency checks
	reader, err := metaObject.Open(ctx)
	if err != nil {
		return err
	}
	metadata, err := ioutil.ReadAll(reader)
	_ = reader.Close() // ensure file handle is freed on windows
	if err != nil {
		return err
	}

	switch o.f.opt.MetaFormat {
	case "simplejson":
		metaInfo, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, metadata)
		if o.unsure {
			o.unsure = false
			if !madeByChunker {
				// this is not metadata but a foreign object
				o.chunks = nil  // make isComposite return false
				o.isFull = true // cache results
				return nil
			}
		}
		switch err {
		case nil:
			// fall thru
		case ErrMetaTooBig, ErrMetaUnknown:
			return err // return these errors unwrapped for unit tests
		default:
			return errors.Wrap(err, "invalid metadata")
		}
		if o.size != metaInfo.Size() || len(o.chunks) != metaInfo.nChunks {
			return errors.New("metadata doesn't match file size")
		}
		o.md5 = metaInfo.md5
		o.sha1 = metaInfo.sha1
		o.xactID = metaInfo.xactID
	}

	o.isFull = true // cache results
	o.xIDCached = true
	return nil
}

// readXactID returns the transaction ID stored in the passed metadata object
func (o *Object) readXactID(ctx context.Context) (xactID string, err error) {
	// if xactID has already been read and cached return it now
	if o.xIDCached {
		return o.xactID, nil
	}
	// Avoid reading metadata for backends that don't use xactID to identify permanent chunks
	if !o.f.useNoRename {
		return "", errors.New("readXactID requires norename transactions")
	}
	if o.main == nil {
		return "", errors.New("readXactID requires valid metaobject")
	}
	if o.main.Size() > maxMetadataSize {
		return "", nil // this was likely not a metadata object, return empty xactID but don't return an error
	}
	reader, err := o.main.Open(ctx)
	if err != nil {
		return "", err
	}
	data, err := ioutil.ReadAll(reader)
	_ = reader.Close() // ensure file handle is freed on windows
	if err != nil {
		return "", err
	}

	switch o.f.opt.MetaFormat {
	case "simplejson":
		if len(data) > maxMetadataSizeWritten {
			return "", nil // this was likely not a metadata object, return empty xactID but don't return an error
		}
		var metadata metaSimpleJSON
		err = json.Unmarshal(data, &metadata)
		if err != nil {
			return "", nil // this was likely not a metadata object, return empty xactID but don't return an error
		}
		xactID = metadata.XactID
	}
	o.xactID = xactID
	o.xIDCached = true
	return xactID, nil
}
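
// For reference, a simplejson meta object might look roughly like this
// (values are illustrative; ver, size, nchunks, md5 and sha1 follow the
// option help above, and txn is the transaction ID field described in the
// header comment, written by norename transactions):
//
//	{"ver":2,"size":4294967296,"nchunks":2,"md5":"9e107d9d372bb6826bd81d3542a419d6","txn":"51v00a"}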

// put implements Put, PutStream, PutUnchecked, Update
func (f *Fs) put(
	ctx context.Context, in io.Reader, src fs.ObjectInfo, remote string, options []fs.OpenOption,
	basePut putFn, action string, target fs.Object) (obj fs.Object, err error) {

	// Perform consistency checks
	if err := f.forbidChunk(src, remote); err != nil {
		return nil, errors.Wrap(err, action+" refused")
	}
	if target == nil {
		// Get the target object with a quick directory scan:
		// skip the metadata check if the target object does not exist,
		// ignore non-chunked objects, skip chunk size checks.
		if obj, err := f.scanObject(ctx, remote, true); err == nil {
			target = obj
		}
	}
	if target != nil {
		obj := target.(*Object)
		if err := obj.readMetadata(ctx); err == ErrMetaUnknown {
			// refuse to update a file of unsupported format
			return nil, errors.Wrap(err, "refusing to "+action)
		}
	}

	// Prepare to upload
	c := f.newChunkingReader(src)
	wrapIn := c.wrapStream(ctx, in, src)

	var metaObject fs.Object
	defer func() {
		if err != nil {
			c.rollback(ctx, metaObject)
		}
	}()

	baseRemote := remote
	xactID, errXact := f.newXactID(ctx, baseRemote)
	if errXact != nil {
		return nil, errXact
	}

	// Transfer chunks data
	for c.chunkNo = 0; !c.done; c.chunkNo++ {
		if c.chunkNo > maxSafeChunkNumber {
			return nil, ErrChunkOverflow
		}

		tempRemote := f.makeChunkName(baseRemote, c.chunkNo, "", xactID)
		size := c.sizeLeft
		if size > c.chunkSize {
			size = c.chunkSize
		}
		savedReadCount := c.readCount

		// If a single chunk is expected, avoid the extra rename operation
		chunkRemote := tempRemote
		if c.expectSingle && c.chunkNo == 0 && optimizeFirstChunk {
			chunkRemote = baseRemote
		}
		info := f.wrapInfo(src, chunkRemote, size)

		// Refill chunkLimit and let basePut repeatedly call chunkingReader.Read()
		c.chunkLimit = c.chunkSize
		// TODO: handle range/limit options
		chunk, errChunk := basePut(ctx, wrapIn, info, options...)
		if errChunk != nil {
			return nil, errChunk
		}

		if size > 0 && c.readCount == savedReadCount && c.expectSingle {
			// basePut returned success but didn't call chunkingReader's Read.
			// This is possible if wrapped remote has performed the put by hash
			// because chunker bridges Hash from source for non-chunked files.
			// Hence, force Read here to update accounting and hashsums.
			if err := c.dummyRead(wrapIn, size); err != nil {
				return nil, err
			}
		}
		if c.sizeLeft == 0 && !c.done {
			// The file has apparently been put by hash, force completion.
			c.done = true
		}

		// Expected a single chunk but more to come, so name it as usual.
		if !c.done && chunkRemote != tempRemote {
			fs.Infof(chunk, "Expected single chunk, got more")
			chunkMoved, errMove := f.baseMove(ctx, chunk, tempRemote, delFailed)
			if errMove != nil {
				silentlyRemove(ctx, chunk)
				return nil, errMove
			}
			chunk = chunkMoved
		}

		// Wrapped remote may or may not have seen EOF from chunking reader,
		// e.g. the box multi-uploader reads exactly the chunk size specified
		// and skips the "EOF" read. Hence, switch to next limit here.
		if !(c.chunkLimit == 0 || c.chunkLimit == c.chunkSize || c.sizeTotal == -1 || c.done) {
			silentlyRemove(ctx, chunk)
			return nil, fmt.Errorf("destination ignored %d data bytes", c.chunkLimit)
		}
		c.chunkLimit = c.chunkSize

		c.chunks = append(c.chunks, chunk)
	}

	// Validate uploaded size
	if c.sizeTotal != -1 && c.readCount != c.sizeTotal {
		return nil, fmt.Errorf("incorrect upload size %d != %d", c.readCount, c.sizeTotal)
	}

	// Check for input that looks like valid metadata
	needMeta := len(c.chunks) > 1
	if c.readCount <= maxMetadataSize && len(c.chunks) == 1 {
		_, madeByChunker, _ := unmarshalSimpleJSON(ctx, c.chunks[0], c.smallHead)
		needMeta = madeByChunker
	}

	// Finalize a small object as non-chunked.
	// This can be bypassed, and a single chunk with metadata will be
	// created if forced by consistent hashing or due to unsafe input.
	if !needMeta && !f.hashAll && f.useMeta {
		// If previous object was chunked, remove its chunks
		f.removeOldChunks(ctx, baseRemote)

		// Rename single data chunk in place
		chunk := c.chunks[0]
		if chunk.Remote() != baseRemote {
			chunkMoved, errMove := f.baseMove(ctx, chunk, baseRemote, delAlways)
			if errMove != nil {
				silentlyRemove(ctx, chunk)
				return nil, errMove
			}
			chunk = chunkMoved
		}

		return f.newObject("", chunk, nil), nil
	}

	// Validate total size of data chunks
	var sizeTotal int64
	for _, chunk := range c.chunks {
		sizeTotal += chunk.Size()
	}
	if sizeTotal != c.readCount {
		return nil, fmt.Errorf("incorrect chunks size %d != %d", sizeTotal, c.readCount)
	}

	// If previous object was chunked, remove its chunks
	f.removeOldChunks(ctx, baseRemote)

	if !f.useNoRename {
		// The transaction suffix will be removed for backends with quick rename operations
		for chunkNo, chunk := range c.chunks {
			chunkRemote := f.makeChunkName(baseRemote, chunkNo, "", "")
			chunkMoved, errMove := f.baseMove(ctx, chunk, chunkRemote, delFailed)
			if errMove != nil {
				return nil, errMove
			}
			c.chunks[chunkNo] = chunkMoved
		}
		xactID = ""
	}

	if !f.useMeta {
		// Remove stale metadata, if any
		oldMeta, errOldMeta := f.base.NewObject(ctx, baseRemote)
		if errOldMeta == nil {
			silentlyRemove(ctx, oldMeta)
		}

		o := f.newObject(baseRemote, nil, c.chunks)
		o.size = sizeTotal
		return o, nil
	}

	// Update meta object
	var metadata []byte
	switch f.opt.MetaFormat {
	case "simplejson":
		c.updateHashes()
		metadata, err = marshalSimpleJSON(ctx, sizeTotal, len(c.chunks), c.md5, c.sha1, xactID)
	}
	if err == nil {
		metaInfo := f.wrapInfo(src, baseRemote, int64(len(metadata)))
		metaObject, err = basePut(ctx, bytes.NewReader(metadata), metaInfo)
	}
	if err != nil {
		return nil, err
	}

	o := f.newObject("", metaObject, c.chunks)
	o.size = sizeTotal
	o.xactID = xactID
	return o, nil
}

type putFn func(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error)

type chunkingReader struct {
	baseReader   io.Reader
	sizeTotal    int64
	sizeLeft     int64
	readCount    int64
	chunkSize    int64
	chunkLimit   int64
	chunkNo      int
	err          error
	done         bool
	chunks       []fs.Object
	expectSingle bool
	smallHead    []byte
	fs           *Fs
	hasher       gohash.Hash
	md5          string
	sha1         string
}

func (f *Fs) newChunkingReader(src fs.ObjectInfo) *chunkingReader {
	c := &chunkingReader{
		fs:        f,
		chunkSize: int64(f.opt.ChunkSize),
		sizeTotal: src.Size(),
	}
	c.chunkLimit = c.chunkSize
	c.sizeLeft = c.sizeTotal
	c.expectSingle = c.sizeTotal >= 0 && c.sizeTotal <= c.chunkSize
	return c
}
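
// For example (sizes assumed): with the default 2 GiB chunk size, a 5 GiB
// source gives sizeTotal = 5368709120 and the put loop will produce three
// chunks (2 GiB + 2 GiB + 1 GiB) with expectSingle = false; a 100 MiB source
// fits in one chunk, so expectSingle = true and Put could take the
// optimizeFirstChunk shortcut (currently disabled).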

func (c *chunkingReader) wrapStream(ctx context.Context, in io.Reader, src fs.ObjectInfo) io.Reader {
	baseIn, wrapBack := accounting.UnWrap(in)

	switch {
	case c.fs.useMD5:
		srcObj := fs.UnWrapObjectInfo(src)
		if srcObj != nil && srcObj.Fs().Features().SlowHash {
			fs.Debugf(src, "skip slow MD5 on source file, hashing in-transit")
			c.hasher = md5.New()
			break
		}
		if c.md5, _ = src.Hash(ctx, hash.MD5); c.md5 == "" {
			if c.fs.hashFallback {
				c.sha1, _ = src.Hash(ctx, hash.SHA1)
			} else {
				c.hasher = md5.New()
			}
		}
	case c.fs.useSHA1:
		srcObj := fs.UnWrapObjectInfo(src)
		if srcObj != nil && srcObj.Fs().Features().SlowHash {
			fs.Debugf(src, "skip slow SHA1 on source file, hashing in-transit")
			c.hasher = sha1.New()
			break
		}
		if c.sha1, _ = src.Hash(ctx, hash.SHA1); c.sha1 == "" {
			if c.fs.hashFallback {
				c.md5, _ = src.Hash(ctx, hash.MD5)
			} else {
				c.hasher = sha1.New()
			}
		}
	}

	if c.hasher != nil {
		baseIn = io.TeeReader(baseIn, c.hasher)
	}
	c.baseReader = baseIn
	return wrapBack(c)
}

func (c *chunkingReader) updateHashes() {
	if c.hasher == nil {
		return
	}
	switch {
	case c.fs.useMD5:
		c.md5 = hex.EncodeToString(c.hasher.Sum(nil))
	case c.fs.useSHA1:
		c.sha1 = hex.EncodeToString(c.hasher.Sum(nil))
	}
}

// Note: Read is not called if wrapped remote performs put by hash.
func (c *chunkingReader) Read(buf []byte) (bytesRead int, err error) {
	if c.chunkLimit <= 0 {
		// Chunk complete - switch to next one.
		// Note #1:
		// We might not get here because some remotes (e.g. box multi-uploader)
		// read the specified size exactly and skip the concluding EOF Read.
		// Then a check in the put loop will kick in.
		// Note #2:
		// The crypt backend after receiving EOF here will call Read again
		// and we must insist on returning EOF, so we postpone refilling
		// chunkLimit to the main loop.
		return 0, io.EOF
	}
	if int64(len(buf)) > c.chunkLimit {
		buf = buf[0:c.chunkLimit]
	}
	bytesRead, err = c.baseReader.Read(buf)
	if err != nil && err != io.EOF {
		c.err = err
		c.done = true
		return
	}
	c.accountBytes(int64(bytesRead))
	if c.chunkNo == 0 && c.expectSingle && bytesRead > 0 && c.readCount <= maxMetadataSize {
		c.smallHead = append(c.smallHead, buf[:bytesRead]...)
	}
	if bytesRead == 0 && c.sizeLeft == 0 {
		err = io.EOF // Force EOF when no data left.
	}
	if err == io.EOF {
		c.done = true
	}
	return
}

func (c *chunkingReader) accountBytes(bytesRead int64) {
	c.readCount += bytesRead
	c.chunkLimit -= bytesRead
	if c.sizeLeft != -1 {
		c.sizeLeft -= bytesRead
	}
}

// dummyRead updates accounting, hashsums, etc. by simulating reads
func (c *chunkingReader) dummyRead(in io.Reader, size int64) error {
	if c.hasher == nil && c.readCount+size > maxMetadataSize {
		c.accountBytes(size)
		return nil
	}
	const bufLen = 1048576 // 1 MiB
	buf := make([]byte, bufLen)
	for size > 0 {
		n := size
		if n > bufLen {
			n = bufLen
		}
		if _, err := io.ReadFull(in, buf[0:n]); err != nil {
			return err
		}
		size -= n
	}
	return nil
}

// rollback removes uploaded temporary chunks
func (c *chunkingReader) rollback(ctx context.Context, metaObject fs.Object) {
	if metaObject != nil {
		c.chunks = append(c.chunks, metaObject)
	}
	for _, chunk := range c.chunks {
		if err := chunk.Remove(ctx); err != nil {
			fs.Errorf(chunk, "Failed to remove temporary chunk: %v", err)
		}
	}
}

func (f *Fs) removeOldChunks(ctx context.Context, remote string) {
	oldFsObject, err := f.NewObject(ctx, remote)
	if err == nil {
		oldObject := oldFsObject.(*Object)
		for _, chunk := range oldObject.chunks {
			if err := chunk.Remove(ctx); err != nil {
				fs.Errorf(chunk, "Failed to remove old chunk: %v", err)
			}
		}
	}
}

// Put into the remote path with the given modTime and size.
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.put(ctx, in, src, src.Remote(), options, f.base.Put, "put", nil)
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
func (f *Fs) PutStream(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	return f.put(ctx, in, src, src.Remote(), options, f.base.Features().PutStream, "upload", nil)
}

// Update in to the object with the modTime given of the given size
func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
	basePut := o.f.base.Put
	if src.Size() < 0 {
		basePut = o.f.base.Features().PutStream
		if basePut == nil {
			return errors.New("wrapped file system does not support streaming uploads")
		}
	}
	oNew, err := o.f.put(ctx, in, src, o.Remote(), options, basePut, "update", o)
	if err == nil {
		*o = *oNew.(*Object)
	}
	return err
}

// PutUnchecked uploads the object
//
// This will create a duplicate if we upload a new file without
// checking to see if there is one already - use Put() for that.
func (f *Fs) PutUnchecked(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
	do := f.base.Features().PutUnchecked
	if do == nil {
		return nil, errors.New("can't PutUnchecked")
	}
	// TODO: handle range/limit options and really chunk stream here!
	o, err := do(ctx, in, f.wrapInfo(src, "", -1))
	if err != nil {
		return nil, err
	}
	return f.newObject("", o, nil), nil
}

// Hashes returns the supported hash sets.
// Chunker advertises a hash type if and only if it can be calculated
// for files of any size, non-chunked or composite.
func (f *Fs) Hashes() hash.Set {
	// composites AND no fallback AND (chunker OR wrapped Fs will hash all non-chunked's)
	if f.useMD5 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.MD5)) {
		return hash.NewHashSet(hash.MD5)
	}
	if f.useSHA1 && !f.hashFallback && (f.hashAll || f.base.Hashes().Contains(hash.SHA1)) {
		return hash.NewHashSet(hash.SHA1)
	}
	return hash.NewHashSet() // can't provide strong guarantees
}

// Mkdir makes the directory (container, bucket)
//
// Shouldn't return an error if it already exists
func (f *Fs) Mkdir(ctx context.Context, dir string) error {
	if err := f.forbidChunk(dir, dir); err != nil {
		return errors.Wrap(err, "can't mkdir")
	}
	return f.base.Mkdir(ctx, dir)
}

// Rmdir removes the directory (container, bucket) if empty
//
// Return an error if it doesn't exist or isn't empty
func (f *Fs) Rmdir(ctx context.Context, dir string) error {
	return f.base.Rmdir(ctx, dir)
}

// Purge all files in the directory
//
// Implement this if you have a way of deleting all the files
// quicker than just running Remove() on the result of List()
//
// Return an error if it doesn't exist.
//
// This command will chain to `purge` from wrapped remote.
// As a result it removes not only composite chunker files with their
// active chunks but also all hidden temporary chunks in the directory.
//
func (f *Fs) Purge(ctx context.Context, dir string) error {
	do := f.base.Features().Purge
	if do == nil {
		return fs.ErrorCantPurge
	}
	return do(ctx, dir)
}

// Remove an object (chunks and metadata, if any)
//
// Remove deletes only active chunks of the composite object.
// It does not try to look for temporary chunks because they could belong
// to another command modifying this composite file in parallel.
//
// Commands normally clean up all temporary chunks in case of a failure.
// However, if rclone dies unexpectedly, it can leave hidden temporary
// chunks, which cannot be discovered using the `list` command.
// Remove does not try to search for such chunks or to delete them.
// Sometimes this can lead to strange results, e.g. when `list` shows that
// a directory is empty but `rmdir` refuses to remove it because on the
// level of the wrapped remote it's actually *not* empty.
// As a workaround users can use `purge` to forcibly remove it.
//
// In future, a flag `--chunker-delete-hidden` may be added which tells
// Remove to search directory for hidden chunks and remove them too
// (at the risk of breaking parallel commands).
//
// Remove is the only operation allowed on composite files with
// invalid or future metadata format.
// We don't let the user copy/move/update unsupported composite files.
// Let's at least let her get rid of them, just complain loudly.
//
// This can litter the directory with orphan chunks of unsupported types,
// but as long as we remove the meta object, even future releases will
// treat the composite file as removed and refuse to act upon it.
//
// Disclaimer: corruption can still happen if an unsupported file is removed
// and then recreated with the same name.
// Unsupported control chunks will get re-picked by a more recent
// rclone version with unexpected results. This can be helped by
// the `delete hidden` flag above; at least the user has been warned.
//
func (o *Object) Remove(ctx context.Context) (err error) {
	if err := o.f.forbidChunk(o, o.Remote()); err != nil {
		// operations.Move can still call Remove if chunker's Move refuses
		// to corrupt a file in hard mode. Hence, refuse to Remove, too.
		return errors.Wrap(err, "refuse to corrupt")
	}
	if err := o.readMetadata(ctx); err == ErrMetaUnknown {
		// Proceed but warn the user that unexpected things can happen.
		fs.Errorf(o, "Removing a file with unsupported metadata: %v", err)
	}

	// Remove the non-chunked file or the meta object of a composite file.
	if o.main != nil {
		err = o.main.Remove(ctx)
	}

	// Remove only active data chunks, ignore any temporary chunks that
	// might be created in parallel by other transactions.
	for _, chunk := range o.chunks {
		chunkErr := chunk.Remove(ctx)
		if err == nil {
			err = chunkErr
		}
	}

	// There are no known control chunks to remove at the moment.
	return err
}

// copyOrMove implements copy or move
func (f *Fs) copyOrMove(ctx context.Context, o *Object, remote string, do copyMoveFn, md5, sha1, opName string) (fs.Object, error) {
	if err := f.forbidChunk(o, remote); err != nil {
		return nil, errors.Wrapf(err, "can't %s", opName)
	}
	if err := o.readMetadata(ctx); err != nil {
		// Refuse to copy/move composite files with invalid or future
		// metadata format which might involve unsupported chunk types.
		return nil, errors.Wrapf(err, "can't %s this file", opName)
	}
	if !o.isComposite() {
		fs.Debugf(o, "%s non-chunked object...", opName)
		oResult, err := do(ctx, o.mainChunk(), remote) // chain operation to a single wrapped chunk
		if err != nil {
			return nil, err
		}
		return f.newObject("", oResult, nil), nil
	}

	fs.Debugf(o, "%s %d data chunks...", opName, len(o.chunks))
	mainRemote := o.remote
	var newChunks []fs.Object
	var err error

	// Copy/move active data chunks.
	// Ignore possible temporary chunks being created by parallel operations.
	for _, chunk := range o.chunks {
		chunkRemote := chunk.Remote()
		if !strings.HasPrefix(chunkRemote, mainRemote) {
			err = fmt.Errorf("invalid chunk name %q", chunkRemote)
			break
		}
		chunkSuffix := chunkRemote[len(mainRemote):]
		// assign to the outer err (don't shadow it with `:=`) so a failed
		// chunk aborts the whole operation in the checks below
		var chunkResult fs.Object
		chunkResult, err = do(ctx, chunk, remote+chunkSuffix)
		if err != nil {
			break
		}
		newChunks = append(newChunks, chunkResult)
	}

	// Copy or move old metadata.
	// There are no known control chunks to move/copy atm.
	var metaObject fs.Object
	if err == nil && o.main != nil {
		metaObject, err = do(ctx, o.main, remote)
	}
	if err != nil {
		for _, chunk := range newChunks {
			silentlyRemove(ctx, chunk)
		}
		return nil, err
	}

	// Create wrapping object, calculate and validate total size
	newObj := f.newObject(remote, metaObject, newChunks)
	err = newObj.validate()
	if err != nil {
		silentlyRemove(ctx, newObj)
		return nil, err
	}

	// Update metadata
	var metadata []byte
	switch f.opt.MetaFormat {
	case "simplejson":
		metadata, err = marshalSimpleJSON(ctx, newObj.size, len(newChunks), md5, sha1, o.xactID)
		if err == nil {
			metaInfo := f.wrapInfo(metaObject, "", int64(len(metadata)))
			err = newObj.main.Update(ctx, bytes.NewReader(metadata), metaInfo)
		}
	case "none":
		if newObj.main != nil {
			err = newObj.main.Remove(ctx)
		}
	}

	// Return the composite object
	if err != nil {
		silentlyRemove(ctx, newObj)
		return nil, err
	}
	return newObj, nil
}
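
// To illustrate the chunk renaming above (a sketch assuming the default
// `*.rclone_chunk.###` name format with start_from=1): copying composite
// file "dir/video.avi" to "dir2/clip.avi" maps
//
//	"dir/video.avi.rclone_chunk.001" -> "dir2/clip.avi.rclone_chunk.001"
//	"dir/video.avi.rclone_chunk.002" -> "dir2/clip.avi.rclone_chunk.002"
//
// because chunkSuffix is everything after the main remote name.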

type copyMoveFn func(context.Context, fs.Object, string) (fs.Object, error)

func (f *Fs) okForServerSide(ctx context.Context, src fs.Object, opName string) (obj *Object, md5, sha1 string, ok bool) {
	var diff string
	obj, ok = src.(*Object)

	switch {
	case !ok:
		diff = "remote types"
	case !operations.SameConfig(f.base, obj.f.base):
		diff = "wrapped remotes"
	case f.opt.ChunkSize != obj.f.opt.ChunkSize:
		diff = "chunk sizes"
	case f.opt.NameFormat != obj.f.opt.NameFormat:
		diff = "chunk name formats"
	case f.opt.StartFrom != obj.f.opt.StartFrom:
		diff = "chunk numbering"
	case f.opt.MetaFormat != obj.f.opt.MetaFormat:
		diff = "meta formats"
	}
	if diff != "" {
		fs.Debugf(src, "Can't %s - different %s", opName, diff)
		ok = false
		return
	}

	if obj.unsure {
		// re-read metadata to learn whether the object is composite
		_ = obj.readMetadata(ctx)
	}
	requireMetaHash := obj.isComposite() && f.opt.MetaFormat == "simplejson"
	if !requireMetaHash && !f.hashAll {
		ok = true // hash is not required for metadata
		return
	}

	switch {
	case f.useMD5:
		md5, _ = obj.Hash(ctx, hash.MD5)
		ok = md5 != ""
		if !ok && f.hashFallback {
			sha1, _ = obj.Hash(ctx, hash.SHA1)
			ok = sha1 != ""
		}
	case f.useSHA1:
		sha1, _ = obj.Hash(ctx, hash.SHA1)
		ok = sha1 != ""
		if !ok && f.hashFallback {
			md5, _ = obj.Hash(ctx, hash.MD5)
			ok = md5 != ""
		}
	default:
		ok = false
	}
	if !ok {
		fs.Debugf(src, "Can't %s - required hash not found", opName)
	}
	return
}

// Copy src to this remote using server-side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	baseCopy := f.base.Features().Copy
	if baseCopy == nil {
		return nil, fs.ErrorCantCopy
	}
	obj, md5, sha1, ok := f.okForServerSide(ctx, src, "copy")
	if !ok {
		return nil, fs.ErrorCantCopy
	}
	return f.copyOrMove(ctx, obj, remote, baseCopy, md5, sha1, "copy")
}

// Move src to this remote using server-side move operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
	baseMove := func(ctx context.Context, src fs.Object, remote string) (fs.Object, error) {
		return f.baseMove(ctx, src, remote, delNever)
	}
	obj, md5, sha1, ok := f.okForServerSide(ctx, src, "move")
	if !ok {
		return nil, fs.ErrorCantMove
	}
	return f.copyOrMove(ctx, obj, remote, baseMove, md5, sha1, "move")
}

// baseMove chains to the wrapped Move or simulates it by Copy+Delete.
//
// delMode selects how an existing destination object is handled:
// delAlways looks up the destination and passes it to operations.Move
// so it gets deleted, delFailed retries a failed move against the
// looked-up destination, and delNever (the default) assumes no
// destination object exists.
func (f *Fs) baseMove(ctx context.Context, src fs.Object, remote string, delMode int) (fs.Object, error) {
	var (
		dest fs.Object
		err  error
	)
	switch delMode {
	case delAlways:
		dest, err = f.base.NewObject(ctx, remote)
	case delFailed:
		dest, err = operations.Move(ctx, f.base, nil, remote, src)
		if err == nil {
			return dest, err
		}
		dest, err = f.base.NewObject(ctx, remote)
	case delNever:
		// fall through, the default
	}
	if err != nil {
		dest = nil
	}
	return operations.Move(ctx, f.base, dest, remote, src)
}

// DirMove moves src, srcRemote to this remote at dstRemote
// using server-side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string) error {
	do := f.base.Features().DirMove
	if do == nil {
		return fs.ErrorCantDirMove
	}
	srcFs, ok := src.(*Fs)
	if !ok {
		fs.Debugf(srcFs, "Can't move directory - not same remote type")
		return fs.ErrorCantDirMove
	}
	return do(ctx, srcFs.base, srcRemote, dstRemote)
}

// CleanUp the trash in the Fs
//
// Implement this if you have a way of emptying the trash or
// otherwise cleaning up old versions of files.
func (f *Fs) CleanUp(ctx context.Context) error {
	do := f.base.Features().CleanUp
	if do == nil {
		return errors.New("can't CleanUp")
	}
	return do(ctx)
}

// About gets quota information from the Fs
func (f *Fs) About(ctx context.Context) (*fs.Usage, error) {
	do := f.base.Features().About
	if do == nil {
		return nil, errors.New("About not supported")
	}
	return do(ctx)
}

// UnWrap returns the Fs that this Fs is wrapping
func (f *Fs) UnWrap() fs.Fs {
	return f.base
}

// WrapFs returns the Fs that is wrapping this Fs
func (f *Fs) WrapFs() fs.Fs {
	return f.wrapper
}

// SetWrapper sets the Fs that is wrapping this Fs
func (f *Fs) SetWrapper(wrapper fs.Fs) {
	f.wrapper = wrapper
}

// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
//
// Replace data chunk names with the name of the composite file.
// Ignore temporary and control chunks.
func (f *Fs) ChangeNotify(ctx context.Context, notifyFunc func(string, fs.EntryType), pollIntervalChan <-chan time.Duration) {
	do := f.base.Features().ChangeNotify
	if do == nil {
		return
	}
	wrappedNotifyFunc := func(path string, entryType fs.EntryType) {
		//fs.Debugf(f, "ChangeNotify: path %q entryType %d", path, entryType)
		if entryType == fs.EntryObject {
			mainPath, _, _, xactID := f.parseChunkName(path)
			metaXactID := ""
			if f.useNoRename {
				metaObject, _ := f.base.NewObject(ctx, mainPath)
				dummyObject := f.newObject("", metaObject, nil)
				metaXactID, _ = dummyObject.readXactID(ctx)
			}
			if mainPath != "" && xactID == metaXactID {
				path = mainPath
			}
		}
		notifyFunc(path, entryType)
	}
	do(ctx, wrappedNotifyFunc, pollIntervalChan)
}
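
// For example (a sketch assuming the default chunk name format): a
// notification for "dir/video.avi.rclone_chunk.002" is forwarded as
// "dir/video.avi", while a temporary chunk such as
// "dir/video.avi.rclone_chunk.002_0abc" keeps its chunk name unless
// norename transactions mark it active via a matching metadata txn.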

// Shutdown the backend, closing any background tasks and any
// cached connections.
func (f *Fs) Shutdown(ctx context.Context) error {
	do := f.base.Features().Shutdown
	if do == nil {
		return nil
	}
	return do(ctx)
}

// Object represents a composite file wrapping one or more data chunks
type Object struct {
	remote    string
	main      fs.Object   // meta object if file is composite, or wrapped non-chunked file, nil if meta format is 'none'
	chunks    []fs.Object // active data chunks if file is composite, or wrapped file as a single chunk if meta format is 'none'
	size      int64       // cached total size of chunks in a composite file or -1 for non-chunked files
	isFull    bool        // true if metadata has been read
	xIDCached bool        // true if xactID has been read
	unsure    bool        // true if need to read metadata to detect object type
	xactID    string      // transaction ID for "norename" or empty string for "renamed" chunks
	md5       string
	sha1      string
	f         *Fs
}

func (o *Object) addChunk(chunk fs.Object, chunkNo int) error {
	if chunkNo < 0 {
		return fmt.Errorf("invalid chunk number %d", chunkNo+o.f.opt.StartFrom)
	}
	if chunkNo == len(o.chunks) {
		o.chunks = append(o.chunks, chunk)
		return nil
	}
	if chunkNo > maxSafeChunkNumber {
		return ErrChunkOverflow
	}
	if chunkNo > len(o.chunks) {
		newChunks := make([]fs.Object, chunkNo+1, (chunkNo+1)*2)
		copy(newChunks, o.chunks)
		o.chunks = newChunks
	}
	if o.chunks[chunkNo] != nil {
		return fmt.Errorf("duplicate chunk number %d", chunkNo+o.f.opt.StartFrom)
	}
	o.chunks[chunkNo] = chunk
	return nil
}
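
// An illustrative trace (hypothetical chunk objects c0..c2): listing can
// deliver chunks out of order, and addChunk grows a sparse slice to hold
// them until validate() checks that no slot is left nil:
//
//	o.addChunk(c2, 2) // o.chunks == [nil, nil, c2]
//	o.addChunk(c0, 0) // o.chunks == [c0, nil, c2]
//	o.addChunk(c1, 1) // o.chunks == [c0, c1, c2]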

// validate verifies the object internals and updates total size
func (o *Object) validate() error {
	if !o.isComposite() {
		_ = o.mainChunk() // verify that single wrapped chunk exists
		return nil
	}

	metaObject := o.main // this file is composite - o.main refers to meta object (or nil if meta format is 'none')
	if metaObject != nil && metaObject.Size() > maxMetadataSize {
		// metadata of a chunked file must be a tiny piece of json
		o.size = -1
		return fmt.Errorf("%q metadata is too large", o.remote)
	}

	var totalSize int64
	for _, chunk := range o.chunks {
		if chunk == nil {
			o.size = -1
			return fmt.Errorf("%q has missing chunks", o)
		}
		totalSize += chunk.Size()
	}
	o.size = totalSize // cache up the total data size
	return nil
}

func (f *Fs) newObject(remote string, main fs.Object, chunks []fs.Object) *Object {
	var size int64 = -1
	if main != nil {
		size = main.Size()
		if remote == "" {
			remote = main.Remote()
		}
	}
	return &Object{
		remote: remote,
		main:   main,
		size:   size,
		f:      f,
		chunks: chunks,
	}
}

// mainChunk returns:
// - a wrapped object for non-chunked files
// - meta object for chunked files with metadata
// - first chunk for chunked files without metadata
// Never returns nil.
func (o *Object) mainChunk() fs.Object {
	if o.main != nil {
		return o.main // meta object or non-chunked wrapped file
	}
	if o.chunks != nil {
		return o.chunks[0] // first chunk of a chunked composite file
	}
	panic("invalid chunked object") // very unlikely
}

func (o *Object) isComposite() bool {
	return o.chunks != nil
}

// Fs returns read only access to the Fs that this object is part of
func (o *Object) Fs() fs.Info {
	return o.f
}

// Return a string version
func (o *Object) String() string {
	if o == nil {
		return "<nil>"
	}
	return o.remote
}

// Remote returns the remote path
func (o *Object) Remote() string {
	return o.remote
}

// Size returns the size of the file
func (o *Object) Size() int64 {
	if o.isComposite() {
		return o.size // total size of data chunks in a composite file
	}
	return o.mainChunk().Size() // size of wrapped non-chunked file
}

// Storable returns whether object is storable
func (o *Object) Storable() bool {
	return o.mainChunk().Storable()
}

// ModTime returns the modification time of the file
func (o *Object) ModTime(ctx context.Context) time.Time {
	return o.mainChunk().ModTime(ctx)
}

// SetModTime sets the modification time of the file
func (o *Object) SetModTime(ctx context.Context, mtime time.Time) error {
	if err := o.readMetadata(ctx); err != nil {
		return err // refuse to act on unsupported format
	}
	return o.mainChunk().SetModTime(ctx, mtime)
}

// Hash returns the selected checksum of the file.
// If no checksum is available it returns "".
//
// Hash won't fail with an `unsupported` error; it returns an empty
// hash string if a particular hashsum type is not supported.
//
// Hash takes the hashsum from metadata if available or requests it
// from the wrapped remote for non-chunked files.
// Metadata (if meta format is not 'none') is by default kept
// only for composite files. In the "All" hashing mode chunker
// will force metadata on all files if a particular hashsum type
// is not supported by the wrapped remote.
//
// Note that Hash prefers the wrapped hashsum for a non-chunked
// file, then tries to read it from metadata. This in theory
// handles the unusual case when a small file has been tampered
// with at the level of the wrapped remote without chunker noticing.
//
func (o *Object) Hash(ctx context.Context, hashType hash.Type) (string, error) {
	if err := o.readMetadata(ctx); err != nil {
		return "", err // valid metadata is required to get hash, abort
	}
	if !o.isComposite() {
		// First, chain to the wrapped non-chunked file if possible.
		if value, err := o.mainChunk().Hash(ctx, hashType); err == nil && value != "" {
			return value, nil
		}
	}

	// Try hash from metadata if the file is composite or if wrapped remote fails.
	switch hashType {
	case hash.MD5:
		if o.md5 == "" {
			return "", nil
		}
		return o.md5, nil
	case hash.SHA1:
		if o.sha1 == "" {
			return "", nil
		}
		return o.sha1, nil
	default:
		return "", hash.ErrUnsupported
	}
}

// UnWrap returns the wrapped Object
func (o *Object) UnWrap() fs.Object {
	return o.mainChunk()
}

// Open opens the file for read.  Call Close() on the returned io.ReadCloser
func (o *Object) Open(ctx context.Context, options ...fs.OpenOption) (rc io.ReadCloser, err error) {
	if err := o.readMetadata(ctx); err != nil {
		// refuse to open unsupported format
		return nil, errors.Wrap(err, "can't open")
	}
	if !o.isComposite() {
		return o.mainChunk().Open(ctx, options...) // chain to wrapped non-chunked file
	}

	var openOptions []fs.OpenOption
	var offset, limit int64 = 0, -1

	for _, option := range options {
		switch opt := option.(type) {
		case *fs.SeekOption:
			offset = opt.Offset
		case *fs.RangeOption:
			offset, limit = opt.Decode(o.size)
		default:
			// pass Options on to the wrapped open, if appropriate
			openOptions = append(openOptions, option)
		}
	}

	if offset < 0 {
		return nil, errors.New("invalid offset")
	}
	if limit < 0 {
		limit = o.size - offset
	}

	return o.newLinearReader(ctx, offset, limit, openOptions)
}
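
// An illustrative walk-through (hypothetical sizes): for a composite
// file made of three 100-byte chunks, opening with
// fs.RangeOption{Start: 150, End: 249} decodes to offset=150, limit=100.
// newLinearReader skips chunk 0 entirely (offset drops to 50), opens
// chunk 1 with a range covering its bytes 50..99, and after draining it
// opens chunk 2 with a range covering its bytes 0..49.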

// linearReader opens and reads file chunks sequentially, without read-ahead
type linearReader struct {
	ctx     context.Context
	chunks  []fs.Object
	options []fs.OpenOption
	limit   int64
	count   int64
	pos     int
	reader  io.ReadCloser
	err     error
}

func (o *Object) newLinearReader(ctx context.Context, offset, limit int64, options []fs.OpenOption) (io.ReadCloser, error) {
	r := &linearReader{
		ctx:     ctx,
		chunks:  o.chunks,
		options: options,
		limit:   limit,
	}

	// skip to chunk for given offset
	err := io.EOF
	for offset >= 0 && err != nil {
		offset, err = r.nextChunk(offset)
	}
	if err == nil || err == io.EOF {
		r.err = err
		return r, nil
	}
	return nil, err
}

func (r *linearReader) nextChunk(offset int64) (int64, error) {
	if r.err != nil {
		return -1, r.err
	}
	if r.pos >= len(r.chunks) || r.limit <= 0 || offset < 0 {
		return -1, io.EOF
	}

	chunk := r.chunks[r.pos]
	count := chunk.Size()
	r.pos++

	if offset >= count {
		return offset - count, io.EOF
	}
	count -= offset
	if r.limit < count {
		count = r.limit
	}
	options := append(r.options, &fs.RangeOption{Start: offset, End: offset + count - 1})

	if err := r.Close(); err != nil {
		return -1, err
	}

	reader, err := chunk.Open(r.ctx, options...)
	if err != nil {
		return -1, err
	}

	r.reader = reader
	r.count = count
	return offset, nil
}

func (r *linearReader) Read(p []byte) (n int, err error) {
	if r.err != nil {
		return 0, r.err
	}
	if r.limit <= 0 {
		r.err = io.EOF
		return 0, io.EOF
	}

	for r.count <= 0 {
		// current chunk has been read completely or its size is zero
		off, err := r.nextChunk(0)
		if off < 0 {
			r.err = err
			return 0, err
		}
	}

	n, err = r.reader.Read(p)
	if err == nil || err == io.EOF {
		r.count -= int64(n)
		r.limit -= int64(n)
		if r.limit > 0 {
			err = nil // more data to read
		}
	}
	r.err = err
	return
}

func (r *linearReader) Close() (err error) {
	if r.reader != nil {
		err = r.reader.Close()
		r.reader = nil
	}
	return
}

// ObjectInfo describes a wrapped fs.ObjectInfo for being the source
type ObjectInfo struct {
	src     fs.ObjectInfo
	fs      *Fs
	nChunks int    // number of data chunks
	xactID  string // transaction ID for "norename" or empty string for "renamed" chunks
	size    int64  // overrides source size by the total size of data chunks
	remote  string // overrides remote name
	md5     string // overrides MD5 checksum
	sha1    string // overrides SHA1 checksum
}

func (f *Fs) wrapInfo(src fs.ObjectInfo, newRemote string, totalSize int64) *ObjectInfo {
	return &ObjectInfo{
		src:    src,
		fs:     f,
		size:   totalSize,
		remote: newRemote,
	}
}

// Fs returns read only access to the Fs that this object is part of
func (oi *ObjectInfo) Fs() fs.Info {
	if oi.fs == nil {
		panic("stub ObjectInfo")
	}
	return oi.fs
}

// String returns string representation
func (oi *ObjectInfo) String() string {
	return oi.src.String()
}

// Storable returns whether object is storable
func (oi *ObjectInfo) Storable() bool {
	return oi.src.Storable()
}

// Remote returns the remote path
func (oi *ObjectInfo) Remote() string {
	if oi.remote != "" {
		return oi.remote
	}
	return oi.src.Remote()
}

// Size returns the size of the file
func (oi *ObjectInfo) Size() int64 {
	if oi.size != -1 {
		return oi.size
	}
	return oi.src.Size()
}

// ModTime returns the modification time
func (oi *ObjectInfo) ModTime(ctx context.Context) time.Time {
	return oi.src.ModTime(ctx)
}

// Hash returns the selected checksum of the wrapped file.
// It returns "" if no checksum is available or if this
// info doesn't wrap the complete file.
func (oi *ObjectInfo) Hash(ctx context.Context, hashType hash.Type) (string, error) {
	var errUnsupported error
	switch hashType {
	case hash.MD5:
		if oi.md5 != "" {
			return oi.md5, nil
		}
	case hash.SHA1:
		if oi.sha1 != "" {
			return oi.sha1, nil
		}
	default:
		errUnsupported = hash.ErrUnsupported
	}
	if oi.Size() != oi.src.Size() {
		// fail if this info wraps only a part of the file
		return "", errUnsupported
	}
	// chain to full source if possible
	value, err := oi.src.Hash(ctx, hashType)
	if err == hash.ErrUnsupported {
		return "", errUnsupported
	}
	return value, err
}

// ID returns the ID of the Object if known, or "" if not
func (o *Object) ID() string {
	if doer, ok := o.mainChunk().(fs.IDer); ok {
		return doer.ID()
	}
	return ""
}

// Meta format `simplejson`
type metaSimpleJSON struct {
	// required core fields
	Version  *int   `json:"ver"`
	Size     *int64 `json:"size"`    // total size of data chunks
	ChunkNum *int   `json:"nchunks"` // number of data chunks
	// optional extra fields
	MD5    string `json:"md5,omitempty"`
	SHA1   string `json:"sha1,omitempty"`
	XactID string `json:"txn,omitempty"` // transaction ID for norename transactions
}
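
// An illustrative metadata object (field values are hypothetical):
//
//	{"ver":2,"size":104857600,"nchunks":2,
//	 "md5":"9e107d9d372bb6826bd81d3542a419d6","txn":"0abc"}
//
// Version 2 is used when a norename transaction ID is present;
// otherwise marshalSimpleJSON below falls back to version 1.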

// marshalSimpleJSON
//
// The current implementation creates metadata in three cases:
// - for files larger than chunk size
// - if file contents can be mistaken for a meta object
// - if consistent hashing is on but the wrapped remote can't provide the given hash
//
func marshalSimpleJSON(ctx context.Context, size int64, nChunks int, md5, sha1, xactID string) ([]byte, error) {
	version := metadataVersion
	if xactID == "" && version == 2 {
		version = 1
	}
	metadata := metaSimpleJSON{
		// required core fields
		Version:  &version,
		Size:     &size,
		ChunkNum: &nChunks,
		// optional extra fields
		MD5:    md5,
		SHA1:   sha1,
		XactID: xactID,
	}
	data, err := json.Marshal(&metadata)
	if err == nil && data != nil && len(data) >= maxMetadataSizeWritten {
		// be a nitpicker, never produce something you can't consume
		return nil, errors.New("metadata can't be this big, please report to rclone developers")
	}
	return data, err
}

// unmarshalSimpleJSON parses metadata.
//
// In case of errors it returns a flag telling whether the input was
// produced by an incompatible version of rclone, as opposed to not
// being metadata at all.
// Only metadata format versions 1 and 2 are supported atm.
// Future releases will transparently migrate older metadata objects.
// A newer format will have a higher version number and cannot be
// correctly handled by the current implementation.
// The version check below will then explicitly ask the user to upgrade rclone.
//
func unmarshalSimpleJSON(ctx context.Context, metaObject fs.Object, data []byte) (info *ObjectInfo, madeByChunker bool, err error) {
	// Be strict about JSON format
	// to reduce possibility that a random small file resembles metadata.
	if len(data) > maxMetadataSizeWritten {
		return nil, false, ErrMetaTooBig
	}
	if data == nil || len(data) < 2 || data[0] != '{' || data[len(data)-1] != '}' {
		return nil, false, errors.New("invalid json")
	}
	var metadata metaSimpleJSON
	err = json.Unmarshal(data, &metadata)
	if err != nil {
		return nil, false, err
	}
	// Basic fields are strictly required
	// to reduce possibility that a random small file resembles metadata.
	if metadata.Version == nil || metadata.Size == nil || metadata.ChunkNum == nil {
		return nil, false, errors.New("missing required field")
	}
	// Perform strict checks, avoid corruption of future metadata formats.
	if *metadata.Version < 1 {
		return nil, false, errors.New("wrong version")
	}
	if *metadata.Size < 0 {
		return nil, false, errors.New("negative file size")
	}
	if *metadata.ChunkNum < 0 {
		return nil, false, errors.New("negative number of chunks")
	}
	if *metadata.ChunkNum > maxSafeChunkNumber {
		return nil, true, ErrChunkOverflow // produced by incompatible version of rclone
	}
	if metadata.MD5 != "" {
		_, err = hex.DecodeString(metadata.MD5)
		if len(metadata.MD5) != 32 || err != nil {
			return nil, false, errors.New("wrong md5 hash")
		}
	}
	if metadata.SHA1 != "" {
		_, err = hex.DecodeString(metadata.SHA1)
		if len(metadata.SHA1) != 40 || err != nil {
			return nil, false, errors.New("wrong sha1 hash")
		}
	}
	// ChunkNum is allowed to be 0 in future versions
	if *metadata.ChunkNum < 1 && *metadata.Version <= metadataVersion {
		return nil, false, errors.New("wrong number of chunks")
	}
	// Non-strict mode also accepts future metadata versions
	if *metadata.Version > metadataVersion {
		return nil, true, ErrMetaUnknown // produced by incompatible version of rclone
	}

	var nilFs *Fs // nil object triggers appropriate type method
	info = nilFs.wrapInfo(metaObject, "", *metadata.Size)
	info.nChunks = *metadata.ChunkNum
	info.md5 = metadata.MD5
	info.sha1 = metadata.SHA1
	info.xactID = metadata.XactID
	return info, true, nil
}
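
// A minimal round-trip sketch (values are hypothetical, metaObject is
// any fs.Object standing in for the stored meta file):
//
//	data, _ := marshalSimpleJSON(ctx, 1000, 2, "", "", "0abc")
//	info, madeByChunker, err := unmarshalSimpleJSON(ctx, metaObject, data)
//	// madeByChunker == true, err == nil, info.Size() == 1000,
//	// info.nChunks == 2, info.xactID == "0abc"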

func silentlyRemove(ctx context.Context, o fs.Object) {
	_ = o.Remove(ctx) // ignore error
}

// Name of the remote (as passed into NewFs)
func (f *Fs) Name() string {
	return f.name
}

// Root of the remote (as passed into NewFs)
func (f *Fs) Root() string {
	return f.root
}

// Features returns the optional features of this Fs
func (f *Fs) Features() *fs.Features {
	return f.features
}

// String returns a description of the FS
func (f *Fs) String() string {
	return fmt.Sprintf("Chunked '%s:%s'", f.name, f.root)
}

// Precision returns the precision of this Fs
func (f *Fs) Precision() time.Duration {
	return f.base.Precision()
}

// CanQuickRename returns true if the Fs supports a quick rename operation
func (f *Fs) CanQuickRename() bool {
	return f.base.Features().Move != nil
}

// Check the interfaces are satisfied
var (
	_ fs.Fs              = (*Fs)(nil)
	_ fs.Purger          = (*Fs)(nil)
	_ fs.Copier          = (*Fs)(nil)
	_ fs.Mover           = (*Fs)(nil)
	_ fs.DirMover        = (*Fs)(nil)
	_ fs.PutUncheckeder  = (*Fs)(nil)
	_ fs.PutStreamer     = (*Fs)(nil)
	_ fs.CleanUpper      = (*Fs)(nil)
	_ fs.UnWrapper       = (*Fs)(nil)
	_ fs.ListRer         = (*Fs)(nil)
	_ fs.Abouter         = (*Fs)(nil)
	_ fs.Wrapper         = (*Fs)(nil)
	_ fs.ChangeNotifier  = (*Fs)(nil)
	_ fs.Shutdowner      = (*Fs)(nil)
	_ fs.ObjectInfo      = (*ObjectInfo)(nil)
	_ fs.Object          = (*Object)(nil)
	_ fs.ObjectUnWrapper = (*Object)(nil)
	_ fs.IDer            = (*Object)(nil)
)