1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2010-2015 Brazil
3 
4   This library is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Lesser General Public
6   License version 2.1 as published by the Free Software Foundation.
7 
8   This library is distributed in the hope that it will be useful,
9   but WITHOUT ANY WARRANTY; without even the implied warranty of
10   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11   Lesser General Public License for more details.
12 
13   You should have received a copy of the GNU Lesser General Public
14   License along with this library; if not, write to the Free Software
15   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
16 */
17 
18 /* groonga origin headers */
19 #include <grn_str.h>
20 #include <grn_msgpack.h>
21 
22 #include <stdio.h>
23 #include <signal.h>
24 #include <string.h>
25 #include <sys/types.h>
26 #include <sys/time.h>
27 #include <time.h>
28 #include <stdlib.h>
29 #include <unistd.h>
30 #include <err.h>
31 
32 #include <fcntl.h>
33 #include <sys/queue.h>
34 #include <sys/socket.h>
35 #include <sys/types.h>
36 #include <netinet/in.h>
37 #include <sys/resource.h>
38 
39 #include "zmq_compatible.h"
40 #include <event.h>
41 #include <evhttp.h>
42 #include <groonga.h>
43 #include <pthread.h>
44 
45 #include "util.h"
46 
/* HTTP port used when -p/--port is not given. */
#define DEFAULT_PORT 8080
/* Worker count used when get_core_number() reports 0 (unknown). */
#define DEFAULT_MAX_THREADS 8

/*
 * Expands to the two arguments "pointer, length" for a string literal,
 * e.g. memcmp(p, CONST_STR_LEN("item_")) is memcmp(p, "item_", 5).
 * Only meaningful for string literals / char arrays: applied to a plain
 * char pointer, sizeof() yields the pointer size, not the string length.
 * The expansion is not parenthesized, so use it only directly inside an
 * argument list.
 */
#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0

/* Backlog passed to listen() for the shared HTTP socket. */
#define LISTEN_BACKLOG 756
/* Minimum hard RLIMIT_NOFILE required at startup unless
   --disable-max-fd-check is given. */
#define MIN_MAX_FDS 2048
/* Capacity of the threads[] array; --n-threads values above it are
   clamped. */
#define MAX_THREADS 128 /* max 256 */

/*
 * Run mode selected on the command line. The value lives in the low bits
 * of the getopt `flags` word (see RUN_MODE_MASK): -h/--help stores
 * run_mode_usage and -d/--daemon stores run_mode_daemon.
 */
typedef enum {
  run_mode_none = 0,
  run_mode_usage,
  run_mode_daemon,
  /* Checked in main(); not set anywhere in this file -- presumably by
     grn_str_getopt() on a parse error (confirm). */
  run_mode_error
} run_mode;

/* Bits of `flags` that hold the run_mode value. */
#define RUN_MODE_MASK                0x007f
/* Flag bit: verify RLIMIT_NOFILE at startup. Set by default in main() and
   cleared by --disable-max-fd-check. */
#define RUN_MODE_ENABLE_MAX_FD_CHECK 0x0080
65 
66 
/* Per-worker state for one HTTP serving thread (one slot of threads[]). */
typedef struct {
  /* groonga context owned by this worker */
  grn_ctx *ctx;
  /* NOTE(review): never assigned in this file; the global `db` is used */
  grn_obj *db;
  /* PUB socket used to publish requests to the learner, or NULL */
  void *zmq_sock;
  /* scratch buffer for building the suggest command */
  grn_obj cmd_buf;
  /* extra query parameters forwarded verbatim to the suggest command */
  grn_obj pass_through_parameters;
  /* thread running this worker's event loop */
  pthread_t thd;
  /* index of this slot in threads[]; also part of log file names */
  uint32_t thread_id;
  /* this worker's libevent base */
  struct event_base *base;
  /* HTTP server attached to `base` */
  struct evhttp *httpd;
  /* 1-second timer that lets the loop notice the shutdown flag */
  struct event pulse;
  /* prefix of request log file paths, or NULL to disable request logging */
  const char *log_base_path;
  /* currently open request log file, or NULL */
  FILE *log_file;
  /* number of lines written to log_file since it was opened */
  uint32_t log_count;
  /* Set by the SIGUSR1 handler; the worker closes log_file on its next
     logged request. NOTE(review): written from a signal handler but not a
     volatile sig_atomic_t. */
  grn_bool request_reopen_log_file;
} thd_data;

