1 /*
2 Copyright (c) 2012, Monty Program Ab
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20
21 #include <my_config.h>
22 #include <mysql/plugin.h>
23 #include "ha_cassandra.h"
24 #include "sql_class.h"
25
26 #define DYNCOL_USUAL 20
27 #define DYNCOL_DELTA 100
28 #define DYNCOL_USUAL_REC 1024
29 #define DYNCOL_DELTA_REC 1024
30
31 static handler *cassandra_create_handler(handlerton *hton,
32 TABLE_SHARE *table,
33 MEM_ROOT *mem_root);
34
35 extern int dynamic_column_error_message(enum_dyncol_func_result rc);
36
37 handlerton *cassandra_hton;
38
39
40 /*
41 Hash used to track the number of open tables; variable for example share
42 methods
43 */
44 static HASH cassandra_open_tables;
45
46 /* The mutex used to init the hash; variable for example share methods */
47 mysql_mutex_t cassandra_mutex;
48
49
50 /**
51 Structure for CREATE TABLE options (table options).
52 It needs to be called ha_table_option_struct.
53
54 The option values can be specified in the CREATE TABLE at the end:
55 CREATE TABLE ( ... ) *here*
56 */
57
struct ha_table_option_struct
{
  const char *thrift_host;   /* Thrift server; NULL means fall back to
                                @@cassandra_default_thrift_host */
  int thrift_port;           /* Thrift port (default 9160, see option list) */
  const char *keyspace;      /* Cassandra keyspace (required) */
  const char *column_family; /* Cassandra column family (required) */
};
65
66
ha_create_table_option cassandra_table_option_list[]=
{
  /*
    thrift_host is optional: when absent, the global
    @@cassandra_default_thrift_host is used instead (see
    ha_cassandra::check_table_options()).
  */
  HA_TOPTION_STRING("thrift_host", thrift_host),
  /* default 9160 is the standard Cassandra Thrift port; valid range 1-65535 */
  HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
  HA_TOPTION_STRING("keyspace", keyspace),
  HA_TOPTION_STRING("column_family", column_family),
  HA_TOPTION_END
};
78
79 /**
80 Structure for CREATE TABLE options (field options).
81 */
82
struct ha_field_option_struct
{
  /* true: this (blob) field stores all "other" columns as dynamic columns */
  bool dyncol_field;
};

ha_create_table_option cassandra_field_option_list[]=
{
  /*
    Collect all other columns as dynamic here,
    the valid values are YES/NO, ON/OFF, 1/0.
    The default is 0, that is false, no, off.
  */
  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
  HA_FOPTION_END
};
98
/* Per-session batch sizes for the three bulk access paths */
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an INSERT batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in a multiget(MRR) batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an rnd_read (full scan) batch",
  NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
110
111 static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
112 "Number of times to retry Cassandra calls that failed due to timeouts or "
113 "network communication problems. The default, 0, means not to retry.",
114 NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
115
/*
  These match values in enum_cassandra_consistency_level (same order);
  the trailing NullS terminates the list for TYPELIB use.
*/
const char *cassandra_consistency_level[] =
{
  "ONE",
  "QUORUM",
  "LOCAL_QUORUM",
  "EACH_QUORUM",
  "ALL",
  "ANY",
  "TWO",
  "THREE",
  NullS
};

/* TYPELIB wrapper so the names above can back the consistency sysvars */
TYPELIB cassandra_consistency_level_typelib= {
  array_elements(cassandra_consistency_level) - 1, "",
  cassandra_consistency_level, NULL
};
134
135
/* Per-session Cassandra consistency levels; both default to ONE */
static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for write operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);

static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for read operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);
143
144
/* Protects the two variables below (updated from the sysvar handler) */
mysql_mutex_t cassandra_default_host_lock;
/* Points into cassandra_default_host_buf, or NULL when no default is set */
static char* cassandra_default_thrift_host = NULL;
static char cassandra_default_host_buf[256]="";
148
149 static void
cassandra_default_thrift_host_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)150 cassandra_default_thrift_host_update(THD *thd,
151 struct st_mysql_sys_var* var,
152 void* var_ptr, /*!< out: where the
153 formal string goes */
154 const void* save) /*!< in: immediate result
155 from check function */
156 {
157 const char *new_host= *((char**)save);
158 const size_t max_len= sizeof(cassandra_default_host_buf);
159
160 mysql_mutex_lock(&cassandra_default_host_lock);
161
162 if (new_host)
163 {
164 strncpy(cassandra_default_host_buf, new_host, max_len-1);
165 cassandra_default_host_buf[max_len-1]= 0;
166 cassandra_default_thrift_host= cassandra_default_host_buf;
167 }
168 else
169 {
170 cassandra_default_host_buf[0]= 0;
171 cassandra_default_thrift_host= NULL;
172 }
173
174 *((const char**)var_ptr)= cassandra_default_thrift_host;
175
176 mysql_mutex_unlock(&cassandra_default_host_lock);
177 }
178
179
static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
                        PLUGIN_VAR_RQCMDARG,
                        "Default host for Cassandra thrift connections",
                        /*check*/NULL,
                        cassandra_default_thrift_host_update,
                        /*default*/NULL);

/* All system/session variables exported by this plugin (NULL-terminated) */
static struct st_mysql_sys_var* cassandra_system_variables[]= {
  MYSQL_SYSVAR(insert_batch_size),
  MYSQL_SYSVAR(multiget_batch_size),
  MYSQL_SYSVAR(rnd_batch_size),

  MYSQL_SYSVAR(default_thrift_host),
  MYSQL_SYSVAR(write_consistency),
  MYSQL_SYSVAR(read_consistency),
  MYSQL_SYSVAR(failure_retries),
  NULL
};
198
199 Cassandra_status_vars cassandra_counters;
200
201 /**
202 @brief
203 Function we use in the creation of our hash to get key.
204 */
205
/* Hash callback: the key of a share is its stored table name */
static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
                                my_bool not_used __attribute__((unused)))
{
  *length=share->table_name_length;
  return (uchar*) share->table_name;
}
212
213 #ifdef HAVE_PSI_INTERFACE
/* PSI instrumentation keys for the mutexes this engine creates */
static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;

