1 /*****************************************************************************
2
3 Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ftsort.cc
29 Create Full Text Index with (parallel) merge sort
30
31 Created 10/13/2010 Jimmy Yang
32 *******************************************************/
33
34 #include "dict0dict.h" /* dict_table_stats_lock() */
35 #include "row0merge.h"
36 #include "pars0pars.h"
37 #include "row0ftsort.h"
38 #include "row0merge.h"
39 #include "row0row.h"
40 #include "btr0cur.h"
41 #include "btr0sea.h"
42
/** Read the next record to buffer N.
Expands row_merge_read_rec() for merge slot N of an enclosing merge loop.
When the read fails (b[N] becomes NULL) while a record is still pending in
mrec[N], control jumps to an `exit` label that must exist at the expansion
site.  The arrays b, block, buf, fd, foffs, mrec and offsets, plus `index`,
are picked up from the caller's scope.
@param N index into array of merge info structure */
#define ROW_MERGE_READ_GET_NEXT(N) \
	do { \
		b[N] = row_merge_read_rec( \
			block[N], buf[N], b[N], index, \
			fd[N], &foffs[N], &mrec[N], offsets[N]); \
		if (UNIV_UNLIKELY(!b[N])) { \
			if (mrec[N]) { \
				goto exit; \
			} \
		} \
	} while (0)

