1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/health/health_check_client.h"
22
23 #include <stdint.h>
24 #include <stdio.h>
25
26 #include "upb/upb.hpp"
27
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/gprpp/sync.h"
30 #include "src/core/lib/slice/slice_internal.h"
31 #include "src/core/lib/transport/error_utils.h"
32 #include "src/core/lib/transport/status_metadata.h"
33 #include "src/proto/grpc/health/v1/health.upb.h"
34
35 #define HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS 1
36 #define HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER 1.6
37 #define HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS 120
38 #define HEALTH_CHECK_RECONNECT_JITTER 0.2
39
40 namespace grpc_core {
41
// Trace flag for verbose logging in this file; enabled at runtime via the
// "health_check_client" tracer.
TraceFlag grpc_health_check_client_trace(false, "health_check_client");
43
44 //
45 // HealthCheckClient
46 //
47
// Starts watching the health of `service_name` on `connected_subchannel`.
// `watcher` is notified of connectivity-state changes derived from the
// health-check responses.  The first health-check call is started
// immediately from the constructor.
HealthCheckClient::HealthCheckClient(
    std::string service_name,
    RefCountedPtr<ConnectedSubchannel> connected_subchannel,
    grpc_pollset_set* interested_parties,
    RefCountedPtr<channelz::SubchannelNode> channelz_node,
    RefCountedPtr<ConnectivityStateWatcherInterface> watcher)
    : InternallyRefCounted<HealthCheckClient>(
          // Enable refcount tracing only when the trace flag is on.
          GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)
              ? "HealthCheckClient"
              : nullptr),
      service_name_(std::move(service_name)),
      connected_subchannel_(std::move(connected_subchannel)),
      interested_parties_(interested_parties),
      channelz_node_(std::move(channelz_node)),
      watcher_(std::move(watcher)),
      // Backoff controlling how long we wait before retrying a failed
      // health-check call (see the HEALTH_CHECK_* constants above).
      retry_backoff_(
          BackOff::Options()
              .set_initial_backoff(
                  HEALTH_CHECK_INITIAL_CONNECT_BACKOFF_SECONDS * 1000)
              .set_multiplier(HEALTH_CHECK_RECONNECT_BACKOFF_MULTIPLIER)
              .set_jitter(HEALTH_CHECK_RECONNECT_JITTER)
              .set_max_backoff(HEALTH_CHECK_RECONNECT_MAX_BACKOFF_SECONDS *
                               1000)) {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "created HealthCheckClient %p", this);
  }
  // Closure reused every time the retry timer fires.
  GRPC_CLOSURE_INIT(&retry_timer_callback_, OnRetryTimer, this,
                    grpc_schedule_on_exec_ctx);
  StartCall();
}
78
~HealthCheckClient()79 HealthCheckClient::~HealthCheckClient() {
80 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
81 gpr_log(GPR_INFO, "destroying HealthCheckClient %p", this);
82 }
83 }
84
SetHealthStatus(grpc_connectivity_state state,const char * reason)85 void HealthCheckClient::SetHealthStatus(grpc_connectivity_state state,
86 const char* reason) {
87 MutexLock lock(&mu_);
88 SetHealthStatusLocked(state, reason);
89 }
90
SetHealthStatusLocked(grpc_connectivity_state state,const char * reason)91 void HealthCheckClient::SetHealthStatusLocked(grpc_connectivity_state state,
92 const char* reason) {
93 if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
94 gpr_log(GPR_INFO, "HealthCheckClient %p: setting state=%s reason=%s", this,
95 ConnectivityStateName(state), reason);
96 }
97 if (watcher_ != nullptr) {
98 watcher_->Notify(state,
99 state == GRPC_CHANNEL_TRANSIENT_FAILURE
100 ? absl::Status(absl::StatusCode::kUnavailable, reason)
101 : absl::Status());
102 }
103 }
104
// Shuts down the health-check client.  After this returns, no further
// notifications are delivered to the watcher.
void HealthCheckClient::Orphan() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: shutting down", this);
  }
  {
    MutexLock lock(&mu_);
    // Set the shutdown flag first so concurrent callbacks (e.g.
    // OnRetryTimer) become no-ops.
    shutting_down_ = true;
    // Drop the watcher so no further state changes are reported.
    watcher_.reset();
    // Orphan any in-flight call.
    call_state_.reset();
    // Cancel a pending retry timer.  OnRetryTimer still runs (with a
    // cancellation error) and releases the ref it holds.
    if (retry_timer_callback_pending_) {
      grpc_timer_cancel(&retry_timer_);
    }
  }
  // Drop the ref held by the owner of this object.
  Unref(DEBUG_LOCATION, "orphan");
}
120
StartCall()121 void HealthCheckClient::StartCall() {
122 MutexLock lock(&mu_);
123 StartCallLocked();
124 }
125
// Starts a new health-check call.  Must be called while holding mu_, and
// only when no call is currently in flight.  No-op after shutdown.
void HealthCheckClient::StartCallLocked() {
  if (shutting_down_) return;
  GPR_ASSERT(call_state_ == nullptr);
  // Until the first response arrives, report CONNECTING.
  SetHealthStatusLocked(GRPC_CHANNEL_CONNECTING, "starting health watch");
  // The CallState holds a ref to this object for the duration of the call.
  call_state_ = MakeOrphanable<CallState>(Ref(), interested_parties_);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: created CallState %p", this,
            call_state_.get());
  }
  call_state_->StartCall();
}
137
// Schedules a retry of the health-check call after backoff.  Must be called
// while holding mu_.
void HealthCheckClient::StartRetryTimerLocked() {
  SetHealthStatusLocked(GRPC_CHANNEL_TRANSIENT_FAILURE,
                        "health check call failed; will retry after backoff");
  grpc_millis next_try = retry_backoff_.NextAttemptTime();
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: health check call lost...", this);
    grpc_millis timeout = next_try - ExecCtx::Get()->Now();
    if (timeout > 0) {
      gpr_log(GPR_INFO,
              "HealthCheckClient %p: ... will retry in %" PRId64 "ms.", this,
              timeout);
    } else {
      gpr_log(GPR_INFO, "HealthCheckClient %p: ... retrying immediately.",
              this);
    }
  }
  // Ref for callback, tracked manually; released in OnRetryTimer().
  Ref(DEBUG_LOCATION, "health_retry_timer").release();
  // Set the pending flag before arming the timer so that Orphan() knows
  // whether there is a timer to cancel.
  retry_timer_callback_pending_ = true;
  grpc_timer_init(&retry_timer_, next_try, &retry_timer_callback_);
}
159
// Timer callback.  Restarts the health-check call unless the timer was
// cancelled, we are shutting down, or a call was already started.  Always
// releases the ref taken in StartRetryTimerLocked().
void HealthCheckClient::OnRetryTimer(void* arg, grpc_error_handle error) {
  HealthCheckClient* self = static_cast<HealthCheckClient*>(arg);
  {
    MutexLock lock(&self->mu_);
    self->retry_timer_callback_pending_ = false;
    // error != GRPC_ERROR_NONE means the timer was cancelled.
    if (!self->shutting_down_ && error == GRPC_ERROR_NONE &&
        self->call_state_ == nullptr) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
        gpr_log(GPR_INFO, "HealthCheckClient %p: restarting health check call",
                self);
      }
      self->StartCallLocked();
    }
  }
  self->Unref(DEBUG_LOCATION, "health_retry_timer");
}
176
177 //
178 // protobuf helpers
179 //
180
181 namespace {
182
// Serializes a grpc.health.v1.HealthCheckRequest proto for `service_name`
// into `send_message`, which is initialized to own the resulting payload.
void EncodeRequest(const std::string& service_name,
                   ManualConstructor<SliceBufferByteStream>* send_message) {
  upb::Arena arena;
  grpc_health_v1_HealthCheckRequest* request_struct =
      grpc_health_v1_HealthCheckRequest_new(arena.ptr());
  grpc_health_v1_HealthCheckRequest_set_service(
      request_struct,
      upb_strview_make(service_name.data(), service_name.size()));
  size_t buf_length;
  // `buf` is arena-owned; copy it into a grpc_slice before the arena dies.
  char* buf = grpc_health_v1_HealthCheckRequest_serialize(
      request_struct, arena.ptr(), &buf_length);
  grpc_slice request_slice = GRPC_SLICE_MALLOC(buf_length);
  memcpy(GRPC_SLICE_START_PTR(request_slice), buf, buf_length);
  grpc_slice_buffer slice_buffer;
  grpc_slice_buffer_init(&slice_buffer);
  grpc_slice_buffer_add(&slice_buffer, request_slice);
  // The byte stream takes ownership of the slice buffer's contents, so it
  // is safe to destroy the (now-empty) buffer afterwards.
  send_message->Init(&slice_buffer, 0);
  grpc_slice_buffer_destroy_internal(&slice_buffer);
}
202
203 // Returns true if healthy.
204 // If there was an error parsing the response, sets *error and returns false.
DecodeResponse(grpc_slice_buffer * slice_buffer,grpc_error_handle * error)205 bool DecodeResponse(grpc_slice_buffer* slice_buffer, grpc_error_handle* error) {
206 // If message is empty, assume unhealthy.
207 if (slice_buffer->length == 0) {
208 *error =
209 GRPC_ERROR_CREATE_FROM_STATIC_STRING("health check response was empty");
210 return false;
211 }
212 // Concatenate the slices to form a single string.
213 std::unique_ptr<uint8_t> recv_message_deleter;
214 uint8_t* recv_message;
215 if (slice_buffer->count == 1) {
216 recv_message = GRPC_SLICE_START_PTR(slice_buffer->slices[0]);
217 } else {
218 recv_message = static_cast<uint8_t*>(gpr_malloc(slice_buffer->length));
219 recv_message_deleter.reset(recv_message);
220 size_t offset = 0;
221 for (size_t i = 0; i < slice_buffer->count; ++i) {
222 memcpy(recv_message + offset,
223 GRPC_SLICE_START_PTR(slice_buffer->slices[i]),
224 GRPC_SLICE_LENGTH(slice_buffer->slices[i]));
225 offset += GRPC_SLICE_LENGTH(slice_buffer->slices[i]);
226 }
227 }
228 // Deserialize message.
229 upb::Arena arena;
230 grpc_health_v1_HealthCheckResponse* response_struct =
231 grpc_health_v1_HealthCheckResponse_parse(
232 reinterpret_cast<char*>(recv_message), slice_buffer->length,
233 arena.ptr());
234 if (response_struct == nullptr) {
235 // Can't parse message; assume unhealthy.
236 *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(
237 "cannot parse health check response");
238 return false;
239 }
240 int32_t status = grpc_health_v1_HealthCheckResponse_status(response_struct);
241 return status == grpc_health_v1_HealthCheckResponse_SERVING;
242 }
243
244 } // namespace
245
246 //
247 // HealthCheckClient::CallState
248 //
249
// Sets up per-call state: polling entity, call arena (sized from the
// subchannel's estimate), and the metadata batches used by the RPC.
HealthCheckClient::CallState::CallState(
    RefCountedPtr<HealthCheckClient> health_check_client,
    grpc_pollset_set* interested_parties)
    : health_check_client_(std::move(health_check_client)),
      pollent_(grpc_polling_entity_create_from_pollset_set(interested_parties)),
      arena_(Arena::Create(health_check_client_->connected_subchannel_
                               ->GetInitialCallSizeEstimate())),
      payload_(context_),
      send_initial_metadata_(arena_),
      send_trailing_metadata_(arena_),
      recv_initial_metadata_(arena_),
      recv_trailing_metadata_(arena_) {}
