1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <memory>
20 
21 #include "absl/memory/memory.h"
22 
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/server.h>
27 #include <grpcpp/server_builder.h>
28 #include <grpcpp/server_context.h>
29 
30 #include "src/core/lib/gpr/tls.h"
31 #include "src/core/lib/iomgr/port.h"
32 #include "src/proto/grpc/testing/echo.grpc.pb.h"
33 #include "test/core/util/port.h"
34 #include "test/core/util/test_config.h"
35 
36 #ifdef GRPC_POSIX_SOCKET
37 #include "src/core/lib/iomgr/ev_posix.h"
38 #endif  // GRPC_POSIX_SOCKET
39 
40 #include <gtest/gtest.h>
41 
42 #ifdef GRPC_POSIX_SOCKET
43 // Thread-local variable to so that only polls from this test assert
44 // non-blocking (not polls from resolver, timer thread, etc), and only when the
45 // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for
46 // picking a port or other reasons).
47 static GPR_THREAD_LOCAL(bool) g_is_nonblocking_poll;
48 
49 namespace {
50 
maybe_assert_non_blocking_poll(struct pollfd * pfds,nfds_t nfds,int timeout)51 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
52                                    int timeout) {
53   // Only assert that this poll should have zero timeout if we're in the
54   // middle of a zero-timeout CQ Next.
55   if (g_is_nonblocking_poll) {
56     GPR_ASSERT(timeout == 0);
57   }
58   return poll(pfds, nfds, timeout);
59 }
60 
61 }  // namespace
62 
63 namespace grpc {
64 namespace testing {
65 namespace {
66 
tag(int i)67 void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
detag(void * p)68 int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
69 
70 class NonblockingTest : public ::testing::Test {
71  protected:
NonblockingTest()72   NonblockingTest() {}
73 
SetUp()74   void SetUp() override {
75     port_ = grpc_pick_unused_port_or_die();
76     server_address_ << "localhost:" << port_;
77 
78     // Setup server
79     BuildAndStartServer();
80   }
81 
LoopForTag(void ** tag,bool * ok)82   bool LoopForTag(void** tag, bool* ok) {
83     // Temporarily set the thread-local nonblocking poll flag so that the polls
84     // caused by this loop are indeed sent by the library with zero timeout.
85     bool orig_val = g_is_nonblocking_poll;
86     g_is_nonblocking_poll = true;
87     for (;;) {
88       auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
89       if (r == CompletionQueue::SHUTDOWN) {
90         g_is_nonblocking_poll = orig_val;
91         return false;
92       } else if (r == CompletionQueue::GOT_EVENT) {
93         g_is_nonblocking_poll = orig_val;
94         return true;
95       }
96     }
97   }
98 
TearDown()99   void TearDown() override {
100     server_->Shutdown();
101     void* ignored_tag;
102     bool ignored_ok;
103     cq_->Shutdown();
104     while (LoopForTag(&ignored_tag, &ignored_ok)) {
105     }
106     stub_.reset();
107     grpc_recycle_unused_port(port_);
108   }
109 
BuildAndStartServer()110   void BuildAndStartServer() {
111     ServerBuilder builder;
112     builder.AddListeningPort(server_address_.str(),
113                              grpc::InsecureServerCredentials());
114     service_ =
115         absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
116     builder.RegisterService(service_.get());
117     cq_ = builder.AddCompletionQueue();
118     server_ = builder.BuildAndStart();
119   }
120 
ResetStub()121   void ResetStub() {
122     std::shared_ptr<Channel> channel = grpc::CreateChannel(
123         server_address_.str(), grpc::InsecureChannelCredentials());
124     stub_ = grpc::testing::EchoTestService::NewStub(channel);
125   }
126 
SendRpc(int num_rpcs)127   void SendRpc(int num_rpcs) {
128     for (int i = 0; i < num_rpcs; i++) {
129       EchoRequest send_request;
130       EchoRequest recv_request;
131       EchoResponse send_response;
132       EchoResponse recv_response;
133       Status recv_status;
134 
135       ClientContext cli_ctx;
136       ServerContext srv_ctx;
137       grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
138 
139       send_request.set_message("hello non-blocking world");
140       std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
141           stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
142 
143       response_reader->StartCall();
144       response_reader->Finish(&recv_response, &recv_status, tag(4));
145 
146       service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
147                             cq_.get(), cq_.get(), tag(2));
148 
149       void* got_tag;
150       bool ok;
151       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
152       EXPECT_TRUE(ok);
153       EXPECT_EQ(detag(got_tag), 2);
154       EXPECT_EQ(send_request.message(), recv_request.message());
155 
156       send_response.set_message(recv_request.message());
157       response_writer.Finish(send_response, Status::OK, tag(3));
158 
159       int tagsum = 0;
160       int tagprod = 1;
161       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
162       EXPECT_TRUE(ok);
163       tagsum += detag(got_tag);
164       tagprod *= detag(got_tag);
165 
166       EXPECT_TRUE(LoopForTag(&got_tag, &ok));
167       EXPECT_TRUE(ok);
168       tagsum += detag(got_tag);
169       tagprod *= detag(got_tag);
170 
171       EXPECT_EQ(tagsum, 7);
172       EXPECT_EQ(tagprod, 12);
173       EXPECT_EQ(send_response.message(), recv_response.message());
174       EXPECT_TRUE(recv_status.ok());
175     }
176   }
177 
178   std::unique_ptr<ServerCompletionQueue> cq_;
179   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
180   std::unique_ptr<Server> server_;
181   std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
182   std::ostringstream server_address_;
183   int port_;
184 };
185 
TEST_F(NonblockingTest,SimpleRpc)186 TEST_F(NonblockingTest, SimpleRpc) {
187   ResetStub();
188   SendRpc(10);
189 }
190 
191 }  // namespace
192 }  // namespace testing
193 }  // namespace grpc
194 
195 #endif  // GRPC_POSIX_SOCKET
196 
main(int argc,char ** argv)197 int main(int argc, char** argv) {
198 #ifdef GRPC_POSIX_SOCKET
199   // Override the poll function before anything else can happen
200   grpc_poll_function = maybe_assert_non_blocking_poll;
201 
202   grpc::testing::TestEnvironment env(argc, argv);
203   ::testing::InitGoogleTest(&argc, argv);
204 
205   // Start the nonblocking poll thread-local variable as false because the
206   // thread that issues RPCs starts by picking a port (which has non-zero
207   // timeout).
208   g_is_nonblocking_poll = false;
209 
210   int ret = RUN_ALL_TESTS();
211 
212   return ret;
213 #else   // GRPC_POSIX_SOCKET
214   (void)argc;
215   (void)argv;
216   return 0;
217 #endif  // GRPC_POSIX_SOCKET
218 }
219