package nodes

import (
	"context"
	"errors"
	"fmt"
	"net"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v14/client"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
)

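// nodeAssertion is a comparable snapshot of a Node's storage name and address, used to compare
// shard members in assertions.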
type nodeAssertion struct {
	Storage string
	Address string
}

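// shardAssertion describes the expected primary and secondaries of a Shard.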
type shardAssertion struct {
	Primary     *nodeAssertion
	Secondaries []nodeAssertion
}

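// toNodeAssertion converts a Node into its assertable form; a nil Node yields nil.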
func toNodeAssertion(n Node) *nodeAssertion {
	if n == nil {
		return nil
	}

	return &nodeAssertion{
		Storage: n.GetStorage(),
		Address: n.GetAddress(),
	}
}

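// assertShard checks that the actual Shard has the expected primary and secondaries.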
func assertShard(t *testing.T, exp shardAssertion, act Shard) {
	t.Helper()

	actSecondaries := make([]nodeAssertion, 0, len(act.Secondaries))
	for _, n := range act.Secondaries {
		actSecondaries = append(actSecondaries, *toNodeAssertion(n))
	}

	require.Equal(t, exp, shardAssertion{
		Primary:     toNodeAssertion(act.Primary),
		Secondaries: actSecondaries,
	})
}

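// TestNodeStatus verifies that CheckHealth succeeds while the health server is serving, records
// one latency observation per check labelled with the storage name, and reports a failed check
// once the server switches to NOT_SERVING.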
func TestNodeStatus(t *testing.T) {
	socket := testhelper.GetTemporaryGitalySocketFileName(t)
	healthSvr := testhelper.NewServerWithHealth(t, socket)

	cc, err := grpc.Dial(
		"unix://"+socket,
		grpc.WithInsecure(),
	)
	require.NoError(t, err)

	mockHistogramVec := promtest.NewMockHistogramVec()

	storageName := "default"
	cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec, nil)

	ctx, cancel := testhelper.Context()
	defer cancel()

	var expectedLabels [][]string
	for i := 0; i < healthcheckThreshold; i++ {
		status, err := cs.CheckHealth(ctx)
		require.NoError(t, err)
		require.True(t, status)
		expectedLabels = append(expectedLabels, []string{storageName})
	}

	require.Equal(t, expectedLabels, mockHistogramVec.LabelsCalled())
	require.Len(t, mockHistogramVec.Observer().Observed(), healthcheckThreshold)

	healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)

	status, err := cs.CheckHealth(ctx)
	require.NoError(t, err)
	require.False(t, status)
}

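// TestManagerFailoverDisabledElectionStrategySQL verifies that, with failover disabled and the
// SQL election strategy configured, the configured primary is returned while it is healthy and
// GetShard fails with ErrPrimaryNotHealthy once its health checks start failing.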
func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) {
	const virtualStorageName = "virtual-storage-0"
	const primaryStorage = "praefect-internal-0"
	socket0, socket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	virtualStorage := &config.VirtualStorage{
		Name: virtualStorageName,
		Nodes: []*config.Node{
			{
				Storage: primaryStorage,
				Address: "unix://" + socket0,
			},
			{
				Storage: "praefect-internal-1",
				Address: "unix://" + socket1,
			},
		},
	}

	healthSrv := testhelper.NewServerWithHealth(t, socket0)
	testhelper.NewServerWithHealth(t, socket1)

	conf := config.Config{
		Failover:        config.Failover{Enabled: false, ElectionStrategy: config.ElectionStrategySQL},
		VirtualStorages: []*config.VirtualStorage{virtualStorage},
	}
	nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
	require.NoError(t, err)

	nm.Start(time.Millisecond, time.Millisecond)

	ctx, cancel := testhelper.Context()
	defer cancel()

	shard, err := nm.GetShard(ctx, virtualStorageName)
	require.NoError(t, err)
	require.Equal(t, primaryStorage, shard.Primary.GetStorage())

	healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
	nm.checkShards()

	_, err = nm.GetShard(ctx, virtualStorageName)
	require.Error(t, err)
	require.Equal(t, ErrPrimaryNotHealthy, err)
}

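// TestDialWithUnhealthyNode verifies that the manager still returns a shard containing all
// configured nodes even if one of them is not reachable at all.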
func TestDialWithUnhealthyNode(t *testing.T) {
	primaryLn, err := net.Listen("unix", testhelper.GetTemporaryGitalySocketFileName(t))
	require.NoError(t, err)

	primaryAddress := "unix://" + primaryLn.Addr().String()
	const secondaryAddress = "unix://does-not-exist"
	const storageName = "default"

	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name: storageName,
				Nodes: []*config.Node{
					{
						Storage: "starts",
						Address: primaryAddress,
					},
					{
						Storage: "never-starts",
						Address: secondaryAddress,
					},
				},
			},
		},
	}

	testhelper.NewHealthServerWithListener(t, primaryLn)

	mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
	require.NoError(t, err)

	mgr.Start(1*time.Millisecond, 1*time.Millisecond)

	ctx, cancel := testhelper.Context()
	defer cancel()

	shard, err := mgr.GetShard(ctx, storageName)
	require.NoError(t, err)
	assertShard(t, shardAssertion{
		Primary:     &nodeAssertion{Storage: "starts", Address: primaryAddress},
		Secondaries: []nodeAssertion{{Storage: "never-starts", Address: secondaryAddress}},
	}, shard)
}

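// TestNodeManager exercises failover: with failover enabled an unhealthy primary is demoted and
// a healthy secondary promoted, the original primary is failed back to once it passes enough
// health checks, and neither a shard nor a primary is returned when no node is healthy. With
// failover disabled, GetShard fails as soon as the primary becomes unhealthy.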
func TestNodeManager(t *testing.T) {
	internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0)
	healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1)

	node1 := &config.Node{
		Storage: "praefect-internal-0",
		Address: "unix://" + internalSocket0,
	}

	node2 := &config.Node{
		Storage: "praefect-internal-1",
		Address: "unix://" + internalSocket1,
	}

	virtualStorages := []*config.VirtualStorage{
		{
			Name:  "virtual-storage-0",
			Nodes: []*config.Node{node1, node2},
		},
	}

	confWithFailover := config.Config{
		VirtualStorages: virtualStorages,
		Failover:        config.Failover{Enabled: true},
	}
	confWithoutFailover := config.Config{
		VirtualStorages: virtualStorages,
		Failover:        config.Failover{Enabled: false},
	}

	mockHistogram := promtest.NewMockHistogramVec()
	nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil)
	require.NoError(t, err)

	nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil)
	require.NoError(t, err)

	// The monitoring period is set to 1 hour as health checks are executed manually in this test.
	nm.Start(0, time.Hour)
	nmWithoutFailover.Start(0, time.Hour)

	ctx, cancel := testhelper.Context()
	defer cancel()

	shardWithoutFailover, err := nmWithoutFailover.GetShard(ctx, "virtual-storage-0")
	require.NoError(t, err)

	shard, err := nm.GetShard(ctx, "virtual-storage-0")
	require.NoError(t, err)

	// shard without failover and shard with failover should be the same
	initialState := shardAssertion{
		Primary:     &nodeAssertion{node1.Storage, node1.Address},
		Secondaries: []nodeAssertion{{node2.Storage, node2.Address}},
	}
	assertShard(t, initialState, shard)
	assertShard(t, initialState, shardWithoutFailover)

	const unhealthyCheckCount = 1
	const healthyCheckCount = healthcheckThreshold
	checkShards := func(count int) {
		for i := 0; i < count; i++ {
			nm.checkShards()
			nmWithoutFailover.checkShards()
		}
	}

	healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
	checkShards(unhealthyCheckCount)

	labelsCalled := mockHistogram.LabelsCalled()
	for _, node := range virtualStorages[0].Nodes {
		require.Contains(t, labelsCalled, []string{node.Storage})
	}

	// since failover is disabled, the attempt to get a shard with an unhealthy primary fails
	_, err = nmWithoutFailover.GetShard(ctx, "virtual-storage-0")
	require.Error(t, err)
	require.Equal(t, ErrPrimaryNotHealthy, err)

	// since the primary is unhealthy, we expect checkShards to demote the primary to a secondary
	// and promote the healthy secondary to primary
	shard, err = nm.GetShard(ctx, "virtual-storage-0")
	require.NoError(t, err)
	// shard with failover should have promoted a secondary to primary and demoted the primary to a secondary
	assertShard(t, shardAssertion{
		Primary:     &nodeAssertion{node2.Storage, node2.Address},
		Secondaries: []nodeAssertion{{node1.Storage, node1.Address}},
	}, shard)

	// failing back to the original primary
	healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
	healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
	checkShards(healthyCheckCount)

	shard, err = nm.GetShard(ctx, "virtual-storage-0")
	require.NoError(t, err)

	assertShard(t, shardAssertion{
		Primary:     &nodeAssertion{node1.Storage, node1.Address},
		Secondaries: []nodeAssertion{{node2.Storage, node2.Address}},
	}, shard)

	primary, err := nm.GetPrimary(ctx, "virtual-storage-0", "")
	require.NoError(t, err)
	require.Equal(t, shard.Primary.GetStorage(), primary)

	healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
	healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
	checkShards(unhealthyCheckCount)

	_, err = nm.GetShard(ctx, "virtual-storage-0")
	require.Error(t, err, "should return error since no nodes are healthy")

	_, err = nm.GetPrimary(ctx, "virtual-storage-0", "")
	require.Equal(t, ErrPrimaryNotHealthy, err)
}

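// TestMgr_GetSyncedNode verifies which node GetSyncedNode returns depending on which storages
// the repository store reports as consistent and on the health of those nodes.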
func TestMgr_GetSyncedNode(t *testing.T) {
	const count = 3
	const virtualStorage = "virtual-storage-0"
	const repoPath = "path/1"

	var healthSrvs [count]*health.Server
	var nodes [count]*config.Node
	for i := 0; i < count; i++ {
		socket := testhelper.GetTemporaryGitalySocketFileName(t)
		healthSrvs[i] = testhelper.NewServerWithHealth(t, socket)
		nodes[i] = &config.Node{Storage: fmt.Sprintf("gitaly-%d", i), Address: "unix://" + socket}
	}

	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: nodes[:]}},
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	var consistentSecondariesErr error
	consistentStorages := map[string]struct{}{}

	verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) {
		conf.Failover.Enabled = failover
		rs := datastore.MockRepositoryStore{
			GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
				return consistentStorages, consistentSecondariesErr
			},
		}

		nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil)
		require.NoError(t, err)

		for i := range healthSrvs {
			healthSrvs[i].SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		}
		nm.Start(0, time.Hour)

		return func(t *testing.T) { scenario(t, nm, rs) }
	}

	t.Run("unknown virtual storage", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentSecondariesErr = ErrVirtualStorageNotExist
		_, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "stub")
		require.True(t, errors.Is(err, ErrVirtualStorageNotExist))
	}))

	t.Run("state is undefined", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentSecondariesErr = nil
		node, err := nm.GetSyncedNode(ctx, virtualStorage, "no/matter")
		require.NoError(t, err)
		require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress())
	}))

	t.Run("no up to date storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentStorages = nil

		node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
		require.NoError(t, err)
		require.Equal(t, "gitaly-0", node.GetStorage())
	}))

	t.Run("multiple storages up to date", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}, "gitaly-2": {}}

		chosen := map[Node]struct{}{}
		for i := 0; i < 1000 && len(chosen) < 2; i++ {
			node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
			require.NoError(t, err)
			chosen[node] = struct{}{}
		}
		if len(chosen) < 2 {
			require.FailNow(t, "no distribution in too many attempts")
		}
	}))

	t.Run("single secondary storage up to date but unhealthy", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}

		healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)

		shard, err := nm.GetShard(ctx, virtualStorage)
		require.NoError(t, err)

		gitaly1, err := shard.GetNode("gitaly-1")
		require.NoError(t, err)

		ok, err := gitaly1.CheckHealth(ctx)
		require.NoError(t, err)
		require.False(t, ok)

		node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
		require.NoError(t, err)
		require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "secondary shouldn't be chosen as it is unhealthy")
	}))

	t.Run("no healthy storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}

		healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
		healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)

		shard, err := nm.GetShard(ctx, virtualStorage)
		require.NoError(t, err)

		gitaly0, err := shard.GetNode("gitaly-0")
		require.NoError(t, err)

		gitaly0OK, err := gitaly0.CheckHealth(ctx)
		require.NoError(t, err)
		require.False(t, gitaly0OK)

		gitaly1, err := shard.GetNode("gitaly-1")
		require.NoError(t, err)

		gitaly1OK, err := gitaly1.CheckHealth(ctx)
		require.NoError(t, err)
		require.False(t, gitaly1OK)

		_, err = nm.GetSyncedNode(ctx, virtualStorage, repoPath)
		require.True(t, errors.Is(err, ErrPrimaryNotHealthy))
	}))

	t.Run("disabled failover doesn't disable health state", verify(false, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) {
		consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}}

		shard, err := nm.GetShard(ctx, virtualStorage)
		require.NoError(t, err)

		gitaly0, err := shard.GetNode("gitaly-0")
		require.NoError(t, err)

		require.Equal(t, shard.Primary, gitaly0)

		gitaly0OK, err := gitaly0.CheckHealth(ctx)
		require.NoError(t, err)
		require.True(t, gitaly0OK)

		gitaly1, err := shard.GetNode("gitaly-1")
		require.NoError(t, err)

		gitaly1OK, err := gitaly1.CheckHealth(ctx)
		require.NoError(t, err)
		require.True(t, gitaly1OK)

		healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN)
		gitaly0OK, err = gitaly0.CheckHealth(ctx)
		require.NoError(t, err)
		require.False(t, gitaly0OK, "primary should be unhealthy")

		node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath)
		require.NoError(t, err)
		require.Equal(t, conf.VirtualStorages[0].Nodes[1].Address, node.GetAddress(), "primary shouldn't be chosen as it is unhealthy")
	}))
}

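// TestNodeStatus_IsHealthy verifies the threshold-based health transitions of a nodeStatus: a
// node becomes healthy only after healthcheckThreshold consecutive successful checks, turns
// unhealthy after a single failed check, and concurrent CheckHealth and IsHealthy calls are
// race-free.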
func TestNodeStatus_IsHealthy(t *testing.T) {
	checkNTimes := func(ctx context.Context, t *testing.T, ns *nodeStatus, n int) {
		for i := 0; i < n; i++ {
			_, err := ns.CheckHealth(ctx)
			require.NoError(t, err)
		}
	}

	socket := testhelper.GetTemporaryGitalySocketFileName(t)
	address := "unix://" + socket

	healthSrv := testhelper.NewServerWithHealth(t, socket)

	clientConn, err := client.Dial(address, nil)
	require.NoError(t, err)
	defer func() { require.NoError(t, clientConn.Close()) }()

	node := config.Node{Storage: "gitaly-0", Address: address}

	ctx, cancel := testhelper.Context()
	defer cancel()

	logger := testhelper.DiscardTestLogger(t)
	latencyHistMock := &promtest.MockHistogramVec{}

	t.Run("unchecked node is unhealthy", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		require.False(t, ns.IsHealthy())
	})

	t.Run("not enough checks to consider it healthy", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		checkNTimes(ctx, t, ns, healthcheckThreshold-1)

		require.False(t, ns.IsHealthy())
	})

	t.Run("healthy", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		checkNTimes(ctx, t, ns, healthcheckThreshold)

		require.True(t, ns.IsHealthy())
	})

	t.Run("healthy turns into unhealthy after single failed check", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		checkNTimes(ctx, t, ns, healthcheckThreshold)

		require.True(t, ns.IsHealthy(), "node must be turned into healthy state")

		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
		checkNTimes(ctx, t, ns, 1)

		require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state")
	})

	t.Run("unhealthy turns into healthy after pre-defined threshold of checks", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		checkNTimes(ctx, t, ns, healthcheckThreshold)

		require.True(t, ns.IsHealthy(), "node must be turned into healthy state")

		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)
		checkNTimes(ctx, t, ns, 1)

		require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state")

		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
		for i := 1; i < healthcheckThreshold; i++ {
			checkNTimes(ctx, t, ns, 1)
			require.False(t, ns.IsHealthy(), "node must be unhealthy until the defined threshold of checks completes positively")
		}
		checkNTimes(ctx, t, ns, 1) // the last check that must turn it into healthy state

		require.True(t, ns.IsHealthy(), "node should be healthy again")
	})

	t.Run("concurrent access has no races", func(t *testing.T) {
		ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil)
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)

		t.Run("continuously does health checks - 1", func(t *testing.T) {
			t.Parallel()
			checkNTimes(ctx, t, ns, healthcheckThreshold)
		})

		t.Run("continuously checks health - 1", func(t *testing.T) {
			t.Parallel()
			ns.IsHealthy()
		})

		t.Run("continuously does health checks - 2", func(t *testing.T) {
			t.Parallel()
			checkNTimes(ctx, t, ns, healthcheckThreshold)
		})

		t.Run("continuously checks health - 2", func(t *testing.T) {
			t.Parallel()
			ns.IsHealthy()
		})
	})
}