1 /* Copyright (c) 2015-2018 Dovecot authors, see the included COPYING file */
2 
3 #include "lib.h"
4 #include "istream.h"
5 #include "array.h"
6 #include "hostpid.h"
7 #include "hex-binary.h"
8 #include "str.h"
9 #include "ioloop.h"
10 #include "net.h"
11 #include "write-full.h"
12 #include "time-util.h"
13 #include "var-expand.h"
14 #include "safe-memset.h"
15 #include "settings-parser.h"
16 #include "sql-api-private.h"
17 
18 #ifdef BUILD_CASSANDRA
19 #include <stdio.h>
20 #include <fcntl.h>
21 #include <unistd.h>
22 #include <cassandra.h>
23 #include <pthread.h>
24 
/* TRUE when the driver's state is neither DISCONNECTED nor CONNECTING. */
#define IS_CONNECTED(db) \
	((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
	 (db)->api.state != SQL_DB_STATE_CONNECTING)

/* Tuning constants for the fallback-consistency handling: a warning
   interval in seconds and first/maximum retry intervals in milliseconds.
   NOTE(review): the code using these is outside this view - confirm the
   exact retry semantics there. */
#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)

/* Default value of the warn_timeout connect-string setting (5 seconds). */
#define CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS (5*1000)

/* Callback run via cassandra_callback_run() once a CassFuture has
   completed; receives the future and the caller-supplied context. */
typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
36 
/* Per-database query counters, stored in cassandra_db.counters[] and
   written to the "queries" section of the metrics JSON. */
enum cassandra_counter_type {
	CASSANDRA_COUNTER_TYPE_QUERY_SENT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE,
	CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER,
	CASSANDRA_COUNTER_TYPE_QUERY_SLOW,

	CASSANDRA_COUNTER_COUNT
};
/* JSON key for each counter. Designated initializers bind every name to its
   enum value, so reordering or inserting enum members cannot silently shift
   the names. The table is read-only. */
static const char *const counter_names[CASSANDRA_COUNTER_COUNT] = {
	[CASSANDRA_COUNTER_TYPE_QUERY_SENT] = "sent",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK] = "recv_ok",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS] = "recv_err_no_hosts",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL] = "recv_err_queue_full",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT] = "recv_err_client_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT] = "recv_err_server_timeout",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE] = "recv_err_server_unavailable",
	[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER] = "recv_err_other",
	[CASSANDRA_COUNTER_TYPE_QUERY_SLOW] = "slow",
};
61 
/* Query categories. cassandra_db keeps per-category state in arrays sized
   CASSANDRA_QUERY_TYPE_COUNT. */
enum cassandra_query_type {
	CASSANDRA_QUERY_TYPE_READ,
	CASSANDRA_QUERY_TYPE_READ_MORE,
	CASSANDRA_QUERY_TYPE_WRITE,
	CASSANDRA_QUERY_TYPE_DELETE,

	CASSANDRA_QUERY_TYPE_COUNT
};

/* Human-readable name of each query type. Designated initializers keep each
   name tied to its enum value; the table is read-only. */
static const char *const cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
	[CASSANDRA_QUERY_TYPE_READ] = "read",
	[CASSANDRA_QUERY_TYPE_READ_MORE] = "read-more",
	[CASSANDRA_QUERY_TYPE_WRITE] = "write",
	[CASSANDRA_QUERY_TYPE_DELETE] = "delete",
};
74 
/* A CassFuture completion waiting to be delivered to the main thread.
   Registered in cassandra_db.callbacks by driver_cassandra_set_callback()
   and freed by cassandra_callback_run(). */
struct cassandra_callback {
	/* Non-zero id; a driver thread writes it to the IPC pipe so the main
	   thread can find this entry again. */
	unsigned int id;
	/* Zero-delay timeout, used when the future completed in the main
	   thread itself. */
	struct timeout *to;
	CassFuture *future;
	struct cassandra_db *db;
	driver_cassandra_callback_t *callback;
	void *context;
};
83 
/* Per-connection state of the Cassandra driver. The generic struct sql_db
   is embedded as the first member, so a struct sql_db pointer handed to the
   driver can be cast to struct cassandra_db. */
struct cassandra_db {
	struct sql_db api;

	/* Settings from the connect string
	   (driver_cassandra_parse_connect_string()). "hosts" is a
	   comma-separated list built from all host= keys. */
	char *hosts, *keyspace, *user, *password;
	/* Each *_fallback_consistency defaults to the matching primary
	   consistency when not explicitly configured. */
	CassConsistency read_consistency, write_consistency, delete_consistency;
	CassConsistency read_fallback_consistency, write_fallback_consistency;
	CassConsistency delete_fallback_consistency;
	CassLogLevel log_level;
	bool debug_queries;
	bool latency_aware_routing;
	/* Set when any key starting with "ssl_" was seen in the connect
	   string. */
	bool init_ssl;
	unsigned int protocol_version;
	unsigned int num_threads;
	unsigned int connect_timeout_msecs, request_timeout_msecs;
	unsigned int warn_timeout_msecs;
	unsigned int heartbeat_interval_secs, idle_timeout_secs;
	unsigned int execution_retry_interval_msecs, execution_retry_times;
	unsigned int page_size;
	in_port_t port;

	/* Cassandra driver library handles */
	CassCluster *cluster;
	CassSession *session;
	CassTimestampGen *timestamp_gen;
	CassSsl *ssl;

	/* Pipe through which driver threads pass completed callback ids to
	   the main thread: [1] is written, [0] is read via io_pipe. */
	int fd_pipe[2];
	struct io *io_pipe;
	/* Prepared statements whose prepare is still in progress */
	ARRAY(struct cassandra_sql_prepared_statement *) pending_prepares;
	/* Registered callbacks not yet delivered to the main thread */
	ARRAY(struct cassandra_callback *) callbacks;
	/* Results not yet finished; driver_cassandra_close() fails them */
	ARRAY(struct cassandra_result *) results;
	/* Last callback id handed out; 0 is skipped on wraparound */
	unsigned int callback_ids;

	/* metrics= path; var_expand()ed before each metrics write */
	char *metrics_path;
	char *ssl_ca_file;
	char *ssl_cert_file;
	char *ssl_private_key_file;
	char *ssl_private_key_password;
	CassSslVerifyFlags ssl_verify_flags;

	/* NOTE(review): presumably the periodic metrics-write timer -
	   confirm where it is added. */
	struct timeout *to_metrics;
	/* Indexed by enum cassandra_counter_type */
	uint64_t counters[CASSANDRA_COUNTER_COUNT];

	/* Per-query-type state (indexed by enum cassandra_query_type) for the
	   fallback-consistency logic. NOTE(review): exact semantics are in
	   code outside this view. */
	struct timeval primary_query_last_sent[CASSANDRA_QUERY_TYPE_COUNT];
	time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
	unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];

	/* for synchronous queries: */
	struct ioloop *ioloop, *orig_ioloop;
	struct sql_result *sync_result;

	char *error;
};
136 
/* State of one query issued through the driver. The generic struct
   sql_result is embedded as the first member. */
struct cassandra_result {
	struct sql_result api;
	CassStatement *statement;
	const CassResult *result;
	CassIterator *iterator;
	char *query;
	/* Failure message; driver_cassandra_close() fills it in if still
	   unset. */
	char *error;
	CassConsistency consistency, fallback_consistency;
	enum cassandra_query_type query_type;
	/* NOTE(review): timing / paging bookkeeping; semantics are in code
	   outside this view. */
	struct timeval page0_start_time, start_time, finish_time;
	unsigned int row_count, total_row_count, page_num;
	cass_int64_t timestamp;

	/* Storage for the current row's fields and their sizes */
	pool_t row_pool;
	ARRAY_TYPE(const_string) fields;
	ARRAY(size_t) field_sizes;

	/* Caller's query callback and its context */
	sql_query_callback_t *callback;
	void *context;

	bool is_prepared:1;
	bool query_sent:1;
	bool finished:1;
	bool paging_continues:1;
};
162 
/* Transaction state. The generic struct sql_transaction_context is embedded
   as the first member. NOTE(review): the transaction code is outside this
   view; the field notes below are limited to what the types show. */
struct cassandra_transaction_context {
	struct sql_transaction_context ctx;
	int refcount;

	/* Commit callback (sql_commit_callback_t) and its context */
	sql_commit_callback_t *callback;
	void *context;

	struct cassandra_sql_statement *stmt;
	char *query;
	cass_int64_t query_timestamp;
	char *error;

	bool begin_succeeded:1;
	bool begin_failed:1;
	bool failed:1;
};
179 
/* One statement argument, queued in cassandra_sql_statement.pending_args.
   NOTE(review): presumably only one of the value_* members is meaningful
   per argument - confirm against the bind code outside this view. */
struct cassandra_sql_arg {
	unsigned int column_idx;

	char *value_str;
	const unsigned char *value_binary;
	size_t value_binary_size;
	int64_t value_int64;
};
188 
/* Driver-side statement. The generic struct sql_statement is embedded as
   the first member. */
struct cassandra_sql_statement {
	struct sql_statement stmt;

	/* Prepared statement this was created from, if any */
	struct cassandra_sql_prepared_statement *prep;
	CassStatement *cass_stmt;

	/* Arguments not yet applied to cass_stmt. NOTE(review): presumably
	   held until the prepare finishes - confirm. */
	ARRAY(struct cassandra_sql_arg) pending_args;
	cass_int64_t timestamp;

	struct cassandra_result *result;
};
200 
/* Driver-side prepared statement. The generic struct sql_prepared_statement
   is embedded as the first member. */
struct cassandra_sql_prepared_statement {
	struct sql_prepared_statement prep_stmt;

	/* NULL, until the prepare is asynchronously finished */
	const CassPrepared *prepared;
	/* statements waiting for prepare to finish */
	ARRAY(struct cassandra_sql_statement *) pending_statements;
	/* an error here will cause the prepare to be retried on the next
	   execution attempt. */
	char *error;

	/* TRUE while the prepare is in db->pending_prepares;
	   driver_cassandra_close() clears it. */
	bool pending;
};
214 
/* Driver and result vtables. NOTE(review): presumably defined later in this
   file, outside this view. */
extern const struct sql_db driver_cassandra_db;
extern const struct sql_result driver_cassandra_result;
217 
218 static struct {
219 	CassConsistency consistency;
220 	const char *name;
221 } cass_consistency_names[] = {
222 	{ CASS_CONSISTENCY_ANY, "any" },
223 	{ CASS_CONSISTENCY_ONE, "one" },
224 	{ CASS_CONSISTENCY_TWO, "two" },
225 	{ CASS_CONSISTENCY_THREE, "three" },
226 	{ CASS_CONSISTENCY_QUORUM, "quorum" },
227 	{ CASS_CONSISTENCY_ALL, "all" },
228 	{ CASS_CONSISTENCY_LOCAL_QUORUM, "local-quorum" },
229 	{ CASS_CONSISTENCY_EACH_QUORUM, "each-quorum" },
230 	{ CASS_CONSISTENCY_SERIAL, "serial" },
231 	{ CASS_CONSISTENCY_LOCAL_SERIAL, "local-serial" },
232 	{ CASS_CONSISTENCY_LOCAL_ONE, "local-one" }
233 };
234 
235 static struct {
236 	CassLogLevel log_level;
237 	const char *name;
238 } cass_log_level_names[] = {
239 	{ CASS_LOG_CRITICAL, "critical" },
240 	{ CASS_LOG_ERROR, "error" },
241 	{ CASS_LOG_WARN, "warn" },
242 	{ CASS_LOG_INFO, "info" },
243 	{ CASS_LOG_DEBUG, "debug" },
244 	{ CASS_LOG_TRACE, "trace" }
245 };
246 
/* Event category for this driver; a child of the generic SQL category. */
static struct event_category event_category_cassandra = {
	.parent = &event_category_sql,
	.name = "cassandra"
};

/* The main (ioloop) thread. driver_cassandra_future_callback() compares
   pthread_self() against it to decide how to deliver a completion.
   NOTE(review): both are initialized outside this view. */
static pthread_t main_thread_id;
static bool main_thread_id_set;

/* Forward declarations for functions defined later in the file */
static void driver_cassandra_prepare_pending(struct cassandra_db *db);
static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt);
static void driver_cassandra_result_send_query(struct cassandra_result *result);
static void driver_cassandra_send_queries(struct cassandra_db *db);
static void result_finish(struct cassandra_result *result);
261 
log_one_line(const CassLogMessage * message,enum log_type log_type,const char * log_level_str,const char * text,size_t text_len)262 static void log_one_line(const CassLogMessage *message,
263 			 enum log_type log_type, const char *log_level_str,
264 			 const char *text, size_t text_len)
265 {
266 	/* NOTE: We may not be in the main thread. We can't use the
267 	   standard Dovecot functions that may use data stack. That's why
268 	   we can't use i_log_type() in here, but have to re-implement the
269 	   internal logging protocol. Otherwise preserve Cassandra's own
270 	   logging format. */
271 	fprintf(stderr, "\001%c%s %u.%03u %s(%s:%d:%s): %.*s\n",
272 		log_type+1, my_pid,
273 		(unsigned int)(message->time_ms / 1000),
274 		(unsigned int)(message->time_ms % 1000),
275 		log_level_str,
276 		message->file, message->line, message->function,
277 		(int)text_len, text);
278 }
279 
280 static void
driver_cassandra_log_handler(const CassLogMessage * message,void * data ATTR_UNUSED)281 driver_cassandra_log_handler(const CassLogMessage* message,
282 			     void *data ATTR_UNUSED)
283 {
284 	enum log_type log_type = LOG_TYPE_ERROR;
285 	const char *log_level_str = "";
286 
287 	switch (message->severity) {
288 	case CASS_LOG_DISABLED:
289 	case CASS_LOG_LAST_ENTRY:
290 		i_unreached();
291 	case CASS_LOG_CRITICAL:
292 		log_type = LOG_TYPE_PANIC;
293 		break;
294 	case CASS_LOG_ERROR:
295 		log_type = LOG_TYPE_ERROR;
296 		break;
297 	case CASS_LOG_WARN:
298 		log_type = LOG_TYPE_WARNING;
299 		break;
300 	case CASS_LOG_INFO:
301 		log_type = LOG_TYPE_INFO;
302 		break;
303 	case CASS_LOG_TRACE:
304 		log_level_str = "[TRACE] ";
305 		/* fall through */
306 	case CASS_LOG_DEBUG:
307 		log_type = LOG_TYPE_DEBUG;
308 		break;
309 	}
310 
311 	/* Log message may contain LFs, so log each line separately. */
312 	const char *p, *line = message->message;
313 	while ((p = strchr(line, '\n')) != NULL) {
314 		log_one_line(message, log_type, log_level_str, line, p - line);
315 		line = p+1;
316 	}
317 	log_one_line(message, log_type, log_level_str, line, strlen(line));
318 }
319 
driver_cassandra_init_log(void)320 static void driver_cassandra_init_log(void)
321 {
322 	failure_callback_t *fatal_callback, *error_callback;
323 	failure_callback_t *info_callback, *debug_callback;
324 
325 	i_get_failure_handlers(&fatal_callback, &error_callback,
326 			       &info_callback, &debug_callback);
327 	if (i_failure_handler_is_internal(debug_callback)) {
328 		/* Using internal logging protocol. Use it ourself to set log
329 		   levels correctly. */
330 		cass_log_set_callback(driver_cassandra_log_handler, NULL);
331 	}
332 }
333 
/* Look up a consistency name from cass_consistency_names.
   Returns 0 and sets *consistency_r on a match, -1 for an unknown name
   (*consistency_r is left untouched). */
