1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <memory>
20 #include <vector>
21
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/generic/generic_stub.h>
26 #include <grpcpp/impl/codegen/proto_utils.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/server_interceptor.h>
31
32 #include "absl/memory/memory.h"
33 #include "absl/strings/match.h"
34
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/end2end/interceptors_util.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41
42 #include <gtest/gtest.h>
43
44 namespace grpc {
45 namespace testing {
46 namespace {
47
48 class LoggingInterceptor : public experimental::Interceptor {
49 public:
LoggingInterceptor(experimental::ServerRpcInfo * info)50 LoggingInterceptor(experimental::ServerRpcInfo* info) {
51 info_ = info;
52
53 // Check the method name and compare to the type
54 const char* method = info->method();
55 experimental::ServerRpcInfo::Type type = info->type();
56
57 // Check that we use one of our standard methods with expected type.
58 // Also allow the health checking service.
59 // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
60 // being tested (the GenericRpc test).
61 // The empty method is for the Unimplemented requests that arise
62 // when draining the CQ.
63 EXPECT_TRUE(
64 strstr(method, "/grpc.health") == method ||
65 (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
66 (type == experimental::ServerRpcInfo::Type::UNARY ||
67 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
68 (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
69 type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
70 (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
71 type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
72 (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
73 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
74 strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
75 (strcmp(method, "") == 0 &&
76 type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
77 }
78
Intercept(experimental::InterceptorBatchMethods * methods)79 void Intercept(experimental::InterceptorBatchMethods* methods) override {
80 if (methods->QueryInterceptionHookPoint(
81 experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
82 auto* map = methods->GetSendInitialMetadata();
83 // Got nothing better to do here for now
84 EXPECT_EQ(map->size(), static_cast<unsigned>(0));
85 }
86 if (methods->QueryInterceptionHookPoint(
87 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
88 EchoRequest req;
89 auto* buffer = methods->GetSerializedSendMessage();
90 auto copied_buffer = *buffer;
91 EXPECT_TRUE(
92 SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
93 .ok());
94 EXPECT_TRUE(req.message().find("Hello") == 0);
95 }
96 if (methods->QueryInterceptionHookPoint(
97 experimental::InterceptionHookPoints::PRE_SEND_STATUS)) {
98 auto* map = methods->GetSendTrailingMetadata();
99 bool found = false;
100 // Check that we received the metadata as an echo
101 for (const auto& pair : *map) {
102 found = absl::StartsWith(pair.first, "testkey") &&
103 absl::StartsWith(pair.second, "testvalue");
104 if (found) break;
105 }
106 EXPECT_EQ(found, true);
107 auto status = methods->GetSendStatus();
108 EXPECT_EQ(status.ok(), true);
109 }
110 if (methods->QueryInterceptionHookPoint(
111 experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
112 auto* map = methods->GetRecvInitialMetadata();
113 bool found = false;
114 // Check that we received the metadata as an echo
115 for (const auto& pair : *map) {
116 found = pair.first.find("testkey") == 0 &&
117 pair.second.find("testvalue") == 0;
118 if (found) break;
119 }
120 EXPECT_EQ(found, true);
121 }
122 if (methods->QueryInterceptionHookPoint(
123 experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
124 EchoResponse* resp =
125 static_cast<EchoResponse*>(methods->GetRecvMessage());
126 if (resp != nullptr) {
127 EXPECT_TRUE(resp->message().find("Hello") == 0);
128 }
129 }
130 if (methods->QueryInterceptionHookPoint(
131 experimental::InterceptionHookPoints::POST_RECV_CLOSE)) {
132 // Got nothing interesting to do here
133 }
134 methods->Proceed();
135 }
136
137 private:
138 experimental::ServerRpcInfo* info_;
139 };
140
141 class LoggingInterceptorFactory
142 : public experimental::ServerInterceptorFactoryInterface {
143 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)144 experimental::Interceptor* CreateServerInterceptor(
145 experimental::ServerRpcInfo* info) override {
146 return new LoggingInterceptor(info);
147 }
148 };
149
150 // Test if SendMessage function family works as expected for sync/callback apis
151 class SyncSendMessageTester : public experimental::Interceptor {
152 public:
SyncSendMessageTester(experimental::ServerRpcInfo *)153 SyncSendMessageTester(experimental::ServerRpcInfo* /*info*/) {}
154
Intercept(experimental::InterceptorBatchMethods * methods)155 void Intercept(experimental::InterceptorBatchMethods* methods) override {
156 if (methods->QueryInterceptionHookPoint(
157 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
158 string old_msg =
159 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
160 EXPECT_EQ(old_msg.find("Hello"), 0u);
161 new_msg_.set_message("World" + old_msg);
162 methods->ModifySendMessage(&new_msg_);
163 }
164 methods->Proceed();
165 }
166
167 private:
168 EchoRequest new_msg_;
169 };
170
171 class SyncSendMessageTesterFactory
172 : public experimental::ServerInterceptorFactoryInterface {
173 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)174 experimental::Interceptor* CreateServerInterceptor(
175 experimental::ServerRpcInfo* info) override {
176 return new SyncSendMessageTester(info);
177 }
178 };
179
180 // Test if SendMessage function family works as expected for sync/callback apis
181 class SyncSendMessageVerifier : public experimental::Interceptor {
182 public:
SyncSendMessageVerifier(experimental::ServerRpcInfo *)183 SyncSendMessageVerifier(experimental::ServerRpcInfo* /*info*/) {}
184
Intercept(experimental::InterceptorBatchMethods * methods)185 void Intercept(experimental::InterceptorBatchMethods* methods) override {
186 if (methods->QueryInterceptionHookPoint(
187 experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
188 // Make sure that the changes made in SyncSendMessageTester persisted
189 string old_msg =
190 static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
191 EXPECT_EQ(old_msg.find("World"), 0u);
192
193 // Remove the "World" part of the string that we added earlier
194 new_msg_.set_message(old_msg.erase(0, 5));
195 methods->ModifySendMessage(&new_msg_);
196
197 // LoggingInterceptor verifies that changes got reverted
198 }
199 methods->Proceed();
200 }
201
202 private:
203 EchoRequest new_msg_;
204 };
205
206 class SyncSendMessageVerifierFactory
207 : public experimental::ServerInterceptorFactoryInterface {
208 public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)209 experimental::Interceptor* CreateServerInterceptor(
210 experimental::ServerRpcInfo* info) override {
211 return new SyncSendMessageVerifier(info);
212 }
213 };
214
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)215 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
216 auto stub = grpc::testing::EchoTestService::NewStub(channel);
217 ClientContext ctx;
218 EchoRequest req;
219 EchoResponse resp;
220 ctx.AddMetadata("testkey", "testvalue");
221 auto stream = stub->BidiStream(&ctx);
222 for (auto i = 0; i < 10; i++) {
223 req.set_message("Hello" + std::to_string(i));
224 stream->Write(req);
225 stream->Read(&resp);
226 EXPECT_EQ(req.message(), resp.message());
227 }
228 ASSERT_TRUE(stream->WritesDone());
229 Status s = stream->Finish();
230 EXPECT_EQ(s.ok(), true);
231 }
232
233 class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
234 protected:
ServerInterceptorsEnd2endSyncUnaryTest()235 ServerInterceptorsEnd2endSyncUnaryTest() {
236 int port = grpc_pick_unused_port_or_die();
237
238 ServerBuilder builder;
239 server_address_ = "localhost:" + std::to_string(port);
240 builder.AddListeningPort(server_address_, InsecureServerCredentials());
241 builder.RegisterService(&service_);
242
243 std::vector<
244 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
245 creators;
246 creators.push_back(
247 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
248 new SyncSendMessageTesterFactory()));
249 creators.push_back(
250 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
251 new SyncSendMessageVerifierFactory()));
252 creators.push_back(
253 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
254 new LoggingInterceptorFactory()));
255 // Add 20 dummy interceptor factories and null interceptor factories
256 for (auto i = 0; i < 20; i++) {
257 creators.push_back(absl::make_unique<DummyInterceptorFactory>());
258 creators.push_back(absl::make_unique<NullInterceptorFactory>());
259 }
260 builder.experimental().SetInterceptorCreators(std::move(creators));
261 server_ = builder.BuildAndStart();
262 }
263 std::string server_address_;
264 TestServiceImpl service_;
265 std::unique_ptr<Server> server_;
266 };
267
// Issues one call through MakeCall against the fixture's server and checks
// the dummy-interceptor run counter afterwards.
TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
  DummyInterceptor::Reset();
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  MakeCall(channel);
  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
277
278 class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
279 protected:
ServerInterceptorsEnd2endSyncStreamingTest()280 ServerInterceptorsEnd2endSyncStreamingTest() {
281 int port = grpc_pick_unused_port_or_die();
282
283 ServerBuilder builder;
284 server_address_ = "localhost:" + std::to_string(port);
285 builder.AddListeningPort(server_address_, InsecureServerCredentials());
286 builder.RegisterService(&service_);
287
288 std::vector<
289 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
290 creators;
291 creators.push_back(
292 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
293 new SyncSendMessageTesterFactory()));
294 creators.push_back(
295 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
296 new SyncSendMessageVerifierFactory()));
297 creators.push_back(
298 std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
299 new LoggingInterceptorFactory()));
300 for (auto i = 0; i < 20; i++) {
301 creators.push_back(absl::make_unique<DummyInterceptorFactory>());
302 }
303 builder.experimental().SetInterceptorCreators(std::move(creators));
304 server_ = builder.BuildAndStart();
305 }
306 std::string server_address_;
307 EchoTestServiceStreamingImpl service_;
308 std::unique_ptr<Server> server_;
309 };
310
// Issues one call through MakeClientStreamingCall against the fixture's
// server and checks the dummy-interceptor run counter afterwards.
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
  DummyInterceptor::Reset();
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  MakeClientStreamingCall(channel);
  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
320
// Issues one call through MakeServerStreamingCall against the fixture's
// server and checks the dummy-interceptor run counter afterwards.
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
  DummyInterceptor::Reset();
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  MakeServerStreamingCall(channel);
  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
330
// Issues one call through MakeBidiStreamingCall against the fixture's server
// and checks the dummy-interceptor run counter afterwards.
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
  DummyInterceptor::Reset();
  auto channel =
      grpc::CreateChannel(server_address_, InsecureChannelCredentials());
  MakeBidiStreamingCall(channel);
  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
}
340
341 class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {};
342
// Runs one async unary Echo RPC against an async server that has a
// LoggingInterceptor followed by 20 dummy interceptors installed. Client and
// server share a single completion queue.
TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) {
  DummyInterceptor::Reset();
  const int port = grpc_pick_unused_port_or_die();
  const std::string server_address = "localhost:" + std::to_string(port);
  ServerBuilder builder;
  EchoTestService::AsyncService service;
  builder.AddListeningPort(server_address, InsecureServerCredentials());
  builder.RegisterService(&service);
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      creators;
  creators.reserve(1 + 20);
  creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
  for (int i = 0; i < 20; ++i) {
    creators.push_back(absl::make_unique<DummyInterceptorFactory>());
  }
  builder.experimental().SetInterceptorCreators(std::move(creators));
  auto cq = builder.AddCompletionQueue();
  auto server = builder.BuildAndStart();

  auto channel =
      grpc::CreateChannel(server_address, InsecureChannelCredentials());
  auto stub = grpc::testing::EchoTestService::NewStub(channel);

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);

  // Client: start the call.
  send_request.set_message("Hello");
  cli_ctx.AddMetadata("testkey", "testvalue");
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub->AsyncEcho(&cli_ctx, send_request, cq.get()));

  // Server: ask for the incoming Echo call (tag 2).
  service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(),
                      cq.get(), tag(2));

  // Client: ask for the response and final status (tag 4).
  response_reader->Finish(&recv_response, &recv_status, tag(4));

  Verifier().Expect(2, true).Verify(cq.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
  srv_ctx.AddTrailingMetadata("testkey", "testvalue");

  // Server: echo the request back with an OK status (tag 3).
  send_response.set_message(recv_request.message());
  response_writer.Finish(send_response, Status::OK, tag(3));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
  EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
                            "testvalue"));

  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);

  // Shut down, then drain whatever is still queued on the completion queue.
  server->Shutdown();
  cq->Shutdown();
  void* ignored_tag;
  bool ignored_ok;
  while (cq->Next(&ignored_tag, &ignored_ok)) {
  }
  grpc_recycle_unused_port(port);
}
414
// Runs one async BidiStream RPC (one message in each direction) against an
// async server that has a LoggingInterceptor followed by 20 dummy
// interceptors installed. Client and server share a single completion queue.
TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
  DummyInterceptor::Reset();
  const int port = grpc_pick_unused_port_or_die();
  const std::string server_address = "localhost:" + std::to_string(port);
  ServerBuilder builder;
  EchoTestService::AsyncService service;
  builder.AddListeningPort(server_address, InsecureServerCredentials());
  builder.RegisterService(&service);
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      creators;
  creators.reserve(1 + 20);
  creators.push_back(absl::make_unique<LoggingInterceptorFactory>());
  for (int i = 0; i < 20; ++i) {
    creators.push_back(absl::make_unique<DummyInterceptorFactory>());
  }
  builder.experimental().SetInterceptorCreators(std::move(creators));
  auto cq = builder.AddCompletionQueue();
  auto server = builder.BuildAndStart();

  auto channel =
      grpc::CreateChannel(server_address, InsecureChannelCredentials());
  auto stub = grpc::testing::EchoTestService::NewStub(channel);

  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  ServerContext srv_ctx;
  grpc::ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);

  // Client starts the stream (tag 1); server asks for it (tag 2).
  send_request.set_message("Hello");
  cli_ctx.AddMetadata("testkey", "testvalue");
  std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
      cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1)));

  service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2));

  Verifier().Expect(1, true).Expect(2, true).Verify(cq.get());

  EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
  srv_ctx.AddTrailingMetadata("testkey", "testvalue");

  // Client -> server message.
  cli_stream->Write(send_request, tag(3));
  srv_stream.Read(&recv_request, tag(4));
  Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
  EXPECT_EQ(send_request.message(), recv_request.message());

  // Server -> client message.
  send_response.set_message(recv_request.message());
  srv_stream.Write(send_response, tag(5));
  cli_stream->Read(&recv_response, tag(6));
  Verifier().Expect(5, true).Expect(6, true).Verify(cq.get());
  EXPECT_EQ(send_response.message(), recv_response.message());

  // Client half-closes; the server's next read is expected to complete
  // with ok == false.
  cli_stream->WritesDone(tag(7));
  srv_stream.Read(&recv_request, tag(8));
  Verifier().Expect(7, true).Expect(8, false).Verify(cq.get());

  srv_stream.Finish(Status::OK, tag(9));
  cli_stream->Finish(&recv_status, tag(10));
  Verifier().Expect(9, true).Expect(10, true).Verify(cq.get());

  EXPECT_TRUE(recv_status.ok());
  EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
                            "testvalue"));

  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);

  // Shut down, then drain whatever is still queued on the completion queue.
  server->Shutdown();
  cq->Shutdown();
  void* ignored_tag;
  bool ignored_ok;
  while (cq->Next(&ignored_tag, &ignored_ok)) {
  }
  grpc_recycle_unused_port(port);
}
496
// Runs one RPC through a GenericStub against an AsyncGenericService server
// that has 20 dummy interceptors installed. The client and the server use
// separate completion queues, and both queues are shut down while a tag is
// still outstanding.
TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
  DummyInterceptor::Reset();
  const int port = grpc_pick_unused_port_or_die();
  const std::string server_address = "localhost:" + std::to_string(port);
  ServerBuilder builder;
  AsyncGenericService service;
  builder.AddListeningPort(server_address, InsecureServerCredentials());
  builder.RegisterAsyncGenericService(&service);
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      creators;
  creators.reserve(20);
  for (int i = 0; i < 20; ++i) {
    creators.push_back(absl::make_unique<DummyInterceptorFactory>());
  }
  builder.experimental().SetInterceptorCreators(std::move(creators));
  auto srv_cq = builder.AddCompletionQueue();
  CompletionQueue cli_cq;
  auto server = builder.BuildAndStart();

  auto channel =
      grpc::CreateChannel(server_address, InsecureChannelCredentials());
  GenericStub generic_stub(channel);

  const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
  EchoRequest send_request;
  EchoRequest recv_request;
  EchoResponse send_response;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  GenericServerContext srv_ctx;
  GenericServerAsyncReaderWriter stream(&srv_ctx);

  send_request.set_message("Hello");
  cli_ctx.AddMetadata("testkey", "testvalue");

  // Wait for the server-side tag 4 on a separate thread while this thread
  // drives the client side.
  CompletionQueue* cq = srv_cq.get();
  std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
  std::unique_ptr<GenericClientAsyncReaderWriter> call =
      generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
  call->StartCall(tag(1));
  Verifier().Expect(1, true).Verify(&cli_cq);
  std::unique_ptr<ByteBuffer> send_buffer =
      SerializeToByteBuffer(&send_request);
  call->Write(*send_buffer, tag(2));
  // Send ByteBuffer can be destroyed after calling Write.
  send_buffer.reset();
  Verifier().Expect(2, true).Verify(&cli_cq);
  call->WritesDone(tag(3));
  Verifier().Expect(3, true).Verify(&cli_cq);

  service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));

  request_call.join();
  EXPECT_EQ(kMethodName, srv_ctx.method());
  EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
  srv_ctx.AddTrailingMetadata("testkey", "testvalue");

  // Server: read the request.
  ByteBuffer recv_buffer;
  stream.Read(&recv_buffer, tag(5));
  Verifier().Expect(5, true).Verify(srv_cq.get());
  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
  EXPECT_EQ(send_request.message(), recv_request.message());

  // Server: echo it back.
  send_response.set_message(recv_request.message());
  send_buffer = SerializeToByteBuffer(&send_response);
  stream.Write(*send_buffer, tag(6));
  send_buffer.reset();
  Verifier().Expect(6, true).Verify(srv_cq.get());

  stream.Finish(Status::OK, tag(7));
  // Shutdown srv_cq before we try to get the tag back, to verify that the
  // interception API handles completion queue shutdowns that take place before
  // all the tags are returned
  srv_cq->Shutdown();
  Verifier().Expect(7, true).Verify(srv_cq.get());

  // Client: read the response.
  recv_buffer.Clear();
  call->Read(&recv_buffer, tag(8));
  Verifier().Expect(8, true).Verify(&cli_cq);
  EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));

  // Client: finish, shutting the client queue down before tag 9 is fetched.
  call->Finish(&recv_status, tag(9));
  cli_cq.Shutdown();
  Verifier().Expect(9, true).Verify(&cli_cq);

  EXPECT_EQ(send_response.message(), recv_response.message());
  EXPECT_TRUE(recv_status.ok());
  EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
                            "testvalue"));

  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);

  // Both queues were already shut down above; drain what is left on them.
  server->Shutdown();
  void* ignored_tag;
  bool ignored_ok;
  while (cli_cq.Next(&ignored_tag, &ignored_ok)) {
  }
  while (srv_cq->Next(&ignored_tag, &ignored_ok)) {
  }
  grpc_recycle_unused_port(port);
}
603
// Calls UnimplementedEchoService::Unimplemented asynchronously against a
// server that has no service registered (only a completion queue and 20
// dummy interceptors) and expects an UNIMPLEMENTED status with an empty
// message.
TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
  DummyInterceptor::Reset();
  const int port = grpc_pick_unused_port_or_die();
  const std::string server_address = "localhost:" + std::to_string(port);
  ServerBuilder builder;
  builder.AddListeningPort(server_address, InsecureServerCredentials());
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      creators;
  creators.reserve(20);
  for (int i = 0; i < 20; ++i) {
    creators.push_back(absl::make_unique<DummyInterceptorFactory>());
  }
  builder.experimental().SetInterceptorCreators(std::move(creators));
  auto cq = builder.AddCompletionQueue();
  auto server = builder.BuildAndStart();

  std::shared_ptr<Channel> channel =
      grpc::CreateChannel(server_address, InsecureChannelCredentials());
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub =
      grpc::testing::UnimplementedEchoService::NewStub(channel);
  EchoRequest send_request;
  EchoResponse recv_response;
  Status recv_status;

  ClientContext cli_ctx;
  send_request.set_message("Hello");
  std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
      stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));

  response_reader->Finish(&recv_response, &recv_status, tag(4));
  Verifier().Expect(4, true).Verify(cq.get());

  EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
  EXPECT_EQ("", recv_status.error_message());

  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);

  // Shut down, then drain whatever is still queued on the completion queue.
  server->Shutdown();
  cq->Shutdown();
  void* ignored_tag;
  bool ignored_ok;
  while (cq->Next(&ignored_tag, &ignored_ok)) {
  }
  grpc_recycle_unused_port(port);
}
651
652 class ServerInterceptorsSyncUnimplementedEnd2endTest : public ::testing::Test {
653 };
654
// Calls UnimplementedEchoService::Unimplemented synchronously against a
// server that hosts only TestServiceImpl (plus 20 dummy interceptors) and
// expects an UNIMPLEMENTED status with an empty message.
TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) {
  DummyInterceptor::Reset();
  const int port = grpc_pick_unused_port_or_die();
  const std::string server_address = "localhost:" + std::to_string(port);
  ServerBuilder builder;
  TestServiceImpl service;
  builder.RegisterService(&service);
  builder.AddListeningPort(server_address, InsecureServerCredentials());
  std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
      creators;
  creators.reserve(20);
  for (int i = 0; i < 20; ++i) {
    creators.push_back(absl::make_unique<DummyInterceptorFactory>());
  }
  builder.experimental().SetInterceptorCreators(std::move(creators));
  auto server = builder.BuildAndStart();

  std::shared_ptr<Channel> channel =
      grpc::CreateChannel(server_address, InsecureChannelCredentials());
  std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub =
      grpc::testing::UnimplementedEchoService::NewStub(channel);
  EchoRequest send_request;
  EchoResponse recv_response;

  ClientContext cli_ctx;
  send_request.set_message("Hello");
  const Status recv_status =
      stub->Unimplemented(&cli_ctx, send_request, &recv_response);

  EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
  EXPECT_EQ("", recv_status.error_message());

  // Make sure all 20 dummy interceptors were run
  EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);

  server->Shutdown();
  grpc_recycle_unused_port(port);
}
694
695 } // namespace
696 } // namespace testing
697 } // namespace grpc
698
main(int argc,char ** argv)699 int main(int argc, char** argv) {
700 grpc::testing::TestEnvironment env(argc, argv);
701 ::testing::InitGoogleTest(&argc, argv);
702 return RUN_ALL_TESTS();
703 }
704