1 /*  $Id: resolve_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description: resolve processor
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include "resolve_processor.hpp"
35 #include "pubseq_gateway.hpp"
36 #include "pubseq_gateway_cache_utils.hpp"
37 #include "pubseq_gateway_convert_utils.hpp"
38 
39 USING_NCBI_SCOPE;
40 
41 using namespace std::placeholders;
42 
43 
CPSGS_ResolveProcessor()44 CPSGS_ResolveProcessor::CPSGS_ResolveProcessor() :
45     m_ResolveRequest(nullptr)
46 {}
47 
48 
CPSGS_ResolveProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority)49 CPSGS_ResolveProcessor::CPSGS_ResolveProcessor(
50                                 shared_ptr<CPSGS_Request> request,
51                                 shared_ptr<CPSGS_Reply> reply,
52                                 TProcessorPriority  priority) :
53     CPSGS_CassProcessorBase(request, reply, priority),
54     CPSGS_ResolveBase(request, reply,
55                       bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveFinished,
56                            this, _1),
57                       bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveError,
58                            this, _1, _2, _3, _4),
59                       bind(&CPSGS_ResolveProcessor::x_OnResolutionGoodData,
60                            this))
61 {
62     // Convenience to avoid calling
63     // m_Request->GetRequest<SPSGS_ResolveRequest>() everywhere
64     m_ResolveRequest = & request->GetRequest<SPSGS_ResolveRequest>();
65 }
66 
67 
~CPSGS_ResolveProcessor()68 CPSGS_ResolveProcessor::~CPSGS_ResolveProcessor()
69 {}
70 
71 
72 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const73 CPSGS_ResolveProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
74                                         shared_ptr<CPSGS_Reply> reply,
75                                         TProcessorPriority  priority) const
76 {
77     if (!IsCassandraProcessorEnabled(request))
78         return nullptr;
79 
80     if (request->GetRequestType() == CPSGS_Request::ePSGS_ResolveRequest) {
81         auto *      app = CPubseqGatewayApp::GetInstance();
82         auto        startup_data_state = app->GetStartupDataState();
83         if (startup_data_state != ePSGS_StartupDataOK) {
84             if (request->NeedTrace()) {
85                 reply->SendTrace("Cannot create " + GetName() +
86                                  " processor because Cassandra DB "
87                                  "is not available.\n" +
88                                  GetCassStartupDataStateMessage(startup_data_state),
89                                  request->GetStartTimestamp());
90             }
91             return nullptr;
92         }
93 
94         return new CPSGS_ResolveProcessor(request, reply, priority);
95     }
96     return nullptr;
97 }
98 
99 
Process(void)100 void CPSGS_ResolveProcessor::Process(void)
101 {
102     // In both cases: sync or async resolution --> a callback will be called
103     ResolveInputSeqId();
104 }
105 
106 
107 // This callback is called in all cases when there is no valid resolution, i.e.
108 // 404, or any kind of errors
109 void
x_OnSeqIdResolveError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)110 CPSGS_ResolveProcessor::x_OnSeqIdResolveError(
111                         CRequestStatus::ECode  status,
112                         int  code,
113                         EDiagSev  severity,
114                         const string &  message)
115 {
116     if (m_Cancelled) {
117         m_Completed = true;
118         return;
119     }
120 
121     CRequestContextResetter     context_resetter;
122     IPSGS_Processor::m_Request->SetRequestContext();
123 
124     UpdateOverallStatus(status);
125 
126     size_t      item_id = IPSGS_Processor::m_Reply->GetItemId();
127     if (status == CRequestStatus::e404_NotFound) {
128         IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
129                                                        message, status,
130                                                        ePSGS_NoBioseqInfo,
131                                                        eDiag_Error);
132     } else {
133         PSG_WARNING(message);
134         IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
135                                                        message, status,
136                                                        ePSGS_BioseqInfoError,
137                                                        severity);
138     }
139     IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
140 
141     m_Completed = true;
142     SignalFinishProcessing();
143 }
144 
145 
146 // This callback is called only in case of a valid resolution
147 void
x_OnSeqIdResolveFinished(SBioseqResolution && bioseq_resolution)148 CPSGS_ResolveProcessor::x_OnSeqIdResolveFinished(
149                             SBioseqResolution &&  bioseq_resolution)
150 {
151     if (m_Cancelled) {
152         m_Completed = true;
153         return;
154     }
155 
156     CRequestContextResetter     context_resetter;
157     IPSGS_Processor::m_Request->SetRequestContext();
158 
159     x_SendBioseqInfo(bioseq_resolution);
160 
161     m_Completed = true;
162     SignalFinishProcessing();
163 }
164 
165 
166 void
x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)167 CPSGS_ResolveProcessor::x_SendBioseqInfo(SBioseqResolution &  bioseq_resolution)
168 {
169     auto    effective_output_format = m_ResolveRequest->m_OutputFormat;
170     if (effective_output_format == SPSGS_ResolveRequest::ePSGS_NativeFormat ||
171         effective_output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
172         effective_output_format = SPSGS_ResolveRequest::ePSGS_JsonFormat;
173     }
174 
175     if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
176         bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
177         AdjustBioseqAccession(bioseq_resolution);
178 
179     string              data_to_send;
180     if (effective_output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat) {
181         data_to_send = ToJson(bioseq_resolution.m_BioseqInfo,
182                               m_ResolveRequest->m_IncludeDataFlags).
183                                                 Repr(CJsonNode::fStandardJson);
184     } else {
185         data_to_send = ToBioseqProtobuf(bioseq_resolution.m_BioseqInfo);
186     }
187 
188     size_t              item_id = IPSGS_Processor::m_Reply->GetItemId();
189     IPSGS_Processor::m_Reply->PrepareBioseqData(item_id, GetName(),
190                                                 data_to_send,
191                                                 effective_output_format);
192     IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
193 }
194 
195 
Cancel(void)196 void CPSGS_ResolveProcessor::Cancel(void)
197 {
198     m_Cancelled = true;
199     CancelLoaders();
200 }
201 
202 
GetStatus(void)203 IPSGS_Processor::EPSGS_Status CPSGS_ResolveProcessor::GetStatus(void)
204 {
205     auto    status = CPSGS_CassProcessorBase::GetStatus();
206     if (status == IPSGS_Processor::ePSGS_InProgress) {
207         return status;
208     }
209 
210     if (m_Cancelled) {
211         return IPSGS_Processor::ePSGS_Cancelled;
212     }
213 
214     return status;
215 }
216 
217 
GetName(void) const218 string CPSGS_ResolveProcessor::GetName(void) const
219 {
220     return "Cassandra-resolve";
221 }
222 
223 
ProcessEvent(void)224 void CPSGS_ResolveProcessor::ProcessEvent(void)
225 {
226     x_Peek(true);
227 }
228 
229 
x_Peek(bool need_wait)230 void CPSGS_ResolveProcessor::x_Peek(bool  need_wait)
231 {
232     if (m_Cancelled) {
233         return;
234     }
235 
236     if (m_InPeek) {
237         return;
238     }
239 
240     m_InPeek = true;
241 
242     // 1 -> call m_Loader->Wait1 to pick data
243     // 2 -> check if we have ready-to-send buffers
244     // 3 -> call reply->Send()  to send what we have if it is ready
245     bool        overall_final_state = false;
246 
247     while (true) {
248         auto initial_size = m_FetchDetails.size();
249 
250         for (auto &  details: m_FetchDetails) {
251             if (details)
252                 overall_final_state |= x_Peek(details, need_wait);
253         }
254 
255         if (initial_size == m_FetchDetails.size())
256             break;
257     }
258 
259     // Ready packets needs to be send only once when everything is finished
260     if (overall_final_state) {
261         if (AreAllFinishedRead()) {
262             if (IPSGS_Processor::m_Reply->IsOutputReady()) {
263                 IPSGS_Processor::m_Reply->Flush(false);
264             }
265         }
266     }
267 
268     m_InPeek = false;
269 }
270 
271 
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)272 bool CPSGS_ResolveProcessor::x_Peek(unique_ptr<CCassFetch> &  fetch_details,
273                                     bool  need_wait)
274 {
275     if (!fetch_details->GetLoader())
276         return true;
277 
278     bool    final_state = false;
279     if (need_wait)
280         if (!fetch_details->ReadFinished()) {
281             final_state = fetch_details->GetLoader()->Wait();
282             if (final_state) {
283                 fetch_details->SetReadFinished();
284             }
285         }
286 
287     if (fetch_details->GetLoader()->HasError() &&
288             IPSGS_Processor::m_Reply->IsOutputReady() &&
289             ! IPSGS_Processor::m_Reply->IsFinished()) {
290         // Send an error
291         string      error = fetch_details->GetLoader()->LastError();
292         auto *      app = CPubseqGatewayApp::GetInstance();
293 
294         app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
295         PSG_ERROR(error);
296 
297         IPSGS_Processor::m_Reply->PrepareProcessorMessage(
298                 IPSGS_Processor::m_Reply->GetItemId(),
299                 GetName(), error, CRequestStatus::e500_InternalServerError,
300                 ePSGS_UnknownError, eDiag_Error);
301 
302         // Mark finished
303         UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
304         fetch_details->SetReadFinished();
305         SignalFinishProcessing();
306     }
307 
308     return final_state;
309 }
310 
311 
x_OnResolutionGoodData(void)312 void CPSGS_ResolveProcessor::x_OnResolutionGoodData(void)
313 {
314     // The resolution process started to receive data which look good so
315     // the dispatcher should be notified that the other processors can be
316     // stopped
317     if (m_Cancelled || m_Completed)
318         return;
319 
320     if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
321         m_Completed = true;
322     }
323 }
324 
325