1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 // http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include <grpc/support/port_platform.h>
18
19 #include <inttypes.h>
20 #include <limits.h>
21
22 #include "absl/strings/str_cat.h"
23
24 #include <grpc/grpc.h>
25
26 #include "src/core/ext/filters/client_channel/lb_policy.h"
27 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
29 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/iomgr/timer.h"
35 #include "src/core/lib/iomgr/work_serializer.h"
36
37 namespace grpc_core {
38
39 TraceFlag grpc_lb_priority_trace(false, "priority_lb");
40
41 namespace {
42
// Name of this LB policy, as returned by the name() methods of the config,
// the policy and the factory.
constexpr char kPriority[] = "priority_experimental";

// Retention period, in milliseconds, for a child that is no longer in use:
// either it was dropped from the config or a higher-priority child has been
// selected instead. The child is deleted when this interval elapses.
constexpr int kChildRetentionIntervalMs = 15 /* minutes */ * 60 * 1000;

// Default time, in milliseconds, that a newly created child is given to get
// connected before the next priority is attempted. The value actually used
// is read from the channel args when the policy is constructed.
constexpr int kDefaultChildFailoverTimeoutMs = 10000;
53
54 // Config for priority LB policy.
55 class PriorityLbConfig : public LoadBalancingPolicy::Config {
56 public:
PriorityLbConfig(std::map<std::string,RefCountedPtr<LoadBalancingPolicy::Config>> children,std::vector<std::string> priorities)57 PriorityLbConfig(
58 std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>
59 children,
60 std::vector<std::string> priorities)
61 : children_(std::move(children)), priorities_(std::move(priorities)) {}
62
name() const63 const char* name() const override { return kPriority; }
64
65 const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>&
children() const66 children() const {
67 return children_;
68 }
priorities() const69 const std::vector<std::string>& priorities() const { return priorities_; }
70
71 private:
72 const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>
73 children_;
74 const std::vector<std::string> priorities_;
75 };
76
77 // priority LB policy.
78 class PriorityLb : public LoadBalancingPolicy {
79 public:
80 explicit PriorityLb(Args args);
81
name() const82 const char* name() const override { return kPriority; }
83
84 void UpdateLocked(UpdateArgs args) override;
85 void ExitIdleLocked() override;
86 void ResetBackoffLocked() override;
87
88 private:
89 // Each ChildPriority holds a ref to the PriorityLb.
90 class ChildPriority : public InternallyRefCounted<ChildPriority> {
91 public:
92 ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
93
~ChildPriority()94 ~ChildPriority() {
95 priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
96 }
97
name() const98 const std::string& name() const { return name_; }
99
100 void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config);
101 void ExitIdleLocked();
102 void ResetBackoffLocked();
103 void DeactivateLocked();
104 void MaybeReactivateLocked();
105 void MaybeCancelFailoverTimerLocked();
106
107 void Orphan() override;
108
GetPicker()109 std::unique_ptr<SubchannelPicker> GetPicker() {
110 return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
111 }
112
connectivity_state() const113 grpc_connectivity_state connectivity_state() const {
114 return connectivity_state_;
115 }
failover_timer_callback_pending() const116 bool failover_timer_callback_pending() const {
117 return failover_timer_callback_pending_;
118 }
119
120 private:
121 // A simple wrapper for ref-counting a picker from the child policy.
122 class RefCountedPicker : public RefCounted<RefCountedPicker> {
123 public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)124 explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
125 : picker_(std::move(picker)) {}
Pick(PickArgs args)126 PickResult Pick(PickArgs args) { return picker_->Pick(args); }
127
128 private:
129 std::unique_ptr<SubchannelPicker> picker_;
130 };
131
132 // A non-ref-counted wrapper for RefCountedPicker.
133 class RefCountedPickerWrapper : public SubchannelPicker {
134 public:
RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)135 explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
136 : picker_(std::move(picker)) {}
Pick(PickArgs args)137 PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
138
139 private:
140 RefCountedPtr<RefCountedPicker> picker_;
141 };
142
143 class Helper : public ChannelControlHelper {
144 public:
Helper(RefCountedPtr<ChildPriority> priority)145 explicit Helper(RefCountedPtr<ChildPriority> priority)
146 : priority_(std::move(priority)) {}
147
~Helper()148 ~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); }
149
150 RefCountedPtr<SubchannelInterface> CreateSubchannel(
151 const grpc_channel_args& args) override;
152 void UpdateState(grpc_connectivity_state state,
153 std::unique_ptr<SubchannelPicker> picker) override;
154 void RequestReresolution() override;
155 void AddTraceEvent(TraceSeverity severity,
156 absl::string_view message) override;
157
158 private:
159 RefCountedPtr<ChildPriority> priority_;
160 };
161
162 // Methods for dealing with the child policy.
163 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
164 const grpc_channel_args* args);
165
166 void OnConnectivityStateUpdateLocked(
167 grpc_connectivity_state state,
168 std::unique_ptr<SubchannelPicker> picker);
169
170 void StartFailoverTimerLocked();
171
172 static void OnFailoverTimer(void* arg, grpc_error* error);
173 void OnFailoverTimerLocked(grpc_error* error);
174 static void OnDeactivationTimer(void* arg, grpc_error* error);
175 void OnDeactivationTimerLocked(grpc_error* error);
176
177 RefCountedPtr<PriorityLb> priority_policy_;
178 const std::string name_;
179
180 OrphanablePtr<LoadBalancingPolicy> child_policy_;
181
182 grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
183 RefCountedPtr<RefCountedPicker> picker_wrapper_;
184
185 // States for delayed removal.
186 grpc_timer deactivation_timer_;
187 grpc_closure on_deactivation_timer_;
188 bool deactivation_timer_callback_pending_ = false;
189
190 // States of failover.
191 grpc_timer failover_timer_;
192 grpc_closure on_failover_timer_;
193 bool failover_timer_callback_pending_ = false;
194 };
195
196 ~PriorityLb();
197
198 void ShutdownLocked() override;
199
200 // Returns UINT32_MAX if child is not in current priority list.
201 uint32_t GetChildPriorityLocked(const std::string& child_name) const;
202
203 void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
204 void DeleteChild(ChildPriority* child);
205
206 void TryNextPriorityLocked(bool report_connecting);
207 void SelectPriorityLocked(uint32_t priority);
208
209 const int child_failover_timeout_ms_;
210
211 // Current channel args and config from the resolver.
212 const grpc_channel_args* args_ = nullptr;
213 RefCountedPtr<PriorityLbConfig> config_;
214 HierarchicalAddressMap addresses_;
215
216 // Internal state.
217 bool shutting_down_ = false;
218
219 std::map<std::string, OrphanablePtr<ChildPriority>> children_;
220 // The priority that is being used.
221 uint32_t current_priority_ = UINT32_MAX;
222 // Points to the current child from before the most recent update.
223 // We will continue to use this child until we decide which of the new
224 // children to use.
225 ChildPriority* current_child_from_before_update_ = nullptr;
226 };
227
228 //
229 // PriorityLb
230 //
231
PriorityLb(Args args)232 PriorityLb::PriorityLb(Args args)
233 : LoadBalancingPolicy(std::move(args)),
234 child_failover_timeout_ms_(grpc_channel_args_find_integer(
235 args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
236 {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
237 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
238 gpr_log(GPR_INFO, "[priority_lb %p] created", this);
239 }
240 }
241
~PriorityLb()242 PriorityLb::~PriorityLb() {
243 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
244 gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
245 }
246 grpc_channel_args_destroy(args_);
247 }
248
ShutdownLocked()249 void PriorityLb::ShutdownLocked() {
250 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
251 gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
252 }
253 shutting_down_ = true;
254 children_.clear();
255 }
256
ExitIdleLocked()257 void PriorityLb::ExitIdleLocked() {
258 if (current_priority_ != UINT32_MAX) {
259 const std::string& child_name = config_->priorities()[current_priority_];
260 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
261 gpr_log(GPR_INFO,
262 "[priority_lb %p] exiting IDLE for current priority %d child %s",
263 this, current_priority_, child_name.c_str());
264 }
265 children_[child_name]->ExitIdleLocked();
266 }
267 }
268
ResetBackoffLocked()269 void PriorityLb::ResetBackoffLocked() {
270 for (const auto& p : children_) p.second->ResetBackoffLocked();
271 }
272
UpdateLocked(UpdateArgs args)273 void PriorityLb::UpdateLocked(UpdateArgs args) {
274 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
275 gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
276 }
277 // Save current child.
278 if (current_priority_ != UINT32_MAX) {
279 const std::string& child_name = config_->priorities()[current_priority_];
280 current_child_from_before_update_ = children_[child_name].get();
281 // Unset current_priority_, since it was an index into the old
282 // config's priority list and may no longer be valid. It will be
283 // reset later by TryNextPriorityLocked(), but we unset it here in
284 // case updating any of our children triggers a state update.
285 current_priority_ = UINT32_MAX;
286 }
287 // Update config.
288 config_ = std::move(args.config);
289 // Update args.
290 grpc_channel_args_destroy(args_);
291 args_ = args.args;
292 args.args = nullptr;
293 // Update addresses.
294 addresses_ = MakeHierarchicalAddressMap(args.addresses);
295 // Check all existing children against the new config.
296 for (const auto& p : children_) {
297 const std::string& child_name = p.first;
298 auto& child = p.second;
299 auto config_it = config_->children().find(child_name);
300 if (config_it == config_->children().end()) {
301 // Existing child not found in new config. Deactivate it.
302 child->DeactivateLocked();
303 } else {
304 // Existing child found in new config. Update it.
305 child->UpdateLocked(config_it->second);
306 }
307 }
308 // Try to get connected.
309 TryNextPriorityLocked(/*report_connecting=*/children_.empty());
310 }
311
GetChildPriorityLocked(const std::string & child_name) const312 uint32_t PriorityLb::GetChildPriorityLocked(
313 const std::string& child_name) const {
314 for (uint32_t priority = 0; priority < config_->priorities().size();
315 ++priority) {
316 if (config_->priorities()[priority] == child_name) return priority;
317 }
318 return UINT32_MAX;
319 }
320
HandleChildConnectivityStateChangeLocked(ChildPriority * child)321 void PriorityLb::HandleChildConnectivityStateChangeLocked(
322 ChildPriority* child) {
323 // Special case for the child that was the current child before the
324 // most recent update.
325 if (child == current_child_from_before_update_) {
326 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
327 gpr_log(GPR_INFO,
328 "[priority_lb %p] state update for current child from before "
329 "config update",
330 this);
331 }
332 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
333 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
334 // If it's still READY or IDLE, we stick with this child, so pass
335 // the new picker up to our parent.
336 channel_control_helper()->UpdateState(child->connectivity_state(),
337 child->GetPicker());
338 } else {
339 // If it's no longer READY or IDLE, we should stop using it.
340 // We already started trying other priorities as a result of the
341 // update, but calling TryNextPriorityLocked() ensures that we will
342 // properly select between CONNECTING and TRANSIENT_FAILURE as the
343 // new state to report to our parent.
344 current_child_from_before_update_ = nullptr;
345 TryNextPriorityLocked(/*report_connecting=*/true);
346 }
347 return;
348 }
349 // Otherwise, find the child's priority.
350 uint32_t child_priority = GetChildPriorityLocked(child->name());
351 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
352 gpr_log(GPR_INFO, "[priority_lb %p] state update for priority %d, child %s",
353 this, child_priority, child->name().c_str());
354 }
355 // Ignore priorities not in the current config.
356 if (child_priority == UINT32_MAX) return;
357 // Ignore lower-than-current priorities.
358 if (child_priority > current_priority_) return;
359 // If a child reports TRANSIENT_FAILURE, start trying the next priority.
360 // Note that even if this is for a higher-than-current priority, we
361 // may still need to create some children between this priority and
362 // the current one (e.g., if we got an update that inserted new
363 // priorities ahead of the current one).
364 if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
365 TryNextPriorityLocked(
366 /*report_connecting=*/child_priority == current_priority_);
367 return;
368 }
369 // The update is for a higher-than-current priority (or for any
370 // priority if we don't have any current priority).
371 if (child_priority < current_priority_) {
372 // If the child reports READY or IDLE, switch to that priority.
373 // Otherwise, ignore the update.
374 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
375 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
376 SelectPriorityLocked(child_priority);
377 }
378 return;
379 }
380 // The current priority has returned a new picker, so pass it up to
381 // our parent.
382 channel_control_helper()->UpdateState(child->connectivity_state(),
383 child->GetPicker());
384 }
385
DeleteChild(ChildPriority * child)386 void PriorityLb::DeleteChild(ChildPriority* child) {
387 // If this was the current child from before the most recent update,
388 // stop using it. We already started trying other priorities as a
389 // result of the update, but calling TryNextPriorityLocked() ensures that
390 // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
391 // new state to report to our parent.
392 if (current_child_from_before_update_ == child) {
393 current_child_from_before_update_ = nullptr;
394 TryNextPriorityLocked(/*report_connecting=*/true);
395 }
396 children_.erase(child->name());
397 }
398
TryNextPriorityLocked(bool report_connecting)399 void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
400 for (uint32_t priority = 0; priority < config_->priorities().size();
401 ++priority) {
402 // If the child for the priority does not exist yet, create it.
403 const std::string& child_name = config_->priorities()[priority];
404 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
405 gpr_log(GPR_INFO, "[priority_lb %p] trying priority %d, child %s", this,
406 priority, child_name.c_str());
407 }
408 auto& child = children_[child_name];
409 if (child == nullptr) {
410 if (report_connecting) {
411 channel_control_helper()->UpdateState(
412 GRPC_CHANNEL_CONNECTING,
413 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
414 }
415 child = MakeOrphanable<ChildPriority>(
416 Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
417 child->UpdateLocked(config_->children().find(child_name)->second);
418 return;
419 }
420 // The child already exists.
421 child->MaybeReactivateLocked();
422 // If the child is in state READY or IDLE, switch to it.
423 if (child->connectivity_state() == GRPC_CHANNEL_READY ||
424 child->connectivity_state() == GRPC_CHANNEL_IDLE) {
425 SelectPriorityLocked(priority);
426 return;
427 }
428 // Child is not READY or IDLE.
429 // If its failover timer is still pending, give it time to fire.
430 if (child->failover_timer_callback_pending()) {
431 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
432 gpr_log(GPR_INFO,
433 "[priority_lb %p] priority %d, child %s: child still "
434 "attempting to connect, will wait",
435 this, priority, child_name.c_str());
436 }
437 if (report_connecting) {
438 channel_control_helper()->UpdateState(
439 GRPC_CHANNEL_CONNECTING,
440 absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
441 }
442 return;
443 }
444 // Child has been failing for a while. Move on to the next priority.
445 }
446 // If there are no more priorities to try, report TRANSIENT_FAILURE.
447 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
448 gpr_log(GPR_INFO,
449 "[priority_lb %p] no priority reachable, putting channel in "
450 "TRANSIENT_FAILURE",
451 this);
452 }
453 current_priority_ = UINT32_MAX;
454 current_child_from_before_update_ = nullptr;
455 grpc_error* error = grpc_error_set_int(
456 GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
457 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
458 channel_control_helper()->UpdateState(
459 GRPC_CHANNEL_TRANSIENT_FAILURE,
460 absl::make_unique<TransientFailurePicker>(error));
461 }
462
SelectPriorityLocked(uint32_t priority)463 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
464 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
465 gpr_log(GPR_INFO, "[priority_lb %p] selected priority %d, child %s", this,
466 priority, config_->priorities()[priority].c_str());
467 }
468 current_priority_ = priority;
469 current_child_from_before_update_ = nullptr;
470 // Deactivate lower priorities.
471 for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
472 const std::string& child_name = config_->priorities()[p];
473 auto it = children_.find(child_name);
474 if (it != children_.end()) it->second->DeactivateLocked();
475 }
476 // Update picker.
477 auto& child = children_[config_->priorities()[priority]];
478 channel_control_helper()->UpdateState(child->connectivity_state(),
479 child->GetPicker());
480 }
481
482 //
483 // PriorityLb::ChildPriority
484 //
485
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)486 PriorityLb::ChildPriority::ChildPriority(
487 RefCountedPtr<PriorityLb> priority_policy, std::string name)
488 : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
489 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
490 gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
491 priority_policy_.get(), name_.c_str(), this);
492 }
493 GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
494 grpc_schedule_on_exec_ctx);
495 GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
496 grpc_schedule_on_exec_ctx);
497 // Start the failover timer.
498 StartFailoverTimerLocked();
499 }
500
Orphan()501 void PriorityLb::ChildPriority::Orphan() {
502 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
503 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
504 priority_policy_.get(), name_.c_str(), this);
505 }
506 MaybeCancelFailoverTimerLocked();
507 if (deactivation_timer_callback_pending_) {
508 grpc_timer_cancel(&deactivation_timer_);
509 }
510 // Remove the child policy's interested_parties pollset_set from the
511 // xDS policy.
512 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
513 priority_policy_->interested_parties());
514 child_policy_.reset();
515 // Drop our ref to the child's picker, in case it's holding a ref to
516 // the child.
517 picker_wrapper_.reset();
518 if (deactivation_timer_callback_pending_) {
519 grpc_timer_cancel(&deactivation_timer_);
520 }
521 Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
522 }
523
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config)524 void PriorityLb::ChildPriority::UpdateLocked(
525 RefCountedPtr<LoadBalancingPolicy::Config> config) {
526 if (priority_policy_->shutting_down_) return;
527 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
528 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
529 priority_policy_.get(), name_.c_str(), this);
530 }
531 // Create policy if needed.
532 if (child_policy_ == nullptr) {
533 child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
534 }
535 // Construct update args.
536 UpdateArgs update_args;
537 update_args.config = std::move(config);
538 update_args.addresses = priority_policy_->addresses_[name_];
539 update_args.args = grpc_channel_args_copy(priority_policy_->args_);
540 // Update the policy.
541 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
542 gpr_log(GPR_INFO,
543 "[priority_lb %p] child %s (%p): updating child policy handler %p",
544 priority_policy_.get(), name_.c_str(), this, child_policy_.get());
545 }
546 child_policy_->UpdateLocked(std::move(update_args));
547 }
548
549 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)550 PriorityLb::ChildPriority::CreateChildPolicyLocked(
551 const grpc_channel_args* args) {
552 LoadBalancingPolicy::Args lb_policy_args;
553 lb_policy_args.work_serializer = priority_policy_->work_serializer();
554 lb_policy_args.args = args;
555 lb_policy_args.channel_control_helper =
556 absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
557 OrphanablePtr<LoadBalancingPolicy> lb_policy =
558 MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
559 &grpc_lb_priority_trace);
560 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
561 gpr_log(GPR_INFO,
562 "[priority_lb %p] child %s (%p): created new child policy "
563 "handler %p",
564 priority_policy_.get(), name_.c_str(), this, lb_policy.get());
565 }
566 // Add the parent's interested_parties pollset_set to that of the newly
567 // created child policy. This will make the child policy progress upon
568 // activity on the parent LB, which in turn is tied to the application's call.
569 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
570 priority_policy_->interested_parties());
571 return lb_policy;
572 }
573
ExitIdleLocked()574 void PriorityLb::ChildPriority::ExitIdleLocked() {
575 if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
576 !failover_timer_callback_pending_) {
577 StartFailoverTimerLocked();
578 }
579 child_policy_->ExitIdleLocked();
580 }
581
ResetBackoffLocked()582 void PriorityLb::ChildPriority::ResetBackoffLocked() {
583 child_policy_->ResetBackoffLocked();
584 }
585
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)586 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
587 grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
588 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
589 gpr_log(GPR_INFO,
590 "[priority_lb %p] child %s (%p): state update: %s, picker %p",
591 priority_policy_.get(), name_.c_str(), this,
592 ConnectivityStateName(state), picker.get());
593 }
594 // Store the state and picker.
595 connectivity_state_ = state;
596 picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
597 // If READY or TRANSIENT_FAILURE, cancel failover timer.
598 if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
599 MaybeCancelFailoverTimerLocked();
600 }
601 // Notify the parent policy.
602 priority_policy_->HandleChildConnectivityStateChangeLocked(this);
603 }
604
StartFailoverTimerLocked()605 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
606 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
607 gpr_log(GPR_INFO,
608 "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
609 priority_policy_.get(), name_.c_str(), this,
610 priority_policy_->child_failover_timeout_ms_);
611 }
612 Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
613 grpc_timer_init(
614 &failover_timer_,
615 ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
616 &on_failover_timer_);
617 failover_timer_callback_pending_ = true;
618 }
619
MaybeCancelFailoverTimerLocked()620 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
621 if (failover_timer_callback_pending_) {
622 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
623 gpr_log(GPR_INFO,
624 "[priority_lb %p] child %s (%p): cancelling failover timer",
625 priority_policy_.get(), name_.c_str(), this);
626 }
627 grpc_timer_cancel(&failover_timer_);
628 failover_timer_callback_pending_ = false;
629 }
630 }
631
OnFailoverTimer(void * arg,grpc_error * error)632 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) {
633 ChildPriority* self = static_cast<ChildPriority*>(arg);
634 GRPC_ERROR_REF(error); // ref owned by lambda
635 self->priority_policy_->work_serializer()->Run(
636 [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
637 }
638
OnFailoverTimerLocked(grpc_error * error)639 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) {
640 if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
641 !priority_policy_->shutting_down_) {
642 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
643 gpr_log(GPR_INFO,
644 "[priority_lb %p] child %s (%p): failover timer fired, "
645 "reporting TRANSIENT_FAILURE",
646 priority_policy_.get(), name_.c_str(), this);
647 }
648 failover_timer_callback_pending_ = false;
649 OnConnectivityStateUpdateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, nullptr);
650 }
651 Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
652 GRPC_ERROR_UNREF(error);
653 }
654
DeactivateLocked()655 void PriorityLb::ChildPriority::DeactivateLocked() {
656 // If already deactivated, don't do it again.
657 if (deactivation_timer_callback_pending_) return;
658 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
659 gpr_log(GPR_INFO,
660 "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
661 "ms.",
662 priority_policy_.get(), name_.c_str(), this,
663 kChildRetentionIntervalMs);
664 }
665 MaybeCancelFailoverTimerLocked();
666 // Start a timer to delete the child.
667 Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
668 grpc_timer_init(&deactivation_timer_,
669 ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
670 &on_deactivation_timer_);
671 deactivation_timer_callback_pending_ = true;
672 }
673
MaybeReactivateLocked()674 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
675 if (deactivation_timer_callback_pending_) {
676 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
677 gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
678 priority_policy_.get(), name_.c_str(), this);
679 }
680 deactivation_timer_callback_pending_ = false;
681 grpc_timer_cancel(&deactivation_timer_);
682 }
683 }
684
OnDeactivationTimer(void * arg,grpc_error * error)685 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
686 grpc_error* error) {
687 ChildPriority* self = static_cast<ChildPriority*>(arg);
688 GRPC_ERROR_REF(error); // ref owned by lambda
689 self->priority_policy_->work_serializer()->Run(
690 [self, error]() { self->OnDeactivationTimerLocked(error); },
691 DEBUG_LOCATION);
692 }
693
OnDeactivationTimerLocked(grpc_error * error)694 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) {
695 if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
696 !priority_policy_->shutting_down_) {
697 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
698 gpr_log(GPR_INFO,
699 "[priority_lb %p] child %s (%p): deactivation timer fired, "
700 "deleting child",
701 priority_policy_.get(), name_.c_str(), this);
702 }
703 deactivation_timer_callback_pending_ = false;
704 priority_policy_->DeleteChild(this);
705 }
706 Unref(DEBUG_LOCATION, "ChildPriority+timer");
707 GRPC_ERROR_UNREF(error);
708 }
709
710 //
711 // PriorityLb::ChildPriority::Helper
712 //
713
RequestReresolution()714 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
715 if (priority_->priority_policy_->shutting_down_) return;
716 priority_->priority_policy_->channel_control_helper()->RequestReresolution();
717 }
718
719 RefCountedPtr<SubchannelInterface>
CreateSubchannel(const grpc_channel_args & args)720 PriorityLb::ChildPriority::Helper::CreateSubchannel(
721 const grpc_channel_args& args) {
722 if (priority_->priority_policy_->shutting_down_) return nullptr;
723 return priority_->priority_policy_->channel_control_helper()
724 ->CreateSubchannel(args);
725 }
726
UpdateState(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)727 void PriorityLb::ChildPriority::Helper::UpdateState(
728 grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
729 if (priority_->priority_policy_->shutting_down_) return;
730 // Notify the priority.
731 priority_->OnConnectivityStateUpdateLocked(state, std::move(picker));
732 }
733
AddTraceEvent(TraceSeverity severity,absl::string_view message)734 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
735 TraceSeverity severity, absl::string_view message) {
736 if (priority_->priority_policy_->shutting_down_) return;
737 priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
738 message);
739 }
740
741 //
742 // factory
743 //
744
745 class PriorityLbFactory : public LoadBalancingPolicyFactory {
746 public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const747 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
748 LoadBalancingPolicy::Args args) const override {
749 return MakeOrphanable<PriorityLb>(std::move(args));
750 }
751
name() const752 const char* name() const override { return kPriority; }
753
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const754 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
755 const Json& json, grpc_error** error) const override {
756 GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
757 if (json.type() == Json::Type::JSON_NULL) {
758 // priority was mentioned as a policy in the deprecated
759 // loadBalancingPolicy field or in the client API.
760 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
761 "field:loadBalancingPolicy error:priority policy requires "
762 "configuration. Please use loadBalancingConfig field of service "
763 "config instead.");
764 return nullptr;
765 }
766 std::vector<grpc_error*> error_list;
767 // Children.
768 std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> children;
769 auto it = json.object_value().find("children");
770 if (it == json.object_value().end()) {
771 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
772 "field:children error:required field missing"));
773 } else if (it->second.type() != Json::Type::OBJECT) {
774 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
775 "field:children error:type should be object"));
776 } else {
777 const Json::Object& object = it->second.object_value();
778 for (const auto& p : object) {
779 const std::string& child_name = p.first;
780 const Json& element = p.second;
781 if (element.type() != Json::Type::OBJECT) {
782 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
783 absl::StrCat("field:children key:", child_name,
784 " error:should be type object")
785 .c_str()));
786 } else {
787 auto it2 = element.object_value().find("config");
788 if (it2 == element.object_value().end()) {
789 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
790 absl::StrCat("field:children key:", child_name,
791 " error:missing 'config' field")
792 .c_str()));
793 } else {
794 grpc_error* parse_error = GRPC_ERROR_NONE;
795 auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
796 it2->second, &parse_error);
797 if (config == nullptr) {
798 GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
799 error_list.push_back(
800 GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
801 absl::StrCat("field:children key:", child_name).c_str(),
802 &parse_error, 1));
803 GRPC_ERROR_UNREF(parse_error);
804 }
805 children[child_name] = std::move(config);
806 }
807 }
808 }
809 }
810 // Priorities.
811 std::vector<std::string> priorities;
812 it = json.object_value().find("priorities");
813 if (it == json.object_value().end()) {
814 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
815 "field:priorities error:required field missing"));
816 } else if (it->second.type() != Json::Type::ARRAY) {
817 error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
818 "field:priorities error:type should be array"));
819 } else {
820 const Json::Array& array = it->second.array_value();
821 for (size_t i = 0; i < array.size(); ++i) {
822 const Json& element = array[i];
823 if (element.type() != Json::Type::STRING) {
824 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
825 absl::StrCat("field:priorities element:", i,
826 " error:should be type string")
827 .c_str()));
828 } else if (children.find(element.string_value()) == children.end()) {
829 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
830 absl::StrCat("field:priorities element:", i,
831 " error:unknown child '", element.string_value(),
832 "'")
833 .c_str()));
834 } else {
835 priorities.emplace_back(element.string_value());
836 }
837 }
838 if (priorities.size() != children.size()) {
839 error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
840 absl::StrCat("field:priorities error:priorities size (",
841 priorities.size(), ") != children size (",
842 children.size(), ")")
843 .c_str()));
844 }
845 }
846 if (error_list.empty()) {
847 return MakeRefCounted<PriorityLbConfig>(std::move(children),
848 std::move(priorities));
849 } else {
850 *error = GRPC_ERROR_CREATE_FROM_VECTOR(
851 "priority_experimental LB policy config", &error_list);
852 return nullptr;
853 }
854 }
855 };
856
857 } // namespace
858
859 } // namespace grpc_core
860
861 //
862 // Plugin registration
863 //
864
grpc_lb_policy_priority_init()865 void grpc_lb_policy_priority_init() {
866 grpc_core::LoadBalancingPolicyRegistry::Builder::
867 RegisterLoadBalancingPolicyFactory(
868 absl::make_unique<grpc_core::PriorityLbFactory>());
869 }
870
// Plugin shutdown hook. Intentionally empty: this file has nothing to
// release here.
void grpc_lb_policy_priority_shutdown() {
}
872