1 /* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
2
3 #include "lib.h"
4 #include "istream.h"
5 #include "array.h"
6 #include "hostpid.h"
7 #include "hex-binary.h"
8 #include "str.h"
9 #include "ioloop.h"
10 #include "net.h"
11 #include "write-full.h"
12 #include "time-util.h"
13 #include "var-expand.h"
14 #include "safe-memset.h"
15 #include "settings-parser.h"
16 #include "sql-api-private.h"
17
18 #ifdef BUILD_CASSANDRA
19 #include <stdio.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22 #include <cassandra.h>
23 #include <pthread.h>
24
25 #define IS_CONNECTED(db) \
26 ((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
27 (db)->api.state != SQL_DB_STATE_CONNECTING)
28
29 #define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
30 #define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
31 #define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
32
33 #define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)
34
35 typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
36
/* Indexes into cassandra_db.counters[]. The names written to the metrics
   JSON are listed in counter_names[] below. */
enum cassandra_counter_type {
	CASSANDRA_COUNTER_TYPE_QUERY_SENT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER,
	CASSANDRA_COUNTER_TYPE_QUERY_SLOW,

	CASSANDRA_COUNTER_COUNT
};
/* JSON key for each counter, indexed by enum cassandra_counter_type. */
static const char *counter_names[CASSANDRA_COUNTER_COUNT] = {
	[CASSANDRA_COUNTER_TYPE_QUERY_SENT] = "sent",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK] = "recv_ok",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS] =
		"recv_err_no_hosts",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL] =
		"recv_err_queue_full",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT] =
		"recv_err_client_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT] =
		"recv_err_server_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE] =
		"recv_err_server_unavailable",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER] = "recv_err_other",
	[CASSANDRA_COUNTER_TYPE_QUERY_SLOW] = "slow",
};
61
/* Kind of query being executed. Also used as the index for the per-type
   arrays in struct cassandra_db. */
enum cassandra_query_type {
	CASSANDRA_QUERY_TYPE_READ,
	CASSANDRA_QUERY_TYPE_READ_MORE,
	CASSANDRA_QUERY_TYPE_WRITE,
	CASSANDRA_QUERY_TYPE_DELETE,

	CASSANDRA_QUERY_TYPE_COUNT
};

/* Human-readable name of each query type, indexed by
   enum cassandra_query_type. */
static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
	[CASSANDRA_QUERY_TYPE_READ] = "read",
	[CASSANDRA_QUERY_TYPE_READ_MORE] = "read-more",
	[CASSANDRA_QUERY_TYPE_WRITE] = "write",
	[CASSANDRA_QUERY_TYPE_DELETE] = "delete"
};
74
75 struct cassandra_callback {
76 unsigned int id;
77 struct timeout *to;
78 CassFuture *future;
79 struct cassandra_db *db;
80 driver_cassandra_callback_t *callback;
81 void *context;
82 };
83
84 struct cassandra_db {
85 struct sql_db api;
86
87 char *hosts, *keyspace, *user, *password;
88 CassConsistency read_consistency, write_consistency, delete_consistency;
89 CassConsistency read_fallback_consistency, write_fallback_consistency;
90 CassConsistency delete_fallback_consistency;
91 CassLogLevel log_level;
92 bool debug_queries;
93 bool latency_aware_routing;
94 bool init_ssl;
95 unsigned int protocol_version;
96 unsigned int num_threads;
97 unsigned int connect_timeout_msecs, request_timeout_msecs;
98 unsigned int warn_timeout_msecs;
99 unsigned int heartbeat_interval_secs, idle_timeout_secs;
100 unsigned int execution_retry_interval_msecs, execution_retry_times;
101 unsigned int page_size;
102 in_port_t port;
103
104 CassCluster *cluster;
105 CassSession *session;
106 CassTimestampGen *timestamp_gen;
107 CassSsl *ssl;
108
109 int fd_pipe[2];
110 struct io *io_pipe;
111 ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
112 ARRAY(struct cassandra_callback *) callbacks;
113 ARRAY(struct cassandra_result *) results;
114 unsigned int callback_ids;
115
116 char *metrics_path;
117 char *ssl_ca_file;
118 char *ssl_cert_file;
119 char *ssl_private_key_file;
120 char *ssl_private_key_password;
121 CassSslVerifyFlags ssl_verify_flags;
122
123 struct timeout *to_metrics;
124 uint64_t counters[CASSANDRA_COUNTER_COUNT];
125
126 struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
127 time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
128 unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
129
130 /* for synchronous queries: */
131 struct ioloop *ioloop, *orig_ioloop;
132 struct sql_result *sync_result;
133
134 char *error;
135 };
136
137 struct cassandra_result {
138 struct sql_result api;
139 CassStatement *statement;
140 const CassResult *result;
141 CassIterator *iterator;
142 char *query;
143 char *error;
144 CassConsistency consistency, fallback_consistency;
145 enum cassandra_query_type query_type;
146 struct timeval page0_start_time, start_time, finish_time;
147 unsigned int row_count, total_row_count, page_num;
148 cass_int64_t timestamp;
149
150 pool_t row_pool;
151 ARRAY_TYPE(const_string) fields;
152 ARRAY(size_t) field_sizes;
153
154 sql_query_callback_t *callback;
155 void *context;
156
157 bool is_prepared:1;
158 bool query_sent:1;
159 bool finished:1;
160 bool paging_continues:1;
161 };
162
163 struct cassandra_transaction_context {
164 struct sql_transaction_context ctx;
165 int refcount;
166
167 sql_commit_callback_t *callback;
168 void *context;
169
170 struct cassandra_sql_statement *stmt;
171 char *query;
172 cass_int64_t query_timestamp;
173 char *error;
174
175 bool begin_succeeded:1;
176 bool begin_failed:1;
177 bool failed:1;
178 };
179
180 struct cassandra_sql_arg {
181 unsigned int column_idx;
182
183 char *value_str;
184 const unsigned char *value_binary;
185 size_t value_binary_size;
186 int64_t value_int64;
187 };
188
189 struct cassandra_sql_statement {
190 struct sql_statement stmt;
191
192 struct cassandra_sql_prepared_statement *prep;
193 CassStatement *cass_stmt;
194
195 ARRAY(struct cassandra_sql_arg) pending_args;
196 cass_int64_t timestamp;
197
198 struct cassandra_result *result;
199 };
200
201 struct cassandra_sql_prepared_statement {
202 struct sql_prepared_statement prep_stmt;
203
204 /* NULL, until the prepare is asynchronously finished */
205 const CassPrepared *prepared;
206 /* statements waiting for prepare to finish */
207 ARRAY(struct cassandra_sql_statement *) pending_statements;
208 /* an error here will cause the prepare to be retried on the next
209 execution attempt. */
210 char *error;
211
212 bool pending;
213 };
214
/* Templates defined elsewhere in this driver; driver_cassandra_db is
   copied into each new cassandra_db.api. */
