1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2010-2015 Brazil
3 
4   This library is free software; you can redistribute it and/or
5   modify it under the terms of the GNU Lesser General Public
6   License version 2.1 as published by the Free Software Foundation.
7 
8   This library is distributed in the hope that it will be useful,
9   but WITHOUT ANY WARRANTY; without even the implied warranty of
10   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11   Lesser General Public License for more details.
12 
13   You should have received a copy of the GNU Lesser General Public
14   License along with this library; if not, write to the Free Software
15   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1335  USA
16 */
17 
18 /* for grn_str_getopt() */
19 #include <grn_str.h>
20 #include <grn_msgpack.h>
21 
22 #include "zmq_compatible.h"
23 #include <stdio.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <pthread.h>
27 #include <groonga.h>
28 #include <inttypes.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <dirent.h>
32 
33 #include "util.h"
34 
35 #include <evhttp.h>
36 
/* Default ZeroMQ endpoint bound by main() to receive events; -r overrides it. */
#define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
/* Default ZeroMQ endpoint bound by the send thread; -s overrides it. */
#define DEFAULT_SEND_ENDPOINT "tcp://*:1235"
/* Pause passed to usleep() after each published record, in microseconds. */
#define SEND_WAIT 1000 /* 0.001sec */

/*
 * Expands to TWO comma separated arguments: the string itself and its length
 * without the terminating NUL. The length comes from sizeof, so this is only
 * meaningful for string literals (or char arrays), never for char pointers.
 */
#define CONST_STR_LEN(x) x, x ? sizeof(x) - 1 : 0
42 
/*
 * Bit flags describing how the program was asked to run. The option table in
 * main() ORs RUN_MODE_DAEMON / RUN_MODE_USAGE into the flags word that is
 * handed to grn_str_getopt().
 */
typedef enum {
  RUN_MODE_NONE   = 0x00, /* no mode option given */
  RUN_MODE_USAGE  = 0x01, /* -h, --help: print usage and exit */
  RUN_MODE_DAEMON = 0x02, /* -d, --daemon: call daemonize() before serving */
  RUN_MODE_ERROR  = 0x04  /* not referenced anywhere in this file */
} run_mode;

/* Mask applied in main() to the parsed flags to extract the run_mode bits. */
#define RUN_MODE_MASK                0x007f
51 
/* Arguments handed from main() to the send thread (send_to_httpd()). */
typedef struct {
  const char *db_path;       /* database path; the thread opens it again itself */
  const char *send_endpoint; /* ZeroMQ endpoint the thread binds its PUB socket to */
  pthread_t thd;             /* thread handle filled in by pthread_create() */
  void *zmq_ctx;             /* ZeroMQ context created in main() */
} send_thd_data;

/*
 * Run flag: stays 1 while the program should keep working and is cleared by
 * signal_handler(). Every long running loop in this file tests it.
 */
