1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 #include <vector>
21 
22 #include <grpcpp/channel.h>
23 #include <grpcpp/client_context.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/generic/generic_stub.h>
26 #include <grpcpp/impl/codegen/proto_utils.h>
27 #include <grpcpp/server.h>
28 #include <grpcpp/server_builder.h>
29 #include <grpcpp/server_context.h>
30 #include <grpcpp/support/server_interceptor.h>
31 
32 #include "absl/memory/memory.h"
33 #include "absl/strings/match.h"
34 
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/end2end/interceptors_util.h"
39 #include "test/cpp/end2end/test_service_impl.h"
40 #include "test/cpp/util/byte_buffer_proto_helper.h"
41 
42 #include <gtest/gtest.h>
43 
44 namespace grpc {
45 namespace testing {
46 namespace {
47 
48 class LoggingInterceptor : public experimental::Interceptor {
49  public:
LoggingInterceptor(experimental::ServerRpcInfo * info)50   LoggingInterceptor(experimental::ServerRpcInfo* info) {
51     info_ = info;
52 
53     // Check the method name and compare to the type
54     const char* method = info->method();
55     experimental::ServerRpcInfo::Type type = info->type();
56 
57     // Check that we use one of our standard methods with expected type.
58     // Also allow the health checking service.
59     // We accept BIDI_STREAMING for Echo in case it's an AsyncGenericService
60     // being tested (the GenericRpc test).
61     // The empty method is for the Unimplemented requests that arise
62     // when draining the CQ.
63     EXPECT_TRUE(
64         strstr(method, "/grpc.health") == method ||
65         (strcmp(method, "/grpc.testing.EchoTestService/Echo") == 0 &&
66          (type == experimental::ServerRpcInfo::Type::UNARY ||
67           type == experimental::ServerRpcInfo::Type::BIDI_STREAMING)) ||
68         (strcmp(method, "/grpc.testing.EchoTestService/RequestStream") == 0 &&
69          type == experimental::ServerRpcInfo::Type::CLIENT_STREAMING) ||
70         (strcmp(method, "/grpc.testing.EchoTestService/ResponseStream") == 0 &&
71          type == experimental::ServerRpcInfo::Type::SERVER_STREAMING) ||
72         (strcmp(method, "/grpc.testing.EchoTestService/BidiStream") == 0 &&
73          type == experimental::ServerRpcInfo::Type::BIDI_STREAMING) ||
74         strcmp(method, "/grpc.testing.EchoTestService/Unimplemented") == 0 ||
75         (strcmp(method, "") == 0 &&
76          type == experimental::ServerRpcInfo::Type::BIDI_STREAMING));
77   }
78 
Intercept(experimental::InterceptorBatchMethods * methods)79   void Intercept(experimental::InterceptorBatchMethods* methods) override {
80     if (methods->QueryInterceptionHookPoint(
81             experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) {
82       auto* map = methods->GetSendInitialMetadata();
83       // Got nothing better to do here for now
84       EXPECT_EQ(map->size(), static_cast<unsigned>(0));
85     }
86     if (methods->QueryInterceptionHookPoint(
87             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
88       EchoRequest req;
89       auto* buffer = methods->GetSerializedSendMessage();
90       auto copied_buffer = *buffer;
91       EXPECT_TRUE(
92           SerializationTraits<EchoRequest>::Deserialize(&copied_buffer, &req)
93               .ok());
94       EXPECT_TRUE(req.message().find("Hello") == 0);
95     }
96     if (methods->QueryInterceptionHookPoint(
97             experimental::InterceptionHookPoints::PRE_SEND_STATUS)) {
98       auto* map = methods->GetSendTrailingMetadata();
99       bool found = false;
100       // Check that we received the metadata as an echo
101       for (const auto& pair : *map) {
102         found = absl::StartsWith(pair.first, "testkey") &&
103                 absl::StartsWith(pair.second, "testvalue");
104         if (found) break;
105       }
106       EXPECT_EQ(found, true);
107       auto status = methods->GetSendStatus();
108       EXPECT_EQ(status.ok(), true);
109     }
110     if (methods->QueryInterceptionHookPoint(
111             experimental::InterceptionHookPoints::POST_RECV_INITIAL_METADATA)) {
112       auto* map = methods->GetRecvInitialMetadata();
113       bool found = false;
114       // Check that we received the metadata as an echo
115       for (const auto& pair : *map) {
116         found = pair.first.find("testkey") == 0 &&
117                 pair.second.find("testvalue") == 0;
118         if (found) break;
119       }
120       EXPECT_EQ(found, true);
121     }
122     if (methods->QueryInterceptionHookPoint(
123             experimental::InterceptionHookPoints::POST_RECV_MESSAGE)) {
124       EchoResponse* resp =
125           static_cast<EchoResponse*>(methods->GetRecvMessage());
126       if (resp != nullptr) {
127         EXPECT_TRUE(resp->message().find("Hello") == 0);
128       }
129     }
130     if (methods->QueryInterceptionHookPoint(
131             experimental::InterceptionHookPoints::POST_RECV_CLOSE)) {
132       // Got nothing interesting to do here
133     }
134     methods->Proceed();
135   }
136 
137  private:
138   experimental::ServerRpcInfo* info_;
139 };
140 
141 class LoggingInterceptorFactory
142     : public experimental::ServerInterceptorFactoryInterface {
143  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)144   experimental::Interceptor* CreateServerInterceptor(
145       experimental::ServerRpcInfo* info) override {
146     return new LoggingInterceptor(info);
147   }
148 };
149 
150 // Test if SendMessage function family works as expected for sync/callback apis
151 class SyncSendMessageTester : public experimental::Interceptor {
152  public:
SyncSendMessageTester(experimental::ServerRpcInfo *)153   SyncSendMessageTester(experimental::ServerRpcInfo* /*info*/) {}
154 
Intercept(experimental::InterceptorBatchMethods * methods)155   void Intercept(experimental::InterceptorBatchMethods* methods) override {
156     if (methods->QueryInterceptionHookPoint(
157             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
158       string old_msg =
159           static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
160       EXPECT_EQ(old_msg.find("Hello"), 0u);
161       new_msg_.set_message("World" + old_msg);
162       methods->ModifySendMessage(&new_msg_);
163     }
164     methods->Proceed();
165   }
166 
167  private:
168   EchoRequest new_msg_;
169 };
170 
171 class SyncSendMessageTesterFactory
172     : public experimental::ServerInterceptorFactoryInterface {
173  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)174   experimental::Interceptor* CreateServerInterceptor(
175       experimental::ServerRpcInfo* info) override {
176     return new SyncSendMessageTester(info);
177   }
178 };
179 
180 // Test if SendMessage function family works as expected for sync/callback apis
181 class SyncSendMessageVerifier : public experimental::Interceptor {
182  public:
SyncSendMessageVerifier(experimental::ServerRpcInfo *)183   SyncSendMessageVerifier(experimental::ServerRpcInfo* /*info*/) {}
184 
Intercept(experimental::InterceptorBatchMethods * methods)185   void Intercept(experimental::InterceptorBatchMethods* methods) override {
186     if (methods->QueryInterceptionHookPoint(
187             experimental::InterceptionHookPoints::PRE_SEND_MESSAGE)) {
188       // Make sure that the changes made in SyncSendMessageTester persisted
189       string old_msg =
190           static_cast<const EchoRequest*>(methods->GetSendMessage())->message();
191       EXPECT_EQ(old_msg.find("World"), 0u);
192 
193       // Remove the "World" part of the string that we added earlier
194       new_msg_.set_message(old_msg.erase(0, 5));
195       methods->ModifySendMessage(&new_msg_);
196 
197       // LoggingInterceptor verifies that changes got reverted
198     }
199     methods->Proceed();
200   }
201 
202  private:
203   EchoRequest new_msg_;
204 };
205 
206 class SyncSendMessageVerifierFactory
207     : public experimental::ServerInterceptorFactoryInterface {
208  public:
CreateServerInterceptor(experimental::ServerRpcInfo * info)209   experimental::Interceptor* CreateServerInterceptor(
210       experimental::ServerRpcInfo* info) override {
211     return new SyncSendMessageVerifier(info);
212   }
213 };
214 
MakeBidiStreamingCall(const std::shared_ptr<Channel> & channel)215 void MakeBidiStreamingCall(const std::shared_ptr<Channel>& channel) {
216   auto stub = grpc::testing::EchoTestService::NewStub(channel);
217   ClientContext ctx;
218   EchoRequest req;
219   EchoResponse resp;
220   ctx.AddMetadata("testkey", "testvalue");
221   auto stream = stub->BidiStream(&ctx);
222   for (auto i = 0; i < 10; i++) {
223     req.set_message("Hello" + std::to_string(i));
224     stream->Write(req);
225     stream->Read(&resp);
226     EXPECT_EQ(req.message(), resp.message());
227   }
228   ASSERT_TRUE(stream->WritesDone());
229   Status s = stream->Finish();
230   EXPECT_EQ(s.ok(), true);
231 }
232 
233 class ServerInterceptorsEnd2endSyncUnaryTest : public ::testing::Test {
234  protected:
ServerInterceptorsEnd2endSyncUnaryTest()235   ServerInterceptorsEnd2endSyncUnaryTest() {
236     int port = grpc_pick_unused_port_or_die();
237 
238     ServerBuilder builder;
239     server_address_ = "localhost:" + std::to_string(port);
240     builder.AddListeningPort(server_address_, InsecureServerCredentials());
241     builder.RegisterService(&service_);
242 
243     std::vector<
244         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
245         creators;
246     creators.push_back(
247         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
248             new SyncSendMessageTesterFactory()));
249     creators.push_back(
250         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
251             new SyncSendMessageVerifierFactory()));
252     creators.push_back(
253         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
254             new LoggingInterceptorFactory()));
255     // Add 20 dummy interceptor factories and null interceptor factories
256     for (auto i = 0; i < 20; i++) {
257       creators.push_back(absl::make_unique<DummyInterceptorFactory>());
258       creators.push_back(absl::make_unique<NullInterceptorFactory>());
259     }
260     builder.experimental().SetInterceptorCreators(std::move(creators));
261     server_ = builder.BuildAndStart();
262   }
263   std::string server_address_;
264   TestServiceImpl service_;
265   std::unique_ptr<Server> server_;
266 };
267 
TEST_F(ServerInterceptorsEnd2endSyncUnaryTest,UnaryTest)268 TEST_F(ServerInterceptorsEnd2endSyncUnaryTest, UnaryTest) {
269   ChannelArguments args;
270   DummyInterceptor::Reset();
271   auto channel =
272       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
273   MakeCall(channel);
274   // Make sure all 20 dummy interceptors were run
275   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
276 }
277 
278 class ServerInterceptorsEnd2endSyncStreamingTest : public ::testing::Test {
279  protected:
ServerInterceptorsEnd2endSyncStreamingTest()280   ServerInterceptorsEnd2endSyncStreamingTest() {
281     int port = grpc_pick_unused_port_or_die();
282 
283     ServerBuilder builder;
284     server_address_ = "localhost:" + std::to_string(port);
285     builder.AddListeningPort(server_address_, InsecureServerCredentials());
286     builder.RegisterService(&service_);
287 
288     std::vector<
289         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
290         creators;
291     creators.push_back(
292         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
293             new SyncSendMessageTesterFactory()));
294     creators.push_back(
295         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
296             new SyncSendMessageVerifierFactory()));
297     creators.push_back(
298         std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
299             new LoggingInterceptorFactory()));
300     for (auto i = 0; i < 20; i++) {
301       creators.push_back(absl::make_unique<DummyInterceptorFactory>());
302     }
303     builder.experimental().SetInterceptorCreators(std::move(creators));
304     server_ = builder.BuildAndStart();
305   }
306   std::string server_address_;
307   EchoTestServiceStreamingImpl service_;
308   std::unique_ptr<Server> server_;
309 };
310 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ClientStreamingTest)311 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ClientStreamingTest) {
312   ChannelArguments args;
313   DummyInterceptor::Reset();
314   auto channel =
315       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
316   MakeClientStreamingCall(channel);
317   // Make sure all 20 dummy interceptors were run
318   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
319 }
320 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,ServerStreamingTest)321 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, ServerStreamingTest) {
322   ChannelArguments args;
323   DummyInterceptor::Reset();
324   auto channel =
325       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
326   MakeServerStreamingCall(channel);
327   // Make sure all 20 dummy interceptors were run
328   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
329 }
330 
TEST_F(ServerInterceptorsEnd2endSyncStreamingTest,BidiStreamingTest)331 TEST_F(ServerInterceptorsEnd2endSyncStreamingTest, BidiStreamingTest) {
332   ChannelArguments args;
333   DummyInterceptor::Reset();
334   auto channel =
335       grpc::CreateChannel(server_address_, InsecureChannelCredentials());
336   MakeBidiStreamingCall(channel);
337   // Make sure all 20 dummy interceptors were run
338   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
339 }
340 
341 class ServerInterceptorsAsyncEnd2endTest : public ::testing::Test {};
342 
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnaryTest)343 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnaryTest) {
344   DummyInterceptor::Reset();
345   int port = grpc_pick_unused_port_or_die();
346   string server_address = "localhost:" + std::to_string(port);
347   ServerBuilder builder;
348   EchoTestService::AsyncService service;
349   builder.AddListeningPort(server_address, InsecureServerCredentials());
350   builder.RegisterService(&service);
351   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
352       creators;
353   creators.push_back(
354       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
355           new LoggingInterceptorFactory()));
356   for (auto i = 0; i < 20; i++) {
357     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
358   }
359   builder.experimental().SetInterceptorCreators(std::move(creators));
360   auto cq = builder.AddCompletionQueue();
361   auto server = builder.BuildAndStart();
362 
363   ChannelArguments args;
364   auto channel =
365       grpc::CreateChannel(server_address, InsecureChannelCredentials());
366   auto stub = grpc::testing::EchoTestService::NewStub(channel);
367 
368   EchoRequest send_request;
369   EchoRequest recv_request;
370   EchoResponse send_response;
371   EchoResponse recv_response;
372   Status recv_status;
373 
374   ClientContext cli_ctx;
375   ServerContext srv_ctx;
376   grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
377 
378   send_request.set_message("Hello");
379   cli_ctx.AddMetadata("testkey", "testvalue");
380   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
381       stub->AsyncEcho(&cli_ctx, send_request, cq.get()));
382 
383   service.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq.get(),
384                       cq.get(), tag(2));
385 
386   response_reader->Finish(&recv_response, &recv_status, tag(4));
387 
388   Verifier().Expect(2, true).Verify(cq.get());
389   EXPECT_EQ(send_request.message(), recv_request.message());
390 
391   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
392   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
393 
394   send_response.set_message(recv_request.message());
395   response_writer.Finish(send_response, Status::OK, tag(3));
396   Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
397 
398   EXPECT_EQ(send_response.message(), recv_response.message());
399   EXPECT_TRUE(recv_status.ok());
400   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
401                             "testvalue"));
402 
403   // Make sure all 20 dummy interceptors were run
404   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
405 
406   server->Shutdown();
407   cq->Shutdown();
408   void* ignored_tag;
409   bool ignored_ok;
410   while (cq->Next(&ignored_tag, &ignored_ok)) {
411   }
412   grpc_recycle_unused_port(port);
413 }
414 
TEST_F(ServerInterceptorsAsyncEnd2endTest,BidiStreamingTest)415 TEST_F(ServerInterceptorsAsyncEnd2endTest, BidiStreamingTest) {
416   DummyInterceptor::Reset();
417   int port = grpc_pick_unused_port_or_die();
418   string server_address = "localhost:" + std::to_string(port);
419   ServerBuilder builder;
420   EchoTestService::AsyncService service;
421   builder.AddListeningPort(server_address, InsecureServerCredentials());
422   builder.RegisterService(&service);
423   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
424       creators;
425   creators.push_back(
426       std::unique_ptr<experimental::ServerInterceptorFactoryInterface>(
427           new LoggingInterceptorFactory()));
428   for (auto i = 0; i < 20; i++) {
429     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
430   }
431   builder.experimental().SetInterceptorCreators(std::move(creators));
432   auto cq = builder.AddCompletionQueue();
433   auto server = builder.BuildAndStart();
434 
435   ChannelArguments args;
436   auto channel =
437       grpc::CreateChannel(server_address, InsecureChannelCredentials());
438   auto stub = grpc::testing::EchoTestService::NewStub(channel);
439 
440   EchoRequest send_request;
441   EchoRequest recv_request;
442   EchoResponse send_response;
443   EchoResponse recv_response;
444   Status recv_status;
445 
446   ClientContext cli_ctx;
447   ServerContext srv_ctx;
448   grpc::ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
449 
450   send_request.set_message("Hello");
451   cli_ctx.AddMetadata("testkey", "testvalue");
452   std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse>>
453       cli_stream(stub->AsyncBidiStream(&cli_ctx, cq.get(), tag(1)));
454 
455   service.RequestBidiStream(&srv_ctx, &srv_stream, cq.get(), cq.get(), tag(2));
456 
457   Verifier().Expect(1, true).Expect(2, true).Verify(cq.get());
458 
459   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
460   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
461 
462   cli_stream->Write(send_request, tag(3));
463   srv_stream.Read(&recv_request, tag(4));
464   Verifier().Expect(3, true).Expect(4, true).Verify(cq.get());
465   EXPECT_EQ(send_request.message(), recv_request.message());
466 
467   send_response.set_message(recv_request.message());
468   srv_stream.Write(send_response, tag(5));
469   cli_stream->Read(&recv_response, tag(6));
470   Verifier().Expect(5, true).Expect(6, true).Verify(cq.get());
471   EXPECT_EQ(send_response.message(), recv_response.message());
472 
473   cli_stream->WritesDone(tag(7));
474   srv_stream.Read(&recv_request, tag(8));
475   Verifier().Expect(7, true).Expect(8, false).Verify(cq.get());
476 
477   srv_stream.Finish(Status::OK, tag(9));
478   cli_stream->Finish(&recv_status, tag(10));
479   Verifier().Expect(9, true).Expect(10, true).Verify(cq.get());
480 
481   EXPECT_TRUE(recv_status.ok());
482   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
483                             "testvalue"));
484 
485   // Make sure all 20 dummy interceptors were run
486   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
487 
488   server->Shutdown();
489   cq->Shutdown();
490   void* ignored_tag;
491   bool ignored_ok;
492   while (cq->Next(&ignored_tag, &ignored_ok)) {
493   }
494   grpc_recycle_unused_port(port);
495 }
496 
TEST_F(ServerInterceptorsAsyncEnd2endTest,GenericRPCTest)497 TEST_F(ServerInterceptorsAsyncEnd2endTest, GenericRPCTest) {
498   DummyInterceptor::Reset();
499   int port = grpc_pick_unused_port_or_die();
500   string server_address = "localhost:" + std::to_string(port);
501   ServerBuilder builder;
502   AsyncGenericService service;
503   builder.AddListeningPort(server_address, InsecureServerCredentials());
504   builder.RegisterAsyncGenericService(&service);
505   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
506       creators;
507   creators.reserve(20);
508   for (auto i = 0; i < 20; i++) {
509     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
510   }
511   builder.experimental().SetInterceptorCreators(std::move(creators));
512   auto srv_cq = builder.AddCompletionQueue();
513   CompletionQueue cli_cq;
514   auto server = builder.BuildAndStart();
515 
516   ChannelArguments args;
517   auto channel =
518       grpc::CreateChannel(server_address, InsecureChannelCredentials());
519   GenericStub generic_stub(channel);
520 
521   const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
522   EchoRequest send_request;
523   EchoRequest recv_request;
524   EchoResponse send_response;
525   EchoResponse recv_response;
526   Status recv_status;
527 
528   ClientContext cli_ctx;
529   GenericServerContext srv_ctx;
530   GenericServerAsyncReaderWriter stream(&srv_ctx);
531 
532   // The string needs to be long enough to test heap-based slice.
533   send_request.set_message("Hello");
534   cli_ctx.AddMetadata("testkey", "testvalue");
535 
536   CompletionQueue* cq = srv_cq.get();
537   std::thread request_call([cq]() { Verifier().Expect(4, true).Verify(cq); });
538   std::unique_ptr<GenericClientAsyncReaderWriter> call =
539       generic_stub.PrepareCall(&cli_ctx, kMethodName, &cli_cq);
540   call->StartCall(tag(1));
541   Verifier().Expect(1, true).Verify(&cli_cq);
542   std::unique_ptr<ByteBuffer> send_buffer =
543       SerializeToByteBuffer(&send_request);
544   call->Write(*send_buffer, tag(2));
545   // Send ByteBuffer can be destroyed after calling Write.
546   send_buffer.reset();
547   Verifier().Expect(2, true).Verify(&cli_cq);
548   call->WritesDone(tag(3));
549   Verifier().Expect(3, true).Verify(&cli_cq);
550 
551   service.RequestCall(&srv_ctx, &stream, srv_cq.get(), srv_cq.get(), tag(4));
552 
553   request_call.join();
554   EXPECT_EQ(kMethodName, srv_ctx.method());
555   EXPECT_TRUE(CheckMetadata(srv_ctx.client_metadata(), "testkey", "testvalue"));
556   srv_ctx.AddTrailingMetadata("testkey", "testvalue");
557 
558   ByteBuffer recv_buffer;
559   stream.Read(&recv_buffer, tag(5));
560   Verifier().Expect(5, true).Verify(srv_cq.get());
561   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
562   EXPECT_EQ(send_request.message(), recv_request.message());
563 
564   send_response.set_message(recv_request.message());
565   send_buffer = SerializeToByteBuffer(&send_response);
566   stream.Write(*send_buffer, tag(6));
567   send_buffer.reset();
568   Verifier().Expect(6, true).Verify(srv_cq.get());
569 
570   stream.Finish(Status::OK, tag(7));
571   // Shutdown srv_cq before we try to get the tag back, to verify that the
572   // interception API handles completion queue shutdowns that take place before
573   // all the tags are returned
574   srv_cq->Shutdown();
575   Verifier().Expect(7, true).Verify(srv_cq.get());
576 
577   recv_buffer.Clear();
578   call->Read(&recv_buffer, tag(8));
579   Verifier().Expect(8, true).Verify(&cli_cq);
580   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
581 
582   call->Finish(&recv_status, tag(9));
583   cli_cq.Shutdown();
584   Verifier().Expect(9, true).Verify(&cli_cq);
585 
586   EXPECT_EQ(send_response.message(), recv_response.message());
587   EXPECT_TRUE(recv_status.ok());
588   EXPECT_TRUE(CheckMetadata(cli_ctx.GetServerTrailingMetadata(), "testkey",
589                             "testvalue"));
590 
591   // Make sure all 20 dummy interceptors were run
592   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
593 
594   server->Shutdown();
595   void* ignored_tag;
596   bool ignored_ok;
597   while (cli_cq.Next(&ignored_tag, &ignored_ok)) {
598   }
599   while (srv_cq->Next(&ignored_tag, &ignored_ok)) {
600   }
601   grpc_recycle_unused_port(port);
602 }
603 
TEST_F(ServerInterceptorsAsyncEnd2endTest,UnimplementedRpcTest)604 TEST_F(ServerInterceptorsAsyncEnd2endTest, UnimplementedRpcTest) {
605   DummyInterceptor::Reset();
606   int port = grpc_pick_unused_port_or_die();
607   string server_address = "localhost:" + std::to_string(port);
608   ServerBuilder builder;
609   builder.AddListeningPort(server_address, InsecureServerCredentials());
610   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
611       creators;
612   creators.reserve(20);
613   for (auto i = 0; i < 20; i++) {
614     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
615   }
616   builder.experimental().SetInterceptorCreators(std::move(creators));
617   auto cq = builder.AddCompletionQueue();
618   auto server = builder.BuildAndStart();
619 
620   ChannelArguments args;
621   std::shared_ptr<Channel> channel =
622       grpc::CreateChannel(server_address, InsecureChannelCredentials());
623   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
624   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
625   EchoRequest send_request;
626   EchoResponse recv_response;
627   Status recv_status;
628 
629   ClientContext cli_ctx;
630   send_request.set_message("Hello");
631   std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
632       stub->AsyncUnimplemented(&cli_ctx, send_request, cq.get()));
633 
634   response_reader->Finish(&recv_response, &recv_status, tag(4));
635   Verifier().Expect(4, true).Verify(cq.get());
636 
637   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
638   EXPECT_EQ("", recv_status.error_message());
639 
640   // Make sure all 20 dummy interceptors were run
641   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
642 
643   server->Shutdown();
644   cq->Shutdown();
645   void* ignored_tag;
646   bool ignored_ok;
647   while (cq->Next(&ignored_tag, &ignored_ok)) {
648   }
649   grpc_recycle_unused_port(port);
650 }
651 
652 class ServerInterceptorsSyncUnimplementedEnd2endTest : public ::testing::Test {
653 };
654 
TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest,UnimplementedRpcTest)655 TEST_F(ServerInterceptorsSyncUnimplementedEnd2endTest, UnimplementedRpcTest) {
656   DummyInterceptor::Reset();
657   int port = grpc_pick_unused_port_or_die();
658   string server_address = "localhost:" + std::to_string(port);
659   ServerBuilder builder;
660   TestServiceImpl service;
661   builder.RegisterService(&service);
662   builder.AddListeningPort(server_address, InsecureServerCredentials());
663   std::vector<std::unique_ptr<experimental::ServerInterceptorFactoryInterface>>
664       creators;
665   creators.reserve(20);
666   for (auto i = 0; i < 20; i++) {
667     creators.push_back(absl::make_unique<DummyInterceptorFactory>());
668   }
669   builder.experimental().SetInterceptorCreators(std::move(creators));
670   auto server = builder.BuildAndStart();
671 
672   ChannelArguments args;
673   std::shared_ptr<Channel> channel =
674       grpc::CreateChannel(server_address, InsecureChannelCredentials());
675   std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub;
676   stub = grpc::testing::UnimplementedEchoService::NewStub(channel);
677   EchoRequest send_request;
678   EchoResponse recv_response;
679 
680   ClientContext cli_ctx;
681   send_request.set_message("Hello");
682   Status recv_status =
683       stub->Unimplemented(&cli_ctx, send_request, &recv_response);
684 
685   EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
686   EXPECT_EQ("", recv_status.error_message());
687 
688   // Make sure all 20 dummy interceptors were run
689   EXPECT_EQ(DummyInterceptor::GetNumTimesRun(), 20);
690 
691   server->Shutdown();
692   grpc_recycle_unused_port(port);
693 }
694 
695 }  // namespace
696 }  // namespace testing
697 }  // namespace grpc
698 
main(int argc,char ** argv)699 int main(int argc, char** argv) {
700   grpc::testing::TestEnvironment env(argc, argv);
701   ::testing::InitGoogleTest(&argc, argv);
702   return RUN_ALL_TESTS();
703 }
704