package praefect

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"gitlab.com/gitlab-org/gitaly/v14/client"
	"gitlab.com/gitlab-org/gitaly/v14/internal/cache"
	"gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service"
	"gitlab.com/gitlab-org/gitaly/v14/internal/helper"
	gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata"
	"gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag"
	"gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg"
	"gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver"
	"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo"
	"gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting"
	"gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
	"gitlab.com/gitlab-org/labkit/correlation"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/health/grpc_health_v1"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/emptypb"
)

var testLogger = logrus.New()

func init() {
	testLogger.SetOutput(io.Discard)
}

func TestSecondaryRotation(t *testing.T) {
	t.Skip("secondary rotation will change with the new data model")
}

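// TestStreamDirectorReadOnlyEnforcement verifies that mutator RPCs are rejected with
// ErrRepositoryReadOnly when the primary storage is not among the up-to-date storages.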
func TestStreamDirectorReadOnlyEnforcement(t *testing.T) {
	t.Parallel()
	db := glsql.NewDB(t)
	for _, tc := range []struct {
		desc     string
		readOnly bool
	}{
		{desc: "writable", readOnly: false},
		{desc: "read-only", readOnly: true},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			db.TruncateAll(t)

			const (
				virtualStorage = "test-virtual-storage"
				relativePath   = "test-repository"
				storage        = "test-storage"
			)
			conf := config.Config{
				VirtualStorages: []*config.VirtualStorage{
					{
						Name: virtualStorage,
						Nodes: []*config.Node{
							{
								Address: "tcp://gitaly-primary.example.com",
								Storage: storage,
							},
						},
					},
				},
			}

			ctx, cancel := testhelper.Context()
			defer cancel()

			rs := datastore.MockRepositoryStore{
				GetConsistentStoragesFunc: func(context.Context, string, string) (map[string]struct{}, error) {
					if tc.readOnly {
						return map[string]struct{}{storage + "-other": {}}, nil
					}
					return map[string]struct{}{storage: {}}, nil
				},
			}

			coordinator := NewCoordinator(
				datastore.NewPostgresReplicationEventQueue(db),
				rs,
				NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) {
					require.Equal(t, virtualStorage, vs)
					return nodes.Shard{
						Primary: &nodes.MockNode{GetStorageMethod: func() string {
							return storage
						}},
					}, nil
				}}, rs),
				transactions.NewManager(conf),
				conf,
				protoregistry.GitalyProtoPreregistered,
			)

			frame, err := proto.Marshal(&gitalypb.CleanupRequest{Repository: &gitalypb.Repository{
				StorageName:  virtualStorage,
				RelativePath: relativePath,
			}})
			require.NoError(t, err)

			_, err = coordinator.StreamDirector(ctx, "/gitaly.RepositoryService/Cleanup", &mockPeeker{frame: frame})
			if tc.readOnly {
				require.Equal(t, ErrRepositoryReadOnly, err)
				testhelper.RequireGrpcError(t, err, codes.FailedPrecondition)
			} else {
				require.NoError(t, err)
			}
		})
	}
}

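// TestStreamDirectorMutator verifies that mutator RPCs are routed to the primary, that the
// target repository's storage name is rewritten, and that a replication job targeting the
// secondary is enqueued once the request finalizer runs.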
func TestStreamDirectorMutator(t *testing.T) {
	t.Parallel()
	gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	testhelper.NewServerWithHealth(t, gitalySocket0)
	testhelper.NewServerWithHealth(t, gitalySocket1)

	primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
	primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"}
	secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"}
	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name:  "praefect",
				Nodes: []*config.Node{primaryNode, secondaryNode},
			},
		},
	}
	db := glsql.NewDB(t)

	targetRepo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	txMgr := transactions.NewManager(conf)

	nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	defer nodeSet.Close()

	for _, tc := range []struct {
		desc             string
		repositoryExists bool
		error            error
	}{
		{
			desc:             "successful",
			repositoryExists: true,
		},
		{
			desc:  "repository not found",
			error: helper.ErrNotFound(fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get primary: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))),
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			tx := db.Begin(t)
			defer tx.Rollback(t)

			rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames())

			if tc.repositoryExists {
				require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true))
			}

			testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()})
			queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))
			queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
				assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
				return queue.Enqueue(ctx, event)
			})

			coordinator := NewCoordinator(
				queueInterceptor,
				rs,
				NewPerRepositoryRouter(
					nodeSet.Connections(),
					nodes.NewPerRepositoryElector(tx),
					StaticHealthChecker(conf.StorageNames()),
					NewLockedRandom(rand.New(rand.NewSource(0))),
					rs,
					datastore.NewAssignmentStore(tx, conf.StorageNames()),
					rs,
					nil,
				),
				txMgr,
				conf,
				protoregistry.GitalyProtoPreregistered,
			)

			frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
				Origin:     &targetRepo,
				ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo},
			})
			require.NoError(t, err)

			fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool"

			peeker := &mockPeeker{frame}
			streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
			if tc.error != nil {
				require.Equal(t, tc.error, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)

			m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")

			// This call creates new events in the queue and simulates the usual flow of the update operation.
			require.NoError(t, streamParams.RequestFinalizer())

			// Wait until the event is persisted (asynchronous operation).
			require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
				return len(i.GetEnqueuedResult()) == 1
			}))

			events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10)
			require.NoError(t, err)
			require.Len(t, events, 1)

			expectedEvent := datastore.ReplicationEvent{
				ID:        1,
				State:     datastore.JobStateInProgress,
				Attempt:   2,
				LockID:    "praefect|praefect-internal-2|/path/to/hashed/storage",
				CreatedAt: events[0].CreatedAt,
				UpdatedAt: events[0].UpdatedAt,
				Job: datastore.ReplicationJob{
					RepositoryID:      1,
					Change:            datastore.UpdateRepo,
					VirtualStorage:    conf.VirtualStorages[0].Name,
					RelativePath:      targetRepo.RelativePath,
					TargetNodeStorage: secondaryNode.Storage,
					SourceNodeStorage: primaryNode.Storage,
				},
				Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
			}
			require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct")
		})
	}
}

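// TestStreamDirectorMutator_StopTransaction verifies that voting on a transaction that has been
// stopped fails with ErrTransactionStopped and that the request finalizer still completes
// successfully.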
func TestStreamDirectorMutator_StopTransaction(t *testing.T) {
	t.Parallel()
	socket := testhelper.GetTemporaryGitalySocketFileName(t)
	testhelper.NewServerWithHealth(t, socket)

	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name: "praefect",
				Nodes: []*config.Node{
					{Address: "unix://" + socket, Storage: "primary"},
					{Address: "unix://" + socket, Storage: "secondary"},
				},
			},
		},
	}

	repo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	nodeMgr.Start(0, time.Hour)

	ctx, cancel := testhelper.Context()
	defer cancel()

	shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
	require.NoError(t, err)

	for _, name := range []string{"primary", "secondary"} {
		node, err := shard.GetNode(name)
		require.NoError(t, err)
		waitNodeToChangeHealthStatus(ctx, t, node, true)
	}

	rs := datastore.MockRepositoryStore{
		GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
			return map[string]struct{}{"primary": {}, "secondary": {}}, nil
		},
	}

	txMgr := transactions.NewManager(conf)

	coordinator := NewCoordinator(
		datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)),
		rs,
		NewNodeManagerRouter(nodeMgr, rs),
		txMgr,
		conf,
		protoregistry.GitalyProtoPreregistered,
	)

	fullMethod := "/gitaly.SmartHTTPService/PostReceivePack"

	frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{
		Repository: &repo,
	})
	require.NoError(t, err)
	peeker := &mockPeeker{frame}

	streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
	require.NoError(t, err)

	txCtx := peer.NewContext(streamParams.Primary().Ctx, &peer.Peer{})
	transaction, err := txinfo.TransactionFromContext(txCtx)
	require.NoError(t, err)

	var wg sync.WaitGroup
	var syncWG sync.WaitGroup

	wg.Add(2)
	syncWG.Add(2)

	go func() {
		defer wg.Done()

		vote := voting.VoteFromData([]byte("vote"))
		err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote)
		require.NoError(t, err)

		// Ensure that at least one vote was agreed on.
		syncWG.Done()
		syncWG.Wait()

		require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID))
	}()

	go func() {
		defer wg.Done()

		vote := voting.VoteFromData([]byte("vote"))
		err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote)
		require.NoError(t, err)

		// Ensure that at least one vote was agreed on.
		syncWG.Done()
		syncWG.Wait()

		err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote)
		assert.True(t, errors.Is(err, transactions.ErrTransactionStopped))
	}()

	wg.Wait()

	err = streamParams.RequestFinalizer()
	require.NoError(t, err)
}

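// mockRouter embeds Router and overrides RouteRepositoryAccessor so tests can inject custom
// accessor routing behaviour.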
type mockRouter struct {
	Router
	routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error)
}

func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) {
	return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary)
}

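// TestStreamDirectorAccessor verifies that accessor RPCs are routed to a Gitaly node and have
// their target repository's storage name rewritten, and that routing errors are propagated.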
func TestStreamDirectorAccessor(t *testing.T) {
	t.Parallel()
	gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t)
	testhelper.NewServerWithHealth(t, gitalySocket)

	gitalyAddress := "unix://" + gitalySocket
	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name: "praefect",
				Nodes: []*config.Node{
					{
						Address: gitalyAddress,
						Storage: "praefect-internal-1",
					},
				},
			},
		},
	}

	queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))

	targetRepo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	entry := testhelper.DiscardTestEntry(t)
	rs := datastore.MockRepositoryStore{}

	nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	nodeMgr.Start(0, time.Minute)

	txMgr := transactions.NewManager(conf)

	for _, tc := range []struct {
		desc   string
		router Router
		error  error
	}{
		{
			desc:   "success",
			router: NewNodeManagerRouter(nodeMgr, rs),
		},
		{
			desc: "repository not found",
			router: mockRouter{
				routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RouterNode, error) {
					return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath)
				},
			},
			error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))),
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			coordinator := NewCoordinator(
				queue,
				rs,
				tc.router,
				txMgr,
				conf,
				protoregistry.GitalyProtoPreregistered,
			)

			frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
			require.NoError(t, err)

			fullMethod := "/gitaly.RefService/FindAllBranches"

			peeker := &mockPeeker{frame: frame}
			streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
			if tc.error != nil {
				require.Equal(t, tc.error, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, gitalyAddress, streamParams.Primary().Conn.Target())

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)
			require.Equal(t, protoregistry.ScopeRepository, mi.Scope, "method must be repository scoped")
			require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")

			m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")
		})
	}
}

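// TestCoordinatorStreamDirector_distributesReads verifies read distribution: accessor RPCs are
// spread across healthy, up-to-date nodes, are forced to the primary when requested or when the
// RPC is primary-only, and mutator RPCs are never distributed.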
func TestCoordinatorStreamDirector_distributesReads(t *testing.T) {
	t.Parallel()
	gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0)
	healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1)

	primaryNodeConf := config.Node{
		Address: "unix://" + gitalySocket0,
		Storage: "gitaly-1",
	}

	secondaryNodeConf := config.Node{
		Address: "unix://" + gitalySocket1,
		Storage: "gitaly-2",
	}
	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name:  "praefect",
				Nodes: []*config.Node{&primaryNodeConf, &secondaryNodeConf},
			},
		},
		Failover: config.Failover{
			Enabled:          true,
			ElectionStrategy: "local",
		},
	}

	queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))

	targetRepo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	entry := testhelper.DiscardTestEntry(t)

	repoStore := datastore.MockRepositoryStore{
		GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
			return map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil
		},
	}

	nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	nodeMgr.Start(0, time.Minute)

	txMgr := transactions.NewManager(conf)

	coordinator := NewCoordinator(
		queue,
		repoStore,
		NewNodeManagerRouter(nodeMgr, repoStore),
		txMgr,
		conf,
		protoregistry.GitalyProtoPreregistered,
	)

	t.Run("forwards accessor operations", func(t *testing.T) {
		var primaryChosen int
		var secondaryChosen int

		for i := 0; i < 16; i++ {
			frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
			require.NoError(t, err)

			fullMethod := "/gitaly.RefService/FindAllBranches"

			peeker := &mockPeeker{frame: frame}

			streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
			require.NoError(t, err)
			require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary")

			var nodeConf config.Node
			switch streamParams.Primary().Conn.Target() {
			case primaryNodeConf.Address:
				nodeConf = primaryNodeConf
				primaryChosen++
			case secondaryNodeConf.Address:
				nodeConf = secondaryNodeConf
				secondaryChosen++
			}

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)
			require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")

			m, err := protoMessage(mi, streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
		}

		require.NotZero(t, primaryChosen, "primary should have been chosen at least once")
		require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once")
	})

	t.Run("forwards accessor to primary if force-routing", func(t *testing.T) {
		var primaryChosen int
		var secondaryChosen int

		for i := 0; i < 16; i++ {
			frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
			require.NoError(t, err)

			fullMethod := "/gitaly.RefService/FindAllBranches"

			peeker := &mockPeeker{frame: frame}

			ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
			ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))

			streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker)
			require.NoError(t, err)
			require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary")

			var nodeConf config.Node
			switch streamParams.Primary().Conn.Target() {
			case primaryNodeConf.Address:
				nodeConf = primaryNodeConf
				primaryChosen++
			case secondaryNodeConf.Address:
				nodeConf = secondaryNodeConf
				secondaryChosen++
			}

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)
			require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")

			m, err := protoMessage(mi, streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
		}

		require.Equal(t, 16, primaryChosen, "primary should have always been chosen")
		require.Zero(t, secondaryChosen, "secondary should never have been chosen")
	})

	t.Run("forwards accessor to primary for primary-only RPCs", func(t *testing.T) {
		var primaryChosen int
		var secondaryChosen int

		for i := 0; i < 16; i++ {
			frame, err := proto.Marshal(&gitalypb.GetObjectDirectorySizeRequest{Repository: &targetRepo})
			require.NoError(t, err)

			fullMethod := "/gitaly.RepositoryService/GetObjectDirectorySize"

			peeker := &mockPeeker{frame: frame}

			ctx, cancel := testhelper.Context()
			defer cancel()

			streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker)
			require.NoError(t, err)
			require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary")

			var nodeConf config.Node
			switch streamParams.Primary().Conn.Target() {
			case primaryNodeConf.Address:
				nodeConf = primaryNodeConf
				primaryChosen++
			case secondaryNodeConf.Address:
				nodeConf = secondaryNodeConf
				secondaryChosen++
			}

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)
			require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")

			m, err := protoMessage(mi, streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
		}

		require.Equal(t, 16, primaryChosen, "primary should have always been chosen")
		require.Zero(t, secondaryChosen, "secondary should never have been chosen")
	})

	t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) {
		healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)

		shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
		require.NoError(t, err)

		gitaly1, err := shard.GetNode(secondaryNodeConf.Storage)
		require.NoError(t, err)
		waitNodeToChangeHealthStatus(ctx, t, gitaly1, false)
		defer func() {
			healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
			waitNodeToChangeHealthStatus(ctx, t, gitaly1, true)
		}()

		frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
		require.NoError(t, err)

		fullMethod := "/gitaly.RefService/FindAllBranches"

		peeker := &mockPeeker{frame: frame}
		streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
		require.NoError(t, err)
		require.Equal(t, primaryNodeConf.Address, streamParams.Primary().Conn.Target(), "must be redirected to primary")

		mi, err := coordinator.registry.LookupMethod(fullMethod)
		require.NoError(t, err)
		require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor")

		m, err := protoMessage(mi, streamParams.Primary().Msg)
		require.NoError(t, err)

		rewrittenTargetRepo, err := mi.TargetRepo(m)
		require.NoError(t, err)
		require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
	})

	t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) {
		primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)

		shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name)
		require.NoError(t, err)

		primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage)
		require.NoError(t, err)
		waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false)
		defer func() {
			primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
			waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true)
		}()

		frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo})
		require.NoError(t, err)

		fullMethod := "/gitaly.RefService/FindAllBranches"

		ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id")
		ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly))

		peeker := &mockPeeker{frame: frame}
		_, err = coordinator.StreamDirector(ctx, fullMethod, peeker)
		require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy))
	})

	t.Run("doesn't forward mutator operations", func(t *testing.T) {
		frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo})
		require.NoError(t, err)

		fullMethod := "/gitaly.OperationService/UserUpdateBranch"

		peeker := &mockPeeker{frame: frame}
		streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
		require.NoError(t, err)
		require.Equal(t, primaryNodeConf.Address, streamParams.Primary().Conn.Target(), "must be redirected to primary")

		mi, err := coordinator.registry.LookupMethod(fullMethod)
		require.NoError(t, err)
		require.Equal(t, protoregistry.OpMutator, mi.Operation, "method must be a mutator")

		m, err := protoMessage(mi, streamParams.Primary().Msg)
		require.NoError(t, err)

		rewrittenTargetRepo, err := mi.TargetRepo(m)
		require.NoError(t, err)
		require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name")
	})
}

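// TestStreamDirector_repo_creation verifies that repository-creating RPCs record the repository
// in the datastore, route to the elected primary and healthy secondaries, and enqueue replication
// jobs for nodes that could not take part in the transactional creation.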
func TestStreamDirector_repo_creation(t *testing.T) {
	t.Parallel()

	db := glsql.NewDB(t)

	for _, tc := range []struct {
		desc              string
		electionStrategy  config.ElectionStrategy
		replicationFactor int
		primaryStored     bool
		assignmentsStored bool
	}{
		{
			desc:              "virtual storage scoped primaries",
			electionStrategy:  config.ElectionStrategySQL,
			replicationFactor: 3, // assignments are not set when not using repository specific primaries
			primaryStored:     false,
			assignmentsStored: false,
		},
		{
			desc:              "repository specific primaries without variable replication factor",
			electionStrategy:  config.ElectionStrategyPerRepository,
			primaryStored:     true,
			assignmentsStored: false,
		},
		{
			desc:              "repository specific primaries with variable replication factor",
			electionStrategy:  config.ElectionStrategyPerRepository,
			replicationFactor: 3,
			primaryStored:     true,
			assignmentsStored: true,
		},
	} {
		t.Run(tc.desc, func(t *testing.T) {
			db.TruncateAll(t)
			primaryNode := &config.Node{Storage: "praefect-internal-1"}
			healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"}
			unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"}
			conf := config.Config{
				Failover: config.Failover{ElectionStrategy: tc.electionStrategy},
				VirtualStorages: []*config.VirtualStorage{
					{
						Name:                     "praefect",
						DefaultReplicationFactor: tc.replicationFactor,
						Nodes:                    []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode},
					},
				},
			}

			rewrittenStorage := primaryNode.Storage
			targetRepo := gitalypb.Repository{
				StorageName:  "praefect",
				RelativePath: "/path/to/hashed/storage",
			}

			var createRepositoryCalled int64
			rs := datastore.MockRepositoryStore{
				CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error {
					atomic.AddInt64(&createRepositoryCalled, 1)
					assert.Equal(t, int64(0), repoID)
					assert.Equal(t, targetRepo.StorageName, virtualStorage)
					assert.Equal(t, targetRepo.RelativePath, relativePath)
					assert.Equal(t, rewrittenStorage, primary)
					assert.Equal(t, []string{healthySecondaryNode.Storage}, updatedSecondaries)
					assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries)
					assert.Equal(t, tc.primaryStored, storePrimary)
					assert.Equal(t, tc.assignmentsStored, storeAssignments)
					return nil
				},
			}

			var router Router
			var primaryConnPointer string
			var secondaryConnPointers []string
			switch tc.electionStrategy {
			case config.ElectionStrategySQL:
				gitalySocket0 := testhelper.GetTemporaryGitalySocketFileName(t)
				gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t)
				gitalySocket2 := testhelper.GetTemporaryGitalySocketFileName(t)
				testhelper.NewServerWithHealth(t, gitalySocket0)
				testhelper.NewServerWithHealth(t, gitalySocket1)
				healthSrv2 := testhelper.NewServerWithHealth(t, gitalySocket2)
				healthSrv2.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING)

				primaryNode.Address = "unix://" + gitalySocket0
				healthySecondaryNode.Address = "unix://" + gitalySocket1
				unhealthySecondaryNode.Address = "unix://" + gitalySocket2

				nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
				require.NoError(t, err)
				nodeMgr.Start(0, time.Hour)

				router = NewNodeManagerRouter(nodeMgr, rs)
				for _, node := range nodeMgr.Nodes()["praefect"] {
					if node.GetStorage() == primaryNode.Storage {
						primaryConnPointer = fmt.Sprintf("%p", node.GetConnection())
						continue
					}

					if node.GetStorage() == healthySecondaryNode.Storage {
						secondaryConnPointers = []string{fmt.Sprintf("%p", node.GetConnection())}
					}
				}
			case config.ElectionStrategyPerRepository:
				conns := Connections{
					"praefect": {
						primaryNode.Storage:            &grpc.ClientConn{},
						healthySecondaryNode.Storage:   &grpc.ClientConn{},
						unhealthySecondaryNode.Storage: &grpc.ClientConn{},
					},
				}
				primaryConnPointer = fmt.Sprintf("%p", conns["praefect"][primaryNode.Storage])
				secondaryConnPointers = []string{fmt.Sprintf("%p", conns["praefect"][healthySecondaryNode.Storage])}
				router = NewPerRepositoryRouter(
					conns,
					nil,
					StaticHealthChecker{"praefect": {primaryNode.Storage, healthySecondaryNode.Storage}},
					mockRandom{
						intnFunc: func(n int) int {
							require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes")
							return 0
						},
						shuffleFunc: func(n int, swap func(int, int)) {
							require.Equal(t, n, 2, "number of secondary candidates should match the number of nodes minus the primary")
						},
					},
					nil,
					nil,
					rs,
					conf.DefaultReplicationFactors(),
				)
			default:
				t.Fatalf("unexpected election strategy: %q", tc.electionStrategy)
			}

			txMgr := transactions.NewManager(conf)
			queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db))

			coordinator := NewCoordinator(
				queueInterceptor,
				rs,
				router,
				txMgr,
				conf,
				protoregistry.GitalyProtoPreregistered,
			)

			frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{
				Repository: &targetRepo,
			})
			require.NoError(t, err)

			fullMethod := "/gitaly.RepositoryService/CreateRepository"

			ctx, cancel := testhelper.Context()
			defer cancel()

			peeker := &mockPeeker{frame}
			streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker)
			require.NoError(t, err)
			require.Equal(t, primaryConnPointer, fmt.Sprintf("%p", streamParams.Primary().Conn))

			var secondaries []string
			for _, dst := range streamParams.Secondaries() {
				secondaries = append(secondaries, fmt.Sprintf("%p", dst.Conn))
			}
			require.Equal(t, secondaryConnPointers, secondaries, "secondary connections did not match expected")

			mi, err := coordinator.registry.LookupMethod(fullMethod)
			require.NoError(t, err)

			m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg)
			require.NoError(t, err)

			rewrittenTargetRepo, err := mi.TargetRepo(m)
			require.NoError(t, err)
			require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name")

			vote := voting.VoteFromData([]byte{})
			require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-1", vote))
			require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-2", vote))

			// This call creates new events in the queue and simulates the usual flow of the update operation.
			err = streamParams.RequestFinalizer()
			require.NoError(t, err)

			// Wait until the event is persisted (asynchronous operation).
			require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
				return len(i.GetEnqueuedResult()) == 1
			}))

			var expectedEvents, actualEvents []datastore.ReplicationEvent
			for _, target := range []string{unhealthySecondaryNode.Storage} {
				actual, err := queueInterceptor.Dequeue(ctx, "praefect", target, 10)
				require.NoError(t, err)
				require.Len(t, actual, 1)

				actualEvents = append(actualEvents, actual[0])
				expectedEvents = append(expectedEvents, datastore.ReplicationEvent{
					ID:        actual[0].ID,
					State:     datastore.JobStateInProgress,
					Attempt:   2,
					LockID:    fmt.Sprintf("praefect|%s|/path/to/hashed/storage", target),
					CreatedAt: actual[0].CreatedAt,
					UpdatedAt: actual[0].UpdatedAt,
					Job: datastore.ReplicationJob{
						Change:            datastore.UpdateRepo,
						VirtualStorage:    conf.VirtualStorages[0].Name,
						RelativePath:      targetRepo.RelativePath,
						TargetNodeStorage: target,
						SourceNodeStorage: primaryNode.Storage,
					},
					Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"},
				})
			}

			require.Equal(t, expectedEvents, actualEvents, "ensure replication job created by stream director is correct")
			require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore")
		})
	}
}

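// waitNodeToChangeHealthStatus repeatedly triggers health checks until the node reports the
// desired health state or the one-second timeout expires.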
func waitNodeToChangeHealthStatus(ctx context.Context, t *testing.T, node nodes.Node, health bool) {
	t.Helper()

	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	for node.IsHealthy() != health {
		_, err := node.CheckHealth(ctx)
		require.NoError(t, err)
	}
}

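// mockPeeker is a stand-in for the stream peeker consumed by StreamDirector: it serves a fixed
// request frame and records any modification applied to it.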
type mockPeeker struct {
	frame []byte
}

func (m *mockPeeker) Peek() ([]byte, error) {
	return m.frame, nil
}

func (m *mockPeeker) Modify(payload []byte) error {
	m.frame = payload

	return nil
}

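// TestAbsentCorrelationID verifies that the coordinator generates a correlation ID for the
// enqueued replication job when the incoming request carries none.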
func TestAbsentCorrelationID(t *testing.T) {
	t.Parallel()
	gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	healthSrv0 := testhelper.NewServerWithHealth(t, gitalySocket0)
	healthSrv1 := testhelper.NewServerWithHealth(t, gitalySocket1)
	healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
	healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)

	primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name: "praefect",
				Nodes: []*config.Node{
					{
						Address: primaryAddress,
						Storage: "praefect-internal-1",
					},
					{
						Address: secondaryAddress,
						Storage: "praefect-internal-2",
					},
				},
			},
		},
	}

	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)))
	queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
		assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created")
		return queue.Enqueue(ctx, event)
	})
	targetRepo := gitalypb.Repository{
		StorageName:  "praefect",
		RelativePath: "/path/to/hashed/storage",
	}

	ctx, cancel := testhelper.Context()
	defer cancel()

	entry := testhelper.DiscardTestEntry(t)

	nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	nodeMgr.Start(0, time.Hour)

	txMgr := transactions.NewManager(conf)
	rs := datastore.MockRepositoryStore{}

	coordinator := NewCoordinator(
		queueInterceptor,
		rs,
		NewNodeManagerRouter(nodeMgr, rs),
		txMgr,
		conf,
		protoregistry.GitalyProtoPreregistered,
	)

	frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{
		Origin:     &targetRepo,
		ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo},
	})
	require.NoError(t, err)

	fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool"
	peeker := &mockPeeker{frame}
	streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker)
	require.NoError(t, err)
	require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target())

	// The finalizer must be run as it adds replication events to the queue.
	require.NoError(t, streamParams.RequestFinalizer())

	require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool {
		return len(i.GetEnqueuedResult()) == 1
	}))
	jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1)
	require.NoError(t, err)
	require.Len(t, jobs, 1)

	require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey],
		"the coordinator should have generated a random ID")
}

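// TestCoordinatorEnqueueFailure verifies that a failure to enqueue the replication event is
// reported back to the client as an error on the mutator RPC.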
func TestCoordinatorEnqueueFailure(t *testing.T) {
	t.Parallel()
	conf := config.Config{
		VirtualStorages: []*config.VirtualStorage{
			{
				Name: "praefect",
				Nodes: []*config.Node{
					{
						Address: "unix:///woof",
						Storage: "praefect-internal-1",
					},
					{
						Address: "unix:///meow",
						Storage: "praefect-internal-2",
					},
				},
			},
		},
	}

	queueInterceptor := datastore.NewReplicationEventQueueInterceptor(nil)
	errQ := make(chan error, 1)
	queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) {
		return datastore.ReplicationEvent{}, <-errQ
	})
	queueInterceptor.OnDequeue(func(context.Context, string, string, int, datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) {
		return nil, nil
	})
	queueInterceptor.OnAcknowledge(func(context.Context, datastore.JobState, []uint64, datastore.ReplicationEventQueue) ([]uint64, error) {
		return nil, nil
	})

	ms := &mockSvc{
		repoMutatorUnary: func(context.Context, *mock.RepoRequest) (*emptypb.Empty, error) {
			return &emptypb.Empty{}, nil // always succeeds
		},
	}

	r, err := protoregistry.NewFromPaths("praefect/mock/mock.proto")
	require.NoError(t, err)

	ctx, cancel := testhelper.Context()
	defer cancel()

	cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{
		withAnnotations: r,
		withQueue:       queueInterceptor,
		withBackends: withMockBackends(t, map[string]mock.SimpleServiceServer{
			conf.VirtualStorages[0].Nodes[0].Storage: ms,
			conf.VirtualStorages[0].Nodes[1].Storage: ms,
		}),
	})
	defer cleanup()

	mcli := mock.NewSimpleServiceClient(cc)

	errQ <- nil
	repoReq := &mock.RepoRequest{
		Repo: &gitalypb.Repository{
			RelativePath: "meow",
			StorageName:  conf.VirtualStorages[0].Name,
		},
	}
	_, err = mcli.RepoMutatorUnary(ctx, repoReq)
	require.NoError(t, err)

	expectErrMsg := "enqueue failed"
	errQ <- errors.New(expectErrMsg)
	_, err = mcli.RepoMutatorUnary(ctx, repoReq)
	require.Error(t, err)
	require.Equal(t, err.Error(), "rpc error: code = Unknown desc = enqueue replication event: "+expectErrMsg)
}

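// TestStreamDirectorStorageScope verifies that storage-scoped mutators and accessors are routed
// to the primary Gitaly and have their storage name rewritten.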
func TestStreamDirectorStorageScope(t *testing.T) {
	// Stub the health-check requests because nodes.NewManager establishes connections on creation.
	gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t)
	testhelper.NewServerWithHealth(t, gitalySocket0)
	testhelper.NewServerWithHealth(t, gitalySocket1)

	primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1
	primaryGitaly := &config.Node{Address: primaryAddress, Storage: "gitaly-1"}
	secondaryGitaly := &config.Node{Address: secondaryAddress, Storage: "gitaly-2"}
	conf := config.Config{
		Failover: config.Failover{Enabled: true},
		VirtualStorages: []*config.VirtualStorage{{
			Name:  "praefect",
			Nodes: []*config.Node{primaryGitaly, secondaryGitaly},
		}},
	}

	rs := datastore.MockRepositoryStore{}

	nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil)
	require.NoError(t, err)
	nodeMgr.Start(0, time.Second)
	coordinator := NewCoordinator(
		nil,
		rs,
		NewNodeManagerRouter(nodeMgr, rs),
		nil,
		conf,
		protoregistry.GitalyProtoPreregistered,
	)

	ctx, cancel := testhelper.Context()
	defer cancel()

	t.Run("mutator", func(t *testing.T) {
		fullMethod := "/gitaly.NamespaceService/RemoveNamespace"
		requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpMutator)

		frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{
			StorageName: conf.VirtualStorages[0].Name,
			Name:        "stub",
		})
		require.NoError(t, err)

		streamParams, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
		require.NoError(t, err)

		require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target(), "stream director didn't redirect to gitaly storage")

		rewritten := gitalypb.RemoveNamespaceRequest{}
		require.NoError(t, proto.Unmarshal(streamParams.Primary().Msg, &rewritten))
		require.Equal(t, primaryGitaly.Storage, rewritten.StorageName, "stream director didn't rewrite storage")
	})

	t.Run("accessor", func(t *testing.T) {
		fullMethod := "/gitaly.NamespaceService/NamespaceExists"
		requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpAccessor)

		frame, err := proto.Marshal(&gitalypb.NamespaceExistsRequest{
			StorageName: conf.VirtualStorages[0].Name,
			Name:        "stub",
		})
		require.NoError(t, err)

		streamParams, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
		require.NoError(t, err)

		require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target(), "stream director didn't redirect to gitaly storage")

		rewritten := gitalypb.NamespaceExistsRequest{}
		require.NoError(t, proto.Unmarshal(streamParams.Primary().Msg, &rewritten))
		require.Equal(t, primaryGitaly.Storage, rewritten.StorageName, "stream director didn't rewrite storage")
	})
}

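// TestStreamDirectorStorageScopeError verifies the error codes returned for storage-scoped RPCs
// when the storage name is missing, unknown, or backed by an unhealthy primary.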
func TestStreamDirectorStorageScopeError(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	t.Run("no storage provided", func(t *testing.T) {
		mgr := &nodes.MockManager{
			GetShardFunc: func(s string) (nodes.Shard, error) {
				require.FailNow(t, "validation of input was not executed")
				return nodes.Shard{}, assert.AnError
			},
		}

		rs := datastore.MockRepositoryStore{}
		coordinator := NewCoordinator(
			nil,
			rs,
			NewNodeManagerRouter(mgr, rs),
			nil,
			config.Config{},
			protoregistry.GitalyProtoPreregistered,
		)

		frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "", Name: "stub"})
		require.NoError(t, err)

		_, err = coordinator.StreamDirector(ctx, "/gitaly.NamespaceService/RemoveNamespace", &mockPeeker{frame})
		require.Error(t, err)
		result, ok := status.FromError(err)
		require.True(t, ok)
		require.Equal(t, codes.InvalidArgument, result.Code())
		require.Equal(t, "storage scoped: target storage is invalid", result.Message())
	})

	t.Run("unknown storage provided", func(t *testing.T) {
		mgr := &nodes.MockManager{
			GetShardFunc: func(s string) (nodes.Shard, error) {
				require.Equal(t, "fake", s)
				return nodes.Shard{}, nodes.ErrVirtualStorageNotExist
			},
		}

		rs := datastore.MockRepositoryStore{}
		coordinator := NewCoordinator(
			nil,
			rs,
			NewNodeManagerRouter(mgr, rs),
			nil,
			config.Config{},
			protoregistry.GitalyProtoPreregistered,
		)

		frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"})
		require.NoError(t, err)

		_, err = coordinator.StreamDirector(ctx, "/gitaly.NamespaceService/RemoveNamespace", &mockPeeker{frame})
		require.Error(t, err)
		result, ok := status.FromError(err)
		require.True(t, ok)
		require.Equal(t, codes.InvalidArgument, result.Code())
		require.Equal(t, "virtual storage does not exist", result.Message())
	})

	t.Run("primary gitaly is not healthy", func(t *testing.T) {
		t.Run("accessor", func(t *testing.T) {
			mgr := &nodes.MockManager{
				GetShardFunc: func(s string) (nodes.Shard, error) {
					require.Equal(t, "fake", s)
					return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
				},
			}

			rs := datastore.MockRepositoryStore{}
			coordinator := NewCoordinator(
				nil,
				rs,
				NewNodeManagerRouter(mgr, rs),
				nil,
				config.Config{},
				protoregistry.GitalyProtoPreregistered,
			)

			fullMethod := "/gitaly.NamespaceService/NamespaceExists"
			requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpAccessor)

			frame, err := proto.Marshal(&gitalypb.NamespaceExistsRequest{StorageName: "fake", Name: "stub"})
			require.NoError(t, err)

			_, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
			require.Error(t, err)
			result, ok := status.FromError(err)
			require.True(t, ok)
			require.Equal(t, codes.Internal, result.Code())
			require.Equal(t, `accessor storage scoped: route storage accessor "fake": primary gitaly is not healthy`, result.Message())
		})

		t.Run("mutator", func(t *testing.T) {
			mgr := &nodes.MockManager{
				GetShardFunc: func(s string) (nodes.Shard, error) {
					require.Equal(t, "fake", s)
					return nodes.Shard{}, nodes.ErrPrimaryNotHealthy
				},
			}
			rs := datastore.MockRepositoryStore{}
			coordinator := NewCoordinator(
				nil,
				rs,
				NewNodeManagerRouter(mgr, rs),
				nil,
				config.Config{},
				protoregistry.GitalyProtoPreregistered,
			)

			fullMethod := "/gitaly.NamespaceService/RemoveNamespace"
			requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpMutator)

			frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"})
			require.NoError(t, err)

			_, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame})
			require.Error(t, err)
			result, ok := status.FromError(err)
			require.True(t, ok)
			require.Equal(t, codes.Internal, result.Code())
			require.Equal(t, `mutator storage scoped: get shard "fake": primary gitaly is not healthy`, result.Message())
		})
	})
}

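// TestDisabledTransactionsWithFeatureFlag verifies that an RPC whose transactions feature flag is
// enabled in the context is reported as transactional by shouldUseTransaction.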
func TestDisabledTransactionsWithFeatureFlag(t *testing.T) {
	ctx, cancel := testhelper.Context()
	defer cancel()

	for rpc, enabledFn := range transactionRPCs {
		if enabledFn(ctx) {
			require.True(t, shouldUseTransaction(ctx, rpc))
			break
		}
	}
}

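// requireScopeOperation asserts that the registered method info for fullMethod has the expected
// scope and operation type.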
func requireScopeOperation(t *testing.T, registry *protoregistry.Registry, fullMethod string, scope protoregistry.Scope, op protoregistry.OpType) {
	t.Helper()

	mi, err := registry.LookupMethod(fullMethod)
	require.NoError(t, err)
	require.Equal(t, scope, mi.Scope, "scope doesn't match requested")
	require.Equal(t, op, mi.Operation, "operation type doesn't match requested")
}

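// mockOperationServer implements only UserCreateBranch: it records that it was called and returns
// a configurable error so tests can control per-node behaviour.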
type mockOperationServer struct {
	gitalypb.UnimplementedOperationServiceServer
	t      testing.TB
	wg     *sync.WaitGroup
	err    error
	called bool
}

func (s *mockOperationServer) UserCreateBranch(
	context.Context,
	*gitalypb.UserCreateBranchRequest,
) (*gitalypb.UserCreateBranchResponse, error) {
	// We need to wait for all servers to arrive in this RPC. If we don't, it could happen that,
	// for example, the primary arrives quicker than the others and errors out immediately. This
	// would cause stream cancellation, and if the secondaries hadn't yet reached
	// UserCreateBranch, we wouldn't see their function calls.
	s.called = true
	s.wg.Done()
	s.wg.Wait()
	return &gitalypb.UserCreateBranchResponse{}, s.err
}

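// mockLeaseEnder and mockDiskCache satisfy the disk cache interfaces with no-ops so the Gitaly
// test servers don't need a real cache.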
1438type mockLeaseEnder struct{}
1439
1440func (e mockLeaseEnder) EndLease(context.Context) error {
1441	return nil
1442}
1443
1444type mockDiskCache struct {
1445	cache.Cache
1446}
1447
1448func (c *mockDiskCache) StartLease(*gitalypb.Repository) (cache.LeaseEnder, error) {
1449	return mockLeaseEnder{}, nil
1450}
1451
1452// TestCoordinator_grpcErrorHandling asserts that we correctly proxy errors in case any of the nodes
1453// fails. Most importantly, we want to make sure to only ever forward errors from the primary and
1454// never from the secondaries.
1455func TestCoordinator_grpcErrorHandling(t *testing.T) {
1456	t.Parallel()
1457	praefectConfig := config.Config{
1458		VirtualStorages: []*config.VirtualStorage{
1459			{
1460				Name: testhelper.DefaultStorageName,
1461			},
1462		},
1463	}
1464
1465	type gitalyNode struct {
1466		mock            *nodes.MockNode
1467		operationServer *mockOperationServer
1468	}
1469
1470	_, repoProto, _ := testcfg.BuildWithRepo(t)
1471
1472	for _, tc := range []struct {
1473		desc        string
1474		errByNode   map[string]error
1475		expectedErr error
1476	}{
1477		{
1478			desc: "no errors",
1479		},
1480		{
1481			desc: "primary error gets forwarded",
1482			errByNode: map[string]error{
1483				"primary": errors.New("foo"),
1484			},
1485			expectedErr: status.Error(codes.Unknown, "foo"),
1486		},
1487		{
1488			desc: "secondary error gets ignored",
1489			errByNode: map[string]error{
1490				"secondary-1": errors.New("foo"),
1491			},
1492		},
1493		{
1494			desc: "primary error has precedence",
1495			errByNode: map[string]error{
1496				"primary":     errors.New("primary"),
1497				"secondary-1": errors.New("secondary-1"),
1498				"secondary-2": errors.New("secondary-2"),
1499			},
1500			expectedErr: status.Error(codes.Unknown, "primary"),
1501		},
1502	} {
1503		t.Run(tc.desc, func(t *testing.T) {
1504			ctx, cleanup := testhelper.Context()
1505			defer cleanup()
1506
1507			var wg sync.WaitGroup
1508			gitalies := make(map[string]gitalyNode)
1509			for _, gitaly := range []string{"primary", "secondary-1", "secondary-2"} {
1510				gitaly := gitaly
1511
1512				cfg := testcfg.Build(t, testcfg.WithStorages(gitaly))
1513
1514				operationServer := &mockOperationServer{
1515					t:  t,
1516					wg: &wg,
1517				}
1518
1519				addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) {
1520					gitalypb.RegisterOperationServiceServer(srv, operationServer)
1521				}, testserver.WithDiskCache(&mockDiskCache{}), testserver.WithDisablePraefect())
1522
1523				conn, err := client.DialContext(ctx, addr, []grpc.DialOption{
1524					grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())),
1525				})
1526				require.NoError(t, err)
1527				defer conn.Close()
1528
1529				gitalies[gitaly] = gitalyNode{
1530					mock: &nodes.MockNode{
1531						Conn:             conn,
1532						Healthy:          true,
1533						GetStorageMethod: func() string { return gitaly },
1534					},
1535					operationServer: operationServer,
1536				}
1537
1538				praefectConfig.VirtualStorages[0].Nodes = append(praefectConfig.VirtualStorages[0].Nodes, &config.Node{
1539					Address: addr,
1540					Storage: gitaly,
1541				})
1542			}
1543
1544			praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{
1545				// Set up a mock manager which sets up primary/secondaries and pretends that all nodes are
				// healthy. We need fixed roles, and unhealthy nodes would not take part in transactions.
1547				withNodeMgr: &nodes.MockManager{
1548					Storage: testhelper.DefaultStorageName,
1549					GetShardFunc: func(shardName string) (nodes.Shard, error) {
1550						require.Equal(t, testhelper.DefaultStorageName, shardName)
1551						return nodes.Shard{
1552							Primary: gitalies["primary"].mock,
1553							Secondaries: []nodes.Node{
1554								gitalies["secondary-1"].mock,
1555								gitalies["secondary-2"].mock,
1556							},
1557						}, nil
1558					},
1559				},
				// Set up a mock repository store pretending that all nodes are consistent. Only consistent
1561				// nodes will take part in transactions.
1562				withRepoStore: datastore.MockRepositoryStore{
1563					GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) {
1564						return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil
1565					},
1566				},
1567			})
1568			defer cleanup()
1569
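			// Arm every mocked Gitaly with the error configured for it (if any) and
			// reset its call tracking. The shared WaitGroup makes the request block
			// until all three nodes have been called.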
1570			for name, node := range gitalies {
1571				wg.Add(1)
1572				node.operationServer.err = tc.errByNode[name]
1573				node.operationServer.called = false
1574			}
1575
1576			_, err := gitalypb.NewOperationServiceClient(praefectConn).UserCreateBranch(ctx,
1577				&gitalypb.UserCreateBranchRequest{
1578					Repository: repoProto,
1579				})
1580			testassert.GrpcEqualErr(t, tc.expectedErr, err)
1581
1582			for _, node := range gitalies {
1583				require.True(t, node.operationServer.called, "expected gitaly %q to have been called", node.mock.GetStorage())
1584			}
1585		})
1586	}
1587}
1588
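// mockTransaction is a transaction double whose vote states, vote bookkeeping and
// subtransaction count are fixed by the test case.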
1589type mockTransaction struct {
1590	nodeStates      map[string]transactions.VoteResult
1591	subtransactions int
1592	didVote         map[string]bool
1593}
1594
1595func (t mockTransaction) ID() uint64 {
1596	return 0
1597}
1598
1599func (t mockTransaction) CountSubtransactions() int {
1600	return t.subtransactions
1601}
1602
1603func (t mockTransaction) DidVote(node string) bool {
1604	return t.didVote[node]
1605}
1606
1607func (t mockTransaction) State() (map[string]transactions.VoteResult, error) {
1608	return t.nodeStates, nil
1609}
1610
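// TestGetUpdatedAndOutdatedSecondaries verifies how a transaction's outcome on the
// primary and secondaries translates into the set of up-to-date nodes, the set of
// nodes that need replication, and the per-reason metrics recorded for that decision.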
1611func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) {
1612	type node struct {
1613		name  string
1614		state transactions.VoteResult
1615		err   error
1616	}
1617
1618	ctx, cancel := testhelper.Context()
1619	defer cancel()
1620
1621	anyErr := errors.New("arbitrary error")
1622
1623	for _, tc := range []struct {
1624		desc                   string
1625		primary                node
1626		secondaries            []node
1627		replicas               []string
1628		subtransactions        int
1629		didVote                map[string]bool
1630		expectedPrimaryDirtied bool
1631		expectedOutdated       []string
1632		expectedUpdated        []string
1633		expectedMetrics        map[string]int
1634	}{
1635		{
1636			desc: "single committed node",
1637			primary: node{
1638				name:  "primary",
1639				state: transactions.VoteCommitted,
1640			},
1641			didVote: map[string]bool{
1642				"primary": true,
1643			},
1644			subtransactions:        1,
1645			expectedPrimaryDirtied: true,
1646		},
1647		{
1648			desc: "single failed node",
1649			primary: node{
1650				name:  "primary",
1651				state: transactions.VoteFailed,
1652			},
1653			subtransactions: 1,
1654		},
1655		{
1656			desc: "single erred node",
1657			primary: node{
1658				name: "primary",
1659				err:  anyErr,
1660			},
1661		},
1662		{
1663			desc: "single node without subtransactions",
1664			primary: node{
1665				name: "primary",
1666			},
1667			subtransactions:        0,
1668			expectedPrimaryDirtied: true,
1669		},
1670		{
1671			desc: "single successful node with replica",
1672			primary: node{
1673				name:  "primary",
1674				state: transactions.VoteCommitted,
1675			},
1676			replicas: []string{"replica"},
1677			didVote: map[string]bool{
1678				"primary": true,
1679			},
1680			subtransactions:        1,
1681			expectedPrimaryDirtied: true,
1682			expectedOutdated:       []string{"replica"},
1683			expectedMetrics: map[string]int{
1684				"outdated": 1,
1685			},
1686		},
1687		{
1688			desc: "single failing node with replica is not considered modified",
1689			primary: node{
1690				name:  "primary",
1691				state: transactions.VoteFailed,
1692			},
1693			subtransactions: 1,
1694		},
1695		{
1696			desc: "single erred node with replica",
1697			primary: node{
1698				name:  "primary",
1699				state: transactions.VoteCommitted,
1700				err:   anyErr,
1701			},
1702			replicas: []string{"replica"},
1703			didVote: map[string]bool{
1704				"primary": true,
1705			},
1706			subtransactions:        1,
1707			expectedPrimaryDirtied: true,
1708			expectedOutdated:       []string{"replica"},
1709			expectedMetrics: map[string]int{
1710				"outdated": 1,
1711			},
1712		},
1713		{
1714			desc: "single erred node without commit with replica",
1715			primary: node{
1716				name:  "primary",
1717				state: transactions.VoteCommitted,
1718				err:   anyErr,
1719			},
1720			replicas:               []string{"replica"},
1721			subtransactions:        1,
1722			expectedPrimaryDirtied: false,
1723		},
1724		{
1725			desc: "single node without transaction with replica",
1726			primary: node{
1727				name: "primary",
1728			},
1729			replicas:               []string{"replica"},
1730			subtransactions:        0,
1731			expectedPrimaryDirtied: true,
1732			expectedOutdated:       []string{"replica"},
1733			expectedMetrics: map[string]int{
1734				"outdated": 1,
1735			},
1736		},
1737		{
1738			desc: "multiple committed nodes",
1739			primary: node{
1740				name:  "primary",
1741				state: transactions.VoteCommitted,
1742			},
1743			secondaries: []node{
1744				{name: "s1", state: transactions.VoteCommitted},
1745				{name: "s2", state: transactions.VoteCommitted},
1746			},
1747			didVote: map[string]bool{
1748				"primary": true,
1749			},
1750			subtransactions:        1,
1751			expectedPrimaryDirtied: true,
1752			expectedUpdated:        []string{"s1", "s2"},
1753			expectedMetrics: map[string]int{
1754				"updated": 2,
1755			},
1756		},
1757		{
1758			desc: "multiple committed nodes with primary err",
1759			primary: node{
1760				name:  "primary",
1761				state: transactions.VoteCommitted,
1762				err:   anyErr,
1763			},
1764			secondaries: []node{
1765				{name: "s1", state: transactions.VoteCommitted},
1766				{name: "s2", state: transactions.VoteCommitted},
1767			},
1768			didVote: map[string]bool{
1769				"primary": true,
1770			},
1771			subtransactions:        1,
1772			expectedPrimaryDirtied: true,
1773			expectedOutdated:       []string{"s1", "s2"},
1774			expectedMetrics: map[string]int{
1775				"node-error-status": 2,
1776			},
1777		},
1778		{
1779			desc: "multiple committed nodes with same error as primary",
1780			primary: node{
1781				name:  "primary",
1782				state: transactions.VoteCommitted,
1783				err:   anyErr,
1784			},
1785			secondaries: []node{
1786				{name: "s1", state: transactions.VoteCommitted, err: anyErr},
1787				{name: "s2", state: transactions.VoteCommitted, err: anyErr},
1788			},
1789			didVote: map[string]bool{
1790				"primary": true,
1791			},
1792			subtransactions:        1,
1793			expectedPrimaryDirtied: true,
1794			expectedUpdated:        []string{"s1", "s2"},
1795			expectedMetrics: map[string]int{
1796				"updated": 2,
1797			},
1798		},
1799		{
1800			desc: "multiple committed nodes with different error as primary",
1801			primary: node{
1802				name:  "primary",
1803				state: transactions.VoteCommitted,
1804				err:   anyErr,
1805			},
1806			secondaries: []node{
1807				{name: "s1", state: transactions.VoteCommitted, err: errors.New("somethingsomething")},
1808				{name: "s2", state: transactions.VoteCommitted, err: anyErr},
1809			},
1810			didVote: map[string]bool{
1811				"primary": true,
1812			},
1813			subtransactions:        1,
1814			expectedPrimaryDirtied: true,
1815			expectedUpdated:        []string{"s2"},
1816			expectedOutdated:       []string{"s1"},
1817			expectedMetrics: map[string]int{
1818				"node-error-status": 1,
1819				"updated":           1,
1820			},
1821		},
1822		{
1823			desc: "multiple committed nodes with secondary err",
1824			primary: node{
1825				name:  "primary",
1826				state: transactions.VoteCommitted,
1827			},
1828			secondaries: []node{
1829				{name: "s1", state: transactions.VoteCommitted, err: anyErr},
1830				{name: "s2", state: transactions.VoteCommitted},
1831			},
1832			didVote: map[string]bool{
1833				"primary": true,
1834			},
1835			subtransactions:        1,
1836			expectedPrimaryDirtied: true,
1837			expectedUpdated:        []string{"s2"},
1838			expectedOutdated:       []string{"s1"},
1839			expectedMetrics: map[string]int{
1840				"node-error-status": 1,
1841				"updated":           1,
1842			},
1843		},
1844		{
1845			desc: "multiple committed nodes with primary and missing secondary err",
1846			primary: node{
1847				name:  "primary",
1848				state: transactions.VoteCommitted,
1849				err:   anyErr,
1850			},
1851			secondaries: []node{
1852				{name: "s1", state: transactions.VoteCommitted, err: anyErr},
1853				{name: "s2", state: transactions.VoteCommitted},
1854			},
1855			didVote: map[string]bool{
1856				"primary": true,
1857			},
1858			subtransactions:        1,
1859			expectedPrimaryDirtied: true,
1860			expectedUpdated:        []string{"s1"},
1861			expectedOutdated:       []string{"s2"},
1862			expectedMetrics: map[string]int{
1863				"node-error-status": 1,
1864				"updated":           1,
1865			},
1866		},
1867		{
1868			desc: "partial success",
1869			primary: node{
1870				name:  "primary",
1871				state: transactions.VoteCommitted,
1872			},
1873			secondaries: []node{
1874				{name: "s1", state: transactions.VoteFailed},
1875				{name: "s2", state: transactions.VoteCommitted},
1876			},
1877			didVote: map[string]bool{
1878				"primary": true,
1879			},
1880			subtransactions:        1,
1881			expectedPrimaryDirtied: true,
1882			expectedUpdated:        []string{"s2"},
1883			expectedOutdated:       []string{"s1"},
1884			expectedMetrics: map[string]int{
1885				"node-not-committed": 1,
1886				"updated":            1,
1887			},
1888		},
1889		{
1890			desc: "failure with (impossible) secondary success",
1891			primary: node{
1892				name:  "primary",
1893				state: transactions.VoteFailed,
1894			},
1895			secondaries: []node{
1896				{name: "s1", state: transactions.VoteFailed},
1897				{name: "s2", state: transactions.VoteCommitted},
1898			},
1899			didVote: map[string]bool{
1900				"primary": true,
1901			},
1902			subtransactions:        1,
1903			expectedPrimaryDirtied: true,
1904			expectedOutdated:       []string{"s1", "s2"},
1905			expectedMetrics: map[string]int{
1906				"primary-not-committed": 2,
1907			},
1908		},
1909		{
1910			desc: "failure with no primary votes",
1911			primary: node{
1912				name:  "primary",
1913				state: transactions.VoteFailed,
1914			},
1915			secondaries: []node{
1916				{name: "s1", state: transactions.VoteFailed},
1917				{name: "s2", state: transactions.VoteCommitted},
1918			},
1919			didVote: map[string]bool{
1920				"s1": true,
1921				"s2": true,
1922			},
1923			subtransactions: 1,
1924		},
1925		{
1926			desc: "multiple nodes without subtransactions",
1927			primary: node{
1928				name:  "primary",
1929				state: transactions.VoteFailed,
1930			},
1931			secondaries: []node{
1932				{name: "s1", state: transactions.VoteFailed},
1933				{name: "s2", state: transactions.VoteCommitted},
1934			},
1935			subtransactions:        0,
1936			expectedPrimaryDirtied: true,
1937			expectedOutdated:       []string{"s1", "s2"},
1938			expectedMetrics: map[string]int{
1939				"no-votes": 2,
1940			},
1941		},
1942		{
1943			desc: "multiple nodes with replica and partial failures",
1944			primary: node{
1945				name:  "primary",
1946				state: transactions.VoteCommitted,
1947			},
1948			secondaries: []node{
1949				{name: "s1", state: transactions.VoteFailed},
1950				{name: "s2", state: transactions.VoteCommitted},
1951			},
1952			replicas: []string{"r1", "r2"},
1953			didVote: map[string]bool{
1954				"primary": true,
1955			},
1956			subtransactions:        1,
1957			expectedPrimaryDirtied: true,
1958			expectedOutdated:       []string{"s1", "r1", "r2"},
1959			expectedUpdated:        []string{"s2"},
1960			expectedMetrics: map[string]int{
1961				"node-not-committed": 1,
1962				"outdated":           2,
1963				"updated":            1,
1964			},
1965		},
1966		{
1967			desc: "multiple nodes with replica and partial err",
1968			primary: node{
1969				name:  "primary",
1970				state: transactions.VoteCommitted,
1971			},
1972			secondaries: []node{
1973				{name: "s1", state: transactions.VoteFailed},
1974				{name: "s2", state: transactions.VoteCommitted, err: anyErr},
1975			},
1976			replicas: []string{"r1", "r2"},
1977			didVote: map[string]bool{
1978				"primary": true,
1979			},
1980			subtransactions:        1,
1981			expectedPrimaryDirtied: true,
1982			expectedOutdated:       []string{"s1", "s2", "r1", "r2"},
1983			expectedMetrics: map[string]int{
1984				"node-error-status":  1,
1985				"node-not-committed": 1,
1986				"outdated":           2,
1987			},
1988		},
1989	} {
1990		t.Run(tc.desc, func(t *testing.T) {
1991			nodes := append(tc.secondaries, tc.primary)
1992			voters := make([]transactions.Voter, len(nodes))
1993
1994			states := make(map[string]transactions.VoteResult)
1995			nodeErrors := &nodeErrors{
1996				errByNode: make(map[string]error),
1997			}
1998
1999			for i, node := range nodes {
2000				voters[i] = transactions.Voter{
2001					Name:  node.name,
2002					Votes: 1,
2003				}
2004				states[node.name] = node.state
2005				nodeErrors.errByNode[node.name] = node.err
2006			}
2007
2008			transaction := mockTransaction{
2009				nodeStates:      states,
2010				subtransactions: tc.subtransactions,
2011				didVote:         tc.didVote,
2012			}
2013
2014			route := RepositoryMutatorRoute{
2015				Primary: RouterNode{
2016					Storage: tc.primary.name,
2017				},
2018			}
2019			for _, secondary := range tc.secondaries {
2020				route.Secondaries = append(route.Secondaries, RouterNode{
2021					Storage: secondary.name,
2022				})
2023			}
2024			route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...)
2025
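			// Stub counter with a "reason" label: the function under test increments
			// it once per secondary or replica, recording why that node was considered
			// up to date or outdated. The expected per-reason counts are asserted below.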
2026			metric := prometheus.NewCounterVec(prometheus.CounterOpts{
2027				Name: "stub", Help: "help",
2028			}, []string{"reason"})
2029
2030			primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric)
2031			require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied)
2032			require.ElementsMatch(t, tc.expectedUpdated, updated)
2033			require.ElementsMatch(t, tc.expectedOutdated, outdated)
2034
2035			expectedMetrics := "# HELP stub help\n# TYPE stub counter\n"
2036			for metric, value := range tc.expectedMetrics {
2037				expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value)
2038			}
2039
2040			require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics)))
2041		})
2042	}
2043}
2044
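// TestNewRequestFinalizer_contextIsDisjointedFromTheRPC asserts that the context
// passed to the datastore during request finalization keeps the values of the RPC's
// context but not its cancellation or deadline, so bookkeeping still succeeds after
// the RPC itself has finished or been canceled.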
2045func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) {
2046	type ctxKey struct{}
2047
2048	parentDeadline := time.Now()
2049	ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), ctxKey{}, "value"), parentDeadline)
2050	cancel()
2051
2052	requireSuppressedCancellation := func(t testing.TB, ctx context.Context) {
2053		deadline, ok := ctx.Deadline()
2054		require.True(t, ok)
2055		require.NotEqual(t, parentDeadline, deadline)
		require.Equal(t, "value", ctx.Value(ctxKey{}))
2057		require.Nil(t, ctx.Err())
2058		select {
2059		case <-ctx.Done():
2060			t.Fatal("context should not be canceled if the parent is canceled")
2061		default:
2062			require.NotNil(t, ctx.Done())
2063		}
2064	}
2065
2066	err := errors.New("error")
2067
2068	for _, tc := range []struct {
2069		change datastore.ChangeType
2070		errMsg string
2071	}{
2072		{
2073			change: datastore.UpdateRepo,
2074			errMsg: "increment generation: error",
2075		},
2076		{
2077			change: datastore.RenameRepo,
2078			errMsg: "rename repository: error",
2079		},
2080		{
2081			change: datastore.DeleteRepo,
2082			errMsg: "delete repository: error",
2083		},
2084		{
2085			change: "replication jobs only",
2086			errMsg: "enqueue replication event: error",
2087		},
2088	} {
2089		t.Run(string(tc.change), func(t *testing.T) {
2090			require.EqualError(t,
2091				NewCoordinator(
2092					&datastore.MockReplicationEventQueue{
2093						EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) {
2094							requireSuppressedCancellation(t, ctx)
2095							return datastore.ReplicationEvent{}, err
2096						},
2097					},
2098					datastore.MockRepositoryStore{
2099						IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error {
2100							requireSuppressedCancellation(t, ctx)
2101							return err
2102						},
2103						RenameRepositoryFunc: func(ctx context.Context, _, _, _, _ string) error {
2104							requireSuppressedCancellation(t, ctx)
2105							return err
2106						},
2107						DeleteRepositoryFunc: func(ctx context.Context, _, _ string, _ []string) error {
2108							requireSuppressedCancellation(t, ctx)
2109							return err
2110						},
2111						CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _ string, _, _ []string, _, _ bool) error {
2112							requireSuppressedCancellation(t, ctx)
2113							return err
2114						},
2115					},
2116					nil,
2117					nil,
2118					config.Config{},
2119					nil,
2120				).newRequestFinalizer(
2121					ctx,
2122					0,
2123					"virtual storage",
2124					&gitalypb.Repository{},
2125					"primary",
2126					[]string{},
2127					[]string{"secondary"},
2128					tc.change,
2129					datastore.Params{"RelativePath": "relative-path"},
2130					"rpc-name",
2131				)(),
2132				tc.errMsg,
2133			)
2134		})
2135	}
2136}
2137
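// TestStreamParametersContext verifies how streamParametersContext translates a
// request's context into the incoming and outgoing metadata used for proxying,
// including the propagation of feature flags with their default or explicitly set
// values.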
2138func TestStreamParametersContext(t *testing.T) {
	// Because we create these via NewFeatureFlag, they end up in featureflag.All.
2140	enabledFF := featureflag.NewFeatureFlag("default-enabled", true)
2141	disabledFF := featureflag.NewFeatureFlag("default-disabled", false)
2142
2143	type expectedFlag struct {
2144		flag    featureflag.FeatureFlag
2145		enabled bool
2146	}
2147
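	// expectedFlags returns all registered feature flags with their default values,
	// applying any overrides given by the test case.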
2148	expectedFlags := func(overrides ...expectedFlag) []expectedFlag {
2149		flagValues := map[featureflag.FeatureFlag]bool{}
2150		for _, flag := range featureflag.All {
2151			flagValues[flag] = flag.OnByDefault
2152		}
2153		for _, override := range overrides {
2154			flagValues[override.flag] = override.enabled
2155		}
2156
2157		expectedFlags := make([]expectedFlag, 0, len(flagValues))
2158		for flag, value := range flagValues {
2159			expectedFlags = append(expectedFlags, expectedFlag{
2160				flag: flag, enabled: value,
2161			})
2162		}
2163
2164		return expectedFlags
2165	}
2166
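	// metadataForFlags converts expected flags into their gRPC metadata representation.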
2167	metadataForFlags := func(flags []expectedFlag) metadata.MD {
2168		pairs := []string{}
2169		for _, flag := range flags {
2170			pairs = append(pairs, flag.flag.MetadataKey(), strconv.FormatBool(flag.enabled))
2171		}
2172		return metadata.Pairs(pairs...)
2173	}
2174
2175	for _, tc := range []struct {
2176		desc               string
2177		setupContext       func() context.Context
2178		expectedIncomingMD metadata.MD
2179		expectedOutgoingMD metadata.MD
2180		expectedFlags      []expectedFlag
2181	}{
2182		{
2183			desc: "no metadata",
2184			setupContext: func() context.Context {
2185				return context.Background()
2186			},
2187			expectedFlags:      expectedFlags(),
2188			expectedOutgoingMD: metadataForFlags(expectedFlags()),
2189		},
2190		{
2191			desc: "with incoming metadata",
2192			setupContext: func() context.Context {
2193				ctx := context.Background()
2194				ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("key", "value"))
2195				return ctx
2196			},
2197			expectedIncomingMD: metadata.Pairs("key", "value"),
2198			expectedOutgoingMD: metadata.Join(
2199				metadata.Pairs("key", "value"),
2200				metadataForFlags(expectedFlags()),
2201			),
2202			expectedFlags: expectedFlags(),
2203		},
2204		{
2205			desc: "with outgoing metadata",
2206			setupContext: func() context.Context {
2207				ctx := context.Background()
2208				ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("key", "value"))
2209				return ctx
2210			},
2211			expectedOutgoingMD: metadata.Join(
2212				metadata.Pairs("key", "value"),
2213				metadataForFlags(expectedFlags()),
2214			),
2215			expectedFlags: expectedFlags(),
2216		},
2217		{
2218			desc: "with incoming and outgoing metadata",
2219			setupContext: func() context.Context {
2220				ctx := context.Background()
2221				ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("incoming", "value"))
2222				ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("outgoing", "value"))
2223				return ctx
2224			},
			// This behaviour is quite subtle: in the previous test case, where we only
			// have outgoing metadata, we retain it. But when we have both incoming and
			// outgoing metadata, the outgoing metadata is discarded altogether. It is
			// debatable whether this is a bug or a feature, so we just document this
			// weird edge case here for now.
2230			expectedIncomingMD: metadata.Pairs("incoming", "value"),
2231			expectedOutgoingMD: metadata.Join(
2232				metadata.Pairs("incoming", "value"),
2233				metadataForFlags(expectedFlags()),
2234			),
2235			expectedFlags: expectedFlags(),
2236		},
2237		{
2238			desc: "with flags set to their default values",
2239			setupContext: func() context.Context {
2240				ctx := context.Background()
2241				ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, enabledFF)
2242				ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, disabledFF)
2243				return ctx
2244			},
2245			expectedIncomingMD: metadata.Pairs(
2246				enabledFF.MetadataKey(), "true",
2247				disabledFF.MetadataKey(), "false",
2248			),
2249			expectedOutgoingMD: metadata.Join(
2250				metadataForFlags(expectedFlags()),
2251				metadata.Pairs(
2252					enabledFF.MetadataKey(), "true",
2253					disabledFF.MetadataKey(), "false",
2254				),
2255			),
2256			expectedFlags: expectedFlags(),
2257		},
2258		{
2259			desc: "with flags set to their reverse default values",
2260			setupContext: func() context.Context {
2261				ctx := context.Background()
2262				ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, enabledFF)
2263				ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, disabledFF)
2264				return ctx
2265			},
2266			expectedIncomingMD: metadata.Pairs(
2267				enabledFF.MetadataKey(), "false",
2268				disabledFF.MetadataKey(), "true",
2269			),
2270			expectedOutgoingMD: metadata.Join(
2271				metadataForFlags(expectedFlags(
2272					expectedFlag{flag: enabledFF, enabled: false},
2273					expectedFlag{flag: disabledFF, enabled: true},
2274				)),
2275				metadata.Pairs(
2276					enabledFF.MetadataKey(), "false",
2277					disabledFF.MetadataKey(), "true",
2278				),
2279			),
2280			expectedFlags: expectedFlags(
2281				expectedFlag{flag: enabledFF, enabled: false},
2282				expectedFlag{flag: disabledFF, enabled: true},
2283			),
2284		},
2285		{
2286			desc: "mixed flags and metadata",
2287			setupContext: func() context.Context {
2288				ctx := context.Background()
2289				ctx = metadata.NewIncomingContext(ctx, metadata.Pairs(
2290					disabledFF.MetadataKey(), "true",
2291					"incoming", "value"),
2292				)
2293				return ctx
2294			},
2295			expectedIncomingMD: metadata.Pairs(
2296				disabledFF.MetadataKey(), "true",
2297				"incoming", "value",
2298			),
2299			expectedOutgoingMD: metadata.Join(
2300				metadataForFlags(expectedFlags(
2301					expectedFlag{flag: disabledFF, enabled: true},
2302				)),
2303				metadata.Pairs(
2304					disabledFF.MetadataKey(), "true",
2305					"incoming", "value",
2306				),
2307			),
2308			expectedFlags: expectedFlags(
2309				expectedFlag{flag: disabledFF, enabled: true},
2310			),
2311		},
2312	} {
2313		t.Run(tc.desc, func(t *testing.T) {
2314			ctx := streamParametersContext(tc.setupContext())
2315
2316			incomingMD, ok := metadata.FromIncomingContext(ctx)
2317			if tc.expectedIncomingMD == nil {
2318				require.False(t, ok)
2319			} else {
2320				require.True(t, ok)
2321			}
2322			require.Equal(t, tc.expectedIncomingMD, incomingMD)
2323
2324			outgoingMD, ok := metadata.FromOutgoingContext(ctx)
2325			if tc.expectedOutgoingMD == nil {
2326				require.False(t, ok)
2327			} else {
2328				require.True(t, ok)
2329			}
2330			require.Equal(t, tc.expectedOutgoingMD, outgoingMD)
2331
2332			incomingCtx := gitaly_metadata.OutgoingToIncoming(ctx)
2333			for _, expectedFlag := range tc.expectedFlags {
2334				require.Equal(t, expectedFlag.enabled, expectedFlag.flag.IsEnabled(incomingCtx))
2335			}
2336		})
2337	}
2338}
2339