1 /*  $Id: prefetch_impl.cpp 398104 2013-05-02 19:00:01Z vasilche $
2 * ===========================================================================
3 *
4 *                            PUBLIC DOMAIN NOTICE
5 *               National Center for Biotechnology Information
6 *
7 *  This software/database is a "United States Government Work" under the
8 *  terms of the United States Copyright Act.  It was written as part of
9 *  the author's official duties as a United States Government employee and
10 *  thus cannot be copyrighted.  This software/database is freely available
11 *  to the public for use. The National Library of Medicine and the U.S.
12 *  Government have not placed any restriction on its use or reproduction.
13 *
14 *  Although all reasonable efforts have been taken to ensure the accuracy
15 *  and reliability of the software and data, the NLM and the U.S.
16 *  Government do not and cannot warrant the performance or results that
17 *  may be obtained by using this software or data. The NLM and the U.S.
18 *  Government disclaim all warranties, express or implied, including
19 *  warranties of performance, merchantability or fitness for any particular
20 *  purpose.
21 *
22 *  Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko, Eugene Vasilchenko
27 *
28 * File Description:
29 *   Prefetch implementation
30 *
31 */
32 
33 #include <ncbi_pch.hpp>
34 #include <objmgr/impl/prefetch_impl.hpp>
35 #include <corelib/ncbimtx.hpp>
36 #include <objmgr/bioseq_handle.hpp>
37 #include <objmgr/scope.hpp>
38 #include <objmgr/impl/tse_info.hpp>
39 #include <objmgr/impl/bioseq_info.hpp>
40 #include <objmgr/impl/scope_impl.hpp>
41 #include <objmgr/impl/data_source.hpp>
42 
43 BEGIN_NCBI_SCOPE
BEGIN_SCOPE(objects)44 BEGIN_SCOPE(objects)
45 
46 // NOTE: Max. value for semaphore must be prefetch depth + 1, because
47 // one extra-Post will be called when the token impl. is released.
48 
49 CPrefetchTokenOld_Impl::CPrefetchTokenOld_Impl(const TIds& ids, unsigned int depth)
50     : m_TokenCount(0),
51       m_TSESemaphore(depth, max(depth+1, depth)),
52       m_Non_locking(false)
53 {
54     m_Ids = ids;
55 }
56 
57 
~CPrefetchTokenOld_Impl(void)58 CPrefetchTokenOld_Impl::~CPrefetchTokenOld_Impl(void)
59 {
60     return;
61 }
62 
63 
x_InitPrefetch(CScope & scope)64 void CPrefetchTokenOld_Impl::x_InitPrefetch(CScope& scope)
65 {
66     m_TSEs.resize(m_Ids.size());
67     m_CurrentId = 0;
68     CRef<CDataSource> source(scope.GetImpl().GetFirstLoaderSource());
69     if (!source) {
70         return;
71     }
72     source->Prefetch(*this);
73 }
74 
75 
x_SetNon_locking(void)76 void CPrefetchTokenOld_Impl::x_SetNon_locking(void)
77 {
78     m_Non_locking = true;
79 }
80 
81 
AddResolvedId(size_t id_idx,TTSE_Lock tse)82 void CPrefetchTokenOld_Impl::AddResolvedId(size_t id_idx, TTSE_Lock tse)
83 {
84     CFastMutexGuard guard(m_Lock);
85     if ( m_Non_locking ) {
86         m_TSESemaphore.Post();
87         return;
88     }
89     if (m_Ids.empty()  ||  id_idx < m_CurrentId) {
90         // Token has been cleaned or id already passed, do not lock the TSE
91         return;
92     }
93     m_TSEs[id_idx] = tse;
94     int count = ++m_TSEMap[tse];
95     if (count > 1) {
96         // One more ID found in a prefetched TSE
97         m_TSESemaphore.Post();
98     }
99 }
100 
101 
IsEmpty(void) const102 bool CPrefetchTokenOld_Impl::IsEmpty(void) const
103 {
104     CFastMutexGuard guard(m_Lock);
105     return m_Ids.empty();
106 }
107 
108 
IsValid(void) const109 bool CPrefetchTokenOld_Impl::IsValid(void) const
110 {
111     CFastMutexGuard guard(m_Lock);
112     return m_CurrentId < m_Ids.size();
113 }
114 
115 
NextBioseqHandle(CScope & scope)116 CBioseq_Handle CPrefetchTokenOld_Impl::NextBioseqHandle(CScope& scope)
117 {
118     TTSE_Lock tse;
119     CSeq_id_Handle id;
120     {{
121         CFastMutexGuard guard(m_Lock);
122         // Can not call bool(*this) - creates deadlock
123         _ASSERT(m_CurrentId < m_Ids.size());
124         id = m_Ids[m_CurrentId];
125         // Keep temporary TSE lock
126         tse = m_TSEs[m_CurrentId];
127         m_TSEs[m_CurrentId].Reset();
128         ++m_CurrentId;
129         if ( tse ) {
130             TTSE_Map::iterator it = m_TSEMap.find(tse);
131             if ( --(it->second) < 1 ) {
132                 m_TSEMap.erase(it);
133                 // Signal that next TSE or next token may be prefetched
134                 m_TSESemaphore.Post();
135             }
136         }
137     }}
138     return scope.GetBioseqHandle(id);
139 }
140 
141 
AddTokenReference(void)142 void CPrefetchTokenOld_Impl::AddTokenReference(void)
143 {
144     ++m_TokenCount;
145 }
146 
147 
RemoveTokenReference(void)148 void CPrefetchTokenOld_Impl::RemoveTokenReference(void)
149 {
150     if ( !(--m_TokenCount) ) {
151         // No more tokens, reset the queue
152         CFastMutexGuard guard(m_Lock);
153         m_Ids.clear();
154         m_TSEs.clear();
155         m_CurrentId = 0;
156         // Allow the thread to process next token
157         m_TSESemaphore.Post();
158     }
159 }
160 
161 
CPrefetchThreadOld(CDataSource & data_source)162 CPrefetchThreadOld::CPrefetchThreadOld(CDataSource& data_source)
163     : m_DataSource(data_source),
164       m_Stop(false)
165 {
166     return;
167 }
168 
169 
~CPrefetchThreadOld(void)170 CPrefetchThreadOld::~CPrefetchThreadOld(void)
171 {
172     return;
173 }
174 
175 
AddRequest(CPrefetchTokenOld_Impl & token)176 void CPrefetchThreadOld::AddRequest(CPrefetchTokenOld_Impl& token)
177 {
178     {{
179         CFastMutexGuard guard(m_Lock);
180         m_Queue.Push(Ref(&token));
181     }}
182 }
183 
184 
Terminate(void)185 void CPrefetchThreadOld::Terminate(void)
186 {
187     {{
188         CFastMutexGuard guard(m_Lock);
189         m_Stop = true;
190     }}
191     // Unlock the thread
192     m_Queue.Push(CRef<CPrefetchTokenOld_Impl>(0));
193 }
194 
195 
Main(void)196 void* CPrefetchThreadOld::Main(void)
197 {
198     do {
199         CRef<CPrefetchTokenOld_Impl> token = m_Queue.Pop();
200         {{
201             CFastMutexGuard guard(m_Lock);
202             if (m_Stop) {
203                 return 0;
204             }
205             if ( token->IsEmpty() ) {
206                 // Token may have been canceled
207                 continue;
208             }
209         }}
210         bool release_token = false;
211         for (size_t i = 0; ; ++i) {
212             {{
213                 CFastMutexGuard guard(m_Lock);
214                 if (m_Stop) {
215                     return 0;
216                 }
217             }}
218             CSeq_id_Handle id;
219             token->m_TSESemaphore.Wait();
220             {{
221                 // m_Ids may be cleaned up by the token, check size
222                 // on every iteration.
223                 CFastMutexGuard guard(token->m_Lock);
224                 i = max(i, token->m_CurrentId);
225                 if (i >= token->m_Ids.size()) {
226                     // Can not release token now - mutex is still locked
227                     release_token = true;
228                     break;
229                 }
230                 id = token->m_Ids[i];
231             }}
232             try {
233                 SSeqMatch_DS match = m_DataSource.BestResolve(id);
234                 if ( match ) {
235                     token->AddResolvedId(i, match.m_TSE_Lock);
236                 }
237             } catch ( exception& ) {
238                 // BestResolve() failed, go to the next id.
239             }
240         }
241         if (release_token) {
242             token.Reset();
243         }
244     } while (true);
245     return 0;
246 }
247 
248 
249 END_SCOPE(objects)
250 END_NCBI_SCOPE
251