//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

namespace IceLocatorDiscovery
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Text;

    /// <summary>
    /// Plug-in factory that creates the locator discovery plug-in.
    /// </summary>
    public sealed class PluginFactory : Ice.PluginFactory
    {
        /// <summary>Creates a new plug-in instance bound to the given communicator.</summary>
        /// <param name="communicator">The communicator the plug-in is installed in.</param>
        /// <param name="name">The plug-in name; also used as the prefix of its configuration properties.</param>
        /// <param name="args">Plug-in arguments (not used by this factory).</param>
        /// <returns>The new plug-in.</returns>
        public Ice.Plugin create(Ice.Communicator communicator, string name, string[] args)
        {
            return new PluginI(name, communicator);
        }
    }

    /// <summary>
    /// Public interface of the locator discovery plug-in.
    /// </summary>
    public interface Plugin : Ice.Plugin
    {
        /// <summary>Looks up locators and returns the ones that replied.</summary>
        /// <param name="instanceName">The instance name to wait for, or an empty string to wait for
        /// <paramref name="waitTime"/> and return every locator that replied.</param>
        /// <param name="waitTime">The wait time, in milliseconds.</param>
        /// <returns>The locators discovered by the lookup.</returns>
        List<Ice.LocatorPrx> getLocators(string instanceName, int waitTime);
    }

    /// <summary>
    /// A locator invocation that was intercepted by <see cref="LocatorI"/> and is forwarded to a real
    /// locator proxy. The request is also the completion source of the task returned to the caller.
    /// </summary>
    internal class Request : TaskCompletionSource<Ice.Object_Ice_invokeResult>
    {
        public Request(LocatorI locator,
                       string operation,
                       Ice.OperationMode mode,
                       byte[] inParams,
                       Dictionary<string, string> context)
        {
            _locator = locator;
            _operation = operation;
            _mode = mode;
            _inParams = inParams;
            _context = context;
        }

        /// <summary>
        /// Forwards the request to the given locator proxy. If the request already failed with this
        /// same proxy, the recorded failure is thrown instead of invoking again.
        /// </summary>
        /// <param name="l">The locator proxy to invoke on.</param>
        public void invoke(Ice.LocatorPrx l)
        {
            if(_locatorPrx == null || !_locatorPrx.Equals(l))
            {
                _locatorPrx = l;
                l.ice_invokeAsync(_operation, _mode, _inParams, _context).ContinueWith(
                    (task) =>
                    {
                        try
                        {
                            SetResult(task.Result);
                        }
                        catch(AggregateException ae)
                        {
                            exception(ae.InnerException);
                        }
                    });
            }
            else
            {
                // NOTE(review): this throws to the caller of invoke() and leaves the task uncompleted;
                // confirm that every caller expects that.
                Debug.Assert(_exception != null);
                throw _exception;
            }
        }

        /// <summary>
        /// Maps a failed invocation onto the request's task, or retries with another locator proxy.
        /// </summary>
        /// <param name="ex">The exception raised by the forwarded invocation.</param>
        private void exception(Exception ex)
        {
            //
            // Dispatch on the exception type directly rather than re-throwing it: a re-throw resets
            // the stack trace and uses exceptions for control flow.
            //
            if(ex is Ice.RequestFailedException || ex is Ice.UnknownException)
            {
                SetException(ex);
            }
            else if(ex is Ice.NoEndpointException ||
                    ex is Ice.ObjectAdapterDeactivatedException ||
                    ex is Ice.CommunicatorDestroyedException)
            {
                SetException(new Ice.ObjectNotExistException());
            }
            else
            {
                _exception = ex;
                _locator.invoke(_locatorPrx, this); // Retry with new locator proxy
            }
        }

        private readonly LocatorI _locator;
        private readonly string _operation;
        private readonly Ice.OperationMode _mode;
        private readonly Dictionary<string, string> _context;
        private readonly byte[] _inParams;

        private Ice.LocatorPrx _locatorPrx;
        private Exception _exception;
    }

    /// <summary>
    /// Locator servant whose operations all return null. Requests are sent to it when no real
    /// locator is available.
    /// </summary>
    internal class VoidLocatorI : Ice.LocatorDisp_
    {
        // NOTE(review): the two async operations return a null task rather than a completed task
        // holding null; presumably the Ice dispatch code accepts that - confirm before changing.
        public override Task<Ice.ObjectPrx> findObjectByIdAsync(Ice.Identity id, Ice.Current current)
        {
            return null;
        }

        public override Task<Ice.ObjectPrx> findAdapterByIdAsync(string id, Ice.Current current)
        {
            return null;
        }

        public override Ice.LocatorRegistryPrx getRegistry(Ice.Current current)
        {
            return null;
        }
    }

    /// <summary>
    /// Blobject servant installed as the communicator's default locator. It discovers the real
    /// locator with multicast lookup requests and forwards the intercepted invocations to it.
    /// All mutable state is guarded by <c>lock(this)</c>, which is also the monitor that
    /// <see cref="getLocators"/> waits on.
    /// </summary>
    internal class LocatorI : Ice.BlobjectAsync, IceInternal.TimerTask
    {
        /// <summary>Creates the servant and reads its configuration.</summary>
        /// <param name="name">The property prefix (the plug-in name).</param>
        /// <param name="lookup">The lookup proxy; one single-endpoint proxy is derived per endpoint.</param>
        /// <param name="properties">The properties to read <c>Timeout</c> (default 300),
        /// <c>RetryCount</c> (default 3), <c>RetryDelay</c> (default 2000) and <c>Trace.Lookup</c> from.</param>
        /// <param name="instanceName">The expected locator instance name, or an empty string for any.</param>
        /// <param name="voidLocator">The proxy that pending requests are sent to when no locator is found.</param>
        public LocatorI(string name, LookupPrx lookup, Ice.Properties properties, string instanceName,
                        Ice.LocatorPrx voidLocator)
        {
            _lookup = lookup;
            _timeout = properties.getPropertyAsIntWithDefault(name + ".Timeout", 300);
            _retryCount = properties.getPropertyAsIntWithDefault(name + ".RetryCount", 3);
            _retryDelay = properties.getPropertyAsIntWithDefault(name + ".RetryDelay", 2000);
            _timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
            _traceLevel = properties.getPropertyAsInt(name + ".Trace.Lookup");
            _instanceName = instanceName;
            _warned = false;
            _locator = lookup.ice_getCommunicator().getDefaultLocator();
            _voidLocator = voidLocator;
            _pendingRetryCount = 0;
            _failureCount = 0;
            _warnOnce = true;

            //
            // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
            // datagram on each endpoint.
            //
            var single = new Ice.Endpoint[1];
            foreach(var endpt in lookup.ice_getEndpoints())
            {
                single[0] = endpt;
                _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
            }
            Debug.Assert(_lookups.Count > 0);
        }

        /// <summary>Associates a reply proxy with each per-endpoint lookup proxy.</summary>
        /// <param name="lookupReply">The reply proxy passed along with the lookup requests.</param>
        public void setLookupReply(LookupReplyPrx lookupReply)
        {
            //
            // Use a lookup reply proxy whose address matches the interface used to send multicast datagrams.
            //
            var single = new Ice.Endpoint[1];
            foreach(var key in new List<LookupPrx>(_lookups.Keys)) // Iterate over a copy: the map is updated below.
            {
                var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
                if(info.mcastInterface.Length > 0)
                {
                    foreach(var q in lookupReply.ice_getEndpoints())
                    {
                        var r = q.getInfo();
                        if(r is Ice.IPEndpointInfo && ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface))
                        {
                            single[0] = q;
                            _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
                        }
                    }
                }

                if(_lookups[key] == null)
                {
                    // Fallback: just use the given lookup reply proxy if no matching endpoint found.
                    _lookups[key] = lookupReply;
                }
            }
        }

        /// <summary>
        /// Intercepts a locator invocation, wraps it in a <see cref="Request"/> and returns the
        /// request's task.
        /// </summary>
        public override Task<Ice.Object_Ice_invokeResult> ice_invokeAsync(byte[] inParams, Ice.Current current)
        {
            lock(this)
            {
                var request = new Request(this, current.operation, current.mode, inParams, current.ctx);
                invoke(null, request);
                return request.Task;
            }
        }

        /// <summary>Starts a lookup and returns the locators that replied.</summary>
        /// <param name="instanceName">The instance name to wait for; when empty, the call sleeps for
        /// <paramref name="waitTime"/> and returns whatever was collected.</param>
        /// <param name="waitTime">The wait time in milliseconds. When an instance name is given, it
        /// bounds each individual wait, not the total time.</param>
        /// <returns>A snapshot of the discovered locators.</returns>
        public List<Ice.LocatorPrx> getLocators(string instanceName, int waitTime)
        {
            //
            // Clear locators from previous search.
            //
            lock(this)
            {
                _locators.Clear();
            }

            //
            // Find a locator
            //
            invoke(null, null);

            //
            // Wait for responses
            //
            if(instanceName.Length == 0)
            {
                Thread.Sleep(waitTime);
            }
            else
            {
                lock(this)
                {
                    while(!_locators.ContainsKey(instanceName) && _pendingRetryCount > 0)
                    {
                        Monitor.Wait(this, waitTime);
                    }
                }
            }

            //
            // Return found locators
            //
            lock(this)
            {
                return new List<Ice.LocatorPrx>(_locators.Values);
            }
        }

        /// <summary>Handles a locator reply received by the lookup reply servant.</summary>
        /// <param name="locator">The discovered locator proxy; null replies are ignored.</param>
        public void foundLocator(Ice.LocatorPrx locator)
        {
            lock(this)
            {
                if(locator == null)
                {
                    // Handled separately from the name mismatch below, whose trace dereferences the proxy.
                    if(_traceLevel > 2)
                    {
                        _lookup.ice_getCommunicator().getLogger().trace("Lookup",
                            "ignoring locator reply: (null locator)");
                    }
                    return;
                }

                string category = locator.ice_getIdentity().category;

                if(_instanceName.Length > 0 && !category.Equals(_instanceName))
                {
                    if(_traceLevel > 2)
                    {
                        StringBuilder s = new StringBuilder("ignoring locator reply: instance name doesn't match\n");
                        s.Append("expected = ").Append(_instanceName);
                        s.Append("\nreceived = ").Append(category);
                        _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
                    }
                    return;
                }

                //
                // If we already have a locator assigned, ensure the given locator
                // has the same identity, otherwise ignore it.
                //
                if(_pendingRequests.Count > 0 &&
                   _locator != null && !category.Equals(_locator.ice_getIdentity().category))
                {
                    if(!_warned)
                    {
                        _warned = true; // Only warn once

                        locator.ice_getCommunicator().getLogger().warning(
                            "received Ice locator with different instance name:\n" +
                            "using = `" + _locator.ice_getIdentity().category + "'\n" +
                            "received = `" + category + "'\n" +
                            "This is typically the case if multiple Ice locators with different " +
                            "instance names are deployed and the property `IceLocatorDiscovery.InstanceName' " +
                            "is not set.");
                    }
                    return;
                }

                if(_pendingRetryCount > 0) // No need to retry, we found a locator
                {
                    _timer.cancel(this);
                    _pendingRetryCount = 0;
                }

                if(_traceLevel > 0)
                {
                    StringBuilder s = new StringBuilder("locator lookup succeeded:\nlocator = ");
                    s.Append(locator);
                    appendInstanceName(s);
                    _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
                }

                //
                // Pick the proxy to merge the new endpoints into: the entry already collected for this
                // instance name when no request is pending, the current locator otherwise. The lookup
                // writes to the local only, so the current locator field is not overwritten.
                //
                Ice.LocatorPrx l = null;
                if(_pendingRequests.Count == 0)
                {
                    _locators.TryGetValue(category, out l);
                }
                else
                {
                    l = _locator;
                }

                if(l != null)
                {
                    //
                    // We found another locator replica, append its endpoints to the
                    // current locator proxy endpoints.
                    //
                    List<Ice.Endpoint> newEndpoints = new List<Ice.Endpoint>(l.ice_getEndpoints());
                    foreach(Ice.Endpoint p in locator.ice_getEndpoints())
                    {
                        //
                        // Only add endpoints if not already in the locator proxy endpoints
                        //
                        bool found = false;
                        foreach(Ice.Endpoint q in newEndpoints)
                        {
                            if(p.Equals(q))
                            {
                                found = true;
                                break;
                            }
                        }
                        if(!found)
                        {
                            newEndpoints.Add(p);
                        }
                    }
                    l = (Ice.LocatorPrx)l.ice_endpoints(newEndpoints.ToArray());
                }
                else
                {
                    l = locator;
                }

                if(_pendingRequests.Count == 0)
                {
                    _locators[category] = l;
                    Monitor.Pulse(this);
                }
                else
                {
                    _locator = l;
                    if(_instanceName.Length == 0)
                    {
                        _instanceName = _locator.ice_getIdentity().category; // Stick to the first locator
                    }

                    //
                    // Send pending requests if any.
                    //
                    foreach(Request req in _pendingRequests)
                    {
                        req.invoke(_locator);
                    }
                    _pendingRequests.Clear();
                }
            }
        }

        /// <summary>
        /// Routes a request to the current locator, to the void locator while the retry delay has
        /// not expired, or queues it and starts a lookup.
        /// </summary>
        /// <param name="locator">The locator proxy the request last failed with, or null.</param>
        /// <param name="request">The request to route, or null to only trigger a lookup.</param>
        public void invoke(Ice.LocatorPrx locator, Request request)
        {
            lock(this)
            {
                if(request != null && _locator != null && _locator != locator)
                {
                    request.invoke(_locator);
                }
                else if(request != null && IceInternal.Time.currentMonotonicTimeMillis() < _nextRetry)
                {
                    request.invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires
                }
                else
                {
                    _locator = null;

                    if(request != null)
                    {
                        _pendingRequests.Add(request);
                    }

                    if(_pendingRetryCount == 0) // No request in progress
                    {
                        _pendingRetryCount = _retryCount;
                        _failureCount = 0;
                        try
                        {
                            if(_traceLevel > 1)
                            {
                                StringBuilder s = new StringBuilder("looking up locator:\nlookup = ");
                                s.Append(_lookup);
                                appendInstanceName(s);
                                _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
                            }

                            sendLookupRequests();
                        }
                        catch(Ice.LocalException ex)
                        {
                            if(_traceLevel > 0)
                            {
                                traceLookupFailure(ex);
                            }

                            invokePendingRequestsOnVoidLocator();
                            _pendingRetryCount = 0;
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Records the failure of one lookup call. Once every lookup proxy has failed, the retry
        /// timer is cancelled and the pending requests are sent to the void locator.
        /// </summary>
        /// <param name="ex">The exception raised by the lookup call.</param>
        private void exception(Exception ex)
        {
            lock(this)
            {
                if(++_failureCount == _lookups.Count && _pendingRetryCount > 0)
                {
                    //
                    // All the lookup calls failed, cancel the timer and propagate the error to the requests.
                    //
                    _timer.cancel(this);

                    _pendingRetryCount = 0;

                    if(_warnOnce)
                    {
                        StringBuilder builder = new StringBuilder();
                        builder.Append("failed to lookup locator with lookup proxy `");
                        builder.Append(_lookup);
                        builder.Append("':\n");
                        builder.Append(ex);
                        _lookup.ice_getCommunicator().getLogger().warning(builder.ToString());
                        _warnOnce = false;
                    }

                    if(_traceLevel > 0)
                    {
                        traceLookupFailure(ex);
                    }

                    if(_pendingRequests.Count == 0)
                    {
                        Monitor.Pulse(this);
                    }
                    else
                    {
                        invokePendingRequestsOnVoidLocator();
                    }
                }
            }
        }

        /// <summary>
        /// Timer callback: re-sends the lookup requests while retries remain, otherwise treats the
        /// lookup as timed out and sets the time before which no new lookup is attempted.
        /// </summary>
        public void runTimerTask()
        {
            lock(this)
            {
                if(--_pendingRetryCount > 0)
                {
                    try
                    {
                        if(_traceLevel > 1)
                        {
                            StringBuilder s = new StringBuilder("retrying locator lookup:\nlookup = ");
                            s.Append(_lookup);
                            s.Append("\nretry count = ").Append(_retryCount);
                            appendInstanceName(s);
                            _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
                        }

                        sendLookupRequests();
                        return;
                    }
                    catch(Ice.LocalException)
                    {
                        // The retry could not be sent: give up and fall through to the time-out handling.
                    }
                    _pendingRetryCount = 0;
                }

                if(_traceLevel > 0)
                {
                    StringBuilder s = new StringBuilder("locator lookup timed out:\nlookup = ");
                    s.Append(_lookup);
                    appendInstanceName(s);
                    _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
                }

                if(_pendingRequests.Count == 0)
                {
                    Monitor.Pulse(this);
                }
                else
                {
                    invokePendingRequestsOnVoidLocator();
                }
                _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
            }
        }

        // Sends a findLocator request on every lookup proxy and (re)schedules the retry timer.
        private void sendLookupRequests()
        {
            foreach(var l in _lookups)
            {
                l.Key.findLocatorAsync(_instanceName, l.Value).ContinueWith(t =>
                    {
                        try
                        {
                            t.Wait();
                        }
                        catch(AggregateException ex)
                        {
                            exception(ex.InnerException);
                        }
                    },
                    l.Key.ice_scheduler()); // Send multicast request.
            }
            _timer.schedule(this, _timeout);
        }

        // Sends every pending request to the void locator and empties the pending list.
        private void invokePendingRequestsOnVoidLocator()
        {
            foreach(Request req in _pendingRequests)
            {
                req.invoke(_voidLocator);
            }
            _pendingRequests.Clear();
        }

        // Appends the instance name to a trace message when one is set.
        private void appendInstanceName(StringBuilder s)
        {
            if(_instanceName.Length > 0)
            {
                s.Append("\ninstance name = ").Append(_instanceName);
            }
        }

        // Writes the "locator lookup failed" trace message for the given exception.
        private void traceLookupFailure(Exception ex)
        {
            StringBuilder s = new StringBuilder("locator lookup failed:\nlookup = ");
            s.Append(_lookup);
            appendInstanceName(s);
            s.Append("\n").Append(ex);
            _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
        }

        private readonly LookupPrx _lookup;
        // One single-endpoint lookup proxy per endpoint, mapped to the reply proxy sent with it.
        private readonly Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
        private readonly int _timeout;
        private readonly IceInternal.Timer _timer;
        private readonly int _traceLevel;
        private readonly int _retryCount;
        private readonly int _retryDelay;

        private string _instanceName;
        private bool _warned;
        private Ice.LocatorPrx _locator;
        private readonly Ice.LocatorPrx _voidLocator;
        // Locators collected while no request is pending, keyed by identity category.
        private readonly Dictionary<string, Ice.LocatorPrx> _locators = new Dictionary<string, Ice.LocatorPrx>();

        private int _pendingRetryCount;
        private int _failureCount;
        private bool _warnOnce = true;
        private readonly List<Request> _pendingRequests = new List<Request>();
        private long _nextRetry;
    }

    /// <summary>
    /// Lookup reply servant: hands every discovered locator over to the <see cref="LocatorI"/>.
    /// </summary>
    internal class LookupReplyI : LookupReplyDisp_
    {
        public LookupReplyI(LocatorI locator)
        {
            _locator = locator;
        }

        public override void foundLocator(Ice.LocatorPrx locator, Ice.Current current)
        {
            _locator.foundLocator(locator);
        }

        private readonly LocatorI _locator;
    }

    /// <summary>
    /// The locator discovery plug-in. It creates the reply and locator object adapters and installs
    /// a <see cref="LocatorI"/> servant as the communicator's default locator.
    /// </summary>
    internal class PluginI : Plugin
    {
        public PluginI(string name, Ice.Communicator communicator)
        {
            _name = name;
            _communicator = communicator;
        }

        /// <summary>
        /// Reads the plug-in configuration, creates and activates the object adapters, and replaces
        /// the communicator's default locator with the discovery locator.
        /// </summary>
        public void initialize()
        {
            Ice.Properties properties = _communicator.getProperties();

            bool ipv4 = properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0;
            bool preferIPv6 = properties.getPropertyAsInt("Ice.PreferIPv6Address") > 0;
            string address;
            if(ipv4 && !preferIPv6)
            {
                address = properties.getPropertyWithDefault(_name + ".Address", "239.255.0.1");
            }
            else
            {
                address = properties.getPropertyWithDefault(_name + ".Address", "ff15::1");
            }
            int port = properties.getPropertyAsIntWithDefault(_name + ".Port", 4061);
            string intf = properties.getProperty(_name + ".Interface");

            string lookupEndpoints = properties.getProperty(_name + ".Lookup");
            if(lookupEndpoints.Length == 0)
            {
                int protocol = ipv4 && !preferIPv6 ? IceInternal.Network.EnableIPv4 : IceInternal.Network.EnableIPv6;
                var interfaces = IceInternal.Network.getInterfacesForMulticast(intf, protocol);
                foreach(string p in interfaces)
                {
                    // Separate the endpoints with ':'; keyed on what was already appended so that a
                    // repeated interface name cannot drop a separator.
                    if(lookupEndpoints.Length > 0)
                    {
                        lookupEndpoints += ":";
                    }
                    lookupEndpoints += "udp -h \"" + address + "\" -p " + port + " --interface \"" + p + "\"";
                }
            }

            if(properties.getProperty(_name + ".Reply.Endpoints").Length == 0)
            {
                properties.setProperty(_name + ".Reply.Endpoints",
                                       "udp -h " + (intf.Length == 0 ? "*" : "\"" + intf + "\""));
            }

            if(properties.getProperty(_name + ".Locator.Endpoints").Length == 0)
            {
                properties.setProperty(_name + ".Locator.AdapterId", Guid.NewGuid().ToString());
            }

            _replyAdapter = _communicator.createObjectAdapter(_name + ".Reply");
            _locatorAdapter = _communicator.createObjectAdapter(_name + ".Locator");

            // We don't want those adapters to be registered with the locator so clear their locator.
            _replyAdapter.setLocator(null);
            _locatorAdapter.setLocator(null);

            Ice.ObjectPrx lookupPrx = _communicator.stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
            // No colloc optimization or router for the multicast proxy!
            lookupPrx = lookupPrx.ice_collocationOptimized(false).ice_router(null);

            Ice.LocatorPrx voidLo = Ice.LocatorPrxHelper.uncheckedCast(_locatorAdapter.addWithUUID(new VoidLocatorI()));

            string instanceName = properties.getProperty(_name + ".InstanceName");

            // Remember the original default locator so that destroy() can restore it.
            _defaultLocator = _communicator.getDefaultLocator();
            _locator = new LocatorI(_name, LookupPrxHelper.uncheckedCast(lookupPrx), properties, instanceName, voidLo);
            _locatorPrx = Ice.LocatorPrxHelper.uncheckedCast(_locatorAdapter.addWithUUID(_locator));
            _communicator.setDefaultLocator(_locatorPrx);

            Ice.ObjectPrx lookupReply = _replyAdapter.addWithUUID(new LookupReplyI(_locator)).ice_datagram();
            _locator.setLookupReply(LookupReplyPrxHelper.uncheckedCast(lookupReply));

            _replyAdapter.activate();
            _locatorAdapter.activate();
        }

        /// <summary>
        /// Destroys the object adapters and restores the original default locator, unless the
        /// default locator was changed in the meantime.
        /// </summary>
        public void destroy()
        {
            if(_replyAdapter != null)
            {
                _replyAdapter.destroy();
            }
            if(_locatorAdapter != null)
            {
                _locatorAdapter.destroy();
            }

            // The default locator may have been reset to null, so guard the comparison.
            Ice.LocatorPrx current = _communicator.getDefaultLocator();
            if(current != null && current.Equals(_locatorPrx))
            {
                // Restore original default locator proxy, if the user didn't change it in the meantime
                _communicator.setDefaultLocator(_defaultLocator);
            }
        }

        /// <summary>Looks up locators; see <see cref="Plugin.getLocators"/>.</summary>
        /// <param name="instanceName">The instance name to wait for, or an empty string.</param>
        /// <param name="waitTime">The wait time, in milliseconds.</param>
        /// <returns>The locators discovered by the lookup.</returns>
        public List<Ice.LocatorPrx> getLocators(string instanceName, int waitTime)
        {
            return _locator.getLocators(instanceName, waitTime);
        }

        private readonly string _name;
        private readonly Ice.Communicator _communicator;
        private Ice.ObjectAdapter _locatorAdapter;
        private Ice.ObjectAdapter _replyAdapter;
        private LocatorI _locator;
        private Ice.LocatorPrx _locatorPrx;
        private Ice.LocatorPrx _defaultLocator;
    }

    /// <summary>
    /// Helper used to register the plug-in factory explicitly.
    /// </summary>
    public class Util
    {
        /// <summary>Registers the plug-in factory under the name <c>IceLocatorDiscovery</c>.</summary>
        /// <param name="loadOnInitialize">Passed through unchanged to <c>Ice.Util.registerPluginFactory</c>.</param>
        public static void registerIceLocatorDiscovery(bool loadOnInitialize)
        {
            Ice.Util.registerPluginFactory("IceLocatorDiscovery", new PluginFactory(), loadOnInitialize);
        }
    }
}