/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// CopyDestOptions represents options specified by the user for the CopyObject/ComposeObject APIs.
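//
// A minimal usage sketch (illustrative only; the bucket, object and
// metadata values below are hypothetical):
//
//	dst := CopyDestOptions{
//		Bucket:          "dst-bucket",
//		Object:          "dst-object",
//		ReplaceMetadata: true,
//		UserMetadata:    map[string]string{"project": "alpha"},
//	}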
type CopyDestOptions struct {
	Bucket string // points to destination bucket
	Object string // points to destination object

	// `Encryption` is the key info for server-side-encryption with customer
	// provided key. If it is nil, no encryption is performed.
	Encryption encrypt.ServerSide

	// `UserMetadata` is the user-metadata key-value pairs to be set on the
	// destination. The keys are automatically prefixed with `x-amz-meta-`
	// if needed. If nil is passed, and if only a single source (of any
	// size) is provided in the ComposeObject call, then metadata from the
	// source is copied to the destination.
	UserMetadata map[string]string
	// UserMetadata is only set on the destination if ReplaceMetadata is
	// true; otherwise UserMetadata is ignored and src.UserMetadata is
	// preserved.
	// NOTE: if you set ReplaceMetadata to true and no metadata is present
	// in UserMetadata, your destination object will end up with no
	// metadata set.
	ReplaceMetadata bool

	// `UserTags` is the user-defined object tags to be set on the
	// destination. This field is used only if `ReplaceTags` is set to
	// true; otherwise it is ignored.
	UserTags    map[string]string
	ReplaceTags bool

	// Specifies whether you want to apply a Legal Hold to the copied object.
	LegalHold LegalHoldStatus

	// Object Retention related fields
	Mode            RetentionMode
	RetainUntilDate time.Time

	Size int64 // Needs to be specified if progress bar is specified.
	// Progress of the entire copy operation will be sent here.
	Progress io.Reader
}

// Process custom-metadata to remove a `x-amz-meta-` prefix if
// present, dropping duplicate keys (after this prefix removal) so
// that the resulting keys are distinct.
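//
// For example (illustrative):
//
//	filterCustomMeta(map[string]string{"X-Amz-Meta-Alpha": "1", "beta": "2"})
//	// => map[string]string{"Alpha": "1", "beta": "2"}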
func filterCustomMeta(userMeta map[string]string) map[string]string {
	m := make(map[string]string)
	for k, v := range userMeta {
		if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
			k = k[len("x-amz-meta-"):]
		}
		if _, ok := m[k]; ok {
			continue
		}
		m[k] = v
	}
	return m
}

// Marshal converts all the CopyDestOptions into their
// equivalent HTTP header representation.
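//
// For instance (illustrative), ReplaceMetadata together with
// UserMetadata{"project": "alpha"} emits:
//
//	x-amz-metadata-directive: REPLACE
//	x-amz-meta-project: alpha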
func (opts CopyDestOptions) Marshal(header http.Header) {
	const replaceDirective = "REPLACE"
	if opts.ReplaceTags {
		header.Set(amzTaggingHeaderDirective, replaceDirective)
		if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
			header.Set(amzTaggingHeader, tags)
		}
	}

	if opts.LegalHold != LegalHoldStatus("") {
		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
	}

	if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
		header.Set(amzLockMode, opts.Mode.String())
		header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
	}

	if opts.Encryption != nil {
		opts.Encryption.Marshal(header)
	}

	if opts.ReplaceMetadata {
		header.Set("x-amz-metadata-directive", replaceDirective)
		for k, v := range filterCustomMeta(opts.UserMetadata) {
			if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
				header.Set(k, v)
			} else {
				header.Set("x-amz-meta-"+k, v)
			}
		}
	}
}

// validate checks that the destination bucket and object names are
// well-formed and that a non-negative Size is provided when a Progress
// reader is set.
func (opts CopyDestOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Progress != nil && opts.Size < 0 {
		return errInvalidArgument("For progress bar effective size needs to be specified")
	}
	return nil
}

// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
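//
// A sketch of a ranged source (illustrative; copies only bytes 0-99 of
// a hypothetical object):
//
//	src := CopySrcOptions{
//		Bucket:     "src-bucket",
//		Object:     "src-object",
//		MatchRange: true,
//		Start:      0,
//		End:        99,
//	}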
type CopySrcOptions struct {
	Bucket, Object       string
	VersionID            string
	MatchETag            string
	NoMatchETag          string
	MatchModifiedSince   time.Time
	MatchUnmodifiedSince time.Time
	MatchRange           bool  // set to true to copy only the byte range [Start, End]
	Start, End           int64 // inclusive byte offsets, used when MatchRange is true
	Encryption           encrypt.ServerSide
}

// Marshal converts all the CopySrcOptions into their
// equivalent HTTP header representation.
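//
// For example (illustrative), a source "b/o" with MatchETag "abc" emits:
//
//	x-amz-copy-source: b/o
//	x-amz-copy-source-if-match: abc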
func (opts CopySrcOptions) Marshal(header http.Header) {
	// Set the source header
	header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
	if opts.VersionID != "" {
		header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
	}

	if opts.MatchETag != "" {
		header.Set("x-amz-copy-source-if-match", opts.MatchETag)
	}
	if opts.NoMatchETag != "" {
		header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
	}

	if !opts.MatchModifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
	}
	if !opts.MatchUnmodifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
	}

	if opts.Encryption != nil {
		encrypt.SSECopy(opts.Encryption).Marshal(header)
	}
}

// validate checks that the source bucket and object names are well-formed
// and that any requested byte range is sane.
func (opts CopySrcOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Start > opts.End || opts.Start < 0 {
		return errInvalidArgument("start must be non-negative, and start must be at most end.")
	}
	return nil
}

// copyObjectDo - low-level implementation of the CopyObject API; it
// supports copying only up to 5GiB in a single request.
func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
	metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions) (ObjectInfo, error) {

	// Build headers.
	headers := make(http.Header)

	// Set all the metadata headers.
	for k, v := range metadata {
		headers.Set(k, v)
	}
	if !dstOpts.Internal.ReplicationStatus.Empty() {
		headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
	}
	if !dstOpts.Internal.SourceMTime.IsZero() {
		headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
	}
	if dstOpts.Internal.SourceETag != "" {
		headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
	}
	if len(dstOpts.UserTags) != 0 {
		headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
	}

	reqMetadata := requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
	}
	if dstOpts.Internal.SourceVersionID != "" {
		if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
			return ObjectInfo{}, errInvalidArgument(err.Error())
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Set the source header
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
	if srcOpts.VersionID != "" {
		headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
	}
	// Send the copy-object request.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return ObjectInfo{}, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
	}

	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return ObjectInfo{}, err
	}

	objInfo := ObjectInfo{
		Key:          destObject,
		ETag:         strings.Trim(cpObjRes.ETag, "\""),
		LastModified: cpObjRes.LastModified,
	}
	return objInfo, nil
}

// copyObjectPartDo - low-level implementation of upload-part-copy for a
// single part; it copies `length` bytes starting at `startOffset` from
// the source object into part `partID` of the destination upload.
func (c Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
	partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {

	headers := make(http.Header)

	// Set source
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))

	if startOffset < 0 {
		return p, errInvalidArgument("startOffset must be non-negative")
	}

	if length >= 0 {
		headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
	}

	for k, v := range metadata {
		headers.Set(k, v)
	}

	queryValues := make(url.Values)
	queryValues.Set("partNumber", strconv.Itoa(partID))
	queryValues.Set("uploadId", uploadID)

	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
		queryValues:  queryValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, destBucket, destObject)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partID, cpObjRes.ETag
	return p, nil
}

// uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
	headers http.Header) (p CompletePart, err error) {

	// Build query parameters
	urlValues := make(url.Values)
	urlValues.Set("partNumber", strconv.Itoa(partNumber))
	urlValues.Set("uploadId", uploadID)

	// Send upload-part-copy request
	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   bucket,
		objectName:   object,
		customHeader: headers,
		queryValues:  urlValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return p, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, bucket, object)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
	return p, nil
}

// ComposeObject - creates an object using server-side copying
// of existing objects. It takes a list of source objects (with optional offsets)
// and concatenates them into a new object using only server-side copying
// operations. Optionally takes a progress reader hook for applications to
// look at current progress.
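//
// A minimal sketch (illustrative; assumes a configured Client c and
// hypothetical bucket/object names; every source except the last must
// be at least 5MiB, per the part-size checks below):
//
//	dst := CopyDestOptions{Bucket: "dst-bucket", Object: "dst-object"}
//	src1 := CopySrcOptions{Bucket: "src-bucket", Object: "part-1"}
//	src2 := CopySrcOptions{Bucket: "src-bucket", Object: "part-2"}
//	info, err := c.ComposeObject(context.Background(), dst, src1, src2)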
func (c Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
	if len(srcs) < 1 || len(srcs) > maxPartsCount {
		return UploadInfo{}, errInvalidArgument("There must be at least one and up to 10000 source objects.")
	}

	for _, src := range srcs {
		if err := src.validate(); err != nil {
			return UploadInfo{}, err
		}
	}

	if err := dst.validate(); err != nil {
		return UploadInfo{}, err
	}

	srcObjectInfos := make([]ObjectInfo, len(srcs))
	srcObjectSizes := make([]int64, len(srcs))
	var totalSize, totalParts int64
	var err error
	for i, src := range srcs {
		opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
		srcObjectInfos[i], err = c.statObject(context.Background(), src.Bucket, src.Object, opts)
		if err != nil {
			return UploadInfo{}, err
		}

		srcCopySize := srcObjectInfos[i].Size
		// Check if a segment is specified, and if so, is the
		// segment within object bounds?
		if src.MatchRange {
			// Since range is specified,
			//    0 <= src.Start <= src.End
			// so the only invalid case to check is:
			if src.End >= srcCopySize || src.Start < 0 {
				return UploadInfo{}, errInvalidArgument(
					fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
						i, src.Start, src.End, srcCopySize))
			}
			srcCopySize = src.End - src.Start + 1
		}

		// Only the last source may be less than `absMinPartSize`
		if srcCopySize < absMinPartSize && i < len(srcs)-1 {
			return UploadInfo{}, errInvalidArgument(
				fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
		}

		// Is data to copy too large?
		totalSize += srcCopySize
		if totalSize > maxMultipartPutObjectSize {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
		}

		// record source size
		srcObjectSizes[i] = srcCopySize

		// calculate parts needed for current source
		totalParts += partsRequired(srcCopySize)
		// Do we need more parts than we are allowed?
		if totalParts > maxPartsCount {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
				"Your proposed compose object requires more than %d parts", maxPartsCount))
		}
	}

	// Single source object case (i.e. when only one source is
	// involved, it is being copied wholly and is at most 5GiB in
	// size; empty files are also supported).
	if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
		return c.CopyObject(ctx, dst, srcs[0])
	}

	// Now, handle multipart-copy cases.

	// 1. Ensure that the object has not been changed while
	//    we are copying data.
	for i := range srcs {
		srcs[i].MatchETag = srcObjectInfos[i].ETag
	}

	// 2. Initiate a new multipart upload.

	// Set user-metadata on the destination object: if ReplaceMetadata
	// is set, use dst.UserMetadata; otherwise carry over the metadata
	// of the first source object.
	var userMeta map[string]string
	if dst.ReplaceMetadata {
		userMeta = dst.UserMetadata
	} else {
		userMeta = srcObjectInfos[0].UserMetadata
	}

	var userTags map[string]string
	if dst.ReplaceTags {
		userTags = dst.UserTags
	} else {
		userTags = srcObjectInfos[0].UserTags
	}

	uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
		ServerSideEncryption: dst.Encryption,
		UserMetadata:         userMeta,
		UserTags:             userTags,
		Mode:                 dst.Mode,
		RetainUntilDate:      dst.RetainUntilDate,
		LegalHold:            dst.LegalHold,
	})
	if err != nil {
		return UploadInfo{}, err
	}

	// 3. Perform copy part uploads
	objParts := []CompletePart{}
	partIndex := 1
	for i, src := range srcs {
		var h = make(http.Header)
		src.Marshal(h)
		if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
			dst.Encryption.Marshal(h)
		}

		// calculate start/end indices of parts after
		// splitting.
		startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
		for j, start := range startIdx {
			end := endIdx[j]

			// Add (or reset) source range header for
			// upload part copy request.
			h.Set("x-amz-copy-source-range",
				fmt.Sprintf("bytes=%d-%d", start, end))

			// make upload-part-copy request
			complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
				dst.Object, uploadID, partIndex, h)
			if err != nil {
				return UploadInfo{}, err
			}
			if dst.Progress != nil {
				io.CopyN(ioutil.Discard, dst.Progress, end-start+1)
			}
			objParts = append(objParts, complPart)
			partIndex++
		}
	}

	// 4. Make final complete-multipart request.
	uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
		completeMultipartUpload{Parts: objParts})
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalSize
	return uploadInfo, nil
}

// partsRequired computes the number of parts needed to copy `size`
// bytes, using a part size of maxMultipartPutObjectSize / (maxPartsCount - 1)
// so that even the largest allowed object fits within the part-count limit.
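//
// Illustrative arithmetic (with the 5TiB object-size and 10000
// part-count limits referenced elsewhere in this file):
//
//	maxPartSize = 5 TiB / 9999 ≈ 524 MiB
//	partsRequired(5 GiB) = ceil(5 GiB / 524 MiB) = 10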
func partsRequired(size int64) int64 {
	maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
	r := size / int64(maxPartSize)
	if size%int64(maxPartSize) > 0 {
		r++
	}
	return r
}

// calculateEvenSplits - computes splits for a source and returns
// start and end index slices. Splits happen evenly to be sure that no
// part is less than 5MiB, as that could fail the multipart request if
// it is not the last part.
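//
// A worked (hypothetical) example: splitting size = 10 into k = 4 parts
// gives q = 2 and r = 2, so the part sizes are 3, 3, 2, 2 and the
// ranges are [0,2], [3,5], [6,7], [8,9].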
func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
	if size == 0 {
		return
	}

	reqParts := partsRequired(size)
	startIndex = make([]int64, reqParts)
	endIndex = make([]int64, reqParts)
	// The number of required parts `k` is computed by partsRequired as:
	//
	// k = ceiling(size / maxPartSize)
	//
	// Now, distribute the `size` bytes in the source into
	// k parts as evenly as possible:
	//
	// r parts sized (q+1) bytes, and
	// (k - r) parts sized q bytes, where
	//
	// size = q * k + r (by simple division of size by k,
	// so that 0 <= r < k)
	//
	start := src.Start
	if start == -1 {
		start = 0
	}
	quot, rem := size/reqParts, size%reqParts
	nextStart := start
	for j := int64(0); j < reqParts; j++ {
		curPartSize := quot
		if j < rem {
			curPartSize++
		}

		cStart := nextStart
		cEnd := cStart + curPartSize - 1
		nextStart = cEnd + 1

		startIndex[j], endIndex[j] = cStart, cEnd
	}
	return
}