1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 // Copyright (C) 2015-2018 Deutsche Telekom AG.
3 //
4 // Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
5 //          Andrei Pavel <andrei.pavel@qualitance.com>
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 //           http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 
19 #include <config.h>
20 
21 #include <cql/cql_connection.h>
22 #include <cql/cql_exchange.h>
23 #include <database/db_exceptions.h>
24 #include <database/db_log.h>
25 
26 #include <string>
27 
28 namespace isc {
29 namespace db {
30 
CqlConnection(const ParameterMap & parameters)31 CqlConnection::CqlConnection(const ParameterMap& parameters)
32     : DatabaseConnection(parameters), statements_(), cluster_(NULL),
33       session_(NULL), consistency_(CASS_CONSISTENCY_QUORUM),
34       serial_consistency_(CASS_CONSISTENCY_UNKNOWN), schema_meta_(NULL),
35       keyspace_meta_(NULL), force_consistency_(true) {
36 }
37 
~CqlConnection()38 CqlConnection::~CqlConnection() {
39     // Free up the prepared statements, ignoring errors. Session and connection
40     // resources are deallocated.
41     CassError rc = CASS_OK;
42     std::string error;
43 
44     // Let's free the prepared statements.
45     for (StatementMapEntry s : statements_) {
46         CqlTaggedStatement statement = s.second;
47         if (statement.prepared_statement_) {
48             cass_prepared_free(statement.prepared_statement_);
49         }
50     }
51 
52     // If there's a session, tear it down and free the resources.
53     if (session_) {
54         cass_schema_meta_free(schema_meta_);
55         CassFuture* close_future = cass_session_close(session_);
56         cass_future_wait(close_future);
57         error = checkFutureError(
58             "CqlConnection::~CqlConnection(): cass_session_close() != CASS_OK",
59             close_future);
60         rc = cass_future_error_code(close_future);
61         cass_future_free(close_future);
62         cass_session_free(session_);
63         session_ = NULL;
64     }
65 
66     // Free the cluster if there's one.
67     if (cluster_) {
68         cass_cluster_free(cluster_);
69         cluster_ = NULL;
70     }
71 
72     if (rc != CASS_OK) {
73         // We're closing the connection anyway. Let's not throw at this stage.
74         DB_LOG_ERROR(CQL_DEALLOC_ERROR).arg(error);
75     }
76 }
77 
78 std::pair<uint32_t, uint32_t>
getVersion(const ParameterMap & parameters)79 CqlConnection::getVersion(const ParameterMap& parameters) {
80     // Get a connection.
81     CqlConnection conn(parameters);
82 
83     // Open the database.
84     conn.openDatabase();
85 
86     // Prepare statement.
87     conn.prepareStatements(CqlVersionExchange::tagged_statements_);
88 
89     std::unique_ptr<CqlVersionExchange> version_exchange(new CqlVersionExchange());
90     return version_exchange->retrieveVersion(conn);
91 }
92 
parseConsistency(std::string value)93 CassConsistency CqlConnection::parseConsistency(std::string value) {
94     static std::map<std::string, CassConsistency> consistency_map {
95         {"any", CASS_CONSISTENCY_ANY},
96         {"one", CASS_CONSISTENCY_ONE},
97         {"two", CASS_CONSISTENCY_TWO},
98         {"three", CASS_CONSISTENCY_THREE},
99         {"quorum", CASS_CONSISTENCY_QUORUM},
100         {"all", CASS_CONSISTENCY_ALL},
101         {"local-quorum", CASS_CONSISTENCY_LOCAL_QUORUM},
102         {"each-quorum", CASS_CONSISTENCY_EACH_QUORUM},
103         {"serial", CASS_CONSISTENCY_SERIAL},
104         {"local-serial", CASS_CONSISTENCY_LOCAL_SERIAL},
105         {"local-one", CASS_CONSISTENCY_LOCAL_ONE}
106     };
107     if (consistency_map.find(value) == consistency_map.end()) {
108         return CASS_CONSISTENCY_UNKNOWN;
109     }
110     return consistency_map[value];
111 }
112 
113 void
openDatabase()114 CqlConnection::openDatabase() {
115     CassError rc;
116     // Set up the values of the parameters
117     const char* contact_points = "127.0.0.1";
118     std::string scontact_points;
119     try {
120         scontact_points = getParameter("contact-points");
121         contact_points = scontact_points.c_str();
122     } catch (...) {
123         // No host. Fine, we'll use "127.0.0.1".
124     }
125 
126     const char* port = NULL;
127     std::string sport;
128     try {
129         sport = getParameter("port");
130         port = sport.c_str();
131     } catch (...) {
132         // No port. Fine, we'll use the default "9042".
133     }
134 
135     const char* user = NULL;
136     std::string suser;
137     try {
138         suser = getParameter("user");
139         user = suser.c_str();
140     } catch (...) {
141         // No user. Fine, we'll use NULL.
142     }
143 
144     const char* password = NULL;
145     std::string spassword;
146     try {
147         spassword = getParameter("password");
148         password = spassword.c_str();
149     } catch (...) {
150         // No password. Fine, we'll use NULL.
151     }
152 
153     const char* keyspace = "keatest";
154     std::string skeyspace;
155     try {
156         skeyspace = getParameter("keyspace");
157         keyspace = skeyspace.c_str();
158     } catch (...) {
159         // No keyspace name. Fine, we'll use "keatest".
160     }
161 
162     const char* consistency = NULL;
163     std::string sconsistency;
164     try {
165         sconsistency = getParameter("consistency");
166         consistency = sconsistency.c_str();
167     } catch (...) {
168         // No consistency. Fine, we'll use "quorum".
169     }
170 
171     const char* serial_consistency = NULL;
172     std::string sserial_consistency;
173     try {
174         sserial_consistency = getParameter("serial-consistency");
175         serial_consistency = sserial_consistency.c_str();
176     } catch (...) {
177         // No serial consistency. Fine, we'll use "serial".
178     }
179 
180     const char* reconnect_wait_time = NULL;
181     std::string sreconnect_wait_time;
182     try {
183         sreconnect_wait_time = getParameter("reconnect-wait-time");
184         reconnect_wait_time = sreconnect_wait_time.c_str();
185     } catch (...) {
186         // No reconnect wait time. Fine, we'll use the default "2000".
187     }
188 
189     const char* connect_timeout = NULL;
190     std::string sconnect_timeout;
191     try {
192         sconnect_timeout = getParameter("connect-timeout");
193         connect_timeout = sconnect_timeout.c_str();
194     } catch (...) {
195         // No connect timeout. Fine, we'll use the default "5000".
196     }
197 
198     const char* request_timeout = NULL;
199     std::string srequest_timeout;
200     try {
201         srequest_timeout = getParameter("request-timeout");
202         request_timeout = srequest_timeout.c_str();
203     } catch (...) {
204         // No request timeout. Fine, we'll use the default "12000".
205     }
206 
207     const char* tcp_keepalive = NULL;
208     std::string stcp_keepalive;
209     try {
210         stcp_keepalive = getParameter("tcp-keepalive");
211         tcp_keepalive = stcp_keepalive.c_str();
212     } catch (...) {
213         // No tcp-keepalive. Fine, we'll not use TCP keepalive.
214     }
215 
216     std::string stcp_nodelay;
217     try {
218         stcp_nodelay = getParameter("tcp-nodelay");
219     } catch (...) {
220         // No tcp-nodelay. Fine, we'll use the default false.
221     }
222 
223     cluster_ = cass_cluster_new();
224     cass_cluster_set_contact_points(cluster_, contact_points);
225 
226     if (user && password) {
227         cass_cluster_set_credentials(cluster_, user, password);
228     }
229 
230     if (port) {
231         int32_t port_number;
232         try {
233             port_number = boost::lexical_cast<int32_t>(port);
234             if (port_number < 1 || port_number > 65535) {
235                 isc_throw(DbOperationError,
236                           "CqlConnection::openDatabase(): "
237                           "port outside of range, expected "
238                           "1-65535, instead got "
239                               << port);
240             }
241         } catch (const boost::bad_lexical_cast& ex) {
242             isc_throw(DbOperationError,
243                       "CqlConnection::openDatabase(): invalid "
244                       "port, expected castable to int, instead got "
245                       "\"" << port
246                            << "\", " << ex.what());
247         }
248         cass_cluster_set_port(cluster_, port_number);
249     }
250 
251     if (consistency) {
252         CassConsistency desired_consistency = CqlConnection::parseConsistency(sconsistency);
253         CassConsistency desired_serial_consistency = CASS_CONSISTENCY_UNKNOWN;
254         if (serial_consistency) {
255             desired_serial_consistency = CqlConnection::parseConsistency(sserial_consistency);
256         }
257         if (desired_consistency != CASS_CONSISTENCY_UNKNOWN) {
258             setConsistency(true, desired_consistency, desired_serial_consistency);
259         }
260     }
261 
262     if (reconnect_wait_time) {
263         int32_t reconnect_wait_time_number;
264         try {
265             reconnect_wait_time_number =
266                 boost::lexical_cast<int32_t>(reconnect_wait_time);
267             if (reconnect_wait_time_number < 0) {
268                 isc_throw(DbOperationError,
269                           "CqlConnection::openDatabase(): invalid reconnect "
270                           "wait time, expected positive number, instead got "
271                               << reconnect_wait_time);
272             }
273         } catch (const boost::bad_lexical_cast& ex) {
274             isc_throw(DbOperationError,
275                       "CqlConnection::openDatabase(): "
276                       "invalid reconnect wait time, expected "
277                       "castable to int, instead got \""
278                           << reconnect_wait_time << "\", " << ex.what());
279         }
280 #if (CASS_VERSION_MAJOR > 2) || \
281     ((CASS_VERSION_MAJOR == 2) && (CASS_VERSION_MINOR >= 13))
282         cass_uint64_t delay_ms =
283             static_cast<cass_uint64_t>(reconnect_wait_time_number);
284         cass_cluster_set_constant_reconnect(cluster_, delay_ms);
285 #else
286         cass_cluster_set_reconnect_wait_time(cluster_,
287                                              reconnect_wait_time_number);
288 #endif
289     }
290 
291     if (connect_timeout) {
292         int32_t connect_timeout_number;
293         try {
294             connect_timeout_number =
295                 boost::lexical_cast<int32_t>(connect_timeout);
296             if (connect_timeout_number < 0) {
297                 isc_throw(DbOperationError,
298                           "CqlConnection::openDatabase(): "
299                           "invalid connect timeout, expected "
300                           "positive number, instead got "
301                               << connect_timeout);
302             }
303         } catch (const boost::bad_lexical_cast& ex) {
304             isc_throw(DbOperationError,
305                       "CqlConnection::openDatabase(): invalid connect timeout, "
306                       "expected castable to int, instead got \""
307                           << connect_timeout << "\", " << ex.what());
308         }
309         cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
310     }
311 
312     if (request_timeout) {
313         int32_t request_timeout_number;
314         try {
315             request_timeout_number =
316                 boost::lexical_cast<int32_t>(request_timeout);
317             if (request_timeout_number < 0) {
318                 isc_throw(DbOperationError,
319                           "CqlConnection::openDatabase(): "
320                           "invalid request timeout, expected "
321                           "positive number, instead got "
322                               << request_timeout);
323             }
324         } catch (const boost::bad_lexical_cast& ex) {
325             isc_throw(DbOperationError,
326                       "CqlConnection::openDatabase(): invalid request timeout, "
327                       "expected castable to int, instead got \""
328                           << request_timeout << "\", " << ex.what());
329         }
330         cass_cluster_set_request_timeout(cluster_, request_timeout_number);
331     }
332 
333     if (tcp_keepalive) {
334         int32_t tcp_keepalive_number;
335         try {
336             tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
337             if (tcp_keepalive_number < 0) {
338                 isc_throw(DbOperationError,
339                           "CqlConnection::openDatabase(): "
340                           "invalid TCP keepalive, expected "
341                           "positive number, instead got "
342                               << tcp_keepalive);
343             }
344         } catch (const boost::bad_lexical_cast& ex) {
345             isc_throw(DbOperationError,
346                       "CqlConnection::openDatabase(): invalid TCP keepalive, "
347                       "expected castable to int, instead got \""
348                           << tcp_keepalive << "\", " << ex.what());
349         }
350         cass_cluster_set_tcp_keepalive(cluster_, cass_true,
351                                        tcp_keepalive_number);
352     }
353 
354     if (stcp_nodelay == "true") {
355         cass_cluster_set_tcp_nodelay(cluster_, cass_true);
356     }
357 
358     session_ = cass_session_new();
359 
360     CassFuture* connect_future =
361         cass_session_connect_keyspace(session_, cluster_, keyspace);
362     cass_future_wait(connect_future);
363     const std::string error =
364         checkFutureError("CqlConnection::openDatabase(): "
365                          "cass_session_connect_keyspace() != CASS_OK",
366                          connect_future);
367     rc = cass_future_error_code(connect_future);
368     cass_future_free(connect_future);
369     if (rc != CASS_OK) {
370         cass_session_free(session_);
371         session_ = NULL;
372         cass_cluster_free(cluster_);
373         cluster_ = NULL;
374         isc_throw(DbOpenError, error);
375     }
376 
377     // Get keyspace meta.
378     schema_meta_ = cass_session_get_schema_meta(session_);
379     keyspace_meta_ = cass_schema_meta_keyspace_by_name(schema_meta_, keyspace);
380     if (!keyspace_meta_) {
381         isc_throw(DbOpenError, "CqlConnection::openDatabase(): "
382                                "!cass_schema_meta_keyspace_by_name()");
383     }
384 }
385 
386 void
prepareStatements(StatementMap & statements)387 CqlConnection::prepareStatements(StatementMap& statements) {
388     CassError rc = CASS_OK;
389     for (StatementMapEntry it : statements) {
390         CqlTaggedStatement& tagged_statement = it.second;
391         if (statements_.find(tagged_statement.name_) != statements_.end()) {
392             isc_throw(DbOperationError,
393                       "CqlConnection::prepareStatements(): "
394                       "duplicate statement with name "
395                           << tagged_statement.name_);
396         }
397 
398         CassFuture* future =
399             cass_session_prepare(session_, tagged_statement.text_);
400         cass_future_wait(future);
401         const std::string error =
402             checkFutureError("CqlConnection::prepareStatements():"
403                              " cass_session_prepare() != CASS_OK",
404                              future, tagged_statement.name_);
405         rc = cass_future_error_code(future);
406         if (rc != CASS_OK) {
407             cass_future_free(future);
408             isc_throw(DbOperationError, error);
409         }
410 
411         tagged_statement.prepared_statement_ = cass_future_get_prepared(future);
412         statements_.insert(it);
413         cass_future_free(future);
414     }
415 }
416 
417 void
setConsistency(bool force,CassConsistency consistency,CassConsistency serial_consistency)418 CqlConnection::setConsistency(bool force,
419                               CassConsistency consistency,
420                               CassConsistency serial_consistency) {
421     force_consistency_ = force;
422     consistency_ = consistency;
423     serial_consistency_ = serial_consistency;
424 }
425 
426 void
startTransaction()427 CqlConnection::startTransaction() {
428     DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_BEGIN_TRANSACTION);
429 }
430 
431 void
commit()432 CqlConnection::commit() {
433     DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_COMMIT);
434 }
435 
436 void
rollback()437 CqlConnection::rollback() {
438     DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_ROLLBACK);
439 }
440 
441 const std::string
checkFutureError(const std::string & what,CassFuture * future,StatementTag statement_tag)442 CqlConnection::checkFutureError(const std::string& what,
443                                 CassFuture* future,
444                                 StatementTag statement_tag /* = NULL */) {
445     CassError cass_error = cass_future_error_code(future);
446     const char* error_message;
447     size_t error_message_size;
448     cass_future_error_message(future, &error_message, &error_message_size);
449 
450     std::stringstream stream;
451     if (statement_tag && std::strlen(statement_tag) > 0) {
452         // future is from cass_session_execute() call.
453         stream << "Statement ";
454         stream << statement_tag;
455     } else {
456         // future is from cass_session_*() call.
457         stream << "Session action ";
458     }
459     if (cass_error == CASS_OK) {
460         stream << " executed successfully.";
461     } else {
462         stream << " failed, Kea error: " << what
463                << ", Cassandra error code: " << cass_error_desc(cass_error)
464                << ", Cassandra future error: " << error_message;
465     }
466     return stream.str();
467 }
468 
469 }  // namespace dhcp
470 }  // namespace isc
471