static PSI_mutex_info all_cassandra_mutexes[]=
{
  { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
  { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
};

/* Register the mutex instrumentation with the performance schema, if any */
static void init_cassandra_psi_keys()
{
  const char* category= "cassandra";
  int count;

  /* No performance-schema server present: nothing to register */
  if (PSI_server == NULL)
    return;

  count= array_elements(all_cassandra_mutexes);
  PSI_server->register_mutex(category, all_cassandra_mutexes, count);
}
233 #endif
234
/**
  @brief
  Plugin initialization: set up the open-tables hash, the handlerton
  callbacks/flags and the mutexes used by this engine.

  @param p  handlerton allocated by the server

  @return 0 on success
*/
static int cassandra_init_func(void *p)
{
  DBUG_ENTER("cassandra_init_func");

#ifdef HAVE_PSI_INTERFACE
  init_cassandra_psi_keys();
#endif

  cassandra_hton= (handlerton *)p;
  mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
  /* Hash of CASSANDRA_SHAREs, keyed by table name via cassandra_get_key() */
  (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
                      (my_hash_get_key) cassandra_get_key,0,0);

  cassandra_hton->state= SHOW_OPTION_YES;
  cassandra_hton->create= cassandra_create_handler;
  /*
    Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
    TABLE to create an *empty* table from scratch. Cassandra table won't be
    emptied if re-created.
  */
  cassandra_hton->flags= 0;
  cassandra_hton->table_options= cassandra_table_option_list;
  cassandra_hton->field_options= cassandra_field_option_list;

  mysql_mutex_init(0 /* no instrumentation */,
                   &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);

  DBUG_RETURN(0);
}
264
265
cassandra_done_func(void * p)266 static int cassandra_done_func(void *p)
267 {
268 int error= 0;
269 DBUG_ENTER("cassandra_done_func");
270 if (cassandra_open_tables.records)
271 error= 1;
272 my_hash_free(&cassandra_open_tables);
273 mysql_mutex_destroy(&cassandra_mutex);
274 mysql_mutex_destroy(&cassandra_default_host_lock);
275 DBUG_RETURN(error);
276 }
277
278
279 /**
280 @brief
281 Example of simple lock controls. The "share" it creates is a
282 structure we will pass to each cassandra handler. Do you have to have
283 one of these? Well, you have pieces that are used for locking, and
284 they are needed to function.
285 */
286
get_share(const char * table_name,TABLE * table)287 static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
288 {
289 CASSANDRA_SHARE *share;
290 uint length;
291 char *tmp_name;
292
293 mysql_mutex_lock(&cassandra_mutex);
294 length=(uint) strlen(table_name);
295
296 if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
297 (uchar*) table_name,
298 length)))
299 {
300 if (!(share=(CASSANDRA_SHARE *)
301 my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
302 &share, sizeof(*share),
303 &tmp_name, length+1,
304 NullS)))
305 {
306 mysql_mutex_unlock(&cassandra_mutex);
307 return NULL;
308 }
309
310 share->use_count=0;
311 share->table_name_length=length;
312 share->table_name=tmp_name;
313 strmov(share->table_name,table_name);
314 if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
315 goto error;
316 thr_lock_init(&share->lock);
317 mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
318 &share->mutex, MY_MUTEX_INIT_FAST);
319 }
320 share->use_count++;
321 mysql_mutex_unlock(&cassandra_mutex);
322
323 return share;
324
325 error:
326 mysql_mutex_destroy(&share->mutex);
327 my_free(share);
328
329 return NULL;
330 }
331
332
333 /**
334 @brief
335 Free lock controls. We call this whenever we close a table. If the table had
336 the last reference to the share, then we free memory associated with it.
337 */
338
free_share(CASSANDRA_SHARE * share)339 static int free_share(CASSANDRA_SHARE *share)
340 {
341 mysql_mutex_lock(&cassandra_mutex);
342 if (!--share->use_count)
343 {
344 my_hash_delete(&cassandra_open_tables, (uchar*) share);
345 thr_lock_delete(&share->lock);
346 mysql_mutex_destroy(&share->mutex);
347 my_free(share);
348 }
349 mysql_mutex_unlock(&cassandra_mutex);
350
351 return 0;
352 }
353
354
cassandra_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)355 static handler* cassandra_create_handler(handlerton *hton,
356 TABLE_SHARE *table,
357 MEM_ROOT *mem_root)
358 {
359 return new (mem_root) ha_cassandra(hton, table);
360 }
361
362
/* Constructor: all connection/converter/dynamic-column state starts empty */
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
   se(NULL), field_converters(NULL),
   special_type_field_converters(NULL),
   special_type_field_names(NULL), n_special_type_fields(0),
   rowkey_converter(NULL),
   dyncol_field(0), dyncol_set(0)
{}
371
372
/**
  @brief
  Validate field and table options, connect to Cassandra, and set up the
  per-field converters.

  @param table_arg  table whose options and fields to use

  @return 0 on success; HA_WRONG_CREATE_OPTION or HA_ERR_NO_CONNECTION
          on failure (my_error() has been called)
*/
int ha_cassandra::connect_and_check_options(TABLE *table_arg)
{
  ha_table_option_struct *options= table_arg->s->option_struct;
  int res;
  DBUG_ENTER("ha_cassandra::connect_and_check_options");

  if ((res= check_field_options(table_arg->s->field)) ||
      (res= check_table_options(options)))
    DBUG_RETURN(res);

  se= create_cassandra_se();
  se->set_column_family(options->column_family);
  /*
    Fall back to the global default host when the table has none.
    NOTE(review): cassandra_default_thrift_host is read here without
    taking cassandra_default_host_lock — confirm this race is acceptable.
  */
  const char *thrift_host= options->thrift_host? options->thrift_host:
                           cassandra_default_thrift_host;
  if (se->connect(thrift_host, options->thrift_port, options->keyspace))
  {
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  if (setup_field_converters(table_arg->field, table_arg->s->fields))
  {
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  DBUG_RETURN(0);
}
400
401
/**
  @brief
  Scan the table's fields for the DYNAMIC_COLUMN_STORAGE option. At most one
  field may carry it and it must be a blob; record its index in dyncol_field
  and clear the dynamic-column work buffers.

  @param fields  NULL-terminated field array

  @return 0 on success, HA_WRONG_CREATE_OPTION on misuse
*/
int ha_cassandra::check_field_options(Field **fields)
{
  Field **field;
  uint i;
  DBUG_ENTER("ha_cassandra::check_field_options");
  for (field= fields, i= 0; *field; field++, i++)
  {
    ha_field_option_struct *field_options= (*field)->option_struct;
    if (field_options && field_options->dyncol_field)
    {
      /* Only a single dynamic-column field is allowed, and only of blob type */
      if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
      {
        my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name.str);
        DBUG_RETURN(HA_WRONG_CREATE_OPTION);
      }
      dyncol_set= 1;
      dyncol_field= i;
      bzero(&dynamic_values, sizeof(dynamic_values));
      bzero(&dynamic_names, sizeof(dynamic_names));
      bzero(&dynamic_rec, sizeof(dynamic_rec));
    }
  }
  DBUG_RETURN(0);
}
426
427
/**
  @brief
  Open the table: attach to the shared lock structure. Connecting to
  Cassandra is deliberately deferred (see the commented-out code below)
  so that e.g. SHOW CREATE TABLE still works when the server is gone.

  @return 0 on success, 1 if the share could not be obtained
*/
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
  DBUG_ENTER("ha_cassandra::open");

  if (!(share = get_share(name, table)))
    DBUG_RETURN(1);
  thr_lock_data_init(&share->lock,&lock,NULL);

  DBUG_ASSERT(!se);
  /*
    Don't do the following on open: it prevents SHOW CREATE TABLE when the server
    has gone away.
  */
  /*
    int res;
    if ((res= connect_and_check_options(table)))
    {
      DBUG_RETURN(res);
    }
  */

  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
  insert_lineno= 0;

  DBUG_RETURN(0);
}
454
455
/**
  @brief
  Close the table: drop the Cassandra connection, free converter state,
  and release our reference to the share.
*/
int ha_cassandra::close(void)
{
  DBUG_ENTER("ha_cassandra::close");
  delete se;
  se= NULL;
  free_field_converters();
  DBUG_RETURN(free_share(share));
}
464
465
check_table_options(ha_table_option_struct * options)466 int ha_cassandra::check_table_options(ha_table_option_struct *options)
467 {
468 if (!options->thrift_host && (!cassandra_default_thrift_host ||
469 !cassandra_default_thrift_host[0]))
470 {
471 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
472 "thrift_host table option must be specified, or "
473 "@@cassandra_default_thrift_host must be set");
474 return HA_WRONG_CREATE_OPTION;
475 }
476
477 if (!options->keyspace || !options->column_family)
478 {
479 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
480 "keyspace and column_family table options must be specified");
481 return HA_WRONG_CREATE_OPTION;
482 }
483 return 0;
484 }
485
486
487 /**
488 @brief
489 create() is called to create a table. The variable name will have the name
490 of the table.
491
492 @details
493 When create() is called you do not need to worry about
494 opening the table. Also, the .frm file will have already been
495 created so adjusting create_info is not necessary. You can overwrite
496 the .frm file at this point if you wish to change the table
497 definition, but there are no methods currently provided for doing
498 so.
499
500 Called from handle.cc by ha_create_table().
501
502 @see
503 ha_create_table() in handle.cc
504 */
505
int ha_cassandra::create(const char *name, TABLE *table_arg,
                         HA_CREATE_INFO *create_info)
{
  int res;
  DBUG_ENTER("ha_cassandra::create");

  /*
    The Cassandra row key maps to the first column, so the table must have
    exactly one index: a one-part PRIMARY KEY on column #1.
  */
  if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
      table_arg->key_info[0].user_defined_key_parts != 1 ||
      table_arg->key_info[0].key_part[0].fieldnr != 1)
  {
    my_error(ER_WRONG_COLUMN_NAME, MYF(0),
             "Table must have PRIMARY KEY defined over the first column");
    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
  }

  DBUG_ASSERT(!se);
  /* Validate options and verify we can actually reach the column family */
  if ((res= connect_and_check_options(table_arg)))
    DBUG_RETURN(res);

  insert_lineno= 0;
  DBUG_RETURN(0);
}
528
529 /*
530 Mapping needs to
531 - copy value from MySQL record to Thrift buffer
532 - copy value from Thrift bufer to MySQL record..
533
534 */
535
/*
  Converter base: translates one column's value between Cassandra's wire
  format and the MariaDB Field the converter is bound to.
*/
class ColumnDataConverter
{
public:
  Field *field; /* the MariaDB field this converter reads/writes */

  /* This will save Cassandra's data in the Field */
  virtual int cassandra_to_mariadb(const char *cass_data,
                                   int cass_data_len)=0;

  /*
    This will get data from the Field pointer, store Cassandra's form
    in internal buffer, and return pointer/size.

    @return
      false - OK
      true  - Failed to convert value (completely, there is no value to insert
              at all).
  */
  virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
  virtual ~ColumnDataConverter() {};
};
558
559
/* double <-> Cassandra 'double' (8 bytes) */
class DoubleDataConverter : public ColumnDataConverter
{
  double buf; /* holds the outgoing value until the write is issued */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == sizeof(double));
    /*
      NOTE(review): unlike the integer converters there is no byte-order
      flip here — confirm the SE layer delivers host-order doubles.
    */
    double *pdata= (double*) cass_data;
    field->store(*pdata);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(double);
    return false;
  }
  ~DoubleDataConverter(){}
};
581
582
/* float <-> Cassandra 'float' (4 bytes) */
class FloatDataConverter : public ColumnDataConverter
{
  float buf; /* holds the outgoing value until the write is issued */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == sizeof(float));
    /* NOTE(review): no byte-order flip, mirroring DoubleDataConverter */
    float *pdata= (float*) cass_data;
    field->store(*pdata);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(float);
    return false;
  }
  ~FloatDataConverter(){}
};
604
/*
  Reverse the byte order of an 8-byte value (big-endian <-> little-endian).
  'from' and 'to' must not overlap.
*/
static void flip64(const char *from, char* to)
{
  for (int i= 0; i < 8; i++)
    to[i]= from[7 - i];
}
616
/*
  longlong <-> Cassandra 8-byte integer. Regular 'bigint' columns use
  network (big-endian) byte order and need flipping; counter-column reads
  arrive without it, hence the 'flip' switch.
*/
class BigintDataConverter : public ColumnDataConverter
{
  longlong buf; /* holds the outgoing serialized value */
  bool flip; /* is false when reading counter columns */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    longlong tmp;
    DBUG_ASSERT(cass_data_len == sizeof(longlong));
    if (flip)
      flip64(cass_data, (char*)&tmp);
    else
      memcpy(&tmp, cass_data, sizeof(longlong));
    field->store(tmp);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    longlong tmp= field->val_int();
    if (flip)
      flip64((const char*)&tmp, (char*)&buf);
    else
      memcpy(&buf, &tmp, sizeof(longlong));
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(longlong);
    return false;
  }
  BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
  ~BigintDataConverter(){}
};
648
/*
  Reverse the byte order of a 4-byte value (big-endian <-> little-endian).
  'from' and 'to' must not overlap.
*/
static void flip32(const char *from, char* to)
{
  for (int i= 0; i < 4; i++)
    to[i]= from[3 - i];
}
656
657
/*
  Single-byte converter. The write path collapses any non-zero field value
  to 1 — NOTE(review): this looks intended for Cassandra 'boolean' columns
  despite the "Tinyint" name; confirm against setup_field_converters().
*/
class TinyintDataConverter : public ColumnDataConverter
{
  char buf; /* the single outgoing byte */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == 1);
    field->store(cass_data[0]);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_int()? 1 : 0; /* TODO: error handling? */
    *cass_data= (char*)&buf;
    *cass_data_len= 1;
    return false;
  }
  ~TinyintDataConverter(){}
};
678
679
/* int32 <-> Cassandra 'int' (4 bytes, big-endian on the wire) */
class Int32DataConverter : public ColumnDataConverter
{
  int32_t buf; /* big-endian value handed to Cassandra */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    int32_t tmp;
    DBUG_ASSERT(cass_data_len == sizeof(int32_t));
    flip32(cass_data, (char*)&tmp);
    field->store(tmp);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    int32_t tmp= field->val_int();
    flip32((const char*)&tmp, (char*)&buf);
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(int32_t);
    return false;
  }
  ~Int32DataConverter(){}
};
703
704
/*
  Pass-through converter for text/blob values. Rejects incoming values
  longer than max_length (the capacity of the target field).
*/
class StringCopyConverter : public ColumnDataConverter
{
  String buf; /* scratch buffer for val_str() on the write path */
  size_t max_length;
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    if ((size_t)cass_data_len > max_length)
      return 1;
    field->store(cass_data, cass_data_len,field->charset());
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    String *pstr= field->val_str(&buf);
    *cass_data= (char*)pstr->ptr();
    *cass_data_len= pstr->length();
    return false;
  }
  StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
  ~StringCopyConverter(){}
};
728
729
/* TIMESTAMP <-> Cassandra 'timestamp' (big-endian milliseconds-since-epoch) */
class TimestampDataConverter : public ColumnDataConverter
{
  int64_t buf; /* outgoing big-endian millisecond value */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    /* Cassandra data is milliseconds-since-epoch in network byte order */
    int64_t tmp;
    DBUG_ASSERT(cass_data_len==8);
    flip64(cass_data, (char*)&tmp);
    /*
      store_TIME's arguments:
      - seconds since epoch
      - microsecond fraction of a second.
    */
    ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    my_time_t ts_time;
    ulong ts_microsec;
    int64_t tmp;
    ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec);

    /* Cassandra needs milliseconds-since-epoch */
    tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000;
    flip64((const char*)&tmp, (char*)&buf);

    *cass_data= (char*)&buf;
    *cass_data_len= 8;
    return false;
  }
  ~TimestampDataConverter(){}
};
766
767
768
/*
  Convert one ASCII hex digit to its numeric value.

  @return 0..15 on success, -1 if 'c' is not a hexadecimal digit.
*/
static int convert_hex_digit(const char c)
{
  if (c >= '0' && c <= '9')
    return c - '0';
  if (c >= 'A' && c <= 'F')
    return c - 'A' + 10;
  if (c >= 'a' && c <= 'f')
    return c - 'a' + 10;
  return -1; /* Couldn't convert */
}