static int consistency_parse(const char *str, CassConsistency *consistency_r)
{
	for (unsigned int idx = 0; idx < N_ELEMENTS(cass_consistency_names); idx++) {
		if (strcmp(str, cass_consistency_names[idx].name) != 0)
			continue;
		*consistency_r = cass_consistency_names[idx].consistency;
		return 0;
	}
	return -1;
}
346 
/* Look up a log level name from cass_log_level_names.
   Returns 0 and sets *log_level_r on a match, -1 for an unknown name
   (*log_level_r is left untouched). */
static int log_level_parse(const char *str, CassLogLevel *log_level_r)
{
	unsigned int idx = 0;

	while (idx < N_ELEMENTS(cass_log_level_names)) {
		if (strcmp(str, cass_log_level_names[idx].name) == 0) {
			*log_level_r = cass_log_level_names[idx].log_level;
			return 0;
		}
		idx++;
	}
	return -1;
}
359 
/* Update the generic sql_db state.
   While a synchronous query runs its private db->ioloop, the original
   ioloop is made current around sql_db_set_state() and the private one is
   restored afterwards. db->ioloop is checked again after the call rather
   than cached. */
static void driver_cassandra_set_state(struct cassandra_db *db,
				       enum sql_db_state state)
{
	/* switch back to original ioloop in case the caller wants to
	   add/remove timeouts */
	if (db->ioloop != NULL)
		io_loop_set_current(db->orig_ioloop);
	sql_db_set_state(&db->api, state);
	if (db->ioloop != NULL)
		io_loop_set_current(db->ioloop);
}
371 
driver_cassandra_close(struct cassandra_db * db,const char * error)372 static void driver_cassandra_close(struct cassandra_db *db, const char *error)
373 {
374 	struct cassandra_sql_prepared_statement *prep_stmt;
375 	struct cassandra_result *const *resultp;
376 
377 	io_remove(&db->io_pipe);
378 	if (db->fd_pipe[0] != -1) {
379 		i_close_fd(&db->fd_pipe[0]);
380 		i_close_fd(&db->fd_pipe[1]);
381 	}
382 	driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
383 
384 	array_foreach_elem(&db->pending_prepares, prep_stmt) {
385 		prep_stmt->pending = FALSE;
386 		prep_stmt->error = i_strdup(error);
387 		prepare_finish_pending_statements(prep_stmt);
388 	}
389 	array_clear(&db->pending_prepares);
390 
391 	while (array_count(&db->results) > 0) {
392 		resultp = array_front(&db->results);
393 		if ((*resultp)->error == NULL)
394 			(*resultp)->error = i_strdup(error);
395 		result_finish(*resultp);
396 	}
397 
398 	if (db->ioloop != NULL) {
399 		/* running a sync query, stop it */
400 		io_loop_stop(db->ioloop);
401 	}
402 }
403 
/* Log "<str>: <future's error message>" as an error on the db's event.
   The driver's message is length-delimited, hence the "%.*s". */
static void driver_cassandra_log_error(struct cassandra_db *db,
				       CassFuture *future, const char *str)
{
	const char *msg;
	size_t msg_len;

	cass_future_error_message(future, &msg, &msg_len);
	e_error(db->api.event, "%s: %.*s", str, (int)msg_len, msg);
}
413 
414 static struct cassandra_callback *
cassandra_callback_detach(struct cassandra_db * db,unsigned int id)415 cassandra_callback_detach(struct cassandra_db *db, unsigned int id)
416 {
417 	struct cassandra_callback *cb, *const *cbp;
418 
419 	/* usually there are only a few callbacks, so don't bother with using
420 	   a hash table */
421 	array_foreach(&db->callbacks, cbp) {
422 		cb = *cbp;
423 		if (cb->id == id) {
424 			array_delete(&db->callbacks,
425 				     array_foreach_idx(&db->callbacks, cbp), 1);
426 			return cb;
427 		}
428 	}
429 	return NULL;
430 }
431 
cassandra_callback_run(struct cassandra_callback * cb)432 static void cassandra_callback_run(struct cassandra_callback *cb)
433 {
434 	timeout_remove(&cb->to);
435 	cb->callback(cb->future, cb->context);
436 	cass_future_free(cb->future);
437 	i_free(cb);
438 }
439 
/* Completion hook registered with cass_future_set_callback().

   If it runs in the main thread (the future completed immediately inside
   cass_future_set_callback()), the callback is detached and deferred to a
   zero-delay timeout. Otherwise it runs in a driver thread and only writes
   cb->id to the IPC pipe; driver_cassandra_input() then delivers it in the
   main thread. */
static void driver_cassandra_future_callback(CassFuture *future ATTR_UNUSED,
					     void *context)
{
	struct cassandra_callback *cb = context;

	if (pthread_equal(pthread_self(), main_thread_id) != 0) {
		/* called immediately from the main thread. */
		cassandra_callback_detach(cb->db, cb->id);
		cb->to = timeout_add_short(0, cassandra_callback_run, cb);
		return;
	}

	/* this isn't the main thread - communicate with main thread by
	   writing the callback id to the pipe. note that we must not use
	   almost any dovecot functions here because most of them are using
	   data-stack, which isn't thread-safe. especially don't use
	   i_error() here. */
	if (write_full(cb->db->fd_pipe[1], &cb->id, sizeof(cb->id)) < 0) {
		/* Format into a fixed stack buffer: t_strdup_printf() would
		   allocate from the data stack, which must not be touched
		   from this thread. errno is saved before anything else can
		   clobber it. (POSIX does not guarantee strerror() to be
		   thread-safe either; it is kept as before.) */
		int saved_errno = errno;
		char msg[256];
		int len = snprintf(msg, sizeof(msg),
				   "cassandra: write(pipe) failed: %s\n",
				   strerror(saved_errno));

		if (len > 0) {
			if ((size_t)len >= sizeof(msg)) {
				/* truncated: keep the terminating LF */
				len = (int)sizeof(msg) - 1;
				msg[len - 1] = '\n';
			}
			(void)write_full(STDERR_FILENO, msg, (size_t)len);
		}
	}
}
464 
driver_cassandra_input_id(struct cassandra_db * db,unsigned int id)465 static void driver_cassandra_input_id(struct cassandra_db *db, unsigned int id)
466 {
467 	struct cassandra_callback *cb;
468 
469 	cb = cassandra_callback_detach(db, id);
470 	if (cb == NULL)
471 		i_panic("cassandra: Received unknown ID %u", id);
472 	cassandra_callback_run(cb);
473 }
474 
driver_cassandra_input(struct cassandra_db * db)475 static void driver_cassandra_input(struct cassandra_db *db)
476 {
477 	unsigned int ids[1024];
478 	ssize_t ret;
479 
480 	ret = read(db->fd_pipe[0], ids, sizeof(ids));
481 	if (ret < 0)
482 		e_error(db->api.event, "read(pipe) failed: %m");
483 	else if (ret == 0)
484 		e_error(db->api.event, "read(pipe) failed: EOF");
485 	else if (ret % sizeof(ids[0]) != 0)
486 		e_error(db->api.event, "read(pipe) returned wrong amount of data");
487 	else {
488 		/* success */
489 		unsigned int i, count = ret / sizeof(ids[0]);
490 
491 		for (i = 0; i < count &&
492 			    db->api.state != SQL_DB_STATE_DISCONNECTED; i++)
493 			driver_cassandra_input_id(db, ids[i]);
494 		return;
495 	}
496 	driver_cassandra_close(db, "IPC pipe closed");
497 }
498 
499 static void
driver_cassandra_set_callback(CassFuture * future,struct cassandra_db * db,driver_cassandra_callback_t * callback,void * context)500 driver_cassandra_set_callback(CassFuture *future, struct cassandra_db *db,
501 			      driver_cassandra_callback_t *callback,
502 			      void *context)
503 {
504 	struct cassandra_callback *cb;
505 
506 	i_assert(callback != NULL);
507 
508 	cb = i_new(struct cassandra_callback, 1);
509 	cb->future = future;
510 	cb->callback = callback;
511 	cb->context = context;
512 	cb->db = db;
513 
514 	array_push_back(&db->callbacks, &cb);
515 	cb->id = ++db->callback_ids;
516 	if (cb->id == 0)
517 		cb->id = ++db->callback_ids;
518 
519 	/* NOTE: The callback may be called immediately by this same thread.
520 	   This is checked within the callback. It may also be called at any
521 	   time after this call by another thread. So we must not access "cb"
522 	   again after this call. */
523 	cass_future_set_callback(future, driver_cassandra_future_callback, cb);
524 }
525 
/* Completion of cass_session_connect_keyspace(). On success the db becomes
   IDLE, a waiting synchronous init is released, and pending prepares and
   queued queries are sent. On failure the error is logged and the
   connection is closed. */