static volatile sig_atomic_t loop = 1;
60 
61 static void
load_to_groonga(grn_ctx * ctx,grn_obj * buf,const char * query,uint32_t query_len,const char * client_id,uint32_t client_id_len,const char * learn_target_name,uint32_t learn_target_name_len,uint64_t millisec,int submit)62 load_to_groonga(grn_ctx *ctx,
63                 grn_obj *buf,
64                 const char *query, uint32_t query_len,
65                 const char *client_id, uint32_t client_id_len,
66                 const char *learn_target_name, uint32_t learn_target_name_len,
67                 uint64_t millisec,
68                 int submit)
69 {
70   GRN_BULK_REWIND(buf);
71   GRN_TEXT_PUTS(ctx, buf, "load --table event_");
72   GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
73   GRN_TEXT_PUTS(ctx, buf, " --each 'suggest_preparer(_id,type,item,sequence,time,pair_");
74   GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
75   GRN_TEXT_PUTS(ctx, buf, ")'");
76   grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
77   grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
78 
79   GRN_BULK_REWIND(buf);
80   GRN_TEXT_PUTS(ctx, buf, "{\"item\":");
81   grn_text_esc(ctx, buf, query, query_len);
82   GRN_TEXT_PUTS(ctx, buf, ",\"sequence\":");
83   grn_text_esc(ctx, buf, client_id, client_id_len);
84   GRN_TEXT_PUTS(ctx, buf, ",\"time\":");
85   grn_text_ftoa(ctx, buf, (double)millisec / 1000);
86   if (submit) {
87     GRN_TEXT_PUTS(ctx, buf, ",\"type\":\"submit\"}");
88   } else {
89     GRN_TEXT_PUTS(ctx, buf, "}");
90   }
91   /* printf("%.*s\n", GRN_TEXT_LEN(buf), GRN_TEXT_VALUE(buf)); */
92   grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
93 
94   grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
95 
96   {
97     char *res;
98     int flags;
99     unsigned int res_len;
100     grn_ctx_recv(ctx, &res, &res_len, &flags);
101   }
102 }
103 
104 void
load_to_multi_targets(grn_ctx * ctx,grn_obj * buf,const char * query,uint32_t query_len,const char * client_id,uint32_t client_id_len,const char * learn_target_names,uint32_t learn_target_names_len,uint64_t millisec,int submit)105 load_to_multi_targets(grn_ctx *ctx,
106                       grn_obj *buf,
107                       const char *query, uint32_t query_len,
108                       const char *client_id, uint32_t client_id_len,
109                       const char *learn_target_names,
110                       uint32_t learn_target_names_len,
111                       uint64_t millisec,
112                       int submit)
113 {
114   if (millisec && query && client_id && learn_target_names) {
115     unsigned int tn_len;
116     const char *tn, *tnp, *tne;
117     tn = tnp = learn_target_names;
118     tne = learn_target_names + learn_target_names_len;
119     while (tnp <= tne) {
120       if (tnp == tne || *tnp == '|') {
121         tn_len = tnp - tn;
122 
123         /*
124         printf("sec: %" PRIu64 " query %.*s client_id: %.*s target: %.*s\n",
125           millisec,
126           query_len, query,
127           client_id_len, client_id,
128           tn_len, tn);
129         */
130         load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
131                         tn, tn_len, millisec, submit);
132 
133         tn = ++tnp;
134       } else {
135         tnp++;
136       }
137     }
138   }
139 }
140 
/*
 * Packs the key of record `id` of `ref_table` as a msgpack string.
 *
 * The macro is not self-contained: `ctx`, `ref_table` and the msgpack packer
 * `pk` must be variables in the scope where it is expanded. The key is copied
 * into a stack buffer of GRN_TABLE_MAX_KEY_SIZE bytes first.
 * NOTE(review): the length returned by grn_table_get_key() is used as is; a
 * failed lookup is not detected here.
 */
#define PACK_KEY_FROM_ID(id) do { \
  int _k_len; \
  char _k_buf[GRN_TABLE_MAX_KEY_SIZE]; \
  _k_len = grn_table_get_key(ctx, ref_table, (id), _k_buf, GRN_TABLE_MAX_KEY_SIZE); \
  msgpack_pack_str(&pk, _k_len); \
  msgpack_pack_str_body(&pk, _k_buf, _k_len); \
} while (0)
148 
/*
 * Packs one map entry: the column name `col_name` as the key and the value of
 * that column for the current record as the value.
 *
 * The macro is not self-contained. In the expansion scope it needs `ctx`,
 * `pk` (msgpack packer), `rec_id` (current record), `ref_table` (used through
 * PACK_KEY_FROM_ID) and a `grn_obj *col_<col_name>` variable.
 *
 * Value encoding:
 *   bulk ShortText -> msgpack str
 *   bulk Int32     -> msgpack int32
 *   bulk UInt32    -> msgpack uint32
 *   bulk Time      -> msgpack double, value divided by GRN_TIME_USEC_PER_SEC
 *   any other bulk -> treated as a record id and packed as its key
 *   uvector        -> msgpack array with the key of every record id
 *   anything else  -> error message and msgpack nil
 *
 * NOTE(review): the first switch has no default, so `_v` stays uninitialised
 * when the column is neither GRN_COLUMN_FIX_SIZE nor GRN_COLUMN_VAR_SIZE, and
 * a NULL `col_<col_name>` is dereferenced. Callers have to make sure the
 * column exists.
 */
