1 /*
2  * nghttp2 - HTTP/2 C Library
3  *
4  * Copyright (c) 2015 British Broadcasting Corporation
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining
7  * a copy of this software and associated documentation files (the
8  * "Software"), to deal in the Software without restriction, including
9  * without limitation the rights to use, copy, modify, merge, publish,
10  * distribute, sublicense, and/or sell copies of the Software, and to
11  * permit persons to whom the Software is furnished to do so, subject to
12  * the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be
15  * included in all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
21  * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
22  * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
23  * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
24  */
25 #include "h2load_http1_session.h"
26 
27 #include <cassert>
28 #include <cerrno>
29 
30 #include "h2load.h"
31 #include "util.h"
32 #include "template.h"
33 
34 #include <iostream>
35 #include <fstream>
36 
37 using namespace nghttp2;
38 
39 namespace h2load {
40 
41 namespace {
42 // HTTP response message begin
htp_msg_begincb(llhttp_t * htp)43 int htp_msg_begincb(llhttp_t *htp) {
44   auto session = static_cast<Http1Session *>(htp->data);
45 
46   if (session->stream_resp_counter_ > session->stream_req_counter_) {
47     return -1;
48   }
49 
50   return 0;
51 }
52 } // namespace
53 
54 namespace {
55 // HTTP response status code
htp_statuscb(llhttp_t * htp,const char * at,size_t length)56 int htp_statuscb(llhttp_t *htp, const char *at, size_t length) {
57   auto session = static_cast<Http1Session *>(htp->data);
58   auto client = session->get_client();
59 
60   if (htp->status_code / 100 == 1) {
61     return 0;
62   }
63 
64   client->on_status_code(session->stream_resp_counter_, htp->status_code);
65 
66   return 0;
67 }
68 } // namespace
69 
70 namespace {
71 // HTTP response message complete
htp_msg_completecb(llhttp_t * htp)72 int htp_msg_completecb(llhttp_t *htp) {
73   auto session = static_cast<Http1Session *>(htp->data);
74   auto client = session->get_client();
75 
76   if (htp->status_code / 100 == 1) {
77     return 0;
78   }
79 
80   client->final = llhttp_should_keep_alive(htp) == 0;
81   auto req_stat = client->get_req_stat(session->stream_resp_counter_);
82 
83   assert(req_stat);
84 
85   auto config = client->worker->config;
86   if (req_stat->data_offset >= config->data_length) {
87     client->on_stream_close(session->stream_resp_counter_, true, client->final);
88   }
89 
90   session->stream_resp_counter_ += 2;
91 
92   if (client->final) {
93     session->stream_req_counter_ = session->stream_resp_counter_;
94 
95     // Connection is going down.  If we have still request to do,
96     // create new connection and keep on doing the job.
97     if (client->req_left) {
98       client->try_new_connection();
99     }
100 
101     return HPE_PAUSED;
102   }
103 
104   return 0;
105 }
106 } // namespace
107 
108 namespace {
htp_hdr_keycb(llhttp_t * htp,const char * data,size_t len)109 int htp_hdr_keycb(llhttp_t *htp, const char *data, size_t len) {
110   auto session = static_cast<Http1Session *>(htp->data);
111   auto client = session->get_client();
112 
113   client->worker->stats.bytes_head += len;
114   client->worker->stats.bytes_head_decomp += len;
115   return 0;
116 }
117 } // namespace
118 
119 namespace {
htp_hdr_valcb(llhttp_t * htp,const char * data,size_t len)120 int htp_hdr_valcb(llhttp_t *htp, const char *data, size_t len) {
121   auto session = static_cast<Http1Session *>(htp->data);
122   auto client = session->get_client();
123 
124   client->worker->stats.bytes_head += len;
125   client->worker->stats.bytes_head_decomp += len;
126   return 0;
127 }
128 } // namespace
129 
130 namespace {
htp_hdrs_completecb(llhttp_t * htp)131 int htp_hdrs_completecb(llhttp_t *htp) {
132   return !http2::expect_response_body(htp->status_code);
133 }
134 } // namespace
135 
136 namespace {
htp_body_cb(llhttp_t * htp,const char * data,size_t len)137 int htp_body_cb(llhttp_t *htp, const char *data, size_t len) {
138   auto session = static_cast<Http1Session *>(htp->data);
139   auto client = session->get_client();
140 
141   client->record_ttfb();
142   client->worker->stats.bytes_body += len;
143 
144   return 0;
145 }
146 } // namespace
147 
148 namespace {
149 constexpr llhttp_settings_t htp_hooks = {
150     htp_msg_begincb,     // llhttp_cb      on_message_begin;
151     nullptr,             // llhttp_data_cb on_url;
152     htp_statuscb,        // llhttp_data_cb on_status;
153     htp_hdr_keycb,       // llhttp_data_cb on_header_field;
154     htp_hdr_valcb,       // llhttp_data_cb on_header_value;
155     htp_hdrs_completecb, // llhttp_cb      on_headers_complete;
156     htp_body_cb,         // llhttp_data_cb on_body;
157     htp_msg_completecb,  // llhttp_cb      on_message_complete;
158     nullptr,             // llhttp_cb      on_chunk_header
159     nullptr,             // llhttp_cb      on_chunk_complete
160 };
161 } // namespace
162 
Http1Session(Client * client)163 Http1Session::Http1Session(Client *client)
164     : stream_req_counter_(1),
165       stream_resp_counter_(1),
166       client_(client),
167       htp_(),
168       complete_(false) {
169   llhttp_init(&htp_, HTTP_RESPONSE, &htp_hooks);
170   htp_.data = this;
171 }
172 
~Http1Session()173 Http1Session::~Http1Session() {}
174 
on_connect()175 void Http1Session::on_connect() { client_->signal_write(); }
176 
submit_request()177 int Http1Session::submit_request() {
178   auto config = client_->worker->config;
179   const auto &req = config->h1reqs[client_->reqidx];
180   client_->reqidx++;
181 
182   if (client_->reqidx == config->h1reqs.size()) {
183     client_->reqidx = 0;
184   }
185 
186   client_->on_request(stream_req_counter_);
187 
188   auto req_stat = client_->get_req_stat(stream_req_counter_);
189 
190   client_->record_request_time(req_stat);
191   client_->wb.append(req);
192 
193   if (config->data_fd == -1 || config->data_length == 0) {
194     // increment for next request
195     stream_req_counter_ += 2;
196 
197     return 0;
198   }
199 
200   return on_write();
201 }
202 
on_read(const uint8_t * data,size_t len)203 int Http1Session::on_read(const uint8_t *data, size_t len) {
204   auto htperr =
205       llhttp_execute(&htp_, reinterpret_cast<const char *>(data), len);
206   auto nread = htperr == HPE_OK
207                    ? len
208                    : static_cast<size_t>(reinterpret_cast<const uint8_t *>(
209                                              llhttp_get_error_pos(&htp_)) -
210                                          data);
211 
212   if (client_->worker->config->verbose) {
213     std::cout.write(reinterpret_cast<const char *>(data), nread);
214   }
215 
216   if (htperr == HPE_PAUSED) {
217     // pause is done only when connection: close is requested
218     return -1;
219   }
220 
221   if (htperr != HPE_OK) {
222     std::cerr << "[ERROR] HTTP parse error: "
223               << "(" << llhttp_errno_name(htperr) << ") "
224               << llhttp_get_error_reason(&htp_) << std::endl;
225     return -1;
226   }
227 
228   return 0;
229 }
230 
on_write()231 int Http1Session::on_write() {
232   if (complete_) {
233     return -1;
234   }
235 
236   auto config = client_->worker->config;
237   auto req_stat = client_->get_req_stat(stream_req_counter_);
238   if (!req_stat) {
239     return 0;
240   }
241 
242   if (req_stat->data_offset < config->data_length) {
243     auto req_stat = client_->get_req_stat(stream_req_counter_);
244     auto &wb = client_->wb;
245 
246     // TODO unfortunately, wb has no interface to use with read(2)
247     // family functions.
248     std::array<uint8_t, 16_k> buf;
249 
250     ssize_t nread;
251     while ((nread = pread(config->data_fd, buf.data(), buf.size(),
252                           req_stat->data_offset)) == -1 &&
253            errno == EINTR)
254       ;
255 
256     if (nread == -1) {
257       return -1;
258     }
259 
260     req_stat->data_offset += nread;
261 
262     wb.append(buf.data(), nread);
263 
264     if (client_->worker->config->verbose) {
265       std::cout << "[send " << nread << " byte(s)]" << std::endl;
266     }
267 
268     if (req_stat->data_offset == config->data_length) {
269       // increment for next request
270       stream_req_counter_ += 2;
271 
272       if (stream_resp_counter_ == stream_req_counter_) {
273         // Response has already been received
274         client_->on_stream_close(stream_resp_counter_ - 2, true,
275                                  client_->final);
276       }
277     }
278   }
279 
280   return 0;
281 }
282 
terminate()283 void Http1Session::terminate() { complete_ = true; }
284 
get_client()285 Client *Http1Session::get_client() { return client_; }
286 
max_concurrent_streams()287 size_t Http1Session::max_concurrent_streams() {
288   auto config = client_->worker->config;
289 
290   return config->data_fd == -1 ? config->max_concurrent_streams : 1;
291 }
292 
293 } // namespace h2load
294