1 /* $Id: fetch_split_history.cpp 623326 2021-01-12 13:17:34Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitrii Saprykin
27 *
28 * File Description:
29 *
30 * Fetch operation for Cassandra blob split history
31 *
32 */
33
34 #include <ncbi_pch.hpp>
35
36 #include <objtools/pubseq_gateway/impl/cassandra/blob_task/fetch_split_history.hpp>
37
38 #include <memory>
39 #include <string>
40 #include <utility>
41
42 #include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
43 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
44 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
45 #include <objtools/pubseq_gateway/impl/cassandra/blob_record.hpp>
46
47 BEGIN_IDBLOB_SCOPE
48 USING_NCBI_SCOPE;
49
CCassBlobTaskFetchSplitHistory(unsigned int op_timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> conn,const string & keyspace,CBlobRecord::TSatKey sat_key,TConsumeCallback consume_callback,TDataErrorCallback data_error_cb)50 CCassBlobTaskFetchSplitHistory::CCassBlobTaskFetchSplitHistory(
51 unsigned int op_timeout_ms,
52 unsigned int max_retries,
53 shared_ptr<CCassConnection> conn,
54 const string & keyspace,
55 CBlobRecord::TSatKey sat_key,
56 TConsumeCallback consume_callback,
57 TDataErrorCallback data_error_cb
58 )
59 : CCassBlobTaskFetchSplitHistory(
60 op_timeout_ms, max_retries, conn, keyspace, sat_key, kAllVersions, move(consume_callback), data_error_cb
61 )
62 {
63 }
64
CCassBlobTaskFetchSplitHistory(unsigned int timeout_ms,unsigned int max_retries,shared_ptr<CCassConnection> connection,const string & keyspace,CBlobRecord::TSatKey sat_key,SSplitHistoryRecord::TSplitVersion split_version,TConsumeCallback consume_callback,TDataErrorCallback data_error_cb)65 CCassBlobTaskFetchSplitHistory::CCassBlobTaskFetchSplitHistory(
66 unsigned int timeout_ms,
67 unsigned int max_retries,
68 shared_ptr<CCassConnection> connection,
69 const string & keyspace,
70 CBlobRecord::TSatKey sat_key,
71 SSplitHistoryRecord::TSplitVersion split_version,
72 TConsumeCallback consume_callback,
73 TDataErrorCallback data_error_cb
74 )
75 : CCassBlobWaiter(timeout_ms, connection, keyspace, sat_key, true, max_retries, move(data_error_cb))
76 , m_SplitVersion(split_version)
77 , m_ConsumeCallback(move(consume_callback))
78 , m_RestartCounter(0)
79 {
80 }
81
SetConsumeCallback(TConsumeCallback callback)82 void CCassBlobTaskFetchSplitHistory::SetConsumeCallback(TConsumeCallback callback)
83 {
84 m_ConsumeCallback = move(callback);
85 }
86
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)87 void CCassBlobTaskFetchSplitHistory::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
88 {
89 if (callback && m_State != eInit) {
90 NCBI_THROW(CCassandraException, eSeqFailed,
91 "CCassBlobTaskFetchSplitHistory: DataReadyCB can't be assigned "
92 "after the loading process has started");
93 }
94 CCassBlobWaiter::SetDataReadyCB3(callback);
95 }
96
Wait1(void)97 void CCassBlobTaskFetchSplitHistory::Wait1(void)
98 {
99 bool restarted;
100 do {
101 restarted = false;
102 switch (m_State) {
103 case eError:
104 case eDone:
105 return;
106
107 case eInit: {
108 m_QueryArr.resize(1);
109 m_QueryArr[0] = {m_Conn->NewQuery(), 0};
110 auto query = m_QueryArr[0].query;
111 string sql = "SELECT split_version, last_modified, id2_info FROM " + GetKeySpace() +
112 ".blob_split_history WHERE sat_key = ?";
113 if (m_SplitVersion == kAllVersions) {
114 query->SetSQL(sql, 1);
115 query->BindInt32(0, m_Key);
116 } else {
117 sql.append(" and split_version = ?");
118 query->SetSQL(sql, 2);
119 query->BindInt32(0, m_Key);
120 query->BindInt32(1, m_SplitVersion);
121 }
122
123 SetupQueryCB3(query);
124 UpdateLastActivity();
125 query->Query(GetQueryConsistency(), m_Async, true);
126 m_State = eWaitingForFetch;
127 break;
128 }
129
130 case eWaitingForFetch: {
131 auto query = m_QueryArr[0].query;
132 if (CheckReady(query, m_RestartCounter, restarted)) {
133 while (query->NextRow() == ar_dataready) {
134 size_t new_item_idx = m_Result.size();
135 m_Result.resize(new_item_idx + 1);
136 m_Result[new_item_idx].sat_key = m_Key;
137 m_Result[new_item_idx].split_version = query->FieldGetInt32Value(0, 0);
138 m_Result[new_item_idx].modified = query->FieldGetInt64Value(1, 0);
139 m_Result[new_item_idx].id2_info = query->FieldGetStrValueDef(2, "");
140 }
141 if (query->IsEOF()) {
142 if (m_ConsumeCallback) {
143 m_ConsumeCallback(move(m_Result));
144 m_Result.clear();
145 }
146
147 CloseAll();
148 m_State = eDone;
149 }
150 } else if (restarted) {
151 ++m_RestartCounter;
152 query->Close();
153 m_State = eInit;
154 m_Result.clear();
155 }
156 UpdateLastActivity();
157 break;
158 }
159
160 default: {
161 char msg[1024];
162 snprintf(msg, sizeof(msg), "Failed to fetch bioseq info (key=%s.%d.%d) unexpected state (%d)",
163 m_Keyspace.c_str(),
164 m_Key,
165 static_cast<int>(m_SplitVersion),
166 static_cast<int>(m_State));
167 Error(CRequestStatus::e502_BadGateway, CCassandraException::eQueryFailed, eDiag_Error, msg);
168 }
169 }
170 } while(restarted);
171 }
172
173 END_IDBLOB_SCOPE
174