1
2 // vim:sw=2:ai
3
4 /*
5 * Copyright (C) 2010 DeNA Co.,Ltd.. All rights reserved.
6 * See COPYRIGHT.txt for details.
7 */
8
9 #include <netinet/in.h>
10 #include <errno.h>
11 #include <poll.h>
12 #include <unistd.h>
13 #include <fcntl.h>
14 #include <stdexcept>
15 #include <signal.h>
16 #include <list>
17 #if __linux__
18 #include <sys/epoll.h>
19 #endif
20
21 #include "hstcpsvr_worker.hpp"
22 #include "string_buffer.hpp"
23 #include "auto_ptrcontainer.hpp"
24 #include "string_util.hpp"
25 #include "escape.hpp"
26
/* Debug tracing hooks. Each one expands to nothing, so whatever is wrapped in
 * DBG_FD/DBG_TR/DBG_EP/DBG_MULTI is compiled out entirely. Redefining one as
 * `#define DBG_EP(x) x` re-enables the corresponding statements. */
#define DBG_FD(x)
#define DBG_TR(x)
#define DBG_EP(x)
#define DBG_MULTI(x)

/* TODO: find a proper SIGPIPE-suppression mechanism for this case. */
/* Fallback for platforms that do not provide MSG_NOSIGNAL: send() is called
 * with no extra flag. NOTE(review): on such platforms a write to a peer-closed
 * socket may raise SIGPIPE unless the host process ignores it — confirm. */
