1// Copyright 2018 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package adsc
16
17import (
18	"context"
19	"crypto/tls"
20	"crypto/x509"
21	"encoding/json"
22	"fmt"
23	"io/ioutil"
24	"net"
25	"sync"
26	"time"
27
28	"istio.io/istio/pilot/pkg/networking/util"
29
30	xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
31	core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
32	ads "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
33	"github.com/envoyproxy/go-control-plane/pkg/conversion"
34	"github.com/golang/protobuf/jsonpb"
35	"github.com/golang/protobuf/proto"
36	pstruct "github.com/golang/protobuf/ptypes/struct"
37	"google.golang.org/grpc"
38	"google.golang.org/grpc/credentials"
39
40	istiolog "istio.io/pkg/log"
41)
42
// Config holds the node identity settings passed to Dial. Dial fills in a
// default for every string field that is left empty and uses the result to
// build the node ID sent to the server.
type Config struct {
	// Namespace defaults to 'default'
	Namespace string

	// Workload defaults to 'test-1'
	Workload string

	// Meta includes additional metadata for the node. When non-nil it is sent
	// as the node metadata instead of the built-in default.
	Meta *pstruct.Struct

	// NodeType defaults to sidecar. "ingress" and "router" are also supported.
	NodeType string

	// IP is currently the primary key used to locate inbound configs. It is sent by client,
	// must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs.
	// When empty, Dial picks the first non-loopback interface address it finds.
	IP string
}
61
// ADSC implements a basic client for ADS, for use in stress tests and tools
// or libraries that need to connect to Istio pilot or other ADS servers.
type ADSC struct {
	// stream is the GRPC connection stream used for all send and receive
	// operations. Set by Run (which Dial calls).
	stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesClient

	// conn is the underlying GRPC client connection, set by Run.
	conn *grpc.ClientConn

	// nodeID is the node identity sent to Pilot.
	nodeID string

	// certDir is the directory holding the TLS certificates (empty means an
	// insecure connection); url is the address passed to grpc.Dial.
	certDir string
	url     string

	// watchTime is the time at which Watch was called.
	watchTime time.Time

	// InitialLoad tracks the time to receive the initial configuration: it is
	// set on the first route response to the time elapsed since watchTime.
	InitialLoad time.Duration

	// httpListeners contains received listeners with a http_connection_manager filter.
	httpListeners map[string]*xdsapi.Listener

	// tcpListeners contains all listeners of type TCP (not-HTTP)
	tcpListeners map[string]*xdsapi.Listener

	// All received clusters of type eds, keyed by name
	edsClusters map[string]*xdsapi.Cluster

	// All received clusters of no-eds type, keyed by name
	clusters map[string]*xdsapi.Cluster

	// All received routes, keyed by route name
	routes map[string]*xdsapi.RouteConfiguration

	// All received endpoints, keyed by cluster name
	eds map[string]*xdsapi.ClusterLoadAssignment

	// Metadata has the node metadata to send to pilot.
	// If nil, the defaults will be used.
	Metadata *pstruct.Struct

	// Updates receives the type of each update processed from the server
	// ("cds", "eds", "lds", "rds"), or "close" when the stream ends.
	// VersionInfo maps a resource type URL to the version of the last
	// response that carried a resource of that type.
	Updates     chan string
	VersionInfo map[string]string

	// mutex is held while the handlers store the received resource maps and
	// while Save and the Get* accessors read them.
	mutex sync.Mutex
}
110
const (
	// typePrefix is the common prefix of the resource type URLs below.
	typePrefix = "type.googleapis.com/envoy.api.v2."

	// Constants used for XDS

	// clusterType is used for cluster discovery. Typically first request received
	clusterType = typePrefix + "Cluster"
	// endpointType is used for EDS and ADS endpoint discovery. Typically second request.
	endpointType = typePrefix + "ClusterLoadAssignment"
	// listenerType is sent after clusters and endpoints.
	listenerType = typePrefix + "Listener"
	// routeType is sent after listeners.
	routeType = typePrefix + "RouteConfiguration"
)
125
126var (
127	adscLog = istiolog.RegisterScope("adsc", "adsc debugging", 0)
128)
129
130// Dial connects to a ADS server, with optional MTLS authentication if a cert dir is specified.
131func Dial(url string, certDir string, opts *Config) (*ADSC, error) {
132	adsc := &ADSC{
133		Updates:     make(chan string, 100),
134		VersionInfo: map[string]string{},
135		certDir:     certDir,
136		url:         url,
137	}
138	if opts.Namespace == "" {
139		opts.Namespace = "default"
140	}
141	if opts.NodeType == "" {
142		opts.NodeType = "sidecar"
143	}
144	if opts.IP == "" {
145		opts.IP = getPrivateIPIfAvailable().String()
146	}
147	if opts.Workload == "" {
148		opts.Workload = "test-1"
149	}
150	adsc.Metadata = opts.Meta
151
152	adsc.nodeID = fmt.Sprintf("%s~%s~%s.%s~%s.svc.cluster.local", opts.NodeType, opts.IP,
153		opts.Workload, opts.Namespace, opts.Namespace)
154
155	err := adsc.Run()
156	return adsc, err
157}
158
// getPrivateIPIfAvailable returns the IP of the first interface address that
// is not a loopback address, or the unspecified IPv4 address (0.0.0.0) when
// no such address is found. A failure to list the interface addresses is
// treated the same as having none.
func getPrivateIPIfAvailable() net.IP {
	ifaceAddrs, _ := net.InterfaceAddrs()
	for i := range ifaceAddrs {
		var candidate net.IP
		if ipNet, ok := ifaceAddrs[i].(*net.IPNet); ok {
			candidate = ipNet.IP
		} else if ipAddr, ok := ifaceAddrs[i].(*net.IPAddr); ok {
			candidate = ipAddr.IP
		} else {
			// Address kinds other than IPNet/IPAddr carry no IP to use.
			continue
		}
		if candidate.IsLoopback() {
			continue
		}
		return candidate
	}
	return net.IPv4zero
}
178
// tlsConfig builds the client TLS configuration from the files in certDir:
// cert-chain.pem and key.pem form the client certificate, and root-cert.pem
// provides the CA pool used to verify the server.
//
// It returns an error when the key pair cannot be loaded, when the root
// certificate file cannot be read, or when that file contains no usable PEM
// certificate.
func tlsConfig(certDir string) (*tls.Config, error) {
	clientCert, err := tls.LoadX509KeyPair(certDir+"/cert-chain.pem",
		certDir+"/key.pem")
	if err != nil {
		return nil, err
	}

	rootCertPath := certDir + "/root-cert.pem"
	serverCABytes, err := ioutil.ReadFile(rootCertPath)
	if err != nil {
		return nil, err
	}
	serverCAs := x509.NewCertPool()
	if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok {
		// err is nil at this point, so a dedicated error is required;
		// returning err here would report success with a nil config.
		return nil, fmt.Errorf("no valid PEM certificates found in %s", rootCertPath)
	}

	return &tls.Config{
		Certificates: []tls.Certificate{clientCert},
		RootCAs:      serverCAs,
		ServerName:   "istio-pilot.istio-system",
		// This callback accepts every peer certificate it is given; it adds
		// no checks of its own.
		VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
			return nil
		},
	}, nil
}
204
205// Close the stream.
206func (a *ADSC) Close() {
207	a.mutex.Lock()
208	a.conn.Close()
209	a.mutex.Unlock()
210}
211
212// Run will run the ADS client.
213func (a *ADSC) Run() error {
214
215	// TODO: pass version info, nonce properly
216	var err error
217	if len(a.certDir) > 0 {
218		tlsCfg, err := tlsConfig(a.certDir)
219		if err != nil {
220			return err
221		}
222		creds := credentials.NewTLS(tlsCfg)
223
224		opts := []grpc.DialOption{
225			// Verify Pilot cert and service account
226			grpc.WithTransportCredentials(creds),
227		}
228		a.conn, err = grpc.Dial(a.url, opts...)
229		if err != nil {
230			return err
231		}
232	} else {
233		a.conn, err = grpc.Dial(a.url, grpc.WithInsecure())
234		if err != nil {
235			return err
236		}
237	}
238
239	xds := ads.NewAggregatedDiscoveryServiceClient(a.conn)
240	edsstr, err := xds.StreamAggregatedResources(context.Background())
241	if err != nil {
242		return err
243	}
244	a.stream = edsstr
245	go a.handleRecv()
246	return nil
247}
248
249func (a *ADSC) handleRecv() {
250	for {
251		msg, err := a.stream.Recv()
252		if err != nil {
253			adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
254			a.Close()
255			a.WaitClear()
256			a.Updates <- "close"
257			return
258		}
259
260		listeners := []*xdsapi.Listener{}
261		clusters := []*xdsapi.Cluster{}
262		routes := []*xdsapi.RouteConfiguration{}
263		eds := []*xdsapi.ClusterLoadAssignment{}
264		for _, rsc := range msg.Resources { // Any
265			a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo
266			valBytes := rsc.Value
267			if rsc.TypeUrl == listenerType {
268				ll := &xdsapi.Listener{}
269				_ = proto.Unmarshal(valBytes, ll)
270				listeners = append(listeners, ll)
271			} else if rsc.TypeUrl == clusterType {
272				ll := &xdsapi.Cluster{}
273				_ = proto.Unmarshal(valBytes, ll)
274				clusters = append(clusters, ll)
275			} else if rsc.TypeUrl == endpointType {
276				ll := &xdsapi.ClusterLoadAssignment{}
277				_ = proto.Unmarshal(valBytes, ll)
278				eds = append(eds, ll)
279			} else if rsc.TypeUrl == routeType {
280				ll := &xdsapi.RouteConfiguration{}
281				_ = proto.Unmarshal(valBytes, ll)
282				routes = append(routes, ll)
283			}
284		}
285
286		// TODO: add hook to inject nacks
287		a.mutex.Lock()
288		a.ack(msg)
289		a.mutex.Unlock()
290
291		if len(listeners) > 0 {
292			a.handleLDS(listeners)
293		}
294		if len(clusters) > 0 {
295			a.handleCDS(clusters)
296		}
297		if len(eds) > 0 {
298			a.handleEDS(eds)
299		}
300		if len(routes) > 0 {
301			a.handleRDS(routes)
302		}
303	}
304
305}
306
307// nolint: staticcheck
308func (a *ADSC) handleLDS(ll []*xdsapi.Listener) {
309	lh := map[string]*xdsapi.Listener{}
310	lt := map[string]*xdsapi.Listener{}
311
312	routes := []string{}
313	ldsSize := 0
314
315	for _, l := range ll {
316		ldsSize += proto.Size(l)
317
318		// The last filter is the actual destination for inbound listener
319		filter := l.FilterChains[len(l.FilterChains)-1].Filters[0]
320
321		// The actual destination will be the next to the last if the last filter is a passthrough filter
322		if l.FilterChains[len(l.FilterChains)-1].GetName() == util.PassthroughFilterChain {
323			filter = l.FilterChains[len(l.FilterChains)-2].Filters[0]
324		}
325
326		if filter.Name == "mixer" {
327			filter = l.FilterChains[len(l.FilterChains)-1].Filters[1]
328		}
329		if filter.Name == "envoy.tcp_proxy" {
330			lt[l.Name] = l
331			config := filter.GetConfig()
332			if config == nil {
333				config, _ = conversion.MessageToStruct(filter.GetTypedConfig())
334			}
335			c := config.Fields["cluster"].GetStringValue()
336			adscLog.Debugf("TCP: %s -> %s", l.Name, c)
337		} else if filter.Name == "envoy.http_connection_manager" {
338			lh[l.Name] = l
339
340			// Getting from config is too painful..
341			port := l.Address.GetSocketAddress().GetPortValue()
342			if port == 15002 {
343				routes = append(routes, "http_proxy")
344			} else {
345				routes = append(routes, fmt.Sprintf("%d", port))
346			}
347		} else if filter.Name == "envoy.mongo_proxy" {
348			// ignore for now
349		} else if filter.Name == "envoy.redis_proxy" {
350			// ignore for now
351		} else if filter.Name == "envoy.filters.network.mysql_proxy" {
352			// ignore for now
353		} else {
354			tm := &jsonpb.Marshaler{Indent: "  "}
355			adscLog.Infof(tm.MarshalToString(l))
356		}
357	}
358
359	adscLog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize)
360	if adscLog.DebugEnabled() {
361		b, _ := json.MarshalIndent(ll, " ", " ")
362		adscLog.Debugf(string(b))
363	}
364	a.mutex.Lock()
365	defer a.mutex.Unlock()
366	if len(routes) > 0 {
367		a.sendRsc(routeType, routes)
368	}
369	a.httpListeners = lh
370	a.tcpListeners = lt
371
372	select {
373	case a.Updates <- "lds":
374	default:
375	}
376}
377
378// compact representations, for simplified debugging/testing
379
// TCPListener extracts the core elements from envoy Listener.
// It is a compact representation intended for simplified debugging/testing.
type TCPListener struct {
	// Address is the address, as expected by go Dial and Listen, including port
	Address string

	// LogFile is the access log address for the listener
	LogFile string

	// Target is the destination cluster.
	Target string
}
391
// Target is a compact representation of a destination: an address plus the
// endpoints resolved for it.
// NOTE(review): nothing in this file constructs a Target, so the key used
// for Endpoints is not visible here - confirm against callers.
type Target struct {

	// Address is a go address, extracted from the mangled cluster name.
	Address string

	// Endpoints are the resolved endpoints from eds or cluster static.
	Endpoints map[string]Endpoint
}
400
// Endpoint is a compact representation of a single resolved endpoint, used
// as the value type of Target.Endpoints.
type Endpoint struct {
	// Weight extracted from eds
	Weight int
}
405
406// Save will save the json configs to files, using the base directory
407func (a *ADSC) Save(base string) error {
408	a.mutex.Lock()
409	defer a.mutex.Unlock()
410	strResponse, err := json.MarshalIndent(a.tcpListeners, "  ", "  ")
411	if err != nil {
412		return err
413	}
414	err = ioutil.WriteFile(base+"_lds_tcp.json", strResponse, 0644)
415	if err != nil {
416		return err
417	}
418	strResponse, err = json.MarshalIndent(a.httpListeners, "  ", "  ")
419	if err != nil {
420		return err
421	}
422	err = ioutil.WriteFile(base+"_lds_http.json", strResponse, 0644)
423	if err != nil {
424		return err
425	}
426	strResponse, err = json.MarshalIndent(a.routes, "  ", "  ")
427	if err != nil {
428		return err
429	}
430	err = ioutil.WriteFile(base+"_rds.json", strResponse, 0644)
431	if err != nil {
432		return err
433	}
434	strResponse, err = json.MarshalIndent(a.edsClusters, "  ", "  ")
435	if err != nil {
436		return err
437	}
438	err = ioutil.WriteFile(base+"_ecds.json", strResponse, 0644)
439	if err != nil {
440		return err
441	}
442	strResponse, err = json.MarshalIndent(a.clusters, "  ", "  ")
443	if err != nil {
444		return err
445	}
446	err = ioutil.WriteFile(base+"_cds.json", strResponse, 0644)
447	if err != nil {
448		return err
449	}
450	strResponse, err = json.MarshalIndent(a.eds, "  ", "  ")
451	if err != nil {
452		return err
453	}
454	err = ioutil.WriteFile(base+"_eds.json", strResponse, 0644)
455	if err != nil {
456		return err
457	}
458
459	return err
460}
461
462func (a *ADSC) handleCDS(ll []*xdsapi.Cluster) {
463
464	cn := []string{}
465	cdsSize := 0
466	edscds := map[string]*xdsapi.Cluster{}
467	cds := map[string]*xdsapi.Cluster{}
468	for _, c := range ll {
469		cdsSize += proto.Size(c)
470		switch v := c.ClusterDiscoveryType.(type) {
471		case *xdsapi.Cluster_Type:
472			if v.Type != xdsapi.Cluster_EDS {
473				cds[c.Name] = c
474				continue
475			}
476		}
477		cn = append(cn, c.Name)
478		edscds[c.Name] = c
479	}
480
481	adscLog.Infof("CDS: %d size=%d", len(cn), cdsSize)
482
483	if len(cn) > 0 {
484		a.sendRsc(endpointType, cn)
485	}
486	if adscLog.DebugEnabled() {
487		b, _ := json.MarshalIndent(ll, " ", " ")
488		adscLog.Info(string(b))
489	}
490
491	a.mutex.Lock()
492	defer a.mutex.Unlock()
493	a.edsClusters = edscds
494	a.clusters = cds
495
496	select {
497	case a.Updates <- "cds":
498	default:
499	}
500}
501
502func (a *ADSC) node() *core.Node {
503	n := &core.Node{
504		Id: a.nodeID,
505	}
506	if a.Metadata == nil {
507		n.Metadata = &pstruct.Struct{
508			Fields: map[string]*pstruct.Value{
509				"ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}},
510			}}
511	} else {
512		n.Metadata = a.Metadata
513	}
514	return n
515}
516
517func (a *ADSC) Send(req *xdsapi.DiscoveryRequest) error {
518	req.Node = a.node()
519	req.ResponseNonce = time.Now().String()
520	return a.stream.Send(req)
521}
522
523func (a *ADSC) handleEDS(eds []*xdsapi.ClusterLoadAssignment) {
524	la := map[string]*xdsapi.ClusterLoadAssignment{}
525	edsSize := 0
526	ep := 0
527	for _, cla := range eds {
528		edsSize += proto.Size(cla)
529		la[cla.ClusterName] = cla
530		ep += len(cla.Endpoints)
531	}
532
533	adscLog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep)
534	if adscLog.DebugEnabled() {
535		b, _ := json.MarshalIndent(eds, " ", " ")
536		adscLog.Info(string(b))
537	}
538	if a.InitialLoad == 0 {
539		// first load - Envoy loads listeners after endpoints
540		_ = a.stream.Send(&xdsapi.DiscoveryRequest{
541			ResponseNonce: time.Now().String(),
542			Node:          a.node(),
543			TypeUrl:       listenerType,
544		})
545	}
546
547	a.mutex.Lock()
548	defer a.mutex.Unlock()
549	a.eds = la
550
551	select {
552	case a.Updates <- "eds":
553	default:
554	}
555}
556
557func (a *ADSC) handleRDS(configurations []*xdsapi.RouteConfiguration) {
558
559	vh := 0
560	rcount := 0
561	size := 0
562
563	rds := map[string]*xdsapi.RouteConfiguration{}
564
565	for _, r := range configurations {
566		for _, h := range r.VirtualHosts {
567			vh++
568			for _, rt := range h.Routes {
569				rcount++
570				// Example: match:<prefix:"/" > route:<cluster:"outbound|9154||load-se-154.local" ...
571				adscLog.Debugf("Handle route %v, path %v, cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster())
572			}
573		}
574		rds[r.Name] = r
575		size += proto.Size(r)
576	}
577	if a.InitialLoad == 0 {
578		a.InitialLoad = time.Since(a.watchTime)
579		adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", len(configurations), size, vh, rcount, a.InitialLoad)
580	} else {
581		adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d", len(configurations), size, vh, rcount)
582	}
583
584	if adscLog.DebugEnabled() {
585		b, _ := json.MarshalIndent(configurations, " ", " ")
586		adscLog.Info(string(b))
587	}
588
589	a.mutex.Lock()
590	a.routes = rds
591	a.mutex.Unlock()
592
593	select {
594	case a.Updates <- "rds":
595	default:
596	}
597
598}
599
600// WaitClear will clear the waiting events, so next call to Wait will get
601// the next push type.
602func (a *ADSC) WaitClear() {
603	for {
604		select {
605		case <-a.Updates:
606		default:
607			return
608		}
609	}
610}
611
612// Wait for an updates for all the specified types
613// If updates is empty, this will wait for any update
614func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error) {
615	t := time.NewTimer(to)
616	want := map[string]struct{}{}
617	for _, update := range updates {
618		want[update] = struct{}{}
619	}
620	got := make([]string, 0, len(updates))
621	for {
622		select {
623		case t := <-a.Updates:
624			delete(want, t)
625			got = append(got, t)
626			if len(want) == 0 {
627				return got, nil
628			}
629		case <-t.C:
630			return got, fmt.Errorf("timeout, still waiting for updates: %v", want)
631		}
632	}
633}
634
635// EndpointsJSON returns the endpoints, formatted as JSON, for debugging.
636func (a *ADSC) EndpointsJSON() string {
637	out, _ := json.MarshalIndent(a.eds, " ", " ")
638	return string(out)
639}
640
641// Watch will start watching resources, starting with LDS. Based on the LDS response
642// it will start watching RDS and CDS.
643func (a *ADSC) Watch() {
644	a.watchTime = time.Now()
645	_ = a.stream.Send(&xdsapi.DiscoveryRequest{
646		ResponseNonce: time.Now().String(),
647		Node:          a.node(),
648		TypeUrl:       clusterType,
649	})
650}
651
652func (a *ADSC) sendRsc(typeurl string, rsc []string) {
653	_ = a.stream.Send(&xdsapi.DiscoveryRequest{
654		ResponseNonce: "",
655		Node:          a.node(),
656		TypeUrl:       typeurl,
657		ResourceNames: rsc,
658	})
659}
660
661func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) {
662	_ = a.stream.Send(&xdsapi.DiscoveryRequest{
663		ResponseNonce: msg.Nonce,
664		TypeUrl:       msg.TypeUrl,
665		Node:          a.node(),
666		VersionInfo:   msg.VersionInfo,
667	})
668}
669
670// GetHTTPListeners returns all the http listeners.
671func (a *ADSC) GetHTTPListeners() map[string]*xdsapi.Listener {
672	a.mutex.Lock()
673	defer a.mutex.Unlock()
674	return a.httpListeners
675}
676
677// GetTCPListeners returns all the tcp listeners.
678func (a *ADSC) GetTCPListeners() map[string]*xdsapi.Listener {
679	a.mutex.Lock()
680	defer a.mutex.Unlock()
681	return a.tcpListeners
682}
683
684// GetEdsClusters returns all the eds type clusters.
685func (a *ADSC) GetEdsClusters() map[string]*xdsapi.Cluster {
686	a.mutex.Lock()
687	defer a.mutex.Unlock()
688	return a.edsClusters
689}
690
691// GetClusters returns all the non-eds type clusters.
692func (a *ADSC) GetClusters() map[string]*xdsapi.Cluster {
693	a.mutex.Lock()
694	defer a.mutex.Unlock()
695	return a.clusters
696}
697
698// GetRoutes returns all the routes.
699func (a *ADSC) GetRoutes() map[string]*xdsapi.RouteConfiguration {
700	a.mutex.Lock()
701	defer a.mutex.Unlock()
702	return a.routes
703}
704
705// GetEndpoints returns all the routes.
706func (a *ADSC) GetEndpoints() map[string]*xdsapi.ClusterLoadAssignment {
707	a.mutex.Lock()
708	defer a.mutex.Unlock()
709	return a.eds
710}
711