# frozen_string_literal: true

module ObjectStorage
  #
  # The DirectUpload class generates a set of presigned URLs
  # that can be used to upload data to object storage from untrusted
  # components such as Workhorse (and potentially Runner).
  #
  # For Google, it assumes that the platform supports variable Content-Length.
  #
  # For AWS, it initiates a Multipart Upload and presigns a set of part
  # upload URLs. The class calculates the best part size so that files up
  # to the requested maximum size can be uploaded. The number of generated
  # parts never exceeds 100, but we always try to keep the part count as
  # low as possible. The part size is rounded up to a multiple of 5 MB.
  # (A worked example of the sizing arithmetic appears at the bottom of
  # this file.)
  #
  class DirectUpload
    include Gitlab::Utils::StrongMemoize

    TIMEOUT = 4.hours
    EXPIRE_OFFSET = 15.minutes

    MAXIMUM_MULTIPART_PARTS = 100
    MINIMUM_MULTIPART_SIZE = 5.megabytes

    attr_reader :config, :credentials, :bucket_name, :object_name
    attr_reader :has_length, :maximum_size

    def initialize(config, object_name, has_length:, maximum_size: nil)
      if !has_length && !maximum_size
        raise ArgumentError, 'maximum_size has to be specified if length is unknown'
      end

      @config = config
      @credentials = config.credentials
      @bucket_name = config.bucket
      @object_name = object_name
      @has_length = has_length
      @maximum_size = maximum_size
    end

    def to_hash
      {
        Timeout: TIMEOUT,
        GetURL: get_url,
        StoreURL: store_url,
        DeleteURL: delete_url,
        MultipartUpload: multipart_upload_hash,
        CustomPutHeaders: true,
        PutHeaders: upload_options
      }.merge(workhorse_client_hash).compact
    end

    def multipart_upload_hash
      return unless requires_multipart_upload?

      {
        PartSize: rounded_multipart_part_size,
        PartURLs: multipart_part_urls,
        CompleteURL: multipart_complete_url,
        AbortURL: multipart_abort_url
      }
    end

    def workhorse_client_hash
      if config.aws?
        workhorse_aws_hash
      elsif config.azure?
        workhorse_azure_hash
      else
        {}
      end
    end

    def workhorse_aws_hash
      {
        UseWorkhorseClient: use_workhorse_s3_client?,
        RemoteTempObjectID: object_name,
        ObjectStorage: {
          Provider: 'AWS',
          S3Config: {
            Bucket: bucket_name,
            Region: credentials[:region] || ::Fog::AWS::Storage::DEFAULT_REGION,
            Endpoint: credentials[:endpoint],
            PathStyle: config.use_path_style?,
            UseIamProfile: config.use_iam_profile?,
            ServerSideEncryption: config.server_side_encryption,
            SSEKMSKeyID: config.server_side_encryption_kms_key_id
          }.compact
        }
      }
    end

    def workhorse_azure_hash
      {
        # Azure requires the Workhorse client because direct uploads can't
        # use pre-signed URLs without buffering the whole file to disk.
        UseWorkhorseClient: true,
        RemoteTempObjectID: object_name,
        ObjectStorage: {
          Provider: 'AzureRM',
          GoCloudConfig: {
            URL: azure_gocloud_url
          }
        }
      }
    end

    def azure_gocloud_url
      url = "azblob://#{bucket_name}"
      url += "?domain=#{config.azure_storage_domain}" if config.azure_storage_domain.present?
      url
    end

    def use_workhorse_s3_client?
      return false unless config.use_iam_profile? || config.consolidated_settings?
      # The Golang AWS SDK does not support V2 signatures
      return false unless credentials.fetch(:aws_signature_version, 4).to_i >= 4

      true
    end

    def provider
      credentials[:provider].to_s
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
    def get_url
      if config.google?
        connection.get_object_https_url(bucket_name, object_name, expire_at)
      else
        connection.get_object_url(bucket_name, object_name, expire_at)
      end
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
    def delete_url
      connection.delete_object_url(bucket_name, object_name, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
    def store_url
      connection.put_object_url(bucket_name, object_name, expire_at, upload_options)
    end

    def multipart_part_urls
      Array.new(number_of_multipart_parts) do |part_index|
        multipart_part_upload_url(part_index + 1)
      end
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
    def multipart_part_upload_url(part_number)
      connection.signed_url({
        method: 'PUT',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id, 'partNumber' => part_number },
        headers: upload_options
      }, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html
    def multipart_complete_url
      connection.signed_url({
        method: 'POST',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id },
        headers: { 'Content-Type' => 'application/xml' }
      }, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadAbort.html
    def multipart_abort_url
      connection.signed_url({
        method: 'DELETE',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id }
      }, expire_at)
    end

    private

    def rounded_multipart_part_size
      # round multipart_part_size up to the nearest multiple of MINIMUM_MULTIPART_SIZE
      (multipart_part_size + MINIMUM_MULTIPART_SIZE - 1) / MINIMUM_MULTIPART_SIZE * MINIMUM_MULTIPART_SIZE
    end

    def multipart_part_size
      return MINIMUM_MULTIPART_SIZE if maximum_size == 0

      maximum_size / number_of_multipart_parts
    end

    def number_of_multipart_parts
      # If we don't have a maximum length, we can only assume the file is as large as possible.
      return MAXIMUM_MULTIPART_PARTS if maximum_size == 0

      [
        # round maximum_size up to a whole number of MINIMUM_MULTIPART_SIZE chunks
        (maximum_size + MINIMUM_MULTIPART_SIZE - 1) / MINIMUM_MULTIPART_SIZE,
        MAXIMUM_MULTIPART_PARTS
      ].min
    end

    def requires_multipart_upload?
      config.aws? && !has_length
    end

    def upload_id
      return unless requires_multipart_upload?

      strong_memoize(:upload_id) do
        new_upload = connection.initiate_multipart_upload(bucket_name, object_name, config.fog_attributes)
        new_upload.body["UploadId"]
      end
    end

    def expire_at
      strong_memoize(:expire_at) do
        Time.now + TIMEOUT + EXPIRE_OFFSET
      end
    end

    def upload_options
      {}
    end

    def connection
      @connection ||= ::Fog::Storage.new(credentials)
    end
  end
end
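
# Usage sketch (illustrative, not part of the original file). It assumes a
# `config` object that responds to the interface used above (#credentials,
# #bucket, #aws?, #azure?, #google?, and so on); in GitLab this comes from
# the object storage settings, but any conforming object would work:
#
#   upload = ObjectStorage::DirectUpload.new(config, 'tmp/uploads/1234',
#                                            has_length: false,
#                                            maximum_size: 5.gigabytes)
#   upload.to_hash # => the presigned URL set that Workhorse consumes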
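
# A minimal, self-contained sketch of the part-sizing arithmetic used by
# #number_of_multipart_parts and #rounded_multipart_part_size above. The
# 1.2 GB maximum size is an illustrative assumption, not a value from any
# real configuration. Guarded so it only runs when this file is executed
# directly, never when it is loaded as a library.
if $PROGRAM_NAME == __FILE__
  max_parts     = 100                # MAXIMUM_MULTIPART_PARTS
  min_part_size = 5 * 1024 * 1024    # MINIMUM_MULTIPART_SIZE (5 MB)
  maximum_size  = 1200 * 1024 * 1024 # hypothetical 1.2 GB upload limit

  # Round the size up to whole 5 MB chunks, then clamp to 100 parts.
  parts = [(maximum_size + min_part_size - 1) / min_part_size, max_parts].min
  # Split the size across the parts, then round each part up to a 5 MB multiple.
  part_size = maximum_size / parts
  rounded   = (part_size + min_part_size - 1) / min_part_size * min_part_size

  puts "parts: #{parts}, part size: #{rounded / 1024 / 1024} MB"
  # => parts: 100, part size: 15 MB (100 parts * 15 MB = 1.5 GB >= 1.2 GB)
end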