1/* 2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2015-2017 MinIO, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package minio 19 20import ( 21 "bytes" 22 "context" 23 "encoding/base64" 24 "encoding/hex" 25 "encoding/xml" 26 "fmt" 27 "io" 28 "io/ioutil" 29 "net/http" 30 "net/url" 31 "sort" 32 "strconv" 33 "strings" 34 35 "github.com/google/uuid" 36 "github.com/minio/minio-go/v7/pkg/encrypt" 37 "github.com/minio/minio-go/v7/pkg/s3utils" 38) 39 40func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, 41 opts PutObjectOptions) (info UploadInfo, err error) { 42 info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts) 43 if err != nil { 44 errResp := ToErrorResponse(err) 45 // Verify if multipart functionality is not available, if not 46 // fall back to single PutObject operation. 47 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") { 48 // Verify if size of reader is greater than '5GiB'. 49 if size > maxSinglePutObjectSize { 50 return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) 51 } 52 // Fall back to uploading as single PutObject operation. 53 return c.putObject(ctx, bucketName, objectName, reader, size, opts) 54 } 55 } 56 return info, err 57} 58 59func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) { 60 // Input validation. 61 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 62 return UploadInfo{}, err 63 } 64 if err = s3utils.CheckValidObjectName(objectName); err != nil { 65 return UploadInfo{}, err 66 } 67 68 // Total data read and written to server. should be equal to 69 // 'size' at the end of the call. 70 var totalUploadedSize int64 71 72 // Complete multipart upload. 73 var complMultipartUpload completeMultipartUpload 74 75 // Calculate the optimal parts info for a given size. 76 totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize) 77 if err != nil { 78 return UploadInfo{}, err 79 } 80 81 // Initiate a new multipart upload. 82 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 83 if err != nil { 84 return UploadInfo{}, err 85 } 86 87 defer func() { 88 if err != nil { 89 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 90 } 91 }() 92 93 // Part number always starts with '1'. 94 partNumber := 1 95 96 // Initialize parts uploaded map. 97 partsInfo := make(map[int]ObjectPart) 98 99 // Create a buffer. 100 buf := make([]byte, partSize) 101 102 for partNumber <= totalPartsCount { 103 // Choose hash algorithms to be calculated by hashCopyN, 104 // avoid sha256 with non-v4 signature request or 105 // HTTPS connection. 106 hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5) 107 108 length, rErr := readFull(reader, buf) 109 if rErr == io.EOF && partNumber > 1 { 110 break 111 } 112 113 if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF { 114 return UploadInfo{}, rErr 115 } 116 117 // Calculates hash sums while copying partSize bytes into cw. 118 for k, v := range hashAlgos { 119 v.Write(buf[:length]) 120 hashSums[k] = v.Sum(nil) 121 v.Close() 122 } 123 124 // Update progress reader appropriately to the latest offset 125 // as we read from the source. 126 rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) 127 128 // Checksums.. 129 var ( 130 md5Base64 string 131 sha256Hex string 132 ) 133 if hashSums["md5"] != nil { 134 md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"]) 135 } 136 if hashSums["sha256"] != nil { 137 sha256Hex = hex.EncodeToString(hashSums["sha256"]) 138 } 139 140 // Proceed to upload the part. 141 objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, 142 md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption) 143 if uerr != nil { 144 return UploadInfo{}, uerr 145 } 146 147 // Save successfully uploaded part metadata. 148 partsInfo[partNumber] = objPart 149 150 // Save successfully uploaded size. 151 totalUploadedSize += int64(length) 152 153 // Increment part number. 154 partNumber++ 155 156 // For unknown size, Read EOF we break away. 157 // We do not have to upload till totalPartsCount. 158 if rErr == io.EOF { 159 break 160 } 161 } 162 163 // Loop over total uploaded parts to save them in 164 // Parts array before completing the multipart request. 165 for i := 1; i < partNumber; i++ { 166 part, ok := partsInfo[i] 167 if !ok { 168 return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 169 } 170 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 171 ETag: part.ETag, 172 PartNumber: part.PartNumber, 173 }) 174 } 175 176 // Sort all completed parts. 177 sort.Sort(completedParts(complMultipartUpload.Parts)) 178 179 uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{}) 180 if err != nil { 181 return UploadInfo{}, err 182 } 183 184 uploadInfo.Size = totalUploadedSize 185 return uploadInfo, nil 186} 187 188// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. 189func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) { 190 // Input validation. 191 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 192 return initiateMultipartUploadResult{}, err 193 } 194 if err := s3utils.CheckValidObjectName(objectName); err != nil { 195 return initiateMultipartUploadResult{}, err 196 } 197 198 // Initialize url queries. 199 urlValues := make(url.Values) 200 urlValues.Set("uploads", "") 201 202 if opts.Internal.SourceVersionID != "" { 203 if opts.Internal.SourceVersionID != nullVersionID { 204 if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil { 205 return initiateMultipartUploadResult{}, errInvalidArgument(err.Error()) 206 } 207 } 208 urlValues.Set("versionId", opts.Internal.SourceVersionID) 209 } 210 211 // Set ContentType header. 212 customHeader := opts.Header() 213 214 reqMetadata := requestMetadata{ 215 bucketName: bucketName, 216 objectName: objectName, 217 queryValues: urlValues, 218 customHeader: customHeader, 219 } 220 221 // Execute POST on an objectName to initiate multipart upload. 222 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata) 223 defer closeResponse(resp) 224 if err != nil { 225 return initiateMultipartUploadResult{}, err 226 } 227 if resp != nil { 228 if resp.StatusCode != http.StatusOK { 229 return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) 230 } 231 } 232 // Decode xml for new multipart upload. 233 initiateMultipartUploadResult := initiateMultipartUploadResult{} 234 err = xmlDecoder(resp.Body, &initiateMultipartUploadResult) 235 if err != nil { 236 return initiateMultipartUploadResult, err 237 } 238 return initiateMultipartUploadResult, nil 239} 240 241// uploadPart - Uploads a part in a multipart upload. 242func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader, 243 partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) { 244 // Input validation. 245 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 246 return ObjectPart{}, err 247 } 248 if err := s3utils.CheckValidObjectName(objectName); err != nil { 249 return ObjectPart{}, err 250 } 251 if size > maxPartSize { 252 return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName) 253 } 254 if size <= -1 { 255 return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName) 256 } 257 if partNumber <= 0 { 258 return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.") 259 } 260 if uploadID == "" { 261 return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.") 262 } 263 264 // Get resources properly escaped and lined up before using them in http request. 265 urlValues := make(url.Values) 266 // Set part number. 267 urlValues.Set("partNumber", strconv.Itoa(partNumber)) 268 // Set upload id. 269 urlValues.Set("uploadId", uploadID) 270 271 // Set encryption headers, if any. 272 customHeader := make(http.Header) 273 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html 274 // Server-side encryption is supported by the S3 Multipart Upload actions. 275 // Unless you are using a customer-provided encryption key, you don't need 276 // to specify the encryption parameters in each UploadPart request. 277 if sse != nil && sse.Type() == encrypt.SSEC { 278 sse.Marshal(customHeader) 279 } 280 281 reqMetadata := requestMetadata{ 282 bucketName: bucketName, 283 objectName: objectName, 284 queryValues: urlValues, 285 customHeader: customHeader, 286 contentBody: reader, 287 contentLength: size, 288 contentMD5Base64: md5Base64, 289 contentSHA256Hex: sha256Hex, 290 } 291 292 // Execute PUT on each part. 293 resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata) 294 defer closeResponse(resp) 295 if err != nil { 296 return ObjectPart{}, err 297 } 298 if resp != nil { 299 if resp.StatusCode != http.StatusOK { 300 return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName) 301 } 302 } 303 // Once successfully uploaded, return completed part. 304 objPart := ObjectPart{} 305 objPart.Size = size 306 objPart.PartNumber = partNumber 307 // Trim off the odd double quotes from ETag in the beginning and end. 308 objPart.ETag = trimEtag(resp.Header.Get("ETag")) 309 return objPart, nil 310} 311 312// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. 313func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string, 314 complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error) { 315 // Input validation. 316 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 317 return UploadInfo{}, err 318 } 319 if err := s3utils.CheckValidObjectName(objectName); err != nil { 320 return UploadInfo{}, err 321 } 322 323 // Initialize url queries. 324 urlValues := make(url.Values) 325 urlValues.Set("uploadId", uploadID) 326 // Marshal complete multipart body. 327 completeMultipartUploadBytes, err := xml.Marshal(complete) 328 if err != nil { 329 return UploadInfo{}, err 330 } 331 332 // Instantiate all the complete multipart buffer. 333 completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes) 334 reqMetadata := requestMetadata{ 335 bucketName: bucketName, 336 objectName: objectName, 337 queryValues: urlValues, 338 contentBody: completeMultipartUploadBuffer, 339 contentLength: int64(len(completeMultipartUploadBytes)), 340 contentSHA256Hex: sum256Hex(completeMultipartUploadBytes), 341 customHeader: opts.Header(), 342 } 343 344 // Execute POST to complete multipart upload for an objectName. 345 resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata) 346 defer closeResponse(resp) 347 if err != nil { 348 return UploadInfo{}, err 349 } 350 if resp != nil { 351 if resp.StatusCode != http.StatusOK { 352 return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName) 353 } 354 } 355 356 // Read resp.Body into a []bytes to parse for Error response inside the body 357 var b []byte 358 b, err = ioutil.ReadAll(resp.Body) 359 if err != nil { 360 return UploadInfo{}, err 361 } 362 // Decode completed multipart upload response on success. 363 completeMultipartUploadResult := completeMultipartUploadResult{} 364 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult) 365 if err != nil { 366 // xml parsing failure due to presence an ill-formed xml fragment 367 return UploadInfo{}, err 368 } else if completeMultipartUploadResult.Bucket == "" { 369 // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied. 370 // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values 371 // of the members. 372 373 // Decode completed multipart upload response on failure 374 completeMultipartUploadErr := ErrorResponse{} 375 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr) 376 if err != nil { 377 // xml parsing failure due to presence an ill-formed xml fragment 378 return UploadInfo{}, err 379 } 380 return UploadInfo{}, completeMultipartUploadErr 381 } 382 383 // extract lifecycle expiry date and rule ID 384 expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration)) 385 386 return UploadInfo{ 387 Bucket: completeMultipartUploadResult.Bucket, 388 Key: completeMultipartUploadResult.Key, 389 ETag: trimEtag(completeMultipartUploadResult.ETag), 390 VersionID: resp.Header.Get(amzVersionID), 391 Location: completeMultipartUploadResult.Location, 392 Expiration: expTime, 393 ExpirationRuleID: ruleID, 394 }, nil 395 396} 397