1 /* $Id: cass_driver.hpp 619524 2020-11-05 19:41:53Z saprykin $ 2 * =========================================================================== 3 * 4 * PUBLIC DOMAIN NOTICE 5 * National Center for Biotechnology Information 6 * 7 * This software/database is a "United States Government Work" under the 8 * terms of the United States Copyright Act. It was written as part of 9 * the author's official duties as a United States Government employee and 10 * thus cannot be copyrighted. This software/database is freely available 11 * to the public for use. The National Library of Medicine and the U.S. 12 * Government have not placed any restriction on its use or reproduction. 13 * 14 * Although all reasonable efforts have been taken to ensure the accuracy 15 * and reliability of the software and data, the NLM and the U.S. 16 * Government do not and cannot warrant the performance or results that 17 * may be obtained by using this software or data. The NLM and the U.S. 18 * Government disclaim all warranties, express or implied, including 19 * warranties of performance, merchantability or fitness for any particular 20 * purpose. 21 * 22 * Please cite the author in any work or product based on this material. 23 * 24 * =========================================================================== 25 * 26 * Authors: Dmitri Dmitrienko 27 * 28 * File Description: 29 * 30 * Wrapper classes around cassandra "C"-API 31 * 32 */ 33 34 #ifndef OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_ 35 #define OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_ 36 37 #include <cassandra.h> 38 #include <corelib/ncbistre.hpp> 39 #include <corelib/ncbistr.hpp> 40 #include <corelib/ncbiexpt.hpp> 41 #include <corelib/ncbimtx.hpp> 42 43 #include <atomic> 44 #include <cassert> 45 #include <functional> 46 #include <limits> 47 #include <memory> 48 #include <set> 49 #include <sstream> 50 #include <string> 51 #include <tuple> 52 #include <unordered_map> 53 #include <utility> 54 #include <vector> 55 56 #include "IdCassScope.hpp" 57 #include "cass_exception.hpp" 58 59 #include <objtools/pubseq_gateway/impl/cassandra/cass_conv.hpp> 60 61 BEGIN_IDBLOB_SCOPE 62 USING_NCBI_SCOPE; 63 64 #define CASS_PREPARED_Q 1 65 #define CASS_DRV_TIMEOUT_MS 2000U 66 #define CASS_ERROR_SERVER_CONSISTENCY_NOT_ACHIEVED 0xC40063C0 67 68 #define ASYNC_RESULT_MAP(XX) \ 69 XX(ar_wait, "Wait") \ 70 XX(ar_done, "Done") \ 71 XX(ar_dataready, "DataReady") 72 73 typedef enum { 74 #define XX(name, _) name, 75 ASYNC_RESULT_MAP(XX) 76 #undef XX 77 ASYNC_RSLT_LAST_ENTRY 78 } async_rslt_t; 79 80 81 typedef enum { 82 LB_DCAWARE = 0, // the default, per CPP driver comments on 83 // cass_cluster_set_load_balance_dc_aware function 84 LB_ROUNDROBIN 85 } loadbalancing_policy_t; 86 87 88 typedef enum { 89 dtNull, 90 dtUnknown, 91 dtBoolean, 92 dtInteger, 93 dtFloat, 94 dtText, 95 dtTimestamp, 96 dtCounter, 97 dtUuid, 98 dtBlob, 99 dtNetwork, 100 dtCollection, 101 dtCustom 102 } CCassDataType; 103 104 class CCassQuery; 105 class CCassQueryCbRef; 106 class CCassConnection; 107 class CCassDataCallbackReceiver; 108 109 using TCassQueryOnDataCallback = void(*)(CCassQuery&, void *); 110 using TCassQueryOnExecuteCallback = void(*)(CCassQuery&, void *); 111 using TCassQueryOnData2Callback = void(*)(void *); 112 113 114 class CCassConnection: public std::enable_shared_from_this<CCassConnection> 115 { 116 // cass_cluster_set_request_timeout takes in ms while 117 // cass_future_wait_timed takes in mks 118 // to avoid overflow we have to adjust to mks 119 // ~35minutes 120 static const unsigned int kCassMaxTimeout = numeric_limits<unsigned int>::max() / 1000; 121 122 using TPreparedList = unordered_map<string, const CassPrepared *>; 123 124 friend class CCassQuery; 125 friend class CCassConnectionFactory; 126 127 void CloseSession(void); 128 129 protected: 130 CCassConnection(); 131 const CassPrepared * Prepare(const string & sql); 132 133 public: 134 using TTokenValue = int64_t; 135 using TTokenRanges = vector<pair<TTokenValue, TTokenValue>>; 136 137 CCassConnection(const CCassConnection&) = delete; 138 CCassConnection& operator=(const CCassConnection&) = delete; 139 140 static shared_ptr<CCassConnection> Create(void); 141 virtual ~CCassConnection(void); 142 void Connect(void); 143 void Reconnect(void); 144 void Close(void); 145 146 bool IsConnected(void); 147 int64_t GetActiveStatements(void) const; 148 CassMetrics GetMetrics(void); 149 void SetConnProp(const string & host, const string & user, const string & pwd, int16_t port = 0); 150 151 void SetLoadBalancing(loadbalancing_policy_t policy); 152 void SetTokenAware(bool value); 153 void SetLatencyAware(bool value); 154 155 void SetTimeouts(unsigned int ConnTimeoutMs); 156 void SetTimeouts(unsigned int ConnTimeoutMs, unsigned int QryTimeoutMs); 157 158 void SetFallBackRdConsistency(bool value); 159 bool GetFallBackRdConsistency(void) const; 160 161 void SetFallBackWrConsistency(unsigned int value); 162 unsigned int GetFallBackWrConsistency(void) const; 163 164 static void SetLogging(EDiagSev severity); 165 static void DisableLogging(void); 166 static void UpdateLogging(void); 167 168 unsigned int QryTimeout(void) const; 169 unsigned int QryTimeoutMks(void) const; 170 void SetRtLimits(unsigned int numThreadsIo, unsigned int numConnPerHost, unsigned int maxConnPerHost); 171 172 void SetKeepAlive(unsigned int keepalive); 173 174 void SetKeyspace(const string & keyspace); 175 string Keyspace(void) const; 176 177 void SetBlackList(const string & blacklist); 178 179 shared_ptr<CCassQuery> NewQuery(); 180 void GetTokenRanges(TTokenRanges &ranges); 181 // @deprecated 182 void getTokenRanges(TTokenRanges &ranges); 183 vector<string> GetPartitionKeyColumnNames(string const & keyspace, string const & table) const; 184 static string NewTimeUUID(); 185 static void Perform( 186 unsigned int optimeoutms, 187 const std::function<bool()>& PreLoopCB, 188 const std::function<void(const CCassandraException&)>& DbExceptCB, 189 const std::function<bool(bool)>& OpCB 190 ); 191 192 private: 193 string m_host; 194 int16_t m_port; 195 string m_user; 196 string m_pwd; 197 string m_blacklist; 198 string m_keyspace; 199 CassCluster * m_cluster; 200 CassSession * m_session; 201 unsigned int m_ctimeoutms; 202 unsigned int m_qtimeoutms; 203 unsigned int m_last_query_cnt; 204 loadbalancing_policy_t m_loadbalancing; 205 bool m_tokenaware; 206 bool m_latencyaware; 207 unsigned int m_numThreadsIo; 208 unsigned int m_numConnPerHost; 209 unsigned int m_maxConnPerHost; 210 unsigned int m_keepalive; 211 bool m_fallback_readconsistency; 212 unsigned int m_FallbackWriteConsistency; 213 static atomic<CassUuidGen*> m_CassUuidGen; 214 CSpinLock m_prepared_mux; 215 TPreparedList m_prepared; 216 atomic<int64_t> m_active_statements; 217 218 static bool m_LoggingInitialized; 219 static bool m_LoggingEnabled; 220 static EDiagSev m_LoggingLevel; 221 }; 222 223 class CCassPrm 224 { 225 static constexpr size_t kMaxVarcharDebugBytes = 10; 226 protected: 227 CassValueType m_type; 228 bool m_assigned; 229 230 union simpleval_t { 231 int8_t i8; 232 int16_t i16; 233 int32_t i32; 234 int64_t i64; simpleval_t()235 simpleval_t() { 236 memset(this, 0, sizeof(union simpleval_t)); 237 } 238 } m_simpleval; 239 240 string m_bytes; 241 unique_ptr<CassCollection, function<void(CassCollection*)> > m_collection; 242 unique_ptr<CassTuple, function<void(CassTuple*)> > m_tuple; 243 244 void Bind(CassStatement * statement, unsigned int idx); 245 246 public: CCassPrm()247 CCassPrm() : 248 m_simpleval(), m_collection(nullptr, nullptr) 249 { 250 Clear(); 251 } 252 253 CCassPrm(CCassPrm&&) = default; 254 ~CCassPrm()255 ~CCassPrm() 256 { 257 Clear(); 258 } 259 Clear(void)260 void Clear(void) 261 { 262 m_type = CASS_VALUE_TYPE_UNKNOWN; 263 m_assigned = false; 264 m_collection = nullptr; 265 m_bytes.clear(); 266 } 267 Assign(int8_t v)268 void Assign(int8_t v) 269 { 270 m_type = CASS_VALUE_TYPE_TINY_INT; 271 m_simpleval.i8 = v; 272 m_assigned = true; 273 } 274 Assign(int16_t v)275 void Assign(int16_t v) 276 { 277 m_type = CASS_VALUE_TYPE_SMALL_INT; 278 m_simpleval.i16 = v; 279 m_assigned = true; 280 } 281 Assign(int32_t v)282 void Assign(int32_t v) 283 { 284 m_type = CASS_VALUE_TYPE_INT; 285 m_simpleval.i32 = v; 286 m_assigned = true; 287 } 288 Assign(int64_t v)289 void Assign(int64_t v) 290 { 291 m_type = CASS_VALUE_TYPE_BIGINT; 292 m_simpleval.i64 = v; 293 m_assigned = true; 294 } 295 296 template<typename T, typename = typename enable_if<is_constructible<string, T>::value>::type > 297 // string, string&, string&&, char * Assign(T v)298 void Assign(T v) { 299 m_type = CASS_VALUE_TYPE_VARCHAR; 300 m_bytes = string(v); 301 m_assigned = true; 302 } 303 Assign(const unsigned char * buf,size_t len)304 void Assign(const unsigned char * buf, size_t len) 305 { 306 m_type = CASS_VALUE_TYPE_BLOB; 307 m_bytes = string(reinterpret_cast<const char *>(buf), len); 308 m_assigned = true; 309 } 310 311 template<typename K, typename V> Assign(const map<K,V> & v)312 void Assign(const map<K, V> & v) 313 { 314 m_type = CASS_VALUE_TYPE_MAP; 315 m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > ( 316 cass_collection_new(CASS_COLLECTION_TYPE_MAP, v.size() * 2), 317 [](CassCollection* coll) { 318 cass_collection_free(coll); 319 } 320 ); 321 322 for (const auto & it : v) { 323 Convert::ValueToCassCollection(it.first, m_collection.get()); 324 Convert::ValueToCassCollection(it.second, m_collection.get()); 325 } 326 m_assigned = true; 327 } 328 329 template<typename I> AssignList(I begin,I end,size_t sz)330 void AssignList(I begin, I end, size_t sz) 331 { 332 m_type = CASS_VALUE_TYPE_LIST; 333 m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > ( 334 cass_collection_new(CASS_COLLECTION_TYPE_LIST, sz), 335 [](CassCollection* coll) { 336 cass_collection_free(coll); 337 } 338 ); 339 340 for (; begin != end; ++begin) { 341 Convert::ValueToCassCollection(*begin, m_collection.get()); 342 } 343 m_assigned = true; 344 } 345 346 template<typename I> AssignSet(I begin,I end,size_t sz)347 void AssignSet(I begin, I end, size_t sz) 348 { 349 m_type = CASS_VALUE_TYPE_SET; 350 m_collection = unique_ptr<CassCollection, function<void(CassCollection*)> > ( 351 cass_collection_new(CASS_COLLECTION_TYPE_SET, sz), 352 [](CassCollection* coll) { 353 cass_collection_free(coll); 354 } 355 ); 356 357 for (; begin != end; ++begin) { 358 Convert::ValueToCassCollection(*begin, m_collection.get()); 359 } 360 m_assigned = true; 361 } 362 363 template<typename I> AssignSet(const set<I> & v)364 void AssignSet(const set<I>& v) { 365 AssignSet<typename set<I>::const_iterator>(v.cbegin(), v.cend(), v.size()); 366 } 367 368 template<typename ...T> Assign(const tuple<T...> & t)369 void Assign(const tuple<T...>& t) 370 { 371 m_type = CASS_VALUE_TYPE_TUPLE; 372 m_assigned = false; 373 m_tuple = unique_ptr<CassTuple, function<void(CassTuple*)> >( 374 cass_tuple_new(tuple_size< tuple<T...> >::value), 375 [](CassTuple* coll){ 376 cass_tuple_free(coll); 377 } 378 ); 379 Convert::ValueToCassTuple(t, m_tuple.get()); 380 m_assigned = true; 381 } 382 AssignNull(void)383 void AssignNull(void) 384 { 385 m_type = CASS_VALUE_TYPE_UNKNOWN; 386 m_assigned = true; 387 } 388 IsAssigned(void) const389 bool IsAssigned(void) const 390 { 391 return m_assigned; 392 } 393 GetType() const394 CassValueType GetType() const 395 { 396 return m_type; 397 } 398 AsInt8(void) const399 int32_t AsInt8(void) const 400 { 401 switch (m_type) { 402 case CASS_VALUE_TYPE_TINY_INT: 403 return m_simpleval.i8; 404 case CASS_VALUE_TYPE_VARCHAR: { 405 int8_t i8; 406 if (NStr::StringToNumeric(m_bytes, &i8)) 407 return i8; 408 else 409 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int8 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 410 } 411 // no break 412 default: 413 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int8 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 414 } 415 } 416 AsInt16(void) const417 int32_t AsInt16(void) const 418 { 419 switch (m_type) { 420 case CASS_VALUE_TYPE_TINY_INT: 421 return m_simpleval.i8; 422 case CASS_VALUE_TYPE_SMALL_INT: 423 return m_simpleval.i16; 424 case CASS_VALUE_TYPE_VARCHAR: { 425 int16_t i16; 426 if (NStr::StringToNumeric(m_bytes, &i16)) 427 return i16; 428 else 429 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int16 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 430 } 431 // no break 432 default: 433 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int16 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 434 } 435 } 436 AsInt32(void) const437 int32_t AsInt32(void) const 438 { 439 switch (m_type) { 440 case CASS_VALUE_TYPE_TINY_INT: 441 return m_simpleval.i8; 442 case CASS_VALUE_TYPE_SMALL_INT: 443 return m_simpleval.i16; 444 case CASS_VALUE_TYPE_INT: 445 return m_simpleval.i32; 446 case CASS_VALUE_TYPE_BIGINT: 447 return m_simpleval.i64; 448 case CASS_VALUE_TYPE_VARCHAR: { 449 int32_t i32; 450 if (NStr::StringToNumeric(m_bytes, &i32)) 451 return i32; 452 else 453 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int32 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 454 } 455 // no break 456 default: 457 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int32 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 458 } 459 } 460 AsInt64(void) const461 int64_t AsInt64(void) const 462 { 463 switch (m_type) { 464 case CASS_VALUE_TYPE_TINY_INT: 465 return m_simpleval.i8; 466 case CASS_VALUE_TYPE_SMALL_INT: 467 return m_simpleval.i16; 468 case CASS_VALUE_TYPE_INT: 469 return m_simpleval.i32; 470 case CASS_VALUE_TYPE_BIGINT: 471 return m_simpleval.i64; 472 case CASS_VALUE_TYPE_VARCHAR: { 473 int64_t i64; 474 if (NStr::StringToNumeric(m_bytes, &i64)) 475 return i64; 476 else 477 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int64 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 478 } 479 // no break 480 default: 481 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to int64 (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 482 } 483 } 484 AsString(string & value) const485 void AsString(string & value) const 486 { 487 switch (m_type) { 488 case CASS_VALUE_TYPE_TINY_INT: 489 value = NStr::NumericToString(m_simpleval.i8); 490 break; 491 case CASS_VALUE_TYPE_SMALL_INT: 492 value = NStr::NumericToString(m_simpleval.i16); 493 break; 494 case CASS_VALUE_TYPE_INT: 495 value = NStr::NumericToString(m_simpleval.i32); 496 break; 497 case CASS_VALUE_TYPE_BIGINT: 498 value = NStr::NumericToString(m_simpleval.i64); 499 break; 500 case CASS_VALUE_TYPE_VARCHAR: 501 value = m_bytes; 502 break; 503 case CASS_VALUE_TYPE_SET: 504 default: 505 RAISE_DB_ERROR(eBindFailed, string("Can't convert Param to string (type=") + NStr::NumericToString(static_cast<int>(m_type)) + ")"); 506 } 507 } 508 AsString() const509 string AsString() const 510 { 511 string rv; 512 AsString(rv); 513 return rv; 514 } 515 AsStringForDebug() const516 string AsStringForDebug() const 517 { 518 switch (m_type) { 519 case CASS_VALUE_TYPE_TINY_INT: 520 case CASS_VALUE_TYPE_SMALL_INT: 521 case CASS_VALUE_TYPE_INT: 522 case CASS_VALUE_TYPE_BIGINT: 523 return AsString(); 524 case CASS_VALUE_TYPE_SET: 525 return "set(...)"; 526 case CASS_VALUE_TYPE_MAP: 527 return "map(...)"; 528 case CASS_VALUE_TYPE_LIST: 529 return "list(...)"; 530 case CASS_VALUE_TYPE_TUPLE: 531 return "tuple(...)"; 532 case CASS_VALUE_TYPE_UNKNOWN: 533 return "Null"; 534 case CASS_VALUE_TYPE_VARCHAR: 535 if (m_bytes.size() > kMaxVarcharDebugBytes) { 536 return "'" + m_bytes.substr(0, kMaxVarcharDebugBytes) + "...'"; 537 } else { 538 return "'" + m_bytes + "'"; 539 } 540 case CASS_VALUE_TYPE_BLOB: { 541 stringstream s; 542 s << "'0x"; 543 for (size_t i = 0; i < m_bytes.size() && i < kMaxVarcharDebugBytes; ++i) { 544 s << hex << setfill('0') << setw(2) << static_cast<int16_t>(m_bytes[i]); 545 } 546 if (m_bytes.size() > kMaxVarcharDebugBytes) { 547 s << "...'"; 548 } else { 549 s << "'"; 550 } 551 return s.str(); 552 } 553 default: 554 return "AsStringForDebugNotImplemented(t:" + to_string(m_type) + ")"; 555 } 556 } 557 558 friend class CCassQuery; 559 }; 560 561 562 class CCassParams: public vector<CCassPrm> 563 {}; 564 565 class CCassDataCallbackReceiver { 566 public: 567 virtual ~CCassDataCallbackReceiver() = default; 568 virtual void OnData() = 0; 569 }; 570 571 class CCassQueryCbRef: public std::enable_shared_from_this<CCassQueryCbRef> 572 { 573 public: CCassQueryCbRef(shared_ptr<CCassQuery> query)574 explicit CCassQueryCbRef(shared_ptr<CCassQuery> query) 575 : m_ondata(nullptr) 576 , m_data(nullptr) 577 , m_ondata2(nullptr) 578 , m_data2(nullptr) 579 , m_query(query) 580 { 581 } 582 ~CCassQueryCbRef()583 virtual ~CCassQueryCbRef() 584 { 585 } 586 Attach(void (* ondata)(CCassQuery &,void *),void * data,void (* ondata2)(void *),void * data2,shared_ptr<CCassDataCallbackReceiver> ondata3)587 void Attach(void (*ondata)(CCassQuery&, void*), 588 void* data, void (*ondata2)(void*), void* data2, 589 shared_ptr<CCassDataCallbackReceiver> ondata3) 590 { 591 m_self = shared_from_this(); 592 m_data = data; 593 m_ondata = ondata; 594 595 m_data2 = data2; 596 m_ondata2 = ondata2; 597 598 m_ondata3 = ondata3; 599 } 600 Detach(bool remove_self_ref=false)601 void Detach(bool remove_self_ref = false) 602 { 603 if (remove_self_ref) { 604 m_self = nullptr; 605 } 606 } 607 s_OnFutureCb(CassFuture * future,void * data)608 static void s_OnFutureCb(CassFuture* future, void* data) 609 { 610 try { 611 shared_ptr<CCassQueryCbRef> self(static_cast<CCassQueryCbRef*>(data)->shared_from_this()); 612 assert(self->m_self); 613 self->m_self = nullptr; 614 615 auto query = self->m_query.lock(); 616 if (query != nullptr) { 617 if (self->m_ondata) { 618 self->m_ondata(*query.get(), self->m_data); 619 } 620 if (self->m_ondata2) { 621 self->m_ondata2(self->m_data2); 622 } 623 auto ondata3 = self->m_ondata3.lock(); 624 if (ondata3) { 625 ondata3->OnData(); 626 } 627 } 628 } catch (const CException& e) { 629 ERR_POST("CException caught! Message: " << e.GetMsg()); 630 abort(); 631 } 632 catch (const exception& e) { 633 ERR_POST("exception caught! Message: " << e.what()); 634 abort(); 635 } 636 } 637 638 private: 639 TCassQueryOnDataCallback m_ondata; 640 void* m_data; 641 642 TCassQueryOnData2Callback m_ondata2; 643 void* m_data2; 644 645 weak_ptr<CCassDataCallbackReceiver> m_ondata3; 646 647 weak_ptr<CCassQuery> m_query; 648 shared_ptr<CCassQueryCbRef> m_self; 649 }; 650 651 class CCassQuery: public std::enable_shared_from_this<CCassQuery> 652 { 653 private: 654 CCassQuery(const CCassQuery&) = delete; 655 CCassQuery& operator=(const CCassQuery&) = delete; 656 657 friend class CCassConnection; 658 friend class CCassQueryCbRef; 659 660 private: CheckParamAssigned(int iprm) const661 void CheckParamAssigned(int iprm) const 662 { 663 if (iprm < 0 || iprm >= static_cast<int64_t>(m_params.size())) { 664 RAISE_DB_ERROR(eBindFailed, "Param index #" + to_string(iprm) + " is out of range"); 665 } 666 if (!m_params[iprm].IsAssigned()) { 667 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Param #" + to_string(iprm) + " is not assigned"); 668 } 669 } 670 CheckParamExists(int iprm) const671 void CheckParamExists(int iprm) const 672 { 673 if (iprm < 0 || (unsigned int)iprm >= m_params.size()) { 674 RAISE_DB_ERROR(eBindFailed, "Param index #" + to_string(iprm) + " is out of range"); 675 } 676 } 677 678 shared_ptr<CCassConnection> m_connection; 679 unsigned int m_qtimeoutms; 680 int64_t m_futuretime; 681 CassFuture * m_future; 682 CassBatch * m_batch; 683 CassStatement * m_statement; 684 const CassResult * m_result; 685 CassIterator * m_iterator; 686 const CassRow * m_row; 687 unsigned int m_page_size; 688 bool m_EOF; 689 bool m_page_start; 690 CCassParams m_params; 691 string m_sql; 692 bool m_results_expected; 693 bool m_async; 694 bool m_allow_prepare; 695 bool m_is_prepared; 696 CassConsistency m_serial_consistency; 697 698 shared_ptr<CCassQueryCbRef> m_cb_ref; 699 700 TCassQueryOnDataCallback m_ondata; 701 void* m_ondata_data; 702 703 TCassQueryOnData2Callback m_ondata2; 704 void* m_ondata2_data; 705 706 weak_ptr<CCassDataCallbackReceiver> m_ondata3; 707 708 TCassQueryOnExecuteCallback m_onexecute; 709 void* m_onexecute_data; 710 711 async_rslt_t Wait(unsigned int timeoutmks); 712 void Bind(void); 713 714 template<typename F> GetColumn(F ifld) const715 const CassValue* GetColumn(F ifld) const 716 { 717 static_assert(sizeof(F) == 0, "Columns can be accessed by either index or name"); 718 return CNotImplemented<F>::GetColumn_works_by_name_and_by_index(); 719 } 720 721 template<typename F> GetColumnDef(F ifld) const722 string GetColumnDef(F ifld) const 723 { 724 static_assert(sizeof(F) == 0, "Columns can be accessed by either index or name"); 725 return CNotImplemented<F>::GetColumn_works_by_name_and_by_index(); 726 } 727 728 void GetFuture(); 729 void ProcessFutureResult(); 730 void SetupOnDataCallback(); 731 void InternalClose(bool closebatch); 732 void SetEOF(bool Value); 733 734 template<class T> 735 class CNotImplemented 736 {}; 737 738 protected: CCassQuery(const shared_ptr<CCassConnection> & connection)739 explicit CCassQuery(const shared_ptr<CCassConnection>& connection) : 740 m_connection(connection), 741 m_qtimeoutms(0), 742 m_futuretime(0), 743 m_future(nullptr), 744 m_batch(nullptr), 745 m_statement(nullptr), 746 m_result(nullptr), 747 m_iterator(nullptr), 748 m_row(nullptr), 749 m_page_size(0), 750 m_EOF(false), 751 m_page_start(false), 752 m_results_expected(false), 753 m_async(false), 754 m_allow_prepare(CASS_PREPARED_Q), 755 m_is_prepared(false), 756 m_serial_consistency(CASS_CONSISTENCY_ANY), 757 m_ondata(nullptr), 758 m_ondata_data(nullptr), 759 m_ondata2(nullptr), 760 m_ondata2_data(nullptr), 761 m_onexecute(nullptr), 762 m_onexecute_data(nullptr) 763 { 764 if (!m_connection) { 765 RAISE_DB_ERROR(eFatal, "Cassandra connection is not established"); 766 } 767 } 768 769 public: 770 virtual ~CCassQuery(); 771 virtual void Close(void); 772 773 void SetTimeout(); 774 void SetTimeout(unsigned int t); 775 776 unsigned int Timeout(void) const; 777 778 bool IsReady(void); 779 void NewBatch(void); 780 async_rslt_t RunBatch(); 781 782 void SetSQL(const string& sql, unsigned int PrmCount); 783 784 /* returns resultset */ 785 void Query(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM, 786 bool run_async = false, bool allow_prepare = CASS_PREPARED_Q, 787 unsigned int page_size = DEFAULT_PAGE_SIZE); 788 789 void RestartQuery(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM); 790 791 /* returns no resultset */ 792 void Execute(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM, 793 bool run_async = false, bool allow_prepare = CASS_PREPARED_Q); 794 void RestartExecute(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM); 795 void Restart(CassConsistency c = CASS_CONSISTENCY_LOCAL_QUORUM); 796 797 void SetSerialConsistency(CassConsistency c); 798 IsActive(void) const799 bool IsActive(void) const 800 { 801 return (m_row != nullptr) || (m_statement != nullptr) || (m_batch != nullptr); 802 } 803 804 virtual async_rslt_t WaitAsync(unsigned int timeoutmks); 805 string GetSQL() const; 806 virtual string ToString(void) const; 807 808 virtual bool IsEOF(void) const; 809 virtual bool IsAsync(void) const; 810 811 void BindNull(int iprm); 812 void BindInt8(int iprm, int8_t value); 813 void BindInt16(int iprm, int16_t value); 814 void BindInt32(int iprm, int32_t value); 815 void BindInt64(int iprm, int64_t value); 816 void BindStr(int iprm, const string& value); 817 void BindStr(int iprm, const char* value); 818 void BindBytes(int iprm, const unsigned char* buf, size_t len); 819 820 template<typename K, typename V> BindMap(int iprm,const map<K,V> & value)821 void BindMap(int iprm, const map<K, V>& value) 822 { 823 CheckParamExists(iprm); 824 m_params[iprm].Assign<K,V>(value); 825 } 826 827 template<typename I> BindList(int iprm,I begin,I end,size_t sz)828 void BindList(int iprm, I begin, I end, size_t sz) 829 { 830 CheckParamExists(iprm); 831 m_params[iprm].AssignList(begin, end, sz); 832 } 833 834 template<typename I> BindSet(int iprm,I begin,I end,size_t sz)835 void BindSet(int iprm, I begin, I end, size_t sz) 836 { 837 CheckParamExists(iprm); 838 m_params[iprm].AssignSet<I>(begin, end, sz); 839 } 840 841 template<typename I> BindSet(int iprm,const set<I> & v)842 void BindSet(int iprm, const set<I>& v) 843 { 844 BindSet(iprm, v.begin(), v.end(), v.size()); 845 } 846 847 template<typename ...T> BindTuple(int iprm,const tuple<T...> & value)848 void BindTuple(int iprm, const tuple<T...>& value) 849 { 850 CheckParamExists(iprm); 851 m_params[iprm].Assign(value); 852 } 853 ParamCount(void) const854 size_t ParamCount(void) const 855 { 856 return m_params.size(); 857 } 858 859 CassValueType ParamType(int iprm) const; 860 int32_t ParamAsInt32(int iprm); 861 int64_t ParamAsInt64(int iprm); 862 string ParamAsStr(int iprm) const; 863 void ParamAsStr(int iprm, string& value) const; 864 string ParamAsStrForDebug(int iprm) const; 865 866 async_rslt_t NextRow(); 867 868 template <typename F = int> FieldIsNull(F ifld) const869 bool FieldIsNull(F ifld) const 870 { 871 const CassValue* clm = GetColumn(ifld); 872 return !clm || cass_value_is_null(clm); 873 } 874 875 template <typename F = int> FieldType(F ifld) const876 CCassDataType FieldType(F ifld) const 877 { 878 if (FieldIsNull(ifld)) { 879 return dtNull; 880 } 881 882 const CassValue* clm = GetColumn(ifld); 883 CassValueType type = cass_value_type(clm); 884 switch (type) { 885 case CASS_VALUE_TYPE_CUSTOM: 886 return dtCustom; 887 case CASS_VALUE_TYPE_TEXT: 888 case CASS_VALUE_TYPE_ASCII: 889 case CASS_VALUE_TYPE_VARCHAR: 890 return dtText; 891 case CASS_VALUE_TYPE_VARINT: 892 case CASS_VALUE_TYPE_DECIMAL: 893 case CASS_VALUE_TYPE_TINY_INT: 894 case CASS_VALUE_TYPE_SMALL_INT: 895 case CASS_VALUE_TYPE_INT: 896 case CASS_VALUE_TYPE_BIGINT: 897 return dtInteger; 898 case CASS_VALUE_TYPE_DOUBLE: 899 case CASS_VALUE_TYPE_FLOAT: 900 return dtFloat; 901 902 case CASS_VALUE_TYPE_BLOB: 903 return dtBlob; 904 case CASS_VALUE_TYPE_BOOLEAN: 905 return dtBoolean; 906 case CASS_VALUE_TYPE_TIMESTAMP: 907 return dtTimestamp; 908 case CASS_VALUE_TYPE_COUNTER: 909 return dtCounter; 910 case CASS_VALUE_TYPE_UUID: 911 case CASS_VALUE_TYPE_TIMEUUID: 912 return dtUuid; 913 case CASS_VALUE_TYPE_LIST: 914 case CASS_VALUE_TYPE_MAP: 915 case CASS_VALUE_TYPE_SET: 916 return dtCollection; 917 case CASS_VALUE_TYPE_INET: 918 return dtNetwork; 919 case CASS_VALUE_TYPE_UNKNOWN: 920 default: 921 return dtUnknown; 922 } 923 } 924 925 template <typename F = int> FieldGetBoolValue(F ifld) const926 bool FieldGetBoolValue(F ifld) const 927 { 928 bool rv; 929 try { 930 Convert::CassValueConvert<bool>(GetColumn(ifld), rv); 931 } 932 catch (CCassandraException& ex) { 933 ex.AddToMessage(GetColumnDef(ifld)); 934 throw; 935 } 936 return rv; 937 } 938 939 template <typename F = int> FieldGetBoolValue(F ifld,bool _default) const940 bool FieldGetBoolValue(F ifld, bool _default) const 941 { 942 bool rv; 943 Convert::CassValueConvertDef<bool>(GetColumn(ifld), _default, rv); 944 return rv; 945 } 946 947 template <typename F = int> FieldGetInt8Value(F ifld) const948 int8_t FieldGetInt8Value(F ifld) const 949 { 950 int8_t rv; 951 try { 952 Convert::CassValueConvert<int8_t>(GetColumn(ifld), rv); 953 } 954 catch (CCassandraException& ex) { 955 ex.AddToMessage(GetColumnDef(ifld)); 956 throw; 957 } 958 return rv; 959 } 960 961 template <typename F = int> FieldGetInt8Value(F ifld,int8_t _default) const962 int8_t FieldGetInt8Value(F ifld, int8_t _default) const 963 { 964 int8_t rv; 965 Convert::CassValueConvertDef<int8_t>(GetColumn(ifld), _default, rv); 966 return rv; 967 } 968 969 template <typename F = int> FieldGetInt16Value(F ifld) const970 int16_t FieldGetInt16Value(F ifld) const 971 { 972 int16_t rv; 973 try { 974 Convert::CassValueConvert<int16_t>(GetColumn(ifld), rv); 975 } 976 catch (CCassandraException& ex) { 977 ex.AddToMessage(GetColumnDef(ifld)); 978 throw; 979 } 980 return rv; 981 } 982 983 template <typename F = int> FieldGetInt16Value(F ifld,int16_t _default) const984 int16_t FieldGetInt16Value(F ifld, int16_t _default) const 985 { 986 int16_t rv; 987 Convert::CassValueConvertDef<int16_t>(GetColumn(ifld), _default, rv); 988 return rv; 989 } 990 991 template <typename F = int> FieldGetInt32Value(F ifld) const992 int32_t FieldGetInt32Value(F ifld) const 993 { 994 int32_t rv; 995 try { 996 Convert::CassValueConvert<int32_t>(GetColumn(ifld), rv); 997 } 998 catch (CCassandraException& ex) { 999 ex.AddToMessage(GetColumnDef(ifld)); 1000 throw; 1001 } 1002 return rv; 1003 } 1004 1005 template <typename F = int> FieldGetInt32Value(F ifld,int32_t _default) const1006 int32_t FieldGetInt32Value(F ifld, int32_t _default) const 1007 { 1008 int32_t rv; 1009 Convert::CassValueConvertDef<int32_t>(GetColumn(ifld), _default, rv); 1010 return rv; 1011 } 1012 1013 template <typename F = int> FieldGetInt64Value(F ifld) const1014 int64_t FieldGetInt64Value(F ifld) const 1015 { 1016 int64_t rv; 1017 try { 1018 Convert::CassValueConvert<int64_t>(GetColumn(ifld), rv); 1019 } 1020 catch (CCassandraException& ex) { 1021 ex.AddToMessage(GetColumnDef(ifld)); 1022 throw; 1023 } 1024 return rv; 1025 } 1026 1027 template <typename F = int> FieldGetInt64Value(F ifld,int64_t _default) const1028 int64_t FieldGetInt64Value(F ifld, int64_t _default) const 1029 { 1030 int64_t rv; 1031 Convert::CassValueConvertDef<int64_t>(GetColumn(ifld), _default, rv); 1032 return rv; 1033 } 1034 1035 template <typename F = int> FieldGetFloatValue(F ifld) const1036 double FieldGetFloatValue(F ifld) const 1037 { 1038 double rv; 1039 try { 1040 Convert::CassValueConvert<double>(GetColumn(ifld), rv); 1041 } 1042 catch (CCassandraException& ex) { 1043 ex.AddToMessage(GetColumnDef(ifld)); 1044 throw; 1045 } 1046 1047 return rv; 1048 } 1049 1050 template <typename F = int> FieldGetFloatValue(F ifld,double _default) const1051 double FieldGetFloatValue(F ifld, double _default) const 1052 { 1053 double rv; 1054 Convert::CassValueConvertDef<double>(GetColumn(ifld), _default, rv); 1055 return rv; 1056 } 1057 1058 template <typename F = int> FieldGetStrValue(F ifld) const1059 string FieldGetStrValue(F ifld) const 1060 { 1061 string rv; 1062 try { 1063 Convert::CassValueConvert<string>(GetColumn(ifld), rv); 1064 } 1065 catch (CCassandraException& ex) { 1066 ex.AddToMessage(GetColumnDef(ifld)); 1067 throw; 1068 } 1069 return rv; 1070 } 1071 1072 template <typename F = int> FieldGetStrValueDef(F ifld,const string & _default) const1073 string FieldGetStrValueDef(F ifld, const string& _default) const 1074 { 1075 string rv; 1076 Convert::CassValueConvertDef<string>(GetColumn(ifld), _default, rv); 1077 return rv; 1078 } 1079 1080 template <typename I, typename F = int> FieldGetContainerValue(F ifld,I insert_iterator) const1081 void FieldGetContainerValue(F ifld, I insert_iterator) const 1082 { 1083 using TValueType = typename I::container_type::value_type; 1084 const CassValue * clm = GetColumn(ifld); 1085 CassValueType type = cass_value_type(clm); 1086 1087 switch (type) { 1088 case CASS_COLLECTION_TYPE_LIST: 1089 case CASS_COLLECTION_TYPE_SET: 1090 { 1091 if (!cass_value_is_null(clm)) { 1092 unique_ptr<CassIterator, function<void(CassIterator*)> > items_iterator_ptr( 1093 cass_iterator_from_collection(clm), 1094 [](CassIterator* itr) { 1095 cass_iterator_free(itr); 1096 } 1097 ); 1098 while (cass_iterator_next(items_iterator_ptr.get())) { 1099 const CassValue* value = cass_iterator_get_value(items_iterator_ptr.get()); 1100 if (!value) { 1101 RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed"); 1102 } 1103 1104 TValueType v; 1105 try { 1106 Convert::CassValueConvert(value, v); 1107 } 1108 catch (CCassandraException& ex) { 1109 ex.AddToMessage(GetColumnDef(ifld)); 1110 throw; 1111 } 1112 insert_iterator++ = move(v); 1113 } 1114 } 1115 } 1116 break; 1117 default: 1118 if (!cass_value_is_null(clm)) { 1119 RAISE_DB_ERROR(eFetchFailed, 1120 "Failed to convert Cassandra value to collection: unsupported data type (Cassandra type - " + 1121 to_string(static_cast<int>(type)) + ")" 1122 ); 1123 } 1124 } 1125 } 1126 1127 template <typename F = int> FieldGetStrValue(F ifld,string & value) const1128 void FieldGetStrValue(F ifld, string & value) const 1129 { 1130 value = FieldGetStrValue(ifld); 1131 } 1132 1133 template <typename F = int> FieldGetStrValueDef(F ifld,string & value,const string & _default) const1134 void FieldGetStrValueDef(F ifld, string & value, const string & _default) const 1135 { 1136 value = FieldGetStrValueDef(ifld, _default); 1137 } 1138 1139 template<typename T, typename F = int> 1140 T FieldGetTupleValue(F ifld) const 1141 { 1142 const CassValue * clm = GetColumn(ifld); 1143 if (!cass_value_is_null(clm)) { 1144 T t; 1145 try { 1146 Convert::CassValueConvert(clm, t); 1147 } 1148 catch (CCassandraException& ex) { 1149 ex.AddToMessage(GetColumnDef(ifld)); 1150 throw; 1151 } 1152 return t; 1153 } 1154 return T(); 1155 } 1156 1157 template<typename T, typename F = int> FieldGetSetValues(F ifld,std::vector<T> & values) const1158 void FieldGetSetValues(F ifld, std::vector<T>& values) const 1159 { 1160 const CassValue * clm = GetColumn(ifld); 1161 CassValueType type = cass_value_type(clm); 1162 switch (type) { 1163 case CASS_VALUE_TYPE_SET: { 1164 unique_ptr<CassIterator, function<void(CassIterator*)> > items_iterator_ptr( 1165 cass_iterator_from_collection(clm), 1166 [](CassIterator* itr) { 1167 cass_iterator_free(itr); 1168 } 1169 ); 1170 while (cass_iterator_next(items_iterator_ptr.get())) { 1171 T v; 1172 const CassValue* value = cass_iterator_get_value(items_iterator_ptr.get()); 1173 if (!value) { 1174 RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed"); 1175 } 1176 try { 1177 Convert::CassValueConvert(value, v); 1178 } 1179 catch (CCassandraException& ex) { 1180 ex.AddToMessage(GetColumnDef(ifld)); 1181 throw; 1182 } 1183 values.push_back(move(v)); 1184 } 1185 break; 1186 } 1187 default: 1188 RAISE_DB_ERROR(eFetchFailed, 1189 "Failed to convert Cassandra value to set<>: unsupported data type (Cassandra type - " + 1190 to_string(static_cast<int>(type)) + ")" 1191 ); 1192 } 1193 } 1194 1195 template<typename T, typename F = int> FieldGetSetValues(F ifld,std::set<T> & values) const1196 void FieldGetSetValues(F ifld, std::set<T>& values) const 1197 { 1198 const CassValue * clm = GetColumn(ifld); 1199 CassValueType type = cass_value_type(clm); 1200 switch (type) { 1201 case CASS_VALUE_TYPE_SET: { 1202 unique_ptr<CassIterator, function<void(CassIterator*)>> items_iterator( 1203 cass_iterator_from_collection(clm), 1204 [](CassIterator* itr) { 1205 cass_iterator_free(itr); 1206 } 1207 ); 1208 if (items_iterator != nullptr) { 1209 while (cass_iterator_next(items_iterator.get())) { 1210 T v; 1211 const CassValue* value = cass_iterator_get_value(items_iterator.get()); 1212 if (!value) { 1213 RAISE_DB_ERROR(eSeqFailed, "cass iterator fetch failed"); 1214 } 1215 try { 1216 Convert::CassValueConvert(value, v); 1217 } 1218 catch (CCassandraException& ex) { 1219 ex.AddToMessage(GetColumnDef(ifld)); 1220 throw; 1221 } 1222 values.insert(move(v)); 1223 } 1224 } 1225 break; 1226 } 1227 default: 1228 RAISE_DB_ERROR(eFetchFailed, 1229 "Failed to convert Cassandra value to set<>: unsupported data type (Cassandra type - " + 1230 to_string(static_cast<int>(type)) + ")" 1231 ); 1232 } 1233 } 1234 1235 template <typename K, typename V, typename F = int> FieldGetMapValue(F ifld,map<K,V> & result) const1236 void FieldGetMapValue(F ifld, map<K, V>& result) const 1237 { 1238 const CassValue * clm = GetColumn(ifld); 1239 CassValueType type = cass_value_type(clm); 1240 switch (type) { 1241 case CASS_VALUE_TYPE_MAP: { 1242 result.clear(); 1243 unique_ptr<CassIterator, function<void(CassIterator*)>> items_iterator( 1244 cass_iterator_from_map(clm), 1245 [](CassIterator* itr) { 1246 cass_iterator_free(itr); 1247 } 1248 ); 1249 if (items_iterator != nullptr) { 1250 while (cass_iterator_next(items_iterator.get())) { 1251 const CassValue* key = cass_iterator_get_map_key(items_iterator.get()); 1252 const CassValue* val = cass_iterator_get_map_value(items_iterator.get()); 1253 K k; 1254 try { 1255 Convert::CassValueConvert(key, k); 1256 } 1257 catch (CCassandraException& ex) { 1258 ex.AddToMessage(GetColumnDef(ifld)); 1259 throw; 1260 } 1261 V v; 1262 try { 1263 Convert::CassValueConvert(val, v); 1264 } 1265 catch (CCassandraException& ex) { 1266 ex.AddToMessage(GetColumnDef(ifld)); 1267 throw; 1268 } 1269 result.emplace(move(k), move(v)); 1270 } 1271 } 1272 break; 1273 } 1274 default: 1275 RAISE_DB_ERROR(eFetchFailed, 1276 "Failed to convert Cassandra value to map<>: unsupported data type (Cassandra type - " + 1277 to_string(static_cast<int>(type)) + ")" 1278 ); 1279 } 1280 } 1281 1282 template<typename F = int> FieldGetBlobValue(F ifld,unsigned char * buf,size_t len) const1283 size_t FieldGetBlobValue(F ifld, unsigned char* buf, size_t len) const 1284 { 1285 const CassValue * clm = GetColumn(ifld); 1286 const unsigned char * output = nullptr; 1287 size_t outlen = 0; 1288 CassError err = cass_value_get_bytes(clm, &output, &outlen); 1289 if (err != CASS_OK) { 1290 RAISE_CASS_ERROR(err, eFetchFailed, "Failed to fetch blob data:"); 1291 } 1292 if (len < outlen) { 1293 RAISE_DB_ERROR(eFetchFailed, "Failed to fetch blob data: insufficient buffer provided"); 1294 } 1295 memcpy(buf, output, outlen); 1296 return outlen; 1297 } 1298 1299 template<typename F = int> FieldGetBlobRaw(F ifld,const unsigned char ** rawbuf) const1300 size_t FieldGetBlobRaw(F ifld, const unsigned char** rawbuf) const 1301 { 1302 if (rawbuf) { 1303 *rawbuf = nullptr; 1304 } 1305 1306 const CassValue * clm = GetColumn(ifld); 1307 const unsigned char * output = nullptr; 1308 size_t outlen = 0; 1309 CassError rc = cass_value_get_bytes(clm, &output, &outlen); 1310 if (rc == CASS_ERROR_LIB_NULL_VALUE) { 1311 return 0; 1312 } 1313 1314 if (rc != CASS_OK) { 1315 RAISE_CASS_ERROR(rc, eFetchFailed, "Failed to fetch blob data:"); 1316 } 1317 if (rawbuf) { 1318 *rawbuf = output; 1319 } 1320 return outlen; 1321 } 1322 1323 template<typename F = int> FieldGetBlobSize(F ifld) const1324 size_t FieldGetBlobSize(F ifld) const 1325 { 1326 const CassValue * clm = GetColumn(ifld); 1327 const unsigned char * output = nullptr; 1328 size_t outlen = 0; 1329 CassError rc = cass_value_get_bytes(clm, &output, &outlen); 1330 if (rc != CASS_ERROR_LIB_NULL_VALUE && rc != CASS_OK) { 1331 RAISE_CASS_ERROR(rc, eFetchFailed, "Failed to fetch blob data:"); 1332 } 1333 return outlen; 1334 } 1335 GetConnection(void)1336 shared_ptr<CCassConnection> GetConnection(void) 1337 { 1338 return m_connection; 1339 } 1340 1341 /** 1342 * Use SatOnData3. This one is unsafe and will be removed 1343 */ SetOnData(TCassQueryOnDataCallback cb,void * data)1344 NCBI_DEPRECATED void SetOnData(TCassQueryOnDataCallback cb, void* data) 1345 { 1346 if (m_ondata == cb && m_ondata_data == data) { 1347 return; 1348 } 1349 if (m_future) { 1350 if (m_ondata) { 1351 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned"); 1352 } 1353 if (!cb && m_future) { 1354 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset"); 1355 } 1356 } 1357 m_ondata = move(cb); 1358 m_ondata_data = data; 1359 if (m_future) { 1360 SetupOnDataCallback(); 1361 } 1362 } 1363 1364 /** 1365 * Use SatOnData3. This one is unsafe and will be removed 1366 */ SetOnData2(TCassQueryOnData2Callback cb,void * Data)1367 NCBI_DEPRECATED void SetOnData2(TCassQueryOnData2Callback cb, void* Data) 1368 { 1369 if (m_ondata2 == cb && m_ondata2_data == Data) { 1370 return; 1371 } 1372 if (m_future) { 1373 if (m_ondata2) { 1374 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned"); 1375 } 1376 if (!cb) { 1377 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset"); 1378 } 1379 } 1380 m_ondata2 = cb; 1381 m_ondata2_data = Data; 1382 if (m_future) { 1383 SetupOnDataCallback(); 1384 } 1385 } 1386 SetOnData3(shared_ptr<CCassDataCallbackReceiver> cb)1387 void SetOnData3(shared_ptr<CCassDataCallbackReceiver> cb) 1388 { 1389 auto ondata3 = m_ondata3.lock(); 1390 if (ondata3 == cb) { 1391 return; 1392 } 1393 1394 if (m_future) { 1395 if (ondata3) { 1396 RAISE_DB_ERROR(eSeqFailed, "Future callback has already been assigned"); 1397 } 1398 if (!cb) { 1399 RAISE_DB_ERROR(eSeqFailed, "Future callback can not be reset"); 1400 } 1401 } 1402 m_ondata3 = cb; 1403 if (m_future) { 1404 SetupOnDataCallback(); 1405 } 1406 } 1407 SetOnExecute(void (* Cb)(CCassQuery &,void *),void * Data)1408 void SetOnExecute(void (*Cb)(CCassQuery&, void*), void* Data) 1409 { 1410 m_onexecute = Cb; 1411 m_onexecute_data = Data; 1412 } 1413 1414 static const unsigned int DEFAULT_PAGE_SIZE; 1415 }; 1416 1417 template<> 1418 const CassValue* CCassQuery::GetColumn(int ifld) const; 1419 template<> 1420 const CassValue* CCassQuery::GetColumn(const string& name) const; 1421 template<> 1422 const CassValue* CCassQuery::GetColumn(const char* name) const; 1423 template<> 1424 string CCassQuery::GetColumnDef(int ifld) const; 1425 template<> 1426 string CCassQuery::GetColumnDef(const string& name) const; 1427 template<> 1428 string CCassQuery::GetColumnDef(const char* name) const; 1429 1430 END_IDBLOB_SCOPE 1431 1432 #endif // OBJTOOLS__PUBSEQ_GATEWAY__IMPL__CASSANDRA__CASS_DRIVER_HPP_ 1433