1 /*  $Id: psgs_dispatcher.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  */
#include <ncbi_pch.hpp>

#include <mutex>

#include "psgs_dispatcher.hpp"
#include "pubseq_gateway_logging.hpp"
35 
36 USING_NCBI_SCOPE;
37 
38 
AddProcessor(unique_ptr<IPSGS_Processor> processor)39 void CPSGS_Dispatcher::AddProcessor(unique_ptr<IPSGS_Processor> processor)
40 {
41     m_RegisteredProcessors.push_back(move(processor));
42 }
43 
44 
45 list<IPSGS_Processor *>
DispatchRequest(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply)46 CPSGS_Dispatcher::DispatchRequest(shared_ptr<CPSGS_Request> request,
47                                   shared_ptr<CPSGS_Reply> reply)
48 {
49     list<IPSGS_Processor *>         ret;
50     list<SProcessorData>            proc_group;
51     TProcessorPriority              priority = m_RegisteredProcessors.size();
52     auto                            request_id = request->GetRequestId();
53 
54     for (auto const &  proc : m_RegisteredProcessors) {
55         if (request->NeedTrace()) {
56             reply->SendTrace("Try to create processor: " + proc->GetName(),
57                              request->GetStartTimestamp());
58         }
59         IPSGS_Processor *   p = proc->CreateProcessor(request, reply, priority);
60         if (p) {
61             ret.push_back(p);
62 
63             m_GroupsLock.lock();
64 
65             auto    it = m_ProcessorGroups.find(request_id);
66             auto    new_proc = SProcessorData(p, ePSGS_Up,
67                                               IPSGS_Processor::ePSGS_InProgress);
68             if (it == m_ProcessorGroups.end()) {
69                 // First processor in the group so create the list
70                 list<SProcessorData>    procs;
71                 procs.emplace_back(new_proc);
72                 m_ProcessorGroups[request_id] = move(procs);
73             } else {
74                 // Additional processor in the group so use existing list
75                 it->second.emplace_back(new_proc);
76             }
77 
78             m_GroupsLock.unlock();
79 
80             if (request->NeedTrace()) {
81                 reply->SendTrace("Processor " + proc->GetName() +
82                                  " has been created sucessfully (priority: " +
83                                  to_string(priority) + ")",
84                                  request->GetStartTimestamp());
85             }
86         } else {
87             if (request->NeedTrace()) {
88                 reply->SendTrace("Processor " + proc->GetName() +
89                                  " has not been created",
90                                  request->GetStartTimestamp());
91             }
92         }
93         --priority;
94     }
95 
96     if (ret.empty()) {
97         string  msg = "No matching processors found to serve the request";
98         PSG_TRACE(msg);
99 
100         reply->PrepareReplyMessage(msg, CRequestStatus::e404_NotFound,
101                                    ePSGS_NoProcessor, eDiag_Error);
102         reply->PrepareReplyCompletion();
103         reply->Flush(true);
104         x_PrintRequestStop(request, CRequestStatus::e404_NotFound);
105     }
106 
107     return ret;
108 }
109 
110 
111 IPSGS_Processor::EPSGS_StartProcessing
SignalStartProcessing(IPSGS_Processor * processor)112 CPSGS_Dispatcher::SignalStartProcessing(IPSGS_Processor *  processor)
113 {
114     // Basically the Cancel() call needs to be invoked for each of the other
115     // processors.
116     size_t                      request_id = processor->GetRequest()->GetRequestId();
117     list<IPSGS_Processor *>     to_be_cancelled;    // To avoid calling Cancel()
118                                                     // under the lock
119 
120 
121     m_GroupsLock.lock();
122 
123     if (processor->GetRequest()->NeedTrace()) {
124         // Trace sending is under a lock intentionally: to avoid changes in the
125         // sequence of processing the signal due to a race when tracing is
126         // switched on.
127         // In a normal operation the tracing is switched off anyway.
128         processor->GetReply()->SendTrace(
129             "Processor: " + processor->GetName() + " (priority: " +
130             to_string(processor->GetPriority()) +
131             ") signalled start dealing with data for the client",
132             processor->GetRequest()->GetStartTimestamp());
133     }
134 
135     auto    procs = m_ProcessorGroups.find(request_id);
136     if (procs == m_ProcessorGroups.end()) {
137         // The processors group does not exist anymore
138         m_GroupsLock.unlock();
139         return IPSGS_Processor::ePSGS_Cancel;
140     }
141 
142     // The group found; check that the processor has not been cancelled yet
143     for (const auto &  proc: procs->second) {
144         if (proc.m_Processor == processor) {
145             if (proc.m_DispatchStatus == ePSGS_Cancelled) {
146                 // The other processor has already called Cancel() for this one
147                 m_GroupsLock.unlock();
148                 return IPSGS_Processor::ePSGS_Cancel;
149             }
150             if (proc.m_DispatchStatus != ePSGS_Up) {
151                 PSG_ERROR("Logic error: the processor dispatcher received "
152                           "'start processing' when its dispatch status is not UP (i.e. FINISHED)");
153             }
154             break;
155         }
156     }
157 
158     // Everything looks OK; this is the first processor who started to send
159     // data to the client so the other processors should be cancelled
160     for (auto &  proc: procs->second) {
161         if (proc.m_Processor == processor)
162             continue;
163         if (proc.m_DispatchStatus == ePSGS_Up) {
164             proc.m_DispatchStatus = ePSGS_Cancelled;
165             to_be_cancelled.push_back(proc.m_Processor);
166         }
167     }
168 
169     m_GroupsLock.unlock();
170 
171     // Call the other processor's Cancel() out of the lock
172     for (auto & proc: to_be_cancelled) {
173 
174         if (processor->GetRequest()->NeedTrace()) {
175             processor->GetReply()->SendTrace(
176                 "Invoking Cancel() for the processor: " + proc->GetName() +
177                 " (priority: " + to_string(proc->GetPriority()) + ")",
178                 processor->GetRequest()->GetStartTimestamp());
179         }
180 
181         proc->Cancel();
182     }
183 
184     return IPSGS_Processor::ePSGS_Proceed;
185 }
186 
187 
SignalFinishProcessing(IPSGS_Processor * processor)188 void CPSGS_Dispatcher::SignalFinishProcessing(IPSGS_Processor *  processor)
189 {
190     // It needs to check if all the processors finished.
191     // If so then the best finish status is used, sent to the client and the
192     // group is deleted
193 
194     bool                            all_procs_finished = true;
195     size_t                          request_id = processor->GetRequest()->GetRequestId();
196     IPSGS_Processor::EPSGS_Status   best_status = processor->GetStatus();
197 
198     m_GroupsLock.lock();
199 
200     if (processor->GetRequest()->NeedTrace()) {
201         // Trace sending is under a lock intentionally: to avoid changes in the
202         // sequence of processing the signal due to a race when tracing is
203         // switched on.
204         // In a normal operation the tracing is switched off anyway.
205         processor->GetReply()->SendTrace(
206             "Processor: " + processor->GetName() + " (priority: " +
207             to_string(processor->GetPriority()) +
208             ") signalled finish with status " +
209             IPSGS_Processor::StatusToString(processor->GetStatus()),
210             processor->GetRequest()->GetStartTimestamp());
211     }
212 
213     auto    procs = m_ProcessorGroups.find(request_id);
214     if (procs == m_ProcessorGroups.end()) {
215         // The processors group does not exist any more
216         m_GroupsLock.unlock();
217         return;
218     }
219 
220     for (auto &  proc: procs->second) {
221         if (proc.m_Processor == processor) {
222             if (proc.m_DispatchStatus == ePSGS_Up) {
223                 proc.m_DispatchStatus = ePSGS_Finished;
224                 proc.m_FinishStatus = processor->GetStatus();
225                 continue;
226             }
227 
228             // The status is already cancelled or finished which may mean that
229             // it is not the first time call. However it is possible that
230             // during the first time the output was not ready so the final PSG
231             // chunk has not been sent. Thus we should continue as usual.
232         }
233 
234         switch (proc.m_DispatchStatus) {
235             case ePSGS_Finished:
236                 best_status = min(best_status, proc.m_FinishStatus);
237                 break;
238             case ePSGS_Up:
239                 all_procs_finished = false;
240                 break;
241             case ePSGS_Cancelled:
242                 // Finished but the cancelled processor do not participate in
243                 // the overall reply status
244                 break;
245         }
246     }
247 
248     if (all_procs_finished) {
249         // - Send the best status to the client
250         // - Clear the group
251         CRequestStatus::ECode   request_status;
252         switch (best_status) {
253             case IPSGS_Processor::ePSGS_Found:
254                 request_status = CRequestStatus::e200_Ok;
255                 break;
256             case IPSGS_Processor::ePSGS_NotFound:
257                 request_status = CRequestStatus::e404_NotFound;
258                 break;
259             default:
260                 request_status = CRequestStatus::e500_InternalServerError;
261         }
262 
263         auto    reply = processor->GetReply();
264         if (!reply->IsFinished() && reply->IsOutputReady()) {
265             if (processor->GetRequest()->NeedTrace()) {
266                 reply->SendTrace(
267                     "Request processing finished; final status: " +
268                     to_string(request_status),
269                     processor->GetRequest()->GetStartTimestamp());
270             }
271 
272             reply->PrepareReplyCompletion();
273             reply->Flush(true);
274             x_PrintRequestStop(processor->GetRequest(),
275                                request_status);
276 
277             // Clear the group after the final chunk is sent
278             m_ProcessorGroups.erase(procs);
279         }
280     }
281 
282     m_GroupsLock.unlock();
283 }
284 
285 
SignalConnectionCancelled(IPSGS_Processor * processor)286 void CPSGS_Dispatcher::SignalConnectionCancelled(IPSGS_Processor *  processor)
287 {
288     // When a connection is cancelled there will be no possibility to
289     // send anything over the connection. So basically what is needed to do is
290     // to print request stop and delete the processors group.
291     auto        request = processor->GetRequest();
292     size_t      request_id = request->GetRequestId();
293 
294     m_GroupsLock.lock();
295 
296     auto    procs = m_ProcessorGroups.find(request_id);
297     if (procs == m_ProcessorGroups.end()) {
298         // The processors group does not exist any more
299         m_GroupsLock.unlock();
300         return;
301     }
302 
303     if (request->GetRequestContext().NotNull()) {
304         request->SetRequestContext();
305         PSG_MESSAGE("HTTP connection has been cancelled");
306         CDiagContext::SetRequestContext(NULL);
307     }
308 
309     x_PrintRequestStop(processor->GetRequest(), CRequestStatus::e200_Ok);
310     m_ProcessorGroups.erase(procs);
311     m_GroupsLock.unlock();
312 }
313 
314 
x_PrintRequestStop(shared_ptr<CPSGS_Request> request,CRequestStatus::ECode status)315 void CPSGS_Dispatcher::x_PrintRequestStop(shared_ptr<CPSGS_Request> request,
316                                           CRequestStatus::ECode  status)
317 {
318     if (request->GetRequestContext().NotNull()) {
319         request->SetRequestContext();
320         CDiagContext::GetRequestContext().SetRequestStatus(status);
321         GetDiagContext().PrintRequestStop();
322         CDiagContext::GetRequestContext().Reset();
323         CDiagContext::SetRequestContext(NULL);
324     }
325 }
326 
327