// Package s3store provides a storage backend using AWS S3 or compatible servers.
//
// Configuration
//
// In order to allow this backend to function properly, the user accessing the
// bucket must have at least the following AWS IAM policy permissions for the
// bucket and all of its subresources:
//
//	s3:AbortMultipartUpload
//	s3:DeleteObject
//	s3:GetObject
//	s3:ListMultipartUploadParts
//	s3:PutObject
//
// While this package uses the official AWS SDK for Go, S3Store is able
// to work with any S3-compatible service such as Riak CS. In order to change
// the HTTP endpoint to which requests are sent, consult the AWS Go SDK
// (http://docs.aws.amazon.com/sdk-for-go/api/aws/Config.html#WithEndpoint-instance_method).
//
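// For example, the following sketch configures the SDK for an S3-compatible
// service (the endpoint, region and bucket name are placeholder assumptions,
// not working values):
//
//	s3Config := aws.NewConfig().
//		WithEndpoint("https://s3.example.com").
//		WithRegion("us-east-1").
//		WithS3ForcePathStyle(true)
//	store := s3store.New("example-bucket", s3.New(session.Must(session.NewSession()), s3Config))
//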
// Implementation
//
// Once a new tus upload is initiated, multiple objects in S3 are created:
//
// First of all, a new info object is stored which contains a JSON-encoded blob
// of general information about the upload including its size and meta data.
// Objects of this kind have the suffix ".info" in their key.
//
// In addition, a new multipart upload
// (http://docs.aws.amazon.com/AmazonS3/latest/dev/uploadobjusingmpu.html) is
// created. Whenever a new chunk is uploaded to tusd using a PATCH request, a
// new part is pushed to the multipart upload on S3.
//
// If meta data is associated with the upload during creation, it will be added
// to the multipart upload and, after finishing it, passed on to the final
// object. However, the metadata which will be attached to the final object can
// only contain ASCII characters and every non-ASCII character will be replaced
// by a question mark (for example, "Menü" will be "Men?"). This does not apply
// to the metadata returned by the GetInfo function since it relies on the info
// object for reading the metadata. Therefore, HEAD responses will always
// contain the unchanged metadata, Base64-encoded, even if it contains
// non-ASCII characters.
//
// Once the upload is finished, the multipart upload is completed, resulting in
// the entire file being stored in the bucket. The info object, containing
// meta data, is not deleted. It is recommended to copy the finished upload to
// another bucket to avoid it being deleted by the Termination extension.
//
// If an upload is about to be terminated, the multipart upload is aborted,
// which removes all of the uploaded parts from the bucket. In addition, the
// info object is also deleted. If the upload has already been finished, the
// finished object containing the entire upload is removed as well.
//
// Considerations
//
// In order to support tus' principle of resumable uploads, S3's multipart
// uploads are used internally.
//
// When receiving a PATCH request, its body will be temporarily stored on disk.
// This requirement has been made to ensure the minimum size of a single part
// and to allow the AWS SDK to calculate a checksum. Once the part has been
// uploaded to S3, the temporary file will be removed immediately. Therefore,
// please ensure that the server running this storage backend has enough disk
// space available to hold these caches.
//
// In addition, it must be mentioned that AWS S3 only offers eventual
// consistency (https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel).
// Therefore, it is required to build in additional measures in order to
// prevent concurrent access to the same upload resources, which may otherwise
// result in data corruption. See handler.LockerDataStore for more information.
package s3store

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/tus/tusd/internal/uid"
	"github.com/tus/tusd/pkg/handler"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/s3"
)

// This regular expression matches every character which is not defined in the
// ASCII tables which range from 00 to 7F, inclusive.
// It also matches the \r and \n characters which are not allowed in values
// for HTTP headers.
var nonASCIIRegexp = regexp.MustCompile(`([^\x00-\x7F]|[\r\n])`)

// See the handler.DataStore interface for documentation about the different
// methods.
type S3Store struct {
	// Bucket used to store the data in, e.g. "tusdstore.example.com"
	Bucket string
	// ObjectPrefix is prepended to the name of each S3 object that is created
	// to store uploaded files. It can be used to create a pseudo-directory
	// structure in the bucket, e.g. "path/to/my/uploads".
	ObjectPrefix string
	// MetadataObjectPrefix is prepended to the name of each .info and .part S3
	// object that is created. If it is not set, then ObjectPrefix is used.
	MetadataObjectPrefix string
	// Service specifies an interface used to communicate with the S3 backend.
	// Usually, this is an instance of github.com/aws/aws-sdk-go/service/s3.S3
	// (http://docs.aws.amazon.com/sdk-for-go/api/service/s3/S3.html).
	Service S3API
	// MaxPartSize specifies the maximum size of a single part uploaded to S3
	// in bytes. This value must be bigger than MinPartSize! In order to
	// choose the correct number, two things have to be kept in mind:
	//
	// If this value is too big and uploading the part to S3 is interrupted
	// unexpectedly, the entire part is discarded and the end user is required
	// to resume the upload and re-upload the entire big part. In addition, the
	// entire part must be written to disk before submitting to S3.
	//
	// If this value is too low, a lot of requests to S3 may be made, depending
	// on how fast data is coming in. This may result in increased overhead.
	MaxPartSize int64
	// MinPartSize specifies the minimum size of a single part uploaded to S3
	// in bytes. This number needs to match the limits of the underlying S3
	// backend or else uploaded parts will be rejected. AWS S3, for example,
	// uses 5MB for this value.
	MinPartSize int64
	// PreferredPartSize specifies the preferred size of a single part uploaded to
	// S3. S3Store will attempt to slice the incoming data into parts with this
	// size whenever possible. In some cases, smaller parts are necessary, so
	// not every part may reach this value. The PreferredPartSize must be inside the
	// range of MinPartSize to MaxPartSize.
	PreferredPartSize int64
	// MaxMultipartParts is the maximum number of parts an S3 multipart upload is
	// allowed to have according to AWS S3 API specifications.
	// See: http://docs.aws.amazon.com/AmazonS3/latest/dev/qfacts.html
	MaxMultipartParts int64
	// MaxObjectSize is the maximum size an S3 Object can have according to S3
	// API specifications. See link above.
	MaxObjectSize int64
	// MaxBufferedParts is the number of additional parts that can be received from
	// the client and stored on disk while a part is being uploaded to S3. This
	// can help improve throughput by not blocking the client while tusd is
	// communicating with the S3 API, which can have unpredictable latency.
	MaxBufferedParts int64
	// TemporaryDirectory is the path where S3Store will create temporary files
	// on disk during the upload. An empty string ("", the default value) will
	// cause S3Store to use the operating system's default temporary directory.
	TemporaryDirectory string
	// DisableContentHashes instructs the S3Store to not calculate the MD5 and SHA256
	// hashes when uploading data to S3. These hashes are used for file integrity checks
	// and for authentication. However, these hashes also consume a significant amount of
	// CPU, so it might be desirable to disable them.
	// Note that this property is experimental and might be removed in the future!
	DisableContentHashes bool
}

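// To illustrate how the sizing fields interact (the s3Client variable and the
// concrete numbers below are illustrative assumptions, not recommendations):
//
//	store := s3store.New("example-bucket", s3Client)
//	store.PreferredPartSize = 16 * 1024 * 1024 // aim for 16MB parts
//
// With this configuration, a 100MB upload is sliced into six 16MB parts plus
// one final 4MB part. Only when an upload would no longer fit into
// MaxMultipartParts parts of PreferredPartSize does calcOptimalPartSize
// switch to larger parts.
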
type S3API interface {
	PutObjectWithContext(ctx context.Context, input *s3.PutObjectInput, opt ...request.Option) (*s3.PutObjectOutput, error)
	ListPartsWithContext(ctx context.Context, input *s3.ListPartsInput, opt ...request.Option) (*s3.ListPartsOutput, error)
	UploadPartWithContext(ctx context.Context, input *s3.UploadPartInput, opt ...request.Option) (*s3.UploadPartOutput, error)
	GetObjectWithContext(ctx context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error)
	CreateMultipartUploadWithContext(ctx context.Context, input *s3.CreateMultipartUploadInput, opt ...request.Option) (*s3.CreateMultipartUploadOutput, error)
	AbortMultipartUploadWithContext(ctx context.Context, input *s3.AbortMultipartUploadInput, opt ...request.Option) (*s3.AbortMultipartUploadOutput, error)
	DeleteObjectWithContext(ctx context.Context, input *s3.DeleteObjectInput, opt ...request.Option) (*s3.DeleteObjectOutput, error)
	DeleteObjectsWithContext(ctx context.Context, input *s3.DeleteObjectsInput, opt ...request.Option) (*s3.DeleteObjectsOutput, error)
	CompleteMultipartUploadWithContext(ctx context.Context, input *s3.CompleteMultipartUploadInput, opt ...request.Option) (*s3.CompleteMultipartUploadOutput, error)
	UploadPartCopyWithContext(ctx context.Context, input *s3.UploadPartCopyInput, opt ...request.Option) (*s3.UploadPartCopyOutput, error)
}

type s3APIForPresigning interface {
	UploadPartRequest(input *s3.UploadPartInput) (req *request.Request, output *s3.UploadPartOutput)
}

// New constructs a new storage using the supplied bucket and service object.
func New(bucket string, service S3API) S3Store {
	return S3Store{
		Bucket:             bucket,
		Service:            service,
		MaxPartSize:        5 * 1024 * 1024 * 1024,
		MinPartSize:        5 * 1024 * 1024,
		PreferredPartSize:  50 * 1024 * 1024,
		MaxMultipartParts:  10000,
		MaxObjectSize:      5 * 1024 * 1024 * 1024 * 1024,
		MaxBufferedParts:   20,
		TemporaryDirectory: "",
	}
}

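// A typical wiring sketch (the handler configuration is abridged and the
// memorylocker usage is an assumption shown for completeness, since S3 itself
// offers no upload locking):
//
//	composer := handler.NewStoreComposer()
//	store.UseIn(composer)
//	locker := memorylocker.New()
//	locker.UseIn(composer)
//	h, err := handler.NewHandler(handler.Config{
//		BasePath:      "/files/",
//		StoreComposer: composer,
//	})
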
// UseIn sets this store as the core data store in the passed composer and adds
// all possible extensions to it.
func (store S3Store) UseIn(composer *handler.StoreComposer) {
	composer.UseCore(store)
	composer.UseTerminater(store)
	composer.UseConcater(store)
	composer.UseLengthDeferrer(store)
}

type s3Upload struct {
	id    string
	store *S3Store

	// info stores the upload's current FileInfo struct. It may be nil if it hasn't
	// been fetched yet from S3. Never read or write to it directly but instead use
	// the GetInfo and writeInfo functions.
	info *handler.FileInfo
}

func (store S3Store) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) {
	// An upload larger than MaxObjectSize must be rejected.
	if info.Size > store.MaxObjectSize {
		return nil, fmt.Errorf("s3store: upload size of %v bytes exceeds MaxObjectSize of %v bytes", info.Size, store.MaxObjectSize)
	}

	var uploadId string
	if info.ID == "" {
		uploadId = uid.Uid()
	} else {
		// certain tests set info.ID in advance
		uploadId = info.ID
	}

	// Convert meta data into a map of pointers for AWS Go SDK, sigh.
	metadata := make(map[string]*string, len(info.MetaData))
	for key, value := range info.MetaData {
		// Copying the value is required in order to prevent it from being
		// overwritten by the next iteration.
		v := nonASCIIRegexp.ReplaceAllString(value, "?")
		metadata[key] = &v
	}

	// Create the actual multipart upload
	res, err := store.Service.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		Metadata: metadata,
	})
	if err != nil {
		return nil, fmt.Errorf("s3store: unable to create multipart upload:\n%s", err)
	}

	id := uploadId + "+" + *res.UploadId
	info.ID = id

	info.Storage = map[string]string{
		"Type":   "s3store",
		"Bucket": store.Bucket,
		"Key":    *store.keyWithPrefix(uploadId),
	}

	upload := &s3Upload{id, &store, nil}
	err = upload.writeInfo(ctx, info)
	if err != nil {
		return nil, fmt.Errorf("s3store: unable to create info file:\n%s", err)
	}

	return upload, nil
}

func (store S3Store) GetUpload(ctx context.Context, id string) (handler.Upload, error) {
	return &s3Upload{id, &store, nil}, nil
}

func (store S3Store) AsTerminatableUpload(upload handler.Upload) handler.TerminatableUpload {
	return upload.(*s3Upload)
}

func (store S3Store) AsLengthDeclarableUpload(upload handler.Upload) handler.LengthDeclarableUpload {
	return upload.(*s3Upload)
}

func (store S3Store) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload {
	return upload.(*s3Upload)
}

func (upload *s3Upload) writeInfo(ctx context.Context, info handler.FileInfo) error {
	id := upload.id
	store := upload.store

	uploadId, _ := splitIds(id)

	upload.info = &info

	infoJson, err := json.Marshal(info)
	if err != nil {
		return err
	}

	// Create object on S3 containing information about the file
	_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket:        aws.String(store.Bucket),
		Key:           store.metadataKeyWithPrefix(uploadId + ".info"),
		Body:          bytes.NewReader(infoJson),
		ContentLength: aws.Int64(int64(len(infoJson))),
	})

	return err
}

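// The id field of s3Upload is a composite of the S3 object key and S3's
// multipart upload ID, joined by "+", as assembled in NewUpload above. For
// illustration (both halves are made-up values), an upload may be identified
// as
//
//	bv3b2xyz+2~hFgEXAMPLEMultipartId
//
// where splitIds later recovers "bv3b2xyz" as the uploadId and the remainder
// as the multipartId.
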
func (upload *s3Upload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Get the total size of the current upload
	info, err := upload.GetInfo(ctx)
	if err != nil {
		return 0, err
	}

	size := info.Size
	bytesUploaded := int64(0)
	optimalPartSize, err := store.calcOptimalPartSize(size)
	if err != nil {
		return 0, err
	}

	// Get number of parts to generate next number
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		return 0, err
	}

	numParts := len(parts)
	nextPartNum := int64(numParts + 1)

	incompletePartFile, incompletePartSize, err := store.downloadIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return 0, err
	}
	if incompletePartFile != nil {
		defer cleanUpTempFile(incompletePartFile)

		if err := store.deleteIncompletePartForUpload(ctx, uploadId); err != nil {
			return 0, err
		}

		src = io.MultiReader(incompletePartFile, src)
	}

	fileChan := make(chan *os.File, store.MaxBufferedParts)
	doneChan := make(chan struct{})
	defer close(doneChan)

	// If we panic or return while there are still files in the channel, then
	// we may leak file descriptors. Let's ensure that those are cleaned up.
	defer func() {
		for file := range fileChan {
			cleanUpTempFile(file)
		}
	}()

	partProducer := s3PartProducer{
		store: store,
		done:  doneChan,
		files: fileChan,
		r:     src,
	}
	go partProducer.produce(optimalPartSize)

	for file := range fileChan {
		stat, err := file.Stat()
		if err != nil {
			return 0, err
		}
		n := stat.Size()

		isFinalChunk := !info.SizeIsDeferred && (size == (offset-incompletePartSize)+n)
		if n >= store.MinPartSize || isFinalChunk {
			uploadPartInput := &s3.UploadPartInput{
				Bucket:     aws.String(store.Bucket),
				Key:        store.keyWithPrefix(uploadId),
				UploadId:   aws.String(multipartId),
				PartNumber: aws.Int64(nextPartNum),
			}
			if err := upload.putPartForUpload(ctx, uploadPartInput, file, n); err != nil {
				return bytesUploaded, err
			}
		} else {
			if err := store.putIncompletePartForUpload(ctx, uploadId, file); err != nil {
				return bytesUploaded, err
			}

			bytesUploaded += n

			return (bytesUploaded - incompletePartSize), nil
		}

		offset += n
		bytesUploaded += n
		nextPartNum += 1
	}

	return bytesUploaded - incompletePartSize, partProducer.err
}

func cleanUpTempFile(file *os.File) {
	file.Close()
	os.Remove(file.Name())
}

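// To make the interplay of MinPartSize and incomplete parts concrete (the
// sizes are illustrative, assuming the AWS minimum part size of 5MB): a PATCH
// request carrying 3MB cannot form a valid part, so WriteChunk stores it as a
// ".part" object and reports 3MB as written. If the next PATCH request
// delivers 4MB, the stored 3MB are prepended to it, the combined 7MB now
// satisfy MinPartSize and are uploaded as a single part, and WriteChunk
// reports 4MB, i.e. the 7MB part minus the 3MB incomplete part it consumed.
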
func (upload *s3Upload) putPartForUpload(ctx context.Context, uploadPartInput *s3.UploadPartInput, file *os.File, size int64) error {
	defer cleanUpTempFile(file)

	if !upload.store.DisableContentHashes {
		// By default, use the traditional approach to upload data
		uploadPartInput.Body = file
		_, err := upload.store.Service.UploadPartWithContext(ctx, uploadPartInput)
		return err
	} else {
		// Experimental feature to prevent the AWS SDK from calculating the SHA256 hash
		// for the parts we upload to S3.
		// We compute the presigned URL without the body attached and then send the request
		// on our own. This way, the body is not included in the SHA256 calculation.
		s3api, ok := upload.store.Service.(s3APIForPresigning)
		if !ok {
			return fmt.Errorf("s3store: failed to cast S3 service for presigning")
		}

		s3Req, _ := s3api.UploadPartRequest(uploadPartInput)

		url, err := s3Req.Presign(15 * time.Minute)
		if err != nil {
			return err
		}

		req, err := http.NewRequest("PUT", url, file)
		if err != nil {
			return err
		}

		// Set the Content-Length manually to prevent the usage of Transfer-Encoding: chunked,
		// which is not supported by AWS S3.
		req.ContentLength = size

		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		defer res.Body.Close()

		if res.StatusCode != 200 {
			buf := new(strings.Builder)
			io.Copy(buf, res.Body)
			return fmt.Errorf("s3store: unexpected response code %d for presigned upload: %s", res.StatusCode, buf.String())
		}

		return nil
	}
}

func (upload *s3Upload) GetInfo(ctx context.Context) (info handler.FileInfo, err error) {
	if upload.info != nil {
		return *upload.info, nil
	}

	info, err = upload.fetchInfo(ctx)
	if err != nil {
		return info, err
	}

	upload.info = &info
	return info, nil
}

func (upload *s3Upload) fetchInfo(ctx context.Context) (info handler.FileInfo, err error) {
	id := upload.id
	store := upload.store
	uploadId, _ := splitIds(id)

	// Get file info stored in separate object
	res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".info"),
	})
	if err != nil {
		if isAwsError(err, "NoSuchKey") {
			return info, handler.ErrNotFound
		}

		return info, err
	}

	if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
		return info, err
	}

	// Get uploaded parts and their offset
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		// Check if the error is caused by the upload not being found. This happens
		// when the multipart upload has already been completed or aborted. Since
		// we already found the info object, we know that the upload has been
		// completed and can therefore ensure that the offset equals the size.
		// AWS S3 returns NoSuchUpload, but other implementations, such as DigitalOcean
		// Spaces, can also return NoSuchKey.
		if isAwsError(err, "NoSuchUpload") || isAwsError(err, "NoSuchKey") {
			info.Offset = info.Size
			return info, nil
		} else {
			return info, err
		}
	}

	offset := int64(0)

	for _, part := range parts {
		offset += *part.Size
	}

	incompletePartObject, err := store.getIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return info, err
	}
	if incompletePartObject != nil {
		defer incompletePartObject.Body.Close()
		offset += *incompletePartObject.ContentLength
	}

	info.Offset = offset

	return
}

func (upload *s3Upload) GetReader(ctx context.Context) (io.Reader, error) {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Attempt to get upload content
	res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.keyWithPrefix(uploadId),
	})
	if err == nil {
		// No error occurred, and we are able to stream the object
		return res.Body, nil
	}

	// If the file cannot be found, we ignore this error and continue since the
	// upload may not have been finished yet. In this case we do not want to
	// return an ErrNotFound but a more meaningful message.
	if !isAwsError(err, "NoSuchKey") {
		return nil, err
	}

	// Test whether the multipart upload exists to find out if the upload
	// never existed or just has not been finished yet
	_, err = store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		UploadId: aws.String(multipartId),
		MaxParts: aws.Int64(0),
	})
	if err == nil {
		// The multipart upload still exists, which means we cannot download it yet
		return nil, handler.NewHTTPError(errors.New("cannot stream non-finished upload"), http.StatusBadRequest)
	}

	if isAwsError(err, "NoSuchUpload") {
		// Neither the object nor the multipart upload exists, so we return a 404
		return nil, handler.ErrNotFound
	}

	return nil, err
}

func (upload *s3Upload) Terminate(ctx context.Context) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)
	var wg sync.WaitGroup
	wg.Add(2)
	errs := make([]error, 0, 3)
	// errsMutex guards errs, which is appended to from both goroutines below.
	var errsMutex sync.Mutex

	go func() {
		defer wg.Done()

		// Abort the multipart upload
		_, err := store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   aws.String(store.Bucket),
			Key:      store.keyWithPrefix(uploadId),
			UploadId: aws.String(multipartId),
		})
		if err != nil && !isAwsError(err, "NoSuchUpload") {
			errsMutex.Lock()
			errs = append(errs, err)
			errsMutex.Unlock()
		}
	}()

	go func() {
		defer wg.Done()

		// Delete the info and content files
		res, err := store.Service.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
			Bucket: aws.String(store.Bucket),
			Delete: &s3.Delete{
				Objects: []*s3.ObjectIdentifier{
					{
						Key: store.keyWithPrefix(uploadId),
					},
					{
						Key: store.metadataKeyWithPrefix(uploadId + ".part"),
					},
					{
						Key: store.metadataKeyWithPrefix(uploadId + ".info"),
					},
				},
				Quiet: aws.Bool(true),
			},
		})

		if err != nil {
			errsMutex.Lock()
			errs = append(errs, err)
			errsMutex.Unlock()
			return
		}

		errsMutex.Lock()
		defer errsMutex.Unlock()
		for _, s3Err := range res.Errors {
			if *s3Err.Code != "NoSuchKey" {
				errs = append(errs, fmt.Errorf("AWS S3 Error (%s) for object %s: %s", *s3Err.Code, *s3Err.Key, *s3Err.Message))
			}
		}
	}()

	wg.Wait()

	if len(errs) > 0 {
		return newMultiError(errs)
	}

	return nil
}

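// As an illustration of which keys a termination touches (the prefixes and
// the upload ID are made-up values): with ObjectPrefix set to "uploads/" and
// MetadataObjectPrefix set to "meta/", terminating the upload "abc123+..."
// issues one DeleteObjects call for
//
//	uploads/abc123       (the object holding the uploaded data)
//	meta/abc123.info     (the JSON info object)
//	meta/abc123.part     (a buffered incomplete part, if any)
//
// while the multipart upload itself is aborted in a separate, concurrent
// request.
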
func (upload *s3Upload) FinishUpload(ctx context.Context) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Get uploaded parts
	parts, err := store.listAllParts(ctx, id)
	if err != nil {
		return err
	}

	if len(parts) == 0 {
		// AWS expects at least one part to be present when completing the multipart
		// upload. So if the tus upload has a size of 0, we create an empty part
		// and use that for completing the multipart upload.
		res, err := store.Service.UploadPartWithContext(ctx, &s3.UploadPartInput{
			Bucket:     aws.String(store.Bucket),
			Key:        store.keyWithPrefix(uploadId),
			UploadId:   aws.String(multipartId),
			PartNumber: aws.Int64(1),
			Body:       bytes.NewReader([]byte{}),
		})
		if err != nil {
			return err
		}

		parts = []*s3.Part{
			{
				ETag:       res.ETag,
				PartNumber: aws.Int64(1),
			},
		}
	}

	// Transform the []*s3.Part slice to a []*s3.CompletedPart slice for the next
	// request.
	completedParts := make([]*s3.CompletedPart, len(parts))

	for index, part := range parts {
		completedParts[index] = &s3.CompletedPart{
			ETag:       part.ETag,
			PartNumber: part.PartNumber,
		}
	}

	_, err = store.Service.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(store.Bucket),
		Key:      store.keyWithPrefix(uploadId),
		UploadId: aws.String(multipartId),
		MultipartUpload: &s3.CompletedMultipartUpload{
			Parts: completedParts,
		},
	})

	return err
}

func (upload *s3Upload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error {
	hasSmallPart := false
	for _, partialUpload := range partialUploads {
		info, err := partialUpload.GetInfo(ctx)
		if err != nil {
			return err
		}

		if info.Size < upload.store.MinPartSize {
			hasSmallPart = true
		}
	}

	// If one partial upload is smaller than the minimum part size for an S3
	// Multipart Upload, we cannot use S3 Multipart Uploads for concatenating
	// all the files. So instead we have to download them and concatenate them
	// on disk. See the worked example below.
	if hasSmallPart {
		return upload.concatUsingDownload(ctx, partialUploads)
	} else {
		return upload.concatUsingMultipart(ctx, partialUploads)
	}
}

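// A worked example of the decision above (sizes are illustrative, assuming
// the AWS minimum part size of 5MB): concatenating partial uploads of 6MB and
// 7MB can use concatUsingMultipart, since every source satisfies MinPartSize
// and may be copied server-side via UploadPartCopy. Concatenating 6MB and 2MB
// cannot, because the 2MB object would be rejected as a copied part, so both
// objects are downloaded and re-uploaded as a single PutObject instead.
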
func (upload *s3Upload) concatUsingDownload(ctx context.Context, partialUploads []handler.Upload) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	// Create a temporary file for holding the concatenated data
	file, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-concat-tmp-")
	if err != nil {
		return err
	}
	defer cleanUpTempFile(file)

	// Download each part and append it to the temporary file
	for _, partialUpload := range partialUploads {
		partialS3Upload := partialUpload.(*s3Upload)
		partialId, _ := splitIds(partialS3Upload.id)

		res, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
			Bucket: aws.String(store.Bucket),
			Key:    store.keyWithPrefix(partialId),
		})
		if err != nil {
			return err
		}
		defer res.Body.Close()

		if _, err := io.Copy(file, res.Body); err != nil {
			return err
		}
	}

	// Seek to the beginning of the file, so the entire file is uploaded
	if _, err := file.Seek(0, 0); err != nil {
		return err
	}

	// Upload the entire file to S3
	_, err = store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.keyWithPrefix(uploadId),
		Body:   file,
	})
	if err != nil {
		return err
	}

	// Finally, abort the multipart upload since it will no longer be used.
	// This happens asynchronously since we do not need to wait for the result.
	// Also, the error is ignored on purpose as it does not change the outcome of
	// the request.
	go func() {
		store.Service.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   aws.String(store.Bucket),
			Key:      store.keyWithPrefix(uploadId),
			UploadId: aws.String(multipartId),
		})
	}()

	return nil
}

func (upload *s3Upload) concatUsingMultipart(ctx context.Context, partialUploads []handler.Upload) error {
	id := upload.id
	store := upload.store
	uploadId, multipartId := splitIds(id)

	numPartialUploads := len(partialUploads)
	errs := make([]error, 0, numPartialUploads)
	// errsMutex guards errs, which is appended to from the goroutines below.
	var errsMutex sync.Mutex

	// Copy partial uploads concurrently
	var wg sync.WaitGroup
	wg.Add(numPartialUploads)
	for i, partialUpload := range partialUploads {
		partialS3Upload := partialUpload.(*s3Upload)
		partialId, _ := splitIds(partialS3Upload.id)

		go func(i int, partialId string) {
			defer wg.Done()

			_, err := store.Service.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
				Bucket:   aws.String(store.Bucket),
				Key:      store.keyWithPrefix(uploadId),
				UploadId: aws.String(multipartId),
				// Part numbers must be in the range of 1 to 10000, inclusive. Since
				// slice indexes start at 0, we add 1 to ensure that i >= 1.
				PartNumber: aws.Int64(int64(i + 1)),
				// Apply the object prefix to the copy source as well, matching how
				// the partial uploads' keys were constructed.
				CopySource: aws.String(store.Bucket + "/" + *store.keyWithPrefix(partialId)),
			})
			if err != nil {
				errsMutex.Lock()
				errs = append(errs, err)
				errsMutex.Unlock()
				return
			}
		}(i, partialId)
	}

	wg.Wait()

	if len(errs) > 0 {
		return newMultiError(errs)
	}

	return upload.FinishUpload(ctx)
}

func (upload *s3Upload) DeclareLength(ctx context.Context, length int64) error {
	info, err := upload.GetInfo(ctx)
	if err != nil {
		return err
	}
	info.Size = length
	info.SizeIsDeferred = false

	return upload.writeInfo(ctx, info)
}

func (store S3Store) listAllParts(ctx context.Context, id string) (parts []*s3.Part, err error) {
	uploadId, multipartId := splitIds(id)

	partMarker := int64(0)
	for {
		// Get uploaded parts
		listPtr, err := store.Service.ListPartsWithContext(ctx, &s3.ListPartsInput{
			Bucket:           aws.String(store.Bucket),
			Key:              store.keyWithPrefix(uploadId),
			UploadId:         aws.String(multipartId),
			PartNumberMarker: aws.Int64(partMarker),
		})
		if err != nil {
			return nil, err
		}

		parts = append(parts, listPtr.Parts...)

		if listPtr.IsTruncated != nil && *listPtr.IsTruncated {
			partMarker = *listPtr.NextPartNumberMarker
		} else {
			break
		}
	}
	return parts, nil
}

func (store S3Store) downloadIncompletePartForUpload(ctx context.Context, uploadId string) (*os.File, int64, error) {
	incompleteUploadObject, err := store.getIncompletePartForUpload(ctx, uploadId)
	if err != nil {
		return nil, 0, err
	}
	if incompleteUploadObject == nil {
		// We did not find an incomplete upload
		return nil, 0, nil
	}
	defer incompleteUploadObject.Body.Close()

	partFile, err := ioutil.TempFile(store.TemporaryDirectory, "tusd-s3-tmp-")
	if err != nil {
		return nil, 0, err
	}

	n, err := io.Copy(partFile, incompleteUploadObject.Body)
	if err != nil {
		return nil, 0, err
	}
	if n < *incompleteUploadObject.ContentLength {
		return nil, 0, errors.New("short read of incomplete upload")
	}

	_, err = partFile.Seek(0, 0)
	if err != nil {
		return nil, 0, err
	}

	return partFile, n, nil
}

func (store S3Store) getIncompletePartForUpload(ctx context.Context, uploadId string) (*s3.GetObjectOutput, error) {
	obj, err := store.Service.GetObjectWithContext(ctx, &s3.GetObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
	})

	if err != nil && (isAwsError(err, s3.ErrCodeNoSuchKey) || isAwsError(err, "NotFound") || isAwsError(err, "AccessDenied")) {
		return nil, nil
	}

	return obj, err
}

func (store S3Store) putIncompletePartForUpload(ctx context.Context, uploadId string, file *os.File) error {
	defer cleanUpTempFile(file)

	_, err := store.Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
		Body:   file,
	})
	return err
}

func (store S3Store) deleteIncompletePartForUpload(ctx context.Context, uploadId string) error {
	_, err := store.Service.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Bucket: aws.String(store.Bucket),
		Key:    store.metadataKeyWithPrefix(uploadId + ".part"),
	})
	return err
}

func splitIds(id string) (uploadId, multipartId string) {
	index := strings.Index(id, "+")
	if index == -1 {
		return
	}

	uploadId = id[:index]
	multipartId = id[index+1:]
	return
}

// isAwsError tests whether an error object is an instance of the AWS error
// specified by its code.
func isAwsError(err error, code string) bool {
	if err, ok := err.(awserr.Error); ok && err.Code() == code {
		return true
	}
	return false
}

func (store S3Store) calcOptimalPartSize(size int64) (optimalPartSize int64, err error) {
	switch {
	// When the upload is smaller than or equal to PreferredPartSize, we upload in just one part.
	case size <= store.PreferredPartSize:
		optimalPartSize = store.PreferredPartSize
	// Does the upload fit into MaxMultipartParts parts or less with PreferredPartSize?
	case size <= store.PreferredPartSize*store.MaxMultipartParts:
		optimalPartSize = store.PreferredPartSize
	// Prerequisite: Be aware that the result of an integer division (x/y) is
	// ALWAYS rounded DOWN, as there are no digits behind the decimal point.
	// In order to find out whether we have an exact result or a rounded down
	// one, we can check whether the remainder of that division is 0 (x%y == 0).
	//
	// So if the result of (size/MaxMultipartParts) is exact, then we can use it
	// as our optimalPartSize. But if this division produces a remainder, we
	// have to round up the result by adding +1. Otherwise our upload would not
	// fit into MaxMultipartParts parts with that size and we would need an
	// additional part in order to upload everything.
	// In almost all cases, we could skip the check for the remainder and just
	// add +1 to every result, but there is one case where doing that would
	// doom our upload: when (MaxObjectSize == MaxPartSize * MaxMultipartParts),
	// adding +1 would end up with an optimalPartSize > MaxPartSize.
	// With the current S3 API specifications, we will not run into this problem,
	// but these specs are subject to change, and there are other stores as well,
	// which implement the S3 API (e.g. RIAK, Ceph RadosGW) but might have
	// different settings.
	case size%store.MaxMultipartParts == 0:
		optimalPartSize = size / store.MaxMultipartParts
	// Having a remainder larger than 0 means the float result would have
	// digits after the decimal point (e.g. be something like 10.9). As a
	// result, we can only squeeze our upload into MaxMultipartParts parts if
	// we round UP this division's result. That is what is happening here. We
	// round up by adding +1, since the prior case for (remainder == 0) did not
	// match.
	default:
		optimalPartSize = size/store.MaxMultipartParts + 1
	}

	// optimalPartSize must never exceed MaxPartSize
	if optimalPartSize > store.MaxPartSize {
		return optimalPartSize, fmt.Errorf("calcOptimalPartSize: to upload %v bytes, the optimal part size of %v bytes exceeds MaxPartSize %v", size, optimalPartSize, store.MaxPartSize)
	}
	return optimalPartSize, nil
}

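// A worked example of the calculation above, using the defaults set in New
// (PreferredPartSize 50MiB, MaxMultipartParts 10,000, MaxPartSize 5GiB): a
// 1GiB upload simply uses 50MiB parts, since 10,000 such parts comfortably
// cover it. A 600GiB upload (644,245,094,400 bytes) does not fit anymore:
// 644,245,094,400 / 10,000 leaves a remainder, so the division is rounded up
// and calcOptimalPartSize returns 64,424,510 bytes (about 61.4MiB), which
// keeps the upload within 10,000 parts while staying well below MaxPartSize.
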
func (store S3Store) keyWithPrefix(key string) *string {
	prefix := store.ObjectPrefix
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	return aws.String(prefix + key)
}

func (store S3Store) metadataKeyWithPrefix(key string) *string {
	prefix := store.MetadataObjectPrefix
	if prefix == "" {
		prefix = store.ObjectPrefix
	}
	if prefix != "" && !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}

	return aws.String(prefix + key)
}