1 /*
2 Copyright 2019 Google LLC
3 
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7 
8     https://www.apache.org/licenses/LICENSE-2.0
9 
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16 
17 #ifndef APIB_IOTHREAD_H
18 #define APIB_IOTHREAD_H
19 
20 #include <openssl/ssl.h>
21 
22 #include <atomic>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <thread>
27 #include <vector>
28 
29 #include "apib/apib_commandqueue.h"
30 #include "apib/apib_lines.h"
31 #include "apib/apib_oauth.h"
32 #include "apib/apib_rand.h"
33 #include "apib/apib_url.h"
34 #include "apib/socket.h"
35 #include "apib/tlssocket.h"
36 #include "ev.h"
37 #include "http_parser.h"
38 
39 namespace apib {
40 
41 // Constants used to keep track of which headers were
42 // already set
43 
44 class ConnectionState;
45 class Counters;
46 
47 // This structure represents a single thread that runs a benchmark
48 // across multiple connections.
49 class IOThread {
50  public:
51   // The caller must set these directly to configure the thread
52   int index = 0;
53   int numConnections = 0;
54   bool verbose = false;
55   std::string httpVerb;
56   std::string sslCipher;
57   std::string sendData;
58   SSL_CTX* sslCtx = nullptr;
59   OAuthInfo* oauth = nullptr;
60   std::vector<std::string>* headers = nullptr;
61   int headersSet = 0;
62   unsigned int thinkTime = 0;
63   int noKeepAlive = 0;
64   int keepRunning = 0;
65   // Everything ABOVE must be initialized.
66 
67   // Constants for "headersSet"
68   static constexpr int kHostSet = (1 << 0);
69   static constexpr int kContentLengthSet = (1 << 1);
70   static constexpr int kContentTypeSet = (1 << 2);
71   static constexpr int kAuthorizationSet = (1 << 3);
72   static constexpr int kConnectionSet = (1 << 4);
73   static constexpr int kUserAgentSet = (1 << 5);
74 
75   IOThread();
76   ~IOThread();
77 
78   // Start the thread. It's up to the caller to initialize everything
79   // in the structure above. This call will spawn a thread, and keep
80   // running until "Stop" is called.
81   void Start();
82 
83   // Stop the thread. It will signal for a stop, and then stop
84   // more forcefully after "timeoutSecs" seconds
85   void RequestStop(int timeoutSecs);
86 
87   // Wait for the thread to exit cleanly.
88   void Join();
89 
90   // Convenience that stops and joins all at once with a one-second timeout
91   void Stop();
92 
93   // Change the number of connections. This will happen as part of normal
94   // processing, with unneeded connections shutting down when done
95   // with their current requests.
96   void SetNumConnections(int newConnections);
97 
loop()98   struct ev_loop* loop() {
99     return loop_;
100   }
threadIndex()101   int threadIndex() { return index; }
parserSettings()102   http_parser_settings* parserSettings() { return &parserSettings_; }
shouldKeepRunning()103   bool shouldKeepRunning() { return keepRunning; }
rand()104   RandomGenerator* rand() { return &rand_; }
105 
106   void recordRead(size_t c);
107   void recordWrite(size_t c);
108   void recordResult(int statusCode, int64_t latency);
109 
110   // Swap the current set of performance counters and start new ones.
111   // The caller must free the result.
112   Counters* exchangeCounters();
113 
114   // A utility function to print out the back ends for Libev
115   static std::string GetEvBackends(int mask);
116 
117  private:
118   // We will manually choose "select", if available, if the number
119   // if connections in this thread is below this limit. This is faster.
120   static constexpr int kMaxSelectFds = 100;
121 
122   void threadLoop();
123   void threadLoopBody();
124   static void initializeParser();
125   static void processCommands(struct ev_loop* loop, ev_async* a, int revents);
126   static void hardShutdown(struct ev_loop* loop, ev_timer* timer, int revents);
127   void setNumConnections(size_t newVal);
getCounters()128   Counters* getCounters() {
129     return reinterpret_cast<Counters*>(counterPtr_.load());
130   }
131 
132   static http_parser_settings parserSettings_;
133 
134   std::vector<ConnectionState*> connections_;
135   std::thread* thread_ = nullptr;
136   RandomGenerator rand_;
137   struct ev_loop* loop_ = nullptr;
138   ev_async async_;
139   CommandQueue commands_;
140   ev_timer shutdownTimer_;
141   std::atomic_uintptr_t counterPtr_;
142 };
143 
144 // This is an internal class used per connection.
145 class ConnectionState {
146  public:
147   ConnectionState(int index, IOThread* t);
148   ~ConnectionState();
149 
150   // Called when asynchronous I/O completes
151   void WriteDone(int err);
152   void ReadDone(int err);
153   void CloseDone();
154 
155   // Connect in a non-blocking way, and return non-zero on error.
156   int Connect();
157   void ConnectAndSend();
158   int StartConnect();
159 
160   // Write what's in "sendBuf" to the socket, and call io_WriteDone when done.
161   void SendWrite();
162 
163   // Read the whole HTTP response and call "io_ReadDone" when done.
164   void SendRead();
165 
166   // Do what it says on the tin, and call "CloseDone" when done.
167   void Close();
168 
169   // Reset internal state so that the connection can be opened again
170   void Reset();
171 
index()172   int index() const { return index_; }
stopRunning()173   void stopRunning() { keepRunning_ = 0; }
174   static int httpComplete(http_parser* p);
175 
176  private:
177   // The size of the buffer to read from when calling read()
178   // or SSL_read()
179   static constexpr int kReadBufSize = 8192;
180   // In the event that connecting a socket fails, we will wait
181   // for this time, in seconds, before trying again.
182   // Nevertheless, if this ever gets used then the benchmark
183   // is pretty much ruined anyway...
184   static constexpr double kConnectFailureDelay = 0.25;
185 
186   void addThinkTime();
187   void sendAfterDelay(double seconds);
188   void recycle(bool closeConn);
189   void writeRequest();
190 
191   int singleRead(struct ev_loop* loop, ev_io* w, int revents);
192   int singleWrite(struct ev_loop* loop, ev_io* w, int revents);
193 
194   static void completeShutdown(struct ev_loop* loop, ev_io* w, int revents);
195   static void readReady(struct ev_loop* loop, ev_io* w, int revents);
196   static void writeReady(struct ev_loop* loop, ev_io* w, int revents);
197   static void thinkingDone(struct ev_loop* loop, ev_timer* t, int revents);
198 
199   const int index_ = 0;
200   bool keepRunning_ = 0;
201   std::unique_ptr<Socket> socket_;
202   IOThread* t_ = nullptr;
203   bool backwardsIo_ = false;
204   ev_io io_;
205   ev_timer thinkTimer_;
206   URLInfo* url_ = nullptr;
207   bool writeDirty_ = true;
208   std::ostringstream writeBuf_;
209   std::string fullWrite_;
210   size_t fullWritePos_ = 0;
211   char* readBuf_ = nullptr;
212   size_t readBufPos_ = 0;
213   http_parser parser_;
214   bool readDone_ = false;
215   bool needsOpen_ = false;
216   long long startTime_ = 0LL;
217 };
218 
219 // A typedef used to clean up some messy interfaces
220 typedef std::vector<std::unique_ptr<IOThread>> ThreadList;
221 
222 // Debugging macro
223 #define io_Verbose(c, ...) \
224   if ((c)->t_->verbose) {  \
225     printf(__VA_ARGS__);   \
226   }
227 
228 #define iothread_Verbose(t, ...) \
229   if ((t)->verbose) {            \
230     printf(__VA_ARGS__);         \
231   }
232 
233 }  // namespace apib
234 
235 #endif  // APIB_IOTHREAD_H