1 /* $Id: get_public_comment.cpp 623326 2021-01-12 13:17:34Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitrii Saprykin
27 *
28 * File Description:
29 *
30 * Task to resolve blob public comment from Cassandra
31 *
32 */
33
34 #include <ncbi_pch.hpp>
35
36 #include <objtools/pubseq_gateway/impl/cassandra/status_history/get_public_comment.hpp>
37
38 #include <memory>
39 #include <string>
40 #include <utility>
41
42 #include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
43 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
44 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
45
46 BEGIN_IDBLOB_SCOPE
47 USING_NCBI_SCOPE;
48
49 BEGIN_SCOPE()
50
51 constexpr int64_t kMaxReplacesRetries = 5;
52 constexpr TBlobStatusFlagsBase kWithdrawnMask =
53 static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eWithdrawn) +
54 static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eWithdrawnPermanently);
55 const char * kDefaultSuppressedMessage = "BLOB_STATUS_SUPPRESSED";
56 const char * kDefaultWithdrawnMessage= "BLOB_STATUS_WITHDRAWN";
57
IsBlobWithdrawn(TBlobFlagBase flags)58 bool IsBlobWithdrawn(TBlobFlagBase flags) {
59 return (flags & static_cast<TBlobFlagBase>(EBlobFlags::eWithdrawn)) != 0;
60 }
61
IsBlobSuppressed(TBlobFlagBase flags)62 bool IsBlobSuppressed(TBlobFlagBase flags) {
63 return (flags & static_cast<TBlobFlagBase>(EBlobFlags::eSuppress)) != 0;
64 }
65
SameWithdrawn(TBlobStatusFlagsBase a,TBlobStatusFlagsBase b)66 bool SameWithdrawn(TBlobStatusFlagsBase a, TBlobStatusFlagsBase b) {
67 return (a & kWithdrawnMask) == (b & kWithdrawnMask);
68 }
69
IsHistorySuppressed(TBlobStatusFlagsBase flags)70 bool IsHistorySuppressed(TBlobStatusFlagsBase flags) {
71 return (flags & static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eSuppressPermanently)) != 0;
72 }
73
END_SCOPE()74 END_SCOPE()
75
76 CCassStatusHistoryTaskGetPublicComment::CCassStatusHistoryTaskGetPublicComment(
77 unsigned int op_timeout_ms,
78 unsigned int max_retries,
79 shared_ptr<CCassConnection> conn,
80 const string & keyspace,
81 CBlobRecord const &blob,
82 TDataErrorCallback data_error_cb
83 )
84 : CCassBlobWaiter(
85 op_timeout_ms, conn, keyspace, blob.GetKey(),
86 true, max_retries, move(data_error_cb)
87 )
88 , m_CommentCallback(nullptr)
89 , m_Messages(nullptr)
90 , m_BlobFlags(blob.GetFlags())
91 , m_FirstHistoryFlags(-1)
92 , m_MatchingStatusRowFound(false)
93 , m_ReplacesRetries(kMaxReplacesRetries)
94 {}
95
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)96 void CCassStatusHistoryTaskGetPublicComment::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)
97 {
98 if (callback && m_State != eInit) {
99 NCBI_THROW(CCassandraException, eSeqFailed,
100 "CCassStatusHistoryTaskGetPublicComment: DataReadyCB can't be assigned "
101 "after the loading process has started");
102 }
103 CCassBlobWaiter::SetDataReadyCB3(callback);
104 }
105
JumpToReplaced(CBlobRecord::TSatKey replaced)106 void CCassStatusHistoryTaskGetPublicComment::JumpToReplaced(CBlobRecord::TSatKey replaced)
107 {
108 --m_ReplacesRetries;
109 m_Key = replaced;
110 m_MatchingStatusRowFound = false;
111 m_State = eStartReading;
112 m_PublicComment.clear();
113 }
114
SetMessages(CPSGMessages const * messages)115 void CCassStatusHistoryTaskGetPublicComment::SetMessages(CPSGMessages const * messages)
116 {
117 m_Messages = messages;
118 }
119
SetCommentCallback(TCommentCallback callback)120 void CCassStatusHistoryTaskGetPublicComment::SetCommentCallback(TCommentCallback callback)
121 {
122 m_CommentCallback = move(callback);
123 }
124
Wait1()125 void CCassStatusHistoryTaskGetPublicComment::Wait1()
126 {
127 bool b_need_repeat;
128 do {
129 b_need_repeat = false;
130 switch (m_State) {
131 case eError:
132 case eDone:
133 return;
134
135 case eInit: {
136 if (!IsBlobSuppressed(m_BlobFlags) && !IsBlobWithdrawn(m_BlobFlags)) {
137 if (m_CommentCallback) {
138 m_CommentCallback("", false);
139 }
140 m_State = eDone;
141 } else {
142 m_State = eStartReading;
143 b_need_repeat = true;
144 }
145 break;
146 }
147
148 case eStartReading: {
149 CloseAll();
150 m_QueryArr.clear();
151 m_QueryArr.push_back({m_Conn->NewQuery(), 0});
152 auto query = m_QueryArr[0].query;
153 string sql =
154 "SELECT flags, public_comment, replaces "
155 "FROM " + GetKeySpace() + ".blob_status_history WHERE sat_key = ?";
156 query->SetSQL(sql, 1);
157 query->BindInt32(0, m_Key);
158 SetupQueryCB3(query);
159 UpdateLastActivity();
160 query->Query(GetQueryConsistency(), m_Async, true);
161 m_State = eReadingHistory;
162 break;
163 }
164
165 case eReadingHistory: {
166 auto query = m_QueryArr[0].query;
167 if (CheckReady(m_QueryArr[0])) {
168 while (m_State == eReadingHistory && query->NextRow() == ar_dataready) {
169 int64_t flags = query->FieldGetInt64Value(0, 0);
170 string comment = query->FieldGetStrValueDef(1, "");
171 CBlobRecord::TSatKey replaces = query->FieldGetInt32Value(2, 0);
172
173 // blob_prop does not have full withdrawn representation so
174 // as a workaround we use first history record flags
175 if (m_FirstHistoryFlags == -1) {
176 m_FirstHistoryFlags = flags;
177 }
178 // blob is withdrawn
179 if (IsBlobWithdrawn(m_BlobFlags)) {
180 if (!SameWithdrawn(flags, m_FirstHistoryFlags)) {
181 if (m_MatchingStatusRowFound) {
182 m_State = eReturnResult;
183 } else if (replaces > 0 && m_ReplacesRetries > 0) {
184 JumpToReplaced(replaces);
185 } else {
186 m_State = eReturnResult;
187 }
188 b_need_repeat = true;
189 } else {
190 m_MatchingStatusRowFound = true;
191 m_PublicComment = comment;
192 }
193 }
194 // blob is suppressed
195 else {
196 if (!IsHistorySuppressed(flags)) {
197 if (m_MatchingStatusRowFound) {
198 m_State = eReturnResult;
199 } else if (replaces > 0 && m_ReplacesRetries > 0) {
200 JumpToReplaced(replaces);
201 } else {
202 m_State = eReturnResult;
203 }
204 b_need_repeat = true;
205 } else {
206 m_MatchingStatusRowFound = true;
207 m_PublicComment = comment;
208 }
209 }
210 }
211 if (query->IsEOF()) {
212 m_State = eReturnResult;
213 b_need_repeat = true;
214 }
215 }
216 break;
217 }
218
219 case eReturnResult: {
220 CloseAll();
221 if (m_CommentCallback) {
222 if (m_PublicComment.empty()) {
223 if (m_Messages != nullptr) {
224 string comment;
225 const char * message_type = nullptr;
226 if (IsBlobSuppressed(m_BlobFlags)) {
227 comment = m_Messages->Get(kDefaultSuppressedMessage);
228 message_type = kDefaultSuppressedMessage;
229 } else if (IsBlobWithdrawn(m_BlobFlags)) {
230 comment = m_Messages->Get(kDefaultWithdrawnMessage);
231 message_type = kDefaultWithdrawnMessage;
232 }
233 if (comment.empty() && message_type != nullptr) {
234 char msg[1024];
235 snprintf(msg, sizeof(msg), "Message is empty for (%s)", message_type);
236 Error(CRequestStatus::e502_BadGateway, CCassandraException::eMissData, eDiag_Error, msg);
237 } else {
238 m_CommentCallback(move(comment), true);
239 }
240 } else {
241 Error(CRequestStatus::e502_BadGateway, CCassandraException::eMissData,
242 eDiag_Error, "Messages provider not configured for Public Comment retrieval");
243 }
244 } else {
245 m_CommentCallback(move(m_PublicComment), true);
246 }
247 }
248 m_State = eDone;
249 break;
250 }
251
252 default: {
253 char msg[1024];
254 snprintf(msg, sizeof(msg), "Failed to get public comment for record (key=%s.%d) unexpected state (%d)",
255 m_Keyspace.c_str(), m_Key, static_cast<int>(m_State));
256 Error(CRequestStatus::e502_BadGateway, CCassandraException::eQueryFailed, eDiag_Error, msg);
257 }
258 }
259 } while (b_need_repeat);
260 }
261
262 END_IDBLOB_SCOPE
263