1 /*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
/// An internal channel (\a lb_channel_) is created for the addresses
/// that are balancers. This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose). The call
31 /// will be complete when either the balancer sends status or when we cancel
/// the call (e.g., because we are shutting down). If needed, we retry the
33 /// call. If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends. Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses. If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
43 /// resolver.
44 ///
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance. Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
50 ///
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
53
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
59
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
62
63 #include <inttypes.h>
64 #include <limits.h>
65 #include <string.h>
66
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
72
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/string.h"
88 #include "src/core/lib/gprpp/manual_constructor.h"
89 #include "src/core/lib/gprpp/memory.h"
90 #include "src/core/lib/gprpp/orphanable.h"
91 #include "src/core/lib/gprpp/ref_counted_ptr.h"
92 #include "src/core/lib/iomgr/combiner.h"
93 #include "src/core/lib/iomgr/sockaddr.h"
94 #include "src/core/lib/iomgr/sockaddr_utils.h"
95 #include "src/core/lib/iomgr/timer.h"
96 #include "src/core/lib/slice/slice_hash_table.h"
97 #include "src/core/lib/slice/slice_internal.h"
98 #include "src/core/lib/slice/slice_string_helpers.h"
99 #include "src/core/lib/surface/call.h"
100 #include "src/core/lib/surface/channel.h"
101 #include "src/core/lib/surface/channel_init.h"
102 #include "src/core/lib/transport/static_metadata.h"
103
// Back-off tuning knobs for reconnecting to the balancer.
// NOTE(review): presumably these feed lb_call_backoff_; the place where they
// are consumed is outside this part of the file -- confirm.
#define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
#define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Default fallback timeout, in milliseconds (per the _MS suffix).
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000

// Key of the per-address channel arg that carries the LB token: it is attached
// to each backend address in Serverlist::GetServerAddressList() and looked up
// again on the connected subchannel in Picker::Pick().
#define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
111
112 namespace grpc_core {
113
// Trace flag ("glb") that gates the "[grpclb ...]" log statements in this
// file via GRPC_TRACE_FLAG_ENABLED().
TraceFlag grpc_lb_glb_trace(false, "glb");
115
116 namespace {
117
// Policy name reported by name() on both ParsedGrpcLbConfig and GrpcLb.
constexpr char kGrpclb[] = "grpclb";
119
120 class ParsedGrpcLbConfig : public LoadBalancingPolicy::Config {
121 public:
ParsedGrpcLbConfig(RefCountedPtr<LoadBalancingPolicy::Config> child_policy)122 explicit ParsedGrpcLbConfig(
123 RefCountedPtr<LoadBalancingPolicy::Config> child_policy)
124 : child_policy_(std::move(child_policy)) {}
name() const125 const char* name() const override { return kGrpclb; }
126
child_policy() const127 RefCountedPtr<LoadBalancingPolicy::Config> child_policy() const {
128 return child_policy_;
129 }
130
131 private:
132 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_;
133 };
134
135 class GrpcLb : public LoadBalancingPolicy {
136 public:
137 explicit GrpcLb(Args args);
138
name() const139 const char* name() const override { return kGrpclb; }
140
141 void UpdateLocked(UpdateArgs args) override;
142 void ResetBackoffLocked() override;
143
144 private:
145 /// Contains a call to the LB server and all the data related to the call.
146 class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
147 public:
148 explicit BalancerCallState(
149 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
150
151 // It's the caller's responsibility to ensure that Orphan() is called from
152 // inside the combiner.
153 void Orphan() override;
154
155 void StartQuery();
156
client_stats() const157 GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
158
seen_initial_response() const159 bool seen_initial_response() const { return seen_initial_response_; }
seen_serverlist() const160 bool seen_serverlist() const { return seen_serverlist_; }
161
162 private:
163 GRPC_ALLOW_CLASS_TO_USE_NON_PUBLIC_DELETE
164
165 ~BalancerCallState();
166
grpclb_policy() const167 GrpcLb* grpclb_policy() const {
168 return static_cast<GrpcLb*>(grpclb_policy_.get());
169 }
170
171 void ScheduleNextClientLoadReportLocked();
172 void SendClientLoadReportLocked();
173
174 static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
175
176 static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
177 static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
178 static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
179 static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
180 static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
181
182 // The owning LB policy.
183 RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
184
185 // The streaming call to the LB server. Always non-NULL.
186 grpc_call* lb_call_ = nullptr;
187
188 // recv_initial_metadata
189 grpc_metadata_array lb_initial_metadata_recv_;
190
191 // send_message
192 grpc_byte_buffer* send_message_payload_ = nullptr;
193 grpc_closure lb_on_initial_request_sent_;
194
195 // recv_message
196 grpc_byte_buffer* recv_message_payload_ = nullptr;
197 grpc_closure lb_on_balancer_message_received_;
198 bool seen_initial_response_ = false;
199 bool seen_serverlist_ = false;
200
201 // recv_trailing_metadata
202 grpc_closure lb_on_balancer_status_received_;
203 grpc_metadata_array lb_trailing_metadata_recv_;
204 grpc_status_code lb_call_status_;
205 grpc_slice lb_call_status_details_;
206
207 // The stats for client-side load reporting associated with this LB call.
208 // Created after the first serverlist is received.
209 RefCountedPtr<GrpcLbClientStats> client_stats_;
210 grpc_millis client_stats_report_interval_ = 0;
211 grpc_timer client_load_report_timer_;
212 bool client_load_report_timer_callback_pending_ = false;
213 bool last_client_load_report_counters_were_zero_ = false;
214 bool client_load_report_is_due_ = false;
215 // The closure used for either the load report timer or the callback for
216 // completion of sending the load report.
217 grpc_closure client_load_report_closure_;
218 };
219
220 class Serverlist : public RefCounted<Serverlist> {
221 public:
222 // Takes ownership of serverlist.
Serverlist(grpc_grpclb_serverlist * serverlist)223 explicit Serverlist(grpc_grpclb_serverlist* serverlist)
224 : serverlist_(serverlist) {}
225
~Serverlist()226 ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
227
228 bool operator==(const Serverlist& other) const;
229
serverlist() const230 const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
231
232 // Returns a text representation suitable for logging.
233 UniquePtr<char> AsText() const;
234
235 // Extracts all non-drop entries into a ServerAddressList.
236 ServerAddressList GetServerAddressList(
237 GrpcLbClientStats* client_stats) const;
238
239 // Returns true if the serverlist contains at least one drop entry and
240 // no backend address entries.
241 bool ContainsAllDropEntries() const;
242
243 // Returns the LB token to use for a drop, or null if the call
244 // should not be dropped.
245 //
246 // Note: This is called from the picker, so it will be invoked in
247 // the channel's data plane combiner, NOT the control plane
248 // combiner. It should not be accessed by any other part of the LB
249 // policy.
250 const char* ShouldDrop();
251
252 private:
253 grpc_grpclb_serverlist* serverlist_;
254
255 // Guarded by the channel's data plane combiner, NOT the control
256 // plane combiner. It should not be accessed by anything but the
257 // picker via the ShouldDrop() method.
258 size_t drop_index_ = 0;
259 };
260
261 class Picker : public SubchannelPicker {
262 public:
Picker(GrpcLb * parent,RefCountedPtr<Serverlist> serverlist,UniquePtr<SubchannelPicker> child_picker,RefCountedPtr<GrpcLbClientStats> client_stats)263 Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
264 UniquePtr<SubchannelPicker> child_picker,
265 RefCountedPtr<GrpcLbClientStats> client_stats)
266 : parent_(parent),
267 serverlist_(std::move(serverlist)),
268 child_picker_(std::move(child_picker)),
269 client_stats_(std::move(client_stats)) {}
270
271 PickResult Pick(PickArgs args) override;
272
273 private:
274 // Storing the address for logging, but not holding a ref.
275 // DO NOT DEFERENCE!
276 GrpcLb* parent_;
277
278 // Serverlist to be used for determining drops.
279 RefCountedPtr<Serverlist> serverlist_;
280
281 UniquePtr<SubchannelPicker> child_picker_;
282 RefCountedPtr<GrpcLbClientStats> client_stats_;
283 };
284
285 class Helper : public ChannelControlHelper {
286 public:
Helper(RefCountedPtr<GrpcLb> parent)287 explicit Helper(RefCountedPtr<GrpcLb> parent)
288 : parent_(std::move(parent)) {}
289
290 RefCountedPtr<SubchannelInterface> CreateSubchannel(
291 const grpc_channel_args& args) override;
292 grpc_channel* CreateChannel(const char* target,
293 const grpc_channel_args& args) override;
294 void UpdateState(grpc_connectivity_state state,
295 UniquePtr<SubchannelPicker> picker) override;
296 void RequestReresolution() override;
297 void AddTraceEvent(TraceSeverity severity, const char* message) override;
298
set_child(LoadBalancingPolicy * child)299 void set_child(LoadBalancingPolicy* child) { child_ = child; }
300
301 private:
302 bool CalledByPendingChild() const;
303 bool CalledByCurrentChild() const;
304
305 RefCountedPtr<GrpcLb> parent_;
306 LoadBalancingPolicy* child_ = nullptr;
307 };
308
309 ~GrpcLb();
310
311 void ShutdownLocked() override;
312
313 // Helper functions used in UpdateLocked().
314 void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
315 const grpc_channel_args& args);
316 static void OnBalancerChannelConnectivityChangedLocked(void* arg,
317 grpc_error* error);
318 void CancelBalancerChannelConnectivityWatchLocked();
319
320 // Methods for dealing with fallback state.
321 void MaybeEnterFallbackModeAfterStartup();
322 static void OnFallbackTimerLocked(void* arg, grpc_error* error);
323
324 // Methods for dealing with the balancer call.
325 void StartBalancerCallLocked();
326 void StartBalancerCallRetryTimerLocked();
327 static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
328
329 // Methods for dealing with the child policy.
330 grpc_channel_args* CreateChildPolicyArgsLocked(
331 bool is_backend_from_grpclb_load_balancer);
332 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
333 const char* name, const grpc_channel_args* args);
334 void CreateOrUpdateChildPolicyLocked();
335
336 // Who the client is trying to communicate with.
337 const char* server_name_ = nullptr;
338
339 // Current channel args from the resolver.
340 grpc_channel_args* args_ = nullptr;
341
342 // Internal state.
343 bool shutting_down_ = false;
344
345 // The channel for communicating with the LB server.
346 grpc_channel* lb_channel_ = nullptr;
347 // Response generator to inject address updates into lb_channel_.
348 RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
349
350 // The data associated with the current LB call. It holds a ref to this LB
351 // policy. It's initialized every time we query for backends. It's reset to
352 // NULL whenever the current LB call is no longer needed (e.g., the LB policy
353 // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
354 // contains a non-NULL lb_call_.
355 OrphanablePtr<BalancerCallState> lb_calld_;
356 // Timeout in milliseconds for the LB call. 0 means no deadline.
357 int lb_call_timeout_ms_ = 0;
358 // Balancer call retry state.
359 BackOff lb_call_backoff_;
360 bool retry_timer_callback_pending_ = false;
361 grpc_timer lb_call_retry_timer_;
362 grpc_closure lb_on_call_retry_;
363
364 // The deserialized response from the balancer. May be nullptr until one
365 // such response has arrived.
366 RefCountedPtr<Serverlist> serverlist_;
367
368 // Whether we're in fallback mode.
369 bool fallback_mode_ = false;
370 // The backend addresses from the resolver.
371 ServerAddressList fallback_backend_addresses_;
372 // State for fallback-at-startup checks.
373 // Timeout after startup after which we will go into fallback mode if
374 // we have not received a serverlist from the balancer.
375 int fallback_at_startup_timeout_ = 0;
376 bool fallback_at_startup_checks_pending_ = false;
377 grpc_timer lb_fallback_timer_;
378 grpc_closure lb_on_fallback_;
379 grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
380 grpc_closure lb_channel_on_connectivity_changed_;
381
382 // The child policy to use for the backends.
383 OrphanablePtr<LoadBalancingPolicy> child_policy_;
384 // When switching child policies, the new policy will be stored here
385 // until it reports READY, at which point it will be moved to child_policy_.
386 OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
387 // The child policy config.
388 RefCountedPtr<LoadBalancingPolicy::Config> child_policy_config_;
389 // Child policy in state READY.
390 bool child_policy_ready_ = false;
391 };
392
393 //
394 // GrpcLb::Serverlist
395 //
396
operator ==(const Serverlist & other) const397 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
398 return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
399 }
400
ParseServer(const grpc_grpclb_server * server,grpc_resolved_address * addr)401 void ParseServer(const grpc_grpclb_server* server,
402 grpc_resolved_address* addr) {
403 memset(addr, 0, sizeof(*addr));
404 if (server->drop) return;
405 const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
406 /* the addresses are given in binary format (a in(6)_addr struct) in
407 * server->ip_address.bytes. */
408 const grpc_grpclb_ip_address* ip = &server->ip_address;
409 if (ip->size == 4) {
410 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
411 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
412 addr4->sin_family = GRPC_AF_INET;
413 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
414 addr4->sin_port = netorder_port;
415 } else if (ip->size == 16) {
416 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
417 grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
418 addr6->sin6_family = GRPC_AF_INET6;
419 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
420 addr6->sin6_port = netorder_port;
421 }
422 }
423
AsText() const424 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
425 gpr_strvec entries;
426 gpr_strvec_init(&entries);
427 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
428 const auto* server = serverlist_->servers[i];
429 char* ipport;
430 if (server->drop) {
431 ipport = gpr_strdup("(drop)");
432 } else {
433 grpc_resolved_address addr;
434 ParseServer(server, &addr);
435 grpc_sockaddr_to_string(&ipport, &addr, false);
436 }
437 char* entry;
438 gpr_asprintf(&entry, " %" PRIuPTR ": %s token=%s\n", i, ipport,
439 server->load_balance_token);
440 gpr_free(ipport);
441 gpr_strvec_add(&entries, entry);
442 }
443 UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
444 gpr_strvec_destroy(&entries);
445 return result;
446 }
447
448 // vtable for LB token channel arg.
lb_token_copy(void * token)449 void* lb_token_copy(void* token) {
450 return token == nullptr
451 ? nullptr
452 : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
453 }
lb_token_destroy(void * token)454 void lb_token_destroy(void* token) {
455 if (token != nullptr) {
456 GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
457 }
458 }
// "cmp" hook for the LB token channel arg. Both arguments are deliberately
// ignored and every pair of tokens compares equal: the token must not
// affect the subchannel's key in the index.
int lb_token_cmp(void* token1, void* token2) {
  constexpr int kAlwaysEqual = 0;
  return kAlwaysEqual;
}
// Pointer-arg vtable used for the GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN channel
// arg; wires up the three lb_token_* helpers (copy, destroy, compare).
const grpc_arg_pointer_vtable lb_token_arg_vtable = {
    lb_token_copy, lb_token_destroy, lb_token_cmp};