extern const struct sql_result driver_cassandra_result;
extern const struct sql_db driver_cassandra_db;
217
218 static struct {
219 CassConsistency consistency;
220 const char *name;
221 } cass_consistency_names[] = {
222 { CASS_CONSISTENCY_ANY, "any" },
223 { CASS_CONSISTENCY_ONE, "one" },
224 { CASS_CONSISTENCY_TWO, "two" },
225 { CASS_CONSISTENCY_THREE, "three" },
226 { CASS_CONSISTENCY_QUORUM, "quorum" },
227 { CASS_CONSISTENCY_ALL, "all" },
228 { CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
229 { CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
230 { CASS_CONSISTENCY_SERIAL, "serial" },
231 { CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
232 { CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
233 };
234
235 static struct {
236 CassLogLevel log_level;
237 const char *name;
238 } cass_log_level_names[] = {
239 { CASS_LOG_CRITICAL, "critical" },
240 { CASS_LOG_ERROR, "error" },
241 { CASS_LOG_WARN, "warn" },
242 { CASS_LOG_INFO, "info" },
243 { CASS_LOG_DEBUG, "debug" },
244 { CASS_LOG_TRACE, "trace" }
245 };
246
247 static struct event_category event_category_cassandra = {
248 .parent = &event_category_sql,
249 .name = "cassandra"
250 };
251
/* Thread id of the main thread. Future callbacks compare against it to
   find out whether they are running in the main thread. */
static pthread_t main_thread_id;
static bool main_thread_id_set;

/* Forward declarations for functions defined later in this file. */
static void driver_cassandra_prepare_pending(struct cassandra_db *db);
static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
static void driver_cassandra_result_send_query(struct cassandra_result *result);
static void driver_cassandra_send_queries(struct cassandra_db *db);
static void result_finish(struct cassandra_result *result);
261
log_one_line(const CassLogMessage * message,enum log_type log_type,const char * log_level_str,const char * text,size_t text_len)262 static void log_one_line(const CassLogMessage *message,
263 enum log_type log_type, const char *log_level_str,
264 const char *text, size_t text_len)
265 {
266 /* NOTE: We may not be in the main thread. We can't use the
267 standard Dovecot functions that may use data stack. That's why
268 we can't use i_log_type() in here, but have to re-implement the
269 internal logging protocol. Otherwise preserve Cassandra's own
270 logging format. */
271 fprintf(stderr, "\001%c%s %u.%03u %s(%s:%d:%s): %.*s\n",
272 log_type+1, my_pid,
273 (unsigned int)(message->time_ms / 1000),
274 (unsigned int)(message->time_ms % 1000),
275 log_level_str,
276 message->file, message->line, message->function,
277 (int)text_len, text);
278 }
279
280 static void
driver_cassandra_log_handler(const CassLogMessage * message,void * data ATTR_UNUSED)281 driver_cassandra_log_handler(const CassLogMessage* message,
282 void *data ATTR_UNUSED)
283 {
284 enum log_type log_type = LOG_TYPE_ERROR;
285 const char *log_level_str = "";
286
287 switch (message->severity) {
288 case CASS_LOG_DISABLED:
289 case CASS_LOG_LAST_ENTRY:
290 i_unreached();
291 case CASS_LOG_CRITICAL:
292 log_type = LOG_TYPE_PANIC;
293 break;
294 case CASS_LOG_ERROR:
295 log_type = LOG_TYPE_ERROR;
296 break;
297 case CASS_LOG_WARN:
298 log_type = LOG_TYPE_WARNING;
299 break;
300 case CASS_LOG_INFO:
301 log_type = LOG_TYPE_INFO;
302 break;
303 case CASS_LOG_TRACE:
304 log_level_str = "[TRACE] ";
305 /* fall through */
306 case CASS_LOG_DEBUG:
307 log_type = LOG_TYPE_DEBUG;
308 break;
309 }
310
311 /* Log message may contain LFs, so log each line separately. */
312 const char *p, *line = message->message;
313 while ((p = strchr(line, '\n')) != NULL) {
314 log_one_line(message, log_type, log_level_str, line, p - line);
315 line = p+1;
316 }
317 log_one_line(message, log_type, log_level_str, line, strlen(line));
318 }
319
driver_cassandra_init_log(void)320 static void driver_cassandra_init_log(void)
321 {
322 failure_callback_t *fatal_callback, *error_callback;
323 failure_callback_t *info_callback, *debug_callback;
324
325 i_get_failure_handlers(&fatal_callback, &error_callback,
326 &info_callback, &debug_callback);
327 if (i_failure_handler_is_internal(debug_callback)) {
328 /* Using internal logging protocol. Use it ourself to set log
329 levels correctly. */
330 cass_log_set_callback(driver_cassandra_log_handler, NULL);
331 }
332 }
333
/* Look up a consistency level by its connect string name.
   Returns 0 and sets consistency_r on success, -1 if the name is unknown
   (consistency_r is left untouched). */
static int consistency_parse(const char *str, CassConsistency *consistency_r)
{
	for (unsigned int idx = 0;
	     idx < N_ELEMENTS(cass_consistency_names); idx++) {
		if (strcmp(cass_consistency_names[idx].name, str) != 0)
			continue;
		*consistency_r = cass_consistency_names[idx].consistency;
		return 0;
	}
	return -1;
}
346
/* Look up a Cassandra log level by its connect string name.
   Returns 0 and sets log_level_r on success, -1 if the name is unknown
   (log_level_r is left untouched). */
static int log_level_parse(const char *str, CassLogLevel *log_level_r)
{
	for (unsigned int idx = 0;
	     idx < N_ELEMENTS(cass_log_level_names); idx++) {
		if (strcmp(cass_log_level_names[idx].name, str) != 0)
			continue;
		*log_level_r = cass_log_level_names[idx].log_level;
		return 0;
	}
	return -1;
}
359
driver_cassandra_set_state(struct cassandra_db * db,enum sql_db_state state)360 static void driver_cassandra_set_state(struct cassandra_db *db,
361 enum sql_db_state state)
362 {
363 /* switch back to original ioloop in case the caller wants to
364 add/remove timeouts */
365 if (db->ioloop != NULL)
366 io_loop_set_current(db->orig_ioloop);
367 sql_db_set_state(&db->api, state);
368 if (db->ioloop != NULL)
369 io_loop_set_current(db->ioloop);
370 }
371
driver_cassandra_close(struct cassandra_db * db,const char * error)372 static void driver_cassandra_close(struct cassandra_db *db, const char *error)
373 {
374 struct cassandra_sql_prepared_statement *prep_stmt;
375 struct cassandra_result *const *resultp;
376
377 io_remove(&db->io_pipe);
378 if (db->fd_pipe[0] != -1) {
379 i_close_fd(&db->fd_pipe[0]);
380 i_close_fd(&db->fd_pipe[1]);
381 }
382 driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
383
384 array_foreach_elem(&db->pending_prepares, prep_stmt) {
385 prep_stmt->pending = FALSE;
386 prep_stmt->error = i_strdup(error);
387 prepare_finish_pending_statements(prep_stmt);
388 }
389 array_clear(&db->pending_prepares);
390
391 while (array_count(&db->results) > 0) {
392 resultp = array_front(&db->results);
393 if ((*resultp)->error == NULL)
394 (*resultp)->error = i_strdup(error);
395 result_finish(*resultp);
396 }
397
398 if (db->ioloop != NULL) {
399 /* running a sync query, stop it */
400 io_loop_stop(db->ioloop);
401 }
402 }
403
/* Log the failed future's error message, prefixed with "str: ". */
static void driver_cassandra_log_error(struct cassandra_db *db,
				       CassFuture *future, const char *str)
{
	const char *errmsg;
	size_t errmsg_len;

	/* the message is returned as pointer+length, so print it with an
	   explicit length */
	cass_future_error_message(future, &errmsg, &errmsg_len);
	e_error(db->api.event, "%s: %.*s", str, (int)errmsg_len, errmsg);
}
413
414 static struct cassandra_callback *
cassandra_callback_detach(struct cassandra_db * db,unsigned int id)415 cassandra_callback_detach(struct cassandra_db *db, unsigned int id)
416 {
417 struct cassandra_callback *cb, *const *cbp;
418
419 /* usually there are only a few callbacks, so don't bother with using
420 a hash table */
421 array_foreach(&db->callbacks, cbp) {
422 cb = *cbp;
423 if (cb->id == id) {
424 array_delete(&db->callbacks,
425 array_foreach_idx(&db->callbacks, cbp), 1);
426 return cb;
427 }
428 }
429 return NULL;
430 }
431
cassandra_callback_run(struct cassandra_callback * cb)432 static void cassandra_callback_run(struct cassandra_callback *cb)
433 {
434 timeout_remove(&cb->to);
435 cb->callback(cb->future, cb->context);
436 cass_future_free(cb->future);
437 i_free(cb);
438 }
439
/* Callback given to cass_future_set_callback(). context is the
   struct cassandra_callback that was registered for the future.

   If called in the main thread, the callback is detached and scheduled to
   be run via a zero-delay timeout. Otherwise the callback's id is written
   to the IPC pipe, from where the main thread reads it. */
static void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
					     void *context)
{
	struct cassandra_callback *cb = context;

	if (pthread_equal(pthread_self(), main_thread_id) != 0) {
		/* called immediately from the main thread. */
		cassandra_callback_detach(cb->db, cb->id);
		cb->to = timeout_add_short(0, cassandra_callback_run, cb);
		return;
	}

	/* this isn't the main thread - communicate with main thread by
	   writing the callback id to the pipe. note that we must not use
	   almost any dovecot functions here because most of them are using
	   data-stack, which isn't thread-safe. especially don't use
	   i_error() here. */
	if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
		/* Format into a local buffer: t_strdup_printf() would
		   allocate from the data stack, which must not be touched
		   in this thread. */
		char msg[256];
		int len = snprintf(msg, sizeof(msg),
				   "cassandra: write(pipe) failed: %s\n",
				   strerror(errno));

		if (len > 0) {
			/* snprintf() returns the untruncated length */
			size_t msg_len = (size_t)len < sizeof(msg) ?
				(size_t)len : sizeof(msg) - 1;
			(void)write_full(STDERR_FILENO, msg, msg_len);
		}
	}
}
464
driver_cassandra_input_id(struct cassandra_db * db,unsigned int id)465 static void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
466 {
467 struct cassandra_callback *cb;
468
469 cb = cassandra_callback_detach(db, id);
470 if (cb == NULL)
471 i_panic("cassandra: Received unknown ID %u", id);
472 cassandra_callback_run(cb);
473 }
474
driver_cassandra_input(struct cassandra_db * db)475 static void driver_cassandra_input(struct cassandra_db *db)
476 {
477 unsigned int ids[1024];
478 ssize_t ret;
479
480 ret = read(db->fd_pipe[0], ids, sizeof(ids));
481 if (ret < 0)
482 e_error(db->api.event, "read(pipe) failed: %m");
483 else if (ret == 0)
484 e_error(db->api.event, "read(pipe) failed: EOF");
485 else if (ret % sizeof(ids[0]) != 0)
486 e_error(db->api.event, "read(pipe) returned wrong amount of data");
487 else {
488 /* success */
489 unsigned int i, count = ret / sizeof(ids[0]);
490
491 for (i = 0; i < count &&
492 db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
493 driver_cassandra_input_id(db, ids[i]);
494 return;
495 }
496 driver_cassandra_close(db, "IPC pipe closed");
497 }
498
499 static void
driver_cassandra_set_callback(CassFuture * future,struct cassandra_db * db,driver_cassandra_callback_t * callback,void * context)500 driver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
501 driver_cassandra_callback_t *callback,
502 void *context)
503 {
504 struct cassandra_callback *cb;
505
506 i_assert(callback != NULL);
507
508 cb = i_new(struct cassandra_callback, 1);
509 cb->future = future;
510 cb->callback = callback;
511 cb->context = context;
512 cb->db = db;
513
514 array_push_back(&db->callbacks, &cb);
515 cb->id = ++db->callback_ids;
516 if (cb->id == 0)
517 cb->id = ++db->callback_ids;
518
519 /* NOTE: The callback may be called immediately by this same thread.
520 This is checked within the callback. It may also be called at any
521 time after this call by another thread. So we must not access "cb"
522 again after this call. */
523 cass_future_set_callback(future, driver_cassandra_future_callback, cb);
524 }
525
/* Called when the session connect future has finished. context is the
   struct cassandra_db. On failure the error is logged and the connection
   is closed. On success the state becomes idle and the pending prepares
   and queries are sent. */
static void connect_callback(CassFuture *future, void *context)
{
	struct cassandra_db *db = context;
	CassError rc = cass_future_error_code(future);

	if (rc != CASS_OK) {
		driver_cassandra_log_error(db, future,
					   "Couldn't connect to Cassandra");
		driver_cassandra_close(db, "Couldn't connect to Cassandra");
		return;
	}

	driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
	if (db->ioloop != NULL) {
		/* driver_cassandra_sync_init() waiting for connection to
		   finish */
		io_loop_stop(db->ioloop);
	}

	driver_cassandra_prepare_pending(db);
	driver_cassandra_send_queries(db);
}
545
driver_cassandra_connect(struct sql_db * _db)546 static int driver_cassandra_connect(struct sql_db *_db)
547 {
548 struct cassandra_db *db = (struct cassandra_db *)_db;
549 CassFuture *future;
550
551 i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
552
553 if (pipe(db->fd_pipe) < 0) {
554 e_error(_db->event, "pipe() failed: %m");
555 return -1;
556 }
557 db->io_pipe = io_add(db->fd_pipe[0], IO_READ,
558 driver_cassandra_input, db);
559 driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
560
561 future = cass_session_connect_keyspace(db->session, db->cluster,
562 db->keyspace);
563 driver_cassandra_set_callback(future, db, connect_callback, db);
564 return 0;
565 }
566
/* Close the connection, failing everything that is still pending with
   the error "Disconnected". */
static void driver_cassandra_disconnect(struct sql_db *_db)
{
	driver_cassandra_close((struct cassandra_db *)_db, "Disconnected");
}
573
574 static const char *
driver_cassandra_escape_string(struct sql_db * db ATTR_UNUSED,const char * string)575 driver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
576 const char *string)
577 {
578 string_t *escaped;
579 unsigned int i;
580
581 if (strchr(string, '\'') == NULL)
582 return string;
583 escaped = t_str_new(strlen(string)+10);
584 for (i = 0; string[i] != '\0'; i++) {
585 if (string[i] == '\'')
586 str_append_c(escaped, '\'');
587 str_append_c(escaped, string[i]);
588 }
589 return str_c(escaped);
590 }
591
driver_cassandra_parse_connect_string(struct cassandra_db * db,const char * connect_string,const char ** error_r)592 static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
593 const char *connect_string,
594 const char **error_r)
595 {
596 const char *const *args, *key, *value, *error;
597 string_t *hosts = t_str_new(64);
598 bool read_fallback_set = FALSE, write_fallback_set = FALSE;
599 bool delete_fallback_set = FALSE;
600
601 db->log_level = CASS_LOG_WARN;
602 db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
603 db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
604 db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
605 db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
606 db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
607 db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
608
609 args = t_strsplit_spaces(connect_string, " ");
610 for (; *args != NULL; args++) {
611 value = strchr(*args, '=');
612 if (value == NULL) {
613 *error_r = t_strdup_printf(
614 "Missing value in connect string: %s", *args);
615 return -1;
616 }
617 key = t_strdup_until(*args, value++);
618
619 if (str_begins(key, "ssl_"))
620 db->init_ssl = TRUE;
621
622 if (strcmp(key, "host") == 0) {
623 if (str_len(hosts) > 0)
624 str_append_c(hosts, ',');
625 str_append(hosts, value);
626 } else if (strcmp(key, "port") == 0) {
627 if (net_str2port(value, &db->port) < 0) {
628 *error_r = t_strdup_printf(
629 "Invalid port: %s", value);
630 return -1;
631 }
632 } else if (strcmp(key, "dbname") == 0 ||
633 strcmp(key, "keyspace") == 0) {
634 i_free(db->keyspace);
635 db->keyspace = i_strdup(value);
636 } else if (strcmp(key, "user") == 0) {
637 i_free(db->user);
638 db->user = i_strdup(value);
639 } else if (strcmp(key, "password") == 0) {
640 i_free(db->password);
641 db->password = i_strdup(value);
642 } else if (strcmp(key, "read_consistency") == 0) {
643 if (consistency_parse(value, &db->read_consistency) < 0) {
644 *error_r = t_strdup_printf(
645 "Unknown read_consistency: %s", value);
646 return -1;
647 }
648 } else if (strcmp(key, "read_fallback_consistency") == 0) {
649 if (consistency_parse(value, &db->read_fallback_consistency) < 0) {
650 *error_r = t_strdup_printf(
651 "Unknown read_fallback_consistency: %s", value);
652 return -1;
653 }
654 read_fallback_set = TRUE;
655 } else if (strcmp(key, "write_consistency") == 0) {
656 if (consistency_parse(value,
657 &db->write_consistency) < 0) {
658 *error_r = t_strdup_printf(
659 "Unknown write_consistency: %s", value);
660 return -1;
661 }
662 } else if (strcmp(key, "write_fallback_consistency") == 0) {
663 if (consistency_parse(value,
664 &db->write_fallback_consistency) < 0) {
665 *error_r = t_strdup_printf(
666 "Unknown write_fallback_consistency: %s",
667 value);
668 return -1;
669 }
670 write_fallback_set = TRUE;
671 } else if (strcmp(key, "delete_consistency") == 0) {
672 if (consistency_parse(value,
673 &db->delete_consistency) < 0) {
674 *error_r = t_strdup_printf(
675 "Unknown delete_consistency: %s", value);
676 return -1;
677 }
678 } else if (strcmp(key, "delete_fallback_consistency") == 0) {
679 if (consistency_parse(value,
680 &db->delete_fallback_consistency) < 0) {
681 *error_r = t_strdup_printf(
682 "Unknown delete_fallback_consistency: %s",
683 value);
684 return -1;
685 }
686 delete_fallback_set = TRUE;
687 } else if (strcmp(key, "log_level") == 0) {
688 if (log_level_parse(value, &db->log_level) < 0) {
689 *error_r = t_strdup_printf(
690 "Unknown log_level: %s", value);
691 return -1;
692 }
693 } else if (strcmp(key, "debug_queries") == 0) {
694 db->debug_queries = TRUE;
695 } else if (strcmp(key, "latency_aware_routing") == 0) {
696 db->latency_aware_routing = TRUE;
697 } else if (strcmp(key, "version") == 0) {
698 if (str_to_uint(value, &db->protocol_version) < 0) {
699 *error_r = t_strdup_printf(
700 "Invalid version: %s", value);
701 return -1;
702 }
703 } else if (strcmp(key, "num_threads") == 0) {
704 if (str_to_uint(value, &db->num_threads) < 0) {
705 *error_r = t_strdup_printf(
706 "Invalid num_threads: %s", value);
707 return -1;
708 }
709 } else if (strcmp(key, "heartbeat_interval") == 0) {
710 if (settings_get_time(value, &db->heartbeat_interval_secs,
711 &error) < 0) {
712 *error_r = t_strdup_printf(
713 "Invalid heartbeat_interval '%s': %s",
714 value, error);
715 return -1;
716 }
717 } else if (strcmp(key, "idle_timeout") == 0) {
718 if (settings_get_time(value, &db->idle_timeout_secs,
719 &error) < 0) {
720 *error_r = t_strdup_printf(
721 "Invalid idle_timeout '%s': %s",
722 value, error);
723 return -1;
724 }
725 } else if (strcmp(key, "connect_timeout") == 0) {
726 if (settings_get_time_msecs(value,
727 &db->connect_timeout_msecs,
728 &error) < 0) {
729 *error_r = t_strdup_printf(
730 "Invalid connect_timeout '%s': %s",
731 value, error);
732 return -1;
733 }
734 } else if (strcmp(key, "request_timeout") == 0) {
735 if (settings_get_time_msecs(value,
736 &db->request_timeout_msecs,
737 &error) < 0) {
738 *error_r = t_strdup_printf(
739 "Invalid request_timeout '%s': %s",
740 value, error);
741 return -1;
742 }
743 } else if (strcmp(key, "warn_timeout") == 0) {
744 if (settings_get_time_msecs(value,
745 &db->warn_timeout_msecs,
746 &error) < 0) {
747 *error_r = t_strdup_printf(
748 "Invalid warn_timeout '%s': %s",
749 value, error);
750 return -1;
751 }
752 } else if (strcmp(key, "metrics") == 0) {
753 i_free(db->metrics_path);
754 db->metrics_path = i_strdup(value);
755 } else if (strcmp(key, "execution_retry_interval") == 0) {
756 if (settings_get_time_msecs(value,
757 &db->execution_retry_interval_msecs,
758 &error) < 0) {
759 *error_r = t_strdup_printf(
760 "Invalid execution_retry_interval '%s': %s",
761 value, error);
762 return -1;
763 }
764 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
765 *error_r = t_strdup_printf(
766 "This cassandra version does not support execution_retry_interval");
767 return -1;
768 #endif
769 } else if (strcmp(key, "execution_retry_times") == 0) {
770 if (str_to_uint(value, &db->execution_retry_times) < 0) {
771 *error_r = t_strdup_printf(
772 "Invalid execution_retry_times %s",
773 value);
774 return -1;
775 }
776 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
777 *error_r = t_strdup_printf(
778 "This cassandra version does not support execution_retry_times");
779 return -1;
780 #endif
781 } else if (strcmp(key, "page_size") == 0) {
782 if (str_to_uint(value, &db->page_size) < 0) {
783 *error_r = t_strdup_printf(
784 "Invalid page_size: %s",
785 value);
786 return -1;
787 }
788 } else if (strcmp(key, "ssl_ca") == 0) {
789 db->ssl_ca_file = i_strdup(value);
790 } else if (strcmp(key, "ssl_cert_file") == 0) {
791 db->ssl_cert_file = i_strdup(value);
792 } else if (strcmp(key, "ssl_private_key_file") == 0) {
793 db->ssl_private_key_file = i_strdup(value);
794 } else if (strcmp(key, "ssl_private_key_password") == 0) {
795 db->ssl_private_key_password = i_strdup(value);
796 } else if (strcmp(key, "ssl_verify") == 0) {
797 if (strcmp(value, "none") == 0) {
798 db->ssl_verify_flags = CASS_SSL_VERIFY_NONE;
799 } else if (strcmp(value, "cert") == 0) {
800 db->ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT;
801 } else if (strcmp(value, "cert-ip") == 0) {
802 db->ssl_verify_flags =
803 CASS_SSL_VERIFY_PEER_CERT |
804 CASS_SSL_VERIFY_PEER_IDENTITY;
805 #if HAVE_DECL_CASS_SSL_VERIFY_PEER_IDENTITY_DNS == 1
806 } else if (strcmp(value, "cert-dns") == 0) {
807 db->ssl_verify_flags =
808 CASS_SSL_VERIFY_PEER_CERT |
809 CASS_SSL_VERIFY_PEER_IDENTITY_DNS;
810 #endif
811 } else {
812 *error_r = t_strdup_printf(
813 "Unsupported ssl_verify flags: '%s'",
814 value);
815 return -1;
816 }
817 } else {
818 *error_r = t_strdup_printf(
819 "Unknown connect string: %s", key);
820 return -1;
821 }
822 }
823
824 if (!read_fallback_set)
825 db->read_fallback_consistency = db->read_consistency;
826 if (!write_fallback_set)
827 db->write_fallback_consistency = db->write_consistency;
828 if (!delete_fallback_set)
829 db->delete_fallback_consistency = db->delete_consistency;
830
831 if (str_len(hosts) == 0) {
832 *error_r = t_strdup_printf("No hosts given in connect string");
833 return -1;
834 }
835 if (db->keyspace == NULL) {
836 *error_r = t_strdup_printf("No dbname given in connect string");
837 return -1;
838 }
839
840 if ((db->ssl_cert_file != NULL && db->ssl_private_key_file == NULL) ||
841 (db->ssl_cert_file == NULL && db->ssl_private_key_file != NULL)) {
842 *error_r = "ssl_cert_file and ssl_private_key_file need to be both set";
843 return -1;
844 }
845
846 db->hosts = i_strdup(str_c(hosts));
847 return 0;
848 }
849
850 static void
driver_cassandra_get_metrics_json(struct cassandra_db * db,string_t * dest)851 driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
852 {
853 #define ADD_UINT64(_struct, _field) \
854 str_printfa(dest, "\""#_field"\": %llu,", \
855 (unsigned long long)metrics._struct._field);
856 #define ADD_DOUBLE(_struct, _field) \
857 str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
858 CassMetrics metrics;
859
860 cass_session_get_metrics(db->session, &metrics);
861 str_append(dest, "{ \"requests\": {");
862 ADD_UINT64(requests, min);
863 ADD_UINT64(requests, max);
864 ADD_UINT64(requests, mean);
865 ADD_UINT64(requests, stddev);
866 ADD_UINT64(requests, median);
867 ADD_UINT64(requests, percentile_75th);
868 ADD_UINT64(requests, percentile_95th);
869 ADD_UINT64(requests, percentile_98th);
870 ADD_UINT64(requests, percentile_99th);
871 ADD_UINT64(requests, percentile_999th);
872 ADD_DOUBLE(requests, mean_rate);
873 ADD_DOUBLE(requests, one_minute_rate);
874 ADD_DOUBLE(requests, five_minute_rate);
875 ADD_DOUBLE(requests, fifteen_minute_rate);
876 str_truncate(dest, str_len(dest)-1);
877
878 str_append(dest, "}, \"stats\": {");
879 ADD_UINT64(stats, total_connections);
880 ADD_UINT64(stats, available_connections);
881 ADD_UINT64(stats, exceeded_pending_requests_water_mark);
882 ADD_UINT64(stats, exceeded_write_bytes_water_mark);
883 str_truncate(dest, str_len(dest)-1);
884
885 str_append(dest, "}, \"errors\": {");
886 ADD_UINT64(errors, connection_timeouts);
887 ADD_UINT64(errors, pending_request_timeouts);
888 ADD_UINT64(errors, request_timeouts);
889 str_truncate(dest, str_len(dest)-1);
890
891 str_append(dest, "}, \"queries\": {");
892 for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
893 str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i],
894 db->counters[i]);
895 }
896 str_truncate(dest, str_len(dest)-1);
897 str_append(dest, "}}");
898 }
899
driver_cassandra_metrics_write(struct cassandra_db * db)900 static void driver_cassandra_metrics_write(struct cassandra_db *db)
901 {
902 struct var_expand_table tab[] = {
903 { '\0', NULL, NULL }
904 };
905 string_t *path = t_str_new(64);
906 string_t *data;
907 const char *error;
908 int fd;
909
910 if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
911 e_error(db->api.event, "Failed to expand metrics_path=%s: %s",
912 db->metrics_path, error);
913 return;
914 }
915
916 fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
917 if (fd == -1) {
918 e_error(db->api.event, "creat(%s) failed: %m", str_c(path));
919 return;
920 }
921 data = t_str_new(1024);
922 driver_cassandra_get_metrics_json(db, data);
923 if (write_full(fd, str_data(data), str_len(data)) < 0)
924 e_error(db->api.event, "write(%s) failed: %m", str_c(path));
925 i_close_fd(&fd);
926 }
927
driver_cassandra_free(struct cassandra_db ** _db)928 static void driver_cassandra_free(struct cassandra_db **_db)
929 {
930 struct cassandra_db *db = *_db;
931 *_db = NULL;
932
933 event_unref(&db->api.event);
934 i_free(db->metrics_path);
935 i_free(db->hosts);
936 i_free(db->error);
937 i_free(db->keyspace);
938 i_free(db->user);
939 i_free(db->password);
940 i_free(db->ssl_ca_file);
941 i_free(db->ssl_cert_file);
942 i_free(db->ssl_private_key_file);
943 i_free_and_null(db->ssl_private_key_password);
944 array_free(&db->api.module_contexts);
945 if (db->ssl != NULL)
946 cass_ssl_free(db->ssl);
947 i_free(db);
948 }
949
driver_cassandra_init_ssl(struct cassandra_db * db,const char ** error_r)950 static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error_r)
951 {
952 buffer_t *buf = t_buffer_create(512);
953 CassError c_err;
954
955 db->ssl = cass_ssl_new();
956 i_assert(db->ssl != NULL);
957
958 if (db->ssl_ca_file != NULL) {
959 if (buffer_append_full_file(buf, db->ssl_ca_file, SIZE_MAX,
960 error_r) < 0)
961 return -1;
962 if ((c_err = cass_ssl_add_trusted_cert(db->ssl, str_c(buf))) != CASS_OK) {
963 *error_r = cass_error_desc(c_err);
964 return -1;
965 }
966 }
967
968 if (db->ssl_private_key_file != NULL && db->ssl_cert_file != NULL) {
969 buffer_set_used_size(buf, 0);
970 if (buffer_append_full_file(buf, db->ssl_private_key_file,
971 SIZE_MAX, error_r) < 0)
972 return -1;
973 c_err = cass_ssl_set_private_key(db->ssl, str_c(buf),
974 db->ssl_private_key_password);
975 safe_memset(buffer_get_modifiable_data(buf, NULL), 0, buf->used);
976 if (c_err != CASS_OK) {
977 *error_r = cass_error_desc(c_err);
978 return -1;
979 }
980
981 buffer_set_used_size(buf, 0);
982 if (buffer_append_full_file(buf, db->ssl_cert_file, SIZE_MAX, error_r) < 0)
983 return -1;
984 if ((c_err = cass_ssl_set_cert(db->ssl, str_c(buf))) != CASS_OK) {
985 *error_r = cass_error_desc(c_err);
986 return -1;
987 }
988 }
989
990 cass_ssl_set_verify_flags(db->ssl, db->ssl_verify_flags);
991
992 return 0;
993 }
994
driver_cassandra_init_full_v(const struct sql_settings * set,struct sql_db ** db_r,const char ** error_r)995 static int driver_cassandra_init_full_v(const struct sql_settings *set,
996 struct sql_db **db_r,
997 const char **error_r)
998 {
999 struct cassandra_db *db;
1000 int ret;
1001
1002 db = i_new(struct cassandra_db, 1);
1003 db->api = driver_cassandra_db;
1004 db->fd_pipe[0] = db->fd_pipe[1] = -1;
1005 db->api.event = event_create(set->event_parent);
1006 event_add_category(db->api.event, &event_category_cassandra);
1007 event_set_append_log_prefix(db->api.event, "cassandra: ");
1008
1009 T_BEGIN {
1010 ret = driver_cassandra_parse_connect_string(db,
1011 set->connect_string, error_r);
1012 } T_END_PASS_STR_IF(ret < 0, error_r);
1013
1014 if (ret < 0) {
1015 driver_cassandra_free(&db);
1016 return -1;
1017 }
1018
1019 if (db->init_ssl && driver_cassandra_init_ssl(db, error_r) < 0) {
1020 driver_cassandra_free(&db);
1021 return -1;
1022 }
1023
1024 driver_cassandra_init_log();
1025 cass_log_set_level(db->log_level);
1026 if (db->log_level >= CASS_LOG_DEBUG)
1027 event_set_forced_debug(db->api.event, TRUE);
1028
1029 if (db->protocol_version > 0 && db->protocol_version < 4) {
1030 /* binding with column indexes requires v4 */
1031 db->api.v.prepared_statement_init = NULL;
1032 db->api.v.prepared_statement_deinit = NULL;
1033 db->api.v.statement_init_prepared = NULL;
1034 }
1035
1036 db->timestamp_gen = cass_timestamp_gen_monotonic_new();
1037 db->cluster = cass_cluster_new();
1038
1039 #ifdef HAVE_CASS_CLUSTER_SET_USE_HOSTNAME_RESOLUTION
1040 if ((db->ssl_verify_flags & CASS_SSL_VERIFY_PEER_IDENTITY_DNS) != 0) {
1041 CassError c_err;
1042 if ((c_err = cass_cluster_set_use_hostname_resolution(
1043 db->cluster, cass_true)) != CASS_OK) {
1044 *error_r = cass_error_desc(c_err);
1045 driver_cassandra_free(&db);
1046 return -1;
1047 }
1048 }
1049 #endif
1050 cass_cluster_set_ssl(db->cluster, db->ssl);
1051 cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
1052 cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
1053 cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
1054 cass_cluster_set_contact_points(db->cluster, db->hosts);
1055 if (db->user != NULL && db->password != NULL)
1056 cass_cluster_set_credentials(db->cluster, db->user, db->password);
1057 if (db->port != 0)
1058 cass_cluster_set_port(db->cluster, db->port);
1059 if (db->protocol_version != 0)
1060 cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
1061 if (db->num_threads != 0)
1062 cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
1063 if (db->latency_aware_routing)
1064 cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
1065 if (db->heartbeat_interval_secs != 0)
1066 cass_cluster_set_connection_heartbeat_interval(db->cluster,
1067 db->heartbeat_interval_secs);
1068 if (db->idle_timeout_secs != 0)
1069 cass_cluster_set_connection_idle_timeout(db->cluster,
1070 db->idle_timeout_secs);
1071 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
1072 if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
1073 cass_cluster_set_constant_speculative_execution_policy(
1074 db->cluster, db->execution_retry_interval_msecs,
1075 db->execution_retry_times);
1076 #endif
1077 if (db->ssl != NULL) {
1078 e_debug(db->api.event, "Enabling TLS for cluster");
1079 cass_cluster_set_ssl(db->cluster, db->ssl);
1080 }
1081 db->session = cass_session_new();
1082 if (db->metrics_path != NULL)
1083 db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write,
1084 db);
1085 i_array_init(&db->results, 16);
1086 i_array_init(&db->callbacks, 16);
1087 i_array_init(&db->pending_prepares, 16);
1088 if (!main_thread_id_set) {
1089 main_thread_id = pthread_self();
1090 main_thread_id_set = TRUE;
1091 }
1092
1093 *db_r = &db->api;
1094 return 0;
1095 }
1096
driver_cassandra_deinit_v(struct sql_db * _db)1097 static void driver_cassandra_deinit_v(struct sql_db *_db)
1098 {
1099 struct cassandra_db *db = (struct cassandra_db *)_db;
1100
1101 driver_cassandra_close(db, "Deinitialized");
1102
1103 i_assert(array_count(&db->callbacks) == 0);
1104 array_free(&db->callbacks);
1105 i_assert(array_count(&db->results) == 0);
1106 array_free(&db->results);
1107 i_assert(array_count(&db->pending_prepares) == 0);
1108 array_free(&db->pending_prepares);
1109
1110 cass_session_free(db->session);
1111 cass_cluster_free(db->cluster);
1112 cass_timestamp_gen_free(db->timestamp_gen);
1113 timeout_remove(&db->to_metrics);
1114 sql_connection_log_finished(_db);
1115 driver_cassandra_free(&db);
1116 }
1117
driver_cassandra_result_unlink(struct cassandra_db * db,struct cassandra_result * result)1118 static void driver_cassandra_result_unlink(struct cassandra_db *db,
1119 struct cassandra_result *result)
1120 {
1121 struct cassandra_result *const *results;
1122 unsigned int i, count;
1123
1124 results = array_get(&db->results, &count);
1125 for (i = 0; i < count; i++) {
1126 if (results[i] == result) {
1127 array_delete(&db->results, i, 1);
1128 return;
1129 }
1130 }
1131 i_unreached();
1132 }
1133
/* Log a "Finished query" message for the result.

   With all_pages=TRUE the message summarizes the whole paged query using
   page_num and total_row_count. Otherwise it describes just this page using
   row_count. reply_usecs is the reply time supplied by the caller; the second
   time in the message is measured here from result->finish_time to now.

   The message is a warning (and the "slow" counter is bumped) when
   reply_usecs/1000 reaches db->warn_timeout_msecs, otherwise a debug
   message. */
