1# frozen_string_literal: true
2
module ObjectStorage
  #
  # The DirectUpload class generates a set of presigned URLs
  # that can be used to upload data to object storage from untrusted component: Workhorse, Runner?
  #
  # For Google it assumes that the platform supports variable Content-Length.
  #
  # For AWS it initiates Multipart Upload and presigns a set of part uploads.
  #   Class calculates the best part size to be able to upload up to asked maximum size.
  #   The number of generated parts will never go above 100,
  #   but we will always try to reduce amount of generated parts.
  #   The part size is rounded-up to 5MB.
  #
  class DirectUpload
    include Gitlab::Utils::StrongMemoize

    # How long the presigned URLs are meant to remain usable by the client.
    TIMEOUT = 4.hours
    # Safety margin added on top of TIMEOUT when signing URLs, so an upload
    # that starts close to the deadline does not expire mid-transfer.
    EXPIRE_OFFSET = 15.minutes

    # Hard cap on how many presigned part URLs we generate (see class comment).
    MAXIMUM_MULTIPART_PARTS = 100
    # Matches the AWS S3 minimum multipart part size (every part except the
    # last must be at least 5MB); part sizes are rounded up to a multiple of this.
    MINIMUM_MULTIPART_SIZE = 5.megabytes

    attr_reader :config, :credentials, :bucket_name, :object_name
    attr_reader :has_length, :maximum_size

    # @param config [Object] object-storage configuration; must respond to
    #   +credentials+, +bucket+, and the provider predicates used below
    #   (+aws?+, +azure?+, +google?+, ...)
    # @param object_name [String] key of the object inside the bucket
    # @param has_length [Boolean] whether the uploader knows the Content-Length up front
    # @param maximum_size [Integer, nil] upper bound on the upload size in bytes;
    #   mandatory when +has_length+ is false, because multipart part sizing
    #   (see #number_of_multipart_parts) depends on it
    # @raise [ArgumentError] when +has_length+ is false and no +maximum_size+ is given
    def initialize(config, object_name, has_length:, maximum_size: nil)
      unless has_length
        raise ArgumentError, 'maximum_size has to be specified if length is unknown' unless maximum_size
      end

      @config = config
      @credentials = config.credentials
      @bucket_name = config.bucket
      @object_name = object_name
      @has_length = has_length
      @maximum_size = maximum_size
    end

    # Builds the hash handed to the untrusted uploader (Workhorse).
    # Keys are a wire contract — do not rename. +compact+ drops nil entries
    # such as MultipartUpload when multipart is not required.
    def to_hash
      {
        Timeout: TIMEOUT,
        GetURL: get_url,
        StoreURL: store_url,
        DeleteURL: delete_url,
        MultipartUpload: multipart_upload_hash,
        CustomPutHeaders: true,
        PutHeaders: upload_options
      }.merge(workhorse_client_hash).compact
    end

    # Presigned URLs for the S3 multipart flow; nil unless the provider is AWS
    # and the content length is unknown (see #requires_multipart_upload?).
    # NOTE: calling this initiates a multipart upload server-side via #upload_id.
    def multipart_upload_hash
      return unless requires_multipart_upload?

      {
        PartSize: rounded_multipart_part_size,
        PartURLs: multipart_part_urls,
        CompleteURL: multipart_complete_url,
        AbortURL: multipart_abort_url
      }
    end

    # Provider-specific settings that let Workhorse talk to object storage
    # with its own client instead of the presigned URLs; empty hash otherwise.
    def workhorse_client_hash
      if config.aws?
        workhorse_aws_hash
      elsif config.azure?
        workhorse_azure_hash
      else
        {}
      end
    end

    # S3 connection parameters for Workhorse's built-in client.
    # +compact+ strips unset optional settings (endpoint, SSE, ...).
    def workhorse_aws_hash
      {
        UseWorkhorseClient: use_workhorse_s3_client?,
        RemoteTempObjectID: object_name,
        ObjectStorage: {
          Provider: 'AWS',
          S3Config: {
            Bucket: bucket_name,
            Region: credentials[:region] || ::Fog::AWS::Storage::DEFAULT_REGION,
            Endpoint: credentials[:endpoint],
            PathStyle: config.use_path_style?,
            UseIamProfile: config.use_iam_profile?,
            ServerSideEncryption: config.server_side_encryption,
            SSEKMSKeyID: config.server_side_encryption_kms_key_id
          }.compact
        }
      }
    end

    def workhorse_azure_hash
      {
        # Azure requires Workhorse client because direct uploads can't
        # use pre-signed URLs without buffering the whole file to disk.
        UseWorkhorseClient: true,
        RemoteTempObjectID: object_name,
        ObjectStorage: {
          Provider: 'AzureRM',
          GoCloudConfig: {
            URL: azure_gocloud_url
          }
        }
      }
    end

    # Go Cloud Development Kit blob URL for the Azure container, e.g.
    # "azblob://bucket?domain=...". The domain parameter is only appended
    # when a custom storage domain is configured.
    def azure_gocloud_url
      url = "azblob://#{bucket_name}"
      url += "?domain=#{config.azure_storage_domain}" if config.azure_storage_domain.present?
      url
    end

    # Whether Workhorse should upload with its own S3 client rather than the
    # presigned URLs. Only enabled for IAM-profile or consolidated-settings
    # setups, and never with V2 request signing.
    def use_workhorse_s3_client?
      return false unless config.use_iam_profile? || config.consolidated_settings?
      # The Golang AWS SDK does not support V2 signatures
      return false unless credentials.fetch(:aws_signature_version, 4).to_i >= 4

      true
    end

    def provider
      credentials[:provider].to_s
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectGET.html
    # Google gets an explicit HTTPS URL; other providers use Fog's generic signer.
    def get_url
      if config.google?
        connection.get_object_https_url(bucket_name, object_name, expire_at)
      else
        connection.get_object_url(bucket_name, object_name, expire_at)
      end
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectDELETE.html
    def delete_url
      connection.delete_object_url(bucket_name, object_name, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html
    def store_url
      connection.put_object_url(bucket_name, object_name, expire_at, upload_options)
    end

    # One presigned PUT URL per part; S3 part numbers are 1-based, hence the +1.
    def multipart_part_urls
      Array.new(number_of_multipart_parts) do |part_index|
        multipart_part_upload_url(part_index + 1)
      end
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
    def multipart_part_upload_url(part_number)
      connection.signed_url({
        method: 'PUT',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id, 'partNumber' => part_number },
        headers: upload_options
      }, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadComplete.html
    # The completion request body is XML, hence the Content-Type header.
    def multipart_complete_url
      connection.signed_url({
        method: 'POST',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id },
        headers: { 'Content-Type' => 'application/xml' }
      }, expire_at)
    end

    # Implements https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadAbort.html
    def multipart_abort_url
      connection.signed_url({
        method: 'DELETE',
        bucket_name: bucket_name,
        object_name: object_name,
        query: { 'uploadId' => upload_id }
      }, expire_at)
    end

    private

    # Ceiling of multipart_part_size to the next multiple of MINIMUM_MULTIPART_SIZE,
    # satisfying the S3 minimum part size.
    def rounded_multipart_part_size
      # round multipart_part_size up to MINIMUM_MULTIPART_SIZE
      (multipart_part_size + MINIMUM_MULTIPART_SIZE - 1) / MINIMUM_MULTIPART_SIZE * MINIMUM_MULTIPART_SIZE
    end

    # Raw (un-rounded) bytes per part needed to fit maximum_size into
    # number_of_multipart_parts parts.
    # NOTE(review): the == 0 guard suggests maximum_size may be 0 here even
    # though the constructor only enforces its presence — keep the guard.
    def multipart_part_size
      return MINIMUM_MULTIPART_SIZE if maximum_size == 0

      maximum_size / number_of_multipart_parts
    end

    # Fewest parts (capped at MAXIMUM_MULTIPART_PARTS) such that parts of at
    # least MINIMUM_MULTIPART_SIZE cover maximum_size.
    def number_of_multipart_parts
      # If we don't have max length, we can only assume the file is as large as possible.
      return MAXIMUM_MULTIPART_PARTS if maximum_size == 0

      [
        # round maximum_size up to MINIMUM_MULTIPART_SIZE (ceiling division)
        (maximum_size + MINIMUM_MULTIPART_SIZE - 1) / MINIMUM_MULTIPART_SIZE,
        MAXIMUM_MULTIPART_PARTS
      ].min
    end

    # Multipart is only needed on AWS when the Content-Length is unknown;
    # Google handles variable Content-Length natively (see class comment).
    def requires_multipart_upload?
      config.aws? && !has_length
    end

    # Lazily initiates the S3 multipart upload (a remote API call) and memoizes
    # the returned UploadId so all part/complete/abort URLs share one upload.
    def upload_id
      return unless requires_multipart_upload?

      strong_memoize(:upload_id) do
        new_upload = connection.initiate_multipart_upload(bucket_name, object_name, config.fog_attributes)
        new_upload.body["UploadId"]
      end
    end

    # Single memoized expiry timestamp so every signed URL in one response
    # expires at the same moment.
    def expire_at
      strong_memoize(:expire_at) do
        Time.now + TIMEOUT + EXPIRE_OFFSET
      end
    end

    # Extra headers to sign into PUT URLs. Empty here; presumably a hook for
    # subclasses or future use — confirm before removing.
    def upload_options
      {}
    end

    # Memoized Fog storage connection built from the provider credentials.
    def connection
      @connection ||= ::Fog::Storage.new(credentials)
    end
  end
end
235