1 /*  $Id: fetch_split_history.cpp 623326 2021-01-12 13:17:34Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitrii Saprykin
27  *
28  * File Description:
29  *
30  * Fetch operation for Cassandra blob split history
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <objtools/pubseq_gateway/impl/cassandra/blob_task/fetch_split_history.hpp>
37 
38 #include <memory>
39 #include <string>
40 #include <utility>
41 
42 #include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
43 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
44 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
45 #include <objtools/pubseq_gateway/impl/cassandra/blob_record.hpp>
46 
47 BEGIN_IDBLOB_SCOPE
48 USING_NCBI_SCOPE;
49 
CCassBlobTaskFetchSplitHistory(unsigned int op_timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> conn,const string & keyspace,CBlobRecord::TSatKey sat_key,TConsumeCallback consume_callback,TDataErrorCallback data_error_cb)50 CCassBlobTaskFetchSplitHistory::CCassBlobTaskFetchSplitHistory(
51     unsigned int op_timeout_ms,
52     unsigned int max_retries,
53     shared_ptr<CCassConnection> conn,
54     const string & keyspace,
55     CBlobRecord::TSatKey sat_key,
56     TConsumeCallback consume_callback,
57     TDataErrorCallback data_error_cb
58 )
59     : CCassBlobTaskFetchSplitHistory(
60         op_timeout_ms, max_retries, conn, keyspace, sat_key, kAllVersions, move(consume_callback), data_error_cb
61     )
62 {
63 }
64 
CCassBlobTaskFetchSplitHistory(unsigned int timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> connection,const string & keyspace,CBlobRecord::TSatKey sat_key,SSplitHistoryRecord::TSplitVersion split_version,TConsumeCallback consume_callback,TDataErrorCallback data_error_cb)65 CCassBlobTaskFetchSplitHistory::CCassBlobTaskFetchSplitHistory(
66     unsigned int timeout_ms,
67     unsigned int max_retries,
68     shared_ptr<CCassConnection> connection,
69     const string & keyspace,
70     CBlobRecord::TSatKey sat_key,
71     SSplitHistoryRecord::TSplitVersion split_version,
72     TConsumeCallback consume_callback,
73     TDataErrorCallback data_error_cb
74 )
75     : CCassBlobWaiter(timeout_ms, connection, keyspace, sat_key, true, max_retries, move(data_error_cb))
76     , m_SplitVersion(split_version)
77     , m_ConsumeCallback(move(consume_callback))
78     , m_RestartCounter(0)
79 {
80 }
81 
SetConsumeCallback(TConsumeCallback callback)82 void CCassBlobTaskFetchSplitHistory::SetConsumeCallback(TConsumeCallback callback)
83 {
84     m_ConsumeCallback = move(callback);
85 }
86 
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)87 void CCassBlobTaskFetchSplitHistory::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
88 {
89     if (callback && m_State != eInit) {
90         NCBI_THROW(CCassandraException, eSeqFailed,
91            "CCassBlobTaskFetchSplitHistory: DataReadyCB can't be assigned "
92            "after the loading process has started");
93     }
94     CCassBlobWaiter::SetDataReadyCB3(callback);
95 }
96 
Wait1(void)97 void CCassBlobTaskFetchSplitHistory::Wait1(void)
98 {
99     bool restarted;
100     do {
101         restarted = false;
102         switch (m_State) {
103             case eError:
104             case eDone:
105                 return;
106 
107             case eInit: {
108                 m_QueryArr.resize(1);
109                 m_QueryArr[0] = {m_Conn->NewQuery(), 0};
110                 auto query = m_QueryArr[0].query;
111                 string sql = "SELECT split_version, last_modified, id2_info FROM " + GetKeySpace() +
112                     ".blob_split_history WHERE sat_key = ?";
113                 if (m_SplitVersion == kAllVersions) {
114                     query->SetSQL(sql, 1);
115                     query->BindInt32(0, m_Key);
116                 } else {
117                     sql.append(" and split_version = ?");
118                     query->SetSQL(sql, 2);
119                     query->BindInt32(0, m_Key);
120                     query->BindInt32(1, m_SplitVersion);
121                 }
122 
123                 SetupQueryCB3(query);
124                 UpdateLastActivity();
125                 query->Query(GetQueryConsistency(), m_Async, true);
126                 m_State = eWaitingForFetch;
127                 break;
128             }
129 
130             case eWaitingForFetch: {
131                 auto query = m_QueryArr[0].query;
132                 if (CheckReady(query, m_RestartCounter, restarted)) {
133                     while (query->NextRow() == ar_dataready) {
134                         size_t new_item_idx = m_Result.size();
135                         m_Result.resize(new_item_idx + 1);
136                         m_Result[new_item_idx].sat_key = m_Key;
137                         m_Result[new_item_idx].split_version = query->FieldGetInt32Value(0, 0);
138                         m_Result[new_item_idx].modified = query->FieldGetInt64Value(1, 0);
139                         m_Result[new_item_idx].id2_info = query->FieldGetStrValueDef(2, "");
140                     }
141                     if (query->IsEOF()) {
142                         if (m_ConsumeCallback) {
143                             m_ConsumeCallback(move(m_Result));
144                             m_Result.clear();
145                         }
146 
147                         CloseAll();
148                         m_State = eDone;
149                     }
150                 } else if (restarted) {
151                     ++m_RestartCounter;
152                     query->Close();
153                     m_State = eInit;
154                     m_Result.clear();
155                 }
156                 UpdateLastActivity();
157                 break;
158             }
159 
160             default: {
161                 char  msg[1024];
162                 snprintf(msg, sizeof(msg), "Failed to fetch bioseq info (key=%s.%d.%d) unexpected state (%d)",
163                     m_Keyspace.c_str(),
164                     m_Key,
165                     static_cast<int>(m_SplitVersion),
166                     static_cast<int>(m_State));
167                 Error(CRequestStatus::e502_BadGateway, CCassandraException::eQueryFailed, eDiag_Error, msg);
168             }
169         }
170     } while(restarted);
171 }
172 
173 END_IDBLOB_SCOPE
174