1/*
2 * Minio Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2015-2017 Minio, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21	"bytes"
22	"context"
23	"encoding/base64"
24	"encoding/hex"
25	"encoding/xml"
26	"fmt"
27	"io"
28	"io/ioutil"
29	"net/http"
30	"net/url"
31	"runtime/debug"
32	"sort"
33	"strconv"
34	"strings"
35
36	"github.com/minio/minio-go/pkg/encrypt"
37	"github.com/minio/minio-go/pkg/s3utils"
38)
39
40func (c Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
41	opts PutObjectOptions) (n int64, err error) {
42	n, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
43	if err != nil {
44		errResp := ToErrorResponse(err)
45		// Verify if multipart functionality is not available, if not
46		// fall back to single PutObject operation.
47		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
48			// Verify if size of reader is greater than '5GiB'.
49			if size > maxSinglePutObjectSize {
50				return 0, ErrEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
51			}
52			// Fall back to uploading as single PutObject operation.
53			return c.putObjectNoChecksum(ctx, bucketName, objectName, reader, size, opts)
54		}
55	}
56	return n, err
57}
58
59func (c Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (n int64, err error) {
60	// Input validation.
61	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
62		return 0, err
63	}
64	if err = s3utils.CheckValidObjectName(objectName); err != nil {
65		return 0, err
66	}
67
68	// Total data read and written to server. should be equal to
69	// 'size' at the end of the call.
70	var totalUploadedSize int64
71
72	// Complete multipart upload.
73	var complMultipartUpload completeMultipartUpload
74
75	// Calculate the optimal parts info for a given size.
76	totalPartsCount, partSize, _, err := optimalPartInfo(-1)
77	if err != nil {
78		return 0, err
79	}
80
81	// Initiate a new multipart upload.
82	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
83	if err != nil {
84		return 0, err
85	}
86
87	defer func() {
88		if err != nil {
89			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
90		}
91	}()
92
93	// Part number always starts with '1'.
94	partNumber := 1
95
96	// Initialize parts uploaded map.
97	partsInfo := make(map[int]ObjectPart)
98
99	// Create a buffer.
100	buf := make([]byte, partSize)
101	defer debug.FreeOSMemory()
102
103	for partNumber <= totalPartsCount {
104		// Choose hash algorithms to be calculated by hashCopyN,
105		// avoid sha256 with non-v4 signature request or
106		// HTTPS connection.
107		hashAlgos, hashSums := c.hashMaterials()
108
109		length, rErr := io.ReadFull(reader, buf)
110		if rErr == io.EOF {
111			break
112		}
113		if rErr != nil && rErr != io.ErrUnexpectedEOF {
114			return 0, rErr
115		}
116
117		// Calculates hash sums while copying partSize bytes into cw.
118		for k, v := range hashAlgos {
119			v.Write(buf[:length])
120			hashSums[k] = v.Sum(nil)
121		}
122
123		// Update progress reader appropriately to the latest offset
124		// as we read from the source.
125		rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
126
127		// Checksums..
128		var (
129			md5Base64 string
130			sha256Hex string
131		)
132		if hashSums["md5"] != nil {
133			md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
134		}
135		if hashSums["sha256"] != nil {
136			sha256Hex = hex.EncodeToString(hashSums["sha256"])
137		}
138
139		// Proceed to upload the part.
140		var objPart ObjectPart
141		objPart, err = c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
142			md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
143		if err != nil {
144			return totalUploadedSize, err
145		}
146
147		// Save successfully uploaded part metadata.
148		partsInfo[partNumber] = objPart
149
150		// Save successfully uploaded size.
151		totalUploadedSize += int64(length)
152
153		// Increment part number.
154		partNumber++
155
156		// For unknown size, Read EOF we break away.
157		// We do not have to upload till totalPartsCount.
158		if rErr == io.EOF {
159			break
160		}
161	}
162
163	// Loop over total uploaded parts to save them in
164	// Parts array before completing the multipart request.
165	for i := 1; i < partNumber; i++ {
166		part, ok := partsInfo[i]
167		if !ok {
168			return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", i))
169		}
170		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
171			ETag:       part.ETag,
172			PartNumber: part.PartNumber,
173		})
174	}
175
176	// Sort all completed parts.
177	sort.Sort(completedParts(complMultipartUpload.Parts))
178	if _, err = c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload); err != nil {
179		return totalUploadedSize, err
180	}
181
182	// Return final size.
183	return totalUploadedSize, nil
184}
185
186// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
187func (c Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
188	// Input validation.
189	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
190		return initiateMultipartUploadResult{}, err
191	}
192	if err := s3utils.CheckValidObjectName(objectName); err != nil {
193		return initiateMultipartUploadResult{}, err
194	}
195
196	// Initialize url queries.
197	urlValues := make(url.Values)
198	urlValues.Set("uploads", "")
199
200	// Set ContentType header.
201	customHeader := opts.Header()
202
203	reqMetadata := requestMetadata{
204		bucketName:   bucketName,
205		objectName:   objectName,
206		queryValues:  urlValues,
207		customHeader: customHeader,
208	}
209
210	// Execute POST on an objectName to initiate multipart upload.
211	resp, err := c.executeMethod(ctx, "POST", reqMetadata)
212	defer closeResponse(resp)
213	if err != nil {
214		return initiateMultipartUploadResult{}, err
215	}
216	if resp != nil {
217		if resp.StatusCode != http.StatusOK {
218			return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
219		}
220	}
221	// Decode xml for new multipart upload.
222	initiateMultipartUploadResult := initiateMultipartUploadResult{}
223	err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
224	if err != nil {
225		return initiateMultipartUploadResult, err
226	}
227	return initiateMultipartUploadResult, nil
228}
229
230// uploadPart - Uploads a part in a multipart upload.
231func (c Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
232	partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
233	// Input validation.
234	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
235		return ObjectPart{}, err
236	}
237	if err := s3utils.CheckValidObjectName(objectName); err != nil {
238		return ObjectPart{}, err
239	}
240	if size > maxPartSize {
241		return ObjectPart{}, ErrEntityTooLarge(size, maxPartSize, bucketName, objectName)
242	}
243	if size <= -1 {
244		return ObjectPart{}, ErrEntityTooSmall(size, bucketName, objectName)
245	}
246	if partNumber <= 0 {
247		return ObjectPart{}, ErrInvalidArgument("Part number cannot be negative or equal to zero.")
248	}
249	if uploadID == "" {
250		return ObjectPart{}, ErrInvalidArgument("UploadID cannot be empty.")
251	}
252
253	// Get resources properly escaped and lined up before using them in http request.
254	urlValues := make(url.Values)
255	// Set part number.
256	urlValues.Set("partNumber", strconv.Itoa(partNumber))
257	// Set upload id.
258	urlValues.Set("uploadId", uploadID)
259
260	// Set encryption headers, if any.
261	customHeader := make(http.Header)
262	// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
263	// Server-side encryption is supported by the S3 Multipart Upload actions.
264	// Unless you are using a customer-provided encryption key, you don't need
265	// to specify the encryption parameters in each UploadPart request.
266	if sse != nil && sse.Type() == encrypt.SSEC {
267		sse.Marshal(customHeader)
268	}
269
270	reqMetadata := requestMetadata{
271		bucketName:       bucketName,
272		objectName:       objectName,
273		queryValues:      urlValues,
274		customHeader:     customHeader,
275		contentBody:      reader,
276		contentLength:    size,
277		contentMD5Base64: md5Base64,
278		contentSHA256Hex: sha256Hex,
279	}
280
281	// Execute PUT on each part.
282	resp, err := c.executeMethod(ctx, "PUT", reqMetadata)
283	defer closeResponse(resp)
284	if err != nil {
285		return ObjectPart{}, err
286	}
287	if resp != nil {
288		if resp.StatusCode != http.StatusOK {
289			return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
290		}
291	}
292	// Once successfully uploaded, return completed part.
293	objPart := ObjectPart{}
294	objPart.Size = size
295	objPart.PartNumber = partNumber
296	// Trim off the odd double quotes from ETag in the beginning and end.
297	objPart.ETag = strings.TrimPrefix(resp.Header.Get("ETag"), "\"")
298	objPart.ETag = strings.TrimSuffix(objPart.ETag, "\"")
299	return objPart, nil
300}
301
302// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
303func (c Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
304	complete completeMultipartUpload) (completeMultipartUploadResult, error) {
305	// Input validation.
306	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
307		return completeMultipartUploadResult{}, err
308	}
309	if err := s3utils.CheckValidObjectName(objectName); err != nil {
310		return completeMultipartUploadResult{}, err
311	}
312
313	// Initialize url queries.
314	urlValues := make(url.Values)
315	urlValues.Set("uploadId", uploadID)
316	// Marshal complete multipart body.
317	completeMultipartUploadBytes, err := xml.Marshal(complete)
318	if err != nil {
319		return completeMultipartUploadResult{}, err
320	}
321
322	// Instantiate all the complete multipart buffer.
323	completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
324	reqMetadata := requestMetadata{
325		bucketName:       bucketName,
326		objectName:       objectName,
327		queryValues:      urlValues,
328		contentBody:      completeMultipartUploadBuffer,
329		contentLength:    int64(len(completeMultipartUploadBytes)),
330		contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
331	}
332
333	// Execute POST to complete multipart upload for an objectName.
334	resp, err := c.executeMethod(ctx, "POST", reqMetadata)
335	defer closeResponse(resp)
336	if err != nil {
337		return completeMultipartUploadResult{}, err
338	}
339	if resp != nil {
340		if resp.StatusCode != http.StatusOK {
341			return completeMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
342		}
343	}
344
345	// Read resp.Body into a []bytes to parse for Error response inside the body
346	var b []byte
347	b, err = ioutil.ReadAll(resp.Body)
348	if err != nil {
349		return completeMultipartUploadResult{}, err
350	}
351	// Decode completed multipart upload response on success.
352	completeMultipartUploadResult := completeMultipartUploadResult{}
353	err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
354	if err != nil {
355		// xml parsing failure due to presence an ill-formed xml fragment
356		return completeMultipartUploadResult, err
357	} else if completeMultipartUploadResult.Bucket == "" {
358		// xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
359		// In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
360		// of the members.
361
362		// Decode completed multipart upload response on failure
363		completeMultipartUploadErr := ErrorResponse{}
364		err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
365		if err != nil {
366			// xml parsing failure due to presence an ill-formed xml fragment
367			return completeMultipartUploadResult, err
368		}
369		return completeMultipartUploadResult, completeMultipartUploadErr
370	}
371	return completeMultipartUploadResult, nil
372}
373