1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <algorithm>
20 #include <forward_list>
21 #include <functional>
22 #include <memory>
23 #include <mutex>
24 #include <thread>
25
26 #include <grpc/grpc.h>
27 #include <grpc/support/alloc.h>
28 #include <grpc/support/log.h>
29 #include <grpcpp/generic/async_generic_service.h>
30 #include <grpcpp/resource_quota.h>
31 #include <grpcpp/security/server_credentials.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
35 #include <grpcpp/support/config.h>
36
37 #include "src/core/lib/gprpp/host_port.h"
38 #include "src/core/lib/surface/completion_queue.h"
39 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
40 #include "test/core/util/test_config.h"
41 #include "test/cpp/qps/qps_server_builder.h"
42 #include "test/cpp/qps/server.h"
43
44 namespace grpc {
45 namespace testing {
46
47 template <class RequestType, class ResponseType, class ServiceType,
48 class ServerContextType>
49 class AsyncQpsServerTest final : public grpc::testing::Server {
50 public:
AsyncQpsServerTest(const ServerConfig & config,std::function<void (ServerBuilder *,ServiceType *)> register_service,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncResponseWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_unary_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReader<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_client_function,std::function<void (ServiceType *,ServerContextType *,RequestType *,ServerAsyncWriter<ResponseType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_from_server_function,std::function<void (ServiceType *,ServerContextType *,ServerAsyncReaderWriter<ResponseType,RequestType> *,CompletionQueue *,ServerCompletionQueue *,void *)> request_streaming_both_ways_function,std::function<grpc::Status (const PayloadConfig &,RequestType *,ResponseType *)> process_rpc)51 AsyncQpsServerTest(
52 const ServerConfig& config,
53 std::function<void(ServerBuilder*, ServiceType*)> register_service,
54 std::function<void(ServiceType*, ServerContextType*, RequestType*,
55 ServerAsyncResponseWriter<ResponseType>*,
56 CompletionQueue*, ServerCompletionQueue*, void*)>
57 request_unary_function,
58 std::function<void(ServiceType*, ServerContextType*,
59 ServerAsyncReaderWriter<ResponseType, RequestType>*,
60 CompletionQueue*, ServerCompletionQueue*, void*)>
61 request_streaming_function,
62 std::function<void(ServiceType*, ServerContextType*,
63 ServerAsyncReader<ResponseType, RequestType>*,
64 CompletionQueue*, ServerCompletionQueue*, void*)>
65 request_streaming_from_client_function,
66 std::function<void(ServiceType*, ServerContextType*, RequestType*,
67 ServerAsyncWriter<ResponseType>*, CompletionQueue*,
68 ServerCompletionQueue*, void*)>
69 request_streaming_from_server_function,
70 std::function<void(ServiceType*, ServerContextType*,
71 ServerAsyncReaderWriter<ResponseType, RequestType>*,
72 CompletionQueue*, ServerCompletionQueue*, void*)>
73 request_streaming_both_ways_function,
74 std::function<grpc::Status(const PayloadConfig&, RequestType*,
75 ResponseType*)>
76 process_rpc)
77 : Server(config) {
78 std::unique_ptr<ServerBuilder> builder = CreateQpsServerBuilder();
79
80 auto port_num = port();
81 // Negative port number means inproc server, so no listen port needed
82 if (port_num >= 0) {
83 std::string server_address = grpc_core::JoinHostPort("::", port_num);
84 builder->AddListeningPort(server_address.c_str(),
85 Server::CreateServerCredentials(config));
86 }
87
88 register_service(builder.get(), &async_service_);
89
90 int num_threads = config.async_server_threads();
91 if (num_threads <= 0) { // dynamic sizing
92 num_threads = cores();
93 gpr_log(GPR_INFO, "Sizing async server to %d threads", num_threads);
94 }
95
96 int tpc = std::max(1, config.threads_per_cq()); // 1 if unspecified
97 int num_cqs = (num_threads + tpc - 1) / tpc; // ceiling operator
98 for (int i = 0; i < num_cqs; i++) {
99 srv_cqs_.emplace_back(builder->AddCompletionQueue());
100 }
101 for (int i = 0; i < num_threads; i++) {
102 cq_.emplace_back(i % srv_cqs_.size());
103 }
104
105 ApplyConfigToBuilder(config, builder.get());
106
107 server_ = builder->BuildAndStart();
108
109 auto process_rpc_bound =
110 std::bind(process_rpc, config.payload_config(), std::placeholders::_1,
111 std::placeholders::_2);
112
113 for (int i = 0; i < 5000; i++) {
114 for (int j = 0; j < num_cqs; j++) {
115 if (request_unary_function) {
116 auto request_unary = std::bind(
117 request_unary_function, &async_service_, std::placeholders::_1,
118 std::placeholders::_2, std::placeholders::_3, srv_cqs_[j].get(),
119 srv_cqs_[j].get(), std::placeholders::_4);
120 contexts_.emplace_back(
121 new ServerRpcContextUnaryImpl(request_unary, process_rpc_bound));
122 }
123 if (request_streaming_function) {
124 auto request_streaming = std::bind(
125 request_streaming_function, &async_service_,
126 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
127 srv_cqs_[j].get(), std::placeholders::_3);
128 contexts_.emplace_back(new ServerRpcContextStreamingImpl(
129 request_streaming, process_rpc_bound));
130 }
131 if (request_streaming_from_client_function) {
132 auto request_streaming_from_client = std::bind(
133 request_streaming_from_client_function, &async_service_,
134 std::placeholders::_1, std::placeholders::_2, srv_cqs_[j].get(),
135 srv_cqs_[j].get(), std::placeholders::_3);
136 contexts_.emplace_back(new ServerRpcContextStreamingFromClientImpl(
137 request_streaming_from_client, process_rpc_bound));
138 }
139 if (request_streaming_from_server_function) {
140 auto request_streaming_from_server =
141 std::bind(request_streaming_from_server_function, &async_service_,
142 std::placeholders::_1, std::placeholders::_2,
143 std::placeholders::_3, srv_cqs_[j].get(),
144 srv_cqs_[j].get(), std::placeholders::_4);
145 contexts_.emplace_back(new ServerRpcContextStreamingFromServerImpl(
146 request_streaming_from_server, process_rpc_bound));
147 }
148 if (request_streaming_both_ways_function) {
149 // TODO(vjpai): Add this code
150 }
151 }
152 }
153
154 for (int i = 0; i < num_threads; i++) {
155 shutdown_state_.emplace_back(new PerThreadShutdownState());
156 threads_.emplace_back(&AsyncQpsServerTest::ThreadFunc, this, i);
157 }
158 }
// Tears the server down: flags every worker for shutdown, shuts down the
// server and its completion queues, joins the workers, and finally drains
// any events still sitting in the queues.
~AsyncQpsServerTest() {
  // Set each worker's flag while holding that worker's mutex.
  for (const auto& state : shutdown_state_) {
    std::lock_guard<std::mutex> guard(state->mutex);
    state->shutdown = true;
  }
  // TODO(vjpai): Remove the following deadline and allow full proper
  // shutdown.
  const auto shutdown_deadline =
      std::chrono::system_clock::now() + std::chrono::seconds(3);
  server_->Shutdown(shutdown_deadline);
  for (const auto& queue : srv_cqs_) {
    queue->Shutdown();
  }
  for (std::thread& worker : threads_) {
    worker.join();
  }
  // Pull remaining events until each queue reports it has none left.
  for (const auto& queue : srv_cqs_) {
    void* ignored_tag;
    bool ignored_ok;
    while (queue->Next(&ignored_tag, &ignored_ok)) {
    }
  }
}
181
GetPollCount()182 int GetPollCount() override {
183 int count = 0;
184 for (auto cq = srv_cqs_.begin(); cq != srv_cqs_.end(); cq++) {
185 count += grpc_get_cq_poll_num((*cq)->cq());
186 }
187 return count;
188 }
189
InProcessChannel(const ChannelArguments & args)190 std::shared_ptr<Channel> InProcessChannel(
191 const ChannelArguments& args) override {
192 return server_->InProcessChannel(args);
193 }
194
195 private:
ThreadFunc(int thread_idx)196 void ThreadFunc(int thread_idx) {
197 // Wait until work is available or we are shutting down
198 bool ok;
199 void* got_tag;
200 if (!srv_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
201 return;
202 }
203 ServerRpcContext* ctx;
204 std::mutex* mu_ptr = &shutdown_state_[thread_idx]->mutex;
205 do {
206 ctx = detag(got_tag);
207 // The tag is a pointer to an RPC context to invoke
208 // Proceed while holding a lock to make sure that
209 // this thread isn't supposed to shut down
210 mu_ptr->lock();
211 if (shutdown_state_[thread_idx]->shutdown) {
212 mu_ptr->unlock();
213 return;
214 }
215 } while (srv_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
216 [&, ctx, ok, mu_ptr]() {
217 ctx->lock();
218 if (!ctx->RunNextState(ok)) {
219 ctx->Reset();
220 }
221 ctx->unlock();
222 mu_ptr->unlock();
223 },
224 &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
225 }
226
// Abstract base for a per-RPC state machine. It carries its own mutex, which
// callers take and release through lock() / unlock().
class ServerRpcContext {
 public:
  ServerRpcContext() = default;
  virtual ~ServerRpcContext() = default;

  // Runs the next state; returns false if done.
  virtual bool RunNextState(bool) = 0;
  // Starts this context back at a clean state.
  virtual void Reset() = 0;

  void lock() { mu_.lock(); }
  void unlock() { mu_.unlock(); }

 private:
  std::mutex mu_;
};
tag(ServerRpcContext * func)238 static void* tag(ServerRpcContext* func) { return static_cast<void*>(func); }
detag(void * tag)239 static ServerRpcContext* detag(void* tag) {
240 return static_cast<ServerRpcContext*>(tag);
241 }
242
243 class ServerRpcContextUnaryImpl final : public ServerRpcContext {
244 public:
ServerRpcContextUnaryImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncResponseWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)245 ServerRpcContextUnaryImpl(
246 std::function<void(ServerContextType*, RequestType*,
247 grpc::ServerAsyncResponseWriter<ResponseType>*,
248 void*)>
249 request_method,
250 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
251 : srv_ctx_(new ServerContextType),
252 next_state_(&ServerRpcContextUnaryImpl::invoker),
253 request_method_(request_method),
254 invoke_method_(invoke_method),
255 response_writer_(srv_ctx_.get()) {
256 request_method_(srv_ctx_.get(), &req_, &response_writer_,
257 AsyncQpsServerTest::tag(this));
258 }
~ServerRpcContextUnaryImpl()259 ~ServerRpcContextUnaryImpl() override {}
RunNextState(bool ok)260 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()261 void Reset() override {
262 srv_ctx_.reset(new ServerContextType);
263 req_ = RequestType();
264 response_writer_ =
265 grpc::ServerAsyncResponseWriter<ResponseType>(srv_ctx_.get());
266
267 // Then request the method
268 next_state_ = &ServerRpcContextUnaryImpl::invoker;
269 request_method_(srv_ctx_.get(), &req_, &response_writer_,
270 AsyncQpsServerTest::tag(this));
271 }
272
273 private:
finisher(bool)274 bool finisher(bool) { return false; }
invoker(bool ok)275 bool invoker(bool ok) {
276 if (!ok) {
277 return false;
278 }
279
280 // Call the RPC processing function
281 grpc::Status status = invoke_method_(&req_, &response_);
282
283 // Have the response writer work and invoke on_finish when done
284 next_state_ = &ServerRpcContextUnaryImpl::finisher;
285 response_writer_.Finish(response_, status, AsyncQpsServerTest::tag(this));
286 return true;
287 }
288 std::unique_ptr<ServerContextType> srv_ctx_;
289 RequestType req_;
290 ResponseType response_;
291 bool (ServerRpcContextUnaryImpl::*next_state_)(bool);
292 std::function<void(ServerContextType*, RequestType*,
293 grpc::ServerAsyncResponseWriter<ResponseType>*, void*)>
294 request_method_;
295 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
296 grpc::ServerAsyncResponseWriter<ResponseType> response_writer_;
297 };
298
299 class ServerRpcContextStreamingImpl final : public ServerRpcContext {
300 public:
ServerRpcContextStreamingImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReaderWriter<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)301 ServerRpcContextStreamingImpl(
302 std::function<void(
303 ServerContextType*,
304 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
305 request_method,
306 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
307 : srv_ctx_(new ServerContextType),
308 next_state_(&ServerRpcContextStreamingImpl::request_done),
309 request_method_(request_method),
310 invoke_method_(invoke_method),
311 stream_(srv_ctx_.get()) {
312 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
313 }
~ServerRpcContextStreamingImpl()314 ~ServerRpcContextStreamingImpl() override {}
RunNextState(bool ok)315 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()316 void Reset() override {
317 srv_ctx_.reset(new ServerContextType);
318 req_ = RequestType();
319 stream_ = grpc::ServerAsyncReaderWriter<ResponseType, RequestType>(
320 srv_ctx_.get());
321
322 // Then request the method
323 next_state_ = &ServerRpcContextStreamingImpl::request_done;
324 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
325 }
326
327 private:
request_done(bool ok)328 bool request_done(bool ok) {
329 if (!ok) {
330 return false;
331 }
332 next_state_ = &ServerRpcContextStreamingImpl::read_done;
333 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
334 return true;
335 }
336
read_done(bool ok)337 bool read_done(bool ok) {
338 if (ok) {
339 // invoke the method
340 // Call the RPC processing function
341 grpc::Status status = invoke_method_(&req_, &response_);
342 // initiate the write
343 next_state_ = &ServerRpcContextStreamingImpl::write_done;
344 stream_.Write(response_, AsyncQpsServerTest::tag(this));
345 } else { // client has sent writes done
346 // finish the stream
347 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
348 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
349 }
350 return true;
351 }
write_done(bool ok)352 bool write_done(bool ok) {
353 // now go back and get another streaming read!
354 if (ok) {
355 next_state_ = &ServerRpcContextStreamingImpl::read_done;
356 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
357 } else {
358 next_state_ = &ServerRpcContextStreamingImpl::finish_done;
359 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
360 }
361 return true;
362 }
finish_done(bool)363 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
364
365 std::unique_ptr<ServerContextType> srv_ctx_;
366 RequestType req_;
367 ResponseType response_;
368 bool (ServerRpcContextStreamingImpl::*next_state_)(bool);
369 std::function<void(
370 ServerContextType*,
371 grpc::ServerAsyncReaderWriter<ResponseType, RequestType>*, void*)>
372 request_method_;
373 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
374 grpc::ServerAsyncReaderWriter<ResponseType, RequestType> stream_;
375 };
376
377 class ServerRpcContextStreamingFromClientImpl final
378 : public ServerRpcContext {
379 public:
ServerRpcContextStreamingFromClientImpl(std::function<void (ServerContextType *,grpc::ServerAsyncReader<ResponseType,RequestType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)380 ServerRpcContextStreamingFromClientImpl(
381 std::function<void(ServerContextType*,
382 grpc::ServerAsyncReader<ResponseType, RequestType>*,
383 void*)>
384 request_method,
385 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
386 : srv_ctx_(new ServerContextType),
387 next_state_(&ServerRpcContextStreamingFromClientImpl::request_done),
388 request_method_(request_method),
389 invoke_method_(invoke_method),
390 stream_(srv_ctx_.get()) {
391 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
392 }
~ServerRpcContextStreamingFromClientImpl()393 ~ServerRpcContextStreamingFromClientImpl() override {}
RunNextState(bool ok)394 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()395 void Reset() override {
396 srv_ctx_.reset(new ServerContextType);
397 req_ = RequestType();
398 stream_ =
399 grpc::ServerAsyncReader<ResponseType, RequestType>(srv_ctx_.get());
400
401 // Then request the method
402 next_state_ = &ServerRpcContextStreamingFromClientImpl::request_done;
403 request_method_(srv_ctx_.get(), &stream_, AsyncQpsServerTest::tag(this));
404 }
405
406 private:
request_done(bool ok)407 bool request_done(bool ok) {
408 if (!ok) {
409 return false;
410 }
411 next_state_ = &ServerRpcContextStreamingFromClientImpl::read_done;
412 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
413 return true;
414 }
415
read_done(bool ok)416 bool read_done(bool ok) {
417 if (ok) {
418 // In this case, just do another read
419 // next_state_ is unchanged
420 stream_.Read(&req_, AsyncQpsServerTest::tag(this));
421 return true;
422 } else { // client has sent writes done
423 // invoke the method
424 // Call the RPC processing function
425 grpc::Status status = invoke_method_(&req_, &response_);
426 // finish the stream
427 next_state_ = &ServerRpcContextStreamingFromClientImpl::finish_done;
428 stream_.Finish(response_, Status::OK, AsyncQpsServerTest::tag(this));
429 }
430 return true;
431 }
finish_done(bool)432 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
433
434 std::unique_ptr<ServerContextType> srv_ctx_;
435 RequestType req_;
436 ResponseType response_;
437 bool (ServerRpcContextStreamingFromClientImpl::*next_state_)(bool);
438 std::function<void(ServerContextType*,
439 grpc::ServerAsyncReader<ResponseType, RequestType>*,
440 void*)>
441 request_method_;
442 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
443 grpc::ServerAsyncReader<ResponseType, RequestType> stream_;
444 };
445
446 class ServerRpcContextStreamingFromServerImpl final
447 : public ServerRpcContext {
448 public:
ServerRpcContextStreamingFromServerImpl(std::function<void (ServerContextType *,RequestType *,grpc::ServerAsyncWriter<ResponseType> *,void *)> request_method,std::function<grpc::Status (RequestType *,ResponseType *)> invoke_method)449 ServerRpcContextStreamingFromServerImpl(
450 std::function<void(ServerContextType*, RequestType*,
451 grpc::ServerAsyncWriter<ResponseType>*, void*)>
452 request_method,
453 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method)
454 : srv_ctx_(new ServerContextType),
455 next_state_(&ServerRpcContextStreamingFromServerImpl::request_done),
456 request_method_(request_method),
457 invoke_method_(invoke_method),
458 stream_(srv_ctx_.get()) {
459 request_method_(srv_ctx_.get(), &req_, &stream_,
460 AsyncQpsServerTest::tag(this));
461 }
~ServerRpcContextStreamingFromServerImpl()462 ~ServerRpcContextStreamingFromServerImpl() override {}
RunNextState(bool ok)463 bool RunNextState(bool ok) override { return (this->*next_state_)(ok); }
Reset()464 void Reset() override {
465 srv_ctx_.reset(new ServerContextType);
466 req_ = RequestType();
467 stream_ = grpc::ServerAsyncWriter<ResponseType>(srv_ctx_.get());
468
469 // Then request the method
470 next_state_ = &ServerRpcContextStreamingFromServerImpl::request_done;
471 request_method_(srv_ctx_.get(), &req_, &stream_,
472 AsyncQpsServerTest::tag(this));
473 }
474
475 private:
request_done(bool ok)476 bool request_done(bool ok) {
477 if (!ok) {
478 return false;
479 }
480 // invoke the method
481 // Call the RPC processing function
482 grpc::Status status = invoke_method_(&req_, &response_);
483
484 next_state_ = &ServerRpcContextStreamingFromServerImpl::write_done;
485 stream_.Write(response_, AsyncQpsServerTest::tag(this));
486 return true;
487 }
488
write_done(bool ok)489 bool write_done(bool ok) {
490 if (ok) {
491 // Do another write!
492 // next_state_ is unchanged
493 stream_.Write(response_, AsyncQpsServerTest::tag(this));
494 } else { // must be done so let's finish
495 next_state_ = &ServerRpcContextStreamingFromServerImpl::finish_done;
496 stream_.Finish(Status::OK, AsyncQpsServerTest::tag(this));
497 }
498 return true;
499 }
finish_done(bool)500 bool finish_done(bool /*ok*/) { return false; /*reset the context*/ }
501
502 std::unique_ptr<ServerContextType> srv_ctx_;
503 RequestType req_;
504 ResponseType response_;
505 bool (ServerRpcContextStreamingFromServerImpl::*next_state_)(bool);
506 std::function<void(ServerContextType*, RequestType*,
507 grpc::ServerAsyncWriter<ResponseType>*, void*)>
508 request_method_;
509 std::function<grpc::Status(RequestType*, ResponseType*)> invoke_method_;
510 grpc::ServerAsyncWriter<ResponseType> stream_;
511 };
512
513 std::vector<std::thread> threads_;
514 std::unique_ptr<grpc::Server> server_;
515 std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> srv_cqs_;
516 std::vector<int> cq_;
517 ServiceType async_service_;
518 std::vector<std::unique_ptr<ServerRpcContext>> contexts_;
519
// Shutdown flag for one worker thread together with the mutex that guards
// it. The flag starts out false.
struct PerThreadShutdownState {
  PerThreadShutdownState() = default;

  mutable std::mutex mutex;
  bool shutdown = false;
};
525
526 std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
527 };
528
RegisterBenchmarkService(ServerBuilder * builder,BenchmarkService::AsyncService * service)529 static void RegisterBenchmarkService(ServerBuilder* builder,
530 BenchmarkService::AsyncService* service) {
531 builder->RegisterService(service);
532 }
RegisterGenericService(ServerBuilder * builder,grpc::AsyncGenericService * service)533 static void RegisterGenericService(ServerBuilder* builder,
534 grpc::AsyncGenericService* service) {
535 builder->RegisterAsyncGenericService(service);
536 }
537
ProcessSimpleRPC(const PayloadConfig &,SimpleRequest * request,SimpleResponse * response)538 static Status ProcessSimpleRPC(const PayloadConfig&, SimpleRequest* request,
539 SimpleResponse* response) {
540 if (request->response_size() > 0) {
541 if (!Server::SetPayload(request->response_type(), request->response_size(),
542 response->mutable_payload())) {
543 return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
544 }
545 }
546 // We are done using the request. Clear it to reduce working memory.
547 // This proves to reduce cache misses in large message size cases.
548 request->Clear();
549 return Status::OK;
550 }
551
ProcessGenericRPC(const PayloadConfig & payload_config,ByteBuffer * request,ByteBuffer * response)552 static Status ProcessGenericRPC(const PayloadConfig& payload_config,
553 ByteBuffer* request, ByteBuffer* response) {
554 // We are done using the request. Clear it to reduce working memory.
555 // This proves to reduce cache misses in large message size cases.
556 request->Clear();
557 int resp_size = payload_config.bytebuf_params().resp_size();
558 std::unique_ptr<char[]> buf(new char[resp_size]);
559 memset(buf.get(), 0, static_cast<size_t>(resp_size));
560 Slice slice(buf.get(), resp_size);
561 *response = ByteBuffer(&slice, 1);
562 return Status::OK;
563 }
564
CreateAsyncServer(const ServerConfig & config)565 std::unique_ptr<Server> CreateAsyncServer(const ServerConfig& config) {
566 return std::unique_ptr<Server>(
567 new AsyncQpsServerTest<SimpleRequest, SimpleResponse,
568 BenchmarkService::AsyncService,
569 grpc::ServerContext>(
570 config, RegisterBenchmarkService,
571 &BenchmarkService::AsyncService::RequestUnaryCall,
572 &BenchmarkService::AsyncService::RequestStreamingCall,
573 &BenchmarkService::AsyncService::RequestStreamingFromClient,
574 &BenchmarkService::AsyncService::RequestStreamingFromServer,
575 &BenchmarkService::AsyncService::RequestStreamingBothWays,
576 ProcessSimpleRPC));
577 }
CreateAsyncGenericServer(const ServerConfig & config)578 std::unique_ptr<Server> CreateAsyncGenericServer(const ServerConfig& config) {
579 return std::unique_ptr<Server>(
580 new AsyncQpsServerTest<ByteBuffer, ByteBuffer, grpc::AsyncGenericService,
581 grpc::GenericServerContext>(
582 config, RegisterGenericService, nullptr,
583 &grpc::AsyncGenericService::RequestCall, nullptr, nullptr, nullptr,
584 ProcessGenericRPC));
585 }
586
587 } // namespace testing
588 } // namespace grpc
589