1 /*****************************************************************************
2 
3 Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
4 
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8 
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation.  The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15 
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 GNU General Public License, version 2.0, for more details.
20 
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24 
25 *****************************************************************************/
26 
27 /**************************************************//**
28 @file row/row0ftsort.cc
29 Create Full Text Index with (parallel) merge sort
30 
31 Created 10/13/2010 Jimmy Yang
32 *******************************************************/
33 
34 #include "dict0dict.h" /* dict_table_stats_lock() */
35 #include "row0merge.h"
36 #include "pars0pars.h"
37 #include "row0ftsort.h"
38 #include "row0merge.h"
39 #include "row0row.h"
40 #include "btr0cur.h"
41 
/** Read the next record to buffer N.
NOTE: this macro relies on locals named b[], block[], buf[], fd[],
foffs[], mrec[], offsets[] and "index" being in scope at the expansion
site, and on an "exit" label to jump to when the end of input is hit
with a record still pending.
@param N	index into array of merge info structure */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N]);	\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)

/** Parallel sort degree: number of concurrent tokenize/sort threads */
UNIV_INTERN ulong	fts_sort_pll_degree	= 2;
58 
59 /*********************************************************************//**
60 Create a temporary "fts sort index" used to merge sort the
61 tokenized doc string. The index has three "fields":
62 
63 1) Tokenized word,
64 2) Doc ID (depend on number of records to sort, it can be a 4 bytes or 8 bytes
65 integer value)
66 3) Word's position in original doc.
67 
68 @return dict_index_t structure for the fts sort index */
69 UNIV_INTERN
70 dict_index_t*
row_merge_create_fts_sort_index(dict_index_t * index,const dict_table_t * table,ibool * opt_doc_id_size)71 row_merge_create_fts_sort_index(
72 /*============================*/
73 	dict_index_t*		index,	/*!< in: Original FTS index
74 					based on which this sort index
75 					is created */
76 	const dict_table_t*	table,	/*!< in: table that FTS index
77 					is being created on */
78 	ibool*			opt_doc_id_size)
79 					/*!< out: whether to use 4 bytes
80 					instead of 8 bytes integer to
81 					store Doc ID during sort */
82 {
83 	dict_index_t*   new_index;
84 	dict_field_t*   field;
85 	dict_field_t*   idx_field;
86 	CHARSET_INFO*	charset;
87 
88 	// FIXME: This name shouldn't be hard coded here.
89 	new_index = dict_mem_index_create(
90 		index->table->name, "tmp_fts_idx", 0, DICT_FTS, 3);
91 
92 	new_index->id = index->id;
93 	new_index->table = (dict_table_t*) table;
94 	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
95 	new_index->n_def = FTS_NUM_FIELDS_SORT;
96 	new_index->cached = TRUE;
97 
98 	idx_field = dict_index_get_nth_field(index, 0);
99 	charset = fts_index_get_charset(index);
100 
101 	/* The first field is on the Tokenized Word */
102 	field = dict_index_get_nth_field(new_index, 0);
103 	field->name = NULL;
104 	field->prefix_len = 0;
105 	field->col = static_cast<dict_col_t*>(
106 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
107 	field->col->len = FTS_MAX_WORD_LEN;
108 
109 	if (strcmp(charset->name, "latin1_swedish_ci") == 0) {
110 		field->col->mtype = DATA_VARCHAR;
111 	} else {
112 		field->col->mtype = DATA_VARMYSQL;
113 	}
114 
115 	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
116 	field->col->mbminmaxlen = idx_field->col->mbminmaxlen;
117 	field->fixed_len = 0;
118 
119 	/* Doc ID */
120 	field = dict_index_get_nth_field(new_index, 1);
121 	field->name = NULL;
122 	field->prefix_len = 0;
123 	field->col = static_cast<dict_col_t*>(
124 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
125 	field->col->mtype = DATA_INT;
126 	*opt_doc_id_size = FALSE;
127 
128 	/* Check whether we can use 4 bytes instead of 8 bytes integer
129 	field to hold the Doc ID, thus reduce the overall sort size */
130 	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
131 		/* If Doc ID column is being added by this create
132 		index, then just check the number of rows in the table */
133 		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
134 			*opt_doc_id_size = TRUE;
135 		}
136 	} else {
137 		doc_id_t	max_doc_id;
138 
139 		/* If the Doc ID column is supplied by user, then
140 		check the maximum Doc ID in the table */
141 		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
142 
143 		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
144 			*opt_doc_id_size = TRUE;
145 		}
146 	}
147 
148 	if (*opt_doc_id_size) {
149 		field->col->len = sizeof(ib_uint32_t);
150 		field->fixed_len = sizeof(ib_uint32_t);
151 	} else {
152 		field->col->len = FTS_DOC_ID_LEN;
153 		field->fixed_len = FTS_DOC_ID_LEN;
154 	}
155 
156 	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
157 
158 	field->col->mbminmaxlen = 0;
159 
160 	/* The third field is on the word's position in the original doc */
161 	field = dict_index_get_nth_field(new_index, 2);
162 	field->name = NULL;
163 	field->prefix_len = 0;
164 	field->col = static_cast<dict_col_t*>(
165 		mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
166 	field->col->mtype = DATA_INT;
167 	field->col->len = 4 ;
168 	field->fixed_len = 4;
169 	field->col->prtype = DATA_NOT_NULL;
170 	field->col->mbminmaxlen = 0;
171 
172 	return(new_index);
173 }
174 /*********************************************************************//**
175 Initialize FTS parallel sort structures.
176 @return TRUE if all successful */
177 UNIV_INTERN
178 ibool
row_fts_psort_info_init(trx_t * trx,row_merge_dup_t * dup,const dict_table_t * new_table,ibool opt_doc_id_size,fts_psort_t ** psort,fts_psort_t ** merge)179 row_fts_psort_info_init(
180 /*====================*/
181 	trx_t*			trx,	/*!< in: transaction */
182 	row_merge_dup_t*	dup,	/*!< in,own: descriptor of
183 					FTS index being created */
184 	const dict_table_t*	new_table,/*!< in: table on which indexes are
185 					created */
186 	ibool			opt_doc_id_size,
187 					/*!< in: whether to use 4 bytes
188 					instead of 8 bytes integer to
189 					store Doc ID during sort */
190 	fts_psort_t**		psort,	/*!< out: parallel sort info to be
191 					instantiated */
192 	fts_psort_t**		merge)	/*!< out: parallel merge info
193 					to be instantiated */
194 {
195 	ulint			i;
196 	ulint			j;
197 	fts_psort_common_t*	common_info = NULL;
198 	fts_psort_t*		psort_info = NULL;
199 	fts_psort_t*		merge_info = NULL;
200 	ulint			block_size;
201 	ibool			ret = TRUE;
202 
203 	block_size = 3 * srv_sort_buf_size;
204 
205 	*psort = psort_info = static_cast<fts_psort_t*>(mem_zalloc(
206 		 fts_sort_pll_degree * sizeof *psort_info));
207 
208 	if (!psort_info) {
209 		ut_free(dup);
210 		return(FALSE);
211 	}
212 
213 	/* Common Info for all sort threads */
214 	common_info = static_cast<fts_psort_common_t*>(
215 		mem_alloc(sizeof *common_info));
216 
217 	if (!common_info) {
218 		ut_free(dup);
219 		mem_free(psort_info);
220 		return(FALSE);
221 	}
222 
223 	common_info->dup = dup;
224 	common_info->new_table = (dict_table_t*) new_table;
225 	common_info->trx = trx;
226 	common_info->all_info = psort_info;
227 	common_info->sort_event = os_event_create();
228 	common_info->merge_event = os_event_create();
229 	common_info->opt_doc_id_size = opt_doc_id_size;
230 
231 	ut_ad(trx->mysql_thd != NULL);
232 	const char*	path = thd_innodb_tmpdir(trx->mysql_thd);
233 
234 	/* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
235 	each parallel sort thread. Each "sort bucket" holds records for
236 	a particular "FTS index partition" */
237 	for (j = 0; j < fts_sort_pll_degree; j++) {
238 
239 		UT_LIST_INIT(psort_info[j].fts_doc_list);
240 
241 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
242 
243 			psort_info[j].merge_file[i] =
244 				 static_cast<merge_file_t*>(
245 					mem_zalloc(sizeof(merge_file_t)));
246 
247 			if (!psort_info[j].merge_file[i]) {
248 				ret = FALSE;
249 				goto func_exit;
250 			}
251 
252 			psort_info[j].merge_buf[i] = row_merge_buf_create(
253 				dup->index);
254 
255 			if (row_merge_file_create(psort_info[j].merge_file[i],
256 						  path) < 0) {
257 				goto func_exit;
258 			}
259 
260 			/* Need to align memory for O_DIRECT write */
261 			psort_info[j].block_alloc[i] =
262 				static_cast<row_merge_block_t*>(ut_malloc(
263 					block_size + 1024));
264 
265 			psort_info[j].merge_block[i] =
266 				static_cast<row_merge_block_t*>(
267 					ut_align(
268 					psort_info[j].block_alloc[i], 1024));
269 
270 			if (!psort_info[j].merge_block[i]) {
271 				ret = FALSE;
272 				goto func_exit;
273 			}
274 		}
275 
276 		psort_info[j].child_status = 0;
277 		psort_info[j].state = 0;
278 		psort_info[j].psort_common = common_info;
279 		psort_info[j].error = DB_SUCCESS;
280 		psort_info[j].memory_used = 0;
281 		mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE);
282 	}
283 
284 	/* Initialize merge_info structures parallel merge and insert
285 	into auxiliary FTS tables (FTS_INDEX_TABLE) */
286 	*merge = merge_info = static_cast<fts_psort_t*>(
287 		mem_alloc(FTS_NUM_AUX_INDEX * sizeof *merge_info));
288 
289 	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
290 
291 		merge_info[j].child_status = 0;
292 		merge_info[j].state = 0;
293 		merge_info[j].psort_common = common_info;
294 	}
295 
296 func_exit:
297 	if (!ret) {
298 		row_fts_psort_info_destroy(psort_info, merge_info);
299 	}
300 
301 	return(ret);
302 }
303 /*********************************************************************//**
304 Clean up and deallocate FTS parallel sort structures, and close the
305 merge sort files  */
306 UNIV_INTERN
307 void
row_fts_psort_info_destroy(fts_psort_t * psort_info,fts_psort_t * merge_info)308 row_fts_psort_info_destroy(
309 /*=======================*/
310 	fts_psort_t*	psort_info,	/*!< parallel sort info */
311 	fts_psort_t*	merge_info)	/*!< parallel merge info */
312 {
313 	ulint	i;
314 	ulint	j;
315 
316 	if (psort_info) {
317 		for (j = 0; j < fts_sort_pll_degree; j++) {
318 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
319 				if (psort_info[j].merge_file[i]) {
320 					row_merge_file_destroy(
321 						psort_info[j].merge_file[i]);
322 				}
323 
324 				if (psort_info[j].block_alloc[i]) {
325 					ut_free(psort_info[j].block_alloc[i]);
326 				}
327 				mem_free(psort_info[j].merge_file[i]);
328 			}
329 
330 			mutex_free(&psort_info[j].mutex);
331 		}
332 
333 		os_event_free(merge_info[0].psort_common->sort_event);
334 		os_event_free(merge_info[0].psort_common->merge_event);
335 		ut_free(merge_info[0].psort_common->dup);
336 		mem_free(merge_info[0].psort_common);
337 		mem_free(psort_info);
338 	}
339 
340 	if (merge_info) {
341 		mem_free(merge_info);
342 	}
343 }
344 /*********************************************************************//**
345 Free up merge buffers when merge sort is done */
346 UNIV_INTERN
347 void
row_fts_free_pll_merge_buf(fts_psort_t * psort_info)348 row_fts_free_pll_merge_buf(
349 /*=======================*/
350 	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
351 {
352 	ulint	j;
353 	ulint	i;
354 
355 	if (!psort_info) {
356 		return;
357 	}
358 
359 	for (j = 0; j < fts_sort_pll_degree; j++) {
360 		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
361 			row_merge_buf_free(psort_info[j].merge_buf[i]);
362 		}
363 	}
364 
365 	return;
366 }
367 
368 /*********************************************************************//**
369 Tokenize incoming text data and add to the sort buffer.
370 @return	TRUE if the record passed, FALSE if out of space */
371 static
372 ibool
row_merge_fts_doc_tokenize(row_merge_buf_t ** sort_buf,doc_id_t doc_id,fts_doc_t * doc,dtype_t * word_dtype,merge_file_t ** merge_file,ibool opt_doc_id_size,fts_tokenize_ctx_t * t_ctx)373 row_merge_fts_doc_tokenize(
374 /*=======================*/
375 	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
376 	doc_id_t		doc_id,		/*!< in: Doc ID */
377 	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
378 	dtype_t*		word_dtype,	/*!< in: data structure for
379 						word col */
380 	merge_file_t**		merge_file,	/*!< in/out: merge file */
381 	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
382 						instead of 8 bytes integer to
383 						store Doc ID during sort*/
384 	fts_tokenize_ctx_t*	t_ctx)          /*!< in/out: tokenize context */
385 {
386 	ulint		i;
387 	ulint		inc;
388 	fts_string_t	str;
389 	ulint		len;
390 	row_merge_buf_t* buf;
391 	dfield_t*	field;
392 	fts_string_t	t_str;
393 	ibool		buf_full = FALSE;
394 	byte		str_buf[FTS_MAX_WORD_LEN + 1];
395 	ulint		data_size[FTS_NUM_AUX_INDEX];
396 	ulint		n_tuple[FTS_NUM_AUX_INDEX];
397 
398 	t_str.f_n_char = 0;
399 	t_ctx->buf_used = 0;
400 
401 	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
402 	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
403 
404 	/* Tokenize the data and add each word string, its corresponding
405 	doc id and position to sort buffer */
406 	for (i = t_ctx->processed_len; i < doc->text.f_len; i += inc) {
407 		ib_rbt_bound_t	parent;
408 		ulint		idx = 0;
409 		ib_uint32_t	position;
410 		ulint           offset = 0;
411 		ulint		cur_len = 0;
412 		doc_id_t	write_doc_id;
413 
414 		inc = innobase_mysql_fts_get_token(
415 			doc->charset, doc->text.f_str + i,
416 			doc->text.f_str + doc->text.f_len, &str, &offset);
417 
418 		ut_a(inc > 0);
419 
420 		/* Ignore string whose character number is less than
421 		"fts_min_token_size" or more than "fts_max_token_size" */
422 		if (str.f_n_char < fts_min_token_size
423 		    || str.f_n_char > fts_max_token_size) {
424 
425 			t_ctx->processed_len += inc;
426 			continue;
427 		}
428 
429 		t_str.f_len = innobase_fts_casedn_str(
430 			doc->charset, (char*) str.f_str, str.f_len,
431 			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);
432 
433 		t_str.f_str = (byte*) &str_buf;
434 
435 		/* if "cached_stopword" is defined, ingore words in the
436 		stopword list */
437 		if (t_ctx->cached_stopword
438 		    && rbt_search(t_ctx->cached_stopword,
439 				  &parent, &t_str) == 0) {
440 
441 			t_ctx->processed_len += inc;
442 			continue;
443 		}
444 
445 		/* There are FTS_NUM_AUX_INDEX auxiliary tables, find
446 		out which sort buffer to put this word record in */
447 		t_ctx->buf_used = fts_select_index(
448 			doc->charset, t_str.f_str, t_str.f_len);
449 
450 		buf = sort_buf[t_ctx->buf_used];
451 
452 		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
453 		idx = t_ctx->buf_used;
454 
455 		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
456 
457 		field = mtuple->fields = static_cast<dfield_t*>(
458 			mem_heap_alloc(buf->heap,
459 				       FTS_NUM_FIELDS_SORT * sizeof *field));
460 
461 		/* The first field is the tokenized word */
462 		dfield_set_data(field, t_str.f_str, t_str.f_len);
463 		len = dfield_get_len(field);
464 
465 		field->type.mtype = word_dtype->mtype;
466 		field->type.prtype = word_dtype->prtype | DATA_NOT_NULL;
467 
468 		/* Variable length field, set to max size. */
469 		field->type.len = FTS_MAX_WORD_LEN;
470 		field->type.mbminmaxlen = word_dtype->mbminmaxlen;
471 
472 		cur_len += len;
473 		dfield_dup(field, buf->heap);
474 		field++;
475 
476 		/* The second field is the Doc ID */
477 
478 		ib_uint32_t	doc_id_32_bit;
479 
480 		if (!opt_doc_id_size) {
481 			fts_write_doc_id((byte*) &write_doc_id, doc_id);
482 
483 			dfield_set_data(
484 				field, &write_doc_id, sizeof(write_doc_id));
485 		} else {
486 			mach_write_to_4(
487 				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
488 
489 			dfield_set_data(
490 				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
491 		}
492 
493 		len = field->len;
494 		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
495 
496 		field->type.mtype = DATA_INT;
497 		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
498 		field->type.len = len;
499 		field->type.mbminmaxlen = 0;
500 
501 		cur_len += len;
502 		dfield_dup(field, buf->heap);
503 
504 		++field;
505 
506 		/* The third field is the position */
507 		mach_write_to_4(
508 			(byte*) &position,
509 			(i + offset + inc - str.f_len + t_ctx->init_pos));
510 
511 		dfield_set_data(field, &position, sizeof(position));
512 		len = dfield_get_len(field);
513 		ut_ad(len == sizeof(ib_uint32_t));
514 
515 		field->type.mtype = DATA_INT;
516 		field->type.prtype = DATA_NOT_NULL;
517 		field->type.len = len;
518 		field->type.mbminmaxlen = 0;
519 		cur_len += len;
520 		dfield_dup(field, buf->heap);
521 
522 		/* One variable length column, word with its lenght less than
523 		fts_max_token_size, add one extra size and one extra byte.
524 
525 		Since the max length for FTS token now is larger than 255,
526 		so we will need to signify length byte itself, so only 1 to 128
527 		bytes can be used for 1 bytes, larger than that 2 bytes. */
528 		if (t_str.f_len < 128) {
529 			/* Extra size is one byte. */
530 			cur_len += 2;
531 		} else {
532 			/* Extra size is two bytes. */
533 			cur_len += 3;
534 		}
535 
536 		/* Reserve one byte for the end marker of row_merge_block_t. */
537 		if (buf->total_size + data_size[idx] + cur_len
538 		    >= srv_sort_buf_size - 1) {
539 
540 			buf_full = TRUE;
541 			break;
542 		}
543 
544 		/* Increment the number of tuples */
545 		n_tuple[idx]++;
546 		t_ctx->processed_len += inc;
547 		data_size[idx] += cur_len;
548 	}
549 
550 	/* Update the data length and the number of new word tuples
551 	added in this round of tokenization */
552 	for (i = 0; i <  FTS_NUM_AUX_INDEX; i++) {
553 		/* The computation of total_size below assumes that no
554 		delete-mark flags will be stored and that all fields
555 		are NOT NULL and fixed-length. */
556 
557 		sort_buf[i]->total_size += data_size[i];
558 
559 		sort_buf[i]->n_tuples += n_tuple[i];
560 
561 		merge_file[i]->n_rec += n_tuple[i];
562 		t_ctx->rows_added[i] += n_tuple[i];
563 	}
564 
565 	if (!buf_full) {
566 		/* we pad one byte between text accross two fields */
567 		t_ctx->init_pos += doc->text.f_len + 1;
568 	}
569 
570 	return(!buf_full);
571 }
572 
573 /*********************************************************************//**
574 Get next doc item from fts_doc_list */
575 UNIV_INLINE
576 void
row_merge_fts_get_next_doc_item(fts_psort_t * psort_info,fts_doc_item_t ** doc_item)577 row_merge_fts_get_next_doc_item(
578 /*============================*/
579 	fts_psort_t*		psort_info,	/*!< in: psort_info */
580 	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
581 {
582 	if (*doc_item != NULL) {
583 		ut_free(*doc_item);
584 	}
585 
586 	mutex_enter(&psort_info->mutex);
587 
588 	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
589 	if (*doc_item != NULL) {
590 		UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list,
591 			       *doc_item);
592 
593 		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
594 		      + (*doc_item)->field->len);
595 		psort_info->memory_used -= sizeof(fts_doc_item_t)
596 			+ (*doc_item)->field->len;
597 	}
598 
599 	mutex_exit(&psort_info->mutex);
600 }
601 
602 /*********************************************************************//**
603 Function performs parallel tokenization of the incoming doc strings.
604 It also performs the initial in memory sort of the parsed records.
605 @return OS_THREAD_DUMMY_RETURN */
606 UNIV_INTERN
607 os_thread_ret_t
fts_parallel_tokenization(void * arg)608 fts_parallel_tokenization(
609 /*======================*/
610 	void*		arg)	/*!< in: psort_info for the thread */
611 {
612 	fts_psort_t*		psort_info = (fts_psort_t*) arg;
613 	ulint			i;
614 	fts_doc_item_t*		doc_item = NULL;
615 	row_merge_buf_t**	buf;
616 	ibool			processed = FALSE;
617 	merge_file_t**		merge_file;
618 	row_merge_block_t**	block;
619 	int			tmpfd[FTS_NUM_AUX_INDEX];
620 	ulint			mycount[FTS_NUM_AUX_INDEX];
621 	ib_uint64_t		total_rec = 0;
622 	ulint			num_doc_processed = 0;
623 	doc_id_t		last_doc_id = 0;
624 	ulint			zip_size;
625 	mem_heap_t*		blob_heap = NULL;
626 	fts_doc_t		doc;
627 	dict_table_t*		table = psort_info->psort_common->new_table;
628 	dtype_t			word_dtype;
629 	dict_field_t*		idx_field;
630 	fts_tokenize_ctx_t	t_ctx;
631 	ulint			retried = 0;
632 	dberr_t			error = DB_SUCCESS;
633 
634 	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
635 
636 	const char*		path = thd_innodb_tmpdir(
637 		psort_info->psort_common->trx->mysql_thd);
638 
639 	ut_ad(psort_info);
640 
641 	buf = psort_info->merge_buf;
642 	merge_file = psort_info->merge_file;
643 	blob_heap = mem_heap_create(512);
644 	memset(&doc, 0, sizeof(doc));
645 	memset(&t_ctx, 0, sizeof(t_ctx));
646 	memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
647 
648 	doc.charset = fts_index_get_charset(
649 		psort_info->psort_common->dup->index);
650 
651 	idx_field = dict_index_get_nth_field(
652 		psort_info->psort_common->dup->index, 0);
653 	word_dtype.prtype = idx_field->col->prtype;
654 	word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
655 	word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
656 				? DATA_VARCHAR : DATA_VARMYSQL;
657 
658 	block = psort_info->merge_block;
659 	zip_size = dict_table_zip_size(table);
660 
661 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
662 
663 	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
664 	processed = TRUE;
665 loop:
666 	while (doc_item) {
667 		dfield_t*	dfield = doc_item->field;
668 
669 		last_doc_id = doc_item->doc_id;
670 
671 		ut_ad (dfield->data != NULL
672 		       && dfield_get_len(dfield) != UNIV_SQL_NULL);
673 
674 		/* If finish processing the last item, update "doc" with
675 		strings in the doc_item, otherwise continue processing last
676 		item */
677 		if (processed) {
678 			byte*		data;
679 			ulint		data_len;
680 
681 			dfield = doc_item->field;
682 			data = static_cast<byte*>(dfield_get_data(dfield));
683 			data_len = dfield_get_len(dfield);
684 
685 			if (dfield_is_ext(dfield)) {
686 				doc.text.f_str =
687 					btr_copy_externally_stored_field(
688 						&doc.text.f_len, data,
689 						zip_size, data_len, blob_heap);
690 			} else {
691 				doc.text.f_str = data;
692 				doc.text.f_len = data_len;
693 			}
694 
695 			doc.tokens = 0;
696 			t_ctx.processed_len = 0;
697 		} else {
698 			/* Not yet finish processing the "doc" on hand,
699 			continue processing it */
700 			ut_ad(doc.text.f_str);
701 			ut_ad(t_ctx.processed_len < doc.text.f_len);
702 		}
703 
704 		processed = row_merge_fts_doc_tokenize(
705 			buf, doc_item->doc_id, &doc,
706 			&word_dtype,
707 			merge_file, psort_info->psort_common->opt_doc_id_size,
708 			&t_ctx);
709 
710 		/* Current sort buffer full, need to recycle */
711 		if (!processed) {
712 			ut_ad(t_ctx.processed_len < doc.text.f_len);
713 			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
714 			break;
715 		}
716 
717 		num_doc_processed++;
718 
719 		if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
720 			ib_logf(IB_LOG_LEVEL_INFO,
721 				"number of doc processed %d\n",
722 				(int) num_doc_processed);
723 #ifdef FTS_INTERNAL_DIAG_PRINT
724 			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
725 				ib_logf(IB_LOG_LEVEL_INFO,
726 					"ID %d, partition %d, word "
727 					"%d\n",(int) psort_info->psort_id,
728 					(int) i, (int) mycount[i]);
729 			}
730 #endif
731 		}
732 
733 		mem_heap_empty(blob_heap);
734 
735 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
736 
737 		if (doc_item && last_doc_id != doc_item->doc_id) {
738 			t_ctx.init_pos = 0;
739 		}
740 	}
741 
742 	/* If we run out of current sort buffer, need to sort
743 	and flush the sort buffer to disk */
744 	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
745 		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
746 		row_merge_buf_write(buf[t_ctx.buf_used],
747 				    merge_file[t_ctx.buf_used],
748 				    block[t_ctx.buf_used]);
749 
750 		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
751 				     merge_file[t_ctx.buf_used]->offset++,
752 				     block[t_ctx.buf_used])) {
753 			error = DB_TEMP_FILE_WRITE_FAILURE;
754 			goto func_exit;
755 		}
756 
757 		UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
758 		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
759 		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
760 		t_ctx.rows_added[t_ctx.buf_used] = 0;
761 
762 		ut_a(doc_item);
763 		goto loop;
764 	}
765 
766 	/* Parent done scanning, and if finish processing all the docs, exit */
767 	if (psort_info->state == FTS_PARENT_COMPLETE) {
768 		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
769 			goto exit;
770 		} else if (retried > 10000) {
771 			ut_ad(!doc_item);
772 			/* retied too many times and cannot get new record */
773 			ib_logf(IB_LOG_LEVEL_ERROR,
774 					"InnoDB: FTS parallel sort processed "
775 					"%lu records, the sort queue has "
776 					"%lu records. But sort cannot get "
777 					"the next records", num_doc_processed,
778 					UT_LIST_GET_LEN(
779 						psort_info->fts_doc_list));
780 			goto exit;
781 		}
782 	} else if (psort_info->state == FTS_PARENT_EXITING) {
783 		/* Parent abort */
784 		goto func_exit;
785 	}
786 
787 	if (doc_item == NULL) {
788 		os_thread_yield();
789 	}
790 
791 	row_merge_fts_get_next_doc_item(psort_info, &doc_item);
792 
793 	if (doc_item != NULL) {
794 		if (last_doc_id != doc_item->doc_id) {
795 			t_ctx.init_pos = 0;
796 		}
797 
798 		retried = 0;
799 	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
800 		retried++;
801 	}
802 
803 	goto loop;
804 
805 exit:
806 	/* Do a final sort of the last (or latest) batch of records
807 	in block memory. Flush them to temp file if records cannot
808 	be hold in one block memory */
809 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
810 		if (t_ctx.rows_added[i]) {
811 			row_merge_buf_sort(buf[i], NULL);
812 			row_merge_buf_write(
813 				buf[i], merge_file[i], block[i]);
814 
815 			/* Write to temp file, only if records have
816 			been flushed to temp file before (offset > 0):
817 			The pseudo code for sort is following:
818 
819 				while (there are rows) {
820 					tokenize rows, put result in block[]
821 					if (block[] runs out) {
822 						sort rows;
823 						write to temp file with
824 						row_merge_write();
825 						offset++;
826 					}
827 				}
828 
829 				# write out the last batch
830 				if (offset > 0) {
831 					row_merge_write();
832 					offset++;
833 				} else {
834 					# no need to write anything
835 					offset stay as 0
836 				}
837 
838 			so if merge_file[i]->offset is 0 when we come to
839 			here as the last batch, this means rows have
840 			never flush to temp file, it can be held all in
841 			memory */
842 			if (merge_file[i]->offset != 0) {
843 				if (!row_merge_write(merge_file[i]->fd,
844 						merge_file[i]->offset++,
845 						block[i])) {
846 					error = DB_TEMP_FILE_WRITE_FAILURE;
847 					goto func_exit;
848 				}
849 
850 				UNIV_MEM_INVALID(block[i][0],
851 						 srv_sort_buf_size);
852 			}
853 
854 			buf[i] = row_merge_buf_empty(buf[i]);
855 			t_ctx.rows_added[i] = 0;
856 		}
857 	}
858 
859 	if (fts_enable_diag_print) {
860 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: start merge sort\n");
861 	}
862 
863 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
864 		if (!merge_file[i]->offset) {
865 			continue;
866 		}
867 
868 		tmpfd[i] = row_merge_file_create_low(path);
869 		if (tmpfd[i] < 0) {
870 			error = DB_OUT_OF_MEMORY;
871 			goto func_exit;
872 		}
873 
874 		error = row_merge_sort(psort_info->psort_common->trx,
875 				       psort_info->psort_common->dup,
876 				       merge_file[i], block[i], &tmpfd[i]);
877 		if (error != DB_SUCCESS) {
878 			close(tmpfd[i]);
879 			goto func_exit;
880 		}
881 
882 		total_rec += merge_file[i]->n_rec;
883 		close(tmpfd[i]);
884 	}
885 
886 func_exit:
887 	if (fts_enable_diag_print) {
888 		DEBUG_FTS_SORT_PRINT("  InnoDB_FTS: complete merge sort\n");
889 	}
890 
891 	mem_heap_free(blob_heap);
892 
893 	mutex_enter(&psort_info->mutex);
894 	psort_info->error = error;
895 	mutex_exit(&psort_info->mutex);
896 
897 	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
898 		/* child can exit either with error or told by parent. */
899 		ut_ad(error != DB_SUCCESS
900 		      || psort_info->state == FTS_PARENT_EXITING);
901 	}
902 
903 	/* Free fts doc list in case of error. */
904 	do {
905 		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
906 	} while (doc_item != NULL);
907 
908 	psort_info->child_status = FTS_CHILD_COMPLETE;
909 	os_event_set(psort_info->psort_common->sort_event);
910 	psort_info->child_status = FTS_CHILD_EXITING;
911 
912 #ifdef __WIN__
913 	CloseHandle(psort_info->thread_hdl);
914 #endif /*__WIN__ */
915 
916 	os_thread_exit(NULL);
917 
918 	OS_THREAD_DUMMY_RETURN;
919 }
920 
921 /*********************************************************************//**
922 Start the parallel tokenization and parallel merge sort */
923 UNIV_INTERN
924 void
row_fts_start_psort(fts_psort_t * psort_info)925 row_fts_start_psort(
926 /*================*/
927 	fts_psort_t*	psort_info)	/*!< parallel sort structure */
928 {
929 	ulint		i = 0;
930 	os_thread_id_t	thd_id;
931 
932 	for (i = 0; i < fts_sort_pll_degree; i++) {
933 		psort_info[i].psort_id = i;
934 		psort_info[i].thread_hdl = os_thread_create(
935 			fts_parallel_tokenization,
936 			(void*) &psort_info[i], &thd_id);
937 	}
938 }
939 
940 /*********************************************************************//**
941 Function performs the merge and insertion of the sorted records.
942 @return OS_THREAD_DUMMY_RETURN */
943 UNIV_INTERN
944 os_thread_ret_t
fts_parallel_merge(void * arg)945 fts_parallel_merge(
946 /*===============*/
947 	void*		arg)		/*!< in: parallel merge info */
948 {
949 	fts_psort_t*	psort_info = (fts_psort_t*) arg;
950 	ulint		id;
951 
952 	ut_ad(psort_info);
953 
954 	id = psort_info->psort_id;
955 
956 	row_fts_merge_insert(psort_info->psort_common->dup->index,
957 			     psort_info->psort_common->new_table,
958 			     psort_info->psort_common->all_info, id);
959 
960 	psort_info->child_status = FTS_CHILD_COMPLETE;
961 	os_event_set(psort_info->psort_common->merge_event);
962 	psort_info->child_status = FTS_CHILD_EXITING;
963 
964 #ifdef __WIN__
965 	CloseHandle(psort_info->thread_hdl);
966 #endif /*__WIN__ */
967 
968 	os_thread_exit(NULL, false);
969 
970 	OS_THREAD_DUMMY_RETURN;
971 }
972 
973 /*********************************************************************//**
974 Kick off the parallel merge and insert thread */
975 UNIV_INTERN
976 void
row_fts_start_parallel_merge(fts_psort_t * merge_info)977 row_fts_start_parallel_merge(
978 /*=========================*/
979 	fts_psort_t*	merge_info)	/*!< in: parallel sort info */
980 {
981 	int		i = 0;
982 	os_thread_id_t	thd_id;
983 
984 	/* Kick off merge/insert threads */
985 	for (i = 0; i <  FTS_NUM_AUX_INDEX; i++) {
986 		merge_info[i].psort_id = i;
987 		merge_info[i].child_status = 0;
988 
989 		merge_info[i].thread_hdl = os_thread_create(
990 			fts_parallel_merge, (void*) &merge_info[i], &thd_id);
991 	}
992 }
993 
994 /********************************************************************//**
995 Insert processed FTS data to auxillary index tables.
996 @return	DB_SUCCESS if insertion runs fine */
997 static MY_ATTRIBUTE((nonnull))
998 dberr_t
row_merge_write_fts_word(trx_t * trx,que_t ** ins_graph,fts_tokenizer_word_t * word,fts_table_t * fts_table,CHARSET_INFO * charset)999 row_merge_write_fts_word(
1000 /*=====================*/
1001 	trx_t*		trx,		/*!< in: transaction */
1002 	que_t**		ins_graph,	/*!< in: Insert query graphs */
1003 	fts_tokenizer_word_t* word,	/*!< in: sorted and tokenized
1004 					word */
1005 	fts_table_t*	fts_table,	/*!< in: fts aux table instance */
1006 	CHARSET_INFO*	charset)	/*!< in: charset */
1007 {
1008 	ulint	selected;
1009 	dberr_t	ret = DB_SUCCESS;
1010 
1011 	selected = fts_select_index(
1012 		charset, word->text.f_str, word->text.f_len);
1013 	fts_table->suffix = fts_get_suffix(selected);
1014 
1015 	/* Pop out each fts_node in word->nodes write them to auxiliary table */
1016 	while (ib_vector_size(word->nodes) > 0) {
1017 		dberr_t		error;
1018 		fts_node_t*	fts_node;
1019 
1020 		fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes));
1021 
1022 		error = fts_write_node(
1023 			trx, &ins_graph[selected], fts_table, &word->text,
1024 			fts_node);
1025 
1026 		if (error != DB_SUCCESS) {
1027 			fprintf(stderr, "InnoDB: failed to write"
1028 				" word %s to FTS auxiliary index"
1029 				" table, error (%s) \n",
1030 				word->text.f_str, ut_strerr(error));
1031 			ret = error;
1032 		}
1033 
1034 		ut_free(fts_node->ilist);
1035 		fts_node->ilist = NULL;
1036 	}
1037 
1038 	return(ret);
1039 }
1040 
1041 /*********************************************************************//**
1042 Read sorted FTS data files and insert data tuples to auxillary tables.
1043 @return	DB_SUCCESS or error number */
1044 UNIV_INTERN
1045 void
row_fts_insert_tuple(fts_psort_insert_t * ins_ctx,fts_tokenizer_word_t * word,ib_vector_t * positions,doc_id_t * in_doc_id,dtuple_t * dtuple)1046 row_fts_insert_tuple(
1047 /*=================*/
1048 	fts_psort_insert_t*
1049 			ins_ctx,	/*!< in: insert context */
1050 	fts_tokenizer_word_t* word,	/*!< in: last processed
1051 					tokenized word */
1052 	ib_vector_t*	positions,	/*!< in: word position */
1053 	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
1054 	dtuple_t*	dtuple)		/*!< in: entry to insert */
1055 {
1056 	fts_node_t*	fts_node = NULL;
1057 	dfield_t*	dfield;
1058 	doc_id_t	doc_id;
1059 	ulint		position;
1060 	fts_string_t	token_word;
1061 	ulint		i;
1062 
1063 	/* Get fts_node for the FTS auxillary INDEX table */
1064 	if (ib_vector_size(word->nodes) > 0) {
1065 		fts_node = static_cast<fts_node_t*>(
1066 			ib_vector_last(word->nodes));
1067 	}
1068 
1069 	if (fts_node == NULL
1070 	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {
1071 
1072 		fts_node = static_cast<fts_node_t*>(
1073 			ib_vector_push(word->nodes, NULL));
1074 
1075 		memset(fts_node, 0x0, sizeof(*fts_node));
1076 	}
1077 
1078 	/* If dtuple == NULL, this is the last word to be processed */
1079 	if (!dtuple) {
1080 		if (fts_node && ib_vector_size(positions) > 0) {
1081 			fts_cache_node_add_positions(
1082 				NULL, fts_node, *in_doc_id,
1083 				positions);
1084 
1085 			/* Write out the current word */
1086 			row_merge_write_fts_word(ins_ctx->trx,
1087 						 ins_ctx->ins_graph, word,
1088 						 &ins_ctx->fts_table,
1089 						 ins_ctx->charset);
1090 
1091 		}
1092 
1093 		return;
1094 	}
1095 
1096 	/* Get the first field for the tokenized word */
1097 	dfield = dtuple_get_nth_field(dtuple, 0);
1098 
1099 	token_word.f_n_char = 0;
1100 	token_word.f_len = dfield->len;
1101 	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));
1102 
1103 	if (!word->text.f_str) {
1104 		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1105 	}
1106 
1107 	/* compare to the last word, to see if they are the same
1108 	word */
1109 	if (innobase_fts_text_cmp(ins_ctx->charset,
1110 				  &word->text, &token_word) != 0) {
1111 		ulint	num_item;
1112 
1113 		/* Getting a new word, flush the last position info
1114 		for the currnt word in fts_node */
1115 		if (ib_vector_size(positions) > 0) {
1116 			fts_cache_node_add_positions(
1117 				NULL, fts_node, *in_doc_id, positions);
1118 		}
1119 
1120 		/* Write out the current word */
1121 		row_merge_write_fts_word(ins_ctx->trx, ins_ctx->ins_graph,
1122 					 word, &ins_ctx->fts_table,
1123 					 ins_ctx->charset);
1124 
1125 		/* Copy the new word */
1126 		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1127 
1128 		num_item = ib_vector_size(positions);
1129 
1130 		/* Clean up position queue */
1131 		for (i = 0; i < num_item; i++) {
1132 			ib_vector_pop(positions);
1133 		}
1134 
1135 		/* Reset Doc ID */
1136 		*in_doc_id = 0;
1137 		memset(fts_node, 0x0, sizeof(*fts_node));
1138 	}
1139 
1140 	/* Get the word's Doc ID */
1141 	dfield = dtuple_get_nth_field(dtuple, 1);
1142 
1143 	if (!ins_ctx->opt_doc_id_size) {
1144 		doc_id = fts_read_doc_id(
1145 			static_cast<byte*>(dfield_get_data(dfield)));
1146 	} else {
1147 		doc_id = (doc_id_t) mach_read_from_4(
1148 			static_cast<byte*>(dfield_get_data(dfield)));
1149 	}
1150 
1151 	/* Get the word's position info */
1152 	dfield = dtuple_get_nth_field(dtuple, 2);
1153 	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));
1154 
1155 	/* If this is the same word as the last word, and they
1156 	have the same Doc ID, we just need to add its position
1157 	info. Otherwise, we will flush position info to the
1158 	fts_node and initiate a new position vector  */
1159 	if (!(*in_doc_id) || *in_doc_id == doc_id) {
1160 		ib_vector_push(positions, &position);
1161 	} else {
1162 		ulint	num_pos = ib_vector_size(positions);
1163 
1164 		fts_cache_node_add_positions(NULL, fts_node,
1165 					     *in_doc_id, positions);
1166 		for (i = 0; i < num_pos; i++) {
1167 			ib_vector_pop(positions);
1168 		}
1169 		ib_vector_push(positions, &position);
1170 	}
1171 
1172 	/* record the current Doc ID */
1173 	*in_doc_id = doc_id;
1174 }
1175 
1176 /*********************************************************************//**
1177 Propagate a newly added record up one level in the selection tree
1178 @return parent where this value propagated to */
1179 static
1180 int
row_fts_sel_tree_propagate(int propogated,int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1181 row_fts_sel_tree_propagate(
1182 /*=======================*/
1183 	int		propogated,	/*<! in: tree node propagated */
1184 	int*		sel_tree,	/*<! in: selection tree */
1185 	const mrec_t**	mrec,		/*<! in: sort record */
1186 	ulint**		offsets,	/*<! in: record offsets */
1187 	dict_index_t*	index)		/*<! in/out: FTS index */
1188 {
1189 	ulint	parent;
1190 	int	child_left;
1191 	int	child_right;
1192 	int	selected;
1193 
1194 	/* Find which parent this value will be propagated to */
1195 	parent = (propogated - 1) / 2;
1196 
1197 	/* Find out which value is smaller, and to propagate */
1198 	child_left = sel_tree[parent * 2 + 1];
1199 	child_right = sel_tree[parent * 2 + 2];
1200 
1201 	if (child_left == -1 || mrec[child_left] == NULL) {
1202 		if (child_right == -1
1203 		    || mrec[child_right] == NULL) {
1204 			selected = -1;
1205 		} else {
1206 			selected = child_right ;
1207 		}
1208 	} else if (child_right == -1
1209 		   || mrec[child_right] == NULL) {
1210 		selected = child_left;
1211 	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
1212 				      offsets[child_left],
1213 				      offsets[child_right],
1214 				      index, NULL) < 0) {
1215 		selected = child_left;
1216 	} else {
1217 		selected = child_right;
1218 	}
1219 
1220 	sel_tree[parent] = selected;
1221 
1222 	return(static_cast<int>(parent));
1223 }
1224 
1225 /*********************************************************************//**
1226 Readjust selection tree after popping the root and read a new value
1227 @return the new root */
1228 static
1229 int
row_fts_sel_tree_update(int * sel_tree,ulint propagated,ulint height,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1230 row_fts_sel_tree_update(
1231 /*====================*/
1232 	int*		sel_tree,	/*<! in/out: selection tree */
1233 	ulint		propagated,	/*<! in: node to propagate up */
1234 	ulint		height,		/*<! in: tree height */
1235 	const mrec_t**	mrec,		/*<! in: sort record */
1236 	ulint**		offsets,	/*<! in: record offsets */
1237 	dict_index_t*	index)		/*<! in: index dictionary */
1238 {
1239 	ulint	i;
1240 
1241 	for (i = 1; i <= height; i++) {
1242 		propagated = static_cast<ulint>(row_fts_sel_tree_propagate(
1243 			static_cast<int>(propagated), sel_tree, mrec, offsets, index));
1244 	}
1245 
1246 	return(sel_tree[0]);
1247 }
1248 
1249 /*********************************************************************//**
1250 Build selection tree at a specified level */
1251 static
1252 void
row_fts_build_sel_tree_level(int * sel_tree,ulint level,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1253 row_fts_build_sel_tree_level(
1254 /*=========================*/
1255 	int*		sel_tree,	/*<! in/out: selection tree */
1256 	ulint		level,		/*<! in: selection tree level */
1257 	const mrec_t**	mrec,		/*<! in: sort record */
1258 	ulint**		offsets,	/*<! in: record offsets */
1259 	dict_index_t*	index)		/*<! in: index dictionary */
1260 {
1261 	ulint	start;
1262 	int	child_left;
1263 	int	child_right;
1264 	ulint	i;
1265 	ulint	num_item;
1266 
1267 	start = static_cast<ulint>((1 << level) - 1);
1268 	num_item = static_cast<ulint>(1 << level);
1269 
1270 	for (i = 0; i < num_item;  i++) {
1271 		child_left = sel_tree[(start + i) * 2 + 1];
1272 		child_right = sel_tree[(start + i) * 2 + 2];
1273 
1274 		if (child_left == -1) {
1275 			if (child_right == -1) {
1276 				sel_tree[start + i] = -1;
1277 			} else {
1278 				sel_tree[start + i] =  child_right;
1279 			}
1280 			continue;
1281 		} else if (child_right == -1) {
1282 			sel_tree[start + i] = child_left;
1283 			continue;
1284 		}
1285 
1286 		/* Deal with NULL child conditions */
1287 		if (!mrec[child_left]) {
1288 			if (!mrec[child_right]) {
1289 				sel_tree[start + i] = -1;
1290 			} else {
1291 				sel_tree[start + i] = child_right;
1292 			}
1293 			continue;
1294 		} else if (!mrec[child_right]) {
1295 			sel_tree[start + i] = child_left;
1296 			continue;
1297 		}
1298 
1299 		/* Select the smaller one to set parent pointer */
1300 		int cmp = cmp_rec_rec_simple(
1301 			mrec[child_left], mrec[child_right],
1302 			offsets[child_left], offsets[child_right],
1303 			index, NULL);
1304 
1305 		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
1306 	}
1307 }
1308 
1309 /*********************************************************************//**
1310 Build a selection tree for merge. The selection tree is a binary tree
1311 and should have fts_sort_pll_degree / 2 levels. With root as level 0
1312 @return number of tree levels */
1313 static
1314 ulint
row_fts_build_sel_tree(int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1315 row_fts_build_sel_tree(
1316 /*===================*/
1317 	int*		sel_tree,	/*<! in/out: selection tree */
1318 	const mrec_t**	mrec,		/*<! in: sort record */
1319 	ulint**		offsets,	/*<! in: record offsets */
1320 	dict_index_t*	index)		/*<! in: index dictionary */
1321 {
1322 	ulint	treelevel = 1;
1323 	ulint	num = 2;
1324 	int	i = 0;
1325 	ulint	start;
1326 
1327 	/* No need to build selection tree if we only have two merge threads */
1328 	if (fts_sort_pll_degree <= 2) {
1329 		return(0);
1330 	}
1331 
1332 	while (num < fts_sort_pll_degree) {
1333 		num = num << 1;
1334 		treelevel++;
1335 	}
1336 
1337 	start = (1 << treelevel) - 1;
1338 
1339 	for (i = 0; i < (int) fts_sort_pll_degree; i++) {
1340 		sel_tree[i + start] = i;
1341 	}
1342 
1343 	for (i = static_cast<int>(treelevel) - 1; i >= 0; i--) {
1344 		row_fts_build_sel_tree_level(
1345 			sel_tree, static_cast<ulint>(i), mrec, offsets, index);
1346 	}
1347 
1348 	return(treelevel);
1349 }
1350 
1351 /*********************************************************************//**
1352 Read sorted file containing index data tuples and insert these data
1353 tuples to the index
1354 @return	DB_SUCCESS or error number */
1355 UNIV_INTERN
1356 dberr_t
row_fts_merge_insert(dict_index_t * index,dict_table_t * table,fts_psort_t * psort_info,ulint id)1357 row_fts_merge_insert(
1358 /*=================*/
1359 	dict_index_t*		index,	/*!< in: index */
1360 	dict_table_t*		table,	/*!< in: new table */
1361 	fts_psort_t*		psort_info, /*!< parallel sort info */
1362 	ulint			id)	/* !< in: which auxiliary table's data
1363 					to insert to */
1364 {
1365 	const byte**		b;
1366 	mem_heap_t*		tuple_heap;
1367 	mem_heap_t*		heap;
1368 	dberr_t			error = DB_SUCCESS;
1369 	ulint*			foffs;
1370 	ulint**			offsets;
1371 	fts_tokenizer_word_t	new_word;
1372 	ib_vector_t*		positions;
1373 	doc_id_t		last_doc_id;
1374 	ib_alloc_t*		heap_alloc;
1375 	ulint			n_bytes;
1376 	ulint			i;
1377 	mrec_buf_t**		buf;
1378 	int*			fd;
1379 	byte**			block;
1380 	const mrec_t**		mrec;
1381 	ulint			count = 0;
1382 	int*			sel_tree;
1383 	ulint			height;
1384 	ulint			start;
1385 	fts_psort_insert_t	ins_ctx;
1386 	ulint			count_diag = 0;
1387 
1388 	ut_ad(index);
1389 	ut_ad(table);
1390 
1391 	/* We use the insert query graph as the dummy graph
1392 	needed in the row module call */
1393 
1394 	ins_ctx.trx = trx_allocate_for_background();
1395 
1396 	ins_ctx.trx->op_info = "inserting index entries";
1397 
1398 	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;
1399 
1400 	heap = mem_heap_create(500 + sizeof(mrec_buf_t));
1401 
1402 	b = (const byte**) mem_heap_alloc(
1403 		heap, sizeof (*b) * fts_sort_pll_degree);
1404 	foffs = (ulint*) mem_heap_alloc(
1405 		heap, sizeof(*foffs) * fts_sort_pll_degree);
1406 	offsets = (ulint**) mem_heap_alloc(
1407 		heap, sizeof(*offsets) * fts_sort_pll_degree);
1408 	buf = (mrec_buf_t**) mem_heap_alloc(
1409 		heap, sizeof(*buf) * fts_sort_pll_degree);
1410 	fd = (int*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
1411 	block = (byte**) mem_heap_alloc(
1412 		heap, sizeof(*block) * fts_sort_pll_degree);
1413 	mrec = (const mrec_t**) mem_heap_alloc(
1414 		heap, sizeof(*mrec) * fts_sort_pll_degree);
1415 	sel_tree = (int*) mem_heap_alloc(
1416 		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));
1417 
1418 	tuple_heap = mem_heap_create(1000);
1419 
1420 	ins_ctx.charset = fts_index_get_charset(index);
1421 	ins_ctx.heap = heap;
1422 
1423 	for (i = 0; i < fts_sort_pll_degree; i++) {
1424 		ulint	num;
1425 
1426 		num = 1 + REC_OFFS_HEADER_SIZE
1427 			+ dict_index_get_n_fields(index);
1428 		offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
1429 			heap, num * sizeof *offsets[i]));
1430 		offsets[i][0] = num;
1431 		offsets[i][1] = dict_index_get_n_fields(index);
1432 		block[i] = psort_info[i].merge_block[id];
1433 		b[i] = psort_info[i].merge_block[id];
1434 		fd[i] = psort_info[i].merge_file[id]->fd;
1435 		foffs[i] = 0;
1436 
1437 		buf[i] = static_cast<unsigned char (*)[16384]>(
1438 			mem_heap_alloc(heap, sizeof *buf[i]));
1439 		count_diag += (int) psort_info[i].merge_file[id]->n_rec;
1440 	}
1441 
1442 	if (fts_enable_diag_print) {
1443 		ut_print_timestamp(stderr);
1444 		fprintf(stderr, "  InnoDB_FTS: to inserted %lu records\n",
1445 			(ulong) count_diag);
1446 	}
1447 
1448 	/* Initialize related variables if creating FTS indexes */
1449 	heap_alloc = ib_heap_allocator_create(heap);
1450 
1451 	memset(&new_word, 0, sizeof(new_word));
1452 
1453 	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
1454 	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
1455 	last_doc_id = 0;
1456 
1457 	/* Allocate insert query graphs for FTS auxillary
1458 	Index Table, note we have FTS_NUM_AUX_INDEX such index tables */
1459 	n_bytes = sizeof(que_t*) * (FTS_NUM_AUX_INDEX + 1);
1460 	ins_ctx.ins_graph = static_cast<que_t**>(mem_heap_alloc(heap, n_bytes));
1461 	memset(ins_ctx.ins_graph, 0x0, n_bytes);
1462 
1463 	/* We should set the flags2 with aux_table_name here,
1464 	in order to get the correct aux table names. */
1465 	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
1466 	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
1467 			index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);
1468 
1469 	ins_ctx.fts_table.type = FTS_INDEX_TABLE;
1470 	ins_ctx.fts_table.index_id = index->id;
1471 	ins_ctx.fts_table.table_id = table->id;
1472 	ins_ctx.fts_table.parent = index->table->name;
1473 	ins_ctx.fts_table.table = index->table;
1474 
1475 	for (i = 0; i < fts_sort_pll_degree; i++) {
1476 		if (psort_info[i].merge_file[id]->n_rec == 0) {
1477 			/* No Rows to read */
1478 			mrec[i] = b[i] = NULL;
1479 		} else {
1480 			/* Read from temp file only if it has been
1481 			written to. Otherwise, block memory holds
1482 			all the sorted records */
1483 			if (psort_info[i].merge_file[id]->offset > 0
1484 			    && (!row_merge_read(
1485 					fd[i], foffs[i],
1486 					(row_merge_block_t*) block[i]))) {
1487 				error = DB_CORRUPTION;
1488 				goto exit;
1489 			}
1490 
1491 			ROW_MERGE_READ_GET_NEXT(i);
1492 		}
1493 	}
1494 
1495 	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
1496 					offsets, index);
1497 
1498 	start = (1 << height) - 1;
1499 
1500 	/* Fetch sorted records from sort buffer and insert them into
1501 	corresponding FTS index auxiliary tables */
1502 	for (;;) {
1503 		dtuple_t*	dtuple;
1504 		ulint		n_ext;
1505 		int		min_rec = 0;
1506 
1507 		if (fts_sort_pll_degree <= 2) {
1508 			while (!mrec[min_rec]) {
1509 				min_rec++;
1510 
1511 				if (min_rec >= (int) fts_sort_pll_degree) {
1512 					row_fts_insert_tuple(
1513 						&ins_ctx, &new_word,
1514 						positions, &last_doc_id,
1515 						NULL);
1516 
1517 					goto exit;
1518 				}
1519 			}
1520 
1521 			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
1522 				if (!mrec[i]) {
1523 					continue;
1524 				}
1525 
1526 				if (cmp_rec_rec_simple(
1527 					    mrec[i], mrec[min_rec],
1528 					    offsets[i], offsets[min_rec],
1529 					    index, NULL) < 0) {
1530 					min_rec = static_cast<int>(i);
1531 				}
1532 			}
1533 		} else {
1534 			min_rec = sel_tree[0];
1535 
1536 			if (min_rec ==  -1) {
1537 				row_fts_insert_tuple(
1538 					&ins_ctx, &new_word,
1539 					positions, &last_doc_id,
1540 					NULL);
1541 
1542 				goto exit;
1543 			}
1544 		}
1545 
1546 		dtuple = row_rec_to_index_entry_low(
1547 			mrec[min_rec], index, offsets[min_rec], &n_ext,
1548 			tuple_heap);
1549 
1550 		row_fts_insert_tuple(
1551 			&ins_ctx, &new_word, positions,
1552 			&last_doc_id, dtuple);
1553 
1554 
1555 		ROW_MERGE_READ_GET_NEXT(min_rec);
1556 
1557 		if (fts_sort_pll_degree > 2) {
1558 			if (!mrec[min_rec]) {
1559 				sel_tree[start + min_rec] = -1;
1560 			}
1561 
1562 			row_fts_sel_tree_update(sel_tree, start + min_rec,
1563 						height, mrec,
1564 						offsets, index);
1565 		}
1566 
1567 		count++;
1568 
1569 		mem_heap_empty(tuple_heap);
1570 	}
1571 
1572 exit:
1573 	fts_sql_commit(ins_ctx.trx);
1574 
1575 	ins_ctx.trx->op_info = "";
1576 
1577 	mem_heap_free(tuple_heap);
1578 
1579 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
1580 		if (ins_ctx.ins_graph[i]) {
1581 			fts_que_graph_free(ins_ctx.ins_graph[i]);
1582 		}
1583 	}
1584 
1585 	trx_free_for_background(ins_ctx.trx);
1586 
1587 	mem_heap_free(heap);
1588 
1589 	if (fts_enable_diag_print) {
1590 		ut_print_timestamp(stderr);
1591 		fprintf(stderr, "  InnoDB_FTS: inserted %lu records\n",
1592 			(ulong) count);
1593 	}
1594 
1595 	return(error);
1596 }
1597