#if !defined(__linux__) && !defined(__FreeBSD__) && !defined(MSG_NOSIGNAL)
#define MSG_NOSIGNAL 0
#endif
36
37 namespace dena {
38
39 struct dbconnstate {
40 string_buffer readbuf;
41 string_buffer writebuf;
42 std::vector<prep_stmt> prep_stmts;
43 size_t resp_begin_pos;
44 size_t find_nl_pos;
resetdena::dbconnstate45 void reset() {
46 readbuf.clear();
47 writebuf.clear();
48 prep_stmts.clear();
49 resp_begin_pos = 0;
50 find_nl_pos = 0;
51 }
dbconnstatedena::dbconnstate52 dbconnstate() : resp_begin_pos(0), find_nl_pos(0) { }
53 };
54
struct hstcpsvr_conn;
/* List of connection pointers owned by a worker.
 * NOTE(review): auto_ptrcontainer presumably deletes an element when it is
 * removed via erase_ptr() — confirm in auto_ptrcontainer.hpp. */
typedef auto_ptrcontainer< std::list<hstcpsvr_conn *> > hstcpsvr_conns_type;

/* One client connection: socket, protocol buffers and I/O progress flags.
 * It is also the dbcallback_i through which the db context stores prepared
 * statements and emits responses, which are serialized into cstate.writebuf. */
struct hstcpsvr_conn : public dbcallback_i {
 public:
  auto_file fd;             /* client socket; negative once closed */
  sockaddr_storage addr;    /* peer address, filled in by socket_accept() */
  socklen_t addr_len;
  dbconnstate cstate;       /* read/write buffers and prepared statements */
  std::string err;          /* error text reported by socket_accept() */
  size_t readsize;          /* preferred read size; read_more() uses >= 4096 */
  bool nonblocking;         /* if true, EWOULDBLOCK is treated as transient */
  bool read_finished;       /* no further input will be read */
  bool write_finished;      /* no further output will be sent */
  time_t nb_last_io;        /* time of last successful I/O (idle timeout) */
  hstcpsvr_conns_type::iterator conns_iter; /* own position in the worker's
                                               list; set in epoll mode */
  bool authorized;          /* set by a successful 'A' command */
 public:
  bool closed() const;
  bool ok_to_close() const;
  void reset();
  int accept(const hstcpsvr_shared_c& cshared);
  /* Each returns true if bytes were transferred; *more_r (optional) is set
   * to true when the call transferred the full requested size. */
  bool write_more(bool *more_r = 0);
  bool read_more(bool *more_r = 0);
 public:
  /* dbcallback_i: prepared-statement storage and response serialization */
  virtual void dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v);
  virtual const prep_stmt *dbcb_get_prep_stmt(size_t pst_id) const;
  virtual void dbcb_resp_short(uint32_t code, const char *msg);
  virtual void dbcb_resp_short_num(uint32_t code, uint32_t value);
  virtual void dbcb_resp_short_num64(uint32_t code, uint64_t value);
  virtual void dbcb_resp_begin(size_t num_flds);
  virtual void dbcb_resp_entry(const char *fld, size_t fldlen);
  virtual void dbcb_resp_end();
  virtual void dbcb_resp_cancel();
 public:
  hstcpsvr_conn() : addr_len(sizeof(addr)), readsize(4096),
    nonblocking(false), read_finished(false), write_finished(false),
    nb_last_io(0), authorized(false) { }
};
94
95 bool
closed() const96 hstcpsvr_conn::closed() const
97 {
98 return fd.get() < 0;
99 }
100
101 bool
ok_to_close() const102 hstcpsvr_conn::ok_to_close() const
103 {
104 return write_finished || (read_finished && cstate.writebuf.size() == 0);
105 }
106
107 void
reset()108 hstcpsvr_conn::reset()
109 {
110 addr = sockaddr_storage();
111 addr_len = sizeof(addr);
112 cstate.reset();
113 fd.reset();
114 read_finished = false;
115 write_finished = false;
116 }
117
118 int
accept(const hstcpsvr_shared_c & cshared)119 hstcpsvr_conn::accept(const hstcpsvr_shared_c& cshared)
120 {
121 reset();
122 return socket_accept(cshared.listen_fd.get(), fd, cshared.sockargs, addr,
123 addr_len, err);
124 }
125
126 bool
write_more(bool * more_r)127 hstcpsvr_conn::write_more(bool *more_r)
128 {
129 if (write_finished || cstate.writebuf.size() == 0) {
130 return false;
131 }
132 const size_t wlen = cstate.writebuf.size();
133 ssize_t len = send(fd.get(), cstate.writebuf.begin(), wlen, MSG_NOSIGNAL);
134 if (len <= 0) {
135 if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
136 cstate.writebuf.clear();
137 write_finished = true;
138 }
139 return false;
140 }
141 cstate.writebuf.erase_front(len);
142 /* FIXME: reallocate memory if too large */
143 if (more_r) {
144 *more_r = (static_cast<size_t>(len) == wlen);
145 }
146 return true;
147 }
148
149 bool
read_more(bool * more_r)150 hstcpsvr_conn::read_more(bool *more_r)
151 {
152 if (read_finished) {
153 return false;
154 }
155 const size_t block_size = readsize > 4096 ? readsize : 4096;
156 char *wp = cstate.readbuf.make_space(block_size);
157 const ssize_t len = read(fd.get(), wp, block_size);
158 if (len <= 0) {
159 if (len == 0 || !nonblocking || errno != EWOULDBLOCK) {
160 read_finished = true;
161 }
162 return false;
163 }
164 cstate.readbuf.space_wrote(len);
165 if (more_r) {
166 *more_r = (static_cast<size_t>(len) == block_size);
167 }
168 return true;
169 }
170
171 void
dbcb_set_prep_stmt(size_t pst_id,const prep_stmt & v)172 hstcpsvr_conn::dbcb_set_prep_stmt(size_t pst_id, const prep_stmt& v)
173 {
174 if (cstate.prep_stmts.size() <= pst_id) {
175 cstate.prep_stmts.resize(pst_id + 1);
176 }
177 cstate.prep_stmts[pst_id] = v;
178 }
179
180 const prep_stmt *
dbcb_get_prep_stmt(size_t pst_id) const181 hstcpsvr_conn::dbcb_get_prep_stmt(size_t pst_id) const
182 {
183 if (cstate.prep_stmts.size() <= pst_id) {
184 return 0;
185 }
186 return &cstate.prep_stmts[pst_id];
187 }
188
189 void
dbcb_resp_short(uint32_t code,const char * msg)190 hstcpsvr_conn::dbcb_resp_short(uint32_t code, const char *msg)
191 {
192 write_ui32(cstate.writebuf, code);
193 const size_t msglen = strlen(msg);
194 if (msglen != 0) {
195 cstate.writebuf.append_literal("\t1\t");
196 cstate.writebuf.append(msg, msg + msglen);
197 } else {
198 cstate.writebuf.append_literal("\t1");
199 }
200 cstate.writebuf.append_literal("\n");
201 }
202
203 void
dbcb_resp_short_num(uint32_t code,uint32_t value)204 hstcpsvr_conn::dbcb_resp_short_num(uint32_t code, uint32_t value)
205 {
206 write_ui32(cstate.writebuf, code);
207 cstate.writebuf.append_literal("\t1\t");
208 write_ui32(cstate.writebuf, value);
209 cstate.writebuf.append_literal("\n");
210 }
211
212 void
dbcb_resp_short_num64(uint32_t code,uint64_t value)213 hstcpsvr_conn::dbcb_resp_short_num64(uint32_t code, uint64_t value)
214 {
215 write_ui32(cstate.writebuf, code);
216 cstate.writebuf.append_literal("\t1\t");
217 write_ui64(cstate.writebuf, value);
218 cstate.writebuf.append_literal("\n");
219 }
220
221 void
dbcb_resp_begin(size_t num_flds)222 hstcpsvr_conn::dbcb_resp_begin(size_t num_flds)
223 {
224 cstate.resp_begin_pos = cstate.writebuf.size();
225 cstate.writebuf.append_literal("0\t");
226 write_ui32(cstate.writebuf, num_flds);
227 }
228
229 void
dbcb_resp_entry(const char * fld,size_t fldlen)230 hstcpsvr_conn::dbcb_resp_entry(const char *fld, size_t fldlen)
231 {
232 if (fld != 0) {
233 cstate.writebuf.append_literal("\t");
234 escape_string(cstate.writebuf, fld, fld + fldlen);
235 } else {
236 static const char t[] = "\t\0";
237 cstate.writebuf.append(t, t + 2);
238 }
239 }
240
241 void
dbcb_resp_end()242 hstcpsvr_conn::dbcb_resp_end()
243 {
244 cstate.writebuf.append_literal("\n");
245 cstate.resp_begin_pos = 0;
246 }
247
248 void
dbcb_resp_cancel()249 hstcpsvr_conn::dbcb_resp_cancel()
250 {
251 cstate.writebuf.resize(cstate.resp_begin_pos);
252 cstate.resp_begin_pos = 0;
253 }
254
/* One worker thread. It runs an event loop (epoll on Linux when configured,
 * otherwise poll) over its own list of client connections and executes the
 * text-protocol requests they send against its db context.
 * Member order matters: cshared must precede dbctx (used to create it), and
 * conns must follow dbctx so it is destroyed first. */