static void connect_callback(CassFuture *future, void *context)
{
	struct cassandra_db *db = context;

	if (cass_future_error_code(future) == CASS_OK) {
		driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
		/* driver_cassandra_sync_init() may be running db->ioloop
		   until the connection finishes */
		if (db->ioloop != NULL)
			io_loop_stop(db->ioloop);
		driver_cassandra_prepare_pending(db);
		driver_cassandra_send_queries(db);
		return;
	}

	driver_cassandra_log_error(db, future,
				   "Couldn't connect to Cassandra");
	driver_cassandra_close(db, "Couldn't connect to Cassandra");
}
545 
driver_cassandra_connect(struct sql_db * _db)546 static int driver_cassandra_connect(struct sql_db *_db)
547 {
548 	struct cassandra_db *db = (struct cassandra_db *)_db;
549 	CassFuture *future;
550 
551 	i_assert(db->api.state == SQL_DB_STATE_DISCONNECTED);
552 
553 	if (pipe(db->fd_pipe) < 0) {
554 		e_error(_db->event, "pipe() failed: %m");
555 		return -1;
556 	}
557 	db->io_pipe = io_add(db->fd_pipe[0], IO_READ,
558 			     driver_cassandra_input, db);
559 	driver_cassandra_set_state(db, SQL_DB_STATE_CONNECTING);
560 
561 	future = cass_session_connect_keyspace(db->session, db->cluster,
562 					       db->keyspace);
563 	driver_cassandra_set_callback(future, db, connect_callback, db);
564 	return 0;
565 }
566 
/* sql_db disconnect hook: close everything with a "Disconnected" error. */
static void driver_cassandra_disconnect(struct sql_db *_db)
{
	driver_cassandra_close((struct cassandra_db *)_db, "Disconnected");
}
573 
574 static const char *
driver_cassandra_escape_string(struct sql_db * db ATTR_UNUSED,const char * string)575 driver_cassandra_escape_string(struct sql_db *db ATTR_UNUSED,
576 			       const char *string)
577 {
578 	string_t *escaped;
579 	unsigned int i;
580 
581 	if (strchr(string, '\'') == NULL)
582 		return string;
583 	escaped = t_str_new(strlen(string)+10);
584 	for (i = 0; string[i] != '\0'; i++) {
585 		if (string[i] == '\'')
586 			str_append_c(escaped, '\'');
587 		str_append_c(escaped, string[i]);
588 	}
589 	return str_c(escaped);
590 }
591 
driver_cassandra_parse_connect_string(struct cassandra_db * db,const char * connect_string,const char ** error_r)592 static int driver_cassandra_parse_connect_string(struct cassandra_db *db,
593 						 const char *connect_string,
594 						 const char **error_r)
595 {
596 	const char *const *args, *key, *value, *error;
597 	string_t *hosts = t_str_new(64);
598 	bool read_fallback_set = FALSE, write_fallback_set = FALSE;
599 	bool delete_fallback_set = FALSE;
600 
601 	db->log_level = CASS_LOG_WARN;
602 	db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
603 	db->write_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
604 	db->delete_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
605 	db->connect_timeout_msecs = SQL_CONNECT_TIMEOUT_SECS*1000;
606 	db->request_timeout_msecs = SQL_QUERY_TIMEOUT_SECS*1000;
607 	db->warn_timeout_msecs = CASS_QUERY_DEFAULT_WARN_TIMEOUT_MSECS;
608 
609 	args = t_strsplit_spaces(connect_string, " ");
610 	for (; *args != NULL; args++) {
611 		value = strchr(*args, '=');
612 		if (value == NULL) {
613 			*error_r = t_strdup_printf(
614 				"Missing value in connect string: %s", *args);
615 			return -1;
616 		}
617 		key = t_strdup_until(*args, value++);
618 
619 		if (str_begins(key, "ssl_"))
620 			db->init_ssl = TRUE;
621 
622 		if (strcmp(key, "host") == 0) {
623 			if (str_len(hosts) > 0)
624 				str_append_c(hosts, ',');
625 			str_append(hosts, value);
626 		} else if (strcmp(key, "port") == 0) {
627 			if (net_str2port(value, &db->port) < 0) {
628 				*error_r = t_strdup_printf(
629 					"Invalid port: %s", value);
630 				return -1;
631 			}
632 		} else if (strcmp(key, "dbname") == 0 ||
633 			   strcmp(key, "keyspace") == 0) {
634 			i_free(db->keyspace);
635 			db->keyspace = i_strdup(value);
636 		} else if (strcmp(key, "user") == 0) {
637 			i_free(db->user);
638 			db->user = i_strdup(value);
639 		} else if (strcmp(key, "password") == 0) {
640 			i_free(db->password);
641 			db->password = i_strdup(value);
642 		} else if (strcmp(key, "read_consistency") == 0) {
643 			if (consistency_parse(value, &db->read_consistency) < 0) {
644 				*error_r = t_strdup_printf(
645 					"Unknown read_consistency: %s", value);
646 				return -1;
647 			}
648 		} else if (strcmp(key, "read_fallback_consistency") == 0) {
649 			if (consistency_parse(value, &db->read_fallback_consistency) < 0) {
650 				*error_r = t_strdup_printf(
651 					"Unknown read_fallback_consistency: %s", value);
652 				return -1;
653 			}
654 			read_fallback_set = TRUE;
655 		} else if (strcmp(key, "write_consistency") == 0) {
656 			if (consistency_parse(value,
657 					      &db->write_consistency) < 0) {
658 				*error_r = t_strdup_printf(
659 					"Unknown write_consistency: %s", value);
660 				return -1;
661 			}
662 		} else if (strcmp(key, "write_fallback_consistency") == 0) {
663 			if (consistency_parse(value,
664 					      &db->write_fallback_consistency) < 0) {
665 				*error_r = t_strdup_printf(
666 					"Unknown write_fallback_consistency: %s",
667 					value);
668 				return -1;
669 			}
670 			write_fallback_set = TRUE;
671 		} else if (strcmp(key, "delete_consistency") == 0) {
672 			if (consistency_parse(value,
673 					      &db->delete_consistency) < 0) {
674 				*error_r = t_strdup_printf(
675 					"Unknown delete_consistency: %s", value);
676 				return -1;
677 			}
678 		} else if (strcmp(key, "delete_fallback_consistency") == 0) {
679 			if (consistency_parse(value,
680 					      &db->delete_fallback_consistency) < 0) {
681 				*error_r = t_strdup_printf(
682 					"Unknown delete_fallback_consistency: %s",
683 					value);
684 				return -1;
685 			}
686 			delete_fallback_set = TRUE;
687 		} else if (strcmp(key, "log_level") == 0) {
688 			if (log_level_parse(value, &db->log_level) < 0) {
689 				*error_r = t_strdup_printf(
690 					"Unknown log_level: %s", value);
691 				return -1;
692 			}
693 		} else if (strcmp(key, "debug_queries") == 0) {
694 			db->debug_queries = TRUE;
695 		} else if (strcmp(key, "latency_aware_routing") == 0) {
696 			db->latency_aware_routing = TRUE;
697 		} else if (strcmp(key, "version") == 0) {
698 			if (str_to_uint(value, &db->protocol_version) < 0) {
699 				*error_r = t_strdup_printf(
700 					"Invalid version: %s", value);
701 				return -1;
702 			}
703 		} else if (strcmp(key, "num_threads") == 0) {
704 			if (str_to_uint(value, &db->num_threads) < 0) {
705 				*error_r = t_strdup_printf(
706 					"Invalid num_threads: %s", value);
707 				return -1;
708 			}
709 		} else if (strcmp(key, "heartbeat_interval") == 0) {
710 			if (settings_get_time(value, &db->heartbeat_interval_secs,
711 					      &error) < 0) {
712 				*error_r = t_strdup_printf(
713 					"Invalid heartbeat_interval '%s': %s",
714 					value, error);
715 				return -1;
716 			}
717 		} else if (strcmp(key, "idle_timeout") == 0) {
718 			if (settings_get_time(value, &db->idle_timeout_secs,
719 					      &error) < 0) {
720 				*error_r = t_strdup_printf(
721 					"Invalid idle_timeout '%s': %s",
722 					value, error);
723 				return -1;
724 			}
725 		} else if (strcmp(key, "connect_timeout") == 0) {
726 			if (settings_get_time_msecs(value,
727 						    &db->connect_timeout_msecs,
728 						    &error) < 0) {
729 				*error_r = t_strdup_printf(
730 					"Invalid connect_timeout '%s': %s",
731 					value, error);
732 				return -1;
733 			}
734 		} else if (strcmp(key, "request_timeout") == 0) {
735 			if (settings_get_time_msecs(value,
736 						    &db->request_timeout_msecs,
737 						    &error) < 0) {
738 				*error_r = t_strdup_printf(
739 					"Invalid request_timeout '%s': %s",
740 					value, error);
741 				return -1;
742 			}
743 		} else if (strcmp(key, "warn_timeout") == 0) {
744 			if (settings_get_time_msecs(value,
745 						    &db->warn_timeout_msecs,
746 						    &error) < 0) {
747 				*error_r = t_strdup_printf(
748 					"Invalid warn_timeout '%s': %s",
749 					value, error);
750 				return -1;
751 			}
752 		} else if (strcmp(key, "metrics") == 0) {
753 			i_free(db->metrics_path);
754 			db->metrics_path = i_strdup(value);
755 		} else if (strcmp(key, "execution_retry_interval") == 0) {
756 			if (settings_get_time_msecs(value,
757 						    &db->execution_retry_interval_msecs,
758 						    &error) < 0) {
759 				*error_r = t_strdup_printf(
760 					"Invalid execution_retry_interval '%s': %s",
761 					value, error);
762 				return -1;
763 			}
764 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
765 			*error_r = t_strdup_printf(
766 	"This cassandra version does not support execution_retry_interval");
767 			return -1;
768 #endif
769 		} else if (strcmp(key, "execution_retry_times") == 0) {
770 			if (str_to_uint(value, &db->execution_retry_times) < 0) {
771 				*error_r = t_strdup_printf(
772 					"Invalid execution_retry_times %s",
773 					value);
774 				return -1;
775 			}
776 #ifndef HAVE_CASSANDRA_SPECULATIVE_POLICY
777 			*error_r = t_strdup_printf(
778 	"This cassandra version does not support execution_retry_times");
779 			return -1;
780 #endif
781 		} else if (strcmp(key, "page_size") == 0) {
782 			if (str_to_uint(value, &db->page_size) < 0) {
783 				*error_r = t_strdup_printf(
784 					"Invalid page_size: %s",
785 					value);
786 				return -1;
787 			}
788 		} else if (strcmp(key, "ssl_ca") == 0) {
789 			db->ssl_ca_file = i_strdup(value);
790 		} else if (strcmp(key, "ssl_cert_file") == 0) {
791 			db->ssl_cert_file = i_strdup(value);
792 		} else if (strcmp(key, "ssl_private_key_file") == 0) {
793 			db->ssl_private_key_file = i_strdup(value);
794 		} else if (strcmp(key, "ssl_private_key_password") == 0) {
795 			db->ssl_private_key_password = i_strdup(value);
796 		} else if (strcmp(key, "ssl_verify") == 0) {
797 			if (strcmp(value, "none") == 0) {
798 				db->ssl_verify_flags = CASS_SSL_VERIFY_NONE;
799 			} else if (strcmp(value, "cert") == 0) {
800 				db->ssl_verify_flags = CASS_SSL_VERIFY_PEER_CERT;
801 			} else if (strcmp(value, "cert-ip") == 0) {
802 				db->ssl_verify_flags =
803 					CASS_SSL_VERIFY_PEER_CERT |
804 					CASS_SSL_VERIFY_PEER_IDENTITY;
805 #if HAVE_DECL_CASS_SSL_VERIFY_PEER_IDENTITY_DNS == 1
806 			} else if (strcmp(value, "cert-dns") == 0) {
807 				db->ssl_verify_flags =
808 					CASS_SSL_VERIFY_PEER_CERT |
809 					CASS_SSL_VERIFY_PEER_IDENTITY_DNS;
810 #endif
811 			} else {
812 				*error_r = t_strdup_printf(
813 					"Unsupported ssl_verify flags: '%s'",
814 					value);
815 				return -1;
816 			}
817 		} else {
818 			*error_r = t_strdup_printf(
819 				"Unknown connect string: %s", key);
820 			return -1;
821 		}
822 	}
823 
824 	if (!read_fallback_set)
825 		db->read_fallback_consistency = db->read_consistency;
826 	if (!write_fallback_set)
827 		db->write_fallback_consistency = db->write_consistency;
828 	if (!delete_fallback_set)
829 		db->delete_fallback_consistency = db->delete_consistency;
830 
831 	if (str_len(hosts) == 0) {
832 		*error_r = t_strdup_printf("No hosts given in connect string");
833 		return -1;
834 	}
835 	if (db->keyspace == NULL) {
836 		*error_r = t_strdup_printf("No dbname given in connect string");
837 		return -1;
838 	}
839 
840 	if ((db->ssl_cert_file != NULL && db->ssl_private_key_file == NULL) ||
841 	    (db->ssl_cert_file == NULL && db->ssl_private_key_file != NULL)) {
842 		*error_r = "ssl_cert_file and ssl_private_key_file need to be both set";
843 		return -1;
844 	}
845 
846 	db->hosts = i_strdup(str_c(hosts));
847 	return 0;
848 }
849 
/* Append a JSON object to dest with four sections: "requests", "stats" and
   "errors" from cass_session_get_metrics(), plus "queries" from
   db->counters. */
