1 /**
2  * Copyright (c) 2015, Timothy Stack
3  *
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are met:
8  *
9  * * Redistributions of source code must retain the above copyright notice, this
10  * list of conditions and the following disclaimer.
11  * * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  * * Neither the name of Timothy Stack nor the names of its contributors
15  * may be used to endorse or promote products derived from this software
16  * without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19  * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21  * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23  * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25  * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27  * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  *
29  * @file curl_looper.cc
30  */
31 
32 #include "config.h"
33 
34 #include <algorithm>
35 
36 #if defined(HAVE_LIBCURL)
37 #include <curl/multi.h>
38 
39 #include "curl_looper.hh"
40 
41 using namespace std;
42 
43 struct curl_request_eq {
curl_request_eqcurl_request_eq44     explicit curl_request_eq(const std::string &name) : cre_name(name) {
45     };
46 
operator ()curl_request_eq47     bool operator()(const std::shared_ptr<curl_request>& cr) const {
48         return this->cre_name == cr->get_name();
49     };
50 
operator ()curl_request_eq51     bool operator()(const pair<mstime_t, std::shared_ptr<curl_request>> &pair) const {
52         return this->cre_name == pair.second->get_name();
53     };
54 
55     const std::string &cre_name;
56 };
57 
debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * userp)58 int curl_request::debug_cb(CURL *handle,
59                            curl_infotype type,
60                            char *data,
61                            size_t size,
62                            void *userp) {
63     curl_request *cr = (curl_request *) userp;
64     bool write_to_log;
65 
66     switch (type) {
67         case CURLINFO_TEXT:
68             write_to_log = true;
69             break;
70         case CURLINFO_HEADER_IN:
71         case CURLINFO_HEADER_OUT:
72             if (lnav_log_level == lnav_log_level_t::TRACE) {
73                 write_to_log = true;
74             }
75             else {
76                 write_to_log = false;
77             }
78             break;
79         default:
80             write_to_log = false;
81             break;
82     }
83 
84     if (write_to_log) {
85         while (size > 0 && isspace(data[size - 1])) {
86             size -= 1;
87         }
88         log_debug("%s:%.*s", cr->get_name().c_str(), size, data);
89     }
90 
91     return 0;
92 }
93 
loop_body()94 void curl_looper::loop_body()
95 {
96     mstime_t current_time = getmstime();
97 
98     this->perform_io();
99 
100     this->check_for_finished_requests();
101 
102     this->check_for_new_requests();
103 
104     this->requeue_requests(current_time + 5);
105 }
106 
perform_io()107 void curl_looper::perform_io()
108 {
109     if (this->cl_handle_to_request.empty()) {
110         return;
111     }
112 
113     mstime_t current_time = getmstime();
114     auto timeout = this->compute_timeout(current_time);
115     int running_handles;
116 
117     curl_multi_wait(this->cl_curl_multi,
118                     nullptr,
119                     0,
120                     timeout.count(),
121                     nullptr);
122     curl_multi_perform(this->cl_curl_multi, &running_handles);
123 }
124 
requeue_requests(mstime_t up_to_time)125 void curl_looper::requeue_requests(mstime_t up_to_time)
126 {
127     while (!this->cl_poll_queue.empty() &&
128            this->cl_poll_queue.front().first <= up_to_time) {
129         auto cr = this->cl_poll_queue.front().second;
130 
131         log_debug("%s:polling request is ready again -- %p",
132                   cr->get_name().c_str(), cr.get());
133         this->cl_handle_to_request[cr->get_handle()] = cr;
134         curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
135         this->cl_poll_queue.erase(this->cl_poll_queue.begin());
136     }
137 }
138 
check_for_new_requests()139 void curl_looper::check_for_new_requests() {
140     while (!this->cl_new_requests.empty()) {
141         auto cr = this->cl_new_requests.back();
142 
143         log_info("%s:new curl request %p",
144                  cr->get_name().c_str(),
145                  cr.get());
146         this->cl_handle_to_request[cr->get_handle()] = cr;
147         curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
148         this->cl_new_requests.pop_back();
149     }
150     while (!this->cl_close_requests.empty()) {
151         const std::string &name = this->cl_close_requests.back();
152         auto all_iter = find_if(
153                 this->cl_all_requests.begin(),
154                 this->cl_all_requests.end(),
155                 curl_request_eq(name));
156 
157         log_info("attempting to close request -- %s", name.c_str());
158         if (all_iter != this->cl_all_requests.end()) {
159             auto cr = *all_iter;
160 
161             log_info("%s:closing request -- %p",
162                      cr->get_name().c_str(), cr.get());
163             (*all_iter)->close();
164             auto act_iter = this->cl_handle_to_request.find(cr->get_handle());
165             if (act_iter != this->cl_handle_to_request.end()) {
166                 curl_multi_remove_handle(this->cl_curl_multi,
167                                          cr->get_handle());
168                 this->cl_handle_to_request.erase(act_iter);
169             }
170             auto poll_iter = find_if(this->cl_poll_queue.begin(),
171                                      this->cl_poll_queue.end(),
172                                      curl_request_eq(name));
173             if (poll_iter != this->cl_poll_queue.end()) {
174                 this->cl_poll_queue.erase(poll_iter);
175             }
176             this->cl_all_requests.erase(all_iter);
177         }
178         else {
179             log_error("Unable to find request with the name -- %s",
180                       name.c_str());
181         }
182 
183         this->cl_close_requests.pop_back();
184     }
185 }
186 
check_for_finished_requests()187 void curl_looper::check_for_finished_requests()
188 {
189     CURLMsg *msg;
190     int msgs_left;
191 
192     while ((msg = curl_multi_info_read(this->cl_curl_multi, &msgs_left)) !=
193         nullptr) {
194         if (msg->msg != CURLMSG_DONE) {
195             continue;
196         }
197 
198         CURL *easy = msg->easy_handle;
199         auto iter = this->cl_handle_to_request.find(easy);
200 
201         curl_multi_remove_handle(this->cl_curl_multi, easy);
202         if (iter != this->cl_handle_to_request.end()) {
203             auto cr = iter->second;
204             long delay_ms;
205 
206             this->cl_handle_to_request.erase(iter);
207             delay_ms = cr->complete(msg->data.result);
208             if (delay_ms < 0) {
209                 log_info("%s:curl_request %p finished, deleting...",
210                          cr->get_name().c_str(), cr.get());
211                 auto all_iter = find(this->cl_all_requests.begin(),
212                                      this->cl_all_requests.end(),
213                                      cr);
214                 if (all_iter != this->cl_all_requests.end()) {
215                     this->cl_all_requests.erase(all_iter);
216                 }
217             }
218             else {
219                 log_debug("%s:curl_request %p is polling, requeueing in %d",
220                           cr->get_name().c_str(),
221                           cr.get(),
222                           delay_ms);
223                 this->cl_poll_queue.emplace_back(getmstime() + delay_ms, cr);
224                 sort(this->cl_poll_queue.begin(), this->cl_poll_queue.end());
225             }
226         }
227     }
228 }
229 
compute_timeout(mstime_t current_time) const230 std::chrono::milliseconds curl_looper::compute_timeout(mstime_t current_time) const
231 {
232     std::chrono::milliseconds retval = 1s;
233 
234     if (!this->cl_handle_to_request.empty()) {
235         retval = 1ms;
236     } else if (!this->cl_poll_queue.empty()) {
237         retval = std::max(
238             1ms,
239             std::chrono::milliseconds(this->cl_poll_queue.front().first - current_time));
240     }
241 
242     ensure(retval.count() > 0);
243 
244     return retval;
245 }
246 
247 #endif
248