1 /*
2 Copyright (c) 2012, 2020, MariaDB Corporation.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation // gcc: Class implementation
19 #endif
20
21 #include <my_config.h>
22 #include <mysql/plugin.h>
23 #include "ha_cassandra.h"
24 #include "sql_class.h"
25
/* Initial/steady-state sizes for dynamic-column value and name arrays */
#define DYNCOL_USUAL 20
#define DYNCOL_DELTA 100
/* Initial/increment sizes for the dynamic-column record buffer */
#define DYNCOL_USUAL_REC 1024
#define DYNCOL_DELTA_REC 1024

/* Handler factory, registered on cassandra_hton in cassandra_init_func() */
static handler *cassandra_create_handler(handlerton *hton,
                                         TABLE_SHARE *table,
                                         MEM_ROOT *mem_root);

/* Maps an enum_dyncol_func_result code to a user-visible error (defined elsewhere) */
extern int dynamic_column_error_message(enum_dyncol_func_result rc);

/* The Cassandra storage engine's handlerton, filled in at plugin init */
handlerton *cassandra_hton;


/*
  Hash used to track the number of open tables; variable for example share
  methods
*/
static HASH cassandra_open_tables;

/* The mutex used to init the hash; variable for example share methods */
mysql_mutex_t cassandra_mutex;
49
50 /**
51 Structure for CREATE TABLE options (table options).
52 It needs to be called ha_table_option_struct.
53
54 The option values can be specified in the CREATE TABLE at the end:
55 CREATE TABLE ( ... ) *here*
56 */
57
/*
  Per-table CREATE TABLE options. Field names must match the option names
  registered in cassandra_table_option_list below; the server fills this
  struct in from the table's option clause.
*/
struct ha_table_option_struct
{
  const char *thrift_host;    /* Thrift endpoint; NULL => use @@cassandra_default_thrift_host */
  int thrift_port;            /* Thrift TCP port (default 9160, see option list) */
  const char *keyspace;       /* Cassandra keyspace the table lives in (required) */
  const char *column_family;  /* Cassandra column family backing the table (required) */
};
65
66
/*
  Registration of the table-level options above. Attached to the
  handlerton in cassandra_init_func().
*/
ha_create_table_option cassandra_table_option_list[]=
{
  /*
    one option that takes an arbitrary string
  */
  HA_TOPTION_STRING("thrift_host", thrift_host),
  /* port range limited to valid TCP ports; 9160 is Thrift's usual default */
  HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
  HA_TOPTION_STRING("keyspace", keyspace),
  HA_TOPTION_STRING("column_family", column_family),
  HA_TOPTION_END
};
78
79 /**
80 Structure for CREATE TABLE options (field options).
81 */
82
/*
  Per-field CREATE TABLE options (see cassandra_field_option_list below).
*/
struct ha_field_option_struct
{
  /* true when this column collects all otherwise-unmapped Cassandra
     columns as a dynamic-columns blob */
  bool dyncol_field;
};
87
ha_create_table_option cassandra_field_option_list[]=
{
  /*
    Collect all other columns as dynamic here,
    the valid values are YES/NO, ON/OFF, 1/0.
    The default is 0, that is false, no, off.
  */
  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
  HA_FOPTION_END
};
98
/* Per-session: rows sent to Cassandra per INSERT batch */
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an INSERT batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

/* Per-session: rows requested per multiget call during MRR scans */
static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in a multiget(MRR) batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

/* Per-session: rows fetched per batch during a full table scan */
static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an rnd_read (full scan) batch",
  NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

/*
  Per-session: retry count for Cassandra calls that failed transiently.
  NOTE(review): the help text says "The default, 0, means not to retry",
  but the declared default is 3 and the minimum is 1, so 0 cannot even be
  set. The text and the limits disagree -- confirm which is intended.
*/
static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
  "Number of times to retry Cassandra calls that failed due to timeouts or "
  "network communication problems. The default, 0, means not to retry.",
  NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
115
/* These match values in enum_cassandra_consistency_level */
const char *cassandra_consistency_level[] =
{
  "ONE",
  "QUORUM",
  "LOCAL_QUORUM",
  "EACH_QUORUM",
  "ALL",
  "ANY",
  "TWO",
  "THREE",
  NullS            /* terminator required by TYPELIB */
};

/* TYPELIB wrapper so the names above can back the ENUM sysvars below */
TYPELIB cassandra_consistency_level_typelib= {
  array_elements(cassandra_consistency_level) - 1, "",
  cassandra_consistency_level, NULL
};
134
135
/* Per-session write consistency level; defaults to ONE */
static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for write operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);

/* Per-session read consistency level; defaults to ONE */
static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for read operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);
143
144
/* Protects the two variables below; updates go through
   cassandra_default_thrift_host_update() */
mysql_mutex_t cassandra_default_host_lock;
/* Current value of @@cassandra_default_thrift_host; NULL when unset.
   When set it points into cassandra_default_host_buf. */
static char* cassandra_default_thrift_host = NULL;
static char cassandra_default_host_buf[256]="";
148
149 static void
cassandra_default_thrift_host_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)150 cassandra_default_thrift_host_update(THD *thd,
151 struct st_mysql_sys_var* var,
152 void* var_ptr, /*!< out: where the
153 formal string goes */
154 const void* save) /*!< in: immediate result
155 from check function */
156 {
157 const char *new_host= *((char**)save);
158 const size_t max_len= sizeof(cassandra_default_host_buf);
159
160 mysql_mutex_lock(&cassandra_default_host_lock);
161
162 if (new_host)
163 {
164 strncpy(cassandra_default_host_buf, new_host, max_len-1);
165 cassandra_default_host_buf[max_len-1]= 0;
166 cassandra_default_thrift_host= cassandra_default_host_buf;
167 }
168 else
169 {
170 cassandra_default_host_buf[0]= 0;
171 cassandra_default_thrift_host= NULL;
172 }
173
174 *((const char**)var_ptr)= cassandra_default_thrift_host;
175
176 mysql_mutex_unlock(&cassandra_default_host_lock);
177 }
178
179
/* Global @@cassandra_default_thrift_host; all updates funnel through the
   callback above so the value lives in the static buffer */
static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
                        PLUGIN_VAR_RQCMDARG,
                        "Default host for Cassandra thrift connections",
                        /*check*/NULL,
                        cassandra_default_thrift_host_update,
                        /*default*/NULL);
186
/* All system variables exported by the plugin (NULL-terminated) */
static struct st_mysql_sys_var* cassandra_system_variables[]= {
  MYSQL_SYSVAR(insert_batch_size),
  MYSQL_SYSVAR(multiget_batch_size),
  MYSQL_SYSVAR(rnd_batch_size),

  MYSQL_SYSVAR(default_thrift_host),
  MYSQL_SYSVAR(write_consistency),
  MYSQL_SYSVAR(read_consistency),
  MYSQL_SYSVAR(failure_retries),
  NULL
};
198
/* Global status counters shown via SHOW STATUS (type declared in ha_cassandra.h) */
Cassandra_status_vars cassandra_counters;
200
201 /**
202 @brief
203 Function we use in the creation of our hash to get key.
204 */
205
cassandra_get_key(CASSANDRA_SHARE * share,size_t * length,my_bool not_used)206 static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
207 my_bool not_used __attribute__((unused)))
208 {
209 *length=share->table_name_length;
210 return (uchar*) share->table_name;
211 }
212
#ifdef HAVE_PSI_INTERFACE
/* Performance-schema instrumentation keys for the two engine mutexes */
static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;

static PSI_mutex_info all_cassandra_mutexes[]=
{
  { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
  { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
};

/* Register the mutex keys above with the performance schema (no-op when
   PSI is not available at runtime) */
static void init_cassandra_psi_keys()
{
  const char* category= "cassandra";
  int count;

  if (PSI_server == NULL)
    return;

  count= array_elements(all_cassandra_mutexes);
  PSI_server->register_mutex(category, all_cassandra_mutexes, count);
}
#endif
234
/**
  @brief
  Plugin initialization: registers PSI keys, initializes the open-tables
  hash and the engine mutexes, and fills in the handlerton callbacks.

  @param p  the handlerton allocated by the server
  @return 0 on success (this function cannot fail)
*/
static int cassandra_init_func(void *p)
{
  DBUG_ENTER("cassandra_init_func");

#ifdef HAVE_PSI_INTERFACE
  init_cassandra_psi_keys();
#endif

  cassandra_hton= (handlerton *)p;
  /* NOTE(review): ex_key_mutex_example is declared only under
     HAVE_PSI_INTERFACE; presumably mysql_mutex_init discards the key
     argument in non-PSI builds -- confirm against such a build. */
  mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
  (void) my_hash_init(PSI_INSTRUMENT_ME, &cassandra_open_tables,system_charset_info,32,0,0,
                      (my_hash_get_key) cassandra_get_key,0,0);

  cassandra_hton->create= cassandra_create_handler;
  /*
    Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
    TABLE to create an *empty* table from scratch. Cassandra table won't be
    emptied if re-created.
  */
  cassandra_hton->flags= 0;
  cassandra_hton->table_options= cassandra_table_option_list;
  cassandra_hton->field_options= cassandra_field_option_list;

  mysql_mutex_init(0 /* no instrumentation */,
                   &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);

  DBUG_RETURN(0);
}
263
264
/**
  @brief
  Plugin deinitialization: frees the open-tables hash and destroys the
  engine mutexes.

  @return 1 if some tables were still open (shares still registered),
          0 otherwise
*/
static int cassandra_done_func(void *p)
{
  int error= 0;
  DBUG_ENTER("cassandra_done_func");
  if (cassandra_open_tables.records)
    error= 1;
  my_hash_free(&cassandra_open_tables);
  mysql_mutex_destroy(&cassandra_mutex);
  mysql_mutex_destroy(&cassandra_default_host_lock);
  DBUG_RETURN(error);
}
276
277
278 /**
279 @brief
280 Example of simple lock controls. The "share" it creates is a
281 structure we will pass to each cassandra handler. Do you have to have
282 one of these? Well, you have pieces that are used for locking, and
283 they are needed to function.
284 */
285
get_share(const char * table_name,TABLE * table)286 static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
287 {
288 CASSANDRA_SHARE *share;
289 uint length;
290 char *tmp_name;
291
292 mysql_mutex_lock(&cassandra_mutex);
293 length=(uint) strlen(table_name);
294
295 if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
296 (uchar*) table_name,
297 length)))
298 {
299 if (!(share=(CASSANDRA_SHARE *)
300 my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), PSI_INSTRUMENT_ME,
301 &share, sizeof(*share),
302 &tmp_name, length+1,
303 NullS)))
304 {
305 mysql_mutex_unlock(&cassandra_mutex);
306 return NULL;
307 }
308
309 share->use_count=0;
310 share->table_name_length=length;
311 share->table_name=tmp_name;
312 strmov(share->table_name,table_name);
313 if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
314 goto error;
315 thr_lock_init(&share->lock);
316 mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
317 &share->mutex, MY_MUTEX_INIT_FAST);
318 }
319 share->use_count++;
320 mysql_mutex_unlock(&cassandra_mutex);
321
322 return share;
323
324 error:
325 mysql_mutex_destroy(&share->mutex);
326 my_free(share);
327
328 return NULL;
329 }
330
331
332 /**
333 @brief
334 Free lock controls. We call this whenever we close a table. If the table had
335 the last reference to the share, then we free memory associated with it.
336 */
337
free_share(CASSANDRA_SHARE * share)338 static int free_share(CASSANDRA_SHARE *share)
339 {
340 mysql_mutex_lock(&cassandra_mutex);
341 if (!--share->use_count)
342 {
343 my_hash_delete(&cassandra_open_tables, (uchar*) share);
344 thr_lock_delete(&share->lock);
345 mysql_mutex_destroy(&share->mutex);
346 my_free(share);
347 }
348 mysql_mutex_unlock(&cassandra_mutex);
349
350 return 0;
351 }
352
353
/* Handler factory registered on the handlerton: allocates an ha_cassandra
   instance on the supplied MEM_ROOT */
