/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strings"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads.
//
// The following code handles these types of readers:
//
//   - *minio.Object
//   - Any reader which has a method 'ReadAt()'
func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {

	if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
		// If the reader implements ReadAt and is not a *minio.Object, use the parallel uploader.
		info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
	} else {
		info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
	}
	if err != nil {
		errResp := ToErrorResponse(err)
		// If multipart functionality is not available, fall back
		// to a single PutObject operation.
		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
			// Verify that the size of the reader does not exceed '5GiB'.
			if size > maxSinglePutObjectSize {
				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
			}
			// Fall back to uploading as a single PutObject operation.
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
	}
	return info, err
}

// uploadedPartRes - the response received from a part upload.
type uploadedPartRes struct {
	Error   error // Any error encountered while uploading the part.
	PartNum int   // Number of the part uploaded.
	Size    int64 // Size of the part uploaded.
	Part    ObjectPart
}

type uploadPartReq struct {
	PartNum int        // Number of the part to upload.
	Part    ObjectPart // Part metadata obtained after upload.
}
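// For illustration only, a sketch of how putObjectMultipartStream above
// dispatches on the reader type (file and variable names are hypothetical;
// setup and error handling elided, SendContentMd5 assumed false):
//
//	f, _ := os.Open("/tmp/large.bin") // *os.File implements io.ReaderAt:
//	                                  // the parallel ReadAt path is chosen.
//	br := bufio.NewReader(f)          // a plain io.Reader without ReadAt:
//	                                  // the sequential streaming path is chosen.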
// putObjectMultipartStreamFromReadAt - Uploads files bigger than 128MiB.
// Supports all readers which implement the io.ReaderAt interface
// (ReadAt method).
//
// NOTE: This function is meant to be used for all readers which
// implement io.ReaderAt, which allows multipart uploads to resume
// by reading at an offset, avoiding a re-read of data that was
// already uploaded. Internally this function stages part data in
// reusable in-memory buffers, one per upload worker.
func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload in progress if the function returns
	// any error. Since we do not resume, we should purge the parts
	// which have been uploaded to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to the server. Should be equal to
	// 'size' at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Declare a channel that sends the next part number to be uploaded.
	// Buffered to 10000 because that's the maximum number of parts
	// allowed by S3.
	uploadPartsCh := make(chan uploadPartReq, 10000)

	// Declare a channel that sends back the response of a part upload.
	// Buffered to 10000 because that's the maximum number of parts
	// allowed by S3.
	uploadedPartsCh := make(chan uploadedPartRes, 10000)

	// Used for readability, lastPartNumber is always totalPartsCount.
	lastPartNumber := totalPartsCount

	// Send each part number to the channel to be processed.
	for p := 1; p <= totalPartsCount; p++ {
		uploadPartsCh <- uploadPartReq{PartNum: p}
	}
	close(uploadPartsCh)

	// One staging buffer per worker, each with capacity for a full part.
	var partsBuf = make([][]byte, opts.getNumThreads())
	for i := range partsBuf {
		partsBuf[i] = make([]byte, 0, partSize)
	}

	// Receive each part number from the channel, allowing up to
	// getNumThreads() parallel uploads.
	for w := 1; w <= opts.getNumThreads(); w++ {
		go func(w int, partSize int64) {
			// Each worker will draw from the part channel and upload in parallel.
			for uploadReq := range uploadPartsCh {

				// Calculate the read offset for this part as a
				// multiple of partSize.
				readOffset := int64(uploadReq.PartNum-1) * partSize

				// As a special case, if partNumber is lastPartNumber, we
				// calculate the offset based on the last part size.
				if uploadReq.PartNum == lastPartNumber {
					readOffset = (size - lastPartSize)
					partSize = lastPartSize
				}

				n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
				if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
					uploadedPartsCh <- uploadedPartRes{
						Error: rerr,
					}
					// Exit the goroutine.
					return
				}

				// Wrap the staged part data in a progress hook reader.
				hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)

				// Proceed to upload the part.
				objPart, err := c.uploadPart(ctx, bucketName, objectName,
					uploadID, hookReader, uploadReq.PartNum,
					"", "", partSize, opts.ServerSideEncryption)
				if err != nil {
					uploadedPartsCh <- uploadedPartRes{
						Error: err,
					}
					// Exit the goroutine.
					return
				}

				// Save successfully uploaded part metadata.
				uploadReq.Part = objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
					Size:    objPart.Size,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
				}
			}
		}(w, partSize)
	}

	// Gather the responses as they occur and update any
	// progress bar.
	for u := 1; u <= totalPartsCount; u++ {
		uploadRes := <-uploadedPartsCh
		if uploadRes.Error != nil {
			return UploadInfo{}, uploadRes.Error
		}
		// Update the totalUploadedSize.
		totalUploadedSize += uploadRes.Size
		// Store the parts to be completed in order.
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       uploadRes.Part.ETag,
			PartNumber: uploadRes.Part.PartNumber,
		})
	}

	// Verify if we uploaded all the data.
	if totalUploadedSize != size {
		return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
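// A worked example of the part arithmetic above (numbers are illustrative,
// assuming a 128MiB part size): for a 650MiB object, optimalPartInfo yields
// totalPartsCount = 6 and lastPartSize = 10MiB. Parts 1-5 read 128MiB each
// at offsets 0, 128, 256, 384 and 512MiB; part 6 reads at
// size - lastPartSize:
//
//	size := int64(650 << 20)
//	partSize := int64(128 << 20)
//	lastPartSize := size - 5*partSize // 10 MiB
//	readOffset := size - lastPartSize // 640 MiB, i.e. 5 * partSize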
func (c Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload if the function returns
	// any error. Since we do not resume, we should purge
	// the parts which have been uploaded to relinquish
	// storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to the server. Should be equal to
	// 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a buffer.
	buf := make([]byte, partSize)

	// Avoid declaring variables inside the for loop.
	var md5Base64 string
	var hookReader io.Reader

	// Part numbers always start with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {

		// Adjust the size of the last part.
		if partNumber == totalPartsCount {
			partSize = lastPartSize
		}

		if opts.SendContentMd5 {
			length, rerr := readFull(reader, buf)
			if rerr == io.EOF && partNumber > 1 {
				break
			}

			if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
				return UploadInfo{}, rerr
			}

			// Calculate md5sum.
			hash := c.md5Hasher()
			hash.Write(buf[:length])
			md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
			hash.Close()

			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
		} else {
			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(reader, opts.Progress)
		}

		// Proceed to upload the part.
		objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
			io.LimitReader(hookReader, partSize),
			partNumber, md5Base64, "", partSize, opts.ServerSideEncryption)
		if uerr != nil {
			return UploadInfo{}, uerr
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += partSize
	}

	// Verify if we uploaded all the data.
	if size > 0 {
		if totalUploadedSize != size {
			return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
		}
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over total uploaded parts to save them in
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}
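// For reference, the per-part Content-MD5 computed above is equivalent to
// this standalone sketch using only the standard library ('part' is a
// hypothetical byte slice holding one part's data):
//
//	sum := md5.Sum(part)
//	md5Base64 := base64.StdEncoding.EncodeToString(sum[:])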
// putObject - a special function used for Google Cloud Storage, since
// Google's multipart API is not S3 compatible.
func (c Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Size -1 is only supported on Google Cloud Storage, we error
	// out in all other situations.
	if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
	}

	if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
		return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
	}

	if size > 0 {
		if isReadAt(reader) && !isObject(reader) {
			seeker, ok := reader.(io.Seeker)
			if ok {
				offset, err := seeker.Seek(0, io.SeekCurrent)
				if err != nil {
					return UploadInfo{}, errInvalidArgument(err.Error())
				}
				reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
			}
		}
	}

	var md5Base64 string
	if opts.SendContentMd5 {
		// Create a buffer.
		buf := make([]byte, size)

		length, rErr := readFull(reader, buf)
		if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
			return UploadInfo{}, rErr
		}

		// Calculate md5sum.
		hash := c.md5Hasher()
		hash.Write(buf[:length])
		md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
		reader = bytes.NewReader(buf[:length])
		hash.Close()
	}

	// Update progress reader appropriately to the latest offset as we
	// read from the source.
	readSeeker := newHook(reader, opts.Progress)

	// This function does not calculate sha256 and md5sum for payload.
	// Execute put object.
	return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
}
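// The Seek call in putObject above only queries the current position without
// moving it; a minimal sketch of the same pattern ('f' is a hypothetical
// *os.File, 'size' the number of bytes to upload):
//
//	offset, _ := f.Seek(0, io.SeekCurrent)          // current offset, no movement
//	section := io.NewSectionReader(f, offset, size) // reads [offset, offset+size)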
// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}
	// Set headers.
	customHeader := opts.Header()

	// Populate request metadata.
	reqMetadata := requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		customHeader:     customHeader,
		contentBody:      reader,
		contentLength:    size,
		contentMD5Base64: md5Base64,
		contentSHA256Hex: sha256Hex,
	}
	if opts.Internal.SourceVersionID != "" {
		if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
			return UploadInfo{}, errInvalidArgument(err.Error())
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", opts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Execute PUT on objectName.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return UploadInfo{}, err
	}
	if resp != nil {
		if resp.StatusCode != http.StatusOK {
			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
		}
	}

	// Extract the lifecycle expiry date and rule ID.
	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))

	return UploadInfo{
		Bucket:           bucketName,
		Key:              objectName,
		ETag:             trimEtag(resp.Header.Get("ETag")),
		VersionID:        resp.Header.Get(amzVersionID),
		Size:             size,
		Expiration:       expTime,
		ExpirationRuleID: ruleID,
	}, nil
}
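// For illustration, the rough shape of the HTTP request putObjectDo issues
// (header values are placeholders; signing and other standard headers are
// omitted, and the versionId query is only present when
// opts.Internal.SourceVersionID is set):
//
//	PUT /<bucketName>/<objectName>?versionId=<uuid> HTTP/1.1
//	Content-Length: <size>
//	Content-MD5: <md5Base64>
//	x-amz-content-sha256: <sha256Hex>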