1# frozen_string_literal: true
2
3require 'fog/aws'
4require 'carrierwave/storage/fog'
5
6#
7# This concern should add object storage support
8# to the GitlabUploader class
9#
module ObjectStorage
  # Raised when adopting/reading a remote object fails (bad path, missing file).
  RemoteStoreError = Class.new(StandardError)
  # Raised when a store value is neither Store::LOCAL nor Store::REMOTE.
  UnknownStoreError = Class.new(StandardError)
  # NOTE(review): not raised in this file — presumably raised by callers when
  # object storage is required but not configured; confirm.
  ObjectStorageUnavailable = Class.new(StandardError)
14
15  class ExclusiveLeaseTaken < StandardError
16    def initialize(lease_key)
17      @lease_key = lease_key
18    end
19
20    def message
21      *lease_key_group, _ = *@lease_key.split(":")
22      "Exclusive lease for #{lease_key_group.join(':')} is already taken."
23    end
24  end
25
  # Directory (under the uploader root locally, or under the bucket remotely)
  # holding direct-upload temporary files.
  TMP_UPLOAD_PATH = 'tmp/uploads'

  # Enum persisted in the model's `<mounted_as>_store` column.
  module Store
    LOCAL = 1  # file lives on local disk
    REMOTE = 2 # file lives on object storage (fog)
  end

  SUPPORTED_STORES = [Store::LOCAL, Store::REMOTE].freeze
34
  module Extension
    # this extension is the glue between the ObjectStorage::Concern and RecordsUploads::Concern
    module RecordsUploads
      extend ActiveSupport::Concern

      prepended do |base|
        # Must be prepended (not included) so the overrides below sit in front
        # of the Concern's methods and can reach them via `super`.
        raise "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern

        base.include(::RecordsUploads::Concern)
      end
45
46      def retrieve_from_store!(identifier)
47        paths = upload_paths(identifier)
48
49        unless current_upload_satisfies?(paths, model)
50          # the upload we already have isn't right, find the correct one
51          self.upload = model&.retrieve_upload(identifier, paths)
52        end
53
54        super
55      end
56
57      def build_upload
58        super.tap do |upload|
59          upload.store = object_store
60        end
61      end
62
63      def upload=(upload)
64        return if upload.nil?
65
66        self.object_store = upload.store
67        super
68      end
69
70      def schedule_background_upload(*args)
71        return unless schedule_background_upload?
72        return unless upload
73
74        ObjectStorage::BackgroundMoveWorker.perform_async(self.class.name,
75                                                upload.class.to_s,
76                                                mounted_as,
77                                                upload.id)
78      end
79
80      def exclusive_lease_key
81        # For FileUploaders, model may have many uploaders. In that case
82        # we want to use exclusive key per upload, not per model to allow
83        # parallel migration
84        key_object = upload || model
85
86        "object_storage_migrate:#{key_object.class}:#{key_object.id}"
87      end
88
      private

      # True when the memoized upload already matches this (paths, model)
      # pair, so no database lookup is needed.
      def current_upload_satisfies?(paths, model)
        return false unless upload
        return false unless model

        paths.include?(upload.path) &&
          upload.model_id == model.id &&
          upload.model_type == model.class.base_class.sti_name
      end
99    end
100  end
101
  # Add support for automatic background uploading after the file is stored.
  #
  module BackgroundMove
    extend ActiveSupport::Concern

    # Schedules a background upload for each given mount point once the
    # surrounding transaction commits. No-op for an empty list.
    def background_upload(mount_points = [])
      return unless mount_points.any?

      run_after_commit do
        mount_points.each { |mount| send(mount).schedule_background_upload } # rubocop:disable GitlabSecurity/PublicSend
      end
    end

    # Returns the mount points whose uploader column changed in the last save
    # and whose file currently exists.
    def changed_mounts
      self.class.uploaders.select do |mount, uploader_class|
        mounted_as = uploader_class.serialization_column(self.class, mount)
        uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend

        # `next` yields nil, which drops the pair from the selection.
        next unless uploader
        next unless uploader.exists?
        next unless send(:"saved_change_to_#{mounted_as}?") # rubocop:disable GitlabSecurity/PublicSend

        mount
      end.keys
    end

    included do
      include AfterCommitQueue
      after_save do
        background_upload(changed_mounts)
      end
    end
  end