static void driver_cassandra_log_result(struct cassandra_result *result,
					bool all_pages, long long reply_usecs)
{
	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
	struct timeval now;
	unsigned int rows;

	i_gettimeofday(&now);

	string_t *msg = t_str_new(128);
	str_printfa(msg, "Finished %squery '%s' (",
		    result->is_prepared ? "prepared " : "", result->query);
	if (result->timestamp != 0)
		str_printfa(msg, "timestamp=%"PRId64", ", result->timestamp);
	if (!all_pages) {
		/* the page number is mentioned only for paged queries */
		if (result->page_num > 0 || result->paging_continues)
			str_printfa(msg, "page %u, ", result->page_num);
		rows = result->row_count;
	} else {
		str_printfa(msg, "%u pages in total, ", result->page_num);
		rows = result->total_row_count;
	}
	str_printfa(msg, "%u rows, %lld+%lld us): %s", rows, reply_usecs,
		    timeval_diff_usecs(&now, &result->finish_time),
		    result->error != NULL ? result->error : "success");

	struct event_passthrough *e =
		sql_query_finished_event(&db->api, result->api.event,
					 result->query, result->error == NULL,
					 NULL);
	if (result->error != NULL)
		e->add_str("error", result->error);

	struct event *event = e->event();
	if (db->debug_queries)
		event_set_forced_debug(event, TRUE);
	if (reply_usecs/1000 >= db->warn_timeout_msecs) {
		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
		e_warning(event, "%s", str_c(msg));
	} else {
		e_debug(event, "%s", str_c(msg));
	}
}
1177
driver_cassandra_result_free(struct sql_result * _result)1178 static void driver_cassandra_result_free(struct sql_result *_result)
1179 {
1180 struct cassandra_db *db = (struct cassandra_db *)_result->db;
1181 struct cassandra_result *result = (struct cassandra_result *)_result;
1182 long long reply_usecs;
1183
1184 i_assert(!result->api.callback);
1185 i_assert(result->callback == NULL);
1186
1187 if (_result == db->sync_result)
1188 db->sync_result = NULL;
1189
1190 reply_usecs = timeval_diff_usecs(&result->finish_time,
1191 &result->start_time);
1192 driver_cassandra_log_result(result, FALSE, reply_usecs);
1193
1194 if (result->page_num > 0 && !result->paging_continues) {
1195 /* Multi-page query finishes now. Log a debug/warning summary
1196 message about it separate from the per-page messages. */
1197 reply_usecs = timeval_diff_usecs(&result->finish_time,
1198 &result->page0_start_time);
1199 driver_cassandra_log_result(result, TRUE, reply_usecs);
1200 }
1201
1202 if (result->result != NULL)
1203 cass_result_free(result->result);
1204 if (result->iterator != NULL)
1205 cass_iterator_free(result->iterator);
1206 if (result->statement != NULL)
1207 cass_statement_free(result->statement);
1208 pool_unref(&result->row_pool);
1209 event_unref(&result->api.event);
1210 i_free(result->query);
1211 i_free(result->error);
1212 i_free(result);
1213 }
1214
result_finish(struct cassandra_result * result)1215 static void result_finish(struct cassandra_result *result)
1216 {
1217 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1218 bool free_result = TRUE;
1219
1220 result->finished = TRUE;
1221 result->finish_time = ioloop_timeval;
1222 driver_cassandra_result_unlink(db, result);
1223
1224 i_assert((result->error != NULL) == (result->iterator == NULL));
1225
1226 result->api.callback = TRUE;
1227 T_BEGIN {
1228 result->callback(&result->api, result->context);
1229 } T_END;
1230 result->api.callback = FALSE;
1231
1232 free_result = db->sync_result != &result->api;
1233 if (db->ioloop != NULL)
1234 io_loop_stop(db->ioloop);
1235
1236 i_assert(!free_result || result->api.refcount > 0);
1237 result->callback = NULL;
1238 if (free_result)
1239 sql_result_unref(&result->api);
1240 }
1241
query_resend_with_fallback(struct cassandra_result * result)1242 static void query_resend_with_fallback(struct cassandra_result *result)
1243 {
1244 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1245 time_t last_warning =
1246 ioloop_time - db->last_fallback_warning[result->query_type];
1247
1248 if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
1249 e_warning(db->api.event,
1250 "%s - retrying future %s queries with consistency %s (instead of %s)",
1251 result->error, cassandra_query_type_names[result->query_type],
1252 cass_consistency_string(result->fallback_consistency),
1253 cass_consistency_string(result->consistency));
1254 db->last_fallback_warning[result->query_type] = ioloop_time;
1255 }
1256 i_free_and_null(result->error);
1257 db->fallback_failures[result->query_type]++;
1258
1259 result->consistency = result->fallback_consistency;
1260 driver_cassandra_result_send_query(result);
1261 }
1262
counters_inc_error(struct cassandra_db * db,CassError error)1263 static void counters_inc_error(struct cassandra_db *db, CassError error)
1264 {
1265 switch (error) {
1266 case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1267 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
1268 break;
1269 case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1270 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
1271 break;
1272 case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1273 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
1274 break;
1275 case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1276 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
1277 break;
1278 case CASS_ERROR_SERVER_UNAVAILABLE:
1279 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
1280 break;
1281 default:
1282 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
1283 break;
1284 }
1285 }
1286
query_error_want_fallback(CassError error)1287 static bool query_error_want_fallback(CassError error)
1288 {
1289 switch (error) {
1290 case CASS_ERROR_LIB_WRITE_ERROR:
1291 case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1292 /* Communication problems on client side. Maybe it will work
1293 with fallback consistency? */
1294 return TRUE;
1295 case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1296 /* The client library couldn't connect to enough Cassandra
1297 nodes. The error message text is the same as for
1298 CASS_ERROR_SERVER_UNAVAILABLE. */
1299 return TRUE;
1300 case CASS_ERROR_SERVER_SERVER_ERROR:
1301 case CASS_ERROR_SERVER_OVERLOADED:
1302 case CASS_ERROR_SERVER_IS_BOOTSTRAPPING:
1303 case CASS_ERROR_SERVER_READ_TIMEOUT:
1304 case CASS_ERROR_SERVER_READ_FAILURE:
1305 case CASS_ERROR_SERVER_WRITE_FAILURE:
1306 /* Servers are having trouble. Maybe with fallback consistency
1307 we can reach non-troubled servers? */
1308 return TRUE;
1309 case CASS_ERROR_SERVER_UNAVAILABLE:
1310 /* Cassandra server knows that there aren't enough nodes
1311 available. "All hosts in current policy attempted and were
1312 either unavailable or failed". */
1313 return TRUE;
1314 case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1315 /* Cassandra server couldn't reach all the needed nodes.
1316 This may be because it hasn't yet detected that the servers
1317 are down, or because the servers are just too busy. We'll
1318 try the fallback consistency to avoid unnecessary temporary
1319 errors. */
1320 return TRUE;
1321 default:
1322 return FALSE;
1323 }
1324 }
1325
1326 static enum sql_result_error_type
driver_cassandra_error_is_uncertain(CassError error)1327 driver_cassandra_error_is_uncertain(CassError error)
1328 {
1329 switch (error) {
1330 case CASS_ERROR_SERVER_WRITE_FAILURE:
1331 /* This happens when some of the replicas that were contacted
1332 * by the coordinator replied with an error. */
1333 case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1334 /* A Cassandra timeout during a write query. */
1335 case CASS_ERROR_SERVER_UNAVAILABLE:
1336 /* The coordinator knows there are not enough replicas alive
1337 * to perform a query with the requested consistency level. */
1338 case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1339 /* A request sent from the driver has timed out. */
1340 case CASS_ERROR_LIB_WRITE_ERROR:
1341 /* A write error occured. */
1342 return SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN;
1343 default:
1344 return SQL_RESULT_ERROR_TYPE_UNKNOWN;
1345 }
1346 }
1347
/* Future callback for a sent query; context is the cassandra_result.

   On success the result and its row iterator are stored and the result is
   finished. On failure the error text and error type are stored, after which
   the query is either resent with the fallback consistency or finished with
   the error. */
