1 /*****************************************************************************
2
3 Copyright (c) 2007, 2021, Oracle and/or its affiliates.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /******************************************************************//**
28 @file fts/fts0opt.cc
29 Full Text Search optimize thread
30
31 Created 2007/03/27 Sunny Bains
32 Completed 2011/7/10 Sunny and Jimmy Yang
33
34 ***********************************************************************/
35
36 #include "ha_prototypes.h"
37
38 #include "fts0fts.h"
39 #include "row0sel.h"
40 #include "que0types.h"
41 #include "fts0priv.h"
42 #include "fts0types.h"
43 #include "ut0wqueue.h"
44 #include "srv0start.h"
45 #include "ut0list.h"
46 #include "zlib.h"
47
48 #ifdef UNIV_NONINL
49 #include "fts0types.ic"
50 #include "fts0vlc.ic"
51 #endif
52
53 /** The FTS optimize thread's work queue. */
54 static ib_wqueue_t* fts_optimize_wq;
55
56 /** The FTS vector to store fts_slot_t */
57 static ib_vector_t* fts_slots;
58
59 /** Time to wait for a message. */
60 static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;
61
62 /** Default optimize interval in secs. */
63 static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
64
65 /** Server is shutting down, so does we exiting the optimize thread */
66 static bool fts_opt_start_shutdown = false;
67
68 /** Event to wait for shutdown of the optimize thread */
69 static os_event_t fts_opt_shutdown_event = NULL;
70
71 /** Initial size of nodes in fts_word_t. */
72 static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
73
74 /** Last time we did check whether system need a sync */
75 static ib_time_monotonic_t last_check_sync_time;
76
/** State of a table within the optimization sub system.
The enumerators keep their implicit values (0..4, in declaration order). */
enum fts_state_t {
	FTS_STATE_LOADED,
	FTS_STATE_RUNNING,
	FTS_STATE_SUSPENDED,
	FTS_STATE_DONE,
	FTS_STATE_EMPTY
};
85
/** Types of the messages that can be sent to the FTS optimize thread. */
enum fts_msg_type_t {
	/** Stop optimizing and exit the thread */
	FTS_MSG_STOP,

	/** Add a table to the optimize thread's work queue */
	FTS_MSG_ADD_TABLE,

	/** Remove a table from the optimize thread's work queue */
	FTS_MSG_DEL_TABLE,