135
  module Concern
    extend ActiveSupport::Concern

    included do |base|
      base.include(ObjectStorage)

      # Delete the source file once a migration to another store succeeds.
      after :migrate, :delete_migrated_file
    end
144
145    class_methods do
146      def object_store_options
147        options.object_store
148      end
149
150      def object_store_enabled?
151        object_store_options.enabled
152      end
153
154      def direct_upload_enabled?
155        object_store_options&.direct_upload
156      end
157
158      def background_upload_enabled?
159        object_store_options.background_upload
160      end
161
162      def proxy_download_enabled?
163        object_store_options.proxy_download
164      end
165
166      def direct_download_enabled?
167        !proxy_download_enabled?
168      end
169
170      def object_store_credentials
171        object_store_options.connection.to_hash.deep_symbolize_keys
172      end
173
174      def remote_store_path
175        object_store_options.remote_directory
176      end
177
178      def serialization_column(model_class, mount_point)
179        model_class.uploader_options.dig(mount_point, :mount_on) || mount_point
180      end
181
182      def workhorse_authorize(has_length:, maximum_size: nil)
183        {}.tap do |hash|
184          if self.object_store_enabled? && self.direct_upload_enabled?
185            hash[:RemoteObject] = workhorse_remote_upload_options(has_length: has_length, maximum_size: maximum_size)
186          else
187            hash[:TempPath] = workhorse_local_upload_path
188          end
189
190          hash[:MaximumSize] = maximum_size if maximum_size.present?
191        end
192      end
193
194      def workhorse_local_upload_path
195        File.join(self.root, TMP_UPLOAD_PATH)
196      end
197
198      def object_store_config
199        ObjectStorage::Config.new(object_store_options)
200      end
201
202      def workhorse_remote_upload_options(has_length:, maximum_size: nil)
203        return unless self.object_store_enabled?
204        return unless self.direct_upload_enabled?
205
206        id = [CarrierWave.generate_cache_id, SecureRandom.hex].join('-')
207        upload_path = File.join(TMP_UPLOAD_PATH, id)
208        direct_upload = ObjectStorage::DirectUpload.new(self.object_store_config, upload_path,
209          has_length: has_length, maximum_size: maximum_size)
210
211        direct_upload.to_hash.merge(ID: id)
212      end
213    end
214
    # Read-only IO-like wrapper yielded by #use_open_file.
    class OpenFile
      extend Forwardable

      # Explicitly exclude :path, because rubyzip uses that to detect "real" files.
      def_delegators :@file, *(Zip::File::IO_METHODS - [:path])

      # Even though :size is not in IO_METHODS, we do need it.
      def_delegators :@file, :size

      def initialize(file)
        @file = file
      end
    end
228
    # allow to configure and overwrite the filename
    def filename
      @filename || super || file&.filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    def filename=(filename)
      @filename = filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    # True when the current storage backend is local disk.
    def file_storage?
      storage.is_a?(CarrierWave::Storage::File)
    end

    def file_cache_storage?
      cache_storage.is_a?(CarrierWave::Storage::File)
    end

    def object_store
      # We use Store::LOCAL as null value indicates the local storage
      @object_store ||= model.try(store_serialization_column) || Store::LOCAL
    end

    # rubocop:disable Gitlab/ModuleWithInstanceVariables
    def object_store=(value)
      # nil is coerced to LOCAL; switching the store also swaps @storage.
      @object_store = value || Store::LOCAL
      @storage = storage_for(object_store)
    end
    # rubocop:enable Gitlab/ModuleWithInstanceVariables

    # Return true if the current file is part of the model (i.e. is mounted in the model)
    #
    def persist_object_store?
      model.respond_to?(:"#{store_serialization_column}=")
    end

    # Save the current @object_store to the model <mounted_as>_store column
    def persist_object_store!
      return unless persist_object_store?

      updated = model.update_column(store_serialization_column, object_store)
      raise 'Failed to update object store' unless updated
    end
