1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <Ice/ConnectionFactory.h>
6 #include <Ice/ConnectionI.h>
7 #include <Ice/Instance.h>
8 #include <Ice/LoggerUtil.h>
9 #include <Ice/TraceLevels.h>
10 #include <Ice/DefaultsAndOverrides.h>
11 #include <Ice/Properties.h>
12 #include <Ice/Transceiver.h>
13 #include <Ice/Connector.h>
14 #include <Ice/Acceptor.h>
15 #include <Ice/ThreadPool.h>
16 #include <Ice/ObjectAdapterI.h> // For getThreadPool().
17 #include <Ice/Reference.h>
18 #include <Ice/EndpointI.h>
19 #include <Ice/RouterInfo.h>
20 #include <Ice/LocalException.h>
21 #include <Ice/Functional.h>
22 #include <Ice/OutgoingAsync.h>
23 #include <Ice/CommunicatorI.h>
24 #include <IceUtil/Random.h>
25 #include <iterator>
26 
#if TARGET_OS_IPHONE != 0
namespace IceInternal
{

//
// Background notification hooks for incoming connection factories. They are
// only declared when TARGET_OS_IPHONE is non-zero; their definitions are not
// visible in this part of the file.
//
bool registerForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&);
void unregisterForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&);

}
#endif
36 
37 using namespace std;
38 using namespace Ice;
39 using namespace Ice::Instrumentation;
40 using namespace IceInternal;
41 
42 IceUtil::Shared* IceInternal::upCast(OutgoingConnectionFactory* p) { return p; }
43 
44 #ifndef ICE_CPP11_MAPPING
45 IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; }
46 #endif
47 
48 namespace
49 {
50 
51 #ifdef ICE_CPP11_MAPPING
52 template <typename Map> void
53 remove(Map& m, const typename Map::key_type& k, const typename Map::mapped_type& v)
54 {
55     auto pr = m.equal_range(k);
56     assert(pr.first != pr.second);
57     for(auto q = pr.first; q != pr.second; ++q)
58     {
59         if(q->second.get() == v.get())
60         {
61             m.erase(q);
62             return;
63         }
64     }
65     assert(false); // Nothing was removed which is an error.
66 }
67 
//
// Returns the first value stored under key `k` in the multimap `m` for which
// `predicate` returns true, or a null mapped value when no entry matches.
//
template<typename Map, typename Predicate> typename Map::mapped_type
find(const Map& m, const typename Map::key_type& k, Predicate predicate)
{
    const auto range = m.equal_range(k);
    auto pos = range.first;
    while(pos != range.second)
    {
        if(predicate(pos->second))
        {
            return pos->second;
        }
        ++pos;
    }
    return nullptr;
}
81 
82 #else
83 template <typename K, typename V> void
84 remove(multimap<K, V>& m, K k, V v)
85 {
86     pair<typename multimap<K, V>::iterator, typename multimap<K, V>::iterator> pr = m.equal_range(k);
87     assert(pr.first != pr.second);
88     for(typename multimap<K, V>::iterator q = pr.first; q != pr.second; ++q)
89     {
90         if(q->second.get() == v.get())
91         {
92             m.erase(q);
93             return;
94         }
95     }
96     assert(false); // Nothing was removed which is an error.
97 }
98 
99 template <typename K, typename V> ::IceInternal::Handle<V>
100 find(const multimap<K,::IceInternal::Handle<V> >& m,
101      K k,
102      const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate)
103 {
104     pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator,
105          typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = m.equal_range(k);
106     for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q)
107     {
108         if(predicate(q->second))
109         {
110             return q->second;
111         }
112     }
113     return IceInternal::Handle<V>();
114 }
115 #endif
116 
//
// Timer task that asks an incoming connection factory to start its acceptor.
// If startAcceptor() throws an Ice::Exception, the failure is logged as an
// error and the task schedules itself again on the instance timer with a
// one second delay.
//
class StartAcceptor : public IceUtil::TimerTask
#ifdef ICE_CPP11_MAPPING
                    , public std::enable_shared_from_this<StartAcceptor>
#endif
{
public:

    // Stores the factory whose acceptor is started and the instance that
    // provides the logger and the timer.
    StartAcceptor(const IncomingConnectionFactoryPtr& factory, const InstancePtr& instance) :
        _factory(factory), _instance(instance)
    {
    }

    void
    runTimerTask()
    {
        try
        {
            _factory->startAcceptor();
        }
        catch(const Ice::Exception& ex)
        {
            // Log the failure together with the factory description, then
            // re-schedule this same task to retry in one second.
            Error out(_instance->initializationData().logger);
            out << "acceptor creation failed:\n" << ex << '\n' << _factory->toString();
            _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::seconds(1));
        }
    }

private:

    IncomingConnectionFactoryPtr _factory;
    InstancePtr _instance;
};
149 
#if TARGET_OS_IPHONE != 0
//
// Dispatch work item that calls finish() on an incoming connection factory
// when it is run. Only compiled when TARGET_OS_IPHONE is non-zero.
//
class FinishCall : public DispatchWorkItem
{
public:

    FinishCall(const IncomingConnectionFactoryPtr& factory) : _factory(factory)
    {
    }

    virtual void
    run()
    {
        _factory->finish();
    }

private:

