1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "bytes" 18 "context" 19 "math/rand" 20 "testing" 21 "time" 22 23 "cloud.google.com/go/pubsublite/internal/test" 24 "google.golang.org/grpc/codes" 25 "google.golang.org/grpc/status" 26 27 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 28) 29 30func testPublishSettings() PublishSettings { 31 settings := DefaultPublishSettings 32 // Send 1 message at a time to make tests deterministic. 33 settings.CountThreshold = 1 34 // Send messages with minimal delay to speed up tests. 35 settings.DelayThreshold = time.Millisecond 36 settings.Timeout = 5 * time.Second 37 // Disable topic partition count background polling. 38 settings.ConfigPollPeriod = 0 39 return settings 40} 41 42// testPartitionPublisher wraps a singlePartitionPublisher for ease of testing. 
43type testPartitionPublisher struct { 44 pub *singlePartitionPublisher 45 serviceTestProxy 46} 47 48func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher { 49 ctx := context.Background() 50 pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) 51 if err != nil { 52 t.Fatal(err) 53 } 54 55 pubFactory := &singlePartitionPublisherFactory{ 56 ctx: ctx, 57 pubClient: pubClient, 58 settings: settings, 59 topicPath: topic.Path, 60 } 61 tp := &testPartitionPublisher{ 62 pub: pubFactory.New(topic.Partition), 63 } 64 tp.initAndStart(t, tp.pub, "Publisher", pubClient) 65 return tp 66} 67 68func (tp *testPartitionPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { 69 result := newTestPublishResultReceiver(tp.t, msg) 70 tp.pub.Publish(msg, result.set) 71 return result 72} 73 74func (tp *testPartitionPublisher) FinalError() (err error) { 75 err = tp.serviceTestProxy.FinalError() 76 77 // Verify that the stream has terminated. 
78 if gotStatus, wantStatus := tp.pub.stream.Status(), streamTerminated; gotStatus != wantStatus { 79 tp.t.Errorf("%s retryableStream status: %v, want: %v", tp.name, gotStatus, wantStatus) 80 } 81 if tp.pub.stream.currentStream() != nil { 82 tp.t.Errorf("%s client stream should be nil", tp.name) 83 } 84 return 85} 86 87func TestSinglePartitionPublisherInvalidInitialResponse(t *testing.T) { 88 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 89 90 verifiers := test.NewVerifiers(t) 91 stream := test.NewRPCVerifier(t) 92 stream.Push(initPubReq(topic), msgPubResp(0), nil) // Publish response instead of initial response 93 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 94 95 mockServer.OnTestStart(verifiers) 96 defer mockServer.OnTestEnd() 97 98 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 99 100 wantErr := errInvalidInitialPubResponse 101 if gotErr := pub.StartError(); !test.ErrorEqual(gotErr, wantErr) { 102 t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr) 103 } 104 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { 105 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 106 } 107} 108 109func TestSinglePartitionPublisherSpuriousPublishResponse(t *testing.T) { 110 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 111 112 verifiers := test.NewVerifiers(t) 113 stream := test.NewRPCVerifier(t) 114 stream.Push(initPubReq(topic), initPubResp(), nil) 115 barrier := stream.PushWithBarrier(nil, msgPubResp(0), nil) // Publish response with no messages 116 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 117 118 mockServer.OnTestStart(verifiers) 119 defer mockServer.OnTestEnd() 120 121 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 122 if gotErr := pub.StartError(); gotErr != nil { 123 t.Errorf("Start() got err: (%v)", gotErr) 124 } 125 126 // Send after startup to ensure the test 
is deterministic. 127 barrier.Release() 128 if gotErr, wantErr := pub.FinalError(), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { 129 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 130 } 131} 132 133func TestSinglePartitionPublisherBatching(t *testing.T) { 134 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 135 settings := testPublishSettings() 136 settings.DelayThreshold = time.Minute // Batching delay disabled, tested elsewhere 137 settings.CountThreshold = 3 138 139 // Batch 1 140 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 141 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 142 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 143 144 // Batch 2 145 msg4 := &pb.PubSubMessage{Data: []byte{'3'}} 146 147 verifiers := test.NewVerifiers(t) 148 stream := test.NewRPCVerifier(t) 149 stream.Push(initPubReq(topic), initPubResp(), nil) 150 stream.Push(msgPubReq(msg1, msg2, msg3), msgPubResp(0), nil) 151 stream.Push(msgPubReq(msg4), msgPubResp(33), nil) 152 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 153 154 mockServer.OnTestStart(verifiers) 155 defer mockServer.OnTestEnd() 156 157 pub := newTestSinglePartitionPublisher(t, topic, settings) 158 if gotErr := pub.StartError(); gotErr != nil { 159 t.Errorf("Start() got err: (%v)", gotErr) 160 } 161 162 result1 := pub.Publish(msg1) 163 result2 := pub.Publish(msg2) 164 result3 := pub.Publish(msg3) 165 result4 := pub.Publish(msg4) 166 // Stop flushes pending messages. 
167 pub.Stop() 168 169 result1.ValidateResult(topic.Partition, 0) 170 result2.ValidateResult(topic.Partition, 1) 171 result3.ValidateResult(topic.Partition, 2) 172 result4.ValidateResult(topic.Partition, 33) 173 174 if gotErr := pub.FinalError(); gotErr != nil { 175 t.Errorf("Publisher final err: (%v), want: <nil>", gotErr) 176 } 177} 178 179func TestSinglePartitionPublisherResendMessages(t *testing.T) { 180 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 181 182 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 183 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 184 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 185 186 verifiers := test.NewVerifiers(t) 187 188 // Simulate a transient error that results in a reconnect before any server 189 // publish responses are received. 190 stream1 := test.NewRPCVerifier(t) 191 stream1.Push(initPubReq(topic), initPubResp(), nil) 192 stream1.Push(msgPubReq(msg1), nil, nil) 193 stream1.Push(msgPubReq(msg2), nil, status.Error(codes.Aborted, "server aborted")) 194 verifiers.AddPublishStream(topic.Path, topic.Partition, stream1) 195 196 // The publisher should resend all in-flight batches to the second stream. 
197 stream2 := test.NewRPCVerifier(t) 198 stream2.Push(initPubReq(topic), initPubResp(), nil) 199 stream2.Push(msgPubReq(msg1, msg2), msgPubResp(0), nil) 200 stream2.Push(msgPubReq(msg3), msgPubResp(2), nil) 201 verifiers.AddPublishStream(topic.Path, topic.Partition, stream2) 202 203 mockServer.OnTestStart(verifiers) 204 defer mockServer.OnTestEnd() 205 206 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 207 defer pub.StopVerifyNoError() 208 if gotErr := pub.StartError(); gotErr != nil { 209 t.Errorf("Start() got err: (%v)", gotErr) 210 } 211 212 result1 := pub.Publish(msg1) 213 result2 := pub.Publish(msg2) 214 result1.ValidateResult(topic.Partition, 0) 215 result2.ValidateResult(topic.Partition, 1) 216 217 result3 := pub.Publish(msg3) 218 result3.ValidateResult(topic.Partition, 2) 219} 220 221func TestSinglePartitionPublisherPublishPermanentError(t *testing.T) { 222 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 223 permError := status.Error(codes.NotFound, "topic deleted") 224 225 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 226 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 227 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 228 229 verifiers := test.NewVerifiers(t) 230 stream := test.NewRPCVerifier(t) 231 stream.Push(initPubReq(topic), initPubResp(), nil) 232 stream.Push(msgPubReq(msg1), nil, permError) // Permanent error terminates publisher 233 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 234 235 mockServer.OnTestStart(verifiers) 236 defer mockServer.OnTestEnd() 237 238 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 239 if gotErr := pub.StartError(); gotErr != nil { 240 t.Errorf("Start() got err: (%v)", gotErr) 241 } 242 243 result1 := pub.Publish(msg1) 244 result2 := pub.Publish(msg2) 245 result1.ValidateError(permError) 246 result2.ValidateError(permError) 247 248 // This message arrives after the publisher has already stopped, so its error 249 // message 
is ErrServiceStopped. 250 result3 := pub.Publish(msg3) 251 result3.ValidateError(ErrServiceStopped) 252 253 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, permError) { 254 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, permError) 255 } 256} 257 258func TestSinglePartitionPublisherBufferOverflow(t *testing.T) { 259 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 260 settings := testPublishSettings() 261 settings.BufferedByteLimit = 15 262 263 msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} 264 msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} // Causes overflow 265 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 266 267 verifiers := test.NewVerifiers(t) 268 stream := test.NewRPCVerifier(t) 269 stream.Push(initPubReq(topic), initPubResp(), nil) 270 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(0), nil) 271 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 272 273 mockServer.OnTestStart(verifiers) 274 defer mockServer.OnTestEnd() 275 276 pub := newTestSinglePartitionPublisher(t, topic, settings) 277 if gotErr := pub.StartError(); gotErr != nil { 278 t.Errorf("Start() got err: (%v)", gotErr) 279 } 280 281 result1 := pub.Publish(msg1) 282 // Overflow is detected, which terminates the publisher, but previous messages 283 // are flushed. 284 result2 := pub.Publish(msg2) 285 // Delay the server response for the first Publish to verify that it is 286 // allowed to complete. 287 barrier.Release() 288 // This message arrives after the publisher has already stopped, so its error 289 // message is ErrServiceStopped. 
290 result3 := pub.Publish(msg3) 291 292 result1.ValidateResult(topic.Partition, 0) 293 result2.ValidateError(ErrOverflow) 294 result3.ValidateError(ErrServiceStopped) 295 296 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, ErrOverflow) { 297 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, ErrOverflow) 298 } 299} 300 301func TestSinglePartitionPublisherBufferRefill(t *testing.T) { 302 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 303 settings := testPublishSettings() 304 settings.BufferedByteLimit = 15 305 306 msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} 307 msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} 308 309 verifiers := test.NewVerifiers(t) 310 stream := test.NewRPCVerifier(t) 311 stream.Push(initPubReq(topic), initPubResp(), nil) 312 stream.Push(msgPubReq(msg1), msgPubResp(0), nil) 313 stream.Push(msgPubReq(msg2), msgPubResp(1), nil) 314 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 315 316 mockServer.OnTestStart(verifiers) 317 defer mockServer.OnTestEnd() 318 319 pub := newTestSinglePartitionPublisher(t, topic, settings) 320 if gotErr := pub.StartError(); gotErr != nil { 321 t.Errorf("Start() got err: (%v)", gotErr) 322 } 323 324 result1 := pub.Publish(msg1) 325 result1.ValidateResult(topic.Partition, 0) 326 327 // No overflow because msg2 is sent after the response for msg1 is received. 
328 result2 := pub.Publish(msg2) 329 result2.ValidateResult(topic.Partition, 1) 330 331 pub.StopVerifyNoError() 332} 333 334func TestSinglePartitionPublisherInvalidCursorOffsets(t *testing.T) { 335 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 336 337 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 338 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 339 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 340 341 verifiers := test.NewVerifiers(t) 342 stream := test.NewRPCVerifier(t) 343 stream.Push(initPubReq(topic), initPubResp(), nil) 344 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(4), nil) 345 // The server returns an inconsistent cursor offset for msg2, which causes the 346 // publisher client to fail permanently. 347 stream.Push(msgPubReq(msg2), msgPubResp(4), nil) 348 stream.Push(msgPubReq(msg3), msgPubResp(5), nil) 349 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 350 351 mockServer.OnTestStart(verifiers) 352 defer mockServer.OnTestEnd() 353 354 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 355 if gotErr := pub.StartError(); gotErr != nil { 356 t.Errorf("Start() got err: (%v)", gotErr) 357 } 358 359 result1 := pub.Publish(msg1) 360 result2 := pub.Publish(msg2) 361 result3 := pub.Publish(msg3) 362 barrier.Release() 363 364 result1.ValidateResult(topic.Partition, 4) 365 366 // msg2 and subsequent messages are errored. 
367 wantMsg := "server returned publish response with inconsistent start offset" 368 result2.ValidateErrorMsg(wantMsg) 369 result3.ValidateErrorMsg(wantMsg) 370 if gotErr := pub.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) { 371 t.Errorf("Publisher final err: (%v), want msg: %q", gotErr, wantMsg) 372 } 373} 374 375func TestSinglePartitionPublisherInvalidServerPublishResponse(t *testing.T) { 376 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 377 msg := &pb.PubSubMessage{Data: []byte{'1'}} 378 379 verifiers := test.NewVerifiers(t) 380 stream := test.NewRPCVerifier(t) 381 stream.Push(initPubReq(topic), initPubResp(), nil) 382 // Server sends duplicate initial publish response, which causes the publisher 383 // client to fail permanently. 384 stream.Push(msgPubReq(msg), initPubResp(), nil) 385 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 386 387 mockServer.OnTestStart(verifiers) 388 defer mockServer.OnTestEnd() 389 390 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 391 if gotErr := pub.StartError(); gotErr != nil { 392 t.Errorf("Start() got err: (%v)", gotErr) 393 } 394 395 result := pub.Publish(msg) 396 397 wantErr := errInvalidMsgPubResponse 398 result.ValidateError(wantErr) 399 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { 400 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 401 } 402} 403 404func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) { 405 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 406 finalErr := status.Error(codes.FailedPrecondition, "invalid message") 407 408 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 409 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 410 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 411 msg4 := &pb.PubSubMessage{Data: []byte{'4'}} 412 413 verifiers := test.NewVerifiers(t) 414 stream := test.NewRPCVerifier(t) 415 stream.Push(initPubReq(topic), 
initPubResp(), nil) 416 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(5), nil) 417 stream.Push(msgPubReq(msg2), msgPubResp(6), nil) 418 stream.Push(msgPubReq(msg3), nil, finalErr) 419 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 420 421 mockServer.OnTestStart(verifiers) 422 defer mockServer.OnTestEnd() 423 424 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 425 if gotErr := pub.StartError(); gotErr != nil { 426 t.Errorf("Start() got err: (%v)", gotErr) 427 } 428 429 result1 := pub.Publish(msg1) 430 result2 := pub.Publish(msg2) 431 result3 := pub.Publish(msg3) 432 pub.Stop() 433 barrier.Release() 434 result4 := pub.Publish(msg4) 435 436 // First 2 messages should be allowed to complete. 437 result1.ValidateResult(topic.Partition, 5) 438 result2.ValidateResult(topic.Partition, 6) 439 // msg3 failed with a server error, which should result in the publisher 440 // terminating with an error. 441 result3.ValidateError(finalErr) 442 // msg4 was sent after the user called Stop(), so should fail immediately with 443 // ErrServiceStopped. 444 result4.ValidateError(ErrServiceStopped) 445 446 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, finalErr) { 447 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr) 448 } 449} 450 451func TestSinglePartitionPublisherPublishWhileStarting(t *testing.T) { 452 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 453 msg := &pb.PubSubMessage{Data: []byte{'1'}} 454 455 verifiers := test.NewVerifiers(t) 456 stream := test.NewRPCVerifier(t) 457 stream.Push(initPubReq(topic), initPubResp(), nil) 458 stream.Push(msgPubReq(msg), msgPubResp(42), nil) 459 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 460 461 mockServer.OnTestStart(verifiers) 462 defer mockServer.OnTestEnd() 463 464 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 465 466 // Did not wait for publisher to finish startup. 
But it should send msg once 467 // the Publish stream connects. 468 result := pub.Publish(msg) 469 result.ValidateResult(topic.Partition, 42) 470 471 pub.StopVerifyNoError() 472} 473 474func TestSinglePartitionPublisherPublishWhileStartingFails(t *testing.T) { 475 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 476 msg := &pb.PubSubMessage{Data: []byte{'1'}} 477 serverErr := status.Error(codes.FailedPrecondition, "failed") 478 479 verifiers := test.NewVerifiers(t) 480 stream := test.NewRPCVerifier(t) 481 barrier := stream.PushWithBarrier(initPubReq(topic), nil, serverErr) 482 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 483 484 mockServer.OnTestStart(verifiers) 485 defer mockServer.OnTestEnd() 486 487 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 488 489 // Published during startup. 490 result := pub.Publish(msg) 491 // Send the initial response (with error) to complete startup. 492 barrier.Release() 493 494 result.ValidateError(serverErr) 495 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { 496 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, serverErr) 497 } 498} 499 500// testRoutingPublisher wraps a routingPublisher for testing. 
501type testRoutingPublisher struct { 502 t *testing.T 503 pub *routingPublisher 504} 505 506func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher { 507 ctx := context.Background() 508 pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) 509 if err != nil { 510 t.Fatal(err) 511 } 512 adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn()) 513 if err != nil { 514 t.Fatal(err) 515 } 516 allClients := apiClients{pubClient, adminClient} 517 518 source := &test.FakeSource{Ret: fakeSourceVal} 519 msgRouterFactory := newMessageRouterFactory(rand.New(source)) 520 pubFactory := &singlePartitionPublisherFactory{ 521 ctx: ctx, 522 pubClient: pubClient, 523 settings: settings, 524 topicPath: topicPath, 525 } 526 pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory) 527 pub.Start() 528 return &testRoutingPublisher{t: t, pub: pub} 529} 530 531func (tp *testRoutingPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { 532 result := newTestPublishResultReceiver(tp.t, msg) 533 tp.pub.Publish(msg, result.set) 534 return result 535} 536 537func (tp *testRoutingPublisher) NumPartitionPublishers() int { 538 tp.pub.mu.Lock() 539 defer tp.pub.mu.Unlock() 540 return len(tp.pub.publishers) 541} 542 543func (tp *testRoutingPublisher) Start() { tp.pub.Start() } 544func (tp *testRoutingPublisher) Stop() { tp.pub.Stop() } 545func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() } 546func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() } 547 548func TestRoutingPublisherStartOnce(t *testing.T) { 549 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 550 numPartitions := 2 551 552 verifiers := test.NewVerifiers(t) 553 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 554 555 stream0 := test.NewRPCVerifier(t) 
556 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 557 verifiers.AddPublishStream(topic, 0, stream0) 558 559 stream1 := test.NewRPCVerifier(t) 560 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 561 verifiers.AddPublishStream(topic, 1, stream1) 562 563 mockServer.OnTestStart(verifiers) 564 defer mockServer.OnTestEnd() 565 566 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 567 568 t.Run("First succeeds", func(t *testing.T) { 569 // Note: newTestRoutingPublisher() called Start. 570 if gotErr := pub.WaitStarted(); gotErr != nil { 571 t.Errorf("Start() got err: (%v)", gotErr) 572 } 573 if got, want := pub.NumPartitionPublishers(), numPartitions; got != want { 574 t.Errorf("Num partition publishers: got %d, want %d", got, want) 575 } 576 }) 577 t.Run("Second no-op", func(t *testing.T) { 578 // An error is not returned, but no new streams are opened. The mock server 579 // does not expect more RPCs. 580 pub.Start() 581 if gotErr := pub.WaitStarted(); gotErr != nil { 582 t.Errorf("Start() got err: (%v)", gotErr) 583 } 584 }) 585 586 pub.Stop() 587 if gotErr := pub.WaitStopped(); gotErr != nil { 588 t.Errorf("Stop() got err: (%v)", gotErr) 589 } 590} 591 592func TestRoutingPublisherStartStop(t *testing.T) { 593 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 594 numPartitions := 2 595 596 verifiers := test.NewVerifiers(t) 597 barrier := verifiers.GlobalVerifier.PushWithBarrier(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 598 599 mockServer.OnTestStart(verifiers) 600 defer mockServer.OnTestEnd() 601 602 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 603 pub.Stop() 604 barrier.Release() 605 606 if gotErr := pub.WaitStopped(); gotErr != nil { 607 t.Errorf("Stop() got err: (%v)", gotErr) 608 } 609 // No publishers should be created. 
610 if got, want := pub.NumPartitionPublishers(), 0; got != want { 611 t.Errorf("Num partition publishers: got %d, want %d", got, want) 612 } 613} 614 615func TestRoutingPublisherRoundRobin(t *testing.T) { 616 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 617 numPartitions := 3 618 619 // Messages have no ordering key, so the roundRobinMsgRouter is used. 620 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 621 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 622 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 623 msg4 := &pb.PubSubMessage{Data: []byte{'4'}} 624 625 verifiers := test.NewVerifiers(t) 626 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 627 628 // Partition 0 629 stream0 := test.NewRPCVerifier(t) 630 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 631 stream0.Push(msgPubReq(msg3), msgPubResp(34), nil) 632 verifiers.AddPublishStream(topic, 0, stream0) 633 634 // Partition 1 635 stream1 := test.NewRPCVerifier(t) 636 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 637 stream1.Push(msgPubReq(msg1), msgPubResp(41), nil) 638 stream1.Push(msgPubReq(msg4), msgPubResp(42), nil) 639 verifiers.AddPublishStream(topic, 1, stream1) 640 641 // Partition 2 642 stream2 := test.NewRPCVerifier(t) 643 stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) 644 stream2.Push(msgPubReq(msg2), msgPubResp(78), nil) 645 verifiers.AddPublishStream(topic, 2, stream2) 646 647 mockServer.OnTestStart(verifiers) 648 defer mockServer.OnTestEnd() 649 650 // Note: The fake source is initialized with value=1, so Partition=1 publisher 651 // will be the first chosen by the roundRobinMsgRouter. 
652 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1) 653 if err := pub.WaitStarted(); err != nil { 654 t.Errorf("Start() got err: (%v)", err) 655 } 656 657 result1 := pub.Publish(msg1) 658 result2 := pub.Publish(msg2) 659 result3 := pub.Publish(msg3) 660 result4 := pub.Publish(msg4) 661 662 result1.ValidateResult(1, 41) 663 result2.ValidateResult(2, 78) 664 result3.ValidateResult(0, 34) 665 result4.ValidateResult(1, 42) 666 667 pub.Stop() 668 if err := pub.WaitStopped(); err != nil { 669 t.Errorf("Stop() got err: (%v)", err) 670 } 671} 672 673func TestRoutingPublisherHashing(t *testing.T) { 674 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 675 numPartitions := 3 676 677 key0 := []byte("bar") // hashes to partition 0 678 key1 := []byte("baz") // hashes to partition 1 679 key2 := []byte("foo") // hashes to partition 2 680 681 // Messages have ordering key, so the hashingMsgRouter is used. 682 msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2} 683 msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0} 684 msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2} 685 msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1} 686 msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0} 687 688 verifiers := test.NewVerifiers(t) 689 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 690 691 // Partition 0 692 stream0 := test.NewRPCVerifier(t) 693 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 694 stream0.Push(msgPubReq(msg2), msgPubResp(20), nil) 695 stream0.Push(msgPubReq(msg5), msgPubResp(21), nil) 696 verifiers.AddPublishStream(topic, 0, stream0) 697 698 // Partition 1 699 stream1 := test.NewRPCVerifier(t) 700 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 701 stream1.Push(msgPubReq(msg4), msgPubResp(40), nil) 702 verifiers.AddPublishStream(topic, 1, stream1) 703 704 // Partition 2 705 stream2 := test.NewRPCVerifier(t) 706 
stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) 707 stream2.Push(msgPubReq(msg1), msgPubResp(10), nil) 708 stream2.Push(msgPubReq(msg3), msgPubResp(11), nil) 709 verifiers.AddPublishStream(topic, 2, stream2) 710 711 mockServer.OnTestStart(verifiers) 712 defer mockServer.OnTestEnd() 713 714 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 715 if err := pub.WaitStarted(); err != nil { 716 t.Errorf("Start() got err: (%v)", err) 717 } 718 719 result1 := pub.Publish(msg1) 720 result2 := pub.Publish(msg2) 721 result3 := pub.Publish(msg3) 722 result4 := pub.Publish(msg4) 723 result5 := pub.Publish(msg5) 724 725 result1.ValidateResult(2, 10) 726 result2.ValidateResult(0, 20) 727 result3.ValidateResult(2, 11) 728 result4.ValidateResult(1, 40) 729 result5.ValidateResult(0, 21) 730 731 pub.Stop() 732 if err := pub.WaitStopped(); err != nil { 733 t.Errorf("Stop() got err: (%v)", err) 734 } 735} 736 737func TestRoutingPublisherPermanentError(t *testing.T) { 738 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 739 numPartitions := 2 740 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 741 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 742 serverErr := status.Error(codes.FailedPrecondition, "failed") 743 744 verifiers := test.NewVerifiers(t) 745 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 746 747 // Partition 0 748 stream0 := test.NewRPCVerifier(t) 749 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 750 stream0.Push(msgPubReq(msg1), msgPubResp(34), nil) 751 verifiers.AddPublishStream(topic, 0, stream0) 752 753 // Partition 1. Fails due to permanent error, which will also shut down 754 // partition-0 publisher, but it should be allowed to flush its pending 755 // messages. 
756 stream1 := test.NewRPCVerifier(t) 757 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 758 stream1.Push(msgPubReq(msg2), nil, serverErr) 759 verifiers.AddPublishStream(topic, 1, stream1) 760 761 mockServer.OnTestStart(verifiers) 762 defer mockServer.OnTestEnd() 763 764 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 765 if err := pub.WaitStarted(); err != nil { 766 t.Errorf("Start() got err: (%v)", err) 767 } 768 769 result1 := pub.Publish(msg1) 770 result2 := pub.Publish(msg2) 771 772 result1.ValidateResult(0, 34) 773 result2.ValidateError(serverErr) 774 775 if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { 776 t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) 777 } 778} 779 780func TestRoutingPublisherPublishAfterStop(t *testing.T) { 781 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 782 numPartitions := 2 783 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 784 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 785 786 verifiers := test.NewVerifiers(t) 787 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 788 789 // Partition 0 790 stream0 := test.NewRPCVerifier(t) 791 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 792 verifiers.AddPublishStream(topic, 0, stream0) 793 794 // Partition 1 795 stream1 := test.NewRPCVerifier(t) 796 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 797 verifiers.AddPublishStream(topic, 1, stream1) 798 799 mockServer.OnTestStart(verifiers) 800 defer mockServer.OnTestEnd() 801 802 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 803 if err := pub.WaitStarted(); err != nil { 804 t.Errorf("Start() got err: (%v)", err) 805 } 806 807 pub.Stop() 808 result1 := pub.Publish(msg1) 809 result2 := pub.Publish(msg2) 810 811 result1.ValidateError(ErrServiceStopped) 812 result2.ValidateError(ErrServiceStopped) 813 814 if err := pub.WaitStopped(); 
err != nil {
		t.Errorf("Stop() got err: (%v)", err)
	}
}