/* Arguments for the learner receive thread (recv_from_learner()). */
typedef struct {
  /* filled in by serve_threads() but not read by recv_from_learner() */
  const char *db_path;
  /* ZeroMQ endpoint the SUB socket connects to */
  const char *recv_endpoint;
  /* handle of the receive thread */
  pthread_t thd;
  /* ZeroMQ context used to create the SUB socket */
  void *zmq_ctx;
} recv_thd_data;
90 
/* NOTE(review): not referenced anywhere in this file. */
#define CMD_BUF_SIZE 1024

/* Per-worker state; only the first max_threads entries are used. */
static thd_data threads[MAX_THREADS];
/* Worker count shown by usage() and used when --n-threads is absent;
   main() replaces it with get_core_number() when that is non-zero. */
static uint32_t default_max_threads = DEFAULT_MAX_THREADS;
/* Effective number of workers; also read by the SIGUSR1 handler. */
static uint32_t max_threads;
/* Cleared by the termination signal handler to ask every thread to stop. */
static volatile sig_atomic_t loop = 1;
/* Database opened in main() and attached to every worker context. */
static grn_obj *db;
/* Close a worker's log file after this many lines; 0 disables rotation. */
static uint32_t n_lines_per_log_file = 1000000;
99 
100 static int
suggest_result(grn_ctx * ctx,struct evbuffer * res_buf,const char * types,const char * query,const char * target_name,int frequency_threshold,double conditional_probability_threshold,int limit,grn_obj * cmd_buf,grn_obj * pass_through_parameters)101 suggest_result(grn_ctx *ctx,
102                struct evbuffer *res_buf, const char *types, const char *query,
103                const char *target_name, int frequency_threshold,
104                double conditional_probability_threshold, int limit,
105                grn_obj *cmd_buf, grn_obj *pass_through_parameters)
106 {
107   if (target_name && types && query) {
108     GRN_BULK_REWIND(cmd_buf);
109     GRN_TEXT_PUTS(ctx, cmd_buf, "/d/suggest?table=item_");
110     grn_text_urlenc(ctx, cmd_buf, target_name, strlen(target_name));
111     GRN_TEXT_PUTS(ctx, cmd_buf, "&column=kana&types=");
112     grn_text_urlenc(ctx, cmd_buf, types, strlen(types));
113     GRN_TEXT_PUTS(ctx, cmd_buf, "&query=");
114     grn_text_urlenc(ctx, cmd_buf, query, strlen(query));
115     GRN_TEXT_PUTS(ctx, cmd_buf, "&frequency_threshold=");
116     grn_text_itoa(ctx, cmd_buf, frequency_threshold);
117     GRN_TEXT_PUTS(ctx, cmd_buf, "&conditional_probability_threshold=");
118     grn_text_ftoa(ctx, cmd_buf, conditional_probability_threshold);
119     GRN_TEXT_PUTS(ctx, cmd_buf, "&limit=");
120     grn_text_itoa(ctx, cmd_buf, limit);
121     if (GRN_TEXT_LEN(pass_through_parameters) > 0) {
122       GRN_TEXT_PUTS(ctx, cmd_buf, "&");
123       GRN_TEXT_PUT(ctx, cmd_buf,
124                    GRN_TEXT_VALUE(pass_through_parameters),
125                    GRN_TEXT_LEN(pass_through_parameters));
126     }
127     {
128       char *res;
129       int flags;
130       unsigned int res_len;
131 
132       grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), 0);
133       grn_ctx_recv(ctx, &res, &res_len, &flags);
134 
135       evbuffer_add(res_buf, res, res_len);
136       return res_len;
137     }
138   } else {
139     evbuffer_add(res_buf, "{}", 2);
140     return 2;
141   }
142 }
143 
144 static void
log_send(struct evkeyvalq * output_headers,struct evbuffer * res_buf,thd_data * thd,struct evkeyvalq * get_args)145 log_send(struct evkeyvalq *output_headers, struct evbuffer *res_buf,
146          thd_data *thd, struct evkeyvalq *get_args)
147 {
148   uint64_t millisec;
149   int frequency_threshold, limit;
150   double conditional_probability_threshold;
151   const char *callback, *types, *query, *client_id, *target_name,
152              *learn_target_name;
153 
154   GRN_BULK_REWIND(&(thd->pass_through_parameters));
155   parse_keyval(thd->ctx, get_args, &query, &types, &client_id, &target_name,
156                &learn_target_name, &callback, &millisec, &frequency_threshold,
157                &conditional_probability_threshold, &limit,
158                &(thd->pass_through_parameters));
159 
160   /* send data to learn client */
161   if (thd->zmq_sock && millisec && client_id && query && learn_target_name) {
162     char c;
163     size_t l;
164     msgpack_packer pk;
165     msgpack_sbuffer sbuf;
166     int cnt, submit_flag = 0;
167 
168     msgpack_sbuffer_init(&sbuf);
169     msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
170 
171     cnt = 4;
172     if (types && !strcmp(types, "submit")) {
173       cnt++;
174       types = NULL;
175       submit_flag = 1;
176     }
177     msgpack_pack_map(&pk, cnt);
178 
179     c = 'i';
180     msgpack_pack_str(&pk, 1);
181     msgpack_pack_str_body(&pk, &c, 1);
182     l = strlen(client_id);
183     msgpack_pack_str(&pk, l);
184     msgpack_pack_str_body(&pk, client_id, l);
185 
186     c = 'q';
187     msgpack_pack_str(&pk, 1);
188     msgpack_pack_str_body(&pk, &c, 1);
189     l = strlen(query);
190     msgpack_pack_str(&pk, l);
191     msgpack_pack_str_body(&pk, query, l);
192 
193     c = 's';
194     msgpack_pack_str(&pk, 1);
195     msgpack_pack_str_body(&pk, &c, 1);
196     msgpack_pack_uint64(&pk, millisec);
197 
198     c = 'l';
199     msgpack_pack_str(&pk, 1);
200     msgpack_pack_str_body(&pk, &c, 1);
201     l = strlen(learn_target_name);
202     msgpack_pack_str(&pk, l);
203     msgpack_pack_str_body(&pk, learn_target_name, l);
204 
205     if (submit_flag) {
206       c = 't';
207       msgpack_pack_str(&pk, 1);
208       msgpack_pack_str_body(&pk, &c, 1);
209       msgpack_pack_true(&pk);
210     }
211     {
212       zmq_msg_t msg;
213       if (!zmq_msg_init_size(&msg, sbuf.size)) {
214         memcpy((void *)zmq_msg_data(&msg), sbuf.data, sbuf.size);
215         if (zmq_msg_send(&msg, thd->zmq_sock, 0) == -1) {
216           print_error("zmq_msg_send() error");
217         }
218         zmq_msg_close(&msg);
219       }
220     }
221     msgpack_sbuffer_destroy(&sbuf);
222   }
223   /* make result */
224   {
225     int content_length;
226     if (callback) {
227       evhttp_add_header(output_headers,
228                         "Content-Type", "text/javascript; charset=UTF-8");
229       content_length = strlen(callback);
230       evbuffer_add(res_buf, callback, content_length);
231       evbuffer_add(res_buf, "(", 1);
232       content_length += suggest_result(thd->ctx,
233                                        res_buf, types, query, target_name,
234                                        frequency_threshold,
235                                        conditional_probability_threshold,
236                                        limit,
237                                        &(thd->cmd_buf),
238                                        &(thd->pass_through_parameters)) + 3;
239       evbuffer_add(res_buf, ");", 2);
240     } else {
241       evhttp_add_header(output_headers,
242                         "Content-Type", "application/json; charset=UTF-8");
243       content_length = suggest_result(thd->ctx,
244                                       res_buf, types, query, target_name,
245                                       frequency_threshold,
246                                       conditional_probability_threshold,
247                                       limit,
248                                       &(thd->cmd_buf),
249                                       &(thd->pass_through_parameters));
250     }
251     if (content_length >= 0) {
252 #define NUM_BUF_SIZE 16
253       char num_buf[NUM_BUF_SIZE];
254       grn_snprintf(num_buf, NUM_BUF_SIZE, NUM_BUF_SIZE, "%d", content_length);
255       evhttp_add_header(output_headers, "Content-Length", num_buf);
256 #undef NUM_BUF_SIZE
257     }
258   }
259 }
260 
261 static void
cleanup_httpd_thread(thd_data * thd)262 cleanup_httpd_thread(thd_data *thd) {
263   if (thd->log_file) {
264     fclose(thd->log_file);
265   }
266   if (thd->httpd) {
267     evhttp_free(thd->httpd);
268   }
269   if (thd->zmq_sock) {
270     zmq_close(thd->zmq_sock);
271   }
272   grn_obj_unlink(thd->ctx, &(thd->cmd_buf));
273   grn_obj_unlink(thd->ctx, &(thd->pass_through_parameters));
274   if (thd->ctx) {
275     grn_ctx_close(thd->ctx);
276   }
277   event_base_free(thd->base);
278 }
279 
280 static void
close_log_file(thd_data * thread)281 close_log_file(thd_data *thread)
282 {
283   fclose(thread->log_file);
284   thread->log_file = NULL;
285   thread->request_reopen_log_file = GRN_FALSE;
286 }
287 
288 static void
generic_handler(struct evhttp_request * req,void * arg)289 generic_handler(struct evhttp_request *req, void *arg)
290 {
291   struct evkeyvalq args;
292   thd_data *thd = arg;
293 
294   if (!loop) {
295     event_base_loopexit(thd->base, NULL);
296     return;
297   }
298   if (!req->uri) { return; }
299 
300   evhttp_parse_query(req->uri, &args);
301   {
302     struct evbuffer *res_buf;
303     if (!(res_buf = evbuffer_new())) {
304       err(1, "failed to create response buffer");
305     }
306 
307     evhttp_add_header(req->output_headers, "Connection", "close");
308 
309     log_send(req->output_headers, res_buf, thd, &args);
310     evhttp_send_reply(req, HTTP_OK, "OK", res_buf);
311     evbuffer_free(res_buf);
312     /* logging */
313     {
314       if (thd->log_base_path) {
315         if (thd->log_file && thd->request_reopen_log_file) {
316           close_log_file(thd);
317         }
318         if (!thd->log_file) {
319           time_t n;
320           struct tm *t_st;
321           char p[PATH_MAX + 1];
322 
323           time(&n);
324           t_st = localtime(&n);
325 
326           grn_snprintf(p,
327                        PATH_MAX,
328                        PATH_MAX,
329                        "%s%04d%02d%02d%02d%02d%02d-%02d",
330                        thd->log_base_path,
331                        t_st->tm_year + 1900,
332                        t_st->tm_mon + 1,
333                        t_st->tm_mday,
334                        t_st->tm_hour,
335                        t_st->tm_min,
336                        t_st->tm_sec,
337                        thd->thread_id);
338 
339           if (!(thd->log_file = fopen(p, "a"))) {
340             print_error("cannot open log_file %s.", p);
341           } else {
342             thd->log_count = 0;
343           }
344         }
345         if (thd->log_file) {
346           fprintf(thd->log_file, "%s\n", req->uri);
347           thd->log_count++;
348           if (n_lines_per_log_file > 0 &&
349               thd->log_count >= n_lines_per_log_file) {
350             close_log_file(thd);
351           }
352         }
353       }
354     }
355   }
356   evhttp_clear_headers(&args);
357 }
358 
359 static int
bind_socket(int port)360 bind_socket(int port)
361 {
362   int nfd;
363   if ((nfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
364     print_error("cannot open socket for http.");
365     return -1;
366   } else {
367     int r, one = 1;
368     struct sockaddr_in addr;
369 
370     r = setsockopt(nfd, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(int));
371     memset(&addr, 0, sizeof(addr));
372     addr.sin_family = AF_INET;
373     addr.sin_addr.s_addr = INADDR_ANY;
374     addr.sin_port = htons(port);
375 
376     if ((r = bind(nfd, (struct sockaddr *)&addr, sizeof(addr))) < 0) {
377       print_error("cannot bind socket for http.");
378       return r;
379     }
380     if ((r = listen(nfd, LISTEN_BACKLOG)) < 0) {
381       print_error("cannot listen socket for http.");
382       return r;
383     }
384     if ((r = fcntl(nfd, F_GETFL, 0)) < 0 || fcntl(nfd, F_SETFL, r | O_NONBLOCK) < 0 ) {
385       print_error("cannot fcntl socket for http.");
386       return -1;
387     }
388     return nfd;
389   }
390 }
391 
392 static void
signal_handler(int sig)393 signal_handler(int sig)
394 {
395   loop = 0;
396 }
397 
398 static void
signal_reopen_log_file(int sig)399 signal_reopen_log_file(int sig)
400 {
401   uint32_t i;
402 
403   for (i = 0; i < max_threads; i++) {
404     threads[i].request_reopen_log_file = GRN_TRUE;
405   }
406 }
407 
408 void
timeout_handler(int fd,short events,void * arg)409 timeout_handler(int fd, short events, void *arg) {
410   thd_data *thd = arg;
411   if (!loop) {
412     event_base_loopexit(thd->base, NULL);
413   } else {
414     struct timeval tv = {1, 0};
415     evtimer_add(&(thd->pulse), &tv);
416   }
417 }
418 
/*
 * pthread entry point for a worker: run the event loop whose
 * struct event_base is passed as `arg` until it exits. Always returns NULL.
 */
static void *
dispatch(void *arg)
{
  struct event_base *base = arg;

  (void)event_base_dispatch(base);
  return NULL;
}
425 
426 static void
msgpack2json(msgpack_object * o,grn_ctx * ctx,grn_obj * buf)427 msgpack2json(msgpack_object *o, grn_ctx *ctx, grn_obj *buf)
428 {
429   switch (o->type) {
430   case MSGPACK_OBJECT_POSITIVE_INTEGER:
431     grn_text_ulltoa(ctx, buf, o->via.u64);
432     break;
433   case MSGPACK_OBJECT_STR:
434     grn_text_esc(ctx, buf,
435                  MSGPACK_OBJECT_STR_PTR(o),
436                  MSGPACK_OBJECT_STR_SIZE(o));
437     break;
438   case MSGPACK_OBJECT_ARRAY:
439     GRN_TEXT_PUTC(ctx, buf, '[');
440     {
441       int i;
442       for (i = 0; i < o->via.array.size; i++) {
443         msgpack2json(o->via.array.ptr, ctx, buf);
444       }
445     }
446     GRN_TEXT_PUTC(ctx, buf, ']');
447     break;
448   case MSGPACK_OBJECT_FLOAT:
449     grn_text_ftoa(ctx, buf, MSGPACK_OBJECT_FLOAT_VALUE(o));
450     break;
451   default:
452     print_error("cannot handle this msgpack type.");
453   }
454 }
455 
456 static void
load_from_learner(msgpack_object * o,grn_ctx * ctx,grn_obj * cmd_buf)457 load_from_learner(msgpack_object *o, grn_ctx *ctx, grn_obj *cmd_buf)
458 {
459   if (o->type == MSGPACK_OBJECT_MAP && o->via.map.size) {
460     msgpack_object_kv *kv;
461     msgpack_object *key;
462     msgpack_object *value;
463     kv = &(o->via.map.ptr[0]);
464     key = &(kv->key);
465     value = &(kv->val);
466     if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) == 6 &&
467         !memcmp(MSGPACK_OBJECT_STR_PTR(key), CONST_STR_LEN("target"))) {
468       if (value->type == MSGPACK_OBJECT_STR) {
469         int i;
470         GRN_BULK_REWIND(cmd_buf);
471         GRN_TEXT_PUTS(ctx, cmd_buf, "load --table ");
472         GRN_TEXT_PUT(ctx, cmd_buf,
473                      MSGPACK_OBJECT_STR_PTR(value),
474                      MSGPACK_OBJECT_STR_SIZE(value));
475         grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
476         grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
477         if (MSGPACK_OBJECT_STR_SIZE(value) > 5) {
478           if (!memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("item_")) ||
479               !memcmp(MSGPACK_OBJECT_STR_PTR(value), CONST_STR_LEN("pair_"))) {
480             char delim = '{';
481             GRN_BULK_REWIND(cmd_buf);
482             for (i = 1; i < o->via.map.size; i++) {
483               GRN_TEXT_PUTC(ctx, cmd_buf, delim);
484               kv = &(o->via.map.ptr[i]);
485               msgpack2json(&(kv->key), ctx, cmd_buf);
486               GRN_TEXT_PUTC(ctx, cmd_buf, ':');
487               msgpack2json(&(kv->val), ctx, cmd_buf);
488               delim = ',';
489             }
490             GRN_TEXT_PUTC(ctx, cmd_buf, '}');
491             /* printf("msg: %.*s\n", GRN_TEXT_LEN(cmd_buf), GRN_TEXT_VALUE(cmd_buf)); */
492             grn_ctx_send(ctx, GRN_TEXT_VALUE(cmd_buf), GRN_TEXT_LEN(cmd_buf), GRN_CTX_MORE);
493           }
494         }
495         grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
496         {
497           char *res;
498           int flags;
499           unsigned int res_len;
500           grn_ctx_recv(ctx, &res, &res_len, &flags);
501         }
502       }
503     }
504   }
505 }
506 
507 static void
recv_handler(grn_ctx * ctx,void * zmq_recv_sock,msgpack_zone * mempool,grn_obj * cmd_buf)508 recv_handler(grn_ctx *ctx, void *zmq_recv_sock, msgpack_zone *mempool, grn_obj *cmd_buf)
509 {
510   zmq_msg_t msg;
511 
512   if (zmq_msg_init(&msg)) {
513     print_error("cannot init zmq message.");
514   } else {
515     if (zmq_msg_recv(&msg, zmq_recv_sock, 0) == -1) {
516       print_error("cannot recv zmq message.");
517     } else {
518       msgpack_object obj;
519       msgpack_unpack_return ret;
520 
521       ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
522       if (MSGPACK_UNPACK_SUCCESS == ret) {
523         load_from_learner(&obj, ctx, cmd_buf);
524       } else {
525         print_error("invalid recv data.");
526       }
527       msgpack_zone_clear(mempool);
528     }
529     zmq_msg_close(&msg);
530   }
531 }
532 
533 static void *
recv_from_learner(void * arg)534 recv_from_learner(void *arg)
535 {
536   void *zmq_recv_sock;
537   recv_thd_data *thd = arg;
538 
539   if ((zmq_recv_sock = zmq_socket(thd->zmq_ctx, ZMQ_SUB))) {
540     if (!zmq_connect(zmq_recv_sock, thd->recv_endpoint)) {
541       grn_ctx ctx;
542       if (!grn_ctx_init(&ctx, 0)) {
543         if ((!grn_ctx_use(&ctx, db))) {
544           msgpack_zone *mempool;
545           if ((mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
546             grn_obj cmd_buf;
547             zmq_pollitem_t items[] = {
548               { zmq_recv_sock, 0, ZMQ_POLLIN, 0}
549             };
550             GRN_TEXT_INIT(&cmd_buf, 0);
551             zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
552             while (loop) {
553               zmq_poll(items, 1, 10000);
554               if (items[0].revents & ZMQ_POLLIN) {
555                 recv_handler(&ctx, zmq_recv_sock, mempool, &cmd_buf);
556               }
557             }
558             grn_obj_unlink(&ctx, &cmd_buf);
559             msgpack_zone_free(mempool);
560           } else {
561             print_error("cannot create msgpack zone.");
562           }
563           /* db_close */
564         } else {
565           print_error("error in grn_db_open() on recv thread.");
566         }
567         grn_ctx_fin(&ctx);
568       } else {
569         print_error("error in grn_ctx_init() on recv thread.");
570       }
571     } else {
572       print_error("cannot create recv zmq_socket.");
573     }
574   } else {
575     print_error("cannot connect zmq_socket.");
576   }
577   return NULL;
578 }
579 
580 static int
serve_threads(int nthreads,int port,const char * db_path,void * zmq_ctx,const char * send_endpoint,const char * recv_endpoint,const char * log_base_path)581 serve_threads(int nthreads, int port, const char *db_path, void *zmq_ctx,
582               const char *send_endpoint, const char *recv_endpoint,
583               const char *log_base_path)
584 {
585   int nfd;
586   uint32_t i;
587   if ((nfd = bind_socket(port)) < 0) {
588     print_error("cannot bind socket. please check port number with netstat.");
589     return -1;
590   }
591 
592   for (i = 0; i < nthreads; i++) {
593     memset(&threads[i], 0, sizeof(threads[i]));
594     threads[i].request_reopen_log_file = GRN_FALSE;
595     if (!(threads[i].base = event_init())) {
596       print_error("error in event_init() on thread %d.", i);
597     } else {
598       if (!(threads[i].httpd = evhttp_new(threads[i].base))) {
599         print_error("error in evhttp_new() on thread %d.", i);
600       } else {
601         int r;
602         if ((r = evhttp_accept_socket(threads[i].httpd, nfd))) {
603           print_error("error in evhttp_accept_socket() on thread %d.", i);
604         } else {
605           if (send_endpoint) {
606             if (!(threads[i].zmq_sock = zmq_socket(zmq_ctx, ZMQ_PUB))) {
607               print_error("cannot create zmq_socket.");
608             } else if (zmq_connect(threads[i].zmq_sock, send_endpoint)) {
609               print_error("cannot connect zmq_socket.");
610               zmq_close(threads[i].zmq_sock);
611               threads[i].zmq_sock = NULL;
612             } else {
613               uint64_t hwm = 1;
614               zmq_setsockopt(threads[i].zmq_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
615             }
616           } else {
617             threads[i].zmq_sock = NULL;
618           }
619           if (!(threads[i].ctx = grn_ctx_open(0))) {
620             print_error("error in grn_ctx_open() on thread %d.", i);
621           } else if (grn_ctx_use(threads[i].ctx, db)) {
622             print_error("error in grn_db_open() on thread %d.", i);
623           } else {
624             GRN_TEXT_INIT(&(threads[i].cmd_buf), 0);
625             GRN_TEXT_INIT(&(threads[i].pass_through_parameters), 0);
626             threads[i].log_base_path = log_base_path;
627             threads[i].thread_id = i;
628             evhttp_set_gencb(threads[i].httpd, generic_handler, &threads[i]);
629             evhttp_set_timeout(threads[i].httpd, 10);
630             {
631               struct timeval tv = {1, 0};
632               evtimer_set(&(threads[i].pulse), timeout_handler, &threads[i]);
633               evtimer_add(&(threads[i].pulse), &tv);
634             }
635             if ((r = pthread_create(&(threads[i].thd), NULL, dispatch, threads[i].base))) {
636               print_error("error in pthread_create() on thread %d.", i);
637             }
638           }
639         }
640       }
641     }
642   }
643 
644   /* recv thread from learner */
645   if (recv_endpoint) {
646     recv_thd_data rthd;
647     rthd.db_path = db_path;
648     rthd.recv_endpoint = recv_endpoint;
649     rthd.zmq_ctx = zmq_ctx;
650 
651     if (pthread_create(&(rthd.thd), NULL, recv_from_learner, &rthd)) {
652       print_error("error in pthread_create() on thread %d.", i);
653     }
654     if (pthread_join(rthd.thd, NULL)) {
655       print_error("error in pthread_join() on thread %d.", i);
656     }
657   } else {
658     while (loop) { sleep(1); }
659   }
660 
661   /* join all httpd thread */
662   for (i = 0; i < nthreads; i++) {
663     if (threads[i].thd) {
664       if (pthread_join(threads[i].thd, NULL)) {
665         print_error("error in pthread_join() on thread %d.", i);
666       }
667     }
668     cleanup_httpd_thread(&(threads[i]));
669   }
670   return 0;
671 }
672 
/*
 * Return the processor count to use as the default worker count, or 0 when
 * it is unknown (main() then falls back to DEFAULT_MAX_THREADS).
 *
 * Detection is compiled in only when ACTUALLY_GET_CORE_NUMBER is defined;
 * otherwise this always returns 0.
 */
static uint32_t
get_core_number(void)
{
#ifdef ACTUALLY_GET_CORE_NUMBER
#ifdef _SC_NPROCESSORS_CONF
  {
    /* sysconf() returns -1 on failure; converted to uint32_t that would be
       a huge thread count, so report "unknown" instead. */
    long n_processors = sysconf(_SC_NPROCESSORS_CONF);
    if (n_processors > 0) {
      return (uint32_t)n_processors;
    }
  }
#else /* _SC_NPROCESSORS_CONF */
  {
    int n_processors;
    size_t length = sizeof(n_processors);
    int mib[] = {CTL_HW, HW_NCPU};
    if (sysctl(mib, sizeof(mib) / sizeof(mib[0]),
               &n_processors, &length, NULL, 0) == 0 &&
        length == sizeof(n_processors) &&
        0 < n_processors) {
      return n_processors;
    } else {
      return 1;
    }
  }
#endif /* _SC_NPROCESSORS_CONF */
#endif /* ACTUALLY_GET_CORE_NUMBER */
  return 0;
}
695 
696 static void
usage(FILE * output)697 usage(FILE *output)
698 {
699   fprintf(
700     output,
701     "Usage: groonga-suggest-httpd [options...] db_path\n"
702     "db_path:\n"
703     "  specify groonga database path which is used for suggestion.\n"
704     "\n"
705     "options:\n"
706     "  -p, --port <port number>                  : http server port number\n"
707     "                                              (default: %d)\n"
708     /*
709     "  --address <ip/hostname>                   : server address to listen\n"
710     "                                              (default: %s)\n"
711     */
712     "  -c <thread number>                        : number of server threads\n"
713     "                                              (deprecated. use --n-threads)\n"
714     "  -t, --n-threads <thread number>           : number of server threads\n"
715     "                                              (default: %d)\n"
716     "  -s, --send-endpoint <send endpoint>       : send endpoint\n"
717     "                                              (ex. tcp://example.com:1234)\n"
718     "  -r, --receive-endpoint <receive endpoint> : receive endpoint\n"
719     "                                              (ex. tcp://example.com:1235)\n"
720     "  -l, --log-base-path <path prefix>         : log path prefix\n"
721     "  --n-lines-per-log-file <lines number>     : number of lines in a log file\n"
722     "                                              use 0 for disabling this\n"
723     "                                              (default: %d)\n"
724     "  -d, --daemon                              : daemonize\n"
725     "  --disable-max-fd-check                    : disable max FD check on start\n"
726     "  -h, --help                                : show this message\n",
727     DEFAULT_PORT, default_max_threads, n_lines_per_log_file);
728 }
729 
730 int
main(int argc,char ** argv)731 main(int argc, char **argv)
732 {
733   int port_no = DEFAULT_PORT;
734   const char *max_threads_string = NULL, *port_string = NULL;
735   const char *address;
736   const char *send_endpoint = NULL, *recv_endpoint = NULL, *log_base_path = NULL;
737   const char *n_lines_per_log_file_string = NULL;
738   int n_processed_args, flags = RUN_MODE_ENABLE_MAX_FD_CHECK;
739   run_mode mode = run_mode_none;
740 
741   if (!(default_max_threads = get_core_number())) {
742     default_max_threads = DEFAULT_MAX_THREADS;
743   }
744 
745   /* parse options */
746   {
747     static grn_str_getopt_opt opts[] = {
748       {'c', NULL, NULL, 0, GETOPT_OP_NONE}, /* deprecated */
749       {'t', "n-threads", NULL, 0, GETOPT_OP_NONE},
750       {'h', "help", NULL, run_mode_usage, GETOPT_OP_UPDATE},
751       {'p', "port", NULL, 0, GETOPT_OP_NONE},
752       {'\0', "bind-address", NULL, 0, GETOPT_OP_NONE}, /* not supported yet */
753       {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE},
754       {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE},
755       {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE},
756       {'\0', "n-lines-per-log-file", NULL, 0, GETOPT_OP_NONE},
757       {'d', "daemon", NULL, run_mode_daemon, GETOPT_OP_UPDATE},
758       {'\0', "disable-max-fd-check", NULL, RUN_MODE_ENABLE_MAX_FD_CHECK,
759        GETOPT_OP_OFF},
760       {'\0', NULL, NULL, 0, 0}
761     };
762     opts[0].arg = &max_threads_string;
763     opts[1].arg = &max_threads_string;
764     opts[3].arg = &port_string;
765     opts[4].arg = &address;
766     opts[5].arg = &send_endpoint;
767     opts[6].arg = &recv_endpoint;
768     opts[7].arg = &log_base_path;
769     opts[8].arg = &n_lines_per_log_file_string;
770 
771     n_processed_args = grn_str_getopt(argc, argv, opts, &flags);
772   }
773 
774   /* main */
775   mode = (flags & RUN_MODE_MASK);
776   if (n_processed_args < 0 ||
777       (argc - n_processed_args) != 1 ||
778       mode == run_mode_error) {
779     usage(stderr);
780     return EXIT_FAILURE;
781   } else if (mode == run_mode_usage) {
782     usage(stdout);
783     return EXIT_SUCCESS;
784   } else {
785     grn_ctx ctx;
786     void *zmq_ctx;
787 
788     if (max_threads_string) {
789       max_threads = atoi(max_threads_string);
790       if (max_threads > MAX_THREADS) {
791         print_error("too many threads. limit to %d.", MAX_THREADS);
792         max_threads = MAX_THREADS;
793       }
794     } else {
795       max_threads = default_max_threads;
796     }
797 
798     if (port_string) {
799       port_no = atoi(port_string);
800     }
801 
802     if (flags & RUN_MODE_ENABLE_MAX_FD_CHECK) {
803       /* check environment */
804       struct rlimit rlim;
805       if (!getrlimit(RLIMIT_NOFILE, &rlim)) {
806         if (rlim.rlim_max < MIN_MAX_FDS) {
807           print_error("too small max fds. %d required.", MIN_MAX_FDS);
808           return -1;
809         }
810         rlim.rlim_cur = rlim.rlim_cur;
811         setrlimit(RLIMIT_NOFILE, &rlim);
812       }
813     }
814 
815     if (n_lines_per_log_file_string) {
816       int64_t n_lines;
817       n_lines = grn_atoll(n_lines_per_log_file_string,
818                           n_lines_per_log_file_string + strlen(n_lines_per_log_file_string),
819                           NULL);
820       if (n_lines < 0) {
821         print_error("--n-lines-per-log-file must be >= 0: <%s>",
822                     n_lines_per_log_file_string);
823         return(EXIT_FAILURE);
824       }
825       if (n_lines > UINT32_MAX) {
826         print_error("--n-lines-per-log-file must be <= %ld: <%s>",
827                     UINT32_MAX, n_lines_per_log_file_string);
828         return(EXIT_FAILURE);
829       }
830       n_lines_per_log_file = (uint32_t)n_lines;
831     }
832 
833     if (mode == run_mode_daemon) {
834       daemonize();
835     }
836 
837     grn_init();
838     grn_ctx_init(&ctx, 0);
839     if ((db = grn_db_open(&ctx, argv[n_processed_args]))) {
840       if ((zmq_ctx = zmq_init(1))) {
841         signal(SIGTERM, signal_handler);
842         signal(SIGINT, signal_handler);
843         signal(SIGQUIT, signal_handler);
844         signal(SIGUSR1, signal_reopen_log_file);
845 
846         serve_threads(max_threads, port_no, argv[n_processed_args], zmq_ctx,
847                       send_endpoint, recv_endpoint, log_base_path);
848         zmq_term(zmq_ctx);
849       } else {
850         print_error("cannot create zmq context.");
851       }
852       grn_obj_close(&ctx, db);
853     } else {
854       print_error("cannot open db.");
855     }
856     grn_ctx_fin(&ctx);
857     grn_fin();
858   }
859   return 0;
860 }
861