1 /**
2 * Copyright (c) 2015, Timothy Stack
3 *
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions are met:
8 *
9 * * Redistributions of source code must retain the above copyright notice, this
10 * list of conditions and the following disclaimer.
11 * * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
14 * * Neither the name of Timothy Stack nor the names of its contributors
15 * may be used to endorse or promote products derived from this software
16 * without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ''AS IS'' AND ANY
19 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
20 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
21 * DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY
22 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
23 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
24 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
25 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
27 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 * @file curl_looper.cc
30 */
31
32 #include "config.h"
33
34 #include <algorithm>
35
36 #if defined(HAVE_LIBCURL)
37 #include <curl/multi.h>
38
39 #include "curl_looper.hh"
40
41 using namespace std;
42
43 struct curl_request_eq {
curl_request_eqcurl_request_eq44 explicit curl_request_eq(const std::string &name) : cre_name(name) {
45 };
46
operator ()curl_request_eq47 bool operator()(const std::shared_ptr<curl_request>& cr) const {
48 return this->cre_name == cr->get_name();
49 };
50
operator ()curl_request_eq51 bool operator()(const pair<mstime_t, std::shared_ptr<curl_request>> &pair) const {
52 return this->cre_name == pair.second->get_name();
53 };
54
55 const std::string &cre_name;
56 };
57
debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * userp)58 int curl_request::debug_cb(CURL *handle,
59 curl_infotype type,
60 char *data,
61 size_t size,
62 void *userp) {
63 curl_request *cr = (curl_request *) userp;
64 bool write_to_log;
65
66 switch (type) {
67 case CURLINFO_TEXT:
68 write_to_log = true;
69 break;
70 case CURLINFO_HEADER_IN:
71 case CURLINFO_HEADER_OUT:
72 if (lnav_log_level == lnav_log_level_t::TRACE) {
73 write_to_log = true;
74 }
75 else {
76 write_to_log = false;
77 }
78 break;
79 default:
80 write_to_log = false;
81 break;
82 }
83
84 if (write_to_log) {
85 while (size > 0 && isspace(data[size - 1])) {
86 size -= 1;
87 }
88 log_debug("%s:%.*s", cr->get_name().c_str(), size, data);
89 }
90
91 return 0;
92 }
93
loop_body()94 void curl_looper::loop_body()
95 {
96 mstime_t current_time = getmstime();
97
98 this->perform_io();
99
100 this->check_for_finished_requests();
101
102 this->check_for_new_requests();
103
104 this->requeue_requests(current_time + 5);
105 }
106
perform_io()107 void curl_looper::perform_io()
108 {
109 if (this->cl_handle_to_request.empty()) {
110 return;
111 }
112
113 mstime_t current_time = getmstime();
114 auto timeout = this->compute_timeout(current_time);
115 int running_handles;
116
117 curl_multi_wait(this->cl_curl_multi,
118 nullptr,
119 0,
120 timeout.count(),
121 nullptr);
122 curl_multi_perform(this->cl_curl_multi, &running_handles);
123 }
124
requeue_requests(mstime_t up_to_time)125 void curl_looper::requeue_requests(mstime_t up_to_time)
126 {
127 while (!this->cl_poll_queue.empty() &&
128 this->cl_poll_queue.front().first <= up_to_time) {
129 auto cr = this->cl_poll_queue.front().second;
130
131 log_debug("%s:polling request is ready again -- %p",
132 cr->get_name().c_str(), cr.get());
133 this->cl_handle_to_request[cr->get_handle()] = cr;
134 curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
135 this->cl_poll_queue.erase(this->cl_poll_queue.begin());
136 }
137 }
138
check_for_new_requests()139 void curl_looper::check_for_new_requests() {
140 while (!this->cl_new_requests.empty()) {
141 auto cr = this->cl_new_requests.back();
142
143 log_info("%s:new curl request %p",
144 cr->get_name().c_str(),
145 cr.get());
146 this->cl_handle_to_request[cr->get_handle()] = cr;
147 curl_multi_add_handle(this->cl_curl_multi, cr->get_handle());
148 this->cl_new_requests.pop_back();
149 }
150 while (!this->cl_close_requests.empty()) {
151 const std::string &name = this->cl_close_requests.back();
152 auto all_iter = find_if(
153 this->cl_all_requests.begin(),
154 this->cl_all_requests.end(),
155 curl_request_eq(name));
156
157 log_info("attempting to close request -- %s", name.c_str());
158 if (all_iter != this->cl_all_requests.end()) {
159 auto cr = *all_iter;
160
161 log_info("%s:closing request -- %p",
162 cr->get_name().c_str(), cr.get());
163 (*all_iter)->close();
164 auto act_iter = this->cl_handle_to_request.find(cr->get_handle());
165 if (act_iter != this->cl_handle_to_request.end()) {
166 curl_multi_remove_handle(this->cl_curl_multi,
167 cr->get_handle());
168 this->cl_handle_to_request.erase(act_iter);
169 }
170 auto poll_iter = find_if(this->cl_poll_queue.begin(),
171 this->cl_poll_queue.end(),
172 curl_request_eq(name));
173 if (poll_iter != this->cl_poll_queue.end()) {
174 this->cl_poll_queue.erase(poll_iter);
175 }
176 this->cl_all_requests.erase(all_iter);
177 }
178 else {
179 log_error("Unable to find request with the name -- %s",
180 name.c_str());
181 }
182
183 this->cl_close_requests.pop_back();
184 }
185 }
186
check_for_finished_requests()187 void curl_looper::check_for_finished_requests()
188 {
189 CURLMsg *msg;
190 int msgs_left;
191
192 while ((msg = curl_multi_info_read(this->cl_curl_multi, &msgs_left)) !=
193 nullptr) {
194 if (msg->msg != CURLMSG_DONE) {
195 continue;
196 }
197
198 CURL *easy = msg->easy_handle;
199 auto iter = this->cl_handle_to_request.find(easy);
200
201 curl_multi_remove_handle(this->cl_curl_multi, easy);
202 if (iter != this->cl_handle_to_request.end()) {
203 auto cr = iter->second;
204 long delay_ms;
205
206 this->cl_handle_to_request.erase(iter);
207 delay_ms = cr->complete(msg->data.result);
208 if (delay_ms < 0) {
209 log_info("%s:curl_request %p finished, deleting...",
210 cr->get_name().c_str(), cr.get());
211 auto all_iter = find(this->cl_all_requests.begin(),
212 this->cl_all_requests.end(),
213 cr);
214 if (all_iter != this->cl_all_requests.end()) {
215 this->cl_all_requests.erase(all_iter);
216 }
217 }
218 else {
219 log_debug("%s:curl_request %p is polling, requeueing in %d",
220 cr->get_name().c_str(),
221 cr.get(),
222 delay_ms);
223 this->cl_poll_queue.emplace_back(getmstime() + delay_ms, cr);
224 sort(this->cl_poll_queue.begin(), this->cl_poll_queue.end());
225 }
226 }
227 }
228 }
229
compute_timeout(mstime_t current_time) const230 std::chrono::milliseconds curl_looper::compute_timeout(mstime_t current_time) const
231 {
232 std::chrono::milliseconds retval = 1s;
233
234 if (!this->cl_handle_to_request.empty()) {
235 retval = 1ms;
236 } else if (!this->cl_poll_queue.empty()) {
237 retval = std::max(
238 1ms,
239 std::chrono::milliseconds(this->cl_poll_queue.front().first - current_time));
240 }
241
242 ensure(retval.count() > 0);
243
244 return retval;
245 }
246
247 #endif
248