1 /* $Id: rpcbase.cpp 615754 2020-09-03 15:12:21Z grichenk $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Aleksey Grichenko
27 *
28 * File Description:
29 * CRPCClient helpers
30 */
31
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbistr.hpp>
34 #include <corelib/ncbi_cookies.hpp>
35 #include <serial/rpcbase_impl.hpp>
36
37 BEGIN_NCBI_SCOPE
38
39
/// Look up an RPC client setting for the given service.
///
/// The environment variable "<SERVICE>__RPC_CLIENT__<VARIABLE>" (upper-cased)
/// is consulted first; if it is unset or empty, the "<service>.rpc_client"
/// section of the application's loaded configuration is queried instead.
///
/// @param service   Service name; an empty name yields an empty result.
/// @param variable  Setting name; an empty name yields an empty result.
/// @return          The configured value, or an empty string if none is found.
static string s_GetConfigString(const string& service,
                                const string& variable)
{
    if (service.empty()  ||  variable.empty()) {
        return kEmptyStr;
    }

    // The environment takes precedence over the application config.
    string env_name = service;
    env_name += "__RPC_CLIENT__";
    env_name += variable;
    NStr::ToUpper(env_name);
    const TXChar* env_value = NcbiSys_getenv(_T_XCSTRING(env_name.c_str()));
    if (env_value  &&  *env_value) {
        return _T_CSTRING(env_value);
    }

    // Fall back to the application config, if there is one loaded.
    CNcbiApplicationGuard app = CNcbiApplication::InstanceGuard();
    if ( !app  ||  !app->HasLoadedConfig() ) {
        return kEmptyStr;
    }
    return app->GetConfig().Get(service + ".rpc_client", variable);
}
59
60
s_GetTryLimit(const string & service)61 static unsigned int s_GetTryLimit(const string& service)
62 {
63 string str = s_GetConfigString(service, "max_try");
64 if (!str.empty()) {
65 try {
66 unsigned int ret = NStr::StringToNumeric<unsigned int>(str);
67 return ret > 0 ? ret : 3;
68 }
69 catch (...) {
70 ERR_POST(Warning << "Bad " << service << "/max_try value: " << str);
71 }
72 }
73 return 3;
74 }
75
76
s_GetRetryDelay(const string & service)77 static CTimeSpan s_GetRetryDelay(const string& service)
78 {
79 CTimeSpan ret;
80 string str = s_GetConfigString(service, "retry_delay");
81 if (!str.empty()) {
82 try {
83 double sec = NStr::StringToNumeric<double>(str);
84 return CTimeSpan(sec > 0 ? sec : 0);
85 }
86 catch (...) {
87 ERR_POST(Warning << "Bad " << service << "/retry_delay value: " << str);
88 }
89 }
90 return ret;
91 }
92
93
CRPCClient_Base(const string & service,ESerialDataFormat format)94 CRPCClient_Base::CRPCClient_Base(const string& service,
95 ESerialDataFormat format)
96 : m_Format(format),
97 m_RetryDelay(s_GetRetryDelay(service)),
98 m_TryCount(0),
99 m_RecursionCount(0),
100 m_Service(service),
101 m_TryLimit(s_GetTryLimit(service))
102 {
103 }
104
105
CRPCClient_Base(const string & service,ESerialDataFormat format,unsigned int try_limit)106 CRPCClient_Base::CRPCClient_Base(const string& service,
107 ESerialDataFormat format,
108 unsigned int try_limit)
109 : m_Format(format),
110 m_RetryDelay(s_GetRetryDelay(service)),
111 m_TryCount(0),
112 m_RecursionCount(0),
113 m_Service(service),
114 m_TryLimit(try_limit > 0 ? try_limit : 3)
115 {
116 }
117
118
~CRPCClient_Base(void)119 CRPCClient_Base::~CRPCClient_Base(void)
120 {
121 try {
122 Disconnect();
123 } STD_CATCH_ALL_XX(Serial_RPCClient, 2, "CRPCClient_Base::Disconnect()");
124 }
125
126
Connect(void)127 void CRPCClient_Base::Connect(void)
128 {
129 // Do not connect from recursive requests - this must be done
130 // by the main request only.
131 if (m_RecursionCount > 1) return;
132
133 if (m_Stream.get() && m_Stream->good()) {
134 return; // already connected
135 }
136 CMutexGuard LOCK(m_Mutex);
137 // repeat test with mutex held to avoid races
138 if (m_Stream.get() && m_Stream->good()) {
139 return; // already connected
140 }
141 x_Connect();
142 m_RetryCtx.ResetNeedReconnect();
143 }
144
145
Disconnect(void)146 void CRPCClient_Base::Disconnect(void)
147 {
148 CMutexGuard LOCK(m_Mutex);
149 if ( !m_Stream.get() || !m_Stream->good() ) {
150 // not connected -- don't call x_Disconnect, which might
151 // temporarily reconnect to send a fini!
152 return;
153 }
154 x_Disconnect();
155 }
156
157
Reset(void)158 void CRPCClient_Base::Reset(void)
159 {
160 CMutexGuard LOCK(m_Mutex);
161 if (m_Stream.get() && m_Stream->good()) {
162 x_Disconnect();
163 }
164 x_Connect();
165 }
166
167
SetAffinity(const string & affinity)168 void CRPCClient_Base::SetAffinity(const string& affinity)
169 {
170 if (m_Affinity != affinity) {
171 if (m_RecursionCount > 1) {
172 ERR_POST("Affinity cannot be changed on a recursive request");
173 return;
174 }
175 Disconnect();
176 m_Affinity = affinity;
177 }
178 }
179
180
x_Disconnect(void)181 void CRPCClient_Base::x_Disconnect(void)
182 {
183 m_In.reset();
184 m_Out.reset();
185 m_Stream.reset();
186 }
187
188
x_SetStream(CNcbiIostream * stream)189 void CRPCClient_Base::x_SetStream(CNcbiIostream* stream)
190 {
191 m_In .reset();
192 m_Out.reset();
193 m_Stream.reset(stream);
194 m_In .reset(CObjectIStream::Open(m_Format, *stream));
195 m_Out.reset(CObjectOStream::Open(m_Format, *stream));
196 }
197
198
/// Scope guard which increments an integer counter on construction and
/// decrements it again on destruction.
///
/// The guard keeps a reference to the counter, so the counter must outlive
/// the guard.  The guard is non-copyable: a copy would decrement the same
/// counter a second time without a matching increment.
class CCounterGuard
{
public:
    /// @param counter  Counter to increment now and decrement on
    ///                 destruction; it is dereferenced, so it must not be
    ///                 NULL.
    explicit CCounterGuard(int* counter)
        : m_Counter(*counter)
    {
        ++m_Counter;
    }

