1 /*  $Id: cass_driver.cpp 617515 2020-10-02 13:22:45Z saprykin $
2  * ===========================================================================
3  *
4  *                            PUBLIC DOMAIN NOTICE
5  *               National Center for Biotechnology Information
6  *
7  *  This software/database is a "United States Government Work" under the
8  *  terms of the United States Copyright Act.  It was written as part of
9  *  the author's official duties as a United States Government employee and
10  *  thus cannot be copyrighted.  This software/database is freely available
11  *  to the public for use. The National Library of Medicine and the U.S.
12  *  Government have not placed any restriction on its use or reproduction.
13  *
14  *  Although all reasonable efforts have been taken to ensure the accuracy
15  *  and reliability of the software and data, the NLM and the U.S.
16  *  Government do not and cannot warrant the performance or results that
17  *  may be obtained by using this software or data. The NLM and the U.S.
18  *  Government disclaim all warranties, express or implied, including
19  *  warranties of performance, merchantability or fitness for any particular
20  *  purpose.
21  *
22  *  Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Dmitri Dmitrienko
27  *
28  * File Description:
29  *
30  *  Wrapper class around cassandra "C"-API
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
36 #include <objtools/pubseq_gateway/impl/cassandra/cass_driver.hpp>
37 
38 #include <unistd.h>
39 #include <objtools/pubseq_gateway/impl/cassandra/IdCassScope.hpp>
40 #include <objtools/pubseq_gateway/impl/cassandra/cass_exception.hpp>
41 #include <objtools/pubseq_gateway/impl/cassandra/cass_util.hpp>
42 
#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <sstream>
#include <utility>
#include <vector>
51 
52 #include "corelib/ncbitime.hpp"
53 #include "corelib/ncbistr.hpp"
54 
55 BEGIN_IDBLOB_SCOPE
56 USING_NCBI_SCOPE;
57 
58 BEGIN_SCOPE()
59     constexpr unsigned kDefaultIOThreads = 32;
60     constexpr cass_duration_t kDisconnectTimeoutMcs = 5000000;
LogCallback(const CassLogMessage * message,void *)61     void LogCallback(const CassLogMessage * message, void * /*data*/)
62     {
63         switch (message->severity) {
64             case CASS_LOG_CRITICAL:
65                 ERR_POST(Critical << message->message);
66                 break;
67             case CASS_LOG_WARN:
68                 ERR_POST(Warning << message->message);
69                 break;
70             case CASS_LOG_INFO:
71                 ERR_POST(Info << message->message);
72                 break;
73             case CASS_LOG_DEBUG:
74                 ERR_POST(Trace << message->message);
75                 break;
76             case CASS_LOG_TRACE:
77                 ERR_POST(Trace << message->message);
78                 break;
79             case CASS_LOG_ERROR:
80             default:
81                 ERR_POST(Error << message->message);
82         }
83     }
84 
s_MapFromToolkitSeverity(EDiagSev severity)85     CassLogLevel s_MapFromToolkitSeverity(EDiagSev  severity)
86     {
87         switch (severity) {
88             case eDiag_Info:        return CASS_LOG_INFO;
89             case eDiag_Warning:     return CASS_LOG_WARN;
90             case eDiag_Error:       return CASS_LOG_ERROR;
91             case eDiag_Critical:    return CASS_LOG_CRITICAL;
92             case eDiag_Fatal:       return CASS_LOG_CRITICAL;
93             case eDiag_Trace:       return CASS_LOG_TRACE;
94         }
95         return CASS_LOG_ERROR;
96     }
97 END_SCOPE()
98 
99 bool CCassConnection::m_LoggingInitialized = false;
100 bool CCassConnection::m_LoggingEnabled = false;
101 EDiagSev CCassConnection::m_LoggingLevel = eDiag_Error;
102 atomic<CassUuidGen*> CCassConnection::m_CassUuidGen(nullptr);
103 
104 const unsigned int CCassQuery::DEFAULT_PAGE_SIZE = 4096;
105 
106 /** CCassConnection */
CCassConnection()107 CCassConnection::CCassConnection()
108     : m_port(0)
109     , m_cluster(nullptr)
110     , m_session(nullptr)
111     , m_ctimeoutms(0)
112     , m_qtimeoutms(0)
113     , m_last_query_cnt(0)
114     , m_loadbalancing(LB_DCAWARE)
115     , m_tokenaware(true)
116     , m_latencyaware(false)
117     , m_numThreadsIo(0)
118     , m_numConnPerHost(0)
119     , m_maxConnPerHost(0)
120     , m_keepalive(0)
121     , m_fallback_readconsistency(false)
122     , m_FallbackWriteConsistency(0)
123     , m_active_statements(0)
124 {}
125 
126 
~CCassConnection()127 CCassConnection::~CCassConnection()
128 {
129     Close();
130 }
131 
132 
SetLogging(EDiagSev severity)133 void CCassConnection::SetLogging(EDiagSev  severity)
134 {
135     cass_log_set_level(s_MapFromToolkitSeverity(severity));
136     cass_log_set_callback(LogCallback, nullptr);
137     m_LoggingEnabled = true;
138     m_LoggingLevel = severity;
139     m_LoggingInitialized = true;
140 }
141 
142 
DisableLogging(void)143 void CCassConnection::DisableLogging(void)
144 {
145     cass_log_set_level(CASS_LOG_DISABLED);
146     m_LoggingEnabled = false;
147     m_LoggingInitialized = true;
148 }
149 
150 
UpdateLogging(void)151 void CCassConnection::UpdateLogging(void)
152 {
153     if (m_LoggingInitialized) {
154         if (m_LoggingEnabled) {
155             SetLogging(m_LoggingLevel);
156         } else {
157             DisableLogging();
158         }
159     }
160 }
161 
QryTimeout(void) const162 unsigned int CCassConnection::QryTimeout(void) const
163 {
164     return m_qtimeoutms;
165 }
166 
QryTimeoutMks(void) const167 unsigned int CCassConnection::QryTimeoutMks(void) const
168 {
169     return QryTimeout() * 1000;
170 }
171 
SetRtLimits(unsigned int numThreadsIo,unsigned int numConnPerHost,unsigned int maxConnPerHost)172 void CCassConnection::SetRtLimits(unsigned int  numThreadsIo, unsigned int  numConnPerHost,
173                  unsigned int  maxConnPerHost)
174 {
175     m_numThreadsIo = numThreadsIo;
176     m_numConnPerHost = numConnPerHost;
177     m_maxConnPerHost = maxConnPerHost;
178 }
179 
SetKeepAlive(unsigned int keepalive)180 void CCassConnection::SetKeepAlive(unsigned int keepalive)
181 {
182     m_keepalive = keepalive;
183 }
184 
185 
Create()186 shared_ptr<CCassConnection> CCassConnection::Create()
187 {
188     return shared_ptr<CCassConnection>(new CCassConnection());
189 }
190 
191 
SetLoadBalancing(loadbalancing_policy_t policy)192 void CCassConnection::SetLoadBalancing(loadbalancing_policy_t  policy)
193 {
194     m_loadbalancing = policy;
195 }
196 
197 
SetTokenAware(bool value)198 void CCassConnection::SetTokenAware(bool  value)
199 {
200     m_tokenaware = value;
201 }
202 
203 
SetLatencyAware(bool value)204 void CCassConnection::SetLatencyAware(bool  value)
205 {
206     m_latencyaware = value;
207 }
208 
SetTimeouts(unsigned int ConnTimeoutMs)209 void CCassConnection::SetTimeouts(unsigned int ConnTimeoutMs)
210 {
211     SetTimeouts(ConnTimeoutMs, CASS_DRV_TIMEOUT_MS);
212 }
213 
SetTimeouts(unsigned int ConnTimeoutMs,unsigned int QryTimeoutMs)214 void CCassConnection::SetTimeouts(unsigned int ConnTimeoutMs, unsigned int QryTimeoutMs)
215 {
216     if (ConnTimeoutMs == 0 || ConnTimeoutMs > kCassMaxTimeout) {
217         ConnTimeoutMs = kCassMaxTimeout;
218     }
219     if (QryTimeoutMs == 0 || QryTimeoutMs > kCassMaxTimeout) {
220         QryTimeoutMs = kCassMaxTimeout;
221     }
222     m_qtimeoutms = QryTimeoutMs;
223     m_ctimeoutms = ConnTimeoutMs;
224     if (m_cluster) {
225         cass_cluster_set_request_timeout(m_cluster, m_qtimeoutms);
226     }
227 }
228 
SetFallBackRdConsistency(bool value)229 void CCassConnection::SetFallBackRdConsistency(bool value)
230 {
231     m_fallback_readconsistency = value;
232 }
233 
GetFallBackRdConsistency(void) const234 bool CCassConnection::GetFallBackRdConsistency(void) const
235 {
236     return m_fallback_readconsistency;
237 }
238 
SetFallBackWrConsistency(unsigned int value)239 void CCassConnection::SetFallBackWrConsistency(unsigned int  value)
240 {
241     m_FallbackWriteConsistency = value;
242 }
243 
GetFallBackWrConsistency(void) const244 unsigned int CCassConnection::GetFallBackWrConsistency(void) const
245 {
246     return m_FallbackWriteConsistency;
247 }
248 
NewTimeUUID()249 string CCassConnection::NewTimeUUID()
250 {
251     char buf[CASS_UUID_STRING_LENGTH];
252     if (!m_CassUuidGen.load()) {
253         CassUuidGen *gen, *nothing = nullptr;
254         gen = cass_uuid_gen_new();
255         if (!m_CassUuidGen.compare_exchange_weak(nothing, gen)) {
256             cass_uuid_gen_free(gen);
257         }
258     }
259 
260     CassUuid uuid;
261     cass_uuid_gen_time(static_cast<CassUuidGen*>(m_CassUuidGen.load()), &uuid);
262     cass_uuid_string(uuid, buf);
263     return string(buf);
264 }
265 
Connect()266 void CCassConnection::Connect()
267 {
268     if (IsConnected()) {
269         NCBI_THROW(CCassandraException, eSeqFailed, "cassandra driver has already been connected");
270     }
271     if (m_host.empty()) {
272         NCBI_THROW(CCassandraException, eSeqFailed, "cassandra host list is empty");
273     }
274 
275     UpdateLogging();
276     m_cluster = cass_cluster_new();
277 
278     cass_cluster_set_connect_timeout(m_cluster, m_ctimeoutms);
279     cass_cluster_set_contact_points(m_cluster, m_host.c_str());
280     if (m_port > 0) {
281         cass_cluster_set_port(m_cluster, m_port);
282     }
283     if (m_qtimeoutms > 0) {
284         cass_cluster_set_request_timeout(m_cluster, m_qtimeoutms);
285     }
286 
287     if (!m_user.empty()) {
288         cass_cluster_set_credentials(m_cluster, m_user.c_str(), m_pwd.c_str());
289     }
290 
291     if (m_loadbalancing != LB_DCAWARE && m_loadbalancing == LB_ROUNDROBIN) {
292         cass_cluster_set_load_balance_round_robin(m_cluster);
293     }
294 
295     cass_cluster_set_token_aware_routing(m_cluster, m_tokenaware ? cass_true : cass_false);
296     cass_cluster_set_latency_aware_routing(m_cluster, m_latencyaware ? cass_true : cass_false);
297     cass_cluster_set_num_threads_io(m_cluster, m_numThreadsIo ? m_numThreadsIo : kDefaultIOThreads);
298 
299 
300     if (m_maxConnPerHost) {
301         cass_cluster_set_max_connections_per_host(m_cluster, m_maxConnPerHost);
302     }
303 
304     if (m_numConnPerHost) {
305         cass_cluster_set_core_connections_per_host(m_cluster, m_numConnPerHost);
306     }
307 
308     if (m_keepalive > 0) {
309         cass_cluster_set_tcp_keepalive(m_cluster, cass_true, m_keepalive);
310     }
311 
312     if (!m_blacklist.empty()) {
313         cass_cluster_set_blacklist_filtering(m_cluster, m_blacklist.c_str());
314     }
315 
316     try {
317         Reconnect();
318     } catch (CCassandraException const &) {
319         Close();
320         throw;
321     }
322 }
323 
324 
CloseSession()325 void CCassConnection::CloseSession()
326 {
327     {
328         CSpinGuard guard(m_prepared_mux);
329         for(auto & item : m_prepared) {
330             if (item.second) {
331                 cass_prepared_free(item.second);
332                 item.second = nullptr;
333             }
334         }
335         m_prepared.clear();
336     }
337 
338     if (m_session) {
339         CassFuture * close_future;
340         bool free = false;
341         close_future = cass_session_close(m_session);
342         if (close_future) {
343             free = cass_future_wait_timed(close_future, kDisconnectTimeoutMcs);
344             cass_future_free(close_future);
345         }
346         // otherwise we can't free it, let better leak than crash
347         if (free) {
348             cass_session_free(m_session);
349         }
350         m_session = nullptr;
351     }
352 }
353 
354 
Reconnect()355 void CCassConnection::Reconnect()
356 {
357     if (!m_cluster) {
358         NCBI_THROW(CCassandraException, eSeqFailed,
359                    "invalid sequence of operations, driver is not connected, can't re-connect");
360     }
361 
362     if (m_session) {
363         CloseSession();
364     }
365 
366     m_session = cass_session_new();
367     if (!m_session) {
368         RAISE_DB_ERROR(eRsrcFailed, "failed to get cassandra session handle");
369     }
370 
371     CassFuture * __future = cass_session_connect(m_session, m_cluster);
372     if (!__future) {
373         RAISE_DB_ERROR(eRsrcFailed, "failed to obtain cassandra connection future");
374     }
375 
376     unique_ptr<CassFuture, function<void(CassFuture*)>> future(
377         __future,
378         [](CassFuture* future)
379         {
380             cass_future_free(future);
381         }
382     );
383 
384     CassError rc = CASS_OK;
385     cass_future_wait(future.get());
386     rc = cass_future_error_code(future.get());
387     if (rc != CASS_OK) {
388         RAISE_CASS_CONN_ERROR(future.get(), string(""));
389     }
390 
391     if (!m_keyspace.empty()) {
392         string _sav = m_keyspace;
393         m_keyspace.clear();
394         SetKeyspace(_sav);
395     }
396 }
397 
398 
Close()399 void CCassConnection::Close()
400 {
401     CloseSession();
402     if (m_cluster) {
403         cass_cluster_free(m_cluster);
404         m_cluster = nullptr;
405     }
406 }
407 
IsConnected(void)408 bool CCassConnection::IsConnected(void)
409 {
410     return (m_cluster || m_session);
411 }
412 
GetActiveStatements(void) const413 int64_t CCassConnection::GetActiveStatements(void) const
414 {
415     return m_active_statements;
416 }
417 
GetMetrics(void)418 CassMetrics CCassConnection::GetMetrics(void)
419 {
420     CassMetrics metrics;
421     if(m_session) {
422         cass_session_get_metrics(m_session, &metrics);
423     }
424     return metrics;
425 }
426 
SetConnProp(const string & host,const string & user,const string & pwd,int16_t port)427 void CCassConnection::SetConnProp(
428     const string & host,
429     const string & user,
430     const string & pwd,
431     int16_t port)
432 {
433     m_host = host;
434     m_user = user;
435     m_pwd = pwd;
436     m_port = port;
437     if (IsConnected()) {
438         Close();
439     }
440 }
441 
442 
SetKeyspace(const string & keyspace)443 void CCassConnection::SetKeyspace(const string &  keyspace)
444 {
445     if (m_keyspace != keyspace) {
446         if (IsConnected()) {
447             shared_ptr<CCassQuery> query(NewQuery());
448             query->SetSQL(string("USE ") + keyspace, 0);
449             query->Execute(CASS_CONSISTENCY_LOCAL_QUORUM);
450         }
451         m_keyspace = keyspace;
452     }
453 }
454 
Keyspace(void) const455 string CCassConnection::Keyspace(void) const
456 {
457     return m_keyspace;
458 }
459 
SetBlackList(const string & blacklist)460 void CCassConnection::SetBlackList(const string & blacklist)
461 {
462     m_blacklist = blacklist;
463 }
464 
NewQuery()465 shared_ptr<CCassQuery> CCassConnection::NewQuery()
466 {
467     if (!m_session) {
468         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, driver is not connected");
469     }
470 
471     shared_ptr<CCassQuery> rv(new CCassQuery(shared_from_this()));
472     rv->SetTimeout(m_qtimeoutms);
473     return rv;
474 }
475 
getTokenRanges(TTokenRanges & ranges)476 void CCassConnection::getTokenRanges(TTokenRanges &ranges)
477 {
478     GetTokenRanges(ranges);
479 }
480 
GetTokenRanges(TTokenRanges & ranges)481 void CCassConnection::GetTokenRanges(TTokenRanges &ranges)
482 {
483     set<TTokenValue> cluster_tokens;
484     map<string, vector<TTokenValue>> peer_tokens;
485 
486     shared_ptr<CCassQuery> query(NewQuery());
487     query->SetSQL("select data_center, schema_version, host_id, tokens "
488                   "from system.local", 0);
489     query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
490 
491     string host_id,  datacenter, schema;
492     vector<string> tokens;
493 
494     query->NextRow();
495     query->FieldGetStrValue(0, datacenter);
496     query->FieldGetStrValue(1, schema);
497     query->FieldGetStrValue(2, host_id);
498     query->FieldGetSetValues(3, tokens);
499     auto itr = peer_tokens.insert(make_pair(host_id,vector<TTokenValue>())).first;
500     for(const auto & item: tokens) {
501         TTokenValue value = strtol(item.c_str(), nullptr, 10);
502         itr->second.push_back(value);
503         cluster_tokens.insert(value);
504     }
505 
506     ERR_POST(Info << "GET_TOKEN_MAP: Schema version is " << schema);
507     ERR_POST(Info << "GET_TOKEN_MAP: datacenter is " << datacenter);
508     ERR_POST(Info << "GET_TOKEN_MAP: host_id is " << host_id);
509 
510     unsigned int query_host_count = 0;
511     unsigned int retries = 0;
512     set<string> query_hosts;
513     while (retries < 3 && (query_host_count == 0 || peer_tokens.size() == query_host_count)) {
514         retries++;
515         if (query_host_count != 0) {
516             ERR_POST(Info << "GET_TOKEN_MAP: Host_id count is too small. "
517                 "Retrying system.peers fetch. " << retries);
518         }
519 
520         query_host_count = 0;
521         query_hosts.clear();
522         query->SetSQL("select host_id, data_center, schema_version, tokens from system.peers", 0);
523         query->Query(CassConsistency::CASS_CONSISTENCY_LOCAL_ONE, false, false);
524         while (query->NextRow() == ar_dataready) {
525             string peer_host_id, peer_dc, peer_schema;
526             vector<string> tokens;
527             query->FieldGetStrValue(1, peer_dc);
528             query->FieldGetStrValue(2, peer_schema);
529             if (datacenter == peer_dc && schema == peer_schema) {
530                 query->FieldGetStrValue(0, peer_host_id);
531                 if (query_hosts.find(peer_host_id) == query_hosts.end()) {
532                     query_hosts.insert(peer_host_id);
533                     query_host_count++;
534                 }
535                 query->FieldGetSetValues(3, tokens);
536                 ERR_POST(Info << "GET_TOKEN_MAP: host is " << peer_host_id);
537                 ERR_POST(Info << "GET_TOKEN_MAP: tokens " << tokens.size());
538                 auto itr = peer_tokens.find(peer_host_id);
539                 if (itr == peer_tokens.end()) {
540                     itr = peer_tokens.insert(make_pair(peer_host_id, vector<TTokenValue>())).first;
541                     for (const auto & item : tokens) {
542                         TTokenValue value = strtol(item.c_str(), nullptr, 10);
543                         itr->second.push_back(value);
544                         cluster_tokens.insert(value);
545                     }
546                 }
547             }
548         }
549         ERR_POST(Info << "GET_TOKEN_MAP: PEERS HOST COUNT IS " << peer_tokens.size());
550         ERR_POST(Info << "GET_TOKEN_MAP: QUERY HOST COUNT IS " << query_host_count);
551     }
552 
553     for(const auto& token : cluster_tokens) {
554         ERR_POST(Trace << "GET_TOKEN_MAP: \ttoken " << token);
555     }
556 
557     ERR_POST(Info << "GET_TOKEN_MAP: tokens size " << cluster_tokens.size());
558     ranges.reserve(cluster_tokens.size() + 1);
559     TTokenValue lower_bound = numeric_limits<TTokenValue>::min();
560     for (int64_t token : cluster_tokens) {
561         ERR_POST(Trace << "GET_TOKEN_MAP: token " << token << " : " << token - lower_bound);
562         ranges.push_back(make_pair(lower_bound, token));
563         lower_bound = token;
564     }
565     ranges.push_back(make_pair(lower_bound, numeric_limits<TTokenValue>::max()));
566 }
567 
GetPartitionKeyColumnNames(string const & keyspace,string const & table) const568 vector<string> CCassConnection::GetPartitionKeyColumnNames(string const & keyspace, string const & table) const
569 {
570     if (!m_session) {
571         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, driver is not connected");
572     }
573     using TMeta = const CassSchemaMeta;
574     unique_ptr<TMeta, function<void(TMeta*)>> schema_meta(
575         cass_session_get_schema_meta(m_session),
576         [](TMeta* meta)-> void {
577             cass_schema_meta_free(meta);
578         }
579     );
580     if (!schema_meta) {
581         NCBI_THROW(CCassandraException, eFatal, "Cluster metadata is not resolved");
582     }
583 
584     const CassKeyspaceMeta* keyspace_meta = cass_schema_meta_keyspace_by_name_n(
585         schema_meta.get(), keyspace.c_str(), keyspace.size()
586     );
587     if (!keyspace_meta) {
588         NCBI_THROW(CCassandraException, eNotFound, "Keyspace not found");
589     }
590 
591     const CassTableMeta* table_meta = cass_keyspace_meta_table_by_name_n(
592         keyspace_meta, table.c_str(), table.size()
593     );
594     if (!table_meta) {
595         NCBI_THROW(CCassandraException, eNotFound, "Table not found");
596     }
597 
598     size_t partition_key_count = cass_table_meta_partition_key_count(table_meta);
599     vector<string> result;
600     result.reserve(partition_key_count);
601     for (size_t i = 0; i < partition_key_count; ++i) {
602         const CassColumnMeta* column_meta = cass_table_meta_partition_key(table_meta, i);
603         if (column_meta) {
604             const char* name;
605             size_t name_length;
606             cass_column_meta_name(column_meta, &name, &name_length);
607             result.emplace_back(name, name_length);
608         }
609     }
610     return result;
611 }
612 
Prepare(const string & sql)613 const CassPrepared * CCassConnection::Prepare(const string &  sql)
614 {
615     const CassPrepared * rv = nullptr;
616 #if CASS_PREPARED_Q
617     {
618         CSpinGuard guard(m_prepared_mux);
619         auto it = m_prepared.find(sql);
620         if (it != m_prepared.end()) {
621             rv = it->second;
622             return rv;
623         }
624     }
625 
626     CSpinGuard guard(m_prepared_mux);
627     auto it = m_prepared.find(sql);
628     if (it == m_prepared.end())
629     {
630         const char * query = sql.c_str();
631         CassFuture * __future = cass_session_prepare(m_session, query);
632         if (!__future) {
633             RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query future"));
634         }
635 
636         unique_ptr<CassFuture, function<void(CassFuture*)>> future(
637             __future,
638             [](CassFuture* future)
639             {
640                 cass_future_free(future);
641             }
642         );
643 
644         bool b;
645         b = cass_future_wait_timed(future.get(), m_qtimeoutms * 1000L);
646         if (!b) {
647             RAISE_DB_QRY_TIMEOUT(m_qtimeoutms, 0, string("failed to prepare query \"") + sql + "\"");
648         }
649         CassError rc = cass_future_error_code(future.get());
650         if (rc != CASS_OK) {
651             string msg = (rc == CASS_ERROR_SERVER_SYNTAX_ERROR || rc == CASS_ERROR_SERVER_INVALID_QUERY) ?
652                             string(", sql: ") + sql : string("");
653             RAISE_CASS_QRY_ERROR(future.get(), msg);
654         }
655 
656         rv = cass_future_get_prepared(future.get());
657         if (!rv) {
658             RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain prepared handle for sql: ") + sql);
659         }
660         m_prepared.emplace(sql, rv);
661     } else {
662         rv = it->second;
663         return rv;
664     }
665 #endif
666     return rv;
667 }
668 
669 
Perform(unsigned int optimeoutms,const std::function<bool ()> & PreLoopCB,const std::function<void (const CCassandraException &)> & DbExceptCB,const std::function<bool (bool)> & OpCB)670 void CCassConnection::Perform(
671                 unsigned int optimeoutms,
672                 const std::function<bool()> &  PreLoopCB,
673                 const std::function<void(const CCassandraException&)> &  DbExceptCB,
674                 const std::function<bool(bool)> &  OpCB)
675 {
676     int err_cnt = 0;
677     bool is_repeated = false;
678     int64_t op_begin = gettime();
679     while (!PreLoopCB || PreLoopCB()) {
680         try {
681             if (OpCB(is_repeated)) {
682                 break;
683             }
684             err_cnt = 0;
685         } catch (const CCassandraException &  e) {
686             // log and ignore, app-specific layer is responsible for
687             // re-connetion if needed
688             if (DbExceptCB) {
689                 DbExceptCB(e);
690             }
691 
692             if (e.GetErrCode() == CCassandraException::eQueryTimeout && ++err_cnt < 10) {
693                 ERR_POST(Info << "CAPTURED TIMEOUT: " << e.TimeoutMsg() << ", RESTARTING OP");
694             } else if (e.GetErrCode() == CCassandraException::eQueryFailedRestartable) {
695                 ERR_POST(Info << "CAPTURED RESTARTABLE EXCEPTION: " << e.what() << ", RESTARTING OP");
696             } else {
697                 // timer exceeded (10 times we got timeout and havn't read
698                 // anyting, or got another error -> try to reconnect
699                 ERR_POST("2. CAPTURED " << e.what());
700                 throw;
701             }
702 
703             int64_t     op_time_ms = (gettime() - op_begin) / 1000;
704             if (optimeoutms != 0 && op_time_ms > optimeoutms) {
705                 throw;
706             }
707         } catch(...) {
708             throw;
709         }
710         is_repeated = true;
711     }
712 }
713 
714 /** CCassPrm */
Bind(CassStatement * statement,unsigned int idx)715 void CCassPrm::Bind(CassStatement * statement, unsigned int idx)
716 {
717     CassError rc = CASS_OK;
718     switch (m_type) {
719         case CASS_VALUE_TYPE_UNKNOWN:
720             if (!IsAssigned()) {
721                 RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Param #" + to_string(idx) + " is not assigned");
722             }
723             rc = cass_statement_bind_null(statement, idx);
724             break;
725         case CASS_VALUE_TYPE_TINY_INT:
726             rc = cass_statement_bind_int8(statement, idx, static_cast<cass_int8_t>(m_simpleval.i8));
727             break;
728         case CASS_VALUE_TYPE_SMALL_INT:
729             rc = cass_statement_bind_int16(statement, idx, static_cast<cass_int16_t>(m_simpleval.i16));
730             break;
731         case CASS_VALUE_TYPE_INT:
732             rc = cass_statement_bind_int32(statement, idx, static_cast<cass_int32_t>(m_simpleval.i32));
733             break;
734         case CASS_VALUE_TYPE_BIGINT:
735             rc = cass_statement_bind_int64(statement, idx, static_cast<cass_int64_t>(m_simpleval.i64));
736             break;
737 
738         /*
739          * @saprykin
740          * Removed silent binding null if string is empty. It creates a lot of
741          * tombstones in storage. Sometimes if user wants to write empty string
742          * it means that we should write empty string.
743          * There is a proper method for writing nulls => CCassQuery::BindNull
744          */
745         case CASS_VALUE_TYPE_VARCHAR:
746             rc = cass_statement_bind_string(statement, idx, m_bytes.c_str());
747             break;
748         case CASS_VALUE_TYPE_BLOB:
749             if (m_bytes.size() > 0) {
750                 rc = cass_statement_bind_bytes(
751                     statement, idx,
752                     reinterpret_cast<const unsigned char*>(m_bytes.c_str()),
753                     m_bytes.size());
754             } else {
755                 rc = cass_statement_bind_null(statement, idx);
756             }
757             break;
758         case CASS_VALUE_TYPE_SET:
759         case CASS_VALUE_TYPE_LIST:
760         case CASS_VALUE_TYPE_MAP:
761             if (m_collection.get()) {
762                 rc = cass_statement_bind_collection(statement, idx, m_collection.get());
763             } else {
764                 rc = cass_statement_bind_null(statement, idx);
765             }
766             break;
767         case CASS_VALUE_TYPE_TUPLE:
768             if (m_tuple.get()) {
769                 rc = cass_statement_bind_tuple(statement, idx, m_tuple.get());
770             } else {
771                 rc = cass_statement_bind_null(statement, idx);
772             }
773             break;
774         default:
775             RAISE_DB_ERROR(eBindFailed, string("Bind for (") + to_string(static_cast<int>(m_type)) + ") type is not implemented");
776     }
777     if (rc != CASS_OK) {
778         RAISE_CASS_ERROR(rc, eBindFailed,
779             "Bind for (" + to_string(static_cast<int>(m_type)) + ") failed with rc= " + to_string(static_cast<int>(rc)));
780     }
781 }
782 
783 
784 /**  CCassQuery */
SetTimeout()785 void CCassQuery::SetTimeout()
786 {
787     SetTimeout(CASS_DRV_TIMEOUT_MS);
788 }
789 
SetTimeout(unsigned int t)790 void CCassQuery::SetTimeout(unsigned int t)
791 {
792     m_qtimeoutms = t;
793 }
794 
Timeout(void) const795 unsigned int CCassQuery::Timeout(void) const
796 {
797     return m_qtimeoutms;
798 }
799 
Close(void)800 void CCassQuery::Close(void)
801 {
802     InternalClose(true);
803 }
804 
~CCassQuery()805 CCassQuery::~CCassQuery()
806 {
807     Close();
808     m_ondata_data = nullptr;
809     m_ondata2_data = nullptr;
810     m_onexecute_data = nullptr;
811 }
812 
InternalClose(bool closebatch)813 void CCassQuery::InternalClose(bool  closebatch)
814 {
815     m_params.clear();
816     if (m_future) {
817         cass_future_free(m_future);
818         --m_connection->m_active_statements;
819         m_future = nullptr;
820         m_futuretime = 0;
821     }
822     if (m_statement) {
823         cass_statement_free(m_statement);
824         m_statement = nullptr;
825     }
826     if (m_batch && closebatch) {
827         cass_batch_free(m_batch);
828         m_batch = nullptr;
829     }
830     if (m_iterator) {
831         cass_iterator_free(m_iterator);
832         m_iterator = nullptr;
833     }
834     if (m_result) {
835         cass_result_free(m_result);
836         m_result = nullptr;
837     }
838     /*
839      * @saprykin
840      * Commented out.
841      * InternalClose is called from SetSQL and reseting SerialConsistency
842      * level adds SetSQL very nasty side effect.
843      * Consider not reusing CCassQuery for different types of requests.
844      */
845     // m_serial_consistency = CASS_CONSISTENCY_ANY;
846     m_row = nullptr;
847     m_page_start = false;
848     m_page_size = 0;
849     m_results_expected = false;
850     m_async = false;
851     m_allow_prepare = CASS_PREPARED_Q;
852     m_is_prepared = false;
853     if (m_cb_ref) {
854         m_cb_ref->Detach();
855         m_cb_ref = nullptr;
856     }
857 }
858 
GetSQL() const859 string CCassQuery::GetSQL() const {
860     return m_sql;
861 }
862 
SetSQL(const string & sql,unsigned int PrmCount)863 void CCassQuery::SetSQL(const string &  sql, unsigned int  PrmCount)
864 {
865     InternalClose(false);
866     m_sql = sql;
867     m_params.resize(PrmCount);
868 }
869 
870 
Bind()871 void CCassQuery::Bind()
872 {
873     if (!m_statement) {
874         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, query is closed");
875     }
876     int cnt = 0;
877     for (auto & it: m_params) {
878         it.Bind(m_statement, cnt);
879         cnt++;
880     }
881 }
882 
883 
BindNull(int iprm)884 void CCassQuery::BindNull(int iprm)
885 {
886     CheckParamExists(iprm);
887     m_params[iprm].AssignNull();
888 }
889 
BindInt8(int iprm,int8_t value)890 void CCassQuery::BindInt8(int iprm, int8_t value)
891 {
892     CheckParamExists(iprm);
893     m_params[iprm].Assign(value);
894 }
895 
BindInt16(int iprm,int16_t value)896 void CCassQuery::BindInt16(int iprm, int16_t value)
897 {
898     CheckParamExists(iprm);
899     m_params[iprm].Assign(value);
900 }
901 
BindInt32(int iprm,int32_t value)902 void CCassQuery::BindInt32(int iprm, int32_t value)
903 {
904     CheckParamExists(iprm);
905     m_params[iprm].Assign(value);
906 }
907 
908 
BindInt64(int iprm,int64_t value)909 void CCassQuery::BindInt64(int  iprm, int64_t  value)
910 {
911     CheckParamExists(iprm);
912     m_params[iprm].Assign(value);
913 }
914 
915 
BindStr(int iprm,const string & value)916 void CCassQuery::BindStr(int  iprm, const string &  value)
917 {
918     CheckParamExists(iprm);
919     m_params[iprm].Assign(value);
920 }
921 
922 
BindStr(int iprm,const char * value)923 void CCassQuery::BindStr(int  iprm, const char *  value)
924 {
925     CheckParamExists(iprm);
926     m_params[iprm].Assign(value);
927 }
928 
929 
BindBytes(int iprm,const unsigned char * buf,size_t len)930 void CCassQuery::BindBytes(int  iprm, const unsigned char *  buf, size_t  len)
931 {
932     CheckParamExists(iprm);
933     m_params[iprm].Assign(buf, len);
934 }
935 
936 
ParamAsInt32(int iprm)937 int32_t CCassQuery::ParamAsInt32(int  iprm)
938 {
939     CheckParamAssigned(iprm);
940     return m_params[iprm].AsInt32();
941 }
942 
943 
ParamAsInt64(int iprm)944 int64_t CCassQuery::ParamAsInt64(int  iprm)
945 {
946     CheckParamAssigned(iprm);
947     return m_params[iprm].AsInt64();
948 }
949 
950 
ParamAsStr(int iprm) const951 string CCassQuery::ParamAsStr(int  iprm) const
952 {
953     CheckParamAssigned(iprm);
954     return m_params[iprm].AsString();
955 }
956 
957 
ParamAsStr(int iprm,string & value) const958 void CCassQuery::ParamAsStr(int iprm, string & value) const
959 {
960     CheckParamAssigned(iprm);
961     m_params[iprm].AsString(value);
962 }
963 
ParamAsStrForDebug(int iprm) const964 string CCassQuery::ParamAsStrForDebug(int iprm) const
965 {
966     CheckParamAssigned(iprm);
967     return m_params[iprm].AsStringForDebug();
968 }
969 
ParamType(int iprm) const970 CassValueType CCassQuery::ParamType(int iprm) const
971 {
972     CheckParamAssigned(iprm);
973     return m_params[iprm].GetType();
974 }
975 
SetEOF(bool value)976 void CCassQuery::SetEOF(bool value)
977 {
978     if (m_EOF != value) {
979         m_EOF = value;
980     }
981 }
982 
NewBatch()983 void CCassQuery::NewBatch()
984 {
985     if (!m_connection) {
986         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
987     }
988     if (m_batch) {
989         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, batch has already been started");
990     }
991     if (IsActive()) {
992         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, query is active");
993     }
994 
995     m_batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
996     if (!m_batch) {
997         RAISE_DB_ERROR(eRsrcFailed, "failed to create batch");
998     }
999 }
1000 
1001 
Query(CassConsistency c,bool run_async,bool allow_prepared,unsigned int page_size)1002 void CCassQuery::Query(CassConsistency  c, bool  run_async,
1003                        bool  allow_prepared, unsigned int  page_size)
1004 {
1005     if (!m_connection) {
1006         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1007     }
1008     if (m_sql.empty()) {
1009         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, SQL is not set");
1010     }
1011     if (m_batch) {
1012         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, can't run select in batch mode");
1013     }
1014     if (IsActive()) {
1015         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is active");
1016     }
1017     const CassPrepared * prepared = nullptr;
1018     if (allow_prepared) {
1019         prepared = m_connection->Prepare(m_sql);
1020         m_is_prepared = prepared != nullptr;
1021     }
1022     if (m_is_prepared) {
1023         m_statement = cass_prepared_bind(prepared);
1024     } else {
1025         m_statement = cass_statement_new(m_sql.c_str(), m_params.size());
1026     }
1027 
1028     if (!m_statement) {
1029         RAISE_DB_ERROR(eRsrcFailed, string("failed to create cassandra query"));
1030     }
1031 
1032     try {
1033         CassError rc;
1034         Bind();
1035         rc = cass_statement_set_consistency(m_statement, c);
1036         if (rc != CASS_OK) {
1037             RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set consistency level " + to_string(static_cast<int>(c)));
1038         }
1039         if (m_serial_consistency != CASS_CONSISTENCY_ANY) {
1040             rc = cass_statement_set_serial_consistency(m_statement, m_serial_consistency);
1041             if (rc != CASS_OK) {
1042                 RAISE_CASS_ERROR(rc, eQueryFailed,
1043                     "Failed to set serial consistency level " + to_string(static_cast<int>(m_serial_consistency)));
1044             }
1045         }
1046         if (page_size > 0) {
1047             rc = cass_statement_set_paging_size(m_statement, page_size);
1048             if (rc != CASS_OK) {
1049                 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set page size to " + to_string(static_cast<int>(page_size)));
1050             }
1051         }
1052 
1053         m_page_start = true;
1054         m_page_size = page_size;
1055         SetEOF(false);
1056         m_results_expected = true;
1057         m_async = run_async;
1058         if (!m_batch) {
1059             if (run_async) {
1060                 GetFuture();
1061             } else {
1062                 Wait(m_qtimeoutms * 1000L);
1063             }
1064         }
1065     } catch(...) {
1066         Close();
1067         throw;
1068     }
1069 }
1070 
RestartQuery(CassConsistency c)1071 void CCassQuery::RestartQuery(CassConsistency c)
1072 {
1073     if (!m_future) {
1074         RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1075     }
1076     unsigned int page_size = m_page_size;
1077     CCassParams params = move(m_params);
1078     bool async = m_async, allow_prepared = m_is_prepared;
1079     Close();
1080     m_params = move(params);
1081     Query(c, async, allow_prepared, page_size);
1082 }
1083 
Execute(CassConsistency c,bool run_async,bool allow_prepared)1084 void CCassQuery::Execute(CassConsistency c, bool run_async, bool allow_prepared)
1085 {
1086     if (!m_connection) {
1087         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1088     }
1089     if (m_sql.empty()) {
1090         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, SQL is not set");
1091     }
1092     if (m_row != nullptr || m_statement != nullptr) {
1093         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is active");
1094     }
1095 
1096     const CassPrepared * prepared = nullptr;
1097     if (allow_prepared) {
1098         prepared = m_connection->Prepare(m_sql);
1099         m_is_prepared = prepared != nullptr;
1100     }
1101 
1102     if (m_is_prepared) {
1103         m_statement = cass_prepared_bind(prepared);
1104     } else {
1105         m_statement = cass_statement_new(m_sql.c_str(), m_params.size());
1106     }
1107 
1108     if (!m_statement) {
1109         RAISE_DB_ERROR(eRsrcFailed, "failed to create cassandra query");
1110     }
1111 
1112     try {
1113         CassError rc;
1114         Bind();
1115         rc = cass_statement_set_consistency(m_statement, c);
1116         if (rc != CASS_OK) {
1117             RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to set consistency level " + to_string(static_cast<int>(c)));
1118         }
1119         if (m_serial_consistency != CASS_CONSISTENCY_ANY) {
1120             rc = cass_statement_set_serial_consistency(m_statement, m_serial_consistency);
1121             if (rc != CASS_OK) {
1122                 RAISE_CASS_ERROR(rc, eQueryFailed,
1123                     "Failed to set serial consistency level " + to_string(static_cast<int>(m_serial_consistency)));
1124             }
1125         }
1126 
1127         if (m_batch) {
1128             CassError rc = cass_batch_add_statement(m_batch, m_statement);
1129             if (rc != CASS_OK) {
1130                 RAISE_CASS_ERROR(rc, eQueryFailed, "Failed to add statement to batch, sql: " + m_sql);
1131             }
1132             cass_statement_free(m_statement);
1133             m_statement = nullptr;
1134         }
1135 
1136         m_page_start = false;
1137         m_page_size = 0;
1138         SetEOF(false);
1139         m_results_expected = false;
1140         m_async = run_async;
1141         if (!m_batch) {
1142             if (run_async) {
1143                 GetFuture();
1144             } else {
1145                 Wait(0);
1146             }
1147         }
1148     } catch(...) {
1149         if (m_statement) {
1150             cass_statement_free(m_statement);
1151             m_statement = nullptr;
1152         }
1153         throw;
1154     }
1155 }
1156 
RestartExecute(CassConsistency c)1157 void CCassQuery::RestartExecute(CassConsistency c)
1158 {
1159     if (!m_future) {
1160         RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1161     }
1162     CCassParams params = move(m_params);
1163     bool async = m_async, allow_prepared = m_is_prepared;
1164     Close();
1165     m_params = move(params);
1166     Execute(c, async, allow_prepared);
1167 }
1168 
Restart(CassConsistency c)1169 void CCassQuery::Restart(CassConsistency c)
1170 {
1171     if (!m_future) {
1172         RAISE_DB_ERROR(eSeqFailed, "Query is is not in restartable state");
1173     }
1174     if (m_results_expected) {
1175         RestartQuery(c);
1176     } else {
1177         RestartExecute(c);
1178     }
1179 }
1180 
1181 
SetSerialConsistency(CassConsistency c)1182 void CCassQuery::SetSerialConsistency(CassConsistency  c)
1183 {
1184     m_serial_consistency = c;
1185 }
1186 
1187 
RunBatch()1188 async_rslt_t CCassQuery::RunBatch()
1189 {
1190     async_rslt_t rv = ar_wait;
1191     if (!m_batch) {
1192         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, batch is not created");
1193     }
1194 
1195     if (m_async) {
1196         GetFuture();
1197     } else {
1198         rv = Wait(m_qtimeoutms * 1000L);
1199         if (m_batch) {
1200             cass_batch_free(m_batch);
1201             m_batch = nullptr;
1202         }
1203     }
1204     return rv;
1205 }
1206 
1207 
WaitAsync(unsigned int timeoutmks)1208 async_rslt_t CCassQuery::WaitAsync(unsigned int  timeoutmks)
1209 {
1210     if (!m_async) {
1211         RAISE_DB_ERROR(eSeqFailed, "attempt to wait on query in non-async state");
1212     }
1213     return Wait(timeoutmks);
1214 }
1215 
1216 
IsReady()1217 bool CCassQuery::IsReady()
1218 {
1219     GetFuture();
1220     return cass_future_ready(m_future) == cass_true;
1221 }
1222 
1223 
GetFuture()1224 void CCassQuery::GetFuture()
1225 {
1226     if (!m_connection) {
1227         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, DB connection closed");
1228     }
1229     if (!IsActive() && !m_batch) {
1230         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is not active");
1231     }
1232     if (m_iterator || m_result) {
1233         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, results already obtained");
1234     }
1235     if (!m_future) {
1236         ++m_connection->m_active_statements;
1237         if (m_batch) {
1238             m_future = cass_session_execute_batch(m_connection->m_session, m_batch);
1239         } else {
1240             m_future = cass_session_execute(m_connection->m_session, m_statement);
1241         }
1242         if (!m_future) {
1243             --m_connection->m_active_statements;
1244             RAISE_DB_ERROR(eRsrcFailed, "failed to obtain cassandra query future");
1245         }
1246         m_futuretime = gettime();
1247         if (m_ondata || m_ondata2 || m_ondata3.lock()) {
1248             SetupOnDataCallback();
1249         }
1250     }
1251 }
1252 
1253 
Wait(unsigned int timeoutmks)1254 async_rslt_t  CCassQuery::Wait(unsigned int  timeoutmks)
1255 {
1256     if (m_results_expected && m_result) {
1257         if (m_async) {
1258             return ar_dataready;
1259         }
1260         RAISE_DB_ERROR(eSeqFailed, "result has already been allocated");
1261     }
1262 
1263     if (m_EOF && m_results_expected) {
1264         return ar_done;
1265     }
1266 
1267     GetFuture();
1268     bool rv;
1269     if (timeoutmks != 0) {
1270         rv = cass_future_wait_timed(m_future, timeoutmks);
1271     } else if (!m_async) {
1272         cass_future_wait(m_future);
1273         rv = true;
1274     } else {
1275         rv = cass_future_ready(m_future);
1276     }
1277 
1278     if (!rv) {
1279         if (!m_async && timeoutmks > 0) {
1280             int64_t t = (gettime() - m_futuretime) / 1000L;
1281             RAISE_DB_QRY_TIMEOUT(t, timeoutmks / 1000L, string("failed to perform query \"") + m_sql + "\"");
1282         } else {
1283             return ar_wait;
1284         }
1285     } else {
1286         ProcessFutureResult();
1287         if ((m_statement || m_batch) && !m_results_expected) {
1288             // next request was run in m_ondata event handler
1289             return ar_wait;
1290         } else if (m_EOF || !m_results_expected) {
1291             return ar_done;
1292         } else if (m_result) {
1293             return ar_dataready;
1294         } else {
1295             return ar_wait;
1296         }
1297     }
1298 }
1299 
SetupOnDataCallback()1300 void CCassQuery::SetupOnDataCallback()
1301 {
1302     if (!m_future) {
1303         RAISE_DB_ERROR(eSeqFailed, "Future is not assigned");
1304     }
1305     if (!m_ondata && !m_ondata2 && !m_ondata3.lock()) {
1306         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, m_ondata is not set");
1307     }
1308     if (m_cb_ref) {
1309         m_cb_ref->Detach();
1310     }
1311     m_cb_ref.reset(new CCassQueryCbRef(shared_from_this()));
1312     m_cb_ref->Attach(m_ondata, m_ondata_data, m_ondata2, m_ondata2_data, m_ondata3.lock());
1313     CassError rv = cass_future_set_callback(m_future, CCassQueryCbRef::s_OnFutureCb, m_cb_ref.get());
1314     if (rv != CASS_OK) {
1315         m_cb_ref->Detach(true);
1316         m_cb_ref = nullptr;
1317         RAISE_DB_ERROR(eSeqFailed, "failed to assign future callback");
1318     }
1319 }
1320 
ProcessFutureResult()1321 void CCassQuery::ProcessFutureResult()
1322 {
1323     if (!m_future) {
1324         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, m_future is not set");
1325     }
1326     if (m_iterator || m_result) {
1327         RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, results already obtained");
1328     }
1329     CassError rc = cass_future_error_code(m_future);
1330     if (rc != CASS_OK) {
1331         if (m_statement) {
1332             cass_statement_free(m_statement);
1333             m_statement = nullptr;
1334         }
1335         string msg;
1336         if (rc == CASS_ERROR_SERVER_SYNTAX_ERROR || rc == CASS_ERROR_SERVER_INVALID_QUERY) {
1337             msg = ", sql: " + m_sql;
1338         }
1339         RAISE_CASS_QRY_ERROR(m_future, msg);
1340     }
1341 
1342     if (m_results_expected) {
1343         m_result = cass_future_get_result(m_future);
1344         if (!m_result) {
1345             RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query result"));
1346         }
1347         if (m_iterator) {
1348             RAISE_DB_ERROR(eSeqFailed, "iterator has already been allocated");
1349         }
1350 
1351         m_iterator = cass_iterator_from_result(m_result);
1352         if (!m_iterator) {
1353             RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query iterator"));
1354         }
1355         /*
1356             we release m_future here for statements that return dataset,
1357             otherwise we free it in the destructor, keeping m_future not null
1358             as an indication that we have already waited for query to finish
1359         */
1360         cass_future_free(m_future);
1361         --m_connection->m_active_statements;
1362         m_future = nullptr;
1363         if (m_cb_ref) {
1364             m_cb_ref->Detach();
1365             m_cb_ref = nullptr;
1366         }
1367         m_futuretime = 0;
1368         m_page_start = false;
1369     } else {
1370         if (m_statement) {
1371             cass_statement_free(m_statement);
1372             m_statement = nullptr;
1373         }
1374         if (m_batch) {
1375             cass_batch_free(m_batch);
1376             m_batch = nullptr;
1377         }
1378         SetEOF(true);
1379         if (m_onexecute) {
1380             m_onexecute(*this, m_onexecute_data);
1381         }
1382     }
1383 }
1384 
1385 
// Advances to the next row of the result set and leaves it in m_row.
//
// Returns ar_dataready when a row was fetched, ar_done once the rows (and,
// with paging enabled, all pages) are exhausted, or whatever status other
// than ar_dataready Wait() reports while no result is available.
//
// Raises eSeqFailed if the query is not active or there is no
// result/iterator to read from, eFetchFailed if the paging state cannot be
// applied to the statement, and eRsrcFailed if the driver returns no row.
async_rslt_t CCassQuery::NextRow()
{
    if (!IsActive()) {
        RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, Query is not active");
    }
    while(1) {
        m_row = nullptr;
        if (m_result == nullptr) {
            // No result page in hand: async queries only poll (timeout 0),
            // synchronous ones wait for m_qtimeoutms * 1000.
            async_rslt_t wr;
            if (m_async) {
                wr = Wait(0);
            } else {
                wr = Wait(m_qtimeoutms * 1000L);
            }
            if (wr != ar_dataready) {
                return wr;
            }
        }
        if (m_iterator && m_result) {
            bool b = cass_iterator_next(m_iterator);
            if (!b) {
                // Current page is exhausted.
                if (m_page_size > 0) {
                    bool has_more_pages = cass_result_has_more_pages(m_result);
                    if (has_more_pages) {
                        // The paging state must be copied into the statement
                        // while m_result is still alive, i.e. before it is freed below.
                        CassError err;
                        if ((err = cass_statement_set_paging_state(m_statement, m_result)) != CASS_OK) {
                            RAISE_CASS_ERROR(err, eFetchFailed, string("failed to retrive next page"));
                        }
                    }
                    cass_iterator_free(m_iterator);
                    m_iterator = nullptr;
                    cass_result_free(m_result);
                    m_result = nullptr;
                    if (!has_more_pages) {
                        SetEOF(true);
                        return ar_done;
                    }
                    // go to above
                    // (m_result is null now, so the next iteration waits for the following page)
                    m_page_start = true;
                } else {
                    // Paging disabled: the single result is all there is.
                    SetEOF(true);
                    return ar_done;
                }
            } else {
                m_row = cass_iterator_get_row(m_iterator);
                if (!m_row) {
                    RAISE_DB_ERROR(eRsrcFailed, string("failed to obtain cassandra query result row"));
                }
                return ar_dataready;
            }
        } else {
            RAISE_DB_ERROR(eSeqFailed, "invalid sequence of operations, attempt to fetch next row on a closed query");
        }
    }
}
1441 
1442 
1443 template<>
GetColumn(int ifld) const1444 const CassValue * CCassQuery::GetColumn(int ifld) const
1445 {
1446     if (!m_row) {
1447         RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1448     }
1449     const CassValue * clm = cass_row_get_column(m_row, ifld);
1450     if (!clm) {
1451         RAISE_DB_ERROR(eSeqFailed, "column is not fetched (index " + to_string(ifld) + " beyound the range?)");
1452     }
1453     return clm;
1454 }
1455 
1456 
1457 template<>
GetColumn(const string & name) const1458 const CassValue * CCassQuery::GetColumn(const string & name) const
1459 {
1460     if (!m_row) {
1461         RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1462     }
1463     const CassValue * clm = cass_row_get_column_by_name_n(m_row, name.c_str(), name.size());
1464     if (!clm) {
1465         RAISE_DB_ERROR(eSeqFailed, "column " + name + " is not available");
1466     }
1467     return clm;
1468 }
1469 
1470 template<>
GetColumn(const char * name) const1471 const CassValue * CCassQuery::GetColumn(const char * name) const
1472 {
1473     if (!m_row) {
1474         RAISE_DB_ERROR(eSeqFailed, "query row is not fetched");
1475     }
1476 
1477     const CassValue * clm = cass_row_get_column_by_name(m_row, name);
1478     if (!clm) {
1479         RAISE_DB_ERROR(eSeqFailed, "column " + string(name) + " is not available");
1480     }
1481     return clm;
1482 }
1483 
1484 template<>
GetColumnDef(int ifld) const1485 string CCassQuery::GetColumnDef(int ifld) const {
1486     return ToString() + "\ncolumn: " + NStr::NumericToString(ifld);
1487 }
1488 
1489 template<>
GetColumnDef(const string & name) const1490 string CCassQuery::GetColumnDef(const string& name) const {
1491     return ToString() + "\ncolumn: " + name;
1492 }
1493 
1494 template<>
GetColumnDef(const char * name) const1495 string CCassQuery::GetColumnDef(const char* name) const {
1496     return ToString() + "\ncolumn: " + string(name);
1497 }
1498 
ToString() const1499 string CCassQuery::ToString() const
1500 {
1501     string params;
1502     for (size_t i = 0; i < ParamCount(); ++i) {
1503         if (!params.empty()) {
1504             params.append(", ");
1505         }
1506         switch (ParamType(i)) {
1507             case CASS_VALUE_TYPE_SET:
1508                 params.append("?SET");
1509                 break;
1510             case CASS_VALUE_TYPE_LIST:
1511                 params.append("?LIST");
1512                 break;
1513             case CASS_VALUE_TYPE_MAP:
1514                 params.append("?MAP");
1515                 break;
1516             case CASS_VALUE_TYPE_TUPLE:
1517                 params.append("?TUPLE");
1518                 break;
1519             default: {
1520                 string prm;
1521                 try {
1522                     prm = ParamAsStr(i);
1523                 }
1524                 catch(const CCassandraException&) {
1525                     prm = "???[" + NStr::NumericToString(i) + "]";
1526                 }
1527                 params.append(prm);
1528             }
1529         }
1530     }
1531 
1532     return m_sql.empty() ? "<>" : m_sql + "\nparams: " + params;
1533 }
1534 
IsEOF(void) const1535 bool CCassQuery::IsEOF(void) const
1536 {
1537     return m_EOF;
1538 }
1539 
IsAsync(void) const1540 bool CCassQuery::IsAsync(void) const
1541 {
1542     return m_async;
1543 }
1544 
1545 END_IDBLOB_SCOPE
1546