1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thrift/perf/cpp/AsyncLoadHandler2.h>
18 
19 #include <folly/io/async/EventBase.h>
20 #include <folly/portability/Unistd.h>
21 #include <thrift/lib/cpp/concurrency/Util.h>
22 
23 using namespace apache::thrift::test;
24 
25 using apache::thrift::concurrency::Util;
26 
27 namespace apache {
28 namespace thrift {
29 
async_eb_noop(std::unique_ptr<HandlerCallback<void>> callback)30 void AsyncLoadHandler2::async_eb_noop(
31     std::unique_ptr<HandlerCallback<void>> callback) {
32   // Note that we could have done this with a sync function,
33   // but an inline async op is faster, and we want to maintain
34   // parity with the old loadgen for comparison testing
35   callback->done();
36 }
37 
async_eb_onewayNoop(std::unique_ptr<HandlerCallbackBase>)38 void AsyncLoadHandler2::async_eb_onewayNoop(
39     std::unique_ptr<HandlerCallbackBase> /* callback */) {}
40 
async_eb_asyncNoop(std::unique_ptr<HandlerCallback<void>> callback)41 void AsyncLoadHandler2::async_eb_asyncNoop(
42     std::unique_ptr<HandlerCallback<void>> callback) {
43   callback->done();
44 }
45 
async_eb_sleep(std::unique_ptr<HandlerCallback<void>> callback,int64_t microseconds)46 void AsyncLoadHandler2::async_eb_sleep(
47     std::unique_ptr<HandlerCallback<void>> callback, int64_t microseconds) {
48   // May leak if task never finishes
49   HandlerCallback<void>* callbackp = callback.release();
50   callbackp->getEventBase()->runInEventBaseThread([=]() {
51     callbackp->getEventBase()->tryRunAfterDelay(
52         [=]() {
53           std::unique_ptr<HandlerCallback<void>> cb(callbackp);
54           cb->done();
55         },
56         microseconds / Util::US_PER_MS);
57   });
58 }
59 
async_eb_onewaySleep(std::unique_ptr<HandlerCallbackBase> callback,int64_t microseconds)60 void AsyncLoadHandler2::async_eb_onewaySleep(
61     std::unique_ptr<HandlerCallbackBase> callback, int64_t microseconds) {
62   auto callbackp = callback.release();
63   // May leak if task never finishes
64   auto eb = callbackp->getEventBase();
65   eb->runInEventBaseThread([=]() {
66     eb->tryRunAfterDelay(
67         [=]() { delete callbackp; }, microseconds / Util::US_PER_MS);
68   });
69 }
70 
sync_burn(int64_t microseconds)71 void AsyncLoadHandler2::sync_burn(int64_t microseconds) {
72   // Slightly different from thrift1, this happens in a
73   // thread pool.
74   auto start = std::chrono::steady_clock::now();
75   auto end = start + std::chrono::microseconds(microseconds);
76   auto now = start;
77   do {
78     now = std::chrono::steady_clock::now();
79   } while (now < end);
80 }
81 
future_burn(int64_t microseconds)82 folly::Future<folly::Unit> AsyncLoadHandler2::future_burn(
83     int64_t microseconds) {
84   folly::Promise<folly::Unit> promise;
85   auto future = promise.getFuture();
86 
87   sync_burn(microseconds);
88   promise.setValue();
89 
90   return future;
91 }
92 
sync_onewayBurn(int64_t microseconds)93 void AsyncLoadHandler2::sync_onewayBurn(int64_t microseconds) {
94   sync_burn(microseconds);
95 }
96 
future_onewayBurn(int64_t microseconds)97 folly::Future<folly::Unit> AsyncLoadHandler2::future_onewayBurn(
98     int64_t microseconds) {
99   folly::Promise<folly::Unit> promise;
100   auto future = promise.getFuture();
101 
102   sync_onewayBurn(microseconds);
103   promise.setValue();
104 
105   return future;
106 }
107 
async_eb_badSleep(std::unique_ptr<HandlerCallback<void>> callback,int64_t microseconds)108 void AsyncLoadHandler2::async_eb_badSleep(
109     std::unique_ptr<HandlerCallback<void>> callback, int64_t microseconds) {
110   usleep(microseconds);
111   callback->done();
112 }
113 
async_eb_badBurn(std::unique_ptr<HandlerCallback<void>> callback,int64_t microseconds)114 void AsyncLoadHandler2::async_eb_badBurn(
115     std::unique_ptr<HandlerCallback<void>> callback, int64_t microseconds) {
116   // This is a true (bad) async call.
117   sync_burn(microseconds);
118   callback->done();
119 }
120 
async_eb_throwError(std::unique_ptr<HandlerCallback<void>> callback,int32_t code)121 void AsyncLoadHandler2::async_eb_throwError(
122     std::unique_ptr<HandlerCallback<void>> callback, int32_t code) {
123   LoadError error;
124   *error.code_ref() = code;
125   callback->exception(error);
126 }
127 
async_eb_throwUnexpected(std::unique_ptr<HandlerCallback<void>> callback,int32_t)128 void AsyncLoadHandler2::async_eb_throwUnexpected(
129     std::unique_ptr<HandlerCallback<void>> callback, int32_t /* code */) {
130   // FIXME: it isn't possible to implement this behavior with the async code
131   //
132   // Actually throwing an exception from the handler is bad, and EventBase
133   // should probably be changed to fatal the entire program if that happens.
134   callback->done();
135 }
136 
async_eb_onewayThrow(std::unique_ptr<HandlerCallbackBase> callback,int32_t code)137 void AsyncLoadHandler2::async_eb_onewayThrow(
138     std::unique_ptr<HandlerCallbackBase> callback, int32_t code) {
139   LoadError error;
140   *error.code_ref() = code;
141   callback->exception(error);
142 }
143 
async_eb_send(std::unique_ptr<HandlerCallback<void>> callback,std::unique_ptr<std::string>)144 void AsyncLoadHandler2::async_eb_send(
145     std::unique_ptr<HandlerCallback<void>> callback,
146     std::unique_ptr<std::string> /* data */) {
147   callback->done();
148 }
149 
async_eb_onewaySend(std::unique_ptr<HandlerCallbackBase>,std::unique_ptr<std::string>)150 void AsyncLoadHandler2::async_eb_onewaySend(
151     std::unique_ptr<HandlerCallbackBase> /* callback */,
152     std::unique_ptr<std::string> /* data */) {}
153 
async_eb_recv(std::unique_ptr<HandlerCallback<std::unique_ptr<std::string>>> callback,int64_t bytes)154 void AsyncLoadHandler2::async_eb_recv(
155     std::unique_ptr<HandlerCallback<std::unique_ptr<std::string>>> callback,
156     int64_t bytes) {
157   std::unique_ptr<std::string> ret(new std::string(bytes, 'a'));
158   callback->result(std::move(ret));
159 }
160 
async_eb_sendrecv(std::unique_ptr<HandlerCallback<std::unique_ptr<std::string>>> callback,std::unique_ptr<std::string>,int64_t recvBytes)161 void AsyncLoadHandler2::async_eb_sendrecv(
162     std::unique_ptr<HandlerCallback<std::unique_ptr<std::string>>> callback,
163     std::unique_ptr<std::string> /* data */,
164     int64_t recvBytes) {
165   std::unique_ptr<std::string> ret(new std::string(recvBytes, 'a'));
166   callback->result(std::move(ret));
167 }
168 
sync_echo(std::string & output,std::unique_ptr<std::string> data)169 void AsyncLoadHandler2::sync_echo(
170     // Slightly different from thrift1, this happens in a
171     // thread pool.
172     std::string& output,
173     std::unique_ptr<std::string> data) {
174   output = std::move(*data);
175 }
176 
future_echo(std::unique_ptr<std::string> data)177 folly::Future<std::unique_ptr<std::string>> AsyncLoadHandler2::future_echo(
178     std::unique_ptr<std::string> data) {
179   folly::Promise<std::unique_ptr<std::string>> promise;
180   auto future = promise.getFuture();
181 
182   folly::via(folly::RequestEventBase::get())
183       .thenValue([this, promise = std::move(promise), data = std::move(data)](
184                      auto&&) mutable {
185         std::string output;
186         sync_echo(output, std::move(data));
187         promise.setValue(std::make_unique<std::string>(std::move(output)));
188       });
189 
190   return future;
191 }
192 
async_eb_largeContainer(std::unique_ptr<HandlerCallback<void>> callback,std::unique_ptr<std::vector<BigStruct>>)193 void AsyncLoadHandler2::async_eb_largeContainer(
194     std::unique_ptr<HandlerCallback<void>> callback,
195     std::unique_ptr<std::vector<BigStruct>>) {
196   callback->done();
197 }
198 
async_eb_iterAllFields(std::unique_ptr<HandlerCallback<std::unique_ptr<std::vector<BigStruct>>>> callback,std::unique_ptr<std::vector<BigStruct>> items)199 void AsyncLoadHandler2::async_eb_iterAllFields(
200     std::unique_ptr<HandlerCallback<std::unique_ptr<std::vector<BigStruct>>>>
201         callback,
202     std::unique_ptr<std::vector<BigStruct>> items) {
203   std::string x;
204   for (auto& item : *items) {
205     x = *item.stringField_ref();
206     for (auto& i : *item.stringList_ref()) {
207       x = i;
208     }
209   }
210   callback->result(std::move(items));
211 }
212 
async_eb_add(std::unique_ptr<HandlerCallback<int64_t>> callback,int64_t a,int64_t b)213 void AsyncLoadHandler2::async_eb_add(
214     std::unique_ptr<HandlerCallback<int64_t>> callback, int64_t a, int64_t b) {
215   callback->result(a + b);
216 }
217 
218 } // namespace thrift
219 } // namespace apache
220