1 /* $Id: resolve_processor.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description: resolve processor
29 *
30 */
31
32 #include <ncbi_pch.hpp>
33
34 #include "resolve_processor.hpp"
35 #include "pubseq_gateway.hpp"
36 #include "pubseq_gateway_cache_utils.hpp"
37 #include "pubseq_gateway_convert_utils.hpp"
38
39 USING_NCBI_SCOPE;
40
41 using namespace std::placeholders;
42
43
CPSGS_ResolveProcessor()44 CPSGS_ResolveProcessor::CPSGS_ResolveProcessor() :
45 m_ResolveRequest(nullptr)
46 {}
47
48
CPSGS_ResolveProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority)49 CPSGS_ResolveProcessor::CPSGS_ResolveProcessor(
50 shared_ptr<CPSGS_Request> request,
51 shared_ptr<CPSGS_Reply> reply,
52 TProcessorPriority priority) :
53 CPSGS_CassProcessorBase(request, reply, priority),
54 CPSGS_ResolveBase(request, reply,
55 bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveFinished,
56 this, _1),
57 bind(&CPSGS_ResolveProcessor::x_OnSeqIdResolveError,
58 this, _1, _2, _3, _4),
59 bind(&CPSGS_ResolveProcessor::x_OnResolutionGoodData,
60 this))
61 {
62 // Convenience to avoid calling
63 // m_Request->GetRequest<SPSGS_ResolveRequest>() everywhere
64 m_ResolveRequest = & request->GetRequest<SPSGS_ResolveRequest>();
65 }
66
67
~CPSGS_ResolveProcessor()68 CPSGS_ResolveProcessor::~CPSGS_ResolveProcessor()
69 {}
70
71
72 IPSGS_Processor*
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const73 CPSGS_ResolveProcessor::CreateProcessor(shared_ptr<CPSGS_Request> request,
74 shared_ptr<CPSGS_Reply> reply,
75 TProcessorPriority priority) const
76 {
77 if (!IsCassandraProcessorEnabled(request))
78 return nullptr;
79
80 if (request->GetRequestType() == CPSGS_Request::ePSGS_ResolveRequest) {
81 auto * app = CPubseqGatewayApp::GetInstance();
82 auto startup_data_state = app->GetStartupDataState();
83 if (startup_data_state != ePSGS_StartupDataOK) {
84 if (request->NeedTrace()) {
85 reply->SendTrace("Cannot create " + GetName() +
86 " processor because Cassandra DB "
87 "is not available.\n" +
88 GetCassStartupDataStateMessage(startup_data_state),
89 request->GetStartTimestamp());
90 }
91 return nullptr;
92 }
93
94 return new CPSGS_ResolveProcessor(request, reply, priority);
95 }
96 return nullptr;
97 }
98
99
Process(void)100 void CPSGS_ResolveProcessor::Process(void)
101 {
102 // In both cases: sync or async resolution --> a callback will be called
103 ResolveInputSeqId();
104 }
105
106
107 // This callback is called in all cases when there is no valid resolution, i.e.
108 // 404, or any kind of errors
109 void
x_OnSeqIdResolveError(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)110 CPSGS_ResolveProcessor::x_OnSeqIdResolveError(
111 CRequestStatus::ECode status,
112 int code,
113 EDiagSev severity,
114 const string & message)
115 {
116 if (m_Cancelled) {
117 m_Completed = true;
118 return;
119 }
120
121 CRequestContextResetter context_resetter;
122 IPSGS_Processor::m_Request->SetRequestContext();
123
124 UpdateOverallStatus(status);
125
126 size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
127 if (status == CRequestStatus::e404_NotFound) {
128 IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
129 message, status,
130 ePSGS_NoBioseqInfo,
131 eDiag_Error);
132 } else {
133 PSG_WARNING(message);
134 IPSGS_Processor::m_Reply->PrepareBioseqMessage(item_id, GetName(),
135 message, status,
136 ePSGS_BioseqInfoError,
137 severity);
138 }
139 IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
140
141 m_Completed = true;
142 SignalFinishProcessing();
143 }
144
145
146 // This callback is called only in case of a valid resolution
147 void
x_OnSeqIdResolveFinished(SBioseqResolution && bioseq_resolution)148 CPSGS_ResolveProcessor::x_OnSeqIdResolveFinished(
149 SBioseqResolution && bioseq_resolution)
150 {
151 if (m_Cancelled) {
152 m_Completed = true;
153 return;
154 }
155
156 CRequestContextResetter context_resetter;
157 IPSGS_Processor::m_Request->SetRequestContext();
158
159 x_SendBioseqInfo(bioseq_resolution);
160
161 m_Completed = true;
162 SignalFinishProcessing();
163 }
164
165
166 void
x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)167 CPSGS_ResolveProcessor::x_SendBioseqInfo(SBioseqResolution & bioseq_resolution)
168 {
169 auto effective_output_format = m_ResolveRequest->m_OutputFormat;
170 if (effective_output_format == SPSGS_ResolveRequest::ePSGS_NativeFormat ||
171 effective_output_format == SPSGS_ResolveRequest::ePSGS_UnknownFormat) {
172 effective_output_format = SPSGS_ResolveRequest::ePSGS_JsonFormat;
173 }
174
175 if (bioseq_resolution.m_ResolutionResult == ePSGS_BioseqDB ||
176 bioseq_resolution.m_ResolutionResult == ePSGS_BioseqCache)
177 AdjustBioseqAccession(bioseq_resolution);
178
179 string data_to_send;
180 if (effective_output_format == SPSGS_ResolveRequest::ePSGS_JsonFormat) {
181 data_to_send = ToJson(bioseq_resolution.m_BioseqInfo,
182 m_ResolveRequest->m_IncludeDataFlags).
183 Repr(CJsonNode::fStandardJson);
184 } else {
185 data_to_send = ToBioseqProtobuf(bioseq_resolution.m_BioseqInfo);
186 }
187
188 size_t item_id = IPSGS_Processor::m_Reply->GetItemId();
189 IPSGS_Processor::m_Reply->PrepareBioseqData(item_id, GetName(),
190 data_to_send,
191 effective_output_format);
192 IPSGS_Processor::m_Reply->PrepareBioseqCompletion(item_id, GetName(), 2);
193 }
194
195
Cancel(void)196 void CPSGS_ResolveProcessor::Cancel(void)
197 {
198 m_Cancelled = true;
199 CancelLoaders();
200 }
201
202
GetStatus(void)203 IPSGS_Processor::EPSGS_Status CPSGS_ResolveProcessor::GetStatus(void)
204 {
205 auto status = CPSGS_CassProcessorBase::GetStatus();
206 if (status == IPSGS_Processor::ePSGS_InProgress) {
207 return status;
208 }
209
210 if (m_Cancelled) {
211 return IPSGS_Processor::ePSGS_Cancelled;
212 }
213
214 return status;
215 }
216
217
GetName(void) const218 string CPSGS_ResolveProcessor::GetName(void) const
219 {
220 return "Cassandra-resolve";
221 }
222
223
ProcessEvent(void)224 void CPSGS_ResolveProcessor::ProcessEvent(void)
225 {
226 x_Peek(true);
227 }
228
229
x_Peek(bool need_wait)230 void CPSGS_ResolveProcessor::x_Peek(bool need_wait)
231 {
232 if (m_Cancelled) {
233 return;
234 }
235
236 if (m_InPeek) {
237 return;
238 }
239
240 m_InPeek = true;
241
242 // 1 -> call m_Loader->Wait1 to pick data
243 // 2 -> check if we have ready-to-send buffers
244 // 3 -> call reply->Send() to send what we have if it is ready
245 bool overall_final_state = false;
246
247 while (true) {
248 auto initial_size = m_FetchDetails.size();
249
250 for (auto & details: m_FetchDetails) {
251 if (details)
252 overall_final_state |= x_Peek(details, need_wait);
253 }
254
255 if (initial_size == m_FetchDetails.size())
256 break;
257 }
258
259 // Ready packets needs to be send only once when everything is finished
260 if (overall_final_state) {
261 if (AreAllFinishedRead()) {
262 if (IPSGS_Processor::m_Reply->IsOutputReady()) {
263 IPSGS_Processor::m_Reply->Flush(false);
264 }
265 }
266 }
267
268 m_InPeek = false;
269 }
270
271
x_Peek(unique_ptr<CCassFetch> & fetch_details,bool need_wait)272 bool CPSGS_ResolveProcessor::x_Peek(unique_ptr<CCassFetch> & fetch_details,
273 bool need_wait)
274 {
275 if (!fetch_details->GetLoader())
276 return true;
277
278 bool final_state = false;
279 if (need_wait)
280 if (!fetch_details->ReadFinished()) {
281 final_state = fetch_details->GetLoader()->Wait();
282 if (final_state) {
283 fetch_details->SetReadFinished();
284 }
285 }
286
287 if (fetch_details->GetLoader()->HasError() &&
288 IPSGS_Processor::m_Reply->IsOutputReady() &&
289 ! IPSGS_Processor::m_Reply->IsFinished()) {
290 // Send an error
291 string error = fetch_details->GetLoader()->LastError();
292 auto * app = CPubseqGatewayApp::GetInstance();
293
294 app->GetCounters().Increment(CPSGSCounters::ePSGS_UnknownError);
295 PSG_ERROR(error);
296
297 IPSGS_Processor::m_Reply->PrepareProcessorMessage(
298 IPSGS_Processor::m_Reply->GetItemId(),
299 GetName(), error, CRequestStatus::e500_InternalServerError,
300 ePSGS_UnknownError, eDiag_Error);
301
302 // Mark finished
303 UpdateOverallStatus(CRequestStatus::e500_InternalServerError);
304 fetch_details->SetReadFinished();
305 SignalFinishProcessing();
306 }
307
308 return final_state;
309 }
310
311
x_OnResolutionGoodData(void)312 void CPSGS_ResolveProcessor::x_OnResolutionGoodData(void)
313 {
314 // The resolution process started to receive data which look good so
315 // the dispatcher should be notified that the other processors can be
316 // stopped
317 if (m_Cancelled || m_Completed)
318 return;
319
320 if (SignalStartProcessing() == EPSGS_StartProcessing::ePSGS_Cancel) {
321 m_Completed = true;
322 }
323 }
324
325