/* -*- c-basic-offset: 2 -*- */
/* Copyright(C) 2010-2015 Brazil

  This library is free software; you can redistribute it and/or
  modify it under the terms of the GNU Lesser General Public
  License version 2.1 as published by the Free Software Foundation.

  This library is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  Lesser General Public License for more details.

  You should have received a copy of the GNU Lesser General Public
  License along with this library; if not, write to the Free Software
  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
*/
17
18 /* groonga origin headers */
19 #include <grn_str.h>
20 #include <grn_msgpack.h>
21
22 #include <stdio.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <sys/time.h>
27 #include <time.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <err.h>
31
32 #include <fcntl.h>
33 #include <sys/queue.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <netinet/in.h>
37 #include <sys/resource.h>
38
39 #include "zmq_compatible.h"
40 #include <event.h>
41 #include <evhttp.h>
42 #include <groonga.h>
43 #include <pthread.h>
44
45 #include "util.h"
46
/* TCP port main() listens on when -p/--port is not given. */
#define DEFAULT_PORT 8080
/* Thread count main() falls back to when get_core_number() returns 0. */
#define DEFAULT_MAX_THREADS 8

/*
 * Expands to TWO comma-separated arguments: the string itself followed by
 * its length without the terminating NUL.  Intended for string literals
 * passed to (pointer, length) style parameters.
 */
#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0

/* Backlog handed to listen() in bind_socket(). */
#define LISTEN_BACKLOG 756
/* Minimum hard limit of open files main() requires (RLIMIT_NOFILE). */
#define MIN_MAX_FDS 2048
/* Capacity of the static threads[] table. */
#define MAX_THREADS 128 /* max 256 */
55
/*
 * Action selected on the command line.  main() extracts it from the option
 * flags with RUN_MODE_MASK.
 */
typedef enum {
  run_mode_none = 0, /* no explicit mode option was given */
  run_mode_usage,    /* -h/--help: print usage and exit */
  run_mode_daemon,   /* -d/--daemon: daemonize before serving */
  run_mode_error     /* treated by main() as a usage error */
} run_mode;