    // Factory to finish; fixed for the lifetime of the work item.
    const IncomingConnectionFactoryPtr _factory;
};
#endif
170 
171 }
172 
//
// Two ConnectorInfo objects compare equal when their connectors compare
// equal; the endpoint member does not take part in the comparison.
//
bool
IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const
{
    return connector == other.connector;
}
178 
179 void
180 IceInternal::OutgoingConnectionFactory::destroy()
181 {
182     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
183 
184     if(_destroyed)
185     {
186         return;
187     }
188 
189 #ifdef ICE_CPP11_COMPILER
190     for(const auto& p : _connections)
191     {
192         p.second->destroy(ConnectionI::CommunicatorDestroyed);
193     }
194 #else
195     for_each(_connections.begin(), _connections.end(),
196              bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason>
197                      (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed));
198 #endif
199     _destroyed = true;
200     _communicator = 0;
201 
202     notifyAll();
203 }
204 
205 void
206 IceInternal::OutgoingConnectionFactory::updateConnectionObservers()
207 {
208     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
209 #ifdef ICE_CPP11_COMPILER
210     for(const auto& p : _connections)
211     {
212         p.second->updateObserver();
213     }
214 #else
215     for_each(_connections.begin(), _connections.end(),
216              Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::updateObserver));
217 #endif
218 }
219 
220 void
221 IceInternal::OutgoingConnectionFactory::waitUntilFinished()
222 {
223     multimap<ConnectorPtr, ConnectionIPtr> connections;
224 
225     {
226         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
227 
228         //
229         // First we wait until the factory is destroyed. We also wait
230         // until there are no pending connections anymore. Only then
231         // we can be sure the _connections contains all connections.
232         //
233         while(!_destroyed || !_pending.empty() || _pendingConnectCount > 0)
234         {
235             wait();
236         }
237 
238         //
239         // We want to wait until all connections are finished outside the
240         // thread synchronization.
241         //
242         connections = _connections;
243     }
244 
245 #ifdef ICE_CPP11_COMPILER
246     for(const auto& p : _connections)
247     {
248         p.second->waitUntilFinished();
249     }
250 #else
251     for_each(connections.begin(), connections.end(),
252              Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished));
253 #endif
254     {
255         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
256         // Ensure all the connections are finished and reapable at this point.
257         vector<Ice::ConnectionIPtr> cons;
258         _monitor->swapReapedConnections(cons);
259         assert(cons.size() == _connections.size());
260         cons.clear();
261         _connections.clear();
262         _connectionsByEndpoint.clear();
263     }
264 
265     //
266     // Must be destroyed outside the synchronization since this might block waiting for
267     // a timer task to complete.
268     //
269     _monitor->destroy();
270 }
271 
272 void
273 IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts,
274                                                bool hasMore,
275                                                Ice::EndpointSelectionType selType,
276                                                const CreateConnectionCallbackPtr& callback)
277 {
278     assert(!endpts.empty());
279 
280     //
281     // Apply the overrides.
282     //
283     vector<EndpointIPtr> endpoints = applyOverrides(endpts);
284 
285     //
286     // Try to find a connection to one of the given endpoints.
287     //
288     try
289     {
290         bool compress;
291         Ice::ConnectionIPtr connection = findConnection(endpoints, compress);
292         if(connection)
293         {
294             callback->setConnection(connection, compress);
295             return;
296         }
297     }
298     catch(const Ice::LocalException& ex)
299     {
300         callback->setException(ex);
301         return;
302     }
303 
304 #ifdef ICE_CPP11_MAPPING
305     auto cb = make_shared<ConnectCallback>(_instance, this, endpoints, hasMore, callback, selType);
306 #else
307     ConnectCallbackPtr cb = new ConnectCallback(_instance, this, endpoints, hasMore, callback, selType);
308 #endif
309     cb->getConnectors();
310 }
311 
312 void
313 IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo)
314 {
315     assert(routerInfo);
316     ObjectAdapterPtr adapter = routerInfo->getAdapter();
317     vector<EndpointIPtr> endpoints = routerInfo->getClientEndpoints(); // Must be called outside the synchronization
318 
319     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
320 
321     if(_destroyed)
322     {
323         throw CommunicatorDestroyedException(__FILE__, __LINE__);
324     }
325 
326     //
327     // Search for connections to the router's client proxy endpoints,
328     // and update the object adapter for such connections, so that
329     // callbacks from the router can be received over such
330     // connections.
331     //
332     for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
333     {
334         EndpointIPtr endpoint = *p;
335 
336         //
337         // Modify endpoints with overrides.
338         //
339         if(_instance->defaultsAndOverrides()->overrideTimeout)
340         {
341             endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
342         }
343 
344         //
345         // The Connection object does not take the compression flag of
346         // endpoints into account, but instead gets the information
347         // about whether messages should be compressed or not from
348         // other sources. In order to allow connection sharing for
349         // endpoints that differ in the value of the compression flag
350         // only, we always set the compression flag to false here in
351         // this connection factory.
352         //
353         endpoint = endpoint->compress(false);
354 
355         for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q = _connections.begin();
356             q != _connections.end(); ++q)
357         {
358             if(q->second->endpoint() == endpoint)
359             {
360                 q->second->setAdapter(adapter);
361             }
362         }
363     }
364 }
365 
366 void
367 IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter)
368 {
369     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
370 
371     if(_destroyed)
372     {
373         return;
374     }
375 
376     for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p)
377     {
378         if(p->second->getAdapter() == adapter)
379         {
380             p->second->setAdapter(0);
381         }
382     }
383 }
384 
385 void
386 IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
387                                                                 Ice::CompressBatch compress)
388 {
389     list<ConnectionIPtr> c;
390 
391     {
392         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
393         for(multimap<ConnectorPtr,  ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end();
394             ++p)
395         {
396             if(p->second->isActiveOrHolding())
397             {
398                 c.push_back(p->second);
399             }
400         }
401     }
402 
403     for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p)
404     {
405         try
406         {
407             outAsync->flushConnection(*p, compress);
408         }
409         catch(const LocalException&)
410         {
411             // Ignore.
412         }
413     }
414 }
415 
//
// Creates a factory that is not destroyed and has no pending connects. The
// ACM monitor is created from the instance and its client ACM configuration.
//
IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const CommunicatorPtr& communicator,
                                                                  const InstancePtr& instance) :
    _communicator(communicator),
    _instance(instance),
    _monitor(new FactoryACMMonitor(instance, instance->clientACM())),
    _destroyed(false),
    _pendingConnectCount(0)
{
}
425 
//
// The factory must have been destroyed and fully drained before it is
// released: no connections, no pending connectors and no pending connects.
//
IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory()
{
    assert(_destroyed);
    assert(_connections.empty());
    assert(_connectionsByEndpoint.empty());
    assert(_pending.empty());
    assert(_pendingConnectCount == 0);
}
434 
435 vector<EndpointIPtr>
436 IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr>& endpts)
437 {
438     DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
439     vector<EndpointIPtr> endpoints = endpts;
440     for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p)
441     {
442         //
443         // Modify endpoints with overrides.
444         //
445         if(defaultsAndOverrides->overrideTimeout)
446         {
447             *p = (*p)->timeout(defaultsAndOverrides->overrideTimeoutValue);
448         }
449     }
450     return endpoints;
451 }
452 
453 ConnectionIPtr
454 IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool& compress)
455 {
456     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
457     if(_destroyed)
458     {
459         throw CommunicatorDestroyedException(__FILE__, __LINE__);
460     }
461 
462     DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
463     assert(!endpoints.empty());
464     for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
465     {
466 #ifdef ICE_CPP11_MAPPING
467         auto connection = find(_connectionsByEndpoint, *p,
468                                [](const ConnectionIPtr& conn)
469                                {
470                                    return conn->isActiveOrHolding();
471                                });
472 #else
473         ConnectionIPtr connection = find(_connectionsByEndpoint, *p, Ice::constMemFun(&ConnectionI::isActiveOrHolding));
474 #endif
475         if(connection)
476         {
477             if(defaultsAndOverrides->overrideCompress)
478             {
479                 compress = defaultsAndOverrides->overrideCompressValue;
480             }
481             else
482             {
483                 compress = (*p)->compress();
484             }
485             return connection;
486         }
487     }
488     return 0;
489 }
490 
491 ConnectionIPtr
492 IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress)
493 {
494     // This must be called with the mutex locked.
495 
496     DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
497     for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
498     {
499         if(_pending.find(p->connector) != _pending.end())
500         {
501             continue;
502         }
503 
504 #ifdef ICE_CPP11_MAPPING
505         auto connection = find(_connections, p->connector,
506                                [](const ConnectionIPtr& conn)
507                                {
508                                    return conn->isActiveOrHolding();
509                                });
510 #else
511         ConnectionIPtr connection = find(_connections, p->connector, Ice::constMemFun(&ConnectionI::isActiveOrHolding));
512 #endif
513         if(connection)
514         {
515             if(defaultsAndOverrides->overrideCompress)
516             {
517                 compress = defaultsAndOverrides->overrideCompressValue;
518             }
519             else
520             {
521                     compress = p->endpoint->compress();
522             }
523             return connection;
524         }
525     }
526 
527     return 0;
528 }
529 
//
// Increments the number of pending connects under the factory lock.
// Throws CommunicatorDestroyedException if the factory is destroyed.
//
void
IceInternal::OutgoingConnectionFactory::incPendingConnectCount()
{
    //
    // Keep track of the number of pending connects. The outgoing connection factory
    // waitUntilFinished() method waits for all the pending connects to terminate before
    // returning. This ensures that the communicator client thread pool isn't destroyed
    // too soon and will still be available to execute the ice_exception() callbacks for
    // the asynchronous requests waiting on a connection to be established.
    //

    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
    if(_destroyed)
    {
        throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
    }
    ++_pendingConnectCount;
}
548 
//
// Decrements the number of pending connects under the factory lock. When the
// factory is destroyed and the count reaches zero, the threads waiting on the
// monitor are notified (waitUntilFinished() waits on that condition).
//
void
IceInternal::OutgoingConnectionFactory::decPendingConnectCount()
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
    --_pendingConnectCount;
    assert(_pendingConnectCount >= 0);
    if(_destroyed && _pendingConnectCount == 0)
    {
        notifyAll();
    }
}
560 
//
// Tries to obtain a connection for one of the given connectors.
//
// Returns an existing active or holding connection when there is one, setting
// `compress` accordingly. Returns null otherwise: either the callback `cb` was
// queued behind a connection establishment already in progress, or the caller
// became responsible for the establishment (in which case, when `cb` is
// non-null, nextConnector() has been called on it after releasing the lock).
// Without a callback, the call blocks while another establishment to one of
// the connectors is pending.
//
// Throws CommunicatorDestroyedException if the factory is destroyed.
//
ConnectionIPtr
IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors,
                                                      const ConnectCallbackPtr& cb,
                                                      bool& compress)
{
    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
        if(_destroyed)
        {
            throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
        }

        //
        // Reap closed connections
        //
        vector<Ice::ConnectionIPtr> cons;
        _monitor->swapReapedConnections(cons);
        for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
        {
            // Each connection is registered by createConnection() under its connector
            // and under its endpoint both with and without the compression flag, so
            // all three entries are removed here.
            remove(_connections, (*p)->connector(), *p);
            remove(_connectionsByEndpoint, (*p)->endpoint(), *p);
            remove(_connectionsByEndpoint, (*p)->endpoint()->compress(true), *p);
        }

        //
        // Try to get the connection. We may need to wait for other threads to
        // finish if one of them is currently establishing a connection to one
        // of our connectors.
        //
        while(true)
        {
            // Checked on every iteration because wait() below releases the lock.
            if(_destroyed)
            {
                throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
            }

            //
            // Search for a matching connection. If we find one, we're done.
            //
            Ice::ConnectionIPtr connection = findConnection(connectors, compress);
            if(connection)
            {
                return connection;
            }

            //
            // Determine whether another thread/request is currently attempting to connect to
            // one of our endpoints; if so we wait until it's done.
            //
            if(addToPending(cb, connectors))
            {
                //
                // If a callback is not specified we wait until another thread notifies us about a
                // change to the pending list. Otherwise, if a callback is provided we're done:
                // when the pending list changes the callback will be notified and will try to
                // get the connection again.
                //
                if(!cb)
                {
                    wait();
                }
                else
                {
                    return 0;
                }
            }
            else
            {
                //
                // If no thread is currently establishing a connection to one of our connectors,
                // we get out of this loop and start the connection establishment to one of the
                // given connectors.
                //
                break;
            }
        }
    }

    //
    // At this point, we're responsible for establishing the connection to one of
    // the given connectors. If it's a non-blocking connect, calling nextConnector
    // will start the connection establishment. Otherwise, we return null to get
    // the caller to establish the connection.
    //
    if(cb)
    {
        cb->nextConnector();
    }

    return 0;
}
652 
//
// Creates a connection for `transceiver` and the connector info `ci`, and
// registers it in the connection maps. The connector must currently have a
// pending entry and the transceiver must be non-null (both asserted).
//
// If the factory is destroyed or the connection cannot be created, the
// transceiver is closed (errors from close() are ignored) and the
// LocalException is rethrown.
//
ConnectionIPtr
IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci)
{
    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
    assert(_pending.find(ci.connector) != _pending.end() && transceiver);

    //
    // Create and add the connection to the connection map. Adding the connection to the map
    // is necessary to support the interruption of the connection initialization and validation
    // in case the communicator is destroyed.
    //
    Ice::ConnectionIPtr connection;
    try
    {
        if(_destroyed)
        {
            throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__);
        }

        // The connection is always created with the compression flag of its endpoint cleared.
        connection = ConnectionI::create(_communicator, _instance, _monitor, transceiver, ci.connector,
                                         ci.endpoint->compress(false), ICE_NULLPTR);
    }
    catch(const Ice::LocalException&)
    {
        try
        {
            transceiver->close();
        }
        catch(const Ice::LocalException&)
        {
            // Ignore
        }
        throw;
    }

    // Index the connection by connector, and by endpoint both without and with the
    // compression flag so that an endpoint lookup matches either variant.
    _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection));
    _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection));
    _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true),
                                                                           connection));
    return connection;
}
694 
//
// Completes a successful connection establishment for the connector info `ci`.
//
// Under the lock, the pending entries of all `connectors` are removed and the
// callbacks queued on them are split in two groups: those that include `ci`
// among their connectors (they receive the new connection, together with `cb`
// when it is non-null) and the others (they are asked to try getting a
// connection again). Waiting threads are then notified. The callbacks are
// invoked after the lock is released.
//
void
IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors,
                                                            const ConnectorInfo& ci,
                                                            const ConnectionIPtr& connection,
                                                            const ConnectCallbackPtr& cb)
{
    // Callbacks that will be given the new connection.
    set<ConnectCallbackPtr> connectionCallbacks;
    if(cb)
    {
        connectionCallbacks.insert(cb);
    }

    // Callbacks that must retry because the new connection does not match their connectors.
    set<ConnectCallbackPtr> callbacks;
    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
        for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
        {
            map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
            if(q != _pending.end())
            {
                for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
                {
                    if((*r)->hasConnector(ci))
                    {
                        connectionCallbacks.insert(*r);
                    }
                    else
                    {
                        callbacks.insert(*r);
                    }
                }
                _pending.erase(q);
            }
        }

        for(set<ConnectCallbackPtr>::iterator r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r)
        {
            (*r)->removeFromPending();
            // A callback that gets the connection must not also be asked to retry.
            callbacks.erase(*r);
        }
        for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r)
        {
            (*r)->removeFromPending();
        }
        notifyAll();
    }

    // The compression override, when enabled, takes precedence over the endpoint flag.
    bool compress;
    DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides();
    if(defaultsAndOverrides->overrideCompress)
    {
        compress = defaultsAndOverrides->overrideCompressValue;
    }
    else
    {
        compress = ci.endpoint->compress();
    }

    for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
    {
        (*p)->getConnection();
    }
    for(set<ConnectCallbackPtr>::const_iterator p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p)
    {
        (*p)->setConnection(connection, compress);
    }
}
762 
//
// Completes a failed connection establishment for the given connectors.
//
// Under the lock, the pending entries of all `connectors` are removed and
// removeConnectors(connectors) is called on each callback queued on them.
// Callbacks for which it returns true are failed with `ex` (together with
// `cb` when it is non-null); the others are removed from the pending lists
// and asked to try getting a connection again. Waiting threads are notified,
// and the callbacks are invoked after the lock is released.
//
void
IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors,
                                                            const Ice::LocalException& ex,
                                                            const ConnectCallbackPtr& cb)
{
    // Callbacks that will be notified of the failure.
    set<ConnectCallbackPtr> failedCallbacks;
    if(cb)
    {
        failedCallbacks.insert(cb);
    }

    // Callbacks that must retry.
    set<ConnectCallbackPtr> callbacks;
    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
        for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
        {
            map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
            if(q != _pending.end())
            {
                for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r)
                {
                    if((*r)->removeConnectors(connectors))
                    {
                        failedCallbacks.insert(*r);
                    }
                    else
                    {
                        callbacks.insert(*r);
                    }
                }
                _pending.erase(q);
            }
        }

        for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r)
        {
            // The two sets are expected to be disjoint.
            assert(failedCallbacks.find(*r) == failedCallbacks.end());
            (*r)->removeFromPending();
        }
        notifyAll();
    }

    for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
    {
        (*p)->getConnection();
    }
    for(set<ConnectCallbackPtr>::const_iterator p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p)
    {
        (*p)->setException(ex);
    }
}
814 
815 bool
816 IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb,
817                                                      const vector<ConnectorInfo>& connectors)
818 {
819     //
820     // Add the callback to each connector pending list.
821     //
822     bool found = false;
823     for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
824     {
825         map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
826         if(q != _pending.end())
827         {
828             found = true;
829             if(cb)
830             {
831                 q->second.insert(cb);
832             }
833         }
834     }
835 
836     if(found)
837     {
838         return true;
839     }
840 
841     //
842     // If there's no pending connection for the given connectors, we're
843     // responsible for its establishment. We add empty pending lists,
844     // other callbacks to the same connectors will be queued.
845     //
846     for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r)
847     {
848         if(_pending.find(r->connector) == _pending.end())
849         {
850             _pending.insert(pair<ConnectorPtr, set<ConnectCallbackPtr> >(r->connector, set<ConnectCallbackPtr>()));
851         }
852     }
853     return false;
854 }
855 
856 void
857 IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb,
858                                                           const vector<ConnectorInfo>& connectors)
859 {
860     for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
861     {
862         map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector);
863         if(q != _pending.end())
864         {
865             q->second.erase(cb);
866         }
867     }
868 }
869 
870 void
871 IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore)
872 {
873     TraceLevelsPtr traceLevels = _instance->traceLevels();
874     if(traceLevels->network >= 2)
875     {
876         Trace out(_instance->initializationData().logger, traceLevels->networkCat);
877 
878         out << "couldn't resolve endpoint host";
879         if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
880         {
881             out << "\n";
882         }
883         else
884         {
885             if(hasMore)
886             {
887                 out << ", trying next endpoint\n";
888             }
889             else
890             {
891                 out << " and no more endpoints to try\n";
892             }
893         }
894         out << ex;
895     }
896 }
897 
898 void
899 IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalException& ex, bool hasMore)
900 {
901     TraceLevelsPtr traceLevels = _instance->traceLevels();
902     if(traceLevels->network >= 2)
903     {
904         Trace out(_instance->initializationData().logger, traceLevels->networkCat);
905 
906         out << "connection to endpoint failed";
907         if(dynamic_cast<const CommunicatorDestroyedException*>(&ex))
908         {
909             out << "\n";
910         }
911         else
912         {
913             if(hasMore)
914             {
915                 out << ", trying next endpoint\n";
916             }
917             else
918             {
919                 out << " and no more endpoints to try\n";
920             }
921         }
922         out << ex;
923     }
924 }
925 
//
// Stores the parameters of a connection request and positions the endpoint
// iterator on the first of the stored endpoints.
//
IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const InstancePtr& instance,
                                                                         const OutgoingConnectionFactoryPtr& factory,
                                                                         const vector<EndpointIPtr>& endpoints,
                                                                         bool hasMore,
                                                                         const CreateConnectionCallbackPtr& cb,
                                                                         Ice::EndpointSelectionType selType) :
    _instance(instance),
    _factory(factory),
    _endpoints(endpoints),
    _hasMore(hasMore),
    _callback(cb),
    _selType(selType)
{
    // Must refer to the member copy (_endpoints), not to the constructor argument.
    _endpointsIter = _endpoints.begin();
}
941 
942 //
943 // Methods from ConnectionI.StartCallback
944 //
945 void
946 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection)
947 {
948     if(_observer)
949     {
950         _observer->detach();
951     }
952 
953     connection->activate();
954     _factory->finishGetConnection(_connectors, *_iter, connection, ICE_SHARED_FROM_THIS);
955 }
956 
957 void
958 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& /*connection*/,
959                                                                                const LocalException& ex)
960 {
961     assert(_iter != _connectors.end());
962     if(connectionStartFailedImpl(ex))
963     {
964         nextConnector();
965     }
966 }
967 
968 //
969 // Methods from EndpointI_connectors
970 //
971 void
972 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector<ConnectorPtr>& connectors)
973 {
974     for(vector<ConnectorPtr>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
975     {
976         _connectors.push_back(ConnectorInfo(*p, *_endpointsIter));
977     }
978 
979     if(++_endpointsIter != _endpoints.end())
980     {
981         nextEndpoint();
982     }
983     else
984     {
985         assert(!_connectors.empty());
986 
987         //
988         // We now have all the connectors for the given endpoints. We can try to obtain the
989         // connection.
990         //
991         _iter = _connectors.begin();
992         getConnection();
993     }
994 }
995 
996 void
997 IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::LocalException& ex)
998 {
999     _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1);
1000     if(++_endpointsIter != _endpoints.end())
1001     {
1002         nextEndpoint();
1003     }
1004     else if(!_connectors.empty())
1005     {
1006         //
1007         // We now have all the connectors for the given endpoints. We can try to obtain the
1008         // connection.
1009         //
1010         _iter = _connectors.begin();
1011         getConnection();
1012     }
1013     else
1014     {
1015         _callback->setException(ex);
1016         _factory->decPendingConnectCount(); // Must be called last.
1017     }
1018 }
1019 
1020 void
1021 IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnectors()
1022 {
1023     try
1024     {
1025         //
1026         // Notify the factory that there's an async connect pending. This is necessary
1027         // to prevent the outgoing connection factory to be destroyed before all the
1028         // pending asynchronous connects are finished.
1029         //
1030         _factory->incPendingConnectCount();
1031     }
1032     catch(const Ice::LocalException& ex)
1033     {
1034         _callback->setException(ex);
1035         return;
1036     }
1037 
1038     nextEndpoint();
1039 }
1040 
1041 void
1042 IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint()
1043 {
1044     try
1045     {
1046         assert(_endpointsIter != _endpoints.end());
1047         (*_endpointsIter)->connectors_async(_selType, ICE_SHARED_FROM_THIS);
1048 
1049     }
1050     catch(const Ice::LocalException& ex)
1051     {
1052         exception(ex);
1053     }
1054 }
1055 
1056 void
1057 IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection()
1058 {
1059     try
1060     {
1061         //
1062         // If all the connectors have been created, we ask the factory to get a
1063         // connection.
1064         //
1065         bool compress;
1066         Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, ICE_SHARED_FROM_THIS, compress);
1067         if(!connection)
1068         {
1069             //
1070             // A null return value from getConnection indicates that the connection
1071             // is being established and that everthing has been done to ensure that
1072             // the callback will be notified when the connection establishment is
1073             // done or that the callback already obtain the connection.
1074             //
1075             return;
1076         }
1077 
1078         _callback->setConnection(connection, compress);
1079         _factory->decPendingConnectCount(); // Must be called last.
1080     }
1081     catch(const Ice::LocalException& ex)
1082     {
1083         _callback->setException(ex);
1084         _factory->decPendingConnectCount(); // Must be called last.
1085     }
1086 }
1087 
1088 void
1089 IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector()
1090 {
1091     while(true)
1092     {
1093         try
1094         {
1095             const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer;
1096             if(obsv)
1097             {
1098                 _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString());
1099                 if(_observer)
1100                 {
1101                     _observer->attach();
1102                 }
1103             }
1104 
1105             assert(_iter != _connectors.end());
1106 
1107             if(_instance->traceLevels()->network >= 2)
1108             {
1109                 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
1110                 out << "trying to establish " << _iter->endpoint->protocol() << " connection to "
1111                     << _iter->connector->toString();
1112             }
1113             Ice::ConnectionIPtr connection = _factory->createConnection(_iter->connector->connect(), *_iter);
1114             connection->start(ICE_SHARED_FROM_THIS);
1115         }
1116         catch(const Ice::LocalException& ex)
1117         {
1118             if(_instance->traceLevels()->network >= 2)
1119             {
1120                 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
1121                 out << "failed to establish " << _iter->endpoint->protocol() << " connection to "
1122                     << _iter->connector->toString() << "\n" << ex;
1123             }
1124 
1125             if(connectionStartFailedImpl(ex))
1126             {
1127                 continue; // More connectors to try, continue.
1128             }
1129         }
1130         break;
1131     }
1132 }
1133 
1134 void
1135 IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection,
1136                                                                        bool compress)
1137 {
1138     //
1139     // Callback from the factory: the connection to one of the callback
1140     // connectors has been established.
1141     //
1142     _callback->setConnection(connection, compress);
1143     _factory->decPendingConnectCount(); // Must be called last.
1144 }
1145 
1146 void
1147 IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex)
1148 {
1149     //
1150     // Callback from the factory: connection establishment failed.
1151     //
1152     _callback->setException(ex);
1153     _factory->decPendingConnectCount(); // Must be called last.
1154 }
1155 
1156 bool
1157 IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci)
1158 {
1159     return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end();
1160 }
1161 
1162 bool
1163 IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors)
1164 {
1165     //
1166     // Callback from the factory: connecting to the given connectors
1167     // failed, we remove the connectors and return true if there's
1168     // no more connectors left to try.
1169     //
1170     for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p)
1171     {
1172         _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end());
1173     }
1174     return _connectors.empty();
1175 }
1176 
1177 void
1178 IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending()
1179 {
1180     _factory->removeFromPending(ICE_SHARED_FROM_THIS, _connectors);
1181 }
1182 
1183 bool
1184 IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const
1185 {
1186     return this < &rhs;
1187 }
1188 
1189 bool
1190 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedImpl(const Ice::LocalException& ex)
1191 {
1192     if(_observer)
1193     {
1194         _observer->failed(ex.ice_id());
1195         _observer->detach();
1196     }
1197 
1198     _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1);
1199     if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue.
1200     {
1201         _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS);
1202     }
1203     else if(++_iter != _connectors.end()) // Try the next connector.
1204     {
1205         return true;
1206     }
1207     else
1208     {
1209         _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS);
1210     }
1211     return false;
1212 }
1213 
1214 void
1215 IceInternal::IncomingConnectionFactory::activate()
1216 {
1217     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1218     setState(StateActive);
1219 }
1220 
1221 void
1222 IceInternal::IncomingConnectionFactory::hold()
1223 {
1224     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1225     setState(StateHolding);
1226 }
1227 
1228 void
1229 IceInternal::IncomingConnectionFactory::destroy()
1230 {
1231     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1232     setState(StateClosed);
1233 }
1234 
1235 void
1236 IceInternal::IncomingConnectionFactory::updateConnectionObservers()
1237 {
1238     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1239 #ifdef ICE_CPP11_COMPILER
1240     for(const auto& conn : _connections)
1241     {
1242         conn->updateObserver();
1243     }
1244 #else
1245     for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::updateObserver));
1246 #endif
1247 }
1248 
1249 void
1250 IceInternal::IncomingConnectionFactory::waitUntilHolding() const
1251 {
1252     set<ConnectionIPtr> connections;
1253 
1254     {
1255         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1256 
1257         //
1258         // First we wait until the connection factory itself is in holding
1259         // state.
1260         //
1261         while(_state < StateHolding)
1262         {
1263             wait();
1264         }
1265 
1266         //
1267         // We want to wait until all connections are in holding state
1268         // outside the thread synchronization.
1269         //
1270         connections = _connections;
1271     }
1272 
1273     //
1274     // Now we wait until each connection is in holding state.
1275     //
1276 #ifdef ICE_CPP11_COMPILER
1277     for(const auto& conn : connections)
1278     {
1279         conn->waitUntilHolding();
1280     }
1281 #else
1282     for_each(connections.begin(), connections.end(), Ice::constVoidMemFun(&ConnectionI::waitUntilHolding));
1283 #endif
1284 }
1285 
1286 void
1287 IceInternal::IncomingConnectionFactory::waitUntilFinished()
1288 {
1289     set<ConnectionIPtr> connections;
1290     {
1291         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1292 
1293         //
1294         // First we wait until the factory is destroyed. If we are using
1295         // an acceptor, we also wait for it to be closed.
1296         //
1297         while(_state != StateFinished)
1298         {
1299             wait();
1300         }
1301 
1302         //
1303         // Clear the OA. See bug 1673 for the details of why this is necessary.
1304         //
1305         _adapter = 0;
1306 
1307         // We want to wait until all connections are finished outside the
1308         // thread synchronization.
1309         //
1310         connections = _connections;
1311     }
1312 
1313 #ifdef ICE_CPP11_COMPILER
1314     for(const auto& conn : connections)
1315     {
1316         conn->waitUntilFinished();
1317     }
1318 #else
1319     for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished));
1320 #endif
1321 
1322     {
1323         IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1324         if(_transceiver)
1325         {
1326             assert(_connections.size() <= 1); // The connection isn't monitored or reaped.
1327         }
1328         else
1329         {
1330             // Ensure all the connections are finished and reapable at this point.
1331             vector<Ice::ConnectionIPtr> cons;
1332             _monitor->swapReapedConnections(cons);
1333             assert(cons.size() == _connections.size());
1334             cons.clear();
1335         }
1336         _connections.clear();
1337     }
1338 
1339     //
1340     // Must be destroyed outside the synchronization since this might block waiting for
1341     // a timer task to complete.
1342     //
1343     _monitor->destroy();
1344 }
1345 
1346 bool
1347 IceInternal::IncomingConnectionFactory::isLocal(const EndpointIPtr& endpoint) const
1348 {
1349     if(_publishedEndpoint && endpoint->equivalent(_publishedEndpoint))
1350     {
1351         return true;
1352     }
1353     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1354     return endpoint->equivalent(_endpoint);
1355 }
1356 
1357 EndpointIPtr
1358 IceInternal::IncomingConnectionFactory::endpoint() const
1359 {
1360     if(_publishedEndpoint)
1361     {
1362         return _publishedEndpoint;
1363     }
1364     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1365     return _endpoint;
1366 }
1367 
1368 list<ConnectionIPtr>
1369 IceInternal::IncomingConnectionFactory::connections() const
1370 {
1371     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1372 
1373     list<ConnectionIPtr> result;
1374 
1375     //
1376     // Only copy connections which have not been destroyed.
1377     //
1378 #ifdef ICE_CPP11_COMPILER
1379     remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result),
1380                    [](const ConnectionIPtr& conn)
1381                    {
1382                        return !conn->isActiveOrHolding();
1383                    });
1384 #else
1385     remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result),
1386                    not1(Ice::constMemFun(&ConnectionI::isActiveOrHolding)));
1387 #endif
1388     return result;
1389 }
1390 
1391 void
1392 IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync,
1393                                                                 Ice::CompressBatch compress)
1394 {
1395     list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here.
1396 
1397     for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p)
1398     {
1399         try
1400         {
1401             outAsync->flushConnection(*p, compress);
1402         }
1403         catch(const LocalException&)
1404         {
1405             // Ignore.
1406         }
1407     }
1408 }
1409 
1410 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP)
1411 bool
1412 IceInternal::IncomingConnectionFactory::startAsync(SocketOperation)
1413 {
1414     assert(_acceptor);
1415     if(_state >= StateClosed)
1416     {
1417         return false;
1418     }
1419 
1420     try
1421     {
1422         _acceptor->startAccept();
1423     }
1424     catch(const Ice::LocalException& ex)
1425     {
1426         ICE_SET_EXCEPTION_FROM_CLONE(_acceptorException, ex.ice_clone());
1427         _acceptor->getNativeInfo()->completed(SocketOperationRead);
1428     }
1429     return true;
1430 }
1431 
//
// Complete the asynchronous accept started by startAsync(). Returns true
// as long as the factory is not closed.
//
bool
IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation)
{
    assert(_acceptor);
    try
    {
        //
        // If an exception was saved (startAsync() does so when starting
        // the accept fails), rethrow it now so that it's handled below.
        //
        if(_acceptorException)
        {
            _acceptorException->ice_throw();
        }
        _acceptor->finishAccept();
    }
    catch(const LocalException& ex)
    {
        // The saved exception, if any, has been consumed.
        _acceptorException.reset(ICE_NULLPTR);

        Error out(_instance->initializationData().logger);
        out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString();

        //
        // Mark the acceptor as no longer started and finish it with the
        // thread pool. The acceptor is closed right away only if the
        // thread pool's finish() returns true.
        //
        if(_acceptorStarted)
        {
            _acceptorStarted = false;
            if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
            {
                closeAcceptor();
            }
        }
    }
    return _state < StateClosed;
}
1461 #endif
1462 
//
// Handle a thread pool notification for this factory: reap the closed
// connections, accept one new connection from the acceptor, register it
// with the factory and start it. The accepted connection is created and
// registered with the factory locked, but it is started only after the
// lock has been released.
//
void
IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current)
{
    ConnectionIPtr connection;

    ThreadPoolMessage<IncomingConnectionFactory> msg(current, *this);

    {
        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);

        // Nothing to do if the I/O scope could not be entered.
        ThreadPoolMessage<IncomingConnectionFactory>::IOScope io(msg);
        if(!io)
        {
            return;
        }

        //
        // No connection is accepted once the factory is closed or while
        // it is holding. In the holding case the thread yields before
        // returning.
        //
        if(_state >= StateClosed)
        {
            return;
        }
        else if(_state == StateHolding)
        {
            IceUtil::ThreadControl::yield();
            return;
        }

        //
        // Reap closed connections: the connections reported by the monitor
        // are removed from the set of connections of this factory.
        //
        vector<Ice::ConnectionIPtr> cons;
        _monitor->swapReapedConnections(cons);
        for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p)
        {
            _connections.erase(*p);
        }

        // The acceptor isn't started (anymore), there's nothing to accept.
        if(!_acceptorStarted)
        {
            return;
        }

        //
        // Now accept a new connection.
        //
        TransceiverPtr transceiver;
        try
        {
            transceiver = _acceptor->accept();

            if(_instance->traceLevels()->network >= 2)
            {
                Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
                out << "trying to accept " << _endpoint->protocol() << " connection\n" << transceiver->toString();
            }
        }
        catch(const SocketException& ex)
        {
            //
            // If noMoreFds() reports true for the socket error, log the
            // error and stop the acceptor. The acceptor is closed right
            // away only if the thread pool's finish() returns true.
            //
            if(noMoreFds(ex.error))
            {
                Error out(_instance->initializationData().logger);
                out << "can't accept more connections:\n" << ex << '\n' << _acceptor->toString();

                assert(_acceptorStarted);
                _acceptorStarted = false;
                if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
                {
                    closeAcceptor();
                }
            }

            // Ignore socket exceptions.
            return;
        }
        catch(const LocalException& ex)
        {
            // Warn about other Ice local exceptions.
            if(_warn)
            {
                Warning out(_instance->initializationData().logger);
                out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
            }
            return;
        }

        assert(transceiver);

        //
        // Create the connection for the accepted transceiver. If this
        // fails, the transceiver is closed (errors from closing it are
        // ignored) and the failure is optionally logged as a warning.
        //
        try
        {
            connection = ConnectionI::create(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0,
                                             _endpoint, _adapter);
        }
        catch(const LocalException& ex)
        {
            try
            {
                transceiver->close();
            }
            catch(const Ice::LocalException&)
            {
                // Ignore.
            }

            if(_warn)
            {
                Warning out(_instance->initializationData().logger);
                out << "connection exception:\n" << ex << '\n' << _acceptor->toString();
            }
            return;
        }

        _connections.insert(connection);
    }

    assert(connection);

    // Start the connection outside the synchronization, with this factory as the start callback.
    connection->start(ICE_SHARED_FROM_THIS);
}
1580 
1581 void
1582 IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close)
1583 {
1584     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1585     if(_state < StateClosed)
1586     {
1587         if(close)
1588         {
1589             closeAcceptor();
1590         }
1591 
1592         //
1593         // If the acceptor hasn't been explicitly stopped (which is the case if the acceptor got closed
1594         // because of an unexpected error), try to restart the acceptor in 1 second.
1595         //
1596         if(!_acceptorStopped)
1597         {
1598             _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS, _instance),
1599                                          IceUtil::Time::seconds(1));
1600         }
1601         return;
1602     }
1603 
1604     assert(_state >= StateClosed);
1605     setState(StateFinished);
1606 
1607     if(close)
1608     {
1609         closeAcceptor();
1610     }
1611 
1612 #if TARGET_OS_IPHONE != 0
1613     sync.release();
1614     finish();
1615 #endif
1616 }
1617 
1618 #if TARGET_OS_IPHONE != 0
1619 void
1620 IceInternal::IncomingConnectionFactory::finish()
1621 {
1622     unregisterForBackgroundNotification(ICE_SHARED_FROM_THIS);
1623 }
1624 #endif
1625 
1626 string
1627 IceInternal::IncomingConnectionFactory::toString() const
1628 {
1629     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1630     if(_transceiver)
1631     {
1632         return _transceiver->toString();
1633     }
1634     else if(_acceptor)
1635     {
1636         return _acceptor->toString();
1637     }
1638     else
1639     {
1640         return string();
1641     }
1642 }
1643 
1644 NativeInfoPtr
1645 IceInternal::IncomingConnectionFactory::getNativeInfo()
1646 {
1647     if(_transceiver)
1648     {
1649         return _transceiver->getNativeInfo();
1650     }
1651     else if(_acceptor)
1652     {
1653         return _acceptor->getNativeInfo();
1654     }
1655     else
1656     {
1657         return 0;
1658     }
1659 }
1660 
1661 void
1662 IceInternal::IncomingConnectionFactory::connectionStartCompleted(const Ice::ConnectionIPtr& connection)
1663 {
1664     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1665 
1666     //
1667     // Initialy, connections are in the holding state. If the factory is active
1668     // we activate the connection.
1669     //
1670     if(_state == StateActive)
1671     {
1672         connection->activate();
1673     }
1674 }
1675 
1676 void
1677 IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::ConnectionIPtr& /*connection*/,
1678                                                               const Ice::LocalException&)
1679 {
1680     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1681     if(_state >= StateClosed)
1682     {
1683         return;
1684     }
1685 
1686     //
1687     // Do not warn about connection exceptions here. The connection is not yet validated.
1688     //
1689 }
1690 
1691 //
1692 // COMPILERFIX: The ConnectionFactory setup is broken out into a separate initialize
1693 // function because when it was part of the constructor C++Builder 2007 apps would
1694 // crash if an execption was thrown from any calls within the constructor.
1695 //
//
// Only stores the arguments and sets up the members; the transceiver or
// acceptor is set up separately by initialize(). The factory starts in
// the holding state with the acceptor neither started nor stopped.
//
// instance          - stored in _instance; also provides the properties from
//                     which the Ice.Warn.Connections setting is read.
// endpoint          - stored in _endpoint.
// publishedEndpoint - stored in _publishedEndpoint.
// adapter           - stored in _adapter; it is cast to ObjectAdapterI (without
//                     a null check on the cast) to get the ACM settings used to
//                     create the connection monitor.
//
IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance,
                                                                  const EndpointIPtr& endpoint,
                                                                  const EndpointIPtr& publishedEndpoint,
                                                                  const ObjectAdapterIPtr& adapter) :
    _instance(instance),
    _monitor(new FactoryACMMonitor(instance, dynamic_cast<ObjectAdapterI*>(adapter.get())->getACM())),
    _endpoint(endpoint),
    _publishedEndpoint(publishedEndpoint),
    _acceptorStarted(false),
    _acceptorStopped(false),
    _adapter(adapter),
    _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0),
    _state(StateHolding)
{
}
1711 
1712 void
1713 IceInternal::IncomingConnectionFactory::startAcceptor()
1714 {
1715     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1716     if(_state >= StateClosed || _acceptorStarted)
1717     {
1718         return;
1719     }
1720 
1721     _acceptorStopped = false;
1722     createAcceptor();
1723 }
1724 
1725 void
1726 IceInternal::IncomingConnectionFactory::stopAcceptor()
1727 {
1728     IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
1729     if(_state >= StateClosed || !_acceptorStarted)
1730     {
1731         return;
1732     }
1733 
1734     _acceptorStopped = true;
1735     _acceptorStarted = false;
1736     if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
1737     {
1738         closeAcceptor();
1739     }
1740 }
1741 
//
// Complete the setup of the factory. The timeout and compression overrides
// are applied to the endpoint, then the endpoint is asked for a transceiver.
// If it provides one, the transceiver is bound and a single connection is
// created for it and started; otherwise an acceptor is created (or, on
// iOS, the factory is registered for background notifications).
//
// If an Ice exception is raised, the transceiver (if any) is closed, the
// factory is put in the finished state, the monitor is destroyed, the
// connections are cleared and the exception is rethrown.
//
void
IceInternal::IncomingConnectionFactory::initialize()
{
    // Apply the timeout override, if one is set.
    if(_instance->defaultsAndOverrides()->overrideTimeout)
    {
        _endpoint = _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue);
    }

    // Apply the compression override, if one is set.
    if(_instance->defaultsAndOverrides()->overrideCompress)
    {
        _endpoint = _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue);
    }
    try
    {
        // The const_cast suggests that _transceiver is declared const and is only assigned here.
        const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver();
        if(_transceiver)
        {
            if(_instance->traceLevels()->network >= 2)
            {
                Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
                out << "attempting to bind to " << _endpoint->protocol() << " socket\n" << _transceiver->toString();
            }
            // bind() returns the endpoint to use from now on.
            const_cast<EndpointIPtr&>(_endpoint) = _transceiver->bind();
            // The connection is created with a null monitor and started without a start callback.
            ConnectionIPtr connection(ConnectionI::create(_adapter->getCommunicator(), _instance, 0, _transceiver, 0,
                                                          _endpoint, _adapter));
            connection->start(0);
            _connections.insert(connection);
        }
        else
        {
#if TARGET_OS_IPHONE != 0
            //
            // The notification center will call back on the factory to
            // start the acceptor if necessary.
            //
            registerForBackgroundNotification(ICE_SHARED_FROM_THIS);
#else
            createAcceptor();
#endif
        }
    }
    catch(const Ice::Exception&)
    {
        // Best-effort close of the transceiver: local exceptions from close() are ignored.
        if(_transceiver)
        {
            try
            {
                _transceiver->close();
            }
            catch(const Ice::LocalException&)
            {
                // Ignore
            }
        }

        // Set the finished state and empty the connections, as the destructor asserts both.
        _state = StateFinished;
        _monitor->destroy();
        _connections.clear();
        throw;
    }
}
1803 
1804 IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory()
1805 {
1806     assert(_state == StateFinished);
1807     assert(_connections.empty());
1808 }
1809 
//
// Switch the factory to the given state and wake up the threads waiting
// on the factory. Requests that are not allowed from the current state are
// silently ignored: the function returns without changing the state and
// without notifying. The callers in this file invoke it with the factory
// locked.
//
// Transitions handled here:
//  - StateActive:   only from StateHolding; registers the acceptor (if any)
//                   for read operations and activates the connections.
//  - StateHolding:  only from StateActive; unregisters the acceptor (if any)
//                   from read operations and puts the connections on hold.
//  - StateClosed:   stops the acceptor if it is started, otherwise goes
//                   straight to StateFinished; destroys the connections.
//  - StateFinished: expected only from StateClosed.
//
void
IceInternal::IncomingConnectionFactory::setState(State state)
{
    if(_state == state) // Don't switch twice.
    {
        return;
    }

    switch(state)
    {
        case StateActive:
        {
            if(_state != StateHolding) // Can only switch from holding to active.
            {
                return;
            }
            if(_acceptor)
            {
                if(_instance->traceLevels()->network >= 1)
                {
                    Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
                    out << "accepting " << _endpoint->protocol() << " connections at " << _acceptor->toString();
                }
                _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead);
            }
            // Activate each connection of the factory.
#ifdef ICE_CPP11_COMPILER
            for(const auto& conn : _connections)
            {
                conn->activate();
            }
#else
            for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate));
