1 /* 2 * 3 * Copyright 2015 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 20 #define GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 21 22 #include <grpc/impl/codegen/grpc_types.h> 23 #include <grpcpp/impl/codegen/byte_buffer.h> 24 #include <grpcpp/impl/codegen/call.h> 25 #include <grpcpp/impl/codegen/call_hook.h> 26 #include <grpcpp/impl/codegen/completion_queue_tag.h> 27 #include <grpcpp/impl/codegen/core_codegen_interface.h> 28 #include <grpcpp/impl/codegen/rpc_service_method.h> 29 #include <grpcpp/impl/codegen/server_context_impl.h> 30 31 namespace grpc_impl { 32 33 class Channel; 34 class CompletionQueue; 35 class ServerCompletionQueue; 36 class ServerCredentials; 37 } // namespace grpc_impl 38 namespace grpc { 39 40 class AsyncGenericService; 41 class GenericServerContext; 42 class Service; 43 44 extern CoreCodegenInterface* g_core_codegen_interface; 45 46 /// Models a gRPC server. 47 /// 48 /// Servers are configured and started via \a grpc::ServerBuilder. 49 namespace internal { 50 class ServerAsyncStreamingInterface; 51 } // namespace internal 52 53 namespace experimental { 54 class CallbackGenericService; 55 } // namespace experimental 56 57 class ServerInterface : public internal::CallHook { 58 public: ~ServerInterface()59 virtual ~ServerInterface() {} 60 61 /// \a Shutdown does the following things: 62 /// 63 /// 1. Shutdown the server: deactivate all listening ports, mark it in 64 /// "shutdown mode" so that further call Request's or incoming RPC matches 65 /// are no longer allowed. Also return all Request'ed-but-not-yet-active 66 /// calls as failed (!ok). This refers to calls that have been requested 67 /// at the server by the server-side library or application code but that 68 /// have not yet been matched to incoming RPCs from the client. Note that 69 /// this would even include default calls added automatically by the gRPC 70 /// C++ API without the user's input (e.g., "Unimplemented RPC method") 71 /// 72 /// 2. Block until all rpc method handlers invoked automatically by the sync 73 /// API finish. 74 /// 75 /// 3. If all pending calls complete (and all their operations are 76 /// retrieved by Next) before \a deadline expires, this finishes 77 /// gracefully. Otherwise, forcefully cancel all pending calls associated 78 /// with the server after \a deadline expires. In the case of the sync API, 79 /// if the RPC function for a streaming call has already been started and 80 /// takes a week to complete, the RPC function won't be forcefully 81 /// terminated (since that would leave state corrupt and incomplete) and 82 /// the method handler will just keep running (which will prevent the 83 /// server from completing the "join" operation that it needs to do at 84 /// shutdown time). 85 /// 86 /// All completion queue associated with the server (for example, for async 87 /// serving) must be shutdown *after* this method has returned: 88 /// See \a ServerBuilder::AddCompletionQueue for details. 89 /// They must also be drained (by repeated Next) after being shutdown. 90 /// 91 /// \param deadline How long to wait until pending rpcs are forcefully 92 /// terminated. 93 template <class T> Shutdown(const T & deadline)94 void Shutdown(const T& deadline) { 95 ShutdownInternal(TimePoint<T>(deadline).raw_time()); 96 } 97 98 /// Shutdown the server without a deadline and forced cancellation. 99 /// 100 /// All completion queue associated with the server (for example, for async 101 /// serving) must be shutdown *after* this method has returned: 102 /// See \a ServerBuilder::AddCompletionQueue for details. Shutdown()103 void Shutdown() { 104 ShutdownInternal( 105 g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_MONOTONIC)); 106 } 107 108 /// Block waiting for all work to complete. 109 /// 110 /// \warning The server must be either shutting down or some other thread must 111 /// call \a Shutdown for this function to ever return. 112 virtual void Wait() = 0; 113 114 protected: 115 friend class ::grpc::Service; 116 117 /// Register a service. This call does not take ownership of the service. 118 /// The service must exist for the lifetime of the Server instance. 119 virtual bool RegisterService(const grpc::string* host, Service* service) = 0; 120 121 /// Register a generic service. This call does not take ownership of the 122 /// service. The service must exist for the lifetime of the Server instance. 123 virtual void RegisterAsyncGenericService(AsyncGenericService* service) = 0; 124 125 /// NOTE: class experimental_registration_interface is not part of the public 126 /// API of this class 127 /// TODO(vjpai): Move these contents to public API when no longer experimental 128 class experimental_registration_interface { 129 public: ~experimental_registration_interface()130 virtual ~experimental_registration_interface() {} 131 /// May not be abstract since this is a post-1.0 API addition RegisterCallbackGenericService(experimental::CallbackGenericService * service)132 virtual void RegisterCallbackGenericService( 133 experimental::CallbackGenericService* service) {} 134 }; 135 136 /// NOTE: The function experimental_registration() is not stable public API. 137 /// It is a view to the experimental components of this class. It may be 138 /// changed or removed at any time. May not be abstract since this is a 139 /// post-1.0 API addition experimental_registration()140 virtual experimental_registration_interface* experimental_registration() { 141 return nullptr; 142 } 143 144 /// Tries to bind \a server to the given \a addr. 145 /// 146 /// It can be invoked multiple times. 147 /// 148 /// \param addr The address to try to bind to the server (eg, localhost:1234, 149 /// 192.168.1.1:31416, [::1]:27182, etc.). 150 /// \params creds The credentials associated with the server. 151 /// 152 /// \return bound port number on success, 0 on failure. 153 /// 154 /// \warning It's an error to call this method on an already started server. 155 virtual int AddListeningPort(const grpc::string& addr, 156 grpc_impl::ServerCredentials* creds) = 0; 157 158 /// Start the server. 159 /// 160 /// \param cqs Completion queues for handling asynchronous services. The 161 /// caller is required to keep all completion queues live until the server is 162 /// destroyed. 163 /// \param num_cqs How many completion queues does \a cqs hold. 164 virtual void Start(::grpc_impl::ServerCompletionQueue** cqs, 165 size_t num_cqs) = 0; 166 167 virtual void ShutdownInternal(gpr_timespec deadline) = 0; 168 169 virtual int max_receive_message_size() const = 0; 170 171 virtual grpc_server* server() = 0; 172 173 virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops, 174 internal::Call* call) = 0; 175 176 class BaseAsyncRequest : public internal::CompletionQueueTag { 177 public: 178 BaseAsyncRequest(ServerInterface* server, 179 ::grpc_impl::ServerContext* context, 180 internal::ServerAsyncStreamingInterface* stream, 181 ::grpc_impl::CompletionQueue* call_cq, 182 ::grpc_impl::ServerCompletionQueue* notification_cq, 183 void* tag, bool delete_on_finalize); 184 virtual ~BaseAsyncRequest(); 185 186 bool FinalizeResult(void** tag, bool* status) override; 187 188 private: 189 void ContinueFinalizeResultAfterInterception(); 190 191 protected: 192 ServerInterface* const server_; 193 ::grpc_impl::ServerContext* const context_; 194 internal::ServerAsyncStreamingInterface* const stream_; 195 ::grpc_impl::CompletionQueue* const call_cq_; 196 ::grpc_impl::ServerCompletionQueue* const notification_cq_; 197 void* const tag_; 198 const bool delete_on_finalize_; 199 grpc_call* call_; 200 internal::Call call_wrapper_; 201 internal::InterceptorBatchMethodsImpl interceptor_methods_; 202 bool done_intercepting_; 203 }; 204 205 /// RegisteredAsyncRequest is not part of the C++ API 206 class RegisteredAsyncRequest : public BaseAsyncRequest { 207 public: 208 RegisteredAsyncRequest(ServerInterface* server, 209 ::grpc_impl::ServerContext* context, 210 internal::ServerAsyncStreamingInterface* stream, 211 ::grpc_impl::CompletionQueue* call_cq, 212 ::grpc_impl::ServerCompletionQueue* notification_cq, 213 void* tag, const char* name, 214 internal::RpcMethod::RpcType type); 215 FinalizeResult(void ** tag,bool * status)216 virtual bool FinalizeResult(void** tag, bool* status) override { 217 /* If we are done intercepting, then there is nothing more for us to do */ 218 if (done_intercepting_) { 219 return BaseAsyncRequest::FinalizeResult(tag, status); 220 } 221 call_wrapper_ = ::grpc::internal::Call( 222 call_, server_, call_cq_, server_->max_receive_message_size(), 223 context_->set_server_rpc_info(name_, type_, 224 *server_->interceptor_creators())); 225 return BaseAsyncRequest::FinalizeResult(tag, status); 226 } 227 228 protected: 229 void IssueRequest(void* registered_method, grpc_byte_buffer** payload, 230 ::grpc_impl::ServerCompletionQueue* notification_cq); 231 const char* name_; 232 const internal::RpcMethod::RpcType type_; 233 }; 234 235 class NoPayloadAsyncRequest final : public RegisteredAsyncRequest { 236 public: NoPayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)237 NoPayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 238 ServerInterface* server, 239 ::grpc_impl::ServerContext* context, 240 internal::ServerAsyncStreamingInterface* stream, 241 ::grpc_impl::CompletionQueue* call_cq, 242 ::grpc_impl::ServerCompletionQueue* notification_cq, 243 void* tag) 244 : RegisteredAsyncRequest( 245 server, context, stream, call_cq, notification_cq, tag, 246 registered_method->name(), registered_method->method_type()) { 247 IssueRequest(registered_method->server_tag(), nullptr, notification_cq); 248 } 249 250 // uses RegisteredAsyncRequest::FinalizeResult 251 }; 252 253 template <class Message> 254 class PayloadAsyncRequest final : public RegisteredAsyncRequest { 255 public: PayloadAsyncRequest(internal::RpcServiceMethod * registered_method,ServerInterface * server,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag,Message * request)256 PayloadAsyncRequest(internal::RpcServiceMethod* registered_method, 257 ServerInterface* server, 258 ::grpc_impl::ServerContext* context, 259 internal::ServerAsyncStreamingInterface* stream, 260 ::grpc_impl::CompletionQueue* call_cq, 261 ::grpc_impl::ServerCompletionQueue* notification_cq, 262 void* tag, Message* request) 263 : RegisteredAsyncRequest( 264 server, context, stream, call_cq, notification_cq, tag, 265 registered_method->name(), registered_method->method_type()), 266 registered_method_(registered_method), 267 server_(server), 268 context_(context), 269 stream_(stream), 270 call_cq_(call_cq), 271 notification_cq_(notification_cq), 272 tag_(tag), 273 request_(request) { 274 IssueRequest(registered_method->server_tag(), payload_.bbuf_ptr(), 275 notification_cq); 276 } 277 ~PayloadAsyncRequest()278 ~PayloadAsyncRequest() { 279 payload_.Release(); // We do not own the payload_ 280 } 281 FinalizeResult(void ** tag,bool * status)282 bool FinalizeResult(void** tag, bool* status) override { 283 /* If we are done intercepting, then there is nothing more for us to do */ 284 if (done_intercepting_) { 285 return RegisteredAsyncRequest::FinalizeResult(tag, status); 286 } 287 if (*status) { 288 if (!payload_.Valid() || !SerializationTraits<Message>::Deserialize( 289 payload_.bbuf_ptr(), request_) 290 .ok()) { 291 // If deserialization fails, we cancel the call and instantiate 292 // a new instance of ourselves to request another call. We then 293 // return false, which prevents the call from being returned to 294 // the application. 295 g_core_codegen_interface->grpc_call_cancel_with_status( 296 call_, GRPC_STATUS_INTERNAL, "Unable to parse request", nullptr); 297 g_core_codegen_interface->grpc_call_unref(call_); 298 new PayloadAsyncRequest(registered_method_, server_, context_, 299 stream_, call_cq_, notification_cq_, tag_, 300 request_); 301 delete this; 302 return false; 303 } 304 } 305 /* Set interception point for recv message */ 306 interceptor_methods_.AddInterceptionHookPoint( 307 experimental::InterceptionHookPoints::POST_RECV_MESSAGE); 308 interceptor_methods_.SetRecvMessage(request_, nullptr); 309 return RegisteredAsyncRequest::FinalizeResult(tag, status); 310 } 311 312 private: 313 internal::RpcServiceMethod* const registered_method_; 314 ServerInterface* const server_; 315 ::grpc_impl::ServerContext* const context_; 316 internal::ServerAsyncStreamingInterface* const stream_; 317 ::grpc_impl::CompletionQueue* const call_cq_; 318 319 ::grpc_impl::ServerCompletionQueue* const notification_cq_; 320 void* const tag_; 321 Message* const request_; 322 ByteBuffer payload_; 323 }; 324 325 class GenericAsyncRequest : public BaseAsyncRequest { 326 public: 327 GenericAsyncRequest(ServerInterface* server, GenericServerContext* context, 328 internal::ServerAsyncStreamingInterface* stream, 329 ::grpc_impl::CompletionQueue* call_cq, 330 ::grpc_impl::ServerCompletionQueue* notification_cq, 331 void* tag, bool delete_on_finalize); 332 333 bool FinalizeResult(void** tag, bool* status) override; 334 335 private: 336 grpc_call_details call_details_; 337 }; 338 339 template <class Message> RequestAsyncCall(internal::RpcServiceMethod * method,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag,Message * message)340 void RequestAsyncCall(internal::RpcServiceMethod* method, 341 ::grpc_impl::ServerContext* context, 342 internal::ServerAsyncStreamingInterface* stream, 343 ::grpc_impl::CompletionQueue* call_cq, 344 ::grpc_impl::ServerCompletionQueue* notification_cq, 345 void* tag, Message* message) { 346 GPR_CODEGEN_ASSERT(method); 347 new PayloadAsyncRequest<Message>(method, this, context, stream, call_cq, 348 notification_cq, tag, message); 349 } 350 RequestAsyncCall(internal::RpcServiceMethod * method,::grpc_impl::ServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)351 void RequestAsyncCall(internal::RpcServiceMethod* method, 352 ::grpc_impl::ServerContext* context, 353 internal::ServerAsyncStreamingInterface* stream, 354 ::grpc_impl::CompletionQueue* call_cq, 355 ::grpc_impl::ServerCompletionQueue* notification_cq, 356 void* tag) { 357 GPR_CODEGEN_ASSERT(method); 358 new NoPayloadAsyncRequest(method, this, context, stream, call_cq, 359 notification_cq, tag); 360 } 361 RequestAsyncGenericCall(GenericServerContext * context,internal::ServerAsyncStreamingInterface * stream,::grpc_impl::CompletionQueue * call_cq,::grpc_impl::ServerCompletionQueue * notification_cq,void * tag)362 void RequestAsyncGenericCall( 363 GenericServerContext* context, 364 internal::ServerAsyncStreamingInterface* stream, 365 ::grpc_impl::CompletionQueue* call_cq, 366 ::grpc_impl::ServerCompletionQueue* notification_cq, void* tag) { 367 new GenericAsyncRequest(this, context, stream, call_cq, notification_cq, 368 tag, true); 369 } 370 371 private: 372 // EXPERIMENTAL 373 // Getter method for the vector of interceptor factory objects. 374 // Returns a nullptr (rather than being pure) since this is a post-1.0 method 375 // and adding a new pure method to an interface would be a breaking change 376 // (even though this is private and non-API) 377 virtual std::vector< 378 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>* interceptor_creators()379 interceptor_creators() { 380 return nullptr; 381 } 382 383 // EXPERIMENTAL 384 // A method to get the callbackable completion queue associated with this 385 // server. If the return value is nullptr, this server doesn't support 386 // callback operations. 387 // TODO(vjpai): Consider a better default like using a global CQ 388 // Returns nullptr (rather than being pure) since this is a post-1.0 method 389 // and adding a new pure method to an interface would be a breaking change 390 // (even though this is private and non-API) CallbackCQ()391 virtual ::grpc_impl::CompletionQueue* CallbackCQ() { return nullptr; } 392 }; 393 394 } // namespace grpc 395 396 #endif // GRPCPP_IMPL_CODEGEN_SERVER_INTERFACE_H 397