1// 2// detail/impl/kqueue_reactor.ipp 3// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4// 5// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com) 6// Copyright (c) 2005 Stefan Arentz (stefan at soze dot com) 7// 8// Distributed under the Boost Software License, Version 1.0. (See accompanying 9// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 10// 11 12#ifndef ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP 13#define ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP 14 15#if defined(_MSC_VER) && (_MSC_VER >= 1200) 16# pragma once 17#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) 18 19#include "asio/detail/config.hpp" 20 21#if defined(ASIO_HAS_KQUEUE) 22 23#include "asio/detail/kqueue_reactor.hpp" 24#include "asio/detail/throw_error.hpp" 25#include "asio/error.hpp" 26 27#include "asio/detail/push_options.hpp" 28 29#if defined(__NetBSD__) 30# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \ 31 EV_SET(ev, ident, filt, flags, fflags, data, \ 32 reinterpret_cast<intptr_t>(static_cast<void*>(udata))) 33#else 34# define ASIO_KQUEUE_EV_SET(ev, ident, filt, flags, fflags, data, udata) \ 35 EV_SET(ev, ident, filt, flags, fflags, data, udata) 36#endif 37 38namespace clmdep_asio { 39namespace detail { 40 41kqueue_reactor::kqueue_reactor(clmdep_asio::io_service& io_service) 42 : clmdep_asio::detail::service_base<kqueue_reactor>(io_service), 43 io_service_(use_service<io_service_impl>(io_service)), 44 mutex_(), 45 kqueue_fd_(do_kqueue_create()), 46 interrupter_(), 47 shutdown_(false) 48{ 49 struct kevent events[1]; 50 ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(), 51 EVFILT_READ, EV_ADD, 0, 0, &interrupter_); 52 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) 53 { 54 clmdep_asio::error_code error(errno, 55 clmdep_asio::error::get_system_category()); 56 clmdep_asio::detail::throw_error(error); 57 } 58} 59 60kqueue_reactor::~kqueue_reactor() 61{ 62 close(kqueue_fd_); 63} 64 65void kqueue_reactor::shutdown_service() 66{ 67 mutex::scoped_lock lock(mutex_); 68 shutdown_ = true; 69 lock.unlock(); 70 71 op_queue<operation> ops; 72 73 while (descriptor_state* state = registered_descriptors_.first()) 74 { 75 for (int i = 0; i < max_ops; ++i) 76 ops.push(state->op_queue_[i]); 77 state->shutdown_ = true; 78 registered_descriptors_.free(state); 79 } 80 81 timer_queues_.get_all_timers(ops); 82 83 io_service_.abandon_operations(ops); 84} 85 86void kqueue_reactor::fork_service(clmdep_asio::io_service::fork_event fork_ev) 87{ 88 if (fork_ev == clmdep_asio::io_service::fork_child) 89 { 90 // The kqueue descriptor is automatically closed in the child. 91 kqueue_fd_ = -1; 92 kqueue_fd_ = do_kqueue_create(); 93 94 interrupter_.recreate(); 95 96 struct kevent events[2]; 97 ASIO_KQUEUE_EV_SET(&events[0], interrupter_.read_descriptor(), 98 EVFILT_READ, EV_ADD, 0, 0, &interrupter_); 99 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) 100 { 101 clmdep_asio::error_code ec(errno, 102 clmdep_asio::error::get_system_category()); 103 clmdep_asio::detail::throw_error(ec, "kqueue interrupter registration"); 104 } 105 106 // Re-register all descriptors with kqueue. 107 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); 108 for (descriptor_state* state = registered_descriptors_.first(); 109 state != 0; state = state->next_) 110 { 111 if (state->num_kevents_ > 0) 112 { 113 ASIO_KQUEUE_EV_SET(&events[0], state->descriptor_, 114 EVFILT_READ, EV_ADD | EV_CLEAR, 0, 0, state); 115 ASIO_KQUEUE_EV_SET(&events[1], state->descriptor_, 116 EVFILT_WRITE, EV_ADD | EV_CLEAR, 0, 0, state); 117 if (::kevent(kqueue_fd_, events, state->num_kevents_, 0, 0, 0) == -1) 118 { 119 clmdep_asio::error_code ec(errno, 120 clmdep_asio::error::get_system_category()); 121 clmdep_asio::detail::throw_error(ec, "kqueue re-registration"); 122 } 123 } 124 } 125 } 126} 127 128void kqueue_reactor::init_task() 129{ 130 io_service_.init_task(); 131} 132 133int kqueue_reactor::register_descriptor(socket_type descriptor, 134 kqueue_reactor::per_descriptor_data& descriptor_data) 135{ 136 descriptor_data = allocate_descriptor_state(); 137 138 mutex::scoped_lock lock(descriptor_data->mutex_); 139 140 descriptor_data->descriptor_ = descriptor; 141 descriptor_data->num_kevents_ = 0; 142 descriptor_data->shutdown_ = false; 143 144 return 0; 145} 146 147int kqueue_reactor::register_internal_descriptor( 148 int op_type, socket_type descriptor, 149 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op) 150{ 151 descriptor_data = allocate_descriptor_state(); 152 153 mutex::scoped_lock lock(descriptor_data->mutex_); 154 155 descriptor_data->descriptor_ = descriptor; 156 descriptor_data->num_kevents_ = 1; 157 descriptor_data->shutdown_ = false; 158 descriptor_data->op_queue_[op_type].push(op); 159 160 struct kevent events[1]; 161 ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, 162 EV_ADD | EV_CLEAR, 0, 0, descriptor_data); 163 if (::kevent(kqueue_fd_, events, 1, 0, 0, 0) == -1) 164 return errno; 165 166 return 0; 167} 168 169void kqueue_reactor::move_descriptor(socket_type, 170 kqueue_reactor::per_descriptor_data& target_descriptor_data, 171 kqueue_reactor::per_descriptor_data& source_descriptor_data) 172{ 173 target_descriptor_data = source_descriptor_data; 174 source_descriptor_data = 0; 175} 176 177void kqueue_reactor::start_op(int op_type, socket_type descriptor, 178 kqueue_reactor::per_descriptor_data& descriptor_data, reactor_op* op, 179 bool is_continuation, bool allow_speculative) 180{ 181 if (!descriptor_data) 182 { 183 op->ec_ = clmdep_asio::error::bad_descriptor; 184 post_immediate_completion(op, is_continuation); 185 return; 186 } 187 188 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); 189 190 if (descriptor_data->shutdown_) 191 { 192 post_immediate_completion(op, is_continuation); 193 return; 194 } 195 196 if (descriptor_data->op_queue_[op_type].empty()) 197 { 198 static const int num_kevents[max_ops] = { 1, 2, 1 }; 199 200 if (allow_speculative 201 && (op_type != read_op 202 || descriptor_data->op_queue_[except_op].empty())) 203 { 204 if (op->perform()) 205 { 206 descriptor_lock.unlock(); 207 io_service_.post_immediate_completion(op, is_continuation); 208 return; 209 } 210 211 if (descriptor_data->num_kevents_ < num_kevents[op_type]) 212 { 213 struct kevent events[2]; 214 ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, 215 EV_ADD | EV_CLEAR, 0, 0, descriptor_data); 216 ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, 217 EV_ADD | EV_CLEAR, 0, 0, descriptor_data); 218 if (::kevent(kqueue_fd_, events, num_kevents[op_type], 0, 0, 0) != -1) 219 { 220 descriptor_data->num_kevents_ = num_kevents[op_type]; 221 } 222 else 223 { 224 op->ec_ = clmdep_asio::error_code(errno, 225 clmdep_asio::error::get_system_category()); 226 io_service_.post_immediate_completion(op, is_continuation); 227 return; 228 } 229 } 230 } 231 else 232 { 233 if (descriptor_data->num_kevents_ < num_kevents[op_type]) 234 descriptor_data->num_kevents_ = num_kevents[op_type]; 235 236 struct kevent events[2]; 237 ASIO_KQUEUE_EV_SET(&events[0], descriptor, EVFILT_READ, 238 EV_ADD | EV_CLEAR, 0, 0, descriptor_data); 239 ASIO_KQUEUE_EV_SET(&events[1], descriptor, EVFILT_WRITE, 240 EV_ADD | EV_CLEAR, 0, 0, descriptor_data); 241 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); 242 } 243 } 244 245 descriptor_data->op_queue_[op_type].push(op); 246 io_service_.work_started(); 247} 248 249void kqueue_reactor::cancel_ops(socket_type, 250 kqueue_reactor::per_descriptor_data& descriptor_data) 251{ 252 if (!descriptor_data) 253 return; 254 255 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); 256 257 op_queue<operation> ops; 258 for (int i = 0; i < max_ops; ++i) 259 { 260 while (reactor_op* op = descriptor_data->op_queue_[i].front()) 261 { 262 op->ec_ = clmdep_asio::error::operation_aborted; 263 descriptor_data->op_queue_[i].pop(); 264 ops.push(op); 265 } 266 } 267 268 descriptor_lock.unlock(); 269 270 io_service_.post_deferred_completions(ops); 271} 272 273void kqueue_reactor::deregister_descriptor(socket_type descriptor, 274 kqueue_reactor::per_descriptor_data& descriptor_data, bool closing) 275{ 276 if (!descriptor_data) 277 return; 278 279 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); 280 281 if (!descriptor_data->shutdown_) 282 { 283 if (closing) 284 { 285 // The descriptor will be automatically removed from the kqueue when it 286 // is closed. 287 } 288 else 289 { 290 struct kevent events[2]; 291 ASIO_KQUEUE_EV_SET(&events[0], descriptor, 292 EVFILT_READ, EV_DELETE, 0, 0, 0); 293 ASIO_KQUEUE_EV_SET(&events[1], descriptor, 294 EVFILT_WRITE, EV_DELETE, 0, 0, 0); 295 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); 296 } 297 298 op_queue<operation> ops; 299 for (int i = 0; i < max_ops; ++i) 300 { 301 while (reactor_op* op = descriptor_data->op_queue_[i].front()) 302 { 303 op->ec_ = clmdep_asio::error::operation_aborted; 304 descriptor_data->op_queue_[i].pop(); 305 ops.push(op); 306 } 307 } 308 309 descriptor_data->descriptor_ = -1; 310 descriptor_data->shutdown_ = true; 311 312 descriptor_lock.unlock(); 313 314 free_descriptor_state(descriptor_data); 315 descriptor_data = 0; 316 317 io_service_.post_deferred_completions(ops); 318 } 319} 320 321void kqueue_reactor::deregister_internal_descriptor(socket_type descriptor, 322 kqueue_reactor::per_descriptor_data& descriptor_data) 323{ 324 if (!descriptor_data) 325 return; 326 327 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); 328 329 if (!descriptor_data->shutdown_) 330 { 331 struct kevent events[2]; 332 ASIO_KQUEUE_EV_SET(&events[0], descriptor, 333 EVFILT_READ, EV_DELETE, 0, 0, 0); 334 ASIO_KQUEUE_EV_SET(&events[1], descriptor, 335 EVFILT_WRITE, EV_DELETE, 0, 0, 0); 336 ::kevent(kqueue_fd_, events, descriptor_data->num_kevents_, 0, 0, 0); 337 338 op_queue<operation> ops; 339 for (int i = 0; i < max_ops; ++i) 340 ops.push(descriptor_data->op_queue_[i]); 341 342 descriptor_data->descriptor_ = -1; 343 descriptor_data->shutdown_ = true; 344 345 descriptor_lock.unlock(); 346 347 free_descriptor_state(descriptor_data); 348 descriptor_data = 0; 349 } 350} 351 352void kqueue_reactor::run(bool block, op_queue<operation>& ops) 353{ 354 mutex::scoped_lock lock(mutex_); 355 356 // Determine how long to block while waiting for events. 357 timespec timeout_buf = { 0, 0 }; 358 timespec* timeout = block ? get_timeout(timeout_buf) : &timeout_buf; 359 360 lock.unlock(); 361 362 // Block on the kqueue descriptor. 363 struct kevent events[128]; 364 int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); 365 366 // Dispatch the waiting events. 367 for (int i = 0; i < num_events; ++i) 368 { 369 void* ptr = reinterpret_cast<void*>(events[i].udata); 370 if (ptr == &interrupter_) 371 { 372 interrupter_.reset(); 373 } 374 else 375 { 376 descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr); 377 mutex::scoped_lock descriptor_lock(descriptor_data->mutex_); 378 379 if (events[i].filter == EVFILT_WRITE 380 && descriptor_data->num_kevents_ == 2 381 && descriptor_data->op_queue_[write_op].empty()) 382 { 383 // Some descriptor types, like serial ports, don't seem to support 384 // EV_CLEAR with EVFILT_WRITE. Since we have no pending write 385 // operations we'll remove the EVFILT_WRITE registration here so that 386 // we don't end up in a tight spin. 387 struct kevent delete_events[1]; 388 ASIO_KQUEUE_EV_SET(&delete_events[0], 389 descriptor_data->descriptor_, EVFILT_WRITE, EV_DELETE, 0, 0, 0); 390 ::kevent(kqueue_fd_, delete_events, 1, 0, 0, 0); 391 descriptor_data->num_kevents_ = 1; 392 } 393 394 // Exception operations must be processed first to ensure that any 395 // out-of-band data is read before normal data. 396#if defined(__NetBSD__) 397 static const unsigned int filter[max_ops] = 398#else 399 static const int filter[max_ops] = 400#endif 401 { EVFILT_READ, EVFILT_WRITE, EVFILT_READ }; 402 for (int j = max_ops - 1; j >= 0; --j) 403 { 404 if (events[i].filter == filter[j]) 405 { 406 if (j != except_op || events[i].flags & EV_OOBAND) 407 { 408 while (reactor_op* op = descriptor_data->op_queue_[j].front()) 409 { 410 if (events[i].flags & EV_ERROR) 411 { 412 op->ec_ = clmdep_asio::error_code( 413 static_cast<int>(events[i].data), 414 clmdep_asio::error::get_system_category()); 415 descriptor_data->op_queue_[j].pop(); 416 ops.push(op); 417 } 418 if (op->perform()) 419 { 420 descriptor_data->op_queue_[j].pop(); 421 ops.push(op); 422 } 423 else 424 break; 425 } 426 } 427 } 428 } 429 } 430 } 431 432 lock.lock(); 433 timer_queues_.get_ready_timers(ops); 434} 435 436void kqueue_reactor::interrupt() 437{ 438 interrupter_.interrupt(); 439} 440 441int kqueue_reactor::do_kqueue_create() 442{ 443 int fd = ::kqueue(); 444 if (fd == -1) 445 { 446 clmdep_asio::error_code ec(errno, 447 clmdep_asio::error::get_system_category()); 448 clmdep_asio::detail::throw_error(ec, "kqueue"); 449 } 450 return fd; 451} 452 453kqueue_reactor::descriptor_state* kqueue_reactor::allocate_descriptor_state() 454{ 455 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); 456 return registered_descriptors_.alloc(); 457} 458 459void kqueue_reactor::free_descriptor_state(kqueue_reactor::descriptor_state* s) 460{ 461 mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_); 462 registered_descriptors_.free(s); 463} 464 465void kqueue_reactor::do_add_timer_queue(timer_queue_base& queue) 466{ 467 mutex::scoped_lock lock(mutex_); 468 timer_queues_.insert(&queue); 469} 470 471void kqueue_reactor::do_remove_timer_queue(timer_queue_base& queue) 472{ 473 mutex::scoped_lock lock(mutex_); 474 timer_queues_.erase(&queue); 475} 476 477timespec* kqueue_reactor::get_timeout(timespec& ts) 478{ 479 // By default we will wait no longer than 5 minutes. This will ensure that 480 // any changes to the system clock are detected after no longer than this. 481 long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000); 482 ts.tv_sec = usec / 1000000; 483 ts.tv_nsec = (usec % 1000000) * 1000; 484 return &ts; 485} 486 487} // namespace detail 488} // namespace clmdep_asio 489 490#undef ASIO_KQUEUE_EV_SET 491 492#include "asio/detail/pop_options.hpp" 493 494#endif // defined(ASIO_HAS_KQUEUE) 495 496#endif // ASIO_DETAIL_IMPL_KQUEUE_REACTOR_IPP 497