struct hstcpsvr_worker : public hstcpsvr_worker_i, private noncopyable {
  hstcpsvr_worker(const hstcpsvr_worker_arg& arg);
  virtual void run();
 private:
  const hstcpsvr_shared_c& cshared;   /* read-only state shared by workers */
  volatile hstcpsvr_shared_v& vshared; /* shutdown flag, startup counter */
  long worker_id;                      /* index into cshared.thread_num_conns */
  dbcontext_ptr dbctx;
  hstcpsvr_conns_type conns; /* conns refs dbctx */
  time_t last_check_time;    /* last epoll-mode timeout sweep */
  std::vector<pollfd> pfds;  /* scratch array for run_one_nb() */
#ifdef __linux__
  std::vector<epoll_event> events_vec; /* epoll_wait() output buffer */
  auto_file epoll_fd;
#endif
  bool accept_enabled;  /* listener currently registered with epoll */
  int accept_balance;   /* config "accept_balance"; nonzero enables balancing */
  std::vector<string_ref> invalues_work;   /* reused IN-clause scratch */
  std::vector<record_filter> filters_work; /* reused filter scratch */
 private:
  int run_one_nb();
  int run_one_ep();
  void execute_lines(hstcpsvr_conn& conn);
  void execute_line(char *start, char *finish, hstcpsvr_conn& conn);
  void do_open_index(char *start, char *finish, hstcpsvr_conn& conn);
  void do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
    char *finish, hstcpsvr_conn& conn);
  void do_authorization(char *start, char *finish, hstcpsvr_conn& conn);
};
284
hstcpsvr_worker(const hstcpsvr_worker_arg & arg)285 hstcpsvr_worker::hstcpsvr_worker(const hstcpsvr_worker_arg& arg)
286 : cshared(*arg.cshared), vshared(*arg.vshared), worker_id(arg.worker_id),
287 dbctx(cshared.dbptr->create_context(cshared.for_write_flag)),
288 last_check_time(time(0)), accept_enabled(true), accept_balance(0)
289 {
290 #ifdef __linux__
291 if (cshared.sockargs.use_epoll) {
292 epoll_fd.reset(epoll_create(10));
293 if (epoll_fd.get() < 0) {
294 fatal_abort("epoll_create");
295 }
296 epoll_event ev = { EPOLLIN, { 0 } };
297 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
298 != 0) {
299 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
300 }
301 events_vec.resize(10240);
302 }
303 #endif
304 accept_balance = cshared.conf.get_int("accept_balance", 0);
305 }
306
307 namespace {
308
309 struct thr_init {
thr_initdena::__anone84201950111::thr_init310 thr_init(const dbcontext_ptr& dc, volatile int& shutdown_flag) : dbctx(dc) {
311 dbctx->init_thread(this, shutdown_flag);
312 }
~thr_initdena::__anone84201950111::thr_init313 ~thr_init() {
314 dbctx->term_thread();
315 }
316 const dbcontext_ptr& dbctx;
317 };
318
319 }; // namespace
320
321 void
run()322 hstcpsvr_worker::run()
323 {
324 thr_init initobj(dbctx, vshared.shutdown);
325
326 {
327 lock_guard crit_sec(const_cast<mutex &>(vshared.v_mutex));
328 ++vshared.threads_started;
329 if (vshared.threads_started == cshared.num_threads)
330 {
331 pthread_cond_signal(
332 const_cast<pthread_cond_t *>(&vshared.threads_started_cond));
333 }
334 }
335
336 dbctx->wait_for_server_to_start();
337
338 #ifdef __linux__
339 if (cshared.sockargs.use_epoll) {
340 while (!vshared.shutdown && dbctx->check_alive()) {
341 run_one_ep();
342 }
343 } else if (cshared.sockargs.nonblocking) {
344 while (!vshared.shutdown && dbctx->check_alive()) {
345 run_one_nb();
346 }
347 } else {
348 /* UNUSED */
349 fatal_abort("run_one");
350 }
351 #else
352 while (!vshared.shutdown && dbctx->check_alive()) {
353 run_one_nb();
354 }
355 #endif
356 }
357
358 int
run_one_nb()359 hstcpsvr_worker::run_one_nb()
360 {
361 size_t nfds = 0;
362 /* CLIENT SOCKETS */
363 for (hstcpsvr_conns_type::const_iterator i = conns.begin();
364 i != conns.end(); ++i) {
365 if (pfds.size() <= nfds) {
366 pfds.resize(nfds + 1);
367 }
368 pollfd& pfd = pfds[nfds++];
369 pfd.fd = (*i)->fd.get();
370 short ev = 0;
371 if ((*i)->cstate.writebuf.size() != 0) {
372 ev = POLLOUT;
373 } else {
374 ev = POLLIN;
375 }
376 pfd.events = pfd.revents = ev;
377 }
378 /* LISTENER */
379 {
380 const size_t cpt = cshared.nb_conn_per_thread;
381 const short ev = (cpt > nfds) ? POLLIN : 0;
382 if (pfds.size() <= nfds) {
383 pfds.resize(nfds + 1);
384 }
385 pollfd& pfd = pfds[nfds++];
386 pfd.fd = cshared.listen_fd.get();
387 pfd.events = pfd.revents = ev;
388 }
389 /* POLL */
390 const int npollev = poll(&pfds[0], nfds, 1 * 1000);
391 dbctx->set_statistics(conns.size(), npollev);
392 const time_t now = time(0);
393 size_t j = 0;
394 const short mask_in = ~POLLOUT;
395 const short mask_out = POLLOUT | POLLERR | POLLHUP | POLLNVAL;
396 /* READ */
397 for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
398 ++i, ++j) {
399 pollfd& pfd = pfds[j];
400 if ((pfd.revents & mask_in) == 0) {
401 continue;
402 }
403 hstcpsvr_conn& conn = **i;
404 if (conn.read_more()) {
405 if (conn.cstate.readbuf.size() > 0) {
406 const char ch = conn.cstate.readbuf.begin()[0];
407 if (ch == 'Q') {
408 vshared.shutdown = 1;
409 } else if (ch == '/') {
410 conn.cstate.readbuf.clear();
411 conn.cstate.find_nl_pos = 0;
412 conn.cstate.writebuf.clear();
413 conn.read_finished = true;
414 conn.write_finished = true;
415 }
416 }
417 conn.nb_last_io = now;
418 }
419 }
420 /* EXECUTE */
421 j = 0;
422 for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
423 ++i, ++j) {
424 pollfd& pfd = pfds[j];
425 if ((pfd.revents & mask_in) == 0 || (*i)->cstate.readbuf.size() == 0) {
426 continue;
427 }
428 execute_lines(**i);
429 }
430 /* COMMIT */
431 dbctx->unlock_tables_if();
432 const bool commit_error = dbctx->get_commit_error();
433 dbctx->clear_error();
434 /* WRITE/CLOSE */
435 j = 0;
436 for (hstcpsvr_conns_type::iterator i = conns.begin(); i != conns.end();
437 ++j) {
438 pollfd& pfd = pfds[j];
439 hstcpsvr_conn& conn = **i;
440 hstcpsvr_conns_type::iterator icur = i;
441 ++i;
442 if (commit_error) {
443 conn.reset();
444 continue;
445 }
446 if ((pfd.revents & (mask_out | mask_in)) != 0) {
447 if (conn.write_more()) {
448 conn.nb_last_io = now;
449 }
450 }
451 if (cshared.sockargs.timeout != 0 &&
452 conn.nb_last_io + cshared.sockargs.timeout < now) {
453 conn.reset();
454 }
455 if (conn.closed() || conn.ok_to_close()) {
456 conns.erase_ptr(icur);
457 }
458 }
459 /* ACCEPT */
460 {
461 pollfd& pfd = pfds[nfds - 1];
462 if ((pfd.revents & mask_in) != 0) {
463 std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
464 c->nonblocking = true;
465 c->readsize = cshared.readsize;
466 c->accept(cshared);
467 if (c->fd.get() >= 0) {
468 if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
469 fatal_abort("F_SETFL O_NONBLOCK");
470 }
471 c->nb_last_io = now;
472 conns.push_back_ptr(c);
473 } else {
474 /* errno == 11 (EAGAIN) is not a fatal error. */
475 DENA_VERBOSE(100, fprintf(stderr,
476 "accept failed: errno=%d (not fatal)\n", errno));
477 }
478 }
479 }
480 DENA_VERBOSE(30, fprintf(stderr, "nb: %p nfds=%zu cns=%zu\n", this, nfds,
481 conns.size()));
482 if (conns.empty()) {
483 dbctx->close_tables_if();
484 }
485 dbctx->set_statistics(conns.size(), 0);
486 return 0;
487 }
488
489 #ifdef __linux__
490 int
run_one_ep()491 hstcpsvr_worker::run_one_ep()
492 {
493 epoll_event *const events = &events_vec[0];
494 const size_t num_events = events_vec.size();
495 const time_t now = time(0);
496 size_t in_count = 0, out_count = 0, accept_count = 0;
497 int nfds = epoll_wait(epoll_fd.get(), events, num_events, 1000);
498 /* READ/ACCEPT */
499 dbctx->set_statistics(conns.size(), nfds);
500 for (int i = 0; i < nfds; ++i) {
501 epoll_event& ev = events[i];
502 if ((ev.events & EPOLLIN) == 0) {
503 continue;
504 }
505 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
506 if (conn == 0) {
507 /* listener */
508 ++accept_count;
509 DBG_EP(fprintf(stderr, "IN listener\n"));
510 std::auto_ptr<hstcpsvr_conn> c(new hstcpsvr_conn());
511 c->nonblocking = true;
512 c->readsize = cshared.readsize;
513 c->accept(cshared);
514 if (c->fd.get() >= 0) {
515 if (fcntl(c->fd.get(), F_SETFL, O_NONBLOCK) != 0) {
516 fatal_abort("F_SETFL O_NONBLOCK");
517 }
518 epoll_event cev;
519 cev.events = EPOLLIN | EPOLLOUT | EPOLLET;
520 cev.data.ptr = c.get();
521 c->nb_last_io = now;
522 const int fd = c->fd.get();
523 conns.push_back_ptr(c);
524 conns.back()->conns_iter = --conns.end();
525 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, fd, &cev) != 0) {
526 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
527 }
528 } else {
529 DENA_VERBOSE(100, fprintf(stderr,
530 "accept failed: errno=%d (not fatal)\n", errno));
531 }
532 } else {
533 /* client connection */
534 ++in_count;
535 DBG_EP(fprintf(stderr, "IN client\n"));
536 bool more_data = false;
537 while (conn->read_more(&more_data)) {
538 DBG_EP(fprintf(stderr, "IN client read_more\n"));
539 conn->nb_last_io = now;
540 if (!more_data) {
541 break;
542 }
543 }
544 }
545 }
546 /* EXECUTE */
547 for (int i = 0; i < nfds; ++i) {
548 epoll_event& ev = events[i];
549 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
550 if ((ev.events & EPOLLIN) == 0 || conn == 0 ||
551 conn->cstate.readbuf.size() == 0) {
552 continue;
553 }
554 const char ch = conn->cstate.readbuf.begin()[0];
555 if (ch == 'Q') {
556 vshared.shutdown = 1;
557 } else if (ch == '/') {
558 conn->cstate.readbuf.clear();
559 conn->cstate.find_nl_pos = 0;
560 conn->cstate.writebuf.clear();
561 conn->read_finished = true;
562 conn->write_finished = true;
563 } else {
564 execute_lines(*conn);
565 }
566 }
567 /* COMMIT */
568 dbctx->unlock_tables_if();
569 const bool commit_error = dbctx->get_commit_error();
570 dbctx->clear_error();
571 /* WRITE */
572 for (int i = 0; i < nfds; ++i) {
573 epoll_event& ev = events[i];
574 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
575 if (commit_error && conn != 0) {
576 conn->reset();
577 continue;
578 }
579 if ((ev.events & EPOLLOUT) == 0) {
580 continue;
581 }
582 ++out_count;
583 if (conn == 0) {
584 /* listener */
585 DBG_EP(fprintf(stderr, "OUT listener\n"));
586 } else {
587 /* client connection */
588 DBG_EP(fprintf(stderr, "OUT client\n"));
589 bool more_data = false;
590 while (conn->write_more(&more_data)) {
591 DBG_EP(fprintf(stderr, "OUT client write_more\n"));
592 conn->nb_last_io = now;
593 if (!more_data) {
594 break;
595 }
596 }
597 }
598 }
599 /* CLOSE */
600 for (int i = 0; i < nfds; ++i) {
601 epoll_event& ev = events[i];
602 hstcpsvr_conn *const conn = static_cast<hstcpsvr_conn *>(ev.data.ptr);
603 if (conn != 0 && conn->ok_to_close()) {
604 DBG_EP(fprintf(stderr, "CLOSE close\n"));
605 conns.erase_ptr(conn->conns_iter);
606 }
607 }
608 /* TIMEOUT & cleanup */
609 if (last_check_time + 10 < now) {
610 for (hstcpsvr_conns_type::iterator i = conns.begin();
611 i != conns.end(); ) {
612 hstcpsvr_conns_type::iterator icur = i;
613 ++i;
614 if (cshared.sockargs.timeout != 0 &&
615 (*icur)->nb_last_io + cshared.sockargs.timeout < now) {
616 conns.erase_ptr((*icur)->conns_iter);
617 }
618 }
619 last_check_time = now;
620 DENA_VERBOSE(20, fprintf(stderr, "ep: %p nfds=%d cns=%zu\n", this, nfds,
621 conns.size()));
622 }
623 DENA_VERBOSE(30, fprintf(stderr, "%p in=%zu out=%zu ac=%zu, cns=%zu\n",
624 this, in_count, out_count, accept_count, conns.size()));
625 if (conns.empty()) {
626 dbctx->close_tables_if();
627 }
628 /* STATISTICS */
629 const size_t num_conns = conns.size();
630 dbctx->set_statistics(num_conns, 0);
631 /* ENABLE/DISABLE ACCEPT */
632 if (accept_balance != 0) {
633 cshared.thread_num_conns[worker_id] = num_conns;
634 size_t total_num_conns = 0;
635 for (long i = 0; i < cshared.num_threads; ++i) {
636 total_num_conns += cshared.thread_num_conns[i];
637 }
638 bool e_acc = false;
639 if (num_conns < 10 ||
640 total_num_conns * 2 > num_conns * cshared.num_threads) {
641 e_acc = true;
642 }
643 epoll_event ev = { EPOLLIN, { 0 } };
644 if (e_acc == accept_enabled) {
645 } else if (e_acc) {
646 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_ADD, cshared.listen_fd.get(), &ev)
647 != 0) {
648 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
649 }
650 } else {
651 if (epoll_ctl(epoll_fd.get(), EPOLL_CTL_DEL, cshared.listen_fd.get(), &ev)
652 != 0) {
653 fatal_abort("epoll_ctl EPOLL_CTL_ADD");
654 }
655 }
656 accept_enabled = e_acc;
657 }
658 return 0;
659 }
660 #endif
661
662 void
execute_lines(hstcpsvr_conn & conn)663 hstcpsvr_worker::execute_lines(hstcpsvr_conn& conn)
664 {
665 DBG_MULTI(int cnt = 0);
666 dbconnstate& cstate = conn.cstate;
667 char *buf_end = cstate.readbuf.end();
668 char *line_begin = cstate.readbuf.begin();
669 char *find_pos = line_begin + cstate.find_nl_pos;
670 while (true) {
671 char *const nl = memchr_char(find_pos, '\n', buf_end - find_pos);
672 if (nl == 0) {
673 break;
674 }
675 char *const lf = (line_begin != nl && nl[-1] == '\r') ? nl - 1 : nl;
676 DBG_MULTI(cnt++);
677 execute_line(line_begin, lf, conn);
678 find_pos = line_begin = nl + 1;
679 }
680 cstate.readbuf.erase_front(line_begin - cstate.readbuf.begin());
681 cstate.find_nl_pos = cstate.readbuf.size();
682 DBG_MULTI(fprintf(stderr, "cnt=%d\n", cnt));
683 }
684
685 void
execute_line(char * start,char * finish,hstcpsvr_conn & conn)686 hstcpsvr_worker::execute_line(char *start, char *finish, hstcpsvr_conn& conn)
687 {
688 /* safe to modify, safe to dereference 'finish' */
689 char *const cmd_begin = start;
690 read_token(start, finish);
691 char *const cmd_end = start;
692 skip_one(start, finish);
693 if (cmd_begin == cmd_end) {
694 return conn.dbcb_resp_short(2, "cmd");
695 }
696 if (cmd_begin + 1 == cmd_end) {
697 if (cmd_begin[0] == 'P') {
698 if (cshared.require_auth && !conn.authorized) {
699 return conn.dbcb_resp_short(3, "unauth");
700 }
701 return do_open_index(start, finish, conn);
702 }
703 if (cmd_begin[0] == 'A') {
704 return do_authorization(start, finish, conn);
705 }
706 }
707 if (cmd_begin[0] >= '0' && cmd_begin[0] <= '9') {
708 if (cshared.require_auth && !conn.authorized) {
709 return conn.dbcb_resp_short(3, "unauth");
710 }
711 return do_exec_on_index(cmd_begin, cmd_end, start, finish, conn);
712 }
713 return conn.dbcb_resp_short(2, "cmd");
714 }
715
716 void
do_open_index(char * start,char * finish,hstcpsvr_conn & conn)717 hstcpsvr_worker::do_open_index(char *start, char *finish, hstcpsvr_conn& conn)
718 {
719 const size_t pst_id = read_ui32(start, finish);
720 skip_one(start, finish);
721 /* dbname */
722 char *const dbname_begin = start;
723 read_token(start, finish);
724 char *const dbname_end = start;
725 skip_one(start, finish);
726 /* tblname */
727 char *const tblname_begin = start;
728 read_token(start, finish);
729 char *const tblname_end = start;
730 skip_one(start, finish);
731 /* idxname */
732 char *const idxname_begin = start;
733 read_token(start, finish);
734 char *const idxname_end = start;
735 skip_one(start, finish);
736 /* retfields */
737 char *const retflds_begin = start;
738 read_token(start, finish);
739 char *const retflds_end = start;
740 skip_one(start, finish);
741 /* filfields */
742 char *const filflds_begin = start;
743 read_token(start, finish);
744 char *const filflds_end = start;
745 dbname_end[0] = 0;
746 tblname_end[0] = 0;
747 idxname_end[0] = 0;
748 retflds_end[0] = 0;
749 filflds_end[0] = 0;
750 cmd_open_args args;
751 args.pst_id = pst_id;
752 args.dbn = dbname_begin;
753 args.tbl = tblname_begin;
754 args.idx = idxname_begin;
755 args.retflds = retflds_begin;
756 args.filflds = filflds_begin;
757 return dbctx->cmd_open(conn, args);
758 }
759
760 void
do_exec_on_index(char * cmd_begin,char * cmd_end,char * start,char * finish,hstcpsvr_conn & conn)761 hstcpsvr_worker::do_exec_on_index(char *cmd_begin, char *cmd_end, char *start,
762 char *finish, hstcpsvr_conn& conn)
763 {
764 cmd_exec_args args;
765 const size_t pst_id = read_ui32(cmd_begin, cmd_end);
766 if (pst_id >= conn.cstate.prep_stmts.size()) {
767 return conn.dbcb_resp_short(2, "stmtnum");
768 }
769 args.pst = &conn.cstate.prep_stmts[pst_id];
770 char *const op_begin = start;
771 read_token(start, finish);
772 char *const op_end = start;
773 args.op = string_ref(op_begin, op_end);
774 skip_one(start, finish);
775 const uint32_t fldnum = read_ui32(start, finish);
776 string_ref *const flds = DENA_ALLOCA_ALLOCATE(string_ref, fldnum);
777 auto_alloca_free<string_ref> flds_autofree(flds);
778 args.kvals = flds;
779 args.kvalslen = fldnum;
780 for (size_t i = 0; i < fldnum; ++i) {
781 skip_one(start, finish);
782 char *const f_begin = start;
783 read_token(start, finish);
784 char *const f_end = start;
785 if (is_null_expression(f_begin, f_end)) {
786 /* null */
787 flds[i] = string_ref();
788 } else {
789 /* non-null */
790 char *wp = f_begin;
791 unescape_string(wp, f_begin, f_end);
792 flds[i] = string_ref(f_begin, wp - f_begin);
793 }
794 }
795 skip_one(start, finish);
796 args.limit = read_ui32(start, finish);
797 skip_one(start, finish);
798 args.skip = read_ui32(start, finish);
799 if (start == finish) {
800 /* simple query */
801 return dbctx->cmd_exec(conn, args);
802 }
803 /* has more options */
804 skip_one(start, finish);
805 /* in-clause */
806 if (start[0] == '@') {
807 read_token(start, finish); /* '@' */
808 skip_one(start, finish);
809 args.invalues_keypart = read_ui32(start, finish);
810 skip_one(start, finish);
811 args.invalueslen = read_ui32(start, finish);
812 if (args.invalueslen <= 0) {
813 return conn.dbcb_resp_short(2, "invalueslen");
814 }
815 if (invalues_work.size() < args.invalueslen) {
816 invalues_work.resize(args.invalueslen);
817 }
818 args.invalues = &invalues_work[0];
819 for (uint32_t i = 0; i < args.invalueslen; ++i) {
820 skip_one(start, finish);
821 char *const invalue_begin = start;
822 read_token(start, finish);
823 char *const invalue_end = start;
824 char *wp = invalue_begin;
825 unescape_string(wp, invalue_begin, invalue_end);
826 invalues_work[i] = string_ref(invalue_begin, wp - invalue_begin);
827 }
828 skip_one(start, finish);
829 }
830 if (start == finish) {
831 /* no more options */
832 return dbctx->cmd_exec(conn, args);
833 }
834 /* filters */
835 size_t filters_count = 0;
836 while (start != finish && (start[0] == 'W' || start[0] == 'F')) {
837 char *const filter_type_begin = start;
838 read_token(start, finish);
839 char *const filter_type_end = start;
840 skip_one(start, finish);
841 char *const filter_op_begin = start;
842 read_token(start, finish);
843 char *const filter_op_end = start;
844 skip_one(start, finish);
845 const uint32_t ff_offset = read_ui32(start, finish);
846 skip_one(start, finish);
847 char *const filter_val_begin = start;
848 read_token(start, finish);
849 char *const filter_val_end = start;
850 skip_one(start, finish);
851 if (filters_work.size() <= filters_count) {
852 filters_work.resize(filters_count + 1);
853 }
854 record_filter& fi = filters_work[filters_count];
855 if (filter_type_end != filter_type_begin + 1) {
856 return conn.dbcb_resp_short(2, "filtertype");
857 }
858 fi.filter_type = (filter_type_begin[0] == 'W')
859 ? record_filter_type_break : record_filter_type_skip;
860 const uint32_t num_filflds = args.pst->get_filter_fields().size();
861 if (ff_offset >= num_filflds) {
862 return conn.dbcb_resp_short(2, "filterfld");
863 }
864 fi.op = string_ref(filter_op_begin, filter_op_end);
865 fi.ff_offset = ff_offset;
866 if (is_null_expression(filter_val_begin, filter_val_end)) {
867 /* null */
868 fi.val = string_ref();
869 } else {
870 /* non-null */
871 char *wp = filter_val_begin;
872 unescape_string(wp, filter_val_begin, filter_val_end);
873 fi.val = string_ref(filter_val_begin, wp - filter_val_begin);
874 }
875 ++filters_count;
876 }
877 if (filters_count > 0) {
878 if (filters_work.size() <= filters_count) {
879 filters_work.resize(filters_count + 1);
880 }
881 filters_work[filters_count].op = string_ref(); /* sentinel */
882 args.filters = &filters_work[0];
883 } else {
884 args.filters = 0;
885 }
886 if (start == finish) {
887 /* no modops */
888 return dbctx->cmd_exec(conn, args);
889 }
890 /* has modops */
891 char *const mod_op_begin = start;
892 read_token(start, finish);
893 char *const mod_op_end = start;
894 args.mod_op = string_ref(mod_op_begin, mod_op_end);
895 const size_t num_uvals = args.pst->get_ret_fields().size();
896 string_ref *const uflds = DENA_ALLOCA_ALLOCATE(string_ref, num_uvals);
897 auto_alloca_free<string_ref> uflds_autofree(uflds);
898 for (size_t i = 0; i < num_uvals; ++i) {
899 skip_one(start, finish);
900 char *const f_begin = start;
901 read_token(start, finish);
902 char *const f_end = start;
903 if (is_null_expression(f_begin, f_end)) {
904 /* null */
905 uflds[i] = string_ref();
906 } else {
907 /* non-null */
908 char *wp = f_begin;
909 unescape_string(wp, f_begin, f_end);
910 uflds[i] = string_ref(f_begin, wp - f_begin);
911 }
912 }
913 args.uvals = uflds;
914 return dbctx->cmd_exec(conn, args);
915 }
916
917 void
do_authorization(char * start,char * finish,hstcpsvr_conn & conn)918 hstcpsvr_worker::do_authorization(char *start, char *finish,
919 hstcpsvr_conn& conn)
920 {
921 /* auth type */
922 char *const authtype_begin = start;
923 read_token(start, finish);
924 char *const authtype_end = start;
925 const size_t authtype_len = authtype_end - authtype_begin;
926 skip_one(start, finish);
927 /* key */
928 char *const key_begin = start;
929 read_token(start, finish);
930 char *const key_end = start;
931 const size_t key_len = key_end - key_begin;
932 authtype_end[0] = 0;
933 key_end[0] = 0;
934 char *wp = key_begin;
935 unescape_string(wp, key_begin, key_end);
936 if (authtype_len != 1 || authtype_begin[0] != '1') {
937 return conn.dbcb_resp_short(3, "authtype");
938 }
939 if (cshared.plain_secret.size() == key_len &&
940 memcmp(cshared.plain_secret.data(), key_begin, key_len) == 0) {
941 conn.authorized = true;
942 } else {
943 conn.authorized = false;
944 }
945 if (!conn.authorized) {
946 return conn.dbcb_resp_short(3, "unauth");
947 } else {
948 return conn.dbcb_resp_short(0, "");
949 }
950 }
951
952 hstcpsvr_worker_ptr
create(const hstcpsvr_worker_arg & arg)953 hstcpsvr_worker_i::create(const hstcpsvr_worker_arg& arg)
954 {
955 return hstcpsvr_worker_ptr(new hstcpsvr_worker(arg));
956 }
957
958 };
959
960