1// Copyright 2018 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package adsc 16 17import ( 18 "context" 19 "crypto/tls" 20 "crypto/x509" 21 "encoding/json" 22 "fmt" 23 "io/ioutil" 24 "net" 25 "sync" 26 "time" 27 28 "istio.io/istio/pilot/pkg/networking/util" 29 30 xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2" 31 core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" 32 ads "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" 33 "github.com/envoyproxy/go-control-plane/pkg/conversion" 34 "github.com/golang/protobuf/jsonpb" 35 "github.com/golang/protobuf/proto" 36 pstruct "github.com/golang/protobuf/ptypes/struct" 37 "google.golang.org/grpc" 38 "google.golang.org/grpc/credentials" 39 40 istiolog "istio.io/pkg/log" 41) 42 43// Config for the ADS connection. 44type Config struct { 45 // Namespace defaults to 'default' 46 Namespace string 47 48 // Workload defaults to 'test' 49 Workload string 50 51 // Meta includes additional metadata for the node 52 Meta *pstruct.Struct 53 54 // NodeType defaults to sidecar. "ingress" and "router" are also supported. 55 NodeType string 56 57 // IP is currently the primary key used to locate inbound configs. It is sent by client, 58 // must match a known endpoint IP. Tests can use a ServiceEntry to register fake IPs. 59 IP string 60} 61 62// ADSC implements a basic client for ADS, for use in stress tests and tools 63// or libraries that need to connect to Istio pilot or other ADS servers. 64type ADSC struct { 65 // Stream is the GRPC connection stream, allowing direct GRPC send operations. 66 // Set after Dial is called. 67 stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesClient 68 69 conn *grpc.ClientConn 70 71 // NodeID is the node identity sent to Pilot. 72 nodeID string 73 74 certDir string 75 url string 76 77 watchTime time.Time 78 79 // InitialLoad tracks the time to receive the initial configuration. 80 InitialLoad time.Duration 81 82 // httpListeners contains received listeners with a http_connection_manager filter. 83 httpListeners map[string]*xdsapi.Listener 84 85 // tcpListeners contains all listeners of type TCP (not-HTTP) 86 tcpListeners map[string]*xdsapi.Listener 87 88 // All received clusters of type eds, keyed by name 89 edsClusters map[string]*xdsapi.Cluster 90 91 // All received clusters of no-eds type, keyed by name 92 clusters map[string]*xdsapi.Cluster 93 94 // All received routes, keyed by route name 95 routes map[string]*xdsapi.RouteConfiguration 96 97 // All received endpoints, keyed by cluster name 98 eds map[string]*xdsapi.ClusterLoadAssignment 99 100 // Metadata has the node metadata to send to pilot. 101 // If nil, the defaults will be used. 102 Metadata *pstruct.Struct 103 104 // Updates includes the type of the last update received from the server. 105 Updates chan string 106 VersionInfo map[string]string 107 108 mutex sync.Mutex 109} 110 111const ( 112 typePrefix = "type.googleapis.com/envoy.api.v2." 113 114 // Constants used for XDS 115 116 // ClusterType is used for cluster discovery. Typically first request received 117 clusterType = typePrefix + "Cluster" 118 // EndpointType is used for EDS and ADS endpoint discovery. Typically second request. 119 endpointType = typePrefix + "ClusterLoadAssignment" 120 // ListenerType is sent after clusters and endpoints. 121 listenerType = typePrefix + "Listener" 122 // RouteType is sent after listeners. 123 routeType = typePrefix + "RouteConfiguration" 124) 125 126var ( 127 adscLog = istiolog.RegisterScope("adsc", "adsc debugging", 0) 128) 129 130// Dial connects to a ADS server, with optional MTLS authentication if a cert dir is specified. 131func Dial(url string, certDir string, opts *Config) (*ADSC, error) { 132 adsc := &ADSC{ 133 Updates: make(chan string, 100), 134 VersionInfo: map[string]string{}, 135 certDir: certDir, 136 url: url, 137 } 138 if opts.Namespace == "" { 139 opts.Namespace = "default" 140 } 141 if opts.NodeType == "" { 142 opts.NodeType = "sidecar" 143 } 144 if opts.IP == "" { 145 opts.IP = getPrivateIPIfAvailable().String() 146 } 147 if opts.Workload == "" { 148 opts.Workload = "test-1" 149 } 150 adsc.Metadata = opts.Meta 151 152 adsc.nodeID = fmt.Sprintf("%s~%s~%s.%s~%s.svc.cluster.local", opts.NodeType, opts.IP, 153 opts.Workload, opts.Namespace, opts.Namespace) 154 155 err := adsc.Run() 156 return adsc, err 157} 158 159// Returns a private IP address, or unspecified IP (0.0.0.0) if no IP is available 160func getPrivateIPIfAvailable() net.IP { 161 addrs, _ := net.InterfaceAddrs() 162 for _, addr := range addrs { 163 var ip net.IP 164 switch v := addr.(type) { 165 case *net.IPNet: 166 ip = v.IP 167 case *net.IPAddr: 168 ip = v.IP 169 default: 170 continue 171 } 172 if !ip.IsLoopback() { 173 return ip 174 } 175 } 176 return net.IPv4zero 177} 178 179func tlsConfig(certDir string) (*tls.Config, error) { 180 clientCert, err := tls.LoadX509KeyPair(certDir+"/cert-chain.pem", 181 certDir+"/key.pem") 182 if err != nil { 183 return nil, err 184 } 185 186 serverCABytes, err := ioutil.ReadFile(certDir + "/root-cert.pem") 187 if err != nil { 188 return nil, err 189 } 190 serverCAs := x509.NewCertPool() 191 if ok := serverCAs.AppendCertsFromPEM(serverCABytes); !ok { 192 return nil, err 193 } 194 195 return &tls.Config{ 196 Certificates: []tls.Certificate{clientCert}, 197 RootCAs: serverCAs, 198 ServerName: "istio-pilot.istio-system", 199 VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error { 200 return nil 201 }, 202 }, nil 203} 204 205// Close the stream. 206func (a *ADSC) Close() { 207 a.mutex.Lock() 208 a.conn.Close() 209 a.mutex.Unlock() 210} 211 212// Run will run the ADS client. 213func (a *ADSC) Run() error { 214 215 // TODO: pass version info, nonce properly 216 var err error 217 if len(a.certDir) > 0 { 218 tlsCfg, err := tlsConfig(a.certDir) 219 if err != nil { 220 return err 221 } 222 creds := credentials.NewTLS(tlsCfg) 223 224 opts := []grpc.DialOption{ 225 // Verify Pilot cert and service account 226 grpc.WithTransportCredentials(creds), 227 } 228 a.conn, err = grpc.Dial(a.url, opts...) 229 if err != nil { 230 return err 231 } 232 } else { 233 a.conn, err = grpc.Dial(a.url, grpc.WithInsecure()) 234 if err != nil { 235 return err 236 } 237 } 238 239 xds := ads.NewAggregatedDiscoveryServiceClient(a.conn) 240 edsstr, err := xds.StreamAggregatedResources(context.Background()) 241 if err != nil { 242 return err 243 } 244 a.stream = edsstr 245 go a.handleRecv() 246 return nil 247} 248 249func (a *ADSC) handleRecv() { 250 for { 251 msg, err := a.stream.Recv() 252 if err != nil { 253 adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err) 254 a.Close() 255 a.WaitClear() 256 a.Updates <- "close" 257 return 258 } 259 260 listeners := []*xdsapi.Listener{} 261 clusters := []*xdsapi.Cluster{} 262 routes := []*xdsapi.RouteConfiguration{} 263 eds := []*xdsapi.ClusterLoadAssignment{} 264 for _, rsc := range msg.Resources { // Any 265 a.VersionInfo[rsc.TypeUrl] = msg.VersionInfo 266 valBytes := rsc.Value 267 if rsc.TypeUrl == listenerType { 268 ll := &xdsapi.Listener{} 269 _ = proto.Unmarshal(valBytes, ll) 270 listeners = append(listeners, ll) 271 } else if rsc.TypeUrl == clusterType { 272 ll := &xdsapi.Cluster{} 273 _ = proto.Unmarshal(valBytes, ll) 274 clusters = append(clusters, ll) 275 } else if rsc.TypeUrl == endpointType { 276 ll := &xdsapi.ClusterLoadAssignment{} 277 _ = proto.Unmarshal(valBytes, ll) 278 eds = append(eds, ll) 279 } else if rsc.TypeUrl == routeType { 280 ll := &xdsapi.RouteConfiguration{} 281 _ = proto.Unmarshal(valBytes, ll) 282 routes = append(routes, ll) 283 } 284 } 285 286 // TODO: add hook to inject nacks 287 a.mutex.Lock() 288 a.ack(msg) 289 a.mutex.Unlock() 290 291 if len(listeners) > 0 { 292 a.handleLDS(listeners) 293 } 294 if len(clusters) > 0 { 295 a.handleCDS(clusters) 296 } 297 if len(eds) > 0 { 298 a.handleEDS(eds) 299 } 300 if len(routes) > 0 { 301 a.handleRDS(routes) 302 } 303 } 304 305} 306 307// nolint: staticcheck 308func (a *ADSC) handleLDS(ll []*xdsapi.Listener) { 309 lh := map[string]*xdsapi.Listener{} 310 lt := map[string]*xdsapi.Listener{} 311 312 routes := []string{} 313 ldsSize := 0 314 315 for _, l := range ll { 316 ldsSize += proto.Size(l) 317 318 // The last filter is the actual destination for inbound listener 319 filter := l.FilterChains[len(l.FilterChains)-1].Filters[0] 320 321 // The actual destination will be the next to the last if the last filter is a passthrough filter 322 if l.FilterChains[len(l.FilterChains)-1].GetName() == util.PassthroughFilterChain { 323 filter = l.FilterChains[len(l.FilterChains)-2].Filters[0] 324 } 325 326 if filter.Name == "mixer" { 327 filter = l.FilterChains[len(l.FilterChains)-1].Filters[1] 328 } 329 if filter.Name == "envoy.tcp_proxy" { 330 lt[l.Name] = l 331 config := filter.GetConfig() 332 if config == nil { 333 config, _ = conversion.MessageToStruct(filter.GetTypedConfig()) 334 } 335 c := config.Fields["cluster"].GetStringValue() 336 adscLog.Debugf("TCP: %s -> %s", l.Name, c) 337 } else if filter.Name == "envoy.http_connection_manager" { 338 lh[l.Name] = l 339 340 // Getting from config is too painful.. 341 port := l.Address.GetSocketAddress().GetPortValue() 342 if port == 15002 { 343 routes = append(routes, "http_proxy") 344 } else { 345 routes = append(routes, fmt.Sprintf("%d", port)) 346 } 347 } else if filter.Name == "envoy.mongo_proxy" { 348 // ignore for now 349 } else if filter.Name == "envoy.redis_proxy" { 350 // ignore for now 351 } else if filter.Name == "envoy.filters.network.mysql_proxy" { 352 // ignore for now 353 } else { 354 tm := &jsonpb.Marshaler{Indent: " "} 355 adscLog.Infof(tm.MarshalToString(l)) 356 } 357 } 358 359 adscLog.Infof("LDS: http=%d tcp=%d size=%d", len(lh), len(lt), ldsSize) 360 if adscLog.DebugEnabled() { 361 b, _ := json.MarshalIndent(ll, " ", " ") 362 adscLog.Debugf(string(b)) 363 } 364 a.mutex.Lock() 365 defer a.mutex.Unlock() 366 if len(routes) > 0 { 367 a.sendRsc(routeType, routes) 368 } 369 a.httpListeners = lh 370 a.tcpListeners = lt 371 372 select { 373 case a.Updates <- "lds": 374 default: 375 } 376} 377 378// compact representations, for simplified debugging/testing 379 380// TCPListener extracts the core elements from envoy Listener. 381type TCPListener struct { 382 // Address is the address, as expected by go Dial and Listen, including port 383 Address string 384 385 // LogFile is the access log address for the listener 386 LogFile string 387 388 // Target is the destination cluster. 389 Target string 390} 391 392type Target struct { 393 394 // Address is a go address, extracted from the mangled cluster name. 395 Address string 396 397 // Endpoints are the resolved endpoints from eds or cluster static. 398 Endpoints map[string]Endpoint 399} 400 401type Endpoint struct { 402 // Weight extracted from eds 403 Weight int 404} 405 406// Save will save the json configs to files, using the base directory 407func (a *ADSC) Save(base string) error { 408 a.mutex.Lock() 409 defer a.mutex.Unlock() 410 strResponse, err := json.MarshalIndent(a.tcpListeners, " ", " ") 411 if err != nil { 412 return err 413 } 414 err = ioutil.WriteFile(base+"_lds_tcp.json", strResponse, 0644) 415 if err != nil { 416 return err 417 } 418 strResponse, err = json.MarshalIndent(a.httpListeners, " ", " ") 419 if err != nil { 420 return err 421 } 422 err = ioutil.WriteFile(base+"_lds_http.json", strResponse, 0644) 423 if err != nil { 424 return err 425 } 426 strResponse, err = json.MarshalIndent(a.routes, " ", " ") 427 if err != nil { 428 return err 429 } 430 err = ioutil.WriteFile(base+"_rds.json", strResponse, 0644) 431 if err != nil { 432 return err 433 } 434 strResponse, err = json.MarshalIndent(a.edsClusters, " ", " ") 435 if err != nil { 436 return err 437 } 438 err = ioutil.WriteFile(base+"_ecds.json", strResponse, 0644) 439 if err != nil { 440 return err 441 } 442 strResponse, err = json.MarshalIndent(a.clusters, " ", " ") 443 if err != nil { 444 return err 445 } 446 err = ioutil.WriteFile(base+"_cds.json", strResponse, 0644) 447 if err != nil { 448 return err 449 } 450 strResponse, err = json.MarshalIndent(a.eds, " ", " ") 451 if err != nil { 452 return err 453 } 454 err = ioutil.WriteFile(base+"_eds.json", strResponse, 0644) 455 if err != nil { 456 return err 457 } 458 459 return err 460} 461 462func (a *ADSC) handleCDS(ll []*xdsapi.Cluster) { 463 464 cn := []string{} 465 cdsSize := 0 466 edscds := map[string]*xdsapi.Cluster{} 467 cds := map[string]*xdsapi.Cluster{} 468 for _, c := range ll { 469 cdsSize += proto.Size(c) 470 switch v := c.ClusterDiscoveryType.(type) { 471 case *xdsapi.Cluster_Type: 472 if v.Type != xdsapi.Cluster_EDS { 473 cds[c.Name] = c 474 continue 475 } 476 } 477 cn = append(cn, c.Name) 478 edscds[c.Name] = c 479 } 480 481 adscLog.Infof("CDS: %d size=%d", len(cn), cdsSize) 482 483 if len(cn) > 0 { 484 a.sendRsc(endpointType, cn) 485 } 486 if adscLog.DebugEnabled() { 487 b, _ := json.MarshalIndent(ll, " ", " ") 488 adscLog.Info(string(b)) 489 } 490 491 a.mutex.Lock() 492 defer a.mutex.Unlock() 493 a.edsClusters = edscds 494 a.clusters = cds 495 496 select { 497 case a.Updates <- "cds": 498 default: 499 } 500} 501 502func (a *ADSC) node() *core.Node { 503 n := &core.Node{ 504 Id: a.nodeID, 505 } 506 if a.Metadata == nil { 507 n.Metadata = &pstruct.Struct{ 508 Fields: map[string]*pstruct.Value{ 509 "ISTIO_VERSION": {Kind: &pstruct.Value_StringValue{StringValue: "65536.65536.65536"}}, 510 }} 511 } else { 512 n.Metadata = a.Metadata 513 } 514 return n 515} 516 517func (a *ADSC) Send(req *xdsapi.DiscoveryRequest) error { 518 req.Node = a.node() 519 req.ResponseNonce = time.Now().String() 520 return a.stream.Send(req) 521} 522 523func (a *ADSC) handleEDS(eds []*xdsapi.ClusterLoadAssignment) { 524 la := map[string]*xdsapi.ClusterLoadAssignment{} 525 edsSize := 0 526 ep := 0 527 for _, cla := range eds { 528 edsSize += proto.Size(cla) 529 la[cla.ClusterName] = cla 530 ep += len(cla.Endpoints) 531 } 532 533 adscLog.Infof("eds: %d size=%d ep=%d", len(eds), edsSize, ep) 534 if adscLog.DebugEnabled() { 535 b, _ := json.MarshalIndent(eds, " ", " ") 536 adscLog.Info(string(b)) 537 } 538 if a.InitialLoad == 0 { 539 // first load - Envoy loads listeners after endpoints 540 _ = a.stream.Send(&xdsapi.DiscoveryRequest{ 541 ResponseNonce: time.Now().String(), 542 Node: a.node(), 543 TypeUrl: listenerType, 544 }) 545 } 546 547 a.mutex.Lock() 548 defer a.mutex.Unlock() 549 a.eds = la 550 551 select { 552 case a.Updates <- "eds": 553 default: 554 } 555} 556 557func (a *ADSC) handleRDS(configurations []*xdsapi.RouteConfiguration) { 558 559 vh := 0 560 rcount := 0 561 size := 0 562 563 rds := map[string]*xdsapi.RouteConfiguration{} 564 565 for _, r := range configurations { 566 for _, h := range r.VirtualHosts { 567 vh++ 568 for _, rt := range h.Routes { 569 rcount++ 570 // Example: match:<prefix:"/" > route:<cluster:"outbound|9154||load-se-154.local" ... 571 adscLog.Debugf("Handle route %v, path %v, cluster %v", h.Name, rt.Match.PathSpecifier, rt.GetRoute().GetCluster()) 572 } 573 } 574 rds[r.Name] = r 575 size += proto.Size(r) 576 } 577 if a.InitialLoad == 0 { 578 a.InitialLoad = time.Since(a.watchTime) 579 adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d time=%d", len(configurations), size, vh, rcount, a.InitialLoad) 580 } else { 581 adscLog.Infof("RDS: %d size=%d vhosts=%d routes=%d", len(configurations), size, vh, rcount) 582 } 583 584 if adscLog.DebugEnabled() { 585 b, _ := json.MarshalIndent(configurations, " ", " ") 586 adscLog.Info(string(b)) 587 } 588 589 a.mutex.Lock() 590 a.routes = rds 591 a.mutex.Unlock() 592 593 select { 594 case a.Updates <- "rds": 595 default: 596 } 597 598} 599 600// WaitClear will clear the waiting events, so next call to Wait will get 601// the next push type. 602func (a *ADSC) WaitClear() { 603 for { 604 select { 605 case <-a.Updates: 606 default: 607 return 608 } 609 } 610} 611 612// Wait for an updates for all the specified types 613// If updates is empty, this will wait for any update 614func (a *ADSC) Wait(to time.Duration, updates ...string) ([]string, error) { 615 t := time.NewTimer(to) 616 want := map[string]struct{}{} 617 for _, update := range updates { 618 want[update] = struct{}{} 619 } 620 got := make([]string, 0, len(updates)) 621 for { 622 select { 623 case t := <-a.Updates: 624 delete(want, t) 625 got = append(got, t) 626 if len(want) == 0 { 627 return got, nil 628 } 629 case <-t.C: 630 return got, fmt.Errorf("timeout, still waiting for updates: %v", want) 631 } 632 } 633} 634 635// EndpointsJSON returns the endpoints, formatted as JSON, for debugging. 636func (a *ADSC) EndpointsJSON() string { 637 out, _ := json.MarshalIndent(a.eds, " ", " ") 638 return string(out) 639} 640 641// Watch will start watching resources, starting with LDS. Based on the LDS response 642// it will start watching RDS and CDS. 643func (a *ADSC) Watch() { 644 a.watchTime = time.Now() 645 _ = a.stream.Send(&xdsapi.DiscoveryRequest{ 646 ResponseNonce: time.Now().String(), 647 Node: a.node(), 648 TypeUrl: clusterType, 649 }) 650} 651 652func (a *ADSC) sendRsc(typeurl string, rsc []string) { 653 _ = a.stream.Send(&xdsapi.DiscoveryRequest{ 654 ResponseNonce: "", 655 Node: a.node(), 656 TypeUrl: typeurl, 657 ResourceNames: rsc, 658 }) 659} 660 661func (a *ADSC) ack(msg *xdsapi.DiscoveryResponse) { 662 _ = a.stream.Send(&xdsapi.DiscoveryRequest{ 663 ResponseNonce: msg.Nonce, 664 TypeUrl: msg.TypeUrl, 665 Node: a.node(), 666 VersionInfo: msg.VersionInfo, 667 }) 668} 669 670// GetHTTPListeners returns all the http listeners. 671func (a *ADSC) GetHTTPListeners() map[string]*xdsapi.Listener { 672 a.mutex.Lock() 673 defer a.mutex.Unlock() 674 return a.httpListeners 675} 676 677// GetTCPListeners returns all the tcp listeners. 678func (a *ADSC) GetTCPListeners() map[string]*xdsapi.Listener { 679 a.mutex.Lock() 680 defer a.mutex.Unlock() 681 return a.tcpListeners 682} 683 684// GetEdsClusters returns all the eds type clusters. 685func (a *ADSC) GetEdsClusters() map[string]*xdsapi.Cluster { 686 a.mutex.Lock() 687 defer a.mutex.Unlock() 688 return a.edsClusters 689} 690 691// GetClusters returns all the non-eds type clusters. 692func (a *ADSC) GetClusters() map[string]*xdsapi.Cluster { 693 a.mutex.Lock() 694 defer a.mutex.Unlock() 695 return a.clusters 696} 697 698// GetRoutes returns all the routes. 699func (a *ADSC) GetRoutes() map[string]*xdsapi.RouteConfiguration { 700 a.mutex.Lock() 701 defer a.mutex.Unlock() 702 return a.routes 703} 704 705// GetEndpoints returns all the routes. 706func (a *ADSC) GetEndpoints() map[string]*xdsapi.ClusterLoadAssignment { 707 a.mutex.Lock() 708 defer a.mutex.Unlock() 709 return a.eds 710} 711