1package datastore
2
3import (
4	"context"
5	"database/sql"
6	"database/sql/driver"
7	"encoding/json"
8	"errors"
9	"fmt"
10	"time"
11
12	"github.com/lib/pq"
13	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
14)
15
// ReplicationEventQueue allows putting new events into the persistent queue and retrieving them back.
type ReplicationEventQueue interface {
	// Enqueue puts provided event into the persistent queue and returns the event as it was stored.
	Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error)
	// Dequeue retrieves events from the persistent queue using provided limitations and filters.
	Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error)
	// Acknowledge updates previously dequeued events with the new state and releases resources acquired for them.
	// It updates events that are in 'in_progress' state to the state that is passed in.
	// If the new state is 'completed', it also acknowledges similar events (scheduled for the same repository with
	// the same change from the same source) that are in 'ready' state and were created before the target event was
	// dequeued for processing. For any other new state similar events are left unchanged.
	// It returns the subset of passed in ids that were updated.
	Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error)
	// StartHealthUpdate starts periodical update of the event's health identifier.
	// The events with fresh health identifier won't be considered as stale.
	// The health update will be executed on each new entry received from trigger channel passed in.
	// It is a blocking call that is managed by the passed in context.
	StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error
	// AcknowledgeStale moves replication events that are in 'in_progress' state for too long (more than staleAfter)
	// into the next state:
	//   'failed' - in case it has more attempts to be executed
	//   'dead' - in case it has no more attempts to be executed
	AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error
}
40
41func allowToAck(state JobState) error {
42	switch state {
43	case JobStateCompleted, JobStateFailed, JobStateCancelled, JobStateDead:
44		return nil
45	default:
46		return fmt.Errorf("event state is not supported: %q", state)
47	}
48}
49
// ReplicationJob is a persistent representation of the replication job.
// It is stored as a JSON document in the `job` column of the `replication_queue` table
// (see Value and Scan); the JSON field names are fixed by the struct tags below.
type ReplicationJob struct {
	// RepositoryID is the ID of the repository this job relates to. RepositoryID
	// may be 0 if the job doesn't relate to any known repository. This can happen
	// for example when the job is deleting an orphaned replica of a deleted repository.
	RepositoryID      int64      `json:"repository_id"`
	// Change is the kind of replication operation. Dequeue and Acknowledge compare it
	// (e.g. 'delete_replica' events keep their attempt count on Dequeue).
	Change            ChangeType `json:"change"`
	// RelativePath is the repository's relative path; it is the last component of the <lock> id.
	RelativePath      string     `json:"relative_path"`
	// TargetNodeStorage is the target storage of the job; it is the middle component of the <lock> id.
	TargetNodeStorage string     `json:"target_node_storage"`
	// SourceNodeStorage is the source storage of the job. It may be empty for changes
	// without a source (the Acknowledge query treats 'gc' that way).
	SourceNodeStorage string     `json:"source_node_storage"`
	// VirtualStorage is the virtual storage name; it is the first component of the <lock> id.
	VirtualStorage    string     `json:"virtual_storage"`
	// Params holds additional job parameters (contents depend on Change — TODO confirm per change type).
	Params            Params     `json:"params"`
}
63
64func (job *ReplicationJob) Scan(value interface{}) error {
65	if value == nil {
66		return nil
67	}
68
69	d, ok := value.([]byte)
70	if !ok {
71		return fmt.Errorf("unexpected type received: %T", value)
72	}
73
74	return json.Unmarshal(d, job)
75}
76
77func (job ReplicationJob) Value() (driver.Value, error) {
78	data, err := json.Marshal(job)
79	if err != nil {
80		return nil, err
81	}
82	return string(data), nil
83}
84
// ReplicationEvent is a persistent representation of the replication event.
// It mirrors a row of the `replication_queue` table; see Mapping for the column names.
type ReplicationEvent struct {
	// ID is the identifier of the event row (`id` column).
	ID        uint64
	// State is the current state of the event: 'ready' when enqueued, 'in_progress' once dequeued.
	State     JobState
	// Attempt is the number of remaining attempts. Dequeue decrements it, except for 'delete_replica' jobs.
	Attempt   int
	// LockID is the <lock> id from `replication_queue_lock`: virtual storage, target storage and
	// relative path joined with '|'.
	LockID    string
	// CreatedAt is the moment the event was enqueued.
	CreatedAt time.Time
	// UpdatedAt is nil while the `updated_at` column is NULL, which is the case until the event
	// is first dequeued or acknowledged.
	UpdatedAt *time.Time
	// Job is the replication job carried by the event (`job` JSONB column).
	Job       ReplicationJob
	// Meta holds meta information about the event, such as the correlation ID.
	Meta      Params
}
96
97// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases.
98func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) {
99	var mapping []interface{}
100	for _, column := range columns {
101		switch column {
102		case "id":
103			mapping = append(mapping, &event.ID)
104		case "state":
105			mapping = append(mapping, &event.State)
106		case "created_at":
107			mapping = append(mapping, &event.CreatedAt)
108		case "updated_at":
109			mapping = append(mapping, &event.UpdatedAt)
110		case "attempt":
111			mapping = append(mapping, &event.Attempt)
112		case "lock_id":
113			mapping = append(mapping, &event.LockID)
114		case "job":
115			mapping = append(mapping, &event.Job)
116		case "meta":
117			mapping = append(mapping, &event.Meta)
118		default:
119			return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column)
120		}
121	}
122	return mapping, nil
123}
124
125// Scan fills receive fields with values fetched from database based on the set of columns/column aliases.
126func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error {
127	mappings, err := event.Mapping(columns)
128	if err != nil {
129		return err
130	}
131	return rows.Scan(mappings...)
132}
133
134// scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases.
135func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) {
136	columns, err := rows.Columns()
137	if err != nil {
138		return events, err
139	}
140
141	defer func() {
142		if cErr := rows.Close(); cErr != nil && err == nil {
143			err = cErr
144		}
145	}()
146
147	for rows.Next() {
148		var event ReplicationEvent
149		if err = event.Scan(columns, rows); err != nil {
150			return events, err
151		}
152		events = append(events, event)
153	}
154
155	return events, rows.Err()
156}
157
// interface implementation protection: fails compilation if the PostgresReplicationEventQueue
// value type (not only its pointer) stops implementing ReplicationEventQueue.
var _ ReplicationEventQueue = PostgresReplicationEventQueue{}
160
161// NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage.
162func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue {
163	return PostgresReplicationEventQueue{qc: qc}
164}
165
// PostgresReplicationEventQueue is a Postgres implementation of persistent queue.
type PostgresReplicationEventQueue struct {
	// The main requirements for the queue implementation are:
	//  - it should not use long transactions
	//  - it should perform without problems if PgBouncer is used in between with `pool_mode = transaction`
	//  - it should perform concurrently with other queue implementations (support of horizontal scaling)
	//  - it should handle events sequentially starting with the oldest
	//  - it should handle events concurrently for multiple repositories
	//  - it should support retries
	//
	// The current implementation uses the following tables to mimic the queue:
	//  - replication_queue_lock
	//  - replication_queue
	//  - replication_queue_job_lock
	//
	// `replication_queue_lock` holds repository level locks to synchronize multiple Praefect instances
	// working on the same queue (shared database). Only one worker is allowed to operate on a given repository at a time.
	// The `id` column is a concatenated string of the virtual storage name, gitaly node,
	// and repository relative path: virtual1|node1|/path/to/project.git with `|` as a delimiter (represented as <lock>
	// elsewhere in this doc).
	// The `acquired` column reflects whether the lock for this repository (qualified by `id` column) is taken.
	//
	// `replication_queue` stores the actual replication event. The job is stored as a JSONB value in the `job` column of
	// the table. The table also has a `meta` column designed to store meta information such as `correlation_id` etc.
	// Each event has a value in the `lock_id` column that references the corresponding <lock> in the
	// `replication_queue_lock` table. Each replication event is created with the following defaults:
	//  - attempt: 3
	//  - state: `ready`
	//  - created_at: UTC timestamp
	//  - updated_at: NULL
	//
	// `replication_queue_job_lock` holds event specific locks to prevent multiple queue workers from operating on the same
	// event and tracks the events that are protected by the <lock>.
	//
	// The mechanics of how the queue works are described in the `Enqueue`, `Dequeue` and `Acknowledge` methods.
	//
	// qc is used to execute every query issued by the queue.
	qc glsql.Querier
}
203
204func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) {
205	// When `Enqueue` method is called:
206	//  1. Insertion of the new record into `replication_queue_lock` table, so we are ensured all events have
207	//     a corresponding <lock>. If a record already exists it won't be inserted again.
208	//  2. Insertion of the new record into the `replication_queue` table with the defaults listed above,
209	//     the job, the meta and corresponding <lock> used in `replication_queue_lock` table for the `lock_id` column.
210
211	query := `
212		WITH insert_lock AS (
213			INSERT INTO replication_queue_lock(id)
214			VALUES ($1 || '|' || $2 || '|' || $3)
215			ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id
216			RETURNING id
217		)
218		INSERT INTO replication_queue(lock_id, job, meta)
219		SELECT insert_lock.id, $4, $5
220		FROM insert_lock
221		RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta`
222	// this will always return a single row result (because of lock uniqueness) or an error
223	rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta)
224	if err != nil {
225		return ReplicationEvent{}, fmt.Errorf("query: %w", err)
226	}
227
228	events, err := scanReplicationEvents(rows)
229	if err != nil {
230		return ReplicationEvent{}, fmt.Errorf("scan: %w", err)
231	}
232
233	return events[0], nil
234}
235
236func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) {
237	// When `Dequeue` method is called:
238	//  1. Events with attempts left that are either in `ready` or `failed` state are candidates for dequeuing.
239	//     Events already being processed by another worker are filtered out by checking if the event is already locked
240	//     in the `replication_queue_job_lock` table.
241	//  2. Events for repositories that are already locked by another Praefect instance are filtered out.
242	//     Repository locks are stored in the `replication_queue_lock` table.
243	//  3. The events that still remain after filtering are dequeued. On dequeuing:
244	//      - The event's attempts are decremented by 1.
245	//      - The event's state is set to `in_progress`
246	//      - The event's `updated_at` is set to current time in UTC.
247	//  4. For each event retrieved from the step above a new record would be created in
248	//     `replication_queue_job_lock` table. Rows in this table allows us to track events that were fetched for processing
249	//     and relation of them with the locks in the `replication_queue_lock` table. The reason we need it is because
250	//     multiple events can be fetched for the same repository (more details on it in `Acknowledge` below).
251	//  5. Update the corresponding <lock> in `replication_queue_lock` table and column `acquired` is assigned with
252	//     `TRUE` value to signal that this <lock> is busy and can't be used to fetch events (step 2.).
253	//
254	//  As a special case, 'delete_replica' type events have unlimited attempts. This is to ensure we never partially apply the job
255	//  by deleting the repository from the disk but leaving it still present in the database. Praefect would then see that there still
256	//  is a replica on the storage, when there is none in fact. That could cause us to delete all replicas of a repository.
257
258	query := `
259		WITH lock AS (
260			SELECT id
261			FROM replication_queue_lock
262			WHERE id LIKE ($1 || '|' || $2 || '|%') AND NOT acquired
263			FOR UPDATE SKIP LOCKED
264		)
265		, candidate AS (
266			SELECT id
267			FROM replication_queue
268			WHERE id IN (
269				SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change'  ORDER BY queue.created_at)
270				FROM replication_queue AS queue
271				JOIN lock ON queue.lock_id = lock.id
272				WHERE queue.state IN ('ready', 'failed' )
273					AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = queue.lock_id)
274			)
275			ORDER BY created_at
276			LIMIT $3
277			FOR UPDATE
278		)
279		, job AS (
280			UPDATE replication_queue AS queue
281			SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END
282				, state = 'in_progress'
283				, updated_at = NOW() AT TIME ZONE 'UTC'
284			FROM candidate
285			WHERE queue.id = candidate.id
286			RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta
287		)
288		, track_job_lock AS (
289			INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at)
290			SELECT job.id, job.lock_id, NOW() AT TIME ZONE 'UTC'
291			FROM job
292			RETURNING lock_id
293		)
294		, acquire_lock AS (
295			UPDATE replication_queue_lock AS lock
296			SET acquired = TRUE
297			FROM track_job_lock AS tracked
298			WHERE lock.id = tracked.lock_id
299		)
300		SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta
301		FROM job
302		ORDER BY id`
303	rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count)
304	if err != nil {
305		return nil, fmt.Errorf("query: %w", err)
306	}
307
308	res, err := scanReplicationEvents(rows)
309	if err != nil {
310		return nil, fmt.Errorf("scan: %w", err)
311	}
312
313	return res, nil
314}
315
// Acknowledge moves the passed in events that are currently 'in_progress' into the given state and releases the
// resources acquired for them by Dequeue. It returns the IDs of the events that were acknowledged. IDs that don't
// exist, or whose events are not 'in_progress', are left out of the result. An empty ids slice is a no-op. A state
// rejected by allowToAck results in an error without any query being executed.
func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) {
	// When `Acknowledge` method is called:
	//  1. The events with the passed in `id`s and their corresponding <lock>s are fetched from the `replication_queue`
	//     table. Only events in the `in_progress` state are taken: a passed in `id` may not exist in the table, or its
	//     event may be in a different state (only events previously fetched by the `Dequeue` method can be
	//     acknowledged).
	//  2. Based on the list fetched in the previous step a delete is executed on the `replication_queue` table. If the
	//     new state is 'dead' the event is simply deleted. If the new state is 'completed' the event is deleted as
	//     well, together with all events similar to it (events for the same repository with the same change type and
	//     source) that are still 'ready' and were created before the processed event was dequeued for processing.
	//     For any other new state (e.g. 'failed') the event is only updated with the new state.
	//     The `id`s and <lock>s of the events affected by this delete/update are collected.
	//  3. The records in the `replication_queue_job_lock` table that were created by step 4. of the `Dequeue`
	//     method call are removed for the affected events.
	//  4. The acquisition state of each <lock> in the `replication_queue_lock` table is updated by comparing the
	//     number of existing bindings in the `replication_queue_job_lock` table for the <lock> with the number of
	//     bindings removed in step 3. for the <lock>. All sub-statements see the same snapshot, so the existing
	//     count still includes the bindings being removed. If the numbers are equal the <lock> is free and its
	//     `acquired` column is set to `FALSE`, so the <lock> can be used by the `Dequeue` method to retrieve other
	//     events. If they don't match no update happens and the <lock> remains acquired until all of its events are
	//     acknowledged (their binding records removed from the `replication_queue_job_lock` table).
	//
	// NOTE(review): for 'completed' and 'dead' both the `deleted` and the `updated` CTEs target the acknowledged row.
	// PostgreSQL documents that modifying the same row twice in one statement is not supported and only one of the
	// modifications takes place. The job lock is released either way because `removed_job_lock` uses the UNION of
	// both CTEs. Confirm which modification is expected to win.

	if len(ids) == 0 {
		return nil, nil
	}

	if err := allowToAck(state); err != nil {
		return nil, err
	}

	// The IDs are passed to Postgres as a BIGINT array.
	pqIDs := make(pq.Int64Array, len(ids))
	for i, id := range ids {
		pqIDs[i] = int64(id)
	}

	query := `
		WITH existing AS (
			SELECT id, lock_id, updated_at, job
			FROM replication_queue
			WHERE id = ANY($1)
			AND state = 'in_progress'
			FOR UPDATE
		)
		, deleted AS (
			DELETE FROM replication_queue AS queue
			USING existing
			WHERE ($2::REPLICATION_JOB_STATE = 'dead' AND existing.id = queue.id) OR (
				$2::REPLICATION_JOB_STATE = 'completed'
				AND (existing.id = queue.id OR (
					-- this is an optimization to omit events that won't make any effect as the same event
					-- was just applied, so we acknowledge similar events:
					-- only not yet touched events (no attempts to process it)
					queue.state = 'ready'
					-- and they were created before current event was consumed for processing
					AND queue.created_at < existing.updated_at
					-- they are for the exact same repository
					AND queue.lock_id = existing.lock_id
					-- and created to apply exact same replication operation (gc, update, ...)
					AND queue.job->>'change' = existing.job->>'change'
					-- from the same source storage (if applicable, as 'gc' has no source)
					AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', ''))
				)
			)
			RETURNING queue.id, queue.lock_id
		)
		, updated AS (
			UPDATE replication_queue AS queue
			SET state = $2::REPLICATION_JOB_STATE,
				updated_at = NOW() AT TIME ZONE 'UTC'
			FROM existing
			WHERE existing.id = queue.id
			RETURNING queue.id, queue.lock_id
		)
		, removed_job_lock AS (
			DELETE FROM replication_queue_job_lock AS job_lock
			USING (SELECT * FROM deleted UNION SELECT * FROM updated) AS to_release
			WHERE job_lock.job_id = to_release.id AND job_lock.lock_id = to_release.lock_id
			RETURNING to_release.lock_id
		)
		, release AS (
			UPDATE replication_queue_lock
			SET acquired = FALSE
			WHERE id IN (
				SELECT existing.lock_id
				FROM (SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id) AS removed
				JOIN (
					SELECT lock_id, COUNT(*) AS amount
					FROM replication_queue_job_lock
					WHERE lock_id IN (SELECT lock_id FROM removed_job_lock)
					GROUP BY lock_id
				) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
			)
		)
		SELECT id
		FROM existing`
	rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state)
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()

	var acknowledged glsql.Uint64Provider
	if err := glsql.ScanAll(rows, &acknowledged); err != nil {
		return nil, fmt.Errorf("scan: %w", err)
	}

	return acknowledged.Values(), rows.Err()
}
422
423// StartHealthUpdate starts periodical update of the event's health identifier.
424// The events with fresh health identifier won't be considered as stale.
425// The health update will be executed on each new entry received from trigger channel passed in.
426// It is a blocking call that is managed by the passed in context.
427func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error {
428	if len(events) == 0 {
429		return nil
430	}
431
432	jobIDs := make(pq.Int64Array, len(events))
433	lockIDs := make(pq.StringArray, len(events))
434	for i := range events {
435		jobIDs[i] = int64(events[i].ID)
436		lockIDs[i] = events[i].LockID
437	}
438
439	query := `
440		UPDATE replication_queue_job_lock
441		SET triggered_at = NOW() AT TIME ZONE 'UTC'
442		WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))`
443
444	for {
445		select {
446		case <-ctx.Done():
447			return nil
448		case <-trigger:
449			res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs)
450			if err != nil {
451				if pqError, ok := err.(*pq.Error); ok && pqError.Code.Name() == "query_canceled" {
452					return nil
453				}
454				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
455					return nil
456				}
457				return err
458			}
459
460			affected, err := res.RowsAffected()
461			if err != nil {
462				return err
463			}
464
465			if affected == 0 {
466				return nil
467			}
468		}
469	}
470}
471
472// AcknowledgeStale moves replication events that are 'in_progress' state for too long (more then staleAfter)
473// into the next state:
474//   'failed' - in case it has more attempts to be executed
475//   'dead' - in case it has no more attempts to be executed
476// The job considered 'in_progress' if it has corresponding entry in the 'replication_queue_job_lock' table.
477// When moving from 'in_progress' to other state the entry from 'replication_queue_job_lock' table will be
478// removed and entry in the 'replication_queue_lock' will be updated if needed (release of the lock).
479func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error {
480	query := `
481		WITH stale_job_lock AS (
482			DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() AT TIME ZONE 'UTC' - INTERVAL '1 MILLISECOND' * $1
483			RETURNING job_id, lock_id
484		)
485		, update_job AS (
486			UPDATE replication_queue AS queue
487			SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE
488			FROM stale_job_lock
489			WHERE stale_job_lock.job_id = queue.id
490			RETURNING queue.id, queue.lock_id
491		)
492		UPDATE replication_queue_lock
493		SET acquired = FALSE
494		WHERE id IN (
495			SELECT existing.lock_id
496			FROM (SELECT lock_id, COUNT(*) AS amount FROM stale_job_lock GROUP BY lock_id) AS removed
497			JOIN (
498				SELECT lock_id, COUNT(*) AS amount
499				FROM replication_queue_job_lock
500				WHERE lock_id IN (SELECT lock_id FROM stale_job_lock)
501				GROUP BY lock_id
502			) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount
503		)`
504	_, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds())
505	if err != nil {
506		return fmt.Errorf("exec acknowledge stale: %w", err)
507	}
508
509	return nil
510}
511