static void
driver_cassandra_get_metrics_json(struct cassandra_db *db, string_t *dest)
{
/* Each ADD_* macro appends `"<field>": <value>,`. The trailing comma of a
   section is removed with str_truncate() before the section is closed.
   NOTE: these macros are not #undef'd, so they stay defined for the rest
   of the file. */
#define ADD_UINT64(_struct, _field) \
	str_printfa(dest, "\""#_field"\": %llu,", \
		(unsigned long long)metrics._struct._field);
#define ADD_DOUBLE(_struct, _field) \
	str_printfa(dest, "\""#_field"\": %02lf,", metrics._struct._field);
	CassMetrics metrics;

	cass_session_get_metrics(db->session, &metrics);
	str_append(dest, "{ \"requests\": {");
	ADD_UINT64(requests, min);
	ADD_UINT64(requests, max);
	ADD_UINT64(requests, mean);
	ADD_UINT64(requests, stddev);
	ADD_UINT64(requests, median);
	ADD_UINT64(requests, percentile_75th);
	ADD_UINT64(requests, percentile_95th);
	ADD_UINT64(requests, percentile_98th);
	ADD_UINT64(requests, percentile_99th);
	ADD_UINT64(requests, percentile_999th);
	ADD_DOUBLE(requests, mean_rate);
	ADD_DOUBLE(requests, one_minute_rate);
	ADD_DOUBLE(requests, five_minute_rate);
	ADD_DOUBLE(requests, fifteen_minute_rate);
	/* drop the trailing ',' */
	str_truncate(dest, str_len(dest)-1);

	str_append(dest, "}, \"stats\": {");
	ADD_UINT64(stats, total_connections);
	ADD_UINT64(stats, available_connections);
	ADD_UINT64(stats, exceeded_pending_requests_water_mark);
	ADD_UINT64(stats, exceeded_write_bytes_water_mark);
	str_truncate(dest, str_len(dest)-1);

	str_append(dest, "}, \"errors\": {");
	ADD_UINT64(errors, connection_timeouts);
	ADD_UINT64(errors, pending_request_timeouts);
	ADD_UINT64(errors, request_timeouts);
	str_truncate(dest, str_len(dest)-1);

	/* driver's own counters, keyed by counter_names[] */
	str_append(dest, "}, \"queries\": {");
	for (unsigned int i = 0; i < CASSANDRA_COUNTER_COUNT; i++) {
		str_printfa(dest, "\"%s\": %"PRIu64",", counter_names[i],
			    db->counters[i]);
	}
	str_truncate(dest, str_len(dest)-1);
	str_append(dest, "}}");
}
899 
driver_cassandra_metrics_write(struct cassandra_db * db)900 static void driver_cassandra_metrics_write(struct cassandra_db *db)
901 {
902 	struct var_expand_table tab[] = {
903 		{ '\0', NULL, NULL }
904 	};
905 	string_t *path = t_str_new(64);
906 	string_t *data;
907 	const char *error;
908 	int fd;
909 
910 	if (var_expand(path, db->metrics_path, tab, &error) <= 0) {
911 		e_error(db->api.event, "Failed to expand metrics_path=%s: %s",
912 			db->metrics_path, error);
913 		return;
914 	}
915 
916 	fd = open(str_c(path), O_WRONLY | O_CREAT | O_TRUNC | O_NONBLOCK, 0600);
917 	if (fd == -1) {
918 		e_error(db->api.event, "creat(%s) failed: %m", str_c(path));
919 		return;
920 	}
921 	data = t_str_new(1024);
922 	driver_cassandra_get_metrics_json(db, data);
923 	if (write_full(fd, str_data(data), str_len(data)) < 0)
924 		e_error(db->api.event, "write(%s) failed: %m", str_c(path));
925 	i_close_fd(&fd);
926 }
927 
driver_cassandra_free(struct cassandra_db ** _db)928 static void driver_cassandra_free(struct cassandra_db **_db)
929 {
930 	struct cassandra_db *db = *_db;
931 	*_db = NULL;
932 
933 	event_unref(&db->api.event);
934 	i_free(db->metrics_path);
935 	i_free(db->hosts);
936 	i_free(db->error);
937 	i_free(db->keyspace);
938 	i_free(db->user);
939 	i_free(db->password);
940 	i_free(db->ssl_ca_file);
941 	i_free(db->ssl_cert_file);
942 	i_free(db->ssl_private_key_file);
943 	i_free_and_null(db->ssl_private_key_password);
944 	array_free(&db->api.module_contexts);
945 	if (db->ssl != NULL)
946 		cass_ssl_free(db->ssl);
947 	i_free(db);
948 }
949 
driver_cassandra_init_ssl(struct cassandra_db * db,const char ** error_r)950 static int driver_cassandra_init_ssl(struct cassandra_db *db, const char **error_r)
951 {
952 	buffer_t *buf = t_buffer_create(512);
953 	CassError c_err;
954 
955 	db->ssl = cass_ssl_new();
956 	i_assert(db->ssl != NULL);
957 
958 	if (db->ssl_ca_file != NULL) {
959 		if (buffer_append_full_file(buf, db->ssl_ca_file, SIZE_MAX,
960 					    error_r) < 0)
961 			return -1;
962 		if ((c_err = cass_ssl_add_trusted_cert(db->ssl, str_c(buf))) != CASS_OK) {
963 			*error_r = cass_error_desc(c_err);
964 			return -1;
965 		}
966 	}
967 
968 	if (db->ssl_private_key_file != NULL && db->ssl_cert_file != NULL) {
969 		buffer_set_used_size(buf, 0);
970 		if (buffer_append_full_file(buf, db->ssl_private_key_file,
971 					    SIZE_MAX, error_r) < 0)
972 			return -1;
973 		c_err = cass_ssl_set_private_key(db->ssl, str_c(buf),
974 					         db->ssl_private_key_password);
975 		safe_memset(buffer_get_modifiable_data(buf, NULL), 0, buf->used);
976 		if (c_err != CASS_OK) {
977 			*error_r = cass_error_desc(c_err);
978 			return -1;
979 		}
980 
981 		buffer_set_used_size(buf, 0);
982 		if (buffer_append_full_file(buf, db->ssl_cert_file, SIZE_MAX, error_r) < 0)
983 			return -1;
984 		if ((c_err = cass_ssl_set_cert(db->ssl, str_c(buf))) != CASS_OK) {
985 			*error_r = cass_error_desc(c_err);
986 			return -1;
987 		}
988 	}
989 
990 	cass_ssl_set_verify_flags(db->ssl, db->ssl_verify_flags);
991 
992 	return 0;
993 }
994 
driver_cassandra_init_full_v(const struct sql_settings * set,struct sql_db ** db_r,const char ** error_r)995 static int driver_cassandra_init_full_v(const struct sql_settings *set,
996 					struct sql_db **db_r,
997 					const char **error_r)
998 {
999 	struct cassandra_db *db;
1000 	int ret;
1001 
1002 	db = i_new(struct cassandra_db, 1);
1003 	db->api = driver_cassandra_db;
1004 	db->fd_pipe[0] = db->fd_pipe[1] = -1;
1005 	db->api.event = event_create(set->event_parent);
1006 	event_add_category(db->api.event, &event_category_cassandra);
1007 	event_set_append_log_prefix(db->api.event, "cassandra: ");
1008 
1009 	T_BEGIN {
1010 		ret = driver_cassandra_parse_connect_string(db,
1011 			set->connect_string, error_r);
1012 	} T_END_PASS_STR_IF(ret < 0, error_r);
1013 
1014 	if (ret < 0) {
1015 		driver_cassandra_free(&db);
1016 		return -1;
1017 	}
1018 
1019 	if (db->init_ssl && driver_cassandra_init_ssl(db, error_r) < 0) {
1020 		driver_cassandra_free(&db);
1021 		return -1;
1022 	}
1023 
1024 	driver_cassandra_init_log();
1025 	cass_log_set_level(db->log_level);
1026 	if (db->log_level >= CASS_LOG_DEBUG)
1027 		event_set_forced_debug(db->api.event, TRUE);
1028 
1029 	if (db->protocol_version > 0 && db->protocol_version < 4) {
1030 		/* binding with column indexes requires v4 */
1031 		db->api.v.prepared_statement_init = NULL;
1032 		db->api.v.prepared_statement_deinit = NULL;
1033 		db->api.v.statement_init_prepared = NULL;
1034 	}
1035 
1036 	db->timestamp_gen = cass_timestamp_gen_monotonic_new();
1037 	db->cluster = cass_cluster_new();
1038 
1039 #ifdef HAVE_CASS_CLUSTER_SET_USE_HOSTNAME_RESOLUTION
1040 	if ((db->ssl_verify_flags & CASS_SSL_VERIFY_PEER_IDENTITY_DNS) != 0) {
1041 		CassError c_err;
1042 		if ((c_err = cass_cluster_set_use_hostname_resolution(
1043 				db->cluster, cass_true)) != CASS_OK) {
1044 			*error_r = cass_error_desc(c_err);
1045 			driver_cassandra_free(&db);
1046 			return -1;
1047 		}
1048 	}
1049 #endif
1050 	cass_cluster_set_ssl(db->cluster, db->ssl);
1051 	cass_cluster_set_timestamp_gen(db->cluster, db->timestamp_gen);
1052 	cass_cluster_set_connect_timeout(db->cluster, db->connect_timeout_msecs);
1053 	cass_cluster_set_request_timeout(db->cluster, db->request_timeout_msecs);
1054 	cass_cluster_set_contact_points(db->cluster, db->hosts);
1055 	if (db->user != NULL && db->password != NULL)
1056 		cass_cluster_set_credentials(db->cluster, db->user, db->password);
1057 	if (db->port != 0)
1058 		cass_cluster_set_port(db->cluster, db->port);
1059 	if (db->protocol_version != 0)
1060 		cass_cluster_set_protocol_version(db->cluster, db->protocol_version);
1061 	if (db->num_threads != 0)
1062 		cass_cluster_set_num_threads_io(db->cluster, db->num_threads);
1063 	if (db->latency_aware_routing)
1064 		cass_cluster_set_latency_aware_routing(db->cluster, cass_true);
1065 	if (db->heartbeat_interval_secs != 0)
1066 		cass_cluster_set_connection_heartbeat_interval(db->cluster,
1067 			db->heartbeat_interval_secs);
1068 	if (db->idle_timeout_secs != 0)
1069 		cass_cluster_set_connection_idle_timeout(db->cluster,
1070 			db->idle_timeout_secs);
1071 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
1072 	if (db->execution_retry_times > 0 && db->execution_retry_interval_msecs > 0)
1073 		cass_cluster_set_constant_speculative_execution_policy(
1074 			db->cluster, db->execution_retry_interval_msecs,
1075 			db->execution_retry_times);
1076 #endif
1077 	if (db->ssl != NULL) {
1078 		e_debug(db->api.event, "Enabling TLS for cluster");
1079 		cass_cluster_set_ssl(db->cluster, db->ssl);
1080 	}
1081 	db->session = cass_session_new();
1082 	if (db->metrics_path != NULL)
1083 		db->to_metrics = timeout_add(1000, driver_cassandra_metrics_write,
1084 					     db);
1085 	i_array_init(&db->results, 16);
1086 	i_array_init(&db->callbacks, 16);
1087 	i_array_init(&db->pending_prepares, 16);
1088 	if (!main_thread_id_set) {
1089 		main_thread_id = pthread_self();
1090 		main_thread_id_set = TRUE;
1091 	}
1092 
1093 	*db_r = &db->api;
1094 	return 0;
1095 }
1096 
driver_cassandra_deinit_v(struct sql_db * _db)1097 static void driver_cassandra_deinit_v(struct sql_db *_db)
1098 {
1099 	struct cassandra_db *db = (struct cassandra_db *)_db;
1100 
1101 	driver_cassandra_close(db, "Deinitialized");
1102 
1103 	i_assert(array_count(&db->callbacks) == 0);
1104 	array_free(&db->callbacks);
1105 	i_assert(array_count(&db->results) == 0);
1106 	array_free(&db->results);
1107 	i_assert(array_count(&db->pending_prepares) == 0);
1108 	array_free(&db->pending_prepares);
1109 
1110 	cass_session_free(db->session);
1111 	cass_cluster_free(db->cluster);
1112 	cass_timestamp_gen_free(db->timestamp_gen);
1113 	timeout_remove(&db->to_metrics);
1114 	sql_connection_log_finished(_db);
1115 	driver_cassandra_free(&db);
1116 }
1117 
driver_cassandra_result_unlink(struct cassandra_db * db,struct cassandra_result * result)1118 static void driver_cassandra_result_unlink(struct cassandra_db *db,
1119 					   struct cassandra_result *result)
1120 {
1121 	struct cassandra_result *const *results;
1122 	unsigned int i, count;
1123 
1124 	results = array_get(&db->results, &count);
1125 	for (i = 0; i < count; i++) {
1126 		if (results[i] == result) {
1127 			array_delete(&db->results, i, 1);
1128 			return;
1129 		}
1130 	}
1131 	i_unreached();
1132 }
1133 
driver_cassandra_log_result(struct cassandra_result * result,bool all_pages,long long reply_usecs)1134 static void driver_cassandra_log_result(struct cassandra_result *result,
1135 					bool all_pages, long long reply_usecs)
1136 {
1137 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1138 	struct timeval now;
1139 	unsigned int row_count;
1140 
1141 	i_gettimeofday(&now);
1142 
1143 	string_t *str = t_str_new(128);
1144 	str_printfa(str, "Finished %squery '%s' (",
1145 		    result->is_prepared ? "prepared " : "", result->query);
1146 	if (result->timestamp != 0)
1147 		str_printfa(str, "timestamp=%"PRId64", ", result->timestamp);
1148 	if (all_pages) {
1149 		str_printfa(str, "%u pages in total, ", result->page_num);
1150 		row_count = result->total_row_count;
1151 	} else {
1152 		if (result->page_num > 0 || result->paging_continues)
1153 			str_printfa(str, "page %u, ", result->page_num);
1154 		row_count = result->row_count;
1155 	}
1156 	str_printfa(str, "%u rows, %lld+%lld us): %s", row_count, reply_usecs,
1157 		    timeval_diff_usecs(&now, &result->finish_time),
1158 		    result->error != NULL ? result->error : "success");
1159 
1160 	struct event_passthrough *e =
1161 		sql_query_finished_event(&db->api, result->api.event,
1162 					 result->query, result->error == NULL,
1163 					 NULL);
1164 	if (result->error != NULL)
1165 		e->add_str("error", result->error);
1166 
1167 	struct event *event = e->event();
1168 	if (db->debug_queries)
1169 		event_set_forced_debug(event, TRUE);
1170 	if (reply_usecs/1000 >= db->warn_timeout_msecs) {
1171 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SLOW]++;
1172 		e_warning(event, "%s", str_c(str));
1173 	} else {
1174 		e_debug(event, "%s", str_c(str));
1175 	}
1176 }
1177 
driver_cassandra_result_free(struct sql_result * _result)1178 static void driver_cassandra_result_free(struct sql_result *_result)
1179 {
1180 	struct cassandra_db *db = (struct cassandra_db *)_result->db;
1181 	struct cassandra_result *result = (struct cassandra_result *)_result;
1182 	long long reply_usecs;
1183 
1184 	i_assert(!result->api.callback);
1185 	i_assert(result->callback == NULL);
1186 
1187 	if (_result == db->sync_result)
1188 		db->sync_result = NULL;
1189 
1190 	reply_usecs = timeval_diff_usecs(&result->finish_time,
1191 					 &result->start_time);
1192 	driver_cassandra_log_result(result, FALSE, reply_usecs);
1193 
1194 	if (result->page_num > 0 && !result->paging_continues) {
1195 		/* Multi-page query finishes now. Log a debug/warning summary
1196 		   message about it separate from the per-page messages. */
1197 		reply_usecs = timeval_diff_usecs(&result->finish_time,
1198 						 &result->page0_start_time);
1199 		driver_cassandra_log_result(result, TRUE, reply_usecs);
1200 	}
1201 
1202 	if (result->result != NULL)
1203 		cass_result_free(result->result);
1204 	if (result->iterator != NULL)
1205 		cass_iterator_free(result->iterator);
1206 	if (result->statement != NULL)
1207 		cass_statement_free(result->statement);
1208 	pool_unref(&result->row_pool);
1209 	event_unref(&result->api.event);
1210 	i_free(result->query);
1211 	i_free(result->error);
1212 	i_free(result);
1213 }
1214 
result_finish(struct cassandra_result * result)1215 static void result_finish(struct cassandra_result *result)
1216 {
1217 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1218 	bool free_result = TRUE;
1219 
1220 	result->finished = TRUE;
1221 	result->finish_time = ioloop_timeval;
1222 	driver_cassandra_result_unlink(db, result);
1223 
1224 	i_assert((result->error != NULL) == (result->iterator == NULL));
1225 
1226 	result->api.callback = TRUE;
1227 	T_BEGIN {
1228 		result->callback(&result->api, result->context);
1229 	} T_END;
1230 	result->api.callback = FALSE;
1231 
1232 	free_result = db->sync_result != &result->api;
1233 	if (db->ioloop != NULL)
1234 		io_loop_stop(db->ioloop);
1235 
1236 	i_assert(!free_result || result->api.refcount > 0);
1237 	result->callback = NULL;
1238 	if (free_result)
1239 		sql_result_unref(&result->api);
1240 }
1241 
query_resend_with_fallback(struct cassandra_result * result)1242 static void query_resend_with_fallback(struct cassandra_result *result)
1243 {
1244 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1245 	time_t last_warning =
1246 		ioloop_time - db->last_fallback_warning[result->query_type];
1247 
1248 	if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
1249 		e_warning(db->api.event,
1250 			  "%s - retrying future %s queries with consistency %s (instead of %s)",
1251 			  result->error, cassandra_query_type_names[result->query_type],
1252 			  cass_consistency_string(result->fallback_consistency),
1253 			  cass_consistency_string(result->consistency));
1254 		db->last_fallback_warning[result->query_type] = ioloop_time;
1255 	}
1256 	i_free_and_null(result->error);
1257 	db->fallback_failures[result->query_type]++;
1258 
1259 	result->consistency = result->fallback_consistency;
1260 	driver_cassandra_result_send_query(result);
1261 }
1262 
counters_inc_error(struct cassandra_db * db,CassError error)1263 static void counters_inc_error(struct cassandra_db *db, CassError error)
1264 {
1265 	switch (error) {
1266 	case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1267 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_NO_HOSTS]++;
1268 		break;
1269 	case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1270 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_QUEUE_FULL]++;
1271 		break;
1272 	case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1273 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_CLIENT_TIMEOUT]++;
1274 		break;
1275 	case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1276 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_TIMEOUT]++;
1277 		break;
1278 	case CASS_ERROR_SERVER_UNAVAILABLE:
1279 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_SERVER_UNAVAILABLE]++;
1280 		break;
1281 	default:
1282 		db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_ERR_OTHER]++;
1283 		break;
1284 	}
1285 }
1286 
query_error_want_fallback(CassError error)1287 static bool query_error_want_fallback(CassError error)
1288 {
1289 	switch (error) {
1290 	case CASS_ERROR_LIB_WRITE_ERROR:
1291 	case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1292 		/* Communication problems on client side. Maybe it will work
1293 		   with fallback consistency? */
1294 		return TRUE;
1295 	case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1296 		/* The client library couldn't connect to enough Cassandra
1297 		   nodes. The error message text is the same as for
1298 		   CASS_ERROR_SERVER_UNAVAILABLE. */
1299 		return TRUE;
1300 	case CASS_ERROR_SERVER_SERVER_ERROR:
1301 	case CASS_ERROR_SERVER_OVERLOADED:
1302 	case CASS_ERROR_SERVER_IS_BOOTSTRAPPING:
1303 	case CASS_ERROR_SERVER_READ_TIMEOUT:
1304 	case CASS_ERROR_SERVER_READ_FAILURE:
1305 	case CASS_ERROR_SERVER_WRITE_FAILURE:
1306 		/* Servers are having trouble. Maybe with fallback consistency
1307 		   we can reach non-troubled servers? */
1308 		return TRUE;
1309 	case CASS_ERROR_SERVER_UNAVAILABLE:
1310 		/* Cassandra server knows that there aren't enough nodes
1311 		   available. "All hosts in current policy attempted and were
1312 		   either unavailable or failed". */
1313 		return TRUE;
1314 	case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1315 		/* Cassandra server couldn't reach all the needed nodes.
1316 		   This may be because it hasn't yet detected that the servers
1317 		   are down, or because the servers are just too busy. We'll
1318 		   try the fallback consistency to avoid unnecessary temporary
1319 		   errors. */
1320 		return TRUE;
1321 	default:
1322 		return FALSE;
1323 	}
1324 }
1325 
1326 static enum sql_result_error_type
driver_cassandra_error_is_uncertain(CassError error)1327 driver_cassandra_error_is_uncertain(CassError error)
1328 {
1329 	switch (error) {
1330 	case CASS_ERROR_SERVER_WRITE_FAILURE:
1331 		/* This happens when some of the replicas that were contacted
1332 		 * by the coordinator replied with an error. */
1333 	case CASS_ERROR_SERVER_WRITE_TIMEOUT:
1334 		/* A Cassandra timeout during a write query. */
1335 	case CASS_ERROR_SERVER_UNAVAILABLE:
1336 		/* The coordinator knows there are not enough replicas alive
1337 		 * to perform a query with the requested consistency level. */
1338 	case CASS_ERROR_LIB_REQUEST_TIMED_OUT:
1339 		/* A request sent from the driver has timed out. */
1340 	case CASS_ERROR_LIB_WRITE_ERROR:
1341 		/* A write error occured. */
1342 		return SQL_RESULT_ERROR_TYPE_WRITE_UNCERTAIN;
1343 	default:
1344 		return SQL_RESULT_ERROR_TYPE_UNKNOWN;
1345 	}
1346 }
1347 
query_callback(CassFuture * future,void * context)1348 static void query_callback(CassFuture *future, void *context)
1349 {
1350 	struct cassandra_result *result = context;
1351 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1352 	CassError error = cass_future_error_code(future);
1353 
1354 	if (error != CASS_OK) {
1355 		const char *errmsg;
1356 		size_t errsize;
1357 		int msecs;
1358 
1359 		cass_future_error_message(future, &errmsg, &errsize);
1360 		i_free(result->error);
1361 
1362 		msecs = timeval_diff_msecs(&ioloop_timeval, &result->start_time);
1363 		counters_inc_error(db, error);
1364 		/* Timeouts bring uncertainty whether the query succeeded or
1365 		   not. Also _SERVER_UNAVAILABLE could have actually written
1366 		   enough copies of the data for the query to succeed. */
1367 		result->api.error_type = driver_cassandra_error_is_uncertain(error);
1368 		result->error = i_strdup_printf(
1369 			"Query '%s' failed: %.*s (in %u.%03u secs%s)",
1370 			result->query, (int)errsize, errmsg, msecs/1000, msecs%1000,
1371 			result->page_num == 0 ?
1372 				"" :
1373 				t_strdup_printf(", page %u", result->page_num));
1374 
1375 		if (query_error_want_fallback(error) &&
1376 		    result->fallback_consistency != result->consistency) {
1377 			/* retry with fallback consistency */
1378 			query_resend_with_fallback(result);
1379 			return;
1380 		}
1381 		result_finish(result);
1382 		return;
1383 	}
1384 	db->counters[CASSANDRA_COUNTER_TYPE_QUERY_RECV_OK]++;
1385 
1386 	if (result->fallback_consistency != result->consistency) {
1387 		/* non-fallback query finished successfully. if there had been
1388 		   any fallbacks, reset them. */
1389 		db->fallback_failures[result->query_type] = 0;
1390 	}
1391 
1392 	result->result = cass_future_get_result(future);
1393 	result->iterator = cass_iterator_from_result(result->result);
1394 	result_finish(result);
1395 }
1396 
driver_cassandra_init_statement(struct cassandra_result * result)1397 static void driver_cassandra_init_statement(struct cassandra_result *result)
1398 {
1399 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1400 
1401 	cass_statement_set_consistency(result->statement, result->consistency);
1402 
1403 #ifdef HAVE_CASSANDRA_SPECULATIVE_POLICY
1404 	cass_statement_set_is_idempotent(result->statement, cass_true);
1405 #endif
1406 	if (db->page_size > 0)
1407 		cass_statement_set_paging_size(result->statement, db->page_size);
1408 }
1409 
driver_cassandra_result_send_query(struct cassandra_result * result)1410 static void driver_cassandra_result_send_query(struct cassandra_result *result)
1411 {
1412 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1413 	CassFuture *future;
1414 
1415 	i_assert(result->statement != NULL);
1416 
1417 	db->counters[CASSANDRA_COUNTER_TYPE_QUERY_SENT]++;
1418 	if (result->query_type != CASSANDRA_QUERY_TYPE_READ_MORE)
1419 		driver_cassandra_init_statement(result);
1420 
1421 	future = cass_session_execute(db->session, result->statement);
1422 	driver_cassandra_set_callback(future, db, query_callback, result);
1423 }
1424 
1425 static bool
driver_cassandra_want_fallback_query(struct cassandra_result * result)1426 driver_cassandra_want_fallback_query(struct cassandra_result *result)
1427 {
1428 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1429 	unsigned int failure_count = db->fallback_failures[result->query_type];
1430 	unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
1431 	struct timeval tv;
1432 
1433 	if (failure_count == 0)
1434 		return FALSE;
1435 	/* double the retries every time. */
1436 	for (i = 1; i < failure_count; i++) {
1437 		msecs *= 2;
1438 		if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
1439 			msecs = CASSANDRA_FALLBACK_MAX_RETRY_MSECS;
1440 			break;
1441 		}
1442 	}
1443 	/* If last primary query sent timestamp + msecs is older than current
1444 	   time, we need to retry the primary query. Note that this practically
1445 	   prevents multiple primary queries from being attempted
1446 	   simultaneously, because the caller updates primary_query_last_sent
1447 	   immediately when returning.
1448 
1449 	   The only time when multiple primary queries can be running in
1450 	   parallel is when the earlier query is being slow and hasn't finished
1451 	   early enough. This could even be a wanted feature, since while the
1452 	   first query might have to wait for a timeout, Cassandra could have
1453 	   been fixed in the meantime and the second query finishes
1454 	   successfully. */
1455 	tv = db->primary_query_last_sent[result->query_type];
1456 	timeval_add_msecs(&tv, msecs);
1457 	return timeval_cmp(&ioloop_timeval, &tv) < 0;
1458 }
1459 
driver_cassandra_send_query(struct cassandra_result * result)1460 static int driver_cassandra_send_query(struct cassandra_result *result)
1461 {
1462 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1463 	int ret;
1464 
1465 	if (!SQL_DB_IS_READY(&db->api)) {
1466 		if ((ret = sql_connect(&db->api)) <= 0) {
1467 			if (ret < 0)
1468 				driver_cassandra_close(db,
1469 					"Couldn't connect to Cassandra");
1470 			return ret;
1471 		}
1472 	}
1473 
1474 	if (result->page0_start_time.tv_sec == 0)
1475 		result->page0_start_time = ioloop_timeval;
1476 	result->start_time = ioloop_timeval;
1477 	result->row_pool = pool_alloconly_create("cassandra result", 512);
1478 	switch (result->query_type) {
1479 	case CASSANDRA_QUERY_TYPE_READ:
1480 		result->consistency = db->read_consistency;
1481 		result->fallback_consistency = db->read_fallback_consistency;
1482 		break;
1483 	case CASSANDRA_QUERY_TYPE_READ_MORE:
1484 		/* consistency is already set and we don't want to fallback
1485 		   at this point anymore. */
1486 		result->fallback_consistency = result->consistency;
1487 		break;
1488 	case CASSANDRA_QUERY_TYPE_WRITE:
1489 		result->consistency = db->write_consistency;
1490 		result->fallback_consistency = db->write_fallback_consistency;
1491 		break;
1492 	case CASSANDRA_QUERY_TYPE_DELETE:
1493 		result->consistency = db->delete_consistency;
1494 		result->fallback_consistency = db->delete_fallback_consistency;
1495 		break;
1496 	case CASSANDRA_QUERY_TYPE_COUNT:
1497 		i_unreached();
1498 	}
1499 
1500 	if (driver_cassandra_want_fallback_query(result))
1501 		result->consistency = result->fallback_consistency;
1502 	else
1503 		db->primary_query_last_sent[result->query_type] = ioloop_timeval;
1504 
1505 	driver_cassandra_result_send_query(result);
1506 	result->query_sent = TRUE;
1507 	return 1;
1508 }
1509 
driver_cassandra_send_queries(struct cassandra_db * db)1510 static void driver_cassandra_send_queries(struct cassandra_db *db)
1511 {
1512 	struct cassandra_result *const *results;
1513 	unsigned int i, count;
1514 
1515 	results = array_get(&db->results, &count);
1516 	for (i = 0; i < count; i++) {
1517 		if (!results[i]->query_sent && results[i]->statement != NULL) {
1518 			if (driver_cassandra_send_query(results[i]) <= 0)
1519 				break;
1520 		}
1521 	}
1522 }
1523 
exec_callback(struct sql_result * _result ATTR_UNUSED,void * context ATTR_UNUSED)1524 static void exec_callback(struct sql_result *_result ATTR_UNUSED,
1525 			  void *context ATTR_UNUSED)
1526 {
1527 }
1528 
1529 static struct cassandra_result *
driver_cassandra_query_init(struct cassandra_db * db,const char * query,enum cassandra_query_type query_type,bool is_prepared,sql_query_callback_t * callback,void * context)1530 driver_cassandra_query_init(struct cassandra_db *db, const char *query,
1531 			    enum cassandra_query_type query_type,
1532 			    bool is_prepared,
1533 			    sql_query_callback_t *callback, void *context)
1534 {
1535 	struct cassandra_result *result;
1536 
1537 	result = i_new(struct cassandra_result, 1);
1538 	result->api = driver_cassandra_result;
1539 	result->api.db = &db->api;
1540 	result->api.refcount = 1;
1541 	result->callback = callback;
1542 	result->context = context;
1543 	result->query_type = query_type;
1544 	result->query = i_strdup(query);
1545 	result->is_prepared = is_prepared;
1546 	result->api.event = event_create(db->api.event);
1547 	array_push_back(&db->results, &result);
1548 	return result;
1549 }
1550 
1551 static void
driver_cassandra_query_full(struct sql_db * _db,const char * query,enum cassandra_query_type query_type,sql_query_callback_t * callback,void * context)1552 driver_cassandra_query_full(struct sql_db *_db, const char *query,
1553 			    enum cassandra_query_type query_type,
1554 			    sql_query_callback_t *callback, void *context)
1555 {
1556 	struct cassandra_db *db = (struct cassandra_db *)_db;
1557 	struct cassandra_result *result;
1558 
1559 	result = driver_cassandra_query_init(db, query, query_type, FALSE,
1560 					     callback, context);
1561 	result->statement = cass_statement_new(query, 0);
1562 	(void)driver_cassandra_send_query(result);
1563 }
1564 
driver_cassandra_exec(struct sql_db * db,const char * query)1565 static void driver_cassandra_exec(struct sql_db *db, const char *query)
1566 {
1567 	driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_WRITE,
1568 				    exec_callback, NULL);
1569 }
1570 
driver_cassandra_query(struct sql_db * db,const char * query,sql_query_callback_t * callback,void * context)1571 static void driver_cassandra_query(struct sql_db *db, const char *query,
1572 				   sql_query_callback_t *callback, void *context)
1573 {
1574 	driver_cassandra_query_full(db, query, CASSANDRA_QUERY_TYPE_READ,
1575 				    callback, context);
1576 }
1577 
cassandra_query_s_callback(struct sql_result * result,void * context)1578 static void cassandra_query_s_callback(struct sql_result *result, void *context)
1579 {
1580 	struct cassandra_db *db = context;
1581 
1582 	db->sync_result = result;
1583 }
1584 
driver_cassandra_sync_init(struct cassandra_db * db)1585 static void driver_cassandra_sync_init(struct cassandra_db *db)
1586 {
1587 	if (sql_connect(&db->api) < 0)
1588 		return;
1589 	db->orig_ioloop = current_ioloop;
1590 	db->ioloop = io_loop_create();
1591 	if (IS_CONNECTED(db))
1592 		return;
1593 	i_assert(db->api.state == SQL_DB_STATE_CONNECTING);
1594 
1595 	db->io_pipe = io_loop_move_io(&db->io_pipe);
1596 	/* wait for connecting to finish */
1597 	io_loop_run(db->ioloop);
1598 }
1599 
driver_cassandra_sync_deinit(struct cassandra_db * db)1600 static void driver_cassandra_sync_deinit(struct cassandra_db *db)
1601 {
1602 	if (db->orig_ioloop == NULL)
1603 		return;
1604 	if (db->io_pipe != NULL) {
1605 		io_loop_set_current(db->orig_ioloop);
1606 		db->io_pipe = io_loop_move_io(&db->io_pipe);
1607 		io_loop_set_current(db->ioloop);
1608 	}
1609 	io_loop_destroy(&db->ioloop);
1610 }
1611 
1612 static struct sql_result *
driver_cassandra_sync_query(struct cassandra_db * db,const char * query,enum cassandra_query_type query_type)1613 driver_cassandra_sync_query(struct cassandra_db *db, const char *query,
1614 			    enum cassandra_query_type query_type)
1615 {
1616 	struct sql_result *result;
1617 
1618 	i_assert(db->sync_result == NULL);
1619 
1620 	switch (db->api.state) {
1621 	case SQL_DB_STATE_CONNECTING:
1622 	case SQL_DB_STATE_BUSY:
1623 		i_unreached();
1624 	case SQL_DB_STATE_DISCONNECTED:
1625 		sql_not_connected_result.refcount++;
1626 		return &sql_not_connected_result;
1627 	case SQL_DB_STATE_IDLE:
1628 		break;
1629 	}
1630 
1631 	driver_cassandra_query_full(&db->api, query, query_type,
1632 				    cassandra_query_s_callback, db);
1633 	if (db->sync_result == NULL) {
1634 		db->io_pipe = io_loop_move_io(&db->io_pipe);
1635 		io_loop_run(db->ioloop);
1636 	}
1637 
1638 	result = db->sync_result;
1639 	if (result == &sql_not_connected_result) {
1640 		/* we don't end up in cassandra's free function, so sync_result
1641 		   won't be set to NULL if we don't do it here. */
1642 		db->sync_result = NULL;
1643 	} else if (result == NULL) {
1644 		result = &sql_not_connected_result;
1645 		result->refcount++;
1646 	}
1647 	return result;
1648 }
1649 
1650 static struct sql_result *
driver_cassandra_query_s(struct sql_db * _db,const char * query)1651 driver_cassandra_query_s(struct sql_db *_db, const char *query)
1652 {
1653 	struct cassandra_db *db = (struct cassandra_db *)_db;
1654 	struct sql_result *result;
1655 
1656 	driver_cassandra_sync_init(db);
1657 	result = driver_cassandra_sync_query(db, query,
1658 					     CASSANDRA_QUERY_TYPE_READ);
1659 	driver_cassandra_sync_deinit(db);
1660 	return result;
1661 }
1662 
1663 static int
driver_cassandra_get_value(struct cassandra_result * result,const CassValue * value,const char ** str_r,size_t * len_r)1664 driver_cassandra_get_value(struct cassandra_result *result,
1665 			   const CassValue *value, const char **str_r,
1666 			   size_t *len_r)
1667 {
1668 	const unsigned char *output;
1669 	void *output_dup;
1670 	size_t output_size;
1671 	CassError rc;
1672 	const char *type;
1673 
1674 	if (cass_value_is_null(value) != 0) {
1675 		*str_r = NULL;
1676 		*len_r = 0;
1677 		return 0;
1678 	}
1679 
1680 	switch (cass_data_type_type(cass_value_data_type(value))) {
1681 	case CASS_VALUE_TYPE_INT: {
1682 		cass_int32_t num;
1683 
1684 		rc = cass_value_get_int32(value, &num);
1685 		if (rc == CASS_OK) {
1686 			const char *str = t_strdup_printf("%d", num);
1687 			output_size = strlen(str);
1688 			output = (const void *)str;
1689 		}
1690 		type = "int32";
1691 		break;
1692 	}
1693 	case CASS_VALUE_TYPE_TIMESTAMP:
1694 	case CASS_VALUE_TYPE_BIGINT: {
1695 		cass_int64_t num;
1696 
1697 		rc = cass_value_get_int64(value, &num);
1698 		if (rc == CASS_OK) {
1699 			const char *str = t_strdup_printf("%lld", (long long)num);
1700 			output_size = strlen(str);
1701 			output = (const void *)str;
1702 		}
1703 		type = "int64";
1704 		break;
1705 	}
1706 	default:
1707 		rc = cass_value_get_bytes(value, &output, &output_size);
1708 		type = "bytes";
1709 		break;
1710 	}
1711 	if (rc != CASS_OK) {
1712 		i_free(result->error);
1713 		result->error = i_strdup_printf("Couldn't get value as %s: %s",
1714 						type, cass_error_desc(rc));
1715 		return -1;
1716 	}
1717 	output_dup = p_malloc(result->row_pool, output_size + 1);
1718 	memcpy(output_dup, output, output_size);
1719 	*str_r = output_dup;
1720 	*len_r = output_size;
1721 	return 0;
1722 }
1723 
driver_cassandra_result_next_page(struct cassandra_result * result)1724 static int driver_cassandra_result_next_page(struct cassandra_result *result)
1725 {
1726 	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
1727 
1728 	if (db->page_size == 0) {
1729 		/* no paging */
1730 		return 0;
1731 	}
1732 	if (cass_result_has_more_pages(result->result) == cass_false)
1733 		return 0;
1734 
1735 	/* callers that don't support sql_query_more() will still get a useful
1736 	   error message. */
1737 	i_free(result->error);
1738 	result->error = i_strdup(
1739 		"Paged query has more results, but not supported by the caller");
1740 	return SQL_RESULT_NEXT_MORE;
1741 }
1742 
driver_cassandra_result_next_row(struct sql_result * _result)1743 static int driver_cassandra_result_next_row(struct sql_result *_result)
1744 {
1745 	struct cassandra_result *result = (struct cassandra_result *)_result;
1746 	const CassRow *row;
1747 	const CassValue *value;
1748 	const char *str;
1749 	size_t size;
1750 	unsigned int i;
1751 	int ret = 1;
1752 
1753 	if (result->iterator == NULL)
1754 		return -1;
1755 
1756 	if (cass_iterator_next(result->iterator) == 0)
1757 		return driver_cassandra_result_next_page(result);
1758 	result->row_count++;
1759 	result->total_row_count++;
1760 
1761 	p_clear(result->row_pool);
1762 	p_array_init(&result->fields, result->row_pool, 8);
1763 	p_array_init(&result->field_sizes, result->row_pool, 8);
1764 
1765 	row = cass_iterator_get_row(result->iterator);
1766 	for (i = 0; (value = cass_row_get_column(row, i)) != NULL; i++) {
1767 		if (driver_cassandra_get_value(result, value, &str, &size) < 0) {
1768 			ret = -1;
1769 			break;
1770 		}
1771 		array_push_back(&result->fields, &str);
1772 		array_push_back(&result->field_sizes, &size);
1773 	}
1774 	return ret;
1775 }
1776 
/* sql_result_more() implementation: fetch the next page of a paged query.
   A new result is created for the same query. It takes over the old
   result's statement, with the paging state advanced past the old result.
   The old result is unreferenced and *_result is set to NULL. With async
   the query is only sent here. Otherwise the next page is waited for in
   db->ioloop and callback is called directly before returning. */
