1# frozen_string_literal: true
2
3module Gitlab
4  module Database
5    module LoadBalancing
6      # Load balancing for ActiveRecord connections.
7      #
8      # Each host in the load balancer uses the same credentials as the primary
9      # database.
10      class LoadBalancer
11        CACHE_KEY = :gitlab_load_balancer_host
12
13        REPLICA_SUFFIX = '_replica'
14
15        attr_reader :host_list, :configuration
16
17        # configuration - An instance of `LoadBalancing::Configuration` that
18        #                 contains the configuration details (such as the hosts)
19        #                 for this load balancer.
20        def initialize(configuration)
21          @configuration = configuration
22          @primary_only = !configuration.load_balancing_enabled?
23          @host_list =
24            if @primary_only
25              HostList.new([PrimaryHost.new(self)])
26            else
27              HostList.new(configuration.hosts.map { |addr| Host.new(addr, self) })
28            end
29        end
30
31        def name
32          @configuration.db_config_name
33        end
34
35        def primary_only?
36          @primary_only
37        end
38
39        def disconnect!(timeout: 120)
40          host_list.hosts.each { |host| host.disconnect!(timeout: timeout) }
41        end
42
43        # Yields a connection that can be used for reads.
44        #
45        # If no secondaries were available this method will use the primary
46        # instead.
47        def read(&block)
48          conflict_retried = 0
49
50          while host
51            ensure_caching!
52
53            begin
54              connection = host.connection
55              return yield connection
56            rescue StandardError => error
57              if primary_only?
58                # If we only have primary configured, retrying is pointless
59                raise error
60              elsif serialization_failure?(error)
61                # This error can occur when a query conflicts. See
62                # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT
63                # for more information.
64                #
65                # In this event we'll cycle through the secondaries at most 3
66                # times before using the primary instead.
67                will_retry = conflict_retried < @host_list.length * 3
68
69                ::Gitlab::Database::LoadBalancing::Logger.warn(
70                  event: :host_query_conflict,
71                  message: 'Query conflict on host',
72                  conflict_retried: conflict_retried,
73                  will_retry: will_retry,
74                  db_host: host.host,
75                  db_port: host.port,
76                  host_list_length: @host_list.length
77                )
78
79                if will_retry
80                  conflict_retried += 1
81                  release_host
82                else
83                  break
84                end
85              elsif connection_error?(error)
86                host.offline!
87                release_host
88              else
89                raise error
90              end
91            end
92          end
93
94          ::Gitlab::Database::LoadBalancing::Logger.warn(
95            event: :no_secondaries_available,
96            message: 'No secondaries were available, using primary instead',
97            conflict_retried: conflict_retried,
98            host_list_length: @host_list.length
99          )
100
101          read_write(&block)
102        end
103
104        # Yields a connection that can be used for both reads and writes.
105        def read_write
106          connection = nil
107          # In the event of a failover the primary may be briefly unavailable.
108          # Instead of immediately grinding to a halt we'll retry the operation
109          # a few times.
110          retry_with_backoff do
111            connection = pool.connection
112            yield connection
113          end
114        end
115
116        # Returns a host to use for queries.
117        #
118        # Hosts are scoped per thread so that multiple threads don't
119        # accidentally re-use the same host + connection.
120        def host
121          request_cache[CACHE_KEY] ||= @host_list.next
122        end
123
124        # Releases the host and connection for the current thread.
125        def release_host
126          if host = request_cache[CACHE_KEY]
127            host.disable_query_cache!
128            host.release_connection
129          end
130
131          request_cache.delete(CACHE_KEY)
132        end
133
134        def release_primary_connection
135          pool.release_connection
136        end
137
138        # Returns the transaction write location of the primary.
139        def primary_write_location
140          location = read_write do |connection|
141            get_write_location(connection)
142          end
143
144          return location if location
145
146          raise 'Failed to determine the write location of the primary database'
147        end
148
149        # Returns true if there was at least one host that has caught up with the given transaction.
150        def select_up_to_date_host(location)
151          all_hosts = @host_list.hosts.shuffle
152          host = all_hosts.find { |host| host.caught_up?(location) }
153
154          return false unless host
155
156          request_cache[CACHE_KEY] = host
157
158          true
159        end
160
161        # Yields a block, retrying it upon error using an exponential backoff.
162        def retry_with_backoff(retries = 3, time = 2)
163          # In CI we only use the primary, but databases may not always be
164          # available (or take a few seconds to become available). Retrying in
165          # this case can slow down CI jobs. In addition, retrying with _only_
166          # a primary being present isn't all that helpful.
167          #
168          # To prevent this from happening, we don't make any attempt at
169          # retrying unless one or more replicas are used. This matches the
170          # behaviour from before we enabled load balancing code even if no
171          # replicas were configured.
172          return yield if primary_only?
173
174          retried = 0
175          last_error = nil
176
177          while retried < retries
178            begin
179              return yield
180            rescue StandardError => error
181              raise error unless connection_error?(error)
182
183              # We need to release the primary connection as otherwise Rails
184              # will keep raising errors when using the connection.
185              release_primary_connection
186
187              last_error = error
188              sleep(time)
189              retried += 1
190              time **= 2
191            end
192          end
193
194          raise last_error
195        end
196
197        def connection_error?(error)
198          case error
199          when ActiveRecord::NoDatabaseError
200            # Retrying this error isn't going to magically make the database
201            # appear. It also slows down CI jobs that are meant to create the
202            # database in the first place.
203            false
204          when ActiveRecord::StatementInvalid, ActionView::Template::Error
205            # After connecting to the DB Rails will wrap query errors using this
206            # class.
207            if (cause = error.cause)
208              connection_error?(cause)
209            else
210              false
211            end
212          when *CONNECTION_ERRORS
213            true
214          else
215            # When PG tries to set the client encoding but fails due to a
216            # connection error it will raise a PG::Error instance. Catching that
217            # would catch all errors (even those we don't want), so instead we
218            # check for the message of the error.
219            error.message.start_with?('invalid encoding name:')
220          end
221        end
222
223        def serialization_failure?(error)
224          if error.cause
225            serialization_failure?(error.cause)
226          else
227            error.is_a?(PG::TRSerializationFailure)
228          end
229        end
230
231        # pool_size - The size of the DB pool.
232        # host - An optional host name to use instead of the default one.
233        # port - An optional port to connect to.
234        def create_replica_connection_pool(pool_size, host = nil, port = nil)
235          db_config = @configuration.replica_db_config
236
237          env_config = db_config.configuration_hash.dup
238          env_config[:pool] = pool_size
239          env_config[:host] = host if host
240          env_config[:port] = port if port
241
242          replica_db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(
243            db_config.env_name,
244            db_config.name + REPLICA_SUFFIX,
245            env_config
246          )
247
248          # We cannot use ActiveRecord::Base.connection_handler.establish_connection
249          # as it will rewrite ActiveRecord::Base.connection
250          ActiveRecord::ConnectionAdapters::ConnectionHandler
251            .new
252            .establish_connection(replica_db_config)
253        end
254
255        # ActiveRecord::ConnectionAdapters::ConnectionHandler handles fetching,
256        # and caching for connections pools for each "connection", so we
257        # leverage that.
258        def pool
259          ActiveRecord::Base.connection_handler.retrieve_connection_pool(
260            @configuration.primary_connection_specification_name,
261            role: ActiveRecord::Base.writing_role,
262            shard: ActiveRecord::Base.default_shard
263          ) || raise(::ActiveRecord::ConnectionNotEstablished)
264        end
265
266        def wal_diff(location1, location2)
267          read_write do |connection|
268            lsn1 = connection.quote(location1)
269            lsn2 = connection.quote(location2)
270
271            query = <<-SQL.squish
272            SELECT pg_wal_lsn_diff(#{lsn1}, #{lsn2})
273              AS result
274            SQL
275
276            row = connection.select_all(query).first
277            row['result'] if row
278          end
279        end
280
281        private
282
283        def ensure_caching!
284          return unless Rails.application.executor.active?
285          return if host.query_cache_enabled
286
287          host.enable_query_cache!
288        end
289
290        def request_cache
291          base = SafeRequestStore[:gitlab_load_balancer] ||= {}
292          base[self] ||= {}
293        end
294
295        # @param [ActiveRecord::Connection] ar_connection
296        # @return [String]
297        def get_write_location(ar_connection)
298          use_new_load_balancer_query = Gitlab::Utils
299            .to_boolean(ENV['USE_NEW_LOAD_BALANCER_QUERY'], default: true)
300
301          sql =
302            if use_new_load_balancer_query
303              <<~NEWSQL
304                SELECT CASE
305                    WHEN pg_is_in_recovery() = true AND EXISTS (SELECT 1 FROM pg_stat_get_wal_senders())
306                      THEN pg_last_wal_replay_lsn()::text
307                    WHEN pg_is_in_recovery() = false
308                      THEN pg_current_wal_insert_lsn()::text
309                      ELSE NULL
310                    END AS location;
311              NEWSQL
312            else
313              <<~SQL
314                SELECT pg_current_wal_insert_lsn()::text AS location
315              SQL
316            end
317
318          row = ar_connection.select_all(sql).first
319          row['location'] if row
320        end
321      end
322    end
323  end
324end
325