1 /*
2    Copyright (c) 2012, 2020, MariaDB Corporation.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation        // gcc: Class implementation
19 #endif
20 
21 #include <my_config.h>
22 #include <mysql/plugin.h>
23 #include "ha_cassandra.h"
24 #include "sql_class.h"
25 
/*
  Sizing constants for the dynamic-column buffers.
  NOTE(review): the places that use them are not in this part of the file;
  the USUAL/DELTA naming suggests an initial size and a growth step - confirm.
*/
#define DYNCOL_USUAL 20
#define DYNCOL_DELTA 100
#define DYNCOL_USUAL_REC 1024
#define DYNCOL_DELTA_REC 1024

/* Handler factory; installed as handlerton::create by cassandra_init_func() */
static handler *cassandra_create_handler(handlerton *hton,
                                       TABLE_SHARE *table,
                                       MEM_ROOT *mem_root);

/* Declared here, defined outside this file */
extern int dynamic_column_error_message(enum_dyncol_func_result rc);

/* This engine's handlerton; assigned in cassandra_init_func() */
handlerton *cassandra_hton;


/*
   Hash of the CASSANDRA_SHARE objects of currently open tables, keyed by
   table name (see cassandra_get_key() and get_share()).
*/
static HASH cassandra_open_tables;

/*
  Mutex taken by get_share() and free_share() while they search or modify
  cassandra_open_tables and the use counts of the shares.
*/
mysql_mutex_t cassandra_mutex;
48 
49 
50 /**
51   Structure for CREATE TABLE options (table options).
52   It needs to be called ha_table_option_struct.
53 
54   The option values can be specified in the CREATE TABLE at the end:
55   CREATE TABLE ( ... ) *here*
56 */
57 
58 struct ha_table_option_struct
59 {
60   const char *thrift_host;
61   int         thrift_port;
62   const char *keyspace;
63   const char *column_family;
64 };
65 
66 
/*
  Table options accepted in CREATE TABLE. Each entry names the option and the
  member of ha_table_option_struct that receives its value.
*/
ha_create_table_option cassandra_table_option_list[]=
{
  /*
    String options take an arbitrary string.
    NOTE(review): the numeric arguments of thrift_port look like default
    9160, minimum 1, maximum 65535 and block size 0 - confirm against the
    HA_TOPTION_NUMBER definition.
  */
  HA_TOPTION_STRING("thrift_host", thrift_host),
  HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
  HA_TOPTION_STRING("keyspace", keyspace),
  HA_TOPTION_STRING("column_family", column_family),
  HA_TOPTION_END
};
78 
/**
  Structure for CREATE TABLE options (field options).
*/

struct ha_field_option_struct
{
  /*
    Value of the DYNAMIC_COLUMN_STORAGE field option (see
    cassandra_field_option_list); tested in
    ha_cassandra::check_field_options().
  */
  bool dyncol_field;
};
87 
/*
  Field options accepted in CREATE TABLE; values are stored in
  ha_field_option_struct.
*/
ha_create_table_option cassandra_field_option_list[]=
{
  /*
    Collect all other columns as dynamic here,
    the valid values are YES/NO, ON/OFF, 1/0.
    The default is 0, that is false, no, off.
  */
  HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
  HA_FOPTION_END
};
98 
/*
  Per-session batch size variables. All three are row counts with a minimum
  of 1 and a maximum of 1024*1024*1024; only the defaults differ.
*/

/* Rows per INSERT batch, default 100 */
static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an INSERT batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

/* Rows per multiget (MRR) batch, default 100 */
static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in a multiget(MRR) batch",
  NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);

/* Rows per full-scan batch, default 10000 */
static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
  "Number of rows in an rnd_read (full scan) batch",
  NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
110 
111 static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
112   "Number of times to retry Cassandra calls that failed due to timeouts or "
113   "network communication problems. The default, 0, means not to retry.",
114   NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
115 
/*
  Names of the consistency levels accepted by the read_consistency and
  write_consistency variables. These match values in
  enum_cassandra_consistency_level, so the order matters.
*/
const char *cassandra_consistency_level[] =
{
  "ONE",
  "QUORUM",
  "LOCAL_QUORUM",
  "EACH_QUORUM",
  "ALL",
  "ANY",
  "TWO",
  "THREE",
   NullS
};

/*
  TYPELIB over the names above; the element count leaves out the terminating
  NullS entry.
*/
TYPELIB cassandra_consistency_level_typelib= {
  array_elements(cassandra_consistency_level) - 1, "",
  cassandra_consistency_level, NULL
};
134 
135 
/*
  Per-session consistency levels for writes and reads. Both take their set
  of values from cassandra_consistency_level_typelib and default to ONE.
*/
static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for write operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);

static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
  "Cassandra consistency level to use for read operations", NULL, NULL,
  ONE, &cassandra_consistency_level_typelib);
143 
144 
/*
  State behind the default_thrift_host system variable.
  - cassandra_default_host_lock is held by the update callback while it
    rewrites the two variables below.
  - cassandra_default_thrift_host is NULL while no default is set; the update
    callback makes it point at cassandra_default_host_buf otherwise.
  - cassandra_default_host_buf holds the (possibly truncated) host name.
*/
mysql_mutex_t cassandra_default_host_lock;
static char* cassandra_default_thrift_host = NULL;
static char cassandra_default_host_buf[256]="";
148 
149 static void
cassandra_default_thrift_host_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)150 cassandra_default_thrift_host_update(THD *thd,
151                                      struct st_mysql_sys_var* var,
152                                      void* var_ptr, /*!< out: where the
153                                                     formal string goes */
154                                      const void* save) /*!< in: immediate result
155                                                        from check function */
156 {
157   const char *new_host= *((char**)save);
158   const size_t max_len= sizeof(cassandra_default_host_buf);
159 
160   mysql_mutex_lock(&cassandra_default_host_lock);
161 
162   if (new_host)
163   {
164     strncpy(cassandra_default_host_buf, new_host, max_len-1);
165     cassandra_default_host_buf[max_len-1]= 0;
166     cassandra_default_thrift_host= cassandra_default_host_buf;
167   }
168   else
169   {
170     cassandra_default_host_buf[0]= 0;
171     cassandra_default_thrift_host= NULL;
172   }
173 
174   *((const char**)var_ptr)= cassandra_default_thrift_host;
175 
176   mysql_mutex_unlock(&cassandra_default_host_lock);
177 }
178 
179 
/*
  Global string variable with the host used when a table has no thrift_host
  option. There is no check function; updates go through
  cassandra_default_thrift_host_update().
*/
static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
                        PLUGIN_VAR_RQCMDARG,
                        "Default host for Cassandra thrift connections",
                        /*check*/NULL,
                        cassandra_default_thrift_host_update,
                        /*default*/NULL);

/* NULL-terminated list of all system variables defined above */
static struct st_mysql_sys_var* cassandra_system_variables[]= {
  MYSQL_SYSVAR(insert_batch_size),
  MYSQL_SYSVAR(multiget_batch_size),
  MYSQL_SYSVAR(rnd_batch_size),

  MYSQL_SYSVAR(default_thrift_host),
  MYSQL_SYSVAR(write_consistency),
  MYSQL_SYSVAR(read_consistency),
  MYSQL_SYSVAR(failure_retries),
  NULL
};

/* NOTE(review): presumably the engine's status counters - not used in this part of the file */
Cassandra_status_vars cassandra_counters;
200 
201 /**
202   @brief
203   Function we use in the creation of our hash to get key.
204 */
205 
cassandra_get_key(CASSANDRA_SHARE * share,size_t * length,my_bool not_used)206 static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
207                              my_bool not_used __attribute__((unused)))
208 {
209   *length=share->table_name_length;
210   return (uchar*) share->table_name;
211 }
212 
#ifdef HAVE_PSI_INTERFACE
/* Instrumentation keys for cassandra_mutex and for CASSANDRA_SHARE::mutex */
static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;

