1
2 // vim:sw=2:ai
3
4 /*
5 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6 * See COPYRIGHT.txt for details.
7 */
8
9 #include <stdlib.h>
10 #include <stdio.h>
11 #include <string.h>
12
13 #include "database.hpp"
14 #include "string_util.hpp"
15 #include "escape.hpp"
16 #include "mysql_incl.hpp"
17
/*
 * Debug trace hooks. Each DBG_*(x) macro below expands to nothing, so its
 * argument expression is dropped by the preprocessor and never compiled.
 * To enable one, redefine it as `#define DBG_FOO(x) x`.
 * NOTE(review): disabled call sites are never type-checked, so enabling a
 * hook may surface compile errors at its call sites.
 */
#define DBG_KEY(x)
#define DBG_SHUT(x)
#define DBG_LOCK(x)
#define DBG_THR(x)
#define DBG_CMP(x)
#define DBG_FLD(x)
#define DBG_FILTER(x)
#define DBG_REFCNT(x)
#define DBG_KEYLEN(x)
/* Object-like (takes no argument), unlike the hooks above; no use of it is
   visible in this part of the file. */
#define DBG_DELETED

/* status variables */
/* Global counters. In this file open/close/lock/unlock counters are bumped
   with statistic_increment(..., &LOCK_status). index_exec_count is not
   incremented in the code shown; presumably it is updated elsewhere. */
unsigned long long int open_tables_count;
unsigned long long int close_tables_count;
unsigned long long int lock_tables_count;
unsigned long long int unlock_tables_count;
unsigned long long int index_exec_count;
35
36 namespace dena {
37
prep_stmt()38 prep_stmt::prep_stmt()
39 : dbctx(0), table_id(static_cast<size_t>(-1)),
40 idxnum(static_cast<size_t>(-1))
41 {
42 }
prep_stmt(dbcontext_i * c,size_t tbl,size_t idx,const fields_type & rf,const fields_type & ff)43 prep_stmt::prep_stmt(dbcontext_i *c, size_t tbl, size_t idx,
44 const fields_type& rf, const fields_type& ff)
45 : dbctx(c), table_id(tbl), idxnum(idx), ret_fields(rf), filter_fields(ff)
46 {
47 if (dbctx) {
48 dbctx->table_addref(table_id);
49 }
50 }
~prep_stmt()51 prep_stmt::~prep_stmt()
52 {
53 if (dbctx) {
54 dbctx->table_release(table_id);
55 }
56 }
57
/* Copy constructor: duplicates every member of x. The copy takes its own
   reference on the shared table slot, so each object releases exactly one
   reference when destroyed. */
prep_stmt::prep_stmt(const prep_stmt& x)
  : dbctx(x.dbctx),
    table_id(x.table_id),
    idxnum(x.idxnum),
    ret_fields(x.ret_fields),
    filter_fields(x.filter_fields)
{
  if (dbctx == 0) {
    return;
  }
  dbctx->table_addref(table_id);
}
66
67 prep_stmt&
operator =(const prep_stmt & x)68 prep_stmt::operator =(const prep_stmt& x)
69 {
70 if (this != &x) {
71 if (dbctx) {
72 dbctx->table_release(table_id);
73 }
74 dbctx = x.dbctx;
75 table_id = x.table_id;
76 idxnum = x.idxnum;
77 ret_fields = x.ret_fields;
78 filter_fields = x.filter_fields;
79 if (dbctx) {
80 dbctx->table_addref(table_id);
81 }
82 }
83 return *this;
84 }
85
86 struct database : public database_i, private noncopyable {
87 database(const config& c);
88 virtual ~database();
89 virtual dbcontext_ptr create_context(bool for_write) volatile;
90 virtual void stop() volatile;
91 virtual const config& get_conf() const volatile;
92 public:
93 int child_running;
94 private:
95 config conf;
96 };
97
98 struct tablevec_entry {
99 TABLE *table;
100 size_t refcount;
101 bool modified;
tablevec_entrydena::tablevec_entry102 tablevec_entry() : table(0), refcount(0), modified(false) { }
103 };
104
105 struct expr_user_lock : private noncopyable {
expr_user_lockdena::expr_user_lock106 expr_user_lock(THD *thd, int timeout)
107 : lck_key("handlersocket_wr", 16, &my_charset_latin1),
108 lck_timeout(timeout),
109 lck_func_get_lock(&lck_key, &lck_timeout),
110 lck_func_release_lock(&lck_key)
111 {
112 lck_key.fix_fields(thd, 0);
113 lck_timeout.fix_fields(thd, 0);
114 lck_func_get_lock.fix_fields(thd, 0);
115 lck_func_release_lock.fix_fields(thd, 0);
116 }
get_lockdena::expr_user_lock117 long long get_lock() {
118 return lck_func_get_lock.val_int();
119 }
release_lockdena::expr_user_lock120 long long release_lock() {
121 return lck_func_release_lock.val_int();
122 }
123 private:
124 Item_string lck_key;
125 Item_int lck_timeout;
126 Item_func_get_lock lck_func_get_lock;
127 Item_func_release_lock lck_func_release_lock;
128 };
129
130 struct dbcontext : public dbcontext_i, private noncopyable {
131 dbcontext(volatile database *d, bool for_write);
132 virtual ~dbcontext();
133 virtual void init_thread(const void *stack_botton,
134 volatile int& shutdown_flag);
135 virtual void wait_for_server_to_start();
136 virtual void term_thread();
137 virtual bool check_alive();
138 virtual void lock_tables_if();
139 virtual void unlock_tables_if();
140 virtual bool get_commit_error();
141 virtual void clear_error();
142 virtual void close_tables_if();
143 virtual void table_addref(size_t tbl_id);
144 virtual void table_release(size_t tbl_id);
145 virtual void cmd_open(dbcallback_i& cb, const cmd_open_args& args);
146 virtual void cmd_exec(dbcallback_i& cb, const cmd_exec_args& args);
147 virtual void set_statistics(size_t num_conns, size_t num_active);
148 private:
149 int set_thread_message(const char *fmt, ...)
150 __attribute__((format (printf, 2, 3)));
151 bool parse_fields(TABLE *const table, const char *str,
152 prep_stmt::fields_type& flds);
153 void cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
154 const string_ref *fvals, size_t fvalslen);
155 void cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
156 const string_ref *fvals, size_t fvalslen);
157 void cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
158 ha_rkey_function find_flag, const cmd_exec_args& args);
159 size_t calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
160 const record_filter *filters);
161 bool fill_filter_buf(TABLE *table, const prep_stmt& pst,
162 const record_filter *filters, uchar *filter_buf, size_t len);
163 int check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
164 const record_filter *filters, const uchar *filter_buf);
165 void resp_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
166 void dump_record(dbcallback_i& cb, TABLE *const table, const prep_stmt& pst);
167 int modify_record(dbcallback_i& cb, TABLE *const table,
168 const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
169 size_t& modified_count);
170 private:
171 typedef std::vector<tablevec_entry> table_vec_type;
172 typedef std::pair<std::string, std::string> table_name_type;
173 typedef std::map<table_name_type, size_t> table_map_type;
174 private:
175 volatile database *const dbref;
176 bool for_write_flag;
177 THD *thd;
178 MYSQL_LOCK *lock;
179 bool lock_failed;
180 std::auto_ptr<expr_user_lock> user_lock;
181 int user_level_lock_timeout;
182 bool user_level_lock_locked;
183 bool commit_error;
184 std::vector<char> info_message_buf;
185 table_vec_type table_vec;
186 table_map_type table_map;
187 };
188
database(const config & c)189 database::database(const config& c)
190 : child_running(1), conf(c)
191 {
192 }
193
~database()194 database::~database()
195 {
196 }
197
198 dbcontext_ptr
create_context(bool for_write)199 database::create_context(bool for_write) volatile
200 {
201 return dbcontext_ptr(new dbcontext(this, for_write));
202 }
203
204 void
stop()205 database::stop() volatile
206 {
207 child_running = false;
208 }
209
210 const config&
get_conf() const211 database::get_conf() const volatile
212 {
213 return const_cast<const config&>(conf);
214 }
215
216 database_ptr
create(const config & conf)217 database_i::create(const config& conf)
218 {
219 return database_ptr(new database(conf));
220 }
221
dbcontext(volatile database * d,bool for_write)222 dbcontext::dbcontext(volatile database *d, bool for_write)
223 : dbref(d), for_write_flag(for_write), thd(0), lock(0), lock_failed(false),
224 user_level_lock_timeout(0), user_level_lock_locked(false),
225 commit_error(false)
226 {
227 info_message_buf.resize(8192);
228 user_level_lock_timeout = d->get_conf().get_int("wrlock_timeout", 12);
229 }
230
~dbcontext()231 dbcontext::~dbcontext()
232 {
233 }
234
/* Byte offset of member `fld` inside the THD pointed to by the `thd`
   visible at the point of use. Used by the (disabled) DBG_THR trace below. */
