1/*
2 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
3 * Copyright 2015-2017 MinIO, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18package minio
19
20import (
21	"bytes"
22	"context"
23	"encoding/base64"
24	"encoding/hex"
25	"encoding/xml"
26	"fmt"
27	"io"
28	"io/ioutil"
29	"net/http"
30	"net/url"
31	"sort"
32	"strconv"
33	"strings"
34
35	"github.com/google/uuid"
36	"github.com/minio/minio-go/v7/pkg/encrypt"
37	"github.com/minio/minio-go/v7/pkg/s3utils"
38)
39
40func (c *Client) putObjectMultipart(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64,
41	opts PutObjectOptions) (info UploadInfo, err error) {
42	info, err = c.putObjectMultipartNoStream(ctx, bucketName, objectName, reader, opts)
43	if err != nil {
44		errResp := ToErrorResponse(err)
45		// Verify if multipart functionality is not available, if not
46		// fall back to single PutObject operation.
47		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
48			// Verify if size of reader is greater than '5GiB'.
49			if size > maxSinglePutObjectSize {
50				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
51			}
52			// Fall back to uploading as single PutObject operation.
53			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
54		}
55	}
56	return info, err
57}
58
59func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, objectName string, reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
60	// Input validation.
61	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
62		return UploadInfo{}, err
63	}
64	if err = s3utils.CheckValidObjectName(objectName); err != nil {
65		return UploadInfo{}, err
66	}
67
68	// Total data read and written to server. should be equal to
69	// 'size' at the end of the call.
70	var totalUploadedSize int64
71
72	// Complete multipart upload.
73	var complMultipartUpload completeMultipartUpload
74
75	// Calculate the optimal parts info for a given size.
76	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
77	if err != nil {
78		return UploadInfo{}, err
79	}
80
81	// Initiate a new multipart upload.
82	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
83	if err != nil {
84		return UploadInfo{}, err
85	}
86
87	defer func() {
88		if err != nil {
89			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
90		}
91	}()
92
93	// Part number always starts with '1'.
94	partNumber := 1
95
96	// Initialize parts uploaded map.
97	partsInfo := make(map[int]ObjectPart)
98
99	// Create a buffer.
100	buf := make([]byte, partSize)
101
102	for partNumber <= totalPartsCount {
103		// Choose hash algorithms to be calculated by hashCopyN,
104		// avoid sha256 with non-v4 signature request or
105		// HTTPS connection.
106		hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5)
107
108		length, rErr := readFull(reader, buf)
109		if rErr == io.EOF && partNumber > 1 {
110			break
111		}
112
113		if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
114			return UploadInfo{}, rErr
115		}
116
117		// Calculates hash sums while copying partSize bytes into cw.
118		for k, v := range hashAlgos {
119			v.Write(buf[:length])
120			hashSums[k] = v.Sum(nil)
121			v.Close()
122		}
123
124		// Update progress reader appropriately to the latest offset
125		// as we read from the source.
126		rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)
127
128		// Checksums..
129		var (
130			md5Base64 string
131			sha256Hex string
132		)
133		if hashSums["md5"] != nil {
134			md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
135		}
136		if hashSums["sha256"] != nil {
137			sha256Hex = hex.EncodeToString(hashSums["sha256"])
138		}
139
140		// Proceed to upload the part.
141		objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
142			md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption)
143		if uerr != nil {
144			return UploadInfo{}, uerr
145		}
146
147		// Save successfully uploaded part metadata.
148		partsInfo[partNumber] = objPart
149
150		// Save successfully uploaded size.
151		totalUploadedSize += int64(length)
152
153		// Increment part number.
154		partNumber++
155
156		// For unknown size, Read EOF we break away.
157		// We do not have to upload till totalPartsCount.
158		if rErr == io.EOF {
159			break
160		}
161	}
162
163	// Loop over total uploaded parts to save them in
164	// Parts array before completing the multipart request.
165	for i := 1; i < partNumber; i++ {
166		part, ok := partsInfo[i]
167		if !ok {
168			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
169		}
170		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
171			ETag:       part.ETag,
172			PartNumber: part.PartNumber,
173		})
174	}
175
176	// Sort all completed parts.
177	sort.Sort(completedParts(complMultipartUpload.Parts))
178
179	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
180	if err != nil {
181		return UploadInfo{}, err
182	}
183
184	uploadInfo.Size = totalUploadedSize
185	return uploadInfo, nil
186}
187
188// initiateMultipartUpload - Initiates a multipart upload and returns an upload ID.
189func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, objectName string, opts PutObjectOptions) (initiateMultipartUploadResult, error) {
190	// Input validation.
191	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
192		return initiateMultipartUploadResult{}, err
193	}
194	if err := s3utils.CheckValidObjectName(objectName); err != nil {
195		return initiateMultipartUploadResult{}, err
196	}
197
198	// Initialize url queries.
199	urlValues := make(url.Values)
200	urlValues.Set("uploads", "")
201
202	if opts.Internal.SourceVersionID != "" {
203		if opts.Internal.SourceVersionID != nullVersionID {
204			if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
205				return initiateMultipartUploadResult{}, errInvalidArgument(err.Error())
206			}
207		}
208		urlValues.Set("versionId", opts.Internal.SourceVersionID)
209	}
210
211	// Set ContentType header.
212	customHeader := opts.Header()
213
214	reqMetadata := requestMetadata{
215		bucketName:   bucketName,
216		objectName:   objectName,
217		queryValues:  urlValues,
218		customHeader: customHeader,
219	}
220
221	// Execute POST on an objectName to initiate multipart upload.
222	resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
223	defer closeResponse(resp)
224	if err != nil {
225		return initiateMultipartUploadResult{}, err
226	}
227	if resp != nil {
228		if resp.StatusCode != http.StatusOK {
229			return initiateMultipartUploadResult{}, httpRespToErrorResponse(resp, bucketName, objectName)
230		}
231	}
232	// Decode xml for new multipart upload.
233	initiateMultipartUploadResult := initiateMultipartUploadResult{}
234	err = xmlDecoder(resp.Body, &initiateMultipartUploadResult)
235	if err != nil {
236		return initiateMultipartUploadResult, err
237	}
238	return initiateMultipartUploadResult, nil
239}
240
241// uploadPart - Uploads a part in a multipart upload.
242func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
243	partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide) (ObjectPart, error) {
244	// Input validation.
245	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
246		return ObjectPart{}, err
247	}
248	if err := s3utils.CheckValidObjectName(objectName); err != nil {
249		return ObjectPart{}, err
250	}
251	if size > maxPartSize {
252		return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName)
253	}
254	if size <= -1 {
255		return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName)
256	}
257	if partNumber <= 0 {
258		return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
259	}
260	if uploadID == "" {
261		return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
262	}
263
264	// Get resources properly escaped and lined up before using them in http request.
265	urlValues := make(url.Values)
266	// Set part number.
267	urlValues.Set("partNumber", strconv.Itoa(partNumber))
268	// Set upload id.
269	urlValues.Set("uploadId", uploadID)
270
271	// Set encryption headers, if any.
272	customHeader := make(http.Header)
273	// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
274	// Server-side encryption is supported by the S3 Multipart Upload actions.
275	// Unless you are using a customer-provided encryption key, you don't need
276	// to specify the encryption parameters in each UploadPart request.
277	if sse != nil && sse.Type() == encrypt.SSEC {
278		sse.Marshal(customHeader)
279	}
280
281	reqMetadata := requestMetadata{
282		bucketName:       bucketName,
283		objectName:       objectName,
284		queryValues:      urlValues,
285		customHeader:     customHeader,
286		contentBody:      reader,
287		contentLength:    size,
288		contentMD5Base64: md5Base64,
289		contentSHA256Hex: sha256Hex,
290	}
291
292	// Execute PUT on each part.
293	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
294	defer closeResponse(resp)
295	if err != nil {
296		return ObjectPart{}, err
297	}
298	if resp != nil {
299		if resp.StatusCode != http.StatusOK {
300			return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
301		}
302	}
303	// Once successfully uploaded, return completed part.
304	objPart := ObjectPart{}
305	objPart.Size = size
306	objPart.PartNumber = partNumber
307	// Trim off the odd double quotes from ETag in the beginning and end.
308	objPart.ETag = trimEtag(resp.Header.Get("ETag"))
309	return objPart, nil
310}
311
312// completeMultipartUpload - Completes a multipart upload by assembling previously uploaded parts.
313func (c *Client) completeMultipartUpload(ctx context.Context, bucketName, objectName, uploadID string,
314	complete completeMultipartUpload, opts PutObjectOptions) (UploadInfo, error) {
315	// Input validation.
316	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
317		return UploadInfo{}, err
318	}
319	if err := s3utils.CheckValidObjectName(objectName); err != nil {
320		return UploadInfo{}, err
321	}
322
323	// Initialize url queries.
324	urlValues := make(url.Values)
325	urlValues.Set("uploadId", uploadID)
326	// Marshal complete multipart body.
327	completeMultipartUploadBytes, err := xml.Marshal(complete)
328	if err != nil {
329		return UploadInfo{}, err
330	}
331
332	// Instantiate all the complete multipart buffer.
333	completeMultipartUploadBuffer := bytes.NewReader(completeMultipartUploadBytes)
334	reqMetadata := requestMetadata{
335		bucketName:       bucketName,
336		objectName:       objectName,
337		queryValues:      urlValues,
338		contentBody:      completeMultipartUploadBuffer,
339		contentLength:    int64(len(completeMultipartUploadBytes)),
340		contentSHA256Hex: sum256Hex(completeMultipartUploadBytes),
341		customHeader:     opts.Header(),
342	}
343
344	// Execute POST to complete multipart upload for an objectName.
345	resp, err := c.executeMethod(ctx, http.MethodPost, reqMetadata)
346	defer closeResponse(resp)
347	if err != nil {
348		return UploadInfo{}, err
349	}
350	if resp != nil {
351		if resp.StatusCode != http.StatusOK {
352			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
353		}
354	}
355
356	// Read resp.Body into a []bytes to parse for Error response inside the body
357	var b []byte
358	b, err = ioutil.ReadAll(resp.Body)
359	if err != nil {
360		return UploadInfo{}, err
361	}
362	// Decode completed multipart upload response on success.
363	completeMultipartUploadResult := completeMultipartUploadResult{}
364	err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadResult)
365	if err != nil {
366		// xml parsing failure due to presence an ill-formed xml fragment
367		return UploadInfo{}, err
368	} else if completeMultipartUploadResult.Bucket == "" {
369		// xml's Decode method ignores well-formed xml that don't apply to the type of value supplied.
370		// In this case, it would leave completeMultipartUploadResult with the corresponding zero-values
371		// of the members.
372
373		// Decode completed multipart upload response on failure
374		completeMultipartUploadErr := ErrorResponse{}
375		err = xmlDecoder(bytes.NewReader(b), &completeMultipartUploadErr)
376		if err != nil {
377			// xml parsing failure due to presence an ill-formed xml fragment
378			return UploadInfo{}, err
379		}
380		return UploadInfo{}, completeMultipartUploadErr
381	}
382
383	// extract lifecycle expiry date and rule ID
384	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
385
386	return UploadInfo{
387		Bucket:           completeMultipartUploadResult.Bucket,
388		Key:              completeMultipartUploadResult.Key,
389		ETag:             trimEtag(completeMultipartUploadResult.ETag),
390		VersionID:        resp.Header.Get(amzVersionID),
391		Location:         completeMultipartUploadResult.Location,
392		Expiration:       expTime,
393		ExpirationRuleID: ruleID,
394	}, nil
395
396}
397