1 #ifndef SERIAL___RPCBASE__HPP
2 #define SERIAL___RPCBASE__HPP
3
4 /* $Id: rpcbase.hpp 615583 2020-09-01 17:28:37Z grichenk $
5 * ===========================================================================
6 *
7 * PUBLIC DOMAIN NOTICE
8 * National Center for Biotechnology Information
9 *
10 * This software/database is a "United States Government Work" under the
11 * terms of the United States Copyright Act. It was written as part of
12 * the author's official duties as a United States Government employee and
13 * thus cannot be copyrighted. This software/database is freely available
14 * to the public for use. The National Library of Medicine and the U.S.
15 * Government have not placed any restriction on its use or reproduction.
16 *
17 * Although all reasonable efforts have been taken to ensure the accuracy
18 * and reliability of the software and data, the NLM and the U.S.
19 * Government do not and cannot warrant the performance or results that
20 * may be obtained by using this software or data. The NLM and the U.S.
21 * Government disclaim all warranties, express or implied, including
22 * warranties of performance, merchantability or fitness for any particular
23 * purpose.
24 *
25 * Please cite the author in any work or product based on this material.
26 *
27 * ===========================================================================
28 *
29 * Author: Aaron Ucko, NCBI
30 *
31 * File Description:
32 * Generic template class for ASN.1/XML RPC clients
33 *
34 */
35
36 #include <corelib/ncbimtx.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <connect/ncbi_conn_stream.hpp>
39 #include <connect/ncbi_util.h>
40 #include <serial/objistr.hpp>
41 #include <serial/objostr.hpp>
42 #include <serial/serial.hpp>
43 #include <util/retry_ctx.hpp>
44 #include <serial/rpcbase_impl.hpp>
45
46 /** @addtogroup GenClassSupport
47 *
48 * @{
49 */
50
51
52 BEGIN_NCBI_SCOPE
53
54 /// CRPCClient -- prototype client for ASN.1/XML-based RPC.
55 /// Normally connects automatically on the first real request and
56 /// disconnects automatically in the destructor, but allows both events
57 /// to occur explicitly.
58
59 template <class TRequest, class TReply>
60 class CRPCClient : public CObject,
61 public CRPCClient_Base,
62 protected CConnIniter
63 {
64 public:
CRPCClient(const string & service=kEmptyStr)65 CRPCClient(const string& service = kEmptyStr)
66 : CRPCClient_Base(service, eSerial_AsnBinary),
67 m_Timeout(kDefaultTimeout)
68 {}
CRPCClient(const string & service,ESerialDataFormat format)69 CRPCClient(const string& service,
70 ESerialDataFormat format)
71 : CRPCClient_Base(service, format),
72 m_Timeout(kDefaultTimeout)
73 {}
CRPCClient(const string & service,ESerialDataFormat format,unsigned int try_limit)74 CRPCClient(const string& service,
75 ESerialDataFormat format,
76 unsigned int try_limit)
77 : CRPCClient_Base(service, format, try_limit),
78 m_Timeout(kDefaultTimeout)
79 {}
~CRPCClient(void)80 virtual ~CRPCClient(void)
81 {
82 if ( !sx_IsSpecial(m_Timeout) ) {
83 delete const_cast<STimeout*>(m_Timeout);
84 }
85 }
86
Ask(const TRequest & request,TReply & reply)87 virtual void Ask(const TRequest& request, TReply& reply)
88 { x_Ask(request, reply); }
89
WriteRequest(CObjectOStream & out,const TRequest & request)90 virtual void WriteRequest(CObjectOStream& out, const TRequest& request)
91 { out << request; }
92
ReadReply(CObjectIStream & in,TReply & reply)93 virtual void ReadReply(CObjectIStream& in, TReply& reply)
94 { in >> reply; }
95
96 EIO_Status SetTimeout(const STimeout* timeout,
97 EIO_Event direction = eIO_ReadWrite);
98 const STimeout* GetTimeout(EIO_Event direction = eIO_Read) const;
99
100 EIO_Status AsyncConnect(void* handle_buf, size_t handle_size);
101
102 protected:
GetAffinity(const TRequest &) const103 virtual string GetAffinity(const TRequest& /*request*/) const
104 {
105 return string();
106 }
107
x_WriteRequest(CObjectOStream & out,const CSerialObject & request)108 virtual void x_WriteRequest(CObjectOStream& out, const CSerialObject& request) override
109 {
110 WriteRequest(out, dynamic_cast<const TRequest&>(request));
111 }
112
x_ReadReply(CObjectIStream & in,CSerialObject & reply)113 virtual void x_ReadReply(CObjectIStream& in, CSerialObject& reply) override
114 {
115 ReadReply(in, dynamic_cast<TReply&>(reply));
116 }
117
x_GetAffinity(const CSerialObject & request) const118 virtual string x_GetAffinity(const CSerialObject& request) const override
119 {
120 return GetAffinity(dynamic_cast<const TRequest&>(request));
121 }
122
123 virtual void x_Connect(void) override;
124
125 /// Connect to a URL. (Discouraged; please establish and use a
126 /// suitable named service if possible.)
127 void x_ConnectURL(const string& url);
128
129 // CConn_HttpStream callback for parsing headers.
130 // 'user_data' must point to an instance of CRPCConnStatus.
131 static EHTTP_HeaderParse sx_ParseHeader(const char* http_header,
132 void* user_data,
133 int server_error);
134
135 static bool sx_IsSpecial(const STimeout* timeout);
136
137 const STimeout* m_Timeout; ///< Cloned if not special.
138
139 private:
140 void x_FillConnNetInfo(SConnNetInfo& net_info, SSERVICE_Extra* x_extra);
141
142 unique_ptr<CConn_ServiceStream> m_AsyncStream;
143 };
144
145
146 ///////////////////////////////////////////////////////////////////////////
147 // Inline methods
148
149 template<>
150 struct Deleter<SConnNetInfo>
151 {
DeleteDeleter152 static void Delete(SConnNetInfo* net_info)
153 { ConnNetInfo_Destroy(net_info); }
154 };
155
156
157 template<class TRequest, class TReply>
158 inline
x_FillConnNetInfo(SConnNetInfo & net_info,SSERVICE_Extra * x_extra)159 void CRPCClient<TRequest, TReply>::x_FillConnNetInfo(SConnNetInfo& net_info,
160 SSERVICE_Extra* x_extra)
161 {
162 if ( !m_Args.empty() ) {
163 if ( !ConnNetInfo_AppendArg(&net_info, m_Args.c_str(), 0) ) {
164 NCBI_THROW(CRPCClientException, eArgs,
165 "Error sending additional request arguments");
166 }
167 }
168 if ( m_RetryCtx.IsSetArgs() ) {
169 if ( !ConnNetInfo_AppendArg(&net_info, m_RetryCtx.GetArgs().c_str(),
170 0) ) {
171 NCBI_THROW(CRPCClientException, eArgs,
172 "Error sending retry context arguments");
173 }
174 } else if (x_extra != nullptr && !m_Affinity.empty()) {
175 if ( !ConnNetInfo_PostOverrideArg(&net_info, m_Affinity.c_str(), 0) ) {
176 NCBI_THROW(CRPCClientException, eArgs,
177 "Error sending request affinity");
178 }
179 }
180 if (x_extra == nullptr) {
181 return;
182 }
183 // Install callback for parsing headers.
184 memset(x_extra, 0, sizeof(*x_extra));
185 x_extra->data = &m_RetryCtx;
186 x_extra->parse_header = sx_ParseHeader;
187 x_extra->flags = fHTTP_NoAutoRetry;
188 const char* user_header = GetContentTypeHeader(GetFormat());
189 if (user_header != NULL && *user_header != '\0') {
190 if ( !ConnNetInfo_OverrideUserHeader(&net_info, user_header)) {
191 NCBI_THROW(CRPCClientException, eArgs,
192 "Error sending user header");
193 }
194 }
195 }
196
197 template<class TRequest, class TReply>
198 inline
x_Connect(void)199 void CRPCClient<TRequest, TReply>::x_Connect(void)
200 {
201 if (m_AsyncStream.get() != nullptr) {
202 m_AsyncStream->SetTimeout(eIO_Open, m_Timeout);
203 m_AsyncStream->SetTimeout(eIO_ReadWrite, m_Timeout);
204 x_SetStream(m_AsyncStream.release());
205 return;
206 } else if ( m_RetryCtx.IsSetUrl() ) {
207 x_ConnectURL(m_RetryCtx.GetUrl());
208 return;
209 }
210 _ASSERT( !m_Service.empty() );
211 SSERVICE_Extra x_extra;
212 AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
213 x_FillConnNetInfo(*net_info, &x_extra);
214
215 unique_ptr<CConn_ServiceStream> stream
216 (new CConn_ServiceStream
217 (m_Service, fSERV_Any | fSERV_DelayOpen, net_info.get(), &x_extra, m_Timeout));
218 if ( m_Canceler.NotNull() ) {
219 stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
220 }
221 x_SetStream(stream.release());
222 }
223
224
225 template<class TRequest, class TReply>
226 inline
x_ConnectURL(const string & url)227 void CRPCClient<TRequest, TReply>::x_ConnectURL(const string& url)
228 {
229 AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(nullptr));
230 if ( !ConnNetInfo_ParseURL(net_info.get(), url.c_str()) ) {
231 NCBI_THROW(CCoreException, eInvalidArg, "Error parsing URL " + url);
232 }
233 x_FillConnNetInfo(*net_info, nullptr);
234 unique_ptr<CConn_HttpStream> stream(new CConn_HttpStream(net_info.get(),
235 GetContentTypeHeader(GetFormat()),
236 sx_ParseHeader, // callback
237 &m_RetryCtx, // user data for the callback
238 0, // adjust callback
239 0, // cleanup callback
240 fHTTP_AutoReconnect | fHTTP_NoAutoRetry,
241 m_Timeout));
242 if ( m_Canceler.NotNull() ) {
243 stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
244 }
245 x_SetStream(stream.release());
246 }
247
248
249 template<class TRequest, class TReply>
250 inline
SetTimeout(const STimeout * timeout,EIO_Event direction)251 EIO_Status CRPCClient<TRequest, TReply>::SetTimeout(const STimeout* timeout,
252 EIO_Event direction)
253 {
254 // save for future use, especially if there's no stream at present.
255 {{
256 const STimeout* old_timeout = m_Timeout;
257 if (sx_IsSpecial(timeout)) {
258 m_Timeout = timeout;
259 } else { // make a copy
260 m_Timeout = new STimeout(*timeout);
261 }
262 if ( !sx_IsSpecial(old_timeout) ) {
263 delete const_cast<STimeout*>(old_timeout);
264 }
265 }}
266
267 CConn_IOStream* conn_stream
268 = dynamic_cast<CConn_IOStream*>(m_Stream.get());
269 if (conn_stream) {
270 return conn_stream->SetTimeout(direction, timeout);
271 } else if ( !m_Stream.get() ) {
272 return eIO_Success; // we've saved it, which is the best we can do...
273 } else {
274 return eIO_NotSupported;
275 }
276 }
277
278
279 template<class TRequest, class TReply>
280 inline
GetTimeout(EIO_Event direction) const281 const STimeout* CRPCClient<TRequest, TReply>::GetTimeout(EIO_Event direction)
282 const
283 {
284 CConn_IOStream* conn_stream
285 = dynamic_cast<CConn_IOStream*>(m_Stream.get());
286 if (conn_stream) {
287 return conn_stream->GetTimeout(direction);
288 }
289 else {
290 return m_Timeout;
291 }
292 }
293
294
295 template<class TRequest, class TReply>
296 inline
AsyncConnect(void * handle_buf,size_t handle_size)297 EIO_Status CRPCClient<TRequest, TReply>::AsyncConnect(void* handle_buf,
298 size_t handle_size)
299 {
300 static const STimeout kZeroTimeout = { 0, 0 };
301 _ASSERT( !m_Service.empty() );
302 SSERVICE_Extra x_extra;
303 AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
304 x_FillConnNetInfo(*net_info, &x_extra);
305
306 m_AsyncStream.reset(
307 new CConn_ServiceStream(m_Service, fSERV_Any, net_info.get(), &x_extra,
308 &kZeroTimeout));
309 if (m_Timeout == kDefaultTimeout) {
310 m_Timeout = kInfiniteTimeout;
311 }
312 EIO_Status status = m_AsyncStream->Status();
313 if ( m_Canceler.NotNull() ) {
314 m_AsyncStream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
315 }
316 if (handle_buf != nullptr) {
317 CONN conn = m_AsyncStream->GetCONN();
318 if (conn != nullptr) {
319 SOCK sock = nullptr;
320 if ((status = CONN_GetSOCK(conn, &sock)) == eIO_Success
321 && sock != nullptr) {
322 status = SOCK_GetOSHandle(sock, handle_buf, handle_size);
323 }
324 }
325 }
326 return status;
327 }
328
329
330 template<class TRequest, class TReply>
331 inline
332 EHTTP_HeaderParse
sx_ParseHeader(const char * http_header,void * user_data,int)333 CRPCClient<TRequest, TReply>::sx_ParseHeader(const char* http_header,
334 void* user_data,
335 int /*server_error*/)
336 {
337 if ( !user_data ) {
338 return eHTTP_HeaderContinue;
339 }
340 CHttpRetryContext* retry_ctx = reinterpret_cast<CHttpRetryContext*>(user_data);
341 _ASSERT(retry_ctx);
342 retry_ctx->ParseHeader(http_header);
343
344 // Always read response body - normal content or error.
345 return eHTTP_HeaderContinue;
346 }
347
348
349 template<class TRequest, class TReply>
350 inline
sx_IsSpecial(const STimeout * timeout)351 bool CRPCClient<TRequest, TReply>::sx_IsSpecial(const STimeout* timeout)
352 {
353 return timeout == kDefaultTimeout || timeout == kInfiniteTimeout;
354 }
355
356
357 END_NCBI_SCOPE
358
359
360 /* @} */
361
362 #endif /* SERIAL___RPCBASE__HPP */
363