1 /* 2 Copyright 2019 Google LLC 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 https://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 #ifndef APIB_IOTHREAD_H 18 #define APIB_IOTHREAD_H 19 20 #include <openssl/ssl.h> 21 22 #include <atomic> 23 #include <memory> 24 #include <sstream> 25 #include <string> 26 #include <thread> 27 #include <vector> 28 29 #include "apib/apib_commandqueue.h" 30 #include "apib/apib_lines.h" 31 #include "apib/apib_oauth.h" 32 #include "apib/apib_rand.h" 33 #include "apib/apib_url.h" 34 #include "apib/socket.h" 35 #include "apib/tlssocket.h" 36 #include "ev.h" 37 #include "http_parser.h" 38 39 namespace apib { 40 41 // Constants used to keep track of which headers were 42 // already set 43 44 class ConnectionState; 45 class Counters; 46 47 // This structure represents a single thread that runs a benchmark 48 // across multiple connections. 49 class IOThread { 50 public: 51 // The caller must set these directly to configure the thread 52 int index = 0; 53 int numConnections = 0; 54 bool verbose = false; 55 std::string httpVerb; 56 std::string sslCipher; 57 std::string sendData; 58 SSL_CTX* sslCtx = nullptr; 59 OAuthInfo* oauth = nullptr; 60 std::vector<std::string>* headers = nullptr; 61 int headersSet = 0; 62 unsigned int thinkTime = 0; 63 int noKeepAlive = 0; 64 int keepRunning = 0; 65 // Everything ABOVE must be initialized. 66 67 // Constants for "headersSet" 68 static constexpr int kHostSet = (1 << 0); 69 static constexpr int kContentLengthSet = (1 << 1); 70 static constexpr int kContentTypeSet = (1 << 2); 71 static constexpr int kAuthorizationSet = (1 << 3); 72 static constexpr int kConnectionSet = (1 << 4); 73 static constexpr int kUserAgentSet = (1 << 5); 74 75 IOThread(); 76 ~IOThread(); 77 78 // Start the thread. It's up to the caller to initialize everything 79 // in the structure above. This call will spawn a thread, and keep 80 // running until "Stop" is called. 81 void Start(); 82 83 // Stop the thread. It will signal for a stop, and then stop 84 // more forcefully after "timeoutSecs" seconds 85 void RequestStop(int timeoutSecs); 86 87 // Wait for the thread to exit cleanly. 88 void Join(); 89 90 // Convenience that stops and joins all at once with a one-second timeout 91 void Stop(); 92 93 // Change the number of connections. This will happen as part of normal 94 // processing, with unneeded connections shutting down when done 95 // with their current requests. 96 void SetNumConnections(int newConnections); 97 loop()98 struct ev_loop* loop() { 99 return loop_; 100 } threadIndex()101 int threadIndex() { return index; } parserSettings()102 http_parser_settings* parserSettings() { return &parserSettings_; } shouldKeepRunning()103 bool shouldKeepRunning() { return keepRunning; } rand()104 RandomGenerator* rand() { return &rand_; } 105 106 void recordRead(size_t c); 107 void recordWrite(size_t c); 108 void recordResult(int statusCode, int64_t latency); 109 110 // Swap the current set of performance counters and start new ones. 111 // The caller must free the result. 112 Counters* exchangeCounters(); 113 114 // A utility function to print out the back ends for Libev 115 static std::string GetEvBackends(int mask); 116 117 private: 118 // We will manually choose "select", if available, if the number 119 // if connections in this thread is below this limit. This is faster. 120 static constexpr int kMaxSelectFds = 100; 121 122 void threadLoop(); 123 void threadLoopBody(); 124 static void initializeParser(); 125 static void processCommands(struct ev_loop* loop, ev_async* a, int revents); 126 static void hardShutdown(struct ev_loop* loop, ev_timer* timer, int revents); 127 void setNumConnections(size_t newVal); getCounters()128 Counters* getCounters() { 129 return reinterpret_cast<Counters*>(counterPtr_.load()); 130 } 131 132 static http_parser_settings parserSettings_; 133 134 std::vector<ConnectionState*> connections_; 135 std::thread* thread_ = nullptr; 136 RandomGenerator rand_; 137 struct ev_loop* loop_ = nullptr; 138 ev_async async_; 139 CommandQueue commands_; 140 ev_timer shutdownTimer_; 141 std::atomic_uintptr_t counterPtr_; 142 }; 143 144 // This is an internal class used per connection. 145 class ConnectionState { 146 public: 147 ConnectionState(int index, IOThread* t); 148 ~ConnectionState(); 149 150 // Called when asynchronous I/O completes 151 void WriteDone(int err); 152 void ReadDone(int err); 153 void CloseDone(); 154 155 // Connect in a non-blocking way, and return non-zero on error. 156 int Connect(); 157 void ConnectAndSend(); 158 int StartConnect(); 159 160 // Write what's in "sendBuf" to the socket, and call io_WriteDone when done. 161 void SendWrite(); 162 163 // Read the whole HTTP response and call "io_ReadDone" when done. 164 void SendRead(); 165 166 // Do what it says on the tin, and call "CloseDone" when done. 167 void Close(); 168 169 // Reset internal state so that the connection can be opened again 170 void Reset(); 171 index()172 int index() const { return index_; } stopRunning()173 void stopRunning() { keepRunning_ = 0; } 174 static int httpComplete(http_parser* p); 175 176 private: 177 // The size of the buffer to read from when calling read() 178 // or SSL_read() 179 static constexpr int kReadBufSize = 8192; 180 // In the event that connecting a socket fails, we will wait 181 // for this time, in seconds, before trying again. 182 // Nevertheless, if this ever gets used then the benchmark 183 // is pretty much ruined anyway... 184 static constexpr double kConnectFailureDelay = 0.25; 185 186 void addThinkTime(); 187 void sendAfterDelay(double seconds); 188 void recycle(bool closeConn); 189 void writeRequest(); 190 191 int singleRead(struct ev_loop* loop, ev_io* w, int revents); 192 int singleWrite(struct ev_loop* loop, ev_io* w, int revents); 193 194 static void completeShutdown(struct ev_loop* loop, ev_io* w, int revents); 195 static void readReady(struct ev_loop* loop, ev_io* w, int revents); 196 static void writeReady(struct ev_loop* loop, ev_io* w, int revents); 197 static void thinkingDone(struct ev_loop* loop, ev_timer* t, int revents); 198 199 const int index_ = 0; 200 bool keepRunning_ = 0; 201 std::unique_ptr<Socket> socket_; 202 IOThread* t_ = nullptr; 203 bool backwardsIo_ = false; 204 ev_io io_; 205 ev_timer thinkTimer_; 206 URLInfo* url_ = nullptr; 207 bool writeDirty_ = true; 208 std::ostringstream writeBuf_; 209 std::string fullWrite_; 210 size_t fullWritePos_ = 0; 211 char* readBuf_ = nullptr; 212 size_t readBufPos_ = 0; 213 http_parser parser_; 214 bool readDone_ = false; 215 bool needsOpen_ = false; 216 long long startTime_ = 0LL; 217 }; 218 219 // A typedef used to clean up some messy interfaces 220 typedef std::vector<std::unique_ptr<IOThread>> ThreadList; 221 222 // Debugging macro 223 #define io_Verbose(c, ...) \ 224 if ((c)->t_->verbose) { \ 225 printf(__VA_ARGS__); \ 226 } 227 228 #define iothread_Verbose(t, ...) \ 229 if ((t)->verbose) { \ 230 printf(__VA_ARGS__); \ 231 } 232 233 } // namespace apib 234 235 #endif // APIB_IOTHREAD_H