1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <IceUtil/StringUtil.h>
6 #include <IceUtil/Random.h>
7 #include <IceUtil/Functional.h>
8 #include <Ice/LoggerUtil.h>
9 #include <Ice/Communicator.h>
10 #include <Ice/ObjectAdapter.h>
11 #include <IceGrid/Database.h>
12 #include <IceGrid/TraceLevels.h>
13 #include <IceGrid/Util.h>
14 #include <IceGrid/DescriptorHelper.h>
15 #include <IceGrid/NodeSessionI.h>
16 #include <IceGrid/ReplicaSessionI.h>
17 #include <IceGrid/Session.h>
18 #include <IceGrid/Topics.h>
19 #include <IceGrid/IceGrid.h>
20 
21 #include <algorithm>
22 #include <functional>
23 #include <iterator>
24 
25 using namespace std;
26 using namespace IceGrid;
27 
28 typedef IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream> ApplicationMapRWCursor;
29 typedef IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream> AdapterMapROCursor;
30 typedef IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream> AdaptersByGroupMapCursor;
31 typedef IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> ObjectsByTypeMapROCursor;
32 typedef IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream> ObjectsMapROCursor;
33 
34 namespace
35 {
36 
37 const string applicationsDbName = "applications";
38 const string adaptersDbName = "adapters";
39 const string adaptersByReplicaGroupIdDbName = "adaptersByReplicaGroupId";
40 const string objectsDbName = "objects";
41 const string objectsByTypeDbName = "objectsByType";
42 const string internalObjectsDbName = "internal-objects";
43 const string internalObjectsByTypeDbName = "internal-objectsByType";
44 const string serialsDbName = "serials";
45 
46 struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
47 {
operator ()__anon2d983f220111::ObjectLoadCI48     bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
49     {
50         return lhs.second < rhs.second;
51     }
52 };
53 
54 template<typename K, typename V, typename C, typename H> vector<V>
toVector(const IceDB::ReadOnlyTxn & txn,const IceDB::Dbi<K,V,C,H> & m)55 toVector(const IceDB::ReadOnlyTxn& txn, const IceDB::Dbi<K, V, C, H>& m)
56 {
57     vector<V> v;
58     IceDB::ReadOnlyCursor<K, V, C, H> cursor(m, txn);
59     K key;
60     V value;
61     while(cursor.get(key, value, MDB_NEXT))
62     {
63         v.push_back(value);
64     }
65     return v;
66 }
67 
68 template<typename K, typename V, typename C, typename H> map<K, V>
toMap(const IceDB::Txn & txn,const IceDB::Dbi<K,V,C,H> & d)69 toMap(const IceDB::Txn& txn, const IceDB::Dbi<K, V, C, H>& d)
70 {
71     std::map<K, V> m;
72     IceDB::Cursor<K, V, C, H> cursor(d, txn);
73     K key;
74     V value;
75     while(cursor.get(key, value, MDB_NEXT))
76     {
77         typename std::map<K, V>::value_type v(key, value);
78         m.insert(v);
79     }
80     cursor.close();
81     return m;
82 }
83 
84 void
logError(const Ice::CommunicatorPtr & com,const IceDB::LMDBException & ex)85 logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
86 {
87     Ice::Error error(com->getLogger());
88     error << "LMDB error: " << ex;
89 }
90 
91 void
filterAdapterInfos(const string & filter,const string & replicaGroupId,const RegistryPluginFacadeIPtr & pluginFacade,const Ice::ConnectionPtr & con,const Ice::Context & ctx,AdapterInfoSeq & infos)92 filterAdapterInfos(const string& filter,
93                    const string& replicaGroupId,
94                    const RegistryPluginFacadeIPtr& pluginFacade,
95                    const Ice::ConnectionPtr& con,
96                    const Ice::Context& ctx,
97                    AdapterInfoSeq& infos)
98 {
99     if(infos.empty() || !pluginFacade->hasReplicaGroupFilters())
100     {
101         return;
102     }
103 
104     vector<ReplicaGroupFilterPtr> filters = pluginFacade->getReplicaGroupFilters(filter);
105     if(filters.empty())
106     {
107         return;
108     }
109 
110     Ice::StringSeq adapterIds;
111     adapterIds.reserve(infos.size());
112     for(vector<AdapterInfo>::const_iterator p = infos.begin(); p != infos.end(); ++p)
113     {
114         adapterIds.push_back(p->id);
115     }
116 
117     for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
118     {
119         adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx);
120     }
121 
122     vector<AdapterInfo> filteredAdpts;
123     filteredAdpts.reserve(infos.size());
124     for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
125     {
126         for(vector<AdapterInfo>::const_iterator r = infos.begin(); r != infos.end(); ++r)
127         {
128             if(*q == r->id)
129             {
130                 filteredAdpts.push_back(*r);
131                 break;
132             }
133         }
134     }
135     infos.swap(filteredAdpts);
136 }
137 
138 vector<AdapterInfo>
findByReplicaGroupId(const IceDB::Txn & txn,const StringAdapterInfoMap & adapters,const StringStringMap & adaptersByGroupId,const string & name)139 findByReplicaGroupId(const IceDB::Txn& txn,
140                      const StringAdapterInfoMap& adapters,
141                      const StringStringMap& adaptersByGroupId,
142                      const string& name)
143 {
144     vector<AdapterInfo> result;
145     AdaptersByGroupMapCursor cursor(adaptersByGroupId, txn);
146     string id;
147     if(cursor.find(name, id))
148     {
149         AdapterInfo info;
150         adapters.get(txn, id, info);
151         result.push_back(info);
152 
153         string n;
154         while(cursor.get(n, id, MDB_NEXT) && n == name)
155         {
156             adapters.get(txn, id, info);
157             result.push_back(info);
158         }
159     }
160     return result;
161 }
162 
163 vector<ObjectInfo>
findByType(const IceDB::ReadOnlyTxn & txn,const IdentityObjectInfoMap & objects,const StringIdentityMap & objectsByType,const string & type)164 findByType(const IceDB::ReadOnlyTxn& txn,
165            const IdentityObjectInfoMap& objects,
166            const StringIdentityMap& objectsByType,
167            const string& type)
168 {
169     vector<ObjectInfo> result;
170     ObjectsByTypeMapROCursor cursor(objectsByType, txn);
171     Ice::Identity id;
172     if(cursor.find(type, id))
173     {
174         ObjectInfo info;
175         objects.get(txn, id, info);
176         result.push_back(info);
177 
178         string t;
179         while(cursor.get(t, id, MDB_NEXT) && t == type)
180         {
181             objects.get(txn, id, info);
182             result.push_back(info);
183         }
184     }
185     return result;
186 }
187 
188 }
189 
Database(const Ice::ObjectAdapterPtr & registryAdapter,const IceStorm::TopicManagerPrx & topicManager,const string & instanceName,const TraceLevelsPtr & traceLevels,const RegistryInfo & info,bool readonly)190 Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
191                    const IceStorm::TopicManagerPrx& topicManager,
192                    const string& instanceName,
193                    const TraceLevelsPtr& traceLevels,
194                    const RegistryInfo& info,
195                    bool readonly) :
196     _communicator(registryAdapter->getCommunicator()),
197     _internalAdapter(registryAdapter),
198     _topicManager(topicManager),
199     _instanceName(instanceName),
200     _traceLevels(traceLevels),
201     _master(info.name == "Master"),
202     _readonly(readonly || !_master),
203     _replicaCache(_communicator, topicManager),
204     _nodeCache(_communicator, _replicaCache, _readonly && _master ? string("Master (read-only)") : info.name),
205     _adapterCache(_communicator),
206     _objectCache(_communicator),
207     _allocatableObjectCache(_communicator),
208     _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
209     _dbLock(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path") + "/icedb.lock"),
210     _env(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path"), 8,
211          IceDB::getMapSize(_communicator->getProperties()->getPropertyAsInt("IceGrid.Registry.LMDB.MapSize"))),
212     _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
213     _lock(0)
214 {
215     IceDB::ReadWriteTxn txn(_env);
216 
217     IceDB::IceContext context;
218     context.communicator = _communicator;
219     context.encoding.major = 1;
220     context.encoding.minor = 1;
221 
222     _applications = StringApplicationInfoMap(txn, applicationsDbName, context, MDB_CREATE);
223 
224     _adapters = StringAdapterInfoMap(txn, adaptersDbName, context, MDB_CREATE);
225     _adaptersByGroupId = StringStringMap(txn, adaptersByReplicaGroupIdDbName, context, MDB_CREATE|MDB_DUPSORT);
226 
227     _objects = IdentityObjectInfoMap(txn, objectsDbName, context, MDB_CREATE);
228     _objectsByType = StringIdentityMap(txn, objectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
229 
230     _internalObjects = IdentityObjectInfoMap(txn, internalObjectsDbName, context, MDB_CREATE);
231     _internalObjectsByType = StringIdentityMap(txn, internalObjectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
232 
233     _serials = StringLongMap(txn, serialsDbName, context, MDB_CREATE);
234 
235     ServerEntrySeq entries;
236 
237     string k;
238     ApplicationInfo v;
239     ApplicationMapRWCursor cursor(_applications, txn);
240     while(cursor.get(k, v, MDB_NEXT))
241     {
242         try
243         {
244             load(ApplicationHelper(_communicator, v.descriptor), entries, v.uuid, v.revision);
245         }
246         catch(const DeploymentException& ex)
247         {
248             Ice::Error err(_traceLevels->logger);
249             err << "invalid application `" << k << "':\n" << ex.reason;
250         }
251     }
252 
253     _serverCache.setTraceLevels(_traceLevels);
254     _nodeCache.setTraceLevels(_traceLevels);
255     _replicaCache.setTraceLevels(_traceLevels);
256     _adapterCache.setTraceLevels(_traceLevels);
257     _objectCache.setTraceLevels(_traceLevels);
258     _allocatableObjectCache.setTraceLevels(_traceLevels);
259 
260     _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
261     _registryObserverTopic = new RegistryObserverTopic(_topicManager);
262 
263     _serverCache.setNodeObserverTopic(_nodeObserverTopic);
264 
265     // Set all serials to 1 if they have not yet been set.
266     Ice::Long serial;
267     if(!_serials.get(txn, applicationsDbName, serial))
268     {
269         _serials.put(txn, applicationsDbName, 1);
270     }
271     if(!_serials.get(txn, adaptersDbName, serial))
272     {
273         _serials.put(txn, adaptersDbName, 1);
274     }
275     if(!_serials.get(txn, objectsDbName, serial))
276     {
277         _serials.put(txn, objectsDbName, 1);
278     }
279 
280     _applicationObserverTopic =
281         new ApplicationObserverTopic(_topicManager, toMap(txn, _applications), getSerial(txn, applicationsDbName));
282     _adapterObserverTopic =
283         new AdapterObserverTopic(_topicManager, toMap(txn, _adapters), getSerial(txn, adaptersDbName));
284     _objectObserverTopic =
285         new ObjectObserverTopic(_topicManager, toMap(txn, _objects), getSerial(txn, objectsDbName));
286 
287     txn.commit();
288 
289     _registryObserverTopic->registryUp(info);
290 
291     _pluginFacade->setDatabase(this);
292 }
293 
294 std::string
getInstanceName() const295 Database::getInstanceName() const
296 {
297     return _instanceName;
298 }
299 
300 void
destroy()301 Database::destroy()
302 {
303     _pluginFacade->setDatabase(0);
304 
305     _registryObserverTopic->destroy();
306     _nodeObserverTopic->destroy();
307     _applicationObserverTopic->destroy();
308     _adapterObserverTopic->destroy();
309     _objectObserverTopic->destroy();
310 }
311 
312 ObserverTopicPtr
getObserverTopic(TopicName name) const313 Database::getObserverTopic(TopicName name) const
314 {
315     switch(name)
316     {
317     case RegistryObserverTopicName:
318         return _registryObserverTopic;
319     case NodeObserverTopicName:
320         return _nodeObserverTopic;
321     case ApplicationObserverTopicName:
322         return _applicationObserverTopic;
323     case AdapterObserverTopicName:
324         return _adapterObserverTopic;
325     case ObjectObserverTopicName:
326         return _objectObserverTopic;
327     default:
328         break;
329     }
330     return 0;
331 }
332 
333 void
checkSessionLock(AdminSessionI * session)334 Database::checkSessionLock(AdminSessionI* session)
335 {
336     if(_lock != 0 && session != _lock)
337     {
338         throw AccessDeniedException(_lockUserId); // Lock held by another session.
339     }
340 }
341 
342 int
lock(AdminSessionI * session,const string & userId)343 Database::lock(AdminSessionI* session, const string& userId)
344 {
345     Lock sync(*this);
346 
347     if(_lock != 0 && session != _lock)
348     {
349         throw AccessDeniedException(_lockUserId); // Lock held by another session.
350     }
351     assert(_lock == 0 || _lock == session);
352 
353     _lock = session;
354     _lockUserId = userId;
355 
356     return _applicationObserverTopic->getSerial();
357 }
358 
359 void
unlock(AdminSessionI * session)360 Database::unlock(AdminSessionI* session)
361 {
362     Lock sync(*this);
363     if(_lock != session)
364     {
365         throw AccessDeniedException();
366     }
367 
368     _lock = 0;
369     _lockUserId.clear();
370 }
371 
372 void
syncApplications(const ApplicationInfoSeq & newApplications,Ice::Long dbSerial)373 Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long dbSerial)
374 {
375     assert(dbSerial != 0);
376     int serial = 0;
377     {
378         Lock sync(*this);
379 
380         map<string, ApplicationInfo> oldApplications;
381         try
382         {
383             IceDB::ReadWriteTxn txn(_env);
384 
385             oldApplications = toMap(txn, _applications);
386             _applications.clear(txn);
387             for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
388             {
389                 _applications.put(txn, p->descriptor.name, *p);
390             }
391             dbSerial = updateSerial(txn, applicationsDbName, dbSerial);
392 
393             txn.commit();
394         }
395         catch(const IceDB::LMDBException& ex)
396         {
397             logError(_communicator, ex);
398             throw;
399         }
400 
401         ServerEntrySeq entries;
402         set<string> names;
403 
404         for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
405         {
406             try
407             {
408                 map<string, ApplicationInfo>::const_iterator q = oldApplications.find(p->descriptor.name);
409                 if(q != oldApplications.end())
410                 {
411                     ApplicationHelper previous(_communicator, q->second.descriptor);
412                     ApplicationHelper helper(_communicator, p->descriptor);
413                     reload(previous, helper, entries, p->uuid, p->revision, false);
414                 }
415                 else
416                 {
417                     load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
418                 }
419             }
420             catch(const DeploymentException& ex)
421             {
422                 Ice::Warning warn(_traceLevels->logger);
423                 warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
424             }
425             names.insert(p->descriptor.name);
426         }
427 
428         for(map<string, ApplicationInfo>::iterator s = oldApplications.begin(); s != oldApplications.end(); ++s)
429         {
430             if(names.find(s->first) == names.end())
431             {
432                 unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
433             }
434         }
435 
436         for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
437 
438         if(_traceLevels->application > 0)
439         {
440             Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
441             out << "synchronized applications (serial = `" << dbSerial << "')";
442         }
443 
444         serial = _applicationObserverTopic->applicationInit(dbSerial, newApplications);
445     }
446     _applicationObserverTopic->waitForSyncedSubscribers(serial);
447 }
448 
449 void
syncAdapters(const AdapterInfoSeq & adapters,Ice::Long dbSerial)450 Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
451 {
452     assert(dbSerial != 0);
453     int serial = 0;
454     {
455         Lock sync(*this);
456         try
457         {
458             IceDB::ReadWriteTxn txn(_env);
459 
460             _adapters.clear(txn);
461             _adaptersByGroupId.clear(txn);
462             for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
463             {
464                 addAdapter(txn, *r);
465             }
466             dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
467 
468             txn.commit();
469         }
470         catch(const IceDB::KeyTooLongException&)
471         {
472             throw;
473         }
474         catch(const IceDB::LMDBException& ex)
475         {
476             logError(_communicator, ex);
477             throw;
478         }
479 
480         if(_traceLevels->adapter > 0)
481         {
482             Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
483             out << "synchronized adapters (serial = `" << dbSerial << "')";
484         }
485 
486         serial = _adapterObserverTopic->adapterInit(dbSerial, adapters);
487     }
488     _adapterObserverTopic->waitForSyncedSubscribers(serial);
489 }
490 
491 void
syncObjects(const ObjectInfoSeq & objects,Ice::Long dbSerial)492 Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
493 {
494     assert(dbSerial != 0);
495     int serial = 0;
496     {
497         Lock sync(*this);
498         try
499         {
500             IceDB::ReadWriteTxn txn(_env);
501 
502             _objects.clear(txn);
503             _objectsByType.clear(txn);
504             for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
505             {
506                 addObject(txn, *q, false);
507             }
508             dbSerial = updateSerial(txn, objectsDbName, dbSerial);
509 
510             txn.commit();
511         }
512         catch(const IceDB::LMDBException& ex)
513         {
514             logError(_communicator, ex);
515             throw;
516         }
517 
518         if(_traceLevels->object > 0)
519         {
520             Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
521             out << "synchronized objects (serial = `" << dbSerial << "')";
522         }
523 
524         serial = _objectObserverTopic->objectInit(dbSerial, objects);
525     }
526     _objectObserverTopic->waitForSyncedSubscribers(serial);
527 }
528 
529 ApplicationInfoSeq
getApplications(Ice::Long & serial)530 Database::getApplications(Ice::Long& serial)
531 {
532     try
533     {
534         IceDB::ReadOnlyTxn txn(_env);
535 
536         serial = getSerial(txn, applicationsDbName);
537         return toVector(txn, _applications);
538     }
539     catch(const IceDB::LMDBException& ex)
540     {
541         logError(_communicator, ex);
542         throw;
543     }
544 }
545 
546 AdapterInfoSeq
getAdapters(Ice::Long & serial)547 Database::getAdapters(Ice::Long& serial)
548 {
549     try
550     {
551         IceDB::ReadOnlyTxn txn(_env);
552 
553         serial = getSerial(txn, adaptersDbName);
554         return toVector(txn, _adapters);
555     }
556     catch(const IceDB::LMDBException& ex)
557     {
558         logError(_communicator, ex);
559         throw;
560     }
561 }
562 
563 ObjectInfoSeq
getObjects(Ice::Long & serial)564 Database::getObjects(Ice::Long& serial)
565 {
566     try
567     {
568         IceDB::ReadOnlyTxn txn(_env);
569 
570         serial = getSerial(txn, objectsDbName);
571         return toVector(txn, _objects);
572     }
573     catch(const IceDB::LMDBException& ex)
574     {
575         logError(_communicator, ex);
576         throw;
577     }
578 }
579 
580 StringLongDict
getSerials() const581 Database::getSerials() const
582 {
583     IceDB::ReadOnlyTxn txn(_env);
584     return toMap(txn, _serials);
585 }
586 
587 void
addApplication(const ApplicationInfo & info,AdminSessionI * session,Ice::Long dbSerial)588 Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ice::Long dbSerial)
589 {
590     assert(dbSerial != 0 || _master);
591 
592     int serial = 0; // Initialize to prevent warning.
593     ServerEntrySeq entries;
594     try
595     {
596         Lock sync(*this);
597         checkSessionLock(session);
598 
599         waitForUpdate(info.descriptor.name);
600 
601         IceDB::ReadWriteTxn txn(_env);
602 
603         if(_applications.find(txn, info.descriptor.name))
604         {
605             throw DeploymentException("application `" + info.descriptor.name + "' already exists");
606         }
607 
608         ApplicationHelper helper(_communicator, info.descriptor, true);
609         checkForAddition(helper, txn);
610         dbSerial = saveApplication(info, txn, dbSerial);
611 
612         txn.commit();
613 
614         load(helper, entries, info.uuid, info.revision);
615         startUpdating(info.descriptor.name, info.uuid, info.revision);
616 
617         for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
618         serial = _applicationObserverTopic->applicationAdded(dbSerial, info);
619     }
620     catch(const IceDB::KeyTooLongException& ex)
621     {
622         throw DeploymentException("application name `" + info.descriptor.name + "' is too long: " + ex.what());
623     }
624     catch(const IceDB::LMDBException& ex)
625     {
626         logError(_communicator, ex);
627         throw;
628     }
629 
630     _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
631 
632     //
633     // Mark the application as updated. All the replicas received the update so it's now safe
634     // for the nodes to start the servers.
635     //
636     {
637         Lock sync(*this);
638         vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), info.descriptor.name);
639         assert(p != _updating.end());
640         p->markUpdated();
641     }
642 
643     if(_master)
644     {
645         try
646         {
647             for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
648             {
649                 try
650                 {
651                     (*p)->waitForSync();
652                 }
653                 catch(const NodeUnreachableException&)
654                 {
655                     // Ignore.
656                 }
657             }
658         }
659         catch(const DeploymentException&)
660         {
661             try
662             {
663                 Lock sync(*this);
664                 entries.clear();
665                 unload(ApplicationHelper(_communicator, info.descriptor), entries);
666 
667                 IceDB::ReadWriteTxn txn(_env);
668                 dbSerial = removeApplication(info.descriptor.name, txn);
669                 txn.commit();
670 
671                 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
672                 serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
673             }
674             catch(const DeploymentException& ex)
675             {
676                 Ice::Error err(_traceLevels->logger);
677                 err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
678             }
679             catch(const IceDB::LMDBException& ex)
680             {
681                 logError(_communicator, ex);
682             }
683 
684             _applicationObserverTopic->waitForSyncedSubscribers(serial);
685             for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
686             finishUpdating(info.descriptor.name);
687             throw;
688         }
689     }
690 
691     if(_traceLevels->application > 0)
692     {
693         Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
694         out << "added application `" << info.descriptor.name << "' (serial = `" << dbSerial << "')";
695     }
696     finishUpdating(info.descriptor.name);
697 }
698 
699 void
updateApplication(const ApplicationUpdateInfo & updt,bool noRestart,AdminSessionI * session,Ice::Long dbSerial)700 Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session,
701                             Ice::Long dbSerial)
702 {
703     assert(dbSerial != 0 || _master);
704 
705     ApplicationInfo oldApp;
706     ApplicationUpdateInfo update = updt;
707     IceInternal::UniquePtr<ApplicationHelper> previous;
708     IceInternal::UniquePtr<ApplicationHelper> helper;
709     try
710     {
711         Lock sync(*this);
712         checkSessionLock(session);
713 
714         waitForUpdate(update.descriptor.name);
715 
716         IceDB::ReadOnlyTxn txn(_env);
717 
718         if(!_applications.get(txn, update.descriptor.name, oldApp))
719         {
720             throw ApplicationNotExistException(update.descriptor.name);
721         }
722 
723         if(update.revision < 0)
724         {
725             update.revision = oldApp.revision + 1;
726         }
727 
728         previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
729         helper.reset(new ApplicationHelper(_communicator, previous->update(update.descriptor), true));
730 
731         startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
732     }
733     catch(const IceDB::LMDBException& ex)
734     {
735         logError(_communicator, ex);
736         throw;
737     }
738 
739     finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial);
740 }
741 
742 void
syncApplicationDescriptor(const ApplicationDescriptor & newDesc,bool noRestart,AdminSessionI * session)743 Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool noRestart, AdminSessionI* session)
744 {
745     assert(_master);
746 
747     ApplicationUpdateInfo update;
748     ApplicationInfo oldApp;
749     IceInternal::UniquePtr<ApplicationHelper> previous;
750     IceInternal::UniquePtr<ApplicationHelper> helper;
751     try
752     {
753         Lock sync(*this);
754         checkSessionLock(session);
755 
756         waitForUpdate(newDesc.name);
757 
758         IceDB::ReadOnlyTxn txn(_env);
759 
760         if(!_applications.get(txn, newDesc.name, oldApp))
761         {
762             throw ApplicationNotExistException(newDesc.name);
763         }
764 
765         previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
766         helper.reset(new ApplicationHelper(_communicator, newDesc, true));
767 
768         update.updateTime = IceUtil::Time::now().toMilliSeconds();
769         update.updateUser = _lockUserId;
770         update.revision = oldApp.revision + 1;
771         update.descriptor = helper->diff(*previous);
772 
773         startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
774     }
775     catch(const IceDB::LMDBException& ex)
776     {
777         logError(_communicator, ex);
778         throw;
779     }
780 
781     finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
782 }
783 
784 void
instantiateServer(const string & application,const string & node,const ServerInstanceDescriptor & instance,AdminSessionI * session)785 Database::instantiateServer(const string& application,
786                             const string& node,
787                             const ServerInstanceDescriptor& instance,
788                             AdminSessionI* session)
789 {
790     assert(_master);
791 
792     ApplicationUpdateInfo update;
793     ApplicationInfo oldApp;
794     IceInternal::UniquePtr<ApplicationHelper> previous;
795     IceInternal::UniquePtr<ApplicationHelper> helper;
796 
797     try
798     {
799         Lock sync(*this);
800         checkSessionLock(session);
801 
802         waitForUpdate(application);
803 
804         IceDB::ReadOnlyTxn txn(_env);
805 
806         if(!_applications.get(txn, application, oldApp))
807         {
808             throw ApplicationNotExistException(application);
809         }
810 
811         previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
812         helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
813 
814         update.updateTime = IceUtil::Time::now().toMilliSeconds();
815         update.updateUser = _lockUserId;
816         update.revision = oldApp.revision + 1;
817         update.descriptor = helper->diff(*previous);
818 
819         startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
820     }
821     catch(const IceDB::LMDBException& ex)
822     {
823         logError(_communicator, ex);
824         throw;
825     }
826 
827     finishApplicationUpdate(update, oldApp, *previous, *helper, session, true);
828 }
829 
830 void
removeApplication(const string & name,AdminSessionI * session,Ice::Long dbSerial)831 Database::removeApplication(const string& name, AdminSessionI* session, Ice::Long dbSerial)
832 {
833     assert(dbSerial != 0 || _master);
834     ServerEntrySeq entries;
835 
836     int serial = 0; // Initialize to prevent warning.
837     try
838     {
839         Lock sync(*this);
840         checkSessionLock(session);
841 
842         waitForUpdate(name);
843 
844         ApplicationInfo appInfo;
845 
846         IceDB::ReadWriteTxn txn(_env);
847 
848         if(!_applications.get(txn, name, appInfo))
849         {
850             throw ApplicationNotExistException(name);
851         }
852 
853         bool init = false;
854         try
855         {
856             ApplicationHelper helper(_communicator, appInfo.descriptor);
857             init = true;
858             checkForRemove(helper);
859             unload(helper, entries);
860         }
861         catch(const DeploymentException&)
862         {
863             if(init)
864             {
865                 throw;
866             }
867         }
868         dbSerial = removeApplication(name, txn, dbSerial);
869 
870         txn.commit();
871 
872         startUpdating(name, appInfo.uuid, appInfo.revision);
873 
874         for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
875         serial = _applicationObserverTopic->applicationRemoved(dbSerial, name);
876     }
877     catch(const IceDB::LMDBException& ex)
878     {
879         logError(_communicator, ex);
880         throw;
881     }
882 
883     _applicationObserverTopic->waitForSyncedSubscribers(serial);
884 
885     if(_master)
886     {
887         for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
888     }
889 
890     if(_traceLevels->application > 0)
891     {
892         Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
893         out << "removed application `" << name << "' (serial = `" << dbSerial << "')";
894     }
895 
896     finishUpdating(name);
897 }
898 
899 ApplicationInfo
getApplicationInfo(const std::string & name)900 Database::getApplicationInfo(const std::string& name)
901 {
902     IceDB::ReadOnlyTxn txn(_env);
903 
904     ApplicationInfo info;
905     if(!_applications.get(txn, name, info))
906     {
907         throw ApplicationNotExistException(name);
908     }
909     return info;
910 }
911 
912 Ice::StringSeq
getAllApplications(const string & expression)913 Database::getAllApplications(const string& expression)
914 {
915     IceDB::ReadOnlyTxn txn(_env);
916     return getMatchingKeys<map<string, ApplicationInfo> >(toMap(txn, _applications), expression);
917 }
918 
919 void
waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr & cb,const string & uuid,int revision)920 Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
921                                    const string& uuid,
922                                    int revision)
923 {
924     Lock sync(*this);
925 
926     vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), make_pair(uuid, revision));
927     if(p != _updating.end() && !p->updated)
928     {
929         p->cbs.push_back(cb);
930     }
931     else
932     {
933         cb->ice_response();
934     }
935 }
936 
937 NodeCache&
getNodeCache()938 Database::getNodeCache()
939 {
940     return _nodeCache;
941 }
942 
943 NodeEntryPtr
getNode(const string & name,bool create) const944 Database::getNode(const string& name, bool create) const
945 {
946     return _nodeCache.get(name, create);
947 }
948 
949 ReplicaCache&
getReplicaCache()950 Database::getReplicaCache()
951 {
952     return _replicaCache;
953 }
954 
955 ReplicaEntryPtr
getReplica(const string & name) const956 Database::getReplica(const string& name) const
957 {
958     return _replicaCache.get(name);
959 }
960 
961 ServerCache&
getServerCache()962 Database::getServerCache()
963 {
964     return _serverCache;
965 }
966 
967 ServerEntryPtr
getServer(const string & id) const968 Database::getServer(const string& id) const
969 {
970     return _serverCache.get(id);
971 }
972 
973 AllocatableObjectCache&
getAllocatableObjectCache()974 Database::getAllocatableObjectCache()
975 {
976     return _allocatableObjectCache;
977 }
978 
979 AllocatableObjectEntryPtr
getAllocatableObject(const Ice::Identity & id) const980 Database::getAllocatableObject(const Ice::Identity& id) const
981 {
982     return _allocatableObjectCache.get(id);
983 }
984 
985 void
setAdapterDirectProxy(const string & adapterId,const string & replicaGroupId,const Ice::ObjectPrx & proxy,Ice::Long dbSerial)986 Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
987                                 Ice::Long dbSerial)
988 {
989     assert(dbSerial != 0 || _master);
990 
991     int serial = 0; // Initialize to prevent warning.
992     {
993         Lock sync(*this);
994         if(_adapterCache.has(adapterId))
995         {
996             throw AdapterExistsException(adapterId);
997         }
998         if(_adapterCache.has(replicaGroupId))
999         {
1000             throw DeploymentException("registering adapter `" + adapterId + "' with the replica group `" +
1001                                       replicaGroupId + "' is not allowed:\nthe replica group was added with an "
1002                                       "application descriptor and only adapters specified in an application descriptor "
1003                                       "can be member of this replica group");
1004         }
1005 
1006         AdapterInfo info;
1007         info.id = adapterId;
1008         info.proxy = proxy;
1009         info.replicaGroupId = replicaGroupId;
1010 
1011         bool updated = false;
1012         try
1013         {
1014             IceDB::ReadWriteTxn txn(_env);
1015 
1016             AdapterInfo oldInfo;
1017             bool found = _adapters.get(txn, adapterId, oldInfo);
1018             if(proxy)
1019             {
1020                 updated = found;
1021 
1022                 if(replicaGroupId != oldInfo.replicaGroupId)
1023                 {
1024                     _adaptersByGroupId.del(txn, oldInfo.replicaGroupId, adapterId);
1025                 }
1026                 addAdapter(txn, info);
1027             }
1028             else
1029             {
1030                 if(!found)
1031                 {
1032                     return;
1033                 }
1034                 deleteAdapter(txn, oldInfo);
1035             }
1036             dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
1037 
1038             txn.commit();
1039         }
1040         catch(const IceDB::KeyTooLongException&)
1041         {
1042             throw;
1043         }
1044         catch(const IceDB::LMDBException& ex)
1045         {
1046             logError(_communicator, ex);
1047             throw;
1048         }
1049 
1050         if(_traceLevels->adapter > 0)
1051         {
1052             Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
1053             out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
1054             if(!replicaGroupId.empty())
1055             {
1056                 out << " with replica group `" << replicaGroupId << "'";
1057             }
1058             out << " (serial = `" << dbSerial << "')";
1059         }
1060 
1061         if(proxy)
1062         {
1063             if(updated)
1064             {
1065                 serial = _adapterObserverTopic->adapterUpdated(dbSerial, info);
1066             }
1067             else
1068             {
1069                 serial = _adapterObserverTopic->adapterAdded(dbSerial, info);
1070             }
1071         }
1072         else
1073         {
1074             serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
1075         }
1076     }
1077     _adapterObserverTopic->waitForSyncedSubscribers(serial);
1078 }
1079 
1080 Ice::ObjectPrx
getAdapterDirectProxy(const string & id,const Ice::EncodingVersion & encoding,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1081 Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
1082                                 const Ice::Context& ctx)
1083 {
1084     IceDB::ReadOnlyTxn txn(_env);
1085 
1086     AdapterInfo info;
1087     if(_adapters.get(txn, id, info))
1088     {
1089         return info.proxy;
1090     }
1091 
1092     Ice::EndpointSeq endpoints;
1093     vector<AdapterInfo> infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1094     if(infos.empty())
1095     {
1096         throw AdapterNotExistException(id);
1097     }
1098 
1099     filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
1100     for(unsigned int i = 0; i < infos.size(); ++i)
1101     {
1102         if(IceInternal::isSupported(encoding, infos[i].proxy->ice_getEncodingVersion()))
1103         {
1104             Ice::EndpointSeq edpts = infos[i].proxy->ice_getEndpoints();
1105             endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
1106         }
1107     }
1108     if(!endpoints.empty())
1109     {
1110         return _communicator->stringToProxy("dummy:default")->ice_endpoints(endpoints);
1111     }
1112     return 0;
1113 }
1114 
1115 void
removeAdapter(const string & adapterId)1116 Database::removeAdapter(const string& adapterId)
1117 {
1118     assert(_master);
1119 
1120     int serial = 0; // Initialize to prevent warning.
1121     {
1122         Lock sync(*this);
1123         if(_adapterCache.has(adapterId))
1124         {
1125             AdapterEntryPtr adpt = _adapterCache.get(adapterId);
1126             throw DeploymentException("removing adapter `" + adapterId + "' is not allowed:\n" +
1127                                       "the adapter was added with the application descriptor `" +
1128                                       adpt->getApplication() + "'");
1129         }
1130 
1131         AdapterInfoSeq infos;
1132         Ice::Long dbSerial = 0;
1133         try
1134         {
1135             IceDB::ReadWriteTxn txn(_env);
1136 
1137             AdapterInfo info;
1138             if(_adapters.get(txn, adapterId, info))
1139             {
1140                 deleteAdapter(txn, info);
1141             }
1142             else
1143             {
1144                 infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, adapterId);
1145                 if(infos.empty())
1146                 {
1147                     throw AdapterNotExistException(adapterId);
1148                 }
1149                 for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p)
1150                 {
1151                     _adaptersByGroupId.del(txn, p->replicaGroupId, p->id);
1152                     p->replicaGroupId.clear();
1153                     addAdapter(txn, *p);
1154                 }
1155             }
1156             dbSerial = updateSerial(txn, adaptersDbName);
1157 
1158             txn.commit();
1159         }
1160         catch(const IceDB::KeyTooLongException&)
1161         {
1162             throw;
1163         }
1164         catch(const IceDB::LMDBException& ex)
1165         {
1166             logError(_communicator, ex);
1167             throw;
1168         }
1169 
1170         if(_traceLevels->adapter > 0)
1171         {
1172             Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
1173             out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "' (serial = `" << dbSerial << "')";
1174         }
1175 
1176         if(infos.empty())
1177         {
1178             serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
1179         }
1180         else
1181         {
1182             for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1183             {
1184                 serial = _adapterObserverTopic->adapterUpdated(dbSerial, *p);
1185             }
1186         }
1187     }
1188     _adapterObserverTopic->waitForSyncedSubscribers(serial);
1189 }
1190 
1191 AdapterPrx
getAdapterProxy(const string & adapterId,const string & replicaGroupId,bool upToDate)1192 Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, bool upToDate)
1193 {
1194     Lock sync(*this); // Make sure this isn't call during an update.
1195     return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate);
1196 }
1197 
1198 void
getLocatorAdapterInfo(const string & id,const Ice::ConnectionPtr & connection,const Ice::Context & context,LocatorAdapterInfoSeq & adpts,int & count,bool & replicaGroup,bool & roundRobin,const set<string> & excludes)1199 Database::getLocatorAdapterInfo(const string& id,
1200                                 const Ice::ConnectionPtr& connection,
1201                                 const Ice::Context& context,
1202                                 LocatorAdapterInfoSeq& adpts,
1203                                 int& count,
1204                                 bool& replicaGroup,
1205                                 bool& roundRobin,
1206                                 const set<string>& excludes)
1207 {
1208     string filter;
1209     {
1210         Lock sync(*this); // Make sure this isn't call during an update.
1211         _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes);
1212     }
1213 
1214     if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty())
1215     {
1216         vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter);
1217         if(!filters.empty())
1218         {
1219             Ice::StringSeq adapterIds;
1220             for(LocatorAdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
1221             {
1222                 adapterIds.push_back(q->id);
1223             }
1224 
1225             for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
1226             {
1227                 adapterIds = (*q)->filter(id, adapterIds, connection, context);
1228             }
1229 
1230             LocatorAdapterInfoSeq filteredAdpts;
1231             filteredAdpts.reserve(adpts.size());
1232             for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
1233             {
1234                 for(LocatorAdapterInfoSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
1235                 {
1236                     if(*q == r->id)
1237                     {
1238                         filteredAdpts.push_back(*r);
1239                         break;
1240                     }
1241                 }
1242             }
1243             adpts.swap(filteredAdpts);
1244         }
1245     }
1246 }
1247 
1248 bool
addAdapterSyncCallback(const string & id,const SynchronizationCallbackPtr & callback,const std::set<std::string> & excludes)1249 Database::addAdapterSyncCallback(const string& id,
1250                                  const SynchronizationCallbackPtr& callback,
1251                                  const std::set<std::string>& excludes)
1252 {
1253     Lock sync(*this); // Make sure this isn't call during an update.
1254     return _adapterCache.get(id)->addSyncCallback(callback, excludes);
1255 }
1256 
1257 AdapterInfoSeq
getAdapterInfo(const string & id)1258 Database::getAdapterInfo(const string& id)
1259 {
1260     //
1261     // First we check if the given adapter id is associated to a
1262     // server, if that's the case we get the adapter proxy from the
1263     // server.
1264     //
1265     try
1266     {
1267         Lock sync(*this); // Make sure this isn't call during an update.
1268         return _adapterCache.get(id)->getAdapterInfo();
1269     }
1270     catch(const AdapterNotExistException&)
1271     {
1272     }
1273 
1274     //
1275     // Otherwise, we check the adapter endpoint table -- if there's an
1276     // entry the adapter is managed by the registry itself.
1277     //
1278     IceDB::ReadOnlyTxn txn(_env);
1279 
1280     AdapterInfo info;
1281     AdapterInfoSeq infos;
1282     if(_adapters.get(txn, id, info))
1283     {
1284         infos.push_back(info);
1285     }
1286     else
1287     {
1288         //
1289         // If it's not a regular object adapter, perhaps it's a replica
1290         // group...
1291         //
1292         infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1293         if(infos.empty())
1294         {
1295             throw AdapterNotExistException(id);
1296         }
1297     }
1298     return infos;
1299 }
1300 
1301 AdapterInfoSeq
getFilteredAdapterInfo(const string & id,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1302 Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1303 {
1304     //
1305     // First we check if the given adapter id is associated to a
1306     // server, if that's the case we get the adapter proxy from the
1307     // server.
1308     //
1309     try
1310     {
1311         AdapterInfoSeq infos;
1312         ReplicaGroupEntryPtr replicaGroup;
1313         {
1314             Lock sync(*this); // Make sure this isn't call during an update.
1315 
1316             AdapterEntryPtr entry = _adapterCache.get(id);
1317             infos = entry->getAdapterInfo();
1318             replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry);
1319         }
1320         if(replicaGroup)
1321         {
1322             filterAdapterInfos(replicaGroup->getFilter(), id, _pluginFacade, con, ctx, infos);
1323         }
1324         return infos;
1325     }
1326     catch(const AdapterNotExistException&)
1327     {
1328     }
1329 
1330     //
1331     // Otherwise, we check the adapter endpoint table -- if there's an
1332     // entry the adapter is managed by the registry itself.
1333     //
1334     IceDB::ReadOnlyTxn txn(_env);
1335 
1336     AdapterInfo info;
1337     AdapterInfoSeq infos;
1338     if(_adapters.get(txn, id, info))
1339     {
1340         infos.push_back(info);
1341     }
1342     else
1343     {
1344         //
1345         // If it's not a regular object adapter, perhaps it's a replica
1346         // group...
1347         //
1348         infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1349         if(infos.empty())
1350         {
1351             throw AdapterNotExistException(id);
1352         }
1353         filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
1354     }
1355     return infos;
1356 }
1357 
1358 string
getAdapterServer(const string & id) const1359 Database::getAdapterServer(const string& id) const
1360 {
1361     try
1362     {
1363         Lock sync(*this); // Make sure this isn't call during an update.
1364         ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
1365         if(adapter)
1366         {
1367             return adapter->getServerId();
1368         }
1369     }
1370     catch(const AdapterNotExistException&)
1371     {
1372     }
1373     return "";
1374 }
1375 
1376 string
getAdapterApplication(const string & id) const1377 Database::getAdapterApplication(const string& id) const
1378 {
1379     try
1380     {
1381         Lock sync(*this); // Make sure this isn't call during an update.
1382         return _adapterCache.get(id)->getApplication();
1383     }
1384     catch(const AdapterNotExistException&)
1385     {
1386     }
1387     return "";
1388 }
1389 
1390 string
getAdapterNode(const string & id) const1391 Database::getAdapterNode(const string& id) const
1392 {
1393     try
1394     {
1395         Lock sync(*this); // Make sure this isn't call during an update.
1396         ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
1397         if(adapter)
1398         {
1399             return adapter->getNodeName();
1400         }
1401     }
1402     catch(const AdapterNotExistException&)
1403     {
1404     }
1405     return "";
1406 }
1407 
1408 Ice::StringSeq
getAllAdapters(const string & expression)1409 Database::getAllAdapters(const string& expression)
1410 {
1411     Lock sync(*this);
1412     vector<string> result;
1413     vector<string> ids = _adapterCache.getAll(expression);
1414     result.swap(ids);
1415     set<string> groups;
1416 
1417     IceDB::ReadOnlyTxn txn(_env);
1418 
1419     string name;
1420     AdapterInfo info;
1421     AdapterMapROCursor cursor(_adapters, txn);
1422     while(cursor.get(name, info, MDB_NEXT))
1423     {
1424         if(expression.empty() || IceUtilInternal::match(name, expression, true))
1425         {
1426             result.push_back(name);
1427         }
1428         string replicaGroupId = info.replicaGroupId;
1429         if(!replicaGroupId.empty() && (expression.empty() || IceUtilInternal::match(replicaGroupId, expression, true)))
1430         {
1431             groups.insert(replicaGroupId);
1432         }
1433     }
1434     cursor.close();
1435 
1436     //
1437     // COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun.
1438     //
1439     //result.insert(result.end(), groups.begin(), groups.end())
1440     for(set<string>::const_iterator q = groups.begin(); q != groups.end(); ++q)
1441     {
1442         result.push_back(*q);
1443     }
1444     return result;
1445 }
1446 
1447 void
addObject(const ObjectInfo & info)1448 Database::addObject(const ObjectInfo& info)
1449 {
1450     assert(_master);
1451 
1452     int serial = 0;
1453     {
1454         Lock sync(*this);
1455         const Ice::Identity id = info.proxy->ice_getIdentity();
1456 
1457         if(_objectCache.has(id))
1458         {
1459             throw ObjectExistsException(id);
1460         }
1461 
1462         Ice::Long dbSerial = 0;
1463         try
1464         {
1465             IceDB::ReadWriteTxn txn(_env);
1466 
1467             if(_objects.find(txn, id))
1468             {
1469                 throw ObjectExistsException(id);
1470             }
1471             addObject(txn, info, false);
1472             dbSerial = updateSerial(txn, objectsDbName);
1473 
1474             txn.commit();
1475         }
1476         catch(const IceDB::LMDBException& ex)
1477         {
1478             logError(_communicator, ex);
1479             throw;
1480         }
1481 
1482         serial = _objectObserverTopic->objectAdded(dbSerial, info);
1483 
1484         if(_traceLevels->object > 0)
1485         {
1486             Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1487             out << "added object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1488         }
1489     }
1490     _objectObserverTopic->waitForSyncedSubscribers(serial);
1491 }
1492 
1493 void
addOrUpdateObject(const ObjectInfo & info,Ice::Long dbSerial)1494 Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
1495 {
1496     assert(dbSerial != 0 || _master);
1497 
1498     int serial = 0; // Initialize to prevent warning.
1499     {
1500         Lock sync(*this);
1501         const Ice::Identity id = info.proxy->ice_getIdentity();
1502 
1503         if(_objectCache.has(id))
1504         {
1505             throw ObjectExistsException(id);
1506         }
1507 
1508         bool update = false;
1509         try
1510         {
1511             IceDB::ReadWriteTxn txn(_env);
1512 
1513             Ice::Identity k;
1514             ObjectInfo v;
1515             update = _objects.get(txn, k, v);
1516             if(update)
1517             {
1518                 _objectsByType.del(txn, v.type, v.proxy->ice_getIdentity());
1519             }
1520             addObject(txn, info, false);
1521             dbSerial = updateSerial(txn, objectsDbName, dbSerial);
1522 
1523             txn.commit();
1524         }
1525         catch(const IceDB::LMDBException& ex)
1526         {
1527             logError(_communicator, ex);
1528             throw;
1529         }
1530 
1531         if(update)
1532         {
1533             serial = _objectObserverTopic->objectUpdated(dbSerial, info);
1534         }
1535         else
1536         {
1537             serial = _objectObserverTopic->objectAdded(dbSerial, info);
1538         }
1539 
1540         if(_traceLevels->object > 0)
1541         {
1542             Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1543             out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1544         }
1545     }
1546     _objectObserverTopic->waitForSyncedSubscribers(serial);
1547 }
1548 
1549 void
removeObject(const Ice::Identity & id,Ice::Long dbSerial)1550 Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
1551 {
1552     assert(dbSerial != 0 || _master);
1553 
1554     int serial = 0; // Initialize to prevent warning.
1555     {
1556         Lock sync(*this);
1557         if(_objectCache.has(id))
1558         {
1559             throw DeploymentException("removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"
1560                                       + "the object was added with the application descriptor `" +
1561                                       _objectCache.get(id)->getApplication());
1562         }
1563 
1564         try
1565         {
1566             IceDB::ReadWriteTxn txn(_env);
1567 
1568             ObjectInfo info;
1569             if(!_objects.get(txn, id, info))
1570             {
1571                 throw ObjectNotRegisteredException(id);
1572             }
1573             deleteObject(txn, info, false);
1574             dbSerial = updateSerial(txn, objectsDbName, dbSerial);
1575 
1576             txn.commit();
1577         }
1578         catch(const IceDB::LMDBException& ex)
1579         {
1580             logError(_communicator, ex);
1581             throw;
1582         }
1583 
1584         serial = _objectObserverTopic->objectRemoved(dbSerial, id);
1585 
1586         if(_traceLevels->object > 0)
1587         {
1588             Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1589             out << "removed object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1590         }
1591     }
1592     _objectObserverTopic->waitForSyncedSubscribers(serial);
1593 }
1594 
1595 void
updateObject(const Ice::ObjectPrx & proxy)1596 Database::updateObject(const Ice::ObjectPrx& proxy)
1597 {
1598     assert(_master);
1599 
1600     int serial = 0;
1601     {
1602         Lock sync(*this);
1603 
1604         const Ice::Identity id = proxy->ice_getIdentity();
1605         if(_objectCache.has(id))
1606         {
1607             throw DeploymentException("updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"
1608                                       + "the object was added with the application descriptor `" +
1609                                       _objectCache.get(id)->getApplication() + "'");
1610         }
1611 
1612         ObjectInfo info;
1613         Ice::Long dbSerial = 0;
1614         try
1615         {
1616             IceDB::ReadWriteTxn txn(_env);
1617 
1618             if(!_objects.get(txn, id, info))
1619             {
1620                 throw ObjectNotRegisteredException(id);
1621             }
1622             info.proxy = proxy;
1623             addObject(txn, info, false);
1624             dbSerial = updateSerial(txn, objectsDbName);
1625 
1626             txn.commit();
1627         }
1628         catch(const IceDB::LMDBException& ex)
1629         {
1630             logError(_communicator, ex);
1631             throw;
1632         }
1633 
1634         serial = _objectObserverTopic->objectUpdated(dbSerial, info);
1635         if(_traceLevels->object > 0)
1636         {
1637             Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1638             out << "updated object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1639         }
1640     }
1641     _objectObserverTopic->waitForSyncedSubscribers(serial);
1642 }
1643 
1644 int
addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq & objects)1645 Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
1646 {
1647     Lock sync(*this);
1648     try
1649     {
1650         IceDB::ReadWriteTxn txn(_env);
1651         for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
1652         {
1653             Ice::Identity id = p->proxy->ice_getIdentity();
1654             ObjectInfo info;
1655             if(_objects.get(txn, id, info))
1656             {
1657                 _objectsByType.del(txn, info.type, id);
1658             }
1659             addObject(txn, *p, false);
1660         }
1661         txn.commit();
1662     }
1663     catch(const IceDB::LMDBException& ex)
1664     {
1665         logError(_communicator, ex);
1666         throw;
1667     }
1668 
1669     return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects);
1670 }
1671 
1672 int
removeRegistryWellKnownObjects(const ObjectInfoSeq & objects)1673 Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
1674 {
1675     Lock sync(*this);
1676     try
1677     {
1678         IceDB::ReadWriteTxn txn(_env);
1679         for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
1680         {
1681             Ice::Identity id = p->proxy->ice_getIdentity();
1682             ObjectInfo info;
1683             if(_objects.get(txn, id, info))
1684             {
1685                 deleteObject(txn, info, false);
1686             }
1687         }
1688         txn.commit();
1689     }
1690     catch(const IceDB::LMDBException& ex)
1691     {
1692         logError(_communicator, ex);
1693         throw;
1694     }
1695 
1696     return _objectObserverTopic->wellKnownObjectsRemoved(objects);
1697 }
1698 
1699 Ice::ObjectPrx
getObjectProxy(const Ice::Identity & id)1700 Database::getObjectProxy(const Ice::Identity& id)
1701 {
1702     try
1703     {
1704         //
1705         // Only return proxies for non allocatable objects.
1706         //
1707         return _objectCache.get(id)->getProxy();
1708     }
1709     catch(const ObjectNotRegisteredException&)
1710     {
1711     }
1712 
1713     IceDB::ReadOnlyTxn txn(_env);
1714     ObjectInfo info;
1715     if(!_objects.get(txn, id, info))
1716     {
1717         throw ObjectNotRegisteredException(id);
1718     }
1719     return info.proxy;
1720 }
1721 
1722 Ice::ObjectPrx
getObjectByType(const string & type,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1723 Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1724 {
1725     Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
1726     if(objs.empty())
1727     {
1728         return 0;
1729     }
1730     return objs[IceUtilInternal::random(static_cast<int>(objs.size()))];
1731 }
1732 
1733 Ice::ObjectPrx
getObjectByTypeOnLeastLoadedNode(const string & type,LoadSample sample,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1734 Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
1735                                            const Ice::Context& ctx)
1736 {
1737     Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
1738     if(objs.empty())
1739     {
1740         return 0;
1741     }
1742 
1743     IceUtilInternal::shuffle(objs.begin(), objs.end());
1744     vector<pair<Ice::ObjectPrx, float> > objectsWithLoad;
1745     objectsWithLoad.reserve(objs.size());
1746     for(Ice::ObjectProxySeq::const_iterator p = objs.begin(); p != objs.end(); ++p)
1747     {
1748         float load = 1.0f;
1749         if(!(*p)->ice_getAdapterId().empty())
1750         {
1751             try
1752             {
1753                 load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
1754             }
1755             catch(const AdapterNotExistException&)
1756             {
1757             }
1758         }
1759         objectsWithLoad.push_back(make_pair(*p, load));
1760     }
1761     return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), ObjectLoadCI())->first;
1762 }
1763 
1764 Ice::ObjectProxySeq
getObjectsByType(const string & type,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1765 Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1766 {
1767     Ice::ObjectProxySeq proxies;
1768 
1769     vector<ObjectEntryPtr> objects = _objectCache.getObjectsByType(type);
1770     for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
1771     {
1772         if(_nodeObserverTopic->isServerEnabled((*q)->getServer())) // Only return proxies from enabled servers.
1773         {
1774             proxies.push_back((*q)->getProxy());
1775         }
1776     }
1777 
1778     IceDB::ReadOnlyTxn txn(_env);
1779     vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type);
1780     for(unsigned int i = 0; i < infos.size(); ++i)
1781     {
1782         proxies.push_back(infos[i].proxy);
1783     }
1784 
1785     if(con && !proxies.empty() && _pluginFacade->hasTypeFilters())
1786     {
1787         vector<TypeFilterPtr> filters = _pluginFacade->getTypeFilters(type);
1788         if(!filters.empty())
1789         {
1790             for(vector<TypeFilterPtr>::const_iterator p = filters.begin(); p != filters.end(); ++p)
1791             {
1792                 proxies = (*p)->filter(type, proxies, con, ctx);
1793             }
1794         }
1795     }
1796     return proxies;
1797 }
1798 
1799 ObjectInfo
getObjectInfo(const Ice::Identity & id)1800 Database::getObjectInfo(const Ice::Identity& id)
1801 {
1802     try
1803     {
1804         ObjectEntryPtr object = _objectCache.get(id);
1805         return object->getObjectInfo();
1806     }
1807     catch(const ObjectNotRegisteredException&)
1808     {
1809     }
1810 
1811     IceDB::ReadOnlyTxn txn(_env);
1812     ObjectInfo info;
1813     if(!_objects.get(txn, id, info))
1814     {
1815         throw ObjectNotRegisteredException(id);
1816     }
1817     return info;
1818 }
1819 
1820 ObjectInfoSeq
getAllObjectInfos(const string & expression)1821 Database::getAllObjectInfos(const string& expression)
1822 {
1823     ObjectInfoSeq infos = _objectCache.getAll(expression);
1824 
1825     IceDB::ReadOnlyTxn txn(_env);
1826 
1827     Ice::Identity id;
1828     ObjectInfo info;
1829     ObjectsMapROCursor cursor(_objects, txn);
1830     while(cursor.get(id, info, MDB_NEXT))
1831     {
1832         if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(id), expression, true))
1833         {
1834             infos.push_back(info);
1835         }
1836     }
1837     return infos;
1838 }
1839 
1840 ObjectInfoSeq
getObjectInfosByType(const string & type)1841 Database::getObjectInfosByType(const string& type)
1842 {
1843     ObjectInfoSeq infos = _objectCache.getAllByType(type);
1844 
1845     IceDB::ReadOnlyTxn txn(_env);
1846     ObjectInfoSeq dbInfos = findByType(txn, _objects, _objectsByType, type);
1847     for(unsigned int i = 0; i < dbInfos.size(); ++i)
1848     {
1849         infos.push_back(dbInfos[i]);
1850     }
1851     return infos;
1852 }
1853 
1854 void
addInternalObject(const ObjectInfo & info,bool replace)1855 Database::addInternalObject(const ObjectInfo& info, bool replace)
1856 {
1857     Lock sync(*this);
1858     const Ice::Identity id = info.proxy->ice_getIdentity();
1859 
1860     try
1861     {
1862         IceDB::ReadWriteTxn txn(_env);
1863 
1864         ObjectInfo oldInfo;
1865         if(_internalObjects.get(txn, id, oldInfo))
1866         {
1867             if(!replace)
1868             {
1869                 throw ObjectExistsException(id);
1870             }
1871             _internalObjectsByType.del(txn, oldInfo.type, id);
1872         }
1873         addObject(txn, info, true);
1874 
1875         txn.commit();
1876     }
1877     catch(const IceDB::LMDBException& ex)
1878     {
1879         logError(_communicator, ex);
1880         throw;
1881     }
1882 }
1883 
1884 void
removeInternalObject(const Ice::Identity & id)1885 Database::removeInternalObject(const Ice::Identity& id)
1886 {
1887     Lock sync(*this);
1888 
1889     try
1890     {
1891         IceDB::ReadWriteTxn txn(_env);
1892 
1893         ObjectInfo info;
1894         if(!_internalObjects.get(txn, id, info))
1895         {
1896             throw ObjectNotRegisteredException(id);
1897         }
1898         deleteObject(txn, info, true);
1899 
1900         txn.commit();
1901     }
1902     catch(const IceDB::LMDBException& ex)
1903     {
1904         logError(_communicator, ex);
1905         throw;
1906     }
1907 }
1908 
1909 Ice::ObjectProxySeq
getInternalObjectsByType(const string & type)1910 Database::getInternalObjectsByType(const string& type)
1911 {
1912     Ice::ObjectProxySeq proxies;
1913 
1914     IceDB::ReadOnlyTxn txn(_env);
1915     vector<ObjectInfo> infos = findByType(txn, _internalObjects, _internalObjectsByType, type);
1916     for(unsigned int i = 0; i < infos.size(); ++i)
1917     {
1918         proxies.push_back(infos[i].proxy);
1919     }
1920     return proxies;
1921 }
1922 
1923 void
checkForAddition(const ApplicationHelper & app,const IceDB::ReadWriteTxn & txn)1924 Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteTxn& txn)
1925 {
1926     set<string> serverIds;
1927     set<string> adapterIds;
1928     set<Ice::Identity> objectIds;
1929 
1930     app.getIds(serverIds, adapterIds, objectIds);
1931 
1932     for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition));
1933     if(!adapterIds.empty())
1934     {
1935         for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1936         {
1937             checkAdapterForAddition(*p, txn);
1938         }
1939     }
1940     if(!objectIds.empty())
1941     {
1942         for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p)
1943         {
1944             checkObjectForAddition(*p, txn);
1945         }
1946     }
1947 
1948     set<string> repGrps;
1949     set<string> adptRepGrps;
1950     app.getReplicaGroups(repGrps, adptRepGrps);
1951     for_each(adptRepGrps.begin(), adptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
1952 }
1953 
1954 void
checkForUpdate(const ApplicationHelper & origApp,const ApplicationHelper & newApp,const IceDB::ReadWriteTxn & txn)1955 Database::checkForUpdate(const ApplicationHelper& origApp,
1956                          const ApplicationHelper& newApp,
1957                          const IceDB::ReadWriteTxn& txn)
1958 {
1959     set<string> oldSvrs, newSvrs;
1960     set<string> oldAdpts, newAdpts;
1961     set<Ice::Identity> oldObjs, newObjs;
1962 
1963     origApp.getIds(oldSvrs, oldAdpts, oldObjs);
1964     newApp.getIds(newSvrs, newAdpts, newObjs);
1965 
1966     Ice::StringSeq addedSvrs;
1967     set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs));
1968     for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition));
1969 
1970     Ice::StringSeq addedAdpts;
1971     set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts));
1972     if(!addedAdpts.empty())
1973     {
1974         for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p)
1975         {
1976             checkAdapterForAddition(*p, txn);
1977         }
1978     }
1979 
1980     vector<Ice::Identity> addedObjs;
1981     set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs));
1982     if(!addedObjs.empty())
1983     {
1984         for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p)
1985         {
1986             checkObjectForAddition(*p, txn);
1987         }
1988     }
1989 
1990     set<string> oldRepGrps, newRepGrps;
1991     set<string> oldAdptRepGrps, newAdptRepGrps;
1992     origApp.getReplicaGroups(oldRepGrps, oldAdptRepGrps);
1993     newApp.getReplicaGroups(newRepGrps, newAdptRepGrps);
1994 
1995     set<string> rmRepGrps;
1996     set_difference(oldRepGrps.begin(), oldRepGrps.end(), newRepGrps.begin(),newRepGrps.end(), set_inserter(rmRepGrps));
1997     for_each(rmRepGrps.begin(), rmRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
1998 
1999     set<string> addedAdptRepGrps;
2000     set_difference(newAdptRepGrps.begin(),newAdptRepGrps.end(), oldAdptRepGrps.begin(), oldAdptRepGrps.end(),
2001                    set_inserter(addedAdptRepGrps));
2002     for_each(addedAdptRepGrps.begin(), addedAdptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
2003 
2004     vector<string> invalidAdptRepGrps;
2005     set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(),
2006                      back_inserter(invalidAdptRepGrps));
2007     if(!invalidAdptRepGrps.empty())
2008     {
2009         throw DeploymentException("couldn't find replica group `" + invalidAdptRepGrps.front() + "'");
2010     }
2011 }
2012 
2013 void
checkForRemove(const ApplicationHelper & app)2014 Database::checkForRemove(const ApplicationHelper& app)
2015 {
2016     set<string> replicaGroups;
2017     set<string> adapterReplicaGroups;
2018     app.getReplicaGroups(replicaGroups, adapterReplicaGroups);
2019     for_each(replicaGroups.begin(), replicaGroups.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
2020 }
2021 
2022 void
checkServerForAddition(const string & id)2023 Database::checkServerForAddition(const string& id)
2024 {
2025     if(_serverCache.has(id))
2026     {
2027         throw DeploymentException("server `" + id + "' is already registered");
2028     }
2029 }
2030 
2031 void
checkAdapterForAddition(const string & id,const IceDB::ReadWriteTxn & txn)2032 Database::checkAdapterForAddition(const string& id, const IceDB::ReadWriteTxn& txn)
2033 {
2034     bool found = false;
2035     if(_adapterCache.has(id))
2036     {
2037         found = true;
2038     }
2039     else
2040     {
2041         if(_adapters.find(txn, id))
2042         {
2043             found = true;
2044         }
2045         else
2046         {
2047             if(!findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id).empty())
2048             {
2049                 found = true;
2050             }
2051         }
2052     }
2053 
2054     if(found)
2055     {
2056         throw DeploymentException("adapter `" + id + "' is already registered");
2057     }
2058 }
2059 
2060 void
checkObjectForAddition(const Ice::Identity & objectId,const IceDB::ReadWriteTxn & txn)2061 Database::checkObjectForAddition(const Ice::Identity& objectId,
2062                                  const IceDB::ReadWriteTxn& txn)
2063 {
2064     bool found = false;
2065     if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId))
2066     {
2067         found = true;
2068     }
2069     else
2070     {
2071         if(_objects.find(txn, objectId))
2072         {
2073             found = true;
2074         }
2075     }
2076 
2077     if(found)
2078     {
2079         throw DeploymentException("object `" + _communicator->identityToString(objectId) + "' is already registered");
2080     }
2081 }
2082 
2083 void
checkReplicaGroupExists(const string & replicaGroup)2084 Database::checkReplicaGroupExists(const string& replicaGroup)
2085 {
2086     ReplicaGroupEntryPtr entry;
2087     try
2088     {
2089         entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
2090     }
2091     catch(const AdapterNotExistException&)
2092     {
2093     }
2094 
2095     if(!entry)
2096     {
2097         throw DeploymentException("couldn't find replica group `" + replicaGroup + "'");
2098     }
2099 }
2100 
2101 void
checkReplicaGroupForRemove(const string & replicaGroup)2102 Database::checkReplicaGroupForRemove(const string& replicaGroup)
2103 {
2104     ReplicaGroupEntryPtr entry;
2105     try
2106     {
2107         entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
2108     }
2109     catch(const AdapterNotExistException&)
2110     {
2111     }
2112 
2113     if(!entry)
2114     {
2115         //
2116         // This would indicate an inconsistency with the cache and
2117         // database. We don't print an error, it will be printed
2118         // when the application is actually removed.
2119         //
2120         return;
2121     }
2122 
2123     if(entry->hasAdaptersFromOtherApplications())
2124     {
2125         throw DeploymentException("couldn't remove application because the replica group `" + replicaGroup +
2126                                   "' is used by object adapters from other applications.");
2127     }
2128 }
2129 
2130 void
load(const ApplicationHelper & app,ServerEntrySeq & entries,const string & uuid,int revision)2131 Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const string& uuid, int revision)
2132 {
2133     const NodeDescriptorDict& nodes = app.getInstance().nodes;
2134     const string application = app.getInstance().name;
2135     for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
2136     {
2137         _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
2138     }
2139 
2140     const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
2141     for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
2142     {
2143         assert(!r->id.empty());
2144         _adapterCache.addReplicaGroup(*r, application);
2145         for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2146         {
2147             _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
2148         }
2149     }
2150 
2151     map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
2152     for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
2153     {
2154         entries.push_back(_serverCache.add(p->second));
2155     }
2156 }
2157 
2158 void
unload(const ApplicationHelper & app,ServerEntrySeq & entries)2159 Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
2160 {
2161     map<string, ServerInfo> servers = app.getServerInfos("", 0);
2162     for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
2163     {
2164         entries.push_back(_serverCache.remove(p->first, false));
2165     }
2166 
2167     const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
2168     for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
2169     {
2170         for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2171         {
2172             _objectCache.remove(o->id);
2173         }
2174         _adapterCache.removeReplicaGroup(r->id);
2175     }
2176 
2177     const NodeDescriptorDict& nodes = app.getInstance().nodes;
2178     const string application = app.getInstance().name;
2179     for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
2180     {
2181         _nodeCache.get(n->first)->removeDescriptor(application);
2182     }
2183 }
2184 
2185 void
reload(const ApplicationHelper & oldApp,const ApplicationHelper & newApp,ServerEntrySeq & entries,const string & uuid,int revision,bool noRestart)2186 Database::reload(const ApplicationHelper& oldApp,
2187                  const ApplicationHelper& newApp,
2188                  ServerEntrySeq& entries,
2189                  const string& uuid,
2190                  int revision,
2191                  bool noRestart)
2192 {
2193     const string application = oldApp.getInstance().name;
2194 
2195     //
2196     // Remove destroyed servers.
2197     //
2198     map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
2199     map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision);
2200     vector<pair<bool, ServerInfo> > load;
2201     for(map<string, ServerInfo>::const_iterator p = newServers.begin(); p != newServers.end(); ++p)
2202     {
2203         map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
2204         if(q == oldServers.end())
2205         {
2206             load.push_back(make_pair(false, p->second));
2207         }
2208         else if(isServerUpdated(p->second, q->second))
2209         {
2210             _serverCache.preUpdate(p->second, noRestart);
2211             load.push_back(make_pair(true, p->second));
2212         }
2213         else
2214         {
2215             ServerEntryPtr server = _serverCache.get(p->first);
2216             server->update(q->second, noRestart); // Just update the server revision on the node.
2217             entries.push_back(server);
2218         }
2219     }
2220     for(map<string, ServerInfo>::const_iterator p = oldServers.begin(); p != oldServers.end(); ++p)
2221     {
2222         map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
2223         if(q == newServers.end())
2224         {
2225             entries.push_back(_serverCache.remove(p->first, noRestart));
2226         }
2227     }
2228 
2229     //
2230     // Remove destroyed replica groups.
2231     //
2232     const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups;
2233     const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups;
2234     for(ReplicaGroupDescriptorSeq::const_iterator r = oldAdpts.begin(); r != oldAdpts.end(); ++r)
2235     {
2236         ReplicaGroupDescriptorSeq::const_iterator t;
2237         for(t = newAdpts.begin(); t != newAdpts.end(); ++t)
2238         {
2239             if(t->id == r->id)
2240             {
2241                 break;
2242             }
2243         }
2244         for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2245         {
2246             _objectCache.remove(o->id);
2247         }
2248         if(t == newAdpts.end())
2249         {
2250             _adapterCache.removeReplicaGroup(r->id);
2251         }
2252     }
2253 
2254     //
2255     // Remove all the node descriptors.
2256     //
2257     const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes;
2258     for(NodeDescriptorDict::const_iterator n = oldNodes.begin(); n != oldNodes.end(); ++n)
2259     {
2260         _nodeCache.get(n->first)->removeDescriptor(application);
2261     }
2262 
2263     //
2264     // Add back node descriptors.
2265     //
2266     const NodeDescriptorDict& newNodes = newApp.getInstance().nodes;
2267     for(NodeDescriptorDict::const_iterator n = newNodes.begin(); n != newNodes.end(); ++n)
2268     {
2269         _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
2270     }
2271 
2272     //
2273     // Add back replica groups.
2274     //
2275     for(ReplicaGroupDescriptorSeq::const_iterator r = newAdpts.begin(); r != newAdpts.end(); ++r)
2276     {
2277         try
2278         {
2279             ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
2280             assert(entry);
2281             entry->update(application, r->loadBalancing, r->filter);
2282         }
2283         catch(const AdapterNotExistException&)
2284         {
2285             _adapterCache.addReplicaGroup(*r, application);
2286         }
2287 
2288         for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2289         {
2290             _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
2291         }
2292     }
2293 
2294     //
2295     // Add back servers.
2296     //
2297     for(vector<pair<bool, ServerInfo> >::const_iterator q = load.begin(); q != load.end(); ++q)
2298     {
2299         if(q->first) // Update
2300         {
2301             entries.push_back(_serverCache.postUpdate(q->second, noRestart));
2302         }
2303         else
2304         {
2305             entries.push_back(_serverCache.add(q->second));
2306         }
2307     }
2308 }
2309 
2310 Ice::Long
saveApplication(const ApplicationInfo & info,const IceDB::ReadWriteTxn & txn,Ice::Long dbSerial)2311 Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
2312 {
2313     assert(dbSerial != 0 || _master);
2314     _applications.put(txn, info.descriptor.name, info);
2315     return updateSerial(txn, applicationsDbName, dbSerial);
2316 }
2317 
2318 Ice::Long
removeApplication(const string & name,const IceDB::ReadWriteTxn & txn,Ice::Long dbSerial)2319 Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
2320 {
2321     assert(dbSerial != 0 || _master);
2322     _applications.del(txn, name);
2323     return updateSerial(txn, applicationsDbName, dbSerial);
2324 }
2325 
2326 void
checkUpdate(const ApplicationHelper & oldApp,const ApplicationHelper & newApp,const string & uuid,int revision,bool noRestart)2327 Database::checkUpdate(const ApplicationHelper& oldApp,
2328                       const ApplicationHelper& newApp,
2329                       const string& uuid,
2330                       int revision,
2331                       bool noRestart)
2332 {
2333     const string application = oldApp.getInstance().name;
2334 
2335     map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
2336     map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision + 1);
2337 
2338     map<string, ServerInfo>::const_iterator p;
2339     vector<string> servers;
2340     vector<string> reasons;
2341     vector<CheckUpdateResultPtr> results;
2342     set<string> unreachableNodes;
2343 
2344     if(noRestart)
2345     {
2346         for(p = oldServers.begin(); p != oldServers.end(); ++p)
2347         {
2348             map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
2349             if(q == newServers.end())
2350             {
2351                 try
2352                 {
2353                     ServerInfo info = p->second;
2354                     info.descriptor = 0; // Clear the descriptor to indicate removal.
2355                     CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(info, true);
2356                     if(result)
2357                     {
2358                         results.push_back(result);
2359                     }
2360                 }
2361                 catch(const NodeUnreachableException& ex)
2362                 {
2363                     unreachableNodes.insert(ex.name);
2364                 }
2365                 catch(const DeploymentException& ex)
2366                 {
2367                     servers.push_back(p->first);
2368                     reasons.push_back(ex.reason);
2369                 }
2370             }
2371         }
2372     }
2373 
2374     for(p = newServers.begin(); p != newServers.end(); ++p)
2375     {
2376         map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
2377         if(q != oldServers.end() && isServerUpdated(p->second, q->second))
2378         {
2379             if(noRestart &&
2380                p->second.node == q->second.node &&
2381                isServerUpdated(p->second, q->second, true)) // Ignore properties
2382             {
2383                 //
2384                 // The updates are not only property updates and noRestart is required, no
2385                 // need to check the server update on the node, we know already it requires
2386                 // a restart.
2387                 //
2388                 servers.push_back(p->first);
2389                 reasons.push_back("update requires the server `" + p->first + "' to be stopped");
2390             }
2391             else
2392             {
2393                 //
2394                 // Ask the node to check the server update.
2395                 //
2396                 try
2397                 {
2398                     CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(p->second, noRestart);
2399                     if(result)
2400                     {
2401                         results.push_back(result);
2402                     }
2403                 }
2404                 catch(const NodeUnreachableException& ex)
2405                 {
2406                     unreachableNodes.insert(ex.name);
2407                 }
2408                 catch(const DeploymentException& ex)
2409                 {
2410                     servers.push_back(p->first);
2411                     reasons.push_back(ex.reason);
2412                 }
2413             }
2414         }
2415     }
2416 
2417     for(vector<CheckUpdateResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
2418     {
2419         try
2420         {
2421             (*q)->getResult();
2422         }
2423         catch(const NodeUnreachableException& ex)
2424         {
2425             unreachableNodes.insert(ex.name);
2426         }
2427         catch(const DeploymentException& ex)
2428         {
2429             servers.push_back((*q)->getServer());
2430             reasons.push_back(ex.reason);
2431         }
2432     }
2433 
2434     if(noRestart)
2435     {
2436         if(!servers.empty() || !unreachableNodes.empty())
2437         {
2438             if(_traceLevels->application > 0)
2439             {
2440                 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
2441                 out << "check for application `" << application << "' update failed:";
2442                 if(!unreachableNodes.empty())
2443                 {
2444 #if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
2445                     Ice::StringSeq nodes;
2446                     for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
2447                     {
2448                         nodes.push_back(*r);
2449                     }
2450 #else
2451                     Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
2452 #endif
2453                     if(nodes.size() == 1)
2454                     {
2455                         out << "\nthe node `" << nodes[0] << "' is down";
2456                     }
2457                     else
2458                     {
2459                         out << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
2460                     }
2461                 }
2462                 if(!reasons.empty())
2463                 {
2464                     for(vector<string>::const_iterator r = reasons.begin(); r != reasons.end(); ++r)
2465                     {
2466                         out << "\n" << *r;
2467                     }
2468                 }
2469             }
2470 
2471             ostringstream os;
2472             os << "check for application `" << application << "' update failed:";
2473             if(!servers.empty())
2474             {
2475                 if(servers.size() == 1)
2476                 {
2477                     os << "\nthe server `" << servers[0] << "' would need to be stopped";
2478                 }
2479                 else
2480                 {
2481                     os << "\nthe servers `" << toString(servers, ", ") << "' would need to be stopped";
2482                 }
2483             }
2484             if(!unreachableNodes.empty())
2485             {
2486 #if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
2487                 Ice::StringSeq nodes;
2488                 for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
2489                 {
2490                     nodes.push_back(*r);
2491                 }
2492 #else
2493                 Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
2494 #endif
2495                 if(nodes.size() == 1)
2496                 {
2497                     os << "\nthe node `" << nodes[0] << "' is down";
2498                 }
2499                 else
2500                 {
2501                     os << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
2502                 }
2503             }
2504             throw DeploymentException(os.str());
2505         }
2506     }
2507     else if(!reasons.empty())
2508     {
2509         ostringstream os;
2510         os << "check for application `" << application << "' update failed:";
2511         for(vector<string>::const_iterator r = reasons.begin(); r != reasons.end(); ++r)
2512         {
2513             os << "\n" << *r;
2514         }
2515         throw DeploymentException(os.str());
2516     }
2517 }
2518 
2519 void
finishApplicationUpdate(const ApplicationUpdateInfo & update,const ApplicationInfo & oldApp,const ApplicationHelper & previousAppHelper,const ApplicationHelper & appHelper,AdminSessionI *,bool noRestart,Ice::Long dbSerial)2520 Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
2521                                   const ApplicationInfo& oldApp,
2522                                   const ApplicationHelper& previousAppHelper,
2523                                   const ApplicationHelper& appHelper,
2524                                   AdminSessionI* /*session*/,
2525                                   bool noRestart,
2526                                   Ice::Long dbSerial)
2527 {
2528     const ApplicationDescriptor& newDesc = appHelper.getDefinition();
2529 
2530     ServerEntrySeq entries;
2531     int serial = 0;
2532     try
2533     {
2534         if(_master)
2535         {
2536             checkUpdate(previousAppHelper, appHelper, oldApp.uuid, oldApp.revision, noRestart);
2537         }
2538 
2539         Lock sync(*this);
2540 
2541         IceDB::ReadWriteTxn txn(_env);
2542 
2543         checkForUpdate(previousAppHelper, appHelper, txn);
2544         reload(previousAppHelper, appHelper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
2545 
2546         for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
2547 
2548         ApplicationInfo info = oldApp;
2549         info.updateTime = update.updateTime;
2550         info.updateUser = update.updateUser;
2551         info.revision = update.revision;
2552         info.descriptor = newDesc;
2553         dbSerial = saveApplication(info, txn, dbSerial);
2554 
2555         txn.commit();
2556 
2557         serial = _applicationObserverTopic->applicationUpdated(dbSerial, update);
2558     }
2559     catch(const DeploymentException&)
2560     {
2561         finishUpdating(update.descriptor.name);
2562         throw;
2563     }
2564     catch(const IceDB::LMDBException& ex)
2565     {
2566         logError(_communicator, ex);
2567         throw;
2568     }
2569 
2570     _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
2571 
2572     //
2573     // Mark the application as updated. All the replicas received the update so it's now safe
2574     // for the nodes to start servers.
2575     //
2576     {
2577         Lock sync(*this);
2578         vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
2579         assert(p != _updating.end());
2580         p->markUpdated();
2581     }
2582 
2583     if(_master)
2584     {
2585         try
2586         {
2587             for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
2588             {
2589                 try
2590                 {
2591                     (*p)->waitForSync();
2592                 }
2593                 catch(const NodeUnreachableException&)
2594                 {
2595                     // Ignore.
2596                 }
2597             }
2598         }
2599         catch(const DeploymentException&)
2600         {
2601             ApplicationUpdateInfo newUpdate;
2602             {
2603                 Lock sync(*this);
2604                 entries.clear();
2605                 ApplicationHelper previous(_communicator, newDesc);
2606                 ApplicationHelper helper(_communicator, oldApp.descriptor);
2607 
2608                 ApplicationInfo info = oldApp;
2609                 info.revision = update.revision + 1;
2610 
2611                 try
2612                 {
2613                     IceDB::ReadWriteTxn txn(_env);
2614                     dbSerial = saveApplication(info, txn);
2615                     txn.commit();
2616                 }
2617                 catch(const IceDB::LMDBException& ex)
2618                 {
2619                     logError(_communicator, ex);
2620                 }
2621 
2622                 reload(previous, helper, entries, info.uuid, info.revision, noRestart);
2623 
2624                 newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
2625                 newUpdate.updateUser = _lockUserId;
2626                 newUpdate.revision = info.revision;
2627                 newUpdate.descriptor = helper.diff(previous);
2628 
2629                 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
2630                 assert(p != _updating.end());
2631                 p->unmarkUpdated();
2632 
2633                 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
2634 
2635                 serial = _applicationObserverTopic->applicationUpdated(dbSerial, newUpdate);
2636             }
2637             _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for subscriber to be updated.
2638             for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
2639             finishUpdating(newDesc.name);
2640             throw;
2641         }
2642     }
2643 
2644     if(_traceLevels->application > 0)
2645     {
2646         Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
2647         out << "updated application `" << update.descriptor.name << "' (serial = `" << dbSerial << "')";
2648     }
2649     finishUpdating(update.descriptor.name);
2650 }
2651 
2652 void
waitForUpdate(const string & name)2653 Database::waitForUpdate(const string& name)
2654 {
2655     while(find(_updating.begin(), _updating.end(), name) != _updating.end())
2656     {
2657         wait();
2658     }
2659 }
2660 
2661 void
startUpdating(const string & name,const string & uuid,int revision)2662 Database::startUpdating(const string& name, const string& uuid, int revision)
2663 {
2664     // Must be called within the synchronization.
2665     assert(find(_updating.begin(), _updating.end(), name) == _updating.end());
2666     _updating.push_back(UpdateInfo(name, uuid, revision));
2667 }
2668 
2669 void
finishUpdating(const string & name)2670 Database::finishUpdating(const string& name)
2671 {
2672     Lock sync(*this);
2673 
2674     vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), name);
2675     assert(p != _updating.end());
2676     p->markUpdated();
2677     _updating.erase(p);
2678     notifyAll();
2679 }
2680 
2681 Ice::Long
getSerial(const IceDB::Txn & txn,const string & dbName)2682 Database::getSerial(const IceDB::Txn& txn, const string& dbName)
2683 {
2684     Ice::Long serial = 1;
2685     _serials.get(txn, dbName, serial);
2686     return serial;
2687 }
2688 
2689 Ice::Long
updateSerial(const IceDB::ReadWriteTxn & txn,const string & dbName,Ice::Long serial)2690 Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, Ice::Long serial)
2691 {
2692     if(serial == -1) // The master we are talking to doesn't support serials (old IceGrid versions)
2693     {
2694         return -1;
2695     }
2696 
2697     //
2698     // If a serial number is set, just update the serial number from the database,
2699     // otherwise if the serial is 0, we increment the serial from the database.
2700     //
2701     if(serial > 0)
2702     {
2703         _serials.put(txn, dbName, serial);
2704         return serial;
2705     }
2706     else
2707     {
2708         Ice::Long dbSerial = getSerial(txn, dbName) + 1;
2709         _serials.put(txn, dbName, dbSerial);
2710         return dbSerial;
2711     }
2712 }
2713 
2714 void
addAdapter(const IceDB::ReadWriteTxn & txn,const AdapterInfo & info)2715 Database::addAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
2716 {
2717     _adapters.put(txn, info.id, info);
2718     _adaptersByGroupId.put(txn, info.replicaGroupId, info.id);
2719 }
2720 
2721 void
deleteAdapter(const IceDB::ReadWriteTxn & txn,const AdapterInfo & info)2722 Database::deleteAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
2723 {
2724 
2725     _adapters.del(txn, info.id);
2726     _adaptersByGroupId.del(txn, info.replicaGroupId, info.id);
2727 }
2728 
2729 void
addObject(const IceDB::ReadWriteTxn & txn,const ObjectInfo & info,bool internal)2730 Database::addObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
2731 {
2732     if(internal)
2733     {
2734         _internalObjects.put(txn, info.proxy->ice_getIdentity(), info);
2735         _internalObjectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
2736     }
2737     else
2738     {
2739         try
2740         {
2741             _objects.put(txn, info.proxy->ice_getIdentity(), info);
2742         }
2743         catch(const IceDB::KeyTooLongException& ex)
2744         {
2745             throw DeploymentException("object identity `" +
2746                                       _communicator->identityToString(info.proxy->ice_getIdentity())
2747                                       + "' is too long: " + ex.what());
2748         }
2749         try
2750         {
2751             _objectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
2752         }
2753         catch(const IceDB::KeyTooLongException& ex)
2754         {
2755             throw DeploymentException("object type `" + info.type + "' is too long: " + ex.what());
2756         }
2757     }
2758 }
2759 
2760 void
deleteObject(const IceDB::ReadWriteTxn & txn,const ObjectInfo & info,bool internal)2761 Database::deleteObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
2762 {
2763     if(internal)
2764     {
2765         _internalObjects.del(txn, info.proxy->ice_getIdentity());
2766         _internalObjectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
2767     }
2768     else
2769     {
2770         _objects.del(txn, info.proxy->ice_getIdentity());
2771         _objectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
2772     }
2773 }
2774