/** Parallel sort degree: the number of tokenize/sort threads spawned
when building a full-text index (see row_fts_start_psort()). */
UNIV_INTERN ulong fts_sort_pll_degree = 2;
59
60 /*********************************************************************//**
61 Create a temporary "fts sort index" used to merge sort the
62 tokenized doc string. The index has three "fields":
63
64 1) Tokenized word,
65 2) Doc ID (depend on number of records to sort, it can be a 4 bytes or 8 bytes
66 integer value)
67 3) Word's position in original doc.
68
69 @return dict_index_t structure for the fts sort index */
70 UNIV_INTERN
71 dict_index_t*
row_merge_create_fts_sort_index(dict_index_t * index,const dict_table_t * table,ibool * opt_doc_id_size)72 row_merge_create_fts_sort_index(
73 /*============================*/
74 dict_index_t* index, /*!< in: Original FTS index
75 based on which this sort index
76 is created */
77 const dict_table_t* table, /*!< in: table that FTS index
78 is being created on */
79 ibool* opt_doc_id_size)
80 /*!< out: whether to use 4 bytes
81 instead of 8 bytes integer to
82 store Doc ID during sort */
83 {
84 dict_index_t* new_index;
85 dict_field_t* field;
86 dict_field_t* idx_field;
87 CHARSET_INFO* charset;
88
89 // FIXME: This name shouldn't be hard coded here.
90 new_index = dict_mem_index_create(
91 index->table->name, "tmp_fts_idx", 0, DICT_FTS, 3);
92
93 new_index->id = index->id;
94 new_index->table = (dict_table_t*) table;
95 new_index->n_uniq = FTS_NUM_FIELDS_SORT;
96 new_index->n_def = FTS_NUM_FIELDS_SORT;
97 new_index->cached = TRUE;
98
99 btr_search_index_init(new_index);
100
101 idx_field = dict_index_get_nth_field(index, 0);
102 charset = fts_index_get_charset(index);
103
104 /* The first field is on the Tokenized Word */
105 field = dict_index_get_nth_field(new_index, 0);
106 field->name = NULL;
107 field->prefix_len = 0;
108 field->col = static_cast<dict_col_t*>(
109 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
110 field->col->len = FTS_MAX_WORD_LEN;
111
112 if (strcmp(charset->name, "latin1_swedish_ci") == 0) {
113 field->col->mtype = DATA_VARCHAR;
114 } else {
115 field->col->mtype = DATA_VARMYSQL;
116 }
117
118 field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
119 field->col->mbminmaxlen = idx_field->col->mbminmaxlen;
120 field->fixed_len = 0;
121
122 /* Doc ID */
123 field = dict_index_get_nth_field(new_index, 1);
124 field->name = NULL;
125 field->prefix_len = 0;
126 field->col = static_cast<dict_col_t*>(
127 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
128 field->col->mtype = DATA_INT;
129 *opt_doc_id_size = FALSE;
130
131 /* Check whether we can use 4 bytes instead of 8 bytes integer
132 field to hold the Doc ID, thus reduce the overall sort size */
133 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
134 /* If Doc ID column is being added by this create
135 index, then just check the number of rows in the table */
136 if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
137 *opt_doc_id_size = TRUE;
138 }
139 } else {
140 doc_id_t max_doc_id;
141
142 /* If the Doc ID column is supplied by user, then
143 check the maximum Doc ID in the table */
144 max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
145
146 if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
147 *opt_doc_id_size = TRUE;
148 }
149 }
150
151 if (*opt_doc_id_size) {
152 field->col->len = sizeof(ib_uint32_t);
153 field->fixed_len = sizeof(ib_uint32_t);
154 } else {
155 field->col->len = FTS_DOC_ID_LEN;
156 field->fixed_len = FTS_DOC_ID_LEN;
157 }
158
159 field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
160
161 field->col->mbminmaxlen = 0;
162
163 /* The third field is on the word's position in the original doc */
164 field = dict_index_get_nth_field(new_index, 2);
165 field->name = NULL;
166 field->prefix_len = 0;
167 field->col = static_cast<dict_col_t*>(
168 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
169 field->col->mtype = DATA_INT;
170 field->col->len = 4 ;
171 field->fixed_len = 4;
172 field->col->prtype = DATA_NOT_NULL;
173 field->col->mbminmaxlen = 0;
174
175 return(new_index);
176 }
177 /*********************************************************************//**
178 Initialize FTS parallel sort structures.
179 @return TRUE if all successful */
180 UNIV_INTERN
181 ibool
row_fts_psort_info_init(trx_t * trx,row_merge_dup_t * dup,const dict_table_t * new_table,ibool opt_doc_id_size,fts_psort_t ** psort,fts_psort_t ** merge)182 row_fts_psort_info_init(
183 /*====================*/
184 trx_t* trx, /*!< in: transaction */
185 row_merge_dup_t* dup, /*!< in,own: descriptor of
186 FTS index being created */
187 const dict_table_t* new_table,/*!< in: table on which indexes are
188 created */
189 ibool opt_doc_id_size,
190 /*!< in: whether to use 4 bytes
191 instead of 8 bytes integer to
192 store Doc ID during sort */
193 fts_psort_t** psort, /*!< out: parallel sort info to be
194 instantiated */
195 fts_psort_t** merge) /*!< out: parallel merge info
196 to be instantiated */
197 {
198 ulint i;
199 ulint j;
200 fts_psort_common_t* common_info = NULL;
201 fts_psort_t* psort_info = NULL;
202 fts_psort_t* merge_info = NULL;
203 ulint block_size;
204 ibool ret = TRUE;
205
206 block_size = 3 * srv_sort_buf_size;
207
208 *psort = psort_info = static_cast<fts_psort_t*>(mem_zalloc(
209 fts_sort_pll_degree * sizeof *psort_info));
210
211 if (!psort_info) {
212 ut_free(dup);
213 return(FALSE);
214 }
215
216 /* Common Info for all sort threads */
217 common_info = static_cast<fts_psort_common_t*>(
218 mem_alloc(sizeof *common_info));
219
220 if (!common_info) {
221 ut_free(dup);
222 mem_free(psort_info);
223 return(FALSE);
224 }
225
226 common_info->dup = dup;
227 common_info->new_table = (dict_table_t*) new_table;
228 common_info->trx = trx;
229 common_info->all_info = psort_info;
230 common_info->sort_event = os_event_create();
231 common_info->merge_event = os_event_create();
232 common_info->opt_doc_id_size = opt_doc_id_size;
233
234 ut_ad(trx->mysql_thd != NULL);
235 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
236
237 /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
238 each parallel sort thread. Each "sort bucket" holds records for
239 a particular "FTS index partition" */
240 for (j = 0; j < fts_sort_pll_degree; j++) {
241
242 UT_LIST_INIT(psort_info[j].fts_doc_list);
243
244 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
245
246 psort_info[j].merge_file[i] =
247 static_cast<merge_file_t*>(
248 mem_zalloc(sizeof(merge_file_t)));
249
250 if (!psort_info[j].merge_file[i]) {
251 ret = FALSE;
252 goto func_exit;
253 }
254
255 psort_info[j].merge_buf[i] = row_merge_buf_create(
256 dup->index);
257
258 if (row_merge_file_create(psort_info[j].merge_file[i],
259 path) < 0) {
260 goto func_exit;
261 }
262
263 /* Need to align memory for O_DIRECT write */
264 psort_info[j].block_alloc[i] =
265 static_cast<row_merge_block_t*>(ut_malloc(
266 block_size + 1024));
267
268 psort_info[j].merge_block[i] =
269 static_cast<row_merge_block_t*>(
270 ut_align(
271 psort_info[j].block_alloc[i], 1024));
272
273 if (!psort_info[j].merge_block[i]) {
274 ret = FALSE;
275 goto func_exit;
276 }
277 }
278
279 psort_info[j].child_status = 0;
280 psort_info[j].state = 0;
281 psort_info[j].psort_common = common_info;
282 psort_info[j].error = DB_SUCCESS;
283 psort_info[j].memory_used = 0;
284 mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE);
285 }
286
287 /* Initialize merge_info structures parallel merge and insert
288 into auxiliary FTS tables (FTS_INDEX_TABLE) */
289 *merge = merge_info = static_cast<fts_psort_t*>(
290 mem_alloc(FTS_NUM_AUX_INDEX * sizeof *merge_info));
291
292 for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
293
294 merge_info[j].child_status = 0;
295 merge_info[j].state = 0;
296 merge_info[j].psort_common = common_info;
297 }
298
299 func_exit:
300 if (!ret) {
301 row_fts_psort_info_destroy(psort_info, merge_info);
302 }
303
304 return(ret);
305 }
306 /*********************************************************************//**
307 Clean up and deallocate FTS parallel sort structures, and close the
308 merge sort files */
309 UNIV_INTERN
310 void
row_fts_psort_info_destroy(fts_psort_t * psort_info,fts_psort_t * merge_info)311 row_fts_psort_info_destroy(
312 /*=======================*/
313 fts_psort_t* psort_info, /*!< parallel sort info */
314 fts_psort_t* merge_info) /*!< parallel merge info */
315 {
316 ulint i;
317 ulint j;
318
319 if (psort_info) {
320 for (j = 0; j < fts_sort_pll_degree; j++) {
321 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
322 if (psort_info[j].merge_file[i]) {
323 row_merge_file_destroy(
324 psort_info[j].merge_file[i]);
325 }
326
327 if (psort_info[j].block_alloc[i]) {
328 ut_free(psort_info[j].block_alloc[i]);
329 }
330 mem_free(psort_info[j].merge_file[i]);
331 }
332
333 mutex_free(&psort_info[j].mutex);
334 }
335
336 os_event_free(merge_info[0].psort_common->sort_event);
337 os_event_free(merge_info[0].psort_common->merge_event);
338 ut_free(merge_info[0].psort_common->dup);
339 mem_free(merge_info[0].psort_common);
340 mem_free(psort_info);
341 }
342
343 if (merge_info) {
344 mem_free(merge_info);
345 }
346 }
347 /*********************************************************************//**
348 Free up merge buffers when merge sort is done */
349 UNIV_INTERN
350 void
row_fts_free_pll_merge_buf(fts_psort_t * psort_info)351 row_fts_free_pll_merge_buf(
352 /*=======================*/
353 fts_psort_t* psort_info) /*!< in: parallel sort info */
354 {
355 ulint j;
356 ulint i;
357
358 if (!psort_info) {
359 return;
360 }
361
362 for (j = 0; j < fts_sort_pll_degree; j++) {
363 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
364 row_merge_buf_free(psort_info[j].merge_buf[i]);
365 }
366 }
367
368 return;
369 }
370
371 /*********************************************************************//**
372 Tokenize incoming text data and add to the sort buffer.
373 @return TRUE if the record passed, FALSE if out of space */
374 static
375 ibool
row_merge_fts_doc_tokenize(row_merge_buf_t ** sort_buf,doc_id_t doc_id,fts_doc_t * doc,dtype_t * word_dtype,merge_file_t ** merge_file,ibool opt_doc_id_size,fts_tokenize_ctx_t * t_ctx)376 row_merge_fts_doc_tokenize(
377 /*=======================*/
378 row_merge_buf_t** sort_buf, /*!< in/out: sort buffer */
379 doc_id_t doc_id, /*!< in: Doc ID */
380 fts_doc_t* doc, /*!< in: Doc to be tokenized */
381 dtype_t* word_dtype, /*!< in: data structure for
382 word col */
383 merge_file_t** merge_file, /*!< in/out: merge file */
384 ibool opt_doc_id_size,/*!< in: whether to use 4 bytes
385 instead of 8 bytes integer to
386 store Doc ID during sort*/
387 fts_tokenize_ctx_t* t_ctx) /*!< in/out: tokenize context */
388 {
389 ulint i;
390 ulint inc;
391 fts_string_t str;
392 ulint len;
393 row_merge_buf_t* buf;
394 dfield_t* field;
395 fts_string_t t_str;
396 ibool buf_full = FALSE;
397 byte str_buf[FTS_MAX_WORD_LEN + 1];
398 ulint data_size[FTS_NUM_AUX_INDEX];
399 ulint n_tuple[FTS_NUM_AUX_INDEX];
400
401 t_str.f_n_char = 0;
402 t_ctx->buf_used = 0;
403
404 memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
405 memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
406
407 /* Tokenize the data and add each word string, its corresponding
408 doc id and position to sort buffer */
409 for (i = t_ctx->processed_len; i < doc->text.f_len; i += inc) {
410 ib_rbt_bound_t parent;
411 ulint idx = 0;
412 ib_uint32_t position;
413 ulint offset = 0;
414 ulint cur_len = 0;
415 doc_id_t write_doc_id;
416
417 inc = innobase_mysql_fts_get_token(
418 doc->charset, doc->text.f_str + i,
419 doc->text.f_str + doc->text.f_len, &str, &offset);
420
421 ut_a(inc > 0);
422
423 /* Ignore string whose character number is less than
424 "fts_min_token_size" or more than "fts_max_token_size" */
425 if (str.f_n_char < fts_min_token_size
426 || str.f_n_char > fts_max_token_size) {
427
428 t_ctx->processed_len += inc;
429 continue;
430 }
431
432 t_str.f_len = innobase_fts_casedn_str(
433 doc->charset, (char*) str.f_str, str.f_len,
434 (char*) &str_buf, FTS_MAX_WORD_LEN + 1);
435
436 t_str.f_str = (byte*) &str_buf;
437
438 /* if "cached_stopword" is defined, ingore words in the
439 stopword list */
440 if (t_ctx->cached_stopword
441 && rbt_search(t_ctx->cached_stopword,
442 &parent, &t_str) == 0) {
443
444 t_ctx->processed_len += inc;
445 continue;
446 }
447
448 /* There are FTS_NUM_AUX_INDEX auxiliary tables, find
449 out which sort buffer to put this word record in */
450 t_ctx->buf_used = fts_select_index(
451 doc->charset, t_str.f_str, t_str.f_len);
452
453 buf = sort_buf[t_ctx->buf_used];
454
455 ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
456 idx = t_ctx->buf_used;
457
458 mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
459
460 field = mtuple->fields = static_cast<dfield_t*>(
461 mem_heap_alloc(buf->heap,
462 FTS_NUM_FIELDS_SORT * sizeof *field));
463
464 /* The first field is the tokenized word */
465 dfield_set_data(field, t_str.f_str, t_str.f_len);
466 len = dfield_get_len(field);
467
468 field->type.mtype = word_dtype->mtype;
469 field->type.prtype = word_dtype->prtype | DATA_NOT_NULL;
470
471 /* Variable length field, set to max size. */
472 field->type.len = FTS_MAX_WORD_LEN;
473 field->type.mbminmaxlen = word_dtype->mbminmaxlen;
474
475 cur_len += len;
476 dfield_dup(field, buf->heap);
477 field++;
478
479 /* The second field is the Doc ID */
480
481 ib_uint32_t doc_id_32_bit;
482
483 if (!opt_doc_id_size) {
484 fts_write_doc_id((byte*) &write_doc_id, doc_id);
485
486 dfield_set_data(
487 field, &write_doc_id, sizeof(write_doc_id));
488 } else {
489 mach_write_to_4(
490 (byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
491
492 dfield_set_data(
493 field, &doc_id_32_bit, sizeof(doc_id_32_bit));
494 }
495
496 len = field->len;
497 ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
498
499 field->type.mtype = DATA_INT;
500 field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
501 field->type.len = len;
502 field->type.mbminmaxlen = 0;
503
504 cur_len += len;
505 dfield_dup(field, buf->heap);
506
507 ++field;
508
509 /* The third field is the position */
510 mach_write_to_4(
511 (byte*) &position,
512 (i + offset + inc - str.f_len + t_ctx->init_pos));
513
514 dfield_set_data(field, &position, sizeof(position));
515 len = dfield_get_len(field);
516 ut_ad(len == sizeof(ib_uint32_t));
517
518 field->type.mtype = DATA_INT;
519 field->type.prtype = DATA_NOT_NULL;
520 field->type.len = len;
521 field->type.mbminmaxlen = 0;
522 cur_len += len;
523 dfield_dup(field, buf->heap);
524
525 /* One variable length column, word with its lenght less than
526 fts_max_token_size, add one extra size and one extra byte.
527
528 Since the max length for FTS token now is larger than 255,
529 so we will need to signify length byte itself, so only 1 to 128
530 bytes can be used for 1 bytes, larger than that 2 bytes. */
531 if (t_str.f_len < 128) {
532 /* Extra size is one byte. */
533 cur_len += 2;
534 } else {
535 /* Extra size is two bytes. */
536 cur_len += 3;
537 }
538
539 /* Reserve one byte for the end marker of row_merge_block_t. */
540 if (buf->total_size + data_size[idx] + cur_len
541 >= srv_sort_buf_size - 1) {
542
543 buf_full = TRUE;
544 break;
545 }
546
547 /* Increment the number of tuples */
548 n_tuple[idx]++;
549 t_ctx->processed_len += inc;
550 data_size[idx] += cur_len;
551 }
552
553 /* Update the data length and the number of new word tuples
554 added in this round of tokenization */
555 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
556 /* The computation of total_size below assumes that no
557 delete-mark flags will be stored and that all fields
558 are NOT NULL and fixed-length. */
559
560 sort_buf[i]->total_size += data_size[i];
561
562 sort_buf[i]->n_tuples += n_tuple[i];
563
564 merge_file[i]->n_rec += n_tuple[i];
565 t_ctx->rows_added[i] += n_tuple[i];
566 }
567
568 if (!buf_full) {
569 /* we pad one byte between text accross two fields */
570 t_ctx->init_pos += doc->text.f_len + 1;
571 }
572
573 return(!buf_full);
574 }
575
576 /*********************************************************************//**
577 Get next doc item from fts_doc_list */
578 UNIV_INLINE
579 void
row_merge_fts_get_next_doc_item(fts_psort_t * psort_info,fts_doc_item_t ** doc_item)580 row_merge_fts_get_next_doc_item(
581 /*============================*/
582 fts_psort_t* psort_info, /*!< in: psort_info */
583 fts_doc_item_t** doc_item) /*!< in/out: doc item */
584 {
585 if (*doc_item != NULL) {
586 ut_free(*doc_item);
587 }
588
589 mutex_enter(&psort_info->mutex);
590
591 *doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
592 if (*doc_item != NULL) {
593 UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list,
594 *doc_item);
595
596 ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
597 + (*doc_item)->field->len);
598 psort_info->memory_used -= sizeof(fts_doc_item_t)
599 + (*doc_item)->field->len;
600 }
601
602 mutex_exit(&psort_info->mutex);
603 }
604
605 /*********************************************************************//**
606 Function performs parallel tokenization of the incoming doc strings.
607 It also performs the initial in memory sort of the parsed records.
608 @return OS_THREAD_DUMMY_RETURN */
609 UNIV_INTERN
610 os_thread_ret_t
fts_parallel_tokenization(void * arg)611 fts_parallel_tokenization(
612 /*======================*/
613 void* arg) /*!< in: psort_info for the thread */
614 {
615 fts_psort_t* psort_info = (fts_psort_t*) arg;
616 ulint i;
617 fts_doc_item_t* doc_item = NULL;
618 row_merge_buf_t** buf;
619 ibool processed = FALSE;
620 merge_file_t** merge_file;
621 row_merge_block_t** block;
622 int tmpfd[FTS_NUM_AUX_INDEX];
623 ulint mycount[FTS_NUM_AUX_INDEX];
624 ib_uint64_t total_rec = 0;
625 ulint num_doc_processed = 0;
626 doc_id_t last_doc_id = 0;
627 ulint zip_size;
628 mem_heap_t* blob_heap = NULL;
629 fts_doc_t doc;
630 dict_table_t* table = psort_info->psort_common->new_table;
631 dtype_t word_dtype;
632 dict_field_t* idx_field;
633 fts_tokenize_ctx_t t_ctx;
634 ulint retried = 0;
635 dberr_t error = DB_SUCCESS;
636
637 ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
638
639 const char* path = thd_innodb_tmpdir(
640 psort_info->psort_common->trx->mysql_thd);
641
642 ut_ad(psort_info);
643
644 buf = psort_info->merge_buf;
645 merge_file = psort_info->merge_file;
646 blob_heap = mem_heap_create(512);
647 memset(&doc, 0, sizeof(doc));
648 memset(&t_ctx, 0, sizeof(t_ctx));
649 memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
650
651 doc.charset = fts_index_get_charset(
652 psort_info->psort_common->dup->index);
653
654 idx_field = dict_index_get_nth_field(
655 psort_info->psort_common->dup->index, 0);
656 word_dtype.prtype = idx_field->col->prtype;
657 word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
658 word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
659 ? DATA_VARCHAR : DATA_VARMYSQL;
660
661 block = psort_info->merge_block;
662 zip_size = dict_table_zip_size(table);
663
664 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
665
666 t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
667 processed = TRUE;
668 loop:
669 while (doc_item) {
670 dfield_t* dfield = doc_item->field;
671
672 last_doc_id = doc_item->doc_id;
673
674 ut_ad (dfield->data != NULL
675 && dfield_get_len(dfield) != UNIV_SQL_NULL);
676
677 /* If finish processing the last item, update "doc" with
678 strings in the doc_item, otherwise continue processing last
679 item */
680 if (processed) {
681 byte* data;
682 ulint data_len;
683
684 dfield = doc_item->field;
685 data = static_cast<byte*>(dfield_get_data(dfield));
686 data_len = dfield_get_len(dfield);
687
688 if (dfield_is_ext(dfield)) {
689 doc.text.f_str =
690 btr_copy_externally_stored_field(
691 &doc.text.f_len, data,
692 zip_size, data_len, blob_heap);
693 } else {
694 doc.text.f_str = data;
695 doc.text.f_len = data_len;
696 }
697
698 doc.tokens = 0;
699 t_ctx.processed_len = 0;
700 } else {
701 /* Not yet finish processing the "doc" on hand,
702 continue processing it */
703 ut_ad(doc.text.f_str);
704 ut_ad(t_ctx.processed_len < doc.text.f_len);
705 }
706
707 processed = row_merge_fts_doc_tokenize(
708 buf, doc_item->doc_id, &doc,
709 &word_dtype,
710 merge_file, psort_info->psort_common->opt_doc_id_size,
711 &t_ctx);
712
713 /* Current sort buffer full, need to recycle */
714 if (!processed) {
715 ut_ad(t_ctx.processed_len < doc.text.f_len);
716 ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
717 break;
718 }
719
720 num_doc_processed++;
721
722 if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
723 ib_logf(IB_LOG_LEVEL_INFO,
724 "number of doc processed %d\n",
725 (int) num_doc_processed);
726 #ifdef FTS_INTERNAL_DIAG_PRINT
727 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
728 ib_logf(IB_LOG_LEVEL_INFO,
729 "ID %d, partition %d, word "
730 "%d\n",(int) psort_info->psort_id,
731 (int) i, (int) mycount[i]);
732 }
733 #endif
734 }
735
736 mem_heap_empty(blob_heap);
737
738 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
739
740 if (doc_item && last_doc_id != doc_item->doc_id) {
741 t_ctx.init_pos = 0;
742 }
743 }
744
745 /* If we run out of current sort buffer, need to sort
746 and flush the sort buffer to disk */
747 if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
748 row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
749 row_merge_buf_write(buf[t_ctx.buf_used],
750 merge_file[t_ctx.buf_used],
751 block[t_ctx.buf_used]);
752
753 if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
754 merge_file[t_ctx.buf_used]->offset++,
755 block[t_ctx.buf_used])) {
756 error = DB_TEMP_FILE_WRITE_FAILURE;
757 goto func_exit;
758 }
759
760 UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
761 buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
762 mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
763 t_ctx.rows_added[t_ctx.buf_used] = 0;
764
765 ut_a(doc_item);
766 goto loop;
767 }
768
769 /* Parent done scanning, and if finish processing all the docs, exit */
770 if (psort_info->state == FTS_PARENT_COMPLETE) {
771 if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
772 goto exit;
773 } else if (retried > 10000) {
774 ut_ad(!doc_item);
775 /* retied too many times and cannot get new record */
776 ib_logf(IB_LOG_LEVEL_ERROR,
777 "InnoDB: FTS parallel sort processed "
778 "%lu records, the sort queue has "
779 "%lu records. But sort cannot get "
780 "the next records", num_doc_processed,
781 UT_LIST_GET_LEN(
782 psort_info->fts_doc_list));
783 goto exit;
784 }
785 } else if (psort_info->state == FTS_PARENT_EXITING) {
786 /* Parent abort */
787 goto func_exit;
788 }
789
790 if (doc_item == NULL) {
791 os_thread_yield();
792 }
793
794 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
795
796 if (doc_item != NULL) {
797 if (last_doc_id != doc_item->doc_id) {
798 t_ctx.init_pos = 0;
799 }
800
801 retried = 0;
802 } else if (psort_info->state == FTS_PARENT_COMPLETE) {
803 retried++;
804 }
805
806 goto loop;
807
808 exit:
809 /* Do a final sort of the last (or latest) batch of records
810 in block memory. Flush them to temp file if records cannot
811 be hold in one block memory */
812 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
813 if (t_ctx.rows_added[i]) {
814 row_merge_buf_sort(buf[i], NULL);
815 row_merge_buf_write(
816 buf[i], merge_file[i], block[i]);
817
818 /* Write to temp file, only if records have
819 been flushed to temp file before (offset > 0):
820 The pseudo code for sort is following:
821
822 while (there are rows) {
823 tokenize rows, put result in block[]
824 if (block[] runs out) {
825 sort rows;
826 write to temp file with
827 row_merge_write();
828 offset++;
829 }
830 }
831
832 # write out the last batch
833 if (offset > 0) {
834 row_merge_write();
835 offset++;
836 } else {
837 # no need to write anything
838 offset stay as 0
839 }
840
841 so if merge_file[i]->offset is 0 when we come to
842 here as the last batch, this means rows have
843 never flush to temp file, it can be held all in
844 memory */
845 if (merge_file[i]->offset != 0) {
846 if (!row_merge_write(merge_file[i]->fd,
847 merge_file[i]->offset++,
848 block[i])) {
849 error = DB_TEMP_FILE_WRITE_FAILURE;
850 goto func_exit;
851 }
852
853 UNIV_MEM_INVALID(block[i][0],
854 srv_sort_buf_size);
855 }
856
857 buf[i] = row_merge_buf_empty(buf[i]);
858 t_ctx.rows_added[i] = 0;
859 }
860 }
861
862 if (fts_enable_diag_print) {
863 DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n");
864 }
865
866 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
867 if (!merge_file[i]->offset) {
868 continue;
869 }
870
871 tmpfd[i] = row_merge_file_create_low(path);
872 if (tmpfd[i] < 0) {
873 error = DB_OUT_OF_MEMORY;
874 goto func_exit;
875 }
876
877 error = row_merge_sort(psort_info->psort_common->trx,
878 psort_info->psort_common->dup,
879 merge_file[i], block[i], &tmpfd[i]);
880 if (error != DB_SUCCESS) {
881 close(tmpfd[i]);
882 goto func_exit;
883 }
884
885 total_rec += merge_file[i]->n_rec;
886 close(tmpfd[i]);
887 }
888
889 func_exit:
890 if (fts_enable_diag_print) {
891 DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
892 }
893
894 mem_heap_free(blob_heap);
895
896 mutex_enter(&psort_info->mutex);
897 psort_info->error = error;
898 mutex_exit(&psort_info->mutex);
899
900 if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
901 /* child can exit either with error or told by parent. */
902 ut_ad(error != DB_SUCCESS
903 || psort_info->state == FTS_PARENT_EXITING);
904 }
905
906 /* Free fts doc list in case of error. */
907 do {
908 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
909 } while (doc_item != NULL);
910
911 psort_info->child_status = FTS_CHILD_COMPLETE;
912 os_event_set(psort_info->psort_common->sort_event);
913 psort_info->child_status = FTS_CHILD_EXITING;
914
915 #ifdef __WIN__
916 CloseHandle(psort_info->thread_hdl);
917 #endif /*__WIN__ */
918
919 os_thread_exit(NULL);
920
921 OS_THREAD_DUMMY_RETURN;
922 }
923
924 /*********************************************************************//**
925 Start the parallel tokenization and parallel merge sort */
926 UNIV_INTERN
927 void
row_fts_start_psort(fts_psort_t * psort_info)928 row_fts_start_psort(
929 /*================*/
930 fts_psort_t* psort_info) /*!< parallel sort structure */
931 {
932 ulint i = 0;
933 os_thread_id_t thd_id;
934
935 for (i = 0; i < fts_sort_pll_degree; i++) {
936 psort_info[i].psort_id = i;
937 psort_info[i].thread_hdl = os_thread_create(
938 fts_parallel_tokenization,
939 (void*) &psort_info[i], &thd_id);
940 }
941 }
942
943 /*********************************************************************//**
944 Function performs the merge and insertion of the sorted records.
945 @return OS_THREAD_DUMMY_RETURN */
946 UNIV_INTERN
947 os_thread_ret_t
fts_parallel_merge(void * arg)948 fts_parallel_merge(
949 /*===============*/
950 void* arg) /*!< in: parallel merge info */
951 {
952 fts_psort_t* psort_info = (fts_psort_t*) arg;
953 ulint id;
954
955 ut_ad(psort_info);
956
957 id = psort_info->psort_id;
958
959 row_fts_merge_insert(psort_info->psort_common->dup->index,
960 psort_info->psort_common->new_table,
961 psort_info->psort_common->all_info, id);
962
963 psort_info->child_status = FTS_CHILD_COMPLETE;
964 os_event_set(psort_info->psort_common->merge_event);
965 psort_info->child_status = FTS_CHILD_EXITING;
966
967 #ifdef __WIN__
968 CloseHandle(psort_info->thread_hdl);
969 #endif /*__WIN__ */
970
971 os_thread_exit(NULL, false);
972
973 OS_THREAD_DUMMY_RETURN;
974 }
975
976 /*********************************************************************//**
977 Kick off the parallel merge and insert thread */
978 UNIV_INTERN
979 void
row_fts_start_parallel_merge(fts_psort_t * merge_info)980 row_fts_start_parallel_merge(
981 /*=========================*/
982 fts_psort_t* merge_info) /*!< in: parallel sort info */
983 {
984 int i = 0;
985 os_thread_id_t thd_id;
986
987 /* Kick off merge/insert threads */
988 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
989 merge_info[i].psort_id = i;
990 merge_info[i].child_status = 0;
991
992 merge_info[i].thread_hdl = os_thread_create(
993 fts_parallel_merge, (void*) &merge_info[i], &thd_id);
994 }
995 }
996
997 /********************************************************************//**
998 Insert processed FTS data to auxillary index tables.
999 @return DB_SUCCESS if insertion runs fine */
1000 static MY_ATTRIBUTE((nonnull))
1001 dberr_t
row_merge_write_fts_word(trx_t * trx,que_t ** ins_graph,fts_tokenizer_word_t * word,fts_table_t * fts_table,CHARSET_INFO * charset)1002 row_merge_write_fts_word(
1003 /*=====================*/
1004 trx_t* trx, /*!< in: transaction */
1005 que_t** ins_graph, /*!< in: Insert query graphs */
1006 fts_tokenizer_word_t* word, /*!< in: sorted and tokenized
1007 word */
1008 fts_table_t* fts_table, /*!< in: fts aux table instance */
1009 CHARSET_INFO* charset) /*!< in: charset */
1010 {
1011 ulint selected;
1012 dberr_t ret = DB_SUCCESS;
1013
1014 selected = fts_select_index(
1015 charset, word->text.f_str, word->text.f_len);
1016 fts_table->suffix = fts_get_suffix(selected);
1017
1018 /* Pop out each fts_node in word->nodes write them to auxiliary table */
1019 while (ib_vector_size(word->nodes) > 0) {
1020 dberr_t error;
1021 fts_node_t* fts_node;
1022
1023 fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes));
1024
1025 error = fts_write_node(
1026 trx, &ins_graph[selected], fts_table, &word->text,
1027 fts_node);
1028
1029 if (error != DB_SUCCESS) {
1030 fprintf(stderr, "InnoDB: failed to write"
1031 " word %s to FTS auxiliary index"
1032 " table, error (%s) \n",
1033 word->text.f_str, ut_strerr(error));
1034 ret = error;
1035 }
1036
1037 ut_free(fts_node->ilist);
1038 fts_node->ilist = NULL;
1039 }
1040
1041 return(ret);
1042 }
1043
1044 /*********************************************************************//**
1045 Read sorted FTS data files and insert data tuples to auxillary tables.
1046 @return DB_SUCCESS or error number */
1047 UNIV_INTERN
1048 void
row_fts_insert_tuple(fts_psort_insert_t * ins_ctx,fts_tokenizer_word_t * word,ib_vector_t * positions,doc_id_t * in_doc_id,dtuple_t * dtuple)1049 row_fts_insert_tuple(
1050 /*=================*/
1051 fts_psort_insert_t*
1052 ins_ctx, /*!< in: insert context */
1053 fts_tokenizer_word_t* word, /*!< in: last processed
1054 tokenized word */
1055 ib_vector_t* positions, /*!< in: word position */
1056 doc_id_t* in_doc_id, /*!< in: last item doc id */
1057 dtuple_t* dtuple) /*!< in: entry to insert */
1058 {
1059 fts_node_t* fts_node = NULL;
1060 dfield_t* dfield;
1061 doc_id_t doc_id;
1062 ulint position;
1063 fts_string_t token_word;
1064 ulint i;
1065
1066 /* Get fts_node for the FTS auxillary INDEX table */
1067 if (ib_vector_size(word->nodes) > 0) {
1068 fts_node = static_cast<fts_node_t*>(
1069 ib_vector_last(word->nodes));
1070 }
1071
1072 if (fts_node == NULL
1073 || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {
1074
1075 fts_node = static_cast<fts_node_t*>(
1076 ib_vector_push(word->nodes, NULL));
1077
1078 memset(fts_node, 0x0, sizeof(*fts_node));
1079 }
1080
1081 /* If dtuple == NULL, this is the last word to be processed */
1082 if (!dtuple) {
1083 if (fts_node && ib_vector_size(positions) > 0) {
1084 fts_cache_node_add_positions(
1085 NULL, fts_node, *in_doc_id,
1086 positions);
1087
1088 /* Write out the current word */
1089 row_merge_write_fts_word(ins_ctx->trx,
1090 ins_ctx->ins_graph, word,
1091 &ins_ctx->fts_table,
1092 ins_ctx->charset);
1093
1094 }
1095
1096 return;
1097 }
1098
1099 /* Get the first field for the tokenized word */
1100 dfield = dtuple_get_nth_field(dtuple, 0);
1101
1102 token_word.f_n_char = 0;
1103 token_word.f_len = dfield->len;
1104 token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));
1105
1106 if (!word->text.f_str) {
1107 fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1108 }
1109
1110 /* compare to the last word, to see if they are the same
1111 word */
1112 if (innobase_fts_text_cmp(ins_ctx->charset,
1113 &word->text, &token_word) != 0) {
1114 ulint num_item;
1115
1116 /* Getting a new word, flush the last position info
1117 for the currnt word in fts_node */
1118 if (ib_vector_size(positions) > 0) {
1119 fts_cache_node_add_positions(
1120 NULL, fts_node, *in_doc_id, positions);
1121 }
1122
1123 /* Write out the current word */
1124 row_merge_write_fts_word(ins_ctx->trx, ins_ctx->ins_graph,
1125 word, &ins_ctx->fts_table,
1126 ins_ctx->charset);
1127
1128 /* Copy the new word */
1129 fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
1130
1131 num_item = ib_vector_size(positions);
1132
1133 /* Clean up position queue */
1134 for (i = 0; i < num_item; i++) {
1135 ib_vector_pop(positions);
1136 }
1137
1138 /* Reset Doc ID */
1139 *in_doc_id = 0;
1140 memset(fts_node, 0x0, sizeof(*fts_node));
1141 }
1142
1143 /* Get the word's Doc ID */
1144 dfield = dtuple_get_nth_field(dtuple, 1);
1145
1146 if (!ins_ctx->opt_doc_id_size) {
1147 doc_id = fts_read_doc_id(
1148 static_cast<byte*>(dfield_get_data(dfield)));
1149 } else {
1150 doc_id = (doc_id_t) mach_read_from_4(
1151 static_cast<byte*>(dfield_get_data(dfield)));
1152 }
1153
1154 /* Get the word's position info */
1155 dfield = dtuple_get_nth_field(dtuple, 2);
1156 position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));
1157
1158 /* If this is the same word as the last word, and they
1159 have the same Doc ID, we just need to add its position
1160 info. Otherwise, we will flush position info to the
1161 fts_node and initiate a new position vector */
1162 if (!(*in_doc_id) || *in_doc_id == doc_id) {
1163 ib_vector_push(positions, &position);
1164 } else {
1165 ulint num_pos = ib_vector_size(positions);
1166
1167 fts_cache_node_add_positions(NULL, fts_node,
1168 *in_doc_id, positions);
1169 for (i = 0; i < num_pos; i++) {
1170 ib_vector_pop(positions);
1171 }
1172 ib_vector_push(positions, &position);
1173 }
1174
1175 /* record the current Doc ID */
1176 *in_doc_id = doc_id;
1177 }
1178
1179 /*********************************************************************//**
1180 Propagate a newly added record up one level in the selection tree
1181 @return parent where this value propagated to */
1182 static
1183 int
row_fts_sel_tree_propagate(int propogated,int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1184 row_fts_sel_tree_propagate(
1185 /*=======================*/
1186 int propogated, /*<! in: tree node propagated */
1187 int* sel_tree, /*<! in: selection tree */
1188 const mrec_t** mrec, /*<! in: sort record */
1189 ulint** offsets, /*<! in: record offsets */
1190 dict_index_t* index) /*<! in/out: FTS index */
1191 {
1192 ulint parent;
1193 int child_left;
1194 int child_right;
1195 int selected;
1196
1197 /* Find which parent this value will be propagated to */
1198 parent = (propogated - 1) / 2;
1199
1200 /* Find out which value is smaller, and to propagate */
1201 child_left = sel_tree[parent * 2 + 1];
1202 child_right = sel_tree[parent * 2 + 2];
1203
1204 if (child_left == -1 || mrec[child_left] == NULL) {
1205 if (child_right == -1
1206 || mrec[child_right] == NULL) {
1207 selected = -1;
1208 } else {
1209 selected = child_right ;
1210 }
1211 } else if (child_right == -1
1212 || mrec[child_right] == NULL) {
1213 selected = child_left;
1214 } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
1215 offsets[child_left],
1216 offsets[child_right],
1217 index, NULL) < 0) {
1218 selected = child_left;
1219 } else {
1220 selected = child_right;
1221 }
1222
1223 sel_tree[parent] = selected;
1224
1225 return(static_cast<int>(parent));
1226 }
1227
1228 /*********************************************************************//**
1229 Readjust selection tree after popping the root and read a new value
1230 @return the new root */
1231 static
1232 int
row_fts_sel_tree_update(int * sel_tree,ulint propagated,ulint height,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1233 row_fts_sel_tree_update(
1234 /*====================*/
1235 int* sel_tree, /*<! in/out: selection tree */
1236 ulint propagated, /*<! in: node to propagate up */
1237 ulint height, /*<! in: tree height */
1238 const mrec_t** mrec, /*<! in: sort record */
1239 ulint** offsets, /*<! in: record offsets */
1240 dict_index_t* index) /*<! in: index dictionary */
1241 {
1242 ulint i;
1243
1244 for (i = 1; i <= height; i++) {
1245 propagated = static_cast<ulint>(row_fts_sel_tree_propagate(
1246 static_cast<int>(propagated), sel_tree, mrec, offsets, index));
1247 }
1248
1249 return(sel_tree[0]);
1250 }
1251
1252 /*********************************************************************//**
1253 Build selection tree at a specified level */
1254 static
1255 void
row_fts_build_sel_tree_level(int * sel_tree,ulint level,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1256 row_fts_build_sel_tree_level(
1257 /*=========================*/
1258 int* sel_tree, /*<! in/out: selection tree */
1259 ulint level, /*<! in: selection tree level */
1260 const mrec_t** mrec, /*<! in: sort record */
1261 ulint** offsets, /*<! in: record offsets */
1262 dict_index_t* index) /*<! in: index dictionary */
1263 {
1264 ulint start;
1265 int child_left;
1266 int child_right;
1267 ulint i;
1268 ulint num_item;
1269
1270 start = static_cast<ulint>((1 << level) - 1);
1271 num_item = static_cast<ulint>(1 << level);
1272
1273 for (i = 0; i < num_item; i++) {
1274 child_left = sel_tree[(start + i) * 2 + 1];
1275 child_right = sel_tree[(start + i) * 2 + 2];
1276
1277 if (child_left == -1) {
1278 if (child_right == -1) {
1279 sel_tree[start + i] = -1;
1280 } else {
1281 sel_tree[start + i] = child_right;
1282 }
1283 continue;
1284 } else if (child_right == -1) {
1285 sel_tree[start + i] = child_left;
1286 continue;
1287 }
1288
1289 /* Deal with NULL child conditions */
1290 if (!mrec[child_left]) {
1291 if (!mrec[child_right]) {
1292 sel_tree[start + i] = -1;
1293 } else {
1294 sel_tree[start + i] = child_right;
1295 }
1296 continue;
1297 } else if (!mrec[child_right]) {
1298 sel_tree[start + i] = child_left;
1299 continue;
1300 }
1301
1302 /* Select the smaller one to set parent pointer */
1303 int cmp = cmp_rec_rec_simple(
1304 mrec[child_left], mrec[child_right],
1305 offsets[child_left], offsets[child_right],
1306 index, NULL);
1307
1308 sel_tree[start + i] = cmp < 0 ? child_left : child_right;
1309 }
1310 }
1311
1312 /*********************************************************************//**
1313 Build a selection tree for merge. The selection tree is a binary tree
1314 and should have fts_sort_pll_degree / 2 levels. With root as level 0
1315 @return number of tree levels */
1316 static
1317 ulint
row_fts_build_sel_tree(int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1318 row_fts_build_sel_tree(
1319 /*===================*/
1320 int* sel_tree, /*<! in/out: selection tree */
1321 const mrec_t** mrec, /*<! in: sort record */
1322 ulint** offsets, /*<! in: record offsets */
1323 dict_index_t* index) /*<! in: index dictionary */
1324 {
1325 ulint treelevel = 1;
1326 ulint num = 2;
1327 int i = 0;
1328 ulint start;
1329
1330 /* No need to build selection tree if we only have two merge threads */
1331 if (fts_sort_pll_degree <= 2) {
1332 return(0);
1333 }
1334
1335 while (num < fts_sort_pll_degree) {
1336 num = num << 1;
1337 treelevel++;
1338 }
1339
1340 start = (1 << treelevel) - 1;
1341
1342 for (i = 0; i < (int) fts_sort_pll_degree; i++) {
1343 sel_tree[i + start] = i;
1344 }
1345
1346 for (i = static_cast<int>(treelevel) - 1; i >= 0; i--) {
1347 row_fts_build_sel_tree_level(
1348 sel_tree, static_cast<ulint>(i), mrec, offsets, index);
1349 }
1350
1351 return(treelevel);
1352 }
1353
1354 /*********************************************************************//**
1355 Read sorted file containing index data tuples and insert these data
1356 tuples to the index
1357 @return DB_SUCCESS or error number */
1358 UNIV_INTERN
1359 dberr_t
row_fts_merge_insert(dict_index_t * index,dict_table_t * table,fts_psort_t * psort_info,ulint id)1360 row_fts_merge_insert(
1361 /*=================*/
1362 dict_index_t* index, /*!< in: index */
1363 dict_table_t* table, /*!< in: new table */
1364 fts_psort_t* psort_info, /*!< parallel sort info */
1365 ulint id) /* !< in: which auxiliary table's data
1366 to insert to */
1367 {
1368 const byte** b;
1369 mem_heap_t* tuple_heap;
1370 mem_heap_t* heap;
1371 dberr_t error = DB_SUCCESS;
1372 ulint* foffs;
1373 ulint** offsets;
1374 fts_tokenizer_word_t new_word;
1375 ib_vector_t* positions;
1376 doc_id_t last_doc_id;
1377 ib_alloc_t* heap_alloc;
1378 ulint n_bytes;
1379 ulint i;
1380 mrec_buf_t** buf;
1381 int* fd;
1382 byte** block;
1383 const mrec_t** mrec;
1384 ulint count = 0;
1385 int* sel_tree;
1386 ulint height;
1387 ulint start;
1388 fts_psort_insert_t ins_ctx;
1389 ulint count_diag = 0;
1390
1391 ut_ad(index);
1392 ut_ad(table);
1393
1394 /* We use the insert query graph as the dummy graph
1395 needed in the row module call */
1396
1397 ins_ctx.trx = trx_allocate_for_background();
1398
1399 ins_ctx.trx->op_info = "inserting index entries";
1400
1401 ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;
1402
1403 heap = mem_heap_create(500 + sizeof(mrec_buf_t));
1404
1405 b = (const byte**) mem_heap_alloc(
1406 heap, sizeof (*b) * fts_sort_pll_degree);
1407 foffs = (ulint*) mem_heap_alloc(
1408 heap, sizeof(*foffs) * fts_sort_pll_degree);
1409 offsets = (ulint**) mem_heap_alloc(
1410 heap, sizeof(*offsets) * fts_sort_pll_degree);
1411 buf = (mrec_buf_t**) mem_heap_alloc(
1412 heap, sizeof(*buf) * fts_sort_pll_degree);
1413 fd = (int*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
1414 block = (byte**) mem_heap_alloc(
1415 heap, sizeof(*block) * fts_sort_pll_degree);
1416 mrec = (const mrec_t**) mem_heap_alloc(
1417 heap, sizeof(*mrec) * fts_sort_pll_degree);
1418 sel_tree = (int*) mem_heap_alloc(
1419 heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));
1420
1421 tuple_heap = mem_heap_create(1000);
1422
1423 ins_ctx.charset = fts_index_get_charset(index);
1424 ins_ctx.heap = heap;
1425
1426 for (i = 0; i < fts_sort_pll_degree; i++) {
1427 ulint num;
1428
1429 num = 1 + REC_OFFS_HEADER_SIZE
1430 + dict_index_get_n_fields(index);
1431 offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
1432 heap, num * sizeof *offsets[i]));
1433 offsets[i][0] = num;
1434 offsets[i][1] = dict_index_get_n_fields(index);
1435 block[i] = psort_info[i].merge_block[id];
1436 b[i] = psort_info[i].merge_block[id];
1437 fd[i] = psort_info[i].merge_file[id]->fd;
1438 foffs[i] = 0;
1439
1440 buf[i] = static_cast<unsigned char (*)[16384]>(
1441 mem_heap_alloc(heap, sizeof *buf[i]));
1442 count_diag += (int) psort_info[i].merge_file[id]->n_rec;
1443 }
1444
1445 if (fts_enable_diag_print) {
1446 ut_print_timestamp(stderr);
1447 fprintf(stderr, " InnoDB_FTS: to inserted %lu records\n",
1448 (ulong) count_diag);
1449 }
1450
1451 /* Initialize related variables if creating FTS indexes */
1452 heap_alloc = ib_heap_allocator_create(heap);
1453
1454 memset(&new_word, 0, sizeof(new_word));
1455
1456 new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
1457 positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
1458 last_doc_id = 0;
1459
1460 /* Allocate insert query graphs for FTS auxillary
1461 Index Table, note we have FTS_NUM_AUX_INDEX such index tables */
1462 n_bytes = sizeof(que_t*) * (FTS_NUM_AUX_INDEX + 1);
1463 ins_ctx.ins_graph = static_cast<que_t**>(mem_heap_alloc(heap, n_bytes));
1464 memset(ins_ctx.ins_graph, 0x0, n_bytes);
1465
1466 /* We should set the flags2 with aux_table_name here,
1467 in order to get the correct aux table names. */
1468 index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
1469 DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
1470 index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);
1471
1472 ins_ctx.fts_table.type = FTS_INDEX_TABLE;
1473 ins_ctx.fts_table.index_id = index->id;
1474 ins_ctx.fts_table.table_id = table->id;
1475 ins_ctx.fts_table.parent = index->table->name;
1476 ins_ctx.fts_table.table = index->table;
1477
1478 for (i = 0; i < fts_sort_pll_degree; i++) {
1479 if (psort_info[i].merge_file[id]->n_rec == 0) {
1480 /* No Rows to read */
1481 mrec[i] = b[i] = NULL;
1482 } else {
1483 /* Read from temp file only if it has been
1484 written to. Otherwise, block memory holds
1485 all the sorted records */
1486 if (psort_info[i].merge_file[id]->offset > 0
1487 && (!row_merge_read(
1488 fd[i], foffs[i],
1489 (row_merge_block_t*) block[i]))) {
1490 error = DB_CORRUPTION;
1491 goto exit;
1492 }
1493
1494 ROW_MERGE_READ_GET_NEXT(i);
1495 }
1496 }
1497
1498 height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
1499 offsets, index);
1500
1501 start = (1 << height) - 1;
1502
1503 /* Fetch sorted records from sort buffer and insert them into
1504 corresponding FTS index auxiliary tables */
1505 for (;;) {
1506 dtuple_t* dtuple;
1507 ulint n_ext;
1508 int min_rec = 0;
1509
1510 if (fts_sort_pll_degree <= 2) {
1511 while (!mrec[min_rec]) {
1512 min_rec++;
1513
1514 if (min_rec >= (int) fts_sort_pll_degree) {
1515 row_fts_insert_tuple(
1516 &ins_ctx, &new_word,
1517 positions, &last_doc_id,
1518 NULL);
1519
1520 goto exit;
1521 }
1522 }
1523
1524 for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
1525 if (!mrec[i]) {
1526 continue;
1527 }
1528
1529 if (cmp_rec_rec_simple(
1530 mrec[i], mrec[min_rec],
1531 offsets[i], offsets[min_rec],
1532 index, NULL) < 0) {
1533 min_rec = static_cast<int>(i);
1534 }
1535 }
1536 } else {
1537 min_rec = sel_tree[0];
1538
1539 if (min_rec == -1) {
1540 row_fts_insert_tuple(
1541 &ins_ctx, &new_word,
1542 positions, &last_doc_id,
1543 NULL);
1544
1545 goto exit;
1546 }
1547 }
1548
1549 dtuple = row_rec_to_index_entry_low(
1550 mrec[min_rec], index, offsets[min_rec], &n_ext,
1551 tuple_heap);
1552
1553 row_fts_insert_tuple(
1554 &ins_ctx, &new_word, positions,
1555 &last_doc_id, dtuple);
1556
1557
1558 ROW_MERGE_READ_GET_NEXT(min_rec);
1559
1560 if (fts_sort_pll_degree > 2) {
1561 if (!mrec[min_rec]) {
1562 sel_tree[start + min_rec] = -1;
1563 }
1564
1565 row_fts_sel_tree_update(sel_tree, start + min_rec,
1566 height, mrec,
1567 offsets, index);
1568 }
1569
1570 count++;
1571
1572 mem_heap_empty(tuple_heap);
1573 }
1574
1575 exit:
1576 fts_sql_commit(ins_ctx.trx);
1577
1578 ins_ctx.trx->op_info = "";
1579
1580 mem_heap_free(tuple_heap);
1581
1582 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
1583 if (ins_ctx.ins_graph[i]) {
1584 fts_que_graph_free(ins_ctx.ins_graph[i]);
1585 }
1586 }
1587
1588 trx_free_for_background(ins_ctx.trx);
1589
1590 mem_heap_free(heap);
1591
1592 if (fts_enable_diag_print) {
1593 ut_print_timestamp(stderr);
1594 fprintf(stderr, " InnoDB_FTS: inserted %lu records\n",
1595 (ulong) count);
1596 }
1597
1598 return(error);
1599 }
1600