#endif
            break;
        }

        case StateHolding:
        {
            if(_state != StateActive) // Can only switch from active to holding.
            {
                return;
            }
            if(_acceptor)
            {
                if(_instance->traceLevels()->network >= 1)
                {
                    Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
                    out << "holding " << _endpoint->protocol() << " connections at " << _acceptor->toString();
                }
                _adapter->getThreadPool()->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead);
            }
            // Put each connection of the factory on hold.
#ifdef ICE_CPP11_COMPILER
            for(const auto& conn : _connections)
            {
                conn->hold();
            }
#else
            for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold));
#endif
            break;
        }

        case StateClosed:
        {
            if(_acceptorStarted)
            {
                //
                // If possible, close the acceptor now to prevent new connections from
                // being accepted while we are deactivating. This is especially useful
                // if there are no more threads in the thread pool available to dispatch
                // the finish() call. Not all selector implementations do support this
                // however.
                //
                _acceptorStarted = false;
                if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true))
                {
                    closeAcceptor();
                }
            }
            else
            {
                //
                // The acceptor isn't started, so the thread pool is not asked to
                // finish it: the factory goes straight to the finished state.
                //
#if TARGET_OS_IPHONE != 0
                _adapter->getThreadPool()->dispatch(new FinishCall(ICE_SHARED_FROM_THIS));
#endif
                state = StateFinished;
            }

            // Destroy each connection of the factory, giving the adapter deactivation as the reason.