#define DENA_THR_OFFSETOF(fld) ((char *)(&thd->fld) - (char *)thd)

/*
 * Sets up server-side state for the calling worker thread:
 *  - initialises mysys thread state and creates this context's THD, with
 *    stack_bottom recorded as thd->thread_stack;
 *  - installs the THD in the THR_THD thread-specific slot;
 *  - for writer contexts, sets OPTION_BIN_LOG and makes "handlersocket"
 *    the current database;
 *  - registers the THD with a fresh thread_id under LOCK_thread_count;
 *  - points the THD's proc-info text at info_message_buf ("hs:listening");
 *  - starts a LEX and creates the user-level lock helper bound to this THD.
 * shutdown_flag is not used in this function.
 */
void
dbcontext::init_thread(const void *stack_bottom, volatile int& shutdown_flag)
{
  DBG_THR(fprintf(stderr, "HNDSOCK init thread\n"));
  {
    my_thread_init();
    thd = new THD(false);
    thd->thread_stack = (char *)stack_bottom;
    DBG_THR(fprintf(stderr,
      "thread_stack = %p sizeof(THD)=%zu sizeof(mtx)=%zu "
      "O: %zu %zu %zu %zu %zu %zu %zu\n",
      thd->thread_stack, sizeof(THD), sizeof(LOCK_thread_count),
      DENA_THR_OFFSETOF(mdl_context),
      DENA_THR_OFFSETOF(net),
      DENA_THR_OFFSETOF(LOCK_thd_data),
      DENA_THR_OFFSETOF(mysys_var),
      DENA_THR_OFFSETOF(stmt_arena),
      DENA_THR_OFFSETOF(limit_found_rows),
      DENA_THR_OFFSETOF(locked_tables_list)));
    thd->store_globals();
    /* NOTE(review): 1<<30 is presumably not one of the named
       enum_thread_type values; confirm how the server treats it. */
    thd->system_thread = static_cast<enum_thread_type>(1<<30UL);
    /* Replace the THD's NET with an all-zero one (presumably because no
       client connection is attached to this thread). */
    NET v;
    memset(&v, 0, sizeof(v));
    thd->net = v;
    if (for_write_flag) {
#if MYSQL_VERSION_ID >= 50505
      thd->variables.option_bits |= OPTION_BIN_LOG;
#else
      thd->options |= OPTION_BIN_LOG;
#endif
      /* Replace the current database name (the intermediate 0 store is
         redundant but harmless). */
      safeFree(thd->db);
      thd->db = 0;
      thd->db = my_strdup("handlersocket", MYF(0));
    }
    my_pthread_setspecific_ptr(THR_THD, thd);
    DBG_THR(fprintf(stderr, "HNDSOCK x0 %p\n", thd));
  }
  {
    /* Assign a thread id and register the THD with the server's thread
       bookkeeping. */
    pthread_mutex_lock(&LOCK_thread_count);
    thd->thread_id = thread_id++;
#if MYSQL_VERSION_ID >= 50600
    add_global_thread(thd);
#else
    threads.append(thd);
    ++thread_count;
#endif
    pthread_mutex_unlock(&LOCK_thread_count);
  }

  DBG_THR(fprintf(stderr, "HNDSOCK %p init thread done\n", thd));

  /* Proc-info text now lives in info_message_buf; set_thread_message()
     rewrites it in place. */
  thd_proc_info(thd, &info_message_buf[0]);
  set_thread_message("hs:listening");
  DBG_THR(fprintf(stderr, "HNDSOCK x1 %p\n", thd));

  lex_start(thd);

  user_lock.reset(new expr_user_lock(thd, user_level_lock_timeout));
}
296
297 void
wait_for_server_to_start()298 dbcontext::wait_for_server_to_start()
299 {
300 mysql_mutex_lock(&LOCK_server_started);
301 while (!mysqld_server_started)
302 mysql_cond_wait(&COND_server_started, &LOCK_server_started);
303 mysql_mutex_unlock(&LOCK_server_started);
304 }
305
306 int
set_thread_message(const char * fmt,...)307 dbcontext::set_thread_message(const char *fmt, ...)
308 {
309 va_list ap;
310 va_start(ap, fmt);
311 const int n = vsnprintf(&info_message_buf[0], info_message_buf.size(),
312 fmt, ap);
313 va_end(ap);
314 return n;
315 }
316
/*
 * Tears down the state created by init_thread(), in this order:
 *  1. releases table and user-level locks via unlock_tables_if();
 *  2. clears the THR_THD thread-specific slot;
 *  3. unregisters and deletes the THD;
 *  4. calls my_thread_end().
 * LOCK_thread_count is held around unregistering and deletion only for
 * MySQL < 5.6.20.
 * NOTE(review): user_lock is not reset here, although its items were
 * resolved against the THD that is deleted below. Confirm it is not
 * evaluated again before the next init_thread() or destruction.
 * NOTE(review): the pre-5.6 branch only decrements thread_count and does
 * not explicitly remove thd from `threads`. Presumably THD's destructor
 * unlinks it — confirm.
 */
