1/* 2 * 3 * Copyright 2019 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19// Package fakeclient provides a fake implementation of an xDS client. 20package fakeclient 21 22import ( 23 "sync" 24 25 "google.golang.org/grpc/xds/internal/balancer/lrs" 26 xdsclient "google.golang.org/grpc/xds/internal/client" 27 "google.golang.org/grpc/xds/internal/testutils" 28) 29 30// Client is a fake implementation of an xds client. It exposes a bunch of 31// channels to signal the occurrence of various events. 32type Client struct { 33 name string 34 suWatchCh *testutils.Channel 35 cdsWatchCh *testutils.Channel 36 edsWatchCh *testutils.Channel 37 suCancelCh *testutils.Channel 38 cdsCancelCh *testutils.Channel 39 edsCancelCh *testutils.Channel 40 loadReportCh *testutils.Channel 41 closeCh *testutils.Channel 42 43 mu sync.Mutex 44 serviceCb func(xdsclient.ServiceUpdate, error) 45 cdsCb func(xdsclient.CDSUpdate, error) 46 edsCb func(*xdsclient.EDSUpdate, error) 47} 48 49// WatchService registers a LDS/RDS watch. 50func (xdsC *Client) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() { 51 xdsC.mu.Lock() 52 defer xdsC.mu.Unlock() 53 54 xdsC.serviceCb = callback 55 xdsC.suWatchCh.Send(target) 56 return func() { 57 xdsC.suCancelCh.Send(nil) 58 } 59} 60 61// WaitForWatchService waits for WatchService to be invoked on this client 62// within a reasonable timeout, and returns the serviceName being watched. 63func (xdsC *Client) WaitForWatchService() (string, error) { 64 val, err := xdsC.suWatchCh.Receive() 65 if err != nil { 66 return "", err 67 } 68 return val.(string), err 69} 70 71// InvokeWatchServiceCallback invokes the registered service watch callback. 72func (xdsC *Client) InvokeWatchServiceCallback(cluster string, err error) { 73 xdsC.mu.Lock() 74 defer xdsC.mu.Unlock() 75 76 xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err) 77} 78 79// WatchCluster registers a CDS watch. 80func (xdsC *Client) WatchCluster(clusterName string, callback func(xdsclient.CDSUpdate, error)) func() { 81 xdsC.mu.Lock() 82 defer xdsC.mu.Unlock() 83 84 xdsC.cdsCb = callback 85 xdsC.cdsWatchCh.Send(clusterName) 86 return func() { 87 xdsC.cdsCancelCh.Send(nil) 88 } 89} 90 91// WaitForWatchCluster waits for WatchCluster to be invoked on this client 92// within a reasonable timeout, and returns the clusterName being watched. 93func (xdsC *Client) WaitForWatchCluster() (string, error) { 94 val, err := xdsC.cdsWatchCh.Receive() 95 if err != nil { 96 return "", err 97 } 98 return val.(string), err 99} 100 101// InvokeWatchClusterCallback invokes the registered cdsWatch callback. 102func (xdsC *Client) InvokeWatchClusterCallback(update xdsclient.CDSUpdate, err error) { 103 xdsC.mu.Lock() 104 defer xdsC.mu.Unlock() 105 106 xdsC.cdsCb(update, err) 107} 108 109// WaitForCancelClusterWatch waits for a CDS watch to be cancelled within a 110// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise. 111func (xdsC *Client) WaitForCancelClusterWatch() error { 112 _, err := xdsC.cdsCancelCh.Receive() 113 return err 114} 115 116// WatchEDS registers an EDS watch for provided clusterName. 117func (xdsC *Client) WatchEDS(clusterName string, callback func(*xdsclient.EDSUpdate, error)) (cancel func()) { 118 xdsC.mu.Lock() 119 defer xdsC.mu.Unlock() 120 121 xdsC.edsCb = callback 122 xdsC.edsWatchCh.Send(clusterName) 123 return func() { 124 xdsC.edsCancelCh.Send(nil) 125 } 126} 127 128// WaitForWatchEDS waits for WatchEDS to be invoked on this client within a 129// reasonable timeout, and returns the clusterName being watched. 130func (xdsC *Client) WaitForWatchEDS() (string, error) { 131 val, err := xdsC.edsWatchCh.Receive() 132 if err != nil { 133 return "", err 134 } 135 return val.(string), err 136} 137 138// InvokeWatchEDSCallback invokes the registered edsWatch callback. 139func (xdsC *Client) InvokeWatchEDSCallback(update *xdsclient.EDSUpdate, err error) { 140 xdsC.mu.Lock() 141 defer xdsC.mu.Unlock() 142 143 xdsC.edsCb(update, err) 144} 145 146// ReportLoadArgs wraps the arguments passed to ReportLoad. 147type ReportLoadArgs struct { 148 // Server is the name of the server to which the load is reported. 149 Server string 150 // Cluster is the name of the cluster for which load is reported. 151 Cluster string 152} 153 154// ReportLoad starts reporting load about clusterName to server. 155func (xdsC *Client) ReportLoad(server string, clusterName string, loadStore lrs.Store) (cancel func()) { 156 xdsC.loadReportCh.Send(ReportLoadArgs{Server: server, Cluster: clusterName}) 157 return func() {} 158} 159 160// WaitForReportLoad waits for ReportLoad to be invoked on this client within a 161// reasonable timeout, and returns the arguments passed to it. 162func (xdsC *Client) WaitForReportLoad() (ReportLoadArgs, error) { 163 val, err := xdsC.loadReportCh.Receive() 164 return val.(ReportLoadArgs), err 165} 166 167// Close closes the xds client. 168func (xdsC *Client) Close() { 169 xdsC.closeCh.Send(nil) 170} 171 172// WaitForClose waits for Close to be invoked on this client within a 173// reasonable timeout, and returns testutils.ErrRecvTimeout otherwise. 174func (xdsC *Client) WaitForClose() error { 175 _, err := xdsC.closeCh.Receive() 176 return err 177} 178 179// Name returns the name of the xds client. 180func (xdsC *Client) Name() string { 181 return xdsC.name 182} 183 184// NewClient returns a new fake xds client. 185func NewClient() *Client { 186 return NewClientWithName("") 187} 188 189// NewClientWithName returns a new fake xds client with the provided name. This 190// is used in cases where multiple clients are created in the tests and we need 191// to make sure the client is created for the expected balancer name. 192func NewClientWithName(name string) *Client { 193 return &Client{ 194 name: name, 195 suWatchCh: testutils.NewChannel(), 196 cdsWatchCh: testutils.NewChannel(), 197 edsWatchCh: testutils.NewChannel(), 198 suCancelCh: testutils.NewChannel(), 199 cdsCancelCh: testutils.NewChannel(), 200 edsCancelCh: testutils.NewChannel(), 201 loadReportCh: testutils.NewChannel(), 202 closeCh: testutils.NewChannel(), 203 } 204} 205