static PSI_mutex_info all_cassandra_mutexes[]=
{
  { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
  { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
};

/*
  Register the mutexes listed above under the "cassandra" category.
  Does nothing when there is no PSI_server.
*/
static void init_cassandra_psi_keys()
{
  if (PSI_server != NULL)
  {
    const int n_mutexes= array_elements(all_cassandra_mutexes);
    PSI_server->register_mutex("cassandra", all_cassandra_mutexes, n_mutexes);
  }
}
#endif
234 
cassandra_init_func(void * p)235 static int cassandra_init_func(void *p)
236 {
237   DBUG_ENTER("cassandra_init_func");
238 
239 #ifdef HAVE_PSI_INTERFACE
240   init_cassandra_psi_keys();
241 #endif
242 
243   cassandra_hton= (handlerton *)p;
244   mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
245   (void) my_hash_init(PSI_INSTRUMENT_ME, &cassandra_open_tables,system_charset_info,32,0,0,
246                       (my_hash_get_key) cassandra_get_key,0,0);
247 
248   cassandra_hton->create=  cassandra_create_handler;
249   /*
250     Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
251     TABLE to create an *empty* table from scratch. Cassandra table won't be
252     emptied if re-created.
253   */
254   cassandra_hton->flags=   0;
255   cassandra_hton->table_options= cassandra_table_option_list;
256   cassandra_hton->field_options= cassandra_field_option_list;
257 
258   mysql_mutex_init(0 /* no instrumentation */,
259                    &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
260 
261   DBUG_RETURN(0);
262 }
263 
264 
cassandra_done_func(void * p)265 static int cassandra_done_func(void *p)
266 {
267   int error= 0;
268   DBUG_ENTER("cassandra_done_func");
269   if (cassandra_open_tables.records)
270     error= 1;
271   my_hash_free(&cassandra_open_tables);
272   mysql_mutex_destroy(&cassandra_mutex);
273   mysql_mutex_destroy(&cassandra_default_host_lock);
274   DBUG_RETURN(error);
275 }
276 
277 
278 /**
279   @brief
280   Example of simple lock controls. The "share" it creates is a
281   structure we will pass to each cassandra handler. Do you have to have
282   one of these? Well, you have pieces that are used for locking, and
283   they are needed to function.
284 */
285 
get_share(const char * table_name,TABLE * table)286 static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
287 {
288   CASSANDRA_SHARE *share;
289   uint length;
290   char *tmp_name;
291 
292   mysql_mutex_lock(&cassandra_mutex);
293   length=(uint) strlen(table_name);
294 
295   if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
296                                               (uchar*) table_name,
297                                               length)))
298   {
299     if (!(share=(CASSANDRA_SHARE *)
300           my_multi_malloc(MYF(MY_WME | MY_ZEROFILL), PSI_INSTRUMENT_ME,
301                           &share, sizeof(*share),
302                           &tmp_name, length+1,
303                           NullS)))
304     {
305       mysql_mutex_unlock(&cassandra_mutex);
306       return NULL;
307     }
308 
309     share->use_count=0;
310     share->table_name_length=length;
311     share->table_name=tmp_name;
312     strmov(share->table_name,table_name);
313     if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
314       goto error;
315     thr_lock_init(&share->lock);
316     mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
317                      &share->mutex, MY_MUTEX_INIT_FAST);
318   }
319   share->use_count++;
320   mysql_mutex_unlock(&cassandra_mutex);
321 
322   return share;
323 
324 error:
325   mysql_mutex_destroy(&share->mutex);
326   my_free(share);
327 
328   return NULL;
329 }
330 
331 
332 /**
333   @brief
334   Free lock controls. We call this whenever we close a table. If the table had
335   the last reference to the share, then we free memory associated with it.
336 */
337 
free_share(CASSANDRA_SHARE * share)338 static int free_share(CASSANDRA_SHARE *share)
339 {
340   mysql_mutex_lock(&cassandra_mutex);
341   if (!--share->use_count)
342   {
343     my_hash_delete(&cassandra_open_tables, (uchar*) share);
344     thr_lock_delete(&share->lock);
345     mysql_mutex_destroy(&share->mutex);
346     my_free(share);
347   }
348   mysql_mutex_unlock(&cassandra_mutex);
349 
350   return 0;
351 }
352 
353 
cassandra_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)354 static handler* cassandra_create_handler(handlerton *hton,
355                                        TABLE_SHARE *table,
356                                        MEM_ROOT *mem_root)
357 {
358   return new (mem_root) ha_cassandra(hton, table);
359 }
360 
361 
/*
  Construct a handler that has no Cassandra connection yet (se is NULL), no
  field converters, and no dynamic-column field selected.
*/
ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
   se(NULL), field_converters(NULL),
   special_type_field_converters(NULL),
   special_type_field_names(NULL), n_special_type_fields(0),
   rowkey_converter(NULL),
   dyncol_field(0), dyncol_set(0)
{}
370 
371 
connect_and_check_options(TABLE * table_arg)372 int ha_cassandra::connect_and_check_options(TABLE *table_arg)
373 {
374   ha_table_option_struct *options= table_arg->s->option_struct;
375   int res;
376   DBUG_ENTER("ha_cassandra::connect_and_check_options");
377 
378   if ((res= check_field_options(table_arg->s->field)) ||
379       (res= check_table_options(options)))
380     DBUG_RETURN(res);
381 
382   se= create_cassandra_se();
383   se->set_column_family(options->column_family);
384   const char *thrift_host= options->thrift_host? options->thrift_host:
385                            cassandra_default_thrift_host;
386   if (se->connect(thrift_host, options->thrift_port, options->keyspace))
387   {
388     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
389     DBUG_RETURN(HA_ERR_NO_CONNECTION);
390   }
391 
392   if (setup_field_converters(table_arg->field, table_arg->s->fields))
393   {
394     DBUG_RETURN(HA_ERR_NO_CONNECTION);
395   }
396 
397   DBUG_RETURN(0);
398 }
399 
400 
check_field_options(Field ** fields)401 int ha_cassandra::check_field_options(Field **fields)
402 {
403   Field **field;
404   uint i;
405   DBUG_ENTER("ha_cassandra::check_field_options");
406   for (field= fields, i= 0; *field; field++, i++)
407   {
408     ha_field_option_struct *field_options= (*field)->option_struct;
409     if (field_options && field_options->dyncol_field)
410     {
411       if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
412       {
413          my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name.str);
414          DBUG_RETURN(HA_WRONG_CREATE_OPTION);
415       }
416       dyncol_set= 1;
417       dyncol_field= i;
418       bzero(&dynamic_values, sizeof(dynamic_values));
419       bzero(&dynamic_names, sizeof(dynamic_names));
420       bzero(&dynamic_rec, sizeof(dynamic_rec));
421     }
422   }
423   DBUG_RETURN(0);
424 }
425 
426 
open(const char * name,int mode,uint test_if_locked)427 int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
428 {
429   DBUG_ENTER("ha_cassandra::open");
430 
431   if (!(share = get_share(name, table)))
432     DBUG_RETURN(1);
433   thr_lock_data_init(&share->lock,&lock,NULL);
434 
435   DBUG_ASSERT(!se);
436   /*
437     Don't do the following on open: it prevents SHOW CREATE TABLE when the server
438     has gone away.
439   */
440   /*
441   int res;
442   if ((res= connect_and_check_options(table)))
443   {
444     DBUG_RETURN(res);
445   }
446   */
447 
448   info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
449   insert_lineno= 0;
450 
451   DBUG_RETURN(0);
452 }
453 
454 
close(void)455 int ha_cassandra::close(void)
456 {
457   DBUG_ENTER("ha_cassandra::close");
458   delete se;
459   se= NULL;
460   free_field_converters();
461   DBUG_RETURN(free_share(share));
462 }
463 
464 
check_table_options(ha_table_option_struct * options)465 int ha_cassandra::check_table_options(ha_table_option_struct *options)
466 {
467   if (!options->thrift_host && (!cassandra_default_thrift_host ||
468                                 !cassandra_default_thrift_host[0]))
469   {
470     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
471              "thrift_host table option must be specified, or "
472              "@@cassandra_default_thrift_host must be set");
473     return HA_WRONG_CREATE_OPTION;
474   }
475 
476   if (!options->keyspace || !options->column_family)
477   {
478     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
479              "keyspace and column_family table options must be specified");
480     return HA_WRONG_CREATE_OPTION;
481   }
482   return 0;
483 }
484 
485 
486 /**
487   @brief
488   create() is called to create a table. The variable name will have the name
489   of the table.
490 
491   @details
492   When create() is called you do not need to worry about
493   opening the table. Also, the .frm file will have already been
494   created so adjusting create_info is not necessary. You can overwrite
495   the .frm file at this point if you wish to change the table
496   definition, but there are no methods currently provided for doing
497   so.
498 
499   Called from handle.cc by ha_create_table().
500 
501   @see
502   ha_create_table() in handle.cc
503 */
504 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)505 int ha_cassandra::create(const char *name, TABLE *table_arg,
506                          HA_CREATE_INFO *create_info)
507 {
508   int res;
509   DBUG_ENTER("ha_cassandra::create");
510 
511   if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
512       table_arg->key_info[0].user_defined_key_parts != 1 ||
513       table_arg->key_info[0].key_part[0].fieldnr != 1)
514   {
515     my_error(ER_WRONG_COLUMN_NAME, MYF(0),
516              "Table must have PRIMARY KEY defined over the first column");
517     DBUG_RETURN(HA_WRONG_CREATE_OPTION);
518   }
519 
520   DBUG_ASSERT(!se);
521   if ((res= connect_and_check_options(table_arg)))
522     DBUG_RETURN(res);
523 
524   insert_lineno= 0;
525   DBUG_RETURN(0);
526 }
527 
/*
  Mapping needs to
  - copy value from MySQL record to Thrift buffer
  - copy value from Thrift buffer to MySQL record.
*/
534 
/*
  Converter base: interface for converting one column value between its
  Cassandra representation and a MariaDB Field.
*/
class ColumnDataConverter
{
public:
  /*
    The Field that values are stored into and read from.
    NOTE(review): not initialised by this class; presumably assigned by the
    code that creates the converter - confirm.
  */
  Field *field;

  /*
    This will save Cassandra's data in the Field.

    NOTE(review): the return value is not documented here; the visible
    implementations return 0 on success and StringCopyConverter returns 1
    when the data does not fit.
  */
  virtual int cassandra_to_mariadb(const char *cass_data,
                                    int cass_data_len)=0;