void
dbcontext::term_thread()
{
  DBG_THR(fprintf(stderr, "HNDSOCK thread end %p\n", thd));
  unlock_tables_if();
  my_pthread_setspecific_ptr(THR_THD, 0);
  {
#if MYSQL_VERSION_ID >= 50600
    thd->release_resources();
#endif
#if MYSQL_VERSION_ID < 50620
    pthread_mutex_lock(&LOCK_thread_count);
#endif
#if MYSQL_VERSION_ID >= 50600
    remove_global_thread(thd);
#else
    --thread_count;
#endif
    delete thd;
    thd = 0;
#if MYSQL_VERSION_ID < 50620
    pthread_mutex_unlock(&LOCK_thread_count);
#endif
    my_thread_end();
  }
}
343
344 bool
check_alive()345 dbcontext::check_alive()
346 {
347 pthread_mutex_lock(&thd->mysys_var->mutex);
348 int killed = thd_killed(thd);
349 pthread_mutex_unlock(&thd->mysys_var->mutex);
350 DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %p %p %d %u\n", thd, killed,
351 (int)st, sizeof(*thd)));
352 if (killed) {
353 DBG_SHUT(fprintf(stderr, "chk HNDSOCK kst %d break\n", killed));
354 return false;
355 }
356 return true;
357 }
358
359 void
lock_tables_if()360 dbcontext::lock_tables_if()
361 {
362 if (lock_failed) {
363 return;
364 }
365 if (for_write_flag && !user_level_lock_locked) {
366 if (user_lock->get_lock()) {
367 user_level_lock_locked = true;
368 } else {
369 lock_failed = true;
370 return;
371 }
372 }
373 if (lock == 0) {
374 const size_t num_max = table_vec.size();
375 TABLE **const tables = DENA_ALLOCA_ALLOCATE(TABLE *, num_max + 1);
376 size_t num_open = 0;
377 for (size_t i = 0; i < num_max; ++i) {
378 if (table_vec[i].refcount > 0) {
379 tables[num_open++] = table_vec[i].table;
380 }
381 table_vec[i].modified = false;
382 }
383 #if MYSQL_VERSION_ID >= 50505
384 lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open, 0);
385 #else
386 bool need_reopen= false;
387 lock = thd->lock = mysql_lock_tables(thd, &tables[0], num_open,
388 MYSQL_LOCK_NOTIFY_IF_NEED_REOPEN, &need_reopen);
389 #endif
390 statistic_increment(lock_tables_count, &LOCK_status);
391 thd_proc_info(thd, &info_message_buf[0]);
392 DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK lock tables %p %p %zu %zu\n",
393 thd, lock, num_max, num_open));
394 if (lock == 0) {
395 lock_failed = true;
396 DENA_VERBOSE(10, fprintf(stderr, "HNDSOCK failed to lock tables %p\n",
397 thd));
398 }
399 if (for_write_flag) {
400 #if MYSQL_VERSION_ID >= 50505
401 thd->set_current_stmt_binlog_format_row();
402 #else
403 thd->current_stmt_binlog_row_based = 1;
404 #endif
405 }
406 DENA_ALLOCA_FREE(tables);
407 }
408 DBG_LOCK(fprintf(stderr, "HNDSOCK tblnum=%d\n", (int)tblnum));
409 }
410
/*
 * If tables are locked, does the following in order:
 *  - for writer contexts, for every table marked `modified`, calls
 *    query_cache_invalidate3() and ha_release_auto_increment();
 *  - commits the statement; a failure sets commit_error, which stays set
 *    until clear_error();
 *  - unlocks the tables and clears both `lock` and thd->lock.
 * Independently of the table lock, it then releases the user-level lock
 * if it is held. If release_lock() returns 0, user_level_lock_locked stays
 * true, so lock_tables_if() will not try to re-acquire it.
 */
void
dbcontext::unlock_tables_if()
{
  if (lock != 0) {
    DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK unlock tables %p %p\n",
      thd, thd->lock));
    if (for_write_flag) {
      for (size_t i = 0; i < table_vec.size(); ++i) {
        if (table_vec[i].modified) {
          query_cache_invalidate3(thd, table_vec[i].table, 1);
          table_vec[i].table->file->ha_release_auto_increment();
        }
      }
    }
    {
      /* Commit (or, pre-5.5.5, autocommit-or-rollback) the statement. */
      bool suc = true;
#if MYSQL_VERSION_ID >= 50505
      suc = (trans_commit_stmt(thd) == 0);
#else
      suc = (ha_autocommit_or_rollback(thd, 0) == 0);
#endif
      if (!suc) {
        commit_error = true;
        DENA_VERBOSE(10, fprintf(stderr,
          "HNDSOCK unlock tables: commit failed\n"));
      }
    }
    mysql_unlock_tables(thd, lock);
    lock = thd->lock = 0;
    statistic_increment(unlock_tables_count, &LOCK_status);
  }
  if (user_level_lock_locked) {
    if (user_lock->release_lock()) {
      user_level_lock_locked = false;
    }
  }
}
448
449 bool
get_commit_error()450 dbcontext::get_commit_error()
451 {
452 return commit_error;
453 }
454
455 void
clear_error()456 dbcontext::clear_error()
457 {
458 lock_failed = false;
459 commit_error = false;
460 }
461
/*
 * Releases all locks (unlock_tables_if), closes the thread's open tables
 * and, on MySQL >= 5.5.5, releases transactional metadata locks.
 * If any table slots exist, bumps close_tables_count and forgets them all
 * (table_vec and table_map are cleared), which invalidates every table id
 * previously handed out by cmd_open.
 * NOTE(review): a prep_stmt still holding one of those ids would call
 * table_release() on the cleared table_vec when destroyed. Presumably
 * callers drop their prepared statements when this is called — confirm.
 */
