1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24
25 #include <grpc/grpc.h>
26
27 #include "src/core/ext/filters/client_channel/lb_policy.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 #include "src/core/lib/transport/error_utils.h"
38
39 namespace grpc_core {
40
// Trace flag named "priority_lb". It is constructed with `false` (presumably
// disabled unless turned on via the tracer configuration). It gates every
// GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace) log statement in this file
// and is also handed to each ChildPolicyHandler this policy creates.
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
42
43 namespace {
44
// Policy name returned by PriorityLb::name(), PriorityLbConfig::name() and
// the factory.
constexpr char kPriority[] = "priority_experimental";

// Retention period (15 minutes) for a child that is no longer in use. A child
// stops being used when it disappears from the config, or when a child with
// a higher priority has been selected instead.
constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000;

// Default time (10 seconds) a freshly created child gets to connect before
// the next priority is tried. GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS can
// override this value.
constexpr int kDefaultChildFailoverTimeoutMs = 10 * 1000;
55
56 // Config for priority LB policy.
57 class PriorityLbConfig : public LoadBalancingPolicy::Config {
58 public:
59 struct PriorityLbChild {
60 RefCountedPtr<LoadBalancingPolicy::Config> config;
61 bool ignore_reresolution_requests = false;
62 };
63
PriorityLbConfig(std::map<std::string,PriorityLbChild> children,std::vector<std::string> priorities)64 PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
65 std::vector<std::string> priorities)
66 : children_(std::move(children)), priorities_(std::move(priorities)) {}
67
name() const68 const char* name() const override { return kPriority; }
69
children() const70 const std::map<std::string, PriorityLbChild>& children() const {
71 return children_;
72 }
priorities() const73 const std::vector<std::string>& priorities() const { return priorities_; }
74
75 private:
76 const std::map<std::string, PriorityLbChild> children_;
77 const std::vector<std::string> priorities_;
78 };
79
// priority LB policy.
//
// Keeps one ChildPriority per child named in the config. It uses the
// highest-priority child (lowest index in PriorityLbConfig::priorities()) that
// is READY or IDLE. It moves on to lower priorities when a child reports
// TRANSIENT_FAILURE or when a child's failover timer fires first.
class PriorityLb : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  const char* name() const override { return kPriority; }

  void UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Each ChildPriority holds a ref to the PriorityLb.
  //
  // A ChildPriority wraps one child policy (a ChildPolicyHandler) together
  // with the state and picker the child most recently reported, plus the
  // child's failover and deactivation timers.
  class ChildPriority : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    // Passes a new config to the child policy, creating it on first use.
    void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                      bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Starts the retention timer; the child is deleted when it fires.
    void DeactivateLocked();
    // Cancels a pending retention timer, if there is one.
    void MaybeReactivateLocked();
    void MaybeCancelFailoverTimerLocked();

    void Orphan() override;

    // Returns a new picker that shares ownership of the child's latest
    // picker. NOTE: picker_wrapper_ stays null until the child reports its
    // first state.
    std::unique_ptr<SubchannelPicker> GetPicker() {
      return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
    }

    // Last state reported by the child (CONNECTING before any report).
    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    // True while the failover timer is armed and has not been cancelled or
    // handled.
    bool failover_timer_callback_pending() const {
      return failover_timer_callback_pending_;
    }

   private:
    // A simple wrapper for ref-counting a picker from the child policy.
    class RefCountedPicker : public RefCounted<RefCountedPicker> {
     public:
      explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) { return picker_->Pick(args); }

     private:
      std::unique_ptr<SubchannelPicker> picker_;
    };

    // A non-ref-counted wrapper for RefCountedPicker.
    class RefCountedPickerWrapper : public SubchannelPicker {
     public:
      explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) override { return picker_->Pick(args); }

     private:
      RefCountedPtr<RefCountedPicker> picker_;
    };

    // Helper given to the child policy. It forwards calls to the parent's
    // helper and drops most of them once the parent is shutting down.
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const grpc_channel_args& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       std::unique_ptr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      absl::string_view GetAuthority() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ChildPriority> priority_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const grpc_channel_args* args);

    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        std::unique_ptr<SubchannelPicker> picker);

    void StartFailoverTimerLocked();

    // Timer callbacks. The static trampolines hop into the work serializer
    // and then call the *Locked variants.
    static void OnFailoverTimer(void* arg, grpc_error_handle error);
    void OnFailoverTimerLocked(grpc_error_handle error);
    static void OnDeactivationTimer(void* arg, grpc_error_handle error);
    void OnDeactivationTimerLocked(grpc_error_handle error);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    bool ignore_reresolution_requests_ = false;

    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    RefCountedPtr<RefCountedPicker> picker_wrapper_;

    // States for delayed removal.
    grpc_timer deactivation_timer_;
    grpc_closure on_deactivation_timer_;
    bool deactivation_timer_callback_pending_ = false;

    // States of failover.
    grpc_timer failover_timer_;
    grpc_closure on_failover_timer_;
    bool failover_timer_callback_pending_ = false;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns UINT32_MAX if child is not in current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
  void DeleteChild(ChildPriority* child);

  // Scans priorities from highest to lowest and selects or creates a child.
  void TryNextPriorityLocked(bool report_connecting);
  void SelectPriorityLocked(uint32_t priority);

  // Failover timeout in ms, read from channel args at construction time.
  const int child_failover_timeout_ms_;

  // Current channel args and config from the resolver.
  // args_ is owned by this policy and destroyed in the destructor or when it
  // is replaced in UpdateLocked().
  const grpc_channel_args* args_ = nullptr;
  RefCountedPtr<PriorityLbConfig> config_;
  HierarchicalAddressMap addresses_;

  // Internal state.
  bool shutting_down_ = false;

  // All children, both active and deactivated ones awaiting deletion.
  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used.
  uint32_t current_priority_ = UINT32_MAX;
  // Points to the current child from before the most recent update.
  // We will continue to use this child until we decide which of the new
  // children to use.
  ChildPriority* current_child_from_before_update_ = nullptr;
};
240
241 //
242 // PriorityLb
243 //
244
// Constructs the policy. The failover timeout comes from channel arg
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, looked up with default
// kDefaultChildFailoverTimeoutMs and bounds [0, INT_MAX]. How out-of-range
// values are treated is up to grpc_channel_args_find_integer().
PriorityLb::PriorityLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      // NOTE(review): args.args is read after `args` was moved into the base
      // class. This presumably works because Args::args is a raw pointer
      // that the move copies rather than clears -- confirm before touching.
      child_failover_timeout_ms_(grpc_channel_args_find_integer(
          args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
          {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] created", this);
  }
}
254
~PriorityLb()255 PriorityLb::~PriorityLb() {
256 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
257 gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
258 }
259 grpc_channel_args_destroy(args_);
260 }
261
ShutdownLocked()262 void PriorityLb::ShutdownLocked() {
263 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
264 gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
265 }
266 shutting_down_ = true;
267 children_.clear();
268 }
269
ExitIdleLocked()270 void PriorityLb::ExitIdleLocked() {
271 if (current_priority_ != UINT32_MAX) {
272 const std::string& child_name = config_->priorities()[current_priority_];
273 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
274 gpr_log(GPR_INFO,
275 "[priority_lb %p] exiting IDLE for current priority %d child %s",
276 this, current_priority_, child_name.c_str());
277 }
278 children_[child_name]->ExitIdleLocked();
279 }
280 }
281
ResetBackoffLocked()282 void PriorityLb::ResetBackoffLocked() {
283 for (const auto& p : children_) p.second->ResetBackoffLocked();
284 }
285
// Applies a new config, channel args and address list from the parent.
// Existing children that are missing from the new config are deactivated;
// they are kept until their retention timer fires. The remaining existing
// children are updated in place. Children that do not exist yet are created
// lazily by TryNextPriorityLocked().
void PriorityLb::UpdateLocked(UpdateArgs args) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
  }
  // Save current child.
  if (current_priority_ != UINT32_MAX) {
    const std::string& child_name = config_->priorities()[current_priority_];
    current_child_from_before_update_ = children_[child_name].get();
    // Unset current_priority_, since it was an index into the old
    // config's priority list and may no longer be valid. It will be
    // reset later by TryNextPriorityLocked(), but we unset it here in
    // case updating any of our children triggers a state update.
    current_priority_ = UINT32_MAX;
  }
  // Update config.
  config_ = std::move(args.config);
  // Update args.
  // Take ownership of the new channel args. args.args is nulled out,
  // presumably so that UpdateArgs does not also destroy them -- confirm.
  grpc_channel_args_destroy(args_);
  args_ = args.args;
  args.args = nullptr;
  // Update addresses.
  addresses_ = MakeHierarchicalAddressMap(args.addresses);
  // Check all existing children against the new config.
  for (const auto& p : children_) {
    const std::string& child_name = p.first;
    auto& child = p.second;
    auto config_it = config_->children().find(child_name);
    if (config_it == config_->children().end()) {
      // Existing child not found in new config. Deactivate it.
      child->DeactivateLocked();
    } else {
      // Existing child found in new config. Update it.
      child->UpdateLocked(config_it->second.config,
                          config_it->second.ignore_reresolution_requests);
    }
  }
  // Try to get connected.
  // Only report CONNECTING while searching if there are no children at all
  // (e.g. on the first update). Otherwise the previously reported state is
  // kept until a priority is chosen.
  TryNextPriorityLocked(/*report_connecting=*/children_.empty());
}
325
GetChildPriorityLocked(const std::string & child_name) const326 uint32_t PriorityLb::GetChildPriorityLocked(
327 const std::string& child_name) const {
328 for (uint32_t priority = 0; priority < config_->priorities().size();
329 ++priority) {
330 if (config_->priorities()[priority] == child_name) return priority;
331 }
332 return UINT32_MAX;
333 }
334
// Reacts to a new connectivity state stored in `child`. Depending on the
// child's position relative to current_priority_, it does one of four
// things: forwards the child's picker to the parent, selects a higher
// priority, starts failing over to the next priority, or ignores the update.
void PriorityLb::HandleChildConnectivityStateChangeLocked(
    ChildPriority* child) {
  // Special case for the child that was the current child before the
  // most recent update.
  if (child == current_child_from_before_update_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] state update for current child from before "
              "config update",
              this);
    }
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      // If it's still READY or IDLE, we stick with this child, so pass
      // the new picker up to our parent.
      channel_control_helper()->UpdateState(child->connectivity_state(),
                                            child->connectivity_status(),
                                            child->GetPicker());
    } else {
      // If it's no longer READY or IDLE, we should stop using it.
      // We already started trying other priorities as a result of the
      // update, but calling TryNextPriorityLocked() ensures that we will
      // properly select between CONNECTING and TRANSIENT_FAILURE as the
      // new state to report to our parent.
      current_child_from_before_update_ = nullptr;
      TryNextPriorityLocked(/*report_connecting=*/true);
    }
    return;
  }
  // Otherwise, find the child's priority.
  uint32_t child_priority = GetChildPriorityLocked(child->name());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] state update for priority %u, child %s, current "
            "priority %u",
            this, child_priority, child->name().c_str(), current_priority_);
  }
  // Ignore priorities not in the current config.
  if (child_priority == UINT32_MAX) return;
  // Ignore lower-than-current priorities.
  // (When no priority is selected, current_priority_ is UINT32_MAX, so no
  // listed priority is filtered out here.)
  if (child_priority > current_priority_) return;
  // If a child reports TRANSIENT_FAILURE, start trying the next priority.
  // Note that even if this is for a higher-than-current priority, we
  // may still need to create some children between this priority and
  // the current one (e.g., if we got an update that inserted new
  // priorities ahead of the current one).
  if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    TryNextPriorityLocked(
        /*report_connecting=*/child_priority == current_priority_);
    return;
  }
  // The update is for a higher-than-current priority (or for any
  // priority if we don't have any current priority).
  if (child_priority < current_priority_) {
    // If the child reports READY or IDLE, switch to that priority.
    // Otherwise, ignore the update.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SelectPriorityLocked(child_priority);
    }
    return;
  }
  // The current priority has returned a new picker, so pass it up to
  // our parent.
  channel_control_helper()->UpdateState(child->connectivity_state(),
                                        child->connectivity_status(),
                                        child->GetPicker());
}
403
// Removes `child` from children_. The erase destroys the owning
// OrphanablePtr, which orphans the child. Called when the child's
// deactivation timer fires.
void PriorityLb::DeleteChild(ChildPriority* child) {
  // If this was the current child from before the most recent update,
  // stop using it. We already started trying other priorities as a
  // result of the update, but calling TryNextPriorityLocked() ensures that
  // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
  // new state to report to our parent.
  if (current_child_from_before_update_ == child) {
    current_child_from_before_update_ = nullptr;
    TryNextPriorityLocked(/*report_connecting=*/true);
  }
  children_.erase(child->name());
}
416
// Walks the priority list from highest (index 0) to lowest and stops at the
// first priority that meets one of these conditions:
//   - it has no child yet: the child is created and given its config;
//   - its child is READY or IDLE: that priority is selected;
//   - its child's failover timer is still pending: we keep waiting on it.
// If every priority has failed, TRANSIENT_FAILURE is reported to the parent.
// When `report_connecting` is true, CONNECTING with a queueing picker is
// reported while waiting on a new or still-connecting child.
void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
  current_priority_ = UINT32_MAX;
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // If the child for the priority does not exist yet, create it.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    // operator[] deliberately inserts an empty slot when the child is
    // missing; the slot is filled right below.
    auto& child = children_[child_name];
    if (child == nullptr) {
      if (report_connecting) {
        channel_control_helper()->UpdateState(
            GRPC_CHANNEL_CONNECTING, absl::Status(),
            absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
      }
      child = MakeOrphanable<ChildPriority>(
          Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
      auto child_config = config_->children().find(child_name);
      GPR_DEBUG_ASSERT(child_config != config_->children().end());
      child->UpdateLocked(child_config->second.config,
                          child_config->second.ignore_reresolution_requests);
      return;
    }
    // The child already exists.
    child->MaybeReactivateLocked();
    // If the child is in state READY or IDLE, switch to it.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SelectPriorityLocked(priority);
      return;
    }
    // Child is not READY or IDLE.
    // If its failover timer is still pending, give it time to fire.
    if (child->failover_timer_callback_pending()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
        gpr_log(GPR_INFO,
                "[priority_lb %p] priority %u, child %s: child still "
                "attempting to connect, will wait",
                this, priority, child_name.c_str());
      }
      if (report_connecting) {
        channel_control_helper()->UpdateState(
            GRPC_CHANNEL_CONNECTING, absl::Status(),
            absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
      }
      return;
    }
    // Child has been failing for a while. Move on to the next priority.
  }
  // If there are no more priorities to try, report TRANSIENT_FAILURE.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority reachable, putting channel in "
            "TRANSIENT_FAILURE",
            this);
  }
  current_child_from_before_update_ = nullptr;
  absl::Status status = absl::UnavailableError("no ready priority");
  channel_control_helper()->UpdateState(
      GRPC_CHANNEL_TRANSIENT_FAILURE, status,
      absl::make_unique<TransientFailurePicker>(status));
}
481
SelectPriorityLocked(uint32_t priority)482 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
483 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
484 gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
485 priority, config_->priorities()[priority].c_str());
486 }
487 current_priority_ = priority;
488 current_child_from_before_update_ = nullptr;
489 // Deactivate lower priorities.
490 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
491 const std::string& child_name = config_->priorities()[p];
492 auto it = children_.find(child_name);
493 if (it != children_.end()) it->second->DeactivateLocked();
494 }
495 // Update picker.
496 auto& child = children_[config_->priorities()[priority]];
497 channel_control_helper()->UpdateState(child->connectivity_state(),
498 child->connectivity_status(),
499 child->GetPicker());
500 }
501
502 //
503 // PriorityLb::ChildPriority
504 //
505
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)506 PriorityLb::ChildPriority::ChildPriority(
507 RefCountedPtr<PriorityLb> priority_policy, std::string name)
508 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
509 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
510 gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
511 priority_policy_.get(), name_.c_str(), this);
512 }
513 GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
514 grpc_schedule_on_exec_ctx);
515 GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
516 grpc_schedule_on_exec_ctx);
517 // Start the failover timer.
518 StartFailoverTimerLocked();
519 }
520
Orphan()521 void PriorityLb::ChildPriority::Orphan() {
522 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
523 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
524 priority_policy_.get(), name_.c_str(), this);
525 }
526 MaybeCancelFailoverTimerLocked();
527 if (deactivation_timer_callback_pending_) {
528 grpc_timer_cancel(&deactivation_timer_);
529 }
530 // Remove the child policy's interested_parties pollset_set from the
531 // xDS policy.
532 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
533 priority_policy_->interested_parties());
534 child_policy_.reset();
535 // Drop our ref to the child's picker, in case it's holding a ref to
536 // the child.
537 picker_wrapper_.reset();
538 if (deactivation_timer_callback_pending_) {
539 grpc_timer_cancel(&deactivation_timer_);
540 }
541 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
542 }
543
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)544 void PriorityLb::ChildPriority::UpdateLocked(
545 RefCountedPtr<LoadBalancingPolicy::Config> config,
546 bool ignore_reresolution_requests) {
547 if (priority_policy_->shutting_down_) return;
548 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
549 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
550 priority_policy_.get(), name_.c_str(), this);
551 }
552 ignore_reresolution_requests_ = ignore_reresolution_requests;
553 // Create policy if needed.
554 if (child_policy_ == nullptr) {
555 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
556 }
557 // Construct update args.
558 UpdateArgs update_args;
559 update_args.config = std::move(config);
560 update_args.addresses = priority_policy_->addresses_[name_];
561 update_args.args = grpc_channel_args_copy(priority_policy_->args_);
562 // Update the policy.
563 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
564 gpr_log(GPR_INFO,
565 "[priority_lb %p] child %s (%p): updating child policy handler %p",
566 priority_policy_.get(), name_.c_str(), this, child_policy_.get());
567 }
568 child_policy_->UpdateLocked(std::move(update_args));
569 }
570
571 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)572 PriorityLb::ChildPriority::CreateChildPolicyLocked(
573 const grpc_channel_args* args) {
574 LoadBalancingPolicy::Args lb_policy_args;
575 lb_policy_args.work_serializer = priority_policy_->work_serializer();
576 lb_policy_args.args = args;
577 lb_policy_args.channel_control_helper =
578 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
579 OrphanablePtr<LoadBalancingPolicy> lb_policy =
580 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
581 &grpc_lb_priority_trace);
582 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
583 gpr_log(GPR_INFO,
584 "[priority_lb %p] child %s (%p): created new child policy "
585 "handler %p",
586 priority_policy_.get(), name_.c_str(), this, lb_policy.get());
587 }
588 // Add the parent's interested_parties pollset_set to that of the newly
589 // created child policy. This will make the child policy progress upon
590 // activity on the parent LB, which in turn is tied to the application's call.
591 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
592 priority_policy_->interested_parties());
593 return lb_policy;
594 }
595
ExitIdleLocked()596 void PriorityLb::ChildPriority::ExitIdleLocked() {
597 if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
598 !failover_timer_callback_pending_) {
599 StartFailoverTimerLocked();
600 }
601 child_policy_->ExitIdleLocked();
602 }
603
ResetBackoffLocked()604 void PriorityLb::ChildPriority::ResetBackoffLocked() {
605 child_policy_->ResetBackoffLocked();
606 }
607
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)608 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
609 grpc_connectivity_state state, const absl::Status& status,
610 std::unique_ptr<SubchannelPicker> picker) {
611 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
612 gpr_log(GPR_INFO,
613 "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
614 priority_policy_.get(), name_.c_str(), this,
615 ConnectivityStateName(state), status.ToString().c_str(),
616 picker.get());
617 }
618 // Store the state and picker.
619 connectivity_state_ = state;
620 connectivity_status_ = status;
621 picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
622 // If READY or TRANSIENT_FAILURE, cancel failover timer.
623 if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
624 MaybeCancelFailoverTimerLocked();
625 }
626 // Notify the parent policy.
627 priority_policy_->HandleChildConnectivityStateChangeLocked(this);
628 }
629
StartFailoverTimerLocked()630 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
631 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
632 gpr_log(GPR_INFO,
633 "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
634 priority_policy_.get(), name_.c_str(), this,
635 priority_policy_->child_failover_timeout_ms_);
636 }
637 Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
638 grpc_timer_init(
639 &failover_timer_,
640 ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
641 &on_failover_timer_);
642 failover_timer_callback_pending_ = true;
643 }
644
MaybeCancelFailoverTimerLocked()645 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
646 if (failover_timer_callback_pending_) {
647 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
648 gpr_log(GPR_INFO,
649 "[priority_lb %p] child %s (%p): cancelling failover timer",
650 priority_policy_.get(), name_.c_str(), this);
651 }
652 grpc_timer_cancel(&failover_timer_);
653 failover_timer_callback_pending_ = false;
654 }
655 }
656
OnFailoverTimer(void * arg,grpc_error_handle error)657 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg,
658 grpc_error_handle error) {
659 ChildPriority* self = static_cast<ChildPriority*>(arg);
660 (void)GRPC_ERROR_REF(error); // ref owned by lambda
661 self->priority_policy_->work_serializer()->Run(
662 [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
663 }
664
OnFailoverTimerLocked(grpc_error_handle error)665 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error_handle error) {
666 if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
667 !priority_policy_->shutting_down_) {
668 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
669 gpr_log(GPR_INFO,
670 "[priority_lb %p] child %s (%p): failover timer fired, "
671 "reporting TRANSIENT_FAILURE",
672 priority_policy_.get(), name_.c_str(), this);
673 }
674 failover_timer_callback_pending_ = false;
675 OnConnectivityStateUpdateLocked(
676 GRPC_CHANNEL_TRANSIENT_FAILURE,
677 absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
678 nullptr);
679 }
680 Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
681 GRPC_ERROR_UNREF(error);
682 }
683
DeactivateLocked()684 void PriorityLb::ChildPriority::DeactivateLocked() {
685 // If already deactivated, don't do it again.
686 if (deactivation_timer_callback_pending_) return;
687 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
688 gpr_log(GPR_INFO,
689 "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
690 "ms.",
691 priority_policy_.get(), name_.c_str(), this,
692 kChildRetentionIntervalMs);
693 }
694 MaybeCancelFailoverTimerLocked();
695 // Start a timer to delete the child.
696 Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
697 grpc_timer_init(&deactivation_timer_,
698 ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
699 &on_deactivation_timer_);
700 deactivation_timer_callback_pending_ = true;
701 }
702
MaybeReactivateLocked()703 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
704 if (deactivation_timer_callback_pending_) {
705 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
706 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
707 priority_policy_.get(), name_.c_str(), this);
708 }
709 deactivation_timer_callback_pending_ = false;
710 grpc_timer_cancel(&deactivation_timer_);
711 }
712 }
713
OnDeactivationTimer(void * arg,grpc_error_handle error)714 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
715 grpc_error_handle error) {
716 ChildPriority* self = static_cast<ChildPriority*>(arg);
717 (void)GRPC_ERROR_REF(error); // ref owned by lambda
718 self->priority_policy_->work_serializer()->Run(
719 [self, error]() { self->OnDeactivationTimerLocked(error); },
720 DEBUG_LOCATION);
721 }
722
OnDeactivationTimerLocked(grpc_error_handle error)723 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(
724 grpc_error_handle error) {
725 if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
726 !priority_policy_->shutting_down_) {
727 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
728 gpr_log(GPR_INFO,
729 "[priority_lb %p] child %s (%p): deactivation timer fired, "
730 "deleting child",
731 priority_policy_.get(), name_.c_str(), this);
732 }
733 deactivation_timer_callback_pending_ = false;
734 priority_policy_->DeleteChild(this);
735 }
736 Unref(DEBUG_LOCATION, "ChildPriority+timer");
737 GRPC_ERROR_UNREF(error);
738 }
739
740 //
741 // PriorityLb::ChildPriority::Helper
742 //
743
744 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)745 PriorityLb::ChildPriority::Helper::CreateSubchannel(
746 ServerAddress address, const grpc_channel_args& args) {
747 if (priority_->priority_policy_->shutting_down_) return nullptr;
748 return priority_->priority_policy_->channel_control_helper()
749 ->CreateSubchannel(std::move(address), args);
750 }
751
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)752 void PriorityLb::ChildPriority::Helper::UpdateState(
753 grpc_connectivity_state state, const absl::Status& status,
754 std::unique_ptr<SubchannelPicker> picker) {
755 if (priority_->priority_policy_->shutting_down_) return;
756 // Notify the priority.
757 priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
758 }
759
RequestReresolution()760 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
761 if (priority_->priority_policy_->shutting_down_) return;
762 if (priority_->ignore_reresolution_requests_) {
763 return;
764 }
765 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
766 }
767
GetAuthority()768 absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
769 return priority_->priority_policy_->channel_control_helper()->GetAuthority();
770 }
771
AddTraceEvent(TraceSeverity severity,absl::string_view message)772 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
773 TraceSeverity severity, absl::string_view message) {
774 if (priority_->priority_policy_->shutting_down_) return;
775 priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
776 message);
777 }
778
779 //
780 // factory
781 //
782
783 class PriorityLbFactory : public LoadBalancingPolicyFactory {
784 public:
// Creates a new PriorityLb instance from the given args.
OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
    LoadBalancingPolicy::Args args) const override {
  return MakeOrphanable<PriorityLb>(std::move(args));
}
789
// Name this factory is registered under; same value as PriorityLb::name().
const char* name() const override { return kPriority; }
791
ParseLoadBalancingConfig(const Json & json,grpc_error_handle * error) const792 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
793 const Json& json, grpc_error_handle* error) const override {
794 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
795 if (json.type() == Json::Type::JSON_NULL) {
796 // priority was mentioned as a policy in the deprecated
797 // loadBalancingPolicy field or in the client API.
798 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
799 "field:loadBalancingPolicy error:priority policy requires "
800 "configuration. Please use loadBalancingConfig field of service "
801 "config instead.");
802 return nullptr;
803 }
804 std::vector<grpc_error_handle> error_list;
805 // Children.
806 std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
807 auto it = json.object_value().find("children");
808 if (it == json.object_value().end()) {
809 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
810 "field:children error:required field missing"));
811 } else if (it->second.type() != Json::Type::OBJECT) {
812 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
813 "field:children error:type should be object"));
814 } else {
815 const Json::Object& object = it->second.object_value();
816 for (const auto& p : object) {
817 const std::string& child_name = p.first;
818 const Json& element = p.second;
819 if (element.type() != Json::Type::OBJECT) {
820 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
821 absl::StrCat("field:children key:", child_name,
822 " error:should be type object")));
823 } else {
824 auto it2 = element.object_value().find("config");
825 if (it2 == element.object_value().end()) {
826 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
827 absl::StrCat("field:children key:", child_name,
828 " error:missing 'config' field")));
829 } else {
830 grpc_error_handle parse_error = GRPC_ERROR_NONE;
831 auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
832 it2->second, &parse_error);
833 bool ignore_resolution_requests = false;
834 // If present, ignore_reresolution_requests must be of type
835 // boolean.
836 auto it3 =
837 element.object_value().find("ignore_reresolution_requests");
838 if (it3 != element.object_value().end()) {
839 if (it3->second.type() == Json::Type::JSON_TRUE) {
840 ignore_resolution_requests = true;
841 } else if (it3->second.type() != Json::Type::JSON_FALSE) {
842 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
843 absl::StrCat("field:children key:", child_name,
844 " field:ignore_reresolution_requests:should "
845 "be type boolean")));
846 }
847 }
848 if (config == nullptr) {
849 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
850 error_list.push_back(
851 GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
852 absl::StrCat("field:children key:", child_name).c_str(),
853 &parse_error, 1));
854 GRPC_ERROR_UNREF(parse_error);
855 }
856 children[child_name].config = std::move(config);
857 children[child_name].ignore_reresolution_requests =
858 ignore_resolution_requests;
859 }
860 }
861 }
862 }
863 // Priorities.
864 std::vector<std::string> priorities;
865 it = json.object_value().find("priorities");
866 if (it == json.object_value().end()) {
867 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
868 "field:priorities error:required field missing"));
869 } else if (it->second.type() != Json::Type::ARRAY) {
870 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
871 "field:priorities error:type should be array"));
872 } else {
873 const Json::Array& array = it->second.array_value();
874 for (size_t i = 0; i < array.size(); ++i) {
875 const Json& element = array[i];
876 if (element.type() != Json::Type::STRING) {
877 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
878 "field:priorities element:", i, " error:should be type string")));
879 } else if (children.find(element.string_value()) == children.end()) {
880 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
881 "field:priorities element:", i, " error:unknown child '",
882 element.string_value(), "'")));
883 } else {
884 priorities.emplace_back(element.string_value());
885 }
886 }
887 if (priorities.size() != children.size()) {
888 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
889 "field:priorities error:priorities size (", priorities.size(),
890 ") != children size (", children.size(), ")")));
891 }
892 }
893 if (error_list.empty()) {
894 return MakeRefCounted<PriorityLbConfig>(std::move(children),
895 std::move(priorities));
896 } else {
897 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
898 "priority_experimental LB policy config", &error_list);
899 return nullptr;
900 }
901 }
902 };
903
904 } // namespace
905
906 } // namespace grpc_core
907
908 //
909 // Plugin registration
910 //
911
grpc_lb_policy_priority_init()912 void grpc_lb_policy_priority_init() {
913 grpc_core::LoadBalancingPolicyRegistry::Builder::
914 RegisterLoadBalancingPolicyFactory(
915 absl::make_unique<grpc_core::PriorityLbFactory>());
916 }
917
// Plugin shutdown hook for the priority_experimental LB policy. Intentionally
// a no-op: grpc_lb_policy_priority_init() keeps no state of its own that
// would need releasing here.
void grpc_lb_policy_priority_shutdown() {
  // Nothing to do.
}
919