1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <Ice/Connection.h>
6 #include <Ice/ObjectAdapter.h>
7 #include <Ice/Communicator.h>
8 #include <Ice/LocalException.h>
9 #include <Ice/Initialize.h>
10 #include <Ice/LoggerUtil.h>
11 #include <Ice/UUID.h>
12
13 #include <IceDiscovery/LookupI.h>
14 #include <iterator>
15
16 using namespace std;
17 using namespace Ice;
18 using namespace IceDiscovery;
19
20 #ifndef ICE_CPP11_MAPPING
21 namespace
22 {
23
24 class AdapterCallbackI : public IceUtil::Shared
25 {
26 public:
27
AdapterCallbackI(const LookupIPtr & lookup,const AdapterRequestPtr & request)28 AdapterCallbackI(const LookupIPtr& lookup, const AdapterRequestPtr& request) : _lookup(lookup), _request(request)
29 {
30 }
31
32 void
completed(const Ice::AsyncResultPtr & result)33 completed(const Ice::AsyncResultPtr& result)
34 {
35 try
36 {
37 result->throwLocalException();
38 }
39 catch(const Ice::LocalException& ex)
40 {
41 _lookup->adapterRequestException(_request, ex);
42 }
43 }
44
45 private:
46
47 LookupIPtr _lookup;
48 AdapterRequestPtr _request;
49 };
50
51 class ObjectCallbackI : public IceUtil::Shared
52 {
53 public:
54
ObjectCallbackI(const LookupIPtr & lookup,const ObjectRequestPtr & request)55 ObjectCallbackI(const LookupIPtr& lookup, const ObjectRequestPtr& request) : _lookup(lookup), _request(request)
56 {
57 }
58
59 void
completed(const Ice::AsyncResultPtr & result)60 completed(const Ice::AsyncResultPtr& result)
61 {
62 try
63 {
64 result->throwLocalException();
65 }
66 catch(const Ice::LocalException& ex)
67 {
68 _lookup->objectRequestException(_request, ex);
69 }
70 }
71
72 private:
73
74 LookupIPtr _lookup;
75 ObjectRequestPtr _request;
76 };
77
78 }
79 #endif
80
Request(const LookupIPtr & lookup,int retryCount)81 IceDiscovery::Request::Request(const LookupIPtr& lookup, int retryCount) :
82 _lookup(lookup), _requestId(Ice::generateUUID()), _retryCount(retryCount), _lookupCount(0), _failureCount(0)
83 {
84 }
85
86 bool
retry()87 IceDiscovery::Request::retry()
88 {
89 return --_retryCount >= 0;
90 }
91
92 void
invoke(const string & domainId,const vector<pair<LookupPrxPtr,LookupReplyPrxPtr>> & lookups)93 IceDiscovery::Request::invoke(const string& domainId, const vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >& lookups)
94 {
95 _lookupCount = lookups.size();
96 _failureCount = 0;
97 Ice::Identity id;
98 id.name = _requestId;
99 for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::const_iterator p = lookups.begin(); p != lookups.end(); ++p)
100 {
101 invokeWithLookup(domainId, p->first, ICE_UNCHECKED_CAST(LookupReplyPrx, p->second->ice_identity(id)));
102 }
103 }
104
105 bool
exception()106 IceDiscovery::Request::exception()
107 {
108 //
109 // If all the invocations on all the lookup proxies failed, report it to the locator.
110 //
111 if(++_failureCount == _lookupCount)
112 {
113 finished(0);
114 return true;
115 }
116 return false;
117 }
118
119 string
getRequestId() const120 IceDiscovery::Request::getRequestId() const
121 {
122 return _requestId;
123 }
124
AdapterRequest(const LookupIPtr & lookup,const std::string & adapterId,int retryCount)125 AdapterRequest::AdapterRequest(const LookupIPtr& lookup, const std::string& adapterId, int retryCount) :
126 RequestT<std::string, AdapterCB>(lookup, adapterId, retryCount),
127 _start(IceUtil::Time::now())
128 {
129 }
130
131 bool
retry()132 AdapterRequest::retry()
133 {
134 return _proxies.empty() && --_retryCount >= 0;
135 }
136
137 bool
response(const Ice::ObjectPrxPtr & proxy,bool isReplicaGroup)138 AdapterRequest::response(const Ice::ObjectPrxPtr& proxy, bool isReplicaGroup)
139 {
140 if(isReplicaGroup)
141 {
142 if(_latency == IceUtil::Time())
143 {
144 _latency = (IceUtil::Time::now() - _start) * _lookup->latencyMultiplier();
145 _lookup->timer()->cancel(ICE_SHARED_FROM_THIS);
146 _lookup->timer()->schedule(ICE_SHARED_FROM_THIS, _latency);
147 }
148 _proxies.insert(proxy);
149 return false;
150 }
151 finished(proxy);
152 return true;
153 }
154
155 void
finished(const ObjectPrxPtr & proxy)156 AdapterRequest::finished(const ObjectPrxPtr& proxy)
157 {
158 if(proxy || _proxies.empty())
159 {
160 RequestT<string, AdapterCB>::finished(proxy);
161 }
162 else if(_proxies.size() == 1)
163 {
164 RequestT<string, AdapterCB>::finished(*_proxies.begin());
165 }
166 else
167 {
168 EndpointSeq endpoints;
169 ObjectPrxPtr prx;
170 for(set<ObjectPrxPtr>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
171 {
172 if(!prx)
173 {
174 prx = *p;
175 }
176 Ice::EndpointSeq endpts = (*p)->ice_getEndpoints();
177 copy(endpts.begin(), endpts.end(), back_inserter(endpoints));
178 }
179 RequestT<string, AdapterCB>::finished(prx->ice_endpoints(endpoints));
180 }
181 }
182
183 void
invokeWithLookup(const string & domainId,const LookupPrxPtr & lookup,const LookupReplyPrxPtr & lookupReply)184 AdapterRequest::invokeWithLookup(const string& domainId, const LookupPrxPtr& lookup, const LookupReplyPrxPtr& lookupReply)
185 {
186 #ifdef ICE_CPP11_MAPPING
187 auto self = ICE_SHARED_FROM_THIS;
188 lookup->findAdapterByIdAsync(domainId, _id, lookupReply, nullptr, [self](exception_ptr ex)
189 {
190 try
191 {
192 rethrow_exception(ex);
193 }
194 catch(const Ice::LocalException& e)
195 {
196 self->_lookup->adapterRequestException(self, e);
197 }
198 });
199 #else
200 lookup->begin_findAdapterById(domainId, _id, lookupReply, newCallback(new AdapterCallbackI(_lookup, this),
201 &AdapterCallbackI::completed));
202 #endif
203 }
204
205 void
runTimerTask()206 AdapterRequest::runTimerTask()
207 {
208 _lookup->adapterRequestTimedOut(ICE_SHARED_FROM_THIS);
209 }
210
ObjectRequest(const LookupIPtr & lookup,const Ice::Identity & id,int retryCount)211 ObjectRequest::ObjectRequest(const LookupIPtr& lookup, const Ice::Identity& id, int retryCount) :
212 RequestT<Ice::Identity, ObjectCB>(lookup, id, retryCount)
213 {
214 }
215
216 void
response(const Ice::ObjectPrxPtr & proxy)217 ObjectRequest::response(const Ice::ObjectPrxPtr& proxy)
218 {
219 finished(proxy);
220 }
221
222 void
invokeWithLookup(const string & domainId,const LookupPrxPtr & lookup,const LookupReplyPrxPtr & lookupReply)223 ObjectRequest::invokeWithLookup(const string& domainId, const LookupPrxPtr& lookup, const LookupReplyPrxPtr& lookupReply)
224 {
225 #ifdef ICE_CPP11_MAPPING
226 auto self = ICE_SHARED_FROM_THIS;
227 lookup->findObjectByIdAsync(domainId, _id, lookupReply, nullptr, [self](exception_ptr ex)
228 {
229 try
230 {
231 rethrow_exception(ex);
232 }
233 catch(const Ice::LocalException& e)
234 {
235 self->_lookup->objectRequestException(self, e);
236 }
237 });
238 #else
239 lookup->begin_findObjectById(domainId, _id, lookupReply, newCallback(new ObjectCallbackI(_lookup, this),
240 &ObjectCallbackI::completed));
241
242 #endif
243 }
244
245 void
runTimerTask()246 ObjectRequest::runTimerTask()
247 {
248 _lookup->objectRequestTimedOut(ICE_SHARED_FROM_THIS);
249 }
250
LookupI(const LocatorRegistryIPtr & registry,const LookupPrxPtr & lookup,const Ice::PropertiesPtr & properties)251 LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrxPtr& lookup, const Ice::PropertiesPtr& properties) :
252 _registry(registry),
253 _lookup(lookup),
254 _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))),
255 _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)),
256 _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)),
257 _domainId(properties->getProperty("IceDiscovery.DomainId")),
258 _timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())),
259 _warnOnce(true)
260 {
261 //
262 // Create one lookup proxy per endpoint from the given proxy. We want to send a multicast
263 // datagram on each endpoint.
264 //
265 EndpointSeq endpoints = lookup->ice_getEndpoints();
266 for(vector<EndpointPtr>::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
267 {
268 EndpointSeq single;
269 single.push_back(*p);
270 _lookups.push_back(make_pair(lookup->ice_endpoints(single), LookupReplyPrxPtr()));
271 }
272 assert(!_lookups.empty());
273 }
274
~LookupI()275 LookupI::~LookupI()
276 {
277 }
278
279 void
destroy()280 LookupI::destroy()
281 {
282 Lock sync(*this);
283 for(map<Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.begin(); p != _objectRequests.end(); ++p)
284 {
285 p->second->finished(0);
286 _timer->cancel(p->second);
287 }
288 _objectRequests.clear();
289
290 for(map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.begin(); p != _adapterRequests.end(); ++p)
291 {
292 p->second->finished(0);
293 _timer->cancel(p->second);
294 }
295 _adapterRequests.clear();
296 }
297
298 void
setLookupReply(const LookupReplyPrxPtr & lookupReply)299 LookupI::setLookupReply(const LookupReplyPrxPtr& lookupReply)
300 {
301 //
302 // Use a lookup reply proxy whose adress matches the interface used to send multicast datagrams.
303 //
304 for(vector<pair<LookupPrxPtr, LookupReplyPrxPtr> >::iterator p = _lookups.begin(); p != _lookups.end(); ++p)
305 {
306 UDPEndpointInfoPtr info = ICE_DYNAMIC_CAST(UDPEndpointInfo, p->first->ice_getEndpoints()[0]->getInfo());
307 if(info && !info->mcastInterface.empty())
308 {
309 EndpointSeq endpts = lookupReply->ice_getEndpoints();
310 for(EndpointSeq::const_iterator q = endpts.begin(); q != endpts.end(); ++q)
311 {
312 IPEndpointInfoPtr r = ICE_DYNAMIC_CAST(IPEndpointInfo, (*q)->getInfo());
313 if(r && r->host == info->mcastInterface)
314 {
315 EndpointSeq single;
316 single.push_back(*q);
317 p->second = lookupReply->ice_endpoints(single);
318 }
319 }
320 }
321
322 if(!p->second)
323 {
324 p->second = lookupReply; // Fallback: just use the given lookup reply proxy if no matching endpoint found.
325 }
326 }
327 }
328
329 void
findObjectById(ICE_IN (string)domainId,ICE_IN (Ice::Identity)id,ICE_IN (LookupReplyPrxPtr)reply,const Ice::Current &)330 LookupI::findObjectById(ICE_IN(string) domainId, ICE_IN(Ice::Identity) id, ICE_IN(LookupReplyPrxPtr) reply,
331 const Ice::Current&)
332 {
333 if(domainId != _domainId)
334 {
335 return; // Ignore.
336 }
337
338 Ice::ObjectPrxPtr proxy = _registry->findObject(id);
339 if(proxy)
340 {
341 //
342 // Reply to the mulicast request using the given proxy.
343 //
344 try
345 {
346 #ifdef ICE_CPP11_MAPPING
347 reply->foundObjectByIdAsync(id, proxy);
348 #else
349 reply->begin_foundObjectById(id, proxy);
350 #endif
351 }
352 catch(const Ice::LocalException&)
353 {
354 // Ignore.
355 }
356 }
357 }
358
359 void
findAdapterById(ICE_IN (string)domainId,ICE_IN (string)adapterId,ICE_IN (LookupReplyPrxPtr)reply,const Ice::Current &)360 LookupI::findAdapterById(ICE_IN(string) domainId, ICE_IN(string) adapterId, ICE_IN(LookupReplyPrxPtr) reply,
361 const Ice::Current&)
362 {
363 if(domainId != _domainId)
364 {
365 return; // Ignore.
366 }
367
368 bool isReplicaGroup;
369 Ice::ObjectPrxPtr proxy = _registry->findAdapter(adapterId, isReplicaGroup);
370 if(proxy)
371 {
372 //
373 // Reply to the multicast request using the given proxy.
374 //
375 try
376 {
377 #ifdef ICE_CPP11_MAPPING
378 reply->foundAdapterByIdAsync(adapterId, proxy, isReplicaGroup);
379 #else
380 reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
381 #endif
382 }
383 catch(const Ice::LocalException&)
384 {
385 // Ignore.
386 }
387 }
388 }
389
390 void
findObject(const ObjectCB & cb,const Ice::Identity & id)391 LookupI::findObject(const ObjectCB& cb, const Ice::Identity& id)
392 {
393 Lock sync(*this);
394 map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
395 if(p == _objectRequests.end())
396 {
397 p = _objectRequests.insert(make_pair(id, ICE_MAKE_SHARED(ObjectRequest,
398 ICE_SHARED_FROM_THIS,
399 id,
400 _retryCount))).first;
401 }
402
403 if(p->second->addCallback(cb))
404 {
405 try
406 {
407 p->second->invoke(_domainId, _lookups);
408 _timer->schedule(p->second, _timeout);
409 }
410 catch(const Ice::LocalException&)
411 {
412 p->second->finished(ICE_NULLPTR);
413 _objectRequests.erase(p);
414 }
415 }
416 }
417
418 void
findAdapter(const AdapterCB & cb,const std::string & adapterId)419 LookupI::findAdapter(const AdapterCB& cb, const std::string& adapterId)
420 {
421 Lock sync(*this);
422 map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
423 if(p == _adapterRequests.end())
424 {
425 p = _adapterRequests.insert(make_pair(adapterId, ICE_MAKE_SHARED(AdapterRequest,
426 ICE_SHARED_FROM_THIS,
427 adapterId,
428 _retryCount))).first;
429 }
430
431 if(p->second->addCallback(cb))
432 {
433 try
434 {
435 p->second->invoke(_domainId, _lookups);
436 _timer->schedule(p->second, _timeout);
437 }
438 catch(const Ice::LocalException&)
439 {
440 p->second->finished(ICE_NULLPTR);
441 _adapterRequests.erase(p);
442 }
443 }
444 }
445
446 void
foundObject(const Ice::Identity & id,const string & requestId,const Ice::ObjectPrxPtr & proxy)447 LookupI::foundObject(const Ice::Identity& id, const string& requestId, const Ice::ObjectPrxPtr& proxy)
448 {
449 Lock sync(*this);
450 map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
451 if(p != _objectRequests.end() && p->second->getRequestId() == requestId) // Ignore responses from old requests
452 {
453 p->second->response(proxy);
454 _timer->cancel(p->second);
455 _objectRequests.erase(p);
456 }
457 }
458
459 void
foundAdapter(const string & adapterId,const string & requestId,const Ice::ObjectPrxPtr & proxy,bool isReplicaGroup)460 LookupI::foundAdapter(const string& adapterId, const string& requestId, const Ice::ObjectPrxPtr& proxy,
461 bool isReplicaGroup)
462 {
463 Lock sync(*this);
464 map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
465 if(p != _adapterRequests.end() && p->second->getRequestId() == requestId) // Ignore responses from old requests
466 {
467 if(p->second->response(proxy, isReplicaGroup))
468 {
469 _timer->cancel(p->second);
470 _adapterRequests.erase(p);
471 }
472 }
473 }
474
475 void
objectRequestTimedOut(const ObjectRequestPtr & request)476 LookupI::objectRequestTimedOut(const ObjectRequestPtr& request)
477 {
478 Lock sync(*this);
479 map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(request->getId());
480 if(p == _objectRequests.end() || p->second.get() != request.get())
481 {
482 return;
483 }
484
485 if(request->retry())
486 {
487 try
488 {
489 request->invoke(_domainId, _lookups);
490 _timer->schedule(request, _timeout);
491 return;
492 }
493 catch(const Ice::LocalException&)
494 {
495 }
496 }
497
498 request->finished(0);
499 _objectRequests.erase(p);
500 _timer->cancel(request);
501 }
502
503 void
adapterRequestException(const AdapterRequestPtr & request,const LocalException & ex)504 LookupI::adapterRequestException(const AdapterRequestPtr& request, const LocalException& ex)
505 {
506 Lock sync(*this);
507 map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(request->getId());
508 if(p == _adapterRequests.end() || p->second.get() != request.get())
509 {
510 return;
511 }
512
513 if(request->exception())
514 {
515 if(_warnOnce)
516 {
517 Warning warn(_lookup->ice_getCommunicator()->getLogger());
518 warn << "failed to lookup adapter `" << p->first << "' with lookup proxy `" << _lookup << "':\n" << ex;
519 _warnOnce = false;
520 }
521 _timer->cancel(request);
522 _adapterRequests.erase(p);
523 }
524 }
525
526 void
adapterRequestTimedOut(const AdapterRequestPtr & request)527 LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request)
528 {
529 Lock sync(*this);
530 map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(request->getId());
531 if(p == _adapterRequests.end() || p->second.get() != request.get())
532 {
533 return;
534 }
535
536 if(request->retry())
537 {
538 try
539 {
540 request->invoke(_domainId, _lookups);
541 _timer->schedule(request, _timeout);
542 return;
543 }
544 catch(const Ice::LocalException&)
545 {
546 }
547 }
548
549 request->finished(0);
550 _adapterRequests.erase(p);
551 _timer->cancel(request);
552 }
553
554 void
objectRequestException(const ObjectRequestPtr & request,const LocalException & ex)555 LookupI::objectRequestException(const ObjectRequestPtr& request, const LocalException& ex)
556 {
557 Lock sync(*this);
558 map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(request->getId());
559 if(p == _objectRequests.end() || p->second.get() != request.get())
560 {
561 return;
562 }
563
564 if(request->exception())
565 {
566 if(_warnOnce)
567 {
568 Warning warn(_lookup->ice_getCommunicator()->getLogger());
569 string id = _lookup->ice_getCommunicator()->identityToString(p->first);
570 warn << "failed to lookup object `" << id << "' with lookup proxy `" << _lookup << "':\n" << ex;
571 _warnOnce = false;
572 }
573 _timer->cancel(request);
574 _objectRequests.erase(p);
575 }
576 }
577
LookupReplyI(const LookupIPtr & lookup)578 LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup)
579 {
580 }
581
582 #ifdef ICE_CPP11_MAPPING
583 void
foundObjectById(Identity id,shared_ptr<ObjectPrx> proxy,const Current & current)584 LookupReplyI::foundObjectById(Identity id, shared_ptr<ObjectPrx> proxy, const Current& current)
585 {
586 _lookup->foundObject(id, current.id.name, proxy);
587 }
588
589 void
foundAdapterById(string adapterId,shared_ptr<ObjectPrx> proxy,bool isReplicaGroup,const Current & current)590 LookupReplyI::foundAdapterById(string adapterId, shared_ptr<ObjectPrx> proxy, bool isReplicaGroup,
591 const Current& current)
592 {
593 _lookup->foundAdapter(adapterId, current.id.name, proxy, isReplicaGroup);
594 }
595 #else
596 void
foundObjectById(const Identity & id,const ObjectPrxPtr & proxy,const Current & current)597 LookupReplyI::foundObjectById(const Identity& id, const ObjectPrxPtr& proxy, const Current& current)
598 {
599 _lookup->foundObject(id, current.id.name, proxy);
600 }
601
602 void
foundAdapterById(const string & adapterId,const ObjectPrxPtr & proxy,bool isReplicaGroup,const Current & current)603 LookupReplyI::foundAdapterById(const string& adapterId, const ObjectPrxPtr& proxy, bool isReplicaGroup,
604 const Current& current)
605 {
606 _lookup->foundAdapter(adapterId, current.id.name, proxy, isReplicaGroup);
607 }
608 #endif
609