1 
2 // vim:sw=2:ai
3 
4 /*
5  * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6  * See COPYRIGHT.txt for details.
7  */
8 
9 #include <netinet/in.h>
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <stdexcept>
15 #include <signal.h>
16 #include <list>
17 #if __linux__
18 #include <sys/epoll.h>
19 #endif
20 
21 #include "hstcpsvr_worker.hpp"
22 #include "string_buffer.hpp"
23 #include "auto_ptrcontainer.hpp"
24 #include "string_util.hpp"
25 #include "escape.hpp"
26 
/*
 * Debug trace hooks. Each macro expands to nothing, so the statement passed
 * as the argument is dropped by the preprocessor. In this file DBG_EP wraps
 * traces in the epoll loop and DBG_MULTI wraps the per-batch line counter in
 * execute_lines(); DBG_FD and DBG_TR are not referenced in the code visible
 * here.
 */
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)

/*
 * TODO: on platforms other than Linux/FreeBSD that do not provide
 * MSG_NOSIGNAL, the flag is defined as 0 here, so send() is called without
 * it on those platforms.
 */
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
36 
37 namespace dena {
38 
39 struct dbconnstate {
40   string_buffer readbuf;
41   string_buffer writebuf;
42   std::vector<prep_stmt> prep_stmts;
43   size_t resp_begin_pos;
44   size_t find_nl_pos;
resetdena::dbconnstate45   void reset() {
46     readbuf.clear();
47     writebuf.clear();
48     prep_stmts.clear();
49     resp_begin_pos = 0;
50     find_nl_pos = 0;
51   }
dbconnstatedena::dbconnstate52   dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
53 };
54 
55 struct hstcpsvr_conn;
56 typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;
57 
58 struct hstcpsvr_conn : public dbcallback_i {
59  public:
60   auto_file fd;
61   sockaddr_storage addr;
62   socklen_t addr_len;
63   dbconnstate cstate;
64   std::string err;
65   size_t readsize;
66   bool nonblocking;
67   bool read_finished;
68   bool write_finished;
69   time_t nb_last_io;
70   hstcpsvr_conns_type::iterator conns_iter;
71   bool authorized;
72  public:
73   bool closed() const;
74   bool ok_to_close() const;
75   void reset();
76   int accept(const hstcpsvr_shared_c& cshared);
77   bool write_more(bool *more_r = 0);
78   bool read_more(bool *more_r = 0);
79  public:
80   virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
81   virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
82   virtual void dbcb_resp_short(uint32_t code, const char *msg);
83   virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
84   virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
85   virtual void dbcb_resp_begin(size_t num_flds);
86   virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
87   virtual void dbcb_resp_end();
88   virtual void dbcb_resp_cancel();
89  public:
hstcpsvr_conndena::hstcpsvr_conn90   hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
91     nonblocking(false), read_finished(false), write_finished(false),
92     nb_last_io(0), authorized(false) { }
93 };
94 
95 bool
closed() const96 hstcpsvr_conn::closed() const
97 {
98   return fd.get() < 0;
99 }
100 
101 bool
ok_to_close() const102 hstcpsvr_conn::ok_to_close() const
103 {
104   return write_finished || (read_finished && cstate.writebuf.size() == 0);
105 }
106 
107 void
reset()108 hstcpsvr_conn::reset()
109 {
110   addr = sockaddr_storage();
111   addr_len = sizeof(addr);
112   cstate.reset();
113   fd.reset();
114   read_finished = false;
115   write_finished = false;
116 }
117 
118 int
accept(const hstcpsvr_shared_c & cshared)119 hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
120 {
121   reset();
122   return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
123     addr_len, err);
124 }
125 
126 bool
write_more(bool * more_r)127 hstcpsvr_conn::write_more(bool *more_r)
128 {
129   if (write_finished || cstate.writebuf.size() == 0) {
130     return false;
131   }
132   const size_t wlen = cstate.writebuf.size();
133   ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
134   if (len <= 0) {
135     if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
136       cstate.writebuf.clear();
137       write_finished = true;
138     }
139     return false;
140   }
141   cstate.writebuf.erase_front(len);
142     /* FIXME: reallocate memory if too large */
143   if (more_r) {
144     *more_r = (static_cast<size_t>(len) == wlen);
145   }
146   return true;
147 }
148 
149 bool
read_more(bool * more_r)150 hstcpsvr_conn::read_more(bool *more_r)
151 {
152   if (read_finished) {
153     return false;
154   }
155   const size_t block_size = readsize > 4096 ? readsize : 4096;
156   char *wp = cstate.readbuf.make_space(block_size);
157   const ssize_t len = read(fd.get(), wp, block_size);
158   if (len <= 0) {
159     if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
160       read_finished = true;
161     }
162     return false;
163   }
164   cstate.readbuf.space_wrote(len);
165   if (more_r) {
166     *more_r = (static_cast<size_t>(len) == block_size);
167   }
168   return true;
169 }
170 
171 void
dbcb_set_prep_stmt(size_t pst_id,const prep_stmt & v)172 hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
173 {
174   if (cstate.prep_stmts.size() <= pst_id) {
175     cstate.prep_stmts.resize(pst_id + 1);
176   }
177   cstate.prep_stmts[pst_id] = v;
178 }
179 
180 const prep_stmt *
dbcb_get_prep_stmt(size_t pst_id) const181 hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
182 {
183   if (cstate.prep_stmts.size() <= pst_id) {
184     return 0;
185   }
186   return &cstate.prep_stmts[pst_id];
187 }
188 
189 void
dbcb_resp_short(uint32_t code,const char * msg)190 hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
191 {
192   write_ui32(cstate.writebuf, code);
193   const size_t msglen = strlen(msg);
194   if (msglen != 0) {
195     cstate.writebuf.append_literal("\t1\t");
196     cstate.writebuf.append(msg, msg + msglen);
197   } else {
198     cstate.writebuf.append_literal("\t1");
199   }
200   cstate.writebuf.append_literal("\n");
201 }
202 
203 void
dbcb_resp_short_num(uint32_t code,uint32_t value)204 hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
205 {
206   write_ui32(cstate.writebuf, code);
207   cstate.writebuf.append_literal("\t1\t");
208   write_ui32(cstate.writebuf, value);
209   cstate.writebuf.append_literal("\n");
210 }
211 
212 void
dbcb_resp_short_num64(uint32_t code,uint64_t value)213 hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
214 {
215   write_ui32(cstate.writebuf, code);
216   cstate.writebuf.append_literal("\t1\t");
217   write_ui64(cstate.writebuf, value);
218   cstate.writebuf.append_literal("\n");
219 }
220 
221 void
dbcb_resp_begin(size_t num_flds)222 hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
223 {
224   cstate.resp_begin_pos = cstate.writebuf.size();
225   cstate.writebuf.append_literal("0\t");
226   write_ui32(cstate.writebuf, num_flds);
227 }
228 
229 void
dbcb_resp_entry(const char * fld,size_t fldlen)230 hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
231 {
232   if (fld != 0) {
233     cstate.writebuf.append_literal("\t");
234     escape_string(cstate.writebuf, fld, fld + fldlen);
235   } else {
236     static const char t[] = "\t\0";
237     cstate.writebuf.append(t, t + 2);
238   }
239 }
240 
241 void
dbcb_resp_end()242 hstcpsvr_conn::dbcb_resp_end()
243 {
244   cstate.writebuf.append_literal("\n");
245   cstate.resp_begin_pos = 0;
246 }
247 
248 void
dbcb_resp_cancel()249 hstcpsvr_conn::dbcb_resp_cancel()
250 {
251   cstate.writebuf.resize(cstate.resp_begin_pos);
252   cstate.resp_begin_pos = 0;
253 }
254 
255 struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
256   hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
257   virtual void run();
258  private:
259   const hstcpsvr_shared_c& cshared;
260   volatile hstcpsvr_shared_v& vshared;
261   long worker_id;
262   dbcontext_ptr dbctx;
263   hstcpsvr_conns_type conns; /* conns refs dbctx */
264   time_t last_check_time;
265   std::vector<pollfd> pfds;
266   #ifdef __linux__
267   std::vector<epoll_event> events_vec;
268   auto_file epoll_fd;
269   #endif
270   bool accept_enabled;
271   int accept_balance;
272   std::vector<string_ref> invalues_work;
273   std::vector<record_filter> filters_work;
274  private:
275   int run_one_nb();
276   int run_one_ep();
277   void execute_lines(hstcpsvr_conn& conn);
278   void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
279   void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
280   void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
281     char *finish, hstcpsvr_conn& conn);
282   void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
283 };
284 
hstcpsvr_worker(const hstcpsvr_worker_arg & arg)285 hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
286   : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
287     dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
288     last_check_time(time(0)), accept_enabled(true), accept_balance(0)
289 {
290   #ifdef __linux__
291   if (cshared.sockargs.use_epoll) {
292     epoll_fd.reset(epoll_create(10));
293     if (epoll_fd.get() < 0) {
294       fatal_abort("epoll_create");
295     }
296     epoll_event ev = { EPOLLIN, { 0 } };
297     if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
298       != 0) {
299       fatal_abort("epoll_ctl EPOLL_CTL_ADD");
300     }
301     events_vec.resize(10240);
302   }
303   #endif
304   accept_balance = cshared.conf.get_int("accept_balance", 0);
305 }
306 
307 namespace {
308 
309 struct thr_init {
thr_initdena::__anone84201950111::thr_init310   thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
311     dbctx->init_thread(this, shutdown_flag);
312   }
~thr_initdena::__anone84201950111::thr_init313   ~thr_init() {
314     dbctx->term_thread();
315   }
316   const dbcontext_ptr& dbctx;
317 };
318 
319 }; // namespace
320 
321 void
run()322 hstcpsvr_worker::run()
323 {
324   thr_init initobj(dbctx, vshared.shutdown);
325 
326   {
327     lock_guard crit_sec(const_cast<mutex &>(vshared.v_mutex));
328     ++vshared.threads_started;
329     if (vshared.threads_started == cshared.num_threads)
330     {
331       pthread_cond_signal(
332         const_cast<pthread_cond_t *>(&vshared.threads_started_cond));
333     }
334   }
335 
336   dbctx->wait_for_server_to_start();
337 
338 #ifdef __linux__
339   if (cshared.sockargs.use_epoll) {
340     while (!vshared.shutdown && dbctx->check_alive()) {
341       run_one_ep();
342     }
343   } else if (cshared.sockargs.nonblocking) {
344     while (!vshared.shutdown && dbctx->check_alive()) {
345       run_one_nb();
346     }
347   } else {
348     /* UNUSED */
349     fatal_abort("run_one");
350   }
351   #else
352   while (!vshared.shutdown && dbctx->check_alive()) {
353     run_one_nb();
354   }
355   #endif
356 }
357 
358 int
run_one_nb()359 hstcpsvr_worker::run_one_nb()
360 {
361   size_t nfds = 0;
362   /* CLIENT SOCKETS */
363   for (hstcpsvr_conns_type::const_iterator i = conns.begin();
364     i != conns.end(); ++i) {
365     if (pfds.size() <= nfds) {
366       pfds.resize(nfds + 1);
367     }
368     pollfd& pfd = pfds[nfds++];
369     pfd.fd = (*i)->fd.get();
370     short ev = 0;
371     if ((*i)->cstate.writebuf.size() != 0) {
372       ev = POLLOUT;
373     } else {
374       ev = POLLIN;
375     }
376     pfd.events = pfd.revents = ev;
377   }
378   /* LISTENER */
379   {
380     const size_t cpt = cshared.nb_conn_per_thread;
381     const short ev = (cpt > nfds) ? POLLIN : 0;
382     if (pfds.size() <= nfds) {
383       pfds.resize(nfds + 1);
384     }
385     pollfd& pfd = pfds[nfds++];
386     pfd.fd = cshared.listen_fd.get();
387     pfd.events = pfd.revents = ev;
388   }
389   /* POLL */
390   const int npollev = poll(&pfds[0], nfds, 1 * 1000);
391   dbctx->set_statistics(conns.size(), npollev);
392   const time_t now = time(0);
393   size_t j = 0;
394   const short mask_in = ~POLLOUT;
395   const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
396   /* READ */
397   for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
398     ++i, ++j) {
399     pollfd& pfd = pfds[j];
400     if ((pfd.revents & mask_in) == 0) {
401       continue;
402     }
403     hstcpsvr_conn& conn = **i;
404     if (conn.read_more()) {
405       if (conn.cstate.readbuf.size() > 0) {
406 	const char ch = conn.cstate.readbuf.begin()[0];
407 	if (ch == 'Q') {
408 	  vshared.shutdown = 1;
409 	} else if (ch == '/') {
410 	  conn.cstate.readbuf.clear();
411 	  conn.cstate.find_nl_pos = 0;
412 	  conn.cstate.writebuf.clear();
413 	  conn.read_finished = true;
414 	  conn.write_finished = true;
415 	}
416       }
417       conn.nb_last_io = now;
418     }
419   }
420   /* EXECUTE */
421   j = 0;
422   for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
423     ++i, ++j) {
424     pollfd& pfd = pfds[j];
425     if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
426       continue;
427     }
428     execute_lines(**i);
429   }
430   /* COMMIT */
431   dbctx->unlock_tables_if();
432   const bool commit_error = dbctx->get_commit_error();
433   dbctx->clear_error();
434   /* WRITE/CLOSE */
435   j = 0;
436   for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
437     ++j) {
438     pollfd& pfd = pfds[j];
439     hstcpsvr_conn& conn = **i;
440     hstcpsvr_conns_type::iterator icur = i;
441     ++i;
442     if (commit_error) {
443       conn.reset();
444       continue;
445     }
446     if ((pfd.revents & (mask_out | mask_in)) != 0) {
447       if (conn.write_more()) {
448 	conn.nb_last_io = now;
449       }
450     }
451     if (cshared.sockargs.timeout != 0 &&
452       conn.nb_last_io + cshared.sockargs.timeout < now) {
453       conn.reset();
454     }
455     if (conn.closed() || conn.ok_to_close()) {
456       conns.erase_ptr(icur);
457     }
458   }
459   /* ACCEPT */
460   {
461     pollfd& pfd = pfds[nfds - 1];
462     if ((pfd.revents & mask_in) != 0) {
463       std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
464       c->nonblocking = true;
465       c->readsize = cshared.readsize;
466       c->accept(cshared);
467       if (c->fd.get() >= 0) {
468 	if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
469 	  fatal_abort("F_SETFL O_NONBLOCK");
470 	}
471 	c->nb_last_io = now;
472 	conns.push_back_ptr(c);
473       } else {
474 	/* errno == 11 (EAGAIN) is not a fatal error. */
475 	DENA_VERBOSE(100, fprintf(stderr,
476 	  "accept failed: errno=%d (not fatal)\n", errno));
477       }
478     }
479   }
480   DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
481     conns.size()));
482   if (conns.empty()) {
483     dbctx->close_tables_if();
484   }
485   dbctx->set_statistics(conns.size(), 0);
486   return 0;
487 }
488 
489 #ifdef __linux__
490 int
run_one_ep()491 hstcpsvr_worker::run_one_ep()
492 {
493   epoll_event *const events = &events_vec[0];
494   const size_t num_events = events_vec.size();
495   const time_t now = time(0);
496   size_t in_count = 0, out_count = 0, accept_count = 0;
497   int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
498   /* READ/ACCEPT */
499   dbctx->set_statistics(conns.size(), nfds);
500   for (int i = 0; i < nfds; ++i) {
501     epoll_event& ev = events[i];
502     if ((ev.events & EPOLLIN) == 0) {
503       continue;
504     }
505     hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
506     if (conn == 0) {
507       /* listener */
508       ++accept_count;
509       DBG_EP(fprintf(stderr, "IN listener\n"));
510       std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
511       c->nonblocking = true;
512       c->readsize = cshared.readsize;
513       c->accept(cshared);
514       if (c->fd.get() >= 0) {
515 	if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
516 	  fatal_abort("F_SETFL O_NONBLOCK");
517 	}
518 	epoll_event cev;
519 	cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
520 	cev.data.ptr = c.get();
521 	c->nb_last_io = now;
522 	const int fd = c->fd.get();
523 	conns.push_back_ptr(c);
524 	conns.back()->conns_iter = --conns.end();
525 	if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
526 	  fatal_abort("epoll_ctl EPOLL_CTL_ADD");
527 	}
528       } else {
529 	DENA_VERBOSE(100, fprintf(stderr,
530 	  "accept failed: errno=%d (not fatal)\n", errno));
531       }
532     } else {
533       /* client connection */
534       ++in_count;
535       DBG_EP(fprintf(stderr, "IN client\n"));
536       bool more_data = false;
537       while (conn->read_more(&more_data)) {
538 	DBG_EP(fprintf(stderr, "IN client read_more\n"));
539 	conn->nb_last_io = now;
540 	if (!more_data) {
541 	  break;
542 	}
543       }
544     }
545   }
546   /* EXECUTE */
547   for (int i = 0; i < nfds; ++i) {
548     epoll_event& ev = events[i];
549     hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
550     if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
551       conn->cstate.readbuf.size() == 0) {
552       continue;
553     }
554     const char ch = conn->cstate.readbuf.begin()[0];
555     if (ch == 'Q') {
556       vshared.shutdown = 1;
557     } else if (ch == '/') {
558       conn->cstate.readbuf.clear();
559       conn->cstate.find_nl_pos = 0;
560       conn->cstate.writebuf.clear();
561       conn->read_finished = true;
562       conn->write_finished = true;
563     } else {
564       execute_lines(*conn);
565     }
566   }
567   /* COMMIT */
568   dbctx->unlock_tables_if();
569   const bool commit_error = dbctx->get_commit_error();
570   dbctx->clear_error();
571   /* WRITE */
572   for (int i = 0; i < nfds; ++i) {
573     epoll_event& ev = events[i];
574     hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
575     if (commit_error && conn != 0) {
576       conn->reset();
577       continue;
578     }
579     if ((ev.events & EPOLLOUT) == 0) {
580       continue;
581     }
582     ++out_count;
583     if (conn == 0) {
584       /* listener */
585       DBG_EP(fprintf(stderr, "OUT listener\n"));
586     } else {
587       /* client connection */
588       DBG_EP(fprintf(stderr, "OUT client\n"));
589       bool more_data = false;
590       while (conn->write_more(&more_data)) {
591 	DBG_EP(fprintf(stderr, "OUT client write_more\n"));
592 	conn->nb_last_io = now;
593 	if (!more_data) {
594 	  break;
595 	}
596       }
597     }
598   }
599   /* CLOSE */
600   for (int i = 0; i < nfds; ++i) {
601     epoll_event& ev = events[i];
602     hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
603     if (conn != 0 && conn->ok_to_close()) {
604       DBG_EP(fprintf(stderr, "CLOSE close\n"));
605       conns.erase_ptr(conn->conns_iter);
606     }
607   }
608   /* TIMEOUT & cleanup */
609   if (last_check_time + 10 < now) {
610     for (hstcpsvr_conns_type::iterator i = conns.begin();
611       i != conns.end(); ) {
612       hstcpsvr_conns_type::iterator icur = i;
613       ++i;
614       if (cshared.sockargs.timeout != 0 &&
615 	(*icur)->nb_last_io + cshared.sockargs.timeout < now) {
616 	conns.erase_ptr((*icur)->conns_iter);
617       }
618     }
619     last_check_time = now;
620     DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
621       conns.size()));
622   }
623   DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
624     this, in_count, out_count, accept_count, conns.size()));
625   if (conns.empty()) {
626     dbctx->close_tables_if();
627   }
628   /* STATISTICS */
629   const size_t num_conns = conns.size();
630   dbctx->set_statistics(num_conns, 0);
631   /* ENABLE/DISABLE ACCEPT */
632   if (accept_balance != 0) {
633     cshared.thread_num_conns[worker_id] = num_conns;
634     size_t total_num_conns = 0;
635     for (long i = 0; i < cshared.num_threads; ++i) {
636       total_num_conns += cshared.thread_num_conns[i];
637     }
638     bool e_acc = false;
639     if (num_conns < 10 ||
640       total_num_conns * 2 > num_conns * cshared.num_threads) {
641       e_acc = true;
642     }
643     epoll_event ev = { EPOLLIN, { 0 } };
644     if (e_acc == accept_enabled) {
645     } else if (e_acc) {
646       if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
647 	!= 0) {
648 	fatal_abort("epoll_ctl EPOLL_CTL_ADD");
649       }
650     } else {
651       if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
652 	!= 0) {
653 	fatal_abort("epoll_ctl EPOLL_CTL_ADD");
654       }
655     }
656     accept_enabled = e_acc;
657   }
658   return 0;
659 }
660 #endif
661 
662 void
execute_lines(hstcpsvr_conn & conn)663 hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
664 {
665   DBG_MULTI(int cnt = 0);
666   dbconnstate& cstate = conn.cstate;
667   char *buf_end = cstate.readbuf.end();
668   char *line_begin = cstate.readbuf.begin();
669   char *find_pos = line_begin + cstate.find_nl_pos;
670   while (true) {
671     char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
672     if (nl == 0) {
673       break;
674     }
675     char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
676     DBG_MULTI(cnt++);
677     execute_line(line_begin, lf, conn);
678     find_pos = line_begin = nl + 1;
679   }
680   cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
681   cstate.find_nl_pos = cstate.readbuf.size();
682   DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
683 }
684 
685 void
execute_line(char * start,char * finish,hstcpsvr_conn & conn)686 hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
687 {
688   /* safe to modify, safe to dereference 'finish' */
689   char *const cmd_begin = start;
690   read_token(start, finish);
691   char *const cmd_end = start;
692   skip_one(start, finish);
693   if (cmd_begin == cmd_end) {
694     return conn.dbcb_resp_short(2, "cmd");
695   }
696   if (cmd_begin + 1 == cmd_end) {
697     if (cmd_begin[0] == 'P') {
698       if (cshared.require_auth && !conn.authorized) {
699 	return conn.dbcb_resp_short(3, "unauth");
700       }
701       return do_open_index(start, finish, conn);
702     }
703     if (cmd_begin[0] == 'A') {
704       return do_authorization(start, finish, conn);
705     }
706   }
707   if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
708     if (cshared.require_auth && !conn.authorized) {
709       return conn.dbcb_resp_short(3, "unauth");
710     }
711     return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
712   }
713   return conn.dbcb_resp_short(2, "cmd");
714 }
715 
716 void
do_open_index(char * start,char * finish,hstcpsvr_conn & conn)717 hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
718 {
719   const size_t pst_id = read_ui32(start, finish);
720   skip_one(start, finish);
721   /* dbname */
722   char *const dbname_begin = start;
723   read_token(start, finish);
724   char *const dbname_end = start;
725   skip_one(start, finish);
726   /* tblname */
727   char *const tblname_begin = start;
728   read_token(start, finish);
729   char *const tblname_end = start;
730   skip_one(start, finish);
731   /* idxname */
732   char *const idxname_begin = start;
733   read_token(start, finish);
734   char *const idxname_end = start;
735   skip_one(start, finish);
736   /* retfields */
737   char *const retflds_begin = start;
738   read_token(start, finish);
739   char *const retflds_end = start;
740   skip_one(start, finish);
741   /* filfields */
742   char *const filflds_begin = start;
743   read_token(start, finish);
744   char *const filflds_end = start;
745   dbname_end[0] = 0;
746   tblname_end[0] = 0;
747   idxname_end[0] = 0;
748   retflds_end[0] = 0;
749   filflds_end[0] = 0;
750   cmd_open_args args;
751   args.pst_id = pst_id;
752   args.dbn = dbname_begin;
753   args.tbl = tblname_begin;
754   args.idx = idxname_begin;
755   args.retflds = retflds_begin;
756   args.filflds = filflds_begin;
757   return dbctx->cmd_open(conn, args);
758 }
759 
/*
 * Handle a command that starts with a prepared statement number.
 *
 * [cmd_begin, cmd_end) is the statement number token; [start, finish) is
 * the rest of the line. As parsed below, the rest of the line is:
 *   <op> <fldnum> <key value>*fldnum <limit> <skip>
 *   [ '@' <keypart> <count> <value>*count ]          in-clause
 *   [ ('W'|'F') <filter op> <field offset> <value> ]*  filters
 *   [ <mod op> <value>*(number of return fields) ]   modification
 * Values are unescaped in place inside the request buffer, so every
 * string_ref stored in args points into that buffer.
 *
 * Parse errors are reported to the client with dbcb_resp_short(2, ...);
 * otherwise the request is passed to dbctx->cmd_exec().
 */