#define PACK_MAP_ITEM(col_name) do { \
  grn_obj _v; \
  msgpack_pack_str(&pk, sizeof(#col_name) - 1); \
  msgpack_pack_str_body(&pk, #col_name, sizeof(#col_name) - 1); \
  switch (col_##col_name->header.type) { \
  case GRN_COLUMN_FIX_SIZE: \
    GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
    break; \
  case GRN_COLUMN_VAR_SIZE: \
    if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \
      GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \
    } else { \
      GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
    } \
    break; \
  } \
  grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \
 \
  switch (_v.header.type) { \
  case GRN_BULK: \
    switch (_v.header.domain) { \
    case GRN_DB_SHORT_TEXT: \
      msgpack_pack_str(&pk, GRN_TEXT_LEN(&_v)); \
      msgpack_pack_str_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \
      break; \
    case GRN_DB_INT32: \
      msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \
      break; \
    case GRN_DB_UINT32: \
      msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \
      break; \
    case GRN_DB_TIME: \
      msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \
      break; \
    default: /* ref. to ShortText key */ \
      PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \
    } \
    break; \
  case GRN_UVECTOR: /* ref.s to ShortText key */ \
    { \
      grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \
      msgpack_pack_array(&pk, _idve - _idv); \
      for (; _idv < _idve; _idv++) { \
        PACK_KEY_FROM_ID(*_idv); \
      } \
    } \
    break; \
  default: \
    print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \
    msgpack_pack_nil(&pk); \
    break; \
  } \
  grn_obj_close(ctx, &_v); \
} while (0)
203 
204 static int
zmq_send_to_httpd(void * zmq_send_sock,void * data,size_t size)205 zmq_send_to_httpd(void *zmq_send_sock, void *data, size_t size)
206 {
207   zmq_msg_t msg;
208   if (!zmq_msg_init_size(&msg, size)) {
209     memcpy((void *)zmq_msg_data(&msg), data, size);
210     if (zmq_msg_send(&msg, zmq_send_sock, 0) == -1) {
211       print_error("zmq_send() error");
212       return -1;
213     }
214     zmq_msg_close(&msg);
215   } else {
216     print_error("zmq_msg_init_size() error");
217   }
218   return 0;
219 }
220 
221 static void
send_handler(void * zmq_send_sock,grn_ctx * ctx)222 send_handler(void *zmq_send_sock, grn_ctx *ctx)
223 {
224   grn_table_cursor *cur;
225   if ((cur = grn_table_cursor_open(ctx, grn_ctx_db(ctx), NULL, 0, NULL, 0,
226        0, -1, 0))) {
227     grn_id table_id;
228     while (loop && (table_id = grn_table_cursor_next(ctx, cur)) != GRN_ID_NIL) {
229       grn_obj *table;
230       if ((table = grn_ctx_at(ctx, table_id))) {
231         int name_len;
232         char name_buf[GRN_TABLE_MAX_KEY_SIZE];
233 
234         name_len = grn_obj_name(ctx, table, name_buf,
235                                 GRN_TABLE_MAX_KEY_SIZE);
236 
237         if (name_len > 5) {
238           if (table->header.type == GRN_TABLE_PAT_KEY &&
239               !memcmp(name_buf, CONST_STR_LEN("item_"))) {
240             /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
241             grn_obj *ref_table;
242             grn_table_cursor *tc;
243             grn_obj *col_last, *col_kana, *col_freq, *col_freq2,
244                     *col_buzz, *col_boost;
245 
246             col_kana = grn_obj_column(ctx, table, CONST_STR_LEN("kana"));
247             col_freq = grn_obj_column(ctx, table, CONST_STR_LEN("freq"));
248             col_last = grn_obj_column(ctx, table, CONST_STR_LEN("last"));
249             col_boost = grn_obj_column(ctx, table, CONST_STR_LEN("boost"));
250             col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
251             col_buzz = grn_obj_column(ctx, table, CONST_STR_LEN("buzz"));
252 
253             ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_kana));
254 
255             if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
256                                             0, 0, -1, 0))) {
257               grn_id rec_id;
258               while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
259                      != GRN_ID_NIL) {
260                 char *key;
261                 size_t key_len;
262                 msgpack_packer pk;
263                 msgpack_sbuffer sbuf;
264 
265                 msgpack_sbuffer_init(&sbuf);
266                 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
267 
268                 msgpack_pack_map(&pk, 8);
269 
270                 /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
271                 msgpack_pack_str(&pk, 6);
272                 msgpack_pack_str_body(&pk, "target", strlen("target"));
273                 msgpack_pack_str(&pk, name_len);
274                 msgpack_pack_str_body(&pk, name_buf, name_len);
275 
276                 msgpack_pack_str(&pk, 4);
277                 msgpack_pack_str_body(&pk,
278                                       GRN_COLUMN_NAME_KEY,
279                                       GRN_COLUMN_NAME_KEY_LEN);
280                 key_len = grn_table_cursor_get_key(ctx, tc, (void **)&key);
281                 msgpack_pack_str(&pk, key_len);
282                 msgpack_pack_str_body(&pk, key, key_len);
283 
284                 PACK_MAP_ITEM(last);
285                 PACK_MAP_ITEM(kana);
286                 PACK_MAP_ITEM(freq);
287                 PACK_MAP_ITEM(freq2);
288                 PACK_MAP_ITEM(buzz);
289                 PACK_MAP_ITEM(boost);
290 
291                 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
292 
293                 usleep(SEND_WAIT);
294 
295                 msgpack_sbuffer_destroy(&sbuf);
296               }
297               grn_table_cursor_close(ctx, tc);
298             }
299           } else if (table->header.type == GRN_TABLE_HASH_KEY &&
300                      !memcmp(name_buf, CONST_STR_LEN("pair_"))) {
301             grn_obj *ref_table;
302             grn_table_cursor *tc;
303             grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2;
304 
305             col_pre = grn_obj_column(ctx, table, CONST_STR_LEN("pre"));
306             col_post = grn_obj_column(ctx, table, CONST_STR_LEN("post"));
307             col_freq0 = grn_obj_column(ctx, table, CONST_STR_LEN("freq0"));
308             col_freq1 = grn_obj_column(ctx, table, CONST_STR_LEN("freq1"));
309             col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
310 
311             ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_pre));
312 
313             if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
314                                             0, 0, -1, 0))) {
315               grn_id rec_id;
316               while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
317                      != GRN_ID_NIL) {
318                 uint64_t *key;
319                 msgpack_packer pk;
320                 msgpack_sbuffer sbuf;
321 
322                 /* skip freq0 == 0 && freq1 == 0 && freq2 == 0 */
323                 {
324                   grn_obj f;
325                   grn_obj_get_value(ctx, col_freq0, rec_id, &f);
326                   if (!GRN_INT32_VALUE(&f)) {
327                     grn_obj_get_value(ctx, col_freq1, rec_id, &f);
328                     if (!GRN_INT32_VALUE(&f)) {
329                       grn_obj_get_value(ctx, col_freq2, rec_id, &f);
330                       if (!GRN_INT32_VALUE(&f)) { continue; }
331                     }
332                   }
333                 }
334 
335                 /* make pair_* message */
336                 msgpack_sbuffer_init(&sbuf);
337                 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
338 
339                 msgpack_pack_map(&pk, 7);
340                 /* ["_key","UInt64"],["pre","item_all"],["post","item_all"],["freq2","Int32"],["freq1","Int32"],["freq0","Int32"] */
341 
342                 msgpack_pack_str(&pk, 6);
343                 msgpack_pack_str_body(&pk, "target", strlen("target"));
344                 msgpack_pack_str(&pk, name_len);
345                 msgpack_pack_str_body(&pk, name_buf, name_len);
346 
347                 msgpack_pack_str(&pk, 4);
348                 msgpack_pack_str_body(&pk,
349                                       GRN_COLUMN_NAME_KEY,
350                                       GRN_COLUMN_NAME_KEY_LEN);
351                 grn_table_cursor_get_key(ctx, tc, (void **)&key);
352                 msgpack_pack_uint64(&pk, *key);
353 
354                 PACK_MAP_ITEM(pre);
355                 PACK_MAP_ITEM(post);
356                 PACK_MAP_ITEM(freq0);
357                 PACK_MAP_ITEM(freq1);
358                 PACK_MAP_ITEM(freq2);
359 
360                 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
361 
362                 usleep(SEND_WAIT);
363 
364                 msgpack_sbuffer_destroy(&sbuf);
365               }
366               grn_table_cursor_close(ctx, tc);
367             }
368           }
369         }
370         grn_obj_unlink(ctx, table);
371       }
372     }
373     grn_table_cursor_close(ctx, cur);
374   }
375 }
376 
377 static void *
send_to_httpd(void * arg)378 send_to_httpd(void *arg)
379 {
380   send_thd_data *thd = arg;
381   void *zmq_send_sock;
382   if ((zmq_send_sock = zmq_socket(thd->zmq_ctx, ZMQ_PUB))) {
383     if (!zmq_bind(zmq_send_sock, thd->send_endpoint)) {
384       grn_ctx ctx;
385       if (!(grn_ctx_init(&ctx, 0))) {
386         grn_obj *db;
387         if ((db = grn_db_open(&ctx, thd->db_path))) {
388           uint64_t hwm = 1;
389           zmq_setsockopt(zmq_send_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
390           while (loop) {
391             send_handler(zmq_send_sock, &ctx);
392           }
393           grn_obj_close(&ctx, db);
394         } else {
395           print_error("error in grn_db_open() on send thread.");
396         }
397         grn_ctx_fin(&ctx);
398       } else {
399         print_error("error in grn_ctx_init() on send thread.");
400       }
401     } else {
402       print_error("cannot bind zmq_socket.");
403     }
404   } else {
405     print_error("cannot create zmq_socket.");
406   }
407   return NULL;
408 }
409 
410 static void
handle_msg(msgpack_object * obj,grn_ctx * ctx,grn_obj * buf)411 handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
412 {
413   int submit_flag = 0;
414   uint64_t millisec = 0;
415   const char *query = NULL,
416              *client_id = NULL, *learn_target_names = NULL;
417   uint32_t query_len = 0, client_id_len = 0, learn_target_names_len = 0;
418   if (obj->type == MSGPACK_OBJECT_MAP) {
419     int i;
420     for (i = 0; i < obj->via.map.size; i++) {
421       msgpack_object_kv *kv;
422       msgpack_object *key;
423       msgpack_object *value;
424       kv = &(obj->via.map.ptr[i]);
425       key = &(kv->key);
426       value = &(kv->val);
427       if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) > 0) {
428         switch (MSGPACK_OBJECT_STR_PTR(key)[0]) {
429         case 'i':
430           if (value->type == MSGPACK_OBJECT_STR) {
431             client_id_len = MSGPACK_OBJECT_STR_SIZE(value);
432             client_id = MSGPACK_OBJECT_STR_PTR(value);
433           }
434           break;
435         case 'q':
436           if (value->type == MSGPACK_OBJECT_STR) {
437             query_len = MSGPACK_OBJECT_STR_SIZE(value);
438             query = MSGPACK_OBJECT_STR_PTR(value);
439           }
440           break;
441         case 'l':
442           if (value->type == MSGPACK_OBJECT_STR) {
443             learn_target_names_len = MSGPACK_OBJECT_STR_SIZE(value);
444             learn_target_names = MSGPACK_OBJECT_STR_PTR(value);
445           }
446           break;
447         case 's':
448           if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
449             millisec = kv->val.via.u64;
450           }
451           break;
452         case 't':
453           if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) {
454             submit_flag = (kv->val.via.boolean ? 1 : 0);
455           }
456           break;
457         default:
458           break;
459         }
460       }
461     }
462     load_to_multi_targets(ctx, buf, query, query_len,
463                           client_id, client_id_len,
464                           learn_target_names, learn_target_names_len,
465                           millisec, submit_flag);
466   }
467 }
468 
469 static void
recv_event_loop(msgpack_zone * mempool,void * zmq_sock,grn_ctx * ctx)470 recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
471 {
472   grn_obj buf;
473   zmq_pollitem_t items[] = {
474     { zmq_sock, 0, ZMQ_POLLIN, 0}
475   };
476   GRN_TEXT_INIT(&buf, 0);
477   while (loop) {
478     zmq_poll(items, 1, 10000);
479     if (items[0].revents & ZMQ_POLLIN) { /* always true */
480       zmq_msg_t msg;
481       if (zmq_msg_init(&msg)) {
482         print_error("cannot init zmq message.");
483       } else {
484         if (zmq_msg_recv(&msg, zmq_sock, 0) == -1) {
485           print_error("cannot recv zmq message.");
486         } else {
487           msgpack_object obj;
488           msgpack_unpack_return ret;
489           ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
490           if (MSGPACK_UNPACK_SUCCESS == ret) {
491             /* msgpack_object_print(stdout, obj); */
492             handle_msg(&obj, ctx, &buf);
493           }
494           msgpack_zone_clear(mempool);
495         }
496         zmq_msg_close(&msg);
497       }
498     }
499   }
500   grn_obj_unlink(ctx, &buf);
501 }
502 
/*
 * One opened log file together with the data parsed from its current line.
 * The files form a singly linked list that gather_log_file() builds and
 * sort_log_file_list() keeps ordered by `millisec`.
 */
