1 #ifndef CASSBLOBOP__HPP
2 #define CASSBLOBOP__HPP
3 
4 /*  $Id: cass_blob_op.hpp 634615 2021-07-15 17:14:29Z ivanov $
5  * ===========================================================================
6  *
7  *                            PUBLIC DOMAIN NOTICE
8  *               National Center for Biotechnology Information
9  *
10  *  This software/database is a "United States Government Work" under the
11  *  terms of the United States Copyright Act.  It was written as part of
12  *  the author's official duties as a United States Government employee and
13  *  thus cannot be copyrighted.  This software/database is freely available
14  *  to the public for use. The National Library of Medicine and the U.S.
15  *  Government have not placed any restriction on its use or reproduction.
16  *
17  *  Although all reasonable efforts have been taken to ensure the accuracy
18  *  and reliability of the software and data, the NLM and the U.S.
19  *  Government do not and cannot warrant the performance or results that
20  *  may be obtained by using this software or data. The NLM and the U.S.
21  *  Government disclaim all warranties, express or implied, including
22  *  warranties of performance, merchantability or fitness for any particular
23  *  purpose.
24  *
25  *  Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Dmitri Dmitrienko
30  *
31  * File Description:
32  *
33  *  BigData application layer
34  *
35  */
36 
37 #include <corelib/request_status.hpp>
38 #include <corelib/ncbidiag.hpp>
39 
40 #include <atomic>
41 #include <vector>
42 #include <utility>
43 #include <string>
44 #include <memory>
45 
46 #include "cass_driver.hpp"
47 #include "cass_exception.hpp"
48 #include "Key.hpp"
49 #include "IdCassScope.hpp"
50 #include "cass_util.hpp"
51 #include "blob_record.hpp"
52 #include "nannot/record.hpp"
53 #include "id2_split/record.hpp"
54 #include "acc_ver_hist/record.hpp"
55 
56 BEGIN_IDBLOB_SCOPE
57 USING_NCBI_SCOPE;
58 
// Forward declarations: CCassBlobWaiter is defined below; CCassBlobTaskLoadBlob
// is only referenced through unique_ptr in CCassBlobOp's declarations here.
class CCassBlobTaskLoadBlob;
class CCassBlobWaiter;

