# frozen_string_literal: true

module QA
  module Service
    # Drives a Gitaly Cluster test environment made of a GitLab container, a Praefect
    # container, a Postgres container holding the Praefect database, and three Gitaly
    # node containers. Every operation shells out to `docker` through Service::Shellout.
    class PraefectManager
      include Service::Shellout

      PrometheusQueryError = Class.new(StandardError)

      # Praefect log messages that indicate a failed health check against a Gitaly node.
      HEALTH_CHECK_FAILURE_MESSAGES = ['error when pinging healthcheck', 'failed checking node health'].freeze

      attr_accessor :gitlab
      attr_reader :primary_node, :secondary_node, :tertiary_node

      # Names of the Docker containers that make up the cluster. The Gitaly container
      # names are also used as storage names when querying Praefect's database.
      def initialize
        @gitlab = 'gitlab-gitaly-cluster'
        @praefect = 'praefect'
        @postgres = 'postgres'
        @primary_node = 'gitaly1'
        @secondary_node = 'gitaly2'
        @tertiary_node = 'gitaly3'
        @virtual_storage = 'default'
      end

      # Executes the praefect `dataloss` command.
      #
      # @return [Boolean] whether dataloss has occurred
      def dataloss?
        wait_until_shell_command_matches(dataloss_command, /Outdated repositories/)
      end

      # Checks, via the `gitlab:praefect:replicas` rake task, whether a project's repository
      # has the same checksum on every Gitaly node.
      #
      # @param project_id [Integer] ID of the project whose repository is checked
      # @return the result of Support::Retrier.retry_until; the retried block yields true
      #   once the checksums match and false otherwise.
      #   NOTE(review): with raise_on_failure: false this presumably returns a falsy value
      #   on timeout instead of raising — confirm against Support::Retrier.
      def replicated?(project_id)
        Support::Retrier.retry_until(raise_on_failure: false) do
          replicas = wait_until_shell_command(%(docker exec #{@gitlab} bash -c 'gitlab-rake "gitlab:praefect:replicas[#{project_id}]"')) do |line|
            QA::Runtime::Logger.debug(line.chomp)
            # The output of the rake task looks something like this:
            #
            # Project name                    | gitaly1 (primary)                        | gitaly2                                  | gitaly3
            # ----------------------------------------------------------------------------------------------------------------------------------------------------------------
            # gitaly_cluster-3aff1f2bd14e6c98 | 23c4422629234d62b62adacafd0a33a8364e8619 | 23c4422629234d62b62adacafd0a33a8364e8619 | 23c4422629234d62b62adacafd0a33a8364e8619
            #
            break line if line.start_with?('gitaly_cluster')
            break nil if line.include?('Something went wrong when getting replicas')
          end
          next false unless replicas

          # Columns 1..3 hold the per-node checksums; we want to know if they are identical.
          # `replicas` is a non-nil String here, so no safe navigation is needed.
          replicas.split('|').map(&:strip).slice(1..3).uniq.one?
        end
      end

      def stop_primary_node
        stop_node(@primary_node)
      end

      def start_primary_node
        start_node(@primary_node)
      end

      # Starts the Praefect container and waits until it is running, listening, and able
      # to serve the Gitaly check.
      def start_praefect
        start_node(@praefect)
        wait_for_praefect
      end

      def stop_praefect
        stop_node(@praefect)
      end

      def stop_secondary_node
        stop_node(@secondary_node)
      end

      def start_secondary_node
        start_node(@secondary_node)
      end

      def stop_tertiary_node
        stop_node(@tertiary_node)
      end

      def start_tertiary_node
        start_node(@tertiary_node)
      end

      # Starts a container and blocks until `docker inspect` reports it as running.
      #
      # @param name [String] container name
      def start_node(name)
        shell "docker start #{name}"
        wait_until_container_running(name)
      end

      # @param name [String] container name
      def stop_node(name)
        shell "docker stop #{name}"
      end

      # Deletes every row from the replication queue tables in the Praefect database.
      def clear_replication_queue
        QA::Runtime::Logger.info("Clearing the replication queue")
        shell sql_to_docker_exec_cmd(
          <<~SQL
            delete from replication_queue_job_lock;
            delete from replication_queue_lock;
            delete from replication_queue;
          SQL
        )
      end

      # Simulates a stalled replication queue: every job is marked `in_progress`, job locks
      # are inserted for jobs that lack one, and every queue lock is marked as acquired.
      def create_stalled_replication_queue
        QA::Runtime::Logger.info("Setting jobs in replication queue to `in_progress` and acquiring locks")
        shell sql_to_docker_exec_cmd(
          <<~SQL
            update replication_queue set state = 'in_progress';
            insert into replication_queue_job_lock (job_id, lock_id, triggered_at)
              select id, rq.lock_id, created_at from replication_queue rq
                left join replication_queue_job_lock rqjl on rq.id = rqjl.job_id
                where state = 'in_progress' and rqjl.job_id is null;
            update replication_queue_lock set acquired = 't';
          SQL
        )
      end

      # Reconciles the previous primary node with the current one
      # I.e., it brings the previous primary node up-to-date
      def reconcile_nodes
        reconcile_node_with_node(@primary_node, current_primary_node)
      end

      # Runs `praefect reconcile` until it reports that the consistency check finished.
      #
      # @param target [String] storage to bring up to date
      # @param reference [String] storage used as the source of truth
      def reconcile_node_with_node(target, reference)
        QA::Runtime::Logger.info("Reconcile #{target} with #{reference} on #{@virtual_storage}")
        wait_until_shell_command_matches(
          praefect_cli_command("reconcile -virtual #{@virtual_storage} -target #{target} -reference #{reference} -f"),
          /FINISHED: \d+ repos were checked for consistency/,
          sleep_interval: 5,
          retry_on_exception: true
        )
      end

      # Queries Prometheus, from inside the GitLab container, for the
      # `gitaly_praefect_read_distribution` metric.
      #
      # @return [Array<Hash>] one `{ node:, value: }` entry per storage series
      # @raise [PrometheusQueryError] when Prometheus does not report status 'success'
      def query_read_distribution
        output = shell "docker exec #{@gitlab} bash -c 'curl -s http://localhost:9090/api/v1/query?query=gitaly_praefect_read_distribution'" do |line|
          QA::Runtime::Logger.debug(line)
          break line
        end
        result = JSON.parse(output)

        raise PrometheusQueryError, "Unable to query read distribution metrics" unless result['status'] == 'success'

        # `metric` (not `result`) so the block parameter does not shadow the parsed response.
        result['data']['result'].map do |metric|
          { node: metric['metric']['storage'], value: metric['value'][1].to_i }
        end
      end

      # @return [Integer] number of replication jobs in state 'ready' or 'in_progress'
      def replication_queue_incomplete_count
        # The result looks like:
        # count
        # -----
        #     1
        sql_result_lines("select count(*) from replication_queue where state = 'ready' or state = 'in_progress';")[2].to_i
      end

      # @return [Integer] number of replication queue locks that are marked as acquired
      def replication_queue_lock_count
        # The result looks like:
        # count
        # -----
        #     1
        sql_result_lines("select count(*) from replication_queue_lock where acquired = 't';")[2].to_i
      end

      # Starts all three Gitaly nodes and Praefect, then waits until every node passes its
      # health check and Praefect can reach both its database and its storages.
      def start_all_nodes
        start_node(@primary_node)
        start_node(@secondary_node)
        start_node(@tertiary_node)
        start_node(@praefect)

        wait_for_health_check_all_nodes
        wait_for_reliable_connection
      end

      # Confirms from the logs that a repository moved from a Gitaly storage to a Praefect
      # virtual storage or to another Gitaly storage. Skipped on GitLab.com.
      #
      # @param source_storage [Hash] must contain :name
      # @param destination_storage [Hash] must contain :name and :type (:praefect or other)
      # @param repo_type [Symbol] :project, :wiki, :group_wiki, :design or :snippet
      def verify_storage_move(source_storage, destination_storage, repo_type: :project)
        return if Specs::Helpers::ContextSelector.dot_com?

        repo_path = verify_storage_move_from_gitaly(source_storage[:name], repo_type: repo_type)

        if destination_storage[:type] == :praefect
          verify_storage_move_to_praefect(repo_path, destination_storage[:name])
        else
          verify_storage_move_to_gitaly(repo_path, destination_storage[:name])
        end
      end

      # Waits until the Praefect container is running, its log shows it is listening on TCP,
      # and the Gitaly check succeeds.
      def wait_for_praefect
        wait_until_container_running(@praefect)

        QA::Runtime::Logger.info('Wait until Praefect starts and is listening')
        wait_until_shell_command_matches(
          "docker exec #{@praefect} bash -c 'cat /var/log/gitlab/praefect/current'",
          /listening at tcp address/
        )

        wait_for_gitaly_check
      end

      # Waits until `praefect sql-ping` reports that the database is reachable.
      def wait_for_sql_ping
        wait_until_shell_command_matches(
          praefect_cli_command('sql-ping'),
          /praefect sql-ping: OK/
        )
      end

      # @param msg [String, nil] the `msg` field of a Praefect JSON log entry
      # @return [Boolean] whether the message is one of the known health-check failures
      def health_check_failure_message?(msg)
        HEALTH_CHECK_FAILURE_MESSAGES.include?(msg)
      end

      def wait_for_no_praefect_storage_error
        # If a healthcheck error was the last message to be logged, we'll keep seeing that message even if it's no longer a problem
        # That is, there's no message shown in the Praefect logs when the healthcheck succeeds
        # To work around that we perform the gitaly check rake task, wait a few seconds, and then we confirm that no healthcheck errors appear

        QA::Runtime::Logger.info("Checking that Praefect does not report healthcheck errors with its gitaly nodes")

        Support::Waiter.wait_until(max_duration: 120) do
          wait_for_gitaly_check

          sleep 5

          shell "docker exec #{@praefect} bash -c 'tail -n 1 /var/log/gitlab/praefect/current'" do |line|
            QA::Runtime::Logger.debug(line.chomp)
            log = JSON.parse(line)

            break true unless health_check_failure_message?(log['msg'])
          rescue JSON::ParserError
            # Ignore lines that can't be parsed as JSON
          end
        end
      end

      # Waits until `praefect dial-nodes` confirms that all three Gitaly storages are served
      # in the virtual storage.
      def wait_for_storage_nodes
        wait_for_no_praefect_storage_error

        Support::Waiter.repeat_until(max_attempts: 3, max_duration: 120, sleep_interval: 1) do
          nodes_confirmed = {
            @primary_node => false,
            @secondary_node => false,
            @tertiary_node => false
          }

          wait_until_shell_command(praefect_cli_command('dial-nodes')) do |line|
            QA::Runtime::Logger.debug(line.chomp)

            nodes_confirmed.each_key do |node|
              nodes_confirmed[node] = true if line =~ /SUCCESS: confirmed Gitaly storage "#{node}" in virtual storages \[#{@virtual_storage}\] is served/
            end

            # Stop reading output once every node has been confirmed
            nodes_confirmed.values.all?
          end
        end
      end

      def wait_for_health_check_all_nodes
        wait_for_health_check(@primary_node)
        wait_for_health_check(@secondary_node)
        wait_for_health_check(@tertiary_node)
      end

      # Waits until the node appears in Praefect's `healthy_storages` table.
      def wait_for_health_check(node)
        QA::Runtime::Logger.info("Waiting for health check on #{node}")
        wait_until_node_is_marked_as_healthy_storage(node)
      end

      def wait_for_primary_node_health_check
        wait_for_health_check(@primary_node)
      end

      def wait_for_secondary_node_health_check
        wait_for_health_check(@secondary_node)
      end

      def wait_for_tertiary_node_health_check
        wait_for_health_check(@tertiary_node)
      end

      # Waits until the node disappears from Praefect's `healthy_storages` table.
      def wait_for_health_check_failure(node)
        QA::Runtime::Logger.info("Waiting for health check failure on #{node}")
        wait_until_node_is_removed_from_healthy_storages(node)
      end

      def wait_for_primary_node_health_check_failure
        wait_for_health_check_failure(@primary_node)
      end

      def wait_for_secondary_node_health_check_failure
        wait_for_health_check_failure(@secondary_node)
      end

      def wait_for_tertiary_node_health_check_failure
        wait_for_health_check_failure(@tertiary_node)
      end

      # Polls `healthy_storages` until no row exists for the node; raises on timeout.
      def wait_until_node_is_removed_from_healthy_storages(node)
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          result = sql_result_lines("SELECT count(*) FROM healthy_storages WHERE storage = '#{node}';")
          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 0
        end
      end

      # Polls `healthy_storages` until exactly one row exists for the node; raises on timeout.
      def wait_until_node_is_marked_as_healthy_storage(node)
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          result = sql_result_lines("SELECT count(*) FROM healthy_storages WHERE storage = '#{node}';")

          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 1
        end
      end

      # Runs `gitlab-rake gitlab:git:fsck` in the GitLab container until its output includes 'Done'.
      def wait_for_gitaly_check
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          wait_until_shell_command("docker exec #{@gitlab} bash -c 'gitlab-rake gitlab:git:fsck'") do |line|
            QA::Runtime::Logger.debug(line.chomp)
            line.include?('Done')
          end
        end
      end

      # Waits until there is an increase in the number of reads for
      # any node compared to the number of reads provided. If a node
      # has no pre-read data, consider it to have had zero reads.
      #
      # @param pre_read_data [Array<Hash>] earlier output of #query_read_distribution
      def wait_for_read_count_change(pre_read_data)
        Support::Waiter.wait_until(sleep_interval: 1, max_duration: 60) do
          query_read_distribution.any? do |data|
            data[:value] > value_for_node(pre_read_data, data[:node])
          end
        end
      end

      # @return [Integer] the :value recorded for +node+ in +data+, or 0 when absent
      def value_for_node(data, node)
        data.find(-> { { value: 0 } }) { |item| item[:node] == node }[:value]
      end

      # Waits until Praefect can reach its database and all of its Gitaly storages.
      def wait_for_reliable_connection
        QA::Runtime::Logger.info('Wait until GitLab and Praefect can communicate reliably')
        wait_for_sql_ping
        wait_for_storage_nodes
      end

      # Waits until the replication queue is drained and the project's replicas match.
      def wait_for_replication(project_id)
        Support::Waiter.wait_until(sleep_interval: 1) { replication_queue_incomplete_count == 0 && replicated?(project_id) }
      end

      # @return [Boolean] whether a 'ready' update job targeting the primary node is queued
      def replication_pending?
        result = sql_result_lines(
          <<~SQL
            select job from replication_queue
            where state = 'ready'
              and job ->> 'change' = 'update'
              and job ->> 'target_node_storage' = '#{@primary_node}';
          SQL
        )

        # The result looks like:
        #
        #  job
        # -----------
        #  {"change": "update", "params": null, "relative_path": "@hashed/4b/22/4b227777d4dd1fc61c6f884f48641d02b4d121d3fd328cb08b5531fcacdabf8a.git", "virtual_storage": "default", "source_node_storage": "gitaly3", "target_node_storage": "gitaly1"}
        # (1 row)
        #  <blank row>
        #
        # Therefore when replication is pending there is at least 1 row of data plus 4 rows of metadata/layout

        result.size >= 5
      end

      # @return [Array<Hash>] the parsed JSON entries printed by
      #   `gitlab-ctl praefect list-untracked-repositories`
      def list_untracked_repositories
        untracked_repositories = []
        shell "docker exec #{@praefect} bash -c 'gitlab-ctl praefect list-untracked-repositories'" do |line|
          # Results look like this
          # The following repositories were found on disk, but missing from the tracking database:
          # {"relative_path":"@hashed/aa/bb.git","storage":"gitaly1","virtual_storage":"default"}
          # {"relative_path":"@hashed/bb/cc.git","storage":"gitaly3","virtual_storage":"default"}

          QA::Runtime::Logger.debug(line.chomp)
          untracked_repositories.append(JSON.parse(line))
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end

        QA::Runtime::Logger.debug("list_untracked_repositories --- #{untracked_repositories}")
        untracked_repositories
      end

      def track_repository_in_praefect(relative_path, storage, virtual_storage)
        cmd = "gitlab-ctl praefect track-repository --repository-relative-path #{relative_path} --authoritative-storage #{storage} --virtual-storage-name #{virtual_storage}"
        shell "docker exec #{@praefect} bash -c '#{cmd}'"
      end

      def remove_tracked_praefect_repository(relative_path, virtual_storage)
        cmd = "gitlab-ctl praefect remove-repository --repository-relative-path #{relative_path} --virtual-storage-name #{virtual_storage}"
        shell "docker exec #{@praefect} bash -c '#{cmd}'"
      end

      # Initializes a repository directly on a Gitaly node's disk, as the `git` user.
      def add_repo_to_disk(node, repo_path)
        cmd = "GIT_DIR=. git init --initial-branch=main /var/opt/gitlab/git-data/repositories/#{repo_path}"
        shell "docker exec --user git #{node} bash -c '#{cmd}'"
      end

      # Deletes the repository directory from all three Gitaly nodes.
      def remove_repo_from_disk(repo_path)
        cmd = "rm -rf /var/opt/gitlab/git-data/repositories/#{repo_path}"
        shell "docker exec #{@primary_node} bash -c '#{cmd}'"
        shell "docker exec #{@secondary_node} bash -c '#{cmd}'"
        shell "docker exec #{@tertiary_node} bash -c '#{cmd}'"
      end

      def remove_repository_from_praefect_database(relative_path)
        shell sql_to_docker_exec_cmd("delete from repositories where relative_path = '#{relative_path}';")
        shell sql_to_docker_exec_cmd("delete from storage_repositories where relative_path = '#{relative_path}';")
      end

      # @return [Boolean] whether both `storage_repositories` and `repositories` contain
      #   at least one row for +relative_path+
      def praefect_database_tracks_repo?(relative_path)
        storage_repositories = sql_result_lines("SELECT count(*) FROM storage_repositories where relative_path='#{relative_path}';")
        QA::Runtime::Logger.debug("storage_repositories count is ---#{storage_repositories}")

        repositories = sql_result_lines("SELECT count(*) FROM repositories where relative_path='#{relative_path}';")
        QA::Runtime::Logger.debug("repositories count is ---#{repositories}")

        (storage_repositories[2].to_i >= 1) && (repositories[2].to_i >= 1)
      end

      # Waits (without raising) for three `storage_repositories` rows, then checks the disk.
      #
      # @return [Boolean] whether the repository directory exists on +node+
      def repository_replicated_to_disk?(node, relative_path)
        Support::Waiter.wait_until(max_duration: 300, sleep_interval: 3, raise_on_failure: false) do
          result = sql_result_lines("SELECT count(*) FROM storage_repositories where relative_path='#{relative_path}';")
          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 3
        end

        repository_exists_on_node_disk?(node, relative_path)
      end

      # @return [Boolean] whether the repository directory exists on +node+
      def repository_exists_on_node_disk?(node, relative_path)
        # `test -d` exits non-zero when the directory is missing, which would make the command raise an error.
        # Falling back to `echo -n` keeps the exit code at zero and prints a marker only in that case;
        # when the directory exists nothing is printed.
        bash_command = "test -d /var/opt/gitlab/git-data/repositories/#{relative_path} || echo -n 'DIR_DOES_NOT_EXIST'"
        result = []
        shell "docker exec #{node} bash -c '#{bash_command}'" do |line|
          result << line
        end
        QA::Runtime::Logger.debug("result is ---#{result}")
        result.exclude?("DIR_DOES_NOT_EXIST")
      end

      private

      # @return [String] the primary node Praefect has recorded for the virtual storage
      def current_primary_node
        result = sql_result_lines("select node_name from shard_primaries where shard_name = '#{@virtual_storage}';")
        # The result looks like:
        #  node_name
        # -----------
        #  gitaly1
        # (1 row)

        result[2].strip
      end

      def dataloss_command
        praefect_cli_command('dataloss')
      end

      # Builds a `docker exec` command that runs the praefect binary with +args+ inside the
      # Praefect container, using its config.toml.
      def praefect_cli_command(args)
        "docker exec #{@praefect} bash -c '/opt/gitlab/embedded/bin/praefect -config /var/opt/gitlab/praefect/config.toml #{args}'"
      end

      def sql_to_docker_exec_cmd(sql)
        Service::Shellout.sql_to_docker_exec_cmd(sql, 'postgres', 'SQL_PASSWORD', 'praefect_production', 'postgres.test', @postgres)
      end

      # Runs +sql+ against the Praefect database and collects every output line, header and
      # separator included. Callers index [2] for the first data row, matching the sample
      # outputs documented at each call site.
      #
      # @return [Array<String>]
      def sql_result_lines(sql)
        lines = []
        shell sql_to_docker_exec_cmd(sql) do |line|
          lines << line
        end
        lines
      end

      # Polls `docker inspect` every 3 until the container reports it is running, giving up
      # after max_duration 180 (presumably seconds — see wait_until_shell_command_matches).
      def wait_until_container_running(name)
        wait_until_shell_command_matches(
          "docker inspect -f {{.State.Running}} #{name}",
          /true/,
          sleep_interval: 3,
          max_duration: 180,
          retry_on_exception: true
        )
      end

      # Scans recent Gitaly log entries for a Rename/RemoveRepository call on +storage+ whose
      # path matches +repo_type+, and returns that repository path.
      # Note: the `repo_type:` keyword shadows the #repo_type helper as a local, but
      # `repo_type(...)` with arguments still dispatches to the method.
      def verify_storage_move_from_gitaly(storage, repo_type: :project)
        wait_until_shell_command("docker exec #{@gitlab} bash -c 'tail -n 50 /var/log/gitlab/gitaly/current'") do |line|
          log = JSON.parse(line)

          if (log['grpc.method'] == 'RenameRepository' || log['grpc.method'] == 'RemoveRepository') &&
              log['grpc.request.repoStorage'] == storage &&
              repo_type(log['grpc.request.repoPath']) == repo_type
            break log['grpc.request.repoPath']
          end
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def verify_storage_move_to_praefect(repo_path, virtual_storage)
        wait_until_shell_command("docker exec #{@praefect} bash -c 'tail -n 50 /var/log/gitlab/praefect/current'") do |line|
          log = JSON.parse(line)

          log['grpc.method'] == 'ReplicateRepository' && log['virtual_storage'] == virtual_storage && log['relative_path'] == repo_path
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def verify_storage_move_to_gitaly(repo_path, storage)
        wait_until_shell_command("docker exec #{@gitlab} bash -c 'tail -n 50 /var/log/gitlab/gitaly/current'") do |line|
          log = JSON.parse(line)

          log['grpc.method'] == 'ReplicateRepository' && log['grpc.request.repoStorage'] == storage && log['grpc.request.repoPath'] == repo_path
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def with_praefect_log(**kwargs)
        wait_until_shell_command("docker exec #{@praefect} bash -c 'tail -n 1 /var/log/gitlab/praefect/current'", **kwargs) do |line|
          QA::Runtime::Logger.debug(line.chomp)
          yield JSON.parse(line)
        end
      end

      # Classifies a repository path by its naming convention.
      #
      # @return [Symbol] :snippet, :design, :group_wiki, :wiki or :project
      def repo_type(repo_path)
        return :snippet if repo_path.start_with?('@snippets')
        return :design if repo_path.end_with?('.design.git')

        if repo_path.end_with?('.wiki.git')
          return repo_path.start_with?('@groups') ? :group_wiki : :wiki
        end

        :project
      end
    end
  end
end