1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14package wire
15
16import (
17	"bytes"
18	"context"
19	"math/rand"
20	"testing"
21	"time"
22
23	"cloud.google.com/go/pubsublite/internal/test"
24	"google.golang.org/grpc/codes"
25	"google.golang.org/grpc/status"
26
27	pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
28)
29
30func testPublishSettings() PublishSettings {
31	settings := DefaultPublishSettings
32	// Send 1 message at a time to make tests deterministic.
33	settings.CountThreshold = 1
34	// Send messages with minimal delay to speed up tests.
35	settings.DelayThreshold = time.Millisecond
36	settings.Timeout = 5 * time.Second
37	// Disable topic partition count background polling.
38	settings.ConfigPollPeriod = 0
39	return settings
40}
41
// testPartitionPublisher wraps a singlePartitionPublisher for ease of testing.
// The embedded serviceTestProxy supplies the test handle (t), the service
// name, and the shared lifecycle helpers (e.g. StartError, Stop,
// StopVerifyNoError) that the tests in this file call on it.
type testPartitionPublisher struct {
	// pub is the single-partition publisher under test.
	pub *singlePartitionPublisher
	serviceTestProxy
}
47
48func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, settings PublishSettings) *testPartitionPublisher {
49	ctx := context.Background()
50	pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
51	if err != nil {
52		t.Fatal(err)
53	}
54
55	pubFactory := &singlePartitionPublisherFactory{
56		ctx:       ctx,
57		pubClient: pubClient,
58		settings:  settings,
59		topicPath: topic.Path,
60	}
61	tp := &testPartitionPublisher{
62		pub: pubFactory.New(topic.Partition),
63	}
64	tp.initAndStart(t, tp.pub, "Publisher", pubClient)
65	return tp
66}
67
68func (tp *testPartitionPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver {
69	result := newTestPublishResultReceiver(tp.t, msg)
70	tp.pub.Publish(msg, result.set)
71	return result
72}
73
74func (tp *testPartitionPublisher) FinalError() (err error) {
75	err = tp.serviceTestProxy.FinalError()
76
77	// Verify that the stream has terminated.
78	if gotStatus, wantStatus := tp.pub.stream.Status(), streamTerminated; gotStatus != wantStatus {
79		tp.t.Errorf("%s retryableStream status: %v, want: %v", tp.name, gotStatus, wantStatus)
80	}
81	if tp.pub.stream.currentStream() != nil {
82		tp.t.Errorf("%s client stream should be nil", tp.name)
83	}
84	return
85}
86
87func TestSinglePartitionPublisherInvalidInitialResponse(t *testing.T) {
88	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
89
90	verifiers := test.NewVerifiers(t)
91	stream := test.NewRPCVerifier(t)
92	stream.Push(initPubReq(topic), msgPubResp(0), nil) // Publish response instead of initial response
93	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
94
95	mockServer.OnTestStart(verifiers)
96	defer mockServer.OnTestEnd()
97
98	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
99
100	wantErr := errInvalidInitialPubResponse
101	if gotErr := pub.StartError(); !test.ErrorEqual(gotErr, wantErr) {
102		t.Errorf("Start() got err: (%v), want: (%v)", gotErr, wantErr)
103	}
104	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
105		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr)
106	}
107}
108
109func TestSinglePartitionPublisherSpuriousPublishResponse(t *testing.T) {
110	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
111
112	verifiers := test.NewVerifiers(t)
113	stream := test.NewRPCVerifier(t)
114	stream.Push(initPubReq(topic), initPubResp(), nil)
115	barrier := stream.PushWithBarrier(nil, msgPubResp(0), nil) // Publish response with no messages
116	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
117
118	mockServer.OnTestStart(verifiers)
119	defer mockServer.OnTestEnd()
120
121	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
122	if gotErr := pub.StartError(); gotErr != nil {
123		t.Errorf("Start() got err: (%v)", gotErr)
124	}
125
126	// Send after startup to ensure the test is deterministic.
127	barrier.Release()
128	if gotErr, wantErr := pub.FinalError(), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) {
129		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr)
130	}
131}
132
133func TestSinglePartitionPublisherBatching(t *testing.T) {
134	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
135	settings := testPublishSettings()
136	settings.DelayThreshold = time.Minute // Batching delay disabled, tested elsewhere
137	settings.CountThreshold = 3
138
139	// Batch 1
140	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
141	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
142	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
143
144	// Batch 2
145	msg4 := &pb.PubSubMessage{Data: []byte{'3'}}
146
147	verifiers := test.NewVerifiers(t)
148	stream := test.NewRPCVerifier(t)
149	stream.Push(initPubReq(topic), initPubResp(), nil)
150	stream.Push(msgPubReq(msg1, msg2, msg3), msgPubResp(0), nil)
151	stream.Push(msgPubReq(msg4), msgPubResp(33), nil)
152	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
153
154	mockServer.OnTestStart(verifiers)
155	defer mockServer.OnTestEnd()
156
157	pub := newTestSinglePartitionPublisher(t, topic, settings)
158	if gotErr := pub.StartError(); gotErr != nil {
159		t.Errorf("Start() got err: (%v)", gotErr)
160	}
161
162	result1 := pub.Publish(msg1)
163	result2 := pub.Publish(msg2)
164	result3 := pub.Publish(msg3)
165	result4 := pub.Publish(msg4)
166	// Stop flushes pending messages.
167	pub.Stop()
168
169	result1.ValidateResult(topic.Partition, 0)
170	result2.ValidateResult(topic.Partition, 1)
171	result3.ValidateResult(topic.Partition, 2)
172	result4.ValidateResult(topic.Partition, 33)
173
174	if gotErr := pub.FinalError(); gotErr != nil {
175		t.Errorf("Publisher final err: (%v), want: <nil>", gotErr)
176	}
177}
178
179func TestSinglePartitionPublisherResendMessages(t *testing.T) {
180	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
181
182	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
183	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
184	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
185
186	verifiers := test.NewVerifiers(t)
187
188	// Simulate a transient error that results in a reconnect before any server
189	// publish responses are received.
190	stream1 := test.NewRPCVerifier(t)
191	stream1.Push(initPubReq(topic), initPubResp(), nil)
192	stream1.Push(msgPubReq(msg1), nil, nil)
193	stream1.Push(msgPubReq(msg2), nil, status.Error(codes.Aborted, "server aborted"))
194	verifiers.AddPublishStream(topic.Path, topic.Partition, stream1)
195
196	// The publisher should resend all in-flight batches to the second stream.
197	stream2 := test.NewRPCVerifier(t)
198	stream2.Push(initPubReq(topic), initPubResp(), nil)
199	stream2.Push(msgPubReq(msg1, msg2), msgPubResp(0), nil)
200	stream2.Push(msgPubReq(msg3), msgPubResp(2), nil)
201	verifiers.AddPublishStream(topic.Path, topic.Partition, stream2)
202
203	mockServer.OnTestStart(verifiers)
204	defer mockServer.OnTestEnd()
205
206	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
207	defer pub.StopVerifyNoError()
208	if gotErr := pub.StartError(); gotErr != nil {
209		t.Errorf("Start() got err: (%v)", gotErr)
210	}
211
212	result1 := pub.Publish(msg1)
213	result2 := pub.Publish(msg2)
214	result1.ValidateResult(topic.Partition, 0)
215	result2.ValidateResult(topic.Partition, 1)
216
217	result3 := pub.Publish(msg3)
218	result3.ValidateResult(topic.Partition, 2)
219}
220
221func TestSinglePartitionPublisherPublishPermanentError(t *testing.T) {
222	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
223	permError := status.Error(codes.NotFound, "topic deleted")
224
225	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
226	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
227	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
228
229	verifiers := test.NewVerifiers(t)
230	stream := test.NewRPCVerifier(t)
231	stream.Push(initPubReq(topic), initPubResp(), nil)
232	stream.Push(msgPubReq(msg1), nil, permError) // Permanent error terminates publisher
233	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
234
235	mockServer.OnTestStart(verifiers)
236	defer mockServer.OnTestEnd()
237
238	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
239	if gotErr := pub.StartError(); gotErr != nil {
240		t.Errorf("Start() got err: (%v)", gotErr)
241	}
242
243	result1 := pub.Publish(msg1)
244	result2 := pub.Publish(msg2)
245	result1.ValidateError(permError)
246	result2.ValidateError(permError)
247
248	// This message arrives after the publisher has already stopped, so its error
249	// message is ErrServiceStopped.
250	result3 := pub.Publish(msg3)
251	result3.ValidateError(ErrServiceStopped)
252
253	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, permError) {
254		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, permError)
255	}
256}
257
258func TestSinglePartitionPublisherBufferOverflow(t *testing.T) {
259	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
260	settings := testPublishSettings()
261	settings.BufferedByteLimit = 15
262
263	msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)}
264	msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)} // Causes overflow
265	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
266
267	verifiers := test.NewVerifiers(t)
268	stream := test.NewRPCVerifier(t)
269	stream.Push(initPubReq(topic), initPubResp(), nil)
270	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(0), nil)
271	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
272
273	mockServer.OnTestStart(verifiers)
274	defer mockServer.OnTestEnd()
275
276	pub := newTestSinglePartitionPublisher(t, topic, settings)
277	if gotErr := pub.StartError(); gotErr != nil {
278		t.Errorf("Start() got err: (%v)", gotErr)
279	}
280
281	result1 := pub.Publish(msg1)
282	// Overflow is detected, which terminates the publisher, but previous messages
283	// are flushed.
284	result2 := pub.Publish(msg2)
285	// Delay the server response for the first Publish to verify that it is
286	// allowed to complete.
287	barrier.Release()
288	// This message arrives after the publisher has already stopped, so its error
289	// message is ErrServiceStopped.
290	result3 := pub.Publish(msg3)
291
292	result1.ValidateResult(topic.Partition, 0)
293	result2.ValidateError(ErrOverflow)
294	result3.ValidateError(ErrServiceStopped)
295
296	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, ErrOverflow) {
297		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, ErrOverflow)
298	}
299}
300
301func TestSinglePartitionPublisherBufferRefill(t *testing.T) {
302	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
303	settings := testPublishSettings()
304	settings.BufferedByteLimit = 15
305
306	msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'1'}, 10)}
307	msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{'2'}, 10)}
308
309	verifiers := test.NewVerifiers(t)
310	stream := test.NewRPCVerifier(t)
311	stream.Push(initPubReq(topic), initPubResp(), nil)
312	stream.Push(msgPubReq(msg1), msgPubResp(0), nil)
313	stream.Push(msgPubReq(msg2), msgPubResp(1), nil)
314	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
315
316	mockServer.OnTestStart(verifiers)
317	defer mockServer.OnTestEnd()
318
319	pub := newTestSinglePartitionPublisher(t, topic, settings)
320	if gotErr := pub.StartError(); gotErr != nil {
321		t.Errorf("Start() got err: (%v)", gotErr)
322	}
323
324	result1 := pub.Publish(msg1)
325	result1.ValidateResult(topic.Partition, 0)
326
327	// No overflow because msg2 is sent after the response for msg1 is received.
328	result2 := pub.Publish(msg2)
329	result2.ValidateResult(topic.Partition, 1)
330
331	pub.StopVerifyNoError()
332}
333
334func TestSinglePartitionPublisherInvalidCursorOffsets(t *testing.T) {
335	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
336
337	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
338	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
339	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
340
341	verifiers := test.NewVerifiers(t)
342	stream := test.NewRPCVerifier(t)
343	stream.Push(initPubReq(topic), initPubResp(), nil)
344	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(4), nil)
345	// The server returns an inconsistent cursor offset for msg2, which causes the
346	// publisher client to fail permanently.
347	stream.Push(msgPubReq(msg2), msgPubResp(4), nil)
348	stream.Push(msgPubReq(msg3), msgPubResp(5), nil)
349	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
350
351	mockServer.OnTestStart(verifiers)
352	defer mockServer.OnTestEnd()
353
354	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
355	if gotErr := pub.StartError(); gotErr != nil {
356		t.Errorf("Start() got err: (%v)", gotErr)
357	}
358
359	result1 := pub.Publish(msg1)
360	result2 := pub.Publish(msg2)
361	result3 := pub.Publish(msg3)
362	barrier.Release()
363
364	result1.ValidateResult(topic.Partition, 4)
365
366	// msg2 and subsequent messages are errored.
367	wantMsg := "server returned publish response with inconsistent start offset"
368	result2.ValidateErrorMsg(wantMsg)
369	result3.ValidateErrorMsg(wantMsg)
370	if gotErr := pub.FinalError(); !test.ErrorHasMsg(gotErr, wantMsg) {
371		t.Errorf("Publisher final err: (%v), want msg: %q", gotErr, wantMsg)
372	}
373}
374
375func TestSinglePartitionPublisherInvalidServerPublishResponse(t *testing.T) {
376	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
377	msg := &pb.PubSubMessage{Data: []byte{'1'}}
378
379	verifiers := test.NewVerifiers(t)
380	stream := test.NewRPCVerifier(t)
381	stream.Push(initPubReq(topic), initPubResp(), nil)
382	// Server sends duplicate initial publish response, which causes the publisher
383	// client to fail permanently.
384	stream.Push(msgPubReq(msg), initPubResp(), nil)
385	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
386
387	mockServer.OnTestStart(verifiers)
388	defer mockServer.OnTestEnd()
389
390	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
391	if gotErr := pub.StartError(); gotErr != nil {
392		t.Errorf("Start() got err: (%v)", gotErr)
393	}
394
395	result := pub.Publish(msg)
396
397	wantErr := errInvalidMsgPubResponse
398	result.ValidateError(wantErr)
399	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, wantErr) {
400		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, wantErr)
401	}
402}
403
404func TestSinglePartitionPublisherStopFlushesMessages(t *testing.T) {
405	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
406	finalErr := status.Error(codes.FailedPrecondition, "invalid message")
407
408	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
409	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
410	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
411	msg4 := &pb.PubSubMessage{Data: []byte{'4'}}
412
413	verifiers := test.NewVerifiers(t)
414	stream := test.NewRPCVerifier(t)
415	stream.Push(initPubReq(topic), initPubResp(), nil)
416	barrier := stream.PushWithBarrier(msgPubReq(msg1), msgPubResp(5), nil)
417	stream.Push(msgPubReq(msg2), msgPubResp(6), nil)
418	stream.Push(msgPubReq(msg3), nil, finalErr)
419	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
420
421	mockServer.OnTestStart(verifiers)
422	defer mockServer.OnTestEnd()
423
424	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
425	if gotErr := pub.StartError(); gotErr != nil {
426		t.Errorf("Start() got err: (%v)", gotErr)
427	}
428
429	result1 := pub.Publish(msg1)
430	result2 := pub.Publish(msg2)
431	result3 := pub.Publish(msg3)
432	pub.Stop()
433	barrier.Release()
434	result4 := pub.Publish(msg4)
435
436	// First 2 messages should be allowed to complete.
437	result1.ValidateResult(topic.Partition, 5)
438	result2.ValidateResult(topic.Partition, 6)
439	// msg3 failed with a server error, which should result in the publisher
440	// terminating with an error.
441	result3.ValidateError(finalErr)
442	// msg4 was sent after the user called Stop(), so should fail immediately with
443	// ErrServiceStopped.
444	result4.ValidateError(ErrServiceStopped)
445
446	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, finalErr) {
447		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, finalErr)
448	}
449}
450
451func TestSinglePartitionPublisherPublishWhileStarting(t *testing.T) {
452	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
453	msg := &pb.PubSubMessage{Data: []byte{'1'}}
454
455	verifiers := test.NewVerifiers(t)
456	stream := test.NewRPCVerifier(t)
457	stream.Push(initPubReq(topic), initPubResp(), nil)
458	stream.Push(msgPubReq(msg), msgPubResp(42), nil)
459	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
460
461	mockServer.OnTestStart(verifiers)
462	defer mockServer.OnTestEnd()
463
464	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
465
466	// Did not wait for publisher to finish startup. But it should send msg once
467	// the Publish stream connects.
468	result := pub.Publish(msg)
469	result.ValidateResult(topic.Partition, 42)
470
471	pub.StopVerifyNoError()
472}
473
474func TestSinglePartitionPublisherPublishWhileStartingFails(t *testing.T) {
475	topic := topicPartition{"projects/123456/locations/us-central1-b/topics/my-topic", 0}
476	msg := &pb.PubSubMessage{Data: []byte{'1'}}
477	serverErr := status.Error(codes.FailedPrecondition, "failed")
478
479	verifiers := test.NewVerifiers(t)
480	stream := test.NewRPCVerifier(t)
481	barrier := stream.PushWithBarrier(initPubReq(topic), nil, serverErr)
482	verifiers.AddPublishStream(topic.Path, topic.Partition, stream)
483
484	mockServer.OnTestStart(verifiers)
485	defer mockServer.OnTestEnd()
486
487	pub := newTestSinglePartitionPublisher(t, topic, testPublishSettings())
488
489	// Published during startup.
490	result := pub.Publish(msg)
491	// Send the initial response (with error) to complete startup.
492	barrier.Release()
493
494	result.ValidateError(serverErr)
495	if gotErr := pub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
496		t.Errorf("Publisher final err: (%v), want: (%v)", gotErr, serverErr)
497	}
498}
499
// testRoutingPublisher wraps a routingPublisher for testing.
type testRoutingPublisher struct {
	// t is passed to the result receivers created by Publish.
	t   *testing.T
	// pub is the routing publisher under test.
	pub *routingPublisher
}
505
506func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher {
507	ctx := context.Background()
508	pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn())
509	if err != nil {
510		t.Fatal(err)
511	}
512	adminClient, err := NewAdminClient(ctx, "ignored", testServer.ClientConn())
513	if err != nil {
514		t.Fatal(err)
515	}
516	allClients := apiClients{pubClient, adminClient}
517
518	source := &test.FakeSource{Ret: fakeSourceVal}
519	msgRouterFactory := newMessageRouterFactory(rand.New(source))
520	pubFactory := &singlePartitionPublisherFactory{
521		ctx:       ctx,
522		pubClient: pubClient,
523		settings:  settings,
524		topicPath: topicPath,
525	}
526	pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory)
527	pub.Start()
528	return &testRoutingPublisher{t: t, pub: pub}
529}
530
531func (tp *testRoutingPublisher) Publish(msg *pb.PubSubMessage) *testPublishResultReceiver {
532	result := newTestPublishResultReceiver(tp.t, msg)
533	tp.pub.Publish(msg, result.set)
534	return result
535}
536
537func (tp *testRoutingPublisher) NumPartitionPublishers() int {
538	tp.pub.mu.Lock()
539	defer tp.pub.mu.Unlock()
540	return len(tp.pub.publishers)
541}
542
543func (tp *testRoutingPublisher) Start()             { tp.pub.Start() }
544func (tp *testRoutingPublisher) Stop()              { tp.pub.Stop() }
545func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() }
546func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() }
547
548func TestRoutingPublisherStartOnce(t *testing.T) {
549	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
550	numPartitions := 2
551
552	verifiers := test.NewVerifiers(t)
553	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
554
555	stream0 := test.NewRPCVerifier(t)
556	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
557	verifiers.AddPublishStream(topic, 0, stream0)
558
559	stream1 := test.NewRPCVerifier(t)
560	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
561	verifiers.AddPublishStream(topic, 1, stream1)
562
563	mockServer.OnTestStart(verifiers)
564	defer mockServer.OnTestEnd()
565
566	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
567
568	t.Run("First succeeds", func(t *testing.T) {
569		// Note: newTestRoutingPublisher() called Start.
570		if gotErr := pub.WaitStarted(); gotErr != nil {
571			t.Errorf("Start() got err: (%v)", gotErr)
572		}
573		if got, want := pub.NumPartitionPublishers(), numPartitions; got != want {
574			t.Errorf("Num partition publishers: got %d, want %d", got, want)
575		}
576	})
577	t.Run("Second no-op", func(t *testing.T) {
578		// An error is not returned, but no new streams are opened. The mock server
579		// does not expect more RPCs.
580		pub.Start()
581		if gotErr := pub.WaitStarted(); gotErr != nil {
582			t.Errorf("Start() got err: (%v)", gotErr)
583		}
584	})
585
586	pub.Stop()
587	if gotErr := pub.WaitStopped(); gotErr != nil {
588		t.Errorf("Stop() got err: (%v)", gotErr)
589	}
590}
591
592func TestRoutingPublisherStartStop(t *testing.T) {
593	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
594	numPartitions := 2
595
596	verifiers := test.NewVerifiers(t)
597	barrier := verifiers.GlobalVerifier.PushWithBarrier(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
598
599	mockServer.OnTestStart(verifiers)
600	defer mockServer.OnTestEnd()
601
602	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
603	pub.Stop()
604	barrier.Release()
605
606	if gotErr := pub.WaitStopped(); gotErr != nil {
607		t.Errorf("Stop() got err: (%v)", gotErr)
608	}
609	// No publishers should be created.
610	if got, want := pub.NumPartitionPublishers(), 0; got != want {
611		t.Errorf("Num partition publishers: got %d, want %d", got, want)
612	}
613}
614
615func TestRoutingPublisherRoundRobin(t *testing.T) {
616	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
617	numPartitions := 3
618
619	// Messages have no ordering key, so the roundRobinMsgRouter is used.
620	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
621	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
622	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
623	msg4 := &pb.PubSubMessage{Data: []byte{'4'}}
624
625	verifiers := test.NewVerifiers(t)
626	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
627
628	// Partition 0
629	stream0 := test.NewRPCVerifier(t)
630	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
631	stream0.Push(msgPubReq(msg3), msgPubResp(34), nil)
632	verifiers.AddPublishStream(topic, 0, stream0)
633
634	// Partition 1
635	stream1 := test.NewRPCVerifier(t)
636	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
637	stream1.Push(msgPubReq(msg1), msgPubResp(41), nil)
638	stream1.Push(msgPubReq(msg4), msgPubResp(42), nil)
639	verifiers.AddPublishStream(topic, 1, stream1)
640
641	// Partition 2
642	stream2 := test.NewRPCVerifier(t)
643	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
644	stream2.Push(msgPubReq(msg2), msgPubResp(78), nil)
645	verifiers.AddPublishStream(topic, 2, stream2)
646
647	mockServer.OnTestStart(verifiers)
648	defer mockServer.OnTestEnd()
649
650	// Note: The fake source is initialized with value=1, so Partition=1 publisher
651	// will be the first chosen by the roundRobinMsgRouter.
652	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1)
653	if err := pub.WaitStarted(); err != nil {
654		t.Errorf("Start() got err: (%v)", err)
655	}
656
657	result1 := pub.Publish(msg1)
658	result2 := pub.Publish(msg2)
659	result3 := pub.Publish(msg3)
660	result4 := pub.Publish(msg4)
661
662	result1.ValidateResult(1, 41)
663	result2.ValidateResult(2, 78)
664	result3.ValidateResult(0, 34)
665	result4.ValidateResult(1, 42)
666
667	pub.Stop()
668	if err := pub.WaitStopped(); err != nil {
669		t.Errorf("Stop() got err: (%v)", err)
670	}
671}
672
673func TestRoutingPublisherHashing(t *testing.T) {
674	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
675	numPartitions := 3
676
677	key0 := []byte("bar") // hashes to partition 0
678	key1 := []byte("baz") // hashes to partition 1
679	key2 := []byte("foo") // hashes to partition 2
680
681	// Messages have ordering key, so the hashingMsgRouter is used.
682	msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2}
683	msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0}
684	msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2}
685	msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1}
686	msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0}
687
688	verifiers := test.NewVerifiers(t)
689	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
690
691	// Partition 0
692	stream0 := test.NewRPCVerifier(t)
693	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
694	stream0.Push(msgPubReq(msg2), msgPubResp(20), nil)
695	stream0.Push(msgPubReq(msg5), msgPubResp(21), nil)
696	verifiers.AddPublishStream(topic, 0, stream0)
697
698	// Partition 1
699	stream1 := test.NewRPCVerifier(t)
700	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
701	stream1.Push(msgPubReq(msg4), msgPubResp(40), nil)
702	verifiers.AddPublishStream(topic, 1, stream1)
703
704	// Partition 2
705	stream2 := test.NewRPCVerifier(t)
706	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
707	stream2.Push(msgPubReq(msg1), msgPubResp(10), nil)
708	stream2.Push(msgPubReq(msg3), msgPubResp(11), nil)
709	verifiers.AddPublishStream(topic, 2, stream2)
710
711	mockServer.OnTestStart(verifiers)
712	defer mockServer.OnTestEnd()
713
714	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
715	if err := pub.WaitStarted(); err != nil {
716		t.Errorf("Start() got err: (%v)", err)
717	}
718
719	result1 := pub.Publish(msg1)
720	result2 := pub.Publish(msg2)
721	result3 := pub.Publish(msg3)
722	result4 := pub.Publish(msg4)
723	result5 := pub.Publish(msg5)
724
725	result1.ValidateResult(2, 10)
726	result2.ValidateResult(0, 20)
727	result3.ValidateResult(2, 11)
728	result4.ValidateResult(1, 40)
729	result5.ValidateResult(0, 21)
730
731	pub.Stop()
732	if err := pub.WaitStopped(); err != nil {
733		t.Errorf("Stop() got err: (%v)", err)
734	}
735}
736
737func TestRoutingPublisherPermanentError(t *testing.T) {
738	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
739	numPartitions := 2
740	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
741	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
742	serverErr := status.Error(codes.FailedPrecondition, "failed")
743
744	verifiers := test.NewVerifiers(t)
745	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
746
747	// Partition 0
748	stream0 := test.NewRPCVerifier(t)
749	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
750	stream0.Push(msgPubReq(msg1), msgPubResp(34), nil)
751	verifiers.AddPublishStream(topic, 0, stream0)
752
753	// Partition 1. Fails due to permanent error, which will also shut down
754	// partition-0 publisher, but it should be allowed to flush its pending
755	// messages.
756	stream1 := test.NewRPCVerifier(t)
757	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
758	stream1.Push(msgPubReq(msg2), nil, serverErr)
759	verifiers.AddPublishStream(topic, 1, stream1)
760
761	mockServer.OnTestStart(verifiers)
762	defer mockServer.OnTestEnd()
763
764	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
765	if err := pub.WaitStarted(); err != nil {
766		t.Errorf("Start() got err: (%v)", err)
767	}
768
769	result1 := pub.Publish(msg1)
770	result2 := pub.Publish(msg2)
771
772	result1.ValidateResult(0, 34)
773	result2.ValidateError(serverErr)
774
775	if gotErr := pub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
776		t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
777	}
778}
779
780func TestRoutingPublisherPublishAfterStop(t *testing.T) {
781	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
782	numPartitions := 2
783	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
784	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
785
786	verifiers := test.NewVerifiers(t)
787	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil)
788
789	// Partition 0
790	stream0 := test.NewRPCVerifier(t)
791	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
792	verifiers.AddPublishStream(topic, 0, stream0)
793
794	// Partition 1
795	stream1 := test.NewRPCVerifier(t)
796	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
797	verifiers.AddPublishStream(topic, 1, stream1)
798
799	mockServer.OnTestStart(verifiers)
800	defer mockServer.OnTestEnd()
801
802	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
803	if err := pub.WaitStarted(); err != nil {
804		t.Errorf("Start() got err: (%v)", err)
805	}
806
807	pub.Stop()
808	result1 := pub.Publish(msg1)
809	result2 := pub.Publish(msg2)
810
811	result1.ValidateError(ErrServiceStopped)
812	result2.ValidateError(ErrServiceStopped)
813
814	if err := pub.WaitStopped(); err != nil {
815		t.Errorf("Stop() got err: (%v)", err)
816	}
817}
818
819func TestRoutingPublisherPartitionCountFail(t *testing.T) {
820	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
821	wantErr := status.Error(codes.NotFound, "no exist")
822
823	// Retrieving the number of partitions results in an error. Startup cannot
824	// proceed.
825	verifiers := test.NewVerifiers(t)
826	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, wantErr)
827
828	mockServer.OnTestStart(verifiers)
829	defer mockServer.OnTestEnd()
830
831	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
832
833	if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantErr.Error()) {
834		t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr)
835	}
836	if got, want := pub.NumPartitionPublishers(), 0; got != want {
837		t.Errorf("Num partition publishers: got %d, want %d", got, want)
838	}
839
840	// Verify that the publisher does not attempt to restart. The mock server does
841	// not expect more RPCs.
842	pub.Start()
843}
844
845func TestRoutingPublisherPartitionCountInvalid(t *testing.T) {
846	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
847
848	// The number of partitions returned by the server must be valid, otherwise
849	// startup cannot proceed.
850	verifiers := test.NewVerifiers(t)
851	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(0), nil)
852
853	mockServer.OnTestStart(verifiers)
854	defer mockServer.OnTestEnd()
855
856	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
857
858	wantMsg := "topic has invalid number of partitions"
859	if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantMsg) {
860		t.Errorf("Start() got err: (%v), want msg: %q", gotErr, wantMsg)
861	}
862	if got, want := pub.NumPartitionPublishers(), 0; got != want {
863		t.Errorf("Num partition publishers: got %d, want %d", got, want)
864	}
865}
866
867func TestRoutingPublisherPartitionCountIncreases(t *testing.T) {
868	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
869	initialPartitionCount := 1
870	updatedPartitionCount := 3
871	msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
872	msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
873	msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
874
875	verifiers := test.NewVerifiers(t)
876	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
877	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil)
878
879	stream0 := test.NewRPCVerifier(t)
880	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
881	stream0.Push(msgPubReq(msg1), msgPubResp(11), nil)
882	verifiers.AddPublishStream(topic, 0, stream0)
883
884	stream1 := test.NewRPCVerifier(t)
885	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
886	stream1.Push(msgPubReq(msg2), msgPubResp(22), nil)
887	verifiers.AddPublishStream(topic, 1, stream1)
888
889	stream2 := test.NewRPCVerifier(t)
890	stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil)
891	stream2.Push(msgPubReq(msg3), msgPubResp(33), nil)
892	verifiers.AddPublishStream(topic, 2, stream2)
893
894	mockServer.OnTestStart(verifiers)
895	defer mockServer.OnTestEnd()
896
897	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
898
899	t.Run("Initial count", func(t *testing.T) {
900		if gotErr := pub.WaitStarted(); gotErr != nil {
901			t.Errorf("Start() got err: (%v)", gotErr)
902		}
903		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
904			t.Errorf("Num partition publishers: got %d, want %d", got, want)
905		}
906	})
907	t.Run("Updated count", func(t *testing.T) {
908		pub.pub.partitionWatcher.updatePartitionCount()
909		if got, want := pub.NumPartitionPublishers(), updatedPartitionCount; got != want {
910			t.Errorf("Num partition publishers: got %d, want %d", got, want)
911		}
912	})
913	t.Run("Publish", func(t *testing.T) {
914		result1 := pub.Publish(msg1)
915		result2 := pub.Publish(msg2)
916		result3 := pub.Publish(msg3)
917
918		result1.ValidateResult(0, 11)
919		result2.ValidateResult(1, 22)
920		result3.ValidateResult(2, 33)
921	})
922
923	pub.Stop()
924	if gotErr := pub.WaitStopped(); gotErr != nil {
925		t.Errorf("Stop() got err: (%v)", gotErr)
926	}
927}
928
929func TestRoutingPublisherPartitionCountDecreases(t *testing.T) {
930	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
931	initialPartitionCount := 2
932	updatedPartitionCount := 1
933
934	verifiers := test.NewVerifiers(t)
935	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
936	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil)
937
938	stream0 := test.NewRPCVerifier(t)
939	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
940	verifiers.AddPublishStream(topic, 0, stream0)
941
942	stream1 := test.NewRPCVerifier(t)
943	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
944	verifiers.AddPublishStream(topic, 1, stream1)
945
946	mockServer.OnTestStart(verifiers)
947	defer mockServer.OnTestEnd()
948
949	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
950
951	t.Run("Initial count", func(t *testing.T) {
952		if gotErr := pub.WaitStarted(); gotErr != nil {
953			t.Errorf("Start() got err: (%v)", gotErr)
954		}
955		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
956			t.Errorf("Num partition publishers: got %d, want %d", got, want)
957		}
958	})
959	t.Run("Updated count", func(t *testing.T) {
960		pub.pub.partitionWatcher.updatePartitionCount()
961
962		// Decreasing count ignored.
963		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
964			t.Errorf("Num partition publishers: got %d, want %d", got, want)
965		}
966	})
967
968	pub.Stop()
969	if gotErr := pub.WaitStopped(); gotErr != nil {
970		t.Errorf("Stop() got err: (%v)", gotErr)
971	}
972}
973
974func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) {
975	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
976	initialPartitionCount := 2
977	serverErr := status.Error(codes.NotFound, "deleted")
978
979	verifiers := test.NewVerifiers(t)
980	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil)
981	verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr)
982
983	stream0 := test.NewRPCVerifier(t)
984	stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil)
985	verifiers.AddPublishStream(topic, 0, stream0)
986
987	stream1 := test.NewRPCVerifier(t)
988	stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil)
989	verifiers.AddPublishStream(topic, 1, stream1)
990
991	mockServer.OnTestStart(verifiers)
992	defer mockServer.OnTestEnd()
993
994	pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0)
995
996	t.Run("Initial count", func(t *testing.T) {
997		if gotErr := pub.WaitStarted(); gotErr != nil {
998			t.Errorf("Start() got err: (%v)", gotErr)
999		}
1000		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
1001			t.Errorf("Num partition publishers: got %d, want %d", got, want)
1002		}
1003	})
1004	t.Run("Failed update", func(t *testing.T) {
1005		pub.pub.partitionWatcher.updatePartitionCount()
1006
1007		// Failed update ignored.
1008		if got, want := pub.NumPartitionPublishers(), initialPartitionCount; got != want {
1009			t.Errorf("Num partition publishers: got %d, want %d", got, want)
1010		}
1011	})
1012
1013	pub.Stop()
1014	if gotErr := pub.WaitStopped(); gotErr != nil {
1015		t.Errorf("Stop() got err: (%v)", gotErr)
1016	}
1017}
1018
1019func TestNewPublisherValidatesSettings(t *testing.T) {
1020	const topic = "projects/123456/locations/us-central1-b/topics/my-topic"
1021	const region = "us-central1"
1022
1023	settings := DefaultPublishSettings
1024	settings.DelayThreshold = 0
1025	if _, err := NewPublisher(context.Background(), settings, region, topic); err == nil {
1026		t.Error("NewPublisher() did not return error")
1027	}
1028}
1029