1package praefect
2
3import (
4	"context"
5	"sort"
6	"testing"
7
8	"github.com/stretchr/testify/require"
9	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
10	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
11	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
12	"google.golang.org/grpc"
13	"google.golang.org/grpc/metadata"
14)
15
16// StaticRepositoryAssignments is a static assignment of storages for each individual repository.
17type StaticRepositoryAssignments map[string]map[string][]string
18
19func (st StaticRepositoryAssignments) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) {
20	vs, ok := st[virtualStorage]
21	if !ok {
22		return nil, nodes.ErrVirtualStorageNotExist
23	}
24
25	storages, ok := vs[relativePath]
26	if !ok {
27		return nil, errRepositoryNotFound
28	}
29
30	return storages, nil
31}
32
// PrimaryGetterFunc is an adapter to turn conforming functions into a PrimaryGetter.
type PrimaryGetterFunc func(ctx context.Context, virtualStorage, relativePath string) (string, error)

// GetPrimary calls fn with the given arguments and returns its results unchanged.
func (fn PrimaryGetterFunc) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) {
	return fn(ctx, virtualStorage, relativePath)
}
39
40func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) {
41	ctx, cancel := testhelper.Context()
42	defer cancel()
43
44	for _, tc := range []struct {
45		desc           string
46		virtualStorage string
47		numCandidates  int
48		pickCandidate  int
49		error          error
50		node           string
51	}{
52		{
53			desc:           "unknown virtual storage",
54			virtualStorage: "unknown",
55			error:          nodes.ErrVirtualStorageNotExist,
56		},
57		{
58			desc:           "picks randomly first candidate",
59			virtualStorage: "virtual-storage-1",
60			numCandidates:  2,
61			pickCandidate:  0,
62			node:           "valid-choice-1",
63		},
64		{
65			desc:           "picks randomly second candidate",
66			virtualStorage: "virtual-storage-1",
67			numCandidates:  2,
68			pickCandidate:  1,
69			node:           "valid-choice-2",
70		},
71	} {
72		t.Run(tc.desc, func(t *testing.T) {
73			conns := Connections{
74				"virtual-storage-1": {
75					"valid-choice-1": &grpc.ClientConn{},
76					"valid-choice-2": &grpc.ClientConn{},
77					"unhealthy":      &grpc.ClientConn{},
78				},
79			}
80
81			router := NewPerRepositoryRouter(
82				conns,
83				nil,
84				StaticHealthChecker{
85					"virtual-storage-1": {
86						"valid-choice-1",
87						"valid-choice-2",
88					},
89				},
90				mockRandom{
91					intnFunc: func(n int) int {
92						require.Equal(t, tc.numCandidates, n)
93						return tc.pickCandidate
94					},
95				},
96				nil,
97				nil,
98				nil,
99			)
100
101			node, err := router.RouteStorageAccessor(ctx, tc.virtualStorage)
102			require.Equal(t, tc.error, err)
103			require.Equal(t, RouterNode{
104				Storage:    tc.node,
105				Connection: conns["virtual-storage-1"][tc.node],
106			}, node)
107		})
108	}
109}
110
111func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) {
112	for _, tc := range []struct {
113		desc           string
114		virtualStorage string
115		healthyNodes   StaticHealthChecker
116		metadata       map[string]string
117		forcePrimary   bool
118		numCandidates  int
119		pickCandidate  int
120		error          error
121		node           string
122	}{
123		{
124			desc:           "unknown virtual storage",
125			virtualStorage: "unknown",
126			error:          nodes.ErrVirtualStorageNotExist,
127		},
128		{
129			desc:           "no healthy nodes",
130			virtualStorage: "virtual-storage-1",
131			healthyNodes:   map[string][]string{},
132			error:          ErrNoHealthyNodes,
133		},
134		{
135			desc:           "primary picked randomly",
136			virtualStorage: "virtual-storage-1",
137			healthyNodes: map[string][]string{
138				"virtual-storage-1": {"primary", "consistent-secondary"},
139			},
140			numCandidates: 2,
141			pickCandidate: 0,
142			node:          "primary",
143		},
144		{
145			desc:           "secondary picked randomly",
146			virtualStorage: "virtual-storage-1",
147			healthyNodes: map[string][]string{
148				"virtual-storage-1": {"primary", "consistent-secondary"},
149			},
150			numCandidates: 2,
151			pickCandidate: 1,
152			node:          "consistent-secondary",
153		},
154		{
155			desc:           "secondary picked when primary is unhealthy",
156			virtualStorage: "virtual-storage-1",
157			healthyNodes: map[string][]string{
158				"virtual-storage-1": {"consistent-secondary"},
159			},
160			numCandidates: 1,
161			node:          "consistent-secondary",
162		},
163		{
164			desc:           "no suitable nodes",
165			virtualStorage: "virtual-storage-1",
166			healthyNodes: map[string][]string{
167				"virtual-storage-1": {"inconistent-secondary"},
168			},
169			error: ErrNoSuitableNode,
170		},
171		{
172			desc:           "primary force-picked",
173			virtualStorage: "virtual-storage-1",
174			healthyNodes: map[string][]string{
175				"virtual-storage-1": {"primary", "consistent-secondary"},
176			},
177			forcePrimary: true,
178			node:         "primary",
179		},
180		{
181			desc:           "secondary not picked if force-picking unhealthy primary",
182			virtualStorage: "virtual-storage-1",
183			healthyNodes: map[string][]string{
184				"virtual-storage-1": {"consistent-secondary"},
185			},
186			forcePrimary: true,
187			error:        nodes.ErrPrimaryNotHealthy,
188		},
189	} {
190		t.Run(tc.desc, func(t *testing.T) {
191			ctx, cancel := testhelper.Context()
192			defer cancel()
193
194			ctx = testhelper.MergeIncomingMetadata(ctx, metadata.New(tc.metadata))
195
196			conns := Connections{
197				"virtual-storage-1": {
198					"primary":               &grpc.ClientConn{},
199					"consistent-secondary":  &grpc.ClientConn{},
200					"inconistent-secondary": &grpc.ClientConn{},
201					"unhealthy-secondary":   &grpc.ClientConn{},
202				},
203			}
204
205			router := NewPerRepositoryRouter(
206				conns,
207				PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) {
208					t.Helper()
209					require.Equal(t, tc.virtualStorage, virtualStorage)
210					require.Equal(t, "repository", relativePath)
211					return "primary", nil
212				}),
213				tc.healthyNodes,
214				mockRandom{
215					intnFunc: func(n int) int {
216						require.Equal(t, tc.numCandidates, n)
217						return tc.pickCandidate
218					},
219				},
220				datastore.MockRepositoryStore{
221					GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
222						t.Helper()
223						require.Equal(t, tc.virtualStorage, virtualStorage)
224						require.Equal(t, "repository", relativePath)
225						return map[string]struct{}{"primary": {}, "consistent-secondary": {}}, nil
226					},
227				},
228				nil,
229				nil,
230			)
231
232			node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository", tc.forcePrimary)
233			require.Equal(t, tc.error, err)
234			if tc.node != "" {
235				require.Equal(t, RouterNode{
236					Storage:    tc.node,
237					Connection: conns[tc.virtualStorage][tc.node],
238				}, node)
239			} else {
240				require.Empty(t, node)
241			}
242		})
243	}
244}
245
246func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) {
247	configuredNodes := map[string][]string{
248		"virtual-storage-1": {"primary", "secondary-1", "secondary-2"},
249	}
250
251	for _, tc := range []struct {
252		desc               string
253		virtualStorage     string
254		healthyNodes       StaticHealthChecker
255		consistentStorages []string
256		secondaries        []string
257		replicationTargets []string
258		error              error
259		assignedNodes      AssignmentGetter
260	}{
261		{
262			desc:           "unknown virtual storage",
263			virtualStorage: "unknown",
264			error:          nodes.ErrVirtualStorageNotExist,
265		},
266		{
267			desc:               "primary outdated",
268			virtualStorage:     "virtual-storage-1",
269			healthyNodes:       StaticHealthChecker(configuredNodes),
270			assignedNodes:      StaticStorageAssignments(configuredNodes),
271			consistentStorages: []string{"secondary-1", "secondary-2"},
272			error:              ErrRepositoryReadOnly,
273		},
274		{
275			desc:               "primary unhealthy",
276			virtualStorage:     "virtual-storage-1",
277			healthyNodes:       StaticHealthChecker{"virtual-storage-1": {"secondary-1", "secondary-2"}},
278			assignedNodes:      StaticStorageAssignments(configuredNodes),
279			consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
280			error:              nodes.ErrPrimaryNotHealthy,
281		},
282		{
283			desc:               "all secondaries consistent",
284			virtualStorage:     "virtual-storage-1",
285			healthyNodes:       StaticHealthChecker(configuredNodes),
286			assignedNodes:      StaticStorageAssignments(configuredNodes),
287			consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
288			secondaries:        []string{"secondary-1", "secondary-2"},
289		},
290		{
291			desc:               "inconsistent secondary",
292			virtualStorage:     "virtual-storage-1",
293			healthyNodes:       StaticHealthChecker(configuredNodes),
294			assignedNodes:      StaticStorageAssignments(configuredNodes),
295			consistentStorages: []string{"primary", "secondary-2"},
296			secondaries:        []string{"secondary-2"},
297			replicationTargets: []string{"secondary-1"},
298		},
299		{
300			desc:               "unhealthy secondaries",
301			virtualStorage:     "virtual-storage-1",
302			healthyNodes:       StaticHealthChecker{"virtual-storage-1": {"primary"}},
303			assignedNodes:      StaticStorageAssignments(configuredNodes),
304			consistentStorages: []string{"primary", "secondary-1"},
305			replicationTargets: []string{"secondary-1", "secondary-2"},
306		},
307		{
308			desc:               "up to date unassigned nodes are ignored",
309			virtualStorage:     "virtual-storage-1",
310			healthyNodes:       StaticHealthChecker(configuredNodes),
311			assignedNodes:      StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"primary", "secondary-1"}}},
312			consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
313			secondaries:        []string{"secondary-1"},
314		},
315		{
316			desc:               "outdated unassigned nodes are ignored",
317			virtualStorage:     "virtual-storage-1",
318			healthyNodes:       StaticHealthChecker(configuredNodes),
319			assignedNodes:      StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"primary", "secondary-1"}}},
320			consistentStorages: []string{"primary", "secondary-1"},
321			secondaries:        []string{"secondary-1"},
322		},
323		{
324			desc:               "primary is unassigned",
325			virtualStorage:     "virtual-storage-1",
326			healthyNodes:       StaticHealthChecker(configuredNodes),
327			assignedNodes:      StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"secondary-1", "secondary-2"}}},
328			consistentStorages: []string{"primary", "secondary-1", "secondary-2"},
329			secondaries:        []string{"secondary-1"},
330			replicationTargets: []string{"secondary-2"},
331			error:              errPrimaryUnassigned,
332		},
333	} {
334		t.Run(tc.desc, func(t *testing.T) {
335			ctx, cancel := testhelper.Context()
336			defer cancel()
337
338			conns := Connections{
339				"virtual-storage-1": {
340					"primary":     &grpc.ClientConn{},
341					"secondary-1": &grpc.ClientConn{},
342					"secondary-2": &grpc.ClientConn{},
343				},
344			}
345
346			router := NewPerRepositoryRouter(
347				conns,
348				PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) {
349					t.Helper()
350					require.Equal(t, tc.virtualStorage, virtualStorage)
351					require.Equal(t, "repository", relativePath)
352					return "primary", nil
353				}),
354				tc.healthyNodes,
355				nil,
356				datastore.MockRepositoryStore{
357					GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
358						t.Helper()
359						require.Equal(t, tc.virtualStorage, virtualStorage)
360						require.Equal(t, "repository", relativePath)
361						consistentStorages := map[string]struct{}{}
362						for _, storage := range tc.consistentStorages {
363							consistentStorages[storage] = struct{}{}
364						}
365
366						return consistentStorages, nil
367					},
368				},
369				tc.assignedNodes,
370				nil,
371			)
372
373			route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository")
374			require.Equal(t, tc.error, err)
375			if err == nil {
376				var secondaries []RouterNode
377				for _, secondary := range tc.secondaries {
378					secondaries = append(secondaries, RouterNode{
379						Storage:    secondary,
380						Connection: conns[tc.virtualStorage][secondary],
381					})
382				}
383
384				require.Equal(t, RepositoryMutatorRoute{
385					Primary: RouterNode{
386						Storage:    "primary",
387						Connection: conns[tc.virtualStorage]["primary"],
388					},
389					Secondaries:        secondaries,
390					ReplicationTargets: tc.replicationTargets,
391				}, route)
392			}
393		})
394	}
395}
396
397func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) {
398	configuredNodes := map[string][]string{
399		"virtual-storage-1": {"primary", "secondary-1", "secondary-2"},
400	}
401
402	type matcher func(*testing.T, RepositoryMutatorRoute)
403
404	requireOneOf := func(expected ...RepositoryMutatorRoute) matcher {
405		return func(t *testing.T, actual RepositoryMutatorRoute) {
406			sort.Slice(actual.Secondaries, func(i, j int) bool {
407				return actual.Secondaries[i].Storage < actual.Secondaries[j].Storage
408			})
409			sort.Strings(actual.ReplicationTargets)
410			require.Contains(t, expected, actual)
411		}
412	}
413
414	primaryConn := &grpc.ClientConn{}
415	secondary1Conn := &grpc.ClientConn{}
416	secondary2Conn := &grpc.ClientConn{}
417
418	for _, tc := range []struct {
419		desc                string
420		virtualStorage      string
421		healthyNodes        StaticHealthChecker
422		replicationFactor   int
423		primaryCandidates   int
424		primaryPick         int
425		secondaryCandidates int
426		matchRoute          matcher
427		error               error
428	}{
429		{
430			desc:           "no healthy nodes",
431			virtualStorage: "virtual-storage-1",
432			healthyNodes:   StaticHealthChecker{},
433			error:          ErrNoHealthyNodes,
434		},
435		{
436			desc:           "invalid virtual storage",
437			virtualStorage: "invalid",
438			error:          nodes.ErrVirtualStorageNotExist,
439		},
440		{
441			desc:              "no healthy secondaries",
442			virtualStorage:    "virtual-storage-1",
443			healthyNodes:      StaticHealthChecker{"virtual-storage-1": {"primary"}},
444			primaryCandidates: 1,
445			primaryPick:       0,
446			matchRoute: requireOneOf(
447				RepositoryMutatorRoute{
448					Primary:            RouterNode{Storage: "primary", Connection: primaryConn},
449					ReplicationTargets: []string{"secondary-1", "secondary-2"},
450				},
451			),
452		},
453		{
454			desc:              "success with all secondaries healthy",
455			virtualStorage:    "virtual-storage-1",
456			healthyNodes:      StaticHealthChecker(configuredNodes),
457			primaryCandidates: 3,
458			primaryPick:       0,
459			matchRoute: requireOneOf(
460				RepositoryMutatorRoute{
461					Primary: RouterNode{Storage: "primary", Connection: primaryConn},
462					Secondaries: []RouterNode{
463						{Storage: "secondary-1", Connection: secondary1Conn},
464						{Storage: "secondary-2", Connection: secondary2Conn},
465					},
466				},
467			),
468		},
469		{
470			desc:              "success with one secondary unhealthy",
471			virtualStorage:    "virtual-storage-1",
472			healthyNodes:      StaticHealthChecker{"virtual-storage-1": {"primary", "secondary-1"}},
473			primaryCandidates: 2,
474			primaryPick:       0,
475			matchRoute: requireOneOf(
476				RepositoryMutatorRoute{
477					Primary: RouterNode{Storage: "primary", Connection: primaryConn},
478					Secondaries: []RouterNode{
479						{Storage: "secondary-1", Connection: secondary1Conn},
480					},
481					ReplicationTargets: []string{"secondary-2"},
482				},
483			),
484		},
485		{
486			desc:              "replication factor of one configured",
487			virtualStorage:    "virtual-storage-1",
488			healthyNodes:      StaticHealthChecker(configuredNodes),
489			replicationFactor: 1,
490			primaryCandidates: 3,
491			primaryPick:       0,
492			matchRoute: requireOneOf(
493				RepositoryMutatorRoute{
494					Primary: RouterNode{Storage: "primary", Connection: primaryConn},
495				},
496			),
497		},
498		{
499			desc:                "replication factor of two configured",
500			virtualStorage:      "virtual-storage-1",
501			healthyNodes:        StaticHealthChecker(configuredNodes),
502			replicationFactor:   2,
503			primaryCandidates:   3,
504			primaryPick:         0,
505			secondaryCandidates: 2,
506			matchRoute: requireOneOf(
507				RepositoryMutatorRoute{
508					Primary:     RouterNode{Storage: "primary", Connection: primaryConn},
509					Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
510				},
511				RepositoryMutatorRoute{
512					Primary:     RouterNode{Storage: "primary", Connection: primaryConn},
513					Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}},
514				},
515			),
516		},
517		{
518			desc:                "replication factor of three configured with unhealthy secondary",
519			virtualStorage:      "virtual-storage-1",
520			healthyNodes:        StaticHealthChecker{"virtual-storage-1": {"primary", "secondary-1"}},
521			replicationFactor:   3,
522			primaryCandidates:   2,
523			primaryPick:         0,
524			secondaryCandidates: 2,
525			matchRoute: requireOneOf(
526				RepositoryMutatorRoute{
527					Primary:            RouterNode{Storage: "primary", Connection: primaryConn},
528					Secondaries:        []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}},
529					ReplicationTargets: []string{"secondary-2"},
530				},
531			),
532		},
533	} {
534		t.Run(tc.desc, func(t *testing.T) {
535			ctx, cancel := testhelper.Context()
536			defer cancel()
537
538			route, err := NewPerRepositoryRouter(
539				Connections{
540					"virtual-storage-1": {
541						"primary":     primaryConn,
542						"secondary-1": secondary1Conn,
543						"secondary-2": secondary2Conn,
544					},
545				},
546				nil,
547				tc.healthyNodes,
548				mockRandom{
549					intnFunc: func(n int) int {
550						require.Equal(t, tc.primaryCandidates, n)
551						return tc.primaryPick
552					},
553					shuffleFunc: func(n int, swap func(i, j int)) {
554						require.Equal(t, tc.secondaryCandidates, n)
555					},
556				},
557				nil,
558				nil,
559				map[string]int{"virtual-storage-1": tc.replicationFactor},
560			).RouteRepositoryCreation(ctx, tc.virtualStorage)
561			if tc.error != nil {
562				require.Equal(t, tc.error, err)
563				return
564			}
565
566			require.NoError(t, err)
567			tc.matchRoute(t, route)
568		})
569	}
570}
571