// TestRoutingPublisherPartitionCountFail checks that startup reports the
// server's error when the topic partition count lookup fails, and that no
// partition publishers exist afterwards.
func TestRoutingPublisherPartitionCountFail(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	wantErr := status.Error(codes.NotFound, "no exist")

	// The single expected RPC is the partition count lookup, which returns an
	// error, so startup cannot proceed.
	expected := test.NewVerifiers(t)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), nil, wantErr)

	mockServer.OnTestStart(expected)
	defer mockServer.OnTestEnd()

	routingPub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	gotErr := routingPub.WaitStarted()
	if !test.ErrorHasMsg(gotErr, wantErr.Error()) {
		t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr)
	}
	if n := routingPub.NumPartitionPublishers(); n != 0 {
		t.Errorf("Num partition publishers: got %d, want %d", n, 0)
	}

	// Calling Start() again must not attempt a restart: the mock server has no
	// further RPCs queued.
	routingPub.Start()
}

// TestRoutingPublisherPartitionCountInvalid checks that startup fails when the
// server reports a partition count of zero.
func TestRoutingPublisherPartitionCountInvalid(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const wantMsg = "topic has invalid number of partitions"

	// The server answers the partition count lookup with 0, which is not a
	// valid count, so startup cannot proceed.
	expected := test.NewVerifiers(t)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil)

	mockServer.OnTestStart(expected)
	defer mockServer.OnTestEnd()

	routingPub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	gotErr := routingPub.WaitStarted()
	if !test.ErrorHasMsg(gotErr, wantMsg) {
		t.Errorf("Start() got err: (%v), want msg: %q", gotErr, wantMsg)
	}
	if n := routingPub.NumPartitionPublishers(); n != 0 {
		t.Errorf("Num partition publishers: got %d, want %d", n, 0)
	}
}

