1// Copyright 2018 Google LLC 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package pubsub 16 17import ( 18 "context" 19 "log" 20 "sync/atomic" 21 "testing" 22 "time" 23 24 "cloud.google.com/go/pubsub/pstest" 25 "google.golang.org/api/option" 26 "google.golang.org/grpc" 27) 28 29// Using the fake PubSub server in the pstest package, verify that streaming 30// pull resumes if the server stream times out. 31func TestStreamTimeout(t *testing.T) { 32 t.Parallel() 33 log.SetFlags(log.Lmicroseconds) 34 ctx := context.Background() 35 srv := pstest.NewServer() 36 defer srv.Close() 37 38 srv.SetStreamTimeout(2 * time.Second) 39 conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure()) 40 if err != nil { 41 t.Fatal(err) 42 } 43 defer conn.Close() 44 45 opts := withGRPCHeadersAssertion(t, option.WithGRPCConn(conn)) 46 client, err := NewClient(ctx, "P", opts...) 47 if err != nil { 48 t.Fatal(err) 49 } 50 defer client.Close() 51 52 topic, err := client.CreateTopic(ctx, "T") 53 if err != nil { 54 t.Fatal(err) 55 } 56 sub, err := client.CreateSubscription(ctx, "sub", SubscriptionConfig{Topic: topic, AckDeadline: 10 * time.Second}) 57 if err != nil { 58 t.Fatal(err) 59 } 60 const nPublish = 8 61 rctx, cancel := context.WithTimeout(ctx, 30*time.Second) 62 defer cancel() 63 errc := make(chan error) 64 var nSeen int64 65 go func() { 66 errc <- sub.Receive(rctx, func(ctx context.Context, m *Message) { 67 m.Ack() 68 n := atomic.AddInt64(&nSeen, 1) 69 if n >= nPublish { 70 cancel() 71 } 72 }) 73 }() 74 75 for i := 0; i < nPublish; i++ { 76 pr := topic.Publish(ctx, &Message{Data: []byte("msg")}) 77 _, err := pr.Get(ctx) 78 if err != nil { 79 t.Fatal(err) 80 } 81 time.Sleep(250 * time.Millisecond) 82 } 83 84 if err := <-errc; err != nil { 85 t.Fatal(err) 86 } 87 if err := sub.Delete(ctx); err != nil { 88 t.Fatal(err) 89 } 90 n := atomic.LoadInt64(&nSeen) 91 if n < nPublish { 92 t.Errorf("got %d messages, want %d", n, nPublish) 93 } 94} 95