//
// detail/impl/epoll_reactor.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP
#define ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include "asio/detail/config.hpp"

#if defined(ASIO_HAS_EPOLL)

#include <cstddef>
#include <sys/epoll.h>
#include "asio/detail/epoll_reactor.hpp"
#include "asio/detail/throw_error.hpp"
#include "asio/error.hpp"

#if defined(ASIO_HAS_TIMERFD)
# include <sys/timerfd.h>
#endif // defined(ASIO_HAS_TIMERFD)

#include "asio/detail/push_options.hpp"

namespace asio {
namespace detail {

epoll_reactor::epoll_reactor(asio::execution_context& ctx)
  : execution_context_service_base<epoll_reactor>(ctx),
    scheduler_(use_service<scheduler>(ctx)),
    mutex_(ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, scheduler_.concurrency_hint())),
    interrupter_(),
    epoll_fd_(do_epoll_create()),
    timer_fd_(do_timerfd_create()),
    shutdown_(false),
    registered_descriptors_mutex_(mutex_.enabled())
{
  // Add the interrupter's descriptor to epoll.
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
  interrupter_.interrupt();

  // Add the timer descriptor to epoll.
  if (timer_fd_ != -1)
  {
    ev.events = EPOLLIN | EPOLLERR;
    ev.data.ptr = &timer_fd_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
  }
}

epoll_reactor::~epoll_reactor()
{
  if (epoll_fd_ != -1)
    close(epoll_fd_);
  if (timer_fd_ != -1)
    close(timer_fd_);
}

void epoll_reactor::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  op_queue<operation> ops;

  while (descriptor_state* state = registered_descriptors_.first())
  {
    for (int i = 0; i < max_ops; ++i)
      ops.push(state->op_queue_[i]);
    state->shutdown_ = true;
    registered_descriptors_.free(state);
  }

  timer_queues_.get_all_timers(ops);

  scheduler_.abandon_operations(ops);
}

void epoll_reactor::notify_fork(
    asio::execution_context::fork_event fork_ev)
{
  if (fork_ev == asio::execution_context::fork_child)
  {
    if (epoll_fd_ != -1)
      ::close(epoll_fd_);
    epoll_fd_ = -1;
    epoll_fd_ = do_epoll_create();

    if (timer_fd_ != -1)
      ::close(timer_fd_);
    timer_fd_ = -1;
    timer_fd_ = do_timerfd_create();

    interrupter_.recreate();

    // Add the interrupter's descriptor to epoll.
    epoll_event ev = { 0, { 0 } };
    ev.events = EPOLLIN | EPOLLERR | EPOLLET;
    ev.data.ptr = &interrupter_;
    epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupter_.read_descriptor(), &ev);
    interrupter_.interrupt();

    // Add the timer descriptor to epoll.
    if (timer_fd_ != -1)
    {
      ev.events = EPOLLIN | EPOLLERR;
      ev.data.ptr = &timer_fd_;
      epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &ev);
    }

    update_timeout();

    // Re-register all descriptors with epoll.
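    // The child's epoll_fd_ refers to a brand new epoll instance, so none of
    // the registrations made in the parent are visible through it. Each
    // previously registered descriptor must be added again with its original
    // event mask.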
    mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
    for (descriptor_state* state = registered_descriptors_.first();
        state != 0; state = state->next_)
    {
      ev.events = state->registered_events_;
      ev.data.ptr = state;
      int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD,
          state->descriptor_, &ev);
      if (result != 0)
      {
        asio::error_code ec(errno,
            asio::error::get_system_category());
        asio::detail::throw_error(ec, "epoll re-registration");
      }
    }
  }
}

void epoll_reactor::init_task()
{
  scheduler_.init_task();
}

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
  {
    if (errno == EPERM)
    {
      // This file descriptor type is not supported by epoll. However, if it is
      // a regular file then operations on it will not block. We will allow
      // this descriptor to be used and fail later if an operation on it would
      // otherwise require a trip through the reactor.
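      // A zero event mask records that the descriptor is not in the epoll
      // set; start_op() checks for this and fails any operation that cannot
      // complete speculatively.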
      descriptor_data->registered_events_ = 0;
      return 0;
    }
    return errno;
  }

  return 0;
}

int epoll_reactor::register_internal_descriptor(
    int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op)
{
  descriptor_data = allocate_descriptor_state();

  ASIO_HANDLER_REACTOR_REGISTRATION((
        context(), static_cast<uintmax_t>(descriptor),
        reinterpret_cast<uintmax_t>(descriptor_data)));

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
    descriptor_data->op_queue_[op_type].push(op);
    for (int i = 0; i < max_ops; ++i)
      descriptor_data->try_speculative_[i] = true;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}

void epoll_reactor::move_descriptor(socket_type,
    epoll_reactor::per_descriptor_data& target_descriptor_data,
    epoll_reactor::per_descriptor_data& source_descriptor_data)
{
  target_descriptor_data = source_descriptor_data;
  source_descriptor_data = 0;
}

void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = asio::error::operation_not_supported;
        scheduler_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = asio::error_code(errno,
                asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = asio::error::operation_not_supported;
      scheduler_.post_immediate_completion(op, is_continuation);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
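        // Speculative execution is not allowed for this operation, so make
        // sure write readiness is included in the event mask before
        // re-registering with epoll below.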
        descriptor_data->registered_events_ |= EPOLLOUT;
      }

      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}

void epoll_reactor::cancel_ops(socket_type,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  op_queue<operation> ops;
  for (int i = 0; i < max_ops; ++i)
  {
    while (reactor_op* op = descriptor_data->op_queue_[i].front())
    {
      op->ec_ = asio::error::operation_aborted;
      descriptor_data->op_queue_[i].pop();
      ops.push(op);
    }
  }

  descriptor_lock.unlock();

  scheduler_.post_deferred_completions(ops);
}

void epoll_reactor::deregister_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, bool closing)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    if (closing)
    {
      // The descriptor will be automatically removed from the epoll set when
      // it is closed.
    }
    else if (descriptor_data->registered_events_ != 0)
    {
      epoll_event ev = { 0, { 0 } };
      epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);
    }

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
    {
      while (reactor_op* op = descriptor_data->op_queue_[i].front())
      {
        op->ec_ = asio::error::operation_aborted;
        descriptor_data->op_queue_[i].pop();
        ops.push(op);
      }
    }

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;

    scheduler_.post_deferred_completions(ops);
  }
}

void epoll_reactor::deregister_internal_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  if (!descriptor_data)
    return;

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (!descriptor_data->shutdown_)
  {
    epoll_event ev = { 0, { 0 } };
    epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, descriptor, &ev);

    op_queue<operation> ops;
    for (int i = 0; i < max_ops; ++i)
      ops.push(descriptor_data->op_queue_[i]);

    descriptor_data->descriptor_ = -1;
    descriptor_data->shutdown_ = true;

    descriptor_lock.unlock();

    ASIO_HANDLER_REACTOR_DEREGISTRATION((
          context(), static_cast<uintmax_t>(descriptor),
          reinterpret_cast<uintmax_t>(descriptor_data)));

    free_descriptor_state(descriptor_data);
    descriptor_data = 0;
  }
}

void epoll_reactor::run(long usec, op_queue<operation>& ops)
{
  // This code relies on the fact that the scheduler queues the reactor task
  // behind all descriptor operations generated by this function. This means
  // that, by the time we reach this point, any previously returned descriptor
  // operations have already been dequeued. Therefore it is now safe for us to
  // reuse and return them for the scheduler to queue again.

  // Calculate timeout. Check the timer queues only if timerfd is not in use.
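  // A negative usec means block indefinitely, zero means poll without
  // blocking, and any other value is rounded up to a whole number of
  // milliseconds, clamped to the nearest timer expiry when there is no
  // timerfd to wake us.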
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    if (timer_fd_ == -1)
    {
      mutex::scoped_lock lock(mutex_);
      timeout = get_timeout(timeout);
    }
  }

  // Block on the epoll descriptor.
  epoll_event events[128];
  int num_events = epoll_wait(epoll_fd_, events, 128, timeout);

#if defined(ASIO_ENABLE_HANDLER_TRACKING)
  // Trace the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // Ignore.
    }
# if defined(ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      // Ignore.
    }
# endif // defined(ASIO_HAS_TIMERFD)
    else
    {
      unsigned event_mask = 0;
      if ((events[i].events & EPOLLIN) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_READ_EVENT;
      if ((events[i].events & EPOLLOUT) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_WRITE_EVENT;
      if ((events[i].events & (EPOLLERR | EPOLLHUP)) != 0)
        event_mask |= ASIO_HANDLER_REACTOR_ERROR_EVENT;
      ASIO_HANDLER_REACTOR_EVENTS((context(),
            reinterpret_cast<uintmax_t>(ptr), event_mask));
    }
  }
#endif // defined(ASIO_ENABLE_HANDLER_TRACKING)

#if defined(ASIO_HAS_TIMERFD)
  bool check_timers = (timer_fd_ == -1);
#else // defined(ASIO_HAS_TIMERFD)
  bool check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    void* ptr = events[i].data.ptr;
    if (ptr == &interrupter_)
    {
      // No need to reset the interrupter since we're leaving the descriptor
      // in a ready-to-read state and relying on edge-triggered notifications
      // to make it so that we only get woken up when the descriptor's epoll
      // registration is updated.

#if defined(ASIO_HAS_TIMERFD)
      if (timer_fd_ == -1)
        check_timers = true;
#else // defined(ASIO_HAS_TIMERFD)
      check_timers = true;
#endif // defined(ASIO_HAS_TIMERFD)
    }
#if defined(ASIO_HAS_TIMERFD)
    else if (ptr == &timer_fd_)
    {
      check_timers = true;
    }
#endif // defined(ASIO_HAS_TIMERFD)
    else
    {
      // The descriptor operation doesn't count as work in and of itself, so we
      // don't call work_started() here. This still allows the scheduler to
      // stop if the only remaining operations are descriptor operations.
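      // A descriptor_state is itself an operation: stash the ready event
      // mask on it and queue it, so that perform_io() runs on whichever
      // thread eventually executes the operation.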
      descriptor_state* descriptor_data = static_cast<descriptor_state*>(ptr);
      descriptor_data->set_ready_events(events[i].events);
      ops.push(descriptor_data);
    }
  }

  if (check_timers)
  {
    mutex::scoped_lock common_lock(mutex_);
    timer_queues_.get_ready_timers(ops);

#if defined(ASIO_HAS_TIMERFD)
    if (timer_fd_ != -1)
    {
      itimerspec new_timeout;
      itimerspec old_timeout;
      int flags = get_timeout(new_timeout);
      timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    }
#endif // defined(ASIO_HAS_TIMERFD)
  }
}

void epoll_reactor::interrupt()
{
  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLET;
  ev.data.ptr = &interrupter_;
  epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, interrupter_.read_descriptor(), &ev);
}

int epoll_reactor::do_epoll_create()
{
#if defined(EPOLL_CLOEXEC)
  int fd = epoll_create1(EPOLL_CLOEXEC);
#else // defined(EPOLL_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
#endif // defined(EPOLL_CLOEXEC)

  if (fd == -1 && (errno == EINVAL || errno == ENOSYS))
  {
    fd = epoll_create(epoll_size);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  if (fd == -1)
  {
    asio::error_code ec(errno,
        asio::error::get_system_category());
    asio::detail::throw_error(ec, "epoll");
  }

  return fd;
}

int epoll_reactor::do_timerfd_create()
{
#if defined(ASIO_HAS_TIMERFD)
# if defined(TFD_CLOEXEC)
  int fd = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
# else // defined(TFD_CLOEXEC)
  int fd = -1;
  errno = EINVAL;
# endif // defined(TFD_CLOEXEC)

  if (fd == -1 && errno == EINVAL)
  {
    fd = timerfd_create(CLOCK_MONOTONIC, 0);
    if (fd != -1)
      ::fcntl(fd, F_SETFD, FD_CLOEXEC);
  }

  return fd;
#else // defined(ASIO_HAS_TIMERFD)
  return -1;
#endif // defined(ASIO_HAS_TIMERFD)
}

epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc(registered_descriptors_mutex_.enabled());
}

void epoll_reactor::free_descriptor_state(epoll_reactor::descriptor_state* s)
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  registered_descriptors_.free(s);
}

void epoll_reactor::do_add_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.insert(&queue);
}

void epoll_reactor::do_remove_timer_queue(timer_queue_base& queue)
{
  mutex::scoped_lock lock(mutex_);
  timer_queues_.erase(&queue);
}

void epoll_reactor::update_timeout()
{
#if defined(ASIO_HAS_TIMERFD)
  if (timer_fd_ != -1)
  {
    itimerspec new_timeout;
    itimerspec old_timeout;
    int flags = get_timeout(new_timeout);
    timerfd_settime(timer_fd_, flags, &new_timeout, &old_timeout);
    return;
  }
#endif // defined(ASIO_HAS_TIMERFD)
  interrupt();
}

int epoll_reactor::get_timeout(int msec)
{
  // By default we will wait no longer than 5 minutes. This will ensure that
  // any changes to the system clock are detected after no longer than this.
  const int max_msec = 5 * 60 * 1000;
  return timer_queues_.wait_duration_msec(
      (msec < 0 || max_msec < msec) ? max_msec : msec);
}
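
// The timerfd-based overload below encodes a zero wait duration as the
// absolute time of 1 nanosecond on the monotonic clock. That instant is
// already in the past, so the timerfd fires (and wakes epoll) immediately.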

#if defined(ASIO_HAS_TIMERFD)
int epoll_reactor::get_timeout(itimerspec& ts)
{
  ts.it_interval.tv_sec = 0;
  ts.it_interval.tv_nsec = 0;

  long usec = timer_queues_.wait_duration_usec(5 * 60 * 1000 * 1000);
  ts.it_value.tv_sec = usec / 1000000;
  ts.it_value.tv_nsec = usec ? (usec % 1000000) * 1000 : 1;

  return usec ? 0 : TFD_TIMER_ABSTIME;
}
#endif // defined(ASIO_HAS_TIMERFD)

struct epoll_reactor::perform_io_cleanup_on_block_exit
{
  explicit perform_io_cleanup_on_block_exit(epoll_reactor* r)
    : reactor_(r), first_op_(0)
  {
  }

  ~perform_io_cleanup_on_block_exit()
  {
    if (first_op_)
    {
      // Post the remaining completed operations for invocation.
      if (!ops_.empty())
        reactor_->scheduler_.post_deferred_completions(ops_);

      // A user-initiated operation has completed, but there's no need to
      // explicitly call work_finished() here. Instead, we'll take advantage of
      // the fact that the scheduler will call work_finished() once we return.
    }
    else
    {
      // No user-initiated operations have completed, so we need to compensate
      // for the work_finished() call that the scheduler will make once this
      // operation returns.
      reactor_->scheduler_.compensating_work_started();
    }
  }

  epoll_reactor* reactor_;
  op_queue<operation> ops_;
  operation* first_op_;
};

epoll_reactor::descriptor_state::descriptor_state(bool locking)
  : operation(&epoll_reactor::descriptor_state::do_complete),
    mutex_(locking)
{
}

operation* epoll_reactor::descriptor_state::perform_io(uint32_t events)
{
  mutex_.lock();
  perform_io_cleanup_on_block_exit io_cleanup(reactor_);
  mutex::scoped_lock descriptor_lock(mutex_, mutex::scoped_lock::adopt_lock);

  // Exception operations must be processed first to ensure that any
  // out-of-band data is read before normal data.
  static const int flag[max_ops] = { EPOLLIN, EPOLLOUT, EPOLLPRI };
  for (int j = max_ops - 1; j >= 0; --j)
  {
    if (events & (flag[j] | EPOLLERR | EPOLLHUP))
    {
      try_speculative_[j] = true;
      while (reactor_op* op = op_queue_[j].front())
      {
        if (reactor_op::status status = op->perform())
        {
          op_queue_[j].pop();
          io_cleanup.ops_.push(op);
          if (status == reactor_op::done_and_exhausted)
          {
            try_speculative_[j] = false;
            break;
          }
        }
        else
          break;
      }
    }
  }

  // The first operation will be returned for completion now. The others will
  // be posted for later by the io_cleanup object's destructor.
  io_cleanup.first_op_ = io_cleanup.ops_.front();
  io_cleanup.ops_.pop();
  return io_cleanup.first_op_;
}

void epoll_reactor::descriptor_state::do_complete(
    void* owner, operation* base,
    const asio::error_code& ec, std::size_t bytes_transferred)
{
  if (owner)
  {
    descriptor_state* descriptor_data = static_cast<descriptor_state*>(base);
    uint32_t events = static_cast<uint32_t>(bytes_transferred);
    if (operation* op = descriptor_data->perform_io(events))
    {
      op->complete(owner, ec, 0);
    }
  }
}

} // namespace detail
} // namespace asio

#include "asio/detail/pop_options.hpp"

#endif // defined(ASIO_HAS_EPOLL)

#endif // ASIO_DETAIL_IMPL_EPOLL_REACTOR_IPP