1package nodes 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "net" 8 "testing" 9 "time" 10 11 "github.com/stretchr/testify/require" 12 "gitlab.com/gitlab-org/gitaly/v14/client" 13 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/config" 14 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" 15 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/protoregistry" 16 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" 17 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper/promtest" 18 "google.golang.org/grpc" 19 "google.golang.org/grpc/health" 20 "google.golang.org/grpc/health/grpc_health_v1" 21) 22 23type nodeAssertion struct { 24 Storage string 25 Address string 26} 27 28type shardAssertion struct { 29 Primary *nodeAssertion 30 Secondaries []nodeAssertion 31} 32 33func toNodeAssertion(n Node) *nodeAssertion { 34 if n == nil { 35 return nil 36 } 37 38 return &nodeAssertion{ 39 Storage: n.GetStorage(), 40 Address: n.GetAddress(), 41 } 42} 43 44func assertShard(t *testing.T, exp shardAssertion, act Shard) { 45 t.Helper() 46 47 actSecondaries := make([]nodeAssertion, 0, len(act.Secondaries)) 48 for _, n := range act.Secondaries { 49 actSecondaries = append(actSecondaries, *toNodeAssertion(n)) 50 } 51 52 require.Equal(t, exp, shardAssertion{ 53 Primary: toNodeAssertion(act.Primary), 54 Secondaries: actSecondaries, 55 }) 56} 57 58func TestNodeStatus(t *testing.T) { 59 socket := testhelper.GetTemporaryGitalySocketFileName(t) 60 healthSvr := testhelper.NewServerWithHealth(t, socket) 61 62 cc, err := grpc.Dial( 63 "unix://"+socket, 64 grpc.WithInsecure(), 65 ) 66 67 require.NoError(t, err) 68 69 mockHistogramVec := promtest.NewMockHistogramVec() 70 71 storageName := "default" 72 cs := newConnectionStatus(config.Node{Storage: storageName}, cc, testhelper.DiscardTestEntry(t), mockHistogramVec, nil) 73 74 ctx, cancel := testhelper.Context() 75 defer cancel() 76 77 var expectedLabels [][]string 78 for i := 0; i < healthcheckThreshold; i++ { 79 status, err := cs.CheckHealth(ctx) 80 81 require.NoError(t, err) 82 require.True(t, status) 83 expectedLabels = append(expectedLabels, []string{storageName}) 84 } 85 86 require.Equal(t, expectedLabels, mockHistogramVec.LabelsCalled()) 87 require.Len(t, mockHistogramVec.Observer().Observed(), healthcheckThreshold) 88 89 healthSvr.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 90 91 status, err := cs.CheckHealth(ctx) 92 require.NoError(t, err) 93 require.False(t, status) 94} 95 96func TestManagerFailoverDisabledElectionStrategySQL(t *testing.T) { 97 const virtualStorageName = "virtual-storage-0" 98 const primaryStorage = "praefect-internal-0" 99 socket0, socket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 100 virtualStorage := &config.VirtualStorage{ 101 Name: virtualStorageName, 102 Nodes: []*config.Node{ 103 { 104 Storage: primaryStorage, 105 Address: "unix://" + socket0, 106 }, 107 { 108 Storage: "praefect-internal-1", 109 Address: "unix://" + socket1, 110 }, 111 }, 112 } 113 114 healthSrv := testhelper.NewServerWithHealth(t, socket0) 115 testhelper.NewServerWithHealth(t, socket1) 116 117 conf := config.Config{ 118 Failover: config.Failover{Enabled: false, ElectionStrategy: config.ElectionStrategySQL}, 119 VirtualStorages: []*config.VirtualStorage{virtualStorage}, 120 } 121 nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) 122 require.NoError(t, err) 123 124 nm.Start(time.Millisecond, time.Millisecond) 125 126 ctx, cancel := testhelper.Context() 127 defer cancel() 128 129 shard, err := nm.GetShard(ctx, virtualStorageName) 130 require.NoError(t, err) 131 require.Equal(t, primaryStorage, shard.Primary.GetStorage()) 132 133 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 134 nm.checkShards() 135 136 _, err = nm.GetShard(ctx, virtualStorageName) 137 require.Error(t, err) 138 require.Equal(t, ErrPrimaryNotHealthy, err) 139} 140 141func TestDialWithUnhealthyNode(t *testing.T) { 142 primaryLn, err := net.Listen("unix", testhelper.GetTemporaryGitalySocketFileName(t)) 143 require.NoError(t, err) 144 145 primaryAddress := "unix://" + primaryLn.Addr().String() 146 const secondaryAddress = "unix://does-not-exist" 147 const storageName = "default" 148 149 conf := config.Config{ 150 VirtualStorages: []*config.VirtualStorage{ 151 { 152 Name: storageName, 153 Nodes: []*config.Node{ 154 { 155 Storage: "starts", 156 Address: primaryAddress, 157 }, 158 { 159 Storage: "never-starts", 160 Address: secondaryAddress, 161 }, 162 }, 163 }, 164 }, 165 } 166 167 testhelper.NewHealthServerWithListener(t, primaryLn) 168 169 mgr, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, nil, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) 170 require.NoError(t, err) 171 172 mgr.Start(1*time.Millisecond, 1*time.Millisecond) 173 174 ctx, cancel := testhelper.Context() 175 defer cancel() 176 177 shard, err := mgr.GetShard(ctx, storageName) 178 require.NoError(t, err) 179 assertShard(t, shardAssertion{ 180 Primary: &nodeAssertion{Storage: "starts", Address: primaryAddress}, 181 Secondaries: []nodeAssertion{{Storage: "never-starts", Address: secondaryAddress}}, 182 }, shard) 183} 184 185func TestNodeManager(t *testing.T) { 186 internalSocket0, internalSocket1 := testhelper.GetTemporaryGitalySocketFileName(t), testhelper.GetTemporaryGitalySocketFileName(t) 187 healthSrv0 := testhelper.NewServerWithHealth(t, internalSocket0) 188 healthSrv1 := testhelper.NewServerWithHealth(t, internalSocket1) 189 190 node1 := &config.Node{ 191 Storage: "praefect-internal-0", 192 Address: "unix://" + internalSocket0, 193 } 194 195 node2 := &config.Node{ 196 Storage: "praefect-internal-1", 197 Address: "unix://" + internalSocket1, 198 } 199 200 virtualStorages := []*config.VirtualStorage{ 201 { 202 Name: "virtual-storage-0", 203 Nodes: []*config.Node{node1, node2}, 204 }, 205 } 206 207 confWithFailover := config.Config{ 208 VirtualStorages: virtualStorages, 209 Failover: config.Failover{Enabled: true}, 210 } 211 confWithoutFailover := config.Config{ 212 VirtualStorages: virtualStorages, 213 Failover: config.Failover{Enabled: false}, 214 } 215 216 mockHistogram := promtest.NewMockHistogramVec() 217 nm, err := NewManager(testhelper.DiscardTestEntry(t), confWithFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil) 218 require.NoError(t, err) 219 220 nmWithoutFailover, err := NewManager(testhelper.DiscardTestEntry(t), confWithoutFailover, nil, nil, mockHistogram, protoregistry.GitalyProtoPreregistered, nil, nil) 221 require.NoError(t, err) 222 223 // monitoring period set to 1 hour as we execute health checks by hands in this test 224 nm.Start(0, time.Hour) 225 nmWithoutFailover.Start(0, time.Hour) 226 227 ctx, cancel := testhelper.Context() 228 defer cancel() 229 230 shardWithoutFailover, err := nmWithoutFailover.GetShard(ctx, "virtual-storage-0") 231 require.NoError(t, err) 232 233 shard, err := nm.GetShard(ctx, "virtual-storage-0") 234 require.NoError(t, err) 235 236 // shard without failover and shard with failover should be the same 237 initialState := shardAssertion{ 238 Primary: &nodeAssertion{node1.Storage, node1.Address}, 239 Secondaries: []nodeAssertion{{node2.Storage, node2.Address}}, 240 } 241 assertShard(t, initialState, shard) 242 assertShard(t, initialState, shardWithoutFailover) 243 244 const unhealthyCheckCount = 1 245 const healthyCheckCount = healthcheckThreshold 246 checkShards := func(count int) { 247 for i := 0; i < count; i++ { 248 nm.checkShards() 249 nmWithoutFailover.checkShards() 250 } 251 } 252 253 healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 254 checkShards(unhealthyCheckCount) 255 256 labelsCalled := mockHistogram.LabelsCalled() 257 for _, node := range virtualStorages[0].Nodes { 258 require.Contains(t, labelsCalled, []string{node.Storage}) 259 } 260 261 // since the failover is disabled the attempt to get a shard with unhealthy primary fails 262 _, err = nmWithoutFailover.GetShard(ctx, "virtual-storage-0") 263 require.Error(t, err) 264 require.Equal(t, ErrPrimaryNotHealthy, err) 265 266 // since the primary is unhealthy, we expect checkShards to demote primary to secondary, and promote the healthy 267 // secondary to primary 268 shard, err = nm.GetShard(ctx, "virtual-storage-0") 269 require.NoError(t, err) 270 // shard with failover should have promoted a secondary to primary and demoted the primary to a secondary 271 assertShard(t, shardAssertion{ 272 Primary: &nodeAssertion{node2.Storage, node2.Address}, 273 Secondaries: []nodeAssertion{{node1.Storage, node1.Address}}, 274 }, shard) 275 276 // failing back to the original primary 277 healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 278 healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 279 checkShards(healthyCheckCount) 280 281 shard, err = nm.GetShard(ctx, "virtual-storage-0") 282 require.NoError(t, err) 283 284 assertShard(t, shardAssertion{ 285 Primary: &nodeAssertion{node1.Storage, node1.Address}, 286 Secondaries: []nodeAssertion{{node2.Storage, node2.Address}}, 287 }, shard) 288 289 primary, err := nm.GetPrimary(ctx, "virtual-storage-0", "") 290 require.NoError(t, err) 291 require.Equal(t, shard.Primary.GetStorage(), primary) 292 293 healthSrv0.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 294 healthSrv1.SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 295 checkShards(unhealthyCheckCount) 296 297 _, err = nm.GetShard(ctx, "virtual-storage-0") 298 require.Error(t, err, "should return error since no nodes are healthy") 299 300 _, err = nm.GetPrimary(ctx, "virtual-storage-0", "") 301 require.Equal(t, ErrPrimaryNotHealthy, err) 302} 303 304func TestMgr_GetSyncedNode(t *testing.T) { 305 const count = 3 306 const virtualStorage = "virtual-storage-0" 307 const repoPath = "path/1" 308 309 var healthSrvs [count]*health.Server 310 var nodes [count]*config.Node 311 for i := 0; i < count; i++ { 312 socket := testhelper.GetTemporaryGitalySocketFileName(t) 313 healthSrvs[i] = testhelper.NewServerWithHealth(t, socket) 314 nodes[i] = &config.Node{Storage: fmt.Sprintf("gitaly-%d", i), Address: "unix://" + socket} 315 } 316 317 conf := config.Config{ 318 VirtualStorages: []*config.VirtualStorage{{Name: virtualStorage, Nodes: nodes[:]}}, 319 } 320 321 ctx, cancel := testhelper.Context() 322 defer cancel() 323 324 var consistentSecondariesErr error 325 consistentStorages := map[string]struct{}{} 326 327 verify := func(failover bool, scenario func(t *testing.T, nm Manager, rs datastore.RepositoryStore)) func(*testing.T) { 328 conf.Failover.Enabled = failover 329 rs := datastore.MockRepositoryStore{ 330 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 331 return consistentStorages, consistentSecondariesErr 332 }, 333 } 334 335 nm, err := NewManager(testhelper.DiscardTestEntry(t), conf, nil, rs, promtest.NewMockHistogramVec(), protoregistry.GitalyProtoPreregistered, nil, nil) 336 require.NoError(t, err) 337 338 for i := range healthSrvs { 339 healthSrvs[i].SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 340 } 341 nm.Start(0, time.Hour) 342 343 return func(t *testing.T) { scenario(t, nm, rs) } 344 } 345 346 t.Run("unknown virtual storage", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 347 consistentSecondariesErr = ErrVirtualStorageNotExist 348 _, err := nm.GetSyncedNode(ctx, "virtual-storage-unknown", "stub") 349 require.True(t, errors.Is(err, ErrVirtualStorageNotExist)) 350 })) 351 352 t.Run("state is undefined", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 353 consistentSecondariesErr = nil 354 node, err := nm.GetSyncedNode(ctx, virtualStorage, "no/matter") 355 require.NoError(t, err) 356 require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "") 357 })) 358 359 t.Run("no up to date storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 360 consistentStorages = nil 361 362 node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath) 363 require.NoError(t, err) 364 require.Equal(t, "gitaly-0", node.GetStorage()) 365 })) 366 367 t.Run("multiple storages up to date", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 368 consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}, "gitaly-2": {}} 369 370 chosen := map[Node]struct{}{} 371 for i := 0; i < 1000 && len(chosen) < 2; i++ { 372 node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath) 373 require.NoError(t, err) 374 chosen[node] = struct{}{} 375 } 376 if len(chosen) < 2 { 377 require.FailNow(t, "no distribution in too many attempts") 378 } 379 })) 380 381 t.Run("single secondary storage up to date but unhealthy", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 382 consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} 383 384 healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 385 386 shard, err := nm.GetShard(ctx, virtualStorage) 387 require.NoError(t, err) 388 389 gitaly1, err := shard.GetNode("gitaly-1") 390 require.NoError(t, err) 391 392 ok, err := gitaly1.CheckHealth(ctx) 393 require.NoError(t, err) 394 require.False(t, ok) 395 396 node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath) 397 require.NoError(t, err) 398 require.Equal(t, conf.VirtualStorages[0].Nodes[0].Address, node.GetAddress(), "secondary shouldn't be chosen as it is unhealthy") 399 })) 400 401 t.Run("no healthy storages", verify(true, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 402 consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} 403 404 healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 405 healthSrvs[1].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 406 407 shard, err := nm.GetShard(ctx, virtualStorage) 408 require.NoError(t, err) 409 410 gitaly0, err := shard.GetNode("gitaly-0") 411 require.NoError(t, err) 412 413 gitaly0OK, err := gitaly0.CheckHealth(ctx) 414 require.NoError(t, err) 415 require.False(t, gitaly0OK) 416 417 gitaly1, err := shard.GetNode("gitaly-1") 418 require.NoError(t, err) 419 420 gitaly1OK, err := gitaly1.CheckHealth(ctx) 421 require.NoError(t, err) 422 require.False(t, gitaly1OK) 423 424 _, err = nm.GetSyncedNode(ctx, virtualStorage, repoPath) 425 require.True(t, errors.Is(err, ErrPrimaryNotHealthy)) 426 })) 427 428 t.Run("disabled failover doesn't disable health state", verify(false, func(t *testing.T, nm Manager, rs datastore.RepositoryStore) { 429 consistentStorages = map[string]struct{}{"gitaly-0": {}, "gitaly-1": {}} 430 431 shard, err := nm.GetShard(ctx, virtualStorage) 432 require.NoError(t, err) 433 434 gitaly0, err := shard.GetNode("gitaly-0") 435 require.NoError(t, err) 436 437 require.Equal(t, shard.Primary, gitaly0) 438 439 gitaly0OK, err := gitaly0.CheckHealth(ctx) 440 require.NoError(t, err) 441 require.True(t, gitaly0OK) 442 443 gitaly1, err := shard.GetNode("gitaly-1") 444 require.NoError(t, err) 445 446 gitaly1OK, err := gitaly1.CheckHealth(ctx) 447 require.NoError(t, err) 448 require.True(t, gitaly1OK) 449 450 healthSrvs[0].SetServingStatus("", grpc_health_v1.HealthCheckResponse_UNKNOWN) 451 gitaly0OK, err = gitaly0.CheckHealth(ctx) 452 require.NoError(t, err) 453 require.False(t, gitaly0OK, "primary should be unhealthy") 454 455 node, err := nm.GetSyncedNode(ctx, virtualStorage, repoPath) 456 require.NoError(t, err) 457 require.Equal(t, conf.VirtualStorages[0].Nodes[1].Address, node.GetAddress(), "primary shouldn't be chosen as it is unhealthy") 458 })) 459} 460 461func TestNodeStatus_IsHealthy(t *testing.T) { 462 checkNTimes := func(ctx context.Context, t *testing.T, ns *nodeStatus, n int) { 463 for i := 0; i < n; i++ { 464 _, err := ns.CheckHealth(ctx) 465 require.NoError(t, err) 466 } 467 } 468 469 socket := testhelper.GetTemporaryGitalySocketFileName(t) 470 address := "unix://" + socket 471 472 healthSrv := testhelper.NewServerWithHealth(t, socket) 473 474 clientConn, err := client.Dial(address, nil) 475 require.NoError(t, err) 476 defer func() { require.NoError(t, clientConn.Close()) }() 477 478 node := config.Node{Storage: "gitaly-0", Address: address} 479 480 ctx, cancel := testhelper.Context() 481 defer cancel() 482 483 logger := testhelper.DiscardTestLogger(t) 484 latencyHistMock := &promtest.MockHistogramVec{} 485 486 t.Run("unchecked node is unhealthy", func(t *testing.T) { 487 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 488 require.False(t, ns.IsHealthy()) 489 }) 490 491 t.Run("not enough check to consider it healthy", func(t *testing.T) { 492 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 493 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 494 checkNTimes(ctx, t, ns, healthcheckThreshold-1) 495 496 require.False(t, ns.IsHealthy()) 497 }) 498 499 t.Run("healthy", func(t *testing.T) { 500 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 501 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 502 checkNTimes(ctx, t, ns, healthcheckThreshold) 503 504 require.True(t, ns.IsHealthy()) 505 }) 506 507 t.Run("healthy turns into unhealthy after single failed check", func(t *testing.T) { 508 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 509 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 510 checkNTimes(ctx, t, ns, healthcheckThreshold) 511 512 require.True(t, ns.IsHealthy(), "node must be turned into healthy state") 513 514 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 515 checkNTimes(ctx, t, ns, 1) 516 517 require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state") 518 }) 519 520 t.Run("unhealthy turns into healthy after pre-define threshold of checks", func(t *testing.T) { 521 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 522 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 523 checkNTimes(ctx, t, ns, healthcheckThreshold) 524 525 require.True(t, ns.IsHealthy(), "node must be turned into healthy state") 526 527 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_NOT_SERVING) 528 checkNTimes(ctx, t, ns, 1) 529 530 require.False(t, ns.IsHealthy(), "node must be turned into unhealthy state") 531 532 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 533 for i := 1; i < healthcheckThreshold; i++ { 534 checkNTimes(ctx, t, ns, 1) 535 require.False(t, ns.IsHealthy(), "node must be unhealthy until defined threshold of checks complete positively") 536 } 537 checkNTimes(ctx, t, ns, 1) // the last check that must turn it into healthy state 538 539 require.True(t, ns.IsHealthy(), "node should be healthy again") 540 }) 541 542 t.Run("concurrent access has no races", func(t *testing.T) { 543 ns := newConnectionStatus(node, clientConn, logger, latencyHistMock, nil) 544 healthSrv.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) 545 546 t.Run("continuously does health checks - 1", func(t *testing.T) { 547 t.Parallel() 548 checkNTimes(ctx, t, ns, healthcheckThreshold) 549 }) 550 551 t.Run("continuously checks health - 1", func(t *testing.T) { 552 t.Parallel() 553 ns.IsHealthy() 554 }) 555 556 t.Run("continuously does health checks - 2", func(t *testing.T) { 557 t.Parallel() 558 checkNTimes(ctx, t, ns, healthcheckThreshold) 559 }) 560 561 t.Run("continuously checks health - 2", func(t *testing.T) { 562 t.Parallel() 563 ns.IsHealthy() 564 }) 565 }) 566} 567