static void query_callback(CassFuture *future, void *context)
{
	struct cassandra_result *result = context;
	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
	CassError error = cass_future_error_code(future);
	const char *errmsg, *page_suffix;
	size_t errsize;
	int msecs;

	if (error == CASS_OK) {
		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;

		if (result->fallback_consistency != result->consistency) {
			/* non-fallback query finished successfully. if there
			   had been any fallbacks, reset them. */
			db->fallback_failures[result->query_type] = 0;
		}

		result->result = cass_future_get_result(future);
		result->iterator = cass_iterator_from_result(result->result);
		result_finish(result);
		return;
	}

	cass_future_error_message(future, &errmsg, &errsize);
	i_free(result->error);

	msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
	counters_inc_error(db, error);
	/* Timeouts bring uncertainty whether the query succeeded or not.
	   Also _SERVER_UNAVAILABLE could have actually written enough copies
	   of the data for the query to succeed. */
	result->api.error_type = driver_cassandra_error_is_uncertain(error);

	/* the page number is mentioned only for pages after the first */
	page_suffix = result->page_num == 0 ? "" :
		t_strdup_printf(", page %u", result->page_num);
	result->error = i_strdup_printf(
		"Query '%s' failed: %.*s (in %u.%03u secs%s)",
		result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
		page_suffix);

	if (query_error_want_fallback(error) &&
	    result->fallback_consistency != result->consistency) {
		/* retry with fallback consistency */
		query_resend_with_fallback(result);
		return;
	}
	result_finish(result);
}
1396
driver_cassandra_init_statement(struct cassandra_result * result)1397 static void driver_cassandra_init_statement(struct cassandra_result *result)
1398 {
1399 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1400
1401 cass_statement_set_consistency(result->statement, result->consistency);
1402
1403 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
1404 cass_statement_set_is_idempotent(result->statement, cass_true);
1405 #endif
1406 if (db->page_size > 0)
1407 cass_statement_set_paging_size(result->statement, db->page_size);
1408 }
1409
driver_cassandra_result_send_query(struct cassandra_result * result)1410 static void driver_cassandra_result_send_query(struct cassandra_result *result)
1411 {
1412 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1413 CassFuture *future;
1414
1415 i_assert(result->statement != NULL);
1416
1417 db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
1418 if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE)
1419 driver_cassandra_init_statement(result);
1420
1421 future = cass_session_execute(db->session, result->statement);
1422 driver_cassandra_set_callback(future, db, query_callback, result);
1423 }
1424
1425 static bool
driver_cassandra_want_fallback_query(struct cassandra_result * result)1426 driver_cassandra_want_fallback_query(struct cassandra_result *result)
1427 {
1428 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1429 unsigned int failure_count = db->fallback_failures[result->query_type];
1430 unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
1431 struct timeval tv;
1432
1433 if (failure_count == 0)
1434 return FALSE;
1435 /* double the retries every time. */
1436 for (i = 1; i < failure_count; i++) {
1437 msecs *= 2;
1438 if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
1439 msecs = CASSANDRA_FALLBACK_MAX_RETRY_MSECS;
1440 break;
1441 }
1442 }
1443 /* If last primary query sent timestamp + msecs is older than current
1444 time, we need to retry the primary query. Note that this practically
1445 prevents multiple primary queries from being attempted
1446 simultaneously, because the caller updates primary_query_last_sent
1447 immediately when returning.
1448
1449 The only time when multiple primary queries can be running in
1450 parallel is when the earlier query is being slow and hasn't finished
1451 early enough. This could even be a wanted feature, since while the
1452 first query might have to wait for a timeout, Cassandra could have
1453 been fixed in the meantime and the second query finishes
1454 successfully. */
1455 tv = db->primary_query_last_sent[result->query_type];
1456 timeval_add_msecs(&tv, msecs);
1457 return timeval_cmp(&ioloop_timeval, &tv) < 0;
1458 }
1459
driver_cassandra_send_query(struct cassandra_result * result)1460 static int driver_cassandra_send_query(struct cassandra_result *result)
1461 {
1462 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1463 int ret;
1464
1465 if (!SQL_DB_IS_READY(&db->api)) {
1466 if ((ret = sql_connect(&db->api)) <= 0) {
1467 if (ret < 0)
1468 driver_cassandra_close(db,
1469 "Couldn't connect to Cassandra");
1470 return ret;
1471 }
1472 }
1473
1474 if (result->page0_start_time.tv_sec == 0)
1475 result->page0_start_time = ioloop_timeval;
1476 result->start_time = ioloop_timeval;
1477 result->row_pool = pool_alloconly_create("cassandra result", 512);
1478 switch (result->query_type) {
1479 case CASSANDRA_QUERY_TYPE_READ:
1480 result->consistency = db->read_consistency;
1481 result->fallback_consistency = db->read_fallback_consistency;
1482 break;
1483 case CASSANDRA_QUERY_TYPE_READ_MORE:
1484 /* consistency is already set and we don't want to fallback
1485 at this point anymore. */
1486 result->fallback_consistency = result->consistency;
1487 break;
1488 case CASSANDRA_QUERY_TYPE_WRITE:
1489 result->consistency = db->write_consistency;
1490 result->fallback_consistency = db->write_fallback_consistency;
1491 break;
1492 case CASSANDRA_QUERY_TYPE_DELETE:
1493 result->consistency = db->delete_consistency;
1494 result->fallback_consistency = db->delete_fallback_consistency;
1495 break;
1496 case CASSANDRA_QUERY_TYPE_COUNT:
1497 i_unreached();
1498 }
1499
1500 if (driver_cassandra_want_fallback_query(result))
1501 result->consistency = result->fallback_consistency;
1502 else
1503 db->primary_query_last_sent[result->query_type] = ioloop_timeval;
1504
1505 driver_cassandra_result_send_query(result);
1506 result->query_sent = TRUE;
1507 return 1;
1508 }
1509
driver_cassandra_send_queries(struct cassandra_db * db)1510 static void driver_cassandra_send_queries(struct cassandra_db *db)
1511 {
1512 struct cassandra_result *const *results;
1513 unsigned int i, count;
1514
1515 results = array_get(&db->results, &count);
1516 for (i = 0; i < count; i++) {
1517 if (!results[i]->query_sent && results[i]->statement != NULL) {
1518 if (driver_cassandra_send_query(results[i]) <= 0)
1519 break;
1520 }
1521 }
1522 }
1523
/* Query callback that intentionally does nothing. Used by
   driver_cassandra_exec(), which has no caller callback to run. */
