1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <chrono>
20 #include <memory>
21 #include <mutex>
22 #include <sstream>
23 #include <string>
24 #include <thread>
25 #include <vector>
26
27 #include <grpc/grpc.h>
28 #include <grpc/support/alloc.h>
29 #include <grpc/support/log.h>
30 #include <grpc/support/time.h>
31 #include <grpcpp/channel.h>
32 #include <grpcpp/client_context.h>
33 #include <grpcpp/server.h>
34 #include <grpcpp/server_builder.h>
35
36 #include "src/core/lib/profiling/timers.h"
37 #include "src/proto/grpc/testing/benchmark_service.grpc.pb.h"
38 #include "test/cpp/qps/client.h"
39 #include "test/cpp/qps/interarrival.h"
40 #include "test/cpp/qps/usage_timer.h"
41
42 namespace grpc {
43 namespace testing {
44
BenchmarkStubCreator(const std::shared_ptr<Channel> & ch)45 static std::unique_ptr<BenchmarkService::Stub> BenchmarkStubCreator(
46 const std::shared_ptr<Channel>& ch) {
47 return BenchmarkService::NewStub(ch);
48 }
49
50 class SynchronousClient
51 : public ClientImpl<BenchmarkService::Stub, SimpleRequest> {
52 public:
SynchronousClient(const ClientConfig & config)53 SynchronousClient(const ClientConfig& config)
54 : ClientImpl<BenchmarkService::Stub, SimpleRequest>(
55 config, BenchmarkStubCreator) {
56 num_threads_ =
57 config.outstanding_rpcs_per_channel() * config.client_channels();
58 responses_.resize(num_threads_);
59 SetupLoadTest(config, num_threads_);
60 }
61
~SynchronousClient()62 ~SynchronousClient() override {}
63
64 virtual bool InitThreadFuncImpl(size_t thread_idx) = 0;
65 virtual bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) = 0;
66
ThreadFunc(size_t thread_idx,Thread * t)67 void ThreadFunc(size_t thread_idx, Thread* t) override {
68 if (!InitThreadFuncImpl(thread_idx)) {
69 return;
70 }
71 for (;;) {
72 // run the loop body
73 HistogramEntry entry;
74 const bool thread_still_ok = ThreadFuncImpl(&entry, thread_idx);
75 t->UpdateHistogram(&entry);
76 if (!thread_still_ok || ThreadCompleted()) {
77 return;
78 }
79 }
80 }
81
82 protected:
83 // WaitToIssue returns false if we realize that we need to break out
WaitToIssue(int thread_idx)84 bool WaitToIssue(int thread_idx) {
85 if (!closed_loop_) {
86 const gpr_timespec next_issue_time = NextIssueTime(thread_idx);
87 // Avoid sleeping for too long continuously because we might
88 // need to terminate before then. This is an issue since
89 // exponential distribution can occasionally produce bad outliers
90 while (true) {
91 const gpr_timespec one_sec_delay =
92 gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
93 gpr_time_from_seconds(1, GPR_TIMESPAN));
94 if (gpr_time_cmp(next_issue_time, one_sec_delay) <= 0) {
95 gpr_sleep_until(next_issue_time);
96 return true;
97 } else {
98 gpr_sleep_until(one_sec_delay);
99 if (gpr_atm_acq_load(&thread_pool_done_) != static_cast<gpr_atm>(0)) {
100 return false;
101 }
102 }
103 }
104 }
105 return true;
106 }
107
108 size_t num_threads_;
109 std::vector<SimpleResponse> responses_;
110 };
111
112 class SynchronousUnaryClient final : public SynchronousClient {
113 public:
SynchronousUnaryClient(const ClientConfig & config)114 SynchronousUnaryClient(const ClientConfig& config)
115 : SynchronousClient(config) {
116 StartThreads(num_threads_);
117 }
~SynchronousUnaryClient()118 ~SynchronousUnaryClient() override {}
119
InitThreadFuncImpl(size_t)120 bool InitThreadFuncImpl(size_t /*thread_idx*/) override { return true; }
121
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)122 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
123 if (!WaitToIssue(thread_idx)) {
124 return true;
125 }
126 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
127 double start = UsageTimer::Now();
128 GPR_TIMER_SCOPE("SynchronousUnaryClient::ThreadFunc", 0);
129 grpc::ClientContext context;
130 grpc::Status s =
131 stub->UnaryCall(&context, request_, &responses_[thread_idx]);
132 if (s.ok()) {
133 entry->set_value((UsageTimer::Now() - start) * 1e9);
134 }
135 entry->set_status(s.error_code());
136 return true;
137 }
138
139 private:
DestroyMultithreading()140 void DestroyMultithreading() final { EndThreads(); }
141 };
142
143 template <class StreamType>
144 class SynchronousStreamingClient : public SynchronousClient {
145 public:
SynchronousStreamingClient(const ClientConfig & config)146 SynchronousStreamingClient(const ClientConfig& config)
147 : SynchronousClient(config),
148 context_(num_threads_),
149 stream_(num_threads_),
150 stream_mu_(num_threads_),
151 shutdown_(num_threads_),
152 messages_per_stream_(config.messages_per_stream()),
153 messages_issued_(num_threads_) {
154 StartThreads(num_threads_);
155 }
~SynchronousStreamingClient()156 ~SynchronousStreamingClient() override {
157 CleanupAllStreams([this](size_t thread_idx) {
158 // Don't log any kind of error since we may have canceled this
159 stream_[thread_idx]->Finish().IgnoreError();
160 });
161 }
162
163 protected:
164 std::vector<grpc::ClientContext> context_;
165 std::vector<std::unique_ptr<StreamType>> stream_;
166 // stream_mu_ is only needed when changing an element of stream_ or context_
167 std::vector<std::mutex> stream_mu_;
168 // use struct Bool rather than bool because vector<bool> is not concurrent
169 struct Bool {
170 bool val;
Boolgrpc::testing::SynchronousStreamingClient::Bool171 Bool() : val(false) {}
172 };
173 std::vector<Bool> shutdown_;
174 const int messages_per_stream_;
175 std::vector<int> messages_issued_;
176
FinishStream(HistogramEntry * entry,size_t thread_idx)177 void FinishStream(HistogramEntry* entry, size_t thread_idx) {
178 Status s = stream_[thread_idx]->Finish();
179 // don't set the value since the stream is failed and shouldn't be timed
180 entry->set_status(s.error_code());
181 if (!s.ok()) {
182 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
183 if (!shutdown_[thread_idx].val) {
184 gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s",
185 thread_idx, s.error_message().c_str());
186 }
187 }
188 // Lock the stream_mu_ now because the client context could change
189 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
190 context_[thread_idx].~ClientContext();
191 new (&context_[thread_idx]) ClientContext();
192 }
193
CleanupAllStreams(const std::function<void (size_t)> & cleaner)194 void CleanupAllStreams(const std::function<void(size_t)>& cleaner) {
195 std::vector<std::thread> cleanup_threads;
196 for (size_t i = 0; i < num_threads_; i++) {
197 cleanup_threads.emplace_back([this, i, cleaner] {
198 std::lock_guard<std::mutex> l(stream_mu_[i]);
199 shutdown_[i].val = true;
200 if (stream_[i]) {
201 cleaner(i);
202 }
203 });
204 }
205 for (auto& th : cleanup_threads) {
206 th.join();
207 }
208 }
209
210 private:
DestroyMultithreading()211 void DestroyMultithreading() final {
212 CleanupAllStreams(
213 [this](size_t thread_idx) { context_[thread_idx].TryCancel(); });
214 EndThreads();
215 }
216 };
217
218 class SynchronousStreamingPingPongClient final
219 : public SynchronousStreamingClient<
220 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
221 public:
SynchronousStreamingPingPongClient(const ClientConfig & config)222 SynchronousStreamingPingPongClient(const ClientConfig& config)
223 : SynchronousStreamingClient(config) {}
~SynchronousStreamingPingPongClient()224 ~SynchronousStreamingPingPongClient() override {
225 CleanupAllStreams(
226 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
227 }
228
229 private:
InitThreadFuncImpl(size_t thread_idx)230 bool InitThreadFuncImpl(size_t thread_idx) override {
231 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
232 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
233 if (!shutdown_[thread_idx].val) {
234 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
235 } else {
236 return false;
237 }
238 messages_issued_[thread_idx] = 0;
239 return true;
240 }
241
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)242 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
243 if (!WaitToIssue(thread_idx)) {
244 return true;
245 }
246 GPR_TIMER_SCOPE("SynchronousStreamingPingPongClient::ThreadFunc", 0);
247 double start = UsageTimer::Now();
248 if (stream_[thread_idx]->Write(request_) &&
249 stream_[thread_idx]->Read(&responses_[thread_idx])) {
250 entry->set_value((UsageTimer::Now() - start) * 1e9);
251 // don't set the status since there isn't one yet
252 if ((messages_per_stream_ != 0) &&
253 (++messages_issued_[thread_idx] < messages_per_stream_)) {
254 return true;
255 } else if (messages_per_stream_ == 0) {
256 return true;
257 } else {
258 // Fall through to the below resetting code after finish
259 }
260 }
261 stream_[thread_idx]->WritesDone();
262 FinishStream(entry, thread_idx);
263 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
264 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
265 if (!shutdown_[thread_idx].val) {
266 stream_[thread_idx] = stub->StreamingCall(&context_[thread_idx]);
267 } else {
268 stream_[thread_idx].reset();
269 return false;
270 }
271 messages_issued_[thread_idx] = 0;
272 return true;
273 }
274 };
275
276 class SynchronousStreamingFromClientClient final
277 : public SynchronousStreamingClient<grpc::ClientWriter<SimpleRequest>> {
278 public:
SynchronousStreamingFromClientClient(const ClientConfig & config)279 SynchronousStreamingFromClientClient(const ClientConfig& config)
280 : SynchronousStreamingClient(config), last_issue_(num_threads_) {}
~SynchronousStreamingFromClientClient()281 ~SynchronousStreamingFromClientClient() override {
282 CleanupAllStreams(
283 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
284 }
285
286 private:
287 std::vector<double> last_issue_;
288
InitThreadFuncImpl(size_t thread_idx)289 bool InitThreadFuncImpl(size_t thread_idx) override {
290 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
291 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
292 if (!shutdown_[thread_idx].val) {
293 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
294 &responses_[thread_idx]);
295 } else {
296 return false;
297 }
298 last_issue_[thread_idx] = UsageTimer::Now();
299 return true;
300 }
301
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)302 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
303 // Figure out how to make histogram sensible if this is rate-paced
304 if (!WaitToIssue(thread_idx)) {
305 return true;
306 }
307 GPR_TIMER_SCOPE("SynchronousStreamingFromClientClient::ThreadFunc", 0);
308 if (stream_[thread_idx]->Write(request_)) {
309 double now = UsageTimer::Now();
310 entry->set_value((now - last_issue_[thread_idx]) * 1e9);
311 last_issue_[thread_idx] = now;
312 return true;
313 }
314 stream_[thread_idx]->WritesDone();
315 FinishStream(entry, thread_idx);
316 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
317 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
318 if (!shutdown_[thread_idx].val) {
319 stream_[thread_idx] = stub->StreamingFromClient(&context_[thread_idx],
320 &responses_[thread_idx]);
321 } else {
322 stream_[thread_idx].reset();
323 return false;
324 }
325 return true;
326 }
327 };
328
329 class SynchronousStreamingFromServerClient final
330 : public SynchronousStreamingClient<grpc::ClientReader<SimpleResponse>> {
331 public:
SynchronousStreamingFromServerClient(const ClientConfig & config)332 SynchronousStreamingFromServerClient(const ClientConfig& config)
333 : SynchronousStreamingClient(config), last_recv_(num_threads_) {}
~SynchronousStreamingFromServerClient()334 ~SynchronousStreamingFromServerClient() override {}
335
336 private:
337 std::vector<double> last_recv_;
338
InitThreadFuncImpl(size_t thread_idx)339 bool InitThreadFuncImpl(size_t thread_idx) override {
340 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
341 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
342 if (!shutdown_[thread_idx].val) {
343 stream_[thread_idx] =
344 stub->StreamingFromServer(&context_[thread_idx], request_);
345 } else {
346 return false;
347 }
348 last_recv_[thread_idx] = UsageTimer::Now();
349 return true;
350 }
351
ThreadFuncImpl(HistogramEntry * entry,size_t thread_idx)352 bool ThreadFuncImpl(HistogramEntry* entry, size_t thread_idx) override {
353 GPR_TIMER_SCOPE("SynchronousStreamingFromServerClient::ThreadFunc", 0);
354 if (stream_[thread_idx]->Read(&responses_[thread_idx])) {
355 double now = UsageTimer::Now();
356 entry->set_value((now - last_recv_[thread_idx]) * 1e9);
357 last_recv_[thread_idx] = now;
358 return true;
359 }
360 FinishStream(entry, thread_idx);
361 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
362 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
363 if (!shutdown_[thread_idx].val) {
364 stream_[thread_idx] =
365 stub->StreamingFromServer(&context_[thread_idx], request_);
366 } else {
367 stream_[thread_idx].reset();
368 return false;
369 }
370 return true;
371 }
372 };
373
374 class SynchronousStreamingBothWaysClient final
375 : public SynchronousStreamingClient<
376 grpc::ClientReaderWriter<SimpleRequest, SimpleResponse>> {
377 public:
SynchronousStreamingBothWaysClient(const ClientConfig & config)378 SynchronousStreamingBothWaysClient(const ClientConfig& config)
379 : SynchronousStreamingClient(config) {}
~SynchronousStreamingBothWaysClient()380 ~SynchronousStreamingBothWaysClient() override {
381 CleanupAllStreams(
382 [this](size_t thread_idx) { stream_[thread_idx]->WritesDone(); });
383 }
384
385 private:
InitThreadFuncImpl(size_t thread_idx)386 bool InitThreadFuncImpl(size_t thread_idx) override {
387 auto* stub = channels_[thread_idx % channels_.size()].get_stub();
388 std::lock_guard<std::mutex> l(stream_mu_[thread_idx]);
389 if (!shutdown_[thread_idx].val) {
390 stream_[thread_idx] = stub->StreamingBothWays(&context_[thread_idx]);
391 } else {
392 return false;
393 }
394 return true;
395 }
396
ThreadFuncImpl(HistogramEntry *,size_t)397 bool ThreadFuncImpl(HistogramEntry* /*entry*/,
398 size_t /*thread_idx*/) override {
399 // TODO (vjpai): Do this
400 return true;
401 }
402 };
403
CreateSynchronousClient(const ClientConfig & config)404 std::unique_ptr<Client> CreateSynchronousClient(const ClientConfig& config) {
405 GPR_ASSERT(!config.use_coalesce_api()); // not supported yet.
406 switch (config.rpc_type()) {
407 case UNARY:
408 return std::unique_ptr<Client>(new SynchronousUnaryClient(config));
409 case STREAMING:
410 return std::unique_ptr<Client>(
411 new SynchronousStreamingPingPongClient(config));
412 case STREAMING_FROM_CLIENT:
413 return std::unique_ptr<Client>(
414 new SynchronousStreamingFromClientClient(config));
415 case STREAMING_FROM_SERVER:
416 return std::unique_ptr<Client>(
417 new SynchronousStreamingFromServerClient(config));
418 case STREAMING_BOTH_WAYS:
419 return std::unique_ptr<Client>(
420 new SynchronousStreamingBothWaysClient(config));
421 default:
422 assert(false);
423 return nullptr;
424 }
425 }
426
427 } // namespace testing
428 } // namespace grpc
429