//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

namespace IceInternal
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Text;

    //
    // A dictionary whose values are collections: each key maps to the
    // collection of all values that were added under that key.
    //
    public class MultiDictionary<K, V> : Dictionary<K, ICollection<V>>
    {
        //
        // Adds value to the collection stored under key, creating the
        // collection (a List<V>) the first time the key is seen.
        //
        public void Add(K key, V value)
        {
            ICollection<V> list = null;
            if(!TryGetValue(key, out list))
            {
                list = new List<V>();
                Add(key, list);
            }
            list.Add(value);
        }

        //
        // Removes value from the collection stored under key and drops the
        // key once its collection becomes empty. Removing from a key that is
        // not present is a no-op, mirroring Dictionary.Remove(key), instead
        // of throwing KeyNotFoundException from the indexer.
        //
        public void Remove(K key, V value)
        {
            ICollection<V> list;
            if(!TryGetValue(key, out list))
            {
                return;
            }

            list.Remove(value);
            if(list.Count == 0)
            {
                Remove(key);
            }
        }
    }

    //
    // Factory for outgoing connections. Established connections are tracked
    // per connector and per endpoint so that they can be looked up and reused.
    //
    public sealed class OutgoingConnectionFactory
    {
        //
        // Receives the outcome of create(): either a connection together with
        // its compression flag, or the exception that made the attempt fail.
        //
        public interface CreateConnectionCallback
        {
            void setConnection(Ice.ConnectionI connection, bool compress);
            void setException(Ice.LocalException ex);
        }

        //
        // Destroys every tracked connection, marks the factory as destroyed
        // and wakes up all threads waiting on the factory. Calling it again
        // once the factory is destroyed does nothing.
        //
        public void destroy()
        {
            lock(this)
            {
                if(_destroyed)
                {
                    return;
                }

                foreach(ICollection<Ice.ConnectionI> connections in _connections.Values)
                {
                    foreach(Ice.ConnectionI c in connections)
                    {
                        c.destroy(Ice.ConnectionI.CommunicatorDestroyed);
                    }
                }

                _destroyed = true;
                _communicator = null;
                System.Threading.Monitor.PulseAll(this);
            }
        }

        //
        // Calls updateObserver() on every tracked connection.
        //
        public void updateConnectionObservers()
        {
            lock(this)
            {
                foreach(ICollection<Ice.ConnectionI> connections in _connections.Values)
                {
                    foreach(Ice.ConnectionI c in connections)
                    {
                        c.updateObserver();
                    }
                }
            }
        }

        //
        // Blocks until the factory is destroyed, no connection establishment
        // is pending and every tracked connection has finished; then clears
        // the connection maps and destroys the ACM monitor.
        //
        public void waitUntilFinished()
        {
            Dictionary<Connector, ICollection<Ice.ConnectionI>> connections = null;
            lock(this)
            {
                //
                // First we wait until the factory is destroyed. We also
                // wait until there are no pending connections
                // anymore. Only then can we be sure that _connections
                // contains all connections.
                //
                while(!_destroyed || _pending.Count > 0 || _pendingConnectCount > 0)
                {
                    System.Threading.Monitor.Wait(this);
                }

                //
                // We want to wait until all connections are finished outside the
                // thread synchronization.
                //
                connections = new Dictionary<Connector, ICollection<Ice.ConnectionI>>(_connections);
            }

            //
            // Now we wait until the destruction of each connection is finished.
            //
            foreach(ICollection<Ice.ConnectionI> cl in connections.Values)
            {
                foreach(Ice.ConnectionI c in cl)
                {
                    c.waitUntilFinished();
                }
            }

            lock(this)
            {
                // Ensure all the connections are finished and reapable at this point.
                ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections();
                if(cons != null)
                {
                    int size = 0;
                    foreach(ICollection<Ice.ConnectionI> cl in _connections.Values)
                    {
                        size += cl.Count;
                    }
                    Debug.Assert(cons.Count == size);
                    _connections.Clear();
                    _connectionsByEndpoint.Clear();
                }
                else
                {
                    Debug.Assert(_connections.Count == 0);
                    Debug.Assert(_connectionsByEndpoint.Count == 0);
                }
            }

            //
            // Must be destroyed outside the synchronization since this might block waiting for
            // a timer task to execute.
            //
            _monitor.destroy();
        }

        //
        // Obtains a connection to one of the given endpoints and reports the
        // result through callback. An existing usable connection is handed
        // to the callback right away; otherwise a ConnectCallback is started
        // to resolve the endpoints' connectors and establish a connection.
        //
        // endpts must not be empty (asserted). hasMore and selType are
        // forwarded unchanged to the ConnectCallback.
        //
        public void create(EndpointI[] endpts, bool hasMore, Ice.EndpointSelectionType selType,
                           CreateConnectionCallback callback)
        {
            Debug.Assert(endpts.Length > 0);

            //
            // Apply the overrides.
            //
            List<EndpointI> endpoints = applyOverrides(endpts);

            //
            // Try to find a connection to one of the given endpoints.
            //
            try
            {
                bool compress;
                Ice.ConnectionI connection = findConnection(endpoints, out compress);
                if(connection != null)
                {
                    callback.setConnection(connection, compress);
                    return;
                }
            }
            catch(Ice.LocalException ex)
            {
                callback.setException(ex);
                return;
            }

            ConnectCallback cb = new ConnectCallback(this, endpoints, hasMore, callback, selType);
            cb.getConnectors();
        }

        //
        // Sets the router's object adapter on every tracked connection whose
        // endpoint matches one of the router's client endpoints. Throws
        // Ice.CommunicatorDestroyedException if the factory is destroyed.
        //
        public void setRouterInfo(IceInternal.RouterInfo routerInfo)
        {
            Debug.Assert(routerInfo != null);
            Ice.ObjectAdapter adapter = routerInfo.getAdapter();
            EndpointI[] endpoints = routerInfo.getClientEndpoints(); // Must be called outside the synchronization

            lock(this)
            {
                if(_destroyed)
                {
                    throw new Ice.CommunicatorDestroyedException();
                }

                //
                // Search for connections to the router's client proxy
                // endpoints, and update the object adapter for such
                // connections, so that callbacks from the router can be
                // received over such connections.
                //
                DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides();
                for(int i = 0; i < endpoints.Length; i++)
                {
                    EndpointI endpoint = endpoints[i];

                    //
                    // Modify endpoints with overrides.
                    //
                    if(defaultsAndOverrides.overrideTimeout)
                    {
                        endpoint = endpoint.timeout(defaultsAndOverrides.overrideTimeoutValue);
                    }

                    //
                    // The Ice.ConnectionI object does not take the compression flag of
                    // endpoints into account, but instead gets the information
                    // about whether messages should be compressed or not from
                    // other sources. In order to allow connection sharing for
                    // endpoints that differ in the value of the compression flag
                    // only, we always set the compression flag to false here in
                    // this connection factory.
                    //
                    endpoint = endpoint.compress(false);

                    foreach(ICollection<Ice.ConnectionI> connections in _connections.Values)
                    {
                        foreach(Ice.ConnectionI connection in connections)
                        {
                            if(connection.endpoint().Equals(endpoint))
                            {
                                connection.setAdapter(adapter);
                            }
                        }
                    }
                }
            }
        }

        //
        // Clears the adapter of every tracked connection that currently uses
        // the given adapter. Does nothing once the factory is destroyed.
        //
        public void removeAdapter(Ice.ObjectAdapter adapter)
        {
            lock(this)
            {
                if(_destroyed)
                {
                    return;
                }

                foreach(ICollection<Ice.ConnectionI> connectionList in _connections.Values)
                {
                    foreach(Ice.ConnectionI connection in connectionList)
                    {
                        if(connection.getAdapter() == adapter)
                        {
                            connection.setAdapter(null);
                        }
                    }
                }
            }
        }

        //
        // Asks outAsync to flush every connection that is active or holding.
        // The connections are collected under the lock and flushed outside
        // of it; a local exception raised while flushing one connection is
        // ignored so that the remaining connections are still flushed.
        //
        public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
        {
            ICollection<Ice.ConnectionI> c = new List<Ice.ConnectionI>();

            lock(this)
            {
                if(!_destroyed)
                {
                    foreach(ICollection<Ice.ConnectionI> connectionList in _connections.Values)
                    {
                        foreach(Ice.ConnectionI conn in connectionList)
                        {
                            if(conn.isActiveOrHolding())
                            {
                                c.Add(conn);
                            }
                        }
                    }
                }
            }

            foreach(Ice.ConnectionI conn in c)
            {
                try
                {
                    outAsync.flushConnection(conn, compressBatch);
                }
                catch(Ice.LocalException)
                {
                    // Ignore.
                }
            }
        }

        //
        // Only for use by Instance.
294 // OutgoingConnectionFactory(Ice.Communicator communicator, Instance instance)295 internal OutgoingConnectionFactory(Ice.Communicator communicator, Instance instance) 296 { 297 _communicator = communicator; 298 _instance = instance; 299 _destroyed = false; 300 _monitor = new FactoryACMMonitor(instance, instance.clientACM()); 301 _pendingConnectCount = 0; 302 } 303 applyOverrides(EndpointI[] endpts)304 private List<EndpointI> applyOverrides(EndpointI[] endpts) 305 { 306 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 307 List<EndpointI> endpoints = new List<EndpointI>(); 308 for(int i = 0; i < endpts.Length; i++) 309 { 310 // 311 // Modify endpoints with overrides. 312 // 313 if(defaultsAndOverrides.overrideTimeout) 314 { 315 endpoints.Add(endpts[i].timeout(defaultsAndOverrides.overrideTimeoutValue)); 316 } 317 else 318 { 319 endpoints.Add(endpts[i]); 320 } 321 } 322 323 return endpoints; 324 } 325 findConnection(List<EndpointI> endpoints, out bool compress)326 private Ice.ConnectionI findConnection(List<EndpointI> endpoints, out bool compress) 327 { 328 lock(this) 329 { 330 if(_destroyed) 331 { 332 throw new Ice.CommunicatorDestroyedException(); 333 } 334 335 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 336 Debug.Assert(endpoints.Count > 0); 337 338 foreach(EndpointI endpoint in endpoints) 339 { 340 ICollection<Ice.ConnectionI> connectionList = null; 341 if(!_connectionsByEndpoint.TryGetValue(endpoint, out connectionList)) 342 { 343 continue; 344 } 345 346 foreach(Ice.ConnectionI connection in connectionList) 347 { 348 if(connection.isActiveOrHolding()) // Don't return destroyed or unvalidated connections 349 { 350 if(defaultsAndOverrides.overrideCompress) 351 { 352 compress = defaultsAndOverrides.overrideCompressValue; 353 } 354 else 355 { 356 compress = endpoint.compress(); 357 } 358 return connection; 359 } 360 } 361 } 362 363 compress = false; // Satisfy the compiler 364 return null; 365 } 366 } 
367 368 // 369 // Must be called while synchronized. 370 // findConnection(List<ConnectorInfo> connectors, out bool compress)371 private Ice.ConnectionI findConnection(List<ConnectorInfo> connectors, out bool compress) 372 { 373 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 374 foreach(ConnectorInfo ci in connectors) 375 { 376 if(_pending.ContainsKey(ci.connector)) 377 { 378 continue; 379 } 380 381 ICollection<Ice.ConnectionI> connectionList = null; 382 if(!_connections.TryGetValue(ci.connector, out connectionList)) 383 { 384 continue; 385 } 386 387 foreach(Ice.ConnectionI connection in connectionList) 388 { 389 if(connection.isActiveOrHolding()) // Don't return destroyed or un-validated connections 390 { 391 if(defaultsAndOverrides.overrideCompress) 392 { 393 compress = defaultsAndOverrides.overrideCompressValue; 394 } 395 else 396 { 397 compress = ci.endpoint.compress(); 398 } 399 return connection; 400 } 401 } 402 } 403 404 compress = false; // Satisfy the compiler 405 return null; 406 } 407 incPendingConnectCount()408 internal void incPendingConnectCount() 409 { 410 // 411 // Keep track of the number of pending connects. The outgoing connection factory 412 // waitUntilFinished() method waits for all the pending connects to terminate before 413 // to return. This ensures that the communicator client thread pool isn't destroyed 414 // too soon and will still be available to execute the ice_exception() callbacks for 415 // the asynchronous requests waiting on a connection to be established. 
416 // 417 418 lock(this) 419 { 420 if(_destroyed) 421 { 422 throw new Ice.CommunicatorDestroyedException(); 423 } 424 ++_pendingConnectCount; 425 } 426 } 427 decPendingConnectCount()428 internal void decPendingConnectCount() 429 { 430 lock(this) 431 { 432 --_pendingConnectCount; 433 Debug.Assert(_pendingConnectCount >= 0); 434 if(_destroyed && _pendingConnectCount == 0) 435 { 436 System.Threading.Monitor.PulseAll(this); 437 } 438 } 439 } 440 getConnection(List<ConnectorInfo> connectors, ConnectCallback cb, out bool compress)441 private Ice.ConnectionI getConnection(List<ConnectorInfo> connectors, ConnectCallback cb, out bool compress) 442 { 443 lock(this) 444 { 445 if(_destroyed) 446 { 447 throw new Ice.CommunicatorDestroyedException(); 448 } 449 450 // 451 // Reap closed connections 452 // 453 ICollection<Ice.ConnectionI> cons = _monitor.swapReapedConnections(); 454 if(cons != null) 455 { 456 foreach(Ice.ConnectionI c in cons) 457 { 458 _connections.Remove(c.connector(), c); 459 _connectionsByEndpoint.Remove(c.endpoint(), c); 460 _connectionsByEndpoint.Remove(c.endpoint().compress(true), c); 461 } 462 } 463 464 // 465 // Try to get the connection. We may need to wait for other threads to 466 // finish if one of them is currently establishing a connection to one 467 // of our connectors. 468 // 469 while(true) 470 { 471 if(_destroyed) 472 { 473 throw new Ice.CommunicatorDestroyedException(); 474 } 475 476 // 477 // Search for a matching connection. If we find one, we're done. 478 // 479 Ice.ConnectionI connection = findConnection(connectors, out compress); 480 if(connection != null) 481 { 482 return connection; 483 } 484 485 if(addToPending(cb, connectors)) 486 { 487 // 488 // If a callback is not specified we wait until another thread notifies us about a 489 // change to the pending list. Otherwise, if a callback is provided we're done: 490 // when the pending list changes the callback will be notified and will try to 491 // get the connection again. 
492 // 493 if(cb == null) 494 { 495 System.Threading.Monitor.Wait(this); 496 } 497 else 498 { 499 return null; 500 } 501 } 502 else 503 { 504 // 505 // If no thread is currently establishing a connection to one of our connectors, 506 // we get out of this loop and start the connection establishment to one of the 507 // given connectors. 508 // 509 break; 510 } 511 } 512 } 513 514 // 515 // At this point, we're responsible for establishing the connection to one of 516 // the given connectors. If it's a non-blocking connect, calling nextConnector 517 // will start the connection establishment. Otherwise, we return null to get 518 // the caller to establish the connection. 519 // 520 if(cb != null) 521 { 522 cb.nextConnector(); 523 } 524 525 compress = false; // Satisfy the compiler 526 return null; 527 } 528 createConnection(Transceiver transceiver, ConnectorInfo ci)529 private Ice.ConnectionI createConnection(Transceiver transceiver, ConnectorInfo ci) 530 { 531 lock(this) 532 { 533 Debug.Assert(_pending.ContainsKey(ci.connector) && transceiver != null); 534 535 // 536 // Create and add the connection to the connection map. Adding the connection to the map 537 // is necessary to support the interruption of the connection initialization and validation 538 // in case the communicator is destroyed. 
539 // 540 Ice.ConnectionI connection; 541 try 542 { 543 if(_destroyed) 544 { 545 throw new Ice.CommunicatorDestroyedException(); 546 } 547 548 connection = new Ice.ConnectionI(_communicator, _instance, _monitor, transceiver, ci.connector, 549 ci.endpoint.compress(false), null); 550 } 551 catch(Ice.LocalException) 552 { 553 try 554 { 555 transceiver.close(); 556 } 557 catch(Ice.LocalException) 558 { 559 // Ignore 560 } 561 throw; 562 } 563 564 _connections.Add(ci.connector, connection); 565 _connectionsByEndpoint.Add(connection.endpoint(), connection); 566 _connectionsByEndpoint.Add(connection.endpoint().compress(true), connection); 567 return connection; 568 } 569 } 570 finishGetConnection(List<ConnectorInfo> connectors, ConnectorInfo ci, Ice.ConnectionI connection, ConnectCallback cb)571 private void finishGetConnection(List<ConnectorInfo> connectors, 572 ConnectorInfo ci, 573 Ice.ConnectionI connection, 574 ConnectCallback cb) 575 { 576 HashSet<ConnectCallback> connectionCallbacks = new HashSet<ConnectCallback>(); 577 if(cb != null) 578 { 579 connectionCallbacks.Add(cb); 580 } 581 582 HashSet<ConnectCallback> callbacks = new HashSet<ConnectCallback>(); 583 lock(this) 584 { 585 foreach(ConnectorInfo c in connectors) 586 { 587 HashSet<ConnectCallback> s = null; 588 if(_pending.TryGetValue(c.connector, out s)) 589 { 590 foreach(ConnectCallback cc in s) 591 { 592 if(cc.hasConnector(ci)) 593 { 594 connectionCallbacks.Add(cc); 595 } 596 else 597 { 598 callbacks.Add(cc); 599 } 600 } 601 _pending.Remove(c.connector); 602 } 603 } 604 605 foreach(ConnectCallback cc in connectionCallbacks) 606 { 607 cc.removeFromPending(); 608 callbacks.Remove(cc); 609 } 610 foreach(ConnectCallback cc in callbacks) 611 { 612 cc.removeFromPending(); 613 } 614 System.Threading.Monitor.PulseAll(this); 615 } 616 617 bool compress; 618 DefaultsAndOverrides defaultsAndOverrides = _instance.defaultsAndOverrides(); 619 if(defaultsAndOverrides.overrideCompress) 620 { 621 compress = 
defaultsAndOverrides.overrideCompressValue; 622 } 623 else 624 { 625 compress = ci.endpoint.compress(); 626 } 627 628 foreach(ConnectCallback cc in callbacks) 629 { 630 cc.getConnection(); 631 } 632 foreach(ConnectCallback cc in connectionCallbacks) 633 { 634 cc.setConnection(connection, compress); 635 } 636 } 637 finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb)638 private void finishGetConnection(List<ConnectorInfo> connectors, Ice.LocalException ex, ConnectCallback cb) 639 { 640 HashSet<ConnectCallback> failedCallbacks = new HashSet<ConnectCallback>(); 641 if(cb != null) 642 { 643 failedCallbacks.Add(cb); 644 } 645 646 HashSet<ConnectCallback> callbacks = new HashSet<ConnectCallback>(); 647 lock(this) 648 { 649 foreach(ConnectorInfo c in connectors) 650 { 651 HashSet<ConnectCallback> s = null; 652 if(_pending.TryGetValue(c.connector, out s)) 653 { 654 foreach(ConnectCallback cc in s) 655 { 656 if(cc.removeConnectors(connectors)) 657 { 658 failedCallbacks.Add(cc); 659 } 660 else 661 { 662 callbacks.Add(cc); 663 } 664 } 665 _pending.Remove(c.connector); 666 } 667 } 668 669 foreach(ConnectCallback cc in callbacks) 670 { 671 Debug.Assert(!failedCallbacks.Contains(cc)); 672 cc.removeFromPending(); 673 } 674 System.Threading.Monitor.PulseAll(this); 675 } 676 677 foreach(ConnectCallback cc in callbacks) 678 { 679 cc.getConnection(); 680 } 681 foreach(ConnectCallback cc in failedCallbacks) 682 { 683 cc.setException(ex); 684 } 685 } 686 handleConnectionException(Ice.LocalException ex, bool hasMore)687 private void handleConnectionException(Ice.LocalException ex, bool hasMore) 688 { 689 TraceLevels traceLevels = _instance.traceLevels(); 690 if(traceLevels.network >= 2) 691 { 692 StringBuilder s = new StringBuilder(); 693 s.Append("connection to endpoint failed"); 694 if(ex is Ice.CommunicatorDestroyedException) 695 { 696 s.Append("\n"); 697 } 698 else 699 { 700 if(hasMore) 701 { 702 s.Append(", trying next endpoint\n"); 703 } 
704 else 705 { 706 s.Append(" and no more endpoints to try\n"); 707 } 708 } 709 s.Append(ex); 710 _instance.initializationData().logger.trace(traceLevels.networkCat, s.ToString()); 711 } 712 } 713 714 private bool addToPending(ConnectCallback cb, List<ConnectorInfo> connectors)715 addToPending(ConnectCallback cb, List<ConnectorInfo> connectors) 716 { 717 // 718 // Add the callback to each connector pending list. 719 // 720 bool found = false; 721 foreach(ConnectorInfo ci in connectors) 722 { 723 HashSet<ConnectCallback> cbs = null; 724 if(_pending.TryGetValue(ci.connector, out cbs)) 725 { 726 found = true; 727 if(cb != null) 728 { 729 cbs.Add(cb); // Add the callback to each pending connector. 730 } 731 } 732 } 733 734 if(found) 735 { 736 return true; 737 } 738 739 // 740 // If there's no pending connection for the given connectors, we're 741 // responsible for its establishment. We add empty pending lists, 742 // other callbacks to the same connectors will be queued. 743 // 744 foreach(ConnectorInfo ci in connectors) 745 { 746 if(!_pending.ContainsKey(ci.connector)) 747 { 748 _pending.Add(ci.connector, new HashSet<ConnectCallback>()); 749 } 750 } 751 return false; 752 } 753 754 private void removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors)755 removeFromPending(ConnectCallback cb, List<ConnectorInfo> connectors) 756 { 757 foreach(ConnectorInfo ci in connectors) 758 { 759 HashSet<ConnectCallback> cbs = null; 760 if(_pending.TryGetValue(ci.connector, out cbs)) 761 { 762 cbs.Remove(cb); 763 } 764 } 765 } 766 handleException(Ice.LocalException ex, bool hasMore)767 internal void handleException(Ice.LocalException ex, bool hasMore) 768 { 769 TraceLevels traceLevels = _instance.traceLevels(); 770 if(traceLevels.network >= 2) 771 { 772 StringBuilder s = new StringBuilder(); 773 s.Append("couldn't resolve endpoint host"); 774 if(ex is Ice.CommunicatorDestroyedException) 775 { 776 s.Append("\n"); 777 } 778 else 779 { 780 if(hasMore) 781 { 782 s.Append(", 
trying next endpoint\n"); 783 } 784 else 785 { 786 s.Append(" and no more endpoints to try\n"); 787 } 788 } 789 s.Append(ex); 790 _instance.initializationData().logger.trace(traceLevels.networkCat, s.ToString()); 791 } 792 } 793 794 private class ConnectorInfo 795 { ConnectorInfo(Connector c, EndpointI e)796 internal ConnectorInfo(Connector c, EndpointI e) 797 { 798 connector = c; 799 endpoint = e; 800 } 801 Equals(object obj)802 public override bool Equals(object obj) 803 { 804 ConnectorInfo r = (ConnectorInfo)obj; 805 return connector.Equals(r.connector); 806 } 807 GetHashCode()808 public override int GetHashCode() 809 { 810 return connector.GetHashCode(); 811 } 812 813 public Connector connector; 814 public EndpointI endpoint; 815 } 816 817 private class ConnectCallback : Ice.ConnectionI.StartCallback, EndpointI_connectors 818 { ConnectCallback(OutgoingConnectionFactory f, List<EndpointI> endpoints, bool more, CreateConnectionCallback cb, Ice.EndpointSelectionType selType)819 internal ConnectCallback(OutgoingConnectionFactory f, List<EndpointI> endpoints, bool more, 820 CreateConnectionCallback cb, Ice.EndpointSelectionType selType) 821 { 822 _factory = f; 823 _endpoints = endpoints; 824 _hasMore = more; 825 _callback = cb; 826 _selType = selType; 827 _endpointsIter = 0; 828 } 829 830 // 831 // Methods from ConnectionI.StartCallback 832 // connectionStartCompleted(Ice.ConnectionI connection)833 public void connectionStartCompleted(Ice.ConnectionI connection) 834 { 835 if(_observer != null) 836 { 837 _observer.detach(); 838 } 839 connection.activate(); 840 _factory.finishGetConnection(_connectors, _current, connection, this); 841 } 842 connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)843 public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex) 844 { 845 if(connectionStartFailedImpl(ex)) 846 { 847 nextConnector(); 848 } 849 } 850 851 // 852 // Methods from EndpointI_connectors 853 // connectors(List<Connector> 
cons)854 public void connectors(List<Connector> cons) 855 { 856 foreach(Connector connector in cons) 857 { 858 _connectors.Add(new ConnectorInfo(connector, _currentEndpoint)); 859 } 860 861 if(_endpointsIter < _endpoints.Count) 862 { 863 nextEndpoint(); 864 } 865 else 866 { 867 Debug.Assert(_connectors.Count > 0); 868 869 // 870 // We now have all the connectors for the given endpoints. We can try to obtain the 871 // connection. 872 // 873 _iter = 0; 874 getConnection(); 875 } 876 } 877 exception(Ice.LocalException ex)878 public void exception(Ice.LocalException ex) 879 { 880 _factory.handleException(ex, _hasMore || _endpointsIter < _endpoints.Count); 881 if(_endpointsIter < _endpoints.Count) 882 { 883 nextEndpoint(); 884 } 885 else if(_connectors.Count > 0) 886 { 887 // 888 // We now have all the connectors for the given endpoints. We can try to obtain the 889 // connection. 890 // 891 _iter = 0; 892 getConnection(); 893 } 894 else 895 { 896 _callback.setException(ex); 897 _factory.decPendingConnectCount(); // Must be called last. 898 } 899 } 900 setConnection(Ice.ConnectionI connection, bool compress)901 public void setConnection(Ice.ConnectionI connection, bool compress) 902 { 903 // 904 // Callback from the factory: the connection to one of the callback 905 // connectors has been established. 906 // 907 _callback.setConnection(connection, compress); 908 _factory.decPendingConnectCount(); // Must be called last. 909 } 910 setException(Ice.LocalException ex)911 public void setException(Ice.LocalException ex) 912 { 913 // 914 // Callback from the factory: connection establishment failed. 915 // 916 _callback.setException(ex); 917 _factory.decPendingConnectCount(); // Must be called last. 
918 } 919 hasConnector(ConnectorInfo ci)920 public bool hasConnector(ConnectorInfo ci) 921 { 922 return _connectors.Contains(ci); 923 } 924 removeConnectors(List<ConnectorInfo> connectors)925 public bool removeConnectors(List<ConnectorInfo> connectors) 926 { 927 foreach(ConnectorInfo ci in connectors) 928 { 929 while(_connectors.Remove(ci)); // Remove all of them. 930 } 931 return _connectors.Count == 0; 932 } 933 removeFromPending()934 public void removeFromPending() 935 { 936 _factory.removeFromPending(this, _connectors); 937 } 938 getConnectors()939 public void getConnectors() 940 { 941 try 942 { 943 // 944 // Notify the factory that there's an async connect pending. This is necessary 945 // to prevent the outgoing connection factory to be destroyed before all the 946 // pending asynchronous connects are finished. 947 // 948 _factory.incPendingConnectCount(); 949 } 950 catch(Ice.LocalException ex) 951 { 952 _callback.setException(ex); 953 return; 954 } 955 956 nextEndpoint(); 957 } 958 nextEndpoint()959 void nextEndpoint() 960 { 961 try 962 { 963 Debug.Assert(_endpointsIter < _endpoints.Count); 964 _currentEndpoint = _endpoints[_endpointsIter++]; 965 _currentEndpoint.connectors_async(_selType, this); 966 } 967 catch(Ice.LocalException ex) 968 { 969 exception(ex); 970 } 971 } 972 getConnection()973 internal void getConnection() 974 { 975 try 976 { 977 // 978 // If all the connectors have been created, we ask the factory to get a 979 // connection. 980 // 981 bool compress; 982 Ice.ConnectionI connection = _factory.getConnection(_connectors, this, out compress); 983 if(connection == null) 984 { 985 // 986 // A null return value from getConnection indicates that the connection 987 // is being established and that everthing has been done to ensure that 988 // the callback will be notified when the connection establishment is 989 // done. 
990 // 991 return; 992 } 993 994 _callback.setConnection(connection, compress); 995 _factory.decPendingConnectCount(); // Must be called last. 996 } 997 catch(Ice.LocalException ex) 998 { 999 _callback.setException(ex); 1000 _factory.decPendingConnectCount(); // Must be called last. 1001 } 1002 } 1003 nextConnector()1004 internal void nextConnector() 1005 { 1006 while(true) 1007 { 1008 try 1009 { 1010 Debug.Assert(_iter < _connectors.Count); 1011 _current = _connectors[_iter++]; 1012 1013 Ice.Instrumentation.CommunicatorObserver obsv = _factory._instance.initializationData().observer; 1014 if(obsv != null) 1015 { 1016 _observer = obsv.getConnectionEstablishmentObserver(_current.endpoint, 1017 _current.connector.ToString()); 1018 if(_observer != null) 1019 { 1020 _observer.attach(); 1021 } 1022 } 1023 1024 if(_factory._instance.traceLevels().network >= 2) 1025 { 1026 StringBuilder s = new StringBuilder("trying to establish "); 1027 s.Append(_current.endpoint.protocol()); 1028 s.Append(" connection to "); 1029 s.Append(_current.connector.ToString()); 1030 _factory._instance.initializationData().logger.trace( 1031 _factory._instance.traceLevels().networkCat, s.ToString()); 1032 } 1033 1034 Ice.ConnectionI connection = _factory.createConnection(_current.connector.connect(), _current); 1035 connection.start(this); 1036 } 1037 catch(Ice.LocalException ex) 1038 { 1039 if(_factory._instance.traceLevels().network >= 2) 1040 { 1041 StringBuilder s = new StringBuilder("failed to establish "); 1042 s.Append(_current.endpoint.protocol()); 1043 s.Append(" connection to "); 1044 s.Append(_current.connector.ToString()); 1045 s.Append("\n"); 1046 s.Append(ex); 1047 _factory._instance.initializationData().logger.trace( 1048 _factory._instance.traceLevels().networkCat, s.ToString()); 1049 } 1050 1051 if(connectionStartFailedImpl(ex)) 1052 { 1053 continue; 1054 } 1055 } 1056 break; 1057 } 1058 } 1059 connectionStartFailedImpl(Ice.LocalException ex)1060 private bool 
connectionStartFailedImpl(Ice.LocalException ex) 1061 { 1062 if(_observer != null) 1063 { 1064 _observer.failed(ex.ice_id()); 1065 _observer.detach(); 1066 } 1067 _factory.handleConnectionException(ex, _hasMore || _iter < _connectors.Count); 1068 if(ex is Ice.CommunicatorDestroyedException) // No need to continue. 1069 { 1070 _factory.finishGetConnection(_connectors, ex, this); 1071 } 1072 else if(_iter < _connectors.Count) // Try the next connector. 1073 { 1074 return true; 1075 } 1076 else 1077 { 1078 _factory.finishGetConnection(_connectors, ex, this); 1079 } 1080 return false; 1081 } 1082 1083 private OutgoingConnectionFactory _factory; 1084 private bool _hasMore; 1085 private CreateConnectionCallback _callback; 1086 private List<EndpointI> _endpoints; 1087 private Ice.EndpointSelectionType _selType; 1088 private int _endpointsIter; 1089 private EndpointI _currentEndpoint; 1090 private List<ConnectorInfo> _connectors = new List<ConnectorInfo>(); 1091 private int _iter; 1092 private ConnectorInfo _current; 1093 private Ice.Instrumentation.Observer _observer; 1094 } 1095 1096 private Ice.Communicator _communicator; 1097 private readonly Instance _instance; 1098 private FactoryACMMonitor _monitor; 1099 private bool _destroyed; 1100 1101 private MultiDictionary<Connector, Ice.ConnectionI> _connections = 1102 new MultiDictionary<Connector, Ice.ConnectionI>(); 1103 private MultiDictionary<EndpointI, Ice.ConnectionI> _connectionsByEndpoint = 1104 new MultiDictionary<EndpointI, Ice.ConnectionI>(); 1105 private Dictionary<Connector, HashSet<ConnectCallback>> _pending = 1106 new Dictionary<Connector, HashSet<ConnectCallback>>(); 1107 private int _pendingConnectCount; 1108 } 1109 1110 public sealed class IncomingConnectionFactory : EventHandler, Ice.ConnectionI.StartCallback 1111 { 1112 private class StartAcceptor : TimerTask 1113 { StartAcceptor(IncomingConnectionFactory factory)1114 public StartAcceptor(IncomingConnectionFactory factory) 1115 { 1116 _factory = factory; 
}

            //
            // Timer entry point: forwards to the owning factory's startAcceptor().
            //
            public void runTimerTask()
            {
                _factory.startAcceptor();
            }

            private IncomingConnectionFactory _factory;
        }

        //
        // Tries to (re)create the acceptor. Nothing happens when the factory is
        // closed/finished or when the acceptor is already started. If the creation
        // throws, the error is logged and a new StartAcceptor task is scheduled on
        // the instance timer with a delay of 1000.
        //
        public void startAcceptor()
        {
            lock(this)
            {
                bool closedOrRunning = _state >= StateClosed || _acceptorStarted;
                if(!closedOrRunning)
                {
                    try
                    {
                        createAcceptor();
                    }
                    catch(Exception failure)
                    {
                        string text = "acceptor creation failed:\n" + failure + '\n' + _acceptor.ToString();
                        _instance.initializationData().logger.error(text);
                        _instance.timer().schedule(new StartAcceptor(this), 1000);
                    }
                }
            }
        }

        //
        // Requests the active state (setState only honors this from holding).
        //
        public void activate()
        {
            lock(this)
            {
                setState(StateActive);
            }
        }

        //
        // Requests the holding state (setState only honors this from active).
        //
        public void hold()
        {
            lock(this)
            {
                setState(StateHolding);
            }
        }

        //
        // Requests the closed state; setState destroys the connections.
        //
        public void destroy()
        {
            lock(this)
            {
                setState(StateClosed);
            }
        }

        //
        // Calls updateObserver() on every connection currently held by the factory.
        //
        public void updateConnectionObservers()
        {
            lock(this)
            {
                foreach(Ice.ConnectionI c in _connections)
                {
                    c.updateObserver();
                }
            }
        }

        //
        // Blocks until the factory and then each of its connections is holding.
        //
        public void waitUntilHolding()
        {
            List<Ice.ConnectionI> snapshot;

            lock(this)
            {
                //
                // Wait for the factory itself to reach (at least) the holding state.
                //
                while(_state < StateHolding)
                {
                    System.Threading.Monitor.Wait(this);
                }

                //
                // Copy the connections so that the per-connection waits below can
                // run outside the thread synchronization.
                //
                snapshot = new List<Ice.ConnectionI>(_connections);
            }

            //
            // Wait for each connection to be in holding state.
            //
            foreach(Ice.ConnectionI c in snapshot)
            {
                c.waitUntilHolding();
            }
        }

        //
        // Blocks until the factory is finished and all its connections are finished,
        // then clears the connection set and destroys the ACM monitor.
        //
        public void waitUntilFinished()
        {
            List<Ice.ConnectionI> snapshot;

            lock(this)
            {
                //
                // Wait for the factory to be destroyed. If an acceptor is in use,
                // this also waits for it to be closed.
                //
                while(_state != StateFinished)
                {
                    System.Threading.Monitor.Wait(this);
                }

                //
                // Clear the OA. See bug 1673 for the details of why this is necessary.
                //
                _adapter = null;

                //
                // Copy the connections so that we wait for them to finish outside
                // the thread synchronization.
                //
                snapshot = new List<Ice.ConnectionI>(_connections);
            }

            foreach(Ice.ConnectionI c in snapshot)
            {
                c.waitUntilFinished();
            }

            lock(this)
            {
                if(_transceiver == null)
                {
                    // Ensure all the connections are finished and reapable at this point.
                    ICollection<Ice.ConnectionI> reaped = _monitor.swapReapedConnections();
                    Debug.Assert((reaped == null ? 0 : reaped.Count) == _connections.Count);
                    if(reaped != null)
                    {
                        reaped.Clear();
                    }
                }
                else
                {
                    Debug.Assert(_connections.Count <= 1); // The connection isn't monitored or reaped.
                }
                _connections.Clear();
            }

            //
            // Must be destroyed outside the synchronization since this might block waiting for
            // a timer task to execute.
            //
            _monitor.destroy();
        }

        //
        // Returns true if the given endpoint is equivalent to the published endpoint
        // (when there is one) or to the factory's own endpoint.
        //
        public bool isLocal(EndpointI endpoint)
        {
            bool matchesPublished = _publishedEndpoint != null && endpoint.equivalent(_publishedEndpoint);
            if(!matchesPublished)
            {
                lock(this)
                {
                    return endpoint.equivalent(_endpoint);
                }
            }
            return true;
        }

        //
        // Returns the published endpoint if one was given, the factory's endpoint otherwise.
        //
        public EndpointI endpoint()
        {
            if(_publishedEndpoint == null)
            {
                lock(this)
                {
                    return _endpoint;
                }
            }
            return _publishedEndpoint;
        }

        //
        // Returns a new collection with the connections for which
        // isActiveOrHolding() is true, i.e. those which have not been destroyed.
        //
        public ICollection<Ice.ConnectionI> connections()
        {
            lock(this)
            {
                List<Ice.ConnectionI> alive = new List<Ice.ConnectionI>();
                foreach(Ice.ConnectionI c in _connections)
                {
                    if(!c.isActiveOrHolding())
                    {
                        continue;
                    }
                    alive.Add(c);
                }
                return alive;
            }
        }

        //
        // Asks outAsync to flush each connection returned by connections().
        // Ice local exceptions raised for a connection are ignored.
        //
        public void flushAsyncBatchRequests(Ice.CompressBatch compressBatch, CommunicatorFlushBatchAsync outAsync)
        {
            //
            // connections() is synchronized, no need to synchronize here.
            //
            foreach(Ice.ConnectionI c in connections())
            {
                try
                {
                    outAsync.flushConnection(c, compressBatch);
                }
                catch(Ice.LocalException)
                {
                    // Ignore.
                }
            }
        }

        //
        // Operations from EventHandler.
//

        //
        // Starts an asynchronous accept on the acceptor. Returns false without doing
        // anything once the factory is closed. An Ice local exception raised by
        // startAccept is stored in _acceptorException and the operation is reported
        // as completed synchronously.
        //
        public override bool startAsync(int operation, AsyncCallback callback, ref bool completedSynchronously)
        {
            if(_state < StateClosed)
            {
                Debug.Assert(_acceptor != null);
                try
                {
                    completedSynchronously = _acceptor.startAccept(callback, this);
                }
                catch(Ice.LocalException failure)
                {
                    _acceptorException = failure;
                    completedSynchronously = true;
                }
                return true;
            }
            return false;
        }

        //
        // Completes the asynchronous accept. A stored or newly raised Ice local
        // exception is logged and, if the acceptor is started, the acceptor is
        // stopped and closed. Returns true while the factory is not closed.
        //
        public override bool finishAsync(int unused)
        {
            try
            {
                if(_acceptorException != null)
                {
                    throw _acceptorException;
                }
                _acceptor.finishAccept();
            }
            catch(Ice.LocalException failure)
            {
                _acceptorException = null;

                string text = "couldn't accept connection:\n" + failure + '\n' + _acceptor.ToString();
                _instance.initializationData().logger.error(text);
                if(_acceptorStarted)
                {
                    _acceptorStarted = false;
                    _adapter.getThreadPool().finish(this);
                    closeAcceptor();
                }
            }
            return _state < StateClosed;
        }

        //
        // Thread pool callback: reaps closed connections, accepts one new
        // transceiver, wraps it in a connection and starts that connection
        // outside the synchronization.
        //
        public override void message(ref ThreadPoolCurrent current)
        {
            Ice.ConnectionI accepted = null;

            ThreadPoolMessage ioScope = new ThreadPoolMessage(this);

            lock(this)
            {
                if(!ioScope.startIOScope(ref current))
                {
                    return;
                }

                try
                {
                    //
                    // Nothing to accept when closed/finished or holding.
                    //
                    if(_state >= StateClosed || _state == StateHolding)
                    {
                        return;
                    }

                    //
                    // Reap closed connections
                    //
                    ICollection<Ice.ConnectionI> reaped = _monitor.swapReapedConnections();
                    if(reaped != null)
                    {
                        foreach(Ice.ConnectionI dead in reaped)
                        {
                            _connections.Remove(dead);
                        }
                    }

                    if(!_acceptorStarted)
                    {
                        return;
                    }

                    //
                    // Now accept a new connection.
                    //
                    Transceiver transceiver = null;
                    try
                    {
                        transceiver = _acceptor.accept();

                        if(_instance.traceLevels().network >= 2)
                        {
                            StringBuilder trace = new StringBuilder("trying to accept ");
                            trace.Append(_endpoint.protocol()).Append(" connection\n");
                            trace.Append(transceiver.ToString());
                            _instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
                                                                        trace.ToString());
                        }
                    }
                    catch(Ice.SocketException socketFailure)
                    {
                        if(Network.noMoreFds(socketFailure.InnerException))
                        {
                            string text = "can't accept more connections:\n" + socketFailure + '\n' +
                                _acceptor.ToString();
                            _instance.initializationData().logger.error(text);
                            Debug.Assert(_acceptorStarted);
                            _acceptorStarted = false;
                            _adapter.getThreadPool().finish(this);
                            closeAcceptor();
                        }

                        // Ignore socket exceptions.
                        return;
                    }
                    catch(Ice.LocalException localFailure)
                    {
                        // Warn about other Ice local exceptions.
                        if(_warn)
                        {
                            warning(localFailure);
                        }
                        return;
                    }

                    Debug.Assert(transceiver != null);

                    try
                    {
                        accepted = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, _monitor, transceiver,
                                                       null, _endpoint, _adapter);
                    }
                    catch(Ice.LocalException localFailure)
                    {
                        //
                        // The connection could not be created: close the transceiver,
                        // ignoring failures to do so.
                        //
                        try
                        {
                            transceiver.close();
                        }
                        catch(Ice.LocalException)
                        {
                            // Ignore
                        }

                        if(_warn)
                        {
                            warning(localFailure);
                        }
                        return;
                    }

                    _connections.Add(accepted);
                }
                finally
                {
                    ioScope.finishIOScope(ref current);
                }
            }

            Debug.Assert(accepted != null);
            accepted.start(this);
        }

        //
        // Thread pool callback invoked once the handler is finished with the pool.
        //
        public override void finished(ref ThreadPoolCurrent current)
        {
            lock(this)
            {
                if(_state >= StateClosed)
                {
                    Debug.Assert(_state >= StateClosed);
                    setState(StateFinished);
                }
                else
                {
                    //
                    // If the acceptor hasn't been explicitly stopped (which is the case if the acceptor got
                    // closed because of an unexpected error), try to restart the acceptor in 1 second.
                    //
                    _instance.timer().schedule(new StartAcceptor(this), 1000);
                }
            }
        }

        //
        // Describes the factory with its transceiver if it has one, its acceptor otherwise.
        //
        public override string ToString()
        {
            return _transceiver != null ? _transceiver.ToString() : _acceptor.ToString();
        }

        //
        // Operations from ConnectionI.StartCallback
        //

        //
        // Called once a connection has been started.
        //
        public void connectionStartCompleted(Ice.ConnectionI connection)
        {
            lock(this)
            {
                //
                // Initially, connections are in the holding state. If the factory is active
                // we activate the connection.
                //
                if(_state == StateActive)
                {
                    connection.activate();
                }
            }
        }

        //
        // Called when starting a connection failed. Nothing is reported here.
        //
        public void connectionStartFailed(Ice.ConnectionI connection, Ice.LocalException ex)
        {
            lock(this)
            {
                if(_state >= StateClosed)
                {
                    return;
                }

                //
                // Do not warn about connection exceptions here. The connection is not yet validated.
                //
            }
        }

        //
        // Builds the factory in the holding state. If the endpoint provides a
        // transceiver, it is bound and a single connection is created and started on
        // it; otherwise an acceptor is created. On failure the transceiver (if any)
        // is closed, the monitor destroyed, and the exception is rethrown (Ice local
        // exceptions as is, anything else wrapped in Ice.SyscallException).
        //
        public IncomingConnectionFactory(Instance instance, EndpointI endpoint, EndpointI publish,
                                         Ice.ObjectAdapterI adapter)
        {
            _instance = instance;
            _endpoint = endpoint;
            _publishedEndpoint = publish;
            _adapter = adapter;
            _warn = _instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Connections") > 0;
            _connections = new HashSet<Ice.ConnectionI>();
            _state = StateHolding;
            _acceptorStarted = false;
            _monitor = new FactoryACMMonitor(instance, adapter.getACM());

            //
            // Apply the timeout and compression overrides to the endpoint.
            //
            DefaultsAndOverrides overrides = _instance.defaultsAndOverrides();
            if(overrides.overrideTimeout)
            {
                _endpoint = _endpoint.timeout(overrides.overrideTimeoutValue);
            }

            if(overrides.overrideCompress)
            {
                _endpoint = _endpoint.compress(overrides.overrideCompressValue);
            }

            try
            {
                _transceiver = _endpoint.transceiver();
                if(_transceiver == null)
                {
                    createAcceptor();
                }
                else
                {
                    if(_instance.traceLevels().network >= 2)
                    {
                        StringBuilder trace = new StringBuilder("attempting to bind to ");
                        trace.Append(_endpoint.protocol()).Append(" socket\n");
                        trace.Append(_transceiver.ToString());
                        _instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
                                                                    trace.ToString());
                    }
                    _endpoint = _transceiver.bind();

                    Ice.ConnectionI single = new Ice.ConnectionI(_adapter.getCommunicator(), _instance, null,
                                                                 _transceiver, null, _endpoint, _adapter);
                    single.startAndWait();
                    _connections.Add(single);
                }
            }
            catch(Exception failure)
            {
                //
                // Clean up.
                //
                if(_transceiver != null)
                {
                    try
                    {
                        _transceiver.close();
                    }
                    catch(Ice.LocalException)
                    {
                        // Ignore
                    }
                }

                _state = StateFinished;
                _monitor.destroy();
                _connections.Clear();

                if(!(failure is Ice.LocalException))
                {
                    throw new Ice.SyscallException(failure);
                }
                throw;
            }
        }

        private const int StateActive = 0;
        private const int StateHolding = 1;
        private const int StateClosed = 2;
        private const int StateFinished = 3;

        //
        // Moves the factory to the given state and wakes up all waiting threads.
        // Requests that are not allowed (same state, active from anything but
        // holding, holding from anything but active) return without any change.
        // A close request with no started acceptor goes straight to finished.
        //
        private void setState(int state)
        {
            if(_state == state) // Don't switch twice.
            {
                return;
            }

            int next = state;

            if(state == StateActive)
            {
                if(_state != StateHolding) // Can only switch from holding to active.
                {
                    return;
                }
                if(_acceptor != null)
                {
                    traceAcceptorActivity("accepting ");
                    _adapter.getThreadPool().register(this, SocketOperation.Read);
                }

                foreach(Ice.ConnectionI c in _connections)
                {
                    c.activate();
                }
            }
            else if(state == StateHolding)
            {
                if(_state != StateActive) // Can only switch from active to holding.
                {
                    return;
                }
                if(_acceptor != null)
                {
                    traceAcceptorActivity("holding ");
                    _adapter.getThreadPool().unregister(this, SocketOperation.Read);
                }

                foreach(Ice.ConnectionI c in _connections)
                {
                    c.hold();
                }
            }
            else if(state == StateClosed)
            {
                if(_acceptorStarted)
                {
                    _acceptorStarted = false;
                    _adapter.getThreadPool().finish(this);
                    closeAcceptor();
                }
                else
                {
                    next = StateFinished;
                }

                foreach(Ice.ConnectionI c in _connections)
                {
                    c.destroy(Ice.ConnectionI.ObjectAdapterDeactivated);
                }
            }
            else if(state == StateFinished)
            {
                Debug.Assert(_state == StateClosed);
            }

            _state = next;
            System.Threading.Monitor.PulseAll(this);
        }

        //
        // When the network trace level is at least 1, logs
        // "<action><protocol> connections at <acceptor>" in the network category.
        //
        private void traceAcceptorActivity(string action)
        {
            if(_instance.traceLevels().network >= 1)
            {
                StringBuilder trace = new StringBuilder(action);
                trace.Append(_endpoint.protocol()).Append(" connections at ");
                trace.Append(_acceptor.ToString());
                _instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
                                                            trace.ToString());
            }
        }

        //
        // Creates the acceptor for the endpoint, starts listening, initializes the
        // factory with the adapter's thread pool (registering for reads when active)
        // and marks the acceptor as started. On a SystemException the acceptor, if
        // it was created, is closed and the exception is rethrown.
        //
        private void createAcceptor()
        {
            try
            {
                Debug.Assert(!_acceptorStarted);
                _acceptor = _endpoint.acceptor(_adapter.getName());
                Debug.Assert(_acceptor != null);

                if(_instance.traceLevels().network >= 2)
                {
                    StringBuilder trace = new StringBuilder("attempting to bind to ");
                    trace.Append(_endpoint.protocol()).Append(" socket ");
                    trace.Append(_acceptor.ToString());
                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
                                                                trace.ToString());
                }
                _endpoint = _acceptor.listen();

                if(_instance.traceLevels().network >= 1)
                {
                    StringBuilder trace = new StringBuilder("listening for ");
                    trace.Append(_endpoint.protocol()).Append(" connections\n");
                    trace.Append(_acceptor.toDetailedString());
                    _instance.initializationData().logger.trace(_instance.traceLevels().networkCat,
                                                                trace.ToString());
                }

                _adapter.getThreadPool().initialize(this);

                if(_state == StateActive)
                {
                    _adapter.getThreadPool().register(this, SocketOperation.Read);
                }

                _acceptorStarted = true;
            }
            catch(SystemException)
            {
                if(_acceptor != null)
                {
                    _acceptor.close();
                }
                throw;
            }
        }

        //
        // Closes the acceptor. The caller must already have reset _acceptorStarted.
        //
        private void closeAcceptor()
        {
            Debug.Assert(_acceptor != null);

            traceAcceptorActivity("stopping to accept ");

            Debug.Assert(!_acceptorStarted);
            _acceptor.close();
        }

        //
        // Logs a "connection exception" warning with the exception and the acceptor.
        //
        private void warning(Ice.LocalException ex)
        {
            string text = "connection exception:\n" + ex + '\n' + _acceptor.ToString();
            _instance.initializationData().logger.warning(text);
        }

        private Instance _instance;
        private FactoryACMMonitor _monitor;

        private Acceptor _acceptor;
        private readonly Transceiver _transceiver;
        private EndpointI _endpoint;
        private readonly EndpointI _publishedEndpoint;

        private Ice.ObjectAdapterI _adapter;

        private readonly bool _warn;

        private HashSet<Ice.ConnectionI> _connections;

        private int _state;
        private bool _acceptorStarted;
        private Ice.LocalException _acceptorException;
    }

}