1 //
2 // Copyright (c) ZeroC, Inc. All rights reserved.
3 //
4 
5 #include <IceUtil/DisableWarnings.h>
6 #include <Ice/LoggerUtil.h>
7 #include <Ice/Communicator.h>
8 #include <Ice/LocalException.h>
9 #include <IceGrid/ServerCache.h>
10 #include <IceGrid/NodeCache.h>
11 #include <IceGrid/AdapterCache.h>
12 #include <IceGrid/ObjectCache.h>
13 #include <IceGrid/AllocatableObjectCache.h>
14 #include <IceGrid/SessionI.h>
15 #include <IceGrid/DescriptorHelper.h>
16 #include <IceGrid/Topics.h>
17 
18 using namespace std;
19 using namespace IceGrid;
20 
21 namespace IceGrid
22 {
23 
24     struct AddCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void>
25     {
AddCommunicatorIceGrid::AddCommunicator26         AddCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry, const string& application) :
27             _serverCache(serverCache), _entry(entry), _application(application)
28         {
29         }
30 
31         void
operator ()IceGrid::AddCommunicator32         operator()(const CommunicatorDescriptorPtr& desc)
33         {
34             _serverCache.addCommunicator(0, desc, _entry, _application);
35         }
36 
37         void
operator ()IceGrid::AddCommunicator38         operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc)
39         {
40             _serverCache.addCommunicator(oldDesc, newDesc, _entry, _application);
41         }
42 
43         ServerCache& _serverCache;
44         const ServerEntryPtr _entry;
45         const string _application;
46     };
47 
48     struct RemoveCommunicator : std::unary_function<CommunicatorDescriptorPtr&, void>
49     {
RemoveCommunicatorIceGrid::RemoveCommunicator50         RemoveCommunicator(ServerCache& serverCache, const ServerEntryPtr& entry) :
51             _serverCache(serverCache), _entry(entry)
52         {
53         }
54 
55         void
operator ()IceGrid::RemoveCommunicator56         operator()(const CommunicatorDescriptorPtr& desc)
57         {
58             _serverCache.removeCommunicator(desc, 0, _entry);
59         }
60 
61         void
operator ()IceGrid::RemoveCommunicator62         operator()(const CommunicatorDescriptorPtr& oldDesc, const CommunicatorDescriptorPtr& newDesc)
63         {
64             _serverCache.removeCommunicator(oldDesc, newDesc, _entry);
65         }
66 
67         ServerCache& _serverCache;
68         const ServerEntryPtr _entry;
69     };
70 
71 }
72 
CheckUpdateResult(const string & server,const string & node,bool noRestart,bool remove,const Ice::AsyncResultPtr & result)73 CheckUpdateResult::CheckUpdateResult(const string& server,
74                                      const string& node,
75                                      bool noRestart,
76                                      bool remove,
77                                      const Ice::AsyncResultPtr& result) :
78     _server(server), _node(node), _remove(remove), _noRestart(noRestart), _result(result)
79 {
80 }
81 
82 bool
getResult()83 CheckUpdateResult::getResult()
84 {
85     try
86     {
87         return ServerPrx::uncheckedCast(_result->getProxy())->end_checkUpdate(_result);
88     }
89     catch(const DeploymentException& ex)
90     {
91         ostringstream os;
92         if(_remove)
93         {
94             os << "check for server `" << _server << "' remove failed: " << ex.reason;
95         }
96         else
97         {
98             os << "check for server `" << _server << "' update failed: " << ex.reason;
99         }
100         throw DeploymentException(os.str());
101     }
102     catch(const Ice::OperationNotExistException&)
103     {
104         if(_noRestart)
105         {
106             throw DeploymentException("server `" + _server + "' doesn't support check for updates");
107         }
108         return false;
109     }
110     catch(const Ice::Exception& ex)
111     {
112         ostringstream os;
113         os << ex;
114         throw NodeUnreachableException(_node, os.str());
115     }
116 }
117 
ServerCache(const Ice::CommunicatorPtr & communicator,const string & instanceName,NodeCache & nodeCache,AdapterCache & adapterCache,ObjectCache & objectCache,AllocatableObjectCache & allocatableObjectCache)118 ServerCache::ServerCache(const Ice::CommunicatorPtr& communicator,
119                          const string& instanceName,
120                          NodeCache& nodeCache,
121                          AdapterCache& adapterCache,
122                          ObjectCache& objectCache,
123                          AllocatableObjectCache& allocatableObjectCache) :
124     _communicator(communicator),
125     _instanceName(instanceName),
126     _nodeCache(nodeCache),
127     _adapterCache(adapterCache),
128     _objectCache(objectCache),
129     _allocatableObjectCache(allocatableObjectCache)
130 {
131 }
132 
133 ServerEntryPtr
add(const ServerInfo & info)134 ServerCache::add(const ServerInfo& info)
135 {
136     Lock sync(*this);
137 
138     ServerEntryPtr entry = getImpl(info.descriptor->id);
139     if(!entry)
140     {
141         entry = new ServerEntry(*this, info.descriptor->id);
142         addImpl(info.descriptor->id, entry);
143     }
144     entry->update(info, false);
145 
146     _nodeCache.get(info.node, true)->addServer(entry);
147 
148     forEachCommunicator(AddCommunicator(*this, entry, info.application))(info.descriptor);
149 
150     if(_traceLevels && _traceLevels->server > 0)
151     {
152         Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
153         out << "added server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')";
154     }
155 
156     return entry;
157 }
158 
159 ServerEntryPtr
get(const string & id) const160 ServerCache::get(const string& id) const
161 {
162     Lock sync(*this);
163     ServerEntryPtr entry = getImpl(id);
164     if(!entry)
165     {
166         throw ServerNotExistException(id);
167     }
168     return entry;
169 }
170 
171 bool
has(const string & id) const172 ServerCache::has(const string& id) const
173 {
174     Lock sync(*this);
175     ServerEntryPtr entry = getImpl(id);
176     return entry && !entry->isDestroyed();
177 }
178 
179 ServerEntryPtr
remove(const string & id,bool noRestart)180 ServerCache::remove(const string& id, bool noRestart)
181 {
182     Lock sync(*this);
183 
184     ServerEntryPtr entry = getImpl(id);
185     assert(entry);
186 
187     ServerInfo info = entry->getInfo();
188     forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor);
189 
190     _nodeCache.get(info.node)->removeServer(entry);
191 
192     entry->destroy(noRestart); // This must be done after otherwise some allocatable objects
193                                // might allocate a destroyed server.
194 
195     if(_traceLevels && _traceLevels->server > 0)
196     {
197         Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
198         out << "removed server `" << id << "'";
199     }
200 
201     return entry;
202 }
203 
204 void
preUpdate(const ServerInfo & newInfo,bool noRestart)205 ServerCache::preUpdate(const ServerInfo& newInfo, bool noRestart)
206 {
207     Lock sync(*this);
208 
209     const string& id = newInfo.descriptor->id;
210     ServerEntryPtr entry = getImpl(id);
211     assert(entry);
212 
213     if(!noRestart)
214     {
215         ServerInfo info = entry->getInfo();
216         forEachCommunicator(RemoveCommunicator(*this, entry))(info.descriptor, newInfo.descriptor);
217         _nodeCache.get(info.node)->removeServer(entry);
218     }
219 
220     if(_traceLevels && _traceLevels->server > 0)
221     {
222         Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
223         out << "updating server `" << id << "'";
224         if(noRestart)
225         {
226             out << " with no restart";
227         }
228     }
229 }
230 
231 ServerEntryPtr
postUpdate(const ServerInfo & info,bool noRestart)232 ServerCache::postUpdate(const ServerInfo& info, bool noRestart)
233 {
234     Lock sync(*this);
235 
236     ServerEntryPtr entry = getImpl(info.descriptor->id);
237     assert(entry);
238 
239     ServerInfo oldInfo = entry->getInfo();
240     entry->update(info, noRestart);
241 
242     if(!noRestart)
243     {
244         _nodeCache.get(info.node, true)->addServer(entry);
245         forEachCommunicator(AddCommunicator(*this, entry, info.application))(oldInfo.descriptor, info.descriptor);
246     }
247 
248     if(_traceLevels && _traceLevels->server > 0)
249     {
250         Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
251         out << "updated server `" << info.descriptor->id << "' (`" << info.uuid << "', `" << info.revision << "')";
252     }
253 
254     return entry;
255 }
256 
257 void
clear(const string & id)258 ServerCache::clear(const string& id)
259 {
260     Lock sync(*this);
261     CacheByString<ServerEntry>::removeImpl(id);
262 }
263 
264 void
setNodeObserverTopic(const NodeObserverTopicPtr & nodeObserverTopic)265 ServerCache::setNodeObserverTopic(const NodeObserverTopicPtr& nodeObserverTopic)
266 {
267     _nodeObserverTopic = nodeObserverTopic;
268 }
269 
270 void
addCommunicator(const CommunicatorDescriptorPtr & oldDesc,const CommunicatorDescriptorPtr & newDesc,const ServerEntryPtr & server,const string & application)271 ServerCache::addCommunicator(const CommunicatorDescriptorPtr& oldDesc,
272                              const CommunicatorDescriptorPtr& newDesc,
273                              const ServerEntryPtr& server,
274                              const string& application)
275 {
276     if(!newDesc)
277     {
278         return; // Nothing to add
279     }
280     for(AdapterDescriptorSeq::const_iterator q = newDesc->adapters.begin() ; q != newDesc->adapters.end(); ++q)
281     {
282         AdapterDescriptor oldAdpt;
283         if(oldDesc)
284         {
285             for(AdapterDescriptorSeq::const_iterator p = oldDesc->adapters.begin() ; p != oldDesc->adapters.end(); ++p)
286             {
287                 if(p->id == q->id)
288                 {
289                     oldAdpt = *p;
290                     break;
291                 }
292             }
293         }
294         assert(!q->id.empty());
295         _adapterCache.addServerAdapter(*q, server, application);
296 
297         for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
298         {
299             _objectCache.add(toObjectInfo(_communicator, *r, q->id), application, server->getId());
300         }
301         for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r)
302         {
303             ObjectDescriptorSeq::const_iterator s;
304             for(s = oldAdpt.allocatables.begin(); s != oldAdpt.allocatables.end() && s->id != r->id; ++s);
305             if(s == oldAdpt.allocatables.end() || *s != *r) // Only add new or updated allocatables
306             {
307                 _allocatableObjectCache.add(toObjectInfo(_communicator, *r, q->id), server);
308             }
309         }
310     }
311 }
312 
313 void
removeCommunicator(const CommunicatorDescriptorPtr & oldDesc,const CommunicatorDescriptorPtr & newDesc,const ServerEntryPtr &)314 ServerCache::removeCommunicator(const CommunicatorDescriptorPtr& oldDesc,
315                                 const CommunicatorDescriptorPtr& newDesc,
316                                 const ServerEntryPtr& /*entry*/)
317 {
318     if(!oldDesc)
319     {
320         return; // Nothing to remove
321     }
322     for(AdapterDescriptorSeq::const_iterator q = oldDesc->adapters.begin() ; q != oldDesc->adapters.end(); ++q)
323     {
324         AdapterDescriptor newAdpt;
325         if(newDesc)
326         {
327             for(AdapterDescriptorSeq::const_iterator p = newDesc->adapters.begin() ; p != newDesc->adapters.end(); ++p)
328             {
329                 if(p->id == q->id)
330                 {
331                     newAdpt = *p;
332                     break;
333                 }
334             }
335         }
336 
337         for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
338         {
339             _objectCache.remove((*r).id);
340         }
341         for(ObjectDescriptorSeq::const_iterator r = q->allocatables.begin(); r != q->allocatables.end(); ++r)
342         {
343             // Don't remove the allocatable if it's still in the new descriptor.
344             ObjectDescriptorSeq::const_iterator s;
345             for(s = newAdpt.allocatables.begin(); s != newAdpt.allocatables.end() && s->id != r->id; ++s);
346             if(s == newAdpt.allocatables.end() || *s != *r) // Only removed updated or removed allocatables
347             {
348                 _allocatableObjectCache.remove(r->id);
349             }
350         }
351         _adapterCache.removeServerAdapter(q->id);
352     }
353 }
354 
ServerEntry(ServerCache & cache,const string & id)355 ServerEntry::ServerEntry(ServerCache& cache, const string& id) :
356     Allocatable(false, 0),
357     _cache(cache),
358     _id(id),
359     _activationTimeout(-1),
360     _deactivationTimeout(-1),
361     _synchronizing(false),
362     _updated(false),
363     _noRestart(false)
364 {
365 }
366 
367 void
sync()368 ServerEntry::sync()
369 {
370     syncImpl();
371 }
372 
373 void
waitForSync(int timeout)374 ServerEntry::waitForSync(int timeout)
375 {
376     waitImpl(timeout);
377 }
378 
379 void
waitForSyncNoThrow(int timeout)380 ServerEntry::waitForSyncNoThrow(int timeout)
381 {
382     try
383     {
384         waitImpl(timeout);
385     }
386     catch(const SynchronizationException&)
387     {
388         assert(timeout >= 0);
389     }
390     catch(const Ice::Exception&)
391     {
392     }
393 }
394 
395 void
unsync()396 ServerEntry::unsync()
397 {
398     Lock sync(*this);
399     if(_loaded.get())
400     {
401         _load.reset(_loaded.release());
402     }
403     _proxy = 0;
404     _adapters.clear();
405     _activationTimeout = -1;
406     _deactivationTimeout = -1;
407 }
408 
409 bool
addSyncCallback(const SynchronizationCallbackPtr & callback)410 ServerEntry::addSyncCallback(const SynchronizationCallbackPtr& callback)
411 {
412     Lock sync(*this);
413     if(!_loaded.get() && !_load.get())
414     {
415         throw ServerNotExistException();
416     }
417     if(_synchronizing)
418     {
419         _callbacks.push_back(callback);
420     }
421     return _synchronizing;
422 }
423 
424 void
update(const ServerInfo & info,bool noRestart)425 ServerEntry::update(const ServerInfo& info, bool noRestart)
426 {
427     Lock sync(*this);
428 
429     IceInternal::UniquePtr<ServerInfo> descriptor(new ServerInfo());
430     *descriptor = info;
431 
432     _updated = true;
433 
434     if(!_destroy.get())
435     {
436         if(_loaded.get() && descriptor->node != _loaded->node)
437         {
438             _destroy.reset(_loaded.release());
439         }
440         else if(_load.get() && descriptor->node != _load->node)
441         {
442             _destroy.reset(_load.release());
443         }
444     }
445 
446     _load.reset(descriptor.release());
447     _noRestart = noRestart;
448     _loaded.reset();
449     _allocatable = info.descriptor->allocatable;
450     if(info.descriptor->activation == "session")
451     {
452         _allocatable = true;
453         _load->sessionId = _allocationSession ? _allocationSession->getId() : string("");
454     }
455 }
456 
457 void
destroy(bool noRestart)458 ServerEntry::destroy(bool noRestart)
459 {
460     Lock sync(*this);
461 
462     _updated = true;
463 
464     assert(_loaded.get() || _load.get());
465     if(!_destroy.get())
466     {
467         if(_loaded.get())
468         {
469             assert(!_destroy.get());
470             _destroy.reset(_loaded.release());
471         }
472         else if(_load.get())
473         {
474             assert(!_destroy.get());
475             _destroy.reset(_load.release());
476         }
477     }
478 
479     _noRestart = noRestart;
480     _load.reset();
481     _loaded.reset();
482     _allocatable = false;
483 }
484 
485 ServerInfo
getInfo(bool resolve) const486 ServerEntry::getInfo(bool resolve) const
487 {
488     ServerInfo info;
489     SessionIPtr session;
490     {
491         Lock sync(*this);
492         if(!_loaded.get() && !_load.get())
493         {
494             throw ServerNotExistException();
495         }
496         info = _loaded.get() ? *_loaded : *_load;
497         session = _allocationSession;
498     }
499     assert(info.descriptor);
500     if(resolve)
501     {
502         try
503         {
504             return _cache.getNodeCache().get(info.node)->getServerInfo(info, session);
505         }
506         catch(const DeploymentException&)
507         {
508         }
509         catch(const NodeNotExistException&)
510         {
511         }
512         catch(const NodeUnreachableException&)
513         {
514         }
515     }
516     return info;
517 }
518 
519 string
getId() const520 ServerEntry::getId() const
521 {
522     return _id;
523 }
524 
525 ServerPrx
getProxy(bool upToDate,int timeout)526 ServerEntry::getProxy(bool upToDate, int timeout)
527 {
528     //
529     // NOTE: this might throw ServerNotExistException, NodeUnreachableException
530     // or DeploymentException.
531     //
532 
533     int actTimeout, deactTimeout;
534     string node;
535     return getProxy(actTimeout, deactTimeout, node, upToDate, timeout);
536 }
537 
538 ServerPrx
getProxy(int & activationTimeout,int & deactivationTimeout,string & node,bool upToDate,int timeout)539 ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& node, bool upToDate, int timeout)
540 {
541     //
542     // NOTE: this might throw ServerNotExistException, NodeUnreachableException
543     // or DeploymentException.
544     //
545     while(true)
546     {
547         {
548             Lock sync(*this);
549             if(_loaded.get() || (_proxy && _synchronizing && !upToDate)) // Synced or if not up to date is fine
550             {
551                 assert(_loaded.get() || _load.get() || _destroy.get());
552                 activationTimeout = _activationTimeout;
553                 deactivationTimeout = _deactivationTimeout;
554                 node = _loaded.get() ? _loaded->node : (_load.get() ? _load->node : _destroy->node);
555                 return _proxy;
556             }
557             else if(!_load.get() && !_destroy.get())
558             {
559                 throw ServerNotExistException(_id);
560             }
561         }
562 
563         syncImpl();
564         waitImpl(timeout);
565     }
566 }
567 
568 Ice::ObjectPrx
getAdminProxy()569 ServerEntry::getAdminProxy()
570 {
571     //
572     // The category must match the server admin category used by nodes
573     //
574     Ice::Identity adminId;
575     adminId.name = _id;
576     adminId.category = _cache.getInstanceName() + "-NodeServerAdminRouter";
577     return getProxy(true)->ice_identity(adminId);
578 }
579 
580 AdapterPrx
getAdapter(const string & id,bool upToDate)581 ServerEntry::getAdapter(const string& id, bool upToDate)
582 {
583     //
584     // NOTE: this might throw AdapterNotExistException, NodeUnreachableException
585     // or DeploymentException.
586     //
587 
588     int activationTimeout, deactivationTimeout;
589     return getAdapter(activationTimeout, deactivationTimeout, id, upToDate);
590 }
591 
592 AdapterPrx
getAdapter(int & activationTimeout,int & deactivationTimeout,const string & id,bool upToDate)593 ServerEntry::getAdapter(int& activationTimeout, int& deactivationTimeout, const string& id, bool upToDate)
594 {
595     //
596     // NOTE: this might throw AdapterNotExistException, NodeUnreachableException
597     // or DeploymentException.
598     //
599     while(true)
600     {
601         {
602             Lock sync(*this);
603             if(_loaded.get() || (_proxy && _synchronizing && !upToDate)) // Synced or if not up to date is fine
604             {
605                 AdapterPrxDict::const_iterator p = _adapters.find(id);
606                 if(p != _adapters.end())
607                 {
608                     assert(p->second);
609                     activationTimeout = _activationTimeout;
610                     deactivationTimeout = _deactivationTimeout;
611                     return p->second;
612                 }
613                 else
614                 {
615                     throw AdapterNotExistException(id);
616                 }
617             }
618             else if(!_load.get() && !_destroy.get())
619             {
620                 throw AdapterNotExistException(id);
621             }
622         }
623 
624         syncImpl();
625         waitImpl(0); // Don't wait, just check for the result or throw SynchronizationException
626     }
627 }
628 
629 float
getLoad(LoadSample sample) const630 ServerEntry::getLoad(LoadSample sample) const
631 {
632     string application;
633     string node;
634     {
635         Lock sync(*this);
636         if(_loaded.get())
637         {
638             application = _loaded->application;
639             node = _loaded->node;
640         }
641         else if(_load.get())
642         {
643             application = _load->application;
644             node = _load->node;
645         }
646         else
647         {
648             throw ServerNotExistException();
649         }
650     }
651 
652     float factor;
653     LoadInfo load = _cache.getNodeCache().get(node)->getLoadInfoAndLoadFactor(application, factor);
654     switch(sample)
655     {
656     case LoadSample1:
657         return load.avg1 < 0.f ? 1.0f : load.avg1 * factor;
658     case LoadSample5:
659         return load.avg5 < 0.f ? 1.0f : load.avg5 * factor;
660     case LoadSample15:
661         return load.avg15 < 0.f ? 1.0f : load.avg15 * factor;
662     default:
663         assert(false);
664         return 1.0f;
665     }
666 }
667 
668 void
syncImpl()669 ServerEntry::syncImpl()
670 {
671     ServerInfo load;
672     SessionIPtr session;
673     ServerInfo destroy;
674     int timeout = -1;
675     bool noRestart = false;
676 
677     {
678         Lock sync(*this);
679         if(_synchronizing)
680         {
681             return;
682         }
683 
684         if(!_load.get() && !_destroy.get())
685         {
686             _load.reset(_loaded.release()); // Re-load the current server.
687         }
688 
689         _updated = false;
690         _exception.reset(0);
691 
692         if(_destroy.get())
693         {
694             destroy = *_destroy;
695             timeout = _deactivationTimeout;
696         }
697         else if(_load.get())
698         {
699             load = *_load;
700             session = _allocationSession;
701             timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
702         }
703         else
704         {
705             return;
706         }
707 
708         noRestart = _noRestart;
709         _synchronizing = true;
710     }
711 
712     if(destroy.descriptor)
713     {
714         try
715         {
716             _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout, noRestart);
717         }
718         catch(const NodeNotExistException&)
719         {
720             exception(NodeUnreachableException(destroy.node, "node is not active"));
721         }
722     }
723     else if(load.descriptor)
724     {
725         try
726         {
727             _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
728         }
729         catch(const NodeNotExistException&)
730         {
731             exception(NodeUnreachableException(load.node, "node is not active"));
732         }
733     }
734 }
735 
736 void
waitImpl(int timeout)737 ServerEntry::waitImpl(int timeout)
738 {
739     Lock sync(*this);
740     if(timeout != 0)
741     {
742         while(_synchronizing)
743         {
744             if(timeout > 0)
745             {
746                 if(!timedWait(IceUtil::Time::seconds(timeout)))
747                 {
748                     break; // Timeout
749                 }
750             }
751             else
752             {
753                 wait();
754             }
755         }
756     }
757     if(_synchronizing) // If we are still synchronizing, throw SynchronizationException
758     {
759         throw SynchronizationException(__FILE__, __LINE__);
760     }
761 
762     if(_exception.get())
763     {
764         try
765         {
766             _exception->ice_throw();
767         }
768         catch(const DeploymentException&)
769         {
770             throw;
771         }
772         catch(const NodeUnreachableException&)
773         {
774             throw;
775         }
776         catch(const Ice::Exception& ex) // This shouln't happen.
777         {
778             ostringstream os;
779             os << "unexpected exception while synchronizing server `" + _id + "':\n" << ex;
780             TraceLevelsPtr traceLevels = _cache.getTraceLevels();
781             if(traceLevels)
782             {
783                 Ice::Error err(traceLevels->logger);
784                 err << os.str();
785             }
786             throw DeploymentException(os.str());
787         }
788     }
789 }
790 
791 void
synchronized()792 ServerEntry::synchronized()
793 {
794     vector<SynchronizationCallbackPtr> callbacks;
795     {
796         Lock sync(*this);
797         _callbacks.swap(callbacks);
798     }
799     for(vector<SynchronizationCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
800     {
801         try
802         {
803             (*p)->synchronized();
804         }
805         catch(...)
806         {
807             assert(false);
808         }
809     }
810 }
811 
812 void
synchronized(const Ice::Exception & ex)813 ServerEntry::synchronized(const Ice::Exception& ex)
814 {
815     vector<SynchronizationCallbackPtr> callbacks;
816     {
817         Lock sync(*this);
818         _callbacks.swap(callbacks);
819     }
820     for(vector<SynchronizationCallbackPtr>::const_iterator p = callbacks.begin(); p != callbacks.end(); ++p)
821     {
822         try
823         {
824             (*p)->synchronized(ex);
825         }
826         catch(...)
827         {
828             assert(false);
829         }
830     }
831 }
832 
833 void
loadCallback(const ServerPrx & proxy,const AdapterPrxDict & adpts,int at,int dt)834 ServerEntry::loadCallback(const ServerPrx& proxy, const AdapterPrxDict& adpts, int at, int dt)
835 {
836     ServerInfo load;
837     SessionIPtr session;
838     ServerInfo destroy;
839     int timeout = -1;
840     bool synced = false;
841     bool noRestart = false;
842 
843     {
844         Lock sync(*this);
845         if(!_updated)
846         {
847             //
848             // Set timeout on server and adapter proxies. Most of the
849             // calls on the proxies shouldn't block for longer than the
850             // node session timeout. Calls that might block for a longer
851             // time should set the correct timeout before invoking on the
852             // proxy (e.g.: server start/stop, adapter activate).
853             //
854             assert(_load.get());
855             _loaded.reset(_load.release());
856             _proxy = proxy;
857             _adapters = adpts;
858             _activationTimeout = at;
859             _deactivationTimeout = dt;
860 
861             assert(!_destroy.get() && !_load.get());
862             _synchronizing = false;
863             synced = true;
864             notifyAll();
865         }
866         else
867         {
868             _updated = false;
869             if(_destroy.get())
870             {
871                 destroy = *_destroy;
872                 noRestart = _noRestart;
873             }
874             else if(_load.get())
875             {
876                 load = *_load;
877                 noRestart = _noRestart;
878                 session = _allocationSession;
879                 timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
880             }
881         }
882     }
883 
884     if(synced)
885     {
886         synchronized();
887         return;
888     }
889 
890     assert(destroy.descriptor || load.descriptor);
891     if(destroy.descriptor)
892     {
893         try
894         {
895             _cache.getNodeCache().get(destroy.node)->destroyServer(this, destroy, timeout, noRestart);
896         }
897         catch(const NodeNotExistException&)
898         {
899             exception(NodeUnreachableException(destroy.node, "node is not active"));
900         }
901     }
902     else if(load.descriptor)
903     {
904         try
905         {
906             _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
907         }
908         catch(const NodeNotExistException&)
909         {
910             exception(NodeUnreachableException(load.node, "node is not active"));
911         }
912     }
913 }
914 
915 void
destroyCallback()916 ServerEntry::destroyCallback()
917 {
918     ServerInfo load;
919     bool noRestart = false;
920     SessionIPtr session;
921 
922     {
923         Lock sync(*this);
924         _destroy.reset(0);
925         _proxy = 0;
926         _adapters.clear();
927         _activationTimeout = -1;
928         _deactivationTimeout = -1;
929 
930         if(!_load.get())
931         {
932             assert(!_load.get() && !_loaded.get());
933             _synchronizing = false;
934             notifyAll();
935         }
936         else
937         {
938             _updated = false;
939             load = *_load;
940             noRestart = _noRestart;
941             session = _allocationSession;
942         }
943     }
944 
945     if(load.descriptor)
946     {
947         try
948         {
949             _cache.getNodeCache().get(load.node)->loadServer(this, load, session, -1, noRestart);
950         }
951         catch(const NodeNotExistException&)
952         {
953             exception(NodeUnreachableException(load.node, "node is not active"));
954         }
955     }
956     else
957     {
958         synchronized();
959         _cache.clear(_id);
960     }
961 }
962 
963 void
exception(const Ice::Exception & ex)964 ServerEntry::exception(const Ice::Exception& ex)
965 {
966     ServerInfo load;
967     SessionIPtr session;
968     bool noRestart = false;
969     bool remove = false;
970     int timeout = -1;
971 
972     {
973         Lock sync(*this);
974         if((_destroy.get() && !_load.get()) || (!_destroy.get() && !_updated))
975         {
976             remove = _destroy.get();
977             _destroy.reset(0);
978             _exception.reset(ex.ice_clone());
979             _proxy = 0;
980             _adapters.clear();
981             _activationTimeout = -1;
982             _deactivationTimeout = -1;
983             _synchronizing = false;
984             notifyAll();
985         }
986         else
987         {
988             _destroy.reset(0);
989             _updated = false;
990             load = *_load.get();
991             noRestart = _noRestart;
992             session = _allocationSession;
993             timeout = _deactivationTimeout; // loadServer might block to deactivate the previous server.
994         }
995     }
996 
997     if(load.descriptor)
998     {
999         try
1000         {
1001             _cache.getNodeCache().get(load.node)->loadServer(this, load, session, timeout, noRestart);
1002         }
1003         catch(const NodeNotExistException&)
1004         {
1005             exception(NodeUnreachableException(load.node, "node is not active"));
1006         }
1007     }
1008     else
1009     {
1010         synchronized(ex);
1011         if(remove)
1012         {
1013             _cache.clear(_id);
1014         }
1015     }
1016 }
1017 
1018 bool
isDestroyed()1019 ServerEntry::isDestroyed()
1020 {
1021      Lock sync(*this);
1022      return !_loaded.get() && !_load.get();
1023 }
1024 
1025 bool
canRemove()1026 ServerEntry::canRemove()
1027 {
1028      Lock sync(*this);
1029      return !_loaded.get() && !_load.get() && !_destroy.get();
1030 }
1031 
1032 CheckUpdateResultPtr
checkUpdate(const ServerInfo & info,bool noRestart)1033 ServerEntry::checkUpdate(const ServerInfo& info, bool noRestart)
1034 {
1035     SessionIPtr session;
1036     ServerInfo oldInfo;
1037     {
1038         Lock sync(*this);
1039         if(!_loaded.get() && !_load.get())
1040         {
1041             throw ServerNotExistException();
1042         }
1043 
1044         oldInfo = _loaded.get() ? *_loaded : *_load;
1045         session = _allocationSession;
1046     }
1047 
1048     NodeEntryPtr node;
1049     try
1050     {
1051         node = _cache.getNodeCache().get(oldInfo.node);
1052     }
1053     catch(const NodeNotExistException&)
1054     {
1055         throw NodeUnreachableException(info.node, "node is not active");
1056     }
1057 
1058     ServerPrx server;
1059     try
1060     {
1061         server = getProxy(true, 5);
1062     }
1063     catch(const SynchronizationException&)
1064     {
1065         ostringstream os;
1066         os << "check for server `" << _id << "' update failed:";
1067         os << "timeout while waiting for the server to be loaded on the node";
1068         throw DeploymentException(os.str());
1069     }
1070     catch(const DeploymentException&)
1071     {
1072         if(noRestart)
1073         {
1074             // If the server can't be loaded and no restart is required, we throw
1075             // to indicate that the server update can't be checked.
1076             throw;
1077         }
1078         else
1079         {
1080             // Otherwise, we do as if the update is valid.
1081             return 0;
1082         }
1083     }
1084 
1085     //
1086     // Provide a null descriptor if the server is to be removed from
1087     // the node. In this case, the check just ensures that the server
1088     // is stopped.
1089     //
1090     InternalServerDescriptorPtr desc;
1091     if(info.node == oldInfo.node && info.descriptor)
1092     {
1093         desc = node->getInternalServerDescriptor(info, session); // The new descriptor
1094     }
1095 
1096     return new CheckUpdateResult(_id, oldInfo.node, noRestart, desc, server->begin_checkUpdate(desc, noRestart));
1097 }
1098 
1099 bool
isEnabled() const1100 ServerEntry::isEnabled() const
1101 {
1102     return _cache.getNodeObserverTopic()->isServerEnabled(_id);
1103 }
1104 
1105 void
allocated(const SessionIPtr & session)1106 ServerEntry::allocated(const SessionIPtr& session)
1107 {
1108     if(!_loaded.get() && !_load.get())
1109     {
1110         return;
1111     }
1112 
1113     TraceLevelsPtr traceLevels = _cache.getTraceLevels();
1114     if(traceLevels && traceLevels->server > 1)
1115     {
1116         Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1117         out << "server `" << _id << "' allocated by `" << session->getId() << "' (" << _count << ")";
1118     }
1119 
1120     ServerDescriptorPtr desc = _loaded.get() ? _loaded->descriptor : _load->descriptor;
1121 
1122     //
1123     // If the server has the session activation mode, we re-load the
1124     // server on the node as its deployment might have changed (it's
1125     // possible to use ${session.*} variable with server with the
1126     // session activation mode.
1127     //
1128     if(desc->activation == "session")
1129     {
1130         _updated = true;
1131         if(!_load.get())
1132         {
1133             _load.reset(_loaded.release());
1134         }
1135         _allocationSession = session;
1136         _load->sessionId = session->getId();
1137     }
1138 
1139     Glacier2::IdentitySetPrx identitySet = session->getGlacier2IdentitySet();
1140     Glacier2::StringSetPrx adapterIdSet = session->getGlacier2AdapterIdSet();
1141     if(identitySet && adapterIdSet)
1142     {
1143         ServerHelperPtr helper = createHelper(desc);
1144         multiset<string> adapterIds;
1145         multiset<Ice::Identity> identities;
1146         helper->getIds(adapterIds, identities);
1147         try
1148         {
1149             //
1150             // SunCC won't accept the following:
1151             //
1152             // ctl->adapterIds()->add(Ice::StringSeq(adapterIds.begin(), adapterIds.end()));
1153             // ctl->identities()->add(Ice::IdentitySeq(identities.begin(), identities.end()));
1154             //
1155             Ice::StringSeq adapterIdSeq;
1156             for(multiset<string>::iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1157             {
1158                 adapterIdSeq.push_back(*p);
1159             }
1160             Ice::IdentitySeq identitySeq;
1161             for(multiset<Ice::Identity>::iterator q = identities.begin(); q != identities.end(); ++q)
1162             {
1163                 identitySeq.push_back(*q);
1164             }
1165             adapterIdSet->add(adapterIdSeq);
1166             identitySet->add(identitySeq);
1167         }
1168         catch(const Ice::LocalException& ex)
1169         {
1170             if(traceLevels && traceLevels->server > 0)
1171             {
1172                 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1173                 out << "couldn't add Glacier2 filters for server `" << _id << "' allocated by `"
1174                     << session->getId() << ":\n" << ex;
1175             }
1176         }
1177     }
1178 }
1179 
1180 void
allocatedNoSync(const SessionIPtr &)1181 ServerEntry::allocatedNoSync(const SessionIPtr& /*session*/)
1182 {
1183     {
1184         Lock sync(*this);
1185         if(!_updated ||
1186            (_loaded.get() && _loaded->descriptor->activation != "session") ||
1187            (_load.get() && _load->descriptor->activation != "session"))
1188         {
1189             return;
1190         }
1191     }
1192 
1193     sync();
1194     waitForSyncNoThrow();
1195 }
1196 
1197 void
released(const SessionIPtr & session)1198 ServerEntry::released(const SessionIPtr& session)
1199 {
1200     if(!_loaded.get() && !_load.get())
1201     {
1202         return;
1203     }
1204 
1205     ServerDescriptorPtr desc = _loaded.get() ? _loaded->descriptor : _load->descriptor;
1206 
1207     //
1208     // If the server has the session activation mode, we re-load the
1209     // server on the node as its deployment might have changed (it's
1210     // possible to use ${session.*} variable with server with the
1211     // session activation mode. Synchronizing the server will also
1212     // shutdown the server on the node.
1213     //
1214     if(desc->activation == "session")
1215     {
1216         _updated = true;
1217         if(!_load.get())
1218         {
1219             _load.reset(_loaded.release());
1220         }
1221         _load->sessionId = "";
1222         _allocationSession = 0;
1223     }
1224 
1225     TraceLevelsPtr traceLevels = _cache.getTraceLevels();
1226 
1227     Glacier2::IdentitySetPrx identitySet = session->getGlacier2IdentitySet();
1228     Glacier2::StringSetPrx adapterIdSet = session->getGlacier2AdapterIdSet();
1229     if(identitySet && adapterIdSet)
1230     {
1231         ServerHelperPtr helper = createHelper(desc);
1232         multiset<string> adapterIds;
1233         multiset<Ice::Identity> identities;
1234         helper->getIds(adapterIds, identities);
1235         try
1236         {
1237             //
1238             // SunCC won't accept the following:
1239             //
1240             // ctl->adapterIds()->remove(Ice::StringSeq(adapterIds.begin(), adapterIds.end()));
1241             // ctl->identities()->remove(Ice::IdentitySeq(identities.begin(), identities.end()));
1242             //
1243             Ice::StringSeq adapterIdSeq;
1244             for(multiset<string>::iterator p = adapterIds.begin(); p != adapterIds.end(); ++p)
1245             {
1246                 adapterIdSeq.push_back(*p);
1247             }
1248             Ice::IdentitySeq identitySeq;
1249             for(multiset<Ice::Identity>::iterator q = identities.begin(); q != identities.end(); ++q)
1250             {
1251                 identitySeq.push_back(*q);
1252             }
1253             adapterIdSet->remove(adapterIdSeq);
1254             identitySet->remove(identitySeq);
1255         }
1256         catch(const Ice::LocalException& ex)
1257         {
1258             if(traceLevels && traceLevels->server > 0)
1259             {
1260                 Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1261                 out << "couldn't remove Glacier2 filters for server `" << _id << "' allocated by `";
1262                 out << session->getId() << ":\n" << ex;
1263             }
1264         }
1265     }
1266 
1267     if(traceLevels && traceLevels->server > 1)
1268     {
1269         Ice::Trace out(traceLevels->logger, traceLevels->serverCat);
1270         out << "server `" << _id << "' released by `" << session->getId() << "' (" << _count << ")";
1271     }
1272 }
1273 
1274 void
releasedNoSync(const SessionIPtr &)1275 ServerEntry::releasedNoSync(const SessionIPtr& /*session*/)
1276 {
1277     {
1278         Lock sync(*this);
1279         if(!_updated ||
1280            (_loaded.get() && _loaded->descriptor->activation != "session") ||
1281            (_load.get() && _load->descriptor->activation != "session"))
1282         {
1283             return;
1284         }
1285     }
1286 
1287     sync();
1288     waitForSyncNoThrow();
1289 }
1290