1// Copyright 2017 Istio Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package v2 16 17import ( 18 "time" 19 20 xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2" 21 "google.golang.org/grpc/codes" 22 23 "istio.io/istio/pilot/pkg/model" 24) 25 26// gen2 provides experimental support for extended generation mechanism. 27 28// handleReqAck checks if the message is an ack/nack and handles it, returning true. 29// If false, the request should be processed by calling the generator. 30func (s *DiscoveryServer) handleReqAck(con *XdsConnection, discReq *xdsapi.DiscoveryRequest) (*model.WatchedResource, bool) { 31 32 // All NACKs should have ErrorDetail set ! 33 // Relying on versionCode != sentVersionCode as nack is less reliable. 34 35 isAck := true 36 37 t := discReq.TypeUrl 38 con.mu.RLock() 39 w := con.node.Active[t] 40 if w == nil { 41 w = &model.WatchedResource{ 42 TypeUrl: t, 43 } 44 con.node.Active[t] = w 45 isAck = false // newly watched resource 46 } 47 con.mu.RUnlock() 48 49 if discReq.ErrorDetail != nil { 50 errCode := codes.Code(discReq.ErrorDetail.Code) 51 adsLog.Warnf("ADS: ACK ERROR %s %s:%s", con.ConID, errCode.String(), discReq.ErrorDetail.GetMessage()) 52 return w, true 53 } 54 55 if discReq.ResponseNonce == "" { 56 isAck = false // initial request 57 } 58 // This is an ACK response to a previous message - but it may refer to a response on a previous connection to 59 // a different XDS server instance. 60 nonceSent := w.NonceSent 61 62 // GRPC doesn't send version info in NACKs for RDS. Technically if nonce matches 63 // previous response, it is an ACK/NACK. 64 if nonceSent != "" && nonceSent == discReq.ResponseNonce { 65 adsLog.Debugf("ADS: ACK %s %s %s %v", con.ConID, discReq.VersionInfo, discReq.ResponseNonce, 66 time.Since(w.LastSent)) 67 w.NonceAcked = discReq.ResponseNonce 68 } 69 70 if nonceSent != discReq.ResponseNonce { 71 adsLog.Debugf("ADS:RDS: Expired nonce received %s, sent %s, received %s", 72 con.ConID, nonceSent, discReq.ResponseNonce) 73 rdsExpiredNonce.Increment() 74 // This is an ACK for a resource sent on an older stream, or out of sync. 75 // Send a response back. 76 isAck = false 77 } 78 79 // Change in the set of watched resource - regardless of ack, send new data. 80 if !listEqualUnordered(w.ResourceNames, discReq.ResourceNames) { 81 isAck = false 82 w.ResourceNames = discReq.ResourceNames 83 } 84 85 return w, isAck 86} 87 88// handleCustomGenerator uses model.Generator to generate the response. 89func (s *DiscoveryServer) handleCustomGenerator(con *XdsConnection, req *xdsapi.DiscoveryRequest) error { 90 w, isAck := s.handleReqAck(con, req) 91 if isAck { 92 return nil 93 } 94 95 push := s.globalPushContext() 96 resp := &xdsapi.DiscoveryResponse{ 97 TypeUrl: w.TypeUrl, 98 VersionInfo: push.Version, // TODO: we can now generate per-type version ! 99 Nonce: nonce(push.Version), 100 } 101 102 cl := con.node.XdsResourceGenerator.Generate(con.node, push, w) 103 sz := 0 104 for _, rc := range cl { 105 resp.Resources = append(resp.Resources, rc) 106 sz += len(rc.Value) 107 } 108 109 err := con.send(resp) 110 if err != nil { 111 adsLog.Warnf("ADS: Send failure %s: %v", con.ConID, err) 112 recordSendError(apiSendErrPushes, err) 113 return err 114 } 115 apiPushes.Increment() 116 w.LastSent = time.Now() 117 w.LastSize = sz // just resource size - doesn't include header and types 118 w.NonceSent = resp.Nonce 119 120 return nil 121} 122 123// TODO: verify that ProxyNeedsPush works correctly for Generator - ie. Sidecar visibility 124// is respected for arbitrary resource types. 125 126// Called for config updates. 127// Will not be called if ProxyNeedsPush returns false - ie. if the update 128func (s *DiscoveryServer) pushGeneratorV2(con *XdsConnection, push *model.PushContext, currentVersion string, w *model.WatchedResource) error { 129 // TODO: generators may send incremental changes if both sides agree on the protocol. 130 // This is specific to each generator type. 131 cl := con.node.XdsResourceGenerator.Generate(con.node, push, w) 132 if cl == nil { 133 return nil // No push needed. 134 } 135 136 // TODO: add a 'version' to the result of generator. If set, use it to determine if the result 137 // changed - in many cases it will not change, so we can skip the push. Also the version will 138 // become dependent of the specific resource - for example in case of API it'll be the largest 139 // version of the requested type. 140 141 resp := &xdsapi.DiscoveryResponse{ 142 TypeUrl: w.TypeUrl, 143 VersionInfo: currentVersion, 144 Nonce: nonce(push.Version), 145 } 146 147 sz := 0 148 for _, rc := range cl { 149 resp.Resources = append(resp.Resources, rc) 150 sz += len(rc.Value) 151 } 152 153 err := con.send(resp) 154 if err != nil { 155 adsLog.Warnf("ADS: Send failure %s: %v", con.ConID, err) 156 recordSendError(apiSendErrPushes, err) 157 return err 158 } 159 w.LastSent = time.Now() 160 w.LastSize = sz // just resource size - doesn't include header and types 161 w.NonceSent = resp.Nonce 162 163 adsLog.Infof("XDS: PUSH for node:%s listeners:%d", con.node.ID, len(cl)) 164 return nil 165} 166