1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2013 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 #include "internal.h"
18 #include "logging.h"
19 #include "settings.h"
20 #include "http-priv.h"
21 #include "http.h"
22 #include "ctx-log-inl.h"
23 #include "sllist.h"
24 #include <lcbio/ssl.h>
25
26 #define LOGFMT CTX_LOGFMT
27 #define LOGID(req) CTX_LOGID(req->ioctx)
28 using namespace lcb::http;
29
30 #define LOGARGS(req, lvl) req->instance->settings, "http-io", LCB_LOG_##lvl, __FILE__, __LINE__
31
32 void
assign_response_headers(const lcb::htparse::Response & resp)33 Request::assign_response_headers(const lcb::htparse::Response& resp)
34 {
35 response_headers.assign(resp.headers.begin(), resp.headers.end());
36 response_headers_clist.clear();
37
38 std::vector<lcb::htparse::MimeHeader>::const_iterator ii;
39 for (ii = response_headers.begin(); ii != response_headers.end(); ++ii) {
40 response_headers_clist.push_back(ii->key.c_str());
41 response_headers_clist.push_back(ii->value.c_str());
42 }
43 response_headers_clist.push_back(NULL);
44 }
45
46 int
handle_parse_chunked(const char * buf,unsigned nbuf)47 Request::handle_parse_chunked(const char *buf, unsigned nbuf)
48 {
49 int parse_state, oldstate, diff;
50 using lcb::htparse::Parser;
51 lcb::htparse::Response& res = parser->get_cur_response();
52
53 do {
54 const char *rbody;
55 unsigned nused = -1, nbody = -1;
56 oldstate = res.state;
57
58 parse_state = parser->parse_ex(buf, nbuf, &nused, &nbody, &rbody);
59 diff = oldstate ^ parse_state;
60
61 /* Got headers now for the first time */
62 if (diff & Parser::S_HEADER) {
63 assign_response_headers(res);
64 if (res.status >= 300 && res.status <= 400) {
65 const char *redir = res.get_header_value("Location");
66 if (redir != NULL) {
67 pending_redirect.assign(redir);
68 return Parser::S_DONE;
69 }
70 }
71 }
72
73 if (parse_state & Parser::S_ERROR) {
74 /* nothing to do here */
75 return parse_state;
76 }
77
78 if (nbody) {
79 if (chunked) {
80 lcb_RESPHTTP htresp = { 0 };
81 init_resp(&htresp);
82 htresp.body = rbody;
83 htresp.nbody = nbody;
84 htresp.rc = LCB_SUCCESS;
85 passed_data = true;
86 callback(instance, LCB_CALLBACK_HTTP, (const lcb_RESPBASE *)&htresp);
87
88 } else {
89 res.body.append(rbody, nbody);
90 }
91 }
92
93 buf += nused;
94 nbuf -= nused;
95 } while ((parse_state & Parser::S_DONE) == 0 && is_ongoing() && nbuf);
96
97 if ( (parse_state & Parser::S_DONE) && is_ongoing()) {
98 lcb_RESPHTTP resp = { 0 };
99 if (chunked) {
100 buf = NULL;
101 nbuf = 0;
102 } else {
103 buf = res.body.c_str();
104 nbuf = res.body.size();
105 }
106
107 init_resp(&resp);
108 resp.rflags = LCB_RESP_F_FINAL;
109 resp.rc = LCB_SUCCESS;
110 resp.body = buf;
111 resp.nbody = nbuf;
112 passed_data = true;
113 callback(instance, LCB_CALLBACK_HTTP, (const lcb_RESPBASE*)&resp);
114 status |= Request::CBINVOKED;
115 }
116 return parse_state;
117 }
118
119 static void
io_read(lcbio_CTX * ctx,unsigned nr)120 io_read(lcbio_CTX *ctx, unsigned nr)
121 {
122 Request *req = reinterpret_cast<Request*>(lcbio_ctx_data(ctx));
123 lcb_t instance = req->instance;
124 /** this variable set to 0 (in progress), -1 (error), 1 (done) */
125 int rv = 0;
126 lcbio_CTXRDITER iter;
127 req->incref();
128
129 /** Delay the timer */
130 lcbio_timer_rearm(req->timer, req->timeout());
131
132 LCBIO_CTX_ITERFOR(ctx, &iter, nr) {
133 char *buf;
134 unsigned nbuf;
135 int parse_state;
136
137 buf = reinterpret_cast<char*>(lcbio_ctx_ribuf(&iter));
138 nbuf = lcbio_ctx_risize(&iter);
139 parse_state = req->handle_parse_chunked(buf, nbuf);
140
141 if ((parse_state & lcb::htparse::Parser::S_ERROR) ||
142 req->has_pending_redirect()) {
143 rv = -1;
144 break;
145 } else if (!req->is_ongoing()) {
146 rv = 1;
147 break;
148 }
149 }
150
151 if (rv == -1) {
152 // parse error or redirect
153 lcb_error_t err;
154 if (req->has_pending_redirect()) {
155 instance->bootstrap(lcb::BS_REFRESH_THROTTLE);
156 // Transfer control to redirect function()
157 lcb_log(LOGARGS(req, DEBUG), LOGFMT "Attempting redirect to %s", LOGID(req), req->pending_redirect.c_str());
158 req->redirect();
159 } else {
160 err = LCB_PROTOCOL_ERROR;
161 lcb_log(LOGARGS(req, ERR), LOGFMT "Got parser error while parsing HTTP stream", LOGID(req));
162 req->finish_or_retry(err);
163 }
164 } else if (rv == 1) {
165 // Complete
166 req->finish(LCB_SUCCESS);
167 } else {
168 // Pending
169 lcbio_ctx_rwant(ctx, req->paused ? 0 : 1);
170 lcbio_ctx_schedule(ctx);
171 }
172
173 req->decref();
174 }
175
176 void
pause()177 Request::pause()
178 {
179 if (!paused) {
180 paused = true;
181 if (ioctx) {
182 lcbio_ctx_rwant(ioctx, 0);
183 lcbio_ctx_schedule(ioctx);
184 }
185 }
186 }
187
188 void
resume()189 Request::resume()
190 {
191 if (!paused) {
192 return;
193 }
194
195 if (ioctx == NULL) {
196 return;
197 }
198 paused = false;
199 lcbio_ctx_rwant(ioctx, 1);
200 lcbio_ctx_schedule(ioctx);
201 }
202
203 static void
io_error(lcbio_CTX * ctx,lcb_error_t err)204 io_error(lcbio_CTX *ctx, lcb_error_t err)
205 {
206 Request *req = reinterpret_cast<Request*>(lcbio_ctx_data(ctx));
207 lcb_log(LOGARGS(req, ERR), LOGFMT "Got error while performing I/O on HTTP stream. Err=0x%x", LOGID(req), err);
208 req->finish_or_retry(err);
209 }
210
211 static void
request_timed_out(void * arg)212 request_timed_out(void *arg)
213 {
214 Request *req = reinterpret_cast<Request*>(arg);
215 (req)->finish(LCB_ETIMEDOUT);
216 }
217
218 static void
on_connected(lcbio_SOCKET * sock,void * arg,lcb_error_t err,lcbio_OSERR syserr)219 on_connected(lcbio_SOCKET *sock, void *arg, lcb_error_t err, lcbio_OSERR syserr)
220 {
221 Request *req = reinterpret_cast<Request*>(arg);
222 lcbio_CTXPROCS procs;
223 lcb_settings *settings = req->instance->settings;
224 req->creq = NULL;
225
226 if (err != LCB_SUCCESS) {
227 lcb_log(LOGARGS(req, ERR), "Connection to failed with Err=0x%x", err);
228 req->finish_or_retry(err);
229 return;
230 }
231
232 lcbio_sslify_if_needed(sock, settings);
233
234 procs.cb_err = io_error;
235 procs.cb_read = io_read;
236 req->ioctx = lcbio_ctx_new(sock, arg, &procs);
237 switch (req->reqtype) {
238 case LCB_HTTP_TYPE_N1QL:
239 sock->service = LCBIO_SERVICE_N1QL;
240 break;
241 case LCB_HTTP_TYPE_VIEW:
242 sock->service = LCBIO_SERVICE_VIEW;
243 break;
244 case LCB_HTTP_TYPE_FTS:
245 sock->service = LCBIO_SERVICE_FTS;
246 break;
247 case LCB_HTTP_TYPE_CBAS:
248 sock->service = LCBIO_SERVICE_CBAS;
249 break;
250 default:
251 sock->service = LCBIO_SERVICE_MGMT;
252 break;
253 }
254 req->ioctx->subsys = "mgmt/capi";
255 lcbio_ctx_put(req->ioctx, &req->preamble[0], req->preamble.size());
256 if (!req->body.empty()) {
257 lcbio_ctx_put(req->ioctx, &req->body[0], req->body.size());
258 }
259 lcbio_ctx_rwant(req->ioctx, 1);
260 lcbio_ctx_schedule(req->ioctx);
261 (void)syserr;
262 }
263
264 lcb_error_t
start_io(lcb_host_t & dest)265 Request::start_io(lcb_host_t& dest)
266 {
267 lcbio_MGR *pool = instance->http_sockpool;
268
269 creq = pool->get(dest, timeout(), on_connected, this);
270 if (!creq) {
271 return LCB_CONNECT_ERROR;
272 }
273
274 if (!timer) {
275 timer = lcbio_timer_new(io, this, request_timed_out);
276 }
277
278 if (!lcbio_timer_armed(timer)) {
279 lcbio_timer_rearm(timer, timeout());
280 }
281
282 return LCB_SUCCESS;
283 }
284
285 static void
pool_close_cb(lcbio_SOCKET * sock,int reusable,void * arg)286 pool_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
287 {
288 int close_ok = *(int *)arg;
289
290 lcbio_ref(sock);
291 if (reusable && close_ok) {
292 lcb::io::Pool::put(sock);
293 } else {
294 lcb::io::Pool::discard(sock);
295 }
296 }
297
298 void
close_io()299 Request::close_io()
300 {
301 lcb::io::ConnectionRequest::cancel(&creq);
302
303 if (!ioctx) {
304 return;
305 }
306
307 int can_ka;
308
309 if (parser && is_data_request()) {
310 can_ka = parser->can_keepalive();
311 } else {
312 can_ka = 0;
313 }
314
315 lcbio_ctx_close(ioctx, pool_close_cb, &can_ka);
316 ioctx = NULL;
317 }
318