struct _suggest_log_file {
  FILE *fp;       /* stream opened by gather_log_file(), closed at end of file */
  char *path;     /* strdup()ed path, used in error messages */
  uint64_t line;  /* number of lines read so far */
  /* datas from one line */
  int submit;               /* 1 when the "types" value of the line is "submit" */
  char *query;              /* URI decoded query, owned by this struct */
  uint64_t millisec;        /* time of the line, filled in by parse_keyval() */
  char *client_id;          /* URI decoded client id, owned by this struct */
  char *learn_target_name;  /* URI decoded learn target list, owned by this struct */
  /* link list */
  struct _suggest_log_file *next;
};
typedef struct _suggest_log_file suggest_log_file;
517 
/* Debugging aid, compiled out: dumps fp, millisec and next of every list entry. */
#if 0
static void
print_log_file_list(suggest_log_file *list)
{
  while (list) {
    printf("fp:%p millisec:%" PRIu64 " next:%p\n",
      list->fp, list->millisec, list->next);
    list = list->next;
  }
}
#endif
529 
530 static void
free_log_line_data(suggest_log_file * l)531 free_log_line_data(suggest_log_file *l)
532 {
533   if (l->query) {
534     free(l->query);
535     l->query = NULL;
536   }
537   if (l->client_id) {
538     free(l->client_id);
539     l->client_id = NULL;
540   }
541   if (l->learn_target_name) {
542     free(l->learn_target_name);
543     l->learn_target_name = NULL;
544   }
545 }
546 
547 #define MAX_LOG_LENGTH 0x2000
548 
549 static void
read_log_line(suggest_log_file ** list)550 read_log_line(suggest_log_file **list)
551 {
552   suggest_log_file *t = *list;
553   char line_buf[MAX_LOG_LENGTH];
554   while (1) {
555     free_log_line_data(t);
556     if (fgets(line_buf, MAX_LOG_LENGTH, t->fp)) {
557       char *eol;
558       t->line++;
559       if ((eol = strrchr(line_buf, '\n'))) {
560         const char *query, *types, *client_id, *learn_target_name;
561         struct evkeyvalq get_args;
562         *eol = '\0';
563         evhttp_parse_query(line_buf, &get_args);
564         parse_keyval(NULL,
565                      &get_args, &query, &types, &client_id, NULL,
566                      &learn_target_name, NULL, &(t->millisec), NULL, NULL, NULL,
567                      NULL);
568         if (query && client_id && learn_target_name && t->millisec) {
569           t->query = evhttp_decode_uri(query);
570           t->submit = (types && !strcmp(types, "submit"));
571           t->client_id = evhttp_decode_uri(client_id);
572           t->learn_target_name = evhttp_decode_uri(learn_target_name);
573           evhttp_clear_headers(&get_args);
574           break;
575         }
576         print_error("invalid line path:%s line:%" PRIu64,
577           t->path, t->line);
578         evhttp_clear_headers(&get_args);
579       } else {
580         /* read until new line */
581         while (1) {
582           int c = fgetc(t->fp);
583           if (c == '\n' || c == EOF) { break; }
584         }
585       }
586     } else {
587       /* terminate reading log */
588       fclose(t->fp);
589       free(t->path);
590       *list = t->next;
591       free(t);
592       break;
593     }
594   }
595 }
596 
597 /* re-sorting by list->millisec asc with moving a head item. */
598 static void
sort_log_file_list(suggest_log_file ** list)599 sort_log_file_list(suggest_log_file **list)
600 {
601   suggest_log_file *p, *target;
602   target = *list;
603   if (!target || !target->next || target->millisec < target->next->millisec) {
604     return;
605   }
606   *list = target->next;
607   for (p = *list; p; p = p->next) {
608     if (!p->next || target->millisec > p->next->millisec) {
609       target->next = p->next;
610       p->next = target;
611       return;
612     }
613   }
614 }
615 
616 #define PATH_SEPARATOR '/'
617 
618 static suggest_log_file *
gather_log_file(const char * dir_path,unsigned int dir_path_len)619 gather_log_file(const char *dir_path, unsigned int dir_path_len)
620 {
621   DIR *dir;
622   struct dirent *dirent;
623   char path[PATH_MAX + 1];
624   suggest_log_file *list = NULL;
625   if (!(dir = opendir(dir_path))) {
626     print_error("cannot open log directory.");
627     return NULL;
628   }
629   memcpy(path, dir_path, dir_path_len);
630   path[dir_path_len] = PATH_SEPARATOR;
631   while ((dirent = readdir(dir))) {
632     struct stat fstat;
633     unsigned int d_namlen, path_len;
634     if (*(dirent->d_name) == '.' && (
635       dirent->d_name[1] == '\0' ||
636         (dirent->d_name[1] == '.' && dirent->d_name[2] == '\0'))) {
637       continue;
638     }
639     d_namlen = strlen(dirent->d_name);
640     path_len = dir_path_len + 1 + d_namlen;
641     if (dir_path_len + d_namlen >= PATH_MAX) { continue; }
642     memcpy(path + dir_path_len + 1, dirent->d_name, d_namlen);
643     path[path_len] = '\0';
644     lstat(path, &fstat);
645     if (S_ISDIR(fstat.st_mode)) {
646       gather_log_file(path, path_len);
647     } else {
648       suggest_log_file *p = calloc(1, sizeof(suggest_log_file));
649       if (!(p->fp = fopen(path, "r"))) {
650         free(p);
651       } else {
652         if (list) {
653           p->next = list;
654         }
655         p->path = strdup(path);
656         list = p;
657         read_log_line(&list);
658         sort_log_file_list(&list);
659       }
660     }
661     /* print_log_file_list(list); */
662   }
663   return list;
664 }
665 
666 static void
load_log(grn_ctx * ctx,const char * log_dir_name)667 load_log(grn_ctx *ctx, const char *log_dir_name)
668 {
669   grn_obj buf;
670   suggest_log_file *list;
671   GRN_TEXT_INIT(&buf, 0);
672   list = gather_log_file(log_dir_name, strlen(log_dir_name));
673   while (list) {
674     /*
675     printf("file:%s line:%" PRIu64 " query:%s millisec:%" PRIu64 "\n",
676       list->path, list->line, list->query, list->millisec);
677     */
678     load_to_multi_targets(ctx, &buf,
679       list->query, strlen(list->query),
680       list->client_id, strlen(list->client_id),
681       list->learn_target_name, strlen(list->learn_target_name),
682       list->millisec,
683       list->submit);
684     read_log_line(&list);
685     sort_log_file_list(&list);
686   }
687   grn_obj_close(ctx, &buf);
688 }
689 
690 static void
usage(FILE * output)691 usage(FILE *output)
692 {
693   fprintf(output,
694           "Usage: groonga-suggest-learner [options...] db_path\n"
695           "options:\n"
696           "  -r <recv endpoint>: recv endpoint (default: %s)\n"
697           "  --receive-endpoint <recv endpoint>\n"
698           "\n"
699           "  -s <send endpoint>: send endpoint (default: %s)\n"
700           "  --send-endpoint <send endpoint>\n"
701           "\n"
702           "  -l <log directory>: load from log files made on webserver.\n"
703           "  --log-base-path <log directory>\n"
704           "\n"
705           "  --log-path <path> : output logs to <path>\n"
706           "  --log-level <level> : set log level to <level> (default: %d)\n"
707           "  -d, --daemon      : daemonize\n",
708           DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT,
709           GRN_LOG_DEFAULT_LEVEL);
710 }
711 
712 static void
signal_handler(int sig)713 signal_handler(int sig)
714 {
715   loop = 0;
716 }
717 
718 int
main(int argc,char ** argv)719 main(int argc, char **argv)
720 {
721   run_mode mode = RUN_MODE_NONE;
722   int n_processed_args;
723   const char *recv_endpoint = DEFAULT_RECV_ENDPOINT;
724   const char *send_endpoint = DEFAULT_SEND_ENDPOINT;
725   const char *log_base_path = NULL;
726   const char *db_path = NULL;
727 
728   /* parse options */
729   {
730     int flags = mode;
731     const char *log_path = NULL;
732     const char *log_level = NULL;
733     static grn_str_getopt_opt opts[] = {
734       {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE},
735       {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE},
736       {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE},
737       {'\0', "log-path", NULL, 0, GETOPT_OP_NONE},
738       {'\0', "log-level", NULL, 0, GETOPT_OP_NONE},
739       {'d', "daemon", NULL, RUN_MODE_DAEMON, GETOPT_OP_UPDATE},
740       {'h', "help", NULL, RUN_MODE_USAGE, GETOPT_OP_UPDATE},
741       {'\0', NULL, NULL, 0, 0}
742     };
743     opts[0].arg = &recv_endpoint;
744     opts[1].arg = &send_endpoint;
745     opts[2].arg = &log_base_path;
746     opts[3].arg = &log_path;
747     opts[4].arg = &log_level;
748 
749     n_processed_args = grn_str_getopt(argc, argv, opts, &flags);
750 
751     if (log_path) {
752       grn_default_logger_set_path(log_path);
753     }
754 
755     if (log_level) {
756       const char * const end = log_level + strlen(log_level);
757       const char *rest = NULL;
758       const int value = grn_atoi(log_level, end, &rest);
759       if (end != rest || value < 0 || value > 9) {
760         fprintf(stderr, "invalid log level: <%s>\n", log_level);
761         return EXIT_FAILURE;
762       }
763       grn_default_logger_set_max_level(value);
764     }
765 
766     mode = (flags & RUN_MODE_MASK);
767 
768     if (mode & RUN_MODE_USAGE) {
769       usage(stdout);
770       return EXIT_SUCCESS;
771     }
772 
773     if ((n_processed_args < 0) ||
774         (argc - n_processed_args) != 1) {
775       usage(stderr);
776     }
777 
778     db_path = argv[n_processed_args];
779   }
780 
781   /* main */
782   {
783     grn_ctx *ctx;
784     msgpack_zone *mempool;
785 
786     if (mode == RUN_MODE_DAEMON) {
787       daemonize();
788     }
789 
790     grn_init();
791 
792     ctx = grn_ctx_open(0);
793     if (!(grn_db_open(ctx, db_path))) {
794       print_error("cannot open database.");
795     } else {
796       if (log_base_path) {
797         /* loading log mode */
798         load_log(ctx, log_base_path);
799       } else {
800         /* zeromq/msgpack recv mode */
801         if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
802           print_error("cannot create msgpack zone.");
803         } else {
804           void *zmq_ctx, *zmq_recv_sock;
805           if (!(zmq_ctx = zmq_init(1))) {
806             print_error("cannot create zmq context.");
807           } else {
808             if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
809               print_error("cannot create zmq_socket.");
810             } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
811               print_error("cannot bind zmq_socket.");
812             } else {
813               send_thd_data thd;
814 
815               signal(SIGTERM, signal_handler);
816               signal(SIGINT, signal_handler);
817               signal(SIGQUIT, signal_handler);
818 
819               zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
820               thd.db_path = db_path;
821               thd.send_endpoint = send_endpoint;
822               thd.zmq_ctx = zmq_ctx;
823 
824               if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
825                 print_error("error in pthread_create() for sending datas.");
826               }
827               recv_event_loop(mempool, zmq_recv_sock, ctx);
828               if (pthread_join(thd.thd, NULL)) {
829                 print_error("error in pthread_join() for waiting completion of sending data.");
830               }
831             }
832             zmq_term(zmq_ctx);
833           }
834           msgpack_zone_free(mempool);
835         }
836       }
837     }
838     grn_obj_close(ctx, grn_ctx_db(ctx));
839     grn_ctx_fin(ctx);
840     grn_fin();
841   }
842   return 0;
843 }
844