1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <string.h>
25
26 #include "absl/container/inlined_vector.h"
27
28 #include <grpc/support/alloc.h>
29
30 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
31 #include "src/core/ext/filters/client_channel/server_address.h"
32 // TODO(roth): Should not need the include of subchannel.h here, since
33 // that implementation should be hidden from the LB policy API.
34 #include "src/core/ext/filters/client_channel/subchannel.h"
35 #include "src/core/ext/filters/client_channel/subchannel_interface.h"
36 #include "src/core/lib/address_utils/sockaddr_utils.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/orphanable.h"
40 #include "src/core/lib/gprpp/ref_counted.h"
41 #include "src/core/lib/gprpp/ref_counted_ptr.h"
42 #include "src/core/lib/iomgr/closure.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44
45 // Code for maintaining a list of subchannels within an LB policy.
46 //
47 // To use this, callers must create their own subclasses, like so:
48 /*
49
50 class MySubchannelList; // Forward declaration.
51
52 class MySubchannelData
53 : public SubchannelData<MySubchannelList, MySubchannelData> {
54 public:
55 void ProcessConnectivityChangeLocked(
56 grpc_connectivity_state connectivity_state) override {
57 // ...code to handle connectivity changes...
58 }
59 };
60
61 class MySubchannelList
62 : public SubchannelList<MySubchannelList, MySubchannelData> {
63 };
64
65 */
66 // All methods will be called from within the client_channel work serializer.
67
68 namespace grpc_core {
69
70 // Forward declaration.
71 template <typename SubchannelListType, typename SubchannelDataType>
72 class SubchannelList;
73
74 // Stores data for a particular subchannel in a subchannel list.
75 // Callers must create a subclass that implements the
76 // ProcessConnectivityChangeLocked() method.
77 template <typename SubchannelListType, typename SubchannelDataType>
78 class SubchannelData {
79 public:
80 // Returns a pointer to the subchannel list containing this object.
subchannel_list()81 SubchannelListType* subchannel_list() const {
82 return static_cast<SubchannelListType*>(subchannel_list_);
83 }
84
85 // Returns the index into the subchannel list of this object.
Index()86 size_t Index() const {
87 return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
88 subchannel_list_->subchannel(0));
89 }
90
91 // Returns a pointer to the subchannel.
subchannel()92 SubchannelInterface* subchannel() const { return subchannel_.get(); }
93
94 // Synchronously checks the subchannel's connectivity state.
95 // Must not be called while there is a connectivity notification
96 // pending (i.e., between calling StartConnectivityWatchLocked() and
97 // calling CancelConnectivityWatchLocked()).
CheckConnectivityStateLocked()98 grpc_connectivity_state CheckConnectivityStateLocked() {
99 GPR_ASSERT(pending_watcher_ == nullptr);
100 connectivity_state_ = subchannel_->CheckConnectivityState();
101 return connectivity_state_;
102 }
103
104 // Resets the connection backoff.
105 // TODO(roth): This method should go away when we move the backoff
106 // code out of the subchannel and into the LB policies.
107 void ResetBackoffLocked();
108
109 // Starts watching the connectivity state of the subchannel.
110 // ProcessConnectivityChangeLocked() will be called whenever the
111 // connectivity state changes.
112 void StartConnectivityWatchLocked();
113
114 // Cancels watching the connectivity state of the subchannel.
115 void CancelConnectivityWatchLocked(const char* reason);
116
117 // Cancels any pending connectivity watch and unrefs the subchannel.
118 void ShutdownLocked();
119
120 protected:
121 SubchannelData(
122 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
123 const ServerAddress& address,
124 RefCountedPtr<SubchannelInterface> subchannel);
125
126 virtual ~SubchannelData();
127
128 // After StartConnectivityWatchLocked() is called, this method will be
129 // invoked whenever the subchannel's connectivity state changes.
130 // To stop watching, use CancelConnectivityWatchLocked().
131 virtual void ProcessConnectivityChangeLocked(
132 grpc_connectivity_state connectivity_state) = 0;
133
134 private:
135 // Watcher for subchannel connectivity state.
136 class Watcher
137 : public SubchannelInterface::ConnectivityStateWatcherInterface {
138 public:
Watcher(SubchannelData<SubchannelListType,SubchannelDataType> * subchannel_data,RefCountedPtr<SubchannelListType> subchannel_list)139 Watcher(
140 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
141 RefCountedPtr<SubchannelListType> subchannel_list)
142 : subchannel_data_(subchannel_data),
143 subchannel_list_(std::move(subchannel_list)) {}
144
~Watcher()145 ~Watcher() override {
146 subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor");
147 }
148
149 void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
150
interested_parties()151 grpc_pollset_set* interested_parties() override {
152 return subchannel_list_->policy()->interested_parties();
153 }
154
155 private:
156 SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
157 RefCountedPtr<SubchannelListType> subchannel_list_;
158 };
159
160 // Unrefs the subchannel.
161 void UnrefSubchannelLocked(const char* reason);
162
163 // Backpointer to owning subchannel list. Not owned.
164 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
165 // The subchannel.
166 RefCountedPtr<SubchannelInterface> subchannel_;
167 // Will be non-null when the subchannel's state is being watched.
168 SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
169 nullptr;
170 // Data updated by the watcher.
171 grpc_connectivity_state connectivity_state_;
172 };
173
174 // A list of subchannels.
175 template <typename SubchannelListType, typename SubchannelDataType>
176 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
177 public:
178 typedef absl::InlinedVector<SubchannelDataType, 10> SubchannelVector;
179
180 // The number of subchannels in the list.
num_subchannels()181 size_t num_subchannels() const { return subchannels_.size(); }
182
183 // The data for the subchannel at a particular index.
subchannel(size_t index)184 SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
185
186 // Returns true if the subchannel list is shutting down.
shutting_down()187 bool shutting_down() const { return shutting_down_; }
188
189 // Accessors.
policy()190 LoadBalancingPolicy* policy() const { return policy_; }
tracer()191 TraceFlag* tracer() const { return tracer_; }
192
193 // Resets connection backoff of all subchannels.
194 // TODO(roth): We will probably need to rethink this as part of moving
195 // the backoff code out of subchannels and into LB policies.
196 void ResetBackoffLocked();
197
Orphan()198 void Orphan() override {
199 ShutdownLocked();
200 InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
201 }
202
203 protected:
204 SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
205 ServerAddressList addresses,
206 LoadBalancingPolicy::ChannelControlHelper* helper,
207 const grpc_channel_args& args);
208
209 virtual ~SubchannelList();
210
211 private:
212 // For accessing Ref() and Unref().
213 friend class SubchannelData<SubchannelListType, SubchannelDataType>;
214
215 void ShutdownLocked();
216
217 // Backpointer to owning policy.
218 LoadBalancingPolicy* policy_;
219
220 TraceFlag* tracer_;
221
222 // The list of subchannels.
223 SubchannelVector subchannels_;
224
225 // Is this list shutting down? This may be true due to the shutdown of the
226 // policy itself or because a newer update has arrived while this one hadn't
227 // finished processing.
228 bool shutting_down_ = false;
229 };
230
//
// implementation -- no user-serviceable parts below
//
234
235 //
236 // SubchannelData::Watcher
237 //
238
239 template <typename SubchannelListType, typename SubchannelDataType>
240 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
OnConnectivityStateChange(grpc_connectivity_state new_state)241 OnConnectivityStateChange(grpc_connectivity_state new_state) {
242 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
243 gpr_log(GPR_INFO,
244 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
245 " (subchannel %p): connectivity changed: state=%s, "
246 "shutting_down=%d, pending_watcher=%p",
247 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
248 subchannel_list_.get(), subchannel_data_->Index(),
249 subchannel_list_->num_subchannels(),
250 subchannel_data_->subchannel_.get(),
251 ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
252 subchannel_data_->pending_watcher_);
253 }
254 if (!subchannel_list_->shutting_down() &&
255 subchannel_data_->pending_watcher_ != nullptr) {
256 subchannel_data_->connectivity_state_ = new_state;
257 // Call the subclass's ProcessConnectivityChangeLocked() method.
258 subchannel_data_->ProcessConnectivityChangeLocked(new_state);
259 }
260 }
261
262 //
263 // SubchannelData
264 //
265
266 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData(SubchannelList<SubchannelListType,SubchannelDataType> * subchannel_list,const ServerAddress &,RefCountedPtr<SubchannelInterface> subchannel)267 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
268 SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
269 const ServerAddress& /*address*/,
270 RefCountedPtr<SubchannelInterface> subchannel)
271 : subchannel_list_(subchannel_list),
272 subchannel_(std::move(subchannel)),
273 // We assume that the current state is IDLE. If not, we'll get a
274 // callback telling us that.
275 connectivity_state_(GRPC_CHANNEL_IDLE) {}
276
277 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelData()278 SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
279 GPR_ASSERT(subchannel_ == nullptr);
280 }
281
282 template <typename SubchannelListType, typename SubchannelDataType>
283 void SubchannelData<SubchannelListType, SubchannelDataType>::
UnrefSubchannelLocked(const char * reason)284 UnrefSubchannelLocked(const char* reason) {
285 if (subchannel_ != nullptr) {
286 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
287 gpr_log(GPR_INFO,
288 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
289 " (subchannel %p): unreffing subchannel (%s)",
290 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
291 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
292 subchannel_.get(), reason);
293 }
294 subchannel_.reset();
295 }
296 }
297
298 template <typename SubchannelListType, typename SubchannelDataType>
299 void SubchannelData<SubchannelListType,
ResetBackoffLocked()300 SubchannelDataType>::ResetBackoffLocked() {
301 if (subchannel_ != nullptr) {
302 subchannel_->ResetBackoff();
303 }
304 }
305
306 template <typename SubchannelListType, typename SubchannelDataType>
307 void SubchannelData<SubchannelListType,
StartConnectivityWatchLocked()308 SubchannelDataType>::StartConnectivityWatchLocked() {
309 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
310 gpr_log(GPR_INFO,
311 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
312 " (subchannel %p): starting watch (from %s)",
313 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
314 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
315 subchannel_.get(), ConnectivityStateName(connectivity_state_));
316 }
317 GPR_ASSERT(pending_watcher_ == nullptr);
318 pending_watcher_ =
319 new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
320 subchannel_->WatchConnectivityState(
321 connectivity_state_,
322 std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
323 pending_watcher_));
324 }
325
326 template <typename SubchannelListType, typename SubchannelDataType>
327 void SubchannelData<SubchannelListType, SubchannelDataType>::
CancelConnectivityWatchLocked(const char * reason)328 CancelConnectivityWatchLocked(const char* reason) {
329 if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
330 gpr_log(GPR_INFO,
331 "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
332 " (subchannel %p): canceling connectivity watch (%s)",
333 subchannel_list_->tracer()->name(), subchannel_list_->policy(),
334 subchannel_list_, Index(), subchannel_list_->num_subchannels(),
335 subchannel_.get(), reason);
336 }
337 if (pending_watcher_ != nullptr) {
338 subchannel_->CancelConnectivityStateWatch(pending_watcher_);
339 pending_watcher_ = nullptr;
340 }
341 }
342
343 template <typename SubchannelListType, typename SubchannelDataType>
ShutdownLocked()344 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
345 if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
346 UnrefSubchannelLocked("shutdown");
347 }
348
349 //
350 // SubchannelList
351 //
352
353 template <typename SubchannelListType, typename SubchannelDataType>
SubchannelList(LoadBalancingPolicy * policy,TraceFlag * tracer,ServerAddressList addresses,LoadBalancingPolicy::ChannelControlHelper * helper,const grpc_channel_args & args)354 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
355 LoadBalancingPolicy* policy, TraceFlag* tracer, ServerAddressList addresses,
356 LoadBalancingPolicy::ChannelControlHelper* helper,
357 const grpc_channel_args& args)
358 : InternallyRefCounted<SubchannelListType>(
359 GRPC_TRACE_FLAG_ENABLED(*tracer) ? "SubchannelList" : nullptr),
360 policy_(policy),
361 tracer_(tracer) {
362 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
363 gpr_log(GPR_INFO,
364 "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
365 tracer_->name(), policy, this, addresses.size());
366 }
367 subchannels_.reserve(addresses.size());
368 // Create a subchannel for each address.
369 for (ServerAddress address : addresses) {
370 RefCountedPtr<SubchannelInterface> subchannel =
371 helper->CreateSubchannel(address, args);
372 if (subchannel == nullptr) {
373 // Subchannel could not be created.
374 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
375 gpr_log(GPR_INFO,
376 "[%s %p] could not create subchannel for address %s, "
377 "ignoring",
378 tracer_->name(), policy_, address.ToString().c_str());
379 }
380 continue;
381 }
382 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
383 gpr_log(GPR_INFO,
384 "[%s %p] subchannel list %p index %" PRIuPTR
385 ": Created subchannel %p for address %s",
386 tracer_->name(), policy_, this, subchannels_.size(),
387 subchannel.get(), address.ToString().c_str());
388 }
389 subchannels_.emplace_back(this, std::move(address), std::move(subchannel));
390 }
391 }
392
393 template <typename SubchannelListType, typename SubchannelDataType>
~SubchannelList()394 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
395 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
396 gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
397 policy_, this);
398 }
399 }
400
401 template <typename SubchannelListType, typename SubchannelDataType>
ShutdownLocked()402 void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
403 if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
404 gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
405 tracer_->name(), policy_, this);
406 }
407 GPR_ASSERT(!shutting_down_);
408 shutting_down_ = true;
409 for (size_t i = 0; i < subchannels_.size(); i++) {
410 SubchannelDataType* sd = &subchannels_[i];
411 sd->ShutdownLocked();
412 }
413 }
414
415 template <typename SubchannelListType, typename SubchannelDataType>
416 void SubchannelList<SubchannelListType,
ResetBackoffLocked()417 SubchannelDataType>::ResetBackoffLocked() {
418 for (size_t i = 0; i < subchannels_.size(); i++) {
419 SubchannelDataType* sd = &subchannels_[i];
420 sd->ResetBackoffLocked();
421 }
422 }
423
424 } // namespace grpc_core
425
426 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */
427