static handler* cassandra_create_handler(handlerton *hton,
                                         TABLE_SHARE *table,
                                         MEM_ROOT *mem_root)
{
  return new (mem_root) ha_cassandra(hton, table);
}
360
361
/* Constructor: all connection/converter state starts empty; the Cassandra
   session (se) is created lazily in connect_and_check_options() */
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
   se(NULL), field_converters(NULL),
   special_type_field_converters(NULL),
   special_type_field_names(NULL), n_special_type_fields(0),
   rowkey_converter(NULL),
   dyncol_field(0), dyncol_set(0)
{}
370
371
/**
  @brief
  Validate field/table options, connect to Cassandra, and build the
  per-field data converters.

  @return 0 on success; an option-check error code, or
          HA_ERR_NO_CONNECTION when connecting or converter setup fails.
          On failure 'se' may stay allocated; it is deleted in close().
*/
int ha_cassandra::connect_and_check_options(TABLE *table_arg)
{
  ha_table_option_struct *options= table_arg->s->option_struct;
  int res;
  DBUG_ENTER("ha_cassandra::connect_and_check_options");

  if ((res= check_field_options(table_arg->s->field)) ||
      (res= check_table_options(options)))
    DBUG_RETURN(res);

  se= create_cassandra_se();
  se->set_column_family(options->column_family);
  /* Fall back to the global default when the table option is absent */
  const char *thrift_host= options->thrift_host? options->thrift_host:
                           cassandra_default_thrift_host;
  if (se->connect(thrift_host, options->thrift_port, options->keyspace))
  {
    my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  if (setup_field_converters(table_arg->field, table_arg->s->fields))
  {
    DBUG_RETURN(HA_ERR_NO_CONNECTION);
  }

  DBUG_RETURN(0);
}
399
400
/**
  @brief
  Scan the field list for a DYNAMIC_COLUMN_STORAGE column.

  At most one such column is allowed and it must be a BLOB; on success
  records its index in dyncol_field, sets dyncol_set, and zeroes the
  dynamic-column work buffers.

  @return 0 on success, HA_WRONG_CREATE_OPTION on violation
*/
int ha_cassandra::check_field_options(Field **fields)
{
  Field **field;
  uint i;
  DBUG_ENTER("ha_cassandra::check_field_options");
  for (field= fields, i= 0; *field; field++, i++)
  {
    ha_field_option_struct *field_options= (*field)->option_struct;
    if (field_options && field_options->dyncol_field)
    {
      /* reject a second dyncol column or a non-BLOB one */
      if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
      {
        my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name.str);
        DBUG_RETURN(HA_WRONG_CREATE_OPTION);
      }
      dyncol_set= 1;
      dyncol_field= i;
      bzero(&dynamic_values, sizeof(dynamic_values));
      bzero(&dynamic_names, sizeof(dynamic_names));
      bzero(&dynamic_rec, sizeof(dynamic_rec));
    }
  }
  DBUG_RETURN(0);
}
425
426
/**
  @brief
  Open the table: acquire the shared lock structures and reset row stats.

  Deliberately does NOT connect to Cassandra here (see the commented-out
  call below); the connection is established lazily so that e.g.
  SHOW CREATE TABLE works when the server is unreachable.

  @return 0 on success, 1 when the share cannot be obtained
*/
int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
{
  DBUG_ENTER("ha_cassandra::open");

  if (!(share = get_share(name, table)))
    DBUG_RETURN(1);
  thr_lock_data_init(&share->lock,&lock,NULL);

  DBUG_ASSERT(!se);
  /*
    Don't do the following on open: it prevents SHOW CREATE TABLE when the server
    has gone away.
  */
  /*
  int res;
  if ((res= connect_and_check_options(table)))
  {
    DBUG_RETURN(res);
  }
  */

  info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
  insert_lineno= 0;

  DBUG_RETURN(0);
}
453
454
/**
  @brief
  Close the table: drop the Cassandra session, free converters and
  release our reference on the share.
*/
int ha_cassandra::close(void)
{
  DBUG_ENTER("ha_cassandra::close");
  delete se;
  se= NULL;
  free_field_converters();
  DBUG_RETURN(free_share(share));
}
463
464
check_table_options(ha_table_option_struct * options)465 int ha_cassandra::check_table_options(ha_table_option_struct *options)
466 {
467 if (!options->thrift_host && (!cassandra_default_thrift_host ||
468 !cassandra_default_thrift_host[0]))
469 {
470 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
471 "thrift_host table option must be specified, or "
472 "@@cassandra_default_thrift_host must be set");
473 return HA_WRONG_CREATE_OPTION;
474 }
475
476 if (!options->keyspace || !options->column_family)
477 {
478 my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
479 "keyspace and column_family table options must be specified");
480 return HA_WRONG_CREATE_OPTION;
481 }
482 return 0;
483 }
484
485
486 /**
487 @brief
488 create() is called to create a table. The variable name will have the name
489 of the table.
490
491 @details
492 When create() is called you do not need to worry about
493 opening the table. Also, the .frm file will have already been
494 created so adjusting create_info is not necessary. You can overwrite
495 the .frm file at this point if you wish to change the table
496 definition, but there are no methods currently provided for doing
497 so.
498
499 Called from handle.cc by ha_create_table().
500
501 @see
502 ha_create_table() in handle.cc
503 */
504
/**
  @brief
  create() is called to create a table. The variable name will have the name
  of the table.

  Requires a single-column PRIMARY KEY over the first column (that column
  maps to the Cassandra row key), then connects to verify the options.

  @return 0 on success, HA_WRONG_CREATE_OPTION or a connect error code
*/
int ha_cassandra::create(const char *name, TABLE *table_arg,
                         HA_CREATE_INFO *create_info)
{
  int res;
  DBUG_ENTER("ha_cassandra::create");

  /* exactly one key, it is the PK, one part, and it is the first column */
  if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
      table_arg->key_info[0].user_defined_key_parts != 1 ||
      table_arg->key_info[0].key_part[0].fieldnr != 1)
  {
    my_error(ER_WRONG_COLUMN_NAME, MYF(0),
             "Table must have PRIMARY KEY defined over the first column");
    DBUG_RETURN(HA_WRONG_CREATE_OPTION);
  }

  DBUG_ASSERT(!se);
  if ((res= connect_and_check_options(table_arg)))
    DBUG_RETURN(res);

  insert_lineno= 0;
  DBUG_RETURN(0);
}
527
528 /*
529 Mapping needs to
530 - copy value from MySQL record to Thrift buffer
531 - copy value from Thrift bufer to MySQL record..
532
533 */
534
535 /* Converter base */
/*
  Converter base: one instance per column, translating values between the
  MariaDB Field and Cassandra's byte representation.
*/
class ColumnDataConverter
{
public:
  /* Target/source MariaDB field; presumably assigned by
     setup_field_converters() -- set before any conversion call */
  Field *field;

  /* This will save Cassandra's data in the Field */
  virtual int cassandra_to_mariadb(const char *cass_data,
                                   int cass_data_len)=0;

  /*
    This will get data from the Field pointer, store Cassandra's form
    in internal buffer, and return pointer/size.

    @return
      false - OK
      true  - Failed to convert value (completely, there is no value to insert
              at all).
  */
  virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
  virtual ~ColumnDataConverter() {};
};
557
558
/* Cassandra 'double' <-> MariaDB DOUBLE.
   NOTE(review): no byte-swap is applied here, unlike the integer
   converters -- confirm the wire format for doubles. */
class DoubleDataConverter : public ColumnDataConverter
{
  double buf;   /* backing store for the pointer handed out below */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == sizeof(double));
    double *pdata= (double*) cass_data;
    field->store(*pdata);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(double);
    return false;
  }
  ~DoubleDataConverter(){}
};
580
581
/* Cassandra 'float' <-> MariaDB FLOAT; same shape as DoubleDataConverter
   but 4 bytes wide */
class FloatDataConverter : public ColumnDataConverter
{
  float buf;    /* backing store for the pointer handed out below */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == sizeof(float));
    float *pdata= (float*) cass_data;
    field->store(*pdata);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_real();
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(float);
    return false;
  }
  ~FloatDataConverter(){}
};
603
/*
  Reverse the byte order of an 8-byte value (endianness swap):
  to[i] = from[7-i]. 'from' and 'to' must not overlap.
*/
static void flip64(const char *from, char* to)
{
  for (int i= 0; i < 8; i++)
    to[i]= from[7 - i];
}
615
/* Cassandra 8-byte integer <-> MariaDB BIGINT. Regular bigints arrive in
   network byte order and are swapped; counter columns (flip == false)
   are used as-is. */
