1 /* 2 * 3 * Copyright 2016 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 #ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 20 #define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 21 22 #include <condition_variable> 23 #include <memory> 24 #include <mutex> 25 #include <string> 26 #include <thread> 27 28 #include <gtest/gtest.h> 29 30 #include <grpc/grpc.h> 31 #include <grpc/support/log.h> 32 #include <grpcpp/alarm.h> 33 #include <grpcpp/security/credentials.h> 34 #include <grpcpp/server_context.h> 35 36 #include "src/proto/grpc/testing/echo.grpc.pb.h" 37 #include "test/cpp/util/string_ref_helper.h" 38 39 namespace grpc { 40 namespace testing { 41 42 const int kServerDefaultResponseStreamsToSend = 3; 43 const char* const kServerResponseStreamsToSend = "server_responses_to_send"; 44 const char* const kServerTryCancelRequest = "server_try_cancel"; 45 const char* const kClientTryCancelRequest = "client_try_cancel"; 46 const char* const kDebugInfoTrailerKey = "debug-info-bin"; 47 const char* const kServerFinishAfterNReads = "server_finish_after_n_reads"; 48 const char* const kServerUseCoalescingApi = "server_use_coalescing_api"; 49 const char* const kCheckClientInitialMetadataKey = "custom_client_metadata"; 50 const char* const kCheckClientInitialMetadataVal = "Value for client metadata"; 51 52 typedef enum { 53 DO_NOT_CANCEL = 0, 54 CANCEL_BEFORE_PROCESSING, 55 CANCEL_DURING_PROCESSING, 56 CANCEL_AFTER_PROCESSING 57 } ServerTryCancelRequestPhase; 58 59 namespace internal { 60 // When echo_deadline is requested, deadline seen in the ServerContext is set in 61 // the response in seconds. 62 void MaybeEchoDeadline(ServerContextBase* context, const EchoRequest* request, 63 EchoResponse* response); 64 65 void CheckServerAuthContext(const ServerContextBase* context, 66 const std::string& expected_transport_security_type, 67 const std::string& expected_client_identity); 68 69 // Returns the number of pairs in metadata that exactly match the given 70 // key-value pair. Returns -1 if the pair wasn't found. 71 int MetadataMatchCount( 72 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 73 const std::string& key, const std::string& value); 74 75 int GetIntValueFromMetadataHelper( 76 const char* key, 77 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 78 int default_value); 79 80 int GetIntValueFromMetadata( 81 const char* key, 82 const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, 83 int default_value); 84 85 void ServerTryCancel(ServerContext* context); 86 } // namespace internal 87 88 class TestServiceSignaller { 89 public: ClientWaitUntilRpcStarted()90 void ClientWaitUntilRpcStarted() { 91 std::unique_lock<std::mutex> lock(mu_); 92 cv_rpc_started_.wait(lock, [this] { return rpc_started_; }); 93 } ServerWaitToContinue()94 void ServerWaitToContinue() { 95 std::unique_lock<std::mutex> lock(mu_); 96 cv_server_continue_.wait(lock, [this] { return server_should_continue_; }); 97 } SignalClientThatRpcStarted()98 void SignalClientThatRpcStarted() { 99 std::unique_lock<std::mutex> lock(mu_); 100 rpc_started_ = true; 101 cv_rpc_started_.notify_one(); 102 } SignalServerToContinue()103 void SignalServerToContinue() { 104 std::unique_lock<std::mutex> lock(mu_); 105 server_should_continue_ = true; 106 cv_server_continue_.notify_one(); 107 } 108 109 private: 110 std::mutex mu_; 111 std::condition_variable cv_rpc_started_; 112 bool rpc_started_ /* GUARDED_BY(mu_) */ = false; 113 std::condition_variable cv_server_continue_; 114 bool server_should_continue_ /* GUARDED_BY(mu_) */ = false; 115 }; 116 117 template <typename RpcService> 118 class TestMultipleServiceImpl : public RpcService { 119 public: TestMultipleServiceImpl()120 TestMultipleServiceImpl() : signal_client_(false), host_() {} TestMultipleServiceImpl(const std::string & host)121 explicit TestMultipleServiceImpl(const std::string& host) 122 : signal_client_(false), host_(new std::string(host)) {} 123 Echo(ServerContext * context,const EchoRequest * request,EchoResponse * response)124 Status Echo(ServerContext* context, const EchoRequest* request, 125 EchoResponse* response) { 126 if (request->has_param() && 127 request->param().server_notify_client_when_started()) { 128 signaller_.SignalClientThatRpcStarted(); 129 signaller_.ServerWaitToContinue(); 130 } 131 132 // A bit of sleep to make sure that short deadline tests fail 133 if (request->has_param() && request->param().server_sleep_us() > 0) { 134 gpr_sleep_until( 135 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), 136 gpr_time_from_micros(request->param().server_sleep_us(), 137 GPR_TIMESPAN))); 138 } 139 140 if (request->has_param() && request->param().server_die()) { 141 gpr_log(GPR_ERROR, "The request should not reach application handler."); 142 GPR_ASSERT(0); 143 } 144 if (request->has_param() && request->param().has_expected_error()) { 145 const auto& error = request->param().expected_error(); 146 return Status(static_cast<StatusCode>(error.code()), 147 error.error_message(), error.binary_error_details()); 148 } 149 int server_try_cancel = internal::GetIntValueFromMetadata( 150 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 151 if (server_try_cancel > DO_NOT_CANCEL) { 152 // Since this is a unary RPC, by the time this server handler is called, 153 // the 'request' message is already read from the client. So the scenarios 154 // in server_try_cancel don't make much sense. Just cancel the RPC as long 155 // as server_try_cancel is not DO_NOT_CANCEL 156 internal::ServerTryCancel(context); 157 return Status::CANCELLED; 158 } 159 160 response->set_message(request->message()); 161 internal::MaybeEchoDeadline(context, request, response); 162 if (host_) { 163 response->mutable_param()->set_host(*host_); 164 } 165 if (request->has_param() && request->param().client_cancel_after_us()) { 166 { 167 std::unique_lock<std::mutex> lock(mu_); 168 signal_client_ = true; 169 ++rpcs_waiting_for_client_cancel_; 170 } 171 while (!context->IsCancelled()) { 172 gpr_sleep_until(gpr_time_add( 173 gpr_now(GPR_CLOCK_REALTIME), 174 gpr_time_from_micros(request->param().client_cancel_after_us(), 175 GPR_TIMESPAN))); 176 } 177 { 178 std::unique_lock<std::mutex> lock(mu_); 179 --rpcs_waiting_for_client_cancel_; 180 } 181 return Status::CANCELLED; 182 } else if (request->has_param() && 183 request->param().server_cancel_after_us()) { 184 gpr_sleep_until(gpr_time_add( 185 gpr_now(GPR_CLOCK_REALTIME), 186 gpr_time_from_micros(request->param().server_cancel_after_us(), 187 GPR_TIMESPAN))); 188 return Status::CANCELLED; 189 } else if (!request->has_param() || 190 !request->param().skip_cancelled_check()) { 191 EXPECT_FALSE(context->IsCancelled()); 192 } 193 194 if (request->has_param() && request->param().echo_metadata_initially()) { 195 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 196 context->client_metadata(); 197 for (const auto& metadatum : client_metadata) { 198 context->AddInitialMetadata(ToString(metadatum.first), 199 ToString(metadatum.second)); 200 } 201 } 202 203 if (request->has_param() && request->param().echo_metadata()) { 204 const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata = 205 context->client_metadata(); 206 for (const auto& metadatum : client_metadata) { 207 context->AddTrailingMetadata(ToString(metadatum.first), 208 ToString(metadatum.second)); 209 } 210 // Terminate rpc with error and debug info in trailer. 211 if (request->param().debug_info().stack_entries_size() || 212 !request->param().debug_info().detail().empty()) { 213 std::string serialized_debug_info = 214 request->param().debug_info().SerializeAsString(); 215 context->AddTrailingMetadata(kDebugInfoTrailerKey, 216 serialized_debug_info); 217 return Status::CANCELLED; 218 } 219 } 220 if (request->has_param() && 221 (request->param().expected_client_identity().length() > 0 || 222 request->param().check_auth_context())) { 223 internal::CheckServerAuthContext( 224 context, request->param().expected_transport_security_type(), 225 request->param().expected_client_identity()); 226 } 227 if (request->has_param() && 228 request->param().response_message_length() > 0) { 229 response->set_message( 230 std::string(request->param().response_message_length(), '\0')); 231 } 232 if (request->has_param() && request->param().echo_peer()) { 233 response->mutable_param()->set_peer(context->peer()); 234 } 235 return Status::OK; 236 } 237 Echo1(ServerContext * context,const EchoRequest * request,EchoResponse * response)238 Status Echo1(ServerContext* context, const EchoRequest* request, 239 EchoResponse* response) { 240 return Echo(context, request, response); 241 } 242 Echo2(ServerContext * context,const EchoRequest * request,EchoResponse * response)243 Status Echo2(ServerContext* context, const EchoRequest* request, 244 EchoResponse* response) { 245 return Echo(context, request, response); 246 } 247 CheckClientInitialMetadata(ServerContext * context,const SimpleRequest *,SimpleResponse *)248 Status CheckClientInitialMetadata(ServerContext* context, 249 const SimpleRequest* /*request*/, 250 SimpleResponse* /*response*/) { 251 EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(), 252 kCheckClientInitialMetadataKey, 253 kCheckClientInitialMetadataVal), 254 1); 255 EXPECT_EQ(1u, 256 context->client_metadata().count(kCheckClientInitialMetadataKey)); 257 return Status::OK; 258 } 259 260 // Unimplemented is left unimplemented to test the returned error. 261 RequestStream(ServerContext * context,ServerReader<EchoRequest> * reader,EchoResponse * response)262 Status RequestStream(ServerContext* context, 263 ServerReader<EchoRequest>* reader, 264 EchoResponse* response) { 265 // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by 266 // the server by calling ServerContext::TryCancel() depending on the value: 267 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads 268 // any message from the client 269 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 270 // reading messages from the client 271 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads 272 // all the messages from the client 273 int server_try_cancel = internal::GetIntValueFromMetadata( 274 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 275 276 EchoRequest request; 277 response->set_message(""); 278 279 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 280 internal::ServerTryCancel(context); 281 return Status::CANCELLED; 282 } 283 284 std::thread* server_try_cancel_thd = nullptr; 285 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 286 server_try_cancel_thd = 287 new std::thread([context] { internal::ServerTryCancel(context); }); 288 } 289 290 int num_msgs_read = 0; 291 while (reader->Read(&request)) { 292 response->mutable_message()->append(request.message()); 293 } 294 gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read); 295 296 if (server_try_cancel_thd != nullptr) { 297 server_try_cancel_thd->join(); 298 delete server_try_cancel_thd; 299 return Status::CANCELLED; 300 } 301 302 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 303 internal::ServerTryCancel(context); 304 return Status::CANCELLED; 305 } 306 307 return Status::OK; 308 } 309 310 // Return 'kNumResponseStreamMsgs' messages. 311 // TODO(yangg) make it generic by adding a parameter into EchoRequest ResponseStream(ServerContext * context,const EchoRequest * request,ServerWriter<EchoResponse> * writer)312 Status ResponseStream(ServerContext* context, const EchoRequest* request, 313 ServerWriter<EchoResponse>* writer) { 314 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 315 // server by calling ServerContext::TryCancel() depending on the value: 316 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes 317 // any messages to the client 318 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 319 // writing messages to the client 320 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes 321 // all the messages to the client 322 int server_try_cancel = internal::GetIntValueFromMetadata( 323 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 324 325 int server_coalescing_api = internal::GetIntValueFromMetadata( 326 kServerUseCoalescingApi, context->client_metadata(), 0); 327 328 int server_responses_to_send = internal::GetIntValueFromMetadata( 329 kServerResponseStreamsToSend, context->client_metadata(), 330 kServerDefaultResponseStreamsToSend); 331 332 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 333 internal::ServerTryCancel(context); 334 return Status::CANCELLED; 335 } 336 337 EchoResponse response; 338 std::thread* server_try_cancel_thd = nullptr; 339 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 340 server_try_cancel_thd = 341 new std::thread([context] { internal::ServerTryCancel(context); }); 342 } 343 344 for (int i = 0; i < server_responses_to_send; i++) { 345 response.set_message(request->message() + std::to_string(i)); 346 if (i == server_responses_to_send - 1 && server_coalescing_api != 0) { 347 writer->WriteLast(response, WriteOptions()); 348 } else { 349 writer->Write(response); 350 } 351 } 352 353 if (server_try_cancel_thd != nullptr) { 354 server_try_cancel_thd->join(); 355 delete server_try_cancel_thd; 356 return Status::CANCELLED; 357 } 358 359 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 360 internal::ServerTryCancel(context); 361 return Status::CANCELLED; 362 } 363 364 return Status::OK; 365 } 366 BidiStream(ServerContext * context,ServerReaderWriter<EchoResponse,EchoRequest> * stream)367 Status BidiStream(ServerContext* context, 368 ServerReaderWriter<EchoResponse, EchoRequest>* stream) { 369 // If server_try_cancel is set in the metadata, the RPC is cancelled by the 370 // server by calling ServerContext::TryCancel() depending on the value: 371 // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/ 372 // writes any messages from/to the client 373 // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is 374 // reading/writing messages from/to the client 375 // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server 376 // reads/writes all messages from/to the client 377 int server_try_cancel = internal::GetIntValueFromMetadata( 378 kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL); 379 380 int client_try_cancel = static_cast<bool>(internal::GetIntValueFromMetadata( 381 kClientTryCancelRequest, context->client_metadata(), 0)); 382 383 EchoRequest request; 384 EchoResponse response; 385 386 if (server_try_cancel == CANCEL_BEFORE_PROCESSING) { 387 internal::ServerTryCancel(context); 388 return Status::CANCELLED; 389 } 390 391 std::thread* server_try_cancel_thd = nullptr; 392 if (server_try_cancel == CANCEL_DURING_PROCESSING) { 393 server_try_cancel_thd = 394 new std::thread([context] { internal::ServerTryCancel(context); }); 395 } 396 397 // kServerFinishAfterNReads suggests after how many reads, the server should 398 // write the last message and send status (coalesced using WriteLast) 399 int server_write_last = internal::GetIntValueFromMetadata( 400 kServerFinishAfterNReads, context->client_metadata(), 0); 401 402 int read_counts = 0; 403 while (stream->Read(&request)) { 404 read_counts++; 405 gpr_log(GPR_INFO, "recv msg %s", request.message().c_str()); 406 response.set_message(request.message()); 407 if (read_counts == server_write_last) { 408 stream->WriteLast(response, WriteOptions()); 409 break; 410 } else { 411 stream->Write(response); 412 } 413 } 414 415 if (client_try_cancel) { 416 EXPECT_TRUE(context->IsCancelled()); 417 } 418 419 if (server_try_cancel_thd != nullptr) { 420 server_try_cancel_thd->join(); 421 delete server_try_cancel_thd; 422 return Status::CANCELLED; 423 } 424 425 if (server_try_cancel == CANCEL_AFTER_PROCESSING) { 426 internal::ServerTryCancel(context); 427 return Status::CANCELLED; 428 } 429 430 return Status::OK; 431 } 432 433 // Unimplemented is left unimplemented to test the returned error. signal_client()434 bool signal_client() { 435 std::unique_lock<std::mutex> lock(mu_); 436 return signal_client_; 437 } ClientWaitUntilRpcStarted()438 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } SignalServerToContinue()439 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } RpcsWaitingForClientCancel()440 uint64_t RpcsWaitingForClientCancel() { 441 std::unique_lock<std::mutex> lock(mu_); 442 return rpcs_waiting_for_client_cancel_; 443 } 444 445 private: 446 bool signal_client_; 447 std::mutex mu_; 448 TestServiceSignaller signaller_; 449 std::unique_ptr<std::string> host_; 450 uint64_t rpcs_waiting_for_client_cancel_ = 0; 451 }; 452 453 class CallbackTestServiceImpl 454 : public ::grpc::testing::EchoTestService::CallbackService { 455 public: CallbackTestServiceImpl()456 CallbackTestServiceImpl() : signal_client_(false), host_() {} CallbackTestServiceImpl(const std::string & host)457 explicit CallbackTestServiceImpl(const std::string& host) 458 : signal_client_(false), host_(new std::string(host)) {} 459 460 ServerUnaryReactor* Echo(CallbackServerContext* context, 461 const EchoRequest* request, 462 EchoResponse* response) override; 463 464 ServerUnaryReactor* CheckClientInitialMetadata(CallbackServerContext* context, 465 const SimpleRequest*, 466 SimpleResponse*) override; 467 468 ServerReadReactor<EchoRequest>* RequestStream( 469 CallbackServerContext* context, EchoResponse* response) override; 470 471 ServerWriteReactor<EchoResponse>* ResponseStream( 472 CallbackServerContext* context, const EchoRequest* request) override; 473 474 ServerBidiReactor<EchoRequest, EchoResponse>* BidiStream( 475 CallbackServerContext* context) override; 476 477 // Unimplemented is left unimplemented to test the returned error. signal_client()478 bool signal_client() { 479 std::unique_lock<std::mutex> lock(mu_); 480 return signal_client_; 481 } ClientWaitUntilRpcStarted()482 void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); } SignalServerToContinue()483 void SignalServerToContinue() { signaller_.SignalServerToContinue(); } 484 485 private: 486 bool signal_client_; 487 std::mutex mu_; 488 TestServiceSignaller signaller_; 489 std::unique_ptr<std::string> host_; 490 }; 491 492 using TestServiceImpl = 493 TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>; 494 495 } // namespace testing 496 } // namespace grpc 497 498 #endif // GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H 499