/*
 * MinIO Go Library for Amazon S3 Compatible Cloud Storage
 * Copyright 2017 MinIO, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package minio

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sort"
	"strings"

	"github.com/google/uuid"
	"github.com/minio/minio-go/v7/pkg/s3utils"
)

// putObjectMultipartStream - upload a large object using
// multipart upload and streaming signature for signing payload.
// Comprehensive put object operation involving multipart uploads.
//
// The following code handles these types of readers:
//
//  - *minio.Object
//  - Any reader which has a method 'ReadAt()'
//
func (c Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {

	if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
		// If the reader implements ReadAt and is not a *minio.Object, use the parallel uploader.
		info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
	} else {
		info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
	}
	if err != nil {
		errResp := ToErrorResponse(err)
		// If multipart functionality is not available, fall back
		// to a single PutObject operation.
		if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
			// Verify if the size of the reader is greater than '5GiB'.
			if size > maxSinglePutObjectSize {
				return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
			}
			// Fall back to uploading as a single PutObject operation.
			return c.putObject(ctx, bucketName, objectName, reader, size, opts)
		}
	}
	return info, err
}

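// A minimal usage sketch (illustrative, not part of the library): callers
// reach putObjectMultipartStream through the public PutObject API when the
// object is large enough for multipart. Since *os.File implements
// io.ReaderAt, this example would take the parallel ReadAt path, as long
// as SendContentMd5 is left unset. Bucket, object, and file names are
// hypothetical.
//
//	file, err := os.Open("/tmp/large-backup.tar")
//	if err != nil {
//		return err
//	}
//	defer file.Close()
//	stat, err := file.Stat()
//	if err != nil {
//		return err
//	}
//	info, err := client.PutObject(context.Background(), "my-bucket",
//		"backups/large-backup.tar", file, stat.Size(),
//		PutObjectOptions{ContentType: "application/x-tar"})
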
// uploadedPartRes - the response received from a part upload.
type uploadedPartRes struct {
	Error   error // Any error encountered while uploading the part.
	PartNum int   // Number of the part uploaded.
	Size    int64 // Size of the part uploaded.
	Part    ObjectPart
}

type uploadPartReq struct {
	PartNum int        // Number of the part to be uploaded.
	Part    ObjectPart // Part metadata, filled in once the part is uploaded.
}

// putObjectMultipartStreamFromReadAt - Uploads files bigger than 128MiB.
// Supports all readers which implement the io.ReaderAt interface
// (ReadAt method).
//
// NOTE: This function is meant to be used for all readers which
// implement io.ReaderAt, which lets us read each part at its own
// offset and avoids re-reading data that was already uploaded.
// Internally this function stages part data in reusable in-memory
// buffers, one per worker; if any part fails, the multipart upload
// is aborted and the already-uploaded parts are purged.
func (c Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
	reader io.ReaderAt, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

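	// Illustrative invariant (an assumption drawn from how the offsets are
	// computed below, not a documented guarantee of optimalPartInfo): every
	// part except the last is partSize bytes, the last part carries the
	// remainder, and totalPartsCount stays within the S3 limit of 10000
	// parts, so:
	//
	//	size == int64(totalPartsCount-1)*partSize + lastPartSize
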
	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload in progress if the
	// function returns any error. Since we do not resume,
	// we should purge the parts which have been uploaded
	// to relinquish storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server; should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Declare a channel that sends the next part number to be uploaded.
	// Buffered to 10000 because that is the maximum number of parts allowed
	// by S3.
	uploadPartsCh := make(chan uploadPartReq, 10000)

	// Declare a channel that sends back the response of a part upload.
	// Buffered to 10000 because that is the maximum number of parts allowed
	// by S3.
	uploadedPartsCh := make(chan uploadedPartRes, 10000)

	// Used for readability, lastPartNumber is always totalPartsCount.
	lastPartNumber := totalPartsCount

	// Send each part number to the channel to be processed.
	for p := 1; p <= totalPartsCount; p++ {
		uploadPartsCh <- uploadPartReq{PartNum: p}
	}
	close(uploadPartsCh)

	partsBuf := make([][]byte, opts.getNumThreads())
	for i := range partsBuf {
		partsBuf[i] = make([]byte, 0, partSize)
	}
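
	// Note: with one reusable staging buffer per worker, peak staging
	// memory for this upload is roughly opts.getNumThreads() * partSize
	// bytes.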

	// Receive each part number from the channel, allowing up to
	// opts.getNumThreads() parallel uploads.
	for w := 1; w <= opts.getNumThreads(); w++ {
		go func(w int, partSize int64) {
			// Each worker will draw from the part channel and upload in parallel.
			for uploadReq := range uploadPartsCh {

				// Calculate the read offset for this part as a
				// multiple of the part size.
				readOffset := int64(uploadReq.PartNum-1) * partSize

				// As a special case, if partNumber is lastPartNumber we
				// calculate the offset based on the last part size.
				if uploadReq.PartNum == lastPartNumber {
					readOffset = size - lastPartSize
					partSize = lastPartSize
				}

				n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
				if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
					uploadedPartsCh <- uploadedPartRes{
						Error: rerr,
					}
					// Exit the goroutine.
					return
				}

				// Wrap the staged part in a progress hook reader.
				hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)

				// Proceed to upload the part.
				objPart, err := c.uploadPart(ctx, bucketName, objectName,
					uploadID, hookReader, uploadReq.PartNum,
					"", "", partSize, opts.ServerSideEncryption)
				if err != nil {
					uploadedPartsCh <- uploadedPartRes{
						Error: err,
					}
					// Exit the goroutine.
					return
				}

				// Save successfully uploaded part metadata.
				uploadReq.Part = objPart

				// Send successful part info through the channel.
				uploadedPartsCh <- uploadedPartRes{
					Size:    objPart.Size,
					PartNum: uploadReq.PartNum,
					Part:    uploadReq.Part,
				}
			}
		}(w, partSize)
	}

	// Gather the responses as they occur and update any
	// progress bar.
	for u := 1; u <= totalPartsCount; u++ {
		uploadRes := <-uploadedPartsCh
		if uploadRes.Error != nil {
			return UploadInfo{}, uploadRes.Error
		}
		// Update the totalUploadedSize.
		totalUploadedSize += uploadRes.Size
		// Store the parts to be completed in order.
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       uploadRes.Part.ETag,
			PartNumber: uploadRes.Part.PartNumber,
		})
	}

	// Verify if we uploaded all the data.
	if totalUploadedSize != size {
		return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}

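// A hedged usage sketch for the parallel path above (names and sizes are
// illustrative): any io.ReaderAt that is not a *minio.Object routes here
// via PutObject, and PutObjectOptions.NumThreads sets the worker-pool
// width used by putObjectMultipartStreamFromReadAt.
//
//	f, err := os.Open("/data/10gb.bin")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	st, _ := f.Stat()
//	info, err := client.PutObject(ctx, "my-bucket", "10gb.bin",
//		f, st.Size(), PutObjectOptions{NumThreads: 8})
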
func (c Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
	reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err = s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Calculate the optimal parts info for a given size.
	totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size, opts.PartSize)
	if err != nil {
		return UploadInfo{}, err
	}

	// Initiate a new multipart upload.
	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
	if err != nil {
		return UploadInfo{}, err
	}

	// Abort the multipart upload if the function returns
	// any error. Since we do not resume, we should purge
	// the parts which have been uploaded to relinquish
	// storage space.
	defer func() {
		if err != nil {
			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
		}
	}()

	// Total data read and written to server; should be equal to 'size' at the end of the call.
	var totalUploadedSize int64

	// Initialize parts uploaded map.
	partsInfo := make(map[int]ObjectPart)

	// Create a buffer.
	buf := make([]byte, partSize)

	// Avoid declaring variables in the for loop.
	var md5Base64 string
	var hookReader io.Reader

	// Part numbers always start with '1'.
	var partNumber int
	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {

		// The last part is usually smaller than the rest.
		if partNumber == totalPartsCount {
			partSize = lastPartSize
		}

		if opts.SendContentMd5 {
			length, rerr := readFull(reader, buf)
			if rerr == io.EOF && partNumber > 1 {
				break
			}

			if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
				return UploadInfo{}, rerr
			}

			// Calculate md5sum.
			hash := c.md5Hasher()
			hash.Write(buf[:length])
			md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
			hash.Close()

			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
		} else {
			// Update progress reader appropriately to the latest offset
			// as we read from the source.
			hookReader = newHook(reader, opts.Progress)
		}

		objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
			io.LimitReader(hookReader, partSize),
			partNumber, md5Base64, "", partSize, opts.ServerSideEncryption)
		if uerr != nil {
			return UploadInfo{}, uerr
		}

		// Save successfully uploaded part metadata.
		partsInfo[partNumber] = objPart

		// Save successfully uploaded size.
		totalUploadedSize += partSize
	}

	// Verify if we uploaded all the data.
	if size > 0 {
		if totalUploadedSize != size {
			return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
		}
	}

	// Complete multipart upload.
	var complMultipartUpload completeMultipartUpload

	// Loop over the uploaded parts to save them in a
	// Parts array before completing the multipart request.
	for i := 1; i < partNumber; i++ {
		part, ok := partsInfo[i]
		if !ok {
			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
		}
		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		})
	}

	// Sort all completed parts.
	sort.Sort(completedParts(complMultipartUpload.Parts))

	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload)
	if err != nil {
		return UploadInfo{}, err
	}

	uploadInfo.Size = totalUploadedSize
	return uploadInfo, nil
}

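// A sketch of when the sequential path above is taken (assumption for
// illustration): a reader without ReadAt, or any upload with
// SendContentMd5 set, is streamed one part at a time; with SendContentMd5
// each part is buffered first so its Content-MD5 can be computed.
//
//	var buf bytes.Buffer
//	// ... fill buf with several hundred MiB of data ...
//	info, err := client.PutObject(ctx, "my-bucket", "large-object",
//		&buf, int64(buf.Len()), PutObjectOptions{SendContentMd5: true})
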
// putObject - special function used for Google Cloud Storage. This special
// function is needed for Google Cloud Storage since Google's multipart API
// is not S3 compatible.
func (c Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}

	// Size -1 is only supported on Google Cloud Storage; we error
	// out in all other situations.
	if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
		return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
	}

	if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
		return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
	}

	if size > 0 {
		if isReadAt(reader) && !isObject(reader) {
			seeker, ok := reader.(io.Seeker)
			if ok {
				offset, err := seeker.Seek(0, io.SeekCurrent)
				if err != nil {
					return UploadInfo{}, errInvalidArgument(err.Error())
				}
				reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
			}
		}
	}

	var md5Base64 string
	if opts.SendContentMd5 {
		// Create a buffer.
		buf := make([]byte, size)

		length, rErr := readFull(reader, buf)
		if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
			return UploadInfo{}, rErr
		}

		// Calculate md5sum.
		hash := c.md5Hasher()
		hash.Write(buf[:length])
		md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
		reader = bytes.NewReader(buf[:length])
		hash.Close()
	}

	// Update progress reader appropriately to the latest offset as we
	// read from the source.
	readSeeker := newHook(reader, opts.Progress)

	// This function does not calculate sha256 and md5sum for payload.
	// Execute put object.
	return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
}

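// A hypothetical Google Cloud Storage setup for the path above (the
// endpoint is Google's S3-interoperability endpoint; the credentials
// helper and keys are illustrative): pointing the client there makes
// s3utils.IsGoogleEndpoint return true, which permits size '-1' uploads
// here.
//
//	gcs, err := New("storage.googleapis.com", &Options{
//		Creds: credentials.NewStaticV2(accessKey, secretKey, ""),
//	})
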
// putObjectDo - executes the put object http operation.
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
func (c Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
	// Input validation.
	if err := s3utils.CheckValidBucketName(bucketName); err != nil {
		return UploadInfo{}, err
	}
	if err := s3utils.CheckValidObjectName(objectName); err != nil {
		return UploadInfo{}, err
	}
	// Set headers.
	customHeader := opts.Header()

	// Populate request metadata.
	reqMetadata := requestMetadata{
		bucketName:       bucketName,
		objectName:       objectName,
		customHeader:     customHeader,
		contentBody:      reader,
		contentLength:    size,
		contentMD5Base64: md5Base64,
		contentSHA256Hex: sha256Hex,
	}
	if opts.Internal.SourceVersionID != "" {
		if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
			return UploadInfo{}, errInvalidArgument(err.Error())
		}
		urlValues := make(url.Values)
		urlValues.Set("versionId", opts.Internal.SourceVersionID)
		reqMetadata.queryValues = urlValues
	}

	// Execute PUT on objectName.
	resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
	defer closeResponse(resp)
	if err != nil {
		return UploadInfo{}, err
	}
	if resp != nil {
		if resp.StatusCode != http.StatusOK {
			return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
		}
	}

	// Extract lifecycle expiry date and rule ID.
	expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))

	return UploadInfo{
		Bucket:           bucketName,
		Key:              objectName,
		ETag:             trimEtag(resp.Header.Get("ETag")),
		VersionID:        resp.Header.Get(amzVersionID),
		Size:             size,
		Expiration:       expTime,
		ExpirationRuleID: ruleID,
	}, nil
}

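// For reference, a rough sketch of the wire request putObjectDo builds
// (an approximation; exact headers depend on opts, signing, and whether
// path- or virtual-host-style addressing is in effect):
//
//	PUT /<objectName>?versionId=<SourceVersionID> HTTP/1.1
//	Host: <bucketName>.<endpoint>
//	Content-Length: <size>
//	Content-MD5: <md5Base64, when provided>
//	x-amz-content-sha256: <sha256Hex, when provided>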