1 /*  $Id: rpcbase.cpp 615754 2020-09-03 15:12:21Z grichenk $
2 * ===========================================================================
3 *
4 *                            PUBLIC DOMAIN NOTICE
5 *               National Center for Biotechnology Information
6 *
7 *  This software/database is a "United States Government Work" under the
8 *  terms of the United States Copyright Act.  It was written as part of
9 *  the author's official duties as a United States Government employee and
10 *  thus cannot be copyrighted.  This software/database is freely available
11 *  to the public for use. The National Library of Medicine and the U.S.
12 *  Government have not placed any restriction on its use or reproduction.
13 *
14 *  Although all reasonable efforts have been taken to ensure the accuracy
15 *  and reliability of the software and data, the NLM and the U.S.
16 *  Government do not and cannot warrant the performance or results that
17 *  may be obtained by using this software or data. The NLM and the U.S.
18 *  Government disclaim all warranties, express or implied, including
19 *  warranties of performance, merchantability or fitness for any particular
20 *  purpose.
21 *
22 *  Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko
27 *
28 * File Description:
29 *   CRPCClient helpers
30 */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbistr.hpp>
34 #include <corelib/ncbi_cookies.hpp>
35 #include <serial/rpcbase_impl.hpp>
36 
37 BEGIN_NCBI_SCOPE
38 
39 
s_GetConfigString(const string & service,const string & variable)40 static string s_GetConfigString(const string& service,
41                                 const string& variable)
42 {
43     if (service.empty() || variable.empty()) return kEmptyStr;
44 
45     string env_var = service + "__RPC_CLIENT__" + variable;
46     NStr::ToUpper(env_var);
47     const TXChar* str = NcbiSys_getenv(_T_XCSTRING(env_var.c_str()));
48 
49     if (str && *str) {
50         return _T_CSTRING(str);
51     }
52 
53     CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
54     if (app  &&  app->HasLoadedConfig()) {
55         return app->GetConfig().Get(service + ".rpc_client", variable);
56     }
57     return kEmptyStr;
58 }
59 
60 
s_GetTryLimit(const string & service)61 static unsigned int s_GetTryLimit(const string& service)
62 {
63     string str = s_GetConfigString(service, "max_try");
64     if (!str.empty()) {
65         try {
66             unsigned int ret = NStr::StringToNumeric<unsigned int>(str);
67             return ret > 0 ? ret : 3;
68         }
69         catch (...) {
70             ERR_POST(Warning << "Bad " << service << "/max_try value: " << str);
71         }
72     }
73     return 3;
74 }
75 
76 
s_GetRetryDelay(const string & service)77 static CTimeSpan s_GetRetryDelay(const string& service)
78 {
79     CTimeSpan ret;
80     string str = s_GetConfigString(service, "retry_delay");
81     if (!str.empty()) {
82         try {
83             double sec = NStr::StringToNumeric<double>(str);
84             return CTimeSpan(sec > 0 ? sec : 0);
85         }
86         catch (...) {
87             ERR_POST(Warning << "Bad " << service << "/retry_delay value: " << str);
88         }
89     }
90     return ret;
91 }
92 
93 
CRPCClient_Base(const string & service,ESerialDataFormat format)94 CRPCClient_Base::CRPCClient_Base(const string&     service,
95                                  ESerialDataFormat format)
96     : m_Format(format),
97       m_RetryDelay(s_GetRetryDelay(service)),
98       m_TryCount(0),
99       m_RecursionCount(0),
100       m_Service(service),
101       m_TryLimit(s_GetTryLimit(service))
102 {
103 }
104 
105 
CRPCClient_Base(const string & service,ESerialDataFormat format,unsigned int try_limit)106 CRPCClient_Base::CRPCClient_Base(const string&     service,
107     ESerialDataFormat format,
108     unsigned int      try_limit)
109     : m_Format(format),
110       m_RetryDelay(s_GetRetryDelay(service)),
111       m_TryCount(0),
112       m_RecursionCount(0),
113       m_Service(service),
114       m_TryLimit(try_limit > 0 ? try_limit : 3)
115 {
116 }
117 
118 
~CRPCClient_Base(void)119 CRPCClient_Base::~CRPCClient_Base(void)
120 {
121     try {
122         Disconnect();
123     } STD_CATCH_ALL_XX(Serial_RPCClient, 2, "CRPCClient_Base::Disconnect()");
124 }
125 
126 
Connect(void)127 void CRPCClient_Base::Connect(void)
128 {
129     // Do not connect from recursive requests - this must be done
130     // by the main request only.
131     if (m_RecursionCount > 1) return;
132 
133     if (m_Stream.get()  &&  m_Stream->good()) {
134         return; // already connected
135     }
136     CMutexGuard LOCK(m_Mutex);
137     // repeat test with mutex held to avoid races
138     if (m_Stream.get()  &&  m_Stream->good()) {
139         return; // already connected
140     }
141     x_Connect();
142     m_RetryCtx.ResetNeedReconnect();
143 }
144 
145 
Disconnect(void)146 void CRPCClient_Base::Disconnect(void)
147 {
148     CMutexGuard LOCK(m_Mutex);
149     if ( !m_Stream.get()  ||  !m_Stream->good() ) {
150         // not connected -- don't call x_Disconnect, which might
151         // temporarily reconnect to send a fini!
152         return;
153     }
154     x_Disconnect();
155 }
156 
157 
Reset(void)158 void CRPCClient_Base::Reset(void)
159 {
160     CMutexGuard LOCK(m_Mutex);
161     if (m_Stream.get()  &&  m_Stream->good()) {
162         x_Disconnect();
163     }
164     x_Connect();
165 }
166 
167 
SetAffinity(const string & affinity)168 void CRPCClient_Base::SetAffinity(const string& affinity)
169 {
170     if (m_Affinity != affinity) {
171         if (m_RecursionCount > 1) {
172             ERR_POST("Affinity cannot be changed on a recursive request");
173             return;
174         }
175         Disconnect();
176         m_Affinity  = affinity;
177     }
178 }
179 
180 
x_Disconnect(void)181 void CRPCClient_Base::x_Disconnect(void)
182 {
183     m_In.reset();
184     m_Out.reset();
185     m_Stream.reset();
186 }
187 
188 
x_SetStream(CNcbiIostream * stream)189 void CRPCClient_Base::x_SetStream(CNcbiIostream* stream)
190 {
191     m_In .reset();
192     m_Out.reset();
193     m_Stream.reset(stream);
194     m_In .reset(CObjectIStream::Open(m_Format, *stream));
195     m_Out.reset(CObjectOStream::Open(m_Format, *stream));
196 }
197 
198 
199 class CCounterGuard
200 {
201 public:
CCounterGuard(int * counter)202     CCounterGuard(int* counter)
203         : m_Counter(*counter)
204     {
205         m_Counter++;
206     }
207 
~CCounterGuard(void)208     ~CCounterGuard(void)
209     {
210         m_Counter--;
211     }
212 
213 private:
214     int& m_Counter;
215 };
216 
x_Ask(const CSerialObject & request,CSerialObject & reply)217 void CRPCClient_Base::x_Ask(const CSerialObject& request, CSerialObject& reply)
218 {
219     CMutexGuard LOCK(m_Mutex);
220     if (m_RecursionCount == 0) {
221         m_TryCount = 0;
222     }
223     // Recursion counter needs to be decremented on both success and failure.
224     CCounterGuard recursion_guard(&m_RecursionCount);
225 
226     const string& request_name = request.GetThisTypeInfo() != NULL
227         ? ("("+request.GetThisTypeInfo()->GetName()+")")
228         : "(no_request_type)";
229 
230     // Reset headers from previous requests if any.
231     m_RetryCtx.Reset();
232     double max_span = m_RetryDelay.GetAsDouble()*m_TryLimit;
233     double span = max_span;
234     bool limit_by_time = !m_RetryDelay.IsEmpty();
235     // Retry context can be either the default one (m_RetryCtx), or provided
236     // through an exception.
237     for (;;) {
238         if ( IsCanceled() ) {
239             NCBI_THROW(CRPCClientException, eFailed,
240                        "Request canceled " + request_name);
241         }
242         try {
243             SetAffinity(x_GetAffinity(request));
244             Connect(); // No-op if already connected
245             if ( m_RetryCtx.IsSetContentOverride() ) {
246                 if (m_RetryCtx.GetContentOverride() != CHttpRetryContext::eNoContent  &&
247                     m_RetryCtx.IsSetContent() ) {
248                     *m_Stream << m_RetryCtx.GetContent();
249                 }
250             }
251             else {
252                 // by default re-send the original request
253                 x_WriteRequest(*m_Out, request);
254             }
255             m_Stream->peek(); // send data, read response headers
256             if (!m_Stream->good()  &&  !m_Stream->eof()) {
257                 NCBI_THROW(CRPCClientException, eFailed,
258                            "Connection stream is in bad state " + request_name);
259             }
260             if (m_RetryCtx.IsSetContentOverride()  &&
261                 m_RetryCtx.GetContentOverride() == CHttpRetryContext::eFromResponse) {
262                 // store response content to send it with the next retry
263                 CNcbiOstrstream buf;
264                 NcbiStreamCopy(buf, *m_Stream);
265                 m_RetryCtx.SetContent(CNcbiOstrstreamToString(buf));
266             }
267             else {
268                 // read normal response
269                 x_ReadReply(*m_In, reply);
270             }
271             // If reading reply succeeded and no retry was requested by the server, break.
272             if ( !m_RetryCtx.GetNeedRetry() ) {
273                 break;
274             }
275         } catch (CException& e) {
276             // Some exceptions tend to correspond to transient glitches;
277             // the remainder, however, may as well get propagated immediately.
278             CRPCClientException* rpc_ex = dynamic_cast<CRPCClientException*>(&e);
279             if (rpc_ex  &&  rpc_ex->GetErrCode() == CRPCClientException::eRetry) {
280                 if ( rpc_ex->IsSetRetryContext() ) {
281                     // Save information to the local retry context and proceed.
282                     m_RetryCtx = rpc_ex->GetRetryContext();
283                 }
284                 // proceed to retry
285             }
286             else if ( !dynamic_cast<CSerialException*>(&e)
287                       &&  !dynamic_cast<CIOException*>(&e) ) {
288                 // Not a retry related exception, abort.
289                 throw;
290             }
291         }
292         // No retries for recursive requests (e.g. AskInit called by Connect).
293         // Exit immediately, do not reset retry context - it may be used by
294         // the main request's retry loop.
295         if (m_RecursionCount > 1) return;
296 
297         // Retry request on exception or on explicit retry request from the server.
298 
299         // If using time limit, allow to make more than m_RetryLimit attempts
300         // if the server has set shorter delay.
301         if ((!limit_by_time  &&  ++m_TryCount >= m_TryLimit)  ||
302             !x_ShouldRetry(m_TryCount)) {
303             NCBI_THROW(CRPCClientException, eFailed,
304                        "Failed to receive reply after "
305                        + NStr::NumericToString(m_TryCount)
306                        + (m_TryCount == 1 ? " try " : " tries ")
307                        + request_name );
308         }
309         if ( m_RetryCtx.IsSetStop() ) {
310             NCBI_THROW(CRPCClientException, eFailed,
311                        "Retrying request stopped by the server: "
312                        + m_RetryCtx.GetStopReason() + ' ' + request_name);
313         }
314         CTimeSpan delay = x_GetRetryDelay(span);
315         if ( !delay.IsEmpty() ) {
316             SleepSec(delay.GetCompleteSeconds());
317             SleepMicroSec(delay.GetNanoSecondsAfterSecond() / 1000);
318             span -= delay.GetAsDouble();
319             if (limit_by_time  &&  span <= 0) {
320                 NCBI_THROW(CRPCClientException, eFailed,
321                            "Failed to receive reply in "
322                            + CTimeSpan(max_span).AsSmartString()
323                            + ' ' + request_name);
324             }
325         }
326         // Always reconnect on retry.
327         if ( IsCanceled() ) {
328             NCBI_THROW(CRPCClientException, eFailed,
329                        "Request canceled " + request_name);
330         }
331         try {
332             Reset();
333         } STD_CATCH_ALL_XX(Serial_RPCClient, 1,
334                            "CRPCClient_Base::Reset() " + request_name);
335     }
336     // Reset retry context when done.
337     m_RetryCtx.Reset();
338     // If there were any retries, force disconnect to prevent using old
339     // retry url, args etc. with the next request.
340     if ( m_TryCount > 0  &&  m_RecursionCount <= 1 ) {
341         Disconnect();
342     }
343 }
344 
345 
x_ShouldRetry(unsigned int tries)346 bool CRPCClient_Base::x_ShouldRetry(unsigned int tries) /* NCBI_FAKE_WARNING */
347 {
348     _TRACE("CRPCClient_Base::x_ShouldRetry: retrying after " << tries
349            << " failure(s)");
350     return true;
351 }
352 
353 
x_GetRetryDelay(double max_delay) const354 CTimeSpan CRPCClient_Base::x_GetRetryDelay(double max_delay) const
355 {
356     // If not set by the server, use local delay.
357     if ( !m_RetryCtx.IsSetDelay() ) {
358         return m_RetryDelay;
359     }
360     // If local delay is not zero, we have to limit total retries time to max_delay.
361     if (!m_RetryDelay.IsEmpty()  &&
362         m_RetryCtx.GetDelay().GetAsDouble() > max_delay) {
363         return CTimeSpan(max_delay);
364     }
365     return m_RetryCtx.GetDelay();
366 }
367 
368 
GetContentTypeHeader(ESerialDataFormat format)369 const char* CRPCClient_Base::GetContentTypeHeader(ESerialDataFormat format)
370 {
371     switch (format) {
372     case eSerial_None:
373         break;
374     case eSerial_AsnText:
375         return "Content-Type: x-ncbi-data/x-asn-text\r\n";
376     case eSerial_AsnBinary:
377         return "Content-Type: x-ncbi-data/x-asn-binary\r\n";
378     case eSerial_Xml:
379         return "Content-Type: application/xml\r\n";
380     case eSerial_Json:
381         return "Content-Type: application/json\r\n";
382     }
383     return NULL; // kEmptyCStr?
384 }
385 
386 
GetErrCodeString(void) const387 const char* CRPCClientException::GetErrCodeString(void) const
388 {
389     switch (GetErrCode()) {
390     case eRetry:  return "eRetry";
391     case eFailed: return "eFailed";
392     case eArgs:   return "eArgs";
393     case eOther:  return "eOther";
394     default:      return CException::GetErrCodeString();
395     }
396 }
397 
398 
399 END_NCBI_SCOPE
400