1 #ifndef CASSBLOBOP__HPP 2 #define CASSBLOBOP__HPP 3 4 /* $Id: cass_blob_op.hpp 634615 2021-07-15 17:14:29Z ivanov $ 5 * =========================================================================== 6 * 7 * PUBLIC DOMAIN NOTICE 8 * National Center for Biotechnology Information 9 * 10 * This software/database is a "United States Government Work" under the 11 * terms of the United States Copyright Act. It was written as part of 12 * the author's official duties as a United States Government employee and 13 * thus cannot be copyrighted. This software/database is freely available 14 * to the public for use. The National Library of Medicine and the U.S. 15 * Government have not placed any restriction on its use or reproduction. 16 * 17 * Although all reasonable efforts have been taken to ensure the accuracy 18 * and reliability of the software and data, the NLM and the U.S. 19 * Government do not and cannot warrant the performance or results that 20 * may be obtained by using this software or data. The NLM and the U.S. 21 * Government disclaim all warranties, express or implied, including 22 * warranties of performance, merchantability or fitness for any particular 23 * purpose. 24 * 25 * Please cite the author in any work or product based on this material. 26 * 27 * =========================================================================== 28 * 29 * Authors: Dmitri Dmitrienko 30 * 31 * File Description: 32 * 33 * BigData application layer 34 * 35 */ 36 37 #include <corelib/request_status.hpp> 38 #include <corelib/ncbidiag.hpp> 39 40 #include <atomic> 41 #include <vector> 42 #include <utility> 43 #include <string> 44 #include <memory> 45 46 #include "cass_driver.hpp" 47 #include "cass_exception.hpp" 48 #include "Key.hpp" 49 #include "IdCassScope.hpp" 50 #include "cass_util.hpp" 51 #include "blob_record.hpp" 52 #include "nannot/record.hpp" 53 #include "id2_split/record.hpp" 54 #include "acc_ver_hist/record.hpp" 55 56 BEGIN_IDBLOB_SCOPE 57 USING_NCBI_SCOPE; 58 59 class CCassBlobWaiter; 60 class CCassBlobTaskLoadBlob; 61 62 enum ECassTristate { 63 eUnknown, 64 eTrue, 65 eFalse 66 }; 67 68 using TBlobChunkCallback = function<void(const unsigned char * data, unsigned int size, int chunk_no)>; 69 using TPropsCallback = function<void(const SBlobStat& stat, bool isFound)>; 70 using TDataErrorCallback = function<void( 71 CRequestStatus::ECode status, 72 int code, 73 EDiagSev severity, 74 const string & message 75 )>; 76 using TDataReadyCallback = void(*)(void*); 77 78 class CCassBlobWaiter 79 { 80 public: 81 CCassBlobWaiter(const CCassBlobWaiter&) = delete; 82 CCassBlobWaiter& operator=(const CCassBlobWaiter&) = delete; 83 CCassBlobWaiter(CCassBlobWaiter&&) = default; 84 CCassBlobWaiter& operator=(CCassBlobWaiter&&) = default; 85 CCassBlobWaiter(unsigned int op_timeout_ms,shared_ptr<CCassConnection> conn,const string & keyspace,int32_t key,bool async,unsigned int max_retries,TDataErrorCallback error_cb)86 CCassBlobWaiter( 87 unsigned int op_timeout_ms, 88 shared_ptr<CCassConnection> conn, 89 const string & keyspace, 90 int32_t key, 91 bool async, 92 unsigned int max_retries, 93 TDataErrorCallback error_cb 94 ) 95 : m_ErrorCb(move(error_cb)) 96 , m_Conn(conn) 97 , m_Keyspace(keyspace) 98 , m_Key(key) 99 , m_State(eInit) 100 , m_OpTimeoutMs(op_timeout_ms) 101 , m_LastActivityMs(gettime() / 1000L) 102 , m_MaxRetries(max_retries) // 0 means no limit in auto-restart count, 103 // any other positive value is the limit, 104 // 1 means no 2nd start -> no re-starts at all 105 , m_Async(async) 106 , m_Cancelled(false) 107 { 108 } 109 ~CCassBlobWaiter()110 virtual ~CCassBlobWaiter() 111 { 112 CloseAll(); 113 } 114 Cancelled() const115 bool Cancelled() const 116 { 117 return m_Cancelled; 118 } 119 Cancel(void)120 virtual void Cancel(void) 121 { 122 if (m_State != eDone) { 123 m_Cancelled = true; 124 m_State = eError; 125 } 126 } 127 Wait(void)128 bool Wait(void) 129 { 130 while (m_State != eDone && m_State != eError && !m_Cancelled) { 131 try { 132 Wait1(); 133 } catch (const CCassandraException& e) { 134 // We will not re-throw here as CassandraException is not fatal 135 Error(CRequestStatus::e500_InternalServerError, e.GetErrCode(), eDiag_Error, e.what()); 136 } catch (const exception& e) { 137 // See ID-6241 There is a requirement to catch all exceptions and continue here 138 Error(CRequestStatus::e500_InternalServerError, CCassandraException::eUnknown, eDiag_Error, e.what()); 139 } catch (...) { 140 // See ID-6241 There is a requirement to catch all exceptions and continue here 141 Error(CRequestStatus::e500_InternalServerError, CCassandraException::eUnknown, eDiag_Error, "Unknown exception"); 142 } 143 if (m_Async) { 144 break; 145 } 146 } 147 return (m_State == eDone || m_State == eError || m_Cancelled); 148 } 149 HasError(void) const150 bool HasError(void) const 151 { 152 return !m_LastError.empty(); 153 } 154 LastError(void) const155 string LastError(void) const 156 { 157 return m_LastError; 158 } 159 ClearError(void)160 void ClearError(void) 161 { 162 m_LastError.clear(); 163 } 164 GetKeySpace(void) const165 string GetKeySpace(void) const 166 { 167 return m_Keyspace; 168 } 169 SetKeySpace(string const & keyspace)170 void SetKeySpace(string const & keyspace) { 171 if (m_State != eInit) { 172 NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobWaiter: Cannot change keyspace for started task"); 173 } 174 m_Keyspace = keyspace; 175 } 176 key(void) const177 int32_t key(void) const 178 { 179 return m_Key; 180 } 181 SetErrorCB(TDataErrorCallback error_cb)182 void SetErrorCB(TDataErrorCallback error_cb) 183 { 184 m_ErrorCb = std::move(error_cb); 185 } 186 SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3)187 void SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3) 188 { 189 m_DataReadyCb3 = datareadycb3; 190 } 191 192 protected: 193 enum EBlobWaiterState { 194 eInit = 0, 195 eDone = 10000, 196 eError = -1 197 }; 198 struct SQueryRec { 199 shared_ptr<CCassQuery> query; 200 unsigned int restart_count; 201 }; 202 CloseAll(void)203 void CloseAll(void) 204 { 205 for (auto & it : m_QueryArr) { 206 it.query->Close(); 207 it.restart_count = 0; 208 } 209 } 210 SetupQueryCB3(shared_ptr<CCassQuery> & query)211 void SetupQueryCB3(shared_ptr<CCassQuery>& query) 212 { 213 auto DataReadyCb3 = m_DataReadyCb3.lock(); 214 if (DataReadyCb3) { 215 query->SetOnData3(DataReadyCb3); 216 } else if (IsDataReadyCallbackExpired()) { 217 char msg[1024]; 218 snprintf(msg, sizeof(msg), "Failed to setup data ready callback (expired)"); 219 Error(CRequestStatus::e502_BadGateway, CCassandraException::eUnknown, eDiag_Error, msg); 220 } 221 } 222 223 // Returns true for expired non empty weak pointers IsDataReadyCallbackExpired() const224 bool IsDataReadyCallbackExpired() const 225 { 226 using wt = weak_ptr<CCassDataCallbackReceiver>; 227 return m_DataReadyCb3.owner_before(wt{}) || wt{}.owner_before(m_DataReadyCb3); 228 } 229 Error(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)230 void Error(CRequestStatus::ECode status, 231 int code, 232 EDiagSev severity, 233 const string & message) 234 { 235 assert(m_ErrorCb != nullptr); 236 m_State = eError; 237 m_LastError = message; 238 m_ErrorCb(status, code, severity, message); 239 } 240 IsTimedOut(void) const241 bool IsTimedOut(void) const 242 { 243 if (m_OpTimeoutMs > 0) 244 return ((gettime() / 1000L - m_LastActivityMs) > 245 m_OpTimeoutMs); 246 return false; 247 } 248 CanRestart(shared_ptr<CCassQuery> qry,unsigned int restart_count) const249 bool CanRestart(shared_ptr<CCassQuery> qry, unsigned int restart_count) const 250 { 251 bool is_timedout = IsTimedOut(); 252 bool is_out_of_retries = (m_MaxRetries > 0) && 253 (restart_count >= m_MaxRetries - 1); 254 255 ERR_POST(Info << "CanRestartQ? t/o=" << is_timedout << 256 ", o/r=" << is_out_of_retries << 257 ", last_active=" << m_LastActivityMs << 258 ", time=" << gettime() / 1000L << 259 ", timeout=" << m_OpTimeoutMs); 260 return !is_out_of_retries && !is_timedout && !m_Cancelled; 261 } 262 CanRestart(SQueryRec & it) const263 bool CanRestart(SQueryRec& it) const 264 { 265 return CanRestart(it.query, it.restart_count); 266 } 267 268 // This function is deprecated and will be removed after Dec-1 2020 AllParams(shared_ptr<CCassQuery> qry)269 NCBI_DEPRECATED static string AllParams(shared_ptr<CCassQuery> qry) { 270 string rv; 271 for (size_t i = 0; i < qry->ParamCount(); ++i) { 272 if (i > 0) 273 rv.append(", "); 274 rv.append(qry->ParamAsStr(i)); 275 } 276 return rv; 277 } 278 CheckReady(shared_ptr<CCassQuery> qry,unsigned int restart_counter,bool & need_repeat)279 bool CheckReady(shared_ptr<CCassQuery> qry, unsigned int restart_counter, bool& need_repeat) 280 { 281 need_repeat = false; 282 try { 283 if (m_Async && qry->WaitAsync(0) == ar_wait) { 284 return false; 285 } 286 return true; 287 } catch (const CCassandraException& e) { 288 if ( 289 (e.GetErrCode() == CCassandraException::eQueryTimeout 290 || e.GetErrCode() == CCassandraException::eQueryFailedRestartable) 291 && CanRestart(qry, restart_counter)) 292 { 293 need_repeat = true; 294 } else { 295 Error(CRequestStatus::e502_BadGateway, e.GetErrCode(), eDiag_Error, e.what()); 296 } 297 } 298 299 return false; 300 } 301 CheckReady(SQueryRec & it)302 bool CheckReady(SQueryRec& it) 303 { 304 bool need_restart = false; 305 bool rv = CheckReady(it.query, it.restart_count, need_restart); 306 if (!rv && need_restart) { 307 try { 308 ERR_POST(Warning 309 << "In-place restart (" + to_string(it.restart_count + 1) + "):\n" 310 << it.query->GetSQL() << "\nParams:\n" << QueryParamsToStringForDebug(it.query)); 311 it.query->Restart(); 312 ++it.restart_count; 313 } catch (const exception& ex) { 314 ERR_POST(NCBI_NS_NCBI::Error << "Failed to restart query (p2): " << ex.what()); 315 throw; 316 } 317 } 318 return rv; 319 } 320 UpdateLastActivity(void)321 void UpdateLastActivity(void) 322 { 323 m_LastActivityMs = gettime() / 1000L; 324 } 325 GetQueryConsistency(void)326 CassConsistency GetQueryConsistency(void) 327 { 328 return CASS_CONSISTENCY_LOCAL_QUORUM; 329 } 330 331 bool CheckMaxActive(void); 332 virtual void Wait1(void) = 0; 333 334 TDataErrorCallback m_ErrorCb; 335 weak_ptr<CCassDataCallbackReceiver> m_DataReadyCb3; 336 shared_ptr<CCassConnection> m_Conn; 337 string m_Keyspace; 338 int32_t m_Key; 339 atomic<int32_t> m_State; 340 unsigned int m_OpTimeoutMs; 341 int64_t m_LastActivityMs; 342 string m_LastError; 343 unsigned int m_MaxRetries; 344 bool m_Async; 345 atomic_bool m_Cancelled; 346 347 vector<SQueryRec> m_QueryArr; 348 349 private: 350 string QueryParamsToStringForDebug(shared_ptr<CCassQuery> const& query) const; 351 }; 352 353 class CCassBlobLoader: public CCassBlobWaiter 354 { 355 public: 356 CCassBlobLoader(const CCassBlobLoader&) = delete; 357 CCassBlobLoader(CCassBlobLoader&&) = delete; 358 CCassBlobLoader& operator=(const CCassBlobLoader&) = delete; 359 CCassBlobLoader& operator=(CCassBlobLoader&&) = delete; 360 361 CCassBlobLoader( 362 unsigned int op_timeout_ms, 363 shared_ptr<CCassConnection> conn, 364 const string & keyspace, 365 int32_t key, 366 bool async, 367 unsigned int max_retries, 368 TBlobChunkCallback data_chunk_cb, 369 TDataErrorCallback DataErrorCB 370 ); 371 372 void SetDataChunkCB(TBlobChunkCallback chunk_callback); 373 void SetDataReadyCB(TDataReadyCallback datareadycb, void * data); 374 375 SBlobStat GetBlobStat(void) const; 376 uint64_t GetBlobSize(void) const; 377 int32_t GetTotalChunksInBlob(void) const; SetPropsCallback(TPropsCallback prop_cb)378 void SetPropsCallback(TPropsCallback prop_cb) 379 { 380 m_PropsCallback = std::move(prop_cb); 381 } 382 383 protected: 384 virtual void Wait1(void) override; 385 386 private: 387 enum EBlobLoaderState { 388 eInit = CCassBlobWaiter::eInit, 389 eReadingEntity, 390 eReadingChunks, 391 eCheckingFlags, 392 eDone = CCassBlobWaiter::eDone, 393 eError = CCassBlobWaiter::eError 394 }; 395 396 void x_RequestFlags(shared_ptr<CCassQuery> qry, bool with_data); 397 void x_RequestChunk(shared_ptr<CCassQuery> qry, int local_id); 398 void x_RequestChunksAhead(void); 399 400 void x_PrepareChunkRequests(void); 401 int x_GetReadyChunkNo(bool & have_inactive); 402 bool x_AreAllChunksProcessed(void); 403 void x_MarkChunkProcessed(size_t chunk_no); 404 405 TDataReadyCallback m_DataReadyCb; 406 void * m_DataReadyData; 407 408 TPropsCallback m_PropsCallback; 409 bool m_StatLoaded; 410 SBlobStat m_BlobStat; 411 TBlobChunkCallback m_DataCb; 412 int64_t m_ExpectedSize; 413 int64_t m_RemainingSize; 414 int32_t m_LargeParts; 415 416 // Support for sending chunks as soon as they are ready regardless of the 417 // order they are delivered from Cassandra 418 vector<bool> m_ProcessedChunks; 419 }; 420 421 422 class CCassBlobOp: public enable_shared_from_this<CCassBlobOp> 423 { 424 public: 425 enum EBlopOpFlag { 426 eFlagOpOr, 427 eFlagOpAnd, 428 eFlagOpSet 429 }; 430 431 public: 432 CCassBlobOp(CCassBlobOp&&) = default; 433 CCassBlobOp& operator=(CCassBlobOp&&) = default; 434 CCassBlobOp(const CCassBlobOp&) = delete; 435 CCassBlobOp& operator=(const CCassBlobOp&) = delete; 436 CCassBlobOp(shared_ptr<CCassConnection> conn)437 explicit CCassBlobOp(shared_ptr<CCassConnection> conn) 438 : m_Conn(conn) 439 { 440 m_Keyspace = m_Conn->Keyspace(); 441 } 442 ~CCassBlobOp()443 virtual ~CCassBlobOp() 444 { 445 m_Conn = nullptr; 446 } 447 448 NCBI_DEPRECATED void GetBlobChunkSize(unsigned int timeout_ms, int64_t * chunk_size); 449 void GetBlobChunkSize(unsigned int timeout_ms, const string & keyspace, int64_t * chunk_size); SetKeyspace(const string & keyspace)450 void SetKeyspace(const string & keyspace) 451 { 452 m_Keyspace = keyspace; 453 } 454 GetKeyspace(void) const455 string GetKeyspace(void) const 456 { 457 return m_Keyspace; 458 } 459 460 void GetBlob(unsigned int op_timeout_ms, 461 int32_t key, unsigned int max_retries, 462 SBlobStat * blob_stat, TBlobChunkCallback data_chunk_cb); 463 void GetBlobAsync(unsigned int op_timeout_ms, 464 int32_t key, unsigned int max_retries, 465 TBlobChunkCallback data_chunk_cb, 466 TDataErrorCallback error_cb, 467 unique_ptr<CCassBlobWaiter> & waiter); 468 void InsertBlobExtended(unsigned int op_timeout_ms, unsigned int max_retries, 469 CBlobRecord * blob_rslt, TDataErrorCallback error_cb, 470 unique_ptr<CCassBlobWaiter> & waiter); 471 void InsertNAnnot(unsigned int op_timeout_ms, 472 int32_t key, unsigned int max_retries, 473 CBlobRecord * blob, CNAnnotRecord * annot, 474 TDataErrorCallback error_cb, 475 unique_ptr<CCassBlobWaiter> & waiter); 476 void DeleteNAnnot(unsigned int op_timeout_ms, 477 unsigned int max_retries, 478 CNAnnotRecord * annot, 479 TDataErrorCallback error_cb, 480 unique_ptr<CCassBlobWaiter> & waiter); 481 void FetchNAnnot(unsigned int op_timeout_ms, 482 unsigned int max_retries, 483 const string & accession, 484 int16_t version, 485 int16_t seq_id_type, 486 const vector<string>& annot_names, 487 TNAnnotConsumeCallback consume_callback, 488 TDataErrorCallback error_cb, 489 unique_ptr<CCassBlobWaiter> & waiter 490 ); 491 void FetchNAnnot(unsigned int op_timeout_ms, 492 unsigned int max_retries, 493 const string & accession, 494 int16_t version, 495 int16_t seq_id_type, 496 const vector<CTempString>& annot_names, 497 TNAnnotConsumeCallback consume_callback, 498 TDataErrorCallback error_cb, 499 unique_ptr<CCassBlobWaiter> & waiter 500 ); 501 void FetchNAnnot(unsigned int op_timeout_ms, 502 unsigned int max_retries, 503 const string & accession, 504 int16_t version, 505 int16_t seq_id_type, 506 TNAnnotConsumeCallback consume_callback, 507 TDataErrorCallback error_cb, 508 unique_ptr<CCassBlobWaiter> & waiter 509 ); 510 511 void FetchAccVerHistory( 512 unsigned int op_timeout_ms, 513 unsigned int max_retries, 514 const string & accession, 515 TAccVerHistConsumeCallback consume_callback, 516 TDataErrorCallback error_cb, 517 unique_ptr<CCassBlobWaiter> & waiter, 518 int16_t version = 0, 519 int16_t seq_id_type = 0 520 ); 521 522 523 void DeleteBlobExtended(unsigned int op_timeout_ms, 524 int32_t key, unsigned int max_retries, 525 TDataErrorCallback error_cb, 526 unique_ptr<CCassBlobWaiter> & waiter); 527 void DeleteExpiredBlobVersion(unsigned int op_timeout_ms, 528 int32_t key, CBlobRecord::TTimestamp last_modified, 529 CBlobRecord::TTimestamp expiration, 530 unsigned int max_retries, 531 TDataErrorCallback error_cb, 532 unique_ptr<CCassBlobWaiter> & waiter); 533 534 unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended( 535 unsigned int op_timeout_ms, 536 unsigned int max_retries, 537 CBlobRecord::TSatKey sat_key, 538 bool load_chunks, 539 TDataErrorCallback error_cb 540 ); 541 542 unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended( 543 unsigned int op_timeout_ms, 544 unsigned int max_retries, 545 CBlobRecord::TSatKey sat_key, 546 CBlobRecord::TTimestamp modified, 547 bool load_chunks, 548 TDataErrorCallback error_cb 549 ); 550 551 unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended( 552 unsigned int timeout_ms, 553 unsigned int max_retries, 554 unique_ptr<CBlobRecord> blob_record, 555 bool load_chunks, 556 TDataErrorCallback error_cb 557 ); 558 559 void UpdateBlobFlagsExtended( 560 unsigned int op_timeout_ms, 561 CBlobRecord::TSatKey key, 562 EBlobFlags flag, 563 bool set_flag 564 ); 565 566 void InsertID2Split( 567 unsigned int op_timeout_ms, 568 unsigned int max_retries, 569 CBlobRecord * blob, CID2SplitRecord* id2_split, 570 TDataErrorCallback error_cb, 571 unique_ptr<CCassBlobWaiter> & waiter); 572 573 // @deprecated Use GetSetting with explicit domain parameter 574 NCBI_DEPRECATED bool GetSetting(unsigned int op_timeout_ms, const string & name, string & value); 575 576 // @deprecated Use UpdateSetting with explicit domain parameter 577 NCBI_DEPRECATED void UpdateSetting(unsigned int op_timeout_ms, const string & name, const string & value); 578 579 bool GetSetting(unsigned int op_timeout_ms, const string & domain, const string & name, string & value); 580 void UpdateSetting(unsigned int op_timeout_ms, const string & domain, const string & name, const string & value); 581 GetConn(void)582 shared_ptr<CCassConnection> GetConn(void) 583 { 584 if (!m_Conn) { 585 NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobOp instance is not initialized with DB connection"); 586 } 587 return m_Conn; 588 } 589 590 private: 591 shared_ptr<CCassConnection> m_Conn; 592 string m_Keyspace; 593 }; 594 595 END_IDBLOB_SCOPE 596 597 #endif 598