1 /* $Id: cass_driver.cpp 617515 2020-10-02 13:22:45Z saprykin $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Dmitri Dmitrienko
27 *
28 * File Description:
29 *
30 * Wrapper class around cassandra "C"-API
31 *
32 */
33
34 #include <ncbi_pch.hpp>
35
36 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
37
38 #include <unistd.h>
39 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
40 #include <objtools/pubseq_gateway/impl/cassandra/cass_exception.hpp>
41 #include <objtools/pubseq_gateway/impl/cassandra/cass_util.hpp>
42
43 #include <atomic>
44 #include <limits>
45 #include <memory>
46 #include <set>
47 #include <string>
48 #include <sstream>
49 #include <utility>
50 #include <vector>
51
52 #include "corelib/ncbitime.hpp"
53 #include "corelib/ncbistr.hpp"
54
55 BEGIN_IDBLOB_SCOPE
56 USING_NCBI_SCOPE;
57
58 BEGIN_SCOPE()
59 constexpr unsigned kDefaultIOThreads = 32;
60 constexpr cass_duration_t kDisconnectTimeoutMcs = 5000000;
LogCallback(const CassLogMessage * message,void *)61 void LogCallback(const CassLogMessage * message, void * /*data*/)
62 {
63 switch (message->severity) {
64 case CASS_LOG_CRITICAL:
65 ERR_POST(Critical << message->message);
66 break;
67 case CASS_LOG_WARN:
68 ERR_POST(Warning << message->message);
69 break;
70 case CASS_LOG_INFO:
71 ERR_POST(Info << message->message);
72 break;
73 case CASS_LOG_DEBUG:
74 ERR_POST(Trace << message->message);
75 break;
76 case CASS_LOG_TRACE:
77 ERR_POST(Trace << message->message);
78 break;
79 case CASS_LOG_ERROR:
80 default:
81 ERR_POST(Error << message->message);
82 }
83 }
84
s_MapFromToolkitSeverity(EDiagSev severity)85 CassLogLevel s_MapFromToolkitSeverity(EDiagSev severity)
86 {
87 switch (severity) {
88 case eDiag_Info: return CASS_LOG_INFO;
89 case eDiag_Warning: return CASS_LOG_WARN;
90 case eDiag_Error: return CASS_LOG_ERROR;
91 case eDiag_Critical: return CASS_LOG_CRITICAL;
92 case eDiag_Fatal: return CASS_LOG_CRITICAL;
93 case eDiag_Trace: return CASS_LOG_TRACE;
94 }
95 return CASS_LOG_ERROR;
96 }
97 END_SCOPE()
98
99 bool CCassConnection::m_LoggingInitialized = false;
100 bool CCassConnection::m_LoggingEnabled = false;
101 EDiagSev CCassConnection::m_LoggingLevel = eDiag_Error;
102 atomic<CassUuidGen*> CCassConnection::m_CassUuidGen(nullptr);
103
104 const unsigned int CCassQuery::DEFAULT_PAGE_SIZE = 4096;
105
106 /** CCassConnection */
CCassConnection()107 CCassConnection::CCassConnection()
108 : m_port(0)
109 , m_cluster(nullptr)
110 , m_session(nullptr)
111 , m_ctimeoutms(0)
112 , m_qtimeoutms(0)
113 , m_last_query_cnt(0)
114 , m_loadbalancing(LB_DCAWARE)
115 , m_tokenaware(true)
116 , m_latencyaware(false)
117 , m_numThreadsIo(0)
118 , m_numConnPerHost(0)
119 , m_maxConnPerHost(0)
120 , m_keepalive(0)
121 , m_fallback_readconsistency(false)
122 , m_FallbackWriteConsistency(0)
123 , m_active_statements(0)
124 {}
125
126
~CCassConnection()127 CCassConnection::~CCassConnection()
128 {
129 Close();
130 }
131
132
SetLogging(EDiagSev severity)133 void CCassConnection::SetLogging(EDiagSev severity)
134 {
135 cass_log_set_level(s_MapFromToolkitSeverity(severity));
136 cass_log_set_callback(LogCallback, nullptr);
137 m_LoggingEnabled = true;
138 m_LoggingLevel = severity;
139 m_LoggingInitialized = true;
140 }
141
142
DisableLogging(void)143 void CCassConnection::DisableLogging(void)
144 {
145 cass_log_set_level(CASS_LOG_DISABLED);
146 m_LoggingEnabled = false;
147 m_LoggingInitialized = true;
148 }
149
150
UpdateLogging(void)151 void CCassConnection::UpdateLogging(void)
152 {
153 if (m_LoggingInitialized) {
154 if (m_LoggingEnabled) {
155 SetLogging(m_LoggingLevel);
156 } else {
157 DisableLogging();
158 }
159 }
160 }
161
QryTimeout(void) const162 unsigned int CCassConnection::QryTimeout(void) const
163 {
164 return m_qtimeoutms;
165 }
166
QryTimeoutMks(void) const167 unsigned int CCassConnection::QryTimeoutMks(void) const
168 {
169 return QryTimeout() * 1000;
170 }
171
SetRtLimits(unsigned int numThreadsIo,unsigned int numConnPerHost,unsigned int maxConnPerHost)172 void CCassConnection::SetRtLimits(unsigned int numThreadsIo, unsigned int numConnPerHost,
173 unsigned int maxConnPerHost)
174 {
175 m_numThreadsIo = numThreadsIo;
176 m_numConnPerHost = numConnPerHost;
177 m_maxConnPerHost = maxConnPerHost;
178 }
179
SetKeepAlive(unsigned int keepalive)180 void CCassConnection::SetKeepAlive(unsigned int keepalive)
181 {
182 m_keepalive = keepalive;
183 }
184
185
Create()186 shared_ptr<CCassConnection> CCassConnection::Create()
187 {
188 return shared_ptr<CCassConnection>(new CCassConnection());
189 }
190
191
SetLoadBalancing(loadbalancing_policy_t policy)192 void CCassConnection::SetLoadBalancing(loadbalancing_policy_t policy)
193 {
194 m_loadbalancing = policy;
195 }
196
197
SetTokenAware(bool value)198 void CCassConnection::SetTokenAware(bool value)
199 {
200 m_tokenaware = value;
201 }
202
203
SetLatencyAware(bool value)204 void CCassConnection::SetLatencyAware(bool value)
205 {
206 m_latencyaware = value;
207 }
208
SetTimeouts(unsigned int ConnTimeoutMs)209 void CCassConnection::SetTimeouts(unsigned int ConnTimeoutMs)
210 {
211 SetTimeouts(ConnTimeoutMs, CASS_DRV_TIMEOUT_MS);
212 }
213
SetTimeouts(unsigned int ConnTimeoutMs,unsigned int QryTimeoutMs)214 void CCassConnection::SetTimeouts(unsigned int ConnTimeoutMs, unsigned int QryTimeoutMs)
215 {
216 if (ConnTimeoutMs == 0 || ConnTimeoutMs > kCassMaxTimeout) {
217 ConnTimeoutMs = kCassMaxTimeout;
218 }
219 if (QryTimeoutMs == 0 || QryTimeoutMs > kCassMaxTimeout) {
220 QryTimeoutMs = kCassMaxTimeout;
221 }
222 m_qtimeoutms = QryTimeoutMs;
223 m_ctimeoutms = ConnTimeoutMs;
224 if (m_cluster) {
225 cass_cluster_set_request_timeout(m_cluster, m_qtimeoutms);
226 }
227 }
228
SetFallBackRdConsistency(bool value)229 void CCassConnection::SetFallBackRdConsistency(bool value)
230 {
231 m_fallback_readconsistency = value;
232 }
233
GetFallBackRdConsistency(void) const234 bool CCassConnection::GetFallBackRdConsistency(void) const
235 {
236 return m_fallback_readconsistency;
237 }
238
SetFallBackWrConsistency(unsigned int value)239 void CCassConnection::SetFallBackWrConsistency(unsigned int value)
240 {
241 m_FallbackWriteConsistency = value;
242 }
243
GetFallBackWrConsistency(void) const244 unsigned int CCassConnection::GetFallBackWrConsistency(void) const
245 {
246 return m_FallbackWriteConsistency;
247 }
248
NewTimeUUID()249 string CCassConnection::NewTimeUUID()
250 {
251 char buf[CASS_UUID_STRING_LENGTH];
252 if (!m_CassUuidGen.load()) {
253 CassUuidGen *gen, *nothing = nullptr;
254 gen = cass_uuid_gen_new();
255 if (!m_CassUuidGen.compare_exchange_weak(nothing, gen)) {
256 cass_uuid_gen_free(gen);
257 }
258 }
259
260 CassUuid uuid;
261 cass_uuid_gen_time(static_cast<CassUuidGen*>(m_CassUuidGen.load()), &uuid);
262 cass_uuid_string(uuid, buf);
263 return string(buf);
264 }
265
Connect()266 void CCassConnection::Connect()
267 {
268 if (IsConnected()) {
269 NCBI_THROW(CCassandraException, eSeqFailed, "cassandra driver has already been connected");
270 }
271 if (m_host.empty()) {
272 NCBI_THROW(CCassandraException, eSeqFailed, "cassandra host list is empty");
273 }
274
275 UpdateLogging();
276 m_cluster = cass_cluster_new();
277
278 cass_cluster_set_connect_timeout(m_cluster, m_ctimeoutms);
279 cass_cluster_set_contact_points(m_cluster, m_host.c_str());
280 if (m_port > 0) {
281 cass_cluster_set_port(m_cluster, m_port);
282 }
283 if (m_qtimeoutms > 0) {
284 cass_cluster_set_request_timeout(m_cluster, m_qtimeoutms);
285 }
286
287 if (!m_user.empty()) {
288 cass_cluster_set_credentials(m_cluster, m_user.c_str(), m_pwd.c_str());
289 }
290
291 if (m_loadbalancing != LB_DCAWARE && m_loadbalancing == LB_ROUNDROBIN) {
292 cass_cluster_set_load_balance_round_robin(m_cluster);
293 }
294
295 cass_cluster_set_token_aware_routing(m_cluster, m_tokenaware ? cass_true : cass_false);
296 cass_cluster_set_latency_aware_routing(m_cluster, m_latencyaware ? cass_true : cass_false);
297 cass_cluster_set_num_threads_io(m_cluster, m_numThreadsIo ? m_numThreadsIo : kDefaultIOThreads);
298
299
300 if (m_maxConnPerHost) {
301 cass_cluster_set_max_connections_per_host(m_cluster, m_maxConnPerHost);
302 }
303
304 if (m_numConnPerHost) {
305 cass_cluster_set_core_connections_per_host(m_cluster, m_numConnPerHost);
306 }
307
308 if (m_keepalive > 0) {
309 cass_cluster_set_tcp_keepalive(m_cluster, cass_true, m_keepalive);
310 }
311
312 if (!m_blacklist.empty()) {
313 cass_cluster_set_blacklist_filtering(m_cluster, m_blacklist.c_str());
314 }
315
316 try {
317 Reconnect();
318 } catch (CCassandraException const &) {
319 Close();
320 throw;
321 }
322 }
323
324
CloseSession()325 void CCassConnection::CloseSession()
326 {
327 {
328 CSpinGuard guard(m_prepared_mux);
329 for(auto & item : m_prepared) {
330 if (item.second) {
331 cass_prepared_free(item.second);
332 item.second = nullptr;
333 }
334 }
335 m_prepared.clear();
336 }
337
338 if (m_session) {
339 CassFuture * close_future;
340 bool free = false;
341 close_future = cass_session_close(m_session);
342 if (close_future) {
343 free = cass_future_wait_timed(close_future, kDisconnectTimeoutMcs);
344 cass_future_free(close_future);
345 }
346 // otherwise we can't free it, let better leak than crash
347 if (free) {
348 cass_session_free(m_session);
349 }
350 m_session = nullptr;
351 }
352 }
353
354
Reconnect()355 void CCassConnection::Reconnect()
356 {
357 if (!m_cluster) {
358 NCBI_THROW(CCassandraException, eSeqFailed,
359 "invalid sequence of operations, driver is not connected, can't re-connect");
360 }
361
362 if (m_session) {
363 CloseSession();
364 }
365
366 m_session = cass_session_new();
367 if (!m_session) {
368 RAISE_DB_ERROR(eRsrcFailed, "failed to get cassandra session handle");
369 }
370
371 CassFuture * __future = cass_session_connect(m_session, m_cluster);
372 if (!__future) {
373 RAISE_DB_ERROR(eRsrcFailed, "failed to obtain cassandra connection future");
374 }
375
376 unique_ptr<CassFuture, function<void(CassFuture*)>> future(
377 __future,
378 [](CassFuture* future)
379 {
380 cass_future_free(future);
381 }
382 );
383
384 CassError rc = CASS_OK;
385 cass_future_wait(future.get());
386 rc = cass_future_error_code(future.get());
387 if (rc != CASS_OK) {
388 RAISE_CASS_CONN_ERROR(future.get(), string(""));
389 }
390
391 if (!m_keyspace.empty()) {
392 string _sav = m_keyspace;
393 m_keyspace.clear();
394 SetKeyspace(_sav);
395 }
396 }
397
398
Close()399 void CCassConnection::Close()
400 {
401 CloseSession();
402 if (m_cluster) {
403 cass_cluster_free(m_cluster);
404 m_cluster = nullptr;
405 }
406 }
407
IsConnected(void)408 bool CCassConnection::IsConnected(void)
409 {
410 return (m_cluster || m_session);
411 }
412
GetActiveStatements(void) const413 int64_t CCassConnection::GetActiveStatements(void) const
414 {
415 return m_active_statements;
416 }
417
GetMetrics(void)418 CassMetrics CCassConnection::GetMetrics(void)
419 {
420 CassMetrics metrics;
421 if(m_session) {
422 cass_session_get_metrics(m_session, &metrics);
423 }
424 return metrics;
425 }
426
SetConnProp(const string & host,const string & user,const string & pwd,int16_t port)427 void CCassConnection::SetConnProp(
428 const string & host,
429 const string & user,
430 const string & pwd,
431 int16_t port)
432 {
433 m_host = host;
434 m_user = user;
435 m_pwd = pwd;
436 m_port = port;
437 if (IsConnected()) {
438 Close();
439 }
440 }
441
442
SetKeyspace(const string & keyspace)443 void CCassConnection::SetKeyspace(const string & keyspace)
444 {
445 if (m_keyspace != keyspace) {
446 if (IsConnected()) {
447 shared_ptr<CCassQuery> query(NewQuery());
448 query->SetSQL(string("USE ") + keyspace, 0);
449 query->Execute(CASS_CONSISTENCY_LOCAL_QUORUM);
450 }
451 m_keyspace = keyspace;
452 }
453 }
454
Keyspace(void) const455 string CCassConnection::Keyspace(void) const
456 {
457 return m_keyspace;
458 }
459
SetBlackList(const string & blacklist)460 void CCassConnection::SetBlackList(const string & blacklist)
461 {
462 m_blacklist = blacklist;
463 }
464
NewQuery()465 shared_ptr<CCassQuery> CCassConnection::NewQuery()
466 {
467 if (!m_session) {
468 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, driver is not connected");
469 }
470
471 shared_ptr<CCassQuery> rv(new CCassQuery(shared_from_this()));
472 rv->SetTimeout(m_qtimeoutms);
473 return rv;
474 }
475
getTokenRanges(TTokenRanges & ranges)476 void CCassConnection::getTokenRanges(TTokenRanges &ranges)
477 {
478 GetTokenRanges(ranges);
479 }
480
GetTokenRanges(TTokenRanges & ranges)481 void CCassConnection::GetTokenRanges(TTokenRanges &ranges)
482 {
483 set<TTokenValue> cluster_tokens;
484 map<string, vector<TTokenValue>> peer_tokens;
485
486 shared_ptr<CCassQuery> query(NewQuery());
487 query->SetSQL("select data_center, schema_version, host_id, tokens "
488 "from system.local", 0);
489 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
490
491 string host_id, datacenter, schema;
492 vector<string> tokens;
493
494 query->NextRow();
495 query->FieldGetStrValue(0, datacenter);
496 query->FieldGetStrValue(1, schema);
497 query->FieldGetStrValue(2, host_id);
498 query->FieldGetSetValues(3, tokens);
499 auto itr = peer_tokens.insert(make_pair(host_id,vector<TTokenValue>())).first;
500 for(const auto & item: tokens) {
501 TTokenValue value = strtol(item.c_str(), nullptr, 10);
502 itr->second.push_back(value);
503 cluster_tokens.insert(value);
504 }
505
506 ERR_POST(Info << "GET_TOKEN_MAP: Schema version is " << schema);
507 ERR_POST(Info << "GET_TOKEN_MAP: datacenter is " << datacenter);
508 ERR_POST(Info << "GET_TOKEN_MAP: host_id is " << host_id);
509
510 unsigned int query_host_count = 0;
511 unsigned int retries = 0;
512 set<string> query_hosts;
513 while (retries < 3 && (query_host_count == 0 || peer_tokens.size() == query_host_count)) {
514 retries++;
515 if (query_host_count != 0) {
516 ERR_POST(Info << "GET_TOKEN_MAP: Host_id count is too small. "
517 "Retrying system.peers fetch. " << retries);
518 }
519
520 query_host_count = 0;
521 query_hosts.clear();
522 query->SetSQL("select host_id, data_center, schema_version, tokens from system.peers", 0);
523 query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
524 while (query->NextRow() == ar_dataready) {
525 string peer_host_id, peer_dc, peer_schema;
526 vector<string> tokens;
527 query->FieldGetStrValue(1, peer_dc);
528 query->FieldGetStrValue(2, peer_schema);
529 if (datacenter == peer_dc && schema == peer_schema) {
530 query->FieldGetStrValue(0, peer_host_id);
531 if (query_hosts.find(peer_host_id) == query_hosts.end()) {
532 query_hosts.insert(peer_host_id);
533 query_host_count++;
534 }
535 query->FieldGetSetValues(3, tokens);
536 ERR_POST(Info << "GET_TOKEN_MAP: host is " << peer_host_id);
537 ERR_POST(Info << "GET_TOKEN_MAP: tokens " << tokens.size());
538 auto itr = peer_tokens.find(peer_host_id);
539 if (itr == peer_tokens.end()) {
540 itr = peer_tokens.insert(make_pair(peer_host_id, vector<TTokenValue>())).first;
541 for (const auto & item : tokens) {
542 TTokenValue value = strtol(item.c_str(), nullptr, 10);
543 itr->second.push_back(value);
544 cluster_tokens.insert(value);
545 }
546 }
547 }
548 }
549 ERR_POST(Info << "GET_TOKEN_MAP: PEERS HOST COUNT IS " << peer_tokens.size());
550 ERR_POST(Info << "GET_TOKEN_MAP: QUERY HOST COUNT IS " << query_host_count);
551 }
552
553 for(const auto& token : cluster_tokens) {
554 ERR_POST(Trace << "GET_TOKEN_MAP: \ttoken " << token);
555 }
556
557 ERR_POST(Info << "GET_TOKEN_MAP: tokens size " << cluster_tokens.size());
558 ranges.reserve(cluster_tokens.size() + 1);
559 TTokenValue lower_bound = numeric_limits<TTokenValue>::min();
560 for (int64_t token : cluster_tokens) {
561 ERR_POST(Trace << "GET_TOKEN_MAP: token " << token << " : " << token - lower_bound);
562 ranges.push_back(make_pair(lower_bound, token));
563 lower_bound = token;
564 }
565 ranges.push_back(make_pair(lower_bound, numeric_limits<TTokenValue>::max()));
566 }
567
GetPartitionKeyColumnNames(string const & keyspace,string const & table) const568 vector<string> CCassConnection::GetPartitionKeyColumnNames(string const & keyspace, string const & table) const
569 {
570 if (!m_session) {
571 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, driver is not connected");
572 }
573 using TMeta = const CassSchemaMeta;
574 unique_ptr<TMeta, function<void(TMeta*)>> schema_meta(
575 cass_session_get_schema_meta(m_session),
576 [](TMeta* meta)-> void {
577 cass_schema_meta_free(meta);
578 }
579 );
580 if (!schema_meta) {
581 NCBI_THROW(CCassandraException, eFatal, "Cluster metadata is not resolved");
582 }
583
584 const CassKeyspaceMeta* keyspace_meta = cass_schema_meta_keyspace_by_name_n(
585 schema_meta.get(), keyspace.c_str(), keyspace.size()
586 );
587 if (!keyspace_meta) {
588 NCBI_THROW(CCassandraException, eNotFound, "Keyspace not found");
589 }
590
591 const CassTableMeta* table_meta = cass_keyspace_meta_table_by_name_n(
592 keyspace_meta, table.c_str(), table.size()
593 );
594 if (!table_meta) {
595 NCBI_THROW(CCassandraException, eNotFound, "Table not found");
596 }
597
598 size_t partition_key_count = cass_table_meta_partition_key_count(table_meta);
599 vector<string> result;
600 result.reserve(partition_key_count);
601 for (size_t i = 0; i < partition_key_count; ++i) {
602 const CassColumnMeta* column_meta = cass_table_meta_partition_key(table_meta, i);
603 if (column_meta) {
604 const char* name;
605 size_t name_length;
606 cass_column_meta_name(column_meta, &name, &name_length);
607 result.emplace_back(name, name_length);
608 }
609 }
610 return result;
611 }
612
Prepare(const string & sql)613 const CassPrepared * CCassConnection::Prepare(const string & sql)
614 {
615 const CassPrepared * rv = nullptr;
616 #if CASS_PREPARED_Q
617 {
618 CSpinGuard guard(m_prepared_mux);
619 auto it = m_prepared.find(sql);
620 if (it != m_prepared.end()) {
621 rv = it->second;
622 return rv;
623 }
624 }
625
626 CSpinGuard guard(m_prepared_mux);
627 auto it = m_prepared.find(sql);
628 if (it == m_prepared.end())
629 {
630 const char * query = sql.c_str();
631 CassFuture * __future = cass_session_prepare(m_session, query);
632 if (!__future) {
633 RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query future"));
634 }
635
636 unique_ptr<CassFuture, function<void(CassFuture*)>> future(
637 __future,
638 [](CassFuture* future)
639 {
640 cass_future_free(future);
641 }
642 );
643
644 bool b;
645 b = cass_future_wait_timed(future.get(), m_qtimeoutms * 1000L);
646 if (!b) {
647 RAISE_DB_QRY_TIMEOUT(m_qtimeoutms, 0, string("failed to prepare query \"") + sql + "\"");
648 }
649 CassError rc = cass_future_error_code(future.get());
650 if (rc != CASS_OK) {
651 string msg = (rc == CASS_ERROR_SERVER_SYNTAX_ERROR || rc == CASS_ERROR_SERVER_INVALID_QUERY) ?
652 string(", sql: ") + sql : string("");
653 RAISE_CASS_QRY_ERROR(future.get(), msg);
654 }
655
656 rv = cass_future_get_prepared(future.get());
657 if (!rv) {
658 RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain prepared handle for sql: ") + sql);
659 }
660 m_prepared.emplace(sql, rv);
661 } else {
662 rv = it->second;
663 return rv;
664 }
665 #endif
666 return rv;
667 }
668
669
Perform(unsigned int optimeoutms,const std::function<bool ()> & PreLoopCB,const std::function<void (const CCassandraException &)> & DbExceptCB,const std::function<bool (bool)> & OpCB)670 void CCassConnection::Perform(
671 unsigned int optimeoutms,
672 const std::function<bool()> & PreLoopCB,
673 const std::function<void(const CCassandraException&)> & DbExceptCB,
674 const std::function<bool(bool)> & OpCB)
675 {
676 int err_cnt = 0;
677 bool is_repeated = false;
678 int64_t op_begin = gettime();
679 while (!PreLoopCB || PreLoopCB()) {
680 try {
681 if (OpCB(is_repeated)) {
682 break;
683 }
684 err_cnt = 0;
685 } catch (const CCassandraException & e) {
686 // log and ignore, app-specific layer is responsible for
687 // re-connetion if needed
688 if (DbExceptCB) {
689 DbExceptCB(e);
690 }
691
692 if (e.GetErrCode() == CCassandraException::eQueryTimeout && ++err_cnt < 10) {
693 ERR_POST(Info << "CAPTURED TIMEOUT: " << e.TimeoutMsg() << ", RESTARTING OP");
694 } else if (e.GetErrCode() == CCassandraException::eQueryFailedRestartable) {
695 ERR_POST(Info << "CAPTURED RESTARTABLE EXCEPTION: " << e.what() << ", RESTARTING OP");
696 } else {
697 // timer exceeded (10 times we got timeout and havn't read
698 // anyting, or got another error -> try to reconnect
699 ERR_POST("2. CAPTURED " << e.what());
700 throw;
701 }
702
703 int64_t op_time_ms = (gettime() - op_begin) / 1000;
704 if (optimeoutms != 0 && op_time_ms > optimeoutms) {
705 throw;
706 }
707 } catch(...) {
708 throw;
709 }
710 is_repeated = true;
711 }
712 }
713
714 /** CCassPrm */
Bind(CassStatement * statement,unsigned int idx)715 void CCassPrm::Bind(CassStatement * statement, unsigned int idx)
716 {
717 CassError rc = CASS_OK;
718 switch (m_type) {
719 case CASS_VALUE_TYPE_UNKNOWN:
720 if (!IsAssigned()) {
721 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Param #" + to_string(idx) + " is not assigned");
722 }
723 rc = cass_statement_bind_null(statement, idx);
724 break;
725 case CASS_VALUE_TYPE_TINY_INT:
726 rc = cass_statement_bind_int8(statement, idx, static_cast<cass_int8_t>(m_simpleval.i8));
727 break;
728 case CASS_VALUE_TYPE_SMALL_INT:
729 rc = cass_statement_bind_int16(statement, idx, static_cast<cass_int16_t>(m_simpleval.i16));
730 break;
731 case CASS_VALUE_TYPE_INT:
732 rc = cass_statement_bind_int32(statement, idx, static_cast<cass_int32_t>(m_simpleval.i32));
733 break;
734 case CASS_VALUE_TYPE_BIGINT:
735 rc = cass_statement_bind_int64(statement, idx, static_cast<cass_int64_t>(m_simpleval.i64));
736 break;
737
738 /*
739 * @saprykin
740 * Removed silent binding null if string is empty. It creates a lot of
741 * tombstones in storage. Sometimes if user wants to write empty string
742 * it means that we should write empty string.
743 * There is a proper method for writing nulls => CCassQuery::BindNull
744 */
745 case CASS_VALUE_TYPE_VARCHAR:
746 rc = cass_statement_bind_string(statement, idx, m_bytes.c_str());
747 break;
748 case CASS_VALUE_TYPE_BLOB:
749 if (m_bytes.size() > 0) {
750 rc = cass_statement_bind_bytes(
751 statement, idx,
752 reinterpret_cast<const unsigned char*>(m_bytes.c_str()),
753 m_bytes.size());
754 } else {
755 rc = cass_statement_bind_null(statement, idx);
756 }
757 break;
758 case CASS_VALUE_TYPE_SET:
759 case CASS_VALUE_TYPE_LIST:
760 case CASS_VALUE_TYPE_MAP:
761 if (m_collection.get()) {
762 rc = cass_statement_bind_collection(statement, idx, m_collection.get());
763 } else {
764 rc = cass_statement_bind_null(statement, idx);
765 }
766 break;
767 case CASS_VALUE_TYPE_TUPLE:
768 if (m_tuple.get()) {
769 rc = cass_statement_bind_tuple(statement, idx, m_tuple.get());
770 } else {
771 rc = cass_statement_bind_null(statement, idx);
772 }
773 break;
774 default:
775 RAISE_DB_ERROR(eBindFailed, string("Bind for (") + to_string(static_cast<int>(m_type)) + ") type is not implemented");
776 }
777 if (rc != CASS_OK) {
778 RAISE_CASS_ERROR(rc, eBindFailed,
779 "Bind for (" + to_string(static_cast<int>(m_type)) + ") failed with rc= " + to_string(static_cast<int>(rc)));
780 }
781 }
782
783
784 /** CCassQuery */
SetTimeout()785 void CCassQuery::SetTimeout()
786 {
787 SetTimeout(CASS_DRV_TIMEOUT_MS);
788 }
789
SetTimeout(unsigned int t)790 void CCassQuery::SetTimeout(unsigned int t)
791 {
792 m_qtimeoutms = t;
793 }
794
Timeout(void) const795 unsigned int CCassQuery::Timeout(void) const
796 {
797 return m_qtimeoutms;
798 }
799
Close(void)800 void CCassQuery::Close(void)
801 {
802 InternalClose(true);
803 }
804
~CCassQuery()805 CCassQuery::~CCassQuery()
806 {
807 Close();
808 m_ondata_data = nullptr;
809 m_ondata2_data = nullptr;
810 m_onexecute_data = nullptr;
811 }
812
InternalClose(bool closebatch)813 void CCassQuery::InternalClose(bool closebatch)
814 {
815 m_params.clear();
816 if (m_future) {
817 cass_future_free(m_future);
818 --m_connection->m_active_statements;
819 m_future = nullptr;
820 m_futuretime = 0;
821 }
822 if (m_statement) {
823 cass_statement_free(m_statement);
824 m_statement = nullptr;
825 }
826 if (m_batch && closebatch) {
827 cass_batch_free(m_batch);
828 m_batch = nullptr;
829 }
830 if (m_iterator) {
831 cass_iterator_free(m_iterator);
832 m_iterator = nullptr;
833 }
834 if (m_result) {
835 cass_result_free(m_result);
836 m_result = nullptr;
837 }
838 /*
839 * @saprykin
840 * Commented out.
841 * InternalClose is called from SetSQL and reseting SerialConsistency
842 * level adds SetSQL very nasty side effect.
843 * Consider not reusing CCassQuery for different types of requests.
844 */
845 // m_serial_consistency = CASS_CONSISTENCY_ANY;
846 m_row = nullptr;
847 m_page_start = false;
848 m_page_size = 0;
849 m_results_expected = false;
850 m_async = false;
851 m_allow_prepare = CASS_PREPARED_Q;
852 m_is_prepared = false;
853 if (m_cb_ref) {
854 m_cb_ref->Detach();
855 m_cb_ref = nullptr;
856 }
857 }
858
GetSQL() const859 string CCassQuery::GetSQL() const {
860 return m_sql;
861 }
862
SetSQL(const string & sql,unsigned int PrmCount)863 void CCassQuery::SetSQL(const string & sql, unsigned int PrmCount)
864 {
865 InternalClose(false);
866 m_sql = sql;
867 m_params.resize(PrmCount);
868 }
869
870
Bind()871 void CCassQuery::Bind()
872 {
873 if (!m_statement) {
874 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, query is closed");
875 }
876 int cnt = 0;
877 for (auto & it: m_params) {
878 it.Bind(m_statement, cnt);
879 cnt++;
880 }
881 }
882
883
BindNull(int iprm)884 void CCassQuery::BindNull(int iprm)
885 {
886 CheckParamExists(iprm);
887 m_params[iprm].AssignNull();
888 }
889
BindInt8(int iprm,int8_t value)890 void CCassQuery::BindInt8(int iprm, int8_t value)
891 {
892 CheckParamExists(iprm);
893 m_params[iprm].Assign(value);
894 }
895
BindInt16(int iprm,int16_t value)896 void CCassQuery::BindInt16(int iprm, int16_t value)
897 {
898 CheckParamExists(iprm);
899 m_params[iprm].Assign(value);
900 }
901
BindInt32(int iprm,int32_t value)902 void CCassQuery::BindInt32(int iprm, int32_t value)
903 {
904 CheckParamExists(iprm);
905 m_params[iprm].Assign(value);
906 }
907
908
BindInt64(int iprm,int64_t value)909 void CCassQuery::BindInt64(int iprm, int64_t value)
910 {
911 CheckParamExists(iprm);
912 m_params[iprm].Assign(value);
913 }
914
915
BindStr(int iprm,const string & value)916 void CCassQuery::BindStr(int iprm, const string & value)
917 {
918 CheckParamExists(iprm);
919 m_params[iprm].Assign(value);
920 }
921
922
BindStr(int iprm,const char * value)923 void CCassQuery::BindStr(int iprm, const char * value)
924 {
925 CheckParamExists(iprm);
926 m_params[iprm].Assign(value);
927 }
928
929
BindBytes(int iprm,const unsigned char * buf,size_t len)930 void CCassQuery::BindBytes(int iprm, const unsigned char * buf, size_t len)
931 {
932 CheckParamExists(iprm);
933 m_params[iprm].Assign(buf, len);
934 }
935
936
ParamAsInt32(int iprm)937 int32_t CCassQuery::ParamAsInt32(int iprm)
938 {
939 CheckParamAssigned(iprm);
940 return m_params[iprm].AsInt32();
941 }
942
943
ParamAsInt64(int iprm)944 int64_t CCassQuery::ParamAsInt64(int iprm)
945 {
946 CheckParamAssigned(iprm);
947 return m_params[iprm].AsInt64();
948 }
949
950
ParamAsStr(int iprm) const951 string CCassQuery::ParamAsStr(int iprm) const
952 {
953 CheckParamAssigned(iprm);
954 return m_params[iprm].AsString();
955 }
956
957
ParamAsStr(int iprm,string & value) const958 void CCassQuery::ParamAsStr(int iprm, string & value) const
959 {
960 CheckParamAssigned(iprm);
961 m_params[iprm].AsString(value);
962 }
963
ParamAsStrForDebug(int iprm) const964 string CCassQuery::ParamAsStrForDebug(int iprm) const
965 {
966 CheckParamAssigned(iprm);
967 return m_params[iprm].AsStringForDebug();
968 }
969
ParamType(int iprm) const970 CassValueType CCassQuery::ParamType(int iprm) const
971 {
972 CheckParamAssigned(iprm);
973 return m_params[iprm].GetType();
974 }
975
SetEOF(bool value)976 void CCassQuery::SetEOF(bool value)
977 {
978 if (m_EOF != value) {
979 m_EOF = value;
980 }
981 }
982
NewBatch()983 void CCassQuery::NewBatch()
984 {
985 if (!m_connection) {
986 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
987 }
988 if (m_batch) {
989 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, batch has already been started");
990 }
991 if (IsActive()) {
992 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, query is active");
993 }
994
995 m_batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
996 if (!m_batch) {
997 RAISE_DB_ERROR(eRsrcFailed, "failed to create batch");
998 }
999 }
1000
1001
Query(CassConsistency c,bool run_async,bool allow_prepared,unsigned int page_size)1002 void CCassQuery::Query(CassConsistency c, bool run_async,
1003 bool allow_prepared, unsigned int page_size)
1004 {
1005 if (!m_connection) {
1006 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1007 }
1008 if (m_sql.empty()) {
1009 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, SQL is not set");
1010 }
1011 if (m_batch) {
1012 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, can't run select in batch mode");
1013 }
1014 if (IsActive()) {
1015 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is active");
1016 }
1017 const CassPrepared * prepared = nullptr;
1018 if (allow_prepared) {
1019 prepared = m_connection->Prepare(m_sql);
1020 m_is_prepared = prepared != nullptr;
1021 }
1022 if (m_is_prepared) {
1023 m_statement = cass_prepared_bind(prepared);
1024 } else {
1025 m_statement = cass_statement_new(m_sql.c_str(), m_params.size());
1026 }
1027
1028 if (!m_statement) {
1029 RAISE_DB_ERROR(eRsrcFailed, string("failed to create cassandra query"));
1030 }
1031
1032 try {
1033 CassError rc;
1034 Bind();
1035 rc = cass_statement_set_consistency(m_statement, c);
1036 if (rc != CASS_OK) {
1037 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set consistency level " + to_string(static_cast<int>(c)));
1038 }
1039 if (m_serial_consistency != CASS_CONSISTENCY_ANY) {
1040 rc = cass_statement_set_serial_consistency(m_statement, m_serial_consistency);
1041 if (rc != CASS_OK) {
1042 RAISE_CASS_ERROR(rc, eQueryFailed,
1043 "Failed to set serial consistency level " + to_string(static_cast<int>(m_serial_consistency)));
1044 }
1045 }
1046 if (page_size > 0) {
1047 rc = cass_statement_set_paging_size(m_statement, page_size);
1048 if (rc != CASS_OK) {
1049 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set page size to " + to_string(static_cast<int>(page_size)));
1050 }
1051 }
1052
1053 m_page_start = true;
1054 m_page_size = page_size;
1055 SetEOF(false);
1056 m_results_expected = true;
1057 m_async = run_async;
1058 if (!m_batch) {
1059 if (run_async) {
1060 GetFuture();
1061 } else {
1062 Wait(m_qtimeoutms * 1000L);
1063 }
1064 }
1065 } catch(...) {
1066 Close();
1067 throw;
1068 }
1069 }
1070
RestartQuery(CassConsistency c)1071 void CCassQuery::RestartQuery(CassConsistency c)
1072 {
1073 if (!m_future) {
1074 RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1075 }
1076 unsigned int page_size = m_page_size;
1077 CCassParams params = move(m_params);
1078 bool async = m_async, allow_prepared = m_is_prepared;
1079 Close();
1080 m_params = move(params);
1081 Query(c, async, allow_prepared, page_size);
1082 }
1083
Execute(CassConsistency c,bool run_async,bool allow_prepared)1084 void CCassQuery::Execute(CassConsistency c, bool run_async, bool allow_prepared)
1085 {
1086 if (!m_connection) {
1087 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1088 }
1089 if (m_sql.empty()) {
1090 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, SQL is not set");
1091 }
1092 if (m_row != nullptr || m_statement != nullptr) {
1093 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is active");
1094 }
1095
1096 const CassPrepared * prepared = nullptr;
1097 if (allow_prepared) {
1098 prepared = m_connection->Prepare(m_sql);
1099 m_is_prepared = prepared != nullptr;
1100 }
1101
1102 if (m_is_prepared) {
1103 m_statement = cass_prepared_bind(prepared);
1104 } else {
1105 m_statement = cass_statement_new(m_sql.c_str(), m_params.size());
1106 }
1107
1108 if (!m_statement) {
1109 RAISE_DB_ERROR(eRsrcFailed, "failed to create cassandra query");
1110 }
1111
1112 try {
1113 CassError rc;
1114 Bind();
1115 rc = cass_statement_set_consistency(m_statement, c);
1116 if (rc != CASS_OK) {
1117 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set consistency level " + to_string(static_cast<int>(c)));
1118 }
1119 if (m_serial_consistency != CASS_CONSISTENCY_ANY) {
1120 rc = cass_statement_set_serial_consistency(m_statement, m_serial_consistency);
1121 if (rc != CASS_OK) {
1122 RAISE_CASS_ERROR(rc, eQueryFailed,
1123 "Failed to set serial consistency level " + to_string(static_cast<int>(m_serial_consistency)));
1124 }
1125 }
1126
1127 if (m_batch) {
1128 CassError rc = cass_batch_add_statement(m_batch, m_statement);
1129 if (rc != CASS_OK) {
1130 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to add statement to batch, sql: " + m_sql);
1131 }
1132 cass_statement_free(m_statement);
1133 m_statement = nullptr;
1134 }
1135
1136 m_page_start = false;
1137 m_page_size = 0;
1138 SetEOF(false);
1139 m_results_expected = false;
1140 m_async = run_async;
1141 if (!m_batch) {
1142 if (run_async) {
1143 GetFuture();
1144 } else {
1145 Wait(0);
1146 }
1147 }
1148 } catch(...) {
1149 if (m_statement) {
1150 cass_statement_free(m_statement);
1151 m_statement = nullptr;
1152 }
1153 throw;
1154 }
1155 }
1156
RestartExecute(CassConsistency c)1157 void CCassQuery::RestartExecute(CassConsistency c)
1158 {
1159 if (!m_future) {
1160 RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1161 }
1162 CCassParams params = move(m_params);
1163 bool async = m_async, allow_prepared = m_is_prepared;
1164 Close();
1165 m_params = move(params);
1166 Execute(c, async, allow_prepared);
1167 }
1168
Restart(CassConsistency c)1169 void CCassQuery::Restart(CassConsistency c)
1170 {
1171 if (!m_future) {
1172 RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1173 }
1174 if (m_results_expected) {
1175 RestartQuery(c);
1176 } else {
1177 RestartExecute(c);
1178 }
1179 }
1180
1181
SetSerialConsistency(CassConsistency c)1182 void CCassQuery::SetSerialConsistency(CassConsistency c)
1183 {
1184 m_serial_consistency = c;
1185 }
1186
1187
RunBatch()1188 async_rslt_t CCassQuery::RunBatch()
1189 {
1190 async_rslt_t rv = ar_wait;
1191 if (!m_batch) {
1192 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, batch is not created");
1193 }
1194
1195 if (m_async) {
1196 GetFuture();
1197 } else {
1198 rv = Wait(m_qtimeoutms * 1000L);
1199 if (m_batch) {
1200 cass_batch_free(m_batch);
1201 m_batch = nullptr;
1202 }
1203 }
1204 return rv;
1205 }
1206
1207
WaitAsync(unsigned int timeoutmks)1208 async_rslt_t CCassQuery::WaitAsync(unsigned int timeoutmks)
1209 {
1210 if (!m_async) {
1211 RAISE_DB_ERROR(eSeqFailed, "attempt to wait on query in non-async state");
1212 }
1213 return Wait(timeoutmks);
1214 }
1215
1216
IsReady()1217 bool CCassQuery::IsReady()
1218 {
1219 GetFuture();
1220 return cass_future_ready(m_future) == cass_true;
1221 }
1222
1223
GetFuture()1224 void CCassQuery::GetFuture()
1225 {
1226 if (!m_connection) {
1227 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1228 }
1229 if (!IsActive() && !m_batch) {
1230 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is not active");
1231 }
1232 if (m_iterator || m_result) {
1233 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, results already obtained");
1234 }
1235 if (!m_future) {
1236 ++m_connection->m_active_statements;
1237 if (m_batch) {
1238 m_future = cass_session_execute_batch(m_connection->m_session, m_batch);
1239 } else {
1240 m_future = cass_session_execute(m_connection->m_session, m_statement);
1241 }
1242 if (!m_future) {
1243 --m_connection->m_active_statements;
1244 RAISE_DB_ERROR(eRsrcFailed, "failed to obtain cassandra query future");
1245 }
1246 m_futuretime = gettime();
1247 if (m_ondata || m_ondata2 || m_ondata3.lock()) {
1248 SetupOnDataCallback();
1249 }
1250 }
1251 }
1252
1253
Wait(unsigned int timeoutmks)1254 async_rslt_t CCassQuery::Wait(unsigned int timeoutmks)
1255 {
1256 if (m_results_expected && m_result) {
1257 if (m_async) {
1258 return ar_dataready;
1259 }
1260 RAISE_DB_ERROR(eSeqFailed, "result has already been allocated");
1261 }
1262
1263 if (m_EOF && m_results_expected) {
1264 return ar_done;
1265 }
1266
1267 GetFuture();
1268 bool rv;
1269 if (timeoutmks != 0) {
1270 rv = cass_future_wait_timed(m_future, timeoutmks);
1271 } else if (!m_async) {
1272 cass_future_wait(m_future);
1273 rv = true;
1274 } else {
1275 rv = cass_future_ready(m_future);
1276 }
1277
1278 if (!rv) {
1279 if (!m_async && timeoutmks > 0) {
1280 int64_t t = (gettime() - m_futuretime) / 1000L;
1281 RAISE_DB_QRY_TIMEOUT(t, timeoutmks / 1000L, string("failed to perform query \"") + m_sql + "\"");
1282 } else {
1283 return ar_wait;
1284 }
1285 } else {
1286 ProcessFutureResult();
1287 if ((m_statement || m_batch) && !m_results_expected) {
1288 // next request was run in m_ondata event handler
1289 return ar_wait;
1290 } else if (m_EOF || !m_results_expected) {
1291 return ar_done;
1292 } else if (m_result) {
1293 return ar_dataready;
1294 } else {
1295 return ar_wait;
1296 }
1297 }
1298 }
1299
SetupOnDataCallback()1300 void CCassQuery::SetupOnDataCallback()
1301 {
1302 if (!m_future) {
1303 RAISE_DB_ERROR(eSeqFailed, "Future is not assigned");
1304 }
1305 if (!m_ondata && !m_ondata2 && !m_ondata3.lock()) {
1306 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, m_ondata is not set");
1307 }
1308 if (m_cb_ref) {
1309 m_cb_ref->Detach();
1310 }
1311 m_cb_ref.reset(new CCassQueryCbRef(shared_from_this()));
1312 m_cb_ref->Attach(m_ondata, m_ondata_data, m_ondata2, m_ondata2_data, m_ondata3.lock());
1313 CassError rv = cass_future_set_callback(m_future, CCassQueryCbRef::s_OnFutureCb, m_cb_ref.get());
1314 if (rv != CASS_OK) {
1315 m_cb_ref->Detach(true);
1316 m_cb_ref = nullptr;
1317 RAISE_DB_ERROR(eSeqFailed, "failed to assign future callback");
1318 }
1319 }
1320
ProcessFutureResult()1321 void CCassQuery::ProcessFutureResult()
1322 {
1323 if (!m_future) {
1324 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, m_future is not set");
1325 }
1326 if (m_iterator || m_result) {
1327 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, results already obtained");
1328 }
1329 CassError rc = cass_future_error_code(m_future);
1330 if (rc != CASS_OK) {
1331 if (m_statement) {
1332 cass_statement_free(m_statement);
1333 m_statement = nullptr;
1334 }
1335 string msg;
1336 if (rc == CASS_ERROR_SERVER_SYNTAX_ERROR || rc == CASS_ERROR_SERVER_INVALID_QUERY) {
1337 msg = ", sql: " + m_sql;
1338 }
1339 RAISE_CASS_QRY_ERROR(m_future, msg);
1340 }
1341
1342 if (m_results_expected) {
1343 m_result = cass_future_get_result(m_future);
1344 if (!m_result) {
1345 RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query result"));
1346 }
1347 if (m_iterator) {
1348 RAISE_DB_ERROR(eSeqFailed, "iterator has already been allocated");
1349 }
1350
1351 m_iterator = cass_iterator_from_result(m_result);
1352 if (!m_iterator) {
1353 RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query iterator"));
1354 }
1355 /*
1356 we release m_future here for statements that return dataset,
1357 otherwise we free it in the destructor, keeping m_future not null
1358 as an indication that we have already waited for query to finish
1359 */
1360 cass_future_free(m_future);
1361 --m_connection->m_active_statements;
1362 m_future = nullptr;
1363 if (m_cb_ref) {
1364 m_cb_ref->Detach();
1365 m_cb_ref = nullptr;
1366 }
1367 m_futuretime = 0;
1368 m_page_start = false;
1369 } else {
1370 if (m_statement) {
1371 cass_statement_free(m_statement);
1372 m_statement = nullptr;
1373 }
1374 if (m_batch) {
1375 cass_batch_free(m_batch);
1376 m_batch = nullptr;
1377 }
1378 SetEOF(true);
1379 if (m_onexecute) {
1380 m_onexecute(*this, m_onexecute_data);
1381 }
1382 }
1383 }
1384
1385
NextRow()1386 async_rslt_t CCassQuery::NextRow()
1387 {
1388 if (!IsActive()) {
1389 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is not active");
1390 }
1391 while(1) {
1392 m_row = nullptr;
1393 if (m_result == nullptr) {
1394 async_rslt_t wr;
1395 if (m_async) {
1396 wr = Wait(0);
1397 } else {
1398 wr = Wait(m_qtimeoutms * 1000L);
1399 }
1400 if (wr != ar_dataready) {
1401 return wr;
1402 }
1403 }
1404 if (m_iterator && m_result) {
1405 bool b = cass_iterator_next(m_iterator);
1406 if (!b) {
1407 if (m_page_size > 0) {
1408 bool has_more_pages = cass_result_has_more_pages(m_result);
1409 if (has_more_pages) {
1410 CassError err;
1411 if ((err = cass_statement_set_paging_state(m_statement, m_result)) != CASS_OK) {
1412 RAISE_CASS_ERROR(err, eFetchFailed, string("failed to retrive next page"));
1413 }
1414 }
1415 cass_iterator_free(m_iterator);
1416 m_iterator = nullptr;
1417 cass_result_free(m_result);
1418 m_result = nullptr;
1419 if (!has_more_pages) {
1420 SetEOF(true);
1421 return ar_done;
1422 }
1423 // go to above
1424 m_page_start = true;
1425 } else {
1426 SetEOF(true);
1427 return ar_done;
1428 }
1429 } else {
1430 m_row = cass_iterator_get_row(m_iterator);
1431 if (!m_row) {
1432 RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query result row"));
1433 }
1434 return ar_dataready;
1435 }
1436 } else {
1437 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, attempt to fetch next row on a closed query");
1438 }
1439 }
1440 }
1441
1442
1443 template<>
GetColumn(int ifld) const1444 const CassValue * CCassQuery::GetColumn(int ifld) const
1445 {
1446 if (!m_row) {
1447 RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1448 }
1449 const CassValue * clm = cass_row_get_column(m_row, ifld);
1450 if (!clm) {
1451 RAISE_DB_ERROR(eSeqFailed, "column is not fetched (index " + to_string(ifld) + " beyound the range?)");
1452 }
1453 return clm;
1454 }
1455
1456
1457 template<>
GetColumn(const string & name) const1458 const CassValue * CCassQuery::GetColumn(const string & name) const
1459 {
1460 if (!m_row) {
1461 RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1462 }
1463 const CassValue * clm = cass_row_get_column_by_name_n(m_row, name.c_str(), name.size());
1464 if (!clm) {
1465 RAISE_DB_ERROR(eSeqFailed, "column " + name + " is not available");
1466 }
1467 return clm;
1468 }
1469
1470 template<>
GetColumn(const char * name) const1471 const CassValue * CCassQuery::GetColumn(const char * name) const
1472 {
1473 if (!m_row) {
1474 RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1475 }
1476
1477 const CassValue * clm = cass_row_get_column_by_name(m_row, name);
1478 if (!clm) {
1479 RAISE_DB_ERROR(eSeqFailed, "column " + string(name) + " is not available");
1480 }
1481 return clm;
1482 }
1483
1484 template<>
GetColumnDef(int ifld) const1485 string CCassQuery::GetColumnDef(int ifld) const {
1486 return ToString() + "\ncolumn: " + NStr::NumericToString(ifld);
1487 }
1488
1489 template<>
GetColumnDef(const string & name) const1490 string CCassQuery::GetColumnDef(const string& name) const {
1491 return ToString() + "\ncolumn: " + name;
1492 }
1493
1494 template<>
GetColumnDef(const char * name) const1495 string CCassQuery::GetColumnDef(const char* name) const {
1496 return ToString() + "\ncolumn: " + string(name);
1497 }
1498
ToString() const1499 string CCassQuery::ToString() const
1500 {
1501 string params;
1502 for (size_t i = 0; i < ParamCount(); ++i) {
1503 if (!params.empty()) {
1504 params.append(", ");
1505 }
1506 switch (ParamType(i)) {
1507 case CASS_VALUE_TYPE_SET:
1508 params.append("?SET");
1509 break;
1510 case CASS_VALUE_TYPE_LIST:
1511 params.append("?LIST");
1512 break;
1513 case CASS_VALUE_TYPE_MAP:
1514 params.append("?MAP");
1515 break;
1516 case CASS_VALUE_TYPE_TUPLE:
1517 params.append("?TUPLE");
1518 break;
1519 default: {
1520 string prm;
1521 try {
1522 prm = ParamAsStr(i);
1523 }
1524 catch(const CCassandraException&) {
1525 prm = "???[" + NStr::NumericToString(i) + "]";
1526 }
1527 params.append(prm);
1528 }
1529 }
1530 }
1531
1532 return m_sql.empty() ? "<>" : m_sql + "\nparams: " + params;
1533 }
1534
IsEOF(void) const1535 bool CCassQuery::IsEOF(void) const
1536 {
1537 return m_EOF;
1538 }
1539
IsAsync(void) const1540 bool CCassQuery::IsAsync(void) const
1541 {
1542 return m_async;
1543 }
1544
1545 END_IDBLOB_SCOPE
1546