1 /*****************************************************************************
2 
3 Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ftsort.cc
29 Create Full Text Index with (parallel) merge sort
30 
31 Created 10/13/2010 Jimmy Yang
32 *******************************************************/
33 
34 #include "dict0dict.h" /* dict_table_stats_lock() */
35 #include "row0merge.h"
36 #include "pars0pars.h"
37 #include "row0ftsort.h"
38 #include "row0merge.h"
39 #include "row0row.h"
40 #include "btr0cur.h"
41 #include "btr0sea.h"
42 
/** Read the next record of merge run N into mrec[N], with its offsets in
offsets[N], advancing b[N]/foffs[N].  If row_merge_read_rec() returns NULL
while mrec[N] is set, the run is exhausted (presumably end-of-list marker;
verify against row_merge_read_rec()) and control jumps to the enclosing
function's "exit" label; a NULL mrec[N] means the caller keeps going.
Classic do/while(0) multi-statement macro.
@param N	index into array of merge info structure */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N]);	\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)
56 
57 /** Parallel sort degree */
58 UNIV_INTERN ulong	fts_sort_pll_degree	= 2;
59 
60 /*********************************************************************//**
61 Create a temporary "fts sort index" used to merge sort the
62 tokenized doc string. The index has three "fields":
63 
64 1) Tokenized word,
65 2) Doc ID (depend on number of records to sort, it can be a 4 bytes or 8 bytes
66 integer value)
67 3) Word's position in original doc.
68 
69 @return dict_index_t structure for the fts sort index */
70 UNIV_INTERN
71 dict_index_t*
row_merge_create_fts_sort_index(dict_index_t * index,const dict_table_t * table,ibool * opt_doc_id_size)72 row_merge_create_fts_sort_index(
73 /*============================*/
74 	dict_index_t*		index,	/*!< in: Original FTS index
75 					based on which this sort index
76 					is created */
77 	const dict_table_t*	table,	/*!< in: table that FTS index
78 					is being created on */
79 	ibool*			opt_doc_id_size)
80 					/*!< out: whether to use 4 bytes
81 					instead of 8 bytes integer to
82 					store Doc ID during sort */
83 {
84 	dict_index_t*   new_index;
85 	dict_field_t*   field;
86 	dict_field_t*   idx_field;
87 	CHARSET_INFO*	charset;
88 
89 	// FIXME: This name shouldn't be hard coded here.
90 	new_index = dict_mem_index_create(
91 		index->table->name, "tmp_fts_idx", 0, DICT_FTS, 3);
92 
93 	new_index->id = index->id;
94 	new_index->table = (dict_table_t*) table;
95 	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
96 	new_index->n_def = FTS_NUM_FIELDS_SORT;
97 	new_index->cached = TRUE;
98 
99 	btr_search_index_init(new_index);
100 
101 	idx_field = dict_index_get_nth_field(index, 0);
102 	charset = fts_index_get_charset(index);
103 
104 	/* The first field is on the Tokenized Word */
105 	field = dict_index_get_nth_field(new_index, 0);
106 	field->name = NULL;
107 	field->prefix_len = 0;
108 	field->col = static_cast<dict_col_t*>(
109 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
110 	field->col->len = FTS_MAX_WORD_LEN;
111 
112 	if (strcmp(charset->name, "latin1_swedish_ci") == 0) {
113 		field->col->mtype = DATA_VARCHAR;
114 	} else {
115 		field->col->mtype = DATA_VARMYSQL;
116 	}
117 
118 	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
119 	field->col->mbminmaxlen = idx_field->col->mbminmaxlen;
120 	field->fixed_len = 0;
121 
122 	/* Doc ID */
123 	field = dict_index_get_nth_field(new_index, 1);
124 	field->name = NULL;
125 	field->prefix_len = 0;
126 	field->col = static_cast<dict_col_t*>(
127 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
128 	field->col->mtype = DATA_INT;
129 	*opt_doc_id_size = FALSE;
130 
131 	/* Check whether we can use 4 bytes instead of 8 bytes integer
132 	field to hold the Doc ID, thus reduce the overall sort size */
133 	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
134 		/* If Doc ID column is being added by this create
135 		index, then just check the number of rows in the table */
136 		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
137 			*opt_doc_id_size = TRUE;
138 		}
139 	} else {
140 		doc_id_t	max_doc_id;
141 
142 		/* If the Doc ID column is supplied by user, then
143 		check the maximum Doc ID in the table */
144 		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
145 
146 		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
147 			*opt_doc_id_size = TRUE;
148 		}
149 	}
150 
151 	if (*opt_doc_id_size) {
152 		field->col->len = sizeof(ib_uint32_t);
153 		field->fixed_len = sizeof(ib_uint32_t);
154 	} else {
155 		field->col->len = FTS_DOC_ID_LEN;
156 		field->fixed_len = FTS_DOC_ID_LEN;
157 	}
158 
159 	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
160 
161 	field->col->mbminmaxlen = 0;
162 
163 	/* The third field is on the word's position in the original doc */
164 	field = dict_index_get_nth_field(new_index, 2);
165 	field->name = NULL;
166 	field->prefix_len = 0;
167 	field->col = static_cast<dict_col_t*>(
168 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
169 	field->col->mtype = DATA_INT;
170 	field->col->len = 4 ;
171 	field->fixed_len = 4;
172 	field->col->prtype = DATA_NOT_NULL;
173 	field->col->mbminmaxlen = 0;
174 
175 	return(new_index);
176 }
177 /*********************************************************************//**
178 Initialize FTS parallel sort structures.
179 @return TRUE if all successful */
180 UNIV_INTERN
181 ibool
row_fts_psort_info_init(trx_t * trx,row_merge_dup_t * dup,const dict_table_t * new_table,ibool opt_doc_id_size,fts_psort_t ** psort,fts_psort_t ** merge)182 row_fts_psort_info_init(
183 /*====================*/
184 	trx_t*			trx,	/*!< in: transaction */
185 	row_merge_dup_t*	dup,	/*!< in,own: descriptor of
186 					FTS index being created */
187 	const dict_table_t*	new_table,/*!< in: table on which indexes are
188 					created */
189 	ibool			opt_doc_id_size,
190 					/*!< in: whether to use 4 bytes
191 					instead of 8 bytes integer to
192 					store Doc ID during sort */
193 	fts_psort_t**		psort,	/*!< out: parallel sort info to be
194 					instantiated */
195 	fts_psort_t**		merge)	/*!< out: parallel merge info
196 					to be instantiated */
197 {
198 	ulint			i;
199 	ulint			j;
200 	fts_psort_common_t*	common_info = NULL;
201 	fts_psort_t*		psort_info = NULL;
202 	fts_psort_t*		merge_info = NULL;
203 	ulint			block_size;
204 	ibool			ret = TRUE;
205 
206 	block_size = 3 * srv_sort_buf_size;
207 
208 	*psort = psort_info = static_cast<fts_psort_t*>(mem_zalloc(
209 		 fts_sort_pll_degree * sizeof *psort_info));
210 
211 	if (!psort_info) {
212 		ut_free(dup);
213 		return(FALSE);
214 	}
215 
216 	/* Common Info for all sort threads */
217 	common_info = static_cast<fts_psort_common_t*>(
218 		mem_alloc(sizeof *common_info));
219 
220 	if (!common_info) {
221 		ut_free(dup);
222 		mem_free(psort_info);
223 		return(FALSE);
224 	}
225 
226 	common_info->dup = dup;
227 	common_info->new_table = (dict_table_t*) new_table;
228 	common_info->trx = trx;
229 	common_info->all_info = psort_info;
230 	common_info->sort_event = os_event_create();
231 	common_info->merge_event = os_event_create();
232 	common_info->opt_doc_id_size = opt_doc_id_size;
233 
234 	ut_ad(trx->mysql_thd != NULL);
235 	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
236 
237 	/* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
238 	each parallel sort thread. Each "sort bucket" holds records for
239 	a particular "FTS index partition" */
240 	for (j = 0; j < fts_sort_pll_degree; j++) {
241 
242 		UT_LIST_INIT(psort_info[j].fts_doc_list);
243 
244 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
245 
246 			psort_info[j].merge_file[i] =
247 				 static_cast<merge_file_t*>(
248 					mem_zalloc(sizeof(merge_file_t)));
249 
250 			if (!psort_info[j].merge_file[i]) {
251 				ret = FALSE;
252 				goto func_exit;
253 			}
254 
255 			psort_info[j].merge_buf[i] = row_merge_buf_create(
256 				dup->index);
257 
258 			if (row_merge_file_create(psort_info[j].merge_file[i],
259 						  path) < 0) {
260 				goto func_exit;
261 			}
262 
263 			/* Need to align memory for O_DIRECT write */
264 			psort_info[j].block_alloc[i] =
265 				static_cast<row_merge_block_t*>(ut_malloc(
266 					block_size + 1024));
267 
268 			psort_info[j].merge_block[i] =
269 				static_cast<row_merge_block_t*>(
270 					ut_align(
271 					psort_info[j].block_alloc[i], 1024));
272 
273 			if (!psort_info[j].merge_block[i]) {
274 				ret = FALSE;
275 				goto func_exit;
276 			}
277 		}
278 
279 		psort_info[j].child_status = 0;
280 		psort_info[j].state = 0;
281 		psort_info[j].psort_common = common_info;
282 		psort_info[j].error = DB_SUCCESS;
283 		psort_info[j].memory_used = 0;
284 		mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE);
285 	}
286 
287 	/* Initialize merge_info structures parallel merge and insert
288 	into auxiliary FTS tables (FTS_INDEX_TABLE) */
289 	*merge = merge_info = static_cast<fts_psort_t*>(
290 		mem_alloc(FTS_NUM_AUX_INDEX * sizeof *merge_info));
291 
292 	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
293 
294 		merge_info[j].child_status = 0;
295 		merge_info[j].state = 0;
296 		merge_info[j].psort_common = common_info;
297 	}
298 
299 func_exit:
300 	if (!ret) {
301 		row_fts_psort_info_destroy(psort_info, merge_info);
302 	}
303 
304 	return(ret);
305 }
306 /*********************************************************************//**
307 Clean up and deallocate FTS parallel sort structures, and close the
308 merge sort files  */
309 UNIV_INTERN
310 void
row_fts_psort_info_destroy(fts_psort_t * psort_info,fts_psort_t * merge_info)311 row_fts_psort_info_destroy(
312 /*=======================*/
313 	fts_psort_t*	psort_info,	/*!< parallel sort info */
314 	fts_psort_t*	merge_info)	/*!< parallel merge info */
315 {
316 	ulint	i;
317 	ulint	j;
318 
319 	if (psort_info) {
320 		for (j = 0; j < fts_sort_pll_degree; j++) {
321 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
322 				if (psort_info[j].merge_file[i]) {
323 					row_merge_file_destroy(
324 						psort_info[j].merge_file[i]);
325 				}
326 
327 				if (psort_info[j].block_alloc[i]) {
328 					ut_free(psort_info[j].block_alloc[i]);
329 				}
330 				mem_free(psort_info[j].merge_file[i]);
331 			}
332 
333 			mutex_free(&psort_info[j].mutex);
334 		}
335 
336 		os_event_free(merge_info[0].psort_common->sort_event);
337 		os_event_free(merge_info[0].psort_common->merge_event);
338 		ut_free(merge_info[0].psort_common->dup);
339 		mem_free(merge_info[0].psort_common);
340 		mem_free(psort_info);
341 	}
342 
343 	if (merge_info) {
344 		mem_free(merge_info);
345 	}
346 }
347 /*********************************************************************//**
348 Free up merge buffers when merge sort is done */
349 UNIV_INTERN
350 void
row_fts_free_pll_merge_buf(fts_psort_t * psort_info)351 row_fts_free_pll_merge_buf(
352 /*=======================*/
353 	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
354 {
355 	ulint	j;
356 	ulint	i;
357 
358 	if (!psort_info) {
359 		return;
360 	}
361 
362 	for (j = 0; j < fts_sort_pll_degree; j++) {
363 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
364 			row_merge_buf_free(psort_info[j].merge_buf[i]);
365 		}
366 	}
367 
368 	return;
369 }
370 
371 /*********************************************************************//**
372 Tokenize incoming text data and add to the sort buffer.
373 @return	TRUE if the record passed, FALSE if out of space */
374 static
375 ibool
row_merge_fts_doc_tokenize(row_merge_buf_t ** sort_buf,doc_id_t doc_id,fts_doc_t * doc,dtype_t * word_dtype,merge_file_t ** merge_file,ibool opt_doc_id_size,fts_tokenize_ctx_t * t_ctx)376 row_merge_fts_doc_tokenize(
377 /*=======================*/
378 	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
379 	doc_id_t		doc_id,		/*!< in: Doc ID */
380 	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
381 	dtype_t*		word_dtype,	/*!< in: data structure for
382 						word col */
383 	merge_file_t**		merge_file,	/*!< in/out: merge file */
384 	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
385 						instead of 8 bytes integer to
386 						store Doc ID during sort*/
387 	fts_tokenize_ctx_t*	t_ctx)          /*!< in/out: tokenize context */
388 {
389 	ulint		i;
390 	ulint		inc;
391 	fts_string_t	str;
392 	ulint		len;
393 	row_merge_buf_t* buf;
394 	dfield_t*	field;
395 	fts_string_t	t_str;
396 	ibool		buf_full = FALSE;
397 	byte		str_buf[FTS_MAX_WORD_LEN + 1];
398 	ulint		data_size[FTS_NUM_AUX_INDEX];
399 	ulint		n_tuple[FTS_NUM_AUX_INDEX];
400 
401 	t_str.f_n_char = 0;
402 	t_ctx->buf_used = 0;
403 
404 	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
405 	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
406 
407 	/* Tokenize the data and add each word string, its corresponding
408 	doc id and position to sort buffer */
409 	for (i = t_ctx->processed_len; i < doc->text.f_len; i += inc) {
410 		ib_rbt_bound_t	parent;
411 		ulint		idx = 0;
412 		ib_uint32_t	position;
413 		ulint           offset = 0;
414 		ulint		cur_len = 0;
415 		doc_id_t	write_doc_id;
416 
417 		inc = innobase_mysql_fts_get_token(
418 			doc->charset, doc->text.f_str + i,
419 			doc->text.f_str + doc->text.f_len, &str, &offset);
420 
421 		ut_a(inc > 0);
422 
423 		/* Ignore string whose character number is less than
424 		"fts_min_token_size" or more than "fts_max_token_size" */
425 		if (str.f_n_char < fts_min_token_size
426 		    || str.f_n_char > fts_max_token_size) {
427 
428 			t_ctx->processed_len += inc;
429 			continue;
430 		}
431 
432 		t_str.f_len = innobase_fts_casedn_str(
433 			doc->charset, (char*) str.f_str, str.f_len,
434 			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);
435 
436 		t_str.f_str = (byte*) &str_buf;
437 
438 		/* if "cached_stopword" is defined, ingore words in the
439 		stopword list */
440 		if (t_ctx->cached_stopword
441 		    && rbt_search(t_ctx->cached_stopword,
442 				  &parent, &t_str) == 0) {
443 
444 			t_ctx->processed_len += inc;
445 			continue;
446 		}
447 
448 		/* There are FTS_NUM_AUX_INDEX auxiliary tables, find
449 		out which sort buffer to put this word record in */
450 		t_ctx->buf_used = fts_select_index(
451 			doc->charset, t_str.f_str, t_str.f_len);
452 
453 		buf = sort_buf[t_ctx->buf_used];
454 
455 		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
456 		idx = t_ctx->buf_used;
457 
458 		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
459 
460 		field = mtuple->fields = static_cast<dfield_t*>(
461 			mem_heap_alloc(buf->heap,
462 				       FTS_NUM_FIELDS_SORT * sizeof *field));
463 
464 		/* The first field is the tokenized word */
465 		dfield_set_data(field, t_str.f_str, t_str.f_len);
466 		len = dfield_get_len(field);
467 
468 		field->type.mtype = word_dtype->mtype;
469 		field->type.prtype = word_dtype->prtype | DATA_NOT_NULL;
470 
471 		/* Variable length field, set to max size. */
472 		field->type.len = FTS_MAX_WORD_LEN;
473 		field->type.mbminmaxlen = word_dtype->mbminmaxlen;
474 
475 		cur_len += len;
476 		dfield_dup(field, buf->heap);
477 		field++;
478 
479 		/* The second field is the Doc ID */
480 
481 		ib_uint32_t	doc_id_32_bit;
482 
483 		if (!opt_doc_id_size) {
484 			fts_write_doc_id((byte*) &write_doc_id, doc_id);
485 
486 			dfield_set_data(
487 				field, &write_doc_id, sizeof(write_doc_id));
488 		} else {
489 			mach_write_to_4(
490 				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
491 
492 			dfield_set_data(
493 				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
494 		}
495 
496 		len = field->len;
497 		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
498 
499 		field->type.mtype = DATA_INT;
500 		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
501 		field->type.len = len;
502 		field->type.mbminmaxlen = 0;
503 
504 		cur_len += len;
505 		dfield_dup(field, buf->heap);
506 
507 		++field;
508 
509 		/* The third field is the position */
510 		mach_write_to_4(
511 			(byte*) &position,
512 			(i + offset + inc - str.f_len + t_ctx->init_pos));
513 
514 		dfield_set_data(field, &position, sizeof(position));
515 		len = dfield_get_len(field);
516 		ut_ad(len == sizeof(ib_uint32_t));
517 
518 		field->type.mtype = DATA_INT;
519 		field->type.prtype = DATA_NOT_NULL;
520 		field->type.len = len;
521 		field->type.mbminmaxlen = 0;
522 		cur_len += len;
523 		dfield_dup(field, buf->heap);
524 
525 		/* One variable length column, word with its lenght less than
526 		fts_max_token_size, add one extra size and one extra byte.
527 
528 		Since the max length for FTS token now is larger than 255,
529 		so we will need to signify length byte itself, so only 1 to 128
530 		bytes can be used for 1 bytes, larger than that 2 bytes. */
531 		if (t_str.f_len < 128) {
532 			/* Extra size is one byte. */
533 			cur_len += 2;
534 		} else {
535 			/* Extra size is two bytes. */
536 			cur_len += 3;
537 		}
538 
539 		/* Reserve one byte for the end marker of row_merge_block_t. */
540 		if (buf->total_size + data_size[idx] + cur_len
541 		    >= srv_sort_buf_size - 1) {
542 
543 			buf_full = TRUE;
544 			break;
545 		}
546 
547 		/* Increment the number of tuples */
548 		n_tuple[idx]++;
549 		t_ctx->processed_len += inc;
550 		data_size[idx] += cur_len;
551 	}
552 
553 	/* Update the data length and the number of new word tuples
554 	added in this round of tokenization */
555 	for (i = 0; i <  FTS_NUM_AUX_INDEX; i++) {
556 		/* The computation of total_size below assumes that no
557 		delete-mark flags will be stored and that all fields
558 		are NOT NULL and fixed-length. */
559 
560 		sort_buf[i]->total_size += data_size[i];
561 
562 		sort_buf[i]->n_tuples += n_tuple[i];
563 
564 		merge_file[i]->n_rec += n_tuple[i];
565 		t_ctx->rows_added[i] += n_tuple[i];
566 	}
567 
568 	if (!buf_full) {
569 		/* we pad one byte between text accross two fields */
570 		t_ctx->init_pos += doc->text.f_len + 1;
571 	}
572 
573 	return(!buf_full);
574 }
575 
576 /*********************************************************************//**
577 Get next doc item from fts_doc_list */
578 UNIV_INLINE
579 void
row_merge_fts_get_next_doc_item(fts_psort_t * psort_info,fts_doc_item_t ** doc_item)580 row_merge_fts_get_next_doc_item(
581 /*============================*/
582 	fts_psort_t*		psort_info,	/*!< in: psort_info */
583 	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
584 {
585 	if (*doc_item != NULL) {
586 		ut_free(*doc_item);
587 	}
588 
589 	mutex_enter(&psort_info->mutex);
590 
591 	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
592 	if (*doc_item != NULL) {
593 		UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list,
594 			       *doc_item);
595 
596 		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
597 		      + (*doc_item)->field->len);
598 		psort_info->memory_used -= sizeof(fts_doc_item_t)
599 			+ (*doc_item)->field->len;
600 	}
601 
602 	mutex_exit(&psort_info->mutex);
603 }
604 
605 /*********************************************************************//**
606 Function performs parallel tokenization of the incoming doc strings.
607 It also performs the initial in memory sort of the parsed records.
608 @return OS_THREAD_DUMMY_RETURN */
609 UNIV_INTERN
610 os_thread_ret_t
fts_parallel_tokenization(void * arg)611 fts_parallel_tokenization(
612 /*======================*/
613 	void*		arg)	/*!< in: psort_info for the thread */
614 {
615 	fts_psort_t*		psort_info = (fts_psort_t*) arg;
616 	ulint			i;
617 	fts_doc_item_t*		doc_item = NULL;
618 	row_merge_buf_t**	buf;
619 	ibool			processed = FALSE;
620 	merge_file_t**		merge_file;
621 	row_merge_block_t**	block;
622 	int			tmpfd[FTS_NUM_AUX_INDEX];
623 	ulint			mycount[FTS_NUM_AUX_INDEX];
624 	ib_uint64_t		total_rec = 0;
625 	ulint			num_doc_processed = 0;
626 	doc_id_t		last_doc_id = 0;
627 	ulint			zip_size;
628 	mem_heap_t*		blob_heap = NULL;
629 	fts_doc_t		doc;
630 	dict_table_t*		table = psort_info->psort_common->new_table;
631 	dtype_t			word_dtype;
632 	dict_field_t*		idx_field;
633 	fts_tokenize_ctx_t	t_ctx;
634 	ulint			retried = 0;
635 	dberr_t			error = DB_SUCCESS;
636 
637 	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
638 
639 	const char*		path = thd_innodb_tmpdir(
640 		psort_info->psort_common->trx->mysql_thd);
641 
642 	ut_ad(psort_info);
643 
644 	buf = psort_info->merge_buf;
645 	merge_file = psort_info->merge_file;
646 	blob_heap = mem_heap_create(512);
647 	memset(&doc, 0, sizeof(doc));
648 	memset(&t_ctx, 0, sizeof(t_ctx));
649 	memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
650 
651 	doc.charset = fts_index_get_charset(
652 		psort_info->psort_common->dup->index);
653 
654 	idx_field = dict_index_get_nth_field(
655 		psort_info->psort_common->dup->index, 0);
656 	word_dtype.prtype = idx_field->col->prtype;
657 	word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
658 	word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
659 				? DATA_VARCHAR : DATA_VARMYSQL;
660 
661 	block = psort_info->merge_block;
662 	zip_size = dict_table_zip_size(table);
663 
664 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
665 
666 	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
667 	processed = TRUE;
668 loop:
669 	while (doc_item) {
670 		dfield_t*	dfield = doc_item->field;
671 
672 		last_doc_id = doc_item->doc_id;
673 
674 		ut_ad (dfield->data != NULL
675 		       && dfield_get_len(dfield) != UNIV_SQL_NULL);
676 
677 		/* If finish processing the last item, update "doc" with
678 		strings in the doc_item, otherwise continue processing last
679 		item */
680 		if (processed) {
681 			byte*		data;
682 			ulint		data_len;
683 
684 			dfield = doc_item->field;
685 			data = static_cast<byte*>(dfield_get_data(dfield));
686 			data_len = dfield_get_len(dfield);
687 
688 			if (dfield_is_ext(dfield)) {
689 				doc.text.f_str =
690 					btr_copy_externally_stored_field(
691 						&doc.text.f_len, data,
692 						zip_size, data_len, blob_heap);
693 			} else {
694 				doc.text.f_str = data;
695 				doc.text.f_len = data_len;
696 			}
697 
698 			doc.tokens = 0;
699 			t_ctx.processed_len = 0;
700 		} else {
701 			/* Not yet finish processing the "doc" on hand,
702 			continue processing it */
703 			ut_ad(doc.text.f_str);
704 			ut_ad(t_ctx.processed_len < doc.text.f_len);
705 		}
706 
707 		processed = row_merge_fts_doc_tokenize(
708 			buf, doc_item->doc_id, &doc,
709 			&word_dtype,
710 			merge_file, psort_info->psort_common->opt_doc_id_size,
711 			&t_ctx);
712 
713 		/* Current sort buffer full, need to recycle */
714 		if (!processed) {
715 			ut_ad(t_ctx.processed_len < doc.text.f_len);
716 			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
717 			break;
718 		}
719 
720 		num_doc_processed++;
721 
722 		if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
723 			ib_logf(IB_LOG_LEVEL_INFO,
724 				"number of doc processed %d\n",
725 				(int) num_doc_processed);
726 #ifdef FTS_INTERNAL_DIAG_PRINT
727 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
728 				ib_logf(IB_LOG_LEVEL_INFO,
729 					"ID %d, partition %d, word "
730 					"%d\n",(int) psort_info->psort_id,
731 					(int) i, (int) mycount[i]);
732 			}
733 #endif
734 		}
735 
736 		mem_heap_empty(blob_heap);
737 
738 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
739 
740 		if (doc_item && last_doc_id != doc_item->doc_id) {
741 			t_ctx.init_pos = 0;
742 		}
743 	}
744 
745 	/* If we run out of current sort buffer, need to sort
746 	and flush the sort buffer to disk */
747 	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
748 		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
749 		row_merge_buf_write(buf[t_ctx.buf_used],
750 				    merge_file[t_ctx.buf_used],
751 				    block[t_ctx.buf_used]);
752 
753 		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
754 				     merge_file[t_ctx.buf_used]->offset++,
755 				     block[t_ctx.buf_used])) {
756 			error = DB_TEMP_FILE_WRITE_FAILURE;
757 			goto func_exit;
758 		}
759 
760 		UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
761 		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
762 		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
763 		t_ctx.rows_added[t_ctx.buf_used] = 0;
764 
765 		ut_a(doc_item);
766 		goto loop;
767 	}
768 
769 	/* Parent done scanning, and if finish processing all the docs, exit */
770 	if (psort_info->state == FTS_PARENT_COMPLETE) {
771 		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
772 			goto exit;
773 		} else if (retried > 10000) {
774 			ut_ad(!doc_item);
775 			/* retied too many times and cannot get new record */
776 			ib_logf(IB_LOG_LEVEL_ERROR,
777 					"InnoDB: FTS parallel sort processed "
778 					"%lu records, the sort queue has "
779 					"%lu records. But sort cannot get "
780 					"the next records", num_doc_processed,
781 					UT_LIST_GET_LEN(
782 						psort_info->fts_doc_list));
783 			goto exit;
784 		}
785 	} else if (psort_info->state == FTS_PARENT_EXITING) {
786 		/* Parent abort */
787 		goto func_exit;
788 	}
789 
790 	if (doc_item == NULL) {
791 		os_thread_yield();
792 	}
793 
794 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
795 
796 	if (doc_item != NULL) {
797 		if (last_doc_id != doc_item->doc_id) {
798 			t_ctx.init_pos = 0;
799 		}
800 
801 		retried = 0;
802 	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
803 		retried++;
804 	}
805 
806 	goto loop;
807 
808 exit:
809 	/* Do a final sort of the last (or latest) batch of records
810 	in block memory. Flush them to temp file if records cannot
811 	be hold in one block memory */
812 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
813 		if (t_ctx.rows_added[i]) {
814 			row_merge_buf_sort(buf[i], NULL);
815 			row_merge_buf_write(
816 				buf[i], merge_file[i], block[i]);
817 
818 			/* Write to temp file, only if records have
819 			been flushed to temp file before (offset > 0):
820 			The pseudo code for sort is following:
821 
822 				while (there are rows) {
823 					tokenize rows, put result in block[]
824 					if (block[] runs out) {
825 						sort rows;
826 						write to temp file with
827 						row_merge_write();
828 						offset++;
829 					}
830 				}
831 
832 				# write out the last batch
833 				if (offset > 0) {
834 					row_merge_write();
835 					offset++;
836 				} else {
837 					# no need to write anything
838 					offset stay as 0
839 				}
840 
841 			so if merge_file[i]->offset is 0 when we come to
842 			here as the last batch, this means rows have
843 			never flush to temp file, it can be held all in
844 			memory */
845 			if (merge_file[i]->offset != 0) {
846 				if (!row_merge_write(merge_file[i]->fd,
847 						merge_file[i]->offset++,
848 						block[i])) {
849 					error = DB_TEMP_FILE_WRITE_FAILURE;
850 					goto func_exit;
851 				}
852 
853 				UNIV_MEM_INVALID(block[i][0],
854 						 srv_sort_buf_size);
855 			}
856 
857 			buf[i] = row_merge_buf_empty(buf[i]);
858 			t_ctx.rows_added[i] = 0;
859 		}
860 	}
861 
862 	if (fts_enable_diag_print) {
863 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: start merge sort\n");
864 	}
865 
866 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
867 		if (!merge_file[i]->offset) {
868 			continue;
869 		}
870 
871 		tmpfd[i] = row_merge_file_create_low(path);
872 		if (tmpfd[i] < 0) {
873 			error = DB_OUT_OF_MEMORY;
874 			goto func_exit;
875 		}
876 
877 		error = row_merge_sort(psort_info->psort_common->trx,
878 				       psort_info->psort_common->dup,
879 				       merge_file[i], block[i], &tmpfd[i]);
880 		if (error != DB_SUCCESS) {
881 			close(tmpfd[i]);
882 			goto func_exit;
883 		}
884 
885 		total_rec += merge_file[i]->n_rec;
886 		close(tmpfd[i]);
887 	}
888 
889 func_exit:
890 	if (fts_enable_diag_print) {
891 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: complete merge sort\n");
892 	}
893 
894 	mem_heap_free(blob_heap);
895 
896 	mutex_enter(&psort_info->mutex);
897 	psort_info->error = error;
898 	mutex_exit(&psort_info->mutex);
899 
900 	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
901 		/* child can exit either with error or told by parent. */
902 		ut_ad(error != DB_SUCCESS
903 		      || psort_info->state == FTS_PARENT_EXITING);
904 	}
905 
906 	/* Free fts doc list in case of error. */
907 	do {
908 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
909 	} while (doc_item != NULL);
910 
911 	psort_info->child_status = FTS_CHILD_COMPLETE;
912 	os_event_set(psort_info->psort_common->sort_event);
913 	psort_info->child_status = FTS_CHILD_EXITING;
914 
915 #ifdef __WIN__
916 	CloseHandle(psort_info->thread_hdl);
917 #endif /*__WIN__ */
918 
919 	os_thread_exit(NULL);
920 
921 	OS_THREAD_DUMMY_RETURN;
922 }
923 
924 /*********************************************************************//**
925 Start the parallel tokenization and parallel merge sort */
926 UNIV_INTERN
927 void
row_fts_start_psort(fts_psort_t * psort_info)928 row_fts_start_psort(
929 /*================*/
930 	fts_psort_t*	psort_info)	/*!< parallel sort structure */
931 {
932 	ulint		i = 0;
933 	os_thread_id_t	thd_id;
934 
935 	for (i = 0; i < fts_sort_pll_degree; i++) {
936 		psort_info[i].psort_id = i;
937 		psort_info[i].thread_hdl = os_thread_create(
938 			fts_parallel_tokenization,
939 			(void*) &psort_info[i], &thd_id);
940 	}
941 }
942 
943 /*********************************************************************//**
944 Function performs the merge and insertion of the sorted records.
945 @return OS_THREAD_DUMMY_RETURN */
946 UNIV_INTERN
947 os_thread_ret_t
fts_parallel_merge(void * arg)948 fts_parallel_merge(
949 /*===============*/
950 	void*		arg)		/*!< in: parallel merge info */
951 {
952 	fts_psort_t*	psort_info = (fts_psort_t*) arg;
953 	ulint		id;
954 
955 	ut_ad(psort_info);
956 
957 	id = psort_info->psort_id;
958 
959 	row_fts_merge_insert(psort_info->psort_common->dup->index,
960 			     psort_info->psort_common->new_table,
961 			     psort_info->psort_common->all_info, id);
962 
963 	psort_info->child_status = FTS_CHILD_COMPLETE;
964 	os_event_set(psort_info->psort_common->merge_event);
965 	psort_info->child_status = FTS_CHILD_EXITING;
966 
967 #ifdef __WIN__
968 	CloseHandle(psort_info->thread_hdl);
969 #endif /*__WIN__ */
970 
971 	os_thread_exit(NULL, false);
972 
973 	OS_THREAD_DUMMY_RETURN;
974 }
975 
976 /*********************************************************************//**
977 Kick off the parallel merge and insert thread */
978 UNIV_INTERN
979 void
row_fts_start_parallel_merge(
/*=========================*/
	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
{
	os_thread_id_t	thd_id;
	int		slot;

	/* Create one merge/insert thread per FTS auxiliary index
	table, each tagged with the table it will populate. */
	for (slot = 0; slot < FTS_NUM_AUX_INDEX; slot++) {
		fts_psort_t*	info = &merge_info[slot];

		info->psort_id = slot;
		info->child_status = 0;

		info->thread_hdl = os_thread_create(
			fts_parallel_merge, (void*) info, &thd_id);
	}
}
996 
/********************************************************************//**
Insert processed FTS data to auxiliary index tables.
@return	DB_SUCCESS if insertion runs fine */
1000 static MY_ATTRIBUTE((nonnull))
1001 dberr_t
row_merge_write_fts_word(
/*=====================*/
	trx_t*		trx,		/*!< in: transaction */
	que_t**		ins_graph,	/*!< in: Insert query graphs */
	fts_tokenizer_word_t* word,	/*!< in: sorted and tokenized
					word */
	fts_table_t*	fts_table,	/*!< in: fts aux table instance */
	CHARSET_INFO*	charset)	/*!< in: charset */
{
	dberr_t	ret = DB_SUCCESS;

	/* Pick the auxiliary index table that stores this word and
	point fts_table at it via the table-name suffix. */
	ulint	selected = fts_select_index(
		charset, word->text.f_str, word->text.f_len);

	fts_table->suffix = fts_get_suffix(selected);

	/* Drain word->nodes, writing each node to the auxiliary table.
	On failure we report the error but keep draining, so that every
	node's ilist buffer is still freed. */
	while (ib_vector_size(word->nodes) > 0) {
		fts_node_t*	node = static_cast<fts_node_t*>(
			ib_vector_pop(word->nodes));

		dberr_t	err = fts_write_node(
			trx, &ins_graph[selected], fts_table, &word->text,
			node);

		if (err != DB_SUCCESS) {
			fprintf(stderr, "InnoDB: failed to write"
				" word %s to FTS auxiliary index"
				" table, error (%s) \n",
				word->text.f_str, ut_strerr(err));
			ret = err;
		}

		ut_free(node->ilist);
		node->ilist = NULL;
	}

	/* The last error encountered, or DB_SUCCESS. */
	return(ret);
}
1043 
/*********************************************************************//**
Read sorted FTS data files and insert data tuples to the auxiliary tables.
Returns nothing; errors from the underlying inserts are reported by
row_merge_write_fts_word(). */
1047 UNIV_INTERN
1048 void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t* word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get fts_node for the FTS auxiliary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	/* Start a fresh, zero-filled node if there is none yet, or if
	the current node's ilist exceeds FTS_ILIST_MAX_SIZE. */
	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		if (fts_node && ib_vector_size(positions) > 0) {
			/* Flush the positions accumulated for the last
			Doc ID into the node. */
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx->trx,
						 ins_ctx->ins_graph, word,
						 &ins_ctx->fts_table,
						 ins_ctx->charset);

		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	/* Very first word seen: adopt it as the current word. */
	if (!word->text.f_str) {
		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* Getting a new word, flush the last position info
		for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx->trx, ins_ctx->ins_graph,
					 word, &ins_ctx->fts_table,
					 ins_ctx->charset);

		/* Copy the new word */
		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	/* The sort tuple stores the Doc ID either at full width or
	compressed to 4 bytes when opt_doc_id_size is set. */
	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush position info to the
	fts_node and initiate a new position vector  */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		/* New Doc ID for the same word: flush the positions
		collected for the previous Doc ID and start a fresh
		list holding just the new position. */
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* record the current Doc ID */
	*in_doc_id = doc_id;
}
1178 
1179 /*********************************************************************//**
1180 Propagate a newly added record up one level in the selection tree
1181 @return parent where this value propagated to */
1182 static
1183 int
row_fts_sel_tree_propagate(int propogated,int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1184 row_fts_sel_tree_propagate(
1185 /*=======================*/
1186 	int		propogated,	/*<! in: tree node propagated */
1187 	int*		sel_tree,	/*<! in: selection tree */
1188 	const mrec_t**	mrec,		/*<! in: sort record */
1189 	ulint**		offsets,	/*<! in: record offsets */
1190 	dict_index_t*	index)		/*<! in/out: FTS index */
1191 {
1192 	ulint	parent;
1193 	int	child_left;
1194 	int	child_right;
1195 	int	selected;
1196 
1197 	/* Find which parent this value will be propagated to */
1198 	parent = (propogated - 1) / 2;
1199 
1200 	/* Find out which value is smaller, and to propagate */
1201 	child_left = sel_tree[parent * 2 + 1];
1202 	child_right = sel_tree[parent * 2 + 2];
1203 
1204 	if (child_left == -1 || mrec[child_left] == NULL) {
1205 		if (child_right == -1
1206 		    || mrec[child_right] == NULL) {
1207 			selected = -1;
1208 		} else {
1209 			selected = child_right ;
1210 		}
1211 	} else if (child_right == -1
1212 		   || mrec[child_right] == NULL) {
1213 		selected = child_left;
1214 	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
1215 				      offsets[child_left],
1216 				      offsets[child_right],
1217 				      index, NULL) < 0) {
1218 		selected = child_left;
1219 	} else {
1220 		selected = child_right;
1221 	}
1222 
1223 	sel_tree[parent] = selected;
1224 
1225 	return(static_cast<int>(parent));
1226 }
1227 
1228 /*********************************************************************//**
1229 Readjust selection tree after popping the root and read a new value
1230 @return the new root */
1231 static
1232 int
row_fts_sel_tree_update(int * sel_tree,ulint propagated,ulint height,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1233 row_fts_sel_tree_update(
1234 /*====================*/
1235 	int*		sel_tree,	/*<! in/out: selection tree */
1236 	ulint		propagated,	/*<! in: node to propagate up */
1237 	ulint		height,		/*<! in: tree height */
1238 	const mrec_t**	mrec,		/*<! in: sort record */
1239 	ulint**		offsets,	/*<! in: record offsets */
1240 	dict_index_t*	index)		/*<! in: index dictionary */
1241 {
1242 	ulint	i;
1243 
1244 	for (i = 1; i <= height; i++) {
1245 		propagated = static_cast<ulint>(row_fts_sel_tree_propagate(
1246 			static_cast<int>(propagated), sel_tree, mrec, offsets, index));
1247 	}
1248 
1249 	return(sel_tree[0]);
1250 }
1251 
1252 /*********************************************************************//**
1253 Build selection tree at a specified level */
1254 static
1255 void
row_fts_build_sel_tree_level(int * sel_tree,ulint level,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1256 row_fts_build_sel_tree_level(
1257 /*=========================*/
1258 	int*		sel_tree,	/*<! in/out: selection tree */
1259 	ulint		level,		/*<! in: selection tree level */
1260 	const mrec_t**	mrec,		/*<! in: sort record */
1261 	ulint**		offsets,	/*<! in: record offsets */
1262 	dict_index_t*	index)		/*<! in: index dictionary */
1263 {
1264 	ulint	start;
1265 	int	child_left;
1266 	int	child_right;
1267 	ulint	i;
1268 	ulint	num_item;
1269 
1270 	start = static_cast<ulint>((1 << level) - 1);
1271 	num_item = static_cast<ulint>(1 << level);
1272 
1273 	for (i = 0; i < num_item;  i++) {
1274 		child_left = sel_tree[(start + i) * 2 + 1];
1275 		child_right = sel_tree[(start + i) * 2 + 2];
1276 
1277 		if (child_left == -1) {
1278 			if (child_right == -1) {
1279 				sel_tree[start + i] = -1;
1280 			} else {
1281 				sel_tree[start + i] =  child_right;
1282 			}
1283 			continue;
1284 		} else if (child_right == -1) {
1285 			sel_tree[start + i] = child_left;
1286 			continue;
1287 		}
1288 
1289 		/* Deal with NULL child conditions */
1290 		if (!mrec[child_left]) {
1291 			if (!mrec[child_right]) {
1292 				sel_tree[start + i] = -1;
1293 			} else {
1294 				sel_tree[start + i] = child_right;
1295 			}
1296 			continue;
1297 		} else if (!mrec[child_right]) {
1298 			sel_tree[start + i] = child_left;
1299 			continue;
1300 		}
1301 
1302 		/* Select the smaller one to set parent pointer */
1303 		int cmp = cmp_rec_rec_simple(
1304 			mrec[child_left], mrec[child_right],
1305 			offsets[child_left], offsets[child_right],
1306 			index, NULL);
1307 
1308 		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
1309 	}
1310 }
1311 
1312 /*********************************************************************//**
1313 Build a selection tree for merge. The selection tree is a binary tree
1314 and should have fts_sort_pll_degree / 2 levels. With root as level 0
1315 @return number of tree levels */
1316 static
1317 ulint
row_fts_build_sel_tree(int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1318 row_fts_build_sel_tree(
1319 /*===================*/
1320 	int*		sel_tree,	/*<! in/out: selection tree */
1321 	const mrec_t**	mrec,		/*<! in: sort record */
1322 	ulint**		offsets,	/*<! in: record offsets */
1323 	dict_index_t*	index)		/*<! in: index dictionary */
1324 {
1325 	ulint	treelevel = 1;
1326 	ulint	num = 2;
1327 	int	i = 0;
1328 	ulint	start;
1329 
1330 	/* No need to build selection tree if we only have two merge threads */
1331 	if (fts_sort_pll_degree <= 2) {
1332 		return(0);
1333 	}
1334 
1335 	while (num < fts_sort_pll_degree) {
1336 		num = num << 1;
1337 		treelevel++;
1338 	}
1339 
1340 	start = (1 << treelevel) - 1;
1341 
1342 	for (i = 0; i < (int) fts_sort_pll_degree; i++) {
1343 		sel_tree[i + start] = i;
1344 	}
1345 
1346 	for (i = static_cast<int>(treelevel) - 1; i >= 0; i--) {
1347 		row_fts_build_sel_tree_level(
1348 			sel_tree, static_cast<ulint>(i), mrec, offsets, index);
1349 	}
1350 
1351 	return(treelevel);
1352 }
1353 
1354 /*********************************************************************//**
1355 Read sorted file containing index data tuples and insert these data
1356 tuples to the index
1357 @return	DB_SUCCESS or error number */
1358 UNIV_INTERN
1359 dberr_t
row_fts_merge_insert(
/*=================*/
	dict_index_t*		index,	/*!< in: index */
	dict_table_t*		table,	/*!< in: new table */
	fts_psort_t*		psort_info, /*!< parallel sort info */
	ulint			id)	/*!< in: which auxiliary table's data
					to insert to */
{
	const byte**		b;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		heap;
	dberr_t			error = DB_SUCCESS;
	ulint*			foffs;		/* file offset per stream */
	ulint**			offsets;	/* rec offsets per stream */
	fts_tokenizer_word_t	new_word;	/* word being accumulated */
	ib_vector_t*		positions;	/* positions for current doc */
	doc_id_t		last_doc_id;
	ib_alloc_t*		heap_alloc;
	ulint			n_bytes;
	ulint			i;
	mrec_buf_t**		buf;
	int*			fd;		/* temp file fd per stream */
	byte**			block;		/* I/O block per stream */
	const mrec_t**		mrec;		/* current rec per stream */
	ulint			count = 0;
	int*			sel_tree;
	ulint			height;
	ulint			start;
	fts_psort_insert_t	ins_ctx;
	ulint			count_diag = 0;	/* diagnostics only */

	ut_ad(index);
	ut_ad(table);

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	ins_ctx.trx = trx_allocate_for_background();

	ins_ctx.trx->op_info = "inserting index entries";

	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;

	heap = mem_heap_create(500 + sizeof(mrec_buf_t));

	/* Per-stream arrays: one slot for each of the
	fts_sort_pll_degree tokenization threads' sorted runs. */
	b = (const byte**) mem_heap_alloc(
		heap, sizeof (*b) * fts_sort_pll_degree);
	foffs = (ulint*) mem_heap_alloc(
		heap, sizeof(*foffs) * fts_sort_pll_degree);
	offsets = (ulint**) mem_heap_alloc(
		heap, sizeof(*offsets) * fts_sort_pll_degree);
	buf = (mrec_buf_t**) mem_heap_alloc(
		heap, sizeof(*buf) * fts_sort_pll_degree);
	fd = (int*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
	block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	mrec = (const mrec_t**) mem_heap_alloc(
		heap, sizeof(*mrec) * fts_sort_pll_degree);
	sel_tree = (int*) mem_heap_alloc(
		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));

	tuple_heap = mem_heap_create(1000);

	ins_ctx.charset = fts_index_get_charset(index);
	ins_ctx.heap = heap;

	/* Initialize the per-stream offsets arrays and wire each
	stream to its thread's merge block/file for table "id". */
	for (i = 0; i < fts_sort_pll_degree; i++) {
		ulint	num;

		num = 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
			heap, num * sizeof *offsets[i]));
		offsets[i][0] = num;
		offsets[i][1] = dict_index_get_n_fields(index);
		block[i] = psort_info[i].merge_block[id];
		b[i] = psort_info[i].merge_block[id];
		fd[i] = psort_info[i].merge_file[id]->fd;
		foffs[i] = 0;

		/* NOTE(review): 16384 presumably matches
		sizeof(mrec_buf_t); confirm against row0merge.h. */
		buf[i] = static_cast<unsigned char (*)[16384]>(
			mem_heap_alloc(heap, sizeof *buf[i]));
		count_diag += (int) psort_info[i].merge_file[id]->n_rec;
	}

	if (fts_enable_diag_print) {
		ut_print_timestamp(stderr);
		fprintf(stderr, "  InnoDB_FTS: to inserted %lu records\n",
			(ulong) count_diag);
	}

	/* Initialize related variables if creating FTS indexes */
	heap_alloc = ib_heap_allocator_create(heap);

	memset(&new_word, 0, sizeof(new_word));

	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
	last_doc_id = 0;

	/* Allocate insert query graphs for FTS auxiliary
	Index Table, note we have FTS_NUM_AUX_INDEX such index tables */
	n_bytes = sizeof(que_t*) * (FTS_NUM_AUX_INDEX + 1);
	ins_ctx.ins_graph = static_cast<que_t**>(mem_heap_alloc(heap, n_bytes));
	memset(ins_ctx.ins_graph, 0x0, n_bytes);

	/* We should set the flags2 with aux_table_name here,
	in order to get the correct aux table names. */
	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
			index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);

	ins_ctx.fts_table.type = FTS_INDEX_TABLE;
	ins_ctx.fts_table.index_id = index->id;
	ins_ctx.fts_table.table_id = table->id;
	ins_ctx.fts_table.parent = index->table->name;
	ins_ctx.fts_table.table = index->table;

	/* Prime each stream: position b[i]/mrec[i] on its first
	record, reading the first block from the temp file if the
	run spilled to disk. */
	for (i = 0; i < fts_sort_pll_degree; i++) {
		if (psort_info[i].merge_file[id]->n_rec == 0) {
			/* No Rows to read */
			mrec[i] = b[i] = NULL;
		} else {
			/* Read from temp file only if it has been
			written to. Otherwise, block memory holds
			all the sorted records */
			if (psort_info[i].merge_file[id]->offset > 0
			    && (!row_merge_read(
					fd[i], foffs[i],
					(row_merge_block_t*) block[i]))) {
				error = DB_CORRUPTION;
				goto exit;
			}

			ROW_MERGE_READ_GET_NEXT(i);
		}
	}

	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
					offsets, index);

	/* Index of the first leaf slot in the selection tree. */
	start = (1 << height) - 1;

	/* Fetch sorted records from sort buffer and insert them into
	corresponding FTS index auxiliary tables */
	for (;;) {
		dtuple_t*	dtuple;
		ulint		n_ext;
		int		min_rec = 0;

		if (fts_sort_pll_degree <= 2) {
			/* Two or fewer streams: find the minimum by
			a linear scan over the live streams. */
			while (!mrec[min_rec]) {
				min_rec++;

				if (min_rec >= (int) fts_sort_pll_degree) {
					/* All streams exhausted: flush
					the final word and finish. */
					row_fts_insert_tuple(
						&ins_ctx, &new_word,
						positions, &last_doc_id,
						NULL);

					goto exit;
				}
			}

			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
				if (!mrec[i]) {
					continue;
				}

				if (cmp_rec_rec_simple(
					    mrec[i], mrec[min_rec],
					    offsets[i], offsets[min_rec],
					    index, NULL) < 0) {
					min_rec = static_cast<int>(i);
				}
			}
		} else {
			/* More than two streams: the selection tree
			root holds the stream with the smallest record,
			or -1 once every stream is exhausted. */
			min_rec = sel_tree[0];

			if (min_rec ==  -1) {
				row_fts_insert_tuple(
					&ins_ctx, &new_word,
					positions, &last_doc_id,
					NULL);

				goto exit;
			}
		}

		dtuple = row_rec_to_index_entry_low(
			mrec[min_rec], index, offsets[min_rec], &n_ext,
			tuple_heap);

		row_fts_insert_tuple(
			&ins_ctx, &new_word, positions,
			&last_doc_id, dtuple);

		/* Advance the winning stream to its next record. */
		ROW_MERGE_READ_GET_NEXT(min_rec);

		if (fts_sort_pll_degree > 2) {
			/* Mark the leaf exhausted if needed, then
			re-propagate the change up to the root. */
			if (!mrec[min_rec]) {
				sel_tree[start + min_rec] = -1;
			}

			row_fts_sel_tree_update(sel_tree, start + min_rec,
						height, mrec,
						offsets, index);
		}

		count++;

		mem_heap_empty(tuple_heap);
	}

exit:
	fts_sql_commit(ins_ctx.trx);

	ins_ctx.trx->op_info = "";

	mem_heap_free(tuple_heap);

	/* Free any insert query graphs built lazily during the run. */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (ins_ctx.ins_graph[i]) {
			fts_que_graph_free(ins_ctx.ins_graph[i]);
		}
	}

	trx_free_for_background(ins_ctx.trx);

	mem_heap_free(heap);

	if (fts_enable_diag_print) {
		ut_print_timestamp(stderr);
		fprintf(stderr, "  InnoDB_FTS: inserted %lu records\n",
			(ulong) count);
	}

	return(error);
}
1600