1 /*  $Id: cass_driver.hpp 619524 2020-11-05 19:41:53Z saprykin $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  *  Wrapper classes around cassandra "C"-API
31  *
32  */
33 
34 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_
35 #define OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_
36 
37 #include <cassandra.h>
38 #include <corelib/ncbistre.hpp>
39 #include <corelib/ncbistr.hpp>
40 #include <corelib/ncbiexpt.hpp>
41 #include <corelib/ncbimtx.hpp>
42 
43 #include <atomic>
44 #include <cassert>
45 #include <functional>
46 #include <limits>
47 #include <memory>
48 #include <set>
49 #include <sstream>
50 #include <string>
51 #include <tuple>
52 #include <unordered_map>
53 #include <utility>
54 #include <vector>
55 
56 #include "IdCassScope.hpp"
57 #include "cass_exception.hpp"
58 
59 #include <objtools/pubseq_gateway/impl/cassandra/cass_conv.hpp>
60 
61 BEGIN_IDBLOB_SCOPE
62 USING_NCBI_SCOPE;
63 
64 #define CASS_PREPARED_Q 1
65 #define CASS_DRV_TIMEOUT_MS 2000U
66 #define CASS_ERROR_SERVER_CONSISTENCY_NOT_ACHIEVED 0xC40063C0
67 
68 #define ASYNC_RESULT_MAP(XX) \
69     XX(ar_wait, "Wait") \
70     XX(ar_done, "Done") \
71     XX(ar_dataready, "DataReady")
72 
73 typedef enum {
74     #define XX(name, _) name,
75     ASYNC_RESULT_MAP(XX)
76     #undef XX
77     ASYNC_RSLT_LAST_ENTRY
78 } async_rslt_t;
79 
80 
81 typedef enum {
82     LB_DCAWARE = 0,     // the default, per CPP driver comments on
83                         // cass_cluster_set_load_balance_dc_aware function
84     LB_ROUNDROBIN
85 } loadbalancing_policy_t;
86 
87 
88 typedef enum {
89     dtNull,
90     dtUnknown,
91     dtBoolean,
92     dtInteger,
93     dtFloat,
94     dtText,
95     dtTimestamp,
96     dtCounter,
97     dtUuid,
98     dtBlob,
99     dtNetwork,
100     dtCollection,
101     dtCustom
102 } CCassDataType;
103 
104 class CCassQuery;
105 class CCassQueryCbRef;
106 class CCassConnection;
107 class CCassDataCallbackReceiver;
108 
109 using TCassQueryOnDataCallback = void(*)(CCassQuery&, void *);
110 using TCassQueryOnExecuteCallback = void(*)(CCassQuery&, void *);
111 using TCassQueryOnData2Callback = void(*)(void *);
112 
113 
114 class CCassConnection: public std::enable_shared_from_this<CCassConnection>
115 {
116     // cass_cluster_set_request_timeout takes in ms while
117     // cass_future_wait_timed takes in mks
118     // to avoid overflow we have to adjust to mks
119     // ~35minutes
120     static const unsigned int kCassMaxTimeout = numeric_limits<unsigned int>::max() / 1000;
121 
122     using TPreparedList = unordered_map<string, const CassPrepared *>;
123 
124     friend class CCassQuery;
125     friend class CCassConnectionFactory;
126 
127     void CloseSession(void);
128 
129  protected:
130     CCassConnection();
131     const CassPrepared * Prepare(const string & sql);
132 
133  public:
134     using TTokenValue = int64_t;
135     using TTokenRanges = vector<pair<TTokenValue, TTokenValue>>;
136 
137     CCassConnection(const CCassConnection&) = delete;
138     CCassConnection& operator=(const CCassConnection&) = delete;
139 
140     static shared_ptr<CCassConnection> Create(void);
141     virtual ~CCassConnection(void);
142     void Connect(void);
143     void Reconnect(void);
144     void Close(void);
145 
146     bool IsConnected(void);
147     int64_t GetActiveStatements(void) const;
148     CassMetrics GetMetrics(void);
149     void SetConnProp(const string & host, const string & user, const string & pwd, int16_t port = 0);
150 
151     void SetLoadBalancing(loadbalancing_policy_t policy);
152     void SetTokenAware(bool value);
153     void SetLatencyAware(bool value);
154 
155     void SetTimeouts(unsigned int ConnTimeoutMs);
156     void SetTimeouts(unsigned int ConnTimeoutMs, unsigned int QryTimeoutMs);
157 
158     void SetFallBackRdConsistency(bool value);
159     bool GetFallBackRdConsistency(void) const;
160 
161     void SetFallBackWrConsistency(unsigned int  value);
162     unsigned int GetFallBackWrConsistency(void) const;
163 
164     static void SetLogging(EDiagSev  severity);
165     static void DisableLogging(void);
166     static void UpdateLogging(void);
167 
168     unsigned int QryTimeout(void) const;
169     unsigned int QryTimeoutMks(void) const;
170     void SetRtLimits(unsigned int numThreadsIo, unsigned int numConnPerHost, unsigned int maxConnPerHost);
171 
172     void SetKeepAlive(unsigned int keepalive);
173 
174     void SetKeyspace(const string & keyspace);
175     string Keyspace(void) const;
176 
177     void SetBlackList(const string & blacklist);
178 
179     shared_ptr<CCassQuery> NewQuery();
180     void GetTokenRanges(TTokenRanges &ranges);
181     // @deprecated
182     void getTokenRanges(TTokenRanges &ranges);
183     vector<string> GetPartitionKeyColumnNames(string const & keyspace, string const & table) const;
184     static string NewTimeUUID();
185     static void Perform(
186         unsigned int optimeoutms,
187         const std::function<bool()>& PreLoopCB,
188         const std::function<void(const CCassandraException&)>& DbExceptCB,
189         const std::function<bool(bool)>& OpCB
190     );
191 
192  private:
193     string                          m_host;
194     int16_t                         m_port;
195     string                          m_user;
196     string                          m_pwd;
197     string                          m_blacklist;
198     string                          m_keyspace;
199     CassCluster *                   m_cluster;
200     CassSession *                   m_session;
201     unsigned int                    m_ctimeoutms;
202     unsigned int                    m_qtimeoutms;
203     unsigned int                    m_last_query_cnt;
204     loadbalancing_policy_t          m_loadbalancing;
205     bool                            m_tokenaware;
206     bool                            m_latencyaware;
207     unsigned int                    m_numThreadsIo;
208     unsigned int                    m_numConnPerHost;
209     unsigned int                    m_maxConnPerHost;
210     unsigned int                    m_keepalive;
211     bool                            m_fallback_readconsistency;
212     unsigned int                    m_FallbackWriteConsistency;
213     static atomic<CassUuidGen*>     m_CassUuidGen;
214     CSpinLock                       m_prepared_mux;
215     TPreparedList                   m_prepared;
216     atomic<int64_t>                 m_active_statements;
217 
218     static bool                     m_LoggingInitialized;
219     static bool                     m_LoggingEnabled;
220     static EDiagSev                 m_LoggingLevel;
221 };
222 
223 class CCassPrm
224 {
225     static constexpr size_t kMaxVarcharDebugBytes = 10;
226  protected:
227     CassValueType   m_type;
228     bool            m_assigned;
229 
230     union simpleval_t {
231         int8_t  i8;
232         int16_t i16;
233         int32_t i32;
234         int64_t i64;
simpleval_t()235         simpleval_t() {
236             memset(this, 0, sizeof(union simpleval_t));
237         }
238     } m_simpleval;
239 
240     string          m_bytes;
241     unique_ptr<CassCollection, function<void(CassCollection*)> >    m_collection;
242     unique_ptr<CassTuple, function<void(CassTuple*)> >              m_tuple;
243 
244     void Bind(CassStatement *  statement, unsigned int  idx);
245 
246  public:
CCassPrm()247     CCassPrm() :
248         m_simpleval(), m_collection(nullptr, nullptr)
249     {
250         Clear();
251     }
252 
253     CCassPrm(CCassPrm&&) = default;
254 
~CCassPrm()255     ~CCassPrm()
256     {
257         Clear();
258     }
259 
Clear(void)260     void Clear(void)
261     {
262         m_type = CASS_VALUE_TYPE_UNKNOWN;
263         m_assigned = false;
264         m_collection = nullptr;
265         m_bytes.clear();
266     }
267 
Assign(int8_t v)268     void Assign(int8_t  v)
269     {
270         m_type = CASS_VALUE_TYPE_TINY_INT;
271         m_simpleval.i8 = v;
272         m_assigned = true;
273     }
274 
Assign(int16_t v)275     void Assign(int16_t  v)
276     {
277         m_type = CASS_VALUE_TYPE_SMALL_INT;
278         m_simpleval.i16 = v;
279         m_assigned = true;
280     }
281 
Assign(int32_t v)282     void Assign(int32_t  v)
283     {
284         m_type = CASS_VALUE_TYPE_INT;
285         m_simpleval.i32 = v;
286         m_assigned = true;
287     }
288 
Assign(int64_t v)289     void Assign(int64_t  v)
290     {
291         m_type = CASS_VALUE_TYPE_BIGINT;
292         m_simpleval.i64 = v;
293         m_assigned = true;
294     }
295 
296     template<typename T, typename = typename enable_if<is_constructible<string, T>::value>::type >
297     // string, string&, string&&, char *
Assign(T v)298     void Assign(T v) {
299         m_type = CASS_VALUE_TYPE_VARCHAR;
300         m_bytes = string(v);
301         m_assigned = true;
302     }
303 
Assign(const unsigned char * buf,size_t len)304     void Assign(const unsigned char *  buf, size_t  len)
305     {
306         m_type = CASS_VALUE_TYPE_BLOB;
307         m_bytes = string(reinterpret_cast<const char *>(buf), len);
308         m_assigned = true;
309     }
310 
311     template<typename K, typename V>
Assign(const map<K,V> & v)312     void Assign(const map<K, V> &  v)
313     {
314         m_type = CASS_VALUE_TYPE_MAP;
315         m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > (
316             cass_collection_new(CASS_COLLECTION_TYPE_MAP, v.size() * 2),
317             [](CassCollection* coll) {
318                 cass_collection_free(coll);
319             }
320         );
321 
322         for (const auto &  it : v) {
323             Convert::ValueToCassCollection(it.first, m_collection.get());
324             Convert::ValueToCassCollection(it.second, m_collection.get());
325         }
326         m_assigned = true;
327     }
328 
329     template<typename I>
AssignList(I begin,I end,size_t sz)330     void AssignList(I begin, I end, size_t sz)
331     {
332         m_type = CASS_VALUE_TYPE_LIST;
333         m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > (
334             cass_collection_new(CASS_COLLECTION_TYPE_LIST, sz),
335             [](CassCollection* coll) {
336                 cass_collection_free(coll);
337             }
338         );
339 
340         for (; begin != end; ++begin) {
341             Convert::ValueToCassCollection(*begin, m_collection.get());
342         }
343         m_assigned = true;
344     }
345 
346     template<typename I>
AssignSet(I begin,I end,size_t sz)347     void AssignSet(I begin, I end, size_t sz)
348     {
349         m_type = CASS_VALUE_TYPE_SET;
350         m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > (
351             cass_collection_new(CASS_COLLECTION_TYPE_SET, sz),
352             [](CassCollection* coll) {
353                 cass_collection_free(coll);
354             }
355         );
356 
357         for (; begin != end; ++begin) {
358             Convert::ValueToCassCollection(*begin, m_collection.get());
359         }
360         m_assigned = true;
361     }
362 
363     template<typename I>
AssignSet(const set<I> & v)364     void AssignSet(const set<I>& v) {
365         AssignSet<typename set<I>::const_iterator>(v.cbegin(), v.cend(), v.size());
366     }
367 
368     template<typename ...T>
Assign(const tuple<T...> & t)369     void Assign(const tuple<T...>& t)
370     {
371         m_type = CASS_VALUE_TYPE_TUPLE;
372         m_assigned = false;
373         m_tuple = unique_ptr<CassTuple, function<void(CassTuple*)> >(
374             cass_tuple_new(tuple_size< tuple<T...> >::value),
375             [](CassTuple* coll){
376                 cass_tuple_free(coll);
377             }
378         );
379         Convert::ValueToCassTuple(t, m_tuple.get());
380         m_assigned = true;
381     }
382 
AssignNull(void)383     void AssignNull(void)
384     {
385         m_type = CASS_VALUE_TYPE_UNKNOWN;
386         m_assigned = true;
387     }
388 
IsAssigned(void) const389     bool IsAssigned(void) const
390     {
391         return m_assigned;
392     }
393 
GetType() const394     CassValueType GetType() const
395     {
396         return m_type;
397     }
398 
AsInt8(void) const399     int32_t AsInt8(void) const
400     {
401         switch (m_type) {
402             case CASS_VALUE_TYPE_TINY_INT:
403                 return m_simpleval.i8;
404             case CASS_VALUE_TYPE_VARCHAR: {
405                 int8_t     i8;
406                 if (NStr::StringToNumeric(m_bytes, &i8))
407                     return i8;
408                 else
409                     RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int8 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
410             }
411             // no break
412             default:
413                 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int8 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
414         }
415     }
416 
AsInt16(void) const417     int32_t AsInt16(void) const
418     {
419         switch (m_type) {
420             case CASS_VALUE_TYPE_TINY_INT:
421                 return m_simpleval.i8;
422             case CASS_VALUE_TYPE_SMALL_INT:
423                 return m_simpleval.i16;
424             case CASS_VALUE_TYPE_VARCHAR: {
425                 int16_t     i16;
426                 if (NStr::StringToNumeric(m_bytes, &i16))
427                     return i16;
428                 else
429                     RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int16 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
430             }
431             // no break
432             default:
433                 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int16 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
434         }
435     }
436 
AsInt32(void) const437     int32_t AsInt32(void) const
438     {
439         switch (m_type) {
440             case CASS_VALUE_TYPE_TINY_INT:
441                 return m_simpleval.i8;
442             case CASS_VALUE_TYPE_SMALL_INT:
443                 return m_simpleval.i16;
444             case CASS_VALUE_TYPE_INT:
445                 return m_simpleval.i32;
446             case CASS_VALUE_TYPE_BIGINT:
447                 return m_simpleval.i64;
448             case CASS_VALUE_TYPE_VARCHAR: {
449                 int32_t     i32;
450                 if (NStr::StringToNumeric(m_bytes, &i32))
451                     return i32;
452                 else
453                     RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int32 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
454             }
455             // no break
456             default:
457                 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int32 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
458         }
459     }
460 
AsInt64(void) const461     int64_t AsInt64(void) const
462     {
463         switch (m_type) {
464             case CASS_VALUE_TYPE_TINY_INT:
465                 return m_simpleval.i8;
466             case CASS_VALUE_TYPE_SMALL_INT:
467                 return m_simpleval.i16;
468             case CASS_VALUE_TYPE_INT:
469                 return m_simpleval.i32;
470             case CASS_VALUE_TYPE_BIGINT:
471                 return m_simpleval.i64;
472             case CASS_VALUE_TYPE_VARCHAR: {
473                 int64_t     i64;
474                 if (NStr::StringToNumeric(m_bytes, &i64))
475                     return i64;
476                 else
477                     RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int64 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
478             }
479             // no break
480             default:
481                 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int64 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
482         }
483     }
484 
AsString(string & value) const485     void AsString(string & value) const
486     {
487         switch (m_type) {
488             case CASS_VALUE_TYPE_TINY_INT:
489                 value = NStr::NumericToString(m_simpleval.i8);
490                 break;
491             case CASS_VALUE_TYPE_SMALL_INT:
492                 value = NStr::NumericToString(m_simpleval.i16);
493                 break;
494             case CASS_VALUE_TYPE_INT:
495                 value = NStr::NumericToString(m_simpleval.i32);
496                 break;
497             case CASS_VALUE_TYPE_BIGINT:
498                 value = NStr::NumericToString(m_simpleval.i64);
499                 break;
500             case CASS_VALUE_TYPE_VARCHAR:
501                 value = m_bytes;
502                 break;
503             case CASS_VALUE_TYPE_SET:
504             default:
505                 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to string (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")");
506         }
507     }
508 
AsString() const509     string AsString() const
510     {
511         string rv;
512         AsString(rv);
513         return rv;
514     }
515 
AsStringForDebug() const516     string AsStringForDebug() const
517     {
518         switch (m_type) {
519             case CASS_VALUE_TYPE_TINY_INT:
520             case CASS_VALUE_TYPE_SMALL_INT:
521             case CASS_VALUE_TYPE_INT:
522             case CASS_VALUE_TYPE_BIGINT:
523                 return AsString();
524             case CASS_VALUE_TYPE_SET:
525                 return "set(...)";
526             case CASS_VALUE_TYPE_MAP:
527                 return "map(...)";
528             case CASS_VALUE_TYPE_LIST:
529                 return "list(...)";
530             case CASS_VALUE_TYPE_TUPLE:
531                 return "tuple(...)";
532             case CASS_VALUE_TYPE_UNKNOWN:
533                 return "Null";
534             case CASS_VALUE_TYPE_VARCHAR:
535                 if (m_bytes.size() > kMaxVarcharDebugBytes) {
536                     return "'" + m_bytes.substr(0, kMaxVarcharDebugBytes) + "...'";
537                 } else {
538                     return "'" + m_bytes + "'";
539                 }
540             case CASS_VALUE_TYPE_BLOB: {
541                 stringstream s;
542                 s << "'0x";
543                 for (size_t i = 0; i < m_bytes.size() && i < kMaxVarcharDebugBytes; ++i) {
544                     s << hex << setfill('0') << setw(2) << static_cast<int16_t>(m_bytes[i]);
545                 }
546                 if (m_bytes.size() > kMaxVarcharDebugBytes) {
547                     s << "...'";
548                 } else {
549                     s << "'";
550                 }
551                 return s.str();
552             }
553             default:
554                 return "AsStringForDebugNotImplemented(t:" + to_string(m_type) + ")";
555         }
556     }
557 
558     friend class CCassQuery;
559 };
560 
561 
562 class CCassParams: public vector<CCassPrm>
563 {};
564 
565 class CCassDataCallbackReceiver {
566 public:
567     virtual ~CCassDataCallbackReceiver() = default;
568     virtual void OnData() = 0;
569 };
570 
571 class CCassQueryCbRef: public std::enable_shared_from_this<CCassQueryCbRef>
572 {
573  public:
CCassQueryCbRef(shared_ptr<CCassQuery> query)574     explicit CCassQueryCbRef(shared_ptr<CCassQuery> query)
575         : m_ondata(nullptr)
576         , m_data(nullptr)
577         , m_ondata2(nullptr)
578         , m_data2(nullptr)
579         , m_query(query)
580     {
581     }
582 
~CCassQueryCbRef()583     virtual ~CCassQueryCbRef()
584     {
585     }
586 
Attach(void (* ondata)(CCassQuery &,void *),void * data,void (* ondata2)(void *),void * data2,shared_ptr<CCassDataCallbackReceiver> ondata3)587     void Attach(void (*ondata)(CCassQuery&, void*),
588                 void* data, void (*ondata2)(void*), void* data2,
589                 shared_ptr<CCassDataCallbackReceiver> ondata3)
590     {
591         m_self = shared_from_this();
592         m_data = data;
593         m_ondata = ondata;
594 
595         m_data2 = data2;
596         m_ondata2 = ondata2;
597 
598         m_ondata3 = ondata3;
599     }
600 
Detach(bool remove_self_ref=false)601     void Detach(bool remove_self_ref = false)
602     {
603         if (remove_self_ref) {
604             m_self = nullptr;
605         }
606     }
607 
s_OnFutureCb(CassFuture * future,void * data)608     static void s_OnFutureCb(CassFuture* future, void* data)
609     {
610         try {
611             shared_ptr<CCassQueryCbRef> self(static_cast<CCassQueryCbRef*>(data)->shared_from_this());
612             assert(self->m_self);
613             self->m_self = nullptr;
614 
615             auto    query = self->m_query.lock();
616             if (query != nullptr) {
617                 if (self->m_ondata) {
618                     self->m_ondata(*query.get(), self->m_data);
619                 }
620                 if (self->m_ondata2) {
621                     self->m_ondata2(self->m_data2);
622                 }
623                 auto ondata3 = self->m_ondata3.lock();
624                 if (ondata3) {
625                     ondata3->OnData();
626                 }
627             }
628         } catch (const CException& e) {
629             ERR_POST("CException caught! Message: " << e.GetMsg());
630             abort();
631         }
632         catch (const exception& e) {
633             ERR_POST("exception caught! Message: " << e.what());
634             abort();
635         }
636     }
637 
638  private:
639     TCassQueryOnDataCallback        m_ondata;
640     void*                           m_data;
641 
642     TCassQueryOnData2Callback       m_ondata2;
643     void*                           m_data2;
644 
645     weak_ptr<CCassDataCallbackReceiver> m_ondata3;
646 
647     weak_ptr<CCassQuery>            m_query;
648     shared_ptr<CCassQueryCbRef>     m_self;
649 };
650 
651 class CCassQuery: public std::enable_shared_from_this<CCassQuery>
652 {
653  private:
654     CCassQuery(const CCassQuery&) = delete;
655     CCassQuery& operator=(const CCassQuery&) = delete;
656 
657     friend class CCassConnection;
658     friend class CCassQueryCbRef;
659 
660  private:
CheckParamAssigned(int iprm) const661     void CheckParamAssigned(int iprm) const
662     {
663         if (iprm < 0 || iprm >= static_cast<int64_t>(m_params.size())) {
664             RAISE_DB_ERROR(eBindFailed, "Param index #" + to_string(iprm) + " is out of range");
665         }
666         if (!m_params[iprm].IsAssigned()) {
667             RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Param #" + to_string(iprm) + " is not assigned");
668         }
669     }
670 
CheckParamExists(int iprm) const671     void CheckParamExists(int iprm) const
672     {
673         if (iprm < 0 || (unsigned int)iprm >= m_params.size()) {
674             RAISE_DB_ERROR(eBindFailed, "Param index #" + to_string(iprm) + " is out of range");
675         }
676     }
677 
678     shared_ptr<CCassConnection>     m_connection;
679     unsigned int                    m_qtimeoutms;
680     int64_t                         m_futuretime;
681     CassFuture *                    m_future;
682     CassBatch *                     m_batch;
683     CassStatement *                 m_statement;
684     const CassResult *              m_result;
685     CassIterator *                  m_iterator;
686     const CassRow *                 m_row;
687     unsigned int                    m_page_size;
688     bool                            m_EOF;
689     bool                            m_page_start;
690     CCassParams                     m_params;
691     string                          m_sql;
692     bool                            m_results_expected;
693     bool                            m_async;
694     bool                            m_allow_prepare;
695     bool                            m_is_prepared;
696     CassConsistency                 m_serial_consistency;
697 
698     shared_ptr<CCassQueryCbRef>     m_cb_ref;
699 
700     TCassQueryOnDataCallback        m_ondata;
701     void*                           m_ondata_data;
702 
703     TCassQueryOnData2Callback       m_ondata2;
704     void*                           m_ondata2_data;
705 
706     weak_ptr<CCassDataCallbackReceiver> m_ondata3;
707 
708     TCassQueryOnExecuteCallback     m_onexecute;
709     void*                           m_onexecute_data;
710 
711     async_rslt_t Wait(unsigned int timeoutmks);
712     void Bind(void);
713 
714     template<typename F>
GetColumn(F ifld) const715     const CassValue* GetColumn(F ifld) const
716     {
717         static_assert(sizeof(F) == 0, "Columns can be accessed by either index or name");
718         return CNotImplemented<F>::GetColumn_works_by_name_and_by_index();
719     }
720 
721     template<typename F>
GetColumnDef(F ifld) const722     string GetColumnDef(F ifld) const
723     {
724         static_assert(sizeof(F) == 0, "Columns can be accessed by either index or name");
725         return CNotImplemented<F>::GetColumn_works_by_name_and_by_index();
726     }
727 
728     void GetFuture();
729     void ProcessFutureResult();
730     void SetupOnDataCallback();
731     void InternalClose(bool closebatch);
732     void SetEOF(bool Value);
733 
734     template<class T>
735     class CNotImplemented
736     {};
737 
738  protected:
CCassQuery(const shared_ptr<CCassConnection> & connection)739     explicit CCassQuery(const shared_ptr<CCassConnection>& connection) :
740         m_connection(connection),
741         m_qtimeoutms(0),
742         m_futuretime(0),
743         m_future(nullptr),
744         m_batch(nullptr),
745         m_statement(nullptr),
746         m_result(nullptr),
747         m_iterator(nullptr),
748         m_row(nullptr),
749         m_page_size(0),
750         m_EOF(false),
751         m_page_start(false),
752         m_results_expected(false),
753         m_async(false),
754         m_allow_prepare(CASS_PREPARED_Q),
755         m_is_prepared(false),
756         m_serial_consistency(CASS_CONSISTENCY_ANY),
757         m_ondata(nullptr),
758         m_ondata_data(nullptr),
759         m_ondata2(nullptr),
760         m_ondata2_data(nullptr),
761         m_onexecute(nullptr),
762         m_onexecute_data(nullptr)
763     {
764         if (!m_connection) {
765             RAISE_DB_ERROR(eFatal, "Cassandra connection is not established");
766         }
767     }
768 
769  public:
770     virtual ~CCassQuery();
771     virtual void Close(void);
772 
773     void SetTimeout();
774     void SetTimeout(unsigned int t);
775 
776     unsigned int Timeout(void) const;
777 
778     bool IsReady(void);
779     void NewBatch(void);
780     async_rslt_t RunBatch();
781 
782     void SetSQL(const string& sql, unsigned int PrmCount);
783 
784     /* returns resultset */
785     void Query(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM,
786                bool run_async = false, bool allow_prepare = CASS_PREPARED_Q,
787                unsigned int page_size = DEFAULT_PAGE_SIZE);
788 
789     void RestartQuery(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM);
790 
791     /* returns no resultset */
792     void Execute(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM,
793                  bool run_async = false, bool allow_prepare = CASS_PREPARED_Q);
794     void RestartExecute(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM);
795     void Restart(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM);
796 
797     void SetSerialConsistency(CassConsistency c);
798 
IsActive(void) const799     bool IsActive(void) const
800     {
801         return (m_row != nullptr) || (m_statement != nullptr) || (m_batch != nullptr);
802     }
803 
804     virtual async_rslt_t WaitAsync(unsigned int timeoutmks);
805     string GetSQL() const;
806     virtual string ToString(void) const;
807 
808     virtual bool IsEOF(void) const;
809     virtual bool IsAsync(void) const;
810 
811     void BindNull(int iprm);
812     void BindInt8(int iprm, int8_t value);
813     void BindInt16(int iprm, int16_t value);
814     void BindInt32(int iprm, int32_t value);
815     void BindInt64(int iprm, int64_t value);
816     void BindStr(int iprm, const string& value);
817     void BindStr(int iprm, const char* value);
818     void BindBytes(int iprm, const unsigned char* buf, size_t len);
819 
820     template<typename K, typename V>
BindMap(int iprm,const map<K,V> & value)821     void BindMap(int iprm, const map<K, V>& value)
822     {
823         CheckParamExists(iprm);
824         m_params[iprm].Assign<K,V>(value);
825     }
826 
827     template<typename I>
BindList(int iprm,I begin,I end,size_t sz)828     void BindList(int iprm, I begin, I end, size_t sz)
829     {
830         CheckParamExists(iprm);
831         m_params[iprm].AssignList(begin, end, sz);
832     }
833 
834     template<typename I>
BindSet(int iprm,I begin,I end,size_t sz)835     void BindSet(int iprm, I begin, I end, size_t sz)
836     {
837         CheckParamExists(iprm);
838         m_params[iprm].AssignSet<I>(begin, end, sz);
839     }
840 
841     template<typename I>
BindSet(int iprm,const set<I> & v)842     void BindSet(int iprm, const set<I>& v)
843     {
844         BindSet(iprm, v.begin(), v.end(), v.size());
845     }
846 
847     template<typename ...T>
BindTuple(int iprm,const tuple<T...> & value)848     void BindTuple(int iprm, const tuple<T...>& value)
849     {
850         CheckParamExists(iprm);
851         m_params[iprm].Assign(value);
852     }
853 
ParamCount(void) const854     size_t ParamCount(void) const
855     {
856         return m_params.size();
857     }
858 
859     CassValueType ParamType(int  iprm) const;
860     int32_t ParamAsInt32(int iprm);
861     int64_t ParamAsInt64(int iprm);
862     string ParamAsStr(int iprm) const;
863     void ParamAsStr(int iprm, string& value) const;
864     string ParamAsStrForDebug(int iprm) const;
865 
866     async_rslt_t NextRow();
867 
868     template <typename F = int>
FieldIsNull(F ifld) const869     bool FieldIsNull(F ifld) const
870     {
871         const CassValue* clm = GetColumn(ifld);
872         return !clm || cass_value_is_null(clm);
873     }
874 
875     template <typename F = int>
FieldType(F ifld) const876     CCassDataType FieldType(F ifld) const
877     {
878         if (FieldIsNull(ifld)) {
879             return dtNull;
880         }
881 
882         const CassValue* clm = GetColumn(ifld);
883         CassValueType type = cass_value_type(clm);
884         switch (type) {
885             case CASS_VALUE_TYPE_CUSTOM:
886                 return dtCustom;
887             case CASS_VALUE_TYPE_TEXT:
888             case CASS_VALUE_TYPE_ASCII:
889             case CASS_VALUE_TYPE_VARCHAR:
890                 return dtText;
891             case CASS_VALUE_TYPE_VARINT:
892             case CASS_VALUE_TYPE_DECIMAL:
893             case CASS_VALUE_TYPE_TINY_INT:
894             case CASS_VALUE_TYPE_SMALL_INT:
895             case CASS_VALUE_TYPE_INT:
896             case CASS_VALUE_TYPE_BIGINT:
897                 return dtInteger;
898             case CASS_VALUE_TYPE_DOUBLE:
899             case CASS_VALUE_TYPE_FLOAT:
900                 return dtFloat;
901 
902             case CASS_VALUE_TYPE_BLOB:
903                 return dtBlob;
904             case CASS_VALUE_TYPE_BOOLEAN:
905                 return dtBoolean;
906             case CASS_VALUE_TYPE_TIMESTAMP:
907                 return dtTimestamp;
908             case CASS_VALUE_TYPE_COUNTER:
909                 return dtCounter;
910             case CASS_VALUE_TYPE_UUID:
911             case CASS_VALUE_TYPE_TIMEUUID:
912                 return dtUuid;
913             case CASS_VALUE_TYPE_LIST:
914             case CASS_VALUE_TYPE_MAP:
915             case CASS_VALUE_TYPE_SET:
916                 return dtCollection;
917             case CASS_VALUE_TYPE_INET:
918                 return dtNetwork;
919             case CASS_VALUE_TYPE_UNKNOWN:
920             default:
921                 return dtUnknown;
922         }
923     }
924 
925     template <typename F = int>
FieldGetBoolValue(F ifld) const926     bool FieldGetBoolValue(F ifld) const
927     {
928         bool rv;
929         try {
930             Convert::CassValueConvert<bool>(GetColumn(ifld), rv);
931         }
932         catch (CCassandraException& ex) {
933             ex.AddToMessage(GetColumnDef(ifld));
934             throw;
935         }
936         return rv;
937     }
938 
939     template <typename F = int>
FieldGetBoolValue(F ifld,bool _default) const940     bool FieldGetBoolValue(F ifld, bool _default) const
941     {
942         bool rv;
943         Convert::CassValueConvertDef<bool>(GetColumn(ifld), _default, rv);
944         return rv;
945     }
946 
947     template <typename F = int>
FieldGetInt8Value(F ifld) const948     int8_t FieldGetInt8Value(F ifld) const
949     {
950         int8_t rv;
951         try {
952             Convert::CassValueConvert<int8_t>(GetColumn(ifld), rv);
953         }
954         catch (CCassandraException& ex) {
955             ex.AddToMessage(GetColumnDef(ifld));
956             throw;
957         }
958         return rv;
959     }
960 
961     template <typename F = int>
FieldGetInt8Value(F ifld,int8_t _default) const962     int8_t FieldGetInt8Value(F ifld, int8_t _default) const
963     {
964         int8_t rv;
965         Convert::CassValueConvertDef<int8_t>(GetColumn(ifld), _default, rv);
966         return rv;
967     }
968 
969     template <typename F = int>
FieldGetInt16Value(F ifld) const970     int16_t FieldGetInt16Value(F ifld) const
971     {
972         int16_t rv;
973         try {
974             Convert::CassValueConvert<int16_t>(GetColumn(ifld), rv);
975         }
976         catch (CCassandraException& ex) {
977             ex.AddToMessage(GetColumnDef(ifld));
978             throw;
979         }
980         return rv;
981     }
982 
983     template <typename F = int>
FieldGetInt16Value(F ifld,int16_t _default) const984     int16_t FieldGetInt16Value(F ifld, int16_t _default) const
985     {
986         int16_t rv;
987         Convert::CassValueConvertDef<int16_t>(GetColumn(ifld), _default, rv);
988         return rv;
989     }
990 
991     template <typename F = int>
FieldGetInt32Value(F ifld) const992     int32_t FieldGetInt32Value(F ifld) const
993     {
994         int32_t rv;
995         try {
996             Convert::CassValueConvert<int32_t>(GetColumn(ifld), rv);
997         }
998         catch (CCassandraException& ex) {
999             ex.AddToMessage(GetColumnDef(ifld));
1000             throw;
1001         }
1002         return rv;
1003     }
1004 
1005     template <typename F = int>
FieldGetInt32Value(F ifld,int32_t _default) const1006     int32_t FieldGetInt32Value(F ifld, int32_t _default) const
1007     {
1008         int32_t rv;
1009         Convert::CassValueConvertDef<int32_t>(GetColumn(ifld), _default, rv);
1010         return rv;
1011     }
1012 
1013     template <typename F = int>
FieldGetInt64Value(F ifld) const1014     int64_t FieldGetInt64Value(F ifld) const
1015     {
1016         int64_t rv;
1017         try {
1018             Convert::CassValueConvert<int64_t>(GetColumn(ifld), rv);
1019         }
1020         catch (CCassandraException& ex) {
1021             ex.AddToMessage(GetColumnDef(ifld));
1022             throw;
1023         }
1024         return rv;
1025     }
1026 
1027     template <typename F = int>
FieldGetInt64Value(F ifld,int64_t _default) const1028     int64_t FieldGetInt64Value(F ifld, int64_t _default) const
1029     {
1030         int64_t rv;
1031         Convert::CassValueConvertDef<int64_t>(GetColumn(ifld), _default, rv);
1032         return rv;
1033     }
1034 
1035     template <typename F = int>
FieldGetFloatValue(F ifld) const1036     double FieldGetFloatValue(F ifld) const
1037     {
1038         double rv;
1039         try {
1040             Convert::CassValueConvert<double>(GetColumn(ifld), rv);
1041         }
1042         catch (CCassandraException& ex) {
1043             ex.AddToMessage(GetColumnDef(ifld));
1044             throw;
1045         }
1046 
1047         return rv;
1048     }
1049 
1050     template <typename F = int>
FieldGetFloatValue(F ifld,double _default) const1051     double FieldGetFloatValue(F ifld, double _default) const
1052     {
1053         double rv;
1054         Convert::CassValueConvertDef<double>(GetColumn(ifld), _default, rv);
1055         return rv;
1056     }
1057 
1058     template <typename F = int>
FieldGetStrValue(F ifld) const1059     string FieldGetStrValue(F ifld) const
1060     {
1061         string rv;
1062         try {
1063             Convert::CassValueConvert<string>(GetColumn(ifld), rv);
1064         }
1065         catch (CCassandraException& ex) {
1066             ex.AddToMessage(GetColumnDef(ifld));
1067             throw;
1068         }
1069         return rv;
1070     }
1071 
1072     template <typename F = int>
FieldGetStrValueDef(F ifld,const string & _default) const1073     string FieldGetStrValueDef(F ifld, const string& _default) const
1074     {
1075         string rv;
1076         Convert::CassValueConvertDef<string>(GetColumn(ifld), _default, rv);
1077         return rv;
1078     }
1079 
1080     template <typename I, typename F = int>
FieldGetContainerValue(F ifld,I insert_iterator) const1081     void FieldGetContainerValue(F ifld, I insert_iterator) const
1082     {
1083         using TValueType = typename I::container_type::value_type;
1084         const CassValue * clm = GetColumn(ifld);
1085         CassValueType type = cass_value_type(clm);
1086 
1087         switch (type) {
1088             case CASS_COLLECTION_TYPE_LIST:
1089             case CASS_COLLECTION_TYPE_SET:
1090             {
1091                 if (!cass_value_is_null(clm)) {
1092                     unique_ptr<CassIterator, function<void(CassIterator*)> > items_iterator_ptr(
1093                         cass_iterator_from_collection(clm),
1094                         [](CassIterator* itr) {
1095                             cass_iterator_free(itr);
1096                         }
1097                     );
1098                     while (cass_iterator_next(items_iterator_ptr.get())) {
1099                         const CassValue* value = cass_iterator_get_value(items_iterator_ptr.get());
1100                         if (!value) {
1101                             RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed");
1102                         }
1103 
1104                         TValueType v;
1105                         try {
1106                             Convert::CassValueConvert(value, v);
1107                         }
1108                         catch (CCassandraException& ex) {
1109                             ex.AddToMessage(GetColumnDef(ifld));
1110                             throw;
1111                         }
1112                         insert_iterator++ = move(v);
1113                     }
1114                 }
1115             }
1116             break;
1117             default:
1118                 if (!cass_value_is_null(clm)) {
1119                     RAISE_DB_ERROR(eFetchFailed,
1120                         "Failed to convert Cassandra value to collection: unsupported data type (Cassandra type - " +
1121                         to_string(static_cast<int>(type)) + ")"
1122                     );
1123                 }
1124         }
1125     }
1126 
1127     template <typename F = int>
FieldGetStrValue(F ifld,string & value) const1128     void FieldGetStrValue(F ifld, string & value) const
1129     {
1130         value = FieldGetStrValue(ifld);
1131     }
1132 
1133     template <typename F = int>
FieldGetStrValueDef(F ifld,string & value,const string & _default) const1134     void FieldGetStrValueDef(F ifld, string & value, const string & _default) const
1135     {
1136         value = FieldGetStrValueDef(ifld, _default);
1137     }
1138 
1139     template<typename T, typename F = int>
1140     T FieldGetTupleValue(F ifld) const
1141     {
1142         const CassValue * clm = GetColumn(ifld);
1143         if (!cass_value_is_null(clm)) {
1144             T t;
1145             try {
1146                 Convert::CassValueConvert(clm, t);
1147             }
1148             catch (CCassandraException& ex) {
1149                 ex.AddToMessage(GetColumnDef(ifld));
1150                 throw;
1151             }
1152             return t;
1153         }
1154         return T();
1155     }
1156 
1157     template<typename T, typename F = int>
FieldGetSetValues(F ifld,std::vector<T> & values) const1158     void FieldGetSetValues(F ifld, std::vector<T>& values) const
1159     {
1160         const CassValue * clm = GetColumn(ifld);
1161         CassValueType type = cass_value_type(clm);
1162         switch (type) {
1163             case CASS_VALUE_TYPE_SET: {
1164                 unique_ptr<CassIterator, function<void(CassIterator*)> > items_iterator_ptr(
1165                     cass_iterator_from_collection(clm),
1166                     [](CassIterator* itr) {
1167                         cass_iterator_free(itr);
1168                     }
1169                 );
1170                 while (cass_iterator_next(items_iterator_ptr.get())) {
1171                     T v;
1172                     const CassValue* value = cass_iterator_get_value(items_iterator_ptr.get());
1173                     if (!value) {
1174                         RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed");
1175                     }
1176                     try {
1177                         Convert::CassValueConvert(value, v);
1178                     }
1179                     catch (CCassandraException& ex) {
1180                         ex.AddToMessage(GetColumnDef(ifld));
1181                         throw;
1182                     }
1183                     values.push_back(move(v));
1184                 }
1185                 break;
1186             }
1187             default:
1188                 RAISE_DB_ERROR(eFetchFailed,
1189                     "Failed to convert Cassandra value to set<>: unsupported data type (Cassandra type - " +
1190                     to_string(static_cast<int>(type)) + ")"
1191                 );
1192         }
1193     }
1194 
1195     template<typename T, typename F = int>
FieldGetSetValues(F ifld,std::set<T> & values) const1196     void FieldGetSetValues(F ifld, std::set<T>& values) const
1197     {
1198         const CassValue * clm = GetColumn(ifld);
1199         CassValueType type = cass_value_type(clm);
1200         switch (type) {
1201             case CASS_VALUE_TYPE_SET: {
1202                 unique_ptr<CassIterator, function<void(CassIterator*)>> items_iterator(
1203                     cass_iterator_from_collection(clm),
1204                     [](CassIterator* itr) {
1205                         cass_iterator_free(itr);
1206                     }
1207                 );
1208                 if (items_iterator != nullptr) {
1209                     while (cass_iterator_next(items_iterator.get())) {
1210                         T v;
1211                         const CassValue* value = cass_iterator_get_value(items_iterator.get());
1212                         if (!value) {
1213                             RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed");
1214                         }
1215                         try {
1216                             Convert::CassValueConvert(value, v);
1217                         }
1218                         catch (CCassandraException& ex) {
1219                             ex.AddToMessage(GetColumnDef(ifld));
1220                             throw;
1221                         }
1222                         values.insert(move(v));
1223                     }
1224                 }
1225                 break;
1226             }
1227             default:
1228                 RAISE_DB_ERROR(eFetchFailed,
1229                     "Failed to convert Cassandra value to set<>: unsupported data type (Cassandra type - " +
1230                     to_string(static_cast<int>(type)) + ")"
1231                 );
1232         }
1233     }
1234 
1235     template <typename K, typename V, typename F = int>
FieldGetMapValue(F ifld,map<K,V> & result) const1236     void FieldGetMapValue(F ifld, map<K, V>& result) const
1237     {
1238         const CassValue * clm = GetColumn(ifld);
1239         CassValueType type = cass_value_type(clm);
1240         switch (type) {
1241             case CASS_VALUE_TYPE_MAP: {
1242                 result.clear();
1243                 unique_ptr<CassIterator, function<void(CassIterator*)>> items_iterator(
1244                     cass_iterator_from_map(clm),
1245                     [](CassIterator* itr) {
1246                         cass_iterator_free(itr);
1247                     }
1248                 );
1249                 if (items_iterator != nullptr) {
1250                     while (cass_iterator_next(items_iterator.get())) {
1251                         const CassValue* key = cass_iterator_get_map_key(items_iterator.get());
1252                         const CassValue* val = cass_iterator_get_map_value(items_iterator.get());
1253                         K k;
1254                         try {
1255                             Convert::CassValueConvert(key, k);
1256                         }
1257                         catch (CCassandraException& ex) {
1258                             ex.AddToMessage(GetColumnDef(ifld));
1259                             throw;
1260                         }
1261                         V v;
1262                         try {
1263                             Convert::CassValueConvert(val, v);
1264                         }
1265                         catch (CCassandraException& ex) {
1266                             ex.AddToMessage(GetColumnDef(ifld));
1267                             throw;
1268                         }
1269                         result.emplace(move(k), move(v));
1270                     }
1271                 }
1272                 break;
1273             }
1274             default:
1275                 RAISE_DB_ERROR(eFetchFailed,
1276                     "Failed to convert Cassandra value to map<>: unsupported data type (Cassandra type - " +
1277                     to_string(static_cast<int>(type)) + ")"
1278                 );
1279         }
1280     }
1281 
1282     template<typename F = int>
FieldGetBlobValue(F ifld,unsigned char * buf,size_t len) const1283     size_t FieldGetBlobValue(F ifld, unsigned char* buf, size_t len) const
1284     {
1285         const CassValue * clm = GetColumn(ifld);
1286         const unsigned char * output = nullptr;
1287         size_t outlen = 0;
1288         CassError err = cass_value_get_bytes(clm, &output, &outlen);
1289         if (err != CASS_OK) {
1290             RAISE_CASS_ERROR(err, eFetchFailed, "Failed to fetch blob data:");
1291         }
1292         if (len < outlen) {
1293             RAISE_DB_ERROR(eFetchFailed, "Failed to fetch blob data: insufficient buffer provided");
1294         }
1295         memcpy(buf, output, outlen);
1296         return outlen;
1297     }
1298 
1299     template<typename F = int>
FieldGetBlobRaw(F ifld,const unsigned char ** rawbuf) const1300     size_t FieldGetBlobRaw(F ifld, const unsigned char** rawbuf) const
1301     {
1302         if (rawbuf) {
1303             *rawbuf = nullptr;
1304         }
1305 
1306         const CassValue * clm = GetColumn(ifld);
1307         const unsigned char * output = nullptr;
1308         size_t outlen = 0;
1309         CassError rc = cass_value_get_bytes(clm, &output, &outlen);
1310         if (rc == CASS_ERROR_LIB_NULL_VALUE) {
1311             return 0;
1312         }
1313 
1314         if (rc != CASS_OK) {
1315             RAISE_CASS_ERROR(rc, eFetchFailed, "Failed to fetch blob data:");
1316         }
1317         if (rawbuf) {
1318             *rawbuf = output;
1319         }
1320         return outlen;
1321     }
1322 
1323     template<typename F = int>
FieldGetBlobSize(F ifld) const1324     size_t FieldGetBlobSize(F ifld) const
1325     {
1326         const CassValue * clm = GetColumn(ifld);
1327         const unsigned char * output = nullptr;
1328         size_t outlen = 0;
1329         CassError rc = cass_value_get_bytes(clm, &output, &outlen);
1330         if (rc != CASS_ERROR_LIB_NULL_VALUE && rc != CASS_OK) {
1331             RAISE_CASS_ERROR(rc, eFetchFailed, "Failed to fetch blob data:");
1332         }
1333         return outlen;
1334     }
1335 
GetConnection(void)1336     shared_ptr<CCassConnection> GetConnection(void)
1337     {
1338         return m_connection;
1339     }
1340 
1341     /**
1342      * Use SatOnData3. This one is unsafe and will be removed
1343      */
SetOnData(TCassQueryOnDataCallback cb,void * data)1344     NCBI_DEPRECATED void SetOnData(TCassQueryOnDataCallback cb, void* data)
1345     {
1346         if (m_ondata == cb && m_ondata_data == data) {
1347             return;
1348         }
1349         if (m_future) {
1350             if (m_ondata) {
1351                 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned");
1352             }
1353             if (!cb && m_future) {
1354                 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset");
1355             }
1356         }
1357         m_ondata = move(cb);
1358         m_ondata_data = data;
1359         if (m_future) {
1360             SetupOnDataCallback();
1361         }
1362     }
1363 
1364     /**
1365      * Use SatOnData3. This one is unsafe and will be removed
1366      */
SetOnData2(TCassQueryOnData2Callback cb,void * Data)1367     NCBI_DEPRECATED void SetOnData2(TCassQueryOnData2Callback cb, void* Data)
1368     {
1369         if (m_ondata2 == cb && m_ondata2_data == Data) {
1370             return;
1371         }
1372         if (m_future) {
1373             if (m_ondata2) {
1374                 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned");
1375             }
1376             if (!cb) {
1377                 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset");
1378             }
1379         }
1380         m_ondata2 = cb;
1381         m_ondata2_data = Data;
1382         if (m_future) {
1383             SetupOnDataCallback();
1384         }
1385     }
1386 
SetOnData3(shared_ptr<CCassDataCallbackReceiver> cb)1387     void SetOnData3(shared_ptr<CCassDataCallbackReceiver> cb)
1388     {
1389         auto ondata3 = m_ondata3.lock();
1390         if (ondata3 == cb) {
1391             return;
1392         }
1393 
1394         if (m_future) {
1395             if (ondata3) {
1396                 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned");
1397             }
1398             if (!cb) {
1399                 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset");
1400             }
1401         }
1402         m_ondata3 = cb;
1403         if (m_future) {
1404             SetupOnDataCallback();
1405         }
1406     }
1407 
SetOnExecute(void (* Cb)(CCassQuery &,void *),void * Data)1408     void SetOnExecute(void (*Cb)(CCassQuery&, void*), void* Data)
1409     {
1410         m_onexecute = Cb;
1411         m_onexecute_data = Data;
1412     }
1413 
1414     static const unsigned int DEFAULT_PAGE_SIZE;
1415 };
1416 
1417 template<>
1418 const CassValue* CCassQuery::GetColumn(int ifld) const;
1419 template<>
1420 const CassValue* CCassQuery::GetColumn(const string& name) const;
1421 template<>
1422 const CassValue* CCassQuery::GetColumn(const char* name) const;
1423 template<>
1424 string CCassQuery::GetColumnDef(int ifld) const;
1425 template<>
1426 string CCassQuery::GetColumnDef(const string& name) const;
1427 template<>
1428 string CCassQuery::GetColumnDef(const char* name) const;
1429 
1430 END_IDBLOB_SCOPE
1431 
1432 #endif // OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_
1433