1# frozen_string_literal: true 2 3## 4# A mixin for ActiveRecord models that enables callers to insert instances of the 5# target class into the database en-bloc via the [bulk_insert] method. 6# 7# Upon inclusion in the target class, the mixin will perform a number of checks to 8# ensure that the target is eligible for bulk insertions. For instance, it must not 9# use ActiveRecord callbacks that fire between [save]s, since these would not run 10# properly when instances are inserted in bulk. 11# 12# The mixin uses ActiveRecord 6's [InsertAll] type internally for bulk insertions. 13# Unlike [InsertAll], however, it requires you to pass instances of the target type 14# rather than row hashes, since it will run validations prior to insertion. 15# 16# @example 17# 18# class MyRecord < ApplicationRecord 19# include BulkInsertSafe # must be included _last_ i.e. after any other concerns 20# end 21# 22# # simple 23# MyRecord.bulk_insert!(items) 24# 25# # with custom batch size 26# MyRecord.bulk_insert!(items, batch_size: 100) 27# 28# # without validations 29# MyRecord.bulk_insert!(items, validate: false) 30# 31# # with attribute hash modification 32# MyRecord.bulk_insert!(items) { |item_attrs| item_attrs['col'] = 42 } 33# 34# 35module BulkInsertSafe 36 extend ActiveSupport::Concern 37 38 # These are the callbacks we think safe when used on models that are 39 # written to the database in bulk 40 ALLOWED_CALLBACKS = Set[ 41 :initialize, 42 :validate, 43 :validation, 44 :find, 45 :destroy 46 ].freeze 47 48 DEFAULT_BATCH_SIZE = 500 49 50 MethodNotAllowedError = Class.new(StandardError) 51 PrimaryKeySetError = Class.new(StandardError) 52 53 class_methods do 54 def insert_all_proxy_class 55 @insert_all_proxy_class ||= Class.new(self) do 56 attr_readonly :created_at 57 end 58 end 59 60 def set_callback(name, *args) 61 unless _bulk_insert_callback_allowed?(name, args) 62 raise MethodNotAllowedError, 63 "Not allowed to call `set_callback(#{name}, #{args})` when model extends `BulkInsertSafe`." \ 64 "Callbacks that fire per each record being inserted do not work with bulk-inserts." 65 end 66 67 super 68 end 69 70 # Inserts the given ActiveRecord [items] to the table mapped to this class. 71 # Items will be inserted in batches of a given size, where insertion semantics are 72 # "atomic across all batches". 73 # 74 # @param [Boolean] validate Whether validations should run on [items] 75 # @param [Integer] batch_size How many items should at most be inserted at once 76 # @param [Boolean] skip_duplicates Marks duplicates as allowed, and skips inserting them 77 # @param [Symbol] returns Pass :ids to return an array with the primary key values 78 # for all inserted records or nil to omit the underlying 79 # RETURNING SQL clause entirely. 80 # @param [Proc] handle_attributes Block that will receive each item attribute hash 81 # prior to insertion for further processing 82 # 83 # Note that this method will throw on the following occasions: 84 # - [PrimaryKeySetError] when primary keys are set on entities prior to insertion 85 # - [ActiveRecord::RecordInvalid] on entity validation failures 86 # - [ActiveRecord::RecordNotUnique] on duplicate key errors 87 # 88 # @return true if operation succeeded, throws otherwise. 89 # 90 def bulk_insert!(items, validate: true, skip_duplicates: false, returns: nil, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes) 91 _bulk_insert_all!(items, 92 validate: validate, 93 on_duplicate: skip_duplicates ? :skip : :raise, 94 returns: returns, 95 unique_by: nil, 96 batch_size: batch_size, 97 &handle_attributes) 98 end 99 100 # Upserts the given ActiveRecord [items] to the table mapped to this class. 101 # Items will be inserted or updated in batches of a given size, 102 # where insertion semantics are "atomic across all batches". 103 # 104 # @param [Boolean] validate Whether validations should run on [items] 105 # @param [Integer] batch_size How many items should at most be inserted at once 106 # @param [Symbol/Array] unique_by Defines index or columns to use to consider item duplicate 107 # @param [Symbol] returns Pass :ids to return an array with the primary key values 108 # for all inserted or updated records or nil to omit the 109 # underlying RETURNING SQL clause entirely. 110 # @param [Proc] handle_attributes Block that will receive each item attribute hash 111 # prior to insertion for further processing 112 # 113 # Unique indexes can be identified by columns or name: 114 # - unique_by: :isbn 115 # - unique_by: %i[ author_id name ] 116 # - unique_by: :index_books_on_isbn 117 # 118 # Note that this method will throw on the following occasions: 119 # - [PrimaryKeySetError] when primary keys are set on entities prior to insertion 120 # - [ActiveRecord::RecordInvalid] on entity validation failures 121 # - [ActiveRecord::RecordNotUnique] on duplicate key errors 122 # 123 # @return true if operation succeeded, throws otherwise. 124 # 125 def bulk_upsert!(items, unique_by:, returns: nil, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes) 126 _bulk_insert_all!(items, 127 validate: validate, 128 on_duplicate: :update, 129 returns: returns, 130 unique_by: unique_by, 131 batch_size: batch_size, 132 &handle_attributes) 133 end 134 135 private 136 137 def _bulk_insert_all!(items, on_duplicate:, returns:, unique_by:, validate:, batch_size:, &handle_attributes) 138 return [] if items.empty? 139 140 returning = 141 case returns 142 when :ids 143 [primary_key] 144 when nil 145 false 146 else 147 returns 148 end 149 150 # Handle insertions for tables with a composite primary key 151 primary_keys = connection.schema_cache.primary_keys(table_name) 152 if unique_by.blank? && primary_key != primary_keys 153 unique_by = primary_keys 154 end 155 156 transaction do 157 items.each_slice(batch_size).flat_map do |item_batch| 158 attributes = _bulk_insert_item_attributes( 159 item_batch, validate, &handle_attributes) 160 161 ActiveRecord::InsertAll 162 .new(insert_all_proxy_class, attributes, on_duplicate: on_duplicate, returning: returning, unique_by: unique_by) 163 .execute 164 .cast_values(insert_all_proxy_class.attribute_types).to_a 165 end 166 end 167 end 168 169 def _bulk_insert_item_attributes(items, validate_items) 170 items.map do |item| 171 item.validate! if validate_items 172 173 attributes = {} 174 column_names.each do |name| 175 attributes[name] = item.read_attribute(name) 176 end 177 178 _bulk_insert_reject_primary_key!(attributes, item.class.primary_key) 179 180 yield attributes if block_given? 181 182 attributes 183 end 184 end 185 186 def _bulk_insert_reject_primary_key!(attributes, primary_key) 187 if existing_pk = attributes.delete(primary_key) 188 raise PrimaryKeySetError, "Primary key set: #{primary_key}:#{existing_pk}\n" \ 189 "Bulk-inserts are only supported for rows that don't already have PK set" 190 end 191 end 192 193 def _bulk_insert_callback_allowed?(name, args) 194 ALLOWED_CALLBACKS.include?(name) || _bulk_insert_saved_from_belongs_to?(name, args) 195 end 196 197 # belongs_to associations will install a before_save hook during class loading 198 def _bulk_insert_saved_from_belongs_to?(name, args) 199 args.first == :before && args.second.to_s.start_with?('autosave_associated_records_for_') 200 end 201 end 202end 203