1package datastore
2
3import (
4	"context"
5	"fmt"
6
7	"github.com/lib/pq"
8	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
9)
10
// InvalidArgumentError tags the error as being caused by an invalid argument. The wrapped
// error is embedded, so its Error method is promoted and an InvalidArgumentError reports the
// wrapped error's message.
type InvalidArgumentError struct {
	error
}

// newVirtualStorageNotFoundError reports that the given virtual storage was not found.
func newVirtualStorageNotFoundError(virtualStorage string) error {
	cause := fmt.Errorf("virtual storage %q not found", virtualStorage)
	return InvalidArgumentError{error: cause}
}

// newUnattainableReplicationFactorError reports that the attempted replication factor is larger
// than the number of storages the virtual storage contains.
func newUnattainableReplicationFactorError(attempted, maximum int) error {
	cause := fmt.Errorf("attempted to set replication factor %d but virtual storage only contains %d storages", attempted, maximum)
	return InvalidArgumentError{error: cause}
}

// newMinimumReplicationFactorError reports that the attempted replication factor is below 1.
func newMinimumReplicationFactorError(replicationFactor int) error {
	cause := fmt.Errorf("attempted to set replication factor %d but minimum is 1", replicationFactor)
	return InvalidArgumentError{error: cause}
}

// newRepositoryNotFoundError reports that the repository identified by the virtual storage and
// relative path was not found.
func newRepositoryNotFoundError(virtualStorage, relativePath string) error {
	cause := fmt.Errorf("repository %q/%q not found", virtualStorage, relativePath)
	return InvalidArgumentError{error: cause}
}
29
30// AssignmentStore manages host assignments in Postgres.
31type AssignmentStore struct {
32	db                 glsql.Querier
33	configuredStorages map[string][]string
34}
35
36// NewAssignmentStore returns a new AssignmentStore using the passed in database.
37func NewAssignmentStore(db glsql.Querier, configuredStorages map[string][]string) AssignmentStore {
38	return AssignmentStore{db: db, configuredStorages: configuredStorages}
39}
40
41func (s AssignmentStore) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
42	configuredStorages, ok := s.configuredStorages[virtualStorage]
43	if !ok {
44		return nil, newVirtualStorageNotFoundError(virtualStorage)
45	}
46
47	rows, err := s.db.QueryContext(ctx, `
48SELECT storage
49FROM repository_assignments
50WHERE virtual_storage = $1
51AND   relative_path = $2
52AND   storage = ANY($3)
53`, virtualStorage, relativePath, pq.StringArray(configuredStorages))
54	if err != nil {
55		return nil, fmt.Errorf("query: %w", err)
56	}
57	defer rows.Close()
58
59	var assignedStorages []string
60	for rows.Next() {
61		var storage string
62		if err := rows.Scan(&storage); err != nil {
63			return nil, fmt.Errorf("scan: %w", err)
64		}
65
66		assignedStorages = append(assignedStorages, storage)
67	}
68
69	if err := rows.Err(); err != nil {
70		return nil, fmt.Errorf("iterating rows: %w", err)
71	}
72
73	if len(assignedStorages) == 0 {
74		return configuredStorages, nil
75	}
76
77	return assignedStorages, nil
78}
79
80// SetReplicationFactor assigns or unassigns a repository's host nodes until the desired replication factor is met.
81// Please see the protobuf documentation of the method for details.
82func (s AssignmentStore) SetReplicationFactor(ctx context.Context, virtualStorage, relativePath string, replicationFactor int) ([]string, error) {
83	candidateStorages, ok := s.configuredStorages[virtualStorage]
84	if !ok {
85		return nil, newVirtualStorageNotFoundError(virtualStorage)
86	}
87
88	if replicationFactor < 1 {
89		return nil, newMinimumReplicationFactorError(replicationFactor)
90	}
91
92	if max := len(candidateStorages); replicationFactor > max {
93		return nil, newUnattainableReplicationFactorError(replicationFactor, max)
94	}
95
96	// The query works as follows:
97	//
98	// 1. `repository` CTE locks the repository's record for the duration of the update.
99	//    This prevents concurrent updates to the `repository_assignments` table for the given
100	//    repository. It is not sufficient to rely on row locks in `repository_assignments`
101	//    as there might be rows being inserted or deleted in another transaction that
102	//    our transaction does not lock. This could be the case if the replication factor
103	//    is being increased concurrently from two different nodes and they assign different
104	//    storages.
105	//
106	// 2. `existing_assignments` CTE gets the existing assignments for the repository. While
107	//    there may be assignments in the database for storage nodes that were removed from the
108	//    cluster, the query filters them out.
109	//
110	// 3. `created_assignments` CTE assigns new hosts to the repository if the replication
111	//    factor has been increased. Random storages which are not yet assigned to the repository
112	//    are picked until the replication factor is met. The primary of a repository is always
113	//    assigned first.
114	//
115	// 4. `removed_assignments` CTE removes host assignments if the replication factor has been
116	//    decreased. Primary is never removed as it needs a copy of the repository in order to
117	//    accept writes. Random hosts are removed until the replication factor is met.
118	//
119	// 6. Finally we return the current set of assignments. CTE updates are not visible in the
120	//    tables during the transaction. To account for that, we filter out removed assignments
121	//    from the existing assignments. If the replication factor was increased, we'll include the
122	//    created assignments. If the replication factor did not change, the query returns the
123	//    current assignments.
124	rows, err := s.db.QueryContext(ctx, `
125WITH repository AS (
126	SELECT virtual_storage, relative_path, "primary"
127	FROM repositories
128	WHERE virtual_storage = $1
129	AND   relative_path   = $2
130	FOR UPDATE
131),
132
133existing_assignments AS (
134	SELECT storage
135	FROM repository
136	JOIN repository_assignments USING (virtual_storage, relative_path)
137	WHERE storage = ANY($4::text[])
138),
139
140created_assignments AS (
141	INSERT INTO repository_assignments
142	SELECT virtual_storage, relative_path, storage
143	FROM repository
144	CROSS JOIN ( SELECT unnest($4::text[]) AS storage ) AS configured_storages
145	WHERE storage NOT IN ( SELECT storage FROM existing_assignments )
146	ORDER BY CASE WHEN storage = "primary" THEN 1 ELSE 0 END DESC, random()
147	LIMIT ( SELECT GREATEST(COUNT(*), $3) - COUNT(*) FROM existing_assignments )
148	RETURNING storage
149),
150
151removed_assignments AS (
152	DELETE FROM repository_assignments
153	USING (
154		SELECT virtual_storage, relative_path, storage
155		FROM repository, existing_assignments
156		WHERE storage != "primary"
157		ORDER BY random()
158		LIMIT ( SELECT COUNT(*) - LEAST(COUNT(*), $3)  FROM existing_assignments )
159	) AS removals
160	WHERE repository_assignments.virtual_storage = removals.virtual_storage
161	AND   repository_assignments.relative_path   = removals.relative_path
162	AND   repository_assignments.storage         = removals.storage
163	RETURNING removals.storage
164)
165
166SELECT storage
167FROM existing_assignments
168WHERE storage NOT IN ( SELECT storage FROM removed_assignments )
169UNION
170SELECT storage
171FROM created_assignments
172ORDER BY storage
173	`, virtualStorage, relativePath, replicationFactor, pq.StringArray(candidateStorages))
174	if err != nil {
175		return nil, fmt.Errorf("query: %w", err)
176	}
177
178	defer rows.Close()
179
180	var storages []string
181	for rows.Next() {
182		var storage string
183		if err := rows.Scan(&storage); err != nil {
184			return nil, fmt.Errorf("scan: %w", err)
185		}
186
187		storages = append(storages, storage)
188	}
189
190	if len(storages) == 0 {
191		return nil, newRepositoryNotFoundError(virtualStorage, relativePath)
192	}
193
194	return storages, rows.Err()
195}
196