  /*
    This will get data from the Field pointer, store Cassandra's form
    in internal buffer, and return pointer/size.

    @return
      false - OK
      true  - Failed to convert value (completely, there is no value to insert
              at all).
  */
  virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
  virtual ~ColumnDataConverter() {};
};
557 
558 
559 class DoubleDataConverter : public ColumnDataConverter
560 {
561   double buf;
562 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)563   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
564   {
565     DBUG_ASSERT(cass_data_len == sizeof(double));
566     double *pdata= (double*) cass_data;
567     field->store(*pdata);
568     return 0;
569   }
570 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)571   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
572   {
573     buf= field->val_real();
574     *cass_data= (char*)&buf;
575     *cass_data_len=sizeof(double);
576     return false;
577   }
~DoubleDataConverter()578   ~DoubleDataConverter(){}
579 };
580 
581 
582 class FloatDataConverter : public ColumnDataConverter
583 {
584   float buf;
585 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)586   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
587   {
588     DBUG_ASSERT(cass_data_len == sizeof(float));
589     float *pdata= (float*) cass_data;
590     field->store(*pdata);
591     return 0;
592   }
593 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)594   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
595   {
596     buf= field->val_real();
597     *cass_data= (char*)&buf;
598     *cass_data_len=sizeof(float);
599     return false;
600   }
~FloatDataConverter()601   ~FloatDataConverter(){}
602 };
603 
/* Copy 8 bytes from 'from' to 'to' in reversed order */
static void flip64(const char *from, char* to)
{
  for (int pos= 0; pos < 8; pos++)
    to[pos]= from[7 - pos];
}
615 
616 class BigintDataConverter : public ColumnDataConverter
617 {
618   longlong buf;
619   bool flip; /* is false when reading counter columns */
620 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)621   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
622   {
623     longlong tmp;
624     DBUG_ASSERT(cass_data_len == sizeof(longlong));
625     if (flip)
626       flip64(cass_data, (char*)&tmp);
627     else
628       memcpy(&tmp, cass_data, sizeof(longlong));
629     field->store(tmp);
630     return 0;
631   }
632 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)633   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
634   {
635     longlong tmp= field->val_int();
636     if (flip)
637       flip64((const char*)&tmp, (char*)&buf);
638     else
639       memcpy(&buf, &tmp, sizeof(longlong));
640     *cass_data= (char*)&buf;
641     *cass_data_len=sizeof(longlong);
642     return false;
643   }
BigintDataConverter(bool flip_arg)644   BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
~BigintDataConverter()645   ~BigintDataConverter(){}
646 };
647 
/* Copy 4 bytes from 'from' to 'to' in reversed order */
static void flip32(const char *from, char* to)
{
  for (int pos= 0; pos < 4; pos++)
    to[pos]= from[3 - pos];
}
655 
656 
657 class TinyintDataConverter : public ColumnDataConverter
658 {
659   char buf;
660 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)661   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
662   {
663     DBUG_ASSERT(cass_data_len == 1);
664     field->store(cass_data[0]);
665     return 0;
666   }
667 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)668   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
669   {
670     buf= field->val_int()? 1 : 0; /* TODO: error handling? */
671     *cass_data= (char*)&buf;
672     *cass_data_len= 1;
673     return false;
674   }
~TinyintDataConverter()675   ~TinyintDataConverter(){}
676 };
677 
678 
679 class Int32DataConverter : public ColumnDataConverter
680 {
681   int32_t buf;
682 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)683   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
684   {
685     int32_t tmp;
686     DBUG_ASSERT(cass_data_len == sizeof(int32_t));
687     flip32(cass_data, (char*)&tmp);
688     field->store(tmp);
689     return 0;
690   }
691 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)692   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
693   {
694     int32_t tmp= field->val_int();
695     flip32((const char*)&tmp, (char*)&buf);
696     *cass_data= (char*)&buf;
697     *cass_data_len=sizeof(int32_t);
698     return false;
699   }
~Int32DataConverter()700   ~Int32DataConverter(){}
701 };
702 
703 
704 class StringCopyConverter : public ColumnDataConverter
705 {
706   String buf;
707   size_t max_length;
708 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)709   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
710   {
711     if ((size_t)cass_data_len > max_length)
712       return 1;
713     field->store(cass_data, cass_data_len,field->charset());
714     return 0;
715   }
716 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)717   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
718   {
719     String *pstr= field->val_str(&buf);
720     *cass_data= (char*)pstr->ptr();
721     *cass_data_len= pstr->length();
722     return false;
723   }
StringCopyConverter(size_t max_length_arg)724   StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
~StringCopyConverter()725   ~StringCopyConverter(){}
726 };
727 
728 
729 class TimestampDataConverter : public ColumnDataConverter
730 {
731   int64_t buf;
732 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)733   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
734   {
735     /* Cassandra data is milliseconds-since-epoch in network byte order */
736     int64_t tmp;
737     DBUG_ASSERT(cass_data_len==8);
738     flip64(cass_data, (char*)&tmp);
739     /*
740       store_TIME's arguments:
741       - seconds since epoch
742       - microsecond fraction of a second.
743     */
744     ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
745     return 0;
746   }
747 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)748   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
749   {
750     my_time_t ts_time;
751     ulong ts_microsec;
752     int64_t tmp;
753     ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec);
754 
755     /* Cassandra needs milliseconds-since-epoch */
756     tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000;
757     flip64((const char*)&tmp, (char*)&buf);
758 
759     *cass_data= (char*)&buf;
760     *cass_data_len= 8;
761     return false;
762   }
~TimestampDataConverter()763   ~TimestampDataConverter(){}
764 };
765 
766 
767 
/*
  Convert one hexadecimal digit (either case) to its value.

  @return 0..15, or -1 if c is not a hexadecimal digit
*/
static int convert_hex_digit(const char c)
{
  if ('0' <= c && c <= '9')
    return c - '0';
  if ('A' <= c && c <= 'F')
    return 10 + (c - 'A');
  if ('a' <= c && c <= 'f')
    return 10 + (c - 'a');
  return -1; /* Couldn't convert */
}
781 
782 
/* Lower-case hexadecimal digits, indexed by value */
const char map2number[]="0123456789abcdef";

/*
  Format a binary UUID as text.

  @param str        out: buffer for 36 characters plus the terminating zero
  @param cass_data  the 16 bytes of the UUID, in network byte order
*/
static void convert_uuid2string(char *str, const char *cass_data)
{
  char *out= str;
  for (unsigned int pos= 0; pos < 16; pos++)
  {
    const unsigned char byte= (unsigned char) cass_data[pos];
    *out++= map2number[byte >> 4];
    *out++= map2number[byte & 0xF];
    /* A dash follows the 4th, 6th, 8th and 10th byte */
    switch (pos)
    {
    case 3:
    case 5:
    case 7:
    case 9:
      *out++= '-';
      break;
    default:
      break;
    }
  }
  *out= 0;
}
798 
convert_string2uuid(char * buf,const char * str)799 static bool convert_string2uuid(char *buf, const char *str)
800 {
801   int lower, upper;
802   for (uint i= 0; i < 16; i++)
803   {
804     if ((upper= convert_hex_digit(str[0])) == -1 ||
805         (lower= convert_hex_digit(str[1])) == -1)
806     {
807       return true;
808     }
809     buf[i]= lower | (upper << 4);
810     str += 2;
811     if (i == 3 || i == 5 || i == 7 || i == 9)
812     {
813       if (str[0] != '-')
814         return true;
815       str++;
816     }
817   }
818   return false;
819 }
820 
821 
822 class UuidDataConverter : public ColumnDataConverter
823 {
824   char buf[16]; /* Binary UUID representation */
825   String str_buf;
826 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)827   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
828   {
829     DBUG_ASSERT(cass_data_len==16);
830     char str[37];
831     convert_uuid2string(str, cass_data);
832     field->store(str, 36,field->charset());
833     return 0;
834   }
835 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)836   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
837   {
838     String *uuid_str= field->val_str(&str_buf);
839 
840     if (uuid_str->length() != 36)
841       return true;
842 
843     if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
844       return true;
845     *cass_data= buf;
846     *cass_data_len= 16;
847     return false;
848   }
~UuidDataConverter()849   ~UuidDataConverter(){}
850 };
851 
/**
  Converting dynamic column types to/from Cassandra types
*/
855 
856 
857 /**
858   Check and initialize (if it is needed) string MEM_ROOT
859 */
alloc_strings_memroot(MEM_ROOT * mem_root)860 static void alloc_strings_memroot(MEM_ROOT *mem_root)
861 {
862   if (!alloc_root_inited(mem_root))
863   {
864     /*
865       The mem_root used to allocate UUID (of length 36 + \0) so make
866       appropriate allocated size
867     */
868     init_alloc_root(PSI_INSTRUMENT_ME, mem_root,
869                     (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
870                     ALLOC_ROOT_MIN_BLOCK_SIZE,
871                     (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
872                     ALLOC_ROOT_MIN_BLOCK_SIZE, MYF(MY_THREAD_SPECIFIC));
873   }
874 }
875 
free_strings_memroot(MEM_ROOT * mem_root)876 static void free_strings_memroot(MEM_ROOT *mem_root)
877 {
878   if (alloc_root_inited(mem_root))
879     free_root(mem_root, MYF(0));
880 }
881 
cassandra_to_dyncol_intLong(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)882 bool cassandra_to_dyncol_intLong(const char *cass_data,
883                                  int cass_data_len __attribute__((unused)),
884                                  DYNAMIC_COLUMN_VALUE *value,
885                                  MEM_ROOT *mem_root __attribute__((unused)))
886 {
887   value->type= DYN_COL_INT;
888 #ifdef WORDS_BIGENDIAN
889   value->x.long_value= (longlong)*cass_data;
890 #else
891   flip64(cass_data, (char *)&value->x.long_value);
892 #endif
893   return 0;
894 }
895 
dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)896 bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
897                              char **cass_data, int *cass_data_len,
898                              void* buff, void **freemem)
899 {
900   longlong *tmp= (longlong *) buff;
901   enum enum_dyncol_func_result rc=
902     mariadb_dyncol_val_long(tmp, value);
903   if (rc < 0)
904     return true;
905   *cass_data_len= sizeof(longlong);
906 #ifdef WORDS_BIGENDIAN
907   *cass_data= (char *)buff;
908 #else
909   flip64((char *)buff, (char *)buff + sizeof(longlong));
910   *cass_data= (char *)buff + sizeof(longlong);
911 #endif
912   *freemem= NULL;
913   return false;
914 }
915 
cassandra_to_dyncol_intInt32(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)916 bool cassandra_to_dyncol_intInt32(const char *cass_data,
917                                   int cass_data_len __attribute__((unused)),
918                                   DYNAMIC_COLUMN_VALUE *value,
919                                   MEM_ROOT *mem_root __attribute__((unused)))
920 {
921   int32 tmp;
922   value->type= DYN_COL_INT;
923 #ifdef WORDS_BIGENDIAN
924   tmp= *((int32 *)cass_data);
925 #else
926   flip32(cass_data, (char *)&tmp);
927 #endif
928   value->x.long_value= tmp;
929   return 0;
930 }
931 
932 
dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)933 bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
934                               char **cass_data, int *cass_data_len,
935                               void* buff, void **freemem)
936 {
937   longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
938   enum enum_dyncol_func_result rc=
939     mariadb_dyncol_val_long(tmp, value);
940   if (rc < 0)
941     return true;
942   *cass_data_len= sizeof(int32);
943   *cass_data= (char *)buff;
944 #ifdef WORDS_BIGENDIAN
945   *((int32 *) buff) = (int32) *tmp;
946 #else
947   {
948     int32 tmp2= (int32) *tmp;
949     flip32((char *)&tmp2, (char *)buff);
950   }
951 #endif
952   *freemem= NULL;
953   return false;
954 }
955 
956 
cassandra_to_dyncol_intCounter(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)957 bool cassandra_to_dyncol_intCounter(const char *cass_data,
958                                     int cass_data_len __attribute__((unused)),
959                                     DYNAMIC_COLUMN_VALUE *value,
960                                     MEM_ROOT *mem_root __attribute__((unused)))
961 {
962   value->type= DYN_COL_INT;
963   value->x.long_value= *((longlong *)cass_data);
964   return 0;
965 }
966 
967 
dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)968 bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
969                                 char **cass_data, int *cass_data_len,
970                                 void* buff, void **freemem)
971 {
972   longlong *tmp= (longlong *)buff;
973   enum enum_dyncol_func_result rc=
974     mariadb_dyncol_val_long(tmp, value);
975   if (rc < 0)
976     return true;
977   *cass_data_len= sizeof(longlong);
978   *cass_data= (char *)buff;
979   *freemem= NULL;
980   return false;
981 }
982 
cassandra_to_dyncol_doubleFloat(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)983 bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
984                                      int cass_data_len __attribute__((unused)),
985                                      DYNAMIC_COLUMN_VALUE *value,
986                                      MEM_ROOT *mem_root __attribute__((unused)))
987 {
988   value->type= DYN_COL_DOUBLE;
989   value->x.double_value= *((float *)cass_data);
990   return 0;
991 }
992 
dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)993 bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
994                               char **cass_data, int *cass_data_len,
995                               void* buff, void **freemem)
996 {
997   double tmp;
998   enum enum_dyncol_func_result rc=
999     mariadb_dyncol_val_double(&tmp, value);
1000   if (rc < 0)
1001     return true;
1002   *((float *)buff)= (float) tmp;
1003   *cass_data_len= sizeof(float);
1004   *cass_data= (char *)buff;
1005   *freemem= NULL;
1006   return false;
1007 }
1008 
cassandra_to_dyncol_doubleDouble(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1009 bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
1010                                       int cass_data_len __attribute__((unused)),
1011                                       DYNAMIC_COLUMN_VALUE *value,
1012                                       MEM_ROOT *mem_root
1013                                       __attribute__((unused)))
1014 {
1015   value->type= DYN_COL_DOUBLE;
1016   value->x.double_value= *((double *)cass_data);
1017   return 0;
1018 }
1019 
dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1020 bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
1021                                char **cass_data, int *cass_data_len,
1022                                void* buff, void **freemem)
1023 {
1024   double *tmp= (double *)buff;
1025   enum enum_dyncol_func_result rc=
1026     mariadb_dyncol_val_double(tmp, value);
1027   if (rc < 0)
1028     return true;
1029   *cass_data_len= sizeof(double);
1030   *cass_data= (char *)buff;
1031   *freemem= NULL;
1032   return false;
1033 }
1034 
cassandra_to_dyncol_strStr(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,CHARSET_INFO * cs)1035 bool cassandra_to_dyncol_strStr(const char *cass_data,
1036                                 int cass_data_len,
1037                                 DYNAMIC_COLUMN_VALUE *value,
1038                                 CHARSET_INFO *cs)
1039 {
1040   value->type= DYN_COL_STRING;
1041   value->x.string.charset= cs;
1042   value->x.string.value.str= (char *)cass_data;
1043   value->x.string.value.length= cass_data_len;
1044   return 0;
1045 }
1046 
dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem,CHARSET_INFO * cs)1047 bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
1048                             char **cass_data, int *cass_data_len,
1049                             void* buff, void **freemem, CHARSET_INFO *cs)
1050 {
1051   DYNAMIC_STRING tmp;
1052   if (init_dynamic_string(&tmp, NULL, 1024, 1024))
1053     return 1;
1054   enum enum_dyncol_func_result rc=
1055     mariadb_dyncol_val_str(&tmp, value, cs, '\0');
1056   if (rc < 0)
1057   {
1058     dynstr_free(&tmp);
1059     return 1;
1060   }
1061   *cass_data_len= tmp.length;
1062   *(cass_data)= tmp.str;
1063   *freemem= tmp.str;
1064   return 0;
1065 }
1066 
cassandra_to_dyncol_strBytes(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1067 bool cassandra_to_dyncol_strBytes(const char *cass_data,
1068                                   int cass_data_len,
1069                                   DYNAMIC_COLUMN_VALUE *value,
1070                                   MEM_ROOT *mem_root __attribute__((unused)))
1071 {
1072   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1073                                     &my_charset_bin);
1074 }
1075 
dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1076 bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
1077                               char **cass_data, int *cass_data_len,
1078                               void* buff, void **freemem)
1079 {
1080   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1081                                 buff, freemem, &my_charset_bin);
1082 }
1083 
cassandra_to_dyncol_strAscii(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1084 bool cassandra_to_dyncol_strAscii(const char *cass_data,
1085                                   int cass_data_len,
1086                                   DYNAMIC_COLUMN_VALUE *value,
1087                                   MEM_ROOT *mem_root __attribute__((unused)))
1088 {
1089   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1090                                     &my_charset_latin1_bin);
1091 }
1092 
dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1093 bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
1094                               char **cass_data, int *cass_data_len,
1095                               void* buff, void **freemem)
1096 {
1097   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1098                                 buff, freemem, &my_charset_latin1_bin);
1099 }
1100 
cassandra_to_dyncol_strUTF8(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1101 bool cassandra_to_dyncol_strUTF8(const char *cass_data,
1102                                  int cass_data_len,
1103                                  DYNAMIC_COLUMN_VALUE *value,
1104                                  MEM_ROOT *mem_root __attribute__((unused)))
1105 {
1106   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1107                                     &my_charset_utf8mb3_unicode_ci);
1108 }
1109 
dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1110 bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
1111                              char **cass_data, int *cass_data_len,
1112                              void* buff, void **freemem)
1113 {
1114   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1115                                 buff, freemem, &my_charset_utf8mb3_unicode_ci);
1116 }
1117 
cassandra_to_dyncol_strUUID(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1118 bool cassandra_to_dyncol_strUUID(const char *cass_data,
1119                                  int cass_data_len,
1120                                  DYNAMIC_COLUMN_VALUE *value,
1121                                  MEM_ROOT *mem_root)
1122 {
1123   value->type= DYN_COL_STRING;
1124   value->x.string.charset= &my_charset_bin;
1125   alloc_strings_memroot(mem_root);
1126   value->x.string.value.str= (char *)alloc_root(mem_root, 37);
1127   if (!value->x.string.value.str)
1128   {
1129     value->x.string.value.length= 0;
1130     return 1;
1131   }
1132   convert_uuid2string(value->x.string.value.str, cass_data);
1133   value->x.string.value.length= 36;
1134   return 0;
1135 }
1136 
dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1137 bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
1138                              char **cass_data, int *cass_data_len,
1139                              void* buff, void **freemem)
1140 {
1141   DYNAMIC_STRING tmp;
1142   if (init_dynamic_string(&tmp, NULL, 1024, 1024))
1143     return true;
1144   enum enum_dyncol_func_result rc=
1145     mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0');
1146   if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
1147   {
1148     dynstr_free(&tmp);
1149     return true;
1150   }
1151 
1152   *cass_data_len= tmp.length;
1153   *(cass_data)= tmp.str;
1154   *freemem= tmp.str;
1155   return 0;
1156 }
1157 
cassandra_to_dyncol_intBool(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1158 bool cassandra_to_dyncol_intBool(const char *cass_data,
1159                                  int cass_data_len,
1160                                  DYNAMIC_COLUMN_VALUE *value,
1161                                  MEM_ROOT *mem_root __attribute__((unused)))
1162 {
1163   value->type= DYN_COL_INT;
1164   value->x.long_value= (cass_data[0] ? 1 : 0);
1165   return 0;
1166 }
1167 
dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1168 bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
1169                              char **cass_data, int *cass_data_len,
1170                              void* buff, void **freemem)
1171 {
1172   longlong tmp;
1173   enum enum_dyncol_func_result rc=
1174     mariadb_dyncol_val_long(&tmp, value);
1175   if (rc < 0)
1176     return true;
1177   ((char *)buff)[0]= (tmp ? 1 : 0);
1178   *cass_data_len= 1;
1179   *(cass_data)= (char *)buff;
1180   *freemem= 0;
1181   return 0;
1182 }
1183 
1184 
/*
  Fully qualified names of the Cassandra validator classes this engine
  knows about. All of them share the 32-character prefix
  "org.apache.cassandra.db.marshal."; get_cassandra_type() relies on that
  when it inspects the characters following the prefix.
*/

