1 
2 // vim:sw=2:ai
3 
4 /*
5  * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6  * See COPYRIGHT.txt for details.
7  */
8 
9 #include <my_global.h>
10 #include <stdexcept>
11 
12 #include "hstcpcli.hpp"
13 #include "auto_file.hpp"
14 #include "string_util.hpp"
15 #include "auto_addrinfo.hpp"
16 #include "escape.hpp"
17 #include "util.hpp"
18 
19 /* TODO */
20 #if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
21 #define MSG_NOSIGNAL 0
22 #endif
23 
24 #define DBG(x)
25 
26 namespace dena {
27 
28 struct hstcpcli : public hstcpcli_i, private noncopyable {
29   hstcpcli(const socket_args& args);
30   virtual void close();
31   virtual int reconnect();
32   virtual bool stable_point();
33   virtual void request_buf_open_index(size_t pst_id, const char *dbn,
34     const char *tbl, const char *idx, const char *retflds, const char *filflds);
35   virtual void request_buf_auth(const char *secret, const char *typ);
36   virtual void request_buf_exec_generic(size_t pst_id, const string_ref& op,
37     const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
38     const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
39     const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
40     const string_ref *invalues, size_t invalueslen);
41   virtual int request_send();
42   virtual int response_recv(size_t& num_flds_r);
43   virtual const string_ref *get_next_row();
44   virtual void response_buf_remove();
45   virtual int get_error_code();
46   virtual std::string get_error();
47  private:
48   int read_more();
49   void clear_error();
50   int set_error(int code, const std::string& str);
51  private:
52   auto_file fd;
53   socket_args sargs;
54   string_buffer readbuf;
55   string_buffer writebuf;
56   size_t response_end_offset; /* incl newline */
57   size_t cur_row_offset;
58   size_t num_flds;
59   size_t num_req_bufd; /* buffered but not yet sent */
60   size_t num_req_sent; /* sent but not yet received */
61   size_t num_req_rcvd; /* received but not yet removed */
62   int error_code;
63   std::string error_str;
64   std::vector<string_ref> flds;
65 };
66 
hstcpcli(const socket_args & args)67 hstcpcli::hstcpcli(const socket_args& args)
68   : sargs(args), response_end_offset(0), cur_row_offset(0), num_flds(0),
69     num_req_bufd(0), num_req_sent(0), num_req_rcvd(0), error_code(0)
70 {
71   std::string err;
72   if (socket_connect(fd, sargs, err) != 0) {
73     set_error(-1, err);
74   }
75 }
76 
77 void
close()78 hstcpcli::close()
79 {
80   fd.close();
81   readbuf.clear();
82   writebuf.clear();
83   flds.clear();
84   response_end_offset = 0;
85   cur_row_offset = 0;
86   num_flds = 0;
87   num_req_bufd = 0;
88   num_req_sent = 0;
89   num_req_rcvd = 0;
90 }
91 
92 int
reconnect()93 hstcpcli::reconnect()
94 {
95   clear_error();
96   close();
97   std::string err;
98   if (socket_connect(fd, sargs, err) != 0) {
99     set_error(-1, err);
100   }
101   return error_code;
102 }
103 
104 bool
stable_point()105 hstcpcli::stable_point()
106 {
107   /* returns true if cli can send a new request */
108   return fd.get() >= 0 && num_req_bufd == 0 && num_req_sent == 0 &&
109     num_req_rcvd == 0 && response_end_offset == 0;
110 }
111 
112 int
get_error_code()113 hstcpcli::get_error_code()
114 {
115   return error_code;
116 }
117 
118 std::string
get_error()119 hstcpcli::get_error()
120 {
121   return error_str;
122 }
123 
124 int
read_more()125 hstcpcli::read_more()
126 {
127   const size_t block_size = 4096; // FIXME
128   char *const wp = readbuf.make_space(block_size);
129   const ssize_t rlen = read(fd.get(), wp, block_size);
130   if (rlen <= 0) {
131     if (rlen < 0) {
132       error_str = "read: failed";
133     } else {
134       error_str = "read: eof";
135     }
136     return rlen;
137   }
138   readbuf.space_wrote(rlen);
139   return rlen;
140 }
141 
142 void
clear_error()143 hstcpcli::clear_error()
144 {
145   DBG(fprintf(stderr, "CLEAR_ERROR: %d\n", error_code));
146   error_code = 0;
147   error_str.clear();
148 }
149 
150 int
set_error(int code,const std::string & str)151 hstcpcli::set_error(int code, const std::string& str)
152 {
153   DBG(fprintf(stderr, "SET_ERROR: %d\n", code));
154   error_code = code;
155   error_str = str;
156   return error_code;
157 }
158 
159 void
request_buf_open_index(size_t pst_id,const char * dbn,const char * tbl,const char * idx,const char * retflds,const char * filflds)160 hstcpcli::request_buf_open_index(size_t pst_id, const char *dbn,
161   const char *tbl, const char *idx, const char *retflds, const char *filflds)
162 {
163   if (num_req_sent > 0 || num_req_rcvd > 0) {
164     close();
165     set_error(-1, "request_buf_open_index: protocol out of sync");
166     return;
167   }
168   const string_ref dbn_ref(dbn, strlen(dbn));
169   const string_ref tbl_ref(tbl, strlen(tbl));
170   const string_ref idx_ref(idx, strlen(idx));
171   const string_ref rfs_ref(retflds, strlen(retflds));
172   writebuf.append_literal("P\t");
173   append_uint32(writebuf, pst_id); // FIXME size_t ?
174   writebuf.append_literal("\t");
175   writebuf.append(dbn_ref.begin(), dbn_ref.end());
176   writebuf.append_literal("\t");
177   writebuf.append(tbl_ref.begin(), tbl_ref.end());
178   writebuf.append_literal("\t");
179   writebuf.append(idx_ref.begin(), idx_ref.end());
180   writebuf.append_literal("\t");
181   writebuf.append(rfs_ref.begin(), rfs_ref.end());
182   if (filflds != 0) {
183     const string_ref fls_ref(filflds, strlen(filflds));
184     writebuf.append_literal("\t");
185     writebuf.append(fls_ref.begin(), fls_ref.end());
186   }
187   writebuf.append_literal("\n");
188   ++num_req_bufd;
189 }
190 
191 void
request_buf_auth(const char * secret,const char * typ)192 hstcpcli::request_buf_auth(const char *secret, const char *typ)
193 {
194   if (num_req_sent > 0 || num_req_rcvd > 0) {
195     close();
196     set_error(-1, "request_buf_auth: protocol out of sync");
197     return;
198   }
199   if (typ == 0) {
200     typ = "1";
201   }
202   const string_ref typ_ref(typ, strlen(typ));
203   const string_ref secret_ref(secret, strlen(secret));
204   writebuf.append_literal("A\t");
205   writebuf.append(typ_ref.begin(), typ_ref.end());
206   writebuf.append_literal("\t");
207   writebuf.append(secret_ref.begin(), secret_ref.end());
208   writebuf.append_literal("\n");
209   ++num_req_bufd;
210 }
211 
212 namespace {
213 
214 void
append_delim_value(string_buffer & buf,const char * start,const char * finish)215 append_delim_value(string_buffer& buf, const char *start, const char *finish)
216 {
217   if (start == 0) {
218     /* null */
219     const char t[] = "\t\0";
220     buf.append(t, t + 2);
221   } else {
222     /* non-null */
223     buf.append_literal("\t");
224     escape_string(buf, start, finish);
225   }
226 }
227 
228 };
229 
230 void
request_buf_exec_generic(size_t pst_id,const string_ref & op,const string_ref * kvs,size_t kvslen,uint32_t limit,uint32_t skip,const string_ref & mod_op,const string_ref * mvs,size_t mvslen,const hstcpcli_filter * fils,size_t filslen,int invalues_keypart,const string_ref * invalues,size_t invalueslen)231 hstcpcli::request_buf_exec_generic(size_t pst_id, const string_ref& op,
232   const string_ref *kvs, size_t kvslen, uint32_t limit, uint32_t skip,
233   const string_ref& mod_op, const string_ref *mvs, size_t mvslen,
234   const hstcpcli_filter *fils, size_t filslen, int invalues_keypart,
235   const string_ref *invalues, size_t invalueslen)
236 {
237   if (num_req_sent > 0 || num_req_rcvd > 0) {
238     close();
239     set_error(-1, "request_buf_exec_generic: protocol out of sync");
240     return;
241   }
242   append_uint32(writebuf, pst_id); // FIXME size_t ?
243   writebuf.append_literal("\t");
244   writebuf.append(op.begin(), op.end());
245   writebuf.append_literal("\t");
246   append_uint32(writebuf, kvslen); // FIXME size_t ?
247   for (size_t i = 0; i < kvslen; ++i) {
248     const string_ref& kv = kvs[i];
249     append_delim_value(writebuf, kv.begin(), kv.end());
250   }
251   if (limit != 0 || skip != 0 || invalues_keypart >= 0 ||
252     mod_op.size() != 0 || filslen != 0) {
253     /* has more option */
254     writebuf.append_literal("\t");
255     append_uint32(writebuf, limit); // FIXME size_t ?
256     if (skip != 0 || invalues_keypart >= 0 ||
257       mod_op.size() != 0 || filslen != 0) {
258       writebuf.append_literal("\t");
259       append_uint32(writebuf, skip); // FIXME size_t ?
260     }
261     if (invalues_keypart >= 0) {
262       writebuf.append_literal("\t@\t");
263       append_uint32(writebuf, invalues_keypart);
264       writebuf.append_literal("\t");
265       append_uint32(writebuf, invalueslen);
266       for (size_t i = 0; i < invalueslen; ++i) {
267 	const string_ref& s = invalues[i];
268 	append_delim_value(writebuf, s.begin(), s.end());
269       }
270     }
271     for (size_t i = 0; i < filslen; ++i) {
272       const hstcpcli_filter& f = fils[i];
273       writebuf.append_literal("\t");
274       writebuf.append(f.filter_type.begin(), f.filter_type.end());
275       writebuf.append_literal("\t");
276       writebuf.append(f.op.begin(), f.op.end());
277       writebuf.append_literal("\t");
278       append_uint32(writebuf, f.ff_offset);
279       append_delim_value(writebuf, f.val.begin(), f.val.end());
280     }
281     if (mod_op.size() != 0) {
282       writebuf.append_literal("\t");
283       writebuf.append(mod_op.begin(), mod_op.end());
284       for (size_t i = 0; i < mvslen; ++i) {
285 	const string_ref& mv = mvs[i];
286 	append_delim_value(writebuf, mv.begin(), mv.end());
287       }
288     }
289   }
290   writebuf.append_literal("\n");
291   ++num_req_bufd;
292 }
293 
294 int
request_send()295 hstcpcli::request_send()
296 {
297   if (error_code < 0) {
298     return error_code;
299   }
300   clear_error();
301   if (fd.get() < 0) {
302     close();
303     return set_error(-1, "write: closed");
304   }
305   if (num_req_bufd == 0 || num_req_sent > 0 || num_req_rcvd > 0) {
306     close();
307     return set_error(-1, "request_send: protocol out of sync");
308   }
309   const size_t wrlen = writebuf.size();
310   const ssize_t r = send(fd.get(), writebuf.begin(), wrlen, MSG_NOSIGNAL);
311   if (r <= 0) {
312     close();
313     return set_error(-1, r < 0 ? "write: failed" : "write: eof");
314   }
315   writebuf.erase_front(r);
316   if (static_cast<size_t>(r) != wrlen) {
317     close();
318     return set_error(-1, "write: incomplete");
319   }
320   num_req_sent = num_req_bufd;
321   num_req_bufd = 0;
322   DBG(fprintf(stderr, "REQSEND 0\n"));
323   return 0;
324 }
325 
326 int
response_recv(size_t & num_flds_r)327 hstcpcli::response_recv(size_t& num_flds_r)
328 {
329   if (error_code < 0) {
330     return error_code;
331   }
332   clear_error();
333   if (num_req_bufd > 0 || num_req_sent == 0 || num_req_rcvd > 0 ||
334     response_end_offset != 0) {
335     close();
336     return set_error(-1, "response_recv: protocol out of sync");
337   }
338   cur_row_offset = 0;
339   num_flds_r = num_flds = 0;
340   if (fd.get() < 0) {
341     return set_error(-1, "read: closed");
342   }
343   size_t offset = 0;
344   while (true) {
345     const char *const lbegin = readbuf.begin() + offset;
346     const char *const lend = readbuf.end();
347     const char *const nl = memchr_char(lbegin, '\n', lend - lbegin);
348     if (nl != 0) {
349       offset = (nl + 1) - readbuf.begin();
350       break;
351     }
352     if (read_more() <= 0) {
353       close();
354       return set_error(-1, "read: eof");
355     }
356   }
357   response_end_offset = offset;
358   --num_req_sent;
359   ++num_req_rcvd;
360   char *start = readbuf.begin();
361   char *const finish = start + response_end_offset - 1;
362   const size_t resp_code = read_ui32(start, finish);
363   skip_one(start, finish);
364   num_flds_r = num_flds = read_ui32(start, finish);
365   if (resp_code != 0) {
366     skip_one(start, finish);
367     char *const err_begin = start;
368     read_token(start, finish);
369     char *const err_end = start;
370     std::string e = std::string(err_begin, err_end - err_begin);
371     if (e.empty()) {
372       e = "unknown_error";
373     }
374     return set_error(resp_code, e);
375   }
376   cur_row_offset = start - readbuf.begin();
377   DBG(fprintf(stderr, "[%s] ro=%zu eol=%zu\n",
378     std::string(readbuf.begin(), readbuf.begin() + response_end_offset)
379       .c_str(),
380     cur_row_offset, response_end_offset));
381   DBG(fprintf(stderr, "RES 0\n"));
382   return 0;
383 }
384 
385 const string_ref *
get_next_row()386 hstcpcli::get_next_row()
387 {
388   if (num_flds == 0) {
389     DBG(fprintf(stderr, "GNR NF 0\n"));
390     return 0;
391   }
392   if (flds.size() < num_flds) {
393     flds.resize(num_flds);
394   }
395   char *start = readbuf.begin() + cur_row_offset;
396   char *const finish = readbuf.begin() + response_end_offset - 1;
397   if (start >= finish) { /* start[0] == nl */
398     DBG(fprintf(stderr, "GNR FIN 0 %p %p\n", start, finish));
399     return 0;
400   }
401   for (size_t i = 0; i < num_flds; ++i) {
402     skip_one(start, finish);
403     char *const fld_begin = start;
404     read_token(start, finish);
405     char *const fld_end = start;
406     char *wp = fld_begin;
407     if (is_null_expression(fld_begin, fld_end)) {
408       /* null */
409       flds[i] = string_ref();
410     } else {
411       unescape_string(wp, fld_begin, fld_end); /* in-place */
412       flds[i] = string_ref(fld_begin, wp);
413     }
414   }
415   cur_row_offset = start - readbuf.begin();
416   return &flds[0];
417 }
418 
419 void
response_buf_remove()420 hstcpcli::response_buf_remove()
421 {
422   if (response_end_offset == 0) {
423     close();
424     set_error(-1, "response_buf_remove: protocol out of sync");
425     return;
426   }
427   readbuf.erase_front(response_end_offset);
428   response_end_offset = 0;
429   --num_req_rcvd;
430   cur_row_offset = 0;
431   num_flds = 0;
432   flds.clear();
433 }
434 
435 hstcpcli_ptr
create(const socket_args & args)436 hstcpcli_i::create(const socket_args& args)
437 {
438   return hstcpcli_ptr(new hstcpcli(args));
439 }
440 
441 };
442 
443