#ifdef ICE_CPP11_COMPILER
            for(const auto& conn : _connections)
            {
                conn->destroy(ConnectionI::ObjectAdapterDeactivated);
            }
#else
            for_each(_connections.begin(), _connections.end(),
                     bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated));
#endif
            break;
        }

        case StateFinished:
        {
            assert(_state == StateClosed);
            break;
        }
    }

    // Record the new state and wake up the threads waiting for a state change.
    _state = state;
    notifyAll();
}
1919 
1920 void
1921 IceInternal::IncomingConnectionFactory::createAcceptor()
1922 {
1923     try
1924     {
1925         assert(!_acceptorStarted);
1926         _acceptor = _endpoint->acceptor(_adapter->getName());
1927         assert(_acceptor);
1928         if(_instance->traceLevels()->network >= 2)
1929         {
1930             Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
1931             out << "attempting to bind to " << _endpoint->protocol() << " socket " << _acceptor->toString();
1932         }
1933 
1934         _endpoint = _acceptor->listen();
1935         if(_instance->traceLevels()->network >= 1)
1936         {
1937             Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
1938             out << "listening for " << _endpoint->protocol() << " connections\n" << _acceptor->toDetailedString();
1939         }
1940 
1941         _adapter->getThreadPool()->initialize(ICE_SHARED_FROM_THIS);
1942         if(_state == StateActive)
1943         {
1944             _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead);
1945         }
1946 
1947         _acceptorStarted = true;
1948     }
1949     catch(const Ice::Exception&)
1950     {
1951         if(_acceptor)
1952         {
1953             _acceptor->close();
1954         }
1955         throw;
1956     }
1957 }
1958 
1959 void
1960 IceInternal::IncomingConnectionFactory::closeAcceptor()
1961 {
1962     assert(_acceptor);
1963 
1964     if(_instance->traceLevels()->network >= 1)
1965     {
1966         Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat);
1967         out << "stopping to accept " << _endpoint->protocol() << " connections at " << _acceptor->toString();
1968     }
1969 
1970     assert(!_acceptorStarted);
1971     _acceptor->close();
1972 }
1973