/* Bits of the option flags that hold a run_mode value. */
#define RUN_MODE_MASK                0x007f
/* Flag bit: check RLIMIT_NOFILE on start (on by default in main()). */
#define RUN_MODE_ENABLE_MAX_FD_CHECK 0x0080
65
66
67 typedef struct {
68 grn_ctx *ctx;
69 grn_obj *db;
70 void *zmq_sock;
71 grn_obj cmd_buf;
72 grn_obj pass_through_parameters;
73 pthread_t thd;
74 uint32_t thread_id;
75 struct event_base *base;
76 struct evhttp *httpd;
77 struct event pulse;
78 const char *log_base_path;
79 FILE *log_file;
80 uint32_t log_count;
81 grn_bool request_reopen_log_file;
82 } thd_data;
83
84 typedef struct {
85 const char *db_path;
86 const char *recv_endpoint;
87 pthread_t thd;
88 void *zmq_ctx;
89 } recv_thd_data;
90
91 #define CMD_BUF_SIZE 1024
92
93 static thd_data threads[MAX_THREADS];
94 static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
95 static uint32_t max_threads;
96 static volatile sig_atomic_t loop = 1;
97 static grn_obj *db;
98 static uint32_t n_lines_per_log_file = 1000000;
99
100 static int
suggest_result(grn_ctx * ctx,struct evbuffer * res_buf,const char * types,const char * query,const char * target_name,int frequency_threshold,double conditional_probability_threshold,int limit,grn_obj * cmd_buf,grn_obj * pass_through_parameters)101 suggest_result(grn_ctx *ctx,
102 struct evbuffer *res_buf, const char *types, const char *query,
103 const char *target_name, int frequency_threshold,
104 double conditional_probability_threshold, int limit,
105 grn_obj *cmd_buf, grn_obj *pass_through_parameters)
106 {
107 if (target_name && types && query) {
108 GRN_BULK_REWIND(cmd_buf);
109 GRN_TEXT_PUTS(ctx, cmd_buf, "/d/suggest?table=item_");
110 grn_text_urlenc(ctx, cmd_buf, target_name, strlen(target_name));
111 GRN_TEXT_PUTS(ctx, cmd_buf, "&column=kana&types=");
112 grn_text_urlenc(ctx, cmd_buf, types, strlen(types));
113 GRN_TEXT_PUTS(ctx, cmd_buf, "&query=");
114 grn_text_urlenc(ctx, cmd_buf, query, strlen(query));
115 GRN_TEXT_PUTS(ctx, cmd_buf, "&frequency_threshold=");
116 grn_text_itoa(ctx, cmd_buf, frequency_threshold);
117 GRN_TEXT_PUTS(ctx, cmd_buf, "&conditional_probability_threshold=");
118 grn_text_ftoa(ctx, cmd_buf, conditional_probability_threshold);
119 GRN_TEXT_PUTS(ctx, cmd_buf, "&limit=");
120 grn_text_itoa(ctx, cmd_buf, limit);
121 if (GRN_TEXT_LEN(pass_through_parameters) > 0) {
122 GRN_TEXT_PUTS(ctx, cmd_buf, "&");
123 GRN_TEXT_PUT(ctx, cmd_buf,
124 GRN_TEXT_VALUE(pass_through_parameters),
125 GRN_TEXT_LEN(pass_through_parameters));
126 }
127 {
128 char *res;
129 int flags;
130 unsigned int res_len;
131
132 grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), 0);
133 grn_ctx_recv(ctx, &res, &res_len, &flags);
134
135 evbuffer_add(res_buf, res, res_len);
136 return res_len;
137 }
138 } else {
139 evbuffer_add(res_buf, "{}", 2);
140 return 2;
141 }
142 }
143
144 static void
log_send(struct evkeyvalq * output_headers,struct evbuffer * res_buf,thd_data * thd,struct evkeyvalq * get_args)145 log_send(struct evkeyvalq *output_headers, struct evbuffer *res_buf,
146 thd_data *thd, struct evkeyvalq *get_args)
147 {
148 uint64_t millisec;
149 int frequency_threshold, limit;
150 double conditional_probability_threshold;
151 const char *callback, *types, *query, *client_id, *target_name,
152 *learn_target_name;
153
154 GRN_BULK_REWIND(&(thd->pass_through_parameters));
155 parse_keyval(thd->ctx, get_args, &query, &types, &client_id, &target_name,
156 &learn_target_name, &callback, &millisec, &frequency_threshold,
157 &conditional_probability_threshold, &limit,
158 &(thd->pass_through_parameters));
159
160 /* send data to learn client */
161 if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
162 char c;
163 size_t l;
164 msgpack_packer pk;
165 msgpack_sbuffer sbuf;
166 int cnt, submit_flag = 0;
167
168 msgpack_sbuffer_init(&sbuf);
169 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
170
171 cnt = 4;
172 if (types && !strcmp(types, "submit")) {
173 cnt++;
174 types = NULL;
175 submit_flag = 1;
176 }
177 msgpack_pack_map(&pk, cnt);
178
179 c = 'i';
180 msgpack_pack_str(&pk, 1);
181 msgpack_pack_str_body(&pk, &c, 1);
182 l = strlen(client_id);
183 msgpack_pack_str(&pk, l);
184 msgpack_pack_str_body(&pk, client_id, l);
185
186 c = 'q';
187 msgpack_pack_str(&pk, 1);
188 msgpack_pack_str_body(&pk, &c, 1);
189 l = strlen(query);
190 msgpack_pack_str(&pk, l);
191 msgpack_pack_str_body(&pk, query, l);
192
193 c = 's';
194 msgpack_pack_str(&pk, 1);
195 msgpack_pack_str_body(&pk, &c, 1);
196 msgpack_pack_uint64(&pk, millisec);
197
198 c = 'l';
199 msgpack_pack_str(&pk, 1);
200 msgpack_pack_str_body(&pk, &c, 1);
201 l = strlen(learn_target_name);
202 msgpack_pack_str(&pk, l);
203 msgpack_pack_str_body(&pk, learn_target_name, l);
204
205 if (submit_flag) {
206 c = 't';
207 msgpack_pack_str(&pk, 1);
208 msgpack_pack_str_body(&pk, &c, 1);
209 msgpack_pack_true(&pk);
210 }
211 {
212 zmq_msg_t msg;
213 if (!zmq_msg_init_size(&msg, sbuf.size)) {
214 memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
215 if (zmq_msg_send(&msg, thd->zmq_sock, 0) == -1) {
216 print_error("zmq_msg_send() error");
217 }
218 zmq_msg_close(&msg);
219 }
220 }
221 msgpack_sbuffer_destroy(&sbuf);
222 }
223 /* make result */
224 {
225 int content_length;
226 if (callback) {
227 evhttp_add_header(output_headers,
228 "Content-Type", "text/javascript; charset=UTF-8");
229 content_length = strlen(callback);
230 evbuffer_add(res_buf, callback, content_length);
231 evbuffer_add(res_buf, "(", 1);
232 content_length += suggest_result(thd->ctx,
233 res_buf, types, query, target_name,
234 frequency_threshold,
235 conditional_probability_threshold,
236 limit,
237 &(thd->cmd_buf),
238 &(thd->pass_through_parameters)) + 3;
239 evbuffer_add(res_buf, ");", 2);
240 } else {
241 evhttp_add_header(output_headers,
242 "Content-Type", "application/json; charset=UTF-8");
243 content_length = suggest_result(thd->ctx,
244 res_buf, types, query, target_name,
245 frequency_threshold,
246 conditional_probability_threshold,
247 limit,
248 &(thd->cmd_buf),
249 &(thd->pass_through_parameters));
250 }
251 if (content_length >= 0) {
252 #define NUM_BUF_SIZE 16
253 char num_buf[NUM_BUF_SIZE];
254 grn_snprintf(num_buf, NUM_BUF_SIZE, NUM_BUF_SIZE, "%d", content_length);
255 evhttp_add_header(output_headers, "Content-Length", num_buf);
256 #undef NUM_BUF_SIZE
257 }
258 }
259 }
260
261 static void
cleanup_httpd_thread(thd_data * thd)262 cleanup_httpd_thread(thd_data *thd) {
263 if (thd->log_file) {
264 fclose(thd->log_file);
265 }
266 if (thd->httpd) {
267 evhttp_free(thd->httpd);
268 }
269 if (thd->zmq_sock) {
270 zmq_close(thd->zmq_sock);
271 }
272 grn_obj_unlink(thd->ctx, &(thd->cmd_buf));
273 grn_obj_unlink(thd->ctx, &(thd->pass_through_parameters));
274 if (thd->ctx) {
275 grn_ctx_close(thd->ctx);
276 }
277 event_base_free(thd->base);
278 }
279
280 static void
close_log_file(thd_data * thread)281 close_log_file(thd_data *thread)
282 {
283 fclose(thread->log_file);
284 thread->log_file = NULL;
285 thread->request_reopen_log_file = GRN_FALSE;
286 }
287
288 static void
generic_handler(struct evhttp_request * req,void * arg)289 generic_handler(struct evhttp_request *req, void *arg)
290 {
291 struct evkeyvalq args;
292 thd_data *thd = arg;
293
294 if (!loop) {
295 event_base_loopexit(thd->base, NULL);
296 return;
297 }
298 if (!req->uri) { return; }
299
300 evhttp_parse_query(req->uri, &args);
301 {
302 struct evbuffer *res_buf;
303 if (!(res_buf = evbuffer_new())) {
304 err(1, "failed to create response buffer");
305 }
306
307 evhttp_add_header(req->output_headers, "Connection", "close");
308
309 log_send(req->output_headers, res_buf, thd, &args);
310 evhttp_send_reply(req, HTTP_OK, "OK", res_buf);
311 evbuffer_free(res_buf);
312 /* logging */
313 {
314 if (thd->log_base_path) {
315 if (thd->log_file && thd->request_reopen_log_file) {
316 close_log_file(thd);
317 }
318 if (!thd->log_file) {
319 time_t n;
320 struct tm *t_st;
321 char p[PATH_MAX + 1];
322
323 time(&n);
324 t_st = localtime(&n);
325
326 grn_snprintf(p,
327 PATH_MAX,
328 PATH_MAX,
329 "%s%04d%02d%02d%02d%02d%02d-%02d",
330 thd->log_base_path,
331 t_st->tm_year + 1900,
332 t_st->tm_mon + 1,
333 t_st->tm_mday,
334 t_st->tm_hour,
335 t_st->tm_min,
336 t_st->tm_sec,
337 thd->thread_id);
338
339 if (!(thd->log_file = fopen(p, "a"))) {
340 print_error("cannot open log_file %s.", p);
341 } else {
342 thd->log_count = 0;
343 }
344 }
345 if (thd->log_file) {
346 fprintf(thd->log_file, "%s\n", req->uri);
347 thd->log_count++;
348 if (n_lines_per_log_file > 0 &&
349 thd->log_count >= n_lines_per_log_file) {
350 close_log_file(thd);
351 }
352 }
353 }
354 }
355 }
356 evhttp_clear_headers(&args);
357 }
358
359 static int
bind_socket(int port)360 bind_socket(int port)
361 {
362 int nfd;
363 if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
364 print_error("cannot open socket for http.");
365 return -1;
366 } else {
367 int r, one = 1;
368 struct sockaddr_in addr;
369
370 r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int));
371 memset(&addr, 0, sizeof(addr));
372 addr.sin_family = AF_INET;
373 addr.sin_addr.s_addr = INADDR_ANY;
374 addr.sin_port = htons(port);
375
376 if ((r = bind(nfd, (struct sockaddr *)&addr, sizeof(addr))) < 0) {
377 print_error("cannot bind socket for http.");
378 return r;
379 }
380 if ((r = listen(nfd, LISTEN_BACKLOG)) < 0) {
381 print_error("cannot listen socket for http.");
382 return r;
383 }
384 if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) {
385 print_error("cannot fcntl socket for http.");
386 return -1;
387 }
388 return nfd;
389 }
390 }
391
392 static void
signal_handler(int sig)393 signal_handler(int sig)
394 {
395 loop = 0;
396 }
397
398 static void
signal_reopen_log_file(int sig)399 signal_reopen_log_file(int sig)
400 {
401 uint32_t i;
402
403 for (i = 0; i < max_threads; i++) {
404 threads[i].request_reopen_log_file = GRN_TRUE;
405 }
406 }
407
408 void
timeout_handler(int fd,short events,void * arg)409 timeout_handler(int fd, short events, void *arg) {
410 thd_data *thd = arg;
411 if (!loop) {
412 event_base_loopexit(thd->base, NULL);
413 } else {
414 struct timeval tv = {1, 0};
415 evtimer_add(&(thd->pulse), &tv);
416 }
417 }
418
/*
 * Thread entry point of an HTTP worker: runs the libevent loop of the
 * event base passed as `arg` until it exits.  Always returns NULL.
 */