466
IsServerValid(const grpc_grpclb_server * server,size_t idx,bool log)467 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
468 if (server->drop) return false;
469 const grpc_grpclb_ip_address* ip = &server->ip_address;
470 if (GPR_UNLIKELY(server->port >> 16 != 0)) {
471 if (log) {
472 gpr_log(GPR_ERROR,
473 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
474 server->port, (unsigned long)idx);
475 }
476 return false;
477 }
478 if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
479 if (log) {
480 gpr_log(GPR_ERROR,
481 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
482 "serverlist. Ignoring",
483 ip->size, (unsigned long)idx);
484 }
485 return false;
486 }
487 return true;
488 }
489
490 // Returns addresses extracted from the serverlist.
GetServerAddressList(GrpcLbClientStats * client_stats) const491 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
492 GrpcLbClientStats* client_stats) const {
493 ServerAddressList addresses;
494 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
495 const grpc_grpclb_server* server = serverlist_->servers[i];
496 if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
497 // Address processing.
498 grpc_resolved_address addr;
499 ParseServer(server, &addr);
500 // LB token processing.
501 grpc_mdelem lb_token;
502 if (server->has_load_balance_token) {
503 const size_t lb_token_max_length =
504 GPR_ARRAY_SIZE(server->load_balance_token);
505 const size_t lb_token_length =
506 strnlen(server->load_balance_token, lb_token_max_length);
507 grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
508 server->load_balance_token, lb_token_length);
509 lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
510 if (client_stats != nullptr) {
511 GPR_ASSERT(grpc_mdelem_set_user_data(
512 lb_token, GrpcLbClientStats::Destroy,
513 client_stats->Ref().release()) == client_stats);
514 }
515 } else {
516 char* uri = grpc_sockaddr_to_uri(&addr);
517 gpr_log(GPR_INFO,
518 "Missing LB token for backend address '%s'. The empty token will "
519 "be used instead",
520 uri);
521 gpr_free(uri);
522 lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
523 }
524 // Add address.
525 grpc_arg arg = grpc_channel_arg_pointer_create(
526 const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
527 (void*)lb_token.payload, &lb_token_arg_vtable);
528 grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
529 addresses.emplace_back(addr, args);
530 // Clean up.
531 GRPC_MDELEM_UNREF(lb_token);
532 }
533 return addresses;
534 }
535
ContainsAllDropEntries() const536 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
537 if (serverlist_->num_servers == 0) return false;
538 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
539 if (!serverlist_->servers[i]->drop) return false;
540 }
541 return true;
542 }
543
ShouldDrop()544 const char* GrpcLb::Serverlist::ShouldDrop() {
545 if (serverlist_->num_servers == 0) return nullptr;
546 grpc_grpclb_server* server = serverlist_->servers[drop_index_];
547 drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
548 return server->drop ? server->load_balance_token : nullptr;
549 }
550
551 //
552 // GrpcLb::Picker
553 //
554
Pick(PickArgs args)555 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
556 PickResult result;
557 // Check if we should drop the call.
558 const char* drop_token = serverlist_->ShouldDrop();
559 if (drop_token != nullptr) {
560 // Update client load reporting stats to indicate the number of
561 // dropped calls. Note that we have to do this here instead of in
562 // the client_load_reporting filter, because we do not create a
563 // subchannel call (and therefore no client_load_reporting filter)
564 // for dropped calls.
565 if (client_stats_ != nullptr) {
566 client_stats_->AddCallDropped(drop_token);
567 }
568 result.type = PickResult::PICK_COMPLETE;
569 return result;
570 }
571 // Forward pick to child policy.
572 result = child_picker_->Pick(args);
573 // If pick succeeded, add LB token to initial metadata.
574 if (result.type == PickResult::PICK_COMPLETE &&
575 result.connected_subchannel != nullptr) {
576 const grpc_arg* arg = grpc_channel_args_find(
577 result.connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
578 if (arg == nullptr) {
579 gpr_log(GPR_ERROR,
580 "[grpclb %p picker %p] No LB token for connected subchannel %p",
581 parent_, this, result.connected_subchannel.get());
582 abort();
583 }
584 grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
585 GPR_ASSERT(!GRPC_MDISNULL(lb_token));
586 grpc_linked_mdelem* mdelem_storage = static_cast<grpc_linked_mdelem*>(
587 args.call_state->Alloc(sizeof(grpc_linked_mdelem)));
588 GPR_ASSERT(grpc_metadata_batch_add_tail(
589 args.initial_metadata, mdelem_storage,
590 GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
591 GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
592 grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
593 if (client_stats != nullptr) {
594 client_stats->AddCallStarted();
595 }
596 }
597 return result;
598 }
599
600 //
601 // GrpcLb::Helper
602 //
603
CalledByPendingChild() const604 bool GrpcLb::Helper::CalledByPendingChild() const {
605 GPR_ASSERT(child_ != nullptr);
606 return child_ == parent_->pending_child_policy_.get();
607 }
608
CalledByCurrentChild() const609 bool GrpcLb::Helper::CalledByCurrentChild() const {
610 GPR_ASSERT(child_ != nullptr);
611 return child_ == parent_->child_policy_.get();
612 }
613
CreateSubchannel(const grpc_channel_args & args)614 RefCountedPtr<SubchannelInterface> GrpcLb::Helper::CreateSubchannel(
615 const grpc_channel_args& args) {
616 if (parent_->shutting_down_ ||
617 (!CalledByPendingChild() && !CalledByCurrentChild())) {
618 return nullptr;
619 }
620 return parent_->channel_control_helper()->CreateSubchannel(args);
621 }
622
CreateChannel(const char * target,const grpc_channel_args & args)623 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
624 const grpc_channel_args& args) {
625 if (parent_->shutting_down_ ||
626 (!CalledByPendingChild() && !CalledByCurrentChild())) {
627 return nullptr;
628 }
629 return parent_->channel_control_helper()->CreateChannel(target, args);
630 }
631
UpdateState(grpc_connectivity_state state,UniquePtr<SubchannelPicker> picker)632 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
633 UniquePtr<SubchannelPicker> picker) {
634 if (parent_->shutting_down_) return;
635 // If this request is from the pending child policy, ignore it until
636 // it reports READY, at which point we swap it into place.
637 if (CalledByPendingChild()) {
638 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
639 gpr_log(GPR_INFO,
640 "[grpclb %p helper %p] pending child policy %p reports state=%s",
641 parent_.get(), this, parent_->pending_child_policy_.get(),
642 grpc_connectivity_state_name(state));
643 }
644 if (state != GRPC_CHANNEL_READY) return;
645 grpc_pollset_set_del_pollset_set(
646 parent_->child_policy_->interested_parties(),
647 parent_->interested_parties());
648 parent_->child_policy_ = std::move(parent_->pending_child_policy_);
649 } else if (!CalledByCurrentChild()) {
650 // This request is from an outdated child, so ignore it.
651 return;
652 }
653 // Record whether child policy reports READY.
654 parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
655 // Enter fallback mode if needed.
656 parent_->MaybeEnterFallbackModeAfterStartup();
657 // There are three cases to consider here:
658 // 1. We're in fallback mode. In this case, we're always going to use
659 // the child policy's result, so we pass its picker through as-is.
660 // 2. The serverlist contains only drop entries. In this case, we
661 // want to use our own picker so that we can return the drops.
662 // 3. Not in fallback mode and serverlist is not all drops (i.e., it
663 // may be empty or contain at least one backend address). There are
664 // two sub-cases:
665 // a. The child policy is reporting state READY. In this case, we wrap
666 // the child's picker in our own, so that we can handle drops and LB
667 // token metadata for each pick.
668 // b. The child policy is reporting a state other than READY. In this
669 // case, we don't want to use our own picker, because we don't want
670 // to process drops for picks that yield a QUEUE result; this would
671 // result in dropping too many calls, since we will see the
672 // queued picks multiple times, and we'd consider each one a
673 // separate call for the drop calculation.
674 //
675 // Cases 1 and 3b: return picker from the child policy as-is.
676 if (parent_->serverlist_ == nullptr ||
677 (!parent_->serverlist_->ContainsAllDropEntries() &&
678 state != GRPC_CHANNEL_READY)) {
679 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
680 gpr_log(GPR_INFO,
681 "[grpclb %p helper %p] state=%s passing child picker %p as-is",
682 parent_.get(), this, grpc_connectivity_state_name(state),
683 picker.get());
684 }
685 parent_->channel_control_helper()->UpdateState(state, std::move(picker));
686 return;
687 }
688 // Cases 2 and 3a: wrap picker from the child in our own picker.
689 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
690 gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
691 parent_.get(), this, grpc_connectivity_state_name(state),
692 picker.get());
693 }
694 RefCountedPtr<GrpcLbClientStats> client_stats;
695 if (parent_->lb_calld_ != nullptr &&
696 parent_->lb_calld_->client_stats() != nullptr) {
697 client_stats = parent_->lb_calld_->client_stats()->Ref();
698 }
699 parent_->channel_control_helper()->UpdateState(
700 state, UniquePtr<SubchannelPicker>(
701 New<Picker>(parent_.get(), parent_->serverlist_,
702 std::move(picker), std::move(client_stats))));
703 }
704
RequestReresolution()705 void GrpcLb::Helper::RequestReresolution() {
706 if (parent_->shutting_down_) return;
707 const LoadBalancingPolicy* latest_child_policy =
708 parent_->pending_child_policy_ != nullptr
709 ? parent_->pending_child_policy_.get()
710 : parent_->child_policy_.get();
711 if (child_ != latest_child_policy) return;
712 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
713 gpr_log(GPR_INFO,
714 "[grpclb %p] Re-resolution requested from %schild policy (%p).",
715 parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
716 }
717 // If we are talking to a balancer, we expect to get updated addresses
718 // from the balancer, so we can ignore the re-resolution request from
719 // the child policy. Otherwise, pass the re-resolution request up to the
720 // channel.
721 if (parent_->lb_calld_ == nullptr ||
722 !parent_->lb_calld_->seen_initial_response()) {
723 parent_->channel_control_helper()->RequestReresolution();
724 }
725 }
726
AddTraceEvent(TraceSeverity severity,const char * message)727 void GrpcLb::Helper::AddTraceEvent(TraceSeverity severity,
728 const char* message) {
729 if (parent_->shutting_down_ ||
730 (!CalledByPendingChild() && !CalledByCurrentChild())) {
731 return;
732 }
733 parent_->channel_control_helper()->AddTraceEvent(severity, message);
734 }
735
736 //
737 // GrpcLb::BalancerCallState
738 //
739
BalancerCallState(RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)740 GrpcLb::BalancerCallState::BalancerCallState(
741 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
742 : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
743 grpclb_policy_(std::move(parent_grpclb_policy)) {
744 GPR_ASSERT(grpclb_policy_ != nullptr);
745 GPR_ASSERT(!grpclb_policy()->shutting_down_);
746 // Init the LB call. Note that the LB call will progress every time there's
747 // activity in grpclb_policy_->interested_parties(), which is comprised of
748 // the polling entities from client_channel.
749 GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
750 GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
751 const grpc_millis deadline =
752 grpclb_policy()->lb_call_timeout_ms_ == 0
753 ? GRPC_MILLIS_INF_FUTURE
754 : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
755 lb_call_ = grpc_channel_create_pollset_set_call(
756 grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
757 grpclb_policy_->interested_parties(),
758 GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
759 nullptr, deadline, nullptr);
760 // Init the LB call request payload.
761 grpc_grpclb_request* request =
762 grpc_grpclb_request_create(grpclb_policy()->server_name_);
763 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
764 send_message_payload_ =
765 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
766 grpc_slice_unref_internal(request_payload_slice);
767 grpc_grpclb_request_destroy(request);
768 // Init other data associated with the LB call.
769 grpc_metadata_array_init(&lb_initial_metadata_recv_);
770 grpc_metadata_array_init(&lb_trailing_metadata_recv_);
771 GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
772 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
773 GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
774 OnBalancerMessageReceivedLocked, this,
775 grpc_combiner_scheduler(grpclb_policy()->combiner()));
776 GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
777 OnBalancerStatusReceivedLocked, this,
778 grpc_combiner_scheduler(grpclb_policy()->combiner()));
779 }
780
~BalancerCallState()781 GrpcLb::BalancerCallState::~BalancerCallState() {
782 GPR_ASSERT(lb_call_ != nullptr);
783 grpc_call_unref(lb_call_);
784 grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
785 grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
786 grpc_byte_buffer_destroy(send_message_payload_);
787 grpc_byte_buffer_destroy(recv_message_payload_);
788 grpc_slice_unref_internal(lb_call_status_details_);
789 }
790
Orphan()791 void GrpcLb::BalancerCallState::Orphan() {
792 GPR_ASSERT(lb_call_ != nullptr);
793 // If we are here because grpclb_policy wants to cancel the call,
794 // lb_on_balancer_status_received_ will complete the cancellation and clean
795 // up. Otherwise, we are here because grpclb_policy has to orphan a failed
796 // call, then the following cancellation will be a no-op.
797 grpc_call_cancel(lb_call_, nullptr);
798 if (client_load_report_timer_callback_pending_) {
799 grpc_timer_cancel(&client_load_report_timer_);
800 }
801 // Note that the initial ref is hold by lb_on_balancer_status_received_
802 // instead of the caller of this function. So the corresponding unref happens
803 // in lb_on_balancer_status_received_ instead of here.
804 }
805
StartQuery()806 void GrpcLb::BalancerCallState::StartQuery() {
807 GPR_ASSERT(lb_call_ != nullptr);
808 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
809 gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
810 grpclb_policy_.get(), this, lb_call_);
811 }
812 // Create the ops.
813 grpc_call_error call_error;
814 grpc_op ops[3];
815 memset(ops, 0, sizeof(ops));
816 // Op: send initial metadata.
817 grpc_op* op = ops;
818 op->op = GRPC_OP_SEND_INITIAL_METADATA;
819 op->data.send_initial_metadata.count = 0;
820 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
821 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
822 op->reserved = nullptr;
823 op++;
824 // Op: send request message.
825 GPR_ASSERT(send_message_payload_ != nullptr);
826 op->op = GRPC_OP_SEND_MESSAGE;
827 op->data.send_message.send_message = send_message_payload_;
828 op->flags = 0;
829 op->reserved = nullptr;
830 op++;
831 // TODO(roth): We currently track this ref manually. Once the
832 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
833 // with the callback.
834 auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
835 self.release();
836 call_error = grpc_call_start_batch_and_execute(
837 lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
838 GPR_ASSERT(GRPC_CALL_OK == call_error);
839 // Op: recv initial metadata.
840 op = ops;
841 op->op = GRPC_OP_RECV_INITIAL_METADATA;
842 op->data.recv_initial_metadata.recv_initial_metadata =
843 &lb_initial_metadata_recv_;
844 op->flags = 0;
845 op->reserved = nullptr;
846 op++;
847 // Op: recv response.
848 op->op = GRPC_OP_RECV_MESSAGE;
849 op->data.recv_message.recv_message = &recv_message_payload_;
850 op->flags = 0;
851 op->reserved = nullptr;
852 op++;
853 // TODO(roth): We currently track this ref manually. Once the
854 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
855 // with the callback.
856 self = Ref(DEBUG_LOCATION, "on_message_received");
857 self.release();
858 call_error = grpc_call_start_batch_and_execute(
859 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
860 GPR_ASSERT(GRPC_CALL_OK == call_error);
861 // Op: recv server status.
862 op = ops;
863 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
864 op->data.recv_status_on_client.trailing_metadata =
865 &lb_trailing_metadata_recv_;
866 op->data.recv_status_on_client.status = &lb_call_status_;
867 op->data.recv_status_on_client.status_details = &lb_call_status_details_;
868 op->flags = 0;
869 op->reserved = nullptr;
870 op++;
871 // This callback signals the end of the LB call, so it relies on the initial
872 // ref instead of a new ref. When it's invoked, it's the initial ref that is
873 // unreffed.
874 call_error = grpc_call_start_batch_and_execute(
875 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
876 GPR_ASSERT(GRPC_CALL_OK == call_error);
877 }
878
ScheduleNextClientLoadReportLocked()879 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
880 const grpc_millis next_client_load_report_time =
881 ExecCtx::Get()->Now() + client_stats_report_interval_;
882 GRPC_CLOSURE_INIT(&client_load_report_closure_,
883 MaybeSendClientLoadReportLocked, this,
884 grpc_combiner_scheduler(grpclb_policy()->combiner()));
885 grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
886 &client_load_report_closure_);
887 client_load_report_timer_callback_pending_ = true;
888 }
889
MaybeSendClientLoadReportLocked(void * arg,grpc_error * error)890 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
891 void* arg, grpc_error* error) {
892 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
893 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
894 lb_calld->client_load_report_timer_callback_pending_ = false;
895 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
896 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
897 return;
898 }
899 // If we've already sent the initial request, then we can go ahead and send
900 // the load report. Otherwise, we need to wait until the initial request has
901 // been sent to send this (see OnInitialRequestSentLocked()).
902 if (lb_calld->send_message_payload_ == nullptr) {
903 lb_calld->SendClientLoadReportLocked();
904 } else {
905 lb_calld->client_load_report_is_due_ = true;
906 }
907 }
908
LoadReportCountersAreZero(grpc_grpclb_request * request)909 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
910 grpc_grpclb_request* request) {
911 GrpcLbClientStats::DroppedCallCounts* drop_entries =
912 static_cast<GrpcLbClientStats::DroppedCallCounts*>(
913 request->client_stats.calls_finished_with_drop.arg);
914 return request->client_stats.num_calls_started == 0 &&
915 request->client_stats.num_calls_finished == 0 &&
916 request->client_stats.num_calls_finished_with_client_failed_to_send ==
917 0 &&
918 request->client_stats.num_calls_finished_known_received == 0 &&
919 (drop_entries == nullptr || drop_entries->size() == 0);
920 }
921
SendClientLoadReportLocked()922 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
923 // Construct message payload.
924 GPR_ASSERT(send_message_payload_ == nullptr);
925 grpc_grpclb_request* request =
926 grpc_grpclb_load_report_request_create(client_stats_.get());
927 // Skip client load report if the counters were all zero in the last
928 // report and they are still zero in this one.
929 if (LoadReportCountersAreZero(request)) {
930 if (last_client_load_report_counters_were_zero_) {
931 grpc_grpclb_request_destroy(request);
932 ScheduleNextClientLoadReportLocked();
933 return;
934 }
935 last_client_load_report_counters_were_zero_ = true;
936 } else {
937 last_client_load_report_counters_were_zero_ = false;
938 }
939 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
940 send_message_payload_ =
941 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
942 grpc_slice_unref_internal(request_payload_slice);
943 grpc_grpclb_request_destroy(request);
944 // Send the report.
945 grpc_op op;
946 memset(&op, 0, sizeof(op));
947 op.op = GRPC_OP_SEND_MESSAGE;
948 op.data.send_message.send_message = send_message_payload_;
949 GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
950 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
951 grpc_call_error call_error = grpc_call_start_batch_and_execute(
952 lb_call_, &op, 1, &client_load_report_closure_);
953 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
954 gpr_log(GPR_ERROR,
955 "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
956 grpclb_policy_.get(), this, call_error);
957 GPR_ASSERT(GRPC_CALL_OK == call_error);
958 }
959 }
960
ClientLoadReportDoneLocked(void * arg,grpc_error * error)961 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
962 grpc_error* error) {
963 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
964 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
965 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
966 lb_calld->send_message_payload_ = nullptr;
967 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
968 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
969 return;
970 }
971 lb_calld->ScheduleNextClientLoadReportLocked();
972 }
973
OnInitialRequestSentLocked(void * arg,grpc_error * error)974 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
975 grpc_error* error) {
976 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
977 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
978 lb_calld->send_message_payload_ = nullptr;
979 // If we attempted to send a client load report before the initial request was
980 // sent (and this lb_calld is still in use), send the load report now.
981 if (lb_calld->client_load_report_is_due_ &&
982 lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
983 lb_calld->SendClientLoadReportLocked();
984 lb_calld->client_load_report_is_due_ = false;
985 }
986 lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
987 }
988
OnBalancerMessageReceivedLocked(void * arg,grpc_error * error)989 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
990 void* arg, grpc_error* error) {
991 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
992 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
993 // Null payload means the LB call was cancelled.
994 if (lb_calld != grpclb_policy->lb_calld_.get() ||
995 lb_calld->recv_message_payload_ == nullptr) {
996 lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
997 return;
998 }
999 grpc_byte_buffer_reader bbr;
1000 grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
1001 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
1002 grpc_byte_buffer_reader_destroy(&bbr);
1003 grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
1004 lb_calld->recv_message_payload_ = nullptr;
1005 grpc_grpclb_initial_response* initial_response;
1006 grpc_grpclb_serverlist* serverlist;
1007 if (!lb_calld->seen_initial_response_ &&
1008 (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
1009 nullptr) {
1010 // Have NOT seen initial response, look for initial response.
1011 if (initial_response->has_client_stats_report_interval) {
1012 lb_calld->client_stats_report_interval_ = GPR_MAX(
1013 GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
1014 &initial_response->client_stats_report_interval));
1015 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1016 gpr_log(GPR_INFO,
1017 "[grpclb %p] lb_calld=%p: Received initial LB response "
1018 "message; client load reporting interval = %" PRId64
1019 " milliseconds",
1020 grpclb_policy, lb_calld,
1021 lb_calld->client_stats_report_interval_);
1022 }
1023 } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1024 gpr_log(GPR_INFO,
1025 "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1026 "client load reporting NOT enabled",
1027 grpclb_policy, lb_calld);
1028 }
1029 grpc_grpclb_initial_response_destroy(initial_response);
1030 lb_calld->seen_initial_response_ = true;
1031 } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1032 response_slice)) != nullptr) {
1033 // Have seen initial response, look for serverlist.
1034 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1035 auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1036 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1037 UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1038 gpr_log(GPR_INFO,
1039 "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1040 " servers received:\n%s",
1041 grpclb_policy, lb_calld, serverlist->num_servers,
1042 serverlist_text.get());
1043 }
1044 lb_calld->seen_serverlist_ = true;
1045 // Start sending client load report only after we start using the
1046 // serverlist returned from the current LB call.
1047 if (lb_calld->client_stats_report_interval_ > 0 &&
1048 lb_calld->client_stats_ == nullptr) {
1049 lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1050 // Ref held by callback.
1051 lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1052 lb_calld->ScheduleNextClientLoadReportLocked();
1053 }
1054 // Check if the serverlist differs from the previous one.
1055 if (grpclb_policy->serverlist_ != nullptr &&
1056 *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1057 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1058 gpr_log(GPR_INFO,
1059 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1060 "current, ignoring.",
1061 grpclb_policy, lb_calld);
1062 }
1063 } else { // New serverlist.
1064 // Dispose of the fallback.
1065 // TODO(roth): Ideally, we should stay in fallback mode until we
1066 // know that we can reach at least one of the backends in the new
1067 // serverlist. Unfortunately, we can't do that, since we need to
1068 // send the new addresses to the child policy in order to determine
1069 // if they are reachable, and if we don't exit fallback mode now,
1070 // CreateOrUpdateChildPolicyLocked() will use the fallback
1071 // addresses instead of the addresses from the new serverlist.
1072 // However, if we can't reach any of the servers in the new
1073 // serverlist, then the child policy will never switch away from
1074 // the fallback addresses, but the grpclb policy will still think
1075 // that we're not in fallback mode, which means that we won't send
1076 // updates to the child policy when the fallback addresses are
1077 // updated by the resolver. This is sub-optimal, but the only way
1078 // to fix it is to maintain a completely separate child policy for
1079 // fallback mode, and that's more work than we want to put into
1080 // the grpclb implementation at this point, since we're deprecating
1081 // it in favor of the xds policy. We will implement this the
1082 // right way in the xds policy instead.
1083 if (grpclb_policy->fallback_mode_) {
1084 gpr_log(GPR_INFO,
1085 "[grpclb %p] Received response from balancer; exiting "
1086 "fallback mode",
1087 grpclb_policy);
1088 grpclb_policy->fallback_mode_ = false;
1089 }
1090 if (grpclb_policy->fallback_at_startup_checks_pending_) {
1091 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1092 grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1093 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1094 }
1095 // Update the serverlist in the GrpcLb instance. This serverlist
1096 // instance will be destroyed either upon the next update or when the
1097 // GrpcLb instance is destroyed.
1098 grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1099 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1100 }
1101 } else {
1102 // No valid initial response or serverlist found.
1103 char* response_slice_str =
1104 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1105 gpr_log(GPR_ERROR,
1106 "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1107 "Ignoring.",
1108 grpclb_policy, lb_calld, response_slice_str);
1109 gpr_free(response_slice_str);
1110 }
1111 grpc_slice_unref_internal(response_slice);
1112 if (!grpclb_policy->shutting_down_) {
1113 // Keep listening for serverlist updates.
1114 grpc_op op;
1115 memset(&op, 0, sizeof(op));
1116 op.op = GRPC_OP_RECV_MESSAGE;
1117 op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1118 op.flags = 0;
1119 op.reserved = nullptr;
1120 // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1121 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1122 lb_calld->lb_call_, &op, 1,
1123 &lb_calld->lb_on_balancer_message_received_);
1124 GPR_ASSERT(GRPC_CALL_OK == call_error);
1125 } else {
1126 lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1127 }
1128 }
1129
OnBalancerStatusReceivedLocked(void * arg,grpc_error * error)1130 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1131 void* arg, grpc_error* error) {
1132 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1133 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1134 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1135 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1136 char* status_details =
1137 grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1138 gpr_log(GPR_INFO,
1139 "[grpclb %p] lb_calld=%p: Status from LB server received. "
1140 "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1141 grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1142 lb_calld->lb_call_, grpc_error_string(error));
1143 gpr_free(status_details);
1144 }
1145 // If this lb_calld is still in use, this call ended because of a failure so
1146 // we want to retry connecting. Otherwise, we have deliberately ended this
1147 // call and no further action is required.
1148 if (lb_calld == grpclb_policy->lb_calld_.get()) {
1149 // If the fallback-at-startup checks are pending, go into fallback mode
1150 // immediately. This short-circuits the timeout for the fallback-at-startup
1151 // case.
1152 if (grpclb_policy->fallback_at_startup_checks_pending_) {
1153 GPR_ASSERT(!lb_calld->seen_serverlist_);
1154 gpr_log(GPR_INFO,
1155 "[grpclb %p] Balancer call finished without receiving "
1156 "serverlist; entering fallback mode",
1157 grpclb_policy);
1158 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1159 grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1160 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1161 grpclb_policy->fallback_mode_ = true;
1162 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1163 } else {
1164 // This handles the fallback-after-startup case.
1165 grpclb_policy->MaybeEnterFallbackModeAfterStartup();
1166 }
1167 grpclb_policy->lb_calld_.reset();
1168 GPR_ASSERT(!grpclb_policy->shutting_down_);
1169 grpclb_policy->channel_control_helper()->RequestReresolution();
1170 if (lb_calld->seen_initial_response_) {
1171 // If we lose connection to the LB server, reset the backoff and restart
1172 // the LB call immediately.
1173 grpclb_policy->lb_call_backoff_.Reset();
1174 grpclb_policy->StartBalancerCallLocked();
1175 } else {
1176 // If this LB call fails establishing any connection to the LB server,
1177 // retry later.
1178 grpclb_policy->StartBalancerCallRetryTimerLocked();
1179 }
1180 }
1181 lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1182 }
1183
1184 //
1185 // helper code for creating balancer channel
1186 //
1187
// Returns the subset of \a addresses that are balancer addresses, each
// copied with the is_balancer channel arg removed.
ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
  // Strip out the is_balancer channel arg, since we don't want to
  // recursively use the grpclb policy in the channel used to talk to
  // the balancers.  Note that we do NOT strip out the balancer_name
  // channel arg, since we need that to set the authority correctly
  // to talk to the balancers.
  static const char* args_to_remove[] = {
      GRPC_ARG_ADDRESS_IS_BALANCER,
  };
  ServerAddressList balancer_addresses;
  for (size_t idx = 0; idx < addresses.size(); ++idx) {
    const ServerAddress& candidate = addresses[idx];
    if (!candidate.IsBalancer()) continue;
    balancer_addresses.emplace_back(
        candidate.address(),
        grpc_channel_args_copy_and_remove(candidate.args(), args_to_remove,
                                          GPR_ARRAY_SIZE(args_to_remove)));
  }
  return balancer_addresses;
}
1208
1209 /* Returns the channel args for the LB channel, used to create a bidirectional
1210 * stream for the reception of load balancing updates.
1211 *
1212 * Inputs:
1213 * - \a addresses: corresponding to the balancers.
1214 * - \a response_generator: in order to propagate updates from the resolver
1215 * above the grpclb policy.
1216 * - \a args: other args inherited from the grpclb policy. */
BuildBalancerChannelArgs(const ServerAddressList & addresses,FakeResolverResponseGenerator * response_generator,const grpc_channel_args * args)1217 grpc_channel_args* BuildBalancerChannelArgs(
1218 const ServerAddressList& addresses,
1219 FakeResolverResponseGenerator* response_generator,
1220 const grpc_channel_args* args) {
1221 // Channel args to remove.
1222 static const char* args_to_remove[] = {
1223 // LB policy name, since we want to use the default (pick_first) in
1224 // the LB channel.
1225 GRPC_ARG_LB_POLICY_NAME,
1226 // Strip out the service config, since we don't want the LB policy
1227 // config specified for the parent channel to affect the LB channel.
1228 GRPC_ARG_SERVICE_CONFIG,
1229 // The channel arg for the server URI, since that will be different for
1230 // the LB channel than for the parent channel. The client channel
1231 // factory will re-add this arg with the right value.
1232 GRPC_ARG_SERVER_URI,
1233 // The fake resolver response generator, because we are replacing it
1234 // with the one from the grpclb policy, used to propagate updates to
1235 // the LB channel.
1236 GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1237 // The LB channel should use the authority indicated by the target
1238 // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
1239 // as opposed to the authority from the parent channel.
1240 GRPC_ARG_DEFAULT_AUTHORITY,
1241 // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1242 // treated as a stand-alone channel and not inherit this argument from the
1243 // args of the parent channel.
1244 GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1245 // Don't want to pass down channelz node from parent; the balancer
1246 // channel will get its own.
1247 GRPC_ARG_CHANNELZ_CHANNEL_NODE,
1248 };
1249 // Channel args to add.
1250 InlinedVector<grpc_arg, 3> args_to_add;
1251 // The fake resolver response generator, which we use to inject
1252 // address updates into the LB channel.
1253 args_to_add.emplace_back(
1254 grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1255 response_generator));
1256 // A channel arg indicating the target is a grpclb load balancer.
1257 args_to_add.emplace_back(grpc_channel_arg_integer_create(
1258 const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1));
1259 // The parent channel's channelz uuid.
1260 channelz::ChannelNode* channelz_node = nullptr;
1261 const grpc_arg* arg =
1262 grpc_channel_args_find(args, GRPC_ARG_CHANNELZ_CHANNEL_NODE);
1263 if (arg != nullptr && arg->type == GRPC_ARG_POINTER &&
1264 arg->value.pointer.p != nullptr) {
1265 channelz_node = static_cast<channelz::ChannelNode*>(arg->value.pointer.p);
1266 args_to_add.emplace_back(
1267 channelz::MakeParentUuidArg(channelz_node->uuid()));
1268 }
1269 // Construct channel args.
1270 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1271 args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add.data(),
1272 args_to_add.size());
1273 // Make any necessary modifications for security.
1274 return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
1275 }
1276
1277 //
1278 // ctor and dtor
1279 //
1280
GrpcLb(Args args)1281 GrpcLb::GrpcLb(Args args)
1282 : LoadBalancingPolicy(std::move(args)),
1283 response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1284 lb_call_backoff_(
1285 BackOff::Options()
1286 .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1287 1000)
1288 .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1289 .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1290 .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1291 1000)) {
1292 // Initialization.
1293 GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1294 grpc_combiner_scheduler(combiner()));
1295 GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1296 &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1297 grpc_combiner_scheduler(args.combiner));
1298 // Record server name.
1299 const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1300 const char* server_uri = grpc_channel_arg_get_string(arg);
1301 GPR_ASSERT(server_uri != nullptr);
1302 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1303 GPR_ASSERT(uri->path[0] != '\0');
1304 server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1305 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1306 gpr_log(GPR_INFO,
1307 "[grpclb %p] Will use '%s' as the server name for LB request.",
1308 this, server_name_);
1309 }
1310 grpc_uri_destroy(uri);
1311 // Record LB call timeout.
1312 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1313 lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1314 // Record fallback-at-startup timeout.
1315 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1316 fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1317 arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1318 }
1319
~GrpcLb()1320 GrpcLb::~GrpcLb() {
1321 gpr_free((void*)server_name_);
1322 grpc_channel_args_destroy(args_);
1323 }
1324
ShutdownLocked()1325 void GrpcLb::ShutdownLocked() {
1326 shutting_down_ = true;
1327 lb_calld_.reset();
1328 if (retry_timer_callback_pending_) {
1329 grpc_timer_cancel(&lb_call_retry_timer_);
1330 }
1331 if (fallback_at_startup_checks_pending_) {
1332 grpc_timer_cancel(&lb_fallback_timer_);
1333 CancelBalancerChannelConnectivityWatchLocked();
1334 }
1335 if (child_policy_ != nullptr) {
1336 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1337 interested_parties());
1338 }
1339 if (pending_child_policy_ != nullptr) {
1340 grpc_pollset_set_del_pollset_set(
1341 pending_child_policy_->interested_parties(), interested_parties());
1342 }
1343 child_policy_.reset();
1344 pending_child_policy_.reset();
1345 // We destroy the LB channel here instead of in our destructor because
1346 // destroying the channel triggers a last callback to
1347 // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1348 // alive when that callback is invoked.
1349 if (lb_channel_ != nullptr) {
1350 grpc_channel_destroy(lb_channel_);
1351 lb_channel_ = nullptr;
1352 }
1353 }
1354
1355 //
1356 // public methods
1357 //
1358
ResetBackoffLocked()1359 void GrpcLb::ResetBackoffLocked() {
1360 if (lb_channel_ != nullptr) {
1361 grpc_channel_reset_connect_backoff(lb_channel_);
1362 }
1363 if (child_policy_ != nullptr) {
1364 child_policy_->ResetBackoffLocked();
1365 }
1366 if (pending_child_policy_ != nullptr) {
1367 pending_child_policy_->ResetBackoffLocked();
1368 }
1369 }
1370
UpdateLocked(UpdateArgs args)1371 void GrpcLb::UpdateLocked(UpdateArgs args) {
1372 const bool is_initial_update = lb_channel_ == nullptr;
1373 auto* grpclb_config =
1374 static_cast<const ParsedGrpcLbConfig*>(args.config.get());
1375 if (grpclb_config != nullptr) {
1376 child_policy_config_ = grpclb_config->child_policy();
1377 } else {
1378 child_policy_config_ = nullptr;
1379 }
1380 ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1381 // Update the existing child policy.
1382 if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1383 // If this is the initial update, start the fallback-at-startup checks
1384 // and the balancer call.
1385 if (is_initial_update) {
1386 fallback_at_startup_checks_pending_ = true;
1387 // Start timer.
1388 grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1389 Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
1390 grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1391 // Start watching the channel's connectivity state. If the channel
1392 // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1393 // fallback mode even if the fallback timeout has not elapsed.
1394 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1395 grpc_channel_get_channel_stack(lb_channel_));
1396 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1397 // Ref held by callback.
1398 Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1399 grpc_client_channel_watch_connectivity_state(
1400 client_channel_elem,
1401 grpc_polling_entity_create_from_pollset_set(interested_parties()),
1402 &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1403 nullptr);
1404 // Start balancer call.
1405 StartBalancerCallLocked();
1406 }
1407 }
1408
1409 //
1410 // helpers for UpdateLocked()
1411 //
1412
1413 // Returns the backend addresses extracted from the given addresses.
ExtractBackendAddresses(const ServerAddressList & addresses)1414 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1415 void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
1416 grpc_arg arg = grpc_channel_arg_pointer_create(
1417 const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
1418 &lb_token_arg_vtable);
1419 ServerAddressList backend_addresses;
1420 for (size_t i = 0; i < addresses.size(); ++i) {
1421 if (!addresses[i].IsBalancer()) {
1422 backend_addresses.emplace_back(
1423 addresses[i].address(),
1424 grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1425 }
1426 }
1427 return backend_addresses;
1428 }
1429
ProcessAddressesAndChannelArgsLocked(const ServerAddressList & addresses,const grpc_channel_args & args)1430 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1431 const ServerAddressList& addresses, const grpc_channel_args& args) {
1432 // Update fallback address list.
1433 fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1434 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1435 // since we use this to trigger the client_load_reporting filter.
1436 static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1437 grpc_arg new_arg = grpc_channel_arg_string_create(
1438 (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1439 grpc_channel_args_destroy(args_);
1440 args_ = grpc_channel_args_copy_and_add_and_remove(
1441 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1442 // Construct args for balancer channel.
1443 ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1444 grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1445 balancer_addresses, response_generator_.get(), &args);
1446 // Create balancer channel if needed.
1447 if (lb_channel_ == nullptr) {
1448 char* uri_str;
1449 gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1450 lb_channel_ =
1451 channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
1452 GPR_ASSERT(lb_channel_ != nullptr);
1453 gpr_free(uri_str);
1454 }
1455 // Propagate updates to the LB channel (pick_first) through the fake
1456 // resolver.
1457 Resolver::Result result;
1458 result.addresses = std::move(balancer_addresses);
1459 result.args = lb_channel_args;
1460 response_generator_->SetResponse(std::move(result));
1461 }
1462
OnBalancerChannelConnectivityChangedLocked(void * arg,grpc_error * error)1463 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1464 grpc_error* error) {
1465 GrpcLb* self = static_cast<GrpcLb*>(arg);
1466 if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1467 if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1468 // Not in TRANSIENT_FAILURE. Renew connectivity watch.
1469 grpc_channel_element* client_channel_elem =
1470 grpc_channel_stack_last_element(
1471 grpc_channel_get_channel_stack(self->lb_channel_));
1472 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1473 grpc_client_channel_watch_connectivity_state(
1474 client_channel_elem,
1475 grpc_polling_entity_create_from_pollset_set(
1476 self->interested_parties()),
1477 &self->lb_channel_connectivity_,
1478 &self->lb_channel_on_connectivity_changed_, nullptr);
1479 return; // Early out so we don't drop the ref below.
1480 }
1481 // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
1482 // fallback mode immediately.
1483 gpr_log(GPR_INFO,
1484 "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1485 "entering fallback mode",
1486 self);
1487 self->fallback_at_startup_checks_pending_ = false;
1488 grpc_timer_cancel(&self->lb_fallback_timer_);
1489 self->fallback_mode_ = true;
1490 self->CreateOrUpdateChildPolicyLocked();
1491 }
1492 // Done watching connectivity state, so drop ref.
1493 self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1494 }
1495
CancelBalancerChannelConnectivityWatchLocked()1496 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1497 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1498 grpc_channel_get_channel_stack(lb_channel_));
1499 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1500 grpc_client_channel_watch_connectivity_state(
1501 client_channel_elem,
1502 grpc_polling_entity_create_from_pollset_set(interested_parties()),
1503 nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1504 }
1505
1506 //
1507 // code for balancer channel and call
1508 //
1509
StartBalancerCallLocked()1510 void GrpcLb::StartBalancerCallLocked() {
1511 GPR_ASSERT(lb_channel_ != nullptr);
1512 if (shutting_down_) return;
1513 // Init the LB call data.
1514 GPR_ASSERT(lb_calld_ == nullptr);
1515 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1516 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1517 gpr_log(GPR_INFO,
1518 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1519 this, lb_channel_, lb_calld_.get());
1520 }
1521 lb_calld_->StartQuery();
1522 }
1523
StartBalancerCallRetryTimerLocked()1524 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1525 grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1526 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1527 gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1528 grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1529 if (timeout > 0) {
1530 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1531 this, timeout);
1532 } else {
1533 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1534 this);
1535 }
1536 }
1537 // TODO(roth): We currently track this ref manually. Once the
1538 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1539 // with the callback.
1540 auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1541 self.release();
1542 GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1543 this, grpc_combiner_scheduler(combiner()));
1544 retry_timer_callback_pending_ = true;
1545 grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1546 }
1547
OnBalancerCallRetryTimerLocked(void * arg,grpc_error * error)1548 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1549 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1550 grpclb_policy->retry_timer_callback_pending_ = false;
1551 if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1552 grpclb_policy->lb_calld_ == nullptr) {
1553 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1554 gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1555 grpclb_policy);
1556 }
1557 grpclb_policy->StartBalancerCallLocked();
1558 }
1559 grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1560 }
1561
1562 //
1563 // code for handling fallback mode
1564 //
1565
MaybeEnterFallbackModeAfterStartup()1566 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1567 // Enter fallback mode if all of the following are true:
1568 // - We are not currently in fallback mode.
1569 // - We are not currently waiting for the initial fallback timeout.
1570 // - We are not currently in contact with the balancer.
1571 // - The child policy is not in state READY.
1572 if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1573 (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1574 !child_policy_ready_) {
1575 gpr_log(GPR_INFO,
1576 "[grpclb %p] lost contact with balancer and backends from "
1577 "most recent serverlist; entering fallback mode",
1578 this);
1579 fallback_mode_ = true;
1580 CreateOrUpdateChildPolicyLocked();
1581 }
1582 }
1583
OnFallbackTimerLocked(void * arg,grpc_error * error)1584 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1585 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1586 // If we receive a serverlist after the timer fires but before this callback
1587 // actually runs, don't fall back.
1588 if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1589 !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1590 gpr_log(GPR_INFO,
1591 "[grpclb %p] No response from balancer after fallback timeout; "
1592 "entering fallback mode",
1593 grpclb_policy);
1594 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1595 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1596 grpclb_policy->fallback_mode_ = true;
1597 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1598 }
1599 grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1600 }
1601
1602 //
1603 // code for interacting with the child policy
1604 //
1605
CreateChildPolicyArgsLocked(bool is_backend_from_grpclb_load_balancer)1606 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1607 bool is_backend_from_grpclb_load_balancer) {
1608 InlinedVector<grpc_arg, 2> args_to_add;
1609 args_to_add.emplace_back(grpc_channel_arg_integer_create(
1610 const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1611 is_backend_from_grpclb_load_balancer));
1612 if (is_backend_from_grpclb_load_balancer) {
1613 args_to_add.emplace_back(grpc_channel_arg_integer_create(
1614 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
1615 }
1616 return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
1617 args_to_add.size());
1618 }
1619
CreateChildPolicyLocked(const char * name,const grpc_channel_args * args)1620 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1621 const char* name, const grpc_channel_args* args) {
1622 Helper* helper = New<Helper>(Ref());
1623 LoadBalancingPolicy::Args lb_policy_args;
1624 lb_policy_args.combiner = combiner();
1625 lb_policy_args.args = args;
1626 lb_policy_args.channel_control_helper =
1627 UniquePtr<ChannelControlHelper>(helper);
1628 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1629 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1630 name, std::move(lb_policy_args));
1631 if (GPR_UNLIKELY(lb_policy == nullptr)) {
1632 gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1633 name);
1634 return nullptr;
1635 }
1636 helper->set_child(lb_policy.get());
1637 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1638 gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1639 name, lb_policy.get());
1640 }
1641 // Add the gRPC LB's interested_parties pollset_set to that of the newly
1642 // created child policy. This will make the child policy progress upon
1643 // activity on gRPC LB, which in turn is tied to the application's call.
1644 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1645 interested_parties());
1646 return lb_policy;
1647 }
1648
CreateOrUpdateChildPolicyLocked()1649 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1650 if (shutting_down_) return;
1651 // Construct update args.
1652 UpdateArgs update_args;
1653 bool is_backend_from_grpclb_load_balancer = false;
1654 if (fallback_mode_) {
1655 // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1656 // received any serverlist from the balancer, we use the fallback backends
1657 // returned by the resolver. Note that the fallback backend list may be
1658 // empty, in which case the new round_robin policy will keep the requested
1659 // picks pending.
1660 update_args.addresses = fallback_backend_addresses_;
1661 } else {
1662 update_args.addresses = serverlist_->GetServerAddressList(
1663 lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1664 is_backend_from_grpclb_load_balancer = true;
1665 }
1666 update_args.args =
1667 CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1668 GPR_ASSERT(update_args.args != nullptr);
1669 update_args.config = child_policy_config_;
1670 // If the child policy name changes, we need to create a new child
1671 // policy. When this happens, we leave child_policy_ as-is and store
1672 // the new child policy in pending_child_policy_. Once the new child
1673 // policy transitions into state READY, we swap it into child_policy_,
1674 // replacing the original child policy. So pending_child_policy_ is
1675 // non-null only between when we apply an update that changes the child
1676 // policy name and when the new child reports state READY.
1677 //
1678 // Updates can arrive at any point during this transition. We always
1679 // apply updates relative to the most recently created child policy,
1680 // even if the most recent one is still in pending_child_policy_. This
1681 // is true both when applying the updates to an existing child policy
1682 // and when determining whether we need to create a new policy.
1683 //
1684 // As a result of this, there are several cases to consider here:
1685 //
1686 // 1. We have no existing child policy (i.e., we have started up but
1687 // have not yet received a serverlist from the balancer or gone
1688 // into fallback mode; in this case, both child_policy_ and
1689 // pending_child_policy_ are null). In this case, we create a
1690 // new child policy and store it in child_policy_.
1691 //
1692 // 2. We have an existing child policy and have no pending child policy
1693 // from a previous update (i.e., either there has not been a
1694 // previous update that changed the policy name, or we have already
1695 // finished swapping in the new policy; in this case, child_policy_
1696 // is non-null but pending_child_policy_ is null). In this case:
1697 // a. If child_policy_->name() equals child_policy_name, then we
1698 // update the existing child policy.
1699 // b. If child_policy_->name() does not equal child_policy_name,
1700 // we create a new policy. The policy will be stored in
1701 // pending_child_policy_ and will later be swapped into
1702 // child_policy_ by the helper when the new child transitions
1703 // into state READY.
1704 //
1705 // 3. We have an existing child policy and have a pending child policy
1706 // from a previous update (i.e., a previous update set
1707 // pending_child_policy_ as per case 2b above and that policy has
1708 // not yet transitioned into state READY and been swapped into
1709 // child_policy_; in this case, both child_policy_ and
1710 // pending_child_policy_ are non-null). In this case:
1711 // a. If pending_child_policy_->name() equals child_policy_name,
1712 // then we update the existing pending child policy.
1713 // b. If pending_child_policy->name() does not equal
1714 // child_policy_name, then we create a new policy. The new
1715 // policy is stored in pending_child_policy_ (replacing the one
1716 // that was there before, which will be immediately shut down)
1717 // and will later be swapped into child_policy_ by the helper
1718 // when the new child transitions into state READY.
1719 const char* child_policy_name = child_policy_config_ == nullptr
1720 ? "round_robin"
1721 : child_policy_config_->name();
1722 const bool create_policy =
1723 // case 1
1724 child_policy_ == nullptr ||
1725 // case 2b
1726 (pending_child_policy_ == nullptr &&
1727 strcmp(child_policy_->name(), child_policy_name) != 0) ||
1728 // case 3b
1729 (pending_child_policy_ != nullptr &&
1730 strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1731 LoadBalancingPolicy* policy_to_update = nullptr;
1732 if (create_policy) {
1733 // Cases 1, 2b, and 3b: create a new child policy.
1734 // If child_policy_ is null, we set it (case 1), else we set
1735 // pending_child_policy_ (cases 2b and 3b).
1736 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1737 gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1738 child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1739 }
1740 // Swap the policy into place.
1741 auto& lb_policy =
1742 child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1743 lb_policy = CreateChildPolicyLocked(child_policy_name, update_args.args);
1744 policy_to_update = lb_policy.get();
1745 } else {
1746 // Cases 2a and 3a: update an existing policy.
1747 // If we have a pending child policy, send the update to the pending
1748 // policy (case 3a), else send it to the current policy (case 2a).
1749 policy_to_update = pending_child_policy_ != nullptr
1750 ? pending_child_policy_.get()
1751 : child_policy_.get();
1752 }
1753 GPR_ASSERT(policy_to_update != nullptr);
1754 // Update the policy.
1755 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1756 gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1757 policy_to_update == pending_child_policy_.get() ? "pending " : "",
1758 policy_to_update);
1759 }
1760 policy_to_update->UpdateLocked(std::move(update_args));
1761 }
1762
1763 //
1764 // factory
1765 //
1766
1767 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1768 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const1769 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1770 LoadBalancingPolicy::Args args) const override {
1771 return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1772 }
1773
name() const1774 const char* name() const override { return kGrpclb; }
1775
ParseLoadBalancingConfig(const grpc_json * json,grpc_error ** error) const1776 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
1777 const grpc_json* json, grpc_error** error) const override {
1778 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1779 if (json == nullptr) {
1780 return RefCountedPtr<LoadBalancingPolicy::Config>(
1781 New<ParsedGrpcLbConfig>(nullptr));
1782 }
1783 InlinedVector<grpc_error*, 2> error_list;
1784 RefCountedPtr<LoadBalancingPolicy::Config> child_policy;
1785 for (const grpc_json* field = json->child; field != nullptr;
1786 field = field->next) {
1787 if (field->key == nullptr) continue;
1788 if (strcmp(field->key, "childPolicy") == 0) {
1789 if (child_policy != nullptr) {
1790 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1791 "field:childPolicy error:Duplicate entry"));
1792 }
1793 grpc_error* parse_error = GRPC_ERROR_NONE;
1794 child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1795 field, &parse_error);
1796 if (parse_error != GRPC_ERROR_NONE) {
1797 error_list.push_back(parse_error);
1798 }
1799 }
1800 }
1801 if (error_list.empty()) {
1802 return RefCountedPtr<LoadBalancingPolicy::Config>(
1803 New<ParsedGrpcLbConfig>(std::move(child_policy)));
1804 } else {
1805 *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
1806 return nullptr;
1807 }
1808 }
1809 };
1810
1811 } // namespace
1812
1813 } // namespace grpc_core
1814
1815 //
1816 // Plugin registration
1817 //
1818
1819 namespace {
1820
1821 // Only add client_load_reporting filter if the grpclb LB policy is used.
maybe_add_client_load_reporting_filter(grpc_channel_stack_builder * builder,void * arg)1822 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1823 void* arg) {
1824 const grpc_channel_args* args =
1825 grpc_channel_stack_builder_get_channel_arguments(builder);
1826 const grpc_arg* channel_arg =
1827 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1828 if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1829 strcmp(channel_arg->value.string, "grpclb") == 0) {
1830 return grpc_channel_stack_builder_append_filter(
1831 builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1832 }
1833 return true;
1834 }
1835
1836 } // namespace
1837
grpc_lb_policy_grpclb_init()1838 void grpc_lb_policy_grpclb_init() {
1839 grpc_core::LoadBalancingPolicyRegistry::Builder::
1840 RegisterLoadBalancingPolicyFactory(
1841 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1842 grpc_core::New<grpc_core::GrpcLbFactory>()));
1843 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1844 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1845 maybe_add_client_load_reporting_filter,
1846 (void*)&grpc_client_load_reporting_filter);
1847 }
1848
// Plugin shutdown hook. Intentionally a no-op: this function performs no
// teardown of its own.
void grpc_lb_policy_grpclb_shutdown() {
  // Nothing to do.
}
1850