1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <limits.h>
22
23 #include "src/core/lib/channel/channel_args.h"
24 #include "src/core/lib/channel/channel_stack_builder.h"
25 #include "src/core/lib/gprpp/atomic.h"
26 #include "src/core/lib/iomgr/timer.h"
27 #include "src/core/lib/surface/channel_init.h"
28 #include "src/core/lib/transport/http2_errors.h"
29
// TODO(juanlishen): The idle filter is disabled in client channel by default
// due to b/143502997. Try to fix the bug and enable the filter by default.
// INT_MAX doubles as the "filter disabled" sentinel: MaybeAddClientIdleFilter()
// does not install the filter when the effective timeout equals INT_MAX.
#define DEFAULT_IDLE_TIMEOUT_MS INT_MAX
// Lower bound on the idle timeout, in milliseconds. A user-supplied timeout
// smaller than this is raised to this value by GetClientIdleTimeout().
#define MIN_IDLE_TIMEOUT_MS (1 /*second*/ * 1000)
35
36 namespace grpc_core {
37
// Trace flag named "client_idle_filter" that gates all logging done through
// GRPC_IDLE_FILTER_LOG below. It is constructed with `false`
// (NOTE(review): presumably "disabled by default" — confirm against TraceFlag).
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");

// Logs a printf-style message at GPR_INFO, prefixed with
// "(client idle filter) ", but only when grpc_trace_client_idle_filter is
// enabled. The do { ... } while (0) wrapper lets the macro be used as a single
// statement (e.g. in an unbraced if/else).
#define GRPC_IDLE_FILTER_LOG(format, ...)                               \
  do {                                                                  \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) {       \
      gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
    }                                                                   \
  } while (0)
