1 /*  $Id: fetch.cpp 634624 2021-07-15 17:15:43Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Evgueni Belyi based on Dmitrii Saprykin's code
27  *
28  * File Description:
29  *
30  * Cassandra fetch named annot record
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <memory>
37 #include <string>
38 #include <tuple>
39 #include <utility>
40 #include <vector>
41 
42 //#include <objtools/pubseq_gateway/impl/cassandra/acc_ver_hist/record.hpp>
43 //#include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
44 //#include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
45 //#include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
46 
47 #include <objtools/pubseq_gateway/impl/cassandra/acc_ver_hist/tasks.hpp>
48 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
49 
50 BEGIN_IDBLOB_SCOPE
51 USING_NCBI_SCOPE;
52 
CCassAccVerHistoryTaskFetch(unsigned int timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> connection,const string & keyspace,string accession,TAccVerHistConsumeCallback consume_callback,TDataErrorCallback data_error_cb,int16_t version,int16_t seq_id_type)53 CCassAccVerHistoryTaskFetch::CCassAccVerHistoryTaskFetch(
54     unsigned int timeout_ms,
55     unsigned int max_retries,
56     shared_ptr<CCassConnection> connection,
57     const string & keyspace,
58     string accession,
59     TAccVerHistConsumeCallback consume_callback,
60     TDataErrorCallback data_error_cb,
61     int16_t version,
62     int16_t seq_id_type
63 )
64     : CCassBlobWaiter(
65         timeout_ms, connection, keyspace,
66         0,
67         true, // false, // m_Async
68         max_retries, move(data_error_cb)
69     )
70     , m_Accession( move( accession))
71     , m_Version( version)
72     , m_SeqIdType( seq_id_type)
73     , m_Consume( move( consume_callback))
74     , m_PageSize( CCassQuery::DEFAULT_PAGE_SIZE)
75     , m_RestartCounter( 0)
76 {}
77 
SetConsumeCallback(TAccVerHistConsumeCallback callback)78 void CCassAccVerHistoryTaskFetch::SetConsumeCallback( TAccVerHistConsumeCallback callback)
79 {
80     m_Consume = move(callback);
81 }
82 
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)83 void CCassAccVerHistoryTaskFetch::SetDataReadyCB(
84     shared_ptr<CCassDataCallbackReceiver> callback)
85 {
86     if( callback && m_State != eInit)
87     {
88         NCBI_THROW(CCassandraException, eSeqFailed,
89            "CCassAccVerHistoryTaskFetch: DataReadyCB can't be assigned "
90            "after the loading process has started");
91     }
92     CCassBlobWaiter::SetDataReadyCB3( callback);
93 }
94 
Wait1()95 void CCassAccVerHistoryTaskFetch::Wait1()
96 {
97     bool restarted = false;
98     do
99     {
100         restarted = false;
101         switch (m_State)
102         {
103         case eError:
104         case eDone:
105             return;
106 
107         case eInit:
108         {
109             m_QueryArr.resize(1);
110             m_QueryArr[0] = { m_Conn->NewQuery(), 0};
111 
112             string sql =
113                 " SELECT"
114                 "     version, gi, date, chain, id_type, sat, sat_key"
115                 " FROM " + GetKeySpace() + ".acc_ver_hist "
116                 " WHERE"
117                 "     accession = ? ";
118             unsigned int params = 1;
119             if( m_Version > 0)
120             {
121                 sql += " AND version = ?";
122                 ++params;
123             }
124             if( m_SeqIdType > 0)
125             {
126                 sql += " AND id_type = ? ALLOW FILTERING";
127                 ++params;
128             }
129 
130             m_QueryArr[0].query->SetSQL(sql, params);
131             m_QueryArr[0].query->BindStr(0, m_Accession);
132 
133             unsigned int param = 1;
134             if( m_Version > 0)
135             {
136                 m_QueryArr[0].query->BindInt16( param, m_Version); param++;
137             }
138             if( m_SeqIdType > 0)
139             {
140                 m_QueryArr[0].query->BindInt16( param, m_SeqIdType);
141             }
142 
143             SetupQueryCB3( m_QueryArr[0].query);
144             UpdateLastActivity();
145             m_QueryArr[0].query->Query( CASS_CONSISTENCY_LOCAL_QUORUM,
146                                         m_Async, true, m_PageSize);
147             restarted = true;
148             m_State = eFetchStarted;
149             break;
150         }
151 
152         case eFetchStarted:
153         {
154             if( CheckReady( m_QueryArr[0].query, m_RestartCounter, restarted))
155             {
156                 bool do_next = true;
157                 auto state = m_QueryArr[0].query->NextRow();
158                 while( do_next && state == ar_dataready)
159                 {
160                     SAccVerHistRec record;
161                     record.accession = m_Accession;
162                     record.version   = m_QueryArr[0].query->FieldGetInt16Value( 0, 0);
163                     record.gi        = m_QueryArr[0].query->FieldGetInt64Value( 1, 0);
164                     record.date      = m_QueryArr[0].query->FieldGetInt64Value( 2, 0);
165                     record.chain     = m_QueryArr[0].query->FieldGetInt64Value( 3, 0);
166                     record.seq_id_type = m_QueryArr[0].query->FieldGetInt16Value( 4, 0);
167                     record.sat       = m_QueryArr[0].query->FieldGetInt16Value( 5, 0);
168                     record.sat_key   = m_QueryArr[0].query->FieldGetInt32Value( 6, 0);
169 
170                     if( m_Consume)
171                     {
172                         do_next = m_Consume( move( record), false);
173                     }
174                     if( do_next)
175                     {
176                         state = m_QueryArr[0].query->NextRow();
177                     }
178                 }
179                 if( !do_next || m_QueryArr[0].query->IsEOF())
180                 {
181                     if( m_Consume)
182                     {
183                         m_Consume( SAccVerHistRec(), true);
184                     }
185                     CloseAll();
186                     m_State = eDone;
187                 }
188             }
189             else if( restarted)
190             {
191                 ++m_RestartCounter;
192                 m_QueryArr[0].query->Close();
193                 m_State = eInit;
194             }
195             UpdateLastActivity();
196             break;
197         }
198 
199         default:
200         {
201             char msg[1024];
202             snprintf(msg, sizeof(msg),
203                 "Failed to fetch accession history (key=%s.%s|%hd|%hd) unexpected state (%d)",
204                      m_Keyspace.c_str(), m_Accession.c_str(), m_Version, m_SeqIdType,
205                      static_cast<int>(m_State));
206             Error( CRequestStatus::e502_BadGateway,
207                    CCassandraException::eQueryFailed, eDiag_Error, msg);
208         }
209         }
210     } while( restarted);
211 }
212 
213 END_IDBLOB_SCOPE
214