1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <inttypes.h>
22 #include <limits.h>
23 #include <string.h>
24 
25 #include "absl/container/inlined_vector.h"
26 #include "absl/strings/str_format.h"
27 #include "absl/strings/str_join.h"
28 #include "absl/strings/string_view.h"
29 
30 #include <grpc/byte_buffer_reader.h>
31 #include <grpc/grpc.h>
32 #include <grpc/support/alloc.h>
33 #include <grpc/support/time.h>
34 
35 #include "src/core/ext/filters/client_channel/client_channel.h"
36 #include "src/core/ext/filters/client_channel/service_config.h"
37 #include "src/core/ext/xds/xds_api.h"
38 #include "src/core/ext/xds/xds_channel_args.h"
39 #include "src/core/ext/xds/xds_client.h"
40 #include "src/core/ext/xds/xds_client_stats.h"
41 #include "src/core/lib/backoff/backoff.h"
42 #include "src/core/lib/channel/channel_args.h"
43 #include "src/core/lib/channel/channel_stack.h"
44 #include "src/core/lib/gpr/string.h"
45 #include "src/core/lib/gprpp/map.h"
46 #include "src/core/lib/gprpp/memory.h"
47 #include "src/core/lib/gprpp/orphanable.h"
48 #include "src/core/lib/gprpp/ref_counted_ptr.h"
49 #include "src/core/lib/gprpp/sync.h"
50 #include "src/core/lib/iomgr/sockaddr.h"
51 #include "src/core/lib/iomgr/sockaddr_utils.h"
52 #include "src/core/lib/iomgr/timer.h"
53 #include "src/core/lib/slice/slice_internal.h"
54 #include "src/core/lib/slice/slice_string_helpers.h"
55 #include "src/core/lib/surface/call.h"
56 #include "src/core/lib/surface/channel.h"
57 #include "src/core/lib/surface/channel_init.h"
58 #include "src/core/lib/transport/static_metadata.h"
59 
// Reconnect backoff parameters for the xds channel's calls. They are fed into
// the BackOff::Options built by RetryableCall's constructor (seconds values
// are converted to milliseconds there).
#define GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_XDS_RECONNECT_JITTER 0.2
// Minimum client load-reporting interval in milliseconds.
// NOTE(review): its use site is outside this section; presumably it clamps the
// interval requested by the LRS server — confirm.
#define GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS 1000
65 
66 namespace grpc_core {
67 
// Trace flag named "xds_client"; disabled by default. Guards the verbose
// GPR_INFO logging throughout this file.
TraceFlag grpc_xds_client_trace(false, "xds_client");

namespace {

// File-local global state.
// NOTE(review): presumably g_mu guards g_channel_args and g_xds_client; the
// code that reads/writes them (other than CreateXdsChannel reading
// g_channel_args) is outside this section — confirm.
Mutex* g_mu = nullptr;
// Base channel args used by CreateXdsChannel() when building xds channels.
const grpc_channel_args* g_channel_args = nullptr;
XdsClient* g_xds_client = nullptr;

}  // namespace
77 
78 //
79 // Internal class declarations
80 //
81 
// An xds call wrapper that can restart a call upon failure. Holds a ref to
// the xds channel. The template parameter is the kind of wrapped xds call.
//
// T must be constructible from a RefCountedPtr<RetryableCall<T>> (see
// StartNewCallLocked()) and must provide seen_response() (see
// OnCallFinishedLocked()). AdsCallState and LrsCallState are the two users.
template <typename T>
class XdsClient::ChannelState::RetryableCall
    : public InternallyRefCounted<RetryableCall<T>> {
 public:
  // Immediately starts the first call attempt.
  explicit RetryableCall(RefCountedPtr<ChannelState> chand);

  // Marks this wrapper as shutting down, drops the current call, and cancels
  // the retry timer if one is pending.
  void Orphan() override;

  // Called when the wrapped call ends. If the call had seen a response the
  // backoff is reset and a new call is started right away; otherwise a retry
  // is scheduled via the backoff timer.
  // NOTE(review): the "Locked" suffix suggests the caller holds the
  // XdsClient mutex — confirm at call sites.
  void OnCallFinishedLocked();

  // The current wrapped call, or nullptr while in retry backoff.
  T* calld() const { return calld_.get(); }
  ChannelState* chand() const { return chand_.get(); }

  bool IsCurrentCallOnChannel() const;

 private:
  void StartNewCallLocked();
  void StartRetryTimerLocked();
  // Timer callback; acquires the XdsClient mutex and calls
  // OnRetryTimerLocked().
  static void OnRetryTimer(void* arg, grpc_error* error);
  void OnRetryTimerLocked(grpc_error* error);

  // The wrapped xds call that talks to the xds server. It's instantiated
  // every time we start a new call. It's null during call retry backoff.
  OrphanablePtr<T> calld_;
  // The owning xds channel.
  RefCountedPtr<ChannelState> chand_;

  // Retry state.
  BackOff backoff_;
  grpc_timer retry_timer_;
  grpc_closure on_retry_timer_;
  // True from StartRetryTimerLocked() until OnRetryTimerLocked() runs; lets
  // Orphan() know whether there is a timer to cancel.
  bool retry_timer_callback_pending_ = false;

  // Set by Orphan(); suppresses starting new calls or retry timers.
  bool shutting_down_ = false;
};
119 
// Contains an ADS call to the xds server.
//
// Each instance is owned by a RetryableCall<AdsCallState>, which creates a
// fresh instance for every call attempt. Per-resource-type request state
// (nonce, error, subscribed resources) lives in state_map_ and is cleared
// when the call is orphaned.
class XdsClient::ChannelState::AdsCallState
    : public InternallyRefCounted<AdsCallState> {
 public:
  // The ctor and dtor should not be used directly.
  explicit AdsCallState(RefCountedPtr<RetryableCall<AdsCallState>> parent);
  ~AdsCallState() override;

  void Orphan() override;

  RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
  ChannelState* chand() const { return parent_->chand(); }
  XdsClient* xds_client() const { return chand()->xds_client(); }
  // Whether this call has received a response; the parent RetryableCall uses
  // this to choose between an immediate restart and a backoff retry.
  bool seen_response() const { return seen_response_; }

  void Subscribe(const std::string& type_url, const std::string& name);
  void Unsubscribe(const std::string& type_url, const std::string& name,
                   bool delay_unsubscription);

  bool HasSubscribedResources() const;

 private:
  // State for one subscribed resource. Start() arms a timer of
  // xds_client()->request_timeout_; if that timer fires while still pending,
  // every watcher of the resource receives a timeout error.
  class ResourceState : public InternallyRefCounted<ResourceState> {
   public:
    ResourceState(const std::string& type_url, const std::string& name,
                  bool sent_initial_request)
        : type_url_(type_url),
          name_(name),
          sent_initial_request_(sent_initial_request) {
      GRPC_CLOSURE_INIT(&timer_callback_, OnTimer, this,
                        grpc_schedule_on_exec_ctx);
    }

    // Cancels any pending timeout timer, then drops the initial ref.
    void Orphan() override {
      Finish();
      Unref(DEBUG_LOCATION, "Orphan");
    }

    // Arms the request-timeout timer. This is a no-op after the first call,
    // and also when the object was constructed with
    // sent_initial_request == true. The timer holds a "timer" ref on this
    // object plus the ads_calld ref; both are released in OnTimer().
    void Start(RefCountedPtr<AdsCallState> ads_calld) {
      if (sent_initial_request_) return;
      sent_initial_request_ = true;
      ads_calld_ = std::move(ads_calld);
      Ref(DEBUG_LOCATION, "timer").release();
      timer_pending_ = true;
      grpc_timer_init(
          &timer_,
          ExecCtx::Get()->Now() + ads_calld_->xds_client()->request_timeout_,
          &timer_callback_);
    }

    // Cancels the timeout timer if it is still pending. Once timer_pending_
    // is cleared, OnTimerLocked() will not notify any watchers.
    void Finish() {
      if (timer_pending_) {
        grpc_timer_cancel(&timer_);
        timer_pending_ = false;
      }
    }

   private:
    // Timer callback. Runs OnTimerLocked() under the XdsClient mutex, then
    // drops the ads_calld_ ref and the "timer" ref taken in Start().
    static void OnTimer(void* arg, grpc_error* error) {
      ResourceState* self = static_cast<ResourceState*>(arg);
      {
        MutexLock lock(&self->ads_calld_->xds_client()->mu_);
        self->OnTimerLocked(GRPC_ERROR_REF(error));
      }
      self->ads_calld_.reset();
      self->Unref(DEBUG_LOCATION, "timer");
    }

    // Takes ownership of `error`. Acts only when error is GRPC_ERROR_NONE
    // and the timer is still marked pending. In that case it delivers a
    // timeout error to every watcher of name_ in the XdsClient map selected
    // by type_url_.
    // NOTE(review): the map lookups use operator[], which inserts an empty
    // entry if name_ is absent from the map.
    void OnTimerLocked(grpc_error* error) {
      if (error == GRPC_ERROR_NONE && timer_pending_) {
        timer_pending_ = false;
        grpc_error* watcher_error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(
            absl::StrFormat(
                "timeout obtaining resource {type=%s name=%s} from xds server",
                type_url_, name_)
                .c_str());
        if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
          gpr_log(GPR_INFO, "[xds_client %p] %s", ads_calld_->xds_client(),
                  grpc_error_string(watcher_error));
        }
        if (type_url_ == XdsApi::kLdsTypeUrl) {
          ListenerState& state = ads_calld_->xds_client()->listener_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kRdsTypeUrl) {
          RouteConfigState& state =
              ads_calld_->xds_client()->route_config_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kCdsTypeUrl) {
          ClusterState& state = ads_calld_->xds_client()->cluster_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else if (type_url_ == XdsApi::kEdsTypeUrl) {
          EndpointState& state = ads_calld_->xds_client()->endpoint_map_[name_];
          for (const auto& p : state.watchers) {
            p.first->OnError(GRPC_ERROR_REF(watcher_error));
          }
        } else {
          GPR_UNREACHABLE_CODE(return );
        }
        GRPC_ERROR_UNREF(watcher_error);
      }
      GRPC_ERROR_UNREF(error);
    }

    const std::string type_url_;
    const std::string name_;

    // Set by Start() and held until the timer callback has run.
    RefCountedPtr<AdsCallState> ads_calld_;
    bool sent_initial_request_;
    bool timer_pending_ = false;
    grpc_timer timer_;
    grpc_closure timer_callback_;
  };

  // Per-resource-type state for this call. Owns one ref on `error`, which
  // the destructor releases.
  struct ResourceTypeState {
    ~ResourceTypeState() { GRPC_ERROR_UNREF(error); }

    // Nonce and error for this resource type.
    std::string nonce;
    grpc_error* error = GRPC_ERROR_NONE;

    // Subscribed resources of this type.
    std::map<std::string /* name */, OrphanablePtr<ResourceState>>
        subscribed_resources;
  };

  // Sends (or buffers, if a send is already in flight) a request for
  // type_url.
  void SendMessageLocked(const std::string& type_url);

  void AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map);
  void AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map);
  void AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map);
  void AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map);

  static void OnRequestSent(void* arg, grpc_error* error);
  void OnRequestSentLocked(grpc_error* error);
  static void OnResponseReceived(void* arg, grpc_error* error);
  bool OnResponseReceivedLocked();
  static void OnStatusReceived(void* arg, grpc_error* error);
  void OnStatusReceivedLocked(grpc_error* error);

  bool IsCurrentCallOnChannel() const;

  std::set<absl::string_view> ResourceNamesForRequest(
      const std::string& type_url);

  // The owning RetryableCall<>.
  RefCountedPtr<RetryableCall<AdsCallState>> parent_;

  // Whether any request has been sent yet; the first request is flagged as
  // such to XdsApi::CreateAdsRequest().
  bool sent_initial_message_ = false;
  bool seen_response_ = false;

  // Always non-NULL.
  grpc_call* call_;

  // recv_initial_metadata
  grpc_metadata_array initial_metadata_recv_;

  // send_message
  // Non-null while a send is in flight; further sends are buffered meanwhile.
  grpc_byte_buffer* send_message_payload_ = nullptr;
  grpc_closure on_request_sent_;

  // recv_message
  grpc_byte_buffer* recv_message_payload_ = nullptr;
  grpc_closure on_response_received_;

  // recv_trailing_metadata
  grpc_metadata_array trailing_metadata_recv_;
  grpc_status_code status_code_;
  grpc_slice status_details_;
  grpc_closure on_status_received_;

  // Resource types for which requests need to be sent.
  std::set<std::string /*type_url*/> buffered_requests_;

  // State for each resource type.
  std::map<std::string /*type_url*/, ResourceTypeState> state_map_;
};
302 
303 // Contains an LRS call to the xds server.
304 class XdsClient::ChannelState::LrsCallState
305     : public InternallyRefCounted<LrsCallState> {
306  public:
307   // The ctor and dtor should not be used directly.
308   explicit LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent);
309   ~LrsCallState() override;
310 
311   void Orphan() override;
312 
313   void MaybeStartReportingLocked();
314 
parent()315   RetryableCall<LrsCallState>* parent() { return parent_.get(); }
chand() const316   ChannelState* chand() const { return parent_->chand(); }
xds_client() const317   XdsClient* xds_client() const { return chand()->xds_client(); }
seen_response() const318   bool seen_response() const { return seen_response_; }
319 
320  private:
321   // Reports client-side load stats according to a fixed interval.
322   class Reporter : public InternallyRefCounted<Reporter> {
323    public:
Reporter(RefCountedPtr<LrsCallState> parent,grpc_millis report_interval)324     Reporter(RefCountedPtr<LrsCallState> parent, grpc_millis report_interval)
325         : parent_(std::move(parent)), report_interval_(report_interval) {
326       GRPC_CLOSURE_INIT(&on_next_report_timer_, OnNextReportTimer, this,
327                         grpc_schedule_on_exec_ctx);
328       GRPC_CLOSURE_INIT(&on_report_done_, OnReportDone, this,
329                         grpc_schedule_on_exec_ctx);
330       ScheduleNextReportLocked();
331     }
332 
333     void Orphan() override;
334 
335    private:
336     void ScheduleNextReportLocked();
337     static void OnNextReportTimer(void* arg, grpc_error* error);
338     bool OnNextReportTimerLocked(grpc_error* error);
339     bool SendReportLocked();
340     static void OnReportDone(void* arg, grpc_error* error);
341     bool OnReportDoneLocked(grpc_error* error);
342 
IsCurrentReporterOnCall() const343     bool IsCurrentReporterOnCall() const {
344       return this == parent_->reporter_.get();
345     }
xds_client() const346     XdsClient* xds_client() const { return parent_->xds_client(); }
347 
348     // The owning LRS call.
349     RefCountedPtr<LrsCallState> parent_;
350 
351     // The load reporting state.
352     const grpc_millis report_interval_;
353     bool last_report_counters_were_zero_ = false;
354     bool next_report_timer_callback_pending_ = false;
355     grpc_timer next_report_timer_;
356     grpc_closure on_next_report_timer_;
357     grpc_closure on_report_done_;
358   };
359 
360   static void OnInitialRequestSent(void* arg, grpc_error* error);
361   void OnInitialRequestSentLocked();
362   static void OnResponseReceived(void* arg, grpc_error* error);
363   bool OnResponseReceivedLocked();
364   static void OnStatusReceived(void* arg, grpc_error* error);
365   void OnStatusReceivedLocked(grpc_error* error);
366 
367   bool IsCurrentCallOnChannel() const;
368 
369   // The owning RetryableCall<>.
370   RefCountedPtr<RetryableCall<LrsCallState>> parent_;
371   bool seen_response_ = false;
372 
373   // Always non-NULL.
374   grpc_call* call_;
375 
376   // recv_initial_metadata
377   grpc_metadata_array initial_metadata_recv_;
378 
379   // send_message
380   grpc_byte_buffer* send_message_payload_ = nullptr;
381   grpc_closure on_initial_request_sent_;
382 
383   // recv_message
384   grpc_byte_buffer* recv_message_payload_ = nullptr;
385   grpc_closure on_response_received_;
386 
387   // recv_trailing_metadata
388   grpc_metadata_array trailing_metadata_recv_;
389   grpc_status_code status_code_;
390   grpc_slice status_details_;
391   grpc_closure on_status_received_;
392 
393   // Load reporting state.
394   bool send_all_clusters_ = false;
395   std::set<std::string> cluster_names_;  // Asked for by the LRS server.
396   grpc_millis load_reporting_interval_ = 0;
397   OrphanablePtr<Reporter> reporter_;
398 };
399 
400 //
401 // XdsClient::ChannelState::StateWatcher
402 //
403 
404 class XdsClient::ChannelState::StateWatcher
405     : public AsyncConnectivityStateWatcherInterface {
406  public:
StateWatcher(RefCountedPtr<ChannelState> parent)407   explicit StateWatcher(RefCountedPtr<ChannelState> parent)
408       : parent_(std::move(parent)) {}
409 
410  private:
OnConnectivityStateChange(grpc_connectivity_state new_state,const absl::Status & status)411   void OnConnectivityStateChange(grpc_connectivity_state new_state,
412                                  const absl::Status& status) override {
413     MutexLock lock(&parent_->xds_client_->mu_);
414     if (!parent_->shutting_down_ &&
415         new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
416       // In TRANSIENT_FAILURE.  Notify all watchers of error.
417       gpr_log(GPR_INFO,
418               "[xds_client %p] xds channel in state:TRANSIENT_FAILURE "
419               "status_message:(%s)",
420               parent_->xds_client(), status.ToString().c_str());
421       parent_->xds_client()->NotifyOnErrorLocked(
422           GRPC_ERROR_CREATE_FROM_STATIC_STRING(
423               "xds channel in TRANSIENT_FAILURE"));
424     }
425   }
426 
427   RefCountedPtr<ChannelState> parent_;
428 };
429 
430 //
431 // XdsClient::ChannelState
432 //
433 
434 namespace {
435 
CreateXdsChannel(const XdsBootstrap::XdsServer & server)436 grpc_channel* CreateXdsChannel(const XdsBootstrap::XdsServer& server) {
437   // Build channel args.
438   absl::InlinedVector<grpc_arg, 2> args_to_add = {
439       grpc_channel_arg_integer_create(
440           const_cast<char*>(GRPC_ARG_KEEPALIVE_TIME_MS),
441           5 * 60 * GPR_MS_PER_SEC),
442       grpc_channel_arg_integer_create(
443           const_cast<char*>(GRPC_ARG_CHANNELZ_IS_INTERNAL_CHANNEL), 1),
444   };
445   grpc_channel_args* new_args = grpc_channel_args_copy_and_add(
446       g_channel_args, args_to_add.data(), args_to_add.size());
447   // Create channel creds.
448   RefCountedPtr<grpc_channel_credentials> channel_creds =
449       XdsChannelCredsRegistry::MakeChannelCreds(server.channel_creds_type,
450                                                 server.channel_creds_config);
451   // Create channel.
452   grpc_channel* channel = grpc_secure_channel_create(
453       channel_creds.get(), server.server_uri.c_str(), new_args, nullptr);
454   grpc_channel_args_destroy(new_args);
455   return channel;
456 }
457 
458 }  // namespace
459 
ChannelState(WeakRefCountedPtr<XdsClient> xds_client,const XdsBootstrap::XdsServer & server)460 XdsClient::ChannelState::ChannelState(WeakRefCountedPtr<XdsClient> xds_client,
461                                       const XdsBootstrap::XdsServer& server)
462     : InternallyRefCounted<ChannelState>(
463           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "ChannelState"
464                                                          : nullptr),
465       xds_client_(std::move(xds_client)),
466       server_(server) {
467   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
468     gpr_log(GPR_INFO, "[xds_client %p] creating channel to %s",
469             xds_client_.get(), server.server_uri.c_str());
470   }
471   channel_ = CreateXdsChannel(server);
472   GPR_ASSERT(channel_ != nullptr);
473   StartConnectivityWatchLocked();
474 }
475 
// Destroys the underlying grpc_channel, then releases the XdsClient ref
// (initialized from the WeakRefCountedPtr passed to the constructor).
XdsClient::ChannelState::~ChannelState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO, "[xds_client %p] Destroying xds channel %p", xds_client(),
            this);
  }
  grpc_channel_destroy(channel_);
  // Dropped last, after the channel is gone.
  xds_client_.reset(DEBUG_LOCATION, "ChannelState");
}
484 
// Shuts the channel down. Marks it as shutting down (so the StateWatcher
// ignores further transitions), cancels the connectivity watch, and drops the
// ADS and LRS calls. The ref held since construction is released last,
// because it may be the final ref.
void XdsClient::ChannelState::Orphan() {
  shutting_down_ = true;
  CancelConnectivityWatchLocked();
  ads_calld_.reset();
  lrs_calld_.reset();
  Unref(DEBUG_LOCATION, "ChannelState+orphaned");
}
492 
ads_calld() const493 XdsClient::ChannelState::AdsCallState* XdsClient::ChannelState::ads_calld()
494     const {
495   return ads_calld_->calld();
496 }
497 
lrs_calld() const498 XdsClient::ChannelState::LrsCallState* XdsClient::ChannelState::lrs_calld()
499     const {
500   return lrs_calld_->calld();
501 }
502 
HasActiveAdsCall() const503 bool XdsClient::ChannelState::HasActiveAdsCall() const {
504   return ads_calld_->calld() != nullptr;
505 }
506 
MaybeStartLrsCall()507 void XdsClient::ChannelState::MaybeStartLrsCall() {
508   if (lrs_calld_ != nullptr) return;
509   lrs_calld_.reset(
510       new RetryableCall<LrsCallState>(Ref(DEBUG_LOCATION, "ChannelState+lrs")));
511 }
512 
StopLrsCall()513 void XdsClient::ChannelState::StopLrsCall() { lrs_calld_.reset(); }
514 
StartConnectivityWatchLocked()515 void XdsClient::ChannelState::StartConnectivityWatchLocked() {
516   grpc_channel_element* client_channel_elem =
517       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
518   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
519   watcher_ = new StateWatcher(Ref(DEBUG_LOCATION, "ChannelState+watch"));
520   grpc_client_channel_start_connectivity_watch(
521       client_channel_elem, GRPC_CHANNEL_IDLE,
522       OrphanablePtr<AsyncConnectivityStateWatcherInterface>(watcher_));
523 }
524 
CancelConnectivityWatchLocked()525 void XdsClient::ChannelState::CancelConnectivityWatchLocked() {
526   grpc_channel_element* client_channel_elem =
527       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
528   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
529   grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
530 }
531 
Subscribe(const std::string & type_url,const std::string & name)532 void XdsClient::ChannelState::Subscribe(const std::string& type_url,
533                                         const std::string& name) {
534   if (ads_calld_ == nullptr) {
535     // Start the ADS call if this is the first request.
536     ads_calld_.reset(new RetryableCall<AdsCallState>(
537         Ref(DEBUG_LOCATION, "ChannelState+ads")));
538     // Note: AdsCallState's ctor will automatically subscribe to all
539     // resources that the XdsClient already has watchers for, so we can
540     // return here.
541     return;
542   }
543   // If the ADS call is in backoff state, we don't need to do anything now
544   // because when the call is restarted it will resend all necessary requests.
545   if (ads_calld() == nullptr) return;
546   // Subscribe to this resource if the ADS call is active.
547   ads_calld()->Subscribe(type_url, name);
548 }
549 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)550 void XdsClient::ChannelState::Unsubscribe(const std::string& type_url,
551                                           const std::string& name,
552                                           bool delay_unsubscription) {
553   if (ads_calld_ != nullptr) {
554     auto* calld = ads_calld_->calld();
555     if (calld != nullptr) {
556       calld->Unsubscribe(type_url, name, delay_unsubscription);
557       if (!calld->HasSubscribedResources()) ads_calld_.reset();
558     }
559   }
560 }
561 
562 //
563 // XdsClient::ChannelState::RetryableCall<>
564 //
565 
566 template <typename T>
RetryableCall(RefCountedPtr<ChannelState> chand)567 XdsClient::ChannelState::RetryableCall<T>::RetryableCall(
568     RefCountedPtr<ChannelState> chand)
569     : chand_(std::move(chand)),
570       backoff_(
571           BackOff::Options()
572               .set_initial_backoff(GRPC_XDS_INITIAL_CONNECT_BACKOFF_SECONDS *
573                                    1000)
574               .set_multiplier(GRPC_XDS_RECONNECT_BACKOFF_MULTIPLIER)
575               .set_jitter(GRPC_XDS_RECONNECT_JITTER)
576               .set_max_backoff(GRPC_XDS_RECONNECT_MAX_BACKOFF_SECONDS * 1000)) {
577   // Closure Initialization
578   GRPC_CLOSURE_INIT(&on_retry_timer_, OnRetryTimer, this,
579                     grpc_schedule_on_exec_ctx);
580   StartNewCallLocked();
581 }
582 
// Shuts the wrapper down. Setting shutting_down_ first prevents new calls or
// retry timers from being started. The current call is then dropped and any
// pending retry timer is cancelled. The ref released last may be the final
// one, so it must come after everything else.
template <typename T>
void XdsClient::ChannelState::RetryableCall<T>::Orphan() {
  shutting_down_ = true;
  calld_.reset();
  if (retry_timer_callback_pending_) grpc_timer_cancel(&retry_timer_);
  this->Unref(DEBUG_LOCATION, "RetryableCall+orphaned");
}
590 
591 template <typename T>
OnCallFinishedLocked()592 void XdsClient::ChannelState::RetryableCall<T>::OnCallFinishedLocked() {
593   const bool seen_response = calld_->seen_response();
594   calld_.reset();
595   if (seen_response) {
596     // If we lost connection to the xds server, reset backoff and restart the
597     // call immediately.
598     backoff_.Reset();
599     StartNewCallLocked();
600   } else {
601     // If we failed to connect to the xds server, retry later.
602     StartRetryTimerLocked();
603   }
604 }
605 
606 template <typename T>
StartNewCallLocked()607 void XdsClient::ChannelState::RetryableCall<T>::StartNewCallLocked() {
608   if (shutting_down_) return;
609   GPR_ASSERT(chand_->channel_ != nullptr);
610   GPR_ASSERT(calld_ == nullptr);
611   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
612     gpr_log(GPR_INFO,
613             "[xds_client %p] Start new call from retryable call (chand: %p, "
614             "retryable call: %p)",
615             chand()->xds_client(), chand(), this);
616   }
617   calld_ = MakeOrphanable<T>(
618       this->Ref(DEBUG_LOCATION, "RetryableCall+start_new_call"));
619 }
620 
621 template <typename T>
StartRetryTimerLocked()622 void XdsClient::ChannelState::RetryableCall<T>::StartRetryTimerLocked() {
623   if (shutting_down_) return;
624   const grpc_millis next_attempt_time = backoff_.NextAttemptTime();
625   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
626     grpc_millis timeout = GPR_MAX(next_attempt_time - ExecCtx::Get()->Now(), 0);
627     gpr_log(GPR_INFO,
628             "[xds_client %p] Failed to connect to xds server (chand: %p) "
629             "retry timer will fire in %" PRId64 "ms.",
630             chand()->xds_client(), chand(), timeout);
631   }
632   this->Ref(DEBUG_LOCATION, "RetryableCall+retry_timer_start").release();
633   grpc_timer_init(&retry_timer_, next_attempt_time, &on_retry_timer_);
634   retry_timer_callback_pending_ = true;
635 }
636 
637 template <typename T>
OnRetryTimer(void * arg,grpc_error * error)638 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer(
639     void* arg, grpc_error* error) {
640   RetryableCall* calld = static_cast<RetryableCall*>(arg);
641   {
642     MutexLock lock(&calld->chand_->xds_client()->mu_);
643     calld->OnRetryTimerLocked(GRPC_ERROR_REF(error));
644   }
645   calld->Unref(DEBUG_LOCATION, "RetryableCall+retry_timer_done");
646 }
647 
648 template <typename T>
OnRetryTimerLocked(grpc_error * error)649 void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimerLocked(
650     grpc_error* error) {
651   retry_timer_callback_pending_ = false;
652   if (!shutting_down_ && error == GRPC_ERROR_NONE) {
653     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
654       gpr_log(
655           GPR_INFO,
656           "[xds_client %p] Retry timer fires (chand: %p, retryable call: %p)",
657           chand()->xds_client(), chand(), this);
658     }
659     StartNewCallLocked();
660   }
661   GRPC_ERROR_UNREF(error);
662 }
663 
664 //
665 // XdsClient::ChannelState::AdsCallState
666 //
667 
// Starts a new ADS call on the parent channel. It creates the grpc_call,
// sends initial metadata (wait-for-ready), subscribes to every resource
// currently present in the XdsClient's listener / route-config / cluster /
// endpoint maps, then starts the receive-message and receive-status batches.
// The initial ref is consumed by on_status_received_ (see below).
XdsClient::ChannelState::AdsCallState::AdsCallState(
    RefCountedPtr<RetryableCall<AdsCallState>> parent)
    : InternallyRefCounted<AdsCallState>(
          GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "AdsCallState"
                                                         : nullptr),
      parent_(std::move(parent)) {
  // Init the ADS call. Note that the call will progress every time there's
  // activity in xds_client()->interested_parties_, which consists of the
  // polling entities from client_channel.
  GPR_ASSERT(xds_client() != nullptr);
  // Create a call with the specified method name: the v3 or v2
  // StreamAggregatedResources method, depending on server_.ShouldUseV3().
  const auto& method =
      chand()->server_.ShouldUseV3()
          ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V3_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES
          : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_DISCOVERY_DOT_V2_DOT_AGGREGATEDDISCOVERYSERVICE_SLASH_STREAMAGGREGATEDRESOURCES;
  call_ = grpc_channel_create_pollset_set_call(
      chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
      xds_client()->interested_parties_, method, nullptr,
      GRPC_MILLIS_INF_FUTURE, nullptr);
  GPR_ASSERT(call_ != nullptr);
  // Init data associated with the call.
  grpc_metadata_array_init(&initial_metadata_recv_);
  grpc_metadata_array_init(&trailing_metadata_recv_);
  // Start the call.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
    gpr_log(GPR_INFO,
            "[xds_client %p] Starting ADS call (chand: %p, calld: %p, "
            "call: %p)",
            xds_client(), chand(), this, call_);
  }
  // Create the ops.
  grpc_call_error call_error;
  grpc_op ops[3];
  memset(ops, 0, sizeof(ops));
  // Op: send initial metadata.
  grpc_op* op = ops;
  op->op = GRPC_OP_SEND_INITIAL_METADATA;
  op->data.send_initial_metadata.count = 0;
  op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
              GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
  op->reserved = nullptr;
  op++;
  // No completion closure: nothing needs to happen when this batch finishes.
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 nullptr);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: send request message.
  // on_request_sent_ is initialized before the Subscribe() calls below.
  // NOTE(review): presumably those calls send requests that complete via
  // on_request_sent_ — confirm in Subscribe()/SendMessageLocked().
  GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
                    grpc_schedule_on_exec_ctx);
  for (const auto& p : xds_client()->listener_map_) {
    Subscribe(XdsApi::kLdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->route_config_map_) {
    Subscribe(XdsApi::kRdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->cluster_map_) {
    Subscribe(XdsApi::kCdsTypeUrl, std::string(p.first));
  }
  for (const auto& p : xds_client()->endpoint_map_) {
    Subscribe(XdsApi::kEdsTypeUrl, std::string(p.first));
  }
  // Op: recv initial metadata.
  op = ops;
  op->op = GRPC_OP_RECV_INITIAL_METADATA;
  op->data.recv_initial_metadata.recv_initial_metadata =
      &initial_metadata_recv_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // Op: recv response.
  op->op = GRPC_OP_RECV_MESSAGE;
  op->data.recv_message.recv_message = &recv_message_payload_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This ref is owned by the pending on_response_received_ callback.
  Ref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked").release();
  GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_response_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
  // Op: recv server status.
  op = ops;
  op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
  op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
  op->data.recv_status_on_client.status = &status_code_;
  op->data.recv_status_on_client.status_details = &status_details_;
  op->flags = 0;
  op->reserved = nullptr;
  op++;
  // This callback signals the end of the call, so it relies on the initial
  // ref instead of a new ref. When it's invoked, it's the initial ref that is
  // unreffed.
  GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
                    grpc_schedule_on_exec_ctx);
  call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
                                                 &on_status_received_);
  GPR_ASSERT(GRPC_CALL_OK == call_error);
}
766 
// Releases everything the call owns: both metadata arrays, any send/receive
// payload buffers still held, the status-details slice, and finally the
// grpc_call reference itself.
XdsClient::ChannelState::AdsCallState::~AdsCallState() {
  grpc_metadata_array_destroy(&initial_metadata_recv_);
  grpc_metadata_array_destroy(&trailing_metadata_recv_);
  grpc_byte_buffer_destroy(send_message_payload_);
  grpc_byte_buffer_destroy(recv_message_payload_);
  grpc_slice_unref_internal(status_details_);
  GPR_ASSERT(call_ != nullptr);
  grpc_call_unref(call_);
}
776 
// Cancels the underlying call and discards all per-resource-type state
// (including each ResourceState, whose pending timeout timers are cancelled
// when they are orphaned).
void XdsClient::ChannelState::AdsCallState::Orphan() {
  GPR_ASSERT(call_ != nullptr);
  // If we are here because xds_client wants to cancel the call,
  // on_status_received_ will complete the cancellation and clean up. Otherwise,
  // we are here because xds_client has to orphan a failed call, then the
  // following cancellation will be a no-op.
  grpc_call_cancel_internal(call_);
  state_map_.clear();
  // Note that the initial ref is held by on_status_received_. So the
  // corresponding unref happens in on_status_received_ instead of here.
}
788 
SendMessageLocked(const std::string & type_url)789 void XdsClient::ChannelState::AdsCallState::SendMessageLocked(
790     const std::string& type_url) {
791   // Buffer message sending if an existing message is in flight.
792   if (send_message_payload_ != nullptr) {
793     buffered_requests_.insert(type_url);
794     return;
795   }
796   auto& state = state_map_[type_url];
797   grpc_slice request_payload_slice;
798   std::set<absl::string_view> resource_names =
799       ResourceNamesForRequest(type_url);
800   request_payload_slice = xds_client()->api_.CreateAdsRequest(
801       chand()->server_, type_url, resource_names,
802       xds_client()->resource_version_map_[type_url], state.nonce,
803       GRPC_ERROR_REF(state.error), !sent_initial_message_);
804   if (type_url != XdsApi::kLdsTypeUrl && type_url != XdsApi::kRdsTypeUrl &&
805       type_url != XdsApi::kCdsTypeUrl && type_url != XdsApi::kEdsTypeUrl) {
806     state_map_.erase(type_url);
807   }
808   sent_initial_message_ = true;
809   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
810     gpr_log(GPR_INFO,
811             "[xds_client %p] sending ADS request: type=%s version=%s nonce=%s "
812             "error=%s resources=%s",
813             xds_client(), type_url.c_str(),
814             xds_client()->resource_version_map_[type_url].c_str(),
815             state.nonce.c_str(), grpc_error_string(state.error),
816             absl::StrJoin(resource_names, " ").c_str());
817   }
818   GRPC_ERROR_UNREF(state.error);
819   state.error = GRPC_ERROR_NONE;
820   // Create message payload.
821   send_message_payload_ =
822       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
823   grpc_slice_unref_internal(request_payload_slice);
824   // Send the message.
825   grpc_op op;
826   memset(&op, 0, sizeof(op));
827   op.op = GRPC_OP_SEND_MESSAGE;
828   op.data.send_message.send_message = send_message_payload_;
829   Ref(DEBUG_LOCATION, "ADS+OnRequestSentLocked").release();
830   GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this,
831                     grpc_schedule_on_exec_ctx);
832   grpc_call_error call_error =
833       grpc_call_start_batch_and_execute(call_, &op, 1, &on_request_sent_);
834   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
835     gpr_log(GPR_ERROR,
836             "[xds_client %p] calld=%p call_error=%d sending ADS message",
837             xds_client(), this, call_error);
838     GPR_ASSERT(GRPC_CALL_OK == call_error);
839   }
840 }
841 
Subscribe(const std::string & type_url,const std::string & name)842 void XdsClient::ChannelState::AdsCallState::Subscribe(
843     const std::string& type_url, const std::string& name) {
844   auto& state = state_map_[type_url].subscribed_resources[name];
845   if (state == nullptr) {
846     state = MakeOrphanable<ResourceState>(
847         type_url, name, !xds_client()->resource_version_map_[type_url].empty());
848     SendMessageLocked(type_url);
849   }
850 }
851 
Unsubscribe(const std::string & type_url,const std::string & name,bool delay_unsubscription)852 void XdsClient::ChannelState::AdsCallState::Unsubscribe(
853     const std::string& type_url, const std::string& name,
854     bool delay_unsubscription) {
855   state_map_[type_url].subscribed_resources.erase(name);
856   if (!delay_unsubscription) SendMessageLocked(type_url);
857 }
858 
HasSubscribedResources() const859 bool XdsClient::ChannelState::AdsCallState::HasSubscribedResources() const {
860   for (const auto& p : state_map_) {
861     if (!p.second.subscribed_resources.empty()) return true;
862   }
863   return false;
864 }
865 
AcceptLdsUpdate(XdsApi::LdsUpdateMap lds_update_map)866 void XdsClient::ChannelState::AdsCallState::AcceptLdsUpdate(
867     XdsApi::LdsUpdateMap lds_update_map) {
868   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
869     gpr_log(GPR_INFO,
870             "[xds_client %p] LDS update received containing %" PRIuPTR
871             " resources",
872             xds_client(), lds_update_map.size());
873   }
874   auto& lds_state = state_map_[XdsApi::kLdsTypeUrl];
875   std::set<std::string> rds_resource_names_seen;
876   for (auto& p : lds_update_map) {
877     const std::string& listener_name = p.first;
878     XdsApi::LdsUpdate& lds_update = p.second;
879     auto& state = lds_state.subscribed_resources[listener_name];
880     if (state != nullptr) state->Finish();
881     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
882       gpr_log(GPR_INFO, "[xds_client %p] LDS resource %s: route_config_name=%s",
883               xds_client(), listener_name.c_str(),
884               (!lds_update.route_config_name.empty()
885                    ? lds_update.route_config_name.c_str()
886                    : "<inlined>"));
887       if (lds_update.rds_update.has_value()) {
888         gpr_log(GPR_INFO, "RouteConfiguration: %s",
889                 lds_update.rds_update->ToString().c_str());
890       }
891     }
892     // Record the RDS resource names seen.
893     if (!lds_update.route_config_name.empty()) {
894       rds_resource_names_seen.insert(lds_update.route_config_name);
895     }
896     // Ignore identical update.
897     ListenerState& listener_state = xds_client()->listener_map_[listener_name];
898     if (listener_state.update.has_value() &&
899         *listener_state.update == lds_update) {
900       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
901         gpr_log(GPR_INFO,
902                 "[xds_client %p] LDS update for %s identical to current, "
903                 "ignoring.",
904                 xds_client(), listener_name.c_str());
905       }
906       continue;
907     }
908     // Update the listener state.
909     listener_state.update = std::move(lds_update);
910     // Notify watchers.
911     for (const auto& p : listener_state.watchers) {
912       p.first->OnListenerChanged(*listener_state.update);
913     }
914   }
915   // For any subscribed resource that is not present in the update,
916   // remove it from the cache and notify watchers that it does not exist.
917   for (const auto& p : lds_state.subscribed_resources) {
918     const std::string& listener_name = p.first;
919     if (lds_update_map.find(listener_name) == lds_update_map.end()) {
920       ListenerState& listener_state =
921           xds_client()->listener_map_[listener_name];
922       // If the resource was newly requested but has not yet been received,
923       // we don't want to generate an error for the watchers, because this LDS
924       // response may be in reaction to an earlier request that did not yet
925       // request the new resource, so its absence from the response does not
926       // necessarily indicate that the resource does not exist.
927       // For that case, we rely on the request timeout instead.
928       if (!listener_state.update.has_value()) continue;
929       listener_state.update.reset();
930       for (const auto& p : listener_state.watchers) {
931         p.first->OnResourceDoesNotExist();
932       }
933     }
934   }
935   // For any RDS resource that is no longer referred to by any LDS
936   // resources, remove it from the cache and notify watchers that it
937   // does not exist.
938   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
939   for (const auto& p : rds_state.subscribed_resources) {
940     const std::string& rds_resource_name = p.first;
941     if (rds_resource_names_seen.find(rds_resource_name) ==
942         rds_resource_names_seen.end()) {
943       RouteConfigState& route_config_state =
944           xds_client()->route_config_map_[rds_resource_name];
945       route_config_state.update.reset();
946       for (const auto& p : route_config_state.watchers) {
947         p.first->OnResourceDoesNotExist();
948       }
949     }
950   }
951 }
952 
AcceptRdsUpdate(XdsApi::RdsUpdateMap rds_update_map)953 void XdsClient::ChannelState::AdsCallState::AcceptRdsUpdate(
954     XdsApi::RdsUpdateMap rds_update_map) {
955   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
956     gpr_log(GPR_INFO,
957             "[xds_client %p] RDS update received containing %" PRIuPTR
958             " resources",
959             xds_client(), rds_update_map.size());
960   }
961   auto& rds_state = state_map_[XdsApi::kRdsTypeUrl];
962   for (auto& p : rds_update_map) {
963     const std::string& route_config_name = p.first;
964     XdsApi::RdsUpdate& rds_update = p.second;
965     auto& state = rds_state.subscribed_resources[route_config_name];
966     if (state != nullptr) state->Finish();
967     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
968       gpr_log(GPR_INFO, "[xds_client %p] RDS resource:\n%s", xds_client(),
969               rds_update.ToString().c_str());
970     }
971     RouteConfigState& route_config_state =
972         xds_client()->route_config_map_[route_config_name];
973     // Ignore identical update.
974     if (route_config_state.update.has_value() &&
975         *route_config_state.update == rds_update) {
976       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
977         gpr_log(GPR_INFO,
978                 "[xds_client %p] RDS resource identical to current, ignoring",
979                 xds_client());
980       }
981       continue;
982     }
983     // Update the cache.
984     route_config_state.update = std::move(rds_update);
985     // Notify all watchers.
986     for (const auto& p : route_config_state.watchers) {
987       p.first->OnRouteConfigChanged(*route_config_state.update);
988     }
989   }
990 }
991 
AcceptCdsUpdate(XdsApi::CdsUpdateMap cds_update_map)992 void XdsClient::ChannelState::AdsCallState::AcceptCdsUpdate(
993     XdsApi::CdsUpdateMap cds_update_map) {
994   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
995     gpr_log(GPR_INFO,
996             "[xds_client %p] CDS update received containing %" PRIuPTR
997             " resources",
998             xds_client(), cds_update_map.size());
999   }
1000   auto& cds_state = state_map_[XdsApi::kCdsTypeUrl];
1001   std::set<std::string> eds_resource_names_seen;
1002   for (auto& p : cds_update_map) {
1003     const char* cluster_name = p.first.c_str();
1004     XdsApi::CdsUpdate& cds_update = p.second;
1005     auto& state = cds_state.subscribed_resources[cluster_name];
1006     if (state != nullptr) state->Finish();
1007     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1008       gpr_log(GPR_INFO,
1009               "[xds_client %p] cluster=%s: eds_service_name=%s, "
1010               "lrs_load_reporting_server_name=%s",
1011               xds_client(), cluster_name, cds_update.eds_service_name.c_str(),
1012               cds_update.lrs_load_reporting_server_name.has_value()
1013                   ? cds_update.lrs_load_reporting_server_name.value().c_str()
1014                   : "(N/A)");
1015     }
1016     // Record the EDS resource names seen.
1017     eds_resource_names_seen.insert(cds_update.eds_service_name.empty()
1018                                        ? cluster_name
1019                                        : cds_update.eds_service_name);
1020     // Ignore identical update.
1021     ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1022     if (cluster_state.update.has_value() &&
1023         *cluster_state.update == cds_update) {
1024       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1025         gpr_log(GPR_INFO,
1026                 "[xds_client %p] CDS update identical to current, ignoring.",
1027                 xds_client());
1028       }
1029       continue;
1030     }
1031     // Update the cluster state.
1032     cluster_state.update = std::move(cds_update);
1033     // Notify all watchers.
1034     for (const auto& p : cluster_state.watchers) {
1035       p.first->OnClusterChanged(cluster_state.update.value());
1036     }
1037   }
1038   // For any subscribed resource that is not present in the update,
1039   // remove it from the cache and notify watchers that it does not exist.
1040   for (const auto& p : cds_state.subscribed_resources) {
1041     const std::string& cluster_name = p.first;
1042     if (cds_update_map.find(cluster_name) == cds_update_map.end()) {
1043       ClusterState& cluster_state = xds_client()->cluster_map_[cluster_name];
1044       // If the resource was newly requested but has not yet been received,
1045       // we don't want to generate an error for the watchers, because this CDS
1046       // response may be in reaction to an earlier request that did not yet
1047       // request the new resource, so its absence from the response does not
1048       // necessarily indicate that the resource does not exist.
1049       // For that case, we rely on the request timeout instead.
1050       if (!cluster_state.update.has_value()) continue;
1051       cluster_state.update.reset();
1052       for (const auto& p : cluster_state.watchers) {
1053         p.first->OnResourceDoesNotExist();
1054       }
1055     }
1056   }
1057   // For any EDS resource that is no longer referred to by any CDS
1058   // resources, remove it from the cache and notify watchers that it
1059   // does not exist.
1060   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1061   for (const auto& p : eds_state.subscribed_resources) {
1062     const std::string& eds_resource_name = p.first;
1063     if (eds_resource_names_seen.find(eds_resource_name) ==
1064         eds_resource_names_seen.end()) {
1065       EndpointState& endpoint_state =
1066           xds_client()->endpoint_map_[eds_resource_name];
1067       endpoint_state.update.reset();
1068       for (const auto& p : endpoint_state.watchers) {
1069         p.first->OnResourceDoesNotExist();
1070       }
1071     }
1072   }
1073 }
1074 
AcceptEdsUpdate(XdsApi::EdsUpdateMap eds_update_map)1075 void XdsClient::ChannelState::AdsCallState::AcceptEdsUpdate(
1076     XdsApi::EdsUpdateMap eds_update_map) {
1077   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1078     gpr_log(GPR_INFO,
1079             "[xds_client %p] EDS update received containing %" PRIuPTR
1080             " resources",
1081             xds_client(), eds_update_map.size());
1082   }
1083   auto& eds_state = state_map_[XdsApi::kEdsTypeUrl];
1084   for (auto& p : eds_update_map) {
1085     const char* eds_service_name = p.first.c_str();
1086     XdsApi::EdsUpdate& eds_update = p.second;
1087     auto& state = eds_state.subscribed_resources[eds_service_name];
1088     if (state != nullptr) state->Finish();
1089     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1090       gpr_log(GPR_INFO, "[xds_client %p] EDS resource %s: %s", xds_client(),
1091               eds_service_name, eds_update.ToString().c_str());
1092     }
1093     EndpointState& endpoint_state =
1094         xds_client()->endpoint_map_[eds_service_name];
1095     // Ignore identical update.
1096     if (endpoint_state.update.has_value() &&
1097         *endpoint_state.update == eds_update) {
1098       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1099         gpr_log(GPR_INFO,
1100                 "[xds_client %p] EDS update identical to current, ignoring.",
1101                 xds_client());
1102       }
1103       continue;
1104     }
1105     // Update the cluster state.
1106     endpoint_state.update = std::move(eds_update);
1107     // Notify all watchers.
1108     for (const auto& p : endpoint_state.watchers) {
1109       p.first->OnEndpointChanged(endpoint_state.update.value());
1110     }
1111   }
1112 }
1113 
OnRequestSent(void * arg,grpc_error * error)1114 void XdsClient::ChannelState::AdsCallState::OnRequestSent(void* arg,
1115                                                           grpc_error* error) {
1116   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1117   {
1118     MutexLock lock(&ads_calld->xds_client()->mu_);
1119     ads_calld->OnRequestSentLocked(GRPC_ERROR_REF(error));
1120   }
1121   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnRequestSentLocked");
1122 }
1123 
OnRequestSentLocked(grpc_error * error)1124 void XdsClient::ChannelState::AdsCallState::OnRequestSentLocked(
1125     grpc_error* error) {
1126   if (IsCurrentCallOnChannel() && error == GRPC_ERROR_NONE) {
1127     // Clean up the sent message.
1128     grpc_byte_buffer_destroy(send_message_payload_);
1129     send_message_payload_ = nullptr;
1130     // Continue to send another pending message if any.
1131     // TODO(roth): The current code to handle buffered messages has the
1132     // advantage of sending only the most recent list of resource names for
1133     // each resource type (no matter how many times that resource type has
1134     // been requested to send while the current message sending is still
1135     // pending). But its disadvantage is that we send the requests in fixed
1136     // order of resource types. We need to fix this if we are seeing some
1137     // resource type(s) starved due to frequent requests of other resource
1138     // type(s).
1139     auto it = buffered_requests_.begin();
1140     if (it != buffered_requests_.end()) {
1141       SendMessageLocked(*it);
1142       buffered_requests_.erase(it);
1143     }
1144   }
1145   GRPC_ERROR_UNREF(error);
1146 }
1147 
OnResponseReceived(void * arg,grpc_error *)1148 void XdsClient::ChannelState::AdsCallState::OnResponseReceived(
1149     void* arg, grpc_error* /* error */) {
1150   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1151   bool done;
1152   {
1153     MutexLock lock(&ads_calld->xds_client()->mu_);
1154     done = ads_calld->OnResponseReceivedLocked();
1155   }
1156   if (done) ads_calld->Unref(DEBUG_LOCATION, "ADS+OnResponseReceivedLocked");
1157 }
1158 
OnResponseReceivedLocked()1159 bool XdsClient::ChannelState::AdsCallState::OnResponseReceivedLocked() {
1160   // Empty payload means the call was cancelled.
1161   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1162     return true;
1163   }
1164   // Read the response.
1165   grpc_byte_buffer_reader bbr;
1166   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1167   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1168   grpc_byte_buffer_reader_destroy(&bbr);
1169   grpc_byte_buffer_destroy(recv_message_payload_);
1170   recv_message_payload_ = nullptr;
1171   // Parse and validate the response.
1172   XdsApi::AdsParseResult result = xds_client()->api_.ParseAdsResponse(
1173       response_slice, ResourceNamesForRequest(XdsApi::kLdsTypeUrl),
1174       ResourceNamesForRequest(XdsApi::kRdsTypeUrl),
1175       ResourceNamesForRequest(XdsApi::kCdsTypeUrl),
1176       ResourceNamesForRequest(XdsApi::kEdsTypeUrl));
1177   grpc_slice_unref_internal(response_slice);
1178   if (result.type_url.empty()) {
1179     // Ignore unparsable response.
1180     gpr_log(GPR_ERROR,
1181             "[xds_client %p] Error parsing ADS response (%s) -- ignoring",
1182             xds_client(), grpc_error_string(result.parse_error));
1183     GRPC_ERROR_UNREF(result.parse_error);
1184   } else {
1185     // Update nonce.
1186     auto& state = state_map_[result.type_url];
1187     state.nonce = std::move(result.nonce);
1188     // NACK or ACK the response.
1189     if (result.parse_error != GRPC_ERROR_NONE) {
1190       GRPC_ERROR_UNREF(state.error);
1191       state.error = result.parse_error;
1192       // NACK unacceptable update.
1193       gpr_log(GPR_ERROR,
1194               "[xds_client %p] ADS response invalid for resource type %s "
1195               "version %s, will NACK: nonce=%s error=%s",
1196               xds_client(), result.type_url.c_str(), result.version.c_str(),
1197               state.nonce.c_str(), grpc_error_string(result.parse_error));
1198       SendMessageLocked(result.type_url);
1199     } else {
1200       seen_response_ = true;
1201       // Accept the ADS response according to the type_url.
1202       if (result.type_url == XdsApi::kLdsTypeUrl) {
1203         AcceptLdsUpdate(std::move(result.lds_update_map));
1204       } else if (result.type_url == XdsApi::kRdsTypeUrl) {
1205         AcceptRdsUpdate(std::move(result.rds_update_map));
1206       } else if (result.type_url == XdsApi::kCdsTypeUrl) {
1207         AcceptCdsUpdate(std::move(result.cds_update_map));
1208       } else if (result.type_url == XdsApi::kEdsTypeUrl) {
1209         AcceptEdsUpdate(std::move(result.eds_update_map));
1210       }
1211       xds_client()->resource_version_map_[result.type_url] =
1212           std::move(result.version);
1213       // ACK the update.
1214       SendMessageLocked(result.type_url);
1215       // Start load reporting if needed.
1216       auto& lrs_call = chand()->lrs_calld_;
1217       if (lrs_call != nullptr) {
1218         LrsCallState* lrs_calld = lrs_call->calld();
1219         if (lrs_calld != nullptr) lrs_calld->MaybeStartReportingLocked();
1220       }
1221     }
1222   }
1223   if (xds_client()->shutting_down_) return true;
1224   // Keep listening for updates.
1225   grpc_op op;
1226   memset(&op, 0, sizeof(op));
1227   op.op = GRPC_OP_RECV_MESSAGE;
1228   op.data.recv_message.recv_message = &recv_message_payload_;
1229   op.flags = 0;
1230   op.reserved = nullptr;
1231   GPR_ASSERT(call_ != nullptr);
1232   // Reuse the "ADS+OnResponseReceivedLocked" ref taken in ctor.
1233   const grpc_call_error call_error =
1234       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1235   GPR_ASSERT(GRPC_CALL_OK == call_error);
1236   return false;
1237 }
1238 
OnStatusReceived(void * arg,grpc_error * error)1239 void XdsClient::ChannelState::AdsCallState::OnStatusReceived(
1240     void* arg, grpc_error* error) {
1241   AdsCallState* ads_calld = static_cast<AdsCallState*>(arg);
1242   {
1243     MutexLock lock(&ads_calld->xds_client()->mu_);
1244     ads_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1245   }
1246   ads_calld->Unref(DEBUG_LOCATION, "ADS+OnStatusReceivedLocked");
1247 }
1248 
OnStatusReceivedLocked(grpc_error * error)1249 void XdsClient::ChannelState::AdsCallState::OnStatusReceivedLocked(
1250     grpc_error* error) {
1251   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1252     char* status_details = grpc_slice_to_c_string(status_details_);
1253     gpr_log(GPR_INFO,
1254             "[xds_client %p] ADS call status received. Status = %d, details "
1255             "= '%s', (chand: %p, ads_calld: %p, call: %p), error '%s'",
1256             xds_client(), status_code_, status_details, chand(), this, call_,
1257             grpc_error_string(error));
1258     gpr_free(status_details);
1259   }
1260   // Ignore status from a stale call.
1261   if (IsCurrentCallOnChannel()) {
1262     // Try to restart the call.
1263     parent_->OnCallFinishedLocked();
1264     // Send error to all watchers.
1265     xds_client()->NotifyOnErrorLocked(
1266         GRPC_ERROR_CREATE_FROM_STATIC_STRING("xds call failed"));
1267   }
1268   GRPC_ERROR_UNREF(error);
1269 }
1270 
IsCurrentCallOnChannel() const1271 bool XdsClient::ChannelState::AdsCallState::IsCurrentCallOnChannel() const {
1272   // If the retryable ADS call is null (which only happens when the xds channel
1273   // is shutting down), all the ADS calls are stale.
1274   if (chand()->ads_calld_ == nullptr) return false;
1275   return this == chand()->ads_calld_->calld();
1276 }
1277 
1278 std::set<absl::string_view>
ResourceNamesForRequest(const std::string & type_url)1279 XdsClient::ChannelState::AdsCallState::ResourceNamesForRequest(
1280     const std::string& type_url) {
1281   std::set<absl::string_view> resource_names;
1282   auto it = state_map_.find(type_url);
1283   if (it != state_map_.end()) {
1284     for (auto& p : it->second.subscribed_resources) {
1285       resource_names.insert(p.first);
1286       OrphanablePtr<ResourceState>& state = p.second;
1287       state->Start(Ref(DEBUG_LOCATION, "ResourceState"));
1288     }
1289   }
1290   return resource_names;
1291 }
1292 
1293 //
1294 // XdsClient::ChannelState::LrsCallState::Reporter
1295 //
1296 
Orphan()1297 void XdsClient::ChannelState::LrsCallState::Reporter::Orphan() {
1298   if (next_report_timer_callback_pending_) {
1299     grpc_timer_cancel(&next_report_timer_);
1300   }
1301 }
1302 
1303 void XdsClient::ChannelState::LrsCallState::Reporter::
ScheduleNextReportLocked()1304     ScheduleNextReportLocked() {
1305   const grpc_millis next_report_time = ExecCtx::Get()->Now() + report_interval_;
1306   grpc_timer_init(&next_report_timer_, next_report_time,
1307                   &on_next_report_timer_);
1308   next_report_timer_callback_pending_ = true;
1309 }
1310 
OnNextReportTimer(void * arg,grpc_error * error)1311 void XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimer(
1312     void* arg, grpc_error* error) {
1313   Reporter* self = static_cast<Reporter*>(arg);
1314   bool done;
1315   {
1316     MutexLock lock(&self->xds_client()->mu_);
1317     done = self->OnNextReportTimerLocked(GRPC_ERROR_REF(error));
1318   }
1319   if (done) self->Unref(DEBUG_LOCATION, "Reporter+timer");
1320 }
1321 
OnNextReportTimerLocked(grpc_error * error)1322 bool XdsClient::ChannelState::LrsCallState::Reporter::OnNextReportTimerLocked(
1323     grpc_error* error) {
1324   next_report_timer_callback_pending_ = false;
1325   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1326     GRPC_ERROR_UNREF(error);
1327     return true;
1328   }
1329   return SendReportLocked();
1330 }
1331 
1332 namespace {
1333 
LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap & snapshot)1334 bool LoadReportCountersAreZero(const XdsApi::ClusterLoadReportMap& snapshot) {
1335   for (const auto& p : snapshot) {
1336     const XdsApi::ClusterLoadReport& cluster_snapshot = p.second;
1337     if (!cluster_snapshot.dropped_requests.IsZero()) return false;
1338     for (const auto& q : cluster_snapshot.locality_stats) {
1339       const XdsClusterLocalityStats::Snapshot& locality_snapshot = q.second;
1340       if (!locality_snapshot.IsZero()) return false;
1341     }
1342   }
1343   return true;
1344 }
1345 
1346 }  // namespace
1347 
SendReportLocked()1348 bool XdsClient::ChannelState::LrsCallState::Reporter::SendReportLocked() {
1349   // Construct snapshot from all reported stats.
1350   XdsApi::ClusterLoadReportMap snapshot =
1351       xds_client()->BuildLoadReportSnapshotLocked(parent_->send_all_clusters_,
1352                                                   parent_->cluster_names_);
1353   // Skip client load report if the counters were all zero in the last
1354   // report and they are still zero in this one.
1355   const bool old_val = last_report_counters_were_zero_;
1356   last_report_counters_were_zero_ = LoadReportCountersAreZero(snapshot);
1357   if (old_val && last_report_counters_were_zero_) {
1358     if (xds_client()->load_report_map_.empty()) {
1359       parent_->chand()->StopLrsCall();
1360       return true;
1361     }
1362     ScheduleNextReportLocked();
1363     return false;
1364   }
1365   // Create a request that contains the snapshot.
1366   grpc_slice request_payload_slice =
1367       xds_client()->api_.CreateLrsRequest(std::move(snapshot));
1368   parent_->send_message_payload_ =
1369       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1370   grpc_slice_unref_internal(request_payload_slice);
1371   // Send the report.
1372   grpc_op op;
1373   memset(&op, 0, sizeof(op));
1374   op.op = GRPC_OP_SEND_MESSAGE;
1375   op.data.send_message.send_message = parent_->send_message_payload_;
1376   grpc_call_error call_error = grpc_call_start_batch_and_execute(
1377       parent_->call_, &op, 1, &on_report_done_);
1378   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
1379     gpr_log(GPR_ERROR,
1380             "[xds_client %p] calld=%p call_error=%d sending client load report",
1381             xds_client(), this, call_error);
1382     GPR_ASSERT(GRPC_CALL_OK == call_error);
1383   }
1384   return false;
1385 }
1386 
OnReportDone(void * arg,grpc_error * error)1387 void XdsClient::ChannelState::LrsCallState::Reporter::OnReportDone(
1388     void* arg, grpc_error* error) {
1389   Reporter* self = static_cast<Reporter*>(arg);
1390   bool done;
1391   {
1392     MutexLock lock(&self->xds_client()->mu_);
1393     done = self->OnReportDoneLocked(GRPC_ERROR_REF(error));
1394   }
1395   if (done) self->Unref(DEBUG_LOCATION, "Reporter+report_done");
1396 }
1397 
OnReportDoneLocked(grpc_error * error)1398 bool XdsClient::ChannelState::LrsCallState::Reporter::OnReportDoneLocked(
1399     grpc_error* error) {
1400   grpc_byte_buffer_destroy(parent_->send_message_payload_);
1401   parent_->send_message_payload_ = nullptr;
1402   // If there are no more registered stats to report, cancel the call.
1403   if (xds_client()->load_report_map_.empty()) {
1404     parent_->chand()->StopLrsCall();
1405     GRPC_ERROR_UNREF(error);
1406     return true;
1407   }
1408   if (error != GRPC_ERROR_NONE || !IsCurrentReporterOnCall()) {
1409     GRPC_ERROR_UNREF(error);
1410     // If this reporter is no longer the current one on the call, the reason
1411     // might be that it was orphaned for a new one due to config update.
1412     if (!IsCurrentReporterOnCall()) {
1413       parent_->MaybeStartReportingLocked();
1414     }
1415     return true;
1416   }
1417   ScheduleNextReportLocked();
1418   return false;
1419 }
1420 
1421 //
1422 // XdsClient::ChannelState::LrsCallState
1423 //
1424 
LrsCallState(RefCountedPtr<RetryableCall<LrsCallState>> parent)1425 XdsClient::ChannelState::LrsCallState::LrsCallState(
1426     RefCountedPtr<RetryableCall<LrsCallState>> parent)
1427     : InternallyRefCounted<LrsCallState>(
1428           GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace) ? "LrsCallState"
1429                                                          : nullptr),
1430       parent_(std::move(parent)) {
1431   // Init the LRS call. Note that the call will progress every time there's
1432   // activity in xds_client()->interested_parties_, which is comprised of
1433   // the polling entities from client_channel.
1434   GPR_ASSERT(xds_client() != nullptr);
1435   const auto& method =
1436       chand()->server_.ShouldUseV3()
1437           ? GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V3_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS
1438           : GRPC_MDSTR_SLASH_ENVOY_DOT_SERVICE_DOT_LOAD_STATS_DOT_V2_DOT_LOADREPORTINGSERVICE_SLASH_STREAMLOADSTATS;
1439   call_ = grpc_channel_create_pollset_set_call(
1440       chand()->channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
1441       xds_client()->interested_parties_, method, nullptr,
1442       GRPC_MILLIS_INF_FUTURE, nullptr);
1443   GPR_ASSERT(call_ != nullptr);
1444   // Init the request payload.
1445   grpc_slice request_payload_slice =
1446       xds_client()->api_.CreateLrsInitialRequest(chand()->server_);
1447   send_message_payload_ =
1448       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
1449   grpc_slice_unref_internal(request_payload_slice);
1450   // Init other data associated with the LRS call.
1451   grpc_metadata_array_init(&initial_metadata_recv_);
1452   grpc_metadata_array_init(&trailing_metadata_recv_);
1453   // Start the call.
1454   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1455     gpr_log(GPR_INFO,
1456             "[xds_client %p] Starting LRS call (chand: %p, calld: %p, "
1457             "call: %p)",
1458             xds_client(), chand(), this, call_);
1459   }
1460   // Create the ops.
1461   grpc_call_error call_error;
1462   grpc_op ops[3];
1463   memset(ops, 0, sizeof(ops));
1464   // Op: send initial metadata.
1465   grpc_op* op = ops;
1466   op->op = GRPC_OP_SEND_INITIAL_METADATA;
1467   op->data.send_initial_metadata.count = 0;
1468   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
1469               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
1470   op->reserved = nullptr;
1471   op++;
1472   // Op: send request message.
1473   GPR_ASSERT(send_message_payload_ != nullptr);
1474   op->op = GRPC_OP_SEND_MESSAGE;
1475   op->data.send_message.send_message = send_message_payload_;
1476   op->flags = 0;
1477   op->reserved = nullptr;
1478   op++;
1479   Ref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked").release();
1480   GRPC_CLOSURE_INIT(&on_initial_request_sent_, OnInitialRequestSent, this,
1481                     grpc_schedule_on_exec_ctx);
1482   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1483                                                  &on_initial_request_sent_);
1484   GPR_ASSERT(GRPC_CALL_OK == call_error);
1485   // Op: recv initial metadata.
1486   op = ops;
1487   op->op = GRPC_OP_RECV_INITIAL_METADATA;
1488   op->data.recv_initial_metadata.recv_initial_metadata =
1489       &initial_metadata_recv_;
1490   op->flags = 0;
1491   op->reserved = nullptr;
1492   op++;
1493   // Op: recv response.
1494   op->op = GRPC_OP_RECV_MESSAGE;
1495   op->data.recv_message.recv_message = &recv_message_payload_;
1496   op->flags = 0;
1497   op->reserved = nullptr;
1498   op++;
1499   Ref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked").release();
1500   GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this,
1501                     grpc_schedule_on_exec_ctx);
1502   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1503                                                  &on_response_received_);
1504   GPR_ASSERT(GRPC_CALL_OK == call_error);
1505   // Op: recv server status.
1506   op = ops;
1507   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
1508   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
1509   op->data.recv_status_on_client.status = &status_code_;
1510   op->data.recv_status_on_client.status_details = &status_details_;
1511   op->flags = 0;
1512   op->reserved = nullptr;
1513   op++;
1514   // This callback signals the end of the call, so it relies on the initial
1515   // ref instead of a new ref. When it's invoked, it's the initial ref that is
1516   // unreffed.
1517   GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this,
1518                     grpc_schedule_on_exec_ctx);
1519   call_error = grpc_call_start_batch_and_execute(call_, ops, (size_t)(op - ops),
1520                                                  &on_status_received_);
1521   GPR_ASSERT(GRPC_CALL_OK == call_error);
1522 }
1523 
~LrsCallState()1524 XdsClient::ChannelState::LrsCallState::~LrsCallState() {
1525   grpc_metadata_array_destroy(&initial_metadata_recv_);
1526   grpc_metadata_array_destroy(&trailing_metadata_recv_);
1527   grpc_byte_buffer_destroy(send_message_payload_);
1528   grpc_byte_buffer_destroy(recv_message_payload_);
1529   grpc_slice_unref_internal(status_details_);
1530   GPR_ASSERT(call_ != nullptr);
1531   grpc_call_unref(call_);
1532 }
1533 
Orphan()1534 void XdsClient::ChannelState::LrsCallState::Orphan() {
1535   reporter_.reset();
1536   GPR_ASSERT(call_ != nullptr);
1537   // If we are here because xds_client wants to cancel the call,
1538   // on_status_received_ will complete the cancellation and clean up. Otherwise,
1539   // we are here because xds_client has to orphan a failed call, then the
1540   // following cancellation will be a no-op.
1541   grpc_call_cancel_internal(call_);
1542   // Note that the initial ref is hold by on_status_received_. So the
1543   // corresponding unref happens in on_status_received_ instead of here.
1544 }
1545 
MaybeStartReportingLocked()1546 void XdsClient::ChannelState::LrsCallState::MaybeStartReportingLocked() {
1547   // Don't start again if already started.
1548   if (reporter_ != nullptr) return;
1549   // Don't start if the previous send_message op (of the initial request or the
1550   // last report of the previous reporter) hasn't completed.
1551   if (send_message_payload_ != nullptr) return;
1552   // Don't start if no LRS response has arrived.
1553   if (!seen_response()) return;
1554   // Don't start if the ADS call hasn't received any valid response. Note that
1555   // this must be the first channel because it is the current channel but its
1556   // ADS call hasn't seen any response.
1557   if (chand()->ads_calld_ == nullptr ||
1558       chand()->ads_calld_->calld() == nullptr ||
1559       !chand()->ads_calld_->calld()->seen_response()) {
1560     return;
1561   }
1562   // Start reporting.
1563   reporter_ = MakeOrphanable<Reporter>(
1564       Ref(DEBUG_LOCATION, "LRS+load_report+start"), load_reporting_interval_);
1565 }
1566 
OnInitialRequestSent(void * arg,grpc_error *)1567 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSent(
1568     void* arg, grpc_error* /*error*/) {
1569   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1570   {
1571     MutexLock lock(&lrs_calld->xds_client()->mu_);
1572     lrs_calld->OnInitialRequestSentLocked();
1573   }
1574   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnInitialRequestSentLocked");
1575 }
1576 
OnInitialRequestSentLocked()1577 void XdsClient::ChannelState::LrsCallState::OnInitialRequestSentLocked() {
1578   // Clear the send_message_payload_.
1579   grpc_byte_buffer_destroy(send_message_payload_);
1580   send_message_payload_ = nullptr;
1581   MaybeStartReportingLocked();
1582 }
1583 
OnResponseReceived(void * arg,grpc_error *)1584 void XdsClient::ChannelState::LrsCallState::OnResponseReceived(
1585     void* arg, grpc_error* /*error*/) {
1586   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1587   bool done;
1588   {
1589     MutexLock lock(&lrs_calld->xds_client()->mu_);
1590     done = lrs_calld->OnResponseReceivedLocked();
1591   }
1592   if (done) lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnResponseReceivedLocked");
1593 }
1594 
OnResponseReceivedLocked()1595 bool XdsClient::ChannelState::LrsCallState::OnResponseReceivedLocked() {
1596   // Empty payload means the call was cancelled.
1597   if (!IsCurrentCallOnChannel() || recv_message_payload_ == nullptr) {
1598     return true;
1599   }
1600   // Read the response.
1601   grpc_byte_buffer_reader bbr;
1602   grpc_byte_buffer_reader_init(&bbr, recv_message_payload_);
1603   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1604   grpc_byte_buffer_reader_destroy(&bbr);
1605   grpc_byte_buffer_destroy(recv_message_payload_);
1606   recv_message_payload_ = nullptr;
1607   // This anonymous lambda is a hack to avoid the usage of goto.
1608   [&]() {
1609     // Parse the response.
1610     bool send_all_clusters = false;
1611     std::set<std::string> new_cluster_names;
1612     grpc_millis new_load_reporting_interval;
1613     grpc_error* parse_error = xds_client()->api_.ParseLrsResponse(
1614         response_slice, &send_all_clusters, &new_cluster_names,
1615         &new_load_reporting_interval);
1616     if (parse_error != GRPC_ERROR_NONE) {
1617       gpr_log(GPR_ERROR,
1618               "[xds_client %p] LRS response parsing failed. error=%s",
1619               xds_client(), grpc_error_string(parse_error));
1620       GRPC_ERROR_UNREF(parse_error);
1621       return;
1622     }
1623     seen_response_ = true;
1624     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1625       gpr_log(
1626           GPR_INFO,
1627           "[xds_client %p] LRS response received, %" PRIuPTR
1628           " cluster names, send_all_clusters=%d, load_report_interval=%" PRId64
1629           "ms",
1630           xds_client(), new_cluster_names.size(), send_all_clusters,
1631           new_load_reporting_interval);
1632       size_t i = 0;
1633       for (const auto& name : new_cluster_names) {
1634         gpr_log(GPR_INFO, "[xds_client %p] cluster_name %" PRIuPTR ": %s",
1635                 xds_client(), i++, name.c_str());
1636       }
1637     }
1638     if (new_load_reporting_interval <
1639         GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS) {
1640       new_load_reporting_interval =
1641           GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS;
1642       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1643         gpr_log(GPR_INFO,
1644                 "[xds_client %p] Increased load_report_interval to minimum "
1645                 "value %dms",
1646                 xds_client(), GRPC_XDS_MIN_CLIENT_LOAD_REPORTING_INTERVAL_MS);
1647       }
1648     }
1649     // Ignore identical update.
1650     if (send_all_clusters == send_all_clusters_ &&
1651         cluster_names_ == new_cluster_names &&
1652         load_reporting_interval_ == new_load_reporting_interval) {
1653       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1654         gpr_log(GPR_INFO,
1655                 "[xds_client %p] Incoming LRS response identical to current, "
1656                 "ignoring.",
1657                 xds_client());
1658       }
1659       return;
1660     }
1661     // Stop current load reporting (if any) to adopt the new config.
1662     reporter_.reset();
1663     // Record the new config.
1664     send_all_clusters_ = send_all_clusters;
1665     cluster_names_ = std::move(new_cluster_names);
1666     load_reporting_interval_ = new_load_reporting_interval;
1667     // Try starting sending load report.
1668     MaybeStartReportingLocked();
1669   }();
1670   grpc_slice_unref_internal(response_slice);
1671   if (xds_client()->shutting_down_) return true;
1672   // Keep listening for LRS config updates.
1673   grpc_op op;
1674   memset(&op, 0, sizeof(op));
1675   op.op = GRPC_OP_RECV_MESSAGE;
1676   op.data.recv_message.recv_message = &recv_message_payload_;
1677   op.flags = 0;
1678   op.reserved = nullptr;
1679   GPR_ASSERT(call_ != nullptr);
1680   // Reuse the "OnResponseReceivedLocked" ref taken in ctor.
1681   const grpc_call_error call_error =
1682       grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
1683   GPR_ASSERT(GRPC_CALL_OK == call_error);
1684   return false;
1685 }
1686 
OnStatusReceived(void * arg,grpc_error * error)1687 void XdsClient::ChannelState::LrsCallState::OnStatusReceived(
1688     void* arg, grpc_error* error) {
1689   LrsCallState* lrs_calld = static_cast<LrsCallState*>(arg);
1690   {
1691     MutexLock lock(&lrs_calld->xds_client()->mu_);
1692     lrs_calld->OnStatusReceivedLocked(GRPC_ERROR_REF(error));
1693   }
1694   lrs_calld->Unref(DEBUG_LOCATION, "LRS+OnStatusReceivedLocked");
1695 }
1696 
OnStatusReceivedLocked(grpc_error * error)1697 void XdsClient::ChannelState::LrsCallState::OnStatusReceivedLocked(
1698     grpc_error* error) {
1699   GPR_ASSERT(call_ != nullptr);
1700   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1701     char* status_details = grpc_slice_to_c_string(status_details_);
1702     gpr_log(GPR_INFO,
1703             "[xds_client %p] LRS call status received. Status = %d, details "
1704             "= '%s', (chand: %p, calld: %p, call: %p), error '%s'",
1705             xds_client(), status_code_, status_details, chand(), this, call_,
1706             grpc_error_string(error));
1707     gpr_free(status_details);
1708   }
1709   // Ignore status from a stale call.
1710   if (IsCurrentCallOnChannel()) {
1711     GPR_ASSERT(!xds_client()->shutting_down_);
1712     // Try to restart the call.
1713     parent_->OnCallFinishedLocked();
1714   }
1715   GRPC_ERROR_UNREF(error);
1716 }
1717 
IsCurrentCallOnChannel() const1718 bool XdsClient::ChannelState::LrsCallState::IsCurrentCallOnChannel() const {
1719   // If the retryable LRS call is null (which only happens when the xds channel
1720   // is shutting down), all the LRS calls are stale.
1721   if (chand()->lrs_calld_ == nullptr) return false;
1722   return this == chand()->lrs_calld_->calld();
1723 }
1724 
1725 //
1726 // XdsClient
1727 //
1728 
1729 namespace {
1730 
GetRequestTimeout()1731 grpc_millis GetRequestTimeout() {
1732   return grpc_channel_args_find_integer(
1733       g_channel_args, GRPC_ARG_XDS_RESOURCE_DOES_NOT_EXIST_TIMEOUT_MS,
1734       {15000, 0, INT_MAX});
1735 }
1736 
1737 }  // namespace
1738 
XdsClient(grpc_error ** error)1739 XdsClient::XdsClient(grpc_error** error)
1740     : DualRefCounted<XdsClient>(GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)
1741                                     ? "XdsClient"
1742                                     : nullptr),
1743       request_timeout_(GetRequestTimeout()),
1744       interested_parties_(grpc_pollset_set_create()),
1745       bootstrap_(
1746           XdsBootstrap::ReadFromFile(this, &grpc_xds_client_trace, error)),
1747       api_(this, &grpc_xds_client_trace,
1748            bootstrap_ == nullptr ? nullptr : bootstrap_->node()) {
1749   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1750     gpr_log(GPR_INFO, "[xds_client %p] creating xds client", this);
1751   }
1752   if (*error != GRPC_ERROR_NONE) {
1753     gpr_log(GPR_ERROR, "[xds_client %p] failed to read bootstrap file: %s",
1754             this, grpc_error_string(*error));
1755     return;
1756   }
1757   // Create ChannelState object.
1758   chand_ = MakeOrphanable<ChannelState>(
1759       WeakRef(DEBUG_LOCATION, "XdsClient+ChannelState"), bootstrap_->server());
1760 }
1761 
~XdsClient()1762 XdsClient::~XdsClient() {
1763   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1764     gpr_log(GPR_INFO, "[xds_client %p] destroying xds client", this);
1765   }
1766   grpc_pollset_set_destroy(interested_parties_);
1767 }
1768 
AddChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1769 void XdsClient::AddChannelzLinkage(
1770     channelz::ChannelNode* parent_channelz_node) {
1771   channelz::ChannelNode* xds_channelz_node =
1772       grpc_channel_get_channelz_node(chand_->channel());
1773   if (xds_channelz_node != nullptr) {
1774     parent_channelz_node->AddChildChannel(xds_channelz_node->uuid());
1775   }
1776 }
1777 
RemoveChannelzLinkage(channelz::ChannelNode * parent_channelz_node)1778 void XdsClient::RemoveChannelzLinkage(
1779     channelz::ChannelNode* parent_channelz_node) {
1780   channelz::ChannelNode* xds_channelz_node =
1781       grpc_channel_get_channelz_node(chand_->channel());
1782   if (xds_channelz_node != nullptr) {
1783     parent_channelz_node->RemoveChildChannel(xds_channelz_node->uuid());
1784   }
1785 }
1786 
Orphan()1787 void XdsClient::Orphan() {
1788   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1789     gpr_log(GPR_INFO, "[xds_client %p] shutting down xds client", this);
1790   }
1791   {
1792     MutexLock lock(g_mu);
1793     if (g_xds_client == this) g_xds_client = nullptr;
1794   }
1795   {
1796     MutexLock lock(&mu_);
1797     shutting_down_ = true;
1798     // Orphan ChannelState object.
1799     chand_.reset();
1800     // We do not clear cluster_map_ and endpoint_map_ if the xds client was
1801     // created by the XdsResolver because the maps contain refs for watchers
1802     // which in turn hold refs to the loadbalancing policies. At this point, it
1803     // is possible for ADS calls to be in progress. Unreffing the loadbalancing
1804     // policies before those calls are done would lead to issues such as
1805     // https://github.com/grpc/grpc/issues/20928.
1806     if (!listener_map_.empty()) {
1807       cluster_map_.clear();
1808       endpoint_map_.clear();
1809     }
1810   }
1811 }
1812 
WatchListenerData(absl::string_view listener_name,std::unique_ptr<ListenerWatcherInterface> watcher)1813 void XdsClient::WatchListenerData(
1814     absl::string_view listener_name,
1815     std::unique_ptr<ListenerWatcherInterface> watcher) {
1816   std::string listener_name_str = std::string(listener_name);
1817   MutexLock lock(&mu_);
1818   ListenerState& listener_state = listener_map_[listener_name_str];
1819   ListenerWatcherInterface* w = watcher.get();
1820   listener_state.watchers[w] = std::move(watcher);
1821   // If we've already received an LDS update, notify the new watcher
1822   // immediately.
1823   if (listener_state.update.has_value()) {
1824     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1825       gpr_log(GPR_INFO, "[xds_client %p] returning cached listener data for %s",
1826               this, listener_name_str.c_str());
1827     }
1828     w->OnListenerChanged(*listener_state.update);
1829   }
1830   chand_->Subscribe(XdsApi::kLdsTypeUrl, listener_name_str);
1831 }
1832 
CancelListenerDataWatch(absl::string_view listener_name,ListenerWatcherInterface * watcher,bool delay_unsubscription)1833 void XdsClient::CancelListenerDataWatch(absl::string_view listener_name,
1834                                         ListenerWatcherInterface* watcher,
1835                                         bool delay_unsubscription) {
1836   MutexLock lock(&mu_);
1837   if (shutting_down_) return;
1838   std::string listener_name_str = std::string(listener_name);
1839   ListenerState& listener_state = listener_map_[listener_name_str];
1840   auto it = listener_state.watchers.find(watcher);
1841   if (it != listener_state.watchers.end()) {
1842     listener_state.watchers.erase(it);
1843     if (listener_state.watchers.empty()) {
1844       listener_map_.erase(listener_name_str);
1845       chand_->Unsubscribe(XdsApi::kLdsTypeUrl, listener_name_str,
1846                           delay_unsubscription);
1847     }
1848   }
1849 }
1850 
WatchRouteConfigData(absl::string_view route_config_name,std::unique_ptr<RouteConfigWatcherInterface> watcher)1851 void XdsClient::WatchRouteConfigData(
1852     absl::string_view route_config_name,
1853     std::unique_ptr<RouteConfigWatcherInterface> watcher) {
1854   std::string route_config_name_str = std::string(route_config_name);
1855   MutexLock lock(&mu_);
1856   RouteConfigState& route_config_state =
1857       route_config_map_[route_config_name_str];
1858   RouteConfigWatcherInterface* w = watcher.get();
1859   route_config_state.watchers[w] = std::move(watcher);
1860   // If we've already received an RDS update, notify the new watcher
1861   // immediately.
1862   if (route_config_state.update.has_value()) {
1863     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1864       gpr_log(GPR_INFO,
1865               "[xds_client %p] returning cached route config data for %s", this,
1866               route_config_name_str.c_str());
1867     }
1868     w->OnRouteConfigChanged(*route_config_state.update);
1869   }
1870   chand_->Subscribe(XdsApi::kRdsTypeUrl, route_config_name_str);
1871 }
1872 
CancelRouteConfigDataWatch(absl::string_view route_config_name,RouteConfigWatcherInterface * watcher,bool delay_unsubscription)1873 void XdsClient::CancelRouteConfigDataWatch(absl::string_view route_config_name,
1874                                            RouteConfigWatcherInterface* watcher,
1875                                            bool delay_unsubscription) {
1876   MutexLock lock(&mu_);
1877   if (shutting_down_) return;
1878   std::string route_config_name_str = std::string(route_config_name);
1879   RouteConfigState& route_config_state =
1880       route_config_map_[route_config_name_str];
1881   auto it = route_config_state.watchers.find(watcher);
1882   if (it != route_config_state.watchers.end()) {
1883     route_config_state.watchers.erase(it);
1884     if (route_config_state.watchers.empty()) {
1885       route_config_map_.erase(route_config_name_str);
1886       chand_->Unsubscribe(XdsApi::kRdsTypeUrl, route_config_name_str,
1887                           delay_unsubscription);
1888     }
1889   }
1890 }
1891 
WatchClusterData(absl::string_view cluster_name,std::unique_ptr<ClusterWatcherInterface> watcher)1892 void XdsClient::WatchClusterData(
1893     absl::string_view cluster_name,
1894     std::unique_ptr<ClusterWatcherInterface> watcher) {
1895   std::string cluster_name_str = std::string(cluster_name);
1896   MutexLock lock(&mu_);
1897   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1898   ClusterWatcherInterface* w = watcher.get();
1899   cluster_state.watchers[w] = std::move(watcher);
1900   // If we've already received a CDS update, notify the new watcher
1901   // immediately.
1902   if (cluster_state.update.has_value()) {
1903     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1904       gpr_log(GPR_INFO, "[xds_client %p] returning cached cluster data for %s",
1905               this, cluster_name_str.c_str());
1906     }
1907     w->OnClusterChanged(cluster_state.update.value());
1908   }
1909   chand_->Subscribe(XdsApi::kCdsTypeUrl, cluster_name_str);
1910 }
1911 
CancelClusterDataWatch(absl::string_view cluster_name,ClusterWatcherInterface * watcher,bool delay_unsubscription)1912 void XdsClient::CancelClusterDataWatch(absl::string_view cluster_name,
1913                                        ClusterWatcherInterface* watcher,
1914                                        bool delay_unsubscription) {
1915   MutexLock lock(&mu_);
1916   if (shutting_down_) return;
1917   std::string cluster_name_str = std::string(cluster_name);
1918   ClusterState& cluster_state = cluster_map_[cluster_name_str];
1919   auto it = cluster_state.watchers.find(watcher);
1920   if (it != cluster_state.watchers.end()) {
1921     cluster_state.watchers.erase(it);
1922     if (cluster_state.watchers.empty()) {
1923       cluster_map_.erase(cluster_name_str);
1924       chand_->Unsubscribe(XdsApi::kCdsTypeUrl, cluster_name_str,
1925                           delay_unsubscription);
1926     }
1927   }
1928 }
1929 
WatchEndpointData(absl::string_view eds_service_name,std::unique_ptr<EndpointWatcherInterface> watcher)1930 void XdsClient::WatchEndpointData(
1931     absl::string_view eds_service_name,
1932     std::unique_ptr<EndpointWatcherInterface> watcher) {
1933   std::string eds_service_name_str = std::string(eds_service_name);
1934   MutexLock lock(&mu_);
1935   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1936   EndpointWatcherInterface* w = watcher.get();
1937   endpoint_state.watchers[w] = std::move(watcher);
1938   // If we've already received an EDS update, notify the new watcher
1939   // immediately.
1940   if (endpoint_state.update.has_value()) {
1941     if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
1942       gpr_log(GPR_INFO, "[xds_client %p] returning cached endpoint data for %s",
1943               this, eds_service_name_str.c_str());
1944     }
1945     w->OnEndpointChanged(endpoint_state.update.value());
1946   }
1947   chand_->Subscribe(XdsApi::kEdsTypeUrl, eds_service_name_str);
1948 }
1949 
CancelEndpointDataWatch(absl::string_view eds_service_name,EndpointWatcherInterface * watcher,bool delay_unsubscription)1950 void XdsClient::CancelEndpointDataWatch(absl::string_view eds_service_name,
1951                                         EndpointWatcherInterface* watcher,
1952                                         bool delay_unsubscription) {
1953   MutexLock lock(&mu_);
1954   if (shutting_down_) return;
1955   std::string eds_service_name_str = std::string(eds_service_name);
1956   EndpointState& endpoint_state = endpoint_map_[eds_service_name_str];
1957   auto it = endpoint_state.watchers.find(watcher);
1958   if (it != endpoint_state.watchers.end()) {
1959     endpoint_state.watchers.erase(it);
1960     if (endpoint_state.watchers.empty()) {
1961       endpoint_map_.erase(eds_service_name_str);
1962       chand_->Unsubscribe(XdsApi::kEdsTypeUrl, eds_service_name_str,
1963                           delay_unsubscription);
1964     }
1965   }
1966 }
1967 
AddClusterDropStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name)1968 RefCountedPtr<XdsClusterDropStats> XdsClient::AddClusterDropStats(
1969     absl::string_view lrs_server, absl::string_view cluster_name,
1970     absl::string_view eds_service_name) {
1971   // TODO(roth): When we add support for direct federation, use the
1972   // server name specified in lrs_server.
1973   auto key =
1974       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
1975   MutexLock lock(&mu_);
1976   // We jump through some hoops here to make sure that the absl::string_views
1977   // stored in the XdsClusterDropStats object point to the strings
1978   // in the load_report_map_ key, so that they have the same lifetime.
1979   auto it = load_report_map_
1980                 .emplace(std::make_pair(std::move(key), LoadReportState()))
1981                 .first;
1982   LoadReportState& load_report_state = it->second;
1983   RefCountedPtr<XdsClusterDropStats> cluster_drop_stats;
1984   if (load_report_state.drop_stats != nullptr) {
1985     cluster_drop_stats = load_report_state.drop_stats->RefIfNonZero();
1986   }
1987   if (cluster_drop_stats == nullptr) {
1988     if (load_report_state.drop_stats != nullptr) {
1989       load_report_state.deleted_drop_stats +=
1990           load_report_state.drop_stats->GetSnapshotAndReset();
1991     }
1992     cluster_drop_stats = MakeRefCounted<XdsClusterDropStats>(
1993         Ref(DEBUG_LOCATION, "DropStats"), lrs_server,
1994         it->first.first /*cluster_name*/,
1995         it->first.second /*eds_service_name*/);
1996     load_report_state.drop_stats = cluster_drop_stats.get();
1997   }
1998   chand_->MaybeStartLrsCall();
1999   return cluster_drop_stats;
2000 }
2001 
RemoveClusterDropStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,XdsClusterDropStats * cluster_drop_stats)2002 void XdsClient::RemoveClusterDropStats(
2003     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2004     absl::string_view eds_service_name,
2005     XdsClusterDropStats* cluster_drop_stats) {
2006   MutexLock lock(&mu_);
2007   // TODO(roth): When we add support for direct federation, use the
2008   // server name specified in lrs_server.
2009   auto it = load_report_map_.find(
2010       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2011   if (it == load_report_map_.end()) return;
2012   LoadReportState& load_report_state = it->second;
2013   if (load_report_state.drop_stats == cluster_drop_stats) {
2014     // Record final snapshot in deleted_drop_stats, which will be
2015     // added to the next load report.
2016     load_report_state.deleted_drop_stats +=
2017         load_report_state.drop_stats->GetSnapshotAndReset();
2018     load_report_state.drop_stats = nullptr;
2019   }
2020 }
2021 
AddClusterLocalityStats(absl::string_view lrs_server,absl::string_view cluster_name,absl::string_view eds_service_name,RefCountedPtr<XdsLocalityName> locality)2022 RefCountedPtr<XdsClusterLocalityStats> XdsClient::AddClusterLocalityStats(
2023     absl::string_view lrs_server, absl::string_view cluster_name,
2024     absl::string_view eds_service_name,
2025     RefCountedPtr<XdsLocalityName> locality) {
2026   // TODO(roth): When we add support for direct federation, use the
2027   // server name specified in lrs_server.
2028   auto key =
2029       std::make_pair(std::string(cluster_name), std::string(eds_service_name));
2030   MutexLock lock(&mu_);
2031   // We jump through some hoops here to make sure that the absl::string_views
2032   // stored in the XdsClusterLocalityStats object point to the strings
2033   // in the load_report_map_ key, so that they have the same lifetime.
2034   auto it = load_report_map_
2035                 .emplace(std::make_pair(std::move(key), LoadReportState()))
2036                 .first;
2037   LoadReportState& load_report_state = it->second;
2038   LoadReportState::LocalityState& locality_state =
2039       load_report_state.locality_stats[locality];
2040   RefCountedPtr<XdsClusterLocalityStats> cluster_locality_stats;
2041   if (locality_state.locality_stats != nullptr) {
2042     cluster_locality_stats = locality_state.locality_stats->RefIfNonZero();
2043   }
2044   if (cluster_locality_stats == nullptr) {
2045     if (locality_state.locality_stats != nullptr) {
2046       locality_state.deleted_locality_stats +=
2047           locality_state.locality_stats->GetSnapshotAndReset();
2048     }
2049     cluster_locality_stats = MakeRefCounted<XdsClusterLocalityStats>(
2050         Ref(DEBUG_LOCATION, "LocalityStats"), lrs_server,
2051         it->first.first /*cluster_name*/, it->first.second /*eds_service_name*/,
2052         std::move(locality));
2053     locality_state.locality_stats = cluster_locality_stats.get();
2054   }
2055   chand_->MaybeStartLrsCall();
2056   return cluster_locality_stats;
2057 }
2058 
RemoveClusterLocalityStats(absl::string_view,absl::string_view cluster_name,absl::string_view eds_service_name,const RefCountedPtr<XdsLocalityName> & locality,XdsClusterLocalityStats * cluster_locality_stats)2059 void XdsClient::RemoveClusterLocalityStats(
2060     absl::string_view /*lrs_server*/, absl::string_view cluster_name,
2061     absl::string_view eds_service_name,
2062     const RefCountedPtr<XdsLocalityName>& locality,
2063     XdsClusterLocalityStats* cluster_locality_stats) {
2064   MutexLock lock(&mu_);
2065   // TODO(roth): When we add support for direct federation, use the
2066   // server name specified in lrs_server.
2067   auto it = load_report_map_.find(
2068       std::make_pair(std::string(cluster_name), std::string(eds_service_name)));
2069   if (it == load_report_map_.end()) return;
2070   LoadReportState& load_report_state = it->second;
2071   auto locality_it = load_report_state.locality_stats.find(locality);
2072   if (locality_it == load_report_state.locality_stats.end()) return;
2073   LoadReportState::LocalityState& locality_state = locality_it->second;
2074   if (locality_state.locality_stats == cluster_locality_stats) {
2075     // Record final snapshot in deleted_locality_stats, which will be
2076     // added to the next load report.
2077     locality_state.deleted_locality_stats +=
2078         locality_state.locality_stats->GetSnapshotAndReset();
2079     locality_state.locality_stats = nullptr;
2080   }
2081 }
2082 
ResetBackoff()2083 void XdsClient::ResetBackoff() {
2084   MutexLock lock(&mu_);
2085   if (chand_ != nullptr) {
2086     grpc_channel_reset_connect_backoff(chand_->channel());
2087   }
2088 }
2089 
NotifyOnErrorLocked(grpc_error * error)2090 void XdsClient::NotifyOnErrorLocked(grpc_error* error) {
2091   for (const auto& p : listener_map_) {
2092     const ListenerState& listener_state = p.second;
2093     for (const auto& p : listener_state.watchers) {
2094       p.first->OnError(GRPC_ERROR_REF(error));
2095     }
2096   }
2097   for (const auto& p : route_config_map_) {
2098     const RouteConfigState& route_config_state = p.second;
2099     for (const auto& p : route_config_state.watchers) {
2100       p.first->OnError(GRPC_ERROR_REF(error));
2101     }
2102   }
2103   for (const auto& p : cluster_map_) {
2104     const ClusterState& cluster_state = p.second;
2105     for (const auto& p : cluster_state.watchers) {
2106       p.first->OnError(GRPC_ERROR_REF(error));
2107     }
2108   }
2109   for (const auto& p : endpoint_map_) {
2110     const EndpointState& endpoint_state = p.second;
2111     for (const auto& p : endpoint_state.watchers) {
2112       p.first->OnError(GRPC_ERROR_REF(error));
2113     }
2114   }
2115   GRPC_ERROR_UNREF(error);
2116 }
2117 
BuildLoadReportSnapshotLocked(bool send_all_clusters,const std::set<std::string> & clusters)2118 XdsApi::ClusterLoadReportMap XdsClient::BuildLoadReportSnapshotLocked(
2119     bool send_all_clusters, const std::set<std::string>& clusters) {
2120   if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2121     gpr_log(GPR_INFO, "[xds_client %p] start building load report", this);
2122   }
2123   XdsApi::ClusterLoadReportMap snapshot_map;
2124   for (auto load_report_it = load_report_map_.begin();
2125        load_report_it != load_report_map_.end();) {
2126     // Cluster key is cluster and EDS service name.
2127     const auto& cluster_key = load_report_it->first;
2128     LoadReportState& load_report = load_report_it->second;
2129     // If the CDS response for a cluster indicates to use LRS but the
2130     // LRS server does not say that it wants reports for this cluster,
2131     // then we'll have stats objects here whose data we're not going to
2132     // include in the load report.  However, we still need to clear out
2133     // the data from the stats objects, so that if the LRS server starts
2134     // asking for the data in the future, we don't incorrectly include
2135     // data from previous reporting intervals in that future report.
2136     const bool record_stats =
2137         send_all_clusters || clusters.find(cluster_key.first) != clusters.end();
2138     XdsApi::ClusterLoadReport snapshot;
2139     // Aggregate drop stats.
2140     snapshot.dropped_requests = std::move(load_report.deleted_drop_stats);
2141     if (load_report.drop_stats != nullptr) {
2142       snapshot.dropped_requests +=
2143           load_report.drop_stats->GetSnapshotAndReset();
2144       if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2145         gpr_log(GPR_INFO,
2146                 "[xds_client %p] cluster=%s eds_service_name=%s drop_stats=%p",
2147                 this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2148                 load_report.drop_stats);
2149       }
2150     }
2151     // Aggregate locality stats.
2152     for (auto it = load_report.locality_stats.begin();
2153          it != load_report.locality_stats.end();) {
2154       const RefCountedPtr<XdsLocalityName>& locality_name = it->first;
2155       auto& locality_state = it->second;
2156       XdsClusterLocalityStats::Snapshot& locality_snapshot =
2157           snapshot.locality_stats[locality_name];
2158       locality_snapshot = std::move(locality_state.deleted_locality_stats);
2159       if (locality_state.locality_stats != nullptr) {
2160         locality_snapshot +=
2161             locality_state.locality_stats->GetSnapshotAndReset();
2162         if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_client_trace)) {
2163           gpr_log(GPR_INFO,
2164                   "[xds_client %p] cluster=%s eds_service_name=%s "
2165                   "locality=%s locality_stats=%p",
2166                   this, cluster_key.first.c_str(), cluster_key.second.c_str(),
2167                   locality_name->AsHumanReadableString().c_str(),
2168                   locality_state.locality_stats);
2169         }
2170       }
2171       // If the only thing left in this entry was final snapshots from
2172       // deleted locality stats objects, remove the entry.
2173       if (locality_state.locality_stats == nullptr) {
2174         it = load_report.locality_stats.erase(it);
2175       } else {
2176         ++it;
2177       }
2178     }
2179     // Compute load report interval.
2180     const grpc_millis now = ExecCtx::Get()->Now();
2181     snapshot.load_report_interval = now - load_report.last_report_time;
2182     load_report.last_report_time = now;
2183     // Record snapshot.
2184     if (record_stats) {
2185       snapshot_map[cluster_key] = std::move(snapshot);
2186     }
2187     // If the only thing left in this entry was final snapshots from
2188     // deleted stats objects, remove the entry.
2189     if (load_report.locality_stats.empty() &&
2190         load_report.drop_stats == nullptr) {
2191       load_report_it = load_report_map_.erase(load_report_it);
2192     } else {
2193       ++load_report_it;
2194     }
2195   }
2196   return snapshot_map;
2197 }
2198 
2199 //
2200 // accessors for global state
2201 //
2202 
XdsClientGlobalInit()2203 void XdsClientGlobalInit() { g_mu = new Mutex; }
2204 
XdsClientGlobalShutdown()2205 void XdsClientGlobalShutdown() {
2206   delete g_mu;
2207   g_mu = nullptr;
2208 }
2209 
GetOrCreate(grpc_error ** error)2210 RefCountedPtr<XdsClient> XdsClient::GetOrCreate(grpc_error** error) {
2211   MutexLock lock(g_mu);
2212   if (g_xds_client != nullptr) {
2213     auto xds_client = g_xds_client->RefIfNonZero();
2214     if (xds_client != nullptr) return xds_client;
2215   }
2216   auto xds_client = MakeRefCounted<XdsClient>(error);
2217   g_xds_client = xds_client.get();
2218   return xds_client;
2219 }
2220 
2221 namespace internal {
2222 
SetXdsChannelArgsForTest(grpc_channel_args * args)2223 void SetXdsChannelArgsForTest(grpc_channel_args* args) {
2224   MutexLock lock(g_mu);
2225   g_channel_args = args;
2226 }
2227 
UnsetGlobalXdsClientForTest()2228 void UnsetGlobalXdsClientForTest() {
2229   MutexLock lock(g_mu);
2230   g_xds_client = nullptr;
2231 }
2232 
2233 }  // namespace internal
2234 
2235 }  // namespace grpc_core
2236