1 /* $Id: fetch.cpp 634624 2021-07-15 17:15:43Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Evgueni Belyi based on Dmitrii Saprykin's code
27 *
28 * File Description:
29 *
 * Cassandra fetch of accession version history records
31 *
32 */
33
34 #include <ncbi_pch.hpp>
35
36 #include <memory>
37 #include <string>
38 #include <tuple>
39 #include <utility>
40 #include <vector>
41
42 //#include <objtools/pubseq_gateway/impl/cassandra/acc_ver_hist/record.hpp>
43 //#include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
44 //#include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
45 //#include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
46
47 #include <objtools/pubseq_gateway/impl/cassandra/acc_ver_hist/tasks.hpp>
48 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
49
50 BEGIN_IDBLOB_SCOPE
51 USING_NCBI_SCOPE;
52
CCassAccVerHistoryTaskFetch(unsigned int timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> connection,const string & keyspace,string accession,TAccVerHistConsumeCallback consume_callback,TDataErrorCallback data_error_cb,int16_t version,int16_t seq_id_type)53 CCassAccVerHistoryTaskFetch::CCassAccVerHistoryTaskFetch(
54 unsigned int timeout_ms,
55 unsigned int max_retries,
56 shared_ptr<CCassConnection> connection,
57 const string & keyspace,
58 string accession,
59 TAccVerHistConsumeCallback consume_callback,
60 TDataErrorCallback data_error_cb,
61 int16_t version,
62 int16_t seq_id_type
63 )
64 : CCassBlobWaiter(
65 timeout_ms, connection, keyspace,
66 0,
67 true, // false, // m_Async
68 max_retries, move(data_error_cb)
69 )
70 , m_Accession( move( accession))
71 , m_Version( version)
72 , m_SeqIdType( seq_id_type)
73 , m_Consume( move( consume_callback))
74 , m_PageSize( CCassQuery::DEFAULT_PAGE_SIZE)
75 , m_RestartCounter( 0)
76 {}
77
SetConsumeCallback(TAccVerHistConsumeCallback callback)78 void CCassAccVerHistoryTaskFetch::SetConsumeCallback( TAccVerHistConsumeCallback callback)
79 {
80 m_Consume = move(callback);
81 }
82
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)83 void CCassAccVerHistoryTaskFetch::SetDataReadyCB(
84 shared_ptr<CCassDataCallbackReceiver> callback)
85 {
86 if( callback && m_State != eInit)
87 {
88 NCBI_THROW(CCassandraException, eSeqFailed,
89 "CCassAccVerHistoryTaskFetch: DataReadyCB can't be assigned "
90 "after the loading process has started");
91 }
92 CCassBlobWaiter::SetDataReadyCB3( callback);
93 }
94
Wait1()95 void CCassAccVerHistoryTaskFetch::Wait1()
96 {
97 bool restarted = false;
98 do
99 {
100 restarted = false;
101 switch (m_State)
102 {
103 case eError:
104 case eDone:
105 return;
106
107 case eInit:
108 {
109 m_QueryArr.resize(1);
110 m_QueryArr[0] = { m_Conn->NewQuery(), 0};
111
112 string sql =
113 " SELECT"
114 " version, gi, date, chain, id_type, sat, sat_key"
115 " FROM " + GetKeySpace() + ".acc_ver_hist "
116 " WHERE"
117 " accession = ? ";
118 unsigned int params = 1;
119 if( m_Version > 0)
120 {
121 sql += " AND version = ?";
122 ++params;
123 }
124 if( m_SeqIdType > 0)
125 {
126 sql += " AND id_type = ? ALLOW FILTERING";
127 ++params;
128 }
129
130 m_QueryArr[0].query->SetSQL(sql, params);
131 m_QueryArr[0].query->BindStr(0, m_Accession);
132
133 unsigned int param = 1;
134 if( m_Version > 0)
135 {
136 m_QueryArr[0].query->BindInt16( param, m_Version); param++;
137 }
138 if( m_SeqIdType > 0)
139 {
140 m_QueryArr[0].query->BindInt16( param, m_SeqIdType);
141 }
142
143 SetupQueryCB3( m_QueryArr[0].query);
144 UpdateLastActivity();
145 m_QueryArr[0].query->Query( CASS_CONSISTENCY_LOCAL_QUORUM,
146 m_Async, true, m_PageSize);
147 restarted = true;
148 m_State = eFetchStarted;
149 break;
150 }
151
152 case eFetchStarted:
153 {
154 if( CheckReady( m_QueryArr[0].query, m_RestartCounter, restarted))
155 {
156 bool do_next = true;
157 auto state = m_QueryArr[0].query->NextRow();
158 while( do_next && state == ar_dataready)
159 {
160 SAccVerHistRec record;
161 record.accession = m_Accession;
162 record.version = m_QueryArr[0].query->FieldGetInt16Value( 0, 0);
163 record.gi = m_QueryArr[0].query->FieldGetInt64Value( 1, 0);
164 record.date = m_QueryArr[0].query->FieldGetInt64Value( 2, 0);
165 record.chain = m_QueryArr[0].query->FieldGetInt64Value( 3, 0);
166 record.seq_id_type = m_QueryArr[0].query->FieldGetInt16Value( 4, 0);
167 record.sat = m_QueryArr[0].query->FieldGetInt16Value( 5, 0);
168 record.sat_key = m_QueryArr[0].query->FieldGetInt32Value( 6, 0);
169
170 if( m_Consume)
171 {
172 do_next = m_Consume( move( record), false);
173 }
174 if( do_next)
175 {
176 state = m_QueryArr[0].query->NextRow();
177 }
178 }
179 if( !do_next || m_QueryArr[0].query->IsEOF())
180 {
181 if( m_Consume)
182 {
183 m_Consume( SAccVerHistRec(), true);
184 }
185 CloseAll();
186 m_State = eDone;
187 }
188 }
189 else if( restarted)
190 {
191 ++m_RestartCounter;
192 m_QueryArr[0].query->Close();
193 m_State = eInit;
194 }
195 UpdateLastActivity();
196 break;
197 }
198
199 default:
200 {
201 char msg[1024];
202 snprintf(msg, sizeof(msg),
203 "Failed to fetch accession history (key=%s.%s|%hd|%hd) unexpected state (%d)",
204 m_Keyspace.c_str(), m_Accession.c_str(), m_Version, m_SeqIdType,
205 static_cast<int>(m_State));
206 Error( CRequestStatus::e502_BadGateway,
207 CCassandraException::eQueryFailed, eDiag_Error, msg);
208 }
209 }
210 } while( restarted);
211 }
212
213 END_IDBLOB_SCOPE
214