// Package s3store provides a storage backend using AWS S3 or compatible servers.
//
// Configuration
//
// In order to allow this backend to function properly, the user accessing the
// bucket must have at least the following AWS IAM policy permissions for the
// bucket and all of its subresources:
// 	s3:AbortMultipartUpload
// 	s3:DeleteObject
// 	s3:GetObject
// 	s3:ListMultipartUploadParts
// 	s3:PutObject
//
// While this package uses the official AWS SDK for Go, S3Store is able
// to work with any S3-compatible service such as Riak CS. In order to change
// the HTTP endpoint to which requests are sent, consult the AWS Go SDK
// documentation (http://docs.aws.amazon.com/sdk-for-go/api/aws/Config.html#WithEndpoint-instance_method).
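//
// For example, a store backed by an S3-compatible service could be constructed
// roughly as follows; the endpoint, region and bucket name below are
// placeholders and must be adjusted to your own deployment:
//
// 	sess := session.Must(session.NewSession(aws.NewConfig().
// 		WithEndpoint("https://s3.example.com").
// 		WithRegion("us-east-1").
// 		WithS3ForcePathStyle(true)))
// 	store := s3store.New("example-bucket", s3.New(sess))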
//
// Implementation
//
// Once a new tus upload is initiated, multiple objects in S3 are created:
//
// First, an info object is stored which contains a JSON-encoded blob of
// general information about the upload, including its size and meta data.
// These objects carry the suffix ".info" in their key.
//
// In addition, a new multipart upload
// (http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) is
// created. Whenever a new chunk is uploaded to tusd using a PATCH request, a
// new part is pushed to the multipart upload on S3.
//
// If meta data is associated with the upload during creation, it will be added
// to the multipart upload and, after finishing it, the meta data will be passed
// to the final object. However, the metadata which will be attached to the
// final object can only contain ASCII characters and every non-ASCII character
// will be replaced by a question mark (for example, "Menü" becomes "Men?").
// This restriction does not apply to the metadata returned by the GetInfo
// function since it relies on the info object for reading the metadata.
// Therefore, HEAD responses will always contain the unchanged metadata, Base64-
// encoded, even if it contains non-ASCII characters.
//
// Once the upload is finished, the multipart upload is completed, resulting in
// the entire file being stored in the bucket. The info object, containing
// meta data, is not deleted. It is recommended to copy the finished upload to
// another bucket to avoid it being deleted by the Termination extension.
//
// If an upload is about to be terminated, the multipart upload is aborted,
// which removes all of the uploaded parts from the bucket. In addition, the
// info object is also deleted. If the upload has already been finished, the
// finished object containing the entire upload is also removed.
//
// Considerations
//
// In order to support tus' principle of resumable uploads, S3 Multipart
// Uploads are used internally.
//
// When receiving a PATCH request, its body will be temporarily stored on disk.
// This is necessary to enforce the minimum size of a single part and to allow
// the AWS SDK to calculate a checksum. Once the part has been uploaded to S3,
// the temporary file will be removed immediately. Therefore, please ensure
// that the server running this storage backend has enough disk space available
// to hold these caches.
//
// In addition, it must be mentioned that AWS S3 only offers eventual
// consistency (https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel).
// Therefore, additional measures are required to prevent concurrent access to
// the same upload resources, which may otherwise result in data corruption.
// See handler.Locker for more information.
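//
// A minimal sketch of wiring the store into a tusd handler together with a
// lock provider (the in-memory locker from github.com/tus/tusd/pkg/memorylocker
// is used here purely as an example; any handler.Locker implementation works,
// and the store variable is assumed to be set up as shown above):
//
// 	composer := handler.NewStoreComposer()
// 	store.UseIn(composer)
// 	composer.UseLocker(memorylocker.New())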
package s3store

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/tus/tusd/internal/uid"
	"github.com/tus/tusd/pkg/handler"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/s3"
)

// This regular expression matches every character which is not defined in the
// ASCII table, which ranges from 00 to 7F, inclusive.
// It also matches the \r and \n characters which are not allowed in values
// for HTTP headers.
var nonASCIIRegexp = regexp.MustCompile(`([^\x00-\x7F]|[\r\n])`)

// See the handler.DataStore interface for documentation about the different
// methods.
type S3Store struct {
	// Bucket used to store the data in, e.g. "tusdstore.example.com"
	Bucket string
	// ObjectPrefix is prepended to the name of each S3 object that is created
	// to store uploaded files. It can be used to create a pseudo-directory
	// structure in the bucket, e.g. "path/to/my/uploads".
	ObjectPrefix string
	// MetadataObjectPrefix is prepended to the name of each .info and .part S3
	// object that is created. If it is not set, then ObjectPrefix is used.
	MetadataObjectPrefix string
	// Service specifies an interface used to communicate with the S3 backend.
	// Usually, this is an instance of github.com/aws/aws-sdk-go/service/s3.S3
	// (http://docs.aws.amazon.com/sdk-for-go/api/service/s3/S3.html).
	Service S3API
	// MaxPartSize specifies the maximum size of a single part uploaded to S3
	// in bytes. This value must be bigger than MinPartSize! In order to
	// choose the correct number, two things have to be kept in mind:
	//
	// If this value is too big and uploading the part to S3 is interrupted
	// unexpectedly, the entire part is discarded and the end user is required
	// to resume the upload and re-upload the entire big part. In addition, the
	// entire part must be written to disk before submitting to S3.
	//
	// If this value is too low, a lot of requests to S3 may be made, depending
	// on how fast data is coming in. This may result in noticeable overhead.
	MaxPartSize int64
	// MinPartSize specifies the minimum size of a single part uploaded to S3
	// in bytes. This number needs to match the underlying S3 backend or else
	// uploaded parts will be rejected. AWS S3, for example, uses 5MB for this value.
	MinPartSize int64
	// PreferredPartSize specifies the preferred size of a single part uploaded to
	// S3. S3Store will attempt to slice the incoming data into parts with this
	// size whenever possible. In some cases, smaller parts are necessary, so
	// not every part may reach this value. The PreferredPartSize must be inside the
	// range of MinPartSize to MaxPartSize.
	PreferredPartSize int64
	// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
	// allowed to have according to AWS S3 API specifications.
	// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
	MaxMultipartParts int64
	// MaxObjectSize is the maximum size an S3 Object can have according to S3
	// API specifications. See link above.
	MaxObjectSize int64
	// MaxBufferedParts is the number of additional parts that can be received from
	// the client and stored on disk while a part is being uploaded to S3. This
	// can help improve throughput by not blocking the client while tusd is
	// communicating with the S3 API, which can have unpredictable latency.
	MaxBufferedParts int64
	// TemporaryDirectory is the path where S3Store will create temporary files
	// on disk during the upload. An empty string ("", the default value) will
	// cause S3Store to use the operating system's default temporary directory.
	TemporaryDirectory string
	// DisableContentHashes instructs the S3Store to not calculate the MD5 and SHA256
	// hashes when uploading data to S3. These hashes are used for file integrity checks
	// and for authentication. However, these hashes also consume a significant amount of
	// CPU, so it might be desirable to disable them.
	// Note that this property is experimental and might be removed in the future!
	DisableContentHashes bool
}

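// S3API lists the S3 operations used by S3Store. In production it is usually
// satisfied by an *s3.S3 client from the AWS SDK for Go, but it can also be
// implemented by a mock for testing purposes.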
type S3API interface {
	PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opt ...request.Option) (*s3.PutObjectOutput, error)
	ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error)
	UploadPartWithContext(ctx context.Context, input *s3.UploadPartInput, opt ...request.Option) (*s3.UploadPartOutput, error)
	GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error)
	CreateMultipartUploadWithContext(ctx context.Context, input *s3.CreateMultipartUploadInput, opt ...request.Option) (*s3.CreateMultipartUploadOutput, error)
	AbortMultipartUploadWithContext(ctx context.Context, input *s3.AbortMultipartUploadInput, opt ...request.Option) (*s3.AbortMultipartUploadOutput, error)
	DeleteObjectWithContext(ctx context.Context, input *s3.DeleteObjectInput, opt ...request.Option) (*s3.DeleteObjectOutput, error)
	DeleteObjectsWithContext(ctx context.Context, input *s3.DeleteObjectsInput, opt ...request.Option) (*s3.DeleteObjectsOutput, error)
	CompleteMultipartUploadWithContext(ctx context.Context, input *s3.CompleteMultipartUploadInput, opt ...request.Option) (*s3.CompleteMultipartUploadOutput, error)
	UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error)
}

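// s3APIForPresigning is the additional interface that the S3 service must
// implement when DisableContentHashes is enabled, so that UploadPart requests
// can be presigned and sent without a hashed body.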
type s3APIForPresigning interface {
	UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput)
}

// New constructs a new storage using the supplied bucket and service object.
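//
// The returned S3Store uses S3's default limits (5MiB minimum and 5GiB maximum
// part size, at most 10,000 parts, 5TiB maximum object size) and a preferred
// part size of 50MiB. These fields may be tuned on the returned value before
// the store is used; for example (s3Service stands for any S3API
// implementation and the chosen size is purely illustrative):
//
// 	store := s3store.New("example-bucket", s3Service)
// 	store.PreferredPartSize = 16 * 1024 * 1024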
func New(bucket string, service S3API) S3Store {
	return S3Store{
		Bucket:             bucket,
		Service:            service,
		MaxPartSize:        5 * 1024 * 1024 * 1024,
		MinPartSize:        5 * 1024 * 1024,
		PreferredPartSize:  50 * 1024 * 1024,
		MaxMultipartParts:  10000,
		MaxObjectSize:      5 * 1024 * 1024 * 1024 * 1024,
		MaxBufferedParts:   20,
		TemporaryDirectory: "",
	}
}

// UseIn sets this store as the core data store in the passed composer and adds
// all possible extensions to it.
func (store S3Store) UseIn(composer *handler.StoreComposer) {
	composer.UseCore(store)
	composer.UseTerminater(store)
	composer.UseConcater(store)
	composer.UseLengthDeferrer(store)
}

type s3Upload struct {
	id    string
	store *S3Store

	// info stores the upload's current FileInfo struct. It may be nil if it hasn't
	// been fetched yet from S3. Never read or write to it directly but instead use
	// the GetInfo and writeInfo functions.
	info *handler.FileInfo
}

func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
	// An upload larger than MaxObjectSize must be rejected with an error.
	if info.Size > store.MaxObjectSize {
		return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)
	}

	var uploadId string
	if info.ID == "" {
		uploadId = uid.Uid()
	} else {
		// certain tests set info.ID in advance
		uploadId = info.ID
	}

	// Convert meta data into a map of pointers for AWS Go SDK, sigh.
	metadata := make(map[string]*string, len(info.MetaData))
	for key, value := range info.MetaData {
		// Copying the value is required in order to prevent it from being
		// overwritten by the next iteration.
		v := nonASCIIRegexp.ReplaceAllString(value, "?")
		metadata[key] = &v
	}

	// Create the actual multipart upload
	res, err := store.Service.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		Metadata: metadata,
	})
	if err != nil {
		return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err)
	}

	id := uploadId + "+" + *res.UploadId
	info.ID = id

	info.Storage = map[string]string{
		"Type":   "s3store",
		"Bucket": store.Bucket,
		"Key":    *store.keyWithPrefix(uploadId),
	}

	upload := &s3Upload{id, &store, nil}
	err = upload.writeInfo(ctx, info)
	if err != nil {
		return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
	}

	return upload, nil
}

func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
	return &s3Upload{id, &store, nil}, nil
}

func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
	return upload.(*s3Upload)
}

func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
	return upload.(*s3Upload)
}

func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
	return upload.(*s3Upload)
}

func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
	id := upload.id
	store := upload.store

	uploadId, _ := splitIds(id)

	upload.info = &info

	infoJson, err := json.Marshal(info)
	if err != nil {
		return err
	}

	// Create object on S3 containing information about the file
	_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket:        aws.String(store.Bucket),
		Key:           store.metadataKeyWithPrefix(uploadId + ".info"),
		Body:          bytes.NewReader(infoJson),
		ContentLength: aws.Int64(int64(len(infoJson))),
	})

	return err
}

func (upload s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
	id := upload.id
	store := upload.store

	uploadId, multipartId := splitIds(id)

	// Get the total size of the current upload
	info, err := upload.GetInfo(ctx)
	if err != nil {
		return 0, err
	}

	size := info.Size
	bytesUploaded := int64(0)
	optimalPartSize, err := store.calcOptimalPartSize(size)
	if err != nil {
		return 0, err
	}

	// Get the number of uploaded parts to determine the next part number
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		return 0, err
	}

	numParts := len(parts)
	nextPartNum := int64(numParts + 1)

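	// If a previous PATCH request ended with a chunk smaller than MinPartSize,
	// its data could not be uploaded as an S3 part of its own and was instead
	// stored in a separate ".part" object. Fetch that object now, delete it and
	// prepend its content to the incoming data, so it is uploaded together with
	// the new data.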
	incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return 0, err
	}
	if incompletePartFile != nil {
		defer cleanUpTempFile(incompletePartFile)

		if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
			return 0, err
		}

		src = io.MultiReader(incompletePartFile, src)
	}

	fileChan := make(chan *os.File, store.MaxBufferedParts)
	doneChan := make(chan struct{})
	defer close(doneChan)

	// If we panic or return while there are still files in the channel, then
	// we may leak file descriptors. Let's ensure that those are cleaned up.
	defer func() {
		for file := range fileChan {
			cleanUpTempFile(file)
		}
	}()

	partProducer := s3PartProducer{
		store: store,
		done:  doneChan,
		files: fileChan,
		r:     src,
	}
	go partProducer.produce(optimalPartSize)

	for file := range fileChan {
		stat, err := file.Stat()
		if err != nil {
			return 0, err
		}
		n := stat.Size()

		isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
		if n >= store.MinPartSize || isFinalChunk {
			uploadPartInput := &s3.UploadPartInput{
				Bucket:     aws.String(store.Bucket),
				Key:        store.keyWithPrefix(uploadId),
				UploadId:   aws.String(multipartId),
				PartNumber: aws.Int64(nextPartNum),
			}
			if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil {
				return bytesUploaded, err
			}
		} else {
			if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
				return bytesUploaded, err
			}

			bytesUploaded += n

			return (bytesUploaded - incompletePartSize), nil
		}

		offset += n
		bytesUploaded += n
		nextPartNum += 1
	}

	return bytesUploaded - incompletePartSize, partProducer.err
}

func cleanUpTempFile(file *os.File) {
	file.Close()
	os.Remove(file.Name())
}

func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error {
	defer cleanUpTempFile(file)

	if !upload.store.DisableContentHashes {
		// By default, use the traditional approach to upload data
		uploadPartInput.Body = file
		_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
		return err
	} else {
		// Experimental feature to prevent the AWS SDK from calculating the SHA256 hash
		// for the parts we upload to S3.
		// We compute the presigned URL without the body attached and then send the request
		// on our own. This way, the body is not included in the SHA256 calculation.
		s3api, ok := upload.store.Service.(s3APIForPresigning)
		if !ok {
			return fmt.Errorf("s3store: failed to cast S3 service for presigning")
		}

		s3Req, _ := s3api.UploadPartRequest(uploadPartInput)

		url, err := s3Req.Presign(15 * time.Minute)
		if err != nil {
			return err
		}

		req, err := http.NewRequest("PUT", url, file)
		if err != nil {
			return err
		}

		// Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked,
		// which is not supported by AWS S3.
		req.ContentLength = size

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		defer res.Body.Close()

		if res.StatusCode != 200 {
			buf := new(strings.Builder)
			io.Copy(buf, res.Body)
			return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
		}

		return nil
	}
}

func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
	if upload.info != nil {
		return *upload.info, nil
	}

	info, err = upload.fetchInfo(ctx)
	if err != nil {
		return info, err
	}

	upload.info = &info
	return info, nil
}

func (upload s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, err error) {
	id := upload.id
	store := upload.store
	uploadId, _ := splitIds(id)

	// Get file info stored in separate object
	res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".info"),
	})
	if err != nil {
		if isAwsError(err, "NoSuchKey") {
			return info, handler.ErrNotFound
		}

		return info, err
	}

	if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
		return info, err
	}

	// Get uploaded parts and their offset
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		// Check if the error is caused by the upload not being found. This happens
		// when the multipart upload has already been completed or aborted. Since
		// we already found the info object, we know that the upload has been
		// completed and can therefore set the offset to the upload's size.
		// AWS S3 returns NoSuchUpload, but other implementations, such as DigitalOcean
		// Spaces, can also return NoSuchKey.
		if isAwsError(err, "NoSuchUpload") || isAwsError(err, "NoSuchKey") {
			info.Offset = info.Size
			return info, nil
		} else {
			return info, err
		}
	}

	offset := int64(0)

	for _, part := range parts {
		offset += *part.Size
	}

	incompletePartObject, err := store.getIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return info, err
	}
	if incompletePartObject != nil {
		defer incompletePartObject.Body.Close()
		offset += *incompletePartObject.ContentLength
	}

	info.Offset = offset

	return
}

func (upload s3Upload) GetReader(ctx context.Context) (io.Reader, error) {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Attempt to get upload content
	res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.keyWithPrefix(uploadId),
	})
	if err == nil {
		// No error occurred, and we are able to stream the object
		return res.Body, nil
	}

	// If the file cannot be found, we ignore this error and continue since the
	// upload may not have been finished yet. In this case we do not want to
	// return an ErrNotFound but a more meaningful message.
	if !isAwsError(err, "NoSuchKey") {
		return nil, err
	}

	// Test whether the multipart upload exists to find out if the upload
	// never existed or just has not been finished yet
	_, err = store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		UploadId: aws.String(multipartId),
		MaxParts: aws.Int64(0),
	})
	if err == nil {
		// The multipart upload still exists, which means we cannot download it yet
		return nil, handler.NewHTTPError(errors.New("cannot stream non-finished upload"), http.StatusBadRequest)
	}

	if isAwsError(err, "NoSuchUpload") {
		// Neither the object nor the multipart upload exists, so we return a 404
		return nil, handler.ErrNotFound
	}

	return nil, err
}

func (upload s3Upload) Terminate(ctx context.Context) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)
	var wg sync.WaitGroup
	wg.Add(2)

	// Both goroutines below may append to errs concurrently, so access to the
	// slice must be guarded by a mutex.
	var errsLock sync.Mutex
	errs := make([]error, 0, 3)

	go func() {
		defer wg.Done()

		// Abort the multipart upload
		_, err := store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   aws.String(store.Bucket),
			Key:      store.keyWithPrefix(uploadId),
			UploadId: aws.String(multipartId),
		})
		if err != nil && !isAwsError(err, "NoSuchUpload") {
			errsLock.Lock()
			errs = append(errs, err)
			errsLock.Unlock()
		}
	}()

	go func() {
		defer wg.Done()

		// Delete the info and content files
		res, err := store.Service.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(store.Bucket),
			Delete: &s3.Delete{
				Objects: []*s3.ObjectIdentifier{
					{
						Key: store.keyWithPrefix(uploadId),
					},
					{
						Key: store.metadataKeyWithPrefix(uploadId + ".part"),
					},
					{
						Key: store.metadataKeyWithPrefix(uploadId + ".info"),
					},
				},
				Quiet: aws.Bool(true),
			},
		})

		if err != nil {
			errsLock.Lock()
			errs = append(errs, err)
			errsLock.Unlock()
			return
		}

		for _, s3Err := range res.Errors {
			if *s3Err.Code != "NoSuchKey" {
				errsLock.Lock()
				errs = append(errs, fmt.Errorf("AWS S3 Error (%s) for object %s: %s", *s3Err.Code, *s3Err.Key, *s3Err.Message))
				errsLock.Unlock()
			}
		}
	}()

	wg.Wait()

	if len(errs) > 0 {
		return newMultiError(errs)
	}

	return nil
}

func (upload s3Upload) FinishUpload(ctx context.Context) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Get uploaded parts
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		return err
	}

	if len(parts) == 0 {
		// AWS expects at least one part to be present when completing the multipart
		// upload. So if the tus upload has a size of 0, we create an empty part
		// and use that for completing the multipart upload.
		res, err := store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{
			Bucket:     aws.String(store.Bucket),
			Key:        store.keyWithPrefix(uploadId),
			UploadId:   aws.String(multipartId),
			PartNumber: aws.Int64(1),
			Body:       bytes.NewReader([]byte{}),
		})
		if err != nil {
			return err
		}

		parts = []*s3.Part{
			{
				ETag:       res.ETag,
				PartNumber: aws.Int64(1),
			},
		}
	}

	// Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next
	// request.
	completedParts := make([]*s3.CompletedPart, len(parts))

	for index, part := range parts {
		completedParts[index] = &s3.CompletedPart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		}
	}

	_, err = store.Service.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		UploadId: aws.String(multipartId),
		MultipartUpload: &s3.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})

	return err
}

func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
	hasSmallPart := false
	for _, partialUpload := range partialUploads {
		info, err := partialUpload.GetInfo(ctx)
		if err != nil {
			return err
		}

		if info.Size < upload.store.MinPartSize {
			hasSmallPart = true
		}
	}

	// If one partial upload is smaller than the minimum part size for an S3
	// Multipart Upload, we cannot use S3 Multipart Uploads for concatenating all
	// the files.
	// So instead we have to download them and concat them on disk.
	if hasSmallPart {
		return upload.concatUsingDownload(ctx, partialUploads)
	} else {
		return upload.concatUsingMultipart(ctx, partialUploads)
	}
}

func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads []handler.Upload) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Create a temporary file for holding the concatenated data
	file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
	if err != nil {
		return err
	}
	defer cleanUpTempFile(file)

	// Download each part and append it to the temporary file
	for _, partialUpload := range partialUploads {
		partialS3Upload := partialUpload.(*s3Upload)
		partialId, _ := splitIds(partialS3Upload.id)

		res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(store.Bucket),
			Key:    store.keyWithPrefix(partialId),
		})
		if err != nil {
			return err
		}
		defer res.Body.Close()

		if _, err := io.Copy(file, res.Body); err != nil {
			return err
		}
	}

	// Seek to the beginning of the file, so the entire file is uploaded
	file.Seek(0, 0)

	// Upload the entire file to S3
	_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.keyWithPrefix(uploadId),
		Body:   file,
	})
	if err != nil {
		return err
	}

	// Finally, abort the multipart upload since it will no longer be used.
	// This happens asynchronously since we do not need to wait for the result.
	// Also, the error is ignored on purpose as it does not change the outcome of
	// the request.
	go func() {
		store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   aws.String(store.Bucket),
			Key:      store.keyWithPrefix(uploadId),
			UploadId: aws.String(multipartId),
		})
	}()

	return nil
}

func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	numPartialUploads := len(partialUploads)

	// The copy goroutines below may append to errs concurrently, so access to
	// the slice must be guarded by a mutex.
	var errsLock sync.Mutex
	errs := make([]error, 0, numPartialUploads)

	// Copy partial uploads concurrently
	var wg sync.WaitGroup
	wg.Add(numPartialUploads)
	for i, partialUpload := range partialUploads {
		partialS3Upload := partialUpload.(*s3Upload)
		partialId, _ := splitIds(partialS3Upload.id)

		go func(i int, partialId string) {
			defer wg.Done()

			_, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
				Bucket:   aws.String(store.Bucket),
				Key:      store.keyWithPrefix(uploadId),
				UploadId: aws.String(multipartId),
				// Part numbers must be in the range of 1 to 10000, inclusive. Since
				// slice indexes start at 0, we add 1 to ensure that i >= 1.
				PartNumber: aws.Int64(int64(i + 1)),
				CopySource: aws.String(store.Bucket + "/" + partialId),
			})
			if err != nil {
				errsLock.Lock()
				errs = append(errs, err)
				errsLock.Unlock()
				return
			}
		}(i, partialId)
	}

	wg.Wait()

	if len(errs) > 0 {
		return newMultiError(errs)
	}

	return upload.FinishUpload(ctx)
}

func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error {
	info, err := upload.GetInfo(ctx)
	if err != nil {
		return err
	}
	info.Size = length
	info.SizeIsDeferred = false

	return upload.writeInfo(ctx, info)
}

func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.Part, err error) {
	uploadId, multipartId := splitIds(id)

	partMarker := int64(0)
	for {
		// Get uploaded parts
		listPtr, err := store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{
			Bucket:           aws.String(store.Bucket),
			Key:              store.keyWithPrefix(uploadId),
			UploadId:         aws.String(multipartId),
			PartNumberMarker: aws.Int64(partMarker),
		})
		if err != nil {
			return nil, err
		}

		parts = append(parts, listPtr.Parts...)

		if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
			partMarker = *listPtr.NextPartNumberMarker
		} else {
			break
		}
	}
	return parts, nil
}

func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, int64, error) {
	incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return nil, 0, err
	}
	if incompleteUploadObject == nil {
		// We did not find an incomplete upload
		return nil, 0, nil
	}
	defer incompleteUploadObject.Body.Close()

	partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
	if err != nil {
		return nil, 0, err
	}

	n, err := io.Copy(partFile, incompleteUploadObject.Body)
	if err != nil {
		return nil, 0, err
	}
	if n < *incompleteUploadObject.ContentLength {
		return nil, 0, errors.New("short read of incomplete upload")
	}

	_, err = partFile.Seek(0, 0)
	if err != nil {
		return nil, 0, err
	}

	return partFile, n, nil
}

func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId string) (*s3.GetObjectOutput, error) {
	obj, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
	})

	if err != nil && (isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied")) {
		return nil, nil
	}

	return obj, err
}

func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
	defer cleanUpTempFile(file)

	_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
		Body:   file,
	})
	return err
}

func (store S3Store) deleteIncompletePartForUpload(ctx context.Context, uploadId string) error {
	_, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
	})
	return err
}

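// splitIds splits an internal upload identifier of the form
// "uploadId+multipartId" (as constructed in NewUpload) into the object key
// part and the S3 multipart upload ID. If the separator is missing, both
// return values are empty strings.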
func splitIds(id string) (uploadId, multipartId string) {
	index := strings.Index(id, "+")
	if index == -1 {
		return
	}

	uploadId = id[:index]
	multipartId = id[index+1:]
	return
}

// isAwsError tests whether an error object is an instance of the AWS error
// specified by its code.
func isAwsError(err error, code string) bool {
	if err, ok := err.(awserr.Error); ok && err.Code() == code {
		return true
	}
	return false
}

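// calcOptimalPartSize computes the size into which the incoming data should be
// sliced so that the upload fits into at most MaxMultipartParts parts, each of
// which may not exceed MaxPartSize, while preferring PreferredPartSize
// whenever possible.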
func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err error) {
	switch {
	// When the upload is smaller than or equal to PreferredPartSize, we upload it in just one part.
	case size <= store.PreferredPartSize:
		optimalPartSize = store.PreferredPartSize
	// The upload fits into at most MaxMultipartParts parts when using PreferredPartSize.
	case size <= store.PreferredPartSize*store.MaxMultipartParts:
		optimalPartSize = store.PreferredPartSize
	// Prerequisite: Be aware that the result of an integer division (x/y) is
	// ALWAYS rounded DOWN, as there are no digits after the decimal point.
	// In order to find out whether we have an exact result or a rounded down
	// one, we can check whether the remainder of that division is 0 (x%y == 0).
	//
	// So if the result of (size/MaxMultipartParts) is not a rounded down value,
	// then we can use it as our optimalPartSize. But if this division produces a
	// remainder, we have to round up the result by adding +1. Otherwise our
	// upload would not fit into MaxMultipartParts parts with that size; we
	// would need an additional part in order to upload everything.
	// In almost all cases, we could skip the check for the remainder and just
	// add +1 to every result, but there is one case where doing that would doom
	// our upload. When (MaxObjectSize == MaxPartSize * MaxMultipartParts),
	// adding +1 would end up with an optimalPartSize > MaxPartSize.
	// With the current S3 API specifications, we will not run into this problem,
	// but these specs are subject to change, and there are other stores
	// implementing the S3 API (e.g. Riak CS, Ceph RadosGW) which might
	// have different settings.
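	//
	// For example (illustrative numbers only): with MaxMultipartParts = 4, a
	// size of 8 divides evenly (8/4 = 2, remainder 0) and yields an
	// optimalPartSize of 2, whereas a size of 10 (10/4 = 2, remainder 2) must be
	// rounded up to 3, because four parts of 2 bytes each would only cover 8
	// bytes.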
	case size%store.MaxMultipartParts == 0:
		optimalPartSize = size / store.MaxMultipartParts
	// Having a remainder larger than 0 means the float result would have
	// digits after the decimal point (e.g. be something like 10.9). As a result,
	// we can only squeeze our upload into MaxMultipartParts parts if we round UP
	// this division's result. That is what is happening here. We round up by
	// adding +1 if the prior test for (remainder == 0) did not succeed.
	default:
		optimalPartSize = size/store.MaxMultipartParts + 1
	}

	// optimalPartSize must never exceed MaxPartSize
	if optimalPartSize > store.MaxPartSize {
		return optimalPartSize, fmt.Errorf("calcOptimalPartSize: to upload %v bytes, the optimal part size of %v bytes would exceed the MaxPartSize of %v bytes", size, optimalPartSize, store.MaxPartSize)
	}
	return optimalPartSize, nil
}

func (store S3Store) keyWithPrefix(key string) *string {
	prefix := store.ObjectPrefix
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	return aws.String(prefix + key)
}

func (store S3Store) metadataKeyWithPrefix(key string) *string {
	prefix := store.MetadataObjectPrefix
	if prefix == "" {
		prefix = store.ObjectPrefix
	}
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	return aws.String(prefix + key)
}