static void
driver_cassandra_result_more(struct sql_result **_result, bool async,
			     sql_query_callback_t *callback, void *context)
{
	struct cassandra_db *db = (struct cassandra_db *)(*_result)->db;
	struct cassandra_result *new_result;
	struct cassandra_result *old_result =
		(struct cassandra_result *)*_result;

	/* Initialize the next page as a new sql_result */
	new_result = driver_cassandra_query_init(db, old_result->query,
						 CASSANDRA_QUERY_TYPE_READ_MORE,
						 old_result->is_prepared,
						 callback, context);

	/* Preserve the statement and update its paging state. The statement
	   moves to new_result and is cleared from old_result, presumably so
	   that unreferencing old_result below doesn't free it. */
	new_result->statement = old_result->statement;
	old_result->statement = NULL;
	cass_statement_set_paging_state(new_result->statement,
					old_result->result);
	old_result->paging_continues = TRUE;
	/* The caller did support paging. Clear out the "...not supported by
	   the caller" error text, so it won't be in the debug log output. */
	i_free_and_null(old_result->error);

	/* carry the query's state and counters over from the previous page */
	new_result->timestamp = old_result->timestamp;
	new_result->consistency = old_result->consistency;
	new_result->page_num = old_result->page_num + 1;
	new_result->page0_start_time = old_result->page0_start_time;
	new_result->total_row_count = old_result->total_row_count;

	sql_result_unref(*_result);
	*_result = NULL;

	if (async)
		(void)driver_cassandra_send_query(new_result);
	else {
		/* synchronous: the db must not have other queries running */
		i_assert(db->api.state == SQL_DB_STATE_IDLE);
		driver_cassandra_sync_init(db);
		(void)driver_cassandra_send_query(new_result);
		if (new_result->result == NULL) {
			/* Not answered yet: move the pipe io into the private
			   ioloop and run it (presumably stopped once the
			   result arrives). */
			db->io_pipe = io_loop_move_io(&db->io_pipe);
			io_loop_run(db->ioloop);
		}
		driver_cassandra_sync_deinit(db);

		callback(&new_result->api, context);
	}
}
1826 
1827 static unsigned int
driver_cassandra_result_get_fields_count(struct sql_result * _result)1828 driver_cassandra_result_get_fields_count(struct sql_result *_result)
1829 {
1830 	struct cassandra_result *result = (struct cassandra_result *)_result;
1831 
1832 	return array_count(&result->fields);
1833 }
1834 
1835 static const char *
driver_cassandra_result_get_field_name(struct sql_result * _result ATTR_UNUSED,unsigned int idx ATTR_UNUSED)1836 driver_cassandra_result_get_field_name(struct sql_result *_result ATTR_UNUSED,
1837 				       unsigned int idx ATTR_UNUSED)
1838 {
1839 	i_unreached();
1840 }
1841 
1842 static int
driver_cassandra_result_find_field(struct sql_result * _result ATTR_UNUSED,const char * field_name ATTR_UNUSED)1843 driver_cassandra_result_find_field(struct sql_result *_result ATTR_UNUSED,
1844 				   const char *field_name ATTR_UNUSED)
1845 {
1846 	i_unreached();
1847 }
1848 
1849 static const char *
driver_cassandra_result_get_field_value(struct sql_result * _result,unsigned int idx)1850 driver_cassandra_result_get_field_value(struct sql_result *_result,
1851 					unsigned int idx)
1852 {
1853 	struct cassandra_result *result = (struct cassandra_result *)_result;
1854 
1855 	return array_idx_elem(&result->fields, idx);
1856 }
1857 
1858 static const unsigned char *
driver_cassandra_result_get_field_value_binary(struct sql_result * _result ATTR_UNUSED,unsigned int idx ATTR_UNUSED,size_t * size_r ATTR_UNUSED)1859 driver_cassandra_result_get_field_value_binary(struct sql_result *_result ATTR_UNUSED,
1860 					       unsigned int idx ATTR_UNUSED,
1861 					       size_t *size_r ATTR_UNUSED)
1862 {
1863 	struct cassandra_result *result = (struct cassandra_result *)_result;
1864 	const char *str;
1865 	const size_t *sizep;
1866 
1867 	str = array_idx_elem(&result->fields, idx);
1868 	sizep = array_idx(&result->field_sizes, idx);
1869 	*size_r = *sizep;
1870 	return (const void *)str;
1871 }
1872 
1873 static const char *
driver_cassandra_result_find_field_value(struct sql_result * result ATTR_UNUSED,const char * field_name ATTR_UNUSED)1874 driver_cassandra_result_find_field_value(struct sql_result *result ATTR_UNUSED,
1875 					 const char *field_name ATTR_UNUSED)
1876 {
1877 	i_unreached();
1878 }
1879 
1880 static const char *const *
driver_cassandra_result_get_values(struct sql_result * _result)1881 driver_cassandra_result_get_values(struct sql_result *_result)
1882 {
1883 	struct cassandra_result *result = (struct cassandra_result *)_result;
1884 
1885 	return array_front(&result->fields);
1886 }
1887 
driver_cassandra_result_get_error(struct sql_result * _result)1888 static const char *driver_cassandra_result_get_error(struct sql_result *_result)
1889 {
1890 	struct cassandra_result *result = (struct cassandra_result *)_result;
1891 
1892 	if (result->error != NULL)
1893 		return result->error;
1894 	return "FIXME";
1895 }
1896 
1897 static struct sql_transaction_context *
driver_cassandra_transaction_begin(struct sql_db * db)1898 driver_cassandra_transaction_begin(struct sql_db *db)
1899 {
1900 	struct cassandra_transaction_context *ctx;
1901 
1902 	ctx = i_new(struct cassandra_transaction_context, 1);
1903 	ctx->ctx.db = db;
1904 	ctx->ctx.event = event_create(db->event);
1905 	ctx->refcount = 1;
1906 	return &ctx->ctx;
1907 }
1908 
1909 static void
driver_cassandra_transaction_unref(struct cassandra_transaction_context ** _ctx)1910 driver_cassandra_transaction_unref(struct cassandra_transaction_context **_ctx)
1911 {
1912 	struct cassandra_transaction_context *ctx = *_ctx;
1913 
1914 	*_ctx = NULL;
1915 	i_assert(ctx->refcount > 0);
1916 	if (--ctx->refcount > 0)
1917 		return;
1918 
1919 	event_unref(&ctx->ctx.event);
1920 	i_free(ctx->query);
1921 	i_free(ctx->error);
1922 	i_free(ctx);
1923 }
1924 
1925 static void
transaction_set_failed(struct cassandra_transaction_context * ctx,const char * error)1926 transaction_set_failed(struct cassandra_transaction_context *ctx,
1927 		       const char *error)
1928 {
1929 	if (ctx->failed) {
1930 		i_assert(ctx->error != NULL);
1931 	} else {
1932 		i_assert(ctx->error == NULL);
1933 		ctx->failed = TRUE;
1934 		ctx->error = i_strdup(error);
1935 	}
1936 }
1937 
1938 static void
transaction_commit_callback(struct sql_result * result,void * context)1939 transaction_commit_callback(struct sql_result *result, void *context)
1940 {
1941 	struct cassandra_transaction_context *ctx = context;
1942 	struct sql_commit_result commit_result;
1943 
1944 	i_zero(&commit_result);
1945 	if (sql_result_next_row(result) < 0) {
1946 		commit_result.error = sql_result_get_error(result);
1947 		commit_result.error_type = sql_result_get_error_type(result);
1948 		e_debug(sql_transaction_finished_event(&ctx->ctx)->
1949 			add_str("error", commit_result.error)->event(),
1950 			"Transaction failed");
1951 	} else {
1952 		e_debug(sql_transaction_finished_event(&ctx->ctx)->event(),
1953 			"Transaction committed");
1954 	}
1955 	ctx->callback(&commit_result, ctx->context);
1956 	driver_cassandra_transaction_unref(&ctx);
1957 }
1958 
/* Asynchronously commit the transaction. A transaction holds at most one
   change (a plain query in ctx->query or a statement in ctx->stmt), so
   committing means sending that single query. On the failed / nothing-to-do
   path callback is called immediately and the transaction is freed here.
   Otherwise transaction_commit_callback() calls it and frees the
   transaction once the query finishes. */
