1 #include "coeurl/client.hpp"
2 
3 #include <event2/thread.h>
4 #include <spdlog/sinks/null_sink.h>
5 
6 #include <thread>
7 
8 #include "coeurl/request.hpp"
9 
10 namespace coeurl {
11 std::shared_ptr<spdlog::logger> Client::log = spdlog::null_logger_mt("coeurl_null");
12 
13 /* Die if we get a bad CURLMcode somewhere */
mcode_or_die(const char * where,CURLMcode code)14 void Client::mcode_or_die(const char *where, CURLMcode code) {
15     if (CURLM_OK != code) {
16         const char *s = curl_multi_strerror(code);
17         switch (code) {
18         case CURLM_BAD_SOCKET:
19             Client::log->error("{} returns {}", where, s);
20             /* ignore this error */
21             return;
22         case CURLM_BAD_HANDLE:
23         case CURLM_BAD_EASY_HANDLE:
24         case CURLM_OUT_OF_MEMORY:
25         case CURLM_INTERNAL_ERROR:
26         case CURLM_UNKNOWN_OPTION:
27         case CURLM_LAST:
28             break;
29         default:
30             s = "CURLM_unknown";
31             break;
32         }
33         Client::log->critical("{} returns {}", where, s);
34         throw std::runtime_error(s);
35     }
36 }
37 
38 /* Information associated with a specific socket */
39 struct SockInfo {
40     curl_socket_t sockfd;
41     struct event ev;
42 };
43 
44 /* Update the event timer after curl_multi library calls */
multi_timer_cb(CURLM * multi,long timeout_ms,Client * g)45 int Client::multi_timer_cb(CURLM *multi, long timeout_ms, Client *g) {
46     struct timeval timeout;
47     (void)multi;
48 
49     timeout.tv_sec = timeout_ms / 1000;
50     timeout.tv_usec = (timeout_ms % 1000) * 1000;
51     Client::log->trace("multi_timer_cb: Setting timeout to {} ms", timeout_ms);
52 
53     /*
54      * if timeout_ms is -1, just delete the timer
55      *
56      * For all other values of timeout_ms, this should set or *update* the timer
57      * to the new value
58      */
59     if (timeout_ms == -1)
60         event_del(&g->timer_event);
61     else /* includes timeout zero */ {
62         event_add(&g->timer_event, &timeout);
63     }
64     return 0;
65 }
66 
67 /* Called by libevent when we get action on a multi socket */
event_cb(evutil_socket_t fd,short kind,void * userp)68 void Client::event_cb(evutil_socket_t fd, short kind, void *userp) {
69     Client *g = (Client *)userp;
70 
71     int action = ((kind & EV_READ) ? CURL_CSELECT_IN : 0) | ((kind & EV_WRITE) ? CURL_CSELECT_OUT : 0);
72 
73     CURLMcode rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
74     mcode_or_die("event_cb: curl_multi_socket_action", rc);
75 
76     g->check_multi_info();
77     if (g->still_running <= 0 && g->running_requests.empty()) {
78         Client::log->trace("last transfer done, kill timeout");
79         if (evtimer_pending(&g->timer_event, NULL)) {
80             evtimer_del(&g->timer_event);
81         }
82     }
83 }
84 
85 /* Called by libevent when our timeout expires */
timer_cb(evutil_socket_t,short,void * userp)86 void Client::timer_cb(evutil_socket_t, short, void *userp) {
87     Client::log->trace("timer_cb");
88 
89     Client *g = (Client *)userp;
90 
91     CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
92     mcode_or_die("timer_cb: curl_multi_socket_action", rc);
93     g->check_multi_info();
94 }
95 
96 // Invoked when we were told to shut down.
stop_ev_loop_cb(evutil_socket_t,short,void * userp)97 void Client::stop_ev_loop_cb(evutil_socket_t, short, void *userp) {
98     Client::log->trace("stop_ev_loop_cb");
99 
100     Client *g = (Client *)userp;
101 
102     CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
103     mcode_or_die("stop_ev_loop_cb: curl_multi_socket_action", rc);
104     g->check_multi_info();
105 }
106 
107 /* Called by libevent when our timeout expires */
add_pending_requests_cb(evutil_socket_t,short,void * userp)108 void Client::add_pending_requests_cb(evutil_socket_t, short, void *userp) {
109     Client::log->trace("add_pending_requests_cb");
110 
111     Client *g = (Client *)userp;
112 
113     {
114         const std::scoped_lock lock(g->pending_requests_mutex, g->running_requests_mutex);
115 
116         for (size_t i = 0; i < g->pending_requests.size(); i++) {
117             const auto &conn = g->pending_requests[i];
118 
119             Client::log->trace("Adding easy {} to multi {} ({})", conn->easy, g->multi, conn->url_.c_str());
120             auto rc = curl_multi_add_handle(g->multi, conn->easy);
121             mcode_or_die("new_conn: curl_multi_add_handle", rc);
122 
123             g->running_requests.push_back(std::move(g->pending_requests[i]));
124         }
125 
126         g->pending_requests.clear();
127     }
128 }
129 
130 /* Called by libevent when our timeout expires */
cancel_requests_cb(evutil_socket_t,short,void * userp)131 void Client::cancel_requests_cb(evutil_socket_t, short, void *userp) {
132     Client::log->trace("cancel_requests_cb");
133 
134     Client *g = (Client *)userp;
135 
136     {
137         // prevent new requests from being added
138         const std::scoped_lock lock(g->pending_requests_mutex);
139 
140         // safe to access now, since we are running on the worker thread and only
141         // there running_requests is modified
142         while (!g->running_requests.empty())
143             g->remove_request(g->running_requests.back().get());
144     }
145 
146     CURLMcode rc = curl_multi_socket_action(g->multi, CURL_SOCKET_TIMEOUT, 0, &g->still_running);
147     mcode_or_die("timer_cb: curl_multi_socket_action", rc);
148     g->check_multi_info();
149 }
150 
151 /* Clean up the SockInfo structure */
remsock(SockInfo * f)152 void Client::remsock(SockInfo *f) {
153     if (f) {
154         if (event_initialized(&f->ev)) {
155             event_del(&f->ev);
156         }
157         delete f;
158     }
159 }
160 
161 /* Assign information to a SockInfo structure */
setsock(SockInfo * f,curl_socket_t s,int act)162 void Client::setsock(SockInfo *f, curl_socket_t s, int act) {
163     short kind = ((act & CURL_POLL_IN) ? EV_READ : 0) | ((act & CURL_POLL_OUT) ? EV_WRITE : 0) | EV_PERSIST;
164 
165     f->sockfd = s;
166     if (event_initialized(&f->ev)) {
167         event_del(&f->ev);
168     }
169     event_assign(&f->ev, this->evbase, f->sockfd, kind, event_cb, this);
170     event_add(&f->ev, NULL);
171 }
172 
173 /* Initialize a new SockInfo structure */
addsock(curl_socket_t s,int action)174 void Client::addsock(curl_socket_t s, int action) {
175     SockInfo *fdp = new SockInfo();
176 
177     setsock(fdp, s, action);
178     curl_multi_assign(this->multi, s, fdp);
179 }
180 
181 /* CURLMOPT_SOCKETFUNCTION */
sock_cb(CURL * e,curl_socket_t s,int what,void * cbp,void * sockp)182 int Client::sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp) {
183     Client *g = (Client *)cbp;
184     SockInfo *fdp = (SockInfo *)sockp;
185     const char *whatstr[] = {"none", "IN", "OUT", "INOUT", "REMOVE"};
186 
187     Client::log->trace("socket callback: s={} e={} what={} ", s, e, whatstr[what]);
188     if (what == CURL_POLL_REMOVE) {
189         g->remsock(fdp);
190     } else {
191         if (!fdp) {
192             Client::log->trace("Adding data: {}", whatstr[what]);
193             g->addsock(s, what);
194         } else {
195             Client::log->trace("Changing action to: {}", whatstr[what]);
196             g->setsock(fdp, s, what);
197         }
198     }
199     return 0;
200 }
201 
Client()202 Client::Client() {
203     std::once_flag threads_once;
204 #ifdef WIN32
205     std::call_once(threads_once, evthread_use_windows_threads);
206 #elif defined(EVENT__HAVE_PTHREADS)
207     std::call_once(threads_once, evthread_use_pthreads);
208 #else
209 #error "No supported threading backend!"
210 #endif
211 
212     /* Make sure the SSL or WinSock backends are initialized */
213     std::once_flag curl_once;
214     std::call_once(curl_once, curl_global_init, CURL_GLOBAL_DEFAULT);
215 
216     this->evbase = event_base_new();
217     this->multi = curl_multi_init();
218     event_assign(&this->timer_event, this->evbase, -1, 0, timer_cb, this);
219     event_assign(&this->add_request_timer, this->evbase, -1, 0, add_pending_requests_cb, this);
220     event_assign(&this->stop_event, this->evbase, -1, 0, stop_ev_loop_cb, this);
221     event_assign(&this->cancel_requests_timer, this->evbase, -1, 0, cancel_requests_cb, this);
222 
223     /* setup the generic multi interface options we want */
224     curl_multi_setopt(this->multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
225     curl_multi_setopt(this->multi, CURLMOPT_SOCKETDATA, this);
226     curl_multi_setopt(this->multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
227     curl_multi_setopt(this->multi, CURLMOPT_TIMERDATA, this);
228 
229     bg_thread = std::thread([this]() { this->run(); });
230 }
231 
~Client()232 Client::~Client() {
233     close();
234 
235     event_del(&this->timer_event);
236     event_del(&this->add_request_timer);
237     event_del(&this->stop_event);
238     event_del(&this->cancel_requests_timer);
239     event_base_free(this->evbase);
240     curl_multi_cleanup(this->multi);
241 }
242 
close(bool force)243 void Client::close(bool force) {
244     std::unique_lock l{stopped_mutex};
245     if (stopped)
246         return;
247 
248     Client::log->trace("STOP");
249 
250     if (force)
251         shutdown();
252 
253     stopped = true;
254     event_active(&this->stop_event, 0, 0);
255 
256     Client::log->trace("WAITING");
257     if (bg_thread.get_id() != std::this_thread::get_id())
258         bg_thread.join();
259     else
260         bg_thread.detach();
261     Client::log->trace("CLOSED");
262 }
263 
shutdown()264 void Client::shutdown() { event_active(&this->cancel_requests_timer, 0, 0); }
265 
run()266 void Client::run() { event_base_loop(this->evbase, EVLOOP_NO_EXIT_ON_EMPTY); }
267 
268 /* Check for completed transfers, and remove their easy handles */
check_multi_info()269 void Client::check_multi_info() {
270     CURLMsg *msg;
271     int msgs_left;
272 
273     Client::log->trace("REMAINING: {}", this->still_running);
274     while ((msg = curl_multi_info_read(this->multi, &msgs_left))) {
275         if (msg->msg == CURLMSG_DONE) {
276             CURL *easy = msg->easy_handle;
277 
278             Request *conn;
279             curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
280             conn->status = Request::Status::Done;
281             conn->curl_error = msg->data.result;
282 
283             remove_request(conn);
284         }
285     }
286 
287     if (this->still_running == 0)
288         add_pending_requests_cb(0, 0, this);
289 
290     if (this->still_running == 0 && this->running_requests.empty() && this->stopped) {
291         event_base_loopbreak(this->evbase);
292         Client::log->trace("BREAK");
293     }
294     Client::log->trace("after check_multi_info: {}", this->still_running);
295 }
296 
submit_request(std::shared_ptr<Request> conn)297 void Client::submit_request(std::shared_ptr<Request> conn) {
298     Client::log->trace("SUBMIT");
299 
300     {
301         const std::scoped_lock lock(pending_requests_mutex);
302 
303         pending_requests.push_back(conn);
304     }
305 
306     event_active(&add_request_timer, 0, 0);
307 }
remove_request(Request * r)308 void Client::remove_request(Request *r) {
309     Client::log->trace("REMOVE");
310 
311     std::shared_ptr<Request> req;
312 
313     {
314         std::scoped_lock lock(this->running_requests_mutex);
315         curl_multi_remove_handle(this->multi, r->easy);
316 
317         for (auto it = this->running_requests.begin(); this->running_requests.end() != it; ++it) {
318             if (it->get() == r) {
319                 req = std::move(*it);
320                 this->running_requests.erase(it);
321                 break;
322             }
323         }
324     }
325 
326     if (req) {
327         long http_code;
328         curl_easy_getinfo(req->easy, CURLINFO_RESPONSE_CODE, &http_code);
329 
330         Client::log->trace("DONE: {} => {} ({}) http: {}", req->url_, req->curl_error, req->error, http_code);
331 
332         if (req->on_complete_)
333             req->on_complete_(*req.get());
334     }
335 }
336 
get(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)337 void Client::get(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
338                  long max_redirects) {
339     auto req = std::make_shared<Request>(this, Request::Method::Get, std::move(url));
340 
341     req->on_complete(std::move(callback));
342 
343     if (!headers.empty())
344         req->request_headers(headers);
345 
346     if (max_redirects > 0)
347         req->max_redirects(max_redirects);
348 
349     req->connection_timeout(connection_timeout_);
350 
351     this->submit_request(std::move(req));
352 }
353 
delete_(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)354 void Client::delete_(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
355                      long max_redirects) {
356     auto req = std::make_shared<Request>(this, Request::Method::Delete, std::move(url));
357 
358     req->on_complete(std::move(callback));
359 
360     if (!headers.empty())
361         req->request_headers(headers);
362 
363     if (max_redirects > 0)
364         req->max_redirects(max_redirects);
365 
366     req->connection_timeout(connection_timeout_);
367 
368     this->submit_request(std::move(req));
369 }
370 
delete_(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)371 void Client::delete_(std::string url, std::string request_body, std::string mimetype, std::function<void(const Request &)> callback, const Headers &headers,
372                      long max_redirects) {
373     auto req = std::make_shared<Request>(this, Request::Method::Delete, std::move(url));
374 
375     req->request(request_body, mimetype);
376     req->on_complete(std::move(callback));
377 
378     if (!headers.empty())
379         req->request_headers(headers);
380 
381     if (max_redirects > 0)
382         req->max_redirects(max_redirects);
383 
384     req->connection_timeout(connection_timeout_);
385 
386     this->submit_request(std::move(req));
387 }
388 
head(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)389 void Client::head(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
390                   long max_redirects) {
391     auto req = std::make_shared<Request>(this, Request::Method::Head, std::move(url));
392 
393     req->on_complete(std::move(callback));
394 
395     if (!headers.empty())
396         req->request_headers(headers);
397 
398     if (max_redirects > 0)
399         req->max_redirects(max_redirects);
400 
401     req->connection_timeout(connection_timeout_);
402 
403     this->submit_request(std::move(req));
404 }
405 
options(std::string url,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)406 void Client::options(std::string url, std::function<void(const Request &)> callback, const Headers &headers,
407                      long max_redirects) {
408     auto req = std::make_shared<Request>(this, Request::Method::Options, std::move(url));
409 
410     req->on_complete(std::move(callback));
411 
412     if (!headers.empty())
413         req->request_headers(headers);
414 
415     if (max_redirects > 0)
416         req->max_redirects(max_redirects);
417 
418     req->connection_timeout(connection_timeout_);
419 
420     this->submit_request(std::move(req));
421 }
422 
put(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)423 void Client::put(std::string url, std::string request_body, std::string mimetype,
424                  std::function<void(const Request &)> callback, const Headers &headers, long max_redirects) {
425     auto req = std::make_shared<Request>(this, Request::Method::Put, std::move(url));
426 
427     req->request(request_body, mimetype);
428     req->on_complete(std::move(callback));
429 
430     if (!headers.empty())
431         req->request_headers(headers);
432 
433     if (max_redirects > 0)
434         req->max_redirects(max_redirects);
435 
436     req->connection_timeout(connection_timeout_);
437 
438     this->submit_request(std::move(req));
439 }
440 
post(std::string url,std::string request_body,std::string mimetype,std::function<void (const Request &)> callback,const Headers & headers,long max_redirects)441 void Client::post(std::string url, std::string request_body, std::string mimetype,
442                   std::function<void(const Request &)> callback, const Headers &headers, long max_redirects) {
443     auto req = std::make_shared<Request>(this, Request::Method::Post, std::move(url));
444 
445     req->request(request_body, mimetype);
446     req->on_complete(std::move(callback));
447 
448     if (!headers.empty())
449         req->request_headers(headers);
450 
451     if (max_redirects > 0)
452         req->max_redirects(max_redirects);
453 
454     req->connection_timeout(connection_timeout_);
455 
456     this->submit_request(std::move(req));
457 }
458 } // namespace coeurl
459