1// Copyright 2020 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13 14package wire 15 16import ( 17 "context" 18 "sort" 19 "testing" 20 "time" 21 22 "cloud.google.com/go/internal/testutil" 23 "cloud.google.com/go/pubsublite/internal/test" 24 "github.com/google/go-cmp/cmp/cmpopts" 25 "google.golang.org/grpc/codes" 26 "google.golang.org/grpc/status" 27 "google.golang.org/protobuf/proto" 28 29 pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" 30) 31 32func testSubscriberSettings() ReceiveSettings { 33 settings := testReceiveSettings() 34 settings.MaxOutstandingMessages = 10 35 settings.MaxOutstandingBytes = 1000 36 return settings 37} 38 39// initFlowControlReq returns the first expected flow control request when 40// testSubscriberSettings are used. 41func initFlowControlReq() *pb.SubscribeRequest { 42 return flowControlSubReq(flowControlTokens{Bytes: 1000, Messages: 10}) 43} 44 45func partitionMsgs(partition int, msgs ...*pb.SequencedMessage) []*ReceivedMessage { 46 var received []*ReceivedMessage 47 for _, msg := range msgs { 48 received = append(received, &ReceivedMessage{Msg: msg, Partition: partition}) 49 } 50 return received 51} 52 53func join(args ...[]*ReceivedMessage) []*ReceivedMessage { 54 var received []*ReceivedMessage 55 for _, msgs := range args { 56 received = append(received, msgs...) 57 } 58 return received 59} 60 61type testMessageReceiver struct { 62 t *testing.T 63 received chan *ReceivedMessage 64} 65 66func newTestMessageReceiver(t *testing.T) *testMessageReceiver { 67 return &testMessageReceiver{ 68 t: t, 69 received: make(chan *ReceivedMessage, 5), 70 } 71} 72 73func (tr *testMessageReceiver) onMessage(msg *ReceivedMessage) { 74 tr.received <- msg 75} 76 77func (tr *testMessageReceiver) ValidateMsg(want *pb.SequencedMessage) AckConsumer { 78 select { 79 case <-time.After(serviceTestWaitTimeout): 80 tr.t.Errorf("Message (%v) not received within %v", want, serviceTestWaitTimeout) 81 return nil 82 case got := <-tr.received: 83 if !proto.Equal(got.Msg, want) { 84 tr.t.Errorf("Received message: got (%v), want (%v)", got.Msg, want) 85 } 86 return got.Ack 87 } 88} 89 90type ByMsgOffset []*ReceivedMessage 91 92func (m ByMsgOffset) Len() int { return len(m) } 93func (m ByMsgOffset) Swap(i, j int) { m[i], m[j] = m[j], m[i] } 94func (m ByMsgOffset) Less(i, j int) bool { 95 return m[i].Msg.GetCursor().GetOffset() < m[j].Msg.GetCursor().GetOffset() 96} 97 98func (tr *testMessageReceiver) ValidateMsgs(want []*ReceivedMessage) { 99 var got []*ReceivedMessage 100 for count := 0; count < len(want); count++ { 101 select { 102 case <-time.After(serviceTestWaitTimeout): 103 tr.t.Errorf("Received messages count: got %d, want %d", count, len(want)) 104 case received := <-tr.received: 105 received.Ack.Ack() 106 got = append(got, received) 107 } 108 } 109 110 sort.Sort(ByMsgOffset(want)) 111 sort.Sort(ByMsgOffset(got)) 112 if !testutil.Equal(got, want, cmpopts.IgnoreFields(ReceivedMessage{}, "Ack")) { 113 tr.t.Errorf("Received messages: got: %v\nwant: %v", got, want) 114 } 115} 116 117func (tr *testMessageReceiver) VerifyNoMsgs() { 118 select { 119 case got := <-tr.received: 120 tr.t.Errorf("Got unexpected message: %v", got.Msg) 121 case <-time.After(20 * time.Millisecond): 122 // Wait to ensure no messages received. 123 } 124} 125 126// testBlockingMessageReceiver can be used to simulate a client message receiver 127// func that is blocking due to slow message processing. 128type testBlockingMessageReceiver struct { 129 blockReceive chan struct{} 130 131 testMessageReceiver 132} 133 134func newTestBlockingMessageReceiver(t *testing.T) *testBlockingMessageReceiver { 135 return &testBlockingMessageReceiver{ 136 testMessageReceiver: testMessageReceiver{ 137 t: t, 138 received: make(chan *ReceivedMessage, 5), 139 }, 140 blockReceive: make(chan struct{}), 141 } 142} 143 144// onMessage is the message receiver func and blocks until there is a call to 145// Return(). 146func (tr *testBlockingMessageReceiver) onMessage(msg *ReceivedMessage) { 147 tr.testMessageReceiver.onMessage(msg) 148 <-tr.blockReceive 149} 150 151// Return signals onMessage to return. 152func (tr *testBlockingMessageReceiver) Return() { 153 var void struct{} 154 tr.blockReceive <- void 155} 156 157func TestMessageDeliveryQueue(t *testing.T) { 158 acks := newAckTracker() 159 receiver := newTestMessageReceiver(t) 160 messageQueue := newMessageDeliveryQueue(acks, receiver.onMessage, 10) 161 162 t.Run("Add before start", func(t *testing.T) { 163 msg1 := seqMsgWithOffset(1) 164 ack1 := newAckConsumer(1, 0, nil) 165 messageQueue.Add(&ReceivedMessage{Msg: msg1, Ack: ack1}) 166 167 receiver.VerifyNoMsgs() 168 }) 169 170 t.Run("Add after start", func(t *testing.T) { 171 msg2 := seqMsgWithOffset(2) 172 ack2 := newAckConsumer(2, 0, nil) 173 msg3 := seqMsgWithOffset(3) 174 ack3 := newAckConsumer(3, 0, nil) 175 176 messageQueue.Start() 177 messageQueue.Start() // Check duplicate starts 178 messageQueue.Add(&ReceivedMessage{Msg: msg2, Ack: ack2}) 179 messageQueue.Add(&ReceivedMessage{Msg: msg3, Ack: ack3}) 180 181 receiver.ValidateMsg(msg2) 182 receiver.ValidateMsg(msg3) 183 }) 184 185 t.Run("Add after stop", func(t *testing.T) { 186 msg4 := seqMsgWithOffset(4) 187 ack4 := newAckConsumer(4, 0, nil) 188 189 messageQueue.Stop() 190 messageQueue.Stop() // Check duplicate stop 191 messageQueue.Add(&ReceivedMessage{Msg: msg4, Ack: ack4}) 192 193 receiver.VerifyNoMsgs() 194 }) 195} 196 197// testSubscribeStream wraps a subscribeStream for ease of testing. 198type testSubscribeStream struct { 199 Receiver *testMessageReceiver 200 t *testing.T 201 sub *subscribeStream 202 serviceTestProxy 203} 204 205func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream { 206 ctx := context.Background() 207 subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) 208 if err != nil { 209 t.Fatal(err) 210 } 211 212 ts := &testSubscribeStream{ 213 Receiver: newTestMessageReceiver(t), 214 t: t, 215 } 216 ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true) 217 ts.initAndStart(t, ts.sub, "Subscriber", subClient) 218 return ts 219} 220 221// SendBatchFlowControl invokes the periodic background batch flow control. Note 222// that the periodic task is disabled in tests. 223func (ts *testSubscribeStream) SendBatchFlowControl() { 224 ts.sub.sendBatchFlowControl() 225} 226 227func TestSubscribeStreamReconnect(t *testing.T) { 228 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 229 acks := newAckTracker() 230 msg1 := seqMsgWithOffsetAndSize(67, 200) 231 msg2 := seqMsgWithOffsetAndSize(68, 100) 232 permanentErr := status.Error(codes.FailedPrecondition, "permanent failure") 233 234 verifiers := test.NewVerifiers(t) 235 236 stream1 := test.NewRPCVerifier(t) 237 stream1.Push(initSubReq(subscription), initSubResp(), nil) 238 stream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) 239 stream1.Push(nil, nil, status.Error(codes.Unavailable, "server unavailable")) 240 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1) 241 242 // When reconnected, the subscribeStream should seek to msg2 and have 243 // subtracted flow control tokens. 244 stream2 := test.NewRPCVerifier(t) 245 stream2.Push(initSubReq(subscription), initSubResp(), nil) 246 stream2.Push(seekReq(68), seekResp(68), nil) 247 stream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 9}), msgSubResp(msg2), nil) 248 // Subscriber should terminate on permanent error. 249 stream2.Push(nil, nil, permanentErr) 250 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2) 251 252 mockServer.OnTestStart(verifiers) 253 defer mockServer.OnTestEnd() 254 255 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 256 if gotErr := sub.StartError(); gotErr != nil { 257 t.Errorf("Start() got err: (%v)", gotErr) 258 } 259 sub.Receiver.ValidateMsg(msg1) 260 sub.Receiver.ValidateMsg(msg2) 261 if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, permanentErr) { 262 t.Errorf("Final err: (%v), want: (%v)", gotErr, permanentErr) 263 } 264} 265 266func TestSubscribeStreamFlowControlBatching(t *testing.T) { 267 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 268 acks := newAckTracker() 269 msg1 := seqMsgWithOffsetAndSize(67, 200) 270 msg2 := seqMsgWithOffsetAndSize(68, 100) 271 serverErr := status.Error(codes.InvalidArgument, "verifies flow control received") 272 273 verifiers := test.NewVerifiers(t) 274 stream := test.NewRPCVerifier(t) 275 stream.Push(initSubReq(subscription), initSubResp(), nil) 276 stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) 277 // Batch flow control request expected. 278 stream.Push(flowControlSubReq(flowControlTokens{Bytes: 300, Messages: 2}), nil, serverErr) 279 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 280 281 mockServer.OnTestStart(verifiers) 282 defer mockServer.OnTestEnd() 283 284 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 285 if gotErr := sub.StartError(); gotErr != nil { 286 t.Errorf("Start() got err: (%v)", gotErr) 287 } 288 sub.Receiver.ValidateMsg(msg1) 289 sub.Receiver.ValidateMsg(msg2) 290 sub.sub.onAckAsync(msg1.SizeBytes) 291 sub.sub.onAckAsync(msg2.SizeBytes) 292 sub.sub.sendBatchFlowControl() 293 if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { 294 t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) 295 } 296} 297 298func TestSubscribeStreamExpediteFlowControl(t *testing.T) { 299 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 300 acks := newAckTracker() 301 msg1 := seqMsgWithOffsetAndSize(67, 250) 302 // MaxOutstandingBytes = 1000, so msg2 pushes the pending flow control bytes 303 // over the expediteBatchRequestRatio=50% threshold in flowControlBatcher. 304 msg2 := seqMsgWithOffsetAndSize(68, 251) 305 serverErr := status.Error(codes.InvalidArgument, "verifies flow control received") 306 307 verifiers := test.NewVerifiers(t) 308 stream := test.NewRPCVerifier(t) 309 stream.Push(initSubReq(subscription), initSubResp(), nil) 310 stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) 311 // Batch flow control request expected. 312 stream.Push(flowControlSubReq(flowControlTokens{Bytes: 501, Messages: 2}), nil, serverErr) 313 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 314 315 mockServer.OnTestStart(verifiers) 316 defer mockServer.OnTestEnd() 317 318 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 319 if gotErr := sub.StartError(); gotErr != nil { 320 t.Errorf("Start() got err: (%v)", gotErr) 321 } 322 sub.Receiver.ValidateMsg(msg1) 323 sub.Receiver.ValidateMsg(msg2) 324 sub.sub.onAckAsync(msg1.SizeBytes) 325 sub.sub.onAckAsync(msg2.SizeBytes) 326 // Note: the ack for msg2 automatically triggers sending the flow control. 327 if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) { 328 t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr) 329 } 330} 331 332func TestSubscribeStreamInvalidInitialResponse(t *testing.T) { 333 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 334 acks := newAckTracker() 335 336 verifiers := test.NewVerifiers(t) 337 stream := test.NewRPCVerifier(t) 338 stream.Push(initSubReq(subscription), seekResp(0), nil) // Seek instead of init response 339 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 340 341 mockServer.OnTestStart(verifiers) 342 defer mockServer.OnTestEnd() 343 344 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 345 if gotErr, wantErr := sub.StartError(), errInvalidInitialSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) { 346 t.Errorf("Start got err: (%v), want: (%v)", gotErr, wantErr) 347 } 348} 349 350func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) { 351 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 352 acks := newAckTracker() 353 354 verifiers := test.NewVerifiers(t) 355 stream := test.NewRPCVerifier(t) 356 stream.Push(initSubReq(subscription), initSubResp(), nil) 357 stream.Push(initFlowControlReq(), initSubResp(), nil) // Second initial response 358 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 359 360 mockServer.OnTestStart(verifiers) 361 defer mockServer.OnTestEnd() 362 363 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 364 if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) { 365 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 366 } 367} 368 369func TestSubscribeStreamSpuriousSeekResponse(t *testing.T) { 370 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 371 acks := newAckTracker() 372 373 verifiers := test.NewVerifiers(t) 374 stream := test.NewRPCVerifier(t) 375 stream.Push(initSubReq(subscription), initSubResp(), nil) 376 stream.Push(initFlowControlReq(), seekResp(1), nil) // Seek response with no seek request 377 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 378 379 mockServer.OnTestStart(verifiers) 380 defer mockServer.OnTestEnd() 381 382 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 383 if gotErr, wantErr := sub.FinalError(), errNoInFlightSeek; !test.ErrorEqual(gotErr, wantErr) { 384 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 385 } 386} 387 388func TestSubscribeStreamNoMessages(t *testing.T) { 389 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 390 acks := newAckTracker() 391 392 verifiers := test.NewVerifiers(t) 393 stream := test.NewRPCVerifier(t) 394 stream.Push(initSubReq(subscription), initSubResp(), nil) 395 stream.Push(initFlowControlReq(), msgSubResp(), nil) // No messages in response 396 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 397 398 mockServer.OnTestStart(verifiers) 399 defer mockServer.OnTestEnd() 400 401 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 402 if gotErr, wantErr := sub.FinalError(), errServerNoMessages; !test.ErrorEqual(gotErr, wantErr) { 403 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 404 } 405} 406 407func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) { 408 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 409 acks := newAckTracker() 410 msg1 := seqMsgWithOffsetAndSize(56, 100) 411 msg2 := seqMsgWithOffsetAndSize(55, 100) // Offset before msg1 412 413 verifiers := test.NewVerifiers(t) 414 stream := test.NewRPCVerifier(t) 415 stream.Push(initSubReq(subscription), initSubResp(), nil) 416 stream.Push(initFlowControlReq(), msgSubResp(msg1), nil) 417 stream.Push(nil, msgSubResp(msg2), nil) 418 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 419 420 mockServer.OnTestStart(verifiers) 421 defer mockServer.OnTestEnd() 422 423 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 424 sub.Receiver.ValidateMsg(msg1) 425 if gotErr, msg := sub.FinalError(), "start offset = 55, expected >= 57"; !test.ErrorHasMsg(gotErr, msg) { 426 t.Errorf("Final err: (%v), want msg: %q", gotErr, msg) 427 } 428} 429 430func TestSubscribeStreamFlowControlOverflow(t *testing.T) { 431 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 432 acks := newAckTracker() 433 msg1 := seqMsgWithOffsetAndSize(56, 900) 434 msg2 := seqMsgWithOffsetAndSize(57, 101) // Overflows ReceiveSettings.MaxOutstandingBytes = 1000 435 436 verifiers := test.NewVerifiers(t) 437 stream := test.NewRPCVerifier(t) 438 stream.Push(initSubReq(subscription), initSubResp(), nil) 439 stream.Push(initFlowControlReq(), msgSubResp(msg1), nil) 440 stream.Push(nil, msgSubResp(msg2), nil) 441 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream) 442 443 mockServer.OnTestStart(verifiers) 444 defer mockServer.OnTestEnd() 445 446 sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks) 447 sub.Receiver.ValidateMsg(msg1) 448 if gotErr, wantErr := sub.FinalError(), errTokenCounterBytesNegative; !test.ErrorEqual(gotErr, wantErr) { 449 t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr) 450 } 451} 452 453type testSinglePartitionSubscriber singlePartitionSubscriber 454 455func (t *testSinglePartitionSubscriber) WaitStopped() error { 456 err := t.compositeService.WaitStopped() 457 // Close connections. 458 t.committer.cursorClient.Close() 459 t.subscriber.subClient.Close() 460 return err 461} 462 463func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscription subscriptionPartition) *testSinglePartitionSubscriber { 464 ctx := context.Background() 465 subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) 466 if err != nil { 467 t.Fatal(err) 468 } 469 cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) 470 if err != nil { 471 t.Fatal(err) 472 } 473 474 f := &singlePartitionSubscriberFactory{ 475 ctx: ctx, 476 subClient: subClient, 477 cursorClient: cursorClient, 478 settings: testSubscriberSettings(), 479 subscriptionPath: subscription.Path, 480 receiver: receiverFunc, 481 disableTasks: true, // Background tasks disabled to control event order 482 } 483 sub := f.New(subscription.Partition) 484 sub.Start() 485 return (*testSinglePartitionSubscriber)(sub) 486} 487 488func TestSinglePartitionSubscriberStartStop(t *testing.T) { 489 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 490 receiver := newTestMessageReceiver(t) 491 492 verifiers := test.NewVerifiers(t) 493 494 // Verifies the behavior of the subscribeStream and committer when they are 495 // stopped before any messages are received. 496 subStream := test.NewRPCVerifier(t) 497 subStream.Push(initSubReq(subscription), initSubResp(), nil) 498 barrier := subStream.PushWithBarrier(initFlowControlReq(), nil, nil) 499 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) 500 501 cmtStream := test.NewRPCVerifier(t) 502 cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) 503 verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) 504 505 mockServer.OnTestStart(verifiers) 506 defer mockServer.OnTestEnd() 507 508 sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) 509 if gotErr := sub.WaitStarted(); gotErr != nil { 510 t.Errorf("Start() got err: (%v)", gotErr) 511 } 512 barrier.Release() // To ensure the test is deterministic (i.e. flow control req always received) 513 sub.Stop() 514 if gotErr := sub.WaitStopped(); gotErr != nil { 515 t.Errorf("Stop() got err: (%v)", gotErr) 516 } 517} 518 519func TestSinglePartitionSubscriberSimpleMsgAck(t *testing.T) { 520 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 521 receiver := newTestMessageReceiver(t) 522 msg1 := seqMsgWithOffsetAndSize(22, 100) 523 msg2 := seqMsgWithOffsetAndSize(23, 200) 524 525 verifiers := test.NewVerifiers(t) 526 527 subStream := test.NewRPCVerifier(t) 528 subStream.Push(initSubReq(subscription), initSubResp(), nil) 529 subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) 530 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) 531 532 cmtStream := test.NewRPCVerifier(t) 533 cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) 534 cmtStream.Push(commitReq(24), commitResp(1), nil) 535 verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) 536 537 mockServer.OnTestStart(verifiers) 538 defer mockServer.OnTestEnd() 539 540 sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) 541 if gotErr := sub.WaitStarted(); gotErr != nil { 542 t.Errorf("Start() got err: (%v)", gotErr) 543 } 544 receiver.ValidateMsg(msg1).Ack() 545 receiver.ValidateMsg(msg2).Ack() 546 sub.Stop() 547 if gotErr := sub.WaitStopped(); gotErr != nil { 548 t.Errorf("Stop() got err: (%v)", gotErr) 549 } 550} 551 552func TestSinglePartitionSubscriberMessageQueue(t *testing.T) { 553 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 554 receiver := newTestBlockingMessageReceiver(t) 555 msg1 := seqMsgWithOffsetAndSize(1, 100) 556 msg2 := seqMsgWithOffsetAndSize(2, 100) 557 msg3 := seqMsgWithOffsetAndSize(3, 100) 558 retryableErr := status.Error(codes.Unavailable, "should retry") 559 560 verifiers := test.NewVerifiers(t) 561 562 subStream1 := test.NewRPCVerifier(t) 563 subStream1.Push(initSubReq(subscription), initSubResp(), nil) 564 subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) 565 subStream1.Push(nil, msgSubResp(msg2), nil) 566 subStream1.Push(nil, nil, retryableErr) 567 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1) 568 569 // When reconnected, the subscribeStream should seek to msg3 and have 570 // subtracted flow control tokens for msg1 and msg2. 571 subStream2 := test.NewRPCVerifier(t) 572 subStream2.Push(initSubReq(subscription), initSubResp(), nil) 573 subStream2.Push(seekReq(3), nil, nil) 574 subStream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 8}), msgSubResp(msg3), nil) 575 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2) 576 577 cmtStream := test.NewRPCVerifier(t) 578 cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) 579 cmtStream.Push(commitReq(4), commitResp(1), nil) 580 verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) 581 582 mockServer.OnTestStart(verifiers) 583 defer mockServer.OnTestEnd() 584 585 sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) 586 if gotErr := sub.WaitStarted(); gotErr != nil { 587 t.Errorf("Start() got err: (%v)", gotErr) 588 } 589 590 // Verifies that messageDeliveryQueue delivers messages sequentially and waits 591 // for the client message receiver func to return before delivering the next 592 // message. 593 var acks []AckConsumer 594 for _, msg := range []*pb.SequencedMessage{msg1, msg2, msg3} { 595 ack := receiver.ValidateMsg(msg) 596 acks = append(acks, ack) 597 receiver.VerifyNoMsgs() 598 receiver.Return() 599 } 600 601 // Ack all messages so that the committer terminates. 602 for _, ack := range acks { 603 ack.Ack() 604 } 605 606 sub.Stop() 607 if gotErr := sub.WaitStopped(); gotErr != nil { 608 t.Errorf("Stop() got err: (%v)", gotErr) 609 } 610} 611 612func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) { 613 subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0} 614 receiver := newTestBlockingMessageReceiver(t) 615 msg1 := seqMsgWithOffsetAndSize(1, 100) 616 msg2 := seqMsgWithOffsetAndSize(2, 100) 617 618 verifiers := test.NewVerifiers(t) 619 620 subStream := test.NewRPCVerifier(t) 621 subStream.Push(initSubReq(subscription), initSubResp(), nil) 622 subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) 623 verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream) 624 625 cmtStream := test.NewRPCVerifier(t) 626 cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil) 627 cmtStream.Push(commitReq(2), commitResp(1), nil) 628 verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream) 629 630 mockServer.OnTestStart(verifiers) 631 defer mockServer.OnTestEnd() 632 633 sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription) 634 if gotErr := sub.WaitStarted(); gotErr != nil { 635 t.Errorf("Start() got err: (%v)", gotErr) 636 } 637 638 receiver.ValidateMsg(msg1).Ack() 639 640 // Stop the subscriber before returning from the message receiver func. 641 sub.Stop() 642 receiver.Return() 643 644 if gotErr := sub.WaitStopped(); gotErr != nil { 645 t.Errorf("Stop() got err: (%v)", gotErr) 646 } 647 receiver.VerifyNoMsgs() // msg2 should not be received 648} 649 650func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber { 651 ctx := context.Background() 652 subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) 653 if err != nil { 654 t.Fatal(err) 655 } 656 cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) 657 if err != nil { 658 t.Fatal(err) 659 } 660 allClients := apiClients{subClient, cursorClient} 661 662 f := &singlePartitionSubscriberFactory{ 663 ctx: ctx, 664 subClient: subClient, 665 cursorClient: cursorClient, 666 settings: testSubscriberSettings(), 667 subscriptionPath: subscriptionPath, 668 receiver: receiverFunc, 669 disableTasks: true, // Background tasks disabled to control event order 670 } 671 f.settings.Partitions = partitions 672 sub := newMultiPartitionSubscriber(allClients, f) 673 sub.Start() 674 return sub 675} 676 677func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) { 678 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 679 receiver := newTestMessageReceiver(t) 680 msg1 := seqMsgWithOffsetAndSize(22, 100) 681 msg2 := seqMsgWithOffsetAndSize(23, 200) 682 msg3 := seqMsgWithOffsetAndSize(44, 100) 683 msg4 := seqMsgWithOffsetAndSize(45, 200) 684 685 verifiers := test.NewVerifiers(t) 686 687 // Partition 1 688 subStream1 := test.NewRPCVerifier(t) 689 subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) 690 subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) 691 subStream1.Push(nil, msgSubResp(msg2), nil) 692 verifiers.AddSubscribeStream(subscription, 1, subStream1) 693 694 cmtStream1 := test.NewRPCVerifier(t) 695 cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) 696 cmtStream1.Push(commitReq(24), commitResp(1), nil) 697 verifiers.AddCommitStream(subscription, 1, cmtStream1) 698 699 // Partition 2 700 subStream2 := test.NewRPCVerifier(t) 701 subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) 702 subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil) 703 subStream2.Push(nil, msgSubResp(msg4), nil) 704 verifiers.AddSubscribeStream(subscription, 2, subStream2) 705 706 cmtStream2 := test.NewRPCVerifier(t) 707 cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) 708 cmtStream2.Push(commitReq(46), commitResp(1), nil) 709 verifiers.AddCommitStream(subscription, 2, cmtStream2) 710 711 mockServer.OnTestStart(verifiers) 712 defer mockServer.OnTestEnd() 713 714 sub := newTestMultiPartitionSubscriber(t, receiver.onMessage, subscription, []int{1, 2}) 715 if gotErr := sub.WaitStarted(); gotErr != nil { 716 t.Errorf("Start() got err: (%v)", gotErr) 717 } 718 receiver.ValidateMsgs(join(partitionMsgs(1, msg1, msg2), partitionMsgs(2, msg3, msg4))) 719 sub.Stop() 720 if gotErr := sub.WaitStopped(); gotErr != nil { 721 t.Errorf("Stop() got err: (%v)", gotErr) 722 } 723} 724 725func TestMultiPartitionSubscriberPermanentError(t *testing.T) { 726 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 727 receiver := newTestMessageReceiver(t) 728 msg1 := seqMsgWithOffsetAndSize(22, 100) 729 msg2 := seqMsgWithOffsetAndSize(23, 200) 730 msg3 := seqMsgWithOffsetAndSize(44, 100) 731 serverErr := status.Error(codes.FailedPrecondition, "failed") 732 733 verifiers := test.NewVerifiers(t) 734 735 // Partition 1 736 subStream1 := test.NewRPCVerifier(t) 737 subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) 738 subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) 739 msg2Barrier := subStream1.PushWithBarrier(nil, msgSubResp(msg2), nil) 740 verifiers.AddSubscribeStream(subscription, 1, subStream1) 741 742 cmtStream1 := test.NewRPCVerifier(t) 743 cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) 744 cmtStream1.Push(commitReq(23), commitResp(1), nil) 745 verifiers.AddCommitStream(subscription, 1, cmtStream1) 746 747 // Partition 2 748 subStream2 := test.NewRPCVerifier(t) 749 subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) 750 subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil) 751 errorBarrier := subStream2.PushWithBarrier(nil, nil, serverErr) 752 verifiers.AddSubscribeStream(subscription, 2, subStream2) 753 754 cmtStream2 := test.NewRPCVerifier(t) 755 cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) 756 cmtStream2.Push(commitReq(45), commitResp(1), nil) 757 verifiers.AddCommitStream(subscription, 2, cmtStream2) 758 759 mockServer.OnTestStart(verifiers) 760 defer mockServer.OnTestEnd() 761 762 sub := newTestMultiPartitionSubscriber(t, receiver.onMessage, subscription, []int{1, 2}) 763 if gotErr := sub.WaitStarted(); gotErr != nil { 764 t.Errorf("Start() got err: (%v)", gotErr) 765 } 766 receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg3))) 767 errorBarrier.Release() // Release server error now to ensure test is deterministic 768 if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { 769 t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) 770 } 771 772 // Verify msg2 never received as subscriber has terminated. 773 msg2Barrier.Release() 774 receiver.VerifyNoMsgs() 775} 776 777func (as *assigningSubscriber) Partitions() []int { 778 as.mu.Lock() 779 defer as.mu.Unlock() 780 781 var partitions []int 782 for p := range as.subscribers { 783 partitions = append(partitions, p) 784 } 785 sort.Ints(partitions) 786 return partitions 787} 788 789func (as *assigningSubscriber) FlushCommits() { 790 as.mu.Lock() 791 defer as.mu.Unlock() 792 793 for _, sub := range as.subscribers { 794 sub.committer.commitOffsetToStream() 795 } 796} 797 798func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string) *assigningSubscriber { 799 ctx := context.Background() 800 subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn()) 801 if err != nil { 802 t.Fatal(err) 803 } 804 cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn()) 805 if err != nil { 806 t.Fatal(err) 807 } 808 assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn()) 809 if err != nil { 810 t.Fatal(err) 811 } 812 allClients := apiClients{subClient, cursorClient, assignmentClient} 813 814 f := &singlePartitionSubscriberFactory{ 815 ctx: ctx, 816 subClient: subClient, 817 cursorClient: cursorClient, 818 settings: testSubscriberSettings(), 819 subscriptionPath: subscriptionPath, 820 receiver: receiverFunc, 821 disableTasks: true, // Background tasks disabled to control event order 822 } 823 sub, err := newAssigningSubscriber(allClients, assignmentClient, fakeGenerateUUID, f) 824 if err != nil { 825 t.Fatal(err) 826 } 827 sub.Start() 828 return sub 829} 830 831func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { 832 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 833 receiver := newTestMessageReceiver(t) 834 msg1 := seqMsgWithOffsetAndSize(33, 100) 835 msg2 := seqMsgWithOffsetAndSize(34, 200) 836 msg3 := seqMsgWithOffsetAndSize(66, 100) 837 msg4 := seqMsgWithOffsetAndSize(67, 100) 838 msg5 := seqMsgWithOffsetAndSize(88, 100) 839 840 verifiers := test.NewVerifiers(t) 841 842 // Assignment stream 843 asnStream := test.NewRPCVerifier(t) 844 asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 6}), nil) 845 assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil) 846 asnStream.Push(assignmentAckReq(), nil, nil) 847 verifiers.AddAssignmentStream(subscription, asnStream) 848 849 // Partition 3 850 subStream3 := test.NewRPCVerifier(t) 851 subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil) 852 subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil) 853 msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil) 854 verifiers.AddSubscribeStream(subscription, 3, subStream3) 855 856 cmtStream3 := test.NewRPCVerifier(t) 857 cmtStream3.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 3}), initCommitResp(), nil) 858 cmtStream3.Push(commitReq(34), commitResp(1), nil) 859 cmtStream3.Push(commitReq(35), commitResp(1), nil) 860 verifiers.AddCommitStream(subscription, 3, cmtStream3) 861 862 // Partition 6 863 subStream6 := test.NewRPCVerifier(t) 864 subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil) 865 subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil) 866 // msg4 should not be received. 867 msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil) 868 verifiers.AddSubscribeStream(subscription, 6, subStream6) 869 870 cmtStream6 := test.NewRPCVerifier(t) 871 cmtStream6.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 6}), initCommitResp(), nil) 872 cmtStream6.Push(commitReq(67), commitResp(1), nil) 873 verifiers.AddCommitStream(subscription, 6, cmtStream6) 874 875 // Partition 8 876 subStream8 := test.NewRPCVerifier(t) 877 subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil) 878 subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil) 879 verifiers.AddSubscribeStream(subscription, 8, subStream8) 880 881 cmtStream8 := test.NewRPCVerifier(t) 882 cmtStream8.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 8}), initCommitResp(), nil) 883 cmtStream8.Push(commitReq(89), commitResp(1), nil) 884 verifiers.AddCommitStream(subscription, 8, cmtStream8) 885 886 mockServer.OnTestStart(verifiers) 887 defer mockServer.OnTestEnd() 888 889 sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) 890 if gotErr := sub.WaitStarted(); gotErr != nil { 891 t.Errorf("Start() got err: (%v)", gotErr) 892 } 893 894 // Partition assignments are initially {3, 6}. 895 receiver.ValidateMsgs(join(partitionMsgs(3, msg1), partitionMsgs(6, msg3))) 896 if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) { 897 t.Errorf("subscriber partitions: got %d, want %d", got, want) 898 } 899 900 // Partition assignments will now be {3, 8}. 901 assignmentBarrier.Release() 902 receiver.ValidateMsgs(partitionMsgs(8, msg5)) 903 if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) { 904 t.Errorf("subscriber partitions: got %d, want %d", got, want) 905 } 906 907 // msg2 is from partition 3 and should be received. msg4 is from partition 6 908 // (removed) and should be discarded. 909 sub.FlushCommits() 910 msg2Barrier.Release() 911 msg4Barrier.Release() 912 receiver.ValidateMsgs(partitionMsgs(3, msg2)) 913 914 // Stop should flush all commit cursors. 915 sub.Stop() 916 if gotErr := sub.WaitStopped(); gotErr != nil { 917 t.Errorf("Stop() got err: (%v)", gotErr) 918 } 919} 920 921func TestAssigningSubscriberPermanentError(t *testing.T) { 922 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 923 receiver := newTestMessageReceiver(t) 924 msg1 := seqMsgWithOffsetAndSize(11, 100) 925 msg2 := seqMsgWithOffsetAndSize(22, 200) 926 serverErr := status.Error(codes.FailedPrecondition, "failed") 927 928 verifiers := test.NewVerifiers(t) 929 930 // Assignment stream 931 asnStream := test.NewRPCVerifier(t) 932 asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil) 933 errBarrier := asnStream.PushWithBarrier(assignmentAckReq(), nil, serverErr) 934 verifiers.AddAssignmentStream(subscription, asnStream) 935 936 // Partition 1 937 subStream1 := test.NewRPCVerifier(t) 938 subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) 939 subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil) 940 verifiers.AddSubscribeStream(subscription, 1, subStream1) 941 942 cmtStream1 := test.NewRPCVerifier(t) 943 cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) 944 cmtStream1.Push(commitReq(12), commitResp(1), nil) 945 verifiers.AddCommitStream(subscription, 1, cmtStream1) 946 947 // Partition 2 948 subStream2 := test.NewRPCVerifier(t) 949 subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil) 950 subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil) 951 verifiers.AddSubscribeStream(subscription, 2, subStream2) 952 953 cmtStream2 := test.NewRPCVerifier(t) 954 cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil) 955 cmtStream2.Push(commitReq(23), commitResp(1), nil) 956 verifiers.AddCommitStream(subscription, 2, cmtStream2) 957 958 mockServer.OnTestStart(verifiers) 959 defer mockServer.OnTestEnd() 960 961 sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) 962 if gotErr := sub.WaitStarted(); gotErr != nil { 963 t.Errorf("Start() got err: (%v)", gotErr) 964 } 965 receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg2))) 966 967 // Permanent assignment stream error should terminate subscriber. Commits are 968 // still flushed. 969 errBarrier.Release() 970 if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { 971 t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) 972 } 973} 974 975func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) { 976 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 977 receiver := newTestMessageReceiver(t) 978 msg1 := seqMsgWithOffsetAndSize(11, 100) 979 msg2 := seqMsgWithOffsetAndSize(22, 200) 980 981 verifiers := test.NewVerifiers(t) 982 983 // Assignment stream 984 asnStream := test.NewRPCVerifier(t) 985 asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil) 986 assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{}), nil) 987 assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil) 988 verifiers.AddAssignmentStream(subscription, asnStream) 989 990 // Partition 1 991 subStream := test.NewRPCVerifier(t) 992 subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil) 993 subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil) 994 verifiers.AddSubscribeStream(subscription, 1, subStream) 995 996 cmtStream := test.NewRPCVerifier(t) 997 cmtStream.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil) 998 cmtStream.Push(commitReq(12), commitResp(1), nil) 999 verifiers.AddCommitStream(subscription, 1, cmtStream) 1000 1001 mockServer.OnTestStart(verifiers) 1002 defer mockServer.OnTestEnd() 1003 1004 sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription) 1005 if gotErr := sub.WaitStarted(); gotErr != nil { 1006 t.Errorf("Start() got err: (%v)", gotErr) 1007 } 1008 1009 // Partition assignments are initially {1}. 1010 receiver.ValidateMsg(msg1).Ack() 1011 ack2 := receiver.ValidateMsg(msg2) 1012 1013 // Partition assignments will now be {}. 1014 assignmentBarrier1.Release() 1015 assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic 1016 1017 // Partition 1 has already been unassigned, so this ack is discarded. 1018 ack2.Ack() 1019 1020 sub.Stop() 1021 if gotErr := sub.WaitStopped(); gotErr != nil { 1022 t.Errorf("Stop() got err: (%v)", gotErr) 1023 } 1024} 1025 1026func TestNewSubscriberCreatesCorrectImpl(t *testing.T) { 1027 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 1028 const region = "us-central1" 1029 receiver := newTestMessageReceiver(t) 1030 1031 sub, err := NewSubscriber(context.Background(), DefaultReceiveSettings, receiver.onMessage, region, subscription) 1032 if err != nil { 1033 t.Errorf("NewSubscriber() got error: %v", err) 1034 } else if _, ok := sub.(*assigningSubscriber); !ok { 1035 t.Error("NewSubscriber() did not return a assigningSubscriber") 1036 } 1037 1038 settings := DefaultReceiveSettings 1039 settings.Partitions = []int{1, 2, 3} 1040 sub, err = NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription) 1041 if err != nil { 1042 t.Errorf("NewSubscriber() got error: %v", err) 1043 } else if _, ok := sub.(*multiPartitionSubscriber); !ok { 1044 t.Error("NewSubscriber() did not return a multiPartitionSubscriber") 1045 } 1046} 1047 1048func TestNewSubscriberValidatesSettings(t *testing.T) { 1049 const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub" 1050 const region = "us-central1" 1051 receiver := newTestMessageReceiver(t) 1052 1053 settings := DefaultReceiveSettings 1054 settings.MaxOutstandingMessages = 0 1055 if _, err := NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription); err == nil { 1056 t.Error("NewSubscriber() did not return error") 1057 } 1058} 1059