1
2 // vim:sw=2:ai
3
4 /*
5 * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved.
6 * Copyright (C) 2011-2017 Kentoku SHIBA
7 * See COPYRIGHT.txt for details.
8 */
9
10 #include <my_global.h>
11 #include "mysql_version.h"
12 #include "hs_compat.h"
13 #if MYSQL_VERSION_ID < 50500
14 #include "mysql_priv.h"
15 #include <mysql/plugin.h>
16 #else
17 #include "sql_priv.h"
18 #include "probes_mysql.h"
19 #include "sql_class.h"
20 #endif
21
22 #include "hstcpcli.hpp"
23 #include "auto_file.hpp"
24 #include "string_util.hpp"
25 #include "auto_addrinfo.hpp"
26 #include "escape.hpp"
27 #include "util.hpp"
28
29 /* TODO */
30 #if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
31 #define MSG_NOSIGNAL 0
32 #endif
33
34 #define DBG(x)
35
36 namespace dena {
37
hstresult()38 hstresult::hstresult()
39 {
40 SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16,
41 MYF(MY_WME));
42 }
43
~hstresult()44 hstresult::~hstresult()
45 {
46 delete_dynamic(&flds);
47 }
48
49 struct hstcpcli : public hstcpcli_i, private noncopyable {
50 hstcpcli(const socket_args& args);
51 virtual ~hstcpcli();
52 virtual void close();
53 virtual int reconnect();
54 virtual bool stable_point();
55 virtual void request_buf_open_index(size_t pst_id, const char *dbn,
56 const char *tbl, const char *idx, const char *retflds, const char *filflds);
57 virtual void request_buf_auth(const char *secret, const char *typ);
58 virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
59 const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
60 const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
61 const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
62 const string_ref *invalues, size_t invalueslen);
63 virtual size_t request_buf_append(const char *start, const char *finish);
64 virtual void request_reset();
65 virtual int request_send();
66 virtual int response_recv(size_t& num_flds_r);
67 virtual int get_result(hstresult& result);
68 virtual const string_ref *get_next_row();
69 virtual const string_ref *get_next_row_from_result(hstresult& result);
70 virtual size_t get_row_size();
71 virtual size_t get_row_size_from_result(hstresult& result);
72 virtual void response_buf_remove();
73 virtual int get_error_code();
74 virtual String& get_error();
75 virtual void clear_error();
76 virtual int set_timeout(int send_timeout, int recv_timeout);
get_num_req_bufddena::hstcpcli77 virtual size_t get_num_req_bufd() { return num_req_bufd; }
get_num_req_sentdena::hstcpcli78 virtual size_t get_num_req_sent() { return num_req_sent; }
get_num_req_rcvddena::hstcpcli79 virtual size_t get_num_req_rcvd() { return num_req_rcvd; }
get_response_end_offsetdena::hstcpcli80 virtual size_t get_response_end_offset() { return response_end_offset; }
get_readbuf_begindena::hstcpcli81 virtual const char *get_readbuf_begin() { return readbuf.begin(); }
get_readbuf_enddena::hstcpcli82 virtual const char *get_readbuf_end() { return readbuf.end(); }
get_writebuf_begindena::hstcpcli83 virtual const char *get_writebuf_begin() { return writebuf.begin(); }
get_writebuf_sizedena::hstcpcli84 virtual size_t get_writebuf_size() { return writebuf.size(); }
85 virtual void write_error_to_log(const char *func_name, const char *file_name,
86 ulong line_no);
87 private:
88 int read_more();
89 int set_error(int code, const String& str);
90 int set_error(int code, const char *str);
91 private:
92 auto_file fd;
93 socket_args sargs;
94 string_buffer readbuf;
95 string_buffer writebuf;
96 size_t response_end_offset; /* incl newline */
97 size_t cur_row_offset;
98 size_t cur_row_size;
99 size_t num_flds;
100 size_t num_req_bufd; /* buffered but not yet sent */
101 size_t num_req_sent; /* sent but not yet received */
102 size_t num_req_rcvd; /* received but not yet removed */
103 int error_code;
104 String error_str;
105 DYNAMIC_ARRAY flds;
106 int errno_buf;
107 };
108
hstcpcli(const socket_args & args)109 hstcpcli::hstcpcli(const socket_args& args)
110 : sargs(args), response_end_offset(0), cur_row_offset(0), cur_row_size(0),
111 num_flds(0), num_req_bufd(0), num_req_sent(0), num_req_rcvd(0),
112 error_code(0), errno_buf(0)
113 {
114 String err;
115 SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME));
116 if (socket_connect(fd, sargs, err) != 0) {
117 set_error(-1, err);
118 }
119 }
120
~hstcpcli()121 hstcpcli::~hstcpcli()
122 {
123 delete_dynamic(&flds);
124 }
125
126 void
close()127 hstcpcli::close()
128 {
129 fd.close();
130 readbuf.clear();
131 writebuf.clear();
132 response_end_offset = 0;
133 cur_row_offset = 0;
134 num_flds = 0;
135 num_req_bufd = 0;
136 num_req_sent = 0;
137 num_req_rcvd = 0;
138 }
139
140 int
reconnect()141 hstcpcli::reconnect()
142 {
143 clear_error();
144 close();
145 String err;
146 if (socket_connect(fd, sargs, err) != 0) {
147 set_error(-1, err);
148 }
149 return error_code;
150 }
151
152 int
set_timeout(int send_timeout,int recv_timeout)153 hstcpcli::set_timeout(int send_timeout, int recv_timeout)
154 {
155 String err;
156 sargs.send_timeout = send_timeout;
157 sargs.recv_timeout = recv_timeout;
158 if (socket_set_timeout(fd, sargs, err) != 0) {
159 set_error(-1, err);
160 }
161 return error_code;
162 }
163
164 bool
stable_point()165 hstcpcli::stable_point()
166 {
167 /* returns true if cli can send a new request */
168 return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
169 num_req_rcvd == 0 && response_end_offset == 0;
170 }
171
172 int
get_error_code()173 hstcpcli::get_error_code()
174 {
175 return error_code;
176 }
177
178 String&
get_error()179 hstcpcli::get_error()
180 {
181 return error_str;
182 }
183
184 int
read_more()185 hstcpcli::read_more()
186 {
187 const size_t block_size = 4096; // FIXME
188 char *const wp = readbuf.make_space(block_size);
189 int rlen;
190 errno = 0;
191 while ((rlen = read(fd.get(), wp, block_size)) <= 0) {
192 errno_buf = errno;
193 if (rlen < 0) {
194 if (errno == EINTR || errno == EAGAIN)
195 {
196 errno = 0;
197 continue;
198 }
199 error_str = String("read: failed", &my_charset_bin);
200 } else {
201 error_str = String("read: eof", &my_charset_bin);
202 }
203 return rlen;
204 }
205 readbuf.space_wrote(rlen);
206 return rlen;
207 }
208
209 void
clear_error()210 hstcpcli::clear_error()
211 {
212 DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
213 error_code = 0;
214 error_str.length(0);
215 }
216
217 int
set_error(int code,const String & str)218 hstcpcli::set_error(int code, const String& str)
219 {
220 DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
221 error_code = code;
222 error_str = str;
223 return error_code;
224 }
225
226 int
set_error(int code,const char * str)227 hstcpcli::set_error(int code, const char *str)
228 {
229 uint32 str_len = strlen(str);
230 DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
231 error_code = code;
232 error_str.length(0);
233 if (error_str.reserve(str_len + 1))
234 return 0;
235 error_str.q_append(str, str_len);
236 error_str.c_ptr_safe();
237 return error_code;
238 }
239
240 void
request_buf_open_index(size_t pst_id,const char * dbn,const char * tbl,const char * idx,const char * retflds,const char * filflds)241 hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
242 const char *tbl, const char *idx, const char *retflds, const char *filflds)
243 {
244 /*
245 if (num_req_sent > 0 || num_req_rcvd > 0) {
246 */
247 if (num_req_rcvd > 0) {
248 close();
249 set_error(-1, "request_buf_open_index: protocol out of sync");
250 return;
251 }
252 const string_ref dbn_ref(dbn, strlen(dbn));
253 const string_ref tbl_ref(tbl, strlen(tbl));
254 const string_ref idx_ref(idx, strlen(idx));
255 const string_ref rfs_ref(retflds, strlen(retflds));
256 writebuf.append_literal("P\t");
257 append_uint32(writebuf, pst_id); // FIXME size_t ?
258 writebuf.append_literal("\t");
259 writebuf.append(dbn_ref.begin(), dbn_ref.end());
260 writebuf.append_literal("\t");
261 writebuf.append(tbl_ref.begin(), tbl_ref.end());
262 writebuf.append_literal("\t");
263 writebuf.append(idx_ref.begin(), idx_ref.end());
264 writebuf.append_literal("\t");
265 writebuf.append(rfs_ref.begin(), rfs_ref.end());
266 if (filflds != 0) {
267 const string_ref fls_ref(filflds, strlen(filflds));
268 writebuf.append_literal("\t");
269 writebuf.append(fls_ref.begin(), fls_ref.end());
270 }
271 writebuf.append_literal("\n");
272 ++num_req_bufd;
273 }
274
275 void
request_buf_auth(const char * secret,const char * typ)276 hstcpcli::request_buf_auth(const char *secret, const char *typ)
277 {
278 /*
279 if (num_req_sent > 0 || num_req_rcvd > 0) {
280 */
281 if (num_req_rcvd > 0) {
282 close();
283 set_error(-1, "request_buf_auth: protocol out of sync");
284 return;
285 }
286 if (typ == 0) {
287 typ = "1";
288 }
289 const string_ref typ_ref(typ, strlen(typ));
290 const string_ref secret_ref(secret, strlen(secret));
291 writebuf.append_literal("A\t");
292 writebuf.append(typ_ref.begin(), typ_ref.end());
293 writebuf.append_literal("\t");
294 writebuf.append(secret_ref.begin(), secret_ref.end());
295 writebuf.append_literal("\n");
296 ++num_req_bufd;
297 }
298
299 namespace {
300
301 void
append_delim_value(string_buffer & buf,const char * start,const char * finish)302 append_delim_value(string_buffer& buf, const char *start, const char *finish)
303 {
304 if (start == 0) {
305 /* null */
306 const char t[] = "\t\0";
307 buf.append(t, t + 2);
308 } else {
309 /* non-null */
310 buf.append_literal("\t");
311 escape_string(buf, start, finish);
312 }
313 }
314
315 };
316
317 void
request_buf_exec_generic(size_t pst_id,const string_ref & op,const string_ref * kvs,size_t kvslen,uint32 limit,uint32 skip,const string_ref & mod_op,const string_ref * mvs,size_t mvslen,const hstcpcli_filter * fils,size_t filslen,int invalues_keypart,const string_ref * invalues,size_t invalueslen)318 hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
319 const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
320 const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
321 const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
322 const string_ref *invalues, size_t invalueslen)
323 {
324 /*
325 if (num_req_sent > 0 || num_req_rcvd > 0) {
326 */
327 if (num_req_rcvd > 0) {
328 close();
329 set_error(-1, "request_buf_exec_generic: protocol out of sync");
330 return;
331 }
332 append_uint32(writebuf, pst_id); // FIXME size_t ?
333 writebuf.append_literal("\t");
334 writebuf.append(op.begin(), op.end());
335 writebuf.append_literal("\t");
336 append_uint32(writebuf, kvslen); // FIXME size_t ?
337 for (size_t i = 0; i < kvslen; ++i) {
338 const string_ref& kv = kvs[i];
339 append_delim_value(writebuf, kv.begin(), kv.end());
340 }
341 if (limit != 0 || skip != 0 || invalues_keypart >= 0 ||
342 mod_op.size() != 0 || filslen != 0) {
343 /* has more option */
344 writebuf.append_literal("\t");
345 append_uint32(writebuf, limit); // FIXME size_t ?
346 if (skip != 0 || invalues_keypart >= 0 ||
347 mod_op.size() != 0 || filslen != 0) {
348 writebuf.append_literal("\t");
349 append_uint32(writebuf, skip); // FIXME size_t ?
350 }
351 if (invalues_keypart >= 0) {
352 writebuf.append_literal("\t@\t");
353 append_uint32(writebuf, invalues_keypart);
354 writebuf.append_literal("\t");
355 append_uint32(writebuf, invalueslen);
356 for (size_t i = 0; i < invalueslen; ++i) {
357 const string_ref& s = invalues[i];
358 append_delim_value(writebuf, s.begin(), s.end());
359 }
360 }
361 for (size_t i = 0; i < filslen; ++i) {
362 const hstcpcli_filter& f = fils[i];
363 writebuf.append_literal("\t");
364 writebuf.append(f.filter_type.begin(), f.filter_type.end());
365 writebuf.append_literal("\t");
366 writebuf.append(f.op.begin(), f.op.end());
367 writebuf.append_literal("\t");
368 append_uint32(writebuf, f.ff_offset);
369 append_delim_value(writebuf, f.val.begin(), f.val.end());
370 }
371 if (mod_op.size() != 0) {
372 writebuf.append_literal("\t");
373 writebuf.append(mod_op.begin(), mod_op.end());
374 for (size_t i = 0; i < mvslen; ++i) {
375 const string_ref& mv = mvs[i];
376 append_delim_value(writebuf, mv.begin(), mv.end());
377 }
378 }
379 }
380 writebuf.append_literal("\n");
381 ++num_req_bufd;
382 }
383
384 size_t
request_buf_append(const char * start,const char * finish)385 hstcpcli::request_buf_append(const char *start, const char *finish)
386 {
387 /*
388 if (num_req_sent > 0 || num_req_rcvd > 0) {
389 */
390 if (num_req_rcvd > 0) {
391 close();
392 set_error(-1, "request_buf_append: protocol out of sync");
393 return 0;
394 }
395 const char *nl = start;
396 size_t num_req = 0;
397 while ((nl = memchr_char(nl, '\n', finish - nl))) {
398 if (nl == finish)
399 break;
400 num_req++;
401 nl++;
402 }
403 num_req++;
404 writebuf.append(start, finish);
405 if (*(finish - 1) != '\n')
406 writebuf.append_literal("\n");
407 num_req_bufd += num_req;
408 return num_req;
409 }
410
411 void
request_reset()412 hstcpcli::request_reset()
413 {
414 if (num_req_bufd) {
415 writebuf.erase_front(writebuf.size());
416 num_req_bufd = 0;
417 }
418 }
419
420 int
request_send()421 hstcpcli::request_send()
422 {
423 if (error_code < 0) {
424 return error_code;
425 }
426 clear_error();
427 if (fd.get() < 0) {
428 close();
429 return set_error(-1, "write: closed");
430 }
431 /*
432 if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
433 */
434 if (num_req_bufd == 0 || num_req_rcvd > 0) {
435 close();
436 return set_error(-1, "request_send: protocol out of sync");
437 }
438 const size_t wrlen = writebuf.size();
439 const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
440 if (r <= 0) {
441 close();
442 return set_error(-1, r < 0 ? "write: failed" : "write: eof");
443 }
444 writebuf.erase_front(r);
445 if (static_cast<size_t>(r) != wrlen) {
446 close();
447 return set_error(-1, "write: incomplete");
448 }
449 num_req_sent += num_req_bufd;
450 num_req_bufd = 0;
451 DBG(fprintf(stderr, "REQSEND 0\n"));
452 return 0;
453 }
454
455 int
response_recv(size_t & num_flds_r)456 hstcpcli::response_recv(size_t& num_flds_r)
457 {
458 if (error_code < 0) {
459 return error_code;
460 }
461 clear_error();
462 if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
463 response_end_offset != 0) {
464 close();
465 return set_error(-1, "response_recv: protocol out of sync");
466 }
467 cur_row_offset = 0;
468 num_flds_r = num_flds = 0;
469 if (fd.get() < 0) {
470 return set_error(-1, "read: closed");
471 }
472 size_t offset = 0;
473 while (true) {
474 const char *const lbegin = readbuf.begin() + offset;
475 const char *const lend = readbuf.end();
476 if (lbegin < lend)
477 {
478 const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
479 if (nl != 0) {
480 offset += (nl + 1) - lbegin;
481 break;
482 }
483 offset += lend - lbegin;
484 }
485 if (read_more() <= 0) {
486 close();
487 error_code = -1;
488 return error_code;
489 }
490 }
491 response_end_offset = offset;
492 --num_req_sent;
493 ++num_req_rcvd;
494 char *start = readbuf.begin();
495 char *const finish = start + response_end_offset - 1;
496 const size_t resp_code = read_ui32(start, finish);
497 skip_one(start, finish);
498 num_flds_r = num_flds = read_ui32(start, finish);
499 if (resp_code != 0) {
500 skip_one(start, finish);
501 char *const err_begin = start;
502 read_token(start, finish);
503 char *const err_end = start;
504 String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin);
505 if (!e.length()) {
506 e = String("unknown_error", &my_charset_bin);
507 }
508 return set_error(resp_code, e);
509 }
510 cur_row_size = 0;
511 cur_row_offset = start - readbuf.begin();
512 DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
513 String(readbuf.begin(), readbuf.begin() + response_end_offset)
514 .c_str(),
515 cur_row_offset, response_end_offset));
516 DBG(fprintf(stderr, "RES 0\n"));
517 if (flds.max_element < num_flds)
518 {
519 if (allocate_dynamic(&flds, num_flds))
520 return set_error(-1, "out of memory");
521 }
522 flds.elements = num_flds;
523 return 0;
524 }
525
526 int
get_result(hstresult & result)527 hstcpcli::get_result(hstresult& result)
528 {
529 /*
530 readbuf.swap(result.readbuf);
531 */
532 char *const wp = result.readbuf.make_space(response_end_offset);
533 memcpy(wp, readbuf.begin(), response_end_offset);
534 result.readbuf.space_wrote(response_end_offset);
535 result.response_end_offset = response_end_offset;
536 result.num_flds = num_flds;
537 result.cur_row_size = cur_row_size;
538 result.cur_row_offset = cur_row_offset;
539 if (result.flds.max_element < num_flds)
540 {
541 if (allocate_dynamic(&result.flds, num_flds))
542 return set_error(-1, "out of memory");
543 }
544 result.flds.elements = num_flds;
545 return 0;
546 }
547
548 const string_ref *
get_next_row()549 hstcpcli::get_next_row()
550 {
551 if (num_flds == 0 || flds.elements < num_flds) {
552 DBG(fprintf(stderr, "GNR NF 0\n"));
553 return 0;
554 }
555 char *start = readbuf.begin() + cur_row_offset;
556 char *const finish = readbuf.begin() + response_end_offset - 1;
557 if (start >= finish) { /* start[0] == nl */
558 DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
559 return 0;
560 }
561 for (size_t i = 0; i < num_flds; ++i) {
562 skip_one(start, finish);
563 char *const fld_begin = start;
564 read_token(start, finish);
565 char *const fld_end = start;
566 char *wp = fld_begin;
567 if (is_null_expression(fld_begin, fld_end)) {
568 /* null */
569 ((string_ref *) flds.buffer)[i] = string_ref();
570 } else {
571 unescape_string(wp, fld_begin, fld_end); /* in-place */
572 ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp);
573 }
574 }
575 cur_row_size = start - (readbuf.begin() + cur_row_offset);
576 cur_row_offset = start - readbuf.begin();
577 return (string_ref *) flds.buffer;
578 }
579
580 const string_ref *
get_next_row_from_result(hstresult & result)581 hstcpcli::get_next_row_from_result(hstresult& result)
582 {
583 if (result.num_flds == 0 || result.flds.elements < result.num_flds) {
584 DBG(fprintf(stderr, "GNR NF 0\n"));
585 return 0;
586 }
587 char *start = result.readbuf.begin() + result.cur_row_offset;
588 char *const finish = result.readbuf.begin() + result.response_end_offset - 1;
589 if (start >= finish) { /* start[0] == nl */
590 DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
591 return 0;
592 }
593 for (size_t i = 0; i < result.num_flds; ++i) {
594 skip_one(start, finish);
595 char *const fld_begin = start;
596 read_token(start, finish);
597 char *const fld_end = start;
598 char *wp = fld_begin;
599 if (is_null_expression(fld_begin, fld_end)) {
600 /* null */
601 ((string_ref *) result.flds.buffer)[i] = string_ref();
602 } else {
603 unescape_string(wp, fld_begin, fld_end); /* in-place */
604 ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp);
605 }
606 }
607 result.cur_row_size =
608 start - (result.readbuf.begin() + result.cur_row_offset);
609 result.cur_row_offset = start - result.readbuf.begin();
610 return (string_ref *) result.flds.buffer;
611 }
612
613 size_t
get_row_size()614 hstcpcli::get_row_size()
615 {
616 return cur_row_size;
617 }
618
619 size_t
get_row_size_from_result(hstresult & result)620 hstcpcli::get_row_size_from_result(hstresult& result)
621 {
622 return result.cur_row_size;
623 }
624
625 void
response_buf_remove()626 hstcpcli::response_buf_remove()
627 {
628 if (response_end_offset == 0) {
629 close();
630 set_error(-1, "response_buf_remove: protocol out of sync");
631 return;
632 }
633 readbuf.erase_front(response_end_offset);
634 response_end_offset = 0;
635 --num_req_rcvd;
636 cur_row_offset = 0;
637 num_flds = 0;
638 }
639
640 void
write_error_to_log(const char * func_name,const char * file_name,ulong line_no)641 hstcpcli::write_error_to_log(
642 const char *func_name,
643 const char *file_name,
644 ulong line_no
645 ) {
646 if (errno_buf) {
647 time_t cur_time = (time_t) time((time_t*) 0);
648 struct tm lt;
649 struct tm *l_time = localtime_r(&cur_time, <);
650 fprintf(stderr,
651 "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]"
652 " [%s][%s][%lu] errno=%d\n",
653 l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday,
654 l_time->tm_hour, l_time->tm_min, l_time->tm_sec,
655 error_code, error_str.c_ptr_safe(),
656 func_name, file_name, line_no, errno_buf);
657 }
658 }
659
660 hstcpcli_ptr
create(const socket_args & args)661 hstcpcli_i::create(const socket_args& args)
662 {
663 return hstcpcli_ptr(new hstcpcli(args));
664 }
665
666 };
667
668