1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13
14package wire
15
16import (
17	"context"
18	"sort"
19	"testing"
20	"time"
21
22	"cloud.google.com/go/internal/testutil"
23	"cloud.google.com/go/pubsublite/internal/test"
24	"github.com/google/go-cmp/cmp/cmpopts"
25	"google.golang.org/grpc/codes"
26	"google.golang.org/grpc/status"
27	"google.golang.org/protobuf/proto"
28
29	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
30)
31
32func testSubscriberSettings() ReceiveSettings {
33	settings := testReceiveSettings()
34	settings.MaxOutstandingMessages = 10
35	settings.MaxOutstandingBytes = 1000
36	return settings
37}
38
39// initFlowControlReq returns the first expected flow control request when
40// testSubscriberSettings are used.
41func initFlowControlReq() *pb.SubscribeRequest {
42	return flowControlSubReq(flowControlTokens{Bytes: 1000, Messages: 10})
43}
44
45func partitionMsgs(partition int, msgs ...*pb.SequencedMessage) []*ReceivedMessage {
46	var received []*ReceivedMessage
47	for _, msg := range msgs {
48		received = append(received, &ReceivedMessage{Msg: msg, Partition: partition})
49	}
50	return received
51}
52
53func join(args ...[]*ReceivedMessage) []*ReceivedMessage {
54	var received []*ReceivedMessage
55	for _, msgs := range args {
56		received = append(received, msgs...)
57	}
58	return received
59}
60
61type testMessageReceiver struct {
62	t        *testing.T
63	received chan *ReceivedMessage
64}
65
66func newTestMessageReceiver(t *testing.T) *testMessageReceiver {
67	return &testMessageReceiver{
68		t:        t,
69		received: make(chan *ReceivedMessage, 5),
70	}
71}
72
73func (tr *testMessageReceiver) onMessage(msg *ReceivedMessage) {
74	tr.received <- msg
75}
76
77func (tr *testMessageReceiver) ValidateMsg(want *pb.SequencedMessage) AckConsumer {
78	select {
79	case <-time.After(serviceTestWaitTimeout):
80		tr.t.Errorf("Message (%v) not received within %v", want, serviceTestWaitTimeout)
81		return nil
82	case got := <-tr.received:
83		if !proto.Equal(got.Msg, want) {
84			tr.t.Errorf("Received message: got (%v), want (%v)", got.Msg, want)
85		}
86		return got.Ack
87	}
88}
89
90type ByMsgOffset []*ReceivedMessage
91
92func (m ByMsgOffset) Len() int      { return len(m) }
93func (m ByMsgOffset) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
94func (m ByMsgOffset) Less(i, j int) bool {
95	return m[i].Msg.GetCursor().GetOffset() < m[j].Msg.GetCursor().GetOffset()
96}
97
98func (tr *testMessageReceiver) ValidateMsgs(want []*ReceivedMessage) {
99	var got []*ReceivedMessage
100	for count := 0; count < len(want); count++ {
101		select {
102		case <-time.After(serviceTestWaitTimeout):
103			tr.t.Errorf("Received messages count: got %d, want %d", count, len(want))
104		case received := <-tr.received:
105			received.Ack.Ack()
106			got = append(got, received)
107		}
108	}
109
110	sort.Sort(ByMsgOffset(want))
111	sort.Sort(ByMsgOffset(got))
112	if !testutil.Equal(got, want, cmpopts.IgnoreFields(ReceivedMessage{}, "Ack")) {
113		tr.t.Errorf("Received messages: got: %v\nwant: %v", got, want)
114	}
115}
116
117func (tr *testMessageReceiver) VerifyNoMsgs() {
118	select {
119	case got := <-tr.received:
120		tr.t.Errorf("Got unexpected message: %v", got.Msg)
121	case <-time.After(20 * time.Millisecond):
122		// Wait to ensure no messages received.
123	}
124}
125
126// testBlockingMessageReceiver can be used to simulate a client message receiver
127// func that is blocking due to slow message processing.
128type testBlockingMessageReceiver struct {
129	blockReceive chan struct{}
130
131	testMessageReceiver
132}
133
134func newTestBlockingMessageReceiver(t *testing.T) *testBlockingMessageReceiver {
135	return &testBlockingMessageReceiver{
136		testMessageReceiver: testMessageReceiver{
137			t:        t,
138			received: make(chan *ReceivedMessage, 5),
139		},
140		blockReceive: make(chan struct{}),
141	}
142}
143
144// onMessage is the message receiver func and blocks until there is a call to
145// Return().
146func (tr *testBlockingMessageReceiver) onMessage(msg *ReceivedMessage) {
147	tr.testMessageReceiver.onMessage(msg)
148	<-tr.blockReceive
149}
150
151// Return signals onMessage to return.
152func (tr *testBlockingMessageReceiver) Return() {
153	var void struct{}
154	tr.blockReceive <- void
155}
156
157func TestMessageDeliveryQueue(t *testing.T) {
158	acks := newAckTracker()
159	receiver := newTestMessageReceiver(t)
160	messageQueue := newMessageDeliveryQueue(acks, receiver.onMessage, 10)
161
162	t.Run("Add before start", func(t *testing.T) {
163		msg1 := seqMsgWithOffset(1)
164		ack1 := newAckConsumer(1, 0, nil)
165		messageQueue.Add(&ReceivedMessage{Msg: msg1, Ack: ack1})
166
167		receiver.VerifyNoMsgs()
168	})
169
170	t.Run("Add after start", func(t *testing.T) {
171		msg2 := seqMsgWithOffset(2)
172		ack2 := newAckConsumer(2, 0, nil)
173		msg3 := seqMsgWithOffset(3)
174		ack3 := newAckConsumer(3, 0, nil)
175
176		messageQueue.Start()
177		messageQueue.Start() // Check duplicate starts
178		messageQueue.Add(&ReceivedMessage{Msg: msg2, Ack: ack2})
179		messageQueue.Add(&ReceivedMessage{Msg: msg3, Ack: ack3})
180
181		receiver.ValidateMsg(msg2)
182		receiver.ValidateMsg(msg3)
183	})
184
185	t.Run("Add after stop", func(t *testing.T) {
186		msg4 := seqMsgWithOffset(4)
187		ack4 := newAckConsumer(4, 0, nil)
188
189		messageQueue.Stop()
190		messageQueue.Stop() // Check duplicate stop
191		messageQueue.Add(&ReceivedMessage{Msg: msg4, Ack: ack4})
192
193		receiver.VerifyNoMsgs()
194	})
195}
196
197// testSubscribeStream wraps a subscribeStream for ease of testing.
198type testSubscribeStream struct {
199	Receiver *testMessageReceiver
200	t        *testing.T
201	sub      *subscribeStream
202	serviceTestProxy
203}
204
205func newTestSubscribeStream(t *testing.T, subscription subscriptionPartition, settings ReceiveSettings, acks *ackTracker) *testSubscribeStream {
206	ctx := context.Background()
207	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
208	if err != nil {
209		t.Fatal(err)
210	}
211
212	ts := &testSubscribeStream{
213		Receiver: newTestMessageReceiver(t),
214		t:        t,
215	}
216	ts.sub = newSubscribeStream(ctx, subClient, settings, ts.Receiver.onMessage, subscription, acks, true)
217	ts.initAndStart(t, ts.sub, "Subscriber", subClient)
218	return ts
219}
220
221// SendBatchFlowControl invokes the periodic background batch flow control. Note
222// that the periodic task is disabled in tests.
223func (ts *testSubscribeStream) SendBatchFlowControl() {
224	ts.sub.sendBatchFlowControl()
225}
226
227func TestSubscribeStreamReconnect(t *testing.T) {
228	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
229	acks := newAckTracker()
230	msg1 := seqMsgWithOffsetAndSize(67, 200)
231	msg2 := seqMsgWithOffsetAndSize(68, 100)
232	permanentErr := status.Error(codes.FailedPrecondition, "permanent failure")
233
234	verifiers := test.NewVerifiers(t)
235
236	stream1 := test.NewRPCVerifier(t)
237	stream1.Push(initSubReq(subscription), initSubResp(), nil)
238	stream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
239	stream1.Push(nil, nil, status.Error(codes.Unavailable, "server unavailable"))
240	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream1)
241
242	// When reconnected, the subscribeStream should seek to msg2 and have
243	// subtracted flow control tokens.
244	stream2 := test.NewRPCVerifier(t)
245	stream2.Push(initSubReq(subscription), initSubResp(), nil)
246	stream2.Push(seekReq(68), seekResp(68), nil)
247	stream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 9}), msgSubResp(msg2), nil)
248	// Subscriber should terminate on permanent error.
249	stream2.Push(nil, nil, permanentErr)
250	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream2)
251
252	mockServer.OnTestStart(verifiers)
253	defer mockServer.OnTestEnd()
254
255	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
256	if gotErr := sub.StartError(); gotErr != nil {
257		t.Errorf("Start() got err: (%v)", gotErr)
258	}
259	sub.Receiver.ValidateMsg(msg1)
260	sub.Receiver.ValidateMsg(msg2)
261	if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, permanentErr) {
262		t.Errorf("Final err: (%v), want: (%v)", gotErr, permanentErr)
263	}
264}
265
266func TestSubscribeStreamFlowControlBatching(t *testing.T) {
267	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
268	acks := newAckTracker()
269	msg1 := seqMsgWithOffsetAndSize(67, 200)
270	msg2 := seqMsgWithOffsetAndSize(68, 100)
271	serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")
272
273	verifiers := test.NewVerifiers(t)
274	stream := test.NewRPCVerifier(t)
275	stream.Push(initSubReq(subscription), initSubResp(), nil)
276	stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
277	// Batch flow control request expected.
278	stream.Push(flowControlSubReq(flowControlTokens{Bytes: 300, Messages: 2}), nil, serverErr)
279	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
280
281	mockServer.OnTestStart(verifiers)
282	defer mockServer.OnTestEnd()
283
284	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
285	if gotErr := sub.StartError(); gotErr != nil {
286		t.Errorf("Start() got err: (%v)", gotErr)
287	}
288	sub.Receiver.ValidateMsg(msg1)
289	sub.Receiver.ValidateMsg(msg2)
290	sub.sub.onAckAsync(msg1.SizeBytes)
291	sub.sub.onAckAsync(msg2.SizeBytes)
292	sub.sub.sendBatchFlowControl()
293	if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
294		t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
295	}
296}
297
298func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
299	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
300	acks := newAckTracker()
301	msg1 := seqMsgWithOffsetAndSize(67, 250)
302	// MaxOutstandingBytes = 1000, so msg2 pushes the pending flow control bytes
303	// over the expediteBatchRequestRatio=50% threshold in flowControlBatcher.
304	msg2 := seqMsgWithOffsetAndSize(68, 251)
305	serverErr := status.Error(codes.InvalidArgument, "verifies flow control received")
306
307	verifiers := test.NewVerifiers(t)
308	stream := test.NewRPCVerifier(t)
309	stream.Push(initSubReq(subscription), initSubResp(), nil)
310	stream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
311	// Batch flow control request expected.
312	stream.Push(flowControlSubReq(flowControlTokens{Bytes: 501, Messages: 2}), nil, serverErr)
313	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
314
315	mockServer.OnTestStart(verifiers)
316	defer mockServer.OnTestEnd()
317
318	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
319	if gotErr := sub.StartError(); gotErr != nil {
320		t.Errorf("Start() got err: (%v)", gotErr)
321	}
322	sub.Receiver.ValidateMsg(msg1)
323	sub.Receiver.ValidateMsg(msg2)
324	sub.sub.onAckAsync(msg1.SizeBytes)
325	sub.sub.onAckAsync(msg2.SizeBytes)
326	// Note: the ack for msg2 automatically triggers sending the flow control.
327	if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
328		t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
329	}
330}
331
332func TestSubscribeStreamInvalidInitialResponse(t *testing.T) {
333	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
334	acks := newAckTracker()
335
336	verifiers := test.NewVerifiers(t)
337	stream := test.NewRPCVerifier(t)
338	stream.Push(initSubReq(subscription), seekResp(0), nil) // Seek instead of init response
339	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
340
341	mockServer.OnTestStart(verifiers)
342	defer mockServer.OnTestEnd()
343
344	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
345	if gotErr, wantErr := sub.StartError(), errInvalidInitialSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
346		t.Errorf("Start got err: (%v), want: (%v)", gotErr, wantErr)
347	}
348}
349
350func TestSubscribeStreamDuplicateInitialResponse(t *testing.T) {
351	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
352	acks := newAckTracker()
353
354	verifiers := test.NewVerifiers(t)
355	stream := test.NewRPCVerifier(t)
356	stream.Push(initSubReq(subscription), initSubResp(), nil)
357	stream.Push(initFlowControlReq(), initSubResp(), nil) // Second initial response
358	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
359
360	mockServer.OnTestStart(verifiers)
361	defer mockServer.OnTestEnd()
362
363	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
364	if gotErr, wantErr := sub.FinalError(), errInvalidSubscribeResponse; !test.ErrorEqual(gotErr, wantErr) {
365		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
366	}
367}
368
369func TestSubscribeStreamSpuriousSeekResponse(t *testing.T) {
370	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
371	acks := newAckTracker()
372
373	verifiers := test.NewVerifiers(t)
374	stream := test.NewRPCVerifier(t)
375	stream.Push(initSubReq(subscription), initSubResp(), nil)
376	stream.Push(initFlowControlReq(), seekResp(1), nil) // Seek response with no seek request
377	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
378
379	mockServer.OnTestStart(verifiers)
380	defer mockServer.OnTestEnd()
381
382	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
383	if gotErr, wantErr := sub.FinalError(), errNoInFlightSeek; !test.ErrorEqual(gotErr, wantErr) {
384		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
385	}
386}
387
388func TestSubscribeStreamNoMessages(t *testing.T) {
389	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
390	acks := newAckTracker()
391
392	verifiers := test.NewVerifiers(t)
393	stream := test.NewRPCVerifier(t)
394	stream.Push(initSubReq(subscription), initSubResp(), nil)
395	stream.Push(initFlowControlReq(), msgSubResp(), nil) // No messages in response
396	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
397
398	mockServer.OnTestStart(verifiers)
399	defer mockServer.OnTestEnd()
400
401	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
402	if gotErr, wantErr := sub.FinalError(), errServerNoMessages; !test.ErrorEqual(gotErr, wantErr) {
403		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
404	}
405}
406
407func TestSubscribeStreamMessagesOutOfOrder(t *testing.T) {
408	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
409	acks := newAckTracker()
410	msg1 := seqMsgWithOffsetAndSize(56, 100)
411	msg2 := seqMsgWithOffsetAndSize(55, 100) // Offset before msg1
412
413	verifiers := test.NewVerifiers(t)
414	stream := test.NewRPCVerifier(t)
415	stream.Push(initSubReq(subscription), initSubResp(), nil)
416	stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
417	stream.Push(nil, msgSubResp(msg2), nil)
418	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
419
420	mockServer.OnTestStart(verifiers)
421	defer mockServer.OnTestEnd()
422
423	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
424	sub.Receiver.ValidateMsg(msg1)
425	if gotErr, msg := sub.FinalError(), "start offset = 55, expected >= 57"; !test.ErrorHasMsg(gotErr, msg) {
426		t.Errorf("Final err: (%v), want msg: %q", gotErr, msg)
427	}
428}
429
430func TestSubscribeStreamFlowControlOverflow(t *testing.T) {
431	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
432	acks := newAckTracker()
433	msg1 := seqMsgWithOffsetAndSize(56, 900)
434	msg2 := seqMsgWithOffsetAndSize(57, 101) // Overflows ReceiveSettings.MaxOutstandingBytes = 1000
435
436	verifiers := test.NewVerifiers(t)
437	stream := test.NewRPCVerifier(t)
438	stream.Push(initSubReq(subscription), initSubResp(), nil)
439	stream.Push(initFlowControlReq(), msgSubResp(msg1), nil)
440	stream.Push(nil, msgSubResp(msg2), nil)
441	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, stream)
442
443	mockServer.OnTestStart(verifiers)
444	defer mockServer.OnTestEnd()
445
446	sub := newTestSubscribeStream(t, subscription, testSubscriberSettings(), acks)
447	sub.Receiver.ValidateMsg(msg1)
448	if gotErr, wantErr := sub.FinalError(), errTokenCounterBytesNegative; !test.ErrorEqual(gotErr, wantErr) {
449		t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
450	}
451}
452
453type testSinglePartitionSubscriber singlePartitionSubscriber
454
455func (t *testSinglePartitionSubscriber) WaitStopped() error {
456	err := t.compositeService.WaitStopped()
457	// Close connections.
458	t.committer.cursorClient.Close()
459	t.subscriber.subClient.Close()
460	return err
461}
462
463func newTestSinglePartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscription subscriptionPartition) *testSinglePartitionSubscriber {
464	ctx := context.Background()
465	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
466	if err != nil {
467		t.Fatal(err)
468	}
469	cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
470	if err != nil {
471		t.Fatal(err)
472	}
473
474	f := &singlePartitionSubscriberFactory{
475		ctx:              ctx,
476		subClient:        subClient,
477		cursorClient:     cursorClient,
478		settings:         testSubscriberSettings(),
479		subscriptionPath: subscription.Path,
480		receiver:         receiverFunc,
481		disableTasks:     true, // Background tasks disabled to control event order
482	}
483	sub := f.New(subscription.Partition)
484	sub.Start()
485	return (*testSinglePartitionSubscriber)(sub)
486}
487
488func TestSinglePartitionSubscriberStartStop(t *testing.T) {
489	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
490	receiver := newTestMessageReceiver(t)
491
492	verifiers := test.NewVerifiers(t)
493
494	// Verifies the behavior of the subscribeStream and committer when they are
495	// stopped before any messages are received.
496	subStream := test.NewRPCVerifier(t)
497	subStream.Push(initSubReq(subscription), initSubResp(), nil)
498	barrier := subStream.PushWithBarrier(initFlowControlReq(), nil, nil)
499	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
500
501	cmtStream := test.NewRPCVerifier(t)
502	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
503	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
504
505	mockServer.OnTestStart(verifiers)
506	defer mockServer.OnTestEnd()
507
508	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
509	if gotErr := sub.WaitStarted(); gotErr != nil {
510		t.Errorf("Start() got err: (%v)", gotErr)
511	}
512	barrier.Release() // To ensure the test is deterministic (i.e. flow control req always received)
513	sub.Stop()
514	if gotErr := sub.WaitStopped(); gotErr != nil {
515		t.Errorf("Stop() got err: (%v)", gotErr)
516	}
517}
518
519func TestSinglePartitionSubscriberSimpleMsgAck(t *testing.T) {
520	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
521	receiver := newTestMessageReceiver(t)
522	msg1 := seqMsgWithOffsetAndSize(22, 100)
523	msg2 := seqMsgWithOffsetAndSize(23, 200)
524
525	verifiers := test.NewVerifiers(t)
526
527	subStream := test.NewRPCVerifier(t)
528	subStream.Push(initSubReq(subscription), initSubResp(), nil)
529	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
530	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
531
532	cmtStream := test.NewRPCVerifier(t)
533	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
534	cmtStream.Push(commitReq(24), commitResp(1), nil)
535	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
536
537	mockServer.OnTestStart(verifiers)
538	defer mockServer.OnTestEnd()
539
540	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
541	if gotErr := sub.WaitStarted(); gotErr != nil {
542		t.Errorf("Start() got err: (%v)", gotErr)
543	}
544	receiver.ValidateMsg(msg1).Ack()
545	receiver.ValidateMsg(msg2).Ack()
546	sub.Stop()
547	if gotErr := sub.WaitStopped(); gotErr != nil {
548		t.Errorf("Stop() got err: (%v)", gotErr)
549	}
550}
551
552func TestSinglePartitionSubscriberMessageQueue(t *testing.T) {
553	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
554	receiver := newTestBlockingMessageReceiver(t)
555	msg1 := seqMsgWithOffsetAndSize(1, 100)
556	msg2 := seqMsgWithOffsetAndSize(2, 100)
557	msg3 := seqMsgWithOffsetAndSize(3, 100)
558	retryableErr := status.Error(codes.Unavailable, "should retry")
559
560	verifiers := test.NewVerifiers(t)
561
562	subStream1 := test.NewRPCVerifier(t)
563	subStream1.Push(initSubReq(subscription), initSubResp(), nil)
564	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
565	subStream1.Push(nil, msgSubResp(msg2), nil)
566	subStream1.Push(nil, nil, retryableErr)
567	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream1)
568
569	// When reconnected, the subscribeStream should seek to msg3 and have
570	// subtracted flow control tokens for msg1 and msg2.
571	subStream2 := test.NewRPCVerifier(t)
572	subStream2.Push(initSubReq(subscription), initSubResp(), nil)
573	subStream2.Push(seekReq(3), nil, nil)
574	subStream2.Push(flowControlSubReq(flowControlTokens{Bytes: 800, Messages: 8}), msgSubResp(msg3), nil)
575	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream2)
576
577	cmtStream := test.NewRPCVerifier(t)
578	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
579	cmtStream.Push(commitReq(4), commitResp(1), nil)
580	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
581
582	mockServer.OnTestStart(verifiers)
583	defer mockServer.OnTestEnd()
584
585	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
586	if gotErr := sub.WaitStarted(); gotErr != nil {
587		t.Errorf("Start() got err: (%v)", gotErr)
588	}
589
590	// Verifies that messageDeliveryQueue delivers messages sequentially and waits
591	// for the client message receiver func to return before delivering the next
592	// message.
593	var acks []AckConsumer
594	for _, msg := range []*pb.SequencedMessage{msg1, msg2, msg3} {
595		ack := receiver.ValidateMsg(msg)
596		acks = append(acks, ack)
597		receiver.VerifyNoMsgs()
598		receiver.Return()
599	}
600
601	// Ack all messages so that the committer terminates.
602	for _, ack := range acks {
603		ack.Ack()
604	}
605
606	sub.Stop()
607	if gotErr := sub.WaitStopped(); gotErr != nil {
608		t.Errorf("Stop() got err: (%v)", gotErr)
609	}
610}
611
612func TestSinglePartitionSubscriberStopDuringReceive(t *testing.T) {
613	subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-sub", 0}
614	receiver := newTestBlockingMessageReceiver(t)
615	msg1 := seqMsgWithOffsetAndSize(1, 100)
616	msg2 := seqMsgWithOffsetAndSize(2, 100)
617
618	verifiers := test.NewVerifiers(t)
619
620	subStream := test.NewRPCVerifier(t)
621	subStream.Push(initSubReq(subscription), initSubResp(), nil)
622	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
623	verifiers.AddSubscribeStream(subscription.Path, subscription.Partition, subStream)
624
625	cmtStream := test.NewRPCVerifier(t)
626	cmtStream.Push(initCommitReq(subscription), initCommitResp(), nil)
627	cmtStream.Push(commitReq(2), commitResp(1), nil)
628	verifiers.AddCommitStream(subscription.Path, subscription.Partition, cmtStream)
629
630	mockServer.OnTestStart(verifiers)
631	defer mockServer.OnTestEnd()
632
633	sub := newTestSinglePartitionSubscriber(t, receiver.onMessage, subscription)
634	if gotErr := sub.WaitStarted(); gotErr != nil {
635		t.Errorf("Start() got err: (%v)", gotErr)
636	}
637
638	receiver.ValidateMsg(msg1).Ack()
639
640	// Stop the subscriber before returning from the message receiver func.
641	sub.Stop()
642	receiver.Return()
643
644	if gotErr := sub.WaitStopped(); gotErr != nil {
645		t.Errorf("Stop() got err: (%v)", gotErr)
646	}
647	receiver.VerifyNoMsgs() // msg2 should not be received
648}
649
650func newTestMultiPartitionSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string, partitions []int) *multiPartitionSubscriber {
651	ctx := context.Background()
652	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
653	if err != nil {
654		t.Fatal(err)
655	}
656	cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
657	if err != nil {
658		t.Fatal(err)
659	}
660	allClients := apiClients{subClient, cursorClient}
661
662	f := &singlePartitionSubscriberFactory{
663		ctx:              ctx,
664		subClient:        subClient,
665		cursorClient:     cursorClient,
666		settings:         testSubscriberSettings(),
667		subscriptionPath: subscriptionPath,
668		receiver:         receiverFunc,
669		disableTasks:     true, // Background tasks disabled to control event order
670	}
671	f.settings.Partitions = partitions
672	sub := newMultiPartitionSubscriber(allClients, f)
673	sub.Start()
674	return sub
675}
676
677func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) {
678	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
679	receiver := newTestMessageReceiver(t)
680	msg1 := seqMsgWithOffsetAndSize(22, 100)
681	msg2 := seqMsgWithOffsetAndSize(23, 200)
682	msg3 := seqMsgWithOffsetAndSize(44, 100)
683	msg4 := seqMsgWithOffsetAndSize(45, 200)
684
685	verifiers := test.NewVerifiers(t)
686
687	// Partition 1
688	subStream1 := test.NewRPCVerifier(t)
689	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
690	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
691	subStream1.Push(nil, msgSubResp(msg2), nil)
692	verifiers.AddSubscribeStream(subscription, 1, subStream1)
693
694	cmtStream1 := test.NewRPCVerifier(t)
695	cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
696	cmtStream1.Push(commitReq(24), commitResp(1), nil)
697	verifiers.AddCommitStream(subscription, 1, cmtStream1)
698
699	// Partition 2
700	subStream2 := test.NewRPCVerifier(t)
701	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
702	subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
703	subStream2.Push(nil, msgSubResp(msg4), nil)
704	verifiers.AddSubscribeStream(subscription, 2, subStream2)
705
706	cmtStream2 := test.NewRPCVerifier(t)
707	cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
708	cmtStream2.Push(commitReq(46), commitResp(1), nil)
709	verifiers.AddCommitStream(subscription, 2, cmtStream2)
710
711	mockServer.OnTestStart(verifiers)
712	defer mockServer.OnTestEnd()
713
714	sub := newTestMultiPartitionSubscriber(t, receiver.onMessage, subscription, []int{1, 2})
715	if gotErr := sub.WaitStarted(); gotErr != nil {
716		t.Errorf("Start() got err: (%v)", gotErr)
717	}
718	receiver.ValidateMsgs(join(partitionMsgs(1, msg1, msg2), partitionMsgs(2, msg3, msg4)))
719	sub.Stop()
720	if gotErr := sub.WaitStopped(); gotErr != nil {
721		t.Errorf("Stop() got err: (%v)", gotErr)
722	}
723}
724
725func TestMultiPartitionSubscriberPermanentError(t *testing.T) {
726	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
727	receiver := newTestMessageReceiver(t)
728	msg1 := seqMsgWithOffsetAndSize(22, 100)
729	msg2 := seqMsgWithOffsetAndSize(23, 200)
730	msg3 := seqMsgWithOffsetAndSize(44, 100)
731	serverErr := status.Error(codes.FailedPrecondition, "failed")
732
733	verifiers := test.NewVerifiers(t)
734
735	// Partition 1
736	subStream1 := test.NewRPCVerifier(t)
737	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
738	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
739	msg2Barrier := subStream1.PushWithBarrier(nil, msgSubResp(msg2), nil)
740	verifiers.AddSubscribeStream(subscription, 1, subStream1)
741
742	cmtStream1 := test.NewRPCVerifier(t)
743	cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
744	cmtStream1.Push(commitReq(23), commitResp(1), nil)
745	verifiers.AddCommitStream(subscription, 1, cmtStream1)
746
747	// Partition 2
748	subStream2 := test.NewRPCVerifier(t)
749	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
750	subStream2.Push(initFlowControlReq(), msgSubResp(msg3), nil)
751	errorBarrier := subStream2.PushWithBarrier(nil, nil, serverErr)
752	verifiers.AddSubscribeStream(subscription, 2, subStream2)
753
754	cmtStream2 := test.NewRPCVerifier(t)
755	cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
756	cmtStream2.Push(commitReq(45), commitResp(1), nil)
757	verifiers.AddCommitStream(subscription, 2, cmtStream2)
758
759	mockServer.OnTestStart(verifiers)
760	defer mockServer.OnTestEnd()
761
762	sub := newTestMultiPartitionSubscriber(t, receiver.onMessage, subscription, []int{1, 2})
763	if gotErr := sub.WaitStarted(); gotErr != nil {
764		t.Errorf("Start() got err: (%v)", gotErr)
765	}
766	receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg3)))
767	errorBarrier.Release() // Release server error now to ensure test is deterministic
768	if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
769		t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
770	}
771
772	// Verify msg2 never received as subscriber has terminated.
773	msg2Barrier.Release()
774	receiver.VerifyNoMsgs()
775}
776
777func (as *assigningSubscriber) Partitions() []int {
778	as.mu.Lock()
779	defer as.mu.Unlock()
780
781	var partitions []int
782	for p := range as.subscribers {
783		partitions = append(partitions, p)
784	}
785	sort.Ints(partitions)
786	return partitions
787}
788
789func (as *assigningSubscriber) FlushCommits() {
790	as.mu.Lock()
791	defer as.mu.Unlock()
792
793	for _, sub := range as.subscribers {
794		sub.committer.commitOffsetToStream()
795	}
796}
797
798func newTestAssigningSubscriber(t *testing.T, receiverFunc MessageReceiverFunc, subscriptionPath string) *assigningSubscriber {
799	ctx := context.Background()
800	subClient, err := newSubscriberClient(ctx, "ignored", testServer.ClientConn())
801	if err != nil {
802		t.Fatal(err)
803	}
804	cursorClient, err := newCursorClient(ctx, "ignored", testServer.ClientConn())
805	if err != nil {
806		t.Fatal(err)
807	}
808	assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testServer.ClientConn())
809	if err != nil {
810		t.Fatal(err)
811	}
812	allClients := apiClients{subClient, cursorClient, assignmentClient}
813
814	f := &singlePartitionSubscriberFactory{
815		ctx:              ctx,
816		subClient:        subClient,
817		cursorClient:     cursorClient,
818		settings:         testSubscriberSettings(),
819		subscriptionPath: subscriptionPath,
820		receiver:         receiverFunc,
821		disableTasks:     true, // Background tasks disabled to control event order
822	}
823	sub, err := newAssigningSubscriber(allClients, assignmentClient, fakeGenerateUUID, f)
824	if err != nil {
825		t.Fatal(err)
826	}
827	sub.Start()
828	return sub
829}
830
831func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
832	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
833	receiver := newTestMessageReceiver(t)
834	msg1 := seqMsgWithOffsetAndSize(33, 100)
835	msg2 := seqMsgWithOffsetAndSize(34, 200)
836	msg3 := seqMsgWithOffsetAndSize(66, 100)
837	msg4 := seqMsgWithOffsetAndSize(67, 100)
838	msg5 := seqMsgWithOffsetAndSize(88, 100)
839
840	verifiers := test.NewVerifiers(t)
841
842	// Assignment stream
843	asnStream := test.NewRPCVerifier(t)
844	asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 6}), nil)
845	assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{3, 8}), nil)
846	asnStream.Push(assignmentAckReq(), nil, nil)
847	verifiers.AddAssignmentStream(subscription, asnStream)
848
849	// Partition 3
850	subStream3 := test.NewRPCVerifier(t)
851	subStream3.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 3}), initSubResp(), nil)
852	subStream3.Push(initFlowControlReq(), msgSubResp(msg1), nil)
853	msg2Barrier := subStream3.PushWithBarrier(nil, msgSubResp(msg2), nil)
854	verifiers.AddSubscribeStream(subscription, 3, subStream3)
855
856	cmtStream3 := test.NewRPCVerifier(t)
857	cmtStream3.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 3}), initCommitResp(), nil)
858	cmtStream3.Push(commitReq(34), commitResp(1), nil)
859	cmtStream3.Push(commitReq(35), commitResp(1), nil)
860	verifiers.AddCommitStream(subscription, 3, cmtStream3)
861
862	// Partition 6
863	subStream6 := test.NewRPCVerifier(t)
864	subStream6.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 6}), initSubResp(), nil)
865	subStream6.Push(initFlowControlReq(), msgSubResp(msg3), nil)
866	// msg4 should not be received.
867	msg4Barrier := subStream6.PushWithBarrier(nil, msgSubResp(msg4), nil)
868	verifiers.AddSubscribeStream(subscription, 6, subStream6)
869
870	cmtStream6 := test.NewRPCVerifier(t)
871	cmtStream6.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 6}), initCommitResp(), nil)
872	cmtStream6.Push(commitReq(67), commitResp(1), nil)
873	verifiers.AddCommitStream(subscription, 6, cmtStream6)
874
875	// Partition 8
876	subStream8 := test.NewRPCVerifier(t)
877	subStream8.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 8}), initSubResp(), nil)
878	subStream8.Push(initFlowControlReq(), msgSubResp(msg5), nil)
879	verifiers.AddSubscribeStream(subscription, 8, subStream8)
880
881	cmtStream8 := test.NewRPCVerifier(t)
882	cmtStream8.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 8}), initCommitResp(), nil)
883	cmtStream8.Push(commitReq(89), commitResp(1), nil)
884	verifiers.AddCommitStream(subscription, 8, cmtStream8)
885
886	mockServer.OnTestStart(verifiers)
887	defer mockServer.OnTestEnd()
888
889	sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription)
890	if gotErr := sub.WaitStarted(); gotErr != nil {
891		t.Errorf("Start() got err: (%v)", gotErr)
892	}
893
894	// Partition assignments are initially {3, 6}.
895	receiver.ValidateMsgs(join(partitionMsgs(3, msg1), partitionMsgs(6, msg3)))
896	if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) {
897		t.Errorf("subscriber partitions: got %d, want %d", got, want)
898	}
899
900	// Partition assignments will now be {3, 8}.
901	assignmentBarrier.Release()
902	receiver.ValidateMsgs(partitionMsgs(8, msg5))
903	if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) {
904		t.Errorf("subscriber partitions: got %d, want %d", got, want)
905	}
906
907	// msg2 is from partition 3 and should be received. msg4 is from partition 6
908	// (removed) and should be discarded.
909	sub.FlushCommits()
910	msg2Barrier.Release()
911	msg4Barrier.Release()
912	receiver.ValidateMsgs(partitionMsgs(3, msg2))
913
914	// Stop should flush all commit cursors.
915	sub.Stop()
916	if gotErr := sub.WaitStopped(); gotErr != nil {
917		t.Errorf("Stop() got err: (%v)", gotErr)
918	}
919}
920
921func TestAssigningSubscriberPermanentError(t *testing.T) {
922	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
923	receiver := newTestMessageReceiver(t)
924	msg1 := seqMsgWithOffsetAndSize(11, 100)
925	msg2 := seqMsgWithOffsetAndSize(22, 200)
926	serverErr := status.Error(codes.FailedPrecondition, "failed")
927
928	verifiers := test.NewVerifiers(t)
929
930	// Assignment stream
931	asnStream := test.NewRPCVerifier(t)
932	asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
933	errBarrier := asnStream.PushWithBarrier(assignmentAckReq(), nil, serverErr)
934	verifiers.AddAssignmentStream(subscription, asnStream)
935
936	// Partition 1
937	subStream1 := test.NewRPCVerifier(t)
938	subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
939	subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
940	verifiers.AddSubscribeStream(subscription, 1, subStream1)
941
942	cmtStream1 := test.NewRPCVerifier(t)
943	cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
944	cmtStream1.Push(commitReq(12), commitResp(1), nil)
945	verifiers.AddCommitStream(subscription, 1, cmtStream1)
946
947	// Partition 2
948	subStream2 := test.NewRPCVerifier(t)
949	subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
950	subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
951	verifiers.AddSubscribeStream(subscription, 2, subStream2)
952
953	cmtStream2 := test.NewRPCVerifier(t)
954	cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
955	cmtStream2.Push(commitReq(23), commitResp(1), nil)
956	verifiers.AddCommitStream(subscription, 2, cmtStream2)
957
958	mockServer.OnTestStart(verifiers)
959	defer mockServer.OnTestEnd()
960
961	sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription)
962	if gotErr := sub.WaitStarted(); gotErr != nil {
963		t.Errorf("Start() got err: (%v)", gotErr)
964	}
965	receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg2)))
966
967	// Permanent assignment stream error should terminate subscriber. Commits are
968	// still flushed.
969	errBarrier.Release()
970	if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
971		t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
972	}
973}
974
975func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
976	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
977	receiver := newTestMessageReceiver(t)
978	msg1 := seqMsgWithOffsetAndSize(11, 100)
979	msg2 := seqMsgWithOffsetAndSize(22, 200)
980
981	verifiers := test.NewVerifiers(t)
982
983	// Assignment stream
984	asnStream := test.NewRPCVerifier(t)
985	asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
986	assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{}), nil)
987	assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil)
988	verifiers.AddAssignmentStream(subscription, asnStream)
989
990	// Partition 1
991	subStream := test.NewRPCVerifier(t)
992	subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
993	subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
994	verifiers.AddSubscribeStream(subscription, 1, subStream)
995
996	cmtStream := test.NewRPCVerifier(t)
997	cmtStream.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
998	cmtStream.Push(commitReq(12), commitResp(1), nil)
999	verifiers.AddCommitStream(subscription, 1, cmtStream)
1000
1001	mockServer.OnTestStart(verifiers)
1002	defer mockServer.OnTestEnd()
1003
1004	sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription)
1005	if gotErr := sub.WaitStarted(); gotErr != nil {
1006		t.Errorf("Start() got err: (%v)", gotErr)
1007	}
1008
1009	// Partition assignments are initially {1}.
1010	receiver.ValidateMsg(msg1).Ack()
1011	ack2 := receiver.ValidateMsg(msg2)
1012
1013	// Partition assignments will now be {}.
1014	assignmentBarrier1.Release()
1015	assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic
1016
1017	// Partition 1 has already been unassigned, so this ack is discarded.
1018	ack2.Ack()
1019
1020	sub.Stop()
1021	if gotErr := sub.WaitStopped(); gotErr != nil {
1022		t.Errorf("Stop() got err: (%v)", gotErr)
1023	}
1024}
1025
1026func TestNewSubscriberCreatesCorrectImpl(t *testing.T) {
1027	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
1028	const region = "us-central1"
1029	receiver := newTestMessageReceiver(t)
1030
1031	sub, err := NewSubscriber(context.Background(), DefaultReceiveSettings, receiver.onMessage, region, subscription)
1032	if err != nil {
1033		t.Errorf("NewSubscriber() got error: %v", err)
1034	} else if _, ok := sub.(*assigningSubscriber); !ok {
1035		t.Error("NewSubscriber() did not return a assigningSubscriber")
1036	}
1037
1038	settings := DefaultReceiveSettings
1039	settings.Partitions = []int{1, 2, 3}
1040	sub, err = NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription)
1041	if err != nil {
1042		t.Errorf("NewSubscriber() got error: %v", err)
1043	} else if _, ok := sub.(*multiPartitionSubscriber); !ok {
1044		t.Error("NewSubscriber() did not return a multiPartitionSubscriber")
1045	}
1046}
1047
1048func TestNewSubscriberValidatesSettings(t *testing.T) {
1049	const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
1050	const region = "us-central1"
1051	receiver := newTestMessageReceiver(t)
1052
1053	settings := DefaultReceiveSettings
1054	settings.MaxOutstandingMessages = 0
1055	if _, err := NewSubscriber(context.Background(), settings, receiver.onMessage, region, subscription); err == nil {
1056		t.Error("NewSubscriber() did not return error")
1057	}
1058}
1059