static void
driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
				    sql_commit_callback_t *callback, void *context)
{
	struct cassandra_transaction_context *ctx =
		(struct cassandra_transaction_context *)_ctx;
	struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
	enum cassandra_query_type query_type;
	struct sql_commit_result result;

	i_zero(&result);
	ctx->callback = callback;
	ctx->context = context;

	if (ctx->failed || (ctx->query == NULL && ctx->stmt == NULL)) {
		/* A failed transaction reports its first error. An empty one
		   reports success (result.error stays NULL).
		   NOTE(review): the debug event says "Rolled back" even for an
		   empty successful commit, and ctx->stmt (if set) isn't
		   released on this path - confirm the statement isn't leaked. */
		if (ctx->failed)
			result.error = ctx->error;

		e_debug(sql_transaction_finished_event(_ctx)->
			add_str("error", "Rolled back")->event(),
			"Transaction rolled back");
		callback(&result, context);
		driver_cassandra_transaction_unref(&ctx);
		return;
	}

	/* just a single query, send it */
	const char *query = ctx->query != NULL ?
		ctx->query : sql_statement_get_query(&ctx->stmt->stmt);
	/* DELETEs are accounted separately from other writes */
	if (strncasecmp(query, "DELETE ", 7) == 0)
		query_type = CASSANDRA_QUERY_TYPE_DELETE;
	else
		query_type = CASSANDRA_QUERY_TYPE_WRITE;

	if (ctx->query != NULL) {
		/* plain query: build a fresh cass statement for it */
		struct cassandra_result *cass_result;

		cass_result = driver_cassandra_query_init(db, query, query_type,
			FALSE, transaction_commit_callback, ctx);
		cass_result->statement = cass_statement_new(query, 0);
		if (ctx->query_timestamp != 0) {
			cass_result->timestamp = ctx->query_timestamp;
			cass_statement_set_timestamp(cass_result->statement,
						     ctx->query_timestamp);
		}
		(void)driver_cassandra_send_query(cass_result);
	} else {
		/* prepared statement: prepare_finish_statement() picks up
		   ctx->stmt->result and sends it if the prepare is still
		   running */
		ctx->stmt->result =
			driver_cassandra_query_init(db, query, query_type, TRUE,
				transaction_commit_callback, ctx);
		if (ctx->stmt->cass_stmt == NULL) {
			/* wait for prepare to finish */
		} else {
			ctx->stmt->result->statement = ctx->stmt->cass_stmt;
			ctx->stmt->result->timestamp = ctx->stmt->timestamp;
			(void)driver_cassandra_send_query(ctx->stmt->result);
			/* the statement is no longer needed once sent */
			pool_unref(&ctx->stmt->stmt.pool);
		}
	}
}
2019 
2020 static void
driver_cassandra_try_commit_s(struct cassandra_transaction_context * ctx)2021 driver_cassandra_try_commit_s(struct cassandra_transaction_context *ctx)
2022 {
2023 	struct sql_transaction_context *_ctx = &ctx->ctx;
2024 	struct cassandra_db *db = (struct cassandra_db *)_ctx->db;
2025 	struct sql_result *result = NULL;
2026 	enum cassandra_query_type query_type;
2027 
2028 	/* just a single query, send it */
2029 	if (strncasecmp(ctx->query, "DELETE ", 7) == 0)
2030 		query_type = CASSANDRA_QUERY_TYPE_DELETE;
2031 	else
2032 		query_type = CASSANDRA_QUERY_TYPE_WRITE;
2033 	driver_cassandra_sync_init(db);
2034 	result = driver_cassandra_sync_query(db, ctx->query, query_type);
2035 	driver_cassandra_sync_deinit(db);
2036 
2037 	if (sql_result_next_row(result) < 0)
2038 		transaction_set_failed(ctx, sql_result_get_error(result));
2039 	sql_result_unref(result);
2040 }
2041 
2042 static int
driver_cassandra_transaction_commit_s(struct sql_transaction_context * _ctx,const char ** error_r)2043 driver_cassandra_transaction_commit_s(struct sql_transaction_context *_ctx,
2044 				      const char **error_r)
2045 {
2046 	struct cassandra_transaction_context *ctx =
2047 		(struct cassandra_transaction_context *)_ctx;
2048 
2049 	if (ctx->stmt != NULL) {
2050 		/* nothing should be using this - don't bother implementing */
2051 		i_panic("cassandra: sql_transaction_commit_s() not supported for prepared statements");
2052 	}
2053 
2054 	if (ctx->query != NULL && !ctx->failed)
2055 		driver_cassandra_try_commit_s(ctx);
2056 	*error_r = t_strdup(ctx->error);
2057 
2058 	i_assert(ctx->refcount == 1);
2059 	i_assert((*error_r != NULL) == ctx->failed);
2060 	driver_cassandra_transaction_unref(&ctx);
2061 	return *error_r == NULL ? 0 : -1;
2062 }
2063 
2064 static void
driver_cassandra_transaction_rollback(struct sql_transaction_context * _ctx)2065 driver_cassandra_transaction_rollback(struct sql_transaction_context *_ctx)
2066 {
2067 	struct cassandra_transaction_context *ctx =
2068 		(struct cassandra_transaction_context *)_ctx;
2069 
2070 	i_assert(ctx->refcount == 1);
2071 	driver_cassandra_transaction_unref(&ctx);
2072 }
2073 
2074 static void
driver_cassandra_update(struct sql_transaction_context * _ctx,const char * query,unsigned int * affected_rows)2075 driver_cassandra_update(struct sql_transaction_context *_ctx, const char *query,
2076 			unsigned int *affected_rows)
2077 {
2078 	struct cassandra_transaction_context *ctx =
2079 		(struct cassandra_transaction_context *)_ctx;
2080 
2081 	i_assert(affected_rows == NULL);
2082 
2083 	if (ctx->query != NULL || ctx->stmt != NULL) {
2084 		transaction_set_failed(ctx, "Multiple changes in transaction not supported");
2085 		return;
2086 	}
2087 	ctx->query = i_strdup(query);
2088 }
2089 
2090 static const char *
driver_cassandra_escape_blob(struct sql_db * _db ATTR_UNUSED,const unsigned char * data,size_t size)2091 driver_cassandra_escape_blob(struct sql_db *_db ATTR_UNUSED,
2092 			     const unsigned char *data, size_t size)
2093 {
2094 	string_t *str = t_str_new(128);
2095 
2096 	str_append(str, "0x");
2097 	binary_to_hex_append(str, data, size);
2098 	return str_c(str);
2099 }
2100 
2101 static CassError
driver_cassandra_bind_int(struct cassandra_sql_statement * stmt,unsigned int column_idx,int64_t value)2102 driver_cassandra_bind_int(struct cassandra_sql_statement *stmt,
2103 			  unsigned int column_idx, int64_t value)
2104 {
2105 	const CassDataType *data_type;
2106 	CassValueType value_type;
2107 
2108 	i_assert(stmt->prep != NULL);
2109 
2110 	/* statements require exactly correct value type */
2111 	data_type = cass_prepared_parameter_data_type(stmt->prep->prepared,
2112 						      column_idx);
2113 	value_type = cass_data_type_type(data_type);
2114 
2115 	switch (value_type) {
2116 	case CASS_VALUE_TYPE_INT:
2117 		if (value < INT32_MIN || value > INT32_MAX)
2118 			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2119 		return cass_statement_bind_int32(stmt->cass_stmt, column_idx,
2120 						 value);
2121 	case CASS_VALUE_TYPE_TIMESTAMP:
2122 	case CASS_VALUE_TYPE_BIGINT:
2123 		return cass_statement_bind_int64(stmt->cass_stmt, column_idx,
2124 						 value);
2125 	case CASS_VALUE_TYPE_SMALL_INT:
2126 		if (value < INT16_MIN || value > INT16_MAX)
2127 			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2128 		return cass_statement_bind_int16(stmt->cass_stmt, column_idx,
2129 						 value);
2130 	case CASS_VALUE_TYPE_TINY_INT:
2131 		if (value < INT8_MIN || value > INT8_MAX)
2132 			return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2133 		return cass_statement_bind_int8(stmt->cass_stmt, column_idx,
2134 						value);
2135 	default:
2136 		return CASS_ERROR_LIB_INVALID_VALUE_TYPE;
2137 	}
2138 }
2139 
prepare_finish_arg(struct cassandra_sql_statement * stmt,const struct cassandra_sql_arg * arg)2140 static void prepare_finish_arg(struct cassandra_sql_statement *stmt,
2141 			       const struct cassandra_sql_arg *arg)
2142 {
2143 	CassError rc;
2144 
2145 	if (arg->value_str != NULL) {
2146 		rc = cass_statement_bind_string(stmt->cass_stmt, arg->column_idx,
2147 						arg->value_str);
2148 	} else if (arg->value_binary != NULL) {
2149 		rc = cass_statement_bind_bytes(stmt->cass_stmt, arg->column_idx,
2150 					       arg->value_binary,
2151 					       arg->value_binary_size);
2152 	} else {
2153 		rc = driver_cassandra_bind_int(stmt, arg->column_idx,
2154 					       arg->value_int64);
2155 	}
2156 	if (rc != CASS_OK) {
2157 		e_error(stmt->stmt.db->event,
2158 			"Statement '%s': Failed to bind column %u: %s",
2159 			stmt->stmt.query_template, arg->column_idx,
2160 			cass_error_desc(rc));
2161 	}
2162 }
2163 
prepare_finish_statement(struct cassandra_sql_statement * stmt)2164 static void prepare_finish_statement(struct cassandra_sql_statement *stmt)
2165 {
2166 	const struct cassandra_sql_arg *arg;
2167 
2168 	if (stmt->prep->prepared == NULL) {
2169 		i_assert(stmt->prep->error != NULL);
2170 
2171 		if (stmt->result != NULL) {
2172 			stmt->result->error = i_strdup(stmt->prep->error);
2173 			result_finish(stmt->result);
2174 		}
2175 		pool_unref(&stmt->stmt.pool);
2176 		return;
2177 	}
2178 	stmt->cass_stmt = cass_prepared_bind(stmt->prep->prepared);
2179 
2180 	if (stmt->timestamp != 0)
2181 		cass_statement_set_timestamp(stmt->cass_stmt, stmt->timestamp);
2182 
2183 	if (array_is_created(&stmt->pending_args)) {
2184 		array_foreach(&stmt->pending_args, arg)
2185 			prepare_finish_arg(stmt, arg);
2186 	}
2187 	if (stmt->result != NULL) {
2188 		stmt->result->statement = stmt->cass_stmt;
2189 		stmt->result->timestamp = stmt->timestamp;
2190 		(void)driver_cassandra_send_query(stmt->result);
2191 		pool_unref(&stmt->stmt.pool);
2192 	}
2193 }
2194 
/* Finish every statement that was waiting for prep_stmt to be prepared
   (see prepare_finish_statement()), then empty the waiting list.
   NOTE(review): prepare_finish_statement() may run query callbacks; if one
   of them queues a new statement on this same prep_stmt, the trailing
   array_clear() would drop it - confirm that can't happen. */
