1 #include "coeurl/client.hpp"
2
3 #include <event2/thread.h>
4 #include <spdlog/sinks/null_sink.h>
5
6 #include <thread>
7
8 #include "coeurl/request.hpp"
9
10 namespace coeurl {
11 std::shared_ptr<spdlog::logger> Client::log = spdlog::null_logger_mt("coeurl_null");
12
13 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)14 void Client::mcode_or_die(const char *where, CURLMcode code) {
15 if (CURLM_OK != code) {
16 const char *s = curl_multi_strerror(code);
17 switch (code) {
18 case CURLM_BAD_SOCKET:
19 Client::log->error("{} returns {}", where, s);
20 /* ignore this error */
21 return;
22 case CURLM_BAD_HANDLE:
23 case CURLM_BAD_EASY_HANDLE:
24 case CURLM_OUT_OF_MEMORY:
25 case CURLM_INTERNAL_ERROR:
26 case CURLM_UNKNOWN_OPTION:
27 case CURLM_LAST:
28 break;
29 default:
30 s = "CURLM_unknown";
31 break;
32 }
33 Client::log->critical("{} returns {}", where, s);
34 throw std::runtime_error(s);
35 }
36 }
37
38 /* Information associated with a specific socket */
39 struct SockInfo {
40 curl_socket_t sockfd;
41 struct event ev;
42 };
43
44 /* Update the event timer after curl_multi library calls */
multi_timer_cb(CURLM * multi,long timeout_ms,Client * g)45 int Client::multi_timer_cb(CURLM *multi, long timeout_ms, Client *g) {
46 struct timeval timeout;
47 (void)multi;
48
49 timeout.tv_sec = timeout_ms / 1000;
50 timeout.tv_usec = (timeout_ms % 1000) * 1000;
51 Client::log->trace("multi_timer_cb: Setting timeout to {} ms", timeout_ms);
52
53 /*
54 * if timeout_ms is -1, just delete the timer
55 *
56 * For all other values of timeout_ms, this should set or *update* the timer
57 * to the new value
58 */
59 if (timeout_ms == -1)
60 event_del(&g->timer_event);
61 else /* includes timeout zero */ {
62 event_add(&g->timer_event, &timeout);
63 }
64 return 0;
65 }
66
67 /* Called by libevent when we get action on a multi socket */
event_cb(evutil_socket_t fd,short kind,void * userp)68 void Client::event_cb(evutil_socket_t fd, short kind, void *userp) {
69 Client *g = (Client *)userp;
70
71 int action = ((kind & EV_READ) ? CURL_CSELECT_IN : 0) | ((kind & EV_WRITE) ? CURL_CSELECT_OUT : 0);
72
73 CURLMcode rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
74 mcode_or_die("event_cb: curl_multi_socket_action", rc);
75
76 g->check_multi_info();
77 if (g->still_running <= 0 && g->running_requests.empty()) {
78 Client::log->trace("last transfer done, kill timeout");
79 if (evtimer_pending(&g->timer_event, NULL)) {
80 evtimer_del(&g->timer_event);
81 }
82 }
83 }
84
85 /* Called by libevent when our timeout expires */
timer_cb(evutil_socket_t,short,void * userp)86 void Client::timer_cb(evutil_socket_t, short, void *userp) {
87 Client::log->trace("timer_cb");
88
89 Client *g = (Client *)userp;
90
91 CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
92 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
93 g->check_multi_info();
94 }
95
96 // Invoked when we were told to shut down.
stop_ev_loop_cb(evutil_socket_t,short,void * userp)97 void Client::stop_ev_loop_cb(evutil_socket_t, short, void *userp) {
98 Client::log->trace("stop_ev_loop_cb");
99
100 Client *g = (Client *)userp;
101
102 CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
103 mcode_or_die("stop_ev_loop_cb: curl_multi_socket_action", rc);
104 g->check_multi_info();
105 }
106
107 /* Called by libevent when our timeout expires */
add_pending_requests_cb(evutil_socket_t,short,void * userp)108 void Client::add_pending_requests_cb(evutil_socket_t, short, void *userp) {
109 Client::log->trace("add_pending_requests_cb");
110
111 Client *g = (Client *)userp;
112
113 {
114 const std::scoped_lock lock(g->pending_requests_mutex, g->running_requests_mutex);
115
116 for (size_t i = 0; i < g->pending_requests.size(); i++) {
117 const auto &conn = g->pending_requests[i];
118
119 Client::log->trace("Adding easy {} to multi {} ({})", conn->easy, g->multi, conn->url_.c_str());
120 auto rc = curl_multi_add_handle(g->multi, conn->easy);
121 mcode_or_die("new_conn: curl_multi_add_handle", rc);
122
123 g->running_requests.push_back(std::move(g->pending_requests[i]));
124 }
125
126 g->pending_requests.clear();
127 }
128 }
129
130 /* Called by libevent when our timeout expires */
cancel_requests_cb(evutil_socket_t,short,void * userp)131 void Client::cancel_requests_cb(evutil_socket_t, short, void *userp) {
132 Client::log->trace("cancel_requests_cb");
133
134 Client *g = (Client *)userp;
135
136 {
137 // prevent new requests from being added
138 const std::scoped_lock lock(g->pending_requests_mutex);
139
140 // safe to access now, since we are running on the worker thread and only
141 // there running_requests is modified
142 while (!g->running_requests.empty())
143 g->remove_request(g->running_requests.back().get());
144 }
145
146 CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
147 mcode_or_die("timer_cb: curl_multi_socket_action", rc);
148 g->check_multi_info();
149 }
150
151 /* Clean up the SockInfo structure */
remsock(SockInfo * f)152 void Client::remsock(SockInfo *f) {
153 if (f) {
154 if (event_initialized(&f->ev)) {
155 event_del(&f->ev);
156 }
157 delete f;
158 }
159 }
160
161 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,int act)162 void Client::setsock(SockInfo *f, curl_socket_t s, int act) {
163 short kind = ((act & CURL_POLL_IN) ? EV_READ : 0) | ((act & CURL_POLL_OUT) ? EV_WRITE : 0) | EV_PERSIST;
164
165 f->sockfd = s;
166 if (event_initialized(&f->ev)) {
167 event_del(&f->ev);
168 }
169 event_assign(&f->ev, this->evbase, f->sockfd, kind, event_cb, this);
170 event_add(&f->ev, NULL);
171 }
172
173 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,int action)174 void Client::addsock(curl_socket_t s, int action) {
175 SockInfo *fdp = new SockInfo();
176
177 setsock(fdp, s, action);
178 curl_multi_assign(this->multi, s, fdp);
179 }
180
181 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)182 int Client::sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) {
183 Client *g = (Client *)cbp;
184 SockInfo *fdp = (SockInfo *)sockp;
185 const char *whatstr[] = {"none", "IN", "OUT", "INOUT", "REMOVE"};
186
187 Client::log->trace("socket callback: s={} e={} what={} ", s, e, whatstr[what]);
188 if (what == CURL_POLL_REMOVE) {
189 g->remsock(fdp);
190 } else {
191 if (!fdp) {
192 Client::log->trace("Adding data: {}", whatstr[what]);
193 g->addsock(s, what);
194 } else {
195 Client::log->trace("Changing action to: {}", whatstr[what]);
196 g->setsock(fdp, s, what);
197 }
198 }
199 return 0;
200 }
201
Client()202 Client::Client() {
203 std::once_flag threads_once;
204 #ifdef WIN32
205 std::call_once(threads_once, evthread_use_windows_threads);
206 #elif defined(EVENT__HAVE_PTHREADS)
207 std::call_once(threads_once, evthread_use_pthreads);
208 #else
209 #error "No supported threading backend!"
210 #endif
211
212 /* Make sure the SSL or WinSock backends are initialized */
213 std::once_flag curl_once;
214 std::call_once(curl_once, curl_global_init, CURL_GLOBAL_DEFAULT);
215
216 this->evbase = event_base_new();
217 this->multi = curl_multi_init();
218 event_assign(&this->timer_event, this->evbase, -1, 0, timer_cb, this);
219 event_assign(&this->add_request_timer, this->evbase, -1, 0, add_pending_requests_cb, this);
220 event_assign(&this->stop_event, this->evbase, -1, 0, stop_ev_loop_cb, this);
221 event_assign(&this->cancel_requests_timer, this->evbase, -1, 0, cancel_requests_cb, this);
222
223 /* setup the generic multi interface options we want */
224 curl_multi_setopt(this->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
225 curl_multi_setopt(this->multi, CURLMOPT_SOCKETDATA, this);
226 curl_multi_setopt(this->multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
227 curl_multi_setopt(this->multi, CURLMOPT_TIMERDATA, this);
228
229 bg_thread = std::thread([this]() { this->run(); });
230 }
231
~Client()232 Client::~Client() {
233 close();
234
235 event_del(&this->timer_event);
236 event_del(&this->add_request_timer);
237 event_del(&this->stop_event);
238 event_del(&this->cancel_requests_timer);
239 event_base_free(this->evbase);
240 curl_multi_cleanup(this->multi);
241 }
242
close(bool force)243 void Client::close(bool force) {
244 std::unique_lock l{stopped_mutex};
245 if (stopped)
246 return;
247
248 Client::log->trace("STOP");
249
250 if (force)
251 shutdown();
252
253 stopped = true;
254 event_active(&this->stop_event, 0, 0);
255
256 Client::log->trace("WAITING");
257 if (bg_thread.get_id() != std::this_thread::get_id())
258 bg_thread.join();
259 else
260 bg_thread.detach();
261 Client::log->trace("CLOSED");
262 }
263
shutdown()264 void Client::shutdown() { event_active(&this->cancel_requests_timer, 0, 0); }
265
run()266 void Client::run() { event_base_loop(this->evbase, EVLOOP_NO_EXIT_ON_EMPTY); }
267
268 /* Check for completed transfers, and remove their easy handles */
check_multi_info()269 void Client::check_multi_info() {
270 CURLMsg *msg;
271 int msgs_left;
272
273 Client::log->trace("REMAINING: {}", this->still_running);
274 while ((msg = curl_multi_info_read(this->multi, &msgs_left))) {
275 if (msg->msg == CURLMSG_DONE) {
276 CURL *easy = msg->easy_handle;
277
278 Request *conn;
279 curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
280 conn->status = Request::Status::Done;
281 conn->curl_error = msg->data.result;
282
283 remove_request(conn);
284 }
285 }
286
287 if (this->still_running == 0)
288 add_pending_requests_cb(0, 0, this);
289
290 if (this->still_running == 0 && this->running_requests.empty() && this->stopped) {
291 event_base_loopbreak(this->evbase);
292 Client::log->trace("BREAK");
293 }
294 Client::log->trace("after check_multi_info: {}", this->still_running);
295 }
296
submit_request(std::shared_ptr<Request> conn)297 void Client::submit_request(std::shared_ptr<Request> conn) {
298 Client::log->trace("SUBMIT");
299
300 {
301 const std::scoped_lock lock(pending_requests_mutex);
302
303 pending_requests.push_back(conn);
304 }
305
306 event_active(&add_request_timer, 0, 0);
307 }
remove_request(Request * r)308 void Client::remove_request(Request *r) {
309 Client::log->trace("REMOVE");
310
311 std::shared_ptr<Request> req;
312
313 {
314 std::scoped_lock lock(this->running_requests_mutex);
315 curl_multi_remove_handle(this->multi, r->easy);
316
317 for (auto it = this->running_requests.begin(); this->running_requests.end() != it; ++it) {
318 if (it->get() == r) {
319 req = std::move(*it);
320 this->running_requests.erase(it);
321 break;
322 }
323 }
324 }
325
326 if (req) {
327 long http_code;
328 curl_easy_getinfo(req->easy, CURLINFO_RESPONSE_CODE, &http_code);
329
330 Client::log->trace("DONE: {} => {} ({}) http: {}", req->url_, req->curl_error, req->error, http_code);
331
332 if (req->on_complete_)
333 req->on_complete_(*req.get());
334 }
335 }
336
get(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)337 void Client::get(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
338 long max_redirects) {
339 auto req = std::make_shared<Request>(this, Request::Method::Get, std::move(url));
340
341 req->on_complete(std::move(callback));
342
343 if (!headers.empty())
344 req->request_headers(headers);
345
346 if (max_redirects > 0)
347 req->max_redirects(max_redirects);
348
349 req->connection_timeout(connection_timeout_);
350
351 this->submit_request(std::move(req));
352 }
353
delete_(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)354 void Client::delete_(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
355 long max_redirects) {
356 auto req = std::make_shared<Request>(this, Request::Method::Delete, std::move(url));
357
358 req->on_complete(std::move(callback));
359
360 if (!headers.empty())
361 req->request_headers(headers);
362
363 if (max_redirects > 0)
364 req->max_redirects(max_redirects);
365
366 req->connection_timeout(connection_timeout_);
367
368 this->submit_request(std::move(req));
369 }
370
delete_(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)371 void Client::delete_(std::string url, std::string request_body, std::string mimetype, std::function<void(const Request &)> callback, const Headers &headers,
372 long max_redirects) {
373 auto req = std::make_shared<Request>(this, Request::Method::Delete, std::move(url));
374
375 req->request(request_body, mimetype);
376 req->on_complete(std::move(callback));
377
378 if (!headers.empty())
379 req->request_headers(headers);
380
381 if (max_redirects > 0)
382 req->max_redirects(max_redirects);
383
384 req->connection_timeout(connection_timeout_);
385
386 this->submit_request(std::move(req));
387 }
388
head(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)389 void Client::head(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
390 long max_redirects) {
391 auto req = std::make_shared<Request>(this, Request::Method::Head, std::move(url));
392
393 req->on_complete(std::move(callback));
394
395 if (!headers.empty())
396 req->request_headers(headers);
397
398 if (max_redirects > 0)
399 req->max_redirects(max_redirects);
400
401 req->connection_timeout(connection_timeout_);
402
403 this->submit_request(std::move(req));
404 }
405
options(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)406 void Client::options(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
407 long max_redirects) {
408 auto req = std::make_shared<Request>(this, Request::Method::Options, std::move(url));
409
410 req->on_complete(std::move(callback));
411
412 if (!headers.empty())
413 req->request_headers(headers);
414
415 if (max_redirects > 0)
416 req->max_redirects(max_redirects);
417
418 req->connection_timeout(connection_timeout_);
419
420 this->submit_request(std::move(req));
421 }
422
put(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)423 void Client::put(std::string url, std::string request_body, std::string mimetype,
424 std::function<void(const Request &)> callback, const Headers &headers, long max_redirects) {
425 auto req = std::make_shared<Request>(this, Request::Method::Put, std::move(url));
426
427 req->request(request_body, mimetype);
428 req->on_complete(std::move(callback));
429
430 if (!headers.empty())
431 req->request_headers(headers);
432
433 if (max_redirects > 0)
434 req->max_redirects(max_redirects);
435
436 req->connection_timeout(connection_timeout_);
437
438 this->submit_request(std::move(req));
439 }
440
post(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)441 void Client::post(std::string url, std::string request_body, std::string mimetype,
442 std::function<void(const Request &)> callback, const Headers &headers, long max_redirects) {
443 auto req = std::make_shared<Request>(this, Request::Method::Post, std::move(url));
444
445 req->request(request_body, mimetype);
446 req->on_complete(std::move(callback));
447
448 if (!headers.empty())
449 req->request_headers(headers);
450
451 if (max_redirects > 0)
452 req->max_redirects(max_redirects);
453
454 req->connection_timeout(connection_timeout_);
455
456 this->submit_request(std::move(req));
457 }
458 } // namespace coeurl
459