/* Integer types */
const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";

/* Floating point types */
const char * const validator_float=   "org.apache.cassandra.db.marshal.FloatType";
const char * const validator_double=  "org.apache.cassandra.db.marshal.DoubleType";

/* String-like types */
const char * const validator_blob=    "org.apache.cassandra.db.marshal.BytesType";
const char * const validator_ascii=   "org.apache.cassandra.db.marshal.AsciiType";
const char * const validator_text=    "org.apache.cassandra.db.marshal.UTF8Type";

const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";

const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";

const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";

/* VARINTs are stored as big-endian big numbers. */
const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
1205 
1206 
1207 static CASSANDRA_TYPE_DEF cassandra_types[]=
1208 {
1209   {
1210     validator_bigint,
1211     &cassandra_to_dyncol_intLong,
1212     &dyncol_to_cassandraLong
1213   },
1214   {
1215     validator_int,
1216     &cassandra_to_dyncol_intInt32,
1217     &dyncol_to_cassandraInt32
1218   },
1219   {
1220     validator_counter,
1221     cassandra_to_dyncol_intCounter,
1222     &dyncol_to_cassandraCounter
1223   },
1224   {
1225     validator_float,
1226     &cassandra_to_dyncol_doubleFloat,
1227     &dyncol_to_cassandraFloat
1228   },
1229   {
1230     validator_double,
1231     &cassandra_to_dyncol_doubleDouble,
1232     &dyncol_to_cassandraDouble
1233   },
1234   {
1235     validator_blob,
1236     &cassandra_to_dyncol_strBytes,
1237     &dyncol_to_cassandraBytes
1238   },
1239   {
1240     validator_ascii,
1241     &cassandra_to_dyncol_strAscii,
1242     &dyncol_to_cassandraAscii
1243   },
1244   {
1245     validator_text,
1246     &cassandra_to_dyncol_strUTF8,
1247     &dyncol_to_cassandraUTF8
1248   },
1249   {
1250     validator_timestamp,
1251     &cassandra_to_dyncol_intLong,
1252     &dyncol_to_cassandraLong
1253   },
1254   {
1255     validator_uuid,
1256     &cassandra_to_dyncol_strUUID,
1257     &dyncol_to_cassandraUUID
1258   },
1259   {
1260     validator_boolean,
1261     &cassandra_to_dyncol_intBool,
1262     &dyncol_to_cassandraBool
1263   },
1264   {
1265     validator_varint,
1266     &cassandra_to_dyncol_strBytes,
1267     &dyncol_to_cassandraBytes
1268   },
1269   {
1270     validator_decimal,
1271     &cassandra_to_dyncol_strBytes,
1272     &dyncol_to_cassandraBytes
1273   }
1274 };
1275 
get_cassandra_type(const char * validator)1276 CASSANDRA_TYPE get_cassandra_type(const char *validator)
1277 {
1278   CASSANDRA_TYPE rc;
1279   switch(validator[32])
1280   {
1281   case 'L':
1282     rc= CT_BIGINT;
1283     break;
1284   case 'I':
1285     rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
1286     rc= CT_INT;
1287     break;
1288   case 'C':
1289     rc= CT_COUNTER;
1290     break;
1291   case 'F':
1292     rc= CT_FLOAT;
1293     break;
1294   case 'D':
1295     switch (validator[33])
1296     {
1297     case 'o':
1298       rc= CT_DOUBLE;
1299       break;
1300     case 'a':
1301       rc= CT_TIMESTAMP;
1302       break;
1303     case 'e':
1304       rc= CT_DECIMAL;
1305       break;
1306     default:
1307       rc= CT_BLOB;
1308       break;
1309     }
1310     break;
1311   case 'B':
1312     rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
1313     break;
1314   case 'A':
1315     rc= CT_ASCII;
1316     break;
1317   case 'U':
1318     rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
1319     break;
1320   default:
1321     rc= CT_BLOB;
1322   }
1323   DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
1324   return rc;
1325 }
1326 
map_field_to_validator(Field * field,const char * validator_name)1327 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
1328 {
1329   ColumnDataConverter *res= NULL;
1330 
1331   switch(field->type()) {
1332     case MYSQL_TYPE_TINY:
1333       if (!strcmp(validator_name, validator_boolean))
1334       {
1335         res= new TinyintDataConverter;
1336         break;
1337       }
1338       /* fall through: */
1339     case MYSQL_TYPE_SHORT:
1340     case MYSQL_TYPE_LONGLONG:
1341     {
1342       bool is_counter= false;
1343       if (!strcmp(validator_name, validator_bigint) ||
1344           !strcmp(validator_name, validator_timestamp) ||
1345           (is_counter= !strcmp(validator_name, validator_counter)))
1346         res= new BigintDataConverter(!is_counter);
1347       break;
1348     }
1349     case MYSQL_TYPE_FLOAT:
1350       if (!strcmp(validator_name, validator_float))
1351         res= new FloatDataConverter;
1352       break;
1353 
1354     case MYSQL_TYPE_DOUBLE:
1355       if (!strcmp(validator_name, validator_double))
1356         res= new DoubleDataConverter;
1357       break;
1358 
1359     case MYSQL_TYPE_TIMESTAMP:
1360       if (!strcmp(validator_name, validator_timestamp))
1361         res= new TimestampDataConverter;
1362       break;
1363 
1364     case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
1365       if (!strcmp(validator_name, validator_uuid) &&
1366           field->real_type() == MYSQL_TYPE_STRING &&
1367           field->field_length == 36)
1368       {
1369         // UUID maps to CHAR(36), its text representation
1370         res= new UuidDataConverter;
1371         break;
1372       }
1373       /* fall through: */
1374     case MYSQL_TYPE_VAR_STRING:
1375     case MYSQL_TYPE_VARCHAR:
1376     case MYSQL_TYPE_BLOB:
1377     {
1378       /*
1379         Cassandra's "varint" type is a binary-encoded arbitary-length
1380         big-endian number.
1381         - It can be mapped to VARBINARY(N), with sufficiently big N.
1382         - If the value does not fit into N bytes, it is an error. We should not
1383           truncate it, because that is just as good as returning garbage.
1384         - varint should not be mapped to BINARY(N), because BINARY(N) values
1385           are zero-padded, which will work as multiplying the value by
1386           2^k for some value of k.
1387       */
1388       if (field->type() == MYSQL_TYPE_VARCHAR &&
1389           field->binary() &&
1390           (!strcmp(validator_name, validator_varint) ||
1391            !strcmp(validator_name, validator_decimal)))
1392       {
1393         res= new StringCopyConverter(field->field_length);
1394         break;
1395       }
1396 
1397       if (!strcmp(validator_name, validator_blob) ||
1398           !strcmp(validator_name, validator_ascii) ||
1399           !strcmp(validator_name, validator_text))
1400       {
1401         res= new StringCopyConverter((size_t)-1);
1402       }
1403       break;
1404     }
1405     case MYSQL_TYPE_LONG:
1406       if (!strcmp(validator_name, validator_int))
1407         res= new Int32DataConverter;
1408       break;
1409 
1410     default:;
1411   }
1412   return res;
1413 }
1414 
1415 
/**
  Build the converters that translate between the SQL table's fields and
  the columns declared in the Cassandra column family.

  For every column in Cassandra's DDL a matching SQL field (by name) gets a
  converter in field_converters[], indexed by field_index. The first SQL
  field is the row key and gets rowkey_converter. When a dynamic column
  field is in use (dyncol_set), DDL columns without a matching SQL field
  and with a non-default validator are remembered in
  special_type_field_names[] / special_type_field_converters[], and the
  dynamic arrays/string used while reading rows are initialized.

  @param field_arg  NULL-terminated array of the table's fields; the first
                    one is the row key
  @param n_fields   number of fields in field_arg

  @return false on success; true on failure. Mapping failures are reported
          through se->print_error() and my_error(ER_INTERNAL_ERROR).
          Memory allocated here before a failure is not released in this
          function.
*/
bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
{
  char *col_name;
  int  col_name_len;
  char *col_type;
  int col_type_len;
  size_t ddl_fields= se->get_ddl_size();
  const char *default_type= se->get_default_validator();
  uint max_non_default_fields;
  DBUG_ENTER("ha_cassandra::setup_field_converters");
  DBUG_ASSERT(default_type);

  DBUG_ASSERT(!field_converters);
  DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);

  /*
    When dynamic columns are used, the SQL definition contains one field
    that is not described in Cassandra's DDL; the key field is also
    described separately. That is why "n_fields - dyncol_set - 1" and
    "ddl_fields + 2" are used below.
  */
  max_non_default_fields= ddl_fields + 2 - n_fields;
  if (ddl_fields < (n_fields - dyncol_set - 1))
  {
    se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /*
    Allocate memory in one chunk: the converter pointer array first and,
    only with dynamic columns, the special-type converter and name arrays
    right behind it.
  */
  size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
    (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
    (dyncol_set ? max_non_default_fields : 0);
  if (!(field_converters= (ColumnDataConverter**)my_malloc(PSI_INSTRUMENT_ME, memsize, MYF(0))))
    DBUG_RETURN(true);
  bzero(field_converters, memsize);
  n_field_converters= n_fields;

  if (dyncol_set)
  {
    /* Carve the two extra arrays out of the chunk allocated above */
    special_type_field_converters=
      (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
    special_type_field_names=
      ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));

    /*
      Initialize the three dynamic containers; if a later one fails, the
      ones already initialized are released again.
    */
    if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_values,
                           sizeof(DYNAMIC_COLUMN_VALUE),
                           DYNCOL_USUAL, DYNCOL_DELTA, MYF(0)))
      DBUG_RETURN(true);
    else
      if (my_init_dynamic_array(PSI_INSTRUMENT_ME, &dynamic_names,
                             sizeof(LEX_STRING),
                             DYNCOL_USUAL, DYNCOL_DELTA,MYF(0)))
      {
        delete_dynamic(&dynamic_values);
        DBUG_RETURN(true);
      }
      else
        if (init_dynamic_string(&dynamic_rec, NULL,
                                DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
        {
          delete_dynamic(&dynamic_values);
          delete_dynamic(&dynamic_names);
          DBUG_RETURN(true);
        }

    /* Dynamic column field has special processing */
    field_converters[dyncol_field]= NULL;

    default_type_def= cassandra_types + get_cassandra_type(default_type);
  }

  /* Walk over Cassandra's DDL columns and pair each with an SQL field */
  se->first_ddl_column();
  uint n_mapped= 0;
  while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
                              &col_type_len))
  {
    Field **field;
    uint i;
    /* Mapping for the 1st field is already known */
    for (field= field_arg + 1, i= 1; *field; field++, i++)
    {
      /* The dynamic column field itself never matches a DDL column */
      if ((!dyncol_set || dyncol_field != i) &&
          !strcmp((*field)->field_name.str, col_name))
      {
        n_mapped++;
        ColumnDataConverter **conv= field_converters + (*field)->field_index;
        if (!(*conv= map_field_to_validator(*field, col_type)))
        {
          se->print_error("Failed to map column %s to datatype %s",
                          (*field)->field_name.str, col_type);
          my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
          DBUG_RETURN(true);
        }
        (*conv)->field= *field;
        break;
      }
    }
    /* *field is NULL here only if the loop ran to the end without a match */
    if (dyncol_set && !(*field)) // is needed and not found
    {
      DBUG_PRINT("info",("Field not found: %s", col_name));
      if (strcmp(col_type, default_type))
      {
        DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
                           col_name, col_type));
        /* Only the pointer to the name is stored, not a copy */
        special_type_field_names[n_special_type_fields].length= col_name_len;
        special_type_field_names[n_special_type_fields].str= col_name;
        special_type_field_converters[n_special_type_fields]=
          cassandra_types[get_cassandra_type(col_type)];
        n_special_type_fields++;
      }
    }
  }

  /* Every SQL field except the key and the dynamic column must be mapped */
  if (n_mapped != n_fields - 1 - dyncol_set)
  {
    Field *first_unmapped= NULL;
    /* Find the first field */
    for (uint i= 1; i < n_fields;i++)
    {
      if (!field_converters[i])
      {
        first_unmapped= field_arg[i];
        break;
      }
    }
    DBUG_ASSERT(first_unmapped);

    se->print_error("Field `%s` could not be mapped to any field in Cassandra",
                    first_unmapped->field_name.str);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  /*
    Setup type conversion for row_key.
  */
  se->get_rowkey_type(&col_name, &col_type);
  /* If Cassandra names the key column, the first SQL field must match it */
  if (col_name && strcmp(col_name, (*field_arg)->field_name.str))
  {
    se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
                    col_name);
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }
  /* Otherwise the first SQL field has to be called 'rowkey' */
  if (!col_name && strcmp("rowkey", (*field_arg)->field_name.str))
  {
    se->print_error("target column family has no key_alias defined, "
                    "PRIMARY KEY column must be named 'rowkey'");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  if (col_type != NULL)
  {
    if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
    {
      se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
      my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
      DBUG_RETURN(true);
    }
    rowkey_converter->field= *field_arg;
  }
  else
  {
    se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
    my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
    DBUG_RETURN(true);
  }

  DBUG_RETURN(false);
}
1588 
1589 
free_field_converters()1590 void ha_cassandra::free_field_converters()
1591 {
1592   delete rowkey_converter;
1593   rowkey_converter= NULL;
1594 
1595   if (dyncol_set)
1596   {
1597     delete_dynamic(&dynamic_values);
1598     delete_dynamic(&dynamic_names);
1599     dynstr_free(&dynamic_rec);
1600   }
1601   if (field_converters)
1602   {
1603     for (uint i=0; i < n_field_converters; i++)
1604       if (field_converters[i])
1605       {
1606         DBUG_ASSERT(!dyncol_set || i != dyncol_field);
1607         delete field_converters[i];
1608       }
1609     my_free(field_converters);
1610     field_converters= NULL;
1611   }
1612 }
1613 
1614 
index_init(uint idx,bool sorted)1615 int ha_cassandra::index_init(uint idx, bool sorted)
1616 {
1617   int ires;
1618   if (!se && (ires= connect_and_check_options(table)))
1619     return ires;
1620   return 0;
1621 }
1622 
1623 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
1624 
index_read_map(uchar * buf,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)1625 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
1626                                  key_part_map keypart_map,
1627                                  enum ha_rkey_function find_flag)
1628 {
1629   int rc= 0;
1630   DBUG_ENTER("ha_cassandra::index_read_map");
1631 
1632   if (find_flag != HA_READ_KEY_EXACT)
1633   {
1634     DBUG_ASSERT(0); /* Non-equality lookups should never be done */
1635     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1636   }
1637 
1638   uint key_len= calculate_key_len(table, active_index, key, keypart_map);
1639   store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
1640 
1641   char *cass_key;
1642   int cass_key_len;
1643   MY_BITMAP *old_map;
1644 
1645   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1646 
1647   if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1648   {
1649     /* We get here when making lookups like uuid_column='not-an-uuid' */
1650     dbug_tmp_restore_column_map(&table->read_set, old_map);
1651     DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
1652   }
1653 
1654   dbug_tmp_restore_column_map(&table->read_set, old_map);
1655 
1656   bool found;
1657   if (se->get_slice(cass_key, cass_key_len, &found))
1658   {
1659     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1660     rc= HA_ERR_INTERNAL_ERROR;
1661   }
1662 
1663   /* TODO: what if we're not reading all columns?? */
1664   if (!found)
1665     rc= HA_ERR_KEY_NOT_FOUND;
1666   else
1667     rc= read_cassandra_columns(false);
1668 
1669   DBUG_RETURN(rc);
1670 }
1671 
1672 
print_conversion_error(const char * field_name,char * cass_value,int cass_value_len)1673 void ha_cassandra::print_conversion_error(const char *field_name,
1674                                           char *cass_value,
1675                                           int cass_value_len)
1676 {
1677   char buf[32];
1678   char *p= cass_value;
1679   size_t i= 0;
1680   for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
1681   {
1682     buf[i++]= map2number[(*p >> 4) & 0xF];
1683     buf[i++]= map2number[*p & 0xF];
1684   }
1685   buf[i]=0;
1686 
1687   se->print_error("Unable to convert value for field `%s` from Cassandra's data"
1688                   " format. Source data is %d bytes, 0x%s%s",
1689                   field_name, cass_value_len, buf,
1690                   (i == sizeof(buf) - 1)? "..." : "");
1691   my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1692 }
1693 
1694 
1695 
get_cassandra_field_def(char * cass_name,int cass_name_len)1696 CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
1697                                                            int cass_name_len)
1698 {
1699   CASSANDRA_TYPE_DEF *type= default_type_def;
1700   for(uint i= 0; i < n_special_type_fields; i++)
1701   {
1702     if (cass_name_len == (int)special_type_field_names[i].length &&
1703         memcmp(cass_name, special_type_field_names[i].str,
1704                cass_name_len) == 0)
1705     {
1706       type= special_type_field_converters + i;
1707       break;
1708     }
1709   }
1710   return type;
1711 }
1712 
read_cassandra_columns(bool unpack_pk)1713 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
1714 {
1715   MEM_ROOT strings_root;
1716   char *cass_name;
1717   char *cass_value;
1718   int cass_value_len, cass_name_len;
1719   Field **field;
1720   int res= 0;
1721   ulong total_name_len= 0;
1722 
1723   clear_alloc_root(&strings_root);
1724   /*
1725     cassandra_to_mariadb() calls will use field->store(...) methods, which
1726     require that the column is in the table->write_set
1727   */
1728   MY_BITMAP *old_map;
1729   old_map= dbug_tmp_use_all_columns(table, &table->write_set);
1730 
1731   /* Start with all fields being NULL */
1732   for (field= table->field + 1; *field; field++)
1733     (*field)->set_null();
1734 
1735   while (!se->get_next_read_column(&cass_name, &cass_name_len,
1736                                    &cass_value, &cass_value_len))
1737   {
1738     // map to our column. todo: use hash or something..
1739     bool found= 0;
1740     for (field= table->field + 1; *field; field++)
1741     {
1742       uint fieldnr= (*field)->field_index;
1743       if ((!dyncol_set || dyncol_field != fieldnr) &&
1744           !strcmp((*field)->field_name.str, cass_name))
1745       {
1746         found= 1;
1747         (*field)->set_notnull();
1748         if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
1749                                                             cass_value_len))
1750         {
1751           print_conversion_error((*field)->field_name.str, cass_value,
1752                                  cass_value_len);
1753           res=1;
1754           goto err;
1755         }
1756         break;
1757       }
1758     }
1759     if (dyncol_set && !found)
1760     {
1761       DYNAMIC_COLUMN_VALUE val;
1762       LEX_STRING nm;
1763       CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
1764                                                         cass_name_len);
1765       nm.str= cass_name;
1766       nm.length= cass_name_len;
1767       if (nm.length > MAX_NAME_LENGTH)
1768       {
1769         se->print_error("Unable to convert value for field `%s`"
1770                         " from Cassandra's data format. Name"
1771                         " length exceed limit of %u: '%s'",
1772                         table->field[dyncol_field]->field_name.str,
1773                         (uint)MAX_NAME_LENGTH, cass_name);
1774         my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1775         res=1;
1776         goto err;
1777       }
1778       total_name_len+= cass_name_len;
1779       if (nm.length > MAX_TOTAL_NAME_LENGTH)
1780       {
1781         se->print_error("Unable to convert value for field `%s`"
1782                         " from Cassandra's data format. Sum of all names"
1783                         " length exceed limit of %lu",
1784                         table->field[dyncol_field]->field_name.str,
1785                         cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
1786         my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1787         res=1;
1788         goto err;
1789       }
1790 
1791       if ((res= (*(type->cassandra_to_dynamic))(cass_value,
1792                                                 cass_value_len, &val,
1793                                                 &strings_root)) ||
1794           insert_dynamic(&dynamic_names, (uchar *) &nm) ||
1795           insert_dynamic(&dynamic_values, (uchar *) &val))
1796       {
1797         if (res)
1798         {
1799           print_conversion_error(cass_name, cass_value, cass_value_len);
1800         }
1801         free_strings_memroot(&strings_root);
1802         // EOM shouldm be already reported if happened
1803         res= 1;
1804         goto err;
1805       }
1806     }
1807   }
1808 
1809   dynamic_rec.length= 0;
1810   if (dyncol_set)
1811   {
1812     if (mariadb_dyncol_create_many_named(&dynamic_rec,
1813                                          dynamic_names.elements,
1814                                          (LEX_STRING *)dynamic_names.buffer,
1815                                          (DYNAMIC_COLUMN_VALUE *)
1816                                          dynamic_values.buffer,
1817                                          FALSE) < 0)
1818       dynamic_rec.length= 0;
1819 
1820     free_strings_memroot(&strings_root);
1821     dynamic_values.elements= dynamic_names.elements= 0;
1822 
1823     if (dynamic_rec.length == 0)
1824       table->field[dyncol_field]->set_null();
1825     else
1826     {
1827       Field_blob *blob= (Field_blob *)table->field[dyncol_field];
1828       blob->set_notnull();
1829       blob->store_length(dynamic_rec.length);
1830       *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
1831         dynamic_rec.str;
1832     }
1833   }
1834 
1835   if (unpack_pk)
1836   {
1837     /* Unpack rowkey to primary key */
1838     field= table->field;
1839     (*field)->set_notnull();
1840     se->get_read_rowkey(&cass_value, &cass_value_len);
1841     if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
1842     {
1843       print_conversion_error((*field)->field_name.str, cass_value, cass_value_len);
1844       res=1;
1845       goto err;
1846     }
1847   }
1848 
1849 err:
1850   dbug_tmp_restore_column_map(&table->write_set, old_map);
1851   return res;
1852 }
1853 
read_dyncol(uint * count,DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names,String * valcol)1854 int ha_cassandra::read_dyncol(uint *count,
1855                               DYNAMIC_COLUMN_VALUE **vals,
1856                               LEX_STRING **names,
1857                               String *valcol)
1858 {
1859   String *strcol;
1860   DYNAMIC_COLUMN col;
1861 
1862   enum enum_dyncol_func_result rc;
1863   DBUG_ENTER("ha_cassandra::read_dyncol");
1864 
1865   Field *field= table->field[dyncol_field];
1866   DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
1867   /* It is blob and it does not use buffer */
1868   strcol= field->val_str(NULL, valcol);
1869   if (field->is_null())
1870   {
1871     *count= 0;
1872     *names= 0;
1873     *vals= 0;
1874     DBUG_RETURN(0); // nothing to write
1875   }
1876   /*
1877     dynamic_column_vals only read the string so we can
1878     cheat here with assignment
1879   */
1880   bzero(&col, sizeof(col));
1881   col.str= (char *)strcol->ptr();
1882   col.length= strcol->length();
1883   if ((rc= mariadb_dyncol_unpack(&col, count, names, vals)) < 0)
1884   {
1885     dynamic_column_error_message(rc);
1886     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1887   }
1888   DBUG_RETURN(0);
1889 }
1890 
write_dynamic_row(uint count,DYNAMIC_COLUMN_VALUE * vals,LEX_STRING * names)1891 int ha_cassandra::write_dynamic_row(uint count,
1892                                     DYNAMIC_COLUMN_VALUE *vals,
1893                                     LEX_STRING *names)
1894 {
1895   uint i;
1896   DBUG_ENTER("ha_cassandra::write_dynamic_row");
1897   DBUG_ASSERT(dyncol_set);
1898 
1899 
1900   for (i= 0; i < count; i++)
1901   {
1902     char buff[16];
1903     CASSANDRA_TYPE_DEF *type;
1904     void *freemem= NULL;
1905     char *cass_data;
1906     int cass_data_len;
1907 
1908     DBUG_PRINT("info", ("field %*s", (int)names[i].length, names[i].str));
1909     type= get_cassandra_field_def(names[i].str, (int) names[i].length);
1910     if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len,
1911                                       buff, &freemem))
1912     {
1913       my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1914                names[i].str, insert_lineno);
1915       DBUG_RETURN(HA_ERR_GENERIC);
1916     }
1917     se->add_insert_column(names[i].str, names[i].length,
1918                           cass_data, cass_data_len);
1919     if (freemem)
1920       my_free(freemem);
1921   }
1922   DBUG_RETURN(0);
1923 }
1924 
free_dynamic_row(DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names)1925 void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
1926                                     LEX_STRING **names)
1927 {
1928   mariadb_dyncol_unpack_free(*names, *vals);
1929   *vals= 0;
1930   *names= 0;
1931 }
1932 
write_row(const uchar * buf)1933 int ha_cassandra::write_row(const uchar *buf)
1934 {
1935   MY_BITMAP *old_map;
1936   int ires;
1937   DBUG_ENTER("ha_cassandra::write_row");
1938 
1939   if (!se && (ires= connect_and_check_options(table)))
1940     DBUG_RETURN(ires);
1941 
1942   if (!doing_insert_batch)
1943     se->clear_insert_buffer();
1944 
1945   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1946 
1947   insert_lineno++;
1948 
1949   /* Convert the key */
1950   char *cass_key;
1951   int cass_key_len;
1952   if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1953   {
1954     my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1955              rowkey_converter->field->field_name.str, insert_lineno);
1956     dbug_tmp_restore_column_map(&table->read_set, old_map);
1957     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1958   }
1959   se->start_row_insert(cass_key, cass_key_len);
1960 
1961   /* Convert other fields */
1962   for (uint i= 1; i < table->s->fields; i++)
1963   {
1964     char *cass_data;
1965     int cass_data_len;
1966     if (dyncol_set && dyncol_field == i)
1967     {
1968       String valcol;
1969       DYNAMIC_COLUMN_VALUE *vals;
1970       LEX_STRING *names;
1971       uint count;
1972       int rc;
1973       DBUG_ASSERT(field_converters[i] == NULL);
1974       if (!(rc= read_dyncol(&count, &vals, &names, &valcol)))
1975         rc= write_dynamic_row(count, vals, names);
1976       free_dynamic_row(&vals, &names);
1977       if (rc)
1978       {
1979         dbug_tmp_restore_column_map(&table->read_set, old_map);
1980         DBUG_RETURN(rc);
1981       }
1982     }
1983     else
1984     {
1985       if (field_converters[i]->mariadb_to_cassandra(&cass_data,
1986                                                     &cass_data_len))
1987       {
1988         my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1989                  field_converters[i]->field->field_name.str, insert_lineno);
1990         dbug_tmp_restore_column_map(&table->read_set, old_map);
1991         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1992       }
1993       se->add_insert_column(field_converters[i]->field->field_name.str, 0,
1994                             cass_data, cass_data_len);
1995     }
1996   }
1997 
1998   dbug_tmp_restore_column_map(&table->read_set, old_map);
1999 
2000   bool res;
2001 
2002   if (doing_insert_batch)
2003   {
2004     res= 0;
2005     if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
2006     {
2007       res= se->do_insert();
2008       insert_rows_batched= 0;
2009     }
2010   }
2011   else
2012     res= se->do_insert();
2013 
2014   if (res)
2015     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2016 
2017   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2018 }
2019 
2020 
start_bulk_insert(ha_rows rows,uint flags)2021 void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
2022 {
2023   int ires;
2024   if (!se && (ires= connect_and_check_options(table)))
2025     return;
2026 
2027   doing_insert_batch= true;
2028   insert_rows_batched= 0;
2029 
2030   se->clear_insert_buffer();
2031 }
2032 
2033 
end_bulk_insert()2034 int ha_cassandra::end_bulk_insert()
2035 {
2036   DBUG_ENTER("ha_cassandra::end_bulk_insert");
2037 
2038   if (!doing_insert_batch)
2039   {
2040     /* SQL layer can make end_bulk_insert call without start_bulk_insert call */
2041     DBUG_RETURN(0);
2042   }
2043 
2044   /* Flush out the insert buffer */
2045   doing_insert_batch= false;
2046   bool bres= se->do_insert();
2047   se->clear_insert_buffer();
2048 
2049   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2050 }
2051 
2052 
rnd_init(bool scan)2053 int ha_cassandra::rnd_init(bool scan)
2054 {
2055   bool bres;
2056   int ires;
2057   DBUG_ENTER("ha_cassandra::rnd_init");
2058 
2059   if (!se && (ires= connect_and_check_options(table)))
2060     DBUG_RETURN(ires);
2061 
2062   if (!scan)
2063   {
2064     /* Prepare for rnd_pos() calls. We don't need to anything. */
2065     DBUG_RETURN(0);
2066   }
2067 
2068   if (dyncol_set)
2069   {
2070     se->clear_read_all_columns();
2071   }
2072   else
2073   {
2074     se->clear_read_columns();
2075     for (uint i= 1; i < table->s->fields; i++)
2076       se->add_read_column(table->field[i]->field_name.str);
2077   }
2078 
2079   se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
2080   bres= se->get_range_slices(false);
2081   if (bres)
2082     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2083 
2084   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2085 }
2086 
2087 
/*
  End a scan: tell the Cassandra interface to finish reading range slices.

  @return always 0
*/
int ha_cassandra::rnd_end()
{
  DBUG_ENTER("ha_cassandra::rnd_end");

  se->finish_reading_range_slices();
  DBUG_RETURN(0);
}
2095 
2096 
rnd_next(uchar * buf)2097 int ha_cassandra::rnd_next(uchar *buf)
2098 {
2099   int rc;
2100   bool reached_eof;
2101   DBUG_ENTER("ha_cassandra::rnd_next");
2102 
2103   // Unpack and return the next record.
2104   if (se->get_next_range_slice_row(&reached_eof))
2105   {
2106     rc= HA_ERR_INTERNAL_ERROR;
2107   }
2108   else
2109   {
2110     if (reached_eof)
2111       rc= HA_ERR_END_OF_FILE;
2112     else
2113       rc= read_cassandra_columns(true);
2114   }
2115 
2116   DBUG_RETURN(rc);
2117 }
2118 
2119 
delete_all_rows()2120 int ha_cassandra::delete_all_rows()
2121 {
2122   bool bres;
2123   int ires;
2124   DBUG_ENTER("ha_cassandra::delete_all_rows");
2125 
2126   if (!se && (ires= connect_and_check_options(table)))
2127     DBUG_RETURN(ires);
2128 
2129   bres= se->truncate();
2130 
2131   if (bres)
2132     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2133 
2134   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2135 }
2136 
2137 
delete_row(const uchar * buf)2138 int ha_cassandra::delete_row(const uchar *buf)
2139 {
2140   bool bres;
2141   DBUG_ENTER("ha_cassandra::delete_row");
2142 
2143   bres= se->remove_row();
2144 
2145   if (bres)
2146     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2147 
2148   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2149 }
2150 
2151 
info(uint flag)2152 int ha_cassandra::info(uint flag)
2153 {
2154   DBUG_ENTER("ha_cassandra::info");
2155 
2156   if (!table)
2157     return 1;
2158 
2159   if (flag & HA_STATUS_VARIABLE)
2160   {
2161     stats.records= 1000;
2162     stats.deleted= 0;
2163   }
2164   if (flag & HA_STATUS_CONST)
2165   {
2166     ref_length= table->field[0]->key_length();
2167   }
2168 
2169   DBUG_RETURN(0);
2170 }
2171 
2172 
/*
  Forward declaration only; the function body is not in this part of the
  file. Used by ha_cassandra::position() to copy the primary key out of a
  record.
*/
void key_copy(uchar *to_key, const uchar *from_record, const KEY *key_info,
              uint key_length, bool with_zerofill);
