1 /*****************************************************************************
2 
3 Copyright (c) 2007, 2018, Oracle and/or its affiliates. All Rights Reserved.
4 Copyright (c) 2016, 2021, MariaDB Corporation.
5 
6 This program is free software; you can redistribute it and/or modify it under
7 the terms of the GNU General Public License as published by the Free Software
8 Foundation; version 2 of the License.
9 
10 This program is distributed in the hope that it will be useful, but WITHOUT
11 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along with
15 this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
17 
18 *****************************************************************************/
19 
20 /******************************************************************//**
21 @file fts/fts0opt.cc
22 Full Text Search optimize thread
23 
24 Created 2007/03/27 Sunny Bains
25 Completed 2011/7/10 Sunny and Jimmy Yang
26 
27 ***********************************************************************/
28 
29 #include "fts0fts.h"
30 #include "row0sel.h"
31 #include "que0types.h"
32 #include "fts0priv.h"
33 #include "fts0types.h"
34 #include "ut0wqueue.h"
35 #include "srv0start.h"
36 #include "ut0list.h"
37 #include "zlib.h"
38 #include "fts0opt.h"
39 #include "fts0vlc.h"
40 
/** The FTS optimize thread's work queue. */
ib_wqueue_t* fts_optimize_wq;

/** The FTS vector to store fts_slot_t */
static ib_vector_t*  fts_slots;

/** Time to wait for a message, in microseconds (5 seconds). */
static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;

/** Default optimize interval, in seconds. */
static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;

/** Set when the server is shutting down, so that the optimize thread
exits. */
static bool fts_opt_start_shutdown = false;

/** Event to wait for shutdown of the optimize thread */
static os_event_t fts_opt_shutdown_event = NULL;

/** Initial capacity of the fts_word_t::nodes vector
(used by fts_word_init()). */
static const ulint FTS_WORD_NODES_INIT_SIZE = 64;

/** Last time we checked whether the system needs a sync */
static time_t	last_check_sync_time;
64 
/** FTS optimize thread message types. */
enum fts_msg_type_t {
	FTS_MSG_STOP,			/*!< Stop optimizing and exit thread */

	FTS_MSG_ADD_TABLE,		/*!< Add table to the optimize thread's
					work queue */

	FTS_MSG_DEL_TABLE,		/*!< Remove a table from the optimize
					thread's work queue */
	FTS_MSG_SYNC_TABLE		/*!< Sync fts cache of a table */
};
76 
/** Compressed list of words that have been read from FTS INDEX
that needs to be optimized. Each word is stored in one ZLib stream as a
2-byte length prefix followed by the word bytes. */
struct fts_zip_t {
	lint		status;		/*!< Status of (un)/zip operation: the
					last ZLib return code (Z_OK, ...) */

	ulint		n_words;	/*!< Number of words compressed */

	ulint		block_sz;	/*!< Size of a block in bytes */

	ib_vector_t*	blocks;		/*!< Vector of pointers to compressed
					blocks */

	ib_alloc_t*	heap_alloc;	/*!< Heap to use for allocations */

	ulint		pos;		/*!< Offset into blocks: index of the
					next block to decompress */

	ulint		last_big_block;	/*!< Offset of last block in the
					blocks array that is of size
					block_sz. Blocks beyond this offset
					are of size FTS_MAX_WORD_LEN */

	z_streamp	zp;		/*!< ZLib state */

	fts_string_t	word;		/*!< The value of the last word read
					from the FTS INDEX table, as a UTF-8
					string. This is used to discard
					duplicates */

	ulint		max_words;	/*!< maximum number of words to read
					in one pass */
};
108 
/** Prepared statements used during optimize */
struct fts_optimize_graph_t {
	/** Delete a word from FTS INDEX */
	que_t*		delete_nodes_graph;
	/** Insert a word into FTS INDEX */
	que_t*		write_nodes_graph;
	/** COMMIT a transaction */
	que_t*		commit_graph;
	/** Read the nodes from FTS_INDEX */
	que_t*		read_nodes_graph;
};
120 
/** Used by fts_optimize() to store state. */
struct fts_optimize_t {
	trx_t*		trx;		/*!< The transaction used for all SQL */

	ib_alloc_t*	self_heap;	/*!< Heap to use for allocations */

	char*		name_prefix;	/*!< FTS table name prefix */

	fts_table_t	fts_index_table;/*!< Definition of the FTS INDEX
					auxiliary table being read; its
					suffix is switched per auxiliary
					index table */

	/** Common auxiliary table definition */
	fts_table_t	fts_common_table;

	dict_table_t*	table;		/*!< Table that has to be queried */

	dict_index_t*	index;		/*!< The FTS index to be optimized */

	fts_doc_ids_t*	to_delete;	/*!< doc ids to delete, we check against
					this vector and purge the matching
					entries during the optimizing
					process. The vector entries are
					sorted on doc id */

	ulint		del_pos;	/*!< Offset within to_delete vector,
					this is used to keep track of where
					we are up to in the vector */

	ibool		done;		/*!< TRUE when optimize finishes */

	ib_vector_t*	words;		/*!< Word + Nodes read from FTS_INDEX,
					it contains instances of fts_word_t */

	fts_zip_t*	zip;		/*!< Compressed words read from the
					FTS_INDEX */

	/** Prepared statements used during optimize */
	fts_optimize_graph_t	graph;

	ulint		n_completed;	/*!< Number of FTS indexes that have
					been optimized */
	ibool		del_list_regenerated;
					/*!< BEING_DELETED list regenerated */
};
163 
/** Used by the optimize, to keep state during compacting nodes. */
struct fts_encode_t {
	doc_id_t	src_last_doc_id;/*!< Last doc id read from src node */
	byte*		src_ilist_ptr;	/*!< Current read position within the
					src node's ilist */
};
169 
/** Per-table bookkeeping used to determine when to start the optimize
cycle for a table. */
struct fts_slot_t {
	/** table, or NULL if the slot is unused */
	dict_table_t*	table;

	/** whether this slot is being processed */
	bool		running;

	ulint		added;		/*!< Number of doc ids added since the
					last time this table was optimized */

	ulint		deleted;	/*!< Number of doc ids deleted since the
					last time this table was optimized */

	/** time(NULL) of completing fts_optimize_table_bk() */
	time_t		last_run;

	/** time(NULL) of latest successful fts_optimize_table() */
	time_t		completed;
};
191 
/** A table remove message for the FTS optimize thread. */
struct fts_msg_del_t {
	dict_table_t*	table;		/*!< The table to remove */

	os_event_t	event;		/*!< Event to synchronize acknowledgement
					of receipt and processing of this
					message by the consumer */
};
200 
/** The FTS optimize message work queue message type. */
struct fts_msg_t {
	fts_msg_type_t	type;		/*!< Message type */

	void*		ptr;		/*!< The message contents; its type
					depends on the message type (e.g.
					fts_msg_del_t for a table removal)
					-- NOTE(review): confirm per type */

	mem_heap_t*	heap;		/*!< The heap used to allocate this
					message, the message consumer will
					free the heap. */
};
211 
/** The number of words to read and optimize in a single pass. */
ulong	fts_num_word_optimize;

/** Whether to enable additional FTS diagnostic printout. */
char	fts_enable_diag_print;

/** ZLib compressed block size, in bytes (the size of each full block
allocated while compressing words). */
static ulint FTS_ZIP_BLOCK_SIZE	= 1024;

/** The amount of time optimizing in a single pass, in seconds. */
static ulint fts_optimize_time_limit;

/** FTS common auxiliary table list; defined in fts0fts.cc. */
extern const char* fts_common_tables[];

/** SQL that snapshots the doc ids in $DELETED / $DELETED_CACHE into
$BEING_DELETED / $BEING_DELETED_CACHE, marking them as rows to be
removed from the FTS index.
NOTE(review): none of these statements ends with "END;"; presumably the
caller appends it -- confirm. */
static	const char* fts_init_delete_sql =
	"BEGIN\n"
	"\n"
	"INSERT INTO $BEING_DELETED\n"
		"SELECT doc_id FROM $DELETED;\n"
	"\n"
	"INSERT INTO $BEING_DELETED_CACHE\n"
		"SELECT doc_id FROM $DELETED_CACHE;\n";

/** SQL that removes one doc id (bound as :doc_id1 and :doc_id2) from
$DELETED and $DELETED_CACHE. */
static const char* fts_delete_doc_ids_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $DELETED WHERE doc_id = :doc_id1;\n"
	"DELETE FROM $DELETED_CACHE WHERE doc_id = :doc_id2;\n";

/** SQL that empties $BEING_DELETED and $BEING_DELETED_CACHE. */
static const char* fts_end_delete_sql =
	"BEGIN\n"
	"\n"
	"DELETE FROM $BEING_DELETED;\n"
	"DELETE FROM $BEING_DELETED_CACHE;\n";