const char map2number[]="0123456789abcdef";

/*
  Render a 16-byte binary UUID (network byte order) into the canonical
  36-character lowercase "8-4-4-4-12" hex form. 'str' must have room for
  37 bytes (36 characters plus the terminating NUL).
*/
static void convert_uuid2string(char *str, const char *cass_data)
{
  char *out= str;
  for (unsigned int i= 0; i < 16; i++)
  {
    const unsigned char b= (unsigned char) cass_data[i];
    *out++= map2number[b >> 4];
    *out++= map2number[b & 0xF];
    /* Group separators go after bytes 3, 5, 7 and 9 */
    if (i == 3 || i == 5 || i == 7 || i == 9)
      *out++= '-';
  }
  *out= '\0';
}

/*
  Parse a 36-character "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" UUID string
  into 16 binary bytes stored in 'buf'.

  @return false on success, true if the input is malformed.
*/
static bool convert_string2uuid(char *buf, const char *str)
{
  for (unsigned int i= 0; i < 16; i++)
  {
    const int hi= convert_hex_digit(str[0]);
    const int lo= convert_hex_digit(str[1]);
    if (hi == -1 || lo == -1)
      return true;
    buf[i]= (char) (lo | (hi << 4));
    str+= 2;
    /* A dash is required after bytes 3, 5, 7 and 9 */
    if (i == 3 || i == 5 || i == 7 || i == 9)
    {
      if (str[0] != '-')
        return true;
      str++;
    }
  }
  return false;
}
821
822
/* 16-byte binary UUID <-> 36-character text form in the MariaDB field */
class UuidDataConverter : public ColumnDataConverter
{
  char buf[16]; /* Binary UUID representation */
  String str_buf; /* scratch for val_str() on the write path */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len==16);
    char str[37];
    convert_uuid2string(str, cass_data);
    field->store(str, 36,field->charset());
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    String *uuid_str= field->val_str(&str_buf);

    /* Only the canonical 36-character form is accepted */
    if (uuid_str->length() != 36)
      return true;

    if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
      return true;
    *cass_data= buf;
    *cass_data_len= 16;
    return false;
  }
  ~UuidDataConverter(){}
};
852
853 /**
854 Converting dynamic columns types to/from casandra types
855 */
856
857
858 /**
859 Check and initialize (if it is needed) string MEM_ROOT
860 */
static void alloc_strings_memroot(MEM_ROOT *mem_root)
{
  /* Lazy initialization: calls after the first are no-ops */
  if (!alloc_root_inited(mem_root))
  {
    /*
      The mem_root used to allocate UUID (of length 36 + \0) so make
      appropriate allocated size
    */
    init_alloc_root(mem_root, "cassandra",
                    (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
                    ALLOC_ROOT_MIN_BLOCK_SIZE,
                    (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
                    ALLOC_ROOT_MIN_BLOCK_SIZE, MYF(MY_THREAD_SPECIFIC));
  }
}

/* Free the string MEM_ROOT if alloc_strings_memroot() ever initialized it */
static void free_strings_memroot(MEM_ROOT *mem_root)
{
  if (alloc_root_inited(mem_root))
    free_root(mem_root, MYF(0));
}
882
cassandra_to_dyncol_intLong(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)883 bool cassandra_to_dyncol_intLong(const char *cass_data,
884 int cass_data_len __attribute__((unused)),
885 DYNAMIC_COLUMN_VALUE *value,
886 MEM_ROOT *mem_root __attribute__((unused)))
887 {
888 value->type= DYN_COL_INT;
889 #ifdef WORDS_BIGENDIAN
890 value->x.long_value= (longlong)*cass_data;
891 #else
892 flip64(cass_data, (char *)&value->x.long_value);
893 #endif
894 return 0;
895 }
896
/**
  Convert a dynamic-column value into Cassandra's 8-byte big-endian
  'bigint' wire format.

  NOTE(review): on little-endian hosts the flipped copy is written at
  buff + sizeof(longlong), so 'buff' must provide at least
  2*sizeof(longlong) bytes — confirm callers.

  @return true on conversion failure, false on success (*freemem is NULL)
*/
bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  longlong *tmp= (longlong *) buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
#ifdef WORDS_BIGENDIAN
  *cass_data= (char *)buff;
#else
  flip64((char *)buff, (char *)buff + sizeof(longlong));
  *cass_data= (char *)buff + sizeof(longlong);
#endif
  *freemem= NULL;
  return false;
}
916
/**
  Convert Cassandra's 4-byte big-endian 'int' into a dynamic-column integer.
  @return 0 (cannot fail)
*/
bool cassandra_to_dyncol_intInt32(const char *cass_data,
                                  int cass_data_len __attribute__((unused)),
                                  DYNAMIC_COLUMN_VALUE *value,
                                  MEM_ROOT *mem_root __attribute__((unused)))
{
  int32 tmp;
  value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
  tmp= *((int32 *)cass_data);
#else
  flip32(cass_data, (char *)&tmp);
#endif
  value->x.long_value= tmp;
  return 0;
}


/**
  Convert a dynamic-column value into Cassandra's 4-byte big-endian 'int'.

  NOTE(review): the longlong scratch value is placed at
  buff + sizeof(longlong) and the int32 result at buff, so 'buff' must
  provide at least sizeof(longlong) + sizeof(longlong) bytes — confirm
  callers.

  @return true on conversion failure, false on success (*freemem is NULL)
*/
bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(int32);
  *cass_data= (char *)buff;
#ifdef WORDS_BIGENDIAN
  *((int32 *) buff) = (int32) *tmp;
#else
  {
    int32 tmp2= (int32) *tmp;
    flip32((char *)&tmp2, (char *)buff);
  }
#endif
  *freemem= NULL;
  return false;
}
956
957
/**
  Convert a Cassandra 'counter' value (delivered in host byte order — no
  flip, matching BigintDataConverter with flip=false) into a dynamic-column
  integer.
  NOTE(review): reads a longlong through a cast of 'cass_data'; assumes the
  buffer is suitably aligned — confirm.
  @return 0 (cannot fail)
*/
bool cassandra_to_dyncol_intCounter(const char *cass_data,
                                    int cass_data_len __attribute__((unused)),
                                    DYNAMIC_COLUMN_VALUE *value,
                                    MEM_ROOT *mem_root __attribute__((unused)))
{
  value->type= DYN_COL_INT;
  value->x.long_value= *((longlong *)cass_data);
  return 0;
}


/**
  Convert a dynamic-column value into a Cassandra 'counter' (8 bytes in
  host byte order, written into 'buff').
  @return true on conversion failure, false on success (*freemem is NULL)
*/
bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
                                char **cass_data, int *cass_data_len,
                                void* buff, void **freemem)
{
  longlong *tmp= (longlong *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}
983
/**
  Convert a Cassandra 'float' into a dynamic-column double.
  NOTE(review): no byte-order conversion here, unlike the int paths —
  confirm the SE layer delivers host-order floats.
  @return 0 (cannot fail)
*/
bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
                                     int cass_data_len __attribute__((unused)),
                                     DYNAMIC_COLUMN_VALUE *value,
                                     MEM_ROOT *mem_root __attribute__((unused)))
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((float *)cass_data);
  return 0;
}

/**
  Convert a dynamic-column value into a Cassandra 'float' (4 bytes written
  into 'buff').
  @return true on conversion failure, false on success (*freemem is NULL)
*/
bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  double tmp;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(&tmp, value);
  if (rc < 0)
    return true;
  *((float *)buff)= (float) tmp;
  *cass_data_len= sizeof(float);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}
1009
/**
  Convert a Cassandra 'double' into a dynamic-column double.
  NOTE(review): no byte-order conversion, mirroring the float path —
  confirm the SE layer delivers host-order doubles.
  @return 0 (cannot fail)
*/
bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
                                      int cass_data_len __attribute__((unused)),
                                      DYNAMIC_COLUMN_VALUE *value,
                                      MEM_ROOT *mem_root
                                      __attribute__((unused)))
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((double *)cass_data);
  return 0;
}

