1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <Ice/Ice.h>
6 #include <IceGrid/Topics.h>
7 #include <IceGrid/DescriptorHelper.h>
8
9 using namespace std;
10 using namespace IceGrid;
11
12 namespace
13 {
14
15 //
16 // Encodings supported by the observers. We create one topic per
17 // encoding version and subscribe the observer to the appropriate
18 // topic depending on its encoding.
19 //
20 Ice::EncodingVersion encodings[] = {
21 { 1, 0 },
22 { 1, 1 }
23 };
24
25 }
26
ObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const string & name,Ice::Long dbSerial)27 ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) :
28 _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial)
29 {
30 for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i)
31 {
32 ostringstream os;
33 os << name << "-" << Ice::encodingVersionToString(encodings[i]);
34 IceStorm::TopicPrx t;
35 try
36 {
37 t = topicManager->create(os.str());
38 }
39 catch(const IceStorm::TopicExists&)
40 {
41 t = topicManager->retrieve(os.str());
42 }
43
44 //
45 // NOTE: collocation optimization needs to be turned on for the
46 // topic because the subscribe() method is given a fixed proxy
47 // which can't be marshalled.
48 //
49 _topics[encodings[i]] = t;
50 _basePublishers.push_back(t->getPublisher()->ice_encodingVersion(encodings[i]));
51 }
52 }
53
~ObserverTopic()54 ObserverTopic::~ObserverTopic()
55 {
56 }
57
58 int
subscribe(const Ice::ObjectPrx & obsv,const string & name)59 ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
60 {
61 Lock sync(*this);
62 if(_topics.empty())
63 {
64 return -1;
65 }
66
67 assert(obsv);
68 try
69 {
70 IceStorm::QoS qos;
71 qos["reliability"] = "ordered";
72 Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion());
73 map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
74 if(p == _topics.end())
75 {
76 Ice::Warning out(_logger);
77 out << "unsupported encoding version for observer `" << obsv << "'";
78 return -1;
79 }
80 initObserver(p->second->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
81 }
82 catch(const IceStorm::AlreadySubscribed&)
83 {
84 throw ObserverAlreadyRegisteredException(obsv->ice_getIdentity());
85 }
86
87 if(!name.empty())
88 {
89 assert(_syncSubscribers.find(name) == _syncSubscribers.end());
90 _syncSubscribers.insert(name);
91 addExpectedUpdate(_serial, name);
92 return _serial;
93 }
94 return -1;
95 }
96
97 void
unsubscribe(const Ice::ObjectPrx & observer,const string & name)98 ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
99 {
100 Lock sync(*this);
101 Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion());
102 map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator q = _topics.find(v);
103 if(q == _topics.end())
104 {
105 return;
106 }
107 try
108 {
109 q->second->unsubscribe(observer);
110 }
111 catch(const Ice::ObjectAdapterDeactivatedException&)
112 {
113 }
114
115 assert(observer);
116
117 if(!name.empty())
118 {
119 assert(_syncSubscribers.find(name) != _syncSubscribers.end());
120 _syncSubscribers.erase(name);
121
122 map<int, set<string> >::iterator p = _waitForUpdates.begin();
123 bool notifyMonitor = false;
124 while(p != _waitForUpdates.end())
125 {
126 p->second.erase(name);
127 if(p->second.empty())
128 {
129 _waitForUpdates.erase(p++);
130 notifyMonitor = true;
131 }
132 else
133 {
134 ++p;
135 }
136 }
137
138 if(notifyMonitor)
139 {
140 notifyAll();
141 }
142 }
143 }
144
145 void
destroy()146 ObserverTopic::destroy()
147 {
148 Lock sync(*this);
149 _topics.clear();
150 notifyAll();
151 }
152
153 void
receivedUpdate(const string & name,int serial,const string & failure)154 ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
155 {
156 Lock sync(*this);
157 map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
158 if(p != _waitForUpdates.end())
159 {
160 p->second.erase(name);
161
162 if(!failure.empty())
163 {
164 map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
165 if(q == _updateFailures.end())
166 {
167 q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first;
168 }
169 q->second.insert(make_pair(name, failure));
170 }
171
172 if(p->second.empty())
173 {
174 _waitForUpdates.erase(p);
175 }
176
177 notifyAll();
178 }
179 }
180
181 void
waitForSyncedSubscribers(int serial,const string & name)182 ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
183 {
184 Lock sync(*this);
185 waitForSyncedSubscribersNoSync(serial, name);
186 }
187
188 int
getSerial() const189 ObserverTopic::getSerial() const
190 {
191 Lock sync(*this);
192 return _serial;
193 }
194
195 void
addExpectedUpdate(int serial,const string & name)196 ObserverTopic::addExpectedUpdate(int serial, const string& name)
197 {
198 if(_syncSubscribers.empty() && name.empty())
199 {
200 return;
201 }
202
203 // Must be called with the lock held.
204 if(name.empty())
205 {
206 assert(_waitForUpdates[serial].empty());
207 _waitForUpdates[serial] = _syncSubscribers;
208 }
209 else
210 {
211 _waitForUpdates[serial].insert(name);
212 }
213 }
214
215 void
waitForSyncedSubscribersNoSync(int serial,const string & name)216 ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
217 {
218 if(serial < 0)
219 {
220 return;
221 }
222
223 //
224 // Wait until all the updates are received or the service shutdown.
225 //
226 while(!_topics.empty())
227 {
228 map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
229 if(p == _waitForUpdates.end())
230 {
231 map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
232 if(q != _updateFailures.end())
233 {
234 map<string, string> failures = q->second;
235 _updateFailures.erase(q);
236 ostringstream os;
237 for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
238 {
239 os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
240 }
241
242 Ice::Error err(_logger);
243 err << os.str();
244 }
245 return;
246 }
247 else
248 {
249 if(!name.empty() && p->second.find(name) == p->second.end())
250 {
251 return;
252 }
253 wait();
254 }
255 }
256 }
257
258 void
updateSerial(Ice::Long dbSerial)259 ObserverTopic::updateSerial(Ice::Long dbSerial)
260 {
261 ++_serial;
262 if(dbSerial > 0)
263 {
264 _dbSerial = dbSerial;
265 }
266 }
267
268 Ice::Context
getContext(int serial,Ice::Long dbSerial) const269 ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
270 {
271 Ice::Context context;
272 {
273 ostringstream os;
274 os << serial;
275 context["serial"] = os.str();
276 }
277 if(dbSerial > 0)
278 {
279 ostringstream os;
280 os << dbSerial;
281 context["dbSerial"] = os.str();
282 }
283 return context;
284 }
285
RegistryObserverTopic(const IceStorm::TopicManagerPrx & topicManager)286 RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
287 ObserverTopic(topicManager, "RegistryObserver")
288 {
289 _publishers = getPublishers<RegistryObserverPrx>();
290 }
291
292 void
registryUp(const RegistryInfo & info)293 RegistryObserverTopic::registryUp(const RegistryInfo& info)
294 {
295 Lock sync(*this);
296 if(_topics.empty())
297 {
298 return;
299 }
300 updateSerial();
301 _registries.insert(make_pair(info.name, info));
302 try
303 {
304 for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
305 {
306 (*p)->registryUp(info);
307 }
308 }
309 catch(const Ice::LocalException& ex)
310 {
311 Ice::Warning out(_logger);
312 out << "unexpected exception while publishing `registryUp' update:\n" << ex;
313 }
314 }
315
316 void
registryDown(const string & name)317 RegistryObserverTopic::registryDown(const string& name)
318 {
319 Lock sync(*this);
320 if(_topics.empty())
321 {
322 return;
323 }
324
325 if(_registries.find(name) == _registries.end())
326 {
327 return;
328 }
329
330 updateSerial();
331 _registries.erase(name);
332 try
333 {
334 for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
335 {
336 (*p)->registryDown(name);
337 }
338 }
339 catch(const Ice::LocalException& ex)
340 {
341 Ice::Warning out(_logger);
342 out << "unexpected exception while publishing `registryDown' update:\n" << ex;
343 }
344 }
345
346 void
initObserver(const Ice::ObjectPrx & obsv)347 RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
348 {
349 RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
350 RegistryInfoSeq registries;
351 registries.reserve(_registries.size());
352 for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
353 {
354 registries.push_back(p->second);
355 }
356 observer->registryInit(registries, getContext(_serial));
357 }
358
NodeObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const Ice::ObjectAdapterPtr & adapter)359 NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
360 const Ice::ObjectAdapterPtr& adapter) :
361 ObserverTopic(topicManager, "NodeObserver")
362 {
363 _publishers = getPublishers<NodeObserverPrx>();
364 try
365 {
366 const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
367 }
368 catch(const Ice::LocalException&)
369 {
370 }
371 }
372
373 void
nodeInit(const NodeDynamicInfoSeq &,const Ice::Current &)374 NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&)
375 {
376 assert(false);
377 }
378
379 void
nodeUp(const NodeDynamicInfo & info,const Ice::Current &)380 NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
381 {
382 Lock sync(*this);
383 if(_topics.empty())
384 {
385 return;
386 }
387 updateSerial();
388 _nodes.insert(make_pair(info.info.name, info));
389 for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p)
390 {
391 _serverStatus[p->id] = p->enabled;
392 }
393 try
394 {
395 for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
396 {
397 (*p)->nodeUp(info);
398 }
399 }
400 catch(const Ice::LocalException& ex)
401 {
402 Ice::Warning out(_logger);
403 out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
404 }
405 }
406
407 void
nodeDown(const string &,const Ice::Current &)408 NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&)
409 {
410 assert(false);
411 }
412
413 void
updateServer(const string & node,const ServerDynamicInfo & server,const Ice::Current &)414 NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
415 {
416 Lock sync(*this);
417 if(_topics.empty())
418 {
419 return;
420 }
421
422 if(_nodes.find(node) == _nodes.end())
423 {
424 //
425 // If the node isn't known anymore, we ignore the update.
426 //
427 return;
428 }
429
430 updateSerial();
431
432 ServerDynamicInfoSeq& servers = _nodes[node].servers;
433 ServerDynamicInfoSeq::iterator p = servers.begin();
434 while(p != servers.end())
435 {
436 if(p->id == server.id)
437 {
438 if(server.state == Destroyed || (server.state == Inactive && server.enabled))
439 {
440 servers.erase(p);
441 }
442 else
443 {
444 *p = server;
445 }
446 break;
447 }
448 ++p;
449 }
450 if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end())
451 {
452 servers.push_back(server);
453 }
454
455 if(server.state != Destroyed)
456 {
457 _serverStatus[server.id] = server.enabled;
458 }
459 else
460 {
461 _serverStatus.erase(server.id);
462 }
463
464 try
465 {
466 for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
467 {
468 (*q)->updateServer(node, server);
469 }
470 }
471 catch(const Ice::LocalException& ex)
472 {
473 Ice::Warning out(_logger);
474 out << "unexpected exception while publishing `updateServer' update:\n" << ex;
475 }
476 }
477
478 void
updateAdapter(const string & node,const AdapterDynamicInfo & adapter,const Ice::Current &)479 NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
480 {
481 Lock sync(*this);
482 if(_topics.empty())
483 {
484 return;
485 }
486
487 if(_nodes.find(node) == _nodes.end())
488 {
489 //
490 // If the node isn't known anymore, we ignore the update.
491 //
492 return;
493 }
494
495 updateSerial();
496
497 AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
498 AdapterDynamicInfoSeq::iterator p = adapters.begin();
499 while(p != adapters.end())
500 {
501 if(p->id == adapter.id)
502 {
503 if(adapter.proxy)
504 {
505 *p = adapter;
506 }
507 else
508 {
509 adapters.erase(p);
510 }
511 break;
512 }
513 ++p;
514 }
515 if(adapter.proxy && p == adapters.end())
516 {
517 adapters.push_back(adapter);
518 }
519
520 try
521 {
522 for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
523 {
524 (*q)->updateAdapter(node, adapter);
525 }
526 }
527 catch(const Ice::LocalException& ex)
528 {
529 Ice::Warning out(_logger);
530 out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
531 }
532 }
533
534 void
nodeDown(const string & name)535 NodeObserverTopic::nodeDown(const string& name)
536 {
537 Lock sync(*this);
538 if(_topics.empty())
539 {
540 return;
541 }
542
543 updateSerial();
544
545 if(_nodes.find(name) == _nodes.end())
546 {
547 return;
548 }
549
550 ServerDynamicInfoSeq& servers = _nodes[name].servers;
551 for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
552 {
553 _serverStatus.erase(p->id);
554 }
555
556 _nodes.erase(name);
557 try
558 {
559 for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
560 {
561 (*p)->nodeDown(name);
562 }
563 }
564 catch(const Ice::LocalException& ex)
565 {
566 Ice::Warning out(_logger);
567 out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
568 }
569 }
570
571 void
initObserver(const Ice::ObjectPrx & obsv)572 NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
573 {
574 NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
575 NodeDynamicInfoSeq nodes;
576 nodes.reserve(_nodes.size());
577 for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
578 {
579 nodes.push_back(p->second);
580 }
581 observer->nodeInit(nodes, getContext(_serial));
582 }
583
584 bool
isServerEnabled(const string & server) const585 NodeObserverTopic::isServerEnabled(const string& server) const
586 {
587 Lock sync(*this);
588 if(_topics.empty())
589 {
590 return false;
591 }
592 map<string, bool>::const_iterator p = _serverStatus.find(server);
593 if(p != _serverStatus.end())
594 {
595 return p->second;
596 }
597 else
598 {
599 return true; // Assume the server is enabled if we don't know its status.
600 }
601 }
602
ApplicationObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<string,ApplicationInfo> & applications,Ice::Long serial)603 ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
604 const map<string, ApplicationInfo>& applications, Ice::Long serial) :
605 ObserverTopic(topicManager, "ApplicationObserver", serial),
606 _applications(applications)
607 {
608 _publishers = getPublishers<ApplicationObserverPrx>();
609 }
610
611 int
applicationInit(Ice::Long dbSerial,const ApplicationInfoSeq & apps)612 ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps)
613 {
614 Lock sync(*this);
615 if(_topics.empty())
616 {
617 return -1;
618 }
619 updateSerial(dbSerial);
620 _applications.clear();
621 for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
622 {
623 _applications.insert(make_pair(p->descriptor.name, *p));
624 }
625 try
626 {
627 for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
628 {
629 (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial));
630 }
631 }
632 catch(const Ice::LocalException& ex)
633 {
634 Ice::Warning out(_logger);
635 out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
636 }
637 addExpectedUpdate(_serial);
638 return _serial;
639 }
640
641 int
applicationAdded(Ice::Long dbSerial,const ApplicationInfo & info)642 ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
643 {
644 Lock sync(*this);
645 if(_topics.empty())
646 {
647 return -1;
648 }
649
650 updateSerial(dbSerial);
651 _applications.insert(make_pair(info.descriptor.name, info));
652 try
653 {
654 for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
655 {
656 (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial));
657 }
658 }
659 catch(const Ice::LocalException& ex)
660 {
661 Ice::Warning out(_logger);
662 out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
663 }
664 addExpectedUpdate(_serial);
665 return _serial;
666 }
667
668 int
applicationRemoved(Ice::Long dbSerial,const string & name)669 ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
670 {
671 Lock sync(*this);
672 if(_topics.empty())
673 {
674 return -1;
675 }
676 updateSerial(dbSerial);
677 _applications.erase(name);
678 try
679 {
680 for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
681 {
682 (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial));
683 }
684 }
685 catch(const Ice::LocalException& ex)
686 {
687 Ice::Warning out(_logger);
688 out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
689 }
690 addExpectedUpdate(_serial);
691 return _serial;
692 }
693
694 int
applicationUpdated(Ice::Long dbSerial,const ApplicationUpdateInfo & info)695 ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
696 {
697 Lock sync(*this);
698 if(_topics.empty())
699 {
700 return -1;
701 }
702
703 updateSerial(dbSerial);
704 try
705 {
706 map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
707 if(p != _applications.end())
708 {
709 ApplicationHelper helper(_publishers[0]->ice_getCommunicator(), p->second.descriptor);
710 p->second.descriptor = helper.update(info.descriptor);
711 p->second.updateTime = info.updateTime;
712 p->second.updateUser = info.updateUser;
713 p->second.revision = info.revision;
714 }
715 }
716 catch(const DeploymentException& ex)
717 {
718 Ice::Error out(_logger);
719 out << "unexpected exception while instantiating application `" << info.descriptor.name << "':\n" << ex.reason;
720 assert(false);
721 }
722 catch(const std::exception& ex)
723 {
724 Ice::Error out(_logger);
725 out << "unexpected exception while instantiating application `" << info.descriptor.name << "':\n" << ex.what();
726 assert(false);
727 }
728 catch(...)
729 {
730 Ice::Error out(_logger);
731 out << "unexpected exception while instantiating application `" << info.descriptor.name << "'";
732 assert(false);
733 }
734 try
735 {
736 for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
737 {
738 (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial));
739 }
740 }
741 catch(const Ice::LocalException& ex)
742 {
743 Ice::Warning out(_logger);
744 out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
745 }
746 addExpectedUpdate(_serial);
747 return _serial;
748 }
749
750 void
initObserver(const Ice::ObjectPrx & obsv)751 ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
752 {
753 ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
754 ApplicationInfoSeq applications;
755 for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
756 {
757 applications.push_back(p->second);
758 }
759 observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial));
760 }
761
AdapterObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<string,AdapterInfo> & adapters,Ice::Long serial)762 AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
763 const map<string, AdapterInfo>& adapters, Ice::Long serial) :
764 ObserverTopic(topicManager, "AdapterObserver", serial),
765 _adapters(adapters)
766 {
767 _publishers = getPublishers<AdapterObserverPrx>();
768 }
769
770 int
adapterInit(Ice::Long dbSerial,const AdapterInfoSeq & adpts)771 AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
772 {
773 Lock sync(*this);
774 if(_topics.empty())
775 {
776 return -1;
777 }
778 updateSerial(dbSerial);
779 _adapters.clear();
780 for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
781 {
782 _adapters.insert(make_pair(q->id, *q));
783 }
784 try
785 {
786 for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
787 {
788 (*p)->adapterInit(adpts, getContext(_serial, dbSerial));
789 }
790 }
791 catch(const Ice::LocalException& ex)
792 {
793 Ice::Warning out(_logger);
794 out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
795 }
796 addExpectedUpdate(_serial);
797 return _serial;
798 }
799
800 int
adapterAdded(Ice::Long dbSerial,const AdapterInfo & info)801 AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
802 {
803 Lock sync(*this);
804 if(_topics.empty())
805 {
806 return -1;
807 }
808 updateSerial(dbSerial);
809 _adapters.insert(make_pair(info.id, info));
810 try
811 {
812 for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
813 {
814 (*p)->adapterAdded(info, getContext(_serial, dbSerial));
815 }
816 }
817 catch(const Ice::LocalException& ex)
818 {
819 Ice::Warning out(_logger);
820 out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
821 }
822 addExpectedUpdate(_serial);
823 return _serial;
824 }
825
826 int
adapterUpdated(Ice::Long dbSerial,const AdapterInfo & info)827 AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
828 {
829 Lock sync(*this);
830 if(_topics.empty())
831 {
832 return -1;
833 }
834 updateSerial(dbSerial);
835 _adapters[info.id] = info;
836 try
837 {
838 for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
839 {
840 (*p)->adapterUpdated(info, getContext(_serial, dbSerial));
841 }
842 }
843 catch(const Ice::LocalException& ex)
844 {
845 Ice::Warning out(_logger);
846 out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
847 }
848 addExpectedUpdate(_serial);
849 return _serial;
850 }
851
852 int
adapterRemoved(Ice::Long dbSerial,const string & id)853 AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
854 {
855 Lock sync(*this);
856 if(_topics.empty())
857 {
858 return -1;
859 }
860 updateSerial(dbSerial);
861 _adapters.erase(id);
862 try
863 {
864 for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
865 {
866 (*p)->adapterRemoved(id, getContext(_serial, dbSerial));
867 }
868 }
869 catch(const Ice::LocalException& ex)
870 {
871 Ice::Warning out(_logger);
872 out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
873 }
874 addExpectedUpdate(_serial);
875 return _serial;
876 }
877
878 void
initObserver(const Ice::ObjectPrx & obsv)879 AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
880 {
881 AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
882 AdapterInfoSeq adapters;
883 for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
884 {
885 adapters.push_back(p->second);
886 }
887 observer->adapterInit(adapters, getContext(_serial, _dbSerial));
888 }
889
ObjectObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<Ice::Identity,ObjectInfo> & objects,Ice::Long serial)890 ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
891 const map<Ice::Identity, ObjectInfo>& objects, Ice::Long serial) :
892 ObserverTopic(topicManager, "ObjectObserver", serial),
893 _objects(objects)
894 {
895 _publishers = getPublishers<ObjectObserverPrx>();
896 }
897
898 int
objectInit(Ice::Long dbSerial,const ObjectInfoSeq & objects)899 ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
900 {
901 Lock sync(*this);
902 if(_topics.empty())
903 {
904 return -1;
905 }
906 updateSerial(dbSerial);
907 _objects.clear();
908 for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
909 {
910 _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
911 }
912 try
913 {
914 for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
915 {
916 (*p)->objectInit(objects, getContext(_serial, dbSerial));
917 }
918 }
919 catch(const Ice::LocalException& ex)
920 {
921 Ice::Warning out(_logger);
922 out << "unexpected exception while publishing `objectInit' update:\n" << ex;
923 }
924 addExpectedUpdate(_serial);
925 return _serial;
926 }
927
928 int
objectAdded(Ice::Long dbSerial,const ObjectInfo & info)929 ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
930 {
931 Lock sync(*this);
932 if(_topics.empty())
933 {
934 return -1;
935 }
936 updateSerial(dbSerial);
937 _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
938 try
939 {
940 for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
941 {
942 (*p)->objectAdded(info, getContext(_serial, dbSerial));
943 }
944 }
945 catch(const Ice::LocalException& ex)
946 {
947 Ice::Warning out(_logger);
948 out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
949 }
950 addExpectedUpdate(_serial);
951 return _serial;
952 }
953
954 int
objectUpdated(Ice::Long dbSerial,const ObjectInfo & info)955 ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
956 {
957 Lock sync(*this);
958 if(_topics.empty())
959 {
960 return -1;
961 }
962 updateSerial(dbSerial);
963 _objects[info.proxy->ice_getIdentity()] = info;
964 try
965 {
966 for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
967 {
968 (*p)->objectUpdated(info, getContext(_serial, dbSerial));
969 }
970 }
971 catch(const Ice::LocalException& ex)
972 {
973 Ice::Warning out(_logger);
974 out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
975 }
976 addExpectedUpdate(_serial);
977 return _serial;
978 }
979
980 int
objectRemoved(Ice::Long dbSerial,const Ice::Identity & id)981 ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
982 {
983 Lock sync(*this);
984 if(_topics.empty())
985 {
986 return -1;
987 }
988 updateSerial(dbSerial);
989 _objects.erase(id);
990 try
991 {
992 for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
993 {
994 (*p)->objectRemoved(id, getContext(_serial, dbSerial));
995 }
996 }
997 catch(const Ice::LocalException& ex)
998 {
999 Ice::Warning out(_logger);
1000 out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
1001 }
1002 addExpectedUpdate(_serial);
1003 return _serial;
1004 }
1005
1006 int
wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq & infos)1007 ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
1008 {
1009 Lock sync(*this);
1010 if(_topics.empty())
1011 {
1012 return -1;
1013 }
1014
1015 for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1016 {
1017 updateSerial();
1018 map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity());
1019 if(q != _objects.end())
1020 {
1021 q->second = *p;
1022 try
1023 {
1024 for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
1025 {
1026 (*r)->objectUpdated(*p, getContext(_serial));
1027 }
1028 }
1029 catch(const Ice::LocalException& ex)
1030 {
1031 Ice::Warning out(_logger);
1032 out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
1033 }
1034 }
1035 else
1036 {
1037 _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
1038 try
1039 {
1040 for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
1041 {
1042 (*r)->objectAdded(*p, getContext(_serial));
1043 }
1044 }
1045 catch(const Ice::LocalException& ex)
1046 {
1047 Ice::Warning out(_logger);
1048 out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
1049 }
1050 }
1051 }
1052
1053 //
1054 // We don't wait for the update to be received by the replicas
1055 // here. This operation is called by ReplicaSessionI.
1056 //
1057 addExpectedUpdate(_serial);
1058 //waitForSyncedSubscribersNoSync(_serial);
1059 return _serial;
1060 }
1061
1062 int
wellKnownObjectsRemoved(const ObjectInfoSeq & infos)1063 ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
1064 {
1065 Lock sync(*this);
1066 if(_topics.empty())
1067 {
1068 return -1;
1069 }
1070
1071 for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1072 {
1073 updateSerial();
1074 _objects.erase(p->proxy->ice_getIdentity());
1075 try
1076 {
1077 for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
1078 {
1079 (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
1080 }
1081 }
1082 catch(const Ice::LocalException& ex)
1083 {
1084 Ice::Warning out(_logger);
1085 out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
1086 }
1087 }
1088
1089 //
1090 // We don't need to wait for the update to be received by the
1091 // replicas here. This operation is only called internaly by
1092 // IceGrid.
1093 //
1094 addExpectedUpdate(_serial);
1095 //waitForSyncedSubscribersNoSync(_serial);
1096 return _serial;
1097 }
1098
1099 void
initObserver(const Ice::ObjectPrx & obsv)1100 ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
1101 {
1102 ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
1103 ObjectInfoSeq objects;
1104 for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
1105 {
1106 objects.push_back(p->second);
1107 }
1108 observer->objectInit(objects, getContext(_serial, _dbSerial));
1109 }
1110