1 
2 // vim:sw=2:ai
3 
4 /*
5  * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6  * See COPYRIGHT.txt for details.
7  */
8 
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12 
13 #include "database.hpp"
14 #include "string_util.hpp"
15 #include "escape.hpp"
16 #include "mysql_incl.hpp"
17 
18 #define DBG_KEY(x)
19 #define DBG_SHUT(x)
20 #define DBG_LOCK(x)
21 #define DBG_THR(x)
22 #define DBG_CMP(x)
23 #define DBG_FLD(x)
24 #define DBG_FILTER(x)
25 #define DBG_REFCNT(x)
26 #define DBG_KEYLEN(x)
27 #define DBG_DELETED
28 
/*
 * Status counters. They are defined at global scope (outside namespace
 * dena). In the code below, lock_tables_count, unlock_tables_count and
 * close_tables_count are bumped through statistic_increment().
 */
unsigned long long open_tables_count;
unsigned long long close_tables_count;
unsigned long long lock_tables_count;
unsigned long long unlock_tables_count;
unsigned long long index_exec_count;
35 
36 namespace dena {
37 
prep_stmt()38 prep_stmt::prep_stmt()
39   : dbctx(0), table_id(static_cast<size_t>(-1)),
40     idxnum(static_cast<size_t>(-1))
41 {
42 }
prep_stmt(dbcontext_i * c,size_t tbl,size_t idx,const fields_type & rf,const fields_type & ff)43 prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
44   const fields_type& rf, const fields_type& ff)
45   : dbctx(c), table_id(tbl), idxnum(idx), ret_fields(rf), filter_fields(ff)
46 {
47   if (dbctx) {
48     dbctx->table_addref(table_id);
49   }
50 }
~prep_stmt()51 prep_stmt::~prep_stmt()
52 {
53   if (dbctx) {
54     dbctx->table_release(table_id);
55   }
56 }
57 
/* Copy constructor: duplicates every member of x and, when x is bound to a
   context, takes one more reference on the same table. */
