1 /* -*- c-basic-offset: 2 -*- */
2 /*
3   Copyright(C) 2009-2017 Brazil
4 
5   This library is free software; you can redistribute it and/or
6   modify it under the terms of the GNU Lesser General Public
7   License version 2.1 as published by the Free Software Foundation.
8 
9   This library is distributed in the hope that it will be useful,
10   but WITHOUT ANY WARRANTY; without even the implied warranty of
11   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12   Lesser General Public License for more details.
13 
14   You should have received a copy of the GNU Lesser General Public
15   License along with this library; if not, write to the Free Software
16   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
17 */
18 
19 #include <string.h>
20 #include <stdio.h>
21 #include <ctype.h>
22 #include <fcntl.h>
23 #include <sys/stat.h>
24 #include <errno.h>
25 
26 #ifdef WIN32
27 # define GROONGA_MAIN
28 #endif /* WIN32 */
29 #include <grn.h>
30 
31 #include <grn_com.h>
32 #include <grn_ctx_impl.h>
33 #include <grn_proc.h>
34 #include <grn_db.h>
35 #include <grn_util.h>
36 #include <grn_error.h>
37 
38 #ifdef HAVE_SYS_WAIT_H
39 # include <sys/wait.h>
40 #endif /* HAVE_SYS_WAIT_H */
41 #ifdef HAVE_SYS_SOCKET_H
42 # include <sys/socket.h>
43 #endif /* HAVE_SYS_SOCKET_H */
44 #ifndef WIN32
45 # include <netinet/in.h>
46 #endif /* WIN32 */
47 
48 #ifdef HAVE_SYS_RESOURCE_H
49 # include <sys/resource.h>
50 #endif /* HAVE_SYS_RESOURCE_H */
51 
52 #ifdef HAVE_SYS_SYSCTL_H
53 # include <sys/sysctl.h>
54 #endif /* HAVE_SYS_SYSCTL_H */
55 
56 #ifdef WIN32
57 # include <io.h>
58 # include <direct.h>
59 #else /* WIN32 */
60 # include <sys/uio.h>
61 #endif /* WIN32 */
62 
63 #ifndef USE_MSG_NOSIGNAL
64 # ifdef MSG_NOSIGNAL
65 #  undef MSG_NOSIGNAL
66 # endif
67 # define MSG_NOSIGNAL 0
68 #endif /* USE_MSG_NOSIGNAL */
69 
70 #ifndef STDIN_FILENO
71 # define STDIN_FILENO 0
72 #endif /* STDIN_FILENO */
73 #ifndef STDOUT_FILENO
74 # define STDOUT_FILENO 1
75 #endif /* STDOUT_FILENO */
76 #ifndef STDERR_FILENO
77 # define STDERR_FILENO 2
78 #endif /* STDERR_FILENO */
79 
80 #define DEFAULT_HTTP_PORT 10041
81 #define DEFAULT_GQTP_PORT 10043
82 #define DEFAULT_DEST "localhost"
83 #define DEFAULT_MAX_N_FLOATING_THREADS 8
84 #define MAX_CON 0x10000
85 
86 #define RLIMIT_NOFILE_MINIMUM 4096
87 
88 static char bind_address[HOST_NAME_MAX + 1];
89 static char hostname[HOST_NAME_MAX + 1];
90 static int port = DEFAULT_GQTP_PORT;
91 static int batchmode;
92 static int number_of_lines = 0;
93 static int newdb;
94 static grn_bool is_daemon_mode = GRN_FALSE;
95 static int (*do_client)(int argc, char **argv);
96 static int (*do_server)(char *path);
97 static const char *pid_file_path = NULL;
98 static const char *input_path = NULL;
99 static grn_file_reader *input_reader = NULL;
100 static FILE *output = NULL;
101 static grn_bool is_memcached_mode = GRN_FALSE;
102 static const char *memcached_column_name = NULL;
103 
104 static int ready_notify_pipe[2];
105 #define PIPE_READ  0
106 #define PIPE_WRITE 1
107 
108 static grn_encoding encoding;
109 static const char *windows_event_source_name = "Groonga";
110 static grn_bool use_windows_event_log = GRN_FALSE;
111 static grn_obj http_response_server_line;
112 
113 static int
grn_rc_to_exit_code(grn_rc rc)114 grn_rc_to_exit_code(grn_rc rc)
115 {
116   if (rc == GRN_SUCCESS) {
117     return EXIT_SUCCESS;
118   } else {
119     return EXIT_FAILURE;
120   }
121 }
122 
123 static void
break_accept_event_loop(grn_ctx * ctx)124 break_accept_event_loop(grn_ctx *ctx)
125 {
126   grn_com *client;
127   const char *address;
128 
129   if (strcmp(bind_address, "0.0.0.0") == 0) {
130     address = "127.0.0.1";
131   } else if (strcmp(bind_address, "::") == 0) {
132     address = "::1";
133     } else {
134     address = bind_address;
135   }
136   client = grn_com_copen(ctx, NULL, address, port);
137   if (client) {
138     grn_com_close(ctx, client);
139   }
140 }
141 
142 #ifdef GRN_WITH_LIBEDIT
143 #include <locale.h>
144 #include <histedit.h>
145 static EditLine   *line_editor = NULL;
146 static HistoryW   *line_editor_history = NULL;
147 static HistEventW line_editor_history_event;
148 static char       line_editor_history_path[PATH_MAX] = "";
149 
150 static const wchar_t *
line_editor_prompt(EditLine * e)151 line_editor_prompt(EditLine *e __attribute__((unused)))
152 {
153   return L"> ";
154 }
155 static const wchar_t * const line_editor_editor = L"emacs";
156 
157 static void
line_editor_init(int argc,char * argv[])158 line_editor_init(int argc __attribute__((unused)), char *argv[])
159 {
160   const char * const HOME_PATH = getenv("HOME");
161   const char * const HISTORY_PATH = "/.groonga-history";
162 
163   setlocale(LC_ALL, "");
164 
165   if (strlen(HOME_PATH) + strlen(HISTORY_PATH) < PATH_MAX) {
166     grn_strcpy(line_editor_history_path, PATH_MAX, HOME_PATH);
167     grn_strcat(line_editor_history_path, PATH_MAX, HISTORY_PATH);
168   } else {
169     line_editor_history_path[0] = '\0';
170   }
171 
172   line_editor_history = history_winit();
173   history_w(line_editor_history, &line_editor_history_event, H_SETSIZE, 200);
174   if (line_editor_history_path[0]) {
175     history_w(line_editor_history, &line_editor_history_event,
176               H_LOAD, line_editor_history_path);
177   }
178 
179   line_editor = el_init(argv[0], stdin, stdout, stderr);
180   el_wset(line_editor, EL_PROMPT, &line_editor_prompt);
181   el_wset(line_editor, EL_EDITOR, line_editor_editor);
182   el_wset(line_editor, EL_HIST, history_w, line_editor_history);
183   el_source(line_editor, NULL);
184 }
185 
186 static void
line_editor_fin(void)187 line_editor_fin(void)
188 {
189   if (line_editor) {
190     el_end(line_editor);
191     if (line_editor_history) {
192       if (line_editor_history_path[0]) {
193         history_w(line_editor_history, &line_editor_history_event,
194                   H_SAVE, line_editor_history_path);
195       }
196       history_wend(line_editor_history);
197     }
198   }
199 }
200 
201 static grn_rc
line_editor_fgets(grn_ctx * ctx,grn_obj * buf)202 line_editor_fgets(grn_ctx *ctx, grn_obj *buf)
203 {
204   grn_rc rc = GRN_SUCCESS;
205   const wchar_t *line;
206   int nchar;
207   line = el_wgets(line_editor, &nchar);
208   if (nchar > 0) {
209     int i;
210     char multibyte_buf[MB_CUR_MAX];
211     size_t multibyte_len;
212     mbstate_t ps;
213     history_w(line_editor_history, &line_editor_history_event, H_ENTER, line);
214     memset(&ps, 0, sizeof(ps));
215     wcrtomb(NULL, L'\0', &ps);
216     for (i = 0; i < nchar; i++) {
217       multibyte_len = wcrtomb(multibyte_buf, line[i], &ps);
218       if (multibyte_len == (size_t)-1) {
219         GRN_LOG(ctx, GRN_LOG_WARNING,
220                 "[prompt][libedit] failed to read input: %s", strerror(errno));
221         rc = GRN_INVALID_ARGUMENT;
222       } else {
223         GRN_TEXT_PUT(ctx, buf, multibyte_buf, multibyte_len);
224       }
225     }
226   } else {
227     rc = GRN_END_OF_DATA;
228   }
229   return rc;
230 }
231 #endif /* GRN_WITH_LIBEDIT */
232 
233 inline static grn_rc
read_next_line(grn_ctx * ctx,grn_obj * buf)234 read_next_line(grn_ctx *ctx, grn_obj *buf)
235 {
236   static int the_first_read = GRN_TRUE;
237   grn_rc rc = GRN_SUCCESS;
238   if (!batchmode) {
239 #ifdef GRN_WITH_LIBEDIT
240     rc = line_editor_fgets(ctx, buf);
241 #else
242     fprintf(stderr, "> ");
243     fflush(stderr);
244     rc = grn_file_reader_read_line(ctx, input_reader, buf);
245 #endif
246   } else {
247     rc = grn_file_reader_read_line(ctx, input_reader, buf);
248     if (rc != GRN_END_OF_DATA) {
249       number_of_lines++;
250     }
251   }
252   if (the_first_read && GRN_TEXT_LEN(buf) > 0) {
253     const char bom[] = {0xef, 0xbb, 0xbf};
254     if (GRN_CTX_GET_ENCODING(ctx) == GRN_ENC_UTF8 &&
255         GRN_TEXT_LEN(buf) > 3 && !memcmp(GRN_TEXT_VALUE(buf), bom, 3)) {
256       grn_obj buf_without_bom;
257       GRN_TEXT_INIT(&buf_without_bom, 0);
258       GRN_TEXT_PUT(ctx, &buf_without_bom,
259                    GRN_TEXT_VALUE(buf) + 3, GRN_TEXT_LEN(buf) - 3);
260       GRN_TEXT_SET(ctx, buf,
261                    GRN_TEXT_VALUE(&buf_without_bom),
262                    GRN_TEXT_LEN(&buf_without_bom));
263       grn_obj_unlink(ctx, &buf_without_bom);
264     }
265     the_first_read = GRN_FALSE;
266   }
267   if (GRN_TEXT_LEN(buf) > 0 &&
268       GRN_TEXT_VALUE(buf)[GRN_TEXT_LEN(buf) - 1] == '\n') {
269     grn_bulk_truncate(ctx, buf, GRN_TEXT_LEN(buf) - 1);
270   }
271   if (GRN_TEXT_LEN(buf) > 0 &&
272       GRN_TEXT_VALUE(buf)[GRN_TEXT_LEN(buf) - 1] == '\r') {
273     grn_bulk_truncate(ctx, buf, GRN_TEXT_LEN(buf) - 1);
274   }
275   return rc;
276 }
277 
278 inline static grn_rc
prompt(grn_ctx * ctx,grn_obj * buf)279 prompt(grn_ctx *ctx, grn_obj *buf)
280 {
281   grn_rc rc = GRN_SUCCESS;
282   grn_bool need_next_line = GRN_TRUE;
283   GRN_BULK_REWIND(buf);
284   while (need_next_line) {
285     rc = read_next_line(ctx, buf);
286     if (rc == GRN_SUCCESS &&
287         GRN_TEXT_LEN(buf) > 0 &&
288         GRN_TEXT_VALUE(buf)[GRN_TEXT_LEN(buf) - 1] == '\\') {
289       grn_bulk_truncate(ctx, buf, GRN_TEXT_LEN(buf) - 1);
290       need_next_line = GRN_TRUE;
291     } else {
292       need_next_line = GRN_FALSE;
293     }
294   }
295   return rc;
296 }
297 
298 static void
output_envelope(grn_ctx * ctx,grn_rc rc,grn_obj * head,grn_obj * body,grn_obj * foot)299 output_envelope(grn_ctx *ctx, grn_rc rc, grn_obj *head, grn_obj *body, grn_obj *foot)
300 {
301   grn_output_envelope(ctx, rc, head, body, foot, input_path, number_of_lines);
302 }
303 
304 static void
s_output_raw(grn_ctx * ctx,int flags,FILE * stream)305 s_output_raw(grn_ctx *ctx, int flags, FILE *stream)
306 {
307   char *chunk = NULL;
308   unsigned int chunk_size = 0;
309   int recv_flags;
310 
311   grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
312   if (chunk_size > 0) {
313     fwrite(chunk, 1, chunk_size, stream);
314   }
315 
316   if (flags & GRN_CTX_TAIL) {
317     grn_obj *command;
318 
319     if (grn_ctx_get_output_type(ctx) == GRN_CONTENT_GROONGA_COMMAND_LIST &&
320         chunk_size > 0 &&
321         chunk[chunk_size - 1] != '\n') {
322       fwrite("\n", 1, 1, stream);
323     }
324     fflush(stream);
325 
326     command = GRN_CTX_USER_DATA(ctx)->ptr;
327     GRN_BULK_REWIND(command);
328   }
329 }
330 
331 static void
s_output_typed(grn_ctx * ctx,int flags,FILE * stream)332 s_output_typed(grn_ctx *ctx, int flags, FILE *stream)
333 {
334   if (ctx && ctx->impl && (flags & GRN_CTX_TAIL)) {
335     char *chunk = NULL;
336     unsigned int chunk_size = 0;
337     int recv_flags;
338     grn_obj body;
339     grn_obj *command;
340 
341     GRN_TEXT_INIT(&body, 0);
342     grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
343     GRN_TEXT_SET(ctx, &body, chunk, chunk_size);
344 
345     if (GRN_TEXT_LEN(&body) || ctx->rc) {
346       grn_obj head, foot;
347       GRN_TEXT_INIT(&head, 0);
348       GRN_TEXT_INIT(&foot, 0);
349       output_envelope(ctx, ctx->rc, &head, &body, &foot);
350       fwrite(GRN_TEXT_VALUE(&head), 1, GRN_TEXT_LEN(&head), stream);
351       fwrite(GRN_TEXT_VALUE(&body), 1, GRN_TEXT_LEN(&body), stream);
352       fwrite(GRN_TEXT_VALUE(&foot), 1, GRN_TEXT_LEN(&foot), stream);
353       fputc('\n', stream);
354       fflush(stream);
355       GRN_OBJ_FIN(ctx, &head);
356       GRN_OBJ_FIN(ctx, &foot);
357     }
358     GRN_OBJ_FIN(ctx, &body);
359 
360     command = GRN_CTX_USER_DATA(ctx)->ptr;
361     GRN_BULK_REWIND(command);
362   }
363 }
364 
365 static void
s_output(grn_ctx * ctx,int flags,void * arg)366 s_output(grn_ctx *ctx, int flags, void *arg)
367 {
368   FILE *stream = (FILE *)arg;
369 
370   switch (grn_ctx_get_output_type(ctx)) {
371   case GRN_CONTENT_GROONGA_COMMAND_LIST :
372   case GRN_CONTENT_NONE :
373     s_output_raw(ctx, flags, stream);
374     break;
375   default :
376     s_output_typed(ctx, flags, stream);
377     break;
378   }
379 }
380 
381 static int
do_alone(int argc,char ** argv)382 do_alone(int argc, char **argv)
383 {
384   int exit_code = EXIT_FAILURE;
385   char *path = NULL;
386   grn_obj *db;
387   grn_ctx ctx_, *ctx = &ctx_;
388   grn_ctx_init(ctx, 0);
389   if (argc > 0 && argv) { path = *argv++; argc--; }
390   db = (newdb || !path) ? grn_db_create(ctx, path, NULL) : grn_db_open(ctx, path);
391   if (db) {
392     grn_obj command;
393     GRN_TEXT_INIT(&command, 0);
394     GRN_CTX_USER_DATA(ctx)->ptr = &command;
395     grn_ctx_recv_handler_set(ctx, s_output, output);
396     if (!argc) {
397       grn_obj text;
398       GRN_TEXT_INIT(&text, 0);
399       while (prompt(ctx, &text) != GRN_END_OF_DATA) {
400         GRN_TEXT_PUT(ctx, &command, GRN_TEXT_VALUE(&text), GRN_TEXT_LEN(&text));
401         grn_ctx_send(ctx, GRN_TEXT_VALUE(&text), GRN_TEXT_LEN(&text), 0);
402         if (ctx->stat == GRN_CTX_QUIT) { break; }
403       }
404       exit_code = grn_rc_to_exit_code(ctx->rc);
405       grn_obj_unlink(ctx, &text);
406     } else {
407       grn_rc rc;
408       rc = grn_ctx_sendv(ctx, argc, argv, 0);
409       exit_code = grn_rc_to_exit_code(rc);
410     }
411     grn_obj_unlink(ctx, &command);
412     grn_obj_close(ctx, db);
413   } else {
414     fprintf(stderr, "db open failed (%s): %s\n", path, ctx->errbuf);
415   }
416   grn_ctx_fin(ctx);
417   return exit_code;
418 }
419 
420 static int
c_output(grn_ctx * ctx)421 c_output(grn_ctx *ctx)
422 {
423   int flags;
424   char *str;
425   unsigned int str_len;
426   do {
427     grn_ctx_recv(ctx, &str, &str_len, &flags);
428     /*
429     if (ctx->rc) {
430       fprintf(stderr, "grn_ctx_recv failed\n");
431       return -1;
432     }
433     */
434     if (str_len || ctx->rc) {
435       grn_obj head, body, foot;
436       GRN_TEXT_INIT(&head, 0);
437       GRN_TEXT_INIT(&body, GRN_OBJ_DO_SHALLOW_COPY);
438       GRN_TEXT_INIT(&foot, 0);
439       if (ctx->rc == GRN_SUCCESS) {
440         GRN_TEXT_SET(ctx, &body, str, str_len);
441       } else {
442         ERR(ctx->rc, "%.*s", str_len, str);
443       }
444       output_envelope(ctx, ctx->rc, &head, &body, &foot);
445       fwrite(GRN_TEXT_VALUE(&head), 1, GRN_TEXT_LEN(&head), output);
446       fwrite(GRN_TEXT_VALUE(&body), 1, GRN_TEXT_LEN(&body), output);
447       fwrite(GRN_TEXT_VALUE(&foot), 1, GRN_TEXT_LEN(&foot), output);
448       fputc('\n', output);
449       fflush(output);
450       GRN_OBJ_FIN(ctx, &head);
451       GRN_OBJ_FIN(ctx, &body);
452       GRN_OBJ_FIN(ctx, &foot);
453     }
454   } while ((flags & GRN_CTX_MORE));
455   return 0;
456 }
457 
458 static int
g_client(int argc,char ** argv)459 g_client(int argc, char **argv)
460 {
461   int exit_code = EXIT_FAILURE;
462   grn_ctx ctx_, *ctx = &ctx_;
463   const char *hostname = DEFAULT_DEST;
464   if (argc > 0 && argv) { hostname = *argv++; argc--; }
465   grn_ctx_init(ctx, 0);
466   if (!grn_ctx_connect(ctx, hostname, port, 0)) {
467     if (!argc) {
468       grn_obj text;
469       GRN_TEXT_INIT(&text, 0);
470       while (prompt(ctx, &text) != GRN_END_OF_DATA) {
471         grn_ctx_send(ctx, GRN_TEXT_VALUE(&text), GRN_TEXT_LEN(&text), 0);
472         exit_code = grn_rc_to_exit_code(ctx->rc);
473         if (ctx->rc != GRN_SUCCESS) { break; }
474         if (c_output(ctx)) { goto exit; }
475         if (ctx->stat == GRN_CTX_QUIT) { break; }
476       }
477       grn_obj_unlink(ctx, &text);
478     } else {
479       grn_rc rc;
480       rc = grn_ctx_sendv(ctx, argc, argv, 0);
481       exit_code = grn_rc_to_exit_code(rc);
482       if (c_output(ctx)) { goto exit; }
483     }
484   } else {
485     fprintf(stderr, "grn_ctx_connect failed (%s:%d)\n", hostname, port);
486   }
487 exit :
488   grn_ctx_fin(ctx);
489   return exit_code;
490 }
491 
492 /* server */
493 
494 typedef void (*grn_edge_dispatcher_func)(grn_ctx *ctx, grn_edge *edge);
495 typedef void (*grn_handler_func)(grn_ctx *ctx, grn_obj *msg);
496 
497 static grn_com_queue ctx_new;
498 static grn_com_queue ctx_old;
499 static grn_mutex q_mutex;
500 static grn_cond q_cond;
501 static uint32_t n_running_threads = 0;
502 static uint32_t n_floating_threads = 0;
503 static uint32_t max_n_floating_threads;
504 
505 static uint32_t
groonga_get_thread_limit(void * data)506 groonga_get_thread_limit(void *data)
507 {
508   return max_n_floating_threads;
509 }
510 
511 static void
groonga_set_thread_limit(uint32_t new_limit,void * data)512 groonga_set_thread_limit(uint32_t new_limit, void *data)
513 {
514   uint32_t i;
515   uint32_t current_n_floating_threads;
516   static uint32_t n_changing_threads = 0;
517   uint32_t prev_n_changing_threads;
518 
519   GRN_ATOMIC_ADD_EX(&n_changing_threads, 1, prev_n_changing_threads);
520 
521   MUTEX_LOCK_ENSURE(&grn_gctx, q_mutex);
522   current_n_floating_threads = n_floating_threads;
523   max_n_floating_threads = new_limit;
524   MUTEX_UNLOCK(q_mutex);
525 
526   if (prev_n_changing_threads > 0) {
527     GRN_ATOMIC_ADD_EX(&n_changing_threads, -1, prev_n_changing_threads);
528     return;
529   }
530 
531   if (current_n_floating_threads > new_limit) {
532     for (i = 0; i < current_n_floating_threads; i++) {
533       MUTEX_LOCK_ENSURE(&grn_gctx, q_mutex);
534       COND_SIGNAL(q_cond);
535       MUTEX_UNLOCK(q_mutex);
536     }
537   }
538 
539   while (GRN_TRUE) {
540     grn_bool is_reduced;
541     MUTEX_LOCK_ENSURE(&grn_gctx, q_mutex);
542     is_reduced = (n_running_threads <= max_n_floating_threads);
543     if (!is_reduced && n_floating_threads > 0) {
544       COND_SIGNAL(q_cond);
545     }
546     MUTEX_UNLOCK(q_mutex);
547     if (is_reduced) {
548       break;
549     }
550     grn_nanosleep(1000000);
551   }
552 
553   GRN_ATOMIC_ADD_EX(&n_changing_threads, -1, prev_n_changing_threads);
554 }
555 
556 typedef struct {
557   grn_mutex mutex;
558   grn_ctx ctx;
559   grn_pat *entries;
560   uint64_t earliest_unix_time_msec;
561 } request_timer_data;
562 static request_timer_data the_request_timer_data;
563 
564 static void *
request_timer_register(const char * request_id,unsigned int request_id_size,double timeout,void * user_data)565 request_timer_register(const char *request_id,
566                        unsigned int request_id_size,
567                        double timeout,
568                        void *user_data)
569 {
570   request_timer_data *data = user_data;
571   grn_id id = GRN_ID_NIL;
572 
573   {
574     grn_ctx *ctx = &(data->ctx);
575     grn_bool is_first_timer;
576     grn_timeval tv;
577     uint64_t timeout_unix_time_msec;
578     void *value;
579 
580     MUTEX_LOCK(data->mutex);
581     is_first_timer = (grn_pat_size(ctx, data->entries) == 0);
582     grn_timeval_now(ctx, &tv);
583     timeout_unix_time_msec = GRN_TIMEVAL_TO_MSEC(&tv) + (timeout * 1000);
584     while (GRN_TRUE) {
585       int added;
586       id = grn_pat_add(ctx, data->entries,
587                        &timeout_unix_time_msec, sizeof(uint64_t),
588                        &value, &added);
589       if (added != 0) {
590         break;
591       }
592       timeout_unix_time_msec++;
593     }
594     grn_memcpy(value, &request_id_size, sizeof(unsigned int));
595     grn_memcpy(((uint8_t *)value) + sizeof(unsigned int),
596                request_id, request_id_size);
597     if (data->earliest_unix_time_msec == 0 ||
598         data->earliest_unix_time_msec > timeout_unix_time_msec) {
599       data->earliest_unix_time_msec = timeout_unix_time_msec;
600     }
601     if (is_first_timer) {
602       break_accept_event_loop(ctx);
603     }
604     MUTEX_UNLOCK(data->mutex);
605   }
606 
607   return (void *)(uint64_t)id;
608 }
609 
610 static void
request_timer_unregister(void * timer_id,void * user_data)611 request_timer_unregister(void *timer_id,
612                          void *user_data)
613 {
614   request_timer_data *data = user_data;
615   grn_id id = (grn_id)(uint64_t)timer_id;
616 
617   {
618     grn_ctx *ctx = &(data->ctx);
619     uint64_t timeout_unix_time_msec;
620     int key_size;
621 
622     MUTEX_LOCK(data->mutex);
623     key_size = grn_pat_get_key(ctx,
624                                data->entries,
625                                id,
626                                &timeout_unix_time_msec,
627                                sizeof(uint64_t));
628     if (key_size > 0) {
629       grn_pat_delete_by_id(ctx, data->entries, id, NULL);
630       if (data->earliest_unix_time_msec >= timeout_unix_time_msec) {
631         data->earliest_unix_time_msec = 0;
632       }
633     }
634     MUTEX_UNLOCK(data->mutex);
635   }
636 }
637 
638 static void
request_timer_fin(void * user_data)639 request_timer_fin(void *user_data)
640 {
641   request_timer_data *data = user_data;
642 
643   {
644     grn_ctx *ctx = &(data->ctx);
645     grn_pat_close(ctx, data->entries);
646     grn_ctx_fin(ctx);
647     MUTEX_FIN(data->mutex);
648   }
649 }
650 
651 static void
request_timer_init(void)652 request_timer_init(void)
653 {
654   static grn_request_timer timer;
655   request_timer_data *data = &the_request_timer_data;
656   grn_ctx *ctx;
657 
658   MUTEX_INIT(data->mutex);
659   ctx = &(data->ctx);
660   grn_ctx_init(ctx, 0);
661   data->entries = grn_pat_create(ctx,
662                                  NULL,
663                                  sizeof(uint64_t),
664                                  GRN_TABLE_MAX_KEY_SIZE,
665                                  GRN_OBJ_KEY_UINT);
666   data->earliest_unix_time_msec = 0;
667 
668   timer.user_data = data;
669   timer.register_func = request_timer_register;
670   timer.unregister_func = request_timer_unregister;
671   timer.fin_func = request_timer_fin;
672 
673   grn_request_timer_set(&timer);
674 }
675 
676 static grn_bool
request_timer_ensure_earliest_unix_time_msec(void)677 request_timer_ensure_earliest_unix_time_msec(void)
678 {
679   request_timer_data *data = &the_request_timer_data;
680   grn_ctx *ctx;
681   grn_pat_cursor *cursor;
682 
683   if (data->earliest_unix_time_msec > 0) {
684     return GRN_TRUE;
685   }
686 
687   ctx = &(data->ctx);
688   cursor = grn_pat_cursor_open(ctx, data->entries,
689                                NULL, 0,
690                                NULL, 0,
691                                0, 1, GRN_CURSOR_ASCENDING);
692   if (!cursor) {
693     return GRN_FALSE;
694   }
695   while (grn_pat_cursor_next(ctx, cursor) != GRN_ID_NIL) {
696     void *key;
697     uint64_t timeout_unix_time_msec;
698 
699     grn_pat_cursor_get_key(ctx, cursor, &key);
700     timeout_unix_time_msec = *(uint64_t *)key;
701     data->earliest_unix_time_msec = timeout_unix_time_msec;
702     break;
703   }
704   grn_pat_cursor_close(ctx, cursor);
705 
706   return data->earliest_unix_time_msec > 0;
707 }
708 
709 static int
request_timer_get_poll_timeout(void)710 request_timer_get_poll_timeout(void)
711 {
712   request_timer_data *data = &the_request_timer_data;
713   int timeout = 1000;
714   grn_ctx *ctx;
715   grn_timeval tv;
716 
717   MUTEX_LOCK(data->mutex);
718   ctx = &(data->ctx);
719   if (grn_pat_size(ctx, data->entries) == 0) {
720     goto exit;
721   }
722 
723   if (!request_timer_ensure_earliest_unix_time_msec()) {
724     goto exit;
725   }
726 
727   grn_timeval_now(ctx, &tv);
728   timeout = data->earliest_unix_time_msec - GRN_TIMEVAL_TO_MSEC(&tv);
729   if (timeout < 0) {
730     timeout = 0;
731   } else if (timeout > 1000) {
732     timeout = 1000;
733   }
734 
735 exit :
736   MUTEX_UNLOCK(data->mutex);
737 
738   return timeout;
739 }
740 
741 static void
request_timer_process_timeout(void)742 request_timer_process_timeout(void)
743 {
744   request_timer_data *data = &the_request_timer_data;
745   grn_ctx *ctx;
746   grn_timeval tv;
747   uint64_t max;
748   grn_pat_cursor *cursor;
749 
750   ctx = &(data->ctx);
751   if (grn_pat_size(ctx, data->entries) == 0) {
752     return;
753   }
754 
755   grn_timeval_now(ctx, &tv);
756   max = GRN_TIMEVAL_TO_MSEC(&tv);
757   cursor = grn_pat_cursor_open(ctx, data->entries,
758                                NULL, 0,
759                                &max, sizeof(uint64_t),
760                                0, -1, GRN_CURSOR_ASCENDING);
761   if (!cursor) {
762     return;
763   }
764 
765   grn_id id;
766   while ((id = grn_pat_cursor_next(ctx, cursor)) != GRN_ID_NIL) {
767     void *value;
768     const char *request_id;
769     unsigned int request_id_size;
770 
771     grn_pat_cursor_get_value(ctx, cursor, &value);
772     request_id_size = *((unsigned int *)value);
773     request_id = (const char *)(((uint8_t *)value) + sizeof(unsigned int));
774     grn_request_canceler_cancel(request_id, request_id_size);
775   }
776   grn_pat_cursor_close(ctx, cursor);
777 }
778 
779 static void
reset_ready_notify_pipe(void)780 reset_ready_notify_pipe(void)
781 {
782   ready_notify_pipe[PIPE_READ]  = 0;
783   ready_notify_pipe[PIPE_WRITE] = 0;
784 }
785 
786 static void
close_ready_notify_pipe(void)787 close_ready_notify_pipe(void)
788 {
789   if (ready_notify_pipe[PIPE_READ] > 0) {
790     close(ready_notify_pipe[PIPE_READ]);
791   }
792   if (ready_notify_pipe[PIPE_WRITE] > 0) {
793     close(ready_notify_pipe[PIPE_WRITE]);
794   }
795   reset_ready_notify_pipe();
796 }
797 
798 static void
send_ready_notify(void)799 send_ready_notify(void)
800 {
801   if (ready_notify_pipe[PIPE_WRITE] > 0) {
802     const char *ready_notify_message = "ready";
803     write(ready_notify_pipe[PIPE_WRITE],
804           ready_notify_message,
805           strlen(ready_notify_message));
806   }
807   close_ready_notify_pipe();
808 }
809 
810 static void
create_pid_file(void)811 create_pid_file(void)
812 {
813   FILE *pid_file = NULL;
814 
815   if (!pid_file_path) {
816     return;
817   }
818 
819   pid_file = fopen(pid_file_path, "w");
820   if (!pid_file) {
821     fprintf(stderr,
822             "Failed to open PID file: <%s>: <%s>\n",
823             pid_file_path, grn_strerror(errno));
824     return;
825   }
826 
827   {
828 #ifdef WIN32
829     DWORD pid;
830     pid = GetCurrentProcessId();
831     fprintf(pid_file, "%" GRN_FMT_DWORD "\n", pid);
832 #else /* WIN32 */
833     pid_t pid;
834     pid = grn_getpid();
835     fprintf(pid_file, "%d\n", pid);
836 #endif /* WIN32 */
837   }
838   fclose(pid_file);
839 }
840 
841 static void
clean_pid_file(void)842 clean_pid_file(void)
843 {
844   if (pid_file_path) {
845     grn_unlink(pid_file_path);
846   }
847 }
848 
849 static int
daemonize(void)850 daemonize(void)
851 {
852   int exit_code = EXIT_SUCCESS;
853 #ifndef WIN32
854 
855   if (pipe(ready_notify_pipe) == -1) {
856     reset_ready_notify_pipe();
857   }
858 
859   switch (fork()) {
860   case 0:
861     break;
862   case -1:
863     perror("fork");
864     return EXIT_FAILURE;
865   default:
866     wait(NULL);
867     if (ready_notify_pipe[PIPE_READ] > 0) {
868       int max_fd;
869       fd_set read_fds;
870       FD_ZERO(&read_fds);
871       FD_SET(ready_notify_pipe[PIPE_READ], &read_fds);
872       max_fd = ready_notify_pipe[PIPE_READ] + 1;
873       select(max_fd, &read_fds, NULL, NULL, NULL);
874     }
875     close_ready_notify_pipe();
876     _exit(EXIT_SUCCESS);
877   }
878   switch (fork()) {
879   case 0:
880     if (pid_file_path) {
881       create_pid_file();
882     } else {
883       pid_t pid;
884       pid = grn_getpid();
885       fprintf(stderr, "%d\n", pid);
886     }
887     break;
888   case -1:
889     perror("fork");
890     return EXIT_FAILURE;
891   default:
892     close_ready_notify_pipe();
893     _exit(EXIT_SUCCESS);
894   }
895   {
896     int null_fd;
897     grn_open(null_fd, "/dev/null", O_RDWR);
898     if (null_fd != -1) {
899       dup2(null_fd, STDIN_FILENO);
900       dup2(null_fd, STDOUT_FILENO);
901       dup2(null_fd, STDERR_FILENO);
902       if (null_fd > STDERR_FILENO) { grn_close(null_fd); }
903     }
904   }
905 #endif /* WIN32 */
906   return exit_code;
907 }
908 
909 static void
run_server_loop(grn_ctx * ctx,grn_com_event * ev)910 run_server_loop(grn_ctx *ctx, grn_com_event *ev)
911 {
912   request_timer_init();
913   while (!grn_com_event_poll(ctx, ev, request_timer_get_poll_timeout()) &&
914          grn_gctx.stat != GRN_CTX_QUIT) {
915     grn_edge *edge;
916     while ((edge = (grn_edge *)grn_com_queue_deque(ctx, &ctx_old))) {
917       grn_obj *msg;
918       while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) {
919         grn_msg_close(&edge->ctx, msg);
920       }
921       while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) {
922         grn_msg_close(ctx, msg);
923       }
924       grn_ctx_fin(&edge->ctx);
925       if (edge->com->has_sid && edge->com->opaque == edge) {
926         grn_com_close(ctx, edge->com);
927       }
928       grn_edges_delete(ctx, edge);
929     }
930     request_timer_process_timeout();
931     /* todo : log stat */
932   }
933   for (;;) {
934     MUTEX_LOCK_ENSURE(ctx, q_mutex);
935     if (n_running_threads == n_floating_threads) { break; }
936     MUTEX_UNLOCK(q_mutex);
937     grn_nanosleep(1000000);
938   }
939   {
940     grn_edge *edge;
941     GRN_HASH_EACH(ctx, grn_edges, id, NULL, NULL, &edge, {
942       grn_obj *obj;
943       while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) {
944         grn_msg_close(&edge->ctx, obj);
945       }
946       while ((obj = (grn_obj *)grn_com_queue_deque(ctx, &edge->recv_new))) {
947         grn_msg_close(ctx, obj);
948       }
949       grn_ctx_fin(&edge->ctx);
950       if (edge->com->has_sid) {
951         grn_com_close(ctx, edge->com);
952       }
953       grn_edges_delete(ctx, edge);
954     });
955   }
956   {
957     grn_com *com;
958     GRN_HASH_EACH(ctx, ev->hash, id, NULL, NULL, &com, { grn_com_close(ctx, com); });
959   }
960 }
961 
962 static int
run_server(grn_ctx * ctx,grn_obj * db,grn_com_event * ev,grn_edge_dispatcher_func dispatcher,grn_handler_func handler)963 run_server(grn_ctx *ctx, grn_obj *db, grn_com_event *ev,
964            grn_edge_dispatcher_func dispatcher, grn_handler_func handler)
965 {
966   int exit_code = EXIT_SUCCESS;
967   struct hostent *he;
968   if (!(he = gethostbyname(hostname))) {
969     send_ready_notify();
970     SOERR("gethostbyname");
971   } else {
972     ev->opaque = db;
973     grn_edges_init(ctx, dispatcher);
974     if (!grn_com_sopen(ctx, ev, bind_address, port, handler, he)) {
975       send_ready_notify();
976       run_server_loop(ctx, ev);
977       exit_code = EXIT_SUCCESS;
978     } else {
979       send_ready_notify();
980       fprintf(stderr, "grn_com_sopen failed (%s:%d): %s\n",
981               bind_address, port, ctx->errbuf);
982     }
983     grn_edges_fin(ctx);
984   }
985   return exit_code;
986 }
987 
988 static grn_bool memcached_init(grn_ctx *ctx);
989 
990 static int
start_service(grn_ctx * ctx,const char * db_path,grn_edge_dispatcher_func dispatcher,grn_handler_func handler)991 start_service(grn_ctx *ctx, const char *db_path,
992               grn_edge_dispatcher_func dispatcher, grn_handler_func handler)
993 {
994   int exit_code = EXIT_SUCCESS;
995   grn_com_event ev;
996 
997   if (is_daemon_mode) {
998     exit_code = daemonize();
999     if (exit_code != EXIT_SUCCESS) {
1000       return exit_code;
1001     }
1002   } else {
1003     create_pid_file();
1004   }
1005 
1006   if (!grn_com_event_init(ctx, &ev, MAX_CON, sizeof(grn_com))) {
1007     grn_obj *db;
1008     db = (newdb || !db_path) ? grn_db_create(ctx, db_path, NULL) : grn_db_open(ctx, db_path);
1009     if (db) {
1010       if (is_memcached_mode) {
1011         if (!memcached_init(ctx)) {
1012           fprintf(stderr, "failed to initialize memcached mode: %s\n",
1013                   ctx->errbuf);
1014           exit_code = EXIT_FAILURE;
1015           send_ready_notify();
1016         }
1017       }
1018       if (exit_code == EXIT_SUCCESS) {
1019         exit_code = run_server(ctx, db, &ev, dispatcher, handler);
1020       }
1021       grn_obj_close(ctx, db);
1022     } else {
1023       fprintf(stderr, "db open failed (%s): %s\n", db_path, ctx->errbuf);
1024       exit_code = EXIT_FAILURE;
1025       send_ready_notify();
1026     }
1027     grn_com_event_fin(ctx, &ev);
1028   } else {
1029     fprintf(stderr, "grn_com_event_init failed\n");
1030     exit_code = EXIT_FAILURE;
1031     send_ready_notify();
1032   }
1033 
1034   clean_pid_file();
1035 
1036   return exit_code;
1037 }
1038 
1039 typedef struct {
1040   grn_msg *msg;
1041   grn_bool in_body;
1042   grn_bool is_chunked;
1043 } ht_context;
1044 
1045 static void
h_output_set_header(grn_ctx * ctx,grn_obj * header,grn_rc rc,long long int content_length,grn_obj * foot)1046 h_output_set_header(grn_ctx *ctx,
1047                     grn_obj *header,
1048                     grn_rc rc,
1049                     long long int content_length,
1050                     grn_obj *foot)
1051 {
1052   switch (rc) {
1053   case GRN_SUCCESS :
1054     GRN_TEXT_SETS(ctx, header, "HTTP/1.1 200 OK\r\n");
1055     break;
1056   case GRN_INVALID_ARGUMENT :
1057   case GRN_FUNCTION_NOT_IMPLEMENTED :
1058   case GRN_SYNTAX_ERROR :
1059     GRN_TEXT_SETS(ctx, header, "HTTP/1.1 400 Bad Request\r\n");
1060     break;
1061   case GRN_NO_SUCH_FILE_OR_DIRECTORY :
1062     GRN_TEXT_SETS(ctx, header, "HTTP/1.1 404 Not Found\r\n");
1063     break;
1064   case GRN_CANCEL :
1065     GRN_TEXT_SETS(ctx, header, "HTTP/1.1 408 Request Timeout\r\n");
1066     break;
1067   default :
1068     GRN_TEXT_SETS(ctx, header, "HTTP/1.1 500 Internal Server Error\r\n");
1069     break;
1070   }
1071   GRN_TEXT_PUT(ctx, header,
1072                GRN_TEXT_VALUE(&http_response_server_line),
1073                GRN_TEXT_LEN(&http_response_server_line));
1074   GRN_TEXT_PUTS(ctx, header, "Content-Type: ");
1075   if (grn_ctx_get_output_type(ctx) == GRN_CONTENT_JSON &&
1076       foot &&
1077       GRN_TEXT_LEN(foot) > 0 &&
1078       GRN_TEXT_VALUE(foot)[GRN_TEXT_LEN(foot) - 1] == ';') {
1079     GRN_TEXT_PUTS(ctx, header, "application/javascript");
1080   } else {
1081     GRN_TEXT_PUTS(ctx, header, grn_ctx_get_mime_type(ctx));
1082   }
1083   GRN_TEXT_PUTS(ctx, header, "\r\n");
1084   if (content_length >= 0) {
1085     GRN_TEXT_PUTS(ctx, header, "Connection: close\r\n");
1086     GRN_TEXT_PUTS(ctx, header, "Content-Length: ");
1087     grn_text_lltoa(ctx, header, content_length);
1088     GRN_TEXT_PUTS(ctx, header, "\r\n");
1089   } else {
1090     GRN_TEXT_PUTS(ctx, header, "Transfer-Encoding: chunked\r\n");
1091   }
1092   GRN_TEXT_PUTS(ctx, header, "\r\n");
1093 }
1094 
1095 static void
h_output_send(grn_ctx * ctx,grn_sock fd,grn_obj * header,grn_obj * head,grn_obj * body,grn_obj * foot)1096 h_output_send(grn_ctx *ctx, grn_sock fd,
1097               grn_obj *header, grn_obj *head, grn_obj *body, grn_obj *foot)
1098 {
1099   ssize_t ret;
1100   ssize_t len = 0;
1101 #ifdef WIN32
1102   int n_buffers = 0;
1103   WSABUF wsabufs[4];
1104   if (header) {
1105     wsabufs[n_buffers].buf = GRN_TEXT_VALUE(header);
1106     wsabufs[n_buffers].len = GRN_TEXT_LEN(header);
1107     len += GRN_TEXT_LEN(header);
1108     n_buffers++;
1109   }
1110   if (head) {
1111     wsabufs[n_buffers].buf = GRN_TEXT_VALUE(head);
1112     wsabufs[n_buffers].len = GRN_TEXT_LEN(head);
1113     len += GRN_TEXT_LEN(head);
1114     n_buffers++;
1115   }
1116   if (body) {
1117     wsabufs[n_buffers].buf = GRN_TEXT_VALUE(body);
1118     wsabufs[n_buffers].len = GRN_TEXT_LEN(body);
1119     len += GRN_TEXT_LEN(body);
1120     n_buffers++;
1121   }
1122   if (foot) {
1123     wsabufs[n_buffers].buf = GRN_TEXT_VALUE(foot);
1124     wsabufs[n_buffers].len = GRN_TEXT_LEN(foot);
1125     len += GRN_TEXT_LEN(foot);
1126     n_buffers++;
1127   }
1128   {
1129     DWORD sent;
1130     if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) {
1131       SOERR("WSASend");
1132     }
1133     ret = sent;
1134   }
1135 #else /* WIN32 */
1136   struct iovec msg_iov[4];
1137   struct msghdr msg;
1138   msg.msg_name = NULL;
1139   msg.msg_namelen = 0;
1140   msg.msg_iov = msg_iov;
1141   msg.msg_iovlen = 0;
1142   msg.msg_control = NULL;
1143   msg.msg_controllen = 0;
1144   msg.msg_flags = 0;
1145 
1146   if (header) {
1147     msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(header);
1148     msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(header);
1149     len += GRN_TEXT_LEN(header);
1150     msg.msg_iovlen++;
1151   }
1152   if (head) {
1153     msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(head);
1154     msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(head);
1155     len += GRN_TEXT_LEN(head);
1156     msg.msg_iovlen++;
1157   }
1158   if (body) {
1159     msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(body);
1160     msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(body);
1161     len += GRN_TEXT_LEN(body);
1162     msg.msg_iovlen++;
1163   }
1164   if (foot) {
1165     msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(foot);
1166     msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(foot);
1167     len += GRN_TEXT_LEN(foot);
1168     msg.msg_iovlen++;
1169   }
1170   if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) {
1171     SOERR("sendmsg");
1172   }
1173 #endif /* WIN32 */
1174   if (ret != len) {
1175     GRN_LOG(&grn_gctx, GRN_LOG_NOTICE,
1176             "couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")",
1177             (long long int)ret, (long long int)len);
1178   }
1179 }
1180 
1181 static void
h_output_raw(grn_ctx * ctx,int flags,ht_context * hc)1182 h_output_raw(grn_ctx *ctx, int flags, ht_context *hc)
1183 {
1184   grn_rc expr_rc = ctx->rc;
1185   grn_sock fd = hc->msg->u.fd;
1186   grn_obj header_;
1187   grn_obj head_;
1188   grn_obj body_;
1189   grn_obj foot_;
1190   grn_obj *header = NULL;
1191   grn_obj *head = NULL;
1192   grn_obj *body = NULL;
1193   grn_obj *foot = NULL;
1194   char *chunk = NULL;
1195   unsigned int chunk_size = 0;
1196   int recv_flags;
1197   grn_bool is_last_message = (flags & GRN_CTX_TAIL);
1198 
1199   GRN_TEXT_INIT(&header_, 0);
1200   GRN_TEXT_INIT(&head_, 0);
1201   GRN_TEXT_INIT(&body_, GRN_OBJ_DO_SHALLOW_COPY);
1202   GRN_TEXT_INIT(&foot_, 0);
1203 
1204   grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
1205   GRN_TEXT_SET(ctx, &body_, chunk, chunk_size);
1206 
1207   if (!hc->in_body) {
1208     if (is_last_message) {
1209       h_output_set_header(ctx, &header_, expr_rc, GRN_TEXT_LEN(&body_), NULL);
1210       hc->is_chunked = GRN_FALSE;
1211     } else {
1212       h_output_set_header(ctx, &header_, expr_rc, -1, NULL);
1213       hc->is_chunked = GRN_TRUE;
1214     }
1215     header = &header_;
1216     hc->in_body = GRN_TRUE;
1217   }
1218 
1219   if (GRN_TEXT_LEN(&body_) > 0) {
1220     if (hc->is_chunked) {
1221       grn_text_printf(ctx, &head_,
1222                       "%x\r\n", (unsigned int)GRN_TEXT_LEN(&body_));
1223       head = &head_;
1224       GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
1225       foot = &foot_;
1226     }
1227     body = &body_;
1228   }
1229 
1230   if (is_last_message) {
1231     if (hc->is_chunked) {
1232       GRN_TEXT_PUTS(ctx, &foot_, "0\r\n");
1233       GRN_TEXT_PUTS(ctx, &foot_, "Connection: close\r\n");
1234       GRN_TEXT_PUTS(ctx, &foot_, "\r\n");
1235       foot = &foot_;
1236     }
1237   }
1238 
1239   h_output_send(ctx, fd, header, head, body, foot);
1240 
1241   GRN_OBJ_FIN(ctx, &foot_);
1242   GRN_OBJ_FIN(ctx, &body_);
1243   GRN_OBJ_FIN(ctx, &head_);
1244   GRN_OBJ_FIN(ctx, &header_);
1245 }
1246 
1247 static void
h_output_typed(grn_ctx * ctx,int flags,ht_context * hc)1248 h_output_typed(grn_ctx *ctx, int flags, ht_context *hc)
1249 {
1250   grn_rc expr_rc = ctx->rc;
1251   grn_sock fd = hc->msg->u.fd;
1252   grn_obj header, head, body, foot;
1253   char *chunk = NULL;
1254   unsigned int chunk_size = 0;
1255   int recv_flags;
1256   grn_bool should_return_body;
1257 
1258   if (!(flags & GRN_CTX_TAIL)) { return; }
1259 
1260   switch (hc->msg->header.qtype) {
1261   case 'G' :
1262   case 'P' :
1263     should_return_body = GRN_TRUE;
1264     break;
1265   default :
1266     should_return_body = GRN_FALSE;
1267     break;
1268   }
1269 
1270   GRN_TEXT_INIT(&header, 0);
1271   GRN_TEXT_INIT(&head, 0);
1272   GRN_TEXT_INIT(&body, 0);
1273   GRN_TEXT_INIT(&foot, 0);
1274 
1275   grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags);
1276   GRN_TEXT_SET(ctx, &body, chunk, chunk_size);
1277 
1278   output_envelope(ctx, expr_rc, &head, &body, &foot);
1279   h_output_set_header(ctx, &header, expr_rc,
1280                       GRN_TEXT_LEN(&head) +
1281                       GRN_TEXT_LEN(&body) +
1282                       GRN_TEXT_LEN(&foot),
1283                       &foot);
1284   if (should_return_body) {
1285     h_output_send(ctx, fd, &header, &head, &body, &foot);
1286   } else {
1287     h_output_send(ctx, fd, &header, NULL, NULL, NULL);
1288   }
1289   GRN_OBJ_FIN(ctx, &foot);
1290   GRN_OBJ_FIN(ctx, &body);
1291   GRN_OBJ_FIN(ctx, &head);
1292   GRN_OBJ_FIN(ctx, &header);
1293 }
1294 
1295 static void
h_output(grn_ctx * ctx,int flags,void * arg)1296 h_output(grn_ctx *ctx, int flags, void *arg)
1297 {
1298   ht_context *hc = (ht_context *)arg;
1299 
1300   switch (grn_ctx_get_output_type(ctx)) {
1301   case GRN_CONTENT_GROONGA_COMMAND_LIST :
1302   case GRN_CONTENT_NONE :
1303     h_output_raw(ctx, flags, hc);
1304     break;
1305   default :
1306     h_output_typed(ctx, flags, hc);
1307     break;
1308   }
1309 }
1310 
1311 static void
do_htreq_get(grn_ctx * ctx,ht_context * hc)1312 do_htreq_get(grn_ctx *ctx, ht_context *hc)
1313 {
1314   grn_msg *msg = hc->msg;
1315   char *path = NULL;
1316   char *pathe = GRN_BULK_HEAD((grn_obj *)msg);
1317   char *e = GRN_BULK_CURR((grn_obj *)msg);
1318   for (;; pathe++) {
1319     if (e <= pathe + 6) {
1320       /* invalid request */
1321       return;
1322     }
1323     if (*pathe == ' ') {
1324       if (!path) {
1325         path = pathe + 1;
1326       } else {
1327         if (!memcmp(pathe + 1, "HTTP/1", 6)) {
1328           break;
1329         }
1330       }
1331     }
1332   }
1333   grn_ctx_send(ctx, path, pathe - path, GRN_CTX_TAIL);
1334 }
1335 
1336 typedef struct {
1337   const char *path_start;
1338   int path_length;
1339   long long int content_length;
1340   grn_bool have_100_continue;
1341   const char *body_start;
1342 } h_post_header;
1343 
1344 #define STRING_EQUAL(string, string_length, constant_string)\
1345   (string_length == strlen(constant_string) &&\
1346    strncmp(string, constant_string, string_length) == 0)
1347 
1348 #define STRING_EQUAL_CI(string, string_length, constant_string)\
1349   (string_length == strlen(constant_string) &&\
1350    grn_strncasecmp(string, constant_string, string_length) == 0)
1351 
1352 static const char *
do_htreq_post_parse_header_request_line(grn_ctx * ctx,const char * start,const char * end,h_post_header * header)1353 do_htreq_post_parse_header_request_line(grn_ctx *ctx,
1354                                         const char *start,
1355                                         const char *end,
1356                                         h_post_header *header)
1357 {
1358   const char *current;
1359 
1360   {
1361     const char *method = start;
1362     int method_length = -1;
1363 
1364     for (current = method; current < end; current++) {
1365       if (current[0] == '\n') {
1366         return NULL;
1367       }
1368       if (current[0] == ' ') {
1369         method_length = current - method;
1370         current++;
1371         break;
1372       }
1373     }
1374     if (method_length == -1) {
1375       return NULL;
1376     }
1377     if (!STRING_EQUAL_CI(method, method_length, "POST")) {
1378       return NULL;
1379     }
1380   }
1381 
1382   {
1383     header->path_start = current;
1384     header->path_length = -1;
1385     for (; current < end; current++) {
1386       if (current[0] == '\n') {
1387         return NULL;
1388       }
1389       if (current[0] == ' ') {
1390         header->path_length = current - header->path_start;
1391         current++;
1392         break;
1393       }
1394     }
1395     if (header->path_length == -1) {
1396       return NULL;
1397     }
1398   }
1399 
1400   {
1401     const char *http_version_start = current;
1402     int http_version_length = -1;
1403     for (; current < end; current++) {
1404       if (current[0] == '\n') {
1405         http_version_length = current - http_version_start;
1406         if (http_version_length > 0 &&
1407             http_version_start[http_version_length - 1] == '\r') {
1408           http_version_length--;
1409         }
1410         current++;
1411         break;
1412       }
1413     }
1414     if (http_version_length == -1) {
1415       return NULL;
1416     }
1417     if (!(STRING_EQUAL_CI(http_version_start, http_version_length, "HTTP/1.0") ||
1418           STRING_EQUAL_CI(http_version_start, http_version_length, "HTTP/1.1"))) {
1419       return NULL;
1420     }
1421   }
1422 
1423   return current;
1424 }
1425 
1426 static const char *
do_htreq_post_parse_header_values(grn_ctx * ctx,const char * start,const char * end,h_post_header * header)1427 do_htreq_post_parse_header_values(grn_ctx *ctx,
1428                                   const char *start,
1429                                   const char *end,
1430                                   h_post_header *header)
1431 {
1432   const char *current;
1433   const char *name = start;
1434   int name_length = -1;
1435   const char *value = NULL;
1436   int value_length = -1;
1437 
1438   for (current = start; current < end; current++) {
1439     switch (current[0]) {
1440     case '\n' :
1441       if (name_length == -1) {
1442         if (current - name == 1 && current[-1] == '\r') {
1443           return current + 1;
1444         } else {
1445           /* No ":" header line. TODO: report error. */
1446           return NULL;
1447         }
1448       } else {
1449         while (value < current && value[0] == ' ') {
1450           value++;
1451         }
1452         value_length = current - value;
1453         if (value_length > 0 && value[value_length - 1] == '\r') {
1454           value_length--;
1455         }
1456         if (STRING_EQUAL_CI(name, name_length, "Content-Length")) {
1457           const char *rest;
1458           header->content_length = grn_atoll(value, value + value_length, &rest);
1459           if (rest != value + value_length) {
1460             /* Invalid Content-Length value. TODO: report error. */
1461             header->content_length = -1;
1462           }
1463         } else if (STRING_EQUAL_CI(name, name_length, "Expect")) {
1464           if (STRING_EQUAL(value, value_length, "100-continue")) {
1465             header->have_100_continue = GRN_TRUE;
1466           }
1467         }
1468       }
1469       name = current + 1;
1470       name_length = -1;
1471       value = NULL;
1472       value_length = -1;
1473       break;
1474     case ':' :
1475       if (name_length == -1) {
1476         name_length = current - name;
1477         value = current + 1;
1478       }
1479       break;
1480     default :
1481       break;
1482     }
1483   }
1484 
1485   return NULL;
1486 }
1487 
1488 static grn_bool
do_htreq_post_parse_header(grn_ctx * ctx,const char * start,const char * end,h_post_header * header)1489 do_htreq_post_parse_header(grn_ctx *ctx,
1490                            const char *start,
1491                            const char *end,
1492                            h_post_header *header)
1493 {
1494   const char *current;
1495 
1496   current = do_htreq_post_parse_header_request_line(ctx, start, end, header);
1497   if (!current) {
1498     return GRN_FALSE;
1499   }
1500   current = do_htreq_post_parse_header_values(ctx, current, end, header);
1501   if (!current) {
1502     return GRN_FALSE;
1503   }
1504 
1505   if (current == end) {
1506     header->body_start = NULL;
1507   } else {
1508     header->body_start = current;
1509   }
1510 
1511   return GRN_TRUE;
1512 }
1513 
1514 static void
do_htreq_post(grn_ctx * ctx,ht_context * hc)1515 do_htreq_post(grn_ctx *ctx, ht_context *hc)
1516 {
1517   grn_msg *msg = hc->msg;
1518   grn_sock fd = msg->u.fd;
1519   const char *end;
1520   h_post_header header;
1521 
1522   header.path_start = NULL;
1523   header.path_length = -1;
1524   header.content_length = -1;
1525   header.body_start = NULL;
1526   header.have_100_continue = GRN_FALSE;
1527 
1528   end = GRN_BULK_CURR((grn_obj *)msg);
1529   if (!do_htreq_post_parse_header(ctx,
1530                                   GRN_BULK_HEAD((grn_obj *)msg),
1531                                   end,
1532                                   &header)) {
1533     return;
1534   }
1535 
1536   grn_ctx_send(ctx, header.path_start, header.path_length, GRN_CTX_MORE);
1537   if (ctx->rc != GRN_SUCCESS) {
1538     ht_context context;
1539     context.msg = msg;
1540     context.in_body = GRN_FALSE;
1541     context.is_chunked = GRN_FALSE;
1542     h_output(ctx, GRN_CTX_TAIL, &context);
1543     return;
1544   }
1545 
1546   if (header.have_100_continue) {
1547     const char *continue_message = "HTTP/1.1 100 Continue\r\n";
1548     ssize_t send_size;
1549     int send_flags = MSG_NOSIGNAL;
1550     send_size = send(fd, continue_message, strlen(continue_message), send_flags);
1551     if (send_size == -1) {
1552       SOERR("send");
1553       return;
1554     }
1555   }
1556 
1557   {
1558     grn_obj chunk_buffer;
1559     long long int read_content_length = 0;
1560 
1561     GRN_TEXT_INIT(&chunk_buffer, 0);
1562     while (read_content_length < header.content_length) {
1563 #define POST_BUFFER_SIZE 8192
1564       char buffer[POST_BUFFER_SIZE];
1565       const char *buffer_start, *buffer_current, *buffer_end;
1566 
1567       if (header.body_start) {
1568         buffer_start = header.body_start;
1569         buffer_end = end;
1570         header.body_start = NULL;
1571       } else {
1572         ssize_t recv_length;
1573         int recv_flags = 0;
1574         recv_length = recv(fd, buffer, POST_BUFFER_SIZE, recv_flags);
1575         if (recv_length == 0) {
1576           break;
1577         }
1578         if (recv_length == -1) {
1579           SOERR("recv");
1580           break;
1581         }
1582         buffer_start = buffer;
1583         buffer_end = buffer_start + recv_length;
1584       }
1585       read_content_length += buffer_end - buffer_start;
1586 
1587       buffer_current = buffer_end - 1;
1588       for (; buffer_current > buffer_start; buffer_current--) {
1589         grn_bool is_separator;
1590         switch (buffer_current[0]) {
1591         case '\n' :
1592         case ',' :
1593           is_separator = GRN_TRUE;
1594           break;
1595         default :
1596           is_separator = GRN_FALSE;
1597           break;
1598         }
1599         if (!is_separator) {
1600           continue;
1601         }
1602 
1603         GRN_TEXT_PUT(ctx,
1604                      &chunk_buffer,
1605                      buffer_start,
1606                      buffer_current + 1 - buffer_start);
1607         {
1608           int flags = 0;
1609           if (!(read_content_length == header.content_length &&
1610                 buffer_current + 1 == buffer_end)) {
1611             flags |= GRN_CTX_MORE;
1612           } else {
1613             flags |= GRN_CTX_TAIL;
1614           }
1615           grn_ctx_send(ctx,
1616                        GRN_TEXT_VALUE(&chunk_buffer),
1617                        GRN_TEXT_LEN(&chunk_buffer),
1618                        flags);
1619         }
1620         buffer_start = buffer_current + 1;
1621         GRN_BULK_REWIND(&chunk_buffer);
1622         break;
1623       }
1624       if (buffer_end > buffer_start) {
1625         GRN_TEXT_PUT(ctx, &chunk_buffer,
1626                      buffer_start, buffer_end - buffer_start);
1627       }
1628 #undef POST_BUFFER_SIZE
1629 
1630       if (ctx->rc != GRN_SUCCESS) {
1631         break;
1632       }
1633     }
1634 
1635     if (ctx->rc == GRN_CANCEL) {
1636       h_output(ctx, GRN_CTX_TAIL, hc);
1637     } else if (ctx->rc == GRN_SUCCESS && GRN_TEXT_LEN(&chunk_buffer) > 0) {
1638       grn_ctx_send(ctx,
1639                    GRN_TEXT_VALUE(&chunk_buffer),
1640                    GRN_TEXT_LEN(&chunk_buffer),
1641                    GRN_CTX_TAIL);
1642     }
1643 
1644     GRN_OBJ_FIN(ctx, &chunk_buffer);
1645   }
1646 }
1647 
1648 static void
do_htreq(grn_ctx * ctx,ht_context * hc)1649 do_htreq(grn_ctx *ctx, ht_context *hc)
1650 {
1651   grn_msg *msg = hc->msg;
1652   grn_com_header *header = &msg->header;
1653   switch (header->qtype) {
1654   case 'G' : /* GET */
1655   case 'H' : /* HEAD */
1656     do_htreq_get(ctx, hc);
1657     break;
1658   case 'P' : /* POST */
1659     do_htreq_post(ctx, hc);
1660     break;
1661   }
1662   /* if (ctx->rc != GRN_OPERATION_WOULD_BLOCK) {...} */
1663   grn_msg_close(ctx, (grn_obj *)msg);
1664   /* if not keep alive connection */
1665   grn_sock_close(msg->u.fd);
1666   grn_com_event_start_accept(ctx, msg->acceptor->ev);
1667 }
1668 
1669 enum {
1670   MBRES_SUCCESS = 0x00,
1671   MBRES_KEY_ENOENT = 0x01,
1672   MBRES_KEY_EEXISTS = 0x02,
1673   MBRES_E2BIG = 0x03,
1674   MBRES_EINVAL = 0x04,
1675   MBRES_NOT_STORED = 0x05,
1676   MBRES_UNKNOWN_COMMAND = 0x81,
1677   MBRES_ENOMEM = 0x82,
1678 };
1679 
1680 enum {
1681   MBCMD_GET = 0x00,
1682   MBCMD_SET = 0x01,
1683   MBCMD_ADD = 0x02,
1684   MBCMD_REPLACE = 0x03,
1685   MBCMD_DELETE = 0x04,
1686   MBCMD_INCREMENT = 0x05,
1687   MBCMD_DECREMENT = 0x06,
1688   MBCMD_QUIT = 0x07,
1689   MBCMD_FLUSH = 0x08,
1690   MBCMD_GETQ = 0x09,
1691   MBCMD_NOOP = 0x0a,
1692   MBCMD_VERSION = 0x0b,
1693   MBCMD_GETK = 0x0c,
1694   MBCMD_GETKQ = 0x0d,
1695   MBCMD_APPEND = 0x0e,
1696   MBCMD_PREPEND = 0x0f,
1697   MBCMD_STAT = 0x10,
1698   MBCMD_SETQ = 0x11,
1699   MBCMD_ADDQ = 0x12,
1700   MBCMD_REPLACEQ = 0x13,
1701   MBCMD_DELETEQ = 0x14,
1702   MBCMD_INCREMENTQ = 0x15,
1703   MBCMD_DECREMENTQ = 0x16,
1704   MBCMD_QUITQ = 0x17,
1705   MBCMD_FLUSHQ = 0x18,
1706   MBCMD_APPENDQ = 0x19,
1707   MBCMD_PREPENDQ = 0x1a
1708 };
1709 
1710 static grn_obj *cache_table = NULL;
1711 static grn_obj *cache_value = NULL;
1712 static grn_obj *cache_flags = NULL;
1713 static grn_obj *cache_expire = NULL;
1714 static grn_obj *cache_cas = NULL;
1715 
1716 #define CTX_GET(name) (grn_ctx_get(ctx, (name), strlen(name)))
1717 
1718 static grn_bool
memcached_setup_flags_column(grn_ctx * ctx,const char * name)1719 memcached_setup_flags_column(grn_ctx *ctx, const char *name)
1720 {
1721   cache_flags = grn_obj_column(ctx, cache_table, name, strlen(name));
1722   if (cache_flags) {
1723     return GRN_TRUE;
1724   }
1725 
1726   cache_flags = grn_column_create(ctx, cache_table, name, strlen(name), NULL,
1727                                   GRN_OBJ_COLUMN_SCALAR|GRN_OBJ_PERSISTENT,
1728                                   grn_ctx_at(ctx, GRN_DB_UINT32));
1729   if (!cache_flags) {
1730     return GRN_FALSE;
1731   }
1732 
1733   return GRN_TRUE;
1734 }
1735 
1736 static grn_bool
memcached_setup_expire_column(grn_ctx * ctx,const char * name)1737 memcached_setup_expire_column(grn_ctx *ctx, const char *name)
1738 {
1739   cache_expire = grn_obj_column(ctx, cache_table, name, strlen(name));
1740   if (cache_expire) {
1741     return GRN_TRUE;
1742   }
1743 
1744   cache_expire = grn_column_create(ctx, cache_table, name, strlen(name), NULL,
1745                                    GRN_OBJ_COLUMN_SCALAR|GRN_OBJ_PERSISTENT,
1746                                    grn_ctx_at(ctx, GRN_DB_UINT32));
1747   if (!cache_expire) {
1748     return GRN_FALSE;
1749   }
1750 
1751   return GRN_TRUE;
1752 }
1753 
1754 static grn_bool
memcached_setup_cas_column(grn_ctx * ctx,const char * name)1755 memcached_setup_cas_column(grn_ctx *ctx, const char *name)
1756 {
1757   cache_cas = grn_obj_column(ctx, cache_table, name, strlen(name));
1758   if (cache_cas) {
1759     return GRN_TRUE;
1760   }
1761 
1762   cache_cas = grn_column_create(ctx, cache_table, name, strlen(name), NULL,
1763                                 GRN_OBJ_COLUMN_SCALAR|GRN_OBJ_PERSISTENT,
1764                                 grn_ctx_at(ctx, GRN_DB_UINT64));
1765   if (!cache_cas) {
1766     return GRN_FALSE;
1767   }
1768 
1769   return GRN_TRUE;
1770 }
1771 
1772 static grn_bool
memcached_init(grn_ctx * ctx)1773 memcached_init(grn_ctx *ctx)
1774 {
1775   if (memcached_column_name) {
1776     cache_value = CTX_GET(memcached_column_name);
1777     if (!cache_value) {
1778       ERR(GRN_INVALID_ARGUMENT,
1779           "memcached column doesn't exist: <%s>",
1780           memcached_column_name);
1781       return GRN_FALSE;
1782     }
1783     if (!(grn_obj_is_column(ctx, cache_value) &&
1784           ((cache_value->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) ==
1785            GRN_OBJ_COLUMN_SCALAR))) {
1786       grn_obj inspected;
1787       GRN_TEXT_INIT(&inspected, 0);
1788       grn_inspect(ctx, &inspected, cache_value);
1789       ERR(GRN_INVALID_ARGUMENT,
1790           "memcached column must be scalar column: <%.*s>",
1791           (int)GRN_TEXT_LEN(&inspected),
1792           GRN_TEXT_VALUE(&inspected));
1793       GRN_OBJ_FIN(ctx, &inspected);
1794       return GRN_FALSE;
1795     }
1796     if (!(GRN_DB_SHORT_TEXT <= grn_obj_get_range(ctx, cache_value) &&
1797           grn_obj_get_range(ctx, cache_value) <= GRN_DB_LONG_TEXT)) {
1798       grn_obj inspected;
1799       GRN_TEXT_INIT(&inspected, 0);
1800       grn_inspect(ctx, &inspected, cache_value);
1801       ERR(GRN_INVALID_ARGUMENT,
1802           "memcached column must be text column: <%.*s>",
1803           (int)GRN_TEXT_LEN(&inspected),
1804           GRN_TEXT_VALUE(&inspected));
1805       GRN_OBJ_FIN(ctx, &inspected);
1806       return GRN_FALSE;
1807     }
1808 
1809     cache_table = grn_ctx_at(ctx, cache_value->header.domain);
1810     if (cache_table->header.type == GRN_TABLE_NO_KEY) {
1811       grn_obj inspected;
1812       GRN_TEXT_INIT(&inspected, 0);
1813       grn_inspect(ctx, &inspected, cache_table);
1814       ERR(GRN_INVALID_ARGUMENT,
1815           "memcached column's table must be HASH_KEY, PAT_KEY or DAT_KEY table: "
1816           "<%.*s>",
1817           (int)GRN_TEXT_LEN(&inspected),
1818           GRN_TEXT_VALUE(&inspected));
1819       GRN_OBJ_FIN(ctx, &inspected);
1820       return GRN_FALSE;
1821     }
1822 
1823     {
1824       char column_name[GRN_TABLE_MAX_KEY_SIZE];
1825       char value_column_name[GRN_TABLE_MAX_KEY_SIZE];
1826       int value_column_name_size;
1827 
1828       value_column_name_size = grn_column_name(ctx, cache_value,
1829                                                value_column_name,
1830                                                GRN_TABLE_MAX_KEY_SIZE);
1831       grn_snprintf(column_name,
1832                    GRN_TABLE_MAX_KEY_SIZE,
1833                    GRN_TABLE_MAX_KEY_SIZE,
1834                    "%.*s_memcached_flags",
1835                    value_column_name_size,
1836                    value_column_name);
1837       if (!memcached_setup_flags_column(ctx, column_name)) {
1838         return GRN_FALSE;
1839       }
1840       grn_snprintf(column_name,
1841                    GRN_TABLE_MAX_KEY_SIZE,
1842                    GRN_TABLE_MAX_KEY_SIZE,
1843                    "%.*s_memcached_expire",
1844                    value_column_name_size,
1845                    value_column_name);
1846       if (!memcached_setup_expire_column(ctx, column_name)) {
1847         return GRN_FALSE;
1848       }
1849       grn_snprintf(column_name,
1850                    GRN_TABLE_MAX_KEY_SIZE,
1851                    GRN_TABLE_MAX_KEY_SIZE,
1852                    "%.*s_memcached_cas",
1853                    value_column_name_size,
1854                    value_column_name);
1855       if (!memcached_setup_cas_column(ctx, column_name)) {
1856         return GRN_FALSE;
1857       }
1858     }
1859   } else {
1860     const char *table_name = "Memcache";
1861     const char *value_column_name = "value";
1862 
1863     cache_table = CTX_GET(table_name);
1864     if (!cache_table) {
1865       cache_table = grn_table_create(ctx, table_name, strlen(table_name), NULL,
1866                                      GRN_OBJ_TABLE_PAT_KEY|GRN_OBJ_PERSISTENT,
1867                                      grn_ctx_at(ctx, GRN_DB_SHORT_TEXT),
1868                                      NULL);
1869       if (!cache_table) {
1870         return GRN_FALSE;
1871       }
1872     }
1873 
1874     cache_value = grn_obj_column(ctx, cache_table,
1875                                  value_column_name,
1876                                  strlen(value_column_name));
1877     if (!cache_value) {
1878       cache_value = grn_column_create(ctx, cache_table,
1879                                       value_column_name,
1880                                       strlen(value_column_name),
1881                                       NULL,
1882                                       GRN_OBJ_COLUMN_SCALAR|GRN_OBJ_PERSISTENT,
1883                                       grn_ctx_at(ctx, GRN_DB_SHORT_TEXT));
1884       if (!cache_value) {
1885         return GRN_FALSE;
1886       }
1887     }
1888 
1889     if (!memcached_setup_flags_column(ctx, "flags")) {
1890       return GRN_FALSE;
1891     }
1892     if (!memcached_setup_expire_column(ctx, "expire")) {
1893       return GRN_FALSE;
1894     }
1895     if (!memcached_setup_cas_column(ctx, "cas")) {
1896       return GRN_FALSE;
1897     }
1898   }
1899 
1900   return GRN_TRUE;
1901 }
1902 
1903 #define RELATIVE_TIME_THRESH 1000000000
1904 
1905 #define MBRES(ctx,re,status,key_len,extra_len,flags) do {\
1906   grn_msg_set_property((ctx), (re), (status), (key_len), (extra_len));\
1907   grn_msg_send((ctx), (re), (flags));\
1908 } while (0)
1909 
1910 #define GRN_MSG_MBRES(block) do {\
1911   if (!quiet) {\
1912     grn_obj *re = grn_msg_open_for_reply(ctx, (grn_obj *)msg, &edge->send_old);\
1913     ((grn_msg *)re)->header.qtype = header->qtype;\
1914     block\
1915   }\
1916 } while (0)
1917 
1918 static uint64_t
get_mbreq_cas_id()1919 get_mbreq_cas_id()
1920 {
1921   static uint64_t cas_id = 0;
1922   /* FIXME: use GRN_ATOMIC_ADD_EX_64, but it is not implemented */
1923   return ++cas_id;
1924 }
1925 
1926 static void
do_mbreq(grn_ctx * ctx,grn_edge * edge)1927 do_mbreq(grn_ctx *ctx, grn_edge *edge)
1928 {
1929   int quiet = 0;
1930   int flags = 0;
1931   grn_msg *msg = edge->msg;
1932   grn_com_header *header = &msg->header;
1933 
1934   switch (header->qtype) {
1935   case MBCMD_GETQ :
1936     flags = GRN_CTX_MORE;
1937     /* fallthru */
1938   case MBCMD_GET :
1939     {
1940       grn_id rid;
1941       uint16_t keylen = ntohs(header->keylen);
1942       char *key = GRN_BULK_HEAD((grn_obj *)msg);
1943       rid = grn_table_get(ctx, cache_table, key, keylen);
1944       if (!rid) {
1945         GRN_MSG_MBRES({
1946           MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
1947         });
1948       } else {
1949         grn_timeval tv;
1950         uint32_t expire;
1951         {
1952           grn_obj expire_buf;
1953           GRN_UINT32_INIT(&expire_buf, 0);
1954           grn_obj_get_value(ctx, cache_expire, rid, &expire_buf);
1955           expire = GRN_UINT32_VALUE(&expire_buf);
1956           grn_obj_close(ctx, &expire_buf);
1957         }
1958         grn_timeval_now(ctx, &tv);
1959         if (expire && expire < tv.tv_sec) {
1960           grn_table_delete_by_id(ctx, cache_table, rid);
1961           GRN_MSG_MBRES({
1962             MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
1963           });
1964         } else {
1965           grn_obj cas_buf;
1966           GRN_UINT64_INIT(&cas_buf, 0);
1967           grn_obj_get_value(ctx, cache_cas, rid, &cas_buf);
1968           GRN_MSG_MBRES({
1969             grn_obj_get_value(ctx, cache_flags, rid, re);
1970             grn_obj_get_value(ctx, cache_value, rid, re);
1971             ((grn_msg *)re)->header.cas = GRN_UINT64_VALUE(&cas_buf);
1972             MBRES(ctx, re, MBRES_SUCCESS, 0, 4, flags);
1973           });
1974           grn_obj_close(ctx, &cas_buf);
1975         }
1976       }
1977     }
1978     break;
1979   case MBCMD_SETQ :
1980   case MBCMD_ADDQ :
1981   case MBCMD_REPLACEQ :
1982     quiet = 1;
1983     /* fallthru */
1984   case MBCMD_SET :
1985   case MBCMD_ADD :
1986   case MBCMD_REPLACE :
1987     {
1988       grn_id rid;
1989       uint32_t size = ntohl(header->size);
1990       uint16_t keylen = ntohs(header->keylen);
1991       uint8_t extralen = header->level;
1992       char *body = GRN_BULK_HEAD((grn_obj *)msg);
1993       uint32_t flags = *((uint32_t *)body);
1994       uint32_t expire = ntohl(*((uint32_t *)(body + 4)));
1995       uint32_t valuelen = size - keylen - extralen;
1996       char *key = body + 8;
1997       char *value = key + keylen;
1998       int added = 0;
1999       int f = (header->qtype == MBCMD_REPLACE ||
2000                header->qtype == MBCMD_REPLACEQ) ? 0 : GRN_TABLE_ADD;
2001       GRN_ASSERT(extralen == 8);
2002       if (header->qtype == MBCMD_REPLACE || header->qtype == MBCMD_REPLACEQ) {
2003         rid = grn_table_get(ctx, cache_table, key, keylen);
2004       } else {
2005         rid = grn_table_add(ctx, cache_table, key, keylen, &added);
2006       }
2007       if (!rid) {
2008         GRN_MSG_MBRES({
2009           MBRES(ctx, re, (f & GRN_TABLE_ADD) ? MBRES_ENOMEM : MBRES_NOT_STORED, 0, 0, 0);
2010         });
2011       } else {
2012         if (added) {
2013           if (header->cas) {
2014             GRN_MSG_MBRES({
2015               MBRES(ctx, re, MBRES_EINVAL, 0, 0, 0);
2016             });
2017           } else {
2018             grn_obj text_buf, uint32_buf;
2019             GRN_TEXT_INIT(&text_buf, GRN_OBJ_DO_SHALLOW_COPY);
2020             GRN_TEXT_SET_REF(&text_buf, value, valuelen);
2021             grn_obj_set_value(ctx, cache_value, rid, &text_buf, GRN_OBJ_SET);
2022             GRN_UINT32_INIT(&uint32_buf, 0);
2023             GRN_UINT32_SET(ctx, &uint32_buf, flags);
2024             grn_obj_set_value(ctx, cache_flags, rid, &uint32_buf, GRN_OBJ_SET);
2025             if (expire && expire < RELATIVE_TIME_THRESH) {
2026               grn_timeval tv;
2027               grn_timeval_now(ctx, &tv);
2028               expire += tv.tv_sec;
2029             }
2030             GRN_UINT32_SET(ctx, &uint32_buf, expire);
2031             grn_obj_set_value(ctx, cache_expire, rid, &uint32_buf, GRN_OBJ_SET);
2032             grn_obj_close(ctx, &uint32_buf);
2033             {
2034               grn_obj cas_buf;
2035               uint64_t cas_id = get_mbreq_cas_id();
2036               GRN_UINT64_INIT(&cas_buf, 0);
2037               GRN_UINT64_SET(ctx, &cas_buf, cas_id);
2038               grn_obj_set_value(ctx, cache_cas, rid, &cas_buf, GRN_OBJ_SET);
2039               grn_obj_close(ctx, &cas_buf);
2040               GRN_MSG_MBRES({
2041                 ((grn_msg *)re)->header.cas = cas_id;
2042                 MBRES(ctx, re, MBRES_SUCCESS, 0, 0, 0);
2043               });
2044             }
2045           }
2046         } else {
2047           if (header->qtype != MBCMD_SET && header->qtype != MBCMD_SETQ) {
2048             grn_obj uint32_buf;
2049             grn_timeval tv;
2050             uint32_t oexpire;
2051 
2052             GRN_UINT32_INIT(&uint32_buf, 0);
2053             grn_obj_get_value(ctx, cache_expire, rid, &uint32_buf);
2054             oexpire = GRN_UINT32_VALUE(&uint32_buf);
2055             grn_timeval_now(ctx, &tv);
2056 
2057             if (oexpire && oexpire < tv.tv_sec) {
2058               if (header->qtype == MBCMD_REPLACE ||
2059                   header->qtype == MBCMD_REPLACEQ) {
2060                 grn_table_delete_by_id(ctx, cache_table, rid);
2061                 GRN_MSG_MBRES({
2062                   MBRES(ctx, re, MBRES_NOT_STORED, 0, 0, 0);
2063                 });
2064                 break;
2065               }
2066             } else if (header->qtype == MBCMD_ADD ||
2067                        header->qtype == MBCMD_ADDQ) {
2068               GRN_MSG_MBRES({
2069                 MBRES(ctx, re, MBRES_NOT_STORED, 0, 0, 0);
2070               });
2071               break;
2072             }
2073           }
2074           {
2075             if (header->cas) {
2076               grn_obj cas_buf;
2077               GRN_UINT64_INIT(&cas_buf, 0);
2078               grn_obj_get_value(ctx, cache_cas, rid, &cas_buf);
2079               if (header->cas != GRN_UINT64_VALUE(&cas_buf)) {
2080                 GRN_MSG_MBRES({
2081                   MBRES(ctx, re, MBRES_NOT_STORED, 0, 0, 0);
2082                 });
2083               }
2084             }
2085             {
2086               grn_obj text_buf, uint32_buf;
2087               GRN_TEXT_INIT(&text_buf, GRN_OBJ_DO_SHALLOW_COPY);
2088               GRN_TEXT_SET_REF(&text_buf, value, valuelen);
2089               grn_obj_set_value(ctx, cache_value, rid, &text_buf, GRN_OBJ_SET);
2090               GRN_UINT32_INIT(&uint32_buf, 0);
2091               GRN_UINT32_SET(ctx, &uint32_buf, flags);
2092               grn_obj_set_value(ctx, cache_flags, rid, &uint32_buf, GRN_OBJ_SET);
2093               if (expire && expire < RELATIVE_TIME_THRESH) {
2094                 grn_timeval tv;
2095                 grn_timeval_now(ctx, &tv);
2096                 expire += tv.tv_sec;
2097               }
2098               GRN_UINT32_SET(ctx, &uint32_buf, expire);
2099               grn_obj_set_value(ctx, cache_expire, rid, &uint32_buf, GRN_OBJ_SET);
2100               {
2101                 grn_obj cas_buf;
2102                 uint64_t cas_id = get_mbreq_cas_id();
2103                 GRN_UINT64_INIT(&cas_buf, 0);
2104                 GRN_UINT64_SET(ctx, &cas_buf, cas_id);
2105                 grn_obj_set_value(ctx, cache_cas, rid, &cas_buf, GRN_OBJ_SET);
2106                 GRN_MSG_MBRES({
2107                   ((grn_msg *)re)->header.cas = cas_id;
2108                   MBRES(ctx, re, MBRES_SUCCESS, 0, 0, 0);
2109                 });
2110               }
2111             }
2112           }
2113         }
2114       }
2115     }
2116     break;
2117   case MBCMD_DELETEQ :
2118     quiet = 1;
2119     /* fallthru */
2120   case MBCMD_DELETE :
2121     {
2122       grn_id rid;
2123       uint16_t keylen = ntohs(header->keylen);
2124       char *key = GRN_BULK_HEAD((grn_obj *)msg);
2125       rid = grn_table_get(ctx, cache_table, key, keylen);
2126       if (!rid) {
2127         /* GRN_LOG(ctx, GRN_LOG_NOTICE, "GET k=%d not found", keylen); */
2128         GRN_MSG_MBRES({
2129           MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
2130         });
2131       } else {
2132         grn_table_delete_by_id(ctx, cache_table, rid);
2133         GRN_MSG_MBRES({
2134           MBRES(ctx, re, MBRES_SUCCESS, 0, 4, 0);
2135         });
2136       }
2137     }
2138     break;
2139   case MBCMD_INCREMENTQ :
2140   case MBCMD_DECREMENTQ :
2141     quiet = 1;
2142     /* fallthru */
2143   case MBCMD_INCREMENT :
2144   case MBCMD_DECREMENT :
2145     {
2146       grn_id rid;
2147       int added = 0;
2148       uint64_t delta, init;
2149       uint16_t keylen = ntohs(header->keylen);
2150       char *body = GRN_BULK_HEAD((grn_obj *)msg);
2151       char *key = body + 20;
2152       uint32_t expire = ntohl(*((uint32_t *)(body + 16)));
2153       grn_ntoh(&delta, body, 8);
2154       grn_ntoh(&init, body + 8, 8);
2155       GRN_ASSERT(header->level == 20); /* extralen */
2156       if (expire == 0xffffffff) {
2157         rid = grn_table_get(ctx, cache_table, key, keylen);
2158       } else {
2159         rid = grn_table_add(ctx, cache_table, key, keylen, &added);
2160       }
2161       if (!rid) {
2162         GRN_MSG_MBRES({
2163           MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
2164         });
2165       } else {
2166         grn_obj uint32_buf, text_buf;
2167         GRN_UINT32_INIT(&uint32_buf, 0);
2168         GRN_TEXT_INIT(&text_buf, GRN_OBJ_DO_SHALLOW_COPY);
2169         if (added) {
2170           GRN_TEXT_SET_REF(&text_buf, &init, 8);
2171           grn_obj_set_value(ctx, cache_value, rid, &text_buf, GRN_OBJ_SET);
2172           GRN_UINT32_SET(ctx, &uint32_buf, 0);
2173           grn_obj_set_value(ctx, cache_flags, rid, &uint32_buf, GRN_OBJ_SET);
2174         } else {
2175           grn_timeval tv;
2176           uint32_t oexpire;
2177 
2178           grn_obj_get_value(ctx, cache_expire, rid, &uint32_buf);
2179           oexpire = GRN_UINT32_VALUE(&uint32_buf);
2180           grn_timeval_now(ctx, &tv);
2181 
2182           if (oexpire && oexpire < tv.tv_sec) {
2183             if (expire == 0xffffffffU) {
2184               GRN_MSG_MBRES({
2185                 MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
2186               });
2187               break;
2188             } else {
2189               GRN_TEXT_SET_REF(&text_buf, &init, 8);
2190               grn_obj_set_value(ctx, cache_value, rid, &text_buf, GRN_OBJ_SET);
2191               GRN_UINT32_SET(ctx, &uint32_buf, 0);
2192               grn_obj_set_value(ctx, cache_flags, rid, &uint32_buf, GRN_OBJ_SET);
2193             }
2194           } else {
2195             grn_obj uint64_buf;
2196             GRN_UINT64_INIT(&uint64_buf, 0);
2197             GRN_UINT64_SET(ctx, &uint64_buf, delta);
2198             grn_obj_set_value(ctx, cache_value, rid, &uint64_buf,
2199                               header->qtype == MBCMD_INCREMENT ||
2200                               header->qtype == MBCMD_INCREMENTQ
2201                               ? GRN_OBJ_INCR
2202                               : GRN_OBJ_DECR);
2203           }
2204         }
2205         if (expire && expire < RELATIVE_TIME_THRESH) {
2206           grn_timeval tv;
2207           grn_timeval_now(ctx, &tv);
2208           expire += tv.tv_sec;
2209         }
2210         GRN_UINT32_SET(ctx, &uint32_buf, expire);
2211         grn_obj_set_value(ctx, cache_expire, rid, &uint32_buf, GRN_OBJ_SET);
2212         GRN_MSG_MBRES({
2213           /* TODO: get_mbreq_cas_id() */
2214           grn_obj_get_value(ctx, cache_value, rid, re);
2215           grn_hton(&delta, (uint64_t *)GRN_BULK_HEAD(re), 8);
2216           GRN_TEXT_SET(ctx, re, &delta, sizeof(uint64_t));
2217           MBRES(ctx, re, MBRES_SUCCESS, 0, sizeof(uint64_t), 0);
2218         });
2219       }
2220     }
2221     break;
2222   case MBCMD_FLUSHQ :
2223     quiet = 1;
2224     /* fallthru */
2225   case MBCMD_FLUSH :
2226     {
2227       uint32_t expire;
2228       uint8_t extralen = header->level;
2229       if (extralen) {
2230         char *body = GRN_BULK_HEAD((grn_obj *)msg);
2231         GRN_ASSERT(extralen == 4);
2232         expire = ntohl(*((uint32_t *)(body)));
2233         if (expire < RELATIVE_TIME_THRESH) {
2234           grn_timeval tv;
2235           grn_timeval_now(ctx, &tv);
2236           if (expire) {
2237             expire += tv.tv_sec;
2238           } else {
2239             expire = tv.tv_sec - 1;
2240           }
2241         }
2242       } else {
2243         grn_timeval tv;
2244         grn_timeval_now(ctx, &tv);
2245         expire = tv.tv_sec - 1;
2246       }
2247       {
2248         grn_obj exp_buf;
2249         GRN_UINT32_INIT(&exp_buf, 0);
2250         GRN_UINT32_SET(ctx, &exp_buf, expire);
2251         GRN_TABLE_EACH(ctx, cache_table, 0, 0, rid, NULL, NULL, NULL, {
2252           grn_obj_set_value(ctx, cache_expire, rid, &exp_buf, GRN_OBJ_SET);
2253         });
2254         GRN_MSG_MBRES({
2255           MBRES(ctx, re, MBRES_SUCCESS, 0, 4, 0);
2256         });
2257         grn_obj_close(ctx, &exp_buf);
2258       }
2259     }
2260     break;
2261   case MBCMD_NOOP :
2262     break;
2263   case MBCMD_VERSION :
2264     GRN_MSG_MBRES({
2265       grn_bulk_write(ctx, re, PACKAGE_VERSION, strlen(PACKAGE_VERSION));
2266       MBRES(ctx, re, MBRES_SUCCESS, 0, 0, 0);
2267     });
2268     break;
2269   case MBCMD_GETKQ :
2270     flags = GRN_CTX_MORE;
2271     /* fallthru */
2272   case MBCMD_GETK :
2273     {
2274       grn_id rid;
2275       uint16_t keylen = ntohs(header->keylen);
2276       char *key = GRN_BULK_HEAD((grn_obj *)msg);
2277       rid = grn_table_get(ctx, cache_table, key, keylen);
2278       if (!rid) {
2279         GRN_MSG_MBRES({
2280           MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
2281         });
2282       } else {
2283         grn_obj uint32_buf;
2284         grn_timeval tv;
2285         uint32_t expire;
2286         GRN_UINT32_INIT(&uint32_buf, 0);
2287         grn_obj_get_value(ctx, cache_expire, rid, &uint32_buf);
2288         expire = GRN_UINT32_VALUE(&uint32_buf);
2289         grn_timeval_now(ctx, &tv);
2290         if (expire && expire < tv.tv_sec) {
2291           grn_table_delete_by_id(ctx, cache_table, rid);
2292           GRN_MSG_MBRES({
2293             MBRES(ctx, re, MBRES_KEY_ENOENT, 0, 0, 0);
2294           });
2295         } else {
2296           grn_obj uint64_buf;
2297           GRN_UINT64_INIT(&uint64_buf, 0);
2298           grn_obj_get_value(ctx, cache_cas, rid, &uint64_buf);
2299           GRN_MSG_MBRES({
2300             grn_obj_get_value(ctx, cache_flags, rid, re);
2301             grn_bulk_write(ctx, re, key, keylen);
2302             grn_obj_get_value(ctx, cache_value, rid, re);
2303             ((grn_msg *)re)->header.cas = GRN_UINT64_VALUE(&uint64_buf);
2304             MBRES(ctx, re, MBRES_SUCCESS, keylen, 4, flags);
2305           });
2306         }
2307       }
2308     }
2309     break;
2310   case MBCMD_APPENDQ :
2311   case MBCMD_PREPENDQ :
2312     quiet = 1;
2313     /* fallthru */
2314   case MBCMD_APPEND :
2315   case MBCMD_PREPEND :
2316     {
2317       grn_id rid;
2318       uint32_t size = ntohl(header->size);
2319       uint16_t keylen = ntohs(header->keylen);
2320       char *key = GRN_BULK_HEAD((grn_obj *)msg);
2321       char *value = key + keylen;
2322       uint32_t valuelen = size - keylen;
2323       rid = grn_table_add(ctx, cache_table, key, keylen, NULL);
2324       if (!rid) {
2325         GRN_MSG_MBRES({
2326           MBRES(ctx, re, MBRES_ENOMEM, 0, 0, 0);
2327         });
2328       } else {
2329         /* FIXME: check expire */
2330         grn_obj buf;
2331         int flags = header->qtype == MBCMD_APPEND ? GRN_OBJ_APPEND : GRN_OBJ_PREPEND;
2332         GRN_TEXT_INIT(&buf, GRN_OBJ_DO_SHALLOW_COPY);
2333         GRN_TEXT_SET_REF(&buf, value, valuelen);
2334         grn_obj_set_value(ctx, cache_value, rid, &buf, flags);
2335         GRN_MSG_MBRES({
2336           MBRES(ctx, re, MBRES_SUCCESS, 0, 0, 0);
2337         });
2338       }
2339     }
2340     break;
2341   case MBCMD_STAT :
2342     {
2343       pid_t pid = grn_getpid();
2344       GRN_MSG_MBRES({
2345         grn_bulk_write(ctx, re, "pid", 3);
2346         grn_text_itoa(ctx, re, pid);
2347         MBRES(ctx, re, MBRES_SUCCESS, 3, 0, 0);
2348       });
2349     }
2350     break;
2351   case MBCMD_QUITQ :
2352     quiet = 1;
2353     /* fallthru */
2354   case MBCMD_QUIT :
2355     GRN_MSG_MBRES({
2356       MBRES(ctx, re, MBRES_SUCCESS, 0, 0, 0);
2357     });
2358     /* fallthru */
2359   default :
2360     ctx->stat = GRN_CTX_QUIT;
2361     break;
2362   }
2363 }
2364 
2365 /* worker thread */
2366 
2367 enum {
2368   EDGE_IDLE = 0x00,
2369   EDGE_WAIT = 0x01,
2370   EDGE_DOING = 0x02,
2371   EDGE_ABORT = 0x03,
2372 };
2373 
2374 static void
check_rlimit_nofile(grn_ctx * ctx)2375 check_rlimit_nofile(grn_ctx *ctx)
2376 {
2377 #ifndef WIN32
2378   struct rlimit limit;
2379   limit.rlim_cur = 0;
2380   limit.rlim_max = 0;
2381   getrlimit(RLIMIT_NOFILE, &limit);
2382   if (limit.rlim_cur < RLIMIT_NOFILE_MINIMUM) {
2383     limit.rlim_cur = RLIMIT_NOFILE_MINIMUM;
2384     limit.rlim_max = RLIMIT_NOFILE_MINIMUM;
2385     setrlimit(RLIMIT_NOFILE, &limit);
2386     limit.rlim_cur = 0;
2387     limit.rlim_max = 0;
2388     getrlimit(RLIMIT_NOFILE, &limit);
2389   }
2390   GRN_LOG(ctx, GRN_LOG_NOTICE,
2391           "RLIMIT_NOFILE(%" GRN_FMT_LLD ",%" GRN_FMT_LLD ")",
2392           (long long int)limit.rlim_cur, (long long int)limit.rlim_max);
2393 #endif /* WIN32 */
2394 }
2395 
2396 static grn_thread_func_result CALLBACK
h_worker(void * arg)2397 h_worker(void *arg)
2398 {
2399   ht_context hc;
2400   grn_ctx ctx_, *ctx = &ctx_;
2401   grn_ctx_init(ctx, 0);
2402   grn_ctx_use(ctx, (grn_obj *)arg);
2403   grn_ctx_recv_handler_set(ctx, h_output, &hc);
2404   MUTEX_LOCK_ENSURE(ctx, q_mutex);
2405   GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread start (%d/%d)",
2406           n_floating_threads, n_running_threads);
2407   while (n_running_threads <= max_n_floating_threads &&
2408          grn_gctx.stat != GRN_CTX_QUIT) {
2409     grn_obj *msg;
2410     if (ctx->rc == GRN_CANCEL) {
2411       ctx->rc = GRN_SUCCESS;
2412     }
2413     n_floating_threads++;
2414     while (!(msg = (grn_obj *)grn_com_queue_deque(&grn_gctx, &ctx_new))) {
2415       COND_WAIT(q_cond, q_mutex);
2416       if (grn_gctx.stat == GRN_CTX_QUIT) {
2417         n_floating_threads--;
2418         goto exit;
2419       }
2420       if (n_running_threads > max_n_floating_threads) {
2421         n_floating_threads--;
2422         goto exit;
2423       }
2424     }
2425     n_floating_threads--;
2426     MUTEX_UNLOCK(q_mutex);
2427     hc.msg = (grn_msg *)msg;
2428     hc.in_body = GRN_FALSE;
2429     hc.is_chunked = GRN_FALSE;
2430     do_htreq(ctx, &hc);
2431     MUTEX_LOCK_ENSURE(ctx, q_mutex);
2432   }
2433 exit :
2434   n_running_threads--;
2435   GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread end (%d/%d)",
2436           n_floating_threads, n_running_threads);
2437   if (grn_gctx.stat == GRN_CTX_QUIT) {
2438     break_accept_event_loop(ctx);
2439   }
2440   grn_ctx_fin(ctx);
2441   MUTEX_UNLOCK(q_mutex);
2442   return GRN_THREAD_FUNC_RETURN_VALUE;
2443 }
2444 
2445 static void
h_handler(grn_ctx * ctx,grn_obj * msg)2446 h_handler(grn_ctx *ctx, grn_obj *msg)
2447 {
2448   grn_com *com = ((grn_msg *)msg)->u.peer;
2449   if (ctx->rc) {
2450     grn_com_close(ctx, com);
2451     grn_msg_close(ctx, msg);
2452   } else {
2453     grn_sock fd = com->fd;
2454     void *arg = com->ev->opaque;
2455     /* if not keep alive connection */
2456     grn_com_event_del(ctx, com->ev, fd);
2457     ((grn_msg *)msg)->u.fd = fd;
2458     MUTEX_LOCK_ENSURE(ctx, q_mutex);
2459     grn_com_queue_enque(ctx, &ctx_new, (grn_com_queue_entry *)msg);
2460     if (n_floating_threads == 0 && n_running_threads < max_n_floating_threads) {
2461       grn_thread thread;
2462       n_running_threads++;
2463       if (THREAD_CREATE(thread, h_worker, arg)) {
2464         n_running_threads--;
2465         SERR("pthread_create");
2466       }
2467     }
2468     COND_SIGNAL(q_cond);
2469     MUTEX_UNLOCK(q_mutex);
2470   }
2471 }
2472 
2473 static int
h_server(char * path)2474 h_server(char *path)
2475 {
2476   int exit_code = EXIT_FAILURE;
2477   grn_ctx ctx_, *ctx = &ctx_;
2478   grn_ctx_init(ctx, 0);
2479   GRN_COM_QUEUE_INIT(&ctx_new);
2480   GRN_COM_QUEUE_INIT(&ctx_old);
2481   check_rlimit_nofile(ctx);
2482   GRN_TEXT_INIT(&http_response_server_line, 0);
2483   grn_text_printf(ctx,
2484                   &http_response_server_line,
2485                   "Server: %s/%s\r\n",
2486                   grn_get_package_label(),
2487                   grn_get_version());
2488   exit_code = start_service(ctx, path, NULL, h_handler);
2489   GRN_OBJ_FIN(ctx, &http_response_server_line);
2490   grn_ctx_fin(ctx);
2491   return exit_code;
2492 }
2493 
2494 static grn_thread_func_result CALLBACK
g_worker(void * arg)2495 g_worker(void *arg)
2496 {
2497   MUTEX_LOCK_ENSURE(NULL, q_mutex);
2498   GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread start (%d/%d)",
2499           n_floating_threads, n_running_threads);
2500   while (n_running_threads <= max_n_floating_threads &&
2501          grn_gctx.stat != GRN_CTX_QUIT) {
2502     grn_ctx *ctx;
2503     grn_edge *edge;
2504     n_floating_threads++;
2505     while (!(edge = (grn_edge *)grn_com_queue_deque(&grn_gctx, &ctx_new))) {
2506       COND_WAIT(q_cond, q_mutex);
2507       if (grn_gctx.stat == GRN_CTX_QUIT) {
2508         n_floating_threads--;
2509         goto exit;
2510       }
2511       if (n_running_threads > max_n_floating_threads) {
2512         n_floating_threads--;
2513         goto exit;
2514       }
2515     }
2516     ctx = &edge->ctx;
2517     n_floating_threads--;
2518     if (edge->stat == EDGE_DOING) { continue; }
2519     if (edge->stat == EDGE_WAIT) {
2520       edge->stat = EDGE_DOING;
2521       while (!GRN_COM_QUEUE_EMPTYP(&edge->recv_new)) {
2522         grn_obj *msg;
2523         MUTEX_UNLOCK(q_mutex);
2524         /* if (edge->flags == GRN_EDGE_WORKER) */
2525         while (ctx->stat != GRN_CTX_QUIT &&
2526                (edge->msg = (grn_msg *)grn_com_queue_deque(ctx, &edge->recv_new))) {
2527           grn_com_header *header = &edge->msg->header;
2528           msg = (grn_obj *)edge->msg;
2529           switch (header->proto) {
2530           case GRN_COM_PROTO_MBREQ :
2531             do_mbreq(ctx, edge);
2532             break;
2533           case GRN_COM_PROTO_GQTP :
2534             grn_ctx_send(ctx, GRN_BULK_HEAD(msg), GRN_BULK_VSIZE(msg), header->flags);
2535             ERRCLR(ctx);
2536             if (ctx->rc == GRN_CANCEL) {
2537               ctx->rc = GRN_SUCCESS;
2538             }
2539             break;
2540           default :
2541             ctx->stat = GRN_CTX_QUIT;
2542             break;
2543           }
2544           grn_msg_close(ctx, msg);
2545         }
2546         while ((msg = (grn_obj *)grn_com_queue_deque(ctx, &edge->send_old))) {
2547           grn_msg_close(ctx, msg);
2548         }
2549         MUTEX_LOCK_ENSURE(ctx, q_mutex);
2550         if (ctx->stat == GRN_CTX_QUIT || edge->stat == EDGE_ABORT) { break; }
2551       }
2552     }
2553     if (ctx->stat == GRN_CTX_QUIT || edge->stat == EDGE_ABORT) {
2554       grn_com_queue_enque(&grn_gctx, &ctx_old, (grn_com_queue_entry *)edge);
2555       edge->stat = EDGE_ABORT;
2556     } else {
2557       edge->stat = EDGE_IDLE;
2558     }
2559   };
2560 exit :
2561   n_running_threads--;
2562   GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, "thread end (%d/%d)",
2563           n_floating_threads, n_running_threads);
2564   MUTEX_UNLOCK(q_mutex);
2565   return GRN_THREAD_FUNC_RETURN_VALUE;
2566 }
2567 
2568 static void
g_dispatcher(grn_ctx * ctx,grn_edge * edge)2569 g_dispatcher(grn_ctx *ctx, grn_edge *edge)
2570 {
2571   MUTEX_LOCK_ENSURE(ctx, q_mutex);
2572   if (edge->stat == EDGE_IDLE) {
2573     grn_com_queue_enque(ctx, &ctx_new, (grn_com_queue_entry *)edge);
2574     edge->stat = EDGE_WAIT;
2575     if (n_floating_threads == 0 && n_running_threads < max_n_floating_threads) {
2576       grn_thread thread;
2577       n_running_threads++;
2578       if (THREAD_CREATE(thread, g_worker, NULL)) {
2579         n_running_threads--;
2580         SERR("pthread_create");
2581       }
2582     }
2583     COND_SIGNAL(q_cond);
2584   }
2585   MUTEX_UNLOCK(q_mutex);
2586 }
2587 
2588 static void
g_output(grn_ctx * ctx,int flags,void * arg)2589 g_output(grn_ctx *ctx, int flags, void *arg)
2590 {
2591   grn_edge *edge = arg;
2592   grn_com *com = edge->com;
2593   grn_msg *req = edge->msg, *msg = (grn_msg *)ctx->impl->output.buf;
2594   msg->edge_id = req->edge_id;
2595   msg->header.proto = req->header.proto == GRN_COM_PROTO_MBREQ
2596     ? GRN_COM_PROTO_MBRES : req->header.proto;
2597   if (ctx->rc != GRN_SUCCESS && GRN_BULK_VSIZE(ctx->impl->output.buf) == 0) {
2598     GRN_TEXT_PUTS(ctx, ctx->impl->output.buf, ctx->errbuf);
2599   }
2600   if (grn_msg_send(ctx, (grn_obj *)msg,
2601                    (flags & GRN_CTX_MORE) ? GRN_CTX_MORE : GRN_CTX_TAIL)) {
2602     edge->stat = EDGE_ABORT;
2603   }
2604   ctx->impl->output.buf = grn_msg_open(ctx, com, &edge->send_old);
2605 }
2606 
2607 static void
g_handler(grn_ctx * ctx,grn_obj * msg)2608 g_handler(grn_ctx *ctx, grn_obj *msg)
2609 {
2610   grn_edge *edge;
2611   grn_com *com = ((grn_msg *)msg)->u.peer;
2612   if (ctx->rc) {
2613     if (com->has_sid) {
2614       if ((edge = com->opaque)) {
2615         MUTEX_LOCK_ENSURE(ctx, q_mutex);
2616         if (edge->stat == EDGE_IDLE) {
2617           grn_com_queue_enque(ctx, &ctx_old, (grn_com_queue_entry *)edge);
2618         }
2619         edge->stat = EDGE_ABORT;
2620         MUTEX_UNLOCK(q_mutex);
2621       } else {
2622         grn_com_close(ctx, com);
2623       }
2624     }
2625     grn_msg_close(ctx, msg);
2626   } else {
2627     int added;
2628     edge = grn_edges_add(ctx, &((grn_msg *)msg)->edge_id, &added);
2629     if (added) {
2630       grn_ctx_init(&edge->ctx, 0);
2631       GRN_COM_QUEUE_INIT(&edge->recv_new);
2632       GRN_COM_QUEUE_INIT(&edge->send_old);
2633       grn_ctx_use(&edge->ctx, (grn_obj *)com->ev->opaque);
2634       grn_ctx_recv_handler_set(&edge->ctx, g_output, edge);
2635       com->opaque = edge;
2636       grn_obj_close(&edge->ctx, edge->ctx.impl->output.buf);
2637       edge->ctx.impl->output.buf =
2638         grn_msg_open(&edge->ctx, com, &edge->send_old);
2639       edge->com = com;
2640       edge->stat = EDGE_IDLE;
2641       edge->flags = GRN_EDGE_WORKER;
2642     }
2643     if (edge->ctx.stat == GRN_CTX_QUIT || edge->stat == EDGE_ABORT) {
2644       grn_msg_close(ctx, msg);
2645     } else {
2646       grn_com_queue_enque(ctx, &edge->recv_new, (grn_com_queue_entry *)msg);
2647       g_dispatcher(ctx, edge);
2648     }
2649   }
2650 }
2651 
2652 static int
g_server(char * path)2653 g_server(char *path)
2654 {
2655   int exit_code = EXIT_FAILURE;
2656   grn_ctx ctx_, *ctx = &ctx_;
2657   grn_ctx_init(ctx, 0);
2658   GRN_COM_QUEUE_INIT(&ctx_new);
2659   GRN_COM_QUEUE_INIT(&ctx_old);
2660   check_rlimit_nofile(ctx);
2661   exit_code = start_service(ctx, path, g_dispatcher, g_handler);
2662   grn_ctx_fin(ctx);
2663   return exit_code;
2664 }
2665 
2666 enum {
2667   ACTION_USAGE = 1,
2668   ACTION_VERSION,
2669   ACTION_SHOW_CONFIG,
2670   ACTION_ERROR
2671 };
2672 
2673 #define ACTION_MASK          (0x0f)
2674 #define MODE_MASK            (0xf0)
2675 #define FLAG_MODE_ALONE      (1 << 4)
2676 #define FLAG_MODE_CLIENT     (1 << 5)
2677 #define FLAG_MODE_DAEMON     (1 << 6)
2678 #define FLAG_MODE_SERVER     (1 << 7)
2679 #define FLAG_NEW_DB     (1 << 8)
2680 #define FLAG_USE_WINDOWS_EVENT_LOG (1 << 9)
2681 
2682 static uint32_t
get_core_number(void)2683 get_core_number(void)
2684 {
2685 #ifdef WIN32
2686   SYSTEM_INFO sinfo;
2687   GetSystemInfo(&sinfo);
2688   return sinfo.dwNumberOfProcessors;
2689 #else /* WIN32 */
2690 #  ifdef _SC_NPROCESSORS_CONF
2691   return sysconf(_SC_NPROCESSORS_CONF);
2692 #  else
2693   int n_processors;
2694   size_t length = sizeof(n_processors);
2695   int mib[] = {CTL_HW, HW_NCPU};
2696   if (sysctl(mib, sizeof(mib) / sizeof(mib[0]),
2697              &n_processors, &length, NULL, 0) == 0 &&
2698       length == sizeof(n_processors) &&
2699       0 < n_processors) {
2700     return n_processors;
2701   } else {
2702     return 1;
2703   }
2704 #  endif /* _SC_NPROCESSORS_CONF */
2705 #endif /* WIN32 */
2706 }
2707 
2708 /*
2709  * The length of each line, including an end-of-line, in config file should be
2710  * shorter than (CONFIG_FILE_BUF_SIZE - 1) bytes. Too long lines are ignored.
2711  * Note that both '\r' and '\n' are handled as end-of-lines.
2712  *
2713  * '#' and ';' are special symbols to start comments. A comment ends with an
2714  * end-of-line.
2715  *
2716  * Format: name[=value]
2717  * - Preceding/trailing white-spaces of each line are removed.
2718  * - White-spaces aroung '=' are removed.
2719  * - name does not allow white-spaces.
2720  */
2721 #define CONFIG_FILE_BUF_SIZE 4096
2722 #define CONFIG_FILE_MAX_NAME_LENGTH 128
2723 #define CONFIG_FILE_MAX_VALUE_LENGTH 2048
2724 
2725 typedef enum {
2726   CONFIG_FILE_SUCCESS,
2727   CONFIG_FILE_FORMAT_ERROR,
2728   CONFIG_FILE_FOPEN_ERROR,
2729   CONFIG_FILE_MALLOC_ERROR,
2730   CONFIG_FILE_ATEXIT_ERROR
2731 } config_file_status;
2732 
2733 /*
2734  * The node type of a linked list for storing values. Note that a value is
2735  * stored in the extra space of an object.
2736  */
2737 typedef struct _config_file_entry {
2738   struct _config_file_entry *next;
2739 } config_file_entry;
2740 
2741 static config_file_entry *config_file_entry_head = NULL;
2742 
2743 static void
config_file_clear(void)2744 config_file_clear(void) {
2745   while (config_file_entry_head) {
2746     config_file_entry *next = config_file_entry_head->next;
2747     free(config_file_entry_head);
2748     config_file_entry_head = next;
2749   }
2750 }
2751 
2752 static config_file_status
config_file_register(const char * path,const grn_str_getopt_opt * opts,int * flags,const char * name,size_t name_length,const char * value,size_t value_length)2753 config_file_register(const char *path, const grn_str_getopt_opt *opts,
2754                      int *flags, const char *name, size_t name_length,
2755                      const char *value, size_t value_length)
2756 {
2757   char name_buf[CONFIG_FILE_MAX_NAME_LENGTH + 3];
2758   config_file_entry *entry = NULL;
2759   char *args[4];
2760 
2761   name_buf[0] = name_buf[1] = '-';
2762   grn_strcpy(name_buf + 2, CONFIG_FILE_MAX_NAME_LENGTH + 1, name);
2763 
2764   if (value) {
2765     const size_t entry_size = sizeof(config_file_entry) + value_length + 1;
2766     entry = (config_file_entry *)malloc(entry_size);
2767     if (!entry) {
2768       fprintf(stderr, "memory allocation failed: %u bytes\n",
2769               (unsigned int)entry_size);
2770       return CONFIG_FILE_MALLOC_ERROR;
2771     }
2772     grn_strcpy((char *)(entry + 1), value_length + 1, value);
2773     entry->next = config_file_entry_head;
2774     if (!config_file_entry_head) {
2775       if (atexit(config_file_clear)) {
2776         free(entry);
2777         return CONFIG_FILE_ATEXIT_ERROR;
2778       }
2779     }
2780     config_file_entry_head = entry;
2781   }
2782 
2783   args[0] = (char *)path;
2784   args[1] = name_buf;
2785   args[2] = entry ? (char *)(entry + 1) : NULL;
2786   args[3] = NULL;
2787   grn_str_getopt(entry ? 3 : 2, args, opts, flags);
2788   return CONFIG_FILE_SUCCESS;
2789 }
2790 
2791 static config_file_status
config_file_parse(const char * path,const grn_str_getopt_opt * opts,int * flags,char * buf)2792 config_file_parse(const char *path, const grn_str_getopt_opt *opts,
2793                   int *flags, char *buf) {
2794   char *ptr, *name, *value;
2795   size_t name_length, value_length;
2796 
2797   while (isspace((unsigned char)*buf)) {
2798     buf++;
2799   }
2800 
2801   ptr = buf;
2802   while (*ptr && *ptr != '#' && *ptr != ';') {
2803     ptr++;
2804   }
2805 
2806   do {
2807     *ptr-- = '\0';
2808   } while (ptr >= buf && isspace((unsigned char)*ptr));
2809 
2810   if (!*buf) {
2811     return CONFIG_FILE_SUCCESS;
2812   }
2813 
2814   name = ptr = buf;
2815   while (*ptr && !isspace((unsigned char)*ptr) && *ptr != '=') {
2816     ptr++;
2817   }
2818   while (isspace((unsigned char)*ptr)) {
2819     *ptr++ = '\0';
2820   }
2821 
2822   name_length = strlen(name);
2823   if (name_length == 0) {
2824     return CONFIG_FILE_SUCCESS;
2825   } else if (name_length > CONFIG_FILE_MAX_NAME_LENGTH) {
2826     fprintf(stderr, "too long name in config file: %u bytes\n",
2827             (unsigned int)name_length);
2828     return CONFIG_FILE_FORMAT_ERROR;
2829   }
2830 
2831   if (*ptr == '=') {
2832     *ptr++ = '\0';
2833     while (isspace((unsigned char)*ptr)) {
2834       ptr++;
2835     }
2836     value = ptr;
2837   } else if (*ptr) {
2838     fprintf(stderr, "invalid name in config file\n");
2839     return CONFIG_FILE_FORMAT_ERROR;
2840   } else {
2841     value = NULL;
2842   }
2843 
2844   value_length = value ? strlen(value) : 0;
2845   if (value_length > CONFIG_FILE_MAX_VALUE_LENGTH) {
2846     fprintf(stderr, "too long value in config file: %u bytes\n",
2847             (unsigned int)value_length);
2848     return CONFIG_FILE_FORMAT_ERROR;
2849   }
2850 
2851   return config_file_register(path, opts, flags,
2852                               name, name_length, value, value_length);
2853 }
2854 
2855 static config_file_status
config_file_load(const char * path,const grn_str_getopt_opt * opts,int * flags)2856 config_file_load(const char *path, const grn_str_getopt_opt *opts, int *flags)
2857 {
2858   config_file_status status = CONFIG_FILE_SUCCESS;
2859   char buf[CONFIG_FILE_BUF_SIZE];
2860   size_t length = 0;
2861   FILE * const file = fopen(path, "rb");
2862   if (!file) {
2863     return CONFIG_FILE_FOPEN_ERROR;
2864   }
2865 
2866   for ( ; ; ) {
2867     int c = fgetc(file);
2868     if (c == '\r' || c == '\n' || c == EOF) {
2869       if (length < sizeof(buf) - 1) {
2870         buf[length] = '\0';
2871         status = config_file_parse(path, opts, flags, buf);
2872         if (status != CONFIG_FILE_SUCCESS) {
2873           break;
2874         }
2875       }
2876       length = 0;
2877     } else if (c == '\0') {
2878       fprintf(stderr, "prohibited '\\0' in config file: %s\n", path);
2879       status = CONFIG_FILE_FORMAT_ERROR;
2880       break;
2881     } else {
2882       if (length < sizeof(buf) - 1) {
2883         buf[length] = (char)c;
2884       }
2885       length++;
2886     }
2887 
2888     if (c == EOF) {
2889       break;
2890     }
2891   }
2892 
2893   fclose(file);
2894   return status;
2895 }
2896 
2897 static const int default_http_port = DEFAULT_HTTP_PORT;
2898 static const int default_gqtp_port = DEFAULT_GQTP_PORT;
2899 static grn_encoding default_encoding = GRN_ENC_DEFAULT;
2900 static uint32_t default_max_n_threads = DEFAULT_MAX_N_FLOATING_THREADS;
2901 static const grn_log_level default_log_level = GRN_LOG_DEFAULT_LEVEL;
2902 static const char * const default_protocol = "gqtp";
2903 static const char *default_hostname = "localhost";
2904 static const char * const default_dest = "localhost";
2905 static const char *default_log_path = "";
2906 static const char *default_query_log_path = "";
2907 static const char *default_config_path = "";
2908 static const char *default_document_root = "";
2909 static grn_command_version default_default_command_version =
2910     GRN_COMMAND_VERSION_DEFAULT;
2911 static int64_t default_default_match_escalation_threshold = 0;
2912 static const char * const default_bind_address = "0.0.0.0";
2913 static double default_default_request_timeout = 0.0;
2914 
2915 static void
init_default_hostname(void)2916 init_default_hostname(void)
2917 {
2918   static char hostname[HOST_NAME_MAX + 1];
2919   struct addrinfo hints, *result;
2920 
2921   hostname[HOST_NAME_MAX] = '\0';
2922   if (gethostname(hostname, HOST_NAME_MAX) == -1)
2923     return;
2924 
2925   memset(&hints, 0, sizeof(hints));
2926   hints.ai_family = AF_UNSPEC;
2927   hints.ai_socktype = SOCK_STREAM;
2928   hints.ai_addr = NULL;
2929   hints.ai_canonname = NULL;
2930   hints.ai_next = NULL;
2931   if (getaddrinfo(hostname, NULL, &hints, &result) != 0)
2932     return;
2933   freeaddrinfo(result);
2934 
2935   default_hostname = hostname;
2936 }
2937 
2938 static void
init_default_settings(void)2939 init_default_settings(void)
2940 {
2941   output = stdout;
2942 
2943   default_encoding = grn_encoding_parse(GRN_DEFAULT_ENCODING);
2944 
2945   {
2946     const uint32_t n_cores = get_core_number();
2947     if (n_cores != 0) {
2948       default_max_n_threads = n_cores;
2949     }
2950   }
2951 
2952   init_default_hostname();
2953 
2954   default_log_path = grn_default_logger_get_path();
2955   default_query_log_path = grn_default_query_logger_get_path();
2956 
2957   default_config_path = getenv("GRN_CONFIG_PATH");
2958   if (!default_config_path) {
2959     default_config_path = GRN_CONFIG_PATH;
2960     if (!default_config_path) {
2961       default_config_path = "";
2962     }
2963   }
2964 
2965 #ifdef WIN32
2966   {
2967     static char windows_default_document_root[PATH_MAX];
2968     size_t document_root_length = strlen(grn_windows_base_dir()) + 1 +
2969         strlen(GRN_DEFAULT_RELATIVE_DOCUMENT_ROOT) + 1;
2970     if (document_root_length >= PATH_MAX) {
2971       fprintf(stderr, "can't use default root: too long path\n");
2972     } else {
2973       grn_strcpy(windows_default_document_root, PATH_MAX,
2974                  grn_windows_base_dir());
2975       grn_strcat(windows_default_document_root, PATH_MAX,
2976                  "/");
2977       grn_strcat(windows_default_document_root, PATH_MAX,
2978                  GRN_DEFAULT_RELATIVE_DOCUMENT_ROOT);
2979       default_document_root = windows_default_document_root;
2980     }
2981   }
2982 #else
2983   default_document_root = GRN_DEFAULT_DOCUMENT_ROOT;
2984 #endif
2985 
2986   default_default_command_version = grn_get_default_command_version();
2987   default_default_match_escalation_threshold =
2988     grn_get_default_match_escalation_threshold();
2989   default_default_request_timeout = grn_get_default_request_timeout();
2990 }
2991 
2992 static void
show_config(FILE * out,const grn_str_getopt_opt * opts,int flags)2993 show_config(FILE *out, const grn_str_getopt_opt *opts, int flags)
2994 {
2995   const grn_str_getopt_opt *o;
2996 
2997   for (o = opts; o->opt || o->longopt; o++) {
2998     switch (o->op) {
2999     case GETOPT_OP_NONE:
3000       if (o->arg && *o->arg) {
3001         if (o->longopt && strcmp(o->longopt, "config-path")) {
3002           fprintf(out, "%s=%s\n", o->longopt, *o->arg);
3003         }
3004       }
3005       break;
3006     case GETOPT_OP_ON:
3007       if (flags & o->flag) {
3008         goto no_arg;
3009       }
3010       break;
3011     case GETOPT_OP_OFF:
3012       if (!(flags & o->flag)) {
3013         goto no_arg;
3014       }
3015       break;
3016     case GETOPT_OP_UPDATE:
3017       if (flags == o->flag) {
3018       no_arg:
3019         if (o->longopt) {
3020           fprintf(out, "%s\n", o->longopt);
3021         }
3022       }
3023       break;
3024     }
3025   }
3026 }
3027 
3028 static void
show_version(void)3029 show_version(void)
3030 {
3031   printf("%s %s [",
3032          grn_get_package_label(),
3033          grn_get_version());
3034 
3035   /* FIXME: Should we detect host information dynamically on Windows? */
3036 #ifdef HOST_OS
3037   printf("%s,", HOST_OS);
3038 #endif
3039 #ifdef HOST_CPU
3040   printf("%s,", HOST_CPU);
3041 #endif
3042   printf("%s", GRN_DEFAULT_ENCODING);
3043 
3044   printf(",match-escalation-threshold=%" GRN_FMT_LLD,
3045          grn_get_default_match_escalation_threshold());
3046 
3047 #ifndef NO_NFKC
3048   printf(",nfkc");
3049 #endif
3050 #ifdef GRN_WITH_MECAB
3051   printf(",mecab");
3052 #endif
3053 #ifdef GRN_WITH_MESSAGE_PACK
3054   printf(",msgpack");
3055 #endif
3056 #ifdef GRN_WITH_MRUBY
3057   printf(",mruby");
3058 #endif
3059 #ifdef GRN_WITH_ONIGMO
3060   printf(",onigmo");
3061 #endif
3062 #ifdef GRN_WITH_ZLIB
3063   printf(",zlib");
3064 #endif
3065 #ifdef GRN_WITH_LZ4
3066   printf(",lz4");
3067 #endif
3068 #ifdef GRN_WITH_ZSTD
3069   printf(",zstd");
3070 #endif
3071 #ifdef USE_KQUEUE
3072   printf(",kqueue");
3073 #endif
3074 #ifdef USE_EPOLL
3075   printf(",epoll");
3076 #endif
3077 #ifdef USE_POLL
3078   printf(",poll");
3079 #endif
3080   printf("]\n");
3081 
3082 #ifdef CONFIGURE_OPTIONS
3083   printf("\n");
3084   printf("configure options: <%s>\n", CONFIGURE_OPTIONS);
3085 #endif
3086 }
3087 
3088 static void
show_usage(FILE * output)3089 show_usage(FILE *output)
3090 {
3091   uint32_t default_cache_limit = GRN_CACHE_DEFAULT_MAX_N_ENTRIES;
3092 
3093   fprintf(output,
3094           "Usage: groonga [options...] [dest]\n"
3095           "\n"
3096           "Mode options: (default: standalone)\n"
3097           " By default, groonga runs in standalone mode.\n"
3098           "  -c:   run in client mode\n"
3099           "  -s:   run in server mode\n"
3100           "  -d:   run in daemon mode\n"
3101           "\n"
3102           "Database creation options:\n"
3103           "  -n:                  create new database (except client mode)\n"
3104           "  -e, --encoding <encoding>:\n"
3105           "                       specify encoding for new database\n"
3106           "                       [none|euc|utf8|sjis|latin1|koi8r] (default: %s)\n"
3107           "\n"
3108           "Standalone/client options:\n"
3109           "      --file <path>:          read commands from specified file\n"
3110           "      --input-fd <FD>:        read commands from specified file descriptor\n"
3111           "                              --file has a prioriry over --input-fd\n"
3112           "      --output-fd <FD>:       output response to specified file descriptor\n"
3113           "  -p, --port <port number>:   specify server port number (client mode only)\n"
3114           "                              (default: %d)\n"
3115           "\n"
3116           "Server/daemon options:\n"
3117           "      --bind-address <ip/hostname>:\n"
3118           "                                specify server address to bind\n"
3119           "                                (default: %s)\n"
3120           "  -p, --port <port number>:     specify server port number\n"
3121           "                                (HTTP default: %d, GQTP default: %d)\n"
3122           "  -i, --server-id <ip/hostname>:\n"
3123           "                                specify server ID address (default: %s)\n"
3124           "      --protocol <protocol>:    specify server protocol to listen\n"
3125           "                                [gqtp|http|memcached] (default: %s)\n"
3126           "      --document-root <path>:   specify document root path (http only)\n"
3127           "                                (default: %s)\n"
3128           "      --cache-limit <limit>:    specify max number of cache data (default: %u)\n"
3129           "  -t, --max-threads <max threads>:\n"
3130           "                                specify max number of threads (default: %u)\n"
3131           "      --pid-path <path>:        specify file to write process ID to\n"
3132           "                                (daemon mode only)\n"
3133           "      --default-request-timeout <timeout>:\n"
3134           "                                specify the default request timeout in seconds\n"
3135           "                                (default: %f)\n"
3136           "      --cache-base-path <path>: specify the cache base path\n"
3137           "                                You can make cache persistent by this option\n"
3138           "                                You must specify path on memory file system\n"
3139           "                                (default: none; disabled)\n"
3140           "\n"
3141           "Memcached options:\n"
3142           "      --memcached-column <column>:\n"
3143           "                                specify column to access by memcached protocol\n"
3144           "                                The column must be text type column and\n"
3145           "                                its table must be not NO_KEY table\n"
3146           "\n"
3147           "Logging options:\n"
3148           "  -l, --log-level <log level>:\n"
3149           "                           specify log level\n"
3150           "                           [none|emergency|alert|critical|\n"
3151           "                            error|warning|notice|info|debug|dump]\n"
3152           "                           (default: %s)\n"
3153           "      --log-path <path>:   specify log path\n"
3154           "                           (default: %s)\n"
3155           "      --log-rotate-threshold-size <threshold>:\n"
3156           "                           specify threshold for log rotate\n"
3157           "                           Log file is rotated when\n"
3158           "                           log file size is larger than or\n"
3159           "                           equals to the threshold\n"
3160           "                           (default: 0; disabled)\n"
3161 #ifdef WIN32
3162           "      --use-windows-event-log:\n"
3163           "                           report logs as Windows events\n"
3164 #endif /* WIN32 */
3165           "      --query-log-path <path>:\n"
3166           "                           specify query log path\n"
3167           "                           (default: %s)\n"
3168           "      --query-log-rotate-threshold-size <threshold>:\n"
3169           "                           specify threshold for query log rotate\n"
3170           "                           Query log file is rotated when\n"
3171           "                           query log file size is larger than or\n"
3172           "                           equals to the threshold\n"
3173           "                           (default: 0; disabled)\n"
3174           "\n"
3175           "Common options:\n"
3176           "      --working-directory <path>:\n"
3177           "                       specify working directory path\n"
3178           "                       (none)\n"
3179           "      --config-path <path>:\n"
3180           "                       specify config file path\n"
3181           "                       (default: %s)\n"
3182           "      --default-command-version <version>:\n"
3183           "                       specify default command version (default: %d)\n"
3184           "      --default-match-escalation-threshold <threshold>:\n"
3185           "                       specify default match escalation threshold"
3186           " (default: %" GRN_FMT_LLD ")\n"
3187           "\n"
3188           "      --show-config:   show config\n"
3189           "  -h, --help:          show usage\n"
3190           "      --version:       show groonga version\n"
3191           "\n"
3192           "dest:\n"
3193           "  <db pathname> [<commands>]: in standalone mode\n"
3194           "  <db pathname>: in server/daemon mode\n"
3195           "  <dest hostname> [<commands>]: in client mode (default: %s)\n",
3196           grn_encoding_to_string(default_encoding),
3197           default_gqtp_port, default_bind_address,
3198           default_http_port, default_gqtp_port, default_hostname, default_protocol,
3199           default_document_root, default_cache_limit, default_max_n_threads,
3200           default_default_request_timeout,
3201           grn_log_level_to_string(default_log_level),
3202           default_log_path, default_query_log_path,
3203           default_config_path, default_default_command_version,
3204           (long long int)default_default_match_escalation_threshold,
3205           default_dest);
3206 }
3207 
3208 int
main(int argc,char ** argv)3209 main(int argc, char **argv)
3210 {
3211   const char *port_arg = NULL;
3212   const char *encoding_arg = NULL;
3213   const char *max_n_threads_arg = NULL;
3214   const char *log_level_arg = NULL;
3215   const char *bind_address_arg = NULL;
3216   const char *hostname_arg = NULL;
3217   const char *protocol_arg = NULL;
3218   const char *log_path_arg = GRN_LOG_PATH;
3219   const char *log_rotate_threshold_size_arg = NULL;
3220   const char *query_log_path_arg = NULL;
3221   const char *query_log_rotate_threshold_size_arg = NULL;
3222   const char *cache_limit_arg = NULL;
3223   const char *document_root_arg = NULL;
3224   const char *default_command_version_arg = NULL;
3225   const char *default_match_escalation_threshold_arg = NULL;
3226   const char *input_fd_arg = NULL;
3227   const char *output_fd_arg = NULL;
3228   const char *working_directory_arg = NULL;
3229   const char *config_path = NULL;
3230   const char *default_request_timeout_arg = NULL;
3231   const char *cache_base_path = NULL;
3232   int exit_code = EXIT_SUCCESS;
3233   int i;
3234   int flags = 0;
3235   uint32_t cache_limit = 0;
3236   grn_command_version default_command_version;
3237   int64_t default_match_escalation_threshold = 0;
3238   double default_request_timeout = 0.0;
3239   grn_bool need_line_editor = GRN_FALSE;
3240   static grn_str_getopt_opt opts[] = {
3241     {'p', "port", NULL, 0, GETOPT_OP_NONE},
3242     {'e', "encoding", NULL, 0, GETOPT_OP_NONE},
3243     {'t', "max-threads", NULL, 0, GETOPT_OP_NONE},
3244     {'h', "help", NULL, ACTION_USAGE, GETOPT_OP_UPDATE},
3245     {'c', NULL, NULL, FLAG_MODE_CLIENT, GETOPT_OP_ON},
3246     {'d', NULL, NULL, FLAG_MODE_DAEMON, GETOPT_OP_ON},
3247     {'s', NULL, NULL, FLAG_MODE_SERVER, GETOPT_OP_ON},
3248     {'l', "log-level", NULL, 0, GETOPT_OP_NONE},
3249     {'i', "server-id", NULL, 0, GETOPT_OP_NONE},
3250     {'n', NULL, NULL, FLAG_NEW_DB, GETOPT_OP_ON},
3251     {'\0', "protocol", NULL, 0, GETOPT_OP_NONE},
3252     {'\0', "version", NULL, ACTION_VERSION, GETOPT_OP_UPDATE},
3253     {'\0', "log-path", NULL, 0, GETOPT_OP_NONE},
3254     {'\0', "log-rotate-threshold-size", NULL, 0, GETOPT_OP_NONE},
3255     {'\0', "query-log-path", NULL, 0, GETOPT_OP_NONE},
3256     {'\0', "query-log-rotate-threshold-size", NULL, 0, GETOPT_OP_NONE},
3257     {'\0', "pid-path", NULL, 0, GETOPT_OP_NONE},
3258     {'\0', "config-path", NULL, 0, GETOPT_OP_NONE},
3259     {'\0', "show-config", NULL, ACTION_SHOW_CONFIG, GETOPT_OP_UPDATE},
3260     {'\0', "cache-limit", NULL, 0, GETOPT_OP_NONE},
3261     {'\0', "file", NULL, 0, GETOPT_OP_NONE},
3262     {'\0', "document-root", NULL, 0, GETOPT_OP_NONE},
3263     {'\0', "default-command-version", NULL, 0, GETOPT_OP_NONE},
3264     {'\0', "default-match-escalation-threshold", NULL, 0, GETOPT_OP_NONE},
3265     {'\0', "bind-address", NULL, 0, GETOPT_OP_NONE},
3266     {'\0', "input-fd", NULL, 0, GETOPT_OP_NONE},
3267     {'\0', "output-fd", NULL, 0, GETOPT_OP_NONE},
3268     {'\0', "working-directory", NULL, 0, GETOPT_OP_NONE},
3269     {'\0', "use-windows-event-log", NULL,
3270      FLAG_USE_WINDOWS_EVENT_LOG, GETOPT_OP_ON},
3271     {'\0', "memcached-column", NULL, 0, GETOPT_OP_NONE},
3272     {'\0', "default-request-timeout", NULL, 0, GETOPT_OP_NONE},
3273     {'\0', "cache-base-path", NULL, 0, GETOPT_OP_NONE},
3274     {'\0', NULL, NULL, 0, 0}
3275   };
3276   opts[0].arg = &port_arg;
3277   opts[1].arg = &encoding_arg;
3278   opts[2].arg = &max_n_threads_arg;
3279   opts[7].arg = &log_level_arg;
3280   opts[8].arg = &hostname_arg;
3281   opts[10].arg = &protocol_arg;
3282   opts[12].arg = &log_path_arg;
3283   opts[13].arg = &log_rotate_threshold_size_arg;
3284   opts[14].arg = &query_log_path_arg;
3285   opts[15].arg = &query_log_rotate_threshold_size_arg;
3286   opts[16].arg = &pid_file_path;
3287   opts[17].arg = &config_path;
3288   opts[19].arg = &cache_limit_arg;
3289   opts[20].arg = &input_path;
3290   opts[21].arg = &document_root_arg;
3291   opts[22].arg = &default_command_version_arg;
3292   opts[23].arg = &default_match_escalation_threshold_arg;
3293   opts[24].arg = &bind_address_arg;
3294   opts[25].arg = &input_fd_arg;
3295   opts[26].arg = &output_fd_arg;
3296   opts[27].arg = &working_directory_arg;
3297   opts[29].arg = &memcached_column_name;
3298   opts[30].arg = &default_request_timeout_arg;
3299   opts[31].arg = &cache_base_path;
3300 
3301   reset_ready_notify_pipe();
3302 
3303   init_default_settings();
3304 
3305   /* only for parsing --config-path. */
3306   i = grn_str_getopt(argc, argv, opts, &flags);
3307   if (i < 0) {
3308     show_usage(stderr);
3309     return EXIT_FAILURE;
3310   }
3311 
3312   if (config_path) {
3313     const config_file_status status = config_file_load(config_path, opts, &flags);
3314     if (status == CONFIG_FILE_FOPEN_ERROR) {
3315       fprintf(stderr, "%s: can't open config file: %s (%s)\n",
3316               argv[0], config_path, strerror(errno));
3317       return EXIT_FAILURE;
3318     } else if (status != CONFIG_FILE_SUCCESS) {
3319       fprintf(stderr, "%s: failed to parse config file: %s (%s)\n",
3320               argv[0], config_path,
3321               (status == CONFIG_FILE_FORMAT_ERROR) ? "Invalid format" : strerror(errno));
3322       return EXIT_FAILURE;
3323     }
3324   } else if (*default_config_path) {
3325     const config_file_status status =
3326         config_file_load(default_config_path, opts, &flags);
3327     if (status != CONFIG_FILE_SUCCESS && status != CONFIG_FILE_FOPEN_ERROR) {
3328       fprintf(stderr, "%s: failed to parse config file: %s (%s)\n",
3329               argv[0], default_config_path,
3330               (status == CONFIG_FILE_FORMAT_ERROR) ? "Invalid format" : strerror(errno));
3331       return EXIT_FAILURE;
3332     }
3333   }
3334 
3335   if (working_directory_arg) {
3336     if (chdir(working_directory_arg) == -1) {
3337       fprintf(stderr, "%s: failed to change directory: %s: %s\n",
3338               argv[0], working_directory_arg, strerror(errno));
3339       return EXIT_FAILURE;
3340     }
3341   }
3342 
3343   if (cache_base_path) {
3344     grn_set_default_cache_base_path(cache_base_path);
3345   }
3346 
3347   /* ignore mode option in config file */
3348   flags = (flags == ACTION_ERROR) ? 0 : (flags & ~ACTION_MASK);
3349 
3350   i = grn_str_getopt(argc, argv, opts, &flags);
3351   if (i < 0) { flags = ACTION_ERROR; }
3352   switch (flags & ACTION_MASK) {
3353   case ACTION_VERSION :
3354     show_version();
3355     return EXIT_SUCCESS;
3356   case ACTION_USAGE :
3357     show_usage(output);
3358     return EXIT_SUCCESS;
3359   case ACTION_SHOW_CONFIG :
3360     show_config(output, opts, flags & ~ACTION_MASK);
3361     return EXIT_SUCCESS;
3362   case ACTION_ERROR :
3363     show_usage(stderr);
3364     return EXIT_FAILURE;
3365   }
3366 
3367   if ((flags & MODE_MASK) == 0) {
3368     flags |= FLAG_MODE_ALONE;
3369   }
3370 
3371   if (port_arg) {
3372     const char * const end = port_arg + strlen(port_arg);
3373     const char *rest = NULL;
3374     const int value = grn_atoi(port_arg, end, &rest);
3375     if (rest != end || value <= 0 || value > 65535) {
3376       fprintf(stderr, "invalid port number: <%s>\n", port_arg);
3377       return EXIT_FAILURE;
3378     }
3379     port = value;
3380   } else {
3381     if (protocol_arg) {
3382       if (*protocol_arg == 'h' || *protocol_arg == 'H') {
3383         port = default_http_port;
3384       }
3385     }
3386   }
3387 
3388   if (encoding_arg) {
3389     switch (*encoding_arg) {
3390     case 'n' :
3391     case 'N' :
3392       encoding = GRN_ENC_NONE;
3393       break;
3394     case 'e' :
3395     case 'E' :
3396       encoding = GRN_ENC_EUC_JP;
3397       break;
3398     case 'u' :
3399     case 'U' :
3400       encoding = GRN_ENC_UTF8;
3401       break;
3402     case 's' :
3403     case 'S' :
3404       encoding = GRN_ENC_SJIS;
3405       break;
3406     case 'l' :
3407     case 'L' :
3408       encoding = GRN_ENC_LATIN1;
3409       break;
3410     case 'k' :
3411     case 'K' :
3412       encoding = GRN_ENC_KOI8R;
3413       break;
3414     default:
3415       encoding = GRN_ENC_DEFAULT;
3416       break;
3417     }
3418   } else {
3419     encoding = GRN_ENC_DEFAULT;
3420   }
3421 
3422   if (!grn_document_root) {
3423     grn_document_root = default_document_root;
3424   }
3425 
3426   if (protocol_arg) {
3427     switch (*protocol_arg) {
3428     case 'g' :
3429     case 'G' :
3430       do_client = g_client;
3431       do_server = g_server;
3432       break;
3433     case 'h' :
3434     case 'H' :
3435       do_client = g_client;
3436       do_server = h_server;
3437       break;
3438     case 'm' :
3439     case 'M' :
3440       is_memcached_mode = GRN_TRUE;
3441       do_client = g_client;
3442       do_server = g_server;
3443       break;
3444     default :
3445       do_client = g_client;
3446       do_server = g_server;
3447       break;
3448     }
3449   } else {
3450     do_client = g_client;
3451     do_server = g_server;
3452   }
3453 
3454 #ifdef WIN32
3455   if (flags & FLAG_USE_WINDOWS_EVENT_LOG) {
3456     use_windows_event_log = GRN_TRUE;
3457   }
3458 #endif /* WIN32 */
3459 
3460   if (use_windows_event_log) {
3461     grn_windows_event_logger_set(NULL, windows_event_source_name);
3462   }
3463 
3464   if (log_path_arg) {
3465     grn_default_logger_set_path(log_path_arg);
3466   }
3467 
3468   if (log_rotate_threshold_size_arg) {
3469     const char * const end =
3470       log_rotate_threshold_size_arg +
3471       strlen(log_rotate_threshold_size_arg);
3472     const char *rest = NULL;
3473     const uint64_t value = grn_atoull(log_rotate_threshold_size_arg, end, &rest);
3474     if (end != rest) {
3475       fprintf(stderr, "invalid log rotate threshold size: <%s>\n",
3476               log_rotate_threshold_size_arg);
3477       return EXIT_FAILURE;
3478     }
3479     grn_default_logger_set_rotate_threshold_size(value);
3480   }
3481 
3482   if (query_log_path_arg) {
3483     grn_default_query_logger_set_path(query_log_path_arg);
3484   }
3485 
3486   if (query_log_rotate_threshold_size_arg) {
3487     const char * const end =
3488       query_log_rotate_threshold_size_arg +
3489       strlen(query_log_rotate_threshold_size_arg);
3490     const char *rest = NULL;
3491     const uint64_t value =
3492       grn_atoull(query_log_rotate_threshold_size_arg, end, &rest);
3493     if (end != rest) {
3494       fprintf(stderr, "invalid query log rotate threshold size: <%s>\n",
3495               query_log_rotate_threshold_size_arg);
3496       return EXIT_FAILURE;
3497     }
3498     grn_default_query_logger_set_rotate_threshold_size(value);
3499   }
3500 
3501   {
3502     grn_log_level log_level;
3503 
3504     if (log_level_arg) {
3505       grn_bool parsed;
3506 
3507       parsed = grn_log_level_parse(log_level_arg, &log_level);
3508       if (!parsed) {
3509         const char * const end = log_level_arg + strlen(log_level_arg);
3510         const char *rest = NULL;
3511         const int value = grn_atoi(log_level_arg, end, &rest);
3512         if (end != rest || value < GRN_LOG_NONE || value > GRN_LOG_DUMP) {
3513           fprintf(stderr, "invalid log level: <%s>\n", log_level_arg);
3514           return EXIT_FAILURE;
3515         }
3516         log_level = value;
3517       }
3518     } else {
3519       log_level = default_log_level;
3520     }
3521 
3522     grn_default_logger_set_max_level(log_level);
3523   }
3524 
3525   if (max_n_threads_arg) {
3526     const char * const end = max_n_threads_arg + strlen(max_n_threads_arg);
3527     const char *rest = NULL;
3528     const uint32_t value = grn_atoui(max_n_threads_arg, end, &rest);
3529     if (end != rest || value < 1 || value > 100) {
3530       fprintf(stderr, "invalid max number of threads: <%s>\n",
3531               max_n_threads_arg);
3532       return EXIT_FAILURE;
3533     }
3534     max_n_floating_threads = value;
3535   } else {
3536     if (flags & FLAG_MODE_ALONE) {
3537       max_n_floating_threads = 1;
3538     } else {
3539       max_n_floating_threads = default_max_n_threads;
3540     }
3541   }
3542 
3543   grn_thread_set_get_limit_func(groonga_get_thread_limit, NULL);
3544   grn_thread_set_set_limit_func(groonga_set_thread_limit, NULL);
3545 
3546   if (output_fd_arg) {
3547     const char * const end = output_fd_arg + strlen(output_fd_arg);
3548     const char *rest = NULL;
3549     const int output_fd = grn_atoi(output_fd_arg, end, &rest);
3550     if (rest != end || output_fd == 0) {
3551       fprintf(stderr, "invalid output FD: <%s>\n", output_fd_arg);
3552       return EXIT_FAILURE;
3553     }
3554     output = fdopen(output_fd, "w");
3555     if (!output) {
3556       fprintf(stderr, "can't open output FD: %d (%s)\n",
3557               output_fd, strerror(errno));
3558       return EXIT_FAILURE;
3559     }
3560   }
3561 
3562 
3563   if (bind_address_arg) {
3564     const size_t bind_address_length = strlen(bind_address_arg);
3565     if (bind_address_length > HOST_NAME_MAX) {
3566       fprintf(stderr, "too long bind address: %s (%u bytes):"
3567                       " must not be longer than %u bytes\n",
3568               bind_address_arg, (unsigned int)bind_address_length, HOST_NAME_MAX);
3569       return EXIT_FAILURE;
3570     }
3571     grn_strcpy(bind_address, HOST_NAME_MAX + 1, bind_address_arg);
3572   } else {
3573     grn_strcpy(bind_address, HOST_NAME_MAX + 1, default_bind_address);
3574   }
3575 
3576   if (hostname_arg) {
3577     const size_t hostname_length = strlen(hostname_arg);
3578     if (hostname_length > HOST_NAME_MAX) {
3579       fprintf(stderr, "too long hostname: %s (%u bytes):"
3580                       " must not be longer than %u bytes\n",
3581               hostname_arg, (unsigned int)hostname_length, HOST_NAME_MAX);
3582       return EXIT_FAILURE;
3583     }
3584     grn_strcpy(hostname, HOST_NAME_MAX + 1, hostname_arg);
3585   } else {
3586     grn_strcpy(hostname, HOST_NAME_MAX + 1, default_hostname);
3587   }
3588 
3589   if (document_root_arg) {
3590     grn_document_root = document_root_arg;
3591   }
3592 
3593   if (default_command_version_arg) {
3594     const char * const end = default_command_version_arg
3595         + strlen(default_command_version_arg);
3596     const char *rest = NULL;
3597     const int value = grn_atoi(default_command_version_arg, end, &rest);
3598     if (end != rest || value < GRN_COMMAND_VERSION_MIN ||
3599         value > GRN_COMMAND_VERSION_MAX) {
3600       fprintf(stderr, "invalid command version: <%s>\n",
3601               default_command_version_arg);
3602       return EXIT_FAILURE;
3603     }
3604     default_command_version = value;
3605   } else {
3606     default_command_version = default_default_command_version;
3607   }
3608 
3609   if (default_match_escalation_threshold_arg) {
3610     const char * const end = default_match_escalation_threshold_arg
3611         + strlen(default_match_escalation_threshold_arg);
3612     const char *rest = NULL;
3613     const int64_t value = grn_atoll(default_match_escalation_threshold_arg, end, &rest);
3614     if (end != rest) {
3615       fprintf(stderr, "invalid match escalation threshold: <%s>\n",
3616               default_match_escalation_threshold_arg);
3617       return EXIT_FAILURE;
3618     }
3619     default_match_escalation_threshold = value;
3620   } else {
3621     default_match_escalation_threshold = default_default_match_escalation_threshold;
3622   }
3623 
3624   if (cache_limit_arg) {
3625     const char * const end = cache_limit_arg + strlen(cache_limit_arg);
3626     const char *rest = NULL;
3627     const uint32_t value = grn_atoui(cache_limit_arg, end, &rest);
3628     if (end != rest) {
3629       fprintf(stderr, "invalid --cache-limit value: <%s>\n", cache_limit_arg);
3630       return EXIT_FAILURE;
3631     }
3632     cache_limit = value;
3633   }
3634 
3635   if (default_request_timeout_arg) {
3636     const char * const end =
3637       default_request_timeout_arg + strlen(default_request_timeout_arg);
3638     char *rest = NULL;
3639     double value;
3640     value = strtod(default_request_timeout_arg, &rest);
3641     if (end != rest) {
3642       fprintf(stderr, "invalid default request timeout: <%s>\n",
3643               default_request_timeout_arg);
3644       return EXIT_FAILURE;
3645     }
3646     default_request_timeout = value;
3647   } else {
3648     default_request_timeout = default_default_request_timeout;
3649   }
3650 
3651   grn_gctx.errbuf[0] = '\0';
3652   if (grn_init()) {
3653     fprintf(stderr, "failed to initialize Groonga: %s\n", grn_gctx.errbuf);
3654     return EXIT_FAILURE;
3655   }
3656 
3657   grn_set_default_encoding(encoding);
3658 
3659   if (default_command_version_arg) {
3660     grn_set_default_command_version(default_command_version);
3661   }
3662 
3663   if (default_match_escalation_threshold_arg) {
3664     grn_set_default_match_escalation_threshold(default_match_escalation_threshold);
3665   }
3666 
3667   if (default_request_timeout_arg) {
3668     grn_set_default_request_timeout(default_request_timeout);
3669   }
3670 
3671   grn_set_segv_handler();
3672   grn_set_int_handler();
3673   grn_set_term_handler();
3674 
3675   if (cache_limit_arg) {
3676     grn_cache *cache;
3677     cache = grn_cache_current_get(&grn_gctx);
3678     grn_cache_set_max_n_entries(&grn_gctx, cache, cache_limit);
3679   }
3680 
3681   MUTEX_INIT(q_mutex);
3682   COND_INIT(q_cond);
3683 
3684   if (input_path) {
3685     input_reader = grn_file_reader_open(&grn_gctx, input_path);
3686     if (!input_reader) {
3687       fprintf(stderr, "can't open input file: %s (%s)\n",
3688               input_path, strerror(errno));
3689       return EXIT_FAILURE;
3690     }
3691     batchmode = GRN_TRUE;
3692   } else {
3693     if (input_fd_arg) {
3694       const char * const end = input_fd_arg + strlen(input_fd_arg);
3695       const char *rest = NULL;
3696       const int input_fd = grn_atoi(input_fd_arg, end, &rest);
3697       if (rest != end || input_fd == 0) {
3698         fprintf(stderr, "invalid input FD: <%s>\n", input_fd_arg);
3699         return EXIT_FAILURE;
3700       }
3701       if (dup2(input_fd, STDIN_FILENO) == -1) {
3702         fprintf(stderr, "can't open input FD: %d (%s)\n",
3703                 input_fd, strerror(errno));
3704         return EXIT_FAILURE;
3705       }
3706       input_reader = grn_file_reader_open(&grn_gctx, "-");
3707       if (!input_reader) {
3708         fprintf(stderr, "%s", grn_gctx.errbuf);
3709         return EXIT_FAILURE;
3710       }
3711       batchmode = GRN_TRUE;
3712     } else {
3713       input_reader = grn_file_reader_open(&grn_gctx, "-");
3714       if (!input_reader) {
3715         fprintf(stderr, "%s", grn_gctx.errbuf);
3716         return EXIT_FAILURE;
3717       }
3718       if (argc - i > 1) {
3719         batchmode = GRN_TRUE;
3720       } else {
3721         batchmode = !grn_isatty(0);
3722       }
3723     }
3724   }
3725 
3726   if ((flags & (FLAG_MODE_ALONE | FLAG_MODE_CLIENT)) &&
3727       !batchmode) {
3728     need_line_editor = GRN_TRUE;
3729   }
3730 
3731 #ifdef GRN_WITH_LIBEDIT
3732   if (need_line_editor) {
3733     line_editor_init(argc, argv);
3734   }
3735 #endif
3736 
3737   newdb = (flags & FLAG_NEW_DB);
3738   is_daemon_mode = (flags & FLAG_MODE_DAEMON);
3739   if (flags & FLAG_MODE_CLIENT) {
3740     exit_code = do_client(argc - i, argv + i);
3741   } else if (is_daemon_mode || (flags & FLAG_MODE_SERVER)) {
3742     exit_code = do_server(argc > i ? argv[i] : NULL);
3743   } else {
3744     exit_code = do_alone(argc - i, argv + i);
3745   }
3746 
3747   COND_FIN(q_cond);
3748   MUTEX_FIN(q_mutex);
3749 
3750   if (input_reader) {
3751     grn_file_reader_close(&grn_gctx, input_reader);
3752   }
3753 #ifdef GRN_WITH_LIBEDIT
3754   if (need_line_editor) {
3755     line_editor_fin();
3756   }
3757 #endif
3758   if (output != stdout) {
3759     fclose(output);
3760   }
3761   grn_fin();
3762   return exit_code;
3763 }
3764