1 /* $Id: osg_processor_base.cpp 629837 2021-04-22 12:47:49Z ivanov $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Eugene Vasilchenko
27 *
28 * File Description: base class for processors which may generate os_gateway
29 * fetches
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "osg_processor_base.hpp"
36
37 #include "osg_fetch.hpp"
38 #include "osg_caller.hpp"
39 #include "osg_connection.hpp"
40
41 #include <objects/id2/ID2_Request_Packet.hpp>
42 #include <objects/id2/ID2_Request.hpp>
43 #include <objects/id2/ID2_Params.hpp>
44 #include <objects/id2/ID2_Param.hpp>
45 #include <objects/id2/ID2_Reply.hpp>
46
47
48 BEGIN_NCBI_NAMESPACE;
49 BEGIN_NAMESPACE(psg);
50 BEGIN_NAMESPACE(osg);
51
52
CPSGS_OSGProcessorBase(const CRef<COSGConnectionPool> & pool,const shared_ptr<CPSGS_Request> & request,const shared_ptr<CPSGS_Reply> & reply,TProcessorPriority priority)53 CPSGS_OSGProcessorBase::CPSGS_OSGProcessorBase(const CRef<COSGConnectionPool>& pool,
54 const shared_ptr<CPSGS_Request>& request,
55 const shared_ptr<CPSGS_Reply>& reply,
56 TProcessorPriority priority)
57 : m_Context(request->GetRequestContext()),
58 m_ConnectionPool(pool),
59 m_Status(IPSGS_Processor::ePSGS_InProgress),
60 m_Canceled(false)
61 {
62 m_Request = request;
63 m_Reply = reply;
64 m_Priority = priority;
65 _ASSERT(m_ConnectionPool);
66 _ASSERT(m_Request);
67 _ASSERT(m_Reply);
68 }
69
70
71 static int s_DebugLevel = eDebugLevel_default;
72 static EDiagSev s_DiagSeverity = eDiag_Trace;
73
74
GetDebugLevel()75 int GetDebugLevel()
76 {
77 return s_DebugLevel;
78 }
79
80
SetDebugLevel(int level)81 void SetDebugLevel(int level)
82 {
83 s_DebugLevel = level;
84 }
85
86
GetDiagSeverity()87 Severity GetDiagSeverity()
88 {
89 return Severity(s_DiagSeverity);
90 }
91
92
SetDiagSeverity(EDiagSev severity)93 void SetDiagSeverity(EDiagSev severity)
94 {
95 s_DiagSeverity = severity;
96 }
97
98
CreateProcessor(shared_ptr<CPSGS_Request> request,shared_ptr<CPSGS_Reply> reply,TProcessorPriority priority) const99 IPSGS_Processor* CPSGS_OSGProcessorBase::CreateProcessor(shared_ptr<CPSGS_Request> request,
100 shared_ptr<CPSGS_Reply> reply,
101 TProcessorPriority priority) const
102 {
103 return nullptr;
104 }
105
106
~CPSGS_OSGProcessorBase()107 CPSGS_OSGProcessorBase::~CPSGS_OSGProcessorBase()
108 {
109 }
110
111
Process()112 void CPSGS_OSGProcessorBase::Process()
113 {
114 if ( m_Canceled ) {
115 return;
116 }
117 if ( m_Fetches.empty() ) {
118 CreateRequests();
119 }
120 _ASSERT(!m_Fetches.empty());
121
122 for ( double retry_count = m_ConnectionPool->GetRetryCount(); retry_count > 0; ) {
123 if ( m_Canceled ) {
124 return;
125 }
126
127 // We need to distinguish different kinds of communication failures with different
128 // effect on retry logic.
129 // 1. stale/disconnected connection failure - there maybe multiple in active connection pool
130 // 2. multiple simultaneous failures from concurrent incoming requests
131 // 3. repeated failure of specific request at OSG server
132 // In the first case we shouldn't account all such failures in the same retry counter -
133 // it will overflow easily, and quite unnecessary.
134 // In the first case we shouldn't increase wait time too much -
135 // the failures should be treated as single failure for the sake of waiting before
136 // next connection attempt.
137 // In the third case we should make sure we abandon the failing request when retry limit
138 // is reached. It should be detected no matter of concurrent successful requests.
139
140 bool last_attempt = retry_count <= 1;
141 COSGCaller caller;
142 try {
143 caller.AllocateConnection(m_ConnectionPool, m_Context);
144 }
145 catch ( exception& exc ) {
146 if ( last_attempt ) {
147 ERR_POST("OSG: failed opening connection: "<<exc.what());
148 throw;
149 }
150 else {
151 // failed new connection - consume full retry
152 ERR_POST("OSG: retrying after failure opening connection: "<<exc.what());
153 retry_count -= 1;
154 continue;
155 }
156 }
157
158 if ( m_Canceled ) {
159 return;
160 }
161
162 try {
163 caller.SendRequest(*this);
164 caller.WaitForReplies(*this);
165 }
166 catch ( exception& exc ) {
167 if ( last_attempt ) {
168 ERR_POST("OSG: failed receiving replies: "<<exc.what());
169 throw;
170 }
171 else {
172 // this may be failure of old connection
173 ERR_POST("OSG: retrying after failure receiving replies: "<<exc.what());
174 if ( caller.GetRequestPacket().Get().front()->GetSerial_number() <= 1 ) {
175 // new connection - consume full retry
176 retry_count -= 1;
177 }
178 else {
179 // old connection from pool - consume part of retry
180 retry_count -= 1./m_ConnectionPool->GetMaxConnectionCount();
181 }
182 continue;
183 }
184 }
185
186 // successful
187 break;
188 }
189
190 if ( m_Canceled ) {
191 return;
192 }
193 ProcessReplies();
194 }
195
196
Cancel()197 void CPSGS_OSGProcessorBase::Cancel()
198 {
199 m_Canceled = true;
200 }
201
202
ResetReplies()203 void CPSGS_OSGProcessorBase::ResetReplies()
204 {
205 }
206
207
NotifyOSGCallStart()208 void CPSGS_OSGProcessorBase::NotifyOSGCallStart()
209 {
210 }
211
212
NotifyOSGCallReply(const CID2_Reply &)213 void CPSGS_OSGProcessorBase::NotifyOSGCallReply(const CID2_Reply& /*reply*/)
214 {
215 }
216
217
NotifyOSGCallEnd()218 void CPSGS_OSGProcessorBase::NotifyOSGCallEnd()
219 {
220 }
221
222
AddRequest(const CRef<CID2_Request> & req0)223 void CPSGS_OSGProcessorBase::AddRequest(const CRef<CID2_Request>& req0)
224 {
225 CRef<CID2_Request> req = req0;
226 if ( 1 ) {
227 // set hops
228 auto hops = GetRequest()->GetRequest<SPSGS_RequestBase>().m_Hops + 1;
229 CRef<CID2_Param> param(new CID2_Param);
230 param->SetName("hops");
231 param->SetValue().push_back(to_string(hops));
232 req->SetParams().Set().push_back(param);
233 }
234 m_Fetches.push_back(Ref(new COSGFetch(req, m_Context)));
235 }
236
237
GetStatus()238 IPSGS_Processor::EPSGS_Status CPSGS_OSGProcessorBase::GetStatus()
239 {
240 return m_Status;
241 }
242
243
SetFinalStatus(EPSGS_Status status)244 void CPSGS_OSGProcessorBase::SetFinalStatus(EPSGS_Status status)
245 {
246 _ASSERT(m_Status == ePSGS_InProgress || status == m_Status);
247 m_Status = status;
248 }
249
250
FinalizeResult()251 void CPSGS_OSGProcessorBase::FinalizeResult()
252 {
253 _ASSERT(m_Status != ePSGS_InProgress);
254 SignalFinishProcessing();
255 }
256
257
FinalizeResult(EPSGS_Status status)258 void CPSGS_OSGProcessorBase::FinalizeResult(EPSGS_Status status)
259 {
260 SetFinalStatus(status);
261 FinalizeResult();
262 }
263
264
265 END_NAMESPACE(osg);
266 END_NAMESPACE(psg);
267 END_NCBI_NAMESPACE;
268