prep_stmt::prep_stmt(const prep_stmt& x)
  : dbctx(x.dbctx),
    table_id(x.table_id),
    idxnum(x.idxnum),
    ret_fields(x.ret_fields),
    filter_fields(x.filter_fields)
{
  if (dbctx == 0) {
    return;
  }
  dbctx->table_addref(table_id);
}
66 
67 prep_stmt&
operator =(const prep_stmt & x)68 prep_stmt::operator =(const prep_stmt& x)
69 {
70   if (this != &x) {
71     if (dbctx) {
72       dbctx->table_release(table_id);
73     }
74     dbctx = x.dbctx;
75     table_id = x.table_id;
76     idxnum = x.idxnum;
77     ret_fields = x.ret_fields;
78     filter_fields = x.filter_fields;
79     if (dbctx) {
80       dbctx->table_addref(table_id);
81     }
82   }
83   return *this;
84 }
85 
86 struct database : public database_i, private noncopyable {
87   database(const config& c);
88   virtual ~database();
89   virtual dbcontext_ptr create_context(bool for_write) volatile;
90   virtual void stop() volatile;
91   virtual const config& get_conf() const volatile;
92  public:
93   int child_running;
94  private:
95   config conf;
96 };
97 
98 struct tablevec_entry {
99   TABLE *table;
100   size_t refcount;
101   bool modified;
tablevec_entrydena::tablevec_entry102   tablevec_entry() : table(0), refcount(0), modified(false) { }
103 };
104 
105 struct expr_user_lock : private noncopyable {
expr_user_lockdena::expr_user_lock106   expr_user_lock(THD *thd, int timeout)
107     : lck_key("handlersocket_wr", 16, &my_charset_latin1),
108       lck_timeout(timeout),
109       lck_func_get_lock(&lck_key, &lck_timeout),
110       lck_func_release_lock(&lck_key)
111   {
112     lck_key.fix_fields(thd, 0);
113     lck_timeout.fix_fields(thd, 0);
114     lck_func_get_lock.fix_fields(thd, 0);
115     lck_func_release_lock.fix_fields(thd, 0);
116   }
get_lockdena::expr_user_lock117   long long get_lock() {
118     return lck_func_get_lock.val_int();
119   }
release_lockdena::expr_user_lock120   long long release_lock() {
121     return lck_func_release_lock.val_int();
122   }
123  private:
124   Item_string lck_key;
125   Item_int lck_timeout;
126   Item_func_get_lock lck_func_get_lock;
127   Item_func_release_lock lck_func_release_lock;
128 };
129 
130 struct dbcontext : public dbcontext_i, private noncopyable {
131   dbcontext(volatile database *d, bool for_write);
132   virtual ~dbcontext();
133   virtual void init_thread(const void *stack_botton,
134     volatile int& shutdown_flag);
135   virtual void wait_for_server_to_start();
136   virtual void term_thread();
137   virtual bool check_alive();
138   virtual void lock_tables_if();
139   virtual void unlock_tables_if();
140   virtual bool get_commit_error();
141   virtual void clear_error();
142   virtual void close_tables_if();
143   virtual void table_addref(size_t tbl_id);
144   virtual void table_release(size_t tbl_id);
145   virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
146   virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
147   virtual void set_statistics(size_t num_conns, size_t num_active);
148  private:
149   int set_thread_message(const char *fmt, ...)
150     __attribute__((format (printf, 2, 3)));
151   bool parse_fields(TABLE *const table, const char *str,
152     prep_stmt::fields_type& flds);
153   void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
154     const string_ref *fvals, size_t fvalslen);
155   void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
156     const string_ref *fvals, size_t fvalslen);
157   void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
158     ha_rkey_function find_flag, const cmd_exec_args& args);
159   size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
160     const record_filter *filters);
161   bool fill_filter_buf(TABLE *table, const prep_stmt& pst,
162     const record_filter *filters, uchar *filter_buf, size_t len);
163   int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
164     const record_filter *filters, const uchar *filter_buf);
165   void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
166   void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
167   int modify_record(dbcallback_i& cb, TABLE *const table,
168     const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
169     size_t& modified_count);
170  private:
171   typedef std::vector<tablevec_entry> table_vec_type;
172   typedef std::pair<std::string, std::string> table_name_type;
173   typedef std::map<table_name_type, size_t> table_map_type;
174  private:
175   volatile database *const dbref;
176   bool for_write_flag;
177   THD *thd;
178   MYSQL_LOCK *lock;
179   bool lock_failed;
180   std::auto_ptr<expr_user_lock> user_lock;
181   int user_level_lock_timeout;
182   bool user_level_lock_locked;
183   bool commit_error;
184   std::vector<char> info_message_buf;
185   table_vec_type table_vec;
186   table_map_type table_map;
187 };
188 
database(const config & c)189 database::database(const config& c)
190   : child_running(1), conf(c)
191 {
192 }
193 
~database()194 database::~database()
195 {
196 }
197 
198 dbcontext_ptr
create_context(bool for_write)199 database::create_context(bool for_write) volatile
200 {
201   return dbcontext_ptr(new dbcontext(this, for_write));
202 }
203 
204 void
stop()205 database::stop() volatile
206 {
207   child_running = false;
208 }
209 
210 const config&
get_conf() const211 database::get_conf() const volatile
212 {
213   return const_cast<const config&>(conf);
214 }
215 
216 database_ptr
create(const config & conf)217 database_i::create(const config& conf)
218 {
219   return database_ptr(new database(conf));
220 }
221 
dbcontext(volatile database * d,bool for_write)222 dbcontext::dbcontext(volatile database *d, bool for_write)
223   : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
224     user_level_lock_timeout(0), user_level_lock_locked(false),
225     commit_error(false)
226 {
227   info_message_buf.resize(8192);
228   user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
229 }
230 
~dbcontext()231 dbcontext::~dbcontext()
232 {
233 }
234 
235 #define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd)
236 
237 void
init_thread(const void * stack_bottom,volatile int & shutdown_flag)238 dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
239 {
240   DBG_THR(fprintf(stderr, "HNDSOCK init thread\n"));
241   {
242     my_thread_init();
243     thd = new THD(false);
244     thd->thread_stack = (char *)stack_bottom;
245     DBG_THR(fprintf(stderr,
246       "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
247       "O: %zu %zu %zu %zu %zu %zu %zu\n",
248       thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
249       DENA_THR_OFFSETOF(mdl_context),
250       DENA_THR_OFFSETOF(net),
251       DENA_THR_OFFSETOF(LOCK_thd_data),
252       DENA_THR_OFFSETOF(mysys_var),
253       DENA_THR_OFFSETOF(stmt_arena),
254       DENA_THR_OFFSETOF(limit_found_rows),
255       DENA_THR_OFFSETOF(locked_tables_list)));
256     thd->store_globals();
257     thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
258     NET v;
259     memset(&v, 0, sizeof(v));
260     thd->net = v;
261     if (for_write_flag) {
262       #if MYSQL_VERSION_ID >= 50505
263       thd->variables.option_bits |= OPTION_BIN_LOG;
264       #else
265       thd->options |= OPTION_BIN_LOG;
266       #endif
267       safeFree(thd->db);
268       thd->db = 0;
269       thd->db = my_strdup("handlersocket", MYF(0));
270     }
271     my_pthread_setspecific_ptr(THR_THD, thd);
272     DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd));
273   }
274   {
275     pthread_mutex_lock(&LOCK_thread_count);
276     thd->thread_id = thread_id++;
277     #if MYSQL_VERSION_ID >= 50600
278     add_global_thread(thd);
279     #else
280     threads.append(thd);
281     ++thread_count;
282     #endif
283     pthread_mutex_unlock(&LOCK_thread_count);
284   }
285 
286   DBG_THR(fprintf(stderr, "HNDSOCK %p init thread done\n", thd));
287 
288   thd_proc_info(thd, &info_message_buf[0]);
289   set_thread_message("hs:listening");
290   DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));
291 
292   lex_start(thd);
293 
294   user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
295 }
296 
297 void
wait_for_server_to_start()298 dbcontext::wait_for_server_to_start()
299 {
300   mysql_mutex_lock(&LOCK_server_started);
301   while (!mysqld_server_started)
302     mysql_cond_wait(&COND_server_started, &LOCK_server_started);
303   mysql_mutex_unlock(&LOCK_server_started);
304 }
305 
306 int
set_thread_message(const char * fmt,...)307 dbcontext::set_thread_message(const char *fmt, ...)
308 {
309   va_list ap;
310   va_start(ap, fmt);
311   const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(),
312     fmt, ap);
313   va_end(ap);
314   return n;
315 }
316 
317 void
term_thread()318 dbcontext::term_thread()
319 {
320   DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd));
321   unlock_tables_if();
322   my_pthread_setspecific_ptr(THR_THD, 0);
323   {
324     #if MYSQL_VERSION_ID >= 50600
325     thd->release_resources();
326     #endif
327     #if MYSQL_VERSION_ID < 50620
328     pthread_mutex_lock(&LOCK_thread_count);
329     #endif
330     #if MYSQL_VERSION_ID >= 50600
331     remove_global_thread(thd);
332     #else
333     --thread_count;
334     #endif
335     delete thd;
336     thd = 0;
337     #if MYSQL_VERSION_ID < 50620
338     pthread_mutex_unlock(&LOCK_thread_count);
339     #endif
340     my_thread_end();
341   }
342 }
343 
344 bool
check_alive()345 dbcontext::check_alive()
346 {
347   pthread_mutex_lock(&thd->mysys_var->mutex);
348   int killed = thd_killed(thd);
349   pthread_mutex_unlock(&thd->mysys_var->mutex);
350   DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %u\n", thd, killed,
351     (int)st, sizeof(*thd)));
352   if (killed) {
353     DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", killed));
354     return false;
355   }
356   return true;
357 }
358 
359 void
lock_tables_if()360 dbcontext::lock_tables_if()
361 {
362   if (lock_failed) {
363     return;
364   }
365   if (for_write_flag && !user_level_lock_locked) {
366     if (user_lock->get_lock()) {
367       user_level_lock_locked = true;
368     } else {
369       lock_failed = true;
370       return;
371     }
372   }
373   if (lock == 0) {
374     const size_t num_max = table_vec.size();
375     TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1);
376     size_t num_open = 0;
377     for (size_t i = 0; i < num_max; ++i) {
378       if (table_vec[i].refcount > 0) {
379 	tables[num_open++] = table_vec[i].table;
380       }
381       table_vec[i].modified = false;
382     }
383     #if MYSQL_VERSION_ID >= 50505
384     lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
385     #else
386     bool need_reopen= false;
387     lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open,
388       MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen);
389     #endif
390     statistic_increment(lock_tables_count, &LOCK_status);
391     thd_proc_info(thd, &info_message_buf[0]);
392     DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n",
393       thd, lock, num_max, num_open));
394     if (lock == 0) {
395       lock_failed = true;
396       DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n",
397 	thd));
398     }
399     if (for_write_flag) {
400       #if MYSQL_VERSION_ID >= 50505
401       thd->set_current_stmt_binlog_format_row();
402       #else
403       thd->current_stmt_binlog_row_based = 1;
404       #endif
405     }
406     DENA_ALLOCA_FREE(tables);
407   }
408   DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
409 }
410 
411 void
unlock_tables_if()412 dbcontext::unlock_tables_if()
413 {
414   if (lock != 0) {
415     DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n",
416       thd, thd->lock));
417     if (for_write_flag) {
418       for (size_t i = 0; i < table_vec.size(); ++i) {
419 	if (table_vec[i].modified) {
420 	  query_cache_invalidate3(thd, table_vec[i].table, 1);
421 	  table_vec[i].table->file->ha_release_auto_increment();
422 	}
423       }
424     }
425     {
426       bool suc = true;
427       #if MYSQL_VERSION_ID >= 50505
428       suc = (trans_commit_stmt(thd) == 0);
429       #else
430       suc = (ha_autocommit_or_rollback(thd, 0) == 0);
431       #endif
432       if (!suc) {
433 	commit_error = true;
434 	DENA_VERBOSE(10, fprintf(stderr,
435 	  "HNDSOCK unlock tables: commit failed\n"));
436       }
437     }
438     mysql_unlock_tables(thd, lock);
439     lock = thd->lock = 0;
440     statistic_increment(unlock_tables_count, &LOCK_status);
441   }
442   if (user_level_lock_locked) {
443     if (user_lock->release_lock()) {
444       user_level_lock_locked = false;
445     }
446   }
447 }
448 
449 bool
get_commit_error()450 dbcontext::get_commit_error()
451 {
452   return commit_error;
453 }
454 
455 void
clear_error()456 dbcontext::clear_error()
457 {
458   lock_failed = false;
459   commit_error = false;
460 }
461 
462 void
close_tables_if()463 dbcontext::close_tables_if()
464 {
465   unlock_tables_if();
466   DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
467   close_thread_tables(thd);
468   #if MYSQL_VERSION_ID >= 50505
469   thd->mdl_context.release_transactional_locks();
470   #endif
471   if (!table_vec.empty()) {
472     statistic_increment(close_tables_count, &LOCK_status);
473     table_vec.clear();
474     table_map.clear();
475   }
476 }
477 
478 void
table_addref(size_t tbl_id)479 dbcontext::table_addref(size_t tbl_id)
480 {
481   table_vec[tbl_id].refcount += 1;
482   DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id,
483     table_vec[tbl_id].refcount));
484 }
485 
486 void
table_release(size_t tbl_id)487 dbcontext::table_release(size_t tbl_id)
488 {
489   table_vec[tbl_id].refcount -= 1;
490   DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id,
491     table_vec[tbl_id].refcount));
492 }
493 
494 void
resp_record(dbcallback_i & cb,TABLE * const table,const prep_stmt & pst)495 dbcontext::resp_record(dbcallback_i& cb, TABLE *const table,
496   const prep_stmt& pst)
497 {
498   char rwpstr_buf[64];
499   String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
500   const prep_stmt::fields_type& rf = pst.get_ret_fields();
501   const size_t n = rf.size();
502   for (size_t i = 0; i < n; ++i) {
503     uint32_t fn = rf[i];
504     Field *const fld = table->field[fn];
505     DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn));
506     if (fld->is_null()) {
507       /* null */
508       cb.dbcb_resp_entry(0, 0);
509     } else {
510       fld->val_str(&rwpstr, &rwpstr);
511       const size_t len = rwpstr.length();
512       if (len != 0) {
513 	/* non-empty */
514 	cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length());
515       } else {
516 	/* empty */
517 	static const char empty_str[] = "";
518 	cb.dbcb_resp_entry(empty_str, 0);
519       }
520     }
521   }
522 }
523 
524 void
dump_record(dbcallback_i & cb,TABLE * const table,const prep_stmt & pst)525 dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
526   const prep_stmt& pst)
527 {
528   char rwpstr_buf[64];
529   String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
530   const prep_stmt::fields_type& rf = pst.get_ret_fields();
531   const size_t n = rf.size();
532   for (size_t i = 0; i < n; ++i) {
533     uint32_t fn = rf[i];
534     Field *const fld = table->field[fn];
535     if (fld->is_null()) {
536       /* null */
537       fprintf(stderr, "NULL");
538     } else {
539       fld->val_str(&rwpstr, &rwpstr);
540       const std::string s(rwpstr.ptr(), rwpstr.length());
541       fprintf(stderr, "[%s]", s.c_str());
542     }
543   }
544   fprintf(stderr, "\n");
545 }
546 
547 int
modify_record(dbcallback_i & cb,TABLE * const table,const prep_stmt & pst,const cmd_exec_args & args,char mod_op,size_t & modified_count)548 dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
549   const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
550   size_t& modified_count)
551 {
552   if (mod_op == 'U') {
553     /* update */
554     handler *const hnd = table->file;
555     uchar *const buf = table->record[0];
556     store_record(table, record[1]);
557     const prep_stmt::fields_type& rf = pst.get_ret_fields();
558     const size_t n = rf.size();
559     for (size_t i = 0; i < n; ++i) {
560       const string_ref& nv = args.uvals[i];
561       uint32_t fn = rf[i];
562       Field *const fld = table->field[fn];
563       if (nv.begin() == 0) {
564 	fld->set_null();
565       } else {
566 	fld->set_notnull();
567 	fld->store(nv.begin(), nv.size(), &my_charset_bin);
568       }
569     }
570     table_vec[pst.get_table_id()].modified = true;
571     const int r = hnd->ha_update_row(table->record[1], buf);
572     if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
573       return r;
574     }
575     ++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */
576   } else if (mod_op == 'D') {
577     /* delete */
578     handler *const hnd = table->file;
579     table_vec[pst.get_table_id()].modified = true;
580     const int r = hnd->ha_delete_row(table->record[0]);
581     if (r != 0) {
582       return r;
583     }
584     ++modified_count;
585   } else if (mod_op == '+' || mod_op == '-') {
586     /* increment/decrement */
587     handler *const hnd = table->file;
588     uchar *const buf = table->record[0];
589     store_record(table, record[1]);
590     const prep_stmt::fields_type& rf = pst.get_ret_fields();
591     const size_t n = rf.size();
592     size_t i = 0;
593     for (i = 0; i < n; ++i) {
594       const string_ref& nv = args.uvals[i];
595       uint32_t fn = rf[i];
596       Field *const fld = table->field[fn];
597       if (fld->is_null() || nv.begin() == 0) {
598 	continue;
599       }
600       const long long pval = fld->val_int();
601       const long long llv = atoll_nocheck(nv.begin(), nv.end());
602       /* TODO: llv == 0? */
603       long long nval = 0;
604       if (mod_op == '+') {
605 	/* increment */
606 	nval = pval + llv;
607       } else {
608 	/* decrement */
609 	nval = pval - llv;
610 	if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
611 	  break; /* don't modify */
612 	}
613       }
614       fld->store(nval, false);
615     }
616     if (i == n) {
617       /* modify */
618       table_vec[pst.get_table_id()].modified = true;
619       const int r = hnd->ha_update_row(table->record[1], buf);
620       if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
621 	return r;
622       }
623       ++modified_count;
624     }
625   }
626   return 0;
627 }
628 
629 void
cmd_insert_internal(dbcallback_i & cb,const prep_stmt & pst,const string_ref * fvals,size_t fvalslen)630 dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
631   const string_ref *fvals, size_t fvalslen)
632 {
633   if (!for_write_flag) {
634     return cb.dbcb_resp_short(2, "readonly");
635   }
636   lock_tables_if();
637   if (lock == 0) {
638     return cb.dbcb_resp_short(1, "lock_tables");
639   }
640   if (pst.get_table_id() >= table_vec.size()) {
641     return cb.dbcb_resp_short(2, "tblnum");
642   }
643   TABLE *const table = table_vec[pst.get_table_id()].table;
644   handler *const hnd = table->file;
645   uchar *const buf = table->record[0];
646   empty_record(table);
647   memset(buf, 0, table->s->null_bytes); /* clear null flags */
648   const prep_stmt::fields_type& rf = pst.get_ret_fields();
649   const size_t n = std::min(rf.size(), fvalslen);
650   for (size_t i = 0; i < n; ++i) {
651     uint32_t fn = rf[i];
652     Field *const fld = table->field[fn];
653     if (fvals[i].begin() == 0) {
654       fld->set_null();
655     } else {
656       fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
657     }
658   }
659   table->next_number_field = table->found_next_number_field;
660     /* FIXME: test */
661   const int r = hnd->ha_write_row(buf);
662   const ulonglong insert_id = table->file->insert_id_for_cur_row;
663   table->next_number_field = 0;
664   table_vec[pst.get_table_id()].modified = true;
665   if (r == 0 && table->found_next_number_field != 0) {
666     return cb.dbcb_resp_short_num64(0, insert_id);
667   }
668   if (r != 0) {
669     return cb.dbcb_resp_short_num(1, r);
670   }
671   return cb.dbcb_resp_short(0, "");
672 }
673 
674 void
cmd_sql_internal(dbcallback_i & cb,const prep_stmt & pst,const string_ref * fvals,size_t fvalslen)675 dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
676   const string_ref *fvals, size_t fvalslen)
677 {
678   if (fvalslen < 1) {
679     return cb.dbcb_resp_short(2, "syntax");
680   }
681   return cb.dbcb_resp_short(2, "notimpl");
682 }
683 
684 static size_t
prepare_keybuf(const cmd_exec_args & args,uchar * key_buf,TABLE * table,KEY & kinfo,size_t invalues_index)685 prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
686   KEY& kinfo, size_t invalues_index)
687 {
688   size_t kplen_sum = 0;
689   DBG_KEY(fprintf(stderr, "SLOW\n"));
690   for (size_t i = 0; i < args.kvalslen; ++i) {
691     const KEY_PART_INFO & kpt = kinfo.key_part[i];
692     string_ref kval = args.kvals[i];
693     if (args.invalues_keypart >= 0 &&
694       static_cast<size_t>(args.invalues_keypart) == i) {
695       kval = args.invalues[invalues_index];
696     }
697     if (kval.begin() == 0) {
698       kpt.field->set_null();
699     } else {
700       kpt.field->set_notnull();
701     }
702     kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
703     kplen_sum += kpt.store_length;
704     DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
705       kpt.store_length));
706   }
707   key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
708   DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
709     kinfo.key_length));
710   return kplen_sum;
711 }
712 
713 void
cmd_find_internal(dbcallback_i & cb,const prep_stmt & pst,ha_rkey_function find_flag,const cmd_exec_args & args)714 dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
715   ha_rkey_function find_flag, const cmd_exec_args& args)
716 {
717   const bool debug_out = (verbose_level >= 100);
718   bool need_resp_record = true;
719   char mod_op = 0;
720   const string_ref& mod_op_str = args.mod_op;
721   if (mod_op_str.size() != 0) {
722     if (!for_write_flag) {
723       return cb.dbcb_resp_short(2, "readonly");
724     }
725     mod_op = mod_op_str.begin()[0];
726     need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?';
727     switch (mod_op) {
728     case 'U': /* update */
729     case 'D': /* delete */
730     case '+': /* increment */
731     case '-': /* decrement */
732       break;
733     default:
734       if (debug_out) {
735 	fprintf(stderr, "unknown modop: %c\n", mod_op);
736       }
737       return cb.dbcb_resp_short(2, "modop");
738     }
739   }
740   lock_tables_if();
741   if (lock == 0) {
742     return cb.dbcb_resp_short(1, "lock_tables");
743   }
744   if (pst.get_table_id() >= table_vec.size()) {
745     return cb.dbcb_resp_short(2, "tblnum");
746   }
747   TABLE *const table = table_vec[pst.get_table_id()].table;
748   /* keys */
749   if (pst.get_idxnum() >= table->s->keys) {
750     return cb.dbcb_resp_short(2, "idxnum");
751   }
752   KEY& kinfo = table->key_info[pst.get_idxnum()];
753   #if MYSQL_VERSION_ID >= 50600
754   if (args.kvalslen > kinfo.actual_key_parts) {
755   #else
756   if (args.kvalslen > kinfo.key_parts) {
757   #endif
758     return cb.dbcb_resp_short(2, "kpnum");
759   }
760   uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
761   size_t invalues_idx = 0;
762   size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
763   /* filters */
764   uchar *filter_buf = 0;
765   if (args.filters != 0) {
766     const size_t filter_buf_len = calc_filter_buf_size(table, pst,
767       args.filters);
768     filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
769     if (!fill_filter_buf(table, pst, args.filters, filter_buf,
770       filter_buf_len)) {
771       return cb.dbcb_resp_short(2, "filterblob");
772     }
773   }
774   /* handler */
775   table->read_set = &table->s->all_set;
776   handler *const hnd = table->file;
777   if (!for_write_flag) {
778     hnd->init_table_handle_for_HANDLER();
779   }
780   hnd->ha_index_or_rnd_end();
781   hnd->ha_index_init(pst.get_idxnum(), 1);
782   if (need_resp_record) {
783     cb.dbcb_resp_begin(pst.get_ret_fields().size());
784   }
785   const uint32_t limit = args.limit ? args.limit : 1;
786   uint32_t skip = args.skip;
787   size_t modified_count = 0;
788   int r = 0;
789   bool is_first = true;
790   for (uint32_t cnt = 0; cnt < limit + skip;) {
791     if (is_first) {
792       is_first = false;
793       const key_part_map kpm = (1U << args.kvalslen) - 1;
794       #if MYSQL_VERSION_ID >= 50611
795       r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
796       #else
797       r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
798       #endif
799     } else if (args.invalues_keypart >= 0) {
800       if (++invalues_idx >= args.invalueslen) {
801 	break;
802       }
803       kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
804       const key_part_map kpm = (1U << args.kvalslen) - 1;
805       #if MYSQL_VERSION_ID >= 50611
806       r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
807       #else
808       r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
809       #endif
810     } else {
811       switch (find_flag) {
812       case HA_READ_BEFORE_KEY:
813       case HA_READ_KEY_OR_PREV:
814 	#if MYSQL_VERSION_ID >= 50600
815 	r = hnd->ha_index_prev(table->record[0]);
816 	#else
817 	r = hnd->index_prev(table->record[0]);
818 	#endif
819 	break;
820       case HA_READ_AFTER_KEY:
821       case HA_READ_KEY_OR_NEXT:
822 	#if MYSQL_VERSION_ID >= 50600
823 	r = hnd->ha_index_next(table->record[0]);
824 	#else
825 	r = hnd->index_next(table->record[0]);
826 	#endif
827 	break;
828       case HA_READ_KEY_EXACT:
829 	#if MYSQL_VERSION_ID >= 50611
830 	r = hnd->ha_index_next_same(table->record[0], key_buf, kplen_sum);
831 	#else
832 	r = hnd->index_next_same(table->record[0], key_buf, kplen_sum);
833 	#endif
834 	break;
835       default:
836 	r = HA_ERR_END_OF_FILE; /* to finish the loop */
837 	break;
838       }
839     }
840     if (debug_out) {
841       fprintf(stderr, "r=%d\n", r);
842       if (r == 0 || r == HA_ERR_RECORD_DELETED) {
843 	dump_record(cb, table, pst);
844       }
845     }
846     int filter_res = 0;
847     if (r != 0) {
848       /* no-count */
849     } else if (args.filters != 0 && (filter_res = check_filter(cb, table,
850       pst, args.filters, filter_buf)) != 0) {
851       if (filter_res < 0) {
852 	break;
853       }
854     } else if (skip > 0) {
855       --skip;
856     } else {
857       /* hit */
858       if (need_resp_record) {
859 	resp_record(cb, table, pst);
860       }
861       if (mod_op != 0) {
862 	r = modify_record(cb, table, pst, args, mod_op, modified_count);
863       }
864       ++cnt;
865     }
866     if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
867       continue;
868     }
869     if (r != 0 && r != HA_ERR_RECORD_DELETED) {
870       break;
871     }
872   }
873   hnd->ha_index_or_rnd_end();
874   if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
875     r != HA_ERR_END_OF_FILE) {
876     /* failed */
877     if (need_resp_record) {
878       /* revert dbcb_resp_begin() and dbcb_resp_entry() */
879       cb.dbcb_resp_cancel();
880     }
881     cb.dbcb_resp_short_num(1, r);
882   } else {
883     /* succeeded */
884     if (need_resp_record) {
885       cb.dbcb_resp_end();
886     } else {
887       cb.dbcb_resp_short_num(0, modified_count);
888     }
889   }
890   DENA_ALLOCA_FREE(filter_buf);
891   DENA_ALLOCA_FREE(key_buf);
892 }
893 
894 size_t
895 dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
896   const record_filter *filters)
897 {
898   size_t filter_buf_len = 0;
899   for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
900     if (f->val.begin() == 0) {
901       continue;
902     }
903     const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
904     filter_buf_len += table->field[fn]->pack_length();
905   }
906   ++filter_buf_len;
907     /* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
908        Allocate 1 more byte for safety. */
909   return filter_buf_len;
910 }
911 
912 bool
913 dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst,
914   const record_filter *filters, uchar *filter_buf, size_t len)
915 {
916   memset(filter_buf, 0, len);
917   size_t pos = 0;
918   for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
919     if (f->val.begin() == 0) {
920       continue;
921     }
922     const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
923     Field *const fld = table->field[fn];
924     if ((fld->flags & BLOB_FLAG) != 0) {
925       return false;
926     }
927     fld->store(f->val.begin(), f->val.size(), &my_charset_bin);
928     const size_t packlen = fld->pack_length();
929     memcpy(filter_buf + pos, fld->ptr, packlen);
930     pos += packlen;
931   }
932   return true;
933 }
934 
935 int
936 dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
937   const record_filter *filters, const uchar *filter_buf)
938 {
939   DBG_FILTER(fprintf(stderr, "check_filter\n"));
940   size_t pos = 0;
941   for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
942     const string_ref& op = f->op;
943     const string_ref& val = f->val;
944     const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
945     Field *const fld = table->field[fn];
946     const size_t packlen = fld->pack_length();
947     const uchar *const bval = filter_buf + pos;
948     int cv = 0;
949     if (fld->is_null()) {
950       cv = (val.begin() == 0) ? 0 : -1;
951     } else {
952       cv = (val.begin() == 0) ? 1 : fld->cmp(bval);
953     }
954     DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv));
955     bool cond = true;
956     if (op.size() == 1) {
957       switch (op.begin()[0]) {
958       case '>':
959 	DBG_FILTER(fprintf(stderr, "check_filter op: >\n"));
960 	cond = (cv > 0);
961 	break;
962       case '<':
963 	DBG_FILTER(fprintf(stderr, "check_filter op: <\n"));
964 	cond = (cv < 0);
965 	break;
966       case '=':
967 	DBG_FILTER(fprintf(stderr, "check_filter op: =\n"));
968 	cond = (cv == 0);
969 	break;
970       default:
971 	DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
972 	cond = false; /* FIXME: error */
973 	break;
974       }
975     } else if (op.size() == 2 && op.begin()[1] == '=') {
976       switch (op.begin()[0]) {
977       case '>':
978 	DBG_FILTER(fprintf(stderr, "check_filter op: >=\n"));
979 	cond = (cv >= 0);
980 	break;
981       case '<':
982 	DBG_FILTER(fprintf(stderr, "check_filter op: <=\n"));
983 	cond = (cv <= 0);
984 	break;
985       case '!':
986 	DBG_FILTER(fprintf(stderr, "check_filter op: !=\n"));
987 	cond = (cv != 0);
988 	break;
989       default:
990 	DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
991 	cond = false; /* FIXME: error */
992 	break;
993       }
994     }
995     DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond));
996     if (!cond) {
997       return (f->filter_type == record_filter_type_skip) ? 1 : -1;
998     }
999     if (val.begin() != 0) {
1000       pos += packlen;
1001     }
1002   }
1003   return 0;
1004 }
1005 
1006 void
1007 dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
1008 {
1009   unlock_tables_if();
1010   const table_name_type k = std::make_pair(std::string(arg.dbn),
1011     std::string(arg.tbl));
1012   const table_map_type::const_iterator iter = table_map.find(k);
1013   uint32_t tblnum = 0;
1014   if (iter != table_map.end()) {
1015     tblnum = iter->second;
1016     DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(),
1017       (int)tblnum));
1018   } else {
1019     TABLE_LIST tables;
1020     TABLE *table = 0;
1021     bool refresh = true;
1022     const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
1023     #if MYSQL_VERSION_ID >= 50505
1024     tables.init_one_table(arg.dbn, strlen(arg.dbn), arg.tbl, strlen(arg.tbl),
1025       arg.tbl, lock_type);
1026     tables.mdl_request.init(MDL_key::TABLE, arg.dbn, arg.tbl,
1027       for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
1028     Open_table_context ot_act(thd, 0);
1029     #if MYSQL_VERSION_ID >= 50600
1030     if (!open_table(thd, &tables, &ot_act)) {
1031     #else
1032     if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
1033     #endif
1034       table = tables.table;
1035     }
1036     #else
1037     tables.init_one_table(arg.dbn, arg.tbl, lock_type);
1038     table = open_table(thd, &tables, thd->mem_root, &refresh,
1039       OPEN_VIEW_NO_PARSE);
1040     #endif
1041     if (table == 0) {
1042       DENA_VERBOSE(20, fprintf(stderr,
1043 	"HNDSOCK failed to open %p [%s] [%s] [%d]\n",
1044 	thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
1045       return cb.dbcb_resp_short(1, "open_table");
1046     }
1047     statistic_increment(open_tables_count, &LOCK_status);
1048     table->reginfo.lock_type = lock_type;
1049     table->use_all_columns();
1050     tblnum = table_vec.size();
1051     tablevec_entry e;
1052     e.table = table;
1053     table_vec.push_back(e);
1054     table_map[k] = tblnum;
1055   }
1056   size_t idxnum = static_cast<size_t>(-1);
1057   if (arg.idx[0] >= '0' && arg.idx[0] <= '9') {
1058     /* numeric */
1059     TABLE *const table = table_vec[tblnum].table;
1060     idxnum = atoi(arg.idx);
1061     if (idxnum >= table->s->keys) {
1062       return cb.dbcb_resp_short(2, "idxnum");
1063     }
1064   } else {
1065     const char *const idx_name_to_open =
1066       arg.idx[0]  == '\0' ? "PRIMARY" : arg.idx;
1067     TABLE *const table = table_vec[tblnum].table;
1068     for (uint i = 0; i < table->s->keys; ++i) {
1069       KEY& kinfo = table->key_info[i];
1070       if (strcmp(kinfo.name, idx_name_to_open) == 0) {
1071 	idxnum = i;
1072 	break;
1073       }
1074     }
1075   }
1076   if (idxnum == size_t(-1)) {
1077     return cb.dbcb_resp_short(2, "idxnum");
1078   }
1079   prep_stmt::fields_type rf;
1080   prep_stmt::fields_type ff;
1081   if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
1082     return cb.dbcb_resp_short(2, "fld");
1083   }
1084   if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
1085     return cb.dbcb_resp_short(2, "fld");
1086   }
1087   prep_stmt p(this, tblnum, idxnum, rf, ff);
1088   cb.dbcb_set_prep_stmt(arg.pst_id, p);
1089   return cb.dbcb_resp_short(0, "");
1090 }
1091 
1092 bool
1093 dbcontext::parse_fields(TABLE *const table, const char *str,
1094   prep_stmt::fields_type& flds)
1095 {
1096   string_ref flds_sr(str, strlen(str));
1097   std::vector<string_ref> fldnms;
1098   if (flds_sr.size() != 0) {
1099     split(',', flds_sr, fldnms);
1100   }
1101   for (size_t i = 0; i < fldnms.size(); ++i) {
1102     Field **fld = 0;
1103     size_t j = 0;
1104     for (fld = table->field; *fld; ++fld, ++j) {
1105       DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name));
1106       string_ref fn((*fld)->field_name, strlen((*fld)->field_name));
1107       if (fn == fldnms[i]) {
1108 	break;
1109       }
1110     }
1111     if (*fld == 0) {
1112       DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
1113 	std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
1114       return false;
1115     }
1116     DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name, j));
1117     flds.push_back(j);
1118   }
1119   return true;
1120 }
1121 
/* Kind of write requested by an exec command; see dbcontext::cmd_exec(). */
enum db_write_op {
  db_write_op_none = 0,   /* no write: the request is an index lookup */
  db_write_op_insert = 1, /* selected by the '+' operator */
  db_write_op_sql = 2     /* selected by the 'S' operator */
};
1127 
1128 void
1129 dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args)
1130 {
1131   const prep_stmt& p = *args.pst;
1132   if (p.get_table_id() == static_cast<size_t>(-1)) {
1133     return cb.dbcb_resp_short(2, "stmtnum");
1134   }
1135   ha_rkey_function find_flag = HA_READ_KEY_EXACT;
1136   db_write_op wrop = db_write_op_none;
1137   if (args.op.size() == 1) {
1138     switch (args.op.begin()[0]) {
1139     case '=':
1140       find_flag = HA_READ_KEY_EXACT;
1141       break;
1142     case '>':
1143       find_flag = HA_READ_AFTER_KEY;
1144       break;
1145     case '<':
1146       find_flag = HA_READ_BEFORE_KEY;
1147       break;
1148     case '+':
1149       wrop = db_write_op_insert;
1150       break;
1151     case 'S':
1152       wrop = db_write_op_sql;
1153       break;
1154     default:
1155       return cb.dbcb_resp_short(2, "op");
1156     }
1157   } else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
1158     switch (args.op.begin()[0]) {
1159     case '>':
1160       find_flag = HA_READ_KEY_OR_NEXT;
1161       break;
1162     case '<':
1163       find_flag = HA_READ_KEY_OR_PREV;
1164       break;
1165     default:
1166       return cb.dbcb_resp_short(2, "op");
1167     }
1168   } else {
1169     return cb.dbcb_resp_short(2, "op");
1170   }
1171   if (args.kvalslen <= 0) {
1172     return cb.dbcb_resp_short(2, "klen");
1173   }
1174   switch (wrop) {
1175   case db_write_op_none:
1176     return cmd_find_internal(cb, p, find_flag, args);
1177   case db_write_op_insert:
1178     return cmd_insert_internal(cb, p, args.kvals, args.kvalslen);
1179   case db_write_op_sql:
1180     return cmd_sql_internal(cb, p, args.kvals, args.kvalslen);
1181   }
1182 }
1183 
1184 void
1185 dbcontext::set_statistics(size_t num_conns, size_t num_active)
1186 {
1187   thd_proc_info(thd, &info_message_buf[0]);
1188   if (for_write_flag) {
1189     set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
1190       num_conns, num_active);
1191   } else {
1192     set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
1193       num_conns, num_active);
1194   }
1195 }
1196 
1197 };
1198 
1199