1// 2// detail/impl/dev_poll_reactor.ipp 3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4// 5// Copyright (c) 2003-2017 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6// 7// Distributed under the Boost Software License, Version 1.0. (See accompanying 8// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9// 10 11#ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP 12#define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP 13 14#if defined(_MSC_VER) && (_MSC_VER >= 1200) 15# pragma once 16#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 17 18#include <boost/asio/detail/config.hpp> 19 20#if defined(BOOST_ASIO_HAS_DEV_POLL) 21 22#include <boost/asio/detail/dev_poll_reactor.hpp> 23#include <boost/asio/detail/assert.hpp> 24#include <boost/asio/detail/throw_error.hpp> 25#include <boost/asio/error.hpp> 26 27#include <boost/asio/detail/push_options.hpp> 28 29namespace boost { 30namespace asio { 31namespace detail { 32 33dev_poll_reactor::dev_poll_reactor(boost::asio::execution_context& ctx) 34 : boost::asio::detail::execution_context_service_base<dev_poll_reactor>(ctx), 35 scheduler_(use_service<scheduler>(ctx)), 36 mutex_(), 37 dev_poll_fd_(do_dev_poll_create()), 38 interrupter_(), 39 shutdown_(false) 40{ 41 // Add the interrupter's descriptor to /dev/poll. 42 ::pollfd ev = { 0, 0, 0 }; 43 ev.fd = interrupter_.read_descriptor(); 44 ev.events = POLLIN | POLLERR; 45 ev.revents = 0; 46 ::write(dev_poll_fd_, &ev, sizeof(ev)); 47} 48 49dev_poll_reactor::~dev_poll_reactor() 50{ 51 shutdown(); 52 ::close(dev_poll_fd_); 53} 54 55void dev_poll_reactor::shutdown() 56{ 57 boost::asio::detail::mutex::scoped_lock lock(mutex_); 58 shutdown_ = true; 59 lock.unlock(); 60 61 op_queue<operation> ops; 62 63 for (int i = 0; i < max_ops; ++i) 64 op_queue_[i].get_all_operations(ops); 65 66 timer_queues_.get_all_timers(ops); 67 68 scheduler_.abandon_operations(ops); 69} 70 71void dev_poll_reactor::notify_fork( 72 boost::asio::execution_context::fork_event fork_ev) 73{ 74 if (fork_ev == boost::asio::execution_context::fork_child) 75 { 76 detail::mutex::scoped_lock lock(mutex_); 77 78 if (dev_poll_fd_ != -1) 79 ::close(dev_poll_fd_); 80 dev_poll_fd_ = -1; 81 dev_poll_fd_ = do_dev_poll_create(); 82 83 interrupter_.recreate(); 84 85 // Add the interrupter's descriptor to /dev/poll. 86 ::pollfd ev = { 0, 0, 0 }; 87 ev.fd = interrupter_.read_descriptor(); 88 ev.events = POLLIN | POLLERR; 89 ev.revents = 0; 90 ::write(dev_poll_fd_, &ev, sizeof(ev)); 91 92 // Re-register all descriptors with /dev/poll. The changes will be written 93 // to the /dev/poll descriptor the next time the reactor is run. 94 for (int i = 0; i < max_ops; ++i) 95 { 96 reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin(); 97 reactor_op_queue<socket_type>::iterator end = op_queue_[i].end(); 98 for (; iter != end; ++iter) 99 { 100 ::pollfd& pending_ev = add_pending_event_change(iter->first); 101 pending_ev.events |= POLLERR | POLLHUP; 102 switch (i) 103 { 104 case read_op: pending_ev.events |= POLLIN; break; 105 case write_op: pending_ev.events |= POLLOUT; break; 106 case except_op: pending_ev.events |= POLLPRI; break; 107 default: break; 108 } 109 } 110 } 111 interrupter_.interrupt(); 112 } 113} 114 115void dev_poll_reactor::init_task() 116{ 117 scheduler_.init_task(); 118} 119 120int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&) 121{ 122 return 0; 123} 124 125int dev_poll_reactor::register_internal_descriptor(int op_type, 126 socket_type descriptor, per_descriptor_data&, reactor_op* op) 127{ 128 boost::asio::detail::mutex::scoped_lock lock(mutex_); 129 130 op_queue_[op_type].enqueue_operation(descriptor, op); 131 ::pollfd& ev = add_pending_event_change(descriptor); 132 ev.events = POLLERR | POLLHUP; 133 switch (op_type) 134 { 135 case read_op: ev.events |= POLLIN; break; 136 case write_op: ev.events |= POLLOUT; break; 137 case except_op: ev.events |= POLLPRI; break; 138 default: break; 139 } 140 interrupter_.interrupt(); 141 142 return 0; 143} 144 145void dev_poll_reactor::move_descriptor(socket_type, 146 dev_poll_reactor::per_descriptor_data&, 147 dev_poll_reactor::per_descriptor_data&) 148{ 149} 150 151void dev_poll_reactor::start_op(int op_type, socket_type descriptor, 152 dev_poll_reactor::per_descriptor_data&, reactor_op* op, 153 bool is_continuation, bool allow_speculative) 154{ 155 boost::asio::detail::mutex::scoped_lock lock(mutex_); 156 157 if (shutdown_) 158 { 159 post_immediate_completion(op, is_continuation); 160 return; 161 } 162 163 if (allow_speculative) 164 { 165 if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor)) 166 { 167 if (!op_queue_[op_type].has_operation(descriptor)) 168 { 169 if (op->perform()) 170 { 171 lock.unlock(); 172 scheduler_.post_immediate_completion(op, is_continuation); 173 return; 174 } 175 } 176 } 177 } 178 179 bool first = op_queue_[op_type].enqueue_operation(descriptor, op); 180 scheduler_.work_started(); 181 if (first) 182 { 183 ::pollfd& ev = add_pending_event_change(descriptor); 184 ev.events = POLLERR | POLLHUP; 185 if (op_type == read_op 186 || op_queue_[read_op].has_operation(descriptor)) 187 ev.events |= POLLIN; 188 if (op_type == write_op 189 || op_queue_[write_op].has_operation(descriptor)) 190 ev.events |= POLLOUT; 191 if (op_type == except_op 192 || op_queue_[except_op].has_operation(descriptor)) 193 ev.events |= POLLPRI; 194 interrupter_.interrupt(); 195 } 196} 197 198void dev_poll_reactor::cancel_ops(socket_type descriptor, 199 dev_poll_reactor::per_descriptor_data&) 200{ 201 boost::asio::detail::mutex::scoped_lock lock(mutex_); 202 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); 203} 204 205void dev_poll_reactor::deregister_descriptor(socket_type descriptor, 206 dev_poll_reactor::per_descriptor_data&, bool) 207{ 208 boost::asio::detail::mutex::scoped_lock lock(mutex_); 209 210 // Remove the descriptor from /dev/poll. 211 ::pollfd& ev = add_pending_event_change(descriptor); 212 ev.events = POLLREMOVE; 213 interrupter_.interrupt(); 214 215 // Cancel any outstanding operations associated with the descriptor. 216 cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted); 217} 218 219void dev_poll_reactor::deregister_internal_descriptor( 220 socket_type descriptor, dev_poll_reactor::per_descriptor_data&) 221{ 222 boost::asio::detail::mutex::scoped_lock lock(mutex_); 223 224 // Remove the descriptor from /dev/poll. Since this function is only called 225 // during a fork, we can apply the change immediately. 226 ::pollfd ev = { 0, 0, 0 }; 227 ev.fd = descriptor; 228 ev.events = POLLREMOVE; 229 ev.revents = 0; 230 ::write(dev_poll_fd_, &ev, sizeof(ev)); 231 232 // Destroy all operations associated with the descriptor. 233 op_queue<operation> ops; 234 boost::system::error_code ec; 235 for (int i = 0; i < max_ops; ++i) 236 op_queue_[i].cancel_operations(descriptor, ops, ec); 237} 238 239void dev_poll_reactor::cleanup_descriptor_data( 240 dev_poll_reactor::per_descriptor_data&) 241{ 242} 243 244void dev_poll_reactor::run(long usec, op_queue<operation>& ops) 245{ 246 boost::asio::detail::mutex::scoped_lock lock(mutex_); 247 248 // We can return immediately if there's no work to do and the reactor is 249 // not supposed to block. 250 if (usec == 0 && op_queue_[read_op].empty() && op_queue_[write_op].empty() 251 && op_queue_[except_op].empty() && timer_queues_.all_empty()) 252 return; 253 254 // Write the pending event registration changes to the /dev/poll descriptor. 255 std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size(); 256 if (events_size > 0) 257 { 258 errno = 0; 259 int result = ::write(dev_poll_fd_, 260 &pending_event_changes_[0], events_size); 261 if (result != static_cast<int>(events_size)) 262 { 263 boost::system::error_code ec = boost::system::error_code( 264 errno, boost::asio::error::get_system_category()); 265 for (std::size_t i = 0; i < pending_event_changes_.size(); ++i) 266 { 267 int descriptor = pending_event_changes_[i].fd; 268 for (int j = 0; j < max_ops; ++j) 269 op_queue_[j].cancel_operations(descriptor, ops, ec); 270 } 271 } 272 pending_event_changes_.clear(); 273 pending_event_change_index_.clear(); 274 } 275 276 // Calculate timeout. 277 int timeout; 278 if (usec == 0) 279 timeout = 0; 280 else 281 { 282 timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1); 283 timeout = get_timeout(timeout); 284 } 285 lock.unlock(); 286 287 // Block on the /dev/poll descriptor. 288 ::pollfd events[128] = { { 0, 0, 0 } }; 289 ::dvpoll dp = { 0, 0, 0 }; 290 dp.dp_fds = events; 291 dp.dp_nfds = 128; 292 dp.dp_timeout = timeout; 293 int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp); 294 295 lock.lock(); 296 297 // Dispatch the waiting events. 298 for (int i = 0; i < num_events; ++i) 299 { 300 int descriptor = events[i].fd; 301 if (descriptor == interrupter_.read_descriptor()) 302 { 303 interrupter_.reset(); 304 } 305 else 306 { 307 bool more_reads = false; 308 bool more_writes = false; 309 bool more_except = false; 310 311 // Exception operations must be processed first to ensure that any 312 // out-of-band data is read before normal data. 313 if (events[i].events & (POLLPRI | POLLERR | POLLHUP)) 314 more_except = 315 op_queue_[except_op].perform_operations(descriptor, ops); 316 else 317 more_except = op_queue_[except_op].has_operation(descriptor); 318 319 if (events[i].events & (POLLIN | POLLERR | POLLHUP)) 320 more_reads = op_queue_[read_op].perform_operations(descriptor, ops); 321 else 322 more_reads = op_queue_[read_op].has_operation(descriptor); 323 324 if (events[i].events & (POLLOUT | POLLERR | POLLHUP)) 325 more_writes = op_queue_[write_op].perform_operations(descriptor, ops); 326 else 327 more_writes = op_queue_[write_op].has_operation(descriptor); 328 329 if ((events[i].events & (POLLERR | POLLHUP)) != 0 330 && !more_except && !more_reads && !more_writes) 331 { 332 // If we have an event and no operations associated with the 333 // descriptor then we need to delete the descriptor from /dev/poll. 334 // The poll operation can produce POLLHUP or POLLERR events when there 335 // is no operation pending, so if we do not remove the descriptor we 336 // can end up in a tight polling loop. 337 ::pollfd ev = { 0, 0, 0 }; 338 ev.fd = descriptor; 339 ev.events = POLLREMOVE; 340 ev.revents = 0; 341 ::write(dev_poll_fd_, &ev, sizeof(ev)); 342 } 343 else 344 { 345 ::pollfd ev = { 0, 0, 0 }; 346 ev.fd = descriptor; 347 ev.events = POLLERR | POLLHUP; 348 if (more_reads) 349 ev.events |= POLLIN; 350 if (more_writes) 351 ev.events |= POLLOUT; 352 if (more_except) 353 ev.events |= POLLPRI; 354 ev.revents = 0; 355 int result = ::write(dev_poll_fd_, &ev, sizeof(ev)); 356 if (result != sizeof(ev)) 357 { 358 boost::system::error_code ec(errno, 359 boost::asio::error::get_system_category()); 360 for (int j = 0; j < max_ops; ++j) 361 op_queue_[j].cancel_operations(descriptor, ops, ec); 362 } 363 } 364 } 365 } 366 timer_queues_.get_ready_timers(ops); 367} 368 369void dev_poll_reactor::interrupt() 370{ 371 interrupter_.interrupt(); 372} 373 374int dev_poll_reactor::do_dev_poll_create() 375{ 376 int fd = ::open("/dev/poll", O_RDWR); 377 if (fd == -1) 378 { 379 boost::system::error_code ec(errno, 380 boost::asio::error::get_system_category()); 381 boost::asio::detail::throw_error(ec, "/dev/poll"); 382 } 383 return fd; 384} 385 386void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue) 387{ 388 mutex::scoped_lock lock(mutex_); 389 timer_queues_.insert(&queue); 390} 391 392void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue) 393{ 394 mutex::scoped_lock lock(mutex_); 395 timer_queues_.erase(&queue); 396} 397 398int dev_poll_reactor::get_timeout(int msec) 399{ 400 // By default we will wait no longer than 5 minutes. This will ensure that 401 // any changes to the system clock are detected after no longer than this. 402 const int max_msec = 5 * 60 * 1000; 403 return timer_queues_.wait_duration_msec( 404 (msec < 0 || max_msec < msec) ? max_msec : msec); 405} 406 407void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor, 408 const boost::system::error_code& ec) 409{ 410 bool need_interrupt = false; 411 op_queue<operation> ops; 412 for (int i = 0; i < max_ops; ++i) 413 need_interrupt = op_queue_[i].cancel_operations( 414 descriptor, ops, ec) || need_interrupt; 415 scheduler_.post_deferred_completions(ops); 416 if (need_interrupt) 417 interrupter_.interrupt(); 418} 419 420::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor) 421{ 422 hash_map<int, std::size_t>::iterator iter 423 = pending_event_change_index_.find(descriptor); 424 if (iter == pending_event_change_index_.end()) 425 { 426 std::size_t index = pending_event_changes_.size(); 427 pending_event_changes_.reserve(pending_event_changes_.size() + 1); 428 pending_event_change_index_.insert(std::make_pair(descriptor, index)); 429 pending_event_changes_.push_back(::pollfd()); 430 pending_event_changes_[index].fd = descriptor; 431 pending_event_changes_[index].revents = 0; 432 return pending_event_changes_[index]; 433 } 434 else 435 { 436 return pending_event_changes_[iter->second]; 437 } 438} 439 440} // namespace detail 441} // namespace asio 442} // namespace boost 443 444#include <boost/asio/detail/pop_options.hpp> 445 446#endif // defined(BOOST_ASIO_HAS_DEV_POLL) 447 448#endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP 449