1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <Ice/Ice.h>
6 #include <IceGrid/Topics.h>
7 #include <IceGrid/DescriptorHelper.h>
8 
9 using namespace std;
10 using namespace IceGrid;
11 
12 namespace
13 {
14 
15 //
16 // Encodings supported by the observers. We create one topic per
17 // encoding version and subscribe the observer to the appropriate
18 // topic depending on its encoding.
19 //
20 Ice::EncodingVersion encodings[] = {
21     { 1, 0 },
22     { 1, 1 }
23 };
24 
25 }
26 
ObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const string & name,Ice::Long dbSerial)27 ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name, Ice::Long dbSerial) :
28     _logger(topicManager->ice_getCommunicator()->getLogger()), _serial(0), _dbSerial(dbSerial)
29 {
30     for(int i = 0; i < static_cast<int>(sizeof(encodings) / sizeof(Ice::EncodingVersion)); ++i)
31     {
32         ostringstream os;
33         os << name << "-" << Ice::encodingVersionToString(encodings[i]);
34         IceStorm::TopicPrx t;
35         try
36         {
37             t = topicManager->create(os.str());
38         }
39         catch(const IceStorm::TopicExists&)
40         {
41             t = topicManager->retrieve(os.str());
42         }
43 
44         //
45         // NOTE: collocation optimization needs to be turned on for the
46         // topic because the subscribe() method is given a fixed proxy
47         // which can't be marshalled.
48         //
49         _topics[encodings[i]] = t;
50         _basePublishers.push_back(t->getPublisher()->ice_encodingVersion(encodings[i]));
51     }
52 }
53 
~ObserverTopic()54 ObserverTopic::~ObserverTopic()
55 {
56 }
57 
58 int
subscribe(const Ice::ObjectPrx & obsv,const string & name)59 ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
60 {
61     Lock sync(*this);
62     if(_topics.empty())
63     {
64         return -1;
65     }
66 
67     assert(obsv);
68     try
69     {
70         IceStorm::QoS qos;
71         qos["reliability"] = "ordered";
72         Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion());
73         map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
74         if(p == _topics.end())
75         {
76             Ice::Warning out(_logger);
77             out << "unsupported encoding version for observer `" << obsv << "'";
78             return -1;
79         }
80         initObserver(p->second->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
81     }
82     catch(const IceStorm::AlreadySubscribed&)
83     {
84         throw ObserverAlreadyRegisteredException(obsv->ice_getIdentity());
85     }
86 
87     if(!name.empty())
88     {
89         assert(_syncSubscribers.find(name) == _syncSubscribers.end());
90         _syncSubscribers.insert(name);
91         addExpectedUpdate(_serial, name);
92         return _serial;
93     }
94     return -1;
95 }
96 
97 void
unsubscribe(const Ice::ObjectPrx & observer,const string & name)98 ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
99 {
100     Lock sync(*this);
101     Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion());
102     map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator q = _topics.find(v);
103     if(q == _topics.end())
104     {
105         return;
106     }
107     try
108     {
109         q->second->unsubscribe(observer);
110     }
111     catch(const Ice::ObjectAdapterDeactivatedException&)
112     {
113     }
114 
115     assert(observer);
116 
117     if(!name.empty())
118     {
119         assert(_syncSubscribers.find(name) != _syncSubscribers.end());
120         _syncSubscribers.erase(name);
121 
122         map<int, set<string> >::iterator p = _waitForUpdates.begin();
123         bool notifyMonitor = false;
124         while(p != _waitForUpdates.end())
125         {
126             p->second.erase(name);
127             if(p->second.empty())
128             {
129                 _waitForUpdates.erase(p++);
130                 notifyMonitor = true;
131             }
132             else
133             {
134                 ++p;
135             }
136         }
137 
138         if(notifyMonitor)
139         {
140             notifyAll();
141         }
142     }
143 }
144 
145 void
destroy()146 ObserverTopic::destroy()
147 {
148     Lock sync(*this);
149     _topics.clear();
150     notifyAll();
151 }
152 
153 void
receivedUpdate(const string & name,int serial,const string & failure)154 ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
155 {
156     Lock sync(*this);
157     map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
158     if(p != _waitForUpdates.end())
159     {
160         p->second.erase(name);
161 
162         if(!failure.empty())
163         {
164             map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
165             if(q == _updateFailures.end())
166             {
167                 q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first;
168             }
169             q->second.insert(make_pair(name, failure));
170         }
171 
172         if(p->second.empty())
173         {
174             _waitForUpdates.erase(p);
175         }
176 
177         notifyAll();
178     }
179 }
180 
181 void
waitForSyncedSubscribers(int serial,const string & name)182 ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
183 {
184     Lock sync(*this);
185     waitForSyncedSubscribersNoSync(serial, name);
186 }
187 
188 int
getSerial() const189 ObserverTopic::getSerial() const
190 {
191     Lock sync(*this);
192     return _serial;
193 }
194 
195 void
addExpectedUpdate(int serial,const string & name)196 ObserverTopic::addExpectedUpdate(int serial, const string& name)
197 {
198     if(_syncSubscribers.empty() && name.empty())
199     {
200         return;
201     }
202 
203     // Must be called with the lock held.
204     if(name.empty())
205     {
206         assert(_waitForUpdates[serial].empty());
207         _waitForUpdates[serial] = _syncSubscribers;
208     }
209     else
210     {
211         _waitForUpdates[serial].insert(name);
212     }
213 }
214 
215 void
waitForSyncedSubscribersNoSync(int serial,const string & name)216 ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
217 {
218     if(serial < 0)
219     {
220         return;
221     }
222 
223     //
224     // Wait until all the updates are received or the service shutdown.
225     //
226     while(!_topics.empty())
227     {
228         map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
229         if(p == _waitForUpdates.end())
230         {
231             map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
232             if(q != _updateFailures.end())
233             {
234                 map<string, string> failures = q->second;
235                 _updateFailures.erase(q);
236                 ostringstream os;
237                 for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
238                 {
239                     os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
240                 }
241 
242                 Ice::Error err(_logger);
243                 err << os.str();
244             }
245             return;
246         }
247         else
248         {
249             if(!name.empty() && p->second.find(name) == p->second.end())
250             {
251                 return;
252             }
253             wait();
254         }
255     }
256 }
257 
258 void
updateSerial(Ice::Long dbSerial)259 ObserverTopic::updateSerial(Ice::Long dbSerial)
260 {
261     ++_serial;
262     if(dbSerial > 0)
263     {
264         _dbSerial = dbSerial;
265     }
266 }
267 
268 Ice::Context
getContext(int serial,Ice::Long dbSerial) const269 ObserverTopic::getContext(int serial, Ice::Long dbSerial) const
270 {
271     Ice::Context context;
272     {
273         ostringstream os;
274         os << serial;
275         context["serial"] = os.str();
276     }
277     if(dbSerial > 0)
278     {
279         ostringstream os;
280         os << dbSerial;
281         context["dbSerial"] = os.str();
282     }
283     return context;
284 }
285 
RegistryObserverTopic(const IceStorm::TopicManagerPrx & topicManager)286 RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
287     ObserverTopic(topicManager, "RegistryObserver")
288 {
289     _publishers = getPublishers<RegistryObserverPrx>();
290 }
291 
292 void
registryUp(const RegistryInfo & info)293 RegistryObserverTopic::registryUp(const RegistryInfo& info)
294 {
295     Lock sync(*this);
296     if(_topics.empty())
297     {
298         return;
299     }
300     updateSerial();
301     _registries.insert(make_pair(info.name, info));
302     try
303     {
304         for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
305         {
306             (*p)->registryUp(info);
307         }
308     }
309     catch(const Ice::LocalException& ex)
310     {
311         Ice::Warning out(_logger);
312         out << "unexpected exception while publishing `registryUp' update:\n" << ex;
313     }
314 }
315 
316 void
registryDown(const string & name)317 RegistryObserverTopic::registryDown(const string& name)
318 {
319     Lock sync(*this);
320     if(_topics.empty())
321     {
322         return;
323     }
324 
325     if(_registries.find(name) == _registries.end())
326     {
327         return;
328     }
329 
330     updateSerial();
331     _registries.erase(name);
332     try
333     {
334         for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
335         {
336             (*p)->registryDown(name);
337         }
338     }
339     catch(const Ice::LocalException& ex)
340     {
341         Ice::Warning out(_logger);
342         out << "unexpected exception while publishing `registryDown' update:\n" << ex;
343     }
344 }
345 
346 void
initObserver(const Ice::ObjectPrx & obsv)347 RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
348 {
349     RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
350     RegistryInfoSeq registries;
351     registries.reserve(_registries.size());
352     for(map<string, RegistryInfo>::const_iterator p = _registries.begin(); p != _registries.end(); ++p)
353     {
354         registries.push_back(p->second);
355     }
356     observer->registryInit(registries, getContext(_serial));
357 }
358 
NodeObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const Ice::ObjectAdapterPtr & adapter)359 NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
360                                      const Ice::ObjectAdapterPtr& adapter) :
361     ObserverTopic(topicManager, "NodeObserver")
362 {
363     _publishers = getPublishers<NodeObserverPrx>();
364     try
365     {
366         const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
367     }
368     catch(const Ice::LocalException&)
369     {
370     }
371 }
372 
373 void
nodeInit(const NodeDynamicInfoSeq &,const Ice::Current &)374 NodeObserverTopic::nodeInit(const NodeDynamicInfoSeq&, const Ice::Current&)
375 {
376     assert(false);
377 }
378 
379 void
nodeUp(const NodeDynamicInfo & info,const Ice::Current &)380 NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
381 {
382     Lock sync(*this);
383     if(_topics.empty())
384     {
385         return;
386     }
387     updateSerial();
388     _nodes.insert(make_pair(info.info.name, info));
389     for(ServerDynamicInfoSeq::const_iterator p = info.servers.begin(); p != info.servers.end(); ++p)
390     {
391         _serverStatus[p->id] = p->enabled;
392     }
393     try
394     {
395         for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
396         {
397             (*p)->nodeUp(info);
398         }
399     }
400     catch(const Ice::LocalException& ex)
401     {
402         Ice::Warning out(_logger);
403         out << "unexpected exception while publishing 'nodeUp' update:\n" << ex;
404     }
405 }
406 
407 void
nodeDown(const string &,const Ice::Current &)408 NodeObserverTopic::nodeDown(const string& /*name*/, const Ice::Current&)
409 {
410     assert(false);
411 }
412 
413 void
updateServer(const string & node,const ServerDynamicInfo & server,const Ice::Current &)414 NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
415 {
416     Lock sync(*this);
417     if(_topics.empty())
418     {
419         return;
420     }
421 
422     if(_nodes.find(node) == _nodes.end())
423     {
424         //
425         // If the node isn't known anymore, we ignore the update.
426         //
427         return;
428     }
429 
430     updateSerial();
431 
432     ServerDynamicInfoSeq& servers = _nodes[node].servers;
433     ServerDynamicInfoSeq::iterator p = servers.begin();
434     while(p != servers.end())
435     {
436         if(p->id == server.id)
437         {
438             if(server.state == Destroyed || (server.state == Inactive && server.enabled))
439             {
440                 servers.erase(p);
441             }
442             else
443             {
444                 *p = server;
445             }
446             break;
447         }
448         ++p;
449     }
450     if(server.state != Destroyed && (server.state != Inactive || !server.enabled) && p == servers.end())
451     {
452         servers.push_back(server);
453     }
454 
455     if(server.state != Destroyed)
456     {
457         _serverStatus[server.id] = server.enabled;
458     }
459     else
460     {
461         _serverStatus.erase(server.id);
462     }
463 
464     try
465     {
466         for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
467         {
468             (*q)->updateServer(node, server);
469         }
470     }
471     catch(const Ice::LocalException& ex)
472     {
473         Ice::Warning out(_logger);
474         out << "unexpected exception while publishing `updateServer' update:\n" << ex;
475     }
476 }
477 
478 void
updateAdapter(const string & node,const AdapterDynamicInfo & adapter,const Ice::Current &)479 NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
480 {
481     Lock sync(*this);
482     if(_topics.empty())
483     {
484         return;
485     }
486 
487     if(_nodes.find(node) == _nodes.end())
488     {
489         //
490         // If the node isn't known anymore, we ignore the update.
491         //
492         return;
493     }
494 
495     updateSerial();
496 
497     AdapterDynamicInfoSeq& adapters = _nodes[node].adapters;
498     AdapterDynamicInfoSeq::iterator p = adapters.begin();
499     while(p != adapters.end())
500     {
501         if(p->id == adapter.id)
502         {
503             if(adapter.proxy)
504             {
505                 *p = adapter;
506             }
507             else
508             {
509                 adapters.erase(p);
510             }
511             break;
512         }
513         ++p;
514     }
515     if(adapter.proxy && p == adapters.end())
516     {
517         adapters.push_back(adapter);
518     }
519 
520     try
521     {
522         for(vector<NodeObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
523         {
524             (*q)->updateAdapter(node, adapter);
525         }
526     }
527     catch(const Ice::LocalException& ex)
528     {
529         Ice::Warning out(_logger);
530         out << "unexpected exception while publishing `updateAdapter' update:\n" << ex;
531     }
532 }
533 
534 void
nodeDown(const string & name)535 NodeObserverTopic::nodeDown(const string& name)
536 {
537     Lock sync(*this);
538     if(_topics.empty())
539     {
540         return;
541     }
542 
543     updateSerial();
544 
545     if(_nodes.find(name) == _nodes.end())
546     {
547         return;
548     }
549 
550     ServerDynamicInfoSeq& servers = _nodes[name].servers;
551     for(ServerDynamicInfoSeq::const_iterator p = servers.begin(); p != servers.end(); ++p)
552     {
553         _serverStatus.erase(p->id);
554     }
555 
556     _nodes.erase(name);
557     try
558     {
559         for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
560         {
561             (*p)->nodeDown(name);
562         }
563     }
564     catch(const Ice::LocalException& ex)
565     {
566         Ice::Warning out(_logger);
567         out << "unexpected exception while publishing `nodeDown' update:\n" << ex;
568     }
569 }
570 
571 void
initObserver(const Ice::ObjectPrx & obsv)572 NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
573 {
574     NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
575     NodeDynamicInfoSeq nodes;
576     nodes.reserve(_nodes.size());
577     for(map<string, NodeDynamicInfo>::const_iterator p = _nodes.begin(); p != _nodes.end(); ++p)
578     {
579         nodes.push_back(p->second);
580     }
581     observer->nodeInit(nodes, getContext(_serial));
582 }
583 
584 bool
isServerEnabled(const string & server) const585 NodeObserverTopic::isServerEnabled(const string& server) const
586 {
587     Lock sync(*this);
588     if(_topics.empty())
589     {
590         return false;
591     }
592     map<string, bool>::const_iterator p = _serverStatus.find(server);
593     if(p != _serverStatus.end())
594     {
595         return p->second;
596     }
597     else
598     {
599         return true; // Assume the server is enabled if we don't know its status.
600     }
601 }
602 
ApplicationObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<string,ApplicationInfo> & applications,Ice::Long serial)603 ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
604                                                    const map<string, ApplicationInfo>& applications, Ice::Long serial) :
605     ObserverTopic(topicManager, "ApplicationObserver", serial),
606     _applications(applications)
607 {
608     _publishers = getPublishers<ApplicationObserverPrx>();
609 }
610 
611 int
applicationInit(Ice::Long dbSerial,const ApplicationInfoSeq & apps)612 ApplicationObserverTopic::applicationInit(Ice::Long dbSerial, const ApplicationInfoSeq& apps)
613 {
614     Lock sync(*this);
615     if(_topics.empty())
616     {
617         return -1;
618     }
619     updateSerial(dbSerial);
620     _applications.clear();
621     for(ApplicationInfoSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
622     {
623         _applications.insert(make_pair(p->descriptor.name, *p));
624     }
625     try
626     {
627         for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
628         {
629             (*p)->applicationInit(_serial, apps, getContext(_serial, dbSerial));
630         }
631     }
632     catch(const Ice::LocalException& ex)
633     {
634         Ice::Warning out(_logger);
635         out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
636     }
637     addExpectedUpdate(_serial);
638     return _serial;
639 }
640 
641 int
applicationAdded(Ice::Long dbSerial,const ApplicationInfo & info)642 ApplicationObserverTopic::applicationAdded(Ice::Long dbSerial, const ApplicationInfo& info)
643 {
644     Lock sync(*this);
645     if(_topics.empty())
646     {
647         return -1;
648     }
649 
650     updateSerial(dbSerial);
651     _applications.insert(make_pair(info.descriptor.name, info));
652     try
653     {
654         for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
655         {
656             (*p)->applicationAdded(_serial, info, getContext(_serial, dbSerial));
657         }
658     }
659     catch(const Ice::LocalException& ex)
660     {
661         Ice::Warning out(_logger);
662         out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
663     }
664     addExpectedUpdate(_serial);
665     return _serial;
666 }
667 
668 int
applicationRemoved(Ice::Long dbSerial,const string & name)669 ApplicationObserverTopic::applicationRemoved(Ice::Long dbSerial, const string& name)
670 {
671     Lock sync(*this);
672     if(_topics.empty())
673     {
674         return -1;
675     }
676     updateSerial(dbSerial);
677     _applications.erase(name);
678     try
679     {
680         for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
681         {
682             (*p)->applicationRemoved(_serial, name, getContext(_serial, dbSerial));
683         }
684     }
685     catch(const Ice::LocalException& ex)
686     {
687         Ice::Warning out(_logger);
688         out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
689     }
690     addExpectedUpdate(_serial);
691     return _serial;
692 }
693 
694 int
applicationUpdated(Ice::Long dbSerial,const ApplicationUpdateInfo & info)695 ApplicationObserverTopic::applicationUpdated(Ice::Long dbSerial, const ApplicationUpdateInfo& info)
696 {
697     Lock sync(*this);
698     if(_topics.empty())
699     {
700         return -1;
701     }
702 
703     updateSerial(dbSerial);
704     try
705     {
706         map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
707         if(p != _applications.end())
708         {
709             ApplicationHelper helper(_publishers[0]->ice_getCommunicator(), p->second.descriptor);
710             p->second.descriptor = helper.update(info.descriptor);
711             p->second.updateTime = info.updateTime;
712             p->second.updateUser = info.updateUser;
713             p->second.revision = info.revision;
714         }
715     }
716     catch(const DeploymentException& ex)
717     {
718         Ice::Error out(_logger);
719         out << "unexpected exception while instantiating application `" << info.descriptor.name << "':\n" << ex.reason;
720         assert(false);
721     }
722     catch(const std::exception& ex)
723     {
724         Ice::Error out(_logger);
725         out << "unexpected exception while instantiating application `" << info.descriptor.name << "':\n" << ex.what();
726         assert(false);
727     }
728     catch(...)
729     {
730         Ice::Error out(_logger);
731         out << "unexpected exception while instantiating application `" << info.descriptor.name << "'";
732         assert(false);
733     }
734     try
735     {
736         for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
737         {
738             (*p)->applicationUpdated(_serial, info, getContext(_serial, dbSerial));
739         }
740     }
741     catch(const Ice::LocalException& ex)
742     {
743         Ice::Warning out(_logger);
744         out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
745     }
746     addExpectedUpdate(_serial);
747     return _serial;
748 }
749 
750 void
initObserver(const Ice::ObjectPrx & obsv)751 ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
752 {
753     ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
754     ApplicationInfoSeq applications;
755     for(map<string, ApplicationInfo>::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
756     {
757         applications.push_back(p->second);
758     }
759     observer->applicationInit(_serial, applications, getContext(_serial, _dbSerial));
760 }
761 
AdapterObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<string,AdapterInfo> & adapters,Ice::Long serial)762 AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
763                                            const map<string, AdapterInfo>& adapters, Ice::Long serial) :
764     ObserverTopic(topicManager, "AdapterObserver", serial),
765     _adapters(adapters)
766 {
767     _publishers = getPublishers<AdapterObserverPrx>();
768 }
769 
770 int
adapterInit(Ice::Long dbSerial,const AdapterInfoSeq & adpts)771 AdapterObserverTopic::adapterInit(Ice::Long dbSerial, const AdapterInfoSeq& adpts)
772 {
773     Lock sync(*this);
774     if(_topics.empty())
775     {
776         return -1;
777     }
778     updateSerial(dbSerial);
779     _adapters.clear();
780     for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
781     {
782         _adapters.insert(make_pair(q->id, *q));
783     }
784     try
785     {
786         for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
787         {
788             (*p)->adapterInit(adpts, getContext(_serial, dbSerial));
789         }
790     }
791     catch(const Ice::LocalException& ex)
792     {
793         Ice::Warning out(_logger);
794         out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
795     }
796     addExpectedUpdate(_serial);
797     return _serial;
798 }
799 
800 int
adapterAdded(Ice::Long dbSerial,const AdapterInfo & info)801 AdapterObserverTopic::adapterAdded(Ice::Long dbSerial, const AdapterInfo& info)
802 {
803     Lock sync(*this);
804     if(_topics.empty())
805     {
806         return -1;
807     }
808     updateSerial(dbSerial);
809     _adapters.insert(make_pair(info.id, info));
810     try
811     {
812         for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
813         {
814             (*p)->adapterAdded(info, getContext(_serial, dbSerial));
815         }
816     }
817     catch(const Ice::LocalException& ex)
818     {
819         Ice::Warning out(_logger);
820         out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
821     }
822     addExpectedUpdate(_serial);
823     return _serial;
824 }
825 
826 int
adapterUpdated(Ice::Long dbSerial,const AdapterInfo & info)827 AdapterObserverTopic::adapterUpdated(Ice::Long dbSerial, const AdapterInfo& info)
828 {
829     Lock sync(*this);
830     if(_topics.empty())
831     {
832         return -1;
833     }
834     updateSerial(dbSerial);
835     _adapters[info.id] = info;
836     try
837     {
838         for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
839         {
840             (*p)->adapterUpdated(info, getContext(_serial, dbSerial));
841         }
842     }
843     catch(const Ice::LocalException& ex)
844     {
845         Ice::Warning out(_logger);
846         out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
847     }
848     addExpectedUpdate(_serial);
849     return _serial;
850 }
851 
852 int
adapterRemoved(Ice::Long dbSerial,const string & id)853 AdapterObserverTopic::adapterRemoved(Ice::Long dbSerial, const string& id)
854 {
855     Lock sync(*this);
856     if(_topics.empty())
857     {
858         return -1;
859     }
860     updateSerial(dbSerial);
861     _adapters.erase(id);
862     try
863     {
864         for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
865         {
866             (*p)->adapterRemoved(id, getContext(_serial, dbSerial));
867         }
868     }
869     catch(const Ice::LocalException& ex)
870     {
871         Ice::Warning out(_logger);
872         out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
873     }
874     addExpectedUpdate(_serial);
875     return _serial;
876 }
877 
878 void
initObserver(const Ice::ObjectPrx & obsv)879 AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
880 {
881     AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
882     AdapterInfoSeq adapters;
883     for(map<string, AdapterInfo>::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
884     {
885         adapters.push_back(p->second);
886     }
887     observer->adapterInit(adapters, getContext(_serial, _dbSerial));
888 }
889 
ObjectObserverTopic(const IceStorm::TopicManagerPrx & topicManager,const map<Ice::Identity,ObjectInfo> & objects,Ice::Long serial)890 ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
891                                          const map<Ice::Identity, ObjectInfo>& objects, Ice::Long serial) :
892     ObserverTopic(topicManager, "ObjectObserver", serial),
893     _objects(objects)
894 {
895     _publishers = getPublishers<ObjectObserverPrx>();
896 }
897 
898 int
objectInit(Ice::Long dbSerial,const ObjectInfoSeq & objects)899 ObjectObserverTopic::objectInit(Ice::Long dbSerial, const ObjectInfoSeq& objects)
900 {
901     Lock sync(*this);
902     if(_topics.empty())
903     {
904         return -1;
905     }
906     updateSerial(dbSerial);
907     _objects.clear();
908     for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
909     {
910         _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
911     }
912     try
913     {
914         for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
915         {
916             (*p)->objectInit(objects, getContext(_serial, dbSerial));
917         }
918     }
919     catch(const Ice::LocalException& ex)
920     {
921         Ice::Warning out(_logger);
922         out << "unexpected exception while publishing `objectInit' update:\n" << ex;
923     }
924     addExpectedUpdate(_serial);
925     return _serial;
926 }
927 
928 int
objectAdded(Ice::Long dbSerial,const ObjectInfo & info)929 ObjectObserverTopic::objectAdded(Ice::Long dbSerial, const ObjectInfo& info)
930 {
931     Lock sync(*this);
932     if(_topics.empty())
933     {
934         return -1;
935     }
936     updateSerial(dbSerial);
937     _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
938     try
939     {
940         for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
941         {
942             (*p)->objectAdded(info, getContext(_serial, dbSerial));
943         }
944     }
945     catch(const Ice::LocalException& ex)
946     {
947         Ice::Warning out(_logger);
948         out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
949     }
950     addExpectedUpdate(_serial);
951     return _serial;
952 }
953 
954 int
objectUpdated(Ice::Long dbSerial,const ObjectInfo & info)955 ObjectObserverTopic::objectUpdated(Ice::Long dbSerial, const ObjectInfo& info)
956 {
957     Lock sync(*this);
958     if(_topics.empty())
959     {
960         return -1;
961     }
962     updateSerial(dbSerial);
963     _objects[info.proxy->ice_getIdentity()] = info;
964     try
965     {
966         for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
967         {
968             (*p)->objectUpdated(info, getContext(_serial, dbSerial));
969         }
970     }
971     catch(const Ice::LocalException& ex)
972     {
973         Ice::Warning out(_logger);
974         out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
975     }
976     addExpectedUpdate(_serial);
977     return _serial;
978 }
979 
980 int
objectRemoved(Ice::Long dbSerial,const Ice::Identity & id)981 ObjectObserverTopic::objectRemoved(Ice::Long dbSerial, const Ice::Identity& id)
982 {
983     Lock sync(*this);
984     if(_topics.empty())
985     {
986         return -1;
987     }
988     updateSerial(dbSerial);
989     _objects.erase(id);
990     try
991     {
992         for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
993         {
994             (*p)->objectRemoved(id, getContext(_serial, dbSerial));
995         }
996     }
997     catch(const Ice::LocalException& ex)
998     {
999         Ice::Warning out(_logger);
1000         out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
1001     }
1002     addExpectedUpdate(_serial);
1003     return _serial;
1004 }
1005 
1006 int
wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq & infos)1007 ObjectObserverTopic::wellKnownObjectsAddedOrUpdated(const ObjectInfoSeq& infos)
1008 {
1009     Lock sync(*this);
1010     if(_topics.empty())
1011     {
1012         return -1;
1013     }
1014 
1015     for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1016     {
1017         updateSerial();
1018         map<Ice::Identity, ObjectInfo>::iterator q = _objects.find(p->proxy->ice_getIdentity());
1019         if(q != _objects.end())
1020         {
1021             q->second = *p;
1022             try
1023             {
1024                 for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
1025                 {
1026                     (*r)->objectUpdated(*p, getContext(_serial));
1027                 }
1028             }
1029             catch(const Ice::LocalException& ex)
1030             {
1031                 Ice::Warning out(_logger);
1032                 out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
1033             }
1034         }
1035         else
1036         {
1037             _objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
1038             try
1039             {
1040                 for(vector<ObjectObserverPrx>::const_iterator r = _publishers.begin(); r != _publishers.end(); ++r)
1041                 {
1042                     (*r)->objectAdded(*p, getContext(_serial));
1043                 }
1044             }
1045             catch(const Ice::LocalException& ex)
1046             {
1047                 Ice::Warning out(_logger);
1048                 out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
1049             }
1050         }
1051     }
1052 
1053     //
1054     // We don't wait for the update to be received by the replicas
1055     // here. This operation is called by ReplicaSessionI.
1056     //
1057     addExpectedUpdate(_serial);
1058     //waitForSyncedSubscribersNoSync(_serial);
1059     return _serial;
1060 }
1061 
1062 int
wellKnownObjectsRemoved(const ObjectInfoSeq & infos)1063 ObjectObserverTopic::wellKnownObjectsRemoved(const ObjectInfoSeq& infos)
1064 {
1065     Lock sync(*this);
1066     if(_topics.empty())
1067     {
1068         return -1;
1069     }
1070 
1071     for(ObjectInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1072     {
1073         updateSerial();
1074         _objects.erase(p->proxy->ice_getIdentity());
1075         try
1076         {
1077             for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
1078             {
1079                 (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
1080             }
1081         }
1082         catch(const Ice::LocalException& ex)
1083         {
1084             Ice::Warning out(_logger);
1085             out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
1086         }
1087     }
1088 
1089     //
1090     // We don't need to wait for the update to be received by the
1091     // replicas here. This operation is only called internaly by
1092     // IceGrid.
1093     //
1094     addExpectedUpdate(_serial);
1095     //waitForSyncedSubscribersNoSync(_serial);
1096     return _serial;
1097 }
1098 
1099 void
initObserver(const Ice::ObjectPrx & obsv)1100 ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
1101 {
1102     ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
1103     ObjectInfoSeq objects;
1104     for(map<Ice::Identity, ObjectInfo>::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
1105     {
1106         objects.push_back(p->second);
1107     }
1108     observer->objectInit(objects, getContext(_serial, _dbSerial));
1109 }
1110