1 // Copyright (C) 2018-2021 Internet Systems Consortium, Inc. ("ISC")
2 // Copyright (C) 2015-2018 Deutsche Telekom AG.
3 //
4 // Authors: Razvan Becheriu <razvan.becheriu@qualitance.com>
5 // Andrei Pavel <andrei.pavel@qualitance.com>
6 //
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
10 //
11 // http://www.apache.org/licenses/LICENSE-2.0
12 //
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18
19 #include <config.h>
20
21 #include <cql/cql_connection.h>
22 #include <cql/cql_exchange.h>
23 #include <database/db_exceptions.h>
24 #include <database/db_log.h>
25
26 #include <string>
27
28 namespace isc {
29 namespace db {
30
CqlConnection(const ParameterMap & parameters)31 CqlConnection::CqlConnection(const ParameterMap& parameters)
32 : DatabaseConnection(parameters), statements_(), cluster_(NULL),
33 session_(NULL), consistency_(CASS_CONSISTENCY_QUORUM),
34 serial_consistency_(CASS_CONSISTENCY_UNKNOWN), schema_meta_(NULL),
35 keyspace_meta_(NULL), force_consistency_(true) {
36 }
37
~CqlConnection()38 CqlConnection::~CqlConnection() {
39 // Free up the prepared statements, ignoring errors. Session and connection
40 // resources are deallocated.
41 CassError rc = CASS_OK;
42 std::string error;
43
44 // Let's free the prepared statements.
45 for (StatementMapEntry s : statements_) {
46 CqlTaggedStatement statement = s.second;
47 if (statement.prepared_statement_) {
48 cass_prepared_free(statement.prepared_statement_);
49 }
50 }
51
52 // If there's a session, tear it down and free the resources.
53 if (session_) {
54 cass_schema_meta_free(schema_meta_);
55 CassFuture* close_future = cass_session_close(session_);
56 cass_future_wait(close_future);
57 error = checkFutureError(
58 "CqlConnection::~CqlConnection(): cass_session_close() != CASS_OK",
59 close_future);
60 rc = cass_future_error_code(close_future);
61 cass_future_free(close_future);
62 cass_session_free(session_);
63 session_ = NULL;
64 }
65
66 // Free the cluster if there's one.
67 if (cluster_) {
68 cass_cluster_free(cluster_);
69 cluster_ = NULL;
70 }
71
72 if (rc != CASS_OK) {
73 // We're closing the connection anyway. Let's not throw at this stage.
74 DB_LOG_ERROR(CQL_DEALLOC_ERROR).arg(error);
75 }
76 }
77
78 std::pair<uint32_t, uint32_t>
getVersion(const ParameterMap & parameters)79 CqlConnection::getVersion(const ParameterMap& parameters) {
80 // Get a connection.
81 CqlConnection conn(parameters);
82
83 // Open the database.
84 conn.openDatabase();
85
86 // Prepare statement.
87 conn.prepareStatements(CqlVersionExchange::tagged_statements_);
88
89 std::unique_ptr<CqlVersionExchange> version_exchange(new CqlVersionExchange());
90 return version_exchange->retrieveVersion(conn);
91 }
92
parseConsistency(std::string value)93 CassConsistency CqlConnection::parseConsistency(std::string value) {
94 static std::map<std::string, CassConsistency> consistency_map {
95 {"any", CASS_CONSISTENCY_ANY},
96 {"one", CASS_CONSISTENCY_ONE},
97 {"two", CASS_CONSISTENCY_TWO},
98 {"three", CASS_CONSISTENCY_THREE},
99 {"quorum", CASS_CONSISTENCY_QUORUM},
100 {"all", CASS_CONSISTENCY_ALL},
101 {"local-quorum", CASS_CONSISTENCY_LOCAL_QUORUM},
102 {"each-quorum", CASS_CONSISTENCY_EACH_QUORUM},
103 {"serial", CASS_CONSISTENCY_SERIAL},
104 {"local-serial", CASS_CONSISTENCY_LOCAL_SERIAL},
105 {"local-one", CASS_CONSISTENCY_LOCAL_ONE}
106 };
107 if (consistency_map.find(value) == consistency_map.end()) {
108 return CASS_CONSISTENCY_UNKNOWN;
109 }
110 return consistency_map[value];
111 }
112
113 void
openDatabase()114 CqlConnection::openDatabase() {
115 CassError rc;
116 // Set up the values of the parameters
117 const char* contact_points = "127.0.0.1";
118 std::string scontact_points;
119 try {
120 scontact_points = getParameter("contact-points");
121 contact_points = scontact_points.c_str();
122 } catch (...) {
123 // No host. Fine, we'll use "127.0.0.1".
124 }
125
126 const char* port = NULL;
127 std::string sport;
128 try {
129 sport = getParameter("port");
130 port = sport.c_str();
131 } catch (...) {
132 // No port. Fine, we'll use the default "9042".
133 }
134
135 const char* user = NULL;
136 std::string suser;
137 try {
138 suser = getParameter("user");
139 user = suser.c_str();
140 } catch (...) {
141 // No user. Fine, we'll use NULL.
142 }
143
144 const char* password = NULL;
145 std::string spassword;
146 try {
147 spassword = getParameter("password");
148 password = spassword.c_str();
149 } catch (...) {
150 // No password. Fine, we'll use NULL.
151 }
152
153 const char* keyspace = "keatest";
154 std::string skeyspace;
155 try {
156 skeyspace = getParameter("keyspace");
157 keyspace = skeyspace.c_str();
158 } catch (...) {
159 // No keyspace name. Fine, we'll use "keatest".
160 }
161
162 const char* consistency = NULL;
163 std::string sconsistency;
164 try {
165 sconsistency = getParameter("consistency");
166 consistency = sconsistency.c_str();
167 } catch (...) {
168 // No consistency. Fine, we'll use "quorum".
169 }
170
171 const char* serial_consistency = NULL;
172 std::string sserial_consistency;
173 try {
174 sserial_consistency = getParameter("serial-consistency");
175 serial_consistency = sserial_consistency.c_str();
176 } catch (...) {
177 // No serial consistency. Fine, we'll use "serial".
178 }
179
180 const char* reconnect_wait_time = NULL;
181 std::string sreconnect_wait_time;
182 try {
183 sreconnect_wait_time = getParameter("reconnect-wait-time");
184 reconnect_wait_time = sreconnect_wait_time.c_str();
185 } catch (...) {
186 // No reconnect wait time. Fine, we'll use the default "2000".
187 }
188
189 const char* connect_timeout = NULL;
190 std::string sconnect_timeout;
191 try {
192 sconnect_timeout = getParameter("connect-timeout");
193 connect_timeout = sconnect_timeout.c_str();
194 } catch (...) {
195 // No connect timeout. Fine, we'll use the default "5000".
196 }
197
198 const char* request_timeout = NULL;
199 std::string srequest_timeout;
200 try {
201 srequest_timeout = getParameter("request-timeout");
202 request_timeout = srequest_timeout.c_str();
203 } catch (...) {
204 // No request timeout. Fine, we'll use the default "12000".
205 }
206
207 const char* tcp_keepalive = NULL;
208 std::string stcp_keepalive;
209 try {
210 stcp_keepalive = getParameter("tcp-keepalive");
211 tcp_keepalive = stcp_keepalive.c_str();
212 } catch (...) {
213 // No tcp-keepalive. Fine, we'll not use TCP keepalive.
214 }
215
216 std::string stcp_nodelay;
217 try {
218 stcp_nodelay = getParameter("tcp-nodelay");
219 } catch (...) {
220 // No tcp-nodelay. Fine, we'll use the default false.
221 }
222
223 cluster_ = cass_cluster_new();
224 cass_cluster_set_contact_points(cluster_, contact_points);
225
226 if (user && password) {
227 cass_cluster_set_credentials(cluster_, user, password);
228 }
229
230 if (port) {
231 int32_t port_number;
232 try {
233 port_number = boost::lexical_cast<int32_t>(port);
234 if (port_number < 1 || port_number > 65535) {
235 isc_throw(DbOperationError,
236 "CqlConnection::openDatabase(): "
237 "port outside of range, expected "
238 "1-65535, instead got "
239 << port);
240 }
241 } catch (const boost::bad_lexical_cast& ex) {
242 isc_throw(DbOperationError,
243 "CqlConnection::openDatabase(): invalid "
244 "port, expected castable to int, instead got "
245 "\"" << port
246 << "\", " << ex.what());
247 }
248 cass_cluster_set_port(cluster_, port_number);
249 }
250
251 if (consistency) {
252 CassConsistency desired_consistency = CqlConnection::parseConsistency(sconsistency);
253 CassConsistency desired_serial_consistency = CASS_CONSISTENCY_UNKNOWN;
254 if (serial_consistency) {
255 desired_serial_consistency = CqlConnection::parseConsistency(sserial_consistency);
256 }
257 if (desired_consistency != CASS_CONSISTENCY_UNKNOWN) {
258 setConsistency(true, desired_consistency, desired_serial_consistency);
259 }
260 }
261
262 if (reconnect_wait_time) {
263 int32_t reconnect_wait_time_number;
264 try {
265 reconnect_wait_time_number =
266 boost::lexical_cast<int32_t>(reconnect_wait_time);
267 if (reconnect_wait_time_number < 0) {
268 isc_throw(DbOperationError,
269 "CqlConnection::openDatabase(): invalid reconnect "
270 "wait time, expected positive number, instead got "
271 << reconnect_wait_time);
272 }
273 } catch (const boost::bad_lexical_cast& ex) {
274 isc_throw(DbOperationError,
275 "CqlConnection::openDatabase(): "
276 "invalid reconnect wait time, expected "
277 "castable to int, instead got \""
278 << reconnect_wait_time << "\", " << ex.what());
279 }
280 #if (CASS_VERSION_MAJOR > 2) || \
281 ((CASS_VERSION_MAJOR == 2) && (CASS_VERSION_MINOR >= 13))
282 cass_uint64_t delay_ms =
283 static_cast<cass_uint64_t>(reconnect_wait_time_number);
284 cass_cluster_set_constant_reconnect(cluster_, delay_ms);
285 #else
286 cass_cluster_set_reconnect_wait_time(cluster_,
287 reconnect_wait_time_number);
288 #endif
289 }
290
291 if (connect_timeout) {
292 int32_t connect_timeout_number;
293 try {
294 connect_timeout_number =
295 boost::lexical_cast<int32_t>(connect_timeout);
296 if (connect_timeout_number < 0) {
297 isc_throw(DbOperationError,
298 "CqlConnection::openDatabase(): "
299 "invalid connect timeout, expected "
300 "positive number, instead got "
301 << connect_timeout);
302 }
303 } catch (const boost::bad_lexical_cast& ex) {
304 isc_throw(DbOperationError,
305 "CqlConnection::openDatabase(): invalid connect timeout, "
306 "expected castable to int, instead got \""
307 << connect_timeout << "\", " << ex.what());
308 }
309 cass_cluster_set_connect_timeout(cluster_, connect_timeout_number);
310 }
311
312 if (request_timeout) {
313 int32_t request_timeout_number;
314 try {
315 request_timeout_number =
316 boost::lexical_cast<int32_t>(request_timeout);
317 if (request_timeout_number < 0) {
318 isc_throw(DbOperationError,
319 "CqlConnection::openDatabase(): "
320 "invalid request timeout, expected "
321 "positive number, instead got "
322 << request_timeout);
323 }
324 } catch (const boost::bad_lexical_cast& ex) {
325 isc_throw(DbOperationError,
326 "CqlConnection::openDatabase(): invalid request timeout, "
327 "expected castable to int, instead got \""
328 << request_timeout << "\", " << ex.what());
329 }
330 cass_cluster_set_request_timeout(cluster_, request_timeout_number);
331 }
332
333 if (tcp_keepalive) {
334 int32_t tcp_keepalive_number;
335 try {
336 tcp_keepalive_number = boost::lexical_cast<int32_t>(tcp_keepalive);
337 if (tcp_keepalive_number < 0) {
338 isc_throw(DbOperationError,
339 "CqlConnection::openDatabase(): "
340 "invalid TCP keepalive, expected "
341 "positive number, instead got "
342 << tcp_keepalive);
343 }
344 } catch (const boost::bad_lexical_cast& ex) {
345 isc_throw(DbOperationError,
346 "CqlConnection::openDatabase(): invalid TCP keepalive, "
347 "expected castable to int, instead got \""
348 << tcp_keepalive << "\", " << ex.what());
349 }
350 cass_cluster_set_tcp_keepalive(cluster_, cass_true,
351 tcp_keepalive_number);
352 }
353
354 if (stcp_nodelay == "true") {
355 cass_cluster_set_tcp_nodelay(cluster_, cass_true);
356 }
357
358 session_ = cass_session_new();
359
360 CassFuture* connect_future =
361 cass_session_connect_keyspace(session_, cluster_, keyspace);
362 cass_future_wait(connect_future);
363 const std::string error =
364 checkFutureError("CqlConnection::openDatabase(): "
365 "cass_session_connect_keyspace() != CASS_OK",
366 connect_future);
367 rc = cass_future_error_code(connect_future);
368 cass_future_free(connect_future);
369 if (rc != CASS_OK) {
370 cass_session_free(session_);
371 session_ = NULL;
372 cass_cluster_free(cluster_);
373 cluster_ = NULL;
374 isc_throw(DbOpenError, error);
375 }
376
377 // Get keyspace meta.
378 schema_meta_ = cass_session_get_schema_meta(session_);
379 keyspace_meta_ = cass_schema_meta_keyspace_by_name(schema_meta_, keyspace);
380 if (!keyspace_meta_) {
381 isc_throw(DbOpenError, "CqlConnection::openDatabase(): "
382 "!cass_schema_meta_keyspace_by_name()");
383 }
384 }
385
386 void
prepareStatements(StatementMap & statements)387 CqlConnection::prepareStatements(StatementMap& statements) {
388 CassError rc = CASS_OK;
389 for (StatementMapEntry it : statements) {
390 CqlTaggedStatement& tagged_statement = it.second;
391 if (statements_.find(tagged_statement.name_) != statements_.end()) {
392 isc_throw(DbOperationError,
393 "CqlConnection::prepareStatements(): "
394 "duplicate statement with name "
395 << tagged_statement.name_);
396 }
397
398 CassFuture* future =
399 cass_session_prepare(session_, tagged_statement.text_);
400 cass_future_wait(future);
401 const std::string error =
402 checkFutureError("CqlConnection::prepareStatements():"
403 " cass_session_prepare() != CASS_OK",
404 future, tagged_statement.name_);
405 rc = cass_future_error_code(future);
406 if (rc != CASS_OK) {
407 cass_future_free(future);
408 isc_throw(DbOperationError, error);
409 }
410
411 tagged_statement.prepared_statement_ = cass_future_get_prepared(future);
412 statements_.insert(it);
413 cass_future_free(future);
414 }
415 }
416
417 void
setConsistency(bool force,CassConsistency consistency,CassConsistency serial_consistency)418 CqlConnection::setConsistency(bool force,
419 CassConsistency consistency,
420 CassConsistency serial_consistency) {
421 force_consistency_ = force;
422 consistency_ = consistency;
423 serial_consistency_ = serial_consistency;
424 }
425
426 void
startTransaction()427 CqlConnection::startTransaction() {
428 DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_BEGIN_TRANSACTION);
429 }
430
431 void
commit()432 CqlConnection::commit() {
433 DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_COMMIT);
434 }
435
436 void
rollback()437 CqlConnection::rollback() {
438 DB_LOG_DEBUG(DB_DBG_TRACE_DETAIL, CQL_CONNECTION_ROLLBACK);
439 }
440
441 const std::string
checkFutureError(const std::string & what,CassFuture * future,StatementTag statement_tag)442 CqlConnection::checkFutureError(const std::string& what,
443 CassFuture* future,
444 StatementTag statement_tag /* = NULL */) {
445 CassError cass_error = cass_future_error_code(future);
446 const char* error_message;
447 size_t error_message_size;
448 cass_future_error_message(future, &error_message, &error_message_size);
449
450 std::stringstream stream;
451 if (statement_tag && std::strlen(statement_tag) > 0) {
452 // future is from cass_session_execute() call.
453 stream << "Statement ";
454 stream << statement_tag;
455 } else {
456 // future is from cass_session_*() call.
457 stream << "Session action ";
458 }
459 if (cass_error == CASS_OK) {
460 stream << " executed successfully.";
461 } else {
462 stream << " failed, Kea error: " << what
463 << ", Cassandra error code: " << cass_error_desc(cass_error)
464 << ", Cassandra future error: " << error_message;
465 }
466 return stream.str();
467 }
468
469 } // namespace db
470 } // namespace isc
471