1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H
21 
22 #include <grpc/support/port_platform.h>
23 
24 #include <atomic>
25 
26 #include <grpc/grpc.h>
27 #include <grpc/support/sync.h>
28 
29 #include "src/core/ext/filters/client_channel/client_channel_channelz.h"
30 #include "src/core/ext/filters/client_channel/subchannel.h"
31 #include "src/core/lib/backoff/backoff.h"
32 #include "src/core/lib/gprpp/arena.h"
33 #include "src/core/lib/gprpp/orphanable.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/gprpp/sync.h"
36 #include "src/core/lib/iomgr/call_combiner.h"
37 #include "src/core/lib/iomgr/closure.h"
38 #include "src/core/lib/iomgr/polling_entity.h"
39 #include "src/core/lib/iomgr/timer.h"
40 #include "src/core/lib/transport/byte_stream.h"
41 #include "src/core/lib/transport/metadata_batch.h"
42 #include "src/core/lib/transport/transport.h"
43 
44 namespace grpc_core {
45 
// Health-check client bound to one ConnectedSubchannel and one service name.
//
// What the declarations below establish:
//  - Lifetime is managed through InternallyRefCounted; Orphan() is overridden.
//  - At most one CallState is held at a time through call_state_, and that
//    CallState holds a ref back to this object.
//  - Retry bookkeeping consists of a BackOff, a grpc_timer and a grpc_closure.
//  - watcher_, shutting_down_, call_state_ and all retry state are guarded by
//    mu_; methods with a "Locked" suffix require the caller to hold mu_.
//
// NOTE(review): the wire protocol spoken to the backend and the exact state
// transitions reported to the watcher live in the .cc file and cannot be
// confirmed from this header.
46 class HealthCheckClient : public InternallyRefCounted<HealthCheckClient> {
47  public:
  // NOTE(review): each argument presumably initializes the like-named member
  // declared below (service_name_, connected_subchannel_, ...) -- confirm in
  // the .cc. Per the member comment, interested_parties is not owned.
48   HealthCheckClient(std::string service_name,
49                     RefCountedPtr<ConnectedSubchannel> connected_subchannel,
50                     grpc_pollset_set* interested_parties,
51                     RefCountedPtr<channelz::SubchannelNode> channelz_node,
52                     RefCountedPtr<ConnectivityStateWatcherInterface> watcher);
53
54   ~HealthCheckClient() override;
55
  // Overrides the base-class Orphan().
  // NOTE(review): presumably this is where shutting_down_ gets set and the
  // current call / retry timer are torn down -- confirm in the .cc.
56   void Orphan() override;
57
58  private:
59   // Contains a call to the backend and all the data related to the call
  // (batches, metadata, byte streams and closures). Derives from Orphanable;
  // see the comment on call_ for how the object is ultimately destroyed.
60   class CallState : public Orphanable {
61    public:
    // NOTE(review): the second parameter is spelled with a trailing
    // underscore, unlike the matching parameter of the enclosing class's
    // constructor. Parameter names in a declaration do not affect callers.
62     CallState(RefCountedPtr<HealthCheckClient> health_check_client,
63               grpc_pollset_set* interested_parties_);
64     ~CallState() override;
65
    // Overrides Orphanable::Orphan().
66     void Orphan() override;
67
    // The caller must hold the mu_ of HealthCheckClient.
68     void StartCall() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthCheckClient::mu_);
69
70    private:
    // NOTE(review): presumably tied to cancelled_ and to the StartCancel() /
    // OnCancelComplete() callbacks below -- confirm in the .cc.
71     void Cancel();
72
73     void StartBatch(grpc_transport_stream_op_batch* batch);
74     static void StartBatchInCallCombiner(void* arg, grpc_error_handle error);
75
    // The caller must hold health_check_client_->mu_. Note that this lock
    // requirement is spelled differently from the one on StartCall().
76     void CallEndedLocked(bool retry)
77         ABSL_EXCLUSIVE_LOCKS_REQUIRED(health_check_client_->mu_);
78
    // Static functions sharing the (void* arg, grpc_error_handle error)
    // signature.
    // NOTE(review): presumably each is installed in the like-named
    // grpc_closure member below with arg pointing at this CallState --
    // confirm in the .cc.
79     static void OnComplete(void* arg, grpc_error_handle error);
80     static void RecvInitialMetadataReady(void* arg, grpc_error_handle error);
81     static void RecvMessageReady(void* arg, grpc_error_handle error);
82     static void RecvTrailingMetadataReady(void* arg, grpc_error_handle error);
83     static void StartCancel(void* arg, grpc_error_handle error);
84     static void OnCancelComplete(void* arg, grpc_error_handle error);
85
    // Helpers for consuming recv_message_.
    // NOTE(review): presumably they drain the byte stream into
    // recv_message_buffer_ -- confirm in the .cc.
86     static void OnByteStreamNext(void* arg, grpc_error_handle error);
87     void ContinueReadingRecvMessage();
88     grpc_error_handle PullSliceFromRecvMessage();
89     void DoneReadingRecvMessage(grpc_error_handle error);
90
91     static void AfterCallStackDestruction(void* arg, grpc_error_handle error);
92
    // Ref to the owning client; its mu_ is the mutex named in the lock
    // annotation on CallEndedLocked().
93     RefCountedPtr<HealthCheckClient> health_check_client_;
94     grpc_polling_entity pollent_;
95
96     Arena* arena_;
97     grpc_core::CallCombiner call_combiner_;
    // Value-initialized by the default member initializer.
98     grpc_call_context_element context_[GRPC_CONTEXT_COUNT] = {};
99
100     // The streaming call to the backend. Always non-null.
101     // Refs are tracked manually; when the last ref is released, the
102     // CallState object will be automatically destroyed.
103     SubchannelCall* call_;
104
105     grpc_transport_stream_op_batch_payload payload_;
106     grpc_transport_stream_op_batch batch_;
107     grpc_transport_stream_op_batch recv_message_batch_;
108     grpc_transport_stream_op_batch recv_trailing_metadata_batch_;
109
110     grpc_closure on_complete_;
111
112     // send_initial_metadata
113     grpc_metadata_batch send_initial_metadata_;
114     grpc_linked_mdelem path_metadata_storage_;
115
116     // send_message
    // Wrapped in ManualConstructor, so construction and destruction of the
    // SliceBufferByteStream are not tied to this member's own lifetime.
117     ManualConstructor<SliceBufferByteStream> send_message_;
118
119     // send_trailing_metadata
120     grpc_metadata_batch send_trailing_metadata_;
121
122     // recv_initial_metadata
123     grpc_metadata_batch recv_initial_metadata_;
124     grpc_closure recv_initial_metadata_ready_;
125
126     // recv_message
127     OrphanablePtr<ByteStream> recv_message_;
128     grpc_closure recv_message_ready_;
129     grpc_slice_buffer recv_message_buffer_;
    // Starts out false.
    // NOTE(review): presumably set once a response message has been
    // received -- confirm in the .cc.
130     std::atomic<bool> seen_response_{false};
131
132     // True if the cancel_stream batch has been started. Starts out false.
133     std::atomic<bool> cancelled_{false};
134
135     // recv_trailing_metadata
136     grpc_metadata_batch recv_trailing_metadata_;
137     grpc_transport_stream_stats collect_stats_;
138     grpc_closure recv_trailing_metadata_ready_;
139
140     // Closure for call stack destruction.
141     grpc_closure after_call_stack_destruction_;
142   };
143
  // StartCallLocked() requires the caller to hold mu_; StartCall() carries no
  // lock annotation.
  // NOTE(review): presumably StartCall() acquires mu_ itself and delegates --
  // confirm in the .cc.
144   void StartCall();
145   void StartCallLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
146
  // The caller must hold mu_.
147   void StartRetryTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  // NOTE(review): presumably the callback stored in retry_timer_callback_,
  // with arg pointing at this HealthCheckClient -- confirm in the .cc.
148   static void OnRetryTimer(void* arg, grpc_error_handle error);
149
  // Same unlocked / Locked pairing as StartCall() / StartCallLocked(): only
  // the Locked variant is annotated as requiring mu_.
150   void SetHealthStatus(grpc_connectivity_state state, const char* reason);
151   void SetHealthStatusLocked(grpc_connectivity_state state, const char* reason)
152       ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
153
  // The members up to channelz_node_ carry no ABSL_GUARDED_BY annotation.
154   std::string service_name_;
155   RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
156   grpc_pollset_set* interested_parties_;  // Do not own.
157   RefCountedPtr<channelz::SubchannelNode> channelz_node_;
158
  // Guards every member declared after it in this class.
159   Mutex mu_;
160   RefCountedPtr<ConnectivityStateWatcherInterface> watcher_
161       ABSL_GUARDED_BY(mu_);
162   bool shutting_down_ ABSL_GUARDED_BY(mu_) = false;
163
164   // The data associated with the current health check call.  It holds a ref
165   // to this HealthCheckClient object.
166   OrphanablePtr<CallState> call_state_ ABSL_GUARDED_BY(mu_);
167
168   // Call retry state.
169   BackOff retry_backoff_ ABSL_GUARDED_BY(mu_);
170   grpc_timer retry_timer_ ABSL_GUARDED_BY(mu_);
171   grpc_closure retry_timer_callback_ ABSL_GUARDED_BY(mu_);
172   bool retry_timer_callback_pending_ ABSL_GUARDED_BY(mu_) = false;
173 };
174 
175 }  // namespace grpc_core
176 
177 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_HEALTH_HEALTH_CHECK_CLIENT_H */
178