1 /* -*- c-basic-offset: 2 -*- */
2 /* Copyright(C) 2010-2015 Brazil
3
4 This library is free software; you can redistribute it and/or
5 modify it under the terms of the GNU Lesser General Public
6 License version 2.1 as published by the Free Software Foundation.
7
8 This library is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
11 Lesser General Public License for more details.
12
13 You should have received a copy of the GNU Lesser General Public
14 License along with this library; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
16 */
17
18 /* for grn_str_getopt() */
19 #include <grn_str.h>
20 #include <grn_msgpack.h>
21
22 #include "zmq_compatible.h"
23 #include <stdio.h>
24 #include <signal.h>
25 #include <unistd.h>
26 #include <pthread.h>
27 #include <groonga.h>
28 #include <inttypes.h>
29 #include <sys/stat.h>
30 #include <sys/types.h>
31 #include <dirent.h>
32
33 #include "util.h"
34
35 #include <evhttp.h>
36
/* Endpoints the ZMQ_SUB (receive) and ZMQ_PUB (send) sockets bind to unless
   overridden on the command line. */
#define DEFAULT_RECV_ENDPOINT "tcp://*:1234"
#define DEFAULT_SEND_ENDPOINT "tcp://*:1235"

/* Pause after each published record, in microseconds (passed to usleep();
   1000us = 0.001sec). */
#define SEND_WAIT (1000)

/* Expands a string literal into the "pointer, length" argument pair expected
   by length-taking APIs; the length excludes the terminating NUL. Only use it
   with string literals: sizeof() of a pointer would give the wrong length. */
#define CONST_STR_LEN(x) (x), ((x) ? sizeof(x) - 1 : 0)
42
/* Bit flags set on the command line through grn_str_getopt(). */
typedef enum {
  RUN_MODE_NONE   = 0,
  RUN_MODE_USAGE  = 1 << 0,  /* -h, --help */
  RUN_MODE_DAEMON = 1 << 1,  /* -d, --daemon */
  RUN_MODE_ERROR  = 1 << 2
} run_mode;

/* Bits of the getopt flags word that are interpreted as run_mode. */
#define RUN_MODE_MASK 0x7f
51
/* Arguments shared with the send thread (send_to_httpd()). */
typedef struct {
  const char *db_path;        /* database the send thread opens itself */
  const char *send_endpoint;  /* endpoint its ZMQ_PUB socket binds to */
  pthread_t thd;              /* handle of the send thread */
  void *zmq_ctx;              /* zmq context created by the main thread */
} send_thd_data;

/* Keep-running flag: cleared by signal_handler() and polled by the receive
   loop and the send thread. */