// TestRoutingPublisherPartitionCountIncreases checks that a partition count
// update from 1 to 3 raises the number of partition publishers to 3, and that
// three subsequent publishes land on partitions 0, 1 and 2.
func TestRoutingPublisherPartitionCountIncreases(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const (
		initialCount = 1
		updatedCount = 3
	)
	firstMsg := &pb.PubSubMessage{Data: []byte{'1'}}
	secondMsg := &pb.PubSubMessage{Data: []byte{'2'}}
	thirdMsg := &pb.PubSubMessage{Data: []byte{'3'}}

	// First lookup returns the initial count, second lookup the updated one.
	expected := test.NewVerifiers(t)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialCount), nil)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedCount), nil)

	// Each partition stream expects its initial request followed by exactly
	// one message.
	partition0 := test.NewRPCVerifier(t)
	partition0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	partition0.Push(msgPubReq(firstMsg), msgPubResp(11), nil)
	expected.AddPublishStream(topic, 0, partition0)

	partition1 := test.NewRPCVerifier(t)
	partition1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	partition1.Push(msgPubReq(secondMsg), msgPubResp(22), nil)
	expected.AddPublishStream(topic, 1, partition1)

	partition2 := test.NewRPCVerifier(t)
	partition2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
	partition2.Push(msgPubReq(thirdMsg), msgPubResp(33), nil)
	expected.AddPublishStream(topic, 2, partition2)

	mockServer.OnTestStart(expected)
	defer mockServer.OnTestEnd()

	routingPub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	t.Run("Initial count", func(t *testing.T) {
		startErr := routingPub.WaitStarted()
		if startErr != nil {
			t.Errorf("Start() got err: (%v)", startErr)
		}
		if n := routingPub.NumPartitionPublishers(); n != initialCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, initialCount)
		}
	})
	t.Run("Updated count", func(t *testing.T) {
		// Trigger the partition count refresh by hand.
		routingPub.pub.partitionWatcher.updatePartitionCount()
		if n := routingPub.NumPartitionPublishers(); n != updatedCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, updatedCount)
		}
	})
	t.Run("Publish", func(t *testing.T) {
		// Publish all messages before validating any result.
		firstResult := routingPub.Publish(firstMsg)
		secondResult := routingPub.Publish(secondMsg)
		thirdResult := routingPub.Publish(thirdMsg)

		firstResult.ValidateResult(0, 11)
		secondResult.ValidateResult(1, 22)
		thirdResult.ValidateResult(2, 33)
	})

	routingPub.Stop()
	stopErr := routingPub.WaitStopped()
	if stopErr != nil {
		t.Errorf("Stop() got err: (%v)", stopErr)
	}
}

