1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <IceUtil/StringUtil.h>
6 #include <IceUtil/Random.h>
7 #include <IceUtil/Functional.h>
8 #include <Ice/LoggerUtil.h>
9 #include <Ice/Communicator.h>
10 #include <Ice/ObjectAdapter.h>
11 #include <IceGrid/Database.h>
12 #include <IceGrid/TraceLevels.h>
13 #include <IceGrid/Util.h>
14 #include <IceGrid/DescriptorHelper.h>
15 #include <IceGrid/NodeSessionI.h>
16 #include <IceGrid/ReplicaSessionI.h>
17 #include <IceGrid/Session.h>
18 #include <IceGrid/Topics.h>
19 #include <IceGrid/IceGrid.h>
20
21 #include <algorithm>
22 #include <functional>
23 #include <iterator>
24
25 using namespace std;
26 using namespace IceGrid;
27
28 typedef IceDB::ReadWriteCursor<string, ApplicationInfo, IceDB::IceContext, Ice::OutputStream> ApplicationMapRWCursor;
29 typedef IceDB::ReadOnlyCursor<string, AdapterInfo, IceDB::IceContext, Ice::OutputStream> AdapterMapROCursor;
30 typedef IceDB::Cursor<string, string, IceDB::IceContext, Ice::OutputStream> AdaptersByGroupMapCursor;
31 typedef IceDB::ReadOnlyCursor<string, Ice::Identity, IceDB::IceContext, Ice::OutputStream> ObjectsByTypeMapROCursor;
32 typedef IceDB::ReadOnlyCursor<Ice::Identity, ObjectInfo, IceDB::IceContext, Ice::OutputStream> ObjectsMapROCursor;
33
34 namespace
35 {
36
37 const string applicationsDbName = "applications";
38 const string adaptersDbName = "adapters";
39 const string adaptersByReplicaGroupIdDbName = "adaptersByReplicaGroupId";
40 const string objectsDbName = "objects";
41 const string objectsByTypeDbName = "objectsByType";
42 const string internalObjectsDbName = "internal-objects";
43 const string internalObjectsByTypeDbName = "internal-objectsByType";
44 const string serialsDbName = "serials";
45
46 struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::ObjectPrx, float>&, bool>
47 {
operator ()__anon2d983f220111::ObjectLoadCI48 bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
49 {
50 return lhs.second < rhs.second;
51 }
52 };
53
54 template<typename K, typename V, typename C, typename H> vector<V>
toVector(const IceDB::ReadOnlyTxn & txn,const IceDB::Dbi<K,V,C,H> & m)55 toVector(const IceDB::ReadOnlyTxn& txn, const IceDB::Dbi<K, V, C, H>& m)
56 {
57 vector<V> v;
58 IceDB::ReadOnlyCursor<K, V, C, H> cursor(m, txn);
59 K key;
60 V value;
61 while(cursor.get(key, value, MDB_NEXT))
62 {
63 v.push_back(value);
64 }
65 return v;
66 }
67
68 template<typename K, typename V, typename C, typename H> map<K, V>
toMap(const IceDB::Txn & txn,const IceDB::Dbi<K,V,C,H> & d)69 toMap(const IceDB::Txn& txn, const IceDB::Dbi<K, V, C, H>& d)
70 {
71 std::map<K, V> m;
72 IceDB::Cursor<K, V, C, H> cursor(d, txn);
73 K key;
74 V value;
75 while(cursor.get(key, value, MDB_NEXT))
76 {
77 typename std::map<K, V>::value_type v(key, value);
78 m.insert(v);
79 }
80 cursor.close();
81 return m;
82 }
83
84 void
logError(const Ice::CommunicatorPtr & com,const IceDB::LMDBException & ex)85 logError(const Ice::CommunicatorPtr& com, const IceDB::LMDBException& ex)
86 {
87 Ice::Error error(com->getLogger());
88 error << "LMDB error: " << ex;
89 }
90
91 void
filterAdapterInfos(const string & filter,const string & replicaGroupId,const RegistryPluginFacadeIPtr & pluginFacade,const Ice::ConnectionPtr & con,const Ice::Context & ctx,AdapterInfoSeq & infos)92 filterAdapterInfos(const string& filter,
93 const string& replicaGroupId,
94 const RegistryPluginFacadeIPtr& pluginFacade,
95 const Ice::ConnectionPtr& con,
96 const Ice::Context& ctx,
97 AdapterInfoSeq& infos)
98 {
99 if(infos.empty() || !pluginFacade->hasReplicaGroupFilters())
100 {
101 return;
102 }
103
104 vector<ReplicaGroupFilterPtr> filters = pluginFacade->getReplicaGroupFilters(filter);
105 if(filters.empty())
106 {
107 return;
108 }
109
110 Ice::StringSeq adapterIds;
111 adapterIds.reserve(infos.size());
112 for(vector<AdapterInfo>::const_iterator p = infos.begin(); p != infos.end(); ++p)
113 {
114 adapterIds.push_back(p->id);
115 }
116
117 for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
118 {
119 adapterIds = (*q)->filter(replicaGroupId, adapterIds, con, ctx);
120 }
121
122 vector<AdapterInfo> filteredAdpts;
123 filteredAdpts.reserve(infos.size());
124 for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
125 {
126 for(vector<AdapterInfo>::const_iterator r = infos.begin(); r != infos.end(); ++r)
127 {
128 if(*q == r->id)
129 {
130 filteredAdpts.push_back(*r);
131 break;
132 }
133 }
134 }
135 infos.swap(filteredAdpts);
136 }
137
138 vector<AdapterInfo>
findByReplicaGroupId(const IceDB::Txn & txn,const StringAdapterInfoMap & adapters,const StringStringMap & adaptersByGroupId,const string & name)139 findByReplicaGroupId(const IceDB::Txn& txn,
140 const StringAdapterInfoMap& adapters,
141 const StringStringMap& adaptersByGroupId,
142 const string& name)
143 {
144 vector<AdapterInfo> result;
145 AdaptersByGroupMapCursor cursor(adaptersByGroupId, txn);
146 string id;
147 if(cursor.find(name, id))
148 {
149 AdapterInfo info;
150 adapters.get(txn, id, info);
151 result.push_back(info);
152
153 string n;
154 while(cursor.get(n, id, MDB_NEXT) && n == name)
155 {
156 adapters.get(txn, id, info);
157 result.push_back(info);
158 }
159 }
160 return result;
161 }
162
163 vector<ObjectInfo>
findByType(const IceDB::ReadOnlyTxn & txn,const IdentityObjectInfoMap & objects,const StringIdentityMap & objectsByType,const string & type)164 findByType(const IceDB::ReadOnlyTxn& txn,
165 const IdentityObjectInfoMap& objects,
166 const StringIdentityMap& objectsByType,
167 const string& type)
168 {
169 vector<ObjectInfo> result;
170 ObjectsByTypeMapROCursor cursor(objectsByType, txn);
171 Ice::Identity id;
172 if(cursor.find(type, id))
173 {
174 ObjectInfo info;
175 objects.get(txn, id, info);
176 result.push_back(info);
177
178 string t;
179 while(cursor.get(t, id, MDB_NEXT) && t == type)
180 {
181 objects.get(txn, id, info);
182 result.push_back(info);
183 }
184 }
185 return result;
186 }
187
188 }
189
Database(const Ice::ObjectAdapterPtr & registryAdapter,const IceStorm::TopicManagerPrx & topicManager,const string & instanceName,const TraceLevelsPtr & traceLevels,const RegistryInfo & info,bool readonly)190 Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
191 const IceStorm::TopicManagerPrx& topicManager,
192 const string& instanceName,
193 const TraceLevelsPtr& traceLevels,
194 const RegistryInfo& info,
195 bool readonly) :
196 _communicator(registryAdapter->getCommunicator()),
197 _internalAdapter(registryAdapter),
198 _topicManager(topicManager),
199 _instanceName(instanceName),
200 _traceLevels(traceLevels),
201 _master(info.name == "Master"),
202 _readonly(readonly || !_master),
203 _replicaCache(_communicator, topicManager),
204 _nodeCache(_communicator, _replicaCache, _readonly && _master ? string("Master (read-only)") : info.name),
205 _adapterCache(_communicator),
206 _objectCache(_communicator),
207 _allocatableObjectCache(_communicator),
208 _serverCache(_communicator, _instanceName, _nodeCache, _adapterCache, _objectCache, _allocatableObjectCache),
209 _dbLock(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path") + "/icedb.lock"),
210 _env(_communicator->getProperties()->getProperty("IceGrid.Registry.LMDB.Path"), 8,
211 IceDB::getMapSize(_communicator->getProperties()->getPropertyAsInt("IceGrid.Registry.LMDB.MapSize"))),
212 _pluginFacade(RegistryPluginFacadeIPtr::dynamicCast(getRegistryPluginFacade())),
213 _lock(0)
214 {
215 IceDB::ReadWriteTxn txn(_env);
216
217 IceDB::IceContext context;
218 context.communicator = _communicator;
219 context.encoding.major = 1;
220 context.encoding.minor = 1;
221
222 _applications = StringApplicationInfoMap(txn, applicationsDbName, context, MDB_CREATE);
223
224 _adapters = StringAdapterInfoMap(txn, adaptersDbName, context, MDB_CREATE);
225 _adaptersByGroupId = StringStringMap(txn, adaptersByReplicaGroupIdDbName, context, MDB_CREATE|MDB_DUPSORT);
226
227 _objects = IdentityObjectInfoMap(txn, objectsDbName, context, MDB_CREATE);
228 _objectsByType = StringIdentityMap(txn, objectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
229
230 _internalObjects = IdentityObjectInfoMap(txn, internalObjectsDbName, context, MDB_CREATE);
231 _internalObjectsByType = StringIdentityMap(txn, internalObjectsByTypeDbName, context, MDB_CREATE|MDB_DUPSORT);
232
233 _serials = StringLongMap(txn, serialsDbName, context, MDB_CREATE);
234
235 ServerEntrySeq entries;
236
237 string k;
238 ApplicationInfo v;
239 ApplicationMapRWCursor cursor(_applications, txn);
240 while(cursor.get(k, v, MDB_NEXT))
241 {
242 try
243 {
244 load(ApplicationHelper(_communicator, v.descriptor), entries, v.uuid, v.revision);
245 }
246 catch(const DeploymentException& ex)
247 {
248 Ice::Error err(_traceLevels->logger);
249 err << "invalid application `" << k << "':\n" << ex.reason;
250 }
251 }
252
253 _serverCache.setTraceLevels(_traceLevels);
254 _nodeCache.setTraceLevels(_traceLevels);
255 _replicaCache.setTraceLevels(_traceLevels);
256 _adapterCache.setTraceLevels(_traceLevels);
257 _objectCache.setTraceLevels(_traceLevels);
258 _allocatableObjectCache.setTraceLevels(_traceLevels);
259
260 _nodeObserverTopic = new NodeObserverTopic(_topicManager, _internalAdapter);
261 _registryObserverTopic = new RegistryObserverTopic(_topicManager);
262
263 _serverCache.setNodeObserverTopic(_nodeObserverTopic);
264
265 // Set all serials to 1 if they have not yet been set.
266 Ice::Long serial;
267 if(!_serials.get(txn, applicationsDbName, serial))
268 {
269 _serials.put(txn, applicationsDbName, 1);
270 }
271 if(!_serials.get(txn, adaptersDbName, serial))
272 {
273 _serials.put(txn, adaptersDbName, 1);
274 }
275 if(!_serials.get(txn, objectsDbName, serial))
276 {
277 _serials.put(txn, objectsDbName, 1);
278 }
279
280 _applicationObserverTopic =
281 new ApplicationObserverTopic(_topicManager, toMap(txn, _applications), getSerial(txn, applicationsDbName));
282 _adapterObserverTopic =
283 new AdapterObserverTopic(_topicManager, toMap(txn, _adapters), getSerial(txn, adaptersDbName));
284 _objectObserverTopic =
285 new ObjectObserverTopic(_topicManager, toMap(txn, _objects), getSerial(txn, objectsDbName));
286
287 txn.commit();
288
289 _registryObserverTopic->registryUp(info);
290
291 _pluginFacade->setDatabase(this);
292 }
293
294 std::string
getInstanceName() const295 Database::getInstanceName() const
296 {
297 return _instanceName;
298 }
299
300 void
destroy()301 Database::destroy()
302 {
303 _pluginFacade->setDatabase(0);
304
305 _registryObserverTopic->destroy();
306 _nodeObserverTopic->destroy();
307 _applicationObserverTopic->destroy();
308 _adapterObserverTopic->destroy();
309 _objectObserverTopic->destroy();
310 }
311
312 ObserverTopicPtr
getObserverTopic(TopicName name) const313 Database::getObserverTopic(TopicName name) const
314 {
315 switch(name)
316 {
317 case RegistryObserverTopicName:
318 return _registryObserverTopic;
319 case NodeObserverTopicName:
320 return _nodeObserverTopic;
321 case ApplicationObserverTopicName:
322 return _applicationObserverTopic;
323 case AdapterObserverTopicName:
324 return _adapterObserverTopic;
325 case ObjectObserverTopicName:
326 return _objectObserverTopic;
327 default:
328 break;
329 }
330 return 0;
331 }
332
333 void
checkSessionLock(AdminSessionI * session)334 Database::checkSessionLock(AdminSessionI* session)
335 {
336 if(_lock != 0 && session != _lock)
337 {
338 throw AccessDeniedException(_lockUserId); // Lock held by another session.
339 }
340 }
341
342 int
lock(AdminSessionI * session,const string & userId)343 Database::lock(AdminSessionI* session, const string& userId)
344 {
345 Lock sync(*this);
346
347 if(_lock != 0 && session != _lock)
348 {
349 throw AccessDeniedException(_lockUserId); // Lock held by another session.
350 }
351 assert(_lock == 0 || _lock == session);
352
353 _lock = session;
354 _lockUserId = userId;
355
356 return _applicationObserverTopic->getSerial();
357 }
358
359 void
unlock(AdminSessionI * session)360 Database::unlock(AdminSessionI* session)
361 {
362 Lock sync(*this);
363 if(_lock != session)
364 {
365 throw AccessDeniedException();
366 }
367
368 _lock = 0;
369 _lockUserId.clear();
370 }
371
372 void
syncApplications(const ApplicationInfoSeq & newApplications,Ice::Long dbSerial)373 Database::syncApplications(const ApplicationInfoSeq& newApplications, Ice::Long dbSerial)
374 {
375 assert(dbSerial != 0);
376 int serial = 0;
377 {
378 Lock sync(*this);
379
380 map<string, ApplicationInfo> oldApplications;
381 try
382 {
383 IceDB::ReadWriteTxn txn(_env);
384
385 oldApplications = toMap(txn, _applications);
386 _applications.clear(txn);
387 for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
388 {
389 _applications.put(txn, p->descriptor.name, *p);
390 }
391 dbSerial = updateSerial(txn, applicationsDbName, dbSerial);
392
393 txn.commit();
394 }
395 catch(const IceDB::LMDBException& ex)
396 {
397 logError(_communicator, ex);
398 throw;
399 }
400
401 ServerEntrySeq entries;
402 set<string> names;
403
404 for(ApplicationInfoSeq::const_iterator p = newApplications.begin(); p != newApplications.end(); ++p)
405 {
406 try
407 {
408 map<string, ApplicationInfo>::const_iterator q = oldApplications.find(p->descriptor.name);
409 if(q != oldApplications.end())
410 {
411 ApplicationHelper previous(_communicator, q->second.descriptor);
412 ApplicationHelper helper(_communicator, p->descriptor);
413 reload(previous, helper, entries, p->uuid, p->revision, false);
414 }
415 else
416 {
417 load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
418 }
419 }
420 catch(const DeploymentException& ex)
421 {
422 Ice::Warning warn(_traceLevels->logger);
423 warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
424 }
425 names.insert(p->descriptor.name);
426 }
427
428 for(map<string, ApplicationInfo>::iterator s = oldApplications.begin(); s != oldApplications.end(); ++s)
429 {
430 if(names.find(s->first) == names.end())
431 {
432 unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
433 }
434 }
435
436 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
437
438 if(_traceLevels->application > 0)
439 {
440 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
441 out << "synchronized applications (serial = `" << dbSerial << "')";
442 }
443
444 serial = _applicationObserverTopic->applicationInit(dbSerial, newApplications);
445 }
446 _applicationObserverTopic->waitForSyncedSubscribers(serial);
447 }
448
449 void
syncAdapters(const AdapterInfoSeq & adapters,Ice::Long dbSerial)450 Database::syncAdapters(const AdapterInfoSeq& adapters, Ice::Long dbSerial)
451 {
452 assert(dbSerial != 0);
453 int serial = 0;
454 {
455 Lock sync(*this);
456 try
457 {
458 IceDB::ReadWriteTxn txn(_env);
459
460 _adapters.clear(txn);
461 _adaptersByGroupId.clear(txn);
462 for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
463 {
464 addAdapter(txn, *r);
465 }
466 dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
467
468 txn.commit();
469 }
470 catch(const IceDB::KeyTooLongException&)
471 {
472 throw;
473 }
474 catch(const IceDB::LMDBException& ex)
475 {
476 logError(_communicator, ex);
477 throw;
478 }
479
480 if(_traceLevels->adapter > 0)
481 {
482 Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
483 out << "synchronized adapters (serial = `" << dbSerial << "')";
484 }
485
486 serial = _adapterObserverTopic->adapterInit(dbSerial, adapters);
487 }
488 _adapterObserverTopic->waitForSyncedSubscribers(serial);
489 }
490
491 void
syncObjects(const ObjectInfoSeq & objects,Ice::Long dbSerial)492 Database::syncObjects(const ObjectInfoSeq& objects, Ice::Long dbSerial)
493 {
494 assert(dbSerial != 0);
495 int serial = 0;
496 {
497 Lock sync(*this);
498 try
499 {
500 IceDB::ReadWriteTxn txn(_env);
501
502 _objects.clear(txn);
503 _objectsByType.clear(txn);
504 for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
505 {
506 addObject(txn, *q, false);
507 }
508 dbSerial = updateSerial(txn, objectsDbName, dbSerial);
509
510 txn.commit();
511 }
512 catch(const IceDB::LMDBException& ex)
513 {
514 logError(_communicator, ex);
515 throw;
516 }
517
518 if(_traceLevels->object > 0)
519 {
520 Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
521 out << "synchronized objects (serial = `" << dbSerial << "')";
522 }
523
524 serial = _objectObserverTopic->objectInit(dbSerial, objects);
525 }
526 _objectObserverTopic->waitForSyncedSubscribers(serial);
527 }
528
529 ApplicationInfoSeq
getApplications(Ice::Long & serial)530 Database::getApplications(Ice::Long& serial)
531 {
532 try
533 {
534 IceDB::ReadOnlyTxn txn(_env);
535
536 serial = getSerial(txn, applicationsDbName);
537 return toVector(txn, _applications);
538 }
539 catch(const IceDB::LMDBException& ex)
540 {
541 logError(_communicator, ex);
542 throw;
543 }
544 }
545
546 AdapterInfoSeq
getAdapters(Ice::Long & serial)547 Database::getAdapters(Ice::Long& serial)
548 {
549 try
550 {
551 IceDB::ReadOnlyTxn txn(_env);
552
553 serial = getSerial(txn, adaptersDbName);
554 return toVector(txn, _adapters);
555 }
556 catch(const IceDB::LMDBException& ex)
557 {
558 logError(_communicator, ex);
559 throw;
560 }
561 }
562
563 ObjectInfoSeq
getObjects(Ice::Long & serial)564 Database::getObjects(Ice::Long& serial)
565 {
566 try
567 {
568 IceDB::ReadOnlyTxn txn(_env);
569
570 serial = getSerial(txn, objectsDbName);
571 return toVector(txn, _objects);
572 }
573 catch(const IceDB::LMDBException& ex)
574 {
575 logError(_communicator, ex);
576 throw;
577 }
578 }
579
580 StringLongDict
getSerials() const581 Database::getSerials() const
582 {
583 IceDB::ReadOnlyTxn txn(_env);
584 return toMap(txn, _serials);
585 }
586
587 void
addApplication(const ApplicationInfo & info,AdminSessionI * session,Ice::Long dbSerial)588 Database::addApplication(const ApplicationInfo& info, AdminSessionI* session, Ice::Long dbSerial)
589 {
590 assert(dbSerial != 0 || _master);
591
592 int serial = 0; // Initialize to prevent warning.
593 ServerEntrySeq entries;
594 try
595 {
596 Lock sync(*this);
597 checkSessionLock(session);
598
599 waitForUpdate(info.descriptor.name);
600
601 IceDB::ReadWriteTxn txn(_env);
602
603 if(_applications.find(txn, info.descriptor.name))
604 {
605 throw DeploymentException("application `" + info.descriptor.name + "' already exists");
606 }
607
608 ApplicationHelper helper(_communicator, info.descriptor, true);
609 checkForAddition(helper, txn);
610 dbSerial = saveApplication(info, txn, dbSerial);
611
612 txn.commit();
613
614 load(helper, entries, info.uuid, info.revision);
615 startUpdating(info.descriptor.name, info.uuid, info.revision);
616
617 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
618 serial = _applicationObserverTopic->applicationAdded(dbSerial, info);
619 }
620 catch(const IceDB::KeyTooLongException& ex)
621 {
622 throw DeploymentException("application name `" + info.descriptor.name + "' is too long: " + ex.what());
623 }
624 catch(const IceDB::LMDBException& ex)
625 {
626 logError(_communicator, ex);
627 throw;
628 }
629
630 _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
631
632 //
633 // Mark the application as updated. All the replicas received the update so it's now safe
634 // for the nodes to start the servers.
635 //
636 {
637 Lock sync(*this);
638 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), info.descriptor.name);
639 assert(p != _updating.end());
640 p->markUpdated();
641 }
642
643 if(_master)
644 {
645 try
646 {
647 for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
648 {
649 try
650 {
651 (*p)->waitForSync();
652 }
653 catch(const NodeUnreachableException&)
654 {
655 // Ignore.
656 }
657 }
658 }
659 catch(const DeploymentException&)
660 {
661 try
662 {
663 Lock sync(*this);
664 entries.clear();
665 unload(ApplicationHelper(_communicator, info.descriptor), entries);
666
667 IceDB::ReadWriteTxn txn(_env);
668 dbSerial = removeApplication(info.descriptor.name, txn);
669 txn.commit();
670
671 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
672 serial = _applicationObserverTopic->applicationRemoved(dbSerial, info.descriptor.name);
673 }
674 catch(const DeploymentException& ex)
675 {
676 Ice::Error err(_traceLevels->logger);
677 err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
678 }
679 catch(const IceDB::LMDBException& ex)
680 {
681 logError(_communicator, ex);
682 }
683
684 _applicationObserverTopic->waitForSyncedSubscribers(serial);
685 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
686 finishUpdating(info.descriptor.name);
687 throw;
688 }
689 }
690
691 if(_traceLevels->application > 0)
692 {
693 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
694 out << "added application `" << info.descriptor.name << "' (serial = `" << dbSerial << "')";
695 }
696 finishUpdating(info.descriptor.name);
697 }
698
699 void
updateApplication(const ApplicationUpdateInfo & updt,bool noRestart,AdminSessionI * session,Ice::Long dbSerial)700 Database::updateApplication(const ApplicationUpdateInfo& updt, bool noRestart, AdminSessionI* session,
701 Ice::Long dbSerial)
702 {
703 assert(dbSerial != 0 || _master);
704
705 ApplicationInfo oldApp;
706 ApplicationUpdateInfo update = updt;
707 IceInternal::UniquePtr<ApplicationHelper> previous;
708 IceInternal::UniquePtr<ApplicationHelper> helper;
709 try
710 {
711 Lock sync(*this);
712 checkSessionLock(session);
713
714 waitForUpdate(update.descriptor.name);
715
716 IceDB::ReadOnlyTxn txn(_env);
717
718 if(!_applications.get(txn, update.descriptor.name, oldApp))
719 {
720 throw ApplicationNotExistException(update.descriptor.name);
721 }
722
723 if(update.revision < 0)
724 {
725 update.revision = oldApp.revision + 1;
726 }
727
728 previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
729 helper.reset(new ApplicationHelper(_communicator, previous->update(update.descriptor), true));
730
731 startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
732 }
733 catch(const IceDB::LMDBException& ex)
734 {
735 logError(_communicator, ex);
736 throw;
737 }
738
739 finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart, dbSerial);
740 }
741
742 void
syncApplicationDescriptor(const ApplicationDescriptor & newDesc,bool noRestart,AdminSessionI * session)743 Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, bool noRestart, AdminSessionI* session)
744 {
745 assert(_master);
746
747 ApplicationUpdateInfo update;
748 ApplicationInfo oldApp;
749 IceInternal::UniquePtr<ApplicationHelper> previous;
750 IceInternal::UniquePtr<ApplicationHelper> helper;
751 try
752 {
753 Lock sync(*this);
754 checkSessionLock(session);
755
756 waitForUpdate(newDesc.name);
757
758 IceDB::ReadOnlyTxn txn(_env);
759
760 if(!_applications.get(txn, newDesc.name, oldApp))
761 {
762 throw ApplicationNotExistException(newDesc.name);
763 }
764
765 previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
766 helper.reset(new ApplicationHelper(_communicator, newDesc, true));
767
768 update.updateTime = IceUtil::Time::now().toMilliSeconds();
769 update.updateUser = _lockUserId;
770 update.revision = oldApp.revision + 1;
771 update.descriptor = helper->diff(*previous);
772
773 startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
774 }
775 catch(const IceDB::LMDBException& ex)
776 {
777 logError(_communicator, ex);
778 throw;
779 }
780
781 finishApplicationUpdate(update, oldApp, *previous, *helper, session, noRestart);
782 }
783
784 void
instantiateServer(const string & application,const string & node,const ServerInstanceDescriptor & instance,AdminSessionI * session)785 Database::instantiateServer(const string& application,
786 const string& node,
787 const ServerInstanceDescriptor& instance,
788 AdminSessionI* session)
789 {
790 assert(_master);
791
792 ApplicationUpdateInfo update;
793 ApplicationInfo oldApp;
794 IceInternal::UniquePtr<ApplicationHelper> previous;
795 IceInternal::UniquePtr<ApplicationHelper> helper;
796
797 try
798 {
799 Lock sync(*this);
800 checkSessionLock(session);
801
802 waitForUpdate(application);
803
804 IceDB::ReadOnlyTxn txn(_env);
805
806 if(!_applications.get(txn, application, oldApp))
807 {
808 throw ApplicationNotExistException(application);
809 }
810
811 previous.reset(new ApplicationHelper(_communicator, oldApp.descriptor));
812 helper.reset(new ApplicationHelper(_communicator, previous->instantiateServer(node, instance), true));
813
814 update.updateTime = IceUtil::Time::now().toMilliSeconds();
815 update.updateUser = _lockUserId;
816 update.revision = oldApp.revision + 1;
817 update.descriptor = helper->diff(*previous);
818
819 startUpdating(update.descriptor.name, oldApp.uuid, oldApp.revision + 1);
820 }
821 catch(const IceDB::LMDBException& ex)
822 {
823 logError(_communicator, ex);
824 throw;
825 }
826
827 finishApplicationUpdate(update, oldApp, *previous, *helper, session, true);
828 }
829
830 void
removeApplication(const string & name,AdminSessionI * session,Ice::Long dbSerial)831 Database::removeApplication(const string& name, AdminSessionI* session, Ice::Long dbSerial)
832 {
833 assert(dbSerial != 0 || _master);
834 ServerEntrySeq entries;
835
836 int serial = 0; // Initialize to prevent warning.
837 try
838 {
839 Lock sync(*this);
840 checkSessionLock(session);
841
842 waitForUpdate(name);
843
844 ApplicationInfo appInfo;
845
846 IceDB::ReadWriteTxn txn(_env);
847
848 if(!_applications.get(txn, name, appInfo))
849 {
850 throw ApplicationNotExistException(name);
851 }
852
853 bool init = false;
854 try
855 {
856 ApplicationHelper helper(_communicator, appInfo.descriptor);
857 init = true;
858 checkForRemove(helper);
859 unload(helper, entries);
860 }
861 catch(const DeploymentException&)
862 {
863 if(init)
864 {
865 throw;
866 }
867 }
868 dbSerial = removeApplication(name, txn, dbSerial);
869
870 txn.commit();
871
872 startUpdating(name, appInfo.uuid, appInfo.revision);
873
874 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
875 serial = _applicationObserverTopic->applicationRemoved(dbSerial, name);
876 }
877 catch(const IceDB::LMDBException& ex)
878 {
879 logError(_communicator, ex);
880 throw;
881 }
882
883 _applicationObserverTopic->waitForSyncedSubscribers(serial);
884
885 if(_master)
886 {
887 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
888 }
889
890 if(_traceLevels->application > 0)
891 {
892 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
893 out << "removed application `" << name << "' (serial = `" << dbSerial << "')";
894 }
895
896 finishUpdating(name);
897 }
898
899 ApplicationInfo
getApplicationInfo(const std::string & name)900 Database::getApplicationInfo(const std::string& name)
901 {
902 IceDB::ReadOnlyTxn txn(_env);
903
904 ApplicationInfo info;
905 if(!_applications.get(txn, name, info))
906 {
907 throw ApplicationNotExistException(name);
908 }
909 return info;
910 }
911
912 Ice::StringSeq
getAllApplications(const string & expression)913 Database::getAllApplications(const string& expression)
914 {
915 IceDB::ReadOnlyTxn txn(_env);
916 return getMatchingKeys<map<string, ApplicationInfo> >(toMap(txn, _applications), expression);
917 }
918
919 void
waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr & cb,const string & uuid,int revision)920 Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
921 const string& uuid,
922 int revision)
923 {
924 Lock sync(*this);
925
926 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), make_pair(uuid, revision));
927 if(p != _updating.end() && !p->updated)
928 {
929 p->cbs.push_back(cb);
930 }
931 else
932 {
933 cb->ice_response();
934 }
935 }
936
937 NodeCache&
getNodeCache()938 Database::getNodeCache()
939 {
940 return _nodeCache;
941 }
942
943 NodeEntryPtr
getNode(const string & name,bool create) const944 Database::getNode(const string& name, bool create) const
945 {
946 return _nodeCache.get(name, create);
947 }
948
949 ReplicaCache&
getReplicaCache()950 Database::getReplicaCache()
951 {
952 return _replicaCache;
953 }
954
955 ReplicaEntryPtr
getReplica(const string & name) const956 Database::getReplica(const string& name) const
957 {
958 return _replicaCache.get(name);
959 }
960
961 ServerCache&
getServerCache()962 Database::getServerCache()
963 {
964 return _serverCache;
965 }
966
967 ServerEntryPtr
getServer(const string & id) const968 Database::getServer(const string& id) const
969 {
970 return _serverCache.get(id);
971 }
972
973 AllocatableObjectCache&
getAllocatableObjectCache()974 Database::getAllocatableObjectCache()
975 {
976 return _allocatableObjectCache;
977 }
978
979 AllocatableObjectEntryPtr
getAllocatableObject(const Ice::Identity & id) const980 Database::getAllocatableObject(const Ice::Identity& id) const
981 {
982 return _allocatableObjectCache.get(id);
983 }
984
985 void
setAdapterDirectProxy(const string & adapterId,const string & replicaGroupId,const Ice::ObjectPrx & proxy,Ice::Long dbSerial)986 Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy,
987 Ice::Long dbSerial)
988 {
989 assert(dbSerial != 0 || _master);
990
991 int serial = 0; // Initialize to prevent warning.
992 {
993 Lock sync(*this);
994 if(_adapterCache.has(adapterId))
995 {
996 throw AdapterExistsException(adapterId);
997 }
998 if(_adapterCache.has(replicaGroupId))
999 {
1000 throw DeploymentException("registering adapter `" + adapterId + "' with the replica group `" +
1001 replicaGroupId + "' is not allowed:\nthe replica group was added with an "
1002 "application descriptor and only adapters specified in an application descriptor "
1003 "can be member of this replica group");
1004 }
1005
1006 AdapterInfo info;
1007 info.id = adapterId;
1008 info.proxy = proxy;
1009 info.replicaGroupId = replicaGroupId;
1010
1011 bool updated = false;
1012 try
1013 {
1014 IceDB::ReadWriteTxn txn(_env);
1015
1016 AdapterInfo oldInfo;
1017 bool found = _adapters.get(txn, adapterId, oldInfo);
1018 if(proxy)
1019 {
1020 updated = found;
1021
1022 if(replicaGroupId != oldInfo.replicaGroupId)
1023 {
1024 _adaptersByGroupId.del(txn, oldInfo.replicaGroupId, adapterId);
1025 }
1026 addAdapter(txn, info);
1027 }
1028 else
1029 {
1030 if(!found)
1031 {
1032 return;
1033 }
1034 deleteAdapter(txn, oldInfo);
1035 }
1036 dbSerial = updateSerial(txn, adaptersDbName, dbSerial);
1037
1038 txn.commit();
1039 }
1040 catch(const IceDB::KeyTooLongException&)
1041 {
1042 throw;
1043 }
1044 catch(const IceDB::LMDBException& ex)
1045 {
1046 logError(_communicator, ex);
1047 throw;
1048 }
1049
1050 if(_traceLevels->adapter > 0)
1051 {
1052 Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
1053 out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
1054 if(!replicaGroupId.empty())
1055 {
1056 out << " with replica group `" << replicaGroupId << "'";
1057 }
1058 out << " (serial = `" << dbSerial << "')";
1059 }
1060
1061 if(proxy)
1062 {
1063 if(updated)
1064 {
1065 serial = _adapterObserverTopic->adapterUpdated(dbSerial, info);
1066 }
1067 else
1068 {
1069 serial = _adapterObserverTopic->adapterAdded(dbSerial, info);
1070 }
1071 }
1072 else
1073 {
1074 serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
1075 }
1076 }
1077 _adapterObserverTopic->waitForSyncedSubscribers(serial);
1078 }
1079
1080 Ice::ObjectPrx
getAdapterDirectProxy(const string & id,const Ice::EncodingVersion & encoding,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1081 Database::getAdapterDirectProxy(const string& id, const Ice::EncodingVersion& encoding, const Ice::ConnectionPtr& con,
1082 const Ice::Context& ctx)
1083 {
1084 IceDB::ReadOnlyTxn txn(_env);
1085
1086 AdapterInfo info;
1087 if(_adapters.get(txn, id, info))
1088 {
1089 return info.proxy;
1090 }
1091
1092 Ice::EndpointSeq endpoints;
1093 vector<AdapterInfo> infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1094 if(infos.empty())
1095 {
1096 throw AdapterNotExistException(id);
1097 }
1098
1099 filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
1100 for(unsigned int i = 0; i < infos.size(); ++i)
1101 {
1102 if(IceInternal::isSupported(encoding, infos[i].proxy->ice_getEncodingVersion()))
1103 {
1104 Ice::EndpointSeq edpts = infos[i].proxy->ice_getEndpoints();
1105 endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
1106 }
1107 }
1108 if(!endpoints.empty())
1109 {
1110 return _communicator->stringToProxy("dummy:default")->ice_endpoints(endpoints);
1111 }
1112 return 0;
1113 }
1114
1115 void
removeAdapter(const string & adapterId)1116 Database::removeAdapter(const string& adapterId)
1117 {
1118 assert(_master);
1119
1120 int serial = 0; // Initialize to prevent warning.
1121 {
1122 Lock sync(*this);
1123 if(_adapterCache.has(adapterId))
1124 {
1125 AdapterEntryPtr adpt = _adapterCache.get(adapterId);
1126 throw DeploymentException("removing adapter `" + adapterId + "' is not allowed:\n" +
1127 "the adapter was added with the application descriptor `" +
1128 adpt->getApplication() + "'");
1129 }
1130
1131 AdapterInfoSeq infos;
1132 Ice::Long dbSerial = 0;
1133 try
1134 {
1135 IceDB::ReadWriteTxn txn(_env);
1136
1137 AdapterInfo info;
1138 if(_adapters.get(txn, adapterId, info))
1139 {
1140 deleteAdapter(txn, info);
1141 }
1142 else
1143 {
1144 infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, adapterId);
1145 if(infos.empty())
1146 {
1147 throw AdapterNotExistException(adapterId);
1148 }
1149 for(AdapterInfoSeq::iterator p = infos.begin(); p != infos.end(); ++p)
1150 {
1151 _adaptersByGroupId.del(txn, p->replicaGroupId, p->id);
1152 p->replicaGroupId.clear();
1153 addAdapter(txn, *p);
1154 }
1155 }
1156 dbSerial = updateSerial(txn, adaptersDbName);
1157
1158 txn.commit();
1159 }
1160 catch(const IceDB::KeyTooLongException&)
1161 {
1162 throw;
1163 }
1164 catch(const IceDB::LMDBException& ex)
1165 {
1166 logError(_communicator, ex);
1167 throw;
1168 }
1169
1170 if(_traceLevels->adapter > 0)
1171 {
1172 Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
1173 out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "' (serial = `" << dbSerial << "')";
1174 }
1175
1176 if(infos.empty())
1177 {
1178 serial = _adapterObserverTopic->adapterRemoved(dbSerial, adapterId);
1179 }
1180 else
1181 {
1182 for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
1183 {
1184 serial = _adapterObserverTopic->adapterUpdated(dbSerial, *p);
1185 }
1186 }
1187 }
1188 _adapterObserverTopic->waitForSyncedSubscribers(serial);
1189 }
1190
1191 AdapterPrx
getAdapterProxy(const string & adapterId,const string & replicaGroupId,bool upToDate)1192 Database::getAdapterProxy(const string& adapterId, const string& replicaGroupId, bool upToDate)
1193 {
1194 Lock sync(*this); // Make sure this isn't call during an update.
1195 return _adapterCache.get(adapterId)->getProxy(replicaGroupId, upToDate);
1196 }
1197
1198 void
getLocatorAdapterInfo(const string & id,const Ice::ConnectionPtr & connection,const Ice::Context & context,LocatorAdapterInfoSeq & adpts,int & count,bool & replicaGroup,bool & roundRobin,const set<string> & excludes)1199 Database::getLocatorAdapterInfo(const string& id,
1200 const Ice::ConnectionPtr& connection,
1201 const Ice::Context& context,
1202 LocatorAdapterInfoSeq& adpts,
1203 int& count,
1204 bool& replicaGroup,
1205 bool& roundRobin,
1206 const set<string>& excludes)
1207 {
1208 string filter;
1209 {
1210 Lock sync(*this); // Make sure this isn't call during an update.
1211 _adapterCache.get(id)->getLocatorAdapterInfo(adpts, count, replicaGroup, roundRobin, filter, excludes);
1212 }
1213
1214 if(_pluginFacade->hasReplicaGroupFilters() && !adpts.empty())
1215 {
1216 vector<ReplicaGroupFilterPtr> filters = _pluginFacade->getReplicaGroupFilters(filter);
1217 if(!filters.empty())
1218 {
1219 Ice::StringSeq adapterIds;
1220 for(LocatorAdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
1221 {
1222 adapterIds.push_back(q->id);
1223 }
1224
1225 for(vector<ReplicaGroupFilterPtr>::const_iterator q = filters.begin(); q != filters.end(); ++q)
1226 {
1227 adapterIds = (*q)->filter(id, adapterIds, connection, context);
1228 }
1229
1230 LocatorAdapterInfoSeq filteredAdpts;
1231 filteredAdpts.reserve(adpts.size());
1232 for(Ice::StringSeq::const_iterator q = adapterIds.begin(); q != adapterIds.end(); ++q)
1233 {
1234 for(LocatorAdapterInfoSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
1235 {
1236 if(*q == r->id)
1237 {
1238 filteredAdpts.push_back(*r);
1239 break;
1240 }
1241 }
1242 }
1243 adpts.swap(filteredAdpts);
1244 }
1245 }
1246 }
1247
1248 bool
addAdapterSyncCallback(const string & id,const SynchronizationCallbackPtr & callback,const std::set<std::string> & excludes)1249 Database::addAdapterSyncCallback(const string& id,
1250 const SynchronizationCallbackPtr& callback,
1251 const std::set<std::string>& excludes)
1252 {
1253 Lock sync(*this); // Make sure this isn't call during an update.
1254 return _adapterCache.get(id)->addSyncCallback(callback, excludes);
1255 }
1256
1257 AdapterInfoSeq
getAdapterInfo(const string & id)1258 Database::getAdapterInfo(const string& id)
1259 {
1260 //
1261 // First we check if the given adapter id is associated to a
1262 // server, if that's the case we get the adapter proxy from the
1263 // server.
1264 //
1265 try
1266 {
1267 Lock sync(*this); // Make sure this isn't call during an update.
1268 return _adapterCache.get(id)->getAdapterInfo();
1269 }
1270 catch(const AdapterNotExistException&)
1271 {
1272 }
1273
1274 //
1275 // Otherwise, we check the adapter endpoint table -- if there's an
1276 // entry the adapter is managed by the registry itself.
1277 //
1278 IceDB::ReadOnlyTxn txn(_env);
1279
1280 AdapterInfo info;
1281 AdapterInfoSeq infos;
1282 if(_adapters.get(txn, id, info))
1283 {
1284 infos.push_back(info);
1285 }
1286 else
1287 {
1288 //
1289 // If it's not a regular object adapter, perhaps it's a replica
1290 // group...
1291 //
1292 infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1293 if(infos.empty())
1294 {
1295 throw AdapterNotExistException(id);
1296 }
1297 }
1298 return infos;
1299 }
1300
1301 AdapterInfoSeq
getFilteredAdapterInfo(const string & id,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1302 Database::getFilteredAdapterInfo(const string& id, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1303 {
1304 //
1305 // First we check if the given adapter id is associated to a
1306 // server, if that's the case we get the adapter proxy from the
1307 // server.
1308 //
1309 try
1310 {
1311 AdapterInfoSeq infos;
1312 ReplicaGroupEntryPtr replicaGroup;
1313 {
1314 Lock sync(*this); // Make sure this isn't call during an update.
1315
1316 AdapterEntryPtr entry = _adapterCache.get(id);
1317 infos = entry->getAdapterInfo();
1318 replicaGroup = ReplicaGroupEntryPtr::dynamicCast(entry);
1319 }
1320 if(replicaGroup)
1321 {
1322 filterAdapterInfos(replicaGroup->getFilter(), id, _pluginFacade, con, ctx, infos);
1323 }
1324 return infos;
1325 }
1326 catch(const AdapterNotExistException&)
1327 {
1328 }
1329
1330 //
1331 // Otherwise, we check the adapter endpoint table -- if there's an
1332 // entry the adapter is managed by the registry itself.
1333 //
1334 IceDB::ReadOnlyTxn txn(_env);
1335
1336 AdapterInfo info;
1337 AdapterInfoSeq infos;
1338 if(_adapters.get(txn, id, info))
1339 {
1340 infos.push_back(info);
1341 }
1342 else
1343 {
1344 //
1345 // If it's not a regular object adapter, perhaps it's a replica
1346 // group...
1347 //
1348 infos = findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id);
1349 if(infos.empty())
1350 {
1351 throw AdapterNotExistException(id);
1352 }
1353 filterAdapterInfos("", id, _pluginFacade, con, ctx, infos);
1354 }
1355 return infos;
1356 }
1357
1358 string
getAdapterServer(const string & id) const1359 Database::getAdapterServer(const string& id) const
1360 {
1361 try
1362 {
1363 Lock sync(*this); // Make sure this isn't call during an update.
1364 ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
1365 if(adapter)
1366 {
1367 return adapter->getServerId();
1368 }
1369 }
1370 catch(const AdapterNotExistException&)
1371 {
1372 }
1373 return "";
1374 }
1375
1376 string
getAdapterApplication(const string & id) const1377 Database::getAdapterApplication(const string& id) const
1378 {
1379 try
1380 {
1381 Lock sync(*this); // Make sure this isn't call during an update.
1382 return _adapterCache.get(id)->getApplication();
1383 }
1384 catch(const AdapterNotExistException&)
1385 {
1386 }
1387 return "";
1388 }
1389
1390 string
getAdapterNode(const string & id) const1391 Database::getAdapterNode(const string& id) const
1392 {
1393 try
1394 {
1395 Lock sync(*this); // Make sure this isn't call during an update.
1396 ServerAdapterEntryPtr adapter = ServerAdapterEntryPtr::dynamicCast(_adapterCache.get(id));
1397 if(adapter)
1398 {
1399 return adapter->getNodeName();
1400 }
1401 }
1402 catch(const AdapterNotExistException&)
1403 {
1404 }
1405 return "";
1406 }
1407
1408 Ice::StringSeq
getAllAdapters(const string & expression)1409 Database::getAllAdapters(const string& expression)
1410 {
1411 Lock sync(*this);
1412 vector<string> result;
1413 vector<string> ids = _adapterCache.getAll(expression);
1414 result.swap(ids);
1415 set<string> groups;
1416
1417 IceDB::ReadOnlyTxn txn(_env);
1418
1419 string name;
1420 AdapterInfo info;
1421 AdapterMapROCursor cursor(_adapters, txn);
1422 while(cursor.get(name, info, MDB_NEXT))
1423 {
1424 if(expression.empty() || IceUtilInternal::match(name, expression, true))
1425 {
1426 result.push_back(name);
1427 }
1428 string replicaGroupId = info.replicaGroupId;
1429 if(!replicaGroupId.empty() && (expression.empty() || IceUtilInternal::match(replicaGroupId, expression, true)))
1430 {
1431 groups.insert(replicaGroupId);
1432 }
1433 }
1434 cursor.close();
1435
1436 //
1437 // COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun.
1438 //
1439 //result.insert(result.end(), groups.begin(), groups.end())
1440 for(set<string>::const_iterator q = groups.begin(); q != groups.end(); ++q)
1441 {
1442 result.push_back(*q);
1443 }
1444 return result;
1445 }
1446
1447 void
addObject(const ObjectInfo & info)1448 Database::addObject(const ObjectInfo& info)
1449 {
1450 assert(_master);
1451
1452 int serial = 0;
1453 {
1454 Lock sync(*this);
1455 const Ice::Identity id = info.proxy->ice_getIdentity();
1456
1457 if(_objectCache.has(id))
1458 {
1459 throw ObjectExistsException(id);
1460 }
1461
1462 Ice::Long dbSerial = 0;
1463 try
1464 {
1465 IceDB::ReadWriteTxn txn(_env);
1466
1467 if(_objects.find(txn, id))
1468 {
1469 throw ObjectExistsException(id);
1470 }
1471 addObject(txn, info, false);
1472 dbSerial = updateSerial(txn, objectsDbName);
1473
1474 txn.commit();
1475 }
1476 catch(const IceDB::LMDBException& ex)
1477 {
1478 logError(_communicator, ex);
1479 throw;
1480 }
1481
1482 serial = _objectObserverTopic->objectAdded(dbSerial, info);
1483
1484 if(_traceLevels->object > 0)
1485 {
1486 Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1487 out << "added object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1488 }
1489 }
1490 _objectObserverTopic->waitForSyncedSubscribers(serial);
1491 }
1492
1493 void
addOrUpdateObject(const ObjectInfo & info,Ice::Long dbSerial)1494 Database::addOrUpdateObject(const ObjectInfo& info, Ice::Long dbSerial)
1495 {
1496 assert(dbSerial != 0 || _master);
1497
1498 int serial = 0; // Initialize to prevent warning.
1499 {
1500 Lock sync(*this);
1501 const Ice::Identity id = info.proxy->ice_getIdentity();
1502
1503 if(_objectCache.has(id))
1504 {
1505 throw ObjectExistsException(id);
1506 }
1507
1508 bool update = false;
1509 try
1510 {
1511 IceDB::ReadWriteTxn txn(_env);
1512
1513 Ice::Identity k;
1514 ObjectInfo v;
1515 update = _objects.get(txn, k, v);
1516 if(update)
1517 {
1518 _objectsByType.del(txn, v.type, v.proxy->ice_getIdentity());
1519 }
1520 addObject(txn, info, false);
1521 dbSerial = updateSerial(txn, objectsDbName, dbSerial);
1522
1523 txn.commit();
1524 }
1525 catch(const IceDB::LMDBException& ex)
1526 {
1527 logError(_communicator, ex);
1528 throw;
1529 }
1530
1531 if(update)
1532 {
1533 serial = _objectObserverTopic->objectUpdated(dbSerial, info);
1534 }
1535 else
1536 {
1537 serial = _objectObserverTopic->objectAdded(dbSerial, info);
1538 }
1539
1540 if(_traceLevels->object > 0)
1541 {
1542 Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1543 out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1544 }
1545 }
1546 _objectObserverTopic->waitForSyncedSubscribers(serial);
1547 }
1548
1549 void
removeObject(const Ice::Identity & id,Ice::Long dbSerial)1550 Database::removeObject(const Ice::Identity& id, Ice::Long dbSerial)
1551 {
1552 assert(dbSerial != 0 || _master);
1553
1554 int serial = 0; // Initialize to prevent warning.
1555 {
1556 Lock sync(*this);
1557 if(_objectCache.has(id))
1558 {
1559 throw DeploymentException("removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"
1560 + "the object was added with the application descriptor `" +
1561 _objectCache.get(id)->getApplication());
1562 }
1563
1564 try
1565 {
1566 IceDB::ReadWriteTxn txn(_env);
1567
1568 ObjectInfo info;
1569 if(!_objects.get(txn, id, info))
1570 {
1571 throw ObjectNotRegisteredException(id);
1572 }
1573 deleteObject(txn, info, false);
1574 dbSerial = updateSerial(txn, objectsDbName, dbSerial);
1575
1576 txn.commit();
1577 }
1578 catch(const IceDB::LMDBException& ex)
1579 {
1580 logError(_communicator, ex);
1581 throw;
1582 }
1583
1584 serial = _objectObserverTopic->objectRemoved(dbSerial, id);
1585
1586 if(_traceLevels->object > 0)
1587 {
1588 Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1589 out << "removed object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1590 }
1591 }
1592 _objectObserverTopic->waitForSyncedSubscribers(serial);
1593 }
1594
1595 void
updateObject(const Ice::ObjectPrx & proxy)1596 Database::updateObject(const Ice::ObjectPrx& proxy)
1597 {
1598 assert(_master);
1599
1600 int serial = 0;
1601 {
1602 Lock sync(*this);
1603
1604 const Ice::Identity id = proxy->ice_getIdentity();
1605 if(_objectCache.has(id))
1606 {
1607 throw DeploymentException("updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"
1608 + "the object was added with the application descriptor `" +
1609 _objectCache.get(id)->getApplication() + "'");
1610 }
1611
1612 ObjectInfo info;
1613 Ice::Long dbSerial = 0;
1614 try
1615 {
1616 IceDB::ReadWriteTxn txn(_env);
1617
1618 if(!_objects.get(txn, id, info))
1619 {
1620 throw ObjectNotRegisteredException(id);
1621 }
1622 info.proxy = proxy;
1623 addObject(txn, info, false);
1624 dbSerial = updateSerial(txn, objectsDbName);
1625
1626 txn.commit();
1627 }
1628 catch(const IceDB::LMDBException& ex)
1629 {
1630 logError(_communicator, ex);
1631 throw;
1632 }
1633
1634 serial = _objectObserverTopic->objectUpdated(dbSerial, info);
1635 if(_traceLevels->object > 0)
1636 {
1637 Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
1638 out << "updated object `" << _communicator->identityToString(id) << "' (serial = `" << dbSerial << "')";
1639 }
1640 }
1641 _objectObserverTopic->waitForSyncedSubscribers(serial);
1642 }
1643
1644 int
addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq & objects)1645 Database::addOrUpdateRegistryWellKnownObjects(const ObjectInfoSeq& objects)
1646 {
1647 Lock sync(*this);
1648 try
1649 {
1650 IceDB::ReadWriteTxn txn(_env);
1651 for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
1652 {
1653 Ice::Identity id = p->proxy->ice_getIdentity();
1654 ObjectInfo info;
1655 if(_objects.get(txn, id, info))
1656 {
1657 _objectsByType.del(txn, info.type, id);
1658 }
1659 addObject(txn, *p, false);
1660 }
1661 txn.commit();
1662 }
1663 catch(const IceDB::LMDBException& ex)
1664 {
1665 logError(_communicator, ex);
1666 throw;
1667 }
1668
1669 return _objectObserverTopic->wellKnownObjectsAddedOrUpdated(objects);
1670 }
1671
1672 int
removeRegistryWellKnownObjects(const ObjectInfoSeq & objects)1673 Database::removeRegistryWellKnownObjects(const ObjectInfoSeq& objects)
1674 {
1675 Lock sync(*this);
1676 try
1677 {
1678 IceDB::ReadWriteTxn txn(_env);
1679 for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
1680 {
1681 Ice::Identity id = p->proxy->ice_getIdentity();
1682 ObjectInfo info;
1683 if(_objects.get(txn, id, info))
1684 {
1685 deleteObject(txn, info, false);
1686 }
1687 }
1688 txn.commit();
1689 }
1690 catch(const IceDB::LMDBException& ex)
1691 {
1692 logError(_communicator, ex);
1693 throw;
1694 }
1695
1696 return _objectObserverTopic->wellKnownObjectsRemoved(objects);
1697 }
1698
1699 Ice::ObjectPrx
getObjectProxy(const Ice::Identity & id)1700 Database::getObjectProxy(const Ice::Identity& id)
1701 {
1702 try
1703 {
1704 //
1705 // Only return proxies for non allocatable objects.
1706 //
1707 return _objectCache.get(id)->getProxy();
1708 }
1709 catch(const ObjectNotRegisteredException&)
1710 {
1711 }
1712
1713 IceDB::ReadOnlyTxn txn(_env);
1714 ObjectInfo info;
1715 if(!_objects.get(txn, id, info))
1716 {
1717 throw ObjectNotRegisteredException(id);
1718 }
1719 return info.proxy;
1720 }
1721
1722 Ice::ObjectPrx
getObjectByType(const string & type,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1723 Database::getObjectByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1724 {
1725 Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
1726 if(objs.empty())
1727 {
1728 return 0;
1729 }
1730 return objs[IceUtilInternal::random(static_cast<int>(objs.size()))];
1731 }
1732
1733 Ice::ObjectPrx
getObjectByTypeOnLeastLoadedNode(const string & type,LoadSample sample,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1734 Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample, const Ice::ConnectionPtr& con,
1735 const Ice::Context& ctx)
1736 {
1737 Ice::ObjectProxySeq objs = getObjectsByType(type, con, ctx);
1738 if(objs.empty())
1739 {
1740 return 0;
1741 }
1742
1743 IceUtilInternal::shuffle(objs.begin(), objs.end());
1744 vector<pair<Ice::ObjectPrx, float> > objectsWithLoad;
1745 objectsWithLoad.reserve(objs.size());
1746 for(Ice::ObjectProxySeq::const_iterator p = objs.begin(); p != objs.end(); ++p)
1747 {
1748 float load = 1.0f;
1749 if(!(*p)->ice_getAdapterId().empty())
1750 {
1751 try
1752 {
1753 load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
1754 }
1755 catch(const AdapterNotExistException&)
1756 {
1757 }
1758 }
1759 objectsWithLoad.push_back(make_pair(*p, load));
1760 }
1761 return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), ObjectLoadCI())->first;
1762 }
1763
1764 Ice::ObjectProxySeq
getObjectsByType(const string & type,const Ice::ConnectionPtr & con,const Ice::Context & ctx)1765 Database::getObjectsByType(const string& type, const Ice::ConnectionPtr& con, const Ice::Context& ctx)
1766 {
1767 Ice::ObjectProxySeq proxies;
1768
1769 vector<ObjectEntryPtr> objects = _objectCache.getObjectsByType(type);
1770 for(vector<ObjectEntryPtr>::const_iterator q = objects.begin(); q != objects.end(); ++q)
1771 {
1772 if(_nodeObserverTopic->isServerEnabled((*q)->getServer())) // Only return proxies from enabled servers.
1773 {
1774 proxies.push_back((*q)->getProxy());
1775 }
1776 }
1777
1778 IceDB::ReadOnlyTxn txn(_env);
1779 vector<ObjectInfo> infos = findByType(txn, _objects, _objectsByType, type);
1780 for(unsigned int i = 0; i < infos.size(); ++i)
1781 {
1782 proxies.push_back(infos[i].proxy);
1783 }
1784
1785 if(con && !proxies.empty() && _pluginFacade->hasTypeFilters())
1786 {
1787 vector<TypeFilterPtr> filters = _pluginFacade->getTypeFilters(type);
1788 if(!filters.empty())
1789 {
1790 for(vector<TypeFilterPtr>::const_iterator p = filters.begin(); p != filters.end(); ++p)
1791 {
1792 proxies = (*p)->filter(type, proxies, con, ctx);
1793 }
1794 }
1795 }
1796 return proxies;
1797 }
1798
1799 ObjectInfo
getObjectInfo(const Ice::Identity & id)1800 Database::getObjectInfo(const Ice::Identity& id)
1801 {
1802 try
1803 {
1804 ObjectEntryPtr object = _objectCache.get(id);
1805 return object->getObjectInfo();
1806 }
1807 catch(const ObjectNotRegisteredException&)
1808 {
1809 }
1810
1811 IceDB::ReadOnlyTxn txn(_env);
1812 ObjectInfo info;
1813 if(!_objects.get(txn, id, info))
1814 {
1815 throw ObjectNotRegisteredException(id);
1816 }
1817 return info;
1818 }
1819
1820 ObjectInfoSeq
getAllObjectInfos(const string & expression)1821 Database::getAllObjectInfos(const string& expression)
1822 {
1823 ObjectInfoSeq infos = _objectCache.getAll(expression);
1824
1825 IceDB::ReadOnlyTxn txn(_env);
1826
1827 Ice::Identity id;
1828 ObjectInfo info;
1829 ObjectsMapROCursor cursor(_objects, txn);
1830 while(cursor.get(id, info, MDB_NEXT))
1831 {
1832 if(expression.empty() || IceUtilInternal::match(_communicator->identityToString(id), expression, true))
1833 {
1834 infos.push_back(info);
1835 }
1836 }
1837 return infos;
1838 }
1839
1840 ObjectInfoSeq
getObjectInfosByType(const string & type)1841 Database::getObjectInfosByType(const string& type)
1842 {
1843 ObjectInfoSeq infos = _objectCache.getAllByType(type);
1844
1845 IceDB::ReadOnlyTxn txn(_env);
1846 ObjectInfoSeq dbInfos = findByType(txn, _objects, _objectsByType, type);
1847 for(unsigned int i = 0; i < dbInfos.size(); ++i)
1848 {
1849 infos.push_back(dbInfos[i]);
1850 }
1851 return infos;
1852 }
1853
1854 void
addInternalObject(const ObjectInfo & info,bool replace)1855 Database::addInternalObject(const ObjectInfo& info, bool replace)
1856 {
1857 Lock sync(*this);
1858 const Ice::Identity id = info.proxy->ice_getIdentity();
1859
1860 try
1861 {
1862 IceDB::ReadWriteTxn txn(_env);
1863
1864 ObjectInfo oldInfo;
1865 if(_internalObjects.get(txn, id, oldInfo))
1866 {
1867 if(!replace)
1868 {
1869 throw ObjectExistsException(id);
1870 }
1871 _internalObjectsByType.del(txn, oldInfo.type, id);
1872 }
1873 addObject(txn, info, true);
1874
1875 txn.commit();
1876 }
1877 catch(const IceDB::LMDBException& ex)
1878 {
1879 logError(_communicator, ex);
1880 throw;
1881 }
1882 }
1883
1884 void
removeInternalObject(const Ice::Identity & id)1885 Database::removeInternalObject(const Ice::Identity& id)
1886 {
1887 Lock sync(*this);
1888
1889 try
1890 {
1891 IceDB::ReadWriteTxn txn(_env);
1892
1893 ObjectInfo info;
1894 if(!_internalObjects.get(txn, id, info))
1895 {
1896 throw ObjectNotRegisteredException(id);
1897 }
1898 deleteObject(txn, info, true);
1899
1900 txn.commit();
1901 }
1902 catch(const IceDB::LMDBException& ex)
1903 {
1904 logError(_communicator, ex);
1905 throw;
1906 }
1907 }
1908
1909 Ice::ObjectProxySeq
getInternalObjectsByType(const string & type)1910 Database::getInternalObjectsByType(const string& type)
1911 {
1912 Ice::ObjectProxySeq proxies;
1913
1914 IceDB::ReadOnlyTxn txn(_env);
1915 vector<ObjectInfo> infos = findByType(txn, _internalObjects, _internalObjectsByType, type);
1916 for(unsigned int i = 0; i < infos.size(); ++i)
1917 {
1918 proxies.push_back(infos[i].proxy);
1919 }
1920 return proxies;
1921 }
1922
1923 void
checkForAddition(const ApplicationHelper & app,const IceDB::ReadWriteTxn & txn)1924 Database::checkForAddition(const ApplicationHelper& app, const IceDB::ReadWriteTxn& txn)
1925 {
1926 set<string> serverIds;
1927 set<string> adapterIds;
1928 set<Ice::Identity> objectIds;
1929
1930 app.getIds(serverIds, adapterIds, objectIds);
1931
1932 for_each(serverIds.begin(), serverIds.end(), objFunc(*this, &Database::checkServerForAddition));
1933 if(!adapterIds.empty())
1934 {
1935 for(set<string>::const_iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1936 {
1937 checkAdapterForAddition(*p, txn);
1938 }
1939 }
1940 if(!objectIds.empty())
1941 {
1942 for(set<Ice::Identity>::const_iterator p = objectIds.begin(); p != objectIds.end(); ++p)
1943 {
1944 checkObjectForAddition(*p, txn);
1945 }
1946 }
1947
1948 set<string> repGrps;
1949 set<string> adptRepGrps;
1950 app.getReplicaGroups(repGrps, adptRepGrps);
1951 for_each(adptRepGrps.begin(), adptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
1952 }
1953
1954 void
checkForUpdate(const ApplicationHelper & origApp,const ApplicationHelper & newApp,const IceDB::ReadWriteTxn & txn)1955 Database::checkForUpdate(const ApplicationHelper& origApp,
1956 const ApplicationHelper& newApp,
1957 const IceDB::ReadWriteTxn& txn)
1958 {
1959 set<string> oldSvrs, newSvrs;
1960 set<string> oldAdpts, newAdpts;
1961 set<Ice::Identity> oldObjs, newObjs;
1962
1963 origApp.getIds(oldSvrs, oldAdpts, oldObjs);
1964 newApp.getIds(newSvrs, newAdpts, newObjs);
1965
1966 Ice::StringSeq addedSvrs;
1967 set_difference(newSvrs.begin(), newSvrs.end(), oldSvrs.begin(), oldSvrs.end(), back_inserter(addedSvrs));
1968 for_each(addedSvrs.begin(), addedSvrs.end(), objFunc(*this, &Database::checkServerForAddition));
1969
1970 Ice::StringSeq addedAdpts;
1971 set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), back_inserter(addedAdpts));
1972 if(!addedAdpts.empty())
1973 {
1974 for(Ice::StringSeq::const_iterator p = addedAdpts.begin(); p != addedAdpts.end(); ++p)
1975 {
1976 checkAdapterForAddition(*p, txn);
1977 }
1978 }
1979
1980 vector<Ice::Identity> addedObjs;
1981 set_difference(newObjs.begin(), newObjs.end(), oldObjs.begin(), oldObjs.end(), back_inserter(addedObjs));
1982 if(!addedObjs.empty())
1983 {
1984 for(vector<Ice::Identity>::const_iterator p = addedObjs.begin(); p != addedObjs.end(); ++p)
1985 {
1986 checkObjectForAddition(*p, txn);
1987 }
1988 }
1989
1990 set<string> oldRepGrps, newRepGrps;
1991 set<string> oldAdptRepGrps, newAdptRepGrps;
1992 origApp.getReplicaGroups(oldRepGrps, oldAdptRepGrps);
1993 newApp.getReplicaGroups(newRepGrps, newAdptRepGrps);
1994
1995 set<string> rmRepGrps;
1996 set_difference(oldRepGrps.begin(), oldRepGrps.end(), newRepGrps.begin(),newRepGrps.end(), set_inserter(rmRepGrps));
1997 for_each(rmRepGrps.begin(), rmRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
1998
1999 set<string> addedAdptRepGrps;
2000 set_difference(newAdptRepGrps.begin(),newAdptRepGrps.end(), oldAdptRepGrps.begin(), oldAdptRepGrps.end(),
2001 set_inserter(addedAdptRepGrps));
2002 for_each(addedAdptRepGrps.begin(), addedAdptRepGrps.end(), objFunc(*this, &Database::checkReplicaGroupExists));
2003
2004 vector<string> invalidAdptRepGrps;
2005 set_intersection(rmRepGrps.begin(), rmRepGrps.end(), newAdptRepGrps.begin(), newAdptRepGrps.end(),
2006 back_inserter(invalidAdptRepGrps));
2007 if(!invalidAdptRepGrps.empty())
2008 {
2009 throw DeploymentException("couldn't find replica group `" + invalidAdptRepGrps.front() + "'");
2010 }
2011 }
2012
2013 void
checkForRemove(const ApplicationHelper & app)2014 Database::checkForRemove(const ApplicationHelper& app)
2015 {
2016 set<string> replicaGroups;
2017 set<string> adapterReplicaGroups;
2018 app.getReplicaGroups(replicaGroups, adapterReplicaGroups);
2019 for_each(replicaGroups.begin(), replicaGroups.end(), objFunc(*this, &Database::checkReplicaGroupForRemove));
2020 }
2021
2022 void
checkServerForAddition(const string & id)2023 Database::checkServerForAddition(const string& id)
2024 {
2025 if(_serverCache.has(id))
2026 {
2027 throw DeploymentException("server `" + id + "' is already registered");
2028 }
2029 }
2030
2031 void
checkAdapterForAddition(const string & id,const IceDB::ReadWriteTxn & txn)2032 Database::checkAdapterForAddition(const string& id, const IceDB::ReadWriteTxn& txn)
2033 {
2034 bool found = false;
2035 if(_adapterCache.has(id))
2036 {
2037 found = true;
2038 }
2039 else
2040 {
2041 if(_adapters.find(txn, id))
2042 {
2043 found = true;
2044 }
2045 else
2046 {
2047 if(!findByReplicaGroupId(txn, _adapters, _adaptersByGroupId, id).empty())
2048 {
2049 found = true;
2050 }
2051 }
2052 }
2053
2054 if(found)
2055 {
2056 throw DeploymentException("adapter `" + id + "' is already registered");
2057 }
2058 }
2059
2060 void
checkObjectForAddition(const Ice::Identity & objectId,const IceDB::ReadWriteTxn & txn)2061 Database::checkObjectForAddition(const Ice::Identity& objectId,
2062 const IceDB::ReadWriteTxn& txn)
2063 {
2064 bool found = false;
2065 if(_objectCache.has(objectId) || _allocatableObjectCache.has(objectId))
2066 {
2067 found = true;
2068 }
2069 else
2070 {
2071 if(_objects.find(txn, objectId))
2072 {
2073 found = true;
2074 }
2075 }
2076
2077 if(found)
2078 {
2079 throw DeploymentException("object `" + _communicator->identityToString(objectId) + "' is already registered");
2080 }
2081 }
2082
2083 void
checkReplicaGroupExists(const string & replicaGroup)2084 Database::checkReplicaGroupExists(const string& replicaGroup)
2085 {
2086 ReplicaGroupEntryPtr entry;
2087 try
2088 {
2089 entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
2090 }
2091 catch(const AdapterNotExistException&)
2092 {
2093 }
2094
2095 if(!entry)
2096 {
2097 throw DeploymentException("couldn't find replica group `" + replicaGroup + "'");
2098 }
2099 }
2100
2101 void
checkReplicaGroupForRemove(const string & replicaGroup)2102 Database::checkReplicaGroupForRemove(const string& replicaGroup)
2103 {
2104 ReplicaGroupEntryPtr entry;
2105 try
2106 {
2107 entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(replicaGroup));
2108 }
2109 catch(const AdapterNotExistException&)
2110 {
2111 }
2112
2113 if(!entry)
2114 {
2115 //
2116 // This would indicate an inconsistency with the cache and
2117 // database. We don't print an error, it will be printed
2118 // when the application is actually removed.
2119 //
2120 return;
2121 }
2122
2123 if(entry->hasAdaptersFromOtherApplications())
2124 {
2125 throw DeploymentException("couldn't remove application because the replica group `" + replicaGroup +
2126 "' is used by object adapters from other applications.");
2127 }
2128 }
2129
2130 void
load(const ApplicationHelper & app,ServerEntrySeq & entries,const string & uuid,int revision)2131 Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const string& uuid, int revision)
2132 {
2133 const NodeDescriptorDict& nodes = app.getInstance().nodes;
2134 const string application = app.getInstance().name;
2135 for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
2136 {
2137 _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
2138 }
2139
2140 const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
2141 for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
2142 {
2143 assert(!r->id.empty());
2144 _adapterCache.addReplicaGroup(*r, application);
2145 for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2146 {
2147 _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
2148 }
2149 }
2150
2151 map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
2152 for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
2153 {
2154 entries.push_back(_serverCache.add(p->second));
2155 }
2156 }
2157
2158 void
unload(const ApplicationHelper & app,ServerEntrySeq & entries)2159 Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
2160 {
2161 map<string, ServerInfo> servers = app.getServerInfos("", 0);
2162 for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
2163 {
2164 entries.push_back(_serverCache.remove(p->first, false));
2165 }
2166
2167 const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
2168 for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
2169 {
2170 for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2171 {
2172 _objectCache.remove(o->id);
2173 }
2174 _adapterCache.removeReplicaGroup(r->id);
2175 }
2176
2177 const NodeDescriptorDict& nodes = app.getInstance().nodes;
2178 const string application = app.getInstance().name;
2179 for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
2180 {
2181 _nodeCache.get(n->first)->removeDescriptor(application);
2182 }
2183 }
2184
2185 void
reload(const ApplicationHelper & oldApp,const ApplicationHelper & newApp,ServerEntrySeq & entries,const string & uuid,int revision,bool noRestart)2186 Database::reload(const ApplicationHelper& oldApp,
2187 const ApplicationHelper& newApp,
2188 ServerEntrySeq& entries,
2189 const string& uuid,
2190 int revision,
2191 bool noRestart)
2192 {
2193 const string application = oldApp.getInstance().name;
2194
2195 //
2196 // Remove destroyed servers.
2197 //
2198 map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
2199 map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision);
2200 vector<pair<bool, ServerInfo> > load;
2201 for(map<string, ServerInfo>::const_iterator p = newServers.begin(); p != newServers.end(); ++p)
2202 {
2203 map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
2204 if(q == oldServers.end())
2205 {
2206 load.push_back(make_pair(false, p->second));
2207 }
2208 else if(isServerUpdated(p->second, q->second))
2209 {
2210 _serverCache.preUpdate(p->second, noRestart);
2211 load.push_back(make_pair(true, p->second));
2212 }
2213 else
2214 {
2215 ServerEntryPtr server = _serverCache.get(p->first);
2216 server->update(q->second, noRestart); // Just update the server revision on the node.
2217 entries.push_back(server);
2218 }
2219 }
2220 for(map<string, ServerInfo>::const_iterator p = oldServers.begin(); p != oldServers.end(); ++p)
2221 {
2222 map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
2223 if(q == newServers.end())
2224 {
2225 entries.push_back(_serverCache.remove(p->first, noRestart));
2226 }
2227 }
2228
2229 //
2230 // Remove destroyed replica groups.
2231 //
2232 const ReplicaGroupDescriptorSeq& oldAdpts = oldApp.getInstance().replicaGroups;
2233 const ReplicaGroupDescriptorSeq& newAdpts = newApp.getInstance().replicaGroups;
2234 for(ReplicaGroupDescriptorSeq::const_iterator r = oldAdpts.begin(); r != oldAdpts.end(); ++r)
2235 {
2236 ReplicaGroupDescriptorSeq::const_iterator t;
2237 for(t = newAdpts.begin(); t != newAdpts.end(); ++t)
2238 {
2239 if(t->id == r->id)
2240 {
2241 break;
2242 }
2243 }
2244 for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2245 {
2246 _objectCache.remove(o->id);
2247 }
2248 if(t == newAdpts.end())
2249 {
2250 _adapterCache.removeReplicaGroup(r->id);
2251 }
2252 }
2253
2254 //
2255 // Remove all the node descriptors.
2256 //
2257 const NodeDescriptorDict& oldNodes = oldApp.getInstance().nodes;
2258 for(NodeDescriptorDict::const_iterator n = oldNodes.begin(); n != oldNodes.end(); ++n)
2259 {
2260 _nodeCache.get(n->first)->removeDescriptor(application);
2261 }
2262
2263 //
2264 // Add back node descriptors.
2265 //
2266 const NodeDescriptorDict& newNodes = newApp.getInstance().nodes;
2267 for(NodeDescriptorDict::const_iterator n = newNodes.begin(); n != newNodes.end(); ++n)
2268 {
2269 _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
2270 }
2271
2272 //
2273 // Add back replica groups.
2274 //
2275 for(ReplicaGroupDescriptorSeq::const_iterator r = newAdpts.begin(); r != newAdpts.end(); ++r)
2276 {
2277 try
2278 {
2279 ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
2280 assert(entry);
2281 entry->update(application, r->loadBalancing, r->filter);
2282 }
2283 catch(const AdapterNotExistException&)
2284 {
2285 _adapterCache.addReplicaGroup(*r, application);
2286 }
2287
2288 for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
2289 {
2290 _objectCache.add(toObjectInfo(_communicator, *o, r->id), application, "");
2291 }
2292 }
2293
2294 //
2295 // Add back servers.
2296 //
2297 for(vector<pair<bool, ServerInfo> >::const_iterator q = load.begin(); q != load.end(); ++q)
2298 {
2299 if(q->first) // Update
2300 {
2301 entries.push_back(_serverCache.postUpdate(q->second, noRestart));
2302 }
2303 else
2304 {
2305 entries.push_back(_serverCache.add(q->second));
2306 }
2307 }
2308 }
2309
2310 Ice::Long
saveApplication(const ApplicationInfo & info,const IceDB::ReadWriteTxn & txn,Ice::Long dbSerial)2311 Database::saveApplication(const ApplicationInfo& info, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
2312 {
2313 assert(dbSerial != 0 || _master);
2314 _applications.put(txn, info.descriptor.name, info);
2315 return updateSerial(txn, applicationsDbName, dbSerial);
2316 }
2317
2318 Ice::Long
removeApplication(const string & name,const IceDB::ReadWriteTxn & txn,Ice::Long dbSerial)2319 Database::removeApplication(const string& name, const IceDB::ReadWriteTxn& txn, Ice::Long dbSerial)
2320 {
2321 assert(dbSerial != 0 || _master);
2322 _applications.del(txn, name);
2323 return updateSerial(txn, applicationsDbName, dbSerial);
2324 }
2325
2326 void
checkUpdate(const ApplicationHelper & oldApp,const ApplicationHelper & newApp,const string & uuid,int revision,bool noRestart)2327 Database::checkUpdate(const ApplicationHelper& oldApp,
2328 const ApplicationHelper& newApp,
2329 const string& uuid,
2330 int revision,
2331 bool noRestart)
2332 {
2333 const string application = oldApp.getInstance().name;
2334
2335 map<string, ServerInfo> oldServers = oldApp.getServerInfos(uuid, revision);
2336 map<string, ServerInfo> newServers = newApp.getServerInfos(uuid, revision + 1);
2337
2338 map<string, ServerInfo>::const_iterator p;
2339 vector<string> servers;
2340 vector<string> reasons;
2341 vector<CheckUpdateResultPtr> results;
2342 set<string> unreachableNodes;
2343
2344 if(noRestart)
2345 {
2346 for(p = oldServers.begin(); p != oldServers.end(); ++p)
2347 {
2348 map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
2349 if(q == newServers.end())
2350 {
2351 try
2352 {
2353 ServerInfo info = p->second;
2354 info.descriptor = 0; // Clear the descriptor to indicate removal.
2355 CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(info, true);
2356 if(result)
2357 {
2358 results.push_back(result);
2359 }
2360 }
2361 catch(const NodeUnreachableException& ex)
2362 {
2363 unreachableNodes.insert(ex.name);
2364 }
2365 catch(const DeploymentException& ex)
2366 {
2367 servers.push_back(p->first);
2368 reasons.push_back(ex.reason);
2369 }
2370 }
2371 }
2372 }
2373
2374 for(p = newServers.begin(); p != newServers.end(); ++p)
2375 {
2376 map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
2377 if(q != oldServers.end() && isServerUpdated(p->second, q->second))
2378 {
2379 if(noRestart &&
2380 p->second.node == q->second.node &&
2381 isServerUpdated(p->second, q->second, true)) // Ignore properties
2382 {
2383 //
2384 // The updates are not only property updates and noRestart is required, no
2385 // need to check the server update on the node, we know already it requires
2386 // a restart.
2387 //
2388 servers.push_back(p->first);
2389 reasons.push_back("update requires the server `" + p->first + "' to be stopped");
2390 }
2391 else
2392 {
2393 //
2394 // Ask the node to check the server update.
2395 //
2396 try
2397 {
2398 CheckUpdateResultPtr result = _serverCache.get(p->first)->checkUpdate(p->second, noRestart);
2399 if(result)
2400 {
2401 results.push_back(result);
2402 }
2403 }
2404 catch(const NodeUnreachableException& ex)
2405 {
2406 unreachableNodes.insert(ex.name);
2407 }
2408 catch(const DeploymentException& ex)
2409 {
2410 servers.push_back(p->first);
2411 reasons.push_back(ex.reason);
2412 }
2413 }
2414 }
2415 }
2416
2417 for(vector<CheckUpdateResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
2418 {
2419 try
2420 {
2421 (*q)->getResult();
2422 }
2423 catch(const NodeUnreachableException& ex)
2424 {
2425 unreachableNodes.insert(ex.name);
2426 }
2427 catch(const DeploymentException& ex)
2428 {
2429 servers.push_back((*q)->getServer());
2430 reasons.push_back(ex.reason);
2431 }
2432 }
2433
2434 if(noRestart)
2435 {
2436 if(!servers.empty() || !unreachableNodes.empty())
2437 {
2438 if(_traceLevels->application > 0)
2439 {
2440 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
2441 out << "check for application `" << application << "' update failed:";
2442 if(!unreachableNodes.empty())
2443 {
2444 #if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
2445 Ice::StringSeq nodes;
2446 for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
2447 {
2448 nodes.push_back(*r);
2449 }
2450 #else
2451 Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
2452 #endif
2453 if(nodes.size() == 1)
2454 {
2455 out << "\nthe node `" << nodes[0] << "' is down";
2456 }
2457 else
2458 {
2459 out << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
2460 }
2461 }
2462 if(!reasons.empty())
2463 {
2464 for(vector<string>::const_iterator r = reasons.begin(); r != reasons.end(); ++r)
2465 {
2466 out << "\n" << *r;
2467 }
2468 }
2469 }
2470
2471 ostringstream os;
2472 os << "check for application `" << application << "' update failed:";
2473 if(!servers.empty())
2474 {
2475 if(servers.size() == 1)
2476 {
2477 os << "\nthe server `" << servers[0] << "' would need to be stopped";
2478 }
2479 else
2480 {
2481 os << "\nthe servers `" << toString(servers, ", ") << "' would need to be stopped";
2482 }
2483 }
2484 if(!unreachableNodes.empty())
2485 {
2486 #if defined(__SUNPRO_CC) && defined(_RWSTD_NO_MEMBER_TEMPLATES)
2487 Ice::StringSeq nodes;
2488 for(set<string>::const_iterator r = unreachableNodes.begin(); r != unreachableNodes.end(); ++r)
2489 {
2490 nodes.push_back(*r);
2491 }
2492 #else
2493 Ice::StringSeq nodes(unreachableNodes.begin(), unreachableNodes.end());
2494 #endif
2495 if(nodes.size() == 1)
2496 {
2497 os << "\nthe node `" << nodes[0] << "' is down";
2498 }
2499 else
2500 {
2501 os << "\nthe nodes `" << toString(nodes, ", ") << "' are down";
2502 }
2503 }
2504 throw DeploymentException(os.str());
2505 }
2506 }
2507 else if(!reasons.empty())
2508 {
2509 ostringstream os;
2510 os << "check for application `" << application << "' update failed:";
2511 for(vector<string>::const_iterator r = reasons.begin(); r != reasons.end(); ++r)
2512 {
2513 os << "\n" << *r;
2514 }
2515 throw DeploymentException(os.str());
2516 }
2517 }
2518
2519 void
finishApplicationUpdate(const ApplicationUpdateInfo & update,const ApplicationInfo & oldApp,const ApplicationHelper & previousAppHelper,const ApplicationHelper & appHelper,AdminSessionI *,bool noRestart,Ice::Long dbSerial)2520 Database::finishApplicationUpdate(const ApplicationUpdateInfo& update,
2521 const ApplicationInfo& oldApp,
2522 const ApplicationHelper& previousAppHelper,
2523 const ApplicationHelper& appHelper,
2524 AdminSessionI* /*session*/,
2525 bool noRestart,
2526 Ice::Long dbSerial)
2527 {
2528 const ApplicationDescriptor& newDesc = appHelper.getDefinition();
2529
2530 ServerEntrySeq entries;
2531 int serial = 0;
2532 try
2533 {
2534 if(_master)
2535 {
2536 checkUpdate(previousAppHelper, appHelper, oldApp.uuid, oldApp.revision, noRestart);
2537 }
2538
2539 Lock sync(*this);
2540
2541 IceDB::ReadWriteTxn txn(_env);
2542
2543 checkForUpdate(previousAppHelper, appHelper, txn);
2544 reload(previousAppHelper, appHelper, entries, oldApp.uuid, oldApp.revision + 1, noRestart);
2545
2546 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
2547
2548 ApplicationInfo info = oldApp;
2549 info.updateTime = update.updateTime;
2550 info.updateUser = update.updateUser;
2551 info.revision = update.revision;
2552 info.descriptor = newDesc;
2553 dbSerial = saveApplication(info, txn, dbSerial);
2554
2555 txn.commit();
2556
2557 serial = _applicationObserverTopic->applicationUpdated(dbSerial, update);
2558 }
2559 catch(const DeploymentException&)
2560 {
2561 finishUpdating(update.descriptor.name);
2562 throw;
2563 }
2564 catch(const IceDB::LMDBException& ex)
2565 {
2566 logError(_communicator, ex);
2567 throw;
2568 }
2569
2570 _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for replicas to be updated.
2571
2572 //
2573 // Mark the application as updated. All the replicas received the update so it's now safe
2574 // for the nodes to start servers.
2575 //
2576 {
2577 Lock sync(*this);
2578 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
2579 assert(p != _updating.end());
2580 p->markUpdated();
2581 }
2582
2583 if(_master)
2584 {
2585 try
2586 {
2587 for(ServerEntrySeq::const_iterator p = entries.begin(); p != entries.end(); ++p)
2588 {
2589 try
2590 {
2591 (*p)->waitForSync();
2592 }
2593 catch(const NodeUnreachableException&)
2594 {
2595 // Ignore.
2596 }
2597 }
2598 }
2599 catch(const DeploymentException&)
2600 {
2601 ApplicationUpdateInfo newUpdate;
2602 {
2603 Lock sync(*this);
2604 entries.clear();
2605 ApplicationHelper previous(_communicator, newDesc);
2606 ApplicationHelper helper(_communicator, oldApp.descriptor);
2607
2608 ApplicationInfo info = oldApp;
2609 info.revision = update.revision + 1;
2610
2611 try
2612 {
2613 IceDB::ReadWriteTxn txn(_env);
2614 dbSerial = saveApplication(info, txn);
2615 txn.commit();
2616 }
2617 catch(const IceDB::LMDBException& ex)
2618 {
2619 logError(_communicator, ex);
2620 }
2621
2622 reload(previous, helper, entries, info.uuid, info.revision, noRestart);
2623
2624 newUpdate.updateTime = IceUtil::Time::now().toMilliSeconds();
2625 newUpdate.updateUser = _lockUserId;
2626 newUpdate.revision = info.revision;
2627 newUpdate.descriptor = helper.diff(previous);
2628
2629 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), update.descriptor.name);
2630 assert(p != _updating.end());
2631 p->unmarkUpdated();
2632
2633 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
2634
2635 serial = _applicationObserverTopic->applicationUpdated(dbSerial, newUpdate);
2636 }
2637 _applicationObserverTopic->waitForSyncedSubscribers(serial); // Wait for subscriber to be updated.
2638 for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitForSyncNoThrow));
2639 finishUpdating(newDesc.name);
2640 throw;
2641 }
2642 }
2643
2644 if(_traceLevels->application > 0)
2645 {
2646 Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
2647 out << "updated application `" << update.descriptor.name << "' (serial = `" << dbSerial << "')";
2648 }
2649 finishUpdating(update.descriptor.name);
2650 }
2651
2652 void
waitForUpdate(const string & name)2653 Database::waitForUpdate(const string& name)
2654 {
2655 while(find(_updating.begin(), _updating.end(), name) != _updating.end())
2656 {
2657 wait();
2658 }
2659 }
2660
2661 void
startUpdating(const string & name,const string & uuid,int revision)2662 Database::startUpdating(const string& name, const string& uuid, int revision)
2663 {
2664 // Must be called within the synchronization.
2665 assert(find(_updating.begin(), _updating.end(), name) == _updating.end());
2666 _updating.push_back(UpdateInfo(name, uuid, revision));
2667 }
2668
2669 void
finishUpdating(const string & name)2670 Database::finishUpdating(const string& name)
2671 {
2672 Lock sync(*this);
2673
2674 vector<UpdateInfo>::iterator p = find(_updating.begin(), _updating.end(), name);
2675 assert(p != _updating.end());
2676 p->markUpdated();
2677 _updating.erase(p);
2678 notifyAll();
2679 }
2680
2681 Ice::Long
getSerial(const IceDB::Txn & txn,const string & dbName)2682 Database::getSerial(const IceDB::Txn& txn, const string& dbName)
2683 {
2684 Ice::Long serial = 1;
2685 _serials.get(txn, dbName, serial);
2686 return serial;
2687 }
2688
2689 Ice::Long
updateSerial(const IceDB::ReadWriteTxn & txn,const string & dbName,Ice::Long serial)2690 Database::updateSerial(const IceDB::ReadWriteTxn& txn, const string& dbName, Ice::Long serial)
2691 {
2692 if(serial == -1) // The master we are talking to doesn't support serials (old IceGrid versions)
2693 {
2694 return -1;
2695 }
2696
2697 //
2698 // If a serial number is set, just update the serial number from the database,
2699 // otherwise if the serial is 0, we increment the serial from the database.
2700 //
2701 if(serial > 0)
2702 {
2703 _serials.put(txn, dbName, serial);
2704 return serial;
2705 }
2706 else
2707 {
2708 Ice::Long dbSerial = getSerial(txn, dbName) + 1;
2709 _serials.put(txn, dbName, dbSerial);
2710 return dbSerial;
2711 }
2712 }
2713
2714 void
addAdapter(const IceDB::ReadWriteTxn & txn,const AdapterInfo & info)2715 Database::addAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
2716 {
2717 _adapters.put(txn, info.id, info);
2718 _adaptersByGroupId.put(txn, info.replicaGroupId, info.id);
2719 }
2720
2721 void
deleteAdapter(const IceDB::ReadWriteTxn & txn,const AdapterInfo & info)2722 Database::deleteAdapter(const IceDB::ReadWriteTxn& txn, const AdapterInfo& info)
2723 {
2724
2725 _adapters.del(txn, info.id);
2726 _adaptersByGroupId.del(txn, info.replicaGroupId, info.id);
2727 }
2728
2729 void
addObject(const IceDB::ReadWriteTxn & txn,const ObjectInfo & info,bool internal)2730 Database::addObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
2731 {
2732 if(internal)
2733 {
2734 _internalObjects.put(txn, info.proxy->ice_getIdentity(), info);
2735 _internalObjectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
2736 }
2737 else
2738 {
2739 try
2740 {
2741 _objects.put(txn, info.proxy->ice_getIdentity(), info);
2742 }
2743 catch(const IceDB::KeyTooLongException& ex)
2744 {
2745 throw DeploymentException("object identity `" +
2746 _communicator->identityToString(info.proxy->ice_getIdentity())
2747 + "' is too long: " + ex.what());
2748 }
2749 try
2750 {
2751 _objectsByType.put(txn, info.type, info.proxy->ice_getIdentity());
2752 }
2753 catch(const IceDB::KeyTooLongException& ex)
2754 {
2755 throw DeploymentException("object type `" + info.type + "' is too long: " + ex.what());
2756 }
2757 }
2758 }
2759
2760 void
deleteObject(const IceDB::ReadWriteTxn & txn,const ObjectInfo & info,bool internal)2761 Database::deleteObject(const IceDB::ReadWriteTxn& txn, const ObjectInfo& info, bool internal)
2762 {
2763 if(internal)
2764 {
2765 _internalObjects.del(txn, info.proxy->ice_getIdentity());
2766 _internalObjectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
2767 }
2768 else
2769 {
2770 _objects.del(txn, info.proxy->ice_getIdentity());
2771 _objectsByType.del(txn, info.type, info.proxy->ice_getIdentity());
2772 }
2773 }
2774