1 /* 2 * 3 * Copyright 2018 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H 20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H 21 22 #include <grpc/support/port_platform.h> 23 24 #include <atomic> 25 26 #include <grpc/grpc.h> 27 #include <grpc/support/sync.h> 28 29 #include "src/core/ext/filters/client_channel/client_channel_channelz.h" 30 #include "src/core/ext/filters/client_channel/subchannel.h" 31 #include "src/core/lib/backoff/backoff.h" 32 #include "src/core/lib/gprpp/arena.h" 33 #include "src/core/lib/gprpp/orphanable.h" 34 #include "src/core/lib/gprpp/ref_counted_ptr.h" 35 #include "src/core/lib/gprpp/sync.h" 36 #include "src/core/lib/iomgr/call_combiner.h" 37 #include "src/core/lib/iomgr/closure.h" 38 #include "src/core/lib/iomgr/polling_entity.h" 39 #include "src/core/lib/iomgr/timer.h" 40 #include "src/core/lib/transport/byte_stream.h" 41 #include "src/core/lib/transport/metadata_batch.h" 42 #include "src/core/lib/transport/transport.h" 43 44 namespace grpc_core { 45 46 class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> { 47 public: 48 HealthCheckClient(std::string service_name, 49 RefCountedPtr<ConnectedSubchannel> connected_subchannel, 50 grpc_pollset_set* interested_parties, 51 RefCountedPtr<channelz::SubchannelNode> channelz_node, 52 RefCountedPtr<ConnectivityStateWatcherInterface> watcher); 53 54 ~HealthCheckClient() override; 55 56 void Orphan() override; 57 58 private: 59 // Contains a call to the backend and all the data related to the call. 60 class CallState : public Orphanable { 61 public: 62 CallState(RefCountedPtr<HealthCheckClient> health_check_client, 63 grpc_pollset_set* interested_parties_); 64 ~CallState() override; 65 66 void Orphan() override; 67 68 void StartCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthCheckClient::mu_); 69 70 private: 71 void Cancel(); 72 73 void StartBatch(grpc_transport_stream_op_batch* batch); 74 static void StartBatchInCallCombiner(void* arg, grpc_error_handle error); 75 76 void CallEndedLocked(bool retry) 77 ABSL_EXCLUSIVE_LOCKS_REQUIRED(health_check_client_->mu_); 78 79 static void OnComplete(void* arg, grpc_error_handle error); 80 static void RecvInitialMetadataReady(void* arg, grpc_error_handle error); 81 static void RecvMessageReady(void* arg, grpc_error_handle error); 82 static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error); 83 static void StartCancel(void* arg, grpc_error_handle error); 84 static void OnCancelComplete(void* arg, grpc_error_handle error); 85 86 static void OnByteStreamNext(void* arg, grpc_error_handle error); 87 void ContinueReadingRecvMessage(); 88 grpc_error_handle PullSliceFromRecvMessage(); 89 void DoneReadingRecvMessage(grpc_error_handle error); 90 91 static void AfterCallStackDestruction(void* arg, grpc_error_handle error); 92 93 RefCountedPtr<HealthCheckClient> health_check_client_; 94 grpc_polling_entity pollent_; 95 96 Arena* arena_; 97 grpc_core::CallCombiner call_combiner_; 98 grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {}; 99 100 // The streaming call to the backend. Always non-null. 101 // Refs are tracked manually; when the last ref is released, the 102 // CallState object will be automatically destroyed. 103 SubchannelCall* call_; 104 105 grpc_transport_stream_op_batch_payload payload_; 106 grpc_transport_stream_op_batch batch_; 107 grpc_transport_stream_op_batch recv_message_batch_; 108 grpc_transport_stream_op_batch recv_trailing_metadata_batch_; 109 110 grpc_closure on_complete_; 111 112 // send_initial_metadata 113 grpc_metadata_batch send_initial_metadata_; 114 grpc_linked_mdelem path_metadata_storage_; 115 116 // send_message 117 ManualConstructor<SliceBufferByteStream> send_message_; 118 119 // send_trailing_metadata 120 grpc_metadata_batch send_trailing_metadata_; 121 122 // recv_initial_metadata 123 grpc_metadata_batch recv_initial_metadata_; 124 grpc_closure recv_initial_metadata_ready_; 125 126 // recv_message 127 OrphanablePtr<ByteStream> recv_message_; 128 grpc_closure recv_message_ready_; 129 grpc_slice_buffer recv_message_buffer_; 130 std::atomic<bool> seen_response_{false}; 131 132 // True if the cancel_stream batch has been started. 133 std::atomic<bool> cancelled_{false}; 134 135 // recv_trailing_metadata 136 grpc_metadata_batch recv_trailing_metadata_; 137 grpc_transport_stream_stats collect_stats_; 138 grpc_closure recv_trailing_metadata_ready_; 139 140 // Closure for call stack destruction. 141 grpc_closure after_call_stack_destruction_; 142 }; 143 144 void StartCall(); 145 void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 146 147 void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 148 static void OnRetryTimer(void* arg, grpc_error_handle error); 149 150 void SetHealthStatus(grpc_connectivity_state state, const char* reason); 151 void SetHealthStatusLocked(grpc_connectivity_state state, const char* reason) 152 ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); 153 154 std::string service_name_; 155 RefCountedPtr<ConnectedSubchannel> connected_subchannel_; 156 grpc_pollset_set* interested_parties_; // Do not own. 157 RefCountedPtr<channelz::SubchannelNode> channelz_node_; 158 159 Mutex mu_; 160 RefCountedPtr<ConnectivityStateWatcherInterface> watcher_ 161 ABSL_GUARDED_BY(mu_); 162 bool shutting_down_ ABSL_GUARDED_BY(mu_) = false; 163 164 // The data associated with the current health check call. It holds a ref 165 // to this HealthCheckClient object. 166 OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_); 167 168 // Call retry state. 169 BackOff retry_backoff_ ABSL_GUARDED_BY(mu_); 170 grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_); 171 grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_); 172 bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false; 173 }; 174 175 } // namespace grpc_core 176 177 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */ 178