1 /* $Id: prefetch_impl.cpp 398104 2013-05-02 19:00:01Z vasilche $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko, Eugene Vasilchenko
27 *
28 * File Description:
29 * Prefetch implementation
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34 #include <objmgr/impl/prefetch_impl.hpp>
35 #include <corelib/ncbimtx.hpp>
36 #include <objmgr/bioseq_handle.hpp>
37 #include <objmgr/scope.hpp>
38 #include <objmgr/impl/tse_info.hpp>
39 #include <objmgr/impl/bioseq_info.hpp>
40 #include <objmgr/impl/scope_impl.hpp>
41 #include <objmgr/impl/data_source.hpp>
42
43 BEGIN_NCBI_SCOPE
BEGIN_SCOPE(objects)44 BEGIN_SCOPE(objects)
45
46 // NOTE: Max. value for semaphore must be prefetch depth + 1, because
47 // one extra-Post will be called when the token impl. is released.
48
49 CPrefetchTokenOld_Impl::CPrefetchTokenOld_Impl(const TIds& ids, unsigned int depth)
50 : m_TokenCount(0),
51 m_TSESemaphore(depth, max(depth+1, depth)),
52 m_Non_locking(false)
53 {
54 m_Ids = ids;
55 }
56
57
~CPrefetchTokenOld_Impl(void)58 CPrefetchTokenOld_Impl::~CPrefetchTokenOld_Impl(void)
59 {
60 return;
61 }
62
63
x_InitPrefetch(CScope & scope)64 void CPrefetchTokenOld_Impl::x_InitPrefetch(CScope& scope)
65 {
66 m_TSEs.resize(m_Ids.size());
67 m_CurrentId = 0;
68 CRef<CDataSource> source(scope.GetImpl().GetFirstLoaderSource());
69 if (!source) {
70 return;
71 }
72 source->Prefetch(*this);
73 }
74
75
x_SetNon_locking(void)76 void CPrefetchTokenOld_Impl::x_SetNon_locking(void)
77 {
78 m_Non_locking = true;
79 }
80
81
AddResolvedId(size_t id_idx,TTSE_Lock tse)82 void CPrefetchTokenOld_Impl::AddResolvedId(size_t id_idx, TTSE_Lock tse)
83 {
84 CFastMutexGuard guard(m_Lock);
85 if ( m_Non_locking ) {
86 m_TSESemaphore.Post();
87 return;
88 }
89 if (m_Ids.empty() || id_idx < m_CurrentId) {
90 // Token has been cleaned or id already passed, do not lock the TSE
91 return;
92 }
93 m_TSEs[id_idx] = tse;
94 int count = ++m_TSEMap[tse];
95 if (count > 1) {
96 // One more ID found in a prefetched TSE
97 m_TSESemaphore.Post();
98 }
99 }
100
101
IsEmpty(void) const102 bool CPrefetchTokenOld_Impl::IsEmpty(void) const
103 {
104 CFastMutexGuard guard(m_Lock);
105 return m_Ids.empty();
106 }
107
108
IsValid(void) const109 bool CPrefetchTokenOld_Impl::IsValid(void) const
110 {
111 CFastMutexGuard guard(m_Lock);
112 return m_CurrentId < m_Ids.size();
113 }
114
115
NextBioseqHandle(CScope & scope)116 CBioseq_Handle CPrefetchTokenOld_Impl::NextBioseqHandle(CScope& scope)
117 {
118 TTSE_Lock tse;
119 CSeq_id_Handle id;
120 {{
121 CFastMutexGuard guard(m_Lock);
122 // Can not call bool(*this) - creates deadlock
123 _ASSERT(m_CurrentId < m_Ids.size());
124 id = m_Ids[m_CurrentId];
125 // Keep temporary TSE lock
126 tse = m_TSEs[m_CurrentId];
127 m_TSEs[m_CurrentId].Reset();
128 ++m_CurrentId;
129 if ( tse ) {
130 TTSE_Map::iterator it = m_TSEMap.find(tse);
131 if ( --(it->second) < 1 ) {
132 m_TSEMap.erase(it);
133 // Signal that next TSE or next token may be prefetched
134 m_TSESemaphore.Post();
135 }
136 }
137 }}
138 return scope.GetBioseqHandle(id);
139 }
140
141
AddTokenReference(void)142 void CPrefetchTokenOld_Impl::AddTokenReference(void)
143 {
144 ++m_TokenCount;
145 }
146
147
RemoveTokenReference(void)148 void CPrefetchTokenOld_Impl::RemoveTokenReference(void)
149 {
150 if ( !(--m_TokenCount) ) {
151 // No more tokens, reset the queue
152 CFastMutexGuard guard(m_Lock);
153 m_Ids.clear();
154 m_TSEs.clear();
155 m_CurrentId = 0;
156 // Allow the thread to process next token
157 m_TSESemaphore.Post();
158 }
159 }
160
161
CPrefetchThreadOld(CDataSource & data_source)162 CPrefetchThreadOld::CPrefetchThreadOld(CDataSource& data_source)
163 : m_DataSource(data_source),
164 m_Stop(false)
165 {
166 return;
167 }
168
169
~CPrefetchThreadOld(void)170 CPrefetchThreadOld::~CPrefetchThreadOld(void)
171 {
172 return;
173 }
174
175
AddRequest(CPrefetchTokenOld_Impl & token)176 void CPrefetchThreadOld::AddRequest(CPrefetchTokenOld_Impl& token)
177 {
178 {{
179 CFastMutexGuard guard(m_Lock);
180 m_Queue.Push(Ref(&token));
181 }}
182 }
183
184
Terminate(void)185 void CPrefetchThreadOld::Terminate(void)
186 {
187 {{
188 CFastMutexGuard guard(m_Lock);
189 m_Stop = true;
190 }}
191 // Unlock the thread
192 m_Queue.Push(CRef<CPrefetchTokenOld_Impl>(0));
193 }
194
195
Main(void)196 void* CPrefetchThreadOld::Main(void)
197 {
198 do {
199 CRef<CPrefetchTokenOld_Impl> token = m_Queue.Pop();
200 {{
201 CFastMutexGuard guard(m_Lock);
202 if (m_Stop) {
203 return 0;
204 }
205 if ( token->IsEmpty() ) {
206 // Token may have been canceled
207 continue;
208 }
209 }}
210 bool release_token = false;
211 for (size_t i = 0; ; ++i) {
212 {{
213 CFastMutexGuard guard(m_Lock);
214 if (m_Stop) {
215 return 0;
216 }
217 }}
218 CSeq_id_Handle id;
219 token->m_TSESemaphore.Wait();
220 {{
221 // m_Ids may be cleaned up by the token, check size
222 // on every iteration.
223 CFastMutexGuard guard(token->m_Lock);
224 i = max(i, token->m_CurrentId);
225 if (i >= token->m_Ids.size()) {
226 // Can not release token now - mutex is still locked
227 release_token = true;
228 break;
229 }
230 id = token->m_Ids[i];
231 }}
232 try {
233 SSeqMatch_DS match = m_DataSource.BestResolve(id);
234 if ( match ) {
235 token->AddResolvedId(i, match.m_TSE_Lock);
236 }
237 } catch ( exception& ) {
238 // BestResolve() failed, go to the next id.
239 }
240 }
241 if (release_token) {
242 token.Reset();
243 }
244 } while (true);
245 return 0;
246 }
247
248
249 END_SCOPE(objects)
250 END_NCBI_SCOPE
251