# frozen_string_literal: true

require 'fog/aws'
require 'carrierwave/storage/fog'

#
# This concern adds object storage support
# to the GitlabUploader class
#
module ObjectStorage
  RemoteStoreError = Class.new(StandardError)
  UnknownStoreError = Class.new(StandardError)
  ObjectStorageUnavailable = Class.new(StandardError)

  class ExclusiveLeaseTaken < StandardError
    def initialize(lease_key)
      @lease_key = lease_key
    end

    def message
      *lease_key_group, _ = *@lease_key.split(":")
      "Exclusive lease for #{lease_key_group.join(':')} is already taken."
    end
  end

  TMP_UPLOAD_PATH = 'tmp/uploads'

  module Store
    LOCAL = 1
    REMOTE = 2
  end

  SUPPORTED_STORES = [Store::LOCAL, Store::REMOTE].freeze

  module Extension
    # This extension is the glue between ObjectStorage::Concern and RecordsUploads::Concern
    module RecordsUploads
      extend ActiveSupport::Concern

      prepended do |base|
        raise "#{base} must include ObjectStorage::Concern to use extensions." unless base < Concern

        base.include(::RecordsUploads::Concern)
      end

      def retrieve_from_store!(identifier)
        paths = upload_paths(identifier)

        unless current_upload_satisfies?(paths, model)
          # the upload we already have isn't right, find the correct one
          self.upload = model&.retrieve_upload(identifier, paths)
        end

        super
      end

      def build_upload
        super.tap do |upload|
          upload.store = object_store
        end
      end

      def upload=(upload)
        return if upload.nil?

        self.object_store = upload.store
        super
      end

      def schedule_background_upload(*args)
        return unless schedule_background_upload?
        return unless upload

        ObjectStorage::BackgroundMoveWorker.perform_async(self.class.name,
                                                          upload.class.to_s,
                                                          mounted_as,
                                                          upload.id)
      end

      def exclusive_lease_key
        # For FileUploaders, a model may have many uploaders. In that case
        # we want an exclusive key per upload, not per model, to allow
        # parallel migration.
        key_object = upload || model

        "object_storage_migrate:#{key_object.class}:#{key_object.id}"
      end

      private

      def current_upload_satisfies?(paths, model)
        return false unless upload
        return false unless model

        paths.include?(upload.path) &&
          upload.model_id == model.id &&
          upload.model_type == model.class.base_class.sti_name
      end
    end
  end
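
  # A minimal usage sketch (hypothetical uploader name; not part of this
  # module). An uploader opts into object storage and upload bookkeeping by
  # including the concern and then prepending the extension, matching the
  # `base < Concern` check in the prepended hook above:
  #
  #   class MyUploader < GitlabUploader
  #     include ObjectStorage::Concern
  #     prepend ObjectStorage::Extension::RecordsUploads
  #   end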

  # Add support for automatic background uploading after the file is stored.
  #
  module BackgroundMove
    extend ActiveSupport::Concern

    def background_upload(mount_points = [])
      return unless mount_points.any?

      run_after_commit do
        mount_points.each { |mount| send(mount).schedule_background_upload } # rubocop:disable GitlabSecurity/PublicSend
      end
    end

    def changed_mounts
      self.class.uploaders.select do |mount, uploader_class|
        mounted_as = uploader_class.serialization_column(self.class, mount)
        uploader = send(:"#{mounted_as}") # rubocop:disable GitlabSecurity/PublicSend

        next unless uploader
        next unless uploader.exists?
        next unless send(:"saved_change_to_#{mounted_as}?") # rubocop:disable GitlabSecurity/PublicSend

        mount
      end.keys
    end

    included do
      include AfterCommitQueue
      after_save do
        background_upload(changed_mounts)
      end
    end
  end
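
  # Sketch of how a model might opt into BackgroundMove (hypothetical model
  # and uploader names; assumes a standard CarrierWave `mount_uploader` mount
  # on an ActiveRecord model):
  #
  #   class MyModel < ApplicationRecord
  #     include ObjectStorage::BackgroundMove
  #     mount_uploader :attachment, MyUploader
  #   end
  #
  # After a save that changed the mount is committed, `schedule_background_upload`
  # is called on the mounted uploader.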

  module Concern
    extend ActiveSupport::Concern

    included do |base|
      base.include(ObjectStorage)

      after :migrate, :delete_migrated_file
    end

    class_methods do
      def object_store_options
        options.object_store
      end

      def object_store_enabled?
        object_store_options.enabled
      end

      def direct_upload_enabled?
        object_store_options&.direct_upload
      end

      def background_upload_enabled?
        object_store_options.background_upload
      end

      def proxy_download_enabled?
        object_store_options.proxy_download
      end

      def direct_download_enabled?
        !proxy_download_enabled?
      end

      def object_store_credentials
        object_store_options.connection.to_hash.deep_symbolize_keys
      end

      def remote_store_path
        object_store_options.remote_directory
      end

      def serialization_column(model_class, mount_point)
        model_class.uploader_options.dig(mount_point, :mount_on) || mount_point
      end

      def workhorse_authorize(has_length:, maximum_size: nil)
        {}.tap do |hash|
          if self.object_store_enabled? && self.direct_upload_enabled?
            hash[:RemoteObject] = workhorse_remote_upload_options(has_length: has_length, maximum_size: maximum_size)
          else
            hash[:TempPath] = workhorse_local_upload_path
          end

          hash[:MaximumSize] = maximum_size if maximum_size.present?
        end
      end

      def workhorse_local_upload_path
        File.join(self.root, TMP_UPLOAD_PATH)
      end

      def object_store_config
        ObjectStorage::Config.new(object_store_options)
      end

      def workhorse_remote_upload_options(has_length:, maximum_size: nil)
        return unless self.object_store_enabled?
        return unless self.direct_upload_enabled?

        id = [CarrierWave.generate_cache_id, SecureRandom.hex].join('-')
        upload_path = File.join(TMP_UPLOAD_PATH, id)
        direct_upload = ObjectStorage::DirectUpload.new(self.object_store_config, upload_path,
                                                        has_length: has_length, maximum_size: maximum_size)

        direct_upload.to_hash.merge(ID: id)
      end
    end

    class OpenFile
      extend Forwardable

      # Explicitly exclude :path, because rubyzip uses that to detect "real" files.
      def_delegators :@file, *(Zip::File::IO_METHODS - [:path])

      # Even though :size is not in IO_METHODS, we do need it.
      def_delegators :@file, :size

      def initialize(file)
        @file = file
      end
    end

    # Allows configuring and overwriting the filename
    def filename
      @filename || super || file&.filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    def filename=(filename)
      @filename = filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    def file_storage?
      storage.is_a?(CarrierWave::Storage::File)
    end

    def file_cache_storage?
      cache_storage.is_a?(CarrierWave::Storage::File)
    end

    def object_store
      # We use Store::LOCAL because a nil value indicates local storage
      @object_store ||= model.try(store_serialization_column) || Store::LOCAL
    end

    # rubocop:disable Gitlab/ModuleWithInstanceVariables
    def object_store=(value)
      @object_store = value || Store::LOCAL
      @storage = storage_for(object_store)
    end
    # rubocop:enable Gitlab/ModuleWithInstanceVariables

    # Return true if the current file is part of the model (i.e. is mounted in the model)
    #
    def persist_object_store?
      model.respond_to?(:"#{store_serialization_column}=")
    end

    # Save the current @object_store to the model's <mounted_as>_store column
    def persist_object_store!
      return unless persist_object_store?

      updated = model.update_column(store_serialization_column, object_store)
      raise 'Failed to update object store' unless updated
    end

    def use_file(&blk)
      with_exclusive_lease do
        unsafe_use_file(&blk)
      end
    end

    def use_open_file(&blk)
      Tempfile.open(path) do |file|
        file.unlink
        file.binmode

        if file_storage?
          IO.copy_stream(path, file)
        else
          Faraday.get(url) do |req|
            req.options.on_data = proc { |chunk, _| file.write(chunk) }
          end
        end

        file.seek(0, IO::SEEK_SET)

        yield OpenFile.new(file)
      end
    end

    #
    # Move the file to another store
    #
    #   new_store: Enum (Store::LOCAL, Store::REMOTE)
    #
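    # For example (sketch; `uploader` is a hypothetical instance of a class
    # that includes this concern):
    #
    #   uploader.migrate!(ObjectStorage::Store::REMOTE)
    #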
    def migrate!(new_store)
      with_exclusive_lease do
        unsafe_migrate!(new_store)
      end
    end

    def schedule_background_upload(*args)
      return unless schedule_background_upload?

      ObjectStorage::BackgroundMoveWorker.perform_async(self.class.name,
                                                        model.class.name,
                                                        mounted_as,
                                                        model.id)
    end

    def fog_directory
      self.class.remote_store_path
    end

    def fog_credentials
      self.class.object_store_credentials
    end

    def fog_attributes
      @fog_attributes ||= self.class.object_store_config.fog_attributes
    end

    # Set the ACL of uploaded objects to not-public (fog-aws)[1] or no ACL at all
    # (fog-google). The value is ignored by the other supported backends (fog-aliyun,
    # fog-openstack, fog-rackspace).
    # [1]: https://github.com/fog/fog-aws/blob/daa50bb3717a462baf4d04d0e0cbfc18baacb541/lib/fog/aws/models/storage/file.rb#L152-L159
    def fog_public
      nil
    end

    def delete_migrated_file(migrated_file)
      migrated_file.delete
    end

    def exists?
      file.present?
    end

    def store_dir(store = nil)
      store_dirs[store || object_store]
    end

    def store_dirs
      {
        Store::LOCAL => File.join(base_dir, dynamic_segment),
        Store::REMOTE => File.join(dynamic_segment)
      }
    end

    # Returns all the possible paths for an upload.
    # The `upload.path` is a lookup parameter, and it may change
    # depending on the `store` param.
    def upload_paths(identifier)
      store_dirs.map { |store, path| File.join(path, identifier) }
    end

    def cache!(new_file = sanitized_file)
      # We intercept ::UploadedFile, which might be stored on remote storage.
      # We use that for "accelerated" uploads, where the result is stored on remote storage.
      if new_file.is_a?(::UploadedFile) && new_file.remote_id.present?
        return cache_remote_file!(new_file.remote_id, new_file.original_filename)
      end

      super
    end

    def store!(new_file = nil)
      # when direct upload is enabled, always store on remote storage
      if self.class.object_store_enabled? && self.class.direct_upload_enabled?
        self.object_store = Store::REMOTE
      end

      super
    end

    def exclusive_lease_key
      "object_storage_migrate:#{model.class}:#{model.id}"
    end

    private

    def schedule_background_upload?
      self.class.object_store_enabled? &&
        self.class.background_upload_enabled? &&
        self.file_storage?
    end

    def cache_remote_file!(remote_object_id, original_filename)
      file_path = File.join(TMP_UPLOAD_PATH, remote_object_id)
      file_path = Pathname.new(file_path).cleanpath.to_s
      raise RemoteStoreError, 'Bad file path' unless file_path.start_with?(TMP_UPLOAD_PATH + '/')

      # TODO:
      # This should be changed to make use of the `tmp/cache` mechanism
      # instead of a custom upload directory;
      # using tmp/cache would make this implementation much simpler than it is today
      CarrierWave::Storage::Fog::File.new(self, storage_for(Store::REMOTE), file_path).tap do |file|
        raise RemoteStoreError, 'Missing file' unless file.exists?

        # The file is stored remotely, so force the remote store
        self.object_store = Store::REMOTE

        # TODO:
        # We store the file internally and force it to be considered `cached`.
        # This makes CarrierWave store the file in its permanent location (copy/delete)
        # once this object is saved, but not sooner
        @cache_id = "force-to-use-cache" # rubocop:disable Gitlab/ModuleWithInstanceVariables
        @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
        @filename = original_filename # rubocop:disable Gitlab/ModuleWithInstanceVariables
      end
    end

    # This is a hack around CarrierWave. The #migrate method needs to be
    # able to force the current file to the migrated file upon success.
    def file=(file)
      @file = file # rubocop:disable Gitlab/ModuleWithInstanceVariables
    end

    def serialization_column
      self.class.serialization_column(model.class, mounted_as)
    end

    # Returns the column where the 'store' is saved;
    # defaults to 'store'
    def store_serialization_column
      [serialization_column, 'store'].compact.join('_').to_sym
    end

    def storage
      @storage ||= storage_for(object_store)
    end

    def storage_for(store)
      case store
      when Store::REMOTE
        raise "Object Storage is not enabled for #{self.class}" unless self.class.object_store_enabled?

        CarrierWave::Storage::Fog.new(self)
      when Store::LOCAL
        CarrierWave::Storage::File.new(self)
      else
        raise UnknownStoreError
      end
    end

    def with_exclusive_lease
      lease_key = exclusive_lease_key
      uuid = Gitlab::ExclusiveLease.new(lease_key, timeout: 1.hour.to_i).try_obtain
      raise ExclusiveLeaseTaken, lease_key unless uuid

      yield uuid
    ensure
      Gitlab::ExclusiveLease.cancel(lease_key, uuid)
    end

    #
    # Move the file to another store
    #
    #   new_store: Enum (Store::LOCAL, Store::REMOTE)
    #
    def unsafe_migrate!(new_store)
      return unless object_store != new_store
      return unless file

      new_file = nil
      file_to_delete = file
      from_object_store = object_store
      self.object_store = new_store # changes the storage and file

      cache_stored_file! if file_storage?

      with_callbacks(:migrate, file_to_delete) do
        with_callbacks(:store, file_to_delete) do # for #store_versions!
          new_file = storage.store!(file)
          persist_object_store!
          self.file = new_file
        end
      end

      file
    rescue StandardError => e
      # in case of failure, delete the new file
      new_file.delete unless new_file.nil?
      # revert to the old file
      self.object_store = from_object_store
      self.file = file_to_delete
      raise e
    end
  end

  def unsafe_use_file
    if file_storage?
      return yield path
    end

    begin
      cache_stored_file!
      yield cache_path
    ensure
      FileUtils.rm_f(cache_path)
      cache_storage.delete_dir!(cache_path(nil))
    end
  end
end

ObjectStorage::Concern.include_mod_with('ObjectStorage::Concern')