1 /*****************************************************************************
2 
3 Copyright (c) 2007, 2021, Oracle and/or its affiliates.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /******************************************************************//**
28 @file fts/fts0opt.cc
29 Full Text Search optimize thread
30 
31 Created 2007/03/27 Sunny Bains
32 Completed 2011/7/10 Sunny and Jimmy Yang
33 
34 ***********************************************************************/
35 
36 #include "ha_prototypes.h"
37 
38 #include "fts0fts.h"
39 #include "row0sel.h"
40 #include "que0types.h"
41 #include "fts0priv.h"
42 #include "fts0types.h"
43 #include "ut0wqueue.h"
44 #include "srv0start.h"
45 #include "ut0list.h"
46 #include "zlib.h"
47 
48 #ifdef UNIV_NONINL
49 #include "fts0types.ic"
50 #include "fts0vlc.ic"
51 #endif
52 
53 /** The FTS optimize thread's work queue. */
54 static ib_wqueue_t* fts_optimize_wq;
55 
56 /** The FTS vector to store fts_slot_t */
57 static ib_vector_t*  fts_slots;
58 
59 /** Time to wait for a message. */
60 static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;
61 
62 /** Default optimize interval in secs. */
63 static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
64 
65 /** Server is shutting down, so does we exiting the optimize thread */
66 static bool fts_opt_start_shutdown = false;
67 
68 /** Event to wait for shutdown of the optimize thread */
69 static os_event_t fts_opt_shutdown_event = NULL;
70 
71 /** Initial size of nodes in fts_word_t. */
72 static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
73 
74 /** Last time we did check whether system need a sync */
75 static ib_time_monotonic_t	last_check_sync_time;
76 
/** State of a table (slot) within the optimization sub system. The
enumerator order is significant: the values are assigned implicitly,
starting from 0. */
enum fts_state_t {
	FTS_STATE_LOADED,
	FTS_STATE_RUNNING,
	FTS_STATE_SUSPENDED,
	FTS_STATE_DONE,
	FTS_STATE_EMPTY
};
85 
/** Types of the messages that can be sent to the FTS optimize thread. */
enum fts_msg_type_t {
	/** Stop optimizing and exit the thread */
	FTS_MSG_STOP,

	/** Add a table to the optimize thread's work queue */
	FTS_MSG_ADD_TABLE,

	/** Remove a table from the optimize thread's work queue */
	FTS_MSG_DEL_TABLE,

	/** Sync the fts cache of a table */
	FTS_MSG_SYNC_TABLE
};
97 
98 /** Compressed list of words that have been read from FTS INDEX
99 that needs to be optimized. */
100 struct fts_zip_t {
101 	lint		status;		/*!< Status of (un)/zip operation */
102 
103 	ulint		n_words;	/*!< Number of words compressed */
104 
105 	ulint		block_sz;	/*!< Size of a block in bytes */
106 
107 	ib_vector_t*	blocks;		/*!< Vector of compressed blocks */
108 
109 	ib_alloc_t*	heap_alloc;	/*!< Heap to use for allocations */
110 
111 	ulint		pos;		/*!< Offset into blocks */
112 
113 	ulint		last_big_block;	/*!< Offset of last block in the
114 					blocks array that is of size
115 					block_sz. Blocks beyond this offset
116 					are of size FTS_MAX_WORD_LEN */
117 
118 	z_streamp	zp;		/*!< ZLib state */
119 
120 					/*!< The value of the last word read
121 					from the FTS INDEX table. This is
122 					used to discard duplicates */
123 
124 	fts_string_t	word;		/*!< UTF-8 string */
125 
126 	ulint		max_words;	/*!< maximum number of words to read
127 					in one pase */
128 };
129 
130 /** Prepared statemets used during optimize */
131 struct fts_optimize_graph_t {
132 					/*!< Delete a word from FTS INDEX */
133 	que_t*		delete_nodes_graph;
134 					/*!< Insert a word into FTS INDEX */
135 	que_t*		write_nodes_graph;
136 					/*!< COMMIT a transaction */
137 	que_t*		commit_graph;
138 					/*!< Read the nodes from FTS_INDEX */
139 	que_t*		read_nodes_graph;
140 };
141 
142 /** Used by fts_optimize() to store state. */
143 struct fts_optimize_t {
144 	trx_t*		trx;		/*!< The transaction used for all SQL */
145 
146 	ib_alloc_t*	self_heap;	/*!< Heap to use for allocations */
147 
148 	char*		name_prefix;	/*!< FTS table name prefix */
149 
150 	fts_table_t	fts_index_table;/*!< Common table definition */
151 
152 					/*!< Common table definition */
153 	fts_table_t	fts_common_table;
154 
155 	dict_table_t*	table;		/*!< Table that has to be queried */
156 
157 	dict_index_t*	index;		/*!< The FTS index to be optimized */
158 
159 	fts_doc_ids_t*	to_delete;	/*!< doc ids to delete, we check against
160 					this vector and purge the matching
161 					entries during the optimizing
162 					process. The vector entries are
163 					sorted on doc id */
164 
165 	ulint		del_pos;	/*!< Offset within to_delete vector,
166 					this is used to keep track of where
167 					we are up to in the vector */
168 
169 	ibool		done;		/*!< TRUE when optimize finishes */
170 
171 	ib_vector_t*	words;		/*!< Word + Nodes read from FTS_INDEX,
172 					it contains instances of fts_word_t */
173 
174 	fts_zip_t*	zip;		/*!< Words read from the FTS_INDEX */
175 
176 	fts_optimize_graph_t		/*!< Prepared statements used during */
177 			graph;		/*optimize */
178 
179 	ulint		n_completed;	/*!< Number of FTS indexes that have
180 					been optimized */
181 	ibool		del_list_regenerated;
182 					/*!< BEING_DELETED list regenarated */
183 };
184 
185 /** Used by the optimize, to keep state during compacting nodes. */
186 struct fts_encode_t {
187 	doc_id_t	src_last_doc_id;/*!< Last doc id read from src node */
188 	byte*		src_ilist_ptr;	/*!< Current ptr within src ilist */
189 };
190 
191 /** We use this information to determine when to start the optimize
192 cycle for a table. */
193 struct fts_slot_t {
194 	dict_table_t*	table;		/*!< Table to optimize */
195 
196 	table_id_t	table_id;	/*!< Table id */
197 
198 	fts_state_t	state;		/*!< State of this slot */
199 
200 	ulint		added;		/*!< Number of doc ids added since the
201 					last time this table was optimized */
202 
203 	ulint		deleted;	/*!< Number of doc ids deleted since the
204 					last time this table was optimized */
205 
206 	ib_time_monotonic_t	last_run;  /*!< Time last run completed */
207 
208 	ib_time_monotonic_t	completed; /*!< Optimize finish time */
209 
210 	ib_time_t	interval_time;	   /*!< Minimum time to wait before
211 					   optimizing the table again. */
212 };
213 
214 /** A table remove message for the FTS optimize thread. */
215 struct fts_msg_del_t {
216 	dict_table_t*	table;		/*!< The table to remove */
217 
218 	os_event_t	event;		/*!< Event to synchronize acknowledgement
219 					of receipt and processing of the
220 					this message by the consumer */
221 };
222 
223 /** The FTS optimize message work queue message type. */
224 struct fts_msg_t {
225 	fts_msg_type_t	type;		/*!< Message type */
226 
227 	void*		ptr;		/*!< The message contents */
228 
229 	mem_heap_t*	heap;		/*!< The heap used to allocate this
230 					message, the message consumer will
231 					free the heap. */
232 };
233 
234 /** The number of words to read and optimize in a single pass. */
235 ulong	fts_num_word_optimize;
236 
237 /** Whether to enable additional FTS diagnostic printout. */
238 char	fts_enable_diag_print;
239 
240 /** ZLib compressed block size.*/
241 static ulint FTS_ZIP_BLOCK_SIZE	= 1024;
242 
243 /** The amount of time optimizing in a single pass, in milliseconds. */
244 static ib_time_t fts_optimize_time_limit = 0;
245 
246 /** It's defined in fts0fts.cc  */
247 extern const char* fts_common_tables[];
248 
/** SQL statement that copies the doc ids of the rows to be deleted from
the FTS index into the BEING_DELETED tables. */
static const char*	fts_init_delete_sql =
	"BEGIN\n"
	"\n"
	"INSERT INTO $BEING_DELETED\n"
	"SELECT doc_id FROM $DELETED;\n"
	"\n"
	"INSERT INTO $BEING_DELETED_CACHE\n"
	"SELECT doc_id FROM $DELETED_CACHE;\n";

/** SQL statement that removes one doc id (bound as :doc_id1 and
:doc_id2) from the DELETED tables. */
static const char*	fts_delete_doc_ids_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
	"DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";

/** SQL statement that empties the BEING_DELETED tables. */
static const char*	fts_end_delete_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $BEING_DELETED;\n"
	"DELETE FROM $BEING_DELETED_CACHE;\n";