class BigintDataConverter : public ColumnDataConverter
{
  longlong buf;   /* backing store for the pointer handed out below */
  bool flip; /* is false when reading counter columns */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    longlong tmp;
    DBUG_ASSERT(cass_data_len == sizeof(longlong));
    if (flip)
      flip64(cass_data, (char*)&tmp);
    else
      memcpy(&tmp, cass_data, sizeof(longlong));
    field->store(tmp);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    longlong tmp= field->val_int();
    if (flip)
      flip64((const char*)&tmp, (char*)&buf);
    else
      memcpy(&buf, &tmp, sizeof(longlong));
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(longlong);
    return false;
  }
  BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
  ~BigintDataConverter(){}
};
647
/*
  Reverse the byte order of a 4-byte value (endianness swap):
  to[i] = from[3-i]. 'from' and 'to' must not overlap.
*/
static void flip32(const char *from, char* to)
{
  for (int i= 0; i < 4; i++)
    to[i]= from[3 - i];
}
655
656
/* Cassandra single-byte value <-> MariaDB TINYINT. On write the value is
   collapsed to 0/1 (used for boolean-like columns). */
class TinyintDataConverter : public ColumnDataConverter
{
  char buf;   /* backing store for the pointer handed out below */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len == 1);
    field->store(cass_data[0]);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    buf= field->val_int()? 1 : 0; /* TODO: error handling? */
    *cass_data= (char*)&buf;
    *cass_data_len= 1;
    return false;
  }
  ~TinyintDataConverter(){}
};
677
678
/* Cassandra 4-byte integer (network byte order) <-> MariaDB INT */
class Int32DataConverter : public ColumnDataConverter
{
  int32_t buf;    /* backing store for the pointer handed out below */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    int32_t tmp;
    DBUG_ASSERT(cass_data_len == sizeof(int32_t));
    flip32(cass_data, (char*)&tmp);
    field->store(tmp);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    /* NOTE(review): val_int() is silently narrowed to 32 bits here */
    int32_t tmp= field->val_int();
    flip32((const char*)&tmp, (char*)&buf);
    *cass_data= (char*)&buf;
    *cass_data_len=sizeof(int32_t);
    return false;
  }
  ~Int32DataConverter(){}
};
702
703
/* Byte-for-byte string converter; rejects Cassandra values longer than
   the MariaDB column can hold (max_length) */
class StringCopyConverter : public ColumnDataConverter
{
  String buf;           /* owns the value returned by mariadb_to_cassandra() */
  size_t max_length;    /* capacity limit of the target field */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    if ((size_t)cass_data_len > max_length)
      return 1;
    field->store(cass_data, cass_data_len,field->charset());
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    String *pstr= field->val_str(&buf);
    *cass_data= (char*)pstr->ptr();
    *cass_data_len= pstr->length();
    return false;
  }
  StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
  ~StringCopyConverter(){}
};
727
728
/* Cassandra timestamp (milliseconds since epoch, network byte order)
   <-> MariaDB TIMESTAMP (seconds + microseconds) */
class TimestampDataConverter : public ColumnDataConverter
{
  int64_t buf;    /* backing store for the pointer handed out below */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    /* Cassandra data is milliseconds-since-epoch in network byte order */
    int64_t tmp;
    DBUG_ASSERT(cass_data_len==8);
    flip64(cass_data, (char*)&tmp);
    /*
      store_TIME's arguments:
      - seconds since epoch
      - microsecond fraction of a second.
    */
    ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    my_time_t ts_time;
    ulong ts_microsec;
    int64_t tmp;
    ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec);

    /* Cassandra needs milliseconds-since-epoch */
    tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000;
    flip64((const char*)&tmp, (char*)&buf);

    *cass_data= (char*)&buf;
    *cass_data_len= 8;
    return false;
  }
  ~TimestampDataConverter(){}
};
765
766
767
/*
  Decode one hex character.
  @return 0..15 on success, -1 when c is not a hex digit.
*/
static int convert_hex_digit(const char c)
{
  if (c >= '0' && c <= '9')
    return c - '0';
  if (c >= 'A' && c <= 'F')
    return c - 'A' + 10;
  if (c >= 'a' && c <= 'f')
    return c - 'a' + 10;
  return -1; /* Couldn't convert */
}


/* Lowercase hex digits used when printing UUID values */
const char map2number[]="0123456789abcdef";

/*
  Format a 16-byte network-order UUID as the canonical 36-character
  8-4-4-4-12 hex string. str must have room for 37 bytes (incl. '\0').
*/
static void convert_uuid2string(char *str, const char *cass_data)
{
  char *out= str;
  for (unsigned int i= 0; i < 16; i++)
  {
    unsigned char byte= (unsigned char) cass_data[i];
    *out++= map2number[byte >> 4];
    *out++= map2number[byte & 0xF];
    /* dashes fall after byte groups of 4, 2, 2 and 2 */
    if (i == 3 || i == 5 || i == 7 || i == 9)
      *out++= '-';
  }
  *out= 0;
}

/*
  Parse a canonical 36-character UUID string into 16 binary bytes.
  @return false on success, true when a digit or dash is malformed.
*/
static bool convert_string2uuid(char *buf, const char *str)
{
  for (unsigned int i= 0; i < 16; i++)
  {
    int hi= convert_hex_digit(str[0]);
    int lo= convert_hex_digit(str[1]);
    if (hi == -1 || lo == -1)
      return true;
    buf[i]= (char) ((hi << 4) | lo);
    str+= 2;
    if (i == 3 || i == 5 || i == 7 || i == 9)
    {
      if (*str != '-')
        return true;
      str++;
    }
  }
  return false;
}
820
821
/* Cassandra 16-byte UUID <-> MariaDB 36-character string column */
class UuidDataConverter : public ColumnDataConverter
{
  char buf[16]; /* Binary UUID representation */
  String str_buf;   /* scratch for reading the field's string value */
public:
  int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
  {
    DBUG_ASSERT(cass_data_len==16);
    char str[37];
    convert_uuid2string(str, cass_data);
    field->store(str, 36,field->charset());
    return 0;
  }

  bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
  {
    String *uuid_str= field->val_str(&str_buf);

    /* canonical UUID text is exactly 36 characters */
    if (uuid_str->length() != 36)
      return true;

    if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
      return true;
    *cass_data= buf;
    *cass_data_len= 16;
    return false;
  }
  ~UuidDataConverter(){}
};
851
852 /**
853 Converting dynamic columns types to/from casandra types
854 */
855
856
857 /**
858 Check and initialize (if it is needed) string MEM_ROOT
859 */
/**
  Check and initialize (if it is needed) string MEM_ROOT
*/
static void alloc_strings_memroot(MEM_ROOT *mem_root)
{
  if (!alloc_root_inited(mem_root))
  {
    /*
      The mem_root used to allocate UUID (of length 36 + \0) so make
      appropriate allocated size
    */
    init_alloc_root(PSI_INSTRUMENT_ME, mem_root,
                    (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
                    ALLOC_ROOT_MIN_BLOCK_SIZE,
                    (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
                    ALLOC_ROOT_MIN_BLOCK_SIZE, MYF(MY_THREAD_SPECIFIC));
  }
}
875
free_strings_memroot(MEM_ROOT * mem_root)876 static void free_strings_memroot(MEM_ROOT *mem_root)
877 {
878 if (alloc_root_inited(mem_root))
879 free_root(mem_root, MYF(0));
880 }
881
cassandra_to_dyncol_intLong(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)882 bool cassandra_to_dyncol_intLong(const char *cass_data,
883 int cass_data_len __attribute__((unused)),
884 DYNAMIC_COLUMN_VALUE *value,
885 MEM_ROOT *mem_root __attribute__((unused)))
886 {
887 value->type= DYN_COL_INT;
888 #ifdef WORDS_BIGENDIAN
889 value->x.long_value= (longlong)*cass_data;
890 #else
891 flip64(cass_data, (char *)&value->x.long_value);
892 #endif
893 return 0;
894 }
895
/**
  Convert a dynamic-column value into a Cassandra 8-byte integer
  (network byte order) placed in the caller's buffer.

  NOTE(review): on little-endian hosts the swapped copy is written at
  buff + 8, so 'buff' must be at least 16 bytes -- confirm at call sites.

  @return false on success, true when the value cannot be read as a long
*/
bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  longlong *tmp= (longlong *) buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
#ifdef WORDS_BIGENDIAN
  *cass_data= (char *)buff;
#else
  flip64((char *)buff, (char *)buff + sizeof(longlong));
  *cass_data= (char *)buff + sizeof(longlong);
#endif
  *freemem= NULL;   /* nothing for the caller to free */
  return false;
}
915
/**
  Convert a Cassandra 4-byte integer (network byte order) into a
  dynamic-column INT value (sign-extended to longlong).

  @return 0 (never fails)
*/
bool cassandra_to_dyncol_intInt32(const char *cass_data,
                                  int cass_data_len __attribute__((unused)),
                                  DYNAMIC_COLUMN_VALUE *value,
                                  MEM_ROOT *mem_root __attribute__((unused)))
{
  int32 tmp;
  value->type= DYN_COL_INT;
#ifdef WORDS_BIGENDIAN
  tmp= *((int32 *)cass_data);
#else
  flip32(cass_data, (char *)&tmp);
#endif
  value->x.long_value= tmp;
  return 0;
}
931
932
/**
  Convert a dynamic-column value into a Cassandra 4-byte integer
  (network byte order) placed at the start of the caller's buffer.

  The longlong intermediate is stored at buff + 8, so 'buff' must hold at
  least sizeof(longlong) + sizeof(int32) bytes. Narrowing to 32 bits is
  silent.

  @return false on success, true when the value cannot be read as a long
*/
bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(int32);
  *cass_data= (char *)buff;
#ifdef WORDS_BIGENDIAN
  *((int32 *) buff) = (int32) *tmp;
#else
  {
    int32 tmp2= (int32) *tmp;
    flip32((char *)&tmp2, (char *)buff);
  }
#endif
  *freemem= NULL;   /* nothing for the caller to free */
  return false;
}
955
956
/**
  Convert a Cassandra counter column into a dynamic-column INT value.
  Counter values are not byte-swapped (consistent with
  BigintDataConverter's flip == false for counters).

  NOTE(review): this dereferences cass_data as longlong* directly --
  presumably the buffer is suitably aligned; confirm on strict-alignment
  platforms.

  @return 0 (never fails)
*/
bool cassandra_to_dyncol_intCounter(const char *cass_data,
                                    int cass_data_len __attribute__((unused)),
                                    DYNAMIC_COLUMN_VALUE *value,
                                    MEM_ROOT *mem_root __attribute__((unused)))
{
  value->type= DYN_COL_INT;
  value->x.long_value= *((longlong *)cass_data);
  return 0;
}
966
967
/**
  Convert a dynamic-column value into a Cassandra counter value (8 bytes,
  no byte swap) placed in the caller's buffer.

  @return false on success, true when the value cannot be read as a long
*/
bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
                                char **cass_data, int *cass_data_len,
                                void* buff, void **freemem)
{
  longlong *tmp= (longlong *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_long(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(longlong);
  *cass_data= (char *)buff;
  *freemem= NULL;   /* nothing for the caller to free */
  return false;
}
982
/**
  Convert a Cassandra 'float' into a dynamic-column DOUBLE value
  (widened from 4 bytes; no byte swap -- see DoubleDataConverter note).

  @return 0 (never fails)
*/
bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
                                     int cass_data_len __attribute__((unused)),
                                     DYNAMIC_COLUMN_VALUE *value,
                                     MEM_ROOT *mem_root __attribute__((unused)))
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((float *)cass_data);
  return 0;
}
992
/**
  Convert a dynamic-column value into a Cassandra 'float' (4 bytes,
  narrowed from double) placed in the caller's buffer.

  @return false on success, true when the value cannot be read as a double
*/
bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
                              char **cass_data, int *cass_data_len,
                              void* buff, void **freemem)
{
  double tmp;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(&tmp, value);
  if (rc < 0)
    return true;
  *((float *)buff)= (float) tmp;
  *cass_data_len= sizeof(float);
  *cass_data= (char *)buff;
  *freemem= NULL;   /* nothing for the caller to free */
  return false;
}
1008
/**
  Convert a Cassandra 'double' into a dynamic-column DOUBLE value
  (8 bytes, no byte swap -- see DoubleDataConverter note).

  @return 0 (never fails)
*/
bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
                                      int cass_data_len __attribute__((unused)),
                                      DYNAMIC_COLUMN_VALUE *value,
                                      MEM_ROOT *mem_root
                                      __attribute__((unused)))
{
  value->type= DYN_COL_DOUBLE;
  value->x.double_value= *((double *)cass_data);
  return 0;
}
1019
/**
  Convert a dynamic-column value into a Cassandra 'double' (8 bytes)
  placed in the caller's buffer.

  @return false on success, true when the value cannot be read as a double
*/
bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
                               char **cass_data, int *cass_data_len,
                               void* buff, void **freemem)
{
  double *tmp= (double *)buff;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_double(tmp, value);
  if (rc < 0)
    return true;
  *cass_data_len= sizeof(double);
  *cass_data= (char *)buff;
  *freemem= NULL;   /* nothing for the caller to free */
  return false;
}
1034
/**
  Wrap Cassandra string bytes as a dynamic-column STRING value in the
  given charset. No copy is made: the value points into cass_data, so the
  caller must keep that buffer alive while the value is in use.

  @return 0 (never fails)
*/
bool cassandra_to_dyncol_strStr(const char *cass_data,
                                int cass_data_len,
                                DYNAMIC_COLUMN_VALUE *value,
                                CHARSET_INFO *cs)
{
  value->type= DYN_COL_STRING;
  value->x.string.charset= cs;
  value->x.string.value.str= (char *)cass_data;
  value->x.string.value.length= cass_data_len;
  return 0;
}
1046
/**
  Convert a dynamic-column value into string bytes for Cassandra in the
  given charset. Allocates a DYNAMIC_STRING; on success *freemem receives
  the allocation and the CALLER must free it.

  @return 0 on success, 1 on allocation or conversion failure
*/
bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
                            char **cass_data, int *cass_data_len,
                            void* buff, void **freemem, CHARSET_INFO *cs)
{
  DYNAMIC_STRING tmp;
  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
    return 1;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_str(&tmp, value, cs, '\0');
  if (rc < 0)
  {
    dynstr_free(&tmp);
    return 1;
  }
  *cass_data_len= tmp.length;
  *(cass_data)= tmp.str;
  *freemem= tmp.str;    /* ownership transfers to the caller */
  return 0;
}
1066
cassandra_to_dyncol_strBytes(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1067 bool cassandra_to_dyncol_strBytes(const char *cass_data,
1068 int cass_data_len,
1069 DYNAMIC_COLUMN_VALUE *value,
1070 MEM_ROOT *mem_root __attribute__((unused)))
1071 {
1072 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1073 &my_charset_bin);
1074 }
1075
dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1076 bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
1077 char **cass_data, int *cass_data_len,
1078 void* buff, void **freemem)
1079 {
1080 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1081 buff, freemem, &my_charset_bin);
1082 }
1083
cassandra_to_dyncol_strAscii(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1084 bool cassandra_to_dyncol_strAscii(const char *cass_data,
1085 int cass_data_len,
1086 DYNAMIC_COLUMN_VALUE *value,
1087 MEM_ROOT *mem_root __attribute__((unused)))
1088 {
1089 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1090 &my_charset_latin1_bin);
1091 }
1092
dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1093 bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
1094 char **cass_data, int *cass_data_len,
1095 void* buff, void **freemem)
1096 {
1097 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1098 buff, freemem, &my_charset_latin1_bin);
1099 }
1100
cassandra_to_dyncol_strUTF8(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1101 bool cassandra_to_dyncol_strUTF8(const char *cass_data,
1102 int cass_data_len,
1103 DYNAMIC_COLUMN_VALUE *value,
1104 MEM_ROOT *mem_root __attribute__((unused)))
1105 {
1106 return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1107 &my_charset_utf8mb3_unicode_ci);
1108 }
1109
dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1110 bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
1111 char **cass_data, int *cass_data_len,
1112 void* buff, void **freemem)
1113 {
1114 return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1115 buff, freemem, &my_charset_utf8mb3_unicode_ci);
1116 }
1117
cassandra_to_dyncol_strUUID(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1118 bool cassandra_to_dyncol_strUUID(const char *cass_data,
1119 int cass_data_len,
1120 DYNAMIC_COLUMN_VALUE *value,
1121 MEM_ROOT *mem_root)
1122 {
1123 value->type= DYN_COL_STRING;
1124 value->x.string.charset= &my_charset_bin;
1125 alloc_strings_memroot(mem_root);
1126 value->x.string.value.str= (char *)alloc_root(mem_root, 37);
1127 if (!value->x.string.value.str)
1128 {
1129 value->x.string.value.length= 0;
1130 return 1;
1131 }
1132 convert_uuid2string(value->x.string.value.str, cass_data);
1133 value->x.string.value.length= 36;
1134 return 0;
1135 }
1136
/*
  Convert a dynamic-column value into Cassandra's UUID format.

  The value is rendered as text first; a well-formed UUID must be exactly
  36 characters.  convert_string2uuid() parses that text into binary form
  in 'buff', which also validates it.

  NOTE(review): on success *cass_data is pointed at the 36-char text
  (tmp.str) rather than the 16-byte binary that convert_string2uuid()
  wrote into 'buff', while cassandra_to_dyncol_strUUID does the reverse
  binary->text conversion.  Verify against the UUIDType wire format that
  sending the textual form is intended here.

  @return true on error (allocation failure, conversion failure, or
          malformed UUID text); the temporary string is freed in that case.
          On success the buffer is handed to the caller via *freemem.
*/
bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
                             char **cass_data, int *cass_data_len,
                             void* buff, void **freemem)
{
  DYNAMIC_STRING tmp;
  if (init_dynamic_string(&tmp, NULL, 1024, 1024))
    return true;
  enum enum_dyncol_func_result rc=
    mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0');
  if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
  {
    dynstr_free(&tmp);
    return true;
  }

  *cass_data_len= tmp.length;
  *(cass_data)= tmp.str;
  *freemem= tmp.str;
  return 0;
}
1157
cassandra_to_dyncol_intBool(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1158 bool cassandra_to_dyncol_intBool(const char *cass_data,
1159 int cass_data_len,
1160 DYNAMIC_COLUMN_VALUE *value,
1161 MEM_ROOT *mem_root __attribute__((unused)))
1162 {
1163 value->type= DYN_COL_INT;
1164 value->x.long_value= (cass_data[0] ? 1 : 0);
1165 return 0;
1166 }
1167
dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1168 bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
1169 char **cass_data, int *cass_data_len,
1170 void* buff, void **freemem)
1171 {
1172 longlong tmp;
1173 enum enum_dyncol_func_result rc=
1174 mariadb_dyncol_val_long(&tmp, value);
1175 if (rc < 0)
1176 return true;
1177 ((char *)buff)[0]= (tmp ? 1 : 0);
1178 *cass_data_len= 1;
1179 *(cass_data)= (char *)buff;
1180 *freemem= 0;
1181 return 0;
1182 }
1183
1184
/*
  Fully-qualified Cassandra validator (marshaller) class names as reported
  in column-family metadata.  get_cassandra_type() dispatches on characters
  of these strings, and map_field_to_validator() compares against them.
*/

/* Integer validators */
const char * const validator_bigint= "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int= "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";

/* Floating-point validators */
const char * const validator_float= "org.apache.cassandra.db.marshal.FloatType";
const char * const validator_double= "org.apache.cassandra.db.marshal.DoubleType";

/* String/blob validators */
const char * const validator_blob= "org.apache.cassandra.db.marshal.BytesType";
const char * const validator_ascii= "org.apache.cassandra.db.marshal.AsciiType";
const char * const validator_text= "org.apache.cassandra.db.marshal.UTF8Type";

const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";

const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";

const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";

