1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include "absl/strings/str_cat.h"
23 
24 #include <grpc/grpc.h>
25 
26 #include "src/core/ext/filters/client_channel/lb_policy.h"
27 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
29 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
31 #include "src/core/lib/channel/channel_args.h"
32 #include "src/core/lib/gprpp/orphanable.h"
33 #include "src/core/lib/gprpp/ref_counted_ptr.h"
34 #include "src/core/lib/iomgr/timer.h"
35 #include "src/core/lib/iomgr/work_serializer.h"
36 
37 namespace grpc_core {
38 
// Trace flag that gates the "[priority_lb ...]" log statements in this file.
// It is registered under the name "priority_lb".
// NOTE(review): the `false` argument is presumably the flag's default
// (tracing off) -- confirm against the TraceFlag constructor.
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
40 
41 namespace {
42 
// Name reported by this policy, its config and its factory via name().
constexpr char kPriority[] = "priority_experimental";

// Retention period, in milliseconds, for a child that is no longer in use
// (either it was dropped from the config or a higher-priority child took
// over).
constexpr int kChildRetentionIntervalMs =
    /*minutes=*/15 * /*seconds per minute=*/60 * /*ms per second=*/1000;

// Default time, in milliseconds, that a newly created child is given to
// become connected before the next priority is attempted.  Overridable via
// channel arg.
constexpr int kDefaultChildFailoverTimeoutMs =
    /*seconds=*/10 * /*ms per second=*/1000;
53 
54 // Config for priority LB policy.
55 class PriorityLbConfig : public LoadBalancingPolicy::Config {
56  public:
PriorityLbConfig(std::map<std::string,RefCountedPtr<LoadBalancingPolicy::Config>> children,std::vector<std::string> priorities)57   PriorityLbConfig(
58       std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>
59           children,
60       std::vector<std::string> priorities)
61       : children_(std::move(children)), priorities_(std::move(priorities)) {}
62 
name() const63   const char* name() const override { return kPriority; }
64 
65   const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>&
children() const66   children() const {
67     return children_;
68   }
priorities() const69   const std::vector<std::string>& priorities() const { return priorities_; }
70 
71  private:
72   const std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>>
73       children_;
74   const std::vector<std::string> priorities_;
75 };
76 
77 // priority LB policy.
78 class PriorityLb : public LoadBalancingPolicy {
79  public:
80   explicit PriorityLb(Args args);
81 
name() const82   const char* name() const override { return kPriority; }
83 
84   void UpdateLocked(UpdateArgs args) override;
85   void ExitIdleLocked() override;
86   void ResetBackoffLocked() override;
87 
88  private:
89   // Each ChildPriority holds a ref to the PriorityLb.
90   class ChildPriority : public InternallyRefCounted<ChildPriority> {
91    public:
92     ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);
93 
~ChildPriority()94     ~ChildPriority() {
95       priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
96     }
97 
name() const98     const std::string& name() const { return name_; }
99 
100     void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config);
101     void ExitIdleLocked();
102     void ResetBackoffLocked();
103     void DeactivateLocked();
104     void MaybeReactivateLocked();
105     void MaybeCancelFailoverTimerLocked();
106 
107     void Orphan() override;
108 
GetPicker()109     std::unique_ptr<SubchannelPicker> GetPicker() {
110       return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
111     }
112 
connectivity_state() const113     grpc_connectivity_state connectivity_state() const {
114       return connectivity_state_;
115     }
failover_timer_callback_pending() const116     bool failover_timer_callback_pending() const {
117       return failover_timer_callback_pending_;
118     }
119 
120    private:
121     // A simple wrapper for ref-counting a picker from the child policy.
122     class RefCountedPicker : public RefCounted<RefCountedPicker> {
123      public:
RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)124       explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
125           : picker_(std::move(picker)) {}
Pick(PickArgs args)126       PickResult Pick(PickArgs args) { return picker_->Pick(args); }
127 
128      private:
129       std::unique_ptr<SubchannelPicker> picker_;
130     };
131 
132     // A non-ref-counted wrapper for RefCountedPicker.
133     class RefCountedPickerWrapper : public SubchannelPicker {
134      public:
RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)135       explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
136           : picker_(std::move(picker)) {}
Pick(PickArgs args)137       PickResult Pick(PickArgs args) override { return picker_->Pick(args); }
138 
139      private:
140       RefCountedPtr<RefCountedPicker> picker_;
141     };
142 
143     class Helper : public ChannelControlHelper {
144      public:
Helper(RefCountedPtr<ChildPriority> priority)145       explicit Helper(RefCountedPtr<ChildPriority> priority)
146           : priority_(std::move(priority)) {}
147 
~Helper()148       ~Helper() { priority_.reset(DEBUG_LOCATION, "Helper"); }
149 
150       RefCountedPtr<SubchannelInterface> CreateSubchannel(
151           const grpc_channel_args& args) override;
152       void UpdateState(grpc_connectivity_state state,
153                        std::unique_ptr<SubchannelPicker> picker) override;
154       void RequestReresolution() override;
155       void AddTraceEvent(TraceSeverity severity,
156                          absl::string_view message) override;
157 
158      private:
159       RefCountedPtr<ChildPriority> priority_;
160     };
161 
162     // Methods for dealing with the child policy.
163     OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
164         const grpc_channel_args* args);
165 
166     void OnConnectivityStateUpdateLocked(
167         grpc_connectivity_state state,
168         std::unique_ptr<SubchannelPicker> picker);
169 
170     void StartFailoverTimerLocked();
171 
172     static void OnFailoverTimer(void* arg, grpc_error* error);
173     void OnFailoverTimerLocked(grpc_error* error);
174     static void OnDeactivationTimer(void* arg, grpc_error* error);
175     void OnDeactivationTimerLocked(grpc_error* error);
176 
177     RefCountedPtr<PriorityLb> priority_policy_;
178     const std::string name_;
179 
180     OrphanablePtr<LoadBalancingPolicy> child_policy_;
181 
182     grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
183     RefCountedPtr<RefCountedPicker> picker_wrapper_;
184 
185     // States for delayed removal.
186     grpc_timer deactivation_timer_;
187     grpc_closure on_deactivation_timer_;
188     bool deactivation_timer_callback_pending_ = false;
189 
190     // States of failover.
191     grpc_timer failover_timer_;
192     grpc_closure on_failover_timer_;
193     bool failover_timer_callback_pending_ = false;
194   };
195 
196   ~PriorityLb();
197 
198   void ShutdownLocked() override;
199 
200   // Returns UINT32_MAX if child is not in current priority list.
201   uint32_t GetChildPriorityLocked(const std::string& child_name) const;
202 
203   void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
204   void DeleteChild(ChildPriority* child);
205 
206   void TryNextPriorityLocked(bool report_connecting);
207   void SelectPriorityLocked(uint32_t priority);
208 
209   const int child_failover_timeout_ms_;
210 
211   // Current channel args and config from the resolver.
212   const grpc_channel_args* args_ = nullptr;
213   RefCountedPtr<PriorityLbConfig> config_;
214   HierarchicalAddressMap addresses_;
215 
216   // Internal state.
217   bool shutting_down_ = false;
218 
219   std::map<std::string, OrphanablePtr<ChildPriority>> children_;
220   // The priority that is being used.
221   uint32_t current_priority_ = UINT32_MAX;
222   // Points to the current child from before the most recent update.
223   // We will continue to use this child until we decide which of the new
224   // children to use.
225   ChildPriority* current_child_from_before_update_ = nullptr;
226 };
227 
228 //
229 // PriorityLb
230 //
231 
PriorityLb(Args args)232 PriorityLb::PriorityLb(Args args)
233     : LoadBalancingPolicy(std::move(args)),
234       child_failover_timeout_ms_(grpc_channel_args_find_integer(
235           args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
236           {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
237   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
238     gpr_log(GPR_INFO, "[priority_lb %p] created", this);
239   }
240 }
241 
~PriorityLb()242 PriorityLb::~PriorityLb() {
243   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
244     gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
245   }
246   grpc_channel_args_destroy(args_);
247 }
248 
ShutdownLocked()249 void PriorityLb::ShutdownLocked() {
250   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
251     gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
252   }
253   shutting_down_ = true;
254   children_.clear();
255 }
256 
ExitIdleLocked()257 void PriorityLb::ExitIdleLocked() {
258   if (current_priority_ != UINT32_MAX) {
259     const std::string& child_name = config_->priorities()[current_priority_];
260     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
261       gpr_log(GPR_INFO,
262               "[priority_lb %p] exiting IDLE for current priority %d child %s",
263               this, current_priority_, child_name.c_str());
264     }
265     children_[child_name]->ExitIdleLocked();
266   }
267 }
268 
ResetBackoffLocked()269 void PriorityLb::ResetBackoffLocked() {
270   for (const auto& p : children_) p.second->ResetBackoffLocked();
271 }
272 
UpdateLocked(UpdateArgs args)273 void PriorityLb::UpdateLocked(UpdateArgs args) {
274   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
275     gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
276   }
277   // Save current child.
278   if (current_priority_ != UINT32_MAX) {
279     const std::string& child_name = config_->priorities()[current_priority_];
280     current_child_from_before_update_ = children_[child_name].get();
281     // Unset current_priority_, since it was an index into the old
282     // config's priority list and may no longer be valid.  It will be
283     // reset later by TryNextPriorityLocked(), but we unset it here in
284     // case updating any of our children triggers a state update.
285     current_priority_ = UINT32_MAX;
286   }
287   // Update config.
288   config_ = std::move(args.config);
289   // Update args.
290   grpc_channel_args_destroy(args_);
291   args_ = args.args;
292   args.args = nullptr;
293   // Update addresses.
294   addresses_ = MakeHierarchicalAddressMap(args.addresses);
295   // Check all existing children against the new config.
296   for (const auto& p : children_) {
297     const std::string& child_name = p.first;
298     auto& child = p.second;
299     auto config_it = config_->children().find(child_name);
300     if (config_it == config_->children().end()) {
301       // Existing child not found in new config.  Deactivate it.
302       child->DeactivateLocked();
303     } else {
304       // Existing child found in new config.  Update it.
305       child->UpdateLocked(config_it->second);
306     }
307   }
308   // Try to get connected.
309   TryNextPriorityLocked(/*report_connecting=*/children_.empty());
310 }
311 
GetChildPriorityLocked(const std::string & child_name) const312 uint32_t PriorityLb::GetChildPriorityLocked(
313     const std::string& child_name) const {
314   for (uint32_t priority = 0; priority < config_->priorities().size();
315        ++priority) {
316     if (config_->priorities()[priority] == child_name) return priority;
317   }
318   return UINT32_MAX;
319 }
320 
HandleChildConnectivityStateChangeLocked(ChildPriority * child)321 void PriorityLb::HandleChildConnectivityStateChangeLocked(
322     ChildPriority* child) {
323   // Special case for the child that was the current child before the
324   // most recent update.
325   if (child == current_child_from_before_update_) {
326     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
327       gpr_log(GPR_INFO,
328               "[priority_lb %p] state update for current child from before "
329               "config update",
330               this);
331     }
332     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
333         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
334       // If it's still READY or IDLE, we stick with this child, so pass
335       // the new picker up to our parent.
336       channel_control_helper()->UpdateState(child->connectivity_state(),
337                                             child->GetPicker());
338     } else {
339       // If it's no longer READY or IDLE, we should stop using it.
340       // We already started trying other priorities as a result of the
341       // update, but calling TryNextPriorityLocked() ensures that we will
342       // properly select between CONNECTING and TRANSIENT_FAILURE as the
343       // new state to report to our parent.
344       current_child_from_before_update_ = nullptr;
345       TryNextPriorityLocked(/*report_connecting=*/true);
346     }
347     return;
348   }
349   // Otherwise, find the child's priority.
350   uint32_t child_priority = GetChildPriorityLocked(child->name());
351   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
352     gpr_log(GPR_INFO, "[priority_lb %p] state update for priority %d, child %s",
353             this, child_priority, child->name().c_str());
354   }
355   // Ignore priorities not in the current config.
356   if (child_priority == UINT32_MAX) return;
357   // Ignore lower-than-current priorities.
358   if (child_priority > current_priority_) return;
359   // If a child reports TRANSIENT_FAILURE, start trying the next priority.
360   // Note that even if this is for a higher-than-current priority, we
361   // may still need to create some children between this priority and
362   // the current one (e.g., if we got an update that inserted new
363   // priorities ahead of the current one).
364   if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
365     TryNextPriorityLocked(
366         /*report_connecting=*/child_priority == current_priority_);
367     return;
368   }
369   // The update is for a higher-than-current priority (or for any
370   // priority if we don't have any current priority).
371   if (child_priority < current_priority_) {
372     // If the child reports READY or IDLE, switch to that priority.
373     // Otherwise, ignore the update.
374     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
375         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
376       SelectPriorityLocked(child_priority);
377     }
378     return;
379   }
380   // The current priority has returned a new picker, so pass it up to
381   // our parent.
382   channel_control_helper()->UpdateState(child->connectivity_state(),
383                                         child->GetPicker());
384 }
385 
DeleteChild(ChildPriority * child)386 void PriorityLb::DeleteChild(ChildPriority* child) {
387   // If this was the current child from before the most recent update,
388   // stop using it.  We already started trying other priorities as a
389   // result of the update, but calling TryNextPriorityLocked() ensures that
390   // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
391   // new state to report to our parent.
392   if (current_child_from_before_update_ == child) {
393     current_child_from_before_update_ = nullptr;
394     TryNextPriorityLocked(/*report_connecting=*/true);
395   }
396   children_.erase(child->name());
397 }
398 
TryNextPriorityLocked(bool report_connecting)399 void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
400   for (uint32_t priority = 0; priority < config_->priorities().size();
401        ++priority) {
402     // If the child for the priority does not exist yet, create it.
403     const std::string& child_name = config_->priorities()[priority];
404     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
405       gpr_log(GPR_INFO, "[priority_lb %p] trying priority %d, child %s", this,
406               priority, child_name.c_str());
407     }
408     auto& child = children_[child_name];
409     if (child == nullptr) {
410       if (report_connecting) {
411         channel_control_helper()->UpdateState(
412             GRPC_CHANNEL_CONNECTING,
413             absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
414       }
415       child = MakeOrphanable<ChildPriority>(
416           Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
417       child->UpdateLocked(config_->children().find(child_name)->second);
418       return;
419     }
420     // The child already exists.
421     child->MaybeReactivateLocked();
422     // If the child is in state READY or IDLE, switch to it.
423     if (child->connectivity_state() == GRPC_CHANNEL_READY ||
424         child->connectivity_state() == GRPC_CHANNEL_IDLE) {
425       SelectPriorityLocked(priority);
426       return;
427     }
428     // Child is not READY or IDLE.
429     // If its failover timer is still pending, give it time to fire.
430     if (child->failover_timer_callback_pending()) {
431       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
432         gpr_log(GPR_INFO,
433                 "[priority_lb %p] priority %d, child %s: child still "
434                 "attempting to connect, will wait",
435                 this, priority, child_name.c_str());
436       }
437       if (report_connecting) {
438         channel_control_helper()->UpdateState(
439             GRPC_CHANNEL_CONNECTING,
440             absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
441       }
442       return;
443     }
444     // Child has been failing for a while.  Move on to the next priority.
445   }
446   // If there are no more priorities to try, report TRANSIENT_FAILURE.
447   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
448     gpr_log(GPR_INFO,
449             "[priority_lb %p] no priority reachable, putting channel in "
450             "TRANSIENT_FAILURE",
451             this);
452   }
453   current_priority_ = UINT32_MAX;
454   current_child_from_before_update_ = nullptr;
455   grpc_error* error = grpc_error_set_int(
456       GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"),
457       GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
458   channel_control_helper()->UpdateState(
459       GRPC_CHANNEL_TRANSIENT_FAILURE,
460       absl::make_unique<TransientFailurePicker>(error));
461 }
462 
SelectPriorityLocked(uint32_t priority)463 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
464   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
465     gpr_log(GPR_INFO, "[priority_lb %p] selected priority %d, child %s", this,
466             priority, config_->priorities()[priority].c_str());
467   }
468   current_priority_ = priority;
469   current_child_from_before_update_ = nullptr;
470   // Deactivate lower priorities.
471   for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
472     const std::string& child_name = config_->priorities()[p];
473     auto it = children_.find(child_name);
474     if (it != children_.end()) it->second->DeactivateLocked();
475   }
476   // Update picker.
477   auto& child = children_[config_->priorities()[priority]];
478   channel_control_helper()->UpdateState(child->connectivity_state(),
479                                         child->GetPicker());
480 }
481 
482 //
483 // PriorityLb::ChildPriority
484 //
485 
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)486 PriorityLb::ChildPriority::ChildPriority(
487     RefCountedPtr<PriorityLb> priority_policy, std::string name)
488     : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
489   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
490     gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
491             priority_policy_.get(), name_.c_str(), this);
492   }
493   GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
494                     grpc_schedule_on_exec_ctx);
495   GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
496                     grpc_schedule_on_exec_ctx);
497   // Start the failover timer.
498   StartFailoverTimerLocked();
499 }
500 
Orphan()501 void PriorityLb::ChildPriority::Orphan() {
502   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
503     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
504             priority_policy_.get(), name_.c_str(), this);
505   }
506   MaybeCancelFailoverTimerLocked();
507   if (deactivation_timer_callback_pending_) {
508     grpc_timer_cancel(&deactivation_timer_);
509   }
510   // Remove the child policy's interested_parties pollset_set from the
511   // xDS policy.
512   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
513                                    priority_policy_->interested_parties());
514   child_policy_.reset();
515   // Drop our ref to the child's picker, in case it's holding a ref to
516   // the child.
517   picker_wrapper_.reset();
518   if (deactivation_timer_callback_pending_) {
519     grpc_timer_cancel(&deactivation_timer_);
520   }
521   Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
522 }
523 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config)524 void PriorityLb::ChildPriority::UpdateLocked(
525     RefCountedPtr<LoadBalancingPolicy::Config> config) {
526   if (priority_policy_->shutting_down_) return;
527   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
528     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
529             priority_policy_.get(), name_.c_str(), this);
530   }
531   // Create policy if needed.
532   if (child_policy_ == nullptr) {
533     child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
534   }
535   // Construct update args.
536   UpdateArgs update_args;
537   update_args.config = std::move(config);
538   update_args.addresses = priority_policy_->addresses_[name_];
539   update_args.args = grpc_channel_args_copy(priority_policy_->args_);
540   // Update the policy.
541   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
542     gpr_log(GPR_INFO,
543             "[priority_lb %p] child %s (%p): updating child policy handler %p",
544             priority_policy_.get(), name_.c_str(), this, child_policy_.get());
545   }
546   child_policy_->UpdateLocked(std::move(update_args));
547 }
548 
549 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)550 PriorityLb::ChildPriority::CreateChildPolicyLocked(
551     const grpc_channel_args* args) {
552   LoadBalancingPolicy::Args lb_policy_args;
553   lb_policy_args.work_serializer = priority_policy_->work_serializer();
554   lb_policy_args.args = args;
555   lb_policy_args.channel_control_helper =
556       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
557   OrphanablePtr<LoadBalancingPolicy> lb_policy =
558       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
559                                          &grpc_lb_priority_trace);
560   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
561     gpr_log(GPR_INFO,
562             "[priority_lb %p] child %s (%p): created new child policy "
563             "handler %p",
564             priority_policy_.get(), name_.c_str(), this, lb_policy.get());
565   }
566   // Add the parent's interested_parties pollset_set to that of the newly
567   // created child policy. This will make the child policy progress upon
568   // activity on the parent LB, which in turn is tied to the application's call.
569   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
570                                    priority_policy_->interested_parties());
571   return lb_policy;
572 }
573 
ExitIdleLocked()574 void PriorityLb::ChildPriority::ExitIdleLocked() {
575   if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
576       !failover_timer_callback_pending_) {
577     StartFailoverTimerLocked();
578   }
579   child_policy_->ExitIdleLocked();
580 }
581 
ResetBackoffLocked()582 void PriorityLb::ChildPriority::ResetBackoffLocked() {
583   child_policy_->ResetBackoffLocked();
584 }
585 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)586 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
587     grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
588   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
589     gpr_log(GPR_INFO,
590             "[priority_lb %p] child %s (%p): state update: %s, picker %p",
591             priority_policy_.get(), name_.c_str(), this,
592             ConnectivityStateName(state), picker.get());
593   }
594   // Store the state and picker.
595   connectivity_state_ = state;
596   picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
597   // If READY or TRANSIENT_FAILURE, cancel failover timer.
598   if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
599     MaybeCancelFailoverTimerLocked();
600   }
601   // Notify the parent policy.
602   priority_policy_->HandleChildConnectivityStateChangeLocked(this);
603 }
604 
StartFailoverTimerLocked()605 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
606   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
607     gpr_log(GPR_INFO,
608             "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
609             priority_policy_.get(), name_.c_str(), this,
610             priority_policy_->child_failover_timeout_ms_);
611   }
612   Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
613   grpc_timer_init(
614       &failover_timer_,
615       ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
616       &on_failover_timer_);
617   failover_timer_callback_pending_ = true;
618 }
619 
MaybeCancelFailoverTimerLocked()620 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
621   if (failover_timer_callback_pending_) {
622     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
623       gpr_log(GPR_INFO,
624               "[priority_lb %p] child %s (%p): cancelling failover timer",
625               priority_policy_.get(), name_.c_str(), this);
626     }
627     grpc_timer_cancel(&failover_timer_);
628     failover_timer_callback_pending_ = false;
629   }
630 }
631 
OnFailoverTimer(void * arg,grpc_error * error)632 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) {
633   ChildPriority* self = static_cast<ChildPriority*>(arg);
634   GRPC_ERROR_REF(error);  // ref owned by lambda
635   self->priority_policy_->work_serializer()->Run(
636       [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
637 }
638 
OnFailoverTimerLocked(grpc_error * error)639 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) {
640   if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
641       !priority_policy_->shutting_down_) {
642     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
643       gpr_log(GPR_INFO,
644               "[priority_lb %p] child %s (%p): failover timer fired, "
645               "reporting TRANSIENT_FAILURE",
646               priority_policy_.get(), name_.c_str(), this);
647     }
648     failover_timer_callback_pending_ = false;
649     OnConnectivityStateUpdateLocked(GRPC_CHANNEL_TRANSIENT_FAILURE, nullptr);
650   }
651   Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
652   GRPC_ERROR_UNREF(error);
653 }
654 
DeactivateLocked()655 void PriorityLb::ChildPriority::DeactivateLocked() {
656   // If already deactivated, don't do it again.
657   if (deactivation_timer_callback_pending_) return;
658   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
659     gpr_log(GPR_INFO,
660             "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
661             "ms.",
662             priority_policy_.get(), name_.c_str(), this,
663             kChildRetentionIntervalMs);
664   }
665   MaybeCancelFailoverTimerLocked();
666   // Start a timer to delete the child.
667   Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
668   grpc_timer_init(&deactivation_timer_,
669                   ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
670                   &on_deactivation_timer_);
671   deactivation_timer_callback_pending_ = true;
672 }
673 
MaybeReactivateLocked()674 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
675   if (deactivation_timer_callback_pending_) {
676     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
677       gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
678               priority_policy_.get(), name_.c_str(), this);
679     }
680     deactivation_timer_callback_pending_ = false;
681     grpc_timer_cancel(&deactivation_timer_);
682   }
683 }
684 
OnDeactivationTimer(void * arg,grpc_error * error)685 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
686                                                     grpc_error* error) {
687   ChildPriority* self = static_cast<ChildPriority*>(arg);
688   GRPC_ERROR_REF(error);  // ref owned by lambda
689   self->priority_policy_->work_serializer()->Run(
690       [self, error]() { self->OnDeactivationTimerLocked(error); },
691       DEBUG_LOCATION);
692 }
693 
OnDeactivationTimerLocked(grpc_error * error)694 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) {
695   if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
696       !priority_policy_->shutting_down_) {
697     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
698       gpr_log(GPR_INFO,
699               "[priority_lb %p] child %s (%p): deactivation timer fired, "
700               "deleting child",
701               priority_policy_.get(), name_.c_str(), this);
702     }
703     deactivation_timer_callback_pending_ = false;
704     priority_policy_->DeleteChild(this);
705   }
706   Unref(DEBUG_LOCATION, "ChildPriority+timer");
707   GRPC_ERROR_UNREF(error);
708 }
709 
710 //
711 // PriorityLb::ChildPriority::Helper
712 //
713 
RequestReresolution()714 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
715   if (priority_->priority_policy_->shutting_down_) return;
716   priority_->priority_policy_->channel_control_helper()->RequestReresolution();
717 }
718 
719 RefCountedPtr<SubchannelInterface>
CreateSubchannel(const grpc_channel_args & args)720 PriorityLb::ChildPriority::Helper::CreateSubchannel(
721     const grpc_channel_args& args) {
722   if (priority_->priority_policy_->shutting_down_) return nullptr;
723   return priority_->priority_policy_->channel_control_helper()
724       ->CreateSubchannel(args);
725 }
726 
UpdateState(grpc_connectivity_state state,std::unique_ptr<SubchannelPicker> picker)727 void PriorityLb::ChildPriority::Helper::UpdateState(
728     grpc_connectivity_state state, std::unique_ptr<SubchannelPicker> picker) {
729   if (priority_->priority_policy_->shutting_down_) return;
730   // Notify the priority.
731   priority_->OnConnectivityStateUpdateLocked(state, std::move(picker));
732 }
733 
AddTraceEvent(TraceSeverity severity,absl::string_view message)734 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
735     TraceSeverity severity, absl::string_view message) {
736   if (priority_->priority_policy_->shutting_down_) return;
737   priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
738                                                                        message);
739 }
740 
741 //
742 // factory
743 //
744 
745 class PriorityLbFactory : public LoadBalancingPolicyFactory {
746  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const747   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
748       LoadBalancingPolicy::Args args) const override {
749     return MakeOrphanable<PriorityLb>(std::move(args));
750   }
751 
name() const752   const char* name() const override { return kPriority; }
753 
ParseLoadBalancingConfig(const Json & json,grpc_error ** error) const754   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
755       const Json& json, grpc_error** error) const override {
756     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
757     if (json.type() == Json::Type::JSON_NULL) {
758       // priority was mentioned as a policy in the deprecated
759       // loadBalancingPolicy field or in the client API.
760       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
761           "field:loadBalancingPolicy error:priority policy requires "
762           "configuration. Please use loadBalancingConfig field of service "
763           "config instead.");
764       return nullptr;
765     }
766     std::vector<grpc_error*> error_list;
767     // Children.
768     std::map<std::string, RefCountedPtr<LoadBalancingPolicy::Config>> children;
769     auto it = json.object_value().find("children");
770     if (it == json.object_value().end()) {
771       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
772           "field:children error:required field missing"));
773     } else if (it->second.type() != Json::Type::OBJECT) {
774       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
775           "field:children error:type should be object"));
776     } else {
777       const Json::Object& object = it->second.object_value();
778       for (const auto& p : object) {
779         const std::string& child_name = p.first;
780         const Json& element = p.second;
781         if (element.type() != Json::Type::OBJECT) {
782           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
783               absl::StrCat("field:children key:", child_name,
784                            " error:should be type object")
785                   .c_str()));
786         } else {
787           auto it2 = element.object_value().find("config");
788           if (it2 == element.object_value().end()) {
789             error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
790                 absl::StrCat("field:children key:", child_name,
791                              " error:missing 'config' field")
792                     .c_str()));
793           } else {
794             grpc_error* parse_error = GRPC_ERROR_NONE;
795             auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
796                 it2->second, &parse_error);
797             if (config == nullptr) {
798               GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
799               error_list.push_back(
800                   GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
801                       absl::StrCat("field:children key:", child_name).c_str(),
802                       &parse_error, 1));
803               GRPC_ERROR_UNREF(parse_error);
804             }
805             children[child_name] = std::move(config);
806           }
807         }
808       }
809     }
810     // Priorities.
811     std::vector<std::string> priorities;
812     it = json.object_value().find("priorities");
813     if (it == json.object_value().end()) {
814       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
815           "field:priorities error:required field missing"));
816     } else if (it->second.type() != Json::Type::ARRAY) {
817       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
818           "field:priorities error:type should be array"));
819     } else {
820       const Json::Array& array = it->second.array_value();
821       for (size_t i = 0; i < array.size(); ++i) {
822         const Json& element = array[i];
823         if (element.type() != Json::Type::STRING) {
824           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
825               absl::StrCat("field:priorities element:", i,
826                            " error:should be type string")
827                   .c_str()));
828         } else if (children.find(element.string_value()) == children.end()) {
829           error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
830               absl::StrCat("field:priorities element:", i,
831                            " error:unknown child '", element.string_value(),
832                            "'")
833                   .c_str()));
834         } else {
835           priorities.emplace_back(element.string_value());
836         }
837       }
838       if (priorities.size() != children.size()) {
839         error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING(
840             absl::StrCat("field:priorities error:priorities size (",
841                          priorities.size(), ") != children size (",
842                          children.size(), ")")
843                 .c_str()));
844       }
845     }
846     if (error_list.empty()) {
847       return MakeRefCounted<PriorityLbConfig>(std::move(children),
848                                               std::move(priorities));
849     } else {
850       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
851           "priority_experimental LB policy config", &error_list);
852       return nullptr;
853     }
854   }
855 };
856 
857 }  // namespace
858 
859 }  // namespace grpc_core
860 
861 //
862 // Plugin registration
863 //
864 
grpc_lb_policy_priority_init()865 void grpc_lb_policy_priority_init() {
866   grpc_core::LoadBalancingPolicyRegistry::Builder::
867       RegisterLoadBalancingPolicyFactory(
868           absl::make_unique<grpc_core::PriorityLbFactory>());
869 }
870 
// Plugin shutdown hook for the priority LB policy.  Intentionally a no-op:
// the body is empty, so nothing is torn down here.
void grpc_lb_policy_priority_shutdown() {}
872