1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include <limits.h>
22 
23 #include "src/core/lib/channel/channel_args.h"
24 #include "src/core/lib/channel/channel_stack_builder.h"
25 #include "src/core/lib/gprpp/atomic.h"
26 #include "src/core/lib/iomgr/timer.h"
27 #include "src/core/lib/surface/channel_init.h"
28 #include "src/core/lib/transport/http2_errors.h"
29 
// TODO(juanlishen): The idle filter is disabled in client channel by default
// due to b/143502997. Try to fix the bug and enable the filter by default.
//
// Default value of GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS. INT_MAX also serves as the
// "disabled" value: MaybeAddClientIdleFilter() does not install the filter when
// the configured timeout equals it.
constexpr int DEFAULT_IDLE_TIMEOUT_MS = INT_MAX;
// User-supplied idle timeouts below this (1 second) are raised to it.
constexpr int MIN_IDLE_TIMEOUT_MS = 1 /*second*/ * 1000;
35 
36 namespace grpc_core {
37 
// Trace flag gating GRPC_IDLE_FILTER_LOG() output, registered under the name
// "client_idle_filter" (constructed with `false`, presumably meaning it is
// disabled unless turned on via tracing configuration — confirm).
TraceFlag grpc_trace_client_idle_filter(false, "client_idle_filter");

// Logs a printf-style message at GPR_INFO, prefixed with
// "(client idle filter) ", only when grpc_trace_client_idle_filter is enabled.
// The do/while(0) wrapper makes the macro behave as a single statement, and
// ##__VA_ARGS__ allows calls that pass only a format string.
#define GRPC_IDLE_FILTER_LOG(format, ...)                               \
  do {                                                                  \
    if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_client_idle_filter)) {       \
      gpr_log(GPR_INFO, "(client idle filter) " format, ##__VA_ARGS__); \
    }                                                                   \
  } while (0)
46 
47 namespace {
48 
49 /*
  client_idle_filter maintains a state that tracks whether there are active
  calls in the channel and whether its internal idle_timer_ is set. The states
  are as follows:
52 
53   +--------------------------------------------+-------------+---------+
54   |               ChannelState                 | idle_timer_ | channel |
55   +--------------------------------------------+-------------+---------+
56   | IDLE                                       | unset       | idle    |
57   | CALLS_ACTIVE                               | unset       | busy    |
58   | TIMER_PENDING                              | set-valid   | idle    |
59   | TIMER_PENDING_CALLS_ACTIVE                 | set-invalid | busy    |
60   | TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START | set-invalid | idle    |
61   +--------------------------------------------+-------------+---------+
62 
63   IDLE: The initial state of the client_idle_filter, indicating the channel is
64   in IDLE.
65 
  CALLS_ACTIVE: The channel has one or more active calls and the timer is not set.
67 
  TIMER_PENDING: The state after the timer is set and no calls have arrived
  since. The channel must have no active calls in this state. If the timer
  fires in this state, the channel enters IDLE.
71 
  TIMER_PENDING_CALLS_ACTIVE: The state after the timer is set and at least one
  call has arrived since. The channel must have one or more active calls in
  this state. If the timer fires in this state, it is not rescheduled and the
  state becomes CALLS_ACTIVE.
76 
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START: The state after the timer is set
  and at least one call has arrived since, BUT the channel currently has no
  active calls. If the timer fires in this state, it is rescheduled to expire
  one idle timeout after the finish time of the latest call (last_idle_time_).
81 
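  PROCESSING: A transient state (not shown in the table above) that
  IdleTimerCallback() holds while it runs EnterIdle() or StartIdleTimer().
  Other threads that observe it spin until the callback stores the next state,
  which keeps the state consistent.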
84 
  idle_timer_ will not be cancelled unless a disconnect_with_error transport op
  arrives (e.g. when the channel is shutting down). If the timer callback runs
  while idle_timer_ is valid (i.e. state_ is TIMER_PENDING), the channel enters
  IDLE; otherwise the channel does not enter IDLE.
89 
90   State transitions:
91                                             IDLE
92                                             |  ^
93             ---------------------------------  *
94             |                                  *
95             v                                  *
96       CALLS_ACTIVE =================> TIMER_PENDING
97             ^                               |  ^
98             *  ------------------------------  *
99             *  |                               *
100             *  v                               *
101 TIMER_PENDING_CALLS_ACTIVE ===> TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
102             ^                               |
103             |                               |
104             ---------------------------------
105 
106   ---> Triggered by IncreaseCallCount()
107   ===> Triggered by DecreaseCallCount()
108   ***> Triggered by IdleTimerCallback()
109 */
// States of the idle filter's state machine (see the description and diagram
// in the comment above). Explicit values document the declaration order.
enum ChannelState {
  IDLE = 0,                                        // no calls, no timer
  CALLS_ACTIVE = 1,                                // calls, no timer
  TIMER_PENDING = 2,                               // timer set, no calls since
  TIMER_PENDING_CALLS_ACTIVE = 3,                  // timer set, calls active
  TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START = 4,  // timer set, calls came+went
  PROCESSING = 5,                                  // transient, held by callback
};
118 
GetClientIdleTimeout(const grpc_channel_args * args)119 grpc_millis GetClientIdleTimeout(const grpc_channel_args* args) {
120   return GPR_MAX(
121       grpc_channel_arg_get_integer(
122           grpc_channel_args_find(args, GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS),
123           {DEFAULT_IDLE_TIMEOUT_MS, 0, INT_MAX}),
124       MIN_IDLE_TIMEOUT_MS);
125 }
126 
127 class ChannelData {
128  public:
129   static grpc_error* Init(grpc_channel_element* elem,
130                           grpc_channel_element_args* args);
131   static void Destroy(grpc_channel_element* elem);
132 
133   static void StartTransportOp(grpc_channel_element* elem,
134                                grpc_transport_op* op);
135 
136   void IncreaseCallCount();
137 
138   void DecreaseCallCount();
139 
140  private:
141   ChannelData(grpc_channel_element* elem, grpc_channel_element_args* args,
142               grpc_error** error);
143   ~ChannelData() = default;
144 
145   static void IdleTimerCallback(void* arg, grpc_error* error);
146   static void IdleTransportOpCompleteCallback(void* arg, grpc_error* error);
147 
148   void StartIdleTimer();
149 
150   void EnterIdle();
151 
152   grpc_channel_element* elem_;
153   // The channel stack to which we take refs for pending callbacks.
154   grpc_channel_stack* channel_stack_;
155   // Timeout after the last RPC finishes on the client channel at which the
156   // channel goes back into IDLE state.
157   const grpc_millis client_idle_timeout_;
158 
159   // Member data used to track the state of channel.
160   grpc_millis last_idle_time_;
161   Atomic<intptr_t> call_count_{0};
162   Atomic<ChannelState> state_{IDLE};
163 
164   // Idle timer and its callback closure.
165   grpc_timer idle_timer_;
166   grpc_closure idle_timer_callback_;
167 
168   // The transport op telling the client channel to enter IDLE.
169   grpc_transport_op idle_transport_op_;
170   grpc_closure idle_transport_op_complete_callback_;
171 };
172 
Init(grpc_channel_element * elem,grpc_channel_element_args * args)173 grpc_error* ChannelData::Init(grpc_channel_element* elem,
174                               grpc_channel_element_args* args) {
175   grpc_error* error = GRPC_ERROR_NONE;
176   new (elem->channel_data) ChannelData(elem, args, &error);
177   return error;
178 }
179 
Destroy(grpc_channel_element * elem)180 void ChannelData::Destroy(grpc_channel_element* elem) {
181   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
182   chand->~ChannelData();
183 }
184 
StartTransportOp(grpc_channel_element * elem,grpc_transport_op * op)185 void ChannelData::StartTransportOp(grpc_channel_element* elem,
186                                    grpc_transport_op* op) {
187   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
188   // Catch the disconnect_with_error transport op.
189   if (op->disconnect_with_error != nullptr) {
190     // IncreaseCallCount() introduces a dummy call and prevent the timer from
191     // being reset by other threads.
192     chand->IncreaseCallCount();
193     // If the timer has been set, cancel the timer.
194     // No synchronization issues here. grpc_timer_cancel() is valid as long as
195     // the timer has been init()ed before.
196     grpc_timer_cancel(&chand->idle_timer_);
197   }
198   // Pass the op to the next filter.
199   grpc_channel_next_op(elem, op);
200 }
201 
IncreaseCallCount()202 void ChannelData::IncreaseCallCount() {
203   const intptr_t previous_value = call_count_.FetchAdd(1, MemoryOrder::RELAXED);
204   GRPC_IDLE_FILTER_LOG("call counter has increased to %" PRIuPTR,
205                        previous_value + 1);
206   if (previous_value == 0) {
207     // This call is the one that makes the channel busy.
208     // Loop here to make sure the previous decrease operation has finished.
209     ChannelState state = state_.Load(MemoryOrder::RELAXED);
210     while (true) {
211       switch (state) {
212         // Timer has not been set. Switch to CALLS_ACTIVE.
213         case IDLE:
214           // In this case, no other threads will modify the state, so we can
215           // just store the value.
216           state_.Store(CALLS_ACTIVE, MemoryOrder::RELAXED);
217           return;
218         // Timer has been set. Switch to TIMER_PENDING_CALLS_ACTIVE.
219         case TIMER_PENDING:
220         case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
221           // At this point, the state may have been switched to IDLE by the
222           // idle timer callback. Therefore, use CAS operation to change the
223           // state atomically.
224           // Use MemoryOrder::ACQUIRE on success to ensure last_idle_time_ has
225           // been properly set in DecreaseCallCount().
226           if (state_.CompareExchangeWeak(&state, TIMER_PENDING_CALLS_ACTIVE,
227                                          MemoryOrder::ACQUIRE,
228                                          MemoryOrder::RELAXED)) {
229             return;
230           }
231           break;
232         default:
233           // The state has not been switched to desired value yet, try again.
234           state = state_.Load(MemoryOrder::RELAXED);
235           break;
236       }
237     }
238   }
239 }
240 
DecreaseCallCount()241 void ChannelData::DecreaseCallCount() {
242   const intptr_t previous_value = call_count_.FetchSub(1, MemoryOrder::RELAXED);
243   GRPC_IDLE_FILTER_LOG("call counter has decreased to %" PRIuPTR,
244                        previous_value - 1);
245   if (previous_value == 1) {
246     // This call is the one that makes the channel idle.
247     // last_idle_time_ does not need to be Atomic<> because busy-loops in
248     // IncreaseCallCount(), DecreaseCallCount() and IdleTimerCallback() will
249     // prevent multiple threads from simultaneously accessing this variable.
250     last_idle_time_ = ExecCtx::Get()->Now();
251     ChannelState state = state_.Load(MemoryOrder::RELAXED);
252     while (true) {
253       switch (state) {
254         // Timer has not been set. Set the timer and switch to TIMER_PENDING
255         case CALLS_ACTIVE:
256           // Release store here to make other threads see the updated value of
257           // last_idle_time_.
258           StartIdleTimer();
259           state_.Store(TIMER_PENDING, MemoryOrder::RELEASE);
260           return;
261         // Timer has been set. Switch to
262         // TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
263         case TIMER_PENDING_CALLS_ACTIVE:
264           // At this point, the state may have been switched to CALLS_ACTIVE by
265           // the idle timer callback. Therefore, use CAS operation to change the
266           // state atomically.
267           // Release store here to make the idle timer callback see the updated
268           // value of last_idle_time_ to properly reset the idle timer.
269           if (state_.CompareExchangeWeak(
270                   &state, TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START,
271                   MemoryOrder::RELEASE, MemoryOrder::RELAXED)) {
272             return;
273           }
274           break;
275         default:
276           // The state has not been switched to desired value yet, try again.
277           state = state_.Load(MemoryOrder::RELAXED);
278           break;
279       }
280     }
281   }
282 }
283 
ChannelData(grpc_channel_element * elem,grpc_channel_element_args * args,grpc_error **)284 ChannelData::ChannelData(grpc_channel_element* elem,
285                          grpc_channel_element_args* args,
286                          grpc_error** /*error*/)
287     : elem_(elem),
288       channel_stack_(args->channel_stack),
289       client_idle_timeout_(GetClientIdleTimeout(args->channel_args)) {
290   // If the idle filter is explicitly disabled in channel args, this ctor should
291   // not get called.
292   GPR_ASSERT(client_idle_timeout_ != GRPC_MILLIS_INF_FUTURE);
293   GRPC_IDLE_FILTER_LOG("created with max_leisure_time = %" PRId64 " ms",
294                        client_idle_timeout_);
295   // Initialize the idle timer without setting it.
296   grpc_timer_init_unset(&idle_timer_);
297   // Initialize the idle timer callback closure.
298   GRPC_CLOSURE_INIT(&idle_timer_callback_, IdleTimerCallback, this,
299                     grpc_schedule_on_exec_ctx);
300   // Initialize the idle transport op complete callback.
301   GRPC_CLOSURE_INIT(&idle_transport_op_complete_callback_,
302                     IdleTransportOpCompleteCallback, this,
303                     grpc_schedule_on_exec_ctx);
304 }
305 
// Closure run when idle_timer_ fires or is cancelled.
//
// A non-NONE error means the timer was cancelled (StartTransportOp() cancels
// it on disconnect_with_error). In that case the callback only drops the
// channel-stack ref that StartIdleTimer() took. Otherwise it advances the
// state machine:
//   TIMER_PENDING              -> PROCESSING -> EnterIdle() -> IDLE
//   TIMER_PENDING_CALLS_ACTIVE -> CALLS_ACTIVE (timer not re-armed)
//   TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START
//                              -> PROCESSING -> StartIdleTimer() -> TIMER_PENDING
// Any other observed state is transient (e.g. DecreaseCallCount() has armed
// the timer but not yet stored TIMER_PENDING), so the loop reloads and retries.
// The ref for this firing is released on every path; a re-armed timer holds its
// own ref, taken inside StartIdleTimer().
void ChannelData::IdleTimerCallback(void* arg, grpc_error* error) {
  GRPC_IDLE_FILTER_LOG("timer alarms");
  ChannelData* chand = static_cast<ChannelData*>(arg);
  if (error != GRPC_ERROR_NONE) {
    GRPC_IDLE_FILTER_LOG("timer canceled");
    GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
    return;
  }
  bool finished = false;
  ChannelState state = chand->state_.Load(MemoryOrder::RELAXED);
  while (!finished) {
    switch (state) {
      case TIMER_PENDING:
        // Change the state to PROCESSING to block IncreaseCallCount() until the
        // EnterIdle() operation finishes, preventing mistakenly entering IDLE
        // when active RPC exists.
        // A failed CAS refreshes `state`, and the switch runs again.
        finished = chand->state_.CompareExchangeWeak(
            &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
        if (finished) {
          chand->EnterIdle();
          chand->state_.Store(IDLE, MemoryOrder::RELAXED);
        }
        break;
      case TIMER_PENDING_CALLS_ACTIVE:
        // Calls are active: just record that no timer is pending any more.
        finished = chand->state_.CompareExchangeWeak(
            &state, CALLS_ACTIVE, MemoryOrder::RELAXED, MemoryOrder::RELAXED);
        break;
      case TIMER_PENDING_CALLS_SEEN_SINCE_TIMER_START:
        // Change the state to PROCESSING to block IncreaseCallCount() until the
        // StartIdleTimer() operation finishes, preventing mistakenly restarting
        // the timer after grpc_timer_cancel() when shutdown.
        finished = chand->state_.CompareExchangeWeak(
            &state, PROCESSING, MemoryOrder::ACQUIRE, MemoryOrder::RELAXED);
        if (finished) {
          chand->StartIdleTimer();
          chand->state_.Store(TIMER_PENDING, MemoryOrder::RELAXED);
        }
        break;
      default:
        // The state has not been switched to desired value yet, try again.
        state = chand->state_.Load(MemoryOrder::RELAXED);
        break;
    }
  }
  GRPC_IDLE_FILTER_LOG("timer finishes");
  GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "max idle timer callback");
}
353 
IdleTransportOpCompleteCallback(void * arg,grpc_error *)354 void ChannelData::IdleTransportOpCompleteCallback(void* arg,
355                                                   grpc_error* /*error*/) {
356   ChannelData* chand = static_cast<ChannelData*>(arg);
357   GRPC_CHANNEL_STACK_UNREF(chand->channel_stack_, "idle transport op");
358 }
359 
StartIdleTimer()360 void ChannelData::StartIdleTimer() {
361   GRPC_IDLE_FILTER_LOG("timer has started");
362   // Hold a ref to the channel stack for the timer callback.
363   GRPC_CHANNEL_STACK_REF(channel_stack_, "max idle timer callback");
364   grpc_timer_init(&idle_timer_, last_idle_time_ + client_idle_timeout_,
365                   &idle_timer_callback_);
366 }
367 
EnterIdle()368 void ChannelData::EnterIdle() {
369   GRPC_IDLE_FILTER_LOG("the channel will enter IDLE");
370   // Hold a ref to the channel stack for the transport op.
371   GRPC_CHANNEL_STACK_REF(channel_stack_, "idle transport op");
372   // Initialize the transport op.
373   idle_transport_op_ = {};
374   idle_transport_op_.disconnect_with_error = grpc_error_set_int(
375       GRPC_ERROR_CREATE_FROM_STATIC_STRING("enter idle"),
376       GRPC_ERROR_INT_CHANNEL_CONNECTIVITY_STATE, GRPC_CHANNEL_IDLE);
377   idle_transport_op_.on_consumed = &idle_transport_op_complete_callback_;
378   // Pass the transport op down to the channel stack.
379   grpc_channel_next_op(elem_, &idle_transport_op_);
380 }
381 
382 class CallData {
383  public:
384   static grpc_error* Init(grpc_call_element* elem,
385                           const grpc_call_element_args* args);
386   static void Destroy(grpc_call_element* elem,
387                       const grpc_call_final_info* final_info,
388                       grpc_closure* then_schedule_closure);
389 };
390 
Init(grpc_call_element * elem,const grpc_call_element_args *)391 grpc_error* CallData::Init(grpc_call_element* elem,
392                            const grpc_call_element_args* /*args*/) {
393   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
394   chand->IncreaseCallCount();
395   return GRPC_ERROR_NONE;
396 }
397 
Destroy(grpc_call_element * elem,const grpc_call_final_info *,grpc_closure *)398 void CallData::Destroy(grpc_call_element* elem,
399                        const grpc_call_final_info* /*final_info*/,
400                        grpc_closure* /*ignored*/) {
401   ChannelData* chand = static_cast<ChannelData*>(elem->channel_data);
402   chand->DecreaseCallCount();
403 }
404 
// Filter vtable for the client idle filter, named "client_idle". Per-call
// stream ops, pollset handling and channel info are delegated to the generic
// grpc_call_next_op / grpc_call_stack_ignore_set_pollset_or_pollset_set /
// grpc_channel_next_get_info helpers. The filter's own behavior lives in
// ChannelData::StartTransportOp and in the CallData / ChannelData
// init/destroy hooks.
const grpc_channel_filter grpc_client_idle_filter = {
    grpc_call_next_op,
    ChannelData::StartTransportOp,
    sizeof(CallData),
    CallData::Init,
    grpc_call_stack_ignore_set_pollset_or_pollset_set,
    CallData::Destroy,
    sizeof(ChannelData),
    ChannelData::Init,
    ChannelData::Destroy,
    grpc_channel_next_get_info,
    "client_idle"};
417 
MaybeAddClientIdleFilter(grpc_channel_stack_builder * builder,void *)418 static bool MaybeAddClientIdleFilter(grpc_channel_stack_builder* builder,
419                                      void* /*arg*/) {
420   const grpc_channel_args* channel_args =
421       grpc_channel_stack_builder_get_channel_arguments(builder);
422   if (!grpc_channel_args_want_minimal_stack(channel_args) &&
423       GetClientIdleTimeout(channel_args) != INT_MAX) {
424     return grpc_channel_stack_builder_prepend_filter(
425         builder, &grpc_client_idle_filter, nullptr, nullptr);
426   } else {
427     return true;
428   }
429 }
430 
431 }  // namespace
432 }  // namespace grpc_core
433 
grpc_client_idle_filter_init(void)434 void grpc_client_idle_filter_init(void) {
435   grpc_channel_init_register_stage(
436       GRPC_CLIENT_CHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
437       grpc_core::MaybeAddClientIdleFilter, nullptr);
438 }
439 
// Counterpart of grpc_client_idle_filter_init(). Intentionally a no-op.
void grpc_client_idle_filter_shutdown(void) {
  // Nothing to tear down.
}
441