1 //
2 // Copyright 2018 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include <grpc/support/port_platform.h>
18 
19 #include <inttypes.h>
20 #include <limits.h>
21 
22 #include "absl/strings/str_cat.h"
23 #include "absl/strings/str_format.h"
24 
25 #include <grpc/grpc.h>
26 
27 #include "src/core/ext/filters/client_channel/lb_policy.h"
28 #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h"
29 #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h"
30 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
31 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
32 #include "src/core/lib/channel/channel_args.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/iomgr/timer.h"
36 #include "src/core/lib/iomgr/work_serializer.h"
37 #include "src/core/lib/transport/error_utils.h"
38 
39 namespace grpc_core {
40 
// Trace flag named "priority_lb".  The gpr_log() tracing in this file is
// guarded by GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace).  The `false`
// argument presumably means the flag starts out disabled -- confirm against
// TraceFlag's constructor.
TraceFlag grpc_lb_priority_trace(false, "priority_lb");
42 
43 namespace {
44 
// Name reported by both PriorityLbConfig::name() and PriorityLb::name().
constexpr char kPriority[] = "priority_experimental";

// Retention period for a child that is no longer in use -- either it was
// dropped from the config, or a higher-priority child was selected -- before
// it is deleted.  Fifteen minutes, in milliseconds.
constexpr int kChildRetentionIntervalMs = 1000 * 60 * 15;

// Default time (ten seconds, in milliseconds) that a newly created child is
// given to connect before the next priority is attempted.  Can be
// overridden via a channel arg.
constexpr int kDefaultChildFailoverTimeoutMs = 10 * 1000;
55 
// Config for priority LB policy.
//
// Holds the per-child configs, keyed by child name, together with the list
// of child names in priority order.  PriorityLb treats index 0 of
// priorities() as the highest priority and tries children in that order.
class PriorityLbConfig : public LoadBalancingPolicy::Config {
 public:
  // Settings for a single child.
  struct PriorityLbChild {
    // Config handed to the child's LB policy.
    RefCountedPtr<LoadBalancingPolicy::Config> config;
    // Copied onto the ChildPriority by PriorityLb; presumably consulted by
    // the child's helper when the child requests re-resolution -- confirm.
    bool ignore_reresolution_requests = false;
  };

  PriorityLbConfig(std::map<std::string, PriorityLbChild> children,
                   std::vector<std::string> priorities)
      : children_(std::move(children)), priorities_(std::move(priorities)) {}

  const char* name() const override { return kPriority; }

  // Child configs, keyed by child name.
  const std::map<std::string, PriorityLbChild>& children() const {
    return children_;
  }
  // Child names in priority order (index 0 first).
  const std::vector<std::string>& priorities() const { return priorities_; }

 private:
  const std::map<std::string, PriorityLbChild> children_;
  const std::vector<std::string> priorities_;
};
79 
// priority LB policy.
//
// Delegates to one of several named child policies.  Children are tried in
// the order given by PriorityLbConfig::priorities() (index 0 first).  A
// child that is READY or IDLE is selected; a child that is still trying to
// connect is given until its failover timer fires before the next priority
// is tried.  Once a priority is selected, lower-priority children are
// deactivated: they are retained for kChildRetentionIntervalMs and then
// deleted unless reactivated in the meantime.
class PriorityLb : public LoadBalancingPolicy {
 public:
  explicit PriorityLb(Args args);

  const char* name() const override { return kPriority; }

  void UpdateLocked(UpdateArgs args) override;
  void ExitIdleLocked() override;
  void ResetBackoffLocked() override;

 private:
  // Wraps one child policy together with the state, status and picker it
  // last reported, plus its failover and deactivation timers.
  // Each ChildPriority holds a ref to the PriorityLb.
  class ChildPriority : public InternallyRefCounted<ChildPriority> {
   public:
    ChildPriority(RefCountedPtr<PriorityLb> priority_policy, std::string name);

    ~ChildPriority() override {
      priority_policy_.reset(DEBUG_LOCATION, "ChildPriority");
    }

    const std::string& name() const { return name_; }

    void UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,
                      bool ignore_reresolution_requests);
    void ExitIdleLocked();
    void ResetBackoffLocked();
    // Start / cancel the deactivation (delayed-removal) timer.
    void DeactivateLocked();
    void MaybeReactivateLocked();
    void MaybeCancelFailoverTimerLocked();

    void Orphan() override;

    // Returns a new picker that shares the child's most recently reported
    // picker through the ref-counted picker_wrapper_.
    std::unique_ptr<SubchannelPicker> GetPicker() {
      return absl::make_unique<RefCountedPickerWrapper>(picker_wrapper_);
    }

    // Most recent state reported for this child (by the child policy or by
    // the failover timer).
    grpc_connectivity_state connectivity_state() const {
      return connectivity_state_;
    }

    const absl::Status& connectivity_status() const {
      return connectivity_status_;
    }

    // True while the failover timer is armed and has not been cancelled or
    // handled yet.
    bool failover_timer_callback_pending() const {
      return failover_timer_callback_pending_;
    }

   private:
    // A simple wrapper for ref-counting a picker from the child policy.
    // Lets every picker returned by GetPicker() share one underlying picker.
    class RefCountedPicker : public RefCounted<RefCountedPicker> {
     public:
      explicit RefCountedPicker(std::unique_ptr<SubchannelPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) { return picker_->Pick(args); }

     private:
      std::unique_ptr<SubchannelPicker> picker_;
    };

    // A non-ref-counted wrapper for RefCountedPicker.
    class RefCountedPickerWrapper : public SubchannelPicker {
     public:
      explicit RefCountedPickerWrapper(RefCountedPtr<RefCountedPicker> picker)
          : picker_(std::move(picker)) {}
      PickResult Pick(PickArgs args) override { return picker_->Pick(args); }

     private:
      RefCountedPtr<RefCountedPicker> picker_;
    };

    // ChannelControlHelper given to the child policy; holds a ref to the
    // owning ChildPriority.
    class Helper : public ChannelControlHelper {
     public:
      explicit Helper(RefCountedPtr<ChildPriority> priority)
          : priority_(std::move(priority)) {}

      ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); }

      RefCountedPtr<SubchannelInterface> CreateSubchannel(
          ServerAddress address, const grpc_channel_args& args) override;
      void UpdateState(grpc_connectivity_state state,
                       const absl::Status& status,
                       std::unique_ptr<SubchannelPicker> picker) override;
      void RequestReresolution() override;
      absl::string_view GetAuthority() override;
      void AddTraceEvent(TraceSeverity severity,
                         absl::string_view message) override;

     private:
      RefCountedPtr<ChildPriority> priority_;
    };

    // Methods for dealing with the child policy.
    OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
        const grpc_channel_args* args);

    void OnConnectivityStateUpdateLocked(
        grpc_connectivity_state state, const absl::Status& status,
        std::unique_ptr<SubchannelPicker> picker);

    void StartFailoverTimerLocked();

    // Timer callbacks: the static versions hop into the WorkSerializer and
    // call the corresponding *Locked() method.
    static void OnFailoverTimer(void* arg, grpc_error_handle error);
    void OnFailoverTimerLocked(grpc_error_handle error);
    static void OnDeactivationTimer(void* arg, grpc_error_handle error);
    void OnDeactivationTimerLocked(grpc_error_handle error);

    RefCountedPtr<PriorityLb> priority_policy_;
    const std::string name_;
    bool ignore_reresolution_requests_ = false;

    // Created lazily by UpdateLocked().
    OrphanablePtr<LoadBalancingPolicy> child_policy_;

    grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING;
    absl::Status connectivity_status_;
    RefCountedPtr<RefCountedPicker> picker_wrapper_;

    // States for delayed removal.
    grpc_timer deactivation_timer_;
    grpc_closure on_deactivation_timer_;
    bool deactivation_timer_callback_pending_ = false;

    // States of failover.
    grpc_timer failover_timer_;
    grpc_closure on_failover_timer_;
    bool failover_timer_callback_pending_ = false;
  };

  ~PriorityLb() override;

  void ShutdownLocked() override;

  // Returns UINT32_MAX if child is not in current priority list.
  uint32_t GetChildPriorityLocked(const std::string& child_name) const;

  void HandleChildConnectivityStateChangeLocked(ChildPriority* child);
  void DeleteChild(ChildPriority* child);

  void TryNextPriorityLocked(bool report_connecting);
  void SelectPriorityLocked(uint32_t priority);

  // Read from GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS in the constructor.
  const int child_failover_timeout_ms_;

  // Current channel args and config from the resolver.
  // args_ is owned: replaced in UpdateLocked() and destroyed in ~PriorityLb().
  const grpc_channel_args* args_ = nullptr;
  RefCountedPtr<PriorityLbConfig> config_;
  HierarchicalAddressMap addresses_;

  // Internal state.
  // Set by ShutdownLocked(); callbacks check it and become no-ops.
  bool shutting_down_ = false;

  // All children, including deactivated ones still awaiting deletion.
  std::map<std::string, OrphanablePtr<ChildPriority>> children_;
  // The priority that is being used.
  // Index into config_->priorities(), or UINT32_MAX if none is selected.
  uint32_t current_priority_ = UINT32_MAX;
  // Points to the current child from before the most recent update.
  // We will continue to use this child until we decide which of the new
  // children to use.
  ChildPriority* current_child_from_before_update_ = nullptr;
};
240 
241 //
242 // PriorityLb
243 //
244 
// Creates the policy.  The child failover timeout comes from the
// GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS channel arg, with
// kDefaultChildFailoverTimeoutMs as the default and [0, INT_MAX] as the
// allowed range (presumably the {default, min, max} integer-options triple;
// confirm how out-of-range values are treated).
// NOTE(review): `args.args` is read after `std::move(args)` was handed to
// the base-class constructor.  This relies on the move leaving the raw
// `args` pointer member intact and on the base class not taking ownership
// of it -- confirm against LoadBalancingPolicy::Args.
PriorityLb::PriorityLb(Args args)
    : LoadBalancingPolicy(std::move(args)),
      child_failover_timeout_ms_(grpc_channel_args_find_integer(
          args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS,
          {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO, "[priority_lb %p] created", this);
  }
}
254 
~PriorityLb()255 PriorityLb::~PriorityLb() {
256   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
257     gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this);
258   }
259   grpc_channel_args_destroy(args_);
260 }
261 
ShutdownLocked()262 void PriorityLb::ShutdownLocked() {
263   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
264     gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this);
265   }
266   shutting_down_ = true;
267   children_.clear();
268 }
269 
ExitIdleLocked()270 void PriorityLb::ExitIdleLocked() {
271   if (current_priority_ != UINT32_MAX) {
272     const std::string& child_name = config_->priorities()[current_priority_];
273     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
274       gpr_log(GPR_INFO,
275               "[priority_lb %p] exiting IDLE for current priority %d child %s",
276               this, current_priority_, child_name.c_str());
277     }
278     children_[child_name]->ExitIdleLocked();
279   }
280 }
281 
ResetBackoffLocked()282 void PriorityLb::ResetBackoffLocked() {
283   for (const auto& p : children_) p.second->ResetBackoffLocked();
284 }
285 
UpdateLocked(UpdateArgs args)286 void PriorityLb::UpdateLocked(UpdateArgs args) {
287   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
288     gpr_log(GPR_INFO, "[priority_lb %p] received update", this);
289   }
290   // Save current child.
291   if (current_priority_ != UINT32_MAX) {
292     const std::string& child_name = config_->priorities()[current_priority_];
293     current_child_from_before_update_ = children_[child_name].get();
294     // Unset current_priority_, since it was an index into the old
295     // config's priority list and may no longer be valid.  It will be
296     // reset later by TryNextPriorityLocked(), but we unset it here in
297     // case updating any of our children triggers a state update.
298     current_priority_ = UINT32_MAX;
299   }
300   // Update config.
301   config_ = std::move(args.config);
302   // Update args.
303   grpc_channel_args_destroy(args_);
304   args_ = args.args;
305   args.args = nullptr;
306   // Update addresses.
307   addresses_ = MakeHierarchicalAddressMap(args.addresses);
308   // Check all existing children against the new config.
309   for (const auto& p : children_) {
310     const std::string& child_name = p.first;
311     auto& child = p.second;
312     auto config_it = config_->children().find(child_name);
313     if (config_it == config_->children().end()) {
314       // Existing child not found in new config.  Deactivate it.
315       child->DeactivateLocked();
316     } else {
317       // Existing child found in new config.  Update it.
318       child->UpdateLocked(config_it->second.config,
319                           config_it->second.ignore_reresolution_requests);
320     }
321   }
322   // Try to get connected.
323   TryNextPriorityLocked(/*report_connecting=*/children_.empty());
324 }
325 
GetChildPriorityLocked(const std::string & child_name) const326 uint32_t PriorityLb::GetChildPriorityLocked(
327     const std::string& child_name) const {
328   for (uint32_t priority = 0; priority < config_->priorities().size();
329        ++priority) {
330     if (config_->priorities()[priority] == child_name) return priority;
331   }
332   return UINT32_MAX;
333 }
334 
// Reacts to a connectivity-state change reported by `child` (invoked from
// ChildPriority::OnConnectivityStateUpdateLocked()).  Depending on the
// child's priority relative to current_priority_, this does one of:
//  - passes the child's picker up to our parent;
//  - switches to the child;
//  - starts trying further priorities;
//  - ignores the update.
void PriorityLb::HandleChildConnectivityStateChangeLocked(
    ChildPriority* child) {
  // Special case for the child that was the current child before the
  // most recent update.
  if (child == current_child_from_before_update_) {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO,
              "[priority_lb %p] state update for current child from before "
              "config update",
              this);
    }
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      // If it's still READY or IDLE, we stick with this child, so pass
      // the new picker up to our parent.
      channel_control_helper()->UpdateState(child->connectivity_state(),
                                            child->connectivity_status(),
                                            child->GetPicker());
    } else {
      // If it's no longer READY or IDLE, we should stop using it.
      // We already started trying other priorities as a result of the
      // update, but calling TryNextPriorityLocked() ensures that we will
      // properly select between CONNECTING and TRANSIENT_FAILURE as the
      // new state to report to our parent.
      current_child_from_before_update_ = nullptr;
      TryNextPriorityLocked(/*report_connecting=*/true);
    }
    return;
  }
  // Otherwise, find the child's priority.
  uint32_t child_priority = GetChildPriorityLocked(child->name());
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] state update for priority %u, child %s, current "
            "priority %u",
            this, child_priority, child->name().c_str(), current_priority_);
  }
  // Ignore priorities not in the current config.
  if (child_priority == UINT32_MAX) return;
  // Ignore lower-than-current priorities.
  // (When no priority is selected, current_priority_ is UINT32_MAX, so no
  // in-config priority is filtered out here.)
  if (child_priority > current_priority_) return;
  // If a child reports TRANSIENT_FAILURE, start trying the next priority.
  // Note that even if this is for a higher-than-current priority, we
  // may still need to create some children between this priority and
  // the current one (e.g., if we got an update that inserted new
  // priorities ahead of the current one).
  if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) {
    TryNextPriorityLocked(
        /*report_connecting=*/child_priority == current_priority_);
    return;
  }
  // The update is for a higher-than-current priority (or for any
  // priority if we don't have any current priority).
  if (child_priority < current_priority_) {
    // If the child reports READY or IDLE, switch to that priority.
    // Otherwise, ignore the update.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SelectPriorityLocked(child_priority);
    }
    return;
  }
  // The current priority has returned a new picker, so pass it up to
  // our parent.
  channel_control_helper()->UpdateState(child->connectivity_state(),
                                        child->connectivity_status(),
                                        child->GetPicker());
}
403 
DeleteChild(ChildPriority * child)404 void PriorityLb::DeleteChild(ChildPriority* child) {
405   // If this was the current child from before the most recent update,
406   // stop using it.  We already started trying other priorities as a
407   // result of the update, but calling TryNextPriorityLocked() ensures that
408   // we will properly select between CONNECTING and TRANSIENT_FAILURE as the
409   // new state to report to our parent.
410   if (current_child_from_before_update_ == child) {
411     current_child_from_before_update_ = nullptr;
412     TryNextPriorityLocked(/*report_connecting=*/true);
413   }
414   children_.erase(child->name());
415 }
416 
// Walks the priority list starting from index 0 (the highest priority).
// For each priority:
//  - if its child does not exist yet, creates it and stops to wait for it;
//  - otherwise reactivates the child if needed, and selects it if it is
//    READY or IDLE;
//  - if the child's failover timer is still pending, stops to wait for it;
//  - otherwise moves on to the next priority.
// If every priority is exhausted, reports TRANSIENT_FAILURE to our parent.
// When `report_connecting` is true, CONNECTING (with a QueuePicker) is
// reported to our parent whenever we stop to wait for a child.
void PriorityLb::TryNextPriorityLocked(bool report_connecting) {
  current_priority_ = UINT32_MAX;
  for (uint32_t priority = 0; priority < config_->priorities().size();
       ++priority) {
    // If the child for the priority does not exist yet, create it.
    const std::string& child_name = config_->priorities()[priority];
    if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
      gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this,
              priority, child_name.c_str());
    }
    auto& child = children_[child_name];
    if (child == nullptr) {
      if (report_connecting) {
        channel_control_helper()->UpdateState(
            GRPC_CHANNEL_CONNECTING, absl::Status(),
            absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
      }
      child = MakeOrphanable<ChildPriority>(
          Ref(DEBUG_LOCATION, "ChildPriority"), child_name);
      auto child_config = config_->children().find(child_name);
      GPR_DEBUG_ASSERT(child_config != config_->children().end());
      child->UpdateLocked(child_config->second.config,
                          child_config->second.ignore_reresolution_requests);
      return;
    }
    // The child already exists.
    child->MaybeReactivateLocked();
    // If the child is in state READY or IDLE, switch to it.
    if (child->connectivity_state() == GRPC_CHANNEL_READY ||
        child->connectivity_state() == GRPC_CHANNEL_IDLE) {
      SelectPriorityLocked(priority);
      return;
    }
    // Child is not READY or IDLE.
    // If its failover timer is still pending, give it time to fire.
    if (child->failover_timer_callback_pending()) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
        gpr_log(GPR_INFO,
                "[priority_lb %p] priority %u, child %s: child still "
                "attempting to connect, will wait",
                this, priority, child_name.c_str());
      }
      if (report_connecting) {
        channel_control_helper()->UpdateState(
            GRPC_CHANNEL_CONNECTING, absl::Status(),
            absl::make_unique<QueuePicker>(Ref(DEBUG_LOCATION, "QueuePicker")));
      }
      return;
    }
    // Child has been failing for a while.  Move on to the next priority.
  }
  // If there are no more priorities to try, report TRANSIENT_FAILURE.
  if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
    gpr_log(GPR_INFO,
            "[priority_lb %p] no priority reachable, putting channel in "
            "TRANSIENT_FAILURE",
            this);
  }
  current_child_from_before_update_ = nullptr;
  absl::Status status = absl::UnavailableError("no ready priority");
  channel_control_helper()->UpdateState(
      GRPC_CHANNEL_TRANSIENT_FAILURE, status,
      absl::make_unique<TransientFailurePicker>(status));
}
481 
SelectPriorityLocked(uint32_t priority)482 void PriorityLb::SelectPriorityLocked(uint32_t priority) {
483   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
484     gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this,
485             priority, config_->priorities()[priority].c_str());
486   }
487   current_priority_ = priority;
488   current_child_from_before_update_ = nullptr;
489   // Deactivate lower priorities.
490   for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) {
491     const std::string& child_name = config_->priorities()[p];
492     auto it = children_.find(child_name);
493     if (it != children_.end()) it->second->DeactivateLocked();
494   }
495   // Update picker.
496   auto& child = children_[config_->priorities()[priority]];
497   channel_control_helper()->UpdateState(child->connectivity_state(),
498                                         child->connectivity_status(),
499                                         child->GetPicker());
500 }
501 
502 //
503 // PriorityLb::ChildPriority
504 //
505 
ChildPriority(RefCountedPtr<PriorityLb> priority_policy,std::string name)506 PriorityLb::ChildPriority::ChildPriority(
507     RefCountedPtr<PriorityLb> priority_policy, std::string name)
508     : priority_policy_(std::move(priority_policy)), name_(std::move(name)) {
509   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
510     gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)",
511             priority_policy_.get(), name_.c_str(), this);
512   }
513   GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this,
514                     grpc_schedule_on_exec_ctx);
515   GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this,
516                     grpc_schedule_on_exec_ctx);
517   // Start the failover timer.
518   StartFailoverTimerLocked();
519 }
520 
Orphan()521 void PriorityLb::ChildPriority::Orphan() {
522   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
523     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned",
524             priority_policy_.get(), name_.c_str(), this);
525   }
526   MaybeCancelFailoverTimerLocked();
527   if (deactivation_timer_callback_pending_) {
528     grpc_timer_cancel(&deactivation_timer_);
529   }
530   // Remove the child policy's interested_parties pollset_set from the
531   // xDS policy.
532   grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
533                                    priority_policy_->interested_parties());
534   child_policy_.reset();
535   // Drop our ref to the child's picker, in case it's holding a ref to
536   // the child.
537   picker_wrapper_.reset();
538   if (deactivation_timer_callback_pending_) {
539     grpc_timer_cancel(&deactivation_timer_);
540   }
541   Unref(DEBUG_LOCATION, "ChildPriority+Orphan");
542 }
543 
UpdateLocked(RefCountedPtr<LoadBalancingPolicy::Config> config,bool ignore_reresolution_requests)544 void PriorityLb::ChildPriority::UpdateLocked(
545     RefCountedPtr<LoadBalancingPolicy::Config> config,
546     bool ignore_reresolution_requests) {
547   if (priority_policy_->shutting_down_) return;
548   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
549     gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update",
550             priority_policy_.get(), name_.c_str(), this);
551   }
552   ignore_reresolution_requests_ = ignore_reresolution_requests;
553   // Create policy if needed.
554   if (child_policy_ == nullptr) {
555     child_policy_ = CreateChildPolicyLocked(priority_policy_->args_);
556   }
557   // Construct update args.
558   UpdateArgs update_args;
559   update_args.config = std::move(config);
560   update_args.addresses = priority_policy_->addresses_[name_];
561   update_args.args = grpc_channel_args_copy(priority_policy_->args_);
562   // Update the policy.
563   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
564     gpr_log(GPR_INFO,
565             "[priority_lb %p] child %s (%p): updating child policy handler %p",
566             priority_policy_.get(), name_.c_str(), this, child_policy_.get());
567   }
568   child_policy_->UpdateLocked(std::move(update_args));
569 }
570 
571 OrphanablePtr<LoadBalancingPolicy>
CreateChildPolicyLocked(const grpc_channel_args * args)572 PriorityLb::ChildPriority::CreateChildPolicyLocked(
573     const grpc_channel_args* args) {
574   LoadBalancingPolicy::Args lb_policy_args;
575   lb_policy_args.work_serializer = priority_policy_->work_serializer();
576   lb_policy_args.args = args;
577   lb_policy_args.channel_control_helper =
578       absl::make_unique<Helper>(this->Ref(DEBUG_LOCATION, "Helper"));
579   OrphanablePtr<LoadBalancingPolicy> lb_policy =
580       MakeOrphanable<ChildPolicyHandler>(std::move(lb_policy_args),
581                                          &grpc_lb_priority_trace);
582   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
583     gpr_log(GPR_INFO,
584             "[priority_lb %p] child %s (%p): created new child policy "
585             "handler %p",
586             priority_policy_.get(), name_.c_str(), this, lb_policy.get());
587   }
588   // Add the parent's interested_parties pollset_set to that of the newly
589   // created child policy. This will make the child policy progress upon
590   // activity on the parent LB, which in turn is tied to the application's call.
591   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
592                                    priority_policy_->interested_parties());
593   return lb_policy;
594 }
595 
ExitIdleLocked()596 void PriorityLb::ChildPriority::ExitIdleLocked() {
597   if (connectivity_state_ == GRPC_CHANNEL_IDLE &&
598       !failover_timer_callback_pending_) {
599     StartFailoverTimerLocked();
600   }
601   child_policy_->ExitIdleLocked();
602 }
603 
ResetBackoffLocked()604 void PriorityLb::ChildPriority::ResetBackoffLocked() {
605   child_policy_->ResetBackoffLocked();
606 }
607 
OnConnectivityStateUpdateLocked(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)608 void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked(
609     grpc_connectivity_state state, const absl::Status& status,
610     std::unique_ptr<SubchannelPicker> picker) {
611   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
612     gpr_log(GPR_INFO,
613             "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p",
614             priority_policy_.get(), name_.c_str(), this,
615             ConnectivityStateName(state), status.ToString().c_str(),
616             picker.get());
617   }
618   // Store the state and picker.
619   connectivity_state_ = state;
620   connectivity_status_ = status;
621   picker_wrapper_ = MakeRefCounted<RefCountedPicker>(std::move(picker));
622   // If READY or TRANSIENT_FAILURE, cancel failover timer.
623   if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
624     MaybeCancelFailoverTimerLocked();
625   }
626   // Notify the parent policy.
627   priority_policy_->HandleChildConnectivityStateChangeLocked(this);
628 }
629 
StartFailoverTimerLocked()630 void PriorityLb::ChildPriority::StartFailoverTimerLocked() {
631   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
632     gpr_log(GPR_INFO,
633             "[priority_lb %p] child %s (%p): starting failover timer for %d ms",
634             priority_policy_.get(), name_.c_str(), this,
635             priority_policy_->child_failover_timeout_ms_);
636   }
637   Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release();
638   grpc_timer_init(
639       &failover_timer_,
640       ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_,
641       &on_failover_timer_);
642   failover_timer_callback_pending_ = true;
643 }
644 
MaybeCancelFailoverTimerLocked()645 void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() {
646   if (failover_timer_callback_pending_) {
647     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
648       gpr_log(GPR_INFO,
649               "[priority_lb %p] child %s (%p): cancelling failover timer",
650               priority_policy_.get(), name_.c_str(), this);
651     }
652     grpc_timer_cancel(&failover_timer_);
653     failover_timer_callback_pending_ = false;
654   }
655 }
656 
OnFailoverTimer(void * arg,grpc_error_handle error)657 void PriorityLb::ChildPriority::OnFailoverTimer(void* arg,
658                                                 grpc_error_handle error) {
659   ChildPriority* self = static_cast<ChildPriority*>(arg);
660   (void)GRPC_ERROR_REF(error);  // ref owned by lambda
661   self->priority_policy_->work_serializer()->Run(
662       [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION);
663 }
664 
OnFailoverTimerLocked(grpc_error_handle error)665 void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error_handle error) {
666   if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ &&
667       !priority_policy_->shutting_down_) {
668     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
669       gpr_log(GPR_INFO,
670               "[priority_lb %p] child %s (%p): failover timer fired, "
671               "reporting TRANSIENT_FAILURE",
672               priority_policy_.get(), name_.c_str(), this);
673     }
674     failover_timer_callback_pending_ = false;
675     OnConnectivityStateUpdateLocked(
676         GRPC_CHANNEL_TRANSIENT_FAILURE,
677         absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"),
678         nullptr);
679   }
680   Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked");
681   GRPC_ERROR_UNREF(error);
682 }
683 
DeactivateLocked()684 void PriorityLb::ChildPriority::DeactivateLocked() {
685   // If already deactivated, don't do it again.
686   if (deactivation_timer_callback_pending_) return;
687   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
688     gpr_log(GPR_INFO,
689             "[priority_lb %p] child %s (%p): deactivating -- will remove in %d "
690             "ms.",
691             priority_policy_.get(), name_.c_str(), this,
692             kChildRetentionIntervalMs);
693   }
694   MaybeCancelFailoverTimerLocked();
695   // Start a timer to delete the child.
696   Ref(DEBUG_LOCATION, "ChildPriority+timer").release();
697   grpc_timer_init(&deactivation_timer_,
698                   ExecCtx::Get()->Now() + kChildRetentionIntervalMs,
699                   &on_deactivation_timer_);
700   deactivation_timer_callback_pending_ = true;
701 }
702 
MaybeReactivateLocked()703 void PriorityLb::ChildPriority::MaybeReactivateLocked() {
704   if (deactivation_timer_callback_pending_) {
705     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
706       gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating",
707               priority_policy_.get(), name_.c_str(), this);
708     }
709     deactivation_timer_callback_pending_ = false;
710     grpc_timer_cancel(&deactivation_timer_);
711   }
712 }
713 
OnDeactivationTimer(void * arg,grpc_error_handle error)714 void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg,
715                                                     grpc_error_handle error) {
716   ChildPriority* self = static_cast<ChildPriority*>(arg);
717   (void)GRPC_ERROR_REF(error);  // ref owned by lambda
718   self->priority_policy_->work_serializer()->Run(
719       [self, error]() { self->OnDeactivationTimerLocked(error); },
720       DEBUG_LOCATION);
721 }
722 
OnDeactivationTimerLocked(grpc_error_handle error)723 void PriorityLb::ChildPriority::OnDeactivationTimerLocked(
724     grpc_error_handle error) {
725   if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ &&
726       !priority_policy_->shutting_down_) {
727     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) {
728       gpr_log(GPR_INFO,
729               "[priority_lb %p] child %s (%p): deactivation timer fired, "
730               "deleting child",
731               priority_policy_.get(), name_.c_str(), this);
732     }
733     deactivation_timer_callback_pending_ = false;
734     priority_policy_->DeleteChild(this);
735   }
736   Unref(DEBUG_LOCATION, "ChildPriority+timer");
737   GRPC_ERROR_UNREF(error);
738 }
739 
740 //
741 // PriorityLb::ChildPriority::Helper
742 //
743 
744 RefCountedPtr<SubchannelInterface>
CreateSubchannel(ServerAddress address,const grpc_channel_args & args)745 PriorityLb::ChildPriority::Helper::CreateSubchannel(
746     ServerAddress address, const grpc_channel_args& args) {
747   if (priority_->priority_policy_->shutting_down_) return nullptr;
748   return priority_->priority_policy_->channel_control_helper()
749       ->CreateSubchannel(std::move(address), args);
750 }
751 
UpdateState(grpc_connectivity_state state,const absl::Status & status,std::unique_ptr<SubchannelPicker> picker)752 void PriorityLb::ChildPriority::Helper::UpdateState(
753     grpc_connectivity_state state, const absl::Status& status,
754     std::unique_ptr<SubchannelPicker> picker) {
755   if (priority_->priority_policy_->shutting_down_) return;
756   // Notify the priority.
757   priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker));
758 }
759 
RequestReresolution()760 void PriorityLb::ChildPriority::Helper::RequestReresolution() {
761   if (priority_->priority_policy_->shutting_down_) return;
762   if (priority_->ignore_reresolution_requests_) {
763     return;
764   }
765   priority_->priority_policy_->channel_control_helper()->RequestReresolution();
766 }
767 
GetAuthority()768 absl::string_view PriorityLb::ChildPriority::Helper::GetAuthority() {
769   return priority_->priority_policy_->channel_control_helper()->GetAuthority();
770 }
771 
AddTraceEvent(TraceSeverity severity,absl::string_view message)772 void PriorityLb::ChildPriority::Helper::AddTraceEvent(
773     TraceSeverity severity, absl::string_view message) {
774   if (priority_->priority_policy_->shutting_down_) return;
775   priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity,
776                                                                        message);
777 }
778 
779 //
780 // factory
781 //
782 
783 class PriorityLbFactory : public LoadBalancingPolicyFactory {
784  public:
CreateLoadBalancingPolicy(LoadBalancingPolicy::Args args) const785   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
786       LoadBalancingPolicy::Args args) const override {
787     return MakeOrphanable<PriorityLb>(std::move(args));
788   }
789 
name() const790   const char* name() const override { return kPriority; }
791 
ParseLoadBalancingConfig(const Json & json,grpc_error_handle * error) const792   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
793       const Json& json, grpc_error_handle* error) const override {
794     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
795     if (json.type() == Json::Type::JSON_NULL) {
796       // priority was mentioned as a policy in the deprecated
797       // loadBalancingPolicy field or in the client API.
798       *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
799           "field:loadBalancingPolicy error:priority policy requires "
800           "configuration. Please use loadBalancingConfig field of service "
801           "config instead.");
802       return nullptr;
803     }
804     std::vector<grpc_error_handle> error_list;
805     // Children.
806     std::map<std::string, PriorityLbConfig::PriorityLbChild> children;
807     auto it = json.object_value().find("children");
808     if (it == json.object_value().end()) {
809       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
810           "field:children error:required field missing"));
811     } else if (it->second.type() != Json::Type::OBJECT) {
812       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
813           "field:children error:type should be object"));
814     } else {
815       const Json::Object& object = it->second.object_value();
816       for (const auto& p : object) {
817         const std::string& child_name = p.first;
818         const Json& element = p.second;
819         if (element.type() != Json::Type::OBJECT) {
820           error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
821               absl::StrCat("field:children key:", child_name,
822                            " error:should be type object")));
823         } else {
824           auto it2 = element.object_value().find("config");
825           if (it2 == element.object_value().end()) {
826             error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
827                 absl::StrCat("field:children key:", child_name,
828                              " error:missing 'config' field")));
829           } else {
830             grpc_error_handle parse_error = GRPC_ERROR_NONE;
831             auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
832                 it2->second, &parse_error);
833             bool ignore_resolution_requests = false;
834             // If present, ignore_reresolution_requests must be of type
835             // boolean.
836             auto it3 =
837                 element.object_value().find("ignore_reresolution_requests");
838             if (it3 != element.object_value().end()) {
839               if (it3->second.type() == Json::Type::JSON_TRUE) {
840                 ignore_resolution_requests = true;
841               } else if (it3->second.type() != Json::Type::JSON_FALSE) {
842                 error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(
843                     absl::StrCat("field:children key:", child_name,
844                                  " field:ignore_reresolution_requests:should "
845                                  "be type boolean")));
846               }
847             }
848             if (config == nullptr) {
849               GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE);
850               error_list.push_back(
851                   GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(
852                       absl::StrCat("field:children key:", child_name).c_str(),
853                       &parse_error, 1));
854               GRPC_ERROR_UNREF(parse_error);
855             }
856             children[child_name].config = std::move(config);
857             children[child_name].ignore_reresolution_requests =
858                 ignore_resolution_requests;
859           }
860         }
861       }
862     }
863     // Priorities.
864     std::vector<std::string> priorities;
865     it = json.object_value().find("priorities");
866     if (it == json.object_value().end()) {
867       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
868           "field:priorities error:required field missing"));
869     } else if (it->second.type() != Json::Type::ARRAY) {
870       error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
871           "field:priorities error:type should be array"));
872     } else {
873       const Json::Array& array = it->second.array_value();
874       for (size_t i = 0; i < array.size(); ++i) {
875         const Json& element = array[i];
876         if (element.type() != Json::Type::STRING) {
877           error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
878               "field:priorities element:", i, " error:should be type string")));
879         } else if (children.find(element.string_value()) == children.end()) {
880           error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
881               "field:priorities element:", i, " error:unknown child '",
882               element.string_value(), "'")));
883         } else {
884           priorities.emplace_back(element.string_value());
885         }
886       }
887       if (priorities.size() != children.size()) {
888         error_list.push_back(GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat(
889             "field:priorities error:priorities size (", priorities.size(),
890             ") != children size (", children.size(), ")")));
891       }
892     }
893     if (error_list.empty()) {
894       return MakeRefCounted<PriorityLbConfig>(std::move(children),
895                                               std::move(priorities));
896     } else {
897       *error = GRPC_ERROR_CREATE_FROM_VECTOR(
898           "priority_experimental LB policy config", &error_list);
899       return nullptr;
900     }
901   }
902 };
903 
904 }  // namespace
905 
906 }  // namespace grpc_core
907 
908 //
909 // Plugin registration
910 //
911 
grpc_lb_policy_priority_init()912 void grpc_lb_policy_priority_init() {
913   grpc_core::LoadBalancingPolicyRegistry::Builder::
914       RegisterLoadBalancingPolicyFactory(
915           absl::make_unique<grpc_core::PriorityLbFactory>());
916 }
917 
grpc_lb_policy_priority_shutdown()918 void grpc_lb_policy_priority_shutdown() {}
919