1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <Ice/Connection.h>
6 #include <Ice/ObjectAdapter.h>
7 #include <Ice/Communicator.h>
8 #include <Ice/LocalException.h>
9 #include <Ice/Initialize.h>
10 #include <Ice/LoggerUtil.h>
11 #include <Ice/UUID.h>
12 
13 #include <IceDiscovery/LookupI.h>
14 #include <iterator>
15 
16 using namespace std;
17 using namespace Ice;
18 using namespace IceDiscovery;
19 
20 #ifndef ICE_CPP11_MAPPING
21 namespace
22 {
23 
24 class AdapterCallbackI : public IceUtil::Shared
25 {
26 public:
27 
AdapterCallbackI(const LookupIPtr & lookup,const AdapterRequestPtr & request)28     AdapterCallbackI(const LookupIPtr& lookup, const AdapterRequestPtr& request) : _lookup(lookup), _request(request)
29     {
30     }
31 
32     void
completed(const Ice::AsyncResultPtr & result)33     completed(const Ice::AsyncResultPtr& result)
34     {
35         try
36         {
37             result->throwLocalException();
38         }
39         catch(const Ice::LocalException& ex)
40         {
41             _lookup->adapterRequestException(_request, ex);
42         }
43     }
44 
45 private:
46 
47     LookupIPtr _lookup;
48     AdapterRequestPtr _request;
49 };
50 
51 class ObjectCallbackI : public IceUtil::Shared
52 {
53 public:
54 
ObjectCallbackI(const LookupIPtr & lookup,const ObjectRequestPtr & request)55     ObjectCallbackI(const LookupIPtr& lookup, const ObjectRequestPtr& request) : _lookup(lookup), _request(request)
56     {
57     }
58 
59     void
completed(const Ice::AsyncResultPtr & result)60     completed(const Ice::AsyncResultPtr& result)
61     {
62         try
63         {
64             result->throwLocalException();
65         }
66         catch(const Ice::LocalException& ex)
67         {
68             _lookup->objectRequestException(_request, ex);
69         }
70     }
71 
72 private:
73 
74     LookupIPtr _lookup;
75     ObjectRequestPtr _request;
76 };
77 
78 }
79 #endif
80 
Request(const LookupIPtr & lookup,int retryCount)81 IceDiscovery::Request::Request(const LookupIPtr& lookup, int retryCount) :
82     _lookup(lookup), _requestId(Ice::generateUUID()), _retryCount(retryCount), _lookupCount(0), _failureCount(0)
83 {
84 }
85 
86 bool
retry()87 IceDiscovery::Request::retry()
88 {
89     return --_retryCount >= 0;
90 }
91 
92 void
invoke(const string & domainId,const vector<pair<LookupPrxPtr,LookupReplyPrxPtr>> & lookups)93 IceDiscovery::Request::invoke(const string& domainId, const vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >& lookups)
94 {
95     _lookupCount = lookups.size();
96     _failureCount = 0;
97     Ice::Identity id;
98     id.name = _requestId;
99     for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator p = lookups.begin(); p != lookups.end(); ++p)
100     {
101         invokeWithLookup(domainId, p->first, ICE_UNCHECKED_CAST(LookupReplyPrx, p->second->ice_identity(id)));
102     }
103 }
104 
105 bool
exception()106 IceDiscovery::Request::exception()
107 {
108     //
109     // If all the invocations on all the lookup proxies failed, report it to the locator.
110     //
111     if(++_failureCount == _lookupCount)
112     {
113         finished(0);
114         return true;
115     }
116     return false;
117 }
118 
119 string
getRequestId() const120 IceDiscovery::Request::getRequestId() const
121 {
122     return _requestId;
123 }
124 
AdapterRequest(const LookupIPtr & lookup,const std::string & adapterId,int retryCount)125 AdapterRequest::AdapterRequest(const LookupIPtr& lookup, const std::string& adapterId, int retryCount) :
126     RequestT<std::string, AdapterCB>(lookup, adapterId, retryCount),
127     _start(IceUtil::Time::now())
128 {
129 }
130 
131 bool
retry()132 AdapterRequest::retry()
133 {
134     return _proxies.empty() && --_retryCount >= 0;
135 }
136 
137 bool
response(const Ice::ObjectPrxPtr & proxy,bool isReplicaGroup)138 AdapterRequest::response(const Ice::ObjectPrxPtr& proxy, bool isReplicaGroup)
139 {
140     if(isReplicaGroup)
141     {
142         if(_latency == IceUtil::Time())
143         {
144             _latency = (IceUtil::Time::now() - _start) * _lookup->latencyMultiplier();
145             _lookup->timer()->cancel(ICE_SHARED_FROM_THIS);
146             _lookup->timer()->schedule(ICE_SHARED_FROM_THIS, _latency);
147         }
148         _proxies.insert(proxy);
149         return false;
150     }
151     finished(proxy);
152     return true;
153 }
154 
155 void
finished(const ObjectPrxPtr & proxy)156 AdapterRequest::finished(const ObjectPrxPtr& proxy)
157 {
158     if(proxy || _proxies.empty())
159     {
160         RequestT<string, AdapterCB>::finished(proxy);
161     }
162     else if(_proxies.size() == 1)
163     {
164         RequestT<string, AdapterCB>::finished(*_proxies.begin());
165     }
166     else
167     {
168         EndpointSeq endpoints;
169         ObjectPrxPtr prx;
170         for(set<ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
171         {
172             if(!prx)
173             {
174                 prx = *p;
175             }
176             Ice::EndpointSeq endpts = (*p)->ice_getEndpoints();
177             copy(endpts.begin(), endpts.end(), back_inserter(endpoints));
178         }
179         RequestT<string, AdapterCB>::finished(prx->ice_endpoints(endpoints));
180     }
181 }
182 
183 void
invokeWithLookup(const string & domainId,const LookupPrxPtr & lookup,const LookupReplyPrxPtr & lookupReply)184 AdapterRequest::invokeWithLookup(const string& domainId, const LookupPrxPtr& lookup, const LookupReplyPrxPtr& lookupReply)
185 {
186 #ifdef ICE_CPP11_MAPPING
187     auto self = ICE_SHARED_FROM_THIS;
188     lookup->findAdapterByIdAsync(domainId, _id, lookupReply, nullptr, [self](exception_ptr ex)
189     {
190         try
191         {
192             rethrow_exception(ex);
193         }
194         catch(const Ice::LocalException& e)
195         {
196             self->_lookup->adapterRequestException(self, e);
197         }
198     });
199 #else
200     lookup->begin_findAdapterById(domainId, _id, lookupReply, newCallback(new AdapterCallbackI(_lookup, this),
201                                                                           &AdapterCallbackI::completed));
202 #endif
203 }
204 
205 void
runTimerTask()206 AdapterRequest::runTimerTask()
207 {
208     _lookup->adapterRequestTimedOut(ICE_SHARED_FROM_THIS);
209 }
210 
ObjectRequest(const LookupIPtr & lookup,const Ice::Identity & id,int retryCount)211 ObjectRequest::ObjectRequest(const LookupIPtr& lookup, const Ice::Identity& id, int retryCount) :
212     RequestT<Ice::Identity, ObjectCB>(lookup, id, retryCount)
213 {
214 }
215 
216 void
response(const Ice::ObjectPrxPtr & proxy)217 ObjectRequest::response(const Ice::ObjectPrxPtr& proxy)
218 {
219     finished(proxy);
220 }
221 
222 void
invokeWithLookup(const string & domainId,const LookupPrxPtr & lookup,const LookupReplyPrxPtr & lookupReply)223 ObjectRequest::invokeWithLookup(const string& domainId, const LookupPrxPtr& lookup, const LookupReplyPrxPtr& lookupReply)
224 {
225 #ifdef ICE_CPP11_MAPPING
226     auto self = ICE_SHARED_FROM_THIS;
227     lookup->findObjectByIdAsync(domainId, _id, lookupReply, nullptr, [self](exception_ptr ex)
228     {
229         try
230         {
231             rethrow_exception(ex);
232         }
233         catch(const Ice::LocalException& e)
234         {
235             self->_lookup->objectRequestException(self, e);
236         }
237     });
238 #else
239     lookup->begin_findObjectById(domainId, _id, lookupReply, newCallback(new ObjectCallbackI(_lookup, this),
240                                                                          &ObjectCallbackI::completed));
241 
242 #endif
243 }
244 
245 void
runTimerTask()246 ObjectRequest::runTimerTask()
247 {
248     _lookup->objectRequestTimedOut(ICE_SHARED_FROM_THIS);
249 }
250 
LookupI(const LocatorRegistryIPtr & registry,const LookupPrxPtr & lookup,const Ice::PropertiesPtr & properties)251 LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrxPtr& lookup, const Ice::PropertiesPtr& properties) :
252     _registry(registry),
253     _lookup(lookup),
254     _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))),
255     _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)),
256     _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)),
257     _domainId(properties->getProperty("IceDiscovery.DomainId")),
258     _timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())),
259     _warnOnce(true)
260 {
261     //
262     // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
263     // datagram on each endpoint.
264     //
265     EndpointSeq endpoints = lookup->ice_getEndpoints();
266     for(vector<EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
267     {
268         EndpointSeq single;
269         single.push_back(*p);
270         _lookups.push_back(make_pair(lookup->ice_endpoints(single), LookupReplyPrxPtr()));
271     }
272     assert(!_lookups.empty());
273 }
274 
~LookupI()275 LookupI::~LookupI()
276 {
277 }
278 
279 void
destroy()280 LookupI::destroy()
281 {
282     Lock sync(*this);
283     for(map<Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.begin(); p != _objectRequests.end(); ++p)
284     {
285         p->second->finished(0);
286         _timer->cancel(p->second);
287     }
288     _objectRequests.clear();
289 
290     for(map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.begin(); p != _adapterRequests.end(); ++p)
291     {
292         p->second->finished(0);
293         _timer->cancel(p->second);
294     }
295     _adapterRequests.clear();
296 }
297 
298 void
setLookupReply(const LookupReplyPrxPtr & lookupReply)299 LookupI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
300 {
301     //
302     // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
303     //
304     for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookups.begin(); p != _lookups.end(); ++p)
305     {
306         UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo());
307         if(info && !info->mcastInterface.empty())
308         {
309             EndpointSeq endpts = lookupReply->ice_getEndpoints();
310             for(EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q)
311             {
312                 IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(IPEndpointInfo, (*q)->getInfo());
313                 if(r && r->host == info->mcastInterface)
314                 {
315                     EndpointSeq single;
316                     single.push_back(*q);
317                     p->second = lookupReply->ice_endpoints(single);
318                 }
319             }
320         }
321 
322         if(!p->second)
323         {
324             p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found.
325         }
326     }
327 }
328 
329 void
findObjectById(ICE_IN (string)domainId,ICE_IN (Ice::Identity)id,ICE_IN (LookupReplyPrxPtr)reply,const Ice::Current &)330 LookupI::findObjectById(ICE_IN(string) domainId, ICE_IN(Ice::Identity) id, ICE_IN(LookupReplyPrxPtr) reply,
331                         const Ice::Current&)
332 {
333     if(domainId != _domainId)
334     {
335         return; // Ignore.
336     }
337 
338     Ice::ObjectPrxPtr proxy = _registry->findObject(id);
339     if(proxy)
340     {
341         //
342         // Reply to the mulicast request using the given proxy.
343         //
344         try
345         {
346 #ifdef ICE_CPP11_MAPPING
347             reply->foundObjectByIdAsync(id, proxy);
348 #else
349             reply->begin_foundObjectById(id, proxy);
350 #endif
351         }
352         catch(const Ice::LocalException&)
353         {
354             // Ignore.
355         }
356     }
357 }
358 
359 void
findAdapterById(ICE_IN (string)domainId,ICE_IN (string)adapterId,ICE_IN (LookupReplyPrxPtr)reply,const Ice::Current &)360 LookupI::findAdapterById(ICE_IN(string) domainId, ICE_IN(string) adapterId, ICE_IN(LookupReplyPrxPtr) reply,
361                          const Ice::Current&)
362 {
363     if(domainId != _domainId)
364     {
365         return; // Ignore.
366     }
367 
368     bool isReplicaGroup;
369     Ice::ObjectPrxPtr proxy = _registry->findAdapter(adapterId, isReplicaGroup);
370     if(proxy)
371     {
372         //
373         // Reply to the multicast request using the given proxy.
374         //
375         try
376         {
377 #ifdef ICE_CPP11_MAPPING
378             reply->foundAdapterByIdAsync(adapterId, proxy, isReplicaGroup);
379 #else
380             reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
381 #endif
382         }
383         catch(const Ice::LocalException&)
384         {
385             // Ignore.
386         }
387     }
388 }
389 
390 void
findObject(const ObjectCB & cb,const Ice::Identity & id)391 LookupI::findObject(const ObjectCB& cb, const Ice::Identity& id)
392 {
393     Lock sync(*this);
394     map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
395     if(p == _objectRequests.end())
396     {
397         p = _objectRequests.insert(make_pair(id, ICE_MAKE_SHARED(ObjectRequest,
398                                                                  ICE_SHARED_FROM_THIS,
399                                                                  id,
400                                                                  _retryCount))).first;
401     }
402 
403     if(p->second->addCallback(cb))
404     {
405         try
406         {
407             p->second->invoke(_domainId, _lookups);
408             _timer->schedule(p->second, _timeout);
409         }
410         catch(const Ice::LocalException&)
411         {
412             p->second->finished(ICE_NULLPTR);
413             _objectRequests.erase(p);
414         }
415     }
416 }
417 
418 void
findAdapter(const AdapterCB & cb,const std::string & adapterId)419 LookupI::findAdapter(const AdapterCB& cb, const std::string& adapterId)
420 {
421     Lock sync(*this);
422     map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
423     if(p == _adapterRequests.end())
424     {
425         p = _adapterRequests.insert(make_pair(adapterId, ICE_MAKE_SHARED(AdapterRequest,
426                                                                          ICE_SHARED_FROM_THIS,
427                                                                          adapterId,
428                                                                          _retryCount))).first;
429     }
430 
431     if(p->second->addCallback(cb))
432     {
433         try
434         {
435             p->second->invoke(_domainId, _lookups);
436             _timer->schedule(p->second, _timeout);
437         }
438         catch(const Ice::LocalException&)
439         {
440             p->second->finished(ICE_NULLPTR);
441             _adapterRequests.erase(p);
442         }
443     }
444 }
445 
446 void
foundObject(const Ice::Identity & id,const string & requestId,const Ice::ObjectPrxPtr & proxy)447 LookupI::foundObject(const Ice::Identity& id, const string& requestId, const Ice::ObjectPrxPtr& proxy)
448 {
449     Lock sync(*this);
450     map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
451     if(p != _objectRequests.end() && p->second->getRequestId() == requestId) // Ignore responses from old requests
452     {
453         p->second->response(proxy);
454         _timer->cancel(p->second);
455         _objectRequests.erase(p);
456     }
457 }
458 
459 void
foundAdapter(const string & adapterId,const string & requestId,const Ice::ObjectPrxPtr & proxy,bool isReplicaGroup)460 LookupI::foundAdapter(const string& adapterId, const string& requestId, const Ice::ObjectPrxPtr& proxy,
461                       bool isReplicaGroup)
462 {
463     Lock sync(*this);
464     map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
465     if(p != _adapterRequests.end() && p->second->getRequestId() == requestId) // Ignore responses from old requests
466     {
467         if(p->second->response(proxy, isReplicaGroup))
468         {
469             _timer->cancel(p->second);
470             _adapterRequests.erase(p);
471         }
472     }
473 }
474 
475 void
objectRequestTimedOut(const ObjectRequestPtr & request)476 LookupI::objectRequestTimedOut(const ObjectRequestPtr& request)
477 {
478     Lock sync(*this);
479     map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(request->getId());
480     if(p == _objectRequests.end() || p->second.get() != request.get())
481     {
482         return;
483     }
484 
485     if(request->retry())
486     {
487         try
488         {
489             request->invoke(_domainId, _lookups);
490             _timer->schedule(request, _timeout);
491             return;
492         }
493         catch(const Ice::LocalException&)
494         {
495         }
496     }
497 
498     request->finished(0);
499     _objectRequests.erase(p);
500     _timer->cancel(request);
501 }
502 
503 void
adapterRequestException(const AdapterRequestPtr & request,const LocalException & ex)504 LookupI::adapterRequestException(const AdapterRequestPtr& request, const LocalException& ex)
505 {
506     Lock sync(*this);
507     map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(request->getId());
508     if(p == _adapterRequests.end() || p->second.get() != request.get())
509     {
510         return;
511     }
512 
513     if(request->exception())
514     {
515         if(_warnOnce)
516         {
517             Warning warn(_lookup->ice_getCommunicator()->getLogger());
518             warn << "failed to lookup adapter `" << p->first << "' with lookup proxy `" << _lookup << "':\n" << ex;
519             _warnOnce = false;
520         }
521         _timer->cancel(request);
522         _adapterRequests.erase(p);
523     }
524 }
525 
526 void
adapterRequestTimedOut(const AdapterRequestPtr & request)527 LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request)
528 {
529     Lock sync(*this);
530     map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(request->getId());
531     if(p == _adapterRequests.end() || p->second.get() != request.get())
532     {
533         return;
534     }
535 
536     if(request->retry())
537     {
538         try
539         {
540             request->invoke(_domainId, _lookups);
541             _timer->schedule(request, _timeout);
542             return;
543         }
544         catch(const Ice::LocalException&)
545         {
546         }
547     }
548 
549     request->finished(0);
550     _adapterRequests.erase(p);
551     _timer->cancel(request);
552 }
553 
554 void
objectRequestException(const ObjectRequestPtr & request,const LocalException & ex)555 LookupI::objectRequestException(const ObjectRequestPtr& request, const LocalException& ex)
556 {
557     Lock sync(*this);
558     map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(request->getId());
559     if(p == _objectRequests.end() || p->second.get() != request.get())
560     {
561         return;
562     }
563 
564     if(request->exception())
565     {
566         if(_warnOnce)
567         {
568             Warning warn(_lookup->ice_getCommunicator()->getLogger());
569             string id = _lookup->ice_getCommunicator()->identityToString(p->first);
570             warn << "failed to lookup object `" << id << "' with lookup proxy `" << _lookup << "':\n" << ex;
571             _warnOnce = false;
572         }
573         _timer->cancel(request);
574         _objectRequests.erase(p);
575     }
576 }
577 
LookupReplyI(const LookupIPtr & lookup)578 LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup)
579 {
580 }
581 
582 #ifdef ICE_CPP11_MAPPING
583 void
foundObjectById(Identity id,shared_ptr<ObjectPrx> proxy,const Current & current)584 LookupReplyI::foundObjectById(Identity id, shared_ptr<ObjectPrx> proxy, const Current& current)
585 {
586     _lookup->foundObject(id, current.id.name, proxy);
587 }
588 
589 void
foundAdapterById(string adapterId,shared_ptr<ObjectPrx> proxy,bool isReplicaGroup,const Current & current)590 LookupReplyI::foundAdapterById(string adapterId, shared_ptr<ObjectPrx> proxy, bool isReplicaGroup,
591                                const Current& current)
592 {
593     _lookup->foundAdapter(adapterId, current.id.name, proxy, isReplicaGroup);
594 }
595 #else
596 void
foundObjectById(const Identity & id,const ObjectPrxPtr & proxy,const Current & current)597 LookupReplyI::foundObjectById(const Identity& id, const ObjectPrxPtr& proxy, const Current& current)
598 {
599     _lookup->foundObject(id, current.id.name, proxy);
600 }
601 
602 void
foundAdapterById(const string & adapterId,const ObjectPrxPtr & proxy,bool isReplicaGroup,const Current & current)603 LookupReplyI::foundAdapterById(const string& adapterId, const ObjectPrxPtr& proxy, bool isReplicaGroup,
604                                const Current& current)
605 {
606     _lookup->foundAdapter(adapterId, current.id.name, proxy, isReplicaGroup);
607 }
608 #endif
609