1package datastore 2 3import ( 4 "context" 5 "database/sql" 6 "database/sql/driver" 7 "encoding/json" 8 "errors" 9 "fmt" 10 "time" 11 12 "github.com/lib/pq" 13 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" 14) 15 16// ReplicationEventQueue allows to put new events to the persistent queue and retrieve them back. 17type ReplicationEventQueue interface { 18 // Enqueue puts provided event into the persistent queue. 19 Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) 20 // Dequeue retrieves events from the persistent queue using provided limitations and filters. 21 Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) 22 // Acknowledge updates previously dequeued events with the new state and releases resources acquired for it. 23 // It updates events that are in 'in_progress' state to the state that is passed in. 24 // It also updates state of similar events (scheduled fot the same repository with same change from the same source) 25 // that are in 'ready' state and created before the target event was dequeue for the processing if the new state is 26 // 'completed'. Otherwise it won't be changed. 27 // It returns sub-set of passed in ids that were updated. 28 Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) 29 // StartHealthUpdate starts periodical update of the event's health identifier. 30 // The events with fresh health identifier won't be considered as stale. 31 // The health update will be executed on each new entry received from trigger channel passed in. 32 // It is a blocking call that is managed by the passed in context. 33 StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error 34 // AcknowledgeStale moves replication events that are 'in_progress' state for too long (more than staleAfter) 35 // into the next state: 36 // 'failed' - in case it has more attempts to be executed 37 // 'dead' - in case it has no more attempts to be executed 38 AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error 39} 40 41func allowToAck(state JobState) error { 42 switch state { 43 case JobStateCompleted, JobStateFailed, JobStateCancelled, JobStateDead: 44 return nil 45 default: 46 return fmt.Errorf("event state is not supported: %q", state) 47 } 48} 49 50// ReplicationJob is a persistent representation of the replication job. 51type ReplicationJob struct { 52 // RepositoryID is the ID of the repository this job relates to. RepositoryID 53 // may be 0 if the job doesn't relate to any known repository. This can happen 54 // for example when the job is deleting an orphaned replica of a deleted repository. 55 RepositoryID int64 `json:"repository_id"` 56 Change ChangeType `json:"change"` 57 RelativePath string `json:"relative_path"` 58 TargetNodeStorage string `json:"target_node_storage"` 59 SourceNodeStorage string `json:"source_node_storage"` 60 VirtualStorage string `json:"virtual_storage"` 61 Params Params `json:"params"` 62} 63 64func (job *ReplicationJob) Scan(value interface{}) error { 65 if value == nil { 66 return nil 67 } 68 69 d, ok := value.([]byte) 70 if !ok { 71 return fmt.Errorf("unexpected type received: %T", value) 72 } 73 74 return json.Unmarshal(d, job) 75} 76 77func (job ReplicationJob) Value() (driver.Value, error) { 78 data, err := json.Marshal(job) 79 if err != nil { 80 return nil, err 81 } 82 return string(data), nil 83} 84 85// ReplicationEvent is a persistent representation of the replication event. 86type ReplicationEvent struct { 87 ID uint64 88 State JobState 89 Attempt int 90 LockID string 91 CreatedAt time.Time 92 UpdatedAt *time.Time 93 Job ReplicationJob 94 Meta Params 95} 96 97// Mapping returns list of references to the struct fields that correspond to the SQL columns/column aliases. 98func (event *ReplicationEvent) Mapping(columns []string) ([]interface{}, error) { 99 var mapping []interface{} 100 for _, column := range columns { 101 switch column { 102 case "id": 103 mapping = append(mapping, &event.ID) 104 case "state": 105 mapping = append(mapping, &event.State) 106 case "created_at": 107 mapping = append(mapping, &event.CreatedAt) 108 case "updated_at": 109 mapping = append(mapping, &event.UpdatedAt) 110 case "attempt": 111 mapping = append(mapping, &event.Attempt) 112 case "lock_id": 113 mapping = append(mapping, &event.LockID) 114 case "job": 115 mapping = append(mapping, &event.Job) 116 case "meta": 117 mapping = append(mapping, &event.Meta) 118 default: 119 return nil, fmt.Errorf("unknown column specified in SELECT statement: %q", column) 120 } 121 } 122 return mapping, nil 123} 124 125// Scan fills receive fields with values fetched from database based on the set of columns/column aliases. 126func (event *ReplicationEvent) Scan(columns []string, rows *sql.Rows) error { 127 mappings, err := event.Mapping(columns) 128 if err != nil { 129 return err 130 } 131 return rows.Scan(mappings...) 132} 133 134// scanReplicationEvents reads all rows and convert them into structs filling all the fields according to fetched columns/column aliases. 135func scanReplicationEvents(rows *sql.Rows) (events []ReplicationEvent, err error) { 136 columns, err := rows.Columns() 137 if err != nil { 138 return events, err 139 } 140 141 defer func() { 142 if cErr := rows.Close(); cErr != nil && err == nil { 143 err = cErr 144 } 145 }() 146 147 for rows.Next() { 148 var event ReplicationEvent 149 if err = event.Scan(columns, rows); err != nil { 150 return events, err 151 } 152 events = append(events, event) 153 } 154 155 return events, rows.Err() 156} 157 158// interface implementation protection 159var _ ReplicationEventQueue = PostgresReplicationEventQueue{} 160 161// NewPostgresReplicationEventQueue returns new instance with provided Querier as a reference to storage. 162func NewPostgresReplicationEventQueue(qc glsql.Querier) PostgresReplicationEventQueue { 163 return PostgresReplicationEventQueue{qc: qc} 164} 165 166// PostgresReplicationEventQueue is a Postgres implementation of persistent queue. 167type PostgresReplicationEventQueue struct { 168 // The main requirements for the queue implementation are: 169 // - it should not use long transactions 170 // - it should perform without problems if PgBouncer is used in between with `pool_mode = transaction` 171 // - it should perform concurrently with other queue implementations (support of horizontal scaling) 172 // - it should handle events sequentially starting with the oldest 173 // - it should handle events concurrently for multiple repositories 174 // - it should support retries 175 // 176 // Current implementation uses the following tables to mimic the queue: 177 // - replication_queue_lock 178 // - replication_queue 179 // - replication_queue_job_lock 180 // 181 // `replication_queue_lock` holds repository level locks to synchronize multiple Praefects instances 182 // working on the same queue (shared database). Only one worker is allowed to operate on a given repository at a time. 183 // The `id` column is a concatenated string of the virtual storage name, gitaly node, 184 // and repository relative path: virtual1|node1|/path/to/project.git with `|` as a delimiter (represented as <lock> 185 // elsewhere in this doc). 186 // The `acquired` column reflects whether the lock for this repository (qualified by `id` column) is taken. 187 // 188 // `replication_queue` stores the actual replication event. The job is stored as a JSONB value in the `job` column of 189 // the table. It includes `meta` column designed to store meta information such as `correlation_id` etc. Each event has 190 // corresponding value in the `lock_id` column from `replication_queue_lock` table. Each replication event will be 191 // created with the following defaults: 192 // - attempt: 3 193 // - state: `ready` 194 // - created_at: UTC timestamp 195 // - updated_at: NULL 196 // 197 // `replication_queue_job_lock` holds event specific locks to prevent multiple queue workers from operating on the same 198 // event and track the events that are protected by the <lock>. 199 // 200 // The mechanics of how the queue works is described in the `Enqueue`, `Dequeue`, `Acknowledge` methods. 201 qc glsql.Querier 202} 203 204func (rq PostgresReplicationEventQueue) Enqueue(ctx context.Context, event ReplicationEvent) (ReplicationEvent, error) { 205 // When `Enqueue` method is called: 206 // 1. Insertion of the new record into `replication_queue_lock` table, so we are ensured all events have 207 // a corresponding <lock>. If a record already exists it won't be inserted again. 208 // 2. Insertion of the new record into the `replication_queue` table with the defaults listed above, 209 // the job, the meta and corresponding <lock> used in `replication_queue_lock` table for the `lock_id` column. 210 211 query := ` 212 WITH insert_lock AS ( 213 INSERT INTO replication_queue_lock(id) 214 VALUES ($1 || '|' || $2 || '|' || $3) 215 ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id 216 RETURNING id 217 ) 218 INSERT INTO replication_queue(lock_id, job, meta) 219 SELECT insert_lock.id, $4, $5 220 FROM insert_lock 221 RETURNING id, state, created_at, updated_at, lock_id, attempt, job, meta` 222 // this will always return a single row result (because of lock uniqueness) or an error 223 rows, err := rq.qc.QueryContext(ctx, query, event.Job.VirtualStorage, event.Job.TargetNodeStorage, event.Job.RelativePath, event.Job, event.Meta) 224 if err != nil { 225 return ReplicationEvent{}, fmt.Errorf("query: %w", err) 226 } 227 228 events, err := scanReplicationEvents(rows) 229 if err != nil { 230 return ReplicationEvent{}, fmt.Errorf("scan: %w", err) 231 } 232 233 return events[0], nil 234} 235 236func (rq PostgresReplicationEventQueue) Dequeue(ctx context.Context, virtualStorage, nodeStorage string, count int) ([]ReplicationEvent, error) { 237 // When `Dequeue` method is called: 238 // 1. Events with attempts left that are either in `ready` or `failed` state are candidates for dequeuing. 239 // Events already being processed by another worker are filtered out by checking if the event is already locked 240 // in the `replication_queue_job_lock` table. 241 // 2. Events for repositories that are already locked by another Praefect instance are filtered out. 242 // Repository locks are stored in the `replication_queue_lock` table. 243 // 3. The events that still remain after filtering are dequeued. On dequeuing: 244 // - The event's attempts are decremented by 1. 245 // - The event's state is set to `in_progress` 246 // - The event's `updated_at` is set to current time in UTC. 247 // 4. For each event retrieved from the step above a new record would be created in 248 // `replication_queue_job_lock` table. Rows in this table allows us to track events that were fetched for processing 249 // and relation of them with the locks in the `replication_queue_lock` table. The reason we need it is because 250 // multiple events can be fetched for the same repository (more details on it in `Acknowledge` below). 251 // 5. Update the corresponding <lock> in `replication_queue_lock` table and column `acquired` is assigned with 252 // `TRUE` value to signal that this <lock> is busy and can't be used to fetch events (step 2.). 253 // 254 // As a special case, 'delete_replica' type events have unlimited attempts. This is to ensure we never partially apply the job 255 // by deleting the repository from the disk but leaving it still present in the database. Praefect would then see that there still 256 // is a replica on the storage, when there is none in fact. That could cause us to delete all replicas of a repository. 257 258 query := ` 259 WITH lock AS ( 260 SELECT id 261 FROM replication_queue_lock 262 WHERE id LIKE ($1 || '|' || $2 || '|%') AND NOT acquired 263 FOR UPDATE SKIP LOCKED 264 ) 265 , candidate AS ( 266 SELECT id 267 FROM replication_queue 268 WHERE id IN ( 269 SELECT DISTINCT FIRST_VALUE(queue.id) OVER (PARTITION BY lock_id, job->>'change' ORDER BY queue.created_at) 270 FROM replication_queue AS queue 271 JOIN lock ON queue.lock_id = lock.id 272 WHERE queue.state IN ('ready', 'failed' ) 273 AND NOT EXISTS (SELECT 1 FROM replication_queue_job_lock WHERE lock_id = queue.lock_id) 274 ) 275 ORDER BY created_at 276 LIMIT $3 277 FOR UPDATE 278 ) 279 , job AS ( 280 UPDATE replication_queue AS queue 281 SET attempt = CASE WHEN job->>'change' = 'delete_replica' THEN queue.attempt ELSE queue.attempt - 1 END 282 , state = 'in_progress' 283 , updated_at = NOW() AT TIME ZONE 'UTC' 284 FROM candidate 285 WHERE queue.id = candidate.id 286 RETURNING queue.id, queue.state, queue.created_at, queue.updated_at, queue.lock_id, queue.attempt, queue.job, queue.meta 287 ) 288 , track_job_lock AS ( 289 INSERT INTO replication_queue_job_lock (job_id, lock_id, triggered_at) 290 SELECT job.id, job.lock_id, NOW() AT TIME ZONE 'UTC' 291 FROM job 292 RETURNING lock_id 293 ) 294 , acquire_lock AS ( 295 UPDATE replication_queue_lock AS lock 296 SET acquired = TRUE 297 FROM track_job_lock AS tracked 298 WHERE lock.id = tracked.lock_id 299 ) 300 SELECT id, state, created_at, updated_at, lock_id, attempt, job, meta 301 FROM job 302 ORDER BY id` 303 rows, err := rq.qc.QueryContext(ctx, query, virtualStorage, nodeStorage, count) 304 if err != nil { 305 return nil, fmt.Errorf("query: %w", err) 306 } 307 308 res, err := scanReplicationEvents(rows) 309 if err != nil { 310 return nil, fmt.Errorf("scan: %w", err) 311 } 312 313 return res, nil 314} 315 316func (rq PostgresReplicationEventQueue) Acknowledge(ctx context.Context, state JobState, ids []uint64) ([]uint64, error) { 317 // When `Acknowledge` method is called: 318 // 1. The list of event `id`s and corresponding <lock>s retrieved from `replication_queue` table as passed in by the 319 // user `ids` could not exist in the table or the `state` of the event could differ from `in_progress` (it is 320 // possible to acknowledge only events previously fetched by the `Dequeue` method) 321 // 2. Based on the list fetched on previous step the delete is executed on the `replication_queue` table. In case the 322 // new state for the entry is 'dead' it will be just deleted, but if the new state is 'completed' the event will 323 // be delete as well, but all events similar to it (events for the same repository with same change type and a source) 324 // that were created before processed events were queued for processing will also be deleted. 325 // In case the new state is something different ('failed') the event will be updated only with a new state. 326 // It returns a list of event `id`s and corresponding <lock>s of the affected events during this delete/update process. 327 // 3. The removal of records in `replication_queue_job_lock` table happens that were created by step 4. of `Dequeue` 328 // method call. 329 // 4. Acquisition state of <lock>s in `replication_queue_lock` table updated by comparing amount of existing bindings 330 // in `replication_queue_lock` table for the <lock> to amount of removed bindings done on the 3. for the <lock> 331 // and if amount is the same the <lock> is free and column `acquired` assigned `FALSE` value, so this <lock> can 332 // be used in the `Dequeue` method to retrieve other events. If amounts don't match no update happens and <lock> 333 // remains acquired until all events are acknowledged (binding records removed from the `replication_queue_job_lock` 334 // table). 335 336 if len(ids) == 0 { 337 return nil, nil 338 } 339 340 if err := allowToAck(state); err != nil { 341 return nil, err 342 } 343 344 pqIDs := make(pq.Int64Array, len(ids)) 345 for i, id := range ids { 346 pqIDs[i] = int64(id) 347 } 348 349 query := ` 350 WITH existing AS ( 351 SELECT id, lock_id, updated_at, job 352 FROM replication_queue 353 WHERE id = ANY($1) 354 AND state = 'in_progress' 355 FOR UPDATE 356 ) 357 , deleted AS ( 358 DELETE FROM replication_queue AS queue 359 USING existing 360 WHERE ($2::REPLICATION_JOB_STATE = 'dead' AND existing.id = queue.id) OR ( 361 $2::REPLICATION_JOB_STATE = 'completed' 362 AND (existing.id = queue.id OR ( 363 -- this is an optimization to omit events that won't make any effect as the same event 364 -- was just applied, so we acknowledge similar events: 365 -- only not yet touched events (no attempts to process it) 366 queue.state = 'ready' 367 -- and they were created before current event was consumed for processing 368 AND queue.created_at < existing.updated_at 369 -- they are for the exact same repository 370 AND queue.lock_id = existing.lock_id 371 -- and created to apply exact same replication operation (gc, update, ...) 372 AND queue.job->>'change' = existing.job->>'change' 373 -- from the same source storage (if applicable, as 'gc' has no source) 374 AND COALESCE(queue.job->>'source_node_storage', '') = COALESCE(existing.job->>'source_node_storage', '')) 375 ) 376 ) 377 RETURNING queue.id, queue.lock_id 378 ) 379 , updated AS ( 380 UPDATE replication_queue AS queue 381 SET state = $2::REPLICATION_JOB_STATE, 382 updated_at = NOW() AT TIME ZONE 'UTC' 383 FROM existing 384 WHERE existing.id = queue.id 385 RETURNING queue.id, queue.lock_id 386 ) 387 , removed_job_lock AS ( 388 DELETE FROM replication_queue_job_lock AS job_lock 389 USING (SELECT * FROM deleted UNION SELECT * FROM updated) AS to_release 390 WHERE job_lock.job_id = to_release.id AND job_lock.lock_id = to_release.lock_id 391 RETURNING to_release.lock_id 392 ) 393 , release AS ( 394 UPDATE replication_queue_lock 395 SET acquired = FALSE 396 WHERE id IN ( 397 SELECT existing.lock_id 398 FROM (SELECT lock_id, COUNT(*) AS amount FROM removed_job_lock GROUP BY lock_id) AS removed 399 JOIN ( 400 SELECT lock_id, COUNT(*) AS amount 401 FROM replication_queue_job_lock 402 WHERE lock_id IN (SELECT lock_id FROM removed_job_lock) 403 GROUP BY lock_id 404 ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount 405 ) 406 ) 407 SELECT id 408 FROM existing` 409 rows, err := rq.qc.QueryContext(ctx, query, pqIDs, state) 410 if err != nil { 411 return nil, fmt.Errorf("query: %w", err) 412 } 413 defer rows.Close() 414 415 var acknowledged glsql.Uint64Provider 416 if err := glsql.ScanAll(rows, &acknowledged); err != nil { 417 return nil, fmt.Errorf("scan: %w", err) 418 } 419 420 return acknowledged.Values(), rows.Err() 421} 422 423// StartHealthUpdate starts periodical update of the event's health identifier. 424// The events with fresh health identifier won't be considered as stale. 425// The health update will be executed on each new entry received from trigger channel passed in. 426// It is a blocking call that is managed by the passed in context. 427func (rq PostgresReplicationEventQueue) StartHealthUpdate(ctx context.Context, trigger <-chan time.Time, events []ReplicationEvent) error { 428 if len(events) == 0 { 429 return nil 430 } 431 432 jobIDs := make(pq.Int64Array, len(events)) 433 lockIDs := make(pq.StringArray, len(events)) 434 for i := range events { 435 jobIDs[i] = int64(events[i].ID) 436 lockIDs[i] = events[i].LockID 437 } 438 439 query := ` 440 UPDATE replication_queue_job_lock 441 SET triggered_at = NOW() AT TIME ZONE 'UTC' 442 WHERE (job_id, lock_id) IN (SELECT UNNEST($1::BIGINT[]), UNNEST($2::TEXT[]))` 443 444 for { 445 select { 446 case <-ctx.Done(): 447 return nil 448 case <-trigger: 449 res, err := rq.qc.ExecContext(ctx, query, jobIDs, lockIDs) 450 if err != nil { 451 if pqError, ok := err.(*pq.Error); ok && pqError.Code.Name() == "query_canceled" { 452 return nil 453 } 454 if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { 455 return nil 456 } 457 return err 458 } 459 460 affected, err := res.RowsAffected() 461 if err != nil { 462 return err 463 } 464 465 if affected == 0 { 466 return nil 467 } 468 } 469 } 470} 471 472// AcknowledgeStale moves replication events that are 'in_progress' state for too long (more then staleAfter) 473// into the next state: 474// 'failed' - in case it has more attempts to be executed 475// 'dead' - in case it has no more attempts to be executed 476// The job considered 'in_progress' if it has corresponding entry in the 'replication_queue_job_lock' table. 477// When moving from 'in_progress' to other state the entry from 'replication_queue_job_lock' table will be 478// removed and entry in the 'replication_queue_lock' will be updated if needed (release of the lock). 479func (rq PostgresReplicationEventQueue) AcknowledgeStale(ctx context.Context, staleAfter time.Duration) error { 480 query := ` 481 WITH stale_job_lock AS ( 482 DELETE FROM replication_queue_job_lock WHERE triggered_at < NOW() AT TIME ZONE 'UTC' - INTERVAL '1 MILLISECOND' * $1 483 RETURNING job_id, lock_id 484 ) 485 , update_job AS ( 486 UPDATE replication_queue AS queue 487 SET state = (CASE WHEN attempt >= 1 THEN 'failed' ELSE 'dead' END)::REPLICATION_JOB_STATE 488 FROM stale_job_lock 489 WHERE stale_job_lock.job_id = queue.id 490 RETURNING queue.id, queue.lock_id 491 ) 492 UPDATE replication_queue_lock 493 SET acquired = FALSE 494 WHERE id IN ( 495 SELECT existing.lock_id 496 FROM (SELECT lock_id, COUNT(*) AS amount FROM stale_job_lock GROUP BY lock_id) AS removed 497 JOIN ( 498 SELECT lock_id, COUNT(*) AS amount 499 FROM replication_queue_job_lock 500 WHERE lock_id IN (SELECT lock_id FROM stale_job_lock) 501 GROUP BY lock_id 502 ) AS existing ON removed.lock_id = existing.lock_id AND removed.amount = existing.amount 503 )` 504 _, err := rq.qc.ExecContext(ctx, query, staleAfter.Milliseconds()) 505 if err != nil { 506 return fmt.Errorf("exec acknowledge stale: %w", err) 507 } 508 509 return nil 510} 511