/* VARINTs are stored as big-endian big numbers. */
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
1205
1206
1207 static CASSANDRA_TYPE_DEF cassandra_types[]=
1208 {
1209 {
1210 validator_bigint,
1211 &cassandra_to_dyncol_intLong,
1212 &dyncol_to_cassandraLong
1213 },
1214 {
1215 validator_int,
1216 &cassandra_to_dyncol_intInt32,
1217 &dyncol_to_cassandraInt32
1218 },
1219 {
1220 validator_counter,
1221 cassandra_to_dyncol_intCounter,
1222 &dyncol_to_cassandraCounter
1223 },
1224 {
1225 validator_float,
1226 &cassandra_to_dyncol_doubleFloat,
1227 &dyncol_to_cassandraFloat
1228 },
1229 {
1230 validator_double,
1231 &cassandra_to_dyncol_doubleDouble,
1232 &dyncol_to_cassandraDouble
1233 },
1234 {
1235 validator_blob,
1236 &cassandra_to_dyncol_strBytes,
1237 &dyncol_to_cassandraBytes
1238 },
1239 {
1240 validator_ascii,
1241 &cassandra_to_dyncol_strAscii,
1242 &dyncol_to_cassandraAscii
1243 },
1244 {
1245 validator_text,
1246 &cassandra_to_dyncol_strUTF8,
1247 &dyncol_to_cassandraUTF8
1248 },
1249 {
1250 validator_timestamp,
1251 &cassandra_to_dyncol_intLong,
1252 &dyncol_to_cassandraLong
1253 },
1254 {
1255 validator_uuid,
1256 &cassandra_to_dyncol_strUUID,
1257 &dyncol_to_cassandraUUID
1258 },
1259 {
1260 validator_boolean,
1261 &cassandra_to_dyncol_intBool,
1262 &dyncol_to_cassandraBool
1263 },
1264 {
1265 validator_varint,
1266 &cassandra_to_dyncol_strBytes,
1267 &dyncol_to_cassandraBytes
1268 },
1269 {
1270 validator_decimal,
1271 &cassandra_to_dyncol_strBytes,
1272 &dyncol_to_cassandraBytes
1273 }
1274 };
1275
get_cassandra_type(const char * validator)1276 CASSANDRA_TYPE get_cassandra_type(const char *validator)
1277 {
1278 CASSANDRA_TYPE rc;
1279 switch(validator[32])
1280 {
1281 case 'L':
1282 rc= CT_BIGINT;
1283 break;
1284 case 'I':
1285 rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
1286 rc= CT_INT;
1287 break;
1288 case 'C':
1289 rc= CT_COUNTER;
1290 break;
1291 case 'F':
1292 rc= CT_FLOAT;
1293 break;
1294 case 'D':
1295 switch (validator[33])
1296 {
1297 case 'o':
1298 rc= CT_DOUBLE;
1299 break;
1300 case 'a':
1301 rc= CT_TIMESTAMP;
1302 break;
1303 case 'e':
1304 rc= CT_DECIMAL;
1305 break;
1306 default:
1307 rc= CT_BLOB;
1308 break;
1309 }
1310 break;
1311 case 'B':
1312 rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
1313 break;
1314 case 'A':
1315 rc= CT_ASCII;
1316 break;
1317 case 'U':
1318 rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
1319 break;
1320 default:
1321 rc= CT_BLOB;
1322 }
1323 DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
1324 return rc;
1325 }
1326
/*
  Pick a ColumnDataConverter for the pair (SQL field type, Cassandra
  validator name).

  @param field           the SQL field to convert to/from
  @param validator_name  Cassandra validator class name for the column

  @return a newly-allocated converter (caller takes ownership), or NULL
          when the (field type, validator) combination is not supported.

  The switch's fall-throughs are intentional: TINYINT falls through to
  the integer cases unless it is mapped to boolean, and CHAR(36) falls
  through to the generic string cases unless it is mapped to UUID.
*/
ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
{
  ColumnDataConverter *res= NULL;

  switch(field->type()) {
  case MYSQL_TYPE_TINY:
    if (!strcmp(validator_name, validator_boolean))
    {
      res= new TinyintDataConverter;
      break;
    }
    /* fall through: */
  case MYSQL_TYPE_SHORT:
  case MYSQL_TYPE_LONGLONG:
    {
      /* Counters are stored in a different byte order than plain bigints */
      bool is_counter= false;
      if (!strcmp(validator_name, validator_bigint) ||
          !strcmp(validator_name, validator_timestamp) ||
          (is_counter= !strcmp(validator_name, validator_counter)))
        res= new BigintDataConverter(!is_counter);
      break;
    }
  case MYSQL_TYPE_FLOAT:
    if (!strcmp(validator_name, validator_float))
      res= new FloatDataConverter;
    break;

  case MYSQL_TYPE_DOUBLE:
    if (!strcmp(validator_name, validator_double))
      res= new DoubleDataConverter;
    break;

  case MYSQL_TYPE_TIMESTAMP:
    if (!strcmp(validator_name, validator_timestamp))
      res= new TimestampDataConverter;
    break;

  case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
    if (!strcmp(validator_name, validator_uuid) &&
        field->real_type() == MYSQL_TYPE_STRING &&
        field->field_length == 36)
    {
      // UUID maps to CHAR(36), its text representation
      res= new UuidDataConverter;
      break;
    }
    /* fall through: */
  case MYSQL_TYPE_VAR_STRING:
  case MYSQL_TYPE_VARCHAR:
  case MYSQL_TYPE_BLOB:
    {
      /*
        Cassandra's "varint" type is a binary-encoded arbitary-length
        big-endian number.
        - It can be mapped to VARBINARY(N), with sufficiently big N.
        - If the value does not fit into N bytes, it is an error. We should not
          truncate it, because that is just as good as returning garbage.
        - varint should not be mapped to BINARY(N), because BINARY(N) values
          are zero-padded, which will work as multiplying the value by
          2^k for some value of k.
      */
      if (field->type() == MYSQL_TYPE_VARCHAR &&
          field->binary() &&
          (!strcmp(validator_name, validator_varint) ||
           !strcmp(validator_name, validator_decimal)))
      {
        /* bounded copy: values longer than the column are rejected */
        res= new StringCopyConverter(field->field_length);
        break;
      }

      if (!strcmp(validator_name, validator_blob) ||
          !strcmp(validator_name, validator_ascii) ||
          !strcmp(validator_name, validator_text))
      {
        /* (size_t)-1 means no length limit */
        res= new StringCopyConverter((size_t)-1);
      }
      break;
    }
  case MYSQL_TYPE_LONG:
    if (!strcmp(validator_name, validator_int))
      res= new Int32DataConverter;
    break;

  default:;
  }
  return res;
}
1414
1415
/*
  Build the converter array mapping SQL fields to Cassandra columns.

  @param field_arg  table->field array; field_arg[0] is the rowkey field
  @param n_fields   number of fields in the array

  @return false on success; true on error (an error has also been raised
          via my_error())

  On success this allocates (freed by free_field_converters()):
   - field_converters[] / n_field_converters
   - for dynamic-column tables: special_type_field_converters,
     special_type_field_names, dynamic_values, dynamic_names, dynamic_rec
   - rowkey_converter
*/
bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
  char *col_name;
  int col_name_len;
  char *col_type;
  int col_type_len;
  size_t ddl_fields= se->get_ddl_size();
  const char *default_type= se->get_default_validator();
  uint max_non_default_fields;
  DBUG_ENTER("ha_cassandra::setup_field_converters");
  DBUG_ASSERT(default_type);

  DBUG_ASSERT(!field_converters);
  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);

  /*
    We always should take into account that in case of using dynamic columns
    sql description contain one field which does not described in
    Cassandra DDL also key field is described separately. So that
    is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
  */
  max_non_default_fields= ddl_fields + 2 - n_fields;
  if (ddl_fields < (n_fields - dyncol_set - 1))
  {
    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /* allocate memory in one chunk */
  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
    (dyncol_set ? max_non_default_fields : 0);
  if (!(field_converters= (ColumnDataConverter**)my_malloc(PSI_INSTRUMENT_ME, memsize, MYF(0))))
    DBUG_RETURN(true);
  bzero(field_converters, memsize);
  n_field_converters= n_fields;

  if (dyncol_set)
  {
    /* Carve the auxiliary arrays out of the single allocation above */
    special_type_field_converters=
      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
    special_type_field_names=
      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));

    /* On failure each earlier container is torn down before returning */
    if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_values,
                              sizeof(DYNAMIC_COLUMN_VALUE),
                              DYNCOL_USUAL, DYNCOL_DELTA, MYF(0)))
      DBUG_RETURN(true);
    else
      if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_names,
                                sizeof(LEX_STRING),
                                DYNCOL_USUAL, DYNCOL_DELTA,MYF(0)))
      {
        delete_dynamic(&dynamic_values);
        DBUG_RETURN(true);
      }
      else
        if (init_dynamic_string(&dynamic_rec, NULL,
                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
        {
          delete_dynamic(&dynamic_values);
          delete_dynamic(&dynamic_names);
          DBUG_RETURN(true);
        }

    /* Dynamic column field has special processing */
    field_converters[dyncol_field]= NULL;

    default_type_def= cassandra_types + get_cassandra_type(default_type);
  }

  se->first_ddl_column();
  uint n_mapped= 0;
  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                              &col_type_len))
  {
    Field **field;
    uint i;
    /* Mapping for the 1st field is already known */
    for (field= field_arg + 1, i= 1; *field; field++, i++)
    {
      if ((!dyncol_set || dyncol_field != i) &&
          !strcmp((*field)->field_name.str, col_name))
      {
        n_mapped++;
        ColumnDataConverter **conv= field_converters + (*field)->field_index;
        if (!(*conv= map_field_to_validator(*field, col_type)))
        {
          se->print_error("Failed to map column %s to datatype %s",
                          (*field)->field_name.str, col_type);
          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
          DBUG_RETURN(true);
        }
        (*conv)->field= *field;
        break;
      }
    }
    if (dyncol_set && !(*field)) // is needed and not found
    {
      DBUG_PRINT("info",("Field not found: %s", col_name));
      if (strcmp(col_type, default_type))
      {
        /* Remember non-default-typed columns for dynamic-column access */
        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
                           col_name, col_type));
        special_type_field_names[n_special_type_fields].length= col_name_len;
        special_type_field_names[n_special_type_fields].str= col_name;
        special_type_field_converters[n_special_type_fields]=
          cassandra_types[get_cassandra_type(col_type)];
        n_special_type_fields++;
      }
    }
  }

  if (n_mapped != n_fields - 1 - dyncol_set)
  {
    Field *first_unmapped= NULL;
    /* Find the first field */
    for (uint i= 1; i < n_fields;i++)
    {
      if (!field_converters[i])
      {
        first_unmapped= field_arg[i];
        break;
      }
    }
    DBUG_ASSERT(first_unmapped);

    se->print_error("Field `%s` could not be mapped to any field in Cassandra",
                    first_unmapped->field_name.str);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /*
    Setup type conversion for row_key.
  */
  se->get_rowkey_type(&col_name, &col_type);
  if (col_name && strcmp(col_name, (*field_arg)->field_name.str))
  {
    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
                    col_name);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }
  if (!col_name && strcmp("rowkey", (*field_arg)->field_name.str))
  {
    se->print_error("target column family has no key_alias defined, "
                    "PRIMARY KEY column must be named 'rowkey'");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  if (col_type != NULL)
  {
    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
    {
      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
      DBUG_RETURN(true);
    }
    rowkey_converter->field= *field_arg;
  }
  else
  {
    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
1588
1589
free_field_converters()1590 void ha_cassandra::free_field_converters()
1591 {
1592 delete rowkey_converter;
1593 rowkey_converter= NULL;
1594
1595 if (dyncol_set)
1596 {
1597 delete_dynamic(&dynamic_values);
1598 delete_dynamic(&dynamic_names);
1599 dynstr_free(&dynamic_rec);
1600 }
1601 if (field_converters)
1602 {
1603 for (uint i=0; i < n_field_converters; i++)
1604 if (field_converters[i])
1605 {
1606 DBUG_ASSERT(!dyncol_set || i != dyncol_field);
1607 delete field_converters[i];
1608 }
1609 my_free(field_converters);
1610 field_converters= NULL;
1611 }
1612 }
1613
1614
index_init(uint idx,bool sorted)1615 int ha_cassandra::index_init(uint idx, bool sorted)
1616 {
1617 int ires;
1618 if (!se && (ires= connect_and_check_options(table)))
1619 return ires;
1620 return 0;
1621 }
1622
1623 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
1624
index_read_map(uchar * buf,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)1625 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
1626 key_part_map keypart_map,
1627 enum ha_rkey_function find_flag)
1628 {
1629 int rc= 0;
1630 DBUG_ENTER("ha_cassandra::index_read_map");
1631
1632 if (find_flag != HA_READ_KEY_EXACT)
1633 {
1634 DBUG_ASSERT(0); /* Non-equality lookups should never be done */
1635 DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1636 }
1637
1638 uint key_len= calculate_key_len(table, active_index, key, keypart_map);
1639 store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
1640
1641 char *cass_key;
1642 int cass_key_len;
1643 MY_BITMAP *old_map;
1644
1645 old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1646
1647 if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1648 {
1649 /* We get here when making lookups like uuid_column='not-an-uuid' */
1650 dbug_tmp_restore_column_map(&table->read_set, old_map);
1651 DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
1652 }
1653
1654 dbug_tmp_restore_column_map(&table->read_set, old_map);
1655
1656 bool found;
1657 if (se->get_slice(cass_key, cass_key_len, &found))
1658 {
1659 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1660 rc= HA_ERR_INTERNAL_ERROR;
1661 }
1662
1663 /* TODO: what if we're not reading all columns?? */
1664 if (!found)
1665 rc= HA_ERR_KEY_NOT_FOUND;
1666 else
1667 rc= read_cassandra_columns(false);
1668
1669 DBUG_RETURN(rc);
1670 }
1671
1672
print_conversion_error(const char * field_name,char * cass_value,int cass_value_len)1673 void ha_cassandra::print_conversion_error(const char *field_name,
1674 char *cass_value,
1675 int cass_value_len)
1676 {
1677 char buf[32];
1678 char *p= cass_value;
1679 size_t i= 0;
1680 for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
1681 {
1682 buf[i++]= map2number[(*p >> 4) & 0xF];
1683 buf[i++]= map2number[*p & 0xF];
1684 }
1685 buf[i]=0;
1686
1687 se->print_error("Unable to convert value for field `%s` from Cassandra's data"
1688 " format. Source data is %d bytes, 0x%s%s",
1689 field_name, cass_value_len, buf,
1690 (i == sizeof(buf) - 1)? "..." : "");
1691 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1692 }
1693
1694
1695
get_cassandra_field_def(char * cass_name,int cass_name_len)1696 CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
1697 int cass_name_len)
1698 {
1699 CASSANDRA_TYPE_DEF *type= default_type_def;
1700 for(uint i= 0; i < n_special_type_fields; i++)
1701 {
1702 if (cass_name_len == (int)special_type_field_names[i].length &&
1703 memcmp(cass_name, special_type_field_names[i].str,
1704 cass_name_len) == 0)
1705 {
1706 type= special_type_field_converters + i;
1707 break;
1708 }
1709 }
1710 return type;
1711 }
1712
read_cassandra_columns(bool unpack_pk)1713 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
1714 {
1715 MEM_ROOT strings_root;
1716 char *cass_name;
1717 char *cass_value;
1718 int cass_value_len, cass_name_len;
1719 Field **field;
1720 int res= 0;
1721 ulong total_name_len= 0;
1722
1723 clear_alloc_root(&strings_root);
1724 /*
1725 cassandra_to_mariadb() calls will use field->store(...) methods, which
1726 require that the column is in the table->write_set
1727 */
1728 MY_BITMAP *old_map;
1729 old_map= dbug_tmp_use_all_columns(table, &table->write_set);
1730
1731 /* Start with all fields being NULL */
1732 for (field= table->field + 1; *field; field++)
1733 (*field)->set_null();
1734
1735 while (!se->get_next_read_column(&cass_name, &cass_name_len,
1736 &cass_value, &cass_value_len))
1737 {
1738 // map to our column. todo: use hash or something..
1739 bool found= 0;
1740 for (field= table->field + 1; *field; field++)
1741 {
1742 uint fieldnr= (*field)->field_index;
1743 if ((!dyncol_set || dyncol_field != fieldnr) &&
1744 !strcmp((*field)->field_name.str, cass_name))
1745 {
1746 found= 1;
1747 (*field)->set_notnull();
1748 if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
1749 cass_value_len))
1750 {
1751 print_conversion_error((*field)->field_name.str, cass_value,
1752 cass_value_len);
1753 res=1;
1754 goto err;
1755 }
1756 break;
1757 }
1758 }
1759 if (dyncol_set && !found)
1760 {
1761 DYNAMIC_COLUMN_VALUE val;
1762 LEX_STRING nm;
1763 CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
1764 cass_name_len);
1765 nm.str= cass_name;
1766 nm.length= cass_name_len;
1767 if (nm.length > MAX_NAME_LENGTH)
1768 {
1769 se->print_error("Unable to convert value for field `%s`"
1770 " from Cassandra's data format. Name"
1771 " length exceed limit of %u: '%s'",
1772 table->field[dyncol_field]->field_name.str,
1773 (uint)MAX_NAME_LENGTH, cass_name);
1774 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1775 res=1;
1776 goto err;
1777 }
1778 total_name_len+= cass_name_len;
1779 if (nm.length > MAX_TOTAL_NAME_LENGTH)
1780 {
1781 se->print_error("Unable to convert value for field `%s`"
1782 " from Cassandra's data format. Sum of all names"
1783 " length exceed limit of %lu",
1784 table->field[dyncol_field]->field_name.str,
1785 cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
1786 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1787 res=1;
1788 goto err;
1789 }
1790
1791 if ((res= (*(type->cassandra_to_dynamic))(cass_value,
1792 cass_value_len, &val,
1793 &strings_root)) ||
1794 insert_dynamic(&dynamic_names, (uchar *) &nm) ||
1795 insert_dynamic(&dynamic_values, (uchar *) &val))
1796 {
1797 if (res)
1798 {
1799 print_conversion_error(cass_name, cass_value, cass_value_len);
1800 }
1801 free_strings_memroot(&strings_root);
1802 // EOM shouldm be already reported if happened
1803 res= 1;
1804 goto err;
1805 }
1806 }
1807 }
1808
1809 dynamic_rec.length= 0;
1810 if (dyncol_set)
1811 {
1812 if (mariadb_dyncol_create_many_named(&dynamic_rec,
1813 dynamic_names.elements,
1814 (LEX_STRING *)dynamic_names.buffer,
1815 (DYNAMIC_COLUMN_VALUE *)
1816 dynamic_values.buffer,
1817 FALSE) < 0)
1818 dynamic_rec.length= 0;
1819
1820 free_strings_memroot(&strings_root);
1821 dynamic_values.elements= dynamic_names.elements= 0;
1822
1823 if (dynamic_rec.length == 0)
1824 table->field[dyncol_field]->set_null();
1825 else
1826 {
1827 Field_blob *blob= (Field_blob *)table->field[dyncol_field];
1828 blob->set_notnull();
1829 blob->store_length(dynamic_rec.length);
1830 *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
1831 dynamic_rec.str;
1832 }
1833 }
1834
1835 if (unpack_pk)
1836 {
1837 /* Unpack rowkey to primary key */
1838 field= table->field;
1839 (*field)->set_notnull();
1840 se->get_read_rowkey(&cass_value, &cass_value_len);
1841 if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
1842 {
1843 print_conversion_error((*field)->field_name.str, cass_value, cass_value_len);
1844 res=1;
1845 goto err;
1846 }
1847 }
1848
1849 err:
1850 dbug_tmp_restore_column_map(&table->write_set, old_map);
1851 return res;
1852 }
1853
read_dyncol(uint * count,DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names,String * valcol)1854 int ha_cassandra::read_dyncol(uint *count,
1855 DYNAMIC_COLUMN_VALUE **vals,
1856 LEX_STRING **names,
1857 String *valcol)
1858 {
1859 String *strcol;
1860 DYNAMIC_COLUMN col;
1861
1862 enum enum_dyncol_func_result rc;
1863 DBUG_ENTER("ha_cassandra::read_dyncol");
1864
1865 Field *field= table->field[dyncol_field];
1866 DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
1867 /* It is blob and it does not use buffer */
1868 strcol= field->val_str(NULL, valcol);
1869 if (field->is_null())
1870 {
1871 *count= 0;
1872 *names= 0;
1873 *vals= 0;
1874 DBUG_RETURN(0); // nothing to write
1875 }
1876 /*
1877 dynamic_column_vals only read the string so we can
1878 cheat here with assignment
1879 */
1880 bzero(&col, sizeof(col));
1881 col.str= (char *)strcol->ptr();
1882 col.length= strcol->length();
1883 if ((rc= mariadb_dyncol_unpack(&col, count, names, vals)) < 0)
1884 {
1885 dynamic_column_error_message(rc);
1886 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1887 }
1888 DBUG_RETURN(0);
1889 }
1890
/*
  Queue one unpacked dynamic-column row into the Cassandra insert buffer.

  @param count  number of (name, value) pairs
  @param vals   values (typically from mariadb_dyncol_unpack())
  @param names  corresponding column names

  Each value is converted to Cassandra wire format by the column type's
  dynamic_to_cassandra callback.  'buff' gives the callback 16 bytes of
  scratch for fixed-size results; larger results are heap-allocated and
  returned via 'freemem' for us to release.

  @return 0 on success, HA_ERR_GENERIC when a value cannot be converted
*/
int ha_cassandra::write_dynamic_row(uint count,
                                    DYNAMIC_COLUMN_VALUE *vals,
                                    LEX_STRING *names)
{
  uint i;
  DBUG_ENTER("ha_cassandra::write_dynamic_row");
  DBUG_ASSERT(dyncol_set);


  for (i= 0; i < count; i++)
  {
    char buff[16];
    CASSANDRA_TYPE_DEF *type;
    void *freemem= NULL;
    char *cass_data;
    int cass_data_len;

    DBUG_PRINT("info", ("field %*s", (int)names[i].length, names[i].str));
    type= get_cassandra_field_def(names[i].str, (int) names[i].length);
    if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len,
                                      buff, &freemem))
    {
      my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
               names[i].str, insert_lineno);
      DBUG_RETURN(HA_ERR_GENERIC);
    }
    /* NOTE(review): freemem is released right after this call, which
       assumes add_insert_column() copies the data -- confirm. */
    se->add_insert_column(names[i].str, names[i].length,
                          cass_data, cass_data_len);
    if (freemem)
      my_free(freemem);
  }
  DBUG_RETURN(0);
}
1924
free_dynamic_row(DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names)1925 void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
1926 LEX_STRING **names)
1927 {
1928 mariadb_dyncol_unpack_free(*names, *vals);
1929 *vals= 0;
1930 *names= 0;
1931 }
1932
/*
  INSERT one row: convert the rowkey and every column to Cassandra wire
  format and either send immediately or accumulate into the batch buffer
  (when start_bulk_insert() switched us into batch mode).

  @return 0 on success; HA_ERR_INTERNAL_ERROR on conversion or insert
          failure; a dynamic-column error code otherwise
*/
int ha_cassandra::write_row(const uchar *buf)
{
  MY_BITMAP *old_map;
  int ires;
  DBUG_ENTER("ha_cassandra::write_row");

  /* lazy connect on first write */
  if (!se && (ires= connect_and_check_options(table)))
    DBUG_RETURN(ires);

  if (!doing_insert_batch)
    se->clear_insert_buffer();

  /* converters read fields, which must be in read_set for store/val calls */
  old_map= dbug_tmp_use_all_columns(table, &table->read_set);

  /* used in error messages to identify the offending row */
  insert_lineno++;

  /* Convert the key */
  char *cass_key;
  int cass_key_len;
  if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
  {
    my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
             rowkey_converter->field->field_name.str, insert_lineno);
    dbug_tmp_restore_column_map(&table->read_set, old_map);
    DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
  }
  se->start_row_insert(cass_key, cass_key_len);

  /* Convert other fields */
  for (uint i= 1; i < table->s->fields; i++)
  {
    char *cass_data;
    int cass_data_len;
    if (dyncol_set && dyncol_field == i)
    {
      /* the dynamic-column blob expands into many Cassandra columns */
      String valcol;
      DYNAMIC_COLUMN_VALUE *vals;
      LEX_STRING *names;
      uint count;
      int rc;
      DBUG_ASSERT(field_converters[i] == NULL);
      /* NOTE(review): if read_dyncol() fails inside
         mariadb_dyncol_unpack(), vals/names may be left unset before
         free_dynamic_row() reads them -- verify unpack's failure
         contract. */
      if (!(rc= read_dyncol(&count, &vals, &names, &valcol)))
        rc= write_dynamic_row(count, vals, names);
      free_dynamic_row(&vals, &names);
      if (rc)
      {
        dbug_tmp_restore_column_map(&table->read_set, old_map);
        DBUG_RETURN(rc);
      }
    }
    else
    {
      if (field_converters[i]->mariadb_to_cassandra(&cass_data,
                                                    &cass_data_len))
      {
        my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
                 field_converters[i]->field->field_name.str, insert_lineno);
        dbug_tmp_restore_column_map(&table->read_set, old_map);
        DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
      }
      se->add_insert_column(field_converters[i]->field->field_name.str, 0,
                            cass_data, cass_data_len);
    }
  }

  dbug_tmp_restore_column_map(&table->read_set, old_map);

  bool res;

  if (doing_insert_batch)
  {
    /* flush only when the batch reaches insert_batch_size rows */
    res= 0;
    if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
    {
      res= se->do_insert();
      insert_rows_batched= 0;
    }
  }
  else
    res= se->do_insert();

  if (res)
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());

  DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
}
2019
2020
start_bulk_insert(ha_rows rows,uint flags)2021 void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
2022 {
2023 int ires;
2024 if (!se && (ires= connect_and_check_options(table)))
2025 return;
2026
2027 doing_insert_batch= true;
2028 insert_rows_batched= 0;
2029
2030 se->clear_insert_buffer();
2031 }
2032
2033
end_bulk_insert()2034 int ha_cassandra::end_bulk_insert()
2035 {
2036 DBUG_ENTER("ha_cassandra::end_bulk_insert");
2037
2038 if (!doing_insert_batch)
2039 {
2040 /* SQL layer can make end_bulk_insert call without start_bulk_insert call */
2041 DBUG_RETURN(0);
2042 }
2043
2044 /* Flush out the insert buffer */
2045 doing_insert_batch= false;
2046 bool bres= se->do_insert();
2047 se->clear_insert_buffer();
2048
2049 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2050 }
2051
2052
rnd_init(bool scan)2053 int ha_cassandra::rnd_init(bool scan)
2054 {
2055 bool bres;
2056 int ires;
2057 DBUG_ENTER("ha_cassandra::rnd_init");
2058
2059 if (!se && (ires= connect_and_check_options(table)))
2060 DBUG_RETURN(ires);
2061
2062 if (!scan)
2063 {
2064 /* Prepare for rnd_pos() calls. We don't need to anything. */
2065 DBUG_RETURN(0);
2066 }
2067
2068 if (dyncol_set)
2069 {
2070 se->clear_read_all_columns();
2071 }
2072 else
2073 {
2074 se->clear_read_columns();
2075 for (uint i= 1; i < table->s->fields; i++)
2076 se->add_read_column(table->field[i]->field_name.str);
2077 }
2078
2079 se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
2080 bres= se->get_range_slices(false);
2081 if (bres)
2082 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2083
2084 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2085 }
2086
2087
rnd_end()2088 int ha_cassandra::rnd_end()
2089 {
2090 DBUG_ENTER("ha_cassandra::rnd_end");
2091
2092 se->finish_reading_range_slices();
2093 DBUG_RETURN(0);
2094 }
2095
2096
rnd_next(uchar * buf)2097 int ha_cassandra::rnd_next(uchar *buf)
2098 {
2099 int rc;
2100 bool reached_eof;
2101 DBUG_ENTER("ha_cassandra::rnd_next");
2102
2103 // Unpack and return the next record.
2104 if (se->get_next_range_slice_row(&reached_eof))
2105 {
2106 rc= HA_ERR_INTERNAL_ERROR;
2107 }
2108 else
2109 {
2110 if (reached_eof)
2111 rc= HA_ERR_END_OF_FILE;
2112 else
2113 rc= read_cassandra_columns(true);
2114 }
2115
2116 DBUG_RETURN(rc);
2117 }
2118
2119
delete_all_rows()2120 int ha_cassandra::delete_all_rows()
2121 {
2122 bool bres;
2123 int ires;
2124 DBUG_ENTER("ha_cassandra::delete_all_rows");
2125
2126 if (!se && (ires= connect_and_check_options(table)))
2127 DBUG_RETURN(ires);
2128
2129 bres= se->truncate();
2130
2131 if (bres)
2132 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2133
2134 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2135 }
2136
2137
delete_row(const uchar * buf)2138 int ha_cassandra::delete_row(const uchar *buf)
2139 {
2140 bool bres;
2141 DBUG_ENTER("ha_cassandra::delete_row");
2142
2143 bres= se->remove_row();
2144
2145 if (bres)
2146 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2147
2148 DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2149 }
2150
2151
info(uint flag)2152 int ha_cassandra::info(uint flag)
2153 {
2154 DBUG_ENTER("ha_cassandra::info");
2155
2156 if (!table)
2157 return 1;
2158
2159 if (flag & HA_STATUS_VARIABLE)
2160 {
2161 stats.records= 1000;
2162 stats.deleted= 0;
2163 }
2164 if (flag & HA_STATUS_CONST)
2165 {
2166 ref_length= table->field[0]->key_length();
2167 }
2168
2169 DBUG_RETURN(0);
2170 }
2171
2172
2173 void key_copy(uchar *to_key, const uchar *from_record, const KEY *key_info,
2174 uint key_length, bool with_zerofill);
2175
2176
position(const uchar * record)2177 void ha_cassandra::position(const uchar *record)
2178 {
2179 DBUG_ENTER("ha_cassandra::position");
2180
2181 /* Copy the primary key to rowid */
2182 key_copy(ref, (uchar*)record, &table->key_info[0],
2183 table->field[0]->key_length(), true);
2184
2185 DBUG_VOID_RETURN;
2186 }
2187
2188
rnd_pos(uchar * buf,uchar * pos)2189 int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
2190 {
2191 int rc;
2192 DBUG_ENTER("ha_cassandra::rnd_pos");
2193
2194 int save_active_index= active_index;
2195 active_index= 0; /* The primary key */
2196 rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
2197
2198 active_index= save_active_index;
2199
2200 DBUG_RETURN(rc);
2201 }
2202
2203
reset()2204 int ha_cassandra::reset()
2205 {
2206 doing_insert_batch= false;
2207 insert_lineno= 0;
2208 if (se)
2209 {
2210 se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
2211 THDVAR(table->in_use, write_consistency));
2212 se->set_n_retries(THDVAR(table->in_use, failure_retries));
2213 }
2214 return 0;
2215 }
2216
2217 /////////////////////////////////////////////////////////////////////////////
2218 // MRR implementation
2219 /////////////////////////////////////////////////////////////////////////////
2220
2221
2222 /*
2223 - The key can be only primary key
2224 - allow equality-ranges only.
2225 - anything else?
2226 */
multi_range_read_info_const(uint keyno,RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint * bufsz,uint * flags,Cost_estimate * cost)2227 ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
2228 void *seq_init_param,
2229 uint n_ranges, uint *bufsz,
2230 uint *flags, Cost_estimate *cost)
2231 {
2232 /* No support for const ranges so far */
2233 return HA_POS_ERROR;
2234 }
2235
2236
multi_range_read_info(uint keyno,uint n_ranges,uint keys,uint key_parts,uint * bufsz,uint * flags,Cost_estimate * cost)2237 ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
2238 uint key_parts, uint *bufsz,
2239 uint *flags, Cost_estimate *cost)
2240 {
2241 /* Can only be equality lookups on the primary key... */
2242 // TODO anything else?
2243 *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
2244 *flags |= HA_MRR_NO_ASSOCIATION;
2245
2246 return 10;
2247 }
2248
2249
multi_range_read_init(RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint mode,HANDLER_BUFFER * buf)2250 int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
2251 uint n_ranges, uint mode, HANDLER_BUFFER *buf)
2252 {
2253 int res;
2254 mrr_iter= seq->init(seq_init_param, n_ranges, mode);
2255 mrr_funcs= *seq;
2256 res= mrr_start_read();
2257 return (res? HA_ERR_INTERNAL_ERROR: 0);
2258 }
2259
2260
mrr_start_read()2261 bool ha_cassandra::mrr_start_read()
2262 {
2263 uint key_len;
2264
2265 MY_BITMAP *old_map;
2266 old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2267
2268 se->new_lookup_keys();
2269
2270 while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
2271 {
2272 char *cass_key;
2273 int cass_key_len;
2274
2275 DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
2276
2277 uchar *key= (uchar*)mrr_cur_range.start_key.key;
2278 key_len= mrr_cur_range.start_key.length;
2279 //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
2280 store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
2281
2282 rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
2283
2284 // Primitive buffer control
2285 if ((ulong) se->add_lookup_key(cass_key, cass_key_len) >
2286 THDVAR(table->in_use, multiget_batch_size))
2287 break;
2288 }
2289
2290 dbug_tmp_restore_column_map(&table->read_set, old_map);
2291
2292 return se->multiget_slice();
2293 }
2294
2295
multi_range_read_next(range_id_t * range_info)2296 int ha_cassandra::multi_range_read_next(range_id_t *range_info)
2297 {
2298 int res;
2299 while(1)
2300 {
2301 if (!se->get_next_multiget_row())
2302 {
2303 res= read_cassandra_columns(true);
2304 break;
2305 }
2306 else
2307 {
2308 if (source_exhausted)
2309 {
2310 res= HA_ERR_END_OF_FILE;
2311 break;
2312 }
2313 else
2314 {
2315 if (mrr_start_read())
2316 {
2317 res= HA_ERR_INTERNAL_ERROR;
2318 break;
2319 }
2320 }
2321 }
2322 /*
2323 We get here if we've refilled the buffer and done another read. Try
2324 reading from results again
2325 */
2326 }
2327 return res;
2328 }
2329
2330
multi_range_read_explain_info(uint mrr_mode,char * str,size_t size)2331 int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
2332 {
2333 const char *mrr_str= "multiget_slice";
2334
2335 if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
2336 {
2337 uint mrr_str_len= strlen(mrr_str);
2338 uint copy_len= MY_MIN(mrr_str_len, size);
2339 memcpy(str, mrr_str, size);
2340 return copy_len;
2341 }
2342 return 0;
2343 }
2344
2345
2346 class Column_name_enumerator_impl : public Column_name_enumerator
2347 {
2348 ha_cassandra *obj;
2349 uint idx;
2350 public:
Column_name_enumerator_impl(ha_cassandra * obj_arg)2351 Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
get_next_name()2352 const char* get_next_name()
2353 {
2354 if (idx == obj->table->s->fields)
2355 return NULL;
2356 else
2357 return obj->table->field[idx++]->field_name.str;
2358 }
2359 };
2360
2361
update_row(const uchar * old_data,const uchar * new_data)2362 int ha_cassandra::update_row(const uchar *old_data, const uchar *new_data)
2363 {
2364 DYNAMIC_COLUMN_VALUE *oldvals, *vals;
2365 LEX_STRING *oldnames, *names;
2366 uint oldcount, count;
2367 String oldvalcol, valcol;
2368 MY_BITMAP *old_map;
2369 int res;
2370 DBUG_ENTER("ha_cassandra::update_row");
2371 /* Currently, it is guaranteed that new_data == table->record[0] */
2372 DBUG_ASSERT(new_data == table->record[0]);
2373 /* For now, just rewrite the full record */
2374 se->clear_insert_buffer();
2375
2376 old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2377
2378 char *old_key;
2379 int old_key_len;
2380 se->get_read_rowkey(&old_key, &old_key_len);
2381
2382 /* Get the key we're going to write */
2383 char *new_key;
2384 int new_key_len;
2385 if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
2386 {
2387 my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2388 rowkey_converter->field->field_name.str, insert_lineno);
2389 dbug_tmp_restore_column_map(&table->read_set, old_map);
2390 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2391 }
2392
2393 /*
2394 Compare it to the key we've read. For all types that Cassandra supports,
2395 binary byte-wise comparison can be used
2396 */
2397 bool new_primary_key;
2398 if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
2399 new_primary_key= true;
2400 else
2401 new_primary_key= false;
2402
2403 if (dyncol_set)
2404 {
2405 Field *field= table->field[dyncol_field];
2406 /* move to get old_data */
2407 my_ptrdiff_t diff;
2408 diff= (my_ptrdiff_t) (old_data - new_data);
2409 field->move_field_offset(diff); // Points now at old_data
2410 if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol)))
2411 DBUG_RETURN(res);
2412 field->move_field_offset(-diff); // back to new_data
2413 if ((res= read_dyncol(&count, &vals, &names, &valcol)))
2414 {
2415 free_dynamic_row(&oldvals, &oldnames);
2416 DBUG_RETURN(res);
2417 }
2418 }
2419
2420 if (new_primary_key)
2421 {
2422 /*
2423 Primary key value changed. This is essentially a DELETE + INSERT.
2424 Add a DELETE operation into the batch
2425 */
2426 Column_name_enumerator_impl name_enumerator(this);
2427 se->add_row_deletion(old_key, old_key_len, &name_enumerator,
2428 oldnames,
2429 (dyncol_set ? oldcount : 0));
2430 oldcount= 0; // they will be deleted
2431 }
2432
2433 se->start_row_insert(new_key, new_key_len);
2434
2435 /* Convert other fields */
2436 for (uint i= 1; i < table->s->fields; i++)
2437 {
2438 char *cass_data;
2439 int cass_data_len;
2440 if (dyncol_set && dyncol_field == i)
2441 {
2442 DBUG_ASSERT(field_converters[i] == NULL);
2443 if ((res= write_dynamic_row(count, vals, names)))
2444 goto err;
2445 }
2446 else
2447 {
2448 if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
2449 {
2450 my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2451 field_converters[i]->field->field_name.str, insert_lineno);
2452 dbug_tmp_restore_column_map(&table->read_set, old_map);
2453 DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2454 }
2455 se->add_insert_column(field_converters[i]->field->field_name.str, 0,
2456 cass_data, cass_data_len);
2457 }
2458 }
2459 if (dyncol_set)
2460 {
2461 /* find removed fields */
2462 uint i= 0, j= 0;
2463 /* both array are sorted */
2464 for(; i < oldcount; i++)
2465 {
2466 int scmp= 0;
2467 while (j < count &&
2468 (scmp = mariadb_dyncol_column_cmp_named(names + j,
2469 oldnames + i)) < 0)
2470 j++;
2471 if (j < count &&
2472 scmp == 0)
2473 j++;
2474 else
2475 se->add_insert_delete_column(oldnames[i].str, oldnames[i].length);
2476 }
2477 }
2478
2479 dbug_tmp_restore_column_map(&table->read_set, old_map);
2480
2481 res= se->do_insert();
2482
2483 if (res)
2484 my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2485
2486 err:
2487 if (dyncol_set)
2488 {
2489 free_dynamic_row(&oldvals, &oldnames);
2490 free_dynamic_row(&vals, &names);
2491 }
2492
2493 DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2494 }
2495
2496
2497 /*
2498 We can't really have any locks for Cassandra Storage Engine. We're reading
2499 from Cassandra cluster, and other clients can asynchronously modify the data.
2500
2501 We can enforce locking within this process, but this will not be useful.
2502
2503 Thus, store_lock() should express that:
2504 - Writes do not block other writes
2505 - Reads should not block anything either, including INSERTs.
2506 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)2507 THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
2508 THR_LOCK_DATA **to,
2509 enum thr_lock_type lock_type)
2510 {
2511 DBUG_ENTER("ha_cassandra::store_lock");
2512 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
2513 {
2514 /* Writes allow other writes */
2515 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
2516 lock_type <= TL_WRITE))
2517 lock_type = TL_WRITE_ALLOW_WRITE;
2518
2519 /* Reads allow everything, including INSERTs */
2520 if (lock_type == TL_READ_NO_INSERT)
2521 lock_type = TL_READ;
2522
2523 lock.type= lock_type;
2524 }
2525 *to++= &lock;
2526 DBUG_RETURN(to);
2527 }
2528
2529
2530 /**
2531 check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
2532 if new and old definition are compatible
2533
2534 @details If there are no other explicit signs like changed number of
2535 fields this function will be called by compare_tables()
2536 (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
2537 file.
2538
2539 */
2540
check_if_incompatible_data(HA_CREATE_INFO * info,uint table_changes)2541 bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
2542 uint table_changes)
2543 {
2544 DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
2545 /* Checked, we intend to have this empty for Cassandra SE. */
2546 DBUG_RETURN(COMPATIBLE_DATA_YES);
2547 }
2548
2549
/*
  Format an error message into err_buffer, where a later caller can pick
  it up via error_str().
*/
void Cassandra_se_interface::print_error(const char *format, ...)
{
  va_list ap;
  va_start(ap, format);
  // it's not a problem if output was truncated
  my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
  va_end(ap);
}
2558
2559
/* Handlerton interface descriptor, referenced by the plugin declaration. */
struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

