1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package fakeclient provides a fake implementation of an xDS client.
20package fakeclient
21
22import (
23	"context"
24
25	"google.golang.org/grpc/internal/testutils"
26	xdsclient "google.golang.org/grpc/xds/internal/client"
27	"google.golang.org/grpc/xds/internal/client/bootstrap"
28	"google.golang.org/grpc/xds/internal/client/load"
29)
30
31// Client is a fake implementation of an xds client. It exposes a bunch of
32// channels to signal the occurrence of various events.
33type Client struct {
34	name         string
35	ldsWatchCh   *testutils.Channel
36	rdsWatchCh   *testutils.Channel
37	cdsWatchCh   *testutils.Channel
38	edsWatchCh   *testutils.Channel
39	ldsCancelCh  *testutils.Channel
40	rdsCancelCh  *testutils.Channel
41	cdsCancelCh  *testutils.Channel
42	edsCancelCh  *testutils.Channel
43	loadReportCh *testutils.Channel
44	closeCh      *testutils.Channel
45	loadStore    *load.Store
46	bootstrapCfg *bootstrap.Config
47
48	ldsCb func(xdsclient.ListenerUpdate, error)
49	rdsCb func(xdsclient.RouteConfigUpdate, error)
50	cdsCb func(xdsclient.ClusterUpdate, error)
51	edsCb func(xdsclient.EndpointsUpdate, error)
52}
53
54// WatchListener registers a LDS watch.
55func (xdsC *Client) WatchListener(serviceName string, callback func(xdsclient.ListenerUpdate, error)) func() {
56	xdsC.ldsCb = callback
57	xdsC.ldsWatchCh.Send(serviceName)
58	return func() {
59		xdsC.ldsCancelCh.Send(nil)
60	}
61}
62
63// WaitForWatchListener waits for WatchCluster to be invoked on this client and
64// returns the serviceName being watched.
65func (xdsC *Client) WaitForWatchListener(ctx context.Context) (string, error) {
66	val, err := xdsC.ldsWatchCh.Receive(ctx)
67	if err != nil {
68		return "", err
69	}
70	return val.(string), err
71}
72
73// InvokeWatchListenerCallback invokes the registered ldsWatch callback.
74//
75// Not thread safe with WatchListener. Only call this after
76// WaitForWatchListener.
77func (xdsC *Client) InvokeWatchListenerCallback(update xdsclient.ListenerUpdate, err error) {
78	xdsC.ldsCb(update, err)
79}
80
81// WaitForCancelListenerWatch waits for a LDS watch to be cancelled  and returns
82// context.DeadlineExceeded otherwise.
83func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
84	_, err := xdsC.ldsCancelCh.Receive(ctx)
85	return err
86}
87
88// WatchRouteConfig registers a RDS watch.
89func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsclient.RouteConfigUpdate, error)) func() {
90	xdsC.rdsCb = callback
91	xdsC.rdsWatchCh.Send(routeName)
92	return func() {
93		xdsC.rdsCancelCh.Send(nil)
94	}
95}
96
97// WaitForWatchRouteConfig waits for WatchCluster to be invoked on this client and
98// returns the routeName being watched.
99func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error) {
100	val, err := xdsC.rdsWatchCh.Receive(ctx)
101	if err != nil {
102		return "", err
103	}
104	return val.(string), err
105}
106
107// InvokeWatchRouteConfigCallback invokes the registered rdsWatch callback.
108//
109// Not thread safe with WatchRouteConfig. Only call this after
110// WaitForWatchRouteConfig.
111func (xdsC *Client) InvokeWatchRouteConfigCallback(update xdsclient.RouteConfigUpdate, err error) {
112	xdsC.rdsCb(update, err)
113}
114
115// WaitForCancelRouteConfigWatch waits for a RDS watch to be cancelled  and returns
116// context.DeadlineExceeded otherwise.
117func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) error {
118	_, err := xdsC.rdsCancelCh.Receive(ctx)
119	return err
120}
121
122// WatchCluster registers a CDS watch.
123func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.ClusterUpdate, error)) func() {
124	xdsC.cdsCb = callback
125	xdsC.cdsWatchCh.Send(clusterName)
126	return func() {
127		xdsC.cdsCancelCh.Send(nil)
128	}
129}
130
131// WaitForWatchCluster waits for WatchCluster to be invoked on this client and
132// returns the clusterName being watched.
133func (xdsC *Client) WaitForWatchCluster(ctx context.Context) (string, error) {
134	val, err := xdsC.cdsWatchCh.Receive(ctx)
135	if err != nil {
136		return "", err
137	}
138	return val.(string), err
139}
140
141// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
142//
143// Not thread safe with WatchCluster. Only call this after
144// WaitForWatchCluster.
145func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.ClusterUpdate, err error) {
146	xdsC.cdsCb(update, err)
147}
148
149// WaitForCancelClusterWatch waits for a CDS watch to be cancelled  and returns
150// context.DeadlineExceeded otherwise.
151func (xdsC *Client) WaitForCancelClusterWatch(ctx context.Context) error {
152	_, err := xdsC.cdsCancelCh.Receive(ctx)
153	return err
154}
155
156// WatchEndpoints registers an EDS watch for provided clusterName.
157func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsclient.EndpointsUpdate, error)) (cancel func()) {
158	xdsC.edsCb = callback
159	xdsC.edsWatchCh.Send(clusterName)
160	return func() {
161		xdsC.edsCancelCh.Send(nil)
162	}
163}
164
165// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client and
166// returns the clusterName being watched.
167func (xdsC *Client) WaitForWatchEDS(ctx context.Context) (string, error) {
168	val, err := xdsC.edsWatchCh.Receive(ctx)
169	if err != nil {
170		return "", err
171	}
172	return val.(string), err
173}
174
175// InvokeWatchEDSCallback invokes the registered edsWatch callback.
176//
177// Not thread safe with WatchEndpoints. Only call this after
178// WaitForWatchEDS.
179func (xdsC *Client) InvokeWatchEDSCallback(update xdsclient.EndpointsUpdate, err error) {
180	xdsC.edsCb(update, err)
181}
182
183// WaitForCancelEDSWatch waits for a EDS watch to be cancelled and returns
184// context.DeadlineExceeded otherwise.
185func (xdsC *Client) WaitForCancelEDSWatch(ctx context.Context) error {
186	_, err := xdsC.edsCancelCh.Receive(ctx)
187	return err
188}
189
// ReportLoadArgs captures the arguments of a single ReportLoad call so that
// tests can inspect them.
type ReportLoadArgs struct {
	// Server is the server name that was passed to ReportLoad.
	Server string
}
195
196// ReportLoad starts reporting load about clusterName to server.
197func (xdsC *Client) ReportLoad(server string) (loadStore *load.Store, cancel func()) {
198	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server})
199	return xdsC.loadStore, func() {}
200}
201
202// LoadStore returns the underlying load data store.
203func (xdsC *Client) LoadStore() *load.Store {
204	return xdsC.loadStore
205}
206
207// WaitForReportLoad waits for ReportLoad to be invoked on this client and
208// returns the arguments passed to it.
209func (xdsC *Client) WaitForReportLoad(ctx context.Context) (ReportLoadArgs, error) {
210	val, err := xdsC.loadReportCh.Receive(ctx)
211	return val.(ReportLoadArgs), err
212}
213
214// Close closes the xds client.
215func (xdsC *Client) Close() {
216	xdsC.closeCh.Send(nil)
217}
218
219// WaitForClose waits for Close to be invoked on this client and returns
220// context.DeadlineExceeded otherwise.
221func (xdsC *Client) WaitForClose(ctx context.Context) error {
222	_, err := xdsC.closeCh.Receive(ctx)
223	return err
224}
225
226// BootstrapConfig returns the bootstrap config.
227func (xdsC *Client) BootstrapConfig() *bootstrap.Config {
228	return xdsC.bootstrapCfg
229}
230
231// SetBootstrapConfig updates the bootstrap config.
232func (xdsC *Client) SetBootstrapConfig(cfg *bootstrap.Config) {
233	xdsC.bootstrapCfg = cfg
234}
235
236// Name returns the name of the xds client.
237func (xdsC *Client) Name() string {
238	return xdsC.name
239}
240
241// NewClient returns a new fake xds client.
242func NewClient() *Client {
243	return NewClientWithName("")
244}
245
246// NewClientWithName returns a new fake xds client with the provided name. This
247// is used in cases where multiple clients are created in the tests and we need
248// to make sure the client is created for the expected balancer name.
249func NewClientWithName(name string) *Client {
250	return &Client{
251		name:         name,
252		ldsWatchCh:   testutils.NewChannel(),
253		rdsWatchCh:   testutils.NewChannel(),
254		cdsWatchCh:   testutils.NewChannel(),
255		edsWatchCh:   testutils.NewChannel(),
256		ldsCancelCh:  testutils.NewChannel(),
257		rdsCancelCh:  testutils.NewChannel(),
258		cdsCancelCh:  testutils.NewChannel(),
259		edsCancelCh:  testutils.NewChannel(),
260		loadReportCh: testutils.NewChannel(),
261		closeCh:      testutils.NewChannel(),
262		loadStore:    load.NewStore(),
263	}
264}
265