1package datastore 2 3import ( 4 "context" 5 "fmt" 6 7 "github.com/lib/pq" 8 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" 9) 10 11// InvalidArgumentError tags the error as being caused by an invalid argument. 12type InvalidArgumentError struct{ error } 13 14func newVirtualStorageNotFoundError(virtualStorage string) error { 15 return InvalidArgumentError{fmt.Errorf("virtual storage %q not found", virtualStorage)} 16} 17 18func newUnattainableReplicationFactorError(attempted, maximum int) error { 19 return InvalidArgumentError{fmt.Errorf("attempted to set replication factor %d but virtual storage only contains %d storages", attempted, maximum)} 20} 21 22func newMinimumReplicationFactorError(replicationFactor int) error { 23 return InvalidArgumentError{fmt.Errorf("attempted to set replication factor %d but minimum is 1", replicationFactor)} 24} 25 26func newRepositoryNotFoundError(virtualStorage, relativePath string) error { 27 return InvalidArgumentError{fmt.Errorf("repository %q/%q not found", virtualStorage, relativePath)} 28} 29 30// AssignmentStore manages host assignments in Postgres. 31type AssignmentStore struct { 32 db glsql.Querier 33 configuredStorages map[string][]string 34} 35 36// NewAssignmentStore returns a new AssignmentStore using the passed in database. 37func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore { 38 return AssignmentStore{db: db, configuredStorages: configuredStorages} 39} 40 41func (s AssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) { 42 configuredStorages, ok := s.configuredStorages[virtualStorage] 43 if !ok { 44 return nil, newVirtualStorageNotFoundError(virtualStorage) 45 } 46 47 rows, err := s.db.QueryContext(ctx, ` 48SELECT storage 49FROM repository_assignments 50WHERE virtual_storage = $1 51AND relative_path = $2 52AND storage = ANY($3) 53`, virtualStorage, relativePath, pq.StringArray(configuredStorages)) 54 if err != nil { 55 return nil, fmt.Errorf("query: %w", err) 56 } 57 defer rows.Close() 58 59 var assignedStorages []string 60 for rows.Next() { 61 var storage string 62 if err := rows.Scan(&storage); err != nil { 63 return nil, fmt.Errorf("scan: %w", err) 64 } 65 66 assignedStorages = append(assignedStorages, storage) 67 } 68 69 if err := rows.Err(); err != nil { 70 return nil, fmt.Errorf("iterating rows: %w", err) 71 } 72 73 if len(assignedStorages) == 0 { 74 return configuredStorages, nil 75 } 76 77 return assignedStorages, nil 78} 79 80// SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met. 81// Please see the protobuf documentation of the method for details. 82func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) { 83 candidateStorages, ok := s.configuredStorages[virtualStorage] 84 if !ok { 85 return nil, newVirtualStorageNotFoundError(virtualStorage) 86 } 87 88 if replicationFactor < 1 { 89 return nil, newMinimumReplicationFactorError(replicationFactor) 90 } 91 92 if max := len(candidateStorages); replicationFactor > max { 93 return nil, newUnattainableReplicationFactorError(replicationFactor, max) 94 } 95 96 // The query works as follows: 97 // 98 // 1. `repository` CTE locks the repository's record for the duration of the update. 99 // This prevents concurrent updates to the `repository_assignments` table for the given 100 // repository. It is not sufficient to rely on row locks in `repository_assignments` 101 // as there might be rows being inserted or deleted in another transaction that 102 // our transaction does not lock. This could be the case if the replication factor 103 // is being increased concurrently from two different nodes and they assign different 104 // storages. 105 // 106 // 2. `existing_assignments` CTE gets the existing assignments for the repository. While 107 // there may be assignments in the database for storage nodes that were removed from the 108 // cluster, the query filters them out. 109 // 110 // 3. `created_assignments` CTE assigns new hosts to the repository if the replication 111 // factor has been increased. Random storages which are not yet assigned to the repository 112 // are picked until the replication factor is met. The primary of a repository is always 113 // assigned first. 114 // 115 // 4. `removed_assignments` CTE removes host assignments if the replication factor has been 116 // decreased. Primary is never removed as it needs a copy of the repository in order to 117 // accept writes. Random hosts are removed until the replication factor is met. 118 // 119 // 6. Finally we return the current set of assignments. CTE updates are not visible in the 120 // tables during the transaction. To account for that, we filter out removed assignments 121 // from the existing assignments. If the replication factor was increased, we'll include the 122 // created assignments. If the replication factor did not change, the query returns the 123 // current assignments. 124 rows, err := s.db.QueryContext(ctx, ` 125WITH repository AS ( 126 SELECT virtual_storage, relative_path, "primary" 127 FROM repositories 128 WHERE virtual_storage = $1 129 AND relative_path = $2 130 FOR UPDATE 131), 132 133existing_assignments AS ( 134 SELECT storage 135 FROM repository 136 JOIN repository_assignments USING (virtual_storage, relative_path) 137 WHERE storage = ANY($4::text[]) 138), 139 140created_assignments AS ( 141 INSERT INTO repository_assignments 142 SELECT virtual_storage, relative_path, storage 143 FROM repository 144 CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages 145 WHERE storage NOT IN ( SELECT storage FROM existing_assignments ) 146 ORDER BY CASE WHEN storage = "primary" THEN 1 ELSE 0 END DESC, random() 147 LIMIT ( SELECT GREATEST(COUNT(*), $3) - COUNT(*) FROM existing_assignments ) 148 RETURNING storage 149), 150 151removed_assignments AS ( 152 DELETE FROM repository_assignments 153 USING ( 154 SELECT virtual_storage, relative_path, storage 155 FROM repository, existing_assignments 156 WHERE storage != "primary" 157 ORDER BY random() 158 LIMIT ( SELECT COUNT(*) - LEAST(COUNT(*), $3) FROM existing_assignments ) 159 ) AS removals 160 WHERE repository_assignments.virtual_storage = removals.virtual_storage 161 AND repository_assignments.relative_path = removals.relative_path 162 AND repository_assignments.storage = removals.storage 163 RETURNING removals.storage 164) 165 166SELECT storage 167FROM existing_assignments 168WHERE storage NOT IN ( SELECT storage FROM removed_assignments ) 169UNION 170SELECT storage 171FROM created_assignments 172ORDER BY storage 173 `, virtualStorage, relativePath, replicationFactor, pq.StringArray(candidateStorages)) 174 if err != nil { 175 return nil, fmt.Errorf("query: %w", err) 176 } 177 178 defer rows.Close() 179 180 var storages []string 181 for rows.Next() { 182 var storage string 183 if err := rows.Scan(&storage); err != nil { 184 return nil, fmt.Errorf("scan: %w", err) 185 } 186 187 storages = append(storages, storage) 188 } 189 190 if len(storages) == 0 { 191 return nil, newRepositoryNotFoundError(virtualStorage, relativePath) 192 } 193 194 return storages, rows.Err() 195} 196