248 
249 /**********************************************************************//**
250 Initialize fts_zip_t. */
251 static
252 void
fts_zip_initialize(fts_zip_t * zip)253 fts_zip_initialize(
254 /*===============*/
255 	fts_zip_t*	zip)		/*!< out: zip instance to initialize */
256 {
257 	zip->pos = 0;
258 	zip->n_words = 0;
259 
260 	zip->status = Z_OK;
261 
262 	zip->last_big_block = 0;
263 
264 	zip->word.f_len = 0;
265 	*zip->word.f_str = 0;
266 
267 	ib_vector_reset(zip->blocks);
268 
269 	memset(zip->zp, 0, sizeof(*zip->zp));
270 }
271 
272 /**********************************************************************//**
273 Create an instance of fts_zip_t.
274 @return a new instance of fts_zip_t */
275 static
276 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)277 fts_zip_create(
278 /*===========*/
279 	mem_heap_t*	heap,		/*!< in: heap */
280 	ulint		block_sz,	/*!< in: size of a zip block.*/
281 	ulint		max_words)	/*!< in: max words to read */
282 {
283 	fts_zip_t*	zip;
284 
285 	zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
286 
287 	zip->word.f_str = static_cast<byte*>(
288 		mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
289 
290 	zip->block_sz = block_sz;
291 
292 	zip->heap_alloc = ib_heap_allocator_create(heap);
293 
294 	zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
295 
296 	zip->max_words = max_words;
297 
298 	zip->zp = static_cast<z_stream*>(
299 		mem_heap_zalloc(heap, sizeof(*zip->zp)));
300 
301 	return(zip);
302 }
303 
304 /**********************************************************************//**
305 Initialize an instance of fts_zip_t. */
306 static
307 void
fts_zip_init(fts_zip_t * zip)308 fts_zip_init(
309 /*=========*/
310 
311 	fts_zip_t*	zip)		/*!< in: zip instance to init */
312 {
313 	memset(zip->zp, 0, sizeof(*zip->zp));
314 
315 	zip->word.f_len = 0;
316 	*zip->word.f_str = '\0';
317 }
318 
319 /**********************************************************************//**
320 Create a fts_optimizer_word_t instance.
321 @return new instance */
322 static
323 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)324 fts_word_init(
325 /*==========*/
326 	fts_word_t*	word,		/*!< in: word to initialize */
327 	byte*		utf8,		/*!< in: UTF-8 string */
328 	ulint		len)		/*!< in: length of string in bytes */
329 {
330 	mem_heap_t*	heap = mem_heap_create(sizeof(fts_node_t));
331 
332 	memset(word, 0, sizeof(*word));
333 
334 	word->text.f_len = len;
335 	word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
336 
337 	/* Need to copy the NUL character too. */
338 	memcpy(word->text.f_str, utf8, word->text.f_len);
339 	word->text.f_str[word->text.f_len] = 0;
340 
341 	word->heap_alloc = ib_heap_allocator_create(heap);
342 
343 	word->nodes = ib_vector_create(
344 		word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
345 
346 	return(word);
347 }
348 
349 /**********************************************************************//**
350 Read the FTS INDEX row.
351 @return fts_node_t instance */
352 static
353 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)354 fts_optimize_read_node(
355 /*===================*/
356 	fts_word_t*	word,		/*!< in: */
357 	que_node_t*	exp)		/*!< in: */
358 {
359 	int		i;
360 	fts_node_t*	node = static_cast<fts_node_t*>(
361 		ib_vector_push(word->nodes, NULL));
362 
363 	/* Start from 1 since the first node has been read by the caller */
364 	for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
365 
366 		dfield_t*	dfield = que_node_get_val(exp);
367 		byte*		data = static_cast<byte*>(
368 			dfield_get_data(dfield));
369 		ulint		len = dfield_get_len(dfield);
370 
371 		ut_a(len != UNIV_SQL_NULL);
372 
373 		/* Note: The column numbers below must match the SELECT */
374 		switch (i) {
375 		case 1: /* DOC_COUNT */
376 			node->doc_count = mach_read_from_4(data);
377 			break;
378 
379 		case 2: /* FIRST_DOC_ID */
380 			node->first_doc_id = fts_read_doc_id(data);
381 			break;
382 
383 		case 3: /* LAST_DOC_ID */
384 			node->last_doc_id = fts_read_doc_id(data);
385 			break;
386 
387 		case 4: /* ILIST */
388 			node->ilist_size_alloc = node->ilist_size = len;
389 			node->ilist = static_cast<byte*>(ut_malloc_nokey(len));
390 			memcpy(node->ilist, data, len);
391 			break;
392 
393 		default:
394 			ut_error;
395 		}
396 	}
397 
398 	/* Make sure all columns were read. */
399 	ut_a(i == 5);
400 
401 	return(node);
402 }
403 
404 /**********************************************************************//**
405 Callback function to fetch the rows in an FTS INDEX record.
406 @return always returns non-NULL */
407 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)408 fts_optimize_index_fetch_node(
409 /*==========================*/
410 	void*		row,		/*!< in: sel_node_t* */
411 	void*		user_arg)	/*!< in: pointer to ib_vector_t */
412 {
413 	fts_word_t*	word;
414 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
415 	fts_fetch_t*	fetch = static_cast<fts_fetch_t*>(user_arg);
416 	ib_vector_t*	words = static_cast<ib_vector_t*>(fetch->read_arg);
417 	que_node_t*	exp = sel_node->select_list;
418 	dfield_t*	dfield = que_node_get_val(exp);
419 	void*		data = dfield_get_data(dfield);
420 	ulint		dfield_len = dfield_get_len(dfield);
421 	fts_node_t*	node;
422 	bool		is_word_init = false;
423 
424 	ut_a(dfield_len <= FTS_MAX_WORD_LEN);
425 
426 	if (ib_vector_size(words) == 0) {
427 
428 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
429 		fts_word_init(word, (byte*) data, dfield_len);
430 		is_word_init = true;
431 	}
432 
433 	word = static_cast<fts_word_t*>(ib_vector_last(words));
434 
435 	if (dfield_len != word->text.f_len
436 	    || memcmp(word->text.f_str, data, dfield_len)) {
437 
438 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
439 		fts_word_init(word, (byte*) data, dfield_len);
440 		is_word_init = true;
441 	}
442 
443 	node = fts_optimize_read_node(word, que_node_get_next(exp));
444 
445 	fetch->total_memory += node->ilist_size;
446 	if (is_word_init) {
447 		fetch->total_memory += sizeof(fts_word_t)
448 			+ sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
449 			+ sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
450 	} else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
451 		fetch->total_memory += sizeof(fts_node_t);
452 	}
453 
454 	if (fetch->total_memory >= fts_result_cache_limit) {
455 		return(FALSE);
456 	}
457 
458 	return(TRUE);
459 }
460 
461 /**********************************************************************//**
462 Read the rows from the FTS inde.
463 @return DB_SUCCESS or error code */
464 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)465 fts_index_fetch_nodes(
466 /*==================*/
467 	trx_t*		trx,		/*!< in: transaction */
468 	que_t**		graph,		/*!< in: prepared statement */
469 	fts_table_t*	fts_table,	/*!< in: table of the FTS INDEX */
470 	const fts_string_t*
471 			word,		/*!< in: the word to fetch */
472 	fts_fetch_t*	fetch)		/*!< in: fetch callback.*/
473 {
474 	pars_info_t*	info;
475 	dberr_t		error;
476 	char		table_name[MAX_FULL_NAME_LEN];
477 
478 	trx->op_info = "fetching FTS index nodes";
479 
480 	if (*graph) {
481 		info = (*graph)->info;
482 	} else {
483 		ulint	selected;
484 
485 		info = pars_info_create();
486 
487 		ut_a(fts_table->type == FTS_INDEX_TABLE);
488 
489 		selected = fts_select_index(fts_table->charset,
490 					    word->f_str, word->f_len);
491 
492 		fts_table->suffix = fts_get_suffix(selected);
493 
494 		fts_get_table_name(fts_table, table_name);
495 
496 		pars_info_bind_id(info, "table_name", table_name);
497 	}
498 
499 	pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
500 	pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
501 
502 	if (!*graph) {
503 
504 		*graph = fts_parse_sql(
505 			fts_table,
506 			info,
507 			"DECLARE FUNCTION my_func;\n"
508 			"DECLARE CURSOR c IS"
509 			" SELECT word, doc_count, first_doc_id, last_doc_id,"
510 			" ilist\n"
511 			" FROM $table_name\n"
512 			" WHERE word LIKE :word\n"
513 			" ORDER BY first_doc_id;\n"
514 			"BEGIN\n"
515 			"\n"
516 			"OPEN c;\n"
517 			"WHILE 1 = 1 LOOP\n"
518 			"  FETCH c INTO my_func();\n"
519 			"  IF c % NOTFOUND THEN\n"
520 			"    EXIT;\n"
521 			"  END IF;\n"
522 			"END LOOP;\n"
523 			"CLOSE c;");
524 	}
525 
526 	for (;;) {
527 		error = fts_eval_sql(trx, *graph);
528 
529 		if (UNIV_LIKELY(error == DB_SUCCESS)) {
530 			fts_sql_commit(trx);
531 
532 			break;				/* Exit the loop. */
533 		} else {
534 			fts_sql_rollback(trx);
535 
536 			if (error == DB_LOCK_WAIT_TIMEOUT) {
537 				ib::warn() << "lock wait timeout reading"
538 					" FTS index. Retrying!";
539 
540 				trx->error_state = DB_SUCCESS;
541 			} else {
542 				ib::error() << "(" << error
543 					<< ") while reading FTS index.";
544 
545 				break;			/* Exit the loop. */
546 			}
547 		}
548 	}
549 
550 	return(error);
551 }
552 
/**********************************************************************//**
Decompress the next word from zip->blocks into word->f_str.
Each word is stored as a 2-byte length prefix followed by the word
bytes (the format written by fts_fetch_index_words()). A block is freed
once inflate() has consumed it; when inflate() stops returning Z_OK,
every remaining block is freed.
@return word->f_str (NUL-terminated, length in word->f_len), or NULL if
zip->status was not Z_OK on entry or inflate() ended with a status
other than Z_OK / Z_STREAM_END */
static
byte*
fts_zip_read_word(
/*==============*/
	fts_zip_t*	zip,		/*!< in: Zip state + data */
	fts_string_t*	word)		/*!< out: uncompressed word */
{
	short		len = 0;
	void*		null = NULL;
	byte*		ptr = word->f_str;
	int		flush = Z_NO_FLUSH;

	/* Either there was an error or we are at the Z_STREAM_END. */
	if (zip->status != Z_OK) {
		return(NULL);
	}

	/* First inflate the 2-byte length prefix into len. */
	zip->zp->next_out = reinterpret_cast<byte*>(&len);
	zip->zp->avail_out = sizeof(len);

	while (zip->status == Z_OK && zip->zp->avail_out > 0) {

		/* Finished decompressing block. */
		if (zip->zp->avail_in == 0) {

			/* Free the block that has been decompressed. */
			if (zip->pos > 0) {
				ulint	prev = zip->pos - 1;

				ut_a(zip->pos < ib_vector_size(zip->blocks));

				ut_free(ib_vector_getp(zip->blocks, prev));
				ib_vector_set(zip->blocks, prev, &null);
			}

			/* Any more blocks to decompress. */
			if (zip->pos < ib_vector_size(zip->blocks)) {

				zip->zp->next_in = static_cast<byte*>(
					ib_vector_getp(
						zip->blocks, zip->pos));

				/* Blocks after last_big_block are the
				trailing blocks written by
				fts_zip_deflate_end(), which hold at most
				FTS_MAX_WORD_LEN bytes. */
				if (zip->pos > zip->last_big_block) {
					zip->zp->avail_in =
						FTS_MAX_WORD_LEN;
				} else {
					zip->zp->avail_in =
						static_cast<uInt>(zip->block_sz);
				}

				++zip->pos;
			} else {
				/* No input left: finish the stream. */
				flush = Z_FINISH;
			}
		}

		switch (zip->status = inflate(zip->zp, flush)) {
		case Z_OK:
			/* The length prefix is complete: redirect output
			into the word buffer for exactly len bytes. len is
			reset so that when the word itself is complete
			(avail_out == 0 again) the loop simply ends. */
			if (zip->zp->avail_out == 0 && len > 0) {

				ut_a(len <= FTS_MAX_WORD_LEN);
				ptr[len] = 0;

				zip->zp->next_out = ptr;
				zip->zp->avail_out = uInt(len);

				word->f_len = ulint(len);
				len = 0;
			}
			break;

		case Z_BUF_ERROR:	/* No progress possible. */
		case Z_STREAM_END:
			inflateEnd(zip->zp);
			break;

		case Z_STREAM_ERROR:
		default:
			ut_error;
		}
	}

	/* All blocks must be freed at end of inflate. */
	if (zip->status != Z_OK) {
		for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
			if (ib_vector_getp(zip->blocks, i)) {
				ut_free(ib_vector_getp(zip->blocks, i));
				ib_vector_set(zip->blocks, i, &null);
			}
		}
	}

	/* Debug check: f_len matches the NUL-terminated length.
	(ptr is word->f_str, so this test is always true.) */
	if (ptr != NULL) {
		ut_ad(word->f_len == strlen((char*) ptr));
	}

	return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
}
653 
/**********************************************************************//**
Callback function to fetch and compress the word in an FTS
INDEX record. A word equal to the previously seen one is skipped.
Otherwise the word is fed to deflate() as a 2-byte length prefix
followed by the word bytes, allocating output blocks of zip->block_sz
bytes as needed, and zip->n_words is incremented.
@return FALSE once zip->n_words has reached zip->max_words (presumably
ending the fetch loop -- confirm against the SQL callback protocol),
TRUE otherwise */
static
ibool
fts_fetch_index_words(
/*==================*/
	void*		row,		/*!< in: sel_node_t* */
	void*		user_arg)	/*!< in: pointer to fts_zip_t */
{
	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
	fts_zip_t*	zip = static_cast<fts_zip_t*>(user_arg);
	que_node_t*	exp = sel_node->select_list;
	dfield_t*	dfield = que_node_get_val(exp);

	ut_a(dfield_get_len(dfield) <= FTS_MAX_WORD_LEN);

	uint16		len = uint16(dfield_get_len(dfield));
	void*		data = dfield_get_data(dfield);

	/* Skip the duplicate words. */
	if (zip->word.f_len == len && !memcmp(zip->word.f_str, data, len)) {
		return(TRUE);
	}

	/* Remember this word for the duplicate check above. */
	memcpy(zip->word.f_str, data, len);
	zip->word.f_len = len;

	ut_a(zip->zp->avail_in == 0);
	ut_a(zip->zp->next_in == NULL);

	/* The string is prefixed by len. */
	/* FIXME: This is not byte order agnostic (InnoDB data files
	with FULLTEXT INDEX are not portable between little-endian and
	big-endian systems!) */
	zip->zp->next_in = reinterpret_cast<byte*>(&len);
	zip->zp->avail_in = sizeof(len);

	/* Compress the word, create output blocks as necessary. */
	while (zip->zp->avail_in > 0) {

		/* No space left in output buffer, create a new one. */
		if (zip->zp->avail_out == 0) {
			byte*		block;

			block = static_cast<byte*>(
				ut_malloc_nokey(zip->block_sz));

			ib_vector_push(zip->blocks, &block);

			zip->zp->next_out = block;
			zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
		}

		switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
		case Z_OK:
			/* Input exhausted: after the length prefix,
			switch the input to the word bytes. len is then
			zeroed, so once the word bytes are consumed this
			branch sets avail_in to 0 and the loop ends. */
			if (zip->zp->avail_in == 0) {
				zip->zp->next_in = static_cast<byte*>(data);
				zip->zp->avail_in = uInt(len);
				ut_a(len <= FTS_MAX_WORD_LEN);
				len = 0;
			}
			continue;

		case Z_STREAM_END:
		case Z_BUF_ERROR:
		case Z_STREAM_ERROR:
		default:
			ut_error;
		}
	}

	/* All data should have been compressed. */
	ut_a(zip->zp->avail_in == 0);
	zip->zp->next_in = NULL;

	++zip->n_words;

	return(zip->n_words >= zip->max_words ? FALSE : TRUE);
}
735 
736 /**********************************************************************//**
737 Finish Zip deflate. */
738 static
739 void
fts_zip_deflate_end(fts_zip_t * zip)740 fts_zip_deflate_end(
741 /*================*/
742 	fts_zip_t*	zip)		/*!< in: instance that should be closed*/
743 {
744 	ut_a(zip->zp->avail_in == 0);
745 	ut_a(zip->zp->next_in == NULL);
746 
747 	zip->status = deflate(zip->zp, Z_FINISH);
748 
749 	ut_a(ib_vector_size(zip->blocks) > 0);
750 	zip->last_big_block = ib_vector_size(zip->blocks) - 1;
751 
752 	/* Allocate smaller block(s), since this is trailing data. */
753 	while (zip->status == Z_OK) {
754 		byte*		block;
755 
756 		ut_a(zip->zp->avail_out == 0);
757 
758 		block = static_cast<byte*>(
759 			ut_malloc_nokey(FTS_MAX_WORD_LEN + 1));
760 
761 		ib_vector_push(zip->blocks, &block);
762 
763 		zip->zp->next_out = block;
764 		zip->zp->avail_out = FTS_MAX_WORD_LEN;
765 
766 		zip->status = deflate(zip->zp, Z_FINISH);
767 	}
768 
769 	ut_a(zip->status == Z_STREAM_END);
770 
771 	zip->status = deflateEnd(zip->zp);
772 	ut_a(zip->status == Z_OK);
773 
774 	/* Reset the ZLib data structure. */
775 	memset(zip->zp, 0, sizeof(*zip->zp));
776 }
777 
778 /**********************************************************************//**
779 Read the words from the FTS INDEX.
780 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
781         to search else error code */
782 static MY_ATTRIBUTE((nonnull, warn_unused_result))
783 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)784 fts_index_fetch_words(
785 /*==================*/
786 	fts_optimize_t*		optim,	/*!< in: optimize scratch pad */
787 	const fts_string_t*	word,	/*!< in: get words greater than this
788 					 word */
789 	ulint			n_words)/*!< in: max words to read */
790 {
791 	pars_info_t*	info;
792 	que_t*		graph;
793 	ulint		selected;
794 	fts_zip_t*	zip = NULL;
795 	dberr_t		error = DB_SUCCESS;
796 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
797 	ibool		inited = FALSE;
798 
799 	optim->trx->op_info = "fetching FTS index words";
800 
801 	if (optim->zip == NULL) {
802 		optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
803 	} else {
804 		fts_zip_initialize(optim->zip);
805 	}
806 
807 	for (selected = fts_select_index(
808 		     optim->fts_index_table.charset, word->f_str, word->f_len);
809 	     selected < FTS_NUM_AUX_INDEX;
810 	     selected++) {
811 
812 		char	table_name[MAX_FULL_NAME_LEN];
813 
814 		optim->fts_index_table.suffix = fts_get_suffix(selected);
815 
816 		info = pars_info_create();
817 
818 		pars_info_bind_function(
819 			info, "my_func", fts_fetch_index_words, optim->zip);
820 
821 		pars_info_bind_varchar_literal(
822 			info, "word", word->f_str, word->f_len);
823 
824 		fts_get_table_name(&optim->fts_index_table, table_name);
825 		pars_info_bind_id(info, "table_name", table_name);
826 
827 		graph = fts_parse_sql(
828 			&optim->fts_index_table,
829 			info,
830 			"DECLARE FUNCTION my_func;\n"
831 			"DECLARE CURSOR c IS"
832 			" SELECT word\n"
833 			" FROM $table_name\n"
834 			" WHERE word > :word\n"
835 			" ORDER BY word;\n"
836 			"BEGIN\n"
837 			"\n"
838 			"OPEN c;\n"
839 			"WHILE 1 = 1 LOOP\n"
840 			"  FETCH c INTO my_func();\n"
841 			"  IF c % NOTFOUND THEN\n"
842 			"    EXIT;\n"
843 			"  END IF;\n"
844 			"END LOOP;\n"
845 			"CLOSE c;");
846 
847 		zip = optim->zip;
848 
849 		for (;;) {
850 			int	err;
851 
852 			if (!inited && ((err = deflateInit(zip->zp, 9))
853 					!= Z_OK)) {
854 				ib::error() << "ZLib deflateInit() failed: "
855 					<< err;
856 
857 				error = DB_ERROR;
858 				break;
859 			} else {
860 				inited = TRUE;
861 				error = fts_eval_sql(optim->trx, graph);
862 			}
863 
864 			if (UNIV_LIKELY(error == DB_SUCCESS)) {
865 				//FIXME fts_sql_commit(optim->trx);
866 				break;
867 			} else {
868 				//FIXME fts_sql_rollback(optim->trx);
869 
870 				if (error == DB_LOCK_WAIT_TIMEOUT) {
871 					ib::warn() << "Lock wait timeout"
872 						" reading document. Retrying!";
873 
874 					/* We need to reset the ZLib state. */
875 					inited = FALSE;
876 					deflateEnd(zip->zp);
877 					fts_zip_init(zip);
878 
879 					optim->trx->error_state = DB_SUCCESS;
880 				} else {
881 					ib::error() << "(" << error
882 						<< ") while reading document.";
883 
884 					break;	/* Exit the loop. */
885 				}
886 			}
887 		}
888 
889 		fts_que_graph_free(graph);
890 
891 		/* Check if max word to fetch is exceeded */
892 		if (optim->zip->n_words >= n_words) {
893 			break;
894 		}
895 	}
896 
897 	if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
898 
899 		/* All data should have been read. */
900 		ut_a(zip->zp->avail_in == 0);
901 
902 		fts_zip_deflate_end(zip);
903 	} else {
904 		deflateEnd(zip->zp);
905 	}
906 
907 	return(error);
908 }
909 
910 /**********************************************************************//**
911 Callback function to fetch the doc id from the record.
912 @return always returns TRUE */
913 static
914 ibool
fts_fetch_doc_ids(void * row,void * user_arg)915 fts_fetch_doc_ids(
916 /*==============*/
917 	void*	row,		/*!< in: sel_node_t* */
918 	void*	user_arg)	/*!< in: pointer to ib_vector_t */
919 {
920 	que_node_t*	exp;
921 	int		i = 0;
922 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
923 	fts_doc_ids_t*	fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
924 	doc_id_t*	update = static_cast<doc_id_t*>(
925 		ib_vector_push(fts_doc_ids->doc_ids, NULL));
926 
927 	for (exp = sel_node->select_list;
928 	     exp;
929 	     exp = que_node_get_next(exp), ++i) {
930 
931 		dfield_t*	dfield = que_node_get_val(exp);
932 		void*		data = dfield_get_data(dfield);
933 		ulint		len = dfield_get_len(dfield);
934 
935 		ut_a(len != UNIV_SQL_NULL);
936 
937 		/* Note: The column numbers below must match the SELECT. */
938 		switch (i) {
939 		case 0: /* DOC_ID */
940 			*update = fts_read_doc_id(
941 				static_cast<byte*>(data));
942 			break;
943 
944 		default:
945 			ut_error;
946 		}
947 	}
948 
949 	return(TRUE);
950 }
951 
952 /**********************************************************************//**
953 Read the rows from a FTS common auxiliary table.
954 @return DB_SUCCESS or error code */
955 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)956 fts_table_fetch_doc_ids(
957 /*====================*/
958 	trx_t*		trx,		/*!< in: transaction */
959 	fts_table_t*	fts_table,	/*!< in: table */
960 	fts_doc_ids_t*	doc_ids)	/*!< in: For collecting doc ids */
961 {
962 	dberr_t		error;
963 	que_t*		graph;
964 	pars_info_t*	info = pars_info_create();
965 	ibool		alloc_bk_trx = FALSE;
966 	char		table_name[MAX_FULL_NAME_LEN];
967 
968 	ut_a(fts_table->suffix != NULL);
969 	ut_a(fts_table->type == FTS_COMMON_TABLE);
970 
971 	if (!trx) {
972 		trx = trx_create();
973 		alloc_bk_trx = TRUE;
974 	}
975 
976 	trx->op_info = "fetching FTS doc ids";
977 
978 	pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
979 
980 	fts_get_table_name(fts_table, table_name);
981 	pars_info_bind_id(info, "table_name", table_name);
982 
983 	graph = fts_parse_sql(
984 		fts_table,
985 		info,
986 		"DECLARE FUNCTION my_func;\n"
987 		"DECLARE CURSOR c IS"
988 		" SELECT doc_id FROM $table_name;\n"
989 		"BEGIN\n"
990 		"\n"
991 		"OPEN c;\n"
992 		"WHILE 1 = 1 LOOP\n"
993 		"  FETCH c INTO my_func();\n"
994 		"  IF c % NOTFOUND THEN\n"
995 		"    EXIT;\n"
996 		"  END IF;\n"
997 		"END LOOP;\n"
998 		"CLOSE c;");
999 
1000 	error = fts_eval_sql(trx, graph);
1001 	fts_sql_commit(trx);
1002 
1003 	mutex_enter(&dict_sys.mutex);
1004 	que_graph_free(graph);
1005 	mutex_exit(&dict_sys.mutex);
1006 
1007 	if (error == DB_SUCCESS) {
1008 		ib_vector_sort(doc_ids->doc_ids, fts_doc_id_cmp);
1009 	}
1010 
1011 	if (alloc_bk_trx) {
1012 		trx->free();
1013 	}
1014 
1015 	return(error);
1016 }
1017 
1018 /**********************************************************************//**
1019 Do a binary search for a doc id in the array
1020 @return +ve index if found -ve index where it should be inserted
1021         if not found */
1022 int
fts_bsearch(doc_id_t * array,int lower,int upper,doc_id_t doc_id)1023 fts_bsearch(
1024 /*========*/
1025 	doc_id_t*	array,	/*!< in: array to sort */
1026 	int		lower,	/*!< in: the array lower bound */
1027 	int		upper,	/*!< in: the array upper bound */
1028 	doc_id_t	doc_id)	/*!< in: the doc id to search for */
1029 {
1030 	int	orig_size = upper;
1031 
1032 	if (upper == 0) {
1033 		/* Nothing to search */
1034 		return(-1);
1035 	} else {
1036 		while (lower < upper) {
1037 			int	i = (lower + upper) >> 1;
1038 
1039 			if (doc_id > array[i]) {
1040 				lower = i + 1;
1041 			} else if (doc_id < array[i]) {
1042 				upper = i - 1;
1043 			} else {
1044 				return(i); /* Found. */
1045 			}
1046 		}
1047 	}
1048 
1049 	if (lower == upper && lower < orig_size) {
1050 		if (doc_id == array[lower]) {
1051 			return(lower);
1052 		} else if (lower == 0) {
1053 			return(-1);
1054 		}
1055 	}
1056 
1057 	/* Not found. */
1058 	return( (lower == 0) ? -1 : -(lower));
1059 }
1060 
1061 /**********************************************************************//**
1062 Search in the to delete array whether any of the doc ids within
1063 the [first, last] range are to be deleted
1064 @return +ve index if found -ve index where it should be inserted
1065         if not found */
1066 static
1067 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1068 fts_optimize_lookup(
1069 /*================*/
1070 	ib_vector_t*	doc_ids,	/*!< in: array to search */
1071 	ulint		lower,		/*!< in: lower limit of array */
1072 	doc_id_t	first_doc_id,	/*!< in: doc id to lookup */
1073 	doc_id_t	last_doc_id)	/*!< in: doc id to lookup */
1074 {
1075 	int		pos;
1076 	int		upper = static_cast<int>(ib_vector_size(doc_ids));
1077 	doc_id_t*	array = (doc_id_t*) doc_ids->data;
1078 
1079 	pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1080 
1081 	ut_a(abs(pos) <= upper + 1);
1082 
1083 	if (pos < 0) {
1084 
1085 		int	i = abs(pos);
1086 
1087 		/* If i is 1, it could be first_doc_id is less than
1088 		either the first or second array item, do a
1089 		double check */
1090 		if (i == 1 && array[0] <= last_doc_id
1091 		    && first_doc_id < array[0]) {
1092 			pos = 0;
1093 		} else if (i < upper && array[i] <= last_doc_id) {
1094 
1095 			/* Check if the "next" doc id is within the
1096 			first & last doc id of the node. */
1097 			pos = i;
1098 		}
1099 	}
1100 
1101 	return(pos);
1102 }
1103 
1104 /**********************************************************************//**
1105 Encode the word pos list into the node
1106 @return DB_SUCCESS or error code*/
1107 static MY_ATTRIBUTE((nonnull))
1108 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1109 fts_optimize_encode_node(
1110 /*=====================*/
1111 	fts_node_t*	node,		/*!< in: node to fill*/
1112 	doc_id_t	doc_id,		/*!< in: doc id to encode */
1113 	fts_encode_t*	enc)		/*!< in: encoding state.*/
1114 {
1115 	byte*		dst;
1116 	ulint		enc_len;
1117 	ulint		pos_enc_len;
1118 	doc_id_t	doc_id_delta;
1119 	dberr_t		error = DB_SUCCESS;
1120 	const byte*	src = enc->src_ilist_ptr;
1121 
1122 	if (node->first_doc_id == 0) {
1123 		ut_a(node->last_doc_id == 0);
1124 
1125 		node->first_doc_id = doc_id;
1126 	}
1127 
1128 	/* Calculate the space required to store the ilist. */
1129 	ut_ad(doc_id > node->last_doc_id);
1130 	doc_id_delta = doc_id - node->last_doc_id;
1131 	enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1132 
1133 	/* Calculate the size of the encoded pos array. */
1134 	while (*src) {
1135 		fts_decode_vlc(&src);
1136 	}
1137 
1138 	/* Skip the 0x00 byte at the end of the word positions list. */
1139 	++src;
1140 
1141 	/* Number of encoded pos bytes to copy. */
1142 	pos_enc_len = ulint(src - enc->src_ilist_ptr);
1143 
1144 	/* Total number of bytes required for copy. */
1145 	enc_len += pos_enc_len;
1146 
1147 	/* Check we have enough space in the destination buffer for
1148 	copying the document word list. */
1149 	if (!node->ilist) {
1150 		ulint	new_size;
1151 
1152 		ut_a(node->ilist_size == 0);
1153 
1154 		new_size = enc_len > FTS_ILIST_MAX_SIZE
1155 			? enc_len : FTS_ILIST_MAX_SIZE;
1156 
1157 		node->ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1158 		node->ilist_size_alloc = new_size;
1159 
1160 	} else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1161 		ulint	new_size = node->ilist_size + enc_len;
1162 		byte*	ilist = static_cast<byte*>(ut_malloc_nokey(new_size));
1163 
1164 		memcpy(ilist, node->ilist, node->ilist_size);
1165 
1166 		ut_free(node->ilist);
1167 
1168 		node->ilist = ilist;
1169 		node->ilist_size_alloc = new_size;
1170 	}
1171 
1172 	src = enc->src_ilist_ptr;
1173 	dst = node->ilist + node->ilist_size;
1174 
1175 	/* Encode the doc id. Cast to ulint, the delta should be small and
1176 	therefore no loss of precision. */
1177 	dst = fts_encode_int(doc_id_delta, dst);
1178 
1179 	/* Copy the encoded pos array. */
1180 	memcpy(dst, src, pos_enc_len);
1181 
1182 	node->last_doc_id = doc_id;
1183 
1184 	/* Data copied upto here. */
1185 	node->ilist_size += enc_len;
1186 	enc->src_ilist_ptr += pos_enc_len;
1187 
1188 	ut_a(node->ilist_size <= node->ilist_size_alloc);
1189 
1190 	return(error);
1191 }
1192 
1193 /**********************************************************************//**
1194 Optimize the data contained in a node.
1195 @return DB_SUCCESS or error code*/
1196 static MY_ATTRIBUTE((nonnull))
1197 dberr_t
fts_optimize_node(ib_vector_t * del_vec,int * del_pos,fts_node_t * dst_node,fts_node_t * src_node,fts_encode_t * enc)1198 fts_optimize_node(
1199 /*==============*/
1200 	ib_vector_t*	del_vec,	/*!< in: vector of doc ids to delete*/
1201 	int*		del_pos,	/*!< in: offset into above vector */
1202 	fts_node_t*	dst_node,	/*!< in: node to fill*/
1203 	fts_node_t*	src_node,	/*!< in: source node for data*/
1204 	fts_encode_t*	enc)		/*!< in: encoding state */
1205 {
1206 	ulint		copied;
1207 	dberr_t		error = DB_SUCCESS;
1208 	doc_id_t	doc_id = enc->src_last_doc_id;
1209 
1210 	if (!enc->src_ilist_ptr) {
1211 		enc->src_ilist_ptr = src_node->ilist;
1212 	}
1213 
1214 	copied = ulint(enc->src_ilist_ptr - src_node->ilist);
1215 
1216 	/* While there is data in the source node and space to copy
1217 	into in the destination node. */
1218 	while (copied < src_node->ilist_size
1219 	       && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
1220 
1221 		doc_id_t	delta;
1222 		doc_id_t	del_doc_id = FTS_NULL_DOC_ID;
1223 
1224 		delta = fts_decode_vlc(
1225 			(const byte**)&enc->src_ilist_ptr);
1226 
1227 test_again:
1228 		/* Check whether the doc id is in the delete list, if
1229 		so then we skip the entries but we need to track the
1230 		delta for decoding the entries following this document's
1231 		entries. */
1232 		if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
1233 			doc_id_t*	update;
1234 
1235 			update = (doc_id_t*) ib_vector_get(
1236 				del_vec, ulint(*del_pos));
1237 
1238 			del_doc_id = *update;
1239 		}
1240 
1241 		if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
1242 			ut_a(delta == src_node->first_doc_id);
1243 		}
1244 
1245 		doc_id += delta;
1246 
1247 		if (del_doc_id > 0 && doc_id == del_doc_id) {
1248 
1249 			++*del_pos;
1250 
1251 			/* Skip the entries for this document. */
1252 			while (*enc->src_ilist_ptr) {
1253 				fts_decode_vlc((const byte**)&enc->src_ilist_ptr);
1254 			}
1255 
1256 			/* Skip the end of word position marker. */
1257 			++enc->src_ilist_ptr;
1258 
1259 		} else {
1260 
1261 			/* DOC ID already becomes larger than
1262 			del_doc_id, check the next del_doc_id */
1263 			if (del_doc_id > 0 && doc_id > del_doc_id) {
1264 				del_doc_id = 0;
1265 				++*del_pos;
1266 				delta = 0;
1267 				goto test_again;
1268 			}
1269 
1270 			/* Decode and copy the word positions into
1271 			the dest node. */
1272 			fts_optimize_encode_node(dst_node, doc_id, enc);
1273 
1274 			++dst_node->doc_count;
1275 
1276 			ut_a(dst_node->last_doc_id == doc_id);
1277 		}
1278 
1279 		/* Bytes copied so for from source. */
1280 		copied = ulint(enc->src_ilist_ptr - src_node->ilist);
1281 	}
1282 
1283 	if (copied >= src_node->ilist_size) {
1284 		ut_a(doc_id == src_node->last_doc_id);
1285 	}
1286 
1287 	enc->src_last_doc_id = doc_id;
1288 
1289 	return(error);
1290 }
1291 
1292 /**********************************************************************//**
1293 Determine the starting pos within the deleted doc id vector for a word.
1294 @return delete position */
1295 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1296 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1297 fts_optimize_deleted_pos(
1298 /*=====================*/
1299 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1300 	fts_word_t*	word)		/*!< in: the word data to check */
1301 {
1302 	int		del_pos;
1303 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1304 
1305 	/* Get the first and last dict ids for the word, we will use
1306 	these values to determine which doc ids need to be removed
1307 	when we coalesce the nodes. This way we can reduce the numer
1308 	of elements that need to be searched in the deleted doc ids
1309 	vector and secondly we can remove the doc ids during the
1310 	coalescing phase. */
1311 	if (ib_vector_size(del_vec) > 0) {
1312 		fts_node_t*	node;
1313 		doc_id_t	last_id;
1314 		doc_id_t	first_id;
1315 		ulint		size = ib_vector_size(word->nodes);
1316 
1317 		node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1318 		first_id = node->first_doc_id;
1319 
1320 		node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1321 		last_id = node->last_doc_id;
1322 
1323 		ut_a(first_id <= last_id);
1324 
1325 		del_pos = fts_optimize_lookup(
1326 			del_vec, optim->del_pos, first_id, last_id);
1327 	} else {
1328 
1329 		del_pos = -1; /* Note that there is nothing to delete. */
1330 	}
1331 
1332 	return(del_pos);
1333 }
1334 
/* NOTE(review): FTS_DEBUG_PRINT is defined unconditionally here and is
not referenced anywhere else in this section of the file; confirm
whether anything depends on it before removing it. */
#define FTS_DEBUG_PRINT
1336 /**********************************************************************//**
1337 Compact the nodes for a word, we also remove any doc ids during the
1338 compaction pass.
1339 @return DB_SUCCESS or error code.*/
1340 static
1341 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1342 fts_optimize_word(
1343 /*==============*/
1344 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1345 	fts_word_t*	word)		/*!< in: the word to optimize */
1346 {
1347 	fts_encode_t	enc;
1348 	ib_vector_t*	nodes;
1349 	ulint		i = 0;
1350 	int		del_pos;
1351 	fts_node_t*	dst_node = NULL;
1352 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1353 	ulint		size = ib_vector_size(word->nodes);
1354 
1355 	del_pos = fts_optimize_deleted_pos(optim, word);
1356 	nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1357 
1358 	enc.src_last_doc_id = 0;
1359 	enc.src_ilist_ptr = NULL;
1360 
1361 	while (i < size) {
1362 		ulint		copied;
1363 		fts_node_t*	src_node;
1364 
1365 		src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1366 
1367 		if (dst_node == NULL
1368 		    || dst_node->last_doc_id > src_node->first_doc_id) {
1369 
1370 			dst_node = static_cast<fts_node_t*>(
1371 				ib_vector_push(nodes, NULL));
1372 			memset(dst_node, 0, sizeof(*dst_node));
1373 		}
1374 
1375 		/* Copy from the src to the dst node. */
1376 		fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1377 
1378 		ut_a(enc.src_ilist_ptr != NULL);
1379 
1380 		/* Determine the numer of bytes copied to dst_node. */
1381 		copied = ulint(enc.src_ilist_ptr - src_node->ilist);
1382 
1383 		/* Can't copy more than whats in the vlc array. */
1384 		ut_a(copied <= src_node->ilist_size);
1385 
1386 		/* We are done with this node release the resources. */
1387 		if (copied == src_node->ilist_size) {
1388 
1389 			enc.src_last_doc_id = 0;
1390 			enc.src_ilist_ptr = NULL;
1391 
1392 			ut_free(src_node->ilist);
1393 
1394 			src_node->ilist = NULL;
1395 			src_node->ilist_size = src_node->ilist_size_alloc = 0;
1396 
1397 			src_node = NULL;
1398 
1399 			++i; /* Get next source node to OPTIMIZE. */
1400 		}
1401 
1402 		if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1403 
1404 			dst_node = NULL;
1405 		}
1406 	}
1407 
1408 	/* All dst nodes created should have been added to the vector. */
1409 	ut_a(dst_node == NULL);
1410 
1411 	/* Return the OPTIMIZED nodes. */
1412 	return(nodes);
1413 }
1414 
1415 /**********************************************************************//**
1416 Update the FTS index table. This is a delete followed by an insert.
1417 @return DB_SUCCESS or error code */
1418 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1419 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1420 fts_optimize_write_word(
1421 /*====================*/
1422 	trx_t*		trx,		/*!< in: transaction */
1423 	fts_table_t*	fts_table,	/*!< in: table of FTS index */
1424 	fts_string_t*	word,		/*!< in: word data to write */
1425 	ib_vector_t*	nodes)		/*!< in: the nodes to write */
1426 {
1427 	ulint		i;
1428 	pars_info_t*	info;
1429 	que_t*		graph;
1430 	ulint		selected;
1431 	dberr_t		error = DB_SUCCESS;
1432 	char		table_name[MAX_FULL_NAME_LEN];
1433 
1434 	info = pars_info_create();
1435 
1436 	ut_ad(fts_table->charset);
1437 
1438 	pars_info_bind_varchar_literal(
1439 		info, "word", word->f_str, word->f_len);
1440 
1441 	selected = fts_select_index(fts_table->charset,
1442 				    word->f_str, word->f_len);
1443 
1444 	fts_table->suffix = fts_get_suffix(selected);
1445 	fts_get_table_name(fts_table, table_name);
1446 	pars_info_bind_id(info, "table_name", table_name);
1447 
1448 	graph = fts_parse_sql(
1449 		fts_table,
1450 		info,
1451 		"BEGIN DELETE FROM $table_name WHERE word = :word;");
1452 
1453 	error = fts_eval_sql(trx, graph);
1454 
1455 	if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1456 		ib::error() << "(" << error << ") during optimize,"
1457 			" when deleting a word from the FTS index.";
1458 	}
1459 
1460 	fts_que_graph_free(graph);
1461 	graph = NULL;
1462 
1463 	/* Even if the operation needs to be rolled back and redone,
1464 	we iterate over the nodes in order to free the ilist. */
1465 	for (i = 0; i < ib_vector_size(nodes); ++i) {
1466 
1467 		fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1468 
1469 		if (error == DB_SUCCESS) {
1470 			/* Skip empty node. */
1471 			if (node->ilist == NULL) {
1472 				ut_ad(node->ilist_size == 0);
1473 				continue;
1474 			}
1475 
1476 			error = fts_write_node(
1477 				trx, &graph, fts_table, word, node);
1478 
1479 			if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1480 				ib::error() << "(" << error << ")"
1481 					" during optimize, while adding a"
1482 					" word to the FTS index.";
1483 			}
1484 		}
1485 
1486 		ut_free(node->ilist);
1487 		node->ilist = NULL;
1488 		node->ilist_size = node->ilist_size_alloc = 0;
1489 	}
1490 
1491 	if (graph != NULL) {
1492 		fts_que_graph_free(graph);
1493 	}
1494 
1495 	return(error);
1496 }
1497 
1498 /**********************************************************************//**
1499 Free fts_optimizer_word_t instanace.*/
1500 void
fts_word_free(fts_word_t * word)1501 fts_word_free(
1502 /*==========*/
1503 	fts_word_t*	word)		/*!< in: instance to free.*/
1504 {
1505 	mem_heap_t*	heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1506 
1507 #ifdef UNIV_DEBUG
1508 	memset(word, 0, sizeof(*word));
1509 #endif /* UNIV_DEBUG */
1510 
1511 	mem_heap_free(heap);
1512 }
1513 
1514 /**********************************************************************//**
1515 Optimize the word ilist and rewrite data to the FTS index.
1516 @return status one of RESTART, EXIT, ERROR */
1517 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1518 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,time_t start_time)1519 fts_optimize_compact(
1520 /*=================*/
1521 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1522 	dict_index_t*	index,		/*!< in: current FTS being optimized */
1523 	time_t		start_time)	/*!< in: optimize start time */
1524 {
1525 	ulint		i;
1526 	dberr_t		error = DB_SUCCESS;
1527 	ulint		size = ib_vector_size(optim->words);
1528 
1529 	for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1530 		fts_word_t*	word;
1531 		ib_vector_t*	nodes;
1532 		trx_t*		trx = optim->trx;
1533 
1534 		word = (fts_word_t*) ib_vector_get(optim->words, i);
1535 
1536 		/* nodes is allocated from the word heap and will be destroyed
1537 		when the word is freed. We however have to be careful about
1538 		the ilist, that needs to be freed explicitly. */
1539 		nodes = fts_optimize_word(optim, word);
1540 
1541 		/* Update the data on disk. */
1542 		error = fts_optimize_write_word(
1543 			trx, &optim->fts_index_table, &word->text, nodes);
1544 
1545 		if (error == DB_SUCCESS) {
1546 			/* Write the last word optimized to the config table,
1547 			we use this value for restarting optimize. */
1548 			error = fts_config_set_index_value(
1549 				optim->trx, index,
1550 				FTS_LAST_OPTIMIZED_WORD, &word->text);
1551 		}
1552 
1553 		/* Free the word that was optimized. */
1554 		fts_word_free(word);
1555 
1556 		ulint interval = ulint(time(NULL) - start_time);
1557 
1558 		if (fts_optimize_time_limit > 0
1559 		    && (lint(interval) < 0
1560 			|| interval > fts_optimize_time_limit)) {
1561 
1562 			optim->done = TRUE;
1563 		}
1564 	}
1565 
1566 	return(error);
1567 }
1568 
1569 /**********************************************************************//**
1570 Create an instance of fts_optimize_t. Also create a new
1571 background transaction.*/
1572 static
1573 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1574 fts_optimize_create(
1575 /*================*/
1576 	dict_table_t*	table)		/*!< in: table with FTS indexes */
1577 {
1578 	fts_optimize_t*	optim;
1579 	mem_heap_t*	heap = mem_heap_create(128);
1580 
1581 	optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1582 
1583 	optim->self_heap = ib_heap_allocator_create(heap);
1584 
1585 	optim->to_delete = fts_doc_ids_create();
1586 
1587 	optim->words = ib_vector_create(
1588 		optim->self_heap, sizeof(fts_word_t), 256);
1589 
1590 	optim->table = table;
1591 
1592 	optim->trx = trx_create();
1593 	trx_start_internal(optim->trx);
1594 
1595 	optim->fts_common_table.table_id = table->id;
1596 	optim->fts_common_table.type = FTS_COMMON_TABLE;
1597 	optim->fts_common_table.table = table;
1598 
1599 	optim->fts_index_table.table_id = table->id;
1600 	optim->fts_index_table.type = FTS_INDEX_TABLE;
1601 	optim->fts_index_table.table = table;
1602 
1603 	/* The common prefix for all this parent table's aux tables. */
1604 	optim->name_prefix = fts_get_table_name_prefix(
1605 		&optim->fts_common_table);
1606 
1607 	return(optim);
1608 }
1609 
#ifdef FTS_OPTIMIZE_DEBUG
/**********************************************************************//**
Get optimize start time of an FTS index.
@return DB_SUCCESS if all OK else error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_optimize_get_index_start_time(
/*==============================*/
	trx_t*		trx,			/*!< in: transaction */
	dict_index_t*	index,			/*!< in: FTS index */
	time_t*		start_time)		/*!< out: time in secs;
						left unchanged on error */
{
	/* Read into a genuine ulint and convert. time_t and ulint need not
	have the same width, so writing through a (ulint*) cast of
	start_time could fill only part of it (and it breaks strict
	aliasing). */
	ulint	value = 0;
	dberr_t	error = fts_config_get_index_ulint(
		trx, index, FTS_OPTIMIZE_START_TIME, &value);

	if (error == DB_SUCCESS) {
		*start_time = time_t(value);
	}

	return(error);
}