/**
  Convert a dynamic-column value into a Cassandra 'double' (8 bytes written
  into 'buff').
  @return true on conversion failure, false on success (*freemem is NULL)
*/
bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
                               char **cass_data, int *cass_data_len,
                               void* buff, void **freemem)
{
  double *tmp= (double *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(double);
  *cass_data= (char *)buff;
  *freemem= NULL;
  return false;
}
1035
/**
  Wrap a Cassandra string as a dynamic-column string in charset 'cs'.
  Zero-copy: 'value' points into Cassandra's buffer, which must outlive it.
  @return 0 (cannot fail)
*/
bool cassandra_to_dyncol_strStr(const char *cass_data,
                                int cass_data_len,
                                DYNAMIC_COLUMN_VALUE *value,
                                CHARSET_INFO *cs)
{
  value->type= DYN_COL_STRING;
  value->x.string.charset= cs;
  value->x.string.value.str= (char *)cass_data;
  value->x.string.value.length= cass_data_len;
  return 0;
}

/**
  Convert a dynamic-column value into a string in charset 'cs'. The result
  is heap-allocated via init_dynamic_string(); ownership passes to the
  caller through *freemem.
  @return 1 on failure (temporary string freed), 0 on success
*/
bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
                            char **cass_data, int *cass_data_len,
                            void* buff, void **freemem, CHARSET_INFO *cs)
{
  DYNAMIC_STRING tmp;
  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
    return 1;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_str(&tmp, value, cs, '\0');
  if (rc < 0)
  {
    dynstr_free(&tmp);
    return 1;
  }
  *cass_data_len= tmp.length;
  *(cass_data)= tmp.str;
  *freemem= tmp.str;
  return 0;
}
1067
cassandra_to_dyncol_strBytes(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1068 bool cassandra_to_dyncol_strBytes(const char *cass_data,
1069 int cass_data_len,
1070 DYNAMIC_COLUMN_VALUE *value,
1071 MEM_ROOT *mem_root __attribute__((unused)))
1072 {
1073 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1074 &my_charset_bin);
1075 }
1076
dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1077 bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
1078 char **cass_data, int *cass_data_len,
1079 void* buff, void **freemem)
1080 {
1081 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1082 buff, freemem, &my_charset_bin);
1083 }
1084
cassandra_to_dyncol_strAscii(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1085 bool cassandra_to_dyncol_strAscii(const char *cass_data,
1086 int cass_data_len,
1087 DYNAMIC_COLUMN_VALUE *value,
1088 MEM_ROOT *mem_root __attribute__((unused)))
1089 {
1090 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1091 &my_charset_latin1_bin);
1092 }
1093
dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1094 bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
1095 char **cass_data, int *cass_data_len,
1096 void* buff, void **freemem)
1097 {
1098 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1099 buff, freemem, &my_charset_latin1_bin);
1100 }
1101
cassandra_to_dyncol_strUTF8(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1102 bool cassandra_to_dyncol_strUTF8(const char *cass_data,
1103 int cass_data_len,
1104 DYNAMIC_COLUMN_VALUE *value,
1105 MEM_ROOT *mem_root __attribute__((unused)))
1106 {
1107 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1108 &my_charset_utf8_unicode_ci);
1109 }
1110
dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1111 bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
1112 char **cass_data, int *cass_data_len,
1113 void* buff, void **freemem)
1114 {
1115 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1116 buff, freemem, &my_charset_utf8_unicode_ci);
1117 }
1118
cassandra_to_dyncol_strUUID(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1119 bool cassandra_to_dyncol_strUUID(const char *cass_data,
1120 int cass_data_len,
1121 DYNAMIC_COLUMN_VALUE *value,
1122 MEM_ROOT *mem_root)
1123 {
1124 value->type= DYN_COL_STRING;
1125 value->x.string.charset= &my_charset_bin;
1126 alloc_strings_memroot(mem_root);
1127 value->x.string.value.str= (char *)alloc_root(mem_root, 37);
1128 if (!value->x.string.value.str)
1129 {
1130 value->x.string.value.length= 0;
1131 return 1;
1132 }
1133 convert_uuid2string(value->x.string.value.str, cass_data);
1134 value->x.string.value.length= 36;
1135 return 0;
1136 }
1137
dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1138 bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
1139 char **cass_data, int *cass_data_len,
1140 void* buff, void **freemem)
1141 {
1142 DYNAMIC_STRING tmp;
1143 if (init_dynamic_string(&tmp, NULL, 1024, 1024))
1144 return true;
1145 enum enum_dyncol_func_result rc=
1146 mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0');
1147 if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
1148 {
1149 dynstr_free(&tmp);
1150 return true;
1151 }
1152
1153 *cass_data_len= tmp.length;
1154 *(cass_data)= tmp.str;
1155 *freemem= tmp.str;
1156 return 0;
1157 }
1158
cassandra_to_dyncol_intBool(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1159 bool cassandra_to_dyncol_intBool(const char *cass_data,
1160 int cass_data_len,
1161 DYNAMIC_COLUMN_VALUE *value,
1162 MEM_ROOT *mem_root __attribute__((unused)))
1163 {
1164 value->type= DYN_COL_INT;
1165 value->x.long_value= (cass_data[0] ? 1 : 0);
1166 return 0;
1167 }
1168
dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1169 bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
1170 char **cass_data, int *cass_data_len,
1171 void* buff, void **freemem)
1172 {
1173 longlong tmp;
1174 enum enum_dyncol_func_result rc=
1175 mariadb_dyncol_val_long(&tmp, value);
1176 if (rc < 0)
1177 return true;
1178 ((char *)buff)[0]= (tmp ? 1 : 0);
1179 *cass_data_len= 1;
1180 *(cass_data)= (char *)buff;
1181 *freemem= 0;
1182 return 0;
1183 }
1184
1185
/*
  Fully-qualified class names of Cassandra column validators (data types).
  get_cassandra_type() below dispatches on characters at fixed offsets
  past the common "org.apache.cassandra.db.marshal." prefix, so these
  exact spellings matter.
*/
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";

const char * const validator_float= "org.apache.cassandra.db.marshal.FloatType";
const char * const validator_double= "org.apache.cassandra.db.marshal.DoubleType";

const char * const validator_blob= "org.apache.cassandra.db.marshal.BytesType";
const char * const validator_ascii= "org.apache.cassandra.db.marshal.AsciiType";
const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type";

const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";

const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";

const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";

