/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017, 2018 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/encrypt"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// CopyDestOptions represents options specified by user for CopyObject/ComposeObject APIs
type CopyDestOptions struct {
	Bucket string // points to destination bucket
	Object string // points to destination object

	// `Encryption` is the key info for server-side-encryption with customer
	// provided key. If it is nil, no encryption is performed.
	Encryption encrypt.ServerSide

	// `UserMetadata` is the user-metadata key-value pairs to be set on the
	// destination. The keys are automatically prefixed with `x-amz-meta-`
	// if needed. If nil is passed, and there is only one source object in
	// the ComposeObject call (of any size), then metadata from the source
	// is copied to the destination.
	UserMetadata map[string]string
	// UserMetadata is only set on the destination if ReplaceMetadata is
	// true; otherwise UserMetadata is ignored and src.UserMetadata is
	// preserved.
	// NOTE: if you set this value to true and no metadata is present in
	// UserMetadata, your destination object will not have any metadata set.
	ReplaceMetadata bool

	// `UserTags` is the user-defined object tags to be set on the
	// destination. This will be set only if the `ReplaceTags` field is set
	// to true; otherwise this field is ignored.
	UserTags    map[string]string
	ReplaceTags bool

	// Specifies whether you want to apply a Legal Hold to the copied object.
	LegalHold LegalHoldStatus

	// Object Retention related fields
	Mode            RetentionMode
	RetainUntilDate time.Time

	Size int64 // Needs to be specified if a progress bar is specified.
	// Progress of the entire copy operation will be sent here.
	Progress io.Reader
}
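// A minimal usage sketch for CopyDestOptions with the CopyObject API. It is
// illustrative only: it assumes a configured *minio.Client named `clnt` and
// existing buckets/objects; all names are placeholders:
//
//	dst := minio.CopyDestOptions{
//		Bucket:          "dst-bucket",
//		Object:          "dst-object",
//		ReplaceMetadata: true,
//		UserMetadata:    map[string]string{"label": "copy"}, // stored as x-amz-meta-label
//	}
//	src := minio.CopySrcOptions{Bucket: "src-bucket", Object: "src-object"}
//	if _, err := clnt.CopyObject(context.Background(), dst, src); err != nil {
//		log.Fatalln(err)
//	}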
// filterCustomMeta processes custom metadata to remove an `x-amz-meta-`
// prefix if present and to ensure keys are distinct after that prefix
// removal (for duplicate keys, the first occurrence wins).
func filterCustomMeta(userMeta map[string]string) map[string]string {
	m := make(map[string]string)
	for k, v := range userMeta {
		if strings.HasPrefix(strings.ToLower(k), "x-amz-meta-") {
			k = k[len("x-amz-meta-"):]
		}
		if _, ok := m[k]; ok {
			continue
		}
		m[k] = v
	}
	return m
}

// Marshal converts all the CopyDestOptions into their
// equivalent HTTP header representation.
func (opts CopyDestOptions) Marshal(header http.Header) {
	const replaceDirective = "REPLACE"
	if opts.ReplaceTags {
		header.Set(amzTaggingHeaderDirective, replaceDirective)
		if tags := s3utils.TagEncode(opts.UserTags); tags != "" {
			header.Set(amzTaggingHeader, tags)
		}
	}

	if opts.LegalHold != LegalHoldStatus("") {
		header.Set(amzLegalHoldHeader, opts.LegalHold.String())
	}

	if opts.Mode != RetentionMode("") && !opts.RetainUntilDate.IsZero() {
		header.Set(amzLockMode, opts.Mode.String())
		header.Set(amzLockRetainUntil, opts.RetainUntilDate.Format(time.RFC3339))
	}

	if opts.Encryption != nil {
		opts.Encryption.Marshal(header)
	}

	if opts.ReplaceMetadata {
		header.Set("x-amz-metadata-directive", replaceDirective)
		for k, v := range filterCustomMeta(opts.UserMetadata) {
			if isAmzHeader(k) || isStandardHeader(k) || isStorageClassHeader(k) {
				header.Set(k, v)
			} else {
				header.Set("x-amz-meta-"+k, v)
			}
		}
	}
}

// validate performs input validation on the CopyDestOptions.
func (opts CopyDestOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Progress != nil && opts.Size < 0 {
		return errInvalidArgument("For a progress bar, the effective size needs to be specified")
	}
	return nil
}

// CopySrcOptions represents a source object to be copied, using
// server-side copying APIs.
type CopySrcOptions struct {
	Bucket, Object       string
	VersionID            string
	MatchETag            string
	NoMatchETag          string
	MatchModifiedSince   time.Time
	MatchUnmodifiedSince time.Time
	MatchRange           bool
	Start, End           int64
	Encryption           encrypt.ServerSide
}

// Marshal converts all the CopySrcOptions into their
// equivalent HTTP header representation.
func (opts CopySrcOptions) Marshal(header http.Header) {
	// Set the source header.
	header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object))
	if opts.VersionID != "" {
		header.Set("x-amz-copy-source", s3utils.EncodePath(opts.Bucket+"/"+opts.Object)+"?versionId="+opts.VersionID)
	}

	if opts.MatchETag != "" {
		header.Set("x-amz-copy-source-if-match", opts.MatchETag)
	}
	if opts.NoMatchETag != "" {
		header.Set("x-amz-copy-source-if-none-match", opts.NoMatchETag)
	}

	if !opts.MatchModifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-modified-since", opts.MatchModifiedSince.Format(http.TimeFormat))
	}
	if !opts.MatchUnmodifiedSince.IsZero() {
		header.Set("x-amz-copy-source-if-unmodified-since", opts.MatchUnmodifiedSince.Format(http.TimeFormat))
	}

	if opts.Encryption != nil {
		encrypt.SSECopy(opts.Encryption).Marshal(header)
	}
}
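// An illustrative CopySrcOptions value for a ranged, conditional copy (all
// values are placeholders):
//
//	src := minio.CopySrcOptions{
//		Bucket:     "src-bucket",
//		Object:     "src-object",
//		MatchETag:  "31624deb84149d2f8ef9c385918b653a", // copy only if the ETag still matches
//		MatchRange: true,
//		Start:      0,
//		End:        (1 << 20) - 1, // first 1 MiB of the source
//	}
//
// Marshal emits the x-amz-copy-source and conditional headers for such a
// value; Start/End are not marshaled here — ComposeObject turns them into
// per-part x-amz-copy-source-range headers.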
func (opts CopySrcOptions) validate() (err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(opts.Bucket); err != nil {
		return err
	}
	if err = s3utils.CheckValidObjectName(opts.Object); err != nil {
		return err
	}
	if opts.Start > opts.End || opts.Start < 0 {
		return errInvalidArgument("start must be non-negative, and start must be at most end.")
	}
	return nil
}

// copyObjectDo - low-level implementation of the CopyObject API; supports
// only up to 5GiB worth of copy.
func (c Client) copyObjectDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string,
	metadata map[string]string, srcOpts CopySrcOptions, dstOpts PutObjectOptions) (ObjectInfo, error) {

	// Build headers.
	headers := make(http.Header)

	// Set all the metadata headers.
	for k, v := range metadata {
		headers.Set(k, v)
	}
	if !dstOpts.Internal.ReplicationStatus.Empty() {
		headers.Set(amzBucketReplicationStatus, string(dstOpts.Internal.ReplicationStatus))
	}
	if !dstOpts.Internal.SourceMTime.IsZero() {
		headers.Set(minIOBucketSourceMTime, dstOpts.Internal.SourceMTime.Format(time.RFC3339Nano))
	}
	if dstOpts.Internal.SourceETag != "" {
		headers.Set(minIOBucketSourceETag, dstOpts.Internal.SourceETag)
	}
	if len(dstOpts.UserTags) != 0 {
		headers.Set(amzTaggingHeader, s3utils.TagEncode(dstOpts.UserTags))
	}

	reqMetadata := requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
	}
	if dstOpts.Internal.SourceVersionID != "" {
		if _, err := uuid.Parse(dstOpts.Internal.SourceVersionID); err != nil {
			return ObjectInfo{}, errInvalidArgument(err.Error())
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", dstOpts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Set the source header.
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))
	if srcOpts.VersionID != "" {
		headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject)+"?versionId="+srcOpts.VersionID)
	}
	// Send the copy-object request.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return ObjectInfo{}, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return ObjectInfo{}, httpRespToErrorResponse(resp, srcBucket, srcObject)
	}

	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return ObjectInfo{}, err
	}

	objInfo := ObjectInfo{
		Key:          destObject,
		ETag:         strings.Trim(cpObjRes.ETag, "\""),
		LastModified: cpObjRes.LastModified,
	}
	return objInfo, nil
}

// copyObjectPartDo - makes an upload-part-copy request for the given part
// number, copying `length` bytes starting at `startOffset` of the source
// object into a part of the destination's multipart upload.
func (c Client) copyObjectPartDo(ctx context.Context, srcBucket, srcObject, destBucket, destObject string, uploadID string,
	partID int, startOffset int64, length int64, metadata map[string]string) (p CompletePart, err error) {

	headers := make(http.Header)

	// Set the source header.
	headers.Set("x-amz-copy-source", s3utils.EncodePath(srcBucket+"/"+srcObject))

	if startOffset < 0 {
		return p, errInvalidArgument("startOffset must be non-negative")
	}

	if length >= 0 {
		headers.Set("x-amz-copy-source-range", fmt.Sprintf("bytes=%d-%d", startOffset, startOffset+length-1))
	}

	for k, v := range metadata {
		headers.Set(k, v)
	}

	queryValues := make(url.Values)
	queryValues.Set("partNumber", strconv.Itoa(partID))
	queryValues.Set("uploadId", uploadID)

	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   destBucket,
		objectName:   destObject,
		customHeader: headers,
		queryValues:  queryValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, destBucket, destObject)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partID, cpObjRes.ETag
	return p, nil
}

// uploadPartCopy - helper function to create a part in a multipart
// upload via an upload-part-copy request:
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPartCopy.html
func (c Client) uploadPartCopy(ctx context.Context, bucket, object, uploadID string, partNumber int,
	headers http.Header) (p CompletePart, err error) {

	// Build query parameters.
	urlValues := make(url.Values)
	urlValues.Set("partNumber", strconv.Itoa(partNumber))
	urlValues.Set("uploadId", uploadID)

	// Send upload-part-copy request.
	resp, err := c.executeMethod(ctx, http.MethodPut, requestMetadata{
		bucketName:   bucket,
		objectName:   object,
		customHeader: headers,
		queryValues:  urlValues,
	})
	defer closeResponse(resp)
	if err != nil {
		return p, err
	}

	// Check if we got an error response.
	if resp.StatusCode != http.StatusOK {
		return p, httpRespToErrorResponse(resp, bucket, object)
	}

	// Decode copy-part response on success.
	cpObjRes := copyObjectResult{}
	err = xmlDecoder(resp.Body, &cpObjRes)
	if err != nil {
		return p, err
	}
	p.PartNumber, p.ETag = partNumber, cpObjRes.ETag
	return p, nil
}
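// A minimal usage sketch for ComposeObject (defined below). It is
// illustrative only: it assumes a configured *minio.Client named `clnt` and
// three existing source objects, all but the last at least 5MiB in size:
//
//	dst := minio.CopyDestOptions{Bucket: "dst-bucket", Object: "dst-object"}
//	srcs := []minio.CopySrcOptions{
//		{Bucket: "src-bucket", Object: "part-1"},
//		{Bucket: "src-bucket", Object: "part-2"},
//		{Bucket: "src-bucket", Object: "part-3"},
//	}
//	info, err := clnt.ComposeObject(context.Background(), dst, srcs...)
//	if err != nil {
//		log.Fatalln(err)
//	}
//	log.Println("composed object size:", info.Size)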
// ComposeObject - creates an object using server-side copying
// of existing objects. It takes a list of source objects (with optional offsets)
// and concatenates them into a new object using only server-side copying
// operations. Optionally takes a progress reader hook for applications to
// look at the current progress.
func (c Client) ComposeObject(ctx context.Context, dst CopyDestOptions, srcs ...CopySrcOptions) (UploadInfo, error) {
	if len(srcs) < 1 || len(srcs) > maxPartsCount {
		return UploadInfo{}, errInvalidArgument("There must be at least one and up to 10000 source objects.")
	}

	for _, src := range srcs {
		if err := src.validate(); err != nil {
			return UploadInfo{}, err
		}
	}

	if err := dst.validate(); err != nil {
		return UploadInfo{}, err
	}

	srcObjectInfos := make([]ObjectInfo, len(srcs))
	srcObjectSizes := make([]int64, len(srcs))
	var totalSize, totalParts int64
	var err error
	for i, src := range srcs {
		opts := StatObjectOptions{ServerSideEncryption: encrypt.SSE(src.Encryption), VersionID: src.VersionID}
		srcObjectInfos[i], err = c.statObject(context.Background(), src.Bucket, src.Object, opts)
		if err != nil {
			return UploadInfo{}, err
		}

		srcCopySize := srcObjectInfos[i].Size
		// Check if a segment is specified, and if so, is the
		// segment within object bounds?
		if src.MatchRange {
			// Since a range is specified,
			//    0 <= src.Start <= src.End
			// so the only invalid case to check is:
			if src.End >= srcCopySize || src.Start < 0 {
				return UploadInfo{}, errInvalidArgument(
					fmt.Sprintf("CopySrcOptions %d has invalid segment-to-copy [%d, %d] (size is %d)",
						i, src.Start, src.End, srcCopySize))
			}
			srcCopySize = src.End - src.Start + 1
		}

		// Only the last source may be less than `absMinPartSize`.
		if srcCopySize < absMinPartSize && i < len(srcs)-1 {
			return UploadInfo{}, errInvalidArgument(
				fmt.Sprintf("CopySrcOptions %d is too small (%d) and it is not the last part", i, srcCopySize))
		}

		// Is data to copy too large?
		totalSize += srcCopySize
		if totalSize > maxMultipartPutObjectSize {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Cannot compose an object of size %d (> 5TiB)", totalSize))
		}

		// Record the source size.
		srcObjectSizes[i] = srcCopySize

		// Calculate parts needed for the current source.
		totalParts += partsRequired(srcCopySize)
		// Do we need more parts than we are allowed?
		if totalParts > maxPartsCount {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf(
				"Your proposed compose object requires more than %d parts", maxPartsCount))
		}
	}

	// Single source object case (i.e. when only one source is
	// involved, it is being copied wholly and is at most 5GiB in
	// size; empty files are also supported).
	if (totalParts == 1 && srcs[0].Start == -1 && totalSize <= maxPartSize) || (totalSize == 0) {
		return c.CopyObject(ctx, dst, srcs[0])
	}

	// Now, handle multipart-copy cases.

	// 1. Ensure that the object has not been changed while
	//    we are copying data.
	for i := range srcs {
		srcs[i].MatchETag = srcObjectInfos[i].ETag
	}

	// 2. Initiate a new multipart upload.

	// Set user-metadata on the destination object. If metadata is not
	// being replaced, it is preserved from the first source object.
	var userMeta map[string]string
	if dst.ReplaceMetadata {
		userMeta = dst.UserMetadata
	} else {
		userMeta = srcObjectInfos[0].UserMetadata
	}

	var userTags map[string]string
	if dst.ReplaceTags {
		userTags = dst.UserTags
	} else {
		userTags = srcObjectInfos[0].UserTags
	}

	uploadID, err := c.newUploadID(ctx, dst.Bucket, dst.Object, PutObjectOptions{
		ServerSideEncryption: dst.Encryption,
		UserMetadata:         userMeta,
		UserTags:             userTags,
		Mode:                 dst.Mode,
		RetainUntilDate:      dst.RetainUntilDate,
		LegalHold:            dst.LegalHold,
	})
	if err != nil {
		return UploadInfo{}, err
	}

	// 3. Perform copy part uploads.
	objParts := []CompletePart{}
	partIndex := 1
	for i, src := range srcs {
		h := make(http.Header)
		src.Marshal(h)
		if dst.Encryption != nil && dst.Encryption.Type() == encrypt.SSEC {
			dst.Encryption.Marshal(h)
		}

		// Calculate start/end indices of parts after splitting.
		startIdx, endIdx := calculateEvenSplits(srcObjectSizes[i], src)
		for j, start := range startIdx {
			end := endIdx[j]

			// Add (or reset) source range header for
			// upload part copy request.
			h.Set("x-amz-copy-source-range",
				fmt.Sprintf("bytes=%d-%d", start, end))

			// Make upload-part-copy request.
			complPart, err := c.uploadPartCopy(ctx, dst.Bucket,
				dst.Object, uploadID, partIndex, h)
			if err != nil {
				return UploadInfo{}, err
			}
			if dst.Progress != nil {
				io.CopyN(ioutil.Discard, dst.Progress, end-start+1)
			}
			objParts = append(objParts, complPart)
			partIndex++
		}
	}

	// 4. Make final complete-multipart request.
	uploadInfo, err := c.completeMultipartUpload(ctx, dst.Bucket, dst.Object, uploadID,
		completeMultipartUpload{Parts: objParts})
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalSize
	return uploadInfo, nil
}

// partsRequired computes the number of parts needed to copy `size` bytes,
// using a fixed part size of maxMultipartPutObjectSize/(maxPartsCount-1)
// bytes (size divided by that part size, rounding up).
func partsRequired(size int64) int64 {
	maxPartSize := maxMultipartPutObjectSize / (maxPartsCount - 1)
	r := size / int64(maxPartSize)
	if size%int64(maxPartSize) > 0 {
		r++
	}
	return r
}
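// A worked example of partsRequired (above) and calculateEvenSplits (below).
// The concrete numbers assume the usual S3 limits for the package constants,
// maxMultipartPutObjectSize = 5TiB and maxPartsCount = 10000, giving a fixed
// copy-part size of 5497558138880/9999 ≈ 524MiB:
//
//	// A 1GiB source needs ceiling(1GiB / ~524MiB) = 2 parts.
//	n := partsRequired(1 << 30) // n == 2
//
//	// calculateEvenSplits spreads the 1GiB evenly over those 2 parts:
//	// q = (1<<30)/2, r = 0, i.e. two 512MiB parts.
//	starts, ends := calculateEvenSplits(1<<30, CopySrcOptions{Start: -1})
//	// starts == []int64{0, 536870912}
//	// ends   == []int64{536870911, 1073741823}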
// calculateEvenSplits - computes splits for a source and returns
// start and end index slices. Splits happen evenly to be sure that no
// part is less than 5MiB, as that could fail the multipart request if
// it is not the last part.
func calculateEvenSplits(size int64, src CopySrcOptions) (startIndex, endIndex []int64) {
	if size == 0 {
		return
	}

	reqParts := partsRequired(size)
	startIndex = make([]int64, reqParts)
	endIndex = make([]int64, reqParts)
	// Compute the number of required parts `k`, as:
	//
	//	k = ceiling(size / copyPartSize)
	//
	// Now, distribute the `size` bytes in the source into
	// k parts as evenly as possible:
	//
	//	r parts sized (q+1) bytes, and
	//	(k - r) parts sized q bytes, where
	//
	//	size = q * k + r (by simple division of size by k,
	//	so that 0 <= r < k)
	//
	start := src.Start
	if start == -1 {
		start = 0
	}
	quot, rem := size/reqParts, size%reqParts
	nextStart := start
	for j := int64(0); j < reqParts; j++ {
		curPartSize := quot
		if j < rem {
			curPartSize++
		}

		cStart := nextStart
		cEnd := cStart + curPartSize - 1
		nextStart = cEnd + 1

		startIndex[j], endIndex[j] = cStart, cEnd
	}
	return
}