1 /* $Id: psgs_dispatcher.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Sergey Satskiy
27 *
28 * File Description:
29 *
30 */
#include <ncbi_pch.hpp>

#include <mutex>

#include "psgs_dispatcher.hpp"
#include "pubseq_gateway_logging.hpp"
35
36 USING_NCBI_SCOPE;
37
38
AddProcessor(unique_ptr<IPSGS_Processor> processor)39 void CPSGS_Dispatcher::AddProcessor(unique_ptr<IPSGS_Processor> processor)
40 {
41 m_RegisteredProcessors.push_back(move(processor));
42 }
43
44
45 list<IPSGS_Processor *>
DispatchRequest(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply)46 CPSGS_Dispatcher::DispatchRequest(shared_ptr<CPSGS_Request> request,
47 shared_ptr<CPSGS_Reply> reply)
48 {
49 list<IPSGS_Processor *> ret;
50 list<SProcessorData> proc_group;
51 TProcessorPriority priority = m_RegisteredProcessors.size();
52 auto request_id = request->GetRequestId();
53
54 for (auto const & proc : m_RegisteredProcessors) {
55 if (request->NeedTrace()) {
56 reply->SendTrace("Try to create processor: " + proc->GetName(),
57 request->GetStartTimestamp());
58 }
59 IPSGS_Processor * p = proc->CreateProcessor(request, reply, priority);
60 if (p) {
61 ret.push_back(p);
62
63 m_GroupsLock.lock();
64
65 auto it = m_ProcessorGroups.find(request_id);
66 auto new_proc = SProcessorData(p, ePSGS_Up,
67 IPSGS_Processor::ePSGS_InProgress);
68 if (it == m_ProcessorGroups.end()) {
69 // First processor in the group so create the list
70 list<SProcessorData> procs;
71 procs.emplace_back(new_proc);
72 m_ProcessorGroups[request_id] = move(procs);
73 } else {
74 // Additional processor in the group so use existing list
75 it->second.emplace_back(new_proc);
76 }
77
78 m_GroupsLock.unlock();
79
80 if (request->NeedTrace()) {
81 reply->SendTrace("Processor " + proc->GetName() +
82 " has been created sucessfully (priority: " +
83 to_string(priority) + ")",
84 request->GetStartTimestamp());
85 }
86 } else {
87 if (request->NeedTrace()) {
88 reply->SendTrace("Processor " + proc->GetName() +
89 " has not been created",
90 request->GetStartTimestamp());
91 }
92 }
93 --priority;
94 }
95
96 if (ret.empty()) {
97 string msg = "No matching processors found to serve the request";
98 PSG_TRACE(msg);
99
100 reply->PrepareReplyMessage(msg, CRequestStatus::e404_NotFound,
101 ePSGS_NoProcessor, eDiag_Error);
102 reply->PrepareReplyCompletion();
103 reply->Flush(true);
104 x_PrintRequestStop(request, CRequestStatus::e404_NotFound);
105 }
106
107 return ret;
108 }
109
110
111 IPSGS_Processor::EPSGS_StartProcessing
SignalStartProcessing(IPSGS_Processor * processor)112 CPSGS_Dispatcher::SignalStartProcessing(IPSGS_Processor * processor)
113 {
114 // Basically the Cancel() call needs to be invoked for each of the other
115 // processors.
116 size_t request_id = processor->GetRequest()->GetRequestId();
117 list<IPSGS_Processor *> to_be_cancelled; // To avoid calling Cancel()
118 // under the lock
119
120
121 m_GroupsLock.lock();
122
123 if (processor->GetRequest()->NeedTrace()) {
124 // Trace sending is under a lock intentionally: to avoid changes in the
125 // sequence of processing the signal due to a race when tracing is
126 // switched on.
127 // In a normal operation the tracing is switched off anyway.
128 processor->GetReply()->SendTrace(
129 "Processor: " + processor->GetName() + " (priority: " +
130 to_string(processor->GetPriority()) +
131 ") signalled start dealing with data for the client",
132 processor->GetRequest()->GetStartTimestamp());
133 }
134
135 auto procs = m_ProcessorGroups.find(request_id);
136 if (procs == m_ProcessorGroups.end()) {
137 // The processors group does not exist anymore
138 m_GroupsLock.unlock();
139 return IPSGS_Processor::ePSGS_Cancel;
140 }
141
142 // The group found; check that the processor has not been cancelled yet
143 for (const auto & proc: procs->second) {
144 if (proc.m_Processor == processor) {
145 if (proc.m_DispatchStatus == ePSGS_Cancelled) {
146 // The other processor has already called Cancel() for this one
147 m_GroupsLock.unlock();
148 return IPSGS_Processor::ePSGS_Cancel;
149 }
150 if (proc.m_DispatchStatus != ePSGS_Up) {
151 PSG_ERROR("Logic error: the processor dispatcher received "
152 "'start processing' when its dispatch status is not UP (i.e. FINISHED)");
153 }
154 break;
155 }
156 }
157
158 // Everything looks OK; this is the first processor who started to send
159 // data to the client so the other processors should be cancelled
160 for (auto & proc: procs->second) {
161 if (proc.m_Processor == processor)
162 continue;
163 if (proc.m_DispatchStatus == ePSGS_Up) {
164 proc.m_DispatchStatus = ePSGS_Cancelled;
165 to_be_cancelled.push_back(proc.m_Processor);
166 }
167 }
168
169 m_GroupsLock.unlock();
170
171 // Call the other processor's Cancel() out of the lock
172 for (auto & proc: to_be_cancelled) {
173
174 if (processor->GetRequest()->NeedTrace()) {
175 processor->GetReply()->SendTrace(
176 "Invoking Cancel() for the processor: " + proc->GetName() +
177 " (priority: " + to_string(proc->GetPriority()) + ")",
178 processor->GetRequest()->GetStartTimestamp());
179 }
180
181 proc->Cancel();
182 }
183
184 return IPSGS_Processor::ePSGS_Proceed;
185 }
186
187
SignalFinishProcessing(IPSGS_Processor * processor)188 void CPSGS_Dispatcher::SignalFinishProcessing(IPSGS_Processor * processor)
189 {
190 // It needs to check if all the processors finished.
191 // If so then the best finish status is used, sent to the client and the
192 // group is deleted
193
194 bool all_procs_finished = true;
195 size_t request_id = processor->GetRequest()->GetRequestId();
196 IPSGS_Processor::EPSGS_Status best_status = processor->GetStatus();
197
198 m_GroupsLock.lock();
199
200 if (processor->GetRequest()->NeedTrace()) {
201 // Trace sending is under a lock intentionally: to avoid changes in the
202 // sequence of processing the signal due to a race when tracing is
203 // switched on.
204 // In a normal operation the tracing is switched off anyway.
205 processor->GetReply()->SendTrace(
206 "Processor: " + processor->GetName() + " (priority: " +
207 to_string(processor->GetPriority()) +
208 ") signalled finish with status " +
209 IPSGS_Processor::StatusToString(processor->GetStatus()),
210 processor->GetRequest()->GetStartTimestamp());
211 }
212
213 auto procs = m_ProcessorGroups.find(request_id);
214 if (procs == m_ProcessorGroups.end()) {
215 // The processors group does not exist any more
216 m_GroupsLock.unlock();
217 return;
218 }
219
220 for (auto & proc: procs->second) {
221 if (proc.m_Processor == processor) {
222 if (proc.m_DispatchStatus == ePSGS_Up) {
223 proc.m_DispatchStatus = ePSGS_Finished;
224 proc.m_FinishStatus = processor->GetStatus();
225 continue;
226 }
227
228 // The status is already cancelled or finished which may mean that
229 // it is not the first time call. However it is possible that
230 // during the first time the output was not ready so the final PSG
231 // chunk has not been sent. Thus we should continue as usual.
232 }
233
234 switch (proc.m_DispatchStatus) {
235 case ePSGS_Finished:
236 best_status = min(best_status, proc.m_FinishStatus);
237 break;
238 case ePSGS_Up:
239 all_procs_finished = false;
240 break;
241 case ePSGS_Cancelled:
242 // Finished but the cancelled processor do not participate in
243 // the overall reply status
244 break;
245 }
246 }
247
248 if (all_procs_finished) {
249 // - Send the best status to the client
250 // - Clear the group
251 CRequestStatus::ECode request_status;
252 switch (best_status) {
253 case IPSGS_Processor::ePSGS_Found:
254 request_status = CRequestStatus::e200_Ok;
255 break;
256 case IPSGS_Processor::ePSGS_NotFound:
257 request_status = CRequestStatus::e404_NotFound;
258 break;
259 default:
260 request_status = CRequestStatus::e500_InternalServerError;
261 }
262
263 auto reply = processor->GetReply();
264 if (!reply->IsFinished() && reply->IsOutputReady()) {
265 if (processor->GetRequest()->NeedTrace()) {
266 reply->SendTrace(
267 "Request processing finished; final status: " +
268 to_string(request_status),
269 processor->GetRequest()->GetStartTimestamp());
270 }
271
272 reply->PrepareReplyCompletion();
273 reply->Flush(true);
274 x_PrintRequestStop(processor->GetRequest(),
275 request_status);
276
277 // Clear the group after the final chunk is sent
278 m_ProcessorGroups.erase(procs);
279 }
280 }
281
282 m_GroupsLock.unlock();
283 }
284
285
SignalConnectionCancelled(IPSGS_Processor * processor)286 void CPSGS_Dispatcher::SignalConnectionCancelled(IPSGS_Processor * processor)
287 {
288 // When a connection is cancelled there will be no possibility to
289 // send anything over the connection. So basically what is needed to do is
290 // to print request stop and delete the processors group.
291 auto request = processor->GetRequest();
292 size_t request_id = request->GetRequestId();
293
294 m_GroupsLock.lock();
295
296 auto procs = m_ProcessorGroups.find(request_id);
297 if (procs == m_ProcessorGroups.end()) {
298 // The processors group does not exist any more
299 m_GroupsLock.unlock();
300 return;
301 }
302
303 if (request->GetRequestContext().NotNull()) {
304 request->SetRequestContext();
305 PSG_MESSAGE("HTTP connection has been cancelled");
306 CDiagContext::SetRequestContext(NULL);
307 }
308
309 x_PrintRequestStop(processor->GetRequest(), CRequestStatus::e200_Ok);
310 m_ProcessorGroups.erase(procs);
311 m_GroupsLock.unlock();
312 }
313
314
x_PrintRequestStop(shared_ptr<CPSGS_Request> request,CRequestStatus::ECode status)315 void CPSGS_Dispatcher::x_PrintRequestStop(shared_ptr<CPSGS_Request> request,
316 CRequestStatus::ECode status)
317 {
318 if (request->GetRequestContext().NotNull()) {
319 request->SetRequestContext();
320 CDiagContext::GetRequestContext().SetRequestStatus(status);
321 GetDiagContext().PrintRequestStop();
322 CDiagContext::GetRequestContext().Reset();
323 CDiagContext::SetRequestContext(NULL);
324 }
325 }
326
327