1// Copyright 2018 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"log"
20	"sync/atomic"
21	"testing"
22	"time"
23
24	"cloud.google.com/go/pubsub/pstest"
25	"google.golang.org/api/option"
26	"google.golang.org/grpc"
27)
28
29// Using the fake PubSub server in the pstest package, verify that streaming
30// pull resumes if the server stream times out.
31func TestStreamTimeout(t *testing.T) {
32	t.Parallel()
33	log.SetFlags(log.Lmicroseconds)
34	ctx := context.Background()
35	srv := pstest.NewServer()
36	defer srv.Close()
37
38	srv.SetStreamTimeout(2 * time.Second)
39	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
40	if err != nil {
41		t.Fatal(err)
42	}
43	defer conn.Close()
44
45	opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn))
46	client, err := NewClient(ctx, "P", opts...)
47	if err != nil {
48		t.Fatal(err)
49	}
50	defer client.Close()
51
52	topic, err := client.CreateTopic(ctx, "T")
53	if err != nil {
54		t.Fatal(err)
55	}
56	sub, err := client.CreateSubscription(ctx, "sub", SubscriptionConfig{Topic: topic, AckDeadline: 10 * time.Second})
57	if err != nil {
58		t.Fatal(err)
59	}
60	const nPublish = 8
61	rctx, cancel := context.WithTimeout(ctx, 30*time.Second)
62	defer cancel()
63	errc := make(chan error)
64	var nSeen int64
65	go func() {
66		errc <- sub.Receive(rctx, func(ctx context.Context, m *Message) {
67			m.Ack()
68			n := atomic.AddInt64(&nSeen, 1)
69			if n >= nPublish {
70				cancel()
71			}
72		})
73	}()
74
75	for i := 0; i < nPublish; i++ {
76		pr := topic.Publish(ctx, &Message{Data: []byte("msg")})
77		_, err := pr.Get(ctx)
78		if err != nil {
79			t.Fatal(err)
80		}
81		time.Sleep(250 * time.Millisecond)
82	}
83
84	if err := <-errc; err != nil {
85		t.Fatal(err)
86	}
87	if err := sub.Delete(ctx); err != nil {
88		t.Fatal(err)
89	}
90	n := atomic.LoadInt64(&nSeen)
91	if n < nPublish {
92		t.Errorf("got %d messages, want %d", n, nPublish)
93	}
94}
95