1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #pragma once
18 
19 #include <folly/ExceptionWrapper.h>
20 #include <folly/SocketAddress.h>
21 #include <folly/io/IOBufQueue.h>
22 #include <folly/io/async/AsyncServerSocket.h>
23 #include <folly/io/async/AsyncSocket.h>
24 #include <folly/io/async/EventBase.h>
25 
26 namespace folly {
27 
28 class ZeroCopyTestAsyncSocket {
29  public:
ZeroCopyTestAsyncSocket(size_t * counter,folly::EventBase * evb,int numLoops,size_t bufferSize,bool zeroCopy)30   explicit ZeroCopyTestAsyncSocket(
31       size_t* counter,
32       folly::EventBase* evb,
33       int numLoops,
34       size_t bufferSize,
35       bool zeroCopy)
36       : counter_(counter),
37         evb_(evb),
38         numLoops_(numLoops),
39         sock_(new folly::AsyncSocket(evb)),
40         callback_(this),
41         client_(true) {
42     setBufferSize(bufferSize);
43     setZeroCopy(zeroCopy);
44   }
45 
ZeroCopyTestAsyncSocket(size_t * counter,folly::EventBase * evb,NetworkSocket fd,int numLoops,size_t bufferSize,bool zeroCopy)46   explicit ZeroCopyTestAsyncSocket(
47       size_t* counter,
48       folly::EventBase* evb,
49       NetworkSocket fd,
50       int numLoops,
51       size_t bufferSize,
52       bool zeroCopy)
53       : counter_(counter),
54         evb_(evb),
55         numLoops_(numLoops),
56         sock_(new folly::AsyncSocket(evb, fd)),
57         callback_(this),
58         client_(false) {
59     setBufferSize(bufferSize);
60     setZeroCopy(zeroCopy);
61     // enable reads
62     if (sock_) {
63       sock_->setReadCB(&callback_);
64     }
65   }
66 
~ZeroCopyTestAsyncSocket()67   ~ZeroCopyTestAsyncSocket() { clearBuffers(); }
68 
connect(const folly::SocketAddress & remote)69   void connect(const folly::SocketAddress& remote) {
70     if (sock_) {
71       sock_->connect(&callback_, remote);
72     }
73   }
74 
isZeroCopyWriteInProgress()75   bool isZeroCopyWriteInProgress() const {
76     return sock_->isZeroCopyWriteInProgress();
77   }
78 
79  private:
setZeroCopy(bool enable)80   void setZeroCopy(bool enable) {
81     zeroCopy_ = enable;
82     if (sock_) {
83       sock_->setZeroCopy(zeroCopy_);
84     }
85   }
86 
setBufferSize(size_t bufferSize)87   void setBufferSize(size_t bufferSize) {
88     clearBuffers();
89     bufferSize_ = bufferSize;
90 
91     readBuffer_ = new char[bufferSize_];
92   }
93 
94   class Callback : public folly::AsyncSocket::ReadCallback,
95                    public folly::AsyncSocket::ConnectCallback {
96    public:
Callback(ZeroCopyTestAsyncSocket * parent)97     explicit Callback(ZeroCopyTestAsyncSocket* parent) : parent_(parent) {}
98 
connectSuccess()99     void connectSuccess() noexcept override {
100       parent_->sock_->setReadCB(this);
101       parent_->onConnected();
102     }
103 
connectErr(const folly::AsyncSocketException & ex)104     void connectErr(const folly::AsyncSocketException& ex) noexcept override {
105       LOG(ERROR) << "Connect error: " << ex.what();
106       parent_->onDataFinish(folly::exception_wrapper(ex));
107     }
108 
getReadBuffer(void ** bufReturn,size_t * lenReturn)109     void getReadBuffer(void** bufReturn, size_t* lenReturn) override {
110       parent_->getReadBuffer(bufReturn, lenReturn);
111     }
112 
readDataAvailable(size_t len)113     void readDataAvailable(size_t len) noexcept override {
114       parent_->readDataAvailable(len);
115     }
116 
readEOF()117     void readEOF() noexcept override {
118       parent_->onDataFinish(folly::exception_wrapper());
119     }
120 
readErr(const folly::AsyncSocketException & ex)121     void readErr(const folly::AsyncSocketException& ex) noexcept override {
122       parent_->onDataFinish(folly::exception_wrapper(ex));
123     }
124 
125    private:
126     ZeroCopyTestAsyncSocket* parent_{nullptr};
127   };
128 
clearBuffers()129   void clearBuffers() {
130     if (readBuffer_) {
131       delete[] readBuffer_;
132     }
133   }
134 
getReadBuffer(void ** bufReturn,size_t * lenReturn)135   void getReadBuffer(void** bufReturn, size_t* lenReturn) {
136     *bufReturn = readBuffer_ + readOffset_;
137     *lenReturn = bufferSize_ - readOffset_;
138   }
139 
readDataAvailable(size_t len)140   void readDataAvailable(size_t len) noexcept {
141     readOffset_ += len;
142     if (readOffset_ == bufferSize_) {
143       readOffset_ = 0;
144       onDataReady();
145     }
146   }
147 
onConnected()148   void onConnected() {
149     setZeroCopy(zeroCopy_);
150     writeBuffer();
151   }
152 
onDataReady()153   void onDataReady() {
154     currLoop_++;
155     if (client_ && currLoop_ >= numLoops_) {
156       evb_->runInLoop(
157           [this] {
158             if (counter_ && (0 == --(*counter_))) {
159               evb_->terminateLoopSoon();
160             }
161           },
162           false /*thisIteration*/);
163       return;
164     }
165     writeBuffer();
166   }
167 
onDataFinish(folly::exception_wrapper)168   void onDataFinish(folly::exception_wrapper) {
169     if (client_) {
170       if (counter_ && (0 == --(*counter_))) {
171         evb_->terminateLoopSoon();
172       }
173     }
174   }
175 
writeBuffer()176   bool writeBuffer() {
177     // use calloc to make sure the memory is touched
178     // if the memory is just malloc'd, running the zeroCopyOn
179     // and the zeroCopyOff back to back on a system that does not support
180     // zerocopy leads to the second test being much slower
181     writeBuffer_ =
182         folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_);
183 
184     if (sock_ && writeBuffer_) {
185       sock_->writeChain(
186           nullptr,
187           std::move(writeBuffer_),
188           zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE);
189     }
190 
191     return true;
192   }
193 
194   size_t* counter_{nullptr};
195   folly::EventBase* evb_;
196   int numLoops_{0};
197   int currLoop_{0};
198   bool zeroCopy_{false};
199 
200   folly::AsyncSocket::UniquePtr sock_;
201   Callback callback_;
202 
203   size_t bufferSize_{0};
204   size_t readOffset_{0};
205   char* readBuffer_{nullptr};
206   std::unique_ptr<folly::IOBuf> writeBuffer_;
207 
208   bool client_;
209 };
210 
211 class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback {
212  public:
ZeroCopyTestServer(folly::EventBase * evb,int numLoops,size_t bufferSize,bool zeroCopy)213   explicit ZeroCopyTestServer(
214       folly::EventBase* evb, int numLoops, size_t bufferSize, bool zeroCopy)
215       : evb_(evb),
216         numLoops_(numLoops),
217         bufferSize_(bufferSize),
218         zeroCopy_(zeroCopy) {}
219 
addCallbackToServerSocket(folly::AsyncServerSocket & sock)220   void addCallbackToServerSocket(folly::AsyncServerSocket& sock) {
221     sock.addAcceptCallback(this, evb_);
222   }
223 
connectionAccepted(folly::NetworkSocket fd,const folly::SocketAddress &,AcceptInfo)224   void connectionAccepted(
225       folly::NetworkSocket fd,
226       const folly::SocketAddress& /* unused */,
227       AcceptInfo /* info */) noexcept override {
228     auto client = std::make_shared<ZeroCopyTestAsyncSocket>(
229         nullptr, evb_, fd, numLoops_, bufferSize_, zeroCopy_);
230     clients_[client.get()] = client;
231   }
232 
acceptError(folly::exception_wrapper)233   void acceptError(folly::exception_wrapper) noexcept override {}
234 
235  private:
236   folly::EventBase* evb_;
237   int numLoops_;
238   size_t bufferSize_;
239   bool zeroCopy_;
240   std::unique_ptr<ZeroCopyTestAsyncSocket> client_;
241   std::unordered_map<
242       ZeroCopyTestAsyncSocket*,
243       std::shared_ptr<ZeroCopyTestAsyncSocket>>
244       clients_;
245 };
246 
247 class ZeroCopyTest {
248  public:
249   explicit ZeroCopyTest(
250       size_t numClients, int numLoops, bool zeroCopy, size_t bufferSize);
251   bool run();
252 
253  private:
connectAll()254   void connectAll() {
255     SocketAddress addr = listenSock_->getAddress();
256     for (auto& client : clients_) {
257       client->connect(addr);
258     }
259   }
260 
261   size_t numClients_;
262   size_t counter_;
263   int numLoops_;
264   bool zeroCopy_;
265   size_t bufferSize_;
266 
267   EventBase evb_;
268   std::vector<std::unique_ptr<ZeroCopyTestAsyncSocket>> clients_;
269   folly::AsyncServerSocket::UniquePtr listenSock_;
270   ZeroCopyTestServer server_;
271 };
272 
273 } // namespace folly
274