static void
prepare_finish_pending_statements(struct cassandra_sql_prepared_statement *prep_stmt)
{
	struct cassandra_sql_statement *stmt;

	array_foreach_elem(&prep_stmt->pending_statements, stmt)
		prepare_finish_statement(stmt);
	array_clear(&prep_stmt->pending_statements);
}
2204 
prepare_callback(CassFuture * future,void * context)2205 static void prepare_callback(CassFuture *future, void *context)
2206 {
2207 	struct cassandra_sql_prepared_statement *prep_stmt = context;
2208 	CassError error = cass_future_error_code(future);
2209 
2210 	if (error != CASS_OK) {
2211 		const char *errmsg;
2212 		size_t errsize;
2213 
2214 		cass_future_error_message(future, &errmsg, &errsize);
2215 		i_free(prep_stmt->error);
2216 		prep_stmt->error = i_strndup(errmsg, errsize);
2217 	} else {
2218 		prep_stmt->prepared = cass_future_get_prepared(future);
2219 	}
2220 
2221 	prepare_finish_pending_statements(prep_stmt);
2222 }
2223 
prepare_start(struct cassandra_sql_prepared_statement * prep_stmt)2224 static void prepare_start(struct cassandra_sql_prepared_statement *prep_stmt)
2225 {
2226 	struct cassandra_db *db = (struct cassandra_db *)prep_stmt->prep_stmt.db;
2227 	CassFuture *future;
2228 
2229 	if (!SQL_DB_IS_READY(&db->api)) {
2230 		if (!prep_stmt->pending) {
2231 			prep_stmt->pending = TRUE;
2232 			array_push_back(&db->pending_prepares, &prep_stmt);
2233 
2234 			if (sql_connect(&db->api) < 0)
2235 				i_unreached();
2236 		}
2237 		return;
2238 	}
2239 
2240 	/* clear the current error in case we're retrying */
2241 	i_free_and_null(prep_stmt->error);
2242 
2243 	future = cass_session_prepare(db->session,
2244 				      prep_stmt->prep_stmt.query_template);
2245 	driver_cassandra_set_callback(future, db, prepare_callback, prep_stmt);
2246 }
2247 
driver_cassandra_prepare_pending(struct cassandra_db * db)2248 static void driver_cassandra_prepare_pending(struct cassandra_db *db)
2249 {
2250 	struct cassandra_sql_prepared_statement *prep_stmt;
2251 
2252 	i_assert(SQL_DB_IS_READY(&db->api));
2253 
2254 	array_foreach_elem(&db->pending_prepares, prep_stmt) {
2255 		prep_stmt->pending = FALSE;
2256 		prepare_start(prep_stmt);
2257 	}
2258 	array_clear(&db->pending_prepares);
2259 }
2260 
2261 static struct sql_prepared_statement *
driver_cassandra_prepared_statement_init(struct sql_db * db,const char * query_template)2262 driver_cassandra_prepared_statement_init(struct sql_db *db,
2263 					 const char *query_template)
2264 {
2265 	struct cassandra_sql_prepared_statement *prep_stmt =
2266 		i_new(struct cassandra_sql_prepared_statement, 1);
2267 	prep_stmt->prep_stmt.db = db;
2268 	prep_stmt->prep_stmt.refcount = 1;
2269 	prep_stmt->prep_stmt.query_template = i_strdup(query_template);
2270 	i_array_init(&prep_stmt->pending_statements, 4);
2271 	prepare_start(prep_stmt);
2272 	return &prep_stmt->prep_stmt;
2273 }
2274 
2275 static void
driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement * _prep_stmt)2276 driver_cassandra_prepared_statement_deinit(struct sql_prepared_statement *_prep_stmt)
2277 {
2278 	struct cassandra_sql_prepared_statement *prep_stmt =
2279 		(struct cassandra_sql_prepared_statement *)_prep_stmt;
2280 
2281 	i_assert(array_count(&prep_stmt->pending_statements) == 0);
2282 	if (prep_stmt->prepared != NULL)
2283 		cass_prepared_free(prep_stmt->prepared);
2284 	array_free(&prep_stmt->pending_statements);
2285 	i_free(prep_stmt->error);
2286 	i_free(prep_stmt->prep_stmt.query_template);
2287 	i_free(prep_stmt);
2288 }
2289 
2290 static struct sql_statement *
driver_cassandra_statement_init(struct sql_db * db ATTR_UNUSED,const char * query_template ATTR_UNUSED)2291 driver_cassandra_statement_init(struct sql_db *db ATTR_UNUSED,
2292 				const char *query_template ATTR_UNUSED)
2293 {
2294 	pool_t pool = pool_alloconly_create("cassandra sql statement", 1024);
2295 	struct cassandra_sql_statement *stmt =
2296 		p_new(pool, struct cassandra_sql_statement, 1);
2297 	stmt->stmt.pool = pool;
2298 	return &stmt->stmt;
2299 }
2300 
2301 static struct sql_statement *
driver_cassandra_statement_init_prepared(struct sql_prepared_statement * _prep_stmt)2302 driver_cassandra_statement_init_prepared(struct sql_prepared_statement *_prep_stmt)
2303 {
2304 	struct cassandra_sql_prepared_statement *prep_stmt =
2305 		(struct cassandra_sql_prepared_statement *)_prep_stmt;
2306 	pool_t pool = pool_alloconly_create("cassandra prepared sql statement", 1024);
2307 	struct cassandra_sql_statement *stmt =
2308 		p_new(pool, struct cassandra_sql_statement, 1);
2309 
2310 	stmt->stmt.pool = pool;
2311 	stmt->stmt.query_template =
2312 		p_strdup(stmt->stmt.pool, prep_stmt->prep_stmt.query_template);
2313 	stmt->prep = prep_stmt;
2314 
2315 	if (prep_stmt->prepared != NULL) {
2316 		/* statement is already prepared. we can use it immediately. */
2317 		stmt->cass_stmt = cass_prepared_bind(prep_stmt->prepared);
2318 	} else {
2319 		if (prep_stmt->error != NULL)
2320 			prepare_start(prep_stmt);
2321 		/* need to wait until prepare is finished */
2322 		array_push_back(&prep_stmt->pending_statements, &stmt);
2323 	}
2324 	return &stmt->stmt;
2325 }
2326 
2327 static void
driver_cassandra_statement_abort(struct sql_statement * _stmt)2328 driver_cassandra_statement_abort(struct sql_statement *_stmt)
2329 {
2330 	struct cassandra_sql_statement *stmt =
2331 		(struct cassandra_sql_statement *)_stmt;
2332 
2333 	if (stmt->cass_stmt != NULL)
2334 		cass_statement_free(stmt->cass_stmt);
2335 }
2336 
2337 static void
driver_cassandra_statement_set_timestamp(struct sql_statement * _stmt,const struct timespec * ts)2338 driver_cassandra_statement_set_timestamp(struct sql_statement *_stmt,
2339 					 const struct timespec *ts)
2340 {
2341 	struct cassandra_sql_statement *stmt =
2342 		(struct cassandra_sql_statement *)_stmt;
2343 	cass_int64_t ts_usecs =
2344 		(cass_int64_t)ts->tv_sec * 1000000ULL +
2345 		ts->tv_nsec / 1000;
2346 
2347 	i_assert(stmt->result == NULL);
2348 
2349 	if (stmt->cass_stmt != NULL)
2350 		cass_statement_set_timestamp(stmt->cass_stmt, ts_usecs);
2351 	stmt->timestamp = ts_usecs;
2352 }
2353 
2354 static struct cassandra_sql_arg *
driver_cassandra_add_pending_arg(struct cassandra_sql_statement * stmt,unsigned int column_idx)2355 driver_cassandra_add_pending_arg(struct cassandra_sql_statement *stmt,
2356 				 unsigned int column_idx)
2357 {
2358 	struct cassandra_sql_arg *arg;
2359 
2360 	if (!array_is_created(&stmt->pending_args))
2361 		p_array_init(&stmt->pending_args, stmt->stmt.pool, 8);
2362 	arg = array_append_space(&stmt->pending_args);
2363 	arg->column_idx = column_idx;
2364 	return arg;
2365 }
2366 
2367 static void
driver_cassandra_statement_bind_str(struct sql_statement * _stmt,unsigned int column_idx,const char * value)2368 driver_cassandra_statement_bind_str(struct sql_statement *_stmt,
2369 				    unsigned int column_idx,
2370 				    const char *value)
2371 {
2372 	struct cassandra_sql_statement *stmt =
2373 		(struct cassandra_sql_statement *)_stmt;
2374 	if (stmt->cass_stmt != NULL)
2375 		cass_statement_bind_string(stmt->cass_stmt, column_idx, value);
2376 	else if (stmt->prep != NULL) {
2377 		struct cassandra_sql_arg *arg =
2378 			driver_cassandra_add_pending_arg(stmt, column_idx);
2379 		arg->value_str = p_strdup(_stmt->pool, value);
2380 	}
2381 }
2382 
2383 static void
driver_cassandra_statement_bind_binary(struct sql_statement * _stmt,unsigned int column_idx,const void * value,size_t value_size)2384 driver_cassandra_statement_bind_binary(struct sql_statement *_stmt,
2385 				       unsigned int column_idx,
2386 				       const void *value, size_t value_size)
2387 {
2388 	struct cassandra_sql_statement *stmt =
2389 		(struct cassandra_sql_statement *)_stmt;
2390 
2391 	if (stmt->cass_stmt != NULL) {
2392 		cass_statement_bind_bytes(stmt->cass_stmt, column_idx,
2393 					  value, value_size);
2394 	} else if (stmt->prep != NULL) {
2395 		struct cassandra_sql_arg *arg =
2396 			driver_cassandra_add_pending_arg(stmt, column_idx);
2397 		arg->value_binary = value_size == 0 ? &uchar_nul :
2398 			p_memdup(_stmt->pool, value, value_size);
2399 		arg->value_binary_size = value_size;
2400 	}
2401 }
2402 
2403 static void
driver_cassandra_statement_bind_int64(struct sql_statement * _stmt,unsigned int column_idx,int64_t value)2404 driver_cassandra_statement_bind_int64(struct sql_statement *_stmt,
2405 				      unsigned int column_idx, int64_t value)
2406 {
2407 	struct cassandra_sql_statement *stmt =
2408 		(struct cassandra_sql_statement *)_stmt;
2409 
2410 	if (stmt->cass_stmt != NULL)
2411 		driver_cassandra_bind_int(stmt, column_idx, value);
2412 	else if (stmt->prep != NULL) {
2413 		struct cassandra_sql_arg *arg =
2414 			driver_cassandra_add_pending_arg(stmt, column_idx);
2415 		arg->value_int64 = value;
2416 	}
2417 }
2418 
/* Run a statement asynchronously; callback is called with the result.
   There are three cases:
   - bound prepared statement: its cass statement is sent as-is;
   - prepared statement still being prepared: only the result is set up
     here, and prepare_finish_statement() sends it later;
   - plain statement: a new cass statement is built from the final query
     text.
   In the two cases that send immediately, the statement's pool is released
   after sending. */
