1/* 2 * Minio Go Library for Amazon S3 Compatible Cloud Storage 3 * Copyright 2015-2017 Minio, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18package minio 19 20import ( 21 "bytes" 22 "context" 23 "encoding/base64" 24 "encoding/hex" 25 "encoding/xml" 26 "fmt" 27 "io" 28 "io/ioutil" 29 "net/http" 30 "net/url" 31 "runtime/debug" 32 "sort" 33 "strconv" 34 "strings" 35 36 "github.com/minio/minio-go/pkg/encrypt" 37 "github.com/minio/minio-go/pkg/s3utils" 38) 39 40func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, 41 opts PutObjectOptions) (n int64, err error) { 42 n, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts) 43 if err != nil { 44 errResp := ToErrorResponse(err) 45 // Verify if multipart functionality is not available, if not 46 // fall back to single PutObject operation. 47 if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") { 48 // Verify if size of reader is greater than '5GiB'. 49 if size > maxSinglePutObjectSize { 50 return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName) 51 } 52 // Fall back to uploading as single PutObject operation. 53 return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts) 54 } 55 } 56 return n, err 57} 58 59func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) { 60 // Input validation. 61 if err = s3utils.CheckValidBucketName(bucketName); err != nil { 62 return 0, err 63 } 64 if err = s3utils.CheckValidObjectName(objectName); err != nil { 65 return 0, err 66 } 67 68 // Total data read and written to server. should be equal to 69 // 'size' at the end of the call. 70 var totalUploadedSize int64 71 72 // Complete multipart upload. 73 var complMultipartUpload completeMultipartUpload 74 75 // Calculate the optimal parts info for a given size. 76 totalPartsCount, partSize, _, err := optimalPartInfo(-1) 77 if err != nil { 78 return 0, err 79 } 80 81 // Initiate a new multipart upload. 82 uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts) 83 if err != nil { 84 return 0, err 85 } 86 87 defer func() { 88 if err != nil { 89 c.abortMultipartUpload(ctx, bucketName, objectName, uploadID) 90 } 91 }() 92 93 // Part number always starts with '1'. 94 partNumber := 1 95 96 // Initialize parts uploaded map. 97 partsInfo := make(map[int]ObjectPart) 98 99 // Create a buffer. 100 buf := make([]byte, partSize) 101 defer debug.FreeOSMemory() 102 103 for partNumber <= totalPartsCount { 104 // Choose hash algorithms to be calculated by hashCopyN, 105 // avoid sha256 with non-v4 signature request or 106 // HTTPS connection. 107 hashAlgos, hashSums := c.hashMaterials() 108 109 length, rErr := io.ReadFull(reader, buf) 110 if rErr == io.EOF { 111 break 112 } 113 if rErr != nil && rErr != io.ErrUnexpectedEOF { 114 return 0, rErr 115 } 116 117 // Calculates hash sums while copying partSize bytes into cw. 118 for k, v := range hashAlgos { 119 v.Write(buf[:length]) 120 hashSums[k] = v.Sum(nil) 121 } 122 123 // Update progress reader appropriately to the latest offset 124 // as we read from the source. 125 rd := newHook(bytes.NewReader(buf[:length]), opts.Progress) 126 127 // Checksums.. 128 var ( 129 md5Base64 string 130 sha256Hex string 131 ) 132 if hashSums["md5"] != nil { 133 md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"]) 134 } 135 if hashSums["sha256"] != nil { 136 sha256Hex = hex.EncodeToString(hashSums["sha256"]) 137 } 138 139 // Proceed to upload the part. 140 var objPart ObjectPart 141 objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, 142 md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption) 143 if err != nil { 144 return totalUploadedSize, err 145 } 146 147 // Save successfully uploaded part metadata. 148 partsInfo[partNumber] = objPart 149 150 // Save successfully uploaded size. 151 totalUploadedSize += int64(length) 152 153 // Increment part number. 154 partNumber++ 155 156 // For unknown size, Read EOF we break away. 157 // We do not have to upload till totalPartsCount. 158 if rErr == io.EOF { 159 break 160 } 161 } 162 163 // Loop over total uploaded parts to save them in 164 // Parts array before completing the multipart request. 165 for i := 1; i < partNumber; i++ { 166 part, ok := partsInfo[i] 167 if !ok { 168 return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i)) 169 } 170 complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{ 171 ETag: part.ETag, 172 PartNumber: part.PartNumber, 173 }) 174 } 175 176 // Sort all completed parts. 177 sort.Sort(completedParts(complMultipartUpload.Parts)) 178 if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil { 179 return totalUploadedSize, err 180 } 181 182 // Return final size. 183 return totalUploadedSize, nil 184} 185 186// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID. 187func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) { 188 // Input validation. 189 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 190 return initiateMultipartUploadResult{}, err 191 } 192 if err := s3utils.CheckValidObjectName(objectName); err != nil { 193 return initiateMultipartUploadResult{}, err 194 } 195 196 // Initialize url queries. 197 urlValues := make(url.Values) 198 urlValues.Set("uploads", "") 199 200 // Set ContentType header. 201 customHeader := opts.Header() 202 203 reqMetadata := requestMetadata{ 204 bucketName: bucketName, 205 objectName: objectName, 206 queryValues: urlValues, 207 customHeader: customHeader, 208 } 209 210 // Execute POST on an objectName to initiate multipart upload. 211 resp, err := c.executeMethod(ctx, "POST", reqMetadata) 212 defer closeResponse(resp) 213 if err != nil { 214 return initiateMultipartUploadResult{}, err 215 } 216 if resp != nil { 217 if resp.StatusCode != http.StatusOK { 218 return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) 219 } 220 } 221 // Decode xml for new multipart upload. 222 initiateMultipartUploadResult := initiateMultipartUploadResult{} 223 err = xmlDecoder(resp.Body, &initiateMultipartUploadResult) 224 if err != nil { 225 return initiateMultipartUploadResult, err 226 } 227 return initiateMultipartUploadResult, nil 228} 229 230// uploadPart - Uploads a part in a multipart upload. 231func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader, 232 partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) { 233 // Input validation. 234 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 235 return ObjectPart{}, err 236 } 237 if err := s3utils.CheckValidObjectName(objectName); err != nil { 238 return ObjectPart{}, err 239 } 240 if size > maxPartSize { 241 return ObjectPart{}, ErrEntityTooLarge(size, maxPartSize, bucketName, objectName) 242 } 243 if size <= -1 { 244 return ObjectPart{}, ErrEntityTooSmall(size, bucketName, objectName) 245 } 246 if partNumber <= 0 { 247 return ObjectPart{}, ErrInvalidArgument("Part number cannot be negative or equal to zero.") 248 } 249 if uploadID == "" { 250 return ObjectPart{}, ErrInvalidArgument("UploadID cannot be empty.") 251 } 252 253 // Get resources properly escaped and lined up before using them in http request. 254 urlValues := make(url.Values) 255 // Set part number. 256 urlValues.Set("partNumber", strconv.Itoa(partNumber)) 257 // Set upload id. 258 urlValues.Set("uploadId", uploadID) 259 260 // Set encryption headers, if any. 261 customHeader := make(http.Header) 262 // https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html 263 // Server-side encryption is supported by the S3 Multipart Upload actions. 264 // Unless you are using a customer-provided encryption key, you don't need 265 // to specify the encryption parameters in each UploadPart request. 266 if sse != nil && sse.Type() == encrypt.SSEC { 267 sse.Marshal(customHeader) 268 } 269 270 reqMetadata := requestMetadata{ 271 bucketName: bucketName, 272 objectName: objectName, 273 queryValues: urlValues, 274 customHeader: customHeader, 275 contentBody: reader, 276 contentLength: size, 277 contentMD5Base64: md5Base64, 278 contentSHA256Hex: sha256Hex, 279 } 280 281 // Execute PUT on each part. 282 resp, err := c.executeMethod(ctx, "PUT", reqMetadata) 283 defer closeResponse(resp) 284 if err != nil { 285 return ObjectPart{}, err 286 } 287 if resp != nil { 288 if resp.StatusCode != http.StatusOK { 289 return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName) 290 } 291 } 292 // Once successfully uploaded, return completed part. 293 objPart := ObjectPart{} 294 objPart.Size = size 295 objPart.PartNumber = partNumber 296 // Trim off the odd double quotes from ETag in the beginning and end. 297 objPart.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"") 298 objPart.ETag = strings.TrimSuffix(objPart.ETag, "\"") 299 return objPart, nil 300} 301 302// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts. 303func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string, 304 complete completeMultipartUpload) (completeMultipartUploadResult, error) { 305 // Input validation. 306 if err := s3utils.CheckValidBucketName(bucketName); err != nil { 307 return completeMultipartUploadResult{}, err 308 } 309 if err := s3utils.CheckValidObjectName(objectName); err != nil { 310 return completeMultipartUploadResult{}, err 311 } 312 313 // Initialize url queries. 314 urlValues := make(url.Values) 315 urlValues.Set("uploadId", uploadID) 316 // Marshal complete multipart body. 317 completeMultipartUploadBytes, err := xml.Marshal(complete) 318 if err != nil { 319 return completeMultipartUploadResult{}, err 320 } 321 322 // Instantiate all the complete multipart buffer. 323 completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes) 324 reqMetadata := requestMetadata{ 325 bucketName: bucketName, 326 objectName: objectName, 327 queryValues: urlValues, 328 contentBody: completeMultipartUploadBuffer, 329 contentLength: int64(len(completeMultipartUploadBytes)), 330 contentSHA256Hex: sum256Hex(completeMultipartUploadBytes), 331 } 332 333 // Execute POST to complete multipart upload for an objectName. 334 resp, err := c.executeMethod(ctx, "POST", reqMetadata) 335 defer closeResponse(resp) 336 if err != nil { 337 return completeMultipartUploadResult{}, err 338 } 339 if resp != nil { 340 if resp.StatusCode != http.StatusOK { 341 return completeMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName) 342 } 343 } 344 345 // Read resp.Body into a []bytes to parse for Error response inside the body 346 var b []byte 347 b, err = ioutil.ReadAll(resp.Body) 348 if err != nil { 349 return completeMultipartUploadResult{}, err 350 } 351 // Decode completed multipart upload response on success. 352 completeMultipartUploadResult := completeMultipartUploadResult{} 353 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult) 354 if err != nil { 355 // xml parsing failure due to presence an ill-formed xml fragment 356 return completeMultipartUploadResult, err 357 } else if completeMultipartUploadResult.Bucket == "" { 358 // xml's Decode method ignores well-formed xml that don't apply to the type of value supplied. 359 // In this case, it would leave completeMultipartUploadResult with the corresponding zero-values 360 // of the members. 361 362 // Decode completed multipart upload response on failure 363 completeMultipartUploadErr := ErrorResponse{} 364 err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr) 365 if err != nil { 366 // xml parsing failure due to presence an ill-formed xml fragment 367 return completeMultipartUploadResult, err 368 } 369 return completeMultipartUploadResult, completeMultipartUploadErr 370 } 371 return completeMultipartUploadResult, nil 372} 373