1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <random> 20 #include <glog/logging.h> 21 #include <folly/init/Init.h> 22 #include <folly/portability/GFlags.h> 23 #include <thrift/perf/cpp2/util/Operation.h> 24 #include <thrift/perf/cpp2/util/QPSStats.h> 25 #include <thrift/perf/cpp2/util/Util.h> 26 27 using apache::thrift::ClientConnectionIf; 28 using apache::thrift::ClientReceiveState; 29 using apache::thrift::RequestCallback; 30 using facebook::thrift::benchmarks::QPSStats; 31 32 template <typename AsyncClient> 33 class LoadCallback; 34 35 template <typename AsyncClient> 36 class Runner { 37 public: 38 friend class LoadCallback<AsyncClient>; 39 Runner(std::shared_ptr<folly::EventBase> evb,std::unique_ptr<Operation<AsyncClient>> ops,std::unique_ptr<std::discrete_distribution<int32_t>> distribution,int32_t max_outstanding_ops)40 Runner( 41 std::shared_ptr<folly::EventBase> evb, 42 std::unique_ptr<Operation<AsyncClient>> ops, 43 std::unique_ptr<std::discrete_distribution<int32_t>> distribution, 44 int32_t max_outstanding_ops) 45 : evb_(evb), 46 ops_(std::move(ops)), 47 d_(std::move(distribution)), 48 max_outstanding_ops_(max_outstanding_ops) {} 49 run()50 void run() { 51 // TODO: Implement sync calls. 52 while (ops_->outstandingOps() < max_outstanding_ops_) { 53 auto op = static_cast<OP_TYPE>((*d_)(gen_)); 54 auto cb = 55 std::make_unique<LoadCallback<AsyncClient>>(this, ops_.get(), op); 56 ops_->async(op, std::move(cb)); 57 } 58 } 59 finishCall()60 void finishCall() { 61 run(); // Attempt to perform more async calls 62 } 63 64 private: 65 std::shared_ptr<folly::EventBase> evb_; 66 std::unique_ptr<Operation<AsyncClient>> ops_; 67 std::unique_ptr<std::discrete_distribution<int32_t>> d_; 68 int32_t max_outstanding_ops_; 69 70 std::mt19937 gen_{std::random_device()()}; 71 }; 72 73 template <typename AsyncClient> 74 class LoadCallback : public RequestCallbackWithValidator { 75 public: LoadCallback(Runner<AsyncClient> * runner,Operation<AsyncClient> * ops,OP_TYPE op)76 LoadCallback( 77 Runner<AsyncClient>* runner, Operation<AsyncClient>* ops, OP_TYPE op) 78 : runner_(runner), ops_(ops), op_(op) {} 79 setIsOneway()80 void setIsOneway() { isOneway_ = true; } 81 82 // TODO: Properly handle errors and exceptions requestSent()83 void requestSent() override { 84 if (isOneway_) { 85 ops_->onewaySent(op_); 86 runner_->finishCall(); 87 } 88 } replyReceived(ClientReceiveState && rstate)89 void replyReceived(ClientReceiveState&& rstate) override { 90 if (validator) { 91 validator(rstate); 92 } 93 ops_->asyncReceived(op_, std::move(rstate)); 94 runner_->finishCall(); 95 } requestError(ClientReceiveState && rstate)96 void requestError(ClientReceiveState&& rstate) override { 97 ops_->asyncErrorReceived(op_, std::move(rstate)); 98 runner_->finishCall(); 99 } 100 101 private: 102 Runner<AsyncClient>* runner_; 103 Operation<AsyncClient>* ops_; 104 OP_TYPE op_; 105 bool isOneway_{false}; 106 }; 107