1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 namespace IceLocatorDiscovery
6 {
7     using System;
8     using System.Collections.Generic;
9     using System.Diagnostics;
10     using System.Threading;
11     using System.Threading.Tasks;
12     using System.Text;
13 
14     public sealed class PluginFactory : Ice.PluginFactory
15     {
16         public Ice.Plugin
create(Ice.Communicator communicator, string name, string[] args)17         create(Ice.Communicator communicator, string name, string[] args)
18         {
19             return new PluginI(name, communicator);
20         }
21     }
22 
23     public interface Plugin : Ice.Plugin
24     {
getLocators(string instanceName, int waitTime)25         List<Ice.LocatorPrx> getLocators(string instanceName, int waitTime);
26     }
27 
28     internal class Request : TaskCompletionSource<Ice.Object_Ice_invokeResult>
29     {
Request(LocatorI locator, string operation, Ice.OperationMode mode, byte[] inParams, Dictionary<string, string> context)30         public Request(LocatorI locator,
31                        string operation,
32                        Ice.OperationMode mode,
33                        byte[] inParams,
34                        Dictionary<string, string> context)
35         {
36             _locator = locator;
37             _operation = operation;
38             _mode = mode;
39             _inParams = inParams;
40             _context = context;
41         }
42 
43         public void
invoke(Ice.LocatorPrx l)44         invoke(Ice.LocatorPrx l)
45         {
46             if(_locatorPrx == null || !_locatorPrx.Equals(l))
47             {
48                 _locatorPrx = l;
49                 l.ice_invokeAsync(_operation, _mode, _inParams, _context).ContinueWith(
50                     (task) =>
51                     {
52                         try
53                         {
54                             SetResult(task.Result);
55                         }
56                         catch(AggregateException ae)
57                         {
58                             exception(ae.InnerException);
59                         }
60                     });
61             }
62             else
63             {
64                 Debug.Assert(_exception != null);
65                 throw _exception;
66             }
67         }
68 
69         private void
exception(Exception ex)70         exception(Exception ex)
71         {
72             try
73             {
74                 throw ex;
75             }
76             catch(Ice.RequestFailedException exc)
77             {
78                 SetException(exc);
79             }
80             catch(Ice.UnknownException exc)
81             {
82                 SetException(exc);
83             }
84             catch(Ice.NoEndpointException)
85             {
86                 SetException(new Ice.ObjectNotExistException());
87             }
88             catch(Ice.ObjectAdapterDeactivatedException)
89             {
90                 SetException(new Ice.ObjectNotExistException());
91             }
92             catch(Ice.CommunicatorDestroyedException)
93             {
94                 SetException(new Ice.ObjectNotExistException());
95             }
96             catch(Exception exc)
97             {
98                 _exception = exc;
99                 _locator.invoke(_locatorPrx, this); // Retry with new locator proxy
100             }
101         }
102 
103         private readonly LocatorI _locator;
104         private readonly string _operation;
105         private readonly Ice.OperationMode _mode;
106         private readonly Dictionary<string, string> _context;
107         private readonly byte[] _inParams;
108 
109         private Ice.LocatorPrx _locatorPrx;
110         private Exception _exception;
111     }
112 
113     internal class VoidLocatorI : Ice.LocatorDisp_
114     {
115         public override Task<Ice.ObjectPrx>
findObjectByIdAsync(Ice.Identity id, Ice.Current current)116         findObjectByIdAsync(Ice.Identity id, Ice.Current current)
117         {
118             return null;
119         }
120 
121         public override Task<Ice.ObjectPrx>
findAdapterByIdAsync(string id, Ice.Current current)122         findAdapterByIdAsync(string id, Ice.Current current)
123         {
124             return null;
125         }
126 
127         public override Ice.LocatorRegistryPrx
getRegistry(Ice.Current current)128         getRegistry(Ice.Current current)
129         {
130             return null;
131         }
132     };
133 
134     internal class LocatorI : Ice.BlobjectAsync, IceInternal.TimerTask
135     {
136         public
LocatorI(string name, LookupPrx lookup, Ice.Properties properties, string instanceName, Ice.LocatorPrx voidLocator)137         LocatorI(string name, LookupPrx lookup, Ice.Properties properties, string instanceName,
138                  Ice.LocatorPrx voidLocator)
139         {
140             _lookup = lookup;
141             _timeout = properties.getPropertyAsIntWithDefault(name + ".Timeout", 300);
142             _retryCount = properties.getPropertyAsIntWithDefault(name + ".RetryCount", 3);
143             _retryDelay = properties.getPropertyAsIntWithDefault(name + ".RetryDelay", 2000);
144             _timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
145             _traceLevel = properties.getPropertyAsInt(name + ".Trace.Lookup");
146             _instanceName = instanceName;
147             _warned = false;
148             _locator = lookup.ice_getCommunicator().getDefaultLocator();
149             _voidLocator = voidLocator;
150             _pendingRetryCount = 0;
151             _failureCount = 0;
152             _warnOnce = true;
153 
154             //
155             // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
156             // datagram on each endpoint.
157             //
158             var single = new Ice.Endpoint[1];
159             foreach(var endpt in lookup.ice_getEndpoints())
160             {
161                 single[0] = endpt;
162                 _lookups[(LookupPrx)lookup.ice_endpoints(single)] = null;
163             }
164             Debug.Assert(_lookups.Count > 0);
165         }
166 
167         public void
setLookupReply(LookupReplyPrx lookupReply)168         setLookupReply(LookupReplyPrx lookupReply)
169         {
170             //
171             // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
172             //
173             var single = new Ice.Endpoint[1];
174             foreach(var key in new List<LookupPrx>(_lookups.Keys))
175             {
176                 var info = (Ice.UDPEndpointInfo)key.ice_getEndpoints()[0].getInfo();
177                 if(info.mcastInterface.Length > 0)
178                 {
179                     foreach(var q in lookupReply.ice_getEndpoints())
180                     {
181                         var r = q.getInfo();
182                         if(r is Ice.IPEndpointInfo && ((Ice.IPEndpointInfo)r).host.Equals(info.mcastInterface))
183                         {
184                             single[0] = q;
185                             _lookups[key] = (LookupReplyPrx)lookupReply.ice_endpoints(single);
186                         }
187                     }
188                 }
189 
190                 if(_lookups[key] == null)
191                 {
192                     // Fallback: just use the given lookup reply proxy if no matching endpoint found.
193                     _lookups[key] = lookupReply;
194                 }
195             }
196         }
197 
198         public override Task<Ice.Object_Ice_invokeResult>
ice_invokeAsync(byte[] inParams, Ice.Current current)199         ice_invokeAsync(byte[] inParams, Ice.Current current)
200         {
201             lock(this)
202             {
203                 var request = new Request(this, current.operation, current.mode, inParams, current.ctx);
204                 invoke(null, request);
205                 return request.Task;
206             }
207         }
208 
209         public List<Ice.LocatorPrx>
getLocators(String instanceName, int waitTime)210         getLocators(String instanceName, int waitTime)
211         {
212             //
213             // Clear locators from previous search.
214             //
215             lock(this)
216             {
217                 _locators.Clear();
218             }
219 
220             //
221             // Find a locator
222             //
223             invoke(null, null);
224 
225             //
226             // Wait for responses
227             //
228             if(instanceName.Length == 0)
229             {
230                 Thread.Sleep(waitTime);
231             }
232             else
233             {
234                 lock(this)
235                 {
236                     while(!_locators.ContainsKey(instanceName) && _pendingRetryCount > 0)
237                     {
238                         Monitor.Wait(this, waitTime);
239                     }
240                 }
241             }
242 
243             //
244             // Return found locators
245             //
246             lock(this)
247             {
248                 return new List<Ice.LocatorPrx>(_locators.Values);
249             }
250         }
251 
252         public void
foundLocator(Ice.LocatorPrx locator)253         foundLocator(Ice.LocatorPrx locator)
254         {
255             lock(this)
256             {
257                 if(locator == null ||
258                    (_instanceName.Length > 0 && !locator.ice_getIdentity().category.Equals(_instanceName)))
259                 {
260                     if(_traceLevel > 2)
261                     {
262                         StringBuilder s = new StringBuilder("ignoring locator reply: instance name doesn't match\n");
263                         s.Append("expected = ").Append(_instanceName);
264                         s.Append("received = ").Append(locator.ice_getIdentity().category);
265                         _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
266                     }
267                     return;
268                 }
269 
270                 //
271                 // If we already have a locator assigned, ensure the given locator
272                 // has the same identity, otherwise ignore it.
273                 //
274                 if(_pendingRequests.Count > 0 &&
275                    _locator != null && !locator.ice_getIdentity().category.Equals(_locator.ice_getIdentity().category))
276                 {
277                     if(!_warned)
278                     {
279                         _warned = true; // Only warn once
280 
281                         locator.ice_getCommunicator().getLogger().warning(
282                         "received Ice locator with different instance name:\n" +
283                         "using = `" + _locator.ice_getIdentity().category + "'\n" +
284                         "received = `" + locator.ice_getIdentity().category + "'\n" +
285                         "This is typically the case if multiple Ice locators with different " +
286                         "instance names are deployed and the property `IceLocatorDiscovery.InstanceName'" +
287                         "is not set.");
288 
289                     }
290                     return;
291                 }
292 
293                 if(_pendingRetryCount > 0) // No need to retry, we found a locator
294                 {
295                     _timer.cancel(this);
296                     _pendingRetryCount = 0;
297                 }
298 
299                 if(_traceLevel > 0)
300                 {
301                     StringBuilder s = new StringBuilder("locator lookup succeeded:\nlocator = ");
302                     s.Append(locator);
303                     if(_instanceName.Length == 0)
304                     {
305                         s.Append("\ninstance name = ").Append(_instanceName);
306                     }
307                     _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
308                 }
309 
310                 Ice.LocatorPrx l = null;
311                 if(_pendingRequests.Count == 0)
312                 {
313                     _locators.TryGetValue(locator.ice_getIdentity().category, out _locator);
314                 }
315                 else
316                 {
317                     l = _locator;
318                 }
319                 if(l != null)
320                 {
321                     //
322                     // We found another locator replica, append its endpoints to the
323                     // current locator proxy endpoints.
324                     //
325                     List<Ice.Endpoint> newEndpoints = new List<Ice.Endpoint>(l.ice_getEndpoints());
326                     foreach(Ice.Endpoint p in locator.ice_getEndpoints())
327                     {
328                         //
329                         // Only add endpoints if not already in the locator proxy endpoints
330                         //
331                         bool found = false;
332                         foreach(Ice.Endpoint q in newEndpoints)
333                         {
334                             if(p.Equals(q))
335                             {
336                                 found = true;
337                                 break;
338                             }
339                         }
340                         if(!found)
341                         {
342                             newEndpoints.Add(p);
343                         }
344                     }
345                     l = (Ice.LocatorPrx)l.ice_endpoints(newEndpoints.ToArray());
346                 }
347                 else
348                 {
349                     l = locator;
350                 }
351 
352                 if(_pendingRequests.Count == 0)
353                 {
354                     _locators[locator.ice_getIdentity().category] = l;
355                     Monitor.Pulse(this);
356                 }
357                 else
358                 {
359                     _locator = l;
360                     if(_instanceName.Length == 0)
361                     {
362                         _instanceName = _locator.ice_getIdentity().category; // Stick to the first locator
363                     }
364 
365                     //
366                     // Send pending requests if any.
367                     //
368                     foreach(Request req in _pendingRequests)
369                     {
370                         req.invoke(_locator);
371                     }
372                     _pendingRequests.Clear();
373                 }
374             }
375         }
376 
377         public void
invoke(Ice.LocatorPrx locator, Request request)378         invoke(Ice.LocatorPrx locator, Request request)
379         {
380             lock(this)
381             {
382                 if(request != null && _locator != null && _locator != locator)
383                 {
384                     request.invoke(_locator);
385                 }
386                 else if(request != null && IceInternal.Time.currentMonotonicTimeMillis() < _nextRetry)
387                 {
388                     request.invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires
389                 }
390                 else
391                 {
392                     _locator = null;
393 
394                     if(request != null)
395                     {
396                         _pendingRequests.Add(request);
397                     }
398 
399                     if(_pendingRetryCount == 0) // No request in progress
400                     {
401                         _pendingRetryCount = _retryCount;
402                         _failureCount = 0;
403                         try
404                         {
405                             if(_traceLevel > 1)
406                             {
407                                 StringBuilder s = new StringBuilder("looking up locator:\nlookup = ");
408                                 s.Append(_lookup);
409                                 if(_instanceName.Length == 0)
410                                 {
411                                     s.Append("\ninstance name = ").Append(_instanceName);
412                                 }
413                                 _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
414                             }
415 
416                             foreach(var l in _lookups)
417                             {
418                                 l.Key.findLocatorAsync(_instanceName, l.Value).ContinueWith(t => {
419                                     try
420                                     {
421                                         t.Wait();
422                                     }
423                                     catch(AggregateException ex)
424                                     {
425                                         exception(ex.InnerException);
426                                     }
427                                 }, l.Key.ice_scheduler()); // Send multicast request.
428                             }
429                             _timer.schedule(this, _timeout);
430                         }
431                         catch(Ice.LocalException ex)
432                         {
433                             if(_traceLevel > 0)
434                             {
435                                 StringBuilder s = new StringBuilder("locator lookup failed:\nlookup = ");
436                                 s.Append(_lookup);
437                                 if(_instanceName.Length == 0)
438                                 {
439                                     s.Append("\ninstance name = ").Append(_instanceName);
440                                 }
441                                 s.Append("\n").Append(ex);
442                                 _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
443                             }
444 
445                             foreach(Request req in _pendingRequests)
446                             {
447                                 req.invoke(_voidLocator);
448                             }
449                             _pendingRequests.Clear();
450                             _pendingRetryCount = 0;
451                         }
452                     }
453                 }
454             }
455         }
456 
exception(Exception ex)457         void exception(Exception ex)
458         {
459             lock(this)
460             {
461                 if(++_failureCount == _lookups.Count && _pendingRetryCount > 0)
462                 {
463                     //
464                     // All the lookup calls failed, cancel the timer and propagate the error to the requests.
465                     //
466                     _timer.cancel(this);
467 
468                     _pendingRetryCount = 0;
469 
470                     if(_warnOnce)
471                     {
472                         StringBuilder builder = new StringBuilder();
473                         builder.Append("failed to lookup locator with lookup proxy `");
474                         builder.Append(_lookup);
475                         builder.Append("':\n");
476                         builder.Append(ex);
477                         _lookup.ice_getCommunicator().getLogger().warning(builder.ToString());
478                         _warnOnce = false;
479                     }
480 
481                     if(_traceLevel > 0)
482                     {
483                         StringBuilder s = new StringBuilder("locator lookup failed:\nlookup = ");
484                         s.Append(_lookup);
485                         if(_instanceName.Length == 0)
486                         {
487                             s.Append("\ninstance name = ").Append(_instanceName);
488                         }
489                         s.Append("\n").Append(ex);
490                         _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
491                     }
492 
493                     if(_pendingRequests.Count == 0)
494                     {
495                         Monitor.Pulse(this);
496                     }
497                     else
498                     {
499                         foreach(Request req in _pendingRequests)
500                         {
501                             req.invoke(_voidLocator);
502                         }
503                         _pendingRequests.Clear();
504                     }
505                 }
506             }
507         }
508 
runTimerTask()509         public void runTimerTask()
510         {
511             lock(this)
512             {
513                 if(--_pendingRetryCount > 0)
514                 {
515                     try
516                     {
517                         if(_traceLevel > 1)
518                         {
519                             StringBuilder s = new StringBuilder("retrying locator lookup:\nlookup = ");
520                             s.Append(_lookup);
521                             s.Append("\nretry count = ").Append(_retryCount);
522                             if(_instanceName.Length == 0)
523                             {
524                                 s.Append("\ninstance name = ").Append(_instanceName);
525                             }
526                             _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
527                         }
528 
529                         foreach(var l in _lookups)
530                         {
531                             l.Key.findLocatorAsync(_instanceName, l.Value).ContinueWith(t => {
532                                     try
533                                     {
534                                         t.Wait();
535                                     }
536                                     catch(AggregateException ex)
537                                     {
538                                         exception(ex.InnerException);
539                                     }
540                                 }, l.Key.ice_scheduler()); // Send multicast request.
541                         }
542                         _timer.schedule(this, _timeout);
543                         return;
544                     }
545                     catch(Ice.LocalException)
546                     {
547                     }
548                     _pendingRetryCount = 0;
549                 }
550 
551                 if(_traceLevel > 0)
552                 {
553                     StringBuilder s = new StringBuilder("locator lookup timed out:\nlookup = ");
554                     s.Append(_lookup);
555                     if(_instanceName.Length == 0)
556                     {
557                         s.Append("\ninstance name = ").Append(_instanceName);
558                     }
559                     _lookup.ice_getCommunicator().getLogger().trace("Lookup", s.ToString());
560                 }
561 
562                 if(_pendingRequests.Count == 0)
563                 {
564                     Monitor.Pulse(this);
565                 }
566                 else
567                 {
568                     foreach(Request req in _pendingRequests)
569                     {
570                         req.invoke(_voidLocator);
571                     }
572                     _pendingRequests.Clear();
573                 }
574                 _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
575             }
576         }
577 
578         private LookupPrx _lookup;
579         private Dictionary<LookupPrx, LookupReplyPrx> _lookups = new Dictionary<LookupPrx, LookupReplyPrx>();
580         private int _timeout;
581         private IceInternal.Timer _timer;
582         private int _traceLevel;
583         private int _retryCount;
584         private int _retryDelay;
585 
586         private string _instanceName;
587         private bool _warned;
588         private Ice.LocatorPrx _locator;
589         private Ice.LocatorPrx _voidLocator;
590         private Dictionary<string, Ice.LocatorPrx> _locators = new Dictionary<string, Ice.LocatorPrx>();
591 
592         private int _pendingRetryCount;
593         private int _failureCount;
594         private bool _warnOnce = true;
595         private List<Request> _pendingRequests = new List<Request>();
596         private long _nextRetry;
597     };
598 
599     internal class LookupReplyI : LookupReplyDisp_
600     {
LookupReplyI(LocatorI locator)601         public LookupReplyI(LocatorI locator)
602         {
603             _locator = locator;
604         }
605 
606         public override void
foundLocator(Ice.LocatorPrx locator, Ice.Current current)607         foundLocator(Ice.LocatorPrx locator, Ice.Current current)
608         {
609             _locator.foundLocator(locator);
610         }
611 
612         private LocatorI _locator;
613     }
614 
615     internal class PluginI : Ice.Plugin
616     {
617         public
PluginI(string name, Ice.Communicator communicator)618         PluginI(string name, Ice.Communicator communicator)
619         {
620             _name = name;
621             _communicator = communicator;
622         }
623 
624         public void
initialize()625         initialize()
626         {
627             Ice.Properties properties = _communicator.getProperties();
628 
629             bool ipv4 = properties.getPropertyAsIntWithDefault("Ice.IPv4", 1) > 0;
630             bool preferIPv6 = properties.getPropertyAsInt("Ice.PreferIPv6Address") > 0;
631             string address;
632             if(ipv4 && !preferIPv6)
633             {
634                 address = properties.getPropertyWithDefault(_name + ".Address", "239.255.0.1");
635             }
636             else
637             {
638                 address = properties.getPropertyWithDefault(_name + ".Address", "ff15::1");
639             }
640             int port = properties.getPropertyAsIntWithDefault(_name + ".Port", 4061);
641             string intf = properties.getProperty(_name + ".Interface");
642 
643             string lookupEndpoints = properties.getProperty(_name + ".Lookup");
644             if(lookupEndpoints.Length == 0)
645             {
646                 int protocol = ipv4 && !preferIPv6 ? IceInternal.Network.EnableIPv4 : IceInternal.Network.EnableIPv6;
647                 var interfaces = IceInternal.Network.getInterfacesForMulticast(intf, protocol);
648                 foreach(string p in interfaces)
649                 {
650                     if(p != interfaces[0])
651                     {
652                         lookupEndpoints += ":";
653                     }
654                     lookupEndpoints += "udp -h \"" + address + "\" -p " + port + " --interface \"" + p + "\"";
655                 }
656             }
657 
658             if(properties.getProperty(_name + ".Reply.Endpoints").Length == 0)
659             {
660                 properties.setProperty(_name + ".Reply.Endpoints",
661                                        "udp -h " + (intf.Length == 0 ? "*" : "\"" + intf + "\""));
662             }
663 
664             if(properties.getProperty(_name + ".Locator.Endpoints").Length == 0)
665             {
666                 properties.setProperty(_name + ".Locator.AdapterId", Guid.NewGuid().ToString());
667             }
668 
669             _replyAdapter = _communicator.createObjectAdapter(_name + ".Reply");
670             _locatorAdapter = _communicator.createObjectAdapter(_name + ".Locator");
671 
672             // We don't want those adapters to be registered with the locator so clear their locator.
673             _replyAdapter.setLocator(null);
674             _locatorAdapter.setLocator(null);
675 
676             Ice.ObjectPrx lookupPrx = _communicator.stringToProxy("IceLocatorDiscovery/Lookup -d:" + lookupEndpoints);
677             // No colloc optimization or router for the multicast proxy!
678             lookupPrx = lookupPrx.ice_collocationOptimized(false).ice_router(null);
679 
680             Ice.LocatorPrx voidLo = Ice.LocatorPrxHelper.uncheckedCast(_locatorAdapter.addWithUUID(new VoidLocatorI()));
681 
682             string instanceName = properties.getProperty(_name + ".InstanceName");
683             Ice.Identity id = new Ice.Identity();
684             id.name = "Locator";
685             id.category = instanceName.Length > 0 ? instanceName : Guid.NewGuid().ToString();
686 
687             _defaultLocator = _communicator.getDefaultLocator();
688             _locator = new LocatorI(_name, LookupPrxHelper.uncheckedCast(lookupPrx), properties, instanceName, voidLo);
689             _locatorPrx = Ice.LocatorPrxHelper.uncheckedCast(_locatorAdapter.addWithUUID(_locator));
690             _communicator.setDefaultLocator(_locatorPrx);
691 
692             Ice.ObjectPrx lookupReply = _replyAdapter.addWithUUID(new LookupReplyI(_locator)).ice_datagram();
693             _locator.setLookupReply(LookupReplyPrxHelper.uncheckedCast(lookupReply));
694 
695             _replyAdapter.activate();
696             _locatorAdapter.activate();
697         }
698 
699         public void
destroy()700         destroy()
701         {
702             if(_replyAdapter != null)
703             {
704                 _replyAdapter.destroy();
705             }
706             if(_locatorAdapter != null)
707             {
708                 _locatorAdapter.destroy();
709             }
710             if(_communicator.getDefaultLocator().Equals(_locatorPrx))
711             {
712                 // Restore original default locator proxy, if the user didn't change it in the meantime
713                 _communicator.setDefaultLocator(_defaultLocator);
714             }
715         }
716 
717         List<Ice.LocatorPrx>
getLocators(string instanceName, int waitTime)718         getLocators(string instanceName, int waitTime)
719         {
720             return _locator.getLocators(instanceName, waitTime);
721         }
722 
723         private string _name;
724         private Ice.Communicator _communicator;
725         private Ice.ObjectAdapter _locatorAdapter;
726         private Ice.ObjectAdapter _replyAdapter;
727         private LocatorI _locator;
728         private Ice.LocatorPrx _locatorPrx;
729         private Ice.LocatorPrx _defaultLocator;
730     }
731 
732     public class Util
733     {
734         public static void
registerIceLocatorDiscovery(bool loadOnInitialize)735         registerIceLocatorDiscovery(bool loadOnInitialize)
736         {
737             Ice.Util.registerPluginFactory("IceLocatorDiscovery", new PluginFactory(), loadOnInitialize);
738         }
739     }
740 }
741