/*****************************************************************************

Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2015, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file row/row0ftsort.cc
Create Full Text Index with (parallel) merge sort

Created 10/13/2010 Jimmy Yang
*******************************************************/

#include "row0ftsort.h"
#include "dict0dict.h"
#include "row0merge.h"
#include "row0row.h"
#include "btr0cur.h"
#include "fts0plugin.h"
#include "log0crypt.h"

/** Read the next record to buffer N.
@param N index into the array of merge info structures */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N],		\
			crypt_block[N], space);				\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)
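
/* For illustration only (a sketch, not part of the build): with N == 0
the macro above expands to

	b[0] = row_merge_read_rec(block[0], buf[0], b[0], index,
				  fd[0], &foffs[0], &mrec[0], offsets[0],
				  crypt_block[0], space);
	if (UNIV_UNLIKELY(!b[0])) {
		if (mrec[0]) {
			goto exit;
		}
	}

i.e. b[0] == NULL with mrec[0] still set signals a read problem and jumps
to the caller's "exit" label, while b[0] == NULL with mrec[0] == NULL
means that input stream is exhausted. */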

/** Parallel sort degree */
ulong	fts_sort_pll_degree	= 2;

/*********************************************************************//**
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":

1) Tokenized word,
2) Doc ID (depending on the number of records to sort, it can be a 4-byte
or 8-byte integer value)
3) Word's position in the original doc.

@see fts_create_one_index_table()

@return dict_index_t structure for the fts sort index */
dict_index_t*
row_merge_create_fts_sort_index(
/*============================*/
	dict_index_t*	index,	/*!< in: Original FTS index
				based on which this sort index
				is created */
	dict_table_t*	table,	/*!< in,out: table that FTS index
				is being created on */
	ibool*		opt_doc_id_size)
				/*!< out: whether to use 4 bytes
				instead of 8 bytes integer to
				store Doc ID during sort */
{
	dict_index_t*   new_index;
	dict_field_t*   field;
	dict_field_t*   idx_field;
	CHARSET_INFO*	charset;

	// FIXME: This name shouldn't be hard coded here.
	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);

	new_index->id = index->id;
	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
	new_index->n_def = FTS_NUM_FIELDS_SORT;
	new_index->cached = TRUE;
	new_index->parser = index->parser;

	idx_field = dict_index_get_nth_field(index, 0);
	charset = fts_index_get_charset(index);

	/* The first field is the tokenized word */
	field = dict_index_get_nth_field(new_index, 0);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
	field->col->mtype = charset == &my_charset_latin1
		? DATA_VARCHAR : DATA_VARMYSQL;
	field->col->mbminlen = idx_field->col->mbminlen;
	field->col->mbmaxlen = idx_field->col->mbmaxlen;
	field->col->len = static_cast<uint16_t>(
		HA_FT_MAXCHARLEN * field->col->mbmaxlen);

	field->fixed_len = 0;

	/* Doc ID */
	field = dict_index_get_nth_field(new_index, 1);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	*opt_doc_id_size = FALSE;

	/* Check whether we can use a 4-byte instead of an 8-byte integer
	field to hold the Doc ID, thus reducing the overall sort size */
	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
		/* If the Doc ID column is being added by this CREATE
		INDEX, then just check the number of rows in the table */
		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	} else {
		doc_id_t	max_doc_id;

		/* If the Doc ID column is supplied by the user, then
		check the maximum Doc ID in the table */
		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);

		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	}

	if (*opt_doc_id_size) {
		field->col->len = sizeof(ib_uint32_t);
		field->fixed_len = sizeof(ib_uint32_t);
	} else {
		field->col->len = FTS_DOC_ID_LEN;
		field->fixed_len = FTS_DOC_ID_LEN;
	}

	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;

	/* The third field is the word's position in the original doc */
	field = dict_index_get_nth_field(new_index, 2);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	field->col->len = 4;
	field->fixed_len = 4;
	field->col->prtype = DATA_NOT_NULL;

	return(new_index);
}
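
/* A sketch (for illustration only) of one sort tuple built for this
index, assuming an 8-byte Doc ID: the word "apple" appearing at byte
position 17 of document 5 is stored as

	field 0: "apple"                  (variable length, at most
	                                   HA_FT_MAXCHARLEN * mbmaxlen bytes)
	field 1: 00 00 00 00 00 00 00 05  (Doc ID, big-endian)
	field 2: 00 00 00 11              (position 17, big-endian)

With *opt_doc_id_size == TRUE, field 1 shrinks to the 4 bytes
00 00 00 05. */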

/** Initialize FTS parallel sort structures.
@param[in]	trx		transaction
@param[in,out]	dup		descriptor of FTS index being created
@param[in,out]	new_table	table where indexes are created
@param[in]	opt_doc_id_size	whether to use 4 bytes instead of 8 bytes
				integer to store Doc ID during sort
@param[in]	old_zip_size	page size of the old table during alter
@param[out]	psort		parallel sort info to be instantiated
@param[out]	merge		parallel merge info to be instantiated
@return true if all successful */
bool
row_fts_psort_info_init(
	trx_t*		trx,
	row_merge_dup_t*dup,
	dict_table_t*	new_table,
	bool		opt_doc_id_size,
	ulint		old_zip_size,
	fts_psort_t**	psort,
	fts_psort_t**	merge)
{
	ulint			i;
	ulint			j;
	fts_psort_common_t*	common_info = NULL;
	fts_psort_t*		psort_info = NULL;
	fts_psort_t*		merge_info = NULL;
	ulint			block_size;
	ibool			ret = TRUE;
	bool			encrypted = false;
	ut_ad(ut_is_2pow(old_zip_size));

	block_size = 3 * srv_sort_buf_size;

	*psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey(
		 fts_sort_pll_degree * sizeof *psort_info));

	if (!psort_info) {
		ut_free(dup);
		return(FALSE);
	}

	/* Common info for all sort threads */
	common_info = static_cast<fts_psort_common_t*>(
		ut_malloc_nokey(sizeof *common_info));

	if (!common_info) {
		ut_free(dup);
		ut_free(psort_info);
		return(FALSE);
	}

	common_info->dup = dup;
	common_info->new_table = new_table;
	common_info->old_zip_size = old_zip_size;
	common_info->trx = trx;
	common_info->all_info = psort_info;
	common_info->sort_event = os_event_create(0);
	common_info->opt_doc_id_size = opt_doc_id_size;

	if (log_tmp_is_encrypted()) {
		encrypted = true;
	}

	ut_ad(trx->mysql_thd != NULL);
	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
	/* There will be FTS_NUM_AUX_INDEX "sort buckets" for
	each parallel sort thread. Each "sort bucket" holds records for
	a particular "FTS index partition" */
	for (j = 0; j < fts_sort_pll_degree; j++) {

		UT_LIST_INIT(
			psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list);

		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {

			psort_info[j].merge_file[i] =
				 static_cast<merge_file_t*>(
					ut_zalloc_nokey(sizeof(merge_file_t)));

			if (!psort_info[j].merge_file[i]) {
				ret = FALSE;
				goto func_exit;
			}

			psort_info[j].merge_buf[i] = row_merge_buf_create(
				dup->index);

			if (row_merge_file_create(psort_info[j].merge_file[i],
						  path) == OS_FILE_CLOSED) {
				ret = FALSE;
				goto func_exit;
			}

			/* Need to align memory for O_DIRECT write */
			psort_info[j].merge_block[i] =
				static_cast<row_merge_block_t*>(
					aligned_malloc(block_size, 1024));

			if (!psort_info[j].merge_block[i]) {
				ret = FALSE;
				goto func_exit;
			}

			/* If the tablespace is encrypted, allocate an
			additional buffer for encryption/decryption. */
			if (encrypted) {
				/* Need to align memory for O_DIRECT write */
				psort_info[j].crypt_block[i] =
					static_cast<row_merge_block_t*>(
						aligned_malloc(block_size,
							       1024));

				if (!psort_info[j].crypt_block[i]) {
					ret = FALSE;
					goto func_exit;
				}
			} else {
				psort_info[j].crypt_block[i] = NULL;
			}
		}

		psort_info[j].child_status = 0;
		psort_info[j].state = 0;
		psort_info[j].psort_common = common_info;
		psort_info[j].error = DB_SUCCESS;
		psort_info[j].memory_used = 0;
		mutex_create(LATCH_ID_FTS_PLL_TOKENIZE, &psort_info[j].mutex);
	}

	/* Initialize merge_info structures for the parallel merge and
	insert into the auxiliary FTS tables (FTS_INDEX_TABLE) */
	*merge = merge_info = static_cast<fts_psort_t*>(
		ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info));

	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {

		merge_info[j].child_status = 0;
		merge_info[j].state = 0;
		merge_info[j].psort_common = common_info;
	}

func_exit:
	if (!ret) {
		row_fts_psort_info_destroy(psort_info, merge_info);
	}

	return(ret);
}
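
/* Rough memory footprint of the structures above (an illustrative
calculation, not authoritative): each of the fts_sort_pll_degree sort
threads allocates FTS_NUM_AUX_INDEX merge blocks of block_size
= 3 * srv_sort_buf_size bytes (plus the same again for crypt blocks when
encryption is enabled). With the defaults fts_sort_pll_degree == 2,
FTS_NUM_AUX_INDEX == 6 and srv_sort_buf_size == 1 MiB, that is

	2 threads * 6 buckets * 3 MiB = 36 MiB

of aligned buffer memory, before counting the in-memory sort buffers. */
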
/*********************************************************************//**
Clean up and deallocate FTS parallel sort structures, and close the
merge sort files */
void
row_fts_psort_info_destroy(
/*=======================*/
	fts_psort_t*	psort_info,	/*!< parallel sort info */
	fts_psort_t*	merge_info)	/*!< parallel merge info */
{
	ulint	i;
	ulint	j;

	if (psort_info) {
		for (j = 0; j < fts_sort_pll_degree; j++) {
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				if (psort_info[j].merge_file[i]) {
					row_merge_file_destroy(
						psort_info[j].merge_file[i]);
				}

				aligned_free(psort_info[j].merge_block[i]);
				ut_free(psort_info[j].merge_file[i]);
				aligned_free(psort_info[j].crypt_block[i]);
			}

			mutex_free(&psort_info[j].mutex);
		}

		os_event_destroy(merge_info[0].psort_common->sort_event);
		ut_free(merge_info[0].psort_common->dup);
		ut_free(merge_info[0].psort_common);
		ut_free(psort_info);
	}

	ut_free(merge_info);
}
/*********************************************************************//**
Free up merge buffers when the merge sort is done */
void
row_fts_free_pll_merge_buf(
/*=======================*/
	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
{
	ulint	j;
	ulint	i;

	if (!psort_info) {
		return;
	}

	for (j = 0; j < fts_sort_pll_degree; j++) {
		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
			row_merge_buf_free(psort_info[j].merge_buf[i]);
		}
	}

	return;
}

/*********************************************************************//**
FTS plugin parser 'mysql_add_word' callback function for row merge.
Refer to 'st_mysql_ftparser_param' for more detail.
@return always returns 0 */
static
int
row_merge_fts_doc_add_word_for_parser(
/*==================================*/
	MYSQL_FTPARSER_PARAM	*param,		/* in: parser parameter */
	const char		*word,		/* in: token word */
	int			word_len,	/* in: word length */
	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
{
	fts_string_t		str;
	fts_tokenize_ctx_t*	t_ctx;
	row_fts_token_t*	fts_token;
	byte*			ptr;

	ut_ad(param);
	ut_ad(param->mysql_ftparam);
	ut_ad(word);
	ut_ad(boolean_info);

	t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam);
	ut_ad(t_ctx);

	str.f_str = (byte*)(word);
	str.f_len = ulint(word_len);
	str.f_n_char = fts_get_token_size(
		(CHARSET_INFO*)param->cs, word, ulint(word_len));

	/* JAN: TODO: MySQL 5.7 FTS
	ut_ad(boolean_info->position >= 0);
	*/

	ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t)
			+ sizeof(fts_string_t) + str.f_len));
	fts_token = reinterpret_cast<row_fts_token_t*>(ptr);
	fts_token->text = reinterpret_cast<fts_string_t*>(
			ptr + sizeof(row_fts_token_t));
	fts_token->text->f_str = static_cast<byte*>(
			ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t));

	fts_token->text->f_len = str.f_len;
	fts_token->text->f_n_char = str.f_n_char;
	memcpy(fts_token->text->f_str, str.f_str, str.f_len);

	/* JAN: TODO: MySQL 5.7 FTS
	fts_token->position = boolean_info->position;
	*/

	/* Add the token to the list */
	UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token);

	return(0);
}
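
/* Layout of the single allocation made above (illustrative):

	ptr --> +---------------------+
	        | row_fts_token_t     |  list node + pointer to the text
	        +---------------------+
	        | fts_string_t        |  f_str points just past itself
	        +---------------------+
	        | str.f_len bytes     |  copy of the token characters
	        +---------------------+

One ut_malloc_nokey() call covers all three parts, so a single
ut_free(fts_token) releases the whole token, as the consumers in
row_merge_fts_doc_tokenize() rely on. */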

/*********************************************************************//**
Tokenize by the fts plugin parser */
static
void
row_merge_fts_doc_tokenize_by_parser(
/*=================================*/
	fts_doc_t*		doc,	/* in: doc to tokenize */
	st_mysql_ftparser*	parser,	/* in: plugin parser instance */
	fts_tokenize_ctx_t*	t_ctx)	/* in/out: tokenize ctx instance */
{
	MYSQL_FTPARSER_PARAM	param;

	ut_a(parser);

	/* Set the parameters for param */
	param.mysql_parse = fts_tokenize_document_internal;
	param.mysql_add_word = row_merge_fts_doc_add_word_for_parser;
	param.mysql_ftparam = t_ctx;
	param.cs = doc->charset;
	param.doc = reinterpret_cast<char*>(doc->text.f_str);
	param.length = static_cast<int>(doc->text.f_len);
	param.mode = MYSQL_FTPARSER_SIMPLE_MODE;

	PARSER_INIT(parser, &param);
	/* We assume parse() returns successfully here. */
	parser->parse(&param);
	PARSER_DEINIT(parser, &param);
}

/*********************************************************************//**
Tokenize incoming text data and add it to the sort buffer.
@see row_merge_buf_encode()
@return	TRUE if the record passed, FALSE if out of space */
static
ibool
row_merge_fts_doc_tokenize(
/*=======================*/
	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
	doc_id_t		doc_id,		/*!< in: Doc ID */
	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
	merge_file_t**		merge_file,	/*!< in/out: merge file */
	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
						instead of 8 bytes integer to
						store Doc ID during sort*/
	fts_tokenize_ctx_t*	t_ctx)          /*!< in/out: tokenize context */
{
	ulint		inc = 0;
	fts_string_t	str;
	ulint		len;
	row_merge_buf_t* buf;
	dfield_t*	field;
	fts_string_t	t_str;
	ibool		buf_full = FALSE;
	byte		str_buf[FTS_MAX_WORD_LEN + 1];
	ulint		data_size[FTS_NUM_AUX_INDEX];
	ulint		n_tuple[FTS_NUM_AUX_INDEX];
	st_mysql_ftparser*	parser;

	t_str.f_n_char = 0;
	t_ctx->buf_used = 0;

	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	parser = sort_buf[0]->index->parser;

	/* Tokenize the data and add each word string, its corresponding
	doc id and position to the sort buffer */
	while (t_ctx->processed_len < doc->text.f_len) {
		ulint		idx = 0;
		ulint		cur_len;
		doc_id_t	write_doc_id;
		row_fts_token_t* fts_token = NULL;

		if (parser != NULL) {
			if (t_ctx->processed_len == 0) {
				UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list);

				/* Parse the whole doc and cache the tokens */
				row_merge_fts_doc_tokenize_by_parser(doc,
					parser, t_ctx);

				/* Just indicate that we have parsed all
				the words */
				t_ctx->processed_len += 1;
			}

			/* Then get a token */
			fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list);
			if (fts_token) {
				str.f_len = fts_token->text->f_len;
				str.f_n_char = fts_token->text->f_n_char;
				str.f_str = fts_token->text->f_str;
			} else {
				ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0);
				/* Reached the end of the list */
				t_ctx->processed_len = doc->text.f_len;
				break;
			}
		} else {
			inc = innobase_mysql_fts_get_token(
				doc->charset,
				doc->text.f_str + t_ctx->processed_len,
				doc->text.f_str + doc->text.f_len, &str);

			ut_a(inc > 0);
		}

		/* Ignore strings whose character count is less than
		"fts_min_token_size" or more than "fts_max_token_size" */
		if (!fts_check_token(&str, NULL, NULL)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		t_str.f_len = innobase_fts_casedn_str(
			doc->charset, (char*) str.f_str, str.f_len,
			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);

		t_str.f_str = (byte*) &str_buf;

		/* If "cached_stopword" is defined, ignore words in the
		stopword list */
		if (!fts_check_token(&str, t_ctx->cached_stopword,
				     doc->charset)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		/* There are FTS_NUM_AUX_INDEX auxiliary tables; find
		out which sort buffer to put this word record in */
		t_ctx->buf_used = fts_select_index(
			doc->charset, t_str.f_str, t_str.f_len);

		buf = sort_buf[t_ctx->buf_used];

		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
		idx = t_ctx->buf_used;

		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];

		field = mtuple->fields = static_cast<dfield_t*>(
			mem_heap_alloc(buf->heap,
				       FTS_NUM_FIELDS_SORT * sizeof *field));

		/* The first field is the tokenized word */
		dfield_set_data(field, t_str.f_str, t_str.f_len);
		len = dfield_get_len(field);

		dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type);
		field->type.prtype |= DATA_NOT_NULL;
		ut_ad(len <= field->type.len);

		/* For the temporary file, row_merge_buf_encode() uses
		1 byte for representing the number of extra_size bytes.
		This number will always be 1, because for this 3-field index
		consisting of one variable-size column, extra_size will always
		be 1 or 2, which can be encoded in one byte.

		The extra_size is 1 byte if the length of the
		variable-length column is less than 128 bytes or the
		maximum length is less than 256 bytes. */

		/* One variable-length column, a word whose length is less
		than fts_max_token_size; add one extra_size byte and one
		extra byte.

		Since the maximum length of an FTS token is now larger
		than 255, the length bytes must also encode their own
		count: lengths of 1 to 128 bytes fit in 1 length byte,
		anything larger needs 2. */
		if (len < 128 || field->type.len < 256) {
			/* Extra size is one byte. */
			cur_len = 2 + len;
		} else {
			/* Extra size is two bytes. */
			cur_len = 3 + len;
		}
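
		/* Worked example (illustrative): for the 5-byte token
		"apple", extra_size is one byte, so cur_len
		= 1 (extra_size count) + 1 (extra_size) + 5 (data)
		= 7 bytes. A 300-byte token in a column whose maximum
		length is >= 256 would need cur_len = 1 + 2 + 300
		= 303 bytes. */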

		dfield_dup(field, buf->heap);
		field++;

		/* The second field is the Doc ID */

		ib_uint32_t	doc_id_32_bit;

		if (!opt_doc_id_size) {
			fts_write_doc_id((byte*) &write_doc_id, doc_id);

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));
		} else {
			mach_write_to_4(
				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);

			dfield_set_data(
				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
		}

		len = field->len;
		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
		field->type.len = static_cast<uint16_t>(field->len);
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;

		cur_len += len;
		dfield_dup(field, buf->heap);

		++field;

		/* The third field is the position.
		MySQL 5.7 changed the fulltext parser plugin interface
		by adding MYSQL_FTPARSER_BOOLEAN_INFO::position.
		Below we assume that the field is always 0. */
		ulint	pos = t_ctx->init_pos;
		byte		position[4];
		if (parser == NULL) {
			pos += t_ctx->processed_len + inc - str.f_len;
		}
		len = 4;
		mach_write_to_4(position, pos);
		dfield_set_data(field, &position, len);

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL;
		field->type.len = 4;
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;
		cur_len += len;
		dfield_dup(field, buf->heap);

		/* Reserve one byte for the end marker of row_merge_block_t */
		if (buf->total_size + data_size[idx] + cur_len
		    >= srv_sort_buf_size - 1) {

			buf_full = TRUE;
			break;
		}

		/* Increment the number of tuples */
		n_tuple[idx]++;
		if (parser != NULL) {
			UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
			ut_free(fts_token);
		} else {
			t_ctx->processed_len += inc;
		}
		data_size[idx] += cur_len;
	}

	/* Update the data length and the number of new word tuples
	added in this round of tokenization */
	for (ulint i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		/* The computation of total_size below assumes that no
		delete-mark flags will be stored and that all fields
		are NOT NULL and fixed-length. */

		sort_buf[i]->total_size += data_size[i];

		sort_buf[i]->n_tuples += n_tuple[i];

		merge_file[i]->n_rec += n_tuple[i];
		t_ctx->rows_added[i] += n_tuple[i];
	}

	if (!buf_full) {
		/* We pad one byte between the text across two fields */
		t_ctx->init_pos += doc->text.f_len + 1;
	}

	return(!buf_full);
}

/*********************************************************************//**
Get the next doc item from fts_doc_list */
UNIV_INLINE
void
row_merge_fts_get_next_doc_item(
/*============================*/
	fts_psort_t*		psort_info,	/*!< in: psort_info */
	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
{
	if (*doc_item != NULL) {
		ut_free(*doc_item);
	}

	mutex_enter(&psort_info->mutex);

	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
	if (*doc_item != NULL) {
		UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item);

		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
		      + (*doc_item)->field->len);
		psort_info->memory_used -= sizeof(fts_doc_item_t)
			+ (*doc_item)->field->len;
	}

	mutex_exit(&psort_info->mutex);
}

/*********************************************************************//**
Performs parallel tokenization of the incoming doc strings.
It also performs the initial in-memory sort of the parsed records.
*/
static
void fts_parallel_tokenization(
/*======================*/
	void*		arg)	/*!< in: psort_info for the thread */
{
	fts_psort_t*		psort_info = (fts_psort_t*) arg;
	ulint			i;
	fts_doc_item_t*		doc_item = NULL;
	row_merge_buf_t**	buf;
	ibool			processed = FALSE;
	merge_file_t**		merge_file;
	row_merge_block_t**	block;
	row_merge_block_t**	crypt_block;
	pfs_os_file_t		tmpfd[FTS_NUM_AUX_INDEX];
	ulint			mycount[FTS_NUM_AUX_INDEX];
	ulint			num_doc_processed = 0;
	doc_id_t		last_doc_id = 0;
	mem_heap_t*		blob_heap = NULL;
	fts_doc_t		doc;
	dict_table_t*		table = psort_info->psort_common->new_table;
	fts_tokenize_ctx_t	t_ctx;
	ulint			retried = 0;
	dberr_t			error = DB_SUCCESS;

	ut_ad(psort_info);
	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);

	const char*		path = thd_innodb_tmpdir(
		psort_info->psort_common->trx->mysql_thd);

	buf = psort_info->merge_buf;
	merge_file = psort_info->merge_file;
	blob_heap = mem_heap_create(512);
	memset(&doc, 0, sizeof(doc));
	memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	doc.charset = fts_index_get_charset(
		psort_info->psort_common->dup->index);

	block = psort_info->merge_block;
	crypt_block = psort_info->crypt_block;

	const ulint zip_size = psort_info->psort_common->old_zip_size;

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
	processed = TRUE;
loop:
	while (doc_item) {
		dfield_t*	dfield = doc_item->field;

		last_doc_id = doc_item->doc_id;

		ut_ad (dfield->data != NULL
		       && dfield_get_len(dfield) != UNIV_SQL_NULL);

		/* If we have finished processing the last item, update
		"doc" with the strings in doc_item; otherwise continue
		processing the item on hand */
		if (processed) {
			byte*		data;
			ulint		data_len;

			dfield = doc_item->field;
			data = static_cast<byte*>(dfield_get_data(dfield));
			data_len = dfield_get_len(dfield);

			if (dfield_is_ext(dfield)) {
				doc.text.f_str =
					btr_copy_externally_stored_field(
						&doc.text.f_len, data,
						zip_size, data_len, blob_heap);
			} else {
				doc.text.f_str = data;
				doc.text.f_len = data_len;
			}

			doc.tokens = 0;
			t_ctx.processed_len = 0;
		} else {
			/* Not yet finished processing the "doc" on hand;
			continue processing it */
			ut_ad(doc.text.f_str);
			ut_ad(t_ctx.processed_len < doc.text.f_len);
		}

		processed = row_merge_fts_doc_tokenize(
			buf, doc_item->doc_id, &doc,
			merge_file, psort_info->psort_common->opt_doc_id_size,
			&t_ctx);

		/* The current sort buffer is full and needs to be recycled */
		if (!processed) {
			ut_ad(t_ctx.processed_len < doc.text.f_len);
			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
			break;
		}

		num_doc_processed++;

		if (UNIV_UNLIKELY(fts_enable_diag_print)
		    && num_doc_processed % 10000 == 1) {
			ib::info() << "Number of documents processed: "
				<< num_doc_processed;
#ifdef FTS_INTERNAL_DIAG_PRINT
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				ib::info() << "ID " << psort_info->psort_id
					<< ", partition " << i << ", word "
					<< mycount[i];
			}
#endif
		}

		mem_heap_empty(blob_heap);

		row_merge_fts_get_next_doc_item(psort_info, &doc_item);

		if (doc_item && last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}
	}

	/* If we have run out of the current sort buffer, we need to sort
	and flush the sort buffer to disk */
	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
		row_merge_buf_write(buf[t_ctx.buf_used],
				    merge_file[t_ctx.buf_used],
				    block[t_ctx.buf_used]);

		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
				     merge_file[t_ctx.buf_used]->offset++,
				     block[t_ctx.buf_used],
				     crypt_block[t_ctx.buf_used],
				     table->space_id)) {
			error = DB_TEMP_FILE_WRITE_FAIL;
			goto func_exit;
		}

		MEM_UNDEFINED(block[t_ctx.buf_used], srv_sort_buf_size);
		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
		t_ctx.rows_added[t_ctx.buf_used] = 0;

		ut_a(doc_item);
		goto loop;
	}

	/* The parent is done scanning; if we have finished processing
	all the docs, exit */
	if (psort_info->state == FTS_PARENT_COMPLETE) {
		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
			goto exit;
		} else if (retried > 10000) {
			ut_ad(!doc_item);
			/* We have retried too many times and cannot get
			a new record */
			ib::error() << "FTS parallel sort processed "
				<< num_doc_processed
				<< " records, the sort queue has "
				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
				<< " records. But sort cannot get the next"
				" records during alter table " << table->name;
			goto exit;
		}
	} else if (psort_info->state == FTS_PARENT_EXITING) {
		/* Parent abort */
		goto func_exit;
	}

	if (doc_item == NULL) {
		os_thread_yield();
	}

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	if (doc_item != NULL) {
		if (last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}

		retried = 0;
	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
		retried++;
	}

	goto loop;

exit:
	/* Do a final sort of the last batch of records in block
	memory. Flush them to the temp file if the records cannot all
	be held in one block of memory */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (t_ctx.rows_added[i]) {
			row_merge_buf_sort(buf[i], NULL);
			row_merge_buf_write(
				buf[i], merge_file[i], block[i]);

			/* Write to the temp file only if records have
			been flushed to the temp file before (offset > 0):
			The pseudo code for the sort is as follows:

				while (there are rows) {
					tokenize rows, put result in block[]
					if (block[] runs out) {
						sort rows;
						write to temp file with
						row_merge_write();
						offset++;
					}
				}

				# write out the last batch
				if (offset > 0) {
					row_merge_write();
					offset++;
				} else {
					# no need to write anything
					offset stays 0
				}

			So if merge_file[i]->offset is 0 when we get
			here with the last batch, the rows have never
			been flushed to the temp file and can all be
			held in memory */
			if (merge_file[i]->offset != 0) {
				if (!row_merge_write(merge_file[i]->fd,
						merge_file[i]->offset++,
						block[i],
						crypt_block[i],
						table->space_id)) {
					error = DB_TEMP_FILE_WRITE_FAIL;
					goto func_exit;
				}

#ifdef HAVE_valgrind
				MEM_UNDEFINED(block[i], srv_sort_buf_size);

				if (crypt_block[i]) {
					MEM_UNDEFINED(crypt_block[i],
						      srv_sort_buf_size);
				}
#endif /* HAVE_valgrind */
			}

			buf[i] = row_merge_buf_empty(buf[i]);
			t_ctx.rows_added[i] = 0;
		}
	}

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: start merge sort\n");
	}

	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (!merge_file[i]->offset) {
			continue;
		}

		tmpfd[i] = row_merge_file_create_low(path);
		if (tmpfd[i] == OS_FILE_CLOSED) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		error = row_merge_sort(psort_info->psort_common->trx,
				       psort_info->psort_common->dup,
				       merge_file[i], block[i], &tmpfd[i],
				       false, 0.0/* pct_progress */, 0.0/* pct_cost */,
				       crypt_block[i], table->space_id);

		if (error != DB_SUCCESS) {
			row_merge_file_destroy_low(tmpfd[i]);
			goto func_exit;
		}

		row_merge_file_destroy_low(tmpfd[i]);
	}

func_exit:
	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: complete merge sort\n");
	}

	mem_heap_free(blob_heap);

	mutex_enter(&psort_info->mutex);
	psort_info->error = error;
	mutex_exit(&psort_info->mutex);

	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
		/* The child can exit either on error or when told to
		by the parent. */
		ut_ad(error != DB_SUCCESS
		      || psort_info->state == FTS_PARENT_EXITING);
	}

	/* Free the fts doc list in case of error. */
	do {
		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
	} while (doc_item != NULL);

	psort_info->child_status = FTS_CHILD_COMPLETE;
	os_event_set(psort_info->psort_common->sort_event);
	psort_info->child_status = FTS_CHILD_EXITING;
}

/*********************************************************************//**
Start the parallel tokenization and parallel merge sort */
void
row_fts_start_psort(
/*================*/
	fts_psort_t*	psort_info)	/*!< parallel sort structure */
{
	ulint		i = 0;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		psort_info[i].psort_id = i;
		psort_info[i].task =
			new tpool::waitable_task(fts_parallel_tokenization,&psort_info[i]);
		srv_thread_pool->submit_task(psort_info[i].task);
	}
}

/*********************************************************************//**
Performs the merge and insertion of the sorted records. */
static
void
fts_parallel_merge(
/*===============*/
	void*		arg)		/*!< in: parallel merge info */
{
	fts_psort_t*	psort_info = (fts_psort_t*) arg;
	ulint		id;

	ut_ad(psort_info);

	id = psort_info->psort_id;

	row_fts_merge_insert(psort_info->psort_common->dup->index,
			     psort_info->psort_common->new_table,
			     psort_info->psort_common->all_info, id);
}

/*********************************************************************//**
Kick off the parallel merge and insert tasks */
void
row_fts_start_parallel_merge(
/*=========================*/
	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
{
	ulint		i = 0;

	/* Kick off the merge/insert tasks */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		merge_info[i].psort_id = i;
		merge_info[i].child_status = 0;

		merge_info[i].task = new tpool::waitable_task(
			fts_parallel_merge,
			(void*) &merge_info[i]);
		srv_thread_pool->submit_task(merge_info[i].task);
	}
}
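
/* Sketch of the typical call sequence (illustrative only; the actual
driver lives in row0merge.cc):

	fts_psort_t*	psort;
	fts_psort_t*	merge;

	row_fts_psort_info_init(trx, dup, new_table, opt_doc_id_size,
				old_zip_size, &psort, &merge);
	row_fts_start_psort(psort);		// tokenize + initial sort

	// ... the clustered index scan appends fts_doc_item_t entries
	// to psort[i].fts_doc_list while the tasks run ...

	// set psort[i].state = FTS_PARENT_COMPLETE and wait on sort_event
	row_fts_start_parallel_merge(merge);	// merge + insert
	// ... wait for the merge tasks, then ...
	row_fts_psort_info_destroy(psort, merge);
*/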

/**
Write out a single word's data as new entry/entries in the INDEX table.
@param[in]	ins_ctx	insert context
@param[in]	word	word string
@param[in]	node	node columns
@return	DB_SUCCESS if the insertion runs fine, otherwise error code */
static
dberr_t
row_merge_write_fts_node(
	const	fts_psort_insert_t*	ins_ctx,
	const	fts_string_t*		word,
	const	fts_node_t*		node)
{
	dtuple_t*	tuple;
	dfield_t*	field;
	dberr_t		ret = DB_SUCCESS;
	doc_id_t	write_first_doc_id[8];
	doc_id_t	write_last_doc_id[8];
	ib_uint32_t	write_doc_count;

	tuple = ins_ctx->tuple;

	/* The first field is the tokenized word */
	field = dtuple_get_nth_field(tuple, 0);
	dfield_set_data(field, word->f_str, word->f_len);

	/* The second field is first_doc_id */
	field = dtuple_get_nth_field(tuple, 1);
	fts_write_doc_id((byte*)&write_first_doc_id, node->first_doc_id);
	dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t));

	/* The third and fourth fields (TRX_ID, ROLL_PTR) are filled already. */
	/* The fifth field is last_doc_id */
	field = dtuple_get_nth_field(tuple, 4);
	fts_write_doc_id((byte*)&write_last_doc_id, node->last_doc_id);
	dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t));

	/* The sixth field is doc_count */
	field = dtuple_get_nth_field(tuple, 5);
	mach_write_to_4((byte*)&write_doc_count, (ib_uint32_t)node->doc_count);
	dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t));

	/* The seventh field is ilist */
	field = dtuple_get_nth_field(tuple, 6);
	dfield_set_data(field, node->ilist, node->ilist_size);

	ret = ins_ctx->btr_bulk->insert(tuple);

	return(ret);
}
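
/* For illustration, one row inserted by the function above into an FTS
auxiliary index table, for the word "apple" occurring in documents 3
and 7 (a sketch; the exact ilist encoding is defined in fts0fts):

	word          = "apple"
	first_doc_id  = 3
	trx_id        = 0       (pre-filled from reset_trx_id)
	roll_ptr      = dummy   (pre-filled from reset_trx_id)
	last_doc_id   = 7
	doc_count     = 2
	ilist         = delta-encoded doc ids + position lists
*/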

/********************************************************************//**
Insert processed FTS data into the auxiliary index tables.
@return DB_SUCCESS if the insertion runs fine */
static MY_ATTRIBUTE((nonnull))
dberr_t
row_merge_write_fts_word(
/*=====================*/
	fts_psort_insert_t*	ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t*	word)		/*!< in: sorted and tokenized
						word */
{
	dberr_t	ret = DB_SUCCESS;

	ut_ad(ins_ctx->aux_index_id == fts_select_index(
		ins_ctx->charset, word->text.f_str, word->text.f_len));

	/* Pop out each fts_node in word->nodes and write them to the
	auxiliary table */
	for (ulint i = 0; i < ib_vector_size(word->nodes); i++) {
		dberr_t		error;
		fts_node_t*	fts_node;

		fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i));

		error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node);

		if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
			ib::error() << "Failed to write word to FTS auxiliary"
				" index table "
				<< ins_ctx->btr_bulk->table_name()
				<< ", error " << error;
			ret = error;
		}

		ut_free(fts_node->ilist);
		fts_node->ilist = NULL;
	}

	ib_vector_reset(word->nodes);

	return(ret);
}

/*********************************************************************//**
Insert one data tuple from the sorted FTS data into the auxiliary table;
when dtuple == NULL, flush the last word. */
static
void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t* word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get the fts_node for the FTS auxiliary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		if (fts_node && ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx, word);
		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	if (!word->text.f_str) {
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* Compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* We are getting a new word; flush the last position
		info for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx, word);

		/* Copy the new word */
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up the position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset the Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush the position info to the
	fts_node and initiate a new position vector */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* Record the current Doc ID */
	*in_doc_id = doc_id;
}

/*********************************************************************//**
Propagate a newly added record up one level in the selection tree
@return parent where this value propagated to */
static
ulint
row_fts_sel_tree_propagate(
/*=======================*/
	ulint		propagated,	/*<! in: tree node propagated */
	int*		sel_tree,	/*<! in: selection tree */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in/out: FTS index */
{
	ulint	parent;
	int	child_left;
	int	child_right;
	int	selected;

	/* Find which parent this value will be propagated to */
	parent = (propagated - 1) / 2;

	/* Find out which value is smaller, and propagate it */
	child_left = sel_tree[parent * 2 + 1];
	child_right = sel_tree[parent * 2 + 2];

	if (child_left == -1 || mrec[child_left] == NULL) {
		if (child_right == -1
		    || mrec[child_right] == NULL) {
			selected = -1;
		} else {
			selected = child_right;
		}
	} else if (child_right == -1
		   || mrec[child_right] == NULL) {
		selected = child_left;
	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
				      offsets[child_left],
				      offsets[child_right],
				      index, NULL) < 0) {
		selected = child_left;
	} else {
		selected = child_right;
	}

	sel_tree[parent] = selected;

	return parent;
}
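
/* The selection tree is stored as an implicit binary heap in sel_tree[]:
node i has children 2*i + 1 and 2*i + 2, and child c has parent
(c - 1) / 2. For illustration, with fts_sort_pll_degree == 4 the leaves
occupy sel_tree[3..6] and map to merge inputs 0..3:

	            sel_tree[0]           <- overall minimum
	         /             \
	    sel_tree[1]     sel_tree[2]
	     /      \        /      \
	  [3]=0   [4]=1   [5]=2   [6]=3   <- one leaf per input stream

Each entry holds an input number (or -1 for an exhausted input), so
refilling one input and propagating it to the root costs
O(log fts_sort_pll_degree) comparisons instead of a linear scan. */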

/*********************************************************************//**
Readjust the selection tree after popping the root and reading a new value
@return the new root */
static
int
row_fts_sel_tree_update(
/*====================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	ulint		propagated,	/*<! in: node to propagate up */
	ulint		height,		/*<! in: tree height */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	i;

	for (i = 1; i <= height; i++) {
		propagated = row_fts_sel_tree_propagate(
			propagated, sel_tree, mrec, offsets, index);
	}

	return(sel_tree[0]);
}

/*********************************************************************//**
Build the selection tree at a specified level */
static
void
row_fts_build_sel_tree_level(
/*=========================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	ulint		level,		/*<! in: selection tree level */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	start;
	int	child_left;
	int	child_right;
	ulint	i;
	ulint	num_item	= ulint(1) << level;

	start = num_item - 1;

	for (i = 0; i < num_item; i++) {
		child_left = sel_tree[(start + i) * 2 + 1];
		child_right = sel_tree[(start + i) * 2 + 2];

		if (child_left == -1) {
			if (child_right == -1) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (child_right == -1) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Deal with NULL child conditions */
		if (!mrec[child_left]) {
			if (!mrec[child_right]) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (!mrec[child_right]) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Select the smaller one to set the parent pointer */
		int cmp = cmp_rec_rec_simple(
			mrec[child_left], mrec[child_right],
			offsets[child_left], offsets[child_right],
			index, NULL);

		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
	}
}

/*********************************************************************//**
Build a selection tree for the merge. The selection tree is a binary tree
and has ceil(log2(fts_sort_pll_degree)) levels, with the root at level 0
@return number of tree levels */
static
ulint
row_fts_build_sel_tree(
/*===================*/
	int*		sel_tree,	/*<! in/out: selection tree */
	const mrec_t**	mrec,		/*<! in: sort record */
	rec_offs**	offsets,	/*<! in: record offsets */
	dict_index_t*	index)		/*<! in: index dictionary */
{
	ulint	treelevel = 1;
	ulint	num = 2;
	ulint	i = 0;
	ulint	start;

	/* No need to build a selection tree if we only have two merge
	threads */
	if (fts_sort_pll_degree <= 2) {
		return(0);
	}

	while (num < fts_sort_pll_degree) {
		num = num << 1;
		treelevel++;
	}

	start = (ulint(1) << treelevel) - 1;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		sel_tree[i + start] = int(i);
	}

	i = treelevel;
	do {
		row_fts_build_sel_tree_level(
			sel_tree, --i, mrec, offsets, index);
	} while (i > 0);

	return(treelevel);
}
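
/* Worked example (illustrative): with fts_sort_pll_degree == 4 the loop
above doubles num from 2 to 4, so treelevel == 2 and the leaves start at
index (1 << 2) - 1 == 3, i.e. leaf i sits at sel_tree[3 + i] == i. The
caller allocates 2 * fts_sort_pll_degree entries (8 here), enough for
the 7 tree nodes, and row_fts_sel_tree_update() walks at most
treelevel == height levels from a leaf back up to sel_tree[0]. */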

/*********************************************************************//**
Read the sorted files containing index data tuples and insert these data
tuples into the index
@return DB_SUCCESS or error number */
dberr_t
row_fts_merge_insert(
/*=================*/
	dict_index_t*		index,	/*!< in: index */
	dict_table_t*		table,	/*!< in: new table */
	fts_psort_t*		psort_info, /*!< parallel sort info */
	ulint			id)	/*!< in: which auxiliary table's data
					to insert */
{
	const byte**		b;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		heap;
	dberr_t			error = DB_SUCCESS;
	ulint*			foffs;
	rec_offs**		offsets;
	fts_tokenizer_word_t	new_word;
	ib_vector_t*		positions;
	doc_id_t		last_doc_id;
	ib_alloc_t*		heap_alloc;
	ulint			i;
	mrec_buf_t**		buf;
	pfs_os_file_t*		fd;
	byte**			block;
	byte**			crypt_block;
	const mrec_t**		mrec;
	ulint			count = 0;
	int*			sel_tree;
	ulint			height;
	ulint			start;
	fts_psort_insert_t	ins_ctx;
	uint64_t		count_diag = 0;
	fts_table_t		fts_table;
	char			aux_table_name[MAX_FULL_NAME_LEN];
	dict_table_t*		aux_table;
	dict_index_t*		aux_index;
	trx_t*			trx;

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx = trx_create();
	trx_start_if_not_started(trx, true);

	trx->op_info = "inserting index entries";

	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;

	heap = mem_heap_create(500 + sizeof(mrec_buf_t));

	b = (const byte**) mem_heap_alloc(
		heap, sizeof (*b) * fts_sort_pll_degree);
	foffs = (ulint*) mem_heap_alloc(
		heap, sizeof(*foffs) * fts_sort_pll_degree);
	offsets = (rec_offs**) mem_heap_alloc(
		heap, sizeof(*offsets) * fts_sort_pll_degree);
	buf = (mrec_buf_t**) mem_heap_alloc(
		heap, sizeof(*buf) * fts_sort_pll_degree);
	fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
	block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	crypt_block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	mrec = (const mrec_t**) mem_heap_alloc(
		heap, sizeof(*mrec) * fts_sort_pll_degree);
	sel_tree = (int*) mem_heap_alloc(
		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));

	tuple_heap = mem_heap_create(1000);

	ins_ctx.charset = fts_index_get_charset(index);
	ins_ctx.heap = heap;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		ulint	num;

		num = 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets[i] = static_cast<rec_offs*>(mem_heap_zalloc(
			heap, num * sizeof *offsets[i]));
		rec_offs_set_n_alloc(offsets[i], num);
		rec_offs_set_n_fields(offsets[i], dict_index_get_n_fields(index));
		block[i] = psort_info[i].merge_block[id];
		crypt_block[i] = psort_info[i].crypt_block[id];
		b[i] = psort_info[i].merge_block[id];
		fd[i] = psort_info[i].merge_file[id]->fd;
		foffs[i] = 0;

		buf[i] = static_cast<mrec_buf_t*>(
			mem_heap_alloc(heap, sizeof *buf[i]));

		count_diag += psort_info[i].merge_file[id]->n_rec;
	}

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: to insert " << count_diag
			<< " records";
	}

	/* Initialize related variables if creating FTS indexes */
	heap_alloc = ib_heap_allocator_create(heap);

	memset(&new_word, 0, sizeof(new_word));

	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
	last_doc_id = 0;

	/* We should set flags2 before building aux_table_name here,
	in order to get the correct aux table names. */
	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
			index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME
			& ((1U << DICT_TF2_BITS) - 1););
	fts_table.type = FTS_INDEX_TABLE;
	fts_table.index_id = index->id;
	fts_table.table_id = table->id;
	fts_table.table = index->table;
	fts_table.suffix = fts_get_suffix(id);

	/* Get the aux index */
	fts_get_table_name(&fts_table, aux_table_name);
	aux_table = dict_table_open_on_name(aux_table_name, FALSE, FALSE,
					    DICT_ERR_IGNORE_NONE);
	ut_ad(aux_table != NULL);
	aux_index = dict_table_get_first_index(aux_table);

	ut_ad(!aux_index->is_instant());
	/* row_merge_write_fts_node() depends on the correct value */
	ut_ad(aux_index->n_core_null_bytes
	      == UT_BITS_IN_BYTES(aux_index->n_nullable));

	/* Create the bulk load instance */
	ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx));

	/* Create the tuple for insert */
	ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
	dict_index_copy_types(ins_ctx.tuple, aux_index,
			      dict_index_get_n_fields(aux_index));

	/* Set TRX_ID and ROLL_PTR */
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 2),
			&reset_trx_id, DATA_TRX_ID_LEN);
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 3),
			&reset_trx_id[DATA_TRX_ID_LEN], DATA_ROLL_PTR_LEN);

	ut_d(ins_ctx.aux_index_id = id);

	const ulint space = table->space_id;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		if (psort_info[i].merge_file[id]->n_rec == 0) {
			/* No rows to read */
			mrec[i] = b[i] = NULL;
		} else {
			/* Read from the temp file only if it has been
			written to. Otherwise, block memory holds
			all the sorted records */
			if (psort_info[i].merge_file[id]->offset > 0
			    && (!row_merge_read(
					fd[i], foffs[i],
					(row_merge_block_t*) block[i],
					(row_merge_block_t*) crypt_block[i],
					space))) {
				error = DB_CORRUPTION;
				goto exit;
			}

			ROW_MERGE_READ_GET_NEXT(i);
		}
	}

	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
					offsets, index);

	start = (1U << height) - 1;

	/* Fetch the sorted records from the sort buffers and insert them
	into the corresponding FTS index auxiliary tables */
	for (;;) {
		dtuple_t*	dtuple;
		int		min_rec = 0;

		if (fts_sort_pll_degree <= 2) {
			while (!mrec[min_rec]) {
				min_rec++;

				if (min_rec >= (int) fts_sort_pll_degree) {
					row_fts_insert_tuple(
						&ins_ctx, &new_word,
						positions, &last_doc_id,
						NULL);

					goto exit;
				}
			}

			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
				if (!mrec[i]) {
					continue;
				}

				if (cmp_rec_rec_simple(
					    mrec[i], mrec[min_rec],
					    offsets[i], offsets[min_rec],
					    index, NULL) < 0) {
					min_rec = static_cast<int>(i);
				}
			}
		} else {
			min_rec = sel_tree[0];

			if (min_rec == -1) {
				row_fts_insert_tuple(
					&ins_ctx, &new_word,
					positions, &last_doc_id,
					NULL);

				goto exit;
			}
		}

		dtuple = row_rec_to_index_entry_low(
			mrec[min_rec], index, offsets[min_rec],
			tuple_heap);

		row_fts_insert_tuple(
			&ins_ctx, &new_word, positions,
			&last_doc_id, dtuple);

		ROW_MERGE_READ_GET_NEXT(min_rec);

		if (fts_sort_pll_degree > 2) {
			if (!mrec[min_rec]) {
				sel_tree[start + min_rec] = -1;
			}

			row_fts_sel_tree_update(sel_tree, start + min_rec,
						height, mrec,
						offsets, index);
		}

		count++;

		mem_heap_empty(tuple_heap);
	}

exit:
	fts_sql_commit(trx);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	error = ins_ctx.btr_bulk->finish(error);
	UT_DELETE(ins_ctx.btr_bulk);

	dict_table_close(aux_table, FALSE, FALSE);

	trx->free();

	mem_heap_free(heap);

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: inserted " << count << " records";
	}

	return(error);
}