// +build go1.12

/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package client

import (
	"context"
	"testing"

	"github.com/google/go-cmp/cmp"

	"google.golang.org/grpc/internal/testutils"
)

type clusterUpdateErr struct {
	u   ClusterUpdate
	err error
}
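
// The tests below use a verifyClusterUpdate helper that is not shown here. As
// a rough illustration of the check it is expected to perform, the following
// is a minimal sketch: the name, signature, and failure mode are hypothetical
// (the helper as invoked by the tests returns an error rather than failing
// the test directly), and this is not the package's actual implementation.
func verifyClusterUpdateSketch(ctx context.Context, t *testing.T, updateCh *testutils.Channel, wantUpdate ClusterUpdate) {
	t.Helper()
	// Wait for the watch callback to push an update onto the channel.
	u, err := updateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for cluster update: %v", err)
	}
	// The callback is expected to report the wanted update with a nil error.
	got := u.(clusterUpdateErr)
	if got.err != nil || !cmp.Equal(got.u, wantUpdate) {
		t.Fatalf("unexpected clusterUpdate: (%v, %v), want (%v, nil)", got.u, got.err, wantUpdate)
	}
}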

// TestClusterWatch covers the cases:
// - an update is received after a watch()
// - an update for another resource name
// - an update is received after cancel()
func (s) TestClusterWatch(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, false))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

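	// Register a watch for the cluster resource and verify that a watch for
	// it is added on the underlying API client.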
	clusterUpdateCh := testutils.NewChannel()
	cancelWatch := client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

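	// Send a valid update for the watched resource and verify that the watch
	// callback is invoked with it.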
	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{})
	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
		t.Fatal(err)
	}

	// Another update, with an extra resource for a different resource name.
	client.NewClusters(map[string]ClusterUpdate{
		testCDSName:  wantUpdate,
		"randomName": {},
	}, UpdateMetadata{})
	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
		t.Fatal(err)
	}

	// Cancel watch, and send update again.
	cancelWatch()
	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{})
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
	}
}

// TestClusterTwoWatchSameResourceName covers the case where an update is
// received after two watch() calls for the same resource name.
func (s) TestClusterTwoWatchSameResourceName(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, false))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

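	// Register two watches for the same cluster resource name, keeping the
	// cancel function of the last watch.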
	var clusterUpdateChs []*testutils.Channel
	var cancelLastWatch func()
	const count = 2
	for i := 0; i < count; i++ {
		clusterUpdateCh := testutils.NewChannel()
		clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
		cancelLastWatch = client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
		})

		if i == 0 {
			// A new watch is registered on the underlying API client only for
			// the first iteration because we are using the same resource name.
			if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
				t.Fatalf("want new watch to start, got error %v", err)
			}
		}
	}

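	// Send an update for the resource and verify that both watchers see it.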
	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{})
	for i := 0; i < count; i++ {
		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
			t.Fatal(err)
		}
	}

	// Cancel the last watch, and send update again.
	cancelLastWatch()
	client.NewClusters(map[string]ClusterUpdate{testCDSName: wantUpdate}, UpdateMetadata{})
	for i := 0; i < count-1; i++ {
		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate); err != nil {
			t.Fatal(err)
		}
	}

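	// The watcher whose watch was cancelled should not receive the update.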
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if u, err := clusterUpdateChs[count-1].Receive(sCtx); err != context.DeadlineExceeded {
		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
	}
}

// TestClusterThreeWatchDifferentResourceName covers the case where an update
// is received after three watch() calls for different resource names.
func (s) TestClusterThreeWatchDifferentResourceName(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, false))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

	// Two watches for the same name.
	var clusterUpdateChs []*testutils.Channel
	const count = 2
	for i := 0; i < count; i++ {
		clusterUpdateCh := testutils.NewChannel()
		clusterUpdateChs = append(clusterUpdateChs, clusterUpdateCh)
		client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
			clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
		})

		if i == 0 {
			// A new watch is registered on the underlying API client only for
			// the first iteration because we are using the same resource name.
			if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
				t.Fatalf("want new watch to start, got error %v", err)
			}
		}
	}

	// Third watch for a different name.
	clusterUpdateCh2 := testutils.NewChannel()
	client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

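	// Send one update containing both resources; each watcher should only be
	// notified with the update for its own resource name.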
	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
	client.NewClusters(map[string]ClusterUpdate{
		testCDSName + "1": wantUpdate1,
		testCDSName + "2": wantUpdate2,
	}, UpdateMetadata{})

	for i := 0; i < count; i++ {
		if err := verifyClusterUpdate(ctx, clusterUpdateChs[i], wantUpdate1); err != nil {
			t.Fatal(err)
		}
	}
	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
		t.Fatal(err)
	}
}

// TestClusterWatchAfterCache covers the case where a watch is registered after
// the update is already in the cache.
func (s) TestClusterWatchAfterCache(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, false))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

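	// Register the first watch for the resource, which is not yet cached.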
	clusterUpdateCh := testutils.NewChannel()
	client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
		clusterUpdateCh.Send(clusterUpdateErr{u: update, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

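	// Send an update for the resource, so that it is added to the cache, and
	// verify that the first watcher sees it.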
	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
	client.NewClusters(map[string]ClusterUpdate{
		testCDSName: wantUpdate,
	}, UpdateMetadata{})
	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
		t.Fatal(err)
	}

	// Another watch for the resource in cache.
	clusterUpdateCh2 := testutils.NewChannel()
	client.WatchCluster(testCDSName, func(update ClusterUpdate, err error) {
		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
	})
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if n, err := apiClient.addWatches[ClusterResource].Receive(sCtx); err != context.DeadlineExceeded {
		t.Fatalf("want no new watch to start (recv timeout), got resource name: %v error %v", n, err)
	}

	// The new watch should receive the cached update.
	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate); err != nil {
		t.Fatal(err)
	}

	// Old watch should see nothing.
	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
	}
}

// TestClusterWatchExpiryTimer tests the case where the client does not receive
// a CDS response for the request that it sends out. We want the watch callback
// to be invoked with an error once the watchExpiryTimer fires.
func (s) TestClusterWatchExpiryTimer(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, true))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

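	// Register a watch for the resource, but never send an update for it.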
	clusterUpdateCh := testutils.NewChannel()
	client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

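	// The watch callback should be invoked with a non-nil error and an empty
	// update once the watch expiry timer fires.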
	u, err := clusterUpdateCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for cluster update: %v", err)
	}
	gotUpdate := u.(clusterUpdateErr)
	if gotUpdate.err == nil || !cmp.Equal(gotUpdate.u, ClusterUpdate{}) {
		t.Fatalf("unexpected clusterUpdate: (%v, %v), want: (ClusterUpdate{}, non-nil error)", gotUpdate.u, gotUpdate.err)
	}
}

// TestClusterWatchExpiryTimerStop tests the case where the client does receive
// a CDS response for the request that it sends out. We want no error even
// after the expiry timeout.
func (s) TestClusterWatchExpiryTimerStop(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, true))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

	clusterUpdateCh := testutils.NewChannel()
	client.WatchCluster(testCDSName, func(u ClusterUpdate, err error) {
		clusterUpdateCh.Send(clusterUpdateErr{u: u, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

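	// Send a valid update before the expiry timer fires; this should stop the
	// timer for this watch.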
	wantUpdate := ClusterUpdate{ServiceName: testEDSName}
	client.NewClusters(map[string]ClusterUpdate{
		testCDSName: wantUpdate,
	}, UpdateMetadata{})
	if err := verifyClusterUpdate(ctx, clusterUpdateCh, wantUpdate); err != nil {
		t.Fatal(err)
	}

	// Wait out the expiry timeout; the watch callback should not be invoked
	// with an error.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestWatchExpiryTimeout)
	defer sCancel()
	if u, err := clusterUpdateCh.Receive(sCtx); err != context.DeadlineExceeded {
		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
	}
}

// TestClusterResourceRemoved covers the cases:
// - an update is received after a watch()
// - another update is received, with one resource removed
//   - this should trigger callback with resource removed error
// - one more update without the removed resource
//   - the callback (above) shouldn't receive any update
func (s) TestClusterResourceRemoved(t *testing.T) {
	apiClientCh, cleanup := overrideNewAPIClient()
	defer cleanup()

	client, err := newWithConfig(clientOpts(testXDSServer, false))
	if err != nil {
		t.Fatalf("failed to create client: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	c, err := apiClientCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout when waiting for API client to be created: %v", err)
	}
	apiClient := c.(*testAPIClient)

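	// Watch for the first resource name.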
	clusterUpdateCh1 := testutils.NewChannel()
	client.WatchCluster(testCDSName+"1", func(update ClusterUpdate, err error) {
		clusterUpdateCh1.Send(clusterUpdateErr{u: update, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

	// Another watch for a different name.
	clusterUpdateCh2 := testutils.NewChannel()
	client.WatchCluster(testCDSName+"2", func(update ClusterUpdate, err error) {
		clusterUpdateCh2.Send(clusterUpdateErr{u: update, err: err})
	})
	if _, err := apiClient.addWatches[ClusterResource].Receive(ctx); err != nil {
		t.Fatalf("want new watch to start, got error %v", err)
	}

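	// Send an update for both resources and verify that each watcher receives
	// the update for its own resource.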
	wantUpdate1 := ClusterUpdate{ServiceName: testEDSName + "1"}
	wantUpdate2 := ClusterUpdate{ServiceName: testEDSName + "2"}
	client.NewClusters(map[string]ClusterUpdate{
		testCDSName + "1": wantUpdate1,
		testCDSName + "2": wantUpdate2,
	}, UpdateMetadata{})
	if err := verifyClusterUpdate(ctx, clusterUpdateCh1, wantUpdate1); err != nil {
		t.Fatal(err)
	}
	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
		t.Fatal(err)
	}

	// Send another update to remove resource 1.
	client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2}, UpdateMetadata{})

	// Watcher 1 should get an error.
	if u, err := clusterUpdateCh1.Receive(ctx); err != nil || ErrType(u.(clusterUpdateErr).err) != ErrorTypeResourceNotFound {
		t.Errorf("unexpected clusterUpdate: %v, error receiving from channel: %v, want update with error resource not found", u, err)
	}

	// Watcher 2 should get the same update again.
	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
		t.Fatal(err)
	}

	// Send one more update without resource 1.
	client.NewClusters(map[string]ClusterUpdate{testCDSName + "2": wantUpdate2}, UpdateMetadata{})

	// Watcher 1 should not see an update.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	if u, err := clusterUpdateCh1.Receive(sCtx); err != context.DeadlineExceeded {
		t.Errorf("unexpected clusterUpdate: %v, %v, want channel recv timeout", u, err)
	}

	// Watcher 2 should get the same update again.
	if err := verifyClusterUpdate(ctx, clusterUpdateCh2, wantUpdate2); err != nil {
		t.Fatal(err)
	}
}