1 /* $Id: prefetch_manager_impl.cpp 246000 2011-02-10 18:51:35Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Eugene Vasilchenko
27 *
28 * File Description:
29 * Prefetch implementation
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <objmgr/impl/prefetch_manager_impl.hpp>
35 #include <objmgr/prefetch_manager.hpp>
36 #include <objmgr/objmgr_exception.hpp>
37 #include <corelib/ncbithr.hpp>
38 #include <corelib/ncbi_system.hpp>
39 #include <corelib/ncbi_safe_static.hpp>
40
41
42 BEGIN_NCBI_SCOPE
43 BEGIN_SCOPE(objects)
44
45
46 class CPrefetchManager_Impl;
47
48
49 BEGIN_SCOPE(prefetch)
50
51 /////////////////////////////////////////////////////////////////////////////
52 // CCancelRequest::
53 //
54 // Exception used to cancel requests safely, cleaning up
55 // all the resources allocated.
56 //
57
58 class CCancelRequestException
59 {
60 public:
61 // Create new exception object, initialize counter.
62 CCancelRequestException(void);
63
64 // Create a copy of exception object, increase counter.
65 CCancelRequestException(const CCancelRequestException& prev);
66
67 // Destroy the object, decrease counter. If the counter is
68 // zero outside of CThread::Wrapper(), rethrow exception.
69 ~CCancelRequestException(void);
70
71 // Inform the object it has reached normal catch.
SetFinished(void)72 void SetFinished(void)
73 {
74 m_Data->m_Finished = true;
75 }
76 private:
77 struct SData {
SDataCCancelRequestException::SData78 SData(void)
79 : m_RefCounter(1),
80 m_Finished(false)
81 {
82 }
83 int m_RefCounter;
84 bool m_Finished;
85 };
86 SData *m_Data;
87 };
88
89
CCancelRequestException(void)90 CCancelRequestException::CCancelRequestException(void)
91 : m_Data(new SData())
92 {
93 }
94
95
CCancelRequestException(const CCancelRequestException & prev)96 CCancelRequestException::CCancelRequestException(const CCancelRequestException& prev)
97 : m_Data(prev.m_Data)
98 {
99 ++m_Data->m_RefCounter;
100 }
101
102
~CCancelRequestException(void)103 CCancelRequestException::~CCancelRequestException(void)
104 {
105 if ( --m_Data->m_RefCounter > 0 ) {
106 // Not the last object - continue to handle exceptions
107 return;
108 }
109
110 bool finished = m_Data->m_Finished; // save the flag
111 delete m_Data;
112
113 if ( !finished ) {
114 ERR_POST(Critical<<"CancelRequest() failed due to catch(...) in "<<
115 CStackTrace());
116 }
117 }
118
END_SCOPE(prefetch)119 END_SCOPE(prefetch)
120
121 CPrefetchRequest::CPrefetchRequest(CObjectFor<CMutex>* state_mutex,
122 IPrefetchAction* action,
123 IPrefetchListener* listener,
124 unsigned int priority)
125 : CThreadPool_Task(priority),
126 m_StateMutex(state_mutex),
127 m_Action(action),
128 m_Listener(listener),
129 m_Progress(0)
130 {
131 }
132
133
~CPrefetchRequest(void)134 CPrefetchRequest::~CPrefetchRequest(void)
135 {
136 }
137
138
GetState(void) const139 CPrefetchRequest::EState CPrefetchRequest::GetState(void) const
140 {
141 switch (GetStatus()) {
142 case CThreadPool_Task::eIdle:
143 return eInvalid;
144 case CThreadPool_Task::eQueued:
145 return SPrefetchTypes::eQueued;
146 case CThreadPool_Task::eExecuting:
147 return eStarted;
148 case CThreadPool_Task::eCompleted:
149 return SPrefetchTypes::eCompleted;
150 case CThreadPool_Task::eCanceled:
151 return SPrefetchTypes::eCanceled;
152 case CThreadPool_Task::eFailed:
153 return SPrefetchTypes::eFailed;
154 }
155
156 return eInvalid;
157 }
158
159
SetListener(IPrefetchListener * listener)160 void CPrefetchRequest::SetListener(IPrefetchListener* listener)
161 {
162 CMutexGuard guard(m_StateMutex->GetData());
163 if ( m_Listener ) {
164 NCBI_THROW(CObjMgrException, eOtherError,
165 "CPrefetchToken::SetListener: listener already set");
166 }
167 m_Listener = listener;
168 }
169
170
OnStatusChange(EStatus)171 void CPrefetchRequest::OnStatusChange(EStatus /* old */)
172 {
173 if (m_Listener) {
174 m_Listener->PrefetchNotify(Ref(this), GetState());
175 }
176 }
177
178 CPrefetchRequest::TProgress
SetProgress(TProgress progress)179 CPrefetchRequest::SetProgress(TProgress progress)
180 {
181 CMutexGuard guard(m_StateMutex->GetData());
182 if ( GetStatus() != eExecuting ) {
183 NCBI_THROW(CObjMgrException, eOtherError,
184 "CPrefetchToken::SetProgress: not processing");
185 }
186 TProgress old_progress = m_Progress;
187 if ( progress != old_progress ) {
188 m_Progress = progress;
189 if ( m_Listener ) {
190 m_Listener->PrefetchNotify(Ref(this), eAdvanced);
191 }
192 }
193 return old_progress;
194 }
195
196
Execute(void)197 CPrefetchRequest::EStatus CPrefetchRequest::Execute(void)
198 {
199 try {
200 EStatus result = CThreadPool_Task::eCompleted;
201 if (m_Action.NotNull()) {
202 if (! GetAction()->Execute(Ref(this))) {
203 if ( IsCancelRequested() )
204 result = CThreadPool_Task::eCanceled;
205 else
206 result = CThreadPool_Task::eFailed;
207 }
208 }
209 return result;
210 }
211 catch ( CPrefetchCanceled& /* ignored */ ) {
212 return CThreadPool_Task::eCanceled;
213 }
214 catch ( prefetch::CCancelRequestException& exc ) {
215 exc.SetFinished();
216 return CThreadPool_Task::eCanceled;
217 }
218 }
219
220
CPrefetchManager_Impl(unsigned max_threads,CThread::TRunMode threads_mode)221 CPrefetchManager_Impl::CPrefetchManager_Impl(unsigned max_threads,
222 CThread::TRunMode threads_mode)
223 : CThreadPool(kMax_Int, max_threads, 2, threads_mode),
224 m_StateMutex(new CObjectFor<CMutex>())
225 {
226 }
227
228
~CPrefetchManager_Impl(void)229 CPrefetchManager_Impl::~CPrefetchManager_Impl(void)
230 {
231 }
232
233
AddAction(TPriority priority,IPrefetchAction * action,IPrefetchListener * listener)234 CRef<CPrefetchRequest> CPrefetchManager_Impl::AddAction(TPriority priority,
235 IPrefetchAction* action,
236 IPrefetchListener* listener)
237 {
238 CMutexGuard guard0(GetMainPoolMutex());
239 if ( action && IsAborted() ) {
240 throw prefetch::CCancelRequestException();
241 }
242 CMutexGuard guard(m_StateMutex->GetData());
243 CRef<CPrefetchRequest> req(new CPrefetchRequest(m_StateMutex,
244 action,
245 listener,
246 priority));
247 AddTask(req);
248 return req;
249 }
250
251
IsActive(void)252 bool CPrefetchManager::IsActive(void)
253 {
254 CThreadPool_Thread* thread = dynamic_cast<CThreadPool_Thread*>(
255 CThread::GetCurrentThread());
256 if ( !thread ) {
257 return false;
258 }
259
260 CRef<CThreadPool_Task> req = thread->GetCurrentTask();
261 if ( !req ) {
262 return false;
263 }
264
265 if ( req->IsCancelRequested() && dynamic_cast<CPrefetchRequest*>(&*req) ) {
266 throw prefetch::CCancelRequestException();
267 }
268
269 return true;
270 }
271
272
273 END_SCOPE(objects)
274 END_NCBI_SCOPE
275