262
// Destroys per-call state.  Only reached after the call stack itself has
// been destroyed (see AfterCallStackDestruction).
HealthCheckClient::CallState::~CallState() {
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO, "HealthCheckClient %p: destroying CallState %p",
            health_check_client_.get(), this);
  }
  // Run the destroy hooks for any call context elements that were set.
  for (size_t i = 0; i < GRPC_CONTEXT_COUNT; i++) {
    if (context_[i].destroy != nullptr) {
      context_[i].destroy(context_[i].value);
    }
  }
  // Unset the call combiner cancellation closure. This has the
  // effect of scheduling the previously set cancellation closure, if
  // any, so that it can release any internal references it may be
  // holding to the call stack.
  call_combiner_.SetNotifyOnCancel(nullptr);
  arena_->Destroy();
}
280
// Shuts down the call: first cancel anything queued in the call combiner,
// then send a cancel_stream batch down the call stack (via Cancel()).
void HealthCheckClient::CallState::Orphan() {
  call_combiner_.Cancel(GRPC_ERROR_CANCELLED);
  Cancel();
}
285
// Creates the subchannel call and starts the batches for the
// /grpc.health.v1.Health/Watch streaming RPC: send_initial_metadata,
// send_message (the serialized request), send_trailing_metadata
// (half-close), recv_initial_metadata, and recv_message in one batch,
// plus recv_trailing_metadata in a separate batch.  Invoked from
// HealthCheckClient::StartCallLocked(), i.e. with the client's mu_ held.
void HealthCheckClient::CallState::StartCall() {
  SubchannelCall::Args args = {
      health_check_client_->connected_subchannel_,
      &pollent_,
      GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH,
      gpr_get_cycle_counter(),  // start_time
      GRPC_MILLIS_INF_FUTURE,   // deadline
      arena_,
      context_,
      &call_combiner_,
  };
  grpc_error_handle error = GRPC_ERROR_NONE;
  // Raw pointer; the initial ref is released in CallEndedLocked().
  call_ = SubchannelCall::Create(std::move(args), &error).release();
  // Register after-destruction callback.  This is what eventually
  // deletes this CallState (see AfterCallStackDestruction).
  GRPC_CLOSURE_INIT(&after_call_stack_destruction_, AfterCallStackDestruction,
                    this, grpc_schedule_on_exec_ctx);
  call_->SetAfterCallStackDestroy(&after_call_stack_destruction_);
  // Check if creation failed.
  if (error != GRPC_ERROR_NONE) {
    gpr_log(GPR_ERROR,
            "HealthCheckClient %p CallState %p: error creating health "
            "checking call on subchannel (%s); will retry",
            health_check_client_.get(), this,
            grpc_error_std_string(error).c_str());
    GRPC_ERROR_UNREF(error);
    CallEndedLocked(/*retry=*/true);
    return;
  }
  // Initialize payload and batch.
  payload_.context = context_;
  batch_.payload = &payload_;
  // on_complete callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "on_complete").release();
  batch_.on_complete = GRPC_CLOSURE_INIT(&on_complete_, OnComplete, this,
                                         grpc_schedule_on_exec_ctx);
  // Add send_initial_metadata op.
  error = grpc_metadata_batch_add_head(
      &send_initial_metadata_, &path_metadata_storage_,
      grpc_mdelem_from_slices(
          GRPC_MDSTR_PATH,
          GRPC_MDSTR_SLASH_GRPC_DOT_HEALTH_DOT_V1_DOT_HEALTH_SLASH_WATCH),
      GRPC_BATCH_PATH);
  GPR_ASSERT(error == GRPC_ERROR_NONE);
  payload_.send_initial_metadata.send_initial_metadata =
      &send_initial_metadata_;
  payload_.send_initial_metadata.send_initial_metadata_flags = 0;
  payload_.send_initial_metadata.peer_string = nullptr;
  batch_.send_initial_metadata = true;
  // Add send_message op.
  EncodeRequest(health_check_client_->service_name_, &send_message_);
  payload_.send_message.send_message.reset(send_message_.get());
  batch_.send_message = true;
  // Add send_trailing_metadata op (half-close from the client side).
  payload_.send_trailing_metadata.send_trailing_metadata =
      &send_trailing_metadata_;
  batch_.send_trailing_metadata = true;
  // Add recv_initial_metadata op.
  payload_.recv_initial_metadata.recv_initial_metadata =
      &recv_initial_metadata_;
  payload_.recv_initial_metadata.recv_flags = nullptr;
  payload_.recv_initial_metadata.trailing_metadata_available = nullptr;
  payload_.recv_initial_metadata.peer_string = nullptr;
  // recv_initial_metadata_ready callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_initial_metadata_ready").release();
  payload_.recv_initial_metadata.recv_initial_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_initial_metadata_ready_, RecvInitialMetadataReady,
                        this, grpc_schedule_on_exec_ctx);
  batch_.recv_initial_metadata = true;
  // Add recv_message op.
  payload_.recv_message.recv_message = &recv_message_;
  payload_.recv_message.call_failed_before_recv_message = nullptr;
  // recv_message callback takes ref, handled manually.
  call_->Ref(DEBUG_LOCATION, "recv_message_ready").release();
  payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
      &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
  batch_.recv_message = true;
  // Start batch.
  StartBatch(&batch_);
  // Initialize recv_trailing_metadata batch.
  recv_trailing_metadata_batch_.payload = &payload_;
  // Add recv_trailing_metadata op.
  payload_.recv_trailing_metadata.recv_trailing_metadata =
      &recv_trailing_metadata_;
  payload_.recv_trailing_metadata.collect_stats = &collect_stats_;
  // This callback signals the end of the call, so it relies on the
  // initial ref instead of taking a new ref. When it's invoked, the
  // initial ref is released.
  payload_.recv_trailing_metadata.recv_trailing_metadata_ready =
      GRPC_CLOSURE_INIT(&recv_trailing_metadata_ready_,
                        RecvTrailingMetadataReady, this,
                        grpc_schedule_on_exec_ctx);
  recv_trailing_metadata_batch_.recv_trailing_metadata = true;
  // Start recv_trailing_metadata batch.
  StartBatch(&recv_trailing_metadata_batch_);
}
381
StartBatchInCallCombiner(void * arg,grpc_error_handle)382 void HealthCheckClient::CallState::StartBatchInCallCombiner(
383 void* arg, grpc_error_handle /*error*/) {
384 grpc_transport_stream_op_batch* batch =
385 static_cast<grpc_transport_stream_op_batch*>(arg);
386 SubchannelCall* call =
387 static_cast<SubchannelCall*>(batch->handler_private.extra_arg);
388 call->StartTransportStreamOpBatch(batch);
389 }
390
// Enters the call combiner to start `batch` on the subchannel call.  The
// call pointer is smuggled through handler_private.extra_arg so that
// StartBatchInCallCombiner() can retrieve it.
void HealthCheckClient::CallState::StartBatch(
    grpc_transport_stream_op_batch* batch) {
  batch->handler_private.extra_arg = call_;
  GRPC_CLOSURE_INIT(&batch->handler_private.closure, StartBatchInCallCombiner,
                    batch, grpc_schedule_on_exec_ctx);
  GRPC_CALL_COMBINER_START(&call_combiner_, &batch->handler_private.closure,
                           GRPC_ERROR_NONE, "start_subchannel_batch");
}
399
AfterCallStackDestruction(void * arg,grpc_error_handle)400 void HealthCheckClient::CallState::AfterCallStackDestruction(
401 void* arg, grpc_error_handle /*error*/) {
402 HealthCheckClient::CallState* self =
403 static_cast<HealthCheckClient::CallState*>(arg);
404 delete self;
405 }
406
OnCancelComplete(void * arg,grpc_error_handle)407 void HealthCheckClient::CallState::OnCancelComplete(
408 void* arg, grpc_error_handle /*error*/) {
409 HealthCheckClient::CallState* self =
410 static_cast<HealthCheckClient::CallState*>(arg);
411 GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "health_cancel");
412 self->call_->Unref(DEBUG_LOCATION, "cancel");
413 }
414
// Runs inside the call combiner: sends a cancel_stream batch down the call
// stack.  OnCancelComplete() finishes the cleanup when the batch is done.
void HealthCheckClient::CallState::StartCancel(void* arg,
                                               grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  auto* batch = grpc_make_transport_stream_op(
      GRPC_CLOSURE_CREATE(OnCancelComplete, self, grpc_schedule_on_exec_ctx));
  batch->cancel_stream = true;
  batch->payload->cancel_stream.cancel_error = GRPC_ERROR_CANCELLED;
  self->call_->StartTransportStreamOpBatch(batch);
}
425
// Cancels the call at most once, even if invoked concurrently from
// multiple places (the CAS on cancelled_ guarantees single execution).
void HealthCheckClient::CallState::Cancel() {
  bool expected = false;
  if (cancelled_.compare_exchange_strong(expected, true,
                                         std::memory_order_acq_rel,
                                         std::memory_order_acquire)) {
    // Ref for the cancel path; released in OnCancelComplete().
    call_->Ref(DEBUG_LOCATION, "cancel").release();
    GRPC_CALL_COMBINER_START(
        &call_combiner_,
        GRPC_CLOSURE_CREATE(StartCancel, this, grpc_schedule_on_exec_ctx),
        GRPC_ERROR_NONE, "health_cancel");
  }
}
438
OnComplete(void * arg,grpc_error_handle)439 void HealthCheckClient::CallState::OnComplete(void* arg,
440 grpc_error_handle /*error*/) {
441 HealthCheckClient::CallState* self =
442 static_cast<HealthCheckClient::CallState*>(arg);
443 GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "on_complete");
444 self->send_initial_metadata_.Clear();
445 self->send_trailing_metadata_.Clear();
446 self->call_->Unref(DEBUG_LOCATION, "on_complete");
447 }
448
RecvInitialMetadataReady(void * arg,grpc_error_handle)449 void HealthCheckClient::CallState::RecvInitialMetadataReady(
450 void* arg, grpc_error_handle /*error*/) {
451 HealthCheckClient::CallState* self =
452 static_cast<HealthCheckClient::CallState*>(arg);
453 GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_initial_metadata_ready");
454 self->recv_initial_metadata_.Clear();
455 self->call_->Unref(DEBUG_LOCATION, "recv_initial_metadata_ready");
456 }
457
DoneReadingRecvMessage(grpc_error_handle error)458 void HealthCheckClient::CallState::DoneReadingRecvMessage(
459 grpc_error_handle error) {
460 recv_message_.reset();
461 if (error != GRPC_ERROR_NONE) {
462 GRPC_ERROR_UNREF(error);
463 Cancel();
464 grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
465 call_->Unref(DEBUG_LOCATION, "recv_message_ready");
466 return;
467 }
468 const bool healthy = DecodeResponse(&recv_message_buffer_, &error);
469 const grpc_connectivity_state state =
470 healthy ? GRPC_CHANNEL_READY : GRPC_CHANNEL_TRANSIENT_FAILURE;
471 health_check_client_->SetHealthStatus(
472 state, error == GRPC_ERROR_NONE && !healthy
473 ? "backend unhealthy"
474 : grpc_error_std_string(error).c_str());
475 seen_response_.store(true, std::memory_order_release);
476 grpc_slice_buffer_destroy_internal(&recv_message_buffer_);
477 // Start another recv_message batch.
478 // This re-uses the ref we're holding.
479 // Note: Can't just reuse batch_ here, since we don't know that all
480 // callbacks from the original batch have completed yet.
481 recv_message_batch_.payload = &payload_;
482 payload_.recv_message.recv_message = &recv_message_;
483 payload_.recv_message.call_failed_before_recv_message = nullptr;
484 payload_.recv_message.recv_message_ready = GRPC_CLOSURE_INIT(
485 &recv_message_ready_, RecvMessageReady, this, grpc_schedule_on_exec_ctx);
486 recv_message_batch_.recv_message = true;
487 StartBatch(&recv_message_batch_);
488 }
489
PullSliceFromRecvMessage()490 grpc_error_handle HealthCheckClient::CallState::PullSliceFromRecvMessage() {
491 grpc_slice slice;
492 grpc_error_handle error = recv_message_->Pull(&slice);
493 if (error == GRPC_ERROR_NONE) {
494 grpc_slice_buffer_add(&recv_message_buffer_, slice);
495 }
496 return error;
497 }
498
// Synchronously drains as much of the byte stream as is available.  If
// Next() returns false, the data isn't ready yet and OnByteStreamNext()
// (recv_message_ready_) will be invoked asynchronously to continue.
void HealthCheckClient::CallState::ContinueReadingRecvMessage() {
  while (recv_message_->Next(SIZE_MAX, &recv_message_ready_)) {
    grpc_error_handle error = PullSliceFromRecvMessage();
    if (error != GRPC_ERROR_NONE) {
      DoneReadingRecvMessage(error);
      return;
    }
    // Stop once we've accumulated the whole message.
    if (recv_message_buffer_.length == recv_message_->length()) {
      DoneReadingRecvMessage(GRPC_ERROR_NONE);
      break;
    }
  }
}
512
// Async continuation for ContinueReadingRecvMessage(): invoked when the
// byte stream has more data available (or has failed with `error`, which
// is borrowed here and must be reffed before passing ownership on).
void HealthCheckClient::CallState::OnByteStreamNext(void* arg,
                                                    grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(GRPC_ERROR_REF(error));
    return;
  }
  error = self->PullSliceFromRecvMessage();
  if (error != GRPC_ERROR_NONE) {
    self->DoneReadingRecvMessage(error);
    return;
  }
  if (self->recv_message_buffer_.length == self->recv_message_->length()) {
    // Whole message received.
    self->DoneReadingRecvMessage(GRPC_ERROR_NONE);
  } else {
    // More data still to come; keep draining.
    self->ContinueReadingRecvMessage();
  }
}
532
// Called when the recv_message op completes.  A null recv_message_ means
// the stream ended with no message; otherwise start draining the byte
// stream into recv_message_buffer_.
void HealthCheckClient::CallState::RecvMessageReady(
    void* arg, grpc_error_handle /*error*/) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_, "recv_message_ready");
  if (self->recv_message_ == nullptr) {
    // No message: release the ref taken for this callback.
    self->call_->Unref(DEBUG_LOCATION, "recv_message_ready");
    return;
  }
  grpc_slice_buffer_init(&self->recv_message_buffer_);
  // Repoint recv_message_ready_ at the byte-stream continuation.
  GRPC_CLOSURE_INIT(&self->recv_message_ready_, OnByteStreamNext, self,
                    grpc_schedule_on_exec_ctx);
  self->ContinueReadingRecvMessage();
  // Ref will continue to be held until we finish draining the byte stream.
}
548
// Called when the call's trailing metadata arrives, i.e. the call has
// ended.  Extracts the call status, handles UNIMPLEMENTED specially
// (disable health checking, assume healthy), and tells the
// HealthCheckClient that the call ended.
void HealthCheckClient::CallState::RecvTrailingMetadataReady(
    void* arg, grpc_error_handle error) {
  HealthCheckClient::CallState* self =
      static_cast<HealthCheckClient::CallState*>(arg);
  GRPC_CALL_COMBINER_STOP(&self->call_combiner_,
                          "recv_trailing_metadata_ready");
  // Get call status.
  grpc_status_code status = GRPC_STATUS_UNKNOWN;
  if (error != GRPC_ERROR_NONE) {
    // Transport-level failure: derive the status from the error.
    grpc_error_get_status(error, GRPC_MILLIS_INF_FUTURE, &status,
                          nullptr /* slice */, nullptr /* http_error */,
                          nullptr /* error_string */);
  } else if (self->recv_trailing_metadata_.legacy_index()->named.grpc_status !=
             nullptr) {
    // Normal completion: read grpc-status from the trailing metadata.
    status = grpc_get_status_code_from_metadata(
        self->recv_trailing_metadata_.legacy_index()->named.grpc_status->md);
  }
  if (GRPC_TRACE_FLAG_ENABLED(grpc_health_check_client_trace)) {
    gpr_log(GPR_INFO,
            "HealthCheckClient %p CallState %p: health watch failed with "
            "status %d",
            self->health_check_client_.get(), self, status);
  }
  // Clean up.
  self->recv_trailing_metadata_.Clear();
  // For status UNIMPLEMENTED, give up and assume always healthy.
  bool retry = true;
  if (status == GRPC_STATUS_UNIMPLEMENTED) {
    static const char kErrorMessage[] =
        "health checking Watch method returned UNIMPLEMENTED; "
        "disabling health checks but assuming server is healthy";
    gpr_log(GPR_ERROR, kErrorMessage);
    if (self->health_check_client_->channelz_node_ != nullptr) {
      self->health_check_client_->channelz_node_->AddTraceEvent(
          channelz::ChannelTrace::Error,
          grpc_slice_from_static_string(kErrorMessage));
    }
    self->health_check_client_->SetHealthStatus(GRPC_CHANNEL_READY,
                                                kErrorMessage);
    retry = false;
  }
  MutexLock lock(&self->health_check_client_->mu_);
  self->CallEndedLocked(retry);
}
593
// Handles the end of the call.  Must be called while holding the
// HealthCheckClient's mu_.  If `retry` is true, either restarts the call
// immediately (after a successful response) or schedules a backoff retry.
// Always releases the initial ref on the subchannel call.
void HealthCheckClient::CallState::CallEndedLocked(bool retry) {
  // If this CallState is still in use, this call ended because of a failure,
  // so we need to stop using it and optionally create a new one.
  // Otherwise, we have deliberately ended this call, and no further action
  // is required.
  if (this == health_check_client_->call_state_.get()) {
    health_check_client_->call_state_.reset();
    if (retry) {
      GPR_ASSERT(!health_check_client_->shutting_down_);
      if (seen_response_.load(std::memory_order_acquire)) {
        // If the call fails after we've gotten a successful response, reset
        // the backoff and restart the call immediately.
        health_check_client_->retry_backoff_.Reset();
        health_check_client_->StartCallLocked();
      } else {
        // If the call failed without receiving any messages, retry later.
        health_check_client_->StartRetryTimerLocked();
      }
    }
  }
  // When the last ref to the call stack goes away, the CallState object
  // will be automatically destroyed.
  call_->Unref(DEBUG_LOCATION, "call_ended");
}
618
619 } // namespace grpc_core
620