1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
24 /// An internal channel (\a lb_channel_) is created for the addresses
25 /// that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose).  The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down).  If needed, we retry the
33 /// call.  If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends.  Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses.  If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
43 /// resolver.
44 ///
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance.  Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
50 ///
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
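///
/// As a rough, illustrative sketch (the JSON field names below are not taken
/// from this file and are shown only for orientation), a service config might
/// select this policy and specify a child policy with something like:
///   "loadBalancingConfig": [ { "grpclb": { "childPolicy": [ { "round_robin": {} } ] } } ]
/// which corresponds to the child_policy field parsed into ParsedGrpcLbConfig
/// below.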
53 
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
59 
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
62 
63 #include <inttypes.h>
64 #include <limits.h>
65 #include <string.h>
66 
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
72 
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/string.h"
88 #include "src/core/lib/gprpp/manual_constructor.h"
89 #include "src/core/lib/gprpp/memory.h"
90 #include "src/core/lib/gprpp/orphanable.h"
91 #include "src/core/lib/gprpp/ref_counted_ptr.h"
92 #include "src/core/lib/iomgr/combiner.h"
93 #include "src/core/lib/iomgr/sockaddr.h"
94 #include "src/core/lib/iomgr/sockaddr_utils.h"
95 #include "src/core/lib/iomgr/timer.h"
96 #include "src/core/lib/slice/slice_hash_table.h"
97 #include "src/core/lib/slice/slice_internal.h"
98 #include "src/core/lib/slice/slice_string_helpers.h"
99 #include "src/core/lib/surface/call.h"
100 #include "src/core/lib/surface/channel.h"
101 #include "src/core/lib/surface/channel_init.h"
102 #include "src/core/lib/transport/static_metadata.h"
103 
104 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
105 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
106 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
107 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
108 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
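// With the defaults above, retries of the balancer call back off from 1 second
// by a factor of 1.6 per failed attempt (with +/-20% jitter), capped at 120
// seconds; if no serverlist has been received within 10 seconds of startup,
// the policy may enter fallback mode and use the resolver-provided backends.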
109 
110 #define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
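// This channel arg attaches the LB token (a grpc_mdelem payload) to each
// backend address; the picker later copies the token into the call's initial
// metadata for the chosen backend (see Picker::Pick() below).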
111 
112 namespace grpc_core {
113 
114 TraceFlag grpc_lb_glb_trace(false, "glb");
115 
116 namespace {
117 
118 constexpr char kGrpclb[] = "grpclb";
119 
120 class ParsedGrpcLbConfig : public LoadBalancingPolicy::Config {
121  public:
122   explicit ParsedGrpcLbConfig(
123       RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
124       : child_policy_(std::move(child_policy)) {}
125   const char* name() const override { return kGrpclb; }
126 
127   RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
128     return child_policy_;
129   }
130 
131  private:
132   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
133 };
134 
135 class GrpcLb : public LoadBalancingPolicy {
136  public:
137   explicit GrpcLb(Args args);
138 
139   const char* name() const override { return kGrpclb; }
140 
141   void UpdateLocked(UpdateArgs args) override;
142   void ResetBackoffLocked() override;
143 
144  private:
145   /// Contains a call to the LB server and all the data related to the call.
146   class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
147    public:
148     explicit BalancerCallState(
149         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
150 
151     // It's the caller's responsibility to ensure that Orphan() is called from
152     // inside the combiner.
153     void Orphan() override;
154 
155     void StartQuery();
156 
157     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
158 
159     bool seen_initial_response() const { return seen_initial_response_; }
160     bool seen_serverlist() const { return seen_serverlist_; }
161 
162    private:
163     GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
164 
165     ~BalancerCallState();
166 
167     GrpcLb* grpclb_policy() const {
168       return static_cast<GrpcLb*>(grpclb_policy_.get());
169     }
170 
171     void ScheduleNextClientLoadReportLocked();
172     void SendClientLoadReportLocked();
173 
174     static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
175 
176     static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
177     static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
178     static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
179     static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
180     static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
181 
182     // The owning LB policy.
183     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
184 
185     // The streaming call to the LB server. Always non-NULL.
186     grpc_call* lb_call_ = nullptr;
187 
188     // recv_initial_metadata
189     grpc_metadata_array lb_initial_metadata_recv_;
190 
191     // send_message
192     grpc_byte_buffer* send_message_payload_ = nullptr;
193     grpc_closure lb_on_initial_request_sent_;
194 
195     // recv_message
196     grpc_byte_buffer* recv_message_payload_ = nullptr;
197     grpc_closure lb_on_balancer_message_received_;
198     bool seen_initial_response_ = false;
199     bool seen_serverlist_ = false;
200 
201     // recv_trailing_metadata
202     grpc_closure lb_on_balancer_status_received_;
203     grpc_metadata_array lb_trailing_metadata_recv_;
204     grpc_status_code lb_call_status_;
205     grpc_slice lb_call_status_details_;
206 
207     // The stats for client-side load reporting associated with this LB call.
208     // Created after the first serverlist is received.
209     RefCountedPtr<GrpcLbClientStats> client_stats_;
210     grpc_millis client_stats_report_interval_ = 0;
211     grpc_timer client_load_report_timer_;
212     bool client_load_report_timer_callback_pending_ = false;
213     bool last_client_load_report_counters_were_zero_ = false;
214     bool client_load_report_is_due_ = false;
215     // The closure used for either the load report timer or the callback for
216     // completion of sending the load report.
217     grpc_closure client_load_report_closure_;
218   };
219 
220   class Serverlist : public RefCounted<Serverlist> {
221    public:
222     // Takes ownership of serverlist.
223     explicit Serverlist(grpc_grpclb_serverlist* serverlist)
224         : serverlist_(serverlist) {}
225 
226     ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
227 
228     bool operator==(const Serverlist& other) const;
229 
230     const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
231 
232     // Returns a text representation suitable for logging.
233     UniquePtr<char> AsText() const;
234 
235     // Extracts all non-drop entries into a ServerAddressList.
236     ServerAddressList GetServerAddressList(
237         GrpcLbClientStats* client_stats) const;
238 
239     // Returns true if the serverlist contains at least one drop entry and
240     // no backend address entries.
241     bool ContainsAllDropEntries() const;
242 
243     // Returns the LB token to use for a drop, or null if the call
244     // should not be dropped.
245     //
246     // Note: This is called from the picker, so it will be invoked in
247     // the channel's data plane combiner, NOT the control plane
248     // combiner.  It should not be accessed by any other part of the LB
249     // policy.
250     const char* ShouldDrop();
251 
252    private:
253     grpc_grpclb_serverlist* serverlist_;
254 
255     // Guarded by the channel's data plane combiner, NOT the control
256     // plane combiner.  It should not be accessed by anything but the
257     // picker via the ShouldDrop() method.
258     size_t drop_index_ = 0;
259   };
260 
261   class Picker : public SubchannelPicker {
262    public:
263     Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
264            UniquePtr<SubchannelPicker> child_picker,
265            RefCountedPtr<GrpcLbClientStats> client_stats)
266         : parent_(parent),
267           serverlist_(std::move(serverlist)),
268           child_picker_(std::move(child_picker)),
269           client_stats_(std::move(client_stats)) {}
270 
271     PickResult Pick(PickArgs args) override;
272 
273    private:
274     // Storing the address for logging, but not holding a ref.
275     // DO NOT DEREFERENCE!
276     GrpcLb* parent_;
277 
278     // Serverlist to be used for determining drops.
279     RefCountedPtr<Serverlist> serverlist_;
280 
281     UniquePtr<SubchannelPicker> child_picker_;
282     RefCountedPtr<GrpcLbClientStats> client_stats_;
283   };
284 
285   class Helper : public ChannelControlHelper {
286    public:
287     explicit Helper(RefCountedPtr<GrpcLb> parent)
288         : parent_(std::move(parent)) {}
289 
290     RefCountedPtr<SubchannelInterface> CreateSubchannel(
291         const grpc_channel_args& args) override;
292     grpc_channel* CreateChannel(const char* target,
293                                 const grpc_channel_args& args) override;
294     void UpdateState(grpc_connectivity_state state,
295                      UniquePtr<SubchannelPicker> picker) override;
296     void RequestReresolution() override;
297     void AddTraceEvent(TraceSeverity severity, const char* message) override;
298 
299     void set_child(LoadBalancingPolicy* child) { child_ = child; }
300 
301    private:
302     bool CalledByPendingChild() const;
303     bool CalledByCurrentChild() const;
304 
305     RefCountedPtr<GrpcLb> parent_;
306     LoadBalancingPolicy* child_ = nullptr;
307   };
308 
309   ~GrpcLb();
310 
311   void ShutdownLocked() override;
312 
313   // Helper functions used in UpdateLocked().
314   void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
315                                             const grpc_channel_args& args);
316   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
317                                                          grpc_error* error);
318   void CancelBalancerChannelConnectivityWatchLocked();
319 
320   // Methods for dealing with fallback state.
321   void MaybeEnterFallbackModeAfterStartup();
322   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
323 
324   // Methods for dealing with the balancer call.
325   void StartBalancerCallLocked();
326   void StartBalancerCallRetryTimerLocked();
327   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
328 
329   // Methods for dealing with the child policy.
330   grpc_channel_args* CreateChildPolicyArgsLocked(
331       bool is_backend_from_grpclb_load_balancer);
332   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
333       const char* name, const grpc_channel_args* args);
334   void CreateOrUpdateChildPolicyLocked();
335 
336   // Who the client is trying to communicate with.
337   const char* server_name_ = nullptr;
338 
339   // Current channel args from the resolver.
340   grpc_channel_args* args_ = nullptr;
341 
342   // Internal state.
343   bool shutting_down_ = false;
344 
345   // The channel for communicating with the LB server.
346   grpc_channel* lb_channel_ = nullptr;
347   // Response generator to inject address updates into lb_channel_.
348   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
349 
350   // The data associated with the current LB call. It holds a ref to this LB
351   // policy. It's initialized every time we query for backends. It's reset to
352   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
353   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
354   // contains a non-NULL lb_call_.
355   OrphanablePtr<BalancerCallState> lb_calld_;
356   // Timeout in milliseconds for the LB call. 0 means no deadline.
357   int lb_call_timeout_ms_ = 0;
358   // Balancer call retry state.
359   BackOff lb_call_backoff_;
360   bool retry_timer_callback_pending_ = false;
361   grpc_timer lb_call_retry_timer_;
362   grpc_closure lb_on_call_retry_;
363 
364   // The deserialized response from the balancer. May be nullptr until one
365   // such response has arrived.
366   RefCountedPtr<Serverlist> serverlist_;
367 
368   // Whether we're in fallback mode.
369   bool fallback_mode_ = false;
370   // The backend addresses from the resolver.
371   ServerAddressList fallback_backend_addresses_;
372   // State for fallback-at-startup checks.
373   // Timeout after startup after which we will go into fallback mode if
374   // we have not received a serverlist from the balancer.
375   int fallback_at_startup_timeout_ = 0;
376   bool fallback_at_startup_checks_pending_ = false;
377   grpc_timer lb_fallback_timer_;
378   grpc_closure lb_on_fallback_;
379   grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
380   grpc_closure lb_channel_on_connectivity_changed_;
381 
382   // The child policy to use for the backends.
383   OrphanablePtr<LoadBalancingPolicy> child_policy_;
384   // When switching child policies, the new policy will be stored here
385   // until it reports READY, at which point it will be moved to child_policy_.
386   OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
387   // The child policy config.
388   RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
389   // Child policy in state READY.
390   bool child_policy_ready_ = false;
391 };
392 
393 //
394 // GrpcLb::Serverlist
395 //
396 
397 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
398   return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
399 }
400 
401 void ParseServer(const grpc_grpclb_server* server,
402                  grpc_resolved_address* addr) {
403   memset(addr, 0, sizeof(*addr));
404   if (server->drop) return;
405   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
406   /* the addresses are given in binary format (an in(6)_addr struct) in
407    * server->ip_address.bytes. */
408   const grpc_grpclb_ip_address* ip = &server->ip_address;
409   if (ip->size == 4) {
410     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
411     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
412     addr4->sin_family = GRPC_AF_INET;
413     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
414     addr4->sin_port = netorder_port;
415   } else if (ip->size == 16) {
416     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
417     grpc_sockaddr_in6* addr6 = reinterpret_cast<grpc_sockaddr_in6*>(&addr->addr);
418     addr6->sin6_family = GRPC_AF_INET6;
419     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
420     addr6->sin6_port = netorder_port;
421   }
422 }
423 
424 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
425   gpr_strvec entries;
426   gpr_strvec_init(&entries);
427   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
428     const auto* server = serverlist_->servers[i];
429     char* ipport;
430     if (server->drop) {
431       ipport = gpr_strdup("(drop)");
432     } else {
433       grpc_resolved_address addr;
434       ParseServer(server, &addr);
435       grpc_sockaddr_to_string(&ipport, &addr, false);
436     }
437     char* entry;
438     gpr_asprintf(&entry, "  %" PRIuPTR ": %s token=%s\n", i, ipport,
439                  server->load_balance_token);
440     gpr_free(ipport);
441     gpr_strvec_add(&entries, entry);
442   }
443   UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
444   gpr_strvec_destroy(&entries);
445   return result;
446 }
447 
448 // vtable for LB token channel arg.
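// The copy/destroy hooks below take and release a ref on the underlying
// grpc_mdelem, so the token stays valid for as long as any copy of an
// address's channel args holds it.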
449 void* lb_token_copy(void* token) {
450   return token == nullptr
451              ? nullptr
452              : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
453 }
454 void lb_token_destroy(void* token) {
455   if (token != nullptr) {
456     GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
457   }
458 }
459 int lb_token_cmp(void* token1, void* token2) {
460   // Always indicate a match, since we don't want this channel arg to
461   // affect the subchannel's key in the index.
462   return 0;
463 }
464 const grpc_arg_pointer_vtable lb_token_arg_vtable = {
465     lb_token_copy, lb_token_destroy, lb_token_cmp};
466 
467 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
468   if (server->drop) return false;
469   const grpc_grpclb_ip_address* ip = &server->ip_address;
470   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
471     if (log) {
472       gpr_log(GPR_ERROR,
473               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
474               server->port, (unsigned long)idx);
475     }
476     return false;
477   }
478   if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
479     if (log) {
480       gpr_log(GPR_ERROR,
481               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
482               "serverlist. Ignoring",
483               ip->size, (unsigned long)idx);
484     }
485     return false;
486   }
487   return true;
488 }
489 
490 // Returns addresses extracted from the serverlist.
491 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
492     GrpcLbClientStats* client_stats) const {
493   ServerAddressList addresses;
494   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
495     const grpc_grpclb_server* server = serverlist_->servers[i];
496     if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
497     // Address processing.
498     grpc_resolved_address addr;
499     ParseServer(server, &addr);
500     // LB token processing.
501     grpc_mdelem lb_token;
502     if (server->has_load_balance_token) {
503       const size_t lb_token_max_length =
504           GPR_ARRAY_SIZE(server->load_balance_token);
505       const size_t lb_token_length =
506           strnlen(server->load_balance_token, lb_token_max_length);
507       grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
508           server->load_balance_token, lb_token_length);
509       lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
510       if (client_stats != nullptr) {
511         GPR_ASSERT(grpc_mdelem_set_user_data(
512                        lb_token, GrpcLbClientStats::Destroy,
513                        client_stats->Ref().release()) == client_stats);
514       }
515     } else {
516       char* uri = grpc_sockaddr_to_uri(&addr);
517       gpr_log(GPR_INFO,
518               "Missing LB token for backend address '%s'. The empty token will "
519               "be used instead",
520               uri);
521       gpr_free(uri);
522       lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
523     }
524     // Add address.
525     grpc_arg arg = grpc_channel_arg_pointer_create(
526         const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
527         (void*)lb_token.payload, &lb_token_arg_vtable);
528     grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
529     addresses.emplace_back(addr, args);
530     // Clean up.
531     GRPC_MDELEM_UNREF(lb_token);
532   }
533   return addresses;
534 }
535 
536 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
537   if (serverlist_->num_servers == 0) return false;
538   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
539     if (!serverlist_->servers[i]->drop) return false;
540   }
541   return true;
542 }
543 
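// Note: each pick consumes one serverlist entry in round-robin order, so over
// time the fraction of picks that are dropped approaches the fraction of
// entries in the serverlist that are marked as drops.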
544 const char* GrpcLb::Serverlist::ShouldDrop() {
545   if (serverlist_->num_servers == 0) return nullptr;
546   grpc_grpclb_server* server = serverlist_->servers[drop_index_];
547   drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
548   return server->drop ? server->load_balance_token : nullptr;
549 }
550 
551 //
552 // GrpcLb::Picker
553 //
554 
555 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
556   PickResult result;
557   // Check if we should drop the call.
558   const char* drop_token = serverlist_->ShouldDrop();
559   if (drop_token != nullptr) {
560     // Update client load reporting stats to indicate the number of
561     // dropped calls.  Note that we have to do this here instead of in
562     // the client_load_reporting filter, because we do not create a
563     // subchannel call (and therefore no client_load_reporting filter)
564     // for dropped calls.
565     if (client_stats_ != nullptr) {
566       client_stats_->AddCallDropped(drop_token);
567     }
568     result.type = PickResult::PICK_COMPLETE;
569     return result;
570   }
571   // Forward pick to child policy.
572   result = child_picker_->Pick(args);
573   // If pick succeeded, add LB token to initial metadata.
574   if (result.type == PickResult::PICK_COMPLETE &&
575       result.connected_subchannel != nullptr) {
576     const grpc_arg* arg = grpc_channel_args_find(
577         result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
578     if (arg == nullptr) {
579       gpr_log(GPR_ERROR,
580               "[grpclb %p picker %p] No LB token for connected subchannel %p",
581               parent_, this, result.connected_subchannel.get());
582       abort();
583     }
584     grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
585     GPR_ASSERT(!GRPC_MDISNULL(lb_token));
586     grpc_linked_mdelem* mdelem_storage = static_cast<grpc_linked_mdelem*>(
587         args.call_state->Alloc(sizeof(grpc_linked_mdelem)));
588     GPR_ASSERT(grpc_metadata_batch_add_tail(
589                    args.initial_metadata, mdelem_storage,
590                    GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
591     GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
592         grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
593     if (client_stats != nullptr) {
594       client_stats->AddCallStarted();
595     }
596   }
597   return result;
598 }
599 
600 //
601 // GrpcLb::Helper
602 //
603 
604 bool GrpcLb::Helper::CalledByPendingChild() const {
605   GPR_ASSERT(child_ != nullptr);
606   return child_ == parent_->pending_child_policy_.get();
607 }
608 
609 bool GrpcLb::Helper::CalledByCurrentChild() const {
610   GPR_ASSERT(child_ != nullptr);
611   return child_ == parent_->child_policy_.get();
612 }
613 
614 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
615     const grpc_channel_args& args) {
616   if (parent_->shutting_down_ ||
617       (!CalledByPendingChild() && !CalledByCurrentChild())) {
618     return nullptr;
619   }
620   return parent_->channel_control_helper()->CreateSubchannel(args);
621 }
622 
623 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
624                                             const grpc_channel_args& args) {
625   if (parent_->shutting_down_ ||
626       (!CalledByPendingChild() && !CalledByCurrentChild())) {
627     return nullptr;
628   }
629   return parent_->channel_control_helper()->CreateChannel(target, args);
630 }
631 
632 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
633                                  UniquePtr<SubchannelPicker> picker) {
634   if (parent_->shutting_down_) return;
635   // If this request is from the pending child policy, ignore it until
636   // it reports READY, at which point we swap it into place.
637   if (CalledByPendingChild()) {
638     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
639       gpr_log(GPR_INFO,
640               "[grpclb %p helper %p] pending child policy %p reports state=%s",
641               parent_.get(), this, parent_->pending_child_policy_.get(),
642               grpc_connectivity_state_name(state));
643     }
644     if (state != GRPC_CHANNEL_READY) return;
645     grpc_pollset_set_del_pollset_set(
646         parent_->child_policy_->interested_parties(),
647         parent_->interested_parties());
648     parent_->child_policy_ = std::move(parent_->pending_child_policy_);
649   } else if (!CalledByCurrentChild()) {
650     // This request is from an outdated child, so ignore it.
651     return;
652   }
653   // Record whether child policy reports READY.
654   parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
655   // Enter fallback mode if needed.
656   parent_->MaybeEnterFallbackModeAfterStartup();
657   // There are three cases to consider here:
658   // 1. We're in fallback mode.  In this case, we're always going to use
659   //    the child policy's result, so we pass its picker through as-is.
660   // 2. The serverlist contains only drop entries.  In this case, we
661   //    want to use our own picker so that we can return the drops.
662   // 3. Not in fallback mode and serverlist is not all drops (i.e., it
663   //    may be empty or contain at least one backend address).  There are
664   //    two sub-cases:
665   //    a. The child policy is reporting state READY.  In this case, we wrap
666   //       the child's picker in our own, so that we can handle drops and LB
667   //       token metadata for each pick.
668   //    b. The child policy is reporting a state other than READY.  In this
669   //       case, we don't want to use our own picker, because we don't want
670   //       to process drops for picks that yield a QUEUE result; this would
671   //       result in dropping too many calls, since we will see the
672   //       queued picks multiple times, and we'd consider each one a
673   //       separate call for the drop calculation.
674   //
675   // Cases 1 and 3b: return picker from the child policy as-is.
676   if (parent_->serverlist_ == nullptr ||
677       (!parent_->serverlist_->ContainsAllDropEntries() &&
678        state != GRPC_CHANNEL_READY)) {
679     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
680       gpr_log(GPR_INFO,
681               "[grpclb %p helper %p] state=%s passing child picker %p as-is",
682               parent_.get(), this, grpc_connectivity_state_name(state),
683               picker.get());
684     }
685     parent_->channel_control_helper()->UpdateState(state, std::move(picker));
686     return;
687   }
688   // Cases 2 and 3a: wrap picker from the child in our own picker.
689   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
690     gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
691             parent_.get(), this, grpc_connectivity_state_name(state),
692             picker.get());
693   }
694   RefCountedPtr<GrpcLbClientStats> client_stats;
695   if (parent_->lb_calld_ != nullptr &&
696       parent_->lb_calld_->client_stats() != nullptr) {
697     client_stats = parent_->lb_calld_->client_stats()->Ref();
698   }
699   parent_->channel_control_helper()->UpdateState(
700       state, UniquePtr<SubchannelPicker>(
701                  New<Picker>(parent_.get(), parent_->serverlist_,
702                              std::move(picker), std::move(client_stats))));
703 }
704 
705 void GrpcLb::Helper::RequestReresolution() {
706   if (parent_->shutting_down_) return;
707   const LoadBalancingPolicy* latest_child_policy =
708       parent_->pending_child_policy_ != nullptr
709           ? parent_->pending_child_policy_.get()
710           : parent_->child_policy_.get();
711   if (child_ != latest_child_policy) return;
712   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
713     gpr_log(GPR_INFO,
714             "[grpclb %p] Re-resolution requested from %schild policy (%p).",
715             parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
716   }
717   // If we are talking to a balancer, we expect to get updated addresses
718   // from the balancer, so we can ignore the re-resolution request from
719   // the child policy. Otherwise, pass the re-resolution request up to the
720   // channel.
721   if (parent_->lb_calld_ == nullptr ||
722       !parent_->lb_calld_->seen_initial_response()) {
723     parent_->channel_control_helper()->RequestReresolution();
724   }
725 }
726 
727 void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
728                                    const char* message) {
729   if (parent_->shutting_down_ ||
730       (!CalledByPendingChild() && !CalledByCurrentChild())) {
731     return;
732   }
733   parent_->channel_control_helper()->AddTraceEvent(severity, message);
734 }
735 
736 //
737 // GrpcLb::BalancerCallState
738 //
739 
740 GrpcLb::BalancerCallState::BalancerCallState(
741     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
742     : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
743       grpclb_policy_(std::move(parent_grpclb_policy)) {
744   GPR_ASSERT(grpclb_policy_ != nullptr);
745   GPR_ASSERT(!grpclb_policy()->shutting_down_);
746   // Init the LB call. Note that the LB call will progress every time there's
747   // activity in grpclb_policy_->interested_parties(), which is comprised of
748   // the polling entities from client_channel.
749   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
750   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
751   const grpc_millis deadline =
752       grpclb_policy()->lb_call_timeout_ms_ == 0
753           ? GRPC_MILLIS_INF_FUTURE
754           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
755   lb_call_ = grpc_channel_create_pollset_set_call(
756       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
757       grpclb_policy_->interested_parties(),
758       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
759       nullptr, deadline, nullptr);
760   // Init the LB call request payload.
761   grpc_grpclb_request* request =
762       grpc_grpclb_request_create(grpclb_policy()->server_name_);
763   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
764   send_message_payload_ =
765       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
766   grpc_slice_unref_internal(request_payload_slice);
767   grpc_grpclb_request_destroy(request);
768   // Init other data associated with the LB call.
769   grpc_metadata_array_init(&lb_initial_metadata_recv_);
770   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
771   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
772                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
773   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
774                     OnBalancerMessageReceivedLocked, this,
775                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
776   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
777                     OnBalancerStatusReceivedLocked, this,
778                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
779 }
780 
781 GrpcLb::BalancerCallState::~BalancerCallState() {
782   GPR_ASSERT(lb_call_ != nullptr);
783   grpc_call_unref(lb_call_);
784   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
785   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
786   grpc_byte_buffer_destroy(send_message_payload_);
787   grpc_byte_buffer_destroy(recv_message_payload_);
788   grpc_slice_unref_internal(lb_call_status_details_);
789 }
790 
791 void GrpcLb::BalancerCallState::Orphan() {
792   GPR_ASSERT(lb_call_ != nullptr);
793   // If we are here because grpclb_policy wants to cancel the call,
794   // lb_on_balancer_status_received_ will complete the cancellation and clean
795   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
796   // call, in which case the following cancellation will be a no-op.
797   grpc_call_cancel(lb_call_, nullptr);
798   if (client_load_report_timer_callback_pending_) {
799     grpc_timer_cancel(&client_load_report_timer_);
800   }
801   // Note that the initial ref is held by lb_on_balancer_status_received_
802   // instead of the caller of this function. So the corresponding unref happens
803   // in lb_on_balancer_status_received_ instead of here.
804 }
805 
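// Starts the LB call as three batches: (1) send initial metadata plus the
// initial request, (2) receive initial metadata plus the first response
// message, and (3) receive trailing status.  The first two batches each take
// an extra ref on this object; the third relies on the initial ref.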
806 void GrpcLb::BalancerCallState::StartQuery() {
807   GPR_ASSERT(lb_call_ != nullptr);
808   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
809     gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
810             grpclb_policy_.get(), this, lb_call_);
811   }
812   // Create the ops.
813   grpc_call_error call_error;
814   grpc_op ops[3];
815   memset(ops, 0, sizeof(ops));
816   // Op: send initial metadata.
817   grpc_op* op = ops;
818   op->op = GRPC_OP_SEND_INITIAL_METADATA;
819   op->data.send_initial_metadata.count = 0;
820   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
821               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
822   op->reserved = nullptr;
823   op++;
824   // Op: send request message.
825   GPR_ASSERT(send_message_payload_ != nullptr);
826   op->op = GRPC_OP_SEND_MESSAGE;
827   op->data.send_message.send_message = send_message_payload_;
828   op->flags = 0;
829   op->reserved = nullptr;
830   op++;
831   // TODO(roth): We currently track this ref manually.  Once the
832   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
833   // with the callback.
834   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
835   self.release();
836   call_error = grpc_call_start_batch_and_execute(
837       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
838   GPR_ASSERT(GRPC_CALL_OK == call_error);
839   // Op: recv initial metadata.
840   op = ops;
841   op->op = GRPC_OP_RECV_INITIAL_METADATA;
842   op->data.recv_initial_metadata.recv_initial_metadata =
843       &lb_initial_metadata_recv_;
844   op->flags = 0;
845   op->reserved = nullptr;
846   op++;
847   // Op: recv response.
848   op->op = GRPC_OP_RECV_MESSAGE;
849   op->data.recv_message.recv_message = &recv_message_payload_;
850   op->flags = 0;
851   op->reserved = nullptr;
852   op++;
853   // TODO(roth): We currently track this ref manually.  Once the
854   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
855   // with the callback.
856   self = Ref(DEBUG_LOCATION, "on_message_received");
857   self.release();
858   call_error = grpc_call_start_batch_and_execute(
859       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
860   GPR_ASSERT(GRPC_CALL_OK == call_error);
861   // Op: recv server status.
862   op = ops;
863   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
864   op->data.recv_status_on_client.trailing_metadata =
865       &lb_trailing_metadata_recv_;
866   op->data.recv_status_on_client.status = &lb_call_status_;
867   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
868   op->flags = 0;
869   op->reserved = nullptr;
870   op++;
871   // This callback signals the end of the LB call, so it relies on the initial
872   // ref instead of a new ref. When it's invoked, it's the initial ref that is
873   // unreffed.
874   call_error = grpc_call_start_batch_and_execute(
875       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
876   GPR_ASSERT(GRPC_CALL_OK == call_error);
877 }
878 
879 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
880   const grpc_millis next_client_load_report_time =
881       ExecCtx::Get()->Now() + client_stats_report_interval_;
882   GRPC_CLOSURE_INIT(&client_load_report_closure_,
883                     MaybeSendClientLoadReportLocked, this,
884                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
885   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
886                   &client_load_report_closure_);
887   client_load_report_timer_callback_pending_ = true;
888 }
889 
890 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
891     void* arg, grpc_error* error) {
892   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
893   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
894   lb_calld->client_load_report_timer_callback_pending_ = false;
895   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
896     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
897     return;
898   }
899   // If we've already sent the initial request, then we can go ahead and send
900   // the load report. Otherwise, we need to wait until the initial request has
901   // been sent to send this (see OnInitialRequestSentLocked()).
902   if (lb_calld->send_message_payload_ == nullptr) {
903     lb_calld->SendClientLoadReportLocked();
904   } else {
905     lb_calld->client_load_report_is_due_ = true;
906   }
907 }
908 
909 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
910     grpc_grpclb_request* request) {
911   GrpcLbClientStats::DroppedCallCounts* drop_entries =
912       static_cast<GrpcLbClientStats::DroppedCallCounts*>(
913           request->client_stats.calls_finished_with_drop.arg);
914   return request->client_stats.num_calls_started == 0 &&
915          request->client_stats.num_calls_finished == 0 &&
916          request->client_stats.num_calls_finished_with_client_failed_to_send ==
917              0 &&
918          request->client_stats.num_calls_finished_known_received == 0 &&
919          (drop_entries == nullptr || drop_entries->size() == 0);
920 }
921 
922 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
923   // Construct message payload.
924   GPR_ASSERT(send_message_payload_ == nullptr);
925   grpc_grpclb_request* request =
926       grpc_grpclb_load_report_request_create(client_stats_.get());
927   // Skip client load report if the counters were all zero in the last
928   // report and they are still zero in this one.
929   if (LoadReportCountersAreZero(request)) {
930     if (last_client_load_report_counters_were_zero_) {
931       grpc_grpclb_request_destroy(request);
932       ScheduleNextClientLoadReportLocked();
933       return;
934     }
935     last_client_load_report_counters_were_zero_ = true;
936   } else {
937     last_client_load_report_counters_were_zero_ = false;
938   }
939   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
940   send_message_payload_ =
941       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
942   grpc_slice_unref_internal(request_payload_slice);
943   grpc_grpclb_request_destroy(request);
944   // Send the report.
945   grpc_op op;
946   memset(&op, 0, sizeof(op));
947   op.op = GRPC_OP_SEND_MESSAGE;
948   op.data.send_message.send_message = send_message_payload_;
949   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
950                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
951   grpc_call_error call_error = grpc_call_start_batch_and_execute(
952       lb_call_, &op, 1, &client_load_report_closure_);
953   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
954     gpr_log(GPR_ERROR,
955             "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
956             grpclb_policy_.get(), this, call_error);
957     GPR_ASSERT(GRPC_CALL_OK == call_error);
958   }
959 }
960 
961 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
962                                                            grpc_error* error) {
963   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
964   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
965   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
966   lb_calld->send_message_payload_ = nullptr;
967   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
968     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
969     return;
970   }
971   lb_calld->ScheduleNextClientLoadReportLocked();
972 }
973 
974 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
975                                                            grpc_error* error) {
976   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
977   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
978   lb_calld->send_message_payload_ = nullptr;
979   // If we attempted to send a client load report before the initial request was
980   // sent (and this lb_calld is still in use), send the load report now.
981   if (lb_calld->client_load_report_is_due_ &&
982       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
983     lb_calld->SendClientLoadReportLocked();
984     lb_calld->client_load_report_is_due_ = false;
985   }
986   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
987 }
988 
989 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
990     void* arg, grpc_error* error) {
991   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
992   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
993   // Null payload means the LB call was cancelled.
994   if (lb_calld != grpclb_policy->lb_calld_.get() ||
995       lb_calld->recv_message_payload_ == nullptr) {
996     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
997     return;
998   }
999   grpc_byte_buffer_reader bbr;
1000   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
1001   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1002   grpc_byte_buffer_reader_destroy(&bbr);
1003   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
1004   lb_calld->recv_message_payload_ = nullptr;
1005   grpc_grpclb_initial_response* initial_response;
1006   grpc_grpclb_serverlist* serverlist;
1007   if (!lb_calld->seen_initial_response_ &&
1008       (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
1009           nullptr) {
1010     // Have NOT seen initial response, look for initial response.
1011     if (initial_response->has_client_stats_report_interval) {
1012       lb_calld->client_stats_report_interval_ = GPR_MAX(
1013           GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
1014                               &initial_response->client_stats_report_interval));
1015       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1016         gpr_log(GPR_INFO,
1017                 "[grpclb %p] lb_calld=%p: Received initial LB response "
1018                 "message; client load reporting interval = %" PRId64
1019                 " milliseconds",
1020                 grpclb_policy, lb_calld,
1021                 lb_calld->client_stats_report_interval_);
1022       }
1023     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1024       gpr_log(GPR_INFO,
1025               "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1026               "client load reporting NOT enabled",
1027               grpclb_policy, lb_calld);
1028     }
1029     grpc_grpclb_initial_response_destroy(initial_response);
1030     lb_calld->seen_initial_response_ = true;
1031   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1032                   response_slice)) != nullptr) {
1033     // Have seen initial response, look for serverlist.
1034     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1035     auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1036     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1037       UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1038       gpr_log(GPR_INFO,
1039               "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1040               " servers received:\n%s",
1041               grpclb_policy, lb_calld, serverlist->num_servers,
1042               serverlist_text.get());
1043     }
1044     lb_calld->seen_serverlist_ = true;
1045     // Start sending client load report only after we start using the
1046     // serverlist returned from the current LB call.
1047     if (lb_calld->client_stats_report_interval_ > 0 &&
1048         lb_calld->client_stats_ == nullptr) {
1049       lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1050       // Ref held by callback.
1051       lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1052       lb_calld->ScheduleNextClientLoadReportLocked();
1053     }
1054     // Check if the serverlist differs from the previous one.
1055     if (grpclb_policy->serverlist_ != nullptr &&
1056         *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1057       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1058         gpr_log(GPR_INFO,
1059                 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1060                 "current, ignoring.",
1061                 grpclb_policy, lb_calld);
1062       }
1063     } else {  // New serverlist.
1064       // Dispose of the fallback.
1065       // TODO(roth): Ideally, we should stay in fallback mode until we
1066       // know that we can reach at least one of the backends in the new
1067       // serverlist.  Unfortunately, we can't do that, since we need to
1068       // send the new addresses to the child policy in order to determine
1069       // if they are reachable, and if we don't exit fallback mode now,
1070       // CreateOrUpdateChildPolicyLocked() will use the fallback
1071       // addresses instead of the addresses from the new serverlist.
1072       // However, if we can't reach any of the servers in the new
1073       // serverlist, then the child policy will never switch away from
1074       // the fallback addresses, but the grpclb policy will still think
1075       // that we're not in fallback mode, which means that we won't send
1076       // updates to the child policy when the fallback addresses are
1077       // updated by the resolver.  This is sub-optimal, but the only way
1078       // to fix it is to maintain a completely separate child policy for
1079       // fallback mode, and that's more work than we want to put into
1080       // the grpclb implementation at this point, since we're deprecating
1081       // it in favor of the xds policy.  We will implement this the
1082       // right way in the xds policy instead.
1083       if (grpclb_policy->fallback_mode_) {
1084         gpr_log(GPR_INFO,
1085                 "[grpclb %p] Received response from balancer; exiting "
1086                 "fallback mode",
1087                 grpclb_policy);
1088         grpclb_policy->fallback_mode_ = false;
1089       }
1090       if (grpclb_policy->fallback_at_startup_checks_pending_) {
1091         grpclb_policy->fallback_at_startup_checks_pending_ = false;
1092         grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1093         grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1094       }
1095       // Update the serverlist in the GrpcLb instance. This serverlist
1096       // instance will be destroyed either upon the next update or when the
1097       // GrpcLb instance is destroyed.
1098       grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1099       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1100     }
1101   } else {
1102     // No valid initial response or serverlist found.
1103     char* response_slice_str =
1104         grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1105     gpr_log(GPR_ERROR,
1106             "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1107             "Ignoring.",
1108             grpclb_policy, lb_calld, response_slice_str);
1109     gpr_free(response_slice_str);
1110   }
1111   grpc_slice_unref_internal(response_slice);
1112   if (!grpclb_policy->shutting_down_) {
1113     // Keep listening for serverlist updates.
1114     grpc_op op;
1115     memset(&op, 0, sizeof(op));
1116     op.op = GRPC_OP_RECV_MESSAGE;
1117     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1118     op.flags = 0;
1119     op.reserved = nullptr;
1120     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1121     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1122         lb_calld->lb_call_, &op, 1,
1123         &lb_calld->lb_on_balancer_message_received_);
1124     GPR_ASSERT(GRPC_CALL_OK == call_error);
1125   } else {
1126     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1127   }
1128 }
1129 
1130 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1131     void* arg, grpc_error* error) {
1132   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1133   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1134   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1135   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1136     char* status_details =
1137         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1138     gpr_log(GPR_INFO,
1139             "[grpclb %p] lb_calld=%p: Status from LB server received. "
1140             "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1141             grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1142             lb_calld->lb_call_, grpc_error_string(error));
1143     gpr_free(status_details);
1144   }
1145   // If this lb_calld is still in use, this call ended because of a failure so
1146   // we want to retry connecting. Otherwise, we have deliberately ended this
1147   // call and no further action is required.
1148   if (lb_calld == grpclb_policy->lb_calld_.get()) {
1149     // If the fallback-at-startup checks are pending, go into fallback mode
1150     // immediately.  This short-circuits the timeout for the fallback-at-startup
1151     // case.
1152     if (grpclb_policy->fallback_at_startup_checks_pending_) {
1153       GPR_ASSERT(!lb_calld->seen_serverlist_);
1154       gpr_log(GPR_INFO,
1155               "[grpclb %p] Balancer call finished without receiving "
1156               "serverlist; entering fallback mode",
1157               grpclb_policy);
1158       grpclb_policy->fallback_at_startup_checks_pending_ = false;
1159       grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1160       grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1161       grpclb_policy->fallback_mode_ = true;
1162       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1163     } else {
1164       // This handles the fallback-after-startup case.
1165       grpclb_policy->MaybeEnterFallbackModeAfterStartup();
1166     }
1167     grpclb_policy->lb_calld_.reset();
1168     GPR_ASSERT(!grpclb_policy->shutting_down_);
1169     grpclb_policy->channel_control_helper()->RequestReresolution();
1170     if (lb_calld->seen_initial_response_) {
1171       // If we lose connection to the LB server, reset the backoff and restart
1172       // the LB call immediately.
1173       grpclb_policy->lb_call_backoff_.Reset();
1174       grpclb_policy->StartBalancerCallLocked();
1175     } else {
1176       // If this LB call fails establishing any connection to the LB server,
1177       // retry later.
1178       grpclb_policy->StartBalancerCallRetryTimerLocked();
1179     }
1180   }
1181   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1182 }
1183 
1184 //
1185 // helper code for creating balancer channel
1186 //
1187 
1188 ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
1189   ServerAddressList balancer_addresses;
1190   for (size_t i = 0; i < addresses.size(); ++i) {
1191     if (addresses[i].IsBalancer()) {
1192       // Strip out the is_balancer channel arg, since we don't want to
1193       // recursively use the grpclb policy in the channel used to talk to
1194       // the balancers.  Note that we do NOT strip out the balancer_name
1195       // channel arg, since we need that to set the authority correctly
1196       // to talk to the balancers.
1197       static const char* args_to_remove[] = {
1198           GRPC_ARG_ADDRESS_IS_BALANCER,
1199       };
1200       balancer_addresses.emplace_back(
1201           addresses[i].address(),
1202           grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
1203                                             GPR_ARRAY_SIZE(args_to_remove)));
1204     }
1205   }
1206   return balancer_addresses;
1207 }
1208 
1209 /* Returns the channel args for the LB channel, which is used to create a
1210  * bidirectional stream for receiving load balancing updates.
1211  *
1212  * Inputs:
1213  *   - \a addresses: the addresses of the balancers.
1214  *   - \a response_generator: used to propagate address updates from the
1215  *   resolver above the grpclb policy into the LB channel.
1216  *   - \a args: other channel args inherited from the grpclb policy. */
1217 grpc_channel_args* BuildBalancerChannelArgs(
1218     const ServerAddressList& addresses,
1219     FakeResolverResponseGenerator* response_generator,
1220     const grpc_channel_args* args) {
1221   // Channel args to remove.
1222   static const char* args_to_remove[] = {
1223       // LB policy name, since we want to use the default (pick_first) in
1224       // the LB channel.
1225       GRPC_ARG_LB_POLICY_NAME,
1226       // Strip out the service config, since we don't want the LB policy
1227       // config specified for the parent channel to affect the LB channel.
1228       GRPC_ARG_SERVICE_CONFIG,
1229       // The channel arg for the server URI, since that will be different for
1230       // the LB channel than for the parent channel.  The client channel
1231       // factory will re-add this arg with the right value.
1232       GRPC_ARG_SERVER_URI,
1233       // The fake resolver response generator, because we are replacing it
1234       // with the one from the grpclb policy, used to propagate updates to
1235       // the LB channel.
1236       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1237       // The LB channel should use the authority indicated by the target
1238       // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
1239       // as opposed to the authority from the parent channel.
1240       GRPC_ARG_DEFAULT_AUTHORITY,
1241       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1242       // treated as a stand-alone channel and not inherit this argument from the
1243       // args of the parent channel.
1244       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1245       // Don't want to pass down channelz node from parent; the balancer
1246       // channel will get its own.
1247       GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1248   };
1249   // Channel args to add.
1250   InlinedVector<grpc_arg, 3> args_to_add;
1251   // The fake resolver response generator, which we use to inject
1252   // address updates into the LB channel.
1253   args_to_add.emplace_back(
1254       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1255           response_generator));
1256   // A channel arg indicating the target is a grpclb load balancer.
1257   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1258       const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1));
1259   // The parent channel's channelz uuid.
1260   channelz::ChannelNode* channelz_node = nullptr;
1261   const grpc_arg* arg =
1262       grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1263   if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
1264       arg->value.pointer.p != nullptr) {
1265     channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1266     args_to_add.emplace_back(
1267         channelz::MakeParentUuidArg(channelz_node->uuid()));
1268   }
1269   // Construct channel args.
1270   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1271       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1272       args_to_add.size());
1273   // Make any necessary modifications for security.
1274   return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
1275 }
1276 
1277 //
1278 // ctor and dtor
1279 //
1280 
1281 GrpcLb::GrpcLb(Args args)
1282     : LoadBalancingPolicy(std::move(args)),
1283       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1284       lb_call_backoff_(
1285           BackOff::Options()
1286               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1287                                    1000)
1288               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1289               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1290               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1291                                1000)) {
1292   // Initialization.
1293   GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1294                     grpc_combiner_scheduler(combiner()));
1295   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1296                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1297                     grpc_combiner_scheduler(args.combiner));
1298   // Record server name.
1299   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1300   const char* server_uri = grpc_channel_arg_get_string(arg);
1301   GPR_ASSERT(server_uri != nullptr);
1302   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1303   GPR_ASSERT(uri->path[0] != '\0');
1304   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1305   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1306     gpr_log(GPR_INFO,
1307             "[grpclb %p] Will use '%s' as the server name for LB request.",
1308             this, server_name_);
1309   }
1310   grpc_uri_destroy(uri);
1311   // Record LB call timeout.
1312   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1313   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1314   // Record fallback-at-startup timeout.
1315   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1316   fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1317       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1318 }
1319 
1320 GrpcLb::~GrpcLb() {
1321   gpr_free((void*)server_name_);
1322   grpc_channel_args_destroy(args_);
1323 }
1324 
1325 void GrpcLb::ShutdownLocked() {
1326   shutting_down_ = true;
1327   lb_calld_.reset();
1328   if (retry_timer_callback_pending_) {
1329     grpc_timer_cancel(&lb_call_retry_timer_);
1330   }
1331   if (fallback_at_startup_checks_pending_) {
1332     grpc_timer_cancel(&lb_fallback_timer_);
1333     CancelBalancerChannelConnectivityWatchLocked();
1334   }
1335   if (child_policy_ != nullptr) {
1336     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1337                                      interested_parties());
1338   }
1339   if (pending_child_policy_ != nullptr) {
1340     grpc_pollset_set_del_pollset_set(
1341         pending_child_policy_->interested_parties(), interested_parties());
1342   }
1343   child_policy_.reset();
1344   pending_child_policy_.reset();
1345   // We destroy the LB channel here instead of in our destructor because
1346   // destroying the channel triggers a last callback to
1347   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1348   // alive when that callback is invoked.
1349   if (lb_channel_ != nullptr) {
1350     grpc_channel_destroy(lb_channel_);
1351     lb_channel_ = nullptr;
1352   }
1353 }
1354 
1355 //
1356 // public methods
1357 //
1358 
1359 void GrpcLb::ResetBackoffLocked() {
1360   if (lb_channel_ != nullptr) {
1361     grpc_channel_reset_connect_backoff(lb_channel_);
1362   }
1363   if (child_policy_ != nullptr) {
1364     child_policy_->ResetBackoffLocked();
1365   }
1366   if (pending_child_policy_ != nullptr) {
1367     pending_child_policy_->ResetBackoffLocked();
1368   }
1369 }
1370 
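// Applies a resolver/config update: records the child policy config, rebuilds
// the fallback and balancer address lists, and updates any existing child
// policy.  On the initial update, this also starts the fallback-at-startup
// timer, the LB channel connectivity watch, and the balancer call.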
1371 void GrpcLb::UpdateLocked(UpdateArgs args) {
1372   const bool is_initial_update = lb_channel_ == nullptr;
1373   auto* grpclb_config =
1374       static_cast<const ParsedGrpcLbConfig*>(args.config.get());
1375   if (grpclb_config != nullptr) {
1376     child_policy_config_ = grpclb_config->child_policy();
1377   } else {
1378     child_policy_config_ = nullptr;
1379   }
1380   ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1381   // Update the existing child policy.
1382   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1383   // If this is the initial update, start the fallback-at-startup checks
1384   // and the balancer call.
1385   if (is_initial_update) {
1386     fallback_at_startup_checks_pending_ = true;
1387     // Start timer.
1388     grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1389     Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
1390     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1391     // Start watching the channel's connectivity state.  If the channel
1392     // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1393     // fallback mode even if the fallback timeout has not elapsed.
1394     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1395         grpc_channel_get_channel_stack(lb_channel_));
1396     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1397     // Ref held by callback.
1398     Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1399     grpc_client_channel_watch_connectivity_state(
1400         client_channel_elem,
1401         grpc_polling_entity_create_from_pollset_set(interested_parties()),
1402         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1403         nullptr);
1404     // Start balancer call.
1405     StartBalancerCallLocked();
1406   }
1407 }
1408 
1409 //
1410 // helpers for UpdateLocked()
1411 //
1412 
1413 // Returns the backend addresses extracted from the given addresses.
1414 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1415   void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
1416   grpc_arg arg = grpc_channel_arg_pointer_create(
1417       const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
1418       &lb_token_arg_vtable);
1419   ServerAddressList backend_addresses;
1420   for (size_t i = 0; i < addresses.size(); ++i) {
1421     if (!addresses[i].IsBalancer()) {
1422       backend_addresses.emplace_back(
1423           addresses[i].address(),
1424           grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1425     }
1426   }
1427   return backend_addresses;
1428 }
1429 
1430 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1431     const ServerAddressList& addresses, const grpc_channel_args& args) {
1432   // Update fallback address list.
1433   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1434   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1435   // since we use this to trigger the client_load_reporting filter.
1436   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1437   grpc_arg new_arg = grpc_channel_arg_string_create(
1438       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1439   grpc_channel_args_destroy(args_);
1440   args_ = grpc_channel_args_copy_and_add_and_remove(
1441       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1442   // Construct args for balancer channel.
1443   ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1444   grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1445       balancer_addresses, response_generator_.get(), &args);
1446   // Create balancer channel if needed.
1447   if (lb_channel_ == nullptr) {
1448     char* uri_str;
1449     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1450     lb_channel_ =
1451         channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
1452     GPR_ASSERT(lb_channel_ != nullptr);
1453     gpr_free(uri_str);
1454   }
1455   // Propagate updates to the LB channel (pick_first) through the fake
1456   // resolver.
1457   Resolver::Result result;
1458   result.addresses = std::move(balancer_addresses);
1459   result.args = lb_channel_args;
1460   response_generator_->SetResponse(std::move(result));
1461 }
1462 
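// Connectivity watch callback for the LB channel.  While the
// fallback-at-startup checks are pending, any state other than
// TRANSIENT_FAILURE simply renews the watch; a transition to
// TRANSIENT_FAILURE cancels the fallback timer and enters fallback mode
// immediately.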
1463 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1464                                                         grpc_error* error) {
1465   GrpcLb* self = static_cast<GrpcLb*>(arg);
1466   if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1467     if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1468       // Not in TRANSIENT_FAILURE.  Renew connectivity watch.
1469       grpc_channel_element* client_channel_elem =
1470           grpc_channel_stack_last_element(
1471               grpc_channel_get_channel_stack(self->lb_channel_));
1472       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1473       grpc_client_channel_watch_connectivity_state(
1474           client_channel_elem,
1475           grpc_polling_entity_create_from_pollset_set(
1476               self->interested_parties()),
1477           &self->lb_channel_connectivity_,
1478           &self->lb_channel_on_connectivity_changed_, nullptr);
1479       return;  // Early out so we don't drop the ref below.
1480     }
1481     // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
1482     // fallback mode immediately.
1483     gpr_log(GPR_INFO,
1484             "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1485             "entering fallback mode",
1486             self);
1487     self->fallback_at_startup_checks_pending_ = false;
1488     grpc_timer_cancel(&self->lb_fallback_timer_);
1489     self->fallback_mode_ = true;
1490     self->CreateOrUpdateChildPolicyLocked();
1491   }
1492   // Done watching connectivity state, so drop ref.
1493   self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1494 }
1495 
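// Cancels the connectivity watch started in UpdateLocked().  Passing a null
// state pointer to grpc_client_channel_watch_connectivity_state() is what
// requests cancellation of the pending watch for this closure.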
1496 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1497   grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1498       grpc_channel_get_channel_stack(lb_channel_));
1499   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1500   grpc_client_channel_watch_connectivity_state(
1501       client_channel_elem,
1502       grpc_polling_entity_create_from_pollset_set(interested_parties()),
1503       nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1504 }
1505 
1506 //
1507 // code for balancer channel and call
1508 //
1509 
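// Starts the streaming call to the balancer: creates a new BalancerCallState
// and kicks off its query.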
1510 void GrpcLb::StartBalancerCallLocked() {
1511   GPR_ASSERT(lb_channel_ != nullptr);
1512   if (shutting_down_) return;
1513   // Init the LB call data.
1514   GPR_ASSERT(lb_calld_ == nullptr);
1515   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1516   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1517     gpr_log(GPR_INFO,
1518             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1519             this, lb_channel_, lb_calld_.get());
1520   }
1521   lb_calld_->StartQuery();
1522 }
1523 
1524 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1525   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1526   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1527     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1528     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1529     if (timeout > 0) {
1530       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1531               this, timeout);
1532     } else {
1533       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1534               this);
1535     }
1536   }
1537   // TODO(roth): We currently track this ref manually.  Once the
1538   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1539   // with the callback.
1540   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1541   self.release();
1542   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1543                     this, grpc_combiner_scheduler(combiner()));
1544   retry_timer_callback_pending_ = true;
1545   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1546 }
1547 
1548 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1549   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1550   grpclb_policy->retry_timer_callback_pending_ = false;
1551   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1552       grpclb_policy->lb_calld_ == nullptr) {
1553     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1554       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1555               grpclb_policy);
1556     }
1557     grpclb_policy->StartBalancerCallLocked();
1558   }
1559   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1560 }
1561 
1562 //
1563 // code for handling fallback mode
1564 //
1565 
1566 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1567   // Enter fallback mode if all of the following are true:
1568   // - We are not currently in fallback mode.
1569   // - We are not currently waiting for the initial fallback timeout.
1570   // - We are not currently in contact with the balancer.
1571   // - The child policy is not in state READY.
1572   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1573       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1574       !child_policy_ready_) {
1575     gpr_log(GPR_INFO,
1576             "[grpclb %p] lost contact with balancer and backends from "
1577             "most recent serverlist; entering fallback mode",
1578             this);
1579     fallback_mode_ = true;
1580     CreateOrUpdateChildPolicyLocked();
1581   }
1582 }
1583 
1584 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1585   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1586   // If we receive a serverlist after the timer fires but before this callback
1587   // actually runs, don't fall back.
1588   if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1589       !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1590     gpr_log(GPR_INFO,
1591             "[grpclb %p] No response from balancer after fallback timeout; "
1592             "entering fallback mode",
1593             grpclb_policy);
1594     grpclb_policy->fallback_at_startup_checks_pending_ = false;
1595     grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1596     grpclb_policy->fallback_mode_ = true;
1597     grpclb_policy->CreateOrUpdateChildPolicyLocked();
1598   }
1599   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1600 }
1601 
1602 //
1603 // code for interacting with the child policy
1604 //
1605 
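// Builds the channel args handed to the child policy.  Records whether the
// backend addresses came from the grpclb balancer and, if they did, also sets
// GRPC_ARG_INHIBIT_HEALTH_CHECKING on the child policy's args.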
1606 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1607     bool is_backend_from_grpclb_load_balancer) {
1608   InlinedVector<grpc_arg, 2> args_to_add;
1609   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1610       const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1611       is_backend_from_grpclb_load_balancer));
1612   if (is_backend_from_grpclb_load_balancer) {
1613     args_to_add.emplace_back(grpc_channel_arg_integer_create(
1614         const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
1615   }
1616   return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
1617                                         args_to_add.size());
1618 }
1619 
1620 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1621     const char* name, const grpc_channel_args* args) {
1622   Helper* helper = New<Helper>(Ref());
1623   LoadBalancingPolicy::Args lb_policy_args;
1624   lb_policy_args.combiner = combiner();
1625   lb_policy_args.args = args;
1626   lb_policy_args.channel_control_helper =
1627       UniquePtr<ChannelControlHelper>(helper);
1628   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1629       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1630           name, std::move(lb_policy_args));
1631   if (GPR_UNLIKELY(lb_policy == nullptr)) {
1632     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1633             name);
1634     return nullptr;
1635   }
1636   helper->set_child(lb_policy.get());
1637   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1638     gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1639             name, lb_policy.get());
1640   }
1641   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1642   // created child policy.  This allows the child policy to make progress
1643   // whenever there is activity on gRPC LB, which is tied to the application's calls.
1644   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1645                                    interested_parties());
1646   return lb_policy;
1647 }
1648 
1649 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1650   if (shutting_down_) return;
1651   // Construct update args.
1652   UpdateArgs update_args;
1653   bool is_backend_from_grpclb_load_balancer = false;
1654   if (fallback_mode_) {
1655     // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1656     // received any serverlist from the balancer, we use the fallback backends
1657     // returned by the resolver. Note that the fallback backend list may be
1658     // empty, in which case the new round_robin policy will keep the requested
1659     // picks pending.
1660     update_args.addresses = fallback_backend_addresses_;
1661   } else {
1662     update_args.addresses = serverlist_->GetServerAddressList(
1663         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1664     is_backend_from_grpclb_load_balancer = true;
1665   }
1666   update_args.args =
1667       CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1668   GPR_ASSERT(update_args.args != nullptr);
1669   update_args.config = child_policy_config_;
1670   // If the child policy name changes, we need to create a new child
1671   // policy.  When this happens, we leave child_policy_ as-is and store
1672   // the new child policy in pending_child_policy_.  Once the new child
1673   // policy transitions into state READY, we swap it into child_policy_,
1674   // replacing the original child policy.  So pending_child_policy_ is
1675   // non-null only between when we apply an update that changes the child
1676   // policy name and when the new child reports state READY.
1677   //
1678   // Updates can arrive at any point during this transition.  We always
1679   // apply updates relative to the most recently created child policy,
1680   // even if the most recent one is still in pending_child_policy_.  This
1681   // is true both when applying the updates to an existing child policy
1682   // and when determining whether we need to create a new policy.
1683   //
1684   // As a result of this, there are several cases to consider here:
1685   //
1686   // 1. We have no existing child policy (i.e., we have started up but
1687   //    have not yet received a serverlist from the balancer or gone
1688   //    into fallback mode; in this case, both child_policy_ and
1689   //    pending_child_policy_ are null).  In this case, we create a
1690   //    new child policy and store it in child_policy_.
1691   //
1692   // 2. We have an existing child policy and have no pending child policy
1693   //    from a previous update (i.e., either there has not been a
1694   //    previous update that changed the policy name, or we have already
1695   //    finished swapping in the new policy; in this case, child_policy_
1696   //    is non-null but pending_child_policy_ is null).  In this case:
1697   //    a. If child_policy_->name() equals child_policy_name, then we
1698   //       update the existing child policy.
1699   //    b. If child_policy_->name() does not equal child_policy_name,
1700   //       we create a new policy.  The policy will be stored in
1701   //       pending_child_policy_ and will later be swapped into
1702   //       child_policy_ by the helper when the new child transitions
1703   //       into state READY.
1704   //
1705   // 3. We have an existing child policy and have a pending child policy
1706   //    from a previous update (i.e., a previous update set
1707   //    pending_child_policy_ as per case 2b above and that policy has
1708   //    not yet transitioned into state READY and been swapped into
1709   //    child_policy_; in this case, both child_policy_ and
1710   //    pending_child_policy_ are non-null).  In this case:
1711   //    a. If pending_child_policy_->name() equals child_policy_name,
1712   //       then we update the existing pending child policy.
1713   //    b. If pending_child_policy_->name() does not equal
1714   //       child_policy_name, then we create a new policy.  The new
1715   //       policy is stored in pending_child_policy_ (replacing the one
1716   //       that was there before, which will be immediately shut down)
1717   //       and will later be swapped into child_policy_ by the helper
1718   //       when the new child transitions into state READY.
1719   const char* child_policy_name = child_policy_config_ == nullptr
1720                                       ? "round_robin"
1721                                       : child_policy_config_->name();
1722   const bool create_policy =
1723       // case 1
1724       child_policy_ == nullptr ||
1725       // case 2b
1726       (pending_child_policy_ == nullptr &&
1727        strcmp(child_policy_->name(), child_policy_name) != 0) ||
1728       // case 3b
1729       (pending_child_policy_ != nullptr &&
1730        strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1731   LoadBalancingPolicy* policy_to_update = nullptr;
1732   if (create_policy) {
1733     // Cases 1, 2b, and 3b: create a new child policy.
1734     // If child_policy_ is null, we set it (case 1), else we set
1735     // pending_child_policy_ (cases 2b and 3b).
1736     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1737       gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1738               child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1739     }
1740     // Swap the policy into place.
1741     auto& lb_policy =
1742         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1743     lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
1744     policy_to_update = lb_policy.get();
1745   } else {
1746     // Cases 2a and 3a: update an existing policy.
1747     // If we have a pending child policy, send the update to the pending
1748     // policy (case 3a), else send it to the current policy (case 2a).
1749     policy_to_update = pending_child_policy_ != nullptr
1750                            ? pending_child_policy_.get()
1751                            : child_policy_.get();
1752   }
1753   GPR_ASSERT(policy_to_update != nullptr);
1754   // Update the policy.
1755   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1756     gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1757             policy_to_update == pending_child_policy_.get() ? "pending " : "",
1758             policy_to_update);
1759   }
1760   policy_to_update->UpdateLocked(std::move(update_args));
1761 }
1762 
1763 //
1764 // factory
1765 //
1766 
1767 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1768  public:
1769   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1770       LoadBalancingPolicy::Args args) const override {
1771     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1772   }
1773 
1774   const char* name() const override { return kGrpclb; }
1775 
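  // Parses the grpclb policy's JSON config.  The only field recognized here
  // is "childPolicy"; an illustrative (hypothetical, not taken from this
  // file) config would look like:
  //   { "childPolicy": [ { "round_robin": {} } ] }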
1776   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1777       const grpc_json* json, grpc_error** error) const override {
1778     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1779     if (json == nullptr) {
1780       return RefCountedPtr<LoadBalancingPolicy::Config>(
1781           New<ParsedGrpcLbConfig>(nullptr));
1782     }
1783     InlinedVector<grpc_error*, 2> error_list;
1784     RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1785     for (const grpc_json* field = json->child; field != nullptr;
1786          field = field->next) {
1787       if (field->key == nullptr) continue;
1788       if (strcmp(field->key, "childPolicy") == 0) {
1789         if (child_policy != nullptr) {
1790           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1791               "field:childPolicy error:Duplicate entry"));
1792         }
1793         grpc_error* parse_error = GRPC_ERROR_NONE;
1794         child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1795             field, &parse_error);
1796         if (parse_error != GRPC_ERROR_NONE) {
1797           error_list.push_back(parse_error);
1798         }
1799       }
1800     }
1801     if (error_list.empty()) {
1802       return RefCountedPtr<LoadBalancingPolicy::Config>(
1803           New<ParsedGrpcLbConfig>(std::move(child_policy)));
1804     } else {
1805       *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
1806       return nullptr;
1807     }
1808   }
1809 };
1810 
1811 }  // namespace
1812 
1813 }  // namespace grpc_core
1814 
1815 //
1816 // Plugin registration
1817 //
1818 
1819 namespace {
1820 
1821 // Only add client_load_reporting filter if the grpclb LB policy is used.
1822 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1823                                             void* arg) {
1824   const grpc_channel_args* args =
1825       grpc_channel_stack_builder_get_channel_arguments(builder);
1826   const grpc_arg* channel_arg =
1827       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1828   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1829       strcmp(channel_arg->value.string, "grpclb") == 0) {
1830     return grpc_channel_stack_builder_append_filter(
1831         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1832   }
1833   return true;
1834 }
1835 
1836 }  // namespace
1837 
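// Registers the grpclb policy factory and a channel-init stage that appends
// the client_load_reporting filter to subchannels whose configured LB policy
// name is "grpclb".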
1838 void grpc_lb_policy_grpclb_init() {
1839   grpc_core::LoadBalancingPolicyRegistry::Builder::
1840       RegisterLoadBalancingPolicyFactory(
1841           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1842               grpc_core::New<grpc_core::GrpcLbFactory>()));
1843   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1844                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1845                                    maybe_add_client_load_reporting_filter,
1846                                    (void*)&grpc_client_load_reporting_filter);
1847 }
1848 
1849 void grpc_lb_policy_grpclb_shutdown() {}
1850