271
272    def use_file(&blk)
273      with_exclusive_lease do
274        unsafe_use_file(&blk)
275      end
276    end
277
    # Yields an OpenFile wrapping an unlinked Tempfile holding a copy of the
    # stored file (streamed over HTTP when the file is on remote storage).
    def use_open_file(&blk)
      Tempfile.open(path) do |file|
        # Unlink immediately so the OS reclaims the file once it is closed.
        file.unlink
        file.binmode

        if file_storage?
          IO.copy_stream(path, file)
        else
          # Stream the remote object chunk-by-chunk into the tempfile.
          Faraday.get(url) do |req|
            req.options.on_data = proc { |chunk, _| file.write(chunk) }
          end
        end

        # Rewind so the consumer reads from the beginning.
        file.seek(0, IO::SEEK_SET)

        yield OpenFile.new(file)
      end
    end
296
297    #
298    # Move the file to another store
299    #
300    #   new_store: Enum (Store::LOCAL, Store::REMOTE)
301    #
302    def migrate!(new_store)
303      with_exclusive_lease do
304        unsafe_migrate!(new_store)
305      end
306    end
307
308    def schedule_background_upload(*args)
309      return unless schedule_background_upload?
310
311      ObjectStorage::BackgroundMoveWorker.perform_async(self.class.name,
312                                                          model.class.name,
313                                                          mounted_as,
314                                                          model.id)
315    end
316
    # Remote directory (bucket) used by CarrierWave's fog storage.
    def fog_directory
      self.class.remote_store_path
    end

    # Connection credentials used by CarrierWave's fog storage.
    def fog_credentials
      self.class.object_store_credentials
    end

    # Extra attributes applied to uploaded objects (from object store config).
    def fog_attributes
      @fog_attributes ||= self.class.object_store_config.fog_attributes
    end

    # Set ACL of uploaded objects to not-public (fog-aws)[1] or no ACL at all
    # (fog-google).  Value is ignored by other supported backends (fog-aliyun,
    # fog-openstack, fog-rackspace)
    # [1]: https://github.com/fog/fog-aws/blob/daa50bb3717a462baf4d04d0e0cbfc18baacb541/lib/fog/aws/models/storage/file.rb#L152-L159
    def fog_public
      nil
    end

    # after-:migrate callback: deletes the old copy once migration succeeded.
    def delete_migrated_file(migrated_file)
      migrated_file.delete
    end

    def exists?
      file.present?
    end

    # Storage directory for the given store (defaults to the current store).
    def store_dir(store = nil)
      store_dirs[store || object_store]
    end

    # Store => directory mapping. The remote path omits base_dir —
    # presumably the bucket already provides that scoping; confirm.
    def store_dirs
      {
        Store::LOCAL => File.join(base_dir, dynamic_segment),
        Store::REMOTE => File.join(dynamic_segment)
      }
    end
355
356    # Returns all the possible paths for an upload.
357    # the `upload.path` is a lookup parameter, and it may change
358    # depending on the `store` param.
359    def upload_paths(identifier)
360      store_dirs.map { |store, path| File.join(path, identifier) }
361    end
362
    # Cache the file, short-circuiting to remote adoption for files already
    # uploaded straight to object storage via direct upload.
    def cache!(new_file = sanitized_file)
      # We intercept ::UploadedFile which might be stored on remote storage
      # We use that for "accelerated" uploads, where we store result on remote storage
      if new_file.is_a?(::UploadedFile) && new_file.remote_id.present?
        return cache_remote_file!(new_file.remote_id, new_file.original_filename)
      end

      super
    end

    def store!(new_file = nil)
      # when direct upload is enabled, always store on remote storage
      if self.class.object_store_enabled? && self.class.direct_upload_enabled?
        self.object_store = Store::REMOTE
      end

      super
    end