static volatile sig_atomic_t loop = 1;
60
61 static void
load_to_groonga(grn_ctx * ctx,grn_obj * buf,const char * query,uint32_t query_len,const char * client_id,uint32_t client_id_len,const char * learn_target_name,uint32_t learn_target_name_len,uint64_t millisec,int submit)62 load_to_groonga(grn_ctx *ctx,
63 grn_obj *buf,
64 const char *query, uint32_t query_len,
65 const char *client_id, uint32_t client_id_len,
66 const char *learn_target_name, uint32_t learn_target_name_len,
67 uint64_t millisec,
68 int submit)
69 {
70 GRN_BULK_REWIND(buf);
71 GRN_TEXT_PUTS(ctx, buf, "load --table event_");
72 GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
73 GRN_TEXT_PUTS(ctx, buf, " --each 'suggest_preparer(_id,type,item,sequence,time,pair_");
74 GRN_TEXT_PUT(ctx, buf, learn_target_name, learn_target_name_len);
75 GRN_TEXT_PUTS(ctx, buf, ")'");
76 grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
77 grn_ctx_send(ctx, CONST_STR_LEN("["), GRN_CTX_MORE);
78
79 GRN_BULK_REWIND(buf);
80 GRN_TEXT_PUTS(ctx, buf, "{\"item\":");
81 grn_text_esc(ctx, buf, query, query_len);
82 GRN_TEXT_PUTS(ctx, buf, ",\"sequence\":");
83 grn_text_esc(ctx, buf, client_id, client_id_len);
84 GRN_TEXT_PUTS(ctx, buf, ",\"time\":");
85 grn_text_ftoa(ctx, buf, (double)millisec / 1000);
86 if (submit) {
87 GRN_TEXT_PUTS(ctx, buf, ",\"type\":\"submit\"}");
88 } else {
89 GRN_TEXT_PUTS(ctx, buf, "}");
90 }
91 /* printf("%.*s\n", GRN_TEXT_LEN(buf), GRN_TEXT_VALUE(buf)); */
92 grn_ctx_send(ctx, GRN_TEXT_VALUE(buf), GRN_TEXT_LEN(buf), GRN_CTX_MORE);
93
94 grn_ctx_send(ctx, CONST_STR_LEN("]"), 0);
95
96 {
97 char *res;
98 int flags;
99 unsigned int res_len;
100 grn_ctx_recv(ctx, &res, &res_len, &flags);
101 }
102 }
103
104 void
load_to_multi_targets(grn_ctx * ctx,grn_obj * buf,const char * query,uint32_t query_len,const char * client_id,uint32_t client_id_len,const char * learn_target_names,uint32_t learn_target_names_len,uint64_t millisec,int submit)105 load_to_multi_targets(grn_ctx *ctx,
106 grn_obj *buf,
107 const char *query, uint32_t query_len,
108 const char *client_id, uint32_t client_id_len,
109 const char *learn_target_names,
110 uint32_t learn_target_names_len,
111 uint64_t millisec,
112 int submit)
113 {
114 if (millisec && query && client_id && learn_target_names) {
115 unsigned int tn_len;
116 const char *tn, *tnp, *tne;
117 tn = tnp = learn_target_names;
118 tne = learn_target_names + learn_target_names_len;
119 while (tnp <= tne) {
120 if (tnp == tne || *tnp == '|') {
121 tn_len = tnp - tn;
122
123 /*
124 printf("sec: %" PRIu64 " query %.*s client_id: %.*s target: %.*s\n",
125 millisec,
126 query_len, query,
127 client_id_len, client_id,
128 tn_len, tn);
129 */
130 load_to_groonga(ctx, buf, query, query_len, client_id, client_id_len,
131 tn, tn_len, millisec, submit);
132
133 tn = ++tnp;
134 } else {
135 tnp++;
136 }
137 }
138 }
139 }
140
/*
 * Packs, as a msgpack str, the key of record (id) of ref_table.
 * Must be expanded where `ctx` (grn_ctx *), `ref_table` (grn_obj *) and
 * `pk` (msgpack_packer) are in scope.
 */
#define PACK_KEY_FROM_ID(id) do { \
  char _key_buf[GRN_TABLE_MAX_KEY_SIZE]; \
  int _key_len = grn_table_get_key(ctx, ref_table, (id), \
                                   _key_buf, GRN_TABLE_MAX_KEY_SIZE); \
  msgpack_pack_str(&pk, _key_len); \
  msgpack_pack_str_body(&pk, _key_buf, _key_len); \
} while (0)
148
/*
 * Appends the map entry "<col_name>": <value of column col_<col_name> for
 * record rec_id> to the msgpack map being built in `pk`.
 *
 * Must be expanded where `ctx`, `pk`, `rec_id`, `ref_table` and a
 * `grn_obj *col_<col_name>` are in scope (PACK_KEY_FROM_ID() is used for
 * reference values).
 *
 * Encoding of the value:
 *   ShortText -> str, Int32 -> int, UInt32 -> uint,
 *   Time -> double (seconds), any other scalar domain -> key of that record
 *   in ref_table, uvector -> array of keys in ref_table.
 * When the column is NULL, or the fetched value has an unexpected object
 * type, nil is packed. The entry count announced with msgpack_pack_map()
 * therefore stays correct.
 */