void
hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
  char *finish, hstcpsvr_conn& conn)
{
  cmd_exec_args args;
  const size_t pst_id = read_ui32(cmd_begin, cmd_end);
  if (pst_id >= conn.cstate.prep_stmts.size()) {
    return conn.dbcb_resp_short(2, "stmtnum");
  }
  args.pst = &conn.cstate.prep_stmts[pst_id];
  char *const op_begin = start;
  read_token(start, finish);
  char *const op_end = start;
  args.op = string_ref(op_begin, op_end);
  skip_one(start, finish);
  /*
   * NOTE(review): fldnum is taken from the request line and is used below
   * to size DENA_ALLOCA_ALLOCATE without an upper bound. If that macro is
   * alloca-based, a large value could exhaust the stack - confirm and bound.
   */
  const uint32_t fldnum = read_ui32(start, finish);
  string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
  auto_alloca_free<string_ref> flds_autofree(flds);
  args.kvals = flds;
  args.kvalslen = fldnum;
  for (size_t i = 0; i < fldnum; ++i) {
    skip_one(start, finish);
    char *const f_begin = start;
    read_token(start, finish);
    char *const f_end = start;
    if (is_null_expression(f_begin, f_end)) {
      /* null */
      flds[i] = string_ref();
    } else {
      /* non-null */
      char *wp = f_begin;
      unescape_string(wp, f_begin, f_end);
      flds[i] = string_ref(f_begin, wp - f_begin);
    }
  }
  skip_one(start, finish);
  args.limit = read_ui32(start, finish);
  skip_one(start, finish);
  args.skip = read_ui32(start, finish);
  if (start == finish) {
    /* simple query */
    return dbctx->cmd_exec(conn, args);
  }
  /* has more options */
  skip_one(start, finish);
  /* in-clause */
  if (start[0] == '@') {
    read_token(start, finish); /* '@' */
    skip_one(start, finish);
    args.invalues_keypart = read_ui32(start, finish);
    skip_one(start, finish);
    /*
     * NOTE(review): invalueslen also comes from the request line and sizes
     * the resize() below without an upper bound - confirm this is
     * acceptable for untrusted clients.
     */
    args.invalueslen = read_ui32(start, finish);
    if (args.invalueslen <= 0) {
      return conn.dbcb_resp_short(2, "invalueslen");
    }
    /* invalues_work is a member reused across requests; it only grows */
    if (invalues_work.size() < args.invalueslen) {
      invalues_work.resize(args.invalueslen);
    }
    args.invalues = &invalues_work[0];
    for (uint32_t i = 0; i < args.invalueslen; ++i) {
      skip_one(start, finish);
      char *const invalue_begin = start;
      read_token(start, finish);
      char *const invalue_end = start;
      char *wp = invalue_begin;
      unescape_string(wp, invalue_begin, invalue_end);
      invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
    }
    skip_one(start, finish);
  }
  if (start == finish) {
    /* no more options */
    return dbctx->cmd_exec(conn, args);
  }
  /* filters */
  size_t filters_count = 0;
  while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
    char *const filter_type_begin = start;
    read_token(start, finish);
    char *const filter_type_end = start;
    skip_one(start, finish);
    char *const filter_op_begin = start;
    read_token(start, finish);
    char *const filter_op_end = start;
    skip_one(start, finish);
    const uint32_t ff_offset = read_ui32(start, finish);
    skip_one(start, finish);
    char *const filter_val_begin = start;
    read_token(start, finish);
    char *const filter_val_end = start;
    skip_one(start, finish);
    /* filters_work is a member reused across requests; it only grows */
    if (filters_work.size() <= filters_count) {
      filters_work.resize(filters_count + 1);
    }
    record_filter& fi = filters_work[filters_count];
    /* the filter type token must be exactly one character long */
    if (filter_type_end != filter_type_begin + 1) {
      return conn.dbcb_resp_short(2, "filtertype");
    }
    /* 'W' selects the "break" filter type, 'F' the "skip" filter type */
    fi.filter_type = (filter_type_begin[0] == 'W')
      ? record_filter_type_break : record_filter_type_skip;
    /* the offset must index one of the statement's filter fields */
    const uint32_t num_filflds = args.pst->get_filter_fields().size();
    if (ff_offset >= num_filflds) {
      return conn.dbcb_resp_short(2, "filterfld");
    }
    fi.op = string_ref(filter_op_begin, filter_op_end);
    fi.ff_offset = ff_offset;
    if (is_null_expression(filter_val_begin, filter_val_end)) {
      /* null */
      fi.val = string_ref();
    } else {
      /* non-null */
      char *wp = filter_val_begin;
      unescape_string(wp, filter_val_begin, filter_val_end);
      fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
    }
    ++filters_count;
  }
  if (filters_count > 0) {
    /* one extra element with an empty op terminates the filter array */
    if (filters_work.size() <= filters_count) {
      filters_work.resize(filters_count + 1);
    }
    filters_work[filters_count].op = string_ref(); /* sentinel */
    args.filters = &filters_work[0];
  } else {
    args.filters = 0;
  }
  if (start == finish) {
    /* no modops */
    return dbctx->cmd_exec(conn, args);
  }
  /* has modops */
  char *const mod_op_begin = start;
  read_token(start, finish);
  char *const mod_op_end = start;
  args.mod_op = string_ref(mod_op_begin, mod_op_end);
  /* one update value is read per return field of the prepared statement */
  const size_t num_uvals = args.pst->get_ret_fields().size();
  string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
  auto_alloca_free<string_ref> uflds_autofree(uflds);
  for (size_t i = 0; i < num_uvals; ++i) {
    skip_one(start, finish);
    char *const f_begin = start;
    read_token(start, finish);
    char *const f_end = start;
    if (is_null_expression(f_begin, f_end)) {
      /* null */
      uflds[i] = string_ref();
    } else {
      /* non-null */
      char *wp = f_begin;
      unescape_string(wp, f_begin, f_end);
      uflds[i] = string_ref(f_begin, wp - f_begin);
    }
  }
  args.uvals = uflds;
  return dbctx->cmd_exec(conn, args);
}
916 
917 void
do_authorization(char * start,char * finish,hstcpsvr_conn & conn)918 hstcpsvr_worker::do_authorization(char *start, char *finish,
919   hstcpsvr_conn& conn)
920 {
921   /* auth type */
922   char *const authtype_begin = start;
923   read_token(start, finish);
924   char *const authtype_end = start;
925   const size_t authtype_len = authtype_end - authtype_begin;
926   skip_one(start, finish);
927   /* key */
928   char *const key_begin = start;
929   read_token(start, finish);
930   char *const key_end = start;
931   const size_t key_len = key_end - key_begin;
932   authtype_end[0] = 0;
933   key_end[0] = 0;
934   char *wp = key_begin;
935   unescape_string(wp, key_begin, key_end);
936   if (authtype_len != 1 || authtype_begin[0] != '1') {
937     return conn.dbcb_resp_short(3, "authtype");
938   }
939   if (cshared.plain_secret.size() == key_len &&
940     memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
941     conn.authorized = true;
942   } else {
943     conn.authorized = false;
944   }
945   if (!conn.authorized) {
946     return conn.dbcb_resp_short(3, "unauth");
947   } else {
948     return conn.dbcb_resp_short(0, "");
949   }
950 }
951 
952 hstcpsvr_worker_ptr
create(const hstcpsvr_worker_arg & arg)953 hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
954 {
955   return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
956 }
957 
958 };
959 
960