    ~CCounterGuard(void)
    {
        --m_Counter;
    }

private:
    // Copying is prohibited (declared, intentionally not defined).
    CCounterGuard(const CCounterGuard&);
    CCounterGuard& operator=(const CCounterGuard&);

    int& m_Counter;
};
216
x_Ask(const CSerialObject & request,CSerialObject & reply)217 void CRPCClient_Base::x_Ask(const CSerialObject& request, CSerialObject& reply)
218 {
219 CMutexGuard LOCK(m_Mutex);
220 if (m_RecursionCount == 0) {
221 m_TryCount = 0;
222 }
223 // Recursion counter needs to be decremented on both success and failure.
224 CCounterGuard recursion_guard(&m_RecursionCount);
225
226 const string& request_name = request.GetThisTypeInfo() != NULL
227 ? ("("+request.GetThisTypeInfo()->GetName()+")")
228 : "(no_request_type)";
229
230 // Reset headers from previous requests if any.
231 m_RetryCtx.Reset();
232 double max_span = m_RetryDelay.GetAsDouble()*m_TryLimit;
233 double span = max_span;
234 bool limit_by_time = !m_RetryDelay.IsEmpty();
235 // Retry context can be either the default one (m_RetryCtx), or provided
236 // through an exception.
237 for (;;) {
238 if ( IsCanceled() ) {
239 NCBI_THROW(CRPCClientException, eFailed,
240 "Request canceled " + request_name);
241 }
242 try {
243 SetAffinity(x_GetAffinity(request));
244 Connect(); // No-op if already connected
245 if ( m_RetryCtx.IsSetContentOverride() ) {
246 if (m_RetryCtx.GetContentOverride() != CHttpRetryContext::eNoContent &&
247 m_RetryCtx.IsSetContent() ) {
248 *m_Stream << m_RetryCtx.GetContent();
249 }
250 }
251 else {
252 // by default re-send the original request
253 x_WriteRequest(*m_Out, request);
254 }
255 m_Stream->peek(); // send data, read response headers
256 if (!m_Stream->good() && !m_Stream->eof()) {
257 NCBI_THROW(CRPCClientException, eFailed,
258 "Connection stream is in bad state " + request_name);
259 }
260 if (m_RetryCtx.IsSetContentOverride() &&
261 m_RetryCtx.GetContentOverride() == CHttpRetryContext::eFromResponse) {
262 // store response content to send it with the next retry
263 CNcbiOstrstream buf;
264 NcbiStreamCopy(buf, *m_Stream);
265 m_RetryCtx.SetContent(CNcbiOstrstreamToString(buf));
266 }
267 else {
268 // read normal response
269 x_ReadReply(*m_In, reply);
270 }
271 // If reading reply succeeded and no retry was requested by the server, break.
272 if ( !m_RetryCtx.GetNeedRetry() ) {
273 break;
274 }
275 } catch (CException& e) {
276 // Some exceptions tend to correspond to transient glitches;
277 // the remainder, however, may as well get propagated immediately.
278 CRPCClientException* rpc_ex = dynamic_cast<CRPCClientException*>(&e);
279 if (rpc_ex && rpc_ex->GetErrCode() == CRPCClientException::eRetry) {
280 if ( rpc_ex->IsSetRetryContext() ) {
281 // Save information to the local retry context and proceed.
282 m_RetryCtx = rpc_ex->GetRetryContext();
283 }
284 // proceed to retry
285 }
286 else if ( !dynamic_cast<CSerialException*>(&e)
287 && !dynamic_cast<CIOException*>(&e) ) {
288 // Not a retry related exception, abort.
289 throw;
290 }
291 }
292 // No retries for recursive requests (e.g. AskInit called by Connect).
293 // Exit immediately, do not reset retry context - it may be used by
294 // the main request's retry loop.
295 if (m_RecursionCount > 1) return;
296
297 // Retry request on exception or on explicit retry request from the server.
298
299 // If using time limit, allow to make more than m_RetryLimit attempts
300 // if the server has set shorter delay.
301 if ((!limit_by_time && ++m_TryCount >= m_TryLimit) ||
302 !x_ShouldRetry(m_TryCount)) {
303 NCBI_THROW(CRPCClientException, eFailed,
304 "Failed to receive reply after "
305 + NStr::NumericToString(m_TryCount)
306 + (m_TryCount == 1 ? " try " : " tries ")
307 + request_name );
308 }
309 if ( m_RetryCtx.IsSetStop() ) {
310 NCBI_THROW(CRPCClientException, eFailed,
311 "Retrying request stopped by the server: "
312 + m_RetryCtx.GetStopReason() + ' ' + request_name);
313 }
314 CTimeSpan delay = x_GetRetryDelay(span);
315 if ( !delay.IsEmpty() ) {
316 SleepSec(delay.GetCompleteSeconds());
317 SleepMicroSec(delay.GetNanoSecondsAfterSecond() / 1000);
318 span -= delay.GetAsDouble();
319 if (limit_by_time && span <= 0) {
320 NCBI_THROW(CRPCClientException, eFailed,
321 "Failed to receive reply in "
322 + CTimeSpan(max_span).AsSmartString()
323 + ' ' + request_name);
324 }
325 }
326 // Always reconnect on retry.
327 if ( IsCanceled() ) {
328 NCBI_THROW(CRPCClientException, eFailed,
329 "Request canceled " + request_name);
330 }
331 try {
332 Reset();
333 } STD_CATCH_ALL_XX(Serial_RPCClient, 1,
334 "CRPCClient_Base::Reset() " + request_name);
335 }
336 // Reset retry context when done.
337 m_RetryCtx.Reset();
338 // If there were any retries, force disconnect to prevent using old
339 // retry url, args etc. with the next request.
340 if ( m_TryCount > 0 && m_RecursionCount <= 1 ) {
341 Disconnect();
342 }
343 }
344
345
x_ShouldRetry(unsigned int tries)346 bool CRPCClient_Base::x_ShouldRetry(unsigned int tries) /* NCBI_FAKE_WARNING */
347 {
348 _TRACE("CRPCClient_Base::x_ShouldRetry: retrying after " << tries
349 << " failure(s)");
350 return true;
351 }
352
353
x_GetRetryDelay(double max_delay) const354 CTimeSpan CRPCClient_Base::x_GetRetryDelay(double max_delay) const
355 {
356 // If not set by the server, use local delay.
357 if ( !m_RetryCtx.IsSetDelay() ) {
358 return m_RetryDelay;
359 }
360 // If local delay is not zero, we have to limit total retries time to max_delay.
361 if (!m_RetryDelay.IsEmpty() &&
362 m_RetryCtx.GetDelay().GetAsDouble() > max_delay) {
363 return CTimeSpan(max_delay);
364 }
365 return m_RetryCtx.GetDelay();
366 }
367
368
GetContentTypeHeader(ESerialDataFormat format)369 const char* CRPCClient_Base::GetContentTypeHeader(ESerialDataFormat format)
370 {
371 switch (format) {
372 case eSerial_None:
373 break;
374 case eSerial_AsnText:
375 return "Content-Type: x-ncbi-data/x-asn-text\r\n";
376 case eSerial_AsnBinary:
377 return "Content-Type: x-ncbi-data/x-asn-binary\r\n";
378 case eSerial_Xml:
379 return "Content-Type: application/xml\r\n";
380 case eSerial_Json:
381 return "Content-Type: application/json\r\n";
382 }
383 return NULL; // kEmptyCStr?
384 }
385
386
GetErrCodeString(void) const387 const char* CRPCClientException::GetErrCodeString(void) const
388 {
389 switch (GetErrCode()) {
390 case eRetry: return "eRetry";
391 case eFailed: return "eFailed";
392 case eArgs: return "eArgs";
393 case eOther: return "eOther";
394 default: return CException::GetErrCodeString();
395 }
396 }
397
398
399 END_NCBI_SCOPE
400