1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL. If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so. If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact: Jari Sundell <jaris@ifi.uio.no>
33 //
34 // Skomakerveien 33
35 // 3185 Skoppum, NORWAY
36
37 #include "config.h"
38
39 #include <algorithm>
40
41 #include <stdexcept>
42 #include <unistd.h>
43 #include <sys/time.h>
44
45 #include "net/socket_set.h"
46 #include "rak/allocators.h"
47
48 #include "event.h"
49 #include "exceptions.h"
50 #include "poll_select.h"
51 #include "torrent.h"
52 #include "rak/timer.h"
53 #include "rak/error_number.h"
54 #include "utils/log.h"
55 #include "utils/thread_base.h"
56
57 #define LT_LOG_EVENT(event, log_level, log_fmt, ...) \
58 lt_log_print(LOG_SOCKET_##log_level, "select->%s(%i): " log_fmt, event->type_name(), event->file_descriptor(), __VA_ARGS__);
59
60 namespace torrent {
61
62 Poll::slot_poll Poll::m_slot_create_poll;
63
64 template <typename _Operation>
65 struct poll_check_t {
66 poll_check_t(Poll* p, fd_set* s, _Operation op) : m_poll(p), m_set(s), m_op(op) {}
event_mask(Event * e)67
68 bool operator () (Event* s) {
69 // This check is nessesary as other events may remove a socket
70 // from the set.
71 if (s == NULL)
72 return false;
73
74 // This check is not nessesary, just for debugging.
75 if (s->file_descriptor() < 0)
76 throw internal_error("poll_check: s->fd < 0");
77
78 if (FD_ISSET(s->file_descriptor(), m_set)) {
79 m_op(s);
80
81 // We waive the global lock after an event has been processed in
82 // order to ensure that 's' doesn't get removed before the op is
83 // called.
84 if ((m_poll->flags() & Poll::flag_waive_global_lock) && thread_base::global_queue_size() != 0)
85 thread_base::waive_global_lock();
86
87 return true;
88 } else {
89 return false;
90 }
91 }
92
93 Poll* m_poll;
94 fd_set* m_set;
95 _Operation m_op;
96 };
97
98 template <typename _Operation>
99 inline poll_check_t<_Operation>
100 poll_check(Poll* p, fd_set* s, _Operation op) {
101 return poll_check_t<_Operation>(p, s, op);
102 }
103
104 struct poll_mark {
105 poll_mark(fd_set* s, unsigned int* m) : m_max(m), m_set(s) {}
106
107 void operator () (Event* s) {
108 // Neither of these checks are nessesary, just for debugging.
109 if (s == NULL)
110 throw internal_error("poll_mark: s == NULL");
111
112 if (s->file_descriptor() < 0)
113 throw internal_error("poll_mark: s->fd < 0");
114
115 *m_max = std::max(*m_max, (unsigned int)s->file_descriptor());
116
117 FD_SET(s->file_descriptor(), m_set);
118 }
create(int maxOpenSockets)119
120 unsigned int* m_max;
121 fd_set* m_set;
122 };
123
124 PollSelect*
125 PollSelect::create(int maxOpenSockets) {
126 if (maxOpenSockets <= 0)
127 throw internal_error("PollSelect::set_open_max(...) received an invalid value");
PollEPoll(int fd,int maxEvents,int maxOpenSockets)128
129 // Just a temp hack, make some special template function for this...
130 //
131 // Also consider how portable this is for specialized C++
132 // allocators.
133 struct block_type {
134 PollSelect t1;
135 SocketSet t2;
136 SocketSet t3;
137 SocketSet t4;
138 };
139
140 rak::cacheline_allocator<Block*> cl_alloc;
141 block_type* block = new (cl_alloc) block_type;
142
143 PollSelect* p = new (&block->t1) PollSelect;
144
poll(int msec)145 p->m_readSet = new (&block->t2) SocketSet;
146 p->m_writeSet = new (&block->t3) SocketSet;
147 p->m_exceptSet = new (&block->t4) SocketSet;
148
149 p->m_readSet->reserve(maxOpenSockets);
150 p->m_writeSet->reserve(maxOpenSockets);
151 p->m_exceptSet->reserve(maxOpenSockets);
152
153 return p;
154 }
155
156 PollSelect::~PollSelect() {
157 m_readSet->prepare();
158 m_writeSet->prepare();
159 m_exceptSet->prepare();
perform()160
161 // Re-add this check when you've cleaned up the client shutdown procedure.
162 if (!m_readSet->empty() || !m_writeSet->empty() || !m_exceptSet->empty()) {
163 throw internal_error("PollSelect::~PollSelect() called but the sets are not empty");
164
165 // for (SocketSet::const_iterator itr = m_readSet->begin(); itr != m_readSet->end(); itr++)
166 // std::cout << "R" << (*itr)->file_descriptor() << std::endl;
167
168 // for (SocketSet::const_iterator itr = m_writeSet->begin(); itr != m_writeSet->end(); itr++)
169 // std::cout << "W" << (*itr)->file_descriptor() << std::endl;
170
171 // for (SocketSet::const_iterator itr = m_exceptSet->begin(); itr != m_exceptSet->end(); itr++)
172 // std::cout << "E" << (*itr)->file_descriptor() << std::endl;
173 }
174
175 // delete m_readSet;
176 // delete m_writeSet;
177 // delete m_exceptSet;
178
179 m_readSet = m_writeSet = m_exceptSet = NULL;
180 }
181
182 uint32_t
183 PollSelect::open_max() const {
184 return m_readSet->max_size();
185 }
186
187 unsigned int
188 PollSelect::fdset(fd_set* readSet, fd_set* writeSet, fd_set* exceptSet) {
189 unsigned int maxFd = 0;
190
191 m_readSet->prepare();
192 std::for_each(m_readSet->begin(), m_readSet->end(), poll_mark(readSet, &maxFd));
193
194 m_writeSet->prepare();
195 std::for_each(m_writeSet->begin(), m_writeSet->end(), poll_mark(writeSet, &maxFd));
196
197 m_exceptSet->prepare();
198 std::for_each(m_exceptSet->begin(), m_exceptSet->end(), poll_mark(exceptSet, &maxFd));
do_poll(int64_t timeout_usec,int flags)199
200 return maxFd;
201 }
202
203 unsigned int
204 PollSelect::perform(fd_set* readSet, fd_set* writeSet, fd_set* exceptSet) {
205 unsigned int count = 0;
206
207 // Make sure we don't do read/write on fd's that are in except. This should
208 // not be a problem as any except call should remove it from the m_*Set's.
209 m_exceptSet->prepare();
210 count += std::count_if(m_exceptSet->begin(), m_exceptSet->end(),
211 poll_check(this, exceptSet, std::mem_fun(&Event::event_error)));
212
213 m_readSet->prepare();
214 count += std::count_if(m_readSet->begin(), m_readSet->end(),
215 poll_check(this, readSet, std::mem_fun(&Event::event_read)));
216
217 m_writeSet->prepare();
218 count += std::count_if(m_writeSet->begin(), m_writeSet->end(),
219 poll_check(this, writeSet, std::mem_fun(&Event::event_write)));
220
221 return count;
222 }
223
224 unsigned int
225 PollSelect::do_poll(int64_t timeout_usec, int flags) {
226 rak::timer timeout = rak::timer(timeout_usec);
227
open_max() const228 timeout += 10;
229
230 uint32_t set_size = open_max();
231
232 char read_set_buffer[set_size];
open(Event * event)233 char write_set_buffer[set_size];
234 char error_set_buffer[set_size];
235 fd_set* read_set = (fd_set*)read_set_buffer;
236 fd_set* write_set = (fd_set*)write_set_buffer;
237 fd_set* error_set = (fd_set*)error_set_buffer;
238 std::memset(read_set_buffer, 0, set_size);
239 std::memset(write_set_buffer, 0, set_size);
240 std::memset(error_set_buffer, 0, set_size);
close(Event * event)241
242 unsigned int maxFd = fdset(read_set, write_set, error_set);
243 timeval t = timeout.tval();
244
245 if (!(flags & poll_worker_thread)) {
246 thread_base::entering_main_polling();
247 thread_base::release_global_lock();
248 }
249
250 int status = select(maxFd + 1, read_set, write_set, error_set, &t);
251
252 if (!(flags & poll_worker_thread)) {
253 thread_base::leaving_main_polling();
254 thread_base::acquire_global_lock();
255 }
256
closed(Event * event)257 if (status == -1) {
258 if (rak::error_number::current().value() != rak::error_number::e_intr) {
259 throw std::runtime_error("PollSelect::work(): " + std::string(rak::error_number::current().c_str()));
260 }
261
262 return 0;
263 }
264
265 return perform(read_set, write_set, error_set);
266 }
267
268 #ifdef LT_LOG_POLL_OPEN
269 inline static void
270 log_poll_open(Event* event) {
271 static int log_fd = -1;
272 char buffer[256];
273
in_read(Event * event)274 if (log_fd == -1) {
275 snprintf(buffer, 256, LT_LOG_POLL_OPEN, getpid());
276
277 if ((log_fd = open(buffer, O_WRONLY | O_CREAT | O_TRUNC)) == -1)
278 throw internal_error("Could not open poll open log file.");
279 }
280
281 unsigned int buf_lenght = snprintf(buffer, 256, "open %i\n",
282 event->fd());
283
in_error(Event * event)284 }
285 #endif
286
287 void
288 PollSelect::open(Event* event) {
289 LT_LOG_EVENT(event, DEBUG, "Open event.", 0);
290
291 if ((uint32_t)event->file_descriptor() >= m_readSet->max_size())
292 throw internal_error("Tried to add a socket to PollSelect that is larger than PollSelect::get_open_max()");
293
294 if (in_read(event) || in_write(event) || in_error(event))
295 throw internal_error("PollSelect::open(...) called on an inserted event");
296 }
297
insert_write(Event * event)298 void
299 PollSelect::close(Event* event) {
300 LT_LOG_EVENT(event, DEBUG, "Close event.", 0);
301
302 if ((uint32_t)event->file_descriptor() >= m_readSet->max_size())
303 throw internal_error("PollSelect::close(...) called with an invalid file descriptor");
304
305 if (in_read(event) || in_write(event) || in_error(event))
306 throw internal_error("PollSelect::close(...) called on an inserted event");
307 }
308
309 void
310 PollSelect::closed(Event* event) {
311 LT_LOG_EVENT(event, DEBUG, "Closed event.", 0);
312
313 // event->get_fd() was closed, remove it from the sets.
314 m_readSet->erase(event);
315 m_writeSet->erase(event);
remove_read(Event * event)316 m_exceptSet->erase(event);
317 }
318
319 bool
320 PollSelect::in_read(Event* event) {
321 return m_readSet->find(event) != m_readSet->end();
322 }
323
remove_write(Event * event)324 bool
325 PollSelect::in_write(Event* event) {
326 return m_writeSet->find(event) != m_writeSet->end();
327 }
328
329 bool
330 PollSelect::in_error(Event* event) {
331 return m_exceptSet->find(event) != m_exceptSet->end();
remove_error(Event * event)332 }
333
334 void
335 PollSelect::insert_read(Event* event) {
336 LT_LOG_EVENT(event, DEBUG, "Insert read.", 0);
337 m_readSet->insert(event);
338 }
339
340 void
341 PollSelect::insert_write(Event* event) {
342 LT_LOG_EVENT(event, DEBUG, "Insert write.", 0);
343 m_writeSet->insert(event);
344 }
345
346 void
347 PollSelect::insert_error(Event* event) {
348 LT_LOG_EVENT(event, DEBUG, "Insert error.", 0);
349 m_exceptSet->insert(event);
350 }
351
352 void
353 PollSelect::remove_read(Event* event) {
354 LT_LOG_EVENT(event, DEBUG, "Remove read.", 0);
355 m_readSet->erase(event);
356 }
357
358 void
359 PollSelect::remove_write(Event* event) {
360 LT_LOG_EVENT(event, DEBUG, "Remove write.", 0);
361 m_writeSet->erase(event);
362 }
363
364 void
365 PollSelect::remove_error(Event* event) {
366 LT_LOG_EVENT(event, DEBUG, "Remove error.", 0);
367 m_exceptSet->erase(event);
368 }
369
370 }
371