/**********************************************************************//**
Set the optimize start time of an FTS index.
@return DB_SUCCESS if all OK else error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_optimize_set_index_start_time(
/*==============================*/
	trx_t*		trx,			/*!< in: transaction */
	dict_index_t*	index,			/*!< in: FTS index */
	time_t		start_time)		/*!< in: start time */
{
	return(fts_config_set_index_ulint(
		       trx, index, FTS_OPTIMIZE_START_TIME,
		       (ulint) start_time));
}

/**********************************************************************//**
Get optimize end time of an FTS index.
@return DB_SUCCESS if all OK else error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_optimize_get_index_end_time(
/*============================*/
	trx_t*		trx,			/*!< in: transaction */
	dict_index_t*	index,			/*!< in: FTS index */
	time_t*		end_time)		/*!< out: time in secs;
						left unchanged on error */
{
	/* Same width/aliasing concern as for the start time. */
	ulint	value = 0;
	dberr_t	error = fts_config_get_index_ulint(
		trx, index, FTS_OPTIMIZE_END_TIME, &value);

	if (error == DB_SUCCESS) {
		*end_time = time_t(value);
	}

	return(error);
}

/**********************************************************************//**
Set the optimize end time of an FTS index.
@return DB_SUCCESS if all OK else error code */
static MY_ATTRIBUTE((nonnull, warn_unused_result))
dberr_t
fts_optimize_set_index_end_time(
/*============================*/
	trx_t*		trx,			/*!< in: transaction */
	dict_index_t*	index,			/*!< in: FTS index */
	time_t		end_time)		/*!< in: end time */
{
	return(fts_config_set_index_ulint(
		       trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
}
#endif
1673 
1674 /**********************************************************************//**
1675 Free the optimize prepared statements.*/
1676 static
1677 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1678 fts_optimize_graph_free(
1679 /*====================*/
1680 	fts_optimize_graph_t*	graph)	/*!< in/out: The graph instances
1681 					to free */
1682 {
1683 	if (graph->commit_graph) {
1684 		que_graph_free(graph->commit_graph);
1685 		graph->commit_graph = NULL;
1686 	}
1687 
1688 	if (graph->write_nodes_graph) {
1689 		que_graph_free(graph->write_nodes_graph);
1690 		graph->write_nodes_graph = NULL;
1691 	}
1692 
1693 	if (graph->delete_nodes_graph) {
1694 		que_graph_free(graph->delete_nodes_graph);
1695 		graph->delete_nodes_graph = NULL;
1696 	}
1697 
1698 	if (graph->read_nodes_graph) {
1699 		que_graph_free(graph->read_nodes_graph);
1700 		graph->read_nodes_graph = NULL;
1701 	}
1702 }
1703 
1704 /**********************************************************************//**
1705 Free all optimize resources. */
1706 static
1707 void
fts_optimize_free(fts_optimize_t * optim)1708 fts_optimize_free(
1709 /*==============*/
1710 	fts_optimize_t*	optim)		/*!< in: table with on FTS index */
1711 {
1712 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1713 
1714 	trx_commit_for_mysql(optim->trx);
1715 	optim->trx->free();
1716 	optim->trx = NULL;
1717 
1718 	fts_doc_ids_free(optim->to_delete);
1719 	fts_optimize_graph_free(&optim->graph);
1720 
1721 	ut_free(optim->name_prefix);
1722 
1723 	/* This will free the heap from which optim itself was allocated. */
1724 	mem_heap_free(heap);
1725 }
1726 
1727 /**********************************************************************//**
1728 Get the max time optimize should run in millisecs.
1729 @return max optimize time limit in millisecs. */
1730 static
1731 ulint
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1732 fts_optimize_get_time_limit(
1733 /*========================*/
1734 	trx_t*		trx,			/*!< in: transaction */
1735 	fts_table_t*	fts_table)		/*!< in: aux table */
1736 {
1737 	ulint	time_limit = 0;
1738 
1739 	fts_config_get_ulint(
1740 		trx, fts_table,
1741 		FTS_OPTIMIZE_LIMIT_IN_SECS, &time_limit);
1742 
1743 	/* FIXME: This is returning milliseconds, while the variable
1744 	is being stored and interpreted as seconds! */
1745 	return(time_limit * 1000);
1746 }
1747 
1748 /**********************************************************************//**
1749 Run OPTIMIZE on the given table. Note: this can take a very long time
1750 (hours). */
1751 static
1752 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1753 fts_optimize_words(
1754 /*===============*/
1755 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1756 	dict_index_t*	index,	/*!< in: current FTS being optimized */
1757 	fts_string_t*	word)	/*!< in: the starting word to optimize */
1758 {
1759 	fts_fetch_t	fetch;
1760 	que_t*		graph = NULL;
1761 	CHARSET_INFO*	charset = optim->fts_index_table.charset;
1762 
1763 	ut_a(!optim->done);
1764 
1765 	/* Get the time limit from the config table. */
1766 	fts_optimize_time_limit = fts_optimize_get_time_limit(
1767 		optim->trx, &optim->fts_common_table);
1768 
1769 	const time_t start_time = time(NULL);
1770 
1771 	/* Setup the callback to use for fetching the word ilist etc. */
1772 	fetch.read_arg = optim->words;
1773 	fetch.read_record = fts_optimize_index_fetch_node;
1774 
1775 	while (!optim->done) {
1776 		dberr_t	error;
1777 		trx_t*	trx = optim->trx;
1778 		ulint	selected;
1779 
1780 		ut_a(ib_vector_size(optim->words) == 0);
1781 
1782 		selected = fts_select_index(charset, word->f_str, word->f_len);
1783 
1784 		/* Read the index records to optimize. */
1785 		fetch.total_memory = 0;
1786 		error = fts_index_fetch_nodes(
1787 			trx, &graph, &optim->fts_index_table, word,
1788 			&fetch);
1789 		ut_ad(fetch.total_memory < fts_result_cache_limit);
1790 
1791 		if (error == DB_SUCCESS) {
1792 			/* There must be some nodes to read. */
1793 			ut_a(ib_vector_size(optim->words) > 0);
1794 
1795 			/* Optimize the nodes that were read and write
1796 			back to DB. */
1797 			error = fts_optimize_compact(optim, index, start_time);
1798 
1799 			if (error == DB_SUCCESS) {
1800 				fts_sql_commit(optim->trx);
1801 			} else {
1802 				fts_sql_rollback(optim->trx);
1803 			}
1804 		}
1805 
1806 		ib_vector_reset(optim->words);
1807 
1808 		if (error == DB_SUCCESS) {
1809 			if (!optim->done) {
1810 				if (!fts_zip_read_word(optim->zip, word)) {
1811 					optim->done = TRUE;
1812 				} else if (selected
1813 					   != fts_select_index(
1814 						charset, word->f_str,
1815 						word->f_len)
1816 					  && graph) {
1817 					fts_que_graph_free(graph);
1818 					graph = NULL;
1819 				}
1820 			}
1821 		} else if (error == DB_LOCK_WAIT_TIMEOUT) {
1822 			ib::warn() << "Lock wait timeout during optimize."
1823 				" Retrying!";
1824 
1825 			trx->error_state = DB_SUCCESS;
1826 		} else if (error == DB_DEADLOCK) {
1827 			ib::warn() << "Deadlock during optimize. Retrying!";
1828 
1829 			trx->error_state = DB_SUCCESS;
1830 		} else {
1831 			optim->done = TRUE;		/* Exit the loop. */
1832 		}
1833 	}
1834 
1835 	if (graph != NULL) {
1836 		fts_que_graph_free(graph);
1837 	}
1838 }
1839 
1840 /**********************************************************************//**
1841 Optimize is complete. Set the completion time, and reset the optimize
1842 start string for this FTS index to "".
1843 @return DB_SUCCESS if all OK */
1844 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1845 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1846 fts_optimize_index_completed(
1847 /*=========================*/
1848 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1849 	dict_index_t*	index)	/*!< in: table with one FTS index */
1850 {
1851 	fts_string_t	word;
1852 	dberr_t		error;
1853 	byte		buf[sizeof(ulint)];
1854 #ifdef FTS_OPTIMIZE_DEBUG
1855 	time_t		end_time = time(NULL);
1856 
1857 	error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1858 #endif
1859 
1860 	/* If we've reached the end of the index then set the start
1861 	word to the empty string. */
1862 
1863 	word.f_len = 0;
1864 	word.f_str = buf;
1865 	*word.f_str = '\0';
1866 
1867 	error = fts_config_set_index_value(
1868 		optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1869 
1870 	if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
1871 		ib::error() << "(" << error << ") while updating"
1872 			" last optimized word!";
1873 	}
1874 
1875 	return(error);
1876 }
1877 
1878 
1879 /**********************************************************************//**
1880 Read the list of words from the FTS auxiliary index that will be
1881 optimized in this pass.
1882 @return DB_SUCCESS if all OK */
1883 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1884 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1885 fts_optimize_index_read_words(
1886 /*==========================*/
1887 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1888 	dict_index_t*	index,	/*!< in: table with one FTS index */
1889 	fts_string_t*	word)	/*!< in: buffer to use */
1890 {
1891 	dberr_t	error = DB_SUCCESS;
1892 
1893 	if (optim->del_list_regenerated) {
1894 		word->f_len = 0;
1895 	} else {
1896 
1897 		/* Get the last word that was optimized from
1898 		the config table. */
1899 		error = fts_config_get_index_value(
1900 			optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1901 	}
1902 
1903 	/* If record not found then we start from the top. */
1904 	if (error == DB_RECORD_NOT_FOUND) {
1905 		word->f_len = 0;
1906 		error = DB_SUCCESS;
1907 	}
1908 
1909 	while (error == DB_SUCCESS) {
1910 
1911 		error = fts_index_fetch_words(
1912 			optim, word, fts_num_word_optimize);
1913 
1914 		if (error == DB_SUCCESS) {
1915 			/* Reset the last optimized word to '' if no
1916 			more words could be read from the FTS index. */
1917 			if (optim->zip->n_words == 0) {
1918 				word->f_len = 0;
1919 				*word->f_str = 0;
1920 			}
1921 
1922 			break;
1923 		}
1924 	}
1925 
1926 	return(error);
1927 }
1928 
1929 /**********************************************************************//**
1930 Run OPTIMIZE on the given FTS index. Note: this can take a very long
1931 time (hours).
1932 @return DB_SUCCESS if all OK */
1933 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1934 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)1935 fts_optimize_index(
1936 /*===============*/
1937 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1938 	dict_index_t*	index)	/*!< in: table with one FTS index */
1939 {
1940 	fts_string_t	word;
1941 	dberr_t		error;
1942 	byte		str[FTS_MAX_WORD_LEN + 1];
1943 
1944 	/* Set the current index that we have to optimize. */
1945 	optim->fts_index_table.index_id = index->id;
1946 	optim->fts_index_table.charset = fts_index_get_charset(index);
1947 
1948 	optim->done = FALSE; /* Optimize until !done */
1949 
1950 	/* We need to read the last word optimized so that we start from
1951 	the next word. */
1952 	word.f_str = str;
1953 
1954 	/* We set the length of word to the size of str since we
1955 	need to pass the max len info to the fts_get_config_value() function. */
1956 	word.f_len = sizeof(str) - 1;
1957 
1958 	memset(word.f_str, 0x0, word.f_len);
1959 
1960 	/* Read the words that will be optimized in this pass. */
1961 	error = fts_optimize_index_read_words(optim, index, &word);
1962 
1963 	if (error == DB_SUCCESS) {
1964 		int	zip_error;
1965 
1966 		ut_a(optim->zip->pos == 0);
1967 		ut_a(optim->zip->zp->total_in == 0);
1968 		ut_a(optim->zip->zp->total_out == 0);
1969 
1970 		zip_error = inflateInit(optim->zip->zp);
1971 		ut_a(zip_error == Z_OK);
1972 
1973 		word.f_len = 0;
1974 		word.f_str = str;
1975 
1976 		/* Read the first word to optimize from the Zip buffer. */
1977 		if (!fts_zip_read_word(optim->zip, &word)) {
1978 
1979 			optim->done = TRUE;
1980 		} else {
1981 			fts_optimize_words(optim, index, &word);
1982 		}
1983 
1984 		/* If we couldn't read any records then optimize is
1985 		complete. Increment the number of indexes that have
1986 		been optimized and set FTS index optimize state to
1987 		completed. */
1988 		if (error == DB_SUCCESS && optim->zip->n_words == 0) {
1989 
1990 			error = fts_optimize_index_completed(optim, index);
1991 
1992 			if (error == DB_SUCCESS) {
1993 				++optim->n_completed;
1994 			}
1995 		}
1996 	}
1997 
1998 	return(error);
1999 }
2000 
2001 /**********************************************************************//**
2002 Delete the document ids in the delete, and delete cache tables.
2003 @return DB_SUCCESS if all OK */
2004 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2005 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2006 fts_optimize_purge_deleted_doc_ids(
2007 /*===============================*/
2008 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2009 {
2010 	ulint		i;
2011 	pars_info_t*	info;
2012 	que_t*		graph;
2013 	doc_id_t*	update;
2014 	doc_id_t	write_doc_id;
2015 	dberr_t		error = DB_SUCCESS;
2016 	char		deleted[MAX_FULL_NAME_LEN];
2017 	char		deleted_cache[MAX_FULL_NAME_LEN];
2018 
2019 	info = pars_info_create();
2020 
2021 	ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2022 
2023 	update = static_cast<doc_id_t*>(
2024 		ib_vector_get(optim->to_delete->doc_ids, 0));
2025 
2026 	/* Convert to "storage" byte order. */
2027 	fts_write_doc_id((byte*) &write_doc_id, *update);
2028 
2029 	/* This is required for the SQL parser to work. It must be able
2030 	to find the following variables. So we do it twice. */
2031 	fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2032 	fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2033 
2034 	/* Make sure the following two names are consistent with the name
2035 	used in the fts_delete_doc_ids_sql */
2036 	optim->fts_common_table.suffix = fts_common_tables[3];
2037 	fts_get_table_name(&optim->fts_common_table, deleted);
2038 	pars_info_bind_id(info, fts_common_tables[3], deleted);
2039 
2040 	optim->fts_common_table.suffix = fts_common_tables[4];
2041 	fts_get_table_name(&optim->fts_common_table, deleted_cache);
2042 	pars_info_bind_id(info, fts_common_tables[4], deleted_cache);
2043 
2044 	graph = fts_parse_sql(NULL, info, fts_delete_doc_ids_sql);
2045 
2046 	/* Delete the doc ids that were copied at the start. */
2047 	for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2048 
2049 		update = static_cast<doc_id_t*>(ib_vector_get(
2050 			optim->to_delete->doc_ids, i));
2051 
2052 		/* Convert to "storage" byte order. */
2053 		fts_write_doc_id((byte*) &write_doc_id, *update);
2054 
2055 		fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2056 
2057 		fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2058 
2059 		error = fts_eval_sql(optim->trx, graph);
2060 
2061 		// FIXME: Check whether delete actually succeeded!
2062 		if (error != DB_SUCCESS) {
2063 
2064 			fts_sql_rollback(optim->trx);
2065 			break;
2066 		}
2067 	}
2068 
2069 	fts_que_graph_free(graph);
2070 
2071 	return(error);
2072 }
2073 
2074 /**********************************************************************//**
2075 Delete the document ids in the pending delete, and delete tables.
2076 @return DB_SUCCESS if all OK */
2077 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2078 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2079 fts_optimize_purge_deleted_doc_id_snapshot(
2080 /*=======================================*/
2081 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2082 {
2083 	dberr_t		error;
2084 	que_t*		graph;
2085 	pars_info_t*	info;
2086 	char		being_deleted[MAX_FULL_NAME_LEN];
2087 	char		being_deleted_cache[MAX_FULL_NAME_LEN];
2088 
2089 	info = pars_info_create();
2090 
2091 	/* Make sure the following two names are consistent with the name
2092 	used in the fts_end_delete_sql */
2093 	optim->fts_common_table.suffix = fts_common_tables[0];
2094 	fts_get_table_name(&optim->fts_common_table, being_deleted);
2095 	pars_info_bind_id(info, fts_common_tables[0], being_deleted);
2096 
2097 	optim->fts_common_table.suffix = fts_common_tables[1];
2098 	fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2099 	pars_info_bind_id(info, fts_common_tables[1], being_deleted_cache);
2100 
2101 	/* Delete the doc ids that were copied to delete pending state at
2102 	the start of optimize. */
2103 	graph = fts_parse_sql(NULL, info, fts_end_delete_sql);
2104 
2105 	error = fts_eval_sql(optim->trx, graph);
2106 	fts_que_graph_free(graph);
2107 
2108 	return(error);
2109 }
2110 
2111 /**********************************************************************//**
2112 Copy the deleted doc ids that will be purged during this optimize run
2113 to the being deleted FTS auxiliary tables. The transaction is committed
2114 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2115 @return DB_SUCCESS if all OK */
2116 static
2117 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2118 fts_optimize_being_deleted_count(
2119 /*=============================*/
2120 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2121 {
2122 	fts_table_t	fts_table;
2123 
2124 	FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2125 			   optim->table);
2126 
2127 	return(fts_get_rows_count(&fts_table));
2128 }
2129 
2130 /*********************************************************************//**
2131 Copy the deleted doc ids that will be purged during this optimize run
2132 to the being deleted FTS auxiliary tables. The transaction is committed
2133 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2134 @return DB_SUCCESS if all OK */
2135 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2136 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2137 fts_optimize_create_deleted_doc_id_snapshot(
2138 /*========================================*/
2139 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2140 {
2141 	dberr_t		error;
2142 	que_t*		graph;
2143 	pars_info_t*	info;
2144 	char		being_deleted[MAX_FULL_NAME_LEN];
2145 	char		deleted[MAX_FULL_NAME_LEN];
2146 	char		being_deleted_cache[MAX_FULL_NAME_LEN];
2147 	char		deleted_cache[MAX_FULL_NAME_LEN];
2148 
2149 	info = pars_info_create();
2150 
2151 	/* Make sure the following four names are consistent with the name
2152 	used in the fts_init_delete_sql */
2153 	optim->fts_common_table.suffix = fts_common_tables[0];
2154 	fts_get_table_name(&optim->fts_common_table, being_deleted);
2155 	pars_info_bind_id(info, fts_common_tables[0], being_deleted);
2156 
2157 	optim->fts_common_table.suffix = fts_common_tables[3];
2158 	fts_get_table_name(&optim->fts_common_table, deleted);
2159 	pars_info_bind_id(info, fts_common_tables[3], deleted);
2160 
2161 	optim->fts_common_table.suffix = fts_common_tables[1];
2162 	fts_get_table_name(&optim->fts_common_table, being_deleted_cache);
2163 	pars_info_bind_id(info, fts_common_tables[1], being_deleted_cache);
2164 
2165 	optim->fts_common_table.suffix = fts_common_tables[4];
2166 	fts_get_table_name(&optim->fts_common_table, deleted_cache);
2167 	pars_info_bind_id(info, fts_common_tables[4], deleted_cache);
2168 
2169 	/* Move doc_ids that are to be deleted to state being deleted. */
2170 	graph = fts_parse_sql(NULL, info, fts_init_delete_sql);
2171 
2172 	error = fts_eval_sql(optim->trx, graph);
2173 
2174 	fts_que_graph_free(graph);
2175 
2176 	if (error != DB_SUCCESS) {
2177 		fts_sql_rollback(optim->trx);
2178 	} else {
2179 		fts_sql_commit(optim->trx);
2180 	}
2181 
2182 	optim->del_list_regenerated = TRUE;
2183 
2184 	return(error);
2185 }
2186 
2187 /*********************************************************************//**
2188 Read in the document ids that are to be purged during optimize. The
2189 transaction is committed upon successfully read.
2190 @return DB_SUCCESS if all OK */
2191 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2192 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2193 fts_optimize_read_deleted_doc_id_snapshot(
2194 /*======================================*/
2195 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2196 {
2197 	dberr_t		error;
2198 
2199 	optim->fts_common_table.suffix = "BEING_DELETED";
2200 
2201 	/* Read the doc_ids to delete. */
2202 	error = fts_table_fetch_doc_ids(
2203 		optim->trx, &optim->fts_common_table, optim->to_delete);
2204 
2205 	if (error == DB_SUCCESS) {
2206 
2207 		optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2208 
2209 		/* Read additional doc_ids to delete. */
2210 		error = fts_table_fetch_doc_ids(
2211 			optim->trx, &optim->fts_common_table, optim->to_delete);
2212 	}
2213 
2214 	if (error != DB_SUCCESS) {
2215 
2216 		fts_doc_ids_free(optim->to_delete);
2217 		optim->to_delete = NULL;
2218 	}
2219 
2220 	return(error);
2221 }
2222 
2223 /*********************************************************************//**
2224 Optimze all the FTS indexes, skipping those that have already been
2225 optimized, since the FTS auxiliary indexes are not guaranteed to be
2226 of the same cardinality.
2227 @return DB_SUCCESS if all OK */
2228 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2229 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2230 fts_optimize_indexes(
2231 /*=================*/
2232 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2233 {
2234 	ulint		i;
2235 	dberr_t		error = DB_SUCCESS;
2236 	fts_t*		fts = optim->table->fts;
2237 
2238 	/* Optimize the FTS indexes. */
2239 	for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2240 		dict_index_t*	index;
2241 
2242 #ifdef	FTS_OPTIMIZE_DEBUG
2243 		time_t	end_time;
2244 		time_t	start_time;
2245 
2246 		/* Get the start and end optimize times for this index. */
2247 		error = fts_optimize_get_index_start_time(
2248 			optim->trx, index, &start_time);
2249 
2250 		if (error != DB_SUCCESS) {
2251 			break;
2252 		}
2253 
2254 		error = fts_optimize_get_index_end_time(
2255 			optim->trx, index, &end_time);
2256 
2257 		if (error != DB_SUCCESS) {
2258 			break;
2259 		}
2260 
2261 		/* Start time will be 0 only for the first time or after
2262 		completing the optimization of all FTS indexes. */
2263 		if (start_time == 0) {
2264 			start_time = time(NULL);
2265 
2266 			error = fts_optimize_set_index_start_time(
2267 				optim->trx, index, start_time);
2268 		}
2269 
2270 		/* Check if this index needs to be optimized or not. */
2271 		if (difftime(end_time, start_time) < 0) {
2272 			error = fts_optimize_index(optim, index);
2273 
2274 			if (error != DB_SUCCESS) {
2275 				break;
2276 			}
2277 		} else {
2278 			++optim->n_completed;
2279 		}
2280 #endif
2281 		index = static_cast<dict_index_t*>(
2282 			ib_vector_getp(fts->indexes, i));
2283 		error = fts_optimize_index(optim, index);
2284 	}
2285 
2286 	if (error == DB_SUCCESS) {
2287 		fts_sql_commit(optim->trx);
2288 	} else {
2289 		fts_sql_rollback(optim->trx);
2290 	}
2291 
2292 	return(error);
2293 }
2294 
2295 /*********************************************************************//**
2296 Cleanup the snapshot tables and the master deleted table.
2297 @return DB_SUCCESS if all OK */
2298 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2299 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2300 fts_optimize_purge_snapshot(
2301 /*========================*/
2302 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2303 {
2304 	dberr_t		error;
2305 
2306 	/* Delete the doc ids from the master deleted tables, that were
2307 	in the snapshot that was taken at the start of optimize. */
2308 	error = fts_optimize_purge_deleted_doc_ids(optim);
2309 
2310 	if (error == DB_SUCCESS) {
2311 		/* Destroy the deleted doc id snapshot. */
2312 		error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2313 	}
2314 
2315 	if (error == DB_SUCCESS) {
2316 		fts_sql_commit(optim->trx);
2317 	} else {
2318 		fts_sql_rollback(optim->trx);
2319 	}
2320 
2321 	return(error);
2322 }
2323 
2324 /*********************************************************************//**
2325 Reset the start time to 0 so that a new optimize can be started.
2326 @return DB_SUCCESS if all OK */
2327 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2328 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2329 fts_optimize_reset_start_time(
2330 /*==========================*/
2331 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2332 {
2333 	dberr_t		error = DB_SUCCESS;
2334 #ifdef FTS_OPTIMIZE_DEBUG
2335 	fts_t*		fts = optim->table->fts;
2336 
2337 	/* Optimization should have been completed for all indexes. */
2338 	ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2339 
2340 	for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2341 		dict_index_t*	index;
2342 
2343 		time_t	start_time = 0;
2344 
2345 		/* Reset the start time to 0 for this index. */
2346 		error = fts_optimize_set_index_start_time(
2347 			optim->trx, index, start_time);
2348 
2349 		index = static_cast<dict_index_t*>(
2350 			ib_vector_getp(fts->indexes, i));
2351 	}
2352 #endif
2353 
2354 	if (error == DB_SUCCESS) {
2355 		fts_sql_commit(optim->trx);
2356 	} else {
2357 		fts_sql_rollback(optim->trx);
2358 	}
2359 
2360 	return(error);
2361 }
2362 
2363 /*********************************************************************//**
2364 Run OPTIMIZE on the given table by a background thread.
2365 @return DB_SUCCESS if all OK */
2366 static MY_ATTRIBUTE((nonnull))
2367 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2368 fts_optimize_table_bk(
2369 /*==================*/
2370 	fts_slot_t*	slot)	/*!< in: table to optimiza */
2371 {
2372 	const time_t now = time(NULL);
2373 	const ulint interval = ulint(now - slot->last_run);
2374 
2375 	/* Avoid optimizing tables that were optimized recently. */
2376 	if (slot->last_run > 0
2377 	    && lint(interval) >= 0
2378 	    && interval < FTS_OPTIMIZE_INTERVAL_IN_SECS) {
2379 
2380 		return(DB_SUCCESS);
2381 	}
2382 
2383 	dict_table_t*	table = slot->table;
2384 	dberr_t		error;
2385 
2386 	if (fil_table_accessible(table)
2387 	    && table->fts && table->fts->cache
2388 	    && table->fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2389 		error = fts_optimize_table(table);
2390 
2391 		slot->last_run = time(NULL);
2392 
2393 		if (error == DB_SUCCESS) {
2394 			slot->running = false;
2395 			slot->completed = slot->last_run;
2396 		}
2397 	} else {
2398 		/* Note time this run completed. */
2399 		slot->last_run = now;
2400 		error = DB_SUCCESS;
2401 	}
2402 
2403 	return(error);
2404 }
2405 /*********************************************************************//**
2406 Run OPTIMIZE on the given table.
2407 @return DB_SUCCESS if all OK */
2408 dberr_t
fts_optimize_table(dict_table_t * table)2409 fts_optimize_table(
2410 /*===============*/
2411 	dict_table_t*	table)	/*!< in: table to optimiza */
2412 {
2413 	if (srv_read_only_mode) {
2414 		return DB_READ_ONLY;
2415 	}
2416 
2417 	dberr_t		error = DB_SUCCESS;
2418 	fts_optimize_t*	optim = NULL;
2419 	fts_t*		fts = table->fts;
2420 
2421 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2422 		ib::info() << "FTS start optimize " << table->name;
2423 	}
2424 
2425 	optim = fts_optimize_create(table);
2426 
2427 	// FIXME: Call this only at the start of optimize, currently we
2428 	// rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2429 
2430 	/* Check whether there are still records in BEING_DELETED table */
2431 	if (fts_optimize_being_deleted_count(optim) == 0) {
2432 		/* Take a snapshot of the deleted document ids, they are copied
2433 		to the BEING_ tables. */
2434 		error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2435 	}
2436 
2437 	/* A duplicate error is OK, since we don't erase the
2438 	doc ids from the being deleted state until all FTS
2439 	indexes have been optimized. */
2440 	if (error == DB_DUPLICATE_KEY) {
2441 		error = DB_SUCCESS;
2442 	}
2443 
2444 	if (error == DB_SUCCESS) {
2445 
2446 		/* These document ids will be filtered out during the
2447 		index optimization phase. They are in the snapshot that we
2448 		took above, at the start of the optimize. */
2449 		error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2450 
2451 		if (error == DB_SUCCESS) {
2452 
2453 			/* Commit the read of being deleted
2454 			doc ids transaction. */
2455 			fts_sql_commit(optim->trx);
2456 
2457 			/* We would do optimization only if there
2458 			are deleted records to be cleaned up */
2459 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2460 				error = fts_optimize_indexes(optim);
2461 			}
2462 
2463 		} else {
2464 			ut_a(optim->to_delete == NULL);
2465 		}
2466 
2467 		/* Only after all indexes have been optimized can we
2468 		delete the (snapshot) doc ids in the pending delete,
2469 		and master deleted tables. */
2470 		if (error == DB_SUCCESS
2471 		    && optim->n_completed == ib_vector_size(fts->indexes)) {
2472 
2473 			if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2474 				ib::info() << "FTS_OPTIMIZE: Completed"
2475 					" Optimize, cleanup DELETED table";
2476 			}
2477 
2478 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2479 
2480 				/* Purge the doc ids that were in the
2481 				snapshot from the snapshot tables and
2482 				the master deleted table. */
2483 				error = fts_optimize_purge_snapshot(optim);
2484 			}
2485 
2486 			if (error == DB_SUCCESS) {
2487 				/* Reset the start time of all the FTS indexes
2488 				so that optimize can be restarted. */
2489 				error = fts_optimize_reset_start_time(optim);
2490 			}
2491 		}
2492 	}
2493 
2494 	fts_optimize_free(optim);
2495 
2496 	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2497 		ib::info() << "FTS end optimize " << table->name;
2498 	}
2499 
2500 	return(error);
2501 }
2502 
2503 /********************************************************************//**
2504 Add the table to add to the OPTIMIZER's list.
2505 @return new message instance */
2506 static
2507 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2508 fts_optimize_create_msg(
2509 /*====================*/
2510 	fts_msg_type_t	type,		/*!< in: type of message */
2511 	void*		ptr)		/*!< in: message payload */
2512 {
2513 	mem_heap_t*	heap;
2514 	fts_msg_t*	msg;
2515 
2516 	heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2517 	msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2518 
2519 	msg->ptr = ptr;
2520 	msg->type = type;
2521 	msg->heap = heap;
2522 
2523 	return(msg);
2524 }
2525 
2526 /** Add the table to add to the OPTIMIZER's list.
2527 @param[in]	table	table to add */
fts_optimize_add_table(dict_table_t * table)2528 void fts_optimize_add_table(dict_table_t* table)
2529 {
2530 	fts_msg_t*	msg;
2531 
2532 	if (!fts_optimize_wq) {
2533 		return;
2534 	}
2535 
2536 	/* If there is no fts index present then don't add to
2537 	optimize queue. */
2538 	if (!ib_vector_size(table->fts->indexes)) {
2539 		return;
2540 	}
2541 
2542 	/* Make sure table with FTS index cannot be evicted */
2543 	dict_table_prevent_eviction(table);
2544 
2545 	msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2546 
2547 	mutex_enter(&fts_optimize_wq->mutex);
2548 
2549 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2550 
2551 	table->fts->in_queue = true;
2552 
2553 	mutex_exit(&fts_optimize_wq->mutex);
2554 }
2555 
2556 /**********************************************************************//**
2557 Remove the table from the OPTIMIZER's list. We do wait for
2558 acknowledgement from the consumer of the message. */
2559 void
fts_optimize_remove_table(dict_table_t * table)2560 fts_optimize_remove_table(
2561 /*======================*/
2562 	dict_table_t*	table)			/*!< in: table to remove */
2563 {
2564 	fts_msg_t*	msg;
2565 	os_event_t	event;
2566 	fts_msg_del_t*	remove;
2567 
2568 	/* if the optimize system not yet initialized, return */
2569 	if (!fts_optimize_wq) {
2570 		return;
2571 	}
2572 
2573 	/* FTS optimizer thread is already exited */
2574 	if (fts_opt_start_shutdown) {
2575 		ib::info() << "Try to remove table " << table->name
2576 			<< " after FTS optimize thread exiting.";
2577 		/* If the table can't be removed then wait till
2578 		fts optimize thread shuts down */
2579 		while (fts_optimize_wq) {
2580 			os_thread_sleep(10000);
2581 		}
2582 		return;
2583 	}
2584 
2585 	mutex_enter(&fts_optimize_wq->mutex);
2586 
2587 	if (!table->fts->in_queue) {
2588 		mutex_exit(&fts_optimize_wq->mutex);
2589 		return;
2590 	}
2591 
2592 	msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2593 
2594 	/* We will wait on this event until signalled by the consumer. */
2595 	event = os_event_create(0);
2596 
2597 	remove = static_cast<fts_msg_del_t*>(
2598 		mem_heap_alloc(msg->heap, sizeof(*remove)));
2599 
2600 	remove->table = table;
2601 	remove->event = event;
2602 	msg->ptr = remove;
2603 
2604 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2605 
2606 	mutex_exit(&fts_optimize_wq->mutex);
2607 
2608 	os_event_wait(event);
2609 
2610 	os_event_destroy(event);
2611 
2612 #ifdef UNIV_DEBUG
2613 	if (!fts_opt_start_shutdown) {
2614 		mutex_enter(&fts_optimize_wq->mutex);
2615 		ut_ad(!table->fts->in_queue);
2616 		mutex_exit(&fts_optimize_wq->mutex);
2617 	}
2618 #endif /* UNIV_DEBUG */
2619 }
2620 
2621 /** Send sync fts cache for the table.
2622 @param[in]	table	table to sync */
2623 void
fts_optimize_request_sync_table(dict_table_t * table)2624 fts_optimize_request_sync_table(
2625 	dict_table_t*	table)
2626 {
2627 	/* if the optimize system not yet initialized, return */
2628 	if (!fts_optimize_wq) {
2629 		return;
2630 	}
2631 
2632 	/* FTS optimizer thread is already exited */
2633 	if (fts_opt_start_shutdown) {
2634 		ib::info() << "Try to sync table " << table->name
2635 			<< " after FTS optimize thread exiting.";
2636 		return;
2637 	}
2638 
2639 	fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
2640 
2641 	mutex_enter(&fts_optimize_wq->mutex);
2642 
2643 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
2644 
2645 	DBUG_EXECUTE_IF("fts_optimize_wq_count_check",
2646 			DBUG_ASSERT(fts_optimize_wq->length <= 1000););
2647 
2648 	mutex_exit(&fts_optimize_wq->mutex);
2649 }
2650 
2651 /** Add a table to fts_slots if it doesn't already exist. */
fts_optimize_new_table(dict_table_t * table)2652 static bool fts_optimize_new_table(dict_table_t* table)
2653 {
2654 	ut_ad(table);
2655 
2656 	ulint		i;
2657 	fts_slot_t*	slot;
2658 	fts_slot_t*	empty = NULL;
2659 
2660 	/* Search for duplicates, also find a free slot if one exists. */
2661 	for (i = 0; i < ib_vector_size(fts_slots); ++i) {
2662 
2663 		slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
2664 
2665 		if (!slot->table) {
2666 			empty = slot;
2667 		} else if (slot->table == table) {
2668 			/* Already exists in our optimize queue. */
2669 			return false;
2670 		}
2671 	}
2672 
2673 	slot = empty ? empty : static_cast<fts_slot_t*>(
2674 		ib_vector_push(fts_slots, NULL));
2675 
2676 	memset(slot, 0x0, sizeof(*slot));
2677 
2678 	slot->table = table;
2679 	return true;
2680 }
2681 
2682 /** Remove a table from fts_slots if it exists.
2683 @param[in,out]	table	table to be removed from fts_slots */
fts_optimize_del_table(const dict_table_t * table)2684 static bool fts_optimize_del_table(const dict_table_t* table)
2685 {
2686 	ut_ad(table);
2687 	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2688 		fts_slot_t*	slot;
2689 
2690 		slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
2691 
2692 		if (slot->table == table) {
2693 			if (UNIV_UNLIKELY(fts_enable_diag_print)) {
2694 				ib::info() << "FTS Optimize Removing table "
2695 					<< table->name;
2696 			}
2697 
2698 			mutex_enter(&fts_optimize_wq->mutex);
2699 			slot->table->fts->in_queue = false;
2700 			mutex_exit(&fts_optimize_wq->mutex);
2701 			slot->table = NULL;
2702 			return true;
2703 		}
2704 	}
2705 
2706 	return false;
2707 }
2708 
2709 /**********************************************************************//**
2710 Calculate how many tables in fts_slots need to be optimized.
2711 @return no. of tables to optimize */
fts_optimize_how_many()2712 static ulint fts_optimize_how_many()
2713 {
2714 	ulint n_tables = 0;
2715 	const time_t current_time = time(NULL);
2716 
2717 	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2718 		const fts_slot_t* slot = static_cast<const fts_slot_t*>(
2719 			ib_vector_get_const(fts_slots, i));
2720 		if (!slot->table) {
2721 			continue;
2722 		}
2723 
2724 		const time_t end = slot->running
2725 			? slot->last_run : slot->completed;
2726 		ulint interval = ulint(current_time - end);
2727 
2728 		if (lint(interval) < 0
2729 		    || interval >= FTS_OPTIMIZE_INTERVAL_IN_SECS) {
2730 			++n_tables;
2731 		}
2732 	}
2733 
2734 	return(n_tables);
2735 }
2736 
2737 /**********************************************************************//**
2738 Check if the total memory used by all FTS table exceeds the maximum limit.
2739 @return true if a sync is needed, false otherwise */
fts_is_sync_needed()2740 static bool fts_is_sync_needed()
2741 {
2742 	ulint		total_memory = 0;
2743 	const time_t	now = time(NULL);
2744 	double		time_diff = difftime(now, last_check_sync_time);
2745 
2746 	if (fts_need_sync || (time_diff >= 0 && time_diff < 5)) {
2747 		return(false);
2748 	}
2749 
2750 	last_check_sync_time = now;
2751 
2752 	for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
2753 		const fts_slot_t* slot = static_cast<const fts_slot_t*>(
2754 			ib_vector_get_const(fts_slots, i));
2755 
2756 		if (!slot->table) {
2757 			continue;
2758 		}
2759 
2760 		if (slot->table->fts && slot->table->fts->cache) {
2761 			total_memory += slot->table->fts->cache->total_size;
2762 		}
2763 
2764 		if (total_memory > fts_max_total_cache_size) {
2765 			return(true);
2766 		}
2767 	}
2768 
2769 	return(false);
2770 }
2771 
2772 /** Sync fts cache of a table
2773 @param[in,out]	table	table to be synced */
fts_optimize_sync_table(dict_table_t * table)2774 static void fts_optimize_sync_table(dict_table_t* table)
2775 {
2776 	if (table->fts && table->fts->cache && fil_table_accessible(table)) {
2777 		fts_sync_table(table, false);
2778 	}
2779 
2780 	DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
2781 }
2782 
2783 /**********************************************************************//**
2784 Optimize all FTS tables.
2785 @return Dummy return */
2786 static
2787 os_thread_ret_t
DECLARE_THREAD(fts_optimize_thread)2788 DECLARE_THREAD(fts_optimize_thread)(
2789 /*================*/
2790 	void*		arg)			/*!< in: work queue*/
2791 {
2792 	ulint		current = 0;
2793 	ibool		done = FALSE;
2794 	ulint		n_tables = 0;
2795 	ulint		n_optimize = 0;
2796 	ib_wqueue_t*	wq = (ib_wqueue_t*) arg;
2797 
2798 	ut_ad(!srv_read_only_mode);
2799 	my_thread_init();
2800 
2801 	ut_ad(fts_slots);
2802 
2803 	/* Assign number of tables added in fts_slots_t to n_tables */
2804 	n_tables = ib_vector_size(fts_slots);
2805 
2806 	while (!done && srv_shutdown_state <= SRV_SHUTDOWN_INITIATED) {
2807 		/* If there is no message in the queue and we have tables
2808 		to optimize then optimize the tables. */
2809 
2810 		if (!done
2811 		    && ib_wqueue_is_empty(wq)
2812 		    && n_tables > 0
2813 		    && n_optimize > 0) {
2814 			fts_slot_t* slot = static_cast<fts_slot_t*>(
2815 				ib_vector_get(fts_slots, current));
2816 
2817 			/* Handle the case of empty slots. */
2818 			if (slot->table) {
2819 				slot->running = true;
2820 				fts_optimize_table_bk(slot);
2821 			}
2822 
2823 			/* Wrap around the counter. */
2824 			if (++current >= ib_vector_size(fts_slots)) {
2825 				n_optimize = fts_optimize_how_many();
2826 				current = 0;
2827 			}
2828 
2829 		} else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
2830 			fts_msg_t*	msg;
2831 
2832 			msg = static_cast<fts_msg_t*>(
2833 				ib_wqueue_timedwait(wq, FTS_QUEUE_WAIT_IN_USECS));
2834 
2835 			/* Timeout ? */
2836 			if (msg == NULL) {
2837 				if (fts_is_sync_needed()) {
2838 					fts_need_sync = true;
2839 				}
2840 
2841 				continue;
2842 			}
2843 
2844 			switch (msg->type) {
2845 			case FTS_MSG_STOP:
2846 				done = TRUE;
2847 				break;
2848 
2849 			case FTS_MSG_ADD_TABLE:
2850 				ut_a(!done);
2851 				if (fts_optimize_new_table(
2852 					    static_cast<dict_table_t*>(
2853 						    msg->ptr))) {
2854 					++n_tables;
2855 				}
2856 				break;
2857 
2858 			case FTS_MSG_DEL_TABLE:
2859 				if (fts_optimize_del_table(
2860 					    static_cast<fts_msg_del_t*>(
2861 						    msg->ptr)->table)) {
2862 					--n_tables;
2863 				}
2864 
2865 				/* Signal the producer that we have
2866 				removed the table. */
2867 				os_event_set(
2868 					((fts_msg_del_t*) msg->ptr)->event);
2869 				break;
2870 
2871 			case FTS_MSG_SYNC_TABLE:
2872 				DBUG_EXECUTE_IF(
2873 					"fts_instrument_msg_sync_sleep",
2874 					os_thread_sleep(300000););
2875 
2876 				fts_optimize_sync_table(
2877 					static_cast<dict_table_t*>(msg->ptr));
2878 				break;
2879 
2880 			default:
2881 				ut_error;
2882 			}
2883 
2884 			mem_heap_free(msg->heap);
2885 			n_optimize = done ? 0 : fts_optimize_how_many();
2886 		}
2887 	}
2888 
2889 	/* Server is being shutdown, sync the data from FTS cache to disk
2890 	if needed */
2891 	if (n_tables > 0) {
2892 		for (ulint i = 0; i < ib_vector_size(fts_slots); i++) {
2893 			fts_slot_t* slot = static_cast<fts_slot_t*>(
2894 				ib_vector_get(fts_slots, i));
2895 
2896 			if (slot->table) {
2897 				fts_optimize_sync_table(slot->table);
2898 			}
2899 		}
2900 	}
2901 
2902 	ib_vector_free(fts_slots);
2903 	fts_slots = NULL;
2904 
2905 	ib::info() << "FTS optimize thread exiting.";
2906 
2907 	os_event_set(fts_opt_shutdown_event);
2908 	my_thread_end();
2909 
2910 	/* We count the number of threads in os_thread_exit(). A created
2911 	thread should always use that to exit and not use return() to exit. */
2912 	os_thread_exit();
2913 
2914 	OS_THREAD_DUMMY_RETURN;
2915 }
2916 
2917 /**********************************************************************//**
2918 Startup the optimize thread and create the work queue. */
2919 void
fts_optimize_init(void)2920 fts_optimize_init(void)
2921 /*===================*/
2922 {
2923 	mem_heap_t*	heap;
2924 	ib_alloc_t*     heap_alloc;
2925 
2926 	ut_ad(!srv_read_only_mode);
2927 
2928 	/* For now we only support one optimize thread. */
2929 	ut_a(!fts_optimize_wq);
2930 
2931 	/* Create FTS optimize work queue */
2932 	fts_optimize_wq = ib_wqueue_create();
2933 	ut_a(fts_optimize_wq != NULL);
2934 
2935 	/* Create FTS vector to store fts_slot_t */
2936 	heap = mem_heap_create(sizeof(dict_table_t*) * 64);
2937 	heap_alloc = ib_heap_allocator_create(heap);
2938 	fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
2939 
2940 	/* Add fts tables to fts_slots which could be skipped
2941 	during dict_load_table_one() because fts_optimize_thread
2942 	wasn't even started. */
2943 	mutex_enter(&dict_sys.mutex);
2944 	for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys.table_LRU);
2945 	     table != NULL;
2946 	     table = UT_LIST_GET_NEXT(table_LRU, table)) {
2947 		if (!table->fts || !dict_table_has_fts_index(table)) {
2948 			continue;
2949 		}
2950 
2951 		/* fts_optimize_thread is not started yet. So there is no
2952 		need to acquire fts_optimize_wq->mutex for adding the fts
2953 		table to the fts slots. */
2954 		ut_ad(!table->can_be_evicted);
2955 		fts_optimize_new_table(table);
2956 		table->fts->in_queue = true;
2957 	}
2958 	mutex_exit(&dict_sys.mutex);
2959 
2960 	fts_opt_shutdown_event = os_event_create(0);
2961 	last_check_sync_time = time(NULL);
2962 
2963 	os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
2964 }
2965 
2966 /** Shutdown fts optimize thread. */
2967 void
fts_optimize_shutdown()2968 fts_optimize_shutdown()
2969 {
2970 	ut_ad(!srv_read_only_mode);
2971 
2972 	fts_msg_t*	msg;
2973 
2974 	/* If there is an ongoing activity on dictionary, such as
2975 	srv_master_evict_from_table_cache(), wait for it */
2976 	dict_mutex_enter_for_mysql();
2977 
2978 	/* Tells FTS optimizer system that we are exiting from
2979 	optimizer thread, message send their after will not be
2980 	processed */
2981 	fts_opt_start_shutdown = true;
2982 	dict_mutex_exit_for_mysql();
2983 
2984 	/* We tell the OPTIMIZE thread to switch to state done, we
2985 	can't delete the work queue here because the add thread needs
2986 	deregister the FTS tables. */
2987 
2988 	msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
2989 
2990 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2991 
2992 	os_event_wait(fts_opt_shutdown_event);
2993 
2994 	os_event_destroy(fts_opt_shutdown_event);
2995 
2996 	ib_wqueue_free(fts_optimize_wq);
2997 	fts_optimize_wq = NULL;
2998 }
2999