1package praefect 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "io" 8 "math/rand" 9 "strconv" 10 "strings" 11 "sync" 12 "sync/atomic" 13 "testing" 14 "time" 15 16 "github.com/prometheus/client_golang/prometheus" 17 "github.com/prometheus/client_golang/prometheus/testutil" 18 "github.com/sirupsen/logrus" 19 "github.com/stretchr/testify/assert" 20 "github.com/stretchr/testify/require" 21 "gitlab.com/gitlab-org/gitaly/v14/client" 22 "gitlab.com/gitlab-org/gitaly/v14/internal/cache" 23 "gitlab.com/gitlab-org/gitaly/v14/internal/gitaly/service" 24 "gitlab.com/gitlab-org/gitaly/v14/internal/helper" 25 gitaly_metadata "gitlab.com/gitlab-org/gitaly/v14/internal/metadata" 26 "gitlab.com/gitlab-org/gitaly/v14/internal/metadata/featureflag" 27 "gitlab.com/gitlab-org/gitaly/v14/internal/middleware/metadatahandler" 28 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/commonerr" 29 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" 30 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" 31 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore/glsql" 32 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/grpc-proxy/proxy" 33 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/mock" 34 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" 35 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" 36 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/transactions" 37 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" 38 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" 39 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testassert" 40 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testcfg" 41 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/testserver" 42 "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/txinfo" 43 "gitlab.com/gitlab-org/gitaly/v14/internal/transaction/voting" 44 "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb" 45 "gitlab.com/gitlab-org/labkit/correlation" 46 "google.golang.org/grpc" 47 "google.golang.org/grpc/codes" 48 "google.golang.org/grpc/health/grpc_health_v1" 49 "google.golang.org/grpc/metadata" 50 "google.golang.org/grpc/peer" 51 "google.golang.org/grpc/status" 52 "google.golang.org/protobuf/proto" 53 "google.golang.org/protobuf/types/known/emptypb" 54) 55 56var testLogger = logrus.New() 57 58func init() { 59 testLogger.SetOutput(io.Discard) 60} 61 62func TestSecondaryRotation(t *testing.T) { 63 t.Skip("secondary rotation will change with the new data model") 64} 65 66func TestStreamDirectorReadOnlyEnforcement(t *testing.T) { 67 t.Parallel() 68 db := glsql.NewDB(t) 69 for _, tc := range []struct { 70 desc string 71 readOnly bool 72 }{ 73 {desc: "writable", readOnly: false}, 74 {desc: "read-only", readOnly: true}, 75 } { 76 t.Run(tc.desc, func(t *testing.T) { 77 db.TruncateAll(t) 78 79 const ( 80 virtualStorage = "test-virtual-storage" 81 relativePath = "test-repository" 82 storage = "test-storage" 83 ) 84 conf := config.Config{ 85 VirtualStorages: []*config.VirtualStorage{ 86 { 87 Name: virtualStorage, 88 Nodes: []*config.Node{ 89 { 90 Address: "tcp://gitaly-primary.example.com", 91 Storage: storage, 92 }, 93 }, 94 }, 95 }, 96 } 97 98 ctx, cancel := testhelper.Context() 99 defer cancel() 100 101 rs := datastore.MockRepositoryStore{ 102 GetConsistentStoragesFunc: func(context.Context, string, string) (map[string]struct{}, error) { 103 if tc.readOnly { 104 return map[string]struct{}{storage + "-other": {}}, nil 105 } 106 return map[string]struct{}{storage: {}}, nil 107 }, 108 } 109 110 coordinator := NewCoordinator( 111 datastore.NewPostgresReplicationEventQueue(db), 112 rs, 113 NewNodeManagerRouter(&nodes.MockManager{GetShardFunc: func(vs string) (nodes.Shard, error) { 114 require.Equal(t, virtualStorage, vs) 115 return nodes.Shard{ 116 Primary: &nodes.MockNode{GetStorageMethod: func() string { 117 return storage 118 }}, 119 }, nil 120 }}, rs), 121 transactions.NewManager(conf), 122 conf, 123 protoregistry.GitalyProtoPreregistered, 124 ) 125 126 frame, err := proto.Marshal(&gitalypb.CleanupRequest{Repository: &gitalypb.Repository{ 127 StorageName: virtualStorage, 128 RelativePath: relativePath, 129 }}) 130 require.NoError(t, err) 131 132 _, err = coordinator.StreamDirector(ctx, "/gitaly.RepositoryService/Cleanup", &mockPeeker{frame: frame}) 133 if tc.readOnly { 134 require.Equal(t, ErrRepositoryReadOnly, err) 135 testhelper.RequireGrpcError(t, err, codes.FailedPrecondition) 136 } else { 137 require.NoError(t, err) 138 } 139 }) 140 } 141} 142 143func TestStreamDirectorMutator(t *testing.T) { 144 t.Parallel() 145 gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 146 testhelper.NewServerWithHealth(t, gitalySocket0) 147 testhelper.NewServerWithHealth(t, gitalySocket1) 148 149 primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1 150 primaryNode := &config.Node{Address: primaryAddress, Storage: "praefect-internal-1"} 151 secondaryNode := &config.Node{Address: secondaryAddress, Storage: "praefect-internal-2"} 152 conf := config.Config{ 153 VirtualStorages: []*config.VirtualStorage{ 154 { 155 Name: "praefect", 156 Nodes: []*config.Node{primaryNode, secondaryNode}, 157 }, 158 }, 159 } 160 db := glsql.NewDB(t) 161 162 targetRepo := gitalypb.Repository{ 163 StorageName: "praefect", 164 RelativePath: "/path/to/hashed/storage", 165 } 166 167 ctx, cancel := testhelper.Context() 168 defer cancel() 169 170 txMgr := transactions.NewManager(conf) 171 172 nodeSet, err := DialNodes(ctx, conf.VirtualStorages, protoregistry.GitalyProtoPreregistered, nil, nil, nil) 173 require.NoError(t, err) 174 defer nodeSet.Close() 175 176 for _, tc := range []struct { 177 desc string 178 repositoryExists bool 179 error error 180 }{ 181 { 182 desc: "succcessful", 183 repositoryExists: true, 184 }, 185 { 186 desc: "repository not found", 187 error: helper.ErrNotFound(fmt.Errorf("mutator call: route repository mutator: %w", fmt.Errorf("get primary: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath)))), 188 }, 189 } { 190 t.Run(tc.desc, func(t *testing.T) { 191 tx := db.Begin(t) 192 defer tx.Rollback(t) 193 194 rs := datastore.NewPostgresRepositoryStore(tx, conf.StorageNames()) 195 196 if tc.repositoryExists { 197 require.NoError(t, rs.CreateRepository(ctx, 1, targetRepo.StorageName, targetRepo.RelativePath, primaryNode.Storage, []string{secondaryNode.Storage}, nil, true, true)) 198 } 199 200 testhelper.SetHealthyNodes(t, ctx, tx, map[string]map[string][]string{"praefect": conf.StorageNames()}) 201 queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) 202 queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { 203 assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") 204 return queue.Enqueue(ctx, event) 205 }) 206 207 coordinator := NewCoordinator( 208 queueInterceptor, 209 rs, 210 NewPerRepositoryRouter( 211 nodeSet.Connections(), 212 nodes.NewPerRepositoryElector(tx), 213 StaticHealthChecker(conf.StorageNames()), 214 NewLockedRandom(rand.New(rand.NewSource(0))), 215 rs, 216 datastore.NewAssignmentStore(tx, conf.StorageNames()), 217 rs, 218 nil, 219 ), 220 txMgr, 221 conf, 222 protoregistry.GitalyProtoPreregistered, 223 ) 224 225 frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ 226 Origin: &targetRepo, 227 ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo}, 228 }) 229 require.NoError(t, err) 230 231 require.NoError(t, err) 232 233 fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool" 234 235 peeker := &mockPeeker{frame} 236 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 237 if tc.error != nil { 238 require.Equal(t, tc.error, err) 239 return 240 } 241 242 require.NoError(t, err) 243 require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) 244 245 mi, err := coordinator.registry.LookupMethod(fullMethod) 246 require.NoError(t, err) 247 248 m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) 249 require.NoError(t, err) 250 251 rewrittenTargetRepo, err := mi.TargetRepo(m) 252 require.NoError(t, err) 253 require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") 254 255 // this call creates new events in the queue and simulates usual flow of the update operation 256 require.NoError(t, streamParams.RequestFinalizer()) 257 258 // wait until event persisted (async operation) 259 require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { 260 return len(i.GetEnqueuedResult()) == 1 261 })) 262 263 events, err := queueInterceptor.Dequeue(ctx, "praefect", "praefect-internal-2", 10) 264 require.NoError(t, err) 265 require.Len(t, events, 1) 266 267 expectedEvent := datastore.ReplicationEvent{ 268 ID: 1, 269 State: datastore.JobStateInProgress, 270 Attempt: 2, 271 LockID: "praefect|praefect-internal-2|/path/to/hashed/storage", 272 CreatedAt: events[0].CreatedAt, 273 UpdatedAt: events[0].UpdatedAt, 274 Job: datastore.ReplicationJob{ 275 RepositoryID: 1, 276 Change: datastore.UpdateRepo, 277 VirtualStorage: conf.VirtualStorages[0].Name, 278 RelativePath: targetRepo.RelativePath, 279 TargetNodeStorage: secondaryNode.Storage, 280 SourceNodeStorage: primaryNode.Storage, 281 }, 282 Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, 283 } 284 require.Equal(t, expectedEvent, events[0], "ensure replication job created by stream director is correct") 285 }) 286 } 287} 288 289func TestStreamDirectorMutator_StopTransaction(t *testing.T) { 290 t.Parallel() 291 socket := testhelper.GetTemporaryGitalySocketFileName(t) 292 testhelper.NewServerWithHealth(t, socket) 293 294 conf := config.Config{ 295 VirtualStorages: []*config.VirtualStorage{ 296 { 297 Name: "praefect", 298 Nodes: []*config.Node{ 299 {Address: "unix://" + socket, Storage: "primary"}, 300 {Address: "unix://" + socket, Storage: "secondary"}, 301 }, 302 }, 303 }, 304 } 305 306 repo := gitalypb.Repository{ 307 StorageName: "praefect", 308 RelativePath: "/path/to/hashed/storage", 309 } 310 311 nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 312 require.NoError(t, err) 313 nodeMgr.Start(0, time.Hour) 314 315 ctx, cancel := testhelper.Context() 316 defer cancel() 317 318 shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) 319 require.NoError(t, err) 320 321 for _, name := range []string{"primary", "secondary"} { 322 node, err := shard.GetNode(name) 323 require.NoError(t, err) 324 waitNodeToChangeHealthStatus(ctx, t, node, true) 325 } 326 327 rs := datastore.MockRepositoryStore{ 328 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 329 return map[string]struct{}{"primary": {}, "secondary": {}}, nil 330 }, 331 } 332 333 txMgr := transactions.NewManager(conf) 334 335 coordinator := NewCoordinator( 336 datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)), 337 rs, 338 NewNodeManagerRouter(nodeMgr, rs), 339 txMgr, 340 conf, 341 protoregistry.GitalyProtoPreregistered, 342 ) 343 344 fullMethod := "/gitaly.SmartHTTPService/PostReceivePack" 345 346 frame, err := proto.Marshal(&gitalypb.PostReceivePackRequest{ 347 Repository: &repo, 348 }) 349 require.NoError(t, err) 350 peeker := &mockPeeker{frame} 351 352 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 353 require.NoError(t, err) 354 355 txCtx := peer.NewContext(streamParams.Primary().Ctx, &peer.Peer{}) 356 transaction, err := txinfo.TransactionFromContext(txCtx) 357 require.NoError(t, err) 358 359 var wg sync.WaitGroup 360 var syncWG sync.WaitGroup 361 362 wg.Add(2) 363 syncWG.Add(2) 364 365 go func() { 366 defer wg.Done() 367 368 vote := voting.VoteFromData([]byte("vote")) 369 err := txMgr.VoteTransaction(ctx, transaction.ID, "primary", vote) 370 require.NoError(t, err) 371 372 // Assure that at least one vote was agreed on. 373 syncWG.Done() 374 syncWG.Wait() 375 376 require.NoError(t, txMgr.StopTransaction(ctx, transaction.ID)) 377 }() 378 379 go func() { 380 defer wg.Done() 381 382 vote := voting.VoteFromData([]byte("vote")) 383 err := txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote) 384 require.NoError(t, err) 385 386 // Assure that at least one vote was agreed on. 387 syncWG.Done() 388 syncWG.Wait() 389 390 err = txMgr.VoteTransaction(ctx, transaction.ID, "secondary", vote) 391 assert.True(t, errors.Is(err, transactions.ErrTransactionStopped)) 392 }() 393 394 wg.Wait() 395 396 err = streamParams.RequestFinalizer() 397 require.NoError(t, err) 398} 399 400type mockRouter struct { 401 Router 402 routeRepositoryAccessorFunc func(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) 403} 404 405func (m mockRouter) RouteRepositoryAccessor(ctx context.Context, virtualStorage, relativePath string, forcePrimary bool) (RouterNode, error) { 406 return m.routeRepositoryAccessorFunc(ctx, virtualStorage, relativePath, forcePrimary) 407} 408 409func TestStreamDirectorAccessor(t *testing.T) { 410 t.Parallel() 411 gitalySocket := testhelper.GetTemporaryGitalySocketFileName(t) 412 testhelper.NewServerWithHealth(t, gitalySocket) 413 414 gitalyAddress := "unix://" + gitalySocket 415 conf := config.Config{ 416 VirtualStorages: []*config.VirtualStorage{ 417 { 418 Name: "praefect", 419 Nodes: []*config.Node{ 420 { 421 Address: gitalyAddress, 422 Storage: "praefect-internal-1", 423 }, 424 }, 425 }, 426 }, 427 } 428 429 queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) 430 431 targetRepo := gitalypb.Repository{ 432 StorageName: "praefect", 433 RelativePath: "/path/to/hashed/storage", 434 } 435 436 ctx, cancel := testhelper.Context() 437 defer cancel() 438 439 entry := testhelper.DiscardTestEntry(t) 440 rs := datastore.MockRepositoryStore{} 441 442 nodeMgr, err := nodes.NewManager(entry, conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 443 require.NoError(t, err) 444 nodeMgr.Start(0, time.Minute) 445 446 txMgr := transactions.NewManager(conf) 447 448 for _, tc := range []struct { 449 desc string 450 router Router 451 error error 452 }{ 453 { 454 desc: "success", 455 router: NewNodeManagerRouter(nodeMgr, rs), 456 }, 457 { 458 desc: "repository not found", 459 router: mockRouter{ 460 routeRepositoryAccessorFunc: func(_ context.Context, virtualStorage, relativePath string, _ bool) (RouterNode, error) { 461 return RouterNode{}, commonerr.NewRepositoryNotFoundError(virtualStorage, relativePath) 462 }, 463 }, 464 error: helper.ErrNotFound(fmt.Errorf("accessor call: route repository accessor: %w", commonerr.NewRepositoryNotFoundError(targetRepo.StorageName, targetRepo.RelativePath))), 465 }, 466 } { 467 t.Run(tc.desc, func(t *testing.T) { 468 coordinator := NewCoordinator( 469 queue, 470 rs, 471 tc.router, 472 txMgr, 473 conf, 474 protoregistry.GitalyProtoPreregistered, 475 ) 476 477 frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) 478 require.NoError(t, err) 479 480 fullMethod := "/gitaly.RefService/FindAllBranches" 481 482 peeker := &mockPeeker{frame: frame} 483 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 484 if tc.error != nil { 485 require.Equal(t, tc.error, err) 486 return 487 } 488 489 require.NoError(t, err) 490 require.Equal(t, gitalyAddress, streamParams.Primary().Conn.Target()) 491 492 mi, err := coordinator.registry.LookupMethod(fullMethod) 493 require.NoError(t, err) 494 require.Equal(t, protoregistry.ScopeRepository, mi.Scope, "method must be repository scoped") 495 require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") 496 497 m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) 498 require.NoError(t, err) 499 500 rewrittenTargetRepo, err := mi.TargetRepo(m) 501 require.NoError(t, err) 502 require.Equal(t, "praefect-internal-1", rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") 503 }) 504 } 505} 506 507func TestCoordinatorStreamDirector_distributesReads(t *testing.T) { 508 t.Parallel() 509 gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 510 primaryHealthSrv := testhelper.NewServerWithHealth(t, gitalySocket0) 511 healthSrv := testhelper.NewServerWithHealth(t, gitalySocket1) 512 513 primaryNodeConf := config.Node{ 514 Address: "unix://" + gitalySocket0, 515 Storage: "gitaly-1", 516 } 517 518 secondaryNodeConf := config.Node{ 519 Address: "unix://" + gitalySocket1, 520 Storage: "gitaly-2", 521 } 522 conf := config.Config{ 523 VirtualStorages: []*config.VirtualStorage{ 524 { 525 Name: "praefect", 526 Nodes: []*config.Node{&primaryNodeConf, &secondaryNodeConf}, 527 }, 528 }, 529 Failover: config.Failover{ 530 Enabled: true, 531 ElectionStrategy: "local", 532 }, 533 } 534 535 queue := datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t)) 536 537 targetRepo := gitalypb.Repository{ 538 StorageName: "praefect", 539 RelativePath: "/path/to/hashed/storage", 540 } 541 542 ctx, cancel := testhelper.Context() 543 defer cancel() 544 545 entry := testhelper.DiscardTestEntry(t) 546 547 repoStore := datastore.MockRepositoryStore{ 548 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 549 return map[string]struct{}{primaryNodeConf.Storage: {}, secondaryNodeConf.Storage: {}}, nil 550 }, 551 } 552 553 nodeMgr, err := nodes.NewManager(entry, conf, nil, repoStore, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 554 require.NoError(t, err) 555 nodeMgr.Start(0, time.Minute) 556 557 txMgr := transactions.NewManager(conf) 558 559 coordinator := NewCoordinator( 560 queue, 561 repoStore, 562 NewNodeManagerRouter(nodeMgr, repoStore), 563 txMgr, 564 conf, 565 protoregistry.GitalyProtoPreregistered, 566 ) 567 568 t.Run("forwards accessor operations", func(t *testing.T) { 569 var primaryChosen int 570 var secondaryChosen int 571 572 for i := 0; i < 16; i++ { 573 frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) 574 require.NoError(t, err) 575 576 fullMethod := "/gitaly.RefService/FindAllBranches" 577 578 peeker := &mockPeeker{frame: frame} 579 580 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 581 require.NoError(t, err) 582 require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary") 583 584 var nodeConf config.Node 585 switch streamParams.Primary().Conn.Target() { 586 case primaryNodeConf.Address: 587 nodeConf = primaryNodeConf 588 primaryChosen++ 589 case secondaryNodeConf.Address: 590 nodeConf = secondaryNodeConf 591 secondaryChosen++ 592 } 593 594 mi, err := coordinator.registry.LookupMethod(fullMethod) 595 require.NoError(t, err) 596 require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") 597 598 m, err := protoMessage(mi, streamParams.Primary().Msg) 599 require.NoError(t, err) 600 601 rewrittenTargetRepo, err := mi.TargetRepo(m) 602 require.NoError(t, err) 603 require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") 604 } 605 606 require.NotZero(t, primaryChosen, "primary should have been chosen at least once") 607 require.NotZero(t, secondaryChosen, "secondary should have been chosen at least once") 608 }) 609 610 t.Run("forwards accessor to primary if force-routing", func(t *testing.T) { 611 var primaryChosen int 612 var secondaryChosen int 613 614 for i := 0; i < 16; i++ { 615 frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) 616 require.NoError(t, err) 617 618 fullMethod := "/gitaly.RefService/FindAllBranches" 619 620 peeker := &mockPeeker{frame: frame} 621 622 ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") 623 ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) 624 625 streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) 626 require.NoError(t, err) 627 require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary") 628 629 var nodeConf config.Node 630 switch streamParams.Primary().Conn.Target() { 631 case primaryNodeConf.Address: 632 nodeConf = primaryNodeConf 633 primaryChosen++ 634 case secondaryNodeConf.Address: 635 nodeConf = secondaryNodeConf 636 secondaryChosen++ 637 } 638 639 mi, err := coordinator.registry.LookupMethod(fullMethod) 640 require.NoError(t, err) 641 require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") 642 643 m, err := protoMessage(mi, streamParams.Primary().Msg) 644 require.NoError(t, err) 645 646 rewrittenTargetRepo, err := mi.TargetRepo(m) 647 require.NoError(t, err) 648 require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") 649 } 650 651 require.Equal(t, 16, primaryChosen, "primary should have always been chosen") 652 require.Zero(t, secondaryChosen, "secondary should never have been chosen") 653 }) 654 655 t.Run("forwards accessor to primary for primary-only RPCs", func(t *testing.T) { 656 var primaryChosen int 657 var secondaryChosen int 658 659 for i := 0; i < 16; i++ { 660 frame, err := proto.Marshal(&gitalypb.GetObjectDirectorySizeRequest{Repository: &targetRepo}) 661 require.NoError(t, err) 662 663 fullMethod := "/gitaly.RepositoryService/GetObjectDirectorySize" 664 665 peeker := &mockPeeker{frame: frame} 666 667 ctx, cancel := testhelper.Context() 668 defer cancel() 669 670 streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) 671 require.NoError(t, err) 672 require.Contains(t, []string{primaryNodeConf.Address, secondaryNodeConf.Address}, streamParams.Primary().Conn.Target(), "must be redirected to primary or secondary") 673 674 var nodeConf config.Node 675 switch streamParams.Primary().Conn.Target() { 676 case primaryNodeConf.Address: 677 nodeConf = primaryNodeConf 678 primaryChosen++ 679 case secondaryNodeConf.Address: 680 nodeConf = secondaryNodeConf 681 secondaryChosen++ 682 } 683 684 mi, err := coordinator.registry.LookupMethod(fullMethod) 685 require.NoError(t, err) 686 require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") 687 688 m, err := protoMessage(mi, streamParams.Primary().Msg) 689 require.NoError(t, err) 690 691 rewrittenTargetRepo, err := mi.TargetRepo(m) 692 require.NoError(t, err) 693 require.Equal(t, nodeConf.Storage, rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") 694 } 695 696 require.Equal(t, 16, primaryChosen, "primary should have always been chosen") 697 require.Zero(t, secondaryChosen, "secondary should never have been chosen") 698 }) 699 700 t.Run("forwards accessor operations only to healthy nodes", func(t *testing.T) { 701 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 702 703 shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) 704 require.NoError(t, err) 705 706 gitaly1, err := shard.GetNode(secondaryNodeConf.Storage) 707 require.NoError(t, err) 708 waitNodeToChangeHealthStatus(ctx, t, gitaly1, false) 709 defer func() { 710 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 711 waitNodeToChangeHealthStatus(ctx, t, gitaly1, true) 712 }() 713 714 frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) 715 require.NoError(t, err) 716 717 fullMethod := "/gitaly.RefService/FindAllBranches" 718 719 peeker := &mockPeeker{frame: frame} 720 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 721 require.NoError(t, err) 722 require.Equal(t, primaryNodeConf.Address, streamParams.Primary().Conn.Target(), "must be redirected to primary") 723 724 mi, err := coordinator.registry.LookupMethod(fullMethod) 725 require.NoError(t, err) 726 require.Equal(t, protoregistry.OpAccessor, mi.Operation, "method must be an accessor") 727 728 m, err := protoMessage(mi, streamParams.Primary().Msg) 729 require.NoError(t, err) 730 731 rewrittenTargetRepo, err := mi.TargetRepo(m) 732 require.NoError(t, err) 733 require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") 734 }) 735 736 t.Run("fails if force-routing to unhealthy primary", func(t *testing.T) { 737 primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 738 739 shard, err := nodeMgr.GetShard(ctx, conf.VirtualStorages[0].Name) 740 require.NoError(t, err) 741 742 primaryGitaly, err := shard.GetNode(primaryNodeConf.Storage) 743 require.NoError(t, err) 744 waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, false) 745 defer func() { 746 primaryHealthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 747 waitNodeToChangeHealthStatus(ctx, t, primaryGitaly, true) 748 }() 749 750 frame, err := proto.Marshal(&gitalypb.FindAllBranchesRequest{Repository: &targetRepo}) 751 require.NoError(t, err) 752 753 fullMethod := "/gitaly.RefService/FindAllBranches" 754 755 ctx := correlation.ContextWithCorrelation(ctx, "my-correlation-id") 756 ctx = testhelper.MergeIncomingMetadata(ctx, metadata.Pairs(routeRepositoryAccessorPolicy, routeRepositoryAccessorPolicyPrimaryOnly)) 757 758 peeker := &mockPeeker{frame: frame} 759 _, err = coordinator.StreamDirector(ctx, fullMethod, peeker) 760 require.True(t, errors.Is(err, nodes.ErrPrimaryNotHealthy)) 761 }) 762 763 t.Run("doesn't forward mutator operations", func(t *testing.T) { 764 frame, err := proto.Marshal(&gitalypb.UserUpdateBranchRequest{Repository: &targetRepo}) 765 require.NoError(t, err) 766 767 fullMethod := "/gitaly.OperationService/UserUpdateBranch" 768 769 peeker := &mockPeeker{frame: frame} 770 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 771 require.NoError(t, err) 772 require.Equal(t, primaryNodeConf.Address, streamParams.Primary().Conn.Target(), "must be redirected to primary") 773 774 mi, err := coordinator.registry.LookupMethod(fullMethod) 775 require.NoError(t, err) 776 require.Equal(t, protoregistry.OpMutator, mi.Operation, "method must be a mutator") 777 778 m, err := protoMessage(mi, streamParams.Primary().Msg) 779 require.NoError(t, err) 780 781 rewrittenTargetRepo, err := mi.TargetRepo(m) 782 require.NoError(t, err) 783 require.Equal(t, "gitaly-1", rewrittenTargetRepo.GetStorageName(), "stream director must rewrite the storage name") 784 }) 785} 786 787func TestStreamDirector_repo_creation(t *testing.T) { 788 t.Parallel() 789 790 db := glsql.NewDB(t) 791 792 for _, tc := range []struct { 793 desc string 794 electionStrategy config.ElectionStrategy 795 replicationFactor int 796 primaryStored bool 797 assignmentsStored bool 798 }{ 799 { 800 desc: "virtual storage scoped primaries", 801 electionStrategy: config.ElectionStrategySQL, 802 replicationFactor: 3, // assignments are not set when not using repository specific primaries 803 primaryStored: false, 804 assignmentsStored: false, 805 }, 806 { 807 desc: "repository specific primaries without variable replication factor", 808 electionStrategy: config.ElectionStrategyPerRepository, 809 primaryStored: true, 810 assignmentsStored: false, 811 }, 812 { 813 desc: "repository specific primaries with variable replication factor", 814 electionStrategy: config.ElectionStrategyPerRepository, 815 replicationFactor: 3, 816 primaryStored: true, 817 assignmentsStored: true, 818 }, 819 } { 820 t.Run(tc.desc, func(t *testing.T) { 821 db.TruncateAll(t) 822 primaryNode := &config.Node{Storage: "praefect-internal-1"} 823 healthySecondaryNode := &config.Node{Storage: "praefect-internal-2"} 824 unhealthySecondaryNode := &config.Node{Storage: "praefect-internal-3"} 825 conf := config.Config{ 826 Failover: config.Failover{ElectionStrategy: tc.electionStrategy}, 827 VirtualStorages: []*config.VirtualStorage{ 828 { 829 Name: "praefect", 830 DefaultReplicationFactor: tc.replicationFactor, 831 Nodes: []*config.Node{primaryNode, healthySecondaryNode, unhealthySecondaryNode}, 832 }, 833 }, 834 } 835 836 rewrittenStorage := primaryNode.Storage 837 targetRepo := gitalypb.Repository{ 838 StorageName: "praefect", 839 RelativePath: "/path/to/hashed/storage", 840 } 841 842 var createRepositoryCalled int64 843 rs := datastore.MockRepositoryStore{ 844 CreateRepositoryFunc: func(ctx context.Context, repoID int64, virtualStorage, relativePath, primary string, updatedSecondaries, outdatedSecondaries []string, storePrimary, storeAssignments bool) error { 845 atomic.AddInt64(&createRepositoryCalled, 1) 846 assert.Equal(t, int64(0), repoID) 847 assert.Equal(t, targetRepo.StorageName, virtualStorage) 848 assert.Equal(t, targetRepo.RelativePath, relativePath) 849 assert.Equal(t, rewrittenStorage, primary) 850 assert.Equal(t, []string{healthySecondaryNode.Storage}, updatedSecondaries) 851 assert.Equal(t, []string{unhealthySecondaryNode.Storage}, outdatedSecondaries) 852 assert.Equal(t, tc.primaryStored, storePrimary) 853 assert.Equal(t, tc.assignmentsStored, storeAssignments) 854 return nil 855 }, 856 } 857 858 var router Router 859 var primaryConnPointer string 860 var secondaryConnPointers []string 861 switch tc.electionStrategy { 862 case config.ElectionStrategySQL: 863 gitalySocket0 := testhelper.GetTemporaryGitalySocketFileName(t) 864 gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t) 865 gitalySocket2 := testhelper.GetTemporaryGitalySocketFileName(t) 866 testhelper.NewServerWithHealth(t, gitalySocket0) 867 testhelper.NewServerWithHealth(t, gitalySocket1) 868 healthSrv2 := testhelper.NewServerWithHealth(t, gitalySocket2) 869 healthSrv2.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 870 871 primaryNode.Address = "unix://" + gitalySocket0 872 healthySecondaryNode.Address = "unix://" + gitalySocket1 873 unhealthySecondaryNode.Address = "unix://" + gitalySocket2 874 875 nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 876 require.NoError(t, err) 877 nodeMgr.Start(0, time.Hour) 878 879 router = NewNodeManagerRouter(nodeMgr, rs) 880 for _, node := range nodeMgr.Nodes()["praefect"] { 881 if node.GetStorage() == primaryNode.Storage { 882 primaryConnPointer = fmt.Sprintf("%p", node.GetConnection()) 883 continue 884 } 885 886 if node.GetStorage() == healthySecondaryNode.Storage { 887 secondaryConnPointers = []string{fmt.Sprintf("%p", node.GetConnection())} 888 } 889 } 890 case config.ElectionStrategyPerRepository: 891 conns := Connections{ 892 "praefect": { 893 primaryNode.Storage: &grpc.ClientConn{}, 894 healthySecondaryNode.Storage: &grpc.ClientConn{}, 895 unhealthySecondaryNode.Storage: &grpc.ClientConn{}, 896 }, 897 } 898 primaryConnPointer = fmt.Sprintf("%p", conns["praefect"][primaryNode.Storage]) 899 secondaryConnPointers = []string{fmt.Sprintf("%p", conns["praefect"][healthySecondaryNode.Storage])} 900 router = NewPerRepositoryRouter( 901 conns, 902 nil, 903 StaticHealthChecker{"praefect": {primaryNode.Storage, healthySecondaryNode.Storage}}, 904 mockRandom{ 905 intnFunc: func(n int) int { 906 require.Equal(t, n, 2, "number of primary candidates should match the number of healthy nodes") 907 return 0 908 }, 909 shuffleFunc: func(n int, swap func(int, int)) { 910 require.Equal(t, n, 2, "number of secondary candidates should match the number of node minus the primary") 911 }, 912 }, 913 nil, 914 nil, 915 rs, 916 conf.DefaultReplicationFactors(), 917 ) 918 default: 919 t.Fatalf("unexpected election strategy: %q", tc.electionStrategy) 920 } 921 922 txMgr := transactions.NewManager(conf) 923 queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(db)) 924 925 coordinator := NewCoordinator( 926 queueInterceptor, 927 rs, 928 router, 929 txMgr, 930 conf, 931 protoregistry.GitalyProtoPreregistered, 932 ) 933 934 frame, err := proto.Marshal(&gitalypb.CreateRepositoryRequest{ 935 Repository: &targetRepo, 936 }) 937 require.NoError(t, err) 938 939 fullMethod := "/gitaly.RepositoryService/CreateRepository" 940 941 ctx, cancel := testhelper.Context() 942 defer cancel() 943 944 peeker := &mockPeeker{frame} 945 streamParams, err := coordinator.StreamDirector(correlation.ContextWithCorrelation(ctx, "my-correlation-id"), fullMethod, peeker) 946 require.NoError(t, err) 947 require.Equal(t, primaryConnPointer, fmt.Sprintf("%p", streamParams.Primary().Conn)) 948 949 var secondaries []string 950 for _, dst := range streamParams.Secondaries() { 951 secondaries = append(secondaries, fmt.Sprintf("%p", dst.Conn)) 952 } 953 require.Equal(t, secondaryConnPointers, secondaries, "secondary connections did not match expected") 954 955 mi, err := coordinator.registry.LookupMethod(fullMethod) 956 require.NoError(t, err) 957 958 m, err := mi.UnmarshalRequestProto(streamParams.Primary().Msg) 959 require.NoError(t, err) 960 961 rewrittenTargetRepo, err := mi.TargetRepo(m) 962 require.NoError(t, err) 963 require.Equal(t, rewrittenStorage, rewrittenTargetRepo.GetStorageName(), "stream director should have rewritten the storage name") 964 965 vote := voting.VoteFromData([]byte{}) 966 require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-1", vote)) 967 require.NoError(t, txMgr.VoteTransaction(ctx, 1, "praefect-internal-2", vote)) 968 969 // this call creates new events in the queue and simulates usual flow of the update operation 970 err = streamParams.RequestFinalizer() 971 require.NoError(t, err) 972 973 // wait until event persisted (async operation) 974 require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { 975 return len(i.GetEnqueuedResult()) == 1 976 })) 977 978 var expectedEvents, actualEvents []datastore.ReplicationEvent 979 for _, target := range []string{unhealthySecondaryNode.Storage} { 980 actual, err := queueInterceptor.Dequeue(ctx, "praefect", target, 10) 981 require.NoError(t, err) 982 require.Len(t, actual, 1) 983 984 actualEvents = append(actualEvents, actual[0]) 985 expectedEvents = append(expectedEvents, datastore.ReplicationEvent{ 986 ID: actual[0].ID, 987 State: datastore.JobStateInProgress, 988 Attempt: 2, 989 LockID: fmt.Sprintf("praefect|%s|/path/to/hashed/storage", target), 990 CreatedAt: actual[0].CreatedAt, 991 UpdatedAt: actual[0].UpdatedAt, 992 Job: datastore.ReplicationJob{ 993 Change: datastore.UpdateRepo, 994 VirtualStorage: conf.VirtualStorages[0].Name, 995 RelativePath: targetRepo.RelativePath, 996 TargetNodeStorage: target, 997 SourceNodeStorage: primaryNode.Storage, 998 }, 999 Meta: datastore.Params{metadatahandler.CorrelationIDKey: "my-correlation-id"}, 1000 }) 1001 } 1002 1003 require.Equal(t, expectedEvents, actualEvents, "ensure replication job created by stream director is correct") 1004 require.EqualValues(t, 1, atomic.LoadInt64(&createRepositoryCalled), "ensure CreateRepository is called on datastore") 1005 }) 1006 } 1007} 1008 1009func waitNodeToChangeHealthStatus(ctx context.Context, t *testing.T, node nodes.Node, health bool) { 1010 t.Helper() 1011 1012 ctx, cancel := context.WithTimeout(ctx, time.Second) 1013 defer cancel() 1014 1015 for node.IsHealthy() != health { 1016 _, err := node.CheckHealth(ctx) 1017 require.NoError(t, err) 1018 } 1019} 1020 1021type mockPeeker struct { 1022 frame []byte 1023} 1024 1025func (m *mockPeeker) Peek() ([]byte, error) { 1026 return m.frame, nil 1027} 1028 1029func (m *mockPeeker) Modify(payload []byte) error { 1030 m.frame = payload 1031 1032 return nil 1033} 1034 1035func TestAbsentCorrelationID(t *testing.T) { 1036 t.Parallel() 1037 gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 1038 healthSrv0 := testhelper.NewServerWithHealth(t, gitalySocket0) 1039 healthSrv1 := testhelper.NewServerWithHealth(t, gitalySocket1) 1040 healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 1041 healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 1042 1043 primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1 1044 conf := config.Config{ 1045 VirtualStorages: []*config.VirtualStorage{ 1046 { 1047 Name: "praefect", 1048 Nodes: []*config.Node{ 1049 { 1050 Address: primaryAddress, 1051 Storage: "praefect-internal-1", 1052 }, 1053 { 1054 Address: secondaryAddress, 1055 Storage: "praefect-internal-2", 1056 }, 1057 }, 1058 }, 1059 }, 1060 } 1061 1062 queueInterceptor := datastore.NewReplicationEventQueueInterceptor(datastore.NewPostgresReplicationEventQueue(glsql.NewDB(t))) 1063 queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { 1064 assert.True(t, len(queueInterceptor.GetEnqueued()) < 2, "expected only one event to be created") 1065 return queue.Enqueue(ctx, event) 1066 }) 1067 targetRepo := gitalypb.Repository{ 1068 StorageName: "praefect", 1069 RelativePath: "/path/to/hashed/storage", 1070 } 1071 1072 ctx, cancel := testhelper.Context() 1073 defer cancel() 1074 1075 entry := testhelper.DiscardTestEntry(t) 1076 1077 nodeMgr, err := nodes.NewManager(entry, conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 1078 require.NoError(t, err) 1079 nodeMgr.Start(0, time.Hour) 1080 1081 txMgr := transactions.NewManager(conf) 1082 rs := datastore.MockRepositoryStore{} 1083 1084 coordinator := NewCoordinator( 1085 queueInterceptor, 1086 rs, 1087 NewNodeManagerRouter(nodeMgr, rs), 1088 txMgr, 1089 conf, 1090 protoregistry.GitalyProtoPreregistered, 1091 ) 1092 1093 frame, err := proto.Marshal(&gitalypb.CreateObjectPoolRequest{ 1094 Origin: &targetRepo, 1095 ObjectPool: &gitalypb.ObjectPool{Repository: &targetRepo}, 1096 }) 1097 require.NoError(t, err) 1098 1099 fullMethod := "/gitaly.ObjectPoolService/CreateObjectPool" 1100 peeker := &mockPeeker{frame} 1101 streamParams, err := coordinator.StreamDirector(ctx, fullMethod, peeker) 1102 require.NoError(t, err) 1103 require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target()) 1104 1105 // must be run as it adds replication events to the queue 1106 require.NoError(t, streamParams.RequestFinalizer()) 1107 1108 require.NoError(t, queueInterceptor.Wait(time.Minute, func(i *datastore.ReplicationEventQueueInterceptor) bool { 1109 return len(i.GetEnqueuedResult()) == 1 1110 })) 1111 jobs, err := queueInterceptor.Dequeue(ctx, conf.VirtualStorages[0].Name, conf.VirtualStorages[0].Nodes[1].Storage, 1) 1112 require.NoError(t, err) 1113 require.Len(t, jobs, 1) 1114 1115 require.NotZero(t, jobs[0].Meta[metadatahandler.CorrelationIDKey], 1116 "the coordinator should have generated a random ID") 1117} 1118 1119func TestCoordinatorEnqueueFailure(t *testing.T) { 1120 t.Parallel() 1121 conf := config.Config{ 1122 VirtualStorages: []*config.VirtualStorage{ 1123 { 1124 Name: "praefect", 1125 Nodes: []*config.Node{ 1126 { 1127 Address: "unix:///woof", 1128 Storage: "praefect-internal-1", 1129 }, 1130 { 1131 Address: "unix:///meow", 1132 Storage: "praefect-internal-2", 1133 }, 1134 }, 1135 }, 1136 }, 1137 } 1138 1139 queueInterceptor := datastore.NewReplicationEventQueueInterceptor(nil) 1140 errQ := make(chan error, 1) 1141 queueInterceptor.OnEnqueue(func(ctx context.Context, event datastore.ReplicationEvent, queue datastore.ReplicationEventQueue) (datastore.ReplicationEvent, error) { 1142 return datastore.ReplicationEvent{}, <-errQ 1143 }) 1144 queueInterceptor.OnDequeue(func(context.Context, string, string, int, datastore.ReplicationEventQueue) ([]datastore.ReplicationEvent, error) { 1145 return nil, nil 1146 }) 1147 queueInterceptor.OnAcknowledge(func(context.Context, datastore.JobState, []uint64, datastore.ReplicationEventQueue) ([]uint64, error) { 1148 return nil, nil 1149 }) 1150 1151 ms := &mockSvc{ 1152 repoMutatorUnary: func(context.Context, *mock.RepoRequest) (*emptypb.Empty, error) { 1153 return &emptypb.Empty{}, nil // always succeeds 1154 }, 1155 } 1156 1157 r, err := protoregistry.NewFromPaths("praefect/mock/mock.proto") 1158 require.NoError(t, err) 1159 1160 ctx, cancel := testhelper.Context() 1161 defer cancel() 1162 1163 cc, _, cleanup := runPraefectServer(t, ctx, conf, buildOptions{ 1164 withAnnotations: r, 1165 withQueue: queueInterceptor, 1166 withBackends: withMockBackends(t, map[string]mock.SimpleServiceServer{ 1167 conf.VirtualStorages[0].Nodes[0].Storage: ms, 1168 conf.VirtualStorages[0].Nodes[1].Storage: ms, 1169 }), 1170 }) 1171 defer cleanup() 1172 1173 mcli := mock.NewSimpleServiceClient(cc) 1174 1175 errQ <- nil 1176 repoReq := &mock.RepoRequest{ 1177 Repo: &gitalypb.Repository{ 1178 RelativePath: "meow", 1179 StorageName: conf.VirtualStorages[0].Name, 1180 }, 1181 } 1182 _, err = mcli.RepoMutatorUnary(ctx, repoReq) 1183 require.NoError(t, err) 1184 1185 expectErrMsg := "enqueue failed" 1186 errQ <- errors.New(expectErrMsg) 1187 _, err = mcli.RepoMutatorUnary(ctx, repoReq) 1188 require.Error(t, err) 1189 require.Equal(t, err.Error(), "rpc error: code = Unknown desc = enqueue replication event: "+expectErrMsg) 1190} 1191 1192func TestStreamDirectorStorageScope(t *testing.T) { 1193 // stubs health-check requests because nodes.NewManager establishes connection on creation 1194 gitalySocket0, gitalySocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 1195 testhelper.NewServerWithHealth(t, gitalySocket0) 1196 testhelper.NewServerWithHealth(t, gitalySocket1) 1197 1198 primaryAddress, secondaryAddress := "unix://"+gitalySocket0, "unix://"+gitalySocket1 1199 primaryGitaly := &config.Node{Address: primaryAddress, Storage: "gitaly-1"} 1200 secondaryGitaly := &config.Node{Address: secondaryAddress, Storage: "gitaly-2"} 1201 conf := config.Config{ 1202 Failover: config.Failover{Enabled: true}, 1203 VirtualStorages: []*config.VirtualStorage{{ 1204 Name: "praefect", 1205 Nodes: []*config.Node{primaryGitaly, secondaryGitaly}, 1206 }}, 1207 } 1208 1209 rs := datastore.MockRepositoryStore{} 1210 1211 nodeMgr, err := nodes.NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil, nil) 1212 require.NoError(t, err) 1213 nodeMgr.Start(0, time.Second) 1214 coordinator := NewCoordinator( 1215 nil, 1216 rs, 1217 NewNodeManagerRouter(nodeMgr, rs), 1218 nil, 1219 conf, 1220 protoregistry.GitalyProtoPreregistered, 1221 ) 1222 1223 ctx, cancel := testhelper.Context() 1224 defer cancel() 1225 1226 t.Run("mutator", func(t *testing.T) { 1227 fullMethod := "/gitaly.NamespaceService/RemoveNamespace" 1228 requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpMutator) 1229 1230 frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{ 1231 StorageName: conf.VirtualStorages[0].Name, 1232 Name: "stub", 1233 }) 1234 require.NoError(t, err) 1235 1236 streamParams, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame}) 1237 require.NoError(t, err) 1238 1239 require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target(), "stream director didn't redirect to gitaly storage") 1240 1241 rewritten := gitalypb.RemoveNamespaceRequest{} 1242 require.NoError(t, proto.Unmarshal(streamParams.Primary().Msg, &rewritten)) 1243 require.Equal(t, primaryGitaly.Storage, rewritten.StorageName, "stream director didn't rewrite storage") 1244 }) 1245 1246 t.Run("accessor", func(t *testing.T) { 1247 fullMethod := "/gitaly.NamespaceService/NamespaceExists" 1248 requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpAccessor) 1249 1250 frame, err := proto.Marshal(&gitalypb.NamespaceExistsRequest{ 1251 StorageName: conf.VirtualStorages[0].Name, 1252 Name: "stub", 1253 }) 1254 require.NoError(t, err) 1255 1256 streamParams, err := coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame}) 1257 require.NoError(t, err) 1258 1259 require.Equal(t, primaryAddress, streamParams.Primary().Conn.Target(), "stream director didn't redirect to gitaly storage") 1260 1261 rewritten := gitalypb.RemoveNamespaceRequest{} 1262 require.NoError(t, proto.Unmarshal(streamParams.Primary().Msg, &rewritten)) 1263 require.Equal(t, primaryGitaly.Storage, rewritten.StorageName, "stream director didn't rewrite storage") 1264 }) 1265} 1266 1267func TestStreamDirectorStorageScopeError(t *testing.T) { 1268 ctx, cancel := testhelper.Context() 1269 defer cancel() 1270 1271 t.Run("no storage provided", func(t *testing.T) { 1272 mgr := &nodes.MockManager{ 1273 GetShardFunc: func(s string) (nodes.Shard, error) { 1274 require.FailNow(t, "validation of input was not executed") 1275 return nodes.Shard{}, assert.AnError 1276 }, 1277 } 1278 1279 rs := datastore.MockRepositoryStore{} 1280 coordinator := NewCoordinator( 1281 nil, 1282 rs, 1283 NewNodeManagerRouter(mgr, rs), 1284 nil, 1285 config.Config{}, 1286 protoregistry.GitalyProtoPreregistered, 1287 ) 1288 1289 frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "", Name: "stub"}) 1290 require.NoError(t, err) 1291 1292 _, err = coordinator.StreamDirector(ctx, "/gitaly.NamespaceService/RemoveNamespace", &mockPeeker{frame}) 1293 require.Error(t, err) 1294 result, ok := status.FromError(err) 1295 require.True(t, ok) 1296 require.Equal(t, codes.InvalidArgument, result.Code()) 1297 require.Equal(t, "storage scoped: target storage is invalid", result.Message()) 1298 }) 1299 1300 t.Run("unknown storage provided", func(t *testing.T) { 1301 mgr := &nodes.MockManager{ 1302 GetShardFunc: func(s string) (nodes.Shard, error) { 1303 require.Equal(t, "fake", s) 1304 return nodes.Shard{}, nodes.ErrVirtualStorageNotExist 1305 }, 1306 } 1307 1308 rs := datastore.MockRepositoryStore{} 1309 coordinator := NewCoordinator( 1310 nil, 1311 rs, 1312 NewNodeManagerRouter(mgr, rs), 1313 nil, 1314 config.Config{}, 1315 protoregistry.GitalyProtoPreregistered, 1316 ) 1317 1318 frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"}) 1319 require.NoError(t, err) 1320 1321 _, err = coordinator.StreamDirector(ctx, "/gitaly.NamespaceService/RemoveNamespace", &mockPeeker{frame}) 1322 require.Error(t, err) 1323 result, ok := status.FromError(err) 1324 require.True(t, ok) 1325 require.Equal(t, codes.InvalidArgument, result.Code()) 1326 require.Equal(t, "virtual storage does not exist", result.Message()) 1327 }) 1328 1329 t.Run("primary gitaly is not healthy", func(t *testing.T) { 1330 t.Run("accessor", func(t *testing.T) { 1331 mgr := &nodes.MockManager{ 1332 GetShardFunc: func(s string) (nodes.Shard, error) { 1333 require.Equal(t, "fake", s) 1334 return nodes.Shard{}, nodes.ErrPrimaryNotHealthy 1335 }, 1336 } 1337 1338 rs := datastore.MockRepositoryStore{} 1339 coordinator := NewCoordinator( 1340 nil, 1341 rs, 1342 NewNodeManagerRouter(mgr, rs), 1343 nil, 1344 config.Config{}, 1345 protoregistry.GitalyProtoPreregistered, 1346 ) 1347 1348 fullMethod := "/gitaly.NamespaceService/NamespaceExists" 1349 requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpAccessor) 1350 1351 frame, err := proto.Marshal(&gitalypb.NamespaceExistsRequest{StorageName: "fake", Name: "stub"}) 1352 require.NoError(t, err) 1353 1354 _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame}) 1355 require.Error(t, err) 1356 result, ok := status.FromError(err) 1357 require.True(t, ok) 1358 require.Equal(t, codes.Internal, result.Code()) 1359 require.Equal(t, `accessor storage scoped: route storage accessor "fake": primary gitaly is not healthy`, result.Message()) 1360 }) 1361 1362 t.Run("mutator", func(t *testing.T) { 1363 mgr := &nodes.MockManager{ 1364 GetShardFunc: func(s string) (nodes.Shard, error) { 1365 require.Equal(t, "fake", s) 1366 return nodes.Shard{}, nodes.ErrPrimaryNotHealthy 1367 }, 1368 } 1369 rs := datastore.MockRepositoryStore{} 1370 coordinator := NewCoordinator( 1371 nil, 1372 rs, 1373 NewNodeManagerRouter(mgr, rs), 1374 nil, 1375 config.Config{}, 1376 protoregistry.GitalyProtoPreregistered, 1377 ) 1378 1379 fullMethod := "/gitaly.NamespaceService/RemoveNamespace" 1380 requireScopeOperation(t, coordinator.registry, fullMethod, protoregistry.ScopeStorage, protoregistry.OpMutator) 1381 1382 frame, err := proto.Marshal(&gitalypb.RemoveNamespaceRequest{StorageName: "fake", Name: "stub"}) 1383 require.NoError(t, err) 1384 1385 _, err = coordinator.StreamDirector(ctx, fullMethod, &mockPeeker{frame}) 1386 require.Error(t, err) 1387 result, ok := status.FromError(err) 1388 require.True(t, ok) 1389 require.Equal(t, codes.Internal, result.Code()) 1390 require.Equal(t, `mutator storage scoped: get shard "fake": primary gitaly is not healthy`, result.Message()) 1391 }) 1392 }) 1393} 1394 1395func TestDisabledTransactionsWithFeatureFlag(t *testing.T) { 1396 ctx, cancel := testhelper.Context() 1397 defer cancel() 1398 1399 for rpc, enabledFn := range transactionRPCs { 1400 if enabledFn(ctx) { 1401 require.True(t, shouldUseTransaction(ctx, rpc)) 1402 break 1403 } 1404 } 1405} 1406 1407func requireScopeOperation(t *testing.T, registry *protoregistry.Registry, fullMethod string, scope protoregistry.Scope, op protoregistry.OpType) { 1408 t.Helper() 1409 1410 mi, err := registry.LookupMethod(fullMethod) 1411 require.NoError(t, err) 1412 require.Equal(t, scope, mi.Scope, "scope doesn't match requested") 1413 require.Equal(t, op, mi.Operation, "operation type doesn't match requested") 1414} 1415 1416type mockOperationServer struct { 1417 gitalypb.UnimplementedOperationServiceServer 1418 t testing.TB 1419 wg *sync.WaitGroup 1420 err error 1421 called bool 1422} 1423 1424func (s *mockOperationServer) UserCreateBranch( 1425 context.Context, 1426 *gitalypb.UserCreateBranchRequest, 1427) (*gitalypb.UserCreateBranchResponse, error) { 1428 // We need to wait for all servers to arrive in this RPC. If we don't it could be that for 1429 // example the primary arrives quicker than the others and directly errors. This would cause 1430 // stream cancellation, and if the secondaries didn't yet end up in UserCreateBranch, we 1431 // wouldn't see the function call. 1432 s.called = true 1433 s.wg.Done() 1434 s.wg.Wait() 1435 return &gitalypb.UserCreateBranchResponse{}, s.err 1436} 1437 1438type mockLeaseEnder struct{} 1439 1440func (e mockLeaseEnder) EndLease(context.Context) error { 1441 return nil 1442} 1443 1444type mockDiskCache struct { 1445 cache.Cache 1446} 1447 1448func (c *mockDiskCache) StartLease(*gitalypb.Repository) (cache.LeaseEnder, error) { 1449 return mockLeaseEnder{}, nil 1450} 1451 1452// TestCoordinator_grpcErrorHandling asserts that we correctly proxy errors in case any of the nodes 1453// fails. Most importantly, we want to make sure to only ever forward errors from the primary and 1454// never from the secondaries. 1455func TestCoordinator_grpcErrorHandling(t *testing.T) { 1456 t.Parallel() 1457 praefectConfig := config.Config{ 1458 VirtualStorages: []*config.VirtualStorage{ 1459 { 1460 Name: testhelper.DefaultStorageName, 1461 }, 1462 }, 1463 } 1464 1465 type gitalyNode struct { 1466 mock *nodes.MockNode 1467 operationServer *mockOperationServer 1468 } 1469 1470 _, repoProto, _ := testcfg.BuildWithRepo(t) 1471 1472 for _, tc := range []struct { 1473 desc string 1474 errByNode map[string]error 1475 expectedErr error 1476 }{ 1477 { 1478 desc: "no errors", 1479 }, 1480 { 1481 desc: "primary error gets forwarded", 1482 errByNode: map[string]error{ 1483 "primary": errors.New("foo"), 1484 }, 1485 expectedErr: status.Error(codes.Unknown, "foo"), 1486 }, 1487 { 1488 desc: "secondary error gets ignored", 1489 errByNode: map[string]error{ 1490 "secondary-1": errors.New("foo"), 1491 }, 1492 }, 1493 { 1494 desc: "primary error has precedence", 1495 errByNode: map[string]error{ 1496 "primary": errors.New("primary"), 1497 "secondary-1": errors.New("secondary-1"), 1498 "secondary-2": errors.New("secondary-2"), 1499 }, 1500 expectedErr: status.Error(codes.Unknown, "primary"), 1501 }, 1502 } { 1503 t.Run(tc.desc, func(t *testing.T) { 1504 ctx, cleanup := testhelper.Context() 1505 defer cleanup() 1506 1507 var wg sync.WaitGroup 1508 gitalies := make(map[string]gitalyNode) 1509 for _, gitaly := range []string{"primary", "secondary-1", "secondary-2"} { 1510 gitaly := gitaly 1511 1512 cfg := testcfg.Build(t, testcfg.WithStorages(gitaly)) 1513 1514 operationServer := &mockOperationServer{ 1515 t: t, 1516 wg: &wg, 1517 } 1518 1519 addr := testserver.RunGitalyServer(t, cfg, nil, func(srv *grpc.Server, deps *service.Dependencies) { 1520 gitalypb.RegisterOperationServiceServer(srv, operationServer) 1521 }, testserver.WithDiskCache(&mockDiskCache{}), testserver.WithDisablePraefect()) 1522 1523 conn, err := client.DialContext(ctx, addr, []grpc.DialOption{ 1524 grpc.WithDefaultCallOptions(grpc.ForceCodec(proxy.NewCodec())), 1525 }) 1526 require.NoError(t, err) 1527 defer conn.Close() 1528 1529 gitalies[gitaly] = gitalyNode{ 1530 mock: &nodes.MockNode{ 1531 Conn: conn, 1532 Healthy: true, 1533 GetStorageMethod: func() string { return gitaly }, 1534 }, 1535 operationServer: operationServer, 1536 } 1537 1538 praefectConfig.VirtualStorages[0].Nodes = append(praefectConfig.VirtualStorages[0].Nodes, &config.Node{ 1539 Address: addr, 1540 Storage: gitaly, 1541 }) 1542 } 1543 1544 praefectConn, _, cleanup := runPraefectServer(t, ctx, praefectConfig, buildOptions{ 1545 // Set up a mock manager which sets up primary/secondaries and pretends that all nodes are 1546 // healthy. We need fixed roles and unhealthy nodes will not take part in transactions. 1547 withNodeMgr: &nodes.MockManager{ 1548 Storage: testhelper.DefaultStorageName, 1549 GetShardFunc: func(shardName string) (nodes.Shard, error) { 1550 require.Equal(t, testhelper.DefaultStorageName, shardName) 1551 return nodes.Shard{ 1552 Primary: gitalies["primary"].mock, 1553 Secondaries: []nodes.Node{ 1554 gitalies["secondary-1"].mock, 1555 gitalies["secondary-2"].mock, 1556 }, 1557 }, nil 1558 }, 1559 }, 1560 // Set up a mock repsoitory store pretending that all nodes are consistent. Only consistent 1561 // nodes will take part in transactions. 1562 withRepoStore: datastore.MockRepositoryStore{ 1563 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 1564 return map[string]struct{}{"primary": {}, "secondary-1": {}, "secondary-2": {}}, nil 1565 }, 1566 }, 1567 }) 1568 defer cleanup() 1569 1570 for name, node := range gitalies { 1571 wg.Add(1) 1572 node.operationServer.err = tc.errByNode[name] 1573 node.operationServer.called = false 1574 } 1575 1576 _, err := gitalypb.NewOperationServiceClient(praefectConn).UserCreateBranch(ctx, 1577 &gitalypb.UserCreateBranchRequest{ 1578 Repository: repoProto, 1579 }) 1580 testassert.GrpcEqualErr(t, tc.expectedErr, err) 1581 1582 for _, node := range gitalies { 1583 require.True(t, node.operationServer.called, "expected gitaly %q to have been called", node.mock.GetStorage()) 1584 } 1585 }) 1586 } 1587} 1588 1589type mockTransaction struct { 1590 nodeStates map[string]transactions.VoteResult 1591 subtransactions int 1592 didVote map[string]bool 1593} 1594 1595func (t mockTransaction) ID() uint64 { 1596 return 0 1597} 1598 1599func (t mockTransaction) CountSubtransactions() int { 1600 return t.subtransactions 1601} 1602 1603func (t mockTransaction) DidVote(node string) bool { 1604 return t.didVote[node] 1605} 1606 1607func (t mockTransaction) State() (map[string]transactions.VoteResult, error) { 1608 return t.nodeStates, nil 1609} 1610 1611func TestGetUpdatedAndOutdatedSecondaries(t *testing.T) { 1612 type node struct { 1613 name string 1614 state transactions.VoteResult 1615 err error 1616 } 1617 1618 ctx, cancel := testhelper.Context() 1619 defer cancel() 1620 1621 anyErr := errors.New("arbitrary error") 1622 1623 for _, tc := range []struct { 1624 desc string 1625 primary node 1626 secondaries []node 1627 replicas []string 1628 subtransactions int 1629 didVote map[string]bool 1630 expectedPrimaryDirtied bool 1631 expectedOutdated []string 1632 expectedUpdated []string 1633 expectedMetrics map[string]int 1634 }{ 1635 { 1636 desc: "single committed node", 1637 primary: node{ 1638 name: "primary", 1639 state: transactions.VoteCommitted, 1640 }, 1641 didVote: map[string]bool{ 1642 "primary": true, 1643 }, 1644 subtransactions: 1, 1645 expectedPrimaryDirtied: true, 1646 }, 1647 { 1648 desc: "single failed node", 1649 primary: node{ 1650 name: "primary", 1651 state: transactions.VoteFailed, 1652 }, 1653 subtransactions: 1, 1654 }, 1655 { 1656 desc: "single erred node", 1657 primary: node{ 1658 name: "primary", 1659 err: anyErr, 1660 }, 1661 }, 1662 { 1663 desc: "single node without subtransactions", 1664 primary: node{ 1665 name: "primary", 1666 }, 1667 subtransactions: 0, 1668 expectedPrimaryDirtied: true, 1669 }, 1670 { 1671 desc: "single successful node with replica", 1672 primary: node{ 1673 name: "primary", 1674 state: transactions.VoteCommitted, 1675 }, 1676 replicas: []string{"replica"}, 1677 didVote: map[string]bool{ 1678 "primary": true, 1679 }, 1680 subtransactions: 1, 1681 expectedPrimaryDirtied: true, 1682 expectedOutdated: []string{"replica"}, 1683 expectedMetrics: map[string]int{ 1684 "outdated": 1, 1685 }, 1686 }, 1687 { 1688 desc: "single failing node with replica is not considered modified", 1689 primary: node{ 1690 name: "primary", 1691 state: transactions.VoteFailed, 1692 }, 1693 subtransactions: 1, 1694 }, 1695 { 1696 desc: "single erred node with replica", 1697 primary: node{ 1698 name: "primary", 1699 state: transactions.VoteCommitted, 1700 err: anyErr, 1701 }, 1702 replicas: []string{"replica"}, 1703 didVote: map[string]bool{ 1704 "primary": true, 1705 }, 1706 subtransactions: 1, 1707 expectedPrimaryDirtied: true, 1708 expectedOutdated: []string{"replica"}, 1709 expectedMetrics: map[string]int{ 1710 "outdated": 1, 1711 }, 1712 }, 1713 { 1714 desc: "single erred node without commit with replica", 1715 primary: node{ 1716 name: "primary", 1717 state: transactions.VoteCommitted, 1718 err: anyErr, 1719 }, 1720 replicas: []string{"replica"}, 1721 subtransactions: 1, 1722 expectedPrimaryDirtied: false, 1723 }, 1724 { 1725 desc: "single node without transaction with replica", 1726 primary: node{ 1727 name: "primary", 1728 }, 1729 replicas: []string{"replica"}, 1730 subtransactions: 0, 1731 expectedPrimaryDirtied: true, 1732 expectedOutdated: []string{"replica"}, 1733 expectedMetrics: map[string]int{ 1734 "outdated": 1, 1735 }, 1736 }, 1737 { 1738 desc: "multiple committed nodes", 1739 primary: node{ 1740 name: "primary", 1741 state: transactions.VoteCommitted, 1742 }, 1743 secondaries: []node{ 1744 {name: "s1", state: transactions.VoteCommitted}, 1745 {name: "s2", state: transactions.VoteCommitted}, 1746 }, 1747 didVote: map[string]bool{ 1748 "primary": true, 1749 }, 1750 subtransactions: 1, 1751 expectedPrimaryDirtied: true, 1752 expectedUpdated: []string{"s1", "s2"}, 1753 expectedMetrics: map[string]int{ 1754 "updated": 2, 1755 }, 1756 }, 1757 { 1758 desc: "multiple committed nodes with primary err", 1759 primary: node{ 1760 name: "primary", 1761 state: transactions.VoteCommitted, 1762 err: anyErr, 1763 }, 1764 secondaries: []node{ 1765 {name: "s1", state: transactions.VoteCommitted}, 1766 {name: "s2", state: transactions.VoteCommitted}, 1767 }, 1768 didVote: map[string]bool{ 1769 "primary": true, 1770 }, 1771 subtransactions: 1, 1772 expectedPrimaryDirtied: true, 1773 expectedOutdated: []string{"s1", "s2"}, 1774 expectedMetrics: map[string]int{ 1775 "node-error-status": 2, 1776 }, 1777 }, 1778 { 1779 desc: "multiple committed nodes with same error as primary", 1780 primary: node{ 1781 name: "primary", 1782 state: transactions.VoteCommitted, 1783 err: anyErr, 1784 }, 1785 secondaries: []node{ 1786 {name: "s1", state: transactions.VoteCommitted, err: anyErr}, 1787 {name: "s2", state: transactions.VoteCommitted, err: anyErr}, 1788 }, 1789 didVote: map[string]bool{ 1790 "primary": true, 1791 }, 1792 subtransactions: 1, 1793 expectedPrimaryDirtied: true, 1794 expectedUpdated: []string{"s1", "s2"}, 1795 expectedMetrics: map[string]int{ 1796 "updated": 2, 1797 }, 1798 }, 1799 { 1800 desc: "multiple committed nodes with different error as primary", 1801 primary: node{ 1802 name: "primary", 1803 state: transactions.VoteCommitted, 1804 err: anyErr, 1805 }, 1806 secondaries: []node{ 1807 {name: "s1", state: transactions.VoteCommitted, err: errors.New("somethingsomething")}, 1808 {name: "s2", state: transactions.VoteCommitted, err: anyErr}, 1809 }, 1810 didVote: map[string]bool{ 1811 "primary": true, 1812 }, 1813 subtransactions: 1, 1814 expectedPrimaryDirtied: true, 1815 expectedUpdated: []string{"s2"}, 1816 expectedOutdated: []string{"s1"}, 1817 expectedMetrics: map[string]int{ 1818 "node-error-status": 1, 1819 "updated": 1, 1820 }, 1821 }, 1822 { 1823 desc: "multiple committed nodes with secondary err", 1824 primary: node{ 1825 name: "primary", 1826 state: transactions.VoteCommitted, 1827 }, 1828 secondaries: []node{ 1829 {name: "s1", state: transactions.VoteCommitted, err: anyErr}, 1830 {name: "s2", state: transactions.VoteCommitted}, 1831 }, 1832 didVote: map[string]bool{ 1833 "primary": true, 1834 }, 1835 subtransactions: 1, 1836 expectedPrimaryDirtied: true, 1837 expectedUpdated: []string{"s2"}, 1838 expectedOutdated: []string{"s1"}, 1839 expectedMetrics: map[string]int{ 1840 "node-error-status": 1, 1841 "updated": 1, 1842 }, 1843 }, 1844 { 1845 desc: "multiple committed nodes with primary and missing secondary err", 1846 primary: node{ 1847 name: "primary", 1848 state: transactions.VoteCommitted, 1849 err: anyErr, 1850 }, 1851 secondaries: []node{ 1852 {name: "s1", state: transactions.VoteCommitted, err: anyErr}, 1853 {name: "s2", state: transactions.VoteCommitted}, 1854 }, 1855 didVote: map[string]bool{ 1856 "primary": true, 1857 }, 1858 subtransactions: 1, 1859 expectedPrimaryDirtied: true, 1860 expectedUpdated: []string{"s1"}, 1861 expectedOutdated: []string{"s2"}, 1862 expectedMetrics: map[string]int{ 1863 "node-error-status": 1, 1864 "updated": 1, 1865 }, 1866 }, 1867 { 1868 desc: "partial success", 1869 primary: node{ 1870 name: "primary", 1871 state: transactions.VoteCommitted, 1872 }, 1873 secondaries: []node{ 1874 {name: "s1", state: transactions.VoteFailed}, 1875 {name: "s2", state: transactions.VoteCommitted}, 1876 }, 1877 didVote: map[string]bool{ 1878 "primary": true, 1879 }, 1880 subtransactions: 1, 1881 expectedPrimaryDirtied: true, 1882 expectedUpdated: []string{"s2"}, 1883 expectedOutdated: []string{"s1"}, 1884 expectedMetrics: map[string]int{ 1885 "node-not-committed": 1, 1886 "updated": 1, 1887 }, 1888 }, 1889 { 1890 desc: "failure with (impossible) secondary success", 1891 primary: node{ 1892 name: "primary", 1893 state: transactions.VoteFailed, 1894 }, 1895 secondaries: []node{ 1896 {name: "s1", state: transactions.VoteFailed}, 1897 {name: "s2", state: transactions.VoteCommitted}, 1898 }, 1899 didVote: map[string]bool{ 1900 "primary": true, 1901 }, 1902 subtransactions: 1, 1903 expectedPrimaryDirtied: true, 1904 expectedOutdated: []string{"s1", "s2"}, 1905 expectedMetrics: map[string]int{ 1906 "primary-not-committed": 2, 1907 }, 1908 }, 1909 { 1910 desc: "failure with no primary votes", 1911 primary: node{ 1912 name: "primary", 1913 state: transactions.VoteFailed, 1914 }, 1915 secondaries: []node{ 1916 {name: "s1", state: transactions.VoteFailed}, 1917 {name: "s2", state: transactions.VoteCommitted}, 1918 }, 1919 didVote: map[string]bool{ 1920 "s1": true, 1921 "s2": true, 1922 }, 1923 subtransactions: 1, 1924 }, 1925 { 1926 desc: "multiple nodes without subtransactions", 1927 primary: node{ 1928 name: "primary", 1929 state: transactions.VoteFailed, 1930 }, 1931 secondaries: []node{ 1932 {name: "s1", state: transactions.VoteFailed}, 1933 {name: "s2", state: transactions.VoteCommitted}, 1934 }, 1935 subtransactions: 0, 1936 expectedPrimaryDirtied: true, 1937 expectedOutdated: []string{"s1", "s2"}, 1938 expectedMetrics: map[string]int{ 1939 "no-votes": 2, 1940 }, 1941 }, 1942 { 1943 desc: "multiple nodes with replica and partial failures", 1944 primary: node{ 1945 name: "primary", 1946 state: transactions.VoteCommitted, 1947 }, 1948 secondaries: []node{ 1949 {name: "s1", state: transactions.VoteFailed}, 1950 {name: "s2", state: transactions.VoteCommitted}, 1951 }, 1952 replicas: []string{"r1", "r2"}, 1953 didVote: map[string]bool{ 1954 "primary": true, 1955 }, 1956 subtransactions: 1, 1957 expectedPrimaryDirtied: true, 1958 expectedOutdated: []string{"s1", "r1", "r2"}, 1959 expectedUpdated: []string{"s2"}, 1960 expectedMetrics: map[string]int{ 1961 "node-not-committed": 1, 1962 "outdated": 2, 1963 "updated": 1, 1964 }, 1965 }, 1966 { 1967 desc: "multiple nodes with replica and partial err", 1968 primary: node{ 1969 name: "primary", 1970 state: transactions.VoteCommitted, 1971 }, 1972 secondaries: []node{ 1973 {name: "s1", state: transactions.VoteFailed}, 1974 {name: "s2", state: transactions.VoteCommitted, err: anyErr}, 1975 }, 1976 replicas: []string{"r1", "r2"}, 1977 didVote: map[string]bool{ 1978 "primary": true, 1979 }, 1980 subtransactions: 1, 1981 expectedPrimaryDirtied: true, 1982 expectedOutdated: []string{"s1", "s2", "r1", "r2"}, 1983 expectedMetrics: map[string]int{ 1984 "node-error-status": 1, 1985 "node-not-committed": 1, 1986 "outdated": 2, 1987 }, 1988 }, 1989 } { 1990 t.Run(tc.desc, func(t *testing.T) { 1991 nodes := append(tc.secondaries, tc.primary) 1992 voters := make([]transactions.Voter, len(nodes)) 1993 1994 states := make(map[string]transactions.VoteResult) 1995 nodeErrors := &nodeErrors{ 1996 errByNode: make(map[string]error), 1997 } 1998 1999 for i, node := range nodes { 2000 voters[i] = transactions.Voter{ 2001 Name: node.name, 2002 Votes: 1, 2003 } 2004 states[node.name] = node.state 2005 nodeErrors.errByNode[node.name] = node.err 2006 } 2007 2008 transaction := mockTransaction{ 2009 nodeStates: states, 2010 subtransactions: tc.subtransactions, 2011 didVote: tc.didVote, 2012 } 2013 2014 route := RepositoryMutatorRoute{ 2015 Primary: RouterNode{ 2016 Storage: tc.primary.name, 2017 }, 2018 } 2019 for _, secondary := range tc.secondaries { 2020 route.Secondaries = append(route.Secondaries, RouterNode{ 2021 Storage: secondary.name, 2022 }) 2023 } 2024 route.ReplicationTargets = append(route.ReplicationTargets, tc.replicas...) 2025 2026 metric := prometheus.NewCounterVec(prometheus.CounterOpts{ 2027 Name: "stub", Help: "help", 2028 }, []string{"reason"}) 2029 2030 primaryDirtied, updated, outdated := getUpdatedAndOutdatedSecondaries(ctx, route, transaction, nodeErrors, metric) 2031 require.Equal(t, tc.expectedPrimaryDirtied, primaryDirtied) 2032 require.ElementsMatch(t, tc.expectedUpdated, updated) 2033 require.ElementsMatch(t, tc.expectedOutdated, outdated) 2034 2035 expectedMetrics := "# HELP stub help\n# TYPE stub counter\n" 2036 for metric, value := range tc.expectedMetrics { 2037 expectedMetrics += fmt.Sprintf("stub{reason=\"%s\"} %d\n", metric, value) 2038 } 2039 2040 require.NoError(t, testutil.CollectAndCompare(metric, strings.NewReader(expectedMetrics))) 2041 }) 2042 } 2043} 2044 2045func TestNewRequestFinalizer_contextIsDisjointedFromTheRPC(t *testing.T) { 2046 type ctxKey struct{} 2047 2048 parentDeadline := time.Now() 2049 ctx, cancel := context.WithDeadline(context.WithValue(context.Background(), ctxKey{}, "value"), parentDeadline) 2050 cancel() 2051 2052 requireSuppressedCancellation := func(t testing.TB, ctx context.Context) { 2053 deadline, ok := ctx.Deadline() 2054 require.True(t, ok) 2055 require.NotEqual(t, parentDeadline, deadline) 2056 require.Equal(t, ctx.Value(ctxKey{}), "value") 2057 require.Nil(t, ctx.Err()) 2058 select { 2059 case <-ctx.Done(): 2060 t.Fatal("context should not be canceled if the parent is canceled") 2061 default: 2062 require.NotNil(t, ctx.Done()) 2063 } 2064 } 2065 2066 err := errors.New("error") 2067 2068 for _, tc := range []struct { 2069 change datastore.ChangeType 2070 errMsg string 2071 }{ 2072 { 2073 change: datastore.UpdateRepo, 2074 errMsg: "increment generation: error", 2075 }, 2076 { 2077 change: datastore.RenameRepo, 2078 errMsg: "rename repository: error", 2079 }, 2080 { 2081 change: datastore.DeleteRepo, 2082 errMsg: "delete repository: error", 2083 }, 2084 { 2085 change: "replication jobs only", 2086 errMsg: "enqueue replication event: error", 2087 }, 2088 } { 2089 t.Run(string(tc.change), func(t *testing.T) { 2090 require.EqualError(t, 2091 NewCoordinator( 2092 &datastore.MockReplicationEventQueue{ 2093 EnqueueFunc: func(ctx context.Context, _ datastore.ReplicationEvent) (datastore.ReplicationEvent, error) { 2094 requireSuppressedCancellation(t, ctx) 2095 return datastore.ReplicationEvent{}, err 2096 }, 2097 }, 2098 datastore.MockRepositoryStore{ 2099 IncrementGenerationFunc: func(ctx context.Context, _, _, _ string, _ []string) error { 2100 requireSuppressedCancellation(t, ctx) 2101 return err 2102 }, 2103 RenameRepositoryFunc: func(ctx context.Context, _, _, _, _ string) error { 2104 requireSuppressedCancellation(t, ctx) 2105 return err 2106 }, 2107 DeleteRepositoryFunc: func(ctx context.Context, _, _ string, _ []string) error { 2108 requireSuppressedCancellation(t, ctx) 2109 return err 2110 }, 2111 CreateRepositoryFunc: func(ctx context.Context, _ int64, _, _, _ string, _, _ []string, _, _ bool) error { 2112 requireSuppressedCancellation(t, ctx) 2113 return err 2114 }, 2115 }, 2116 nil, 2117 nil, 2118 config.Config{}, 2119 nil, 2120 ).newRequestFinalizer( 2121 ctx, 2122 0, 2123 "virtual storage", 2124 &gitalypb.Repository{}, 2125 "primary", 2126 []string{}, 2127 []string{"secondary"}, 2128 tc.change, 2129 datastore.Params{"RelativePath": "relative-path"}, 2130 "rpc-name", 2131 )(), 2132 tc.errMsg, 2133 ) 2134 }) 2135 } 2136} 2137 2138func TestStreamParametersContext(t *testing.T) { 2139 // Because we're using NewFeatureFlag, they'll end up in the All array. 2140 enabledFF := featureflag.NewFeatureFlag("default-enabled", true) 2141 disabledFF := featureflag.NewFeatureFlag("default-disabled", false) 2142 2143 type expectedFlag struct { 2144 flag featureflag.FeatureFlag 2145 enabled bool 2146 } 2147 2148 expectedFlags := func(overrides ...expectedFlag) []expectedFlag { 2149 flagValues := map[featureflag.FeatureFlag]bool{} 2150 for _, flag := range featureflag.All { 2151 flagValues[flag] = flag.OnByDefault 2152 } 2153 for _, override := range overrides { 2154 flagValues[override.flag] = override.enabled 2155 } 2156 2157 expectedFlags := make([]expectedFlag, 0, len(flagValues)) 2158 for flag, value := range flagValues { 2159 expectedFlags = append(expectedFlags, expectedFlag{ 2160 flag: flag, enabled: value, 2161 }) 2162 } 2163 2164 return expectedFlags 2165 } 2166 2167 metadataForFlags := func(flags []expectedFlag) metadata.MD { 2168 pairs := []string{} 2169 for _, flag := range flags { 2170 pairs = append(pairs, flag.flag.MetadataKey(), strconv.FormatBool(flag.enabled)) 2171 } 2172 return metadata.Pairs(pairs...) 2173 } 2174 2175 for _, tc := range []struct { 2176 desc string 2177 setupContext func() context.Context 2178 expectedIncomingMD metadata.MD 2179 expectedOutgoingMD metadata.MD 2180 expectedFlags []expectedFlag 2181 }{ 2182 { 2183 desc: "no metadata", 2184 setupContext: func() context.Context { 2185 return context.Background() 2186 }, 2187 expectedFlags: expectedFlags(), 2188 expectedOutgoingMD: metadataForFlags(expectedFlags()), 2189 }, 2190 { 2191 desc: "with incoming metadata", 2192 setupContext: func() context.Context { 2193 ctx := context.Background() 2194 ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("key", "value")) 2195 return ctx 2196 }, 2197 expectedIncomingMD: metadata.Pairs("key", "value"), 2198 expectedOutgoingMD: metadata.Join( 2199 metadata.Pairs("key", "value"), 2200 metadataForFlags(expectedFlags()), 2201 ), 2202 expectedFlags: expectedFlags(), 2203 }, 2204 { 2205 desc: "with outgoing metadata", 2206 setupContext: func() context.Context { 2207 ctx := context.Background() 2208 ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("key", "value")) 2209 return ctx 2210 }, 2211 expectedOutgoingMD: metadata.Join( 2212 metadata.Pairs("key", "value"), 2213 metadataForFlags(expectedFlags()), 2214 ), 2215 expectedFlags: expectedFlags(), 2216 }, 2217 { 2218 desc: "with incoming and outgoing metadata", 2219 setupContext: func() context.Context { 2220 ctx := context.Background() 2221 ctx = metadata.NewIncomingContext(ctx, metadata.Pairs("incoming", "value")) 2222 ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("outgoing", "value")) 2223 return ctx 2224 }, 2225 // This behaviour is quite subtle: in the previous test case where we only 2226 // have outgoing metadata, we retain it. But in case we have both incoming 2227 // and outgoing we'd discard the outgoing metadata altogether. It is 2228 // debatable whether this is a bug or feature, so I'll just document this 2229 // weird edge case here for now. 2230 expectedIncomingMD: metadata.Pairs("incoming", "value"), 2231 expectedOutgoingMD: metadata.Join( 2232 metadata.Pairs("incoming", "value"), 2233 metadataForFlags(expectedFlags()), 2234 ), 2235 expectedFlags: expectedFlags(), 2236 }, 2237 { 2238 desc: "with flags set to their default values", 2239 setupContext: func() context.Context { 2240 ctx := context.Background() 2241 ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, enabledFF) 2242 ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, disabledFF) 2243 return ctx 2244 }, 2245 expectedIncomingMD: metadata.Pairs( 2246 enabledFF.MetadataKey(), "true", 2247 disabledFF.MetadataKey(), "false", 2248 ), 2249 expectedOutgoingMD: metadata.Join( 2250 metadataForFlags(expectedFlags()), 2251 metadata.Pairs( 2252 enabledFF.MetadataKey(), "true", 2253 disabledFF.MetadataKey(), "false", 2254 ), 2255 ), 2256 expectedFlags: expectedFlags(), 2257 }, 2258 { 2259 desc: "with flags set to their reverse default values", 2260 setupContext: func() context.Context { 2261 ctx := context.Background() 2262 ctx = featureflag.IncomingCtxWithDisabledFeatureFlag(ctx, enabledFF) 2263 ctx = featureflag.IncomingCtxWithFeatureFlag(ctx, disabledFF) 2264 return ctx 2265 }, 2266 expectedIncomingMD: metadata.Pairs( 2267 enabledFF.MetadataKey(), "false", 2268 disabledFF.MetadataKey(), "true", 2269 ), 2270 expectedOutgoingMD: metadata.Join( 2271 metadataForFlags(expectedFlags( 2272 expectedFlag{flag: enabledFF, enabled: false}, 2273 expectedFlag{flag: disabledFF, enabled: true}, 2274 )), 2275 metadata.Pairs( 2276 enabledFF.MetadataKey(), "false", 2277 disabledFF.MetadataKey(), "true", 2278 ), 2279 ), 2280 expectedFlags: expectedFlags( 2281 expectedFlag{flag: enabledFF, enabled: false}, 2282 expectedFlag{flag: disabledFF, enabled: true}, 2283 ), 2284 }, 2285 { 2286 desc: "mixed flags and metadata", 2287 setupContext: func() context.Context { 2288 ctx := context.Background() 2289 ctx = metadata.NewIncomingContext(ctx, metadata.Pairs( 2290 disabledFF.MetadataKey(), "true", 2291 "incoming", "value"), 2292 ) 2293 return ctx 2294 }, 2295 expectedIncomingMD: metadata.Pairs( 2296 disabledFF.MetadataKey(), "true", 2297 "incoming", "value", 2298 ), 2299 expectedOutgoingMD: metadata.Join( 2300 metadataForFlags(expectedFlags( 2301 expectedFlag{flag: disabledFF, enabled: true}, 2302 )), 2303 metadata.Pairs( 2304 disabledFF.MetadataKey(), "true", 2305 "incoming", "value", 2306 ), 2307 ), 2308 expectedFlags: expectedFlags( 2309 expectedFlag{flag: disabledFF, enabled: true}, 2310 ), 2311 }, 2312 } { 2313 t.Run(tc.desc, func(t *testing.T) { 2314 ctx := streamParametersContext(tc.setupContext()) 2315 2316 incomingMD, ok := metadata.FromIncomingContext(ctx) 2317 if tc.expectedIncomingMD == nil { 2318 require.False(t, ok) 2319 } else { 2320 require.True(t, ok) 2321 } 2322 require.Equal(t, tc.expectedIncomingMD, incomingMD) 2323 2324 outgoingMD, ok := metadata.FromOutgoingContext(ctx) 2325 if tc.expectedOutgoingMD == nil { 2326 require.False(t, ok) 2327 } else { 2328 require.True(t, ok) 2329 } 2330 require.Equal(t, tc.expectedOutgoingMD, outgoingMD) 2331 2332 incomingCtx := gitaly_metadata.OutgoingToIncoming(ctx) 2333 for _, expectedFlag := range tc.expectedFlags { 2334 require.Equal(t, expectedFlag.enabled, expectedFlag.flag.IsEnabled(incomingCtx)) 2335 } 2336 }) 2337 } 2338} 2339