1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <Ice/Selector.h>
6 #include <Ice/EventHandler.h>
7 #include <Ice/Instance.h>
8 #include <Ice/LoggerUtil.h>
9 #include <Ice/LocalException.h>
10 #include <IceUtil/Time.h>
11 
12 #ifdef ICE_USE_CFSTREAM
13 #   include <CoreFoundation/CoreFoundation.h>
14 #   include <CoreFoundation/CFStream.h>
15 #endif
16 
17 using namespace std;
18 using namespace IceInternal;
19 
20 #if defined(ICE_USE_KQUEUE)
namespace
{

//
// Zero-length timeout (0 seconds, 0 nanoseconds). It is handed to kevent()
// as the timeout argument when the selector flushes its pending changes.
//
struct timespec zeroTimeout = { /* tv_sec */ 0, /* tv_nsec */ 0 };

}
25 #endif
26 
27 #if defined(ICE_OS_UWP)
28 using namespace Windows::Storage::Streams;
29 using namespace Windows::Networking;
30 using namespace Windows::Networking::Sockets;
31 #endif
32 
33 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
34 
Selector(const InstancePtr & instance)35 Selector::Selector(const InstancePtr& instance) : _instance(instance)
36 {
37 }
38 
~Selector()39 Selector::~Selector()
40 {
41 }
42 
43 #ifdef ICE_USE_IOCP
44 void
setup(int sizeIO)45 Selector::setup(int sizeIO)
46 {
47     _handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, ICE_NULLPTR, 0, sizeIO);
48     if(_handle == ICE_NULLPTR)
49     {
50         throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
51     }
52 }
53 #endif
54 
55 void
destroy()56 Selector::destroy()
57 {
58 #ifdef ICE_USE_IOCP
59     CloseHandle(_handle);
60 #endif
61 }
62 
63 void
initialize(EventHandler * handler)64 Selector::initialize(EventHandler* handler)
65 {
66     if(!handler->getNativeInfo())
67     {
68         return;
69     }
70 
71 #ifdef ICE_USE_IOCP
72     SOCKET socket = handler->getNativeInfo()->fd();
73     if (socket != INVALID_SOCKET)
74     {
75         if (CreateIoCompletionPort(reinterpret_cast<HANDLE>(socket),
76                                    _handle,
77                                    reinterpret_cast<ULONG_PTR>(handler),
78                                    0) == ICE_NULLPTR)
79         {
80             throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
81         }
82     }
83     handler->getNativeInfo()->initialize(_handle, reinterpret_cast<ULONG_PTR>(handler));
84 #else
85     EventHandlerPtr h = ICE_GET_SHARED_FROM_THIS(handler);
86     handler->getNativeInfo()->setCompletedHandler(
87         ref new SocketOperationCompletedHandler(
88             [=](int operation)
89             {
90                 //
91                 // Use the reference counted handler to ensure it's not
92                 // destroyed as long as the callback lambda exists.
93                 //
94                 completed(h.get(), static_cast<SocketOperation>(operation));
95             }));
96 #endif
97 }
98 
99 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)100 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
101 {
102     handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
103     handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
104     if(add & SocketOperationRead && !(handler->_pending & SocketOperationRead))
105     {
106         handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationRead);
107         completed(handler, SocketOperationRead); // Start an asynchrnous read
108     }
109     else if(add & SocketOperationWrite && !(handler->_pending & SocketOperationWrite))
110     {
111         handler->_pending = static_cast<SocketOperation>(handler->_pending | SocketOperationWrite);
112         completed(handler, SocketOperationWrite); // Start an asynchrnous write
113     }
114 }
115 
116 void
finish(IceInternal::EventHandler * handler)117 Selector::finish(IceInternal::EventHandler* handler)
118 {
119 #ifdef ICE_OS_UWP
120     // If async operations are no longer pending, clear the completion handler to break
121     // the cyclic reference count.
122     assert(!handler->_started && !handler->_pending);
123     handler->getNativeInfo()->setCompletedHandler(nullptr);
124 #endif
125     handler->_registered = SocketOperationNone;
126     handler->_finish = false; // Ensures that finished() is only called once on the event handler.
127 }
128 
129 void
ready(EventHandler * handler,SocketOperation status,bool value)130 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
131 {
132     if(((handler->_ready & status) != 0) == value)
133     {
134         return; // Nothing to do if ready state already correctly set.
135     }
136 
137     if(value)
138     {
139         handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
140     }
141     else
142     {
143         handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
144     }
145 }
146 
147 EventHandler*
148 #ifdef ICE_USE_IOCP
getNextHandler(SocketOperation & status,DWORD & count,int & error,int timeout)149 Selector::getNextHandler(SocketOperation& status, DWORD& count, int& error, int timeout)
150 #else
151 Selector::getNextHandler(SocketOperation& status, int timeout)
152 #endif
153 {
154 #ifdef ICE_USE_IOCP
155     ULONG_PTR key;
156     LPOVERLAPPED ol;
157     error = ERROR_SUCCESS;
158 
159     if(!GetQueuedCompletionStatus(_handle, &count, &key, &ol, timeout > 0 ? timeout * 1000 : INFINITE))
160     {
161         int err = WSAGetLastError();
162         if(ol == 0)
163         {
164             if(err == WAIT_TIMEOUT)
165             {
166                 throw SelectorTimeoutException();
167             }
168             else
169             {
170                 Ice::SocketException ex(__FILE__, __LINE__, err);
171                 Ice::Error out(_instance->initializationData().logger);
172                 out << "couldn't dequeue packet from completion port:\n" << ex;
173                 IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5)); // Sleep 5s to avoid looping
174             }
175         }
176         AsyncInfo* info = static_cast<AsyncInfo*>(ol);
177         if(info)
178         {
179             status = info->status;
180         }
181         count = 0;
182         error = WSAGetLastError();
183         return reinterpret_cast<EventHandler*>(key);
184     }
185 
186     AsyncInfo* info = static_cast<AsyncInfo*>(ol);
187     if(info)
188     {
189         status = info->status;
190     }
191     else
192     {
193         status = reinterpret_cast<EventHandler*>(key)->_ready;
194     }
195     return reinterpret_cast<EventHandler*>(key);
196 #else
197     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
198     while(_events.empty())
199     {
200         if(timeout > 0)
201         {
202             _monitor.timedWait(IceUtil::Time::seconds(timeout));
203             if(_events.empty())
204             {
205                 throw SelectorTimeoutException();
206             }
207         }
208         else
209         {
210             _monitor.wait();
211         }
212     }
213     assert(!_events.empty());
214     IceInternal::EventHandlerPtr handler = _events.front().handler;
215     const SelectEvent& event = _events.front();
216     status = event.status;
217     _events.pop_front();
218     return handler.get();
219 #endif
220 }
221 
222 void
completed(EventHandler * handler,SocketOperation op)223 Selector::completed(EventHandler* handler, SocketOperation op)
224 {
225 #ifdef ICE_USE_IOCP
226     AsyncInfo* info = 0;
227     NativeInfoPtr nativeInfo = handler->getNativeInfo();
228     if(nativeInfo)
229     {
230         info = nativeInfo->getAsyncInfo(op);
231     }
232     if(!PostQueuedCompletionStatus(_handle, 0, reinterpret_cast<ULONG_PTR>(handler), info))
233     {
234         throw Ice::SocketException(__FILE__, __LINE__, GetLastError());
235     }
236 #else
237     IceUtil::Monitor<IceUtil::Mutex>::Lock lock(_monitor);
238     _events.push_back(SelectEvent(handler->shared_from_this(), op));
239     _monitor.notify();
240 #endif
241 }
242 
243 #elif defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL) || defined(ICE_USE_SELECT) || defined(ICE_USE_POLL)
244 
Selector(const InstancePtr & instance)245 Selector::Selector(const InstancePtr& instance) : _instance(instance), _interrupted(false)
246 {
247     SOCKET fds[2];
248     createPipe(fds);
249     _fdIntrRead = fds[0];
250     _fdIntrWrite = fds[1];
251     _selecting = false;
252 
253 #if defined(ICE_USE_EPOLL)
254     _events.resize(256);
255     _queueFd = epoll_create(1);
256     if(_queueFd < 0)
257     {
258         throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
259     }
260 
261     epoll_event event;
262     memset(&event, 0, sizeof(epoll_event));
263     event.data.ptr = 0;
264     event.events = EPOLLIN;
265     if(epoll_ctl(_queueFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
266     {
267         Ice::Error out(_instance->initializationData().logger);
268         out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
269     }
270 #elif defined(ICE_USE_KQUEUE)
271     _events.resize(256);
272     _queueFd = kqueue();
273     if(_queueFd < 0)
274     {
275         throw Ice::SocketException(__FILE__, __LINE__, getSocketErrno());
276     }
277 
278     struct kevent ev;
279     EV_SET(&ev, _fdIntrRead, EVFILT_READ, EV_ADD, 0, 0, 0);
280     int rs = kevent(_queueFd, &ev, 1, 0, 0, 0);
281     if(rs < 0)
282     {
283         Ice::Error out(_instance->initializationData().logger);
284         out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
285     }
286 #elif defined(ICE_USE_SELECT)
287     FD_ZERO(&_readFdSet);
288     FD_ZERO(&_writeFdSet);
289     FD_ZERO(&_errorFdSet);
290     FD_SET(_fdIntrRead, &_readFdSet);
291 #else
292     struct pollfd pollFd;
293     pollFd.fd = _fdIntrRead;
294     pollFd.events = POLLIN;
295     _pollFdSet.push_back(pollFd);
296 #endif
297 }
298 
~Selector()299 Selector::~Selector()
300 {
301 }
302 
303 void
destroy()304 Selector::destroy()
305 {
306 #if defined(ICE_USE_KQUEUE) || defined(ICE_USE_EPOLL)
307     try
308     {
309         closeSocket(_queueFd);
310     }
311     catch(const Ice::LocalException& ex)
312     {
313         Ice::Error out(_instance->initializationData().logger);
314         out << "exception in selector while calling closeSocket():\n" << ex;
315     }
316 #endif
317 
318     try
319     {
320         closeSocket(_fdIntrWrite);
321     }
322     catch(const Ice::LocalException& ex)
323     {
324         Ice::Error out(_instance->initializationData().logger);
325         out << "exception in selector while calling closeSocket():\n" << ex;
326     }
327 
328     try
329     {
330         closeSocket(_fdIntrRead);
331     }
332     catch(const Ice::LocalException& ex)
333     {
334         Ice::Error out(_instance->initializationData().logger);
335         out << "exception in selector while calling closeSocket():\n" << ex;
336     }
337 }
338 
339 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)340 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
341 {
342     SocketOperation previous = handler->_registered;
343     handler->_registered = static_cast<SocketOperation>(handler->_registered & ~remove);
344     handler->_registered = static_cast<SocketOperation>(handler->_registered | add);
345     if(previous == handler->_registered)
346     {
347         return;
348     }
349     checkReady(handler);
350 
351     NativeInfoPtr nativeInfo = handler->getNativeInfo();
352     if(nativeInfo && nativeInfo->fd() != INVALID_SOCKET)
353     {
354         updateSelectorForEventHandler(handler, remove, add);
355     }
356 }
357 
358 void
enable(EventHandler * handler,SocketOperation status)359 Selector::enable(EventHandler* handler, SocketOperation status)
360 {
361     if(!(handler->_disabled & status))
362     {
363         return;
364     }
365     handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~status);
366     checkReady(handler);
367 
368     NativeInfoPtr nativeInfo = handler->getNativeInfo();
369     if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
370     {
371         return;
372     }
373 
374     if(handler->_registered & status)
375     {
376 #if defined(ICE_USE_EPOLL)
377         SOCKET fd = nativeInfo->fd();
378         SocketOperation previous = static_cast<SocketOperation>(handler->_registered & ~(handler->_disabled | status));
379         SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
380         epoll_event event;
381         memset(&event, 0, sizeof(epoll_event));
382         event.data.ptr = handler;
383         if(newStatus & SocketOperationRead)
384         {
385             event.events |= EPOLLIN;
386         }
387         if(newStatus & SocketOperationWrite)
388         {
389             event.events |= EPOLLOUT;
390         }
391         if(epoll_ctl(_queueFd, previous ? EPOLL_CTL_MOD : EPOLL_CTL_ADD, fd, &event) != 0)
392         {
393             Ice::Error out(_instance->initializationData().logger);
394             out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
395         }
396 #elif defined(ICE_USE_KQUEUE)
397         struct kevent ev;
398         SOCKET fd = handler->getNativeInfo()->fd();
399         EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_ENABLE, 0, 0, handler);
400         _changes.push_back(ev);
401         if(_selecting)
402         {
403             updateSelector();
404         }
405 #else
406         _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
407         wakeup();
408 #endif
409     }
410 }
411 
412 void
disable(EventHandler * handler,SocketOperation status)413 Selector::disable(EventHandler* handler, SocketOperation status)
414 {
415     if(handler->_disabled & status)
416     {
417         return;
418     }
419     handler->_disabled = static_cast<SocketOperation>(handler->_disabled | status);
420     checkReady(handler);
421 
422     NativeInfoPtr nativeInfo = handler->getNativeInfo();
423     if(!nativeInfo || nativeInfo->fd() == INVALID_SOCKET)
424     {
425         return;
426     }
427 
428     if(handler->_registered & status)
429     {
430 #if defined(ICE_USE_EPOLL)
431         SOCKET fd = nativeInfo->fd();
432         SocketOperation newStatus = static_cast<SocketOperation>(handler->_registered & ~handler->_disabled);
433         epoll_event event;
434         memset(&event, 0, sizeof(epoll_event));
435         event.data.ptr = handler;
436         if(newStatus & SocketOperationRead)
437         {
438             event.events |= EPOLLIN;
439         }
440         if(newStatus & SocketOperationWrite)
441         {
442             event.events |= EPOLLOUT;
443         }
444         if(epoll_ctl(_queueFd, newStatus ? EPOLL_CTL_MOD : EPOLL_CTL_DEL, fd, &event) != 0)
445         {
446             Ice::Error out(_instance->initializationData().logger);
447             out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
448         }
449 #elif defined(ICE_USE_KQUEUE)
450         SOCKET fd = nativeInfo->fd();
451         struct kevent ev;
452         EV_SET(&ev, fd, status == SocketOperationRead ? EVFILT_READ : EVFILT_WRITE, EV_DISABLE, 0, 0, handler);
453         _changes.push_back(ev);
454         if(_selecting)
455         {
456             updateSelector();
457         }
458 #else
459         _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
460         wakeup();
461 #endif
462     }
463 }
464 
465 bool
finish(EventHandler * handler,bool closeNow)466 Selector::finish(EventHandler* handler, bool closeNow)
467 {
468     if(handler->_registered)
469     {
470         update(handler, handler->_registered, SocketOperationNone);
471 #if !defined(ICE_USE_EPOLL) && !defined(ICE_USE_KQUEUE)
472         return false; // Don't close now if selecting
473 #endif
474     }
475 
476 #if defined(ICE_USE_KQUEUE)
477     if(closeNow && !_changes.empty())
478     {
479         //
480         // Update selector now to remove the FD from the kqueue if
481         // we're going to close it now. This isn't necessary for
482         // epoll since we always update the epoll FD immediately.
483         //
484         updateSelector();
485     }
486 #elif !defined(ICE_USE_EPOLL)
487     if(!_changes.empty())
488     {
489         return false;
490     }
491 #endif
492 
493     return closeNow;
494 }
495 
496 void
ready(EventHandler * handler,SocketOperation status,bool value)497 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
498 {
499     if(((handler->_ready & status) != 0) == value)
500     {
501         return; // Nothing to do if ready state already correctly set.
502     }
503 
504     if(status & SocketOperationConnect)
505     {
506         NativeInfoPtr nativeInfo = handler->getNativeInfo();
507         if(nativeInfo && nativeInfo->newFd() && handler->_registered)
508         {
509             // If new FD is set after connect, register the FD with the selector.
510             updateSelectorForEventHandler(handler, SocketOperationNone, handler->_registered);
511         }
512     }
513 
514     if(value)
515     {
516         handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
517     }
518     else
519     {
520         handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
521     }
522     checkReady(handler);
523 }
524 
525 void
wakeup()526 Selector::wakeup()
527 {
528     if(_selecting && !_interrupted)
529     {
530         char c = 0;
531         while(true)
532         {
533             if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
534             {
535                 if(interrupted())
536                 {
537                     continue;
538                 }
539 
540                 throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
541             }
542             break;
543         }
544         _interrupted = true;
545     }
546 }
547 
548 void
startSelect()549 Selector::startSelect()
550 {
551     if(_interrupted)
552     {
553         char c;
554         while(true)
555         {
556             ssize_t ret = ::read(_fdIntrRead, &c, 1);
557             if(ret == SOCKET_ERROR)
558             {
559                 if(interrupted())
560                 {
561                     continue;
562                 }
563                 throw Ice::SocketException(__FILE__, __LINE__, IceInternal::getSocketErrno());
564             }
565             break;
566         }
567         _interrupted = false;
568     }
569 
570 #if !defined(ICE_USE_EPOLL)
571     if(!_changes.empty())
572     {
573         updateSelector();
574     }
575 #endif
576     _selecting = true;
577 
578     //
579     // If there are ready handlers, don't block in select, just do a non-blocking
580     // select to retrieve new ready handlers from the Java selector.
581     //
582     _selectNow = !_readyHandlers.empty();
583 }
584 
585 void
finishSelect(vector<pair<EventHandler *,SocketOperation>> & handlers)586 Selector::finishSelect(vector<pair<EventHandler*, SocketOperation> >& handlers)
587 {
588     _selecting = false;
589 
590     assert(handlers.empty());
591 
592 #if defined(ICE_USE_POLL) || defined(ICE_USE_SELECT)
593     if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers
594     {
595         return;
596     }
597 #endif
598 
599 #if defined(ICE_USE_POLL)
600     for(vector<struct pollfd>::const_iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
601 #else
602     for(int i = 0; i < _count; ++i)
603 #endif
604     {
605         pair<EventHandler*, SocketOperation> p;
606 
607 #if defined(ICE_USE_EPOLL)
608         struct epoll_event& ev = _events[i];
609         p.first = reinterpret_cast<EventHandler*>(ev.data.ptr);
610         p.second = static_cast<SocketOperation>(((ev.events & (EPOLLIN | EPOLLERR)) ?
611                                                  SocketOperationRead : SocketOperationNone) |
612                                                 ((ev.events & (EPOLLOUT | EPOLLERR)) ?
613                                                  SocketOperationWrite : SocketOperationNone));
614 #elif defined(ICE_USE_KQUEUE)
615         struct kevent& ev = _events[i];
616         if(ev.flags & EV_ERROR)
617         {
618             Ice::Error out(_instance->initializationData().logger);
619             out << "selector returned error:\n" << IceUtilInternal::errorToString(ev.data);
620             continue;
621         }
622         p.first = reinterpret_cast<EventHandler*>(ev.udata);
623         p.second = (ev.filter == EVFILT_READ) ? SocketOperationRead : SocketOperationWrite;
624 #elif defined(ICE_USE_SELECT)
625         //
626         // Round robin for the filedescriptors.
627         //
628         SOCKET fd;
629         p.second = SocketOperationNone;
630         if(i < _selectedReadFdSet.fd_count)
631         {
632             fd = _selectedReadFdSet.fd_array[i];
633             p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
634         }
635         else if(i < _selectedWriteFdSet.fd_count + _selectedReadFdSet.fd_count)
636         {
637             fd = _selectedWriteFdSet.fd_array[i - _selectedReadFdSet.fd_count];
638             p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
639         }
640         else
641         {
642             fd = _selectedErrorFdSet.fd_array[i - _selectedReadFdSet.fd_count - _selectedWriteFdSet.fd_count];
643             p.second = static_cast<SocketOperation>(p.second | SocketOperationConnect);
644         }
645 
646         assert(fd != _fdIntrRead);
647         p.first = _handlers[fd];
648 #else
649         if(r->revents == 0)
650         {
651             continue;
652         }
653 
654         SOCKET fd = r->fd;
655         assert(_handlers.find(fd) != _handlers.end());
656         p.first = _handlers[fd];
657         p.second = SocketOperationNone;
658         if(r->revents & (POLLIN | POLLERR | POLLHUP))
659         {
660             p.second = static_cast<SocketOperation>(p.second | SocketOperationRead);
661         }
662         if(r->revents & (POLLOUT | POLLERR | POLLHUP))
663         {
664             p.second = static_cast<SocketOperation>(p.second | SocketOperationWrite);
665         }
666         assert(p.second);
667 #endif
668         if(!p.first)
669         {
670             continue; // Interrupted
671         }
672 
673         map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.find(ICE_GET_SHARED_FROM_THIS(p.first));
674 
675         if(q != _readyHandlers.end()) // Handler will be added by the loop below
676         {
677             q->second = p.second; // We just remember which operations are ready here.
678         }
679         else
680         {
681             handlers.push_back(p);
682         }
683     }
684 
685     for(map<EventHandlerPtr, SocketOperation>::iterator q = _readyHandlers.begin(); q != _readyHandlers.end(); ++q)
686     {
687         pair<EventHandler*, SocketOperation> p;
688         p.first = q->first.get();
689         p.second = static_cast<SocketOperation>(p.first->_ready & ~p.first->_disabled & p.first->_registered);
690         p.second = static_cast<SocketOperation>(p.second | q->second);
691         if(p.second)
692         {
693             handlers.push_back(p);
694         }
695 
696         //
697         // Reset the operation, it's only used by this method to temporarly store the socket status
698         // return by the select operation above.
699         //
700         q->second = SocketOperationNone;
701     }
702 }
703 
704 void
select(int timeout)705 Selector::select(int timeout)
706 {
707     if(_selectNow)
708     {
709         timeout = 0;
710     }
711     else if(timeout > 0)
712     {
713         timeout = timeout * 1000;
714     }
715     else
716     {
717         timeout = -1;
718     }
719 
720     int spuriousWakeup = 0;
721     while(true)
722     {
723 #if defined(ICE_USE_EPOLL)
724         _count = epoll_wait(_queueFd, &_events[0], _events.size(), timeout);
725 #elif defined(ICE_USE_KQUEUE)
726         assert(!_events.empty());
727         if(timeout >= 0)
728         {
729             struct timespec ts;
730             ts.tv_sec = timeout;
731             ts.tv_nsec = 0;
732             _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), &ts);
733         }
734         else
735         {
736             _count = kevent(_queueFd, 0, 0, &_events[0], _events.size(), 0);
737         }
738 #elif defined(ICE_USE_SELECT)
739         fd_set* rFdSet = fdSetCopy(_selectedReadFdSet, _readFdSet);
740         fd_set* wFdSet = fdSetCopy(_selectedWriteFdSet, _writeFdSet);
741         fd_set* eFdSet = fdSetCopy(_selectedErrorFdSet, _errorFdSet);
742         if(timeout >= 0)
743         {
744             struct timeval tv;
745             tv.tv_sec = timeout;
746             tv.tv_usec = 0;
747             _count = ::select(0, rFdSet, wFdSet, eFdSet, &tv); // The first parameter is ignored on Windows
748         }
749         else
750         {
751             _count = ::select(0, rFdSet, wFdSet, eFdSet, 0); // The first parameter is ignored on Windows
752         }
753 #else
754         _count = poll(&_pollFdSet[0], _pollFdSet.size(), timeout);
755 #endif
756 
757         if(_count == SOCKET_ERROR)
758         {
759             if(interrupted())
760             {
761                 continue;
762             }
763 
764             Ice::SocketException ex(__FILE__, __LINE__, IceInternal::getSocketErrno());
765             Ice::Error out(_instance->initializationData().logger);
766             out << "selector failed:\n" << ex;
767             IceUtil::ThreadControl::sleep(IceUtil::Time::seconds(5)); // Sleep 5s to avoid looping
768         }
769         else if(_count == 0 && timeout < 0)
770         {
771             if(++spuriousWakeup > 100)
772             {
773                 spuriousWakeup = 0;
774                 _instance->initializationData().logger->warning("spurious selector wakeup");
775             }
776             IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(1));
777             continue;
778         }
779         break;
780     }
781 
782     if(_count == 0 && !_selectNow)
783     {
784         throw SelectorTimeoutException();
785     }
786 }
787 
788 void
checkReady(EventHandler * handler)789 Selector::checkReady(EventHandler* handler)
790 {
791     if(handler->_ready & ~handler->_disabled & handler->_registered)
792     {
793         _readyHandlers.insert(make_pair(ICE_GET_SHARED_FROM_THIS(handler), SocketOperationNone));
794         wakeup();
795     }
796     else
797     {
798         map<EventHandlerPtr, SocketOperation>::iterator p = _readyHandlers.find(ICE_GET_SHARED_FROM_THIS(handler));
799         if(p != _readyHandlers.end())
800         {
801             _readyHandlers.erase(p);
802         }
803     }
804 }
805 
806 void
updateSelector()807 Selector::updateSelector()
808 {
809 #if defined(ICE_USE_KQUEUE)
810     int rs = kevent(_queueFd, &_changes[0], _changes.size(), &_changes[0], _changes.size(), &zeroTimeout);
811     if(rs < 0)
812     {
813         Ice::Error out(_instance->initializationData().logger);
814         out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
815     }
816     else
817     {
818         for(int i = 0; i < rs; ++i)
819         {
820             //
821             // Check for errors, we ignore EINPROGRESS that started showing up with macOS Sierra
822             // and which occurs when another thread removes the FD from the kqueue (see ICE-7419).
823             //
824             if(_changes[i].flags & EV_ERROR && _changes[i].data != EINPROGRESS)
825             {
826                 Ice::Error out(_instance->initializationData().logger);
827                 out << "error while updating selector:\n" << IceUtilInternal::errorToString(_changes[i].data);
828             }
829         }
830     }
831     _changes.clear();
832 #elif !defined(ICE_USE_EPOLL)
833     assert(!_selecting);
834 
835     for(vector<pair<EventHandler*, SocketOperation> >::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
836     {
837         EventHandler* handler = p->first;
838         SocketOperation status = p->second;
839 
840         SOCKET fd = handler->getNativeInfo()->fd();
841         if(status)
842         {
843 #if defined(ICE_USE_SELECT)
844             if(status & SocketOperationRead)
845             {
846                 FD_SET(fd, &_readFdSet);
847             }
848             else
849             {
850                 FD_CLR(fd, &_readFdSet);
851             }
852             if(status & SocketOperationWrite)
853             {
854                 FD_SET(fd, &_writeFdSet);
855             }
856             else
857             {
858                 FD_CLR(fd, &_writeFdSet);
859             }
860             if(status & SocketOperationConnect)
861             {
862                 FD_SET(fd, &_writeFdSet);
863                 FD_SET(fd, &_errorFdSet);
864             }
865             else
866             {
867                 FD_CLR(fd, &_writeFdSet);
868                 FD_CLR(fd, &_errorFdSet);
869             }
870             _handlers[fd] = handler;
871 #else
872             short events = 0;
873             if(status & SocketOperationRead)
874             {
875                 events |= POLLIN;
876             }
877             if(status & SocketOperationWrite)
878             {
879                 events |= POLLOUT;
880             }
881             map<SOCKET, EventHandler*>::const_iterator q = _handlers.find(fd);
882             if(q == _handlers.end())
883             {
884                 struct pollfd pollFd;
885                 pollFd.fd = fd;
886                 pollFd.events = events;
887                 pollFd.revents = 0;
888                 _pollFdSet.push_back(pollFd);
889                 _handlers.insert(make_pair(fd, handler));
890             }
891             else
892             {
893                 for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
894                 {
895                     if(r->fd == fd)
896                     {
897                         r->events = events;
898                         break;
899                     }
900                 }
901             }
902 #endif
903         }
904         else
905         {
906 #if defined(ICE_USE_SELECT)
907             FD_CLR(fd, &_readFdSet);
908             FD_CLR(fd, &_writeFdSet);
909             FD_CLR(fd, &_errorFdSet);
910 #else
911             for(vector<struct pollfd>::iterator r = _pollFdSet.begin(); r != _pollFdSet.end(); ++r)
912             {
913                 if(r->fd == fd)
914                 {
915                     _pollFdSet.erase(r);
916                     break;
917                 }
918             }
919 #endif
920             _handlers.erase(fd);
921         }
922     }
923     _changes.clear();
924 #endif
925 }
926 
927 void
updateSelectorForEventHandler(EventHandler * handler,SocketOperation remove,SocketOperation add)928 Selector::updateSelectorForEventHandler(EventHandler* handler, SocketOperation remove, SocketOperation add)
929 {
930 #if defined(ICE_USE_EPOLL)
931     SocketOperation previous = handler->_registered;
932     previous = static_cast<SocketOperation>(previous & ~add);
933     previous = static_cast<SocketOperation>(previous | remove);
934     SOCKET fd = handler->getNativeInfo()->fd();
935     assert(fd != INVALID_SOCKET);
936     epoll_event event;
937     memset(&event, 0, sizeof(epoll_event));
938     event.data.ptr = handler;
939     SocketOperation status = handler->_registered;
940     if(handler->_disabled)
941     {
942         status = static_cast<SocketOperation>(status & ~handler->_disabled);
943         previous = static_cast<SocketOperation>(previous & ~handler->_disabled);
944     }
945     if(status & SocketOperationRead)
946     {
947         event.events |= EPOLLIN;
948     }
949     if(status & SocketOperationWrite)
950     {
951         event.events |= EPOLLOUT;
952     }
953     int op;
954     if(!previous && status)
955     {
956         op = EPOLL_CTL_ADD;
957     }
958     else if(previous && !status)
959     {
960         op = EPOLL_CTL_DEL;
961     }
962     else if(previous == status)
963     {
964         return;
965     }
966     else
967     {
968         op = EPOLL_CTL_MOD;
969     }
970     if(epoll_ctl(_queueFd, op, fd, &event) != 0)
971     {
972         Ice::Error out(_instance->initializationData().logger);
973         out << "error while updating selector:\n" << IceUtilInternal::errorToString(IceInternal::getSocketErrno());
974     }
975 #elif defined(ICE_USE_KQUEUE)
976     SOCKET fd = handler->getNativeInfo()->fd();
977     assert(fd != INVALID_SOCKET);
978     if(remove & SocketOperationRead)
979     {
980         struct kevent ev;
981         EV_SET(&ev, fd, EVFILT_READ, EV_DELETE, 0, 0, handler);
982         _changes.push_back(ev);
983     }
984     if(remove & SocketOperationWrite)
985     {
986         struct kevent ev;
987         EV_SET(&ev, fd, EVFILT_WRITE, EV_DELETE, 0, 0, handler);
988         _changes.push_back(ev);
989     }
990     if(add & SocketOperationRead)
991     {
992         struct kevent ev;
993         EV_SET(&ev, fd, EVFILT_READ, EV_ADD | (handler->_disabled & SocketOperationRead ? EV_DISABLE : 0), 0, 0,
994                handler);
995         _changes.push_back(ev);
996     }
997     if(add & SocketOperationWrite)
998     {
999         struct kevent ev;
1000         EV_SET(&ev, fd, EVFILT_WRITE, EV_ADD | (handler->_disabled & SocketOperationWrite ? EV_DISABLE : 0), 0, 0,
1001                handler);
1002         _changes.push_back(ev);
1003     }
1004     if(_selecting)
1005     {
1006         updateSelector();
1007     }
1008 #else
1009     _changes.push_back(make_pair(handler, static_cast<SocketOperation>(handler->_registered & ~handler->_disabled)));
1010     wakeup();
1011 #endif
1012     checkReady(handler);
1013 }
1014 
1015 #elif defined(ICE_USE_CFSTREAM)
1016 
1017 namespace
1018 {
1019 
selectorInterrupt(void * info)1020 void selectorInterrupt(void* info)
1021 {
1022     reinterpret_cast<Selector*>(info)->processInterrupt();
1023 }
1024 
eventHandlerSocketCallback(CFSocketRef,CFSocketCallBackType callbackType,CFDataRef,const void * d,void * info)1025 void eventHandlerSocketCallback(CFSocketRef, CFSocketCallBackType callbackType, CFDataRef, const void* d, void* info)
1026 {
1027     if(callbackType == kCFSocketReadCallBack)
1028     {
1029         reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationRead);
1030     }
1031     else if(callbackType == kCFSocketWriteCallBack)
1032     {
1033         reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationWrite);
1034     }
1035     else if(callbackType == kCFSocketConnectCallBack)
1036     {
1037         reinterpret_cast<EventHandlerWrapper*>(info)->readyCallback(SocketOperationConnect,
1038                                                                     d ? *reinterpret_cast<const SInt32*>(d) : 0);
1039     }
1040 }
1041 
1042 class SelectorHelperThread : public IceUtil::Thread
1043 {
1044 public:
1045 
SelectorHelperThread(Selector & selector)1046     SelectorHelperThread(Selector& selector) : _selector(selector)
1047     {
1048     }
1049 
run()1050     virtual void run()
1051     {
1052         _selector.run();
1053 
1054 #if TARGET_IPHONE_SIMULATOR != 0
1055         //
1056         // Workaround for CFSocket bug where the CFSocketManager thread crashes if an
1057         // invalidated socket is being processed for reads/writes. We add this sleep
1058         // mostly to prevent spurious crashes with testing. This bug is very unlikely
1059         // to be hit otherwise.
1060         //
1061         IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
1062 #endif
1063     }
1064 
1065 private:
1066 
1067     Selector& _selector;
1068 };
1069 
1070 CFOptionFlags
toCFCallbacks(SocketOperation op)1071 toCFCallbacks(SocketOperation op)
1072 {
1073     CFOptionFlags cbs = 0;
1074     if(op & SocketOperationRead)
1075     {
1076         cbs |= kCFSocketReadCallBack;
1077     }
1078     if(op & SocketOperationWrite)
1079     {
1080         cbs |= kCFSocketWriteCallBack;
1081     }
1082     if(op & SocketOperationConnect)
1083     {
1084         cbs |= kCFSocketConnectCallBack;
1085     }
1086     return cbs;
1087 }
1088 
1089 }
1090 
EventHandlerWrapper(EventHandler * handler,Selector & selector)1091 EventHandlerWrapper::EventHandlerWrapper(EventHandler* handler, Selector& selector) :
1092     _handler(ICE_GET_SHARED_FROM_THIS(handler)),
1093     _streamNativeInfo(StreamNativeInfoPtr::dynamicCast(handler->getNativeInfo())),
1094     _selector(selector),
1095     _ready(SocketOperationNone),
1096     _finish(false)
1097 {
1098     if(_streamNativeInfo)
1099     {
1100         _streamNativeInfo->initStreams(this);
1101     }
1102     else if(handler->getNativeInfo())
1103     {
1104         SOCKET fd = handler->getNativeInfo()->fd();
1105         CFSocketContext ctx = { 0, this, 0, 0, 0 };
1106         _socket.reset(CFSocketCreateWithNative(kCFAllocatorDefault,
1107                                                fd,
1108                                                kCFSocketReadCallBack |
1109                                                kCFSocketWriteCallBack |
1110                                                kCFSocketConnectCallBack,
1111                                                eventHandlerSocketCallback,
1112                                                &ctx));
1113 
1114         // Disable automatic re-enabling of callbacks and closing of the native socket.
1115         CFSocketSetSocketFlags(_socket.get(), 0);
1116         CFSocketDisableCallBacks(_socket.get(), kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
1117         _source.reset(CFSocketCreateRunLoopSource(kCFAllocatorDefault, _socket.get(), 0));
1118     }
1119 }
1120 
~EventHandlerWrapper()1121 EventHandlerWrapper::~EventHandlerWrapper()
1122 {
1123 }
1124 
1125 void
updateRunLoop()1126 EventHandlerWrapper::updateRunLoop()
1127 {
1128     SocketOperation op = _handler->_registered;
1129     assert(!op || !_finish);
1130 
1131     if(_socket)
1132     {
1133         CFSocketDisableCallBacks(_socket.get(), kCFSocketReadCallBack | kCFSocketWriteCallBack | kCFSocketConnectCallBack);
1134         if(op)
1135         {
1136             CFSocketEnableCallBacks(_socket.get(), toCFCallbacks(op));
1137         }
1138 
1139         if(op && !CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode))
1140         {
1141             CFRunLoopAddSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1142         }
1143         else if(!op && CFRunLoopContainsSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode))
1144         {
1145             CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1146         }
1147 
1148         if(_finish)
1149         {
1150             CFSocketInvalidate(_socket.get());
1151         }
1152     }
1153     else
1154     {
1155         SocketOperation readyOp = _streamNativeInfo->registerWithRunLoop(op);
1156         if(!(op & (SocketOperationWrite | SocketOperationConnect)) || _ready & SocketOperationWrite)
1157         {
1158             _streamNativeInfo->unregisterFromRunLoop(SocketOperationWrite, false);
1159         }
1160 
1161         if(!(op & (SocketOperationRead | SocketOperationConnect)) || _ready & SocketOperationRead)
1162         {
1163             _streamNativeInfo->unregisterFromRunLoop(SocketOperationRead, false);
1164         }
1165 
1166         if(readyOp)
1167         {
1168             ready(readyOp, 0);
1169         }
1170 
1171         if(_finish)
1172         {
1173             _streamNativeInfo->closeStreams();
1174         }
1175     }
1176 }
1177 
1178 void
readyCallback(SocketOperation op,int error)1179 EventHandlerWrapper::readyCallback(SocketOperation op, int error)
1180 {
1181     _selector.ready(this, op, error);
1182 }
1183 
1184 void
ready(SocketOperation op,int error)1185 EventHandlerWrapper::ready(SocketOperation op, int error)
1186 {
1187     if(!_socket)
1188     {
1189         //
1190         // Unregister the stream from the runloop as soon as we got the callback. This is
1191         // required to allow thread pool thread to perform read/write operations on the
1192         // stream (which can't be used from another thread than the run loop thread if
1193         // it's registered with a run loop).
1194         //
1195         op = _streamNativeInfo->unregisterFromRunLoop(op, error != 0);
1196     }
1197 
1198     op = static_cast<SocketOperation>(_handler->_registered & op);
1199     if(!op || _ready & op)
1200     {
1201         return;
1202     }
1203 
1204     if(_socket)
1205     {
1206         if(op & SocketOperationConnect)
1207         {
1208             _streamNativeInfo->setConnectError(error);
1209         }
1210     }
1211 
1212     _ready = static_cast<SocketOperation>(_ready | op);
1213     checkReady();
1214 }
1215 
1216 bool
checkReady()1217 EventHandlerWrapper::checkReady()
1218 {
1219     if((_ready | _handler->_ready) & ~_handler->_disabled & _handler->_registered)
1220     {
1221         _selector.addReadyHandler(this);
1222         return false;
1223     }
1224     else
1225     {
1226         return _handler->getNativeInfo() && !_finish;
1227     }
1228 }
1229 
1230 SocketOperation
readyOp()1231 EventHandlerWrapper::readyOp()
1232 {
1233     assert(!(~_handler->_registered & _ready));
1234     SocketOperation op = static_cast<SocketOperation>(~_handler->_disabled & (_ready | _handler->_ready));
1235     _ready = static_cast<SocketOperation>(~op & _ready);
1236     return op;
1237 }
1238 
1239 bool
update(SocketOperation remove,SocketOperation add)1240 EventHandlerWrapper::update(SocketOperation remove, SocketOperation add)
1241 {
1242     SocketOperation previous = _handler->_registered;
1243     _handler->_registered = static_cast<SocketOperation>(_handler->_registered & ~remove);
1244     _handler->_registered = static_cast<SocketOperation>(_handler->_registered | add);
1245     if(previous == _handler->_registered)
1246     {
1247         return false;
1248     }
1249 
1250     // Clear ready flags which might not be valid anymore.
1251     _ready = static_cast<SocketOperation>(_ready & _handler->_registered);
1252     return _handler->getNativeInfo();
1253 }
1254 
1255 bool
finish()1256 EventHandlerWrapper::finish()
1257 {
1258     _finish = true;
1259     _ready = SocketOperationNone;
1260     _handler->_registered = SocketOperationNone;
1261     return _handler->getNativeInfo();
1262 }
1263 
Selector(const InstancePtr & instance)1264 Selector::Selector(const InstancePtr& instance) : _instance(instance), _destroyed(false)
1265 {
1266     CFRunLoopSourceContext ctx;
1267     memset(&ctx, 0, sizeof(CFRunLoopSourceContext));
1268     ctx.info = this;
1269     ctx.perform = selectorInterrupt;
1270     _source.reset(CFRunLoopSourceCreate(0, 0, &ctx));
1271     _runLoop = 0;
1272 
1273     _thread = new SelectorHelperThread(*this);
1274     _thread->start();
1275 
1276     Lock sync(*this);
1277     while(!_runLoop)
1278     {
1279         wait();
1280     }
1281 }
1282 
~Selector()1283 Selector::~Selector()
1284 {
1285 }
1286 
1287 void
destroy()1288 Selector::destroy()
1289 {
1290     {
1291         Lock sync(*this);
1292 
1293         //
1294         // Make sure any pending changes are processed to ensure remaining
1295         // streams/sockets are closed.
1296         //
1297         _destroyed = true;
1298         CFRunLoopSourceSignal(_source.get());
1299         CFRunLoopWakeUp(_runLoop);
1300 
1301         while(!_changes.empty())
1302         {
1303             CFRunLoopSourceSignal(_source.get());
1304             CFRunLoopWakeUp(_runLoop);
1305 
1306             wait();
1307         }
1308     }
1309 
1310     _thread->getThreadControl().join();
1311     _thread = 0;
1312 
1313     Lock sync(*this);
1314     _source.reset(0);
1315 
1316     //assert(_wrappers.empty());
1317     _readyHandlers.clear();
1318     _selectedHandlers.clear();
1319 }
1320 
1321 void
initialize(EventHandler * handler)1322 Selector::initialize(EventHandler* handler)
1323 {
1324     Lock sync(*this);
1325     _wrappers[handler] = new EventHandlerWrapper(handler, *this);
1326 }
1327 
1328 void
update(EventHandler * handler,SocketOperation remove,SocketOperation add)1329 Selector::update(EventHandler* handler, SocketOperation remove, SocketOperation add)
1330 {
1331     Lock sync(*this);
1332     const EventHandlerWrapperPtr& wrapper = _wrappers[handler];
1333     if(wrapper->update(remove, add))
1334     {
1335         _changes.insert(wrapper);
1336         notify();
1337     }
1338 }
1339 
1340 void
enable(EventHandler * handler,SocketOperation op)1341 Selector::enable(EventHandler* handler, SocketOperation op)
1342 {
1343     Lock sync(*this);
1344     if(!(handler->_disabled & op))
1345     {
1346         return;
1347     }
1348     handler->_disabled = static_cast<SocketOperation>(handler->_disabled & ~op);
1349 
1350     if(handler->_registered & op)
1351     {
1352         _wrappers[handler]->checkReady();
1353     }
1354 }
1355 
1356 void
disable(EventHandler * handler,SocketOperation op)1357 Selector::disable(EventHandler* handler, SocketOperation op)
1358 {
1359     Lock sync(*this);
1360     if(handler->_disabled & op)
1361     {
1362         return;
1363     }
1364     handler->_disabled = static_cast<SocketOperation>(handler->_disabled | op);
1365 }
1366 
1367 bool
finish(EventHandler * handler,bool closeNow)1368 Selector::finish(EventHandler* handler, bool closeNow)
1369 {
1370     Lock sync(*this);
1371     std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
1372     assert(p != _wrappers.end());
1373     EventHandlerWrapperPtr wrapper = p->second;
1374     if(wrapper->finish())
1375     {
1376         _changes.insert(wrapper);
1377         notify();
1378     }
1379     _wrappers.erase(p);
1380     return closeNow;
1381 }
1382 
1383 void
ready(EventHandler * handler,SocketOperation status,bool value)1384 Selector::ready(EventHandler* handler, SocketOperation status, bool value)
1385 {
1386     if(((handler->_ready & status) != 0) == value)
1387     {
1388         return; // Nothing to do if ready state already correctly set.
1389     }
1390 
1391     if(value)
1392     {
1393         handler->_ready = static_cast<SocketOperation>(handler->_ready | status);
1394     }
1395     else
1396     {
1397         handler->_ready = static_cast<SocketOperation>(handler->_ready & ~status);
1398     }
1399 
1400     Lock sync(*this);
1401     std::map<EventHandler*, EventHandlerWrapperPtr>::iterator p = _wrappers.find(handler);
1402     assert(p != _wrappers.end());
1403     p->second->checkReady();
1404 }
1405 
1406 void
startSelect()1407 Selector::startSelect()
1408 {
1409     Lock sync(*this);
1410 
1411     //
1412     // Re-enable callbacks for previously selected handlers.
1413     //
1414     vector<pair<EventHandlerWrapperPtr, SocketOperation> >::const_iterator p;
1415     for(p = _selectedHandlers.begin(); p != _selectedHandlers.end(); ++p)
1416     {
1417         if(p->first->checkReady())
1418         {
1419             _changes.insert(p->first);
1420         }
1421     }
1422     _selectedHandlers.clear();
1423 }
1424 
1425 void
finishSelect(std::vector<std::pair<EventHandler *,SocketOperation>> & handlers)1426 Selector::finishSelect(std::vector<std::pair<EventHandler*, SocketOperation> >& handlers)
1427 {
1428     Lock sync(*this);
1429     handlers.clear();
1430     for(set<EventHandlerWrapperPtr>::const_iterator p = _readyHandlers.begin(); p != _readyHandlers.end(); ++p)
1431     {
1432         SocketOperation op = (*p)->readyOp();
1433         if(op)
1434         {
1435             _selectedHandlers.push_back(pair<EventHandlerWrapperPtr, SocketOperation>(*p, op));
1436             handlers.push_back(pair<EventHandler*, SocketOperation>((*p)->_handler.get(), op));
1437         }
1438     }
1439     _readyHandlers.clear();
1440 }
1441 
1442 void
select(int timeout)1443 Selector::select(int timeout)
1444 {
1445     //
1446     // Wait for handlers to be ready.
1447     //
1448     Lock sync(*this);
1449     while(!_destroyed)
1450     {
1451         while(!_changes.empty())
1452         {
1453             CFRunLoopSourceSignal(_source.get());
1454             CFRunLoopWakeUp(_runLoop);
1455 
1456             wait();
1457         }
1458 
1459         if(_readyHandlers.empty())
1460         {
1461             if(timeout > 0)
1462             {
1463                 if(!timedWait(IceUtil::Time::seconds(timeout)))
1464                 {
1465                     break;
1466                 }
1467             }
1468             else
1469             {
1470                 wait();
1471             }
1472         }
1473 
1474         if(_changes.empty())
1475         {
1476             break;
1477         }
1478     }
1479 }
1480 
1481 void
processInterrupt()1482 Selector::processInterrupt()
1483 {
1484     Lock sync(*this);
1485     if(!_changes.empty())
1486     {
1487         for(set<EventHandlerWrapperPtr>::const_iterator p = _changes.begin(); p != _changes.end(); ++p)
1488         {
1489             (*p)->updateRunLoop();
1490         }
1491         _changes.clear();
1492         notify();
1493     }
1494     if(_destroyed)
1495     {
1496         CFRunLoopStop(_runLoop);
1497     }
1498 }
1499 
1500 void
run()1501 Selector::run()
1502 {
1503     {
1504         Lock sync(*this);
1505         _runLoop = CFRunLoopGetCurrent();
1506         notify();
1507     }
1508 
1509     CFRunLoopAddSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1510     CFRunLoopRun();
1511     CFRunLoopRemoveSource(CFRunLoopGetCurrent(), _source.get(), kCFRunLoopDefaultMode);
1512 }
1513 
1514 void
ready(EventHandlerWrapper * wrapper,SocketOperation op,int error)1515 Selector::ready(EventHandlerWrapper* wrapper, SocketOperation op, int error)
1516 {
1517     Lock sync(*this);
1518     wrapper->ready(op, error);
1519 }
1520 
1521 void
addReadyHandler(EventHandlerWrapper * wrapper)1522 Selector::addReadyHandler(EventHandlerWrapper* wrapper)
1523 {
1524     // Called from ready()
1525     _readyHandlers.insert(wrapper);
1526     if(_readyHandlers.size() == 1)
1527     {
1528         notify();
1529     }
1530 }
1531 
1532 #endif
1533