1// Copyright 2014 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package longtest_test
16
17import (
18	"context"
19	"fmt"
20	"log"
21	"math/rand"
22	"strconv"
23	"strings"
24	"sync"
25	"testing"
26	"time"
27
28	"cloud.google.com/go/internal/testutil"
29	"cloud.google.com/go/pubsub"
30	"google.golang.org/api/iterator"
31	"google.golang.org/api/option"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/status"
34)
35
// Tunables shared by the end-to-end tests below.
const (
	timeout                 = time.Minute * 10 // overall deadline for each test
	ackDeadline             = time.Second * 10 // subscription ack deadline used by the tests
	nMessages               = 1e4              // messages published by TestEndToEnd_Dupes
	acceptableDupPercentage = 1                // tolerated redelivery rate, in percent
	numAcceptableDups       = int(nMessages * acceptableDupPercentage / 100) // absolute dup budget derived from the above
	resourcePrefix          = "endtoend" // prefix identifying topics/subscriptions created by these tests
)
44
45// The end-to-end pumps many messages into a topic and tests that they are all
46// delivered to each subscription for the topic. It also tests that messages
47// are not unexpectedly redelivered.
48func TestEndToEnd_Dupes(t *testing.T) {
49	t.Skip("https://github.com/googleapis/google-cloud-go/issues/1752")
50
51	ctx, cancel := context.WithTimeout(context.Background(), timeout)
52	defer cancel()
53	client, topic, cleanup := prepareEndToEndTest(ctx, t)
54	defer cleanup()
55	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
56
57	// Two subscriptions to the same topic.
58	var err error
59	var subs [2]*pubsub.Subscription
60	for i := 0; i < len(subs); i++ {
61		subs[i], err = client.CreateSubscription(ctx, fmt.Sprintf("%s-%d", subPrefix, i), pubsub.SubscriptionConfig{
62			Topic:       topic,
63			AckDeadline: ackDeadline,
64		})
65		if err != nil {
66			t.Fatalf("CreateSub error: %v", err)
67		}
68		defer subs[i].Delete(ctx)
69	}
70
71	err = publish(ctx, topic, nMessages)
72	topic.Stop()
73	if err != nil {
74		t.Fatalf("publish: %v", err)
75	}
76
77	// recv provides an indication that messages are still arriving.
78	recv := make(chan struct{})
79	// We have two subscriptions to our topic.
80	// Each subscription will get a copy of each published message.
81	var wg sync.WaitGroup
82	cctx, cancel := context.WithTimeout(ctx, timeout)
83	defer cancel()
84
85	consumers := []*consumer{
86		{
87			counts:    make(map[string]int),
88			recv:      recv,
89			durations: []time.Duration{time.Hour},
90			done:      make(chan struct{}),
91		},
92		{
93			counts:    make(map[string]int),
94			recv:      recv,
95			durations: []time.Duration{ackDeadline, ackDeadline, ackDeadline / 2, ackDeadline / 2, time.Hour},
96			done:      make(chan struct{}),
97		},
98	}
99	for i, con := range consumers {
100		con := con
101		sub := subs[i]
102		wg.Add(1)
103		go func() {
104			defer wg.Done()
105			con.consume(ctx, t, sub)
106		}()
107	}
108	// Wait for a while after the last message before declaring quiescence.
109	// We wait a multiple of the ack deadline, for two reasons:
110	// 1. To detect if messages are redelivered after having their ack
111	//    deadline extended.
112	// 2. To wait for redelivery of messages that were en route when a Receive
113	//    is canceled. This can take considerably longer than the ack deadline.
114	quiescenceDur := ackDeadline * 6
115	quiescenceTimer := time.NewTimer(quiescenceDur)
116
117loop:
118	for {
119		select {
120		case <-recv:
121			// Reset timer so we wait quiescenceDur after the last message.
122			// See https://godoc.org/time#Timer.Reset for why the Stop
123			// and channel drain are necessary.
124			if !quiescenceTimer.Stop() {
125				<-quiescenceTimer.C
126			}
127			quiescenceTimer.Reset(quiescenceDur)
128
129		case <-quiescenceTimer.C:
130			cancel()
131			log.Println("quiesced")
132			break loop
133
134		case <-cctx.Done():
135			t.Fatal("timed out")
136		}
137	}
138	wg.Wait()
139	close(recv)
140	for i, con := range consumers {
141		var numDups int
142		var zeroes int
143		for _, v := range con.counts {
144			if v == 0 {
145				zeroes++
146			}
147			numDups += v - 1
148		}
149
150		if zeroes > 0 {
151			t.Errorf("Consumer %d: %d messages never arrived", i, zeroes)
152		} else if numDups > numAcceptableDups {
153			t.Errorf("Consumer %d: Willing to accept %d dups (%v%% duplicated of %d messages), but got %d", i, numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
154		}
155	}
156
157	for i, con := range consumers {
158		select {
159		case <-con.done:
160		case <-time.After(15 * time.Second):
161			t.Fatalf("timed out waiting for consumer %d to finish", i)
162		}
163	}
164}
165
166func TestEndToEnd_LongProcessingTime(t *testing.T) {
167	ctx, cancel := context.WithTimeout(context.Background(), timeout)
168	defer cancel()
169	client, topic, cleanup := prepareEndToEndTest(ctx, t)
170	defer cleanup()
171	subPrefix := fmt.Sprintf("%s-%d", resourcePrefix, time.Now().UnixNano())
172
173	// Two subscriptions to the same topic.
174	sub, err := client.CreateSubscription(ctx, subPrefix+"-00", pubsub.SubscriptionConfig{
175		Topic:       topic,
176		AckDeadline: ackDeadline,
177	})
178	if err != nil {
179		t.Fatalf("CreateSub error: %v", err)
180	}
181	defer sub.Delete(ctx)
182
183	// Tests the issue found in https://github.com/googleapis/google-cloud-go/issues/1247.
184	sub.ReceiveSettings.Synchronous = true
185	sub.ReceiveSettings.MaxOutstandingMessages = 500
186
187	err = publish(ctx, topic, 500)
188	topic.Stop()
189	if err != nil {
190		t.Fatalf("publish: %v", err)
191	}
192
193	// recv provides an indication that messages are still arriving.
194	recv := make(chan struct{})
195	consumer := consumer{
196		counts:    make(map[string]int),
197		recv:      recv,
198		durations: []time.Duration{time.Hour},
199		processingDelay: func() time.Duration {
200			return time.Duration(1+rand.Int63n(120)) * time.Second
201		},
202		done: make(chan struct{}),
203	}
204	go consumer.consume(ctx, t, sub)
205	// Wait for a while after the last message before declaring quiescence.
206	// We wait a multiple of the ack deadline, for two reasons:
207	// 1. To detect if messages are redelivered after having their ack
208	//    deadline extended.
209	// 2. To wait for redelivery of messages that were en route when a Receive
210	//    is canceled. This can take considerably longer than the ack deadline.
211	quiescenceDur := 12 * ackDeadline
212	quiescenceTimer := time.NewTimer(quiescenceDur)
213loop:
214	for {
215		select {
216		case <-recv:
217			// Reset timer so we wait quiescenceDur after the last message.
218			// See https://godoc.org/time#Timer.Reset for why the Stop
219			// and channel drain are necessary.
220			if !quiescenceTimer.Stop() {
221				<-quiescenceTimer.C
222			}
223			quiescenceTimer.Reset(quiescenceDur)
224
225		case <-quiescenceTimer.C:
226			cancel()
227			log.Println("quiesced")
228			break loop
229
230		case <-ctx.Done():
231			t.Fatal("timed out")
232		}
233	}
234	close(recv)
235	var numDups int
236	var zeroes int
237	for _, v := range consumer.counts {
238		if v == 0 {
239			zeroes++
240		}
241		numDups += v - 1
242	}
243
244	if zeroes > 0 {
245		t.Errorf("%d messages never arrived", zeroes)
246	} else if numDups > numAcceptableDups {
247		t.Errorf("Willing to accept %d dups (%v duplicated of %d messages), but got %d", numAcceptableDups, acceptableDupPercentage, int(nMessages), numDups)
248	}
249
250	select {
251	case <-consumer.done:
252	case <-time.After(15 * time.Second):
253		t.Fatal("timed out waiting for consumer to finish")
254	}
255}
256
257// publish publishes n messages to topic.
258func publish(ctx context.Context, topic *pubsub.Topic, n int) error {
259	var rs []*pubsub.PublishResult
260	for i := 0; i < n; i++ {
261		m := &pubsub.Message{Data: []byte(fmt.Sprintf("msg %d", i))}
262		rs = append(rs, topic.Publish(ctx, m))
263	}
264	for _, r := range rs {
265		_, err := r.Get(ctx)
266		if err != nil {
267			return err
268		}
269	}
270	return nil
271}
272
// consumer consumes messages according to its configuration.
type consumer struct {
	// A consumer spins up one Receive per entry in durations: each Receive
	// runs under a context that times out after that duration, then the next
	// one starts. For example, five 3-second durations yield five sequential
	// 3-second Receives.
	durations []time.Duration

	// A value is sent to recv each time process is called.
	recv chan struct{}

	// How long to wait for before acking. If nil, a random delay of up to
	// three ack deadlines is used instead (see process).
	processingDelay func() time.Duration

	// mu guards counts and totalRecvd, which are updated from Receive
	// callbacks that may run concurrently.
	mu         sync.Mutex
	counts     map[string]int // msgID: recvdAmt
	totalRecvd int

	// Done consuming; closed when consume returns.
	done chan struct{}
}
293
294// consume reads messages from a subscription, and keeps track of what it receives in mc.
295// After consume returns, the caller should wait on wg to ensure that no more updates to mc will be made.
296func (c *consumer) consume(ctx context.Context, t *testing.T, sub *pubsub.Subscription) {
297	defer close(c.done)
298	for _, dur := range c.durations {
299		ctx2, cancel := context.WithTimeout(ctx, dur)
300		defer cancel()
301		id := sub.String()[len(sub.String())-1:]
302		t.Logf("%s: start receive", id)
303		prev := c.totalRecvd
304		err := sub.Receive(ctx2, c.process)
305		t.Logf("%s: end receive; read %d", id, c.totalRecvd-prev)
306		if serr, _ := status.FromError(err); err != nil && serr.Code() != codes.Canceled {
307			panic(err)
308		}
309		select {
310		case <-ctx.Done():
311			return
312		default:
313		}
314	}
315}
316
317// process handles a message and records it in mc.
318func (c *consumer) process(_ context.Context, m *pubsub.Message) {
319	c.mu.Lock()
320	c.counts[m.ID]++
321	c.totalRecvd++
322	c.mu.Unlock()
323	c.recv <- struct{}{}
324
325	var delay time.Duration
326	if c.processingDelay == nil {
327		delay = time.Duration(rand.Intn(int(ackDeadline * 3)))
328	} else {
329		delay = c.processingDelay()
330	}
331
332	// Simulate time taken to process m, while continuing to process more messages.
333	// Some messages will need to have their ack deadline extended due to this delay.
334	time.AfterFunc(delay, func() {
335		m.Ack()
336	})
337}
338
339// Remember to call cleanup!
340func prepareEndToEndTest(ctx context.Context, t *testing.T) (*pubsub.Client, *pubsub.Topic, func()) {
341	if testing.Short() {
342		t.Skip("Integration tests skipped in short mode")
343	}
344	ts := testutil.TokenSource(ctx, pubsub.ScopePubSub, pubsub.ScopeCloudPlatform)
345	if ts == nil {
346		t.Skip("Integration tests skipped. See CONTRIBUTING.md for details")
347	}
348
349	now := time.Now()
350	topicName := fmt.Sprintf("%s-%d", resourcePrefix, now.UnixNano())
351
352	client, err := pubsub.NewClient(ctx, testutil.ProjID(), option.WithTokenSource(ts))
353	if err != nil {
354		t.Fatalf("Creating client error: %v", err)
355	}
356
357	// Don't stop the test if cleanup failed.
358	if err := cleanupSubscription(ctx, client); err != nil {
359		t.Logf("Pre-test subscription cleanup failed: %v", err)
360	}
361	if err := cleanupTopic(ctx, client); err != nil {
362		t.Logf("Pre-test topic cleanup failed: %v", err)
363	}
364
365	var topic *pubsub.Topic
366	if topic, err = client.CreateTopic(ctx, topicName); err != nil {
367		t.Fatalf("CreateTopic error: %v", err)
368	}
369
370	return client, topic, func() {
371		topic.Delete(ctx)
372		client.Close()
373	}
374}
375
376// cleanupTopic deletes stale testing topics.
377func cleanupTopic(ctx context.Context, client *pubsub.Client) error {
378	if testing.Short() {
379		return nil // Don't clean up in short mode.
380	}
381	// Delete topics which were	created a while ago.
382	const expireAge = 24 * time.Hour
383
384	it := client.Topics(ctx)
385	for {
386		t, err := it.Next()
387		if err == iterator.Done {
388			break
389		}
390		if err != nil {
391			return err
392		}
393		// Take timestamp from id.
394		tID := t.ID()
395		p := strings.Split(tID, "-")
396
397		// Only delete resources created from the endtoend test.
398		// Otherwise, this will affect other tests running midflight.
399		if p[0] == resourcePrefix {
400			tCreated := p[len(p)-1]
401			timestamp, err := strconv.ParseInt(tCreated, 10, 64)
402			if err != nil {
403				continue
404			}
405			timeTCreated := time.Unix(0, timestamp)
406			if time.Since(timeTCreated) > expireAge {
407				log.Printf("deleting topic %q", tID)
408				if err := t.Delete(ctx); err != nil {
409					return fmt.Errorf("Delete topic: %v: %v", t.String(), err)
410				}
411			}
412		}
413	}
414	return nil
415}
416
417// cleanupSubscription deletes stale testing subscriptions.
418func cleanupSubscription(ctx context.Context, client *pubsub.Client) error {
419	if testing.Short() {
420		return nil // Don't clean up in short mode.
421	}
422	// Delete subscriptions which were created a while ago.
423	const expireAge = 24 * time.Hour
424
425	it := client.Subscriptions(ctx)
426	for {
427		s, err := it.Next()
428		if err == iterator.Done {
429			break
430		}
431		if err != nil {
432			return err
433		}
434		sID := s.ID()
435		p := strings.Split(sID, "-")
436
437		// Only delete resources created from the endtoend test.
438		// Otherwise, this will affect other tests running midflight.
439		if p[0] == resourcePrefix {
440			sCreated := p[len(p)-2]
441			timestamp, err := strconv.ParseInt(sCreated, 10, 64)
442			if err != nil {
443				continue
444			}
445			timeSCreated := time.Unix(0, timestamp)
446			if time.Since(timeSCreated) > expireAge {
447				log.Printf("deleting subscription %q", sID)
448				if err := s.Delete(ctx); err != nil {
449					return fmt.Errorf("Delete subscription: %v: %v", s.String(), err)
450				}
451			}
452		}
453	}
454	return nil
455}
456