#define PACK_MAP_ITEM(col_name) do { \
  grn_obj _v; \
  msgpack_pack_str(&pk, sizeof(#col_name) - 1); \
  msgpack_pack_str_body(&pk, #col_name, sizeof(#col_name) - 1); \
  if (!col_##col_name) { \
    msgpack_pack_nil(&pk); \
    break; \
  } \
  /* fallback for column types not handled below: never use _v uninitialized */ \
  GRN_VOID_INIT(&_v); \
  switch (col_##col_name->header.type) { \
  case GRN_COLUMN_FIX_SIZE: \
    GRN_VALUE_FIX_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
    break; \
  case GRN_COLUMN_VAR_SIZE: \
    if ((col_##col_name->header.flags & GRN_OBJ_COLUMN_TYPE_MASK) == GRN_OBJ_COLUMN_VECTOR) { \
      GRN_VALUE_FIX_SIZE_INIT(&_v, GRN_OBJ_VECTOR, grn_obj_get_range(ctx, col_##col_name)); \
    } else { \
      GRN_VALUE_VAR_SIZE_INIT(&_v, 0, grn_obj_get_range(ctx, col_##col_name)); \
    } \
    break; \
  default: \
    break; \
  } \
  grn_obj_get_value(ctx, col_##col_name, rec_id, &_v); \
  \
  switch (_v.header.type) { \
  case GRN_BULK: \
    switch (_v.header.domain) { \
    case GRN_DB_SHORT_TEXT: \
      msgpack_pack_str(&pk, GRN_TEXT_LEN(&_v)); \
      msgpack_pack_str_body(&pk, GRN_TEXT_VALUE(&_v), GRN_TEXT_LEN(&_v)); \
      break; \
    case GRN_DB_INT32: \
      msgpack_pack_int32(&pk, GRN_INT32_VALUE(&_v)); \
      break; \
    case GRN_DB_UINT32: \
      msgpack_pack_uint32(&pk, GRN_UINT32_VALUE(&_v)); \
      break; \
    case GRN_DB_TIME: \
      msgpack_pack_double(&pk, (double)GRN_TIME_VALUE(&_v) / GRN_TIME_USEC_PER_SEC); \
      break; \
    default: /* ref. to ShortText key */ \
      PACK_KEY_FROM_ID(GRN_RECORD_VALUE(&_v)); \
    } \
    break; \
  case GRN_UVECTOR: /* ref.s to ShortText key */ \
    { \
      grn_id *_idv = (grn_id *)GRN_BULK_HEAD(&_v), *_idve = (grn_id *)GRN_BULK_CURR(&_v); \
      msgpack_pack_array(&pk, _idve - _idv); \
      for (; _idv < _idve; _idv++) { \
        PACK_KEY_FROM_ID(*_idv); \
      } \
    } \
    break; \
  default: \
    print_error("invalid groonga object type(%d) for msgpack.", _v.header.type); \
    msgpack_pack_nil(&pk); \
    break; \
  } \
  grn_obj_close(ctx, &_v); \
} while (0)
203
204 static int
zmq_send_to_httpd(void * zmq_send_sock,void * data,size_t size)205 zmq_send_to_httpd(void *zmq_send_sock, void *data, size_t size)
206 {
207 zmq_msg_t msg;
208 if (!zmq_msg_init_size(&msg, size)) {
209 memcpy((void *)zmq_msg_data(&msg), data, size);
210 if (zmq_msg_send(&msg, zmq_send_sock, 0) == -1) {
211 print_error("zmq_send() error");
212 return -1;
213 }
214 zmq_msg_close(&msg);
215 } else {
216 print_error("zmq_msg_init_size() error");
217 }
218 return 0;
219 }
220
221 static void
send_handler(void * zmq_send_sock,grn_ctx * ctx)222 send_handler(void *zmq_send_sock, grn_ctx *ctx)
223 {
224 grn_table_cursor *cur;
225 if ((cur = grn_table_cursor_open(ctx, grn_ctx_db(ctx), NULL, 0, NULL, 0,
226 0, -1, 0))) {
227 grn_id table_id;
228 while (loop && (table_id = grn_table_cursor_next(ctx, cur)) != GRN_ID_NIL) {
229 grn_obj *table;
230 if ((table = grn_ctx_at(ctx, table_id))) {
231 int name_len;
232 char name_buf[GRN_TABLE_MAX_KEY_SIZE];
233
234 name_len = grn_obj_name(ctx, table, name_buf,
235 GRN_TABLE_MAX_KEY_SIZE);
236
237 if (name_len > 5) {
238 if (table->header.type == GRN_TABLE_PAT_KEY &&
239 !memcmp(name_buf, CONST_STR_LEN("item_"))) {
240 /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
241 grn_obj *ref_table;
242 grn_table_cursor *tc;
243 grn_obj *col_last, *col_kana, *col_freq, *col_freq2,
244 *col_buzz, *col_boost;
245
246 col_kana = grn_obj_column(ctx, table, CONST_STR_LEN("kana"));
247 col_freq = grn_obj_column(ctx, table, CONST_STR_LEN("freq"));
248 col_last = grn_obj_column(ctx, table, CONST_STR_LEN("last"));
249 col_boost = grn_obj_column(ctx, table, CONST_STR_LEN("boost"));
250 col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
251 col_buzz = grn_obj_column(ctx, table, CONST_STR_LEN("buzz"));
252
253 ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_kana));
254
255 if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
256 0, 0, -1, 0))) {
257 grn_id rec_id;
258 while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
259 != GRN_ID_NIL) {
260 char *key;
261 size_t key_len;
262 msgpack_packer pk;
263 msgpack_sbuffer sbuf;
264
265 msgpack_sbuffer_init(&sbuf);
266 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
267
268 msgpack_pack_map(&pk, 8);
269
270 /* ["_key","ShortText"],["last","Time"],["kana","kana"],["freq2","Int32"],["freq","Int32"],["co","pair_all"],["buzz","Int32"],["boost","Int32"] */
271 msgpack_pack_str(&pk, 6);
272 msgpack_pack_str_body(&pk, "target", strlen("target"));
273 msgpack_pack_str(&pk, name_len);
274 msgpack_pack_str_body(&pk, name_buf, name_len);
275
276 msgpack_pack_str(&pk, 4);
277 msgpack_pack_str_body(&pk,
278 GRN_COLUMN_NAME_KEY,
279 GRN_COLUMN_NAME_KEY_LEN);
280 key_len = grn_table_cursor_get_key(ctx, tc, (void **)&key);
281 msgpack_pack_str(&pk, key_len);
282 msgpack_pack_str_body(&pk, key, key_len);
283
284 PACK_MAP_ITEM(last);
285 PACK_MAP_ITEM(kana);
286 PACK_MAP_ITEM(freq);
287 PACK_MAP_ITEM(freq2);
288 PACK_MAP_ITEM(buzz);
289 PACK_MAP_ITEM(boost);
290
291 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
292
293 usleep(SEND_WAIT);
294
295 msgpack_sbuffer_destroy(&sbuf);
296 }
297 grn_table_cursor_close(ctx, tc);
298 }
299 } else if (table->header.type == GRN_TABLE_HASH_KEY &&
300 !memcmp(name_buf, CONST_STR_LEN("pair_"))) {
301 grn_obj *ref_table;
302 grn_table_cursor *tc;
303 grn_obj *col_pre, *col_post, *col_freq0, *col_freq1, *col_freq2;
304
305 col_pre = grn_obj_column(ctx, table, CONST_STR_LEN("pre"));
306 col_post = grn_obj_column(ctx, table, CONST_STR_LEN("post"));
307 col_freq0 = grn_obj_column(ctx, table, CONST_STR_LEN("freq0"));
308 col_freq1 = grn_obj_column(ctx, table, CONST_STR_LEN("freq1"));
309 col_freq2 = grn_obj_column(ctx, table, CONST_STR_LEN("freq2"));
310
311 ref_table = grn_ctx_at(ctx, grn_obj_get_range(ctx, col_pre));
312
313 if ((tc = grn_table_cursor_open(ctx, table, NULL, 0, NULL,
314 0, 0, -1, 0))) {
315 grn_id rec_id;
316 while (loop && (rec_id = grn_table_cursor_next(ctx, tc))
317 != GRN_ID_NIL) {
318 uint64_t *key;
319 msgpack_packer pk;
320 msgpack_sbuffer sbuf;
321
322 /* skip freq0 == 0 && freq1 == 0 && freq2 == 0 */
323 {
324 grn_obj f;
325 grn_obj_get_value(ctx, col_freq0, rec_id, &f);
326 if (!GRN_INT32_VALUE(&f)) {
327 grn_obj_get_value(ctx, col_freq1, rec_id, &f);
328 if (!GRN_INT32_VALUE(&f)) {
329 grn_obj_get_value(ctx, col_freq2, rec_id, &f);
330 if (!GRN_INT32_VALUE(&f)) { continue; }
331 }
332 }
333 }
334
335 /* make pair_* message */
336 msgpack_sbuffer_init(&sbuf);
337 msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
338
339 msgpack_pack_map(&pk, 7);
340 /* ["_key","UInt64"],["pre","item_all"],["post","item_all"],["freq2","Int32"],["freq1","Int32"],["freq0","Int32"] */
341
342 msgpack_pack_str(&pk, 6);
343 msgpack_pack_str_body(&pk, "target", strlen("target"));
344 msgpack_pack_str(&pk, name_len);
345 msgpack_pack_str_body(&pk, name_buf, name_len);
346
347 msgpack_pack_str(&pk, 4);
348 msgpack_pack_str_body(&pk,
349 GRN_COLUMN_NAME_KEY,
350 GRN_COLUMN_NAME_KEY_LEN);
351 grn_table_cursor_get_key(ctx, tc, (void **)&key);
352 msgpack_pack_uint64(&pk, *key);
353
354 PACK_MAP_ITEM(pre);
355 PACK_MAP_ITEM(post);
356 PACK_MAP_ITEM(freq0);
357 PACK_MAP_ITEM(freq1);
358 PACK_MAP_ITEM(freq2);
359
360 zmq_send_to_httpd(zmq_send_sock, sbuf.data, sbuf.size);
361
362 usleep(SEND_WAIT);
363
364 msgpack_sbuffer_destroy(&sbuf);
365 }
366 grn_table_cursor_close(ctx, tc);
367 }
368 }
369 }
370 grn_obj_unlink(ctx, table);
371 }
372 }
373 grn_table_cursor_close(ctx, cur);
374 }
375 }
376
377 static void *
send_to_httpd(void * arg)378 send_to_httpd(void *arg)
379 {
380 send_thd_data *thd = arg;
381 void *zmq_send_sock;
382 if ((zmq_send_sock = zmq_socket(thd->zmq_ctx, ZMQ_PUB))) {
383 if (!zmq_bind(zmq_send_sock, thd->send_endpoint)) {
384 grn_ctx ctx;
385 if (!(grn_ctx_init(&ctx, 0))) {
386 grn_obj *db;
387 if ((db = grn_db_open(&ctx, thd->db_path))) {
388 uint64_t hwm = 1;
389 zmq_setsockopt(zmq_send_sock, ZMQ_SNDHWM, &hwm, sizeof(uint64_t));
390 while (loop) {
391 send_handler(zmq_send_sock, &ctx);
392 }
393 grn_obj_close(&ctx, db);
394 } else {
395 print_error("error in grn_db_open() on send thread.");
396 }
397 grn_ctx_fin(&ctx);
398 } else {
399 print_error("error in grn_ctx_init() on send thread.");
400 }
401 } else {
402 print_error("cannot bind zmq_socket.");
403 }
404 } else {
405 print_error("cannot create zmq_socket.");
406 }
407 return NULL;
408 }
409
410 static void
handle_msg(msgpack_object * obj,grn_ctx * ctx,grn_obj * buf)411 handle_msg(msgpack_object *obj, grn_ctx *ctx, grn_obj *buf)
412 {
413 int submit_flag = 0;
414 uint64_t millisec = 0;
415 const char *query = NULL,
416 *client_id = NULL, *learn_target_names = NULL;
417 uint32_t query_len = 0, client_id_len = 0, learn_target_names_len = 0;
418 if (obj->type == MSGPACK_OBJECT_MAP) {
419 int i;
420 for (i = 0; i < obj->via.map.size; i++) {
421 msgpack_object_kv *kv;
422 msgpack_object *key;
423 msgpack_object *value;
424 kv = &(obj->via.map.ptr[i]);
425 key = &(kv->key);
426 value = &(kv->val);
427 if (key->type == MSGPACK_OBJECT_STR && MSGPACK_OBJECT_STR_SIZE(key) > 0) {
428 switch (MSGPACK_OBJECT_STR_PTR(key)[0]) {
429 case 'i':
430 if (value->type == MSGPACK_OBJECT_STR) {
431 client_id_len = MSGPACK_OBJECT_STR_SIZE(value);
432 client_id = MSGPACK_OBJECT_STR_PTR(value);
433 }
434 break;
435 case 'q':
436 if (value->type == MSGPACK_OBJECT_STR) {
437 query_len = MSGPACK_OBJECT_STR_SIZE(value);
438 query = MSGPACK_OBJECT_STR_PTR(value);
439 }
440 break;
441 case 'l':
442 if (value->type == MSGPACK_OBJECT_STR) {
443 learn_target_names_len = MSGPACK_OBJECT_STR_SIZE(value);
444 learn_target_names = MSGPACK_OBJECT_STR_PTR(value);
445 }
446 break;
447 case 's':
448 if (kv->val.type == MSGPACK_OBJECT_POSITIVE_INTEGER) {
449 millisec = kv->val.via.u64;
450 }
451 break;
452 case 't':
453 if (kv->val.type == MSGPACK_OBJECT_BOOLEAN) {
454 submit_flag = (kv->val.via.boolean ? 1 : 0);
455 }
456 break;
457 default:
458 break;
459 }
460 }
461 }
462 load_to_multi_targets(ctx, buf, query, query_len,
463 client_id, client_id_len,
464 learn_target_names, learn_target_names_len,
465 millisec, submit_flag);
466 }
467 }
468
469 static void
recv_event_loop(msgpack_zone * mempool,void * zmq_sock,grn_ctx * ctx)470 recv_event_loop(msgpack_zone *mempool, void *zmq_sock, grn_ctx *ctx)
471 {
472 grn_obj buf;
473 zmq_pollitem_t items[] = {
474 { zmq_sock, 0, ZMQ_POLLIN, 0}
475 };
476 GRN_TEXT_INIT(&buf, 0);
477 while (loop) {
478 zmq_poll(items, 1, 10000);
479 if (items[0].revents & ZMQ_POLLIN) { /* always true */
480 zmq_msg_t msg;
481 if (zmq_msg_init(&msg)) {
482 print_error("cannot init zmq message.");
483 } else {
484 if (zmq_msg_recv(&msg, zmq_sock, 0) == -1) {
485 print_error("cannot recv zmq message.");
486 } else {
487 msgpack_object obj;
488 msgpack_unpack_return ret;
489 ret = msgpack_unpack(zmq_msg_data(&msg), zmq_msg_size(&msg), NULL, mempool, &obj);
490 if (MSGPACK_UNPACK_SUCCESS == ret) {
491 /* msgpack_object_print(stdout, obj); */
492 handle_msg(&obj, ctx, &buf);
493 }
494 msgpack_zone_clear(mempool);
495 }
496 zmq_msg_close(&msg);
497 }
498 }
499 }
500 grn_obj_unlink(ctx, &buf);
501 }
502
/*
 * One opened webserver log file together with the fields decoded from its
 * current line. Entries are chained through `next` into a singly linked list.
 */
typedef struct _suggest_log_file {
  FILE *fp;                 /* the opened log file */
  char *path;               /* heap copy of the file's path */
  uint64_t line;            /* number of lines read so far */
  /* fields decoded from the current line */
  int submit;
  char *query;
  uint64_t millisec;
  char *client_id;
  char *learn_target_name;
  /* singly linked list */
  struct _suggest_log_file *next;
} suggest_log_file;
517
/*
 * Debugging aid, compiled out: prints, for every entry of the list, its FILE
 * pointer, the millisec of its current line and its next pointer.
 * Change "#if 0" to "#if 1" to build it.
 */
#if 0
static void
print_log_file_list(suggest_log_file *list)
{
  while (list) {
    printf("fp:%p millisec:%" PRIu64 " next:%p\n",
           list->fp, list->millisec, list->next);
    list = list->next;
  }
}
#endif
529
530 static void
free_log_line_data(suggest_log_file * l)531 free_log_line_data(suggest_log_file *l)
532 {
533 if (l->query) {
534 free(l->query);
535 l->query = NULL;
536 }
537 if (l->client_id) {
538 free(l->client_id);
539 l->client_id = NULL;
540 }
541 if (l->learn_target_name) {
542 free(l->learn_target_name);
543 l->learn_target_name = NULL;
544 }
545 }
546
/* Size of the line buffer used by read_log_line(), including the NUL;
   lines that do not fit are skipped. */
#define MAX_LOG_LENGTH 0x2000

/*
 * Advances the head entry of *list to its next valid log line.
 *
 * The strings decoded from the previous line are freed first. Each line is
 * parsed with evhttp_parse_query() and parse_keyval() (presumably a request
 * URI logged by the webserver — confirm against the log format). A line is
 * accepted when it yields a query, a client id, a learn target name and a
 * non-zero millisec. Those fields of the head entry are then replaced by
 * URI-decoded heap copies (freed by free_log_line_data()). Lines missing one
 * of them are reported with print_error() and skipped.
 *
 * A line with no '\n' within the buffer is discarded up to the next
 * newline. Note that this also discards a last line that lacks a trailing
 * newline.
 *
 * When the file is exhausted, the head entry is closed, freed and unlinked:
 * *list then points at the former second entry (or NULL).
 */
static void
read_log_line(suggest_log_file **list)
{
  suggest_log_file *t = *list;
  char line_buf[MAX_LOG_LENGTH];
  while (1) {
    /* drop what was decoded from the previous line */
    free_log_line_data(t);
    if (fgets(line_buf, MAX_LOG_LENGTH, t->fp)) {
      char *eol;
      t->line++;  /* line number used in error messages */
      if ((eol = strrchr(line_buf, '\n'))) {
        const char *query, *types, *client_id, *learn_target_name;
        struct evkeyvalq get_args;
        *eol = '\0';
        evhttp_parse_query(line_buf, &get_args);
        parse_keyval(NULL,
                     &get_args, &query, &types, &client_id, NULL,
                     &learn_target_name, NULL, &(t->millisec), NULL, NULL, NULL,
                     NULL);
        if (query && client_id && learn_target_name && t->millisec) {
          /* query/types/... presumably point into get_args, so they are
             copied out before evhttp_clear_headers() */
          t->query = evhttp_decode_uri(query);
          t->submit = (types && !strcmp(types, "submit"));
          t->client_id = evhttp_decode_uri(client_id);
          t->learn_target_name = evhttp_decode_uri(learn_target_name);
          evhttp_clear_headers(&get_args);
          break;
        }
        print_error("invalid line path:%s line:%" PRIu64,
                    t->path, t->line);
        evhttp_clear_headers(&get_args);
      } else {
        /* read until new line */
        while (1) {
          int c = fgetc(t->fp);
          if (c == '\n' || c == EOF) { break; }
        }
      }
    } else {
      /* terminate reading log */
      fclose(t->fp);
      free(t->path);
      *list = t->next;
      free(t);
      break;
    }
  }
}
596
597 /* re-sorting by list->millisec asc with moving a head item. */
598 static void
sort_log_file_list(suggest_log_file ** list)599 sort_log_file_list(suggest_log_file **list)
600 {
601 suggest_log_file *p, *target;
602 target = *list;
603 if (!target || !target->next || target->millisec < target->next->millisec) {
604 return;
605 }
606 *list = target->next;
607 for (p = *list; p; p = p->next) {
608 if (!p->next || target->millisec > p->next->millisec) {
609 target->next = p->next;
610 p->next = target;
611 return;
612 }
613 }
614 }
615
616 #define PATH_SEPARATOR '/'
617
618 static suggest_log_file *
gather_log_file(const char * dir_path,unsigned int dir_path_len)619 gather_log_file(const char *dir_path, unsigned int dir_path_len)
620 {
621 DIR *dir;
622 struct dirent *dirent;
623 char path[PATH_MAX + 1];
624 suggest_log_file *list = NULL;
625 if (!(dir = opendir(dir_path))) {
626 print_error("cannot open log directory.");
627 return NULL;
628 }
629 memcpy(path, dir_path, dir_path_len);
630 path[dir_path_len] = PATH_SEPARATOR;
631 while ((dirent = readdir(dir))) {
632 struct stat fstat;
633 unsigned int d_namlen, path_len;
634 if (*(dirent->d_name) == '.' && (
635 dirent->d_name[1] == '\0' ||
636 (dirent->d_name[1] == '.' && dirent->d_name[2] == '\0'))) {
637 continue;
638 }
639 d_namlen = strlen(dirent->d_name);
640 path_len = dir_path_len + 1 + d_namlen;
641 if (dir_path_len + d_namlen >= PATH_MAX) { continue; }
642 memcpy(path + dir_path_len + 1, dirent->d_name, d_namlen);
643 path[path_len] = '\0';
644 lstat(path, &fstat);
645 if (S_ISDIR(fstat.st_mode)) {
646 gather_log_file(path, path_len);
647 } else {
648 suggest_log_file *p = calloc(1, sizeof(suggest_log_file));
649 if (!(p->fp = fopen(path, "r"))) {
650 free(p);
651 } else {
652 if (list) {
653 p->next = list;
654 }
655 p->path = strdup(path);
656 list = p;
657 read_log_line(&list);
658 sort_log_file_list(&list);
659 }
660 }
661 /* print_log_file_list(list); */
662 }
663 return list;
664 }
665
666 static void
load_log(grn_ctx * ctx,const char * log_dir_name)667 load_log(grn_ctx *ctx, const char *log_dir_name)
668 {
669 grn_obj buf;
670 suggest_log_file *list;
671 GRN_TEXT_INIT(&buf, 0);
672 list = gather_log_file(log_dir_name, strlen(log_dir_name));
673 while (list) {
674 /*
675 printf("file:%s line:%" PRIu64 " query:%s millisec:%" PRIu64 "\n",
676 list->path, list->line, list->query, list->millisec);
677 */
678 load_to_multi_targets(ctx, &buf,
679 list->query, strlen(list->query),
680 list->client_id, strlen(list->client_id),
681 list->learn_target_name, strlen(list->learn_target_name),
682 list->millisec,
683 list->submit);
684 read_log_line(&list);
685 sort_log_file_list(&list);
686 }
687 grn_obj_close(ctx, &buf);
688 }
689
690 static void
usage(FILE * output)691 usage(FILE *output)
692 {
693 fprintf(output,
694 "Usage: groonga-suggest-learner [options...] db_path\n"
695 "options:\n"
696 " -r <recv endpoint>: recv endpoint (default: %s)\n"
697 " --receive-endpoint <recv endpoint>\n"
698 "\n"
699 " -s <send endpoint>: send endpoint (default: %s)\n"
700 " --send-endpoint <send endpoint>\n"
701 "\n"
702 " -l <log directory>: load from log files made on webserver.\n"
703 " --log-base-path <log directory>\n"
704 "\n"
705 " --log-path <path> : output logs to <path>\n"
706 " --log-level <level> : set log level to <level> (default: %d)\n"
707 " -d, --daemon : daemonize\n",
708 DEFAULT_RECV_ENDPOINT, DEFAULT_SEND_ENDPOINT,
709 GRN_LOG_DEFAULT_LEVEL);
710 }
711
712 static void
signal_handler(int sig)713 signal_handler(int sig)
714 {
715 loop = 0;
716 }
717
718 int
main(int argc,char ** argv)719 main(int argc, char **argv)
720 {
721 run_mode mode = RUN_MODE_NONE;
722 int n_processed_args;
723 const char *recv_endpoint = DEFAULT_RECV_ENDPOINT;
724 const char *send_endpoint = DEFAULT_SEND_ENDPOINT;
725 const char *log_base_path = NULL;
726 const char *db_path = NULL;
727
728 /* parse options */
729 {
730 int flags = mode;
731 const char *log_path = NULL;
732 const char *log_level = NULL;
733 static grn_str_getopt_opt opts[] = {
734 {'r', "receive-endpoint", NULL, 0, GETOPT_OP_NONE},
735 {'s', "send-endpoint", NULL, 0, GETOPT_OP_NONE},
736 {'l', "log-base-path", NULL, 0, GETOPT_OP_NONE},
737 {'\0', "log-path", NULL, 0, GETOPT_OP_NONE},
738 {'\0', "log-level", NULL, 0, GETOPT_OP_NONE},
739 {'d', "daemon", NULL, RUN_MODE_DAEMON, GETOPT_OP_UPDATE},
740 {'h', "help", NULL, RUN_MODE_USAGE, GETOPT_OP_UPDATE},
741 {'\0', NULL, NULL, 0, 0}
742 };
743 opts[0].arg = &recv_endpoint;
744 opts[1].arg = &send_endpoint;
745 opts[2].arg = &log_base_path;
746 opts[3].arg = &log_path;
747 opts[4].arg = &log_level;
748
749 n_processed_args = grn_str_getopt(argc, argv, opts, &flags);
750
751 if (log_path) {
752 grn_default_logger_set_path(log_path);
753 }
754
755 if (log_level) {
756 const char * const end = log_level + strlen(log_level);
757 const char *rest = NULL;
758 const int value = grn_atoi(log_level, end, &rest);
759 if (end != rest || value < 0 || value > 9) {
760 fprintf(stderr, "invalid log level: <%s>\n", log_level);
761 return EXIT_FAILURE;
762 }
763 grn_default_logger_set_max_level(value);
764 }
765
766 mode = (flags & RUN_MODE_MASK);
767
768 if (mode & RUN_MODE_USAGE) {
769 usage(stdout);
770 return EXIT_SUCCESS;
771 }
772
773 if ((n_processed_args < 0) ||
774 (argc - n_processed_args) != 1) {
775 usage(stderr);
776 }
777
778 db_path = argv[n_processed_args];
779 }
780
781 /* main */
782 {
783 grn_ctx *ctx;
784 msgpack_zone *mempool;
785
786 if (mode == RUN_MODE_DAEMON) {
787 daemonize();
788 }
789
790 grn_init();
791
792 ctx = grn_ctx_open(0);
793 if (!(grn_db_open(ctx, db_path))) {
794 print_error("cannot open database.");
795 } else {
796 if (log_base_path) {
797 /* loading log mode */
798 load_log(ctx, log_base_path);
799 } else {
800 /* zeromq/msgpack recv mode */
801 if (!(mempool = msgpack_zone_new(MSGPACK_ZONE_CHUNK_SIZE))) {
802 print_error("cannot create msgpack zone.");
803 } else {
804 void *zmq_ctx, *zmq_recv_sock;
805 if (!(zmq_ctx = zmq_init(1))) {
806 print_error("cannot create zmq context.");
807 } else {
808 if (!(zmq_recv_sock = zmq_socket(zmq_ctx, ZMQ_SUB))) {
809 print_error("cannot create zmq_socket.");
810 } else if (zmq_bind(zmq_recv_sock, recv_endpoint)) {
811 print_error("cannot bind zmq_socket.");
812 } else {
813 send_thd_data thd;
814
815 signal(SIGTERM, signal_handler);
816 signal(SIGINT, signal_handler);
817 signal(SIGQUIT, signal_handler);
818
819 zmq_setsockopt(zmq_recv_sock, ZMQ_SUBSCRIBE, "", 0);
820 thd.db_path = db_path;
821 thd.send_endpoint = send_endpoint;
822 thd.zmq_ctx = zmq_ctx;
823
824 if (pthread_create(&(thd.thd), NULL, send_to_httpd, &thd)) {
825 print_error("error in pthread_create() for sending datas.");
826 }
827 recv_event_loop(mempool, zmq_recv_sock, ctx);
828 if (pthread_join(thd.thd, NULL)) {
829 print_error("error in pthread_join() for waiting completion of sending data.");
830 }
831 }
832 zmq_term(zmq_ctx);
833 }
834 msgpack_zone_free(mempool);
835 }
836 }
837 }
838 grn_obj_close(ctx, grn_ctx_db(ctx));
839 grn_ctx_fin(ctx);
840 grn_fin();
841 }
842 return 0;
843 }
844