1 /*  $Id: osg_processor_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Eugene Vasilchenko
27  *
28  * File Description: base class for processors which may generate os_gateway
29  *                   fetches
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "osg_processor_base.hpp"
36 
37 #include "osg_fetch.hpp"
38 #include "osg_caller.hpp"
39 #include "osg_connection.hpp"
40 
41 #include <objects/id2/ID2_Request_Packet.hpp>
42 #include <objects/id2/ID2_Request.hpp>
43 #include <objects/id2/ID2_Params.hpp>
44 #include <objects/id2/ID2_Param.hpp>
45 #include <objects/id2/ID2_Reply.hpp>
46 
47 
48 BEGIN_NCBI_NAMESPACE;
49 BEGIN_NAMESPACE(psg);
50 BEGIN_NAMESPACE(osg);
51 
52 
CPSGS_OSGProcessorBase(const CRef<COSGConnectionPool> & pool,const shared_ptr<CPSGS_Request> & request,const shared_ptr<CPSGS_Reply> & reply,TProcessorPriority priority)53 CPSGS_OSGProcessorBase::CPSGS_OSGProcessorBase(const CRef<COSGConnectionPool>& pool,
54                                                const shared_ptr<CPSGS_Request>& request,
55                                                const shared_ptr<CPSGS_Reply>& reply,
56                                                TProcessorPriority priority)
57     : m_Context(request->GetRequestContext()),
58       m_ConnectionPool(pool),
59       m_Status(IPSGS_Processor::ePSGS_InProgress),
60       m_Canceled(false)
61 {
62     m_Request = request;
63     m_Reply = reply;
64     m_Priority = priority;
65     _ASSERT(m_ConnectionPool);
66     _ASSERT(m_Request);
67     _ASSERT(m_Reply);
68 }
69 
70 
71 static int s_DebugLevel = eDebugLevel_default;
72 static EDiagSev s_DiagSeverity = eDiag_Trace;
73 
74 
GetDebugLevel()75 int GetDebugLevel()
76 {
77     return s_DebugLevel;
78 }
79 
80 
SetDebugLevel(int level)81 void SetDebugLevel(int level)
82 {
83     s_DebugLevel = level;
84 }
85 
86 
GetDiagSeverity()87 Severity GetDiagSeverity()
88 {
89     return Severity(s_DiagSeverity);
90 }
91 
92 
SetDiagSeverity(EDiagSev severity)93 void SetDiagSeverity(EDiagSev severity)
94 {
95     s_DiagSeverity = severity;
96 }
97 
98 
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const99 IPSGS_Processor* CPSGS_OSGProcessorBase::CreateProcessor(shared_ptr<CPSGS_Request> request,
100                                                          shared_ptr<CPSGS_Reply> reply,
101                                                          TProcessorPriority priority) const
102 {
103     return nullptr;
104 }
105 
106 
~CPSGS_OSGProcessorBase()107 CPSGS_OSGProcessorBase::~CPSGS_OSGProcessorBase()
108 {
109 }
110 
111 
Process()112 void CPSGS_OSGProcessorBase::Process()
113 {
114     if ( m_Canceled ) {
115         return;
116     }
117     if ( m_Fetches.empty() ) {
118         CreateRequests();
119     }
120     _ASSERT(!m_Fetches.empty());
121 
122     for ( double retry_count = m_ConnectionPool->GetRetryCount(); retry_count > 0; ) {
123         if ( m_Canceled ) {
124             return;
125         }
126 
127         // We need to distinguish different kinds of communication failures with different
128         //   effect on retry logic.
129         // 1. stale/disconnected connection failure - there maybe multiple in active connection pool
130         // 2. multiple simultaneous failures from concurrent incoming requests
131         // 3. repeated failure of specific request at OSG server
132         // In the first case we shouldn't account all such failures in the same retry counter -
133         //   it will overflow easily, and quite unnecessary.
134         // In the first case we shouldn't increase wait time too much -
135         //   the failures should be treated as single failure for the sake of waiting before
136         //   next connection attempt.
137         // In the third case we should make sure we abandon the failing request when retry limit
138         //   is reached. It should be detected no matter of concurrent successful requests.
139 
140         bool last_attempt = retry_count <= 1;
141         COSGCaller caller;
142         try {
143             caller.AllocateConnection(m_ConnectionPool, m_Context);
144         }
145         catch ( exception& exc ) {
146             if ( last_attempt ) {
147                 ERR_POST("OSG: failed opening connection: "<<exc.what());
148                 throw;
149             }
150             else {
151                 // failed new connection - consume full retry
152                 ERR_POST("OSG: retrying after failure opening connection: "<<exc.what());
153                 retry_count -= 1;
154                 continue;
155             }
156         }
157 
158         if ( m_Canceled ) {
159             return;
160         }
161 
162         try {
163             caller.SendRequest(*this);
164             caller.WaitForReplies(*this);
165         }
166         catch ( exception& exc ) {
167             if ( last_attempt ) {
168                 ERR_POST("OSG: failed receiving replies: "<<exc.what());
169                 throw;
170             }
171             else {
172                 // this may be failure of old connection
173                 ERR_POST("OSG: retrying after failure receiving replies: "<<exc.what());
174                 if ( caller.GetRequestPacket().Get().front()->GetSerial_number() <= 1 ) {
175                     // new connection - consume full retry
176                     retry_count -= 1;
177                 }
178                 else {
179                     // old connection from pool - consume part of retry
180                     retry_count -= 1./m_ConnectionPool->GetMaxConnectionCount();
181                 }
182                 continue;
183             }
184         }
185 
186         // successful
187         break;
188     }
189 
190     if ( m_Canceled ) {
191         return;
192     }
193     ProcessReplies();
194 }
195 
196 
Cancel()197 void CPSGS_OSGProcessorBase::Cancel()
198 {
199     m_Canceled = true;
200 }
201 
202 
ResetReplies()203 void CPSGS_OSGProcessorBase::ResetReplies()
204 {
205 }
206 
207 
NotifyOSGCallStart()208 void CPSGS_OSGProcessorBase::NotifyOSGCallStart()
209 {
210 }
211 
212 
NotifyOSGCallReply(const CID2_Reply &)213 void CPSGS_OSGProcessorBase::NotifyOSGCallReply(const CID2_Reply& /*reply*/)
214 {
215 }
216 
217 
NotifyOSGCallEnd()218 void CPSGS_OSGProcessorBase::NotifyOSGCallEnd()
219 {
220 }
221 
222 
AddRequest(const CRef<CID2_Request> & req0)223 void CPSGS_OSGProcessorBase::AddRequest(const CRef<CID2_Request>& req0)
224 {
225     CRef<CID2_Request> req = req0;
226     if ( 1 ) {
227         // set hops
228         auto hops = GetRequest()->GetRequest<SPSGS_RequestBase>().m_Hops + 1;
229         CRef<CID2_Param> param(new CID2_Param);
230         param->SetName("hops");
231         param->SetValue().push_back(to_string(hops));
232         req->SetParams().Set().push_back(param);
233     }
234     m_Fetches.push_back(Ref(new COSGFetch(req, m_Context)));
235 }
236 
237 
GetStatus()238 IPSGS_Processor::EPSGS_Status CPSGS_OSGProcessorBase::GetStatus()
239 {
240     return m_Status;
241 }
242 
243 
SetFinalStatus(EPSGS_Status status)244 void CPSGS_OSGProcessorBase::SetFinalStatus(EPSGS_Status status)
245 {
246     _ASSERT(m_Status == ePSGS_InProgress || status == m_Status);
247     m_Status = status;
248 }
249 
250 
FinalizeResult()251 void CPSGS_OSGProcessorBase::FinalizeResult()
252 {
253     _ASSERT(m_Status != ePSGS_InProgress);
254     SignalFinishProcessing();
255 }
256 
257 
FinalizeResult(EPSGS_Status status)258 void CPSGS_OSGProcessorBase::FinalizeResult(EPSGS_Status status)
259 {
260     SetFinalStatus(status);
261     FinalizeResult();
262 }
263 
264 
265 END_NAMESPACE(osg);
266 END_NAMESPACE(psg);
267 END_NCBI_NAMESPACE;
268