1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <Ice/Selector.h>
6 #include <Ice/EventHandler.h>
7 #include <Ice/Instance.h>
8 #include <Ice/LoggerUtil.h>
9 #include <Ice/LocalException.h>
10 #include <IceUtil/Time.h>
11
12 #ifdef ICE_USE_CFSTREAM
13 # include <CoreFoundation/CoreFoundation.h>
14 # include <CoreFoundation/CFStream.h>
15 #endif
16
17 using namespace std;
18 using namespace IceInternal;
19
20 #if defined(ICE_USE_KQUEUE)
21 namespace
22 {
23 struct timespec zeroTimeout = { 0, 0 };
24 }
25 #endif
26
27 #if defined(ICE_OS_UWP)
28 using namespace Windows::Storage::Streams;
29 using namespace Windows::Networking;
30 using namespace Windows::Networking::Sockets;
31 #endif
32
33 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
34
Selector(const InstancePtr & instance)35 Selector::Selector(const InstancePtr& instance) : _instance(instance)
36 {
37 }
38
~Selector()39 Selector::~Selector()
40 {
41 }
42
43 #ifdef ICE_USE_IOCP
44 void
setup(int sizeIO)45 Selector::setup(int sizeIO)
46 {
47 _handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, ICE_NULLPTR, 0, sizeIO);
48 if(_handle == ICE_NULLPTR)
49 {
50 throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
51 }
52 }
53 #endif
54
55 void
destroy()56 Selector::destroy()
57 {
58 #ifdef ICE_USE_IOCP
59 CloseHandle(_handle);
60 #endif
61 }
62
63 void
initialize(EventHandler * handler)64 Selector::initialize(EventHandler* handler)
65 {
66 if(!handler->getNativeInfo())
67 {
68 return;
69 }
70
71 #ifdef ICE_USE_IOCP
72 SOCKET socket = handler->getNativeInfo()->fd();
73 if (socket != INVALID_SOCKET)
74 {
75 if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket),
76 _handle,
77 reinterpret_cast<ULONG_PTR>(handler),
78 0) == ICE_NULLPTR)
79 {
80 throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
81 }
82 }
83 handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler));
84 #else
85 EventHandlerPtr h = ICE_GET_SHARED_FROM_THIS(handler);
86 handler->getNativeInfo()->setCompletedHandler(
87 ref new SocketOperationCompletedHandler(
88 [=](int operation)
89 {
90 //
91 // Use the reference counted handler to ensure it's not
92 // destroyed as long as the callback lambda exists.
93 //
94 completed(h.get(), static_cast<SocketOperation>(operation));
95 }));
96 #endif
97 }
98
99 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)100 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
101 {
102 handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
103 handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
104 if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead))
105 {
106 handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead);
107 completed(handler, SocketOperationRead); // Start an asynchrnous read
108 }
109 else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite))
110 {
111 handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite);
112 completed(handler, SocketOperationWrite); // Start an asynchrnous write
113 }
114 }
115
116 void
finish(IceInternal::EventHandler * handler)117 Selector::finish(IceInternal::EventHandler* handler)
118 {
119 #ifdef ICE_OS_UWP
120 // If async operations are no longer pending, clear the completion handler to break
121 // the cyclic reference count.
122 assert(!handler->_started && !handler->_pending);
123 handler->getNativeInfo()->setCompletedHandler(nullptr);
124 #endif
125 handler->_registered = SocketOperationNone;
126 handler->_finish = false; // Ensures that finished() is only called once on the event handler.
127 }
128
129 void
ready(EventHandler * handler,SocketOperation status,bool value)130 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
131 {
132 if(((handler->_ready & status) != 0) == value)
133 {
134 return; // Nothing to do if ready state already correctly set.
135 }
136
137 if(value)
138 {
139 handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
140 }
141 else
142 {
143 handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
144 }
145 }
146
147 EventHandler*
148 #ifdef ICE_USE_IOCP
getNextHandler(SocketOperation & status,DWORD & count,int & error,int timeout)149 Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int timeout)
150 #else
151 Selector::getNextHandler(SocketOperation& status, int timeout)
152 #endif
153 {
154 #ifdef ICE_USE_IOCP
155 ULONG_PTR key;
156 LPOVERLAPPED ol;
157 error = ERROR_SUCCESS;
158
159 if(!GetQueuedCompletionStatus(_handle, &count, &key, &ol, timeout > 0 ? timeout * 1000 : INFINITE))
160 {
161 int err = WSAGetLastError();
162 if(ol == 0)
163 {
164 if(err == WAIT_TIMEOUT)
165 {
166 throw SelectorTimeoutException();
167 }
168 else
169 {
170 Ice::SocketException ex(__FILE__, __LINE__, err);
171 Ice::Error out(_instance->initializationData().logger);
172 out << "couldn't dequeue packet from completion port:\n" << ex;
173 IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5)); // Sleep 5s to avoid looping
174 }
175 }
176 AsyncInfo* info = static_cast<AsyncInfo*>(ol);
177 if(info)
178 {
179 status = info->status;
180 }
181 count = 0;
182 error = WSAGetLastError();
183 return reinterpret_cast<EventHandler*>(key);
184 }
185
186 AsyncInfo* info = static_cast<AsyncInfo*>(ol);
187 if(info)
188 {
189 status = info->status;
190 }
191 else
192 {
193 status = reinterpret_cast<EventHandler*>(key)->_ready;
194 }
195 return reinterpret_cast<EventHandler*>(key);
196 #else
197 IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
198 while(_events.empty())
199 {
200 if(timeout > 0)
201 {
202 _monitor.timedWait(IceUtil::Time::seconds(timeout));
203 if(_events.empty())
204 {
205 throw SelectorTimeoutException();
206 }
207 }
208 else
209 {
210 _monitor.wait();
211 }
212 }
213 assert(!_events.empty());
214 IceInternal::EventHandlerPtr handler = _events.front().handler;
215 const SelectEvent& event = _events.front();
216 status = event.status;
217 _events.pop_front();
218 return handler.get();
219 #endif
220 }
221
222 void
completed(EventHandler * handler,SocketOperation op)223 Selector::completed(EventHandler* handler, SocketOperation op)
224 {
225 #ifdef ICE_USE_IOCP
226 AsyncInfo* info = 0;
227 NativeInfoPtr nativeInfo = handler->getNativeInfo();
228 if(nativeInfo)
229 {
230 info = nativeInfo->getAsyncInfo(op);
231 }
232 if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info))
233 {
234 throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
235 }
236 #else
237 IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
238 _events.push_back(SelectEvent(handler->shared_from_this(), op));
239 _monitor.notify();
240 #endif
241 }
242
243 #elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
244
Selector(const InstancePtr & instance)245 Selector::Selector(const InstancePtr& instance) : _instance(instance), _interrupted(false)
246 {
247 SOCKET fds[2];
248 createPipe(fds);
249 _fdIntrRead = fds[0];
250 _fdIntrWrite = fds[1];
251 _selecting = false;
252
253 #if defined(ICE_USE_EPOLL)
254 _events.resize(256);
255 _queueFd = epoll_create(1);
256 if(_queueFd < 0)
257 {
258 throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
259 }
260
261 epoll_event event;
262 memset(&event, 0, sizeof(epoll_event));
263 event.data.ptr = 0;
264 event.events = EPOLLIN;
265 if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
266 {
267 Ice::Error out(_instance->initializationData().logger);
268 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
269 }
270 #elif defined(ICE_USE_KQUEUE)
271 _events.resize(256);
272 _queueFd = kqueue();
273 if(_queueFd < 0)
274 {
275 throw Ice::SocketException(__FILE__, __LINE__, getSocketErrno());
276 }
277
278 struct kevent ev;
279 EV_SET(&ev, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0);
280 int rs = kevent(_queueFd, &ev, 1, 0, 0, 0);
281 if(rs < 0)
282 {
283 Ice::Error out(_instance->initializationData().logger);
284 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
285 }
286 #elif defined(ICE_USE_SELECT)
287 FD_ZERO(&_readFdSet);
288 FD_ZERO(&_writeFdSet);
289 FD_ZERO(&_errorFdSet);
290 FD_SET(_fdIntrRead, &_readFdSet);
291 #else
292 struct pollfd pollFd;
293 pollFd.fd = _fdIntrRead;
294 pollFd.events = POLLIN;
295 _pollFdSet.push_back(pollFd);
296 #endif
297 }
298
~Selector()299 Selector::~Selector()
300 {
301 }
302
303 void
destroy()304 Selector::destroy()
305 {
306 #if defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
307 try
308 {
309 closeSocket(_queueFd);
310 }
311 catch(const Ice::LocalException& ex)
312 {
313 Ice::Error out(_instance->initializationData().logger);
314 out << "exception in selector while calling closeSocket():\n" << ex;
315 }
316 #endif
317
318 try
319 {
320 closeSocket(_fdIntrWrite);
321 }
322 catch(const Ice::LocalException& ex)
323 {
324 Ice::Error out(_instance->initializationData().logger);
325 out << "exception in selector while calling closeSocket():\n" << ex;
326 }
327
328 try
329 {
330 closeSocket(_fdIntrRead);
331 }
332 catch(const Ice::LocalException& ex)
333 {
334 Ice::Error out(_instance->initializationData().logger);
335 out << "exception in selector while calling closeSocket():\n" << ex;
336 }
337 }
338
339 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)340 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
341 {
342 SocketOperation previous = handler->_registered;
343 handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
344 handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
345 if(previous == handler->_registered)
346 {
347 return;
348 }
349 checkReady(handler);
350
351 NativeInfoPtr nativeInfo = handler->getNativeInfo();
352 if(nativeInfo && nativeInfo->fd() != INVALID_SOCKET)
353 {
354 updateSelectorForEventHandler(handler, remove, add);
355 }
356 }
357
358 void
enable(EventHandler * handler,SocketOperation status)359 Selector::enable(EventHandler* handler, SocketOperation status)
360 {
361 if(!(handler->_disabled & status))
362 {
363 return;
364 }
365 handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status);
366 checkReady(handler);
367
368 NativeInfoPtr nativeInfo = handler->getNativeInfo();
369 if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
370 {
371 return;
372 }
373
374 if(handler->_registered & status)
375 {
376 #if defined(ICE_USE_EPOLL)
377 SOCKET fd = nativeInfo->fd();
378 SocketOperation previous = static_cast<SocketOperation>(handler->_registered & ~(handler->_disabled | status));
379 SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
380 epoll_event event;
381 memset(&event, 0, sizeof(epoll_event));
382 event.data.ptr = handler;
383 if(newStatus & SocketOperationRead)
384 {
385 event.events |= EPOLLIN;
386 }
387 if(newStatus & SocketOperationWrite)
388 {
389 event.events |= EPOLLOUT;
390 }
391 if(epoll_ctl(_queueFd, previous ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &event) != 0)
392 {
393 Ice::Error out(_instance->initializationData().logger);
394 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
395 }
396 #elif defined(ICE_USE_KQUEUE)
397 struct kevent ev;
398 SOCKET fd = handler->getNativeInfo()->fd();
399 EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_ENABLE, 0, 0, handler);
400 _changes.push_back(ev);
401 if(_selecting)
402 {
403 updateSelector();
404 }
405 #else
406 _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
407 wakeup();
408 #endif
409 }
410 }
411
412 void
disable(EventHandler * handler,SocketOperation status)413 Selector::disable(EventHandler* handler, SocketOperation status)
414 {
415 if(handler->_disabled & status)
416 {
417 return;
418 }
419 handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
420 checkReady(handler);
421
422 NativeInfoPtr nativeInfo = handler->getNativeInfo();
423 if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
424 {
425 return;
426 }
427
428 if(handler->_registered & status)
429 {
430 #if defined(ICE_USE_EPOLL)
431 SOCKET fd = nativeInfo->fd();
432 SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
433 epoll_event event;
434 memset(&event, 0, sizeof(epoll_event));
435 event.data.ptr = handler;
436 if(newStatus & SocketOperationRead)
437 {
438 event.events |= EPOLLIN;
439 }
440 if(newStatus & SocketOperationWrite)
441 {
442 event.events |= EPOLLOUT;
443 }
444 if(epoll_ctl(_queueFd, newStatus ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, fd, &event) != 0)
445 {
446 Ice::Error out(_instance->initializationData().logger);
447 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
448 }
449 #elif defined(ICE_USE_KQUEUE)
450 SOCKET fd = nativeInfo->fd();
451 struct kevent ev;
452 EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_DISABLE, 0, 0, handler);
453 _changes.push_back(ev);
454 if(_selecting)
455 {
456 updateSelector();
457 }
458 #else
459 _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
460 wakeup();
461 #endif
462 }
463 }
464
465 bool
finish(EventHandler * handler,bool closeNow)466 Selector::finish(EventHandler* handler, bool closeNow)
467 {
468 if(handler->_registered)
469 {
470 update(handler, handler->_registered, SocketOperationNone);
471 #if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE)
472 return false; // Don't close now if selecting
473 #endif
474 }
475
476 #if defined(ICE_USE_KQUEUE)
477 if(closeNow && !_changes.empty())
478 {
479 //
480 // Update selector now to remove the FD from the kqueue if
481 // we're going to close it now. This isn't necessary for
482 // epoll since we always update the epoll FD immediately.
483 //
484 updateSelector();
485 }
486 #elif !defined(ICE_USE_EPOLL)
487 if(!_changes.empty())
488 {
489 return false;
490 }
491 #endif
492
493 return closeNow;
494 }
495
496 void
ready(EventHandler * handler,SocketOperation status,bool value)497 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
498 {
499 if(((handler->_ready & status) != 0) == value)
500 {
501 return; // Nothing to do if ready state already correctly set.
502 }
503
504 if(status & SocketOperationConnect)
505 {
506 NativeInfoPtr nativeInfo = handler->getNativeInfo();
507 if(nativeInfo && nativeInfo->newFd() && handler->_registered)
508 {
509 // If new FD is set after connect, register the FD with the selector.
510 updateSelectorForEventHandler(handler, SocketOperationNone, handler->_registered);
511 }
512 }
513
514 if(value)
515 {
516 handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
517 }
518 else
519 {
520 handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
521 }
522 checkReady(handler);
523 }
524
525 void
wakeup()526 Selector::wakeup()
527 {
528 if(_selecting && !_interrupted)
529 {
530 char c = 0;
531 while(true)
532 {
533 if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
534 {
535 if(interrupted())
536 {
537 continue;
538 }
539
540 throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
541 }
542 break;
543 }
544 _interrupted = true;
545 }
546 }
547
548 void
startSelect()549 Selector::startSelect()
550 {
551 if(_interrupted)
552 {
553 char c;
554 while(true)
555 {
556 ssize_t ret = ::read(_fdIntrRead, &c, 1);
557 if(ret == SOCKET_ERROR)
558 {
559 if(interrupted())
560 {
561 continue;
562 }
563 throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
564 }
565 break;
566 }
567 _interrupted = false;
568 }
569
570 #if !defined(ICE_USE_EPOLL)
571 if(!_changes.empty())
572 {
573 updateSelector();
574 }
575 #endif
576 _selecting = true;
577
578 //
579 // If there are ready handlers, don't block in select, just do a non-blocking
580 // select to retrieve new ready handlers from the Java selector.
581 //
582 _selectNow = !_readyHandlers.empty();
583 }
584
585 void
finishSelect(vector<pair<EventHandler *,SocketOperation>> & handlers)586 Selector::finishSelect(vector<pair<EventHandler*, SocketOperation> >& handlers)
587 {
588 _selecting = false;
589
590 assert(handlers.empty());
591
592 #if defined(ICE_USE_POLL) || defined(ICE_USE_SELECT)
593 if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers
594 {
595 return;
596 }
597 #endif
598
599 #if defined(ICE_USE_POLL)
600 for(vector<struct pollfd>::const_iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
601 #else
602 for(int i = 0; i < _count; ++i)
603 #endif
604 {
605 pair<EventHandler*, SocketOperation> p;
606
607 #if defined(ICE_USE_EPOLL)
608 struct epoll_event& ev = _events[i];
609 p.first = reinterpret_cast<EventHandler*>(ev.data.ptr);
610 p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
611 SocketOperationRead : SocketOperationNone) |
612 ((ev.events & (EPOLLOUT | EPOLLERR)) ?
613 SocketOperationWrite : SocketOperationNone));
614 #elif defined(ICE_USE_KQUEUE)
615 struct kevent& ev = _events[i];
616 if(ev.flags & EV_ERROR)
617 {
618 Ice::Error out(_instance->initializationData().logger);
619 out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data);
620 continue;
621 }
622 p.first = reinterpret_cast<EventHandler*>(ev.udata);
623 p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite;
624 #elif defined(ICE_USE_SELECT)
625 //
626 // Round robin for the filedescriptors.
627 //
628 SOCKET fd;
629 p.second = SocketOperationNone;
630 if(i < _selectedReadFdSet.fd_count)
631 {
632 fd = _selectedReadFdSet.fd_array[i];
633 p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
634 }
635 else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count)
636 {
637 fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count];
638 p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
639 }
640 else
641 {
642 fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count];
643 p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect);
644 }
645
646 assert(fd != _fdIntrRead);
647 p.first = _handlers[fd];
648 #else
649 if(r->revents == 0)
650 {
651 continue;
652 }
653
654 SOCKET fd = r->fd;
655 assert(_handlers.find(fd) != _handlers.end());
656 p.first = _handlers[fd];
657 p.second = SocketOperationNone;
658 if(r->revents & (POLLIN | POLLERR | POLLHUP))
659 {
660 p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
661 }
662 if(r->revents & (POLLOUT | POLLERR | POLLHUP))
663 {
664 p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
665 }
666 assert(p.second);
667 #endif
668 if(!p.first)
669 {
670 continue; // Interrupted
671 }
672
673 map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.find(ICE_GET_SHARED_FROM_THIS(p.first));
674
675 if(q != _readyHandlers.end()) // Handler will be added by the loop below
676 {
677 q->second = p.second; // We just remember which operations are ready here.
678 }
679 else
680 {
681 handlers.push_back(p);
682 }
683 }
684
685 for(map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.begin(); q != _readyHandlers.end(); ++q)
686 {
687 pair<EventHandler*, SocketOperation> p;
688 p.first = q->first.get();
689 p.second = static_cast<SocketOperation>(p.first->_ready & ~p.first->_disabled & p.first->_registered);
690 p.second = static_cast<SocketOperation>(p.second | q->second);
691 if(p.second)
692 {
693 handlers.push_back(p);
694 }
695
696 //
697 // Reset the operation, it's only used by this method to temporarly store the socket status
698 // return by the select operation above.
699 //
700 q->second = SocketOperationNone;
701 }
702 }
703
704 void
select(int timeout)705 Selector::select(int timeout)
706 {
707 if(_selectNow)
708 {
709 timeout = 0;
710 }
711 else if(timeout > 0)
712 {
713 timeout = timeout * 1000;
714 }
715 else
716 {
717 timeout = -1;
718 }
719
720 int spuriousWakeup = 0;
721 while(true)
722 {
723 #if defined(ICE_USE_EPOLL)
724 _count = epoll_wait(_queueFd, &_events[0], _events.size(), timeout);
725 #elif defined(ICE_USE_KQUEUE)
726 assert(!_events.empty());
727 if(timeout >= 0)
728 {
729 struct timespec ts;
730 ts.tv_sec = timeout;
731 ts.tv_nsec = 0;
732 _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts);
733 }
734 else
735 {
736 _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0);
737 }
738 #elif defined(ICE_USE_SELECT)
739 fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet);
740 fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet);
741 fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet);
742 if(timeout >= 0)
743 {
744 struct timeval tv;
745 tv.tv_sec = timeout;
746 tv.tv_usec = 0;
747 _count = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows
748 }
749 else
750 {
751 _count = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows
752 }
753 #else
754 _count = poll(&_pollFdSet[0], _pollFdSet.size(), timeout);
755 #endif
756
757 if(_count == SOCKET_ERROR)
758 {
759 if(interrupted())
760 {
761 continue;
762 }
763
764 Ice::SocketException ex(__FILE__, __LINE__, IceInternal::getSocketErrno());
765 Ice::Error out(_instance->initializationData().logger);
766 out << "selector failed:\n" << ex;
767 IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5)); // Sleep 5s to avoid looping
768 }
769 else if(_count == 0 && timeout < 0)
770 {
771 if(++spuriousWakeup > 100)
772 {
773 spuriousWakeup = 0;
774 _instance->initializationData().logger->warning("spurious selector wakeup");
775 }
776 IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(1));
777 continue;
778 }
779 break;
780 }
781
782 if(_count == 0 && !_selectNow)
783 {
784 throw SelectorTimeoutException();
785 }
786 }
787
788 void
checkReady(EventHandler * handler)789 Selector::checkReady(EventHandler* handler)
790 {
791 if(handler->_ready & ~handler->_disabled & handler->_registered)
792 {
793 _readyHandlers.insert(make_pair(ICE_GET_SHARED_FROM_THIS(handler), SocketOperationNone));
794 wakeup();
795 }
796 else
797 {
798 map<EventHandlerPtr, SocketOperation>::iterator p = _readyHandlers.find(ICE_GET_SHARED_FROM_THIS(handler));
799 if(p != _readyHandlers.end())
800 {
801 _readyHandlers.erase(p);
802 }
803 }
804 }
805
806 void
updateSelector()807 Selector::updateSelector()
808 {
809 #if defined(ICE_USE_KQUEUE)
810 int rs = kevent(_queueFd, &_changes[0], _changes.size(), &_changes[0], _changes.size(), &zeroTimeout);
811 if(rs < 0)
812 {
813 Ice::Error out(_instance->initializationData().logger);
814 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
815 }
816 else
817 {
818 for(int i = 0; i < rs; ++i)
819 {
820 //
821 // Check for errors, we ignore EINPROGRESS that started showing up with macOS Sierra
822 // and which occurs when another thread removes the FD from the kqueue (see ICE-7419).
823 //
824 if(_changes[i].flags & EV_ERROR && _changes[i].data != EINPROGRESS)
825 {
826 Ice::Error out(_instance->initializationData().logger);
827 out << "error while updating selector:\n" << IceUtilInternal::errorToString(_changes[i].data);
828 }
829 }
830 }
831 _changes.clear();
832 #elif !defined(ICE_USE_EPOLL)
833 assert(!_selecting);
834
835 for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
836 {
837 EventHandler* handler = p->first;
838 SocketOperation status = p->second;
839
840 SOCKET fd = handler->getNativeInfo()->fd();
841 if(status)
842 {
843 #if defined(ICE_USE_SELECT)
844 if(status & SocketOperationRead)
845 {
846 FD_SET(fd, &_readFdSet);
847 }
848 else
849 {
850 FD_CLR(fd, &_readFdSet);
851 }
852 if(status & SocketOperationWrite)
853 {
854 FD_SET(fd, &_writeFdSet);
855 }
856 else
857 {
858 FD_CLR(fd, &_writeFdSet);
859 }
860 if(status & SocketOperationConnect)
861 {
862 FD_SET(fd, &_writeFdSet);
863 FD_SET(fd, &_errorFdSet);
864 }
865 else
866 {
867 FD_CLR(fd, &_writeFdSet);
868 FD_CLR(fd, &_errorFdSet);
869 }
870 _handlers[fd] = handler;
871 #else
872 short events = 0;
873 if(status & SocketOperationRead)
874 {
875 events |= POLLIN;
876 }
877 if(status & SocketOperationWrite)
878 {
879 events |= POLLOUT;
880 }
881 map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd);
882 if(q == _handlers.end())
883 {
884 struct pollfd pollFd;
885 pollFd.fd = fd;
886 pollFd.events = events;
887 pollFd.revents = 0;
888 _pollFdSet.push_back(pollFd);
889 _handlers.insert(make_pair(fd, handler));
890 }
891 else
892 {
893 for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
894 {
895 if(r->fd == fd)
896 {
897 r->events = events;
898 break;
899 }
900 }
901 }
902 #endif
903 }
904 else
905 {
906 #if defined(ICE_USE_SELECT)
907 FD_CLR(fd, &_readFdSet);
908 FD_CLR(fd, &_writeFdSet);
909 FD_CLR(fd, &_errorFdSet);
910 #else
911 for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
912 {
913 if(r->fd == fd)
914 {
915 _pollFdSet.erase(r);
916 break;
917 }
918 }
919 #endif
920 _handlers.erase(fd);
921 }
922 }
923 _changes.clear();
924 #endif
925 }
926
927 void
updateSelectorForEventHandler(EventHandler * handler,SocketOperation remove,SocketOperation add)928 Selector::updateSelectorForEventHandler(EventHandler* handler, SocketOperation remove, SocketOperation add)
929 {
930 #if defined(ICE_USE_EPOLL)
931 SocketOperation previous = handler->_registered;
932 previous = static_cast<SocketOperation>(previous & ~add);
933 previous = static_cast<SocketOperation>(previous | remove);
934 SOCKET fd = handler->getNativeInfo()->fd();
935 assert(fd != INVALID_SOCKET);
936 epoll_event event;
937 memset(&event, 0, sizeof(epoll_event));
938 event.data.ptr = handler;
939 SocketOperation status = handler->_registered;
940 if(handler->_disabled)
941 {
942 status = static_cast<SocketOperation>(status & ~handler->_disabled);
943 previous = static_cast<SocketOperation>(previous & ~handler->_disabled);
944 }
945 if(status & SocketOperationRead)
946 {
947 event.events |= EPOLLIN;
948 }
949 if(status & SocketOperationWrite)
950 {
951 event.events |= EPOLLOUT;
952 }
953 int op;
954 if(!previous && status)
955 {
956 op = EPOLL_CTL_ADD;
957 }
958 else if(previous && !status)
959 {
960 op = EPOLL_CTL_DEL;
961 }
962 else if(previous == status)
963 {
964 return;
965 }
966 else
967 {
968 op = EPOLL_CTL_MOD;
969 }
970 if(epoll_ctl(_queueFd, op, fd, &event) != 0)
971 {
972 Ice::Error out(_instance->initializationData().logger);
973 out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
974 }
975 #elif defined(ICE_USE_KQUEUE)
976 SOCKET fd = handler->getNativeInfo()->fd();
977 assert(fd != INVALID_SOCKET);
978 if(remove & SocketOperationRead)
979 {
980 struct kevent ev;
981 EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler);
982 _changes.push_back(ev);
983 }
984 if(remove & SocketOperationWrite)
985 {
986 struct kevent ev;
987 EV_SET(&ev, fd, EVFILT_WRITE, EV_DELETE, 0, 0, handler);
988 _changes.push_back(ev);
989 }
990 if(add & SocketOperationRead)
991 {
992 struct kevent ev;
993 EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0,
994 handler);
995 _changes.push_back(ev);
996 }
997 if(add & SocketOperationWrite)
998 {
999 struct kevent ev;
1000 EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0,
1001 handler);
1002 _changes.push_back(ev);
1003 }
1004 if(_selecting)
1005 {
1006 updateSelector();
1007 }
1008 #else
1009 _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
1010 wakeup();
1011 #endif
1012 checkReady(handler);
1013 }
1014
1015 #elif defined(ICE_USE_CFSTREAM)
1016
1017 namespace
1018 {
1019
selectorInterrupt(void * info)1020 void selectorInterrupt(void* info)
1021 {
1022 reinterpret_cast<Selector*>(info)->processInterrupt();
1023 }
1024
eventHandlerSocketCallback(CFSocketRef,CFSocketCallBackType callbackType,CFDataRef,const void * d,void * info)1025 void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info)
1026 {
1027 if(callbackType == kCFSocketReadCallBack)
1028 {
1029 reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead);
1030 }
1031 else if(callbackType == kCFSocketWriteCallBack)
1032 {
1033 reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite);
1034 }
1035 else if(callbackType == kCFSocketConnectCallBack)
1036 {
1037 reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
1038 d ? *reinterpret_cast<const SInt32*>(d) : 0);
1039 }
1040 }
1041
1042 class SelectorHelperThread : public IceUtil::Thread
1043 {
1044 public:
1045
SelectorHelperThread(Selector & selector)1046 SelectorHelperThread(Selector& selector) : _selector(selector)
1047 {
1048 }
1049
run()1050 virtual void run()
1051 {
1052 _selector.run();
1053
1054 #if TARGET_IPHONE_SIMULATOR != 0
1055 //
1056 // Workaround for CFSocket bug where the CFSocketManager thread crashes if an
1057 // invalidated socket is being processed for reads/writes. We add this sleep
1058 // mostly to prevent spurious crashes with testing. This bug is very unlikely
1059 // to be hit otherwise.
1060 //
1061 IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
1062 #endif
1063 }
1064
1065 private:
1066
1067 Selector& _selector;
1068 };
1069
1070 CFOptionFlags
toCFCallbacks(SocketOperation op)1071 toCFCallbacks(SocketOperation op)
1072 {
1073 CFOptionFlags cbs = 0;
1074 if(op & SocketOperationRead)
1075 {
1076 cbs |= kCFSocketReadCallBack;
1077 }
1078 if(op & SocketOperationWrite)
1079 {
1080 cbs |= kCFSocketWriteCallBack;
1081 }
1082 if(op & SocketOperationConnect)
1083 {
1084 cbs |= kCFSocketConnectCallBack;
1085 }
1086 return cbs;
1087 }
1088
1089 }
1090
EventHandlerWrapper(EventHandler * handler,Selector & selector)1091 EventHandlerWrapper::EventHandlerWrapper(EventHandler* handler, Selector& selector) :
1092 _handler(ICE_GET_SHARED_FROM_THIS(handler)),
1093 _streamNativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())),
1094 _selector(selector),
1095 _ready(SocketOperationNone),
1096 _finish(false)
1097 {
1098 if(_streamNativeInfo)
1099 {
1100 _streamNativeInfo->initStreams(this);
1101 }
1102 else if(handler->getNativeInfo())
1103 {
1104 SOCKET fd = handler->getNativeInfo()->fd();
1105 CFSocketContext ctx = { 0, this, 0, 0, 0 };
1106 _socket.reset(CFSocketCreateWithNative(kCFAllocatorDefault,
1107 fd,
1108 kCFSocketReadCallBack |
1109 kCFSocketWriteCallBack |
1110 kCFSocketConnectCallBack,
1111 eventHandlerSocketCallback,
1112 &ctx));
1113
1114 // Disable automatic re-enabling of callbacks and closing of the native socket.
1115 CFSocketSetSocketFlags(_socket.get(), 0);
1116 CFSocketDisableCallBacks(_socket.get(), kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
1117 _source.reset(CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket.get(), 0));
1118 }
1119 }
1120
~EventHandlerWrapper()1121 EventHandlerWrapper::~EventHandlerWrapper()
1122 {
1123 }
1124
1125 void
updateRunLoop()1126 EventHandlerWrapper::updateRunLoop()
1127 {
1128 SocketOperation op = _handler->_registered;
1129 assert(!op || !_finish);
1130
1131 if(_socket)
1132 {
1133 CFSocketDisableCallBacks(_socket.get(), kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
1134 if(op)
1135 {
1136 CFSocketEnableCallBacks(_socket.get(), toCFCallbacks(op));
1137 }
1138
1139 if(op && !CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode))
1140 {
1141 CFRunLoopAddSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1142 }
1143 else if(!op && CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode))
1144 {
1145 CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1146 }
1147
1148 if(_finish)
1149 {
1150 CFSocketInvalidate(_socket.get());
1151 }
1152 }
1153 else
1154 {
1155 SocketOperation readyOp = _streamNativeInfo->registerWithRunLoop(op);
1156 if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite)
1157 {
1158 _streamNativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
1159 }
1160
1161 if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead)
1162 {
1163 _streamNativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
1164 }
1165
1166 if(readyOp)
1167 {
1168 ready(readyOp, 0);
1169 }
1170
1171 if(_finish)
1172 {
1173 _streamNativeInfo->closeStreams();
1174 }
1175 }
1176 }
1177
1178 void
readyCallback(SocketOperation op,int error)1179 EventHandlerWrapper::readyCallback(SocketOperation op, int error)
1180 {
1181 _selector.ready(this, op, error);
1182 }
1183
1184 void
ready(SocketOperation op,int error)1185 EventHandlerWrapper::ready(SocketOperation op, int error)
1186 {
1187 if(!_socket)
1188 {
1189 //
1190 // Unregister the stream from the runloop as soon as we got the callback. This is
1191 // required to allow thread pool thread to perform read/write operations on the
1192 // stream (which can't be used from another thread than the run loop thread if
1193 // it's registered with a run loop).
1194 //
1195 op = _streamNativeInfo->unregisterFromRunLoop(op, error != 0);
1196 }
1197
1198 op = static_cast<SocketOperation>(_handler->_registered & op);
1199 if(!op || _ready & op)
1200 {
1201 return;
1202 }
1203
1204 if(_socket)
1205 {
1206 if(op & SocketOperationConnect)
1207 {
1208 _streamNativeInfo->setConnectError(error);
1209 }
1210 }
1211
1212 _ready = static_cast<SocketOperation>(_ready | op);
1213 checkReady();
1214 }
1215
1216 bool
checkReady()1217 EventHandlerWrapper::checkReady()
1218 {
1219 if((_ready | _handler->_ready) & ~_handler->_disabled & _handler->_registered)
1220 {
1221 _selector.addReadyHandler(this);
1222 return false;
1223 }
1224 else
1225 {
1226 return _handler->getNativeInfo() && !_finish;
1227 }
1228 }
1229
1230 SocketOperation
readyOp()1231 EventHandlerWrapper::readyOp()
1232 {
1233 assert(!(~_handler->_registered & _ready));
1234 SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & (_ready | _handler->_ready));
1235 _ready = static_cast<SocketOperation>(~op & _ready);
1236 return op;
1237 }
1238
1239 bool
update(SocketOperation remove,SocketOperation add)1240 EventHandlerWrapper::update(SocketOperation remove, SocketOperation add)
1241 {
1242 SocketOperation previous = _handler->_registered;
1243 _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove);
1244 _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add);
1245 if(previous == _handler->_registered)
1246 {
1247 return false;
1248 }
1249
1250 // Clear ready flags which might not be valid anymore.
1251 _ready = static_cast<SocketOperation>(_ready & _handler->_registered);
1252 return _handler->getNativeInfo();
1253 }
1254
1255 bool
finish()1256 EventHandlerWrapper::finish()
1257 {
1258 _finish = true;
1259 _ready = SocketOperationNone;
1260 _handler->_registered = SocketOperationNone;
1261 return _handler->getNativeInfo();
1262 }
1263
Selector(const InstancePtr & instance)1264 Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false)
1265 {
1266 CFRunLoopSourceContext ctx;
1267 memset(&ctx, 0, sizeof(CFRunLoopSourceContext));
1268 ctx.info = this;
1269 ctx.perform = selectorInterrupt;
1270 _source.reset(CFRunLoopSourceCreate(0, 0, &ctx));
1271 _runLoop = 0;
1272
1273 _thread = new SelectorHelperThread(*this);
1274 _thread->start();
1275
1276 Lock sync(*this);
1277 while(!_runLoop)
1278 {
1279 wait();
1280 }
1281 }
1282
~Selector()1283 Selector::~Selector()
1284 {
1285 }
1286
1287 void
destroy()1288 Selector::destroy()
1289 {
1290 {
1291 Lock sync(*this);
1292
1293 //
1294 // Make sure any pending changes are processed to ensure remaining
1295 // streams/sockets are closed.
1296 //
1297 _destroyed = true;
1298 CFRunLoopSourceSignal(_source.get());
1299 CFRunLoopWakeUp(_runLoop);
1300
1301 while(!_changes.empty())
1302 {
1303 CFRunLoopSourceSignal(_source.get());
1304 CFRunLoopWakeUp(_runLoop);
1305
1306 wait();
1307 }
1308 }
1309
1310 _thread->getThreadControl().join();
1311 _thread = 0;
1312
1313 Lock sync(*this);
1314 _source.reset(0);
1315
1316 //assert(_wrappers.empty());
1317 _readyHandlers.clear();
1318 _selectedHandlers.clear();
1319 }
1320
1321 void
initialize(EventHandler * handler)1322 Selector::initialize(EventHandler* handler)
1323 {
1324 Lock sync(*this);
1325 _wrappers[handler] = new EventHandlerWrapper(handler, *this);
1326 }
1327
1328 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)1329 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
1330 {
1331 Lock sync(*this);
1332 const EventHandlerWrapperPtr& wrapper = _wrappers[handler];
1333 if(wrapper->update(remove, add))
1334 {
1335 _changes.insert(wrapper);
1336 notify();
1337 }
1338 }
1339
1340 void
enable(EventHandler * handler,SocketOperation op)1341 Selector::enable(EventHandler* handler, SocketOperation op)
1342 {
1343 Lock sync(*this);
1344 if(!(handler->_disabled & op))
1345 {
1346 return;
1347 }
1348 handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~op);
1349
1350 if(handler->_registered & op)
1351 {
1352 _wrappers[handler]->checkReady();
1353 }
1354 }
1355
1356 void
disable(EventHandler * handler,SocketOperation op)1357 Selector::disable(EventHandler* handler, SocketOperation op)
1358 {
1359 Lock sync(*this);
1360 if(handler->_disabled & op)
1361 {
1362 return;
1363 }
1364 handler->_disabled = static_cast<SocketOperation>(handler->_disabled | op);
1365 }
1366
1367 bool
finish(EventHandler * handler,bool closeNow)1368 Selector::finish(EventHandler* handler, bool closeNow)
1369 {
1370 Lock sync(*this);
1371 std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
1372 assert(p != _wrappers.end());
1373 EventHandlerWrapperPtr wrapper = p->second;
1374 if(wrapper->finish())
1375 {
1376 _changes.insert(wrapper);
1377 notify();
1378 }
1379 _wrappers.erase(p);
1380 return closeNow;
1381 }
1382
1383 void
ready(EventHandler * handler,SocketOperation status,bool value)1384 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
1385 {
1386 if(((handler->_ready & status) != 0) == value)
1387 {
1388 return; // Nothing to do if ready state already correctly set.
1389 }
1390
1391 if(value)
1392 {
1393 handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
1394 }
1395 else
1396 {
1397 handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
1398 }
1399
1400 Lock sync(*this);
1401 std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
1402 assert(p != _wrappers.end());
1403 p->second->checkReady();
1404 }
1405
1406 void
startSelect()1407 Selector::startSelect()
1408 {
1409 Lock sync(*this);
1410
1411 //
1412 // Re-enable callbacks for previously selected handlers.
1413 //
1414 vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p;
1415 for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p)
1416 {
1417 if(p->first->checkReady())
1418 {
1419 _changes.insert(p->first);
1420 }
1421 }
1422 _selectedHandlers.clear();
1423 }
1424
1425 void
finishSelect(std::vector<std::pair<EventHandler *,SocketOperation>> & handlers)1426 Selector::finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers)
1427 {
1428 Lock sync(*this);
1429 handlers.clear();
1430 for(set<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p)
1431 {
1432 SocketOperation op = (*p)->readyOp();
1433 if(op)
1434 {
1435 _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op));
1436 handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op));
1437 }
1438 }
1439 _readyHandlers.clear();
1440 }
1441
1442 void
select(int timeout)1443 Selector::select(int timeout)
1444 {
1445 //
1446 // Wait for handlers to be ready.
1447 //
1448 Lock sync(*this);
1449 while(!_destroyed)
1450 {
1451 while(!_changes.empty())
1452 {
1453 CFRunLoopSourceSignal(_source.get());
1454 CFRunLoopWakeUp(_runLoop);
1455
1456 wait();
1457 }
1458
1459 if(_readyHandlers.empty())
1460 {
1461 if(timeout > 0)
1462 {
1463 if(!timedWait(IceUtil::Time::seconds(timeout)))
1464 {
1465 break;
1466 }
1467 }
1468 else
1469 {
1470 wait();
1471 }
1472 }
1473
1474 if(_changes.empty())
1475 {
1476 break;
1477 }
1478 }
1479 }
1480
1481 void
processInterrupt()1482 Selector::processInterrupt()
1483 {
1484 Lock sync(*this);
1485 if(!_changes.empty())
1486 {
1487 for(set<EventHandlerWrapperPtr>::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
1488 {
1489 (*p)->updateRunLoop();
1490 }
1491 _changes.clear();
1492 notify();
1493 }
1494 if(_destroyed)
1495 {
1496 CFRunLoopStop(_runLoop);
1497 }
1498 }
1499
1500 void
run()1501 Selector::run()
1502 {
1503 {
1504 Lock sync(*this);
1505 _runLoop = CFRunLoopGetCurrent();
1506 notify();
1507 }
1508
1509 CFRunLoopAddSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1510 CFRunLoopRun();
1511 CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1512 }
1513
1514 void
ready(EventHandlerWrapper * wrapper,SocketOperation op,int error)1515 Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
1516 {
1517 Lock sync(*this);
1518 wrapper->ready(op, error);
1519 }
1520
1521 void
addReadyHandler(EventHandlerWrapper * wrapper)1522 Selector::addReadyHandler(EventHandlerWrapper* wrapper)
1523 {
1524 // Called from ready()
1525 _readyHandlers.insert(wrapper);
1526 if(_readyHandlers.size() == 1)
1527 {
1528 notify();
1529 }
1530 }
1531
1532 #endif
1533