static void *
dispatch(void *arg)
{
  struct event_base *base = arg;

  event_base_dispatch(base);
  return NULL;
}
425
426 static void
msgpack2json(msgpack_object * o,grn_ctx * ctx,grn_obj * buf)427 msgpack2json(msgpack_object *o, grn_ctx *ctx, grn_obj *buf)
428 {
429 switch (o->type) {
430 case MSGPACK_OBJECT_POSITIVE_INTEGER:
431 grn_text_ulltoa(ctx, buf, o->via.u64);
432 break;
433 case MSGPACK_OBJECT_STR:
434 grn_text_esc(ctx, buf,
435 MSGPACK_OBJECT_STR_PTR(o),
436 MSGPACK_OBJECT_STR_SIZE(o));
437 break;
438 case MSGPACK_OBJECT_ARRAY:
439 GRN_TEXT_PUTC(ctx, buf, '[');
440 {
441 int i;
442 for (i = 0; i < o->via.array.size; i++) {
443 msgpack2json(o->via.array.ptr, ctx, buf);
444 }
445 }
446 GRN_TEXT_PUTC(ctx, buf, ']');
447 break;
448 case MSGPACK_OBJECT_FLOAT:
449 grn_text_ftoa(ctx, buf, MSGPACK_OBJECT_FLOAT_VALUE(o));
450 break;
451 default:
452 print_error("cannot handle this msgpack type.");
453 }
454 }
455
456 static void
load_from_learner(msgpack_object * o,grn_ctx * ctx,grn_obj * cmd_buf)457 load_from_learner(msgpack_object *o, grn_ctx *ctx, grn_obj *cmd_buf)
458 {
459 if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) {
460 msgpack_object_kv *kv;
461 msgpack_object *key;
462 msgpack_object *value;
463 kv = &(o->via.map.ptr[0]);
464 key = &(kv->key);
465 value = &(kv->val);
466 if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) == 6 &&
467 !memcmp(MSGPACK_OBJECT_STR_PTR(key), CONST_STR_LEN("target"))) {
468 if (value->type == MSGPACK_OBJECT_STR) {
469 int i;
470 GRN_BULK_REWIND(cmd_buf);
471 GRN_TEXT_PUTS(ctx, cmd_buf, "load --table ");
472 GRN_TEXT_PUT(ctx, cmd_buf,
473 MSGPACK_OBJECT_STR_PTR(value),
474 MSGPACK_OBJECT_STR_SIZE(value));
475 grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
476 grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
477 if (MSGPACK_OBJECT_STR_SIZE(value) > 5) {
478 if (!memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("item_")) ||
479 !memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("pair_"))) {
480 char delim = '{';
481 GRN_BULK_REWIND(cmd_buf);
482 for (i = 1; i < o->via.map.size; i++) {
483 GRN_TEXT_PUTC(ctx, cmd_buf, delim);
484 kv = &(o->via.map.ptr[i]);
485 msgpack2json(&(kv->key), ctx, cmd_buf);
486 GRN_TEXT_PUTC(ctx, cmd_buf, ':');
487 msgpack2json(&(kv->val), ctx, cmd_buf);
488 delim = ',';
489 }
490 GRN_TEXT_PUTC(ctx, cmd_buf, '}');
491 /* printf("msg: %.*s\n", GRN_TEXT_LEN(cmd_buf), GRN_TEXT_VALUE(cmd_buf)); */
492 grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
493 }
494 }
495 grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
496 {
497 char *res;
498 int flags;
499 unsigned int res_len;
500 grn_ctx_recv(ctx, &res, &res_len, &flags);
501 }
502 }
503 }
504 }
505 }
506
507 static void
recv_handler(grn_ctx * ctx,void * zmq_recv_sock,msgpack_zone * mempool,grn_obj * cmd_buf)508 recv_handler(grn_ctx *ctx, void *zmq_recv_sock, msgpack_zone *mempool, grn_obj *cmd_buf)
509 {
510 zmq_msg_t msg;
511
512 if (zmq_msg_init(&msg)) {
513 print_error("cannot init zmq message.");
514 } else {
515 if (zmq_msg_recv(&msg, zmq_recv_sock, 0) == -1) {
516 print_error("cannot recv zmq message.");
517 } else {
518 msgpack_object obj;
519 msgpack_unpack_return ret;
520
521 ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
522 if (MSGPACK_UNPACK_SUCCESS == ret) {
523 load_from_learner(&obj, ctx, cmd_buf);
524 } else {
525 print_error("invalid recv data.");
526 }
527 msgpack_zone_clear(mempool);
528 }
529 zmq_msg_close(&msg);
530 }
531 }
532
533 static void *
recv_from_learner(void * arg)534 recv_from_learner(void *arg)
535 {
536 void *zmq_recv_sock;
537 recv_thd_data *thd = arg;
538
539 if ((zmq_recv_sock = zmq_socket(thd->zmq_ctx, ZMQ_SUB))) {
540 if (!zmq_connect(zmq_recv_sock, thd->recv_endpoint)) {
541 grn_ctx ctx;
542 if (!grn_ctx_init(&ctx, 0)) {
543 if ((!grn_ctx_use(&ctx, db))) {
544 msgpack_zone *mempool;
545 if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
546 grn_obj cmd_buf;
547 zmq_pollitem_t items[] = {
548 { zmq_recv_sock, 0, ZMQ_POLLIN, 0}
549 };
550 GRN_TEXT_INIT(&cmd_buf, 0);
551 zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
552 while (loop) {
553 zmq_poll(items, 1, 10000);
554 if (items[0].revents & ZMQ_POLLIN) {
555 recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf);
556 }
557 }
558 grn_obj_unlink(&ctx, &cmd_buf);
559 msgpack_zone_free(mempool);
560 } else {
561 print_error("cannot create msgpack zone.");
562 }
563 /* db_close */
564 } else {
565 print_error("error in grn_db_open() on recv thread.");
566 }
567 grn_ctx_fin(&ctx);
568 } else {
569 print_error("error in grn_ctx_init() on recv thread.");
570 }
571 } else {
572 print_error("cannot create recv zmq_socket.");
573 }
574 } else {
575 print_error("cannot connect zmq_socket.");
576 }
577 return NULL;
578 }
579
580 static int
serve_threads(int nthreads,int port,const char * db_path,void * zmq_ctx,const char * send_endpoint,const char * recv_endpoint,const char * log_base_path)581 serve_threads(int nthreads, int port, const char *db_path, void *zmq_ctx,
582 const char *send_endpoint, const char *recv_endpoint,
583 const char *log_base_path)
584 {
585 int nfd;
586 uint32_t i;
587 if ((nfd = bind_socket(port)) < 0) {
588 print_error("cannot bind socket. please check port number with netstat.");
589 return -1;
590 }
591
592 for (i = 0; i < nthreads; i++) {
593 memset(&threads[i], 0, sizeof(threads[i]));
594 threads[i].request_reopen_log_file = GRN_FALSE;
595 if (!(threads[i].base = event_init())) {
596 print_error("error in event_init() on thread %d.", i);
597 } else {
598 if (!(threads[i].httpd = evhttp_new(threads[i].base))) {
599 print_error("error in evhttp_new() on thread %d.", i);
600 } else {
601 int r;
602 if ((r = evhttp_accept_socket(threads[i].httpd, nfd))) {
603 print_error("error in evhttp_accept_socket() on thread %d.", i);
604 } else {
605 if (send_endpoint) {
606 if (!(threads[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
607 print_error("cannot create zmq_socket.");
608 } else if (zmq_connect(threads[i].zmq_sock, send_endpoint)) {
609 print_error("cannot connect zmq_socket.");
610 zmq_close(threads[i].zmq_sock);
611 threads[i].zmq_sock = NULL;
612 } else {
613 uint64_t hwm = 1;
614 zmq_setsockopt(threads[i].zmq_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
615 }
616 } else {
617 threads[i].zmq_sock = NULL;
618 }
619 if (!(threads[i].ctx = grn_ctx_open(0))) {
620 print_error("error in grn_ctx_open() on thread %d.", i);
621 } else if (grn_ctx_use(threads[i].ctx, db)) {
622 print_error("error in grn_db_open() on thread %d.", i);
623 } else {
624 GRN_TEXT_INIT(&(threads[i].cmd_buf), 0);
625 GRN_TEXT_INIT(&(threads[i].pass_through_parameters), 0);
626 threads[i].log_base_path = log_base_path;
627 threads[i].thread_id = i;
628 evhttp_set_gencb(threads[i].httpd, generic_handler, &threads[i]);
629 evhttp_set_timeout(threads[i].httpd, 10);
630 {
631 struct timeval tv = {1, 0};
632 evtimer_set(&(threads[i].pulse), timeout_handler, &threads[i]);
633 evtimer_add(&(threads[i].pulse), &tv);
634 }
635 if ((r = pthread_create(&(threads[i].thd), NULL, dispatch, threads[i].base))) {
636 print_error("error in pthread_create() on thread %d.", i);
637 }
638 }
639 }
640 }
641 }
642 }
643
644 /* recv thread from learner */
645 if (recv_endpoint) {
646 recv_thd_data rthd;
647 rthd.db_path = db_path;
648 rthd.recv_endpoint = recv_endpoint;
649 rthd.zmq_ctx = zmq_ctx;
650
651 if (pthread_create(&(rthd.thd), NULL, recv_from_learner, &rthd)) {
652 print_error("error in pthread_create() on thread %d.", i);
653 }
654 if (pthread_join(rthd.thd, NULL)) {
655 print_error("error in pthread_join() on thread %d.", i);
656 }
657 } else {
658 while (loop) { sleep(1); }
659 }
660
661 /* join all httpd thread */
662 for (i = 0; i < nthreads; i++) {
663 if (threads[i].thd) {
664 if (pthread_join(threads[i].thd, NULL)) {
665 print_error("error in pthread_join() on thread %d.", i);
666 }
667 }
668 cleanup_httpd_thread(&(threads[i]));
669 }
670 return 0;
671 }
672
/*
 * Returns the number of processors to use as the default thread count, or
 * 0 when it is unknown (the caller then falls back to
 * DEFAULT_MAX_THREADS).
 *
 * Detection is compiled in only when ACTUALLY_GET_CORE_NUMBER is defined;
 * otherwise the function always returns 0.  With detection enabled it
 * uses sysconf(_SC_NPROCESSORS_CONF) where available and sysctl(HW_NCPU)
 * elsewhere; the sysctl variant returns 1 if the query fails.
 */
static uint32_t
get_core_number(void)
{
#ifdef ACTUALLY_GET_CORE_NUMBER
#ifdef _SC_NPROCESSORS_CONF
  long n_processors = sysconf(_SC_NPROCESSORS_CONF);

  /* sysconf() returns -1 on error; do not let that wrap to a huge count. */
  if (n_processors > 0 && (unsigned long)n_processors <= UINT32_MAX) {
    return (uint32_t)n_processors;
  }
  return 0;
#else /* _SC_NPROCESSORS_CONF */
  int n_processors;
  size_t length = sizeof(n_processors);
  int mib[] = {CTL_HW, HW_NCPU};
  if (sysctl(mib, sizeof(mib) / sizeof(mib[0]),
             &n_processors, &length, NULL, 0) == 0 &&
      length == sizeof(n_processors) &&
      0 < n_processors) {
    return n_processors;
  } else {
    return 1;
  }
#endif /* _SC_NPROCESSORS_CONF */
#else /* ACTUALLY_GET_CORE_NUMBER */
  return 0;
#endif /* ACTUALLY_GET_CORE_NUMBER */
}
695
696 static void
usage(FILE * output)697 usage(FILE *output)
698 {
699 fprintf(
700 output,
701 "Usage: groonga-suggest-httpd [options...] db_path\n"
702 "db_path:\n"
703 " specify groonga database path which is used for suggestion.\n"
704 "\n"
705 "options:\n"
706 " -p, --port <port number> : http server port number\n"
707 " (default: %d)\n"
708 /*
709 " --address <ip/hostname> : server address to listen\n"
710 " (default: %s)\n"
711 */
712 " -c <thread number> : number of server threads\n"
713 " (deprecated. use --n-threads)\n"
714 " -t, --n-threads <thread number> : number of server threads\n"
715 " (default: %d)\n"
716 " -s, --send-endpoint <send endpoint> : send endpoint\n"
717 " (ex. tcp://example.com:1234)\n"
718 " -r, --receive-endpoint <receive endpoint> : receive endpoint\n"
719 " (ex. tcp://example.com:1235)\n"
720 " -l, --log-base-path <path prefix> : log path prefix\n"
721 " --n-lines-per-log-file <lines number> : number of lines in a log file\n"
722 " use 0 for disabling this\n"
723 " (default: %d)\n"
724 " -d, --daemon : daemonize\n"
725 " --disable-max-fd-check : disable max FD check on start\n"
726 " -h, --help : show this message\n",
727 DEFAULT_PORT, default_max_threads, n_lines_per_log_file);
728 }
729
730 int
main(int argc,char ** argv)731 main(int argc, char **argv)
732 {
733 int port_no = DEFAULT_PORT;
734 const char *max_threads_string = NULL, *port_string = NULL;
735 const char *address;
736 const char *send_endpoint = NULL, *recv_endpoint = NULL, *log_base_path = NULL;
737 const char *n_lines_per_log_file_string = NULL;
738 int n_processed_args, flags = RUN_MODE_ENABLE_MAX_FD_CHECK;
739 run_mode mode = run_mode_none;
740
741 if (!(default_max_threads = get_core_number())) {
742 default_max_threads = DEFAULT_MAX_THREADS;
743 }
744
745 /* parse options */
746 {
747 static grn_str_getopt_opt opts[] = {
748 {'c', NULL, NULL, 0, GETOPT_OP_NONE}, /* deprecated */
749 {'t', "n-threads", NULL, 0, GETOPT_OP_NONE},
750 {'h', "help", NULL, run_mode_usage, GETOPT_OP_UPDATE},
751 {'p', "port", NULL, 0, GETOPT_OP_NONE},
752 {'\0', "bind-address", NULL, 0, GETOPT_OP_NONE}, /* not supported yet */
753 {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE},
754 {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE},
755 {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE},
756 {'\0', "n-lines-per-log-file", NULL, 0, GETOPT_OP_NONE},
757 {'d', "daemon", NULL, run_mode_daemon, GETOPT_OP_UPDATE},
758 {'\0', "disable-max-fd-check", NULL, RUN_MODE_ENABLE_MAX_FD_CHECK,
759 GETOPT_OP_OFF},
760 {'\0', NULL, NULL, 0, 0}
761 };
762 opts[0].arg = &max_threads_string;
763 opts[1].arg = &max_threads_string;
764 opts[3].arg = &port_string;
765 opts[4].arg = &address;
766 opts[5].arg = &send_endpoint;
767 opts[6].arg = &recv_endpoint;
768 opts[7].arg = &log_base_path;
769 opts[8].arg = &n_lines_per_log_file_string;
770
771 n_processed_args = grn_str_getopt(argc, argv, opts, &flags);
772 }
773
774 /* main */
775 mode = (flags & RUN_MODE_MASK);
776 if (n_processed_args < 0 ||
777 (argc - n_processed_args) != 1 ||
778 mode == run_mode_error) {
779 usage(stderr);
780 return EXIT_FAILURE;
781 } else if (mode == run_mode_usage) {
782 usage(stdout);
783 return EXIT_SUCCESS;
784 } else {
785 grn_ctx ctx;
786 void *zmq_ctx;
787
788 if (max_threads_string) {
789 max_threads = atoi(max_threads_string);
790 if (max_threads > MAX_THREADS) {
791 print_error("too many threads. limit to %d.", MAX_THREADS);
792 max_threads = MAX_THREADS;
793 }
794 } else {
795 max_threads = default_max_threads;
796 }
797
798 if (port_string) {
799 port_no = atoi(port_string);
800 }
801
802 if (flags & RUN_MODE_ENABLE_MAX_FD_CHECK) {
803 /* check environment */
804 struct rlimit rlim;
805 if (!getrlimit(RLIMIT_NOFILE, &rlim)) {
806 if (rlim.rlim_max < MIN_MAX_FDS) {
807 print_error("too small max fds. %d required.", MIN_MAX_FDS);
808 return -1;
809 }
810 rlim.rlim_cur = rlim.rlim_cur;
811 setrlimit(RLIMIT_NOFILE, &rlim);
812 }
813 }
814
815 if (n_lines_per_log_file_string) {
816 int64_t n_lines;
817 n_lines = grn_atoll(n_lines_per_log_file_string,
818 n_lines_per_log_file_string + strlen(n_lines_per_log_file_string),
819 NULL);
820 if (n_lines < 0) {
821 print_error("--n-lines-per-log-file must be >= 0: <%s>",
822 n_lines_per_log_file_string);
823 return(EXIT_FAILURE);
824 }
825 if (n_lines > UINT32_MAX) {
826 print_error("--n-lines-per-log-file must be <= %ld: <%s>",
827 UINT32_MAX, n_lines_per_log_file_string);
828 return(EXIT_FAILURE);
829 }
830 n_lines_per_log_file = (uint32_t)n_lines;
831 }
832
833 if (mode == run_mode_daemon) {
834 daemonize();
835 }
836
837 grn_init();
838 grn_ctx_init(&ctx, 0);
839 if ((db = grn_db_open(&ctx, argv[n_processed_args]))) {
840 if ((zmq_ctx = zmq_init(1))) {
841 signal(SIGTERM, signal_handler);
842 signal(SIGINT, signal_handler);
843 signal(SIGQUIT, signal_handler);
844 signal(SIGUSR1, signal_reopen_log_file);
845
846 serve_threads(max_threads, port_no, argv[n_processed_args], zmq_ctx,
847 send_endpoint, recv_endpoint, log_base_path);
848 zmq_term(zmq_ctx);
849 } else {
850 print_error("cannot create zmq context.");
851 }
852 grn_obj_close(&ctx, db);
853 } else {
854 print_error("cannot open db.");
855 }
856 grn_ctx_fin(&ctx);
857 grn_fin();
858 }
859 return 0;
860 }
861