1 
2 // vim:sw=2:ai
3 
4 /*
5  * Copyright (C) 2010-2011 DeNA Co.,Ltd.. All rights reserved.
6  * Copyright (C) 2011-2017 Kentoku SHIBA
7  * See COPYRIGHT.txt for details.
8  */
9 
10 #include <my_global.h>
11 #include "mysql_version.h"
12 #include "hs_compat.h"
13 #if MYSQL_VERSION_ID < 50500
14 #include "mysql_priv.h"
15 #include <mysql/plugin.h>
16 #else
17 #include "sql_priv.h"
18 #include "probes_mysql.h"
19 #include "sql_class.h"
20 #endif
21 
22 #include "hstcpcli.hpp"
23 #include "auto_file.hpp"
24 #include "string_util.hpp"
25 #include "auto_addrinfo.hpp"
26 #include "escape.hpp"
27 #include "util.hpp"
28 
29 /* TODO */
30 #if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
31 #define MSG_NOSIGNAL 0
32 #endif
33 
34 #define DBG(x)
35 
36 namespace dena {
37 
hstresult()38 hstresult::hstresult()
39 {
40   SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16,
41     MYF(MY_WME));
42 }
43 
~hstresult()44 hstresult::~hstresult()
45 {
46   delete_dynamic(&flds);
47 }
48 
49 struct hstcpcli : public hstcpcli_i, private noncopyable {
50   hstcpcli(const socket_args& args);
51   virtual ~hstcpcli();
52   virtual void close();
53   virtual int reconnect();
54   virtual bool stable_point();
55   virtual void request_buf_open_index(size_t pst_id, const char *dbn,
56     const char *tbl, const char *idx, const char *retflds, const char *filflds);
57   virtual void request_buf_auth(const char *secret, const char *typ);
58   virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
59     const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
60     const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
61     const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
62     const string_ref *invalues, size_t invalueslen);
63   virtual size_t request_buf_append(const char *start, const char *finish);
64   virtual void request_reset();
65   virtual int request_send();
66   virtual int response_recv(size_t& num_flds_r);
67   virtual int get_result(hstresult& result);
68   virtual const string_ref *get_next_row();
69   virtual const string_ref *get_next_row_from_result(hstresult& result);
70   virtual size_t get_row_size();
71   virtual size_t get_row_size_from_result(hstresult& result);
72   virtual void response_buf_remove();
73   virtual int get_error_code();
74   virtual String& get_error();
75   virtual void clear_error();
76   virtual int set_timeout(int send_timeout, int recv_timeout);
get_num_req_bufddena::hstcpcli77   virtual size_t get_num_req_bufd() { return num_req_bufd; }
get_num_req_sentdena::hstcpcli78   virtual size_t get_num_req_sent() { return num_req_sent; }
get_num_req_rcvddena::hstcpcli79   virtual size_t get_num_req_rcvd() { return num_req_rcvd; }
get_response_end_offsetdena::hstcpcli80   virtual size_t get_response_end_offset() { return response_end_offset; }
get_readbuf_begindena::hstcpcli81   virtual const char *get_readbuf_begin() { return readbuf.begin(); }
get_readbuf_enddena::hstcpcli82   virtual const char *get_readbuf_end() { return readbuf.end(); }
get_writebuf_begindena::hstcpcli83   virtual const char *get_writebuf_begin() { return writebuf.begin(); }
get_writebuf_sizedena::hstcpcli84   virtual size_t get_writebuf_size() { return writebuf.size(); }
85   virtual void write_error_to_log(const char *func_name, const char *file_name,
86     ulong line_no);
87  private:
88   int read_more();
89   int set_error(int code, const String& str);
90   int set_error(int code, const char *str);
91  private:
92   auto_file fd;
93   socket_args sargs;
94   string_buffer readbuf;
95   string_buffer writebuf;
96   size_t response_end_offset; /* incl newline */
97   size_t cur_row_offset;
98   size_t cur_row_size;
99   size_t num_flds;
100   size_t num_req_bufd; /* buffered but not yet sent */
101   size_t num_req_sent; /* sent but not yet received */
102   size_t num_req_rcvd; /* received but not yet removed */
103   int error_code;
104   String error_str;
105   DYNAMIC_ARRAY flds;
106   int errno_buf;
107 };
108 
hstcpcli(const socket_args & args)109 hstcpcli::hstcpcli(const socket_args& args)
110   : sargs(args), response_end_offset(0), cur_row_offset(0), cur_row_size(0),
111     num_flds(0), num_req_bufd(0), num_req_sent(0), num_req_rcvd(0),
112     error_code(0), errno_buf(0)
113 {
114   String err;
115   SPD_INIT_DYNAMIC_ARRAY2(&flds, sizeof(string_ref), NULL, 16, 16, MYF(MY_WME));
116   if (socket_connect(fd, sargs, err) != 0) {
117     set_error(-1, err);
118   }
119 }
120 
~hstcpcli()121 hstcpcli::~hstcpcli()
122 {
123   delete_dynamic(&flds);
124 }
125 
126 void
close()127 hstcpcli::close()
128 {
129   fd.close();
130   readbuf.clear();
131   writebuf.clear();
132   response_end_offset = 0;
133   cur_row_offset = 0;
134   num_flds = 0;
135   num_req_bufd = 0;
136   num_req_sent = 0;
137   num_req_rcvd = 0;
138 }
139 
140 int
reconnect()141 hstcpcli::reconnect()
142 {
143   clear_error();
144   close();
145   String err;
146   if (socket_connect(fd, sargs, err) != 0) {
147     set_error(-1, err);
148   }
149   return error_code;
150 }
151 
152 int
set_timeout(int send_timeout,int recv_timeout)153 hstcpcli::set_timeout(int send_timeout, int recv_timeout)
154 {
155   String err;
156   sargs.send_timeout = send_timeout;
157   sargs.recv_timeout = recv_timeout;
158   if (socket_set_timeout(fd, sargs, err) != 0) {
159     set_error(-1, err);
160   }
161   return error_code;
162 }
163 
164 bool
stable_point()165 hstcpcli::stable_point()
166 {
167   /* returns true if cli can send a new request */
168   return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
169     num_req_rcvd == 0 && response_end_offset == 0;
170 }
171 
172 int
get_error_code()173 hstcpcli::get_error_code()
174 {
175   return error_code;
176 }
177 
178 String&
get_error()179 hstcpcli::get_error()
180 {
181   return error_str;
182 }
183 
184 int
read_more()185 hstcpcli::read_more()
186 {
187   const size_t block_size = 4096; // FIXME
188   char *const wp = readbuf.make_space(block_size);
189   int rlen;
190   errno = 0;
191   while ((rlen = read(fd.get(), wp, block_size)) <= 0) {
192     errno_buf = errno;
193     if (rlen < 0) {
194       if (errno == EINTR || errno == EAGAIN)
195       {
196         errno = 0;
197         continue;
198       }
199       error_str = String("read: failed", &my_charset_bin);
200     } else {
201       error_str = String("read: eof", &my_charset_bin);
202     }
203     return rlen;
204   }
205   readbuf.space_wrote(rlen);
206   return rlen;
207 }
208 
209 void
clear_error()210 hstcpcli::clear_error()
211 {
212   DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
213   error_code = 0;
214   error_str.length(0);
215 }
216 
217 int
set_error(int code,const String & str)218 hstcpcli::set_error(int code, const String& str)
219 {
220   DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
221   error_code = code;
222   error_str = str;
223   return error_code;
224 }
225 
226 int
set_error(int code,const char * str)227 hstcpcli::set_error(int code, const char *str)
228 {
229   uint32 str_len = strlen(str);
230   DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
231   error_code = code;
232   error_str.length(0);
233   if (error_str.reserve(str_len + 1))
234     return 0;
235   error_str.q_append(str, str_len);
236   error_str.c_ptr_safe();
237   return error_code;
238 }
239 
240 void
request_buf_open_index(size_t pst_id,const char * dbn,const char * tbl,const char * idx,const char * retflds,const char * filflds)241 hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
242   const char *tbl, const char *idx, const char *retflds, const char *filflds)
243 {
244 /*
245   if (num_req_sent > 0 || num_req_rcvd > 0) {
246 */
247   if (num_req_rcvd > 0) {
248     close();
249     set_error(-1, "request_buf_open_index: protocol out of sync");
250     return;
251   }
252   const string_ref dbn_ref(dbn, strlen(dbn));
253   const string_ref tbl_ref(tbl, strlen(tbl));
254   const string_ref idx_ref(idx, strlen(idx));
255   const string_ref rfs_ref(retflds, strlen(retflds));
256   writebuf.append_literal("P\t");
257   append_uint32(writebuf, pst_id); // FIXME size_t ?
258   writebuf.append_literal("\t");
259   writebuf.append(dbn_ref.begin(), dbn_ref.end());
260   writebuf.append_literal("\t");
261   writebuf.append(tbl_ref.begin(), tbl_ref.end());
262   writebuf.append_literal("\t");
263   writebuf.append(idx_ref.begin(), idx_ref.end());
264   writebuf.append_literal("\t");
265   writebuf.append(rfs_ref.begin(), rfs_ref.end());
266   if (filflds != 0) {
267     const string_ref fls_ref(filflds, strlen(filflds));
268     writebuf.append_literal("\t");
269     writebuf.append(fls_ref.begin(), fls_ref.end());
270   }
271   writebuf.append_literal("\n");
272   ++num_req_bufd;
273 }
274 
275 void
request_buf_auth(const char * secret,const char * typ)276 hstcpcli::request_buf_auth(const char *secret, const char *typ)
277 {
278 /*
279   if (num_req_sent > 0 || num_req_rcvd > 0) {
280 */
281   if (num_req_rcvd > 0) {
282     close();
283     set_error(-1, "request_buf_auth: protocol out of sync");
284     return;
285   }
286   if (typ == 0) {
287     typ = "1";
288   }
289   const string_ref typ_ref(typ, strlen(typ));
290   const string_ref secret_ref(secret, strlen(secret));
291   writebuf.append_literal("A\t");
292   writebuf.append(typ_ref.begin(), typ_ref.end());
293   writebuf.append_literal("\t");
294   writebuf.append(secret_ref.begin(), secret_ref.end());
295   writebuf.append_literal("\n");
296   ++num_req_bufd;
297 }
298 
299 namespace {
300 
301 void
append_delim_value(string_buffer & buf,const char * start,const char * finish)302 append_delim_value(string_buffer& buf, const char *start, const char *finish)
303 {
304   if (start == 0) {
305     /* null */
306     const char t[] = "\t\0";
307     buf.append(t, t + 2);
308   } else {
309     /* non-null */
310     buf.append_literal("\t");
311     escape_string(buf, start, finish);
312   }
313 }
314 
315 };
316 
317 void
request_buf_exec_generic(size_t pst_id,const string_ref & op,const string_ref * kvs,size_t kvslen,uint32 limit,uint32 skip,const string_ref & mod_op,const string_ref * mvs,size_t mvslen,const hstcpcli_filter * fils,size_t filslen,int invalues_keypart,const string_ref * invalues,size_t invalueslen)318 hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
319   const string_ref *kvs, size_t kvslen, uint32 limit, uint32 skip,
320   const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
321   const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
322   const string_ref *invalues, size_t invalueslen)
323 {
324 /*
325   if (num_req_sent > 0 || num_req_rcvd > 0) {
326 */
327   if (num_req_rcvd > 0) {
328     close();
329     set_error(-1, "request_buf_exec_generic: protocol out of sync");
330     return;
331   }
332   append_uint32(writebuf, pst_id); // FIXME size_t ?
333   writebuf.append_literal("\t");
334   writebuf.append(op.begin(), op.end());
335   writebuf.append_literal("\t");
336   append_uint32(writebuf, kvslen); // FIXME size_t ?
337   for (size_t i = 0; i < kvslen; ++i) {
338     const string_ref& kv = kvs[i];
339     append_delim_value(writebuf, kv.begin(), kv.end());
340   }
341   if (limit != 0 || skip != 0 || invalues_keypart >= 0 ||
342     mod_op.size() != 0 || filslen != 0) {
343     /* has more option */
344     writebuf.append_literal("\t");
345     append_uint32(writebuf, limit); // FIXME size_t ?
346     if (skip != 0 || invalues_keypart >= 0 ||
347       mod_op.size() != 0 || filslen != 0) {
348       writebuf.append_literal("\t");
349       append_uint32(writebuf, skip); // FIXME size_t ?
350     }
351     if (invalues_keypart >= 0) {
352       writebuf.append_literal("\t@\t");
353       append_uint32(writebuf, invalues_keypart);
354       writebuf.append_literal("\t");
355       append_uint32(writebuf, invalueslen);
356       for (size_t i = 0; i < invalueslen; ++i) {
357         const string_ref& s = invalues[i];
358         append_delim_value(writebuf, s.begin(), s.end());
359       }
360     }
361     for (size_t i = 0; i < filslen; ++i) {
362       const hstcpcli_filter& f = fils[i];
363       writebuf.append_literal("\t");
364       writebuf.append(f.filter_type.begin(), f.filter_type.end());
365       writebuf.append_literal("\t");
366       writebuf.append(f.op.begin(), f.op.end());
367       writebuf.append_literal("\t");
368       append_uint32(writebuf, f.ff_offset);
369       append_delim_value(writebuf, f.val.begin(), f.val.end());
370     }
371     if (mod_op.size() != 0) {
372       writebuf.append_literal("\t");
373       writebuf.append(mod_op.begin(), mod_op.end());
374       for (size_t i = 0; i < mvslen; ++i) {
375         const string_ref& mv = mvs[i];
376         append_delim_value(writebuf, mv.begin(), mv.end());
377       }
378     }
379   }
380   writebuf.append_literal("\n");
381   ++num_req_bufd;
382 }
383 
384 size_t
request_buf_append(const char * start,const char * finish)385 hstcpcli::request_buf_append(const char *start, const char *finish)
386 {
387 /*
388   if (num_req_sent > 0 || num_req_rcvd > 0) {
389 */
390   if (num_req_rcvd > 0) {
391     close();
392     set_error(-1, "request_buf_append: protocol out of sync");
393     return 0;
394   }
395   const char *nl = start;
396   size_t num_req = 0;
397   while ((nl = memchr_char(nl, '\n', finish - nl))) {
398     if (nl == finish)
399       break;
400     num_req++;
401     nl++;
402   }
403   num_req++;
404   writebuf.append(start, finish);
405   if (*(finish - 1) != '\n')
406     writebuf.append_literal("\n");
407   num_req_bufd += num_req;
408   return num_req;
409 }
410 
411 void
request_reset()412 hstcpcli::request_reset()
413 {
414   if (num_req_bufd) {
415     writebuf.erase_front(writebuf.size());
416     num_req_bufd = 0;
417   }
418 }
419 
420 int
request_send()421 hstcpcli::request_send()
422 {
423   if (error_code < 0) {
424     return error_code;
425   }
426   clear_error();
427   if (fd.get() < 0) {
428     close();
429     return set_error(-1, "write: closed");
430   }
431 /*
432   if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
433 */
434   if (num_req_bufd == 0 || num_req_rcvd > 0) {
435     close();
436     return set_error(-1, "request_send: protocol out of sync");
437   }
438   const size_t wrlen = writebuf.size();
439   const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
440   if (r <= 0) {
441     close();
442     return set_error(-1, r < 0 ? "write: failed" : "write: eof");
443   }
444   writebuf.erase_front(r);
445   if (static_cast<size_t>(r) != wrlen) {
446     close();
447     return set_error(-1, "write: incomplete");
448   }
449   num_req_sent += num_req_bufd;
450   num_req_bufd = 0;
451   DBG(fprintf(stderr, "REQSEND 0\n"));
452   return 0;
453 }
454 
455 int
response_recv(size_t & num_flds_r)456 hstcpcli::response_recv(size_t& num_flds_r)
457 {
458   if (error_code < 0) {
459     return error_code;
460   }
461   clear_error();
462   if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
463     response_end_offset != 0) {
464     close();
465     return set_error(-1, "response_recv: protocol out of sync");
466   }
467   cur_row_offset = 0;
468   num_flds_r = num_flds = 0;
469   if (fd.get() < 0) {
470     return set_error(-1, "read: closed");
471   }
472   size_t offset = 0;
473   while (true) {
474     const char *const lbegin = readbuf.begin() + offset;
475     const char *const lend = readbuf.end();
476     if (lbegin < lend)
477     {
478       const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
479       if (nl != 0) {
480         offset += (nl + 1) - lbegin;
481         break;
482       }
483       offset += lend - lbegin;
484     }
485     if (read_more() <= 0) {
486       close();
487       error_code = -1;
488       return error_code;
489     }
490   }
491   response_end_offset = offset;
492   --num_req_sent;
493   ++num_req_rcvd;
494   char *start = readbuf.begin();
495   char *const finish = start + response_end_offset - 1;
496   const size_t resp_code = read_ui32(start, finish);
497   skip_one(start, finish);
498   num_flds_r = num_flds = read_ui32(start, finish);
499   if (resp_code != 0) {
500     skip_one(start, finish);
501     char *const err_begin = start;
502     read_token(start, finish);
503     char *const err_end = start;
504     String e = String(err_begin, (uint32)(err_end - err_begin), &my_charset_bin);
505     if (!e.length()) {
506       e = String("unknown_error", &my_charset_bin);
507     }
508     return set_error(resp_code, e);
509   }
510   cur_row_size = 0;
511   cur_row_offset = start - readbuf.begin();
512   DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
513     String(readbuf.begin(), readbuf.begin() + response_end_offset)
514       .c_str(),
515     cur_row_offset, response_end_offset));
516   DBG(fprintf(stderr, "RES 0\n"));
517   if (flds.max_element < num_flds)
518   {
519     if (allocate_dynamic(&flds, num_flds))
520       return set_error(-1, "out of memory");
521   }
522   flds.elements = num_flds;
523   return 0;
524 }
525 
526 int
get_result(hstresult & result)527 hstcpcli::get_result(hstresult& result)
528 {
529 /*
530   readbuf.swap(result.readbuf);
531 */
532   char *const wp = result.readbuf.make_space(response_end_offset);
533   memcpy(wp, readbuf.begin(), response_end_offset);
534   result.readbuf.space_wrote(response_end_offset);
535   result.response_end_offset = response_end_offset;
536   result.num_flds = num_flds;
537   result.cur_row_size = cur_row_size;
538   result.cur_row_offset = cur_row_offset;
539   if (result.flds.max_element < num_flds)
540   {
541     if (allocate_dynamic(&result.flds, num_flds))
542       return set_error(-1, "out of memory");
543   }
544   result.flds.elements = num_flds;
545   return 0;
546 }
547 
548 const string_ref *
get_next_row()549 hstcpcli::get_next_row()
550 {
551   if (num_flds == 0 || flds.elements < num_flds) {
552     DBG(fprintf(stderr, "GNR NF 0\n"));
553     return 0;
554   }
555   char *start = readbuf.begin() + cur_row_offset;
556   char *const finish = readbuf.begin() + response_end_offset - 1;
557   if (start >= finish) { /* start[0] == nl */
558     DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
559     return 0;
560   }
561   for (size_t i = 0; i < num_flds; ++i) {
562     skip_one(start, finish);
563     char *const fld_begin = start;
564     read_token(start, finish);
565     char *const fld_end = start;
566     char *wp = fld_begin;
567     if (is_null_expression(fld_begin, fld_end)) {
568       /* null */
569       ((string_ref *) flds.buffer)[i] = string_ref();
570     } else {
571       unescape_string(wp, fld_begin, fld_end); /* in-place */
572       ((string_ref *) flds.buffer)[i] = string_ref(fld_begin, wp);
573     }
574   }
575   cur_row_size = start - (readbuf.begin() + cur_row_offset);
576   cur_row_offset = start - readbuf.begin();
577   return (string_ref *) flds.buffer;
578 }
579 
580 const string_ref *
get_next_row_from_result(hstresult & result)581 hstcpcli::get_next_row_from_result(hstresult& result)
582 {
583   if (result.num_flds == 0 || result.flds.elements < result.num_flds) {
584     DBG(fprintf(stderr, "GNR NF 0\n"));
585     return 0;
586   }
587   char *start = result.readbuf.begin() + result.cur_row_offset;
588   char *const finish = result.readbuf.begin() + result.response_end_offset - 1;
589   if (start >= finish) { /* start[0] == nl */
590     DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
591     return 0;
592   }
593   for (size_t i = 0; i < result.num_flds; ++i) {
594     skip_one(start, finish);
595     char *const fld_begin = start;
596     read_token(start, finish);
597     char *const fld_end = start;
598     char *wp = fld_begin;
599     if (is_null_expression(fld_begin, fld_end)) {
600       /* null */
601       ((string_ref *) result.flds.buffer)[i] = string_ref();
602     } else {
603       unescape_string(wp, fld_begin, fld_end); /* in-place */
604       ((string_ref *) result.flds.buffer)[i] = string_ref(fld_begin, wp);
605     }
606   }
607   result.cur_row_size =
608     start - (result.readbuf.begin() + result.cur_row_offset);
609   result.cur_row_offset = start - result.readbuf.begin();
610   return (string_ref *) result.flds.buffer;
611 }
612 
613 size_t
get_row_size()614 hstcpcli::get_row_size()
615 {
616   return cur_row_size;
617 }
618 
619 size_t
get_row_size_from_result(hstresult & result)620 hstcpcli::get_row_size_from_result(hstresult& result)
621 {
622   return result.cur_row_size;
623 }
624 
625 void
response_buf_remove()626 hstcpcli::response_buf_remove()
627 {
628   if (response_end_offset == 0) {
629     close();
630     set_error(-1, "response_buf_remove: protocol out of sync");
631     return;
632   }
633   readbuf.erase_front(response_end_offset);
634   response_end_offset = 0;
635   --num_req_rcvd;
636   cur_row_offset = 0;
637   num_flds = 0;
638 }
639 
640 void
write_error_to_log(const char * func_name,const char * file_name,ulong line_no)641 hstcpcli::write_error_to_log(
642   const char *func_name,
643   const char *file_name,
644   ulong line_no
645 ) {
646   if (errno_buf) {
647     time_t cur_time = (time_t) time((time_t*) 0);
648     struct tm lt;
649     struct tm *l_time = localtime_r(&cur_time, &lt);
650     fprintf(stderr,
651       "%04d%02d%02d %02d:%02d:%02d [ERROR] hstcpcli: [%d][%s]"
652       " [%s][%s][%lu] errno=%d\n",
653       l_time->tm_year + 1900, l_time->tm_mon + 1, l_time->tm_mday,
654       l_time->tm_hour, l_time->tm_min, l_time->tm_sec,
655       error_code, error_str.c_ptr_safe(),
656       func_name, file_name, line_no, errno_buf);
657   }
658 }
659 
660 hstcpcli_ptr
create(const socket_args & args)661 hstcpcli_i::create(const socket_args& args)
662 {
663   return hstcpcli_ptr(new hstcpcli(args));
664 }
665 
666 };
667 
668