# frozen_string_literal: true

module Gitlab
  module Database
    module LoadBalancing
      # Load balancing for ActiveRecord connections.
      #
      # Each host in the load balancer uses the same credentials as the primary
      # database.
      class LoadBalancer
        CACHE_KEY = :gitlab_load_balancer_host

        REPLICA_SUFFIX = '_replica'

        attr_reader :host_list, :configuration

        # configuration - An instance of `LoadBalancing::Configuration` that
        #                 contains the configuration details (such as the hosts)
        #                 for this load balancer.
        #
        # When load balancing is disabled the host list holds a single
        # `PrimaryHost`; otherwise it holds one `Host` per configured address.
        def initialize(configuration)
          @configuration = configuration
          @primary_only = !configuration.load_balancing_enabled?
          @host_list =
            if @primary_only
              HostList.new([PrimaryHost.new(self)])
            else
              HostList.new(configuration.hosts.map { |addr| Host.new(addr, self) })
            end
        end

        # Returns the database config name of the configuration this load
        # balancer was built from.
        def name
          @configuration.db_config_name
        end

        # Returns true when load balancing is disabled and only the primary is
        # used.
        def primary_only?
          @primary_only
        end

        # Disconnects every host in the host list.
        #
        # timeout - Passed through unchanged to each host's `disconnect!`.
        def disconnect!(timeout: 120)
          host_list.hosts.each { |host| host.disconnect!(timeout: timeout) }
        end

        # Yields a connection that can be used for reads.
        #
        # Errors raised by the block are handled as follows:
        # * primary-only setups re-raise immediately;
        # * serialization failures (query conflicts) release the current host
        #   and retry on the next one, up to `host_list.length * 3` times;
        # * connection errors mark the host offline and move to the next host;
        # * any other error is re-raised.
        #
        # If no secondaries were available (or the conflict retry budget is
        # exhausted) this method will use the primary instead.
        def read(&block)
          conflict_retried = 0

          while host
            ensure_caching!

            begin
              connection = host.connection
              return yield connection
            rescue StandardError => error
              if primary_only?
                # If we only have primary configured, retrying is pointless
                raise error
              elsif serialization_failure?(error)
                # This error can occur when a query conflicts. See
                # https://www.postgresql.org/docs/current/static/hot-standby.html#HOT-STANDBY-CONFLICT
                # for more information.
                #
                # In this event we'll cycle through the secondaries at most 3
                # times before using the primary instead.
                will_retry = conflict_retried < @host_list.length * 3

                ::Gitlab::Database::LoadBalancing::Logger.warn(
                  event: :host_query_conflict,
                  message: 'Query conflict on host',
                  conflict_retried: conflict_retried,
                  will_retry: will_retry,
                  db_host: host.host,
                  db_port: host.port,
                  host_list_length: @host_list.length
                )

                if will_retry
                  conflict_retried += 1
                  release_host
                else
                  break
                end
              elsif connection_error?(error)
                host.offline!
                release_host
              else
                raise error
              end
            end
          end

          ::Gitlab::Database::LoadBalancing::Logger.warn(
            event: :no_secondaries_available,
            message: 'No secondaries were available, using primary instead',
            conflict_retried: conflict_retried,
            host_list_length: @host_list.length
          )

          read_write(&block)
        end

        # Yields a connection that can be used for both reads and writes.
        #
        # In the event of a failover the primary may be briefly unavailable.
        # Instead of immediately grinding to a halt we retry the operation a
        # few times (see `retry_with_backoff`).
        def read_write
          retry_with_backoff do
            connection = pool.connection
            yield connection
          end
        end

        # Returns a host to use for queries.
        #
        # The chosen host is memoized in `request_cache` (backed by
        # `SafeRequestStore`), so repeated calls in the same request scope reuse
        # the same host + connection. `@host_list.next` may presumably return
        # nil when no host is usable (`read` loops `while host`). TODO confirm.
        def host
          request_cache[CACHE_KEY] ||= @host_list.next
        end

        # Releases the cached host (disabling its query cache and releasing its
        # connection) and forgets it, so the next `host` call picks a new one.
        def release_host
          cached_host = request_cache[CACHE_KEY]

          if cached_host
            cached_host.disable_query_cache!
            cached_host.release_connection
          end

          request_cache.delete(CACHE_KEY)
        end

        # Releases the current connection of the primary's connection pool.
        def release_primary_connection
          pool.release_connection
        end

        # Returns the transaction write location of the primary.
        #
        # Raises a RuntimeError when the location could not be determined.
        def primary_write_location
          location = read_write do |connection|
            get_write_location(connection)
          end

          return location if location

          raise 'Failed to determine the write location of the primary database'
        end

        # Picks a random host that has caught up with `location` and caches it
        # as the current host.
        #
        # Returns true if there was at least one host that has caught up with
        # the given transaction, false otherwise (the cache is left untouched).
        def select_up_to_date_host(location)
          up_to_date_host = @host_list.hosts.shuffle.find { |candidate| candidate.caught_up?(location) }

          return false unless up_to_date_host

          request_cache[CACHE_KEY] = up_to_date_host

          true
        end

        # Yields a block, retrying it on connection errors with a growing
        # backoff between attempts.
        #
        # retries - Total number of attempts (at least one attempt is always
        #           made).
        # time    - Seconds to sleep after the first failure; squared after each
        #           subsequent failure (2, 4, 16, ... with the defaults).
        #
        # Non-connection errors are re-raised immediately. After the last failed
        # attempt the error is re-raised without any further sleeping.
        def retry_with_backoff(retries = 3, time = 2)
          # In CI we only use the primary, but databases may not always be
          # available (or take a few seconds to become available). Retrying in
          # this case can slow down CI jobs. In addition, retrying with _only_
          # a primary being present isn't all that helpful.
          #
          # To prevent this from happening, we don't make any attempt at
          # retrying unless one or more replicas are used. This matches the
          # behaviour from before we enabled load balancing code even if no
          # replicas were configured.
          return yield if primary_only?

          max_attempts = [retries, 1].max
          attempt = 0

          begin
            attempt += 1
            yield
          rescue StandardError => error
            raise error unless connection_error?(error)

            # We need to release the primary connection as otherwise Rails
            # will keep raising errors when using the connection.
            release_primary_connection

            # No point in waiting when nothing is going to be retried.
            raise error if attempt >= max_attempts

            sleep(time)
            time **= 2
            retry
          end
        end

        # Returns true if `error` (or, for wrapped query errors, its cause)
        # indicates a lost or unavailable database connection.
        def connection_error?(error)
          case error
          when ActiveRecord::NoDatabaseError
            # Retrying this error isn't going to magically make the database
            # appear. It also slows down CI jobs that are meant to create the
            # database in the first place.
            false
          when ActiveRecord::StatementInvalid, ActionView::Template::Error
            # After connecting to the DB Rails will wrap query errors using this
            # class, so look at the underlying cause instead.
            cause = error.cause
            cause ? connection_error?(cause) : false
          when *CONNECTION_ERRORS
            true
          else
            # When PG tries to set the client encoding but fails due to a
            # connection error it will raise a PG::Error instance. Catching that
            # would catch all errors (even those we don't want), so instead we
            # check for the message of the error.
            error.message.start_with?('invalid encoding name:')
          end
        end

        # Returns true if `error`, or any exception in its `cause` chain, is a
        # `PG::TRSerializationFailure`.
        #
        # The error itself is checked first: a serialization failure raised
        # inside a `rescue` gets a `cause` set automatically by Ruby and must
        # still be recognised.
        def serialization_failure?(error)
          return true if error.is_a?(PG::TRSerializationFailure)

          cause = error.cause
          cause ? serialization_failure?(cause) : false
        end

        # Builds a connection pool for a replica using the replica DB config.
        #
        # pool_size - The size of the DB pool.
        # host - An optional host name to use instead of the default one.
        # port - An optional port to connect to.
        def create_replica_connection_pool(pool_size, host = nil, port = nil)
          db_config = @configuration.replica_db_config

          env_config = db_config.configuration_hash.dup
          env_config[:pool] = pool_size
          env_config[:host] = host if host
          env_config[:port] = port if port

          replica_db_config = ActiveRecord::DatabaseConfigurations::HashConfig.new(
            db_config.env_name,
            db_config.name + REPLICA_SUFFIX,
            env_config
          )

          # We cannot use ActiveRecord::Base.connection_handler.establish_connection
          # as it will rewrite ActiveRecord::Base.connection
          ActiveRecord::ConnectionAdapters::ConnectionHandler
            .new
            .establish_connection(replica_db_config)
        end

        # Returns the primary's connection pool.
        #
        # ActiveRecord::ConnectionAdapters::ConnectionHandler handles fetching,
        # and caching for connections pools for each "connection", so we
        # leverage that. Raises ActiveRecord::ConnectionNotEstablished when no
        # pool is found.
        def pool
          ActiveRecord::Base.connection_handler.retrieve_connection_pool(
            @configuration.primary_connection_specification_name,
            role: ActiveRecord::Base.writing_role,
            shard: ActiveRecord::Base.default_shard
          ) || raise(::ActiveRecord::ConnectionNotEstablished)
        end

        # Returns the result of `pg_wal_lsn_diff(location1, location2)` computed
        # on the primary, or nil if no row was returned. Both locations are
        # quoted before being interpolated into the query.
        def wal_diff(location1, location2)
          read_write do |connection|
            lsn1 = connection.quote(location1)
            lsn2 = connection.quote(location2)

            query = <<-SQL.squish
              SELECT pg_wal_lsn_diff(#{lsn1}, #{lsn2})
                AS result
            SQL

            row = connection.select_all(query).first
            row['result'] if row
          end
        end

        private

        # Enables the query cache on the current host when the Rails executor is
        # active and the cache is not already enabled.
        def ensure_caching!
          return unless Rails.application.executor.active?
          return if host.query_cache_enabled

          host.enable_query_cache!
        end

        # Per-load-balancer hash stored inside `SafeRequestStore`.
        def request_cache
          base = SafeRequestStore[:gitlab_load_balancer] ||= {}
          base[self] ||= {}
        end

        # Returns the current WAL location as text.
        #
        # With the new query (default; disabled when USE_NEW_LOAD_BALANCER_QUERY
        # is false) this is the insert LSN when the server is not in recovery,
        # the last replayed LSN when it is in recovery and has WAL senders, and
        # NULL otherwise. The legacy query always asks for the insert LSN.
        #
        # @param [ActiveRecord::Connection] ar_connection
        # @return [String, nil]
        def get_write_location(ar_connection)
          use_new_load_balancer_query = Gitlab::Utils
            .to_boolean(ENV['USE_NEW_LOAD_BALANCER_QUERY'], default: true)

          sql =
            if use_new_load_balancer_query
              <<~NEWSQL
                SELECT CASE
                  WHEN pg_is_in_recovery() = true AND EXISTS (SELECT 1 FROM pg_stat_get_wal_senders())
                    THEN pg_last_wal_replay_lsn()::text
                  WHEN pg_is_in_recovery() = false
                    THEN pg_current_wal_insert_lsn()::text
                  ELSE NULL
                END AS location;
              NEWSQL
            else
              <<~SQL
                SELECT pg_current_wal_insert_lsn()::text AS location
              SQL
            end

          row = ar_connection.select_all(sql).first
          row['location'] if row
        end
      end
    end
  end
end