void
dbcontext::close_tables_if()
{
  unlock_tables_if();
  DENA_VERBOSE(100, fprintf(stderr, "HNDSOCK close tables\n"));
  close_thread_tables(thd);
#if MYSQL_VERSION_ID >= 50505
  thd->mdl_context.release_transactional_locks();
#endif
  if (!table_vec.empty()) {
    statistic_increment(close_tables_count, &LOCK_status);
    table_vec.clear();
    table_map.clear();
  }
}
477
478 void
table_addref(size_t tbl_id)479 dbcontext::table_addref(size_t tbl_id)
480 {
481 table_vec[tbl_id].refcount += 1;
482 DBG_REFCNT(fprintf(stderr, "%p %zu %zu addref\n", this, tbl_id,
483 table_vec[tbl_id].refcount));
484 }
485
486 void
table_release(size_t tbl_id)487 dbcontext::table_release(size_t tbl_id)
488 {
489 table_vec[tbl_id].refcount -= 1;
490 DBG_REFCNT(fprintf(stderr, "%p %zu %zu release\n", this, tbl_id,
491 table_vec[tbl_id].refcount));
492 }
493
494 void
resp_record(dbcallback_i & cb,TABLE * const table,const prep_stmt & pst)495 dbcontext::resp_record(dbcallback_i& cb, TABLE *const table,
496 const prep_stmt& pst)
497 {
498 char rwpstr_buf[64];
499 String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
500 const prep_stmt::fields_type& rf = pst.get_ret_fields();
501 const size_t n = rf.size();
502 for (size_t i = 0; i < n; ++i) {
503 uint32_t fn = rf[i];
504 Field *const fld = table->field[fn];
505 DBG_FLD(fprintf(stderr, "fld=%p %zu\n", fld, fn));
506 if (fld->is_null()) {
507 /* null */
508 cb.dbcb_resp_entry(0, 0);
509 } else {
510 fld->val_str(&rwpstr, &rwpstr);
511 const size_t len = rwpstr.length();
512 if (len != 0) {
513 /* non-empty */
514 cb.dbcb_resp_entry(rwpstr.ptr(), rwpstr.length());
515 } else {
516 /* empty */
517 static const char empty_str[] = "";
518 cb.dbcb_resp_entry(empty_str, 0);
519 }
520 }
521 }
522 }
523
524 void
dump_record(dbcallback_i & cb,TABLE * const table,const prep_stmt & pst)525 dbcontext::dump_record(dbcallback_i& cb, TABLE *const table,
526 const prep_stmt& pst)
527 {
528 char rwpstr_buf[64];
529 String rwpstr(rwpstr_buf, sizeof(rwpstr_buf), &my_charset_bin);
530 const prep_stmt::fields_type& rf = pst.get_ret_fields();
531 const size_t n = rf.size();
532 for (size_t i = 0; i < n; ++i) {
533 uint32_t fn = rf[i];
534 Field *const fld = table->field[fn];
535 if (fld->is_null()) {
536 /* null */
537 fprintf(stderr, "NULL");
538 } else {
539 fld->val_str(&rwpstr, &rwpstr);
540 const std::string s(rwpstr.ptr(), rwpstr.length());
541 fprintf(stderr, "[%s]", s.c_str());
542 }
543 }
544 fprintf(stderr, "\n");
545 }
546
/*
 * Applies mod_op to the row currently in table->record[0]. The caller,
 * cmd_find_internal, has just read that row. mod_op is one of:
 *   'U'      set each return field of pst to args.uvals[i]; a null value
 *            (begin() == 0) sets the field NULL.
 *   'D'      delete the row.
 *   '+'/'-'  add llv to, or subtract it from, each return field, where llv
 *            is the integer parsed from args.uvals[i]. NULL fields and null
 *            values are skipped. For '-', if any field's result would
 *            cross zero (positive to negative or vice versa), the whole row
 *            is left unmodified and 0 is returned.
 * Any other mod_op does nothing.
 * On a successful write, modified_count is incremented. For 'U', '+' and
 * '-' this also happens when the engine reports HA_ERR_RECORD_IS_THE_SAME.
 * Returns 0 or the handler error code. The table's `modified` flag is set
 * before the write is attempted. cb is unused.
 * NOTE(review): args.uvals is indexed up to rf.size() without a bounds
 * check here; presumably the caller validates the count — confirm.
 * NOTE(review): pval + llv and pval - llv are signed long long arithmetic
 * and can overflow (undefined behaviour) for extreme values.
 */
