# frozen_string_literal: true

module QA
  module Service
    class PraefectManager
      include Service::Shellout

      attr_accessor :gitlab

      PrometheusQueryError = Class.new(StandardError)

      def initialize
        @gitlab = 'gitlab-gitaly-cluster'
        @praefect = 'praefect'
        @postgres = 'postgres'
        @primary_node = 'gitaly1'
        @secondary_node = 'gitaly2'
        @tertiary_node = 'gitaly3'
        @virtual_storage = 'default'
      end

      attr_reader :primary_node, :secondary_node, :tertiary_node

      # Executes the praefect `dataloss` command.
      #
      # @return [Boolean] whether dataloss has occurred
      def dataloss?
        wait_until_shell_command_matches(dataloss_command, /Outdated repositories/)
      end

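      # Checks whether the project's repository has been fully replicated by
      # comparing the checksums that the `gitlab:praefect:replicas` rake task
      # reports for each Gitaly node.
      #
      # @param [Integer] project_id the ID of the project to check
      # @return [Boolean] whether all nodes report the same checksum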
      def replicated?(project_id)
        Support::Retrier.retry_until(raise_on_failure: false) do
          replicas = wait_until_shell_command(%(docker exec #{@gitlab} bash -c 'gitlab-rake "gitlab:praefect:replicas[#{project_id}]"')) do |line|
            QA::Runtime::Logger.debug(line.chomp)
            # The output of the rake task looks something like this:
            #
            # Project name                    | gitaly1 (primary)                        | gitaly2                                  | gitaly3
            # ----------------------------------------------------------------------------------------------------------------------------------------------------------------
            # gitaly_cluster-3aff1f2bd14e6c98 | 23c4422629234d62b62adacafd0a33a8364e8619 | 23c4422629234d62b62adacafd0a33a8364e8619 | 23c4422629234d62b62adacafd0a33a8364e8619
            #
            break line if line.start_with?('gitaly_cluster')
            break nil if line.include?('Something went wrong when getting replicas')
          end
          next false unless replicas

          # We want to know if the checksums are identical
          replicas&.split('|')&.map(&:strip)&.slice(1..3)&.uniq&.one?
        end
      end

      def stop_primary_node
        stop_node(@primary_node)
      end

      def start_primary_node
        start_node(@primary_node)
      end

      def start_praefect
        start_node(@praefect)
        wait_for_praefect
      end

      def stop_praefect
        stop_node(@praefect)
      end

      def stop_secondary_node
        stop_node(@secondary_node)
      end

      def start_secondary_node
        start_node(@secondary_node)
      end

      def stop_tertiary_node
        stop_node(@tertiary_node)
      end

      def start_tertiary_node
        start_node(@tertiary_node)
      end

      def start_node(name)
        shell "docker start #{name}"
        wait_until_shell_command_matches(
          "docker inspect -f {{.State.Running}} #{name}",
          /true/,
          sleep_interval: 3,
          max_duration: 180,
          retry_on_exception: true
        )
      end

      def stop_node(name)
        shell "docker stop #{name}"
      end

      def clear_replication_queue
        QA::Runtime::Logger.info("Clearing the replication queue")
        shell sql_to_docker_exec_cmd(
          <<~SQL
            delete from replication_queue_job_lock;
            delete from replication_queue_lock;
            delete from replication_queue;
          SQL
        )
      end

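      # Simulates a stalled replication queue by marking all queued jobs as
      # `in_progress` and acquiring their locks, so no further replication can
      # proceed until the queue is cleared.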
      def create_stalled_replication_queue
        QA::Runtime::Logger.info("Setting jobs in replication queue to `in_progress` and acquiring locks")
        shell sql_to_docker_exec_cmd(
          <<~SQL
            update replication_queue set state = 'in_progress';
            insert into replication_queue_job_lock (job_id, lock_id, triggered_at)
              select id, rq.lock_id, created_at from replication_queue rq
                left join replication_queue_job_lock rqjl on rq.id = rqjl.job_id
                where state = 'in_progress' and rqjl.job_id is null;
            update replication_queue_lock set acquired = 't';
          SQL
        )
      end

      # Reconciles the previous primary node with the current one
      # I.e., it brings the previous primary node up-to-date
      def reconcile_nodes
        reconcile_node_with_node(@primary_node, current_primary_node)
      end

      def reconcile_node_with_node(target, reference)
        QA::Runtime::Logger.info("Reconcile #{target} with #{reference} on #{@virtual_storage}")
        wait_until_shell_command_matches(
          "docker exec #{@praefect} bash -c '/opt/gitlab/embedded/bin/praefect -config /var/opt/gitlab/praefect/config.toml reconcile -virtual #{@virtual_storage} -target #{target} -reference #{reference} -f'",
          /FINISHED: \d+ repos were checked for consistency/,
          sleep_interval: 5,
          retry_on_exception: true
        )
      end

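      # Queries Prometheus (via the GitLab container) for the
      # `gitaly_praefect_read_distribution` metric.
      #
      # @return [Array<Hash>] one hash per node, e.g. `{ node: 'gitaly1', value: 3 }`
      # @raise [PrometheusQueryError] if the query does not return a success status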
      def query_read_distribution
        output = shell "docker exec #{@gitlab} bash -c 'curl -s http://localhost:9090/api/v1/query?query=gitaly_praefect_read_distribution'" do |line|
          QA::Runtime::Logger.debug(line)
          break line
        end
        result = JSON.parse(output)

        raise PrometheusQueryError, "Unable to query read distribution metrics" unless result['status'] == 'success'

        result['data']['result'].map { |metric| { node: metric['metric']['storage'], value: metric['value'][1].to_i } }
      end

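      # Counts replication jobs that have not yet completed (state `ready` or
      # `in_progress`) by querying the Praefect database directly.
      #
      # @return [Integer] the number of incomplete replication jobs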
      def replication_queue_incomplete_count
        result = []
        shell sql_to_docker_exec_cmd("select count(*) from replication_queue where state = 'ready' or state = 'in_progress';") do |line|
          result << line
        end
        # The result looks like:
        #   count
        #   -----
        #       1
        result[2].to_i
      end

      def replication_queue_lock_count
        result = []
        shell sql_to_docker_exec_cmd("select count(*) from replication_queue_lock where acquired = 't';") do |line|
          result << line
        end
        # The result looks like:
        #   count
        #   -----
        #       1
        result[2].to_i
      end

      def start_all_nodes
        start_node(@primary_node)
        start_node(@secondary_node)
        start_node(@tertiary_node)
        start_node(@praefect)

        wait_for_health_check_all_nodes
        wait_for_reliable_connection
      end

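      # Confirms from the Gitaly and Praefect logs that a repository was moved
      # from the source storage to the destination storage. Skipped when
      # running against GitLab.com.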
      def verify_storage_move(source_storage, destination_storage, repo_type: :project)
        return if Specs::Helpers::ContextSelector.dot_com?

        repo_path = verify_storage_move_from_gitaly(source_storage[:name], repo_type: repo_type)

        destination_storage[:type] == :praefect ? verify_storage_move_to_praefect(repo_path, destination_storage[:name]) : verify_storage_move_to_gitaly(repo_path, destination_storage[:name])
      end

      def wait_for_praefect
        wait_until_shell_command_matches(
          "docker inspect -f {{.State.Running}} #{@praefect}",
          /true/,
          sleep_interval: 3,
          max_duration: 180,
          retry_on_exception: true
        )

        QA::Runtime::Logger.info('Wait until Praefect starts and is listening')
        wait_until_shell_command_matches(
          "docker exec #{@praefect} bash -c 'cat /var/log/gitlab/praefect/current'",
          /listening at tcp address/
        )

        wait_for_gitaly_check
      end

      def wait_for_sql_ping
        wait_until_shell_command_matches(
          "docker exec #{@praefect} bash -c '/opt/gitlab/embedded/bin/praefect -config /var/opt/gitlab/praefect/config.toml sql-ping'",
          /praefect sql-ping: OK/
        )
      end

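      # @param [String] msg a `msg` field from a Praefect log entry
      # @return [Boolean] whether the message indicates a failed health check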
      def health_check_failure_message?(msg)
        ['error when pinging healthcheck', 'failed checking node health'].include?(msg)
      end

      def wait_for_no_praefect_storage_error
        # If a healthcheck error was the last message to be logged, we'll keep seeing that message
        # even after the problem is resolved, because nothing is logged when a healthcheck succeeds.
        # To work around that, we run the gitaly check rake task, wait a few seconds, and then
        # confirm that no healthcheck errors appear.

        QA::Runtime::Logger.info("Checking that Praefect does not report healthcheck errors with its gitaly nodes")

        Support::Waiter.wait_until(max_duration: 120) do
          wait_for_gitaly_check

          sleep 5

          shell "docker exec #{@praefect} bash -c 'tail -n 1 /var/log/gitlab/praefect/current'" do |line|
            QA::Runtime::Logger.debug(line.chomp)
            log = JSON.parse(line)

            break true unless health_check_failure_message?(log['msg'])
          rescue JSON::ParserError
            # Ignore lines that can't be parsed as JSON
          end
        end
      end

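      # Waits until Praefect's `dial-nodes` subcommand confirms that all three
      # Gitaly storages are served in the virtual storage.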
      def wait_for_storage_nodes
        wait_for_no_praefect_storage_error

        Support::Waiter.repeat_until(max_attempts: 3, max_duration: 120, sleep_interval: 1) do
          nodes_confirmed = {
            @primary_node => false,
            @secondary_node => false,
            @tertiary_node => false
          }

          wait_until_shell_command("docker exec #{@praefect} bash -c '/opt/gitlab/embedded/bin/praefect -config /var/opt/gitlab/praefect/config.toml dial-nodes'") do |line|
            QA::Runtime::Logger.debug(line.chomp)

            nodes_confirmed.each_key do |node|
              nodes_confirmed[node] = true if line =~ /SUCCESS: confirmed Gitaly storage "#{node}" in virtual storages \[#{@virtual_storage}\] is served/
            end

            nodes_confirmed.values.all?
          end
        end
      end

      def wait_for_health_check_all_nodes
        wait_for_health_check(@primary_node)
        wait_for_health_check(@secondary_node)
        wait_for_health_check(@tertiary_node)
      end

      def wait_for_health_check(node)
        QA::Runtime::Logger.info("Waiting for health check on #{node}")
        wait_until_node_is_marked_as_healthy_storage(node)
      end

      def wait_for_primary_node_health_check
        wait_for_health_check(@primary_node)
      end

      def wait_for_secondary_node_health_check
        wait_for_health_check(@secondary_node)
      end

      def wait_for_tertiary_node_health_check
        wait_for_health_check(@tertiary_node)
      end

      def wait_for_health_check_failure(node)
        QA::Runtime::Logger.info("Waiting for health check failure on #{node}")
        wait_until_node_is_removed_from_healthy_storages(node)
      end

      def wait_for_primary_node_health_check_failure
        wait_for_health_check_failure(@primary_node)
      end

      def wait_for_secondary_node_health_check_failure
        wait_for_health_check_failure(@secondary_node)
      end

      def wait_for_tertiary_node_health_check_failure
        wait_for_health_check_failure(@tertiary_node)
      end

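      # Waits until the given node no longer appears in the `healthy_storages`
      # view of the Praefect database.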
      def wait_until_node_is_removed_from_healthy_storages(node)
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          result = []
          shell sql_to_docker_exec_cmd("SELECT count(*) FROM healthy_storages WHERE storage = '#{node}';") do |line|
            result << line
          end
          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 0
        end
      end

      def wait_until_node_is_marked_as_healthy_storage(node)
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          result = []
          shell sql_to_docker_exec_cmd("SELECT count(*) FROM healthy_storages WHERE storage = '#{node}';") do |line|
            result << line
          end

          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 1
        end
      end

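      # Runs the `gitlab:git:fsck` rake task until it completes, which requires
      # the Gitaly nodes to be reachable from GitLab.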
      def wait_for_gitaly_check
        Support::Waiter.wait_until(max_duration: 120, sleep_interval: 1, raise_on_failure: true) do
          wait_until_shell_command("docker exec #{@gitlab} bash -c 'gitlab-rake gitlab:git:fsck'") do |line|
            QA::Runtime::Logger.debug(line.chomp)
            line.include?('Done')
          end
        end
      end

      # Waits until the number of reads for any node has increased compared to
      # the number of reads provided. If a node has no pre-read data, it is
      # considered to have had zero reads.
      def wait_for_read_count_change(pre_read_data)
        diff_found = false
        Support::Waiter.wait_until(sleep_interval: 1, max_duration: 60) do
          query_read_distribution.each do |data|
            diff_found = true if data[:value] > value_for_node(pre_read_data, data[:node])
          end
          diff_found
        end
      end

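      # Looks up the read count recorded for a node in the given distribution
      # data, defaulting to zero if the node has no entry.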
      def value_for_node(data, node)
        data.find(-> { { value: 0 } }) { |item| item[:node] == node }[:value]
      end

      def wait_for_reliable_connection
        QA::Runtime::Logger.info('Wait until GitLab and Praefect can communicate reliably')
        wait_for_sql_ping
        wait_for_storage_nodes
      end

      def wait_for_replication(project_id)
        Support::Waiter.wait_until(sleep_interval: 1) { replication_queue_incomplete_count == 0 && replicated?(project_id) }
      end

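      # Checks the Praefect database for `ready` replication jobs that target
      # the primary node.
      #
      # @return [Boolean] whether any such jobs are queued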
      def replication_pending?
        result = []
        shell sql_to_docker_exec_cmd(
          <<~SQL
            select job from replication_queue
            where state = 'ready'
              and job ->> 'change' = 'update'
              and job ->> 'target_node_storage' = '#{@primary_node}';
          SQL
        ) do |line|
          result << line
        end

        # The result looks like:
        #
        #  job
        #  -----------
        #   {"change": "update", "params": null, "relative_path": "@hashed/4b/22/4b227777d4dd1fc61c6f884f48641d02b4d121d3fd328cb08b5531fcacdabf8a.git", "virtual_storage": "default", "source_node_storage": "gitaly3", "target_node_storage": "gitaly1"}
        #  (1 row)
        #  <blank row>
        #
        # Therefore, when replication is pending there is at least 1 row of data plus 4 rows of metadata/layout.

        result.size >= 5
      end

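      # Runs `gitlab-ctl praefect list-untracked-repositories` and parses the
      # JSON lines it prints.
      #
      # @return [Array<Hash>] one hash per untracked repository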
      def list_untracked_repositories
        untracked_repositories = []
        shell "docker exec #{@praefect} bash -c 'gitlab-ctl praefect list-untracked-repositories'" do |line|
          # Results look like this:
          #   The following repositories were found on disk, but missing from the tracking database:
          #   {"relative_path":"@hashed/aa/bb.git","storage":"gitaly1","virtual_storage":"default"}
          #   {"relative_path":"@hashed/bb/cc.git","storage":"gitaly3","virtual_storage":"default"}

          QA::Runtime::Logger.debug(line.chomp)
          untracked_repositories.append(JSON.parse(line))
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end

        QA::Runtime::Logger.debug("list_untracked_repositories --- #{untracked_repositories}")
        untracked_repositories
      end

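      # Adds an on-disk repository to the Praefect tracking database via
      # `gitlab-ctl praefect track-repository`.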
      def track_repository_in_praefect(relative_path, storage, virtual_storage)
        cmd = "gitlab-ctl praefect track-repository --repository-relative-path #{relative_path} --authoritative-storage #{storage} --virtual-storage-name #{virtual_storage}"
        shell "docker exec #{@praefect} bash -c '#{cmd}'"
      end

      def remove_tracked_praefect_repository(relative_path, virtual_storage)
        cmd = "gitlab-ctl praefect remove-repository --repository-relative-path #{relative_path} --virtual-storage-name #{virtual_storage}"
        shell "docker exec #{@praefect} bash -c '#{cmd}'"
      end

      def add_repo_to_disk(node, repo_path)
        cmd = "GIT_DIR=. git init --initial-branch=main /var/opt/gitlab/git-data/repositories/#{repo_path}"
        shell "docker exec --user git #{node} bash -c '#{cmd}'"
      end

      def remove_repo_from_disk(repo_path)
        cmd = "rm -rf /var/opt/gitlab/git-data/repositories/#{repo_path}"
        shell "docker exec #{@primary_node} bash -c '#{cmd}'"
        shell "docker exec #{@secondary_node} bash -c '#{cmd}'"
        shell "docker exec #{@tertiary_node} bash -c '#{cmd}'"
      end

      def remove_repository_from_praefect_database(relative_path)
        shell sql_to_docker_exec_cmd("delete from repositories where relative_path = '#{relative_path}';")
        shell sql_to_docker_exec_cmd("delete from storage_repositories where relative_path = '#{relative_path}';")
      end

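      # @return [Boolean] whether the repository appears in both the
      #   `repositories` and `storage_repositories` tables of the Praefect
      #   database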
      def praefect_database_tracks_repo?(relative_path)
        storage_repositories = []
        shell sql_to_docker_exec_cmd("SELECT count(*) FROM storage_repositories where relative_path='#{relative_path}';") do |line|
          storage_repositories << line
        end
        QA::Runtime::Logger.debug("storage_repositories count is ---#{storage_repositories}")

        repositories = []
        shell sql_to_docker_exec_cmd("SELECT count(*) FROM repositories where relative_path='#{relative_path}';") do |line|
          repositories << line
        end
        QA::Runtime::Logger.debug("repositories count is ---#{repositories}")

        (storage_repositories[2].to_i >= 1) && (repositories[2].to_i >= 1)
      end

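      # Waits until the Praefect database records the repository on all three
      # storages, then checks that it actually exists on the given node's disk.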
      def repository_replicated_to_disk?(node, relative_path)
        Support::Waiter.wait_until(max_duration: 300, sleep_interval: 3, raise_on_failure: false) do
          result = []
          shell sql_to_docker_exec_cmd("SELECT count(*) FROM storage_repositories where relative_path='#{relative_path}';") do |line|
            result << line
          end
          QA::Runtime::Logger.debug("result is ---#{result}")
          result[2].to_i == 3
        end

        repository_exists_on_node_disk?(node, relative_path)
      end

      def repository_exists_on_node_disk?(node, relative_path)
        # If the directory does not exist, `test -d` exits with a non-zero status, which would cause an error to be raised.
        # Instead, we echo a sentinel string when the directory is missing, so the command always exits with status zero;
        # if the sentinel is absent from the output, the directory exists.
        bash_command = "test -d /var/opt/gitlab/git-data/repositories/#{relative_path} || echo -n 'DIR_DOES_NOT_EXIST'"
        result = []
        shell "docker exec #{node} bash -c '#{bash_command}'" do |line|
          result << line
        end
        QA::Runtime::Logger.debug("result is ---#{result}")
        result.exclude?("DIR_DOES_NOT_EXIST")
      end

      private

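      # Reads the current primary node for the virtual storage from the
      # `shard_primaries` table of the Praefect database.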
      def current_primary_node
        result = []
        shell sql_to_docker_exec_cmd("select node_name from shard_primaries where shard_name = '#{@virtual_storage}';") do |line|
          result << line
        end
        # The result looks like:
        #  node_name
        #  -----------
        #   gitaly1
        #  (1 row)

        result[2].strip
      end

      def dataloss_command
        "docker exec #{@praefect} bash -c '/opt/gitlab/embedded/bin/praefect -config /var/opt/gitlab/praefect/config.toml dataloss'"
      end

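      # Wraps the given SQL in a `docker exec` command that runs it against the
      # `praefect_production` database on the postgres container.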
      def sql_to_docker_exec_cmd(sql)
        Service::Shellout.sql_to_docker_exec_cmd(sql, 'postgres', 'SQL_PASSWORD', 'praefect_production', 'postgres.test', @postgres)
      end

      def verify_storage_move_from_gitaly(storage, repo_type: :project)
        wait_until_shell_command("docker exec #{@gitlab} bash -c 'tail -n 50 /var/log/gitlab/gitaly/current'") do |line|
          log = JSON.parse(line)

          if (log['grpc.method'] == 'RenameRepository' || log['grpc.method'] == 'RemoveRepository') &&
              log['grpc.request.repoStorage'] == storage &&
              repo_type(log['grpc.request.repoPath']) == repo_type
            break log['grpc.request.repoPath']
          end
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def verify_storage_move_to_praefect(repo_path, virtual_storage)
        wait_until_shell_command("docker exec #{@praefect} bash -c 'tail -n 50 /var/log/gitlab/praefect/current'") do |line|
          log = JSON.parse(line)

          log['grpc.method'] == 'ReplicateRepository' && log['virtual_storage'] == virtual_storage && log['relative_path'] == repo_path
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def verify_storage_move_to_gitaly(repo_path, storage)
        wait_until_shell_command("docker exec #{@gitlab} bash -c 'tail -n 50 /var/log/gitlab/gitaly/current'") do |line|
          log = JSON.parse(line)

          log['grpc.method'] == 'ReplicateRepository' && log['grpc.request.repoStorage'] == storage && log['grpc.request.repoPath'] == repo_path
        rescue JSON::ParserError
          # Ignore lines that can't be parsed as JSON
        end
      end

      def with_praefect_log(**kwargs)
        wait_until_shell_command("docker exec #{@praefect} bash -c 'tail -n 1 /var/log/gitlab/praefect/current'", **kwargs) do |line|
          QA::Runtime::Logger.debug(line.chomp)
          yield JSON.parse(line)
        end
      end

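      # Infers the repository type (:snippet, :design, :wiki, :group_wiki, or
      # :project) from its relative path on disk.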
      def repo_type(repo_path)
        return :snippet if repo_path.start_with?('@snippets')
        return :design if repo_path.end_with?('.design.git')

        if repo_path.end_with?('.wiki.git')
          return repo_path.start_with?('@groups') ? :group_wiki : :wiki
        end

        :project
      end
    end
  end
end