1 /* 2 * Copyright (c) Facebook, Inc. and its affiliates. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #pragma once 18 19 #include <folly/ExceptionWrapper.h> 20 #include <folly/SocketAddress.h> 21 #include <folly/io/IOBufQueue.h> 22 #include <folly/io/async/AsyncServerSocket.h> 23 #include <folly/io/async/AsyncSocket.h> 24 #include <folly/io/async/EventBase.h> 25 26 namespace folly { 27 28 class ZeroCopyTestAsyncSocket { 29 public: ZeroCopyTestAsyncSocket(size_t * counter,folly::EventBase * evb,int numLoops,size_t bufferSize,bool zeroCopy)30 explicit ZeroCopyTestAsyncSocket( 31 size_t* counter, 32 folly::EventBase* evb, 33 int numLoops, 34 size_t bufferSize, 35 bool zeroCopy) 36 : counter_(counter), 37 evb_(evb), 38 numLoops_(numLoops), 39 sock_(new folly::AsyncSocket(evb)), 40 callback_(this), 41 client_(true) { 42 setBufferSize(bufferSize); 43 setZeroCopy(zeroCopy); 44 } 45 ZeroCopyTestAsyncSocket(size_t * counter,folly::EventBase * evb,NetworkSocket fd,int numLoops,size_t bufferSize,bool zeroCopy)46 explicit ZeroCopyTestAsyncSocket( 47 size_t* counter, 48 folly::EventBase* evb, 49 NetworkSocket fd, 50 int numLoops, 51 size_t bufferSize, 52 bool zeroCopy) 53 : counter_(counter), 54 evb_(evb), 55 numLoops_(numLoops), 56 sock_(new folly::AsyncSocket(evb, fd)), 57 callback_(this), 58 client_(false) { 59 setBufferSize(bufferSize); 60 setZeroCopy(zeroCopy); 61 // enable reads 62 if (sock_) { 63 sock_->setReadCB(&callback_); 64 } 65 } 66 ~ZeroCopyTestAsyncSocket()67 ~ZeroCopyTestAsyncSocket() { clearBuffers(); } 68 connect(const folly::SocketAddress & remote)69 void connect(const folly::SocketAddress& remote) { 70 if (sock_) { 71 sock_->connect(&callback_, remote); 72 } 73 } 74 isZeroCopyWriteInProgress()75 bool isZeroCopyWriteInProgress() const { 76 return sock_->isZeroCopyWriteInProgress(); 77 } 78 79 private: setZeroCopy(bool enable)80 void setZeroCopy(bool enable) { 81 zeroCopy_ = enable; 82 if (sock_) { 83 sock_->setZeroCopy(zeroCopy_); 84 } 85 } 86 setBufferSize(size_t bufferSize)87 void setBufferSize(size_t bufferSize) { 88 clearBuffers(); 89 bufferSize_ = bufferSize; 90 91 readBuffer_ = new char[bufferSize_]; 92 } 93 94 class Callback : public folly::AsyncSocket::ReadCallback, 95 public folly::AsyncSocket::ConnectCallback { 96 public: Callback(ZeroCopyTestAsyncSocket * parent)97 explicit Callback(ZeroCopyTestAsyncSocket* parent) : parent_(parent) {} 98 connectSuccess()99 void connectSuccess() noexcept override { 100 parent_->sock_->setReadCB(this); 101 parent_->onConnected(); 102 } 103 connectErr(const folly::AsyncSocketException & ex)104 void connectErr(const folly::AsyncSocketException& ex) noexcept override { 105 LOG(ERROR) << "Connect error: " << ex.what(); 106 parent_->onDataFinish(folly::exception_wrapper(ex)); 107 } 108 getReadBuffer(void ** bufReturn,size_t * lenReturn)109 void getReadBuffer(void** bufReturn, size_t* lenReturn) override { 110 parent_->getReadBuffer(bufReturn, lenReturn); 111 } 112 readDataAvailable(size_t len)113 void readDataAvailable(size_t len) noexcept override { 114 parent_->readDataAvailable(len); 115 } 116 readEOF()117 void readEOF() noexcept override { 118 parent_->onDataFinish(folly::exception_wrapper()); 119 } 120 readErr(const folly::AsyncSocketException & ex)121 void readErr(const folly::AsyncSocketException& ex) noexcept override { 122 parent_->onDataFinish(folly::exception_wrapper(ex)); 123 } 124 125 private: 126 ZeroCopyTestAsyncSocket* parent_{nullptr}; 127 }; 128 clearBuffers()129 void clearBuffers() { 130 if (readBuffer_) { 131 delete[] readBuffer_; 132 } 133 } 134 getReadBuffer(void ** bufReturn,size_t * lenReturn)135 void getReadBuffer(void** bufReturn, size_t* lenReturn) { 136 *bufReturn = readBuffer_ + readOffset_; 137 *lenReturn = bufferSize_ - readOffset_; 138 } 139 readDataAvailable(size_t len)140 void readDataAvailable(size_t len) noexcept { 141 readOffset_ += len; 142 if (readOffset_ == bufferSize_) { 143 readOffset_ = 0; 144 onDataReady(); 145 } 146 } 147 onConnected()148 void onConnected() { 149 setZeroCopy(zeroCopy_); 150 writeBuffer(); 151 } 152 onDataReady()153 void onDataReady() { 154 currLoop_++; 155 if (client_ && currLoop_ >= numLoops_) { 156 evb_->runInLoop( 157 [this] { 158 if (counter_ && (0 == --(*counter_))) { 159 evb_->terminateLoopSoon(); 160 } 161 }, 162 false /*thisIteration*/); 163 return; 164 } 165 writeBuffer(); 166 } 167 onDataFinish(folly::exception_wrapper)168 void onDataFinish(folly::exception_wrapper) { 169 if (client_) { 170 if (counter_ && (0 == --(*counter_))) { 171 evb_->terminateLoopSoon(); 172 } 173 } 174 } 175 writeBuffer()176 bool writeBuffer() { 177 // use calloc to make sure the memory is touched 178 // if the memory is just malloc'd, running the zeroCopyOn 179 // and the zeroCopyOff back to back on a system that does not support 180 // zerocopy leads to the second test being much slower 181 writeBuffer_ = 182 folly::IOBuf::takeOwnership(::calloc(1, bufferSize_), bufferSize_); 183 184 if (sock_ && writeBuffer_) { 185 sock_->writeChain( 186 nullptr, 187 std::move(writeBuffer_), 188 zeroCopy_ ? WriteFlags::WRITE_MSG_ZEROCOPY : WriteFlags::NONE); 189 } 190 191 return true; 192 } 193 194 size_t* counter_{nullptr}; 195 folly::EventBase* evb_; 196 int numLoops_{0}; 197 int currLoop_{0}; 198 bool zeroCopy_{false}; 199 200 folly::AsyncSocket::UniquePtr sock_; 201 Callback callback_; 202 203 size_t bufferSize_{0}; 204 size_t readOffset_{0}; 205 char* readBuffer_{nullptr}; 206 std::unique_ptr<folly::IOBuf> writeBuffer_; 207 208 bool client_; 209 }; 210 211 class ZeroCopyTestServer : public folly::AsyncServerSocket::AcceptCallback { 212 public: ZeroCopyTestServer(folly::EventBase * evb,int numLoops,size_t bufferSize,bool zeroCopy)213 explicit ZeroCopyTestServer( 214 folly::EventBase* evb, int numLoops, size_t bufferSize, bool zeroCopy) 215 : evb_(evb), 216 numLoops_(numLoops), 217 bufferSize_(bufferSize), 218 zeroCopy_(zeroCopy) {} 219 addCallbackToServerSocket(folly::AsyncServerSocket & sock)220 void addCallbackToServerSocket(folly::AsyncServerSocket& sock) { 221 sock.addAcceptCallback(this, evb_); 222 } 223 connectionAccepted(folly::NetworkSocket fd,const folly::SocketAddress &,AcceptInfo)224 void connectionAccepted( 225 folly::NetworkSocket fd, 226 const folly::SocketAddress& /* unused */, 227 AcceptInfo /* info */) noexcept override { 228 auto client = std::make_shared<ZeroCopyTestAsyncSocket>( 229 nullptr, evb_, fd, numLoops_, bufferSize_, zeroCopy_); 230 clients_[client.get()] = client; 231 } 232 acceptError(folly::exception_wrapper)233 void acceptError(folly::exception_wrapper) noexcept override {} 234 235 private: 236 folly::EventBase* evb_; 237 int numLoops_; 238 size_t bufferSize_; 239 bool zeroCopy_; 240 std::unique_ptr<ZeroCopyTestAsyncSocket> client_; 241 std::unordered_map< 242 ZeroCopyTestAsyncSocket*, 243 std::shared_ptr<ZeroCopyTestAsyncSocket>> 244 clients_; 245 }; 246 247 class ZeroCopyTest { 248 public: 249 explicit ZeroCopyTest( 250 size_t numClients, int numLoops, bool zeroCopy, size_t bufferSize); 251 bool run(); 252 253 private: connectAll()254 void connectAll() { 255 SocketAddress addr = listenSock_->getAddress(); 256 for (auto& client : clients_) { 257 client->connect(addr); 258 } 259 } 260 261 size_t numClients_; 262 size_t counter_; 263 int numLoops_; 264 bool zeroCopy_; 265 size_t bufferSize_; 266 267 EventBase evb_; 268 std::vector<std::unique_ptr<ZeroCopyTestAsyncSocket>> clients_; 269 folly::AsyncServerSocket::UniquePtr listenSock_; 270 ZeroCopyTestServer server_; 271 }; 272 273 } // namespace folly 274