1 /*  $Id: prefetch_manager_impl.cpp 246000 2011-02-10 18:51:35Z vasilche $
2 * ===========================================================================
3 *
4 *                            PUBLIC DOMAIN NOTICE
5 *               National Center for Biotechnology Information
6 *
7 *  This software/database is a "United States Government Work" under the
8 *  terms of the United States Copyright Act.  It was written as part of
9 *  the author's official duties as a United States Government employee and
10 *  thus cannot be copyrighted.  This software/database is freely available
11 *  to the public for use. The National Library of Medicine and the U.S.
12 *  Government have not placed any restriction on its use or reproduction.
13 *
14 *  Although all reasonable efforts have been taken to ensure the accuracy
15 *  and reliability of the software and data, the NLM and the U.S.
16 *  Government do not and cannot warrant the performance or results that
17 *  may be obtained by using this software or data. The NLM and the U.S.
18 *  Government disclaim all warranties, express or implied, including
19 *  warranties of performance, merchantability or fitness for any particular
20 *  purpose.
21 *
22 *  Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Eugene Vasilchenko
27 *
28 * File Description:
29 *   Prefetch implementation
30 *
31 */
32 
33 #include <ncbi_pch.hpp>
34 #include <objmgr/impl/prefetch_manager_impl.hpp>
35 #include <objmgr/prefetch_manager.hpp>
36 #include <objmgr/objmgr_exception.hpp>
37 #include <corelib/ncbithr.hpp>
38 #include <corelib/ncbi_system.hpp>
39 #include <corelib/ncbi_safe_static.hpp>
40 
41 
42 BEGIN_NCBI_SCOPE
43 BEGIN_SCOPE(objects)
44 
45 
46 class CPrefetchManager_Impl;
47 
48 
49 BEGIN_SCOPE(prefetch)
50 
51 /////////////////////////////////////////////////////////////////////////////
52 //  CCancelRequest::
53 //
54 //    Exception used to cancel requests safely, cleaning up
55 //    all the resources allocated.
56 //
57 
58 class CCancelRequestException
59 {
60 public:
61     // Create new exception object, initialize counter.
62     CCancelRequestException(void);
63 
64     // Create a copy of exception object, increase counter.
65     CCancelRequestException(const CCancelRequestException& prev);
66 
67     // Destroy the object, decrease counter. If the counter is
68     // zero outside of CThread::Wrapper(), rethrow exception.
69     ~CCancelRequestException(void);
70 
71     // Inform the object it has reached normal catch.
SetFinished(void)72     void SetFinished(void)
73     {
74         m_Data->m_Finished = true;
75     }
76 private:
77     struct SData {
SDataCCancelRequestException::SData78         SData(void)
79             : m_RefCounter(1),
80               m_Finished(false)
81             {
82             }
83         int m_RefCounter;
84         bool m_Finished;
85     };
86     SData *m_Data;
87 };
88 
89 
CCancelRequestException(void)90 CCancelRequestException::CCancelRequestException(void)
91     : m_Data(new SData())
92 {
93 }
94 
95 
CCancelRequestException(const CCancelRequestException & prev)96 CCancelRequestException::CCancelRequestException(const CCancelRequestException& prev)
97     : m_Data(prev.m_Data)
98 {
99     ++m_Data->m_RefCounter;
100 }
101 
102 
~CCancelRequestException(void)103 CCancelRequestException::~CCancelRequestException(void)
104 {
105     if ( --m_Data->m_RefCounter > 0 ) {
106         // Not the last object - continue to handle exceptions
107         return;
108     }
109 
110     bool finished = m_Data->m_Finished; // save the flag
111     delete m_Data;
112 
113     if ( !finished ) {
114         ERR_POST(Critical<<"CancelRequest() failed due to catch(...) in "<<
115                  CStackTrace());
116     }
117 }
118 
END_SCOPE(prefetch)119 END_SCOPE(prefetch)
120 
121 CPrefetchRequest::CPrefetchRequest(CObjectFor<CMutex>* state_mutex,
122                                    IPrefetchAction* action,
123                                    IPrefetchListener* listener,
124                                    unsigned int priority)
125     : CThreadPool_Task(priority),
126       m_StateMutex(state_mutex),
127       m_Action(action),
128       m_Listener(listener),
129       m_Progress(0)
130 {
131 }
132 
133 
~CPrefetchRequest(void)134 CPrefetchRequest::~CPrefetchRequest(void)
135 {
136 }
137 
138 
GetState(void) const139 CPrefetchRequest::EState CPrefetchRequest::GetState(void) const
140 {
141     switch (GetStatus()) {
142     case CThreadPool_Task::eIdle:
143         return eInvalid;
144     case CThreadPool_Task::eQueued:
145         return SPrefetchTypes::eQueued;
146     case CThreadPool_Task::eExecuting:
147         return eStarted;
148     case CThreadPool_Task::eCompleted:
149         return SPrefetchTypes::eCompleted;
150     case CThreadPool_Task::eCanceled:
151         return SPrefetchTypes::eCanceled;
152     case CThreadPool_Task::eFailed:
153         return SPrefetchTypes::eFailed;
154     }
155 
156     return eInvalid;
157 }
158 
159 
SetListener(IPrefetchListener * listener)160 void CPrefetchRequest::SetListener(IPrefetchListener* listener)
161 {
162     CMutexGuard guard(m_StateMutex->GetData());
163     if ( m_Listener ) {
164         NCBI_THROW(CObjMgrException, eOtherError,
165                    "CPrefetchToken::SetListener: listener already set");
166     }
167     m_Listener = listener;
168 }
169 
170 
OnStatusChange(EStatus)171 void CPrefetchRequest::OnStatusChange(EStatus /* old */)
172 {
173     if (m_Listener) {
174         m_Listener->PrefetchNotify(Ref(this), GetState());
175     }
176 }
177 
178 CPrefetchRequest::TProgress
SetProgress(TProgress progress)179 CPrefetchRequest::SetProgress(TProgress progress)
180 {
181     CMutexGuard guard(m_StateMutex->GetData());
182     if ( GetStatus() != eExecuting ) {
183         NCBI_THROW(CObjMgrException, eOtherError,
184                    "CPrefetchToken::SetProgress: not processing");
185     }
186     TProgress old_progress = m_Progress;
187     if ( progress != old_progress ) {
188         m_Progress = progress;
189         if ( m_Listener ) {
190             m_Listener->PrefetchNotify(Ref(this), eAdvanced);
191         }
192     }
193     return old_progress;
194 }
195 
196 
Execute(void)197 CPrefetchRequest::EStatus CPrefetchRequest::Execute(void)
198 {
199     try {
200         EStatus result = CThreadPool_Task::eCompleted;
201         if (m_Action.NotNull()) {
202             if (! GetAction()->Execute(Ref(this))) {
203                 if ( IsCancelRequested() )
204                     result = CThreadPool_Task::eCanceled;
205                 else
206                     result = CThreadPool_Task::eFailed;
207             }
208         }
209         return result;
210     }
211     catch ( CPrefetchCanceled& /* ignored */ ) {
212         return CThreadPool_Task::eCanceled;
213     }
214     catch ( prefetch::CCancelRequestException& exc ) {
215         exc.SetFinished();
216         return CThreadPool_Task::eCanceled;
217     }
218 }
219 
220 
CPrefetchManager_Impl(unsigned max_threads,CThread::TRunMode threads_mode)221 CPrefetchManager_Impl::CPrefetchManager_Impl(unsigned max_threads,
222                                              CThread::TRunMode threads_mode)
223     : CThreadPool(kMax_Int, max_threads, 2, threads_mode),
224       m_StateMutex(new CObjectFor<CMutex>())
225 {
226 }
227 
228 
~CPrefetchManager_Impl(void)229 CPrefetchManager_Impl::~CPrefetchManager_Impl(void)
230 {
231 }
232 
233 
AddAction(TPriority priority,IPrefetchAction * action,IPrefetchListener * listener)234 CRef<CPrefetchRequest> CPrefetchManager_Impl::AddAction(TPriority priority,
235                                                         IPrefetchAction* action,
236                                                         IPrefetchListener* listener)
237 {
238     CMutexGuard guard0(GetMainPoolMutex());
239     if ( action && IsAborted() ) {
240         throw prefetch::CCancelRequestException();
241     }
242     CMutexGuard guard(m_StateMutex->GetData());
243     CRef<CPrefetchRequest> req(new CPrefetchRequest(m_StateMutex,
244                                                     action,
245                                                     listener,
246                                                     priority));
247     AddTask(req);
248     return req;
249 }
250 
251 
IsActive(void)252 bool CPrefetchManager::IsActive(void)
253 {
254     CThreadPool_Thread* thread = dynamic_cast<CThreadPool_Thread*>(
255                                                 CThread::GetCurrentThread());
256     if ( !thread ) {
257         return false;
258     }
259 
260     CRef<CThreadPool_Task> req = thread->GetCurrentTask();
261     if ( !req ) {
262         return false;
263     }
264 
265     if ( req->IsCancelRequested() && dynamic_cast<CPrefetchRequest*>(&*req) ) {
266         throw prefetch::CCancelRequestException();
267     }
268 
269     return true;
270 }
271 
272 
273 END_SCOPE(objects)
274 END_NCBI_SCOPE
275