	/** Sync the fts cache of a table */
	FTS_MSG_SYNC_TABLE
};
97
98 /** Compressed list of words that have been read from FTS INDEX
99 that needs to be optimized. */
100 struct fts_zip_t {
101 lint status; /*!< Status of (un)/zip operation */
102
103 ulint n_words; /*!< Number of words compressed */
104
105 ulint block_sz; /*!< Size of a block in bytes */
106
107 ib_vector_t* blocks; /*!< Vector of compressed blocks */
108
109 ib_alloc_t* heap_alloc; /*!< Heap to use for allocations */
110
111 ulint pos; /*!< Offset into blocks */
112
113 ulint last_big_block; /*!< Offset of last block in the
114 blocks array that is of size
115 block_sz. Blocks beyond this offset
116 are of size FTS_MAX_WORD_LEN */
117
118 z_streamp zp; /*!< ZLib state */
119
120 /*!< The value of the last word read
121 from the FTS INDEX table. This is
122 used to discard duplicates */
123
124 fts_string_t word; /*!< UTF-8 string */
125
126 ulint max_words; /*!< maximum number of words to read
127 in one pase */
128 };
129
130 /** Prepared statemets used during optimize */
131 struct fts_optimize_graph_t {
132 /*!< Delete a word from FTS INDEX */
133 que_t* delete_nodes_graph;
134 /*!< Insert a word into FTS INDEX */
135 que_t* write_nodes_graph;
136 /*!< COMMIT a transaction */
137 que_t* commit_graph;
138 /*!< Read the nodes from FTS_INDEX */
139 que_t* read_nodes_graph;
140 };
141
142 /** Used by fts_optimize() to store state. */
143 struct fts_optimize_t {
144 trx_t* trx; /*!< The transaction used for all SQL */
145
146 ib_alloc_t* self_heap; /*!< Heap to use for allocations */
147
148 char* name_prefix; /*!< FTS table name prefix */
149
150 fts_table_t fts_index_table;/*!< Common table definition */
151
152 /*!< Common table definition */
153 fts_table_t fts_common_table;
154
155 dict_table_t* table; /*!< Table that has to be queried */
156
157 dict_index_t* index; /*!< The FTS index to be optimized */
158
159 fts_doc_ids_t* to_delete; /*!< doc ids to delete, we check against
160 this vector and purge the matching
161 entries during the optimizing
162 process. The vector entries are
163 sorted on doc id */
164
165 ulint del_pos; /*!< Offset within to_delete vector,
166 this is used to keep track of where
167 we are up to in the vector */
168
169 ibool done; /*!< TRUE when optimize finishes */
170
171 ib_vector_t* words; /*!< Word + Nodes read from FTS_INDEX,
172 it contains instances of fts_word_t */
173
174 fts_zip_t* zip; /*!< Words read from the FTS_INDEX */
175
176 fts_optimize_graph_t /*!< Prepared statements used during */
177 graph; /*optimize */
178
179 ulint n_completed; /*!< Number of FTS indexes that have
180 been optimized */
181 ibool del_list_regenerated;
182 /*!< BEING_DELETED list regenarated */
183 };
184
185 /** Used by the optimize, to keep state during compacting nodes. */
186 struct fts_encode_t {
187 doc_id_t src_last_doc_id;/*!< Last doc id read from src node */
188 byte* src_ilist_ptr; /*!< Current ptr within src ilist */
189 };
190
191 /** We use this information to determine when to start the optimize
192 cycle for a table. */
193 struct fts_slot_t {
194 dict_table_t* table; /*!< Table to optimize */
195
196 table_id_t table_id; /*!< Table id */
197
198 fts_state_t state; /*!< State of this slot */
199
200 ulint added; /*!< Number of doc ids added since the
201 last time this table was optimized */
202
203 ulint deleted; /*!< Number of doc ids deleted since the
204 last time this table was optimized */
205
206 ib_time_monotonic_t last_run; /*!< Time last run completed */
207
208 ib_time_monotonic_t completed; /*!< Optimize finish time */
209
210 ib_time_t interval_time; /*!< Minimum time to wait before
211 optimizing the table again. */
212 };
213
214 /** A table remove message for the FTS optimize thread. */
215 struct fts_msg_del_t {
216 dict_table_t* table; /*!< The table to remove */
217
218 os_event_t event; /*!< Event to synchronize acknowledgement
219 of receipt and processing of the
220 this message by the consumer */
221 };
222
223 /** The FTS optimize message work queue message type. */
224 struct fts_msg_t {
225 fts_msg_type_t type; /*!< Message type */
226
227 void* ptr; /*!< The message contents */
228
229 mem_heap_t* heap; /*!< The heap used to allocate this
230 message, the message consumer will
231 free the heap. */
232 };
233
234 /** The number of words to read and optimize in a single pass. */
235 ulong fts_num_word_optimize;
236
237 /** Whether to enable additional FTS diagnostic printout. */
238 char fts_enable_diag_print;
239
240 /** ZLib compressed block size.*/
241 static ulint FTS_ZIP_BLOCK_SIZE = 1024;
242
243 /** The amount of time optimizing in a single pass, in milliseconds. */
244 static ib_time_t fts_optimize_time_limit = 0;
245
246 /** It's defined in fts0fts.cc */
247 extern const char* fts_common_tables[];
248
249 /** SQL Statement for changing state of rows to be deleted from FTS Index. */
250 static const char* fts_init_delete_sql =
251 "BEGIN\n"
252 "\n"
253 "INSERT INTO $BEING_DELETED\n"
254 "SELECT doc_id FROM $DELETED;\n"
255 "\n"
256 "INSERT INTO $BEING_DELETED_CACHE\n"
257 "SELECT doc_id FROM $DELETED_CACHE;\n";
258
259 static const char* fts_delete_doc_ids_sql =
260 "BEGIN\n"
261 "\n"
262 "DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
263 "DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";
264
265 static const char* fts_end_delete_sql =
266 "BEGIN\n"
267 "\n"
268 "DELETE FROM $BEING_DELETED;\n"
269 "DELETE FROM $BEING_DELETED_CACHE;\n";
270
271 /**********************************************************************//**
272 Initialize fts_zip_t. */
273 static
274 void
fts_zip_initialize(fts_zip_t * zip)275 fts_zip_initialize(
276 /*===============*/
277 fts_zip_t* zip) /*!< out: zip instance to initialize */
278 {
279 zip->pos = 0;
280 zip->n_words = 0;
281
282 zip->status = Z_OK;
283
284 zip->last_big_block = 0;
285
286 zip->word.f_len = 0;
287 memset(zip->word.f_str, 0, FTS_MAX_WORD_LEN);
288
289 ib_vector_reset(zip->blocks);
290
291 memset(zip->zp, 0, sizeof(*zip->zp));
292 }
293
294 /**********************************************************************//**
295 Create an instance of fts_zip_t.
296 @return a new instance of fts_zip_t */
297 static
298 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)299 fts_zip_create(
300 /*===========*/
301 mem_heap_t* heap, /*!< in: heap */
302 ulint block_sz, /*!< in: size of a zip block.*/
303 ulint max_words) /*!< in: max words to read */
304 {
305 fts_zip_t* zip;
306
307 zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
308
309 zip->word.f_str = static_cast<byte*>(
310 mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
311
312 zip->block_sz = block_sz;
313
314 zip->heap_alloc = ib_heap_allocator_create(heap);
315
316 zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
317
318 zip->max_words = max_words;
319
320 zip->zp = static_cast<z_stream*>(
321 mem_heap_zalloc(heap, sizeof(*zip->zp)));
322
323 return(zip);
324 }
325
326 /**********************************************************************//**
327 Initialize an instance of fts_zip_t. */
328 static
329 void
fts_zip_init(fts_zip_t * zip)330 fts_zip_init(
331 /*=========*/
332
333 fts_zip_t* zip) /*!< in: zip instance to init */
334 {
335 memset(zip->zp, 0, sizeof(*zip->zp));
336
337 zip->word.f_len = 0;
338 *zip->word.f_str = '\0';
339 }
340
341 /**********************************************************************//**
342 Create a fts_optimizer_word_t instance.
343 @return new instance */
344 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)345 fts_word_init(
346 /*==========*/
347 fts_word_t* word, /*!< in: word to initialize */
348 byte* utf8, /*!< in: UTF-8 string */
349 ulint len) /*!< in: length of string in bytes */
350 {
351 mem_heap_t* heap = mem_heap_create(sizeof(fts_node_t));
352
353 memset(word, 0, sizeof(*word));
354
355 word->text.f_len = len;
356 word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
357
358 /* Need to copy the NUL character too. */
359 memcpy(word->text.f_str, utf8, word->text.f_len);
360 word->text.f_str[word->text.f_len] = 0;
361
362 word->heap_alloc = ib_heap_allocator_create(heap);
363
364 word->nodes = ib_vector_create(
365 word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
366
367 return(word);
368 }
369
370 /**********************************************************************//**
371 Read the FTS INDEX row.
372 @return fts_node_t instance */
373 static
374 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)375 fts_optimize_read_node(
376 /*===================*/
377 fts_word_t* word, /*!< in: */
378 que_node_t* exp) /*!< in: */
379 {
380 int i;
381 fts_node_t* node = static_cast<fts_node_t*>(
382 ib_vector_push(word->nodes, NULL));
383
384 /* Start from 1 since the first node has been read by the caller */
385 for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
386
387 dfield_t* dfield = que_node_get_val(exp);
388 byte* data = static_cast<byte*>(
389 dfield_get_data(dfield));
390 ulint len = dfield_get_len(dfield);
391
392 ut_a(len != UNIV_SQL_NULL);
393
394 /* Note: The column numbers below must match the SELECT */
395 switch (i) {
396 case 1: /* DOC_COUNT */
397 node->doc_count = mach_read_from_4(data);
398 break;
399
400 case 2: /* FIRST_DOC_ID */
401 node->first_doc_id = fts_read_doc_id(data);
402 break;
403
404 case 3: /* LAST_DOC_ID */
405 node->last_doc_id = fts_read_doc_id(data);
406 break;
407
408 case 4: /* ILIST */
409 node->ilist_size_alloc = node->ilist_size = len;
410 node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
411 memcpy(node->ilist, data, len);
412 break;
413
414 default:
415 ut_error;
416 }
417 }
418
419 /* Make sure all columns were read. */
420 ut_a(i == 5);
421
422 return(node);
423 }
424
425 /**********************************************************************//**
426 Callback function to fetch the rows in an FTS INDEX record.
427 @return always returns non-NULL */
428 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)429 fts_optimize_index_fetch_node(
430 /*==========================*/
431 void* row, /*!< in: sel_node_t* */
432 void* user_arg) /*!< in: pointer to ib_vector_t */
433 {
434 fts_word_t* word;
435 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
436 fts_fetch_t* fetch = static_cast<fts_fetch_t*>(user_arg);
437 ib_vector_t* words = static_cast<ib_vector_t*>(fetch->read_arg);
438 que_node_t* exp = sel_node->select_list;
439 dfield_t* dfield = que_node_get_val(exp);
440 void* data = dfield_get_data(dfield);
441 ulint dfield_len = dfield_get_len(dfield);
442 fts_node_t* node;
443 bool is_word_init = false;
444
445 ut_a(dfield_len <= FTS_MAX_WORD_LEN);
446
447 if (ib_vector_size(words) == 0) {
448
449 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
450 fts_word_init(word, (byte*) data, dfield_len);
451 is_word_init = true;
452 }
453
454 word = static_cast<fts_word_t*>(ib_vector_last(words));
455
456 if (dfield_len != word->text.f_len
457 || memcmp(word->text.f_str, data, dfield_len)) {
458
459 word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
460 fts_word_init(word, (byte*) data, dfield_len);
461 is_word_init = true;
462 }
463
464 node = fts_optimize_read_node(word, que_node_get_next(exp));
465
466 fetch->total_memory += node->ilist_size;
467 if (is_word_init) {
468 fetch->total_memory += sizeof(fts_word_t)
469 + sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
470 + sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
471 } else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
472 fetch->total_memory += sizeof(fts_node_t);
473 }
474
475 if (fetch->total_memory >= fts_result_cache_limit) {
476 return(FALSE);
477 }
478
479 return(TRUE);
480 }
481
482 /**********************************************************************//**
483 Read the rows from the FTS inde.
484 @return DB_SUCCESS or error code */
485 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)486 fts_index_fetch_nodes(
487 /*==================*/
488 trx_t* trx, /*!< in: transaction */
489 que_t** graph, /*!< in: prepared statement */
490 fts_table_t* fts_table, /*!< in: table of the FTS INDEX */
491 const fts_string_t*
492 word, /*!< in: the word to fetch */
493 fts_fetch_t* fetch) /*!< in: fetch callback.*/
494 {
495 pars_info_t* info;
496 dberr_t error;
497 char table_name[MAX_FULL_NAME_LEN];
498
499 trx->op_info = "fetching FTS index nodes";
500
501 if (*graph) {
502 info = (*graph)->info;
503 } else {
504 ulint selected;
505
506 info = pars_info_create();
507
508 ut_a(fts_table->type == FTS_INDEX_TABLE);
509
510 selected = fts_select_index(fts_table->charset,
511 word->f_str, word->f_len);
512
513 fts_table->suffix = fts_get_suffix(selected);
514
515 fts_get_table_name(fts_table, table_name);
516
517 pars_info_bind_id(info, true, "table_name", table_name);
518 }
519
520 pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
521 pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
522
523 if (!*graph) {
524
525 *graph = fts_parse_sql(
526 fts_table,
527 info,
528 "DECLARE FUNCTION my_func;\n"
529 "DECLARE CURSOR c IS"
530 " SELECT word, doc_count, first_doc_id, last_doc_id,"
531 " ilist\n"
532 " FROM $table_name\n"
533 " WHERE word LIKE :word\n"
534 " ORDER BY first_doc_id;\n"
535 "BEGIN\n"
536 "\n"
537 "OPEN c;\n"
538 "WHILE 1 = 1 LOOP\n"
539 " FETCH c INTO my_func();\n"
540 " IF c % NOTFOUND THEN\n"
541 " EXIT;\n"
542 " END IF;\n"
543 "END LOOP;\n"
544 "CLOSE c;");
545 }
546
547 for (;;) {
548 error = fts_eval_sql(trx, *graph);
549
550 if (error == DB_SUCCESS) {
551 fts_sql_commit(trx);
552
553 break; /* Exit the loop. */
554 } else {
555 fts_sql_rollback(trx);
556
557 if (error == DB_LOCK_WAIT_TIMEOUT) {
558 ib::warn() << "lock wait timeout reading"
559 " FTS index. Retrying!";
560
561 trx->error_state = DB_SUCCESS;
562 } else {
563 ib::error() << "(" << ut_strerr(error)
564 << ") while reading FTS index.";
565
566 break; /* Exit the loop. */
567 }
568 }
569 }
570
571 return(error);
572 }
573
574 /**********************************************************************//**
575 Read a word */
576 static
577 byte*
fts_zip_read_word(fts_zip_t * zip,fts_string_t * word)578 fts_zip_read_word(
579 /*==============*/
580 fts_zip_t* zip, /*!< in: Zip state + data */
581 fts_string_t* word) /*!< out: uncompressed word */
582 {
583 short len = 0;
584 void* null = NULL;
585 byte* ptr = word->f_str;
586 int flush = Z_NO_FLUSH;
587
588 /* Either there was an error or we are at the Z_STREAM_END. */
589 if (zip->status != Z_OK) {
590 return(NULL);
591 }
592
593 zip->zp->next_out = reinterpret_cast<byte*>(&len);
594 zip->zp->avail_out = sizeof(len);
595
596 while (zip->status == Z_OK && zip->zp->avail_out > 0) {
597
598 /* Finished decompressing block. */
599 if (zip->zp->avail_in == 0) {
600
601 /* Free the block thats been decompressed. */
602 if (zip->pos > 0) {
603 ulint prev = zip->pos - 1;
604
605 ut_a(zip->pos < ib_vector_size(zip->blocks));
606
607 ut_free(ib_vector_getp(zip->blocks, prev));
608 ib_vector_set(zip->blocks, prev, &null);
609 }
610
611 /* Any more blocks to decompress. */
612 if (zip->pos < ib_vector_size(zip->blocks)) {
613
614 zip->zp->next_in = static_cast<byte*>(
615 ib_vector_getp(
616 zip->blocks, zip->pos));
617
618 if (zip->pos > zip->last_big_block) {
619 zip->zp->avail_in =
620 FTS_MAX_WORD_LEN;
621 } else {
622 zip->zp->avail_in =
623 static_cast<uInt>(zip->block_sz);
624 }
625
626 ++zip->pos;
627 } else {
628 flush = Z_FINISH;
629 }
630 }
631
632 switch (zip->status = inflate(zip->zp, flush)) {
633 case Z_OK:
634 if (zip->zp->avail_out == 0 && len > 0) {
635
636 ut_a(len <= FTS_MAX_WORD_LEN);
637 ptr[len] = 0;
638
639 zip->zp->next_out = ptr;
640 zip->zp->avail_out = len;
641
642 word->f_len = len;
643 len = 0;
644 }
645 break;
646
647 case Z_BUF_ERROR: /* No progress possible. */
648 case Z_STREAM_END:
649 inflateEnd(zip->zp);
650 break;
651
652 case Z_STREAM_ERROR:
653 default:
654 ut_error;
655 }
656 }
657
658 /* All blocks must be freed at end of inflate. */
659 if (zip->status != Z_OK) {
660 for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
661 if (ib_vector_getp(zip->blocks, i)) {
662 ut_free(ib_vector_getp(zip->blocks, i));
663 ib_vector_set(zip->blocks, i, &null);
664 }
665 }
666 }
667
668 if (ptr != NULL) {
669 ut_ad(word->f_len == strlen((char*) ptr));
670 }
671
672 return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
673 }
674
675 /**********************************************************************//**
676 Callback function to fetch and compress the word in an FTS
677 INDEX record.
678 @return FALSE on EOF */
679 static
680 ibool
fts_fetch_index_words(void * row,void * user_arg)681 fts_fetch_index_words(
682 /*==================*/
683 void* row, /*!< in: sel_node_t* */
684 void* user_arg) /*!< in: pointer to ib_vector_t */
685 {
686 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
687 fts_zip_t* zip = static_cast<fts_zip_t*>(user_arg);
688 que_node_t* exp = sel_node->select_list;
689 dfield_t* dfield = que_node_get_val(exp);
690 ushort len = static_cast<ushort>(dfield_get_len(dfield));
691 void* data = dfield_get_data(dfield);
692
693 /* Skip the duplicate words. */
694 if (zip->word.f_len == static_cast<ulint>(len)
695 && !memcmp(zip->word.f_str, data, len)) {
696
697 return(TRUE);
698 }
699
700 ut_a(len <= FTS_MAX_WORD_LEN);
701
702 memcpy(zip->word.f_str, data, len);
703 zip->word.f_len = len;
704
705 ut_a(zip->zp->avail_in == 0);
706 ut_a(zip->zp->next_in == NULL);
707
708 /* The string is prefixed by len. */
709 zip->zp->next_in = reinterpret_cast<byte*>(&len);
710 zip->zp->avail_in = sizeof(len);
711
712 /* Compress the word, create output blocks as necessary. */
713 while (zip->zp->avail_in > 0) {
714
715 /* No space left in output buffer, create a new one. */
716 if (zip->zp->avail_out == 0) {
717 byte* block;
718
719 block = static_cast<byte*>(
720 ut_malloc_nokey(zip->block_sz));
721
722 ib_vector_push(zip->blocks, &block);
723
724 zip->zp->next_out = block;
725 zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
726 }
727
728 switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
729 case Z_OK:
730 if (zip->zp->avail_in == 0) {
731 zip->zp->next_in = static_cast<byte*>(data);
732 zip->zp->avail_in = len;
733 ut_a(len <= FTS_MAX_WORD_LEN);
734 len = 0;
735 }
736 break;
737
738 case Z_STREAM_END:
739 case Z_BUF_ERROR:
740 case Z_STREAM_ERROR:
741 default:
742 ut_error;
743 break;
744 }
745 }
746
747 /* All data should have been compressed. */
748 ut_a(zip->zp->avail_in == 0);
749 zip->zp->next_in = NULL;
750
751 ++zip->n_words;
752
753 return(zip->n_words >= zip->max_words ? FALSE : TRUE);
754 }
755
756 /**********************************************************************//**
757 Finish Zip deflate. */
758 static
759 void
fts_zip_deflate_end(fts_zip_t * zip)760 fts_zip_deflate_end(
761 /*================*/
762 fts_zip_t* zip) /*!< in: instance that should be closed*/
763 {
764 ut_a(zip->zp->avail_in == 0);
765 ut_a(zip->zp->next_in == NULL);
766
767 zip->status = deflate(zip->zp, Z_FINISH);
768
769 ut_a(ib_vector_size(zip->blocks) > 0);
770 zip->last_big_block = ib_vector_size(zip->blocks) - 1;
771
772 /* Allocate smaller block(s), since this is trailing data. */
773 while (zip->status == Z_OK) {
774 byte* block;
775
776 ut_a(zip->zp->avail_out == 0);
777
778 block = static_cast<byte*>(
779 ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
780
781 ib_vector_push(zip->blocks, &block);
782
783 zip->zp->next_out = block;
784 zip->zp->avail_out = FTS_MAX_WORD_LEN;
785
786 zip->status = deflate(zip->zp, Z_FINISH);
787 }
788
789 ut_a(zip->status == Z_STREAM_END);
790
791 zip->status = deflateEnd(zip->zp);
792 ut_a(zip->status == Z_OK);
793
794 /* Reset the ZLib data structure. */
795 memset(zip->zp, 0, sizeof(*zip->zp));
796 }
797
798 /**********************************************************************//**
799 Read the words from the FTS INDEX.
800 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
801 to search else error code */
802 static MY_ATTRIBUTE((nonnull, warn_unused_result))
803 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)804 fts_index_fetch_words(
805 /*==================*/
806 fts_optimize_t* optim, /*!< in: optimize scratch pad */
807 const fts_string_t* word, /*!< in: get words greater than this
808 word */
809 ulint n_words)/*!< in: max words to read */
810 {
811 pars_info_t* info;
812 que_t* graph;
813 ulint selected;
814 fts_zip_t* zip = NULL;
815 dberr_t error = DB_SUCCESS;
816 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
817 ibool inited = FALSE;
818
819 optim->trx->op_info = "fetching FTS index words";
820
821 if (optim->zip == NULL) {
822 optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
823 } else {
824 fts_zip_initialize(optim->zip);
825 }
826
827 for (selected = fts_select_index(
828 optim->fts_index_table.charset, word->f_str, word->f_len);
829 selected < FTS_NUM_AUX_INDEX;
830 selected++) {
831
832 char table_name[MAX_FULL_NAME_LEN];
833
834 optim->fts_index_table.suffix = fts_get_suffix(selected);
835
836 info = pars_info_create();
837
838 pars_info_bind_function(
839 info, "my_func", fts_fetch_index_words, optim->zip);
840
841 pars_info_bind_varchar_literal(
842 info, "word", word->f_str, word->f_len);
843
844 fts_get_table_name(&optim->fts_index_table, table_name);
845 pars_info_bind_id(info, true, "table_name", table_name);
846
847 graph = fts_parse_sql(
848 &optim->fts_index_table,
849 info,
850 "DECLARE FUNCTION my_func;\n"
851 "DECLARE CURSOR c IS"
852 " SELECT word\n"
853 " FROM $table_name\n"
854 " WHERE word > :word\n"
855 " ORDER BY word;\n"
856 "BEGIN\n"
857 "\n"
858 "OPEN c;\n"
859 "WHILE 1 = 1 LOOP\n"
860 " FETCH c INTO my_func();\n"
861 " IF c % NOTFOUND THEN\n"
862 " EXIT;\n"
863 " END IF;\n"
864 "END LOOP;\n"
865 "CLOSE c;");
866
867 zip = optim->zip;
868
869 for (;;) {
870 int err;
871
872 if (!inited && ((err = deflateInit(zip->zp, 9))
873 != Z_OK)) {
874 ib::error() << "ZLib deflateInit() failed: "
875 << err;
876
877 error = DB_ERROR;
878 break;
879 } else {
880 inited = TRUE;
881 error = fts_eval_sql(optim->trx, graph);
882 }
883
884 if (error == DB_SUCCESS) {
885 //FIXME fts_sql_commit(optim->trx);
886 break;
887 } else {
888 //FIXME fts_sql_rollback(optim->trx);
889
890 if (error == DB_LOCK_WAIT_TIMEOUT) {
891 ib::warn() << "Lock wait timeout"
892 " reading document. Retrying!";
893
894 /* We need to reset the ZLib state. */
895 inited = FALSE;
896 deflateEnd(zip->zp);
897 fts_zip_init(zip);
898
899 optim->trx->error_state = DB_SUCCESS;
900 } else {
901 ib::error() << "(" << ut_strerr(error)
902 << ") while reading document.";
903
904 break; /* Exit the loop. */
905 }
906 }
907 }
908
909 fts_que_graph_free(graph);
910
911 /* Check if max word to fetch is exceeded */
912 if (optim->zip->n_words >= n_words) {
913 break;
914 }
915 }
916
917 if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
918
919 /* All data should have been read. */
920 ut_a(zip->zp->avail_in == 0);
921
922 fts_zip_deflate_end(zip);
923 } else {
924 deflateEnd(zip->zp);
925 }
926
927 return(error);
928 }
929
930 /**********************************************************************//**
931 Callback function to fetch the doc id from the record.
932 @return always returns TRUE */
933 static
934 ibool
fts_fetch_doc_ids(void * row,void * user_arg)935 fts_fetch_doc_ids(
936 /*==============*/
937 void* row, /*!< in: sel_node_t* */
938 void* user_arg) /*!< in: pointer to ib_vector_t */
939 {
940 que_node_t* exp;
941 int i = 0;
942 sel_node_t* sel_node = static_cast<sel_node_t*>(row);
943 fts_doc_ids_t* fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
944 fts_update_t* update = static_cast<fts_update_t*>(
945 ib_vector_push(fts_doc_ids->doc_ids, NULL));
946
947 for (exp = sel_node->select_list;
948 exp;
949 exp = que_node_get_next(exp), ++i) {
950
951 dfield_t* dfield = que_node_get_val(exp);
952 void* data = dfield_get_data(dfield);
953 ulint len = dfield_get_len(dfield);
954
955 ut_a(len != UNIV_SQL_NULL);
956
957 /* Note: The column numbers below must match the SELECT. */
958 switch (i) {
959 case 0: /* DOC_ID */
960 update->fts_indexes = NULL;
961 update->doc_id = fts_read_doc_id(
962 static_cast<byte*>(data));
963 break;
964
965 default:
966 ut_error;
967 }
968 }
969
970 return(TRUE);
971 }
972
973 /**********************************************************************//**
974 Read the rows from a FTS common auxiliary table.
975 @return DB_SUCCESS or error code */
976 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)977 fts_table_fetch_doc_ids(
978 /*====================*/
979 trx_t* trx, /*!< in: transaction */
980 fts_table_t* fts_table, /*!< in: table */
981 fts_doc_ids_t* doc_ids) /*!< in: For collecting doc ids */
982 {
983 dberr_t error;
984 que_t* graph;
985 pars_info_t* info = pars_info_create();
986 ibool alloc_bk_trx = FALSE;
987 char table_name[MAX_FULL_NAME_LEN];
988
989 ut_a(fts_table->suffix != NULL);
990 ut_a(fts_table->type == FTS_COMMON_TABLE);
991
992 if (!trx) {
993 trx = trx_allocate_for_background();
994 alloc_bk_trx = TRUE;
995 }
996
997 trx->op_info = "fetching FTS doc ids";
998
999 pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
1000
1001 fts_get_table_name(fts_table, table_name);
1002 pars_info_bind_id(info, true, "table_name", table_name);
1003
1004 graph = fts_parse_sql(
1005 fts_table,
1006 info,
1007 "DECLARE FUNCTION my_func;\n"
1008 "DECLARE CURSOR c IS"
1009 " SELECT doc_id FROM $table_name;\n"
1010 "BEGIN\n"
1011 "\n"
1012 "OPEN c;\n"
1013 "WHILE 1 = 1 LOOP\n"
1014 " FETCH c INTO my_func();\n"
1015 " IF c % NOTFOUND THEN\n"
1016 " EXIT;\n"
1017 " END IF;\n"
1018 "END LOOP;\n"
1019 "CLOSE c;");
1020
1021 error = fts_eval_sql(trx, graph);
1022
1023 mutex_enter(&dict_sys->mutex);
1024 que_graph_free(graph);
1025 mutex_exit(&dict_sys->mutex);
1026
1027 if (error == DB_SUCCESS) {
1028 fts_sql_commit(trx);
1029
1030 ib_vector_sort(doc_ids->doc_ids, fts_update_doc_id_cmp);
1031 } else {
1032 fts_sql_rollback(trx);
1033 }
1034
1035 if (alloc_bk_trx) {
1036 trx_free_for_background(trx);
1037 }
1038
1039 return(error);
1040 }
1041
1042 /**********************************************************************//**
1043 Do a binary search for a doc id in the array
1044 @return +ve index if found -ve index where it should be inserted
1045 if not found */
1046 int
fts_bsearch(fts_update_t * array,int lower,int upper,doc_id_t doc_id)1047 fts_bsearch(
1048 /*========*/
1049 fts_update_t* array, /*!< in: array to sort */
1050 int lower, /*!< in: the array lower bound */
1051 int upper, /*!< in: the array upper bound */
1052 doc_id_t doc_id) /*!< in: the doc id to search for */
1053 {
1054 int orig_size = upper;
1055
1056 if (upper == 0) {
1057 /* Nothing to search */
1058 return(-1);
1059 } else {
1060 while (lower < upper) {
1061 int i = (lower + upper) >> 1;
1062
1063 if (doc_id > array[i].doc_id) {
1064 lower = i + 1;
1065 } else if (doc_id < array[i].doc_id) {
1066 upper = i - 1;
1067 } else {
1068 return(i); /* Found. */
1069 }
1070 }
1071 }
1072
1073 if (lower == upper && lower < orig_size) {
1074 if (doc_id == array[lower].doc_id) {
1075 return(lower);
1076 } else if (lower == 0) {
1077 return(-1);
1078 }
1079 }
1080
1081 /* Not found. */
1082 return( (lower == 0) ? -1 : -(lower));
1083 }
1084
1085 /**********************************************************************//**
1086 Search in the to delete array whether any of the doc ids within
1087 the [first, last] range are to be deleted
1088 @return +ve index if found -ve index where it should be inserted
1089 if not found */
1090 static
1091 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1092 fts_optimize_lookup(
1093 /*================*/
1094 ib_vector_t* doc_ids, /*!< in: array to search */
1095 ulint lower, /*!< in: lower limit of array */
1096 doc_id_t first_doc_id, /*!< in: doc id to lookup */
1097 doc_id_t last_doc_id) /*!< in: doc id to lookup */
1098 {
1099 int pos;
1100 int upper = static_cast<int>(ib_vector_size(doc_ids));
1101 fts_update_t* array = (fts_update_t*) doc_ids->data;
1102
1103 pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1104
1105 ut_a(abs(pos) <= upper + 1);
1106
1107 if (pos < 0) {
1108
1109 int i = abs(pos);
1110
1111 /* If i is 1, it could be first_doc_id is less than
1112 either the first or second array item, do a
1113 double check */
1114 if (i == 1 && array[0].doc_id <= last_doc_id
1115 && first_doc_id < array[0].doc_id) {
1116 pos = 0;
1117 } else if (i < upper && array[i].doc_id <= last_doc_id) {
1118
1119 /* Check if the "next" doc id is within the
1120 first & last doc id of the node. */
1121 pos = i;
1122 }
1123 }
1124
1125 return(pos);
1126 }
1127
1128 /**********************************************************************//**
1129 Encode the word pos list into the node
1130 @return DB_SUCCESS or error code*/
1131 static MY_ATTRIBUTE((nonnull))
1132 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1133 fts_optimize_encode_node(
1134 /*=====================*/
1135 fts_node_t* node, /*!< in: node to fill*/
1136 doc_id_t doc_id, /*!< in: doc id to encode */
1137 fts_encode_t* enc) /*!< in: encoding state.*/
1138 {
1139 byte* dst;
1140 ulint enc_len;
1141 ulint pos_enc_len;
1142 doc_id_t doc_id_delta;
1143 dberr_t error = DB_SUCCESS;
1144 byte* src = enc->src_ilist_ptr;
1145
1146 if (node->first_doc_id == 0) {
1147 ut_a(node->last_doc_id == 0);
1148
1149 node->first_doc_id = doc_id;
1150 }
1151
1152 /* Calculate the space required to store the ilist. */
1153 ut_ad(doc_id > node->last_doc_id);
1154 doc_id_delta = doc_id - node->last_doc_id;
1155 enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1156
1157 /* Calculate the size of the encoded pos array. */
1158 while (*src) {
1159 fts_decode_vlc(&src);
1160 }
1161
1162 /* Skip the 0x00 byte at the end of the word positions list. */
1163 ++src;
1164
1165 /* Number of encoded pos bytes to copy. */
1166 pos_enc_len = src - enc->src_ilist_ptr;
1167
1168 /* Total number of bytes required for copy. */
1169 enc_len += pos_enc_len;
1170
1171 /* Check we have enough space in the destination buffer for
1172 copying the document word list. */
1173 if (!node->ilist) {
1174 ulint new_size;
1175
1176 ut_a(node->ilist_size == 0);
1177
1178 new_size = enc_len > FTS_ILIST_MAX_SIZE
1179 ? enc_len : FTS_ILIST_MAX_SIZE;
1180
1181 node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1182 node->ilist_size_alloc = new_size;
1183
1184 } else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1185 ulint new_size = node->ilist_size + enc_len;
1186 byte* ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1187
1188 memcpy(ilist, node->ilist, node->ilist_size);
1189
1190 ut_free(node->ilist);
1191
1192 node->ilist = ilist;
1193 node->ilist_size_alloc = new_size;
1194 }
1195
1196 src = enc->src_ilist_ptr;
1197 dst = node->ilist + node->ilist_size;
1198
1199 /* Encode the doc id. Cast to ulint, the delta should be small and
1200 therefore no loss of precision. */
1201 dst += fts_encode_int((ulint) doc_id_delta, dst);
1202
1203 /* Copy the encoded pos array. */
1204 memcpy(dst, src, pos_enc_len);
1205
1206 node->last_doc_id = doc_id;
1207
1208 /* Data copied upto here. */
1209 node->ilist_size += enc_len;
1210 enc->src_ilist_ptr += pos_enc_len;
1211
1212 ut_a(node->ilist_size <= node->ilist_size_alloc);
1213
1214 return(error);
1215 }
1216
/**********************************************************************//**
Copy the documents stored in the ilist of a source node into the
destination node, leaving out every document whose doc id matches the
current entry of the delete vector. Copying stops when the source ilist
has been consumed or when dst_node->ilist_size has reached
FTS_ILIST_MAX_SIZE; the position to resume from is kept in enc.
@return DB_SUCCESS (no other value is assigned in this function) */
static MY_ATTRIBUTE((nonnull))
dberr_t
fts_optimize_node(
/*==============*/
	ib_vector_t*	del_vec,	/*!< in: vector of doc ids to delete*/
	int*		del_pos,	/*!< in/out: offset into above vector,
					advanced past entries that have been
					matched or passed */
	fts_node_t*	dst_node,	/*!< in/out: node to fill*/
	fts_node_t*	src_node,	/*!< in: source node for data*/
	fts_encode_t*	enc)		/*!< in/out: encoding state */
{
	ulint		copied;
	dberr_t		error = DB_SUCCESS;
	/* Resume from the last doc id decoded from this source node. */
	doc_id_t	doc_id = enc->src_last_doc_id;

	/* A NULL read pointer means that we start at the beginning of
	the source ilist. */
	if (!enc->src_ilist_ptr) {
		enc->src_ilist_ptr = src_node->ilist;
	}

	/* Number of source bytes already consumed. */
	copied = enc->src_ilist_ptr - src_node->ilist;

	/* While there is data in the source node and space to copy
	into in the destination node. */
	while (copied < src_node->ilist_size
	       && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {

		doc_id_t	delta;
		doc_id_t	del_doc_id = FTS_NULL_DOC_ID;

		/* Doc ids are stored as deltas against the previous one. */
		delta = fts_decode_vlc(&enc->src_ilist_ptr);

test_again:
		/* Check whether the doc id is in the delete list, if
		so then we skip the entries but we need to track the
		delta for decoding the entries following this document's
		entries. */
		if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
			fts_update_t*	update;

			update = (fts_update_t*) ib_vector_get(
				del_vec, *del_pos);

			del_doc_id = update->doc_id;
		}

		/* NOTE(review): fts_decode_vlc() above has presumably
		already advanced src_ilist_ptr, in which case this
		condition cannot hold and the assertion never runs -
		confirm. */
		if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
			ut_a(delta == src_node->first_doc_id);
		}

		/* On a re-test (see the goto below) delta is 0, so doc_id
		is left unchanged. */
		doc_id += delta;

		if (del_doc_id > 0 && doc_id == del_doc_id) {

			/* This document is deleted: move on to the next
			entry of the delete vector. */
			++*del_pos;

			/* Skip the entries for this document. */
			while (*enc->src_ilist_ptr) {
				fts_decode_vlc(&enc->src_ilist_ptr);
			}

			/* Skip the end of word position marker. */
			++enc->src_ilist_ptr;

		} else {

			/* The doc id is already larger than del_doc_id:
			advance to the next entry of the delete vector and
			compare the same doc id again. */
			if (del_doc_id > 0 && doc_id > del_doc_id) {
				del_doc_id = 0;
				++*del_pos;
				delta = 0;
				goto test_again;
			}

			/* Decode and copy the word positions into
			the dest node. The return value is not checked;
			fts_optimize_encode_node() always returns
			DB_SUCCESS. */
			fts_optimize_encode_node(dst_node, doc_id, enc);

			++dst_node->doc_count;

			ut_a(dst_node->last_doc_id == doc_id);
		}

		/* Bytes copied so far from source. */
		copied = enc->src_ilist_ptr - src_node->ilist;
	}

	/* When the whole source ilist was consumed, the last decoded doc
	id must be the last doc id recorded for the source node. */
	if (copied >= src_node->ilist_size) {
		ut_a(doc_id == src_node->last_doc_id);
	}

	/* Remember where we stopped, for the next call on this source. */
	enc->src_last_doc_id = doc_id;

	return(error);
}
1314
1315 /**********************************************************************//**
1316 Determine the starting pos within the deleted doc id vector for a word.
1317 @return delete position */
1318 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1319 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1320 fts_optimize_deleted_pos(
1321 /*=====================*/
1322 fts_optimize_t* optim, /*!< in: optimize state data */
1323 fts_word_t* word) /*!< in: the word data to check */
1324 {
1325 int del_pos;
1326 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1327
1328 /* Get the first and last dict ids for the word, we will use
1329 these values to determine which doc ids need to be removed
1330 when we coalesce the nodes. This way we can reduce the numer
1331 of elements that need to be searched in the deleted doc ids
1332 vector and secondly we can remove the doc ids during the
1333 coalescing phase. */
1334 if (ib_vector_size(del_vec) > 0) {
1335 fts_node_t* node;
1336 doc_id_t last_id;
1337 doc_id_t first_id;
1338 ulint size = ib_vector_size(word->nodes);
1339
1340 node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1341 first_id = node->first_doc_id;
1342
1343 node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1344 last_id = node->last_doc_id;
1345
1346 ut_a(first_id <= last_id);
1347
1348 del_pos = fts_optimize_lookup(
1349 del_vec, optim->del_pos, first_id, last_id);
1350 } else {
1351
1352 del_pos = -1; /* Note that there is nothing to delete. */
1353 }
1354
1355 return(del_pos);
1356 }
1357
1358 #define FTS_DEBUG_PRINT
1359 /**********************************************************************//**
1360 Compact the nodes for a word, we also remove any doc ids during the
1361 compaction pass.
1362 @return DB_SUCCESS or error code.*/
1363 static
1364 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1365 fts_optimize_word(
1366 /*==============*/
1367 fts_optimize_t* optim, /*!< in: optimize state data */
1368 fts_word_t* word) /*!< in: the word to optimize */
1369 {
1370 fts_encode_t enc;
1371 ib_vector_t* nodes;
1372 ulint i = 0;
1373 int del_pos;
1374 fts_node_t* dst_node = NULL;
1375 ib_vector_t* del_vec = optim->to_delete->doc_ids;
1376 ulint size = ib_vector_size(word->nodes);
1377
1378 del_pos = fts_optimize_deleted_pos(optim, word);
1379 nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1380
1381 enc.src_last_doc_id = 0;
1382 enc.src_ilist_ptr = NULL;
1383
1384 if (fts_enable_diag_print) {
1385 word->text.f_str[word->text.f_len] = 0;
1386 ib::info() << "FTS_OPTIMIZE: optimize \"" << word->text.f_str
1387 << "\"";
1388 }
1389
1390 while (i < size) {
1391 ulint copied;
1392 fts_node_t* src_node;
1393
1394 src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1395
1396 if (dst_node == NULL
1397 || dst_node->last_doc_id > src_node->first_doc_id) {
1398
1399 dst_node = static_cast<fts_node_t*>(
1400 ib_vector_push(nodes, NULL));
1401 memset(dst_node, 0, sizeof(*dst_node));
1402 }
1403
1404 /* Copy from the src to the dst node. */
1405 fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1406
1407 ut_a(enc.src_ilist_ptr != NULL);
1408
1409 /* Determine the numer of bytes copied to dst_node. */
1410 copied = enc.src_ilist_ptr - src_node->ilist;
1411
1412 /* Can't copy more than whats in the vlc array. */
1413 ut_a(copied <= src_node->ilist_size);
1414
1415 /* We are done with this node release the resources. */
1416 if (copied == src_node->ilist_size) {
1417
1418 enc.src_last_doc_id = 0;
1419 enc.src_ilist_ptr = NULL;
1420
1421 ut_free(src_node->ilist);
1422
1423 src_node->ilist = NULL;
1424 src_node->ilist_size = src_node->ilist_size_alloc = 0;
1425
1426 src_node = NULL;
1427
1428 ++i; /* Get next source node to OPTIMIZE. */
1429 }
1430
1431 if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1432
1433 dst_node = NULL;
1434 }
1435 }
1436
1437 /* All dst nodes created should have been added to the vector. */
1438 ut_a(dst_node == NULL);
1439
1440 /* Return the OPTIMIZED nodes. */
1441 return(nodes);
1442 }
1443
1444 /**********************************************************************//**
1445 Update the FTS index table. This is a delete followed by an insert.
1446 @return DB_SUCCESS or error code */
1447 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1448 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1449 fts_optimize_write_word(
1450 /*====================*/
1451 trx_t* trx, /*!< in: transaction */
1452 fts_table_t* fts_table, /*!< in: table of FTS index */
1453 fts_string_t* word, /*!< in: word data to write */
1454 ib_vector_t* nodes) /*!< in: the nodes to write */
1455 {
1456 ulint i;
1457 pars_info_t* info;
1458 que_t* graph;
1459 ulint selected;
1460 dberr_t error = DB_SUCCESS;
1461 char table_name[MAX_FULL_NAME_LEN];
1462
1463 info = pars_info_create();
1464
1465 ut_ad(fts_table->charset);
1466
1467 if (fts_enable_diag_print) {
1468 ib::info() << "FTS_OPTIMIZE: processed \"" << word->f_str
1469 << "\"";
1470 }
1471
1472 pars_info_bind_varchar_literal(
1473 info, "word", word->f_str, word->f_len);
1474
1475 selected = fts_select_index(fts_table->charset,
1476 word->f_str, word->f_len);
1477
1478 fts_table->suffix = fts_get_suffix(selected);
1479 fts_get_table_name(fts_table, table_name);
1480 pars_info_bind_id(info, true, "table_name", table_name);
1481
1482 graph = fts_parse_sql(
1483 fts_table,
1484 info,
1485 "BEGIN DELETE FROM $table_name WHERE word = :word;");
1486
1487 error = fts_eval_sql(trx, graph);
1488
1489 if (error != DB_SUCCESS) {
1490 ib::error() << "(" << ut_strerr(error) << ") during optimize,"
1491 " when deleting a word from the FTS index.";
1492 }
1493
1494 fts_que_graph_free(graph);
1495 graph = NULL;
1496
1497 /* Even if the operation needs to be rolled back and redone,
1498 we iterate over the nodes in order to free the ilist. */
1499 for (i = 0; i < ib_vector_size(nodes); ++i) {
1500
1501 fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1502
1503 if (error == DB_SUCCESS) {
1504 /* Skip empty node. */
1505 if (node->ilist == NULL) {
1506 ut_ad(node->ilist_size == 0);
1507 continue;
1508 }
1509
1510 error = fts_write_node(
1511 trx, &graph, fts_table, word, node);
1512
1513 if (error != DB_SUCCESS) {
1514 ib::error() << "(" << ut_strerr(error) << ")"
1515 " during optimize, while adding a"
1516 " word to the FTS index.";
1517 }
1518 }
1519
1520 ut_free(node->ilist);
1521 node->ilist = NULL;
1522 node->ilist_size = node->ilist_size_alloc = 0;
1523 }
1524
1525 if (graph != NULL) {
1526 fts_que_graph_free(graph);
1527 }
1528
1529 return(error);
1530 }
1531
1532 /**********************************************************************//**
1533 Free fts_optimizer_word_t instanace.*/
1534 void
fts_word_free(fts_word_t * word)1535 fts_word_free(
1536 /*==========*/
1537 fts_word_t* word) /*!< in: instance to free.*/
1538 {
1539 mem_heap_t* heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1540
1541 #ifdef UNIV_DEBUG
1542 memset(word, 0, sizeof(*word));
1543 #endif /* UNIV_DEBUG */
1544
1545 mem_heap_free(heap);
1546 }
1547
1548 /**********************************************************************//**
1549 Optimize the word ilist and rewrite data to the FTS index.
1550 @return status one of RESTART, EXIT, ERROR */
1551 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1552 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,ib_time_monotonic_t start_time)1553 fts_optimize_compact(
1554 /*=================*/
1555 fts_optimize_t* optim, /*!< in: optimize state data */
1556 dict_index_t* index, /*!< in: current FTS being
1557 optimized */
1558 ib_time_monotonic_t start_time) /*!< in: optimize start time */
1559 {
1560 ulint i;
1561 dberr_t error = DB_SUCCESS;
1562 ulint size = ib_vector_size(optim->words);
1563
1564 for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1565 fts_word_t* word;
1566 ib_vector_t* nodes;
1567 trx_t* trx = optim->trx;
1568
1569 word = (fts_word_t*) ib_vector_get(optim->words, i);
1570
1571 /* nodes is allocated from the word heap and will be destroyed
1572 when the word is freed. We however have to be careful about
1573 the ilist, that needs to be freed explicitly. */
1574 nodes = fts_optimize_word(optim, word);
1575
1576 /* Update the data on disk. */
1577 error = fts_optimize_write_word(
1578 trx, &optim->fts_index_table, &word->text, nodes);
1579
1580 if (error == DB_SUCCESS) {
1581 /* Write the last word optimized to the config table,
1582 we use this value for restarting optimize. */
1583 error = fts_config_set_index_value(
1584 optim->trx, index,
1585 FTS_LAST_OPTIMIZED_WORD, &word->text);
1586 }
1587
1588 /* Free the word that was optimized. */
1589 fts_word_free(word);
1590
1591 if (fts_optimize_time_limit > 0
1592 && (ut_time_monotonic() - start_time) >
1593 fts_optimize_time_limit) {
1594
1595 optim->done = TRUE;
1596 }
1597 }
1598
1599 return(error);
1600 }
1601
1602 /**********************************************************************//**
1603 Create an instance of fts_optimize_t. Also create a new
1604 background transaction.*/
1605 static
1606 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1607 fts_optimize_create(
1608 /*================*/
1609 dict_table_t* table) /*!< in: table with FTS indexes */
1610 {
1611 fts_optimize_t* optim;
1612 mem_heap_t* heap = mem_heap_create(128);
1613
1614 optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1615
1616 optim->self_heap = ib_heap_allocator_create(heap);
1617
1618 optim->to_delete = fts_doc_ids_create();
1619
1620 optim->words = ib_vector_create(
1621 optim->self_heap, sizeof(fts_word_t), 256);
1622
1623 optim->table = table;
1624
1625 optim->trx = trx_allocate_for_background();
1626
1627 optim->fts_common_table.parent = table->name.m_name;
1628 optim->fts_common_table.table_id = table->id;
1629 optim->fts_common_table.type = FTS_COMMON_TABLE;
1630 optim->fts_common_table.table = table;
1631
1632 optim->fts_index_table.parent = table->name.m_name;
1633 optim->fts_index_table.table_id = table->id;
1634 optim->fts_index_table.type = FTS_INDEX_TABLE;
1635 optim->fts_index_table.table = table;
1636
1637 /* The common prefix for all this parent table's aux tables. */
1638 optim->name_prefix = fts_get_table_name_prefix(
1639 &optim->fts_common_table);
1640
1641 return(optim);
1642 }
1643
1644 #ifdef FTS_OPTIMIZE_DEBUG
1645 /**********************************************************************//**
1646 Get optimize start time of an FTS index.
1647 @return DB_SUCCESS if all OK else error code */
1648 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1649 dberr_t
fts_optimize_get_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t * start_time)1650 fts_optimize_get_index_start_time(
1651 /*==============================*/
1652 trx_t* trx, /*!< in: transaction */
1653 dict_index_t* index, /*!< in: FTS index */
1654 ib_time_t* start_time) /*!< out: time in secs */
1655 {
1656 return(fts_config_get_index_ulint(
1657 trx, index, FTS_OPTIMIZE_START_TIME,
1658 (ulint*) start_time));
1659 }
1660
1661 /**********************************************************************//**
1662 Set the optimize start time of an FTS index.
1663 @return DB_SUCCESS if all OK else error code */
1664 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1665 dberr_t
fts_optimize_set_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t start_time)1666 fts_optimize_set_index_start_time(
1667 /*==============================*/
1668 trx_t* trx, /*!< in: transaction */
1669 dict_index_t* index, /*!< in: FTS index */
1670 ib_time_t start_time) /*!< in: start time */
1671 {
1672 return(fts_config_set_index_ulint(
1673 trx, index, FTS_OPTIMIZE_START_TIME,
1674 (ulint) start_time));
1675 }
1676
1677 /**********************************************************************//**
1678 Get optimize end time of an FTS index.
1679 @return DB_SUCCESS if all OK else error code */
1680 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1681 dberr_t
fts_optimize_get_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t * end_time)1682 fts_optimize_get_index_end_time(
1683 /*============================*/
1684 trx_t* trx, /*!< in: transaction */
1685 dict_index_t* index, /*!< in: FTS index */
1686 ib_time_t* end_time) /*!< out: time in secs */
1687 {
1688 return(fts_config_get_index_ulint(
1689 trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1690 }
1691
1692 /**********************************************************************//**
1693 Set the optimize end time of an FTS index.
1694 @return DB_SUCCESS if all OK else error code */
1695 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1696 dberr_t
fts_optimize_set_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t end_time)1697 fts_optimize_set_index_end_time(
1698 /*============================*/
1699 trx_t* trx, /*!< in: transaction */
1700 dict_index_t* index, /*!< in: FTS index */
1701 ib_time_t end_time) /*!< in: end time */
1702 {
1703 return(fts_config_set_index_ulint(
1704 trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1705 }
1706 #endif
1707
1708 /**********************************************************************//**
1709 Free the optimize prepared statements.*/
1710 static
1711 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1712 fts_optimize_graph_free(
1713 /*====================*/
1714 fts_optimize_graph_t* graph) /*!< in/out: The graph instances
1715 to free */
1716 {
1717 if (graph->commit_graph) {
1718 que_graph_free(graph->commit_graph);
1719 graph->commit_graph = NULL;
1720 }
1721
1722 if (graph->write_nodes_graph) {
1723 que_graph_free(graph->write_nodes_graph);
1724 graph->write_nodes_graph = NULL;
1725 }
1726
1727 if (graph->delete_nodes_graph) {
1728 que_graph_free(graph->delete_nodes_graph);
1729 graph->delete_nodes_graph = NULL;
1730 }
1731
1732 if (graph->read_nodes_graph) {
1733 que_graph_free(graph->read_nodes_graph);
1734 graph->read_nodes_graph = NULL;
1735 }
1736 }
1737
1738 /**********************************************************************//**
1739 Free all optimize resources. */
1740 static
1741 void
fts_optimize_free(fts_optimize_t * optim)1742 fts_optimize_free(
1743 /*==============*/
1744 fts_optimize_t* optim) /*!< in: table with on FTS index */
1745 {
1746 mem_heap_t* heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1747
1748 trx_free_for_background(optim->trx);
1749
1750 fts_doc_ids_free(optim->to_delete);
1751 fts_optimize_graph_free(&optim->graph);
1752
1753 ut_free(optim->name_prefix);
1754
1755 /* This will free the heap from which optim itself was allocated. */
1756 mem_heap_free(heap);
1757 }
1758
1759 /**********************************************************************//**
1760 Get the max time optimize should run in millisecs.
1761 @return max optimize time limit in millisecs. */
1762 static
1763 ib_time_t
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1764 fts_optimize_get_time_limit(
1765 /*========================*/
1766 trx_t* trx, /*!< in: transaction */
1767 fts_table_t* fts_table) /*!< in: aux table */
1768 {
1769 ib_time_t time_limit = 0;
1770
1771 fts_config_get_ulint(
1772 trx, fts_table,
1773 FTS_OPTIMIZE_LIMIT_IN_SECS, (ulint*) &time_limit);
1774
1775 return(time_limit * 1000);
1776 }
1777
1778
1779 /**********************************************************************//**
1780 Run OPTIMIZE on the given table. Note: this can take a very long time
1781 (hours). */
1782 static
1783 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1784 fts_optimize_words(
1785 /*===============*/
1786 fts_optimize_t* optim, /*!< in: optimize instance */
1787 dict_index_t* index, /*!< in: current FTS being optimized */
1788 fts_string_t* word) /*!< in: the starting word to optimize */
1789 {
1790 fts_fetch_t fetch;
1791 ib_time_monotonic_t start_time;
1792 que_t* graph = NULL;
1793 CHARSET_INFO* charset = optim->fts_index_table.charset;
1794
1795 ut_a(!optim->done);
1796
1797 /* Get the time limit from the config table. */
1798 fts_optimize_time_limit = fts_optimize_get_time_limit(
1799 optim->trx, &optim->fts_common_table);
1800
1801 start_time = ut_time_monotonic();
1802
1803 /* Setup the callback to use for fetching the word ilist etc. */
1804 fetch.read_arg = optim->words;
1805 fetch.read_record = fts_optimize_index_fetch_node;
1806
1807 ib::info().write(word->f_str, word->f_len);
1808
1809 while (!optim->done) {
1810 dberr_t error;
1811 trx_t* trx = optim->trx;
1812 ulint selected;
1813
1814 ut_a(ib_vector_size(optim->words) == 0);
1815
1816 selected = fts_select_index(charset, word->f_str, word->f_len);
1817
1818 /* Read the index records to optimize. */
1819 fetch.total_memory = 0;
1820 error = fts_index_fetch_nodes(
1821 trx, &graph, &optim->fts_index_table, word,
1822 &fetch);
1823 ut_ad(fetch.total_memory < fts_result_cache_limit);
1824
1825 if (error == DB_SUCCESS) {
1826 /* There must be some nodes to read. */
1827 ut_a(ib_vector_size(optim->words) > 0);
1828
1829 /* Optimize the nodes that were read and write
1830 back to DB. */
1831 error = fts_optimize_compact(optim, index, start_time);
1832
1833 if (error == DB_SUCCESS) {
1834 fts_sql_commit(optim->trx);
1835 } else {
1836 fts_sql_rollback(optim->trx);
1837 }
1838 }
1839
1840 ib_vector_reset(optim->words);
1841
1842 if (error == DB_SUCCESS) {
1843 if (!optim->done) {
1844 if (!fts_zip_read_word(optim->zip, word)) {
1845 optim->done = TRUE;
1846 } else if (selected
1847 != fts_select_index(
1848 charset, word->f_str,
1849 word->f_len)
1850 && graph) {
1851 fts_que_graph_free(graph);
1852 graph = NULL;
1853 }
1854 }
1855 } else if (error == DB_LOCK_WAIT_TIMEOUT) {
1856 ib::warn() << "Lock wait timeout during optimize."
1857 " Retrying!";
1858
1859 trx->error_state = DB_SUCCESS;
1860 } else if (error == DB_DEADLOCK) {
1861 ib::warn() << "Deadlock during optimize. Retrying!";
1862
1863 trx->error_state = DB_SUCCESS;
1864 } else {
1865 optim->done = TRUE; /* Exit the loop. */
1866 }
1867 }
1868
1869 if (graph != NULL) {
1870 fts_que_graph_free(graph);
1871 }
1872 }
1873
1874 /**********************************************************************//**
1875 Optimize is complete. Set the completion time, and reset the optimize
1876 start string for this FTS index to "".
1877 @return DB_SUCCESS if all OK */
1878 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1879 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1880 fts_optimize_index_completed(
1881 /*=========================*/
1882 fts_optimize_t* optim, /*!< in: optimize instance */
1883 dict_index_t* index) /*!< in: table with one FTS index */
1884 {
1885 fts_string_t word;
1886 dberr_t error;
1887 byte buf[sizeof(ulint)];
1888 #ifdef FTS_OPTIMIZE_DEBUG
1889 ib_time_t end_time = ut_time();
1890
1891 error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1892 #endif
1893
1894 /* If we've reached the end of the index then set the start
1895 word to the empty string. */
1896
1897 word.f_len = 0;
1898 word.f_str = buf;
1899 *word.f_str = '\0';
1900
1901 error = fts_config_set_index_value(
1902 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1903
1904 if (error != DB_SUCCESS) {
1905
1906 ib::error() << "(" << ut_strerr(error) << ") while updating"
1907 " last optimized word!";
1908 }
1909
1910 return(error);
1911 }
1912
1913
1914 /**********************************************************************//**
1915 Read the list of words from the FTS auxiliary index that will be
1916 optimized in this pass.
1917 @return DB_SUCCESS if all OK */
1918 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1919 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1920 fts_optimize_index_read_words(
1921 /*==========================*/
1922 fts_optimize_t* optim, /*!< in: optimize instance */
1923 dict_index_t* index, /*!< in: table with one FTS index */
1924 fts_string_t* word) /*!< in: buffer to use */
1925 {
1926 dberr_t error = DB_SUCCESS;
1927
1928 if (optim->del_list_regenerated) {
1929 word->f_len = 0;
1930 } else {
1931
1932 /* Get the last word that was optimized from
1933 the config table. */
1934 error = fts_config_get_index_value(
1935 optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1936 }
1937
1938 /* If record not found then we start from the top. */
1939 if (error == DB_RECORD_NOT_FOUND) {
1940 word->f_len = 0;
1941 error = DB_SUCCESS;
1942 }
1943
1944 while (error == DB_SUCCESS) {
1945
1946 error = fts_index_fetch_words(
1947 optim, word, fts_num_word_optimize);
1948
1949 if (error == DB_SUCCESS) {
1950 /* Reset the last optimized word to '' if no
1951 more words could be read from the FTS index. */
1952 if (optim->zip->n_words == 0) {
1953 word->f_len = 0;
1954 *word->f_str = 0;
1955 }
1956
1957 break;
1958 }
1959 }
1960
1961 return(error);
1962 }
1963
1964 /**********************************************************************//**
1965 Run OPTIMIZE on the given FTS index. Note: this can take a very long
1966 time (hours).
1967 @return DB_SUCCESS if all OK */
1968 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1969 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)1970 fts_optimize_index(
1971 /*===============*/
1972 fts_optimize_t* optim, /*!< in: optimize instance */
1973 dict_index_t* index) /*!< in: table with one FTS index */
1974 {
1975 fts_string_t word;
1976 dberr_t error;
1977 byte str[FTS_MAX_WORD_LEN + 1];
1978
1979 /* Set the current index that we have to optimize. */
1980 optim->fts_index_table.index_id = index->id;
1981 optim->fts_index_table.charset = fts_index_get_charset(index);
1982
1983 optim->done = FALSE; /* Optimize until !done */
1984
1985 /* We need to read the last word optimized so that we start from
1986 the next word. */
1987 word.f_str = str;
1988
1989 /* We set the length of word to the size of str since we
1990 need to pass the max len info to the fts_get_config_value() function. */
1991 word.f_len = sizeof(str) - 1;
1992
1993 memset(word.f_str, 0x0, word.f_len);
1994
1995 /* Read the words that will be optimized in this pass. */
1996 error = fts_optimize_index_read_words(optim, index, &word);
1997
1998 if (error == DB_SUCCESS) {
1999 int zip_error;
2000
2001 ut_a(optim->zip->pos == 0);
2002 ut_a(optim->zip->zp->total_in == 0);
2003 ut_a(optim->zip->zp->total_out == 0);
2004
2005 zip_error = inflateInit(optim->zip->zp);
2006 ut_a(zip_error == Z_OK);
2007
2008 word.f_len = 0;
2009 word.f_str = str;
2010
2011 /* Read the first word to optimize from the Zip buffer. */
2012 if (!fts_zip_read_word(optim->zip, &word)) {
2013
2014 optim->done = TRUE;
2015 } else {
2016 fts_optimize_words(optim, index, &word);
2017 }
2018
2019 /* If we couldn't read any records then optimize is
2020 complete. Increment the number of indexes that have
2021 been optimized and set FTS index optimize state to
2022 completed. */
2023 if (error == DB_SUCCESS && optim->zip->n_words == 0) {
2024
2025 error = fts_optimize_index_completed(optim, index);
2026
2027 if (error == DB_SUCCESS) {
2028 ++optim->n_completed;
2029 }
2030 }
2031 }
2032
2033 return(error);
2034 }
2035
2036 /**********************************************************************//**
2037 Delete the document ids in the delete, and delete cache tables.
2038 @return DB_SUCCESS if all OK */
2039 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2040 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2041 fts_optimize_purge_deleted_doc_ids(
2042 /*===============================*/
2043 fts_optimize_t* optim) /*!< in: optimize instance */
2044 {
2045 ulint i;
2046 pars_info_t* info;
2047 que_t* graph;
2048 fts_update_t* update;
2049 doc_id_t write_doc_id;
2050 dberr_t error = DB_SUCCESS;
2051 char deleted[MAX_FULL_NAME_LEN];
2052 char deleted_cache[MAX_FULL_NAME_LEN];
2053
2054 info = pars_info_create();
2055
2056 ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2057
2058 update = static_cast<fts_update_t*>(
2059 ib_vector_get(optim->to_delete->doc_ids, 0));
2060
2061 /* Convert to "storage" byte order. */
2062 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2063
2064 /* This is required for the SQL parser to work. It must be able
2065 to find the following variables. So we do it twice. */
2066 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2067 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2068
2069 /* Make sure the following two names are consistent with the name
2070 used in the fts_delete_doc_ids_sql */
2071 optim->fts_common_table.suffix = fts_common_tables[3];
2072 fts_get_table_name(&optim->fts_common_table, deleted);
2073 pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2074
2075 optim->fts_common_table.suffix = fts_common_tables[4];
2076 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2077 pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2078
2079 graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
2080
2081 /* Delete the doc ids that were copied at the start. */
2082 for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2083
2084 update = static_cast<fts_update_t*>(ib_vector_get(
2085 optim->to_delete->doc_ids, i));
2086
2087 /* Convert to "storage" byte order. */
2088 fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2089
2090 fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2091
2092 fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2093
2094 error = fts_eval_sql(optim->trx, graph);
2095
2096 // FIXME: Check whether delete actually succeeded!
2097 if (error != DB_SUCCESS) {
2098
2099 fts_sql_rollback(optim->trx);
2100 break;
2101 }
2102 }
2103
2104 fts_que_graph_free(graph);
2105
2106 return(error);
2107 }
2108
2109 /**********************************************************************//**
2110 Delete the document ids in the pending delete, and delete tables.
2111 @return DB_SUCCESS if all OK */
2112 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2113 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2114 fts_optimize_purge_deleted_doc_id_snapshot(
2115 /*=======================================*/
2116 fts_optimize_t* optim) /*!< in: optimize instance */
2117 {
2118 dberr_t error;
2119 que_t* graph;
2120 pars_info_t* info;
2121 char being_deleted[MAX_FULL_NAME_LEN];
2122 char being_deleted_cache[MAX_FULL_NAME_LEN];
2123
2124 info = pars_info_create();
2125
2126 /* Make sure the following two names are consistent with the name
2127 used in the fts_end_delete_sql */
2128 optim->fts_common_table.suffix = fts_common_tables[0];
2129 fts_get_table_name(&optim->fts_common_table, being_deleted);
2130 pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2131
2132 optim->fts_common_table.suffix = fts_common_tables[1];
2133 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2134 pars_info_bind_id(info, true, fts_common_tables[1],
2135 being_deleted_cache);
2136
2137 /* Delete the doc ids that were copied to delete pending state at
2138 the start of optimize. */
2139 graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
2140
2141 error = fts_eval_sql(optim->trx, graph);
2142 fts_que_graph_free(graph);
2143
2144 return(error);
2145 }
2146
2147 /**********************************************************************//**
2148 Copy the deleted doc ids that will be purged during this optimize run
2149 to the being deleted FTS auxiliary tables. The transaction is committed
2150 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2151 @return DB_SUCCESS if all OK */
2152 static
2153 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2154 fts_optimize_being_deleted_count(
2155 /*=============================*/
2156 fts_optimize_t* optim) /*!< in: optimize instance */
2157 {
2158 fts_table_t fts_table;
2159
2160 FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2161 optim->table);
2162
2163 return(fts_get_rows_count(&fts_table));
2164 }
2165
2166 /*********************************************************************//**
2167 Copy the deleted doc ids that will be purged during this optimize run
2168 to the being deleted FTS auxiliary tables. The transaction is committed
2169 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2170 @return DB_SUCCESS if all OK */
2171 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2172 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2173 fts_optimize_create_deleted_doc_id_snapshot(
2174 /*========================================*/
2175 fts_optimize_t* optim) /*!< in: optimize instance */
2176 {
2177 dberr_t error;
2178 que_t* graph;
2179 pars_info_t* info;
2180 char being_deleted[MAX_FULL_NAME_LEN];
2181 char deleted[MAX_FULL_NAME_LEN];
2182 char being_deleted_cache[MAX_FULL_NAME_LEN];
2183 char deleted_cache[MAX_FULL_NAME_LEN];
2184
2185 info = pars_info_create();
2186
2187 /* Make sure the following four names are consistent with the name
2188 used in the fts_init_delete_sql */
2189 optim->fts_common_table.suffix = fts_common_tables[0];
2190 fts_get_table_name(&optim->fts_common_table, being_deleted);
2191 pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2192
2193 optim->fts_common_table.suffix = fts_common_tables[3];
2194 fts_get_table_name(&optim->fts_common_table, deleted);
2195 pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2196
2197 optim->fts_common_table.suffix = fts_common_tables[1];
2198 fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2199 pars_info_bind_id(info, true, fts_common_tables[1],
2200 being_deleted_cache);
2201
2202 optim->fts_common_table.suffix = fts_common_tables[4];
2203 fts_get_table_name(&optim->fts_common_table, deleted_cache);
2204 pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2205
2206 /* Move doc_ids that are to be deleted to state being deleted. */
2207 graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
2208
2209 error = fts_eval_sql(optim->trx, graph);
2210
2211 fts_que_graph_free(graph);
2212
2213 if (error != DB_SUCCESS) {
2214 fts_sql_rollback(optim->trx);
2215 } else {
2216 fts_sql_commit(optim->trx);
2217 }
2218
2219 optim->del_list_regenerated = TRUE;
2220
2221 return(error);
2222 }
2223
2224 /*********************************************************************//**
2225 Read in the document ids that are to be purged during optimize. The
2226 transaction is committed upon successfully read.
2227 @return DB_SUCCESS if all OK */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2230 fts_optimize_read_deleted_doc_id_snapshot(
2231 /*======================================*/
2232 fts_optimize_t* optim) /*!< in: optimize instance */
2233 {
2234 dberr_t error;
2235
2236 optim->fts_common_table.suffix = "BEING_DELETED";
2237
2238 /* Read the doc_ids to delete. */
2239 error = fts_table_fetch_doc_ids(
2240 optim->trx, &optim->fts_common_table, optim->to_delete);
2241
2242 if (error == DB_SUCCESS) {
2243
2244 optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2245
2246 /* Read additional doc_ids to delete. */
2247 error = fts_table_fetch_doc_ids(
2248 optim->trx, &optim->fts_common_table, optim->to_delete);
2249 }
2250
2251 if (error != DB_SUCCESS) {
2252
2253 fts_doc_ids_free(optim->to_delete);
2254 optim->to_delete = NULL;
2255 }
2256
2257 return(error);
2258 }
2259
2260 /*********************************************************************//**
2261 Optimze all the FTS indexes, skipping those that have already been
2262 optimized, since the FTS auxiliary indexes are not guaranteed to be
2263 of the same cardinality.
2264 @return DB_SUCCESS if all OK */
2265 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2266 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2267 fts_optimize_indexes(
2268 /*=================*/
2269 fts_optimize_t* optim) /*!< in: optimize instance */
2270 {
2271 ulint i;
2272 dberr_t error = DB_SUCCESS;
2273 fts_t* fts = optim->table->fts;
2274
2275 /* Optimize the FTS indexes. */
2276 for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2277 dict_index_t* index;
2278
2279 #ifdef FTS_OPTIMIZE_DEBUG
2280 ib_time_t end_time;
2281 ib_time_t start_time;
2282
2283 /* Get the start and end optimize times for this index. */
2284 error = fts_optimize_get_index_start_time(
2285 optim->trx, index, &start_time);
2286
2287 if (error != DB_SUCCESS) {
2288 break;
2289 }
2290
2291 error = fts_optimize_get_index_end_time(
2292 optim->trx, index, &end_time);
2293
2294 if (error != DB_SUCCESS) {
2295 break;
2296 }
2297
2298 /* Start time will be 0 only for the first time or after
2299 completing the optimization of all FTS indexes. */
2300 if (start_time == 0) {
2301 start_time = ut_time();
2302
2303 error = fts_optimize_set_index_start_time(
2304 optim->trx, index, start_time);
2305 }
2306
2307 /* Check if this index needs to be optimized or not. */
2308 if (ut_difftime(end_time, start_time) < 0) {
2309 error = fts_optimize_index(optim, index);
2310
2311 if (error != DB_SUCCESS) {
2312 break;
2313 }
2314 } else {
2315 ++optim->n_completed;
2316 }
2317 #endif
2318 index = static_cast<dict_index_t*>(
2319 ib_vector_getp(fts->indexes, i));
2320 error = fts_optimize_index(optim, index);
2321 }
2322
2323 if (error == DB_SUCCESS) {
2324 fts_sql_commit(optim->trx);
2325 } else {
2326 fts_sql_rollback(optim->trx);
2327 }
2328
2329 return(error);
2330 }
2331
2332 /*********************************************************************//**
2333 Cleanup the snapshot tables and the master deleted table.
2334 @return DB_SUCCESS if all OK */
2335 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2336 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2337 fts_optimize_purge_snapshot(
2338 /*========================*/
2339 fts_optimize_t* optim) /*!< in: optimize instance */
2340 {
2341 dberr_t error;
2342
2343 /* Delete the doc ids from the master deleted tables, that were
2344 in the snapshot that was taken at the start of optimize. */
2345 error = fts_optimize_purge_deleted_doc_ids(optim);
2346
2347 if (error == DB_SUCCESS) {
2348 /* Destroy the deleted doc id snapshot. */
2349 error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2350 }
2351
2352 if (error == DB_SUCCESS) {
2353 fts_sql_commit(optim->trx);
2354 } else {
2355 fts_sql_rollback(optim->trx);
2356 }
2357
2358 return(error);
2359 }
2360
2361 /*********************************************************************//**
2362 Reset the start time to 0 so that a new optimize can be started.
2363 @return DB_SUCCESS if all OK */
2364 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2365 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2366 fts_optimize_reset_start_time(
2367 /*==========================*/
2368 fts_optimize_t* optim) /*!< in: optimize instance */
2369 {
2370 dberr_t error = DB_SUCCESS;
2371 #ifdef FTS_OPTIMIZE_DEBUG
2372 fts_t* fts = optim->table->fts;
2373
2374 /* Optimization should have been completed for all indexes. */
2375 ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2376
2377 for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2378 dict_index_t* index;
2379
2380 ib_time_t start_time = 0;
2381
2382 /* Reset the start time to 0 for this index. */
2383 error = fts_optimize_set_index_start_time(
2384 optim->trx, index, start_time);
2385
2386 index = static_cast<dict_index_t*>(
2387 ib_vector_getp(fts->indexes, i));
2388 }
2389 #endif
2390
2391 if (error == DB_SUCCESS) {
2392 fts_sql_commit(optim->trx);
2393 } else {
2394 fts_sql_rollback(optim->trx);
2395 }
2396
2397 return(error);
2398 }
2399
2400 /*********************************************************************//**
2401 Run OPTIMIZE on the given table by a background thread.
2402 @return DB_SUCCESS if all OK */
2403 static MY_ATTRIBUTE((nonnull))
2404 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2405 fts_optimize_table_bk(
2406 /*==================*/
2407 fts_slot_t* slot) /*!< in: table to optimiza */
2408 {
2409 dberr_t error;
2410 dict_table_t* table = slot->table;
2411 fts_t* fts = table->fts;
2412
2413 /* Avoid optimizing tables that were optimized recently. */
2414 if (slot->last_run > 0
2415 && (ut_time_monotonic() - slot->last_run) < slot->interval_time) {
2416
2417 return(DB_SUCCESS);
2418
2419 } else if (fts && fts->cache
2420 && fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2421
2422 error = fts_optimize_table(table);
2423
2424 if (error == DB_SUCCESS) {
2425 slot->state = FTS_STATE_DONE;
2426 slot->last_run = 0;
2427 slot->completed = ut_time_monotonic();
2428 }
2429 } else {
2430 error = DB_SUCCESS;
2431 }
2432
2433 /* Note time this run completed. */
2434 slot->last_run = ut_time_monotonic();
2435
2436 return(error);
2437 }
2438 /*********************************************************************//**
2439 Run OPTIMIZE on the given table.
2440 @return DB_SUCCESS if all OK */
2441 dberr_t
fts_optimize_table(dict_table_t * table)2442 fts_optimize_table(
2443 /*===============*/
2444 dict_table_t* table) /*!< in: table to optimiza */
2445 {
2446 dberr_t error = DB_SUCCESS;
2447 fts_optimize_t* optim = NULL;
2448 fts_t* fts = table->fts;
2449
2450 if (fts_enable_diag_print) {
2451 ib::info() << "FTS start optimize " << table->name;
2452 }
2453
2454 optim = fts_optimize_create(table);
2455
2456 // FIXME: Call this only at the start of optimize, currently we
2457 // rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2458
2459 /* Check whether there are still records in BEING_DELETED table */
2460 if (fts_optimize_being_deleted_count(optim) == 0) {
2461 /* Take a snapshot of the deleted document ids, they are copied
2462 to the BEING_ tables. */
2463 error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2464 }
2465
2466 /* A duplicate error is OK, since we don't erase the
2467 doc ids from the being deleted state until all FTS
2468 indexes have been optimized. */
2469 if (error == DB_DUPLICATE_KEY) {
2470 error = DB_SUCCESS;
2471 }
2472
2473 if (error == DB_SUCCESS) {
2474
2475 /* These document ids will be filtered out during the
2476 index optimization phase. They are in the snapshot that we
2477 took above, at the start of the optimize. */
2478 error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2479
2480 if (error == DB_SUCCESS) {
2481
2482 /* Commit the read of being deleted
2483 doc ids transaction. */
2484 fts_sql_commit(optim->trx);
2485
2486 /* We would do optimization only if there
2487 are deleted records to be cleaned up */
2488 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2489 error = fts_optimize_indexes(optim);
2490 }
2491
2492 } else {
2493 ut_a(optim->to_delete == NULL);
2494 }
2495
2496 /* Only after all indexes have been optimized can we
2497 delete the (snapshot) doc ids in the pending delete,
2498 and master deleted tables. */
2499 if (error == DB_SUCCESS
2500 && optim->n_completed == ib_vector_size(fts->indexes)) {
2501
2502 if (fts_enable_diag_print) {
2503 ib::info() << "FTS_OPTIMIZE: Completed"
2504 " Optimize, cleanup DELETED table";
2505 }
2506
2507 if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2508
2509 /* Purge the doc ids that were in the
2510 snapshot from the snapshot tables and
2511 the master deleted table. */
2512 error = fts_optimize_purge_snapshot(optim);
2513 }
2514
2515 if (error == DB_SUCCESS) {
2516 /* Reset the start time of all the FTS indexes
2517 so that optimize can be restarted. */
2518 error = fts_optimize_reset_start_time(optim);
2519 }
2520 }
2521 }
2522
2523 fts_optimize_free(optim);
2524
2525 if (fts_enable_diag_print) {
2526 ib::info() << "FTS end optimize " << table->name;
2527 }
2528
2529 return(error);
2530 }
2531
2532 /********************************************************************//**
2533 Add the table to add to the OPTIMIZER's list.
2534 @return new message instance */
2535 static
2536 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2537 fts_optimize_create_msg(
2538 /*====================*/
2539 fts_msg_type_t type, /*!< in: type of message */
2540 void* ptr) /*!< in: message payload */
2541 {
2542 mem_heap_t* heap;
2543 fts_msg_t* msg;
2544
2545 heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2546 msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2547
2548 msg->ptr = ptr;
2549 msg->type = type;
2550 msg->heap = heap;
2551
2552 return(msg);
2553 }
2554
2555 /** Add the table to add to the OPTIMIZER's list.
2556 @param[in] table table to add */
2557 void
fts_optimize_add_table(dict_table_t * table)2558 fts_optimize_add_table(
2559 dict_table_t* table)
2560 {
2561 fts_msg_t* msg;
2562
2563 if (!fts_optimize_is_init()) {
2564 return;
2565 }
2566
2567 /* Make sure table with FTS index cannot be evicted */
2568 dict_table_prevent_eviction(table);
2569
2570 msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2571
2572 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2573 }
2574
2575 /**********************************************************************//**
2576 Remove the table from the OPTIMIZER's list. We do wait for
2577 acknowledgement from the consumer of the message. */
2578 void
fts_optimize_remove_table(dict_table_t * table)2579 fts_optimize_remove_table(
2580 /*======================*/
2581 dict_table_t* table) /*!< in: table to remove */
2582 {
2583 fts_msg_t* msg;
2584 os_event_t event;
2585 fts_msg_del_t* remove;
2586
2587 /* if the optimize system not yet initialized, return */
2588 if (!fts_optimize_is_init()) {
2589 return;
2590 }
2591
2592 /* FTS optimizer thread is already exited */
2593 if (fts_opt_start_shutdown) {
2594 ib::info() << "Try to remove table " << table->name
2595 << " after FTS optimize thread exiting.";
2596 return;
2597 }
2598
2599 msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2600
2601 /* We will wait on this event until signalled by the consumer. */
2602 event = os_event_create(0);
2603
2604 remove = static_cast<fts_msg_del_t*>(
2605 mem_heap_alloc(msg->heap, sizeof(*remove)));
2606
2607 remove->table = table;
2608 remove->event = event;
2609 msg->ptr = remove;
2610
2611 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2612
2613 os_event_wait(event);
2614
2615 os_event_destroy(event);
2616 }
2617
2618 /** Send sync fts cache for the table.
2619 @param[in] table table to sync */
2620 void
fts_optimize_request_sync_table(dict_table_t * table)2621 fts_optimize_request_sync_table(
2622 dict_table_t* table)
2623 {
2624 fts_msg_t* msg;
2625 table_id_t* table_id;
2626
2627 /* if the optimize system not yet initialized, return */
2628 if (!fts_optimize_is_init()) {
2629 return;
2630 }
2631
2632 /* FTS optimizer thread is already exited */
2633 if (fts_opt_start_shutdown) {
2634 ib::info() << "Try to sync table " << table->name
2635 << " after FTS optimize thread exiting.";
2636 return;
2637 }
2638
2639 msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
2640
2641 table_id = static_cast<table_id_t*>(
2642 mem_heap_alloc(msg->heap, sizeof(table_id_t)));
2643 *table_id = table->id;
2644 msg->ptr = table_id;
2645
2646 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2647 DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
2648 if (ib_wqueue_get_count(fts_optimize_wq) > 1000) {
2649 DBUG_SUICIDE(); });
2650 }
2651
2652 /**********************************************************************//**
2653 Add the table to the vector if it doesn't already exist. */
2654 static
2655 ibool
fts_optimize_new_table(ib_vector_t * tables,dict_table_t * table)2656 fts_optimize_new_table(
2657 /*===================*/
2658 ib_vector_t* tables, /*!< in/out: vector of tables */
2659 dict_table_t* table) /*!< in: table to add */
2660 {
2661 ulint i;
2662 fts_slot_t* slot;
2663 ulint empty_slot = ULINT_UNDEFINED;
2664
2665 /* Search for duplicates, also find a free slot if one exists. */
2666 for (i = 0; i < ib_vector_size(tables); ++i) {
2667
2668 slot = static_cast<fts_slot_t*>(
2669 ib_vector_get(tables, i));
2670
2671 if (slot->state == FTS_STATE_EMPTY) {
2672 empty_slot = i;
2673 } else if (slot->table == table) {
2674 /* Already exists in our optimize queue. */
2675 ut_ad(slot->table_id = table->id);
2676 return(FALSE);
2677 }
2678 }
2679
2680 /* Reuse old slot. */
2681 if (empty_slot != ULINT_UNDEFINED) {
2682
2683 slot = static_cast<fts_slot_t*>(
2684 ib_vector_get(tables, empty_slot));
2685
2686 ut_a(slot->state == FTS_STATE_EMPTY);
2687
2688 } else { /* Create a new slot. */
2689
2690 slot = static_cast<fts_slot_t*>(ib_vector_push(tables, NULL));
2691 }
2692
2693 memset(slot, 0x0, sizeof(*slot));
2694
2695 slot->table = table;
2696 slot->table_id = table->id;
2697 slot->state = FTS_STATE_LOADED;
2698 slot->interval_time = FTS_OPTIMIZE_INTERVAL_IN_SECS;
2699
2700 return(TRUE);
2701 }
2702
2703 /**********************************************************************//**
2704 Remove the table from the vector if it exists. */
2705 static
2706 ibool
fts_optimize_del_table(ib_vector_t * tables,fts_msg_del_t * msg)2707 fts_optimize_del_table(
2708 /*===================*/
2709 ib_vector_t* tables, /*!< in/out: vector of tables */
2710 fts_msg_del_t* msg) /*!< in: table to delete */
2711 {
2712 ulint i;
2713 dict_table_t* table = msg->table;
2714
2715 for (i = 0; i < ib_vector_size(tables); ++i) {
2716 fts_slot_t* slot;
2717
2718 slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2719
2720 if (slot->state != FTS_STATE_EMPTY
2721 && slot->table == table) {
2722
2723 if (fts_enable_diag_print) {
2724 ib::info() << "FTS Optimize Removing table "
2725 << table->name;
2726 }
2727
2728 slot->table = NULL;
2729 slot->state = FTS_STATE_EMPTY;
2730
2731 return(TRUE);
2732 }
2733 }
2734
2735 return(FALSE);
2736 }
2737
2738 /**********************************************************************//**
2739 Calculate how many of the registered tables need to be optimized.
2740 @return no. of tables to optimize */
2741 static
2742 ulint
fts_optimize_how_many(const ib_vector_t * tables)2743 fts_optimize_how_many(
2744 /*==================*/
2745 const ib_vector_t* tables) /*!< in: registered tables
2746 vector*/
2747 {
2748 ulint i;
2749 ib_time_monotonic_t delta;
2750 ulint n_tables = 0;
2751 ib_time_monotonic_t current_time;
2752
2753 current_time = ut_time_monotonic();
2754
2755 for (i = 0; i < ib_vector_size(tables); ++i) {
2756 const fts_slot_t* slot;
2757
2758 slot = static_cast<const fts_slot_t*>(
2759 ib_vector_get_const(tables, i));
2760
2761 switch (slot->state) {
2762 case FTS_STATE_DONE:
2763 case FTS_STATE_LOADED:
2764 ut_a(slot->completed <= current_time);
2765
2766 delta = current_time - slot->completed;
2767
2768 /* Skip slots that have been optimized recently. */
2769 if (delta >= slot->interval_time) {
2770 ++n_tables;
2771 }
2772 break;
2773
2774 case FTS_STATE_RUNNING:
2775 ut_a(slot->last_run <= current_time);
2776
2777 delta = current_time - slot->last_run;
2778
2779 if (delta > slot->interval_time) {
2780 ++n_tables;
2781 }
2782 break;
2783
2784 /* Slots in a state other than the above
2785 are ignored. */
2786 case FTS_STATE_EMPTY:
2787 case FTS_STATE_SUSPENDED:
2788 break;
2789 }
2790
2791 }
2792
2793 return(n_tables);
2794 }
2795
2796 /**********************************************************************//**
2797 Check if the total memory used by all FTS table exceeds the maximum limit.
2798 @return true if a sync is needed, false otherwise */
2799 static
2800 bool
fts_is_sync_needed(const ib_vector_t * tables)2801 fts_is_sync_needed(
2802 /*===============*/
2803 const ib_vector_t* tables) /*!< in: registered tables
2804 vector*/
2805 {
2806 ulint total_memory = 0;
2807 uint64_t time_diff = ut_time_monotonic() - last_check_sync_time;
2808
2809 if (fts_need_sync || time_diff < 5) {
2810 return(false);
2811 }
2812
2813 last_check_sync_time = ut_time_monotonic();
2814
2815 for (ulint i = 0; i < ib_vector_size(tables); ++i) {
2816 const fts_slot_t* slot;
2817
2818 slot = static_cast<const fts_slot_t*>(
2819 ib_vector_get_const(tables, i));
2820
2821 if (slot->state != FTS_STATE_EMPTY && slot->table
2822 && slot->table->fts && slot->table->fts->cache) {
2823 total_memory += slot->table->fts->cache->total_size;
2824 }
2825
2826 if (total_memory > fts_max_total_cache_size) {
2827 return(true);
2828 }
2829 }
2830
2831 return(false);
2832 }
2833
2834 /** Sync fts cache of a table
2835 @param[in] table_id table id */
2836 void
fts_optimize_sync_table(table_id_t table_id)2837 fts_optimize_sync_table(
2838 table_id_t table_id)
2839 {
2840 dict_table_t* table = NULL;
2841
2842 table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
2843
2844 if (table) {
2845 if (dict_table_has_fts_index(table) && table->fts->cache) {
2846 fts_sync_table(table, true, false, false);
2847 }
2848
2849 dict_table_close(table, FALSE, FALSE);
2850 }
2851 }
2852
2853 /**********************************************************************//**
2854 Optimize all FTS tables.
2855 @return Dummy return */
2856 os_thread_ret_t
fts_optimize_thread(void * arg)2857 fts_optimize_thread(
2858 /*================*/
2859 void* arg) /*!< in: work queue*/
2860 {
2861 ulint current = 0;
2862 ibool done = FALSE;
2863 ulint n_tables = 0;
2864 ulint n_optimize = 0;
2865 ib_wqueue_t* wq = (ib_wqueue_t*) arg;
2866
2867 ut_ad(!srv_read_only_mode);
2868 my_thread_init();
2869
2870 ut_ad(fts_slots);
2871
2872 /* Assign number of tables added in fts_slots_t to n_tables */
2873 n_tables = ib_vector_size(fts_slots);
2874
2875 while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
2876
2877 /* If there is no message in the queue and we have tables
2878 to optimize then optimize the tables. */
2879
2880 if (!done
2881 && ib_wqueue_is_empty(wq)
2882 && n_tables > 0
2883 && n_optimize > 0) {
2884
2885 fts_slot_t* slot;
2886
2887 ut_a(ib_vector_size(fts_slots) > 0);
2888
2889 slot = static_cast<fts_slot_t*>(
2890 ib_vector_get(fts_slots, current));
2891
2892 /* Handle the case of empty slots. */
2893 if (slot->state != FTS_STATE_EMPTY) {
2894
2895 slot->state = FTS_STATE_RUNNING;
2896
2897 fts_optimize_table_bk(slot);
2898 }
2899
2900 ++current;
2901
2902 /* Wrap around the counter. */
2903 if (current >= ib_vector_size(fts_slots)) {
2904 n_optimize = fts_optimize_how_many(fts_slots);
2905
2906 current = 0;
2907 }
2908
2909 } else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
2910 fts_msg_t* msg;
2911
2912 msg = static_cast<fts_msg_t*>(
2913 ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS));
2914
2915 /* Timeout ? */
2916 if (msg == NULL) {
2917 if (fts_is_sync_needed(fts_slots)) {
2918 fts_need_sync = true;
2919 }
2920
2921 continue;
2922 }
2923
2924 switch (msg->type) {
2925 case FTS_MSG_STOP:
2926 done = TRUE;
2927 break;
2928
2929 case FTS_MSG_ADD_TABLE:
2930 ut_a(!done);
2931 if (fts_optimize_new_table(
2932 fts_slots,
2933 static_cast<dict_table_t*>(msg->ptr))) {
2934 ++n_tables;
2935 }
2936 break;
2937
2938 case FTS_MSG_DEL_TABLE:
2939 if (fts_optimize_del_table(
2940 fts_slots, static_cast<fts_msg_del_t*>(msg->ptr))) {
2941 --n_tables;
2942 }
2943
2944 /* Signal the producer that we have
2945 removed the table. */
2946 os_event_set(((fts_msg_del_t*) msg->ptr)->event);
2947 break;
2948
2949 case FTS_MSG_SYNC_TABLE:
2950 DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep",
2951 os_thread_sleep(300000);
2952 );
2953
2954 fts_optimize_sync_table(
2955 *static_cast<table_id_t*>(msg->ptr));
2956 break;
2957
2958 default:
2959 ut_error;
2960 }
2961
2962 mem_heap_free(msg->heap);
2963
2964 if (!done) {
2965 n_optimize = fts_optimize_how_many(fts_slots);
2966 } else {
2967 n_optimize = 0;
2968 }
2969 }
2970 }
2971
2972 /* Server is being shutdown, sync the data from FTS cache to disk
2973 if needed */
2974 if (n_tables > 0) {
2975 ulint i;
2976
2977 for (i = 0; i < ib_vector_size(fts_slots); i++) {
2978 fts_slot_t* slot;
2979
2980 slot = static_cast<fts_slot_t*>(
2981 ib_vector_get(fts_slots, i));
2982
2983 if (slot->state != FTS_STATE_EMPTY) {
2984 fts_optimize_sync_table(slot->table_id);
2985 }
2986 }
2987 }
2988
2989 ib_vector_free(fts_slots);
2990
2991 ib::info() << "FTS optimize thread exiting.";
2992
2993 os_event_set(fts_opt_shutdown_event);
2994 my_thread_end();
2995
2996 /* We count the number of threads in os_thread_exit(). A created
2997 thread should always use that to exit and not use return() to exit. */
2998 os_thread_exit();
2999
3000 OS_THREAD_DUMMY_RETURN;
3001 }
3002
3003 /**********************************************************************//**
3004 Startup the optimize thread and create the work queue. */
3005 void
fts_optimize_init(void)3006 fts_optimize_init(void)
3007 /*===================*/
3008 {
3009 mem_heap_t* heap;
3010 ib_alloc_t* heap_alloc;
3011 dict_table_t* table;
3012
3013 ut_ad(!srv_read_only_mode);
3014
3015 /* For now we only support one optimize thread. */
3016 ut_a(!fts_optimize_is_init());
3017
3018 /* Create FTS optimize work queue */
3019 fts_optimize_wq = ib_wqueue_create();
3020 ut_a(fts_optimize_wq != NULL);
3021
3022 /* Create FTS vector to store fts_slot_t */
3023 heap = mem_heap_create(sizeof(dict_table_t*) * 64);
3024 heap_alloc = ib_heap_allocator_create(heap);
3025 fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
3026
3027 /* Add fts tables to the fts_slots vector which were skipped during restart */
3028 std::vector<dict_table_t*> table_vector;
3029 std::vector<dict_table_t*>::iterator it;
3030
3031 mutex_enter(&dict_sys->mutex);
3032 for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
3033 table != NULL;
3034 table = UT_LIST_GET_NEXT(table_LRU, table)) {
3035 if (table->fts &&
3036 dict_table_has_fts_index(table)) {
3037 if (fts_optimize_new_table(fts_slots,
3038 table)){
3039 table_vector.push_back(table);
3040 }
3041 }
3042 }
3043
3044 /* It is better to call dict_table_prevent_eviction()
3045 outside the above loop because it operates on
3046 dict_sys->table_LRU list.*/
3047 for (it=table_vector.begin();it!=table_vector.end();++it) {
3048 dict_table_prevent_eviction(*it);
3049 }
3050
3051 mutex_exit(&dict_sys->mutex);
3052 table_vector.clear();
3053
3054 fts_opt_shutdown_event = os_event_create(0);
3055 last_check_sync_time = ut_time_monotonic();
3056
3057 os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
3058 }
3059
3060 /**********************************************************************//**
3061 Check whether the work queue is initialized.
3062 @return TRUE if optimize queue is initialized. */
3063 ibool
fts_optimize_is_init(void)3064 fts_optimize_is_init(void)
3065 /*======================*/
3066 {
3067 return(fts_optimize_wq != NULL);
3068 }
3069
3070 /** Shutdown fts optimize thread. */
3071 void
fts_optimize_shutdown()3072 fts_optimize_shutdown()
3073 {
3074 ut_ad(!srv_read_only_mode);
3075
3076 fts_msg_t* msg;
3077
3078 /* If there is an ongoing activity on dictionary, such as
3079 srv_master_evict_from_table_cache(), wait for it */
3080 dict_mutex_enter_for_mysql();
3081
3082 /* Tells FTS optimizer system that we are exiting from
3083 optimizer thread, message send their after will not be
3084 processed */
3085 fts_opt_start_shutdown = true;
3086 dict_mutex_exit_for_mysql();
3087
3088 /* We tell the OPTIMIZE thread to switch to state done, we
3089 can't delete the work queue here because the add thread needs
3090 deregister the FTS tables. */
3091
3092 msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
3093
3094 ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
3095
3096 os_event_wait(fts_opt_shutdown_event);
3097
3098 os_event_destroy(fts_opt_shutdown_event);
3099
3100 ib_wqueue_free(fts_optimize_wq);
3101 fts_optimize_wq = NULL;
3102 }
3103