1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4
5 #include <IceUtil/DisableWarnings.h>
6 #include <Ice/LoggerUtil.h>
7 #include <Ice/Communicator.h>
8 #include <Ice/LocalException.h>
9 #include <IceGrid/ServerCache.h>
10 #include <IceGrid/NodeCache.h>
11 #include <IceGrid/AdapterCache.h>
12 #include <IceGrid/ObjectCache.h>
13 #include <IceGrid/AllocatableObjectCache.h>
14 #include <IceGrid/SessionI.h>
15 #include <IceGrid/DescriptorHelper.h>
16 #include <IceGrid/Topics.h>
17
18 using namespace std;
19 using namespace IceGrid;
20
21 namespace IceGrid
22 {
23
24 struct AddCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void>
25 {
AddCommunicatorIceGrid::AddCommunicator26 AddCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry, const string& application) :
27 _serverCache(serverCache), _entry(entry), _application(application)
28 {
29 }
30
31 void
operator ()IceGrid::AddCommunicator32 operator()(const CommunicatorDescriptorPtr& desc)
33 {
34 _serverCache.addCommunicator(0, desc, _entry, _application);
35 }
36
37 void
operator ()IceGrid::AddCommunicator38 operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc)
39 {
40 _serverCache.addCommunicator(oldDesc, newDesc, _entry, _application);
41 }
42
43 ServerCache& _serverCache;
44 const ServerEntryPtr _entry;
45 const string _application;
46 };
47
48 struct RemoveCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void>
49 {
RemoveCommunicatorIceGrid::RemoveCommunicator50 RemoveCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry) :
51 _serverCache(serverCache), _entry(entry)
52 {
53 }
54
55 void
operator ()IceGrid::RemoveCommunicator56 operator()(const CommunicatorDescriptorPtr& desc)
57 {
58 _serverCache.removeCommunicator(desc, 0, _entry);
59 }
60
61 void
operator ()IceGrid::RemoveCommunicator62 operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc)
63 {
64 _serverCache.removeCommunicator(oldDesc, newDesc, _entry);
65 }
66
67 ServerCache& _serverCache;
68 const ServerEntryPtr _entry;
69 };
70
71 }
72
CheckUpdateResult(const string & server,const string & node,bool noRestart,bool remove,const Ice::AsyncResultPtr & result)73 CheckUpdateResult::CheckUpdateResult(const string& server,
74 const string& node,
75 bool noRestart,
76 bool remove,
77 const Ice::AsyncResultPtr& result) :
78 _server(server), _node(node), _remove(remove), _noRestart(noRestart), _result(result)
79 {
80 }
81
82 bool
getResult()83 CheckUpdateResult::getResult()
84 {
85 try
86 {
87 return ServerPrx::uncheckedCast(_result->getProxy())->end_checkUpdate(_result);
88 }
89 catch(const DeploymentException& ex)
90 {
91 ostringstream os;
92 if(_remove)
93 {
94 os << "check for server `" << _server << "' remove failed: " << ex.reason;
95 }
96 else
97 {
98 os << "check for server `" << _server << "' update failed: " << ex.reason;
99 }
100 throw DeploymentException(os.str());
101 }
102 catch(const Ice::OperationNotExistException&)
103 {
104 if(_noRestart)
105 {
106 throw DeploymentException("server `" + _server + "' doesn't support check for updates");
107 }
108 return false;
109 }
110 catch(const Ice::Exception& ex)
111 {
112 ostringstream os;
113 os << ex;
114 throw NodeUnreachableException(_node, os.str());
115 }
116 }
117
ServerCache(const Ice::CommunicatorPtr & communicator,const string & instanceName,NodeCache & nodeCache,AdapterCache & adapterCache,ObjectCache & objectCache,AllocatableObjectCache & allocatableObjectCache)118 ServerCache::ServerCache(const Ice::CommunicatorPtr& communicator,
119 const string& instanceName,
120 NodeCache& nodeCache,
121 AdapterCache& adapterCache,
122 ObjectCache& objectCache,
123 AllocatableObjectCache& allocatableObjectCache) :
124 _communicator(communicator),
125 _instanceName(instanceName),
126 _nodeCache(nodeCache),
127 _adapterCache(adapterCache),
128 _objectCache(objectCache),
129 _allocatableObjectCache(allocatableObjectCache)
130 {
131 }
132
133 ServerEntryPtr
add(const ServerInfo & info)134 ServerCache::add(const ServerInfo& info)
135 {
136 Lock sync(*this);
137
138 ServerEntryPtr entry = getImpl(info.descriptor->id);
139 if(!entry)
140 {
141 entry = new ServerEntry(*this, info.descriptor->id);
142 addImpl(info.descriptor->id, entry);
143 }
144 entry->update(info, false);
145
146 _nodeCache.get(info.node, true)->addServer(entry);
147
148 forEachCommunicator(AddCommunicator(*this, entry, info.application))(info.descriptor);
149
150 if(_traceLevels && _traceLevels->server > 0)
151 {
152 Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
153 out << "added server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')";
154 }
155
156 return entry;
157 }
158
159 ServerEntryPtr
get(const string & id) const160 ServerCache::get(const string& id) const
161 {
162 Lock sync(*this);
163 ServerEntryPtr entry = getImpl(id);
164 if(!entry)
165 {
166 throw ServerNotExistException(id);
167 }
168 return entry;
169 }
170
171 bool
has(const string & id) const172 ServerCache::has(const string& id) const
173 {
174 Lock sync(*this);
175 ServerEntryPtr entry = getImpl(id);
176 return entry && !entry->isDestroyed();
177 }
178
179 ServerEntryPtr
remove(const string & id,bool noRestart)180 ServerCache::remove(const string& id, bool noRestart)
181 {
182 Lock sync(*this);
183
184 ServerEntryPtr entry = getImpl(id);
185 assert(entry);
186
187 ServerInfo info = entry->getInfo();
188 forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor);
189
190 _nodeCache.get(info.node)->removeServer(entry);
191
192 entry->destroy(noRestart); // This must be done after otherwise some allocatable objects
193 // might allocate a destroyed server.
194
195 if(_traceLevels && _traceLevels->server > 0)
196 {
197 Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
198 out << "removed server `" << id << "'";
199 }
200
201 return entry;
202 }
203
204 void
preUpdate(const ServerInfo & newInfo,bool noRestart)205 ServerCache::preUpdate(const ServerInfo& newInfo, bool noRestart)
206 {
207 Lock sync(*this);
208
209 const string& id = newInfo.descriptor->id;
210 ServerEntryPtr entry = getImpl(id);
211 assert(entry);
212
213 if(!noRestart)
214 {
215 ServerInfo info = entry->getInfo();
216 forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor, newInfo.descriptor);
217 _nodeCache.get(info.node)->removeServer(entry);
218 }
219
220 if(_traceLevels && _traceLevels->server > 0)
221 {
222 Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
223 out << "updating server `" << id << "'";
224 if(noRestart)
225 {
226 out << " with no restart";
227 }
228 }
229 }
230
231 ServerEntryPtr
postUpdate(const ServerInfo & info,bool noRestart)232 ServerCache::postUpdate(const ServerInfo& info, bool noRestart)
233 {
234 Lock sync(*this);
235
236 ServerEntryPtr entry = getImpl(info.descriptor->id);
237 assert(entry);
238
239 ServerInfo oldInfo = entry->getInfo();
240 entry->update(info, noRestart);
241
242 if(!noRestart)
243 {
244 _nodeCache.get(info.node, true)->addServer(entry);
245 forEachCommunicator(AddCommunicator(*this, entry, info.application))(oldInfo.descriptor, info.descriptor);
246 }
247
248 if(_traceLevels && _traceLevels->server > 0)
249 {
250 Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
251 out << "updated server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')";
252 }
253
254 return entry;
255 }
256
257 void
clear(const string & id)258 ServerCache::clear(const string& id)
259 {
260 Lock sync(*this);
261 CacheByString<ServerEntry>::removeImpl(id);
262 }
263
264 void
setNodeObserverTopic(const NodeObserverTopicPtr & nodeObserverTopic)265 ServerCache::setNodeObserverTopic(const NodeObserverTopicPtr& nodeObserverTopic)
266 {
267 _nodeObserverTopic = nodeObserverTopic;
268 }
269
270 void
addCommunicator(const CommunicatorDescriptorPtr & oldDesc,const CommunicatorDescriptorPtr & newDesc,const ServerEntryPtr & server,const string & application)271 ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc,
272 const CommunicatorDescriptorPtr& newDesc,
273 const ServerEntryPtr& server,
274 const string& application)
275 {
276 if(!newDesc)
277 {
278 return; // Nothing to add
279 }
280 for(AdapterDescriptorSeq::const_iterator q = newDesc->adapters.begin() ; q != newDesc->adapters.end(); ++q)
281 {
282 AdapterDescriptor oldAdpt;
283 if(oldDesc)
284 {
285 for(AdapterDescriptorSeq::const_iterator p = oldDesc->adapters.begin() ; p != oldDesc->adapters.end(); ++p)
286 {
287 if(p->id == q->id)
288 {
289 oldAdpt = *p;
290 break;
291 }
292 }
293 }
294 assert(!q->id.empty());
295 _adapterCache.addServerAdapter(*q, server, application);
296
297 for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
298 {
299 _objectCache.add(toObjectInfo(_communicator, *r, q->id), application, server->getId());
300 }
301 for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r)
302 {
303 ObjectDescriptorSeq::const_iterator s;
304 for(s = oldAdpt.allocatables.begin(); s != oldAdpt.allocatables.end() && s->id != r->id; ++s);
305 if(s == oldAdpt.allocatables.end() || *s != *r) // Only add new or updated allocatables
306 {
307 _allocatableObjectCache.add(toObjectInfo(_communicator, *r, q->id), server);
308 }
309 }
310 }
311 }
312
313 void
removeCommunicator(const CommunicatorDescriptorPtr & oldDesc,const CommunicatorDescriptorPtr & newDesc,const ServerEntryPtr &)314 ServerCache::removeCommunicator(const CommunicatorDescriptorPtr& oldDesc,
315 const CommunicatorDescriptorPtr& newDesc,
316 const ServerEntryPtr& /*entry*/)
317 {
318 if(!oldDesc)
319 {
320 return; // Nothing to remove
321 }
322 for(AdapterDescriptorSeq::const_iterator q = oldDesc->adapters.begin() ; q != oldDesc->adapters.end(); ++q)
323 {
324 AdapterDescriptor newAdpt;
325 if(newDesc)
326 {
327 for(AdapterDescriptorSeq::const_iterator p = newDesc->adapters.begin() ; p != newDesc->adapters.end(); ++p)
328 {
329 if(p->id == q->id)
330 {
331 newAdpt = *p;
332 break;
333 }
334 }
335 }
336
337 for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
338 {
339 _objectCache.remove((*r).id);
340 }
341 for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r)
342 {
343 // Don't remove the allocatable if it's still in the new descriptor.
344 ObjectDescriptorSeq::const_iterator s;
345 for(s = newAdpt.allocatables.begin(); s != newAdpt.allocatables.end() && s->id != r->id; ++s);
346 if(s == newAdpt.allocatables.end() || *s != *r) // Only removed updated or removed allocatables
347 {
348 _allocatableObjectCache.remove(r->id);
349 }
350 }
351 _adapterCache.removeServerAdapter(q->id);
352 }
353 }
354
ServerEntry(ServerCache & cache,const string & id)355 ServerEntry::ServerEntry(ServerCache& cache, const string& id) :
356 Allocatable(false, 0),
357 _cache(cache),
358 _id(id),
359 _activationTimeout(-1),
360 _deactivationTimeout(-1),
361 _synchronizing(false),
362 _updated(false),
363 _noRestart(false)
364 {
365 }
366
367 void
sync()368 ServerEntry::sync()
369 {
370 syncImpl();
371 }
372
373 void
waitForSync(int timeout)374 ServerEntry::waitForSync(int timeout)
375 {
376 waitImpl(timeout);
377 }
378
379 void
waitForSyncNoThrow(int timeout)380 ServerEntry::waitForSyncNoThrow(int timeout)
381 {
382 try
383 {
384 waitImpl(timeout);
385 }
386 catch(const SynchronizationException&)
387 {
388 assert(timeout >= 0);
389 }
390 catch(const Ice::Exception&)
391 {
392 }
393 }
394
395 void
unsync()396 ServerEntry::unsync()
397 {
398 Lock sync(*this);
399 if(_loaded.get())
400 {
401 _load.reset(_loaded.release());
402 }
403 _proxy = 0;
404 _adapters.clear();
405 _activationTimeout = -1;
406 _deactivationTimeout = -1;
407 }
408
409 bool
addSyncCallback(const SynchronizationCallbackPtr & callback)410 ServerEntry::addSyncCallback(const SynchronizationCallbackPtr& callback)
411 {
412 Lock sync(*this);
413 if(!_loaded.get() && !_load.get())
414 {
415 throw ServerNotExistException();
416 }
417 if(_synchronizing)
418 {
419 _callbacks.push_back(callback);
420 }
421 return _synchronizing;
422 }
423
424 void
update(const ServerInfo & info,bool noRestart)425 ServerEntry::update(const ServerInfo& info, bool noRestart)
426 {
427 Lock sync(*this);
428
429 IceInternal::UniquePtr<ServerInfo> descriptor(new ServerInfo());
430 *descriptor = info;
431
432 _updated = true;
433
434 if(!_destroy.get())
435 {
436 if(_loaded.get() && descriptor->node != _loaded->node)
437 {
438 _destroy.reset(_loaded.release());
439 }
440 else if(_load.get() && descriptor->node != _load->node)
441 {
442 _destroy.reset(_load.release());
443 }
444 }
445
446 _load.reset(descriptor.release());
447 _noRestart = noRestart;
448 _loaded.reset();
449 _allocatable = info.descriptor->allocatable;
450 if(info.descriptor->activation == "session")
451 {
452 _allocatable = true;
453 _load->sessionId = _allocationSession ? _allocationSession->getId() : string("");
454 }
455 }
456
457 void
destroy(bool noRestart)458 ServerEntry::destroy(bool noRestart)
459 {
460 Lock sync(*this);
461
462 _updated = true;
463
464 assert(_loaded.get() || _load.get());
465 if(!_destroy.get())
466 {
467 if(_loaded.get())
468 {
469 assert(!_destroy.get());
470 _destroy.reset(_loaded.release());
471 }
472 else if(_load.get())
473 {
474 assert(!_destroy.get());
475 _destroy.reset(_load.release());
476 }
477 }
478
479 _noRestart = noRestart;
480 _load.reset();
481 _loaded.reset();
482 _allocatable = false;
483 }
484
485 ServerInfo
getInfo(bool resolve) const486 ServerEntry::getInfo(bool resolve) const
487 {
488 ServerInfo info;
489 SessionIPtr session;
490 {
491 Lock sync(*this);
492 if(!_loaded.get() && !_load.get())
493 {
494 throw ServerNotExistException();
495 }
496 info = _loaded.get() ? *_loaded : *_load;
497 session = _allocationSession;
498 }
499 assert(info.descriptor);
500 if(resolve)
501 {
502 try
503 {
504 return _cache.getNodeCache().get(info.node)->getServerInfo(info, session);
505 }
506 catch(const DeploymentException&)
507 {
508 }
509 catch(const NodeNotExistException&)
510 {
511 }
512 catch(const NodeUnreachableException&)
513 {
514 }
515 }
516 return info;
517 }
518
519 string
getId() const520 ServerEntry::getId() const
521 {
522 return _id;
523 }
524
525 ServerPrx
getProxy(bool upToDate,int timeout)526 ServerEntry::getProxy(bool upToDate, int timeout)
527 {
528 //
529 // NOTE: this might throw ServerNotExistException, NodeUnreachableException
530 // or DeploymentException.
531 //
532
533 int actTimeout, deactTimeout;
534 string node;
535 return getProxy(actTimeout, deactTimeout, node, upToDate, timeout);
536 }
537
538 ServerPrx
getProxy(int & activationTimeout,int & deactivationTimeout,string & node,bool upToDate,int timeout)539 ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& node, bool upToDate, int timeout)
540 {
541 //
542 // NOTE: this might throw ServerNotExistException, NodeUnreachableException
543 // or DeploymentException.
544 //
545 while(true)
546 {
547 {
548 Lock sync(*this);
549 if(_loaded.get() || (_proxy && _synchronizing && !upToDate)) // Synced or if not up to date is fine
550 {
551 assert(_loaded.get() || _load.get() || _destroy.get());
552 activationTimeout = _activationTimeout;
553 deactivationTimeout = _deactivationTimeout;
554 node = _loaded.get() ? _loaded->node : (_load.get() ? _load->node : _destroy->node);
555 return _proxy;
556 }
557 else if(!_load.get() && !_destroy.get())
558 {
559 throw ServerNotExistException(_id);
560 }
561 }
562
563 syncImpl();
564 waitImpl(timeout);
565 }
566 }
567
568 Ice::ObjectPrx
getAdminProxy()569 ServerEntry::getAdminProxy()
570 {
571 //
572 // The category must match the server admin category used by nodes
573 //
574 Ice::Identity adminId;
575 adminId.name = _id;
576 adminId.category = _cache.getInstanceName() + "-NodeServerAdminRouter";
577 return getProxy(true)->ice_identity(adminId);
578 }
579
580 AdapterPrx
getAdapter(const string & id,bool upToDate)581 ServerEntry::getAdapter(const string& id, bool upToDate)
582 {
583 //
584 // NOTE: this might throw AdapterNotExistException, NodeUnreachableException
585 // or DeploymentException.
586 //
587
588 int activationTimeout, deactivationTimeout;
589 return getAdapter(activationTimeout, deactivationTimeout, id, upToDate);
590 }
591
592 AdapterPrx
getAdapter(int & activationTimeout,int & deactivationTimeout,const string & id,bool upToDate)593 ServerEntry::getAdapter(int& activationTimeout, int& deactivationTimeout, const string& id, bool upToDate)
594 {
595 //
596 // NOTE: this might throw AdapterNotExistException, NodeUnreachableException
597 // or DeploymentException.
598 //
599 while(true)
600 {
601 {
602 Lock sync(*this);
603 if(_loaded.get() || (_proxy && _synchronizing && !upToDate)) // Synced or if not up to date is fine
604 {
605 AdapterPrxDict::const_iterator p = _adapters.find(id);
606 if(p != _adapters.end())
607 {
608 assert(p->second);
609 activationTimeout = _activationTimeout;
610 deactivationTimeout = _deactivationTimeout;
611 return p->second;
612 }
613 else
614 {
615 throw AdapterNotExistException(id);
616 }
617 }
618 else if(!_load.get() && !_destroy.get())
619 {
620 throw AdapterNotExistException(id);
621 }
622 }
623
624 syncImpl();
625 waitImpl(0); // Don't wait, just check for the result or throw SynchronizationException
626 }
627 }
628
629 float
getLoad(LoadSample sample) const630 ServerEntry::getLoad(LoadSample sample) const
631 {
632 string application;
633 string node;
634 {
635 Lock sync(*this);
636 if(_loaded.get())
637 {
638 application = _loaded->application;
639 node = _loaded->node;
640 }
641 else if(_load.get())
642 {
643 application = _load->application;
644 node = _load->node;
645 }
646 else
647 {
648 throw ServerNotExistException();
649 }
650 }
651
652 float factor;
653 LoadInfo load = _cache.getNodeCache().get(node)->getLoadInfoAndLoadFactor(application, factor);
654 switch(sample)
655 {
656 case LoadSample1:
657 return load.avg1 < 0.f ? 1.0f : load.avg1 * factor;
658 case LoadSample5:
659 return load.avg5 < 0.f ? 1.0f : load.avg5 * factor;
660 case LoadSample15:
661 return load.avg15 < 0.f ? 1.0f : load.avg15 * factor;
662 default:
663 assert(false);
664 return 1.0f;
665 }
666 }
667
//
// Starts a synchronization of the server with its node, unless one is
// already in progress.
//
// Under the lock, the info to destroy (if any) or else the info to load is
// copied to locals and the entry is flagged as synchronizing. The node
// entry is then asked, outside the lock, to destroy or load the server. If
// the node isn't known to the node cache, exception() is called with a
// NodeUnreachableException.
//
void
ServerEntry::syncImpl()
{
    ServerInfo load;
    SessionIPtr session;
    ServerInfo destroy;
    int timeout = -1;
    bool noRestart = false;

    {
        Lock sync(*this);
        if(_synchronizing)
        {
            return; // A synchronization is already in progress.
        }

        if(!_load.get() && !_destroy.get())
        {
            _load.reset(_loaded.release()); // Re-load the current server.
        }

        _updated = false;
        _exception.reset(0);

        // A pending destroy takes precedence over a pending load.
        if(_destroy.get())
        {
            destroy = *_destroy;
            timeout = _deactivationTimeout;
        }
        else if(_load.get())
        {
            load = *_load;
            session = _allocationSession;
            timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
        }
        else
        {
            return; // Nothing to destroy and nothing to load.
        }

        noRestart = _noRestart;
        _synchronizing = true;
    }

    // At most one of destroy.descriptor / load.descriptor was set above.
    if(destroy.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(destroy.node, "node is not active"));
        }
    }
    else if(load.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(load.node, "node is not active"));
        }
    }
}
735
736 void
waitImpl(int timeout)737 ServerEntry::waitImpl(int timeout)
738 {
739 Lock sync(*this);
740 if(timeout != 0)
741 {
742 while(_synchronizing)
743 {
744 if(timeout > 0)
745 {
746 if(!timedWait(IceUtil::Time::seconds(timeout)))
747 {
748 break; // Timeout
749 }
750 }
751 else
752 {
753 wait();
754 }
755 }
756 }
757 if(_synchronizing) // If we are still synchronizing, throw SynchronizationException
758 {
759 throw SynchronizationException(__FILE__, __LINE__);
760 }
761
762 if(_exception.get())
763 {
764 try
765 {
766 _exception->ice_throw();
767 }
768 catch(const DeploymentException&)
769 {
770 throw;
771 }
772 catch(const NodeUnreachableException&)
773 {
774 throw;
775 }
776 catch(const Ice::Exception& ex) // This shouln't happen.
777 {
778 ostringstream os;
779 os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex;
780 TraceLevelsPtr traceLevels = _cache.getTraceLevels();
781 if(traceLevels)
782 {
783 Ice::Error err(traceLevels->logger);
784 err << os.str();
785 }
786 throw DeploymentException(os.str());
787 }
788 }
789 }
790
791 void
synchronized()792 ServerEntry::synchronized()
793 {
794 vector<SynchronizationCallbackPtr> callbacks;
795 {
796 Lock sync(*this);
797 _callbacks.swap(callbacks);
798 }
799 for(vector<SynchronizationCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
800 {
801 try
802 {
803 (*p)->synchronized();
804 }
805 catch(...)
806 {
807 assert(false);
808 }
809 }
810 }
811
812 void
synchronized(const Ice::Exception & ex)813 ServerEntry::synchronized(const Ice::Exception& ex)
814 {
815 vector<SynchronizationCallbackPtr> callbacks;
816 {
817 Lock sync(*this);
818 _callbacks.swap(callbacks);
819 }
820 for(vector<SynchronizationCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
821 {
822 try
823 {
824 (*p)->synchronized(ex);
825 }
826 catch(...)
827 {
828 assert(false);
829 }
830 }
831 }
832
//
// Called when a load of the server has completed.
//
// proxy - proxy of the loaded server.
// adpts - proxies of the server adapters, indexed by adapter id.
// at/dt - activation and deactivation timeouts.
//
// If the entry wasn't updated in the meantime, the results are stored, the
// synchronization ends and waiting threads and callbacks are notified.
// Otherwise the results are dropped and a new destroy or load is started
// with the pending information.
//
void
ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, int at, int dt)
{
    ServerInfo load;
    SessionIPtr session;
    ServerInfo destroy;
    int timeout = -1;
    bool synced = false;
    bool noRestart = false;

    {
        Lock sync(*this);
        if(!_updated)
        {
            //
            // Set timeout on server and adapter proxies. Most of the
            // calls on the proxies shouldn't block for longer than the
            // node session timeout. Calls that might block for a longer
            // time should set the correct timeout before invoking on the
            // proxy (e.g.: server start/stop, adapter activate).
            //
            assert(_load.get());
            _loaded.reset(_load.release());
            _proxy = proxy;
            _adapters = adpts;
            _activationTimeout = at;
            _deactivationTimeout = dt;

            assert(!_destroy.get() && !_load.get());
            _synchronizing = false;
            synced = true;
            notifyAll();
        }
        else
        {
            // The entry changed while the load was in progress: pick up the
            // pending destroy (which takes precedence) or the pending load.
            _updated = false;
            if(_destroy.get())
            {
                destroy = *_destroy;
                noRestart = _noRestart;
            }
            else if(_load.get())
            {
                load = *_load;
                noRestart = _noRestart;
                session = _allocationSession;
                timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
            }
        }
    }

    if(synced)
    {
        synchronized(); // Callbacks are notified outside the lock.
        return;
    }

    assert(destroy.descriptor || load.descriptor);
    if(destroy.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(destroy.node, "node is not active"));
        }
    }
    else if(load.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(load.node, "node is not active"));
        }
    }
}
914
//
// Called when a destroy of the server has completed.
//
// The destroyed info, the proxies and the timeouts are cleared. If there
// is no info pending load, the synchronization ends, waiting threads and
// callbacks are notified and the entry is removed from the cache.
// Otherwise a load of the pending info is started.
//
void
ServerEntry::destroyCallback()
{
    ServerInfo load;
    bool noRestart = false;
    SessionIPtr session;

    {
        Lock sync(*this);
        _destroy.reset(0);
        _proxy = 0;
        _adapters.clear();
        _activationTimeout = -1;
        _deactivationTimeout = -1;

        if(!_load.get())
        {
            assert(!_load.get() && !_loaded.get());
            _synchronizing = false;
            notifyAll();
        }
        else
        {
            // A load was requested while the destroy was in progress.
            _updated = false;
            load = *_load;
            noRestart = _noRestart;
            session = _allocationSession;
        }
    }

    if(load.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(load.node)->loadServer(this, load, session, -1, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(load.node, "node is not active"));
        }
    }
    else
    {
        // Nothing left to load: notify the callbacks (outside the lock) and
        // drop the entry from the cache.
        synchronized();
        _cache.clear(_id);
    }
}
962
//
// Called when a load or destroy of the server failed with `ex'.
//
// If a destroy was pending with nothing to load, or nothing to destroy was
// pending and the entry wasn't updated in the meantime, the exception is
// recorded (waitImpl() re-throws it), the proxies and timeouts are cleared,
// the synchronization ends and waiting threads and callbacks are notified;
// the entry is also removed from the cache if a destroy was pending.
// Otherwise the failure is dropped and a load of the pending info is
// started.
//
void
ServerEntry::exception(const Ice::Exception& ex)
{
    ServerInfo load;
    SessionIPtr session;
    bool noRestart = false;
    bool remove = false;
    int timeout = -1;

    {
        Lock sync(*this);
        if((_destroy.get() && !_load.get()) || (!_destroy.get() && !_updated))
        {
            remove = _destroy.get(); // True if a destroy was pending.
            _destroy.reset(0);
            _exception.reset(ex.ice_clone());
            _proxy = 0;
            _adapters.clear();
            _activationTimeout = -1;
            _deactivationTimeout = -1;
            _synchronizing = false;
            notifyAll();
        }
        else
        {
            // NOTE(review): this branch dereferences _load; it presumably
            // relies on _load being set whenever _updated is set without a
            // pending destroy -- confirm against update()/allocated().
            _destroy.reset(0);
            _updated = false;
            load = *_load.get();
            noRestart = _noRestart;
            session = _allocationSession;
            timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
        }
    }

    if(load.descriptor)
    {
        try
        {
            _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
        }
        catch(const NodeNotExistException&)
        {
            exception(NodeUnreachableException(load.node, "node is not active"));
        }
    }
    else
    {
        synchronized(ex); // Callbacks are notified outside the lock.
        if(remove)
        {
            _cache.clear(_id);
        }
    }
}
1017
1018 bool
isDestroyed()1019 ServerEntry::isDestroyed()
1020 {
1021 Lock sync(*this);
1022 return !_loaded.get() && !_load.get();
1023 }
1024
1025 bool
canRemove()1026 ServerEntry::canRemove()
1027 {
1028 Lock sync(*this);
1029 return !_loaded.get() && !_load.get() && !_destroy.get();
1030 }
1031
1032 CheckUpdateResultPtr
checkUpdate(const ServerInfo & info,bool noRestart)1033 ServerEntry::checkUpdate(const ServerInfo& info, bool noRestart)
1034 {
1035 SessionIPtr session;
1036 ServerInfo oldInfo;
1037 {
1038 Lock sync(*this);
1039 if(!_loaded.get() && !_load.get())
1040 {
1041 throw ServerNotExistException();
1042 }
1043
1044 oldInfo = _loaded.get() ? *_loaded : *_load;
1045 session = _allocationSession;
1046 }
1047
1048 NodeEntryPtr node;
1049 try
1050 {
1051 node = _cache.getNodeCache().get(oldInfo.node);
1052 }
1053 catch(const NodeNotExistException&)
1054 {
1055 throw NodeUnreachableException(info.node, "node is not active");
1056 }
1057
1058 ServerPrx server;
1059 try
1060 {
1061 server = getProxy(true, 5);
1062 }
1063 catch(const SynchronizationException&)
1064 {
1065 ostringstream os;
1066 os << "check for server `" << _id << "' update failed:";
1067 os << "timeout while waiting for the server to be loaded on the node";
1068 throw DeploymentException(os.str());
1069 }
1070 catch(const DeploymentException&)
1071 {
1072 if(noRestart)
1073 {
1074 // If the server can't be loaded and no restart is required, we throw
1075 // to indicate that the server update can't be checked.
1076 throw;
1077 }
1078 else
1079 {
1080 // Otherwise, we do as if the update is valid.
1081 return 0;
1082 }
1083 }
1084
1085 //
1086 // Provide a null descriptor if the server is to be removed from
1087 // the node. In this case, the check just ensures that the server
1088 // is stopped.
1089 //
1090 InternalServerDescriptorPtr desc;
1091 if(info.node == oldInfo.node && info.descriptor)
1092 {
1093 desc = node->getInternalServerDescriptor(info, session); // The new descriptor
1094 }
1095
1096 return new CheckUpdateResult(_id, oldInfo.node, noRestart, desc, server->begin_checkUpdate(desc, noRestart));
1097 }
1098
1099 bool
isEnabled() const1100 ServerEntry::isEnabled() const
1101 {
1102 return _cache.getNodeObserverTopic()->isServerEnabled(_id);
1103 }
1104
1105 void
allocated(const SessionIPtr & session)1106 ServerEntry::allocated(const SessionIPtr& session)
1107 {
1108 if(!_loaded.get() && !_load.get())
1109 {
1110 return;
1111 }
1112
1113 TraceLevelsPtr traceLevels = _cache.getTraceLevels();
1114 if(traceLevels && traceLevels->server > 1)
1115 {
1116 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1117 out << "server `" << _id << "' allocated by `" << session->getId() << "' (" << _count << ")";
1118 }
1119
1120 ServerDescriptorPtr desc = _loaded.get() ? _loaded->descriptor : _load->descriptor;
1121
1122 //
1123 // If the server has the session activation mode, we re-load the
1124 // server on the node as its deployment might have changed (it's
1125 // possible to use ${session.*} variable with server with the
1126 // session activation mode.
1127 //
1128 if(desc->activation == "session")
1129 {
1130 _updated = true;
1131 if(!_load.get())
1132 {
1133 _load.reset(_loaded.release());
1134 }
1135 _allocationSession = session;
1136 _load->sessionId = session->getId();
1137 }
1138
1139 Glacier2::IdentitySetPrx identitySet = session->getGlacier2IdentitySet();
1140 Glacier2::StringSetPrx adapterIdSet = session->getGlacier2AdapterIdSet();
1141 if(identitySet && adapterIdSet)
1142 {
1143 ServerHelperPtr helper = createHelper(desc);
1144 multiset<string> adapterIds;
1145 multiset<Ice::Identity> identities;
1146 helper->getIds(adapterIds, identities);
1147 try
1148 {
1149 //
1150 // SunCC won't accept the following:
1151 //
1152 // ctl->adapterIds()->add(Ice::StringSeq(adapterIds.begin(), adapterIds.end()));
1153 // ctl->identities()->add(Ice::IdentitySeq(identities.begin(), identities.end()));
1154 //
1155 Ice::StringSeq adapterIdSeq;
1156 for(multiset<string>::iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1157 {
1158 adapterIdSeq.push_back(*p);
1159 }
1160 Ice::IdentitySeq identitySeq;
1161 for(multiset<Ice::Identity>::iterator q = identities.begin(); q != identities.end(); ++q)
1162 {
1163 identitySeq.push_back(*q);
1164 }
1165 adapterIdSet->add(adapterIdSeq);
1166 identitySet->add(identitySeq);
1167 }
1168 catch(const Ice::LocalException& ex)
1169 {
1170 if(traceLevels && traceLevels->server > 0)
1171 {
1172 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1173 out << "couldn't add Glacier2 filters for server `" << _id << "' allocated by `"
1174 << session->getId() << ":\n" << ex;
1175 }
1176 }
1177 }
1178 }
1179
1180 void
allocatedNoSync(const SessionIPtr &)1181 ServerEntry::allocatedNoSync(const SessionIPtr& /*session*/)
1182 {
1183 {
1184 Lock sync(*this);
1185 if(!_updated ||
1186 (_loaded.get() && _loaded->descriptor->activation != "session") ||
1187 (_load.get() && _load->descriptor->activation != "session"))
1188 {
1189 return;
1190 }
1191 }
1192
1193 sync();
1194 waitForSyncNoThrow();
1195 }
1196
1197 void
released(const SessionIPtr & session)1198 ServerEntry::released(const SessionIPtr& session)
1199 {
1200 if(!_loaded.get() && !_load.get())
1201 {
1202 return;
1203 }
1204
1205 ServerDescriptorPtr desc = _loaded.get() ? _loaded->descriptor : _load->descriptor;
1206
1207 //
1208 // If the server has the session activation mode, we re-load the
1209 // server on the node as its deployment might have changed (it's
1210 // possible to use ${session.*} variable with server with the
1211 // session activation mode. Synchronizing the server will also
1212 // shutdown the server on the node.
1213 //
1214 if(desc->activation == "session")
1215 {
1216 _updated = true;
1217 if(!_load.get())
1218 {
1219 _load.reset(_loaded.release());
1220 }
1221 _load->sessionId = "";
1222 _allocationSession = 0;
1223 }
1224
1225 TraceLevelsPtr traceLevels = _cache.getTraceLevels();
1226
1227 Glacier2::IdentitySetPrx identitySet = session->getGlacier2IdentitySet();
1228 Glacier2::StringSetPrx adapterIdSet = session->getGlacier2AdapterIdSet();
1229 if(identitySet && adapterIdSet)
1230 {
1231 ServerHelperPtr helper = createHelper(desc);
1232 multiset<string> adapterIds;
1233 multiset<Ice::Identity> identities;
1234 helper->getIds(adapterIds, identities);
1235 try
1236 {
1237 //
1238 // SunCC won't accept the following:
1239 //
1240 // ctl->adapterIds()->remove(Ice::StringSeq(adapterIds.begin(), adapterIds.end()));
1241 // ctl->identities()->remove(Ice::IdentitySeq(identities.begin(), identities.end()));
1242 //
1243 Ice::StringSeq adapterIdSeq;
1244 for(multiset<string>::iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1245 {
1246 adapterIdSeq.push_back(*p);
1247 }
1248 Ice::IdentitySeq identitySeq;
1249 for(multiset<Ice::Identity>::iterator q = identities.begin(); q != identities.end(); ++q)
1250 {
1251 identitySeq.push_back(*q);
1252 }
1253 adapterIdSet->remove(adapterIdSeq);
1254 identitySet->remove(identitySeq);
1255 }
1256 catch(const Ice::LocalException& ex)
1257 {
1258 if(traceLevels && traceLevels->server > 0)
1259 {
1260 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1261 out << "couldn't remove Glacier2 filters for server `" << _id << "' allocated by `";
1262 out << session->getId() << ":\n" << ex;
1263 }
1264 }
1265 }
1266
1267 if(traceLevels && traceLevels->server > 1)
1268 {
1269 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1270 out << "server `" << _id << "' released by `" << session->getId() << "' (" << _count << ")";
1271 }
1272 }
1273
1274 void
releasedNoSync(const SessionIPtr &)1275 ServerEntry::releasedNoSync(const SessionIPtr& /*session*/)
1276 {
1277 {
1278 Lock sync(*this);
1279 if(!_updated ||
1280 (_loaded.get() && _loaded->descriptor->activation != "session") ||
1281 (_load.get() && _load->descriptor->activation != "session"))
1282 {
1283 return;
1284 }
1285 }
1286
1287 sync();
1288 waitForSyncNoThrow();
1289 }
1290