1/*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package fakeclient provides a fake implementation of an xDS client.
20package fakeclient
21
22import (
23	"sync"
24
25	"google.golang.org/grpc/xds/internal/balancer/lrs"
26	xdsclient "google.golang.org/grpc/xds/internal/client"
27	"google.golang.org/grpc/xds/internal/testutils"
28)
29
30// Client is a fake implementation of an xds client. It exposes a bunch of
31// channels to signal the occurrence of various events.
32type Client struct {
33	name         string
34	suWatchCh    *testutils.Channel
35	cdsWatchCh   *testutils.Channel
36	edsWatchCh   *testutils.Channel
37	suCancelCh   *testutils.Channel
38	cdsCancelCh  *testutils.Channel
39	edsCancelCh  *testutils.Channel
40	loadReportCh *testutils.Channel
41	closeCh      *testutils.Channel
42
43	mu        sync.Mutex
44	serviceCb func(xdsclient.ServiceUpdate, error)
45	cdsCb     func(xdsclient.ClusterUpdate, error)
46	edsCb     func(xdsclient.EndpointsUpdate, error)
47}
48
49// WatchService registers a LDS/RDS watch.
50func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() {
51	xdsC.mu.Lock()
52	defer xdsC.mu.Unlock()
53
54	xdsC.serviceCb = callback
55	xdsC.suWatchCh.Send(target)
56	return func() {
57		xdsC.suCancelCh.Send(nil)
58	}
59}
60
61// WaitForWatchService waits for WatchService to be invoked on this client
62// within a reasonable timeout, and returns the serviceName being watched.
63func (xdsC *Client) WaitForWatchService() (string, error) {
64	val, err := xdsC.suWatchCh.Receive()
65	if err != nil {
66		return "", err
67	}
68	return val.(string), err
69}
70
71// InvokeWatchServiceCallback invokes the registered service watch callback.
72func (xdsC *Client) InvokeWatchServiceCallback(u xdsclient.ServiceUpdate, err error) {
73	xdsC.mu.Lock()
74	defer xdsC.mu.Unlock()
75
76	xdsC.serviceCb(u, err)
77}
78
79// WatchCluster registers a CDS watch.
80func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.ClusterUpdate, error)) func() {
81	xdsC.mu.Lock()
82	defer xdsC.mu.Unlock()
83
84	xdsC.cdsCb = callback
85	xdsC.cdsWatchCh.Send(clusterName)
86	return func() {
87		xdsC.cdsCancelCh.Send(nil)
88	}
89}
90
91// WaitForWatchCluster waits for WatchCluster to be invoked on this client
92// within a reasonable timeout, and returns the clusterName being watched.
93func (xdsC *Client) WaitForWatchCluster() (string, error) {
94	val, err := xdsC.cdsWatchCh.Receive()
95	if err != nil {
96		return "", err
97	}
98	return val.(string), err
99}
100
101// InvokeWatchClusterCallback invokes the registered cdsWatch callback.
102func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.ClusterUpdate, err error) {
103	xdsC.mu.Lock()
104	defer xdsC.mu.Unlock()
105
106	xdsC.cdsCb(update, err)
107}
108
109// WaitForCancelClusterWatch waits for a CDS watch to be cancelled within a
110// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
111func (xdsC *Client) WaitForCancelClusterWatch() error {
112	_, err := xdsC.cdsCancelCh.Receive()
113	return err
114}
115
116// WatchEndpoints registers an EDS watch for provided clusterName.
117func (xdsC *Client) WatchEndpoints(clusterName string, callback func(xdsclient.EndpointsUpdate, error)) (cancel func()) {
118	xdsC.mu.Lock()
119	defer xdsC.mu.Unlock()
120
121	xdsC.edsCb = callback
122	xdsC.edsWatchCh.Send(clusterName)
123	return func() {
124		xdsC.edsCancelCh.Send(nil)
125	}
126}
127
128// WaitForWatchEDS waits for WatchEndpoints to be invoked on this client within a
129// reasonable timeout, and returns the clusterName being watched.
130func (xdsC *Client) WaitForWatchEDS() (string, error) {
131	val, err := xdsC.edsWatchCh.Receive()
132	if err != nil {
133		return "", err
134	}
135	return val.(string), err
136}
137
138// InvokeWatchEDSCallback invokes the registered edsWatch callback.
139func (xdsC *Client) InvokeWatchEDSCallback(update xdsclient.EndpointsUpdate, err error) {
140	xdsC.mu.Lock()
141	defer xdsC.mu.Unlock()
142
143	xdsC.edsCb(update, err)
144}
145
146// WaitForCancelEDSWatch waits for a EDS watch to be cancelled within a
147// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
148func (xdsC *Client) WaitForCancelEDSWatch() error {
149	_, err := xdsC.edsCancelCh.Receive()
150	return err
151}
152
153// ReportLoadArgs wraps the arguments passed to ReportLoad.
154type ReportLoadArgs struct {
155	// Server is the name of the server to which the load is reported.
156	Server string
157	// Cluster is the name of the cluster for which load is reported.
158	Cluster string
159}
160
161// ReportLoad starts reporting load about clusterName to server.
162func (xdsC *Client) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) {
163	xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName})
164	return func() {}
165}
166
167// WaitForReportLoad waits for ReportLoad to be invoked on this client within a
168// reasonable timeout, and returns the arguments passed to it.
169func (xdsC *Client) WaitForReportLoad() (ReportLoadArgs, error) {
170	val, err := xdsC.loadReportCh.Receive()
171	return val.(ReportLoadArgs), err
172}
173
174// Close closes the xds client.
175func (xdsC *Client) Close() {
176	xdsC.closeCh.Send(nil)
177}
178
179// WaitForClose waits for Close to be invoked on this client within a
180// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise.
181func (xdsC *Client) WaitForClose() error {
182	_, err := xdsC.closeCh.Receive()
183	return err
184}
185
186// Name returns the name of the xds client.
187func (xdsC *Client) Name() string {
188	return xdsC.name
189}
190
191// NewClient returns a new fake xds client.
192func NewClient() *Client {
193	return NewClientWithName("")
194}
195
196// NewClientWithName returns a new fake xds client with the provided name. This
197// is used in cases where multiple clients are created in the tests and we need
198// to make sure the client is created for the expected balancer name.
199func NewClientWithName(name string) *Client {
200	return &Client{
201		name:         name,
202		suWatchCh:    testutils.NewChannel(),
203		cdsWatchCh:   testutils.NewChannel(),
204		edsWatchCh:   testutils.NewChannel(),
205		suCancelCh:   testutils.NewChannel(),
206		cdsCancelCh:  testutils.NewChannel(),
207		edsCancelCh:  testutils.NewChannel(),
208		loadReportCh: testutils.NewChannel(),
209		closeCh:      testutils.NewChannel(),
210	}
211}
212