// TestRoutingPublisherPartitionCountDecreases checks that a partition count
// update from 2 to 1 is ignored: the number of partition publishers stays 2.
func TestRoutingPublisherPartitionCountDecreases(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const (
		initialCount = 2
		updatedCount = 1
	)

	// First lookup returns the initial count, second lookup the smaller one.
	expected := test.NewVerifiers(t)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialCount), nil)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedCount), nil)

	// Both partition streams only expect their initial request.
	partition0 := test.NewRPCVerifier(t)
	partition0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	expected.AddPublishStream(topic, 0, partition0)

	partition1 := test.NewRPCVerifier(t)
	partition1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	expected.AddPublishStream(topic, 1, partition1)

	mockServer.OnTestStart(expected)
	defer mockServer.OnTestEnd()

	routingPub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	t.Run("Initial count", func(t *testing.T) {
		startErr := routingPub.WaitStarted()
		if startErr != nil {
			t.Errorf("Start() got err: (%v)", startErr)
		}
		if n := routingPub.NumPartitionPublishers(); n != initialCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, initialCount)
		}
	})
	t.Run("Updated count", func(t *testing.T) {
		routingPub.pub.partitionWatcher.updatePartitionCount()

		// Decreasing count ignored.
		if n := routingPub.NumPartitionPublishers(); n != initialCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, initialCount)
		}
	})

	routingPub.Stop()
	stopErr := routingPub.WaitStopped()
	if stopErr != nil {
		t.Errorf("Stop() got err: (%v)", stopErr)
	}
}

