1// Copyright 2017 Istio Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v2
16
17import (
18	"time"
19
20	xdsapi "github.com/envoyproxy/go-control-plane/envoy/api/v2"
21	"google.golang.org/grpc/codes"
22
23	"istio.io/istio/pilot/pkg/model"
24)
25
26// gen2 provides experimental support for extended generation mechanism.
27
28// handleReqAck checks if the message is an ack/nack and handles it, returning true.
29// If false, the request should be processed by calling the generator.
30func (s *DiscoveryServer) handleReqAck(con *XdsConnection, discReq *xdsapi.DiscoveryRequest) (*model.WatchedResource, bool) {
31
32	// All NACKs should have ErrorDetail set !
33	// Relying on versionCode != sentVersionCode as nack is less reliable.
34
35	isAck := true
36
37	t := discReq.TypeUrl
38	con.mu.RLock()
39	w := con.node.Active[t]
40	if w == nil {
41		w = &model.WatchedResource{
42			TypeUrl: t,
43		}
44		con.node.Active[t] = w
45		isAck = false // newly watched resource
46	}
47	con.mu.RUnlock()
48
49	if discReq.ErrorDetail != nil {
50		errCode := codes.Code(discReq.ErrorDetail.Code)
51		adsLog.Warnf("ADS: ACK ERROR %s %s:%s", con.ConID, errCode.String(), discReq.ErrorDetail.GetMessage())
52		return w, true
53	}
54
55	if discReq.ResponseNonce == "" {
56		isAck = false // initial request
57	}
58	// This is an ACK response to a previous message - but it may refer to a response on a previous connection to
59	// a different XDS server instance.
60	nonceSent := w.NonceSent
61
62	// GRPC doesn't send version info in NACKs for RDS. Technically if nonce matches
63	// previous response, it is an ACK/NACK.
64	if nonceSent != "" && nonceSent == discReq.ResponseNonce {
65		adsLog.Debugf("ADS: ACK %s %s %s %v", con.ConID, discReq.VersionInfo, discReq.ResponseNonce,
66			time.Since(w.LastSent))
67		w.NonceAcked = discReq.ResponseNonce
68	}
69
70	if nonceSent != discReq.ResponseNonce {
71		adsLog.Debugf("ADS:RDS: Expired nonce received %s, sent %s, received %s",
72			con.ConID, nonceSent, discReq.ResponseNonce)
73		rdsExpiredNonce.Increment()
74		// This is an ACK for a resource sent on an older stream, or out of sync.
75		// Send a response back.
76		isAck = false
77	}
78
79	// Change in the set of watched resource - regardless of ack, send new data.
80	if !listEqualUnordered(w.ResourceNames, discReq.ResourceNames) {
81		isAck = false
82		w.ResourceNames = discReq.ResourceNames
83	}
84
85	return w, isAck
86}
87
88// handleCustomGenerator uses model.Generator to generate the response.
89func (s *DiscoveryServer) handleCustomGenerator(con *XdsConnection, req *xdsapi.DiscoveryRequest) error {
90	w, isAck := s.handleReqAck(con, req)
91	if isAck {
92		return nil
93	}
94
95	push := s.globalPushContext()
96	resp := &xdsapi.DiscoveryResponse{
97		TypeUrl:     w.TypeUrl,
98		VersionInfo: push.Version, // TODO: we can now generate per-type version !
99		Nonce:       nonce(push.Version),
100	}
101
102	cl := con.node.XdsResourceGenerator.Generate(con.node, push, w)
103	sz := 0
104	for _, rc := range cl {
105		resp.Resources = append(resp.Resources, rc)
106		sz += len(rc.Value)
107	}
108
109	err := con.send(resp)
110	if err != nil {
111		adsLog.Warnf("ADS: Send failure %s: %v", con.ConID, err)
112		recordSendError(apiSendErrPushes, err)
113		return err
114	}
115	apiPushes.Increment()
116	w.LastSent = time.Now()
117	w.LastSize = sz // just resource size - doesn't include header and types
118	w.NonceSent = resp.Nonce
119
120	return nil
121}
122
123// TODO: verify that ProxyNeedsPush works correctly for Generator - ie. Sidecar visibility
124// is respected for arbitrary resource types.
125
126// Called for config updates.
127// Will not be called if ProxyNeedsPush returns false - ie. if the update
128func (s *DiscoveryServer) pushGeneratorV2(con *XdsConnection, push *model.PushContext, currentVersion string, w *model.WatchedResource) error {
129	// TODO: generators may send incremental changes if both sides agree on the protocol.
130	// This is specific to each generator type.
131	cl := con.node.XdsResourceGenerator.Generate(con.node, push, w)
132	if cl == nil {
133		return nil // No push needed.
134	}
135
136	// TODO: add a 'version' to the result of generator. If set, use it to determine if the result
137	// changed - in many cases it will not change, so we can skip the push. Also the version will
138	// become dependent of the specific resource - for example in case of API it'll be the largest
139	// version of the requested type.
140
141	resp := &xdsapi.DiscoveryResponse{
142		TypeUrl:     w.TypeUrl,
143		VersionInfo: currentVersion,
144		Nonce:       nonce(push.Version),
145	}
146
147	sz := 0
148	for _, rc := range cl {
149		resp.Resources = append(resp.Resources, rc)
150		sz += len(rc.Value)
151	}
152
153	err := con.send(resp)
154	if err != nil {
155		adsLog.Warnf("ADS: Send failure %s: %v", con.ConID, err)
156		recordSendError(apiSendErrPushes, err)
157		return err
158	}
159	w.LastSent = time.Now()
160	w.LastSize = sz // just resource size - doesn't include header and types
161	w.NonceSent = resp.Nonce
162
163	adsLog.Infof("XDS: PUSH for node:%s listeners:%d", con.node.ID, len(cl))
164	return nil
165}
166