int
dbcontext::modify_record(dbcallback_i& cb, TABLE *const table,
  const prep_stmt& pst, const cmd_exec_args& args, char mod_op,
  size_t& modified_count)
{
  if (mod_op == 'U') {
    /* update */
    handler *const hnd = table->file;
    uchar *const buf = table->record[0];
    /* record[1] keeps the pre-image for ha_update_row(). */
    store_record(table, record[1]);
    const prep_stmt::fields_type& rf = pst.get_ret_fields();
    const size_t n = rf.size();
    for (size_t i = 0; i < n; ++i) {
      const string_ref& nv = args.uvals[i];
      uint32_t fn = rf[i];
      Field *const fld = table->field[fn];
      if (nv.begin() == 0) {
        fld->set_null();
      } else {
        fld->set_notnull();
        fld->store(nv.begin(), nv.size(), &my_charset_bin);
      }
    }
    table_vec[pst.get_table_id()].modified = true;
    const int r = hnd->ha_update_row(table->record[1], buf);
    if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
      return r;
    }
    ++modified_count; /* TODO: HA_ERR_RECORD_IS_THE_SAME? */
  } else if (mod_op == 'D') {
    /* delete */
    handler *const hnd = table->file;
    table_vec[pst.get_table_id()].modified = true;
    const int r = hnd->ha_delete_row(table->record[0]);
    if (r != 0) {
      return r;
    }
    ++modified_count;
  } else if (mod_op == '+' || mod_op == '-') {
    /* increment/decrement */
    handler *const hnd = table->file;
    uchar *const buf = table->record[0];
    store_record(table, record[1]);
    const prep_stmt::fields_type& rf = pst.get_ret_fields();
    const size_t n = rf.size();
    size_t i = 0;
    for (i = 0; i < n; ++i) {
      const string_ref& nv = args.uvals[i];
      uint32_t fn = rf[i];
      Field *const fld = table->field[fn];
      if (fld->is_null() || nv.begin() == 0) {
        continue;
      }
      const long long pval = fld->val_int();
      const long long llv = atoll_nocheck(nv.begin(), nv.end());
      /* TODO: llv == 0? */
      long long nval = 0;
      if (mod_op == '+') {
        /* increment */
        nval = pval + llv;
      } else {
        /* decrement */
        nval = pval - llv;
        if ((pval < 0 && nval > 0) || (pval > 0 && nval < 0)) {
          /* Earlier fields may already hold new values in record[0], but
             i != n below, so no update is issued. */
          break; /* don't modify */
        }
      }
      fld->store(nval, false); /* stored as a signed value */
    }
    if (i == n) {
      /* modify */
      table_vec[pst.get_table_id()].modified = true;
      const int r = hnd->ha_update_row(table->record[1], buf);
      if (r != 0 && r != HA_ERR_RECORD_IS_THE_SAME) {
        return r;
      }
      ++modified_count;
    }
  }
  return 0;
}
628
/*
 * Inserts one row into pst's table. fvals[i] is stored into the i-th
 * return field of pst; only the first min(rf.size(), fvalslen) values are
 * used, and a null value (begin() == 0) sets the field NULL. Every other
 * column comes from empty_record(), after which all null bits are cleared.
 * Replies through cb:
 *   2 "readonly"     read-only context
 *   1 "lock_tables"  tables could not be locked
 *   2 "tblnum"       invalid table id
 *   1 <error>        ha_write_row() failed
 *   0 <insert_id>    success on a table with an auto-increment field
 *                    (found_next_number_field != 0)
 *   0 ""             any other success
 */
