1 // 2 // Copyright (c) ZeroC, Inc. All rights reserved. 3 // 4 5 #include <Ice/ConnectionFactory.h> 6 #include <Ice/ConnectionI.h> 7 #include <Ice/Instance.h> 8 #include <Ice/LoggerUtil.h> 9 #include <Ice/TraceLevels.h> 10 #include <Ice/DefaultsAndOverrides.h> 11 #include <Ice/Properties.h> 12 #include <Ice/Transceiver.h> 13 #include <Ice/Connector.h> 14 #include <Ice/Acceptor.h> 15 #include <Ice/ThreadPool.h> 16 #include <Ice/ObjectAdapterI.h> // For getThreadPool(). 17 #include <Ice/Reference.h> 18 #include <Ice/EndpointI.h> 19 #include <Ice/RouterInfo.h> 20 #include <Ice/LocalException.h> 21 #include <Ice/Functional.h> 22 #include <Ice/OutgoingAsync.h> 23 #include <Ice/CommunicatorI.h> 24 #include <IceUtil/Random.h> 25 #include <iterator> 26 27 #if TARGET_OS_IPHONE != 0 28 namespace IceInternal 29 { 30 31 bool registerForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&); 32 void unregisterForBackgroundNotification(const IceInternal::IncomingConnectionFactoryPtr&); 33 34 } 35 #endif 36 37 using namespace std; 38 using namespace Ice; 39 using namespace Ice::Instrumentation; 40 using namespace IceInternal; 41 42 IceUtil::Shared* IceInternal::upCast(OutgoingConnectionFactory* p) { return p; } 43 44 #ifndef ICE_CPP11_MAPPING 45 IceUtil::Shared* IceInternal::upCast(IncomingConnectionFactory* p) { return p; } 46 #endif 47 48 namespace 49 { 50 51 #ifdef ICE_CPP11_MAPPING 52 template <typename Map> void 53 remove(Map& m, const typename Map::key_type& k, const typename Map::mapped_type& v) 54 { 55 auto pr = m.equal_range(k); 56 assert(pr.first != pr.second); 57 for(auto q = pr.first; q != pr.second; ++q) 58 { 59 if(q->second.get() == v.get()) 60 { 61 m.erase(q); 62 return; 63 } 64 } 65 assert(false); // Nothing was removed which is an error. 66 } 67 68 template<typename Map, typename Predicate> typename Map::mapped_type 69 find(const Map& m, const typename Map::key_type& k, Predicate predicate) 70 { 71 auto pr = m.equal_range(k); 72 for(auto q = pr.first; q != pr.second; ++q) 73 { 74 if(predicate(q->second)) 75 { 76 return q->second; 77 } 78 } 79 return nullptr; 80 } 81 82 #else 83 template <typename K, typename V> void 84 remove(multimap<K, V>& m, K k, V v) 85 { 86 pair<typename multimap<K, V>::iterator, typename multimap<K, V>::iterator> pr = m.equal_range(k); 87 assert(pr.first != pr.second); 88 for(typename multimap<K, V>::iterator q = pr.first; q != pr.second; ++q) 89 { 90 if(q->second.get() == v.get()) 91 { 92 m.erase(q); 93 return; 94 } 95 } 96 assert(false); // Nothing was removed which is an error. 97 } 98 99 template <typename K, typename V> ::IceInternal::Handle<V> 100 find(const multimap<K,::IceInternal::Handle<V> >& m, 101 K k, 102 const ::IceUtilInternal::ConstMemFun<bool, V, ::IceInternal::Handle<V> >& predicate) 103 { 104 pair<typename multimap<K, ::IceInternal::Handle<V> >::const_iterator, 105 typename multimap<K, ::IceInternal::Handle<V> >::const_iterator> pr = m.equal_range(k); 106 for(typename multimap<K, ::IceInternal::Handle<V> >::const_iterator q = pr.first; q != pr.second; ++q) 107 { 108 if(predicate(q->second)) 109 { 110 return q->second; 111 } 112 } 113 return IceInternal::Handle<V>(); 114 } 115 #endif 116 117 class StartAcceptor : public IceUtil::TimerTask 118 #ifdef ICE_CPP11_MAPPING 119 , public std::enable_shared_from_this<StartAcceptor> 120 #endif 121 { 122 public: 123 124 StartAcceptor(const IncomingConnectionFactoryPtr& factory, const InstancePtr& instance) : 125 _factory(factory), _instance(instance) 126 { 127 } 128 129 void 130 runTimerTask() 131 { 132 try 133 { 134 _factory->startAcceptor(); 135 } 136 catch(const Ice::Exception& ex) 137 { 138 Error out(_instance->initializationData().logger); 139 out << "acceptor creation failed:\n" << ex << '\n' << _factory->toString(); 140 _instance->timer()->schedule(ICE_SHARED_FROM_THIS, IceUtil::Time::seconds(1)); 141 } 142 } 143 144 private: 145 146 IncomingConnectionFactoryPtr _factory; 147 InstancePtr _instance; 148 }; 149 150 #if TARGET_OS_IPHONE != 0 151 class FinishCall : public DispatchWorkItem 152 { 153 public: 154 155 FinishCall(const IncomingConnectionFactoryPtr& factory) : _factory(factory) 156 { 157 } 158 159 virtual void 160 run() 161 { 162 _factory->finish(); 163 } 164 165 private: 166 167 const IncomingConnectionFactoryPtr _factory; 168 }; 169 #endif 170 171 } 172 173 bool 174 IceInternal::OutgoingConnectionFactory::ConnectorInfo::operator==(const ConnectorInfo& other) const 175 { 176 return connector == other.connector; 177 } 178 179 void 180 IceInternal::OutgoingConnectionFactory::destroy() 181 { 182 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 183 184 if(_destroyed) 185 { 186 return; 187 } 188 189 #ifdef ICE_CPP11_COMPILER 190 for(const auto& p : _connections) 191 { 192 p.second->destroy(ConnectionI::CommunicatorDestroyed); 193 } 194 #else 195 for_each(_connections.begin(), _connections.end(), 196 bind2nd(Ice::secondVoidMemFun1<const ConnectorPtr, ConnectionI, ConnectionI::DestructionReason> 197 (&ConnectionI::destroy), ConnectionI::CommunicatorDestroyed)); 198 #endif 199 _destroyed = true; 200 _communicator = 0; 201 202 notifyAll(); 203 } 204 205 void 206 IceInternal::OutgoingConnectionFactory::updateConnectionObservers() 207 { 208 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 209 #ifdef ICE_CPP11_COMPILER 210 for(const auto& p : _connections) 211 { 212 p.second->updateObserver(); 213 } 214 #else 215 for_each(_connections.begin(), _connections.end(), 216 Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::updateObserver)); 217 #endif 218 } 219 220 void 221 IceInternal::OutgoingConnectionFactory::waitUntilFinished() 222 { 223 multimap<ConnectorPtr, ConnectionIPtr> connections; 224 225 { 226 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 227 228 // 229 // First we wait until the factory is destroyed. We also wait 230 // until there are no pending connections anymore. Only then 231 // we can be sure the _connections contains all connections. 232 // 233 while(!_destroyed || !_pending.empty() || _pendingConnectCount > 0) 234 { 235 wait(); 236 } 237 238 // 239 // We want to wait until all connections are finished outside the 240 // thread synchronization. 241 // 242 connections = _connections; 243 } 244 245 #ifdef ICE_CPP11_COMPILER 246 for(const auto& p : _connections) 247 { 248 p.second->waitUntilFinished(); 249 } 250 #else 251 for_each(connections.begin(), connections.end(), 252 Ice::secondVoidMemFun<const ConnectorPtr, ConnectionI>(&ConnectionI::waitUntilFinished)); 253 #endif 254 { 255 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 256 // Ensure all the connections are finished and reapable at this point. 257 vector<Ice::ConnectionIPtr> cons; 258 _monitor->swapReapedConnections(cons); 259 assert(cons.size() == _connections.size()); 260 cons.clear(); 261 _connections.clear(); 262 _connectionsByEndpoint.clear(); 263 } 264 265 // 266 // Must be destroyed outside the synchronization since this might block waiting for 267 // a timer task to complete. 268 // 269 _monitor->destroy(); 270 } 271 272 void 273 IceInternal::OutgoingConnectionFactory::create(const vector<EndpointIPtr>& endpts, 274 bool hasMore, 275 Ice::EndpointSelectionType selType, 276 const CreateConnectionCallbackPtr& callback) 277 { 278 assert(!endpts.empty()); 279 280 // 281 // Apply the overrides. 282 // 283 vector<EndpointIPtr> endpoints = applyOverrides(endpts); 284 285 // 286 // Try to find a connection to one of the given endpoints. 287 // 288 try 289 { 290 bool compress; 291 Ice::ConnectionIPtr connection = findConnection(endpoints, compress); 292 if(connection) 293 { 294 callback->setConnection(connection, compress); 295 return; 296 } 297 } 298 catch(const Ice::LocalException& ex) 299 { 300 callback->setException(ex); 301 return; 302 } 303 304 #ifdef ICE_CPP11_MAPPING 305 auto cb = make_shared<ConnectCallback>(_instance, this, endpoints, hasMore, callback, selType); 306 #else 307 ConnectCallbackPtr cb = new ConnectCallback(_instance, this, endpoints, hasMore, callback, selType); 308 #endif 309 cb->getConnectors(); 310 } 311 312 void 313 IceInternal::OutgoingConnectionFactory::setRouterInfo(const RouterInfoPtr& routerInfo) 314 { 315 assert(routerInfo); 316 ObjectAdapterPtr adapter = routerInfo->getAdapter(); 317 vector<EndpointIPtr> endpoints = routerInfo->getClientEndpoints(); // Must be called outside the synchronization 318 319 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 320 321 if(_destroyed) 322 { 323 throw CommunicatorDestroyedException(__FILE__, __LINE__); 324 } 325 326 // 327 // Search for connections to the router's client proxy endpoints, 328 // and update the object adapter for such connections, so that 329 // callbacks from the router can be received over such 330 // connections. 331 // 332 for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) 333 { 334 EndpointIPtr endpoint = *p; 335 336 // 337 // Modify endpoints with overrides. 338 // 339 if(_instance->defaultsAndOverrides()->overrideTimeout) 340 { 341 endpoint = endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); 342 } 343 344 // 345 // The Connection object does not take the compression flag of 346 // endpoints into account, but instead gets the information 347 // about whether messages should be compressed or not from 348 // other sources. In order to allow connection sharing for 349 // endpoints that differ in the value of the compression flag 350 // only, we always set the compression flag to false here in 351 // this connection factory. 352 // 353 endpoint = endpoint->compress(false); 354 355 for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator q = _connections.begin(); 356 q != _connections.end(); ++q) 357 { 358 if(q->second->endpoint() == endpoint) 359 { 360 q->second->setAdapter(adapter); 361 } 362 } 363 } 364 } 365 366 void 367 IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& adapter) 368 { 369 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 370 371 if(_destroyed) 372 { 373 return; 374 } 375 376 for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); ++p) 377 { 378 if(p->second->getAdapter() == adapter) 379 { 380 p->second->setAdapter(0); 381 } 382 } 383 } 384 385 void 386 IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, 387 Ice::CompressBatch compress) 388 { 389 list<ConnectionIPtr> c; 390 391 { 392 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 393 for(multimap<ConnectorPtr, ConnectionIPtr>::const_iterator p = _connections.begin(); p != _connections.end(); 394 ++p) 395 { 396 if(p->second->isActiveOrHolding()) 397 { 398 c.push_back(p->second); 399 } 400 } 401 } 402 403 for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) 404 { 405 try 406 { 407 outAsync->flushConnection(*p, compress); 408 } 409 catch(const LocalException&) 410 { 411 // Ignore. 412 } 413 } 414 } 415 416 IceInternal::OutgoingConnectionFactory::OutgoingConnectionFactory(const CommunicatorPtr& communicator, 417 const InstancePtr& instance) : 418 _communicator(communicator), 419 _instance(instance), 420 _monitor(new FactoryACMMonitor(instance, instance->clientACM())), 421 _destroyed(false), 422 _pendingConnectCount(0) 423 { 424 } 425 426 IceInternal::OutgoingConnectionFactory::~OutgoingConnectionFactory() 427 { 428 assert(_destroyed); 429 assert(_connections.empty()); 430 assert(_connectionsByEndpoint.empty()); 431 assert(_pending.empty()); 432 assert(_pendingConnectCount == 0); 433 } 434 435 vector<EndpointIPtr> 436 IceInternal::OutgoingConnectionFactory::applyOverrides(const vector<EndpointIPtr>& endpts) 437 { 438 DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); 439 vector<EndpointIPtr> endpoints = endpts; 440 for(vector<EndpointIPtr>::iterator p = endpoints.begin(); p != endpoints.end(); ++p) 441 { 442 // 443 // Modify endpoints with overrides. 444 // 445 if(defaultsAndOverrides->overrideTimeout) 446 { 447 *p = (*p)->timeout(defaultsAndOverrides->overrideTimeoutValue); 448 } 449 } 450 return endpoints; 451 } 452 453 ConnectionIPtr 454 IceInternal::OutgoingConnectionFactory::findConnection(const vector<EndpointIPtr>& endpoints, bool& compress) 455 { 456 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 457 if(_destroyed) 458 { 459 throw CommunicatorDestroyedException(__FILE__, __LINE__); 460 } 461 462 DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); 463 assert(!endpoints.empty()); 464 for(vector<EndpointIPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) 465 { 466 #ifdef ICE_CPP11_MAPPING 467 auto connection = find(_connectionsByEndpoint, *p, 468 [](const ConnectionIPtr& conn) 469 { 470 return conn->isActiveOrHolding(); 471 }); 472 #else 473 ConnectionIPtr connection = find(_connectionsByEndpoint, *p, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); 474 #endif 475 if(connection) 476 { 477 if(defaultsAndOverrides->overrideCompress) 478 { 479 compress = defaultsAndOverrides->overrideCompressValue; 480 } 481 else 482 { 483 compress = (*p)->compress(); 484 } 485 return connection; 486 } 487 } 488 return 0; 489 } 490 491 ConnectionIPtr 492 IceInternal::OutgoingConnectionFactory::findConnection(const vector<ConnectorInfo>& connectors, bool& compress) 493 { 494 // This must be called with the mutex locked. 495 496 DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); 497 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 498 { 499 if(_pending.find(p->connector) != _pending.end()) 500 { 501 continue; 502 } 503 504 #ifdef ICE_CPP11_MAPPING 505 auto connection = find(_connections, p->connector, 506 [](const ConnectionIPtr& conn) 507 { 508 return conn->isActiveOrHolding(); 509 }); 510 #else 511 ConnectionIPtr connection = find(_connections, p->connector, Ice::constMemFun(&ConnectionI::isActiveOrHolding)); 512 #endif 513 if(connection) 514 { 515 if(defaultsAndOverrides->overrideCompress) 516 { 517 compress = defaultsAndOverrides->overrideCompressValue; 518 } 519 else 520 { 521 compress = p->endpoint->compress(); 522 } 523 return connection; 524 } 525 } 526 527 return 0; 528 } 529 530 void 531 IceInternal::OutgoingConnectionFactory::incPendingConnectCount() 532 { 533 // 534 // Keep track of the number of pending connects. The outgoing connection factory 535 // waitUntilFinished() method waits for all the pending connects to terminate before 536 // to return. This ensures that the communicator client thread pool isn't destroyed 537 // too soon and will still be available to execute the ice_exception() callbacks for 538 // the asynchronous requests waiting on a connection to be established. 539 // 540 541 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 542 if(_destroyed) 543 { 544 throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); 545 } 546 ++_pendingConnectCount; 547 } 548 549 void 550 IceInternal::OutgoingConnectionFactory::decPendingConnectCount() 551 { 552 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 553 --_pendingConnectCount; 554 assert(_pendingConnectCount >= 0); 555 if(_destroyed && _pendingConnectCount == 0) 556 { 557 notifyAll(); 558 } 559 } 560 561 ConnectionIPtr 562 IceInternal::OutgoingConnectionFactory::getConnection(const vector<ConnectorInfo>& connectors, 563 const ConnectCallbackPtr& cb, 564 bool& compress) 565 { 566 { 567 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 568 if(_destroyed) 569 { 570 throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); 571 } 572 573 // 574 // Reap closed connections 575 // 576 vector<Ice::ConnectionIPtr> cons; 577 _monitor->swapReapedConnections(cons); 578 for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) 579 { 580 remove(_connections, (*p)->connector(), *p); 581 remove(_connectionsByEndpoint, (*p)->endpoint(), *p); 582 remove(_connectionsByEndpoint, (*p)->endpoint()->compress(true), *p); 583 } 584 585 // 586 // Try to get the connection. We may need to wait for other threads to 587 // finish if one of them is currently establishing a connection to one 588 // of our connectors. 589 // 590 while(true) 591 { 592 if(_destroyed) 593 { 594 throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); 595 } 596 597 // 598 // Search for a matching connection. If we find one, we're done. 599 // 600 Ice::ConnectionIPtr connection = findConnection(connectors, compress); 601 if(connection) 602 { 603 return connection; 604 } 605 606 // 607 // Determine whether another thread/request is currently attempting to connect to 608 // one of our endpoints; if so we wait until it's done. 609 // 610 if(addToPending(cb, connectors)) 611 { 612 // 613 // If a callback is not specified we wait until another thread notifies us about a 614 // change to the pending list. Otherwise, if a callback is provided we're done: 615 // when the pending list changes the callback will be notified and will try to 616 // get the connection again. 617 // 618 if(!cb) 619 { 620 wait(); 621 } 622 else 623 { 624 return 0; 625 } 626 } 627 else 628 { 629 // 630 // If no thread is currently establishing a connection to one of our connectors, 631 // we get out of this loop and start the connection establishment to one of the 632 // given connectors. 633 // 634 break; 635 } 636 } 637 } 638 639 // 640 // At this point, we're responsible for establishing the connection to one of 641 // the given connectors. If it's a non-blocking connect, calling nextConnector 642 // will start the connection establishment. Otherwise, we return null to get 643 // the caller to establish the connection. 644 // 645 if(cb) 646 { 647 cb->nextConnector(); 648 } 649 650 return 0; 651 } 652 653 ConnectionIPtr 654 IceInternal::OutgoingConnectionFactory::createConnection(const TransceiverPtr& transceiver, const ConnectorInfo& ci) 655 { 656 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 657 assert(_pending.find(ci.connector) != _pending.end() && transceiver); 658 659 // 660 // Create and add the connection to the connection map. Adding the connection to the map 661 // is necessary to support the interruption of the connection initialization and validation 662 // in case the communicator is destroyed. 663 // 664 Ice::ConnectionIPtr connection; 665 try 666 { 667 if(_destroyed) 668 { 669 throw Ice::CommunicatorDestroyedException(__FILE__, __LINE__); 670 } 671 672 connection = ConnectionI::create(_communicator, _instance, _monitor, transceiver, ci.connector, 673 ci.endpoint->compress(false), ICE_NULLPTR); 674 } 675 catch(const Ice::LocalException&) 676 { 677 try 678 { 679 transceiver->close(); 680 } 681 catch(const Ice::LocalException&) 682 { 683 // Ignore 684 } 685 throw; 686 } 687 688 _connections.insert(pair<const ConnectorPtr, ConnectionIPtr>(ci.connector, connection)); 689 _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint(), connection)); 690 _connectionsByEndpoint.insert(pair<const EndpointIPtr, ConnectionIPtr>(connection->endpoint()->compress(true), 691 connection)); 692 return connection; 693 } 694 695 void 696 IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, 697 const ConnectorInfo& ci, 698 const ConnectionIPtr& connection, 699 const ConnectCallbackPtr& cb) 700 { 701 set<ConnectCallbackPtr> connectionCallbacks; 702 if(cb) 703 { 704 connectionCallbacks.insert(cb); 705 } 706 707 set<ConnectCallbackPtr> callbacks; 708 { 709 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 710 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 711 { 712 map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); 713 if(q != _pending.end()) 714 { 715 for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) 716 { 717 if((*r)->hasConnector(ci)) 718 { 719 connectionCallbacks.insert(*r); 720 } 721 else 722 { 723 callbacks.insert(*r); 724 } 725 } 726 _pending.erase(q); 727 } 728 } 729 730 for(set<ConnectCallbackPtr>::iterator r = connectionCallbacks.begin(); r != connectionCallbacks.end(); ++r) 731 { 732 (*r)->removeFromPending(); 733 callbacks.erase(*r); 734 } 735 for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) 736 { 737 (*r)->removeFromPending(); 738 } 739 notifyAll(); 740 } 741 742 bool compress; 743 DefaultsAndOverridesPtr defaultsAndOverrides = _instance->defaultsAndOverrides(); 744 if(defaultsAndOverrides->overrideCompress) 745 { 746 compress = defaultsAndOverrides->overrideCompressValue; 747 } 748 else 749 { 750 compress = ci.endpoint->compress(); 751 } 752 753 for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) 754 { 755 (*p)->getConnection(); 756 } 757 for(set<ConnectCallbackPtr>::const_iterator p = connectionCallbacks.begin(); p != connectionCallbacks.end(); ++p) 758 { 759 (*p)->setConnection(connection, compress); 760 } 761 } 762 763 void 764 IceInternal::OutgoingConnectionFactory::finishGetConnection(const vector<ConnectorInfo>& connectors, 765 const Ice::LocalException& ex, 766 const ConnectCallbackPtr& cb) 767 { 768 set<ConnectCallbackPtr> failedCallbacks; 769 if(cb) 770 { 771 failedCallbacks.insert(cb); 772 } 773 774 set<ConnectCallbackPtr> callbacks; 775 { 776 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 777 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 778 { 779 map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); 780 if(q != _pending.end()) 781 { 782 for(set<ConnectCallbackPtr>::const_iterator r = q->second.begin(); r != q->second.end(); ++r) 783 { 784 if((*r)->removeConnectors(connectors)) 785 { 786 failedCallbacks.insert(*r); 787 } 788 else 789 { 790 callbacks.insert(*r); 791 } 792 } 793 _pending.erase(q); 794 } 795 } 796 797 for(set<ConnectCallbackPtr>::iterator r = callbacks.begin(); r != callbacks.end(); ++r) 798 { 799 assert(failedCallbacks.find(*r) == failedCallbacks.end()); 800 (*r)->removeFromPending(); 801 } 802 notifyAll(); 803 } 804 805 for(set<ConnectCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p) 806 { 807 (*p)->getConnection(); 808 } 809 for(set<ConnectCallbackPtr>::const_iterator p = failedCallbacks.begin(); p != failedCallbacks.end(); ++p) 810 { 811 (*p)->setException(ex); 812 } 813 } 814 815 bool 816 IceInternal::OutgoingConnectionFactory::addToPending(const ConnectCallbackPtr& cb, 817 const vector<ConnectorInfo>& connectors) 818 { 819 // 820 // Add the callback to each connector pending list. 821 // 822 bool found = false; 823 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 824 { 825 map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); 826 if(q != _pending.end()) 827 { 828 found = true; 829 if(cb) 830 { 831 q->second.insert(cb); 832 } 833 } 834 } 835 836 if(found) 837 { 838 return true; 839 } 840 841 // 842 // If there's no pending connection for the given connectors, we're 843 // responsible for its establishment. We add empty pending lists, 844 // other callbacks to the same connectors will be queued. 845 // 846 for(vector<ConnectorInfo>::const_iterator r = connectors.begin(); r != connectors.end(); ++r) 847 { 848 if(_pending.find(r->connector) == _pending.end()) 849 { 850 _pending.insert(pair<ConnectorPtr, set<ConnectCallbackPtr> >(r->connector, set<ConnectCallbackPtr>())); 851 } 852 } 853 return false; 854 } 855 856 void 857 IceInternal::OutgoingConnectionFactory::removeFromPending(const ConnectCallbackPtr& cb, 858 const vector<ConnectorInfo>& connectors) 859 { 860 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 861 { 862 map<ConnectorPtr, set<ConnectCallbackPtr> >::iterator q = _pending.find(p->connector); 863 if(q != _pending.end()) 864 { 865 q->second.erase(cb); 866 } 867 } 868 } 869 870 void 871 IceInternal::OutgoingConnectionFactory::handleException(const LocalException& ex, bool hasMore) 872 { 873 TraceLevelsPtr traceLevels = _instance->traceLevels(); 874 if(traceLevels->network >= 2) 875 { 876 Trace out(_instance->initializationData().logger, traceLevels->networkCat); 877 878 out << "couldn't resolve endpoint host"; 879 if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) 880 { 881 out << "\n"; 882 } 883 else 884 { 885 if(hasMore) 886 { 887 out << ", trying next endpoint\n"; 888 } 889 else 890 { 891 out << " and no more endpoints to try\n"; 892 } 893 } 894 out << ex; 895 } 896 } 897 898 void 899 IceInternal::OutgoingConnectionFactory::handleConnectionException(const LocalException& ex, bool hasMore) 900 { 901 TraceLevelsPtr traceLevels = _instance->traceLevels(); 902 if(traceLevels->network >= 2) 903 { 904 Trace out(_instance->initializationData().logger, traceLevels->networkCat); 905 906 out << "connection to endpoint failed"; 907 if(dynamic_cast<const CommunicatorDestroyedException*>(&ex)) 908 { 909 out << "\n"; 910 } 911 else 912 { 913 if(hasMore) 914 { 915 out << ", trying next endpoint\n"; 916 } 917 else 918 { 919 out << " and no more endpoints to try\n"; 920 } 921 } 922 out << ex; 923 } 924 } 925 926 IceInternal::OutgoingConnectionFactory::ConnectCallback::ConnectCallback(const InstancePtr& instance, 927 const OutgoingConnectionFactoryPtr& factory, 928 const vector<EndpointIPtr>& endpoints, 929 bool hasMore, 930 const CreateConnectionCallbackPtr& cb, 931 Ice::EndpointSelectionType selType) : 932 _instance(instance), 933 _factory(factory), 934 _endpoints(endpoints), 935 _hasMore(hasMore), 936 _callback(cb), 937 _selType(selType) 938 { 939 _endpointsIter = _endpoints.begin(); 940 } 941 942 // 943 // Methods from ConnectionI.StartCallback 944 // 945 void 946 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartCompleted(const ConnectionIPtr& connection) 947 { 948 if(_observer) 949 { 950 _observer->detach(); 951 } 952 953 connection->activate(); 954 _factory->finishGetConnection(_connectors, *_iter, connection, ICE_SHARED_FROM_THIS); 955 } 956 957 void 958 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailed(const ConnectionIPtr& /*connection*/, 959 const LocalException& ex) 960 { 961 assert(_iter != _connectors.end()); 962 if(connectionStartFailedImpl(ex)) 963 { 964 nextConnector(); 965 } 966 } 967 968 // 969 // Methods from EndpointI_connectors 970 // 971 void 972 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectors(const vector<ConnectorPtr>& connectors) 973 { 974 for(vector<ConnectorPtr>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 975 { 976 _connectors.push_back(ConnectorInfo(*p, *_endpointsIter)); 977 } 978 979 if(++_endpointsIter != _endpoints.end()) 980 { 981 nextEndpoint(); 982 } 983 else 984 { 985 assert(!_connectors.empty()); 986 987 // 988 // We now have all the connectors for the given endpoints. We can try to obtain the 989 // connection. 990 // 991 _iter = _connectors.begin(); 992 getConnection(); 993 } 994 } 995 996 void 997 IceInternal::OutgoingConnectionFactory::ConnectCallback::exception(const Ice::LocalException& ex) 998 { 999 _factory->handleException(ex, _hasMore || _endpointsIter != _endpoints.end() - 1); 1000 if(++_endpointsIter != _endpoints.end()) 1001 { 1002 nextEndpoint(); 1003 } 1004 else if(!_connectors.empty()) 1005 { 1006 // 1007 // We now have all the connectors for the given endpoints. We can try to obtain the 1008 // connection. 1009 // 1010 _iter = _connectors.begin(); 1011 getConnection(); 1012 } 1013 else 1014 { 1015 _callback->setException(ex); 1016 _factory->decPendingConnectCount(); // Must be called last. 1017 } 1018 } 1019 1020 void 1021 IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnectors() 1022 { 1023 try 1024 { 1025 // 1026 // Notify the factory that there's an async connect pending. This is necessary 1027 // to prevent the outgoing connection factory to be destroyed before all the 1028 // pending asynchronous connects are finished. 1029 // 1030 _factory->incPendingConnectCount(); 1031 } 1032 catch(const Ice::LocalException& ex) 1033 { 1034 _callback->setException(ex); 1035 return; 1036 } 1037 1038 nextEndpoint(); 1039 } 1040 1041 void 1042 IceInternal::OutgoingConnectionFactory::ConnectCallback::nextEndpoint() 1043 { 1044 try 1045 { 1046 assert(_endpointsIter != _endpoints.end()); 1047 (*_endpointsIter)->connectors_async(_selType, ICE_SHARED_FROM_THIS); 1048 1049 } 1050 catch(const Ice::LocalException& ex) 1051 { 1052 exception(ex); 1053 } 1054 } 1055 1056 void 1057 IceInternal::OutgoingConnectionFactory::ConnectCallback::getConnection() 1058 { 1059 try 1060 { 1061 // 1062 // If all the connectors have been created, we ask the factory to get a 1063 // connection. 1064 // 1065 bool compress; 1066 Ice::ConnectionIPtr connection = _factory->getConnection(_connectors, ICE_SHARED_FROM_THIS, compress); 1067 if(!connection) 1068 { 1069 // 1070 // A null return value from getConnection indicates that the connection 1071 // is being established and that everthing has been done to ensure that 1072 // the callback will be notified when the connection establishment is 1073 // done or that the callback already obtain the connection. 1074 // 1075 return; 1076 } 1077 1078 _callback->setConnection(connection, compress); 1079 _factory->decPendingConnectCount(); // Must be called last. 1080 } 1081 catch(const Ice::LocalException& ex) 1082 { 1083 _callback->setException(ex); 1084 _factory->decPendingConnectCount(); // Must be called last. 1085 } 1086 } 1087 1088 void 1089 IceInternal::OutgoingConnectionFactory::ConnectCallback::nextConnector() 1090 { 1091 while(true) 1092 { 1093 try 1094 { 1095 const CommunicatorObserverPtr& obsv = _factory->_instance->initializationData().observer; 1096 if(obsv) 1097 { 1098 _observer = obsv->getConnectionEstablishmentObserver(_iter->endpoint, _iter->connector->toString()); 1099 if(_observer) 1100 { 1101 _observer->attach(); 1102 } 1103 } 1104 1105 assert(_iter != _connectors.end()); 1106 1107 if(_instance->traceLevels()->network >= 2) 1108 { 1109 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1110 out << "trying to establish " << _iter->endpoint->protocol() << " connection to " 1111 << _iter->connector->toString(); 1112 } 1113 Ice::ConnectionIPtr connection = _factory->createConnection(_iter->connector->connect(), *_iter); 1114 connection->start(ICE_SHARED_FROM_THIS); 1115 } 1116 catch(const Ice::LocalException& ex) 1117 { 1118 if(_instance->traceLevels()->network >= 2) 1119 { 1120 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1121 out << "failed to establish " << _iter->endpoint->protocol() << " connection to " 1122 << _iter->connector->toString() << "\n" << ex; 1123 } 1124 1125 if(connectionStartFailedImpl(ex)) 1126 { 1127 continue; // More connectors to try, continue. 1128 } 1129 } 1130 break; 1131 } 1132 } 1133 1134 void 1135 IceInternal::OutgoingConnectionFactory::ConnectCallback::setConnection(const Ice::ConnectionIPtr& connection, 1136 bool compress) 1137 { 1138 // 1139 // Callback from the factory: the connection to one of the callback 1140 // connectors has been established. 1141 // 1142 _callback->setConnection(connection, compress); 1143 _factory->decPendingConnectCount(); // Must be called last. 1144 } 1145 1146 void 1147 IceInternal::OutgoingConnectionFactory::ConnectCallback::setException(const Ice::LocalException& ex) 1148 { 1149 // 1150 // Callback from the factory: connection establishment failed. 1151 // 1152 _callback->setException(ex); 1153 _factory->decPendingConnectCount(); // Must be called last. 1154 } 1155 1156 bool 1157 IceInternal::OutgoingConnectionFactory::ConnectCallback::hasConnector(const ConnectorInfo& ci) 1158 { 1159 return find(_connectors.begin(), _connectors.end(), ci) != _connectors.end(); 1160 } 1161 1162 bool 1163 IceInternal::OutgoingConnectionFactory::ConnectCallback::removeConnectors(const vector<ConnectorInfo>& connectors) 1164 { 1165 // 1166 // Callback from the factory: connecting to the given connectors 1167 // failed, we remove the connectors and return true if there's 1168 // no more connectors left to try. 1169 // 1170 for(vector<ConnectorInfo>::const_iterator p = connectors.begin(); p != connectors.end(); ++p) 1171 { 1172 _connectors.erase(remove(_connectors.begin(), _connectors.end(), *p), _connectors.end()); 1173 } 1174 return _connectors.empty(); 1175 } 1176 1177 void 1178 IceInternal::OutgoingConnectionFactory::ConnectCallback::removeFromPending() 1179 { 1180 _factory->removeFromPending(ICE_SHARED_FROM_THIS, _connectors); 1181 } 1182 1183 bool 1184 IceInternal::OutgoingConnectionFactory::ConnectCallback::operator<(const ConnectCallback& rhs) const 1185 { 1186 return this < &rhs; 1187 } 1188 1189 bool 1190 IceInternal::OutgoingConnectionFactory::ConnectCallback::connectionStartFailedImpl(const Ice::LocalException& ex) 1191 { 1192 if(_observer) 1193 { 1194 _observer->failed(ex.ice_id()); 1195 _observer->detach(); 1196 } 1197 1198 _factory->handleConnectionException(ex, _hasMore || _iter != _connectors.end() - 1); 1199 if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) // No need to continue. 1200 { 1201 _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS); 1202 } 1203 else if(++_iter != _connectors.end()) // Try the next connector. 1204 { 1205 return true; 1206 } 1207 else 1208 { 1209 _factory->finishGetConnection(_connectors, ex, ICE_SHARED_FROM_THIS); 1210 } 1211 return false; 1212 } 1213 1214 void 1215 IceInternal::IncomingConnectionFactory::activate() 1216 { 1217 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1218 setState(StateActive); 1219 } 1220 1221 void 1222 IceInternal::IncomingConnectionFactory::hold() 1223 { 1224 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1225 setState(StateHolding); 1226 } 1227 1228 void 1229 IceInternal::IncomingConnectionFactory::destroy() 1230 { 1231 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1232 setState(StateClosed); 1233 } 1234 1235 void 1236 IceInternal::IncomingConnectionFactory::updateConnectionObservers() 1237 { 1238 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1239 #ifdef ICE_CPP11_COMPILER 1240 for(const auto& conn : _connections) 1241 { 1242 conn->updateObserver(); 1243 } 1244 #else 1245 for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::updateObserver)); 1246 #endif 1247 } 1248 1249 void 1250 IceInternal::IncomingConnectionFactory::waitUntilHolding() const 1251 { 1252 set<ConnectionIPtr> connections; 1253 1254 { 1255 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1256 1257 // 1258 // First we wait until the connection factory itself is in holding 1259 // state. 1260 // 1261 while(_state < StateHolding) 1262 { 1263 wait(); 1264 } 1265 1266 // 1267 // We want to wait until all connections are in holding state 1268 // outside the thread synchronization. 1269 // 1270 connections = _connections; 1271 } 1272 1273 // 1274 // Now we wait until each connection is in holding state. 1275 // 1276 #ifdef ICE_CPP11_COMPILER 1277 for(const auto& conn : connections) 1278 { 1279 conn->waitUntilHolding(); 1280 } 1281 #else 1282 for_each(connections.begin(), connections.end(), Ice::constVoidMemFun(&ConnectionI::waitUntilHolding)); 1283 #endif 1284 } 1285 1286 void 1287 IceInternal::IncomingConnectionFactory::waitUntilFinished() 1288 { 1289 set<ConnectionIPtr> connections; 1290 { 1291 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1292 1293 // 1294 // First we wait until the factory is destroyed. If we are using 1295 // an acceptor, we also wait for it to be closed. 1296 // 1297 while(_state != StateFinished) 1298 { 1299 wait(); 1300 } 1301 1302 // 1303 // Clear the OA. See bug 1673 for the details of why this is necessary. 1304 // 1305 _adapter = 0; 1306 1307 // We want to wait until all connections are finished outside the 1308 // thread synchronization. 1309 // 1310 connections = _connections; 1311 } 1312 1313 #ifdef ICE_CPP11_COMPILER 1314 for(const auto& conn : connections) 1315 { 1316 conn->waitUntilFinished(); 1317 } 1318 #else 1319 for_each(connections.begin(), connections.end(), Ice::voidMemFun(&ConnectionI::waitUntilFinished)); 1320 #endif 1321 1322 { 1323 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1324 if(_transceiver) 1325 { 1326 assert(_connections.size() <= 1); // The connection isn't monitored or reaped. 1327 } 1328 else 1329 { 1330 // Ensure all the connections are finished and reapable at this point. 1331 vector<Ice::ConnectionIPtr> cons; 1332 _monitor->swapReapedConnections(cons); 1333 assert(cons.size() == _connections.size()); 1334 cons.clear(); 1335 } 1336 _connections.clear(); 1337 } 1338 1339 // 1340 // Must be destroyed outside the synchronization since this might block waiting for 1341 // a timer task to complete. 1342 // 1343 _monitor->destroy(); 1344 } 1345 1346 bool 1347 IceInternal::IncomingConnectionFactory::isLocal(const EndpointIPtr& endpoint) const 1348 { 1349 if(_publishedEndpoint && endpoint->equivalent(_publishedEndpoint)) 1350 { 1351 return true; 1352 } 1353 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1354 return endpoint->equivalent(_endpoint); 1355 } 1356 1357 EndpointIPtr 1358 IceInternal::IncomingConnectionFactory::endpoint() const 1359 { 1360 if(_publishedEndpoint) 1361 { 1362 return _publishedEndpoint; 1363 } 1364 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1365 return _endpoint; 1366 } 1367 1368 list<ConnectionIPtr> 1369 IceInternal::IncomingConnectionFactory::connections() const 1370 { 1371 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1372 1373 list<ConnectionIPtr> result; 1374 1375 // 1376 // Only copy connections which have not been destroyed. 1377 // 1378 #ifdef ICE_CPP11_COMPILER 1379 remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), 1380 [](const ConnectionIPtr& conn) 1381 { 1382 return !conn->isActiveOrHolding(); 1383 }); 1384 #else 1385 remove_copy_if(_connections.begin(), _connections.end(), back_inserter(result), 1386 not1(Ice::constMemFun(&ConnectionI::isActiveOrHolding))); 1387 #endif 1388 return result; 1389 } 1390 1391 void 1392 IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchAsyncPtr& outAsync, 1393 Ice::CompressBatch compress) 1394 { 1395 list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. 1396 1397 for(list<ConnectionIPtr>::const_iterator p = c.begin(); p != c.end(); ++p) 1398 { 1399 try 1400 { 1401 outAsync->flushConnection(*p, compress); 1402 } 1403 catch(const LocalException&) 1404 { 1405 // Ignore. 1406 } 1407 } 1408 } 1409 1410 #if defined(ICE_USE_IOCP) || defined(ICE_OS_UWP) 1411 bool 1412 IceInternal::IncomingConnectionFactory::startAsync(SocketOperation) 1413 { 1414 assert(_acceptor); 1415 if(_state >= StateClosed) 1416 { 1417 return false; 1418 } 1419 1420 try 1421 { 1422 _acceptor->startAccept(); 1423 } 1424 catch(const Ice::LocalException& ex) 1425 { 1426 ICE_SET_EXCEPTION_FROM_CLONE(_acceptorException, ex.ice_clone()); 1427 _acceptor->getNativeInfo()->completed(SocketOperationRead); 1428 } 1429 return true; 1430 } 1431 1432 bool 1433 IceInternal::IncomingConnectionFactory::finishAsync(SocketOperation) 1434 { 1435 assert(_acceptor); 1436 try 1437 { 1438 if(_acceptorException) 1439 { 1440 _acceptorException->ice_throw(); 1441 } 1442 _acceptor->finishAccept(); 1443 } 1444 catch(const LocalException& ex) 1445 { 1446 _acceptorException.reset(ICE_NULLPTR); 1447 1448 Error out(_instance->initializationData().logger); 1449 out << "couldn't accept connection:\n" << ex << '\n' << _acceptor->toString(); 1450 if(_acceptorStarted) 1451 { 1452 _acceptorStarted = false; 1453 if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) 1454 { 1455 closeAcceptor(); 1456 } 1457 } 1458 } 1459 return _state < StateClosed; 1460 } 1461 #endif 1462 1463 void 1464 IceInternal::IncomingConnectionFactory::message(ThreadPoolCurrent& current) 1465 { 1466 ConnectionIPtr connection; 1467 1468 ThreadPoolMessage<IncomingConnectionFactory> msg(current, *this); 1469 1470 { 1471 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1472 1473 ThreadPoolMessage<IncomingConnectionFactory>::IOScope io(msg); 1474 if(!io) 1475 { 1476 return; 1477 } 1478 1479 if(_state >= StateClosed) 1480 { 1481 return; 1482 } 1483 else if(_state == StateHolding) 1484 { 1485 IceUtil::ThreadControl::yield(); 1486 return; 1487 } 1488 1489 // 1490 // Reap closed connections 1491 // 1492 vector<Ice::ConnectionIPtr> cons; 1493 _monitor->swapReapedConnections(cons); 1494 for(vector<Ice::ConnectionIPtr>::const_iterator p = cons.begin(); p != cons.end(); ++p) 1495 { 1496 _connections.erase(*p); 1497 } 1498 1499 if(!_acceptorStarted) 1500 { 1501 return; 1502 } 1503 1504 // 1505 // Now accept a new connection. 1506 // 1507 TransceiverPtr transceiver; 1508 try 1509 { 1510 transceiver = _acceptor->accept(); 1511 1512 if(_instance->traceLevels()->network >= 2) 1513 { 1514 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1515 out << "trying to accept " << _endpoint->protocol() << " connection\n" << transceiver->toString(); 1516 } 1517 } 1518 catch(const SocketException& ex) 1519 { 1520 if(noMoreFds(ex.error)) 1521 { 1522 Error out(_instance->initializationData().logger); 1523 out << "can't accept more connections:\n" << ex << '\n' << _acceptor->toString(); 1524 1525 assert(_acceptorStarted); 1526 _acceptorStarted = false; 1527 if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) 1528 { 1529 closeAcceptor(); 1530 } 1531 } 1532 1533 // Ignore socket exceptions. 1534 return; 1535 } 1536 catch(const LocalException& ex) 1537 { 1538 // Warn about other Ice local exceptions. 1539 if(_warn) 1540 { 1541 Warning out(_instance->initializationData().logger); 1542 out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); 1543 } 1544 return; 1545 } 1546 1547 assert(transceiver); 1548 1549 try 1550 { 1551 connection = ConnectionI::create(_adapter->getCommunicator(), _instance, _monitor, transceiver, 0, 1552 _endpoint, _adapter); 1553 } 1554 catch(const LocalException& ex) 1555 { 1556 try 1557 { 1558 transceiver->close(); 1559 } 1560 catch(const Ice::LocalException&) 1561 { 1562 // Ignore. 1563 } 1564 1565 if(_warn) 1566 { 1567 Warning out(_instance->initializationData().logger); 1568 out << "connection exception:\n" << ex << '\n' << _acceptor->toString(); 1569 } 1570 return; 1571 } 1572 1573 _connections.insert(connection); 1574 } 1575 1576 assert(connection); 1577 1578 connection->start(ICE_SHARED_FROM_THIS); 1579 } 1580 1581 void 1582 IceInternal::IncomingConnectionFactory::finished(ThreadPoolCurrent&, bool close) 1583 { 1584 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1585 if(_state < StateClosed) 1586 { 1587 if(close) 1588 { 1589 closeAcceptor(); 1590 } 1591 1592 // 1593 // If the acceptor hasn't been explicitly stopped (which is the case if the acceptor got closed 1594 // because of an unexpected error), try to restart the acceptor in 1 second. 1595 // 1596 if(!_acceptorStopped) 1597 { 1598 _instance->timer()->schedule(ICE_MAKE_SHARED(StartAcceptor, ICE_SHARED_FROM_THIS, _instance), 1599 IceUtil::Time::seconds(1)); 1600 } 1601 return; 1602 } 1603 1604 assert(_state >= StateClosed); 1605 setState(StateFinished); 1606 1607 if(close) 1608 { 1609 closeAcceptor(); 1610 } 1611 1612 #if TARGET_OS_IPHONE != 0 1613 sync.release(); 1614 finish(); 1615 #endif 1616 } 1617 1618 #if TARGET_OS_IPHONE != 0 1619 void 1620 IceInternal::IncomingConnectionFactory::finish() 1621 { 1622 unregisterForBackgroundNotification(ICE_SHARED_FROM_THIS); 1623 } 1624 #endif 1625 1626 string 1627 IceInternal::IncomingConnectionFactory::toString() const 1628 { 1629 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1630 if(_transceiver) 1631 { 1632 return _transceiver->toString(); 1633 } 1634 else if(_acceptor) 1635 { 1636 return _acceptor->toString(); 1637 } 1638 else 1639 { 1640 return string(); 1641 } 1642 } 1643 1644 NativeInfoPtr 1645 IceInternal::IncomingConnectionFactory::getNativeInfo() 1646 { 1647 if(_transceiver) 1648 { 1649 return _transceiver->getNativeInfo(); 1650 } 1651 else if(_acceptor) 1652 { 1653 return _acceptor->getNativeInfo(); 1654 } 1655 else 1656 { 1657 return 0; 1658 } 1659 } 1660 1661 void 1662 IceInternal::IncomingConnectionFactory::connectionStartCompleted(const Ice::ConnectionIPtr& connection) 1663 { 1664 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1665 1666 // 1667 // Initialy, connections are in the holding state. If the factory is active 1668 // we activate the connection. 1669 // 1670 if(_state == StateActive) 1671 { 1672 connection->activate(); 1673 } 1674 } 1675 1676 void 1677 IceInternal::IncomingConnectionFactory::connectionStartFailed(const Ice::ConnectionIPtr& /*connection*/, 1678 const Ice::LocalException&) 1679 { 1680 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1681 if(_state >= StateClosed) 1682 { 1683 return; 1684 } 1685 1686 // 1687 // Do not warn about connection exceptions here. The connection is not yet validated. 1688 // 1689 } 1690 1691 // 1692 // COMPILERFIX: The ConnectionFactory setup is broken out into a separate initialize 1693 // function because when it was part of the constructor C++Builder 2007 apps would 1694 // crash if an execption was thrown from any calls within the constructor. 1695 // 1696 IceInternal::IncomingConnectionFactory::IncomingConnectionFactory(const InstancePtr& instance, 1697 const EndpointIPtr& endpoint, 1698 const EndpointIPtr& publishedEndpoint, 1699 const ObjectAdapterIPtr& adapter) : 1700 _instance(instance), 1701 _monitor(new FactoryACMMonitor(instance, dynamic_cast<ObjectAdapterI*>(adapter.get())->getACM())), 1702 _endpoint(endpoint), 1703 _publishedEndpoint(publishedEndpoint), 1704 _acceptorStarted(false), 1705 _acceptorStopped(false), 1706 _adapter(adapter), 1707 _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), 1708 _state(StateHolding) 1709 { 1710 } 1711 1712 void 1713 IceInternal::IncomingConnectionFactory::startAcceptor() 1714 { 1715 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1716 if(_state >= StateClosed || _acceptorStarted) 1717 { 1718 return; 1719 } 1720 1721 _acceptorStopped = false; 1722 createAcceptor(); 1723 } 1724 1725 void 1726 IceInternal::IncomingConnectionFactory::stopAcceptor() 1727 { 1728 IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); 1729 if(_state >= StateClosed || !_acceptorStarted) 1730 { 1731 return; 1732 } 1733 1734 _acceptorStopped = true; 1735 _acceptorStarted = false; 1736 if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) 1737 { 1738 closeAcceptor(); 1739 } 1740 } 1741 1742 void 1743 IceInternal::IncomingConnectionFactory::initialize() 1744 { 1745 if(_instance->defaultsAndOverrides()->overrideTimeout) 1746 { 1747 _endpoint = _endpoint->timeout(_instance->defaultsAndOverrides()->overrideTimeoutValue); 1748 } 1749 1750 if(_instance->defaultsAndOverrides()->overrideCompress) 1751 { 1752 _endpoint = _endpoint->compress(_instance->defaultsAndOverrides()->overrideCompressValue); 1753 } 1754 try 1755 { 1756 const_cast<TransceiverPtr&>(_transceiver) = _endpoint->transceiver(); 1757 if(_transceiver) 1758 { 1759 if(_instance->traceLevels()->network >= 2) 1760 { 1761 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1762 out << "attempting to bind to " << _endpoint->protocol() << " socket\n" << _transceiver->toString(); 1763 } 1764 const_cast<EndpointIPtr&>(_endpoint) = _transceiver->bind(); 1765 ConnectionIPtr connection(ConnectionI::create(_adapter->getCommunicator(), _instance, 0, _transceiver, 0, 1766 _endpoint, _adapter)); 1767 connection->start(0); 1768 _connections.insert(connection); 1769 } 1770 else 1771 { 1772 #if TARGET_OS_IPHONE != 0 1773 // 1774 // The notification center will call back on the factory to 1775 // start the acceptor if necessary. 1776 // 1777 registerForBackgroundNotification(ICE_SHARED_FROM_THIS); 1778 #else 1779 createAcceptor(); 1780 #endif 1781 } 1782 } 1783 catch(const Ice::Exception&) 1784 { 1785 if(_transceiver) 1786 { 1787 try 1788 { 1789 _transceiver->close(); 1790 } 1791 catch(const Ice::LocalException&) 1792 { 1793 // Ignore 1794 } 1795 } 1796 1797 _state = StateFinished; 1798 _monitor->destroy(); 1799 _connections.clear(); 1800 throw; 1801 } 1802 } 1803 1804 IceInternal::IncomingConnectionFactory::~IncomingConnectionFactory() 1805 { 1806 assert(_state == StateFinished); 1807 assert(_connections.empty()); 1808 } 1809 1810 void 1811 IceInternal::IncomingConnectionFactory::setState(State state) 1812 { 1813 if(_state == state) // Don't switch twice. 1814 { 1815 return; 1816 } 1817 1818 switch(state) 1819 { 1820 case StateActive: 1821 { 1822 if(_state != StateHolding) // Can only switch from holding to active. 1823 { 1824 return; 1825 } 1826 if(_acceptor) 1827 { 1828 if(_instance->traceLevels()->network >= 1) 1829 { 1830 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1831 out << "accepting " << _endpoint->protocol() << " connections at " << _acceptor->toString(); 1832 } 1833 _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead); 1834 } 1835 #ifdef ICE_CPP11_COMPILER 1836 for(const auto& conn : _connections) 1837 { 1838 conn->activate(); 1839 } 1840 #else 1841 for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::activate)); 1842 #endif 1843 break; 1844 } 1845 1846 case StateHolding: 1847 { 1848 if(_state != StateActive) // Can only switch from active to holding. 1849 { 1850 return; 1851 } 1852 if(_acceptor) 1853 { 1854 if(_instance->traceLevels()->network >= 1) 1855 { 1856 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1857 out << "holding " << _endpoint->protocol() << " connections at " << _acceptor->toString(); 1858 } 1859 _adapter->getThreadPool()->unregister(ICE_SHARED_FROM_THIS, SocketOperationRead); 1860 } 1861 #ifdef ICE_CPP11_COMPILER 1862 for(const auto& conn : _connections) 1863 { 1864 conn->hold(); 1865 } 1866 #else 1867 for_each(_connections.begin(), _connections.end(), Ice::voidMemFun(&ConnectionI::hold)); 1868 #endif 1869 break; 1870 } 1871 1872 case StateClosed: 1873 { 1874 if(_acceptorStarted) 1875 { 1876 // 1877 // If possible, close the acceptor now to prevent new connections from 1878 // being accepted while we are deactivating. This is especially useful 1879 // if there are no more threads in the thread pool available to dispatch 1880 // the finish() call. Not all selector implementations do support this 1881 // however. 1882 // 1883 _acceptorStarted = false; 1884 if(_adapter->getThreadPool()->finish(ICE_SHARED_FROM_THIS, true)) 1885 { 1886 closeAcceptor(); 1887 } 1888 } 1889 else 1890 { 1891 #if TARGET_OS_IPHONE != 0 1892 _adapter->getThreadPool()->dispatch(new FinishCall(ICE_SHARED_FROM_THIS)); 1893 #endif 1894 state = StateFinished; 1895 } 1896 1897 #ifdef ICE_CPP11_COMPILER 1898 for(const auto& conn : _connections) 1899 { 1900 conn->destroy(ConnectionI::ObjectAdapterDeactivated); 1901 } 1902 #else 1903 for_each(_connections.begin(), _connections.end(), 1904 bind2nd(Ice::voidMemFun1(&ConnectionI::destroy), ConnectionI::ObjectAdapterDeactivated)); 1905 #endif 1906 break; 1907 } 1908 1909 case StateFinished: 1910 { 1911 assert(_state == StateClosed); 1912 break; 1913 } 1914 } 1915 1916 _state = state; 1917 notifyAll(); 1918 } 1919 1920 void 1921 IceInternal::IncomingConnectionFactory::createAcceptor() 1922 { 1923 try 1924 { 1925 assert(!_acceptorStarted); 1926 _acceptor = _endpoint->acceptor(_adapter->getName()); 1927 assert(_acceptor); 1928 if(_instance->traceLevels()->network >= 2) 1929 { 1930 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1931 out << "attempting to bind to " << _endpoint->protocol() << " socket " << _acceptor->toString(); 1932 } 1933 1934 _endpoint = _acceptor->listen(); 1935 if(_instance->traceLevels()->network >= 1) 1936 { 1937 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1938 out << "listening for " << _endpoint->protocol() << " connections\n" << _acceptor->toDetailedString(); 1939 } 1940 1941 _adapter->getThreadPool()->initialize(ICE_SHARED_FROM_THIS); 1942 if(_state == StateActive) 1943 { 1944 _adapter->getThreadPool()->_register(ICE_SHARED_FROM_THIS, SocketOperationRead); 1945 } 1946 1947 _acceptorStarted = true; 1948 } 1949 catch(const Ice::Exception&) 1950 { 1951 if(_acceptor) 1952 { 1953 _acceptor->close(); 1954 } 1955 throw; 1956 } 1957 } 1958 1959 void 1960 IceInternal::IncomingConnectionFactory::closeAcceptor() 1961 { 1962 assert(_acceptor); 1963 1964 if(_instance->traceLevels()->network >= 1) 1965 { 1966 Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); 1967 out << "stopping to accept " << _endpoint->protocol() << " connections at " << _acceptor->toString(); 1968 } 1969 1970 assert(!_acceptorStarted); 1971 _acceptor->close(); 1972 } 1973