1 /*****************************************************************************
2 
3 Copyright (c) 2007, 2017, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /******************************************************************//**
28 @file fts/fts0opt.cc
29 Full Text Search optimize thread
30 
31 Created 2007/03/27 Sunny Bains
32 Completed 2011/7/10 Sunny and Jimmy Yang
33 
34 ***********************************************************************/
35 
36 #include "fts0fts.h"
37 #include "row0sel.h"
38 #include "que0types.h"
39 #include "fts0priv.h"
40 #include "fts0types.h"
41 #include "ut0wqueue.h"
42 #include "srv0start.h"
43 #include "zlib.h"
44 
45 #ifndef UNIV_NONINL
46 #include "fts0types.ic"
47 #include "fts0vlc.ic"
48 #endif
49 
50 /** The FTS optimize thread's work queue. */
51 static ib_wqueue_t* fts_optimize_wq;
52 
53 /** Time to wait for a message. */
54 static const ulint FTS_QUEUE_WAIT_IN_USECS = 5000000;
55 
56 /** Default optimize interval in secs. */
57 static const ulint FTS_OPTIMIZE_INTERVAL_IN_SECS = 300;
58 
59 /** Server is shutting down, so does we exiting the optimize thread */
60 static bool fts_opt_start_shutdown = false;
61 
62 /** Initial size of nodes in fts_word_t. */
63 static const ulint FTS_WORD_NODES_INIT_SIZE = 64;
64 
65 /** Last time we did check whether system need a sync */
66 static ib_time_t	last_check_sync_time;
67 
68 #if 0
69 /** Check each table in round robin to see whether they'd
70 need to be "optimized" */
71 static	ulint	fts_optimize_sync_iterator = 0;
72 #endif
73 
74 /** State of a table within the optimization sub system. */
75 enum fts_state_t {
76 	FTS_STATE_LOADED,
77 	FTS_STATE_RUNNING,
78 	FTS_STATE_SUSPENDED,
79 	FTS_STATE_DONE,
80 	FTS_STATE_EMPTY
81 };
82 
83 /** FTS optimize thread message types. */
84 enum fts_msg_type_t {
85 	FTS_MSG_START,			/*!< Start optimizing thread */
86 
87 	FTS_MSG_PAUSE,			/*!< Pause optimizing thread */
88 
89 	FTS_MSG_STOP,			/*!< Stop optimizing and exit thread */
90 
91 	FTS_MSG_ADD_TABLE,		/*!< Add table to the optimize thread's
92 					work queue */
93 
94 	FTS_MSG_OPTIMIZE_TABLE,		/*!< Optimize a table */
95 
96 	FTS_MSG_DEL_TABLE,		/*!< Remove a table from the optimize
97 					threads work queue */
98 	FTS_MSG_SYNC_TABLE		/*!< Sync fts cache of a table */
99 };
100 
101 /** Compressed list of words that have been read from FTS INDEX
102 that needs to be optimized. */
103 struct fts_zip_t {
104 	lint		status;		/*!< Status of (un)/zip operation */
105 
106 	ulint		n_words;	/*!< Number of words compressed */
107 
108 	ulint		block_sz;	/*!< Size of a block in bytes */
109 
110 	ib_vector_t*	blocks;		/*!< Vector of compressed blocks */
111 
112 	ib_alloc_t*	heap_alloc;	/*!< Heap to use for allocations */
113 
114 	ulint		pos;		/*!< Offset into blocks */
115 
116 	ulint		last_big_block;	/*!< Offset of last block in the
117 					blocks array that is of size
118 					block_sz. Blocks beyond this offset
119 					are of size FTS_MAX_WORD_LEN */
120 
121 	z_streamp	zp;		/*!< ZLib state */
122 
123 					/*!< The value of the last word read
124 					from the FTS INDEX table. This is
125 					used to discard duplicates */
126 
127 	fts_string_t	word;		/*!< UTF-8 string */
128 
129 	ulint		max_words;	/*!< maximum number of words to read
130 					in one pase */
131 };
132 
133 /** Prepared statemets used during optimize */
134 struct fts_optimize_graph_t {
135 					/*!< Delete a word from FTS INDEX */
136 	que_t*		delete_nodes_graph;
137 					/*!< Insert a word into FTS INDEX */
138 	que_t*		write_nodes_graph;
139 					/*!< COMMIT a transaction */
140 	que_t*		commit_graph;
141 					/*!< Read the nodes from FTS_INDEX */
142 	que_t*		read_nodes_graph;
143 };
144 
145 /** Used by fts_optimize() to store state. */
146 struct fts_optimize_t {
147 	trx_t*		trx;		/*!< The transaction used for all SQL */
148 
149 	ib_alloc_t*	self_heap;	/*!< Heap to use for allocations */
150 
151 	char*		name_prefix;	/*!< FTS table name prefix */
152 
153 	fts_table_t	fts_index_table;/*!< Common table definition */
154 
155 					/*!< Common table definition */
156 	fts_table_t	fts_common_table;
157 
158 	dict_table_t*	table;		/*!< Table that has to be queried */
159 
160 	dict_index_t*	index;		/*!< The FTS index to be optimized */
161 
162 	fts_doc_ids_t*	to_delete;	/*!< doc ids to delete, we check against
163 					this vector and purge the matching
164 					entries during the optimizing
165 					process. The vector entries are
166 					sorted on doc id */
167 
168 	ulint		del_pos;	/*!< Offset within to_delete vector,
169 					this is used to keep track of where
170 					we are up to in the vector */
171 
172 	ibool		done;		/*!< TRUE when optimize finishes */
173 
174 	ib_vector_t*	words;		/*!< Word + Nodes read from FTS_INDEX,
175 					it contains instances of fts_word_t */
176 
177 	fts_zip_t*	zip;		/*!< Words read from the FTS_INDEX */
178 
179 	fts_optimize_graph_t		/*!< Prepared statements used during */
180 			graph;		/*optimize */
181 
182 	ulint		n_completed;	/*!< Number of FTS indexes that have
183 					been optimized */
184 	ibool		del_list_regenerated;
185 					/*!< BEING_DELETED list regenarated */
186 };
187 
188 /** Used by the optimize, to keep state during compacting nodes. */
189 struct fts_encode_t {
190 	doc_id_t	src_last_doc_id;/*!< Last doc id read from src node */
191 	byte*		src_ilist_ptr;	/*!< Current ptr within src ilist */
192 };
193 
194 /** We use this information to determine when to start the optimize
195 cycle for a table. */
196 struct fts_slot_t {
197 	dict_table_t*	table;		/*!< Table to optimize */
198 
199 	table_id_t	table_id;	/*!< Table id */
200 
201 	fts_state_t	state;		/*!< State of this slot */
202 
203 	ulint		added;		/*!< Number of doc ids added since the
204 					last time this table was optimized */
205 
206 	ulint		deleted;	/*!< Number of doc ids deleted since the
207 					last time this table was optimized */
208 
209 	ib_time_t	last_run;	/*!< Time last run completed */
210 
211 	ib_time_t	completed;	/*!< Optimize finish time */
212 
213 	ib_time_t	interval_time;	/*!< Minimum time to wait before
214 					optimizing the table again. */
215 };
216 
217 /** A table remove message for the FTS optimize thread. */
218 struct fts_msg_del_t {
219 	dict_table_t*	table;		/*!< The table to remove */
220 
221 	os_event_t	event;		/*!< Event to synchronize acknowledgement
222 					of receipt and processing of the
223 					this message by the consumer */
224 };
225 
226 /** Stop the optimize thread. */
227 struct fts_msg_optimize_t {
228 	dict_table_t*	table;		/*!< Table to optimize */
229 };
230 
231 /** The FTS optimize message work queue message type. */
232 struct fts_msg_t {
233 	fts_msg_type_t	type;		/*!< Message type */
234 
235 	void*		ptr;		/*!< The message contents */
236 
237 	mem_heap_t*	heap;		/*!< The heap used to allocate this
238 					message, the message consumer will
239 					free the heap. */
240 };
241 
242 /** The number of words to read and optimize in a single pass. */
243 UNIV_INTERN ulong	fts_num_word_optimize;
244 
245 // FIXME
246 UNIV_INTERN char	fts_enable_diag_print;
247 
248 /** ZLib compressed block size.*/
249 static ulint FTS_ZIP_BLOCK_SIZE	= 1024;
250 
251 /** The amount of time optimizing in a single pass, in milliseconds. */
252 static ib_time_t fts_optimize_time_limit = 0;
253 
254 /** SQL Statement for changing state of rows to be deleted from FTS Index. */
255 static	const char* fts_init_delete_sql =
256 	"BEGIN\n"
257 	"\n"
258 	"INSERT INTO \"%s_BEING_DELETED\"\n"
259 		"SELECT doc_id FROM \"%s_DELETED\";\n"
260 	"\n"
261 	"INSERT INTO \"%s_BEING_DELETED_CACHE\"\n"
262 		"SELECT doc_id FROM \"%s_DELETED_CACHE\";\n";
263 
264 static const char* fts_delete_doc_ids_sql =
265 	"BEGIN\n"
266 	"\n"
267 	"DELETE FROM \"%s_DELETED\" WHERE doc_id = :doc_id1;\n"
268 	"DELETE FROM \"%s_DELETED_CACHE\" WHERE doc_id = :doc_id2;\n";
269 
270 static const char* fts_end_delete_sql =
271 	"BEGIN\n"
272 	"\n"
273 	"DELETE FROM \"%s_BEING_DELETED\";\n"
274 	"DELETE FROM \"%s_BEING_DELETED_CACHE\";\n";
275 
276 /**********************************************************************//**
277 Initialize fts_zip_t. */
278 static
279 void
fts_zip_initialize(fts_zip_t * zip)280 fts_zip_initialize(
281 /*===============*/
282 	fts_zip_t*	zip)		/*!< out: zip instance to initialize */
283 {
284 	zip->pos = 0;
285 	zip->n_words = 0;
286 
287 	zip->status = Z_OK;
288 
289 	zip->last_big_block = 0;
290 
291 	zip->word.f_len = 0;
292 	memset(zip->word.f_str, 0, FTS_MAX_WORD_LEN);
293 
294 	ib_vector_reset(zip->blocks);
295 
296 	memset(zip->zp, 0, sizeof(*zip->zp));
297 }
298 
299 /**********************************************************************//**
300 Create an instance of fts_zip_t.
301 @return a new instance of fts_zip_t */
302 static
303 fts_zip_t*
fts_zip_create(mem_heap_t * heap,ulint block_sz,ulint max_words)304 fts_zip_create(
305 /*===========*/
306 	mem_heap_t*	heap,		/*!< in: heap */
307 	ulint		block_sz,	/*!< in: size of a zip block.*/
308 	ulint		max_words)	/*!< in: max words to read */
309 {
310 	fts_zip_t*	zip;
311 
312 	zip = static_cast<fts_zip_t*>(mem_heap_zalloc(heap, sizeof(*zip)));
313 
314 	zip->word.f_str = static_cast<byte*>(
315 		mem_heap_zalloc(heap, FTS_MAX_WORD_LEN + 1));
316 
317 	zip->block_sz = block_sz;
318 
319 	zip->heap_alloc = ib_heap_allocator_create(heap);
320 
321 	zip->blocks = ib_vector_create(zip->heap_alloc, sizeof(void*), 128);
322 
323 	zip->max_words = max_words;
324 
325 	zip->zp = static_cast<z_stream*>(
326 		mem_heap_zalloc(heap, sizeof(*zip->zp)));
327 
328 	return(zip);
329 }
330 
331 /**********************************************************************//**
332 Initialize an instance of fts_zip_t. */
333 static
334 void
fts_zip_init(fts_zip_t * zip)335 fts_zip_init(
336 /*=========*/
337 
338 	fts_zip_t*	zip)		/*!< in: zip instance to init */
339 {
340 	memset(zip->zp, 0, sizeof(*zip->zp));
341 
342 	zip->word.f_len = 0;
343 	*zip->word.f_str = '\0';
344 }
345 
346 /**********************************************************************//**
347 Create a fts_optimizer_word_t instance.
348 @return new instance */
349 UNIV_INTERN
350 fts_word_t*
fts_word_init(fts_word_t * word,byte * utf8,ulint len)351 fts_word_init(
352 /*==========*/
353 	fts_word_t*	word,		/*!< in: word to initialize */
354 	byte*		utf8,		/*!< in: UTF-8 string */
355 	ulint		len)		/*!< in: length of string in bytes */
356 {
357 	mem_heap_t*	heap = mem_heap_create(sizeof(fts_node_t));
358 
359 	memset(word, 0, sizeof(*word));
360 
361 	word->text.f_len = len;
362 	word->text.f_str = static_cast<byte*>(mem_heap_alloc(heap, len + 1));
363 
364 	/* Need to copy the NUL character too. */
365 	memcpy(word->text.f_str, utf8, word->text.f_len);
366 	word->text.f_str[word->text.f_len] = 0;
367 
368 	word->heap_alloc = ib_heap_allocator_create(heap);
369 
370 	word->nodes = ib_vector_create(
371 		word->heap_alloc, sizeof(fts_node_t), FTS_WORD_NODES_INIT_SIZE);
372 
373 	return(word);
374 }
375 
376 /**********************************************************************//**
377 Read the FTS INDEX row.
378 @return fts_node_t instance */
379 static
380 fts_node_t*
fts_optimize_read_node(fts_word_t * word,que_node_t * exp)381 fts_optimize_read_node(
382 /*===================*/
383 	fts_word_t*	word,		/*!< in: */
384 	que_node_t*	exp)		/*!< in: */
385 {
386 	int		i;
387 	fts_node_t*	node = static_cast<fts_node_t*>(
388 		ib_vector_push(word->nodes, NULL));
389 
390 	/* Start from 1 since the first node has been read by the caller */
391 	for (i = 1; exp; exp = que_node_get_next(exp), ++i) {
392 
393 		dfield_t*	dfield = que_node_get_val(exp);
394 		byte*		data = static_cast<byte*>(
395 			dfield_get_data(dfield));
396 		ulint		len = dfield_get_len(dfield);
397 
398 		ut_a(len != UNIV_SQL_NULL);
399 
400 		/* Note: The column numbers below must match the SELECT */
401 		switch (i) {
402 		case 1: /* DOC_COUNT */
403 			node->doc_count = mach_read_from_4(data);
404 			break;
405 
406 		case 2: /* FIRST_DOC_ID */
407 			node->first_doc_id = fts_read_doc_id(data);
408 			break;
409 
410 		case 3: /* LAST_DOC_ID */
411 			node->last_doc_id = fts_read_doc_id(data);
412 			break;
413 
414 		case 4: /* ILIST */
415 			node->ilist_size_alloc = node->ilist_size = len;
416 			node->ilist = static_cast<byte*>(ut_malloc(len));
417 			memcpy(node->ilist, data, len);
418 			break;
419 
420 		default:
421 			ut_error;
422 		}
423 	}
424 
425 	/* Make sure all columns were read. */
426 	ut_a(i == 5);
427 
428 	return(node);
429 }
430 
431 /**********************************************************************//**
432 Callback function to fetch the rows in an FTS INDEX record.
433 @return always returns non-NULL */
434 UNIV_INTERN
435 ibool
fts_optimize_index_fetch_node(void * row,void * user_arg)436 fts_optimize_index_fetch_node(
437 /*==========================*/
438 	void*		row,		/*!< in: sel_node_t* */
439 	void*		user_arg)	/*!< in: pointer to ib_vector_t */
440 {
441 	fts_word_t*	word;
442 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
443 	fts_fetch_t*	fetch = static_cast<fts_fetch_t*>(user_arg);
444 	ib_vector_t*	words = static_cast<ib_vector_t*>(fetch->read_arg);
445 	que_node_t*	exp = sel_node->select_list;
446 	dfield_t*	dfield = que_node_get_val(exp);
447 	void*		data = dfield_get_data(dfield);
448 	ulint		dfield_len = dfield_get_len(dfield);
449 	fts_node_t*	node;
450 	bool		is_word_init = false;
451 
452 	ut_a(dfield_len <= FTS_MAX_WORD_LEN);
453 
454 	if (ib_vector_size(words) == 0) {
455 
456 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
457 		fts_word_init(word, (byte*) data, dfield_len);
458 		is_word_init = true;
459 	}
460 
461 	word = static_cast<fts_word_t*>(ib_vector_last(words));
462 
463 	if (dfield_len != word->text.f_len
464 	    || memcmp(word->text.f_str, data, dfield_len)) {
465 
466 		word = static_cast<fts_word_t*>(ib_vector_push(words, NULL));
467 		fts_word_init(word, (byte*) data, dfield_len);
468 		is_word_init = true;
469 	}
470 
471 	node = fts_optimize_read_node(word, que_node_get_next(exp));
472 
473 	fetch->total_memory += node->ilist_size;
474 	if (is_word_init) {
475 		fetch->total_memory += sizeof(fts_word_t)
476 			+ sizeof(ib_alloc_t) + sizeof(ib_vector_t) + dfield_len
477 			+ sizeof(fts_node_t) * FTS_WORD_NODES_INIT_SIZE;
478 	} else if (ib_vector_size(words) > FTS_WORD_NODES_INIT_SIZE) {
479 		fetch->total_memory += sizeof(fts_node_t);
480 	}
481 
482 	if (fetch->total_memory >= fts_result_cache_limit) {
483 		return(FALSE);
484 	}
485 
486 	return(TRUE);
487 }
488 
489 /**********************************************************************//**
490 Read the rows from the FTS inde.
491 @return DB_SUCCESS or error code */
492 UNIV_INTERN
493 dberr_t
fts_index_fetch_nodes(trx_t * trx,que_t ** graph,fts_table_t * fts_table,const fts_string_t * word,fts_fetch_t * fetch)494 fts_index_fetch_nodes(
495 /*==================*/
496 	trx_t*		trx,		/*!< in: transaction */
497 	que_t**		graph,		/*!< in: prepared statement */
498 	fts_table_t*	fts_table,	/*!< in: table of the FTS INDEX */
499 	const fts_string_t*
500 			word,		/*!< in: the word to fetch */
501 	fts_fetch_t*	fetch)		/*!< in: fetch callback.*/
502 {
503 	pars_info_t*	info;
504 	dberr_t		error;
505 
506 	trx->op_info = "fetching FTS index nodes";
507 
508 	if (*graph) {
509 		info = (*graph)->info;
510 	} else {
511 		info = pars_info_create();
512 	}
513 
514 	pars_info_bind_function(info, "my_func", fetch->read_record, fetch);
515 	pars_info_bind_varchar_literal(info, "word", word->f_str, word->f_len);
516 
517 	if (!*graph) {
518 		ulint	selected;
519 
520 		ut_a(fts_table->type == FTS_INDEX_TABLE);
521 
522 		selected = fts_select_index(fts_table->charset,
523 					    word->f_str, word->f_len);
524 
525 		fts_table->suffix = fts_get_suffix(selected);
526 
527 		*graph = fts_parse_sql(
528 			fts_table,
529 			info,
530 			"DECLARE FUNCTION my_func;\n"
531 			"DECLARE CURSOR c IS"
532 			" SELECT word, doc_count, first_doc_id, last_doc_id, "
533 				"ilist\n"
534 			" FROM \"%s\"\n"
535 			" WHERE word LIKE :word\n"
536 			" ORDER BY first_doc_id;\n"
537 			"BEGIN\n"
538 			"\n"
539 			"OPEN c;\n"
540 			"WHILE 1 = 1 LOOP\n"
541 			"  FETCH c INTO my_func();\n"
542 			"  IF c % NOTFOUND THEN\n"
543 			"    EXIT;\n"
544 			"  END IF;\n"
545 			"END LOOP;\n"
546 			"CLOSE c;");
547 	}
548 
549 	for(;;) {
550 		error = fts_eval_sql(trx, *graph);
551 
552 		if (error == DB_SUCCESS) {
553 			fts_sql_commit(trx);
554 
555 			break;				/* Exit the loop. */
556 		} else {
557 			fts_sql_rollback(trx);
558 
559 			ut_print_timestamp(stderr);
560 
561 			if (error == DB_LOCK_WAIT_TIMEOUT) {
562 				fprintf(stderr, " InnoDB: Warning: lock wait "
563 					"timeout reading FTS index. "
564 					"Retrying!\n");
565 
566 				trx->error_state = DB_SUCCESS;
567 			} else {
568 				fprintf(stderr, " InnoDB: Error: (%s) "
569 					"while reading FTS index.\n",
570 					ut_strerr(error));
571 
572 				break;			/* Exit the loop. */
573 			}
574 		}
575 	}
576 
577 	return(error);
578 }
579 
580 /**********************************************************************//**
581 Read a word */
582 static
583 byte*
fts_zip_read_word(fts_zip_t * zip,fts_string_t * word)584 fts_zip_read_word(
585 /*==============*/
586 	fts_zip_t*	zip,		/*!< in: Zip state + data */
587 	fts_string_t*	word)		/*!< out: uncompressed word */
588 {
589 	short		len = 0;
590 	void*		null = NULL;
591 	byte*		ptr = word->f_str;
592 	int		flush = Z_NO_FLUSH;
593 
594 	/* Either there was an error or we are at the Z_STREAM_END. */
595 	if (zip->status != Z_OK) {
596 		return(NULL);
597 	}
598 
599 	zip->zp->next_out = reinterpret_cast<byte*>(&len);
600 	zip->zp->avail_out = sizeof(len);
601 
602 	while (zip->status == Z_OK && zip->zp->avail_out > 0) {
603 
604 		/* Finished decompressing block. */
605 		if (zip->zp->avail_in == 0) {
606 
607 			/* Free the block thats been decompressed. */
608 			if (zip->pos > 0) {
609 				ulint	prev = zip->pos - 1;
610 
611 				ut_a(zip->pos < ib_vector_size(zip->blocks));
612 
613 				ut_free(ib_vector_getp(zip->blocks, prev));
614 				ib_vector_set(zip->blocks, prev, &null);
615 			}
616 
617 			/* Any more blocks to decompress. */
618 			if (zip->pos < ib_vector_size(zip->blocks)) {
619 
620 				zip->zp->next_in = static_cast<byte*>(
621 					ib_vector_getp(
622 						zip->blocks, zip->pos));
623 
624 				if (zip->pos > zip->last_big_block) {
625 					zip->zp->avail_in =
626 						FTS_MAX_WORD_LEN;
627 				} else {
628 					zip->zp->avail_in = static_cast<uInt>(zip->block_sz);
629 				}
630 
631 				++zip->pos;
632 			} else {
633 				flush = Z_FINISH;
634 			}
635 		}
636 
637 		switch (zip->status = inflate(zip->zp, flush)) {
638 		case Z_OK:
639 			if (zip->zp->avail_out == 0 && len > 0) {
640 
641 				ut_a(len <= FTS_MAX_WORD_LEN);
642 				ptr[len] = 0;
643 
644 				zip->zp->next_out = ptr;
645 				zip->zp->avail_out = len;
646 
647 				word->f_len = len;
648 				len = 0;
649 			}
650 			break;
651 
652 		case Z_BUF_ERROR:	/* No progress possible. */
653 		case Z_STREAM_END:
654 			inflateEnd(zip->zp);
655 			break;
656 
657 		case Z_STREAM_ERROR:
658 		default:
659 			ut_error;
660 		}
661 	}
662 
663 	/* All blocks must be freed at end of inflate. */
664 	if (zip->status != Z_OK) {
665 		for (ulint i = 0; i < ib_vector_size(zip->blocks); ++i) {
666 			if (ib_vector_getp(zip->blocks, i)) {
667 				ut_free(ib_vector_getp(zip->blocks, i));
668 				ib_vector_set(zip->blocks, i, &null);
669 			}
670 		}
671 	}
672 
673 	if (ptr != NULL) {
674 		ut_ad(word->f_len == strlen((char*) ptr));
675 	}
676 
677 	return(zip->status == Z_OK || zip->status == Z_STREAM_END ? ptr : NULL);
678 }
679 
680 /**********************************************************************//**
681 Callback function to fetch and compress the word in an FTS
682 INDEX record.
683 @return FALSE on EOF */
684 static
685 ibool
fts_fetch_index_words(void * row,void * user_arg)686 fts_fetch_index_words(
687 /*==================*/
688 	void*		row,		/*!< in: sel_node_t* */
689 	void*		user_arg)	/*!< in: pointer to ib_vector_t */
690 {
691 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
692 	fts_zip_t*	zip = static_cast<fts_zip_t*>(user_arg);
693 	que_node_t*	exp = sel_node->select_list;
694 	dfield_t*	dfield = que_node_get_val(exp);
695 	short		len =  static_cast<short>(dfield_get_len(dfield));
696 	void*		data = dfield_get_data(dfield);
697 
698 	/* Skip the duplicate words. */
699 	if (zip->word.f_len == static_cast<ulint>(len)
700 	    && !memcmp(zip->word.f_str, data, len)) {
701 
702 		return(TRUE);
703 	}
704 
705 	ut_a(len <= FTS_MAX_WORD_LEN);
706 
707 	memcpy(zip->word.f_str, data, len);
708 	zip->word.f_len = len;
709 
710 	ut_a(zip->zp->avail_in == 0);
711 	ut_a(zip->zp->next_in == NULL);
712 
713 	/* The string is prefixed by len. */
714 	zip->zp->next_in = reinterpret_cast<byte*>(&len);
715 	zip->zp->avail_in = sizeof(len);
716 
717 	/* Compress the word, create output blocks as necessary. */
718 	while (zip->zp->avail_in > 0) {
719 
720 		/* No space left in output buffer, create a new one. */
721 		if (zip->zp->avail_out == 0) {
722 			byte*		block;
723 
724 			block = static_cast<byte*>(ut_malloc(zip->block_sz));
725 			ib_vector_push(zip->blocks, &block);
726 
727 			zip->zp->next_out = block;
728 			zip->zp->avail_out = static_cast<uInt>(zip->block_sz);
729 		}
730 
731 		switch (zip->status = deflate(zip->zp, Z_NO_FLUSH)) {
732 		case Z_OK:
733 			if (zip->zp->avail_in == 0) {
734 				zip->zp->next_in = static_cast<byte*>(data);
735 				zip->zp->avail_in = len;
736 				ut_a(len <= FTS_MAX_WORD_LEN);
737 				len = 0;
738 			}
739 			break;
740 
741 		case Z_STREAM_END:
742 		case Z_BUF_ERROR:
743 		case Z_STREAM_ERROR:
744 		default:
745 			ut_error;
746 			break;
747 		}
748 	}
749 
750 	/* All data should have been compressed. */
751 	ut_a(zip->zp->avail_in == 0);
752 	zip->zp->next_in = NULL;
753 
754 	++zip->n_words;
755 
756 	return(zip->n_words >= zip->max_words ? FALSE : TRUE);
757 }
758 
759 /**********************************************************************//**
760 Finish Zip deflate. */
761 static
762 void
fts_zip_deflate_end(fts_zip_t * zip)763 fts_zip_deflate_end(
764 /*================*/
765 	fts_zip_t*	zip)		/*!< in: instance that should be closed*/
766 {
767 	ut_a(zip->zp->avail_in == 0);
768 	ut_a(zip->zp->next_in == NULL);
769 
770 	zip->status = deflate(zip->zp, Z_FINISH);
771 
772 	ut_a(ib_vector_size(zip->blocks) > 0);
773 	zip->last_big_block = ib_vector_size(zip->blocks) - 1;
774 
775 	/* Allocate smaller block(s), since this is trailing data. */
776 	while (zip->status == Z_OK) {
777 		byte*		block;
778 
779 		ut_a(zip->zp->avail_out == 0);
780 
781 		block = static_cast<byte*>(ut_malloc(FTS_MAX_WORD_LEN + 1));
782 		ib_vector_push(zip->blocks, &block);
783 
784 		zip->zp->next_out = block;
785 		zip->zp->avail_out = FTS_MAX_WORD_LEN;
786 
787 		zip->status = deflate(zip->zp, Z_FINISH);
788 	}
789 
790 	ut_a(zip->status == Z_STREAM_END);
791 
792 	zip->status = deflateEnd(zip->zp);
793 	ut_a(zip->status == Z_OK);
794 
795 	/* Reset the ZLib data structure. */
796 	memset(zip->zp, 0, sizeof(*zip->zp));
797 }
798 
799 /**********************************************************************//**
800 Read the words from the FTS INDEX.
801 @return DB_SUCCESS if all OK, DB_TABLE_NOT_FOUND if no more indexes
802         to search else error code */
803 static MY_ATTRIBUTE((nonnull, warn_unused_result))
804 dberr_t
fts_index_fetch_words(fts_optimize_t * optim,const fts_string_t * word,ulint n_words)805 fts_index_fetch_words(
806 /*==================*/
807 	fts_optimize_t*		optim,	/*!< in: optimize scratch pad */
808 	const fts_string_t*	word,	/*!< in: get words greater than this
809 					 word */
810 	ulint			n_words)/*!< in: max words to read */
811 {
812 	pars_info_t*	info;
813 	que_t*		graph;
814 	ulint		selected;
815 	fts_zip_t*	zip = NULL;
816 	dberr_t		error = DB_SUCCESS;
817 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
818 	ibool		inited = FALSE;
819 
820 	optim->trx->op_info = "fetching FTS index words";
821 
822 	if (optim->zip == NULL) {
823 		optim->zip = fts_zip_create(heap, FTS_ZIP_BLOCK_SIZE, n_words);
824 	} else {
825 		fts_zip_initialize(optim->zip);
826 	}
827 
828 	for (selected = fts_select_index(
829 		optim->fts_index_table.charset, word->f_str, word->f_len);
830 	     fts_index_selector[selected].value;
831 	     selected++) {
832 
833 		optim->fts_index_table.suffix = fts_get_suffix(selected);
834 
835 		/* We've search all indexes. */
836 		if (optim->fts_index_table.suffix == NULL) {
837 			return(DB_TABLE_NOT_FOUND);
838 		}
839 
840 		info = pars_info_create();
841 
842 		pars_info_bind_function(
843 			info, "my_func", fts_fetch_index_words, optim->zip);
844 
845 		pars_info_bind_varchar_literal(
846 			info, "word", word->f_str, word->f_len);
847 
848 		graph = fts_parse_sql(
849 			&optim->fts_index_table,
850 			info,
851 			"DECLARE FUNCTION my_func;\n"
852 			"DECLARE CURSOR c IS"
853 			" SELECT word\n"
854 			" FROM \"%s\"\n"
855 			" WHERE word > :word\n"
856 			" ORDER BY word;\n"
857 			"BEGIN\n"
858 			"\n"
859 			"OPEN c;\n"
860 			"WHILE 1 = 1 LOOP\n"
861 			"  FETCH c INTO my_func();\n"
862 			"  IF c % NOTFOUND THEN\n"
863 			"    EXIT;\n"
864 			"  END IF;\n"
865 			"END LOOP;\n"
866 			"CLOSE c;");
867 
868 		zip = optim->zip;
869 
870 		for(;;) {
871 			int	err;
872 
873 			if (!inited && ((err = deflateInit(zip->zp, 9))
874 					!= Z_OK)) {
875 				ut_print_timestamp(stderr);
876 				fprintf(stderr,
877 					" InnoDB: Error: ZLib deflateInit() "
878 					"failed: %d\n", err);
879 
880 				error = DB_ERROR;
881 				break;
882 			} else {
883 				inited = TRUE;
884 				error = fts_eval_sql(optim->trx, graph);
885 			}
886 
887 			if (error == DB_SUCCESS) {
888 				//FIXME fts_sql_commit(optim->trx);
889 				break;
890 			} else {
891 				//FIXME fts_sql_rollback(optim->trx);
892 
893 				ut_print_timestamp(stderr);
894 
895 				if (error == DB_LOCK_WAIT_TIMEOUT) {
896 					fprintf(stderr, " InnoDB: "
897 						"Warning: lock wait "
898 						"timeout reading document. "
899 						"Retrying!\n");
900 
901 					/* We need to reset the ZLib state. */
902 					inited = FALSE;
903 					deflateEnd(zip->zp);
904 					fts_zip_init(zip);
905 
906 					optim->trx->error_state = DB_SUCCESS;
907 				} else {
908 					fprintf(stderr, " InnoDB: Error: (%s) "
909 						"while reading document.\n",
910 						ut_strerr(error));
911 
912 					break;	/* Exit the loop. */
913 				}
914 			}
915 		}
916 
917 		fts_que_graph_free(graph);
918 
919 		/* Check if max word to fetch is exceeded */
920 		if (optim->zip->n_words >= n_words) {
921 			break;
922 		}
923 	}
924 
925 	if (error == DB_SUCCESS && zip->status == Z_OK && zip->n_words > 0) {
926 
927 		/* All data should have been read. */
928 		ut_a(zip->zp->avail_in == 0);
929 
930 		fts_zip_deflate_end(zip);
931 	} else {
932 		deflateEnd(zip->zp);
933 	}
934 
935 	return(error);
936 }
937 
938 /**********************************************************************//**
939 Callback function to fetch the doc id from the record.
940 @return always returns TRUE */
941 static
942 ibool
fts_fetch_doc_ids(void * row,void * user_arg)943 fts_fetch_doc_ids(
944 /*==============*/
945 	void*	row,		/*!< in: sel_node_t* */
946 	void*	user_arg)	/*!< in: pointer to ib_vector_t */
947 {
948 	que_node_t*	exp;
949 	int		i = 0;
950 	sel_node_t*	sel_node = static_cast<sel_node_t*>(row);
951 	fts_doc_ids_t*	fts_doc_ids = static_cast<fts_doc_ids_t*>(user_arg);
952 	fts_update_t*	update = static_cast<fts_update_t*>(
953 		ib_vector_push(fts_doc_ids->doc_ids, NULL));
954 
955 	for (exp = sel_node->select_list;
956 	     exp;
957 	     exp = que_node_get_next(exp), ++i) {
958 
959 		dfield_t*	dfield = que_node_get_val(exp);
960 		void*		data = dfield_get_data(dfield);
961 		ulint		len = dfield_get_len(dfield);
962 
963 		ut_a(len != UNIV_SQL_NULL);
964 
965 		/* Note: The column numbers below must match the SELECT. */
966 		switch (i) {
967 		case 0: /* DOC_ID */
968 			update->fts_indexes = NULL;
969 			update->doc_id = fts_read_doc_id(
970 				static_cast<byte*>(data));
971 			break;
972 
973 		default:
974 			ut_error;
975 		}
976 	}
977 
978 	return(TRUE);
979 }
980 
981 /**********************************************************************//**
982 Read the rows from a FTS common auxiliary table.
983 @return DB_SUCCESS or error code */
984 UNIV_INTERN
985 dberr_t
fts_table_fetch_doc_ids(trx_t * trx,fts_table_t * fts_table,fts_doc_ids_t * doc_ids)986 fts_table_fetch_doc_ids(
987 /*====================*/
988 	trx_t*		trx,		/*!< in: transaction */
989 	fts_table_t*	fts_table,	/*!< in: table */
990 	fts_doc_ids_t*	doc_ids)	/*!< in: For collecting doc ids */
991 {
992 	dberr_t		error;
993 	que_t*		graph;
994 	pars_info_t*	info = pars_info_create();
995 	ibool		alloc_bk_trx = FALSE;
996 
997 	ut_a(fts_table->suffix != NULL);
998 	ut_a(fts_table->type == FTS_COMMON_TABLE);
999 
1000 	if (!trx) {
1001 		trx = trx_allocate_for_background();
1002 		alloc_bk_trx = TRUE;
1003 	}
1004 
1005 	trx->op_info = "fetching FTS doc ids";
1006 
1007 	pars_info_bind_function(info, "my_func", fts_fetch_doc_ids, doc_ids);
1008 
1009 	graph = fts_parse_sql(
1010 		fts_table,
1011 		info,
1012 		"DECLARE FUNCTION my_func;\n"
1013 		"DECLARE CURSOR c IS"
1014 		" SELECT doc_id FROM \"%s\";\n"
1015 		"BEGIN\n"
1016 		"\n"
1017 		"OPEN c;\n"
1018 		"WHILE 1 = 1 LOOP\n"
1019 		"  FETCH c INTO my_func();\n"
1020 		"  IF c % NOTFOUND THEN\n"
1021 		"    EXIT;\n"
1022 		"  END IF;\n"
1023 		"END LOOP;\n"
1024 		"CLOSE c;");
1025 
1026 	error = fts_eval_sql(trx, graph);
1027 
1028 	mutex_enter(&dict_sys->mutex);
1029 	que_graph_free(graph);
1030 	mutex_exit(&dict_sys->mutex);
1031 
1032 	if (error == DB_SUCCESS) {
1033 		fts_sql_commit(trx);
1034 
1035 		ib_vector_sort(doc_ids->doc_ids, fts_update_doc_id_cmp);
1036 	} else {
1037 		fts_sql_rollback(trx);
1038 	}
1039 
1040 	if (alloc_bk_trx) {
1041 		trx_free_for_background(trx);
1042 	}
1043 
1044 	return(error);
1045 }
1046 
1047 /**********************************************************************//**
1048 Do a binary search for a doc id in the array
1049 @return +ve index if found -ve index where it should be inserted
1050         if not found */
1051 UNIV_INTERN
1052 int
fts_bsearch(fts_update_t * array,int lower,int upper,doc_id_t doc_id)1053 fts_bsearch(
1054 /*========*/
1055 	fts_update_t*	array,	/*!< in: array to sort */
1056 	int		lower,	/*!< in: the array lower bound */
1057 	int		upper,	/*!< in: the array upper bound */
1058 	doc_id_t	doc_id)	/*!< in: the doc id to search for */
1059 {
1060 	int	orig_size = upper;
1061 
1062 	if (upper == 0) {
1063 		/* Nothing to search */
1064 		return(-1);
1065 	} else {
1066 		while (lower < upper) {
1067 			int	i = (lower + upper) >> 1;
1068 
1069 			if (doc_id > array[i].doc_id) {
1070 				lower = i + 1;
1071 			} else if (doc_id < array[i].doc_id) {
1072 				upper = i - 1;
1073 			} else {
1074 				return(i); /* Found. */
1075 			}
1076 		}
1077 	}
1078 
1079 	if (lower == upper && lower < orig_size) {
1080 		if (doc_id == array[lower].doc_id) {
1081 			return(lower);
1082 		} else if (lower == 0) {
1083 			return(-1);
1084 		}
1085 	}
1086 
1087 	/* Not found. */
1088 	return( (lower == 0) ? -1 : -lower);
1089 }
1090 
1091 /**********************************************************************//**
1092 Search in the to delete array whether any of the doc ids within
1093 the [first, last] range are to be deleted
1094 @return +ve index if found -ve index where it should be inserted
1095         if not found */
1096 static
1097 int
fts_optimize_lookup(ib_vector_t * doc_ids,ulint lower,doc_id_t first_doc_id,doc_id_t last_doc_id)1098 fts_optimize_lookup(
1099 /*================*/
1100 	ib_vector_t*	doc_ids,	/*!< in: array to search */
1101 	ulint		lower,		/*!< in: lower limit of array */
1102 	doc_id_t	first_doc_id,	/*!< in: doc id to lookup */
1103 	doc_id_t	last_doc_id)	/*!< in: doc id to lookup */
1104 {
1105 	int		pos;
1106 	int		upper = static_cast<int>(ib_vector_size(doc_ids));
1107 	fts_update_t*	array = (fts_update_t*) doc_ids->data;
1108 
1109 	pos = fts_bsearch(array, static_cast<int>(lower), upper, first_doc_id);
1110 
1111 	ut_a(abs(pos) <= upper + 1);
1112 
1113 	if (pos < 0) {
1114 
1115 		int	i = abs(pos);
1116 
1117 		/* If i is 1, it could be first_doc_id is less than
1118 		either the first or second array item, do a
1119 		double check */
1120 		if (i == 1 && array[0].doc_id <= last_doc_id
1121 		    && first_doc_id < array[0].doc_id) {
1122 			pos = 0;
1123 		} else if (i < upper && array[i].doc_id <= last_doc_id) {
1124 
1125 			/* Check if the "next" doc id is within the
1126 			first & last doc id of the node. */
1127 			pos = i;
1128 		}
1129 	}
1130 
1131 	return(pos);
1132 }
1133 
1134 /**********************************************************************//**
1135 Encode the word pos list into the node
1136 @return DB_SUCCESS or error code*/
1137 static MY_ATTRIBUTE((nonnull))
1138 dberr_t
fts_optimize_encode_node(fts_node_t * node,doc_id_t doc_id,fts_encode_t * enc)1139 fts_optimize_encode_node(
1140 /*=====================*/
1141 	fts_node_t*	node,		/*!< in: node to fill*/
1142 	doc_id_t	doc_id,		/*!< in: doc id to encode */
1143 	fts_encode_t*	enc)		/*!< in: encoding state.*/
1144 {
1145 	byte*		dst;
1146 	ulint		enc_len;
1147 	ulint		pos_enc_len;
1148 	doc_id_t	doc_id_delta;
1149 	dberr_t		error = DB_SUCCESS;
1150 	byte*		src = enc->src_ilist_ptr;
1151 
1152 	if (node->first_doc_id == 0) {
1153 		ut_a(node->last_doc_id == 0);
1154 
1155 		node->first_doc_id = doc_id;
1156 	}
1157 
1158 	/* Calculate the space required to store the ilist. */
1159 	ut_ad(doc_id > node->last_doc_id);
1160 	doc_id_delta = doc_id - node->last_doc_id;
1161 	enc_len = fts_get_encoded_len(static_cast<ulint>(doc_id_delta));
1162 
1163 	/* Calculate the size of the encoded pos array. */
1164 	while (*src) {
1165 		fts_decode_vlc(&src);
1166 	}
1167 
1168 	/* Skip the 0x00 byte at the end of the word positions list. */
1169 	++src;
1170 
1171 	/* Number of encoded pos bytes to copy. */
1172 	pos_enc_len = src - enc->src_ilist_ptr;
1173 
1174 	/* Total number of bytes required for copy. */
1175 	enc_len += pos_enc_len;
1176 
1177 	/* Check we have enough space in the destination buffer for
1178 	copying the document word list. */
1179 	if (!node->ilist) {
1180 		ulint	new_size;
1181 
1182 		ut_a(node->ilist_size == 0);
1183 
1184 		new_size = enc_len > FTS_ILIST_MAX_SIZE
1185 			? enc_len : FTS_ILIST_MAX_SIZE;
1186 
1187 		node->ilist = static_cast<byte*>(ut_malloc(new_size));
1188 		node->ilist_size_alloc = new_size;
1189 
1190 	} else if ((node->ilist_size + enc_len) > node->ilist_size_alloc) {
1191 		ulint	new_size = node->ilist_size + enc_len;
1192 		byte*	ilist = static_cast<byte*>(ut_malloc(new_size));
1193 
1194 		memcpy(ilist, node->ilist, node->ilist_size);
1195 
1196 		ut_free(node->ilist);
1197 
1198 		node->ilist = ilist;
1199 		node->ilist_size_alloc = new_size;
1200 	}
1201 
1202 	src = enc->src_ilist_ptr;
1203 	dst = node->ilist + node->ilist_size;
1204 
1205 	/* Encode the doc id. Cast to ulint, the delta should be small and
1206 	therefore no loss of precision. */
1207 	dst += fts_encode_int((ulint) doc_id_delta, dst);
1208 
1209 	/* Copy the encoded pos array. */
1210 	memcpy(dst, src, pos_enc_len);
1211 
1212 	node->last_doc_id = doc_id;
1213 
1214 	/* Data copied upto here. */
1215 	node->ilist_size += enc_len;
1216 	enc->src_ilist_ptr += pos_enc_len;
1217 
1218 	ut_a(node->ilist_size <= node->ilist_size_alloc);
1219 
1220 	return(error);
1221 }
1222 
1223 /**********************************************************************//**
1224 Optimize the data contained in a node.
1225 @return DB_SUCCESS or error code*/
1226 static MY_ATTRIBUTE((nonnull))
1227 dberr_t
fts_optimize_node(ib_vector_t * del_vec,int * del_pos,fts_node_t * dst_node,fts_node_t * src_node,fts_encode_t * enc)1228 fts_optimize_node(
1229 /*==============*/
1230 	ib_vector_t*	del_vec,	/*!< in: vector of doc ids to delete*/
1231 	int*		del_pos,	/*!< in: offset into above vector */
1232 	fts_node_t*	dst_node,	/*!< in: node to fill*/
1233 	fts_node_t*	src_node,	/*!< in: source node for data*/
1234 	fts_encode_t*	enc)		/*!< in: encoding state */
1235 {
1236 	ulint		copied;
1237 	dberr_t		error = DB_SUCCESS;
1238 	doc_id_t	doc_id = enc->src_last_doc_id;
1239 
1240 	if (!enc->src_ilist_ptr) {
1241 		enc->src_ilist_ptr = src_node->ilist;
1242 	}
1243 
1244 	copied = enc->src_ilist_ptr - src_node->ilist;
1245 
1246 	/* While there is data in the source node and space to copy
1247 	into in the destination node. */
1248 	while (copied < src_node->ilist_size
1249 	       && dst_node->ilist_size < FTS_ILIST_MAX_SIZE) {
1250 
1251 		doc_id_t	delta;
1252 		doc_id_t	del_doc_id = FTS_NULL_DOC_ID;
1253 
1254 		delta = fts_decode_vlc(&enc->src_ilist_ptr);
1255 
1256 test_again:
1257 		/* Check whether the doc id is in the delete list, if
1258 		so then we skip the entries but we need to track the
1259 		delta for decoding the entries following this document's
1260 		entries. */
1261 		if (*del_pos >= 0 && *del_pos < (int) ib_vector_size(del_vec)) {
1262 			fts_update_t*	update;
1263 
1264 			update = (fts_update_t*) ib_vector_get(
1265 				del_vec, *del_pos);
1266 
1267 			del_doc_id = update->doc_id;
1268 		}
1269 
1270 		if (enc->src_ilist_ptr == src_node->ilist && doc_id == 0) {
1271 			ut_a(delta == src_node->first_doc_id);
1272 		}
1273 
1274 		doc_id += delta;
1275 
1276 		if (del_doc_id > 0 && doc_id == del_doc_id) {
1277 
1278 			++*del_pos;
1279 
1280 			/* Skip the entries for this document. */
1281 			while (*enc->src_ilist_ptr) {
1282 				fts_decode_vlc(&enc->src_ilist_ptr);
1283 			}
1284 
1285 			/* Skip the end of word position marker. */
1286 			++enc->src_ilist_ptr;
1287 
1288 		} else {
1289 
1290 			/* DOC ID already becomes larger than
1291 			del_doc_id, check the next del_doc_id */
1292 			if (del_doc_id > 0 && doc_id > del_doc_id) {
1293 				del_doc_id = 0;
1294 				++*del_pos;
1295 				delta = 0;
1296 				goto test_again;
1297 			}
1298 
1299 			/* Decode and copy the word positions into
1300 			the dest node. */
1301 			fts_optimize_encode_node(dst_node, doc_id, enc);
1302 
1303 			++dst_node->doc_count;
1304 
1305 			ut_a(dst_node->last_doc_id == doc_id);
1306 		}
1307 
1308 		/* Bytes copied so for from source. */
1309 		copied = enc->src_ilist_ptr - src_node->ilist;
1310 	}
1311 
1312 	if (copied >= src_node->ilist_size) {
1313 		ut_a(doc_id == src_node->last_doc_id);
1314 	}
1315 
1316 	enc->src_last_doc_id = doc_id;
1317 
1318 	return(error);
1319 }
1320 
1321 /**********************************************************************//**
1322 Determine the starting pos within the deleted doc id vector for a word.
1323 @return delete position */
1324 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1325 int
fts_optimize_deleted_pos(fts_optimize_t * optim,fts_word_t * word)1326 fts_optimize_deleted_pos(
1327 /*=====================*/
1328 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1329 	fts_word_t*	word)		/*!< in: the word data to check */
1330 {
1331 	int		del_pos;
1332 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1333 
1334 	/* Get the first and last dict ids for the word, we will use
1335 	these values to determine which doc ids need to be removed
1336 	when we coalesce the nodes. This way we can reduce the numer
1337 	of elements that need to be searched in the deleted doc ids
1338 	vector and secondly we can remove the doc ids during the
1339 	coalescing phase. */
1340 	if (ib_vector_size(del_vec) > 0) {
1341 		fts_node_t*	node;
1342 		doc_id_t	last_id;
1343 		doc_id_t	first_id;
1344 		ulint		size = ib_vector_size(word->nodes);
1345 
1346 		node = (fts_node_t*) ib_vector_get(word->nodes, 0);
1347 		first_id = node->first_doc_id;
1348 
1349 		node = (fts_node_t*) ib_vector_get(word->nodes, size - 1);
1350 		last_id = node->last_doc_id;
1351 
1352 		ut_a(first_id <= last_id);
1353 
1354 		del_pos = fts_optimize_lookup(
1355 			del_vec, optim->del_pos, first_id, last_id);
1356 	} else {
1357 
1358 		del_pos = -1; /* Note that there is nothing to delete. */
1359 	}
1360 
1361 	return(del_pos);
1362 }
1363 
1364 #define FTS_DEBUG_PRINT
1365 /**********************************************************************//**
1366 Compact the nodes for a word, we also remove any doc ids during the
1367 compaction pass.
1368 @return DB_SUCCESS or error code.*/
1369 static
1370 ib_vector_t*
fts_optimize_word(fts_optimize_t * optim,fts_word_t * word)1371 fts_optimize_word(
1372 /*==============*/
1373 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1374 	fts_word_t*	word)		/*!< in: the word to optimize */
1375 {
1376 	fts_encode_t	enc;
1377 	ib_vector_t*	nodes;
1378 	ulint		i = 0;
1379 	int		del_pos;
1380 	fts_node_t*	dst_node = NULL;
1381 	ib_vector_t*	del_vec = optim->to_delete->doc_ids;
1382 	ulint		size = ib_vector_size(word->nodes);
1383 
1384 	del_pos = fts_optimize_deleted_pos(optim, word);
1385 	nodes = ib_vector_create(word->heap_alloc, sizeof(*dst_node), 128);
1386 
1387 	enc.src_last_doc_id = 0;
1388 	enc.src_ilist_ptr = NULL;
1389 
1390 	if (fts_enable_diag_print) {
1391 		word->text.f_str[word->text.f_len] = 0;
1392 		fprintf(stderr, "FTS_OPTIMIZE: optimize \"%s\"\n",
1393 			word->text.f_str);
1394 	}
1395 
1396 	while (i < size) {
1397 		ulint		copied;
1398 		fts_node_t*	src_node;
1399 
1400 		src_node = (fts_node_t*) ib_vector_get(word->nodes, i);
1401 
1402 		if (dst_node == NULL
1403 		    || dst_node->last_doc_id > src_node->first_doc_id) {
1404 
1405 			dst_node = static_cast<fts_node_t*>(
1406 				ib_vector_push(nodes, NULL));
1407 			memset(dst_node, 0, sizeof(*dst_node));
1408 		}
1409 
1410 		/* Copy from the src to the dst node. */
1411 		fts_optimize_node(del_vec, &del_pos, dst_node, src_node, &enc);
1412 
1413 		ut_a(enc.src_ilist_ptr != NULL);
1414 
1415 		/* Determine the numer of bytes copied to dst_node. */
1416 		copied = enc.src_ilist_ptr - src_node->ilist;
1417 
1418 		/* Can't copy more than whats in the vlc array. */
1419 		ut_a(copied <= src_node->ilist_size);
1420 
1421 		/* We are done with this node release the resources. */
1422 		if (copied == src_node->ilist_size) {
1423 
1424 			enc.src_last_doc_id = 0;
1425 			enc.src_ilist_ptr = NULL;
1426 
1427 			ut_free(src_node->ilist);
1428 
1429 			src_node->ilist = NULL;
1430 			src_node->ilist_size = src_node->ilist_size_alloc = 0;
1431 
1432 			src_node = NULL;
1433 
1434 			++i; /* Get next source node to OPTIMIZE. */
1435 		}
1436 
1437 		if (dst_node->ilist_size >= FTS_ILIST_MAX_SIZE || i >= size) {
1438 
1439 			dst_node = NULL;
1440 		}
1441 	}
1442 
1443 	/* All dst nodes created should have been added to the vector. */
1444 	ut_a(dst_node == NULL);
1445 
1446 	/* Return the OPTIMIZED nodes. */
1447 	return(nodes);
1448 }
1449 
1450 /**********************************************************************//**
1451 Update the FTS index table. This is a delete followed by an insert.
1452 @return DB_SUCCESS or error code */
1453 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1454 dberr_t
fts_optimize_write_word(trx_t * trx,fts_table_t * fts_table,fts_string_t * word,ib_vector_t * nodes)1455 fts_optimize_write_word(
1456 /*====================*/
1457 	trx_t*		trx,		/*!< in: transaction */
1458 	fts_table_t*	fts_table,	/*!< in: table of FTS index */
1459 	fts_string_t*	word,		/*!< in: word data to write */
1460 	ib_vector_t*	nodes)		/*!< in: the nodes to write */
1461 {
1462 	ulint		i;
1463 	pars_info_t*	info;
1464 	que_t*		graph;
1465 	ulint		selected;
1466 	dberr_t		error = DB_SUCCESS;
1467 	char*		table_name = fts_get_table_name(fts_table);
1468 
1469 	info = pars_info_create();
1470 
1471 	ut_ad(fts_table->charset);
1472 
1473 	if (fts_enable_diag_print) {
1474 		fprintf(stderr, "FTS_OPTIMIZE: processed \"%s\"\n",
1475 			word->f_str);
1476 	}
1477 
1478 	pars_info_bind_varchar_literal(
1479 		info, "word", word->f_str, word->f_len);
1480 
1481 	selected = fts_select_index(fts_table->charset,
1482 				    word->f_str, word->f_len);
1483 
1484 	fts_table->suffix = fts_get_suffix(selected);
1485 
1486 	graph = fts_parse_sql(
1487 		fts_table,
1488 		info,
1489 		"BEGIN DELETE FROM \"%s\" WHERE word = :word;");
1490 
1491 	error = fts_eval_sql(trx, graph);
1492 
1493 	if (error != DB_SUCCESS) {
1494 		ut_print_timestamp(stderr);
1495 		fprintf(stderr, " InnoDB: Error: (%s) during optimize, "
1496 			"when deleting a word from the FTS index.\n",
1497 			ut_strerr(error));
1498 	}
1499 
1500 	fts_que_graph_free(graph);
1501 	graph = NULL;
1502 
1503 	mem_free(table_name);
1504 
1505 	/* Even if the operation needs to be rolled back and redone,
1506 	we iterate over the nodes in order to free the ilist. */
1507 	for (i = 0; i < ib_vector_size(nodes); ++i) {
1508 
1509 		fts_node_t* node = (fts_node_t*) ib_vector_get(nodes, i);
1510 
1511 		if (error == DB_SUCCESS) {
1512 			error = fts_write_node(
1513 				trx, &graph, fts_table, word, node);
1514 
1515 			if (error != DB_SUCCESS) {
1516 				ut_print_timestamp(stderr);
1517 				fprintf(stderr, " InnoDB: Error: (%s) "
1518 					"during optimize, while adding a "
1519 					"word to the FTS index.\n",
1520 					ut_strerr(error));
1521 			}
1522 		}
1523 
1524 		ut_free(node->ilist);
1525 		node->ilist = NULL;
1526 		node->ilist_size = node->ilist_size_alloc = 0;
1527 	}
1528 
1529 	if (graph != NULL) {
1530 		fts_que_graph_free(graph);
1531 	}
1532 
1533 	return(error);
1534 }
1535 
1536 /**********************************************************************//**
1537 Free fts_optimizer_word_t instanace.*/
1538 UNIV_INTERN
1539 void
fts_word_free(fts_word_t * word)1540 fts_word_free(
1541 /*==========*/
1542 	fts_word_t*	word)		/*!< in: instance to free.*/
1543 {
1544 	mem_heap_t*	heap = static_cast<mem_heap_t*>(word->heap_alloc->arg);
1545 
1546 #ifdef UNIV_DEBUG
1547 	memset(word, 0, sizeof(*word));
1548 #endif /* UNIV_DEBUG */
1549 
1550 	mem_heap_free(heap);
1551 }
1552 
1553 /**********************************************************************//**
1554 Optimize the word ilist and rewrite data to the FTS index.
1555 @return status one of RESTART, EXIT, ERROR */
1556 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1557 dberr_t
fts_optimize_compact(fts_optimize_t * optim,dict_index_t * index,ib_time_t start_time)1558 fts_optimize_compact(
1559 /*=================*/
1560 	fts_optimize_t*	optim,		/*!< in: optimize state data */
1561 	dict_index_t*	index,		/*!< in: current FTS being optimized */
1562 	ib_time_t	start_time)	/*!< in: optimize start time */
1563 {
1564 	ulint		i;
1565 	dberr_t		error = DB_SUCCESS;
1566 	ulint		size = ib_vector_size(optim->words);
1567 
1568 	for (i = 0; i < size && error == DB_SUCCESS && !optim->done; ++i) {
1569 		fts_word_t*	word;
1570 		ib_vector_t*	nodes;
1571 		trx_t*		trx = optim->trx;
1572 
1573 		word = (fts_word_t*) ib_vector_get(optim->words, i);
1574 
1575 		/* nodes is allocated from the word heap and will be destroyed
1576 		when the word is freed. We however have to be careful about
1577 		the ilist, that needs to be freed explicitly. */
1578 		nodes = fts_optimize_word(optim, word);
1579 
1580 		/* Update the data on disk. */
1581 		error = fts_optimize_write_word(
1582 			trx, &optim->fts_index_table, &word->text, nodes);
1583 
1584 		if (error == DB_SUCCESS) {
1585 			/* Write the last word optimized to the config table,
1586 			we use this value for restarting optimize. */
1587 			error = fts_config_set_index_value(
1588 				optim->trx, index,
1589 				FTS_LAST_OPTIMIZED_WORD, &word->text);
1590 		}
1591 
1592 		/* Free the word that was optimized. */
1593 		fts_word_free(word);
1594 
1595 		if (fts_optimize_time_limit > 0
1596 		    && (ut_time() - start_time) > fts_optimize_time_limit) {
1597 
1598 			optim->done = TRUE;
1599 		}
1600 	}
1601 
1602 	return(error);
1603 }
1604 
1605 /**********************************************************************//**
1606 Create an instance of fts_optimize_t. Also create a new
1607 background transaction.*/
1608 static
1609 fts_optimize_t*
fts_optimize_create(dict_table_t * table)1610 fts_optimize_create(
1611 /*================*/
1612 	dict_table_t*	table)		/*!< in: table with FTS indexes */
1613 {
1614 	fts_optimize_t*	optim;
1615 	mem_heap_t*	heap = mem_heap_create(128);
1616 
1617 	optim = (fts_optimize_t*) mem_heap_zalloc(heap, sizeof(*optim));
1618 
1619 	optim->self_heap = ib_heap_allocator_create(heap);
1620 
1621 	optim->to_delete = fts_doc_ids_create();
1622 
1623 	optim->words = ib_vector_create(
1624 		optim->self_heap, sizeof(fts_word_t), 256);
1625 
1626 	optim->table = table;
1627 
1628 	optim->trx = trx_allocate_for_background();
1629 
1630 	optim->fts_common_table.parent = table->name;
1631 	optim->fts_common_table.table_id = table->id;
1632 	optim->fts_common_table.type = FTS_COMMON_TABLE;
1633 	optim->fts_common_table.table = table;
1634 
1635 	optim->fts_index_table.parent = table->name;
1636 	optim->fts_index_table.table_id = table->id;
1637 	optim->fts_index_table.type = FTS_INDEX_TABLE;
1638 	optim->fts_index_table.table = table;
1639 
1640 	/* The common prefix for all this parent table's aux tables. */
1641 	optim->name_prefix = fts_get_table_name_prefix(
1642 		&optim->fts_common_table);
1643 
1644 	return(optim);
1645 }
1646 
1647 #ifdef FTS_OPTIMIZE_DEBUG
1648 /**********************************************************************//**
1649 Get optimize start time of an FTS index.
1650 @return DB_SUCCESS if all OK else error code */
1651 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1652 dberr_t
fts_optimize_get_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t * start_time)1653 fts_optimize_get_index_start_time(
1654 /*==============================*/
1655 	trx_t*		trx,			/*!< in: transaction */
1656 	dict_index_t*	index,			/*!< in: FTS index */
1657 	ib_time_t*	start_time)		/*!< out: time in secs */
1658 {
1659 	return(fts_config_get_index_ulint(
1660 		       trx, index, FTS_OPTIMIZE_START_TIME,
1661 		       (ulint*) start_time));
1662 }
1663 
1664 /**********************************************************************//**
1665 Set the optimize start time of an FTS index.
1666 @return DB_SUCCESS if all OK else error code */
1667 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1668 dberr_t
fts_optimize_set_index_start_time(trx_t * trx,dict_index_t * index,ib_time_t start_time)1669 fts_optimize_set_index_start_time(
1670 /*==============================*/
1671 	trx_t*		trx,			/*!< in: transaction */
1672 	dict_index_t*	index,			/*!< in: FTS index */
1673 	ib_time_t	start_time)		/*!< in: start time */
1674 {
1675 	return(fts_config_set_index_ulint(
1676 		       trx, index, FTS_OPTIMIZE_START_TIME,
1677 		       (ulint) start_time));
1678 }
1679 
1680 /**********************************************************************//**
1681 Get optimize end time of an FTS index.
1682 @return DB_SUCCESS if all OK else error code */
1683 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1684 dberr_t
fts_optimize_get_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t * end_time)1685 fts_optimize_get_index_end_time(
1686 /*============================*/
1687 	trx_t*		trx,			/*!< in: transaction */
1688 	dict_index_t*	index,			/*!< in: FTS index */
1689 	ib_time_t*	end_time)		/*!< out: time in secs */
1690 {
1691 	return(fts_config_get_index_ulint(
1692 		       trx, index, FTS_OPTIMIZE_END_TIME, (ulint*) end_time));
1693 }
1694 
1695 /**********************************************************************//**
1696 Set the optimize end time of an FTS index.
1697 @return DB_SUCCESS if all OK else error code */
1698 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1699 dberr_t
fts_optimize_set_index_end_time(trx_t * trx,dict_index_t * index,ib_time_t end_time)1700 fts_optimize_set_index_end_time(
1701 /*============================*/
1702 	trx_t*		trx,			/*!< in: transaction */
1703 	dict_index_t*	index,			/*!< in: FTS index */
1704 	ib_time_t	end_time)		/*!< in: end time */
1705 {
1706 	return(fts_config_set_index_ulint(
1707 		       trx, index, FTS_OPTIMIZE_END_TIME, (ulint) end_time));
1708 }
1709 #endif
1710 
1711 /**********************************************************************//**
1712 Free the optimize prepared statements.*/
1713 static
1714 void
fts_optimize_graph_free(fts_optimize_graph_t * graph)1715 fts_optimize_graph_free(
1716 /*====================*/
1717 	fts_optimize_graph_t*	graph)	/*!< in/out: The graph instances
1718 					to free */
1719 {
1720 	if (graph->commit_graph) {
1721 		que_graph_free(graph->commit_graph);
1722 		graph->commit_graph = NULL;
1723 	}
1724 
1725 	if (graph->write_nodes_graph) {
1726 		que_graph_free(graph->write_nodes_graph);
1727 		graph->write_nodes_graph = NULL;
1728 	}
1729 
1730 	if (graph->delete_nodes_graph) {
1731 		que_graph_free(graph->delete_nodes_graph);
1732 		graph->delete_nodes_graph = NULL;
1733 	}
1734 
1735 	if (graph->read_nodes_graph) {
1736 		que_graph_free(graph->read_nodes_graph);
1737 		graph->read_nodes_graph = NULL;
1738 	}
1739 }
1740 
1741 /**********************************************************************//**
1742 Free all optimize resources. */
1743 static
1744 void
fts_optimize_free(fts_optimize_t * optim)1745 fts_optimize_free(
1746 /*==============*/
1747 	fts_optimize_t*	optim)		/*!< in: table with on FTS index */
1748 {
1749 	mem_heap_t*	heap = static_cast<mem_heap_t*>(optim->self_heap->arg);
1750 
1751 	trx_free_for_background(optim->trx);
1752 
1753 	fts_doc_ids_free(optim->to_delete);
1754 	fts_optimize_graph_free(&optim->graph);
1755 
1756 	mem_free(optim->name_prefix);
1757 
1758 	/* This will free the heap from which optim itself was allocated. */
1759 	mem_heap_free(heap);
1760 }
1761 
1762 /**********************************************************************//**
1763 Get the max time optimize should run in millisecs.
1764 @return max optimize time limit in millisecs. */
1765 static
1766 ib_time_t
fts_optimize_get_time_limit(trx_t * trx,fts_table_t * fts_table)1767 fts_optimize_get_time_limit(
1768 /*========================*/
1769 	trx_t*		trx,			/*!< in: transaction */
1770 	fts_table_t*	fts_table)		/*!< in: aux table */
1771 {
1772 	ib_time_t	time_limit = 0;
1773 
1774 	fts_config_get_ulint(
1775 		trx, fts_table,
1776 		FTS_OPTIMIZE_LIMIT_IN_SECS, (ulint*) &time_limit);
1777 
1778 	return(time_limit * 1000);
1779 }
1780 
1781 
1782 /**********************************************************************//**
1783 Run OPTIMIZE on the given table. Note: this can take a very long time
1784 (hours). */
1785 static
1786 void
fts_optimize_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1787 fts_optimize_words(
1788 /*===============*/
1789 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1790 	dict_index_t*	index,	/*!< in: current FTS being optimized */
1791 	fts_string_t*	word)	/*!< in: the starting word to optimize */
1792 {
1793 	fts_fetch_t	fetch;
1794 	ib_time_t	start_time;
1795 	que_t*		graph = NULL;
1796 	CHARSET_INFO*	charset = optim->fts_index_table.charset;
1797 
1798 	ut_a(!optim->done);
1799 
1800 	/* Get the time limit from the config table. */
1801 	fts_optimize_time_limit = fts_optimize_get_time_limit(
1802 		optim->trx, &optim->fts_common_table);
1803 
1804 	start_time = ut_time();
1805 
1806 	/* Setup the callback to use for fetching the word ilist etc. */
1807 	fetch.read_arg = optim->words;
1808 	fetch.read_record = fts_optimize_index_fetch_node;
1809 
1810 	fprintf(stderr, "%.*s\n", (int) word->f_len, word->f_str);
1811 
1812 	while(!optim->done) {
1813 		dberr_t	error;
1814 		trx_t*	trx = optim->trx;
1815 		ulint	selected;
1816 
1817 		ut_a(ib_vector_size(optim->words) == 0);
1818 
1819 		selected = fts_select_index(charset, word->f_str, word->f_len);
1820 
1821 		/* Read the index records to optimize. */
1822 		fetch.total_memory = 0;
1823 		error = fts_index_fetch_nodes(
1824 			trx, &graph, &optim->fts_index_table, word,
1825 			&fetch);
1826 		ut_ad(fetch.total_memory < fts_result_cache_limit);
1827 
1828 		if (error == DB_SUCCESS) {
1829 			/* There must be some nodes to read. */
1830 			ut_a(ib_vector_size(optim->words) > 0);
1831 
1832 			/* Optimize the nodes that were read and write
1833 			back to DB. */
1834 			error = fts_optimize_compact(optim, index, start_time);
1835 
1836 			if (error == DB_SUCCESS) {
1837 				fts_sql_commit(optim->trx);
1838 			} else {
1839 				fts_sql_rollback(optim->trx);
1840 			}
1841 		}
1842 
1843 		ib_vector_reset(optim->words);
1844 
1845 		if (error == DB_SUCCESS) {
1846 			if (!optim->done) {
1847 				if (!fts_zip_read_word(optim->zip, word)) {
1848 					optim->done = TRUE;
1849 				} else if (selected
1850 					   != fts_select_index(
1851 						charset, word->f_str,
1852 						word->f_len)
1853 					  && graph) {
1854 					fts_que_graph_free(graph);
1855 					graph = NULL;
1856 				}
1857 			}
1858 		} else if (error == DB_LOCK_WAIT_TIMEOUT) {
1859 			fprintf(stderr, "InnoDB: Warning: lock wait timeout "
1860 				"during optimize. Retrying!\n");
1861 
1862 			trx->error_state = DB_SUCCESS;
1863 		} else if (error == DB_DEADLOCK) {
1864 			fprintf(stderr, "InnoDB: Warning: deadlock "
1865 				"during optimize. Retrying!\n");
1866 
1867 			trx->error_state = DB_SUCCESS;
1868 		} else {
1869 			optim->done = TRUE;		/* Exit the loop. */
1870 		}
1871 	}
1872 
1873 	if (graph != NULL) {
1874 		fts_que_graph_free(graph);
1875 	}
1876 }
1877 
1878 /**********************************************************************//**
1879 Select the FTS index to search.
1880 @return TRUE if last index */
1881 static
1882 ibool
fts_optimize_set_next_word(CHARSET_INFO * charset,fts_string_t * word)1883 fts_optimize_set_next_word(
1884 /*=======================*/
1885 	CHARSET_INFO*	charset,	/*!< in: charset */
1886 	fts_string_t*	word)		/*!< in: current last word */
1887 {
1888 	ulint		selected;
1889 	ibool		last = FALSE;
1890 
1891 	selected = fts_select_next_index(charset, word->f_str, word->f_len);
1892 
1893 	/* If this was the last index then reset to start. */
1894 	if (fts_index_selector[selected].value == 0) {
1895 		/* Reset the last optimized word to '' if no
1896 		more words could be read from the FTS index. */
1897 		word->f_len = 0;
1898 		*word->f_str = 0;
1899 
1900 		last = TRUE;
1901 	} else {
1902 		ulint	value = fts_index_selector[selected].value;
1903 
1904 		ut_a(value <= 0xff);
1905 
1906 		/* Set to the first character of the next slot. */
1907 		word->f_len = 1;
1908 		*word->f_str = (byte) value;
1909 	}
1910 
1911 	return(last);
1912 }
1913 
1914 /**********************************************************************//**
1915 Optimize is complete. Set the completion time, and reset the optimize
1916 start string for this FTS index to "".
1917 @return DB_SUCCESS if all OK */
1918 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1919 dberr_t
fts_optimize_index_completed(fts_optimize_t * optim,dict_index_t * index)1920 fts_optimize_index_completed(
1921 /*=========================*/
1922 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1923 	dict_index_t*	index)	/*!< in: table with one FTS index */
1924 {
1925 	fts_string_t	word;
1926 	dberr_t		error;
1927 	byte		buf[sizeof(ulint)];
1928 #ifdef FTS_OPTIMIZE_DEBUG
1929 	ib_time_t	end_time = ut_time();
1930 
1931 	error = fts_optimize_set_index_end_time(optim->trx, index, end_time);
1932 #endif
1933 
1934 	/* If we've reached the end of the index then set the start
1935 	word to the empty string. */
1936 
1937 	word.f_len = 0;
1938 	word.f_str = buf;
1939 	*word.f_str = '\0';
1940 
1941 	error = fts_config_set_index_value(
1942 		optim->trx, index, FTS_LAST_OPTIMIZED_WORD, &word);
1943 
1944 	if (error != DB_SUCCESS) {
1945 
1946 		fprintf(stderr, "InnoDB: Error: (%s) while "
1947 			"updating last optimized word!\n", ut_strerr(error));
1948 	}
1949 
1950 	return(error);
1951 }
1952 
1953 
1954 /**********************************************************************//**
1955 Read the list of words from the FTS auxiliary index that will be
1956 optimized in this pass.
1957 @return DB_SUCCESS if all OK */
1958 static MY_ATTRIBUTE((nonnull, warn_unused_result))
1959 dberr_t
fts_optimize_index_read_words(fts_optimize_t * optim,dict_index_t * index,fts_string_t * word)1960 fts_optimize_index_read_words(
1961 /*==========================*/
1962 	fts_optimize_t*	optim,	/*!< in: optimize instance */
1963 	dict_index_t*	index,	/*!< in: table with one FTS index */
1964 	fts_string_t*	word)	/*!< in: buffer to use */
1965 {
1966 	dberr_t	error = DB_SUCCESS;
1967 
1968 	if (optim->del_list_regenerated) {
1969 		word->f_len = 0;
1970 	} else {
1971 
1972 		/* Get the last word that was optimized from
1973 		the config table. */
1974 		error = fts_config_get_index_value(
1975 			optim->trx, index, FTS_LAST_OPTIMIZED_WORD, word);
1976 	}
1977 
1978 	/* If record not found then we start from the top. */
1979 	if (error == DB_RECORD_NOT_FOUND) {
1980 		word->f_len = 0;
1981 		error = DB_SUCCESS;
1982 	}
1983 
1984 	while (error == DB_SUCCESS) {
1985 
1986 		error = fts_index_fetch_words(
1987 			optim, word, fts_num_word_optimize);
1988 
1989 		if (error == DB_SUCCESS) {
1990 
1991 			/* If the search returned an empty set
1992 			try the next index in the horizontal split. */
1993 			if (optim->zip->n_words > 0) {
1994 				break;
1995 			} else {
1996 
1997 				fts_optimize_set_next_word(
1998 					optim->fts_index_table.charset,
1999 					word);
2000 
2001 				if (word->f_len == 0) {
2002 					break;
2003 				}
2004 			}
2005 		}
2006 	}
2007 
2008 	return(error);
2009 }
2010 
2011 /**********************************************************************//**
2012 Run OPTIMIZE on the given FTS index. Note: this can take a very long
2013 time (hours).
2014 @return DB_SUCCESS if all OK */
2015 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2016 dberr_t
fts_optimize_index(fts_optimize_t * optim,dict_index_t * index)2017 fts_optimize_index(
2018 /*===============*/
2019 	fts_optimize_t*	optim,	/*!< in: optimize instance */
2020 	dict_index_t*	index)	/*!< in: table with one FTS index */
2021 {
2022 	fts_string_t	word;
2023 	dberr_t		error;
2024 	byte		str[FTS_MAX_WORD_LEN + 1];
2025 
2026 	/* Set the current index that we have to optimize. */
2027 	optim->fts_index_table.index_id = index->id;
2028 	optim->fts_index_table.charset = fts_index_get_charset(index);
2029 
2030 	optim->done = FALSE; /* Optimize until !done */
2031 
2032 	/* We need to read the last word optimized so that we start from
2033 	the next word. */
2034 	word.f_str = str;
2035 
2036 	/* We set the length of word to the size of str since we
2037 	need to pass the max len info to the fts_get_config_value() function. */
2038 	word.f_len = sizeof(str) - 1;
2039 
2040 	memset(word.f_str, 0x0, word.f_len);
2041 
2042 	/* Read the words that will be optimized in this pass. */
2043 	error = fts_optimize_index_read_words(optim, index, &word);
2044 
2045 	if (error == DB_SUCCESS) {
2046 		int	zip_error;
2047 
2048 		ut_a(optim->zip->pos == 0);
2049 		ut_a(optim->zip->zp->total_in == 0);
2050 		ut_a(optim->zip->zp->total_out == 0);
2051 
2052 		zip_error = inflateInit(optim->zip->zp);
2053 		ut_a(zip_error == Z_OK);
2054 
2055 		word.f_len = 0;
2056 		word.f_str = str;
2057 
2058 		/* Read the first word to optimize from the Zip buffer. */
2059 		if (!fts_zip_read_word(optim->zip, &word)) {
2060 
2061 			optim->done = TRUE;
2062 		} else {
2063 			fts_optimize_words(optim, index, &word);
2064 		}
2065 
2066 		/* If we couldn't read any records then optimize is
2067 		complete. Increment the number of indexes that have
2068 		been optimized and set FTS index optimize state to
2069 		completed. */
2070 		if (error == DB_SUCCESS && optim->zip->n_words == 0) {
2071 
2072 			error = fts_optimize_index_completed(optim, index);
2073 
2074 			if (error == DB_SUCCESS) {
2075 				++optim->n_completed;
2076 			}
2077 		}
2078 	}
2079 
2080 	return(error);
2081 }
2082 
2083 /**********************************************************************//**
2084 Delete the document ids in the delete, and delete cache tables.
2085 @return DB_SUCCESS if all OK */
2086 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2087 dberr_t
fts_optimize_purge_deleted_doc_ids(fts_optimize_t * optim)2088 fts_optimize_purge_deleted_doc_ids(
2089 /*===============================*/
2090 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2091 {
2092 	ulint		i;
2093 	pars_info_t*	info;
2094 	que_t*		graph;
2095 	fts_update_t*	update;
2096 	char*		sql_str;
2097 	doc_id_t	write_doc_id;
2098 	dberr_t		error = DB_SUCCESS;
2099 
2100 	info = pars_info_create();
2101 
2102 	ut_a(ib_vector_size(optim->to_delete->doc_ids) > 0);
2103 
2104 	update = static_cast<fts_update_t*>(
2105 		ib_vector_get(optim->to_delete->doc_ids, 0));
2106 
2107 	/* Convert to "storage" byte order. */
2108 	fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2109 
2110 	/* This is required for the SQL parser to work. It must be able
2111 	to find the following variables. So we do it twice. */
2112 	fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2113 	fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2114 
2115 	/* Since we only replace the table_id and don't construct the full
2116 	name, we do substitution ourselves. Remember to free sql_str. */
2117 	sql_str = ut_strreplace(
2118 		fts_delete_doc_ids_sql, "%s", optim->name_prefix);
2119 
2120 	graph = fts_parse_sql(NULL, info, sql_str);
2121 
2122 	mem_free(sql_str);
2123 
2124 	/* Delete the doc ids that were copied at the start. */
2125 	for (i = 0; i < ib_vector_size(optim->to_delete->doc_ids); ++i) {
2126 
2127 		update = static_cast<fts_update_t*>(ib_vector_get(
2128 			optim->to_delete->doc_ids, i));
2129 
2130 		/* Convert to "storage" byte order. */
2131 		fts_write_doc_id((byte*) &write_doc_id, update->doc_id);
2132 
2133 		fts_bind_doc_id(info, "doc_id1", &write_doc_id);
2134 
2135 		fts_bind_doc_id(info, "doc_id2", &write_doc_id);
2136 
2137 		error = fts_eval_sql(optim->trx, graph);
2138 
2139 		// FIXME: Check whether delete actually succeeded!
2140 		if (error != DB_SUCCESS) {
2141 
2142 			fts_sql_rollback(optim->trx);
2143 			break;
2144 		}
2145 	}
2146 
2147 	fts_que_graph_free(graph);
2148 
2149 	return(error);
2150 }
2151 
2152 /**********************************************************************//**
2153 Delete the document ids in the pending delete, and delete tables.
2154 @return DB_SUCCESS if all OK */
2155 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2156 dberr_t
fts_optimize_purge_deleted_doc_id_snapshot(fts_optimize_t * optim)2157 fts_optimize_purge_deleted_doc_id_snapshot(
2158 /*=======================================*/
2159 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2160 {
2161 	dberr_t		error;
2162 	que_t*		graph;
2163 	char*		sql_str;
2164 
2165 	/* Since we only replace the table_id and don't construct
2166 	the full name, we do the '%s' substitution ourselves. */
2167 	sql_str = ut_strreplace(fts_end_delete_sql, "%s", optim->name_prefix);
2168 
2169 	/* Delete the doc ids that were copied to delete pending state at
2170 	the start of optimize. */
2171 	graph = fts_parse_sql(NULL, NULL, sql_str);
2172 
2173 	mem_free(sql_str);
2174 
2175 	error = fts_eval_sql(optim->trx, graph);
2176 	fts_que_graph_free(graph);
2177 
2178 	return(error);
2179 }
2180 
2181 /**********************************************************************//**
2182 Copy the deleted doc ids that will be purged during this optimize run
2183 to the being deleted FTS auxiliary tables. The transaction is committed
2184 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2185 @return DB_SUCCESS if all OK */
2186 static
2187 ulint
fts_optimize_being_deleted_count(fts_optimize_t * optim)2188 fts_optimize_being_deleted_count(
2189 /*=============================*/
2190 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2191 {
2192 	fts_table_t	fts_table;
2193 
2194 	FTS_INIT_FTS_TABLE(&fts_table, "BEING_DELETED", FTS_COMMON_TABLE,
2195 			   optim->table);
2196 
2197 	return(fts_get_rows_count(&fts_table));
2198 }
2199 
2200 /*********************************************************************//**
2201 Copy the deleted doc ids that will be purged during this optimize run
2202 to the being deleted FTS auxiliary tables. The transaction is committed
2203 upon successfull copy and rolled back on DB_DUPLICATE_KEY error.
2204 @return DB_SUCCESS if all OK */
2205 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2206 dberr_t
fts_optimize_create_deleted_doc_id_snapshot(fts_optimize_t * optim)2207 fts_optimize_create_deleted_doc_id_snapshot(
2208 /*========================================*/
2209 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2210 {
2211 	dberr_t		error;
2212 	que_t*		graph;
2213 	char*		sql_str;
2214 
2215 	/* Since we only replace the table_id and don't construct the
2216 	full name, we do the substitution ourselves. */
2217 	sql_str = ut_strreplace(fts_init_delete_sql, "%s", optim->name_prefix);
2218 
2219 	/* Move doc_ids that are to be deleted to state being deleted. */
2220 	graph = fts_parse_sql(NULL, NULL, sql_str);
2221 
2222 	mem_free(sql_str);
2223 
2224 	error = fts_eval_sql(optim->trx, graph);
2225 
2226 	fts_que_graph_free(graph);
2227 
2228 	if (error != DB_SUCCESS) {
2229 		fts_sql_rollback(optim->trx);
2230 	} else {
2231 		fts_sql_commit(optim->trx);
2232 	}
2233 
2234 	optim->del_list_regenerated = TRUE;
2235 
2236 	return(error);
2237 }
2238 
2239 /*********************************************************************//**
2240 Read in the document ids that are to be purged during optimize. The
2241 transaction is committed upon successfully read.
2242 @return DB_SUCCESS if all OK */
2243 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2244 dberr_t
fts_optimize_read_deleted_doc_id_snapshot(fts_optimize_t * optim)2245 fts_optimize_read_deleted_doc_id_snapshot(
2246 /*======================================*/
2247 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2248 {
2249 	dberr_t		error;
2250 
2251 	optim->fts_common_table.suffix = "BEING_DELETED";
2252 
2253 	/* Read the doc_ids to delete. */
2254 	error = fts_table_fetch_doc_ids(
2255 		optim->trx, &optim->fts_common_table, optim->to_delete);
2256 
2257 	if (error == DB_SUCCESS) {
2258 
2259 		optim->fts_common_table.suffix = "BEING_DELETED_CACHE";
2260 
2261 		/* Read additional doc_ids to delete. */
2262 		error = fts_table_fetch_doc_ids(
2263 			optim->trx, &optim->fts_common_table, optim->to_delete);
2264 	}
2265 
2266 	if (error != DB_SUCCESS) {
2267 
2268 		fts_doc_ids_free(optim->to_delete);
2269 		optim->to_delete = NULL;
2270 	}
2271 
2272 	return(error);
2273 }
2274 
2275 /*********************************************************************//**
2276 Optimze all the FTS indexes, skipping those that have already been
2277 optimized, since the FTS auxiliary indexes are not guaranteed to be
2278 of the same cardinality.
2279 @return DB_SUCCESS if all OK */
2280 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2281 dberr_t
fts_optimize_indexes(fts_optimize_t * optim)2282 fts_optimize_indexes(
2283 /*=================*/
2284 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2285 {
2286 	ulint		i;
2287 	dberr_t		error = DB_SUCCESS;
2288 	fts_t*		fts = optim->table->fts;
2289 
2290 	/* Optimize the FTS indexes. */
2291 	for (i = 0; i < ib_vector_size(fts->indexes); ++i) {
2292 		dict_index_t*	index;
2293 
2294 #ifdef	FTS_OPTIMIZE_DEBUG
2295 		ib_time_t	end_time;
2296 		ib_time_t	start_time;
2297 
2298 		/* Get the start and end optimize times for this index. */
2299 		error = fts_optimize_get_index_start_time(
2300 			optim->trx, index, &start_time);
2301 
2302 		if (error != DB_SUCCESS) {
2303 			break;
2304 		}
2305 
2306 		error = fts_optimize_get_index_end_time(
2307 			optim->trx, index, &end_time);
2308 
2309 		if (error != DB_SUCCESS) {
2310 			break;
2311 		}
2312 
2313 		/* Start time will be 0 only for the first time or after
2314 		completing the optimization of all FTS indexes. */
2315 		if (start_time == 0) {
2316 			start_time = ut_time();
2317 
2318 			error = fts_optimize_set_index_start_time(
2319 				optim->trx, index, start_time);
2320 		}
2321 
2322 		/* Check if this index needs to be optimized or not. */
2323 		if (ut_difftime(end_time, start_time) < 0) {
2324 			error = fts_optimize_index(optim, index);
2325 
2326 			if (error != DB_SUCCESS) {
2327 				break;
2328 			}
2329 		} else {
2330 			++optim->n_completed;
2331 		}
2332 #endif
2333 		index = static_cast<dict_index_t*>(
2334 			ib_vector_getp(fts->indexes, i));
2335 		error = fts_optimize_index(optim, index);
2336 	}
2337 
2338 	if (error == DB_SUCCESS) {
2339 		fts_sql_commit(optim->trx);
2340 	} else {
2341 		fts_sql_rollback(optim->trx);
2342 	}
2343 
2344 	return(error);
2345 }
2346 
2347 /*********************************************************************//**
2348 Cleanup the snapshot tables and the master deleted table.
2349 @return DB_SUCCESS if all OK */
2350 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2351 dberr_t
fts_optimize_purge_snapshot(fts_optimize_t * optim)2352 fts_optimize_purge_snapshot(
2353 /*========================*/
2354 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2355 {
2356 	dberr_t		error;
2357 
2358 	/* Delete the doc ids from the master deleted tables, that were
2359 	in the snapshot that was taken at the start of optimize. */
2360 	error = fts_optimize_purge_deleted_doc_ids(optim);
2361 
2362 	if (error == DB_SUCCESS) {
2363 		/* Destroy the deleted doc id snapshot. */
2364 		error = fts_optimize_purge_deleted_doc_id_snapshot(optim);
2365 	}
2366 
2367 	if (error == DB_SUCCESS) {
2368 		fts_sql_commit(optim->trx);
2369 	} else {
2370 		fts_sql_rollback(optim->trx);
2371 	}
2372 
2373 	return(error);
2374 }
2375 
2376 /*********************************************************************//**
2377 Reset the start time to 0 so that a new optimize can be started.
2378 @return DB_SUCCESS if all OK */
2379 static MY_ATTRIBUTE((nonnull, warn_unused_result))
2380 dberr_t
fts_optimize_reset_start_time(fts_optimize_t * optim)2381 fts_optimize_reset_start_time(
2382 /*==========================*/
2383 	fts_optimize_t*	optim)	/*!< in: optimize instance */
2384 {
2385 	dberr_t		error = DB_SUCCESS;
2386 #ifdef FTS_OPTIMIZE_DEBUG
2387 	fts_t*		fts = optim->table->fts;
2388 
2389 	/* Optimization should have been completed for all indexes. */
2390 	ut_a(optim->n_completed == ib_vector_size(fts->indexes));
2391 
2392 	for (uint i = 0; i < ib_vector_size(fts->indexes); ++i) {
2393 		dict_index_t*	index;
2394 
2395 		ib_time_t	start_time = 0;
2396 
2397 		/* Reset the start time to 0 for this index. */
2398 		error = fts_optimize_set_index_start_time(
2399 			optim->trx, index, start_time);
2400 
2401 		index = static_cast<dict_index_t*>(
2402 			ib_vector_getp(fts->indexes, i));
2403 	}
2404 #endif
2405 
2406 	if (error == DB_SUCCESS) {
2407 		fts_sql_commit(optim->trx);
2408 	} else {
2409 		fts_sql_rollback(optim->trx);
2410 	}
2411 
2412 	return(error);
2413 }
2414 
2415 /*********************************************************************//**
2416 Run OPTIMIZE on the given table by a background thread.
2417 @return DB_SUCCESS if all OK */
2418 static MY_ATTRIBUTE((nonnull))
2419 dberr_t
fts_optimize_table_bk(fts_slot_t * slot)2420 fts_optimize_table_bk(
2421 /*==================*/
2422 	fts_slot_t*	slot)	/*!< in: table to optimiza */
2423 {
2424 	dberr_t		error;
2425 	dict_table_t*	table = slot->table;
2426 	fts_t*		fts = table->fts;
2427 
2428 	/* Avoid optimizing tables that were optimized recently. */
2429 	if (slot->last_run > 0
2430 	    && (ut_time() - slot->last_run) < slot->interval_time) {
2431 
2432 		return(DB_SUCCESS);
2433 
2434 	} else if (fts && fts->cache
2435 		   && fts->cache->deleted >= FTS_OPTIMIZE_THRESHOLD) {
2436 
2437 		error = fts_optimize_table(table);
2438 
2439 		if (error == DB_SUCCESS) {
2440 			slot->state = FTS_STATE_DONE;
2441 			slot->last_run = 0;
2442 			slot->completed = ut_time();
2443 		}
2444 	} else {
2445 		error = DB_SUCCESS;
2446 	}
2447 
2448 	/* Note time this run completed. */
2449 	slot->last_run = ut_time();
2450 
2451 	return(error);
2452 }
2453 /*********************************************************************//**
2454 Run OPTIMIZE on the given table.
2455 @return DB_SUCCESS if all OK */
2456 UNIV_INTERN
2457 dberr_t
fts_optimize_table(dict_table_t * table)2458 fts_optimize_table(
2459 /*===============*/
2460 	dict_table_t*	table)	/*!< in: table to optimiza */
2461 {
2462 	dberr_t		error = DB_SUCCESS;
2463 	fts_optimize_t*	optim = NULL;
2464 	fts_t*		fts = table->fts;
2465 
2466 	ut_print_timestamp(stderr);
2467 	fprintf(stderr, " InnoDB: FTS start optimize %s\n", table->name);
2468 
2469 	optim = fts_optimize_create(table);
2470 
2471 	// FIXME: Call this only at the start of optimize, currently we
2472 	// rely on DB_DUPLICATE_KEY to handle corrupting the snapshot.
2473 
2474 	/* Check whether there are still records in BEING_DELETED table */
2475 	if (fts_optimize_being_deleted_count(optim) == 0) {
2476 		/* Take a snapshot of the deleted document ids, they are copied
2477 		to the BEING_ tables. */
2478 		error = fts_optimize_create_deleted_doc_id_snapshot(optim);
2479 	}
2480 
2481 	/* A duplicate error is OK, since we don't erase the
2482 	doc ids from the being deleted state until all FTS
2483 	indexes have been optimized. */
2484 	if (error == DB_DUPLICATE_KEY) {
2485 		error = DB_SUCCESS;
2486 	}
2487 
2488 	if (error == DB_SUCCESS) {
2489 
2490 		/* These document ids will be filtered out during the
2491 		index optimization phase. They are in the snapshot that we
2492 		took above, at the start of the optimize. */
2493 		error = fts_optimize_read_deleted_doc_id_snapshot(optim);
2494 
2495 		if (error == DB_SUCCESS) {
2496 
2497 			/* Commit the read of being deleted
2498 			doc ids transaction. */
2499 			fts_sql_commit(optim->trx);
2500 
2501 			/* We would do optimization only if there
2502 			are deleted records to be cleaned up */
2503 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2504 				error = fts_optimize_indexes(optim);
2505 			}
2506 
2507 		} else {
2508 			ut_a(optim->to_delete == NULL);
2509 		}
2510 
2511 		/* Only after all indexes have been optimized can we
2512 		delete the (snapshot) doc ids in the pending delete,
2513 		and master deleted tables. */
2514 		if (error == DB_SUCCESS
2515 		    && optim->n_completed == ib_vector_size(fts->indexes)) {
2516 
2517 			if (fts_enable_diag_print) {
2518 				fprintf(stderr, "FTS_OPTIMIZE: Completed "
2519 						"Optimize, cleanup DELETED "
2520 						"table\n");
2521 			}
2522 
2523 			if (ib_vector_size(optim->to_delete->doc_ids) > 0) {
2524 
2525 				/* Purge the doc ids that were in the
2526 				snapshot from the snapshot tables and
2527 				the master deleted table. */
2528 				error = fts_optimize_purge_snapshot(optim);
2529 			}
2530 
2531 			if (error == DB_SUCCESS) {
2532 				/* Reset the start time of all the FTS indexes
2533 				so that optimize can be restarted. */
2534 				error = fts_optimize_reset_start_time(optim);
2535 			}
2536 		}
2537 	}
2538 
2539 	fts_optimize_free(optim);
2540 
2541 	ut_print_timestamp(stderr);
2542 	fprintf(stderr, " InnoDB: FTS end optimize %s\n", table->name);
2543 
2544 	return(error);
2545 }
2546 
2547 /********************************************************************//**
2548 Add the table to add to the OPTIMIZER's list.
2549 @return new message instance */
2550 static
2551 fts_msg_t*
fts_optimize_create_msg(fts_msg_type_t type,void * ptr)2552 fts_optimize_create_msg(
2553 /*====================*/
2554 	fts_msg_type_t	type,		/*!< in: type of message */
2555 	void*		ptr)		/*!< in: message payload */
2556 {
2557 	mem_heap_t*	heap;
2558 	fts_msg_t*	msg;
2559 
2560 	heap = mem_heap_create(sizeof(*msg) + sizeof(ib_list_node_t) + 16);
2561 	msg = static_cast<fts_msg_t*>(mem_heap_alloc(heap, sizeof(*msg)));
2562 
2563 	msg->ptr = ptr;
2564 	msg->type = type;
2565 	msg->heap = heap;
2566 
2567 	return(msg);
2568 }
2569 
2570 /**********************************************************************//**
2571 Add the table to add to the OPTIMIZER's list. */
2572 UNIV_INTERN
2573 void
fts_optimize_add_table(dict_table_t * table)2574 fts_optimize_add_table(
2575 /*===================*/
2576 	dict_table_t*	table)			/*!< in: table to add */
2577 {
2578 	fts_msg_t*	msg;
2579 
2580 	if (!fts_optimize_wq) {
2581 		return;
2582 	}
2583 
2584 	/* Make sure table with FTS index cannot be evicted */
2585 	if (table->can_be_evicted) {
2586 		dict_table_move_from_lru_to_non_lru(table);
2587 	}
2588 
2589 	msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
2590 
2591 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2592 }
2593 
2594 /**********************************************************************//**
2595 Optimize a table. */
2596 UNIV_INTERN
2597 void
fts_optimize_do_table(dict_table_t * table)2598 fts_optimize_do_table(
2599 /*==================*/
2600 	dict_table_t*	table)			/*!< in: table to optimize */
2601 {
2602 	fts_msg_t*	msg;
2603 
2604 	/* Optimizer thread could be shutdown */
2605 	if (!fts_optimize_wq) {
2606 		return;
2607 	}
2608 
2609 	msg = fts_optimize_create_msg(FTS_MSG_OPTIMIZE_TABLE, table);
2610 
2611 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2612 }
2613 
2614 /**********************************************************************//**
2615 Remove the table from the OPTIMIZER's list. We do wait for
2616 acknowledgement from the consumer of the message. */
2617 UNIV_INTERN
2618 void
fts_optimize_remove_table(dict_table_t * table)2619 fts_optimize_remove_table(
2620 /*======================*/
2621 	dict_table_t*	table)			/*!< in: table to remove */
2622 {
2623 	fts_msg_t*	msg;
2624 	os_event_t	event;
2625 	fts_msg_del_t*	remove;
2626 
2627 	/* if the optimize system not yet initialized, return */
2628 	if (!fts_optimize_wq) {
2629 		return;
2630 	}
2631 
2632 	/* FTS optimizer thread is already exited */
2633 	if (fts_opt_start_shutdown) {
2634 		ib_logf(IB_LOG_LEVEL_INFO,
2635 			"Try to remove table %s after FTS optimize"
2636 			" thread exiting.", table->name);
2637 		return;
2638 	}
2639 
2640 	msg = fts_optimize_create_msg(FTS_MSG_DEL_TABLE, NULL);
2641 
2642 	/* We will wait on this event until signalled by the consumer. */
2643 	event = os_event_create();
2644 
2645 	remove = static_cast<fts_msg_del_t*>(
2646 		mem_heap_alloc(msg->heap, sizeof(*remove)));
2647 
2648 	remove->table = table;
2649 	remove->event = event;
2650 	msg->ptr = remove;
2651 
2652 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2653 
2654 	os_event_wait(event);
2655 
2656 	os_event_free(event);
2657 }
2658 
2659 /** Send sync fts cache for the table.
2660 @param[in]	table	table to sync */
2661 UNIV_INTERN
2662 void
fts_optimize_request_sync_table(dict_table_t * table)2663 fts_optimize_request_sync_table(
2664 	dict_table_t*	table)
2665 {
2666 	fts_msg_t*	msg;
2667 	table_id_t*	table_id;
2668 
2669 	/* if the optimize system not yet initialized, return */
2670 	if (!fts_optimize_wq) {
2671 		return;
2672 	}
2673 
2674 	/* FTS optimizer thread is already exited */
2675 	if (fts_opt_start_shutdown) {
2676 		ib_logf(IB_LOG_LEVEL_INFO,
2677 			"Try to sync table %s after FTS optimize"
2678 			" thread exiting.", table->name);
2679 		return;
2680 	}
2681 
2682 	msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
2683 
2684 	table_id = static_cast<table_id_t*>(
2685 		mem_heap_alloc(msg->heap, sizeof(table_id_t)));
2686 	*table_id = table->id;
2687 	msg->ptr = table_id;
2688 
2689 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
2690 }
2691 
2692 /**********************************************************************//**
2693 Find the slot for a particular table.
2694 @return slot if found else NULL. */
2695 static
2696 fts_slot_t*
fts_optimize_find_slot(ib_vector_t * tables,const dict_table_t * table)2697 fts_optimize_find_slot(
2698 /*===================*/
2699 	ib_vector_t*		tables,		/*!< in: vector of tables */
2700 	const dict_table_t*	table)		/*!< in: table to add */
2701 {
2702 	ulint		i;
2703 
2704 	for (i = 0; i < ib_vector_size(tables); ++i) {
2705 		fts_slot_t*	slot;
2706 
2707 		slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2708 
2709 		if (slot->table->id == table->id) {
2710 			return(slot);
2711 		}
2712 	}
2713 
2714 	return(NULL);
2715 }
2716 
2717 /**********************************************************************//**
2718 Start optimizing table. */
2719 static
2720 void
fts_optimize_start_table(ib_vector_t * tables,dict_table_t * table)2721 fts_optimize_start_table(
2722 /*=====================*/
2723 	ib_vector_t*		tables,		/*!< in/out: vector of tables */
2724 	dict_table_t*		table)		/*!< in: table to optimize */
2725 {
2726 	fts_slot_t*	slot;
2727 
2728 	slot = fts_optimize_find_slot(tables, table);
2729 
2730 	if (slot == NULL) {
2731 		ut_print_timestamp(stderr);
2732 		fprintf(stderr, " InnoDB: Error: table %s not registered "
2733 			"with the optimize thread.\n", table->name);
2734 	} else {
2735 		slot->last_run = 0;
2736 		slot->completed = 0;
2737 	}
2738 }
2739 
2740 /**********************************************************************//**
2741 Add the table to the vector if it doesn't already exist. */
2742 static
2743 ibool
fts_optimize_new_table(ib_vector_t * tables,dict_table_t * table)2744 fts_optimize_new_table(
2745 /*===================*/
2746 	ib_vector_t*	tables,			/*!< in/out: vector of tables */
2747 	dict_table_t*	table)			/*!< in: table to add */
2748 {
2749 	ulint		i;
2750 	fts_slot_t*	slot;
2751 	ulint		empty_slot = ULINT_UNDEFINED;
2752 
2753 	/* Search for duplicates, also find a free slot if one exists. */
2754 	for (i = 0; i < ib_vector_size(tables); ++i) {
2755 
2756 		slot = static_cast<fts_slot_t*>(
2757 			ib_vector_get(tables, i));
2758 
2759 		if (slot->state == FTS_STATE_EMPTY) {
2760 			empty_slot = i;
2761 		} else if (slot->table->id == table->id) {
2762 			/* Already exists in our optimize queue. */
2763 			ut_ad(slot->table_id = table->id);
2764 			return(FALSE);
2765 		}
2766 	}
2767 
2768 	/* Reuse old slot. */
2769 	if (empty_slot != ULINT_UNDEFINED) {
2770 
2771 		slot = static_cast<fts_slot_t*>(
2772 			ib_vector_get(tables, empty_slot));
2773 
2774 		ut_a(slot->state == FTS_STATE_EMPTY);
2775 
2776 	} else { /* Create a new slot. */
2777 
2778 		slot = static_cast<fts_slot_t*>(ib_vector_push(tables, NULL));
2779 	}
2780 
2781 	memset(slot, 0x0, sizeof(*slot));
2782 
2783 	slot->table = table;
2784 	slot->table_id = table->id;
2785 	slot->state = FTS_STATE_LOADED;
2786 	slot->interval_time = FTS_OPTIMIZE_INTERVAL_IN_SECS;
2787 
2788 	return(TRUE);
2789 }
2790 
2791 /**********************************************************************//**
2792 Remove the table from the vector if it exists. */
2793 static
2794 ibool
fts_optimize_del_table(ib_vector_t * tables,fts_msg_del_t * msg)2795 fts_optimize_del_table(
2796 /*===================*/
2797 	ib_vector_t*	tables,			/*!< in/out: vector of tables */
2798 	fts_msg_del_t*	msg)			/*!< in: table to delete */
2799 {
2800 	ulint		i;
2801 	dict_table_t*	table = msg->table;
2802 
2803 	for (i = 0; i < ib_vector_size(tables); ++i) {
2804 		fts_slot_t*	slot;
2805 
2806 		slot = static_cast<fts_slot_t*>(ib_vector_get(tables, i));
2807 
2808 		/* FIXME: Should we assert on this ? */
2809 		if (slot->state != FTS_STATE_EMPTY
2810 		    && slot->table->id == table->id) {
2811 
2812 			ut_print_timestamp(stderr);
2813 			fprintf(stderr, " InnoDB: FTS Optimize Removing "
2814 				"table %s\n", table->name);
2815 
2816 			slot->table = NULL;
2817 			slot->state = FTS_STATE_EMPTY;
2818 
2819 			return(TRUE);
2820 		}
2821 	}
2822 
2823 	return(FALSE);
2824 }
2825 
2826 /**********************************************************************//**
2827 Calculate how many of the registered tables need to be optimized.
2828 @return no. of tables to optimize */
2829 static
2830 ulint
fts_optimize_how_many(const ib_vector_t * tables)2831 fts_optimize_how_many(
2832 /*==================*/
2833 	const ib_vector_t*	tables)		/*!< in: registered tables
2834 						vector*/
2835 {
2836 	ulint		i;
2837 	ib_time_t	delta;
2838 	ulint		n_tables = 0;
2839 	ib_time_t	current_time;
2840 
2841 	current_time = ut_time();
2842 
2843 	for (i = 0; i < ib_vector_size(tables); ++i) {
2844 		const fts_slot_t*	slot;
2845 
2846 		slot = static_cast<const fts_slot_t*>(
2847 			ib_vector_get_const(tables, i));
2848 
2849 		switch (slot->state) {
2850 		case FTS_STATE_DONE:
2851 		case FTS_STATE_LOADED:
2852 			ut_a(slot->completed <= current_time);
2853 
2854 			delta = current_time - slot->completed;
2855 
2856 			/* Skip slots that have been optimized recently. */
2857 			if (delta >= slot->interval_time) {
2858 				++n_tables;
2859 			}
2860 			break;
2861 
2862 		case FTS_STATE_RUNNING:
2863 			ut_a(slot->last_run <= current_time);
2864 
2865 			delta = current_time - slot->last_run;
2866 
2867 			if (delta > slot->interval_time) {
2868 				++n_tables;
2869 			}
2870 			break;
2871 
2872 			/* Slots in a state other than the above
2873 			are ignored. */
2874 		case FTS_STATE_EMPTY:
2875 		case FTS_STATE_SUSPENDED:
2876 			break;
2877 		}
2878 
2879 	}
2880 
2881 	return(n_tables);
2882 }
2883 
2884 /**********************************************************************//**
2885 Check if the total memory used by all FTS table exceeds the maximum limit.
2886 @return true if a sync is needed, false otherwise */
2887 static
2888 bool
fts_is_sync_needed(const ib_vector_t * tables)2889 fts_is_sync_needed(
2890 /*===============*/
2891 	const ib_vector_t*	tables)		/*!< in: registered tables
2892 						vector*/
2893 {
2894 	ulint		total_memory = 0;
2895 	double		time_diff = difftime(ut_time(), last_check_sync_time);
2896 
2897 	if (fts_need_sync || time_diff < 5) {
2898 		return(false);
2899 	}
2900 
2901 	last_check_sync_time = ut_time();
2902 
2903 	for (ulint i = 0; i < ib_vector_size(tables); ++i) {
2904 		const fts_slot_t*	slot;
2905 
2906 		slot = static_cast<const fts_slot_t*>(
2907 			ib_vector_get_const(tables, i));
2908 
2909 		if (slot->state != FTS_STATE_EMPTY && slot->table
2910 		    && slot->table->fts) {
2911 			total_memory += slot->table->fts->cache->total_size;
2912 		}
2913 
2914 		if (total_memory > fts_max_total_cache_size) {
2915 			return(true);
2916 		}
2917 	}
2918 
2919 	return(false);
2920 }
2921 
2922 #if 0
2923 /*********************************************************************//**
2924 Check whether a table needs to be optimized. */
2925 static
2926 void
2927 fts_optimize_need_sync(
2928 /*===================*/
2929 	ib_vector_t*	tables)	/*!< in: list of tables */
2930 {
2931 	dict_table_t*	table = NULL;
2932 	fts_slot_t*	slot;
2933 	ulint		num_table = ib_vector_size(tables);
2934 
2935 	if (!num_table) {
2936 		return;
2937 	}
2938 
2939 	if (fts_optimize_sync_iterator >= num_table) {
2940 		fts_optimize_sync_iterator = 0;
2941 	}
2942 
2943 	slot = ib_vector_get(tables, fts_optimize_sync_iterator);
2944 	table = slot->table;
2945 
2946 	if (!table) {
2947 		return;
2948 	}
2949 
2950 	ut_ad(table->fts);
2951 
2952 	if (table->fts->cache) {
2953 		ulint	deleted = table->fts->cache->deleted;
2954 
2955 		if (table->fts->cache->added
2956 		    >= fts_optimize_add_threshold) {
2957 			fts_sync_table(table);
2958 		} else if (deleted >= fts_optimize_delete_threshold) {
2959 			fts_optimize_do_table(table);
2960 
2961 			mutex_enter(&table->fts->cache->deleted_lock);
2962 			table->fts->cache->deleted -= deleted;
2963 			mutex_exit(&table->fts->cache->deleted_lock);
2964 		}
2965 	}
2966 
2967 	fts_optimize_sync_iterator++;
2968 
2969 	return;
2970 }
2971 #endif
2972 
2973 /** Sync fts cache of a table
2974 @param[in]	table_id	table id */
2975 void
fts_optimize_sync_table(table_id_t table_id)2976 fts_optimize_sync_table(
2977 	table_id_t	table_id)
2978 {
2979 	dict_table_t*   table = NULL;
2980 
2981 	table = dict_table_open_on_id(table_id, FALSE, DICT_TABLE_OP_NORMAL);
2982 
2983 	if (table) {
2984 		if (dict_table_has_fts_index(table) && table->fts->cache) {
2985 			fts_sync_table(table, true, false, true);
2986 		}
2987 
2988 		dict_table_close(table, FALSE, FALSE);
2989 	}
2990 }
2991 
2992 /**********************************************************************//**
2993 Optimize all FTS tables.
2994 @return Dummy return */
2995 UNIV_INTERN
2996 os_thread_ret_t
fts_optimize_thread(void * arg)2997 fts_optimize_thread(
2998 /*================*/
2999 	void*		arg)			/*!< in: work queue*/
3000 {
3001 	mem_heap_t*	heap;
3002 	ib_vector_t*	tables;
3003 	ib_alloc_t*	heap_alloc;
3004 	ulint		current = 0;
3005 	ibool		done = FALSE;
3006 	ulint		n_tables = 0;
3007 	os_event_t	exit_event = 0;
3008 	ulint		n_optimize = 0;
3009 	ib_wqueue_t*	wq = (ib_wqueue_t*) arg;
3010 
3011 	ut_ad(!srv_read_only_mode);
3012 	my_thread_init();
3013 
3014 	heap = mem_heap_create(sizeof(dict_table_t*) * 64);
3015 	heap_alloc = ib_heap_allocator_create(heap);
3016 
3017 	tables = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
3018 
3019 	while(!done && srv_shutdown_state == SRV_SHUTDOWN_NONE) {
3020 
3021 		/* If there is no message in the queue and we have tables
3022 		to optimize then optimize the tables. */
3023 
3024 		if (!done
3025 		    && ib_wqueue_is_empty(wq)
3026 		    && n_tables > 0
3027 		    && n_optimize > 0) {
3028 
3029 			fts_slot_t*	slot;
3030 
3031 			ut_a(ib_vector_size(tables) > 0);
3032 
3033 			slot = static_cast<fts_slot_t*>(
3034 				ib_vector_get(tables, current));
3035 
3036 			/* Handle the case of empty slots. */
3037 			if (slot->state != FTS_STATE_EMPTY) {
3038 
3039 				slot->state = FTS_STATE_RUNNING;
3040 
3041 				fts_optimize_table_bk(slot);
3042 			}
3043 
3044 			++current;
3045 
3046 			/* Wrap around the counter. */
3047 			if (current >= ib_vector_size(tables)) {
3048 				n_optimize = fts_optimize_how_many(tables);
3049 
3050 				current = 0;
3051 			}
3052 
3053 		} else if (n_optimize == 0 || !ib_wqueue_is_empty(wq)) {
3054 			fts_msg_t*	msg;
3055 
3056 			msg = static_cast<fts_msg_t*>(
3057 				ib_wqueue_timedwait(wq,
3058 						    FTS_QUEUE_WAIT_IN_USECS));
3059 
3060 			/* Timeout ? */
3061 			if (msg == NULL) {
3062 				if (fts_is_sync_needed(tables)) {
3063 					fts_need_sync = true;
3064 				}
3065 
3066 				continue;
3067 			}
3068 
3069 			switch (msg->type) {
3070 			case FTS_MSG_START:
3071 				break;
3072 
3073 			case FTS_MSG_PAUSE:
3074 				break;
3075 
3076 			case FTS_MSG_STOP:
3077 				done = TRUE;
3078 				exit_event = (os_event_t) msg->ptr;
3079 				break;
3080 
3081 			case FTS_MSG_ADD_TABLE:
3082 				ut_a(!done);
3083 				if (fts_optimize_new_table(
3084 					tables,
3085 					static_cast<dict_table_t*>(
3086 					msg->ptr))) {
3087 					++n_tables;
3088 				}
3089 				break;
3090 
3091 			case FTS_MSG_OPTIMIZE_TABLE:
3092 				if (!done) {
3093 					fts_optimize_start_table(
3094 						tables,
3095 						static_cast<dict_table_t*>(
3096 						msg->ptr));
3097 				}
3098 				break;
3099 
3100 			case FTS_MSG_DEL_TABLE:
3101 				if (fts_optimize_del_table(
3102 					tables, static_cast<fts_msg_del_t*>(
3103 						msg->ptr))) {
3104 					--n_tables;
3105 				}
3106 
3107 				/* Signal the producer that we have
3108 				removed the table. */
3109 				os_event_set(
3110 					((fts_msg_del_t*) msg->ptr)->event);
3111 				break;
3112 
3113 			case FTS_MSG_SYNC_TABLE:
3114 				fts_optimize_sync_table(
3115 					*static_cast<table_id_t*>(msg->ptr));
3116 				break;
3117 
3118 			default:
3119 				ut_error;
3120 			}
3121 
3122 			mem_heap_free(msg->heap);
3123 
3124 			if (!done) {
3125 				n_optimize = fts_optimize_how_many(tables);
3126 			} else {
3127 				n_optimize = 0;
3128 			}
3129 		}
3130 	}
3131 
3132 	/* Server is being shutdown, sync the data from FTS cache to disk
3133 	if needed */
3134 	if (n_tables > 0) {
3135 		ulint	i;
3136 
3137 		for (i = 0; i < ib_vector_size(tables); i++) {
3138 			fts_slot_t*	slot;
3139 
3140 			slot = static_cast<fts_slot_t*>(
3141 				ib_vector_get(tables, i));
3142 
3143 			if (slot->state != FTS_STATE_EMPTY) {
3144 				fts_optimize_sync_table(slot->table_id);
3145 			}
3146 		}
3147 	}
3148 
3149 	ib_vector_free(tables);
3150 
3151 	ib_logf(IB_LOG_LEVEL_INFO, "FTS optimize thread exiting.");
3152 
3153 	os_event_set(exit_event);
3154 	my_thread_end();
3155 
3156 	/* We count the number of threads in os_thread_exit(). A created
3157 	thread should always use that to exit and not use return() to exit. */
3158 	os_thread_exit(NULL);
3159 
3160 	OS_THREAD_DUMMY_RETURN;
3161 }
3162 
3163 /**********************************************************************//**
3164 Startup the optimize thread and create the work queue. */
3165 UNIV_INTERN
3166 void
fts_optimize_init(void)3167 fts_optimize_init(void)
3168 /*===================*/
3169 {
3170 	ut_ad(!srv_read_only_mode);
3171 
3172 	/* For now we only support one optimize thread. */
3173 	ut_a(fts_optimize_wq == NULL);
3174 
3175 	fts_optimize_wq = ib_wqueue_create();
3176 	ut_a(fts_optimize_wq != NULL);
3177 	last_check_sync_time = ut_time();
3178 
3179 	os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
3180 }
3181 
3182 /**********************************************************************//**
3183 Check whether the work queue is initialized.
3184 @return TRUE if optimze queue is initialized. */
3185 UNIV_INTERN
3186 ibool
fts_optimize_is_init(void)3187 fts_optimize_is_init(void)
3188 /*======================*/
3189 {
3190 	return(fts_optimize_wq != NULL);
3191 }
3192 
3193 /**********************************************************************//**
3194 Signal the optimize thread to prepare for shutdown. */
3195 UNIV_INTERN
3196 void
fts_optimize_start_shutdown(void)3197 fts_optimize_start_shutdown(void)
3198 /*=============================*/
3199 {
3200 	ut_ad(!srv_read_only_mode);
3201 
3202 	fts_msg_t*	msg;
3203 	os_event_t	event;
3204 
3205 	/* If there is an ongoing activity on dictionary, such as
3206 	srv_master_evict_from_table_cache(), wait for it */
3207 	dict_mutex_enter_for_mysql();
3208 
3209 	/* Tells FTS optimizer system that we are exiting from
3210 	optimizer thread, message send their after will not be
3211 	processed */
3212 	fts_opt_start_shutdown = true;
3213 	dict_mutex_exit_for_mysql();
3214 
3215 	/* We tell the OPTIMIZE thread to switch to state done, we
3216 	can't delete the work queue here because the add thread needs
3217 	deregister the FTS tables. */
3218 	event = os_event_create();
3219 
3220 	msg = fts_optimize_create_msg(FTS_MSG_STOP, NULL);
3221 	msg->ptr = event;
3222 
3223 	ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
3224 
3225 	os_event_wait(event);
3226 	os_event_free(event);
3227 
3228 	ib_wqueue_free(fts_optimize_wq);
3229 
3230 }
3231 
3232 /**********************************************************************//**
3233 Reset the work queue. */
3234 UNIV_INTERN
3235 void
fts_optimize_end(void)3236 fts_optimize_end(void)
3237 /*==================*/
3238 {
3239 	ut_ad(!srv_read_only_mode);
3240 
3241 	// FIXME: Potential race condition here: We should wait for
3242 	// the optimize thread to confirm shutdown.
3243 	fts_optimize_wq = NULL;
3244 }
3245