static void exec_callback(struct sql_result *_result ATTR_UNUSED,
			  void *context ATTR_UNUSED)
{
	/* nothing to do */
}
1528
1529 static struct cassandra_result *
driver_cassandra_query_init(struct cassandra_db * db,const char * query,enum cassandra_query_type query_type,bool is_prepared,sql_query_callback_t * callback,void * context)1530 driver_cassandra_query_init(struct cassandra_db *db, const char *query,
1531 enum cassandra_query_type query_type,
1532 bool is_prepared,
1533 sql_query_callback_t *callback, void *context)
1534 {
1535 struct cassandra_result *result;
1536
1537 result = i_new(struct cassandra_result, 1);
1538 result->api = driver_cassandra_result;
1539 result->api.db = &db->api;
1540 result->api.refcount = 1;
1541 result->callback = callback;
1542 result->context = context;
1543 result->query_type = query_type;
1544 result->query = i_strdup(query);
1545 result->is_prepared = is_prepared;
1546 result->api.event = event_create(db->api.event);
1547 array_push_back(&db->results, &result);
1548 return result;
1549 }
1550
1551 static void
driver_cassandra_query_full(struct sql_db * _db,const char * query,enum cassandra_query_type query_type,sql_query_callback_t * callback,void * context)1552 driver_cassandra_query_full(struct sql_db *_db, const char *query,
1553 enum cassandra_query_type query_type,
1554 sql_query_callback_t *callback, void *context)
1555 {
1556 struct cassandra_db *db = (struct cassandra_db *)_db;
1557 struct cassandra_result *result;
1558
1559 result = driver_cassandra_query_init(db, query, query_type, FALSE,
1560 callback, context);
1561 result->statement = cass_statement_new(query, 0);
1562 (void)driver_cassandra_send_query(result);
1563 }
1564
driver_cassandra_exec(struct sql_db * db,const char * query)1565 static void driver_cassandra_exec(struct sql_db *db, const char *query)
1566 {
1567 driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE,
1568 exec_callback, NULL);
1569 }
1570
/* Run the query as a read query, passing the caller's callback and context
   through. */
