1 /*****************************************************************************
2
3 Copyright (c) 2007, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2021, MariaDB Corporation.
5
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17
18 *****************************************************************************/
19
20 /******************************************************************//**
21 @file fts/fts0opt.cc
22 Full Text Search optimize thread
23
24 Created 2007/03/27 Sunny Bains
25 Completed 2011/7/10 Sunny and Jimmy Yang
26
27 ***********************************************************************/
28
29 #include "fts0fts.h"
30 #include "row0sel.h"
31 #include "que0types.h"
32 #include "fts0priv.h"
33 #include "fts0types.h"
34 #include "ut0wqueue.h"
35 #include "srv0start.h"
36 #include "ut0list.h"
37 #include "zlib.h"
38 #include "fts0opt.h"
39 #include "fts0vlc.h"
40
/** The FTS optimize thread's work queue. */
ib_wqueue_t* fts_optimize_wq;

/** Vector that stores the fts_slot_t instances. */
static ib_vector_t* fts_slots;

/** Time to wait for a message, in microseconds. */
static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;

/** Default optimize interval in seconds. */
static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;

/** Set when the server is shutting down, in which case the optimize
thread is exiting as well. */
static bool fts_opt_start_shutdown = false;

/** Event to wait for shutdown of the optimize thread */
static os_event_t fts_opt_shutdown_event = NULL;

/** Initial size of nodes in fts_word_t. */
static const ulint FTS_WORD_NODES_INIT_SIZE = 64;

/** Last time we checked whether the system needs a sync */
static time_t last_check_sync_time;
64
/** FTS optimize thread message types. */
enum fts_msg_type_t {
	FTS_MSG_STOP,			/*!< Stop optimizing and exit thread */

	FTS_MSG_ADD_TABLE,		/*!< Add table to the optimize thread's
					work queue */

	FTS_MSG_DEL_TABLE,		/*!< Remove a table from the optimize
					thread's work queue */
	FTS_MSG_SYNC_TABLE		/*!< Sync fts cache of a table */
};
76
/** Compressed list of words that have been read from FTS INDEX
that needs to be optimized. */
struct fts_zip_t {
	lint		status;		/*!< Status of (un)/zip operation */

	ulint		n_words;	/*!< Number of words compressed */

	ulint		block_sz;	/*!< Size of a block in bytes */

	ib_vector_t*	blocks;		/*!< Vector of compressed blocks */

	ib_alloc_t*	heap_alloc;	/*!< Heap to use for allocations */

	ulint		pos;		/*!< Offset into blocks */

	ulint		last_big_block;	/*!< Offset of last block in the
					blocks array that is of size
					block_sz. Blocks beyond this offset
					are of size FTS_MAX_WORD_LEN */

	z_streamp	zp;		/*!< ZLib state */

	fts_string_t	word;		/*!< UTF-8 string: the value of the
					last word read from the FTS INDEX
					table. This is used to discard
					duplicates */

	ulint		max_words;	/*!< maximum number of words to read
					in one pass */
};
108
/** Prepared statements used during optimize */
struct fts_optimize_graph_t {
	/** Delete a word from FTS INDEX */
	que_t*		delete_nodes_graph;
	/** Insert a word into FTS INDEX */
	que_t*		write_nodes_graph;
	/** COMMIT a transaction */
	que_t*		commit_graph;
	/** Read the nodes from FTS_INDEX */
	que_t*		read_nodes_graph;
};
120
/** Used by fts_optimize() to store state. */
struct fts_optimize_t {
	trx_t*		trx;		/*!< The transaction used for all SQL */

	ib_alloc_t*	self_heap;	/*!< Heap to use for allocations */

	char*		name_prefix;	/*!< FTS table name prefix */

	fts_table_t	fts_index_table;/*!< Table definition used for the
					FTS index auxiliary tables; its suffix
					is switched per auxiliary table */

	/** Common table definition */
	fts_table_t	fts_common_table;

	dict_table_t*	table;		/*!< Table that has to be queried */

	dict_index_t*	index;		/*!< The FTS index to be optimized */

	fts_doc_ids_t*	to_delete;	/*!< doc ids to delete, we check against
					this vector and purge the matching
					entries during the optimizing
					process. The vector entries are
					sorted on doc id */

	ulint		del_pos;	/*!< Offset within to_delete vector,
					this is used to keep track of where
					we are up to in the vector */

	ibool		done;		/*!< TRUE when optimize finishes */

	ib_vector_t*	words;		/*!< Word + Nodes read from FTS_INDEX,
					it contains instances of fts_word_t */

	fts_zip_t*	zip;		/*!< Words read from the FTS_INDEX */

	fts_optimize_graph_t		/*!< Prepared statements used during */
			graph;		/* optimize */

	ulint		n_completed;	/*!< Number of FTS indexes that have
					been optimized */
	ibool		del_list_regenerated;
					/*!< BEING_DELETED list regenerated */
};
163
/** Used by the optimize, to keep state while compacting nodes. */
struct fts_encode_t {
	doc_id_t	src_last_doc_id;/*!< Last doc id read from src node */
	byte*		src_ilist_ptr;	/*!< Current ptr within src ilist */
};
169
/** We use this information to determine when to start the optimize
cycle for a table. */
struct fts_slot_t {
	/** table, or NULL if the slot is unused */
	dict_table_t*	table;

	/** whether this slot is being processed */
	bool		running;

	/** Number of doc ids added since the last time this table was
	optimized */
	ulint		added;

	/** Number of doc ids deleted since the last time this table was
	optimized */
	ulint		deleted;

	/** time(NULL) of completing fts_optimize_table_bk() */
	time_t		last_run;

	/** time(NULL) of latest successful fts_optimize_table() */
	time_t		completed;
};
191
/** A table remove message for the FTS optimize thread. */
struct fts_msg_del_t {
	dict_table_t*	table;		/*!< The table to remove */

	os_event_t	event;		/*!< Event to synchronize acknowledgement
					of receipt and processing of this
					message by the consumer */
};
200
/** The FTS optimize message work queue message type. */
struct fts_msg_t {
	fts_msg_type_t	type;		/*!< Message type */

	void*		ptr;		/*!< The message contents */

	mem_heap_t*	heap;		/*!< The heap used to allocate this
					message; the message consumer will
					free the heap. */
};
211
/** The number of words to read and optimize in a single pass. */
ulong	fts_num_word_optimize;

/** Whether to enable additional FTS diagnostic printout. */
char	fts_enable_diag_print;

/** ZLib compressed block size, in bytes. */
static ulint	FTS_ZIP_BLOCK_SIZE = 1024;

/** The amount of time optimizing in a single pass, in seconds. */
static ulint	fts_optimize_time_limit;

/** It's defined in fts0fts.cc */
extern const char* fts_common_tables[];

/** SQL statement that copies the doc ids of $DELETED and $DELETED_CACHE
into $BEING_DELETED and $BEING_DELETED_CACHE respectively, i.e. changes
the state of the rows that are to be deleted from the FTS index. */
static	const char* fts_init_delete_sql =
	"BEGIN\n"
	"\n"
	"INSERT INTO $BEING_DELETED\n"
	"SELECT doc_id FROM $DELETED;\n"
	"\n"
	"INSERT INTO $BEING_DELETED_CACHE\n"
	"SELECT doc_id FROM $DELETED_CACHE;\n";

/** SQL statement that removes a doc id from $DELETED (:doc_id1) and
from $DELETED_CACHE (:doc_id2). */
static const char* fts_delete_doc_ids_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
	"DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";

/** SQL statement that empties $BEING_DELETED and $BEING_DELETED_CACHE. */
static const char* fts_end_delete_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $BEING_DELETED;\n"
	"DELETE FROM $BEING_DELETED_CACHE;\n";