void
dbcontext::cmd_insert_internal(dbcallback_i& cb, const prep_stmt& pst,
  const string_ref *fvals, size_t fvalslen)
{
  if (!for_write_flag) {
    return cb.dbcb_resp_short(2, "readonly");
  }
  lock_tables_if();
  if (lock == 0) {
    return cb.dbcb_resp_short(1, "lock_tables");
  }
  if (pst.get_table_id() >= table_vec.size()) {
    return cb.dbcb_resp_short(2, "tblnum");
  }
  TABLE *const table = table_vec[pst.get_table_id()].table;
  handler *const hnd = table->file;
  uchar *const buf = table->record[0];
  empty_record(table);
  memset(buf, 0, table->s->null_bytes); /* clear null flags */
  const prep_stmt::fields_type& rf = pst.get_ret_fields();
  const size_t n = std::min(rf.size(), fvalslen);
  for (size_t i = 0; i < n; ++i) {
    uint32_t fn = rf[i];
    Field *const fld = table->field[fn];
    if (fvals[i].begin() == 0) {
      fld->set_null();
    } else {
      fld->store(fvals[i].begin(), fvals[i].size(), &my_charset_bin);
    }
  }
  /* Enable auto-increment generation for this write only. */
  table->next_number_field = table->found_next_number_field;
  /* FIXME: test */
  const int r = hnd->ha_write_row(buf);
  const ulonglong insert_id = table->file->insert_id_for_cur_row;
  table->next_number_field = 0;
  /* Marked even if the write failed. */
  table_vec[pst.get_table_id()].modified = true;
  if (r == 0 && table->found_next_number_field != 0) {
    return cb.dbcb_resp_short_num64(0, insert_id);
  }
  if (r != 0) {
    return cb.dbcb_resp_short_num(1, r);
  }
  return cb.dbcb_resp_short(0, "");
}
673
674 void
cmd_sql_internal(dbcallback_i & cb,const prep_stmt & pst,const string_ref * fvals,size_t fvalslen)675 dbcontext::cmd_sql_internal(dbcallback_i& cb, const prep_stmt& pst,
676 const string_ref *fvals, size_t fvalslen)
677 {
678 if (fvalslen < 1) {
679 return cb.dbcb_resp_short(2, "syntax");
680 }
681 return cb.dbcb_resp_short(2, "notimpl");
682 }
683
684 static size_t
prepare_keybuf(const cmd_exec_args & args,uchar * key_buf,TABLE * table,KEY & kinfo,size_t invalues_index)685 prepare_keybuf(const cmd_exec_args& args, uchar *key_buf, TABLE *table,
686 KEY& kinfo, size_t invalues_index)
687 {
688 size_t kplen_sum = 0;
689 DBG_KEY(fprintf(stderr, "SLOW\n"));
690 for (size_t i = 0; i < args.kvalslen; ++i) {
691 const KEY_PART_INFO & kpt = kinfo.key_part[i];
692 string_ref kval = args.kvals[i];
693 if (args.invalues_keypart >= 0 &&
694 static_cast<size_t>(args.invalues_keypart) == i) {
695 kval = args.invalues[invalues_index];
696 }
697 if (kval.begin() == 0) {
698 kpt.field->set_null();
699 } else {
700 kpt.field->set_notnull();
701 }
702 kpt.field->store(kval.begin(), kval.size(), &my_charset_bin);
703 kplen_sum += kpt.store_length;
704 DBG_KEYLEN(fprintf(stderr, "l=%u sl=%zu\n", kpt.length,
705 kpt.store_length));
706 }
707 key_copy(key_buf, table->record[0], &kinfo, kplen_sum);
708 DBG_KEYLEN(fprintf(stderr, "sum=%zu flen=%u\n", kplen_sum,
709 kinfo.key_length));
710 return kplen_sum;
711 }
712
713 void
cmd_find_internal(dbcallback_i & cb,const prep_stmt & pst,ha_rkey_function find_flag,const cmd_exec_args & args)714 dbcontext::cmd_find_internal(dbcallback_i& cb, const prep_stmt& pst,
715 ha_rkey_function find_flag, const cmd_exec_args& args)
716 {
717 const bool debug_out = (verbose_level >= 100);
718 bool need_resp_record = true;
719 char mod_op = 0;
720 const string_ref& mod_op_str = args.mod_op;
721 if (mod_op_str.size() != 0) {
722 if (!for_write_flag) {
723 return cb.dbcb_resp_short(2, "readonly");
724 }
725 mod_op = mod_op_str.begin()[0];
726 need_resp_record = mod_op_str.size() > 1 && mod_op_str.begin()[1] == '?';
727 switch (mod_op) {
728 case 'U': /* update */
729 case 'D': /* delete */
730 case '+': /* increment */
731 case '-': /* decrement */
732 break;
733 default:
734 if (debug_out) {
735 fprintf(stderr, "unknown modop: %c\n", mod_op);
736 }
737 return cb.dbcb_resp_short(2, "modop");
738 }
739 }
740 lock_tables_if();
741 if (lock == 0) {
742 return cb.dbcb_resp_short(1, "lock_tables");
743 }
744 if (pst.get_table_id() >= table_vec.size()) {
745 return cb.dbcb_resp_short(2, "tblnum");
746 }
747 TABLE *const table = table_vec[pst.get_table_id()].table;
748 /* keys */
749 if (pst.get_idxnum() >= table->s->keys) {
750 return cb.dbcb_resp_short(2, "idxnum");
751 }
752 KEY& kinfo = table->key_info[pst.get_idxnum()];
753 #if MYSQL_VERSION_ID >= 50600
754 if (args.kvalslen > kinfo.actual_key_parts) {
755 #else
756 if (args.kvalslen > kinfo.key_parts) {
757 #endif
758 return cb.dbcb_resp_short(2, "kpnum");
759 }
760 uchar *const key_buf = DENA_ALLOCA_ALLOCATE(uchar, kinfo.key_length);
761 size_t invalues_idx = 0;
762 size_t kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
763 /* filters */
764 uchar *filter_buf = 0;
765 if (args.filters != 0) {
766 const size_t filter_buf_len = calc_filter_buf_size(table, pst,
767 args.filters);
768 filter_buf = DENA_ALLOCA_ALLOCATE(uchar, filter_buf_len);
769 if (!fill_filter_buf(table, pst, args.filters, filter_buf,
770 filter_buf_len)) {
771 return cb.dbcb_resp_short(2, "filterblob");
772 }
773 }
774 /* handler */
775 table->read_set = &table->s->all_set;
776 handler *const hnd = table->file;
777 if (!for_write_flag) {
778 hnd->init_table_handle_for_HANDLER();
779 }
780 hnd->ha_index_or_rnd_end();
781 hnd->ha_index_init(pst.get_idxnum(), 1);
782 if (need_resp_record) {
783 cb.dbcb_resp_begin(pst.get_ret_fields().size());
784 }
785 const uint32_t limit = args.limit ? args.limit : 1;
786 uint32_t skip = args.skip;
787 size_t modified_count = 0;
788 int r = 0;
789 bool is_first = true;
790 for (uint32_t cnt = 0; cnt < limit + skip;) {
791 if (is_first) {
792 is_first = false;
793 const key_part_map kpm = (1U << args.kvalslen) - 1;
794 #if MYSQL_VERSION_ID >= 50611
795 r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
796 #else
797 r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
798 #endif
799 } else if (args.invalues_keypart >= 0) {
800 if (++invalues_idx >= args.invalueslen) {
801 break;
802 }
803 kplen_sum = prepare_keybuf(args, key_buf, table, kinfo, invalues_idx);
804 const key_part_map kpm = (1U << args.kvalslen) - 1;
805 #if MYSQL_VERSION_ID >= 50611
806 r = hnd->ha_index_read_map(table->record[0], key_buf, kpm, find_flag);
807 #else
808 r = hnd->index_read_map(table->record[0], key_buf, kpm, find_flag);
809 #endif
810 } else {
811 switch (find_flag) {
812 case HA_READ_BEFORE_KEY:
813 case HA_READ_KEY_OR_PREV:
814 #if MYSQL_VERSION_ID >= 50600
815 r = hnd->ha_index_prev(table->record[0]);
816 #else
817 r = hnd->index_prev(table->record[0]);
818 #endif
819 break;
820 case HA_READ_AFTER_KEY:
821 case HA_READ_KEY_OR_NEXT:
822 #if MYSQL_VERSION_ID >= 50600
823 r = hnd->ha_index_next(table->record[0]);
824 #else
825 r = hnd->index_next(table->record[0]);
826 #endif
827 break;
828 case HA_READ_KEY_EXACT:
829 #if MYSQL_VERSION_ID >= 50611
830 r = hnd->ha_index_next_same(table->record[0], key_buf, kplen_sum);
831 #else
832 r = hnd->index_next_same(table->record[0], key_buf, kplen_sum);
833 #endif
834 break;
835 default:
836 r = HA_ERR_END_OF_FILE; /* to finish the loop */
837 break;
838 }
839 }
840 if (debug_out) {
841 fprintf(stderr, "r=%d\n", r);
842 if (r == 0 || r == HA_ERR_RECORD_DELETED) {
843 dump_record(cb, table, pst);
844 }
845 }
846 int filter_res = 0;
847 if (r != 0) {
848 /* no-count */
849 } else if (args.filters != 0 && (filter_res = check_filter(cb, table,
850 pst, args.filters, filter_buf)) != 0) {
851 if (filter_res < 0) {
852 break;
853 }
854 } else if (skip > 0) {
855 --skip;
856 } else {
857 /* hit */
858 if (need_resp_record) {
859 resp_record(cb, table, pst);
860 }
861 if (mod_op != 0) {
862 r = modify_record(cb, table, pst, args, mod_op, modified_count);
863 }
864 ++cnt;
865 }
866 if (args.invalues_keypart >= 0 && r == HA_ERR_KEY_NOT_FOUND) {
867 continue;
868 }
869 if (r != 0 && r != HA_ERR_RECORD_DELETED) {
870 break;
871 }
872 }
873 hnd->ha_index_or_rnd_end();
874 if (r != 0 && r != HA_ERR_RECORD_DELETED && r != HA_ERR_KEY_NOT_FOUND &&
875 r != HA_ERR_END_OF_FILE) {
876 /* failed */
877 if (need_resp_record) {
878 /* revert dbcb_resp_begin() and dbcb_resp_entry() */
879 cb.dbcb_resp_cancel();
880 }
881 cb.dbcb_resp_short_num(1, r);
882 } else {
883 /* succeeded */
884 if (need_resp_record) {
885 cb.dbcb_resp_end();
886 } else {
887 cb.dbcb_resp_short_num(0, modified_count);
888 }
889 }
890 DENA_ALLOCA_FREE(filter_buf);
891 DENA_ALLOCA_FREE(key_buf);
892 }
893
894 size_t
895 dbcontext::calc_filter_buf_size(TABLE *table, const prep_stmt& pst,
896 const record_filter *filters)
897 {
898 size_t filter_buf_len = 0;
899 for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
900 if (f->val.begin() == 0) {
901 continue;
902 }
903 const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
904 filter_buf_len += table->field[fn]->pack_length();
905 }
906 ++filter_buf_len;
907 /* Field_medium::cmp() calls uint3korr(), which may read 4 bytes.
908 Allocate 1 more byte for safety. */
909 return filter_buf_len;
910 }
911
912 bool
913 dbcontext::fill_filter_buf(TABLE *table, const prep_stmt& pst,
914 const record_filter *filters, uchar *filter_buf, size_t len)
915 {
916 memset(filter_buf, 0, len);
917 size_t pos = 0;
918 for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
919 if (f->val.begin() == 0) {
920 continue;
921 }
922 const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
923 Field *const fld = table->field[fn];
924 if ((fld->flags & BLOB_FLAG) != 0) {
925 return false;
926 }
927 fld->store(f->val.begin(), f->val.size(), &my_charset_bin);
928 const size_t packlen = fld->pack_length();
929 memcpy(filter_buf + pos, fld->ptr, packlen);
930 pos += packlen;
931 }
932 return true;
933 }
934
935 int
936 dbcontext::check_filter(dbcallback_i& cb, TABLE *table, const prep_stmt& pst,
937 const record_filter *filters, const uchar *filter_buf)
938 {
939 DBG_FILTER(fprintf(stderr, "check_filter\n"));
940 size_t pos = 0;
941 for (const record_filter *f = filters; f->op.begin() != 0; ++f) {
942 const string_ref& op = f->op;
943 const string_ref& val = f->val;
944 const uint32_t fn = pst.get_filter_fields()[f->ff_offset];
945 Field *const fld = table->field[fn];
946 const size_t packlen = fld->pack_length();
947 const uchar *const bval = filter_buf + pos;
948 int cv = 0;
949 if (fld->is_null()) {
950 cv = (val.begin() == 0) ? 0 : -1;
951 } else {
952 cv = (val.begin() == 0) ? 1 : fld->cmp(bval);
953 }
954 DBG_FILTER(fprintf(stderr, "check_filter cv=%d\n", cv));
955 bool cond = true;
956 if (op.size() == 1) {
957 switch (op.begin()[0]) {
958 case '>':
959 DBG_FILTER(fprintf(stderr, "check_filter op: >\n"));
960 cond = (cv > 0);
961 break;
962 case '<':
963 DBG_FILTER(fprintf(stderr, "check_filter op: <\n"));
964 cond = (cv < 0);
965 break;
966 case '=':
967 DBG_FILTER(fprintf(stderr, "check_filter op: =\n"));
968 cond = (cv == 0);
969 break;
970 default:
971 DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
972 cond = false; /* FIXME: error */
973 break;
974 }
975 } else if (op.size() == 2 && op.begin()[1] == '=') {
976 switch (op.begin()[0]) {
977 case '>':
978 DBG_FILTER(fprintf(stderr, "check_filter op: >=\n"));
979 cond = (cv >= 0);
980 break;
981 case '<':
982 DBG_FILTER(fprintf(stderr, "check_filter op: <=\n"));
983 cond = (cv <= 0);
984 break;
985 case '!':
986 DBG_FILTER(fprintf(stderr, "check_filter op: !=\n"));
987 cond = (cv != 0);
988 break;
989 default:
990 DBG_FILTER(fprintf(stderr, "check_filter op: unknown\n"));
991 cond = false; /* FIXME: error */
992 break;
993 }
994 }
995 DBG_FILTER(fprintf(stderr, "check_filter cond: %d\n", (int)cond));
996 if (!cond) {
997 return (f->filter_type == record_filter_type_skip) ? 1 : -1;
998 }
999 if (val.begin() != 0) {
1000 pos += packlen;
1001 }
1002 }
1003 return 0;
1004 }
1005
1006 void
1007 dbcontext::cmd_open(dbcallback_i& cb, const cmd_open_args& arg)
1008 {
1009 unlock_tables_if();
1010 const table_name_type k = std::make_pair(std::string(arg.dbn),
1011 std::string(arg.tbl));
1012 const table_map_type::const_iterator iter = table_map.find(k);
1013 uint32_t tblnum = 0;
1014 if (iter != table_map.end()) {
1015 tblnum = iter->second;
1016 DBG_CMP(fprintf(stderr, "HNDSOCK k=%s tblnum=%d\n", k.c_str(),
1017 (int)tblnum));
1018 } else {
1019 TABLE_LIST tables;
1020 TABLE *table = 0;
1021 bool refresh = true;
1022 const thr_lock_type lock_type = for_write_flag ? TL_WRITE : TL_READ;
1023 #if MYSQL_VERSION_ID >= 50505
1024 tables.init_one_table(arg.dbn, strlen(arg.dbn), arg.tbl, strlen(arg.tbl),
1025 arg.tbl, lock_type);
1026 tables.mdl_request.init(MDL_key::TABLE, arg.dbn, arg.tbl,
1027 for_write_flag ? MDL_SHARED_WRITE : MDL_SHARED_READ, MDL_TRANSACTION);
1028 Open_table_context ot_act(thd, 0);
1029 #if MYSQL_VERSION_ID >= 50600
1030 if (!open_table(thd, &tables, &ot_act)) {
1031 #else
1032 if (!open_table(thd, &tables, thd->mem_root, &ot_act)) {
1033 #endif
1034 table = tables.table;
1035 }
1036 #else
1037 tables.init_one_table(arg.dbn, arg.tbl, lock_type);
1038 table = open_table(thd, &tables, thd->mem_root, &refresh,
1039 OPEN_VIEW_NO_PARSE);
1040 #endif
1041 if (table == 0) {
1042 DENA_VERBOSE(20, fprintf(stderr,
1043 "HNDSOCK failed to open %p [%s] [%s] [%d]\n",
1044 thd, arg.dbn, arg.tbl, static_cast<int>(refresh)));
1045 return cb.dbcb_resp_short(1, "open_table");
1046 }
1047 statistic_increment(open_tables_count, &LOCK_status);
1048 table->reginfo.lock_type = lock_type;
1049 table->use_all_columns();
1050 tblnum = table_vec.size();
1051 tablevec_entry e;
1052 e.table = table;
1053 table_vec.push_back(e);
1054 table_map[k] = tblnum;
1055 }
1056 size_t idxnum = static_cast<size_t>(-1);
1057 if (arg.idx[0] >= '0' && arg.idx[0] <= '9') {
1058 /* numeric */
1059 TABLE *const table = table_vec[tblnum].table;
1060 idxnum = atoi(arg.idx);
1061 if (idxnum >= table->s->keys) {
1062 return cb.dbcb_resp_short(2, "idxnum");
1063 }
1064 } else {
1065 const char *const idx_name_to_open =
1066 arg.idx[0] == '\0' ? "PRIMARY" : arg.idx;
1067 TABLE *const table = table_vec[tblnum].table;
1068 for (uint i = 0; i < table->s->keys; ++i) {
1069 KEY& kinfo = table->key_info[i];
1070 if (strcmp(kinfo.name, idx_name_to_open) == 0) {
1071 idxnum = i;
1072 break;
1073 }
1074 }
1075 }
1076 if (idxnum == size_t(-1)) {
1077 return cb.dbcb_resp_short(2, "idxnum");
1078 }
1079 prep_stmt::fields_type rf;
1080 prep_stmt::fields_type ff;
1081 if (!parse_fields(table_vec[tblnum].table, arg.retflds, rf)) {
1082 return cb.dbcb_resp_short(2, "fld");
1083 }
1084 if (!parse_fields(table_vec[tblnum].table, arg.filflds, ff)) {
1085 return cb.dbcb_resp_short(2, "fld");
1086 }
1087 prep_stmt p(this, tblnum, idxnum, rf, ff);
1088 cb.dbcb_set_prep_stmt(arg.pst_id, p);
1089 return cb.dbcb_resp_short(0, "");
1090 }
1091
1092 bool
1093 dbcontext::parse_fields(TABLE *const table, const char *str,
1094 prep_stmt::fields_type& flds)
1095 {
1096 string_ref flds_sr(str, strlen(str));
1097 std::vector<string_ref> fldnms;
1098 if (flds_sr.size() != 0) {
1099 split(',', flds_sr, fldnms);
1100 }
1101 for (size_t i = 0; i < fldnms.size(); ++i) {
1102 Field **fld = 0;
1103 size_t j = 0;
1104 for (fld = table->field; *fld; ++fld, ++j) {
1105 DBG_FLD(fprintf(stderr, "f %s\n", (*fld)->field_name));
1106 string_ref fn((*fld)->field_name, strlen((*fld)->field_name));
1107 if (fn == fldnms[i]) {
1108 break;
1109 }
1110 }
1111 if (*fld == 0) {
1112 DBG_FLD(fprintf(stderr, "UNKNOWN FLD %s [%s]\n", retflds,
1113 std::string(fldnms[i].begin(), fldnms[i].size()).c_str()));
1114 return false;
1115 }
1116 DBG_FLD(fprintf(stderr, "FLD %s %zu\n", (*fld)->field_name, j));
1117 flds.push_back(j);
1118 }
1119 return true;
1120 }
1121
/*
 * Write operation selected by the op string in cmd_exec.
 * db_write_op_none means the command is an index read (cmd_find_internal).
 */
enum db_write_op {
  db_write_op_none   = 0, /* index read */
  db_write_op_insert = 1, /* op "+": cmd_insert_internal */
  db_write_op_sql    = 2  /* op "S": cmd_sql_internal */
};
1127
1128 void
1129 dbcontext::cmd_exec(dbcallback_i& cb, const cmd_exec_args& args)
1130 {
1131 const prep_stmt& p = *args.pst;
1132 if (p.get_table_id() == static_cast<size_t>(-1)) {
1133 return cb.dbcb_resp_short(2, "stmtnum");
1134 }
1135 ha_rkey_function find_flag = HA_READ_KEY_EXACT;
1136 db_write_op wrop = db_write_op_none;
1137 if (args.op.size() == 1) {
1138 switch (args.op.begin()[0]) {
1139 case '=':
1140 find_flag = HA_READ_KEY_EXACT;
1141 break;
1142 case '>':
1143 find_flag = HA_READ_AFTER_KEY;
1144 break;
1145 case '<':
1146 find_flag = HA_READ_BEFORE_KEY;
1147 break;
1148 case '+':
1149 wrop = db_write_op_insert;
1150 break;
1151 case 'S':
1152 wrop = db_write_op_sql;
1153 break;
1154 default:
1155 return cb.dbcb_resp_short(2, "op");
1156 }
1157 } else if (args.op.size() == 2 && args.op.begin()[1] == '=') {
1158 switch (args.op.begin()[0]) {
1159 case '>':
1160 find_flag = HA_READ_KEY_OR_NEXT;
1161 break;
1162 case '<':
1163 find_flag = HA_READ_KEY_OR_PREV;
1164 break;
1165 default:
1166 return cb.dbcb_resp_short(2, "op");
1167 }
1168 } else {
1169 return cb.dbcb_resp_short(2, "op");
1170 }
1171 if (args.kvalslen <= 0) {
1172 return cb.dbcb_resp_short(2, "klen");
1173 }
1174 switch (wrop) {
1175 case db_write_op_none:
1176 return cmd_find_internal(cb, p, find_flag, args);
1177 case db_write_op_insert:
1178 return cmd_insert_internal(cb, p, args.kvals, args.kvalslen);
1179 case db_write_op_sql:
1180 return cmd_sql_internal(cb, p, args.kvals, args.kvalslen);
1181 }
1182 }
1183
1184 void
1185 dbcontext::set_statistics(size_t num_conns, size_t num_active)
1186 {
1187 thd_proc_info(thd, &info_message_buf[0]);
1188 if (for_write_flag) {
1189 set_thread_message("handlersocket: mode=wr, %zu conns, %zu active",
1190 num_conns, num_active);
1191 } else {
1192 set_thread_message("handlersocket: mode=rd, %zu conns, %zu active",
1193 num_conns, num_active);
1194 }
1195 }
1196
1197 };
1198
1199