static void driver_cassandra_query(struct sql_db *db, const char *query,
				   sql_query_callback_t *callback, void *context)
{
	driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ,
				    callback, context);
}
1577
cassandra_query_s_callback(struct sql_result * result,void * context)1578 static void cassandra_query_s_callback(struct sql_result *result, void *context)
1579 {
1580 struct cassandra_db *db = context;
1581
1582 db->sync_result = result;
1583 }
1584
driver_cassandra_sync_init(struct cassandra_db * db)1585 static void driver_cassandra_sync_init(struct cassandra_db *db)
1586 {
1587 if (sql_connect(&db->api) < 0)
1588 return;
1589 db->orig_ioloop = current_ioloop;
1590 db->ioloop = io_loop_create();
1591 if (IS_CONNECTED(db))
1592 return;
1593 i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
1594
1595 db->io_pipe = io_loop_move_io(&db->io_pipe);
1596 /* wait for connecting to finish */
1597 io_loop_run(db->ioloop);
1598 }
1599
driver_cassandra_sync_deinit(struct cassandra_db * db)1600 static void driver_cassandra_sync_deinit(struct cassandra_db *db)
1601 {
1602 if (db->orig_ioloop == NULL)
1603 return;
1604 if (db->io_pipe != NULL) {
1605 io_loop_set_current(db->orig_ioloop);
1606 db->io_pipe = io_loop_move_io(&db->io_pipe);
1607 io_loop_set_current(db->ioloop);
1608 }
1609 io_loop_destroy(&db->ioloop);
1610 }
1611
1612 static struct sql_result *
driver_cassandra_sync_query(struct cassandra_db * db,const char * query,enum cassandra_query_type query_type)1613 driver_cassandra_sync_query(struct cassandra_db *db, const char *query,
1614 enum cassandra_query_type query_type)
1615 {
1616 struct sql_result *result;
1617
1618 i_assert(db->sync_result == NULL);
1619
1620 switch (db->api.state) {
1621 case SQL_DB_STATE_CONNECTING:
1622 case SQL_DB_STATE_BUSY:
1623 i_unreached();
1624 case SQL_DB_STATE_DISCONNECTED:
1625 sql_not_connected_result.refcount++;
1626 return &sql_not_connected_result;
1627 case SQL_DB_STATE_IDLE:
1628 break;
1629 }
1630
1631 driver_cassandra_query_full(&db->api, query, query_type,
1632 cassandra_query_s_callback, db);
1633 if (db->sync_result == NULL) {
1634 db->io_pipe = io_loop_move_io(&db->io_pipe);
1635 io_loop_run(db->ioloop);
1636 }
1637
1638 result = db->sync_result;
1639 if (result == &sql_not_connected_result) {
1640 /* we don't end up in cassandra's free function, so sync_result
1641 won't be set to NULL if we don't do it here. */
1642 db->sync_result = NULL;
1643 } else if (result == NULL) {
1644 result = &sql_not_connected_result;
1645 result->refcount++;
1646 }
1647 return result;
1648 }
1649
1650 static struct sql_result *
driver_cassandra_query_s(struct sql_db * _db,const char * query)1651 driver_cassandra_query_s(struct sql_db *_db, const char *query)
1652 {
1653 struct cassandra_db *db = (struct cassandra_db *)_db;
1654 struct sql_result *result;
1655
1656 driver_cassandra_sync_init(db);
1657 result = driver_cassandra_sync_query(db, query,
1658 CASSANDRA_QUERY_TYPE_READ);
1659 driver_cassandra_sync_deinit(db);
1660 return result;
1661 }
1662
1663 static int
driver_cassandra_get_value(struct cassandra_result * result,const CassValue * value,const char ** str_r,size_t * len_r)1664 driver_cassandra_get_value(struct cassandra_result *result,
1665 const CassValue *value, const char **str_r,
1666 size_t *len_r)
1667 {
1668 const unsigned char *output;
1669 void *output_dup;
1670 size_t output_size;
1671 CassError rc;
1672 const char *type;
1673
1674 if (cass_value_is_null(value) != 0) {
1675 *str_r = NULL;
1676 *len_r = 0;
1677 return 0;
1678 }
1679
1680 switch (cass_data_type_type(cass_value_data_type(value))) {
1681 case CASS_VALUE_TYPE_INT: {
1682 cass_int32_t num;
1683
1684 rc = cass_value_get_int32(value, &num);
1685 if (rc == CASS_OK) {
1686 const char *str = t_strdup_printf("%d", num);
1687 output_size = strlen(str);
1688 output = (const void *)str;
1689 }
1690 type = "int32";
1691 break;
1692 }
1693 case CASS_VALUE_TYPE_TIMESTAMP:
1694 case CASS_VALUE_TYPE_BIGINT: {
1695 cass_int64_t num;
1696
1697 rc = cass_value_get_int64(value, &num);
1698 if (rc == CASS_OK) {
1699 const char *str = t_strdup_printf("%lld", (long long)num);
1700 output_size = strlen(str);
1701 output = (const void *)str;
1702 }
1703 type = "int64";
1704 break;
1705 }
1706 default:
1707 rc = cass_value_get_bytes(value, &output, &output_size);
1708 type = "bytes";
1709 break;
1710 }
1711 if (rc != CASS_OK) {
1712 i_free(result->error);
1713 result->error = i_strdup_printf("Couldn't get value as %s: %s",
1714 type, cass_error_desc(rc));
1715 return -1;
1716 }
1717 output_dup = p_malloc(result->row_pool, output_size + 1);
1718 memcpy(output_dup, output, output_size);
1719 *str_r = output_dup;
1720 *len_r = output_size;
1721 return 0;
1722 }
1723
driver_cassandra_result_next_page(struct cassandra_result * result)1724 static int driver_cassandra_result_next_page(struct cassandra_result *result)
1725 {
1726 struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1727
1728 if (db->page_size == 0) {
1729 /* no paging */
1730 return 0;
1731 }
1732 if (cass_result_has_more_pages(result->result) == cass_false)
1733 return 0;
1734
1735 /* callers that don't support sql_query_more() will still get a useful
1736 error message. */
1737 i_free(result->error);
1738 result->error = i_strdup(
1739 "Paged query has more results, but not supported by the caller");
1740 return SQL_RESULT_NEXT_MORE;
1741 }
1742
driver_cassandra_result_next_row(struct sql_result * _result)1743 static int driver_cassandra_result_next_row(struct sql_result *_result)
1744 {
1745 struct cassandra_result *result = (struct cassandra_result *)_result;
1746 const CassRow *row;
1747 const CassValue *value;
1748 const char *str;
1749 size_t size;
1750 unsigned int i;
1751 int ret = 1;
1752
1753 if (result->iterator == NULL)
1754 return -1;
1755
1756 if (cass_iterator_next(result->iterator) == 0)
1757 return driver_cassandra_result_next_page(result);
1758 result->row_count++;
1759 result->total_row_count++;
1760
1761 p_clear(result->row_pool);
1762 p_array_init(&result->fields, result->row_pool, 8);
1763 p_array_init(&result->field_sizes, result->row_pool, 8);
1764
1765 row = cass_iterator_get_row(result->iterator);
1766 for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
1767 if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
1768 ret = -1;
1769 break;
1770 }
1771 array_push_back(&result->fields, &str);
1772 array_push_back(&result->field_sizes, &size);
1773 }
1774 return ret;
1775 }
1776
1777 static void
driver_cassandra_result_more(struct sql_result ** _result,bool async,sql_query_callback_t * callback,void * context)1778 driver_cassandra_result_more(struct sql_result **_result, bool async,
1779 sql_query_callback_t *callback, void *context)
1780 {
1781 struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
1782 struct cassandra_result *new_result;
1783 struct cassandra_result *old_result =
1784 (struct cassandra_result *)*_result;
1785
1786 /* Initialize the next page as a new sql_result */
1787 new_result = driver_cassandra_query_init(db, old_result->query,
1788 CASSANDRA_QUERY_TYPE_READ_MORE,
1789 old_result->is_prepared,
1790 callback, context);
1791
1792 /* Preserve the statement and update its paging state */
1793 new_result->statement = old_result->statement;
1794 old_result->statement = NULL;
1795 cass_statement_set_paging_state(new_result->statement,
1796 old_result->result);
1797 old_result->paging_continues = TRUE;
1798 /* The caller did support paging. Clear out the "...not supported by
1799 the caller" error text, so it won't be in the debug log output. */
1800 i_free_and_null(old_result->error);
1801
1802 new_result->timestamp = old_result->timestamp;
1803 new_result->consistency = old_result->consistency;
1804 new_result->page_num = old_result->page_num + 1;
1805 new_result->page0_start_time = old_result->page0_start_time;
1806 new_result->total_row_count = old_result->total_row_count;
1807
1808 sql_result_unref(*_result);
1809 *_result = NULL;
1810
1811 if (async)
1812 (void)driver_cassandra_send_query(new_result);
1813 else {
1814 i_assert(db->api.state == SQL_DB_STATE_IDLE);
1815 driver_cassandra_sync_init(db);
1816 (void)driver_cassandra_send_query(new_result);
1817 if (new_result->result == NULL) {
1818 db->io_pipe = io_loop_move_io(&db->io_pipe);
1819 io_loop_run(db->ioloop);
1820 }
1821 driver_cassandra_sync_deinit(db);
1822
1823 callback(&new_result->api, context);
1824 }
1825 }
1826
1827 static unsigned int
driver_cassandra_result_get_fields_count(struct sql_result * _result)1828 driver_cassandra_result_get_fields_count(struct sql_result *_result)
1829 {
1830 struct cassandra_result *result = (struct cassandra_result *)_result;
1831
1832 return array_count(&result->fields);
1833 }
1834
1835 static const char *
driver_cassandra_result_get_field_name(struct sql_result * _result ATTR_UNUSED,unsigned int idx ATTR_UNUSED)1836 driver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
1837 unsigned int idx ATTR_UNUSED)
1838 {
1839 i_unreached();
1840 }
1841
1842 static int
driver_cassandra_result_find_field(struct sql_result * _result ATTR_UNUSED,const char * field_name ATTR_UNUSED)1843 driver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
1844 const char *field_name ATTR_UNUSED)
1845 {
1846 i_unreached();
1847 }
1848
1849 static const char *
driver_cassandra_result_get_field_value(struct sql_result * _result,unsigned int idx)1850 driver_cassandra_result_get_field_value(struct sql_result *_result,
1851 unsigned int idx)
1852 {
1853 struct cassandra_result *result = (struct cassandra_result *)_result;
1854
1855 return array_idx_elem(&result->fields, idx);
1856 }
1857
1858 static const unsigned char *
driver_cassandra_result_get_field_value_binary(struct sql_result * _result ATTR_UNUSED,unsigned int idx ATTR_UNUSED,size_t * size_r ATTR_UNUSED)1859 driver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
1860 unsigned int idx ATTR_UNUSED,
1861 size_t *size_r ATTR_UNUSED)
1862 {
1863 struct cassandra_result *result = (struct cassandra_result *)_result;
1864 const char *str;
1865 const size_t *sizep;
1866
1867 str = array_idx_elem(&result->fields, idx);
1868 sizep = array_idx(&result->field_sizes, idx);
1869 *size_r = *sizep;
1870 return (const void *)str;
1871 }
1872
1873 static const char *
driver_cassandra_result_find_field_value(struct sql_result * result ATTR_UNUSED,const char * field_name ATTR_UNUSED)1874 driver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
1875 const char *field_name ATTR_UNUSED)
1876 {
1877 i_unreached();
1878 }
1879
1880 static const char *const *
driver_cassandra_result_get_values(struct sql_result * _result)1881 driver_cassandra_result_get_values(struct sql_result *_result)
1882 {
1883 struct cassandra_result *result = (struct cassandra_result *)_result;
1884
1885 return array_front(&result->fields);
1886 }
1887
driver_cassandra_result_get_error(struct sql_result * _result)1888 static const char *driver_cassandra_result_get_error(struct sql_result *_result)
1889 {
1890 struct cassandra_result *result = (struct cassandra_result *)_result;
1891
1892 if (result->error != NULL)
1893 return result->error;
1894 return "FIXME";
1895 }
1896
1897 static struct sql_transaction_context *
driver_cassandra_transaction_begin(struct sql_db * db)1898 driver_cassandra_transaction_begin(struct sql_db *db)
1899 {
1900 struct cassandra_transaction_context *ctx;
1901
1902 ctx = i_new(struct cassandra_transaction_context, 1);
1903 ctx->ctx.db = db;
1904 ctx->ctx.event = event_create(db->event);
1905 ctx->refcount = 1;
1906 return &ctx->ctx;
1907 }
1908
1909 static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context ** _ctx)1910 driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
1911 {
1912 struct cassandra_transaction_context *ctx = *_ctx;
1913
1914 *_ctx = NULL;
1915 i_assert(ctx->refcount > 0);
1916 if (--ctx->refcount > 0)
1917 return;
1918
1919 event_unref(&ctx->ctx.event);
1920 i_free(ctx->query);
1921 i_free(ctx->error);
1922 i_free(ctx);
1923 }
1924
1925 static void
transaction_set_failed(struct cassandra_transaction_context * ctx,const char * error)1926 transaction_set_failed(struct cassandra_transaction_context *ctx,
1927 const char *error)
1928 {
1929 if (ctx->failed) {
1930 i_assert(ctx->error != NULL);
1931 } else {
1932 i_assert(ctx->error == NULL);
1933 ctx->failed = TRUE;
1934 ctx->error = i_strdup(error);
1935 }
1936 }
1937
1938 static void
transaction_commit_callback(struct sql_result * result,void * context)1939 transaction_commit_callback(struct sql_result *result, void *context)
1940 {
1941 struct cassandra_transaction_context *ctx = context;
1942 struct sql_commit_result commit_result;
1943
1944 i_zero(&commit_result);
1945 if (sql_result_next_row(result) < 0) {
1946 commit_result.error = sql_result_get_error(result);
1947 commit_result.error_type = sql_result_get_error_type(result);
1948 e_debug(sql_transaction_finished_event(&ctx->ctx)->
1949 add_str("error", commit_result.error)->event(),
1950 "Transaction failed");
1951 } else {
1952 e_debug(sql_transaction_finished_event(&ctx->ctx)->event(),
1953 "Transaction committed");
1954 }
1955 ctx->callback(&commit_result, ctx->context);
1956 driver_cassandra_transaction_unref(&ctx);
1957 }
1958
1959 static void
driver_cassandra_transaction_commit(struct sql_transaction_context * _ctx,sql_commit_callback_t * callback,void * context)1960 driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
1961 sql_commit_callback_t *callback, void *context)
1962 {
1963 struct cassandra_transaction_context *ctx =
1964 (struct cassandra_transaction_context *)_ctx;
1965 struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
1966 enum cassandra_query_type query_type;
1967 struct sql_commit_result result;
1968
1969 i_zero(&result);
1970 ctx->callback = callback;
1971 ctx->context = context;
1972
1973 if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
1974 if (ctx->failed)
1975 result.error = ctx->error;
1976
1977 e_debug(sql_transaction_finished_event(_ctx)->
1978 add_str("error", "Rolled back")->event(),
1979 "Transaction rolled back");
1980 callback(&result, context);
1981 driver_cassandra_transaction_unref(&ctx);
1982 return;
1983 }
1984
1985 /* just a single query, send it */
1986 const char *query = ctx->query != NULL ?
1987 ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
1988 if (strncasecmp(query, "DELETE ", 7) == 0)
1989 query_type = CASSANDRA_QUERY_TYPE_DELETE;
1990 else
1991 query_type = CASSANDRA_QUERY_TYPE_WRITE;
1992
1993 if (ctx->query != NULL) {
1994 struct cassandra_result *cass_result;
1995
1996 cass_result = driver_cassandra_query_init(db, query, query_type,
1997 FALSE, transaction_commit_callback, ctx);
1998 cass_result->statement = cass_statement_new(query, 0);
1999 if (ctx->query_timestamp != 0) {
2000 cass_result->timestamp = ctx->query_timestamp;
2001 cass_statement_set_timestamp(cass_result->statement,
2002 ctx->query_timestamp);
2003 }
2004 (void)driver_cassandra_send_query(cass_result);
2005 } else {
2006 ctx->stmt->result =
2007 driver_cassandra_query_init(db, query, query_type, TRUE,
2008 transaction_commit_callback, ctx);
2009 if (ctx->stmt->cass_stmt == NULL) {
2010 /* wait for prepare to finish */
2011 } else {
2012 ctx->stmt->result->statement = ctx->stmt->cass_stmt;
2013 ctx->stmt->result->timestamp = ctx->stmt->timestamp;
2014 (void)driver_cassandra_send_query(ctx->stmt->result);
2015 pool_unref(&ctx->stmt->stmt.pool);
2016 }
2017 }
2018 }
2019
2020 static void
driver_cassandra_try_commit_s(struct cassandra_transaction_context * ctx)2021 driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
2022 {
2023 struct sql_transaction_context *_ctx = &ctx->ctx;
2024 struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
2025 struct sql_result *result = NULL;
2026 enum cassandra_query_type query_type;
2027
2028 /* just a single query, send it */
2029 if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
2030 query_type = CASSANDRA_QUERY_TYPE_DELETE;
2031 else
2032 query_type = CASSANDRA_QUERY_TYPE_WRITE;
2033 driver_cassandra_sync_init(db);
2034 result = driver_cassandra_sync_query(db, ctx->query, query_type);
2035 driver_cassandra_sync_deinit(db);
2036
2037 if (sql_result_next_row(result) < 0)
2038 transaction_set_failed(ctx, sql_result_get_error(result));
2039 sql_result_unref(result);
2040 }
2041
2042 static int
driver_cassandra_transaction_commit_s(struct sql_transaction_context * _ctx,const char ** error_r)2043 driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
2044 const char **error_r)
2045 {
2046 struct cassandra_transaction_context *ctx =
2047 (struct cassandra_transaction_context *)_ctx;
2048
2049 if (ctx->stmt != NULL) {
2050 /* nothing should be using this - don't bother implementing */
2051 i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
2052 }
2053
2054 if (ctx->query != NULL && !ctx->failed)
2055 driver_cassandra_try_commit_s(ctx);
2056 *error_r = t_strdup(ctx->error);
2057
2058 i_assert(ctx->refcount == 1);
2059 i_assert((*error_r != NULL) == ctx->failed);
2060 driver_cassandra_transaction_unref(&ctx);
2061 return *error_r == NULL ? 0 : -1;
2062 }
2063
2064 static void
driver_cassandra_transaction_rollback(struct sql_transaction_context * _ctx)2065 driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
2066 {
2067 struct cassandra_transaction_context *ctx =
2068 (struct cassandra_transaction_context *)_ctx;
2069
2070 i_assert(ctx->refcount == 1);
2071 driver_cassandra_transaction_unref(&ctx);
2072 }
2073
2074 static void
driver_cassandra_update(struct sql_transaction_context * _ctx,const char * query,unsigned int * affected_rows)2075 driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
2076 unsigned int *affected_rows)
2077 {
2078 struct cassandra_transaction_context *ctx =
2079 (struct cassandra_transaction_context *)_ctx;
2080
2081 i_assert(affected_rows == NULL);
2082
2083 if (ctx->query != NULL || ctx->stmt != NULL) {
2084 transaction_set_failed(ctx, "Multiple changes in transaction not supported");
2085 return;
2086 }
2087 ctx->query = i_strdup(query);
2088 }
2089
2090 static const char *
driver_cassandra_escape_blob(struct sql_db * _db ATTR_UNUSED,const unsigned char * data,size_t size)2091 driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
2092 const unsigned char *data, size_t size)
2093 {
2094 string_t *str = t_str_new(128);
2095
2096 str_append(str, "0x");
2097 binary_to_hex_append(str, data, size);
2098 return str_c(str);
2099 }
2100
2101 static CassError
driver_cassandra_bind_int(struct cassandra_sql_statement * stmt,unsigned int column_idx,int64_t value)2102 driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
2103 unsigned int column_idx, int64_t value)
2104 {
2105 const CassDataType *data_type;
2106 CassValueType value_type;
2107
2108 i_assert(stmt->prep != NULL);
2109
2110 /* statements require exactly correct value type */
2111 data_type = cass_prepared_parameter_data_type(stmt->prep->prepared,
2112 column_idx);
2113 value_type = cass_data_type_type(data_type);
2114
2115 switch (value_type) {
2116 case CASS_VALUE_TYPE_INT:
2117 if (value < INT32_MIN || value > INT32_MAX)
2118 return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2119 return cass_statement_bind_int32(stmt->cass_stmt, column_idx,
2120 value);
2121 case CASS_VALUE_TYPE_TIMESTAMP:
2122 case CASS_VALUE_TYPE_BIGINT:
2123 return cass_statement_bind_int64(stmt->cass_stmt, column_idx,
2124 value);
2125 case CASS_VALUE_TYPE_SMALL_INT:
2126 if (value < INT16_MIN || value > INT16_MAX)
2127 return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2128 return cass_statement_bind_int16(stmt->cass_stmt, column_idx,
2129 value);
2130 case CASS_VALUE_TYPE_TINY_INT:
2131 if (value < INT8_MIN || value > INT8_MAX)
2132 return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2133 return cass_statement_bind_int8(stmt->cass_stmt, column_idx,
2134 value);
2135 default:
2136 return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2137 }
2138 }
2139
prepare_finish_arg(struct cassandra_sql_statement * stmt,const struct cassandra_sql_arg * arg)2140 static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
2141 const struct cassandra_sql_arg *arg)
2142 {
2143 CassError rc;
2144
2145 if (arg->value_str != NULL) {
2146 rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
2147 arg->value_str);
2148 } else if (arg->value_binary != NULL) {
2149 rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
2150 arg->value_binary,
2151 arg->value_binary_size);
2152 } else {
2153 rc = driver_cassandra_bind_int(stmt, arg->column_idx,
2154 arg->value_int64);
2155 }
2156 if (rc != CASS_OK) {
2157 e_error(stmt->stmt.db->event,
2158 "Statement '%s': Failed to bind column %u: %s",
2159 stmt->stmt.query_template, arg->column_idx,
2160 cass_error_desc(rc));
2161 }
2162 }
2163
prepare_finish_statement(struct cassandra_sql_statement * stmt)2164 static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
2165 {
2166 const struct cassandra_sql_arg *arg;
2167
2168 if (stmt->prep->prepared == NULL) {
2169 i_assert(stmt->prep->error != NULL);
2170
2171 if (stmt->result != NULL) {
2172 stmt->result->error = i_strdup(stmt->prep->error);
2173 result_finish(stmt->result);
2174 }
2175 pool_unref(&stmt->stmt.pool);
2176 return;
2177 }
2178 stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
2179
2180 if (stmt->timestamp != 0)
2181 cass_statement_set_timestamp(stmt->cass_stmt, stmt->timestamp);
2182
2183 if (array_is_created(&stmt->pending_args)) {
2184 array_foreach(&stmt->pending_args, arg)
2185 prepare_finish_arg(stmt, arg);
2186 }
2187 if (stmt->result != NULL) {
2188 stmt->result->statement = stmt->cass_stmt;
2189 stmt->result->timestamp = stmt->timestamp;
2190 (void)driver_cassandra_send_query(stmt->result);
2191 pool_unref(&stmt->stmt.pool);
2192 }
2193 }
2194
2195 static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement * prep_stmt)2196 prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
2197 {
2198 struct cassandra_sql_statement *stmt;
2199
2200 array_foreach_elem(&prep_stmt->pending_statements, stmt)
2201 prepare_finish_statement(stmt);
2202 array_clear(&prep_stmt->pending_statements);
2203 }
2204
/* Future callback for a prepare request. context is the
   cassandra_sql_prepared_statement being prepared.

   On success the prepared object is taken from the future. On failure the
   future's error message replaces any previously stored error. Either way
   the statements waiting on this prepare are then finished. */