/*
  Counters exported as Cassandra_* status variables (SHOW STATUS).
  The backing counters live in the global cassandra_counters object.
*/
static SHOW_VAR cassandra_status_variables[]= {
  {"row_inserts",
    (char*) &cassandra_counters.row_inserts, SHOW_LONG},
  {"row_insert_batches",
    (char*) &cassandra_counters.row_insert_batches, SHOW_LONG},

  {"multiget_keys_scanned",
    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
  {"multiget_reads",
    (char*) &cassandra_counters.multiget_reads, SHOW_LONG},
  {"multiget_rows_read",
    (char*) &cassandra_counters.multiget_rows_read, SHOW_LONG},

  {"network_exceptions",
    (char*) &cassandra_counters.network_exceptions, SHOW_LONG},
  {"timeout_exceptions",
    (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
  {"unavailable_exceptions",
    (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
  {NullS, NullS, SHOW_LONG}     /* terminator entry */
};
2584
2585
2586
2587
/* MariaDB plugin descriptor for the Cassandra storage engine. */
maria_declare_plugin(cassandra)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &cassandra_storage_engine,
  "CASSANDRA",
  "Monty Program Ab",
  "Cassandra storage engine",
  PLUGIN_LICENSE_GPL,
  cassandra_init_func,                          /* Plugin Init */
  cassandra_done_func,                          /* Plugin Deinit */
  0x0001,                                       /* version number (0.1) */
  cassandra_status_variables,                   /* status variables */
  cassandra_system_variables,                   /* system variables */
  "0.1",                                        /* string version */
  MariaDB_PLUGIN_MATURITY_EXPERIMENTAL          /* maturity */
}
maria_declare_plugin_end;
2605