1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 21 22 // IWYU pragma: private 23 24 #include <grpc/impl/codegen/port_platform.h> 25 26 #include <grpc/impl/codegen/grpc_types.h> 27 #include <grpcpp/impl/codegen/byte_buffer.h> 28 #include <grpcpp/impl/codegen/call.h> 29 #include <grpcpp/impl/codegen/call_hook.h> 30 #include <grpcpp/impl/codegen/completion_queue_tag.h> 31 #include <grpcpp/impl/codegen/core_codegen_interface.h> 32 #include <grpcpp/impl/codegen/interceptor_common.h> 33 #include <grpcpp/impl/codegen/rpc_service_method.h> 34 #include <grpcpp/impl/codegen/server_context.h> 35 36 namespace grpc { 37 38 class AsyncGenericService; 39 class Channel; 40 class CompletionQueue; 41 class GenericServerContext; 42 class ServerCompletionQueue; 43 class ServerCredentials; 44 class Service; 45 46 extern CoreCodegenInterface* g_core_codegen_interface; 47 48 /// Models a gRPC server. 49 /// 50 /// Servers are configured and started via \a grpc::ServerBuilder. 51 namespace internal { 52 class ServerAsyncStreamingInterface; 53 } // namespace internal 54 55 class CallbackGenericService; 56 57 namespace experimental { 58 class ServerInterceptorFactoryInterface; 59 } // namespace experimental 60 61 class ServerInterface : public internal::CallHook { 62 public: ~ServerInterface()63 ~ServerInterface() override {} 64 65 /// \a Shutdown does the following things: 66 /// 67 /// 1. Shutdown the server: deactivate all listening ports, mark it in 68 /// "shutdown mode" so that further call Request's or incoming RPC matches 69 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 70 /// calls as failed (!ok). This refers to calls that have been requested 71 /// at the server by the server-side library or application code but that 72 /// have not yet been matched to incoming RPCs from the client. Note that 73 /// this would even include default calls added automatically by the gRPC 74 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 75 /// 76 /// 2. Block until all rpc method handlers invoked automatically by the sync 77 /// API finish. 78 /// 79 /// 3. If all pending calls complete (and all their operations are 80 /// retrieved by Next) before \a deadline expires, this finishes 81 /// gracefully. Otherwise, forcefully cancel all pending calls associated 82 /// with the server after \a deadline expires. In the case of the sync API, 83 /// if the RPC function for a streaming call has already been started and 84 /// takes a week to complete, the RPC function won't be forcefully 85 /// terminated (since that would leave state corrupt and incomplete) and 86 /// the method handler will just keep running (which will prevent the 87 /// server from completing the "join" operation that it needs to do at 88 /// shutdown time). 89 /// 90 /// All completion queue associated with the server (for example, for async 91 /// serving) must be shutdown *after* this method has returned: 92 /// See \a ServerBuilder::AddCompletionQueue for details. 93 /// They must also be drained (by repeated Next) after being shutdown. 94 /// 95 /// \param deadline How long to wait until pending rpcs are forcefully 96 /// terminated. 97 template <class T> Shutdown(const T & deadline)98 void Shutdown(const T& deadline) { 99 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 100 } 101 102 /// Shutdown the server without a deadline and forced cancellation. 103 /// 104 /// All completion queue associated with the server (for example, for async 105 /// serving) must be shutdown *after* this method has returned: 106 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()107 void Shutdown() { 108 ShutdownInternal( 109 g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC)); 110 } 111 112 /// Block waiting for all work to complete. 113 /// 114 /// \warning The server must be either shutting down or some other thread must 115 /// call \a Shutdown for this function to ever return. 116 virtual void Wait() = 0; 117 118 protected: 119 friend class ::grpc::Service; 120 121 /// Register a service. This call does not take ownership of the service. 122 /// The service must exist for the lifetime of the Server instance. 123 virtual bool RegisterService(const std::string* host, Service* service) = 0; 124 125 /// Register a generic service. This call does not take ownership of the 126 /// service. The service must exist for the lifetime of the Server instance. 127 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 128 129 /// Register a callback generic service. This call does not take ownership of 130 /// the service. The service must exist for the lifetime of the Server 131 /// instance. May not be abstract since this is a post-1.0 API addition. 132 RegisterCallbackGenericService(CallbackGenericService *)133 virtual void RegisterCallbackGenericService(CallbackGenericService* 134 /*service*/) {} 135 136 /// Tries to bind \a server to the given \a addr. 137 /// 138 /// It can be invoked multiple times. 139 /// 140 /// \param addr The address to try to bind to the server (eg, localhost:1234, 141 /// 192.168.1.1:31416, [::1]:27182, etc.). 142 /// \params creds The credentials associated with the server. 143 /// 144 /// \return bound port number on success, 0 on failure. 145 /// 146 /// \warning It's an error to call this method on an already started server. 147 virtual int AddListeningPort(const std::string& addr, 148 ServerCredentials* creds) = 0; 149 150 /// Start the server. 151 /// 152 /// \param cqs Completion queues for handling asynchronous services. The 153 /// caller is required to keep all completion queues live until the server is 154 /// destroyed. 155 /// \param num_cqs How many completion queues does \a cqs hold. 156 virtual void Start(::grpc::ServerCompletionQueue** cqs, size_t num_cqs) = 0; 157 158 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 159 160 virtual int max_receive_message_size() const = 0; 161 162 virtual grpc_server* server() = 0; 163 164 void PerformOpsOnCall(internal::CallOpSetInterface* ops, 165 internal::Call* call) override = 0; 166 167 class BaseAsyncRequest : public internal::CompletionQueueTag { 168 public: 169 BaseAsyncRequest(ServerInterface* server, ::grpc::ServerContext* context, 170 internal::ServerAsyncStreamingInterface* stream, 171 ::grpc::CompletionQueue* call_cq, 172 ::grpc::ServerCompletionQueue* notification_cq, void* tag, 173 bool delete_on_finalize); 174 ~BaseAsyncRequest() override; 175 176 bool FinalizeResult(void** tag, bool* status) override; 177 178 private: 179 void ContinueFinalizeResultAfterInterception(); 180 181 protected: 182 ServerInterface* const server_; 183 ::grpc::ServerContext* const context_; 184 internal::ServerAsyncStreamingInterface* const stream_; 185 ::grpc::CompletionQueue* const call_cq_; 186 ::grpc::ServerCompletionQueue* const notification_cq_; 187 void* const tag_; 188 const bool delete_on_finalize_; 189 grpc_call* call_; 190 internal::Call call_wrapper_; 191 internal::InterceptorBatchMethodsImpl interceptor_methods_; 192 bool done_intercepting_; 193 }; 194 195 /// RegisteredAsyncRequest is not part of the C++ API 196 class RegisteredAsyncRequest : public BaseAsyncRequest { 197 public: 198 RegisteredAsyncRequest(ServerInterface* server, 199 ::grpc::ServerContext* context, 200 internal::ServerAsyncStreamingInterface* stream, 201 ::grpc::CompletionQueue* call_cq, 202 ::grpc::ServerCompletionQueue* notification_cq, 203 void* tag, const char* name, 204 internal::RpcMethod::RpcType type); 205 FinalizeResult(void ** tag,bool * status)206 bool FinalizeResult(void** tag, bool* status) override { 207 /* If we are done intercepting, then there is nothing more for us to do */ 208 if (done_intercepting_) { 209 return BaseAsyncRequest::FinalizeResult(tag, status); 210 } 211 call_wrapper_ = ::grpc::internal::Call( 212 call_, server_, call_cq_, server_->max_receive_message_size(), 213 context_->set_server_rpc_info(name_, type_, 214 *server_->interceptor_creators())); 215 return BaseAsyncRequest::FinalizeResult(tag, status); 216 } 217 218 protected: 219 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 220 ::grpc::ServerCompletionQueue* notification_cq); 221 const char* name_; 222 const internal::RpcMethod::RpcType type_; 223 }; 224 225 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 226 public: NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)227 NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 228 ServerInterface* server, 229 ::grpc::ServerContext* context, 230 internal::ServerAsyncStreamingInterface* stream, 231 ::grpc::CompletionQueue* call_cq, 232 ::grpc::ServerCompletionQueue* notification_cq, 233 void* tag) 234 : RegisteredAsyncRequest( 235 server, context, stream, call_cq, notification_cq, tag, 236 registered_method->name(), registered_method->method_type()) { 237 IssueRequest(registered_method->server_tag(), nullptr, notification_cq); 238 } 239 240 // uses RegisteredAsyncRequest::FinalizeResult 241 }; 242 243 template <class Message> 244 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 245 public: PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag,Message * request)246 PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 247 ServerInterface* server, ::grpc::ServerContext* context, 248 internal::ServerAsyncStreamingInterface* stream, 249 ::grpc::CompletionQueue* call_cq, 250 ::grpc::ServerCompletionQueue* notification_cq, 251 void* tag, Message* request) 252 : RegisteredAsyncRequest( 253 server, context, stream, call_cq, notification_cq, tag, 254 registered_method->name(), registered_method->method_type()), 255 registered_method_(registered_method), 256 request_(request) { 257 IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), 258 notification_cq); 259 } 260 ~PayloadAsyncRequest()261 ~PayloadAsyncRequest() override { 262 payload_.Release(); // We do not own the payload_ 263 } 264 FinalizeResult(void ** tag,bool * status)265 bool FinalizeResult(void** tag, bool* status) override { 266 /* If we are done intercepting, then there is nothing more for us to do */ 267 if (done_intercepting_) { 268 return RegisteredAsyncRequest::FinalizeResult(tag, status); 269 } 270 if (*status) { 271 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 272 payload_.bbuf_ptr(), request_) 273 .ok()) { 274 // If deserialization fails, we cancel the call and instantiate 275 // a new instance of ourselves to request another call. We then 276 // return false, which prevents the call from being returned to 277 // the application. 278 g_core_codegen_interface->grpc_call_cancel_with_status( 279 call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr); 280 g_core_codegen_interface->grpc_call_unref(call_); 281 new PayloadAsyncRequest(registered_method_, server_, context_, 282 stream_, call_cq_, notification_cq_, tag_, 283 request_); 284 delete this; 285 return false; 286 } 287 } 288 /* Set interception point for recv message */ 289 interceptor_methods_.AddInterceptionHookPoint( 290 experimental::InterceptionHookPoints::POST_RECV_MESSAGE); 291 interceptor_methods_.SetRecvMessage(request_, nullptr); 292 return RegisteredAsyncRequest::FinalizeResult(tag, status); 293 } 294 295 private: 296 internal::RpcServiceMethod* const registered_method_; 297 Message* const request_; 298 ByteBuffer payload_; 299 }; 300 301 class GenericAsyncRequest : public BaseAsyncRequest { 302 public: 303 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 304 internal::ServerAsyncStreamingInterface* stream, 305 ::grpc::CompletionQueue* call_cq, 306 ::grpc::ServerCompletionQueue* notification_cq, 307 void* tag, bool delete_on_finalize); 308 309 bool FinalizeResult(void** tag, bool* status) override; 310 311 private: 312 grpc_call_details call_details_; 313 }; 314 315 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag,Message * message)316 void RequestAsyncCall(internal::RpcServiceMethod* method, 317 ::grpc::ServerContext* context, 318 internal::ServerAsyncStreamingInterface* stream, 319 ::grpc::CompletionQueue* call_cq, 320 ::grpc::ServerCompletionQueue* notification_cq, 321 void* tag, Message* message) { 322 GPR_CODEGEN_ASSERT(method); 323 new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, 324 notification_cq, tag, message); 325 } 326 RequestAsyncCall(internal::RpcServiceMethod * method,::grpc::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)327 void RequestAsyncCall(internal::RpcServiceMethod* method, 328 ::grpc::ServerContext* context, 329 internal::ServerAsyncStreamingInterface* stream, 330 ::grpc::CompletionQueue* call_cq, 331 ::grpc::ServerCompletionQueue* notification_cq, 332 void* tag) { 333 GPR_CODEGEN_ASSERT(method); 334 new NoPayloadAsyncRequest(method, this, context, stream, call_cq, 335 notification_cq, tag); 336 } 337 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc::CompletionQueue * call_cq,::grpc::ServerCompletionQueue * notification_cq,void * tag)338 void RequestAsyncGenericCall(GenericServerContext* context, 339 internal::ServerAsyncStreamingInterface* stream, 340 ::grpc::CompletionQueue* call_cq, 341 ::grpc::ServerCompletionQueue* notification_cq, 342 void* tag) { 343 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 344 tag, true); 345 } 346 347 private: 348 // EXPERIMENTAL 349 // Getter method for the vector of interceptor factory objects. 350 // Returns a nullptr (rather than being pure) since this is a post-1.0 method 351 // and adding a new pure method to an interface would be a breaking change 352 // (even though this is private and non-API) 353 virtual std::vector< 354 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()355 interceptor_creators() { 356 return nullptr; 357 } 358 359 // A method to get the callbackable completion queue associated with this 360 // server. If the return value is nullptr, this server doesn't support 361 // callback operations. 362 // TODO(vjpai): Consider a better default like using a global CQ 363 // Returns nullptr (rather than being pure) since this is a post-1.0 method 364 // and adding a new pure method to an interface would be a breaking change 365 // (even though this is private and non-API) CallbackCQ()366 virtual ::grpc::CompletionQueue* CallbackCQ() { return nullptr; } 367 }; 368 369 } // namespace grpc 370 371 #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 372