1 /*  $Id: netcache_rw.cpp 619687 2020-11-09 15:14:58Z sadyrovr $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Maxim Didenko, Dmitry Kazimirov
27  *
28  * File Description:
29  *   Implementation of net cache client.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netcache_api_impl.hpp"
36 
37 #include <connect/services/netcache_api_expt.hpp>
38 #include <connect/services/error_codes.hpp>
39 
40 #ifdef NCBI_OS_LINUX
41 # include <sys/socket.h>
42 # include <netinet/in.h>
43 # include <netinet/tcp.h>
44 #endif
45 
46 
47 #define NCBI_USE_ERRCODE_X   ConnServ_NetCache
48 
49 
50 BEGIN_NCBI_SCOPE
51 
52 #define CACHE_XFER_BUFFER_SIZE 4096
53 #define MAX_PENDING_COUNT (1024 * 1024 * 1024)
54 
55 static const char s_InputBlobCachePrefix[] = ".nc_cache_input.";
56 static const char s_OutputBlobCachePrefix[] = ".nc_cache_output.";
57 
CNetCacheReader(SNetCacheAPIImpl * impl,const string & blob_id,CNetServer::SExecResult & exec_result,size_t * blob_size_ptr,const CNetCacheAPIParameters * parameters)58 CNetCacheReader::CNetCacheReader(SNetCacheAPIImpl* impl,
59         const string& blob_id,
60         CNetServer::SExecResult& exec_result,
61         size_t* blob_size_ptr,
62         const CNetCacheAPIParameters* parameters) :
63     m_BlobID(blob_id),
64     m_Connection(exec_result.conn)
65 {
66     switch (parameters->GetCachingMode()) {
67     case CNetCacheAPI::eCaching_AppDefault:
68         m_CachingEnabled = impl->m_CacheInput;
69         break;
70 
71     case CNetCacheAPI::eCaching_Disable:
72         m_CachingEnabled = false;
73         break;
74 
75     default: /* case CNetCacheAPI::eCaching_Enable: */
76         m_CachingEnabled = true;
77     }
78 
79     string::size_type pos = exec_result.response.find("SIZE=");
80 
81     if (pos == string::npos) {
82         exec_result.conn->Abort();
83         CONNSERV_THROW_FMT(CNetCacheException, eInvalidServerResponse,
84             m_Connection->m_Server,
85             "No SIZE field in reply to the blob reading command");
86     }
87 
88     m_BlobBytesToRead = m_BlobSize = NStr::StringToUInt8(
89         exec_result.response.c_str() + pos + sizeof("SIZE=") - 1,
90         NStr::fAllowTrailingSymbols);
91 
92     if (blob_size_ptr != NULL) {
93         *blob_size_ptr = CheckBlobSize(m_BlobBytesToRead);
94     }
95 
96     if (m_CachingEnabled) {
97         m_CacheFile.CreateTemporary(impl->m_TempDir, s_InputBlobCachePrefix);
98 
99         char buf[CACHE_XFER_BUFFER_SIZE];
100         Uint8 bytes_to_read = m_BlobBytesToRead;
101 
102         while (bytes_to_read > 0) {
103             size_t bytes_read = 0;
104             SocketRead(buf, sizeof(buf) <= bytes_to_read ?
105                 sizeof(buf) : (size_t) bytes_to_read, &bytes_read);
106             m_CacheFile.Write(buf, bytes_read);
107             bytes_to_read -= bytes_read;
108         }
109 
110         m_Connection = NULL;
111 
112         if (m_CacheFile.GetFilePos() != m_BlobBytesToRead) {
113             NCBI_THROW(CNetCacheException, eBlobClipped,
114                 "Blob size is greater than the amount of data cached for it");
115         }
116 
117         m_CacheFile.Flush();
118         m_CacheFile.SetFilePos(0);
119     }
120 }
121 
~CNetCacheReader()122 CNetCacheReader::~CNetCacheReader()
123 {
124     try {
125         Close();
126     } NCBI_CATCH_ALL_X(10, "CNetCacheReader::~CNetCacheReader()");
127 }
128 
Read(void * buf,size_t count,size_t * bytes_read_ptr)129 ERW_Result CNetCacheReader::Read(void*   buf,
130                                  size_t  count,
131                                  size_t* bytes_read_ptr)
132 {
133     if (m_BlobBytesToRead == 0) {
134         if (bytes_read_ptr != NULL)
135             *bytes_read_ptr = 0;
136         return eRW_Eof;
137     }
138 
139     if (m_BlobBytesToRead < count)
140         count = (size_t) m_BlobBytesToRead;
141 
142     size_t bytes_read = 0;
143 
144     if (count > 0) {
145         if (!m_CachingEnabled)
146             SocketRead(buf, count, &bytes_read);
147         else if ((bytes_read = m_CacheFile.Read(buf, count)) == 0) {
148             Uint8 remaining_bytes = m_BlobBytesToRead;
149             m_BlobBytesToRead = 0;
150             NCBI_THROW_FMT(CNetCacheException, eBlobClipped,
151                 "Unexpected EOF while reading file cache for " << m_BlobID <<
152                 " read from " <<
153                 m_Connection->m_Server->m_ServerInPool->m_Address.AsString() <<
154                 " (blob size: " << m_BlobSize <<
155                 ", unread bytes: " << remaining_bytes << ")");
156         }
157 
158         m_BlobBytesToRead -= bytes_read;
159     }
160 
161     if (bytes_read_ptr != NULL)
162         *bytes_read_ptr = bytes_read;
163 
164     return eRW_Success;
165 }
166 
PendingCount(size_t * count)167 ERW_Result CNetCacheReader::PendingCount(size_t* count)
168 {
169     if (m_CachingEnabled || m_BlobBytesToRead == 0) {
170         *count = m_BlobBytesToRead < MAX_PENDING_COUNT ?
171             (size_t) m_BlobBytesToRead : MAX_PENDING_COUNT;
172         return eRW_Success;
173     } else
174         return CSocketReaderWriter(&m_Connection->m_Socket).PendingCount(count);
175 }
176 
Close()177 void CNetCacheReader::Close()
178 {
179     if (m_CachingEnabled)
180         m_CacheFile.Close();
181     else if (m_BlobBytesToRead != 0)
182         m_Connection->Abort();
183 }
184 
SocketRead(void * buf,size_t count,size_t * bytes_read)185 void CNetCacheReader::SocketRead(void* buf, size_t count, size_t* bytes_read)
186 {
187 #ifdef NCBI_OS_LINUX
188     int fd = 0, val = 1;
189     m_Connection->m_Socket.GetOSHandle(&fd, sizeof(fd));
190     setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, sizeof(val));
191 #endif
192 
193     EIO_Status status = m_Connection->m_Socket.Read(buf, count, bytes_read);
194 
195     switch (status) {
196     case eIO_Success:
197         break;
198     case eIO_Timeout:
199         CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
200             m_Connection->m_Server,
201             "Timeout while reading blob contents");
202 
203     case eIO_Closed:
204         if (count > *bytes_read) {
205             Uint8 remaining_bytes = m_BlobBytesToRead;
206             m_BlobBytesToRead = 0;
207             CONNSERV_THROW_FMT(CNetCacheException, eBlobClipped,
208                 m_Connection->m_Server,
209                 "Unexpected EOF while reading " << m_BlobID <<
210                 " (blob size: " << m_BlobSize <<
211                 ", unread bytes: " << remaining_bytes << ")");
212         }
213         break;
214 
215     default:
216         CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
217             m_Connection->m_Server,
218             "Error while reading blob: " << IO_StatusStr(status));
219     }
220 }
221 
222 
223 /////////////////////////////////////////////////
CNetCacheWriter(SNetCacheAPIImpl * impl,string * blob_id,const string & key,ENetCacheResponseType response_type,const CNetCacheAPIParameters * parameters)224 CNetCacheWriter::CNetCacheWriter(SNetCacheAPIImpl* impl,
225         string* blob_id,
226         const string& key,
227         ENetCacheResponseType response_type,
228         const CNetCacheAPIParameters* parameters) :
229     m_ResponseType(response_type),
230     m_NetCacheAPI(impl),
231     m_BlobID(*blob_id),
232     m_Key(key),
233     m_Parameters(parameters)
234 {
235     switch (parameters->GetCachingMode()) {
236     case CNetCacheAPI::eCaching_AppDefault:
237         m_CachingEnabled = impl->m_CacheOutput;
238         break;
239 
240     case CNetCacheAPI::eCaching_Disable:
241         m_CachingEnabled = false;
242         break;
243 
244     default: /* case CNetCacheAPI::eCaching_Enable: */
245         m_CachingEnabled = true;
246     }
247 
248     if (m_CachingEnabled)
249         m_CacheFile.CreateTemporary(impl->m_TempDir, s_OutputBlobCachePrefix);
250 
251     if (!m_CachingEnabled || blob_id->empty()) {
252         EstablishConnection();
253         *blob_id = m_BlobID;
254     }
255 }
256 
~CNetCacheWriter()257 CNetCacheWriter::~CNetCacheWriter()
258 {
259     try {
260         Close();
261     } NCBI_CATCH_ALL_X(11, "Exception in ~CNetCacheWriter() [IGNORED]");
262 }
263 
Write(const void * buf,size_t count,size_t * bytes_written_ptr)264 ERW_Result CNetCacheWriter::Write(const void* buf,
265                                   size_t      count,
266                                   size_t*     bytes_written_ptr)
267 {
268     if (m_CachingEnabled) {
269         size_t bytes_written = m_CacheFile.Write(buf, count);
270         if (bytes_written_ptr != NULL)
271             *bytes_written_ptr = bytes_written;
272     } else if (IsConnectionOpen())
273         Transmit(buf, count, bytes_written_ptr);
274     else
275         return eRW_Error;
276 
277     return eRW_Success;
278 }
279 
Flush(void)280 ERW_Result CNetCacheWriter::Flush(void)
281 {
282     if (!m_CachingEnabled && IsConnectionOpen())
283         m_TransmissionWriter->Flush();
284 
285     return eRW_Success;
286 }
287 
Close()288 void CNetCacheWriter::Close()
289 {
290     if (m_CachingEnabled) {
291         m_CacheFile.Flush();
292 
293         bool blob_written = false;
294 
295         if (IsConnectionOpen()) {
296             try {
297                 UploadCacheFile();
298                 blob_written = true;
299             }
300             catch (CNetServiceException&) {
301             }
302         }
303 
304         if (!blob_written) {
305             EstablishConnection();
306 
307             UploadCacheFile();
308         }
309     }
310 
311     if (!IsConnectionOpen())
312         return;
313 
314     ERW_Result res = m_TransmissionWriter->Close();
315 
316     if (res != eRW_Success) {
317         AbortConnection();
318         if (res == eRW_Timeout) {
319             CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
320                 m_Connection->m_Server,
321                 "Timeout while sending EOF packet");
322         } else {
323             CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
324                 m_Connection->m_Server,
325                 "IO error while sending EOF packet");
326         }
327     }
328 
329     m_Connection->m_Socket.SetCork(false);
330 
331     if (m_ResponseType == eNetCache_Wait) {
332         try {
333             string dummy;
334             m_Connection->ReadCmdOutputLine(dummy, false);
335         }
336         catch (...) {
337             AbortConnection();
338             throw;
339         }
340     }
341 
342     ResetWriters();
343 
344     m_Connection = NULL;
345 }
346 
Abort()347 void CNetCacheWriter::Abort()
348 {
349     if (IsConnectionOpen())
350         AbortConnection();
351 }
352 
WriteBufferAndClose(const char * buf_ptr,size_t buf_size)353 void CNetCacheWriter::WriteBufferAndClose(const char* buf_ptr, size_t buf_size)
354 {
355     size_t bytes_written;
356 
357     while (buf_size > 0) {
358         if (Write(buf_ptr, buf_size, &bytes_written) != eRW_Success) {
359             NCBI_THROW(CNetServiceException, eCommunicationError, "Unknown error");
360         }
361 
362         buf_ptr += bytes_written;
363         buf_size -= bytes_written;
364     }
365 
366     Close();
367 }
368 
ResetWriters()369 void CNetCacheWriter::ResetWriters()
370 {
371     try {
372         m_TransmissionWriter.reset();
373         m_SocketReaderWriter.reset();
374     }
375     catch (...) {
376     }
377 }
378 
AbortConnection()379 void CNetCacheWriter::AbortConnection()
380 {
381     m_TransmissionWriter->SetSendEof(CTransmissionWriter::eDontSendEofPacket);
382 
383     ResetWriters();
384 
385     if (m_Connection->m_Socket.GetStatus(eIO_Open) != eIO_Closed)
386         m_Connection->Abort();
387 
388     m_Connection = NULL;
389 }
390 
TransmitImpl(const char * buf,size_t count)391 EIO_Status CNetCacheWriter::TransmitImpl(const char* buf, size_t count)
392 {
393     const STimeout timeout =
394         m_NetCacheAPI->m_Service->m_ServerPool.GetCommunicationTimeout();
395     CDeadline deadline(g_STimeoutToCTimeout(&timeout));
396 
397     vector<CSocketAPI::SPoll> poll(1,
398             CSocketAPI::SPoll(&m_Connection->m_Socket, eIO_ReadWrite));
399     EIO_Event& in = poll[0].m_Event;
400     EIO_Event& out = poll[0].m_REvent;
401     EIO_Status stat = eIO_Success;
402     ERW_Result res = eRW_Success;
403 
404     for (;;) {
405         const CNanoTimeout remaining = deadline.GetRemainingTime();
406         STimeout wait;
407         stat = CSocketAPI::Poll(poll, g_CTimeoutToSTimeout(remaining, wait));
408 
409         if (stat != eIO_Interrupt) {
410             if (stat != eIO_Success) {
411                 break;
412             }
413 
414             if (out == eIO_Close) {
415                 stat = eIO_Closed;
416                 break;
417             }
418 
419             if (out & eIO_Read) {
420                 string msg;
421 
422                 if (m_Connection->m_Socket.ReadLine(msg) != eIO_Closed) {
423                     if (!msg.empty()) {
424                         if (msg.find("ERR:") == 0) {
425                             msg.erase(0, 4);
426                             msg = NStr::ParseEscapes(msg);
427                         }
428 
429                         CONNSERV_THROW_FMT(CNetCacheException, eServerError,
430                                 m_Connection->m_Server, msg);
431                     }
432                 }
433             }
434 
435             // If we have done writing everything or due to some error
436             if (in == eIO_Read) {
437                 break;
438             }
439 
440             if (out & eIO_Write) {
441                 size_t written = 0;
442                 res = m_TransmissionWriter->Write(buf, count, &written);
443 
444                 if (res == eRW_Success) {
445                     buf += written;
446                     count -= written;
447                 }
448 
449                 // If there is an error on writing or we have done,
450                 // do a non-waiting check for incoming messages
451                 if (res != eRW_Success || !count) {
452                     in = eIO_Read;
453                     deadline = CDeadline(0, 0);
454                 } else {
455                     deadline = g_STimeoutToCTimeout(&timeout);
456                 }
457             }
458         }
459     }
460 
461     // If we have not done writing yet
462     if (in != eIO_Read) {
463         return stat;
464     }
465 
466     if (res == eRW_Success) {
467         return eIO_Success;
468     }
469 
470     CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
471             m_Connection->m_Server, g_RW_ResultToString(res));
472 }
473 
Transmit(const void * buf,size_t count,size_t * bytes_written)474 void CNetCacheWriter::Transmit(const void* buf,
475         size_t count, size_t* bytes_written)
476 {
477     try {
478         switch (TransmitImpl(static_cast<const char*>(buf), count))
479         {
480         case eIO_Closed:
481             CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
482                 m_Connection->m_Server,
483                 "Server closed communication channel (timeout?)");
484 
485         case eIO_Timeout:
486             CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
487                 m_Connection->m_Server,
488                     "Timeout while writing blob contents");
489 
490         case eIO_InvalidArg:
491         case eIO_NotSupported:
492             _TROUBLE;
493             /* FALL THROUGH if not DEBUG */
494 
495         case eIO_Unknown:
496             CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
497                 m_Connection->m_Server,
498                 "Unknown error");
499 
500         default:
501             if (bytes_written) *bytes_written = count;
502             return;
503         }
504     }
505     catch (...) {
506         AbortConnection();
507         throw;
508     }
509 }
510 
EstablishConnection()511 void CNetCacheWriter::EstablishConnection()
512 {
513     ResetWriters();
514 
515     m_Connection = m_NetCacheAPI->InitiateWriteCmd(this, m_Parameters);
516 
517     m_Connection->m_Socket.SetCork(true);
518 
519     m_SocketReaderWriter.reset(
520         new CSocketReaderWriter(&m_Connection->m_Socket, eNoOwnership,
521             eIO_WritePlain));
522 
523     m_TransmissionWriter.reset(
524         new CTransmissionWriter(m_SocketReaderWriter.get(),
525             eNoOwnership, CTransmissionWriter::eSendEofPacket));
526 }
527 
UploadCacheFile()528 void CNetCacheWriter::UploadCacheFile()
529 {
530     char buf[CACHE_XFER_BUFFER_SIZE];
531     size_t bytes_read;
532     size_t bytes_written;
533 
534     m_CacheFile.SetFilePos(0);
535     while ((bytes_read = m_CacheFile.Read(buf, sizeof(buf))) > 0)
536         Transmit(buf, bytes_read, &bytes_written);
537 }
538 
539 END_NCBI_SCOPE
540