1package praefect 2 3import ( 4 "context" 5 "sort" 6 "testing" 7 8 "github.com/stretchr/testify/require" 9 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/datastore" 10 "gitlab.com/gitlab-org/gitaly/v14/internal/praefect/nodes" 11 "gitlab.com/gitlab-org/gitaly/v14/internal/testhelper" 12 "google.golang.org/grpc" 13 "google.golang.org/grpc/metadata" 14) 15 16// StaticRepositoryAssignments is a static assignment of storages for each individual repository. 17type StaticRepositoryAssignments map[string]map[string][]string 18 19func (st StaticRepositoryAssignments) GetHostAssignments(ctx context.Context, virtualStorage, relativePath string) ([]string, error) { 20 vs, ok := st[virtualStorage] 21 if !ok { 22 return nil, nodes.ErrVirtualStorageNotExist 23 } 24 25 storages, ok := vs[relativePath] 26 if !ok { 27 return nil, errRepositoryNotFound 28 } 29 30 return storages, nil 31} 32 33// PrimaryGetter is an adapter to turn conforming functions in to a PrimaryGetter. 34type PrimaryGetterFunc func(ctx context.Context, virtualStorage, relativePath string) (string, error) 35 36func (fn PrimaryGetterFunc) GetPrimary(ctx context.Context, virtualStorage, relativePath string) (string, error) { 37 return fn(ctx, virtualStorage, relativePath) 38} 39 40func TestPerRepositoryRouter_RouteStorageAccessor(t *testing.T) { 41 ctx, cancel := testhelper.Context() 42 defer cancel() 43 44 for _, tc := range []struct { 45 desc string 46 virtualStorage string 47 numCandidates int 48 pickCandidate int 49 error error 50 node string 51 }{ 52 { 53 desc: "unknown virtual storage", 54 virtualStorage: "unknown", 55 error: nodes.ErrVirtualStorageNotExist, 56 }, 57 { 58 desc: "picks randomly first candidate", 59 virtualStorage: "virtual-storage-1", 60 numCandidates: 2, 61 pickCandidate: 0, 62 node: "valid-choice-1", 63 }, 64 { 65 desc: "picks randomly second candidate", 66 virtualStorage: "virtual-storage-1", 67 numCandidates: 2, 68 pickCandidate: 1, 69 node: "valid-choice-2", 70 }, 71 } { 72 t.Run(tc.desc, func(t *testing.T) { 73 conns := Connections{ 74 "virtual-storage-1": { 75 "valid-choice-1": &grpc.ClientConn{}, 76 "valid-choice-2": &grpc.ClientConn{}, 77 "unhealthy": &grpc.ClientConn{}, 78 }, 79 } 80 81 router := NewPerRepositoryRouter( 82 conns, 83 nil, 84 StaticHealthChecker{ 85 "virtual-storage-1": { 86 "valid-choice-1", 87 "valid-choice-2", 88 }, 89 }, 90 mockRandom{ 91 intnFunc: func(n int) int { 92 require.Equal(t, tc.numCandidates, n) 93 return tc.pickCandidate 94 }, 95 }, 96 nil, 97 nil, 98 nil, 99 ) 100 101 node, err := router.RouteStorageAccessor(ctx, tc.virtualStorage) 102 require.Equal(t, tc.error, err) 103 require.Equal(t, RouterNode{ 104 Storage: tc.node, 105 Connection: conns["virtual-storage-1"][tc.node], 106 }, node) 107 }) 108 } 109} 110 111func TestPerRepositoryRouter_RouteRepositoryAccessor(t *testing.T) { 112 for _, tc := range []struct { 113 desc string 114 virtualStorage string 115 healthyNodes StaticHealthChecker 116 metadata map[string]string 117 forcePrimary bool 118 numCandidates int 119 pickCandidate int 120 error error 121 node string 122 }{ 123 { 124 desc: "unknown virtual storage", 125 virtualStorage: "unknown", 126 error: nodes.ErrVirtualStorageNotExist, 127 }, 128 { 129 desc: "no healthy nodes", 130 virtualStorage: "virtual-storage-1", 131 healthyNodes: map[string][]string{}, 132 error: ErrNoHealthyNodes, 133 }, 134 { 135 desc: "primary picked randomly", 136 virtualStorage: "virtual-storage-1", 137 healthyNodes: map[string][]string{ 138 "virtual-storage-1": {"primary", "consistent-secondary"}, 139 }, 140 numCandidates: 2, 141 pickCandidate: 0, 142 node: "primary", 143 }, 144 { 145 desc: "secondary picked randomly", 146 virtualStorage: "virtual-storage-1", 147 healthyNodes: map[string][]string{ 148 "virtual-storage-1": {"primary", "consistent-secondary"}, 149 }, 150 numCandidates: 2, 151 pickCandidate: 1, 152 node: "consistent-secondary", 153 }, 154 { 155 desc: "secondary picked when primary is unhealthy", 156 virtualStorage: "virtual-storage-1", 157 healthyNodes: map[string][]string{ 158 "virtual-storage-1": {"consistent-secondary"}, 159 }, 160 numCandidates: 1, 161 node: "consistent-secondary", 162 }, 163 { 164 desc: "no suitable nodes", 165 virtualStorage: "virtual-storage-1", 166 healthyNodes: map[string][]string{ 167 "virtual-storage-1": {"inconistent-secondary"}, 168 }, 169 error: ErrNoSuitableNode, 170 }, 171 { 172 desc: "primary force-picked", 173 virtualStorage: "virtual-storage-1", 174 healthyNodes: map[string][]string{ 175 "virtual-storage-1": {"primary", "consistent-secondary"}, 176 }, 177 forcePrimary: true, 178 node: "primary", 179 }, 180 { 181 desc: "secondary not picked if force-picking unhealthy primary", 182 virtualStorage: "virtual-storage-1", 183 healthyNodes: map[string][]string{ 184 "virtual-storage-1": {"consistent-secondary"}, 185 }, 186 forcePrimary: true, 187 error: nodes.ErrPrimaryNotHealthy, 188 }, 189 } { 190 t.Run(tc.desc, func(t *testing.T) { 191 ctx, cancel := testhelper.Context() 192 defer cancel() 193 194 ctx = testhelper.MergeIncomingMetadata(ctx, metadata.New(tc.metadata)) 195 196 conns := Connections{ 197 "virtual-storage-1": { 198 "primary": &grpc.ClientConn{}, 199 "consistent-secondary": &grpc.ClientConn{}, 200 "inconistent-secondary": &grpc.ClientConn{}, 201 "unhealthy-secondary": &grpc.ClientConn{}, 202 }, 203 } 204 205 router := NewPerRepositoryRouter( 206 conns, 207 PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { 208 t.Helper() 209 require.Equal(t, tc.virtualStorage, virtualStorage) 210 require.Equal(t, "repository", relativePath) 211 return "primary", nil 212 }), 213 tc.healthyNodes, 214 mockRandom{ 215 intnFunc: func(n int) int { 216 require.Equal(t, tc.numCandidates, n) 217 return tc.pickCandidate 218 }, 219 }, 220 datastore.MockRepositoryStore{ 221 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 222 t.Helper() 223 require.Equal(t, tc.virtualStorage, virtualStorage) 224 require.Equal(t, "repository", relativePath) 225 return map[string]struct{}{"primary": {}, "consistent-secondary": {}}, nil 226 }, 227 }, 228 nil, 229 nil, 230 ) 231 232 node, err := router.RouteRepositoryAccessor(ctx, tc.virtualStorage, "repository", tc.forcePrimary) 233 require.Equal(t, tc.error, err) 234 if tc.node != "" { 235 require.Equal(t, RouterNode{ 236 Storage: tc.node, 237 Connection: conns[tc.virtualStorage][tc.node], 238 }, node) 239 } else { 240 require.Empty(t, node) 241 } 242 }) 243 } 244} 245 246func TestPerRepositoryRouter_RouteRepositoryMutator(t *testing.T) { 247 configuredNodes := map[string][]string{ 248 "virtual-storage-1": {"primary", "secondary-1", "secondary-2"}, 249 } 250 251 for _, tc := range []struct { 252 desc string 253 virtualStorage string 254 healthyNodes StaticHealthChecker 255 consistentStorages []string 256 secondaries []string 257 replicationTargets []string 258 error error 259 assignedNodes AssignmentGetter 260 }{ 261 { 262 desc: "unknown virtual storage", 263 virtualStorage: "unknown", 264 error: nodes.ErrVirtualStorageNotExist, 265 }, 266 { 267 desc: "primary outdated", 268 virtualStorage: "virtual-storage-1", 269 healthyNodes: StaticHealthChecker(configuredNodes), 270 assignedNodes: StaticStorageAssignments(configuredNodes), 271 consistentStorages: []string{"secondary-1", "secondary-2"}, 272 error: ErrRepositoryReadOnly, 273 }, 274 { 275 desc: "primary unhealthy", 276 virtualStorage: "virtual-storage-1", 277 healthyNodes: StaticHealthChecker{"virtual-storage-1": {"secondary-1", "secondary-2"}}, 278 assignedNodes: StaticStorageAssignments(configuredNodes), 279 consistentStorages: []string{"primary", "secondary-1", "secondary-2"}, 280 error: nodes.ErrPrimaryNotHealthy, 281 }, 282 { 283 desc: "all secondaries consistent", 284 virtualStorage: "virtual-storage-1", 285 healthyNodes: StaticHealthChecker(configuredNodes), 286 assignedNodes: StaticStorageAssignments(configuredNodes), 287 consistentStorages: []string{"primary", "secondary-1", "secondary-2"}, 288 secondaries: []string{"secondary-1", "secondary-2"}, 289 }, 290 { 291 desc: "inconsistent secondary", 292 virtualStorage: "virtual-storage-1", 293 healthyNodes: StaticHealthChecker(configuredNodes), 294 assignedNodes: StaticStorageAssignments(configuredNodes), 295 consistentStorages: []string{"primary", "secondary-2"}, 296 secondaries: []string{"secondary-2"}, 297 replicationTargets: []string{"secondary-1"}, 298 }, 299 { 300 desc: "unhealthy secondaries", 301 virtualStorage: "virtual-storage-1", 302 healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}}, 303 assignedNodes: StaticStorageAssignments(configuredNodes), 304 consistentStorages: []string{"primary", "secondary-1"}, 305 replicationTargets: []string{"secondary-1", "secondary-2"}, 306 }, 307 { 308 desc: "up to date unassigned nodes are ignored", 309 virtualStorage: "virtual-storage-1", 310 healthyNodes: StaticHealthChecker(configuredNodes), 311 assignedNodes: StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"primary", "secondary-1"}}}, 312 consistentStorages: []string{"primary", "secondary-1", "secondary-2"}, 313 secondaries: []string{"secondary-1"}, 314 }, 315 { 316 desc: "outdated unassigned nodes are ignored", 317 virtualStorage: "virtual-storage-1", 318 healthyNodes: StaticHealthChecker(configuredNodes), 319 assignedNodes: StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"primary", "secondary-1"}}}, 320 consistentStorages: []string{"primary", "secondary-1"}, 321 secondaries: []string{"secondary-1"}, 322 }, 323 { 324 desc: "primary is unassigned", 325 virtualStorage: "virtual-storage-1", 326 healthyNodes: StaticHealthChecker(configuredNodes), 327 assignedNodes: StaticRepositoryAssignments{"virtual-storage-1": {"repository": {"secondary-1", "secondary-2"}}}, 328 consistentStorages: []string{"primary", "secondary-1", "secondary-2"}, 329 secondaries: []string{"secondary-1"}, 330 replicationTargets: []string{"secondary-2"}, 331 error: errPrimaryUnassigned, 332 }, 333 } { 334 t.Run(tc.desc, func(t *testing.T) { 335 ctx, cancel := testhelper.Context() 336 defer cancel() 337 338 conns := Connections{ 339 "virtual-storage-1": { 340 "primary": &grpc.ClientConn{}, 341 "secondary-1": &grpc.ClientConn{}, 342 "secondary-2": &grpc.ClientConn{}, 343 }, 344 } 345 346 router := NewPerRepositoryRouter( 347 conns, 348 PrimaryGetterFunc(func(ctx context.Context, virtualStorage, relativePath string) (string, error) { 349 t.Helper() 350 require.Equal(t, tc.virtualStorage, virtualStorage) 351 require.Equal(t, "repository", relativePath) 352 return "primary", nil 353 }), 354 tc.healthyNodes, 355 nil, 356 datastore.MockRepositoryStore{ 357 GetConsistentStoragesFunc: func(ctx context.Context, virtualStorage, relativePath string) (map[string]struct{}, error) { 358 t.Helper() 359 require.Equal(t, tc.virtualStorage, virtualStorage) 360 require.Equal(t, "repository", relativePath) 361 consistentStorages := map[string]struct{}{} 362 for _, storage := range tc.consistentStorages { 363 consistentStorages[storage] = struct{}{} 364 } 365 366 return consistentStorages, nil 367 }, 368 }, 369 tc.assignedNodes, 370 nil, 371 ) 372 373 route, err := router.RouteRepositoryMutator(ctx, tc.virtualStorage, "repository") 374 require.Equal(t, tc.error, err) 375 if err == nil { 376 var secondaries []RouterNode 377 for _, secondary := range tc.secondaries { 378 secondaries = append(secondaries, RouterNode{ 379 Storage: secondary, 380 Connection: conns[tc.virtualStorage][secondary], 381 }) 382 } 383 384 require.Equal(t, RepositoryMutatorRoute{ 385 Primary: RouterNode{ 386 Storage: "primary", 387 Connection: conns[tc.virtualStorage]["primary"], 388 }, 389 Secondaries: secondaries, 390 ReplicationTargets: tc.replicationTargets, 391 }, route) 392 } 393 }) 394 } 395} 396 397func TestPerRepositoryRouter_RouteRepositoryCreation(t *testing.T) { 398 configuredNodes := map[string][]string{ 399 "virtual-storage-1": {"primary", "secondary-1", "secondary-2"}, 400 } 401 402 type matcher func(*testing.T, RepositoryMutatorRoute) 403 404 requireOneOf := func(expected ...RepositoryMutatorRoute) matcher { 405 return func(t *testing.T, actual RepositoryMutatorRoute) { 406 sort.Slice(actual.Secondaries, func(i, j int) bool { 407 return actual.Secondaries[i].Storage < actual.Secondaries[j].Storage 408 }) 409 sort.Strings(actual.ReplicationTargets) 410 require.Contains(t, expected, actual) 411 } 412 } 413 414 primaryConn := &grpc.ClientConn{} 415 secondary1Conn := &grpc.ClientConn{} 416 secondary2Conn := &grpc.ClientConn{} 417 418 for _, tc := range []struct { 419 desc string 420 virtualStorage string 421 healthyNodes StaticHealthChecker 422 replicationFactor int 423 primaryCandidates int 424 primaryPick int 425 secondaryCandidates int 426 matchRoute matcher 427 error error 428 }{ 429 { 430 desc: "no healthy nodes", 431 virtualStorage: "virtual-storage-1", 432 healthyNodes: StaticHealthChecker{}, 433 error: ErrNoHealthyNodes, 434 }, 435 { 436 desc: "invalid virtual storage", 437 virtualStorage: "invalid", 438 error: nodes.ErrVirtualStorageNotExist, 439 }, 440 { 441 desc: "no healthy secondaries", 442 virtualStorage: "virtual-storage-1", 443 healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary"}}, 444 primaryCandidates: 1, 445 primaryPick: 0, 446 matchRoute: requireOneOf( 447 RepositoryMutatorRoute{ 448 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 449 ReplicationTargets: []string{"secondary-1", "secondary-2"}, 450 }, 451 ), 452 }, 453 { 454 desc: "success with all secondaries healthy", 455 virtualStorage: "virtual-storage-1", 456 healthyNodes: StaticHealthChecker(configuredNodes), 457 primaryCandidates: 3, 458 primaryPick: 0, 459 matchRoute: requireOneOf( 460 RepositoryMutatorRoute{ 461 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 462 Secondaries: []RouterNode{ 463 {Storage: "secondary-1", Connection: secondary1Conn}, 464 {Storage: "secondary-2", Connection: secondary2Conn}, 465 }, 466 }, 467 ), 468 }, 469 { 470 desc: "success with one secondary unhealthy", 471 virtualStorage: "virtual-storage-1", 472 healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary", "secondary-1"}}, 473 primaryCandidates: 2, 474 primaryPick: 0, 475 matchRoute: requireOneOf( 476 RepositoryMutatorRoute{ 477 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 478 Secondaries: []RouterNode{ 479 {Storage: "secondary-1", Connection: secondary1Conn}, 480 }, 481 ReplicationTargets: []string{"secondary-2"}, 482 }, 483 ), 484 }, 485 { 486 desc: "replication factor of one configured", 487 virtualStorage: "virtual-storage-1", 488 healthyNodes: StaticHealthChecker(configuredNodes), 489 replicationFactor: 1, 490 primaryCandidates: 3, 491 primaryPick: 0, 492 matchRoute: requireOneOf( 493 RepositoryMutatorRoute{ 494 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 495 }, 496 ), 497 }, 498 { 499 desc: "replication factor of two configured", 500 virtualStorage: "virtual-storage-1", 501 healthyNodes: StaticHealthChecker(configuredNodes), 502 replicationFactor: 2, 503 primaryCandidates: 3, 504 primaryPick: 0, 505 secondaryCandidates: 2, 506 matchRoute: requireOneOf( 507 RepositoryMutatorRoute{ 508 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 509 Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, 510 }, 511 RepositoryMutatorRoute{ 512 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 513 Secondaries: []RouterNode{{Storage: "secondary-2", Connection: secondary1Conn}}, 514 }, 515 ), 516 }, 517 { 518 desc: "replication factor of three configured with unhealthy secondary", 519 virtualStorage: "virtual-storage-1", 520 healthyNodes: StaticHealthChecker{"virtual-storage-1": {"primary", "secondary-1"}}, 521 replicationFactor: 3, 522 primaryCandidates: 2, 523 primaryPick: 0, 524 secondaryCandidates: 2, 525 matchRoute: requireOneOf( 526 RepositoryMutatorRoute{ 527 Primary: RouterNode{Storage: "primary", Connection: primaryConn}, 528 Secondaries: []RouterNode{{Storage: "secondary-1", Connection: secondary1Conn}}, 529 ReplicationTargets: []string{"secondary-2"}, 530 }, 531 ), 532 }, 533 } { 534 t.Run(tc.desc, func(t *testing.T) { 535 ctx, cancel := testhelper.Context() 536 defer cancel() 537 538 route, err := NewPerRepositoryRouter( 539 Connections{ 540 "virtual-storage-1": { 541 "primary": primaryConn, 542 "secondary-1": secondary1Conn, 543 "secondary-2": secondary2Conn, 544 }, 545 }, 546 nil, 547 tc.healthyNodes, 548 mockRandom{ 549 intnFunc: func(n int) int { 550 require.Equal(t, tc.primaryCandidates, n) 551 return tc.primaryPick 552 }, 553 shuffleFunc: func(n int, swap func(i, j int)) { 554 require.Equal(t, tc.secondaryCandidates, n) 555 }, 556 }, 557 nil, 558 nil, 559 map[string]int{"virtual-storage-1": tc.replicationFactor}, 560 ).RouteRepositoryCreation(ctx, tc.virtualStorage) 561 if tc.error != nil { 562 require.Equal(t, tc.error, err) 563 return 564 } 565 566 require.NoError(t, err) 567 tc.matchRoute(t, route) 568 }) 569 } 570} 571