1 #ifndef SERIAL___RPCBASE__HPP
2 #define SERIAL___RPCBASE__HPP
3 
4 /*  $Id: rpcbase.hpp 615583 2020-09-01 17:28:37Z grichenk $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Author:  Aaron Ucko, NCBI
30  *
31  * File Description:
32  *   Generic template class for ASN.1/XML RPC clients
33  *
34  */
35 
36 #include <corelib/ncbimtx.hpp>
37 #include <corelib/ncbi_system.hpp>
38 #include <connect/ncbi_conn_stream.hpp>
39 #include <connect/ncbi_util.h>
40 #include <serial/objistr.hpp>
41 #include <serial/objostr.hpp>
42 #include <serial/serial.hpp>
43 #include <util/retry_ctx.hpp>
44 #include <serial/rpcbase_impl.hpp>
45 
46 /** @addtogroup GenClassSupport
47  *
48  * @{
49  */
50 
51 
52 BEGIN_NCBI_SCOPE
53 
54 /// CRPCClient -- prototype client for ASN.1/XML-based RPC.
55 /// Normally connects automatically on the first real request and
56 /// disconnects automatically in the destructor, but allows both events
57 /// to occur explicitly.
58 
59 template <class TRequest, class TReply>
60 class CRPCClient : public    CObject,
61                    public    CRPCClient_Base,
62                    protected CConnIniter
63 {
64 public:
CRPCClient(const string & service=kEmptyStr)65     CRPCClient(const string& service = kEmptyStr)
66         : CRPCClient_Base(service, eSerial_AsnBinary),
67           m_Timeout(kDefaultTimeout)
68         {}
CRPCClient(const string & service,ESerialDataFormat format)69     CRPCClient(const string&     service,
70         ESerialDataFormat        format)
71         : CRPCClient_Base(service, format),
72         m_Timeout(kDefaultTimeout)
73     {}
CRPCClient(const string & service,ESerialDataFormat format,unsigned int try_limit)74     CRPCClient(const string& service,
75         ESerialDataFormat    format,
76         unsigned int         try_limit)
77         : CRPCClient_Base(service, format, try_limit),
78         m_Timeout(kDefaultTimeout)
79     {}
~CRPCClient(void)80     virtual ~CRPCClient(void)
81     {
82         if ( !sx_IsSpecial(m_Timeout) ) {
83             delete const_cast<STimeout*>(m_Timeout);
84         }
85     }
86 
Ask(const TRequest & request,TReply & reply)87     virtual void Ask(const TRequest& request, TReply& reply)
88     { x_Ask(request, reply); }
89 
WriteRequest(CObjectOStream & out,const TRequest & request)90     virtual void WriteRequest(CObjectOStream& out, const TRequest& request)
91     { out << request; }
92 
ReadReply(CObjectIStream & in,TReply & reply)93     virtual void ReadReply(CObjectIStream& in, TReply& reply)
94     { in >> reply; }
95 
96     EIO_Status      SetTimeout(const STimeout* timeout,
97                                EIO_Event direction = eIO_ReadWrite);
98     const STimeout* GetTimeout(EIO_Event direction = eIO_Read) const;
99 
100     EIO_Status AsyncConnect(void* handle_buf, size_t handle_size);
101 
102 protected:
GetAffinity(const TRequest &) const103     virtual string GetAffinity(const TRequest& /*request*/) const
104     {
105         return string();
106     }
107 
x_WriteRequest(CObjectOStream & out,const CSerialObject & request)108     virtual void x_WriteRequest(CObjectOStream& out, const CSerialObject& request) override
109     {
110         WriteRequest(out, dynamic_cast<const TRequest&>(request));
111     }
112 
x_ReadReply(CObjectIStream & in,CSerialObject & reply)113     virtual void x_ReadReply(CObjectIStream& in, CSerialObject& reply) override
114     {
115         ReadReply(in, dynamic_cast<TReply&>(reply));
116     }
117 
x_GetAffinity(const CSerialObject & request) const118     virtual string x_GetAffinity(const CSerialObject& request) const override
119     {
120         return GetAffinity(dynamic_cast<const TRequest&>(request));
121     }
122 
123     virtual void x_Connect(void) override;
124 
125     /// Connect to a URL.  (Discouraged; please establish and use a
126     /// suitable named service if possible.)
127     void x_ConnectURL(const string& url);
128 
129     // CConn_HttpStream callback for parsing headers.
130     // 'user_data' must point to an instance of CRPCConnStatus.
131     static EHTTP_HeaderParse sx_ParseHeader(const char* http_header,
132                                             void*       user_data,
133                                             int         server_error);
134 
135     static bool sx_IsSpecial(const STimeout* timeout);
136 
137     const STimeout*          m_Timeout; ///< Cloned if not special.
138 
139 private:
140     void x_FillConnNetInfo(SConnNetInfo& net_info, SSERVICE_Extra* x_extra);
141 
142     unique_ptr<CConn_ServiceStream> m_AsyncStream;
143 };
144 
145 
146 ///////////////////////////////////////////////////////////////////////////
147 // Inline methods
148 
149 template<>
150 struct Deleter<SConnNetInfo>
151 {
DeleteDeleter152     static void Delete(SConnNetInfo* net_info)
153     { ConnNetInfo_Destroy(net_info); }
154 };
155 
156 
157 template<class TRequest, class TReply>
158 inline
x_FillConnNetInfo(SConnNetInfo & net_info,SSERVICE_Extra * x_extra)159 void CRPCClient<TRequest, TReply>::x_FillConnNetInfo(SConnNetInfo& net_info,
160                                                      SSERVICE_Extra* x_extra)
161 {
162     if ( !m_Args.empty() ) {
163         if ( !ConnNetInfo_AppendArg(&net_info, m_Args.c_str(), 0) ) {
164             NCBI_THROW(CRPCClientException, eArgs,
165                 "Error sending additional request arguments");
166         }
167     }
168     if ( m_RetryCtx.IsSetArgs() ) {
169         if ( !ConnNetInfo_AppendArg(&net_info, m_RetryCtx.GetArgs().c_str(),
170                                     0) ) {
171             NCBI_THROW(CRPCClientException, eArgs,
172                 "Error sending retry context arguments");
173         }
174     } else if (x_extra != nullptr  &&  !m_Affinity.empty()) {
175         if ( !ConnNetInfo_PostOverrideArg(&net_info, m_Affinity.c_str(), 0) ) {
176             NCBI_THROW(CRPCClientException, eArgs,
177                 "Error sending request affinity");
178         }
179     }
180     if (x_extra == nullptr) {
181         return;
182     }
183     // Install callback for parsing headers.
184     memset(x_extra, 0, sizeof(*x_extra));
185     x_extra->data = &m_RetryCtx;
186     x_extra->parse_header = sx_ParseHeader;
187     x_extra->flags = fHTTP_NoAutoRetry;
188     const char* user_header = GetContentTypeHeader(GetFormat());
189     if (user_header != NULL  &&  *user_header != '\0') {
190         if ( !ConnNetInfo_OverrideUserHeader(&net_info, user_header)) {
191             NCBI_THROW(CRPCClientException, eArgs,
192                 "Error sending user header");
193         }
194     }
195 }
196 
197 template<class TRequest, class TReply>
198 inline
x_Connect(void)199 void CRPCClient<TRequest, TReply>::x_Connect(void)
200 {
201     if (m_AsyncStream.get() != nullptr) {
202         m_AsyncStream->SetTimeout(eIO_Open, m_Timeout);
203         m_AsyncStream->SetTimeout(eIO_ReadWrite, m_Timeout);
204         x_SetStream(m_AsyncStream.release());
205         return;
206     } else if ( m_RetryCtx.IsSetUrl() ) {
207         x_ConnectURL(m_RetryCtx.GetUrl());
208         return;
209     }
210     _ASSERT( !m_Service.empty() );
211     SSERVICE_Extra x_extra;
212     AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
213     x_FillConnNetInfo(*net_info, &x_extra);
214 
215     unique_ptr<CConn_ServiceStream> stream
216         (new CConn_ServiceStream
217          (m_Service, fSERV_Any | fSERV_DelayOpen, net_info.get(), &x_extra, m_Timeout));
218     if ( m_Canceler.NotNull() ) {
219         stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
220     }
221     x_SetStream(stream.release());
222 }
223 
224 
225 template<class TRequest, class TReply>
226 inline
x_ConnectURL(const string & url)227 void CRPCClient<TRequest, TReply>::x_ConnectURL(const string& url)
228 {
229     AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(nullptr));
230     if ( !ConnNetInfo_ParseURL(net_info.get(), url.c_str()) ) {
231         NCBI_THROW(CCoreException, eInvalidArg, "Error parsing URL " + url);
232     }
233     x_FillConnNetInfo(*net_info, nullptr);
234     unique_ptr<CConn_HttpStream> stream(new CConn_HttpStream(net_info.get(),
235         GetContentTypeHeader(GetFormat()),
236         sx_ParseHeader, // callback
237         &m_RetryCtx,    // user data for the callback
238         0, // adjust callback
239         0, // cleanup callback
240         fHTTP_AutoReconnect | fHTTP_NoAutoRetry,
241         m_Timeout));
242     if ( m_Canceler.NotNull() ) {
243         stream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
244     }
245     x_SetStream(stream.release());
246 }
247 
248 
249 template<class TRequest, class TReply>
250 inline
SetTimeout(const STimeout * timeout,EIO_Event direction)251 EIO_Status CRPCClient<TRequest, TReply>::SetTimeout(const STimeout* timeout,
252                                                     EIO_Event direction)
253 {
254     // save for future use, especially if there's no stream at present.
255     {{
256         const STimeout* old_timeout = m_Timeout;
257         if (sx_IsSpecial(timeout)) {
258             m_Timeout = timeout;
259         } else { // make a copy
260             m_Timeout = new STimeout(*timeout);
261         }
262         if ( !sx_IsSpecial(old_timeout) ) {
263             delete const_cast<STimeout*>(old_timeout);
264         }
265     }}
266 
267     CConn_IOStream* conn_stream
268         = dynamic_cast<CConn_IOStream*>(m_Stream.get());
269     if (conn_stream) {
270         return conn_stream->SetTimeout(direction, timeout);
271     } else if ( !m_Stream.get() ) {
272         return eIO_Success; // we've saved it, which is the best we can do...
273     } else {
274         return eIO_NotSupported;
275     }
276 }
277 
278 
279 template<class TRequest, class TReply>
280 inline
GetTimeout(EIO_Event direction) const281 const STimeout* CRPCClient<TRequest, TReply>::GetTimeout(EIO_Event direction)
282     const
283 {
284     CConn_IOStream* conn_stream
285         = dynamic_cast<CConn_IOStream*>(m_Stream.get());
286     if (conn_stream) {
287         return conn_stream->GetTimeout(direction);
288     }
289     else {
290         return m_Timeout;
291     }
292 }
293 
294 
295 template<class TRequest, class TReply>
296 inline
AsyncConnect(void * handle_buf,size_t handle_size)297 EIO_Status CRPCClient<TRequest, TReply>::AsyncConnect(void* handle_buf,
298                                                       size_t handle_size)
299 {
300     static const STimeout kZeroTimeout = { 0, 0 };
301     _ASSERT( !m_Service.empty() );
302     SSERVICE_Extra x_extra;
303     AutoPtr<SConnNetInfo> net_info(ConnNetInfo_Create(m_Service.c_str()));
304     x_FillConnNetInfo(*net_info, &x_extra);
305 
306     m_AsyncStream.reset(
307         new CConn_ServiceStream(m_Service, fSERV_Any, net_info.get(), &x_extra,
308                                 &kZeroTimeout));
309     if (m_Timeout == kDefaultTimeout) {
310         m_Timeout = kInfiniteTimeout;
311     }
312     EIO_Status status = m_AsyncStream->Status();
313     if ( m_Canceler.NotNull() ) {
314         m_AsyncStream->SetCanceledCallback(m_Canceler.GetNonNullPointer());
315     }
316     if (handle_buf != nullptr) {
317         CONN conn = m_AsyncStream->GetCONN();
318         if (conn != nullptr) {
319             SOCK sock = nullptr;
320             if ((status = CONN_GetSOCK(conn, &sock)) == eIO_Success
321                 &&  sock != nullptr) {
322                 status = SOCK_GetOSHandle(sock, handle_buf, handle_size);
323             }
324         }
325     }
326     return status;
327 }
328 
329 
330 template<class TRequest, class TReply>
331 inline
332 EHTTP_HeaderParse
sx_ParseHeader(const char * http_header,void * user_data,int)333 CRPCClient<TRequest, TReply>::sx_ParseHeader(const char* http_header,
334                                              void*       user_data,
335                                              int         /*server_error*/)
336 {
337     if ( !user_data ) {
338         return eHTTP_HeaderContinue;
339     }
340     CHttpRetryContext* retry_ctx = reinterpret_cast<CHttpRetryContext*>(user_data);
341     _ASSERT(retry_ctx);
342     retry_ctx->ParseHeader(http_header);
343 
344     // Always read response body - normal content or error.
345     return eHTTP_HeaderContinue;
346 }
347 
348 
349 template<class TRequest, class TReply>
350 inline
sx_IsSpecial(const STimeout * timeout)351 bool CRPCClient<TRequest, TReply>::sx_IsSpecial(const STimeout* timeout)
352 {
353     return timeout == kDefaultTimeout  ||  timeout == kInfiniteTimeout;
354 }
355 
356 
357 END_NCBI_SCOPE
358 
359 
360 /* @} */
361 
362 #endif  /* SERIAL___RPCBASE__HPP */
363