1 /*
2    Copyright (c) 2012, Monty Program Ab
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
16 
17 #ifdef USE_PRAGMA_IMPLEMENTATION
18 #pragma implementation        // gcc: Class implementation
19 #endif
20 
21 #include <my_config.h>
22 #include <mysql/plugin.h>
23 #include "ha_cassandra.h"
24 #include "sql_class.h"
25 
26 #define DYNCOL_USUAL 20
27 #define DYNCOL_DELTA 100
28 #define DYNCOL_USUAL_REC 1024
29 #define DYNCOL_DELTA_REC 1024
30 
31 static handler *cassandra_create_handler(handlerton *hton,
32                                        TABLE_SHARE *table,
33                                        MEM_ROOT *mem_root);
34 
35 extern int dynamic_column_error_message(enum_dyncol_func_result rc);
36 
37 handlerton *cassandra_hton;
38 
39 
40 /*
41    Hash used to track the number of open tables; variable for example share
42    methods
43 */
44 static HASH cassandra_open_tables;
45 
46 /* The mutex used to init the hash; variable for example share methods */
47 mysql_mutex_t cassandra_mutex;
48 
49 
50 /**
51   Structure for CREATE TABLE options (table options).
52   It needs to be called ha_table_option_struct.
53 
54   The option values can be specified in the CREATE TABLE at the end:
55   CREATE TABLE ( ... ) *here*
56 */
57 
58 struct ha_table_option_struct
59 {
60   const char *thrift_host;
61   int         thrift_port;
62   const char *keyspace;
63   const char *column_family;
64 };
65 
66 
67 ha_create_table_option cassandra_table_option_list[]=
68 {
69   /*
70     one option that takes an arbitrary string
71   */
72   HA_TOPTION_STRING("thrift_host", thrift_host),
73   HA_TOPTION_NUMBER("thrift_port", thrift_port, 9160, 1, 65535, 0),
74   HA_TOPTION_STRING("keyspace", keyspace),
75   HA_TOPTION_STRING("column_family", column_family),
76   HA_TOPTION_END
77 };
78 
79 /**
80   Structure for CREATE TABLE options (field options).
81 */
82 
83 struct ha_field_option_struct
84 {
85   bool dyncol_field;
86 };
87 
88 ha_create_table_option cassandra_field_option_list[]=
89 {
90   /*
91     Collect all other columns as dynamic here,
92     the valid values are YES/NO, ON/OFF, 1/0.
93     The default is 0, that is false, no, off.
94   */
95   HA_FOPTION_BOOL("DYNAMIC_COLUMN_STORAGE", dyncol_field, 0),
96   HA_FOPTION_END
97 };
98 
99 static MYSQL_THDVAR_ULONG(insert_batch_size, PLUGIN_VAR_RQCMDARG,
100   "Number of rows in an INSERT batch",
101   NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
102 
103 static MYSQL_THDVAR_ULONG(multiget_batch_size, PLUGIN_VAR_RQCMDARG,
104   "Number of rows in a multiget(MRR) batch",
105   NULL, NULL, /*default*/ 100, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
106 
107 static MYSQL_THDVAR_ULONG(rnd_batch_size, PLUGIN_VAR_RQCMDARG,
108   "Number of rows in an rnd_read (full scan) batch",
109   NULL, NULL, /*default*/ 10*1000, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
110 
111 static MYSQL_THDVAR_ULONG(failure_retries, PLUGIN_VAR_RQCMDARG,
112   "Number of times to retry Cassandra calls that failed due to timeouts or "
113   "network communication problems. The default, 0, means not to retry.",
114   NULL, NULL, /*default*/ 3, /*min*/ 1, /*max*/ 1024*1024*1024, 0);
115 
116 /* These match values in enum_cassandra_consistency_level */
117 const char *cassandra_consistency_level[] =
118 {
119   "ONE",
120   "QUORUM",
121   "LOCAL_QUORUM",
122   "EACH_QUORUM",
123   "ALL",
124   "ANY",
125   "TWO",
126   "THREE",
127    NullS
128 };
129 
130 TYPELIB cassandra_consistency_level_typelib= {
131   array_elements(cassandra_consistency_level) - 1, "",
132   cassandra_consistency_level, NULL
133 };
134 
135 
136 static MYSQL_THDVAR_ENUM(write_consistency, PLUGIN_VAR_RQCMDARG,
137   "Cassandra consistency level to use for write operations", NULL, NULL,
138   ONE, &cassandra_consistency_level_typelib);
139 
140 static MYSQL_THDVAR_ENUM(read_consistency, PLUGIN_VAR_RQCMDARG,
141   "Cassandra consistency level to use for read operations", NULL, NULL,
142   ONE, &cassandra_consistency_level_typelib);
143 
144 
145 mysql_mutex_t cassandra_default_host_lock;
146 static char* cassandra_default_thrift_host = NULL;
147 static char cassandra_default_host_buf[256]="";
148 
149 static void
cassandra_default_thrift_host_update(THD * thd,struct st_mysql_sys_var * var,void * var_ptr,const void * save)150 cassandra_default_thrift_host_update(THD *thd,
151                                      struct st_mysql_sys_var* var,
152                                      void* var_ptr, /*!< out: where the
153                                                     formal string goes */
154                                      const void* save) /*!< in: immediate result
155                                                        from check function */
156 {
157   const char *new_host= *((char**)save);
158   const size_t max_len= sizeof(cassandra_default_host_buf);
159 
160   mysql_mutex_lock(&cassandra_default_host_lock);
161 
162   if (new_host)
163   {
164     strncpy(cassandra_default_host_buf, new_host, max_len-1);
165     cassandra_default_host_buf[max_len-1]= 0;
166     cassandra_default_thrift_host= cassandra_default_host_buf;
167   }
168   else
169   {
170     cassandra_default_host_buf[0]= 0;
171     cassandra_default_thrift_host= NULL;
172   }
173 
174   *((const char**)var_ptr)= cassandra_default_thrift_host;
175 
176   mysql_mutex_unlock(&cassandra_default_host_lock);
177 }
178 
179 
180 static MYSQL_SYSVAR_STR(default_thrift_host, cassandra_default_thrift_host,
181                         PLUGIN_VAR_RQCMDARG,
182                         "Default host for Cassandra thrift connections",
183                         /*check*/NULL,
184                         cassandra_default_thrift_host_update,
185                         /*default*/NULL);
186 
187 static struct st_mysql_sys_var* cassandra_system_variables[]= {
188   MYSQL_SYSVAR(insert_batch_size),
189   MYSQL_SYSVAR(multiget_batch_size),
190   MYSQL_SYSVAR(rnd_batch_size),
191 
192   MYSQL_SYSVAR(default_thrift_host),
193   MYSQL_SYSVAR(write_consistency),
194   MYSQL_SYSVAR(read_consistency),
195   MYSQL_SYSVAR(failure_retries),
196   NULL
197 };
198 
199 Cassandra_status_vars cassandra_counters;
200 
201 /**
202   @brief
203   Function we use in the creation of our hash to get key.
204 */
205 
cassandra_get_key(CASSANDRA_SHARE * share,size_t * length,my_bool not_used)206 static uchar* cassandra_get_key(CASSANDRA_SHARE *share, size_t *length,
207                              my_bool not_used __attribute__((unused)))
208 {
209   *length=share->table_name_length;
210   return (uchar*) share->table_name;
211 }
212 
213 #ifdef HAVE_PSI_INTERFACE
214 static PSI_mutex_key ex_key_mutex_example, ex_key_mutex_CASSANDRA_SHARE_mutex;
215 
216 static PSI_mutex_info all_cassandra_mutexes[]=
217 {
218   { &ex_key_mutex_example, "cassandra", PSI_FLAG_GLOBAL},
219   { &ex_key_mutex_CASSANDRA_SHARE_mutex, "CASSANDRA_SHARE::mutex", 0}
220 };
221 
init_cassandra_psi_keys()222 static void init_cassandra_psi_keys()
223 {
224   const char* category= "cassandra";
225   int count;
226 
227   if (PSI_server == NULL)
228     return;
229 
230   count= array_elements(all_cassandra_mutexes);
231   PSI_server->register_mutex(category, all_cassandra_mutexes, count);
232 }
233 #endif
234 
cassandra_init_func(void * p)235 static int cassandra_init_func(void *p)
236 {
237   DBUG_ENTER("cassandra_init_func");
238 
239 #ifdef HAVE_PSI_INTERFACE
240   init_cassandra_psi_keys();
241 #endif
242 
243   cassandra_hton= (handlerton *)p;
244   mysql_mutex_init(ex_key_mutex_example, &cassandra_mutex, MY_MUTEX_INIT_FAST);
245   (void) my_hash_init(&cassandra_open_tables,system_charset_info,32,0,0,
246                       (my_hash_get_key) cassandra_get_key,0,0);
247 
248   cassandra_hton->state=   SHOW_OPTION_YES;
249   cassandra_hton->create=  cassandra_create_handler;
250   /*
251     Don't specify HTON_CAN_RECREATE in flags. re-create is used by TRUNCATE
252     TABLE to create an *empty* table from scratch. Cassandra table won't be
253     emptied if re-created.
254   */
255   cassandra_hton->flags=   0;
256   cassandra_hton->table_options= cassandra_table_option_list;
257   cassandra_hton->field_options= cassandra_field_option_list;
258 
259   mysql_mutex_init(0 /* no instrumentation */,
260                    &cassandra_default_host_lock, MY_MUTEX_INIT_FAST);
261 
262   DBUG_RETURN(0);
263 }
264 
265 
cassandra_done_func(void * p)266 static int cassandra_done_func(void *p)
267 {
268   int error= 0;
269   DBUG_ENTER("cassandra_done_func");
270   if (cassandra_open_tables.records)
271     error= 1;
272   my_hash_free(&cassandra_open_tables);
273   mysql_mutex_destroy(&cassandra_mutex);
274   mysql_mutex_destroy(&cassandra_default_host_lock);
275   DBUG_RETURN(error);
276 }
277 
278 
279 /**
280   @brief
281   Example of simple lock controls. The "share" it creates is a
282   structure we will pass to each cassandra handler. Do you have to have
283   one of these? Well, you have pieces that are used for locking, and
284   they are needed to function.
285 */
286 
get_share(const char * table_name,TABLE * table)287 static CASSANDRA_SHARE *get_share(const char *table_name, TABLE *table)
288 {
289   CASSANDRA_SHARE *share;
290   uint length;
291   char *tmp_name;
292 
293   mysql_mutex_lock(&cassandra_mutex);
294   length=(uint) strlen(table_name);
295 
296   if (!(share=(CASSANDRA_SHARE*) my_hash_search(&cassandra_open_tables,
297                                               (uchar*) table_name,
298                                               length)))
299   {
300     if (!(share=(CASSANDRA_SHARE *)
301           my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
302                           &share, sizeof(*share),
303                           &tmp_name, length+1,
304                           NullS)))
305     {
306       mysql_mutex_unlock(&cassandra_mutex);
307       return NULL;
308     }
309 
310     share->use_count=0;
311     share->table_name_length=length;
312     share->table_name=tmp_name;
313     strmov(share->table_name,table_name);
314     if (my_hash_insert(&cassandra_open_tables, (uchar*) share))
315       goto error;
316     thr_lock_init(&share->lock);
317     mysql_mutex_init(ex_key_mutex_CASSANDRA_SHARE_mutex,
318                      &share->mutex, MY_MUTEX_INIT_FAST);
319   }
320   share->use_count++;
321   mysql_mutex_unlock(&cassandra_mutex);
322 
323   return share;
324 
325 error:
326   mysql_mutex_destroy(&share->mutex);
327   my_free(share);
328 
329   return NULL;
330 }
331 
332 
333 /**
334   @brief
335   Free lock controls. We call this whenever we close a table. If the table had
336   the last reference to the share, then we free memory associated with it.
337 */
338 
free_share(CASSANDRA_SHARE * share)339 static int free_share(CASSANDRA_SHARE *share)
340 {
341   mysql_mutex_lock(&cassandra_mutex);
342   if (!--share->use_count)
343   {
344     my_hash_delete(&cassandra_open_tables, (uchar*) share);
345     thr_lock_delete(&share->lock);
346     mysql_mutex_destroy(&share->mutex);
347     my_free(share);
348   }
349   mysql_mutex_unlock(&cassandra_mutex);
350 
351   return 0;
352 }
353 
354 
cassandra_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)355 static handler* cassandra_create_handler(handlerton *hton,
356                                        TABLE_SHARE *table,
357                                        MEM_ROOT *mem_root)
358 {
359   return new (mem_root) ha_cassandra(hton, table);
360 }
361 
362 
ha_cassandra(handlerton * hton,TABLE_SHARE * table_arg)363 ha_cassandra::ha_cassandra(handlerton *hton, TABLE_SHARE *table_arg)
364   :handler(hton, table_arg),
365    se(NULL), field_converters(NULL),
366    special_type_field_converters(NULL),
367    special_type_field_names(NULL), n_special_type_fields(0),
368    rowkey_converter(NULL),
369    dyncol_field(0), dyncol_set(0)
370 {}
371 
372 
connect_and_check_options(TABLE * table_arg)373 int ha_cassandra::connect_and_check_options(TABLE *table_arg)
374 {
375   ha_table_option_struct *options= table_arg->s->option_struct;
376   int res;
377   DBUG_ENTER("ha_cassandra::connect_and_check_options");
378 
379   if ((res= check_field_options(table_arg->s->field)) ||
380       (res= check_table_options(options)))
381     DBUG_RETURN(res);
382 
383   se= create_cassandra_se();
384   se->set_column_family(options->column_family);
385   const char *thrift_host= options->thrift_host? options->thrift_host:
386                            cassandra_default_thrift_host;
387   if (se->connect(thrift_host, options->thrift_port, options->keyspace))
388   {
389     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), se->error_str());
390     DBUG_RETURN(HA_ERR_NO_CONNECTION);
391   }
392 
393   if (setup_field_converters(table_arg->field, table_arg->s->fields))
394   {
395     DBUG_RETURN(HA_ERR_NO_CONNECTION);
396   }
397 
398   DBUG_RETURN(0);
399 }
400 
401 
check_field_options(Field ** fields)402 int ha_cassandra::check_field_options(Field **fields)
403 {
404   Field **field;
405   uint i;
406   DBUG_ENTER("ha_cassandra::check_field_options");
407   for (field= fields, i= 0; *field; field++, i++)
408   {
409     ha_field_option_struct *field_options= (*field)->option_struct;
410     if (field_options && field_options->dyncol_field)
411     {
412       if (dyncol_set || (*field)->type() != MYSQL_TYPE_BLOB)
413       {
414          my_error(ER_WRONG_FIELD_SPEC, MYF(0), (*field)->field_name.str);
415          DBUG_RETURN(HA_WRONG_CREATE_OPTION);
416       }
417       dyncol_set= 1;
418       dyncol_field= i;
419       bzero(&dynamic_values, sizeof(dynamic_values));
420       bzero(&dynamic_names, sizeof(dynamic_names));
421       bzero(&dynamic_rec, sizeof(dynamic_rec));
422     }
423   }
424   DBUG_RETURN(0);
425 }
426 
427 
open(const char * name,int mode,uint test_if_locked)428 int ha_cassandra::open(const char *name, int mode, uint test_if_locked)
429 {
430   DBUG_ENTER("ha_cassandra::open");
431 
432   if (!(share = get_share(name, table)))
433     DBUG_RETURN(1);
434   thr_lock_data_init(&share->lock,&lock,NULL);
435 
436   DBUG_ASSERT(!se);
437   /*
438     Don't do the following on open: it prevents SHOW CREATE TABLE when the server
439     has gone away.
440   */
441   /*
442   int res;
443   if ((res= connect_and_check_options(table)))
444   {
445     DBUG_RETURN(res);
446   }
447   */
448 
449   info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
450   insert_lineno= 0;
451 
452   DBUG_RETURN(0);
453 }
454 
455 
close(void)456 int ha_cassandra::close(void)
457 {
458   DBUG_ENTER("ha_cassandra::close");
459   delete se;
460   se= NULL;
461   free_field_converters();
462   DBUG_RETURN(free_share(share));
463 }
464 
465 
check_table_options(ha_table_option_struct * options)466 int ha_cassandra::check_table_options(ha_table_option_struct *options)
467 {
468   if (!options->thrift_host && (!cassandra_default_thrift_host ||
469                                 !cassandra_default_thrift_host[0]))
470   {
471     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
472              "thrift_host table option must be specified, or "
473              "@@cassandra_default_thrift_host must be set");
474     return HA_WRONG_CREATE_OPTION;
475   }
476 
477   if (!options->keyspace || !options->column_family)
478   {
479     my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0),
480              "keyspace and column_family table options must be specified");
481     return HA_WRONG_CREATE_OPTION;
482   }
483   return 0;
484 }
485 
486 
487 /**
488   @brief
489   create() is called to create a table. The variable name will have the name
490   of the table.
491 
492   @details
493   When create() is called you do not need to worry about
494   opening the table. Also, the .frm file will have already been
495   created so adjusting create_info is not necessary. You can overwrite
496   the .frm file at this point if you wish to change the table
497   definition, but there are no methods currently provided for doing
498   so.
499 
500   Called from handle.cc by ha_create_table().
501 
502   @see
503   ha_create_table() in handle.cc
504 */
505 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)506 int ha_cassandra::create(const char *name, TABLE *table_arg,
507                          HA_CREATE_INFO *create_info)
508 {
509   int res;
510   DBUG_ENTER("ha_cassandra::create");
511 
512   if (table_arg->s->keys != 1 || table_arg->s->primary_key !=0 ||
513       table_arg->key_info[0].user_defined_key_parts != 1 ||
514       table_arg->key_info[0].key_part[0].fieldnr != 1)
515   {
516     my_error(ER_WRONG_COLUMN_NAME, MYF(0),
517              "Table must have PRIMARY KEY defined over the first column");
518     DBUG_RETURN(HA_WRONG_CREATE_OPTION);
519   }
520 
521   DBUG_ASSERT(!se);
522   if ((res= connect_and_check_options(table_arg)))
523     DBUG_RETURN(res);
524 
525   insert_lineno= 0;
526   DBUG_RETURN(0);
527 }
528 
529 /*
530   Mapping needs to
531   - copy value from MySQL record to Thrift buffer
532   - copy value from Thrift bufer to MySQL record..
533 
534 */
535 
536 /* Converter base */
537 class ColumnDataConverter
538 {
539 public:
540   Field *field;
541 
542   /* This will save Cassandra's data in the Field */
543   virtual int cassandra_to_mariadb(const char *cass_data,
544                                     int cass_data_len)=0;
545 
546   /*
547     This will get data from the Field pointer, store Cassandra's form
548     in internal buffer, and return pointer/size.
549 
550     @return
551       false - OK
552       true  - Failed to convert value (completely, there is no value to insert
553               at all).
554   */
555   virtual bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)=0;
~ColumnDataConverter()556   virtual ~ColumnDataConverter() {};
557 };
558 
559 
560 class DoubleDataConverter : public ColumnDataConverter
561 {
562   double buf;
563 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)564   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
565   {
566     DBUG_ASSERT(cass_data_len == sizeof(double));
567     double *pdata= (double*) cass_data;
568     field->store(*pdata);
569     return 0;
570   }
571 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)572   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
573   {
574     buf= field->val_real();
575     *cass_data= (char*)&buf;
576     *cass_data_len=sizeof(double);
577     return false;
578   }
~DoubleDataConverter()579   ~DoubleDataConverter(){}
580 };
581 
582 
583 class FloatDataConverter : public ColumnDataConverter
584 {
585   float buf;
586 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)587   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
588   {
589     DBUG_ASSERT(cass_data_len == sizeof(float));
590     float *pdata= (float*) cass_data;
591     field->store(*pdata);
592     return 0;
593   }
594 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)595   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
596   {
597     buf= field->val_real();
598     *cass_data= (char*)&buf;
599     *cass_data_len=sizeof(float);
600     return false;
601   }
~FloatDataConverter()602   ~FloatDataConverter(){}
603 };
604 
flip64(const char * from,char * to)605 static void flip64(const char *from, char* to)
606 {
607   to[0]= from[7];
608   to[1]= from[6];
609   to[2]= from[5];
610   to[3]= from[4];
611   to[4]= from[3];
612   to[5]= from[2];
613   to[6]= from[1];
614   to[7]= from[0];
615 }
616 
617 class BigintDataConverter : public ColumnDataConverter
618 {
619   longlong buf;
620   bool flip; /* is false when reading counter columns */
621 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)622   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
623   {
624     longlong tmp;
625     DBUG_ASSERT(cass_data_len == sizeof(longlong));
626     if (flip)
627       flip64(cass_data, (char*)&tmp);
628     else
629       memcpy(&tmp, cass_data, sizeof(longlong));
630     field->store(tmp);
631     return 0;
632   }
633 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)634   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
635   {
636     longlong tmp= field->val_int();
637     if (flip)
638       flip64((const char*)&tmp, (char*)&buf);
639     else
640       memcpy(&buf, &tmp, sizeof(longlong));
641     *cass_data= (char*)&buf;
642     *cass_data_len=sizeof(longlong);
643     return false;
644   }
BigintDataConverter(bool flip_arg)645   BigintDataConverter(bool flip_arg) : flip(flip_arg) {}
~BigintDataConverter()646   ~BigintDataConverter(){}
647 };
648 
flip32(const char * from,char * to)649 static void flip32(const char *from, char* to)
650 {
651   to[0]= from[3];
652   to[1]= from[2];
653   to[2]= from[1];
654   to[3]= from[0];
655 }
656 
657 
658 class TinyintDataConverter : public ColumnDataConverter
659 {
660   char buf;
661 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)662   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
663   {
664     DBUG_ASSERT(cass_data_len == 1);
665     field->store(cass_data[0]);
666     return 0;
667   }
668 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)669   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
670   {
671     buf= field->val_int()? 1 : 0; /* TODO: error handling? */
672     *cass_data= (char*)&buf;
673     *cass_data_len= 1;
674     return false;
675   }
~TinyintDataConverter()676   ~TinyintDataConverter(){}
677 };
678 
679 
680 class Int32DataConverter : public ColumnDataConverter
681 {
682   int32_t buf;
683 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)684   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
685   {
686     int32_t tmp;
687     DBUG_ASSERT(cass_data_len == sizeof(int32_t));
688     flip32(cass_data, (char*)&tmp);
689     field->store(tmp);
690     return 0;
691   }
692 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)693   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
694   {
695     int32_t tmp= field->val_int();
696     flip32((const char*)&tmp, (char*)&buf);
697     *cass_data= (char*)&buf;
698     *cass_data_len=sizeof(int32_t);
699     return false;
700   }
~Int32DataConverter()701   ~Int32DataConverter(){}
702 };
703 
704 
705 class StringCopyConverter : public ColumnDataConverter
706 {
707   String buf;
708   size_t max_length;
709 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)710   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
711   {
712     if ((size_t)cass_data_len > max_length)
713       return 1;
714     field->store(cass_data, cass_data_len,field->charset());
715     return 0;
716   }
717 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)718   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
719   {
720     String *pstr= field->val_str(&buf);
721     *cass_data= (char*)pstr->ptr();
722     *cass_data_len= pstr->length();
723     return false;
724   }
StringCopyConverter(size_t max_length_arg)725   StringCopyConverter(size_t max_length_arg) : max_length(max_length_arg) {}
~StringCopyConverter()726   ~StringCopyConverter(){}
727 };
728 
729 
730 class TimestampDataConverter : public ColumnDataConverter
731 {
732   int64_t buf;
733 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)734   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
735   {
736     /* Cassandra data is milliseconds-since-epoch in network byte order */
737     int64_t tmp;
738     DBUG_ASSERT(cass_data_len==8);
739     flip64(cass_data, (char*)&tmp);
740     /*
741       store_TIME's arguments:
742       - seconds since epoch
743       - microsecond fraction of a second.
744     */
745     ((Field_timestamp*)field)->store_TIME(tmp / 1000, (tmp % 1000)*1000);
746     return 0;
747   }
748 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)749   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
750   {
751     my_time_t ts_time;
752     ulong ts_microsec;
753     int64_t tmp;
754     ts_time= ((Field_timestamp*)field)->get_timestamp(&ts_microsec);
755 
756     /* Cassandra needs milliseconds-since-epoch */
757     tmp= ((int64_t)ts_time) * 1000 + ts_microsec/1000;
758     flip64((const char*)&tmp, (char*)&buf);
759 
760     *cass_data= (char*)&buf;
761     *cass_data_len= 8;
762     return false;
763   }
~TimestampDataConverter()764   ~TimestampDataConverter(){}
765 };
766 
767 
768 
convert_hex_digit(const char c)769 static int convert_hex_digit(const char c)
770 {
771   int num;
772   if (c >= '0' && c <= '9')
773     num= c - '0';
774   else if (c >= 'A' && c <= 'F')
775     num= c - 'A' + 10;
776   else if (c >= 'a' && c <= 'f')
777     num= c - 'a' + 10;
778   else
779     return -1; /* Couldn't convert */
780   return num;
781 }
782 
783 
784 const char map2number[]="0123456789abcdef";
785 
convert_uuid2string(char * str,const char * cass_data)786 static void convert_uuid2string(char *str, const char *cass_data)
787 {
788   char *ptr= str;
789   /* UUID arrives as 16-byte number in network byte order */
790   for (uint i=0; i < 16; i++)
791   {
792     *(ptr++)= map2number[(cass_data[i] >> 4) & 0xF];
793     *(ptr++)= map2number[cass_data[i] & 0xF];
794     if (i == 3 || i == 5 || i == 7 || i == 9)
795       *(ptr++)= '-';
796   }
797   *ptr= 0;
798 }
799 
convert_string2uuid(char * buf,const char * str)800 static bool convert_string2uuid(char *buf, const char *str)
801 {
802   int lower, upper;
803   for (uint i= 0; i < 16; i++)
804   {
805     if ((upper= convert_hex_digit(str[0])) == -1 ||
806         (lower= convert_hex_digit(str[1])) == -1)
807     {
808       return true;
809     }
810     buf[i]= lower | (upper << 4);
811     str += 2;
812     if (i == 3 || i == 5 || i == 7 || i == 9)
813     {
814       if (str[0] != '-')
815         return true;
816       str++;
817     }
818   }
819   return false;
820 }
821 
822 
823 class UuidDataConverter : public ColumnDataConverter
824 {
825   char buf[16]; /* Binary UUID representation */
826   String str_buf;
827 public:
cassandra_to_mariadb(const char * cass_data,int cass_data_len)828   int cassandra_to_mariadb(const char *cass_data, int cass_data_len)
829   {
830     DBUG_ASSERT(cass_data_len==16);
831     char str[37];
832     convert_uuid2string(str, cass_data);
833     field->store(str, 36,field->charset());
834     return 0;
835   }
836 
mariadb_to_cassandra(char ** cass_data,int * cass_data_len)837   bool mariadb_to_cassandra(char **cass_data, int *cass_data_len)
838   {
839     String *uuid_str= field->val_str(&str_buf);
840 
841     if (uuid_str->length() != 36)
842       return true;
843 
844     if (convert_string2uuid(buf, (char*)uuid_str->c_ptr()))
845       return true;
846     *cass_data= buf;
847     *cass_data_len= 16;
848     return false;
849   }
~UuidDataConverter()850   ~UuidDataConverter(){}
851 };
852 
853 /**
854   Converting dynamic columns types to/from casandra types
855 */
856 
857 
858 /**
859   Check and initialize (if it is needed) string MEM_ROOT
860 */
alloc_strings_memroot(MEM_ROOT * mem_root)861 static void alloc_strings_memroot(MEM_ROOT *mem_root)
862 {
863   if (!alloc_root_inited(mem_root))
864   {
865     /*
866       The mem_root used to allocate UUID (of length 36 + \0) so make
867       appropriate allocated size
868     */
869     init_alloc_root(mem_root, "cassandra",
870                     (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
871                     ALLOC_ROOT_MIN_BLOCK_SIZE,
872                     (36 + 1 + ALIGN_SIZE(sizeof(USED_MEM))) * 10 +
873                     ALLOC_ROOT_MIN_BLOCK_SIZE, MYF(MY_THREAD_SPECIFIC));
874   }
875 }
876 
free_strings_memroot(MEM_ROOT * mem_root)877 static void free_strings_memroot(MEM_ROOT *mem_root)
878 {
879   if (alloc_root_inited(mem_root))
880     free_root(mem_root, MYF(0));
881 }
882 
cassandra_to_dyncol_intLong(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)883 bool cassandra_to_dyncol_intLong(const char *cass_data,
884                                  int cass_data_len __attribute__((unused)),
885                                  DYNAMIC_COLUMN_VALUE *value,
886                                  MEM_ROOT *mem_root __attribute__((unused)))
887 {
888   value->type= DYN_COL_INT;
889 #ifdef WORDS_BIGENDIAN
890   value->x.long_value= (longlong)*cass_data;
891 #else
892   flip64(cass_data, (char *)&value->x.long_value);
893 #endif
894   return 0;
895 }
896 
dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)897 bool dyncol_to_cassandraLong(DYNAMIC_COLUMN_VALUE *value,
898                              char **cass_data, int *cass_data_len,
899                              void* buff, void **freemem)
900 {
901   longlong *tmp= (longlong *) buff;
902   enum enum_dyncol_func_result rc=
903     mariadb_dyncol_val_long(tmp, value);
904   if (rc < 0)
905     return true;
906   *cass_data_len= sizeof(longlong);
907 #ifdef WORDS_BIGENDIAN
908   *cass_data= (char *)buff;
909 #else
910   flip64((char *)buff, (char *)buff + sizeof(longlong));
911   *cass_data= (char *)buff + sizeof(longlong);
912 #endif
913   *freemem= NULL;
914   return false;
915 }
916 
cassandra_to_dyncol_intInt32(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)917 bool cassandra_to_dyncol_intInt32(const char *cass_data,
918                                   int cass_data_len __attribute__((unused)),
919                                   DYNAMIC_COLUMN_VALUE *value,
920                                   MEM_ROOT *mem_root __attribute__((unused)))
921 {
922   int32 tmp;
923   value->type= DYN_COL_INT;
924 #ifdef WORDS_BIGENDIAN
925   tmp= *((int32 *)cass_data);
926 #else
927   flip32(cass_data, (char *)&tmp);
928 #endif
929   value->x.long_value= tmp;
930   return 0;
931 }
932 
933 
dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)934 bool dyncol_to_cassandraInt32(DYNAMIC_COLUMN_VALUE *value,
935                               char **cass_data, int *cass_data_len,
936                               void* buff, void **freemem)
937 {
938   longlong *tmp= (longlong *) ((char *)buff + sizeof(longlong));
939   enum enum_dyncol_func_result rc=
940     mariadb_dyncol_val_long(tmp, value);
941   if (rc < 0)
942     return true;
943   *cass_data_len= sizeof(int32);
944   *cass_data= (char *)buff;
945 #ifdef WORDS_BIGENDIAN
946   *((int32 *) buff) = (int32) *tmp;
947 #else
948   {
949     int32 tmp2= (int32) *tmp;
950     flip32((char *)&tmp2, (char *)buff);
951   }
952 #endif
953   *freemem= NULL;
954   return false;
955 }
956 
957 
cassandra_to_dyncol_intCounter(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)958 bool cassandra_to_dyncol_intCounter(const char *cass_data,
959                                     int cass_data_len __attribute__((unused)),
960                                     DYNAMIC_COLUMN_VALUE *value,
961                                     MEM_ROOT *mem_root __attribute__((unused)))
962 {
963   value->type= DYN_COL_INT;
964   value->x.long_value= *((longlong *)cass_data);
965   return 0;
966 }
967 
968 
dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)969 bool dyncol_to_cassandraCounter(DYNAMIC_COLUMN_VALUE *value,
970                                 char **cass_data, int *cass_data_len,
971                                 void* buff, void **freemem)
972 {
973   longlong *tmp= (longlong *)buff;
974   enum enum_dyncol_func_result rc=
975     mariadb_dyncol_val_long(tmp, value);
976   if (rc < 0)
977     return true;
978   *cass_data_len= sizeof(longlong);
979   *cass_data= (char *)buff;
980   *freemem= NULL;
981   return false;
982 }
983 
cassandra_to_dyncol_doubleFloat(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)984 bool cassandra_to_dyncol_doubleFloat(const char *cass_data,
985                                      int cass_data_len __attribute__((unused)),
986                                      DYNAMIC_COLUMN_VALUE *value,
987                                      MEM_ROOT *mem_root __attribute__((unused)))
988 {
989   value->type= DYN_COL_DOUBLE;
990   value->x.double_value= *((float *)cass_data);
991   return 0;
992 }
993 
dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)994 bool dyncol_to_cassandraFloat(DYNAMIC_COLUMN_VALUE *value,
995                               char **cass_data, int *cass_data_len,
996                               void* buff, void **freemem)
997 {
998   double tmp;
999   enum enum_dyncol_func_result rc=
1000     mariadb_dyncol_val_double(&tmp, value);
1001   if (rc < 0)
1002     return true;
1003   *((float *)buff)= (float) tmp;
1004   *cass_data_len= sizeof(float);
1005   *cass_data= (char *)buff;
1006   *freemem= NULL;
1007   return false;
1008 }
1009 
cassandra_to_dyncol_doubleDouble(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1010 bool cassandra_to_dyncol_doubleDouble(const char *cass_data,
1011                                       int cass_data_len __attribute__((unused)),
1012                                       DYNAMIC_COLUMN_VALUE *value,
1013                                       MEM_ROOT *mem_root
1014                                       __attribute__((unused)))
1015 {
1016   value->type= DYN_COL_DOUBLE;
1017   value->x.double_value= *((double *)cass_data);
1018   return 0;
1019 }
1020 
dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1021 bool dyncol_to_cassandraDouble(DYNAMIC_COLUMN_VALUE *value,
1022                                char **cass_data, int *cass_data_len,
1023                                void* buff, void **freemem)
1024 {
1025   double *tmp= (double *)buff;
1026   enum enum_dyncol_func_result rc=
1027     mariadb_dyncol_val_double(tmp, value);
1028   if (rc < 0)
1029     return true;
1030   *cass_data_len= sizeof(double);
1031   *cass_data= (char *)buff;
1032   *freemem= NULL;
1033   return false;
1034 }
1035 
cassandra_to_dyncol_strStr(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,CHARSET_INFO * cs)1036 bool cassandra_to_dyncol_strStr(const char *cass_data,
1037                                 int cass_data_len,
1038                                 DYNAMIC_COLUMN_VALUE *value,
1039                                 CHARSET_INFO *cs)
1040 {
1041   value->type= DYN_COL_STRING;
1042   value->x.string.charset= cs;
1043   value->x.string.value.str= (char *)cass_data;
1044   value->x.string.value.length= cass_data_len;
1045   return 0;
1046 }
1047 
dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem,CHARSET_INFO * cs)1048 bool dyncol_to_cassandraStr(DYNAMIC_COLUMN_VALUE *value,
1049                             char **cass_data, int *cass_data_len,
1050                             void* buff, void **freemem, CHARSET_INFO *cs)
1051 {
1052   DYNAMIC_STRING tmp;
1053   if (init_dynamic_string(&tmp, NULL, 1024, 1024))
1054     return 1;
1055   enum enum_dyncol_func_result rc=
1056     mariadb_dyncol_val_str(&tmp, value, cs, '\0');
1057   if (rc < 0)
1058   {
1059     dynstr_free(&tmp);
1060     return 1;
1061   }
1062   *cass_data_len= tmp.length;
1063   *(cass_data)= tmp.str;
1064   *freemem= tmp.str;
1065   return 0;
1066 }
1067 
cassandra_to_dyncol_strBytes(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1068 bool cassandra_to_dyncol_strBytes(const char *cass_data,
1069                                   int cass_data_len,
1070                                   DYNAMIC_COLUMN_VALUE *value,
1071                                   MEM_ROOT *mem_root __attribute__((unused)))
1072 {
1073   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1074                                     &my_charset_bin);
1075 }
1076 
dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1077 bool dyncol_to_cassandraBytes(DYNAMIC_COLUMN_VALUE *value,
1078                               char **cass_data, int *cass_data_len,
1079                               void* buff, void **freemem)
1080 {
1081   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1082                                 buff, freemem, &my_charset_bin);
1083 }
1084 
cassandra_to_dyncol_strAscii(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1085 bool cassandra_to_dyncol_strAscii(const char *cass_data,
1086                                   int cass_data_len,
1087                                   DYNAMIC_COLUMN_VALUE *value,
1088                                   MEM_ROOT *mem_root __attribute__((unused)))
1089 {
1090   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1091                                     &my_charset_latin1_bin);
1092 }
1093 
dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1094 bool dyncol_to_cassandraAscii(DYNAMIC_COLUMN_VALUE *value,
1095                               char **cass_data, int *cass_data_len,
1096                               void* buff, void **freemem)
1097 {
1098   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1099                                 buff, freemem, &my_charset_latin1_bin);
1100 }
1101 
cassandra_to_dyncol_strUTF8(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1102 bool cassandra_to_dyncol_strUTF8(const char *cass_data,
1103                                  int cass_data_len,
1104                                  DYNAMIC_COLUMN_VALUE *value,
1105                                  MEM_ROOT *mem_root __attribute__((unused)))
1106 {
1107   return cassandra_to_dyncol_strStr(cass_data, cass_data_len, value,
1108                                     &my_charset_utf8_unicode_ci);
1109 }
1110 
dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1111 bool dyncol_to_cassandraUTF8(DYNAMIC_COLUMN_VALUE *value,
1112                              char **cass_data, int *cass_data_len,
1113                              void* buff, void **freemem)
1114 {
1115   return dyncol_to_cassandraStr(value, cass_data, cass_data_len,
1116                                 buff, freemem, &my_charset_utf8_unicode_ci);
1117 }
1118 
cassandra_to_dyncol_strUUID(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1119 bool cassandra_to_dyncol_strUUID(const char *cass_data,
1120                                  int cass_data_len,
1121                                  DYNAMIC_COLUMN_VALUE *value,
1122                                  MEM_ROOT *mem_root)
1123 {
1124   value->type= DYN_COL_STRING;
1125   value->x.string.charset= &my_charset_bin;
1126   alloc_strings_memroot(mem_root);
1127   value->x.string.value.str= (char *)alloc_root(mem_root, 37);
1128   if (!value->x.string.value.str)
1129   {
1130     value->x.string.value.length= 0;
1131     return 1;
1132   }
1133   convert_uuid2string(value->x.string.value.str, cass_data);
1134   value->x.string.value.length= 36;
1135   return 0;
1136 }
1137 
dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1138 bool dyncol_to_cassandraUUID(DYNAMIC_COLUMN_VALUE *value,
1139                              char **cass_data, int *cass_data_len,
1140                              void* buff, void **freemem)
1141 {
1142   DYNAMIC_STRING tmp;
1143   if (init_dynamic_string(&tmp, NULL, 1024, 1024))
1144     return true;
1145   enum enum_dyncol_func_result rc=
1146     mariadb_dyncol_val_str(&tmp, value, &my_charset_latin1_bin, '\0');
1147   if (rc < 0 || tmp.length != 36 || convert_string2uuid((char *)buff, tmp.str))
1148   {
1149     dynstr_free(&tmp);
1150     return true;
1151   }
1152 
1153   *cass_data_len= tmp.length;
1154   *(cass_data)= tmp.str;
1155   *freemem= tmp.str;
1156   return 0;
1157 }
1158 
cassandra_to_dyncol_intBool(const char * cass_data,int cass_data_len,DYNAMIC_COLUMN_VALUE * value,MEM_ROOT * mem_root)1159 bool cassandra_to_dyncol_intBool(const char *cass_data,
1160                                  int cass_data_len,
1161                                  DYNAMIC_COLUMN_VALUE *value,
1162                                  MEM_ROOT *mem_root __attribute__((unused)))
1163 {
1164   value->type= DYN_COL_INT;
1165   value->x.long_value= (cass_data[0] ? 1 : 0);
1166   return 0;
1167 }
1168 
dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE * value,char ** cass_data,int * cass_data_len,void * buff,void ** freemem)1169 bool dyncol_to_cassandraBool(DYNAMIC_COLUMN_VALUE *value,
1170                              char **cass_data, int *cass_data_len,
1171                              void* buff, void **freemem)
1172 {
1173   longlong tmp;
1174   enum enum_dyncol_func_result rc=
1175     mariadb_dyncol_val_long(&tmp, value);
1176   if (rc < 0)
1177     return true;
1178   ((char *)buff)[0]= (tmp ? 1 : 0);
1179   *cass_data_len= 1;
1180   *(cass_data)= (char *)buff;
1181   *freemem= 0;
1182   return 0;
1183 }
1184 
1185 
1186 const char * const validator_bigint=  "org.apache.cassandra.db.marshal.LongType";
1187 const char * const validator_int=     "org.apache.cassandra.db.marshal.Int32Type";
1188 const char * const validator_counter= "org.apache.cassandra.db.marshal.CounterColumnType";
1189 
1190 const char * const validator_float=   "org.apache.cassandra.db.marshal.FloatType";
1191 const char * const validator_double=  "org.apache.cassandra.db.marshal.DoubleType";
1192 
1193 const char * const validator_blob=    "org.apache.cassandra.db.marshal.BytesType";
1194 const char * const validator_ascii=   "org.apache.cassandra.db.marshal.AsciiType";
1195 const char * const validator_text=    "org.apache.cassandra.db.marshal.UTF8Type";
1196 
1197 const char * const validator_timestamp="org.apache.cassandra.db.marshal.DateType";
1198 
1199 const char * const validator_uuid= "org.apache.cassandra.db.marshal.UUIDType";
1200 
1201 const char * const validator_boolean= "org.apache.cassandra.db.marshal.BooleanType";
1202 
1203 /* VARINTs are stored as big-endian big numbers. */
1204 const char * const validator_varint= "org.apache.cassandra.db.marshal.IntegerType";
1205 const char * const validator_decimal= "org.apache.cassandra.db.marshal.DecimalType";
1206 
1207 
1208 static CASSANDRA_TYPE_DEF cassandra_types[]=
1209 {
1210   {
1211     validator_bigint,
1212     &cassandra_to_dyncol_intLong,
1213     &dyncol_to_cassandraLong
1214   },
1215   {
1216     validator_int,
1217     &cassandra_to_dyncol_intInt32,
1218     &dyncol_to_cassandraInt32
1219   },
1220   {
1221     validator_counter,
1222     cassandra_to_dyncol_intCounter,
1223     &dyncol_to_cassandraCounter
1224   },
1225   {
1226     validator_float,
1227     &cassandra_to_dyncol_doubleFloat,
1228     &dyncol_to_cassandraFloat
1229   },
1230   {
1231     validator_double,
1232     &cassandra_to_dyncol_doubleDouble,
1233     &dyncol_to_cassandraDouble
1234   },
1235   {
1236     validator_blob,
1237     &cassandra_to_dyncol_strBytes,
1238     &dyncol_to_cassandraBytes
1239   },
1240   {
1241     validator_ascii,
1242     &cassandra_to_dyncol_strAscii,
1243     &dyncol_to_cassandraAscii
1244   },
1245   {
1246     validator_text,
1247     &cassandra_to_dyncol_strUTF8,
1248     &dyncol_to_cassandraUTF8
1249   },
1250   {
1251     validator_timestamp,
1252     &cassandra_to_dyncol_intLong,
1253     &dyncol_to_cassandraLong
1254   },
1255   {
1256     validator_uuid,
1257     &cassandra_to_dyncol_strUUID,
1258     &dyncol_to_cassandraUUID
1259   },
1260   {
1261     validator_boolean,
1262     &cassandra_to_dyncol_intBool,
1263     &dyncol_to_cassandraBool
1264   },
1265   {
1266     validator_varint,
1267     &cassandra_to_dyncol_strBytes,
1268     &dyncol_to_cassandraBytes
1269   },
1270   {
1271     validator_decimal,
1272     &cassandra_to_dyncol_strBytes,
1273     &dyncol_to_cassandraBytes
1274   }
1275 };
1276 
get_cassandra_type(const char * validator)1277 CASSANDRA_TYPE get_cassandra_type(const char *validator)
1278 {
1279   CASSANDRA_TYPE rc;
1280   switch(validator[32])
1281   {
1282   case 'L':
1283     rc= CT_BIGINT;
1284     break;
1285   case 'I':
1286     rc= (validator[35] == '3' ? CT_INT : CT_VARINT);
1287     rc= CT_INT;
1288     break;
1289   case 'C':
1290     rc= CT_COUNTER;
1291     break;
1292   case 'F':
1293     rc= CT_FLOAT;
1294     break;
1295   case 'D':
1296     switch (validator[33])
1297     {
1298     case 'o':
1299       rc= CT_DOUBLE;
1300       break;
1301     case 'a':
1302       rc= CT_TIMESTAMP;
1303       break;
1304     case 'e':
1305       rc= CT_DECIMAL;
1306       break;
1307     default:
1308       rc= CT_BLOB;
1309       break;
1310     }
1311     break;
1312   case 'B':
1313     rc= (validator[33] == 'o' ? CT_BOOLEAN : CT_BLOB);
1314     break;
1315   case 'A':
1316     rc= CT_ASCII;
1317     break;
1318   case 'U':
1319     rc= (validator[33] == 'T' ? CT_TEXT : CT_UUID);
1320     break;
1321   default:
1322     rc= CT_BLOB;
1323   }
1324   DBUG_ASSERT(strcmp(cassandra_types[rc].name, validator) == 0);
1325   return rc;
1326 }
1327 
map_field_to_validator(Field * field,const char * validator_name)1328 ColumnDataConverter *map_field_to_validator(Field *field, const char *validator_name)
1329 {
1330   ColumnDataConverter *res= NULL;
1331 
1332   switch(field->type()) {
1333     case MYSQL_TYPE_TINY:
1334       if (!strcmp(validator_name, validator_boolean))
1335       {
1336         res= new TinyintDataConverter;
1337         break;
1338       }
1339       /* fall through: */
1340     case MYSQL_TYPE_SHORT:
1341     case MYSQL_TYPE_LONGLONG:
1342     {
1343       bool is_counter= false;
1344       if (!strcmp(validator_name, validator_bigint) ||
1345           !strcmp(validator_name, validator_timestamp) ||
1346           (is_counter= !strcmp(validator_name, validator_counter)))
1347         res= new BigintDataConverter(!is_counter);
1348       break;
1349     }
1350     case MYSQL_TYPE_FLOAT:
1351       if (!strcmp(validator_name, validator_float))
1352         res= new FloatDataConverter;
1353       break;
1354 
1355     case MYSQL_TYPE_DOUBLE:
1356       if (!strcmp(validator_name, validator_double))
1357         res= new DoubleDataConverter;
1358       break;
1359 
1360     case MYSQL_TYPE_TIMESTAMP:
1361       if (!strcmp(validator_name, validator_timestamp))
1362         res= new TimestampDataConverter;
1363       break;
1364 
1365     case MYSQL_TYPE_STRING: // these are space padded CHAR(n) strings.
1366       if (!strcmp(validator_name, validator_uuid) &&
1367           field->real_type() == MYSQL_TYPE_STRING &&
1368           field->field_length == 36)
1369       {
1370         // UUID maps to CHAR(36), its text representation
1371         res= new UuidDataConverter;
1372         break;
1373       }
1374       /* fall through: */
1375     case MYSQL_TYPE_VAR_STRING:
1376     case MYSQL_TYPE_VARCHAR:
1377     case MYSQL_TYPE_BLOB:
1378     {
1379       /*
1380         Cassandra's "varint" type is a binary-encoded arbitary-length
1381         big-endian number.
1382         - It can be mapped to VARBINARY(N), with sufficiently big N.
1383         - If the value does not fit into N bytes, it is an error. We should not
1384           truncate it, because that is just as good as returning garbage.
1385         - varint should not be mapped to BINARY(N), because BINARY(N) values
1386           are zero-padded, which will work as multiplying the value by
1387           2^k for some value of k.
1388       */
1389       if (field->type() == MYSQL_TYPE_VARCHAR &&
1390           field->binary() &&
1391           (!strcmp(validator_name, validator_varint) ||
1392            !strcmp(validator_name, validator_decimal)))
1393       {
1394         res= new StringCopyConverter(field->field_length);
1395         break;
1396       }
1397 
1398       if (!strcmp(validator_name, validator_blob) ||
1399           !strcmp(validator_name, validator_ascii) ||
1400           !strcmp(validator_name, validator_text))
1401       {
1402         res= new StringCopyConverter((size_t)-1);
1403       }
1404       break;
1405     }
1406     case MYSQL_TYPE_LONG:
1407       if (!strcmp(validator_name, validator_int))
1408         res= new Int32DataConverter;
1409       break;
1410 
1411     default:;
1412   }
1413   return res;
1414 }
1415 
1416 
setup_field_converters(Field ** field_arg,uint n_fields)1417 bool ha_cassandra::setup_field_converters(Field **field_arg, uint n_fields)
1418 {
1419   char *col_name;
1420   int  col_name_len;
1421   char *col_type;
1422   int col_type_len;
1423   size_t ddl_fields= se->get_ddl_size();
1424   const char *default_type= se->get_default_validator();
1425   uint max_non_default_fields;
1426   DBUG_ENTER("ha_cassandra::setup_field_converters");
1427   DBUG_ASSERT(default_type);
1428 
1429   DBUG_ASSERT(!field_converters);
1430   DBUG_ASSERT(dyncol_set == 0 || dyncol_set == 1);
1431 
1432   /*
1433     We always should take into account that in case of using dynamic columns
1434     sql description contain one field which does not described in
1435     Cassandra DDL also key field is described separately. So that
1436     is why we use "n_fields - dyncol_set - 1" or "ddl_fields + 2".
1437   */
1438   max_non_default_fields= ddl_fields + 2 - n_fields;
1439   if (ddl_fields < (n_fields - dyncol_set - 1))
1440   {
1441     se->print_error("Some of SQL fields were not mapped to Cassandra's fields");
1442     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1443     DBUG_RETURN(true);
1444   }
1445 
1446   /* allocate memory in one chunk */
1447   size_t memsize= sizeof(ColumnDataConverter*) * n_fields +
1448     (sizeof(LEX_STRING) + sizeof(CASSANDRA_TYPE_DEF))*
1449     (dyncol_set ? max_non_default_fields : 0);
1450   if (!(field_converters= (ColumnDataConverter**)my_malloc(memsize, MYF(0))))
1451     DBUG_RETURN(true);
1452   bzero(field_converters, memsize);
1453   n_field_converters= n_fields;
1454 
1455   if (dyncol_set)
1456   {
1457     special_type_field_converters=
1458       (CASSANDRA_TYPE_DEF *)(field_converters + n_fields);
1459     special_type_field_names=
1460       ((LEX_STRING*)(special_type_field_converters + max_non_default_fields));
1461 
1462     if (my_init_dynamic_array(&dynamic_values,
1463                            sizeof(DYNAMIC_COLUMN_VALUE),
1464                            DYNCOL_USUAL, DYNCOL_DELTA, MYF(0)))
1465       DBUG_RETURN(true);
1466     else
1467       if (my_init_dynamic_array(&dynamic_names,
1468                              sizeof(LEX_STRING),
1469                              DYNCOL_USUAL, DYNCOL_DELTA,MYF(0)))
1470       {
1471         delete_dynamic(&dynamic_values);
1472         DBUG_RETURN(true);
1473       }
1474       else
1475         if (init_dynamic_string(&dynamic_rec, NULL,
1476                                 DYNCOL_USUAL_REC, DYNCOL_DELTA_REC))
1477         {
1478           delete_dynamic(&dynamic_values);
1479           delete_dynamic(&dynamic_names);
1480           DBUG_RETURN(true);
1481         }
1482 
1483     /* Dynamic column field has special processing */
1484     field_converters[dyncol_field]= NULL;
1485 
1486     default_type_def= cassandra_types + get_cassandra_type(default_type);
1487   }
1488 
1489   se->first_ddl_column();
1490   uint n_mapped= 0;
1491   while (!se->next_ddl_column(&col_name, &col_name_len, &col_type,
1492                               &col_type_len))
1493   {
1494     Field **field;
1495     uint i;
1496     /* Mapping for the 1st field is already known */
1497     for (field= field_arg + 1, i= 1; *field; field++, i++)
1498     {
1499       if ((!dyncol_set || dyncol_field != i) &&
1500           !strcmp((*field)->field_name.str, col_name))
1501       {
1502         n_mapped++;
1503         ColumnDataConverter **conv= field_converters + (*field)->field_index;
1504         if (!(*conv= map_field_to_validator(*field, col_type)))
1505         {
1506           se->print_error("Failed to map column %s to datatype %s",
1507                           (*field)->field_name.str, col_type);
1508           my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1509           DBUG_RETURN(true);
1510         }
1511         (*conv)->field= *field;
1512         break;
1513       }
1514     }
1515     if (dyncol_set && !(*field)) // is needed and not found
1516     {
1517       DBUG_PRINT("info",("Field not found: %s", col_name));
1518       if (strcmp(col_type, default_type))
1519       {
1520         DBUG_PRINT("info",("Field '%s' non-default type: '%s'",
1521                            col_name, col_type));
1522         special_type_field_names[n_special_type_fields].length= col_name_len;
1523         special_type_field_names[n_special_type_fields].str= col_name;
1524         special_type_field_converters[n_special_type_fields]=
1525           cassandra_types[get_cassandra_type(col_type)];
1526         n_special_type_fields++;
1527       }
1528     }
1529   }
1530 
1531   if (n_mapped != n_fields - 1 - dyncol_set)
1532   {
1533     Field *first_unmapped= NULL;
1534     /* Find the first field */
1535     for (uint i= 1; i < n_fields;i++)
1536     {
1537       if (!field_converters[i])
1538       {
1539         first_unmapped= field_arg[i];
1540         break;
1541       }
1542     }
1543     DBUG_ASSERT(first_unmapped);
1544 
1545     se->print_error("Field `%s` could not be mapped to any field in Cassandra",
1546                     first_unmapped->field_name.str);
1547     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1548     DBUG_RETURN(true);
1549   }
1550 
1551   /*
1552     Setup type conversion for row_key.
1553   */
1554   se->get_rowkey_type(&col_name, &col_type);
1555   if (col_name && strcmp(col_name, (*field_arg)->field_name.str))
1556   {
1557     se->print_error("PRIMARY KEY column must match Cassandra's name '%s'",
1558                     col_name);
1559     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1560     DBUG_RETURN(true);
1561   }
1562   if (!col_name && strcmp("rowkey", (*field_arg)->field_name.str))
1563   {
1564     se->print_error("target column family has no key_alias defined, "
1565                     "PRIMARY KEY column must be named 'rowkey'");
1566     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1567     DBUG_RETURN(true);
1568   }
1569 
1570   if (col_type != NULL)
1571   {
1572     if (!(rowkey_converter= map_field_to_validator(*field_arg, col_type)))
1573     {
1574       se->print_error("Failed to map PRIMARY KEY to datatype %s", col_type);
1575       my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1576       DBUG_RETURN(true);
1577     }
1578     rowkey_converter->field= *field_arg;
1579   }
1580   else
1581   {
1582     se->print_error("Cassandra's rowkey has no defined datatype (todo: support this)");
1583     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1584     DBUG_RETURN(true);
1585   }
1586 
1587   DBUG_RETURN(false);
1588 }
1589 
1590 
free_field_converters()1591 void ha_cassandra::free_field_converters()
1592 {
1593   delete rowkey_converter;
1594   rowkey_converter= NULL;
1595 
1596   if (dyncol_set)
1597   {
1598     delete_dynamic(&dynamic_values);
1599     delete_dynamic(&dynamic_names);
1600     dynstr_free(&dynamic_rec);
1601   }
1602   if (field_converters)
1603   {
1604     for (uint i=0; i < n_field_converters; i++)
1605       if (field_converters[i])
1606       {
1607         DBUG_ASSERT(!dyncol_set || i != dyncol_field);
1608         delete field_converters[i];
1609       }
1610     my_free(field_converters);
1611     field_converters= NULL;
1612   }
1613 }
1614 
1615 
index_init(uint idx,bool sorted)1616 int ha_cassandra::index_init(uint idx, bool sorted)
1617 {
1618   int ires;
1619   if (!se && (ires= connect_and_check_options(table)))
1620     return ires;
1621   return 0;
1622 }
1623 
1624 void store_key_image_to_rec(Field *field, uchar *ptr, uint len);
1625 
index_read_map(uchar * buf,const uchar * key,key_part_map keypart_map,enum ha_rkey_function find_flag)1626 int ha_cassandra::index_read_map(uchar *buf, const uchar *key,
1627                                  key_part_map keypart_map,
1628                                  enum ha_rkey_function find_flag)
1629 {
1630   int rc= 0;
1631   DBUG_ENTER("ha_cassandra::index_read_map");
1632 
1633   if (find_flag != HA_READ_KEY_EXACT)
1634   {
1635     DBUG_ASSERT(0); /* Non-equality lookups should never be done */
1636     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1637   }
1638 
1639   uint key_len= calculate_key_len(table, active_index, key, keypart_map);
1640   store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
1641 
1642   char *cass_key;
1643   int cass_key_len;
1644   MY_BITMAP *old_map;
1645 
1646   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1647 
1648   if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1649   {
1650     /* We get here when making lookups like uuid_column='not-an-uuid' */
1651     dbug_tmp_restore_column_map(&table->read_set, old_map);
1652     DBUG_RETURN(HA_ERR_KEY_NOT_FOUND);
1653   }
1654 
1655   dbug_tmp_restore_column_map(&table->read_set, old_map);
1656 
1657   bool found;
1658   if (se->get_slice(cass_key, cass_key_len, &found))
1659   {
1660     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1661     rc= HA_ERR_INTERNAL_ERROR;
1662   }
1663 
1664   /* TODO: what if we're not reading all columns?? */
1665   if (!found)
1666     rc= HA_ERR_KEY_NOT_FOUND;
1667   else
1668     rc= read_cassandra_columns(false);
1669 
1670   DBUG_RETURN(rc);
1671 }
1672 
1673 
print_conversion_error(const char * field_name,char * cass_value,int cass_value_len)1674 void ha_cassandra::print_conversion_error(const char *field_name,
1675                                           char *cass_value,
1676                                           int cass_value_len)
1677 {
1678   char buf[32];
1679   char *p= cass_value;
1680   size_t i= 0;
1681   for (; (i < sizeof(buf)-1) && (p < cass_value + cass_value_len); p++)
1682   {
1683     buf[i++]= map2number[(*p >> 4) & 0xF];
1684     buf[i++]= map2number[*p & 0xF];
1685   }
1686   buf[i]=0;
1687 
1688   se->print_error("Unable to convert value for field `%s` from Cassandra's data"
1689                   " format. Source data is %d bytes, 0x%s%s",
1690                   field_name, cass_value_len, buf,
1691                   (i == sizeof(buf) - 1)? "..." : "");
1692   my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1693 }
1694 
1695 
1696 
get_cassandra_field_def(char * cass_name,int cass_name_len)1697 CASSANDRA_TYPE_DEF * ha_cassandra::get_cassandra_field_def(char *cass_name,
1698                                                            int cass_name_len)
1699 {
1700   CASSANDRA_TYPE_DEF *type= default_type_def;
1701   for(uint i= 0; i < n_special_type_fields; i++)
1702   {
1703     if (cass_name_len == (int)special_type_field_names[i].length &&
1704         memcmp(cass_name, special_type_field_names[i].str,
1705                cass_name_len) == 0)
1706     {
1707       type= special_type_field_converters + i;
1708       break;
1709     }
1710   }
1711   return type;
1712 }
1713 
read_cassandra_columns(bool unpack_pk)1714 int ha_cassandra::read_cassandra_columns(bool unpack_pk)
1715 {
1716   MEM_ROOT strings_root;
1717   char *cass_name;
1718   char *cass_value;
1719   int cass_value_len, cass_name_len;
1720   Field **field;
1721   int res= 0;
1722   ulong total_name_len= 0;
1723 
1724   clear_alloc_root(&strings_root);
1725   /*
1726     cassandra_to_mariadb() calls will use field->store(...) methods, which
1727     require that the column is in the table->write_set
1728   */
1729   MY_BITMAP *old_map;
1730   old_map= dbug_tmp_use_all_columns(table, &table->write_set);
1731 
1732   /* Start with all fields being NULL */
1733   for (field= table->field + 1; *field; field++)
1734     (*field)->set_null();
1735 
1736   while (!se->get_next_read_column(&cass_name, &cass_name_len,
1737                                    &cass_value, &cass_value_len))
1738   {
1739     // map to our column. todo: use hash or something..
1740     bool found= 0;
1741     for (field= table->field + 1; *field; field++)
1742     {
1743       uint fieldnr= (*field)->field_index;
1744       if ((!dyncol_set || dyncol_field != fieldnr) &&
1745           !strcmp((*field)->field_name.str, cass_name))
1746       {
1747         found= 1;
1748         (*field)->set_notnull();
1749         if (field_converters[fieldnr]->cassandra_to_mariadb(cass_value,
1750                                                             cass_value_len))
1751         {
1752           print_conversion_error((*field)->field_name.str, cass_value,
1753                                  cass_value_len);
1754           res=1;
1755           goto err;
1756         }
1757         break;
1758       }
1759     }
1760     if (dyncol_set && !found)
1761     {
1762       DYNAMIC_COLUMN_VALUE val;
1763       LEX_STRING nm;
1764       CASSANDRA_TYPE_DEF *type= get_cassandra_field_def(cass_name,
1765                                                         cass_name_len);
1766       nm.str= cass_name;
1767       nm.length= cass_name_len;
1768       if (nm.length > MAX_NAME_LENGTH)
1769       {
1770         se->print_error("Unable to convert value for field `%s`"
1771                         " from Cassandra's data format. Name"
1772                         " length exceed limit of %u: '%s'",
1773                         table->field[dyncol_field]->field_name.str,
1774                         (uint)MAX_NAME_LENGTH, cass_name);
1775         my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1776         res=1;
1777         goto err;
1778       }
1779       total_name_len+= cass_name_len;
1780       if (nm.length > MAX_TOTAL_NAME_LENGTH)
1781       {
1782         se->print_error("Unable to convert value for field `%s`"
1783                         " from Cassandra's data format. Sum of all names"
1784                         " length exceed limit of %lu",
1785                         table->field[dyncol_field]->field_name.str,
1786                         cass_name, (uint)MAX_TOTAL_NAME_LENGTH);
1787         my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
1788         res=1;
1789         goto err;
1790       }
1791 
1792       if ((res= (*(type->cassandra_to_dynamic))(cass_value,
1793                                                 cass_value_len, &val,
1794                                                 &strings_root)) ||
1795           insert_dynamic(&dynamic_names, (uchar *) &nm) ||
1796           insert_dynamic(&dynamic_values, (uchar *) &val))
1797       {
1798         if (res)
1799         {
1800           print_conversion_error(cass_name, cass_value, cass_value_len);
1801         }
1802         free_strings_memroot(&strings_root);
1803         // EOM shouldm be already reported if happened
1804         res= 1;
1805         goto err;
1806       }
1807     }
1808   }
1809 
1810   dynamic_rec.length= 0;
1811   if (dyncol_set)
1812   {
1813     if (mariadb_dyncol_create_many_named(&dynamic_rec,
1814                                          dynamic_names.elements,
1815                                          (LEX_STRING *)dynamic_names.buffer,
1816                                          (DYNAMIC_COLUMN_VALUE *)
1817                                          dynamic_values.buffer,
1818                                          FALSE) < 0)
1819       dynamic_rec.length= 0;
1820 
1821     free_strings_memroot(&strings_root);
1822     dynamic_values.elements= dynamic_names.elements= 0;
1823 
1824     if (dynamic_rec.length == 0)
1825       table->field[dyncol_field]->set_null();
1826     else
1827     {
1828       Field_blob *blob= (Field_blob *)table->field[dyncol_field];
1829       blob->set_notnull();
1830       blob->store_length(dynamic_rec.length);
1831       *((char **)(((char *)blob->ptr) + blob->pack_length_no_ptr()))=
1832         dynamic_rec.str;
1833     }
1834   }
1835 
1836   if (unpack_pk)
1837   {
1838     /* Unpack rowkey to primary key */
1839     field= table->field;
1840     (*field)->set_notnull();
1841     se->get_read_rowkey(&cass_value, &cass_value_len);
1842     if (rowkey_converter->cassandra_to_mariadb(cass_value, cass_value_len))
1843     {
1844       print_conversion_error((*field)->field_name.str, cass_value, cass_value_len);
1845       res=1;
1846       goto err;
1847     }
1848   }
1849 
1850 err:
1851   dbug_tmp_restore_column_map(&table->write_set, old_map);
1852   return res;
1853 }
1854 
read_dyncol(uint * count,DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names,String * valcol)1855 int ha_cassandra::read_dyncol(uint *count,
1856                               DYNAMIC_COLUMN_VALUE **vals,
1857                               LEX_STRING **names,
1858                               String *valcol)
1859 {
1860   String *strcol;
1861   DYNAMIC_COLUMN col;
1862 
1863   enum enum_dyncol_func_result rc;
1864   DBUG_ENTER("ha_cassandra::read_dyncol");
1865 
1866   Field *field= table->field[dyncol_field];
1867   DBUG_ASSERT(field->type() == MYSQL_TYPE_BLOB);
1868   /* It is blob and it does not use buffer */
1869   strcol= field->val_str(NULL, valcol);
1870   if (field->is_null())
1871   {
1872     *count= 0;
1873     *names= 0;
1874     *vals= 0;
1875     DBUG_RETURN(0); // nothing to write
1876   }
1877   /*
1878     dynamic_column_vals only read the string so we can
1879     cheat here with assignment
1880   */
1881   bzero(&col, sizeof(col));
1882   col.str= (char *)strcol->ptr();
1883   col.length= strcol->length();
1884   if ((rc= mariadb_dyncol_unpack(&col, count, names, vals)) < 0)
1885   {
1886     dynamic_column_error_message(rc);
1887     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1888   }
1889   DBUG_RETURN(0);
1890 }
1891 
write_dynamic_row(uint count,DYNAMIC_COLUMN_VALUE * vals,LEX_STRING * names)1892 int ha_cassandra::write_dynamic_row(uint count,
1893                                     DYNAMIC_COLUMN_VALUE *vals,
1894                                     LEX_STRING *names)
1895 {
1896   uint i;
1897   DBUG_ENTER("ha_cassandra::write_dynamic_row");
1898   DBUG_ASSERT(dyncol_set);
1899 
1900 
1901   for (i= 0; i < count; i++)
1902   {
1903     char buff[16];
1904     CASSANDRA_TYPE_DEF *type;
1905     void *freemem= NULL;
1906     char *cass_data;
1907     int cass_data_len;
1908 
1909     DBUG_PRINT("info", ("field %*s", (int)names[i].length, names[i].str));
1910     type= get_cassandra_field_def(names[i].str, (int) names[i].length);
1911     if ((*type->dynamic_to_cassandra)(vals +i, &cass_data, &cass_data_len,
1912                                       buff, &freemem))
1913     {
1914       my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1915                names[i].str, insert_lineno);
1916       DBUG_RETURN(HA_ERR_GENERIC);
1917     }
1918     se->add_insert_column(names[i].str, names[i].length,
1919                           cass_data, cass_data_len);
1920     if (freemem)
1921       my_free(freemem);
1922   }
1923   DBUG_RETURN(0);
1924 }
1925 
free_dynamic_row(DYNAMIC_COLUMN_VALUE ** vals,LEX_STRING ** names)1926 void ha_cassandra::free_dynamic_row(DYNAMIC_COLUMN_VALUE **vals,
1927                                     LEX_STRING **names)
1928 {
1929   mariadb_dyncol_unpack_free(*names, *vals);
1930   *vals= 0;
1931   *names= 0;
1932 }
1933 
write_row(const uchar * buf)1934 int ha_cassandra::write_row(const uchar *buf)
1935 {
1936   MY_BITMAP *old_map;
1937   int ires;
1938   DBUG_ENTER("ha_cassandra::write_row");
1939 
1940   if (!se && (ires= connect_and_check_options(table)))
1941     DBUG_RETURN(ires);
1942 
1943   if (!doing_insert_batch)
1944     se->clear_insert_buffer();
1945 
1946   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
1947 
1948   insert_lineno++;
1949 
1950   /* Convert the key */
1951   char *cass_key;
1952   int cass_key_len;
1953   if (rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len))
1954   {
1955     my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1956              rowkey_converter->field->field_name.str, insert_lineno);
1957     dbug_tmp_restore_column_map(&table->read_set, old_map);
1958     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1959   }
1960   se->start_row_insert(cass_key, cass_key_len);
1961 
1962   /* Convert other fields */
1963   for (uint i= 1; i < table->s->fields; i++)
1964   {
1965     char *cass_data;
1966     int cass_data_len;
1967     if (dyncol_set && dyncol_field == i)
1968     {
1969       String valcol;
1970       DYNAMIC_COLUMN_VALUE *vals;
1971       LEX_STRING *names;
1972       uint count;
1973       int rc;
1974       DBUG_ASSERT(field_converters[i] == NULL);
1975       if (!(rc= read_dyncol(&count, &vals, &names, &valcol)))
1976         rc= write_dynamic_row(count, vals, names);
1977       free_dynamic_row(&vals, &names);
1978       if (rc)
1979       {
1980         dbug_tmp_restore_column_map(&table->read_set, old_map);
1981         DBUG_RETURN(rc);
1982       }
1983     }
1984     else
1985     {
1986       if (field_converters[i]->mariadb_to_cassandra(&cass_data,
1987                                                     &cass_data_len))
1988       {
1989         my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
1990                  field_converters[i]->field->field_name.str, insert_lineno);
1991         dbug_tmp_restore_column_map(&table->read_set, old_map);
1992         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
1993       }
1994       se->add_insert_column(field_converters[i]->field->field_name.str, 0,
1995                             cass_data, cass_data_len);
1996     }
1997   }
1998 
1999   dbug_tmp_restore_column_map(&table->read_set, old_map);
2000 
2001   bool res;
2002 
2003   if (doing_insert_batch)
2004   {
2005     res= 0;
2006     if (++insert_rows_batched >= THDVAR(table->in_use, insert_batch_size))
2007     {
2008       res= se->do_insert();
2009       insert_rows_batched= 0;
2010     }
2011   }
2012   else
2013     res= se->do_insert();
2014 
2015   if (res)
2016     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2017 
2018   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2019 }
2020 
2021 
start_bulk_insert(ha_rows rows,uint flags)2022 void ha_cassandra::start_bulk_insert(ha_rows rows, uint flags)
2023 {
2024   int ires;
2025   if (!se && (ires= connect_and_check_options(table)))
2026     return;
2027 
2028   doing_insert_batch= true;
2029   insert_rows_batched= 0;
2030 
2031   se->clear_insert_buffer();
2032 }
2033 
2034 
end_bulk_insert()2035 int ha_cassandra::end_bulk_insert()
2036 {
2037   DBUG_ENTER("ha_cassandra::end_bulk_insert");
2038 
2039   if (!doing_insert_batch)
2040   {
2041     /* SQL layer can make end_bulk_insert call without start_bulk_insert call */
2042     DBUG_RETURN(0);
2043   }
2044 
2045   /* Flush out the insert buffer */
2046   doing_insert_batch= false;
2047   bool bres= se->do_insert();
2048   se->clear_insert_buffer();
2049 
2050   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2051 }
2052 
2053 
rnd_init(bool scan)2054 int ha_cassandra::rnd_init(bool scan)
2055 {
2056   bool bres;
2057   int ires;
2058   DBUG_ENTER("ha_cassandra::rnd_init");
2059 
2060   if (!se && (ires= connect_and_check_options(table)))
2061     DBUG_RETURN(ires);
2062 
2063   if (!scan)
2064   {
2065     /* Prepare for rnd_pos() calls. We don't need to anything. */
2066     DBUG_RETURN(0);
2067   }
2068 
2069   if (dyncol_set)
2070   {
2071     se->clear_read_all_columns();
2072   }
2073   else
2074   {
2075     se->clear_read_columns();
2076     for (uint i= 1; i < table->s->fields; i++)
2077       se->add_read_column(table->field[i]->field_name.str);
2078   }
2079 
2080   se->read_batch_size= THDVAR(table->in_use, rnd_batch_size);
2081   bres= se->get_range_slices(false);
2082   if (bres)
2083     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2084 
2085   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2086 }
2087 
2088 
rnd_end()2089 int ha_cassandra::rnd_end()
2090 {
2091   DBUG_ENTER("ha_cassandra::rnd_end");
2092 
2093   se->finish_reading_range_slices();
2094   DBUG_RETURN(0);
2095 }
2096 
2097 
rnd_next(uchar * buf)2098 int ha_cassandra::rnd_next(uchar *buf)
2099 {
2100   int rc;
2101   bool reached_eof;
2102   DBUG_ENTER("ha_cassandra::rnd_next");
2103 
2104   // Unpack and return the next record.
2105   if (se->get_next_range_slice_row(&reached_eof))
2106   {
2107     rc= HA_ERR_INTERNAL_ERROR;
2108   }
2109   else
2110   {
2111     if (reached_eof)
2112       rc= HA_ERR_END_OF_FILE;
2113     else
2114       rc= read_cassandra_columns(true);
2115   }
2116 
2117   DBUG_RETURN(rc);
2118 }
2119 
2120 
delete_all_rows()2121 int ha_cassandra::delete_all_rows()
2122 {
2123   bool bres;
2124   int ires;
2125   DBUG_ENTER("ha_cassandra::delete_all_rows");
2126 
2127   if (!se && (ires= connect_and_check_options(table)))
2128     DBUG_RETURN(ires);
2129 
2130   bres= se->truncate();
2131 
2132   if (bres)
2133     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2134 
2135   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2136 }
2137 
2138 
delete_row(const uchar * buf)2139 int ha_cassandra::delete_row(const uchar *buf)
2140 {
2141   bool bres;
2142   DBUG_ENTER("ha_cassandra::delete_row");
2143 
2144   bres= se->remove_row();
2145 
2146   if (bres)
2147     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2148 
2149   DBUG_RETURN(bres? HA_ERR_INTERNAL_ERROR: 0);
2150 }
2151 
2152 
info(uint flag)2153 int ha_cassandra::info(uint flag)
2154 {
2155   DBUG_ENTER("ha_cassandra::info");
2156 
2157   if (!table)
2158     return 1;
2159 
2160   if (flag & HA_STATUS_VARIABLE)
2161   {
2162     stats.records= 1000;
2163     stats.deleted= 0;
2164   }
2165   if (flag & HA_STATUS_CONST)
2166   {
2167     ref_length= table->field[0]->key_length();
2168   }
2169 
2170   DBUG_RETURN(0);
2171 }
2172 
2173 
2174 void key_copy(uchar *to_key, const uchar *from_record, KEY *key_info,
2175               uint key_length, bool with_zerofill);
2176 
2177 
position(const uchar * record)2178 void ha_cassandra::position(const uchar *record)
2179 {
2180   DBUG_ENTER("ha_cassandra::position");
2181 
2182   /* Copy the primary key to rowid */
2183   key_copy(ref, (uchar*)record, &table->key_info[0],
2184            table->field[0]->key_length(), true);
2185 
2186   DBUG_VOID_RETURN;
2187 }
2188 
2189 
rnd_pos(uchar * buf,uchar * pos)2190 int ha_cassandra::rnd_pos(uchar *buf, uchar *pos)
2191 {
2192   int rc;
2193   DBUG_ENTER("ha_cassandra::rnd_pos");
2194 
2195   int save_active_index= active_index;
2196   active_index= 0; /* The primary key */
2197   rc= index_read_map(buf, pos, key_part_map(1), HA_READ_KEY_EXACT);
2198 
2199   active_index= save_active_index;
2200 
2201   DBUG_RETURN(rc);
2202 }
2203 
2204 
reset()2205 int ha_cassandra::reset()
2206 {
2207   doing_insert_batch= false;
2208   insert_lineno= 0;
2209   if (se)
2210   {
2211     se->set_consistency_levels(THDVAR(table->in_use, read_consistency),
2212                                THDVAR(table->in_use, write_consistency));
2213     se->set_n_retries(THDVAR(table->in_use, failure_retries));
2214   }
2215   return 0;
2216 }
2217 
2218 /////////////////////////////////////////////////////////////////////////////
2219 // MRR implementation
2220 /////////////////////////////////////////////////////////////////////////////
2221 
2222 
2223 /*
2224  - The key can be only primary key
2225   - allow equality-ranges only.
2226   - anything else?
2227 */
multi_range_read_info_const(uint keyno,RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint * bufsz,uint * flags,Cost_estimate * cost)2228 ha_rows ha_cassandra::multi_range_read_info_const(uint keyno, RANGE_SEQ_IF *seq,
2229                                                   void *seq_init_param,
2230                                                   uint n_ranges, uint *bufsz,
2231                                                   uint *flags, Cost_estimate *cost)
2232 {
2233   /* No support for const ranges so far */
2234   return HA_POS_ERROR;
2235 }
2236 
2237 
multi_range_read_info(uint keyno,uint n_ranges,uint keys,uint key_parts,uint * bufsz,uint * flags,Cost_estimate * cost)2238 ha_rows ha_cassandra::multi_range_read_info(uint keyno, uint n_ranges, uint keys,
2239                               uint key_parts, uint *bufsz,
2240                               uint *flags, Cost_estimate *cost)
2241 {
2242   /* Can only be equality lookups on the primary key... */
2243   // TODO anything else?
2244   *flags &= ~HA_MRR_USE_DEFAULT_IMPL;
2245   *flags |= HA_MRR_NO_ASSOCIATION;
2246 
2247   return 10;
2248 }
2249 
2250 
multi_range_read_init(RANGE_SEQ_IF * seq,void * seq_init_param,uint n_ranges,uint mode,HANDLER_BUFFER * buf)2251 int ha_cassandra::multi_range_read_init(RANGE_SEQ_IF *seq, void *seq_init_param,
2252                           uint n_ranges, uint mode, HANDLER_BUFFER *buf)
2253 {
2254   int res;
2255   mrr_iter= seq->init(seq_init_param, n_ranges, mode);
2256   mrr_funcs= *seq;
2257   res= mrr_start_read();
2258   return (res? HA_ERR_INTERNAL_ERROR: 0);
2259 }
2260 
2261 
mrr_start_read()2262 bool ha_cassandra::mrr_start_read()
2263 {
2264   uint key_len;
2265 
2266   MY_BITMAP *old_map;
2267   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2268 
2269   se->new_lookup_keys();
2270 
2271   while (!(source_exhausted= mrr_funcs.next(mrr_iter, &mrr_cur_range)))
2272   {
2273     char *cass_key;
2274     int cass_key_len;
2275 
2276     DBUG_ASSERT(mrr_cur_range.range_flag & EQ_RANGE);
2277 
2278     uchar *key= (uchar*)mrr_cur_range.start_key.key;
2279     key_len= mrr_cur_range.start_key.length;
2280     //key_len= calculate_key_len(table, active_index, key, keypart_map); // NEED THIS??
2281     store_key_image_to_rec(table->field[0], (uchar*)key, key_len);
2282 
2283     rowkey_converter->mariadb_to_cassandra(&cass_key, &cass_key_len);
2284 
2285     // Primitive buffer control
2286     if ((ulong) se->add_lookup_key(cass_key, cass_key_len) >
2287         THDVAR(table->in_use, multiget_batch_size))
2288       break;
2289   }
2290 
2291   dbug_tmp_restore_column_map(&table->read_set, old_map);
2292 
2293   return se->multiget_slice();
2294 }
2295 
2296 
multi_range_read_next(range_id_t * range_info)2297 int ha_cassandra::multi_range_read_next(range_id_t *range_info)
2298 {
2299   int res;
2300   while(1)
2301   {
2302     if (!se->get_next_multiget_row())
2303     {
2304       res= read_cassandra_columns(true);
2305       break;
2306     }
2307     else
2308     {
2309       if (source_exhausted)
2310       {
2311         res= HA_ERR_END_OF_FILE;
2312         break;
2313       }
2314       else
2315       {
2316         if (mrr_start_read())
2317         {
2318           res= HA_ERR_INTERNAL_ERROR;
2319           break;
2320         }
2321       }
2322     }
2323     /*
2324       We get here if we've refilled the buffer and done another read. Try
2325       reading from results again
2326     */
2327   }
2328   return res;
2329 }
2330 
2331 
multi_range_read_explain_info(uint mrr_mode,char * str,size_t size)2332 int ha_cassandra::multi_range_read_explain_info(uint mrr_mode, char *str, size_t size)
2333 {
2334   const char *mrr_str= "multiget_slice";
2335 
2336   if (!(mrr_mode & HA_MRR_USE_DEFAULT_IMPL))
2337   {
2338     uint mrr_str_len= strlen(mrr_str);
2339     uint copy_len= MY_MIN(mrr_str_len, size);
2340     memcpy(str, mrr_str, size);
2341     return copy_len;
2342   }
2343   return 0;
2344 }
2345 
2346 
2347 class Column_name_enumerator_impl : public Column_name_enumerator
2348 {
2349   ha_cassandra *obj;
2350   uint idx;
2351 public:
Column_name_enumerator_impl(ha_cassandra * obj_arg)2352   Column_name_enumerator_impl(ha_cassandra *obj_arg) : obj(obj_arg), idx(1) {}
get_next_name()2353   const char* get_next_name()
2354   {
2355     if (idx == obj->table->s->fields)
2356       return NULL;
2357     else
2358       return obj->table->field[idx++]->field_name.str;
2359   }
2360 };
2361 
2362 
update_row(const uchar * old_data,const uchar * new_data)2363 int ha_cassandra::update_row(const uchar *old_data, const uchar *new_data)
2364 {
2365   DYNAMIC_COLUMN_VALUE *oldvals, *vals;
2366   LEX_STRING *oldnames, *names;
2367   uint oldcount, count;
2368   String oldvalcol, valcol;
2369   MY_BITMAP *old_map;
2370   int res;
2371   DBUG_ENTER("ha_cassandra::update_row");
2372   /* Currently, it is guaranteed that new_data == table->record[0] */
2373   DBUG_ASSERT(new_data == table->record[0]);
2374   /* For now, just rewrite the full record */
2375   se->clear_insert_buffer();
2376 
2377   old_map= dbug_tmp_use_all_columns(table, &table->read_set);
2378 
2379   char *old_key;
2380   int old_key_len;
2381   se->get_read_rowkey(&old_key, &old_key_len);
2382 
2383   /* Get the key we're going to write */
2384   char *new_key;
2385   int new_key_len;
2386   if (rowkey_converter->mariadb_to_cassandra(&new_key, &new_key_len))
2387   {
2388     my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2389              rowkey_converter->field->field_name.str, insert_lineno);
2390     dbug_tmp_restore_column_map(&table->read_set, old_map);
2391     DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2392   }
2393 
2394   /*
2395     Compare it to the key we've read. For all types that Cassandra supports,
2396     binary byte-wise comparison can be used
2397   */
2398   bool new_primary_key;
2399   if (new_key_len != old_key_len || memcmp(old_key, new_key, new_key_len))
2400     new_primary_key= true;
2401   else
2402     new_primary_key= false;
2403 
2404   if (dyncol_set)
2405   {
2406     Field *field= table->field[dyncol_field];
2407     /* move to get old_data */
2408     my_ptrdiff_t diff;
2409     diff= (my_ptrdiff_t) (old_data - new_data);
2410     field->move_field_offset(diff);      // Points now at old_data
2411     if ((res= read_dyncol(&oldcount, &oldvals, &oldnames, &oldvalcol)))
2412       DBUG_RETURN(res);
2413     field->move_field_offset(-diff);     // back to new_data
2414     if ((res= read_dyncol(&count, &vals, &names, &valcol)))
2415     {
2416       free_dynamic_row(&oldvals, &oldnames);
2417       DBUG_RETURN(res);
2418     }
2419   }
2420 
2421   if (new_primary_key)
2422   {
2423     /*
2424       Primary key value changed. This is essentially a DELETE + INSERT.
2425       Add a DELETE operation into the batch
2426     */
2427     Column_name_enumerator_impl name_enumerator(this);
2428     se->add_row_deletion(old_key, old_key_len, &name_enumerator,
2429                          oldnames,
2430                          (dyncol_set ? oldcount : 0));
2431     oldcount= 0; // they will be deleted
2432   }
2433 
2434   se->start_row_insert(new_key, new_key_len);
2435 
2436   /* Convert other fields */
2437   for (uint i= 1; i < table->s->fields; i++)
2438   {
2439     char *cass_data;
2440     int cass_data_len;
2441     if (dyncol_set && dyncol_field == i)
2442     {
2443       DBUG_ASSERT(field_converters[i] == NULL);
2444       if ((res= write_dynamic_row(count, vals, names)))
2445         goto err;
2446     }
2447     else
2448     {
2449       if (field_converters[i]->mariadb_to_cassandra(&cass_data, &cass_data_len))
2450       {
2451         my_error(ER_WARN_DATA_OUT_OF_RANGE, MYF(0),
2452                  field_converters[i]->field->field_name.str, insert_lineno);
2453         dbug_tmp_restore_column_map(&table->read_set, old_map);
2454         DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
2455       }
2456       se->add_insert_column(field_converters[i]->field->field_name.str, 0,
2457                             cass_data, cass_data_len);
2458     }
2459   }
2460   if (dyncol_set)
2461   {
2462     /* find removed fields */
2463     uint i= 0, j= 0;
2464     /* both array are sorted */
2465     for(; i < oldcount; i++)
2466     {
2467       int scmp= 0;
2468       while (j < count &&
2469              (scmp = mariadb_dyncol_column_cmp_named(names + j,
2470                                                      oldnames + i)) < 0)
2471         j++;
2472       if (j < count &&
2473           scmp == 0)
2474         j++;
2475       else
2476         se->add_insert_delete_column(oldnames[i].str, oldnames[i].length);
2477     }
2478   }
2479 
2480   dbug_tmp_restore_column_map(&table->read_set, old_map);
2481 
2482   res= se->do_insert();
2483 
2484   if (res)
2485     my_error(ER_INTERNAL_ERROR, MYF(0), se->error_str());
2486 
2487 err:
2488   if (dyncol_set)
2489   {
2490     free_dynamic_row(&oldvals, &oldnames);
2491     free_dynamic_row(&vals, &names);
2492   }
2493 
2494   DBUG_RETURN(res? HA_ERR_INTERNAL_ERROR: 0);
2495 }
2496 
2497 
2498 /*
2499   We can't really have any locks for Cassandra Storage Engine. We're reading
2500   from Cassandra cluster, and other clients can asynchronously modify the data.
2501 
2502   We can enforce locking within this process, but this will not be useful.
2503 
2504   Thus, store_lock() should express that:
2505   - Writes do not block other writes
2506   - Reads should not block anything either, including INSERTs.
2507 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)2508 THR_LOCK_DATA **ha_cassandra::store_lock(THD *thd,
2509                                          THR_LOCK_DATA **to,
2510                                          enum thr_lock_type lock_type)
2511 {
2512   DBUG_ENTER("ha_cassandra::store_lock");
2513   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
2514   {
2515     /* Writes allow other writes */
2516     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
2517          lock_type <= TL_WRITE))
2518       lock_type = TL_WRITE_ALLOW_WRITE;
2519 
2520     /* Reads allow everything, including INSERTs */
2521     if (lock_type == TL_READ_NO_INSERT)
2522       lock_type = TL_READ;
2523 
2524     lock.type= lock_type;
2525   }
2526   *to++= &lock;
2527   DBUG_RETURN(to);
2528 }
2529 
2530 
records_in_range(uint inx,key_range * min_key,key_range * max_key)2531 ha_rows ha_cassandra::records_in_range(uint inx, key_range *min_key,
2532                                        key_range *max_key)
2533 {
2534   DBUG_ENTER("ha_cassandra::records_in_range");
2535   DBUG_RETURN(HA_POS_ERROR); /* Range scans are not supported */
2536 }
2537 
2538 
2539 /**
2540   check_if_incompatible_data() called if ALTER TABLE can't detect otherwise
2541   if new and old definition are compatible
2542 
2543   @details If there are no other explicit signs like changed number of
2544   fields this function will be called by compare_tables()
2545   (sql/sql_tables.cc) to decide should we rewrite whole table or only .frm
2546   file.
2547 
2548 */
2549 
check_if_incompatible_data(HA_CREATE_INFO * info,uint table_changes)2550 bool ha_cassandra::check_if_incompatible_data(HA_CREATE_INFO *info,
2551                                             uint table_changes)
2552 {
2553   DBUG_ENTER("ha_cassandra::check_if_incompatible_data");
2554   /* Checked, we intend to have this empty for Cassandra SE. */
2555   DBUG_RETURN(COMPATIBLE_DATA_YES);
2556 }
2557 
2558 
print_error(const char * format,...)2559 void Cassandra_se_interface::print_error(const char *format, ...)
2560 {
2561   va_list ap;
2562   va_start(ap, format);
2563   // it's not a problem if output was truncated
2564   my_vsnprintf(err_buffer, sizeof(err_buffer), format, ap);
2565   va_end(ap);
2566 }
2567 
2568 
2569 struct st_mysql_storage_engine cassandra_storage_engine=
2570 { MYSQL_HANDLERTON_INTERFACE_VERSION };
2571 
2572 static SHOW_VAR cassandra_status_variables[]= {
2573   {"row_inserts",
2574     (char*) &cassandra_counters.row_inserts,         SHOW_LONG},
2575   {"row_insert_batches",
2576     (char*) &cassandra_counters.row_insert_batches,  SHOW_LONG},
2577 
2578   {"multiget_keys_scanned",
2579     (char*) &cassandra_counters.multiget_keys_scanned, SHOW_LONG},
2580   {"multiget_reads",
2581     (char*) &cassandra_counters.multiget_reads,      SHOW_LONG},
2582   {"multiget_rows_read",
2583     (char*) &cassandra_counters.multiget_rows_read,  SHOW_LONG},
2584 
2585   {"network_exceptions",
2586     (char*) &cassandra_counters.network_exceptions, SHOW_LONG},
2587   {"timeout_exceptions",
2588     (char*) &cassandra_counters.timeout_exceptions, SHOW_LONG},
2589   {"unavailable_exceptions",
2590     (char*) &cassandra_counters.unavailable_exceptions, SHOW_LONG},
2591   {NullS, NullS, SHOW_LONG}
2592 };
2593 
2594 
2595 
2596 
maria_declare_plugin(cassandra)2597 maria_declare_plugin(cassandra)
2598 {
2599   MYSQL_STORAGE_ENGINE_PLUGIN,
2600   &cassandra_storage_engine,
2601   "CASSANDRA",
2602   "Monty Program Ab",
2603   "Cassandra storage engine",
2604   PLUGIN_LICENSE_GPL,
2605   cassandra_init_func,                            /* Plugin Init */
2606   cassandra_done_func,                            /* Plugin Deinit */
2607   0x0001,                                        /* version number (0.1) */
2608   cassandra_status_variables,                     /* status variables */
2609   cassandra_system_variables,                     /* system variables */
2610   "0.1",                                        /* string version */
2611   MariaDB_PLUGIN_MATURITY_EXPERIMENTAL          /* maturity */
2612 }
2613 maria_declare_plugin_end;
2614