static void prepare_callback(CassFuture *future, void *context)
{
	struct cassandra_sql_prepared_statement *prep_stmt = context;
	CassError error = cass_future_error_code(future);

	if (error == CASS_OK) {
		prep_stmt->prepared = cass_future_get_prepared(future);
	} else {
		const char *msg;
		size_t msg_len;

		cass_future_error_message(future, &msg, &msg_len);
		i_free(prep_stmt->error);
		prep_stmt->error = i_strndup(msg, msg_len);
	}

	prepare_finish_pending_statements(prep_stmt);
}
2223
prepare_start(struct cassandra_sql_prepared_statement * prep_stmt)2224 static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
2225 {
2226 struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
2227 CassFuture *future;
2228
2229 if (!SQL_DB_IS_READY(&db->api)) {
2230 if (!prep_stmt->pending) {
2231 prep_stmt->pending = TRUE;
2232 array_push_back(&db->pending_prepares, &prep_stmt);
2233
2234 if (sql_connect(&db->api) < 0)
2235 i_unreached();
2236 }
2237 return;
2238 }
2239
2240 /* clear the current error in case we're retrying */
2241 i_free_and_null(prep_stmt->error);
2242
2243 future = cass_session_prepare(db->session,
2244 prep_stmt->prep_stmt.query_template);
2245 driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
2246 }
2247
driver_cassandra_prepare_pending(struct cassandra_db * db)2248 static void driver_cassandra_prepare_pending(struct cassandra_db *db)
2249 {
2250 struct cassandra_sql_prepared_statement *prep_stmt;
2251
2252 i_assert(SQL_DB_IS_READY(&db->api));
2253
2254 array_foreach_elem(&db->pending_prepares, prep_stmt) {
2255 prep_stmt->pending = FALSE;
2256 prepare_start(prep_stmt);
2257 }
2258 array_clear(&db->pending_prepares);
2259 }
2260
2261 static struct sql_prepared_statement *
driver_cassandra_prepared_statement_init(struct sql_db * db,const char * query_template)2262 driver_cassandra_prepared_statement_init(struct sql_db *db,
2263 const char *query_template)
2264 {
2265 struct cassandra_sql_prepared_statement *prep_stmt =
2266 i_new(struct cassandra_sql_prepared_statement, 1);
2267 prep_stmt->prep_stmt.db = db;
2268 prep_stmt->prep_stmt.refcount = 1;
2269 prep_stmt->prep_stmt.query_template = i_strdup(query_template);
2270 i_array_init(&prep_stmt->pending_statements, 4);
2271 prepare_start(prep_stmt);
2272 return &prep_stmt->prep_stmt;
2273 }
2274
2275 static void
driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement * _prep_stmt)2276 driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
2277 {
2278 struct cassandra_sql_prepared_statement *prep_stmt =
2279 (struct cassandra_sql_prepared_statement *)_prep_stmt;
2280
2281 i_assert(array_count(&prep_stmt->pending_statements) == 0);
2282 if (prep_stmt->prepared != NULL)
2283 cass_prepared_free(prep_stmt->prepared);
2284 array_free(&prep_stmt->pending_statements);
2285 i_free(prep_stmt->error);
2286 i_free(prep_stmt->prep_stmt.query_template);
2287 i_free(prep_stmt);
2288 }
2289
2290 static struct sql_statement *
driver_cassandra_statement_init(struct sql_db * db ATTR_UNUSED,const char * query_template ATTR_UNUSED)2291 driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
2292 const char *query_template ATTR_UNUSED)
2293 {
2294 pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
2295 struct cassandra_sql_statement *stmt =
2296 p_new(pool, struct cassandra_sql_statement, 1);
2297 stmt->stmt.pool = pool;
2298 return &stmt->stmt;
2299 }
2300
2301 static struct sql_statement *
driver_cassandra_statement_init_prepared(struct sql_prepared_statement * _prep_stmt)2302 driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
2303 {
2304 struct cassandra_sql_prepared_statement *prep_stmt =
2305 (struct cassandra_sql_prepared_statement *)_prep_stmt;
2306 pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
2307 struct cassandra_sql_statement *stmt =
2308 p_new(pool, struct cassandra_sql_statement, 1);
2309
2310 stmt->stmt.pool = pool;
2311 stmt->stmt.query_template =
2312 p_strdup(stmt->stmt.pool, prep_stmt->prep_stmt.query_template);
2313 stmt->prep = prep_stmt;
2314
2315 if (prep_stmt->prepared != NULL) {
2316 /* statement is already prepared. we can use it immediately. */
2317 stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
2318 } else {
2319 if (prep_stmt->error != NULL)
2320 prepare_start(prep_stmt);
2321 /* need to wait until prepare is finished */
2322 array_push_back(&prep_stmt->pending_statements, &stmt);
2323 }
2324 return &stmt->stmt;
2325 }
2326
2327 static void
driver_cassandra_statement_abort(struct sql_statement * _stmt)2328 driver_cassandra_statement_abort(struct sql_statement *_stmt)
2329 {
2330 struct cassandra_sql_statement *stmt =
2331 (struct cassandra_sql_statement *)_stmt;
2332
2333 if (stmt->cass_stmt != NULL)
2334 cass_statement_free(stmt->cass_stmt);
2335 }
2336
2337 static void
driver_cassandra_statement_set_timestamp(struct sql_statement * _stmt,const struct timespec * ts)2338 driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
2339 const struct timespec *ts)
2340 {
2341 struct cassandra_sql_statement *stmt =
2342 (struct cassandra_sql_statement *)_stmt;
2343 cass_int64_t ts_usecs =
2344 (cass_int64_t)ts->tv_sec * 1000000ULL +
2345 ts->tv_nsec / 1000;
2346
2347 i_assert(stmt->result == NULL);
2348
2349 if (stmt->cass_stmt != NULL)
2350 cass_statement_set_timestamp(stmt->cass_stmt, ts_usecs);
2351 stmt->timestamp = ts_usecs;
2352 }
2353
2354 static struct cassandra_sql_arg *
driver_cassandra_add_pending_arg(struct cassandra_sql_statement * stmt,unsigned int column_idx)2355 driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
2356 unsigned int column_idx)
2357 {
2358 struct cassandra_sql_arg *arg;
2359
2360 if (!array_is_created(&stmt->pending_args))
2361 p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
2362 arg = array_append_space(&stmt->pending_args);
2363 arg->column_idx = column_idx;
2364 return arg;
2365 }
2366
2367 static void
driver_cassandra_statement_bind_str(struct sql_statement * _stmt,unsigned int column_idx,const char * value)2368 driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
2369 unsigned int column_idx,
2370 const char *value)
2371 {
2372 struct cassandra_sql_statement *stmt =
2373 (struct cassandra_sql_statement *)_stmt;
2374 if (stmt->cass_stmt != NULL)
2375 cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
2376 else if (stmt->prep != NULL) {
2377 struct cassandra_sql_arg *arg =
2378 driver_cassandra_add_pending_arg(stmt, column_idx);
2379 arg->value_str = p_strdup(_stmt->pool, value);
2380 }
2381 }
2382
2383 static void
driver_cassandra_statement_bind_binary(struct sql_statement * _stmt,unsigned int column_idx,const void * value,size_t value_size)2384 driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
2385 unsigned int column_idx,
2386 const void *value, size_t value_size)
2387 {
2388 struct cassandra_sql_statement *stmt =
2389 (struct cassandra_sql_statement *)_stmt;
2390
2391 if (stmt->cass_stmt != NULL) {
2392 cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
2393 value, value_size);
2394 } else if (stmt->prep != NULL) {
2395 struct cassandra_sql_arg *arg =
2396 driver_cassandra_add_pending_arg(stmt, column_idx);
2397 arg->value_binary = value_size == 0 ? &uchar_nul :
2398 p_memdup(_stmt->pool, value, value_size);
2399 arg->value_binary_size = value_size;
2400 }
2401 }
2402
2403 static void
driver_cassandra_statement_bind_int64(struct sql_statement * _stmt,unsigned int column_idx,int64_t value)2404 driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
2405 unsigned int column_idx, int64_t value)
2406 {
2407 struct cassandra_sql_statement *stmt =
2408 (struct cassandra_sql_statement *)_stmt;
2409
2410 if (stmt->cass_stmt != NULL)
2411 driver_cassandra_bind_int(stmt, column_idx, value);
2412 else if (stmt->prep != NULL) {
2413 struct cassandra_sql_arg *arg =
2414 driver_cassandra_add_pending_arg(stmt, column_idx);
2415 arg->value_int64 = value;
2416 }
2417 }
2418
2419 static void
driver_cassandra_statement_query(struct sql_statement * _stmt,sql_query_callback_t * callback,void * context)2420 driver_cassandra_statement_query(struct sql_statement *_stmt,
2421 sql_query_callback_t *callback, void *context)
2422 {
2423 struct cassandra_sql_statement *stmt =
2424 (struct cassandra_sql_statement *)_stmt;
2425 struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
2426 const char *query = sql_statement_get_query(_stmt);
2427 bool is_prepared = stmt->cass_stmt != NULL || stmt->prep != NULL;
2428
2429 stmt->result = driver_cassandra_query_init(db, query,
2430 CASSANDRA_QUERY_TYPE_READ,
2431 is_prepared,
2432 callback, context);
2433 if (stmt->cass_stmt != NULL) {
2434 stmt->result->statement = stmt->cass_stmt;
2435 stmt->result->timestamp = stmt->timestamp;
2436 } else if (stmt->prep != NULL) {
2437 /* wait for prepare to finish */
2438 return;
2439 } else {
2440 stmt->result->statement = cass_statement_new(query, 0);
2441 stmt->result->timestamp = stmt->timestamp;
2442 if (stmt->timestamp != 0) {
2443 cass_statement_set_timestamp(stmt->result->statement,
2444 stmt->timestamp);
2445 }
2446 }
2447 (void)driver_cassandra_send_query(stmt->result);
2448 pool_unref(&_stmt->pool);
2449 }
2450
2451 static struct sql_result *
driver_cassandra_statement_query_s(struct sql_statement * _stmt ATTR_UNUSED)2452 driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
2453 {
2454 i_panic("cassandra: sql_statement_query_s() not supported");
2455 }
2456
2457 static void
driver_cassandra_update_stmt(struct sql_transaction_context * _ctx,struct sql_statement * _stmt,unsigned int * affected_rows)2458 driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
2459 struct sql_statement *_stmt,
2460 unsigned int *affected_rows)
2461 {
2462 struct cassandra_transaction_context *ctx =
2463 (struct cassandra_transaction_context *)_ctx;
2464 struct cassandra_sql_statement *stmt =
2465 (struct cassandra_sql_statement *)_stmt;
2466
2467 i_assert(affected_rows == NULL);
2468
2469 if (ctx->query != NULL || ctx->stmt != NULL) {
2470 transaction_set_failed(ctx,
2471 "Multiple changes in transaction not supported");
2472 return;
2473 }
2474 if (stmt->prep != NULL)
2475 ctx->stmt = stmt;
2476 else {
2477 ctx->query = i_strdup(sql_statement_get_query(_stmt));
2478 ctx->query_timestamp = stmt->timestamp;
2479 pool_unref(&_stmt->pool);
2480 }
2481 }
2482
driver_cassandra_have_work(struct cassandra_db * db)2483 static bool driver_cassandra_have_work(struct cassandra_db *db)
2484 {
2485 return array_not_empty(&db->pending_prepares) ||
2486 array_not_empty(&db->callbacks) ||
2487 array_not_empty(&db->results);
2488 }
2489
driver_cassandra_wait(struct sql_db * _db)2490 static void driver_cassandra_wait(struct sql_db *_db)
2491 {
2492 struct cassandra_db *db = (struct cassandra_db *)_db;
2493
2494 if (!driver_cassandra_have_work(db))
2495 return;
2496
2497 struct ioloop *prev_ioloop = current_ioloop;
2498 db->ioloop = io_loop_create();
2499 db->io_pipe = io_loop_move_io(&db->io_pipe);
2500 while (driver_cassandra_have_work(db))
2501 io_loop_run(db->ioloop);
2502
2503 io_loop_set_current(prev_ioloop);
2504 db->io_pipe = io_loop_move_io(&db->io_pipe);
2505 io_loop_set_current(db->ioloop);
2506 io_loop_destroy(&db->ioloop);
2507 }
2508
2509 const struct sql_db driver_cassandra_db = {
2510 .name = "cassandra",
2511 .flags = SQL_DB_FLAG_PREP_STATEMENTS,
2512
2513 .v = {
2514 .init_full = driver_cassandra_init_full_v,
2515 .deinit = driver_cassandra_deinit_v,
2516 .connect = driver_cassandra_connect,
2517 .disconnect = driver_cassandra_disconnect,
2518 .escape_string = driver_cassandra_escape_string,
2519 .exec = driver_cassandra_exec,
2520 .query = driver_cassandra_query,
2521 .query_s = driver_cassandra_query_s,
2522 .wait = driver_cassandra_wait,
2523
2524 .transaction_begin = driver_cassandra_transaction_begin,
2525 .transaction_commit = driver_cassandra_transaction_commit,
2526 .transaction_commit_s = driver_cassandra_transaction_commit_s,
2527 .transaction_rollback = driver_cassandra_transaction_rollback,
2528
2529 .update = driver_cassandra_update,
2530
2531 .escape_blob = driver_cassandra_escape_blob,
2532
2533 .prepared_statement_init = driver_cassandra_prepared_statement_init,
2534 .prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
2535 .statement_init = driver_cassandra_statement_init,
2536 .statement_init_prepared = driver_cassandra_statement_init_prepared,
2537 .statement_abort = driver_cassandra_statement_abort,
2538 .statement_set_timestamp = driver_cassandra_statement_set_timestamp,
2539 .statement_bind_str = driver_cassandra_statement_bind_str,
2540 .statement_bind_binary = driver_cassandra_statement_bind_binary,
2541 .statement_bind_int64 = driver_cassandra_statement_bind_int64,
2542 .statement_query = driver_cassandra_statement_query,
2543 .statement_query_s = driver_cassandra_statement_query_s,
2544 .update_stmt = driver_cassandra_update_stmt,
2545 }
2546 };
2547
2548 const struct sql_result driver_cassandra_result = {
2549 .v = {
2550 driver_cassandra_result_free,
2551 driver_cassandra_result_next_row,
2552 driver_cassandra_result_get_fields_count,
2553 driver_cassandra_result_get_field_name,
2554 driver_cassandra_result_find_field,
2555 driver_cassandra_result_get_field_value,
2556 driver_cassandra_result_get_field_value_binary,
2557 driver_cassandra_result_find_field_value,
2558 driver_cassandra_result_get_values,
2559 driver_cassandra_result_get_error,
2560 driver_cassandra_result_more,
2561 }
2562 };
2563
2564 const char *driver_cassandra_version = DOVECOT_ABI_VERSION;
2565
2566 void driver_cassandra_init(void);
2567 void driver_cassandra_deinit(void);
2568
driver_cassandra_init(void)2569 void driver_cassandra_init(void)
2570 {
2571 sql_driver_register(&driver_cassandra_db);
2572 }
2573
driver_cassandra_deinit(void)2574 void driver_cassandra_deinit(void)
2575 {
2576 sql_driver_unregister(&driver_cassandra_db);
2577 }
2578
2579 #endif
2580