1// Copyright 2020 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package mock 16 17import ( 18 "context" 19 "errors" 20 "fmt" 21 "net" 22 "strconv" 23 "strings" 24 "sync" 25 "testing" 26 "time" 27 28 "istio.io/pkg/log" 29 30 api "github.com/envoyproxy/go-control-plane/envoy/api/v2" 31 core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 32 listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" 33 route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" 34 hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" 35 discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 36 "github.com/envoyproxy/go-control-plane/pkg/cache/types" 37 "github.com/envoyproxy/go-control-plane/pkg/cache/v2" 38 "github.com/envoyproxy/go-control-plane/pkg/conversion" 39 xds "github.com/envoyproxy/go-control-plane/pkg/server/v2" 40 "github.com/envoyproxy/go-control-plane/pkg/wellknown" 41 "google.golang.org/grpc" 42 "google.golang.org/grpc/credentials" 43 "google.golang.org/grpc/metadata" 44) 45 46var xdsServerLog = log.RegisterScope("xdsServer", "XDS service debugging", 0) 47 48const ( 49 // credentialTokenHeaderKey is the header key in gPRC header which is used to 50 // pass credential token from envoy's SDS request to SDS service. 51 credentialTokenHeaderKey = "authorization" 52) 53 54type DynamicListener struct { 55 Port int 56} 57 58func (l *DynamicListener) makeListener() *api.Listener { 59 manager := &hcm.HttpConnectionManager{ 60 CodecType: hcm.HttpConnectionManager_AUTO, 61 StatPrefix: "http", 62 RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{ 63 RouteConfig: &api.RouteConfiguration{ 64 Name: "testListener", 65 VirtualHosts: []*route.VirtualHost{{ 66 Name: "backend", 67 Domains: []string{"*"}, 68 Routes: []*route.Route{{ 69 Match: &route.RouteMatch{PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"}}, 70 Action: &route.Route_Route{Route: &route.RouteAction{ 71 ClusterSpecifier: &route.RouteAction_Cluster{Cluster: "backend"}, 72 }}, 73 }}}}}}, 74 HttpFilters: []*hcm.HttpFilter{{ 75 Name: wellknown.Router, 76 }}, 77 } 78 79 hTTPConnectionManager, err := conversion.MessageToStruct(manager) 80 if err != nil { 81 panic(err) 82 } 83 84 return &api.Listener{ 85 Name: strconv.Itoa(l.Port), 86 Address: &core.Address{Address: &core.Address_SocketAddress{SocketAddress: &core.SocketAddress{ 87 Address: "127.0.0.1", 88 PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(l.Port)}}}}, 89 FilterChains: []*listener.FilterChain{{ 90 Filters: []*listener.Filter{{ 91 Name: wellknown.HTTPConnectionManager, 92 ConfigType: &listener.Filter_Config{Config: hTTPConnectionManager}, 93 }}, 94 }}, 95 } 96} 97 98type hasher struct{} 99 100func (hasher) ID(*core.Node) string { 101 return "" 102} 103 104// XDSConf has config for XDS server 105type XDSConf struct { 106 Port int 107 CertFile string 108 KeyFile string 109} 110 111// StartXDSServer sets up a mock XDS server 112// nolint: interfacer 113func StartXDSServer(conf XDSConf, cb *XDSCallbacks, ls *DynamicListener, isTLS bool) (*grpc.Server, error) { 114 snapshotCache := cache.NewSnapshotCache(false, hasher{}, nil) 115 server := xds.NewServer(context.Background(), snapshotCache, cb) 116 var gRPCServer *grpc.Server 117 if isTLS { 118 tlsCred, err := credentials.NewServerTLSFromFile(conf.CertFile, conf.KeyFile) 119 if err != nil { 120 xdsServerLog.Errorf("Failed to setup TLS: %v", err) 121 return nil, err 122 } 123 gRPCServer = grpc.NewServer(grpc.Creds(tlsCred)) 124 } else { 125 gRPCServer = grpc.NewServer() 126 } 127 lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", conf.Port)) 128 if err != nil { 129 xdsServerLog.Errorf("xDS server failed to listen on %s: %v", fmt.Sprintf(":%d", conf.Port), err) 130 return nil, err 131 } 132 xdsServerLog.Infof("%s xDS server listens on %s", time.Now().String(), lis.Addr().String()) 133 discovery.RegisterAggregatedDiscoveryServiceServer(gRPCServer, server) 134 snapshot := cache.Snapshot{} 135 snapshot.Resources[types.Listener] = cache.Resources{Version: time.Now().String(), Items: map[string]types.Resource{ 136 "backend": ls.makeListener()}} 137 _ = snapshotCache.SetSnapshot("", snapshot) 138 go func() { 139 _ = gRPCServer.Serve(lis) 140 }() 141 return gRPCServer, nil 142} 143 144type XDSCallbacks struct { 145 numStream int 146 numReq int 147 numTokenReceived int 148 149 callbackError bool 150 lastReceivedToken string 151 mutex sync.RWMutex 152 expectedToken string 153 t *testing.T 154 155 // These members close a stream for numStreamClose times, each time the stream 156 // lasts for streamDuration seconds. The numStreamClose + 1 stream is kept open. 157 numStreamClose int 158 streamDuration time.Duration 159} 160 161func CreateXdsCallback(t *testing.T) *XDSCallbacks { 162 return &XDSCallbacks{t: t} 163} 164 165func (c *XDSCallbacks) SetCallbackError(setErr bool) { 166 c.mutex.Lock() 167 defer c.mutex.Unlock() 168 c.callbackError = setErr 169} 170 171func (c *XDSCallbacks) SetExpectedToken(expected string) { 172 c.mutex.Lock() 173 defer c.mutex.Unlock() 174 c.expectedToken = expected 175} 176 177// SetNumberOfStreamClose force XDS server to close gRPC stream n times. Each 178// stream will last d seconds before close. 179func (c *XDSCallbacks) SetNumberOfStreamClose(n int, d int) { 180 c.mutex.Lock() 181 defer c.mutex.Unlock() 182 c.numStreamClose = n 183 c.streamDuration = time.Duration(d) * time.Second 184} 185 186func (c *XDSCallbacks) ExpectedToken() string { 187 c.mutex.Lock() 188 defer c.mutex.Unlock() 189 return c.expectedToken 190} 191 192func (c *XDSCallbacks) NumStream() int { 193 c.mutex.Lock() 194 defer c.mutex.Unlock() 195 return c.numStream 196} 197 198func (c *XDSCallbacks) NumTokenReceived() int { 199 c.mutex.Lock() 200 defer c.mutex.Unlock() 201 return c.numTokenReceived 202} 203 204func (c *XDSCallbacks) OnStreamOpen(ctx context.Context, id int64, url string) error { 205 xdsServerLog.Infof("xDS stream (id: %d, url: %s) is open", id, url) 206 207 c.mutex.Lock() 208 defer c.mutex.Unlock() 209 c.numStream++ 210 if metadata, ok := metadata.FromIncomingContext(ctx); ok { 211 if h, ok := metadata[credentialTokenHeaderKey]; ok { 212 if len(h) != 1 { 213 c.t.Errorf("xDS stream (id: %d, url: %s) sends multiple tokens (%d)", id, url, len(h)) 214 } 215 if h[0] != c.lastReceivedToken { 216 c.numTokenReceived++ 217 c.lastReceivedToken = h[0] 218 } 219 if c.expectedToken != "" && strings.TrimPrefix(h[0], "Bearer ") != c.expectedToken { 220 c.t.Errorf("xDS stream (id: %d, url: %s) sent a token that does "+ 221 "not match expected token (%s vs %s)", id, url, h[0], c.expectedToken) 222 } else { 223 xdsServerLog.Infof("xDS stream (id: %d, url: %s) has valid token: %v", id, url, h[0]) 224 } 225 } else { 226 c.t.Errorf("XDS stream (id: %d, url: %s) does not have token in metadata %+v", 227 id, url, metadata) 228 } 229 } else { 230 c.t.Errorf("failed to get metadata from XDS stream (id: %d, url: %s)", id, url) 231 } 232 233 if c.callbackError { 234 return errors.New("fake stream error") 235 } 236 return nil 237} 238func (c *XDSCallbacks) OnStreamClosed(id int64) { 239 xdsServerLog.Infof("xDS stream (id: %d) is closed", id) 240} 241func (c *XDSCallbacks) OnStreamRequest(id int64, _ *api.DiscoveryRequest) error { 242 xdsServerLog.Infof("receive xDS request (id: %d)", id) 243 244 c.mutex.Lock() 245 defer c.mutex.Unlock() 246 c.numReq++ 247 248 // Send out the first response to finish Envoy initialization, and close stream 249 // in followup requests. 250 if c.numReq > 1 && c.numStream <= c.numStreamClose { 251 time.Sleep(c.streamDuration) 252 xdsServerLog.Infof("force close %d/%d xDS stream (id: %d)", c.numStream, c.numStreamClose, id) 253 return fmt.Errorf("force to close the stream (id: %d)", id) 254 } 255 return nil 256} 257func (c *XDSCallbacks) OnStreamResponse(id int64, _ *api.DiscoveryRequest, _ *api.DiscoveryResponse) { 258 xdsServerLog.Infof("on stream %d response", id) 259} 260func (c *XDSCallbacks) OnFetchRequest(context.Context, *api.DiscoveryRequest) error { 261 xdsServerLog.Infof("on fetch request") 262 return nil 263} 264func (c *XDSCallbacks) OnFetchResponse(*api.DiscoveryRequest, *api.DiscoveryResponse) { 265 xdsServerLog.Infof("on fetch response") 266} 267