248
249 /**********************************************************************//**
250 Initialize fts_zip_t. */
251 static
252 void
fts_zip_initialize(fts_zip_t * zip)253 fts_zip_initialize(
254 /*===============*/
255 fts_zip_t* zip) /*!< out: zip instance to initialize */
256 {
257 zip->pos = 0;
258 zip->n_words = 0;
259
260 zip->status = Z_OK;
261
262 zip->last_big_block = 0;
263
264 zip->word.f_len = 0;
265 *zip->word.f_str = 0;
266
267 ib_vector_reset(zip->blocks);
268
269 memset(zip->zp, 0, sizeof(*zip->zp));
270 }
271
272 /**********************************************************************//**
273 Create an instance of fts_zip_t.
274 @return a new instance of fts_zip_t */
275 static
276 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)277 fts_zip_create(
278 /*===========*/
279 mem_heap_t* heap, /*!< in: heap */
280 ulint block_sz, /*!< in: size of a zip block.*/
281 ulint max_words) /*!< in: max words to read */
282 {
283 fts_zip_t* zip;
284
285 zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
286
287 zip->word.f_str = static_cast<byte*>(
288 mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
289
290 zip->block_sz = block_sz;
291
292 zip->heap_alloc = ib_heap_allocator_create(heap);
293
294 zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
295
296 zip->max_words = max_words;
297
298 zip->zp = static_cast<z_stream*>(
299 mem_heap_zalloc(heap, sizeof(*zip->zp)));
300
301 return(zip);
302 }
303
304 /**********************************************************************//**
305 Initialize an instance of fts_zip_t. */
306 static
307 void
fts_zip_init(fts_zip_t * zip)308 fts_zip_init(
309 /*=========*/
310
311 fts_zip_t* zip) /*!< in: zip instance to init */
312 {
313 memset(zip->zp, 0, sizeof(*zip->zp));
314
315 zip->word.f_len = 0;
316 *zip->word.f_str = '\0';
317 }
318
319 /**********************************************************************//**
320 Create a fts_optimizer_word_t instance.
321 @return new instance */
322 static
323 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)324 fts_word_init(
325 /*==========*/
326 fts_word_t* word, /*!< in: word to initialize */
327 byte* utf8, /*!< in: UTF-8 string */
328 ulint len) /*!< in: length of string in bytes */
329 {
330 mem_heap_t* heap = mem_heap_create(sizeof(fts_node_t));
331
332 memset(word, 0, sizeof(*word));
333
334 word->text.f_len = len;
335 word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
336
337 /* Need to copy the NUL character too. */
338 memcpy(word->text.f_str, utf8, word->text.f_len);
339 word->text.f_str[word->text.f_len] = 0;
340
341 word->heap_alloc = ib_heap_allocator_create(heap);
342
343 word->nodes = ib_vector_create(
344 word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
345
346 return(word);
347 }
348
349 /**********************************************************************//**
350 Read the FTS INDEX row.
351 @return fts_node_t instance */
352 static
353 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)354 fts_optimize_read_node(
355 /*===================*/
356 fts_word_t* word, /*!< in: */
357 que_node_t* exp) /*!< in: */
358 {
359 int i;
360 fts_node_t* node = static_cast<fts_node_t*>(
361 ib_vector_push(word->nodes, NULL));
362
363 /* Start from 1 since the first node has been read by the caller */
364 for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
365
366 dfield_t* dfield = que_node_get_val(exp);
367 byte* data = static_cast<byte*>(
368 dfield_get_data(dfield));
369 ulint len = dfield_get_len(dfield);
370
371 ut_a(len != UNIV_SQL_NULL);
372
373 /* Note: The column numbers below must match the SELECT */
374 switch (i) {
375 case 1: /* DOC_COUNT */
376 node->doc_count = mach_read_from_4(data);
377 break;
378
379 case 2: /* FIRST_DOC_ID */
380 node->first_doc_id = fts_read_doc_id(data);
381 break;
382
383 case 3: /* LAST_DOC_ID */
384 node->last_doc_id = fts_read_doc_id(data);
385 break;
386
387 case 4: /* ILIST */
388 node->ilist_size_alloc = node->ilist_size = len;
389 node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
390 memcpy(node->ilist, data, len);
391 break;
392
393 default:
394 ut_error;
395 }
396 }
397
398 /* Make sure all columns were read. */
399 ut_a(i == 5);
400
401 return(node);
402 }
403
404 /**********************************************************************//**
405 Callback function to fetch the rows in an FTS INDEX record.
406 @return always returns non-NULL */
407 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)408 fts_optimize_index_fetch_node(
409 /*==========================*/
410 void* row, /*!< in: sel_node_t* */
411 void* user_arg) /*!< in: pointer to ib_vector_t */
412 {
413 fts_word_t* word;
414 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
415 fts_fetch_t* fetch = static_cast<fts_fetch_t*>(user_arg);
416 ib_vector_t* words = static_cast<ib_vector_t*>(fetch->read_arg);
417 que_node_t* exp = sel_node->select_list;
418 dfield_t* dfield = que_node_get_val(exp);
419 void* data = dfield_get_data(dfield);
420 ulint dfield_len = dfield_get_len(dfield);
421 fts_node_t* node;
422 bool is_word_init = false;
423
424 ut_a(dfield_len <= FTS_MAX_WORD_LEN);
425
426 if (ib_vector_size(words) == 0) {
427
428 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
429 fts_word_init(word, (byte*) data, dfield_len);
430 is_word_init = true;
431 }
432
433 word = static_cast<fts_word_t*>(ib_vector_last(words));
434
435 if (dfield_len != word->text.f_len
436 || memcmp(word->text.f_str, data, dfield_len)) {
437
438 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
439 fts_word_init(word, (byte*) data, dfield_len);
440 is_word_init = true;
441 }
442
443 node = fts_optimize_read_node(word, que_node_get_next(exp));
444
445 fetch->total_memory += node->ilist_size;
446 if (is_word_init) {
447 fetch->total_memory += sizeof(fts_word_t)
448 + sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
449 + sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
450 } else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
451 fetch->total_memory += sizeof(fts_node_t);
452 }
453
454 if (fetch->total_memory >= fts_result_cache_limit) {
455 return(FALSE);
456 }
457
458 return(TRUE);
459 }
460
461 /**********************************************************************//**
462 Read the rows from the FTS inde.
463 @return DB_SUCCESS or error code */
464 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)465 fts_index_fetch_nodes(
466 /*==================*/
467 trx_t* trx, /*!< in: transaction */
468 que_t** graph, /*!< in: prepared statement */
469 fts_table_t* fts_table, /*!< in: table of the FTS INDEX */
470 const fts_string_t*
471 word, /*!< in: the word to fetch */
472 fts_fetch_t* fetch) /*!< in: fetch callback.*/
473 {
474 pars_info_t* info;
475 dberr_t error;
476 char table_name[MAX_FULL_NAME_LEN];
477
478 trx->op_info = "fetching FTS index nodes";
479
480 if (*graph) {
481 info = (*graph)->info;
482 } else {
483 ulint selected;
484
485 info = pars_info_create();
486
487 ut_a(fts_table->type == FTS_INDEX_TABLE);
488
489 selected = fts_select_index(fts_table->charset,
490 word->f_str, word->f_len);
491
492 fts_table->suffix = fts_get_suffix(selected);
493
494 fts_get_table_name(fts_table, table_name);
495
496 pars_info_bind_id(info, "table_name", table_name);
497 }
498
499 pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
500 pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
501
502 if (!*graph) {
503
504 *graph = fts_parse_sql(
505 fts_table,
506 info,
507 "DECLARE FUNCTION my_func;\n"
508 "DECLARE CURSOR c IS"
509 " SELECT word, doc_count, first_doc_id, last_doc_id,"
510 " ilist\n"
511 " FROM $table_name\n"
512 " WHERE word LIKE :word\n"
513 " ORDER BY first_doc_id;\n"
514 "BEGIN\n"
515 "\n"
516 "OPEN c;\n"
517 "WHILE 1 = 1 LOOP\n"
518 " FETCH c INTO my_func();\n"
519 " IF c % NOTFOUND THEN\n"
520 " EXIT;\n"
521 " END IF;\n"
522 "END LOOP;\n"
523 "CLOSE c;");
524 }
525
526 for (;;) {
527 error = fts_eval_sql(trx, *graph);
528
529 if (UNIV_LIKELY(error == DB_SUCCESS)) {
530 fts_sql_commit(trx);
531
532 break; /* Exit the loop. */
533 } else {
534 fts_sql_rollback(trx);
535
536 if (error == DB_LOCK_WAIT_TIMEOUT) {
537 ib::warn() << "lock wait timeout reading"
538 " FTS index. Retrying!";
539
540 trx->error_state = DB_SUCCESS;
541 } else {
542 ib::error() << "(" << error
543 << ") while reading FTS index.";
544
545 break; /* Exit the loop. */
546 }
547 }
548 }
549
550 return(error);
551 }
552
/** Inflate the next word from the compressed blocks of a fts_zip_t.
Every word is stored as a length prefix of sizeof(short) bytes that is
followed by that many bytes of text. Blocks are freed as soon as they
have been consumed, and all remaining blocks are freed once inflate()
stops returning Z_OK.
@param[in,out]	zip	Zip state + data
@param[out]	word	uncompressed word; word->f_str must be able to
			hold FTS_MAX_WORD_LEN + 1 bytes
@return word->f_str if the final status is Z_OK or Z_STREAM_END,
NULL otherwise (including when zip->status was not Z_OK on entry) */
static
byte*
fts_zip_read_word(
	fts_zip_t*	zip,
	fts_string_t*	word)
{
	short		len = 0;
	void*		null = NULL;
	byte*		ptr = word->f_str;
	int		flush = Z_NO_FLUSH;

	/* Either there was an error or we are at the Z_STREAM_END. */
	if (zip->status != Z_OK) {
		return(NULL);
	}

	/* First inflate the length prefix straight into len. */
	zip->zp->next_out = reinterpret_cast<byte*>(&len);
	zip->zp->avail_out = sizeof(len);

	while (zip->status == Z_OK && zip->zp->avail_out > 0) {

		/* Finished decompressing block. */
		if (zip->zp->avail_in == 0) {

			/* Free the block that has been decompressed. */
			if (zip->pos > 0) {
				ulint	prev = zip->pos - 1;

				/* NOTE(review): with this assertion the
				Z_FINISH branch below can only be reached
				while zip->pos == 0 -- confirm that this is
				intended. */
				ut_a(zip->pos < ib_vector_size(zip->blocks));

				ut_free(ib_vector_getp(zip->blocks, prev));
				ib_vector_set(zip->blocks, prev, &null);
			}

			/* Any more blocks to decompress. */
			if (zip->pos < ib_vector_size(zip->blocks)) {

				zip->zp->next_in = static_cast<byte*>(
					ib_vector_getp(
						zip->blocks, zip->pos));

				/* Blocks after last_big_block are the
				smaller trailing blocks. */
				if (zip->pos > zip->last_big_block) {
					zip->zp->avail_in =
						FTS_MAX_WORD_LEN;
				} else {
					zip->zp->avail_in =
						static_cast<uInt>(zip->block_sz);
				}

				++zip->pos;
			} else {
				flush = Z_FINISH;
			}
		}

		switch (zip->status = inflate(zip->zp, flush)) {
		case Z_OK:
			/* The length prefix is complete: redirect the
			output to the word buffer and inflate len bytes
			of text. len is cleared so that this happens
			only once per word. */
			if (zip->zp->avail_out == 0 && len > 0) {

				ut_a(len <= FTS_MAX_WORD_LEN);
				ptr[len] = 0;

				zip->zp->next_out = ptr;
				zip->zp->avail_out = uInt(len);

				word->f_len = ulint(len);
				len = 0;
			}
			break;

		case Z_BUF_ERROR:	/* No progress possible. */
		case Z_STREAM_END:
			inflateEnd(zip->zp);
			break;

		case Z_STREAM_ERROR:
		default:
			ut_error;
		}
	}

	/* All blocks must be freed at end of inflate. */
	if (zip->status != Z_OK) {
		for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
			if (ib_vector_getp(zip->blocks, i)) {
				ut_free(ib_vector_getp(zip->blocks, i));
				ib_vector_set(zip->blocks, i, &null);
			}
		}
	}

	if (ptr != NULL) {
		ut_ad(word->f_len == strlen((char*) ptr));
	}

	return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
}
653
654 /**********************************************************************//**
655 Callback function to fetch and compress the word in an FTS
656 INDEX record.
657 @return FALSE on EOF */
658 static
659 ibool
fts_fetch_index_words(void * row,void * user_arg)660 fts_fetch_index_words(
661 /*==================*/
662 void* row, /*!< in: sel_node_t* */
663 void* user_arg) /*!< in: pointer to ib_vector_t */
664 {
665 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
666 fts_zip_t* zip = static_cast<fts_zip_t*>(user_arg);
667 que_node_t* exp = sel_node->select_list;
668 dfield_t* dfield = que_node_get_val(exp);
669
670 ut_a(dfield_get_len(dfield) <= FTS_MAX_WORD_LEN);
671
672 uint16 len = uint16(dfield_get_len(dfield));
673 void* data = dfield_get_data(dfield);
674
675 /* Skip the duplicate words. */
676 if (zip->word.f_len == len && !memcmp(zip->word.f_str, data, len)) {
677 return(TRUE);
678 }
679
680 memcpy(zip->word.f_str, data, len);
681 zip->word.f_len = len;
682
683 ut_a(zip->zp->avail_in == 0);
684 ut_a(zip->zp->next_in == NULL);
685
686 /* The string is prefixed by len. */
687 /* FIXME: This is not byte order agnostic (InnoDB data files
688 with FULLTEXT INDEX are not portable between little-endian and
689 big-endian systems!) */
690 zip->zp->next_in = reinterpret_cast<byte*>(&len);
691 zip->zp->avail_in = sizeof(len);
692
693 /* Compress the word, create output blocks as necessary. */
694 while (zip->zp->avail_in > 0) {
695
696 /* No space left in output buffer, create a new one. */
697 if (zip->zp->avail_out == 0) {
698 byte* block;
699
700 block = static_cast<byte*>(
701 ut_malloc_nokey(zip->block_sz));
702
703 ib_vector_push(zip->blocks, &block);
704
705 zip->zp->next_out = block;
706 zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
707 }
708
709 switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
710 case Z_OK:
711 if (zip->zp->avail_in == 0) {
712 zip->zp->next_in = static_cast<byte*>(data);
713 zip->zp->avail_in = uInt(len);
714 ut_a(len <= FTS_MAX_WORD_LEN);
715 len = 0;
716 }
717 continue;
718
719 case Z_STREAM_END:
720 case Z_BUF_ERROR:
721 case Z_STREAM_ERROR:
722 default:
723 ut_error;
724 }
725 }
726
727 /* All data should have been compressed. */
728 ut_a(zip->zp->avail_in == 0);
729 zip->zp->next_in = NULL;
730
731 ++zip->n_words;
732
733 return(zip->n_words >= zip->max_words ? FALSE : TRUE);
734 }
735
736 /**********************************************************************//**
737 Finish Zip deflate. */
738 static
739 void
fts_zip_deflate_end(fts_zip_t * zip)740 fts_zip_deflate_end(
741 /*================*/
742 fts_zip_t* zip) /*!< in: instance that should be closed*/
743 {
744 ut_a(zip->zp->avail_in == 0);
745 ut_a(zip->zp->next_in == NULL);
746
747 zip->status = deflate(zip->zp, Z_FINISH);
748
749 ut_a(ib_vector_size(zip->blocks) > 0);
750 zip->last_big_block = ib_vector_size(zip->blocks) - 1;
751
752 /* Allocate smaller block(s), since this is trailing data. */
753 while (zip->status == Z_OK) {
754 byte* block;
755
756 ut_a(zip->zp->avail_out == 0);
757
758 block = static_cast<byte*>(
759 ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
760
761 ib_vector_push(zip->blocks, &block);
762
763 zip->zp->next_out = block;
764 zip->zp->avail_out = FTS_MAX_WORD_LEN;
765
766 zip->status = deflate(zip->zp, Z_FINISH);
767 }
768
769 ut_a(zip->status == Z_STREAM_END);
770
771 zip->status = deflateEnd(zip->zp);
772 ut_a(zip->status == Z_OK);
773
774 /* Reset the ZLib data structure. */
775 memset(zip->zp, 0, sizeof(*zip->zp));
776 }
777
778 /**********************************************************************//**
779 Read the words from the FTS INDEX.
780 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
781 to search else error code */
782 static MY_ATTRIBUTE((nonnull, warn_unused_result))
783 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)784 fts_index_fetch_words(
785 /*==================*/
786 fts_optimize_t* optim, /*!< in: optimize scratch pad */
787 const fts_string_t* word, /*!< in: get words greater than this
788 word */
789 ulint n_words)/*!< in: max words to read */
790 {
791 pars_info_t* info;
792 que_t* graph;
793 ulint selected;
794 fts_zip_t* zip = NULL;
795 dberr_t error = DB_SUCCESS;
796 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
797 ibool inited = FALSE;
798
799 optim->trx->op_info = "fetching FTS index words";
800
801 if (optim->zip == NULL) {
802 optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
803 } else {
804 fts_zip_initialize(optim->zip);
805 }
806
807 for (selected = fts_select_index(
808 optim->fts_index_table.charset, word->f_str, word->f_len);
809 selected < FTS_NUM_AUX_INDEX;
810 selected++) {
811
812 char table_name[MAX_FULL_NAME_LEN];
813
814 optim->fts_index_table.suffix = fts_get_suffix(selected);
815
816 info = pars_info_create();
817
818 pars_info_bind_function(
819 info, "my_func", fts_fetch_index_words, optim->zip);
820
821 pars_info_bind_varchar_literal(
822 info, "word", word->f_str, word->f_len);
823
824 fts_get_table_name(&optim->fts_index_table, table_name);
825 pars_info_bind_id(info, "table_name", table_name);
826
827 graph = fts_parse_sql(
828 &optim->fts_index_table,
829 info,
830 "DECLARE FUNCTION my_func;\n"
831 "DECLARE CURSOR c IS"
832 " SELECT word\n"
833 " FROM $table_name\n"
834 " WHERE word > :word\n"
835 " ORDER BY word;\n"
836 "BEGIN\n"
837 "\n"
838 "OPEN c;\n"
839 "WHILE 1 = 1 LOOP\n"
840 " FETCH c INTO my_func();\n"
841 " IF c % NOTFOUND THEN\n"
842 " EXIT;\n"
843 " END IF;\n"
844 "END LOOP;\n"
845 "CLOSE c;");
846
847 zip = optim->zip;
848
849 for (;;) {
850 int err;
851
852 if (!inited && ((err = deflateInit(zip->zp, 9))
853 != Z_OK)) {
854 ib::error() << "ZLib deflateInit() failed: "
855 << err;
856
857 error = DB_ERROR;
858 break;
859 } else {
860 inited = TRUE;
861 error = fts_eval_sql(optim->trx, graph);
862 }
863
864 if (UNIV_LIKELY(error == DB_SUCCESS)) {
865 //FIXME fts_sql_commit(optim->trx);
866 break;
867 } else {
868 //FIXME fts_sql_rollback(optim->trx);
869
870 if (error == DB_LOCK_WAIT_TIMEOUT) {
871 ib::warn() << "Lock wait timeout"
872 " reading document. Retrying!";
873
874 /* We need to reset the ZLib state. */
875 inited = FALSE;
876 deflateEnd(zip->zp);
877 fts_zip_init(zip);
878
879 optim->trx->error_state = DB_SUCCESS;
880 } else {
881 ib::error() << "(" << error
882 << ") while reading document.";
883
884 break; /* Exit the loop. */
885 }
886 }
887 }
888
889 fts_que_graph_free(graph);
890
891 /* Check if max word to fetch is exceeded */
892 if (optim->zip->n_words >= n_words) {
893 break;
894 }
895 }
896
897 if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
898
899 /* All data should have been read. */
900 ut_a(zip->zp->avail_in == 0);
901
902 fts_zip_deflate_end(zip);
903 } else {
904 deflateEnd(zip->zp);
905 }
906
907 return(error);
908 }
909
910 /**********************************************************************//**
911 Callback function to fetch the doc id from the record.
912 @return always returns TRUE */
913 static
914 ibool
fts_fetch_doc_ids(void * row,void * user_arg)915 fts_fetch_doc_ids(
916 /*==============*/
917 void* row, /*!< in: sel_node_t* */
918 void* user_arg) /*!< in: pointer to ib_vector_t */
919 {
920 que_node_t* exp;
921 int i = 0;
922 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
923 fts_doc_ids_t* fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
924 doc_id_t* update = static_cast<doc_id_t*>(
925 ib_vector_push(fts_doc_ids->doc_ids, NULL));
926
927 for (exp = sel_node->select_list;
928 exp;
929 exp = que_node_get_next(exp), ++i) {
930
931 dfield_t* dfield = que_node_get_val(exp);
932 void* data = dfield_get_data(dfield);
933 ulint len = dfield_get_len(dfield);
934
935 ut_a(len != UNIV_SQL_NULL);
936
937 /* Note: The column numbers below must match the SELECT. */
938 switch (i) {
939 case 0: /* DOC_ID */
940 *update = fts_read_doc_id(
941 static_cast<byte*>(data));
942 break;
943
944 default:
945 ut_error;
946 }
947 }
948
949 return(TRUE);
950 }
951
952 /**********************************************************************//**
953 Read the rows from a FTS common auxiliary table.
954 @return DB_SUCCESS or error code */
955 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)956 fts_table_fetch_doc_ids(
957 /*====================*/
958 trx_t* trx, /*!< in: transaction */
959 fts_table_t* fts_table, /*!< in: table */
960 fts_doc_ids_t* doc_ids) /*!< in: For collecting doc ids */
961 {
962 dberr_t error;
963 que_t* graph;
964 pars_info_t* info = pars_info_create();
965 ibool alloc_bk_trx = FALSE;
966 char table_name[MAX_FULL_NAME_LEN];
967
968 ut_a(fts_table->suffix != NULL);
969 ut_a(fts_table->type == FTS_COMMON_TABLE);
970
971 if (!trx) {
972 trx = trx_create();
973 alloc_bk_trx = TRUE;
974 }
975
976 trx->op_info = "fetching FTS doc ids";
977
978 pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
979
980 fts_get_table_name(fts_table, table_name);
981 pars_info_bind_id(info, "table_name", table_name);
982
983 graph = fts_parse_sql(
984 fts_table,
985 info,
986 "DECLARE FUNCTION my_func;\n"
987 "DECLARE CURSOR c IS"
988 " SELECT doc_id FROM $table_name;\n"
989 "BEGIN\n"
990 "\n"
991 "OPEN c;\n"
992 "WHILE 1 = 1 LOOP\n"
993 " FETCH c INTO my_func();\n"
994 " IF c % NOTFOUND THEN\n"
995 " EXIT;\n"
996 " END IF;\n"
997 "END LOOP;\n"
998 "CLOSE c;");
999
1000 error = fts_eval_sql(trx, graph);
1001 fts_sql_commit(trx);
1002
1003 mutex_enter(&dict_sys.mutex);
1004 que_graph_free(graph);
1005 mutex_exit(&dict_sys.mutex);
1006
1007 if (error == DB_SUCCESS) {
1008 ib_vector_sort(doc_ids->doc_ids, fts_doc_id_cmp);
1009 }
1010
1011 if (alloc_bk_trx) {
1012 trx->free();
1013 }
1014
1015 return(error);
1016 }
1017
/** Do a binary search for a doc id in a sorted array.
@param[in]	array	sorted array to search
@param[in]	lower	the array lower bound (first index to search)
@param[in]	upper	the array upper bound (one past the last index
			to search)
@param[in]	doc_id	the doc id to search for
@return the non-negative index of doc_id if found. If not found, a
negative value whose magnitude is the position at which the search
stopped; note that -1 is returned both for an empty range and when
the search stopped at position 0 or 1, so callers have to double
check that case. */
int
fts_bsearch(
	doc_id_t*	array,
	int		lower,
	int		upper,
	doc_id_t	doc_id)
{
	const int	orig_size = upper;

	if (upper == 0) {
		/* Nothing to search */
		return(-1);
	}

	while (lower < upper) {
		/* Same value as (lower + upper) >> 1, but the sum of two
		large indexes cannot overflow a signed int. */
		const int	i = lower + ((upper - lower) >> 1);

		if (doc_id > array[i]) {
			lower = i + 1;
		} else if (doc_id < array[i]) {
			upper = i - 1;
		} else {
			return(i); /* Found. */
		}
	}

	/* The element at which both bounds met has not been compared
	by the loop above, so check it here. */
	if (lower == upper && lower < orig_size
	    && doc_id == array[lower]) {
		return(lower);
	}

	/* Not found. */
	return((lower == 0) ? -1 : -(lower));
}
1060
1061 /**********************************************************************//**
1062 Search in the to delete array whether any of the doc ids within
1063 the [first, last] range are to be deleted
1064 @return +ve index if found -ve index where it should be inserted
1065 if not found */
1066 static
1067 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1068 fts_optimize_lookup(
1069 /*================*/
1070 ib_vector_t* doc_ids, /*!< in: array to search */
1071 ulint lower, /*!< in: lower limit of array */
1072 doc_id_t first_doc_id, /*!< in: doc id to lookup */
1073 doc_id_t last_doc_id) /*!< in: doc id to lookup */
1074 {
1075 int pos;
1076 int upper = static_cast<int>(ib_vector_size(doc_ids));
1077 doc_id_t* array = (doc_id_t*) doc_ids->data;
1078
1079 pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1080
1081 ut_a(abs(pos) <= upper + 1);
1082
1083 if (pos < 0) {
1084
1085 int i = abs(pos);
1086
1087 /* If i is 1, it could be first_doc_id is less than
1088 either the first or second array item, do a
1089 double check */
1090 if (i == 1 && array[0] <= last_doc_id
1091 && first_doc_id < array[0]) {
1092 pos = 0;
1093 } else if (i < upper && array[i] <= last_doc_id) {
1094
1095 /* Check if the "next" doc id is within the
1096 first & last doc id of the node. */
1097 pos = i;
1098 }
1099 }
1100
1101 return(pos);
1102 }
1103
1104 /**********************************************************************//**
1105 Encode the word pos list into the node
1106 @return DB_SUCCESS or error code*/
1107 static MY_ATTRIBUTE((nonnull))
1108 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1109 fts_optimize_encode_node(
1110 /*=====================*/
1111 fts_node_t* node, /*!< in: node to fill*/
1112 doc_id_t doc_id, /*!< in: doc id to encode */
1113 fts_encode_t* enc) /*!< in: encoding state.*/
1114 {
1115 byte* dst;
1116 ulint enc_len;
1117 ulint pos_enc_len;
1118 doc_id_t doc_id_delta;
1119 dberr_t error = DB_SUCCESS;
1120 const byte* src = enc->src_ilist_ptr;
1121
1122 if (node->first_doc_id == 0) {
1123 ut_a(node->last_doc_id == 0);
1124
1125 node->first_doc_id = doc_id;
1126 }
1127
1128 /* Calculate the space required to store the ilist. */
1129 ut_ad(doc_id > node->last_doc_id);
1130 doc_id_delta = doc_id - node->last_doc_id;
1131 enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1132
1133 /* Calculate the size of the encoded pos array. */
1134 while (*src) {
1135 fts_decode_vlc(&src);
1136 }
1137
1138 /* Skip the 0x00 byte at the end of the word positions list. */
1139 ++src;
1140
1141 /* Number of encoded pos bytes to copy. */
1142 pos_enc_len = ulint(src - enc->src_ilist_ptr);
1143
1144 /* Total number of bytes required for copy. */
1145 enc_len += pos_enc_len;
1146
1147 /* Check we have enough space in the destination buffer for
1148 copying the document word list. */
1149 if (!node->ilist) {
1150 ulint new_size;
1151
1152 ut_a(node->ilist_size == 0);
1153
1154 new_size = enc_len > FTS_ILIST_MAX_SIZE
1155 ? enc_len : FTS_ILIST_MAX_SIZE;
1156
1157 node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1158 node->ilist_size_alloc = new_size;
1159
1160 } else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1161 ulint new_size = node->ilist_size + enc_len;
1162 byte* ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1163
1164 memcpy(ilist, node->ilist, node->ilist_size);
1165
1166 ut_free(node->ilist);
1167
1168 node->ilist = ilist;
1169 node->ilist_size_alloc = new_size;
1170 }
1171
1172 src = enc->src_ilist_ptr;
1173 dst = node->ilist + node->ilist_size;
1174
1175 /* Encode the doc id. Cast to ulint, the delta should be small and
1176 therefore no loss of precision. */
1177 dst = fts_encode_int(doc_id_delta, dst);
1178
1179 /* Copy the encoded pos array. */
1180 memcpy(dst, src, pos_enc_len);
1181
1182 node->last_doc_id = doc_id;
1183
1184 /* Data copied upto here. */
1185 node->ilist_size += enc_len;
1186 enc->src_ilist_ptr += pos_enc_len;
1187
1188 ut_a(node->ilist_size <= node->ilist_size_alloc);
1189
1190 return(error);
1191 }
1192
1193 /**********************************************************************//**
1194 Optimize the data contained in a node.
1195 @return DB_SUCCESS or error code*/
1196 static MY_ATTRIBUTE((nonnull))
1197 dberr_t
fts_optimize_node(ib_vector_t * del_vec,int * del_pos,fts_node_t * dst_node,fts_node_t * src_node,fts_encode_t * enc)1198 fts_optimize_node(
1199 /*==============*/
1200 ib_vector_t* del_vec, /*!< in: vector of doc ids to delete*/
1201 int* del_pos, /*!< in: offset into above vector */
1202 fts_node_t* dst_node, /*!< in: node to fill*/
1203 fts_node_t* src_node, /*!< in: source node for data*/
1204 fts_encode_t* enc) /*!< in: encoding state */
1205 {
1206 ulint copied;
1207 dberr_t error = DB_SUCCESS;
1208 doc_id_t doc_id = enc->src_last_doc_id;
1209
1210 if (!enc->src_ilist_ptr) {
1211 enc->src_ilist_ptr = src_node->ilist;
1212 }
1213
1214 copied = ulint(enc->src_ilist_ptr - src_node->ilist);
1215
1216 /* While there is data in the source node and space to copy
1217 into in the destination node. */
1218 while (copied < src_node->ilist_size
1219 && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
1220
1221 doc_id_t delta;
1222 doc_id_t del_doc_id = FTS_NULL_DOC_ID;
1223
1224 delta = fts_decode_vlc(
1225 (const byte**)&enc->src_ilist_ptr);
1226
1227 test_again:
1228 /* Check whether the doc id is in the delete list, if
1229 so then we skip the entries but we need to track the
1230 delta for decoding the entries following this document's
1231 entries. */
1232 if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
1233 doc_id_t* update;
1234
1235 update = (doc_id_t*) ib_vector_get(
1236 del_vec, ulint(*del_pos));
1237
1238 del_doc_id = *update;
1239 }
1240
1241 if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
1242 ut_a(delta == src_node->first_doc_id);
1243 }
1244
1245 doc_id += delta;
1246
1247 if (del_doc_id > 0 && doc_id == del_doc_id) {
1248
1249 ++*del_pos;
1250
1251 /* Skip the entries for this document. */
1252 while (*enc->src_ilist_ptr) {
1253 fts_decode_vlc((const byte**)&enc->src_ilist_ptr);
1254 }
1255
1256 /* Skip the end of word position marker. */
1257 ++enc->src_ilist_ptr;
1258
1259 } else {
1260
1261 /* DOC ID already becomes larger than
1262 del_doc_id, check the next del_doc_id */
1263 if (del_doc_id > 0 && doc_id > del_doc_id) {
1264 del_doc_id = 0;
1265 ++*del_pos;
1266 delta = 0;
1267 goto test_again;
1268 }
1269
1270 /* Decode and copy the word positions into
1271 the dest node. */
1272 fts_optimize_encode_node(dst_node, doc_id, enc);
1273
1274 ++dst_node->doc_count;
1275
1276 ut_a(dst_node->last_doc_id == doc_id);
1277 }
1278
1279 /* Bytes copied so for from source. */
1280 copied = ulint(enc->src_ilist_ptr - src_node->ilist);
1281 }
1282
1283 if (copied >= src_node->ilist_size) {
1284 ut_a(doc_id == src_node->last_doc_id);
1285 }
1286
1287 enc->src_last_doc_id = doc_id;
1288
1289 return(error);
1290 }
1291
1292 /**********************************************************************//**
1293 Determine the starting pos within the deleted doc id vector for a word.
1294 @return delete position */
1295 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1296 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1297 fts_optimize_deleted_pos(
1298 /*=====================*/
1299 fts_optimize_t* optim, /*!< in: optimize state data */
1300 fts_word_t* word) /*!< in: the word data to check */
1301 {
1302 int del_pos;
1303 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1304
1305 /* Get the first and last dict ids for the word, we will use
1306 these values to determine which doc ids need to be removed
1307 when we coalesce the nodes. This way we can reduce the numer
1308 of elements that need to be searched in the deleted doc ids
1309 vector and secondly we can remove the doc ids during the
1310 coalescing phase. */
1311 if (ib_vector_size(del_vec) > 0) {
1312 fts_node_t* node;
1313 doc_id_t last_id;
1314 doc_id_t first_id;
1315 ulint size = ib_vector_size(word->nodes);
1316
1317 node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1318 first_id = node->first_doc_id;
1319
1320 node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1321 last_id = node->last_doc_id;
1322
1323 ut_a(first_id <= last_id);
1324
1325 del_pos = fts_optimize_lookup(
1326 del_vec, optim->del_pos, first_id, last_id);
1327 } else {
1328
1329 del_pos = -1; /* Note that there is nothing to delete. */
1330 }
1331
1332 return(del_pos);
1333 }
1334
1335 #define FTS_DEBUG_PRINT
1336 /**********************************************************************//**
1337 Compact the nodes for a word, we also remove any doc ids during the
1338 compaction pass.
1339 @return DB_SUCCESS or error code.*/
1340 static
1341 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1342 fts_optimize_word(
1343 /*==============*/
1344 fts_optimize_t* optim, /*!< in: optimize state data */
1345 fts_word_t* word) /*!< in: the word to optimize */
1346 {
1347 fts_encode_t enc;
1348 ib_vector_t* nodes;
1349 ulint i = 0;
1350 int del_pos;
1351 fts_node_t* dst_node = NULL;
1352 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1353 ulint size = ib_vector_size(word->nodes);
1354
1355 del_pos = fts_optimize_deleted_pos(optim, word);
1356 nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1357
1358 enc.src_last_doc_id = 0;
1359 enc.src_ilist_ptr = NULL;
1360
1361 while (i < size) {
1362 ulint copied;
1363 fts_node_t* src_node;
1364
1365 src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1366
1367 if (dst_node == NULL
1368 || dst_node->last_doc_id > src_node->first_doc_id) {
1369
1370 dst_node = static_cast<fts_node_t*>(
1371 ib_vector_push(nodes, NULL));
1372 memset(dst_node, 0, sizeof(*dst_node));
1373 }
1374
1375 /* Copy from the src to the dst node. */
1376 fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1377
1378 ut_a(enc.src_ilist_ptr != NULL);
1379
1380 /* Determine the numer of bytes copied to dst_node. */
1381 copied = ulint(enc.src_ilist_ptr - src_node->ilist);
1382
1383 /* Can't copy more than whats in the vlc array. */
1384 ut_a(copied <= src_node->ilist_size);
1385
1386 /* We are done with this node release the resources. */
1387 if (copied == src_node->ilist_size) {
1388
1389 enc.src_last_doc_id = 0;
1390 enc.src_ilist_ptr = NULL;
1391
1392 ut_free(src_node->ilist);
1393
1394 src_node->ilist = NULL;
1395 src_node->ilist_size = src_node->ilist_size_alloc = 0;
1396
1397 src_node = NULL;
1398
1399 ++i; /* Get next source node to OPTIMIZE. */
1400 }
1401
1402 if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1403
1404 dst_node = NULL;
1405 }
1406 }
1407
1408 /* All dst nodes created should have been added to the vector. */
1409 ut_a(dst_node == NULL);
1410
1411 /* Return the OPTIMIZED nodes. */
1412 return(nodes);
1413 }
1414
1415 /**********************************************************************//**
1416 Update the FTS index table. This is a delete followed by an insert.
1417 @return DB_SUCCESS or error code */
1418 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1419 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1420 fts_optimize_write_word(
1421 /*====================*/
1422 trx_t* trx, /*!< in: transaction */
1423 fts_table_t* fts_table, /*!< in: table of FTS index */
1424 fts_string_t* word, /*!< in: word data to write */
1425 ib_vector_t* nodes) /*!< in: the nodes to write */
1426 {
1427 ulint i;
1428 pars_info_t* info;
1429 que_t* graph;
1430 ulint selected;
1431 dberr_t error = DB_SUCCESS;
1432 char table_name[MAX_FULL_NAME_LEN];
1433
1434 info = pars_info_create();
1435
1436 ut_ad(fts_table->charset);
1437
1438 pars_info_bind_varchar_literal(
1439 info, "word", word->f_str, word->f_len);
1440
1441 selected = fts_select_index(fts_table->charset,
1442 word->f_str, word->f_len);
1443
1444 fts_table->suffix = fts_get_suffix(selected);
1445 fts_get_table_name(fts_table, table_name);
1446 pars_info_bind_id(info, "table_name", table_name);
1447
1448 graph = fts_parse_sql(
1449 fts_table,
1450 info,
1451 "BEGIN DELETE FROM $table_name WHERE word = :word;");
1452
1453 error = fts_eval_sql(trx, graph);
1454
1455 if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1456 ib::error() << "(" << error << ") during optimize,"
1457 " when deleting a word from the FTS index.";
1458 }
1459
1460 fts_que_graph_free(graph);
1461 graph = NULL;
1462
1463 /* Even if the operation needs to be rolled back and redone,
1464 we iterate over the nodes in order to free the ilist. */
1465 for (i = 0; i < ib_vector_size(nodes); ++i) {
1466
1467 fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1468
1469 if (error == DB_SUCCESS) {
1470 /* Skip empty node. */
1471 if (node->ilist == NULL) {
1472 ut_ad(node->ilist_size == 0);
1473 continue;
1474 }
1475
1476 error = fts_write_node(
1477 trx, &graph, fts_table, word, node);
1478
1479 if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1480 ib::error() << "(" << error << ")"
1481 " during optimize, while adding a"
1482 " word to the FTS index.";
1483 }
1484 }
1485
1486 ut_free(node->ilist);
1487 node->ilist = NULL;
1488 node->ilist_size = node->ilist_size_alloc = 0;
1489 }
1490
1491 if (graph != NULL) {
1492 fts_que_graph_free(graph);
1493 }
1494
1495 return(error);
1496 }
1497
1498 /**********************************************************************//**
1499 Free fts_optimizer_word_t instanace.*/
1500 void
fts_word_free(fts_word_t * word)1501 fts_word_free(
1502 /*==========*/
1503 fts_word_t* word) /*!< in: instance to free.*/
1504 {
1505 mem_heap_t* heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1506
1507 #ifdef UNIV_DEBUG
1508 memset(word, 0, sizeof(*word));
1509 #endif /* UNIV_DEBUG */
1510
1511 mem_heap_free(heap);
1512 }
1513
1514 /**********************************************************************//**
1515 Optimize the word ilist and rewrite data to the FTS index.
1516 @return status one of RESTART, EXIT, ERROR */
1517 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1518 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,time_t start_time)1519 fts_optimize_compact(
1520 /*=================*/
1521 fts_optimize_t* optim, /*!< in: optimize state data */
1522 dict_index_t* index, /*!< in: current FTS being optimized */
1523 time_t start_time) /*!< in: optimize start time */
1524 {
1525 ulint i;
1526 dberr_t error = DB_SUCCESS;
1527 ulint size = ib_vector_size(optim->words);
1528
1529 for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1530 fts_word_t* word;
1531 ib_vector_t* nodes;
1532 trx_t* trx = optim->trx;
1533
1534 word = (fts_word_t*) ib_vector_get(optim->words, i);
1535
1536 /* nodes is allocated from the word heap and will be destroyed
1537 when the word is freed. We however have to be careful about
1538 the ilist, that needs to be freed explicitly. */
1539 nodes = fts_optimize_word(optim, word);
1540
1541 /* Update the data on disk. */
1542 error = fts_optimize_write_word(
1543 trx, &optim->fts_index_table, &word->text, nodes);
1544
1545 if (error == DB_SUCCESS) {
1546 /* Write the last word optimized to the config table,
1547 we use this value for restarting optimize. */
1548 error = fts_config_set_index_value(
1549 optim->trx, index,
1550 FTS_LAST_OPTIMIZED_WORD, &word->text);
1551 }
1552
1553 /* Free the word that was optimized. */
1554 fts_word_free(word);
1555
1556 ulint interval = ulint(time(NULL) - start_time);
1557
1558 if (fts_optimize_time_limit > 0
1559 && (lint(interval) < 0
1560 || interval > fts_optimize_time_limit)) {
1561
1562 optim->done = TRUE;
1563 }
1564 }
1565
1566 return(error);
1567 }
1568
1569 /**********************************************************************//**
1570 Create an instance of fts_optimize_t. Also create a new
1571 background transaction.*/
1572 static
1573 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1574 fts_optimize_create(
1575 /*================*/
1576 dict_table_t* table) /*!< in: table with FTS indexes */
1577 {
1578 fts_optimize_t* optim;
1579 mem_heap_t* heap = mem_heap_create(128);
1580
1581 optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1582
1583 optim->self_heap = ib_heap_allocator_create(heap);
1584
1585 optim->to_delete = fts_doc_ids_create();
1586
1587 optim->words = ib_vector_create(
1588 optim->self_heap, sizeof(fts_word_t), 256);
1589
1590 optim->table = table;
1591
1592 optim->trx = trx_create();
1593 trx_start_internal(optim->trx);
1594
1595 optim->fts_common_table.table_id = table->id;
1596 optim->fts_common_table.type = FTS_COMMON_TABLE;
1597 optim->fts_common_table.table = table;
1598
1599 optim->fts_index_table.table_id = table->id;
1600 optim->fts_index_table.type = FTS_INDEX_TABLE;
1601 optim->fts_index_table.table = table;
1602
1603 /* The common prefix for all this parent table's aux tables. */
1604 optim->name_prefix = fts_get_table_name_prefix(
1605 &optim->fts_common_table);
1606
1607 return(optim);
1608 }
1609
1610 #ifdef FTS_OPTIMIZE_DEBUG
1611 /**********************************************************************//**
1612 Get optimize start time of an FTS index.
1613 @return DB_SUCCESS if all OK else error code */
1614 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1615 dberr_t
fts_optimize_get_index_start_time(trx_t * trx,dict_index_t * index,time_t * start_time)1616 fts_optimize_get_index_start_time(
1617 /*==============================*/
1618 trx_t* trx, /*!< in: transaction */
1619 dict_index_t* index, /*!< in: FTS index */
1620 time_t* start_time) /*!< out: time in secs */
1621 {
1622 return(fts_config_get_index_ulint(
1623 trx, index, FTS_OPTIMIZE_START_TIME,
1624 (ulint*) start_time));
1625 }
1626
1627 /**********************************************************************//**
1628 Set the optimize start time of an FTS index.
1629 @return DB_SUCCESS if all OK else error code */
1630 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1631 dberr_t
fts_optimize_set_index_start_time(trx_t * trx,dict_index_t * index,time_t start_time)1632 fts_optimize_set_index_start_time(
1633 /*==============================*/
1634 trx_t* trx, /*!< in: transaction */
1635 dict_index_t* index, /*!< in: FTS index */
1636 time_t start_time) /*!< in: start time */
1637 {
1638 return(fts_config_set_index_ulint(
1639 trx, index, FTS_OPTIMIZE_START_TIME,
1640 (ulint) start_time));
1641 }
1642
1643 /**********************************************************************//**
1644 Get optimize end time of an FTS index.
1645 @return DB_SUCCESS if all OK else error code */
1646 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1647 dberr_t
fts_optimize_get_index_end_time(trx_t * trx,dict_index_t * index,time_t * end_time)1648 fts_optimize_get_index_end_time(
1649 /*============================*/
1650 trx_t* trx, /*!< in: transaction */
1651 dict_index_t* index, /*!< in: FTS index */
1652 time_t* end_time) /*!< out: time in secs */
1653 {
1654 return(fts_config_get_index_ulint(
1655 trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1656 }
1657
1658 /**********************************************************************//**
1659 Set the optimize end time of an FTS index.
1660 @return DB_SUCCESS if all OK else error code */
1661 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1662 dberr_t
fts_optimize_set_index_end_time(trx_t * trx,dict_index_t * index,time_t end_time)1663 fts_optimize_set_index_end_time(
1664 /*============================*/
1665 trx_t* trx, /*!< in: transaction */
1666 dict_index_t* index, /*!< in: FTS index */
1667 time_t end_time) /*!< in: end time */
1668 {
1669 return(fts_config_set_index_ulint(
1670 trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1671 }
1672 #endif
1673
1674 /**********************************************************************//**
1675 Free the optimize prepared statements.*/
1676 static
1677 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1678 fts_optimize_graph_free(
1679 /*====================*/
1680 fts_optimize_graph_t* graph) /*!< in/out: The graph instances
1681 to free */
1682 {
1683 if (graph->commit_graph) {
1684 que_graph_free(graph->commit_graph);
1685 graph->commit_graph = NULL;
1686 }
1687
1688 if (graph->write_nodes_graph) {
1689 que_graph_free(graph->write_nodes_graph);
1690 graph->write_nodes_graph = NULL;
1691 }
1692
1693 if (graph->delete_nodes_graph) {
1694 que_graph_free(graph->delete_nodes_graph);
1695 graph->delete_nodes_graph = NULL;
1696 }
1697
1698 if (graph->read_nodes_graph) {
1699 que_graph_free(graph->read_nodes_graph);
1700 graph->read_nodes_graph = NULL;
1701 }
1702 }
1703
1704 /**********************************************************************//**
1705 Free all optimize resources. */
1706 static
1707 void
fts_optimize_free(fts_optimize_t * optim)1708 fts_optimize_free(
1709 /*==============*/
1710 fts_optimize_t* optim) /*!< in: table with on FTS index */
1711 {
1712 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1713
1714 trx_commit_for_mysql(optim->trx);
1715 optim->trx->free();
1716 optim->trx = NULL;
1717
1718 fts_doc_ids_free(optim->to_delete);
1719 fts_optimize_graph_free(&optim->graph);
1720
1721 ut_free(optim->name_prefix);
1722
1723 /* This will free the heap from which optim itself was allocated. */
1724 mem_heap_free(heap);
1725 }
1726
1727 /**********************************************************************//**
1728 Get the max time optimize should run in millisecs.
1729 @return max optimize time limit in millisecs. */
1730 static
1731 ulint
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1732 fts_optimize_get_time_limit(
1733 /*========================*/
1734 trx_t* trx, /*!< in: transaction */
1735 fts_table_t* fts_table) /*!< in: aux table */
1736 {
1737 ulint time_limit = 0;
1738
1739 fts_config_get_ulint(
1740 trx, fts_table,
1741 FTS_OPTIMIZE_LIMIT_IN_SECS, &time_limit);
1742
1743 /* FIXME: This is returning milliseconds, while the variable
1744 is being stored and interpreted as seconds! */
1745 return(time_limit * 1000);
1746 }
1747
1748 /**********************************************************************//**
1749 Run OPTIMIZE on the given table. Note: this can take a very long time
1750 (hours). */
1751 static
1752 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1753 fts_optimize_words(
1754 /*===============*/
1755 fts_optimize_t* optim, /*!< in: optimize instance */
1756 dict_index_t* index, /*!< in: current FTS being optimized */
1757 fts_string_t* word) /*!< in: the starting word to optimize */
1758 {
1759 fts_fetch_t fetch;
1760 que_t* graph = NULL;
1761 CHARSET_INFO* charset = optim->fts_index_table.charset;
1762
1763 ut_a(!optim->done);
1764
1765 /* Get the time limit from the config table. */
1766 fts_optimize_time_limit = fts_optimize_get_time_limit(
1767 optim->trx, &optim->fts_common_table);
1768
1769 const time_t start_time = time(NULL);
1770
1771 /* Setup the callback to use for fetching the word ilist etc. */
1772 fetch.read_arg = optim->words;
1773 fetch.read_record = fts_optimize_index_fetch_node;
1774
1775 while (!optim->done) {
1776 dberr_t error;
1777 trx_t* trx = optim->trx;
1778 ulint selected;
1779
1780 ut_a(ib_vector_size(optim->words) == 0);
1781
1782 selected = fts_select_index(charset, word->f_str, word->f_len);
1783
1784 /* Read the index records to optimize. */
1785 fetch.total_memory = 0;
1786 error = fts_index_fetch_nodes(
1787 trx, &graph, &optim->fts_index_table, word,
1788 &fetch);
1789 ut_ad(fetch.total_memory < fts_result_cache_limit);
1790
1791 if (error == DB_SUCCESS) {
1792 /* There must be some nodes to read. */
1793 ut_a(ib_vector_size(optim->words) > 0);
1794
1795 /* Optimize the nodes that were read and write
1796 back to DB. */
1797 error = fts_optimize_compact(optim, index, start_time);
1798
1799 if (error == DB_SUCCESS) {
1800 fts_sql_commit(optim->trx);
1801 } else {
1802 fts_sql_rollback(optim->trx);
1803 }
1804 }
1805
1806 ib_vector_reset(optim->words);
1807
1808 if (error == DB_SUCCESS) {
1809 if (!optim->done) {
1810 if (!fts_zip_read_word(optim->zip, word)) {
1811 optim->done = TRUE;
1812 } else if (selected
1813 != fts_select_index(
1814 charset, word->f_str,
1815 word->f_len)
1816 && graph) {
1817 fts_que_graph_free(graph);
1818 graph = NULL;
1819 }
1820 }
1821 } else if (error == DB_LOCK_WAIT_TIMEOUT) {
1822 ib::warn() << "Lock wait timeout during optimize."
1823 " Retrying!";
1824
1825 trx->error_state = DB_SUCCESS;
1826 } else if (error == DB_DEADLOCK) {
1827 ib::warn() << "Deadlock during optimize. Retrying!";
1828
1829 trx->error_state = DB_SUCCESS;
1830 } else {
1831 optim->done = TRUE; /* Exit the loop. */
1832 }
1833 }
1834
1835 if (graph != NULL) {
1836 fts_que_graph_free(graph);
1837 }
1838 }
1839
1840 /**********************************************************************//**
1841 Optimize is complete. Set the completion time, and reset the optimize
1842 start string for this FTS index to "".
1843 @return DB_SUCCESS if all OK */
1844 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1845 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1846 fts_optimize_index_completed(
1847 /*=========================*/
1848 fts_optimize_t* optim, /*!< in: optimize instance */
1849 dict_index_t* index) /*!< in: table with one FTS index */
1850 {
1851 fts_string_t word;
1852 dberr_t error;
1853 byte buf[sizeof(ulint)];
1854 #ifdef FTS_OPTIMIZE_DEBUG
1855 time_t end_time = time(NULL);
1856
1857 error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1858 #endif
1859
1860 /* If we've reached the end of the index then set the start
1861 word to the empty string. */
1862
1863 word.f_len = 0;
1864 word.f_str = buf;
1865 *word.f_str = '\0';
1866
1867 error = fts_config_set_index_value(
1868 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1869
1870 if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1871 ib::error() << "(" << error << ") while updating"
1872 " last optimized word!";
1873 }
1874
1875 return(error);
1876 }
1877
1878
1879 /**********************************************************************//**
1880 Read the list of words from the FTS auxiliary index that will be
1881 optimized in this pass.
1882 @return DB_SUCCESS if all OK */
1883 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1884 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1885 fts_optimize_index_read_words(
1886 /*==========================*/
1887 fts_optimize_t* optim, /*!< in: optimize instance */
1888 dict_index_t* index, /*!< in: table with one FTS index */
1889 fts_string_t* word) /*!< in: buffer to use */
1890 {
1891 dberr_t error = DB_SUCCESS;
1892
1893 if (optim->del_list_regenerated) {
1894 word->f_len = 0;
1895 } else {
1896
1897 /* Get the last word that was optimized from
1898 the config table. */
1899 error = fts_config_get_index_value(
1900 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1901 }
1902
1903 /* If record not found then we start from the top. */
1904 if (error == DB_RECORD_NOT_FOUND) {
1905 word->f_len = 0;
1906 error = DB_SUCCESS;
1907 }
1908
1909 while (error == DB_SUCCESS) {
1910
1911 error = fts_index_fetch_words(
1912 optim, word, fts_num_word_optimize);
1913
1914 if (error == DB_SUCCESS) {
1915 /* Reset the last optimized word to '' if no
1916 more words could be read from the FTS index. */
1917 if (optim->zip->n_words == 0) {
1918 word->f_len = 0;
1919 *word->f_str = 0;
1920 }
1921
1922 break;
1923 }
1924 }
1925
1926 return(error);
1927 }
1928
1929 /**********************************************************************//**
1930 Run OPTIMIZE on the given FTS index. Note: this can take a very long
1931 time (hours).
1932 @return DB_SUCCESS if all OK */
1933 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1934 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)1935 fts_optimize_index(
1936 /*===============*/
1937 fts_optimize_t* optim, /*!< in: optimize instance */
1938 dict_index_t* index) /*!< in: table with one FTS index */
1939 {
1940 fts_string_t word;
1941 dberr_t error;
1942 byte str[FTS_MAX_WORD_LEN + 1];
1943
1944 /* Set the current index that we have to optimize. */
1945 optim->fts_index_table.index_id = index->id;
1946 optim->fts_index_table.charset = fts_index_get_charset(index);
1947
1948 optim->done = FALSE; /* Optimize until !done */
1949
1950 /* We need to read the last word optimized so that we start from
1951 the next word. */
1952 word.f_str = str;
1953
1954 /* We set the length of word to the size of str since we
1955 need to pass the max len info to the fts_get_config_value() function. */
1956 word.f_len = sizeof(str) - 1;
1957
1958 memset(word.f_str, 0x0, word.f_len);
1959
1960 /* Read the words that will be optimized in this pass. */
1961 error = fts_optimize_index_read_words(optim, index, &word);
1962
1963 if (error == DB_SUCCESS) {
1964 int zip_error;
1965
1966 ut_a(optim->zip->pos == 0);
1967 ut_a(optim->zip->zp->total_in == 0);
1968 ut_a(optim->zip->zp->total_out == 0);
1969
1970 zip_error = inflateInit(optim->zip->zp);
1971 ut_a(zip_error == Z_OK);
1972
1973 word.f_len = 0;
1974 word.f_str = str;
1975
1976 /* Read the first word to optimize from the Zip buffer. */
1977 if (!fts_zip_read_word(optim->zip, &word)) {
1978
1979 optim->done = TRUE;
1980 } else {
1981 fts_optimize_words(optim, index, &word);
1982 }
1983
1984 /* If we couldn't read any records then optimize is
1985 complete. Increment the number of indexes that have
1986 been optimized and set FTS index optimize state to
1987 completed. */
1988 if (error == DB_SUCCESS && optim->zip->n_words == 0) {
1989
1990 error = fts_optimize_index_completed(optim, index);
1991
1992 if (error == DB_SUCCESS) {
1993 ++optim->n_completed;
1994 }
1995 }
1996 }
1997
1998 return(error);
1999 }
2000
2001 /**********************************************************************//**
2002 Delete the document ids in the delete, and delete cache tables.
2003 @return DB_SUCCESS if all OK */
2004 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2005 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2006 fts_optimize_purge_deleted_doc_ids(
2007 /*===============================*/
2008 fts_optimize_t* optim) /*!< in: optimize instance */
2009 {
2010 ulint i;
2011 pars_info_t* info;
2012 que_t* graph;
2013 doc_id_t* update;
2014 doc_id_t write_doc_id;
2015 dberr_t error = DB_SUCCESS;
2016 char deleted[MAX_FULL_NAME_LEN];
2017 char deleted_cache[MAX_FULL_NAME_LEN];
2018
2019 info = pars_info_create();
2020
2021 ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2022
2023 update = static_cast<doc_id_t*>(
2024 ib_vector_get(optim->to_delete->doc_ids, 0));
2025
2026 /* Convert to "storage" byte order. */
2027 fts_write_doc_id((byte*) &write_doc_id, *update);
2028
2029 /* This is required for the SQL parser to work. It must be able
2030 to find the following variables. So we do it twice. */
2031 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2032 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2033
2034 /* Make sure the following two names are consistent with the name
2035 used in the fts_delete_doc_ids_sql */
2036 optim->fts_common_table.suffix = fts_common_tables[3];
2037 fts_get_table_name(&optim->fts_common_table, deleted);
2038 pars_info_bind_id(info, fts_common_tables[3], deleted);
2039
2040 optim->fts_common_table.suffix = fts_common_tables[4];
2041 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2042 pars_info_bind_id(info, fts_common_tables[4], deleted_cache);
2043
2044 graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
2045
2046 /* Delete the doc ids that were copied at the start. */
2047 for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2048
2049 update = static_cast<doc_id_t*>(ib_vector_get(
2050 optim->to_delete->doc_ids, i));
2051
2052 /* Convert to "storage" byte order. */
2053 fts_write_doc_id((byte*) &write_doc_id, *update);
2054
2055 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2056
2057 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2058
2059 error = fts_eval_sql(optim->trx, graph);
2060
2061 // FIXME: Check whether delete actually succeeded!
2062 if (error != DB_SUCCESS) {
2063
2064 fts_sql_rollback(optim->trx);
2065 break;
2066 }
2067 }
2068
2069 fts_que_graph_free(graph);
2070
2071 return(error);
2072 }
2073
2074 /**********************************************************************//**
2075 Delete the document ids in the pending delete, and delete tables.
2076 @return DB_SUCCESS if all OK */
2077 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2078 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2079 fts_optimize_purge_deleted_doc_id_snapshot(
2080 /*=======================================*/
2081 fts_optimize_t* optim) /*!< in: optimize instance */
2082 {
2083 dberr_t error;
2084 que_t* graph;
2085 pars_info_t* info;
2086 char being_deleted[MAX_FULL_NAME_LEN];
2087 char being_deleted_cache[MAX_FULL_NAME_LEN];
2088
2089 info = pars_info_create();
2090
2091 /* Make sure the following two names are consistent with the name
2092 used in the fts_end_delete_sql */
2093 optim->fts_common_table.suffix = fts_common_tables[0];
2094 fts_get_table_name(&optim->fts_common_table, being_deleted);
2095 pars_info_bind_id(info, fts_common_tables[0], being_deleted);
2096
2097 optim->fts_common_table.suffix = fts_common_tables[1];
2098 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2099 pars_info_bind_id(info, fts_common_tables[1], being_deleted_cache);
2100
2101 /* Delete the doc ids that were copied to delete pending state at
2102 the start of optimize. */
2103 graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
2104
2105 error = fts_eval_sql(optim->trx, graph);
2106 fts_que_graph_free(graph);
2107
2108 return(error);
2109 }
2110
2111 /**********************************************************************//**
2112 Copy the deleted doc ids that will be purged during this optimize run
2113 to the being deleted FTS auxiliary tables. The transaction is committed
2114 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2115 @return DB_SUCCESS if all OK */
2116 static
2117 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2118 fts_optimize_being_deleted_count(
2119 /*=============================*/
2120 fts_optimize_t* optim) /*!< in: optimize instance */
2121 {
2122 fts_table_t fts_table;
2123
2124 FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2125 optim->table);
2126
2127 return(fts_get_rows_count(&fts_table));
2128 }
2129
2130 /*********************************************************************//**
2131 Copy the deleted doc ids that will be purged during this optimize run
2132 to the being deleted FTS auxiliary tables. The transaction is committed
2133 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2134 @return DB_SUCCESS if all OK */
2135 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2136 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2137 fts_optimize_create_deleted_doc_id_snapshot(
2138 /*========================================*/
2139 fts_optimize_t* optim) /*!< in: optimize instance */
2140 {
2141 dberr_t error;
2142 que_t* graph;
2143 pars_info_t* info;
2144 char being_deleted[MAX_FULL_NAME_LEN];
2145 char deleted[MAX_FULL_NAME_LEN];
2146 char being_deleted_cache[MAX_FULL_NAME_LEN];
2147 char deleted_cache[MAX_FULL_NAME_LEN];
2148
2149 info = pars_info_create();
2150
2151 /* Make sure the following four names are consistent with the name
2152 used in the fts_init_delete_sql */
2153 optim->fts_common_table.suffix = fts_common_tables[0];
2154 fts_get_table_name(&optim->fts_common_table, being_deleted);
2155 pars_info_bind_id(info, fts_common_tables[0], being_deleted);
2156
2157 optim->fts_common_table.suffix = fts_common_tables[3];
2158 fts_get_table_name(&optim->fts_common_table, deleted);
2159 pars_info_bind_id(info, fts_common_tables[3], deleted);
2160
2161 optim->fts_common_table.suffix = fts_common_tables[1];
2162 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2163 pars_info_bind_id(info, fts_common_tables[1], being_deleted_cache);
2164
2165 optim->fts_common_table.suffix = fts_common_tables[4];
2166 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2167 pars_info_bind_id(info, fts_common_tables[4], deleted_cache);
2168
2169 /* Move doc_ids that are to be deleted to state being deleted. */
2170 graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
2171
2172 error = fts_eval_sql(optim->trx, graph);
2173
2174 fts_que_graph_free(graph);
2175
2176 if (error != DB_SUCCESS) {
2177 fts_sql_rollback(optim->trx);
2178 } else {
2179 fts_sql_commit(optim->trx);
2180 }
2181
2182 optim->del_list_regenerated = TRUE;
2183
2184 return(error);
2185 }
2186
2187 /*********************************************************************//**
2188 Read in the document ids that are to be purged during optimize. The
2189 transaction is committed upon successfully read.
2190 @return DB_SUCCESS if all OK */
2191 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2192 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2193 fts_optimize_read_deleted_doc_id_snapshot(
2194 /*======================================*/
2195 fts_optimize_t* optim) /*!< in: optimize instance */
2196 {
2197 dberr_t error;
2198
2199 optim->fts_common_table.suffix = "BEING_DELETED";
2200
2201 /* Read the doc_ids to delete. */
2202 error = fts_table_fetch_doc_ids(
2203 optim->trx, &optim->fts_common_table, optim->to_delete);
2204
2205 if (error == DB_SUCCESS) {
2206
2207 optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2208
2209 /* Read additional doc_ids to delete. */
2210 error = fts_table_fetch_doc_ids(
2211 optim->trx, &optim->fts_common_table, optim->to_delete);
2212 }
2213
2214 if (error != DB_SUCCESS) {
2215
2216 fts_doc_ids_free(optim->to_delete);
2217 optim->to_delete = NULL;
2218 }
2219
2220 return(error);
2221 }
2222
2223 /*********************************************************************//**
2224 Optimze all the FTS indexes, skipping those that have already been
2225 optimized, since the FTS auxiliary indexes are not guaranteed to be
2226 of the same cardinality.
2227 @return DB_SUCCESS if all OK */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2230 fts_optimize_indexes(
2231 /*=================*/
2232 fts_optimize_t* optim) /*!< in: optimize instance */
2233 {
2234 ulint i;
2235 dberr_t error = DB_SUCCESS;
2236 fts_t* fts = optim->table->fts;
2237
2238 /* Optimize the FTS indexes. */
2239 for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2240 dict_index_t* index;
2241
2242 #ifdef FTS_OPTIMIZE_DEBUG
2243 time_t end_time;
2244 time_t start_time;
2245
2246 /* Get the start and end optimize times for this index. */
2247 error = fts_optimize_get_index_start_time(
2248 optim->trx, index, &start_time);
2249
2250 if (error != DB_SUCCESS) {
2251 break;
2252 }
2253
2254 error = fts_optimize_get_index_end_time(
2255 optim->trx, index, &end_time);
2256
2257 if (error != DB_SUCCESS) {
2258 break;
2259 }
2260
2261 /* Start time will be 0 only for the first time or after
2262 completing the optimization of all FTS indexes. */
2263 if (start_time == 0) {
2264 start_time = time(NULL);
2265
2266 error = fts_optimize_set_index_start_time(
2267 optim->trx, index, start_time);
2268 }
2269
2270 /* Check if this index needs to be optimized or not. */
2271 if (difftime(end_time, start_time) < 0) {
2272 error = fts_optimize_index(optim, index);
2273
2274 if (error != DB_SUCCESS) {
2275 break;
2276 }
2277 } else {
2278 ++optim->n_completed;
2279 }
2280 #endif
2281 index = static_cast<dict_index_t*>(
2282 ib_vector_getp(fts->indexes, i));
2283 error = fts_optimize_index(optim, index);
2284 }
2285
2286 if (error == DB_SUCCESS) {
2287 fts_sql_commit(optim->trx);
2288 } else {
2289 fts_sql_rollback(optim->trx);
2290 }
2291
2292 return(error);
2293 }
2294
2295 /*********************************************************************//**
2296 Cleanup the snapshot tables and the master deleted table.
2297 @return DB_SUCCESS if all OK */
2298 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2299 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2300 fts_optimize_purge_snapshot(
2301 /*========================*/
2302 fts_optimize_t* optim) /*!< in: optimize instance */
2303 {
2304 dberr_t error;
2305
2306 /* Delete the doc ids from the master deleted tables, that were
2307 in the snapshot that was taken at the start of optimize. */
2308 error = fts_optimize_purge_deleted_doc_ids(optim);
2309
2310 if (error == DB_SUCCESS) {
2311 /* Destroy the deleted doc id snapshot. */
2312 error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2313 }
2314
2315 if (error == DB_SUCCESS) {
2316 fts_sql_commit(optim->trx);
2317 } else {
2318 fts_sql_rollback(optim->trx);
2319 }
2320
2321 return(error);
2322 }
2323
2324 /*********************************************************************//**
2325 Reset the start time to 0 so that a new optimize can be started.
2326 @return DB_SUCCESS if all OK */
2327 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2328 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2329 fts_optimize_reset_start_time(
2330 /*==========================*/
2331 fts_optimize_t* optim) /*!< in: optimize instance */
2332 {
2333 dberr_t error = DB_SUCCESS;
2334 #ifdef FTS_OPTIMIZE_DEBUG
2335 fts_t* fts = optim->table->fts;
2336
2337 /* Optimization should have been completed for all indexes. */
2338 ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2339
2340 for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2341 dict_index_t* index;
2342
2343 time_t start_time = 0;
2344
2345 /* Reset the start time to 0 for this index. */
2346 error = fts_optimize_set_index_start_time(
2347 optim->trx, index, start_time);
2348
2349 index = static_cast<dict_index_t*>(
2350 ib_vector_getp(fts->indexes, i));
2351 }
2352 #endif
2353
2354 if (error == DB_SUCCESS) {
2355 fts_sql_commit(optim->trx);
2356 } else {
2357 fts_sql_rollback(optim->trx);
2358 }
2359
2360 return(error);
2361 }
2362
2363 /*********************************************************************//**
2364 Run OPTIMIZE on the given table by a background thread.
2365 @return DB_SUCCESS if all OK */
2366 static MY_ATTRIBUTE((nonnull))
2367 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2368 fts_optimize_table_bk(
2369 /*==================*/
2370 fts_slot_t* slot) /*!< in: table to optimiza */
2371 {
2372 const time_t now = time(NULL);
2373 const ulint interval = ulint(now - slot->last_run);
2374
2375 /* Avoid optimizing tables that were optimized recently. */
2376 if (slot->last_run > 0
2377 && lint(interval) >= 0
2378 && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
2379
2380 return(DB_SUCCESS);
2381 }
2382
2383 dict_table_t* table = slot->table;
2384 dberr_t error;
2385
2386 if (fil_table_accessible(table)
2387 && table->fts && table->fts->cache
2388 && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2389 error = fts_optimize_table(table);
2390
2391 slot->last_run = time(NULL);
2392
2393 if (error == DB_SUCCESS) {
2394 slot->running = false;
2395 slot->completed = slot->last_run;
2396 }
2397 } else {
2398 /* Note time this run completed. */
2399 slot->last_run = now;
2400 error = DB_SUCCESS;
2401 }
2402
2403 return(error);
2404 }
2405 /*********************************************************************//**
2406 Run OPTIMIZE on the given table.
2407 @return DB_SUCCESS if all OK */
2408 dberr_t
fts_optimize_table(dict_table_t * table)2409 fts_optimize_table(
2410 /*===============*/
2411 dict_table_t* table) /*!< in: table to optimiza */
2412 {
2413 if (srv_read_only_mode) {
2414 return DB_READ_ONLY;
2415 }
2416
2417 dberr_t error = DB_SUCCESS;
2418 fts_optimize_t* optim = NULL;
2419 fts_t* fts = table->fts;
2420
2421 if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2422 ib::info() << "FTS start optimize " << table->name;
2423 }
2424
2425 optim = fts_optimize_create(table);
2426
2427 // FIXME: Call this only at the start of optimize, currently we
2428 // rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2429
2430 /* Check whether there are still records in BEING_DELETED table */
2431 if (fts_optimize_being_deleted_count(optim) == 0) {
2432 /* Take a snapshot of the deleted document ids, they are copied
2433 to the BEING_ tables. */
2434 error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2435 }
2436
2437 /* A duplicate error is OK, since we don't erase the
2438 doc ids from the being deleted state until all FTS
2439 indexes have been optimized. */
2440 if (error == DB_DUPLICATE_KEY) {
2441 error = DB_SUCCESS;
2442 }
2443
2444 if (error == DB_SUCCESS) {
2445
2446 /* These document ids will be filtered out during the
2447 index optimization phase. They are in the snapshot that we
2448 took above, at the start of the optimize. */
2449 error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2450
2451 if (error == DB_SUCCESS) {
2452
2453 /* Commit the read of being deleted
2454 doc ids transaction. */
2455 fts_sql_commit(optim->trx);
2456
2457 /* We would do optimization only if there
2458 are deleted records to be cleaned up */
2459 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2460 error = fts_optimize_indexes(optim);
2461 }
2462
2463 } else {
2464 ut_a(optim->to_delete == NULL);
2465 }
2466
2467 /* Only after all indexes have been optimized can we
2468 delete the (snapshot) doc ids in the pending delete,
2469 and master deleted tables. */
2470 if (error == DB_SUCCESS
2471 && optim->n_completed == ib_vector_size(fts->indexes)) {
2472
2473 if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2474 ib::info() << "FTS_OPTIMIZE: Completed"
2475 " Optimize, cleanup DELETED table";
2476 }
2477
2478 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2479
2480 /* Purge the doc ids that were in the
2481 snapshot from the snapshot tables and
2482 the master deleted table. */
2483 error = fts_optimize_purge_snapshot(optim);
2484 }
2485
2486 if (error == DB_SUCCESS) {
2487 /* Reset the start time of all the FTS indexes
2488 so that optimize can be restarted. */
2489 error = fts_optimize_reset_start_time(optim);
2490 }
2491 }
2492 }
2493
2494 fts_optimize_free(optim);
2495
2496 if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2497 ib::info() << "FTS end optimize " << table->name;
2498 }
2499
2500 return(error);
2501 }
2502
2503 /********************************************************************//**
2504 Add the table to add to the OPTIMIZER's list.
2505 @return new message instance */
2506 static
2507 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2508 fts_optimize_create_msg(
2509 /*====================*/
2510 fts_msg_type_t type, /*!< in: type of message */
2511 void* ptr) /*!< in: message payload */
2512 {
2513 mem_heap_t* heap;
2514 fts_msg_t* msg;
2515
2516 heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2517 msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2518
2519 msg->ptr = ptr;
2520 msg->type = type;
2521 msg->heap = heap;
2522
2523 return(msg);
2524 }
2525
2526 /** Add the table to add to the OPTIMIZER's list.
2527 @param[in] table table to add */
fts_optimize_add_table(dict_table_t * table)2528 void fts_optimize_add_table(dict_table_t* table)
2529 {
2530 fts_msg_t* msg;
2531
2532 if (!fts_optimize_wq) {
2533 return;
2534 }
2535
2536 /* If there is no fts index present then don't add to
2537 optimize queue. */
2538 if (!ib_vector_size(table->fts->indexes)) {
2539 return;
2540 }
2541
2542 /* Make sure table with FTS index cannot be evicted */
2543 dict_table_prevent_eviction(table);
2544
2545 msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2546
2547 mutex_enter(&fts_optimize_wq->mutex);
2548
2549 ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2550
2551 table->fts->in_queue = true;
2552
2553 mutex_exit(&fts_optimize_wq->mutex);
2554 }
2555
2556 /**********************************************************************//**
2557 Remove the table from the OPTIMIZER's list. We do wait for
2558 acknowledgement from the consumer of the message. */
2559 void
fts_optimize_remove_table(dict_table_t * table)2560 fts_optimize_remove_table(
2561 /*======================*/
2562 dict_table_t* table) /*!< in: table to remove */
2563 {
2564 fts_msg_t* msg;
2565 os_event_t event;
2566 fts_msg_del_t* remove;
2567
2568 /* if the optimize system not yet initialized, return */
2569 if (!fts_optimize_wq) {
2570 return;
2571 }
2572
2573 /* FTS optimizer thread is already exited */
2574 if (fts_opt_start_shutdown) {
2575 ib::info() << "Try to remove table " << table->name
2576 << " after FTS optimize thread exiting.";
2577 /* If the table can't be removed then wait till
2578 fts optimize thread shuts down */
2579 while (fts_optimize_wq) {
2580 os_thread_sleep(10000);
2581 }
2582 return;
2583 }
2584
2585 mutex_enter(&fts_optimize_wq->mutex);
2586
2587 if (!table->fts->in_queue) {
2588 mutex_exit(&fts_optimize_wq->mutex);
2589 return;
2590 }
2591
2592 msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2593
2594 /* We will wait on this event until signalled by the consumer. */
2595 event = os_event_create(0);
2596
2597 remove = static_cast<fts_msg_del_t*>(
2598 mem_heap_alloc(msg->heap, sizeof(*remove)));
2599
2600 remove->table = table;
2601 remove->event = event;
2602 msg->ptr = remove;
2603
2604 ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2605
2606 mutex_exit(&fts_optimize_wq->mutex);
2607
2608 os_event_wait(event);
2609
2610 os_event_destroy(event);
2611
2612 #ifdef UNIV_DEBUG
2613 if (!fts_opt_start_shutdown) {
2614 mutex_enter(&fts_optimize_wq->mutex);
2615 ut_ad(!table->fts->in_queue);
2616 mutex_exit(&fts_optimize_wq->mutex);
2617 }
2618 #endif /* UNIV_DEBUG */
2619 }
2620
2621 /** Send sync fts cache for the table.
2622 @param[in] table table to sync */
2623 void
fts_optimize_request_sync_table(dict_table_t * table)2624 fts_optimize_request_sync_table(
2625 dict_table_t* table)
2626 {
2627 /* if the optimize system not yet initialized, return */
2628 if (!fts_optimize_wq) {
2629 return;
2630 }
2631
2632 /* FTS optimizer thread is already exited */
2633 if (fts_opt_start_shutdown) {
2634 ib::info() << "Try to sync table " << table->name
2635 << " after FTS optimize thread exiting.";
2636 return;
2637 }
2638
2639 fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
2640
2641 mutex_enter(&fts_optimize_wq->mutex);
2642
2643 ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2644
2645 DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
2646 DBUG_ASSERT(fts_optimize_wq->length <= 1000););
2647
2648 mutex_exit(&fts_optimize_wq->mutex);
2649 }
2650
2651 /** Add a table to fts_slots if it doesn't already exist. */
fts_optimize_new_table(dict_table_t * table)2652 static bool fts_optimize_new_table(dict_table_t* table)
2653 {
2654 ut_ad(table);
2655
2656 ulint i;
2657 fts_slot_t* slot;
2658 fts_slot_t* empty = NULL;
2659
2660 /* Search for duplicates, also find a free slot if one exists. */
2661 for (i = 0; i < ib_vector_size(fts_slots); ++i) {
2662
2663 slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
2664
2665 if (!slot->table) {
2666 empty = slot;
2667 } else if (slot->table == table) {
2668 /* Already exists in our optimize queue. */
2669 return false;
2670 }
2671 }
2672
2673 slot = empty ? empty : static_cast<fts_slot_t*>(
2674 ib_vector_push(fts_slots, NULL));
2675
2676 memset(slot, 0x0, sizeof(*slot));
2677
2678 slot->table = table;
2679 return true;
2680 }
2681
2682 /** Remove a table from fts_slots if it exists.
2683 @param[in,out] table table to be removed from fts_slots */
fts_optimize_del_table(const dict_table_t * table)2684 static bool fts_optimize_del_table(const dict_table_t* table)
2685 {
2686 ut_ad(table);
2687 for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2688 fts_slot_t* slot;
2689
2690 slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
2691
2692 if (slot->table == table) {
2693 if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2694 ib::info() << "FTS Optimize Removing table "
2695 << table->name;
2696 }
2697
2698 mutex_enter(&fts_optimize_wq->mutex);
2699 slot->table->fts->in_queue = false;
2700 mutex_exit(&fts_optimize_wq->mutex);
2701 slot->table = NULL;
2702 return true;
2703 }
2704 }
2705
2706 return false;
2707 }
2708
2709 /**********************************************************************//**
2710 Calculate how many tables in fts_slots need to be optimized.
2711 @return no. of tables to optimize */
fts_optimize_how_many()2712 static ulint fts_optimize_how_many()
2713 {
2714 ulint n_tables = 0;
2715 const time_t current_time = time(NULL);
2716
2717 for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2718 const fts_slot_t* slot = static_cast<const fts_slot_t*>(
2719 ib_vector_get_const(fts_slots, i));
2720 if (!slot->table) {
2721 continue;
2722 }
2723
2724 const time_t end = slot->running
2725 ? slot->last_run : slot->completed;
2726 ulint interval = ulint(current_time - end);
2727
2728 if (lint(interval) < 0
2729 || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
2730 ++n_tables;
2731 }
2732 }
2733
2734 return(n_tables);
2735 }
2736
2737 /**********************************************************************//**
2738 Check if the total memory used by all FTS table exceeds the maximum limit.
2739 @return true if a sync is needed, false otherwise */
fts_is_sync_needed()2740 static bool fts_is_sync_needed()
2741 {
2742 ulint total_memory = 0;
2743 const time_t now = time(NULL);
2744 double time_diff = difftime(now, last_check_sync_time);
2745
2746 if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
2747 return(false);
2748 }
2749
2750 last_check_sync_time = now;
2751
2752 for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2753 const fts_slot_t* slot = static_cast<const fts_slot_t*>(
2754 ib_vector_get_const(fts_slots, i));
2755
2756 if (!slot->table) {
2757 continue;
2758 }
2759
2760 if (slot->table->fts && slot->table->fts->cache) {
2761 total_memory += slot->table->fts->cache->total_size;
2762 }
2763
2764 if (total_memory > fts_max_total_cache_size) {
2765 return(true);
2766 }
2767 }
2768
2769 return(false);
2770 }
2771
2772 /** Sync fts cache of a table
2773 @param[in,out] table table to be synced */
fts_optimize_sync_table(dict_table_t * table)2774 static void fts_optimize_sync_table(dict_table_t* table)
2775 {
2776 if (table->fts && table->fts->cache && fil_table_accessible(table)) {
2777 fts_sync_table(table, false);
2778 }
2779
2780 DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
2781 }
2782
2783 /**********************************************************************//**
2784 Optimize all FTS tables.
2785 @return Dummy return */
2786 static
2787 os_thread_ret_t
DECLARE_THREAD(fts_optimize_thread)2788 DECLARE_THREAD(fts_optimize_thread)(
2789 /*================*/
2790 void* arg) /*!< in: work queue*/
2791 {
2792 ulint current = 0;
2793 ibool done = FALSE;
2794 ulint n_tables = 0;
2795 ulint n_optimize = 0;
2796 ib_wqueue_t* wq = (ib_wqueue_t*) arg;
2797
2798 ut_ad(!srv_read_only_mode);
2799 my_thread_init();
2800
2801 ut_ad(fts_slots);
2802
2803 /* Assign number of tables added in fts_slots_t to n_tables */
2804 n_tables = ib_vector_size(fts_slots);
2805
2806 while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
2807 /* If there is no message in the queue and we have tables
2808 to optimize then optimize the tables. */
2809
2810 if (!done
2811 && ib_wqueue_is_empty(wq)
2812 && n_tables > 0
2813 && n_optimize > 0) {
2814 fts_slot_t* slot = static_cast<fts_slot_t*>(
2815 ib_vector_get(fts_slots, current));
2816
2817 /* Handle the case of empty slots. */
2818 if (slot->table) {
2819 slot->running = true;
2820 fts_optimize_table_bk(slot);
2821 }
2822
2823 /* Wrap around the counter. */
2824 if (++current >= ib_vector_size(fts_slots)) {
2825 n_optimize = fts_optimize_how_many();
2826 current = 0;
2827 }
2828
2829 } else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
2830 fts_msg_t* msg;
2831
2832 msg = static_cast<fts_msg_t*>(
2833 ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS));
2834
2835 /* Timeout ? */
2836 if (msg == NULL) {
2837 if (fts_is_sync_needed()) {
2838 fts_need_sync = true;
2839 }
2840
2841 continue;
2842 }
2843
2844 switch (msg->type) {
2845 case FTS_MSG_STOP:
2846 done = TRUE;
2847 break;
2848
2849 case FTS_MSG_ADD_TABLE:
2850 ut_a(!done);
2851 if (fts_optimize_new_table(
2852 static_cast<dict_table_t*>(
2853 msg->ptr))) {
2854 ++n_tables;
2855 }
2856 break;
2857
2858 case FTS_MSG_DEL_TABLE:
2859 if (fts_optimize_del_table(
2860 static_cast<fts_msg_del_t*>(
2861 msg->ptr)->table)) {
2862 --n_tables;
2863 }
2864
2865 /* Signal the producer that we have
2866 removed the table. */
2867 os_event_set(
2868 ((fts_msg_del_t*) msg->ptr)->event);
2869 break;
2870
2871 case FTS_MSG_SYNC_TABLE:
2872 DBUG_EXECUTE_IF(
2873 "fts_instrument_msg_sync_sleep",
2874 os_thread_sleep(300000););
2875
2876 fts_optimize_sync_table(
2877 static_cast<dict_table_t*>(msg->ptr));
2878 break;
2879
2880 default:
2881 ut_error;
2882 }
2883
2884 mem_heap_free(msg->heap);
2885 n_optimize = done ? 0 : fts_optimize_how_many();
2886 }
2887 }
2888
2889 /* Server is being shutdown, sync the data from FTS cache to disk
2890 if needed */
2891 if (n_tables > 0) {
2892 for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
2893 fts_slot_t* slot = static_cast<fts_slot_t*>(
2894 ib_vector_get(fts_slots, i));
2895
2896 if (slot->table) {
2897 fts_optimize_sync_table(slot->table);
2898 }
2899 }
2900 }
2901
2902 ib_vector_free(fts_slots);
2903 fts_slots = NULL;
2904
2905 ib::info() << "FTS optimize thread exiting.";
2906
2907 os_event_set(fts_opt_shutdown_event);
2908 my_thread_end();
2909
2910 /* We count the number of threads in os_thread_exit(). A created
2911 thread should always use that to exit and not use return() to exit. */
2912 os_thread_exit();
2913
2914 OS_THREAD_DUMMY_RETURN;
2915 }
2916
2917 /**********************************************************************//**
2918 Startup the optimize thread and create the work queue. */
2919 void
fts_optimize_init(void)2920 fts_optimize_init(void)
2921 /*===================*/
2922 {
2923 mem_heap_t* heap;
2924 ib_alloc_t* heap_alloc;
2925
2926 ut_ad(!srv_read_only_mode);
2927
2928 /* For now we only support one optimize thread. */
2929 ut_a(!fts_optimize_wq);
2930
2931 /* Create FTS optimize work queue */
2932 fts_optimize_wq = ib_wqueue_create();
2933 ut_a(fts_optimize_wq != NULL);
2934
2935 /* Create FTS vector to store fts_slot_t */
2936 heap = mem_heap_create(sizeof(dict_table_t*) * 64);
2937 heap_alloc = ib_heap_allocator_create(heap);
2938 fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
2939
2940 /* Add fts tables to fts_slots which could be skipped
2941 during dict_load_table_one() because fts_optimize_thread
2942 wasn't even started. */
2943 mutex_enter(&dict_sys.mutex);
2944 for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys.table_LRU);
2945 table != NULL;
2946 table = UT_LIST_GET_NEXT(table_LRU, table)) {
2947 if (!table->fts || !dict_table_has_fts_index(table)) {
2948 continue;
2949 }
2950
2951 /* fts_optimize_thread is not started yet. So there is no
2952 need to acquire fts_optimize_wq->mutex for adding the fts
2953 table to the fts slots. */
2954 ut_ad(!table->can_be_evicted);
2955 fts_optimize_new_table(table);
2956 table->fts->in_queue = true;
2957 }
2958 mutex_exit(&dict_sys.mutex);
2959
2960 fts_opt_shutdown_event = os_event_create(0);
2961 last_check_sync_time = time(NULL);
2962
2963 os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
2964 }
2965
2966 /** Shutdown fts optimize thread. */
2967 void
fts_optimize_shutdown()2968 fts_optimize_shutdown()
2969 {
2970 ut_ad(!srv_read_only_mode);
2971
2972 fts_msg_t* msg;
2973
2974 /* If there is an ongoing activity on dictionary, such as
2975 srv_master_evict_from_table_cache(), wait for it */
2976 dict_mutex_enter_for_mysql();
2977
2978 /* Tells FTS optimizer system that we are exiting from
2979 optimizer thread, message send their after will not be
2980 processed */
2981 fts_opt_start_shutdown = true;
2982 dict_mutex_exit_for_mysql();
2983
2984 /* We tell the OPTIMIZE thread to switch to state done, we
2985 can't delete the work queue here because the add thread needs
2986 deregister the FTS tables. */
2987
2988 msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
2989
2990 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2991
2992 os_event_wait(fts_opt_shutdown_event);
2993
2994 os_event_destroy(fts_opt_shutdown_event);
2995
2996 ib_wqueue_free(fts_optimize_wq);
2997 fts_optimize_wq = NULL;
2998 }
2999