1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "bytes" 18 "context" 19 "math/rand" 20 "testing" 21 "time" 22 23 "cloud.google.com/go/pubsublite/internal/test" 24 "google.golang.org/grpc/codes" 25 "google.golang.org/grpc/status" 26 27 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 28) 29 30func testPublishSettings() PublishSettings { 31 settings := DefaultPublishSettings 32 // Send 1 message at a time to make tests deterministic. 33 settings.CountThreshold = 1 34 // Send messages with minimal delay to speed up tests. 35 settings.DelayThreshold = time.Millisecond 36 settings.Timeout = 5 * time.Second 37 // Set long poll period to prevent background update, but still have non-zero 38 // request timeout. 39 settings.ConfigPollPeriod = 1 * time.Minute 40 return settings 41} 42 43// testPartitionPublisher wraps a singlePartitionPublisher for ease of testing. 44type testPartitionPublisher struct { 45 pub *singlePartitionPublisher 46 serviceTestProxy 47} 48 49func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher { 50 ctx := context.Background() 51 pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) 52 if err != nil { 53 t.Fatal(err) 54 } 55 56 pubFactory := &singlePartitionPublisherFactory{ 57 ctx: ctx, 58 pubClient: pubClient, 59 settings: settings, 60 topicPath: topic.Path, 61 } 62 tp := &testPartitionPublisher{ 63 pub: pubFactory.New(topic.Partition), 64 } 65 tp.initAndStart(t, tp.pub, "Publisher", pubClient) 66 return tp 67} 68 69func (tp *testPartitionPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { 70 result := newTestPublishResultReceiver(tp.t, msg) 71 tp.pub.Publish(msg, result.set) 72 return result 73} 74 75func (tp *testPartitionPublisher) FinalError() (err error) { 76 err = tp.serviceTestProxy.FinalError() 77 78 // Verify that the stream has terminated. 79 if gotStatus, wantStatus := tp.pub.stream.Status(), streamTerminated; gotStatus != wantStatus { 80 tp.t.Errorf("%s retryableStream status: %v, want: %v", tp.name, gotStatus, wantStatus) 81 } 82 if tp.pub.stream.currentStream() != nil { 83 tp.t.Errorf("%s client stream should be nil", tp.name) 84 } 85 return 86} 87 88func TestSinglePartitionPublisherInvalidInitialResponse(t *testing.T) { 89 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 90 91 verifiers := test.NewVerifiers(t) 92 stream := test.NewRPCVerifier(t) 93 stream.Push(initPubReq(topic), msgPubResp(0), nil) // Publish response instead of initial response 94 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 95 96 mockServer.OnTestStart(verifiers) 97 defer mockServer.OnTestEnd() 98 99 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 100 101 wantErr := errInvalidInitialPubResponse 102 if gotErr := pub.StartError(); !test.ErrorEqual(gotErr, wantErr) { 103 t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr) 104 } 105 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { 106 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 107 } 108} 109 110func TestSinglePartitionPublisherSpuriousPublishResponse(t *testing.T) { 111 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 112 113 verifiers := test.NewVerifiers(t) 114 stream := test.NewRPCVerifier(t) 115 stream.Push(initPubReq(topic), initPubResp(), nil) 116 barrier := stream.PushWithBarrier(nil, msgPubResp(0), nil) // Publish response with no messages 117 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 118 119 mockServer.OnTestStart(verifiers) 120 defer mockServer.OnTestEnd() 121 122 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 123 if gotErr := pub.StartError(); gotErr != nil { 124 t.Errorf("Start() got err: (%v)", gotErr) 125 } 126 127 // Send after startup to ensure the test is deterministic. 128 barrier.Release() 129 if gotErr, wantErr := pub.FinalError(), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { 130 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 131 } 132} 133 134func TestSinglePartitionPublisherBatching(t *testing.T) { 135 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 136 settings := testPublishSettings() 137 settings.DelayThreshold = time.Minute // Batching delay disabled, tested elsewhere 138 settings.CountThreshold = 3 139 140 // Batch 1 141 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 142 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 143 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 144 145 // Batch 2 146 msg4 := &pb.PubSubMessage{Data: []byte{'3'}} 147 148 verifiers := test.NewVerifiers(t) 149 stream := test.NewRPCVerifier(t) 150 stream.Push(initPubReq(topic), initPubResp(), nil) 151 stream.Push(msgPubReq(msg1, msg2, msg3), msgPubResp(0), nil) 152 stream.Push(msgPubReq(msg4), msgPubResp(33), nil) 153 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 154 155 mockServer.OnTestStart(verifiers) 156 defer mockServer.OnTestEnd() 157 158 pub := newTestSinglePartitionPublisher(t, topic, settings) 159 if gotErr := pub.StartError(); gotErr != nil { 160 t.Errorf("Start() got err: (%v)", gotErr) 161 } 162 163 result1 := pub.Publish(msg1) 164 result2 := pub.Publish(msg2) 165 result3 := pub.Publish(msg3) 166 result4 := pub.Publish(msg4) 167 // Stop flushes pending messages. 168 pub.Stop() 169 170 result1.ValidateResult(topic.Partition, 0) 171 result2.ValidateResult(topic.Partition, 1) 172 result3.ValidateResult(topic.Partition, 2) 173 result4.ValidateResult(topic.Partition, 33) 174 175 if gotErr := pub.FinalError(); gotErr != nil { 176 t.Errorf("Publisher final err: (%v), want: <nil>", gotErr) 177 } 178} 179 180func TestSinglePartitionPublisherResendMessages(t *testing.T) { 181 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 182 183 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 184 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 185 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 186 187 verifiers := test.NewVerifiers(t) 188 189 // Simulate a transient error that results in a reconnect before any server 190 // publish responses are received. 191 stream1 := test.NewRPCVerifier(t) 192 stream1.Push(initPubReq(topic), initPubResp(), nil) 193 stream1.Push(msgPubReq(msg1), nil, nil) 194 stream1.Push(msgPubReq(msg2), nil, status.Error(codes.Aborted, "server aborted")) 195 verifiers.AddPublishStream(topic.Path, topic.Partition, stream1) 196 197 // The publisher should resend all in-flight batches to the second stream. 198 stream2 := test.NewRPCVerifier(t) 199 stream2.Push(initPubReq(topic), initPubResp(), nil) 200 stream2.Push(msgPubReq(msg1, msg2), msgPubResp(0), nil) 201 stream2.Push(msgPubReq(msg3), msgPubResp(2), nil) 202 verifiers.AddPublishStream(topic.Path, topic.Partition, stream2) 203 204 mockServer.OnTestStart(verifiers) 205 defer mockServer.OnTestEnd() 206 207 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 208 defer pub.StopVerifyNoError() 209 if gotErr := pub.StartError(); gotErr != nil { 210 t.Errorf("Start() got err: (%v)", gotErr) 211 } 212 213 result1 := pub.Publish(msg1) 214 result2 := pub.Publish(msg2) 215 result1.ValidateResult(topic.Partition, 0) 216 result2.ValidateResult(topic.Partition, 1) 217 218 result3 := pub.Publish(msg3) 219 result3.ValidateResult(topic.Partition, 2) 220} 221 222func TestSinglePartitionPublisherPublishPermanentError(t *testing.T) { 223 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 224 permError := status.Error(codes.NotFound, "topic deleted") 225 226 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 227 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 228 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 229 230 verifiers := test.NewVerifiers(t) 231 stream := test.NewRPCVerifier(t) 232 stream.Push(initPubReq(topic), initPubResp(), nil) 233 stream.Push(msgPubReq(msg1), nil, permError) // Permanent error terminates publisher 234 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 235 236 mockServer.OnTestStart(verifiers) 237 defer mockServer.OnTestEnd() 238 239 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 240 if gotErr := pub.StartError(); gotErr != nil { 241 t.Errorf("Start() got err: (%v)", gotErr) 242 } 243 244 result1 := pub.Publish(msg1) 245 result2 := pub.Publish(msg2) 246 result1.ValidateError(permError) 247 result2.ValidateError(permError) 248 249 // This message arrives after the publisher has already stopped, so its error 250 // message is ErrServiceStopped. 251 result3 := pub.Publish(msg3) 252 result3.ValidateError(ErrServiceStopped) 253 254 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, permError) { 255 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, permError) 256 } 257} 258 259func TestSinglePartitionPublisherBufferOverflow(t *testing.T) { 260 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 261 settings := testPublishSettings() 262 settings.BufferedByteLimit = 15 263 264 msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} 265 msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} // Causes overflow 266 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 267 268 verifiers := test.NewVerifiers(t) 269 stream := test.NewRPCVerifier(t) 270 stream.Push(initPubReq(topic), initPubResp(), nil) 271 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(0), nil) 272 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 273 274 mockServer.OnTestStart(verifiers) 275 defer mockServer.OnTestEnd() 276 277 pub := newTestSinglePartitionPublisher(t, topic, settings) 278 if gotErr := pub.StartError(); gotErr != nil { 279 t.Errorf("Start() got err: (%v)", gotErr) 280 } 281 282 result1 := pub.Publish(msg1) 283 // Overflow is detected, which terminates the publisher, but previous messages 284 // are flushed. 285 result2 := pub.Publish(msg2) 286 // Delay the server response for the first Publish to verify that it is 287 // allowed to complete. 288 barrier.Release() 289 // This message arrives after the publisher has already stopped, so its error 290 // message is ErrServiceStopped. 291 result3 := pub.Publish(msg3) 292 293 result1.ValidateResult(topic.Partition, 0) 294 result2.ValidateError(ErrOverflow) 295 result3.ValidateError(ErrServiceStopped) 296 297 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, ErrOverflow) { 298 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, ErrOverflow) 299 } 300} 301 302func TestSinglePartitionPublisherBufferRefill(t *testing.T) { 303 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 304 settings := testPublishSettings() 305 settings.BufferedByteLimit = 15 306 307 msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)} 308 msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} 309 310 verifiers := test.NewVerifiers(t) 311 stream := test.NewRPCVerifier(t) 312 stream.Push(initPubReq(topic), initPubResp(), nil) 313 stream.Push(msgPubReq(msg1), msgPubResp(0), nil) 314 stream.Push(msgPubReq(msg2), msgPubResp(1), nil) 315 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 316 317 mockServer.OnTestStart(verifiers) 318 defer mockServer.OnTestEnd() 319 320 pub := newTestSinglePartitionPublisher(t, topic, settings) 321 if gotErr := pub.StartError(); gotErr != nil { 322 t.Errorf("Start() got err: (%v)", gotErr) 323 } 324 325 result1 := pub.Publish(msg1) 326 result1.ValidateResult(topic.Partition, 0) 327 328 // No overflow because msg2 is sent after the response for msg1 is received. 329 result2 := pub.Publish(msg2) 330 result2.ValidateResult(topic.Partition, 1) 331 332 pub.StopVerifyNoError() 333} 334 335func TestSinglePartitionPublisherInvalidCursorOffsets(t *testing.T) { 336 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 337 338 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 339 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 340 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 341 342 verifiers := test.NewVerifiers(t) 343 stream := test.NewRPCVerifier(t) 344 stream.Push(initPubReq(topic), initPubResp(), nil) 345 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(4), nil) 346 // The server returns an inconsistent cursor offset for msg2, which causes the 347 // publisher client to fail permanently. 348 stream.Push(msgPubReq(msg2), msgPubResp(4), nil) 349 stream.Push(msgPubReq(msg3), msgPubResp(5), nil) 350 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 351 352 mockServer.OnTestStart(verifiers) 353 defer mockServer.OnTestEnd() 354 355 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 356 if gotErr := pub.StartError(); gotErr != nil { 357 t.Errorf("Start() got err: (%v)", gotErr) 358 } 359 360 result1 := pub.Publish(msg1) 361 result2 := pub.Publish(msg2) 362 result3 := pub.Publish(msg3) 363 barrier.Release() 364 365 result1.ValidateResult(topic.Partition, 4) 366 367 // msg2 and subsequent messages are errored. 368 wantMsg := "server returned publish response with inconsistent start offset" 369 result2.ValidateErrorMsg(wantMsg) 370 result3.ValidateErrorMsg(wantMsg) 371 if gotErr := pub.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) { 372 t.Errorf("Publisher final err: (%v), want msg: %q", gotErr, wantMsg) 373 } 374} 375 376func TestSinglePartitionPublisherInvalidServerPublishResponse(t *testing.T) { 377 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 378 msg := &pb.PubSubMessage{Data: []byte{'1'}} 379 380 verifiers := test.NewVerifiers(t) 381 stream := test.NewRPCVerifier(t) 382 stream.Push(initPubReq(topic), initPubResp(), nil) 383 // Server sends duplicate initial publish response, which causes the publisher 384 // client to fail permanently. 385 stream.Push(msgPubReq(msg), initPubResp(), nil) 386 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 387 388 mockServer.OnTestStart(verifiers) 389 defer mockServer.OnTestEnd() 390 391 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 392 if gotErr := pub.StartError(); gotErr != nil { 393 t.Errorf("Start() got err: (%v)", gotErr) 394 } 395 396 result := pub.Publish(msg) 397 398 wantErr := errInvalidMsgPubResponse 399 result.ValidateError(wantErr) 400 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) { 401 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr) 402 } 403} 404 405func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) { 406 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 407 finalErr := status.Error(codes.FailedPrecondition, "invalid message") 408 409 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 410 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 411 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 412 msg4 := &pb.PubSubMessage{Data: []byte{'4'}} 413 414 verifiers := test.NewVerifiers(t) 415 stream := test.NewRPCVerifier(t) 416 stream.Push(initPubReq(topic), initPubResp(), nil) 417 barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(5), nil) 418 stream.Push(msgPubReq(msg2), msgPubResp(6), nil) 419 stream.Push(msgPubReq(msg3), nil, finalErr) 420 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 421 422 mockServer.OnTestStart(verifiers) 423 defer mockServer.OnTestEnd() 424 425 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 426 if gotErr := pub.StartError(); gotErr != nil { 427 t.Errorf("Start() got err: (%v)", gotErr) 428 } 429 430 result1 := pub.Publish(msg1) 431 result2 := pub.Publish(msg2) 432 result3 := pub.Publish(msg3) 433 pub.Stop() 434 barrier.Release() 435 result4 := pub.Publish(msg4) 436 437 // First 2 messages should be allowed to complete. 438 result1.ValidateResult(topic.Partition, 5) 439 result2.ValidateResult(topic.Partition, 6) 440 // msg3 failed with a server error, which should result in the publisher 441 // terminating with an error. 442 result3.ValidateError(finalErr) 443 // msg4 was sent after the user called Stop(), so should fail immediately with 444 // ErrServiceStopped. 445 result4.ValidateError(ErrServiceStopped) 446 447 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, finalErr) { 448 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr) 449 } 450} 451 452func TestSinglePartitionPublisherPublishWhileStarting(t *testing.T) { 453 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 454 msg := &pb.PubSubMessage{Data: []byte{'1'}} 455 456 verifiers := test.NewVerifiers(t) 457 stream := test.NewRPCVerifier(t) 458 stream.Push(initPubReq(topic), initPubResp(), nil) 459 stream.Push(msgPubReq(msg), msgPubResp(42), nil) 460 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 461 462 mockServer.OnTestStart(verifiers) 463 defer mockServer.OnTestEnd() 464 465 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 466 467 // Did not wait for publisher to finish startup. But it should send msg once 468 // the Publish stream connects. 469 result := pub.Publish(msg) 470 result.ValidateResult(topic.Partition, 42) 471 472 pub.StopVerifyNoError() 473} 474 475func TestSinglePartitionPublisherPublishWhileStartingFails(t *testing.T) { 476 topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0} 477 msg := &pb.PubSubMessage{Data: []byte{'1'}} 478 serverErr := status.Error(codes.FailedPrecondition, "failed") 479 480 verifiers := test.NewVerifiers(t) 481 stream := test.NewRPCVerifier(t) 482 barrier := stream.PushWithBarrier(initPubReq(topic), nil, serverErr) 483 verifiers.AddPublishStream(topic.Path, topic.Partition, stream) 484 485 mockServer.OnTestStart(verifiers) 486 defer mockServer.OnTestEnd() 487 488 pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings()) 489 490 // Published during startup. 491 result := pub.Publish(msg) 492 // Send the initial response (with error) to complete startup. 493 barrier.Release() 494 495 result.ValidateError(serverErr) 496 if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { 497 t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, serverErr) 498 } 499} 500 501// testRoutingPublisher wraps a routingPublisher for testing. 502type testRoutingPublisher struct { 503 t *testing.T 504 pub *routingPublisher 505} 506 507func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher { 508 ctx := context.Background() 509 pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) 510 if err != nil { 511 t.Fatal(err) 512 } 513 adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn()) 514 if err != nil { 515 t.Fatal(err) 516 } 517 allClients := apiClients{pubClient, adminClient} 518 519 source := &test.FakeSource{Ret: fakeSourceVal} 520 msgRouterFactory := newMessageRouterFactory(rand.New(source)) 521 pubFactory := &singlePartitionPublisherFactory{ 522 ctx: ctx, 523 pubClient: pubClient, 524 settings: settings, 525 topicPath: topicPath, 526 } 527 pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory) 528 pub.Start() 529 return &testRoutingPublisher{t: t, pub: pub} 530} 531 532func (tp *testRoutingPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver { 533 result := newTestPublishResultReceiver(tp.t, msg) 534 tp.pub.Publish(msg, result.set) 535 return result 536} 537 538func (tp *testRoutingPublisher) NumPartitionPublishers() int { 539 tp.pub.mu.Lock() 540 defer tp.pub.mu.Unlock() 541 return len(tp.pub.publishers) 542} 543 544func (tp *testRoutingPublisher) Start() { tp.pub.Start() } 545func (tp *testRoutingPublisher) Stop() { tp.pub.Stop() } 546func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() } 547func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() } 548 549func TestRoutingPublisherStartOnce(t *testing.T) { 550 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 551 numPartitions := 2 552 553 verifiers := test.NewVerifiers(t) 554 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 555 556 stream0 := test.NewRPCVerifier(t) 557 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 558 verifiers.AddPublishStream(topic, 0, stream0) 559 560 stream1 := test.NewRPCVerifier(t) 561 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 562 verifiers.AddPublishStream(topic, 1, stream1) 563 564 mockServer.OnTestStart(verifiers) 565 defer mockServer.OnTestEnd() 566 567 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 568 569 t.Run("First succeeds", func(t *testing.T) { 570 // Note: newTestRoutingPublisher() called Start. 571 if gotErr := pub.WaitStarted(); gotErr != nil { 572 t.Errorf("Start() got err: (%v)", gotErr) 573 } 574 if got, want := pub.NumPartitionPublishers(), numPartitions; got != want { 575 t.Errorf("Num partition publishers: got %d, want %d", got, want) 576 } 577 }) 578 t.Run("Second no-op", func(t *testing.T) { 579 // An error is not returned, but no new streams are opened. The mock server 580 // does not expect more RPCs. 581 pub.Start() 582 if gotErr := pub.WaitStarted(); gotErr != nil { 583 t.Errorf("Start() got err: (%v)", gotErr) 584 } 585 }) 586 587 pub.Stop() 588 if gotErr := pub.WaitStopped(); gotErr != nil { 589 t.Errorf("Stop() got err: (%v)", gotErr) 590 } 591} 592 593func TestRoutingPublisherStartStop(t *testing.T) { 594 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 595 numPartitions := 2 596 597 verifiers := test.NewVerifiers(t) 598 barrier := verifiers.GlobalVerifier.PushWithBarrier(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 599 600 mockServer.OnTestStart(verifiers) 601 defer mockServer.OnTestEnd() 602 603 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 604 pub.Stop() 605 barrier.Release() 606 607 if gotErr := pub.WaitStopped(); gotErr != nil { 608 t.Errorf("Stop() got err: (%v)", gotErr) 609 } 610 // No publishers should be created. 611 if got, want := pub.NumPartitionPublishers(), 0; got != want { 612 t.Errorf("Num partition publishers: got %d, want %d", got, want) 613 } 614} 615 616func TestRoutingPublisherRoundRobin(t *testing.T) { 617 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 618 numPartitions := 3 619 620 // Messages have no ordering key, so the roundRobinMsgRouter is used. 621 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 622 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 623 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 624 msg4 := &pb.PubSubMessage{Data: []byte{'4'}} 625 626 verifiers := test.NewVerifiers(t) 627 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 628 629 // Partition 0 630 stream0 := test.NewRPCVerifier(t) 631 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 632 stream0.Push(msgPubReq(msg3), msgPubResp(34), nil) 633 verifiers.AddPublishStream(topic, 0, stream0) 634 635 // Partition 1 636 stream1 := test.NewRPCVerifier(t) 637 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 638 stream1.Push(msgPubReq(msg1), msgPubResp(41), nil) 639 stream1.Push(msgPubReq(msg4), msgPubResp(42), nil) 640 verifiers.AddPublishStream(topic, 1, stream1) 641 642 // Partition 2 643 stream2 := test.NewRPCVerifier(t) 644 stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) 645 stream2.Push(msgPubReq(msg2), msgPubResp(78), nil) 646 verifiers.AddPublishStream(topic, 2, stream2) 647 648 mockServer.OnTestStart(verifiers) 649 defer mockServer.OnTestEnd() 650 651 // Note: The fake source is initialized with value=1, so Partition=1 publisher 652 // will be the first chosen by the roundRobinMsgRouter. 653 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1) 654 if err := pub.WaitStarted(); err != nil { 655 t.Errorf("Start() got err: (%v)", err) 656 } 657 658 result1 := pub.Publish(msg1) 659 result2 := pub.Publish(msg2) 660 result3 := pub.Publish(msg3) 661 result4 := pub.Publish(msg4) 662 663 result1.ValidateResult(1, 41) 664 result2.ValidateResult(2, 78) 665 result3.ValidateResult(0, 34) 666 result4.ValidateResult(1, 42) 667 668 pub.Stop() 669 if err := pub.WaitStopped(); err != nil { 670 t.Errorf("Stop() got err: (%v)", err) 671 } 672} 673 674func TestRoutingPublisherHashing(t *testing.T) { 675 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 676 numPartitions := 3 677 678 key0 := []byte("bar") // hashes to partition 0 679 key1 := []byte("baz") // hashes to partition 1 680 key2 := []byte("foo") // hashes to partition 2 681 682 // Messages have ordering key, so the hashingMsgRouter is used. 683 msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2} 684 msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0} 685 msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2} 686 msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1} 687 msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0} 688 689 verifiers := test.NewVerifiers(t) 690 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 691 692 // Partition 0 693 stream0 := test.NewRPCVerifier(t) 694 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 695 stream0.Push(msgPubReq(msg2), msgPubResp(20), nil) 696 stream0.Push(msgPubReq(msg5), msgPubResp(21), nil) 697 verifiers.AddPublishStream(topic, 0, stream0) 698 699 // Partition 1 700 stream1 := test.NewRPCVerifier(t) 701 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 702 stream1.Push(msgPubReq(msg4), msgPubResp(40), nil) 703 verifiers.AddPublishStream(topic, 1, stream1) 704 705 // Partition 2 706 stream2 := test.NewRPCVerifier(t) 707 stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) 708 stream2.Push(msgPubReq(msg1), msgPubResp(10), nil) 709 stream2.Push(msgPubReq(msg3), msgPubResp(11), nil) 710 verifiers.AddPublishStream(topic, 2, stream2) 711 712 mockServer.OnTestStart(verifiers) 713 defer mockServer.OnTestEnd() 714 715 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 716 if err := pub.WaitStarted(); err != nil { 717 t.Errorf("Start() got err: (%v)", err) 718 } 719 720 result1 := pub.Publish(msg1) 721 result2 := pub.Publish(msg2) 722 result3 := pub.Publish(msg3) 723 result4 := pub.Publish(msg4) 724 result5 := pub.Publish(msg5) 725 726 result1.ValidateResult(2, 10) 727 result2.ValidateResult(0, 20) 728 result3.ValidateResult(2, 11) 729 result4.ValidateResult(1, 40) 730 result5.ValidateResult(0, 21) 731 732 pub.Stop() 733 if err := pub.WaitStopped(); err != nil { 734 t.Errorf("Stop() got err: (%v)", err) 735 } 736} 737 738func TestRoutingPublisherPermanentError(t *testing.T) { 739 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 740 numPartitions := 2 741 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 742 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 743 serverErr := status.Error(codes.FailedPrecondition, "failed") 744 745 verifiers := test.NewVerifiers(t) 746 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 747 748 // Partition 0 749 stream0 := test.NewRPCVerifier(t) 750 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 751 stream0.Push(msgPubReq(msg1), msgPubResp(34), nil) 752 verifiers.AddPublishStream(topic, 0, stream0) 753 754 // Partition 1. Fails due to permanent error, which will also shut down 755 // partition-0 publisher, but it should be allowed to flush its pending 756 // messages. 757 stream1 := test.NewRPCVerifier(t) 758 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 759 stream1.Push(msgPubReq(msg2), nil, serverErr) 760 verifiers.AddPublishStream(topic, 1, stream1) 761 762 mockServer.OnTestStart(verifiers) 763 defer mockServer.OnTestEnd() 764 765 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 766 if err := pub.WaitStarted(); err != nil { 767 t.Errorf("Start() got err: (%v)", err) 768 } 769 770 result1 := pub.Publish(msg1) 771 result2 := pub.Publish(msg2) 772 773 result1.ValidateResult(0, 34) 774 result2.ValidateError(serverErr) 775 776 if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { 777 t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) 778 } 779} 780 781func TestRoutingPublisherPublishAfterStop(t *testing.T) { 782 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 783 numPartitions := 2 784 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 785 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 786 787 verifiers := test.NewVerifiers(t) 788 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) 789 790 // Partition 0 791 stream0 := test.NewRPCVerifier(t) 792 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 793 verifiers.AddPublishStream(topic, 0, stream0) 794 795 // Partition 1 796 stream1 := test.NewRPCVerifier(t) 797 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 798 verifiers.AddPublishStream(topic, 1, stream1) 799 800 mockServer.OnTestStart(verifiers) 801 defer mockServer.OnTestEnd() 802 803 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 804 if err := pub.WaitStarted(); err != nil { 805 t.Errorf("Start() got err: (%v)", err) 806 } 807 808 pub.Stop() 809 result1 := pub.Publish(msg1) 810 result2 := pub.Publish(msg2) 811 812 result1.ValidateError(ErrServiceStopped) 813 result2.ValidateError(ErrServiceStopped) 814 815 if err := pub.WaitStopped(); err != nil { 816 t.Errorf("Stop() got err: (%v)", err) 817 } 818} 819 820func TestRoutingPublisherPartitionCountFail(t *testing.T) { 821 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 822 wantErr := status.Error(codes.NotFound, "no exist") 823 824 // Retrieving the number of partitions results in an error. Startup cannot 825 // proceed. 826 verifiers := test.NewVerifiers(t) 827 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, wantErr) 828 829 mockServer.OnTestStart(verifiers) 830 defer mockServer.OnTestEnd() 831 832 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 833 834 if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantErr.Error()) { 835 t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr) 836 } 837 if got, want := pub.NumPartitionPublishers(), 0; got != want { 838 t.Errorf("Num partition publishers: got %d, want %d", got, want) 839 } 840 841 // Verify that the publisher does not attempt to restart. The mock server does 842 // not expect more RPCs. 843 pub.Start() 844} 845 846func TestRoutingPublisherPartitionCountInvalid(t *testing.T) { 847 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 848 849 // The number of partitions returned by the server must be valid, otherwise 850 // startup cannot proceed. 851 verifiers := test.NewVerifiers(t) 852 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil) 853 854 mockServer.OnTestStart(verifiers) 855 defer mockServer.OnTestEnd() 856 857 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 858 859 wantMsg := "topic has invalid number of partitions" 860 if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantMsg) { 861 t.Errorf("Start() got err: (%v), want msg: %q", gotErr, wantMsg) 862 } 863 if got, want := pub.NumPartitionPublishers(), 0; got != want { 864 t.Errorf("Num partition publishers: got %d, want %d", got, want) 865 } 866} 867 868func TestRoutingPublisherPartitionCountIncreases(t *testing.T) { 869 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 870 initialPartitionCount := 1 871 updatedPartitionCount := 3 872 msg1 := &pb.PubSubMessage{Data: []byte{'1'}} 873 msg2 := &pb.PubSubMessage{Data: []byte{'2'}} 874 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} 875 876 verifiers := test.NewVerifiers(t) 877 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) 878 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil) 879 880 stream0 := test.NewRPCVerifier(t) 881 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 882 stream0.Push(msgPubReq(msg1), msgPubResp(11), nil) 883 verifiers.AddPublishStream(topic, 0, stream0) 884 885 stream1 := test.NewRPCVerifier(t) 886 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 887 stream1.Push(msgPubReq(msg2), msgPubResp(22), nil) 888 verifiers.AddPublishStream(topic, 1, stream1) 889 890 stream2 := test.NewRPCVerifier(t) 891 stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) 892 stream2.Push(msgPubReq(msg3), msgPubResp(33), nil) 893 verifiers.AddPublishStream(topic, 2, stream2) 894 895 mockServer.OnTestStart(verifiers) 896 defer mockServer.OnTestEnd() 897 898 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 899 900 t.Run("Initial count", func(t *testing.T) { 901 if gotErr := pub.WaitStarted(); gotErr != nil { 902 t.Errorf("Start() got err: (%v)", gotErr) 903 } 904 if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { 905 t.Errorf("Num partition publishers: got %d, want %d", got, want) 906 } 907 }) 908 t.Run("Updated count", func(t *testing.T) { 909 pub.pub.partitionWatcher.updatePartitionCount() 910 if got, want := pub.NumPartitionPublishers(), updatedPartitionCount; got != want { 911 t.Errorf("Num partition publishers: got %d, want %d", got, want) 912 } 913 }) 914 t.Run("Publish", func(t *testing.T) { 915 result1 := pub.Publish(msg1) 916 result2 := pub.Publish(msg2) 917 result3 := pub.Publish(msg3) 918 919 result1.ValidateResult(0, 11) 920 result2.ValidateResult(1, 22) 921 result3.ValidateResult(2, 33) 922 }) 923 924 pub.Stop() 925 if gotErr := pub.WaitStopped(); gotErr != nil { 926 t.Errorf("Stop() got err: (%v)", gotErr) 927 } 928} 929 930func TestRoutingPublisherPartitionCountDecreases(t *testing.T) { 931 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 932 initialPartitionCount := 2 933 updatedPartitionCount := 1 934 935 verifiers := test.NewVerifiers(t) 936 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) 937 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil) 938 939 stream0 := test.NewRPCVerifier(t) 940 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 941 verifiers.AddPublishStream(topic, 0, stream0) 942 943 stream1 := test.NewRPCVerifier(t) 944 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 945 verifiers.AddPublishStream(topic, 1, stream1) 946 947 mockServer.OnTestStart(verifiers) 948 defer mockServer.OnTestEnd() 949 950 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 951 952 t.Run("Initial count", func(t *testing.T) { 953 if gotErr := pub.WaitStarted(); gotErr != nil { 954 t.Errorf("Start() got err: (%v)", gotErr) 955 } 956 if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { 957 t.Errorf("Num partition publishers: got %d, want %d", got, want) 958 } 959 }) 960 t.Run("Updated count", func(t *testing.T) { 961 pub.pub.partitionWatcher.updatePartitionCount() 962 963 // Decreasing count ignored. 964 if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { 965 t.Errorf("Num partition publishers: got %d, want %d", got, want) 966 } 967 }) 968 969 pub.Stop() 970 if gotErr := pub.WaitStopped(); gotErr != nil { 971 t.Errorf("Stop() got err: (%v)", gotErr) 972 } 973} 974 975func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) { 976 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 977 initialPartitionCount := 2 978 serverErr := status.Error(codes.NotFound, "deleted") 979 980 verifiers := test.NewVerifiers(t) 981 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) 982 verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr) 983 984 stream0 := test.NewRPCVerifier(t) 985 stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) 986 verifiers.AddPublishStream(topic, 0, stream0) 987 988 stream1 := test.NewRPCVerifier(t) 989 stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) 990 verifiers.AddPublishStream(topic, 1, stream1) 991 992 mockServer.OnTestStart(verifiers) 993 defer mockServer.OnTestEnd() 994 995 pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) 996 997 t.Run("Initial count", func(t *testing.T) { 998 if gotErr := pub.WaitStarted(); gotErr != nil { 999 t.Errorf("Start() got err: (%v)", gotErr) 1000 } 1001 if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { 1002 t.Errorf("Num partition publishers: got %d, want %d", got, want) 1003 } 1004 }) 1005 t.Run("Failed update", func(t *testing.T) { 1006 pub.pub.partitionWatcher.updatePartitionCount() 1007 1008 // Failed update ignored. 1009 if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want { 1010 t.Errorf("Num partition publishers: got %d, want %d", got, want) 1011 } 1012 }) 1013 1014 pub.Stop() 1015 if gotErr := pub.WaitStopped(); gotErr != nil { 1016 t.Errorf("Stop() got err: (%v)", gotErr) 1017 } 1018} 1019 1020func TestNewPublisherValidatesSettings(t *testing.T) { 1021 const topic = "projects/123456/locations/us-central1-b/topics/my-topic" 1022 const region = "us-central1" 1023 1024 settings := DefaultPublishSettings 1025 settings.DelayThreshold = 0 1026 if _, err := NewPublisher(context.Background(), settings, region, topic); err == nil { 1027 t.Error("NewPublisher() did not return error") 1028 } 1029} 1030