1# frozen_string_literal: true
2
3##
# A mixin for ActiveRecord models that enables callers to insert instances of the
# target class into the database en bloc via the [bulk_insert!] and [bulk_upsert!] methods.
6#
# Once included, the mixin restricts which ActiveRecord callbacks the target class
# may register. Callbacks that fire around each individual [save] (for instance
# after_save or after_create) raise [MethodNotAllowedError], because they would not
# run when instances are inserted in bulk.
11#
# The mixin uses ActiveRecord 6's [InsertAll] type internally for bulk insertions.
# Unlike [InsertAll], however, it requires you to pass instances of the target type
# rather than row hashes, so that it can run model validations prior to insertion
# (unless `validate: false` is passed).
15#
16# @example
17#
18#   class MyRecord < ApplicationRecord
19#     include BulkInsertSafe # must be included _last_ i.e. after any other concerns
20#   end
21#
22#   # simple
23#   MyRecord.bulk_insert!(items)
24#
25#   # with custom batch size
26#   MyRecord.bulk_insert!(items, batch_size: 100)
27#
28#   # without validations
29#   MyRecord.bulk_insert!(items, validate: false)
30#
31#   # with attribute hash modification
32#   MyRecord.bulk_insert!(items) { |item_attrs| item_attrs['col'] = 42 }
33#
34#
module BulkInsertSafe
  extend ActiveSupport::Concern

  # Callback chains that models including this mixin may still register.
  # Every other chain (e.g. :save or :create) is rejected by .set_callback,
  # because bulk inserts never go through the per-record save in which those
  # callbacks would run.
  ALLOWED_CALLBACKS = Set[
    :initialize,
    :validate,
    :validation,
    :find,
    :destroy
  ].freeze

  # Maximum number of rows sent per INSERT statement unless the caller passes
  # an explicit batch_size.
  DEFAULT_BATCH_SIZE = 500

  # Raised by .set_callback when a model registers a callback that is not safe
  # for bulk inserts.
  MethodNotAllowedError = Class.new(StandardError)

  # Raised when an item handed to .bulk_insert! / .bulk_upsert! already has a
  # primary key value.
  PrimaryKeySetError = Class.new(StandardError)

  class_methods do
    # Anonymous subclass of the model that is handed to ActiveRecord::InsertAll
    # instead of the model itself, with +created_at+ marked read-only.
    # NOTE(review): presumably this keeps created_at out of the columns that an
    # upsert's ON CONFLICT ... DO UPDATE overwrites, so existing rows keep their
    # original creation time. Confirm against InsertAll's updatable columns.
    def insert_all_proxy_class
      @insert_all_proxy_class ||= Class.new(self) do
        attr_readonly :created_at
      end
    end

    # Guards ActiveSupport callback registration on models using this mixin.
    #
    # @raise [MethodNotAllowedError] unless +name+ is one of ALLOWED_CALLBACKS or
    #   the registration is the before-save autosave hook installed by belongs_to
    def set_callback(name, *args)
      unless _bulk_insert_callback_allowed?(name, args)
        raise MethodNotAllowedError,
          "Not allowed to call `set_callback(#{name}, #{args})` when model extends `BulkInsertSafe`. " \
            "Callbacks that fire per each record being inserted do not work with bulk-inserts."
      end

      super
    end

    # Inserts the given ActiveRecord [items] into the table mapped to this class.
    # Items are inserted in batches of at most +batch_size+ inside a single
    # transaction, so insertion is atomic across all batches.
    #
    # @param [Array]   items             Instances of this class without a primary key set
    # @param [Boolean] validate          Whether validations should run on [items]
    # @param [Boolean] skip_duplicates   Marks duplicates as allowed, and skips inserting them
    # @param [Symbol]  returns           Pass :ids to return an array with the primary key values
    #                                    for all inserted records, nil to omit the underlying
    #                                    RETURNING SQL clause entirely, or any other value to
    #                                    forward it unchanged as InsertAll's +returning:+ option
    # @param [Integer] batch_size        How many items should at most be inserted at once
    # @param [Proc]    handle_attributes Block that will receive each item attribute hash
    #                                    prior to insertion for further processing
    #
    # @raise [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # @raise [ActiveRecord::RecordInvalid]   on entity validation failures
    # @raise [ActiveRecord::RecordNotUnique] on duplicate key errors (unless skip_duplicates is set)
    #
    # @return [Array] the cast RETURNING values of all batches, concatenated
    #   (e.g. the new primary keys for returns: :ids); [] when [items] is empty.
    def bulk_insert!(items, validate: true, skip_duplicates: false, returns: nil, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: skip_duplicates ? :skip : :raise,
        returns: returns,
        unique_by: nil,
        batch_size: batch_size,
        &handle_attributes)
    end

    # Upserts the given ActiveRecord [items] into the table mapped to this class.
    # Items are inserted or updated in batches of at most +batch_size+ inside a
    # single transaction, so the operation is atomic across all batches.
    #
    # @param [Array]        items             Instances of this class without a primary key set
    # @param [Boolean]      validate          Whether validations should run on [items]
    # @param [Integer]      batch_size        How many items should at most be inserted at once
    # @param [Symbol/Array] unique_by         Defines index or columns to use to consider item duplicate
    # @param [Symbol]       returns           Pass :ids to return an array with the primary key values
    #                                         for all inserted or updated records, nil to omit the
    #                                         underlying RETURNING SQL clause entirely, or any other
    #                                         value to forward it unchanged as InsertAll's +returning:+
    # @param [Proc]         handle_attributes Block that will receive each item attribute hash
    #                                         prior to insertion for further processing
    #
    # Unique indexes can be identified by columns or name:
    #  - unique_by: :isbn
    #  - unique_by: %i[ author_id name ]
    #  - unique_by: :index_books_on_isbn
    #
    # @raise [PrimaryKeySetError]            when primary keys are set on entities prior to insertion
    # @raise [ActiveRecord::RecordInvalid]   on entity validation failures
    # @raise [ActiveRecord::RecordNotUnique] on duplicate key errors for unique constraints
    #                                        not covered by +unique_by+
    #
    # @return [Array] the cast RETURNING values of all batches, concatenated
    #   (e.g. the affected primary keys for returns: :ids); [] when [items] is empty.
    def bulk_upsert!(items, unique_by:, returns: nil, validate: true, batch_size: DEFAULT_BATCH_SIZE, &handle_attributes)
      _bulk_insert_all!(items,
        validate: validate,
        on_duplicate: :update,
        returns: returns,
        unique_by: unique_by,
        batch_size: batch_size,
        &handle_attributes)
    end

    private

    # Shared implementation of .bulk_insert! and .bulk_upsert!. It converts
    # [items] into attribute hashes batch by batch and hands each batch to
    # ActiveRecord::InsertAll, all inside one transaction.
    def _bulk_insert_all!(items, on_duplicate:, returns:, unique_by:, validate:, batch_size:, &handle_attributes)
      return [] if items.empty?

      returning =
        case returns
        when :ids then [primary_key]
        when nil then false # omit the RETURNING clause entirely
        else returns
        end

      # Tables whose primary key columns differ from the model's primary_key
      # (e.g. a composite primary key): without an explicit unique_by, use the
      # table's primary key columns as the uniqueness target.
      primary_keys = connection.schema_cache.primary_keys(table_name)
      unique_by = primary_keys if unique_by.blank? && primary_key != primary_keys

      transaction do
        items.each_slice(batch_size).flat_map do |item_batch|
          attributes = _bulk_insert_item_attributes(item_batch, validate, &handle_attributes)

          insert_all = ActiveRecord::InsertAll.new(
            insert_all_proxy_class, attributes,
            on_duplicate: on_duplicate, returning: returning, unique_by: unique_by
          )
          insert_all.execute.cast_values(insert_all_proxy_class.attribute_types).to_a
        end
      end
    end

    # Converts each item into a column-name => value hash, with the primary key
    # column removed. Each hash is yielded to the caller's block (if any) so it
    # can be modified in place before being returned.
    #
    # @raise [ActiveRecord::RecordInvalid] if +validate_items+ is true and an item is invalid
    # @raise [PrimaryKeySetError] if an item already has a primary key value
    def _bulk_insert_item_attributes(items, validate_items)
      items.map do |item|
        item.validate! if validate_items

        attributes = column_names.index_with { |name| item.read_attribute(name) }
        _bulk_insert_reject_primary_key!(attributes, item.class.primary_key)

        yield attributes if block_given?

        attributes
      end
    end

    # Removes the +primary_key+ column from +attributes+ (presumably so that the
    # database generates it on insert).
    #
    # @raise [PrimaryKeySetError] if the removed column held a value
    def _bulk_insert_reject_primary_key!(attributes, primary_key)
      existing_pk = attributes.delete(primary_key)
      return unless existing_pk

      raise PrimaryKeySetError, "Primary key set: #{primary_key}:#{existing_pk}\n" \
        "Bulk-inserts are only supported for rows that don't already have PK set"
    end

    # True when a set_callback(name, *args) registration is safe for bulk inserts.
    def _bulk_insert_callback_allowed?(name, args)
      ALLOWED_CALLBACKS.include?(name) || _bulk_insert_saved_from_belongs_to?(name, args)
    end

    # belongs_to associations will install a before_save hook during class loading.
    # That hook is recognised by its :before kind and its
    # autosave_associated_records_for_* method name.
    def _bulk_insert_saved_from_belongs_to?(name, args)
      args.first == :before && args.second.to_s.start_with?('autosave_associated_records_for_')
    end
  end
end
203