/* VARINTs are stored as big-endian big numbers. */
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
1206
1207
1208 static CASSANDRA_TYPE_DEF cassandra_types[]=
1209 {
1210 {
1211 validator_bigint,
1212 &cassandra_to_dyncol_intLong,
1213 &dyncol_to_cassandraLong
1214 },
1215 {
1216 validator_int,
1217 &cassandra_to_dyncol_intInt32,
1218 &dyncol_to_cassandraInt32
1219 },
1220 {
1221 validator_counter,
1222 cassandra_to_dyncol_intCounter,
1223 &dyncol_to_cassandraCounter
1224 },
1225 {
1226 validator_float,
1227 &cassandra_to_dyncol_doubleFloat,
1228 &dyncol_to_cassandraFloat
1229 },
1230 {
1231 validator_double,
1232 &cassandra_to_dyncol_doubleDouble,
1233 &dyncol_to_cassandraDouble
1234 },
1235 {
1236 validator_blob,
1237 &cassandra_to_dyncol_strBytes,
1238 &dyncol_to_cassandraBytes
1239 },
1240 {
1241 validator_ascii,
1242 &cassandra_to_dyncol_strAscii,
1243 &dyncol_to_cassandraAscii
1244 },
1245 {
1246 validator_text,
1247 &cassandra_to_dyncol_strUTF8,
1248 &dyncol_to_cassandraUTF8
1249 },
1250 {
1251 validator_timestamp,
1252 &cassandra_to_dyncol_intLong,
1253 &dyncol_to_cassandraLong
1254 },
1255 {
1256 validator_uuid,
1257 &cassandra_to_dyncol_strUUID,
1258 &dyncol_to_cassandraUUID
1259 },
1260 {
1261 validator_boolean,
1262 &cassandra_to_dyncol_intBool,
1263 &dyncol_to_cassandraBool
1264 },
1265 {
1266 validator_varint,
1267 &cassandra_to_dyncol_strBytes,
1268 &dyncol_to_cassandraBytes
1269 },
1270 {
1271 validator_decimal,
1272 &cassandra_to_dyncol_strBytes,
1273 &dyncol_to_cassandraBytes
1274 }
1275 };
1276
get_cassandra_type(const char * validator)1277 CASSANDRA_TYPE get_cassandra_type(const char *validator)
1278 {
1279 CASSANDRA_TYPE rc;
1280 switch(validator[32])
1281 {
1282 case 'L':
1283 rc= CT_BIGINT;
1284 break;
1285 case 'I':
1286 rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
1287 rc= CT_INT;
1288 break;
1289 case 'C':
1290 rc= CT_COUNTER;
1291 break;
1292 case 'F':
1293 rc= CT_FLOAT;
1294 break;
1295 case 'D':
1296 switch (validator[33])
1297 {
1298 case 'o':
1299 rc= CT_DOUBLE;
1300 break;
1301 case 'a':
1302 rc= CT_TIMESTAMP;
1303 break;
1304 case 'e':
1305 rc= CT_DECIMAL;
1306 break;
1307 default:
1308 rc= CT_BLOB;
1309 break;
1310 }
1311 break;
1312 case 'B':
1313 rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
1314 break;
1315 case 'A':
1316 rc= CT_ASCII;
1317 break;
1318 case 'U':
1319 rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
1320 break;
1321 default:
1322 rc= CT_BLOB;
1323 }
1324 DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
1325 return rc;
1326 }
1327
/*
  Create a ColumnDataConverter for a (SQL field, Cassandra validator) pair.

  @param field           the SQL column
  @param validator_name  Cassandra validator class name (see the
                         validator_* constants above)

  @return a new converter object, or NULL when the SQL type cannot
          represent the Cassandra type (caller reports the error and
          owns/deletes the returned object).
*/
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
  ColumnDataConverter *res= NULL;

  switch(field->type()) {
  case MYSQL_TYPE_TINY:
    if (!strcmp(validator_name, validator_boolean))
    {
      res= new TinyintDataConverter;
      break;
    }
    /* fall through: */
  case MYSQL_TYPE_SHORT:
  case MYSQL_TYPE_LONGLONG:
    {
      bool is_counter= false;
      /* counters are read-only on our side, hence the flag */
      if (!strcmp(validator_name, validator_bigint) ||
          !strcmp(validator_name, validator_timestamp) ||
          (is_counter= !strcmp(validator_name, validator_counter)))
        res= new BigintDataConverter(!is_counter);
      break;
    }
  case MYSQL_TYPE_FLOAT:
    if (!strcmp(validator_name, validator_float))
      res= new FloatDataConverter;
    break;

  case MYSQL_TYPE_DOUBLE:
    if (!strcmp(validator_name, validator_double))
      res= new DoubleDataConverter;
    break;

  case MYSQL_TYPE_TIMESTAMP:
    if (!strcmp(validator_name, validator_timestamp))
      res= new TimestampDataConverter;
    break;

  case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
    if (!strcmp(validator_name, validator_uuid) &&
        field->real_type() == MYSQL_TYPE_STRING &&
        field->field_length == 36)
    {
      // UUID maps to CHAR(36), its text representation
      res= new UuidDataConverter;
      break;
    }
    /* fall through: */
  case MYSQL_TYPE_VAR_STRING:
  case MYSQL_TYPE_VARCHAR:
  case MYSQL_TYPE_BLOB:
    {
      /*
        Cassandra's "varint" type is a binary-encoded arbitary-length
        big-endian number.
        - It can be mapped to VARBINARY(N), with sufficiently big N.
        - If the value does not fit into N bytes, it is an error. We should not
          truncate it, because that is just as good as returning garbage.
        - varint should not be mapped to BINARY(N), because BINARY(N) values
          are zero-padded, which will work as multiplying the value by
          2^k for some value of k.
      */
      if (field->type() == MYSQL_TYPE_VARCHAR &&
          field->binary() &&
          (!strcmp(validator_name, validator_varint) ||
           !strcmp(validator_name, validator_decimal)))
      {
        /* length-limited copy: error out rather than truncate */
        res= new StringCopyConverter(field->field_length);
        break;
      }

      if (!strcmp(validator_name, validator_blob) ||
          !strcmp(validator_name, validator_ascii) ||
          !strcmp(validator_name, validator_text))
      {
        /* (size_t)-1 means no length limit */
        res= new StringCopyConverter((size_t)-1);
      }
      break;
    }
  case MYSQL_TYPE_LONG:
    if (!strcmp(validator_name, validator_int))
      res= new Int32DataConverter;
    break;

  default:;
  }
  return res;
}
1415
1416
/*
  Build data converters for all fields from Cassandra's column family DDL.

  Matches every SQL field (except the rowkey field [0] and the optional
  dynamic-column blob) to a Cassandra column by name and creates a
  converter for each; also sets up the rowkey converter and, when the
  table uses a dynamic column, the work buffers plus converter entries
  for Cassandra columns whose type differs from the default validator.

  @param field_arg  table->field array (field_arg[0] is the rowkey field)
  @param n_fields   number of SQL fields

  @return false on success, true on error (error already reported)
*/
bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
  char *col_name;
  int col_name_len;
  char *col_type;
  int col_type_len;
  size_t ddl_fields= se->get_ddl_size();
  const char *default_type= se->get_default_validator();
  uint max_non_default_fields;
  DBUG_ENTER("ha_cassandra::setup_field_converters");
  DBUG_ASSERT(default_type);

  DBUG_ASSERT(!field_converters);
  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);

  /*
    We always should take into account that in case of using dynamic columns
    sql description contain one field which does not described in
    Cassandra DDL also key field is described separately. So that
    is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
  */
  max_non_default_fields= ddl_fields + 2 - n_fields;
  if (ddl_fields < (n_fields - dyncol_set - 1))
  {
    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /* allocate memory in one chunk */
  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
    (dyncol_set ? max_non_default_fields : 0);
  if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
    DBUG_RETURN(true);
  bzero(field_converters, memsize);
  n_field_converters= n_fields;

  if (dyncol_set)
  {
    /* Special-type tables live in the tail of the single allocation above */
    special_type_field_converters=
      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
    special_type_field_names=
      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));

    if (my_init_dynamic_array(&dynamic_values,
                              sizeof(DYNAMIC_COLUMN_VALUE),
                              DYNCOL_USUAL, DYNCOL_DELTA, MYF(0)))
      DBUG_RETURN(true);
    else
      if (my_init_dynamic_array(&dynamic_names,
                                sizeof(LEX_STRING),
                                DYNCOL_USUAL, DYNCOL_DELTA,MYF(0)))
      {
        /* undo the first allocation before bailing out */
        delete_dynamic(&dynamic_values);
        DBUG_RETURN(true);
      }
      else
        if (init_dynamic_string(&dynamic_rec, NULL,
                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
        {
          /* undo both allocations before bailing out */
          delete_dynamic(&dynamic_values);
          delete_dynamic(&dynamic_names);
          DBUG_RETURN(true);
        }

    /* Dynamic column field has special processing */
    field_converters[dyncol_field]= NULL;

    default_type_def= cassandra_types + get_cassandra_type(default_type);
  }

  se->first_ddl_column();
  uint n_mapped= 0;
  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                              &col_type_len))
  {
    Field **field;
    uint i;
    /* Mapping for the 1st field is already known */
    for (field= field_arg + 1, i= 1; *field; field++, i++)
    {
      if ((!dyncol_set || dyncol_field != i) &&
          !strcmp((*field)->field_name.str, col_name))
      {
        n_mapped++;
        ColumnDataConverter **conv= field_converters + (*field)->field_index;
        if (!(*conv= map_field_to_validator(*field, col_type)))
        {
          se->print_error("Failed to map column %s to datatype %s",
                          (*field)->field_name.str, col_type);
          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
          DBUG_RETURN(true);
        }
        (*conv)->field= *field;
        break;
      }
    }
    if (dyncol_set && !(*field)) // is needed and not found
    {
      DBUG_PRINT("info",("Field not found: %s", col_name));
      /* record non-default-typed columns so the dyncol code can convert them */
      if (strcmp(col_type, default_type))
      {
        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
                           col_name, col_type));
        special_type_field_names[n_special_type_fields].length= col_name_len;
        special_type_field_names[n_special_type_fields].str= col_name;
        special_type_field_converters[n_special_type_fields]=
          cassandra_types[get_cassandra_type(col_type)];
        n_special_type_fields++;
      }
    }
  }

  if (n_mapped != n_fields - 1 - dyncol_set)
  {
    Field *first_unmapped= NULL;
    /* Find the first field */
    for (uint i= 1; i < n_fields;i++)
    {
      if (!field_converters[i])
      {
        first_unmapped= field_arg[i];
        break;
      }
    }
    DBUG_ASSERT(first_unmapped);

    se->print_error("Field `%s` could not be mapped to any field in Cassandra",
                    first_unmapped->field_name.str);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /*
    Setup type conversion for row_key.
  */
  se->get_rowkey_type(&col_name, &col_type);
  if (col_name && strcmp(col_name, (*field_arg)->field_name.str))
  {
    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
                    col_name);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }
  if (!col_name && strcmp("rowkey", (*field_arg)->field_name.str))
  {
    se->print_error("target column family has no key_alias defined, "
                    "PRIMARY KEY column must be named 'rowkey'");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  if (col_type != NULL)
  {
    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
    {
      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
      DBUG_RETURN(true);
    }
    rowkey_converter->field= *field_arg;
  }
  else
  {
    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
1589
1590
free_field_converters()1591 void ha_cassandra::free_field_converters()
1592 {
1593 delete rowkey_converter;
1594 rowkey_converter= NULL;
1595
1596 if (dyncol_set)
1597 {
1598 delete_dynamic(&dynamic_values);
1599 delete_dynamic(&dynamic_names);
1600 dynstr_free(&dynamic_rec);
1601 }
1602 if (field_converters)
1603 {
1604 for (uint i=0; i < n_field_converters; i++)
1605 if (field_converters[i])
1606 {
1607 DBUG_ASSERT(!dyncol_set || i != dyncol_field);
1608 delete field_converters[i];
1609 }
1610 my_free(field_converters);
1611 field_converters= NULL;
1612 }
1613 }
1614
1615
index_init(uint idx,bool sorted)1616 int ha_cassandra::index_init(uint idx, bool sorted)
1617 {
1618 int ires;
1619 if (!se && (ires= connect_and_check_options(table)))
1620 return ires;
1621 return 0;
1622 }
1623
1624 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
1625
index_read_map(uchar * buf,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)1626 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
1627 key_part_map keypart_map,
1628 enum ha_rkey_function find_flag)
1629 {
1630 int rc= 0;
1631 DBUG_ENTER("ha_cassandra::index_read_map");
1632
1633 if (find_flag != HA_READ_KEY_EXACT)
1634 {
1635 DBUG_ASSERT(0); /* Non-equality lookups should never be done */
1636 DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1637 }
1638
1639 uint key_len= calculate_key_len(table, active_index, key, keypart_map);
1640 store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
1641
1642 char *cass_key;
1643 int cass_key_len;
1644 MY_BITMAP *old_map;
1645
1646 old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1647
1648 if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1649 {
1650 /* We get here when making lookups like uuid_column='not-an-uuid' */
1651 dbug_tmp_restore_column_map(&table->read_set, old_map);
1652 DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
1653 }
1654
1655 dbug_tmp_restore_column_map(&table->read_set, old_map);
1656
1657 bool found;
1658 if (se->get_slice(cass_key, cass_key_len, &found))
1659 {
1660 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1661 rc= HA_ERR_INTERNAL_ERROR;
1662 }
1663
1664 /* TODO: what if we're not reading all columns?? */
1665 if (!found)
1666 rc= HA_ERR_KEY_NOT_FOUND;
1667 else
1668 rc= read_cassandra_columns(false);
1669
1670 DBUG_RETURN(rc);
1671 }
1672
1673
print_conversion_error(const char * field_name,char * cass_value,int cass_value_len)1674 void ha_cassandra::print_conversion_error(const char *field_name,
1675 char *cass_value,
1676 int cass_value_len)
1677 {
1678 char buf[32];
1679 char *p= cass_value;
1680 size_t i= 0;
1681 for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
1682 {
1683 buf[i++]= map2number[(*p >> 4) & 0xF];
1684 buf[i++]= map2number[*p & 0xF];
1685 }
1686 buf[i]=0;
1687
1688 se->print_error("Unable to convert value for field `%s` from Cassandra's data"
1689 " format. Source data is %d bytes, 0x%s%s",
1690 field_name, cass_value_len, buf,
1691 (i == sizeof(buf) - 1)? "..." : "");
1692 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1693 }
1694
1695
1696
get_cassandra_field_def(char * cass_name,int cass_name_len)1697 CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
1698 int cass_name_len)
1699 {
1700 CASSANDRA_TYPE_DEF *type= default_type_def;
1701 for(uint i= 0; i < n_special_type_fields; i++)
1702 {
1703 if (cass_name_len == (int)special_type_field_names[i].length &&
1704 memcmp(cass_name, special_type_field_names[i].str,
1705 cass_name_len) == 0)
1706 {
1707 type= special_type_field_converters + i;
1708 break;
1709 }
1710 }
1711 return type;
1712 }
1713
read_cassandra_columns(bool unpack_pk)1714 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
1715 {
1716 MEM_ROOT strings_root;
1717 char *cass_name;
1718 char *cass_value;
1719 int cass_value_len, cass_name_len;
1720 Field **field;
1721 int res= 0;
1722 ulong total_name_len= 0;
1723
1724 clear_alloc_root(&strings_root);
1725 /*
1726 cassandra_to_mariadb() calls will use field->store(...) methods, which
1727 require that the column is in the table->write_set
1728 */
1729 MY_BITMAP *old_map;
1730 old_map= dbug_tmp_use_all_columns(table, &table->write_set);
1731
1732 /* Start with all fields being NULL */
1733 for (field= table->field + 1; *field; field++)
1734 (*field)->set_null();
1735
1736 while (!se->get_next_read_column(&cass_name, &cass_name_len,
1737 &cass_value, &cass_value_len))
1738 {
1739 // map to our column. todo: use hash or something..
1740 bool found= 0;
1741 for (field= table->field + 1; *field; field++)
1742 {
1743 uint fieldnr= (*field)->field_index;
1744 if ((!dyncol_set || dyncol_field != fieldnr) &&
1745 !strcmp((*field)->field_name.str, cass_name))
1746 {
1747 found= 1;
1748 (*field)->set_notnull();
1749 if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
1750 cass_value_len))
1751 {
1752 print_conversion_error((*field)->field_name.str, cass_value,
1753 cass_value_len);
1754 res=1;
1755 goto err;
1756 }
1757 break;
1758 }
1759 }
1760 if (dyncol_set && !found)
1761 {
1762 DYNAMIC_COLUMN_VALUE val;
1763 LEX_STRING nm;
1764 CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
1765 cass_name_len);
1766 nm.str= cass_name;
1767 nm.length= cass_name_len;
1768 if (nm.length > MAX_NAME_LENGTH)
1769 {
1770 se->print_error("Unable to convert value for field `%s`"
1771 " from Cassandra's data format. Name"
1772 " length exceed limit of %u: '%s'",
1773 table->field[dyncol_field]->field_name.str,
1774 (uint)MAX_NAME_LENGTH, cass_name);
1775 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1776 res=1;
1777 goto err;
1778 }
1779 total_name_len+= cass_name_len;
1780 if (nm.length > MAX_TOTAL_NAME_LENGTH)
1781 {
1782 se->print_error("Unable to convert value for field `%s`"
1783 " from Cassandra's data format. Sum of all names"
1784 " length exceed limit of %lu",
1785 table->field[dyncol_field]->field_name.str,
1786 cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
1787 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1788 res=1;
1789 goto err;
1790 }
1791
1792 if ((res= (*(type->cassandra_to_dynamic))(cass_value,
1793 cass_value_len, &val,
1794 &strings_root)) ||
1795 insert_dynamic(&dynamic_names, (uchar *) &nm) ||
1796 insert_dynamic(&dynamic_values, (uchar *) &val))
1797 {
1798 if (res)
1799 {
1800 print_conversion_error(cass_name, cass_value, cass_value_len);
1801 }
1802 free_strings_memroot(&strings_root);
1803 // EOM shouldm be already reported if happened
1804 res= 1;
1805 goto err;
1806 }
1807 }
1808 }
1809
1810 dynamic_rec.length= 0;
1811 if (dyncol_set)
1812 {
1813 if (mariadb_dyncol_create_many_named(&dynamic_rec,
1814 dynamic_names.elements,
1815 (LEX_STRING *)dynamic_names.buffer,
1816 (DYNAMIC_COLUMN_VALUE *)
1817 dynamic_values.buffer,
1818 FALSE) < 0)
1819 dynamic_rec.length= 0;
1820
1821 free_strings_memroot(&strings_root);
1822 dynamic_values.elements= dynamic_names.elements= 0;
1823
1824 if (dynamic_rec.length == 0)
1825 table->field[dyncol_field]->set_null();
1826 else
1827 {
1828 Field_blob *blob= (Field_blob *)table->field[dyncol_field];
1829 blob->set_notnull();
1830 blob->store_length(dynamic_rec.length);
1831 *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
1832 dynamic_rec.str;
1833 }
1834 }
1835
1836 if (unpack_pk)
1837 {
1838 /* Unpack rowkey to primary key */
1839 field= table->field;
1840 (*field)->set_notnull();
1841 se->get_read_rowkey(&cass_value, &cass_value_len);
1842 if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
1843 {
1844 print_conversion_error((*field)->field_name.str, cass_value, cass_value_len);
1845 res=1;
1846 goto err;
1847 }
1848 }
1849
1850 err:
1851 dbug_tmp_restore_column_map(&table->write_set, old_map);
1852 return res;
1853 }
1854
read_dyncol(uint * count,DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names,String * valcol)1855 int ha_cassandra::read_dyncol(uint *count,
1856 DYNAMIC_COLUMN_VALUE **vals,
1857 LEX_STRING **names,
1858 String *valcol)
1859 {
1860 String *strcol;
1861 DYNAMIC_COLUMN col;
1862
1863 enum enum_dyncol_func_result rc;
1864 DBUG_ENTER("ha_cassandra::read_dyncol");
1865
1866 Field *field= table->field[dyncol_field];
1867 DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
1868 /* It is blob and it does not use buffer */
1869 strcol= field->val_str(NULL, valcol);
1870 if (field->is_null())
1871 {
1872 *count= 0;
1873 *names= 0;
1874 *vals= 0;
1875 DBUG_RETURN(0); // nothing to write
1876 }
1877 /*
1878 dynamic_column_vals only read the string so we can
1879 cheat here with assignment
1880 */
1881 bzero(&col, sizeof(col));
1882 col.str= (char *)strcol->ptr();
1883 col.length= strcol->length();
1884 if ((rc= mariadb_dyncol_unpack(&col, count, names, vals)) < 0)
1885 {
1886 dynamic_column_error_message(rc);
1887 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1888 }
1889 DBUG_RETURN(0);
1890 }
1891
write_dynamic_row(uint count,DYNAMIC_COLUMN_VALUE * vals,LEX_STRING * names)1892 int ha_cassandra::write_dynamic_row(uint count,
1893 DYNAMIC_COLUMN_VALUE *vals,
1894 LEX_STRING *names)
1895 {
1896 uint i;
1897 DBUG_ENTER("ha_cassandra::write_dynamic_row");
1898 DBUG_ASSERT(dyncol_set);
1899
1900
1901 for (i= 0; i < count; i++)
1902 {
1903 char buff[16];
1904 CASSANDRA_TYPE_DEF *type;
1905 void *freemem= NULL;
1906 char *cass_data;
1907 int cass_data_len;
1908
1909 DBUG_PRINT("info", ("field %*s", (int)names[i].length, names[i].str));
1910 type= get_cassandra_field_def(names[i].str, (int) names[i].length);
1911 if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len,
1912 buff, &freemem))
1913 {
1914 my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1915 names[i].str, insert_lineno);
1916 DBUG_RETURN(HA_ERR_GENERIC);
1917 }
1918 se->add_insert_column(names[i].str, names[i].length,
1919 cass_data, cass_data_len);
1920 if (freemem)
1921 my_free(freemem);
1922 }
1923 DBUG_RETURN(0);
1924 }
1925
free_dynamic_row(DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names)1926 void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
1927 LEX_STRING **names)
1928 {
1929 mariadb_dyncol_unpack_free(*names, *vals);
1930 *vals= 0;
1931 *names= 0;
1932 }
1933
/*
  Insert one row into Cassandra.

  The PK field is converted into the rowkey; every other field becomes
  an insert column, and the dynamic-column blob (if the table has one)
  is expanded into individual Cassandra columns. The row is sent
  immediately unless a bulk insert is in progress, in which case rows
  are flushed every insert_batch_size rows.

  @return 0, HA_ERR_INTERNAL_ERROR, or an error from write_dynamic_row()
*/
int ha_cassandra::write_row(const uchar *buf)
{
  MY_BITMAP *old_map;
  int ires;
  DBUG_ENTER("ha_cassandra::write_row");

  if (!se && (ires= connect_and_check_options(table)))
    DBUG_RETURN(ires);

  if (!doing_insert_batch)
    se->clear_insert_buffer();

  /* Converters read field values, so all columns must be in the read set */
  old_map= dbug_tmp_use_all_columns(table, &table->read_set);

  insert_lineno++;

  /* Convert the key */
  char *cass_key;
  int cass_key_len;
  if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
  {
    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
             rowkey_converter->field->field_name.str, insert_lineno);
    dbug_tmp_restore_column_map(&table->read_set, old_map);
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
  }
  se->start_row_insert(cass_key, cass_key_len);

  /* Convert other fields */
  for (uint i= 1; i < table->s->fields; i++)
  {
    char *cass_data;
    int cass_data_len;
    if (dyncol_set && dyncol_field == i)
    {
      /* Expand the dyncol blob into individual Cassandra columns */
      String valcol;
      DYNAMIC_COLUMN_VALUE *vals;
      LEX_STRING *names;
      uint count;
      int rc;
      DBUG_ASSERT(field_converters[i] == NULL);
      if (!(rc= read_dyncol(&count, &vals, &names, &valcol)))
        rc= write_dynamic_row(count, vals, names);
      /* NOTE(review): if read_dyncol() fails before setting vals/names,
         this frees potentially unset pointers -- confirm unpack behavior */
      free_dynamic_row(&vals, &names);
      if (rc)
      {
        dbug_tmp_restore_column_map(&table->read_set, old_map);
        DBUG_RETURN(rc);
      }
    }
    else
    {
      if (field_converters[i]->mariadb_to_cassandra(&cass_data,
                                                    &cass_data_len))
      {
        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
                 field_converters[i]->field->field_name.str, insert_lineno);
        dbug_tmp_restore_column_map(&table->read_set, old_map);
        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
      }
      se->add_insert_column(field_converters[i]->field->field_name.str, 0,
                            cass_data, cass_data_len);
    }
  }

  dbug_tmp_restore_column_map(&table->read_set, old_map);

  bool res;

  if (doing_insert_batch)
  {
    /* Accumulate; flush once the batch threshold is reached */
    res= 0;
    if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
    {
      res= se->do_insert();
      insert_rows_batched= 0;
    }
  }
  else
    res= se->do_insert();

  if (res)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());

  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
}
2020
2021
start_bulk_insert(ha_rows rows,uint flags)2022 void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
2023 {
2024 int ires;
2025 if (!se && (ires= connect_and_check_options(table)))
2026 return;
2027
2028 doing_insert_batch= true;
2029 insert_rows_batched= 0;
2030
2031 se->clear_insert_buffer();
2032 }
2033
2034
end_bulk_insert()2035 int ha_cassandra::end_bulk_insert()
2036 {
2037 DBUG_ENTER("ha_cassandra::end_bulk_insert");
2038
2039 if (!doing_insert_batch)
2040 {
2041 /* SQL layer can make end_bulk_insert call without start_bulk_insert call */
2042 DBUG_RETURN(0);
2043 }
2044
2045 /* Flush out the insert buffer */
2046 doing_insert_batch= false;
2047 bool bres= se->do_insert();
2048 se->clear_insert_buffer();
2049
2050 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2051 }
2052
2053
rnd_init(bool scan)2054 int ha_cassandra::rnd_init(bool scan)
2055 {
2056 bool bres;
2057 int ires;
2058 DBUG_ENTER("ha_cassandra::rnd_init");
2059
2060 if (!se && (ires= connect_and_check_options(table)))
2061 DBUG_RETURN(ires);
2062
2063 if (!scan)
2064 {
2065 /* Prepare for rnd_pos() calls. We don't need to anything. */
2066 DBUG_RETURN(0);
2067 }
2068
2069 if (dyncol_set)
2070 {
2071 se->clear_read_all_columns();
2072 }
2073 else
2074 {
2075 se->clear_read_columns();
2076 for (uint i= 1; i < table->s->fields; i++)
2077 se->add_read_column(table->field[i]->field_name.str);
2078 }
2079
2080 se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
2081 bres= se->get_range_slices(false);
2082 if (bres)
2083 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2084
2085 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2086 }
2087
2088
rnd_end()2089 int ha_cassandra::rnd_end()
2090 {
2091 DBUG_ENTER("ha_cassandra::rnd_end");
2092
2093 se->finish_reading_range_slices();
2094 DBUG_RETURN(0);
2095 }
2096
2097
rnd_next(uchar * buf)2098 int ha_cassandra::rnd_next(uchar *buf)
2099 {
2100 int rc;
2101 bool reached_eof;
2102 DBUG_ENTER("ha_cassandra::rnd_next");
2103
2104 // Unpack and return the next record.
2105 if (se->get_next_range_slice_row(&reached_eof))
2106 {
2107 rc= HA_ERR_INTERNAL_ERROR;
2108 }
2109 else
2110 {
2111 if (reached_eof)
2112 rc= HA_ERR_END_OF_FILE;
2113 else
2114 rc= read_cassandra_columns(true);
2115 }
2116
2117 DBUG_RETURN(rc);
2118 }
2119
2120
delete_all_rows()2121 int ha_cassandra::delete_all_rows()
2122 {
2123 bool bres;
2124 int ires;
2125 DBUG_ENTER("ha_cassandra::delete_all_rows");
2126
2127 if (!se && (ires= connect_and_check_options(table)))
2128 DBUG_RETURN(ires);
2129
2130 bres= se->truncate();
2131
2132 if (bres)
2133 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2134
2135 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2136 }
2137
2138
delete_row(const uchar * buf)2139 int ha_cassandra::delete_row(const uchar *buf)
2140 {
2141 bool bres;
2142 DBUG_ENTER("ha_cassandra::delete_row");
2143
2144 bres= se->remove_row();
2145
2146 if (bres)
2147 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2148
2149 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2150 }
2151
2152
info(uint flag)2153 int ha_cassandra::info(uint flag)
2154 {
2155 DBUG_ENTER("ha_cassandra::info");
2156
2157 if (!table)
2158 return 1;
2159
2160 if (flag & HA_STATUS_VARIABLE)
2161 {
2162 stats.records= 1000;
2163 stats.deleted= 0;
2164 }
2165 if (flag & HA_STATUS_CONST)
2166 {
2167 ref_length= table->field[0]->key_length();
2168 }
2169
2170 DBUG_RETURN(0);
2171 }
2172
2173
2174 void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info,
2175 uint key_length, bool with_zerofill);
2176
2177
position(const uchar * record)2178 void ha_cassandra::position(const uchar *record)
2179 {
2180 DBUG_ENTER("ha_cassandra::position");
2181
2182 /* Copy the primary key to rowid */
2183 key_copy(ref, (uchar*)record, &table->key_info[0],
2184 table->field[0]->key_length(), true);
2185
2186 DBUG_VOID_RETURN;
2187 }
2188
2189
rnd_pos(uchar * buf,uchar * pos)2190 int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
2191 {
2192 int rc;
2193 DBUG_ENTER("ha_cassandra::rnd_pos");
2194
2195 int save_active_index= active_index;
2196 active_index= 0; /* The primary key */
2197 rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
2198
2199 active_index= save_active_index;
2200
2201 DBUG_RETURN(rc);
2202 }
2203
2204
reset()2205 int ha_cassandra::reset()
2206 {
2207 doing_insert_batch= false;
2208 insert_lineno= 0;
2209 if (se)
2210 {
2211 se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
2212 THDVAR(table->in_use, write_consistency));
2213 se->set_n_retries(THDVAR(table->in_use, failure_retries));
2214 }
2215 return 0;
2216 }
2217
2218 /////////////////////////////////////////////////////////////////////////////
2219 // MRR implementation
2220 /////////////////////////////////////////////////////////////////////////////
2221
2222
2223 /*
2224 - The key can be only primary key
2225 - allow equality-ranges only.
2226 - anything else?
2227 */
/*
  MRR cost/feasibility estimate for known (const) ranges.
  Returning HA_POS_ERROR tells the optimizer this engine's MRR
  implementation cannot handle const ranges.
*/
ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
                                                  void *seq_init_param,
                                                  uint n_ranges, uint *bufsz,
                                                  uint *flags, Cost_estimate *cost)
{
  /* No support for const ranges so far */
  return HA_POS_ERROR;
}
2236
2237
/*
  MRR estimate for a given number of equality lookups on the primary
  key. Switches off the default MRR implementation and announces that
  no range association is kept. The returned row estimate (10) is a
  fixed guess.
*/
ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
                                            uint key_parts, uint *bufsz,
                                            uint *flags, Cost_estimate *cost)
{
  /* Can only be equality lookups on the primary key... */
  // TODO anything else?
  *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
  *flags |= HA_MRR_NO_ASSOCIATION;

  return 10;
}
2249
2250
multi_range_read_init(RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint mode,HANDLER_BUFFER * buf)2251 int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
2252 uint n_ranges, uint mode, HANDLER_BUFFER *buf)
2253 {
2254 int res;
2255 mrr_iter= seq->init(seq_init_param, n_ranges, mode);
2256 mrr_funcs= *seq;
2257 res= mrr_start_read();
2258 return (res? HA_ERR_INTERNAL_ERROR: 0);
2259 }
2260
2261
/*
  Collect a batch of lookup keys from the MRR range sequence and issue a
  multiget_slice for them.

  Sets source_exhausted=true when the range sequence has no more ranges;
  otherwise stops early once the accumulated key count exceeds the
  multiget_batch_size session variable (remaining ranges are fetched by
  later calls from multi_range_read_next()).

  @return the result of se->multiget_slice() (true on failure —
          TODO confirm against Cassandra_se_interface).
*/
bool ha_cassandra::mrr_start_read()
{
  uint key_len;

  /* Allow writing all fields while we materialize key values in record[0]. */
  MY_BITMAP *old_map;
  old_map= dbug_tmp_use_all_columns(table, &table->read_set);

  se->new_lookup_keys();

  while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
  {
    char *cass_key;
    int cass_key_len;

    /* Only equality ranges on the primary key are supported. */
    DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);

    uchar *key= (uchar*)mrr_cur_range.start_key.key;
    key_len= mrr_cur_range.start_key.length;
    //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
    /* Unpack the key into field 0, then convert it to Cassandra form. */
    store_key_image_to_rec(table->field[0], (uchar*)key, key_len);

    rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);

    // Primitive buffer control
    if ((ulong) se->add_lookup_key(cass_key, cass_key_len) >
        THDVAR(table->in_use, multiget_batch_size))
      break;
  }

  dbug_tmp_restore_column_map(&table->read_set, old_map);

  return se->multiget_slice();
}
2295
2296
/*
  Return the next row of the MRR scan.

  Reads from the current multiget result set; when it is drained, either
  reports end-of-file (if the range sequence is exhausted) or refills
  the buffer with another mrr_start_read() batch and retries.

  @return 0 on success, HA_ERR_END_OF_FILE at the end of the scan,
          HA_ERR_INTERNAL_ERROR on a refill failure.
*/
int ha_cassandra::multi_range_read_next(range_id_t *range_info)
{
  int res;
  while(1)
  {
    if (!se->get_next_multiget_row())
    {
      /* Got a row from the current batch: unpack it. */
      res= read_cassandra_columns(true);
      break;
    }
    else
    {
      if (source_exhausted)
      {
        /* Current batch drained and no more ranges: scan is done. */
        res= HA_ERR_END_OF_FILE;
        break;
      }
      else
      {
        /* Batch drained but more ranges remain: fetch the next batch. */
        if (mrr_start_read())
        {
          res= HA_ERR_INTERNAL_ERROR;
          break;
        }
      }
    }
    /*
      We get here if we've refilled the buffer and done another read. Try
      reading from results again
    */
  }
  return res;
}
2330
2331
multi_range_read_explain_info(uint mrr_mode,char * str,size_t size)2332 int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
2333 {
2334 const char *mrr_str= "multiget_slice";
2335
2336 if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
2337 {
2338 uint mrr_str_len= strlen(mrr_str);
2339 uint copy_len= MY_MIN(mrr_str_len, size);
2340 memcpy(str, mrr_str, size);
2341 return copy_len;
2342 }
2343 return 0;
2344 }
2345
2346
/*
  Enumerates the non-rowkey column names of the handler's table (fields
  1..N-1). Used by update_row() when a row deletion must name every
  column to be removed.
*/
class Column_name_enumerator_impl : public Column_name_enumerator
{
  ha_cassandra *obj;
  uint idx;  // next field index to return; starts at 1 to skip the rowkey
public:
  Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
  /* Return the next column name, or NULL when all fields are exhausted. */
  const char* get_next_name()
  {
    if (idx == obj->table->s->fields)
      return NULL;
    else
      return obj->table->field[idx++]->field_name.str;
  }
};
2361
2362
update_row(const uchar * old_data,const uchar * new_data)2363 int ha_cassandra::update_row(const uchar *old_data, const uchar *new_data)
2364 {
2365 DYNAMIC_COLUMN_VALUE *oldvals, *vals;
2366 LEX_STRING *oldnames, *names;
2367 uint oldcount, count;
2368 String oldvalcol, valcol;
2369 MY_BITMAP *old_map;
2370 int res;
2371 DBUG_ENTER("ha_cassandra::update_row");
2372 /* Currently, it is guaranteed that new_data == table->record[0] */
2373 DBUG_ASSERT(new_data == table->record[0]);
2374 /* For now, just rewrite the full record */
2375 se->clear_insert_buffer();
2376
2377 old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2378
2379 char *old_key;
2380 int old_key_len;
2381 se->get_read_rowkey(&old_key, &old_key_len);
2382
2383 /* Get the key we're going to write */
2384 char *new_key;
2385 int new_key_len;
2386 if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
2387 {
2388 my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2389 rowkey_converter->field->field_name.str, insert_lineno);
2390 dbug_tmp_restore_column_map(&table->read_set, old_map);
2391 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2392 }
2393
2394 /*
2395 Compare it to the key we've read. For all types that Cassandra supports,
2396 binary byte-wise comparison can be used
2397 */
2398 bool new_primary_key;
2399 if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
2400 new_primary_key= true;
2401 else
2402 new_primary_key= false;
2403
2404 if (dyncol_set)
2405 {
2406 Field *field= table->field[dyncol_field];
2407 /* move to get old_data */
2408 my_ptrdiff_t diff;
2409 diff= (my_ptrdiff_t) (old_data - new_data);
2410 field->move_field_offset(diff); // Points now at old_data
2411 if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol)))
2412 DBUG_RETURN(res);
2413 field->move_field_offset(-diff); // back to new_data
2414 if ((res= read_dyncol(&count, &vals, &names, &valcol)))
2415 {
2416 free_dynamic_row(&oldvals, &oldnames);
2417 DBUG_RETURN(res);
2418 }
2419 }
2420
2421 if (new_primary_key)
2422 {
2423 /*
2424 Primary key value changed. This is essentially a DELETE + INSERT.
2425 Add a DELETE operation into the batch
2426 */
2427 Column_name_enumerator_impl name_enumerator(this);
2428 se->add_row_deletion(old_key, old_key_len, &name_enumerator,
2429 oldnames,
2430 (dyncol_set ? oldcount : 0));
2431 oldcount= 0; // they will be deleted
2432 }
2433
2434 se->start_row_insert(new_key, new_key_len);
2435
2436 /* Convert other fields */
2437 for (uint i= 1; i < table->s->fields; i++)
2438 {
2439 char *cass_data;
2440 int cass_data_len;
2441 if (dyncol_set && dyncol_field == i)
2442 {
2443 DBUG_ASSERT(field_converters[i] == NULL);
2444 if ((res= write_dynamic_row(count, vals, names)))
2445 goto err;
2446 }
2447 else
2448 {
2449 if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
2450 {
2451 my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2452 field_converters[i]->field->field_name.str, insert_lineno);
2453 dbug_tmp_restore_column_map(&table->read_set, old_map);
2454 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2455 }
2456 se->add_insert_column(field_converters[i]->field->field_name.str, 0,
2457 cass_data, cass_data_len);
2458 }
2459 }
2460 if (dyncol_set)
2461 {
2462 /* find removed fields */
2463 uint i= 0, j= 0;
2464 /* both array are sorted */
2465 for(; i < oldcount; i++)
2466 {
2467 int scmp= 0;
2468 while (j < count &&
2469 (scmp = mariadb_dyncol_column_cmp_named(names + j,
2470 oldnames + i)) < 0)
2471 j++;
2472 if (j < count &&
2473 scmp == 0)
2474 j++;
2475 else
2476 se->add_insert_delete_column(oldnames[i].str, oldnames[i].length);
2477 }
2478 }
2479
2480 dbug_tmp_restore_column_map(&table->read_set, old_map);
2481
2482 res= se->do_insert();
2483
2484 if (res)
2485 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2486
2487 err:
2488 if (dyncol_set)
2489 {
2490 free_dynamic_row(&oldvals, &oldnames);
2491 free_dynamic_row(&vals, &names);
2492 }
2493
2494 DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2495 }
2496
2497
2498 /*
2499 We can't really have any locks for Cassandra Storage Engine. We're reading
2500 from Cassandra cluster, and other clients can asynchronously modify the data.
2501
2502 We can enforce locking within this process, but this will not be useful.
2503
2504 Thus, store_lock() should express that:
2505 - Writes do not block other writes
2506 - Reads should not block anything either, including INSERTs.
2507 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)2508 THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
2509 THR_LOCK_DATA **to,
2510 enum thr_lock_type lock_type)
2511 {
2512 DBUG_ENTER("ha_cassandra::store_lock");
2513 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
2514 {
2515 /* Writes allow other writes */
2516 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
2517 lock_type <= TL_WRITE))
2518 lock_type = TL_WRITE_ALLOW_WRITE;
2519
2520 /* Reads allow everything, including INSERTs */
2521 if (lock_type == TL_READ_NO_INSERT)
2522 lock_type = TL_READ;
2523
2524 lock.type= lock_type;
2525 }
2526 *to++= &lock;
2527 DBUG_RETURN(to);
2528 }
2529
2530
/*
  Estimate rows in a key range. Range scans are not supported by this
  engine, so HA_POS_ERROR is returned unconditionally.
*/
ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
                                       key_range *max_key)
{
  DBUG_ENTER("ha_cassandra::records_in_range");
  DBUG_RETURN(HA_POS_ERROR); /* Range scans are not supported */
}
2537
2538
2539 /**
2540 check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
2541 if new and old definition are compatible
2542
2543 @details If there are no other explicit signs like changed number of
2544 fields this function will be called by compare_tables()
2545 (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
2546 file.
2547
2548 */
2549
/*
  ALTER TABLE compatibility hook: always claims the data is compatible,
  so a table rebuild is never forced from here.
*/
bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
                                              uint table_changes)
{
  DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
  /* Checked, we intend to have this empty for Cassandra SE. */
  DBUG_RETURN(COMPATIBLE_DATA_YES);
}
2557
2558
print_error(const char * format,...)2559 void Cassandra_se_interface::print_error(const char *format, ...)
2560 {
2561 va_list ap;
2562 va_start(ap, format);
2563 // it's not a problem if output was truncated
2564 my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
2565 va_end(ap);
2566 }
2567
2568
/* Storage engine descriptor handed to the plugin API below. */
struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
2571
/*
  Status variables exported via SHOW STATUS (prefixed "Cassandra_").
  They expose the global cassandra_counters object.
*/
static SHOW_VAR cassandra_status_variables[]= {
  /* Insert-path counters */
  {"row_inserts",
    (char*) &cassandra_counters.row_inserts, SHOW_LONG},
  {"row_insert_batches",
    (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},

  /* Multiget (MRR) read counters */
  {"multiget_keys_scanned",
    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
  {"multiget_reads",
    (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
  {"multiget_rows_read",
    (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},

  /* Thrift exception counters */
  {"network_exceptions",
    (char*) &cassandra_counters.network_exceptions, SHOW_LONG},
  {"timeout_exceptions",
    (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
  {"unavailable_exceptions",
    (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
  {NullS, NullS, SHOW_LONG}  /* terminator */
};
2593
2594
2595
2596
/* MariaDB plugin declaration for the Cassandra storage engine. */
maria_declare_plugin(cassandra)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &cassandra_storage_engine,
  "CASSANDRA",
  "Monty Program Ab",
  "Cassandra storage engine",
  PLUGIN_LICENSE_GPL,
  cassandra_init_func,                          /* Plugin Init */
  cassandra_done_func,                          /* Plugin Deinit */
  0x0001,                                       /* version number (0.1) */
  cassandra_status_variables,                   /* status variables */
  cassandra_system_variables,                   /* system variables */
  "0.1",                                        /* string version */
  MariaDB_PLUGIN_MATURITY_EXPERIMENTAL          /* maturity */
}
maria_declare_plugin_end;
2614