270 
271 /**********************************************************************//**
272 Initialize fts_zip_t. */
273 static
274 void
fts_zip_initialize(fts_zip_t * zip)275 fts_zip_initialize(
276 /*===============*/
277 	fts_zip_t*	zip)		/*!< out: zip instance to initialize */
278 {
279 	zip->pos = 0;
280 	zip->n_words = 0;
281 
282 	zip->status = Z_OK;
283 
284 	zip->last_big_block = 0;
285 
286 	zip->word.f_len = 0;
287 	memset(zip->word.f_str, 0, FTS_MAX_WORD_LEN);
288 
289 	ib_vector_reset(zip->blocks);
290 
291 	memset(zip->zp, 0, sizeof(*zip->zp));
292 }
293 
294 /**********************************************************************//**
295 Create an instance of fts_zip_t.
296 @return a new instance of fts_zip_t */
297 static
298 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)299 fts_zip_create(
300 /*===========*/
301 	mem_heap_t*	heap,		/*!< in: heap */
302 	ulint		block_sz,	/*!< in: size of a zip block.*/
303 	ulint		max_words)	/*!< in: max words to read */
304 {
305 	fts_zip_t*	zip;
306 
307 	zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
308 
309 	zip->word.f_str = static_cast<byte*>(
310 		mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
311 
312 	zip->block_sz = block_sz;
313 
314 	zip->heap_alloc = ib_heap_allocator_create(heap);
315 
316 	zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
317 
318 	zip->max_words = max_words;
319 
320 	zip->zp = static_cast<z_stream*>(
321 		mem_heap_zalloc(heap, sizeof(*zip->zp)));
322 
323 	return(zip);
324 }
325 
326 /**********************************************************************//**
327 Initialize an instance of fts_zip_t. */
328 static
329 void
fts_zip_init(fts_zip_t * zip)330 fts_zip_init(
331 /*=========*/
332 
333 	fts_zip_t*	zip)		/*!< in: zip instance to init */
334 {
335 	memset(zip->zp, 0, sizeof(*zip->zp));
336 
337 	zip->word.f_len = 0;
338 	*zip->word.f_str = '\0';
339 }
340 
341 /**********************************************************************//**
342 Create a fts_optimizer_word_t instance.
343 @return new instance */
344 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)345 fts_word_init(
346 /*==========*/
347 	fts_word_t*	word,		/*!< in: word to initialize */
348 	byte*		utf8,		/*!< in: UTF-8 string */
349 	ulint		len)		/*!< in: length of string in bytes */
350 {
351 	mem_heap_t*	heap = mem_heap_create(sizeof(fts_node_t));
352 
353 	memset(word, 0, sizeof(*word));
354 
355 	word->text.f_len = len;
356 	word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
357 
358 	/* Need to copy the NUL character too. */
359 	memcpy(word->text.f_str, utf8, word->text.f_len);
360 	word->text.f_str[word->text.f_len] = 0;
361 
362 	word->heap_alloc = ib_heap_allocator_create(heap);
363 
364 	word->nodes = ib_vector_create(
365 		word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
366 
367 	return(word);
368 }
369 
370 /**********************************************************************//**
371 Read the FTS INDEX row.
372 @return fts_node_t instance */
373 static
374 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)375 fts_optimize_read_node(
376 /*===================*/
377 	fts_word_t*	word,		/*!< in: */
378 	que_node_t*	exp)		/*!< in: */
379 {
380 	int		i;
381 	fts_node_t*	node = static_cast<fts_node_t*>(
382 		ib_vector_push(word->nodes, NULL));
383 
384 	/* Start from 1 since the first node has been read by the caller */
385 	for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
386 
387 		dfield_t*	dfield = que_node_get_val(exp);
388 		byte*		data = static_cast<byte*>(
389 			dfield_get_data(dfield));
390 		ulint		len = dfield_get_len(dfield);
391 
392 		ut_a(len != UNIV_SQL_NULL);
393 
394 		/* Note: The column numbers below must match the SELECT */
395 		switch (i) {
396 		case 1: /* DOC_COUNT */
397 			node->doc_count = mach_read_from_4(data);
398 			break;
399 
400 		case 2: /* FIRST_DOC_ID */
401 			node->first_doc_id = fts_read_doc_id(data);
402 			break;
403 
404 		case 3: /* LAST_DOC_ID */
405 			node->last_doc_id = fts_read_doc_id(data);
406 			break;
407 
408 		case 4: /* ILIST */
409 			node->ilist_size_alloc = node->ilist_size = len;
410 			node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
411 			memcpy(node->ilist, data, len);
412 			break;
413 
414 		default:
415 			ut_error;
416 		}
417 	}
418 
419 	/* Make sure all columns were read. */
420 	ut_a(i == 5);
421 
422 	return(node);
423 }
424 
425 /**********************************************************************//**
426 Callback function to fetch the rows in an FTS INDEX record.
427 @return always returns non-NULL */
428 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)429 fts_optimize_index_fetch_node(
430 /*==========================*/
431 	void*		row,		/*!< in: sel_node_t* */
432 	void*		user_arg)	/*!< in: pointer to ib_vector_t */
433 {
434 	fts_word_t*	word;
435 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
436 	fts_fetch_t*	fetch = static_cast<fts_fetch_t*>(user_arg);
437 	ib_vector_t*	words = static_cast<ib_vector_t*>(fetch->read_arg);
438 	que_node_t*	exp = sel_node->select_list;
439 	dfield_t*	dfield = que_node_get_val(exp);
440 	void*		data = dfield_get_data(dfield);
441 	ulint		dfield_len = dfield_get_len(dfield);
442 	fts_node_t*	node;
443 	bool		is_word_init = false;
444 
445 	ut_a(dfield_len <= FTS_MAX_WORD_LEN);
446 
447 	if (ib_vector_size(words) == 0) {
448 
449 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
450 		fts_word_init(word, (byte*) data, dfield_len);
451 		is_word_init = true;
452 	}
453 
454 	word = static_cast<fts_word_t*>(ib_vector_last(words));
455 
456 	if (dfield_len != word->text.f_len
457 	    || memcmp(word->text.f_str, data, dfield_len)) {
458 
459 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
460 		fts_word_init(word, (byte*) data, dfield_len);
461 		is_word_init = true;
462 	}
463 
464 	node = fts_optimize_read_node(word, que_node_get_next(exp));
465 
466 	fetch->total_memory += node->ilist_size;
467 	if (is_word_init) {
468 		fetch->total_memory += sizeof(fts_word_t)
469 			+ sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
470 			+ sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
471 	} else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
472 		fetch->total_memory += sizeof(fts_node_t);
473 	}
474 
475 	if (fetch->total_memory >= fts_result_cache_limit) {
476 		return(FALSE);
477 	}
478 
479 	return(TRUE);
480 }
481 
482 /**********************************************************************//**
483 Read the rows from the FTS inde.
484 @return DB_SUCCESS or error code */
485 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)486 fts_index_fetch_nodes(
487 /*==================*/
488 	trx_t*		trx,		/*!< in: transaction */
489 	que_t**		graph,		/*!< in: prepared statement */
490 	fts_table_t*	fts_table,	/*!< in: table of the FTS INDEX */
491 	const fts_string_t*
492 			word,		/*!< in: the word to fetch */
493 	fts_fetch_t*	fetch)		/*!< in: fetch callback.*/
494 {
495 	pars_info_t*	info;
496 	dberr_t		error;
497 	char		table_name[MAX_FULL_NAME_LEN];
498 
499 	trx->op_info = "fetching FTS index nodes";
500 
501 	if (*graph) {
502 		info = (*graph)->info;
503 	} else {
504 		ulint	selected;
505 
506 		info = pars_info_create();
507 
508 		ut_a(fts_table->type == FTS_INDEX_TABLE);
509 
510 		selected = fts_select_index(fts_table->charset,
511 					    word->f_str, word->f_len);
512 
513 		fts_table->suffix = fts_get_suffix(selected);
514 
515 		fts_get_table_name(fts_table, table_name);
516 
517 		pars_info_bind_id(info, true, "table_name", table_name);
518 	}
519 
520 	pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
521 	pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
522 
523 	if (!*graph) {
524 
525 		*graph = fts_parse_sql(
526 			fts_table,
527 			info,
528 			"DECLARE FUNCTION my_func;\n"
529 			"DECLARE CURSOR c IS"
530 			" SELECT word, doc_count, first_doc_id, last_doc_id,"
531 			" ilist\n"
532 			" FROM $table_name\n"
533 			" WHERE word LIKE :word\n"
534 			" ORDER BY first_doc_id;\n"
535 			"BEGIN\n"
536 			"\n"
537 			"OPEN c;\n"
538 			"WHILE 1 = 1 LOOP\n"
539 			"  FETCH c INTO my_func();\n"
540 			"  IF c % NOTFOUND THEN\n"
541 			"    EXIT;\n"
542 			"  END IF;\n"
543 			"END LOOP;\n"
544 			"CLOSE c;");
545 	}
546 
547 	for (;;) {
548 		error = fts_eval_sql(trx, *graph);
549 
550 		if (error == DB_SUCCESS) {
551 			fts_sql_commit(trx);
552 
553 			break;				/* Exit the loop. */
554 		} else {
555 			fts_sql_rollback(trx);
556 
557 			if (error == DB_LOCK_WAIT_TIMEOUT) {
558 				ib::warn() << "lock wait timeout reading"
559 					" FTS index. Retrying!";
560 
561 				trx->error_state = DB_SUCCESS;
562 			} else {
563 				ib::error() << "(" << ut_strerr(error)
564 					<< ") while reading FTS index.";
565 
566 				break;			/* Exit the loop. */
567 			}
568 		}
569 	}
570 
571 	return(error);
572 }
573 
574 /**********************************************************************//**
575 Read a word */
576 static
577 byte*
fts_zip_read_word(fts_zip_t * zip,fts_string_t * word)578 fts_zip_read_word(
579 /*==============*/
580 	fts_zip_t*	zip,		/*!< in: Zip state + data */
581 	fts_string_t*	word)		/*!< out: uncompressed word */
582 {
583 	short		len = 0;
584 	void*		null = NULL;
585 	byte*		ptr = word->f_str;
586 	int		flush = Z_NO_FLUSH;
587 
588 	/* Either there was an error or we are at the Z_STREAM_END. */
589 	if (zip->status != Z_OK) {
590 		return(NULL);
591 	}
592 
593 	zip->zp->next_out = reinterpret_cast<byte*>(&len);
594 	zip->zp->avail_out = sizeof(len);
595 
596 	while (zip->status == Z_OK && zip->zp->avail_out > 0) {
597 
598 		/* Finished decompressing block. */
599 		if (zip->zp->avail_in == 0) {
600 
601 			/* Free the block thats been decompressed. */
602 			if (zip->pos > 0) {
603 				ulint	prev = zip->pos - 1;
604 
605 				ut_a(zip->pos < ib_vector_size(zip->blocks));
606 
607 				ut_free(ib_vector_getp(zip->blocks, prev));
608 				ib_vector_set(zip->blocks, prev, &null);
609 			}
610 
611 			/* Any more blocks to decompress. */
612 			if (zip->pos < ib_vector_size(zip->blocks)) {
613 
614 				zip->zp->next_in = static_cast<byte*>(
615 					ib_vector_getp(
616 						zip->blocks, zip->pos));
617 
618 				if (zip->pos > zip->last_big_block) {
619 					zip->zp->avail_in =
620 						FTS_MAX_WORD_LEN;
621 				} else {
622 					zip->zp->avail_in =
623 						static_cast<uInt>(zip->block_sz);
624 				}
625 
626 				++zip->pos;
627 			} else {
628 				flush = Z_FINISH;
629 			}
630 		}
631 
632 		switch (zip->status = inflate(zip->zp, flush)) {
633 		case Z_OK:
634 			if (zip->zp->avail_out == 0 && len > 0) {
635 
636 				ut_a(len <= FTS_MAX_WORD_LEN);
637 				ptr[len] = 0;
638 
639 				zip->zp->next_out = ptr;
640 				zip->zp->avail_out = len;
641 
642 				word->f_len = len;
643 				len = 0;
644 			}
645 			break;
646 
647 		case Z_BUF_ERROR:	/* No progress possible. */
648 		case Z_STREAM_END:
649 			inflateEnd(zip->zp);
650 			break;
651 
652 		case Z_STREAM_ERROR:
653 		default:
654 			ut_error;
655 		}
656 	}
657 
658 	/* All blocks must be freed at end of inflate. */
659 	if (zip->status != Z_OK) {
660 		for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
661 			if (ib_vector_getp(zip->blocks, i)) {
662 				ut_free(ib_vector_getp(zip->blocks, i));
663 				ib_vector_set(zip->blocks, i, &null);
664 			}
665 		}
666 	}
667 
668 	if (ptr != NULL) {
669 		ut_ad(word->f_len == strlen((char*) ptr));
670 	}
671 
672 	return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
673 }
674 
675 /**********************************************************************//**
676 Callback function to fetch and compress the word in an FTS
677 INDEX record.
678 @return FALSE on EOF */
679 static
680 ibool
fts_fetch_index_words(void * row,void * user_arg)681 fts_fetch_index_words(
682 /*==================*/
683 	void*		row,		/*!< in: sel_node_t* */
684 	void*		user_arg)	/*!< in: pointer to ib_vector_t */
685 {
686 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
687 	fts_zip_t*	zip = static_cast<fts_zip_t*>(user_arg);
688 	que_node_t*	exp = sel_node->select_list;
689 	dfield_t*	dfield = que_node_get_val(exp);
690 	ushort		len =  static_cast<ushort>(dfield_get_len(dfield));
691 	void*		data = dfield_get_data(dfield);
692 
693 	/* Skip the duplicate words. */
694 	if (zip->word.f_len == static_cast<ulint>(len)
695 	    && !memcmp(zip->word.f_str, data, len)) {
696 
697 		return(TRUE);
698 	}
699 
700 	ut_a(len <= FTS_MAX_WORD_LEN);
701 
702 	memcpy(zip->word.f_str, data, len);
703 	zip->word.f_len = len;
704 
705 	ut_a(zip->zp->avail_in == 0);
706 	ut_a(zip->zp->next_in == NULL);
707 
708 	/* The string is prefixed by len. */
709 	zip->zp->next_in = reinterpret_cast<byte*>(&len);
710 	zip->zp->avail_in = sizeof(len);
711 
712 	/* Compress the word, create output blocks as necessary. */
713 	while (zip->zp->avail_in > 0) {
714 
715 		/* No space left in output buffer, create a new one. */
716 		if (zip->zp->avail_out == 0) {
717 			byte*		block;
718 
719 			block = static_cast<byte*>(
720 				ut_malloc_nokey(zip->block_sz));
721 
722 			ib_vector_push(zip->blocks, &block);
723 
724 			zip->zp->next_out = block;
725 			zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
726 		}
727 
728 		switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
729 		case Z_OK:
730 			if (zip->zp->avail_in == 0) {
731 				zip->zp->next_in = static_cast<byte*>(data);
732 				zip->zp->avail_in = len;
733 				ut_a(len <= FTS_MAX_WORD_LEN);
734 				len = 0;
735 			}
736 			break;
737 
738 		case Z_STREAM_END:
739 		case Z_BUF_ERROR:
740 		case Z_STREAM_ERROR:
741 		default:
742 			ut_error;
743 			break;
744 		}
745 	}
746 
747 	/* All data should have been compressed. */
748 	ut_a(zip->zp->avail_in == 0);
749 	zip->zp->next_in = NULL;
750 
751 	++zip->n_words;
752 
753 	return(zip->n_words >= zip->max_words ? FALSE : TRUE);
754 }
755 
756 /**********************************************************************//**
757 Finish Zip deflate. */
758 static
759 void
fts_zip_deflate_end(fts_zip_t * zip)760 fts_zip_deflate_end(
761 /*================*/
762 	fts_zip_t*	zip)		/*!< in: instance that should be closed*/
763 {
764 	ut_a(zip->zp->avail_in == 0);
765 	ut_a(zip->zp->next_in == NULL);
766 
767 	zip->status = deflate(zip->zp, Z_FINISH);
768 
769 	ut_a(ib_vector_size(zip->blocks) > 0);
770 	zip->last_big_block = ib_vector_size(zip->blocks) - 1;
771 
772 	/* Allocate smaller block(s), since this is trailing data. */
773 	while (zip->status == Z_OK) {
774 		byte*		block;
775 
776 		ut_a(zip->zp->avail_out == 0);
777 
778 		block = static_cast<byte*>(
779 			ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
780 
781 		ib_vector_push(zip->blocks, &block);
782 
783 		zip->zp->next_out = block;
784 		zip->zp->avail_out = FTS_MAX_WORD_LEN;
785 
786 		zip->status = deflate(zip->zp, Z_FINISH);
787 	}
788 
789 	ut_a(zip->status == Z_STREAM_END);
790 
791 	zip->status = deflateEnd(zip->zp);
792 	ut_a(zip->status == Z_OK);
793 
794 	/* Reset the ZLib data structure. */
795 	memset(zip->zp, 0, sizeof(*zip->zp));
796 }
797 
798 /**********************************************************************//**
799 Read the words from the FTS INDEX.
800 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
801         to search else error code */
802 static MY_ATTRIBUTE((nonnull, warn_unused_result))
803 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)804 fts_index_fetch_words(
805 /*==================*/
806 	fts_optimize_t*		optim,	/*!< in: optimize scratch pad */
807 	const fts_string_t*	word,	/*!< in: get words greater than this
808 					 word */
809 	ulint			n_words)/*!< in: max words to read */
810 {
811 	pars_info_t*	info;
812 	que_t*		graph;
813 	ulint		selected;
814 	fts_zip_t*	zip = NULL;
815 	dberr_t		error = DB_SUCCESS;
816 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
817 	ibool		inited = FALSE;
818 
819 	optim->trx->op_info = "fetching FTS index words";
820 
821 	if (optim->zip == NULL) {
822 		optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
823 	} else {
824 		fts_zip_initialize(optim->zip);
825 	}
826 
827 	for (selected = fts_select_index(
828 		     optim->fts_index_table.charset, word->f_str, word->f_len);
829 	     selected < FTS_NUM_AUX_INDEX;
830 	     selected++) {
831 
832 		char	table_name[MAX_FULL_NAME_LEN];
833 
834 		optim->fts_index_table.suffix = fts_get_suffix(selected);
835 
836 		info = pars_info_create();
837 
838 		pars_info_bind_function(
839 			info, "my_func", fts_fetch_index_words, optim->zip);
840 
841 		pars_info_bind_varchar_literal(
842 			info, "word", word->f_str, word->f_len);
843 
844 		fts_get_table_name(&optim->fts_index_table, table_name);
845 		pars_info_bind_id(info, true, "table_name", table_name);
846 
847 		graph = fts_parse_sql(
848 			&optim->fts_index_table,
849 			info,
850 			"DECLARE FUNCTION my_func;\n"
851 			"DECLARE CURSOR c IS"
852 			" SELECT word\n"
853 			" FROM $table_name\n"
854 			" WHERE word > :word\n"
855 			" ORDER BY word;\n"
856 			"BEGIN\n"
857 			"\n"
858 			"OPEN c;\n"
859 			"WHILE 1 = 1 LOOP\n"
860 			"  FETCH c INTO my_func();\n"
861 			"  IF c % NOTFOUND THEN\n"
862 			"    EXIT;\n"
863 			"  END IF;\n"
864 			"END LOOP;\n"
865 			"CLOSE c;");
866 
867 		zip = optim->zip;
868 
869 		for (;;) {
870 			int	err;
871 
872 			if (!inited && ((err = deflateInit(zip->zp, 9))
873 					!= Z_OK)) {
874 				ib::error() << "ZLib deflateInit() failed: "
875 					<< err;
876 
877 				error = DB_ERROR;
878 				break;
879 			} else {
880 				inited = TRUE;
881 				error = fts_eval_sql(optim->trx, graph);
882 			}
883 
884 			if (error == DB_SUCCESS) {
885 				//FIXME fts_sql_commit(optim->trx);
886 				break;
887 			} else {
888 				//FIXME fts_sql_rollback(optim->trx);
889 
890 				if (error == DB_LOCK_WAIT_TIMEOUT) {
891 					ib::warn() << "Lock wait timeout"
892 						" reading document. Retrying!";
893 
894 					/* We need to reset the ZLib state. */
895 					inited = FALSE;
896 					deflateEnd(zip->zp);
897 					fts_zip_init(zip);
898 
899 					optim->trx->error_state = DB_SUCCESS;
900 				} else {
901 					ib::error() << "(" << ut_strerr(error)
902 						<< ") while reading document.";
903 
904 					break;	/* Exit the loop. */
905 				}
906 			}
907 		}
908 
909 		fts_que_graph_free(graph);
910 
911 		/* Check if max word to fetch is exceeded */
912 		if (optim->zip->n_words >= n_words) {
913 			break;
914 		}
915 	}
916 
917 	if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
918 
919 		/* All data should have been read. */
920 		ut_a(zip->zp->avail_in == 0);
921 
922 		fts_zip_deflate_end(zip);
923 	} else {
924 		deflateEnd(zip->zp);
925 	}
926 
927 	return(error);
928 }
929 
930 /**********************************************************************//**
931 Callback function to fetch the doc id from the record.
932 @return always returns TRUE */
933 static
934 ibool
fts_fetch_doc_ids(void * row,void * user_arg)935 fts_fetch_doc_ids(
936 /*==============*/
937 	void*	row,		/*!< in: sel_node_t* */
938 	void*	user_arg)	/*!< in: pointer to ib_vector_t */
939 {
940 	que_node_t*	exp;
941 	int		i = 0;
942 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
943 	fts_doc_ids_t*	fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
944 	fts_update_t*	update = static_cast<fts_update_t*>(
945 		ib_vector_push(fts_doc_ids->doc_ids, NULL));
946 
947 	for (exp = sel_node->select_list;
948 	     exp;
949 	     exp = que_node_get_next(exp), ++i) {
950 
951 		dfield_t*	dfield = que_node_get_val(exp);
952 		void*		data = dfield_get_data(dfield);
953 		ulint		len = dfield_get_len(dfield);
954 
955 		ut_a(len != UNIV_SQL_NULL);
956 
957 		/* Note: The column numbers below must match the SELECT. */
958 		switch (i) {
959 		case 0: /* DOC_ID */
960 			update->fts_indexes = NULL;
961 			update->doc_id = fts_read_doc_id(
962 				static_cast<byte*>(data));
963 			break;
964 
965 		default:
966 			ut_error;
967 		}
968 	}
969 
970 	return(TRUE);
971 }
972 
973 /**********************************************************************//**
974 Read the rows from a FTS common auxiliary table.
975 @return DB_SUCCESS or error code */
976 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)977 fts_table_fetch_doc_ids(
978 /*====================*/
979 	trx_t*		trx,		/*!< in: transaction */
980 	fts_table_t*	fts_table,	/*!< in: table */
981 	fts_doc_ids_t*	doc_ids)	/*!< in: For collecting doc ids */
982 {
983 	dberr_t		error;
984 	que_t*		graph;
985 	pars_info_t*	info = pars_info_create();
986 	ibool		alloc_bk_trx = FALSE;
987 	char		table_name[MAX_FULL_NAME_LEN];
988 
989 	ut_a(fts_table->suffix != NULL);
990 	ut_a(fts_table->type == FTS_COMMON_TABLE);
991 
992 	if (!trx) {
993 		trx = trx_allocate_for_background();
994 		alloc_bk_trx = TRUE;
995 	}
996 
997 	trx->op_info = "fetching FTS doc ids";
998 
999 	pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
1000 
1001 	fts_get_table_name(fts_table, table_name);
1002 	pars_info_bind_id(info, true, "table_name", table_name);
1003 
1004 	graph = fts_parse_sql(
1005 		fts_table,
1006 		info,
1007 		"DECLARE FUNCTION my_func;\n"
1008 		"DECLARE CURSOR c IS"
1009 		" SELECT doc_id FROM $table_name;\n"
1010 		"BEGIN\n"
1011 		"\n"
1012 		"OPEN c;\n"
1013 		"WHILE 1 = 1 LOOP\n"
1014 		"  FETCH c INTO my_func();\n"
1015 		"  IF c % NOTFOUND THEN\n"
1016 		"    EXIT;\n"
1017 		"  END IF;\n"
1018 		"END LOOP;\n"
1019 		"CLOSE c;");
1020 
1021 	error = fts_eval_sql(trx, graph);
1022 
1023 	mutex_enter(&dict_sys->mutex);
1024 	que_graph_free(graph);
1025 	mutex_exit(&dict_sys->mutex);
1026 
1027 	if (error == DB_SUCCESS) {
1028 		fts_sql_commit(trx);
1029 
1030 		ib_vector_sort(doc_ids->doc_ids, fts_update_doc_id_cmp);
1031 	} else {
1032 		fts_sql_rollback(trx);
1033 	}
1034 
1035 	if (alloc_bk_trx) {
1036 		trx_free_for_background(trx);
1037 	}
1038 
1039 	return(error);
1040 }
1041 
1042 /**********************************************************************//**
1043 Do a binary search for a doc id in the array
1044 @return +ve index if found -ve index where it should be inserted
1045         if not found */
1046 int
fts_bsearch(fts_update_t * array,int lower,int upper,doc_id_t doc_id)1047 fts_bsearch(
1048 /*========*/
1049 	fts_update_t*	array,	/*!< in: array to sort */
1050 	int		lower,	/*!< in: the array lower bound */
1051 	int		upper,	/*!< in: the array upper bound */
1052 	doc_id_t	doc_id)	/*!< in: the doc id to search for */
1053 {
1054 	int	orig_size = upper;
1055 
1056 	if (upper == 0) {
1057 		/* Nothing to search */
1058 		return(-1);
1059 	} else {
1060 		while (lower < upper) {
1061 			int	i = (lower + upper) >> 1;
1062 
1063 			if (doc_id > array[i].doc_id) {
1064 				lower = i + 1;
1065 			} else if (doc_id < array[i].doc_id) {
1066 				upper = i - 1;
1067 			} else {
1068 				return(i); /* Found. */
1069 			}
1070 		}
1071 	}
1072 
1073 	if (lower == upper && lower < orig_size) {
1074 		if (doc_id == array[lower].doc_id) {
1075 			return(lower);
1076 		} else if (lower == 0) {
1077 			return(-1);
1078 		}
1079 	}
1080 
1081 	/* Not found. */
1082 	return( (lower == 0) ? -1 : -(lower));
1083 }
1084 
1085 /**********************************************************************//**
1086 Search in the to delete array whether any of the doc ids within
1087 the [first, last] range are to be deleted
1088 @return +ve index if found -ve index where it should be inserted
1089         if not found */
1090 static
1091 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1092 fts_optimize_lookup(
1093 /*================*/
1094 	ib_vector_t*	doc_ids,	/*!< in: array to search */
1095 	ulint		lower,		/*!< in: lower limit of array */
1096 	doc_id_t	first_doc_id,	/*!< in: doc id to lookup */
1097 	doc_id_t	last_doc_id)	/*!< in: doc id to lookup */
1098 {
1099 	int		pos;
1100 	int		upper = static_cast<int>(ib_vector_size(doc_ids));
1101 	fts_update_t*	array = (fts_update_t*) doc_ids->data;
1102 
1103 	pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1104 
1105 	ut_a(abs(pos) <= upper + 1);
1106 
1107 	if (pos < 0) {
1108 
1109 		int	i = abs(pos);
1110 
1111 		/* If i is 1, it could be first_doc_id is less than
1112 		either the first or second array item, do a
1113 		double check */
1114 		if (i == 1 && array[0].doc_id <= last_doc_id
1115 		    && first_doc_id < array[0].doc_id) {
1116 			pos = 0;
1117 		} else if (i < upper && array[i].doc_id <= last_doc_id) {
1118 
1119 			/* Check if the "next" doc id is within the
1120 			first & last doc id of the node. */
1121 			pos = i;
1122 		}
1123 	}
1124 
1125 	return(pos);
1126 }
1127 
1128 /**********************************************************************//**
1129 Encode the word pos list into the node
1130 @return DB_SUCCESS or error code*/
1131 static MY_ATTRIBUTE((nonnull))
1132 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1133 fts_optimize_encode_node(
1134 /*=====================*/
1135 	fts_node_t*	node,		/*!< in: node to fill*/
1136 	doc_id_t	doc_id,		/*!< in: doc id to encode */
1137 	fts_encode_t*	enc)		/*!< in: encoding state.*/
1138 {
1139 	byte*		dst;
1140 	ulint		enc_len;
1141 	ulint		pos_enc_len;
1142 	doc_id_t	doc_id_delta;
1143 	dberr_t		error = DB_SUCCESS;
1144 	byte*		src = enc->src_ilist_ptr;
1145 
1146 	if (node->first_doc_id == 0) {
1147 		ut_a(node->last_doc_id == 0);
1148 
1149 		node->first_doc_id = doc_id;
1150 	}
1151 
1152 	/* Calculate the space required to store the ilist. */
1153 	ut_ad(doc_id > node->last_doc_id);
1154 	doc_id_delta = doc_id - node->last_doc_id;
1155 	enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1156 
1157 	/* Calculate the size of the encoded pos array. */
1158 	while (*src) {
1159 		fts_decode_vlc(&src);
1160 	}
1161 
1162 	/* Skip the 0x00 byte at the end of the word positions list. */
1163 	++src;
1164 
1165 	/* Number of encoded pos bytes to copy. */
1166 	pos_enc_len = src - enc->src_ilist_ptr;
1167 
1168 	/* Total number of bytes required for copy. */
1169 	enc_len += pos_enc_len;
1170 
1171 	/* Check we have enough space in the destination buffer for
1172 	copying the document word list. */
1173 	if (!node->ilist) {
1174 		ulint	new_size;
1175 
1176 		ut_a(node->ilist_size == 0);
1177 
1178 		new_size = enc_len > FTS_ILIST_MAX_SIZE
1179 			? enc_len : FTS_ILIST_MAX_SIZE;
1180 
1181 		node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1182 		node->ilist_size_alloc = new_size;
1183 
1184 	} else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1185 		ulint	new_size = node->ilist_size + enc_len;
1186 		byte*	ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1187 
1188 		memcpy(ilist, node->ilist, node->ilist_size);
1189 
1190 		ut_free(node->ilist);
1191 
1192 		node->ilist = ilist;
1193 		node->ilist_size_alloc = new_size;
1194 	}
1195 
1196 	src = enc->src_ilist_ptr;
1197 	dst = node->ilist + node->ilist_size;
1198 
1199 	/* Encode the doc id. Cast to ulint, the delta should be small and
1200 	therefore no loss of precision. */
1201 	dst += fts_encode_int((ulint) doc_id_delta, dst);
1202 
1203 	/* Copy the encoded pos array. */
1204 	memcpy(dst, src, pos_enc_len);
1205 
1206 	node->last_doc_id = doc_id;
1207 
1208 	/* Data copied upto here. */
1209 	node->ilist_size += enc_len;
1210 	enc->src_ilist_ptr += pos_enc_len;
1211 
1212 	ut_a(node->ilist_size <= node->ilist_size_alloc);
1213 
1214 	return(error);
1215 }
1216 
1217 /**********************************************************************//**
1218 Optimize the data contained in a node.
1219 @return DB_SUCCESS or error code*/
1220 static MY_ATTRIBUTE((nonnull))
1221 dberr_t
fts_optimize_node(ib_vector_t * del_vec,int * del_pos,fts_node_t * dst_node,fts_node_t * src_node,fts_encode_t * enc)1222 fts_optimize_node(
1223 /*==============*/
1224 	ib_vector_t*	del_vec,	/*!< in: vector of doc ids to delete*/
1225 	int*		del_pos,	/*!< in: offset into above vector */
1226 	fts_node_t*	dst_node,	/*!< in: node to fill*/
1227 	fts_node_t*	src_node,	/*!< in: source node for data*/
1228 	fts_encode_t*	enc)		/*!< in: encoding state */
1229 {
1230 	ulint		copied;
1231 	dberr_t		error = DB_SUCCESS;
1232 	doc_id_t	doc_id = enc->src_last_doc_id;
1233 
1234 	if (!enc->src_ilist_ptr) {
1235 		enc->src_ilist_ptr = src_node->ilist;
1236 	}
1237 
1238 	copied = enc->src_ilist_ptr - src_node->ilist;
1239 
1240 	/* While there is data in the source node and space to copy
1241 	into in the destination node. */
1242 	while (copied < src_node->ilist_size
1243 	       && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
1244 
1245 		doc_id_t	delta;
1246 		doc_id_t	del_doc_id = FTS_NULL_DOC_ID;
1247 
1248 		delta = fts_decode_vlc(&enc->src_ilist_ptr);
1249 
1250 test_again:
1251 		/* Check whether the doc id is in the delete list, if
1252 		so then we skip the entries but we need to track the
1253 		delta for decoding the entries following this document's
1254 		entries. */
1255 		if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
1256 			fts_update_t*	update;
1257 
1258 			update = (fts_update_t*) ib_vector_get(
1259 				del_vec, *del_pos);
1260 
1261 			del_doc_id = update->doc_id;
1262 		}
1263 
1264 		if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
1265 			ut_a(delta == src_node->first_doc_id);
1266 		}
1267 
1268 		doc_id += delta;
1269 
1270 		if (del_doc_id > 0 && doc_id == del_doc_id) {
1271 
1272 			++*del_pos;
1273 
1274 			/* Skip the entries for this document. */
1275 			while (*enc->src_ilist_ptr) {
1276 				fts_decode_vlc(&enc->src_ilist_ptr);
1277 			}
1278 
1279 			/* Skip the end of word position marker. */
1280 			++enc->src_ilist_ptr;
1281 
1282 		} else {
1283 
1284 			/* DOC ID already becomes larger than
1285 			del_doc_id, check the next del_doc_id */
1286 			if (del_doc_id > 0 && doc_id > del_doc_id) {
1287 				del_doc_id = 0;
1288 				++*del_pos;
1289 				delta = 0;
1290 				goto test_again;
1291 			}
1292 
1293 			/* Decode and copy the word positions into
1294 			the dest node. */
1295 			fts_optimize_encode_node(dst_node, doc_id, enc);
1296 
1297 			++dst_node->doc_count;
1298 
1299 			ut_a(dst_node->last_doc_id == doc_id);
1300 		}
1301 
1302 		/* Bytes copied so for from source. */
1303 		copied = enc->src_ilist_ptr - src_node->ilist;
1304 	}
1305 
1306 	if (copied >= src_node->ilist_size) {
1307 		ut_a(doc_id == src_node->last_doc_id);
1308 	}
1309 
1310 	enc->src_last_doc_id = doc_id;
1311 
1312 	return(error);
1313 }
1314 
1315 /**********************************************************************//**
1316 Determine the starting pos within the deleted doc id vector for a word.
1317 @return delete position */
1318 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1319 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1320 fts_optimize_deleted_pos(
1321 /*=====================*/
1322 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1323 	fts_word_t*	word)		/*!< in: the word data to check */
1324 {
1325 	int		del_pos;
1326 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1327 
1328 	/* Get the first and last dict ids for the word, we will use
1329 	these values to determine which doc ids need to be removed
1330 	when we coalesce the nodes. This way we can reduce the numer
1331 	of elements that need to be searched in the deleted doc ids
1332 	vector and secondly we can remove the doc ids during the
1333 	coalescing phase. */
1334 	if (ib_vector_size(del_vec) > 0) {
1335 		fts_node_t*	node;
1336 		doc_id_t	last_id;
1337 		doc_id_t	first_id;
1338 		ulint		size = ib_vector_size(word->nodes);
1339 
1340 		node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1341 		first_id = node->first_doc_id;
1342 
1343 		node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1344 		last_id = node->last_doc_id;
1345 
1346 		ut_a(first_id <= last_id);
1347 
1348 		del_pos = fts_optimize_lookup(
1349 			del_vec, optim->del_pos, first_id, last_id);
1350 	} else {
1351 
1352 		del_pos = -1; /* Note that there is nothing to delete. */
1353 	}
1354 
1355 	return(del_pos);
1356 }
1357 
1358 #define FTS_DEBUG_PRINT
1359 /**********************************************************************//**
1360 Compact the nodes for a word, we also remove any doc ids during the
1361 compaction pass.
1362 @return DB_SUCCESS or error code.*/
1363 static
1364 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1365 fts_optimize_word(
1366 /*==============*/
1367 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1368 	fts_word_t*	word)		/*!< in: the word to optimize */
1369 {
1370 	fts_encode_t	enc;
1371 	ib_vector_t*	nodes;
1372 	ulint		i = 0;
1373 	int		del_pos;
1374 	fts_node_t*	dst_node = NULL;
1375 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1376 	ulint		size = ib_vector_size(word->nodes);
1377 
1378 	del_pos = fts_optimize_deleted_pos(optim, word);
1379 	nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1380 
1381 	enc.src_last_doc_id = 0;
1382 	enc.src_ilist_ptr = NULL;
1383 
1384 	if (fts_enable_diag_print) {
1385 		word->text.f_str[word->text.f_len] = 0;
1386 		ib::info() << "FTS_OPTIMIZE: optimize \"" << word->text.f_str
1387 			<< "\"";
1388 	}
1389 
1390 	while (i < size) {
1391 		ulint		copied;
1392 		fts_node_t*	src_node;
1393 
1394 		src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1395 
1396 		if (dst_node == NULL
1397 		    || dst_node->last_doc_id > src_node->first_doc_id) {
1398 
1399 			dst_node = static_cast<fts_node_t*>(
1400 				ib_vector_push(nodes, NULL));
1401 			memset(dst_node, 0, sizeof(*dst_node));
1402 		}
1403 
1404 		/* Copy from the src to the dst node. */
1405 		fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1406 
1407 		ut_a(enc.src_ilist_ptr != NULL);
1408 
1409 		/* Determine the numer of bytes copied to dst_node. */
1410 		copied = enc.src_ilist_ptr - src_node->ilist;
1411 
1412 		/* Can't copy more than whats in the vlc array. */
1413 		ut_a(copied <= src_node->ilist_size);
1414 
1415 		/* We are done with this node release the resources. */
1416 		if (copied == src_node->ilist_size) {
1417 
1418 			enc.src_last_doc_id = 0;
1419 			enc.src_ilist_ptr = NULL;
1420 
1421 			ut_free(src_node->ilist);
1422 
1423 			src_node->ilist = NULL;
1424 			src_node->ilist_size = src_node->ilist_size_alloc = 0;
1425 
1426 			src_node = NULL;
1427 
1428 			++i; /* Get next source node to OPTIMIZE. */
1429 		}
1430 
1431 		if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1432 
1433 			dst_node = NULL;
1434 		}
1435 	}
1436 
1437 	/* All dst nodes created should have been added to the vector. */
1438 	ut_a(dst_node == NULL);
1439 
1440 	/* Return the OPTIMIZED nodes. */
1441 	return(nodes);
1442 }
1443 
1444 /**********************************************************************//**
1445 Update the FTS index table. This is a delete followed by an insert.
1446 @return DB_SUCCESS or error code */
1447 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1448 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1449 fts_optimize_write_word(
1450 /*====================*/
1451 	trx_t*		trx,		/*!< in: transaction */
1452 	fts_table_t*	fts_table,	/*!< in: table of FTS index */
1453 	fts_string_t*	word,		/*!< in: word data to write */
1454 	ib_vector_t*	nodes)		/*!< in: the nodes to write */
1455 {
1456 	ulint		i;
1457 	pars_info_t*	info;
1458 	que_t*		graph;
1459 	ulint		selected;
1460 	dberr_t		error = DB_SUCCESS;
1461 	char		table_name[MAX_FULL_NAME_LEN];
1462 
1463 	info = pars_info_create();
1464 
1465 	ut_ad(fts_table->charset);
1466 
1467 	if (fts_enable_diag_print) {
1468 		ib::info() << "FTS_OPTIMIZE: processed \"" << word->f_str
1469 			<< "\"";
1470 	}
1471 
1472 	pars_info_bind_varchar_literal(
1473 		info, "word", word->f_str, word->f_len);
1474 
1475 	selected = fts_select_index(fts_table->charset,
1476 				    word->f_str, word->f_len);
1477 
1478 	fts_table->suffix = fts_get_suffix(selected);
1479 	fts_get_table_name(fts_table, table_name);
1480 	pars_info_bind_id(info, true, "table_name", table_name);
1481 
1482 	graph = fts_parse_sql(
1483 		fts_table,
1484 		info,
1485 		"BEGIN DELETE FROM $table_name WHERE word = :word;");
1486 
1487 	error = fts_eval_sql(trx, graph);
1488 
1489 	if (error != DB_SUCCESS) {
1490 		ib::error() << "(" << ut_strerr(error) << ") during optimize,"
1491 			" when deleting a word from the FTS index.";
1492 	}
1493 
1494 	fts_que_graph_free(graph);
1495 	graph = NULL;
1496 
1497 	/* Even if the operation needs to be rolled back and redone,
1498 	we iterate over the nodes in order to free the ilist. */
1499 	for (i = 0; i < ib_vector_size(nodes); ++i) {
1500 
1501 		fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1502 
1503 		if (error == DB_SUCCESS) {
1504 			/* Skip empty node. */
1505 			if (node->ilist == NULL) {
1506 				ut_ad(node->ilist_size == 0);
1507 				continue;
1508 			}
1509 
1510 			error = fts_write_node(
1511 				trx, &graph, fts_table, word, node);
1512 
1513 			if (error != DB_SUCCESS) {
1514 				ib::error() << "(" << ut_strerr(error) << ")"
1515 					" during optimize, while adding a"
1516 					" word to the FTS index.";
1517 			}
1518 		}
1519 
1520 		ut_free(node->ilist);
1521 		node->ilist = NULL;
1522 		node->ilist_size = node->ilist_size_alloc = 0;
1523 	}
1524 
1525 	if (graph != NULL) {
1526 		fts_que_graph_free(graph);
1527 	}
1528 
1529 	return(error);
1530 }
1531 
1532 /**********************************************************************//**
1533 Free fts_optimizer_word_t instanace.*/
1534 void
fts_word_free(fts_word_t * word)1535 fts_word_free(
1536 /*==========*/
1537 	fts_word_t*	word)		/*!< in: instance to free.*/
1538 {
1539 	mem_heap_t*	heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1540 
1541 #ifdef UNIV_DEBUG
1542 	memset(word, 0, sizeof(*word));
1543 #endif /* UNIV_DEBUG */
1544 
1545 	mem_heap_free(heap);
1546 }
1547 
1548 /**********************************************************************//**
1549 Optimize the word ilist and rewrite data to the FTS index.
1550 @return status one of RESTART, EXIT, ERROR */
1551 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1552 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,ib_time_monotonic_t start_time)1553 fts_optimize_compact(
1554 /*=================*/
1555 	fts_optimize_t*	optim,		     /*!< in: optimize state data */
1556 	dict_index_t*	index,		     /*!< in: current FTS being
1557 					      optimized */
1558 	ib_time_monotonic_t	start_time)  /*!< in: optimize start time */
1559 {
1560 	ulint		i;
1561 	dberr_t		error = DB_SUCCESS;
1562 	ulint		size = ib_vector_size(optim->words);
1563 
1564 	for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1565 		fts_word_t*	word;
1566 		ib_vector_t*	nodes;
1567 		trx_t*		trx = optim->trx;
1568 
1569 		word = (fts_word_t*) ib_vector_get(optim->words, i);
1570 
1571 		/* nodes is allocated from the word heap and will be destroyed
1572 		when the word is freed. We however have to be careful about
1573 		the ilist, that needs to be freed explicitly. */
1574 		nodes = fts_optimize_word(optim, word);
1575 
1576 		/* Update the data on disk. */
1577 		error = fts_optimize_write_word(
1578 			trx, &optim->fts_index_table, &word->text, nodes);
1579 
1580 		if (error == DB_SUCCESS) {
1581 			/* Write the last word optimized to the config table,
1582 			we use this value for restarting optimize. */
1583 			error = fts_config_set_index_value(
1584 				optim->trx, index,
1585 				FTS_LAST_OPTIMIZED_WORD, &word->text);
1586 		}
1587 
1588 		/* Free the word that was optimized. */
1589 		fts_word_free(word);
1590 
1591 		if (fts_optimize_time_limit > 0
1592 		    && (ut_time_monotonic() - start_time) >
1593 		        fts_optimize_time_limit) {
1594 
1595 			optim->done = TRUE;
1596 		}
1597 	}
1598 
1599 	return(error);
1600 }
1601 
1602 /**********************************************************************//**
1603 Create an instance of fts_optimize_t. Also create a new
1604 background transaction.*/
1605 static
1606 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1607 fts_optimize_create(
1608 /*================*/
1609 	dict_table_t*	table)		/*!< in: table with FTS indexes */
1610 {
1611 	fts_optimize_t*	optim;
1612 	mem_heap_t*	heap = mem_heap_create(128);
1613 
1614 	optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1615 
1616 	optim->self_heap = ib_heap_allocator_create(heap);
1617 
1618 	optim->to_delete = fts_doc_ids_create();
1619 
1620 	optim->words = ib_vector_create(
1621 		optim->self_heap, sizeof(fts_word_t), 256);
1622 
1623 	optim->table = table;
1624 
1625 	optim->trx = trx_allocate_for_background();
1626 
1627 	optim->fts_common_table.parent = table->name.m_name;
1628 	optim->fts_common_table.table_id = table->id;
1629 	optim->fts_common_table.type = FTS_COMMON_TABLE;
1630 	optim->fts_common_table.table = table;
1631 
1632 	optim->fts_index_table.parent = table->name.m_name;
1633 	optim->fts_index_table.table_id = table->id;
1634 	optim->fts_index_table.type = FTS_INDEX_TABLE;
1635 	optim->fts_index_table.table = table;
1636 
1637 	/* The common prefix for all this parent table's aux tables. */
1638 	optim->name_prefix = fts_get_table_name_prefix(
1639 		&optim->fts_common_table);
1640 
1641 	return(optim);
1642 }
1643 
1644 #ifdef FTS_OPTIMIZE_DEBUG
1645 /**********************************************************************//**
1646 Get optimize start time of an FTS index.
1647 @return DB_SUCCESS if all OK else error code */
1648 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1649 dberr_t
fts_optimize_get_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t * start_time)1650 fts_optimize_get_index_start_time(
1651 /*==============================*/
1652 	trx_t*		trx,			/*!< in: transaction */
1653 	dict_index_t*	index,			/*!< in: FTS index */
1654 	ib_time_t*	start_time)		/*!< out: time in secs */
1655 {
1656 	return(fts_config_get_index_ulint(
1657 		       trx, index, FTS_OPTIMIZE_START_TIME,
1658 		       (ulint*) start_time));
1659 }
1660 
1661 /**********************************************************************//**
1662 Set the optimize start time of an FTS index.
1663 @return DB_SUCCESS if all OK else error code */
1664 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1665 dberr_t
fts_optimize_set_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t start_time)1666 fts_optimize_set_index_start_time(
1667 /*==============================*/
1668 	trx_t*		trx,			/*!< in: transaction */
1669 	dict_index_t*	index,			/*!< in: FTS index */
1670 	ib_time_t	start_time)		/*!< in: start time */
1671 {
1672 	return(fts_config_set_index_ulint(
1673 		       trx, index, FTS_OPTIMIZE_START_TIME,
1674 		       (ulint) start_time));
1675 }
1676 
1677 /**********************************************************************//**
1678 Get optimize end time of an FTS index.
1679 @return DB_SUCCESS if all OK else error code */
1680 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1681 dberr_t
fts_optimize_get_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t * end_time)1682 fts_optimize_get_index_end_time(
1683 /*============================*/
1684 	trx_t*		trx,			/*!< in: transaction */
1685 	dict_index_t*	index,			/*!< in: FTS index */
1686 	ib_time_t*	end_time)		/*!< out: time in secs */
1687 {
1688 	return(fts_config_get_index_ulint(
1689 		       trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1690 }
1691 
1692 /**********************************************************************//**
1693 Set the optimize end time of an FTS index.
1694 @return DB_SUCCESS if all OK else error code */
1695 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1696 dberr_t
fts_optimize_set_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t end_time)1697 fts_optimize_set_index_end_time(
1698 /*============================*/
1699 	trx_t*		trx,			/*!< in: transaction */
1700 	dict_index_t*	index,			/*!< in: FTS index */
1701 	ib_time_t	end_time)		/*!< in: end time */
1702 {
1703 	return(fts_config_set_index_ulint(
1704 		       trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1705 }
1706 #endif
1707 
1708 /**********************************************************************//**
1709 Free the optimize prepared statements.*/
1710 static
1711 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1712 fts_optimize_graph_free(
1713 /*====================*/
1714 	fts_optimize_graph_t*	graph)	/*!< in/out: The graph instances
1715 					to free */
1716 {
1717 	if (graph->commit_graph) {
1718 		que_graph_free(graph->commit_graph);
1719 		graph->commit_graph = NULL;
1720 	}
1721 
1722 	if (graph->write_nodes_graph) {
1723 		que_graph_free(graph->write_nodes_graph);
1724 		graph->write_nodes_graph = NULL;
1725 	}
1726 
1727 	if (graph->delete_nodes_graph) {
1728 		que_graph_free(graph->delete_nodes_graph);
1729 		graph->delete_nodes_graph = NULL;
1730 	}
1731 
1732 	if (graph->read_nodes_graph) {
1733 		que_graph_free(graph->read_nodes_graph);
1734 		graph->read_nodes_graph = NULL;
1735 	}
1736 }
1737 
1738 /**********************************************************************//**
1739 Free all optimize resources. */
1740 static
1741 void
fts_optimize_free(fts_optimize_t * optim)1742 fts_optimize_free(
1743 /*==============*/
1744 	fts_optimize_t*	optim)		/*!< in: table with on FTS index */
1745 {
1746 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1747 
1748 	trx_free_for_background(optim->trx);
1749 
1750 	fts_doc_ids_free(optim->to_delete);
1751 	fts_optimize_graph_free(&optim->graph);
1752 
1753 	ut_free(optim->name_prefix);
1754 
1755 	/* This will free the heap from which optim itself was allocated. */
1756 	mem_heap_free(heap);
1757 }
1758 
1759 /**********************************************************************//**
1760 Get the max time optimize should run in millisecs.
1761 @return max optimize time limit in millisecs. */
1762 static
1763 ib_time_t
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1764 fts_optimize_get_time_limit(
1765 /*========================*/
1766 	trx_t*		trx,			/*!< in: transaction */
1767 	fts_table_t*	fts_table)		/*!< in: aux table */
1768 {
1769 	ib_time_t	time_limit = 0;
1770 
1771 	fts_config_get_ulint(
1772 		trx, fts_table,
1773 		FTS_OPTIMIZE_LIMIT_IN_SECS, (ulint*) &time_limit);
1774 
1775 	return(time_limit * 1000);
1776 }
1777 
1778 
1779 /**********************************************************************//**
1780 Run OPTIMIZE on the given table. Note: this can take a very long time
1781 (hours). */
1782 static
1783 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1784 fts_optimize_words(
1785 /*===============*/
1786 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1787 	dict_index_t*	index,	/*!< in: current FTS being optimized */
1788 	fts_string_t*	word)	/*!< in: the starting word to optimize */
1789 {
1790 	fts_fetch_t	fetch;
1791 	ib_time_monotonic_t	start_time;
1792 	que_t*		graph = NULL;
1793 	CHARSET_INFO*	charset = optim->fts_index_table.charset;
1794 
1795 	ut_a(!optim->done);
1796 
1797 	/* Get the time limit from the config table. */
1798 	fts_optimize_time_limit = fts_optimize_get_time_limit(
1799 		optim->trx, &optim->fts_common_table);
1800 
1801 	start_time = ut_time_monotonic();
1802 
1803 	/* Setup the callback to use for fetching the word ilist etc. */
1804 	fetch.read_arg = optim->words;
1805 	fetch.read_record = fts_optimize_index_fetch_node;
1806 
1807 	ib::info().write(word->f_str, word->f_len);
1808 
1809 	while (!optim->done) {
1810 		dberr_t	error;
1811 		trx_t*	trx = optim->trx;
1812 		ulint	selected;
1813 
1814 		ut_a(ib_vector_size(optim->words) == 0);
1815 
1816 		selected = fts_select_index(charset, word->f_str, word->f_len);
1817 
1818 		/* Read the index records to optimize. */
1819 		fetch.total_memory = 0;
1820 		error = fts_index_fetch_nodes(
1821 			trx, &graph, &optim->fts_index_table, word,
1822 			&fetch);
1823 		ut_ad(fetch.total_memory < fts_result_cache_limit);
1824 
1825 		if (error == DB_SUCCESS) {
1826 			/* There must be some nodes to read. */
1827 			ut_a(ib_vector_size(optim->words) > 0);
1828 
1829 			/* Optimize the nodes that were read and write
1830 			back to DB. */
1831 			error = fts_optimize_compact(optim, index, start_time);
1832 
1833 			if (error == DB_SUCCESS) {
1834 				fts_sql_commit(optim->trx);
1835 			} else {
1836 				fts_sql_rollback(optim->trx);
1837 			}
1838 		}
1839 
1840 		ib_vector_reset(optim->words);
1841 
1842 		if (error == DB_SUCCESS) {
1843 			if (!optim->done) {
1844 				if (!fts_zip_read_word(optim->zip, word)) {
1845 					optim->done = TRUE;
1846 				} else if (selected
1847 					   != fts_select_index(
1848 						charset, word->f_str,
1849 						word->f_len)
1850 					  && graph) {
1851 					fts_que_graph_free(graph);
1852 					graph = NULL;
1853 				}
1854 			}
1855 		} else if (error == DB_LOCK_WAIT_TIMEOUT) {
1856 			ib::warn() << "Lock wait timeout during optimize."
1857 				" Retrying!";
1858 
1859 			trx->error_state = DB_SUCCESS;
1860 		} else if (error == DB_DEADLOCK) {
1861 			ib::warn() << "Deadlock during optimize. Retrying!";
1862 
1863 			trx->error_state = DB_SUCCESS;
1864 		} else {
1865 			optim->done = TRUE;		/* Exit the loop. */
1866 		}
1867 	}
1868 
1869 	if (graph != NULL) {
1870 		fts_que_graph_free(graph);
1871 	}
1872 }
1873 
1874 /**********************************************************************//**
1875 Optimize is complete. Set the completion time, and reset the optimize
1876 start string for this FTS index to "".
1877 @return DB_SUCCESS if all OK */
1878 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1879 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1880 fts_optimize_index_completed(
1881 /*=========================*/
1882 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1883 	dict_index_t*	index)	/*!< in: table with one FTS index */
1884 {
1885 	fts_string_t	word;
1886 	dberr_t		error;
1887 	byte		buf[sizeof(ulint)];
1888 #ifdef FTS_OPTIMIZE_DEBUG
1889 	ib_time_t	end_time = ut_time();
1890 
1891 	error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1892 #endif
1893 
1894 	/* If we've reached the end of the index then set the start
1895 	word to the empty string. */
1896 
1897 	word.f_len = 0;
1898 	word.f_str = buf;
1899 	*word.f_str = '\0';
1900 
1901 	error = fts_config_set_index_value(
1902 		optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1903 
1904 	if (error != DB_SUCCESS) {
1905 
1906 		ib::error() << "(" << ut_strerr(error) << ") while updating"
1907 			" last optimized word!";
1908 	}
1909 
1910 	return(error);
1911 }
1912 
1913 
1914 /**********************************************************************//**
1915 Read the list of words from the FTS auxiliary index that will be
1916 optimized in this pass.
1917 @return DB_SUCCESS if all OK */
1918 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1919 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1920 fts_optimize_index_read_words(
1921 /*==========================*/
1922 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1923 	dict_index_t*	index,	/*!< in: table with one FTS index */
1924 	fts_string_t*	word)	/*!< in: buffer to use */
1925 {
1926 	dberr_t	error = DB_SUCCESS;
1927 
1928 	if (optim->del_list_regenerated) {
1929 		word->f_len = 0;
1930 	} else {
1931 
1932 		/* Get the last word that was optimized from
1933 		the config table. */
1934 		error = fts_config_get_index_value(
1935 			optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1936 	}
1937 
1938 	/* If record not found then we start from the top. */
1939 	if (error == DB_RECORD_NOT_FOUND) {
1940 		word->f_len = 0;
1941 		error = DB_SUCCESS;
1942 	}
1943 
1944 	while (error == DB_SUCCESS) {
1945 
1946 		error = fts_index_fetch_words(
1947 			optim, word, fts_num_word_optimize);
1948 
1949 		if (error == DB_SUCCESS) {
1950 			/* Reset the last optimized word to '' if no
1951 			more words could be read from the FTS index. */
1952 			if (optim->zip->n_words == 0) {
1953 				word->f_len = 0;
1954 				*word->f_str = 0;
1955 			}
1956 
1957 			break;
1958 		}
1959 	}
1960 
1961 	return(error);
1962 }
1963 
1964 /**********************************************************************//**
1965 Run OPTIMIZE on the given FTS index. Note: this can take a very long
1966 time (hours).
1967 @return DB_SUCCESS if all OK */
1968 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1969 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)1970 fts_optimize_index(
1971 /*===============*/
1972 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1973 	dict_index_t*	index)	/*!< in: table with one FTS index */
1974 {
1975 	fts_string_t	word;
1976 	dberr_t		error;
1977 	byte		str[FTS_MAX_WORD_LEN + 1];
1978 
1979 	/* Set the current index that we have to optimize. */
1980 	optim->fts_index_table.index_id = index->id;
1981 	optim->fts_index_table.charset = fts_index_get_charset(index);
1982 
1983 	optim->done = FALSE; /* Optimize until !done */
1984 
1985 	/* We need to read the last word optimized so that we start from
1986 	the next word. */
1987 	word.f_str = str;
1988 
1989 	/* We set the length of word to the size of str since we
1990 	need to pass the max len info to the fts_get_config_value() function. */
1991 	word.f_len = sizeof(str) - 1;
1992 
1993 	memset(word.f_str, 0x0, word.f_len);
1994 
1995 	/* Read the words that will be optimized in this pass. */
1996 	error = fts_optimize_index_read_words(optim, index, &word);
1997 
1998 	if (error == DB_SUCCESS) {
1999 		int	zip_error;
2000 
2001 		ut_a(optim->zip->pos == 0);
2002 		ut_a(optim->zip->zp->total_in == 0);
2003 		ut_a(optim->zip->zp->total_out == 0);
2004 
2005 		zip_error = inflateInit(optim->zip->zp);
2006 		ut_a(zip_error == Z_OK);
2007 
2008 		word.f_len = 0;
2009 		word.f_str = str;
2010 
2011 		/* Read the first word to optimize from the Zip buffer. */
2012 		if (!fts_zip_read_word(optim->zip, &word)) {
2013 
2014 			optim->done = TRUE;
2015 		} else {
2016 			fts_optimize_words(optim, index, &word);
2017 		}
2018 
2019 		/* If we couldn't read any records then optimize is
2020 		complete. Increment the number of indexes that have
2021 		been optimized and set FTS index optimize state to
2022 		completed. */
2023 		if (error == DB_SUCCESS && optim->zip->n_words == 0) {
2024 
2025 			error = fts_optimize_index_completed(optim, index);
2026 
2027 			if (error == DB_SUCCESS) {
2028 				++optim->n_completed;
2029 			}
2030 		}
2031 	}
2032 
2033 	return(error);
2034 }
2035 
2036 /**********************************************************************//**
2037 Delete the document ids in the delete, and delete cache tables.
2038 @return DB_SUCCESS if all OK */
2039 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2040 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2041 fts_optimize_purge_deleted_doc_ids(
2042 /*===============================*/
2043 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2044 {
2045 	ulint		i;
2046 	pars_info_t*	info;
2047 	que_t*		graph;
2048 	fts_update_t*	update;
2049 	doc_id_t	write_doc_id;
2050 	dberr_t		error = DB_SUCCESS;
2051 	char		deleted[MAX_FULL_NAME_LEN];
2052 	char		deleted_cache[MAX_FULL_NAME_LEN];
2053 
2054 	info = pars_info_create();
2055 
2056 	ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2057 
2058 	update = static_cast<fts_update_t*>(
2059 		ib_vector_get(optim->to_delete->doc_ids, 0));
2060 
2061 	/* Convert to "storage" byte order. */
2062 	fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2063 
2064 	/* This is required for the SQL parser to work. It must be able
2065 	to find the following variables. So we do it twice. */
2066 	fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2067 	fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2068 
2069 	/* Make sure the following two names are consistent with the name
2070 	used in the fts_delete_doc_ids_sql */
2071 	optim->fts_common_table.suffix = fts_common_tables[3];
2072 	fts_get_table_name(&optim->fts_common_table, deleted);
2073 	pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2074 
2075 	optim->fts_common_table.suffix = fts_common_tables[4];
2076 	fts_get_table_name(&optim->fts_common_table, deleted_cache);
2077 	pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2078 
2079 	graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
2080 
2081 	/* Delete the doc ids that were copied at the start. */
2082 	for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2083 
2084 		update = static_cast<fts_update_t*>(ib_vector_get(
2085 			optim->to_delete->doc_ids, i));
2086 
2087 		/* Convert to "storage" byte order. */
2088 		fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2089 
2090 		fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2091 
2092 		fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2093 
2094 		error = fts_eval_sql(optim->trx, graph);
2095 
2096 		// FIXME: Check whether delete actually succeeded!
2097 		if (error != DB_SUCCESS) {
2098 
2099 			fts_sql_rollback(optim->trx);
2100 			break;
2101 		}
2102 	}
2103 
2104 	fts_que_graph_free(graph);
2105 
2106 	return(error);
2107 }
2108 
2109 /**********************************************************************//**
2110 Delete the document ids in the pending delete, and delete tables.
2111 @return DB_SUCCESS if all OK */
2112 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2113 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2114 fts_optimize_purge_deleted_doc_id_snapshot(
2115 /*=======================================*/
2116 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2117 {
2118 	dberr_t		error;
2119 	que_t*		graph;
2120 	pars_info_t*	info;
2121 	char		being_deleted[MAX_FULL_NAME_LEN];
2122 	char		being_deleted_cache[MAX_FULL_NAME_LEN];
2123 
2124 	info = pars_info_create();
2125 
2126 	/* Make sure the following two names are consistent with the name
2127 	used in the fts_end_delete_sql */
2128 	optim->fts_common_table.suffix = fts_common_tables[0];
2129 	fts_get_table_name(&optim->fts_common_table, being_deleted);
2130 	pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2131 
2132 	optim->fts_common_table.suffix = fts_common_tables[1];
2133 	fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2134 	pars_info_bind_id(info, true, fts_common_tables[1],
2135 			  being_deleted_cache);
2136 
2137 	/* Delete the doc ids that were copied to delete pending state at
2138 	the start of optimize. */
2139 	graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
2140 
2141 	error = fts_eval_sql(optim->trx, graph);
2142 	fts_que_graph_free(graph);
2143 
2144 	return(error);
2145 }
2146 
2147 /**********************************************************************//**
2148 Copy the deleted doc ids that will be purged during this optimize run
2149 to the being deleted FTS auxiliary tables. The transaction is committed
2150 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2151 @return DB_SUCCESS if all OK */
2152 static
2153 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2154 fts_optimize_being_deleted_count(
2155 /*=============================*/
2156 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2157 {
2158 	fts_table_t	fts_table;
2159 
2160 	FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2161 			   optim->table);
2162 
2163 	return(fts_get_rows_count(&fts_table));
2164 }
2165 
2166 /*********************************************************************//**
2167 Copy the deleted doc ids that will be purged during this optimize run
2168 to the being deleted FTS auxiliary tables. The transaction is committed
2169 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2170 @return DB_SUCCESS if all OK */
2171 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2172 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2173 fts_optimize_create_deleted_doc_id_snapshot(
2174 /*========================================*/
2175 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2176 {
2177 	dberr_t		error;
2178 	que_t*		graph;
2179 	pars_info_t*	info;
2180 	char		being_deleted[MAX_FULL_NAME_LEN];
2181 	char		deleted[MAX_FULL_NAME_LEN];
2182 	char		being_deleted_cache[MAX_FULL_NAME_LEN];
2183 	char		deleted_cache[MAX_FULL_NAME_LEN];
2184 
2185 	info = pars_info_create();
2186 
2187 	/* Make sure the following four names are consistent with the name
2188 	used in the fts_init_delete_sql */
2189 	optim->fts_common_table.suffix = fts_common_tables[0];
2190 	fts_get_table_name(&optim->fts_common_table, being_deleted);
2191 	pars_info_bind_id(info, true, fts_common_tables[0], being_deleted);
2192 
2193 	optim->fts_common_table.suffix = fts_common_tables[3];
2194 	fts_get_table_name(&optim->fts_common_table, deleted);
2195 	pars_info_bind_id(info, true, fts_common_tables[3], deleted);
2196 
2197 	optim->fts_common_table.suffix = fts_common_tables[1];
2198 	fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2199 	pars_info_bind_id(info, true, fts_common_tables[1],
2200 			  being_deleted_cache);
2201 
2202 	optim->fts_common_table.suffix = fts_common_tables[4];
2203 	fts_get_table_name(&optim->fts_common_table, deleted_cache);
2204 	pars_info_bind_id(info, true, fts_common_tables[4], deleted_cache);
2205 
2206 	/* Move doc_ids that are to be deleted to state being deleted. */
2207 	graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
2208 
2209 	error = fts_eval_sql(optim->trx, graph);
2210 
2211 	fts_que_graph_free(graph);
2212 
2213 	if (error != DB_SUCCESS) {
2214 		fts_sql_rollback(optim->trx);
2215 	} else {
2216 		fts_sql_commit(optim->trx);
2217 	}
2218 
2219 	optim->del_list_regenerated = TRUE;
2220 
2221 	return(error);
2222 }
2223 
2224 /*********************************************************************//**
2225 Read in the document ids that are to be purged during optimize. The
2226 transaction is committed upon successfully read.
2227 @return DB_SUCCESS if all OK */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2230 fts_optimize_read_deleted_doc_id_snapshot(
2231 /*======================================*/
2232 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2233 {
2234 	dberr_t		error;
2235 
2236 	optim->fts_common_table.suffix = "BEING_DELETED";
2237 
2238 	/* Read the doc_ids to delete. */
2239 	error = fts_table_fetch_doc_ids(
2240 		optim->trx, &optim->fts_common_table, optim->to_delete);
2241 
2242 	if (error == DB_SUCCESS) {
2243 
2244 		optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2245 
2246 		/* Read additional doc_ids to delete. */
2247 		error = fts_table_fetch_doc_ids(
2248 			optim->trx, &optim->fts_common_table, optim->to_delete);
2249 	}
2250 
2251 	if (error != DB_SUCCESS) {
2252 
2253 		fts_doc_ids_free(optim->to_delete);
2254 		optim->to_delete = NULL;
2255 	}
2256 
2257 	return(error);
2258 }
2259 
2260 /*********************************************************************//**
2261 Optimze all the FTS indexes, skipping those that have already been
2262 optimized, since the FTS auxiliary indexes are not guaranteed to be
2263 of the same cardinality.
2264 @return DB_SUCCESS if all OK */
2265 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2266 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2267 fts_optimize_indexes(
2268 /*=================*/
2269 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2270 {
2271 	ulint		i;
2272 	dberr_t		error = DB_SUCCESS;
2273 	fts_t*		fts = optim->table->fts;
2274 
2275 	/* Optimize the FTS indexes. */
2276 	for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2277 		dict_index_t*	index;
2278 
2279 #ifdef	FTS_OPTIMIZE_DEBUG
2280 		ib_time_t	end_time;
2281 		ib_time_t	start_time;
2282 
2283 		/* Get the start and end optimize times for this index. */
2284 		error = fts_optimize_get_index_start_time(
2285 			optim->trx, index, &start_time);
2286 
2287 		if (error != DB_SUCCESS) {
2288 			break;
2289 		}
2290 
2291 		error = fts_optimize_get_index_end_time(
2292 			optim->trx, index, &end_time);
2293 
2294 		if (error != DB_SUCCESS) {
2295 			break;
2296 		}
2297 
2298 		/* Start time will be 0 only for the first time or after
2299 		completing the optimization of all FTS indexes. */
2300 		if (start_time == 0) {
2301 			start_time = ut_time();
2302 
2303 			error = fts_optimize_set_index_start_time(
2304 				optim->trx, index, start_time);
2305 		}
2306 
2307 		/* Check if this index needs to be optimized or not. */
2308 		if (ut_difftime(end_time, start_time) < 0) {
2309 			error = fts_optimize_index(optim, index);
2310 
2311 			if (error != DB_SUCCESS) {
2312 				break;
2313 			}
2314 		} else {
2315 			++optim->n_completed;
2316 		}
2317 #endif
2318 		index = static_cast<dict_index_t*>(
2319 			ib_vector_getp(fts->indexes, i));
2320 		error = fts_optimize_index(optim, index);
2321 	}
2322 
2323 	if (error == DB_SUCCESS) {
2324 		fts_sql_commit(optim->trx);
2325 	} else {
2326 		fts_sql_rollback(optim->trx);
2327 	}
2328 
2329 	return(error);
2330 }
2331 
2332 /*********************************************************************//**
2333 Cleanup the snapshot tables and the master deleted table.
2334 @return DB_SUCCESS if all OK */
2335 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2336 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2337 fts_optimize_purge_snapshot(
2338 /*========================*/
2339 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2340 {
2341 	dberr_t		error;
2342 
2343 	/* Delete the doc ids from the master deleted tables, that were
2344 	in the snapshot that was taken at the start of optimize. */
2345 	error = fts_optimize_purge_deleted_doc_ids(optim);
2346 
2347 	if (error == DB_SUCCESS) {
2348 		/* Destroy the deleted doc id snapshot. */
2349 		error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2350 	}
2351 
2352 	if (error == DB_SUCCESS) {
2353 		fts_sql_commit(optim->trx);
2354 	} else {
2355 		fts_sql_rollback(optim->trx);
2356 	}
2357 
2358 	return(error);
2359 }
2360 
2361 /*********************************************************************//**
2362 Reset the start time to 0 so that a new optimize can be started.
2363 @return DB_SUCCESS if all OK */
2364 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2365 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2366 fts_optimize_reset_start_time(
2367 /*==========================*/
2368 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2369 {
2370 	dberr_t		error = DB_SUCCESS;
2371 #ifdef FTS_OPTIMIZE_DEBUG
2372 	fts_t*		fts = optim->table->fts;
2373 
2374 	/* Optimization should have been completed for all indexes. */
2375 	ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2376 
2377 	for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2378 		dict_index_t*	index;
2379 
2380 		ib_time_t	start_time = 0;
2381 
2382 		/* Reset the start time to 0 for this index. */
2383 		error = fts_optimize_set_index_start_time(
2384 			optim->trx, index, start_time);
2385 
2386 		index = static_cast<dict_index_t*>(
2387 			ib_vector_getp(fts->indexes, i));
2388 	}
2389 #endif
2390 
2391 	if (error == DB_SUCCESS) {
2392 		fts_sql_commit(optim->trx);
2393 	} else {
2394 		fts_sql_rollback(optim->trx);
2395 	}
2396 
2397 	return(error);
2398 }
2399 
2400 /*********************************************************************//**
2401 Run OPTIMIZE on the given table by a background thread.
2402 @return DB_SUCCESS if all OK */
2403 static MY_ATTRIBUTE((nonnull))
2404 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2405 fts_optimize_table_bk(
2406 /*==================*/
2407 	fts_slot_t*	slot)	/*!< in: table to optimiza */
2408 {
2409 	dberr_t		error;
2410 	dict_table_t*	table = slot->table;
2411 	fts_t*		fts = table->fts;
2412 
2413 	/* Avoid optimizing tables that were optimized recently. */
2414 	if (slot->last_run > 0
2415 	    && (ut_time_monotonic() - slot->last_run) < slot->interval_time) {
2416 
2417 		return(DB_SUCCESS);
2418 
2419 	} else if (fts && fts->cache
2420 		   && fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2421 
2422 		error = fts_optimize_table(table);
2423 
2424 		if (error == DB_SUCCESS) {
2425 			slot->state = FTS_STATE_DONE;
2426 			slot->last_run = 0;
2427 			slot->completed = ut_time_monotonic();
2428 		}
2429 	} else {
2430 		error = DB_SUCCESS;
2431 	}
2432 
2433 	/* Note time this run completed. */
2434 	slot->last_run = ut_time_monotonic();
2435 
2436 	return(error);
2437 }
2438 /*********************************************************************//**
2439 Run OPTIMIZE on the given table.
2440 @return DB_SUCCESS if all OK */
2441 dberr_t
fts_optimize_table(dict_table_t * table)2442 fts_optimize_table(
2443 /*===============*/
2444 	dict_table_t*	table)	/*!< in: table to optimiza */
2445 {
2446 	dberr_t		error = DB_SUCCESS;
2447 	fts_optimize_t*	optim = NULL;
2448 	fts_t*		fts = table->fts;
2449 
2450 	if (fts_enable_diag_print) {
2451 		ib::info() << "FTS start optimize " << table->name;
2452 	}
2453 
2454 	optim = fts_optimize_create(table);
2455 
2456 	// FIXME: Call this only at the start of optimize, currently we
2457 	// rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2458 
2459 	/* Check whether there are still records in BEING_DELETED table */
2460 	if (fts_optimize_being_deleted_count(optim) == 0) {
2461 		/* Take a snapshot of the deleted document ids, they are copied
2462 		to the BEING_ tables. */
2463 		error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2464 	}
2465 
2466 	/* A duplicate error is OK, since we don't erase the
2467 	doc ids from the being deleted state until all FTS
2468 	indexes have been optimized. */
2469 	if (error == DB_DUPLICATE_KEY) {
2470 		error = DB_SUCCESS;
2471 	}
2472 
2473 	if (error == DB_SUCCESS) {
2474 
2475 		/* These document ids will be filtered out during the
2476 		index optimization phase. They are in the snapshot that we
2477 		took above, at the start of the optimize. */
2478 		error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2479 
2480 		if (error == DB_SUCCESS) {
2481 
2482 			/* Commit the read of being deleted
2483 			doc ids transaction. */
2484 			fts_sql_commit(optim->trx);
2485 
2486 			/* We would do optimization only if there
2487 			are deleted records to be cleaned up */
2488 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2489 				error = fts_optimize_indexes(optim);
2490 			}
2491 
2492 		} else {
2493 			ut_a(optim->to_delete == NULL);
2494 		}
2495 
2496 		/* Only after all indexes have been optimized can we
2497 		delete the (snapshot) doc ids in the pending delete,
2498 		and master deleted tables. */
2499 		if (error == DB_SUCCESS
2500 		    && optim->n_completed == ib_vector_size(fts->indexes)) {
2501 
2502 			if (fts_enable_diag_print) {
2503 				ib::info() << "FTS_OPTIMIZE: Completed"
2504 					" Optimize, cleanup DELETED table";
2505 			}
2506 
2507 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2508 
2509 				/* Purge the doc ids that were in the
2510 				snapshot from the snapshot tables and
2511 				the master deleted table. */
2512 				error = fts_optimize_purge_snapshot(optim);
2513 			}
2514 
2515 			if (error == DB_SUCCESS) {
2516 				/* Reset the start time of all the FTS indexes
2517 				so that optimize can be restarted. */
2518 				error = fts_optimize_reset_start_time(optim);
2519 			}
2520 		}
2521 	}
2522 
2523 	fts_optimize_free(optim);
2524 
2525 	if (fts_enable_diag_print) {
2526 		ib::info() << "FTS end optimize " << table->name;
2527 	}
2528 
2529 	return(error);
2530 }
2531 
2532 /********************************************************************//**
2533 Add the table to add to the OPTIMIZER's list.
2534 @return new message instance */
2535 static
2536 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2537 fts_optimize_create_msg(
2538 /*====================*/
2539 	fts_msg_type_t	type,		/*!< in: type of message */
2540 	void*		ptr)		/*!< in: message payload */
2541 {
2542 	mem_heap_t*	heap;
2543 	fts_msg_t*	msg;
2544 
2545 	heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2546 	msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2547 
2548 	msg->ptr = ptr;
2549 	msg->type = type;
2550 	msg->heap = heap;
2551 
2552 	return(msg);
2553 }
2554 
2555 /** Add the table to add to the OPTIMIZER's list.
2556 @param[in]	table	table to add */
2557 void
fts_optimize_add_table(dict_table_t * table)2558 fts_optimize_add_table(
2559 	dict_table_t*	table)
2560 {
2561 	fts_msg_t*	msg;
2562 
2563 	if (!fts_optimize_is_init()) {
2564 		return;
2565 	}
2566 
2567 	/* Make sure table with FTS index cannot be evicted */
2568 	dict_table_prevent_eviction(table);
2569 
2570 	msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2571 
2572 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2573 }
2574 
2575 /**********************************************************************//**
2576 Remove the table from the OPTIMIZER's list. We do wait for
2577 acknowledgement from the consumer of the message. */
2578 void
fts_optimize_remove_table(dict_table_t * table)2579 fts_optimize_remove_table(
2580 /*======================*/
2581 	dict_table_t*	table)			/*!< in: table to remove */
2582 {
2583 	fts_msg_t*	msg;
2584 	os_event_t	event;
2585 	fts_msg_del_t*	remove;
2586 
2587 	/* if the optimize system not yet initialized, return */
2588 	if (!fts_optimize_is_init()) {
2589 		return;
2590 	}
2591 
2592 	/* FTS optimizer thread is already exited */
2593 	if (fts_opt_start_shutdown) {
2594 		ib::info() << "Try to remove table " << table->name
2595 			<< " after FTS optimize thread exiting.";
2596 		return;
2597 	}
2598 
2599 	msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2600 
2601 	/* We will wait on this event until signalled by the consumer. */
2602 	event = os_event_create(0);
2603 
2604 	remove = static_cast<fts_msg_del_t*>(
2605 		mem_heap_alloc(msg->heap, sizeof(*remove)));
2606 
2607 	remove->table = table;
2608 	remove->event = event;
2609 	msg->ptr = remove;
2610 
2611 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2612 
2613 	os_event_wait(event);
2614 
2615 	os_event_destroy(event);
2616 }
2617 
2618 /** Send sync fts cache for the table.
2619 @param[in]	table	table to sync */
2620 void
fts_optimize_request_sync_table(dict_table_t * table)2621 fts_optimize_request_sync_table(
2622 	dict_table_t*	table)
2623 {
2624 	fts_msg_t*	msg;
2625 	table_id_t*	table_id;
2626 
2627 	/* if the optimize system not yet initialized, return */
2628 	if (!fts_optimize_is_init()) {
2629 		return;
2630 	}
2631 
2632 	/* FTS optimizer thread is already exited */
2633 	if (fts_opt_start_shutdown) {
2634 		ib::info() << "Try to sync table " << table->name
2635 			<< " after FTS optimize thread exiting.";
2636 		return;
2637 	}
2638 
2639 	msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
2640 
2641 	table_id = static_cast<table_id_t*>(
2642 		mem_heap_alloc(msg->heap, sizeof(table_id_t)));
2643 	*table_id = table->id;
2644 	msg->ptr = table_id;
2645 
2646 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2647 	DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
2648 	    if (ib_wqueue_get_count(fts_optimize_wq) > 1000) {
2649 	    DBUG_SUICIDE(); });
2650 }
2651 
2652 /**********************************************************************//**
2653 Add the table to the vector if it doesn't already exist. */
2654 static
2655 ibool
fts_optimize_new_table(ib_vector_t * tables,dict_table_t * table)2656 fts_optimize_new_table(
2657 /*===================*/
2658 	ib_vector_t*	tables,			/*!< in/out: vector of tables */
2659 	dict_table_t*	table)			/*!< in: table to add */
2660 {
2661 	ulint		i;
2662 	fts_slot_t*	slot;
2663 	ulint		empty_slot = ULINT_UNDEFINED;
2664 
2665 	/* Search for duplicates, also find a free slot if one exists. */
2666 	for (i = 0; i < ib_vector_size(tables); ++i) {
2667 
2668 		slot = static_cast<fts_slot_t*>(
2669 			ib_vector_get(tables, i));
2670 
2671 		if (slot->state == FTS_STATE_EMPTY) {
2672 			empty_slot = i;
2673 		} else if (slot->table == table) {
2674 			/* Already exists in our optimize queue. */
2675 			ut_ad(slot->table_id = table->id);
2676 			return(FALSE);
2677 		}
2678 	}
2679 
2680 	/* Reuse old slot. */
2681 	if (empty_slot != ULINT_UNDEFINED) {
2682 
2683 		slot = static_cast<fts_slot_t*>(
2684 			ib_vector_get(tables, empty_slot));
2685 
2686 		ut_a(slot->state == FTS_STATE_EMPTY);
2687 
2688 	} else { /* Create a new slot. */
2689 
2690 		slot = static_cast<fts_slot_t*>(ib_vector_push(tables, NULL));
2691 	}
2692 
2693 	memset(slot, 0x0, sizeof(*slot));
2694 
2695 	slot->table = table;
2696 	slot->table_id = table->id;
2697 	slot->state = FTS_STATE_LOADED;
2698 	slot->interval_time = FTS_OPTIMIZE_INTERVAL_IN_SECS;
2699 
2700 	return(TRUE);
2701 }
2702 
2703 /**********************************************************************//**
2704 Remove the table from the vector if it exists. */
2705 static
2706 ibool
fts_optimize_del_table(ib_vector_t * tables,fts_msg_del_t * msg)2707 fts_optimize_del_table(
2708 /*===================*/
2709 	ib_vector_t*	tables,			/*!< in/out: vector of tables */
2710 	fts_msg_del_t*	msg)			/*!< in: table to delete */
2711 {
2712 	ulint		i;
2713 	dict_table_t*	table = msg->table;
2714 
2715 	for (i = 0; i < ib_vector_size(tables); ++i) {
2716 		fts_slot_t*	slot;
2717 
2718 		slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2719 
2720 		if (slot->state != FTS_STATE_EMPTY
2721 		    && slot->table == table) {
2722 
2723 			if (fts_enable_diag_print) {
2724 				ib::info() << "FTS Optimize Removing table "
2725 					<< table->name;
2726 			}
2727 
2728 			slot->table = NULL;
2729 			slot->state = FTS_STATE_EMPTY;
2730 
2731 			return(TRUE);
2732 		}
2733 	}
2734 
2735 	return(FALSE);
2736 }
2737 
2738 /**********************************************************************//**
2739 Calculate how many of the registered tables need to be optimized.
2740 @return no. of tables to optimize */
2741 static
2742 ulint
fts_optimize_how_many(const ib_vector_t * tables)2743 fts_optimize_how_many(
2744 /*==================*/
2745 	const ib_vector_t*	tables)		/*!< in: registered tables
2746 						vector*/
2747 {
2748 	ulint		i;
2749 	ib_time_monotonic_t	delta;
2750 	ulint		n_tables = 0;
2751 	ib_time_monotonic_t	current_time;
2752 
2753 	current_time = ut_time_monotonic();
2754 
2755 	for (i = 0; i < ib_vector_size(tables); ++i) {
2756 		const fts_slot_t*	slot;
2757 
2758 		slot = static_cast<const fts_slot_t*>(
2759 			ib_vector_get_const(tables, i));
2760 
2761 		switch (slot->state) {
2762 		case FTS_STATE_DONE:
2763 		case FTS_STATE_LOADED:
2764 			ut_a(slot->completed <= current_time);
2765 
2766 			delta = current_time - slot->completed;
2767 
2768 			/* Skip slots that have been optimized recently. */
2769 			if (delta >= slot->interval_time) {
2770 				++n_tables;
2771 			}
2772 			break;
2773 
2774 		case FTS_STATE_RUNNING:
2775 			ut_a(slot->last_run <= current_time);
2776 
2777 			delta = current_time - slot->last_run;
2778 
2779 			if (delta > slot->interval_time) {
2780 				++n_tables;
2781 			}
2782 			break;
2783 
2784 			/* Slots in a state other than the above
2785 			are ignored. */
2786 		case FTS_STATE_EMPTY:
2787 		case FTS_STATE_SUSPENDED:
2788 			break;
2789 		}
2790 
2791 	}
2792 
2793 	return(n_tables);
2794 }
2795 
2796 /**********************************************************************//**
2797 Check if the total memory used by all FTS table exceeds the maximum limit.
2798 @return true if a sync is needed, false otherwise */
2799 static
2800 bool
fts_is_sync_needed(const ib_vector_t * tables)2801 fts_is_sync_needed(
2802 /*===============*/
2803 	const ib_vector_t*	tables)		/*!< in: registered tables
2804 						vector*/
2805 {
2806 	ulint	total_memory = 0;
2807 	uint64_t time_diff = ut_time_monotonic() - last_check_sync_time;
2808 
2809 	if (fts_need_sync || time_diff < 5) {
2810 		return(false);
2811 	}
2812 
2813 	last_check_sync_time = ut_time_monotonic();
2814 
2815 	for (ulint i = 0; i < ib_vector_size(tables); ++i) {
2816 		const fts_slot_t*	slot;
2817 
2818 		slot = static_cast<const fts_slot_t*>(
2819 			ib_vector_get_const(tables, i));
2820 
2821 		if (slot->state != FTS_STATE_EMPTY && slot->table
2822 		    && slot->table->fts && slot->table->fts->cache) {
2823 			total_memory += slot->table->fts->cache->total_size;
2824 		}
2825 
2826 		if (total_memory > fts_max_total_cache_size) {
2827 			return(true);
2828 		}
2829 	}
2830 
2831 	return(false);
2832 }
2833 
2834 /** Sync fts cache of a table
2835 @param[in]	table_id	table id */
2836 void
fts_optimize_sync_table(table_id_t table_id)2837 fts_optimize_sync_table(
2838 	table_id_t	table_id)
2839 {
2840 	dict_table_t*   table = NULL;
2841 
2842 	table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
2843 
2844 	if (table) {
2845 		if (dict_table_has_fts_index(table) && table->fts->cache) {
2846 			fts_sync_table(table, true, false, false);
2847 		}
2848 
2849 		dict_table_close(table, FALSE, FALSE);
2850 	}
2851 }
2852 
2853 /**********************************************************************//**
2854 Optimize all FTS tables.
2855 @return Dummy return */
2856 os_thread_ret_t
fts_optimize_thread(void * arg)2857 fts_optimize_thread(
2858 /*================*/
2859 	void*		arg)			/*!< in: work queue*/
2860 {
2861 	ulint		current = 0;
2862 	ibool		done = FALSE;
2863 	ulint		n_tables = 0;
2864 	ulint		n_optimize = 0;
2865 	ib_wqueue_t*	wq = (ib_wqueue_t*) arg;
2866 
2867 	ut_ad(!srv_read_only_mode);
2868 	my_thread_init();
2869 
2870 	ut_ad(fts_slots);
2871 
2872 	/* Assign number of tables added in fts_slots_t to n_tables */
2873 	n_tables = ib_vector_size(fts_slots);
2874 
2875 	while (!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
2876 
2877 		/* If there is no message in the queue and we have tables
2878 		to optimize then optimize the tables. */
2879 
2880 		if (!done
2881 		    && ib_wqueue_is_empty(wq)
2882 		    && n_tables > 0
2883 		    && n_optimize > 0) {
2884 
2885 			fts_slot_t*	slot;
2886 
2887 			ut_a(ib_vector_size(fts_slots) > 0);
2888 
2889 			slot = static_cast<fts_slot_t*>(
2890 				ib_vector_get(fts_slots, current));
2891 
2892 			/* Handle the case of empty slots. */
2893 			if (slot->state != FTS_STATE_EMPTY) {
2894 
2895 				slot->state = FTS_STATE_RUNNING;
2896 
2897 				fts_optimize_table_bk(slot);
2898 			}
2899 
2900 			++current;
2901 
2902 			/* Wrap around the counter. */
2903 			if (current >= ib_vector_size(fts_slots)) {
2904 				n_optimize = fts_optimize_how_many(fts_slots);
2905 
2906 				current = 0;
2907 			}
2908 
2909 		} else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
2910 			fts_msg_t*	msg;
2911 
2912 			msg = static_cast<fts_msg_t*>(
2913 				ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS));
2914 
2915 			/* Timeout ? */
2916 			if (msg == NULL) {
2917 				if (fts_is_sync_needed(fts_slots)) {
2918 					fts_need_sync = true;
2919 				}
2920 
2921 				continue;
2922 			}
2923 
2924 			switch (msg->type) {
2925 			case FTS_MSG_STOP:
2926 				done = TRUE;
2927 				break;
2928 
2929 			case FTS_MSG_ADD_TABLE:
2930 				ut_a(!done);
2931 				if (fts_optimize_new_table(
2932 					fts_slots,
2933 					static_cast<dict_table_t*>(msg->ptr))) {
2934 					++n_tables;
2935 				}
2936 				break;
2937 
2938 			case FTS_MSG_DEL_TABLE:
2939 				if (fts_optimize_del_table(
2940 					fts_slots, static_cast<fts_msg_del_t*>(msg->ptr))) {
2941 					--n_tables;
2942 				}
2943 
2944 				/* Signal the producer that we have
2945 				removed the table. */
2946 				os_event_set(((fts_msg_del_t*) msg->ptr)->event);
2947 				break;
2948 
2949 			case FTS_MSG_SYNC_TABLE:
2950 				DBUG_EXECUTE_IF("fts_instrument_msg_sync_sleep",
2951 					os_thread_sleep(300000);
2952 				);
2953 
2954 				fts_optimize_sync_table(
2955 					*static_cast<table_id_t*>(msg->ptr));
2956 				break;
2957 
2958 			default:
2959 				ut_error;
2960 			}
2961 
2962 			mem_heap_free(msg->heap);
2963 
2964 			if (!done) {
2965 				n_optimize = fts_optimize_how_many(fts_slots);
2966 			} else {
2967 				n_optimize = 0;
2968 			}
2969 		}
2970 	}
2971 
2972 	/* Server is being shutdown, sync the data from FTS cache to disk
2973 	if needed */
2974 	if (n_tables > 0) {
2975 		ulint	i;
2976 
2977 		for (i = 0; i < ib_vector_size(fts_slots); i++) {
2978 			fts_slot_t*	slot;
2979 
2980 			slot = static_cast<fts_slot_t*>(
2981 				ib_vector_get(fts_slots, i));
2982 
2983 			if (slot->state != FTS_STATE_EMPTY) {
2984 				fts_optimize_sync_table(slot->table_id);
2985 			}
2986 		}
2987 	}
2988 
2989 	ib_vector_free(fts_slots);
2990 
2991 	ib::info() << "FTS optimize thread exiting.";
2992 
2993 	os_event_set(fts_opt_shutdown_event);
2994 	my_thread_end();
2995 
2996 	/* We count the number of threads in os_thread_exit(). A created
2997 	thread should always use that to exit and not use return() to exit. */
2998 	os_thread_exit();
2999 
3000 	OS_THREAD_DUMMY_RETURN;
3001 }
3002 
3003 /**********************************************************************//**
3004 Startup the optimize thread and create the work queue. */
3005 void
fts_optimize_init(void)3006 fts_optimize_init(void)
3007 /*===================*/
3008 {
3009 	mem_heap_t*	heap;
3010 	ib_alloc_t*     heap_alloc;
3011 	dict_table_t*   table;
3012 
3013 	ut_ad(!srv_read_only_mode);
3014 
3015 	/* For now we only support one optimize thread. */
3016 	ut_a(!fts_optimize_is_init());
3017 
3018 	/* Create FTS optimize work queue */
3019 	fts_optimize_wq = ib_wqueue_create();
3020 	ut_a(fts_optimize_wq != NULL);
3021 
3022 	/* Create FTS vector to store fts_slot_t */
3023 	heap = mem_heap_create(sizeof(dict_table_t*) * 64);
3024 	heap_alloc = ib_heap_allocator_create(heap);
3025 	fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
3026 
3027 	/* Add fts tables to the fts_slots vector which were skipped during restart */
3028 	std::vector<dict_table_t*> table_vector;
3029 	std::vector<dict_table_t*>::iterator it;
3030 
3031 	mutex_enter(&dict_sys->mutex);
3032 	for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
3033              table != NULL;
3034              table = UT_LIST_GET_NEXT(table_LRU, table)) {
3035                 if (table->fts &&
3036                     dict_table_has_fts_index(table)) {
3037 			if (fts_optimize_new_table(fts_slots,
3038 						   table)){
3039 				table_vector.push_back(table);
3040 			}
3041 		}
3042 	}
3043 
3044 	/* It is better to call dict_table_prevent_eviction()
3045 	outside the above loop because it operates on
3046 	dict_sys->table_LRU list.*/
3047 	for (it=table_vector.begin();it!=table_vector.end();++it) {
3048 		dict_table_prevent_eviction(*it);
3049 	}
3050 
3051 	mutex_exit(&dict_sys->mutex);
3052 	table_vector.clear();
3053 
3054 	fts_opt_shutdown_event = os_event_create(0);
3055 	last_check_sync_time = ut_time_monotonic();
3056 
3057 	os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
3058 }
3059 
3060 /**********************************************************************//**
3061 Check whether the work queue is initialized.
3062 @return TRUE if optimize queue is initialized. */
3063 ibool
fts_optimize_is_init(void)3064 fts_optimize_is_init(void)
3065 /*======================*/
3066 {
3067 	return(fts_optimize_wq != NULL);
3068 }
3069 
3070 /** Shutdown fts optimize thread. */
3071 void
fts_optimize_shutdown()3072 fts_optimize_shutdown()
3073 {
3074 	ut_ad(!srv_read_only_mode);
3075 
3076 	fts_msg_t*	msg;
3077 
3078 	/* If there is an ongoing activity on dictionary, such as
3079 	srv_master_evict_from_table_cache(), wait for it */
3080 	dict_mutex_enter_for_mysql();
3081 
3082 	/* Tells FTS optimizer system that we are exiting from
3083 	optimizer thread, message send their after will not be
3084 	processed */
3085 	fts_opt_start_shutdown = true;
3086 	dict_mutex_exit_for_mysql();
3087 
3088 	/* We tell the OPTIMIZE thread to switch to state done, we
3089 	can't delete the work queue here because the add thread needs
3090 	deregister the FTS tables. */
3091 
3092 	msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
3093 
3094 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
3095 
3096 	os_event_wait(fts_opt_shutdown_event);
3097 
3098 	os_event_destroy(fts_opt_shutdown_event);
3099 
3100 	ib_wqueue_free(fts_optimize_wq);
3101 	fts_optimize_wq = NULL;
3102 }
3103