// TestRoutingPublisherPartitionCountUpdateFails checks that an error returned
// for a partition count update terminates the routing publisher with that
// error, leaving the number of partition publishers unchanged.
func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const initialCount = 2
	serverErr := status.Error(codes.NotFound, "deleted")

	// First lookup succeeds, second lookup fails with serverErr.
	expected := test.NewVerifiers(t)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialCount), nil)
	expected.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr)

	// Both partition streams only expect their initial request.
	partition0 := test.NewRPCVerifier(t)
	partition0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	expected.AddPublishStream(topic, 0, partition0)

	partition1 := test.NewRPCVerifier(t)
	partition1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	expected.AddPublishStream(topic, 1, partition1)

	mockServer.OnTestStart(expected)
	defer mockServer.OnTestEnd()

	routingPub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	t.Run("Initial count", func(t *testing.T) {
		startErr := routingPub.WaitStarted()
		if startErr != nil {
			t.Errorf("Start() got err: (%v)", startErr)
		}
		if n := routingPub.NumPartitionPublishers(); n != initialCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, initialCount)
		}
	})
	t.Run("Failed update", func(t *testing.T) {
		routingPub.pub.partitionWatcher.updatePartitionCount()

		// Failed background update terminates the routingPublisher.
		finalErr := routingPub.WaitStopped()
		if !test.ErrorHasMsg(finalErr, serverErr.Error()) {
			t.Errorf("Final error got: (%v), want: (%v)", finalErr, serverErr)
		}
		if n := routingPub.NumPartitionPublishers(); n != initialCount {
			t.Errorf("Num partition publishers: got %d, want %d", n, initialCount)
		}
	})
}

// TestNewPublisherCreatesImpl checks that NewPublisher succeeds with the
// default settings and returns a *routingPublisher.
func TestNewPublisherCreatesImpl(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const region = "us-central1"

	pub, err := NewPublisher(context.Background(), DefaultPublishSettings, region, topic)
	if err != nil {
		t.Errorf("NewPublisher() got error: %v", err)
		return
	}
	if _, isRouting := pub.(*routingPublisher); !isRouting {
		t.Error("NewPublisher() did not return a routingPublisher")
	}
}

// TestNewPublisherValidatesSettings checks that NewPublisher returns an error
// when DelayThreshold is set to zero.
func TestNewPublisherValidatesSettings(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const region = "us-central1"

	invalid := DefaultPublishSettings
	invalid.DelayThreshold = 0

	_, err := NewPublisher(context.Background(), invalid, region, topic)
	if err == nil {
		t.Error("NewPublisher() did not return error")
	}
}