1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package fakeclient provides a fake implementation of an xDS client.
20package fakeclient
21
22import (
23	"sync"
24
25	"google.golang.org/grpc/xds/internal/balancer/lrs"
26	xdsclient "google.golang.org/grpc/xds/internal/client"
27	"google.golang.org/grpc/xds/internal/testutils"
28)
29
30// Client is a fake implementation of an xds client. It exposes a bunch of
31// channels to signal the occurrence of various events.
32type Client struct {
33	name         string
34	suWatchCh    *testutils.Channel
35	cdsWatchCh   *testutils.Channel
36	edsWatchCh   *testutils.Channel
37	suCancelCh   *testutils.Channel
38	cdsCancelCh  *testutils.Channel
39	edsCancelCh  *testutils.Channel
40	loadReportCh *testutils.Channel
41	closeCh      *testutils.Channel
42
43	mu        sync.Mutex
44	serviceCb func(xdsclient.ServiceUpdate, error)
45	cdsCb     func(xdsclient.CDSUpdate, error)
46	edsCb     func(*xdsclient.EDSUpdate, error)
47}
48
49// WatchService registers a LDS/RDS watch.
50func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() {
51	xdsC.mu.Lock()
52	defer xdsC.mu.Unlock()
53
54	xdsC.serviceCb = callback
55	xdsC.suWatchCh.Send(target)
56	return func() {
57		xdsC.suCancelCh.Send(nil)
58	}
59}
60
61// WaitForWatchService waits for WatchService to be invoked on this client
62// within a reasonable timeout, and returns the serviceName being watched.
63func (xdsC *Client) WaitForWatchService() (string, error) {
64	val, err := xdsC.suWatchCh.Receive()
65	if err != nil {
66		return "", err
67	}
68	return val.(string), err
69}
70
71// InvokeWatchServiceCallback invokes the registered service watch callback.
72func (xdsC *Client) InvokeWatchServiceCallback(cluster string, err error) {
73	xdsC.mu.Lock()
74	defer xdsC.mu.Unlock()
75
76	xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err)
77}
78
79// WatchCluster registers a CDS watch.
80func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.CDSUpdate, error)) func() {
81	xdsC.mu.Lock()
82	defer xdsC.mu.Unlock()
83
84	xdsC.cdsCb = callback
85	xdsC.cdsWatchCh.Send(clusterName)
86	return func() {
87		xdsC.cdsCancelCh.Send(nil)
88	}
89}
90
91// WaitForWatchCluster waits for WatchCluster to be invoked on this client
92// within a reasonable timeout, and returns the clusterName being watched.
93func (xdsC *Client) WaitForWatchCluster() (string, error) {
94	val, err := xdsC.cdsWatchCh.Receive()
95	if err != nil {
96		return "", err
97	}
98	return val.(string), err
99}
100
101// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
102func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.CDSUpdate, err error) {
103	xdsC.mu.Lock()
104	defer xdsC.mu.Unlock()
105
106	xdsC.cdsCb(update, err)
107}
108
109// WaitForCancelClusterWatch waits for a CDS watch to be cancelled within a
110// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
111func (xdsC *Client) WaitForCancelClusterWatch() error {
112	_, err := xdsC.cdsCancelCh.Receive()
113	return err
114}
115
116// WatchEDS registers an EDS watch for provided clusterName.
117func (xdsC *Client) WatchEDS(clusterName string, callback func(*xdsclient.EDSUpdate, error)) (cancel func()) {
118	xdsC.mu.Lock()
119	defer xdsC.mu.Unlock()
120
121	xdsC.edsCb = callback
122	xdsC.edsWatchCh.Send(clusterName)
123	return func() {
124		xdsC.edsCancelCh.Send(nil)
125	}
126}
127
128// WaitForWatchEDS waits for WatchEDS to be invoked on this client within a
129// reasonable timeout, and returns the clusterName being watched.
130func (xdsC *Client) WaitForWatchEDS() (string, error) {
131	val, err := xdsC.edsWatchCh.Receive()
132	if err != nil {
133		return "", err
134	}
135	return val.(string), err
136}
137
138// InvokeWatchEDSCallback invokes the registered edsWatch callback.
139func (xdsC *Client) InvokeWatchEDSCallback(update *xdsclient.EDSUpdate, err error) {
140	xdsC.mu.Lock()
141	defer xdsC.mu.Unlock()
142
143	xdsC.edsCb(update, err)
144}
145
// ReportLoadArgs records the server and cluster arguments of a single
// ReportLoad call, so that tests can inspect what was requested.
type ReportLoadArgs struct {
	// Server is the server name given to ReportLoad, i.e. where the load
	// is to be reported.
	Server string

	// Cluster is the cluster name given to ReportLoad, i.e. whose load is
	// being reported.
	Cluster string
}
153
154// ReportLoad starts reporting load about clusterName to server.
155func (xdsC *Client) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
156	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName})
157	return func() {}
158}
159
160// WaitForReportLoad waits for ReportLoad to be invoked on this client within a
161// reasonable timeout, and returns the arguments passed to it.
162func (xdsC *Client) WaitForReportLoad() (ReportLoadArgs, error) {
163	val, err := xdsC.loadReportCh.Receive()
164	return val.(ReportLoadArgs), err
165}
166
167// Close closes the xds client.
168func (xdsC *Client) Close() {
169	xdsC.closeCh.Send(nil)
170}
171
172// WaitForClose waits for Close to be invoked on this client within a
173// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
174func (xdsC *Client) WaitForClose() error {
175	_, err := xdsC.closeCh.Receive()
176	return err
177}
178
179// Name returns the name of the xds client.
180func (xdsC *Client) Name() string {
181	return xdsC.name
182}
183
184// NewClient returns a new fake xds client.
185func NewClient() *Client {
186	return NewClientWithName("")
187}
188
189// NewClientWithName returns a new fake xds client with the provided name. This
190// is used in cases where multiple clients are created in the tests and we need
191// to make sure the client is created for the expected balancer name.
192func NewClientWithName(name string) *Client {
193	return &Client{
194		name:         name,
195		suWatchCh:    testutils.NewChannel(),
196		cdsWatchCh:   testutils.NewChannel(),
197		edsWatchCh:   testutils.NewChannel(),
198		suCancelCh:   testutils.NewChannel(),
199		cdsCancelCh:  testutils.NewChannel(),
200		edsCancelCh:  testutils.NewChannel(),
201		loadReportCh: testutils.NewChannel(),
202		closeCh:      testutils.NewChannel(),
203	}
204}
205