1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "internal.h"
18 #include "logging.h"
19 #include "settings.h"
20 #include "http-priv.h"
21 #include "http.h"
22 #include "ctx-log-inl.h"
23 #include "sllist.h"
24 #include <lcbio/ssl.h>
25 
26 #define LOGFMT CTX_LOGFMT
27 #define LOGID(req) CTX_LOGID(req->ioctx)
28 using namespace lcb::http;
29 
30 #define LOGARGS(req, lvl) req->instance->settings, "http-io", LCB_LOG_##lvl, __FILE__, __LINE__
31 
32 void
assign_response_headers(const lcb::htparse::Response & resp)33 Request::assign_response_headers(const lcb::htparse::Response& resp)
34 {
35     response_headers.assign(resp.headers.begin(), resp.headers.end());
36     response_headers_clist.clear();
37 
38     std::vector<lcb::htparse::MimeHeader>::const_iterator ii;
39     for (ii = response_headers.begin(); ii != response_headers.end(); ++ii) {
40         response_headers_clist.push_back(ii->key.c_str());
41         response_headers_clist.push_back(ii->value.c_str());
42     }
43     response_headers_clist.push_back(NULL);
44 }
45 
46 int
handle_parse_chunked(const char * buf,unsigned nbuf)47 Request::handle_parse_chunked(const char *buf, unsigned nbuf)
48 {
49     int parse_state, oldstate, diff;
50     using lcb::htparse::Parser;
51     lcb::htparse::Response& res = parser->get_cur_response();
52 
53     do {
54         const char *rbody;
55         unsigned nused = -1, nbody = -1;
56         oldstate = res.state;
57 
58         parse_state = parser->parse_ex(buf, nbuf, &nused, &nbody, &rbody);
59         diff = oldstate ^ parse_state;
60 
61         /* Got headers now for the first time */
62         if (diff & Parser::S_HEADER) {
63             assign_response_headers(res);
64             if (res.status >=  300 && res.status <= 400) {
65                 const char *redir = res.get_header_value("Location");
66                 if (redir != NULL) {
67                     pending_redirect.assign(redir);
68                     return Parser::S_DONE;
69                 }
70             }
71         }
72 
73         if (parse_state & Parser::S_ERROR) {
74             /* nothing to do here */
75             return parse_state;
76         }
77 
78         if (nbody) {
79             if (chunked) {
80                 lcb_RESPHTTP htresp = { 0 };
81                 init_resp(&htresp);
82                 htresp.body = rbody;
83                 htresp.nbody = nbody;
84                 htresp.rc = LCB_SUCCESS;
85                 passed_data = true;
86                 callback(instance, LCB_CALLBACK_HTTP, (const lcb_RESPBASE *)&htresp);
87 
88             } else {
89                 res.body.append(rbody, nbody);
90             }
91         }
92 
93         buf += nused;
94         nbuf -= nused;
95     } while ((parse_state & Parser::S_DONE) == 0 && is_ongoing() && nbuf);
96 
97     if ( (parse_state & Parser::S_DONE) && is_ongoing()) {
98         lcb_RESPHTTP resp = { 0 };
99         if (chunked) {
100             buf = NULL;
101             nbuf = 0;
102         } else {
103             buf = res.body.c_str();
104             nbuf = res.body.size();
105         }
106 
107         init_resp(&resp);
108         resp.rflags = LCB_RESP_F_FINAL;
109         resp.rc = LCB_SUCCESS;
110         resp.body = buf;
111         resp.nbody = nbuf;
112         passed_data = true;
113         callback(instance, LCB_CALLBACK_HTTP, (const lcb_RESPBASE*)&resp);
114         status |= Request::CBINVOKED;
115     }
116     return parse_state;
117 }
118 
119 static void
io_read(lcbio_CTX * ctx,unsigned nr)120 io_read(lcbio_CTX *ctx, unsigned nr)
121 {
122     Request *req = reinterpret_cast<Request*>(lcbio_ctx_data(ctx));
123     lcb_t instance = req->instance;
124     /** this variable set to 0 (in progress), -1 (error), 1 (done) */
125     int rv = 0;
126     lcbio_CTXRDITER iter;
127     req->incref();
128 
129     /** Delay the timer */
130     lcbio_timer_rearm(req->timer, req->timeout());
131 
132     LCBIO_CTX_ITERFOR(ctx, &iter, nr) {
133         char *buf;
134         unsigned nbuf;
135         int parse_state;
136 
137         buf = reinterpret_cast<char*>(lcbio_ctx_ribuf(&iter));
138         nbuf = lcbio_ctx_risize(&iter);
139         parse_state = req->handle_parse_chunked(buf, nbuf);
140 
141         if ((parse_state & lcb::htparse::Parser::S_ERROR) ||
142                 req->has_pending_redirect()) {
143             rv = -1;
144             break;
145         } else if (!req->is_ongoing()) {
146             rv = 1;
147             break;
148         }
149     }
150 
151     if (rv == -1) {
152         // parse error or redirect
153         lcb_error_t err;
154         if (req->has_pending_redirect()) {
155             instance->bootstrap(lcb::BS_REFRESH_THROTTLE);
156             // Transfer control to redirect function()
157             lcb_log(LOGARGS(req, DEBUG), LOGFMT "Attempting redirect to %s", LOGID(req), req->pending_redirect.c_str());
158             req->redirect();
159         } else {
160             err = LCB_PROTOCOL_ERROR;
161             lcb_log(LOGARGS(req, ERR), LOGFMT "Got parser error while parsing HTTP stream", LOGID(req));
162             req->finish_or_retry(err);
163         }
164     } else if (rv == 1) {
165         // Complete
166         req->finish(LCB_SUCCESS);
167     } else {
168         // Pending
169         lcbio_ctx_rwant(ctx, req->paused ? 0 : 1);
170         lcbio_ctx_schedule(ctx);
171     }
172 
173     req->decref();
174 }
175 
176 void
pause()177 Request::pause()
178 {
179     if (!paused) {
180         paused = true;
181         if (ioctx) {
182             lcbio_ctx_rwant(ioctx, 0);
183             lcbio_ctx_schedule(ioctx);
184         }
185     }
186 }
187 
188 void
resume()189 Request::resume()
190 {
191     if (!paused) {
192         return;
193     }
194 
195     if (ioctx == NULL) {
196         return;
197     }
198     paused = false;
199     lcbio_ctx_rwant(ioctx, 1);
200     lcbio_ctx_schedule(ioctx);
201 }
202 
203 static void
io_error(lcbio_CTX * ctx,lcb_error_t err)204 io_error(lcbio_CTX *ctx, lcb_error_t err)
205 {
206     Request *req = reinterpret_cast<Request*>(lcbio_ctx_data(ctx));
207     lcb_log(LOGARGS(req, ERR), LOGFMT "Got error while performing I/O on HTTP stream. Err=0x%x", LOGID(req), err);
208     req->finish_or_retry(err);
209 }
210 
211 static void
request_timed_out(void * arg)212 request_timed_out(void *arg)
213 {
214     Request *req = reinterpret_cast<Request*>(arg);
215     (req)->finish(LCB_ETIMEDOUT);
216 }
217 
218 static void
on_connected(lcbio_SOCKET * sock,void * arg,lcb_error_t err,lcbio_OSERR syserr)219 on_connected(lcbio_SOCKET *sock, void *arg, lcb_error_t err, lcbio_OSERR syserr)
220 {
221     Request *req = reinterpret_cast<Request*>(arg);
222     lcbio_CTXPROCS procs;
223     lcb_settings *settings = req->instance->settings;
224     req->creq = NULL;
225 
226     if (err != LCB_SUCCESS) {
227         lcb_log(LOGARGS(req, ERR), "Connection to failed with Err=0x%x", err);
228         req->finish_or_retry(err);
229         return;
230     }
231 
232     lcbio_sslify_if_needed(sock, settings);
233 
234     procs.cb_err = io_error;
235     procs.cb_read = io_read;
236     req->ioctx = lcbio_ctx_new(sock, arg, &procs);
237     switch (req->reqtype) {
238     case LCB_HTTP_TYPE_N1QL:
239         sock->service = LCBIO_SERVICE_N1QL;
240         break;
241     case LCB_HTTP_TYPE_VIEW:
242         sock->service = LCBIO_SERVICE_VIEW;
243         break;
244     case LCB_HTTP_TYPE_FTS:
245         sock->service = LCBIO_SERVICE_FTS;
246         break;
247     case LCB_HTTP_TYPE_CBAS:
248         sock->service = LCBIO_SERVICE_CBAS;
249         break;
250     default:
251         sock->service = LCBIO_SERVICE_MGMT;
252         break;
253     }
254     req->ioctx->subsys = "mgmt/capi";
255     lcbio_ctx_put(req->ioctx, &req->preamble[0], req->preamble.size());
256     if (!req->body.empty()) {
257         lcbio_ctx_put(req->ioctx, &req->body[0], req->body.size());
258     }
259     lcbio_ctx_rwant(req->ioctx, 1);
260     lcbio_ctx_schedule(req->ioctx);
261     (void)syserr;
262 }
263 
264 lcb_error_t
start_io(lcb_host_t & dest)265 Request::start_io(lcb_host_t& dest)
266 {
267     lcbio_MGR *pool = instance->http_sockpool;
268 
269     creq = pool->get(dest, timeout(), on_connected, this);
270     if (!creq) {
271         return LCB_CONNECT_ERROR;
272     }
273 
274     if (!timer) {
275         timer = lcbio_timer_new(io, this, request_timed_out);
276     }
277 
278     if (!lcbio_timer_armed(timer)) {
279         lcbio_timer_rearm(timer, timeout());
280     }
281 
282     return LCB_SUCCESS;
283 }
284 
285 static void
pool_close_cb(lcbio_SOCKET * sock,int reusable,void * arg)286 pool_close_cb(lcbio_SOCKET *sock, int reusable, void *arg)
287 {
288     int close_ok = *(int *)arg;
289 
290     lcbio_ref(sock);
291     if (reusable && close_ok) {
292         lcb::io::Pool::put(sock);
293     } else {
294         lcb::io::Pool::discard(sock);
295     }
296 }
297 
298 void
close_io()299 Request::close_io()
300 {
301     lcb::io::ConnectionRequest::cancel(&creq);
302 
303     if (!ioctx) {
304         return;
305     }
306 
307     int can_ka;
308 
309     if (parser && is_data_request()) {
310         can_ka = parser->can_keepalive();
311     } else {
312         can_ka = 0;
313     }
314 
315     lcbio_ctx_close(ioctx, pool_close_cb, &can_ka);
316     ioctx = NULL;
317 }
318