2175 
2176 
position(const uchar * record)2177 void ha_cassandra::position(const uchar *record)
2178 {
2179   DBUG_ENTER("ha_cassandra::position");
2180 
2181   /* Copy the primary key to rowid */
2182   key_copy(ref, (uchar*)record, &table->key_info[0],
2183            table->field[0]->key_length(), true);
2184 
2185   DBUG_VOID_RETURN;
2186 }
2187 
2188 
rnd_pos(uchar * buf,uchar * pos)2189 int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
2190 {
2191   int rc;
2192   DBUG_ENTER("ha_cassandra::rnd_pos");
2193 
2194   int save_active_index= active_index;
2195   active_index= 0; /* The primary key */
2196   rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
2197 
2198   active_index= save_active_index;
2199 
2200   DBUG_RETURN(rc);
2201 }
2202 
2203 
reset()2204 int ha_cassandra::reset()
2205 {
2206   doing_insert_batch= false;
2207   insert_lineno= 0;
2208   if (se)
2209   {
2210     se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
2211                                THDVAR(table->in_use, write_consistency));
2212     se->set_n_retries(THDVAR(table->in_use, failure_retries));
2213   }
2214   return 0;
2215 }
2216 
2217 /////////////////////////////////////////////////////////////////////////////
2218 // MRR implementation
2219 /////////////////////////////////////////////////////////////////////////////
2220 
2221 
/*
  Restrictions of this MRR implementation:
  - The key can only be the primary key
  - Only equality ranges are allowed
  - Anything else? (open question)

  multi_range_read_info_const(): const ranges are not supported, so this
  always returns HA_POS_ERROR and does not look at its arguments.
*/
ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
                                                  void *seq_init_param,
                                                  uint n_ranges, uint *bufsz,
                                                  uint *flags, Cost_estimate *cost)
{
  /* No support for const ranges so far */
  return HA_POS_ERROR;
}
2235 
2236 
multi_range_read_info(uint keyno,uint n_ranges,uint keys,uint key_parts,uint * bufsz,uint * flags,Cost_estimate * cost)2237 ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
2238                               uint key_parts, uint *bufsz,
2239                               uint *flags, Cost_estimate *cost)
2240 {
2241   /* Can only be equality lookups on the primary key... */
2242   // TODO anything else?
2243   *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
2244   *flags |= HA_MRR_NO_ASSOCIATION;
2245 
2246   return 10;
2247 }
2248 
2249 
multi_range_read_init(RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint mode,HANDLER_BUFFER * buf)2250 int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
2251                           uint n_ranges, uint mode, HANDLER_BUFFER *buf)
2252 {
2253   int res;
2254   mrr_iter= seq->init(seq_init_param, n_ranges, mode);
2255   mrr_funcs= *seq;
2256   res= mrr_start_read();
2257   return (res? HA_ERR_INTERNAL_ERROR: 0);
2258 }
2259 
2260 
mrr_start_read()2261 bool ha_cassandra::mrr_start_read()
2262 {
2263   uint key_len;
2264 
2265   MY_BITMAP *old_map;
2266   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2267 
2268   se->new_lookup_keys();
2269 
2270   while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
2271   {
2272     char *cass_key;
2273     int cass_key_len;
2274 
2275     DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
2276 
2277     uchar *key= (uchar*)mrr_cur_range.start_key.key;
2278     key_len= mrr_cur_range.start_key.length;
2279     //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
2280     store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
2281 
2282     rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
2283 
2284     // Primitive buffer control
2285     if ((ulong) se->add_lookup_key(cass_key, cass_key_len) >
2286         THDVAR(table->in_use, multiget_batch_size))
2287       break;
2288   }
2289 
2290   dbug_tmp_restore_column_map(&table->read_set, old_map);
2291 
2292   return se->multiget_slice();
2293 }
2294 
2295 
multi_range_read_next(range_id_t * range_info)2296 int ha_cassandra::multi_range_read_next(range_id_t *range_info)
2297 {
2298   int res;
2299   while(1)
2300   {
2301     if (!se->get_next_multiget_row())
2302     {
2303       res= read_cassandra_columns(true);
2304       break;
2305     }
2306     else
2307     {
2308       if (source_exhausted)
2309       {
2310         res= HA_ERR_END_OF_FILE;
2311         break;
2312       }
2313       else
2314       {
2315         if (mrr_start_read())
2316         {
2317           res= HA_ERR_INTERNAL_ERROR;
2318           break;
2319         }
2320       }
2321     }
2322     /*
2323       We get here if we've refilled the buffer and done another read. Try
2324       reading from results again
2325     */
2326   }
2327   return res;
2328 }
2329 
2330 
multi_range_read_explain_info(uint mrr_mode,char * str,size_t size)2331 int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
2332 {
2333   const char *mrr_str= "multiget_slice";
2334 
2335   if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
2336   {
2337     uint mrr_str_len= strlen(mrr_str);
2338     uint copy_len= MY_MIN(mrr_str_len, size);
2339     memcpy(str, mrr_str, size);
2340     return copy_len;
2341   }
2342   return 0;
2343 }
2344 
2345 
2346 class Column_name_enumerator_impl : public Column_name_enumerator
2347 {
2348   ha_cassandra *obj;
2349   uint idx;
2350 public:
Column_name_enumerator_impl(ha_cassandra * obj_arg)2351   Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
get_next_name()2352   const char* get_next_name()
2353   {
2354     if (idx == obj->table->s->fields)
2355       return NULL;
2356     else
2357       return obj->table->field[idx++]->field_name.str;
2358   }
2359 };
2360 
2361 
update_row(const uchar * old_data,const uchar * new_data)2362 int ha_cassandra::update_row(const uchar *old_data, const uchar *new_data)
2363 {
2364   DYNAMIC_COLUMN_VALUE *oldvals, *vals;
2365   LEX_STRING *oldnames, *names;
2366   uint oldcount, count;
2367   String oldvalcol, valcol;
2368   MY_BITMAP *old_map;
2369   int res;
2370   DBUG_ENTER("ha_cassandra::update_row");
2371   /* Currently, it is guaranteed that new_data == table->record[0] */
2372   DBUG_ASSERT(new_data == table->record[0]);
2373   /* For now, just rewrite the full record */
2374   se->clear_insert_buffer();
2375 
2376   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2377 
2378   char *old_key;
2379   int old_key_len;
2380   se->get_read_rowkey(&old_key, &old_key_len);
2381 
2382   /* Get the key we're going to write */
2383   char *new_key;
2384   int new_key_len;
2385   if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
2386   {
2387     my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2388              rowkey_converter->field->field_name.str, insert_lineno);
2389     dbug_tmp_restore_column_map(&table->read_set, old_map);
2390     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2391   }
2392 
2393   /*
2394     Compare it to the key we've read. For all types that Cassandra supports,
2395     binary byte-wise comparison can be used
2396   */
2397   bool new_primary_key;
2398   if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
2399     new_primary_key= true;
2400   else
2401     new_primary_key= false;
2402 
2403   if (dyncol_set)
2404   {
2405     Field *field= table->field[dyncol_field];
2406     /* move to get old_data */
2407     my_ptrdiff_t diff;
2408     diff= (my_ptrdiff_t) (old_data - new_data);
2409     field->move_field_offset(diff);      // Points now at old_data
2410     if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol)))
2411       DBUG_RETURN(res);
2412     field->move_field_offset(-diff);     // back to new_data
2413     if ((res= read_dyncol(&count, &vals, &names, &valcol)))
2414     {
2415       free_dynamic_row(&oldvals, &oldnames);
2416       DBUG_RETURN(res);
2417     }
2418   }
2419 
2420   if (new_primary_key)
2421   {
2422     /*
2423       Primary key value changed. This is essentially a DELETE + INSERT.
2424       Add a DELETE operation into the batch
2425     */
2426     Column_name_enumerator_impl name_enumerator(this);
2427     se->add_row_deletion(old_key, old_key_len, &name_enumerator,
2428                          oldnames,
2429                          (dyncol_set ? oldcount : 0));
2430     oldcount= 0; // they will be deleted
2431   }
2432 
2433   se->start_row_insert(new_key, new_key_len);
2434 
2435   /* Convert other fields */
2436   for (uint i= 1; i < table->s->fields; i++)
2437   {
2438     char *cass_data;
2439     int cass_data_len;
2440     if (dyncol_set && dyncol_field == i)
2441     {
2442       DBUG_ASSERT(field_converters[i] == NULL);
2443       if ((res= write_dynamic_row(count, vals, names)))
2444         goto err;
2445     }
2446     else
2447     {
2448       if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
2449       {
2450         my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2451                  field_converters[i]->field->field_name.str, insert_lineno);
2452         dbug_tmp_restore_column_map(&table->read_set, old_map);
2453         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2454       }
2455       se->add_insert_column(field_converters[i]->field->field_name.str, 0,
2456                             cass_data, cass_data_len);
2457     }
2458   }
2459   if (dyncol_set)
2460   {
2461     /* find removed fields */
2462     uint i= 0, j= 0;
2463     /* both array are sorted */
2464     for(; i < oldcount; i++)
2465     {
2466       int scmp= 0;
2467       while (j < count &&
2468              (scmp = mariadb_dyncol_column_cmp_named(names + j,
2469                                                      oldnames + i)) < 0)
2470         j++;
2471       if (j < count &&
2472           scmp == 0)
2473         j++;
2474       else
2475         se->add_insert_delete_column(oldnames[i].str, oldnames[i].length);
2476     }
2477   }
2478 
2479   dbug_tmp_restore_column_map(&table->read_set, old_map);
2480 
2481   res= se->do_insert();
2482 
2483   if (res)
2484     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2485 
2486 err:
2487   if (dyncol_set)
2488   {
2489     free_dynamic_row(&oldvals, &oldnames);
2490     free_dynamic_row(&vals, &names);
2491   }
2492 
2493   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2494 }
2495 
2496 
2497 /*
2498   We can't really have any locks for Cassandra Storage Engine. We're reading
2499   from Cassandra cluster, and other clients can asynchronously modify the data.
2500 
2501   We can enforce locking within this process, but this will not be useful.
2502 
2503   Thus, store_lock() should express that:
2504   - Writes do not block other writes
2505   - Reads should not block anything either, including INSERTs.
2506 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)2507 THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
2508                                          THR_LOCK_DATA **to,
2509                                          enum thr_lock_type lock_type)
2510 {
2511   DBUG_ENTER("ha_cassandra::store_lock");
2512   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
2513   {
2514     /* Writes allow other writes */
2515     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
2516          lock_type <= TL_WRITE))
2517       lock_type = TL_WRITE_ALLOW_WRITE;
2518 
2519     /* Reads allow everything, including INSERTs */
2520     if (lock_type == TL_READ_NO_INSERT)
2521       lock_type = TL_READ;
2522 
2523     lock.type= lock_type;
2524   }
2525   *to++= &lock;
2526   DBUG_RETURN(to);
2527 }
2528 
2529 
/**
  check_if_incompatible_data() is called when ALTER TABLE cannot otherwise
  detect whether the new and the old definitions are compatible.

  @details If there are no other explicit signs, such as a changed number of
  fields, this function is called by compare_tables()
  (sql/sql_tables.cc) to decide whether the whole table has to be rewritten
  or only the .frm file.

  This implementation does not look at its arguments and always answers
  COMPATIBLE_DATA_YES.
*/

bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
                                            uint table_changes)
{
  DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
  /* Checked, we intend to have this empty for Cassandra SE. */
  DBUG_RETURN(COMPATIBLE_DATA_YES);
}
2548 
2549 
print_error(const char * format,...)2550 void Cassandra_se_interface::print_error(const char *format, ...)
2551 {
2552   va_list ap;
2553   va_start(ap, format);
2554   // it's not a problem if output was truncated
2555   my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
2556   va_end(ap);
2557 }
2558 
2559 
/*
  Storage engine descriptor referenced from the plugin declaration at the
  end of this file; it carries the handlerton interface version.
*/
struct st_mysql_storage_engine cassandra_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
2562 
/*
  Status variables exported by the plugin. Each entry maps a name to one
  member of cassandra_counters, shown as SHOW_LONG. The array is terminated
  by an entry whose name is NullS.
*/
static SHOW_VAR cassandra_status_variables[]= {
  /* Insert counters */
  {"row_inserts",
    (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
  {"row_insert_batches",
    (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},

  /* Multiget counters */
  {"multiget_keys_scanned",
    (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
  {"multiget_reads",
    (char*) &cassandra_counters.multiget_reads,      SHOW_LONG},
  {"multiget_rows_read",
    (char*) &cassandra_counters.multiget_rows_read,  SHOW_LONG},

  /* Exception counters */
  {"network_exceptions",
    (char*) &cassandra_counters.network_exceptions, SHOW_LONG},
  {"timeout_exceptions",
    (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
  {"unavailable_exceptions",
    (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
  /* End-of-array marker */
  {NullS, NullS, SHOW_LONG}
};
2584 
2585 
2586 
2587 
/*
  Plugin declaration for the CASSANDRA storage engine. The entries are
  positional; their meaning is given by the comment next to each one.
*/
maria_declare_plugin(cassandra)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,                  /* plugin type */
  &cassandra_storage_engine,                    /* storage engine descriptor */
  "CASSANDRA",                                  /* plugin name */
  "Monty Program Ab",                           /* author */
  "Cassandra storage engine",                   /* description */
  PLUGIN_LICENSE_GPL,                           /* license */
  cassandra_init_func,                            /* Plugin Init */
  cassandra_done_func,                            /* Plugin Deinit */
  0x0001,                                        /* version number (0.1) */
  cassandra_status_variables,                     /* status variables */
  cassandra_system_variables,                     /* system variables */
  "0.1",                                        /* string version */
  MariaDB_PLUGIN_MATURITY_EXPERIMENTAL          /* maturity */
}
maria_declare_plugin_end;
2605