1 /*  $Id: get_public_comment.cpp 623326 2021-01-12 13:17:34Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitrii Saprykin
27  *
28  * File Description:
29  *
30  * Task to resolve blob public comment from Cassandra
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <objtools/pubseq_gateway/impl/cassandra/status_history/get_public_comment.hpp>
37 
38 #include <memory>
39 #include <string>
40 #include <utility>
41 
42 #include <objtools/pubseq_gateway/impl/cassandra/cass_blob_op.hpp>
43 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
44 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
45 
46 BEGIN_IDBLOB_SCOPE
47 USING_NCBI_SCOPE;
48 
49 BEGIN_SCOPE()
50 
51 constexpr int64_t kMaxReplacesRetries = 5;
52 constexpr TBlobStatusFlagsBase kWithdrawnMask =
53     static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eWithdrawn) +
54     static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eWithdrawnPermanently);
55 const char * kDefaultSuppressedMessage = "BLOB_STATUS_SUPPRESSED";
56 const char * kDefaultWithdrawnMessage= "BLOB_STATUS_WITHDRAWN";
57 
IsBlobWithdrawn(TBlobFlagBase flags)58 bool IsBlobWithdrawn(TBlobFlagBase flags) {
59     return (flags & static_cast<TBlobFlagBase>(EBlobFlags::eWithdrawn)) != 0;
60 }
61 
IsBlobSuppressed(TBlobFlagBase flags)62 bool IsBlobSuppressed(TBlobFlagBase flags) {
63     return (flags & static_cast<TBlobFlagBase>(EBlobFlags::eSuppress)) != 0;
64 }
65 
SameWithdrawn(TBlobStatusFlagsBase a,TBlobStatusFlagsBase b)66 bool SameWithdrawn(TBlobStatusFlagsBase a, TBlobStatusFlagsBase b) {
67     return (a & kWithdrawnMask) == (b & kWithdrawnMask);
68 }
69 
IsHistorySuppressed(TBlobStatusFlagsBase flags)70 bool IsHistorySuppressed(TBlobStatusFlagsBase flags) {
71     return (flags & static_cast<TBlobStatusFlagsBase>(EBlobStatusFlags::eSuppressPermanently)) != 0;
72 }
73 
END_SCOPE()74 END_SCOPE()
75 
76 CCassStatusHistoryTaskGetPublicComment::CCassStatusHistoryTaskGetPublicComment(
77     unsigned int op_timeout_ms,
78     unsigned int max_retries,
79     shared_ptr<CCassConnection> conn,
80     const string & keyspace,
81     CBlobRecord const &blob,
82     TDataErrorCallback data_error_cb
83 )
84     : CCassBlobWaiter(
85         op_timeout_ms, conn, keyspace, blob.GetKey(),
86         true, max_retries, move(data_error_cb)
87       )
88     , m_CommentCallback(nullptr)
89     , m_Messages(nullptr)
90     , m_BlobFlags(blob.GetFlags())
91     , m_FirstHistoryFlags(-1)
92     , m_MatchingStatusRowFound(false)
93     , m_ReplacesRetries(kMaxReplacesRetries)
94 {}
95 
SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver> callback)96 void CCassStatusHistoryTaskGetPublicComment::SetDataReadyCB(shared_ptr<CCassDataCallbackReceiver>  callback)
97 {
98     if (callback && m_State != eInit) {
99         NCBI_THROW(CCassandraException, eSeqFailed,
100            "CCassStatusHistoryTaskGetPublicComment: DataReadyCB can't be assigned "
101            "after the loading process has started");
102     }
103     CCassBlobWaiter::SetDataReadyCB3(callback);
104 }
105 
JumpToReplaced(CBlobRecord::TSatKey replaced)106 void CCassStatusHistoryTaskGetPublicComment::JumpToReplaced(CBlobRecord::TSatKey replaced)
107 {
108     --m_ReplacesRetries;
109     m_Key = replaced;
110     m_MatchingStatusRowFound = false;
111     m_State = eStartReading;
112     m_PublicComment.clear();
113 }
114 
SetMessages(CPSGMessages const * messages)115 void CCassStatusHistoryTaskGetPublicComment::SetMessages(CPSGMessages const * messages)
116 {
117     m_Messages = messages;
118 }
119 
SetCommentCallback(TCommentCallback callback)120 void CCassStatusHistoryTaskGetPublicComment::SetCommentCallback(TCommentCallback callback)
121 {
122     m_CommentCallback = move(callback);
123 }
124 
Wait1()125 void CCassStatusHistoryTaskGetPublicComment::Wait1()
126 {
127     bool b_need_repeat;
128     do {
129         b_need_repeat = false;
130         switch (m_State) {
131             case eError:
132             case eDone:
133                 return;
134 
135             case eInit: {
136                 if (!IsBlobSuppressed(m_BlobFlags) && !IsBlobWithdrawn(m_BlobFlags)) {
137                     if (m_CommentCallback) {
138                         m_CommentCallback("", false);
139                     }
140                     m_State = eDone;
141                 } else {
142                     m_State = eStartReading;
143                     b_need_repeat = true;
144                 }
145                 break;
146             }
147 
148             case eStartReading: {
149                 CloseAll();
150                 m_QueryArr.clear();
151                 m_QueryArr.push_back({m_Conn->NewQuery(), 0});
152                 auto query = m_QueryArr[0].query;
153                 string sql =
154                     "SELECT flags, public_comment, replaces "
155                     "FROM " + GetKeySpace() + ".blob_status_history WHERE sat_key = ?";
156                 query->SetSQL(sql, 1);
157                 query->BindInt32(0, m_Key);
158                 SetupQueryCB3(query);
159                 UpdateLastActivity();
160                 query->Query(GetQueryConsistency(), m_Async, true);
161                 m_State = eReadingHistory;
162                 break;
163             }
164 
165             case eReadingHistory: {
166                 auto query = m_QueryArr[0].query;
167                 if (CheckReady(m_QueryArr[0])) {
168                     while (m_State == eReadingHistory && query->NextRow() == ar_dataready) {
169                         int64_t flags = query->FieldGetInt64Value(0, 0);
170                         string comment = query->FieldGetStrValueDef(1, "");
171                         CBlobRecord::TSatKey replaces = query->FieldGetInt32Value(2, 0);
172 
173                         // blob_prop does not have full withdrawn representation so
174                         // as a workaround we use first history record flags
175                         if (m_FirstHistoryFlags == -1) {
176                             m_FirstHistoryFlags = flags;
177                         }
178                         // blob is withdrawn
179                         if (IsBlobWithdrawn(m_BlobFlags)) {
180                             if (!SameWithdrawn(flags, m_FirstHistoryFlags)) {
181                                 if (m_MatchingStatusRowFound) {
182                                     m_State = eReturnResult;
183                                 } else if (replaces > 0 && m_ReplacesRetries > 0) {
184                                     JumpToReplaced(replaces);
185                                 } else {
186                                     m_State = eReturnResult;
187                                 }
188                                 b_need_repeat = true;
189                             } else {
190                                 m_MatchingStatusRowFound = true;
191                                 m_PublicComment = comment;
192                             }
193                         }
194                         // blob is suppressed
195                         else {
196                             if (!IsHistorySuppressed(flags)) {
197                                 if (m_MatchingStatusRowFound) {
198                                     m_State = eReturnResult;
199                                 } else if (replaces > 0 && m_ReplacesRetries > 0) {
200                                     JumpToReplaced(replaces);
201                                 } else {
202                                     m_State = eReturnResult;
203                                 }
204                                 b_need_repeat = true;
205                             } else {
206                                 m_MatchingStatusRowFound = true;
207                                 m_PublicComment = comment;
208                             }
209                         }
210                     }
211                     if (query->IsEOF()) {
212                         m_State = eReturnResult;
213                         b_need_repeat = true;
214                     }
215                 }
216                 break;
217             }
218 
219             case eReturnResult: {
220                 CloseAll();
221                 if (m_CommentCallback) {
222                     if (m_PublicComment.empty()) {
223                         if (m_Messages != nullptr) {
224                             string comment;
225                             const char * message_type = nullptr;
226                             if (IsBlobSuppressed(m_BlobFlags)) {
227                                 comment = m_Messages->Get(kDefaultSuppressedMessage);
228                                 message_type = kDefaultSuppressedMessage;
229                             } else if (IsBlobWithdrawn(m_BlobFlags)) {
230                                 comment = m_Messages->Get(kDefaultWithdrawnMessage);
231                                 message_type = kDefaultWithdrawnMessage;
232                             }
233                             if (comment.empty() && message_type != nullptr) {
234                                 char msg[1024];
235                                 snprintf(msg, sizeof(msg), "Message is empty for (%s)", message_type);
236                                 Error(CRequestStatus::e502_BadGateway, CCassandraException::eMissData, eDiag_Error, msg);
237                             } else {
238                                 m_CommentCallback(move(comment), true);
239                             }
240                         } else {
241                             Error(CRequestStatus::e502_BadGateway, CCassandraException::eMissData,
242                                 eDiag_Error, "Messages provider not configured for Public Comment retrieval");
243                         }
244                     } else {
245                         m_CommentCallback(move(m_PublicComment), true);
246                     }
247                 }
248                 m_State = eDone;
249                 break;
250             }
251 
252             default: {
253                 char msg[1024];
254                 snprintf(msg, sizeof(msg), "Failed to get public comment for record (key=%s.%d) unexpected state (%d)",
255                     m_Keyspace.c_str(), m_Key, static_cast<int>(m_State));
256                 Error(CRequestStatus::e502_BadGateway, CCassandraException::eQueryFailed, eDiag_Error, msg);
257             }
258         }
259     } while (b_need_repeat);
260 }
261 
262 END_IDBLOB_SCOPE
263