1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pstest
16
17import (
18	"context"
19	"fmt"
20	"io"
21	"sync"
22	"testing"
23	"time"
24
25	"cloud.google.com/go/internal/testutil"
26	"github.com/golang/protobuf/ptypes"
27	pb "google.golang.org/genproto/googleapis/pubsub/v1"
28	"google.golang.org/grpc"
29	"google.golang.org/grpc/codes"
30	"google.golang.org/grpc/status"
31)
32
33func TestTopics(t *testing.T) {
34	pclient, _, server, cleanup := newFake(context.TODO(), t)
35	defer cleanup()
36
37	ctx := context.Background()
38	var topics []*pb.Topic
39	for i := 1; i < 3; i++ {
40		topics = append(topics, mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{
41			Name:   fmt.Sprintf("projects/P/topics/T%d", i),
42			Labels: map[string]string{"num": fmt.Sprintf("%d", i)},
43		}))
44	}
45	if got, want := len(server.GServer.topics), len(topics); got != want {
46		t.Fatalf("got %d topics, want %d", got, want)
47	}
48	for _, top := range topics {
49		got, err := pclient.GetTopic(ctx, &pb.GetTopicRequest{Topic: top.Name})
50		if err != nil {
51			t.Fatal(err)
52		}
53		if !testutil.Equal(got, top) {
54			t.Errorf("\ngot %+v\nwant %+v", got, top)
55		}
56	}
57
58	res, err := pclient.ListTopics(ctx, &pb.ListTopicsRequest{Project: "projects/P"})
59	if err != nil {
60		t.Fatal(err)
61	}
62	if got, want := res.Topics, topics; !testutil.Equal(got, want) {
63		t.Errorf("\ngot %+v\nwant %+v", got, want)
64	}
65
66	for _, top := range topics {
67		if _, err := pclient.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: top.Name}); err != nil {
68			t.Fatal(err)
69		}
70	}
71	if got, want := len(server.GServer.topics), 0; got != want {
72		t.Fatalf("got %d topics, want %d", got, want)
73	}
74}
75
76func TestSubscriptions(t *testing.T) {
77	pclient, sclient, server, cleanup := newFake(context.TODO(), t)
78	defer cleanup()
79
80	ctx := context.Background()
81	topic := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
82	var subs []*pb.Subscription
83	for i := 0; i < 3; i++ {
84		subs = append(subs, mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
85			Name:               fmt.Sprintf("projects/P/subscriptions/S%d", i),
86			Topic:              topic.Name,
87			AckDeadlineSeconds: int32(10 * (i + 1)),
88		}))
89	}
90
91	if got, want := len(server.GServer.subs), len(subs); got != want {
92		t.Fatalf("got %d subscriptions, want %d", got, want)
93	}
94	for _, s := range subs {
95		got, err := sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.Name})
96		if err != nil {
97			t.Fatal(err)
98		}
99		if !testutil.Equal(got, s) {
100			t.Errorf("\ngot %+v\nwant %+v", got, s)
101		}
102	}
103
104	res, err := sclient.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Project: "projects/P"})
105	if err != nil {
106		t.Fatal(err)
107	}
108	if got, want := res.Subscriptions, subs; !testutil.Equal(got, want) {
109		t.Errorf("\ngot %+v\nwant %+v", got, want)
110	}
111
112	res2, err := pclient.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Topic: topic.Name})
113	if err != nil {
114		t.Fatal(err)
115	}
116	if got, want := len(res2.Subscriptions), len(subs); got != want {
117		t.Fatalf("got %d subs, want %d", got, want)
118	}
119	for i, got := range res2.Subscriptions {
120		want := subs[i].Name
121		if !testutil.Equal(got, want) {
122			t.Errorf("\ngot %+v\nwant %+v", got, want)
123		}
124	}
125
126	for _, s := range subs {
127		if _, err := sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.Name}); err != nil {
128			t.Fatal(err)
129		}
130	}
131	if got, want := len(server.GServer.subs), 0; got != want {
132		t.Fatalf("got %d subscriptions, want %d", got, want)
133	}
134}
135
136func TestSubscriptionErrors(t *testing.T) {
137	_, sclient, _, cleanup := newFake(context.TODO(), t)
138	defer cleanup()
139
140	ctx := context.Background()
141
142	checkCode := func(err error, want codes.Code) {
143		t.Helper()
144		if status.Code(err) != want {
145			t.Errorf("got %v, want code %s", err, want)
146		}
147	}
148
149	_, err := sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{})
150	checkCode(err, codes.InvalidArgument)
151	_, err = sclient.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: "s"})
152	checkCode(err, codes.NotFound)
153	_, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{})
154	checkCode(err, codes.InvalidArgument)
155	_, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{Subscription: &pb.Subscription{}})
156	checkCode(err, codes.InvalidArgument)
157	_, err = sclient.UpdateSubscription(ctx, &pb.UpdateSubscriptionRequest{Subscription: &pb.Subscription{Name: "s"}})
158	checkCode(err, codes.NotFound)
159	_, err = sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{})
160	checkCode(err, codes.InvalidArgument)
161	_, err = sclient.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: "s"})
162	checkCode(err, codes.NotFound)
163	_, err = sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{})
164	checkCode(err, codes.InvalidArgument)
165	_, err = sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{Subscription: "s"})
166	checkCode(err, codes.NotFound)
167	_, err = sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{})
168	checkCode(err, codes.InvalidArgument)
169	_, err = sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{Subscription: "s"})
170	checkCode(err, codes.NotFound)
171	_, err = sclient.Pull(ctx, &pb.PullRequest{})
172	checkCode(err, codes.InvalidArgument)
173	_, err = sclient.Pull(ctx, &pb.PullRequest{Subscription: "s"})
174	checkCode(err, codes.NotFound)
175	_, err = sclient.Seek(ctx, &pb.SeekRequest{})
176	checkCode(err, codes.InvalidArgument)
177	srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}
178	_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt})
179	checkCode(err, codes.InvalidArgument)
180	_, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"})
181	checkCode(err, codes.NotFound)
182}
183
184func TestPublish(t *testing.T) {
185	s := NewServer()
186	defer s.Close()
187
188	var ids []string
189	for i := 0; i < 3; i++ {
190		ids = append(ids, s.Publish("projects/p/topics/t", []byte("hello"), nil))
191	}
192	s.Wait()
193	ms := s.Messages()
194	if got, want := len(ms), len(ids); got != want {
195		t.Errorf("got %d messages, want %d", got, want)
196	}
197	for i, id := range ids {
198		if got, want := ms[i].ID, id; got != want {
199			t.Errorf("got %s, want %s", got, want)
200		}
201	}
202
203	m := s.Message(ids[1])
204	if m == nil {
205		t.Error("got nil, want a message")
206	}
207}
208
209func TestClearMessages(t *testing.T) {
210	s := NewServer()
211	defer s.Close()
212
213	for i := 0; i < 3; i++ {
214		s.Publish("projects/p/topics/t", []byte("hello"), nil)
215	}
216	s.Wait()
217	msgs := s.Messages()
218	if got, want := len(msgs), 3; got != want {
219		t.Errorf("got %d messages, want %d", got, want)
220	}
221	s.ClearMessages()
222	msgs = s.Messages()
223	if got, want := len(msgs), 0; got != want {
224		t.Errorf("got %d messages, want %d", got, want)
225	}
226}
227
228// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
229func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages []*pb.PubsubMessage) map[string]*pb.PubsubMessage {
230	pubTime := time.Now()
231	now.Store(func() time.Time { return pubTime })
232	defer func() { now.Store(time.Now) }()
233
234	res, err := pclient.Publish(context.Background(), &pb.PublishRequest{
235		Topic:    topic.Name,
236		Messages: messages,
237	})
238	if err != nil {
239		t.Fatal(err)
240	}
241	tsPubTime, err := ptypes.TimestampProto(pubTime)
242	if err != nil {
243		t.Fatal(err)
244	}
245	want := map[string]*pb.PubsubMessage{}
246	for i, id := range res.MessageIds {
247		want[id] = &pb.PubsubMessage{
248			Data:        messages[i].Data,
249			Attributes:  messages[i].Attributes,
250			MessageId:   id,
251			PublishTime: tsPubTime,
252		}
253	}
254	return want
255}
256
257func TestPull(t *testing.T) {
258	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
259	defer cleanup()
260
261	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
262	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
263		Name:               "projects/P/subscriptions/S",
264		Topic:              top.Name,
265		AckDeadlineSeconds: 10,
266	})
267
268	want := publish(t, pclient, top, []*pb.PubsubMessage{
269		{Data: []byte("d1")},
270		{Data: []byte("d2")},
271		{Data: []byte("d3")},
272	})
273	got := pubsubMessages(pullN(context.TODO(), t, len(want), sclient, sub))
274	if diff := testutil.Diff(got, want); diff != "" {
275		t.Error(diff)
276	}
277
278	res, err := sclient.Pull(context.Background(), &pb.PullRequest{Subscription: sub.Name})
279	if err != nil {
280		t.Fatal(err)
281	}
282	if len(res.ReceivedMessages) != 0 {
283		t.Errorf("got %d messages, want zero", len(res.ReceivedMessages))
284	}
285}
286
287func TestStreamingPull(t *testing.T) {
288	// A simple test of streaming pull.
289	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
290	defer cleanup()
291
292	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
293	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
294		Name:               "projects/P/subscriptions/S",
295		Topic:              top.Name,
296		AckDeadlineSeconds: 10,
297	})
298
299	want := publish(t, pclient, top, []*pb.PubsubMessage{
300		{Data: []byte("d1")},
301		{Data: []byte("d2")},
302		{Data: []byte("d3")},
303	})
304	got := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub))
305	if diff := testutil.Diff(got, want); diff != "" {
306		t.Error(diff)
307	}
308}
309
310// This test acks each message as it arrives and makes sure we don't see dups.
311func TestStreamingPullAck(t *testing.T) {
312	minAckDeadlineSecs = 1
313	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
314	defer cleanup()
315
316	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
317	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
318		Name:               "projects/P/subscriptions/S",
319		Topic:              top.Name,
320		AckDeadlineSeconds: 1,
321	})
322
323	_ = publish(t, pclient, top, []*pb.PubsubMessage{
324		{Data: []byte("d1")},
325		{Data: []byte("d2")},
326		{Data: []byte("d3")},
327	})
328
329	got := map[string]bool{}
330	ctx, cancel := context.WithCancel(context.Background())
331	spc := mustStartStreamingPull(ctx, t, sclient, sub)
332	time.AfterFunc(time.Duration(2*minAckDeadlineSecs)*time.Second, cancel)
333
334	for i := 0; i < 4; i++ {
335		res, err := spc.Recv()
336		if err == io.EOF {
337			break
338		}
339		if err != nil {
340			if status.Code(err) == codes.Canceled {
341				break
342			}
343			t.Fatal(err)
344		}
345		if i == 3 {
346			t.Fatal("expected to only see 3 messages, got 4")
347		}
348		req := &pb.StreamingPullRequest{}
349		for _, m := range res.ReceivedMessages {
350			if got[m.Message.MessageId] {
351				t.Fatal("duplicate message")
352			}
353			got[m.Message.MessageId] = true
354			req.AckIds = append(req.AckIds, m.AckId)
355		}
356		if err := spc.Send(req); err != nil {
357			t.Fatal(err)
358		}
359	}
360}
361
362func TestAcknowledge(t *testing.T) {
363	ctx := context.Background()
364	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
365	defer cleanup()
366
367	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
368	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
369		Name:               "projects/P/subscriptions/S",
370		Topic:              top.Name,
371		AckDeadlineSeconds: 10,
372	})
373
374	publish(t, pclient, top, []*pb.PubsubMessage{
375		{Data: []byte("d1")},
376		{Data: []byte("d2")},
377		{Data: []byte("d3")},
378	})
379	msgs := streamingPullN(context.TODO(), t, 3, sclient, sub)
380	var ackIDs []string
381	for _, m := range msgs {
382		ackIDs = append(ackIDs, m.AckId)
383	}
384	if _, err := sclient.Acknowledge(ctx, &pb.AcknowledgeRequest{
385		Subscription: sub.Name,
386		AckIds:       ackIDs,
387	}); err != nil {
388		t.Fatal(err)
389	}
390	smsgs := srv.Messages()
391	if got, want := len(smsgs), 3; got != want {
392		t.Fatalf("got %d messages, want %d", got, want)
393	}
394	for _, sm := range smsgs {
395		if sm.Acks != 1 {
396			t.Errorf("message %s: got %d acks, want 1", sm.ID, sm.Acks)
397		}
398	}
399}
400
401func TestModAck(t *testing.T) {
402	ctx := context.Background()
403	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
404	defer cleanup()
405
406	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
407	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
408		Name:               "projects/P/subscriptions/S",
409		Topic:              top.Name,
410		AckDeadlineSeconds: 10,
411	})
412
413	publish(t, pclient, top, []*pb.PubsubMessage{
414		{Data: []byte("d1")},
415		{Data: []byte("d2")},
416		{Data: []byte("d3")},
417	})
418	msgs := streamingPullN(context.TODO(), t, 3, sclient, sub)
419	var ackIDs []string
420	for _, m := range msgs {
421		ackIDs = append(ackIDs, m.AckId)
422	}
423	if _, err := sclient.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{
424		Subscription:       sub.Name,
425		AckIds:             ackIDs,
426		AckDeadlineSeconds: 0,
427	}); err != nil {
428		t.Fatal(err)
429	}
430	// Having nacked all three messages, we should see them again.
431	msgs = streamingPullN(context.TODO(), t, 3, sclient, sub)
432	if got, want := len(msgs), 3; got != want {
433		t.Errorf("got %d messages, want %d", got, want)
434	}
435}
436
437func TestAckDeadline(t *testing.T) {
438	// Messages should be resent after they expire.
439	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
440	defer cleanup()
441
442	minAckDeadlineSecs = 2
443	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
444	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
445		Name:               "projects/P/subscriptions/S",
446		Topic:              top.Name,
447		AckDeadlineSeconds: minAckDeadlineSecs,
448	})
449
450	_ = publish(t, pclient, top, []*pb.PubsubMessage{
451		{Data: []byte("d1")},
452		{Data: []byte("d2")},
453		{Data: []byte("d3")},
454	})
455
456	got := map[string]int{}
457	spc := mustStartStreamingPull(context.TODO(), t, sclient, sub)
458	// In 5 seconds the ack deadline will expire twice, so we should see each message
459	// exactly three times.
460	time.AfterFunc(5*time.Second, func() {
461		if err := spc.CloseSend(); err != nil {
462			t.Errorf("CloseSend: %v", err)
463		}
464	})
465	for {
466		res, err := spc.Recv()
467		if err == io.EOF {
468			break
469		}
470		if err != nil {
471			t.Fatal(err)
472		}
473		for _, m := range res.ReceivedMessages {
474			got[m.Message.MessageId]++
475		}
476	}
477	for id, n := range got {
478		if n != 3 {
479			t.Errorf("message %s: saw %d times, want 3", id, n)
480		}
481	}
482}
483
484func TestMultiSubs(t *testing.T) {
485	// Each subscription gets every message.
486	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
487	defer cleanup()
488
489	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
490	sub1 := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
491		Name:               "projects/P/subscriptions/S1",
492		Topic:              top.Name,
493		AckDeadlineSeconds: 10,
494	})
495	sub2 := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
496		Name:               "projects/P/subscriptions/S2",
497		Topic:              top.Name,
498		AckDeadlineSeconds: 10,
499	})
500
501	want := publish(t, pclient, top, []*pb.PubsubMessage{
502		{Data: []byte("d1")},
503		{Data: []byte("d2")},
504		{Data: []byte("d3")},
505	})
506	got1 := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub1))
507	got2 := pubsubMessages(streamingPullN(context.TODO(), t, len(want), sclient, sub2))
508	if diff := testutil.Diff(got1, want); diff != "" {
509		t.Error(diff)
510	}
511	if diff := testutil.Diff(got2, want); diff != "" {
512		t.Error(diff)
513	}
514}
515
516// Messages are handed out to all streams of a subscription in a best-effort
517// round-robin behavior. The fake server prefers to fail-fast onto another
518// stream when one stream is already busy, though, so we're unable to test
519// strict round robin behavior.
520func TestMultiStreams(t *testing.T) {
521	ctx, cancel := context.WithCancel(context.Background())
522	defer cancel()
523	pclient, sclient, _, cleanup := newFake(ctx, t)
524	defer cleanup()
525
526	top := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
527	sub := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{
528		Name:               "projects/P/subscriptions/S",
529		Topic:              top.Name,
530		AckDeadlineSeconds: 10,
531	})
532	st1 := mustStartStreamingPull(ctx, t, sclient, sub)
533	defer st1.CloseSend()
534	st1Received := make(chan struct{})
535	go func() {
536		_, err := st1.Recv()
537		if err != nil {
538			t.Error(err)
539		}
540		close(st1Received)
541	}()
542
543	st2 := mustStartStreamingPull(ctx, t, sclient, sub)
544	defer st2.CloseSend()
545	st2Received := make(chan struct{})
546	go func() {
547		_, err := st2.Recv()
548		if err != nil {
549			t.Error(err)
550		}
551		close(st2Received)
552	}()
553
554	publish(t, pclient, top, []*pb.PubsubMessage{
555		{Data: []byte("d1")},
556		{Data: []byte("d2")},
557	})
558
559	timeout := time.After(5 * time.Second)
560	select {
561	case <-timeout:
562		t.Fatal("timed out waiting for stream 1 to receive any message")
563	case <-st1Received:
564	}
565	select {
566	case <-timeout:
567		t.Fatal("timed out waiting for stream 1 to receive any message")
568	case <-st2Received:
569	}
570}
571
572func TestStreamingPullTimeout(t *testing.T) {
573	pclient, sclient, srv, cleanup := newFake(context.TODO(), t)
574	defer cleanup()
575
576	timeout := 200 * time.Millisecond
577	srv.SetStreamTimeout(timeout)
578	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
579	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
580		Name:               "projects/P/subscriptions/S",
581		Topic:              top.Name,
582		AckDeadlineSeconds: 10,
583	})
584	stream := mustStartStreamingPull(context.TODO(), t, sclient, sub)
585	time.Sleep(2 * timeout)
586	_, err := stream.Recv()
587	if err != io.EOF {
588		t.Errorf("got %v, want io.EOF", err)
589	}
590}
591
592func TestSeek(t *testing.T) {
593	pclient, sclient, _, cleanup := newFake(context.TODO(), t)
594	defer cleanup()
595
596	top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
597	sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
598		Name:               "projects/P/subscriptions/S",
599		Topic:              top.Name,
600		AckDeadlineSeconds: 10,
601	})
602	ts := ptypes.TimestampNow()
603	_, err := sclient.Seek(context.Background(), &pb.SeekRequest{
604		Subscription: sub.Name,
605		Target:       &pb.SeekRequest_Time{Time: ts},
606	})
607	if err != nil {
608		t.Errorf("Seeking: %v", err)
609	}
610}
611
612func TestTryDeliverMessage(t *testing.T) {
613	for _, test := range []struct {
614		availStreamIdx int
615		expectedOutIdx int
616	}{
617		{availStreamIdx: 0, expectedOutIdx: 0},
618		// Stream 1 will always be marked for deletion.
619		{availStreamIdx: 2, expectedOutIdx: 1}, // s0, s1 (deleted), s2, s3 becomes s0, s2, s3. So we expect outIdx=1.
620		{availStreamIdx: 3, expectedOutIdx: 2}, // s0, s1 (deleted), s2, s3 becomes s0, s2, s3. So we expect outIdx=2.
621	} {
622		top := newTopic(&pb.Topic{Name: "some-topic"})
623		sub := newSubscription(top, &sync.Mutex{}, &pb.Subscription{Name: "some-sub", Topic: "some-topic"})
624
625		done := make(chan struct{}, 1)
626		done <- struct{}{}
627		sub.streams = []*stream{{}, {done: done}, {}, {}}
628
629		msgc := make(chan *pb.ReceivedMessage, 1)
630		sub.streams[test.availStreamIdx].msgc = msgc
631
632		var d int
633		idx, ok := sub.tryDeliverMessage(&message{deliveries: &d}, 0, time.Now())
634		if !ok {
635			t.Fatalf("[avail=%d]: expected msg to be put on stream %d's channel, but it was not", test.availStreamIdx, test.expectedOutIdx)
636		}
637		if idx != test.expectedOutIdx {
638			t.Fatalf("[avail=%d]: expected msg to be put on stream %d, but it was put on %d", test.availStreamIdx, test.expectedOutIdx, idx)
639		}
640		select {
641		case <-msgc:
642		default:
643			t.Fatalf("[avail=%d]: expected msg to be put on stream %d's channel, but it was not", test.availStreamIdx, idx)
644		}
645	}
646}
647
648func mustStartStreamingPull(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) pb.Subscriber_StreamingPullClient {
649	spc, err := sc.StreamingPull(ctx)
650	if err != nil {
651		t.Fatal(err)
652	}
653	if err := spc.Send(&pb.StreamingPullRequest{Subscription: sub.Name}); err != nil {
654		t.Fatal(err)
655	}
656	return spc
657}
658
659func pullN(ctx context.Context, t *testing.T, n int, sc pb.SubscriberClient, sub *pb.Subscription) map[string]*pb.ReceivedMessage {
660	got := map[string]*pb.ReceivedMessage{}
661	for i := 0; len(got) < n; i++ {
662		res, err := sc.Pull(ctx, &pb.PullRequest{Subscription: sub.Name, MaxMessages: int32(n - len(got))})
663		if err != nil {
664			t.Fatal(err)
665		}
666		for _, m := range res.ReceivedMessages {
667			got[m.Message.MessageId] = m
668		}
669	}
670	return got
671}
672
673func streamingPullN(ctx context.Context, t *testing.T, n int, sc pb.SubscriberClient, sub *pb.Subscription) map[string]*pb.ReceivedMessage {
674	spc := mustStartStreamingPull(ctx, t, sc, sub)
675	got := map[string]*pb.ReceivedMessage{}
676	for i := 0; i < n; i++ {
677		res, err := spc.Recv()
678		if err != nil {
679			t.Fatal(err)
680		}
681		for _, m := range res.ReceivedMessages {
682			got[m.Message.MessageId] = m
683		}
684	}
685	if err := spc.CloseSend(); err != nil {
686		t.Fatal(err)
687	}
688	res, err := spc.Recv()
689	if err != io.EOF {
690		t.Fatalf("Recv returned <%v> instead of EOF; res = %v", err, res)
691	}
692	return got
693}
694
695func pubsubMessages(rms map[string]*pb.ReceivedMessage) map[string]*pb.PubsubMessage {
696	ms := map[string]*pb.PubsubMessage{}
697	for k, rm := range rms {
698		ms[k] = rm.Message
699	}
700	return ms
701}
702
703func mustCreateTopic(ctx context.Context, t *testing.T, pc pb.PublisherClient, topic *pb.Topic) *pb.Topic {
704	top, err := pc.CreateTopic(ctx, topic)
705	if err != nil {
706		t.Fatal(err)
707	}
708	return top
709}
710
711func mustCreateSubscription(ctx context.Context, t *testing.T, sc pb.SubscriberClient, sub *pb.Subscription) *pb.Subscription {
712	sub, err := sc.CreateSubscription(ctx, sub)
713	if err != nil {
714		t.Fatal(err)
715	}
716	return sub
717}
718
719// newFake creates a new fake server along  with a publisher and subscriber
720// client. Its final return is a cleanup function.
721//
722// Note: be sure to call cleanup!
723func newFake(ctx context.Context, t *testing.T) (pb.PublisherClient, pb.SubscriberClient, *Server, func()) {
724	srv := NewServer()
725	conn, err := grpc.DialContext(ctx, srv.Addr, grpc.WithInsecure())
726	if err != nil {
727		t.Fatal(err)
728	}
729	return pb.NewPublisherClient(conn), pb.NewSubscriberClient(conn), srv, func() {
730		srv.Close()
731		conn.Close()
732	}
733}
734