static void
driver_cassandra_statement_query(struct sql_statement *_stmt,
				 sql_query_callback_t *callback, void *context)
{
	struct cassandra_sql_statement *stmt =
		(struct cassandra_sql_statement *)_stmt;
	struct cassandra_db *db = (struct cassandra_db *)_stmt->db;
	const char *query = sql_statement_get_query(_stmt);
	bool is_prepared = stmt->cass_stmt != NULL || stmt->prep != NULL;

	stmt->result = driver_cassandra_query_init(db, query,
						   CASSANDRA_QUERY_TYPE_READ,
						   is_prepared,
						   callback, context);
	if (stmt->cass_stmt != NULL) {
		stmt->result->statement = stmt->cass_stmt;
		stmt->result->timestamp = stmt->timestamp;
	} else if (stmt->prep != NULL) {
		/* wait for prepare to finish */
		return;
	} else {
		stmt->result->statement = cass_statement_new(query, 0);
		stmt->result->timestamp = stmt->timestamp;
		if (stmt->timestamp != 0) {
			cass_statement_set_timestamp(stmt->result->statement,
						     stmt->timestamp);
		}
	}
	(void)driver_cassandra_send_query(stmt->result);
	/* the statement object isn't needed once the query is sent */
	pool_unref(&_stmt->pool);
}
2450 
2451 static struct sql_result *
driver_cassandra_statement_query_s(struct sql_statement * _stmt ATTR_UNUSED)2452 driver_cassandra_statement_query_s(struct sql_statement *_stmt ATTR_UNUSED)
2453 {
2454 	i_panic("cassandra: sql_statement_query_s() not supported");
2455 }
2456 
2457 static void
driver_cassandra_update_stmt(struct sql_transaction_context * _ctx,struct sql_statement * _stmt,unsigned int * affected_rows)2458 driver_cassandra_update_stmt(struct sql_transaction_context *_ctx,
2459 			     struct sql_statement *_stmt,
2460 			     unsigned int *affected_rows)
2461 {
2462 	struct cassandra_transaction_context *ctx =
2463 		(struct cassandra_transaction_context *)_ctx;
2464 	struct cassandra_sql_statement *stmt =
2465 		(struct cassandra_sql_statement *)_stmt;
2466 
2467 	i_assert(affected_rows == NULL);
2468 
2469 	if (ctx->query != NULL || ctx->stmt != NULL) {
2470 		transaction_set_failed(ctx,
2471 			"Multiple changes in transaction not supported");
2472 		return;
2473 	}
2474 	if (stmt->prep != NULL)
2475 		ctx->stmt = stmt;
2476 	else {
2477 		ctx->query = i_strdup(sql_statement_get_query(_stmt));
2478 		ctx->query_timestamp = stmt->timestamp;
2479 		pool_unref(&_stmt->pool);
2480 	}
2481 }
2482 
driver_cassandra_have_work(struct cassandra_db * db)2483 static bool driver_cassandra_have_work(struct cassandra_db *db)
2484 {
2485 	return array_not_empty(&db->pending_prepares) ||
2486 		array_not_empty(&db->callbacks) ||
2487 		array_not_empty(&db->results);
2488 }
2489 
driver_cassandra_wait(struct sql_db * _db)2490 static void driver_cassandra_wait(struct sql_db *_db)
2491 {
2492 	struct cassandra_db *db = (struct cassandra_db *)_db;
2493 
2494 	if (!driver_cassandra_have_work(db))
2495 		return;
2496 
2497 	struct ioloop *prev_ioloop = current_ioloop;
2498 	db->ioloop = io_loop_create();
2499 	db->io_pipe = io_loop_move_io(&db->io_pipe);
2500 	while (driver_cassandra_have_work(db))
2501 		io_loop_run(db->ioloop);
2502 
2503 	io_loop_set_current(prev_ioloop);
2504 	db->io_pipe = io_loop_move_io(&db->io_pipe);
2505 	io_loop_set_current(db->ioloop);
2506 	io_loop_destroy(&db->ioloop);
2507 }
2508 
/* Driver definition registered by driver_cassandra_init(). It advertises
   prepared statement support and wires the sql-api operations to this
   driver's implementations. */
const struct sql_db driver_cassandra_db = {
	.name = "cassandra",
	.flags = SQL_DB_FLAG_PREP_STATEMENTS,

	.v = {
		/* connection lifecycle and plain queries */
		.init_full = driver_cassandra_init_full_v,
		.deinit = driver_cassandra_deinit_v,
		.connect = driver_cassandra_connect,
		.disconnect = driver_cassandra_disconnect,
		.escape_string = driver_cassandra_escape_string,
		.exec = driver_cassandra_exec,
		.query = driver_cassandra_query,
		.query_s = driver_cassandra_query_s,
		.wait = driver_cassandra_wait,

		/* transactions (one change per transaction) */
		.transaction_begin = driver_cassandra_transaction_begin,
		.transaction_commit = driver_cassandra_transaction_commit,
		.transaction_commit_s = driver_cassandra_transaction_commit_s,
		.transaction_rollback = driver_cassandra_transaction_rollback,

		.update = driver_cassandra_update,

		.escape_blob = driver_cassandra_escape_blob,

		/* prepared and plain statements */
		.prepared_statement_init = driver_cassandra_prepared_statement_init,
		.prepared_statement_deinit = driver_cassandra_prepared_statement_deinit,
		.statement_init = driver_cassandra_statement_init,
		.statement_init_prepared = driver_cassandra_statement_init_prepared,
		.statement_abort = driver_cassandra_statement_abort,
		.statement_set_timestamp = driver_cassandra_statement_set_timestamp,
		.statement_bind_str = driver_cassandra_statement_bind_str,
		.statement_bind_binary = driver_cassandra_statement_bind_binary,
		.statement_bind_int64 = driver_cassandra_statement_bind_int64,
		.statement_query = driver_cassandra_statement_query,
		.statement_query_s = driver_cassandra_statement_query_s,
		.update_stmt = driver_cassandra_update_stmt,
	}
};
2547 
/* Result operations for this driver. These are positional initializers: the
   order presumably has to match the member order of the result vfuncs
   struct in sql-api-private.h - keep them in sync when either changes. */
const struct sql_result driver_cassandra_result = {
	.v = {
		driver_cassandra_result_free,
		driver_cassandra_result_next_row,
		driver_cassandra_result_get_fields_count,
		driver_cassandra_result_get_field_name,
		driver_cassandra_result_find_field,
		driver_cassandra_result_get_field_value,
		driver_cassandra_result_get_field_value_binary,
		driver_cassandra_result_find_field_value,
		driver_cassandra_result_get_values,
		driver_cassandra_result_get_error,
		driver_cassandra_result_more,
	}
};
2563 
/* ABI version of this module (presumably checked by the module loader). */
const char *driver_cassandra_version = DOVECOT_ABI_VERSION;

/* module entry points; declared here since they have no header */
void driver_cassandra_init(void);
void driver_cassandra_deinit(void);

/* Register the cassandra driver with the sql-api. */
void driver_cassandra_init(void)
{
	sql_driver_register(&driver_cassandra_db);
}

/* Unregister the cassandra driver. */
void driver_cassandra_deinit(void)
{
	sql_driver_unregister(&driver_cassandra_db);
}
2578 
2579 #endif
2580