1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"bytes"
18	"context"
19	"math/rand"
20	"testing"
21	"time"
22
23	"cloud.google.com/go/pubsublite/internal/test"
24	"google.golang.org/grpc/codes"
25	"google.golang.org/grpc/status"
26
27	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
28)
29
// testPublishSettings returns a copy of DefaultPublishSettings tuned for
// deterministic and fast unit tests.
func testPublishSettings() PublishSettings {
	s := DefaultPublishSettings
	// A count threshold of 1 sends every message in its own request, which keeps
	// the expected request sequence deterministic.
	s.CountThreshold = 1
	// Flush almost immediately so tests do not wait on the batching delay.
	s.DelayThreshold = time.Millisecond
	s.Timeout = 5 * time.Second
	// A long poll period prevents a background update during the test, while
	// still leaving a non-zero request timeout.
	s.ConfigPollPeriod = time.Minute
	return s
}
42
// testPartitionPublisher wraps a singlePartitionPublisher for ease of testing.
//
// The embedded serviceTestProxy supplies initAndStart, the lifecycle helpers
// the tests call (StartError, Stop, StopVerifyNoError), and the t and name
// fields read by this type's methods. This type defines its own FinalError,
// which also checks the state of the underlying stream.
type testPartitionPublisher struct {
	// pub is the publisher under test.
	pub *singlePartitionPublisher
	serviceTestProxy
}
48
// newTestSinglePartitionPublisher builds a singlePartitionPublisher for the
// given topic partition, connects it to the test server, and starts it through
// serviceTestProxy.initAndStart. The test is aborted if the publisher client
// cannot be created.
func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher {
	ctx := context.Background()
	client, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
	if err != nil {
		t.Fatal(err)
	}

	factory := &singlePartitionPublisherFactory{
		ctx:       ctx,
		pubClient: client,
		settings:  settings,
		topicPath: topic.Path,
	}
	wrapper := new(testPartitionPublisher)
	wrapper.pub = factory.New(topic.Partition)
	wrapper.initAndStart(t, wrapper.pub, "Publisher", client)
	return wrapper
}
68
69func (tp *testPartitionPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver {
70	result := newTestPublishResultReceiver(tp.t, msg)
71	tp.pub.Publish(msg, result.set)
72	return result
73}
74
75func (tp *testPartitionPublisher) FinalError() (err error) {
76	err = tp.serviceTestProxy.FinalError()
77
78	// Verify that the stream has terminated.
79	if gotStatus, wantStatus := tp.pub.stream.Status(), streamTerminated; gotStatus != wantStatus {
80		tp.t.Errorf("%s retryableStream status: %v, want: %v", tp.name, gotStatus, wantStatus)
81	}
82	if tp.pub.stream.currentStream() != nil {
83		tp.t.Errorf("%s client stream should be nil", tp.name)
84	}
85	return
86}
87
// TestSinglePartitionPublisherInvalidInitialResponse verifies that the
// publisher fails to start, and terminates, when the server answers the
// initial request with something other than an initial response.
func TestSinglePartitionPublisherInvalidInitialResponse(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	wantErr := errInvalidInitialPubResponse

	// The server replies to the initial request with a publish response
	// instead of an initial response.
	verifiers := test.NewVerifiers(t)
	rpc := test.NewRPCVerifier(t)
	rpc.Push(initPubReq(topic), msgPubResp(0), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, rpc)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())

	startErr := pub.StartError()
	if !test.ErrorEqual(startErr, wantErr) {
		t.Errorf("Start() got err: (%v), want: (%v)", startErr, wantErr)
	}
	finalErr := pub.FinalError()
	if !test.ErrorEqual(finalErr, wantErr) {
		t.Errorf("Publisher final err: (%v), want: (%v)", finalErr, wantErr)
	}
}
109
// TestSinglePartitionPublisherSpuriousPublishResponse verifies that a publish
// response arriving while no messages are in flight terminates the publisher
// with errPublishQueueEmpty.
func TestSinglePartitionPublisherSpuriousPublishResponse(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	// No client request is expected for this entry (nil). The barrier holds the
	// unsolicited response back until it is released below.
	barrier := stream.PushWithBarrier(nil, msgPubResp(0), nil) // Publish response with no messages
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	// Send after startup to ensure the test is deterministic.
	barrier.Release()
	if gotErr, wantErr := pub.FinalError(), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr)
	}
}
133
// TestSinglePartitionPublisherBatching verifies that messages are sent in
// batches of up to CountThreshold messages, and that Stop flushes a final
// partial batch. Each message's offset is its batch's start offset plus its
// index within the batch.
func TestSinglePartitionPublisherBatching(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	settings := testPublishSettings()
	settings.DelayThreshold = time.Minute // Batching delay disabled, tested elsewhere
	settings.CountThreshold = 3

	// Batch 1
	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}

	// Batch 2: a partial batch, which is only sent because Stop flushes it. Its
	// payload is distinct from msg3's, so a request carrying the wrong message
	// cannot satisfy the expected one.
	msg4 := &pb.PubSubMessage{Data: []byte{'4'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	stream.Push(msgPubReq(msg1, msg2, msg3), msgPubResp(0), nil)
	stream.Push(msgPubReq(msg4), msgPubResp(33), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, settings)
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result3 := pub.Publish(msg3)
	result4 := pub.Publish(msg4)
	// Stop flushes pending messages.
	pub.Stop()

	result1.ValidateResult(topic.Partition, 0)
	result2.ValidateResult(topic.Partition, 1)
	result3.ValidateResult(topic.Partition, 2)
	result4.ValidateResult(topic.Partition, 33)

	if gotErr := pub.FinalError(); gotErr != nil {
		t.Errorf("Publisher final err: (%v), want: <nil>", gotErr)
	}
}
179
// TestSinglePartitionPublisherResendMessages verifies that after a transient
// stream error the publisher reconnects, resends every in-flight message on
// the new stream, and then continues publishing new messages.
func TestSinglePartitionPublisherResendMessages(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}

	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}

	verifiers := test.NewVerifiers(t)

	// Simulate a transient error that results in a reconnect before any server
	// publish responses are received.
	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initPubReq(topic), initPubResp(), nil)
	stream1.Push(msgPubReq(msg1), nil, nil)
	stream1.Push(msgPubReq(msg2), nil, status.Error(codes.Aborted, "server aborted"))
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream1)

	// The publisher should resend all in-flight batches to the second stream.
	// msg1 and msg2 are expected together in a single request on this stream.
	stream2 := test.NewRPCVerifier(t)
	stream2.Push(initPubReq(topic), initPubResp(), nil)
	stream2.Push(msgPubReq(msg1, msg2), msgPubResp(0), nil)
	stream2.Push(msgPubReq(msg3), msgPubResp(2), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream2)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	defer pub.StopVerifyNoError()
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result1.ValidateResult(topic.Partition, 0)
	result2.ValidateResult(topic.Partition, 1)

	// msg3 is published only after the resent messages are acknowledged.
	result3 := pub.Publish(msg3)
	result3.ValidateResult(topic.Partition, 2)
}
221
// TestSinglePartitionPublisherPublishPermanentError verifies that a permanent
// (NotFound) stream error fails every outstanding message with that error and
// terminates the publisher.
func TestSinglePartitionPublisherPublishPermanentError(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	permError := status.Error(codes.NotFound, "topic deleted")

	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	stream.Push(msgPubReq(msg1), nil, permError) // Permanent error terminates publisher
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	// msg2 is never expected by the server; it fails with the same error as msg1.
	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result1.ValidateError(permError)
	result2.ValidateError(permError)

	// This message is published after the publisher has already stopped, so it
	// fails with ErrServiceStopped.
	result3 := pub.Publish(msg3)
	result3.ValidateError(ErrServiceStopped)

	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, permError) {
		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, permError)
	}
}
258
// TestSinglePartitionPublisherBufferOverflow verifies that exceeding
// BufferedByteLimit fails the overflowing message with ErrOverflow and
// terminates the publisher. Messages published before the overflow are still
// allowed to complete.
func TestSinglePartitionPublisherBufferOverflow(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	settings := testPublishSettings()
	settings.BufferedByteLimit = 15

	// msg1 and msg2 each carry 10 bytes of data; together they exceed the
	// 15-byte limit while msg1 is still unacknowledged.
	msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)}
	msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} // Causes overflow
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(0), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, settings)
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	result1 := pub.Publish(msg1)
	// Overflow is detected, which terminates the publisher, but previous messages
	// are flushed.
	result2 := pub.Publish(msg2)
	// Delay the server response for the first Publish to verify that it is
	// allowed to complete.
	barrier.Release()
	// This message is published after the publisher has already stopped, so it
	// fails with ErrServiceStopped.
	result3 := pub.Publish(msg3)

	result1.ValidateResult(topic.Partition, 0)
	result2.ValidateError(ErrOverflow)
	result3.ValidateError(ErrServiceStopped)

	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, ErrOverflow) {
		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, ErrOverflow)
	}
}
301
// TestSinglePartitionPublisherBufferRefill verifies that buffer capacity is
// reclaimed once a message is acknowledged, so a later message that would not
// have fit alongside it is published successfully.
func TestSinglePartitionPublisherBufferRefill(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	settings := testPublishSettings()
	settings.BufferedByteLimit = 15

	// Each message carries 10 bytes of data, so both cannot be buffered at once.
	msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)}
	msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)}

	verifiers := test.NewVerifiers(t)
	rpc := test.NewRPCVerifier(t)
	rpc.Push(initPubReq(topic), initPubResp(), nil)
	rpc.Push(msgPubReq(msg1), msgPubResp(0), nil)
	rpc.Push(msgPubReq(msg2), msgPubResp(1), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, rpc)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, settings)
	if err := pub.StartError(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	// msg2 is published only after msg1's response has been validated, so the
	// buffer has room again and no overflow occurs.
	pub.Publish(msg1).ValidateResult(topic.Partition, 0)
	pub.Publish(msg2).ValidateResult(topic.Partition, 1)

	pub.StopVerifyNoError()
}
334
// TestSinglePartitionPublisherInvalidCursorOffsets verifies that a publish
// response whose start offset is inconsistent with earlier responses fails
// that message and every later message, and terminates the publisher.
func TestSinglePartitionPublisherInvalidCursorOffsets(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}

	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(4), nil)
	// The server returns an inconsistent cursor offset for msg2, which causes the
	// publisher client to fail permanently. (Offset 4 was already assigned to msg1.)
	stream.Push(msgPubReq(msg2), msgPubResp(4), nil)
	stream.Push(msgPubReq(msg3), msgPubResp(5), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	// All three messages are published before msg1's response is released.
	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result3 := pub.Publish(msg3)
	barrier.Release()

	result1.ValidateResult(topic.Partition, 4)

	// msg2 and subsequent messages are errored.
	wantMsg := "server returned publish response with inconsistent start offset"
	result2.ValidateErrorMsg(wantMsg)
	result3.ValidateErrorMsg(wantMsg)
	if gotErr := pub.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) {
		t.Errorf("Publisher final err: (%v), want msg: %q", gotErr, wantMsg)
	}
}
375
// TestSinglePartitionPublisherInvalidServerPublishResponse verifies that an
// unexpected response type received after startup fails the pending message
// and terminates the publisher with errInvalidMsgPubResponse.
func TestSinglePartitionPublisherInvalidServerPublishResponse(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	msg := &pb.PubSubMessage{Data: []byte{'1'}}
	wantErr := errInvalidMsgPubResponse

	verifiers := test.NewVerifiers(t)
	rpc := test.NewRPCVerifier(t)
	rpc.Push(initPubReq(topic), initPubResp(), nil)
	// The server answers the message publish request with a second initial
	// response, which must make the publisher fail permanently.
	rpc.Push(msgPubReq(msg), initPubResp(), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, rpc)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	if err := pub.StartError(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	pub.Publish(msg).ValidateError(wantErr)
	if finalErr := pub.FinalError(); !test.ErrorEqual(finalErr, wantErr) {
		t.Errorf("Publisher final err: (%v), want: (%v)", finalErr, wantErr)
	}
}
404
// TestSinglePartitionPublisherStopFlushesMessages verifies that Stop still
// lets already-published messages be sent and acknowledged, that a server
// error on one of them becomes the publisher's final error, and that messages
// published after Stop fail with ErrServiceStopped.
func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	finalErr := status.Error(codes.FailedPrecondition, "invalid message")

	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
	msg4 := &pb.PubSubMessage{Data: []byte{'4'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(5), nil)
	stream.Push(msgPubReq(msg2), msgPubResp(6), nil)
	stream.Push(msgPubReq(msg3), nil, finalErr)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
	if gotErr := pub.StartError(); gotErr != nil {
		t.Errorf("Start() got err: (%v)", gotErr)
	}

	// Stop is requested while msg1's response is still held by the barrier, so
	// the flush happens during shutdown.
	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result3 := pub.Publish(msg3)
	pub.Stop()
	barrier.Release()
	result4 := pub.Publish(msg4)

	// First 2 messages should be allowed to complete.
	result1.ValidateResult(topic.Partition, 5)
	result2.ValidateResult(topic.Partition, 6)
	// msg3 failed with a server error, which should result in the publisher
	// terminating with an error.
	result3.ValidateError(finalErr)
	// msg4 was sent after the user called Stop(), so should fail immediately with
	// ErrServiceStopped.
	result4.ValidateError(ErrServiceStopped)

	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, finalErr) {
		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr)
	}
}
451
// TestSinglePartitionPublisherPublishWhileStarting verifies that a message
// published before startup has completed is sent once the stream connects.
func TestSinglePartitionPublisherPublishWhileStarting(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	msg := &pb.PubSubMessage{Data: []byte{'1'}}

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	stream.Push(initPubReq(topic), initPubResp(), nil)
	stream.Push(msgPubReq(msg), msgPubResp(42), nil)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())

	// Did not wait for publisher to finish startup. But it should send msg once
	// the Publish stream connects.
	result := pub.Publish(msg)
	result.ValidateResult(topic.Partition, 42)

	pub.StopVerifyNoError()
}
474
// TestSinglePartitionPublisherPublishWhileStartingFails verifies that a
// message published during startup fails with the startup error when the
// server rejects the initial request, and that the publisher terminates with
// that error.
func TestSinglePartitionPublisherPublishWhileStartingFails(t *testing.T) {
	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
	msg := &pb.PubSubMessage{Data: []byte{'1'}}
	serverErr := status.Error(codes.FailedPrecondition, "failed")

	verifiers := test.NewVerifiers(t)
	stream := test.NewRPCVerifier(t)
	// The error reply to the initial request is held until the barrier is released.
	barrier := stream.PushWithBarrier(initPubReq(topic), nil, serverErr)
	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())

	// Published during startup.
	result := pub.Publish(msg)
	// Send the initial response (with error) to complete startup.
	barrier.Release()

	result.ValidateError(serverErr)
	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, serverErr)
	}
}
500
// testRoutingPublisher wraps a routingPublisher for testing. It keeps the
// *testing.T for result receivers and forwards lifecycle calls to pub.
type testRoutingPublisher struct {
	// t is the test that publish result receivers report to.
	t   *testing.T
	// pub is the routing publisher under test.
	pub *routingPublisher
}
506
// newTestRoutingPublisher creates a routingPublisher for topicPath that is
// backed by the test server, then starts it. fakeSourceVal configures the
// test.FakeSource used as the message router's random source, which lets tests
// make routing deterministic. The test is aborted if either API client cannot
// be created.
func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher {
	ctx := context.Background()

	pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
	if err != nil {
		t.Fatal(err)
	}
	adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
	if err != nil {
		t.Fatal(err)
	}
	clients := apiClients{pubClient, adminClient}

	rng := rand.New(&test.FakeSource{Ret: fakeSourceVal})
	routerFactory := newMessageRouterFactory(rng)
	partitionFactory := &singlePartitionPublisherFactory{
		ctx:       ctx,
		pubClient: pubClient,
		settings:  settings,
		topicPath: topicPath,
	}

	routing := newRoutingPublisher(clients, adminClient, routerFactory, partitionFactory)
	routing.Start()
	return &testRoutingPublisher{t: t, pub: routing}
}
531
532func (tp *testRoutingPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver {
533	result := newTestPublishResultReceiver(tp.t, msg)
534	tp.pub.Publish(msg, result.set)
535	return result
536}
537
538func (tp *testRoutingPublisher) NumPartitionPublishers() int {
539	tp.pub.mu.Lock()
540	defer tp.pub.mu.Unlock()
541	return len(tp.pub.publishers)
542}
543
544func (tp *testRoutingPublisher) Start()             { tp.pub.Start() }
545func (tp *testRoutingPublisher) Stop()              { tp.pub.Stop() }
546func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() }
547func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() }
548
// TestRoutingPublisherStartOnce verifies that the first Start creates one
// partition publisher per partition, and that a second Start is a no-op that
// opens no new streams.
func TestRoutingPublisherStartOnce(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 2

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	stream0 := test.NewRPCVerifier(t)
	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	verifiers.AddPublishStream(topic, 0, stream0)

	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	verifiers.AddPublishStream(topic, 1, stream1)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	// The subtests run sequentially and share the same publisher instance.
	t.Run("First succeeds", func(t *testing.T) {
		// Note: newTestRoutingPublisher() called Start.
		if gotErr := pub.WaitStarted(); gotErr != nil {
			t.Errorf("Start() got err: (%v)", gotErr)
		}
		if got, want := pub.NumPartitionPublishers(), numPartitions; got != want {
			t.Errorf("Num partition publishers: got %d, want %d", got, want)
		}
	})
	t.Run("Second no-op", func(t *testing.T) {
		// An error is not returned, but no new streams are opened. The mock server
		// does not expect more RPCs.
		pub.Start()
		if gotErr := pub.WaitStarted(); gotErr != nil {
			t.Errorf("Start() got err: (%v)", gotErr)
		}
	})

	pub.Stop()
	if gotErr := pub.WaitStopped(); gotErr != nil {
		t.Errorf("Stop() got err: (%v)", gotErr)
	}
}
592
// TestRoutingPublisherStartStop verifies that stopping the routing publisher
// while its partition count request is still pending stops it cleanly and
// creates no partition publishers.
func TestRoutingPublisherStartStop(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 2

	verifiers := test.NewVerifiers(t)
	// The partition count response is held until the barrier is released below.
	barrier := verifiers.GlobalVerifier.PushWithBarrier(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
	// Stop is requested before the partition count response is delivered.
	pub.Stop()
	barrier.Release()

	if gotErr := pub.WaitStopped(); gotErr != nil {
		t.Errorf("Stop() got err: (%v)", gotErr)
	}
	// No publishers should be created.
	if got, want := pub.NumPartitionPublishers(), 0; got != want {
		t.Errorf("Num partition publishers: got %d, want %d", got, want)
	}
}
615
// TestRoutingPublisherRoundRobin verifies that messages without an ordering key
// are distributed across partitions in round-robin order, starting from the
// partition selected by the fake random source.
func TestRoutingPublisherRoundRobin(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 3

	// Messages have no ordering key, so the roundRobinMsgRouter is used.
	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
	msg4 := &pb.PubSubMessage{Data: []byte{'4'}}

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	// Expected routing order: msg1 -> 1, msg2 -> 2, msg3 -> 0, msg4 -> 1.

	// Partition 0
	stream0 := test.NewRPCVerifier(t)
	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	stream0.Push(msgPubReq(msg3), msgPubResp(34), nil)
	verifiers.AddPublishStream(topic, 0, stream0)

	// Partition 1
	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	stream1.Push(msgPubReq(msg1), msgPubResp(41), nil)
	stream1.Push(msgPubReq(msg4), msgPubResp(42), nil)
	verifiers.AddPublishStream(topic, 1, stream1)

	// Partition 2
	stream2 := test.NewRPCVerifier(t)
	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
	stream2.Push(msgPubReq(msg2), msgPubResp(78), nil)
	verifiers.AddPublishStream(topic, 2, stream2)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	// Note: The fake source is initialized with value=1, so Partition=1 publisher
	// will be the first chosen by the roundRobinMsgRouter.
	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1)
	if err := pub.WaitStarted(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	// Publish order determines routing, so all messages are published before
	// any result is validated.
	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result3 := pub.Publish(msg3)
	result4 := pub.Publish(msg4)

	result1.ValidateResult(1, 41)
	result2.ValidateResult(2, 78)
	result3.ValidateResult(0, 34)
	result4.ValidateResult(1, 42)

	pub.Stop()
	if err := pub.WaitStopped(); err != nil {
		t.Errorf("Stop() got err: (%v)", err)
	}
}
673
// TestRoutingPublisherHashing verifies that messages with an ordering key are
// routed by key hash, so every message with the same key reaches the same
// partition, in publish order.
func TestRoutingPublisherHashing(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 3

	key0 := []byte("bar") // hashes to partition 0
	key1 := []byte("baz") // hashes to partition 1
	key2 := []byte("foo") // hashes to partition 2

	// Messages have ordering key, so the hashingMsgRouter is used.
	msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0}
	msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2}
	msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1}
	msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0}

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	// Partition 0
	stream0 := test.NewRPCVerifier(t)
	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	stream0.Push(msgPubReq(msg2), msgPubResp(20), nil)
	stream0.Push(msgPubReq(msg5), msgPubResp(21), nil)
	verifiers.AddPublishStream(topic, 0, stream0)

	// Partition 1
	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	stream1.Push(msgPubReq(msg4), msgPubResp(40), nil)
	verifiers.AddPublishStream(topic, 1, stream1)

	// Partition 2
	stream2 := test.NewRPCVerifier(t)
	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
	stream2.Push(msgPubReq(msg1), msgPubResp(10), nil)
	stream2.Push(msgPubReq(msg3), msgPubResp(11), nil)
	verifiers.AddPublishStream(topic, 2, stream2)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
	if err := pub.WaitStarted(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)
	result3 := pub.Publish(msg3)
	result4 := pub.Publish(msg4)
	result5 := pub.Publish(msg5)

	result1.ValidateResult(2, 10)
	result2.ValidateResult(0, 20)
	result3.ValidateResult(2, 11)
	result4.ValidateResult(1, 40)
	result5.ValidateResult(0, 21)

	pub.Stop()
	if err := pub.WaitStopped(); err != nil {
		t.Errorf("Stop() got err: (%v)", err)
	}
}
737
// TestRoutingPublisherPermanentError verifies that a permanent error on one
// partition's stream terminates the whole routing publisher with that error,
// while the other partition publisher still completes its pending message.
func TestRoutingPublisherPermanentError(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 2
	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
	serverErr := status.Error(codes.FailedPrecondition, "failed")

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	// Partition 0
	stream0 := test.NewRPCVerifier(t)
	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	stream0.Push(msgPubReq(msg1), msgPubResp(34), nil)
	verifiers.AddPublishStream(topic, 0, stream0)

	// Partition 1. Fails due to permanent error, which will also shut down
	// partition-0 publisher, but it should be allowed to flush its pending
	// messages.
	stream1 := test.NewRPCVerifier(t)
	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	stream1.Push(msgPubReq(msg2), nil, serverErr)
	verifiers.AddPublishStream(topic, 1, stream1)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	// With the fake source at 0, round-robin routing sends msg1 to partition 0
	// and msg2 to partition 1, as the stream expectations above require.
	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
	if err := pub.WaitStarted(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	result1 := pub.Publish(msg1)
	result2 := pub.Publish(msg2)

	result1.ValidateResult(0, 34)
	result2.ValidateError(serverErr)

	// No explicit Stop is needed; the permanent error terminates the publisher.
	if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
		t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
	}
}
780
// TestRoutingPublisherPublishAfterStop verifies that messages published after
// Stop fail with ErrServiceStopped. The partition streams expect only their
// initial requests, so no message reaches the server.
func TestRoutingPublisherPublishAfterStop(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	numPartitions := 2

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)

	partition0 := test.NewRPCVerifier(t)
	partition0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
	verifiers.AddPublishStream(topic, 0, partition0)

	partition1 := test.NewRPCVerifier(t)
	partition1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
	verifiers.AddPublishStream(topic, 1, partition1)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
	if err := pub.WaitStarted(); err != nil {
		t.Errorf("Start() got err: (%v)", err)
	}

	pub.Stop()
	// Composite literal elements are evaluated left to right, so the messages
	// are published in order.
	results := []*testPublishResultReceiver{
		pub.Publish(&pb.PubSubMessage{Data: []byte{'1'}}),
		pub.Publish(&pb.PubSubMessage{Data: []byte{'2'}}),
	}
	for _, r := range results {
		r.ValidateError(ErrServiceStopped)
	}

	if err := pub.WaitStopped(); err != nil {
		t.Errorf("Stop() got err: (%v)", err)
	}
}
819
// TestRoutingPublisherPartitionCountFail verifies that a failed partition count
// lookup fails startup and creates no partition publishers, and that calling
// Start again does not retry the lookup.
func TestRoutingPublisherPartitionCountFail(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	wantErr := status.Error(codes.NotFound, "no exist")

	// Retrieving the number of partitions results in an error. Startup cannot
	// proceed.
	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, wantErr)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	// Matched by message rather than equality; the returned error may differ in
	// type from wantErr.
	if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantErr.Error()) {
		t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr)
	}
	if got, want := pub.NumPartitionPublishers(), 0; got != want {
		t.Errorf("Num partition publishers: got %d, want %d", got, want)
	}

	// Verify that the publisher does not attempt to restart. The mock server does
	// not expect more RPCs.
	pub.Start()
}
845
// TestRoutingPublisherPartitionCountInvalid verifies that startup fails, and
// no partition publishers are created, when the server reports an invalid
// partition count (zero).
func TestRoutingPublisherPartitionCountInvalid(t *testing.T) {
	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
	const wantMsg = "topic has invalid number of partitions"

	verifiers := test.NewVerifiers(t)
	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil)

	mockServer.OnTestStart(verifiers)
	defer mockServer.OnTestEnd()

	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)

	if err := pub.WaitStarted(); !test.ErrorHasMsg(err, wantMsg) {
		t.Errorf("Start() got err: (%v), want msg: %q", err, wantMsg)
	}
	if n := pub.NumPartitionPublishers(); n != 0 {
		t.Errorf("Num partition publishers: got %d, want %d", n, 0)
	}
}
867
868func TestRoutingPublisherPartitionCountIncreases(t *testing.T) {
869	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
870	initialPartitionCount := 1
871	updatedPartitionCount := 3
872	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
873	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
874	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
875
876	verifiers := test.NewVerifiers(t)
877	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
878	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil)
879
880	stream0 := test.NewRPCVerifier(t)
881	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
882	stream0.Push(msgPubReq(msg1), msgPubResp(11), nil)
883	verifiers.AddPublishStream(topic, 0, stream0)
884
885	stream1 := test.NewRPCVerifier(t)
886	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
887	stream1.Push(msgPubReq(msg2), msgPubResp(22), nil)
888	verifiers.AddPublishStream(topic, 1, stream1)
889
890	stream2 := test.NewRPCVerifier(t)
891	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
892	stream2.Push(msgPubReq(msg3), msgPubResp(33), nil)
893	verifiers.AddPublishStream(topic, 2, stream2)
894
895	mockServer.OnTestStart(verifiers)
896	defer mockServer.OnTestEnd()
897
898	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
899
900	t.Run("Initial count", func(t *testing.T) {
901		if gotErr := pub.WaitStarted(); gotErr != nil {
902			t.Errorf("Start() got err: (%v)", gotErr)
903		}
904		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
905			t.Errorf("Num partition publishers: got %d, want %d", got, want)
906		}
907	})
908	t.Run("Updated count", func(t *testing.T) {
909		pub.pub.partitionWatcher.updatePartitionCount()
910		if got, want := pub.NumPartitionPublishers(), updatedPartitionCount; got != want {
911			t.Errorf("Num partition publishers: got %d, want %d", got, want)
912		}
913	})
914	t.Run("Publish", func(t *testing.T) {
915		result1 := pub.Publish(msg1)
916		result2 := pub.Publish(msg2)
917		result3 := pub.Publish(msg3)
918
919		result1.ValidateResult(0, 11)
920		result2.ValidateResult(1, 22)
921		result3.ValidateResult(2, 33)
922	})
923
924	pub.Stop()
925	if gotErr := pub.WaitStopped(); gotErr != nil {
926		t.Errorf("Stop() got err: (%v)", gotErr)
927	}
928}
929
930func TestRoutingPublisherPartitionCountDecreases(t *testing.T) {
931	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
932	initialPartitionCount := 2
933	updatedPartitionCount := 1
934
935	verifiers := test.NewVerifiers(t)
936	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
937	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil)
938
939	stream0 := test.NewRPCVerifier(t)
940	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
941	verifiers.AddPublishStream(topic, 0, stream0)
942
943	stream1 := test.NewRPCVerifier(t)
944	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
945	verifiers.AddPublishStream(topic, 1, stream1)
946
947	mockServer.OnTestStart(verifiers)
948	defer mockServer.OnTestEnd()
949
950	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
951
952	t.Run("Initial count", func(t *testing.T) {
953		if gotErr := pub.WaitStarted(); gotErr != nil {
954			t.Errorf("Start() got err: (%v)", gotErr)
955		}
956		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
957			t.Errorf("Num partition publishers: got %d, want %d", got, want)
958		}
959	})
960	t.Run("Updated count", func(t *testing.T) {
961		pub.pub.partitionWatcher.updatePartitionCount()
962
963		// Decreasing count ignored.
964		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
965			t.Errorf("Num partition publishers: got %d, want %d", got, want)
966		}
967	})
968
969	pub.Stop()
970	if gotErr := pub.WaitStopped(); gotErr != nil {
971		t.Errorf("Stop() got err: (%v)", gotErr)
972	}
973}
974
975func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) {
976	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
977	initialPartitionCount := 2
978	serverErr := status.Error(codes.NotFound, "deleted")
979
980	verifiers := test.NewVerifiers(t)
981	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
982	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr)
983
984	stream0 := test.NewRPCVerifier(t)
985	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
986	verifiers.AddPublishStream(topic, 0, stream0)
987
988	stream1 := test.NewRPCVerifier(t)
989	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
990	verifiers.AddPublishStream(topic, 1, stream1)
991
992	mockServer.OnTestStart(verifiers)
993	defer mockServer.OnTestEnd()
994
995	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
996
997	t.Run("Initial count", func(t *testing.T) {
998		if gotErr := pub.WaitStarted(); gotErr != nil {
999			t.Errorf("Start() got err: (%v)", gotErr)
1000		}
1001		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
1002			t.Errorf("Num partition publishers: got %d, want %d", got, want)
1003		}
1004	})
1005	t.Run("Failed update", func(t *testing.T) {
1006		pub.pub.partitionWatcher.updatePartitionCount()
1007
1008		// Failed update ignored.
1009		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
1010			t.Errorf("Num partition publishers: got %d, want %d", got, want)
1011		}
1012	})
1013
1014	pub.Stop()
1015	if gotErr := pub.WaitStopped(); gotErr != nil {
1016		t.Errorf("Stop() got err: (%v)", gotErr)
1017	}
1018}
1019
1020func TestNewPublisherValidatesSettings(t *testing.T) {
1021	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
1022	const region = "us-central1"
1023
1024	settings := DefaultPublishSettings
1025	settings.DelayThreshold = 0
1026	if _, err := NewPublisher(context.Background(), settings, region, topic); err == nil {
1027		t.Error("NewPublisher() did not return error")
1028	}
1029}
1030