381
382    def exclusive_lease_key
383      "object_storage_migrate:#{model.class}:#{model.id}"
384    end
385
    private

    # Background moves only make sense when object storage and background
    # upload are enabled and the file still lives on local storage.
    def schedule_background_upload?
      self.class.object_store_enabled? &&
        self.class.background_upload_enabled? &&
        self.file_storage?
    end

    # Adopts an object already uploaded to remote tmp storage (direct upload)
    # as this uploader's cached file.
    def cache_remote_file!(remote_object_id, original_filename)
      file_path = File.join(TMP_UPLOAD_PATH, remote_object_id)
      file_path = Pathname.new(file_path).cleanpath.to_s
      # Reject remote_object_ids that would escape the tmp upload area.
      raise RemoteStoreError, 'Bad file path' unless file_path.start_with?(TMP_UPLOAD_PATH + '/')

      # TODO:
      # This should be changed to make use of `tmp/cache` mechanism
      # instead of using custom upload directory,
      # using tmp/cache makes this implementation way easier than it is today
      CarrierWave::Storage::Fog::File.new(self, storage_for(Store::REMOTE), file_path).tap do |file|
        raise RemoteStoreError, 'Missing file' unless file.exists?

        # Remote stored file, we force to store on remote storage
        self.object_store = Store::REMOTE

        # TODO:
        # We store file internally and force it to be considered as `cached`
        # This makes CarrierWave to store file in permanent location (copy/delete)
        # once this object is saved, but not sooner
        @cache_id = "force-to-use-cache" # rubocop:disable Gitlab/ModuleWithInstanceVariables
        @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
        @filename = original_filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
      end
    end

    # this is a hack around CarrierWave. The #migrate method needs to be
    # able to force the current file to the migrated file upon success.
    def file=(file)
      @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    # Column the mount is serialized to on the model.
    def serialization_column
      self.class.serialization_column(model.class, mounted_as)
    end

    # Returns the column where the 'store' is saved
    #   defaults to 'store'
    def store_serialization_column
      [serialization_column, 'store'].compact.join('_').to_sym
    end
434
435    def storage
436      @storage ||= storage_for(object_store)
437    end
438
439    def storage_for(store)
440      case store
441      when Store::REMOTE
442        raise "Object Storage is not enabled for #{self.class}" unless self.class.object_store_enabled?
443
444        CarrierWave::Storage::Fog.new(self)
445      when Store::LOCAL
446        CarrierWave::Storage::File.new(self)
447      else
448        raise UnknownStoreError
449      end
450    end
451
    # Runs the block while holding the exclusive migration lease for this
    # uploader; raises ExclusiveLeaseTaken when another process holds it.
    def with_exclusive_lease
      lease_key = exclusive_lease_key
      uuid = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.hour.to_i).try_obtain
      raise ExclusiveLeaseTaken, lease_key unless uuid

      yield uuid
    ensure
      # NOTE(review): also runs when no lease was obtained (uuid is nil) —
      # presumably cancel tolerates a nil uuid; confirm.
      Gitlab::ExclusiveLease.cancel(lease_key, uuid)
    end
461
462    #
463    # Move the file to another store
464    #
465    #   new_store: Enum (Store::LOCAL, Store::REMOTE)
466    #
467    def unsafe_migrate!(new_store)
468      return unless object_store != new_store
469      return unless file
470
471      new_file = nil
472      file_to_delete = file
473      from_object_store = object_store
474      self.object_store = new_store # changes the storage and file
475
476      cache_stored_file! if file_storage?
477
478      with_callbacks(:migrate, file_to_delete) do
479        with_callbacks(:store, file_to_delete) do # for #store_versions!
480          new_file = storage.store!(file)
481          persist_object_store!
482          self.file = new_file
483        end
484      end
485
486      file
487    rescue StandardError => e
488      # in case of failure delete new file
489      new_file.delete unless new_file.nil?
490      # revert back to the old file
491      self.object_store = from_object_store
492      self.file = file_to_delete
493      raise e
494    end
495  end
496
  # Yields a usable local path for the stored file: the real path when on
  # local storage, otherwise a temporary cached copy that is removed again
  # afterwards. Not lease-protected — see Concern#use_file for the guarded
  # variant. (Available on uploaders because Concern includes ObjectStorage.)
  def unsafe_use_file
    if file_storage?
      return yield path
    end

    begin
      cache_stored_file!
      yield cache_path
    ensure
      # Clean up both the cached copy and its (now empty) cache directory.
      FileUtils.rm_f(cache_path)
      cache_storage.delete_dir!(cache_path(nil))
    end
  end
510end
511
# NOTE(review): mixes an extension module into the Concern when one is
# defined (no-op otherwise) — presumably for EE/JH overrides; confirm
# against include_mod_with's definition.
ObjectStorage::Concern.include_mod_with('ObjectStorage::Concern')
513