1// Copyright 2020 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mock
16
17import (
18	"context"
19	"errors"
20	"fmt"
21	"net"
22	"strconv"
23	"strings"
24	"sync"
25	"testing"
26	"time"
27
28	"istio.io/pkg/log"
29
30	api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
31	core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
32	listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
33	route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
34	hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
35	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
36	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
37	"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
38	"github.com/envoyproxy/go-control-plane/pkg/conversion"
39	xds "github.com/envoyproxy/go-control-plane/pkg/server/v2"
40	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
41	"google.golang.org/grpc"
42	"google.golang.org/grpc/credentials"
43	"google.golang.org/grpc/metadata"
44)
45
46var xdsServerLog = log.RegisterScope("xdsServer", "XDS service debugging", 0)
47
48const (
49	// credentialTokenHeaderKey is the header key in gPRC header which is used to
50	// pass credential token from envoy's SDS request to SDS service.
51	credentialTokenHeaderKey = "authorization"
52)
53
54type DynamicListener struct {
55	Port int
56}
57
58func (l *DynamicListener) makeListener() *api.Listener {
59	manager := &hcm.HttpConnectionManager{
60		CodecType:  hcm.HttpConnectionManager_AUTO,
61		StatPrefix: "http",
62		RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{
63			RouteConfig: &api.RouteConfiguration{
64				Name: "testListener",
65				VirtualHosts: []*route.VirtualHost{{
66					Name:    "backend",
67					Domains: []string{"*"},
68					Routes: []*route.Route{{
69						Match: &route.RouteMatch{PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/"}},
70						Action: &route.Route_Route{Route: &route.RouteAction{
71							ClusterSpecifier: &route.RouteAction_Cluster{Cluster: "backend"},
72						}},
73					}}}}}},
74		HttpFilters: []*hcm.HttpFilter{{
75			Name: wellknown.Router,
76		}},
77	}
78
79	hTTPConnectionManager, err := conversion.MessageToStruct(manager)
80	if err != nil {
81		panic(err)
82	}
83
84	return &api.Listener{
85		Name: strconv.Itoa(l.Port),
86		Address: &core.Address{Address: &core.Address_SocketAddress{SocketAddress: &core.SocketAddress{
87			Address:       "127.0.0.1",
88			PortSpecifier: &core.SocketAddress_PortValue{PortValue: uint32(l.Port)}}}},
89		FilterChains: []*listener.FilterChain{{
90			Filters: []*listener.Filter{{
91				Name:       wellknown.HTTPConnectionManager,
92				ConfigType: &listener.Filter_Config{Config: hTTPConnectionManager},
93			}},
94		}},
95	}
96}
97
98type hasher struct{}
99
100func (hasher) ID(*core.Node) string {
101	return ""
102}
103
104// XDSConf has config for XDS server
105type XDSConf struct {
106	Port     int
107	CertFile string
108	KeyFile  string
109}
110
111// StartXDSServer sets up a mock XDS server
112// nolint: interfacer
113func StartXDSServer(conf XDSConf, cb *XDSCallbacks, ls *DynamicListener, isTLS bool) (*grpc.Server, error) {
114	snapshotCache := cache.NewSnapshotCache(false, hasher{}, nil)
115	server := xds.NewServer(context.Background(), snapshotCache, cb)
116	var gRPCServer *grpc.Server
117	if isTLS {
118		tlsCred, err := credentials.NewServerTLSFromFile(conf.CertFile, conf.KeyFile)
119		if err != nil {
120			xdsServerLog.Errorf("Failed to setup TLS: %v", err)
121			return nil, err
122		}
123		gRPCServer = grpc.NewServer(grpc.Creds(tlsCred))
124	} else {
125		gRPCServer = grpc.NewServer()
126	}
127	lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", conf.Port))
128	if err != nil {
129		xdsServerLog.Errorf("xDS server failed to listen on %s: %v", fmt.Sprintf(":%d", conf.Port), err)
130		return nil, err
131	}
132	xdsServerLog.Infof("%s xDS server listens on %s", time.Now().String(), lis.Addr().String())
133	discovery.RegisterAggregatedDiscoveryServiceServer(gRPCServer, server)
134	snapshot := cache.Snapshot{}
135	snapshot.Resources[types.Listener] = cache.Resources{Version: time.Now().String(), Items: map[string]types.Resource{
136		"backend": ls.makeListener()}}
137	_ = snapshotCache.SetSnapshot("", snapshot)
138	go func() {
139		_ = gRPCServer.Serve(lis)
140	}()
141	return gRPCServer, nil
142}
143
144type XDSCallbacks struct {
145	numStream        int
146	numReq           int
147	numTokenReceived int
148
149	callbackError     bool
150	lastReceivedToken string
151	mutex             sync.RWMutex
152	expectedToken     string
153	t                 *testing.T
154
155	// These members close a stream for numStreamClose times, each time the stream
156	// lasts for streamDuration seconds. The numStreamClose + 1 stream is kept open.
157	numStreamClose int
158	streamDuration time.Duration
159}
160
161func CreateXdsCallback(t *testing.T) *XDSCallbacks {
162	return &XDSCallbacks{t: t}
163}
164
165func (c *XDSCallbacks) SetCallbackError(setErr bool) {
166	c.mutex.Lock()
167	defer c.mutex.Unlock()
168	c.callbackError = setErr
169}
170
171func (c *XDSCallbacks) SetExpectedToken(expected string) {
172	c.mutex.Lock()
173	defer c.mutex.Unlock()
174	c.expectedToken = expected
175}
176
177// SetNumberOfStreamClose force XDS server to close gRPC stream n times. Each
178// stream will last d seconds before close.
179func (c *XDSCallbacks) SetNumberOfStreamClose(n int, d int) {
180	c.mutex.Lock()
181	defer c.mutex.Unlock()
182	c.numStreamClose = n
183	c.streamDuration = time.Duration(d) * time.Second
184}
185
186func (c *XDSCallbacks) ExpectedToken() string {
187	c.mutex.Lock()
188	defer c.mutex.Unlock()
189	return c.expectedToken
190}
191
192func (c *XDSCallbacks) NumStream() int {
193	c.mutex.Lock()
194	defer c.mutex.Unlock()
195	return c.numStream
196}
197
198func (c *XDSCallbacks) NumTokenReceived() int {
199	c.mutex.Lock()
200	defer c.mutex.Unlock()
201	return c.numTokenReceived
202}
203
204func (c *XDSCallbacks) OnStreamOpen(ctx context.Context, id int64, url string) error {
205	xdsServerLog.Infof("xDS stream (id: %d, url: %s) is open", id, url)
206
207	c.mutex.Lock()
208	defer c.mutex.Unlock()
209	c.numStream++
210	if metadata, ok := metadata.FromIncomingContext(ctx); ok {
211		if h, ok := metadata[credentialTokenHeaderKey]; ok {
212			if len(h) != 1 {
213				c.t.Errorf("xDS stream (id: %d, url: %s) sends multiple tokens (%d)", id, url, len(h))
214			}
215			if h[0] != c.lastReceivedToken {
216				c.numTokenReceived++
217				c.lastReceivedToken = h[0]
218			}
219			if c.expectedToken != "" && strings.TrimPrefix(h[0], "Bearer ") != c.expectedToken {
220				c.t.Errorf("xDS stream (id: %d, url: %s) sent a token that does "+
221					"not match expected token (%s vs %s)", id, url, h[0], c.expectedToken)
222			} else {
223				xdsServerLog.Infof("xDS stream (id: %d, url: %s) has valid token: %v", id, url, h[0])
224			}
225		} else {
226			c.t.Errorf("XDS stream (id: %d, url: %s) does not have token in metadata %+v",
227				id, url, metadata)
228		}
229	} else {
230		c.t.Errorf("failed to get metadata from XDS stream (id: %d, url: %s)", id, url)
231	}
232
233	if c.callbackError {
234		return errors.New("fake stream error")
235	}
236	return nil
237}
238func (c *XDSCallbacks) OnStreamClosed(id int64) {
239	xdsServerLog.Infof("xDS stream (id: %d) is closed", id)
240}
241func (c *XDSCallbacks) OnStreamRequest(id int64, _ *api.DiscoveryRequest) error {
242	xdsServerLog.Infof("receive xDS request (id: %d)", id)
243
244	c.mutex.Lock()
245	defer c.mutex.Unlock()
246	c.numReq++
247
248	// Send out the first response to finish Envoy initialization, and close stream
249	// in followup requests.
250	if c.numReq > 1 && c.numStream <= c.numStreamClose {
251		time.Sleep(c.streamDuration)
252		xdsServerLog.Infof("force close %d/%d xDS stream (id: %d)", c.numStream, c.numStreamClose, id)
253		return fmt.Errorf("force to close the stream (id: %d)", id)
254	}
255	return nil
256}
257func (c *XDSCallbacks) OnStreamResponse(id int64, _ *api.DiscoveryRequest, _ *api.DiscoveryResponse) {
258	xdsServerLog.Infof("on stream %d response", id)
259}
260func (c *XDSCallbacks) OnFetchRequest(context.Context, *api.DiscoveryRequest) error {
261	xdsServerLog.Infof("on fetch request")
262	return nil
263}
264func (c *XDSCallbacks) OnFetchResponse(*api.DiscoveryRequest, *api.DiscoveryResponse) {
265	xdsServerLog.Infof("on fetch response")
266}
267