/// Three-valued logical state: "not determined yet", "true" or "false".
enum ECassTristate {
    eUnknown = 0,   ///< value has not been determined
    eTrue    = 1,   ///< value is known to be true
    eFalse   = 2    ///< value is known to be false
};
67 
68 using TBlobChunkCallback = function<void(const unsigned char * data, unsigned int size, int chunk_no)>;
69 using TPropsCallback     = function<void(const SBlobStat& stat, bool isFound)>;
70 using TDataErrorCallback = function<void(
71     CRequestStatus::ECode status,
72     int code,
73     EDiagSev severity,
74     const string & message
75 )>;
76 using TDataReadyCallback = void(*)(void*);
77 
78 class CCassBlobWaiter
79 {
80  public:
81     CCassBlobWaiter(const CCassBlobWaiter&) = delete;
82     CCassBlobWaiter& operator=(const CCassBlobWaiter&) = delete;
83     CCassBlobWaiter(CCassBlobWaiter&&) = default;
84     CCassBlobWaiter& operator=(CCassBlobWaiter&&) = default;
85 
CCassBlobWaiter(unsigned int op_timeout_ms,shared_ptr<CCassConnection> conn,const string & keyspace,int32_t key,bool async,unsigned int max_retries,TDataErrorCallback error_cb)86     CCassBlobWaiter(
87         unsigned int op_timeout_ms,
88         shared_ptr<CCassConnection> conn,
89         const string & keyspace,
90         int32_t key,
91         bool async,
92         unsigned int max_retries,
93         TDataErrorCallback error_cb
94     )
95       : m_ErrorCb(move(error_cb))
96       ,  m_Conn(conn)
97       ,  m_Keyspace(keyspace)
98       ,  m_Key(key)
99       ,  m_State(eInit)
100       ,  m_OpTimeoutMs(op_timeout_ms)
101       ,  m_LastActivityMs(gettime() / 1000L)
102       ,  m_MaxRetries(max_retries)  // 0 means no limit in auto-restart count,
103                                     // any other positive value is the limit,
104                                     // 1 means no 2nd start -> no re-starts at all
105       ,  m_Async(async)
106       ,  m_Cancelled(false)
107     {
108     }
109 
~CCassBlobWaiter()110     virtual ~CCassBlobWaiter()
111     {
112         CloseAll();
113     }
114 
Cancelled() const115     bool Cancelled() const
116     {
117         return m_Cancelled;
118     }
119 
Cancel(void)120     virtual void Cancel(void)
121     {
122         if (m_State != eDone) {
123             m_Cancelled = true;
124             m_State = eError;
125         }
126     }
127 
Wait(void)128     bool Wait(void)
129     {
130         while (m_State != eDone && m_State != eError && !m_Cancelled) {
131             try {
132                 Wait1();
133             } catch (const CCassandraException& e) {
134                 // We will not re-throw here as CassandraException is not fatal
135                 Error(CRequestStatus::e500_InternalServerError, e.GetErrCode(), eDiag_Error, e.what());
136             } catch (const exception& e) {
137                 // See ID-6241 There is a requirement to catch all exceptions and continue here
138                 Error(CRequestStatus::e500_InternalServerError, CCassandraException::eUnknown, eDiag_Error, e.what());
139             } catch (...) {
140                 // See ID-6241 There is a requirement to catch all exceptions and continue here
141                 Error(CRequestStatus::e500_InternalServerError, CCassandraException::eUnknown, eDiag_Error, "Unknown exception");
142             }
143             if (m_Async) {
144                 break;
145             }
146         }
147         return (m_State == eDone || m_State == eError || m_Cancelled);
148     }
149 
HasError(void) const150     bool HasError(void) const
151     {
152         return !m_LastError.empty();
153     }
154 
LastError(void) const155     string LastError(void) const
156     {
157         return m_LastError;
158     }
159 
ClearError(void)160     void ClearError(void)
161     {
162         m_LastError.clear();
163     }
164 
GetKeySpace(void) const165     string GetKeySpace(void) const
166     {
167         return m_Keyspace;
168     }
169 
SetKeySpace(string const & keyspace)170     void SetKeySpace(string const & keyspace) {
171         if (m_State != eInit) {
172             NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobWaiter: Cannot change keyspace for started task");
173         }
174         m_Keyspace = keyspace;
175     }
176 
key(void) const177     int32_t key(void) const
178     {
179         return m_Key;
180     }
181 
SetErrorCB(TDataErrorCallback error_cb)182     void SetErrorCB(TDataErrorCallback error_cb)
183     {
184         m_ErrorCb = std::move(error_cb);
185     }
186 
SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3)187     void SetDataReadyCB3(shared_ptr<CCassDataCallbackReceiver> datareadycb3)
188     {
189         m_DataReadyCb3 = datareadycb3;
190     }
191 
192  protected:
193     enum EBlobWaiterState {
194         eInit = 0,
195         eDone = 10000,
196         eError = -1
197     };
198     struct SQueryRec {
199         shared_ptr<CCassQuery> query;
200         unsigned int restart_count;
201     };
202 
CloseAll(void)203     void CloseAll(void)
204     {
205         for (auto & it : m_QueryArr) {
206             it.query->Close();
207             it.restart_count = 0;
208         }
209     }
210 
SetupQueryCB3(shared_ptr<CCassQuery> & query)211     void SetupQueryCB3(shared_ptr<CCassQuery>& query)
212     {
213         auto DataReadyCb3 = m_DataReadyCb3.lock();
214         if (DataReadyCb3) {
215             query->SetOnData3(DataReadyCb3);
216         } else if (IsDataReadyCallbackExpired()) {
217             char msg[1024];
218             snprintf(msg, sizeof(msg), "Failed to setup data ready callback (expired)");
219             Error(CRequestStatus::e502_BadGateway, CCassandraException::eUnknown, eDiag_Error, msg);
220         }
221     }
222 
223     // Returns true for expired non empty weak pointers
IsDataReadyCallbackExpired() const224     bool IsDataReadyCallbackExpired() const
225     {
226         using wt = weak_ptr<CCassDataCallbackReceiver>;
227         return m_DataReadyCb3.owner_before(wt{}) || wt{}.owner_before(m_DataReadyCb3);
228     }
229 
Error(CRequestStatus::ECode status,int code,EDiagSev severity,const string & message)230     void Error(CRequestStatus::ECode  status,
231                int  code,
232                EDiagSev  severity,
233                const string &  message)
234     {
235         assert(m_ErrorCb != nullptr);
236         m_State = eError;
237         m_LastError = message;
238         m_ErrorCb(status, code, severity, message);
239     }
240 
IsTimedOut(void) const241     bool IsTimedOut(void) const
242     {
243         if (m_OpTimeoutMs > 0)
244             return ((gettime() / 1000L - m_LastActivityMs) >
245                     m_OpTimeoutMs);
246         return false;
247     }
248 
CanRestart(shared_ptr<CCassQuery> qry,unsigned int restart_count) const249     bool CanRestart(shared_ptr<CCassQuery> qry, unsigned int restart_count) const
250     {
251         bool    is_timedout = IsTimedOut();
252         bool    is_out_of_retries = (m_MaxRetries > 0) &&
253                                     (restart_count >= m_MaxRetries - 1);
254 
255         ERR_POST(Info << "CanRestartQ? t/o=" << is_timedout <<
256                  ", o/r=" << is_out_of_retries <<
257                  ", last_active=" << m_LastActivityMs <<
258                  ", time=" << gettime() / 1000L <<
259                  ", timeout=" << m_OpTimeoutMs);
260         return !is_out_of_retries && !is_timedout && !m_Cancelled;
261     }
262 
CanRestart(SQueryRec & it) const263     bool CanRestart(SQueryRec& it) const
264     {
265         return CanRestart(it.query, it.restart_count);
266     }
267 
268     // This function is deprecated and will be removed after Dec-1 2020
AllParams(shared_ptr<CCassQuery> qry)269     NCBI_DEPRECATED static string AllParams(shared_ptr<CCassQuery> qry) {
270         string rv;
271         for (size_t i = 0; i < qry->ParamCount(); ++i) {
272             if (i > 0)
273                 rv.append(", ");
274             rv.append(qry->ParamAsStr(i));
275         }
276         return rv;
277     }
278 
CheckReady(shared_ptr<CCassQuery> qry,unsigned int restart_counter,bool & need_repeat)279     bool CheckReady(shared_ptr<CCassQuery> qry, unsigned int restart_counter, bool& need_repeat)
280     {
281         need_repeat = false;
282         try {
283             if (m_Async && qry->WaitAsync(0) == ar_wait) {
284                 return false;
285             }
286             return true;
287         } catch (const CCassandraException& e) {
288             if (
289                 (e.GetErrCode() == CCassandraException::eQueryTimeout
290                     || e.GetErrCode() == CCassandraException::eQueryFailedRestartable)
291                 && CanRestart(qry, restart_counter))
292             {
293                 need_repeat = true;
294             } else {
295                 Error(CRequestStatus::e502_BadGateway, e.GetErrCode(), eDiag_Error, e.what());
296             }
297         }
298 
299         return false;
300     }
301 
CheckReady(SQueryRec & it)302     bool CheckReady(SQueryRec& it)
303     {
304         bool need_restart = false;
305         bool rv = CheckReady(it.query, it.restart_count, need_restart);
306         if (!rv && need_restart) {
307             try {
308                 ERR_POST(Warning
309                     << "In-place restart (" + to_string(it.restart_count + 1) + "):\n"
310                     << it.query->GetSQL() << "\nParams:\n" << QueryParamsToStringForDebug(it.query));
311                 it.query->Restart();
312                 ++it.restart_count;
313             } catch (const exception& ex) {
314                 ERR_POST(NCBI_NS_NCBI::Error << "Failed to restart query (p2): " << ex.what());
315                 throw;
316             }
317         }
318         return rv;
319     }
320 
UpdateLastActivity(void)321     void UpdateLastActivity(void)
322     {
323         m_LastActivityMs = gettime() / 1000L;
324     }
325 
GetQueryConsistency(void)326     CassConsistency GetQueryConsistency(void)
327     {
328         return CASS_CONSISTENCY_LOCAL_QUORUM;
329     }
330 
331     bool CheckMaxActive(void);
332     virtual void Wait1(void) = 0;
333 
334     TDataErrorCallback              m_ErrorCb;
335     weak_ptr<CCassDataCallbackReceiver> m_DataReadyCb3;
336     shared_ptr<CCassConnection>     m_Conn;
337     string                          m_Keyspace;
338     int32_t                         m_Key;
339     atomic<int32_t>                 m_State;
340     unsigned int                    m_OpTimeoutMs;
341     int64_t                         m_LastActivityMs;
342     string                          m_LastError;
343     unsigned int                    m_MaxRetries;
344     bool                            m_Async;
345     atomic_bool                     m_Cancelled;
346 
347     vector<SQueryRec>  m_QueryArr;
348 
349  private:
350     string QueryParamsToStringForDebug(shared_ptr<CCassQuery> const& query) const;
351 };
352 
353 class CCassBlobLoader: public CCassBlobWaiter
354 {
355  public:
356     CCassBlobLoader(const CCassBlobLoader&) = delete;
357     CCassBlobLoader(CCassBlobLoader&&) = delete;
358     CCassBlobLoader& operator=(const CCassBlobLoader&) = delete;
359     CCassBlobLoader& operator=(CCassBlobLoader&&) = delete;
360 
361     CCassBlobLoader(
362         unsigned int op_timeout_ms,
363         shared_ptr<CCassConnection> conn,
364         const string & keyspace,
365         int32_t key,
366         bool async,
367         unsigned int max_retries,
368         TBlobChunkCallback data_chunk_cb,
369         TDataErrorCallback DataErrorCB
370     );
371 
372     void SetDataChunkCB(TBlobChunkCallback chunk_callback);
373     void SetDataReadyCB(TDataReadyCallback datareadycb, void * data);
374 
375     SBlobStat GetBlobStat(void) const;
376     uint64_t GetBlobSize(void) const;
377     int32_t GetTotalChunksInBlob(void) const;
SetPropsCallback(TPropsCallback prop_cb)378     void SetPropsCallback(TPropsCallback prop_cb)
379     {
380         m_PropsCallback = std::move(prop_cb);
381     }
382 
383  protected:
384     virtual void Wait1(void) override;
385 
386  private:
387     enum EBlobLoaderState {
388         eInit = CCassBlobWaiter::eInit,
389         eReadingEntity,
390         eReadingChunks,
391         eCheckingFlags,
392         eDone = CCassBlobWaiter::eDone,
393         eError = CCassBlobWaiter::eError
394     };
395 
396     void x_RequestFlags(shared_ptr<CCassQuery> qry, bool with_data);
397     void x_RequestChunk(shared_ptr<CCassQuery> qry, int local_id);
398     void x_RequestChunksAhead(void);
399 
400     void x_PrepareChunkRequests(void);
401     int x_GetReadyChunkNo(bool &  have_inactive);
402     bool x_AreAllChunksProcessed(void);
403     void x_MarkChunkProcessed(size_t  chunk_no);
404 
405     TDataReadyCallback  m_DataReadyCb;
406     void *              m_DataReadyData;
407 
408     TPropsCallback      m_PropsCallback;
409     bool                m_StatLoaded;
410     SBlobStat           m_BlobStat;
411     TBlobChunkCallback  m_DataCb;
412     int64_t             m_ExpectedSize;
413     int64_t             m_RemainingSize;
414     int32_t             m_LargeParts;
415 
416     // Support for sending chunks as soon as they are ready regardless of the
417     // order they are delivered from Cassandra
418     vector<bool> m_ProcessedChunks;
419 };
420 
421 
422 class CCassBlobOp: public enable_shared_from_this<CCassBlobOp>
423 {
424  public:
425     enum EBlopOpFlag {
426         eFlagOpOr,
427         eFlagOpAnd,
428         eFlagOpSet
429     };
430 
431  public:
432     CCassBlobOp(CCassBlobOp&&) = default;
433     CCassBlobOp& operator=(CCassBlobOp&&) = default;
434     CCassBlobOp(const CCassBlobOp&) = delete;
435     CCassBlobOp& operator=(const CCassBlobOp&) = delete;
436 
CCassBlobOp(shared_ptr<CCassConnection> conn)437     explicit CCassBlobOp(shared_ptr<CCassConnection> conn)
438         : m_Conn(conn)
439     {
440         m_Keyspace = m_Conn->Keyspace();
441     }
442 
~CCassBlobOp()443     virtual ~CCassBlobOp()
444     {
445         m_Conn = nullptr;
446     }
447 
448     NCBI_DEPRECATED void GetBlobChunkSize(unsigned int timeout_ms, int64_t * chunk_size);
449     void GetBlobChunkSize(unsigned int timeout_ms, const string & keyspace, int64_t * chunk_size);
SetKeyspace(const string & keyspace)450     void SetKeyspace(const string &  keyspace)
451     {
452         m_Keyspace = keyspace;
453     }
454 
GetKeyspace(void) const455     string GetKeyspace(void) const
456     {
457         return m_Keyspace;
458     }
459 
460     void GetBlob(unsigned int  op_timeout_ms,
461                  int32_t  key, unsigned int  max_retries,
462                  SBlobStat *  blob_stat, TBlobChunkCallback data_chunk_cb);
463     void GetBlobAsync(unsigned int  op_timeout_ms,
464                       int32_t  key, unsigned int  max_retries,
465                       TBlobChunkCallback data_chunk_cb,
466                       TDataErrorCallback error_cb,
467                       unique_ptr<CCassBlobWaiter> &  waiter);
468     void InsertBlobExtended(unsigned int  op_timeout_ms, unsigned int  max_retries,
469                          CBlobRecord *  blob_rslt, TDataErrorCallback  error_cb,
470                          unique_ptr<CCassBlobWaiter> &  waiter);
471     void InsertNAnnot(unsigned int  op_timeout_ms,
472                      int32_t  key, unsigned int  max_retries,
473                      CBlobRecord * blob, CNAnnotRecord * annot,
474                      TDataErrorCallback error_cb,
475                      unique_ptr<CCassBlobWaiter> & waiter);
476     void DeleteNAnnot(unsigned int op_timeout_ms,
477                      unsigned int max_retries,
478                      CNAnnotRecord * annot,
479                      TDataErrorCallback error_cb,
480                      unique_ptr<CCassBlobWaiter> & waiter);
481     void FetchNAnnot(unsigned int op_timeout_ms,
482         unsigned int max_retries,
483         const string & accession,
484         int16_t version,
485         int16_t seq_id_type,
486         const vector<string>& annot_names,
487         TNAnnotConsumeCallback consume_callback,
488         TDataErrorCallback error_cb,
489         unique_ptr<CCassBlobWaiter> & waiter
490     );
491     void FetchNAnnot(unsigned int op_timeout_ms,
492         unsigned int max_retries,
493         const string & accession,
494         int16_t version,
495         int16_t seq_id_type,
496         const vector<CTempString>& annot_names,
497         TNAnnotConsumeCallback consume_callback,
498         TDataErrorCallback error_cb,
499         unique_ptr<CCassBlobWaiter> & waiter
500     );
501     void FetchNAnnot(unsigned int op_timeout_ms,
502         unsigned int max_retries,
503         const string & accession,
504         int16_t version,
505         int16_t seq_id_type,
506         TNAnnotConsumeCallback consume_callback,
507         TDataErrorCallback error_cb,
508         unique_ptr<CCassBlobWaiter> & waiter
509     );
510 
511     void FetchAccVerHistory(
512         unsigned int op_timeout_ms,
513         unsigned int max_retries,
514         const string & accession,
515         TAccVerHistConsumeCallback consume_callback,
516         TDataErrorCallback error_cb,
517         unique_ptr<CCassBlobWaiter> & waiter,
518         int16_t version = 0,
519         int16_t seq_id_type = 0
520     );
521 
522 
523     void DeleteBlobExtended(unsigned int  op_timeout_ms,
524                          int32_t  key, unsigned int  max_retries,
525                          TDataErrorCallback error_cb,
526                          unique_ptr<CCassBlobWaiter> &  waiter);
527     void DeleteExpiredBlobVersion(unsigned int op_timeout_ms,
528                              int32_t key, CBlobRecord::TTimestamp last_modified,
529                              CBlobRecord::TTimestamp expiration,
530                              unsigned int  max_retries,
531                              TDataErrorCallback error_cb,
532                              unique_ptr<CCassBlobWaiter> & waiter);
533 
534     unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended(
535         unsigned int op_timeout_ms,
536         unsigned int max_retries,
537         CBlobRecord::TSatKey sat_key,
538         bool load_chunks,
539         TDataErrorCallback error_cb
540     );
541 
542     unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended(
543         unsigned int op_timeout_ms,
544         unsigned int max_retries,
545         CBlobRecord::TSatKey sat_key,
546         CBlobRecord::TTimestamp modified,
547         bool load_chunks,
548         TDataErrorCallback error_cb
549     );
550 
551     unique_ptr<CCassBlobTaskLoadBlob> GetBlobExtended(
552         unsigned int timeout_ms,
553         unsigned int max_retries,
554         unique_ptr<CBlobRecord> blob_record,
555         bool load_chunks,
556         TDataErrorCallback error_cb
557     );
558 
559     void UpdateBlobFlagsExtended(
560         unsigned int op_timeout_ms,
561         CBlobRecord::TSatKey key,
562         EBlobFlags flag,
563         bool set_flag
564     );
565 
566     void InsertID2Split(
567         unsigned int op_timeout_ms,
568         unsigned int max_retries,
569         CBlobRecord * blob, CID2SplitRecord* id2_split,
570         TDataErrorCallback error_cb,
571         unique_ptr<CCassBlobWaiter> & waiter);
572 
573     // @deprecated Use GetSetting with explicit domain parameter
574     NCBI_DEPRECATED bool GetSetting(unsigned int op_timeout_ms, const string & name, string & value);
575 
576     // @deprecated Use UpdateSetting with explicit domain parameter
577     NCBI_DEPRECATED void UpdateSetting(unsigned int op_timeout_ms, const string & name, const string & value);
578 
579     bool GetSetting(unsigned int op_timeout_ms, const string & domain, const string & name, string & value);
580     void UpdateSetting(unsigned int op_timeout_ms, const string & domain, const string & name, const string & value);
581 
GetConn(void)582     shared_ptr<CCassConnection> GetConn(void)
583     {
584         if (!m_Conn) {
585             NCBI_THROW(CCassandraException, eSeqFailed, "CCassBlobOp instance is not initialized with DB connection");
586         }
587         return m_Conn;
588     }
589 
590  private:
591     shared_ptr<CCassConnection> m_Conn;
592     string m_Keyspace;
593 };
594 
595 END_IDBLOB_SCOPE
596 
597 #endif
598