1 /*****************************************************************************
2
3 Copyright (c) 2007, 2017, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /******************************************************************//**
28 @file fts/fts0opt.cc
29 Full Text Search optimize thread
30
31 Created 2007/03/27 Sunny Bains
32 Completed 2011/7/10 Sunny and Jimmy Yang
33
34 ***********************************************************************/
35
36 #include "fts0fts.h"
37 #include "row0sel.h"
38 #include "que0types.h"
39 #include "fts0priv.h"
40 #include "fts0types.h"
41 #include "ut0wqueue.h"
42 #include "srv0start.h"
43 #include "zlib.h"
44
45 #ifndef UNIV_NONINL
46 #include "fts0types.ic"
47 #include "fts0vlc.ic"
48 #endif
49
50 /** The FTS optimize thread's work queue. */
51 static ib_wqueue_t* fts_optimize_wq;
52
53 /** Time to wait for a message. */
54 static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;
55
56 /** Default optimize interval in secs. */
57 static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
58
59 /** Server is shutting down, so does we exiting the optimize thread */
60 static bool fts_opt_start_shutdown = false;
61
62 /** Initial size of nodes in fts_word_t. */
63 static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
64
65 /** Last time we did check whether system need a sync */
66 static ib_time_t last_check_sync_time;
67
68 #if 0
69 /** Check each table in round robin to see whether they'd
70 need to be "optimized" */
71 static ulint fts_optimize_sync_iterator = 0;
72 #endif
73
74 /** State of a table within the optimization sub system. */
75 enum fts_state_t {
76 FTS_STATE_LOADED,
77 FTS_STATE_RUNNING,
78 FTS_STATE_SUSPENDED,
79 FTS_STATE_DONE,
80 FTS_STATE_EMPTY
81 };
82
83 /** FTS optimize thread message types. */
84 enum fts_msg_type_t {
85 FTS_MSG_START, /*!< Start optimizing thread */
86
87 FTS_MSG_PAUSE, /*!< Pause optimizing thread */
88
89 FTS_MSG_STOP, /*!< Stop optimizing and exit thread */
90
91 FTS_MSG_ADD_TABLE, /*!< Add table to the optimize thread's
92 work queue */
93
94 FTS_MSG_OPTIMIZE_TABLE, /*!< Optimize a table */
95
96 FTS_MSG_DEL_TABLE, /*!< Remove a table from the optimize
97 threads work queue */
98 FTS_MSG_SYNC_TABLE /*!< Sync fts cache of a table */
99 };
100
101 /** Compressed list of words that have been read from FTS INDEX
102 that needs to be optimized. */
103 struct fts_zip_t {
104 lint status; /*!< Status of (un)/zip operation */
105
106 ulint n_words; /*!< Number of words compressed */
107
108 ulint block_sz; /*!< Size of a block in bytes */
109
110 ib_vector_t* blocks; /*!< Vector of compressed blocks */
111
112 ib_alloc_t* heap_alloc; /*!< Heap to use for allocations */
113
114 ulint pos; /*!< Offset into blocks */
115
116 ulint last_big_block; /*!< Offset of last block in the
117 blocks array that is of size
118 block_sz. Blocks beyond this offset
119 are of size FTS_MAX_WORD_LEN */
120
121 z_streamp zp; /*!< ZLib state */
122
123 /*!< The value of the last word read
124 from the FTS INDEX table. This is
125 used to discard duplicates */
126
127 fts_string_t word; /*!< UTF-8 string */
128
129 ulint max_words; /*!< maximum number of words to read
130 in one pase */
131 };
132
133 /** Prepared statemets used during optimize */
134 struct fts_optimize_graph_t {
135 /*!< Delete a word from FTS INDEX */
136 que_t* delete_nodes_graph;
137 /*!< Insert a word into FTS INDEX */
138 que_t* write_nodes_graph;
139 /*!< COMMIT a transaction */
140 que_t* commit_graph;
141 /*!< Read the nodes from FTS_INDEX */
142 que_t* read_nodes_graph;
143 };
144
145 /** Used by fts_optimize() to store state. */
146 struct fts_optimize_t {
147 trx_t* trx; /*!< The transaction used for all SQL */
148
149 ib_alloc_t* self_heap; /*!< Heap to use for allocations */
150
151 char* name_prefix; /*!< FTS table name prefix */
152
153 fts_table_t fts_index_table;/*!< Common table definition */
154
155 /*!< Common table definition */
156 fts_table_t fts_common_table;
157
158 dict_table_t* table; /*!< Table that has to be queried */
159
160 dict_index_t* index; /*!< The FTS index to be optimized */
161
162 fts_doc_ids_t* to_delete; /*!< doc ids to delete, we check against
163 this vector and purge the matching
164 entries during the optimizing
165 process. The vector entries are
166 sorted on doc id */
167
168 ulint del_pos; /*!< Offset within to_delete vector,
169 this is used to keep track of where
170 we are up to in the vector */
171
172 ibool done; /*!< TRUE when optimize finishes */
173
174 ib_vector_t* words; /*!< Word + Nodes read from FTS_INDEX,
175 it contains instances of fts_word_t */
176
177 fts_zip_t* zip; /*!< Words read from the FTS_INDEX */
178
179 fts_optimize_graph_t /*!< Prepared statements used during */
180 graph; /*optimize */
181
182 ulint n_completed; /*!< Number of FTS indexes that have
183 been optimized */
184 ibool del_list_regenerated;
185 /*!< BEING_DELETED list regenarated */
186 };
187
188 /** Used by the optimize, to keep state during compacting nodes. */
189 struct fts_encode_t {
190 doc_id_t src_last_doc_id;/*!< Last doc id read from src node */
191 byte* src_ilist_ptr; /*!< Current ptr within src ilist */
192 };
193
194 /** We use this information to determine when to start the optimize
195 cycle for a table. */
196 struct fts_slot_t {
197 dict_table_t* table; /*!< Table to optimize */
198
199 table_id_t table_id; /*!< Table id */
200
201 fts_state_t state; /*!< State of this slot */
202
203 ulint added; /*!< Number of doc ids added since the
204 last time this table was optimized */
205
206 ulint deleted; /*!< Number of doc ids deleted since the
207 last time this table was optimized */
208
209 ib_time_t last_run; /*!< Time last run completed */
210
211 ib_time_t completed; /*!< Optimize finish time */
212
213 ib_time_t interval_time; /*!< Minimum time to wait before
214 optimizing the table again. */
215 };
216
217 /** A table remove message for the FTS optimize thread. */
218 struct fts_msg_del_t {
219 dict_table_t* table; /*!< The table to remove */
220
221 os_event_t event; /*!< Event to synchronize acknowledgement
222 of receipt and processing of the
223 this message by the consumer */
224 };
225
226 /** Stop the optimize thread. */
227 struct fts_msg_optimize_t {
228 dict_table_t* table; /*!< Table to optimize */
229 };
230
231 /** The FTS optimize message work queue message type. */
232 struct fts_msg_t {
233 fts_msg_type_t type; /*!< Message type */
234
235 void* ptr; /*!< The message contents */
236
237 mem_heap_t* heap; /*!< The heap used to allocate this
238 message, the message consumer will
239 free the heap. */
240 };
241
242 /** The number of words to read and optimize in a single pass. */
243 UNIV_INTERN ulong fts_num_word_optimize;
244
245 // FIXME
246 UNIV_INTERN char fts_enable_diag_print;
247
248 /** ZLib compressed block size.*/
249 static ulint FTS_ZIP_BLOCK_SIZE = 1024;
250
251 /** The amount of time optimizing in a single pass, in milliseconds. */
252 static ib_time_t fts_optimize_time_limit = 0;
253
254 /** SQL Statement for changing state of rows to be deleted from FTS Index. */
255 static const char* fts_init_delete_sql =
256 "BEGIN\n"
257 "\n"
258 "INSERT INTO \"%s_BEING_DELETED\"\n"
259 "SELECT doc_id FROM \"%s_DELETED\";\n"
260 "\n"
261 "INSERT INTO \"%s_BEING_DELETED_CACHE\"\n"
262 "SELECT doc_id FROM \"%s_DELETED_CACHE\";\n";
263
264 static const char* fts_delete_doc_ids_sql =
265 "BEGIN\n"
266 "\n"
267 "DELETE FROM \"%s_DELETED\" WHERE doc_id = :doc_id1;\n"
268 "DELETE FROM \"%s_DELETED_CACHE\" WHERE doc_id = :doc_id2;\n";
269
270 static const char* fts_end_delete_sql =
271 "BEGIN\n"
272 "\n"
273 "DELETE FROM \"%s_BEING_DELETED\";\n"
274 "DELETE FROM \"%s_BEING_DELETED_CACHE\";\n";
275
276 /**********************************************************************//**
277 Initialize fts_zip_t. */
278 static
279 void
fts_zip_initialize(fts_zip_t * zip)280 fts_zip_initialize(
281 /*===============*/
282 fts_zip_t* zip) /*!< out: zip instance to initialize */
283 {
284 zip->pos = 0;
285 zip->n_words = 0;
286
287 zip->status = Z_OK;
288
289 zip->last_big_block = 0;
290
291 zip->word.f_len = 0;
292 memset(zip->word.f_str, 0, FTS_MAX_WORD_LEN);
293
294 ib_vector_reset(zip->blocks);
295
296 memset(zip->zp, 0, sizeof(*zip->zp));
297 }
298
299 /**********************************************************************//**
300 Create an instance of fts_zip_t.
301 @return a new instance of fts_zip_t */
302 static
303 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)304 fts_zip_create(
305 /*===========*/
306 mem_heap_t* heap, /*!< in: heap */
307 ulint block_sz, /*!< in: size of a zip block.*/
308 ulint max_words) /*!< in: max words to read */
309 {
310 fts_zip_t* zip;
311
312 zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
313
314 zip->word.f_str = static_cast<byte*>(
315 mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
316
317 zip->block_sz = block_sz;
318
319 zip->heap_alloc = ib_heap_allocator_create(heap);
320
321 zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
322
323 zip->max_words = max_words;
324
325 zip->zp = static_cast<z_stream*>(
326 mem_heap_zalloc(heap, sizeof(*zip->zp)));
327
328 return(zip);
329 }
330
331 /**********************************************************************//**
332 Initialize an instance of fts_zip_t. */
333 static
334 void
fts_zip_init(fts_zip_t * zip)335 fts_zip_init(
336 /*=========*/
337
338 fts_zip_t* zip) /*!< in: zip instance to init */
339 {
340 memset(zip->zp, 0, sizeof(*zip->zp));
341
342 zip->word.f_len = 0;
343 *zip->word.f_str = '\0';
344 }
345
346 /**********************************************************************//**
347 Create a fts_optimizer_word_t instance.
348 @return new instance */
349 UNIV_INTERN
350 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)351 fts_word_init(
352 /*==========*/
353 fts_word_t* word, /*!< in: word to initialize */
354 byte* utf8, /*!< in: UTF-8 string */
355 ulint len) /*!< in: length of string in bytes */
356 {
357 mem_heap_t* heap = mem_heap_create(sizeof(fts_node_t));
358
359 memset(word, 0, sizeof(*word));
360
361 word->text.f_len = len;
362 word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
363
364 /* Need to copy the NUL character too. */
365 memcpy(word->text.f_str, utf8, word->text.f_len);
366 word->text.f_str[word->text.f_len] = 0;
367
368 word->heap_alloc = ib_heap_allocator_create(heap);
369
370 word->nodes = ib_vector_create(
371 word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
372
373 return(word);
374 }
375
376 /**********************************************************************//**
377 Read the FTS INDEX row.
378 @return fts_node_t instance */
379 static
380 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)381 fts_optimize_read_node(
382 /*===================*/
383 fts_word_t* word, /*!< in: */
384 que_node_t* exp) /*!< in: */
385 {
386 int i;
387 fts_node_t* node = static_cast<fts_node_t*>(
388 ib_vector_push(word->nodes, NULL));
389
390 /* Start from 1 since the first node has been read by the caller */
391 for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
392
393 dfield_t* dfield = que_node_get_val(exp);
394 byte* data = static_cast<byte*>(
395 dfield_get_data(dfield));
396 ulint len = dfield_get_len(dfield);
397
398 ut_a(len != UNIV_SQL_NULL);
399
400 /* Note: The column numbers below must match the SELECT */
401 switch (i) {
402 case 1: /* DOC_COUNT */
403 node->doc_count = mach_read_from_4(data);
404 break;
405
406 case 2: /* FIRST_DOC_ID */
407 node->first_doc_id = fts_read_doc_id(data);
408 break;
409
410 case 3: /* LAST_DOC_ID */
411 node->last_doc_id = fts_read_doc_id(data);
412 break;
413
414 case 4: /* ILIST */
415 node->ilist_size_alloc = node->ilist_size = len;
416 node->ilist = static_cast<byte*>(ut_malloc(len));
417 memcpy(node->ilist, data, len);
418 break;
419
420 default:
421 ut_error;
422 }
423 }
424
425 /* Make sure all columns were read. */
426 ut_a(i == 5);
427
428 return(node);
429 }
430
431 /**********************************************************************//**
432 Callback function to fetch the rows in an FTS INDEX record.
433 @return always returns non-NULL */
434 UNIV_INTERN
435 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)436 fts_optimize_index_fetch_node(
437 /*==========================*/
438 void* row, /*!< in: sel_node_t* */
439 void* user_arg) /*!< in: pointer to ib_vector_t */
440 {
441 fts_word_t* word;
442 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
443 fts_fetch_t* fetch = static_cast<fts_fetch_t*>(user_arg);
444 ib_vector_t* words = static_cast<ib_vector_t*>(fetch->read_arg);
445 que_node_t* exp = sel_node->select_list;
446 dfield_t* dfield = que_node_get_val(exp);
447 void* data = dfield_get_data(dfield);
448 ulint dfield_len = dfield_get_len(dfield);
449 fts_node_t* node;
450 bool is_word_init = false;
451
452 ut_a(dfield_len <= FTS_MAX_WORD_LEN);
453
454 if (ib_vector_size(words) == 0) {
455
456 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
457 fts_word_init(word, (byte*) data, dfield_len);
458 is_word_init = true;
459 }
460
461 word = static_cast<fts_word_t*>(ib_vector_last(words));
462
463 if (dfield_len != word->text.f_len
464 || memcmp(word->text.f_str, data, dfield_len)) {
465
466 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
467 fts_word_init(word, (byte*) data, dfield_len);
468 is_word_init = true;
469 }
470
471 node = fts_optimize_read_node(word, que_node_get_next(exp));
472
473 fetch->total_memory += node->ilist_size;
474 if (is_word_init) {
475 fetch->total_memory += sizeof(fts_word_t)
476 + sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
477 + sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
478 } else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
479 fetch->total_memory += sizeof(fts_node_t);
480 }
481
482 if (fetch->total_memory >= fts_result_cache_limit) {
483 return(FALSE);
484 }
485
486 return(TRUE);
487 }
488
489 /**********************************************************************//**
490 Read the rows from the FTS inde.
491 @return DB_SUCCESS or error code */
492 UNIV_INTERN
493 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)494 fts_index_fetch_nodes(
495 /*==================*/
496 trx_t* trx, /*!< in: transaction */
497 que_t** graph, /*!< in: prepared statement */
498 fts_table_t* fts_table, /*!< in: table of the FTS INDEX */
499 const fts_string_t*
500 word, /*!< in: the word to fetch */
501 fts_fetch_t* fetch) /*!< in: fetch callback.*/
502 {
503 pars_info_t* info;
504 dberr_t error;
505
506 trx->op_info = "fetching FTS index nodes";
507
508 if (*graph) {
509 info = (*graph)->info;
510 } else {
511 info = pars_info_create();
512 }
513
514 pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
515 pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
516
517 if (!*graph) {
518 ulint selected;
519
520 ut_a(fts_table->type == FTS_INDEX_TABLE);
521
522 selected = fts_select_index(fts_table->charset,
523 word->f_str, word->f_len);
524
525 fts_table->suffix = fts_get_suffix(selected);
526
527 *graph = fts_parse_sql(
528 fts_table,
529 info,
530 "DECLARE FUNCTION my_func;\n"
531 "DECLARE CURSOR c IS"
532 " SELECT word, doc_count, first_doc_id, last_doc_id, "
533 "ilist\n"
534 " FROM \"%s\"\n"
535 " WHERE word LIKE :word\n"
536 " ORDER BY first_doc_id;\n"
537 "BEGIN\n"
538 "\n"
539 "OPEN c;\n"
540 "WHILE 1 = 1 LOOP\n"
541 " FETCH c INTO my_func();\n"
542 " IF c % NOTFOUND THEN\n"
543 " EXIT;\n"
544 " END IF;\n"
545 "END LOOP;\n"
546 "CLOSE c;");
547 }
548
549 for(;;) {
550 error = fts_eval_sql(trx, *graph);
551
552 if (error == DB_SUCCESS) {
553 fts_sql_commit(trx);
554
555 break; /* Exit the loop. */
556 } else {
557 fts_sql_rollback(trx);
558
559 ut_print_timestamp(stderr);
560
561 if (error == DB_LOCK_WAIT_TIMEOUT) {
562 fprintf(stderr, " InnoDB: Warning: lock wait "
563 "timeout reading FTS index. "
564 "Retrying!\n");
565
566 trx->error_state = DB_SUCCESS;
567 } else {
568 fprintf(stderr, " InnoDB: Error: (%s) "
569 "while reading FTS index.\n",
570 ut_strerr(error));
571
572 break; /* Exit the loop. */
573 }
574 }
575 }
576
577 return(error);
578 }
579
580 /**********************************************************************//**
581 Read a word */
582 static
583 byte*
fts_zip_read_word(fts_zip_t * zip,fts_string_t * word)584 fts_zip_read_word(
585 /*==============*/
586 fts_zip_t* zip, /*!< in: Zip state + data */
587 fts_string_t* word) /*!< out: uncompressed word */
588 {
589 short len = 0;
590 void* null = NULL;
591 byte* ptr = word->f_str;
592 int flush = Z_NO_FLUSH;
593
594 /* Either there was an error or we are at the Z_STREAM_END. */
595 if (zip->status != Z_OK) {
596 return(NULL);
597 }
598
599 zip->zp->next_out = reinterpret_cast<byte*>(&len);
600 zip->zp->avail_out = sizeof(len);
601
602 while (zip->status == Z_OK && zip->zp->avail_out > 0) {
603
604 /* Finished decompressing block. */
605 if (zip->zp->avail_in == 0) {
606
607 /* Free the block thats been decompressed. */
608 if (zip->pos > 0) {
609 ulint prev = zip->pos - 1;
610
611 ut_a(zip->pos < ib_vector_size(zip->blocks));
612
613 ut_free(ib_vector_getp(zip->blocks, prev));
614 ib_vector_set(zip->blocks, prev, &null);
615 }
616
617 /* Any more blocks to decompress. */
618 if (zip->pos < ib_vector_size(zip->blocks)) {
619
620 zip->zp->next_in = static_cast<byte*>(
621 ib_vector_getp(
622 zip->blocks, zip->pos));
623
624 if (zip->pos > zip->last_big_block) {
625 zip->zp->avail_in =
626 FTS_MAX_WORD_LEN;
627 } else {
628 zip->zp->avail_in = static_cast<uInt>(zip->block_sz);
629 }
630
631 ++zip->pos;
632 } else {
633 flush = Z_FINISH;
634 }
635 }
636
637 switch (zip->status = inflate(zip->zp, flush)) {
638 case Z_OK:
639 if (zip->zp->avail_out == 0 && len > 0) {
640
641 ut_a(len <= FTS_MAX_WORD_LEN);
642 ptr[len] = 0;
643
644 zip->zp->next_out = ptr;
645 zip->zp->avail_out = len;
646
647 word->f_len = len;
648 len = 0;
649 }
650 break;
651
652 case Z_BUF_ERROR: /* No progress possible. */
653 case Z_STREAM_END:
654 inflateEnd(zip->zp);
655 break;
656
657 case Z_STREAM_ERROR:
658 default:
659 ut_error;
660 }
661 }
662
663 /* All blocks must be freed at end of inflate. */
664 if (zip->status != Z_OK) {
665 for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
666 if (ib_vector_getp(zip->blocks, i)) {
667 ut_free(ib_vector_getp(zip->blocks, i));
668 ib_vector_set(zip->blocks, i, &null);
669 }
670 }
671 }
672
673 if (ptr != NULL) {
674 ut_ad(word->f_len == strlen((char*) ptr));
675 }
676
677 return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
678 }
679
680 /**********************************************************************//**
681 Callback function to fetch and compress the word in an FTS
682 INDEX record.
683 @return FALSE on EOF */
684 static
685 ibool
fts_fetch_index_words(void * row,void * user_arg)686 fts_fetch_index_words(
687 /*==================*/
688 void* row, /*!< in: sel_node_t* */
689 void* user_arg) /*!< in: pointer to ib_vector_t */
690 {
691 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
692 fts_zip_t* zip = static_cast<fts_zip_t*>(user_arg);
693 que_node_t* exp = sel_node->select_list;
694 dfield_t* dfield = que_node_get_val(exp);
695 short len = static_cast<short>(dfield_get_len(dfield));
696 void* data = dfield_get_data(dfield);
697
698 /* Skip the duplicate words. */
699 if (zip->word.f_len == static_cast<ulint>(len)
700 && !memcmp(zip->word.f_str, data, len)) {
701
702 return(TRUE);
703 }
704
705 ut_a(len <= FTS_MAX_WORD_LEN);
706
707 memcpy(zip->word.f_str, data, len);
708 zip->word.f_len = len;
709
710 ut_a(zip->zp->avail_in == 0);
711 ut_a(zip->zp->next_in == NULL);
712
713 /* The string is prefixed by len. */
714 zip->zp->next_in = reinterpret_cast<byte*>(&len);
715 zip->zp->avail_in = sizeof(len);
716
717 /* Compress the word, create output blocks as necessary. */
718 while (zip->zp->avail_in > 0) {
719
720 /* No space left in output buffer, create a new one. */
721 if (zip->zp->avail_out == 0) {
722 byte* block;
723
724 block = static_cast<byte*>(ut_malloc(zip->block_sz));
725 ib_vector_push(zip->blocks, &block);
726
727 zip->zp->next_out = block;
728 zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
729 }
730
731 switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
732 case Z_OK:
733 if (zip->zp->avail_in == 0) {
734 zip->zp->next_in = static_cast<byte*>(data);
735 zip->zp->avail_in = len;
736 ut_a(len <= FTS_MAX_WORD_LEN);
737 len = 0;
738 }
739 break;
740
741 case Z_STREAM_END:
742 case Z_BUF_ERROR:
743 case Z_STREAM_ERROR:
744 default:
745 ut_error;
746 break;
747 }
748 }
749
750 /* All data should have been compressed. */
751 ut_a(zip->zp->avail_in == 0);
752 zip->zp->next_in = NULL;
753
754 ++zip->n_words;
755
756 return(zip->n_words >= zip->max_words ? FALSE : TRUE);
757 }
758
759 /**********************************************************************//**
760 Finish Zip deflate. */
761 static
762 void
fts_zip_deflate_end(fts_zip_t * zip)763 fts_zip_deflate_end(
764 /*================*/
765 fts_zip_t* zip) /*!< in: instance that should be closed*/
766 {
767 ut_a(zip->zp->avail_in == 0);
768 ut_a(zip->zp->next_in == NULL);
769
770 zip->status = deflate(zip->zp, Z_FINISH);
771
772 ut_a(ib_vector_size(zip->blocks) > 0);
773 zip->last_big_block = ib_vector_size(zip->blocks) - 1;
774
775 /* Allocate smaller block(s), since this is trailing data. */
776 while (zip->status == Z_OK) {
777 byte* block;
778
779 ut_a(zip->zp->avail_out == 0);
780
781 block = static_cast<byte*>(ut_malloc(FTS_MAX_WORD_LEN + 1));
782 ib_vector_push(zip->blocks, &block);
783
784 zip->zp->next_out = block;
785 zip->zp->avail_out = FTS_MAX_WORD_LEN;
786
787 zip->status = deflate(zip->zp, Z_FINISH);
788 }
789
790 ut_a(zip->status == Z_STREAM_END);
791
792 zip->status = deflateEnd(zip->zp);
793 ut_a(zip->status == Z_OK);
794
795 /* Reset the ZLib data structure. */
796 memset(zip->zp, 0, sizeof(*zip->zp));
797 }
798
799 /**********************************************************************//**
800 Read the words from the FTS INDEX.
801 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
802 to search else error code */
803 static MY_ATTRIBUTE((nonnull, warn_unused_result))
804 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)805 fts_index_fetch_words(
806 /*==================*/
807 fts_optimize_t* optim, /*!< in: optimize scratch pad */
808 const fts_string_t* word, /*!< in: get words greater than this
809 word */
810 ulint n_words)/*!< in: max words to read */
811 {
812 pars_info_t* info;
813 que_t* graph;
814 ulint selected;
815 fts_zip_t* zip = NULL;
816 dberr_t error = DB_SUCCESS;
817 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
818 ibool inited = FALSE;
819
820 optim->trx->op_info = "fetching FTS index words";
821
822 if (optim->zip == NULL) {
823 optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
824 } else {
825 fts_zip_initialize(optim->zip);
826 }
827
828 for (selected = fts_select_index(
829 optim->fts_index_table.charset, word->f_str, word->f_len);
830 fts_index_selector[selected].value;
831 selected++) {
832
833 optim->fts_index_table.suffix = fts_get_suffix(selected);
834
835 /* We've search all indexes. */
836 if (optim->fts_index_table.suffix == NULL) {
837 return(DB_TABLE_NOT_FOUND);
838 }
839
840 info = pars_info_create();
841
842 pars_info_bind_function(
843 info, "my_func", fts_fetch_index_words, optim->zip);
844
845 pars_info_bind_varchar_literal(
846 info, "word", word->f_str, word->f_len);
847
848 graph = fts_parse_sql(
849 &optim->fts_index_table,
850 info,
851 "DECLARE FUNCTION my_func;\n"
852 "DECLARE CURSOR c IS"
853 " SELECT word\n"
854 " FROM \"%s\"\n"
855 " WHERE word > :word\n"
856 " ORDER BY word;\n"
857 "BEGIN\n"
858 "\n"
859 "OPEN c;\n"
860 "WHILE 1 = 1 LOOP\n"
861 " FETCH c INTO my_func();\n"
862 " IF c % NOTFOUND THEN\n"
863 " EXIT;\n"
864 " END IF;\n"
865 "END LOOP;\n"
866 "CLOSE c;");
867
868 zip = optim->zip;
869
870 for(;;) {
871 int err;
872
873 if (!inited && ((err = deflateInit(zip->zp, 9))
874 != Z_OK)) {
875 ut_print_timestamp(stderr);
876 fprintf(stderr,
877 " InnoDB: Error: ZLib deflateInit() "
878 "failed: %d\n", err);
879
880 error = DB_ERROR;
881 break;
882 } else {
883 inited = TRUE;
884 error = fts_eval_sql(optim->trx, graph);
885 }
886
887 if (error == DB_SUCCESS) {
888 //FIXME fts_sql_commit(optim->trx);
889 break;
890 } else {
891 //FIXME fts_sql_rollback(optim->trx);
892
893 ut_print_timestamp(stderr);
894
895 if (error == DB_LOCK_WAIT_TIMEOUT) {
896 fprintf(stderr, " InnoDB: "
897 "Warning: lock wait "
898 "timeout reading document. "
899 "Retrying!\n");
900
901 /* We need to reset the ZLib state. */
902 inited = FALSE;
903 deflateEnd(zip->zp);
904 fts_zip_init(zip);
905
906 optim->trx->error_state = DB_SUCCESS;
907 } else {
908 fprintf(stderr, " InnoDB: Error: (%s) "
909 "while reading document.\n",
910 ut_strerr(error));
911
912 break; /* Exit the loop. */
913 }
914 }
915 }
916
917 fts_que_graph_free(graph);
918
919 /* Check if max word to fetch is exceeded */
920 if (optim->zip->n_words >= n_words) {
921 break;
922 }
923 }
924
925 if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
926
927 /* All data should have been read. */
928 ut_a(zip->zp->avail_in == 0);
929
930 fts_zip_deflate_end(zip);
931 } else {
932 deflateEnd(zip->zp);
933 }
934
935 return(error);
936 }
937
938 /**********************************************************************//**
939 Callback function to fetch the doc id from the record.
940 @return always returns TRUE */
941 static
942 ibool
fts_fetch_doc_ids(void * row,void * user_arg)943 fts_fetch_doc_ids(
944 /*==============*/
945 void* row, /*!< in: sel_node_t* */
946 void* user_arg) /*!< in: pointer to ib_vector_t */
947 {
948 que_node_t* exp;
949 int i = 0;
950 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
951 fts_doc_ids_t* fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
952 fts_update_t* update = static_cast<fts_update_t*>(
953 ib_vector_push(fts_doc_ids->doc_ids, NULL));
954
955 for (exp = sel_node->select_list;
956 exp;
957 exp = que_node_get_next(exp), ++i) {
958
959 dfield_t* dfield = que_node_get_val(exp);
960 void* data = dfield_get_data(dfield);
961 ulint len = dfield_get_len(dfield);
962
963 ut_a(len != UNIV_SQL_NULL);
964
965 /* Note: The column numbers below must match the SELECT. */
966 switch (i) {
967 case 0: /* DOC_ID */
968 update->fts_indexes = NULL;
969 update->doc_id = fts_read_doc_id(
970 static_cast<byte*>(data));
971 break;
972
973 default:
974 ut_error;
975 }
976 }
977
978 return(TRUE);
979 }
980
981 /**********************************************************************//**
982 Read the rows from a FTS common auxiliary table.
983 @return DB_SUCCESS or error code */
984 UNIV_INTERN
985 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)986 fts_table_fetch_doc_ids(
987 /*====================*/
988 trx_t* trx, /*!< in: transaction */
989 fts_table_t* fts_table, /*!< in: table */
990 fts_doc_ids_t* doc_ids) /*!< in: For collecting doc ids */
991 {
992 dberr_t error;
993 que_t* graph;
994 pars_info_t* info = pars_info_create();
995 ibool alloc_bk_trx = FALSE;
996
997 ut_a(fts_table->suffix != NULL);
998 ut_a(fts_table->type == FTS_COMMON_TABLE);
999
1000 if (!trx) {
1001 trx = trx_allocate_for_background();
1002 alloc_bk_trx = TRUE;
1003 }
1004
1005 trx->op_info = "fetching FTS doc ids";
1006
1007 pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
1008
1009 graph = fts_parse_sql(
1010 fts_table,
1011 info,
1012 "DECLARE FUNCTION my_func;\n"
1013 "DECLARE CURSOR c IS"
1014 " SELECT doc_id FROM \"%s\";\n"
1015 "BEGIN\n"
1016 "\n"
1017 "OPEN c;\n"
1018 "WHILE 1 = 1 LOOP\n"
1019 " FETCH c INTO my_func();\n"
1020 " IF c % NOTFOUND THEN\n"
1021 " EXIT;\n"
1022 " END IF;\n"
1023 "END LOOP;\n"
1024 "CLOSE c;");
1025
1026 error = fts_eval_sql(trx, graph);
1027
1028 mutex_enter(&dict_sys->mutex);
1029 que_graph_free(graph);
1030 mutex_exit(&dict_sys->mutex);
1031
1032 if (error == DB_SUCCESS) {
1033 fts_sql_commit(trx);
1034
1035 ib_vector_sort(doc_ids->doc_ids, fts_update_doc_id_cmp);
1036 } else {
1037 fts_sql_rollback(trx);
1038 }
1039
1040 if (alloc_bk_trx) {
1041 trx_free_for_background(trx);
1042 }
1043
1044 return(error);
1045 }
1046
1047 /**********************************************************************//**
1048 Do a binary search for a doc id in the array
1049 @return +ve index if found -ve index where it should be inserted
1050 if not found */
1051 UNIV_INTERN
1052 int
fts_bsearch(fts_update_t * array,int lower,int upper,doc_id_t doc_id)1053 fts_bsearch(
1054 /*========*/
1055 fts_update_t* array, /*!< in: array to sort */
1056 int lower, /*!< in: the array lower bound */
1057 int upper, /*!< in: the array upper bound */
1058 doc_id_t doc_id) /*!< in: the doc id to search for */
1059 {
1060 int orig_size = upper;
1061
1062 if (upper == 0) {
1063 /* Nothing to search */
1064 return(-1);
1065 } else {
1066 while (lower < upper) {
1067 int i = (lower + upper) >> 1;
1068
1069 if (doc_id > array[i].doc_id) {
1070 lower = i + 1;
1071 } else if (doc_id < array[i].doc_id) {
1072 upper = i - 1;
1073 } else {
1074 return(i); /* Found. */
1075 }
1076 }
1077 }
1078
1079 if (lower == upper && lower < orig_size) {
1080 if (doc_id == array[lower].doc_id) {
1081 return(lower);
1082 } else if (lower == 0) {
1083 return(-1);
1084 }
1085 }
1086
1087 /* Not found. */
1088 return( (lower == 0) ? -1 : -lower);
1089 }
1090
1091 /**********************************************************************//**
1092 Search in the to delete array whether any of the doc ids within
1093 the [first, last] range are to be deleted
1094 @return +ve index if found -ve index where it should be inserted
1095 if not found */
1096 static
1097 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1098 fts_optimize_lookup(
1099 /*================*/
1100 ib_vector_t* doc_ids, /*!< in: array to search */
1101 ulint lower, /*!< in: lower limit of array */
1102 doc_id_t first_doc_id, /*!< in: doc id to lookup */
1103 doc_id_t last_doc_id) /*!< in: doc id to lookup */
1104 {
1105 int pos;
1106 int upper = static_cast<int>(ib_vector_size(doc_ids));
1107 fts_update_t* array = (fts_update_t*) doc_ids->data;
1108
1109 pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1110
1111 ut_a(abs(pos) <= upper + 1);
1112
1113 if (pos < 0) {
1114
1115 int i = abs(pos);
1116
1117 /* If i is 1, it could be first_doc_id is less than
1118 either the first or second array item, do a
1119 double check */
1120 if (i == 1 && array[0].doc_id <= last_doc_id
1121 && first_doc_id < array[0].doc_id) {
1122 pos = 0;
1123 } else if (i < upper && array[i].doc_id <= last_doc_id) {
1124
1125 /* Check if the "next" doc id is within the
1126 first & last doc id of the node. */
1127 pos = i;
1128 }
1129 }
1130
1131 return(pos);
1132 }
1133
1134 /**********************************************************************//**
1135 Encode the word pos list into the node
1136 @return DB_SUCCESS or error code*/
1137 static MY_ATTRIBUTE((nonnull))
1138 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1139 fts_optimize_encode_node(
1140 /*=====================*/
1141 fts_node_t* node, /*!< in: node to fill*/
1142 doc_id_t doc_id, /*!< in: doc id to encode */
1143 fts_encode_t* enc) /*!< in: encoding state.*/
1144 {
1145 byte* dst;
1146 ulint enc_len;
1147 ulint pos_enc_len;
1148 doc_id_t doc_id_delta;
1149 dberr_t error = DB_SUCCESS;
1150 byte* src = enc->src_ilist_ptr;
1151
1152 if (node->first_doc_id == 0) {
1153 ut_a(node->last_doc_id == 0);
1154
1155 node->first_doc_id = doc_id;
1156 }
1157
1158 /* Calculate the space required to store the ilist. */
1159 ut_ad(doc_id > node->last_doc_id);
1160 doc_id_delta = doc_id - node->last_doc_id;
1161 enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1162
1163 /* Calculate the size of the encoded pos array. */
1164 while (*src) {
1165 fts_decode_vlc(&src);
1166 }
1167
1168 /* Skip the 0x00 byte at the end of the word positions list. */
1169 ++src;
1170
1171 /* Number of encoded pos bytes to copy. */
1172 pos_enc_len = src - enc->src_ilist_ptr;
1173
1174 /* Total number of bytes required for copy. */
1175 enc_len += pos_enc_len;
1176
1177 /* Check we have enough space in the destination buffer for
1178 copying the document word list. */
1179 if (!node->ilist) {
1180 ulint new_size;
1181
1182 ut_a(node->ilist_size == 0);
1183
1184 new_size = enc_len > FTS_ILIST_MAX_SIZE
1185 ? enc_len : FTS_ILIST_MAX_SIZE;
1186
1187 node->ilist = static_cast<byte*>(ut_malloc(new_size));
1188 node->ilist_size_alloc = new_size;
1189
1190 } else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1191 ulint new_size = node->ilist_size + enc_len;
1192 byte* ilist = static_cast<byte*>(ut_malloc(new_size));
1193
1194 memcpy(ilist, node->ilist, node->ilist_size);
1195
1196 ut_free(node->ilist);
1197
1198 node->ilist = ilist;
1199 node->ilist_size_alloc = new_size;
1200 }
1201
1202 src = enc->src_ilist_ptr;
1203 dst = node->ilist + node->ilist_size;
1204
1205 /* Encode the doc id. Cast to ulint, the delta should be small and
1206 therefore no loss of precision. */
1207 dst += fts_encode_int((ulint) doc_id_delta, dst);
1208
1209 /* Copy the encoded pos array. */
1210 memcpy(dst, src, pos_enc_len);
1211
1212 node->last_doc_id = doc_id;
1213
1214 /* Data copied upto here. */
1215 node->ilist_size += enc_len;
1216 enc->src_ilist_ptr += pos_enc_len;
1217
1218 ut_a(node->ilist_size <= node->ilist_size_alloc);
1219
1220 return(error);
1221 }
1222
1223 /**********************************************************************//**
1224 Optimize the data contained in a node.
1225 @return DB_SUCCESS or error code*/
1226 static MY_ATTRIBUTE((nonnull))
1227 dberr_t
fts_optimize_node(ib_vector_t * del_vec,int * del_pos,fts_node_t * dst_node,fts_node_t * src_node,fts_encode_t * enc)1228 fts_optimize_node(
1229 /*==============*/
1230 ib_vector_t* del_vec, /*!< in: vector of doc ids to delete*/
1231 int* del_pos, /*!< in: offset into above vector */
1232 fts_node_t* dst_node, /*!< in: node to fill*/
1233 fts_node_t* src_node, /*!< in: source node for data*/
1234 fts_encode_t* enc) /*!< in: encoding state */
1235 {
1236 ulint copied;
1237 dberr_t error = DB_SUCCESS;
1238 doc_id_t doc_id = enc->src_last_doc_id;
1239
1240 if (!enc->src_ilist_ptr) {
1241 enc->src_ilist_ptr = src_node->ilist;
1242 }
1243
1244 copied = enc->src_ilist_ptr - src_node->ilist;
1245
1246 /* While there is data in the source node and space to copy
1247 into in the destination node. */
1248 while (copied < src_node->ilist_size
1249 && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
1250
1251 doc_id_t delta;
1252 doc_id_t del_doc_id = FTS_NULL_DOC_ID;
1253
1254 delta = fts_decode_vlc(&enc->src_ilist_ptr);
1255
1256 test_again:
1257 /* Check whether the doc id is in the delete list, if
1258 so then we skip the entries but we need to track the
1259 delta for decoding the entries following this document's
1260 entries. */
1261 if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
1262 fts_update_t* update;
1263
1264 update = (fts_update_t*) ib_vector_get(
1265 del_vec, *del_pos);
1266
1267 del_doc_id = update->doc_id;
1268 }
1269
1270 if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
1271 ut_a(delta == src_node->first_doc_id);
1272 }
1273
1274 doc_id += delta;
1275
1276 if (del_doc_id > 0 && doc_id == del_doc_id) {
1277
1278 ++*del_pos;
1279
1280 /* Skip the entries for this document. */
1281 while (*enc->src_ilist_ptr) {
1282 fts_decode_vlc(&enc->src_ilist_ptr);
1283 }
1284
1285 /* Skip the end of word position marker. */
1286 ++enc->src_ilist_ptr;
1287
1288 } else {
1289
1290 /* DOC ID already becomes larger than
1291 del_doc_id, check the next del_doc_id */
1292 if (del_doc_id > 0 && doc_id > del_doc_id) {
1293 del_doc_id = 0;
1294 ++*del_pos;
1295 delta = 0;
1296 goto test_again;
1297 }
1298
1299 /* Decode and copy the word positions into
1300 the dest node. */
1301 fts_optimize_encode_node(dst_node, doc_id, enc);
1302
1303 ++dst_node->doc_count;
1304
1305 ut_a(dst_node->last_doc_id == doc_id);
1306 }
1307
1308 /* Bytes copied so for from source. */
1309 copied = enc->src_ilist_ptr - src_node->ilist;
1310 }
1311
1312 if (copied >= src_node->ilist_size) {
1313 ut_a(doc_id == src_node->last_doc_id);
1314 }
1315
1316 enc->src_last_doc_id = doc_id;
1317
1318 return(error);
1319 }
1320
1321 /**********************************************************************//**
1322 Determine the starting pos within the deleted doc id vector for a word.
1323 @return delete position */
1324 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1325 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1326 fts_optimize_deleted_pos(
1327 /*=====================*/
1328 fts_optimize_t* optim, /*!< in: optimize state data */
1329 fts_word_t* word) /*!< in: the word data to check */
1330 {
1331 int del_pos;
1332 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1333
1334 /* Get the first and last dict ids for the word, we will use
1335 these values to determine which doc ids need to be removed
1336 when we coalesce the nodes. This way we can reduce the numer
1337 of elements that need to be searched in the deleted doc ids
1338 vector and secondly we can remove the doc ids during the
1339 coalescing phase. */
1340 if (ib_vector_size(del_vec) > 0) {
1341 fts_node_t* node;
1342 doc_id_t last_id;
1343 doc_id_t first_id;
1344 ulint size = ib_vector_size(word->nodes);
1345
1346 node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1347 first_id = node->first_doc_id;
1348
1349 node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1350 last_id = node->last_doc_id;
1351
1352 ut_a(first_id <= last_id);
1353
1354 del_pos = fts_optimize_lookup(
1355 del_vec, optim->del_pos, first_id, last_id);
1356 } else {
1357
1358 del_pos = -1; /* Note that there is nothing to delete. */
1359 }
1360
1361 return(del_pos);
1362 }
1363
1364 #define FTS_DEBUG_PRINT
1365 /**********************************************************************//**
1366 Compact the nodes for a word, we also remove any doc ids during the
1367 compaction pass.
1368 @return DB_SUCCESS or error code.*/
1369 static
1370 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1371 fts_optimize_word(
1372 /*==============*/
1373 fts_optimize_t* optim, /*!< in: optimize state data */
1374 fts_word_t* word) /*!< in: the word to optimize */
1375 {
1376 fts_encode_t enc;
1377 ib_vector_t* nodes;
1378 ulint i = 0;
1379 int del_pos;
1380 fts_node_t* dst_node = NULL;
1381 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1382 ulint size = ib_vector_size(word->nodes);
1383
1384 del_pos = fts_optimize_deleted_pos(optim, word);
1385 nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1386
1387 enc.src_last_doc_id = 0;
1388 enc.src_ilist_ptr = NULL;
1389
1390 if (fts_enable_diag_print) {
1391 word->text.f_str[word->text.f_len] = 0;
1392 fprintf(stderr, "FTS_OPTIMIZE: optimize \"%s\"\n",
1393 word->text.f_str);
1394 }
1395
1396 while (i < size) {
1397 ulint copied;
1398 fts_node_t* src_node;
1399
1400 src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1401
1402 if (dst_node == NULL
1403 || dst_node->last_doc_id > src_node->first_doc_id) {
1404
1405 dst_node = static_cast<fts_node_t*>(
1406 ib_vector_push(nodes, NULL));
1407 memset(dst_node, 0, sizeof(*dst_node));
1408 }
1409
1410 /* Copy from the src to the dst node. */
1411 fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1412
1413 ut_a(enc.src_ilist_ptr != NULL);
1414
1415 /* Determine the numer of bytes copied to dst_node. */
1416 copied = enc.src_ilist_ptr - src_node->ilist;
1417
1418 /* Can't copy more than whats in the vlc array. */
1419 ut_a(copied <= src_node->ilist_size);
1420
1421 /* We are done with this node release the resources. */
1422 if (copied == src_node->ilist_size) {
1423
1424 enc.src_last_doc_id = 0;
1425 enc.src_ilist_ptr = NULL;
1426
1427 ut_free(src_node->ilist);
1428
1429 src_node->ilist = NULL;
1430 src_node->ilist_size = src_node->ilist_size_alloc = 0;
1431
1432 src_node = NULL;
1433
1434 ++i; /* Get next source node to OPTIMIZE. */
1435 }
1436
1437 if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1438
1439 dst_node = NULL;
1440 }
1441 }
1442
1443 /* All dst nodes created should have been added to the vector. */
1444 ut_a(dst_node == NULL);
1445
1446 /* Return the OPTIMIZED nodes. */
1447 return(nodes);
1448 }
1449
1450 /**********************************************************************//**
1451 Update the FTS index table. This is a delete followed by an insert.
1452 @return DB_SUCCESS or error code */
1453 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1454 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1455 fts_optimize_write_word(
1456 /*====================*/
1457 trx_t* trx, /*!< in: transaction */
1458 fts_table_t* fts_table, /*!< in: table of FTS index */
1459 fts_string_t* word, /*!< in: word data to write */
1460 ib_vector_t* nodes) /*!< in: the nodes to write */
1461 {
1462 ulint i;
1463 pars_info_t* info;
1464 que_t* graph;
1465 ulint selected;
1466 dberr_t error = DB_SUCCESS;
1467 char* table_name = fts_get_table_name(fts_table);
1468
1469 info = pars_info_create();
1470
1471 ut_ad(fts_table->charset);
1472
1473 if (fts_enable_diag_print) {
1474 fprintf(stderr, "FTS_OPTIMIZE: processed \"%s\"\n",
1475 word->f_str);
1476 }
1477
1478 pars_info_bind_varchar_literal(
1479 info, "word", word->f_str, word->f_len);
1480
1481 selected = fts_select_index(fts_table->charset,
1482 word->f_str, word->f_len);
1483
1484 fts_table->suffix = fts_get_suffix(selected);
1485
1486 graph = fts_parse_sql(
1487 fts_table,
1488 info,
1489 "BEGIN DELETE FROM \"%s\" WHERE word = :word;");
1490
1491 error = fts_eval_sql(trx, graph);
1492
1493 if (error != DB_SUCCESS) {
1494 ut_print_timestamp(stderr);
1495 fprintf(stderr, " InnoDB: Error: (%s) during optimize, "
1496 "when deleting a word from the FTS index.\n",
1497 ut_strerr(error));
1498 }
1499
1500 fts_que_graph_free(graph);
1501 graph = NULL;
1502
1503 mem_free(table_name);
1504
1505 /* Even if the operation needs to be rolled back and redone,
1506 we iterate over the nodes in order to free the ilist. */
1507 for (i = 0; i < ib_vector_size(nodes); ++i) {
1508
1509 fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1510
1511 if (error == DB_SUCCESS) {
1512 error = fts_write_node(
1513 trx, &graph, fts_table, word, node);
1514
1515 if (error != DB_SUCCESS) {
1516 ut_print_timestamp(stderr);
1517 fprintf(stderr, " InnoDB: Error: (%s) "
1518 "during optimize, while adding a "
1519 "word to the FTS index.\n",
1520 ut_strerr(error));
1521 }
1522 }
1523
1524 ut_free(node->ilist);
1525 node->ilist = NULL;
1526 node->ilist_size = node->ilist_size_alloc = 0;
1527 }
1528
1529 if (graph != NULL) {
1530 fts_que_graph_free(graph);
1531 }
1532
1533 return(error);
1534 }
1535
1536 /**********************************************************************//**
1537 Free fts_optimizer_word_t instanace.*/
1538 UNIV_INTERN
1539 void
fts_word_free(fts_word_t * word)1540 fts_word_free(
1541 /*==========*/
1542 fts_word_t* word) /*!< in: instance to free.*/
1543 {
1544 mem_heap_t* heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1545
1546 #ifdef UNIV_DEBUG
1547 memset(word, 0, sizeof(*word));
1548 #endif /* UNIV_DEBUG */
1549
1550 mem_heap_free(heap);
1551 }
1552
1553 /**********************************************************************//**
1554 Optimize the word ilist and rewrite data to the FTS index.
1555 @return status one of RESTART, EXIT, ERROR */
1556 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1557 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,ib_time_t start_time)1558 fts_optimize_compact(
1559 /*=================*/
1560 fts_optimize_t* optim, /*!< in: optimize state data */
1561 dict_index_t* index, /*!< in: current FTS being optimized */
1562 ib_time_t start_time) /*!< in: optimize start time */
1563 {
1564 ulint i;
1565 dberr_t error = DB_SUCCESS;
1566 ulint size = ib_vector_size(optim->words);
1567
1568 for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1569 fts_word_t* word;
1570 ib_vector_t* nodes;
1571 trx_t* trx = optim->trx;
1572
1573 word = (fts_word_t*) ib_vector_get(optim->words, i);
1574
1575 /* nodes is allocated from the word heap and will be destroyed
1576 when the word is freed. We however have to be careful about
1577 the ilist, that needs to be freed explicitly. */
1578 nodes = fts_optimize_word(optim, word);
1579
1580 /* Update the data on disk. */
1581 error = fts_optimize_write_word(
1582 trx, &optim->fts_index_table, &word->text, nodes);
1583
1584 if (error == DB_SUCCESS) {
1585 /* Write the last word optimized to the config table,
1586 we use this value for restarting optimize. */
1587 error = fts_config_set_index_value(
1588 optim->trx, index,
1589 FTS_LAST_OPTIMIZED_WORD, &word->text);
1590 }
1591
1592 /* Free the word that was optimized. */
1593 fts_word_free(word);
1594
1595 if (fts_optimize_time_limit > 0
1596 && (ut_time() - start_time) > fts_optimize_time_limit) {
1597
1598 optim->done = TRUE;
1599 }
1600 }
1601
1602 return(error);
1603 }
1604
1605 /**********************************************************************//**
1606 Create an instance of fts_optimize_t. Also create a new
1607 background transaction.*/
1608 static
1609 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1610 fts_optimize_create(
1611 /*================*/
1612 dict_table_t* table) /*!< in: table with FTS indexes */
1613 {
1614 fts_optimize_t* optim;
1615 mem_heap_t* heap = mem_heap_create(128);
1616
1617 optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1618
1619 optim->self_heap = ib_heap_allocator_create(heap);
1620
1621 optim->to_delete = fts_doc_ids_create();
1622
1623 optim->words = ib_vector_create(
1624 optim->self_heap, sizeof(fts_word_t), 256);
1625
1626 optim->table = table;
1627
1628 optim->trx = trx_allocate_for_background();
1629
1630 optim->fts_common_table.parent = table->name;
1631 optim->fts_common_table.table_id = table->id;
1632 optim->fts_common_table.type = FTS_COMMON_TABLE;
1633 optim->fts_common_table.table = table;
1634
1635 optim->fts_index_table.parent = table->name;
1636 optim->fts_index_table.table_id = table->id;
1637 optim->fts_index_table.type = FTS_INDEX_TABLE;
1638 optim->fts_index_table.table = table;
1639
1640 /* The common prefix for all this parent table's aux tables. */
1641 optim->name_prefix = fts_get_table_name_prefix(
1642 &optim->fts_common_table);
1643
1644 return(optim);
1645 }
1646
1647 #ifdef FTS_OPTIMIZE_DEBUG
1648 /**********************************************************************//**
1649 Get optimize start time of an FTS index.
1650 @return DB_SUCCESS if all OK else error code */
1651 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1652 dberr_t
fts_optimize_get_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t * start_time)1653 fts_optimize_get_index_start_time(
1654 /*==============================*/
1655 trx_t* trx, /*!< in: transaction */
1656 dict_index_t* index, /*!< in: FTS index */
1657 ib_time_t* start_time) /*!< out: time in secs */
1658 {
1659 return(fts_config_get_index_ulint(
1660 trx, index, FTS_OPTIMIZE_START_TIME,
1661 (ulint*) start_time));
1662 }
1663
1664 /**********************************************************************//**
1665 Set the optimize start time of an FTS index.
1666 @return DB_SUCCESS if all OK else error code */
1667 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1668 dberr_t
fts_optimize_set_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t start_time)1669 fts_optimize_set_index_start_time(
1670 /*==============================*/
1671 trx_t* trx, /*!< in: transaction */
1672 dict_index_t* index, /*!< in: FTS index */
1673 ib_time_t start_time) /*!< in: start time */
1674 {
1675 return(fts_config_set_index_ulint(
1676 trx, index, FTS_OPTIMIZE_START_TIME,
1677 (ulint) start_time));
1678 }
1679
1680 /**********************************************************************//**
1681 Get optimize end time of an FTS index.
1682 @return DB_SUCCESS if all OK else error code */
1683 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1684 dberr_t
fts_optimize_get_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t * end_time)1685 fts_optimize_get_index_end_time(
1686 /*============================*/
1687 trx_t* trx, /*!< in: transaction */
1688 dict_index_t* index, /*!< in: FTS index */
1689 ib_time_t* end_time) /*!< out: time in secs */
1690 {
1691 return(fts_config_get_index_ulint(
1692 trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1693 }
1694
1695 /**********************************************************************//**
1696 Set the optimize end time of an FTS index.
1697 @return DB_SUCCESS if all OK else error code */
1698 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1699 dberr_t
fts_optimize_set_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t end_time)1700 fts_optimize_set_index_end_time(
1701 /*============================*/
1702 trx_t* trx, /*!< in: transaction */
1703 dict_index_t* index, /*!< in: FTS index */
1704 ib_time_t end_time) /*!< in: end time */
1705 {
1706 return(fts_config_set_index_ulint(
1707 trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1708 }
1709 #endif
1710
1711 /**********************************************************************//**
1712 Free the optimize prepared statements.*/
1713 static
1714 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1715 fts_optimize_graph_free(
1716 /*====================*/
1717 fts_optimize_graph_t* graph) /*!< in/out: The graph instances
1718 to free */
1719 {
1720 if (graph->commit_graph) {
1721 que_graph_free(graph->commit_graph);
1722 graph->commit_graph = NULL;
1723 }
1724
1725 if (graph->write_nodes_graph) {
1726 que_graph_free(graph->write_nodes_graph);
1727 graph->write_nodes_graph = NULL;
1728 }
1729
1730 if (graph->delete_nodes_graph) {
1731 que_graph_free(graph->delete_nodes_graph);
1732 graph->delete_nodes_graph = NULL;
1733 }
1734
1735 if (graph->read_nodes_graph) {
1736 que_graph_free(graph->read_nodes_graph);
1737 graph->read_nodes_graph = NULL;
1738 }
1739 }
1740
1741 /**********************************************************************//**
1742 Free all optimize resources. */
1743 static
1744 void
fts_optimize_free(fts_optimize_t * optim)1745 fts_optimize_free(
1746 /*==============*/
1747 fts_optimize_t* optim) /*!< in: table with on FTS index */
1748 {
1749 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1750
1751 trx_free_for_background(optim->trx);
1752
1753 fts_doc_ids_free(optim->to_delete);
1754 fts_optimize_graph_free(&optim->graph);
1755
1756 mem_free(optim->name_prefix);
1757
1758 /* This will free the heap from which optim itself was allocated. */
1759 mem_heap_free(heap);
1760 }
1761
1762 /**********************************************************************//**
1763 Get the max time optimize should run in millisecs.
1764 @return max optimize time limit in millisecs. */
1765 static
1766 ib_time_t
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1767 fts_optimize_get_time_limit(
1768 /*========================*/
1769 trx_t* trx, /*!< in: transaction */
1770 fts_table_t* fts_table) /*!< in: aux table */
1771 {
1772 ib_time_t time_limit = 0;
1773
1774 fts_config_get_ulint(
1775 trx, fts_table,
1776 FTS_OPTIMIZE_LIMIT_IN_SECS, (ulint*) &time_limit);
1777
1778 return(time_limit * 1000);
1779 }
1780
1781
1782 /**********************************************************************//**
1783 Run OPTIMIZE on the given table. Note: this can take a very long time
1784 (hours). */
1785 static
1786 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1787 fts_optimize_words(
1788 /*===============*/
1789 fts_optimize_t* optim, /*!< in: optimize instance */
1790 dict_index_t* index, /*!< in: current FTS being optimized */
1791 fts_string_t* word) /*!< in: the starting word to optimize */
1792 {
1793 fts_fetch_t fetch;
1794 ib_time_t start_time;
1795 que_t* graph = NULL;
1796 CHARSET_INFO* charset = optim->fts_index_table.charset;
1797
1798 ut_a(!optim->done);
1799
1800 /* Get the time limit from the config table. */
1801 fts_optimize_time_limit = fts_optimize_get_time_limit(
1802 optim->trx, &optim->fts_common_table);
1803
1804 start_time = ut_time();
1805
1806 /* Setup the callback to use for fetching the word ilist etc. */
1807 fetch.read_arg = optim->words;
1808 fetch.read_record = fts_optimize_index_fetch_node;
1809
1810 fprintf(stderr, "%.*s\n", (int) word->f_len, word->f_str);
1811
1812 while(!optim->done) {
1813 dberr_t error;
1814 trx_t* trx = optim->trx;
1815 ulint selected;
1816
1817 ut_a(ib_vector_size(optim->words) == 0);
1818
1819 selected = fts_select_index(charset, word->f_str, word->f_len);
1820
1821 /* Read the index records to optimize. */
1822 fetch.total_memory = 0;
1823 error = fts_index_fetch_nodes(
1824 trx, &graph, &optim->fts_index_table, word,
1825 &fetch);
1826 ut_ad(fetch.total_memory < fts_result_cache_limit);
1827
1828 if (error == DB_SUCCESS) {
1829 /* There must be some nodes to read. */
1830 ut_a(ib_vector_size(optim->words) > 0);
1831
1832 /* Optimize the nodes that were read and write
1833 back to DB. */
1834 error = fts_optimize_compact(optim, index, start_time);
1835
1836 if (error == DB_SUCCESS) {
1837 fts_sql_commit(optim->trx);
1838 } else {
1839 fts_sql_rollback(optim->trx);
1840 }
1841 }
1842
1843 ib_vector_reset(optim->words);
1844
1845 if (error == DB_SUCCESS) {
1846 if (!optim->done) {
1847 if (!fts_zip_read_word(optim->zip, word)) {
1848 optim->done = TRUE;
1849 } else if (selected
1850 != fts_select_index(
1851 charset, word->f_str,
1852 word->f_len)
1853 && graph) {
1854 fts_que_graph_free(graph);
1855 graph = NULL;
1856 }
1857 }
1858 } else if (error == DB_LOCK_WAIT_TIMEOUT) {
1859 fprintf(stderr, "InnoDB: Warning: lock wait timeout "
1860 "during optimize. Retrying!\n");
1861
1862 trx->error_state = DB_SUCCESS;
1863 } else if (error == DB_DEADLOCK) {
1864 fprintf(stderr, "InnoDB: Warning: deadlock "
1865 "during optimize. Retrying!\n");
1866
1867 trx->error_state = DB_SUCCESS;
1868 } else {
1869 optim->done = TRUE; /* Exit the loop. */
1870 }
1871 }
1872
1873 if (graph != NULL) {
1874 fts_que_graph_free(graph);
1875 }
1876 }
1877
1878 /**********************************************************************//**
1879 Select the FTS index to search.
1880 @return TRUE if last index */
1881 static
1882 ibool
fts_optimize_set_next_word(CHARSET_INFO * charset,fts_string_t * word)1883 fts_optimize_set_next_word(
1884 /*=======================*/
1885 CHARSET_INFO* charset, /*!< in: charset */
1886 fts_string_t* word) /*!< in: current last word */
1887 {
1888 ulint selected;
1889 ibool last = FALSE;
1890
1891 selected = fts_select_next_index(charset, word->f_str, word->f_len);
1892
1893 /* If this was the last index then reset to start. */
1894 if (fts_index_selector[selected].value == 0) {
1895 /* Reset the last optimized word to '' if no
1896 more words could be read from the FTS index. */
1897 word->f_len = 0;
1898 *word->f_str = 0;
1899
1900 last = TRUE;
1901 } else {
1902 ulint value = fts_index_selector[selected].value;
1903
1904 ut_a(value <= 0xff);
1905
1906 /* Set to the first character of the next slot. */
1907 word->f_len = 1;
1908 *word->f_str = (byte) value;
1909 }
1910
1911 return(last);
1912 }
1913
1914 /**********************************************************************//**
1915 Optimize is complete. Set the completion time, and reset the optimize
1916 start string for this FTS index to "".
1917 @return DB_SUCCESS if all OK */
1918 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1919 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1920 fts_optimize_index_completed(
1921 /*=========================*/
1922 fts_optimize_t* optim, /*!< in: optimize instance */
1923 dict_index_t* index) /*!< in: table with one FTS index */
1924 {
1925 fts_string_t word;
1926 dberr_t error;
1927 byte buf[sizeof(ulint)];
1928 #ifdef FTS_OPTIMIZE_DEBUG
1929 ib_time_t end_time = ut_time();
1930
1931 error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1932 #endif
1933
1934 /* If we've reached the end of the index then set the start
1935 word to the empty string. */
1936
1937 word.f_len = 0;
1938 word.f_str = buf;
1939 *word.f_str = '\0';
1940
1941 error = fts_config_set_index_value(
1942 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1943
1944 if (error != DB_SUCCESS) {
1945
1946 fprintf(stderr, "InnoDB: Error: (%s) while "
1947 "updating last optimized word!\n", ut_strerr(error));
1948 }
1949
1950 return(error);
1951 }
1952
1953
1954 /**********************************************************************//**
1955 Read the list of words from the FTS auxiliary index that will be
1956 optimized in this pass.
1957 @return DB_SUCCESS if all OK */
1958 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1959 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1960 fts_optimize_index_read_words(
1961 /*==========================*/
1962 fts_optimize_t* optim, /*!< in: optimize instance */
1963 dict_index_t* index, /*!< in: table with one FTS index */
1964 fts_string_t* word) /*!< in: buffer to use */
1965 {
1966 dberr_t error = DB_SUCCESS;
1967
1968 if (optim->del_list_regenerated) {
1969 word->f_len = 0;
1970 } else {
1971
1972 /* Get the last word that was optimized from
1973 the config table. */
1974 error = fts_config_get_index_value(
1975 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1976 }
1977
1978 /* If record not found then we start from the top. */
1979 if (error == DB_RECORD_NOT_FOUND) {
1980 word->f_len = 0;
1981 error = DB_SUCCESS;
1982 }
1983
1984 while (error == DB_SUCCESS) {
1985
1986 error = fts_index_fetch_words(
1987 optim, word, fts_num_word_optimize);
1988
1989 if (error == DB_SUCCESS) {
1990
1991 /* If the search returned an empty set
1992 try the next index in the horizontal split. */
1993 if (optim->zip->n_words > 0) {
1994 break;
1995 } else {
1996
1997 fts_optimize_set_next_word(
1998 optim->fts_index_table.charset,
1999 word);
2000
2001 if (word->f_len == 0) {
2002 break;
2003 }
2004 }
2005 }
2006 }
2007
2008 return(error);
2009 }
2010
2011 /**********************************************************************//**
2012 Run OPTIMIZE on the given FTS index. Note: this can take a very long
2013 time (hours).
2014 @return DB_SUCCESS if all OK */
2015 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2016 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)2017 fts_optimize_index(
2018 /*===============*/
2019 fts_optimize_t* optim, /*!< in: optimize instance */
2020 dict_index_t* index) /*!< in: table with one FTS index */
2021 {
2022 fts_string_t word;
2023 dberr_t error;
2024 byte str[FTS_MAX_WORD_LEN + 1];
2025
2026 /* Set the current index that we have to optimize. */
2027 optim->fts_index_table.index_id = index->id;
2028 optim->fts_index_table.charset = fts_index_get_charset(index);
2029
2030 optim->done = FALSE; /* Optimize until !done */
2031
2032 /* We need to read the last word optimized so that we start from
2033 the next word. */
2034 word.f_str = str;
2035
2036 /* We set the length of word to the size of str since we
2037 need to pass the max len info to the fts_get_config_value() function. */
2038 word.f_len = sizeof(str) - 1;
2039
2040 memset(word.f_str, 0x0, word.f_len);
2041
2042 /* Read the words that will be optimized in this pass. */
2043 error = fts_optimize_index_read_words(optim, index, &word);
2044
2045 if (error == DB_SUCCESS) {
2046 int zip_error;
2047
2048 ut_a(optim->zip->pos == 0);
2049 ut_a(optim->zip->zp->total_in == 0);
2050 ut_a(optim->zip->zp->total_out == 0);
2051
2052 zip_error = inflateInit(optim->zip->zp);
2053 ut_a(zip_error == Z_OK);
2054
2055 word.f_len = 0;
2056 word.f_str = str;
2057
2058 /* Read the first word to optimize from the Zip buffer. */
2059 if (!fts_zip_read_word(optim->zip, &word)) {
2060
2061 optim->done = TRUE;
2062 } else {
2063 fts_optimize_words(optim, index, &word);
2064 }
2065
2066 /* If we couldn't read any records then optimize is
2067 complete. Increment the number of indexes that have
2068 been optimized and set FTS index optimize state to
2069 completed. */
2070 if (error == DB_SUCCESS && optim->zip->n_words == 0) {
2071
2072 error = fts_optimize_index_completed(optim, index);
2073
2074 if (error == DB_SUCCESS) {
2075 ++optim->n_completed;
2076 }
2077 }
2078 }
2079
2080 return(error);
2081 }
2082
2083 /**********************************************************************//**
2084 Delete the document ids in the delete, and delete cache tables.
2085 @return DB_SUCCESS if all OK */
2086 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2087 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2088 fts_optimize_purge_deleted_doc_ids(
2089 /*===============================*/
2090 fts_optimize_t* optim) /*!< in: optimize instance */
2091 {
2092 ulint i;
2093 pars_info_t* info;
2094 que_t* graph;
2095 fts_update_t* update;
2096 char* sql_str;
2097 doc_id_t write_doc_id;
2098 dberr_t error = DB_SUCCESS;
2099
2100 info = pars_info_create();
2101
2102 ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2103
2104 update = static_cast<fts_update_t*>(
2105 ib_vector_get(optim->to_delete->doc_ids, 0));
2106
2107 /* Convert to "storage" byte order. */
2108 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2109
2110 /* This is required for the SQL parser to work. It must be able
2111 to find the following variables. So we do it twice. */
2112 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2113 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2114
2115 /* Since we only replace the table_id and don't construct the full
2116 name, we do substitution ourselves. Remember to free sql_str. */
2117 sql_str = ut_strreplace(
2118 fts_delete_doc_ids_sql, "%s", optim->name_prefix);
2119
2120 graph = fts_parse_sql(NULL, info, sql_str);
2121
2122 mem_free(sql_str);
2123
2124 /* Delete the doc ids that were copied at the start. */
2125 for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2126
2127 update = static_cast<fts_update_t*>(ib_vector_get(
2128 optim->to_delete->doc_ids, i));
2129
2130 /* Convert to "storage" byte order. */
2131 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2132
2133 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2134
2135 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2136
2137 error = fts_eval_sql(optim->trx, graph);
2138
2139 // FIXME: Check whether delete actually succeeded!
2140 if (error != DB_SUCCESS) {
2141
2142 fts_sql_rollback(optim->trx);
2143 break;
2144 }
2145 }
2146
2147 fts_que_graph_free(graph);
2148
2149 return(error);
2150 }
2151
2152 /**********************************************************************//**
2153 Delete the document ids in the pending delete, and delete tables.
2154 @return DB_SUCCESS if all OK */
2155 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2156 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2157 fts_optimize_purge_deleted_doc_id_snapshot(
2158 /*=======================================*/
2159 fts_optimize_t* optim) /*!< in: optimize instance */
2160 {
2161 dberr_t error;
2162 que_t* graph;
2163 char* sql_str;
2164
2165 /* Since we only replace the table_id and don't construct
2166 the full name, we do the '%s' substitution ourselves. */
2167 sql_str = ut_strreplace(fts_end_delete_sql, "%s", optim->name_prefix);
2168
2169 /* Delete the doc ids that were copied to delete pending state at
2170 the start of optimize. */
2171 graph = fts_parse_sql(NULL, NULL, sql_str);
2172
2173 mem_free(sql_str);
2174
2175 error = fts_eval_sql(optim->trx, graph);
2176 fts_que_graph_free(graph);
2177
2178 return(error);
2179 }
2180
2181 /**********************************************************************//**
2182 Copy the deleted doc ids that will be purged during this optimize run
2183 to the being deleted FTS auxiliary tables. The transaction is committed
2184 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2185 @return DB_SUCCESS if all OK */
2186 static
2187 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2188 fts_optimize_being_deleted_count(
2189 /*=============================*/
2190 fts_optimize_t* optim) /*!< in: optimize instance */
2191 {
2192 fts_table_t fts_table;
2193
2194 FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2195 optim->table);
2196
2197 return(fts_get_rows_count(&fts_table));
2198 }
2199
2200 /*********************************************************************//**
2201 Copy the deleted doc ids that will be purged during this optimize run
2202 to the being deleted FTS auxiliary tables. The transaction is committed
2203 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2204 @return DB_SUCCESS if all OK */
2205 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2206 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2207 fts_optimize_create_deleted_doc_id_snapshot(
2208 /*========================================*/
2209 fts_optimize_t* optim) /*!< in: optimize instance */
2210 {
2211 dberr_t error;
2212 que_t* graph;
2213 char* sql_str;
2214
2215 /* Since we only replace the table_id and don't construct the
2216 full name, we do the substitution ourselves. */
2217 sql_str = ut_strreplace(fts_init_delete_sql, "%s", optim->name_prefix);
2218
2219 /* Move doc_ids that are to be deleted to state being deleted. */
2220 graph = fts_parse_sql(NULL, NULL, sql_str);
2221
2222 mem_free(sql_str);
2223
2224 error = fts_eval_sql(optim->trx, graph);
2225
2226 fts_que_graph_free(graph);
2227
2228 if (error != DB_SUCCESS) {
2229 fts_sql_rollback(optim->trx);
2230 } else {
2231 fts_sql_commit(optim->trx);
2232 }
2233
2234 optim->del_list_regenerated = TRUE;
2235
2236 return(error);
2237 }
2238
2239 /*********************************************************************//**
2240 Read in the document ids that are to be purged during optimize. The
2241 transaction is committed upon successfully read.
2242 @return DB_SUCCESS if all OK */
2243 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2244 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2245 fts_optimize_read_deleted_doc_id_snapshot(
2246 /*======================================*/
2247 fts_optimize_t* optim) /*!< in: optimize instance */
2248 {
2249 dberr_t error;
2250
2251 optim->fts_common_table.suffix = "BEING_DELETED";
2252
2253 /* Read the doc_ids to delete. */
2254 error = fts_table_fetch_doc_ids(
2255 optim->trx, &optim->fts_common_table, optim->to_delete);
2256
2257 if (error == DB_SUCCESS) {
2258
2259 optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2260
2261 /* Read additional doc_ids to delete. */
2262 error = fts_table_fetch_doc_ids(
2263 optim->trx, &optim->fts_common_table, optim->to_delete);
2264 }
2265
2266 if (error != DB_SUCCESS) {
2267
2268 fts_doc_ids_free(optim->to_delete);
2269 optim->to_delete = NULL;
2270 }
2271
2272 return(error);
2273 }
2274
2275 /*********************************************************************//**
2276 Optimze all the FTS indexes, skipping those that have already been
2277 optimized, since the FTS auxiliary indexes are not guaranteed to be
2278 of the same cardinality.
2279 @return DB_SUCCESS if all OK */
2280 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2281 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2282 fts_optimize_indexes(
2283 /*=================*/
2284 fts_optimize_t* optim) /*!< in: optimize instance */
2285 {
2286 ulint i;
2287 dberr_t error = DB_SUCCESS;
2288 fts_t* fts = optim->table->fts;
2289
2290 /* Optimize the FTS indexes. */
2291 for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2292 dict_index_t* index;
2293
2294 #ifdef FTS_OPTIMIZE_DEBUG
2295 ib_time_t end_time;
2296 ib_time_t start_time;
2297
2298 /* Get the start and end optimize times for this index. */
2299 error = fts_optimize_get_index_start_time(
2300 optim->trx, index, &start_time);
2301
2302 if (error != DB_SUCCESS) {
2303 break;
2304 }
2305
2306 error = fts_optimize_get_index_end_time(
2307 optim->trx, index, &end_time);
2308
2309 if (error != DB_SUCCESS) {
2310 break;
2311 }
2312
2313 /* Start time will be 0 only for the first time or after
2314 completing the optimization of all FTS indexes. */
2315 if (start_time == 0) {
2316 start_time = ut_time();
2317
2318 error = fts_optimize_set_index_start_time(
2319 optim->trx, index, start_time);
2320 }
2321
2322 /* Check if this index needs to be optimized or not. */
2323 if (ut_difftime(end_time, start_time) < 0) {
2324 error = fts_optimize_index(optim, index);
2325
2326 if (error != DB_SUCCESS) {
2327 break;
2328 }
2329 } else {
2330 ++optim->n_completed;
2331 }
2332 #endif
2333 index = static_cast<dict_index_t*>(
2334 ib_vector_getp(fts->indexes, i));
2335 error = fts_optimize_index(optim, index);
2336 }
2337
2338 if (error == DB_SUCCESS) {
2339 fts_sql_commit(optim->trx);
2340 } else {
2341 fts_sql_rollback(optim->trx);
2342 }
2343
2344 return(error);
2345 }
2346
2347 /*********************************************************************//**
2348 Cleanup the snapshot tables and the master deleted table.
2349 @return DB_SUCCESS if all OK */
2350 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2351 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2352 fts_optimize_purge_snapshot(
2353 /*========================*/
2354 fts_optimize_t* optim) /*!< in: optimize instance */
2355 {
2356 dberr_t error;
2357
2358 /* Delete the doc ids from the master deleted tables, that were
2359 in the snapshot that was taken at the start of optimize. */
2360 error = fts_optimize_purge_deleted_doc_ids(optim);
2361
2362 if (error == DB_SUCCESS) {
2363 /* Destroy the deleted doc id snapshot. */
2364 error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2365 }
2366
2367 if (error == DB_SUCCESS) {
2368 fts_sql_commit(optim->trx);
2369 } else {
2370 fts_sql_rollback(optim->trx);
2371 }
2372
2373 return(error);
2374 }
2375
2376 /*********************************************************************//**
2377 Reset the start time to 0 so that a new optimize can be started.
2378 @return DB_SUCCESS if all OK */
2379 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2380 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2381 fts_optimize_reset_start_time(
2382 /*==========================*/
2383 fts_optimize_t* optim) /*!< in: optimize instance */
2384 {
2385 dberr_t error = DB_SUCCESS;
2386 #ifdef FTS_OPTIMIZE_DEBUG
2387 fts_t* fts = optim->table->fts;
2388
2389 /* Optimization should have been completed for all indexes. */
2390 ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2391
2392 for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2393 dict_index_t* index;
2394
2395 ib_time_t start_time = 0;
2396
2397 /* Reset the start time to 0 for this index. */
2398 error = fts_optimize_set_index_start_time(
2399 optim->trx, index, start_time);
2400
2401 index = static_cast<dict_index_t*>(
2402 ib_vector_getp(fts->indexes, i));
2403 }
2404 #endif
2405
2406 if (error == DB_SUCCESS) {
2407 fts_sql_commit(optim->trx);
2408 } else {
2409 fts_sql_rollback(optim->trx);
2410 }
2411
2412 return(error);
2413 }
2414
2415 /*********************************************************************//**
2416 Run OPTIMIZE on the given table by a background thread.
2417 @return DB_SUCCESS if all OK */
2418 static MY_ATTRIBUTE((nonnull))
2419 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2420 fts_optimize_table_bk(
2421 /*==================*/
2422 fts_slot_t* slot) /*!< in: table to optimiza */
2423 {
2424 dberr_t error;
2425 dict_table_t* table = slot->table;
2426 fts_t* fts = table->fts;
2427
2428 /* Avoid optimizing tables that were optimized recently. */
2429 if (slot->last_run > 0
2430 && (ut_time() - slot->last_run) < slot->interval_time) {
2431
2432 return(DB_SUCCESS);
2433
2434 } else if (fts && fts->cache
2435 && fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2436
2437 error = fts_optimize_table(table);
2438
2439 if (error == DB_SUCCESS) {
2440 slot->state = FTS_STATE_DONE;
2441 slot->last_run = 0;
2442 slot->completed = ut_time();
2443 }
2444 } else {
2445 error = DB_SUCCESS;
2446 }
2447
2448 /* Note time this run completed. */
2449 slot->last_run = ut_time();
2450
2451 return(error);
2452 }
2453 /*********************************************************************//**
2454 Run OPTIMIZE on the given table.
2455 @return DB_SUCCESS if all OK */
2456 UNIV_INTERN
2457 dberr_t
fts_optimize_table(dict_table_t * table)2458 fts_optimize_table(
2459 /*===============*/
2460 dict_table_t* table) /*!< in: table to optimiza */
2461 {
2462 dberr_t error = DB_SUCCESS;
2463 fts_optimize_t* optim = NULL;
2464 fts_t* fts = table->fts;
2465
2466 ut_print_timestamp(stderr);
2467 fprintf(stderr, " InnoDB: FTS start optimize %s\n", table->name);
2468
2469 optim = fts_optimize_create(table);
2470
2471 // FIXME: Call this only at the start of optimize, currently we
2472 // rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2473
2474 /* Check whether there are still records in BEING_DELETED table */
2475 if (fts_optimize_being_deleted_count(optim) == 0) {
2476 /* Take a snapshot of the deleted document ids, they are copied
2477 to the BEING_ tables. */
2478 error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2479 }
2480
2481 /* A duplicate error is OK, since we don't erase the
2482 doc ids from the being deleted state until all FTS
2483 indexes have been optimized. */
2484 if (error == DB_DUPLICATE_KEY) {
2485 error = DB_SUCCESS;
2486 }
2487
2488 if (error == DB_SUCCESS) {
2489
2490 /* These document ids will be filtered out during the
2491 index optimization phase. They are in the snapshot that we
2492 took above, at the start of the optimize. */
2493 error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2494
2495 if (error == DB_SUCCESS) {
2496
2497 /* Commit the read of being deleted
2498 doc ids transaction. */
2499 fts_sql_commit(optim->trx);
2500
2501 /* We would do optimization only if there
2502 are deleted records to be cleaned up */
2503 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2504 error = fts_optimize_indexes(optim);
2505 }
2506
2507 } else {
2508 ut_a(optim->to_delete == NULL);
2509 }
2510
2511 /* Only after all indexes have been optimized can we
2512 delete the (snapshot) doc ids in the pending delete,
2513 and master deleted tables. */
2514 if (error == DB_SUCCESS
2515 && optim->n_completed == ib_vector_size(fts->indexes)) {
2516
2517 if (fts_enable_diag_print) {
2518 fprintf(stderr, "FTS_OPTIMIZE: Completed "
2519 "Optimize, cleanup DELETED "
2520 "table\n");
2521 }
2522
2523 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2524
2525 /* Purge the doc ids that were in the
2526 snapshot from the snapshot tables and
2527 the master deleted table. */
2528 error = fts_optimize_purge_snapshot(optim);
2529 }
2530
2531 if (error == DB_SUCCESS) {
2532 /* Reset the start time of all the FTS indexes
2533 so that optimize can be restarted. */
2534 error = fts_optimize_reset_start_time(optim);
2535 }
2536 }
2537 }
2538
2539 fts_optimize_free(optim);
2540
2541 ut_print_timestamp(stderr);
2542 fprintf(stderr, " InnoDB: FTS end optimize %s\n", table->name);
2543
2544 return(error);
2545 }
2546
2547 /********************************************************************//**
2548 Add the table to add to the OPTIMIZER's list.
2549 @return new message instance */
2550 static
2551 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2552 fts_optimize_create_msg(
2553 /*====================*/
2554 fts_msg_type_t type, /*!< in: type of message */
2555 void* ptr) /*!< in: message payload */
2556 {
2557 mem_heap_t* heap;
2558 fts_msg_t* msg;
2559
2560 heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2561 msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2562
2563 msg->ptr = ptr;
2564 msg->type = type;
2565 msg->heap = heap;
2566
2567 return(msg);
2568 }
2569
2570 /**********************************************************************//**
2571 Add the table to add to the OPTIMIZER's list. */
2572 UNIV_INTERN
2573 void
fts_optimize_add_table(dict_table_t * table)2574 fts_optimize_add_table(
2575 /*===================*/
2576 dict_table_t* table) /*!< in: table to add */
2577 {
2578 fts_msg_t* msg;
2579
2580 if (!fts_optimize_wq) {
2581 return;
2582 }
2583
2584 /* Make sure table with FTS index cannot be evicted */
2585 if (table->can_be_evicted) {
2586 dict_table_move_from_lru_to_non_lru(table);
2587 }
2588
2589 msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2590
2591 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2592 }
2593
2594 /**********************************************************************//**
2595 Optimize a table. */
2596 UNIV_INTERN
2597 void
fts_optimize_do_table(dict_table_t * table)2598 fts_optimize_do_table(
2599 /*==================*/
2600 dict_table_t* table) /*!< in: table to optimize */
2601 {
2602 fts_msg_t* msg;
2603
2604 /* Optimizer thread could be shutdown */
2605 if (!fts_optimize_wq) {
2606 return;
2607 }
2608
2609 msg = fts_optimize_create_msg(FTS_MSG_OPTIMIZE_TABLE, table);
2610
2611 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2612 }
2613
2614 /**********************************************************************//**
2615 Remove the table from the OPTIMIZER's list. We do wait for
2616 acknowledgement from the consumer of the message. */
2617 UNIV_INTERN
2618 void
fts_optimize_remove_table(dict_table_t * table)2619 fts_optimize_remove_table(
2620 /*======================*/
2621 dict_table_t* table) /*!< in: table to remove */
2622 {
2623 fts_msg_t* msg;
2624 os_event_t event;
2625 fts_msg_del_t* remove;
2626
2627 /* if the optimize system not yet initialized, return */
2628 if (!fts_optimize_wq) {
2629 return;
2630 }
2631
2632 /* FTS optimizer thread is already exited */
2633 if (fts_opt_start_shutdown) {
2634 ib_logf(IB_LOG_LEVEL_INFO,
2635 "Try to remove table %s after FTS optimize"
2636 " thread exiting.", table->name);
2637 return;
2638 }
2639
2640 msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2641
2642 /* We will wait on this event until signalled by the consumer. */
2643 event = os_event_create();
2644
2645 remove = static_cast<fts_msg_del_t*>(
2646 mem_heap_alloc(msg->heap, sizeof(*remove)));
2647
2648 remove->table = table;
2649 remove->event = event;
2650 msg->ptr = remove;
2651
2652 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2653
2654 os_event_wait(event);
2655
2656 os_event_free(event);
2657 }
2658
2659 /** Send sync fts cache for the table.
2660 @param[in] table table to sync */
2661 UNIV_INTERN
2662 void
fts_optimize_request_sync_table(dict_table_t * table)2663 fts_optimize_request_sync_table(
2664 dict_table_t* table)
2665 {
2666 fts_msg_t* msg;
2667 table_id_t* table_id;
2668
2669 /* if the optimize system not yet initialized, return */
2670 if (!fts_optimize_wq) {
2671 return;
2672 }
2673
2674 /* FTS optimizer thread is already exited */
2675 if (fts_opt_start_shutdown) {
2676 ib_logf(IB_LOG_LEVEL_INFO,
2677 "Try to sync table %s after FTS optimize"
2678 " thread exiting.", table->name);
2679 return;
2680 }
2681
2682 msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
2683
2684 table_id = static_cast<table_id_t*>(
2685 mem_heap_alloc(msg->heap, sizeof(table_id_t)));
2686 *table_id = table->id;
2687 msg->ptr = table_id;
2688
2689 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2690 }
2691
2692 /**********************************************************************//**
2693 Find the slot for a particular table.
2694 @return slot if found else NULL. */
2695 static
2696 fts_slot_t*
fts_optimize_find_slot(ib_vector_t * tables,const dict_table_t * table)2697 fts_optimize_find_slot(
2698 /*===================*/
2699 ib_vector_t* tables, /*!< in: vector of tables */
2700 const dict_table_t* table) /*!< in: table to add */
2701 {
2702 ulint i;
2703
2704 for (i = 0; i < ib_vector_size(tables); ++i) {
2705 fts_slot_t* slot;
2706
2707 slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2708
2709 if (slot->table->id == table->id) {
2710 return(slot);
2711 }
2712 }
2713
2714 return(NULL);
2715 }
2716
2717 /**********************************************************************//**
2718 Start optimizing table. */
2719 static
2720 void
fts_optimize_start_table(ib_vector_t * tables,dict_table_t * table)2721 fts_optimize_start_table(
2722 /*=====================*/
2723 ib_vector_t* tables, /*!< in/out: vector of tables */
2724 dict_table_t* table) /*!< in: table to optimize */
2725 {
2726 fts_slot_t* slot;
2727
2728 slot = fts_optimize_find_slot(tables, table);
2729
2730 if (slot == NULL) {
2731 ut_print_timestamp(stderr);
2732 fprintf(stderr, " InnoDB: Error: table %s not registered "
2733 "with the optimize thread.\n", table->name);
2734 } else {
2735 slot->last_run = 0;
2736 slot->completed = 0;
2737 }
2738 }
2739
2740 /**********************************************************************//**
2741 Add the table to the vector if it doesn't already exist. */
2742 static
2743 ibool
fts_optimize_new_table(ib_vector_t * tables,dict_table_t * table)2744 fts_optimize_new_table(
2745 /*===================*/
2746 ib_vector_t* tables, /*!< in/out: vector of tables */
2747 dict_table_t* table) /*!< in: table to add */
2748 {
2749 ulint i;
2750 fts_slot_t* slot;
2751 ulint empty_slot = ULINT_UNDEFINED;
2752
2753 /* Search for duplicates, also find a free slot if one exists. */
2754 for (i = 0; i < ib_vector_size(tables); ++i) {
2755
2756 slot = static_cast<fts_slot_t*>(
2757 ib_vector_get(tables, i));
2758
2759 if (slot->state == FTS_STATE_EMPTY) {
2760 empty_slot = i;
2761 } else if (slot->table->id == table->id) {
2762 /* Already exists in our optimize queue. */
2763 ut_ad(slot->table_id = table->id);
2764 return(FALSE);
2765 }
2766 }
2767
2768 /* Reuse old slot. */
2769 if (empty_slot != ULINT_UNDEFINED) {
2770
2771 slot = static_cast<fts_slot_t*>(
2772 ib_vector_get(tables, empty_slot));
2773
2774 ut_a(slot->state == FTS_STATE_EMPTY);
2775
2776 } else { /* Create a new slot. */
2777
2778 slot = static_cast<fts_slot_t*>(ib_vector_push(tables, NULL));
2779 }
2780
2781 memset(slot, 0x0, sizeof(*slot));
2782
2783 slot->table = table;
2784 slot->table_id = table->id;
2785 slot->state = FTS_STATE_LOADED;
2786 slot->interval_time = FTS_OPTIMIZE_INTERVAL_IN_SECS;
2787
2788 return(TRUE);
2789 }
2790
2791 /**********************************************************************//**
2792 Remove the table from the vector if it exists. */
2793 static
2794 ibool
fts_optimize_del_table(ib_vector_t * tables,fts_msg_del_t * msg)2795 fts_optimize_del_table(
2796 /*===================*/
2797 ib_vector_t* tables, /*!< in/out: vector of tables */
2798 fts_msg_del_t* msg) /*!< in: table to delete */
2799 {
2800 ulint i;
2801 dict_table_t* table = msg->table;
2802
2803 for (i = 0; i < ib_vector_size(tables); ++i) {
2804 fts_slot_t* slot;
2805
2806 slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2807
2808 /* FIXME: Should we assert on this ? */
2809 if (slot->state != FTS_STATE_EMPTY
2810 && slot->table->id == table->id) {
2811
2812 ut_print_timestamp(stderr);
2813 fprintf(stderr, " InnoDB: FTS Optimize Removing "
2814 "table %s\n", table->name);
2815
2816 slot->table = NULL;
2817 slot->state = FTS_STATE_EMPTY;
2818
2819 return(TRUE);
2820 }
2821 }
2822
2823 return(FALSE);
2824 }
2825
2826 /**********************************************************************//**
2827 Calculate how many of the registered tables need to be optimized.
2828 @return no. of tables to optimize */
2829 static
2830 ulint
fts_optimize_how_many(const ib_vector_t * tables)2831 fts_optimize_how_many(
2832 /*==================*/
2833 const ib_vector_t* tables) /*!< in: registered tables
2834 vector*/
2835 {
2836 ulint i;
2837 ib_time_t delta;
2838 ulint n_tables = 0;
2839 ib_time_t current_time;
2840
2841 current_time = ut_time();
2842
2843 for (i = 0; i < ib_vector_size(tables); ++i) {
2844 const fts_slot_t* slot;
2845
2846 slot = static_cast<const fts_slot_t*>(
2847 ib_vector_get_const(tables, i));
2848
2849 switch (slot->state) {
2850 case FTS_STATE_DONE:
2851 case FTS_STATE_LOADED:
2852 ut_a(slot->completed <= current_time);
2853
2854 delta = current_time - slot->completed;
2855
2856 /* Skip slots that have been optimized recently. */
2857 if (delta >= slot->interval_time) {
2858 ++n_tables;
2859 }
2860 break;
2861
2862 case FTS_STATE_RUNNING:
2863 ut_a(slot->last_run <= current_time);
2864
2865 delta = current_time - slot->last_run;
2866
2867 if (delta > slot->interval_time) {
2868 ++n_tables;
2869 }
2870 break;
2871
2872 /* Slots in a state other than the above
2873 are ignored. */
2874 case FTS_STATE_EMPTY:
2875 case FTS_STATE_SUSPENDED:
2876 break;
2877 }
2878
2879 }
2880
2881 return(n_tables);
2882 }
2883
2884 /**********************************************************************//**
2885 Check if the total memory used by all FTS table exceeds the maximum limit.
2886 @return true if a sync is needed, false otherwise */
2887 static
2888 bool
fts_is_sync_needed(const ib_vector_t * tables)2889 fts_is_sync_needed(
2890 /*===============*/
2891 const ib_vector_t* tables) /*!< in: registered tables
2892 vector*/
2893 {
2894 ulint total_memory = 0;
2895 double time_diff = difftime(ut_time(), last_check_sync_time);
2896
2897 if (fts_need_sync || time_diff < 5) {
2898 return(false);
2899 }
2900
2901 last_check_sync_time = ut_time();
2902
2903 for (ulint i = 0; i < ib_vector_size(tables); ++i) {
2904 const fts_slot_t* slot;
2905
2906 slot = static_cast<const fts_slot_t*>(
2907 ib_vector_get_const(tables, i));
2908
2909 if (slot->state != FTS_STATE_EMPTY && slot->table
2910 && slot->table->fts) {
2911 total_memory += slot->table->fts->cache->total_size;
2912 }
2913
2914 if (total_memory > fts_max_total_cache_size) {
2915 return(true);
2916 }
2917 }
2918
2919 return(false);
2920 }
2921
2922 #if 0
2923 /*********************************************************************//**
2924 Check whether a table needs to be optimized. */
2925 static
2926 void
2927 fts_optimize_need_sync(
2928 /*===================*/
2929 ib_vector_t* tables) /*!< in: list of tables */
2930 {
2931 dict_table_t* table = NULL;
2932 fts_slot_t* slot;
2933 ulint num_table = ib_vector_size(tables);
2934
2935 if (!num_table) {
2936 return;
2937 }
2938
2939 if (fts_optimize_sync_iterator >= num_table) {
2940 fts_optimize_sync_iterator = 0;
2941 }
2942
2943 slot = ib_vector_get(tables, fts_optimize_sync_iterator);
2944 table = slot->table;
2945
2946 if (!table) {
2947 return;
2948 }
2949
2950 ut_ad(table->fts);
2951
2952 if (table->fts->cache) {
2953 ulint deleted = table->fts->cache->deleted;
2954
2955 if (table->fts->cache->added
2956 >= fts_optimize_add_threshold) {
2957 fts_sync_table(table);
2958 } else if (deleted >= fts_optimize_delete_threshold) {
2959 fts_optimize_do_table(table);
2960
2961 mutex_enter(&table->fts->cache->deleted_lock);
2962 table->fts->cache->deleted -= deleted;
2963 mutex_exit(&table->fts->cache->deleted_lock);
2964 }
2965 }
2966
2967 fts_optimize_sync_iterator++;
2968
2969 return;
2970 }
2971 #endif
2972
2973 /** Sync fts cache of a table
2974 @param[in] table_id table id */
2975 void
fts_optimize_sync_table(table_id_t table_id)2976 fts_optimize_sync_table(
2977 table_id_t table_id)
2978 {
2979 dict_table_t* table = NULL;
2980
2981 table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
2982
2983 if (table) {
2984 if (dict_table_has_fts_index(table) && table->fts->cache) {
2985 fts_sync_table(table, true, false, true);
2986 }
2987
2988 dict_table_close(table, FALSE, FALSE);
2989 }
2990 }
2991
2992 /**********************************************************************//**
2993 Optimize all FTS tables.
2994 @return Dummy return */
2995 UNIV_INTERN
2996 os_thread_ret_t
fts_optimize_thread(void * arg)2997 fts_optimize_thread(
2998 /*================*/
2999 void* arg) /*!< in: work queue*/
3000 {
3001 mem_heap_t* heap;
3002 ib_vector_t* tables;
3003 ib_alloc_t* heap_alloc;
3004 ulint current = 0;
3005 ibool done = FALSE;
3006 ulint n_tables = 0;
3007 os_event_t exit_event = 0;
3008 ulint n_optimize = 0;
3009 ib_wqueue_t* wq = (ib_wqueue_t*) arg;
3010
3011 ut_ad(!srv_read_only_mode);
3012 my_thread_init();
3013
3014 heap = mem_heap_create(sizeof(dict_table_t*) * 64);
3015 heap_alloc = ib_heap_allocator_create(heap);
3016
3017 tables = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
3018
3019 while(!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
3020
3021 /* If there is no message in the queue and we have tables
3022 to optimize then optimize the tables. */
3023
3024 if (!done
3025 && ib_wqueue_is_empty(wq)
3026 && n_tables > 0
3027 && n_optimize > 0) {
3028
3029 fts_slot_t* slot;
3030
3031 ut_a(ib_vector_size(tables) > 0);
3032
3033 slot = static_cast<fts_slot_t*>(
3034 ib_vector_get(tables, current));
3035
3036 /* Handle the case of empty slots. */
3037 if (slot->state != FTS_STATE_EMPTY) {
3038
3039 slot->state = FTS_STATE_RUNNING;
3040
3041 fts_optimize_table_bk(slot);
3042 }
3043
3044 ++current;
3045
3046 /* Wrap around the counter. */
3047 if (current >= ib_vector_size(tables)) {
3048 n_optimize = fts_optimize_how_many(tables);
3049
3050 current = 0;
3051 }
3052
3053 } else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
3054 fts_msg_t* msg;
3055
3056 msg = static_cast<fts_msg_t*>(
3057 ib_wqueue_timedwait(wq,
3058 FTS_QUEUE_WAIT_IN_USECS));
3059
3060 /* Timeout ? */
3061 if (msg == NULL) {
3062 if (fts_is_sync_needed(tables)) {
3063 fts_need_sync = true;
3064 }
3065
3066 continue;
3067 }
3068
3069 switch (msg->type) {
3070 case FTS_MSG_START:
3071 break;
3072
3073 case FTS_MSG_PAUSE:
3074 break;
3075
3076 case FTS_MSG_STOP:
3077 done = TRUE;
3078 exit_event = (os_event_t) msg->ptr;
3079 break;
3080
3081 case FTS_MSG_ADD_TABLE:
3082 ut_a(!done);
3083 if (fts_optimize_new_table(
3084 tables,
3085 static_cast<dict_table_t*>(
3086 msg->ptr))) {
3087 ++n_tables;
3088 }
3089 break;
3090
3091 case FTS_MSG_OPTIMIZE_TABLE:
3092 if (!done) {
3093 fts_optimize_start_table(
3094 tables,
3095 static_cast<dict_table_t*>(
3096 msg->ptr));
3097 }
3098 break;
3099
3100 case FTS_MSG_DEL_TABLE:
3101 if (fts_optimize_del_table(
3102 tables, static_cast<fts_msg_del_t*>(
3103 msg->ptr))) {
3104 --n_tables;
3105 }
3106
3107 /* Signal the producer that we have
3108 removed the table. */
3109 os_event_set(
3110 ((fts_msg_del_t*) msg->ptr)->event);
3111 break;
3112
3113 case FTS_MSG_SYNC_TABLE:
3114 fts_optimize_sync_table(
3115 *static_cast<table_id_t*>(msg->ptr));
3116 break;
3117
3118 default:
3119 ut_error;
3120 }
3121
3122 mem_heap_free(msg->heap);
3123
3124 if (!done) {
3125 n_optimize = fts_optimize_how_many(tables);
3126 } else {
3127 n_optimize = 0;
3128 }
3129 }
3130 }
3131
3132 /* Server is being shutdown, sync the data from FTS cache to disk
3133 if needed */
3134 if (n_tables > 0) {
3135 ulint i;
3136
3137 for (i = 0; i < ib_vector_size(tables); i++) {
3138 fts_slot_t* slot;
3139
3140 slot = static_cast<fts_slot_t*>(
3141 ib_vector_get(tables, i));
3142
3143 if (slot->state != FTS_STATE_EMPTY) {
3144 fts_optimize_sync_table(slot->table_id);
3145 }
3146 }
3147 }
3148
3149 ib_vector_free(tables);
3150
3151 ib_logf(IB_LOG_LEVEL_INFO, "FTS optimize thread exiting.");
3152
3153 os_event_set(exit_event);
3154 my_thread_end();
3155
3156 /* We count the number of threads in os_thread_exit(). A created
3157 thread should always use that to exit and not use return() to exit. */
3158 os_thread_exit(NULL);
3159
3160 OS_THREAD_DUMMY_RETURN;
3161 }
3162
3163 /**********************************************************************//**
3164 Startup the optimize thread and create the work queue. */
3165 UNIV_INTERN
3166 void
fts_optimize_init(void)3167 fts_optimize_init(void)
3168 /*===================*/
3169 {
3170 ut_ad(!srv_read_only_mode);
3171
3172 /* For now we only support one optimize thread. */
3173 ut_a(fts_optimize_wq == NULL);
3174
3175 fts_optimize_wq = ib_wqueue_create();
3176 ut_a(fts_optimize_wq != NULL);
3177 last_check_sync_time = ut_time();
3178
3179 os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
3180 }
3181
3182 /**********************************************************************//**
3183 Check whether the work queue is initialized.
3184 @return TRUE if optimze queue is initialized. */
3185 UNIV_INTERN
3186 ibool
fts_optimize_is_init(void)3187 fts_optimize_is_init(void)
3188 /*======================*/
3189 {
3190 return(fts_optimize_wq != NULL);
3191 }
3192
3193 /**********************************************************************//**
3194 Signal the optimize thread to prepare for shutdown. */
3195 UNIV_INTERN
3196 void
fts_optimize_start_shutdown(void)3197 fts_optimize_start_shutdown(void)
3198 /*=============================*/
3199 {
3200 ut_ad(!srv_read_only_mode);
3201
3202 fts_msg_t* msg;
3203 os_event_t event;
3204
3205 /* If there is an ongoing activity on dictionary, such as
3206 srv_master_evict_from_table_cache(), wait for it */
3207 dict_mutex_enter_for_mysql();
3208
3209 /* Tells FTS optimizer system that we are exiting from
3210 optimizer thread, message send their after will not be
3211 processed */
3212 fts_opt_start_shutdown = true;
3213 dict_mutex_exit_for_mysql();
3214
3215 /* We tell the OPTIMIZE thread to switch to state done, we
3216 can't delete the work queue here because the add thread needs
3217 deregister the FTS tables. */
3218 event = os_event_create();
3219
3220 msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
3221 msg->ptr = event;
3222
3223 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
3224
3225 os_event_wait(event);
3226 os_event_free(event);
3227
3228 ib_wqueue_free(fts_optimize_wq);
3229
3230 }
3231
3232 /**********************************************************************//**
3233 Reset the work queue. */
3234 UNIV_INTERN
3235 void
fts_optimize_end(void)3236 fts_optimize_end(void)
3237 /*==================*/
3238 {
3239 ut_ad(!srv_read_only_mode);
3240
3241 // FIXME: Potential race condition here: We should wait for
3242 // the optimize thread to confirm shutdown.
3243 fts_optimize_wq = NULL;
3244 }
3245