/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/ext/filters/client_channel/health/health_check_client.h"

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "upb/upb.hpp"

#include "src/core/lib/debug/trace.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/slice/slice_internal.h"
#include "src/core/lib/transport/error_utils.h"
#include "src/core/lib/transport/status_metadata.h"
#include "src/proto/grpc/health/v1/health.upb.h"

#define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
#define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
#define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
#define HEALTH_CHECK_RECONNECT_JITTER 0.2

namespace grpc_core {

TraceFlag grpc_health_check_client_trace(false, "health_check_client");

//
// HealthCheckClient
//

HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    : InternallyRefCounted<HealthCheckClient>(
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}

HealthCheckClient::~HealthCheckClient() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
  }
}

void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
                                        const char* reason) {
  MutexLock lock(&mu_);
  SetHealthStatusLocked(state, reason);
}

void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
                                              const char* reason) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
            ConnectivityStateName(state), reason);
  }
  if (watcher_ != nullptr) {
    watcher_->Notify(state,
                     state == GRPC_CHANNEL_TRANSIENT_FAILURE
                         ? absl::Status(absl::StatusCode::kUnavailable, reason)
                         : absl::Status());
  }
}

void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    shutting_down_ = true;
    watcher_.reset();
    call_state_.reset();
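    // If the retry timer is pending, cancelling it causes OnRetryTimer() to
    // run with a non-OK error, so it will drop its ref without restarting
    // the call.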
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  Unref(DEBUG_LOCATION, "orphan");
}

void HealthCheckClient::StartCall() {
  MutexLock lock(&mu_);
  StartCallLocked();
}

void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(call_state_ == nullptr);
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}

void HealthCheckClient::StartRetryTimerLocked() {
  SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                        "health check call failed; will retry after backoff");
  grpc_millis next_try = retry_backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO,
              "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
              timeout);
    } else {
      gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
              this);
    }
  }
  // Ref for callback, tracked manually.
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}

void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
  HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
        gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
                self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}

//
// protobuf helpers
//

namespace {

void EncodeRequest(const std::string& service_name,
                   ManualConstructor<SliceBufferByteStream>* send_message) {
  upb::Arena arena;
  grpc_health_v1_HealthCheckRequest* request_struct =
      grpc_health_v1_HealthCheckRequest_new(arena.ptr());
  grpc_health_v1_HealthCheckRequest_set_service(
      request_struct,
      upb_strview_make(service_name.data(), service_name.size()));
  size_t buf_length;
  char* buf = grpc_health_v1_HealthCheckRequest_serialize(
      request_struct, arena.ptr(), &buf_length);
  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
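  // SliceBufferByteStream takes ownership of the slices in slice_buffer, so
  // the local buffer can be destroyed immediately after Init().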
  send_message->Init(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
}

// Returns true if healthy.
// If there was an error parsing the response, sets *error and returns false.
bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
  // If message is empty, assume unhealthy.
  if (slice_buffer->length == 0) {
    *error =
        GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
    return false;
  }
  // Concatenate the slices to form a single string.
  std::unique_ptr<uint8_t, void (*)(void*)> recv_message_deleter(nullptr,
                                                                 gpr_free);
  uint8_t* recv_message;
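  // A single slice can be parsed in place; multiple slices are first
  // flattened into a temporary buffer.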
  if (slice_buffer->count == 1) {
    recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
  } else {
    recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
    recv_message_deleter.reset(recv_message);
    size_t offset = 0;
    for (size_t i = 0; i < slice_buffer->count; ++i) {
      memcpy(recv_message + offset,
             GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
             GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
      offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
    }
  }
  // Deserialize message.
  upb::Arena arena;
  grpc_health_v1_HealthCheckResponse* response_struct =
      grpc_health_v1_HealthCheckResponse_parse(
          reinterpret_cast<char*>(recv_message), slice_buffer->length,
          arena.ptr());
  if (response_struct == nullptr) {
    // Can't parse message; assume unhealthy.
    *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
        "cannot parse health check response");
    return false;
  }
  int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
  return status == grpc_health_v1_HealthCheckResponse_SERVING;
}

}  // namespace

//
// HealthCheckClient::CallState
//

HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_),
      send_initial_metadata_(arena_),
      send_trailing_metadata_(arena_),
      recv_initial_metadata_(arena_),
      recv_trailing_metadata_(arena_) {}

HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure.  This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
  arena_->Destroy();
}

void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}

void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error_handle error = GRPC_ERROR_NONE;
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this,
            grpc_error_std_string(error).c_str());
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op.
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref.  When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}

void HealthCheckClient::CallState::StartBatchInCallCombiner(
    void* arg, grpc_error_handle /*error*/) {
  grpc_transport_stream_op_batch* batch =
      static_cast<grpc_transport_stream_op_batch*>(arg);
  SubchannelCall* call =
      static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
  call->StartTransportStreamOpBatch(batch);
}

void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}

void HealthCheckClient::CallState::AfterCallStackDestruction(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  delete self;
}

void HealthCheckClient::CallState::OnCancelComplete(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
  self->call_->Unref(DEBUG_LOCATION, "cancel");
}

void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}

void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
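    // Ref for the cancel batch; released in OnCancelComplete().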
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}

void HealthCheckClient::CallState::OnComplete(void* arg,
                                              grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
  self->send_initial_metadata_.Clear();
  self->send_trailing_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "on_complete");
}

void HealthCheckClient::CallState::RecvInitialMetadataReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
  self->recv_initial_metadata_.Clear();
  self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
}

void HealthCheckClient::CallState::DoneReadingRecvMessage(
    grpc_error_handle error) {
  recv_message_.reset();
  if (error != GRPC_ERROR_NONE) {
    GRPC_ERROR_UNREF(error);
    Cancel();
    grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
    call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
  const grpc_connectivity_state state =
      healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
  health_check_client_->SetHealthStatus(
      state, error == GRPC_ERROR_NONE && !healthy
                 ? "backend unhealthy"
                 : grpc_error_std_string(error).c_str());
  GRPC_ERROR_UNREF(error);
  seen_response_.store(true, std::memory_order_release);
  grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
  // Start another recv_message batch.
  // This re-uses the ref we're holding.
  // Note: Can't just reuse batch_ here, since we don't know that all
  // callbacks from the original batch have completed yet.
  recv_message_batch_.payload = &payload_;
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  recv_message_batch_.recv_message = true;
  StartBatch(&recv_message_batch_);
}

grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
  grpc_slice slice;
  grpc_error_handle error = recv_message_->Pull(&slice);
  if (error == GRPC_ERROR_NONE) {
    grpc_slice_buffer_add(&recv_message_buffer_, slice);
  }
  return error;
}

void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
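  // Next() returns true if a slice is already available; otherwise,
  // OnByteStreamNext() will be invoked once more data arrives.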
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error_handle error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}

void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    self->ContinueReadingRecvMessage();
  }
}

void HealthCheckClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
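  // A null recv_message_ means the call ended without a message (e.g., it
  // failed or was cancelled), so just release our ref.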
  if (self->recv_message_ == nullptr) {
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}

void HealthCheckClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  if (error != GRPC_ERROR_NONE) {
    grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  } else if (self->recv_trailing_metadata_.legacy_index()->named.grpc_status !=
             nullptr) {
    status = grpc_get_status_code_from_metadata(
        self->recv_trailing_metadata_.legacy_index()->named.grpc_status->md);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthCheckClient %p CallState %p: health watch failed with "
            "status %d",
            self->health_check_client_.get(), self, status);
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // For status UNIMPLEMENTED, give up and assume always healthy.
  bool retry = true;
  if (status == GRPC_STATUS_UNIMPLEMENTED) {
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    if (self->health_check_client_->channelz_node_ != nullptr) {
      self->health_check_client_->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
                                                kErrorMessage);
    retry = false;
  }
  MutexLock lock(&self->health_check_client_->mu_);
  self->CallEndedLocked(retry);
}

void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}

}  // namespace grpc_core