1// Copyright 2017 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package pubsub
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"reflect"
22	"sync"
23	"sync/atomic"
24	"testing"
25	"time"
26
27	"cloud.google.com/go/internal/testutil"
28	"cloud.google.com/go/pubsub/pstest"
29	"google.golang.org/api/option"
30	"google.golang.org/grpc"
31	"google.golang.org/grpc/codes"
32	"google.golang.org/grpc/status"
33)
34
35var (
36	projName                = "some-project"
37	topicName               = "some-topic"
38	fullyQualifiedTopicName = fmt.Sprintf("projects/%s/topics/%s", projName, topicName)
39)
40
41func TestSplitRequestIDs(t *testing.T) {
42	t.Parallel()
43	ids := []string{"aaaa", "bbbb", "cccc", "dddd", "eeee"}
44	for _, test := range []struct {
45		ids        []string
46		splitIndex int
47	}{
48		{[]string{}, 0},
49		{ids, 2},
50		{ids[:2], 2},
51	} {
52		got1, got2 := splitRequestIDs(test.ids, reqFixedOverhead+20)
53		want1, want2 := test.ids[:test.splitIndex], test.ids[test.splitIndex:]
54		if !testutil.Equal(got1, want1) {
55			t.Errorf("%v, 1: got %v, want %v", test, got1, want1)
56		}
57		if !testutil.Equal(got2, want2) {
58			t.Errorf("%v, 2: got %v, want %v", test, got2, want2)
59		}
60	}
61}
62
63func TestAckDistribution(t *testing.T) {
64	if testing.Short() {
65		t.SkipNow()
66	}
67	t.Skip("broken")
68
69	ctx, cancel := context.WithCancel(context.Background())
70	defer cancel()
71
72	minAckDeadline = 1 * time.Second
73	pstest.SetMinAckDeadline(minAckDeadline)
74	srv := pstest.NewServer()
75	defer srv.Close()
76	defer pstest.ResetMinAckDeadline()
77
78	// Create the topic via a Publish. It's convenient to do it here as opposed to client.CreateTopic because the client
79	// has not been established yet, and also because we want to create the topic once whereas the client is established
80	// below twice.
81	srv.Publish(fullyQualifiedTopicName, []byte("creating a topic"), nil)
82
83	queuedMsgs := make(chan int32, 1024)
84	go continuouslySend(ctx, srv, queuedMsgs)
85
86	for _, testcase := range []struct {
87		initialProcessSecs int32
88		finalProcessSecs   int32
89	}{
90		{initialProcessSecs: 3, finalProcessSecs: 5}, // Process time goes up
91		{initialProcessSecs: 5, finalProcessSecs: 3}, // Process time goes down
92	} {
93		t.Logf("Testing %d -> %d", testcase.initialProcessSecs, testcase.finalProcessSecs)
94
95		// processTimeSecs is used by the sender to coordinate with the receiver how long the receiver should
96		// pretend to process for. e.g. if we test 3s -> 5s, processTimeSecs will start at 3, causing receiver
97		// to process messages received for 3s while sender sends the first batch. Then, as sender begins to
98		// send the next batch, sender will swap processTimeSeconds to 5s and begin sending, and receiver will
99		// process each message for 5s. In this way we simulate a client whose time-to-ack (process time) changes.
100		processTimeSecs := testcase.initialProcessSecs
101
102		s, client, err := initConn(ctx, srv.Addr)
103		if err != nil {
104			t.Fatal(err)
105		}
106
107		// recvdWg increments for each message sent, and decrements for each message received.
108		recvdWg := &sync.WaitGroup{}
109
110		go startReceiving(ctx, t, s, recvdWg, &processTimeSecs)
111		startSending(t, queuedMsgs, &processTimeSecs, testcase.initialProcessSecs, testcase.finalProcessSecs, recvdWg)
112
113		recvdWg.Wait()
114		time.Sleep(100 * time.Millisecond) // Wait a bit more for resources to clean up
115		err = client.Close()
116		if err != nil {
117			t.Fatal(err)
118		}
119
120		modacks := modacksByTime(srv.Messages())
121		u := modackDeadlines(modacks)
122		initialDL := int32(minAckDeadline / time.Second)
123		if !setsAreEqual(u, []int32{initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs}) {
124			t.Fatalf("Expected modack deadlines to contain (exactly, and only) %ds, %ds, %ds. Instead, got %v",
125				initialDL, testcase.initialProcessSecs, testcase.finalProcessSecs, toSet(u))
126		}
127	}
128}
129
130// modacksByTime buckets modacks by time.
131func modacksByTime(msgs []*pstest.Message) map[time.Time][]pstest.Modack {
132	modacks := map[time.Time][]pstest.Modack{}
133
134	for _, msg := range msgs {
135		for _, m := range msg.Modacks {
136			modacks[m.ReceivedAt] = append(modacks[m.ReceivedAt], m)
137		}
138	}
139	return modacks
140}
141
142// setsAreEqual reports whether a and b contain the same values, ignoring duplicates.
143func setsAreEqual(haystack, needles []int32) bool {
144	hMap := map[int32]bool{}
145	nMap := map[int32]bool{}
146
147	for _, n := range needles {
148		nMap[n] = true
149	}
150
151	for _, n := range haystack {
152		hMap[n] = true
153	}
154
155	return reflect.DeepEqual(nMap, hMap)
156}
157
158// startReceiving pretends to be a client. It calls s.Receive and acks messages after some random delay. It also
159// looks out for dupes - any message that arrives twice will cause a failure.
160func startReceiving(ctx context.Context, t *testing.T, s *Subscription, recvdWg *sync.WaitGroup, processTimeSecs *int32) {
161	t.Log("Receiving..")
162
163	var recvdMu sync.Mutex
164	recvd := map[string]bool{}
165
166	err := s.Receive(ctx, func(ctx context.Context, msg *Message) {
167		msgData := string(msg.Data)
168		recvdMu.Lock()
169		_, ok := recvd[msgData]
170		if ok {
171			recvdMu.Unlock()
172			t.Fatalf("already saw \"%s\"\n", msgData)
173			return
174		}
175		recvd[msgData] = true
176		recvdMu.Unlock()
177
178		select {
179		case <-ctx.Done():
180			msg.Nack()
181			recvdWg.Done()
182		case <-time.After(time.Duration(atomic.LoadInt32(processTimeSecs)) * time.Second):
183			msg.Ack()
184			recvdWg.Done()
185		}
186	})
187	if err != nil {
188		if status.Code(err) != codes.Canceled {
189			t.Error(err)
190		}
191	}
192}
193
194// startSending sends four batches of messages broken up by minDeadline, initialProcessSecs, and finalProcessSecs.
195func startSending(t *testing.T, queuedMsgs chan int32, processTimeSecs *int32, initialProcessSecs int32, finalProcessSecs int32, recvdWg *sync.WaitGroup) {
196	var msg int32
197
198	// We must send this block to force the receiver to send its initially-configured modack time. The time that
199	// gets sent should be ignorant of the distribution, since there haven't been enough (any, actually) messages
200	// to create a distribution yet.
201	t.Log("minAckDeadlineSecsSending an initial message")
202	recvdWg.Add(1)
203	msg++
204	queuedMsgs <- msg
205	<-time.After(minAckDeadline)
206
207	t.Logf("Sending some messages to update distribution to %d. This new distribution will be used "+
208		"when the next batch of messages go out.", initialProcessSecs)
209	for i := 0; i < 10; i++ {
210		recvdWg.Add(1)
211		msg++
212		queuedMsgs <- msg
213	}
214	atomic.SwapInt32(processTimeSecs, finalProcessSecs)
215	<-time.After(time.Duration(initialProcessSecs) * time.Second)
216
217	t.Logf("Sending many messages to update distribution to %d. This new distribution will be used "+
218		"when the next batch of messages go out.", finalProcessSecs)
219	for i := 0; i < 100; i++ {
220		recvdWg.Add(1)
221		msg++
222		queuedMsgs <- msg // Send many messages to drastically change distribution
223	}
224	<-time.After(time.Duration(finalProcessSecs) * time.Second)
225
226	t.Logf("Last message going out, whose deadline should be %d.", finalProcessSecs)
227	recvdWg.Add(1)
228	msg++
229	queuedMsgs <- msg
230}
231
232// continuouslySend continuously sends messages that exist on the queuedMsgs chan.
233func continuouslySend(ctx context.Context, srv *pstest.Server, queuedMsgs chan int32) {
234	for {
235		select {
236		case <-ctx.Done():
237			return
238		case m := <-queuedMsgs:
239			srv.Publish(fullyQualifiedTopicName, []byte(fmt.Sprintf("message %d", m)), nil)
240		}
241	}
242}
243
244func toSet(arr []int32) []int32 {
245	var s []int32
246	m := map[int32]bool{}
247
248	for _, v := range arr {
249		_, ok := m[v]
250		if !ok {
251			s = append(s, v)
252			m[v] = true
253		}
254	}
255
256	return s
257
258}
259
260func initConn(ctx context.Context, addr string) (*Subscription, *Client, error) {
261	conn, err := grpc.Dial(addr, grpc.WithInsecure())
262	if err != nil {
263		return nil, nil, err
264	}
265	client, err := NewClient(ctx, projName, option.WithGRPCConn(conn))
266	if err != nil {
267		return nil, nil, err
268	}
269
270	topic := client.Topic(topicName)
271	s, err := client.CreateSubscription(ctx, fmt.Sprintf("sub-%d", time.Now().UnixNano()), SubscriptionConfig{Topic: topic})
272	if err != nil {
273		return nil, nil, err
274	}
275
276	exists, err := s.Exists(ctx)
277	if !exists {
278		return nil, nil, errors.New("Subscription does not exist")
279	}
280	if err != nil {
281		return nil, nil, err
282	}
283
284	return s, client, nil
285}
286
287// modackDeadlines takes a map of time => Modack, gathers all the Modack.AckDeadlines,
288// and returns them as a slice
289func modackDeadlines(m map[time.Time][]pstest.Modack) []int32 {
290	var u []int32
291	for _, vv := range m {
292		for _, v := range vv {
293			u = append(u, v.AckDeadline)
294		}
295	}
296	return u
297}
298