46
47 namespace {
48
49 /*
50 client_idle_filter maintains a state tracking if there are active calls in the
51 channel and its internal idle_timer_. The states are specified as following:
52
53 +--------------------------------------------+-------------+---------+
54 | ChannelState | idle_timer_ | channel |
55 +--------------------------------------------+-------------+---------+
56 | IDLE | unset | idle |
57 | CALLS_ACTIVE | unset | busy |
58 | TIMER_PENDING | set-valid | idle |
59 | TIMER_PENDING_CALLS_ACTIVE | set-invalid | busy |
60 | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle |
61 +--------------------------------------------+-------------+---------+
62
63 IDLE: The initial state of the client_idle_filter, indicating the channel is
64 in IDLE.
65
  CALLS_ACTIVE: The channel has at least one active call and the timer is not
  set.
67
  TIMER_PENDING: The state after the timer is set and no calls have arrived
  since it was set. The channel must have no active calls in this state. If
  the timer fires in this state, the channel will go into IDLE state.
71
  TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one
  call has arrived since it was set. The channel must have at least one active
  call in this state. If the timer fires in this state, we won't reschedule
  it.
76
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set
  and at least one call has arrived since it was set, BUT the channel
  currently has no active calls. If the timer fires in this state, we will
  reschedule it according to the finish time of the latest call.
81
82 PROCESSING: The state set to block other threads when the setting thread is
83 doing some work to keep state consistency.
84
  idle_timer_ is never cancelled (unless the channel is shutting down).
  If the timer callback runs while idle_timer_ is valid (i.e. state_ is
  TIMER_PENDING), the channel will enter IDLE; otherwise the channel is left
  unchanged.
89
90 State transitions:
91 IDLE
92 | ^
93 --------------------------------- *
94 | *
95 v *
96 CALLS_ACTIVE =================> TIMER_PENDING
97 ^ | ^
98 * ------------------------------ *
99 * | *
100 * v *
101 TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
102 ^ |
103 | |
104 ---------------------------------
105
106 ---> Triggered by IncreaseCallCount()
107 ===> Triggered by DecreaseCallCount()
108 ***> Triggered by IdleTimerCallback()
109 */
// States of the idle filter's state machine. Values are spelled out so the
// numeric encoding stored in Atomic<ChannelState> is explicit.
enum ChannelState {
  // Timer unset, channel idle. Initial state.
  IDLE = 0,
  // Timer unset, channel busy.
  CALLS_ACTIVE = 1,
  // Timer set and still valid, channel idle.
  TIMER_PENDING = 2,
  // Timer set but invalidated by a call that is still active.
  TIMER_PENDING_CALLS_ACTIVE = 3,
  // Timer set but invalidated by calls that have all finished since.
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START = 4,
  // Transient state that makes other threads spin while one thread finishes a
  // multi-step transition.
  PROCESSING = 5
};
118
GetClientIdleTimeout(const grpc_channel_args * args)119 grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
120 return GPR_MAX(
121 grpc_channel_arg_get_integer(
122 grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
123 {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}),
124 MIN_IDLE_TIMEOUT_MS);
125 }
126
127 class ChannelData {
128 public:
129 static grpc_error* Init(grpc_channel_element* elem,
130 grpc_channel_element_args* args);
131 static void Destroy(grpc_channel_element* elem);
132
133 static void StartTransportOp(grpc_channel_element* elem,
134 grpc_transport_op* op);
135
136 void IncreaseCallCount();
137
138 void DecreaseCallCount();
139
140 private:
141 ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
142 grpc_error** error);
143 ~ChannelData() = default;
144
145 static void IdleTimerCallback(void* arg, grpc_error* error);
146 static void IdleTransportOpCompleteCallback(void* arg, grpc_error* error);
147
148 void StartIdleTimer();
149
150 void EnterIdle();
151
152 grpc_channel_element* elem_;
153 // The channel stack to which we take refs for pending callbacks.
154 grpc_channel_stack* channel_stack_;
155 // Timeout after the last RPC finishes on the client channel at which the
156 // channel goes back into IDLE state.
157 const grpc_millis client_idle_timeout_;
158
159 // Member data used to track the state of channel.
160 grpc_millis last_idle_time_;
161 Atomic<intptr_t> call_count_{0};
162 Atomic<ChannelState> state_{IDLE};
163
164 // Idle timer and its callback closure.
165 grpc_timer idle_timer_;
166 grpc_closure idle_timer_callback_;
167
168 // The transport op telling the client channel to enter IDLE.
169 grpc_transport_op idle_transport_op_;
170 grpc_closure idle_transport_op_complete_callback_;
171 };
172
Init(grpc_channel_element * elem,grpc_channel_element_args * args)173 grpc_error* ChannelData::Init(grpc_channel_element* elem,
174 grpc_channel_element_args* args) {
175 grpc_error* error = GRPC_ERROR_NONE;
176 new (elem->channel_data) ChannelData(elem, args, &error);
177 return error;
178 }
179
Destroy(grpc_channel_element * elem)180 void ChannelData::Destroy(grpc_channel_element* elem) {
181 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
182 chand->~ChannelData();
183 }
184
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)185 void ChannelData::StartTransportOp(grpc_channel_element* elem,
186 grpc_transport_op* op) {
187 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
188 // Catch the disconnect_with_error transport op.
189 if (op->disconnect_with_error != nullptr) {
190 // IncreaseCallCount() introduces a dummy call and prevent the timer from
191 // being reset by other threads.
192 chand->IncreaseCallCount();
193 // If the timer has been set, cancel the timer.
194 // No synchronization issues here. grpc_timer_cancel() is valid as long as
195 // the timer has been init()ed before.
196 grpc_timer_cancel(&chand->idle_timer_);
197 }
198 // Pass the op to the next filter.
199 grpc_channel_next_op(elem, op);
200 }
201
IncreaseCallCount()202 void ChannelData::IncreaseCallCount() {
203 const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
204 GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
205 previous_value + 1);
206 if (previous_value == 0) {
207 // This call is the one that makes the channel busy.
208 // Loop here to make sure the previous decrease operation has finished.
209 ChannelState state = state_.Load(MemoryOrder::RELAXED);
210 while (true) {
211 switch (state) {
212 // Timer has not been set. Switch to CALLS_ACTIVE.
213 case IDLE:
214 // In this case, no other threads will modify the state, so we can
215 // just store the value.
216 state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
217 return;
218 // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
219 case TIMER_PENDING:
220 case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
221 // At this point, the state may have been switched to IDLE by the
222 // idle timer callback. Therefore, use CAS operation to change the
223 // state atomically.
224 // Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
225 // been properly set in DecreaseCallCount().
226 if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
227 MemoryOrder::ACQUIRE,
228 MemoryOrder::RELAXED)) {
229 return;
230 }
231 break;
232 default:
233 // The state has not been switched to desired value yet, try again.
234 state = state_.Load(MemoryOrder::RELAXED);
235 break;
236 }
237 }
238 }
239 }
240
DecreaseCallCount()241 void ChannelData::DecreaseCallCount() {
242 const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
243 GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
244 previous_value - 1);
245 if (previous_value == 1) {
246 // This call is the one that makes the channel idle.
247 // last_idle_time_ does not need to be Atomic<> because busy-loops in
248 // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
249 // prevent multiple threads from simultaneously accessing this variable.
250 last_idle_time_ = ExecCtx::Get()->Now();
251 ChannelState state = state_.Load(MemoryOrder::RELAXED);
252 while (true) {
253 switch (state) {
254 // Timer has not been set. Set the timer and switch to TIMER_PENDING
255 case CALLS_ACTIVE:
256 // Release store here to make other threads see the updated value of
257 // last_idle_time_.
258 StartIdleTimer();
259 state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
260 return;
261 // Timer has been set. Switch to
262 // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
263 case TIMER_PENDING_CALLS_ACTIVE:
264 // At this point, the state may have been switched to CALLS_ACTIVE by
265 // the idle timer callback. Therefore, use CAS operation to change the
266 // state atomically.
267 // Release store here to make the idle timer callback see the updated
268 // value of last_idle_time_ to properly reset the idle timer.
269 if (state_.CompareExchangeWeak(
270 &state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
271 MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
272 return;
273 }
274 break;
275 default:
276 // The state has not been switched to desired value yet, try again.
277 state = state_.Load(MemoryOrder::RELAXED);
278 break;
279 }
280 }
281 }
282 }
283
ChannelData(grpc_channel_element * elem,grpc_channel_element_args * args,grpc_error **)284 ChannelData::ChannelData(grpc_channel_element* elem,
285 grpc_channel_element_args* args,
286 grpc_error** /*error*/)
287 : elem_(elem),
288 channel_stack_(args->channel_stack),
289 client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
290 // If the idle filter is explicitly disabled in channel args, this ctor should
291 // not get called.
292 GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
293 GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
294 client_idle_timeout_);
295 // Initialize the idle timer without setting it.
296 grpc_timer_init_unset(&idle_timer_);
297 // Initialize the idle timer callback closure.
298 GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
299 grpc_schedule_on_exec_ctx);
300 // Initialize the idle transport op complete callback.
301 GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
302 IdleTransportOpCompleteCallback, this,
303 grpc_schedule_on_exec_ctx);
304 }
305
IdleTimerCallback(void * arg,grpc_error * error)306 void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
307 GRPC_IDLE_FILTER_LOG("timer alarms");
308 ChannelData* chand = static_cast<ChannelData*>(arg);
309 if (error != GRPC_ERROR_NONE) {
310 GRPC_IDLE_FILTER_LOG("timer canceled");
311 GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
312 return;
313 }
314 bool finished = false;
315 ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
316 while (!finished) {
317 switch (state) {
318 case TIMER_PENDING:
319 // Change the state to PROCESSING to block IncreaseCallCout() until the
320 // EnterIdle() operation finishes, preventing mistakenly entering IDLE
321 // when active RPC exists.
322 finished = chand->state_.CompareExchangeWeak(
323 &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
324 if (finished) {
325 chand->EnterIdle();
326 chand->state_.Store(IDLE, MemoryOrder::RELAXED);
327 }
328 break;
329 case TIMER_PENDING_CALLS_ACTIVE:
330 finished = chand->state_.CompareExchangeWeak(
331 &state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
332 break;
333 case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
334 // Change the state to PROCESSING to block IncreaseCallCount() until the
335 // StartIdleTimer() operation finishes, preventing mistakenly restarting
336 // the timer after grpc_timer_cancel() when shutdown.
337 finished = chand->state_.CompareExchangeWeak(
338 &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
339 if (finished) {
340 chand->StartIdleTimer();
341 chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
342 }
343 break;
344 default:
345 // The state has not been switched to desired value yet, try again.
346 state = chand->state_.Load(MemoryOrder::RELAXED);
347 break;
348 }
349 }
350 GRPC_IDLE_FILTER_LOG("timer finishes");
351 GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
352 }
353
IdleTransportOpCompleteCallback(void * arg,grpc_error *)354 void ChannelData::IdleTransportOpCompleteCallback(void* arg,
355 grpc_error* /*error*/) {
356 ChannelData* chand = static_cast<ChannelData*>(arg);
357 GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
358 }
359
StartIdleTimer()360 void ChannelData::StartIdleTimer() {
361 GRPC_IDLE_FILTER_LOG("timer has started");
362 // Hold a ref to the channel stack for the timer callback.
363 GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
364 grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
365 &idle_timer_callback_);
366 }
367
EnterIdle()368 void ChannelData::EnterIdle() {
369 GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
370 // Hold a ref to the channel stack for the transport op.
371 GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
372 // Initialize the transport op.
373 idle_transport_op_ = {};
374 idle_transport_op_.disconnect_with_error = grpc_error_set_int(
375 GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
376 GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
377 idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
378 // Pass the transport op down to the channel stack.
379 grpc_channel_next_op(elem_, &idle_transport_op_);
380 }
381
382 class CallData {
383 public:
384 static grpc_error* Init(grpc_call_element* elem,
385 const grpc_call_element_args* args);
386 static void Destroy(grpc_call_element* elem,
387 const grpc_call_final_info* final_info,
388 grpc_closure* then_schedule_closure);
389 };
390
Init(grpc_call_element * elem,const grpc_call_element_args *)391 grpc_error* CallData::Init(grpc_call_element* elem,
392 const grpc_call_element_args* /*args*/) {
393 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
394 chand->IncreaseCallCount();
395 return GRPC_ERROR_NONE;
396 }
397
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)398 void CallData::Destroy(grpc_call_element* elem,
399 const grpc_call_final_info* /*final_info*/,
400 grpc_closure* /*ignored*/) {
401 ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
402 chand->DecreaseCallCount();
403 }
404
// Filter vtable for the client idle filter, registered under the name
// "client_idle". The initializer is positional, so entry order matters. This
// filter supplies its own transport-op hook, call/channel data sizes, and
// call/channel init and destroy hooks; the remaining entries are generic
// helpers (NOTE(review): presumably plain pass-through / no-op
// implementations — confirm against their definitions).
const grpc_channel_filter grpc_client_idle_filter = {
    grpc_call_next_op,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    grpc_channel_next_get_info,
    "client_idle"};
417
MaybeAddClientIdleFilter(grpc_channel_stack_builder * builder,void *)418 static bool MaybeAddClientIdleFilter(grpc_channel_stack_builder* builder,
419 void* /*arg*/) {
420 const grpc_channel_args* channel_args =
421 grpc_channel_stack_builder_get_channel_arguments(builder);
422 if (!grpc_channel_args_want_minimal_stack(channel_args) &&
423 GetClientIdleTimeout(channel_args) != INT_MAX) {
424 return grpc_channel_stack_builder_prepend_filter(
425 builder, &grpc_client_idle_filter, nullptr, nullptr);
426 } else {
427 return true;
428 }
429 }
430
431 } // namespace
432 } // namespace grpc_core
433
grpc_client_idle_filter_init(void)434 void grpc_client_idle_filter_init(void) {
435 grpc_channel_init_register_stage(
436 GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
437 grpc_core::MaybeAddClientIdleFilter, nullptr);
438 }
439
// Plugin shutdown: intentionally empty.
void grpc_client_idle_filter_shutdown() {}
441