1// Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved. 2 3// Package transfer simplifies interaction with the Object Storage service by abstracting away the method used 4// to upload objects. Depending on the configuration parameters, UploadManager may choose to do a single 5// put_object request, or break up the upload into multiple parts and utilize multi-part uploads. 6// 7// An advantage of using multi-part uploads is the ability to retry individual failed parts, as well as being 8// able to upload parts in parallel to reduce upload time. 9// 10// To use this package, you must be authorized in an IAM policy. If you're not authorized, talk to an administrator. 11package transfer 12 13import ( 14 "context" 15 "errors" 16 "math" 17 "os" 18 "strings" 19 "time" 20 21 "github.com/oracle/oci-go-sdk/common" 22) 23 24// UploadManager is the interface that groups the upload methods 25type UploadManager struct { 26 FileUploader FileUploader 27 StreamUploader StreamUploader 28} 29 30var ( 31 errorInvalidStreamUploader = errors.New("streamUploader is required, use NewUploadManager for default implementation") 32 errorInvalidFileUploader = errors.New("fileUploader is required, use NewUploadManager for default implementation") 33) 34 35// NewUploadManager return a pointer to UploadManager 36func NewUploadManager() *UploadManager { 37 return &UploadManager{ 38 FileUploader: &fileUpload{multipartUploader: &multipartUpload{}}, 39 StreamUploader: &streamUpload{multipartUploader: &multipartUpload{}}, 40 } 41} 42 43// UploadFile uploads an object to Object Storage. Depending on the options provided and the 44// size of the object, the object may be uploaded in multiple parts or just an signle object. 45func (uploadManager *UploadManager) UploadFile(ctx context.Context, request UploadFileRequest) (response UploadResponse, err error) { 46 if err = request.validate(); err != nil { 47 return 48 } 49 50 if err = request.initDefaultValues(); err != nil { 51 return 52 } 53 54 if uploadManager.FileUploader == nil { 55 err = errorInvalidFileUploader 56 return 57 } 58 59 file, err := os.Open(request.FilePath) 60 defer file.Close() 61 62 if err != nil { 63 return 64 } 65 66 fi, err := file.Stat() 67 if err != nil { 68 return 69 } 70 71 fileSize := fi.Size() 72 73 // parrallel upload disabled by user or the file size smaller than or equal to partSize 74 // use UploadFilePutObject 75 if !*request.AllowMultipartUploads || 76 fileSize <= *request.PartSize { 77 response, err = uploadManager.FileUploader.UploadFilePutObject(ctx, request) 78 return 79 } 80 81 response, err = uploadManager.FileUploader.UploadFileMultiparts(ctx, request) 82 return 83} 84 85// ResumeUploadFile resumes a multipart file upload. 86func (uploadManager *UploadManager) ResumeUploadFile(ctx context.Context, uploadID string) (response UploadResponse, err error) { 87 if len(strings.TrimSpace(uploadID)) == 0 { 88 err = errors.New("uploadID is required to resume a multipart file upload") 89 return 90 } 91 response, err = uploadManager.FileUploader.ResumeUploadFile(ctx, uploadID) 92 return 93} 94 95// UploadStream uploads streaming data to Object Storage. If the stream is non-empty, this will always perform a 96// multipart upload, splitting parts based on the part size (10 MiB if none specified). If the stream is empty, 97// this will upload a single empty object to Object Storage. 98// Stream uploads are not currently resumable. 99func (uploadManager *UploadManager) UploadStream(ctx context.Context, request UploadStreamRequest) (response UploadResponse, err error) { 100 if err = request.validate(); err != nil { 101 return 102 } 103 104 if err = request.initDefaultValues(); err != nil { 105 return 106 } 107 108 if uploadManager.StreamUploader == nil { 109 err = errorInvalidStreamUploader 110 return 111 } 112 113 response, err = uploadManager.StreamUploader.UploadStream(ctx, request) 114 return 115} 116 117func getUploadManagerRetryPolicy() *common.RetryPolicy { 118 attempts := uint(3) 119 retryOnAllNon200ResponseCodes := func(r common.OCIOperationResponse) bool { 120 return !(r.Error == nil && 199 < r.Response.HTTPResponse().StatusCode && r.Response.HTTPResponse().StatusCode < 300) 121 } 122 123 exponentialBackoff := func(r common.OCIOperationResponse) time.Duration { 124 return time.Duration(math.Pow(float64(2), float64(r.AttemptNumber-1))) * time.Second 125 } 126 policy := common.NewRetryPolicy(attempts, retryOnAllNon200ResponseCodes, exponentialBackoff) 127 128 return &policy 129} 130