1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <random>
20 #include <glog/logging.h>
21 #include <folly/init/Init.h>
22 #include <folly/portability/GFlags.h>
23 #include <thrift/perf/cpp2/util/Operation.h>
24 #include <thrift/perf/cpp2/util/QPSStats.h>
25 #include <thrift/perf/cpp2/util/Util.h>
26 
27 using apache::thrift::ClientConnectionIf;
28 using apache::thrift::ClientReceiveState;
29 using apache::thrift::RequestCallback;
30 using facebook::thrift::benchmarks::QPSStats;
31 
32 template <typename AsyncClient>
33 class LoadCallback;
34 
35 template <typename AsyncClient>
36 class Runner {
37  public:
38   friend class LoadCallback<AsyncClient>;
39 
Runner(std::shared_ptr<folly::EventBase> evb,std::unique_ptr<Operation<AsyncClient>> ops,std::unique_ptr<std::discrete_distribution<int32_t>> distribution,int32_t max_outstanding_ops)40   Runner(
41       std::shared_ptr<folly::EventBase> evb,
42       std::unique_ptr<Operation<AsyncClient>> ops,
43       std::unique_ptr<std::discrete_distribution<int32_t>> distribution,
44       int32_t max_outstanding_ops)
45       : evb_(evb),
46         ops_(std::move(ops)),
47         d_(std::move(distribution)),
48         max_outstanding_ops_(max_outstanding_ops) {}
49 
run()50   void run() {
51     // TODO: Implement sync calls.
52     while (ops_->outstandingOps() < max_outstanding_ops_) {
53       auto op = static_cast<OP_TYPE>((*d_)(gen_));
54       auto cb =
55           std::make_unique<LoadCallback<AsyncClient>>(this, ops_.get(), op);
56       ops_->async(op, std::move(cb));
57     }
58   }
59 
finishCall()60   void finishCall() {
61     run(); // Attempt to perform more async calls
62   }
63 
64  private:
65   std::shared_ptr<folly::EventBase> evb_;
66   std::unique_ptr<Operation<AsyncClient>> ops_;
67   std::unique_ptr<std::discrete_distribution<int32_t>> d_;
68   int32_t max_outstanding_ops_;
69 
70   std::mt19937 gen_{std::random_device()()};
71 };
72 
73 template <typename AsyncClient>
74 class LoadCallback : public RequestCallbackWithValidator {
75  public:
LoadCallback(Runner<AsyncClient> * runner,Operation<AsyncClient> * ops,OP_TYPE op)76   LoadCallback(
77       Runner<AsyncClient>* runner, Operation<AsyncClient>* ops, OP_TYPE op)
78       : runner_(runner), ops_(ops), op_(op) {}
79 
setIsOneway()80   void setIsOneway() { isOneway_ = true; }
81 
82   // TODO: Properly handle errors and exceptions
requestSent()83   void requestSent() override {
84     if (isOneway_) {
85       ops_->onewaySent(op_);
86       runner_->finishCall();
87     }
88   }
replyReceived(ClientReceiveState && rstate)89   void replyReceived(ClientReceiveState&& rstate) override {
90     if (validator) {
91       validator(rstate);
92     }
93     ops_->asyncReceived(op_, std::move(rstate));
94     runner_->finishCall();
95   }
requestError(ClientReceiveState && rstate)96   void requestError(ClientReceiveState&& rstate) override {
97     ops_->asyncErrorReceived(op_, std::move(rstate));
98     runner_->finishCall();
99   }
100 
101  private:
102   Runner<AsyncClient>* runner_;
103   Operation<AsyncClient>* ops_;
104   OP_TYPE op_;
105   bool isOneway_{false};
106 };
107