1 /* $Id: netcache_rw.cpp 619687 2020-11-09 15:14:58Z sadyrovr $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Author: Maxim Didenko, Dmitry Kazimirov
27 *
28 * File Description:
29 * Implementation of net cache client.
30 *
31 */
32
33 #include <ncbi_pch.hpp>
34
35 #include "netcache_api_impl.hpp"
36
37 #include <connect/services/netcache_api_expt.hpp>
38 #include <connect/services/error_codes.hpp>
39
40 #ifdef NCBI_OS_LINUX
41 # include <sys/socket.h>
42 # include <netinet/in.h>
43 # include <netinet/tcp.h>
44 #endif
45
46
47 #define NCBI_USE_ERRCODE_X ConnServ_NetCache
48
49
50 BEGIN_NCBI_SCOPE
51
// Size, in bytes, of the on-stack buffer used when copying blob data
// between the socket and a temporary cache file.
#define CACHE_XFER_BUFFER_SIZE 4096
// Largest byte count that CNetCacheReader::PendingCount() will report.
#define MAX_PENDING_COUNT (1024 * 1024 * 1024)

// Prefixes passed to CreateTemporary() for the temporary files that
// cache blobs being read (input) and blobs being written (output).
static const char s_InputBlobCachePrefix[] = ".nc_cache_input.";
static const char s_OutputBlobCachePrefix[] = ".nc_cache_output.";
57
CNetCacheReader(SNetCacheAPIImpl * impl,const string & blob_id,CNetServer::SExecResult & exec_result,size_t * blob_size_ptr,const CNetCacheAPIParameters * parameters)58 CNetCacheReader::CNetCacheReader(SNetCacheAPIImpl* impl,
59 const string& blob_id,
60 CNetServer::SExecResult& exec_result,
61 size_t* blob_size_ptr,
62 const CNetCacheAPIParameters* parameters) :
63 m_BlobID(blob_id),
64 m_Connection(exec_result.conn)
65 {
66 switch (parameters->GetCachingMode()) {
67 case CNetCacheAPI::eCaching_AppDefault:
68 m_CachingEnabled = impl->m_CacheInput;
69 break;
70
71 case CNetCacheAPI::eCaching_Disable:
72 m_CachingEnabled = false;
73 break;
74
75 default: /* case CNetCacheAPI::eCaching_Enable: */
76 m_CachingEnabled = true;
77 }
78
79 string::size_type pos = exec_result.response.find("SIZE=");
80
81 if (pos == string::npos) {
82 exec_result.conn->Abort();
83 CONNSERV_THROW_FMT(CNetCacheException, eInvalidServerResponse,
84 m_Connection->m_Server,
85 "No SIZE field in reply to the blob reading command");
86 }
87
88 m_BlobBytesToRead = m_BlobSize = NStr::StringToUInt8(
89 exec_result.response.c_str() + pos + sizeof("SIZE=") - 1,
90 NStr::fAllowTrailingSymbols);
91
92 if (blob_size_ptr != NULL) {
93 *blob_size_ptr = CheckBlobSize(m_BlobBytesToRead);
94 }
95
96 if (m_CachingEnabled) {
97 m_CacheFile.CreateTemporary(impl->m_TempDir, s_InputBlobCachePrefix);
98
99 char buf[CACHE_XFER_BUFFER_SIZE];
100 Uint8 bytes_to_read = m_BlobBytesToRead;
101
102 while (bytes_to_read > 0) {
103 size_t bytes_read = 0;
104 SocketRead(buf, sizeof(buf) <= bytes_to_read ?
105 sizeof(buf) : (size_t) bytes_to_read, &bytes_read);
106 m_CacheFile.Write(buf, bytes_read);
107 bytes_to_read -= bytes_read;
108 }
109
110 m_Connection = NULL;
111
112 if (m_CacheFile.GetFilePos() != m_BlobBytesToRead) {
113 NCBI_THROW(CNetCacheException, eBlobClipped,
114 "Blob size is greater than the amount of data cached for it");
115 }
116
117 m_CacheFile.Flush();
118 m_CacheFile.SetFilePos(0);
119 }
120 }
121
~CNetCacheReader()122 CNetCacheReader::~CNetCacheReader()
123 {
124 try {
125 Close();
126 } NCBI_CATCH_ALL_X(10, "CNetCacheReader::~CNetCacheReader()");
127 }
128
Read(void * buf,size_t count,size_t * bytes_read_ptr)129 ERW_Result CNetCacheReader::Read(void* buf,
130 size_t count,
131 size_t* bytes_read_ptr)
132 {
133 if (m_BlobBytesToRead == 0) {
134 if (bytes_read_ptr != NULL)
135 *bytes_read_ptr = 0;
136 return eRW_Eof;
137 }
138
139 if (m_BlobBytesToRead < count)
140 count = (size_t) m_BlobBytesToRead;
141
142 size_t bytes_read = 0;
143
144 if (count > 0) {
145 if (!m_CachingEnabled)
146 SocketRead(buf, count, &bytes_read);
147 else if ((bytes_read = m_CacheFile.Read(buf, count)) == 0) {
148 Uint8 remaining_bytes = m_BlobBytesToRead;
149 m_BlobBytesToRead = 0;
150 NCBI_THROW_FMT(CNetCacheException, eBlobClipped,
151 "Unexpected EOF while reading file cache for " << m_BlobID <<
152 " read from " <<
153 m_Connection->m_Server->m_ServerInPool->m_Address.AsString() <<
154 " (blob size: " << m_BlobSize <<
155 ", unread bytes: " << remaining_bytes << ")");
156 }
157
158 m_BlobBytesToRead -= bytes_read;
159 }
160
161 if (bytes_read_ptr != NULL)
162 *bytes_read_ptr = bytes_read;
163
164 return eRW_Success;
165 }
166
PendingCount(size_t * count)167 ERW_Result CNetCacheReader::PendingCount(size_t* count)
168 {
169 if (m_CachingEnabled || m_BlobBytesToRead == 0) {
170 *count = m_BlobBytesToRead < MAX_PENDING_COUNT ?
171 (size_t) m_BlobBytesToRead : MAX_PENDING_COUNT;
172 return eRW_Success;
173 } else
174 return CSocketReaderWriter(&m_Connection->m_Socket).PendingCount(count);
175 }
176
Close()177 void CNetCacheReader::Close()
178 {
179 if (m_CachingEnabled)
180 m_CacheFile.Close();
181 else if (m_BlobBytesToRead != 0)
182 m_Connection->Abort();
183 }
184
SocketRead(void * buf,size_t count,size_t * bytes_read)185 void CNetCacheReader::SocketRead(void* buf, size_t count, size_t* bytes_read)
186 {
187 #ifdef NCBI_OS_LINUX
188 int fd = 0, val = 1;
189 m_Connection->m_Socket.GetOSHandle(&fd, sizeof(fd));
190 setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, sizeof(val));
191 #endif
192
193 EIO_Status status = m_Connection->m_Socket.Read(buf, count, bytes_read);
194
195 switch (status) {
196 case eIO_Success:
197 break;
198 case eIO_Timeout:
199 CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
200 m_Connection->m_Server,
201 "Timeout while reading blob contents");
202
203 case eIO_Closed:
204 if (count > *bytes_read) {
205 Uint8 remaining_bytes = m_BlobBytesToRead;
206 m_BlobBytesToRead = 0;
207 CONNSERV_THROW_FMT(CNetCacheException, eBlobClipped,
208 m_Connection->m_Server,
209 "Unexpected EOF while reading " << m_BlobID <<
210 " (blob size: " << m_BlobSize <<
211 ", unread bytes: " << remaining_bytes << ")");
212 }
213 break;
214
215 default:
216 CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
217 m_Connection->m_Server,
218 "Error while reading blob: " << IO_StatusStr(status));
219 }
220 }
221
222
223 /////////////////////////////////////////////////
CNetCacheWriter(SNetCacheAPIImpl * impl,string * blob_id,const string & key,ENetCacheResponseType response_type,const CNetCacheAPIParameters * parameters)224 CNetCacheWriter::CNetCacheWriter(SNetCacheAPIImpl* impl,
225 string* blob_id,
226 const string& key,
227 ENetCacheResponseType response_type,
228 const CNetCacheAPIParameters* parameters) :
229 m_ResponseType(response_type),
230 m_NetCacheAPI(impl),
231 m_BlobID(*blob_id),
232 m_Key(key),
233 m_Parameters(parameters)
234 {
235 switch (parameters->GetCachingMode()) {
236 case CNetCacheAPI::eCaching_AppDefault:
237 m_CachingEnabled = impl->m_CacheOutput;
238 break;
239
240 case CNetCacheAPI::eCaching_Disable:
241 m_CachingEnabled = false;
242 break;
243
244 default: /* case CNetCacheAPI::eCaching_Enable: */
245 m_CachingEnabled = true;
246 }
247
248 if (m_CachingEnabled)
249 m_CacheFile.CreateTemporary(impl->m_TempDir, s_OutputBlobCachePrefix);
250
251 if (!m_CachingEnabled || blob_id->empty()) {
252 EstablishConnection();
253 *blob_id = m_BlobID;
254 }
255 }
256
~CNetCacheWriter()257 CNetCacheWriter::~CNetCacheWriter()
258 {
259 try {
260 Close();
261 } NCBI_CATCH_ALL_X(11, "Exception in ~CNetCacheWriter() [IGNORED]");
262 }
263
Write(const void * buf,size_t count,size_t * bytes_written_ptr)264 ERW_Result CNetCacheWriter::Write(const void* buf,
265 size_t count,
266 size_t* bytes_written_ptr)
267 {
268 if (m_CachingEnabled) {
269 size_t bytes_written = m_CacheFile.Write(buf, count);
270 if (bytes_written_ptr != NULL)
271 *bytes_written_ptr = bytes_written;
272 } else if (IsConnectionOpen())
273 Transmit(buf, count, bytes_written_ptr);
274 else
275 return eRW_Error;
276
277 return eRW_Success;
278 }
279
Flush(void)280 ERW_Result CNetCacheWriter::Flush(void)
281 {
282 if (!m_CachingEnabled && IsConnectionOpen())
283 m_TransmissionWriter->Flush();
284
285 return eRW_Success;
286 }
287
Close()288 void CNetCacheWriter::Close()
289 {
290 if (m_CachingEnabled) {
291 m_CacheFile.Flush();
292
293 bool blob_written = false;
294
295 if (IsConnectionOpen()) {
296 try {
297 UploadCacheFile();
298 blob_written = true;
299 }
300 catch (CNetServiceException&) {
301 }
302 }
303
304 if (!blob_written) {
305 EstablishConnection();
306
307 UploadCacheFile();
308 }
309 }
310
311 if (!IsConnectionOpen())
312 return;
313
314 ERW_Result res = m_TransmissionWriter->Close();
315
316 if (res != eRW_Success) {
317 AbortConnection();
318 if (res == eRW_Timeout) {
319 CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
320 m_Connection->m_Server,
321 "Timeout while sending EOF packet");
322 } else {
323 CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
324 m_Connection->m_Server,
325 "IO error while sending EOF packet");
326 }
327 }
328
329 m_Connection->m_Socket.SetCork(false);
330
331 if (m_ResponseType == eNetCache_Wait) {
332 try {
333 string dummy;
334 m_Connection->ReadCmdOutputLine(dummy, false);
335 }
336 catch (...) {
337 AbortConnection();
338 throw;
339 }
340 }
341
342 ResetWriters();
343
344 m_Connection = NULL;
345 }
346
Abort()347 void CNetCacheWriter::Abort()
348 {
349 if (IsConnectionOpen())
350 AbortConnection();
351 }
352
WriteBufferAndClose(const char * buf_ptr,size_t buf_size)353 void CNetCacheWriter::WriteBufferAndClose(const char* buf_ptr, size_t buf_size)
354 {
355 size_t bytes_written;
356
357 while (buf_size > 0) {
358 if (Write(buf_ptr, buf_size, &bytes_written) != eRW_Success) {
359 NCBI_THROW(CNetServiceException, eCommunicationError, "Unknown error");
360 }
361
362 buf_ptr += bytes_written;
363 buf_size -= bytes_written;
364 }
365
366 Close();
367 }
368
ResetWriters()369 void CNetCacheWriter::ResetWriters()
370 {
371 try {
372 m_TransmissionWriter.reset();
373 m_SocketReaderWriter.reset();
374 }
375 catch (...) {
376 }
377 }
378
AbortConnection()379 void CNetCacheWriter::AbortConnection()
380 {
381 m_TransmissionWriter->SetSendEof(CTransmissionWriter::eDontSendEofPacket);
382
383 ResetWriters();
384
385 if (m_Connection->m_Socket.GetStatus(eIO_Open) != eIO_Closed)
386 m_Connection->Abort();
387
388 m_Connection = NULL;
389 }
390
TransmitImpl(const char * buf,size_t count)391 EIO_Status CNetCacheWriter::TransmitImpl(const char* buf, size_t count)
392 {
393 const STimeout timeout =
394 m_NetCacheAPI->m_Service->m_ServerPool.GetCommunicationTimeout();
395 CDeadline deadline(g_STimeoutToCTimeout(&timeout));
396
397 vector<CSocketAPI::SPoll> poll(1,
398 CSocketAPI::SPoll(&m_Connection->m_Socket, eIO_ReadWrite));
399 EIO_Event& in = poll[0].m_Event;
400 EIO_Event& out = poll[0].m_REvent;
401 EIO_Status stat = eIO_Success;
402 ERW_Result res = eRW_Success;
403
404 for (;;) {
405 const CNanoTimeout remaining = deadline.GetRemainingTime();
406 STimeout wait;
407 stat = CSocketAPI::Poll(poll, g_CTimeoutToSTimeout(remaining, wait));
408
409 if (stat != eIO_Interrupt) {
410 if (stat != eIO_Success) {
411 break;
412 }
413
414 if (out == eIO_Close) {
415 stat = eIO_Closed;
416 break;
417 }
418
419 if (out & eIO_Read) {
420 string msg;
421
422 if (m_Connection->m_Socket.ReadLine(msg) != eIO_Closed) {
423 if (!msg.empty()) {
424 if (msg.find("ERR:") == 0) {
425 msg.erase(0, 4);
426 msg = NStr::ParseEscapes(msg);
427 }
428
429 CONNSERV_THROW_FMT(CNetCacheException, eServerError,
430 m_Connection->m_Server, msg);
431 }
432 }
433 }
434
435 // If we have done writing everything or due to some error
436 if (in == eIO_Read) {
437 break;
438 }
439
440 if (out & eIO_Write) {
441 size_t written = 0;
442 res = m_TransmissionWriter->Write(buf, count, &written);
443
444 if (res == eRW_Success) {
445 buf += written;
446 count -= written;
447 }
448
449 // If there is an error on writing or we have done,
450 // do a non-waiting check for incoming messages
451 if (res != eRW_Success || !count) {
452 in = eIO_Read;
453 deadline = CDeadline(0, 0);
454 } else {
455 deadline = g_STimeoutToCTimeout(&timeout);
456 }
457 }
458 }
459 }
460
461 // If we have not done writing yet
462 if (in != eIO_Read) {
463 return stat;
464 }
465
466 if (res == eRW_Success) {
467 return eIO_Success;
468 }
469
470 CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
471 m_Connection->m_Server, g_RW_ResultToString(res));
472 }
473
Transmit(const void * buf,size_t count,size_t * bytes_written)474 void CNetCacheWriter::Transmit(const void* buf,
475 size_t count, size_t* bytes_written)
476 {
477 try {
478 switch (TransmitImpl(static_cast<const char*>(buf), count))
479 {
480 case eIO_Closed:
481 CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
482 m_Connection->m_Server,
483 "Server closed communication channel (timeout?)");
484
485 case eIO_Timeout:
486 CONNSERV_THROW_FMT(CNetServiceException, eTimeout,
487 m_Connection->m_Server,
488 "Timeout while writing blob contents");
489
490 case eIO_InvalidArg:
491 case eIO_NotSupported:
492 _TROUBLE;
493 /* FALL THROUGH if not DEBUG */
494
495 case eIO_Unknown:
496 CONNSERV_THROW_FMT(CNetServiceException, eCommunicationError,
497 m_Connection->m_Server,
498 "Unknown error");
499
500 default:
501 if (bytes_written) *bytes_written = count;
502 return;
503 }
504 }
505 catch (...) {
506 AbortConnection();
507 throw;
508 }
509 }
510
EstablishConnection()511 void CNetCacheWriter::EstablishConnection()
512 {
513 ResetWriters();
514
515 m_Connection = m_NetCacheAPI->InitiateWriteCmd(this, m_Parameters);
516
517 m_Connection->m_Socket.SetCork(true);
518
519 m_SocketReaderWriter.reset(
520 new CSocketReaderWriter(&m_Connection->m_Socket, eNoOwnership,
521 eIO_WritePlain));
522
523 m_TransmissionWriter.reset(
524 new CTransmissionWriter(m_SocketReaderWriter.get(),
525 eNoOwnership, CTransmissionWriter::eSendEofPacket));
526 }
527
UploadCacheFile()528 void CNetCacheWriter::UploadCacheFile()
529 {
530 char buf[CACHE_XFER_BUFFER_SIZE];
531 size_t bytes_read;
532 size_t bytes_written;
533
534 m_CacheFile.SetFilePos(0);
535 while ((bytes_read = m_CacheFile.Read(buf, sizeof(buf))) > 0)
536 Transmit(buf, bytes_read, &bytes_written);
537 }
538
539 END_NCBI_SCOPE
540