1 /*****************************************************************************
2
3 Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License along with
22 this program; if not, write to the Free Software Foundation, Inc.,
23 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
24
25 *****************************************************************************/
26
27 /**************************************************//**
28 @file row/row0ftsort.cc
29 Create Full Text Index with (parallel) merge sort
30
31 Created 10/13/2010 Jimmy Yang
32 *******************************************************/
33
34 #include "dict0dict.h" /* dict_table_stats_lock() */
35 #include "row0merge.h"
36 #include "pars0pars.h"
37 #include "row0ftsort.h"
38 #include "row0merge.h"
39 #include "row0row.h"
40 #include "btr0cur.h"
41
/** Read the next record from merge stream N into b[N].
On end of stream, row_merge_read_rec() sets b[N] to NULL; if a record
is still buffered in mrec[N] it must be flushed first, hence the jump
to the local label "exit" in the invoking function.
@param N index into the per-stream merge state arrays
(block, buf, b, fd, foffs, mrec, offsets) */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N]);	\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)
55
/** Parallel sort degree: number of tokenize/sort threads spawned per
FTS index build (settable via the innodb_ft_sort_pll_degree option). */
UNIV_INTERN ulong fts_sort_pll_degree = 2;
58
59 /*********************************************************************//**
60 Create a temporary "fts sort index" used to merge sort the
61 tokenized doc string. The index has three "fields":
62
63 1) Tokenized word,
64 2) Doc ID (depend on number of records to sort, it can be a 4 bytes or 8 bytes
65 integer value)
66 3) Word's position in original doc.
67
68 @return dict_index_t structure for the fts sort index */
69 UNIV_INTERN
70 dict_index_t*
row_merge_create_fts_sort_index(dict_index_t * index,const dict_table_t * table,ibool * opt_doc_id_size)71 row_merge_create_fts_sort_index(
72 /*============================*/
73 dict_index_t* index, /*!< in: Original FTS index
74 based on which this sort index
75 is created */
76 const dict_table_t* table, /*!< in: table that FTS index
77 is being created on */
78 ibool* opt_doc_id_size)
79 /*!< out: whether to use 4 bytes
80 instead of 8 bytes integer to
81 store Doc ID during sort */
82 {
83 dict_index_t* new_index;
84 dict_field_t* field;
85 dict_field_t* idx_field;
86 CHARSET_INFO* charset;
87
88 // FIXME: This name shouldn't be hard coded here.
89 new_index = dict_mem_index_create(
90 index->table->name, "tmp_fts_idx", 0, DICT_FTS, 3);
91
92 new_index->id = index->id;
93 new_index->table = (dict_table_t*) table;
94 new_index->n_uniq = FTS_NUM_FIELDS_SORT;
95 new_index->n_def = FTS_NUM_FIELDS_SORT;
96 new_index->cached = TRUE;
97
98 idx_field = dict_index_get_nth_field(index, 0);
99 charset = fts_index_get_charset(index);
100
101 /* The first field is on the Tokenized Word */
102 field = dict_index_get_nth_field(new_index, 0);
103 field->name = NULL;
104 field->prefix_len = 0;
105 field->col = static_cast<dict_col_t*>(
106 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
107 field->col->len = FTS_MAX_WORD_LEN;
108
109 if (strcmp(charset->name, "latin1_swedish_ci") == 0) {
110 field->col->mtype = DATA_VARCHAR;
111 } else {
112 field->col->mtype = DATA_VARMYSQL;
113 }
114
115 field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
116 field->col->mbminmaxlen = idx_field->col->mbminmaxlen;
117 field->fixed_len = 0;
118
119 /* Doc ID */
120 field = dict_index_get_nth_field(new_index, 1);
121 field->name = NULL;
122 field->prefix_len = 0;
123 field->col = static_cast<dict_col_t*>(
124 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
125 field->col->mtype = DATA_INT;
126 *opt_doc_id_size = FALSE;
127
128 /* Check whether we can use 4 bytes instead of 8 bytes integer
129 field to hold the Doc ID, thus reduce the overall sort size */
130 if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
131 /* If Doc ID column is being added by this create
132 index, then just check the number of rows in the table */
133 if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
134 *opt_doc_id_size = TRUE;
135 }
136 } else {
137 doc_id_t max_doc_id;
138
139 /* If the Doc ID column is supplied by user, then
140 check the maximum Doc ID in the table */
141 max_doc_id = fts_get_max_doc_id((dict_table_t*) table);
142
143 if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
144 *opt_doc_id_size = TRUE;
145 }
146 }
147
148 if (*opt_doc_id_size) {
149 field->col->len = sizeof(ib_uint32_t);
150 field->fixed_len = sizeof(ib_uint32_t);
151 } else {
152 field->col->len = FTS_DOC_ID_LEN;
153 field->fixed_len = FTS_DOC_ID_LEN;
154 }
155
156 field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
157
158 field->col->mbminmaxlen = 0;
159
160 /* The third field is on the word's position in the original doc */
161 field = dict_index_get_nth_field(new_index, 2);
162 field->name = NULL;
163 field->prefix_len = 0;
164 field->col = static_cast<dict_col_t*>(
165 mem_heap_alloc(new_index->heap, sizeof(dict_col_t)));
166 field->col->mtype = DATA_INT;
167 field->col->len = 4 ;
168 field->fixed_len = 4;
169 field->col->prtype = DATA_NOT_NULL;
170 field->col->mbminmaxlen = 0;
171
172 return(new_index);
173 }
174 /*********************************************************************//**
175 Initialize FTS parallel sort structures.
176 @return TRUE if all successful */
177 UNIV_INTERN
178 ibool
row_fts_psort_info_init(trx_t * trx,row_merge_dup_t * dup,const dict_table_t * new_table,ibool opt_doc_id_size,fts_psort_t ** psort,fts_psort_t ** merge)179 row_fts_psort_info_init(
180 /*====================*/
181 trx_t* trx, /*!< in: transaction */
182 row_merge_dup_t* dup, /*!< in,own: descriptor of
183 FTS index being created */
184 const dict_table_t* new_table,/*!< in: table on which indexes are
185 created */
186 ibool opt_doc_id_size,
187 /*!< in: whether to use 4 bytes
188 instead of 8 bytes integer to
189 store Doc ID during sort */
190 fts_psort_t** psort, /*!< out: parallel sort info to be
191 instantiated */
192 fts_psort_t** merge) /*!< out: parallel merge info
193 to be instantiated */
194 {
195 ulint i;
196 ulint j;
197 fts_psort_common_t* common_info = NULL;
198 fts_psort_t* psort_info = NULL;
199 fts_psort_t* merge_info = NULL;
200 ulint block_size;
201 ibool ret = TRUE;
202
203 block_size = 3 * srv_sort_buf_size;
204
205 *psort = psort_info = static_cast<fts_psort_t*>(mem_zalloc(
206 fts_sort_pll_degree * sizeof *psort_info));
207
208 if (!psort_info) {
209 ut_free(dup);
210 return(FALSE);
211 }
212
213 /* Common Info for all sort threads */
214 common_info = static_cast<fts_psort_common_t*>(
215 mem_alloc(sizeof *common_info));
216
217 if (!common_info) {
218 ut_free(dup);
219 mem_free(psort_info);
220 return(FALSE);
221 }
222
223 common_info->dup = dup;
224 common_info->new_table = (dict_table_t*) new_table;
225 common_info->trx = trx;
226 common_info->all_info = psort_info;
227 common_info->sort_event = os_event_create();
228 common_info->merge_event = os_event_create();
229 common_info->opt_doc_id_size = opt_doc_id_size;
230
231 ut_ad(trx->mysql_thd != NULL);
232 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
233
234 /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
235 each parallel sort thread. Each "sort bucket" holds records for
236 a particular "FTS index partition" */
237 for (j = 0; j < fts_sort_pll_degree; j++) {
238
239 UT_LIST_INIT(psort_info[j].fts_doc_list);
240
241 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
242
243 psort_info[j].merge_file[i] =
244 static_cast<merge_file_t*>(
245 mem_zalloc(sizeof(merge_file_t)));
246
247 if (!psort_info[j].merge_file[i]) {
248 ret = FALSE;
249 goto func_exit;
250 }
251
252 psort_info[j].merge_buf[i] = row_merge_buf_create(
253 dup->index);
254
255 if (row_merge_file_create(psort_info[j].merge_file[i],
256 path) < 0) {
257 goto func_exit;
258 }
259
260 /* Need to align memory for O_DIRECT write */
261 psort_info[j].block_alloc[i] =
262 static_cast<row_merge_block_t*>(ut_malloc(
263 block_size + 1024));
264
265 psort_info[j].merge_block[i] =
266 static_cast<row_merge_block_t*>(
267 ut_align(
268 psort_info[j].block_alloc[i], 1024));
269
270 if (!psort_info[j].merge_block[i]) {
271 ret = FALSE;
272 goto func_exit;
273 }
274 }
275
276 psort_info[j].child_status = 0;
277 psort_info[j].state = 0;
278 psort_info[j].psort_common = common_info;
279 psort_info[j].error = DB_SUCCESS;
280 psort_info[j].memory_used = 0;
281 mutex_create(fts_pll_tokenize_mutex_key, &psort_info[j].mutex, SYNC_FTS_TOKENIZE);
282 }
283
284 /* Initialize merge_info structures parallel merge and insert
285 into auxiliary FTS tables (FTS_INDEX_TABLE) */
286 *merge = merge_info = static_cast<fts_psort_t*>(
287 mem_alloc(FTS_NUM_AUX_INDEX * sizeof *merge_info));
288
289 for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
290
291 merge_info[j].child_status = 0;
292 merge_info[j].state = 0;
293 merge_info[j].psort_common = common_info;
294 }
295
296 func_exit:
297 if (!ret) {
298 row_fts_psort_info_destroy(psort_info, merge_info);
299 }
300
301 return(ret);
302 }
303 /*********************************************************************//**
304 Clean up and deallocate FTS parallel sort structures, and close the
305 merge sort files */
306 UNIV_INTERN
307 void
row_fts_psort_info_destroy(fts_psort_t * psort_info,fts_psort_t * merge_info)308 row_fts_psort_info_destroy(
309 /*=======================*/
310 fts_psort_t* psort_info, /*!< parallel sort info */
311 fts_psort_t* merge_info) /*!< parallel merge info */
312 {
313 ulint i;
314 ulint j;
315
316 if (psort_info) {
317 for (j = 0; j < fts_sort_pll_degree; j++) {
318 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
319 if (psort_info[j].merge_file[i]) {
320 row_merge_file_destroy(
321 psort_info[j].merge_file[i]);
322 }
323
324 if (psort_info[j].block_alloc[i]) {
325 ut_free(psort_info[j].block_alloc[i]);
326 }
327 mem_free(psort_info[j].merge_file[i]);
328 }
329
330 mutex_free(&psort_info[j].mutex);
331 }
332
333 os_event_free(merge_info[0].psort_common->sort_event);
334 os_event_free(merge_info[0].psort_common->merge_event);
335 ut_free(merge_info[0].psort_common->dup);
336 mem_free(merge_info[0].psort_common);
337 mem_free(psort_info);
338 }
339
340 if (merge_info) {
341 mem_free(merge_info);
342 }
343 }
344 /*********************************************************************//**
345 Free up merge buffers when merge sort is done */
346 UNIV_INTERN
347 void
row_fts_free_pll_merge_buf(fts_psort_t * psort_info)348 row_fts_free_pll_merge_buf(
349 /*=======================*/
350 fts_psort_t* psort_info) /*!< in: parallel sort info */
351 {
352 ulint j;
353 ulint i;
354
355 if (!psort_info) {
356 return;
357 }
358
359 for (j = 0; j < fts_sort_pll_degree; j++) {
360 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
361 row_merge_buf_free(psort_info[j].merge_buf[i]);
362 }
363 }
364
365 return;
366 }
367
368 /*********************************************************************//**
369 Tokenize incoming text data and add to the sort buffer.
370 @return TRUE if the record passed, FALSE if out of space */
371 static
372 ibool
row_merge_fts_doc_tokenize(row_merge_buf_t ** sort_buf,doc_id_t doc_id,fts_doc_t * doc,dtype_t * word_dtype,merge_file_t ** merge_file,ibool opt_doc_id_size,fts_tokenize_ctx_t * t_ctx)373 row_merge_fts_doc_tokenize(
374 /*=======================*/
375 row_merge_buf_t** sort_buf, /*!< in/out: sort buffer */
376 doc_id_t doc_id, /*!< in: Doc ID */
377 fts_doc_t* doc, /*!< in: Doc to be tokenized */
378 dtype_t* word_dtype, /*!< in: data structure for
379 word col */
380 merge_file_t** merge_file, /*!< in/out: merge file */
381 ibool opt_doc_id_size,/*!< in: whether to use 4 bytes
382 instead of 8 bytes integer to
383 store Doc ID during sort*/
384 fts_tokenize_ctx_t* t_ctx) /*!< in/out: tokenize context */
385 {
386 ulint i;
387 ulint inc;
388 fts_string_t str;
389 ulint len;
390 row_merge_buf_t* buf;
391 dfield_t* field;
392 fts_string_t t_str;
393 ibool buf_full = FALSE;
394 byte str_buf[FTS_MAX_WORD_LEN + 1];
395 ulint data_size[FTS_NUM_AUX_INDEX];
396 ulint n_tuple[FTS_NUM_AUX_INDEX];
397
398 t_str.f_n_char = 0;
399 t_ctx->buf_used = 0;
400
401 memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
402 memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
403
404 /* Tokenize the data and add each word string, its corresponding
405 doc id and position to sort buffer */
406 for (i = t_ctx->processed_len; i < doc->text.f_len; i += inc) {
407 ib_rbt_bound_t parent;
408 ulint idx = 0;
409 ib_uint32_t position;
410 ulint offset = 0;
411 ulint cur_len = 0;
412 doc_id_t write_doc_id;
413
414 inc = innobase_mysql_fts_get_token(
415 doc->charset, doc->text.f_str + i,
416 doc->text.f_str + doc->text.f_len, &str, &offset);
417
418 ut_a(inc > 0);
419
420 /* Ignore string whose character number is less than
421 "fts_min_token_size" or more than "fts_max_token_size" */
422 if (str.f_n_char < fts_min_token_size
423 || str.f_n_char > fts_max_token_size) {
424
425 t_ctx->processed_len += inc;
426 continue;
427 }
428
429 t_str.f_len = innobase_fts_casedn_str(
430 doc->charset, (char*) str.f_str, str.f_len,
431 (char*) &str_buf, FTS_MAX_WORD_LEN + 1);
432
433 t_str.f_str = (byte*) &str_buf;
434
435 /* if "cached_stopword" is defined, ingore words in the
436 stopword list */
437 if (t_ctx->cached_stopword
438 && rbt_search(t_ctx->cached_stopword,
439 &parent, &t_str) == 0) {
440
441 t_ctx->processed_len += inc;
442 continue;
443 }
444
445 /* There are FTS_NUM_AUX_INDEX auxiliary tables, find
446 out which sort buffer to put this word record in */
447 t_ctx->buf_used = fts_select_index(
448 doc->charset, t_str.f_str, t_str.f_len);
449
450 buf = sort_buf[t_ctx->buf_used];
451
452 ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
453 idx = t_ctx->buf_used;
454
455 mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
456
457 field = mtuple->fields = static_cast<dfield_t*>(
458 mem_heap_alloc(buf->heap,
459 FTS_NUM_FIELDS_SORT * sizeof *field));
460
461 /* The first field is the tokenized word */
462 dfield_set_data(field, t_str.f_str, t_str.f_len);
463 len = dfield_get_len(field);
464
465 field->type.mtype = word_dtype->mtype;
466 field->type.prtype = word_dtype->prtype | DATA_NOT_NULL;
467
468 /* Variable length field, set to max size. */
469 field->type.len = FTS_MAX_WORD_LEN;
470 field->type.mbminmaxlen = word_dtype->mbminmaxlen;
471
472 cur_len += len;
473 dfield_dup(field, buf->heap);
474 field++;
475
476 /* The second field is the Doc ID */
477
478 ib_uint32_t doc_id_32_bit;
479
480 if (!opt_doc_id_size) {
481 fts_write_doc_id((byte*) &write_doc_id, doc_id);
482
483 dfield_set_data(
484 field, &write_doc_id, sizeof(write_doc_id));
485 } else {
486 mach_write_to_4(
487 (byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);
488
489 dfield_set_data(
490 field, &doc_id_32_bit, sizeof(doc_id_32_bit));
491 }
492
493 len = field->len;
494 ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));
495
496 field->type.mtype = DATA_INT;
497 field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
498 field->type.len = len;
499 field->type.mbminmaxlen = 0;
500
501 cur_len += len;
502 dfield_dup(field, buf->heap);
503
504 ++field;
505
506 /* The third field is the position */
507 mach_write_to_4(
508 (byte*) &position,
509 (i + offset + inc - str.f_len + t_ctx->init_pos));
510
511 dfield_set_data(field, &position, sizeof(position));
512 len = dfield_get_len(field);
513 ut_ad(len == sizeof(ib_uint32_t));
514
515 field->type.mtype = DATA_INT;
516 field->type.prtype = DATA_NOT_NULL;
517 field->type.len = len;
518 field->type.mbminmaxlen = 0;
519 cur_len += len;
520 dfield_dup(field, buf->heap);
521
522 /* One variable length column, word with its lenght less than
523 fts_max_token_size, add one extra size and one extra byte.
524
525 Since the max length for FTS token now is larger than 255,
526 so we will need to signify length byte itself, so only 1 to 128
527 bytes can be used for 1 bytes, larger than that 2 bytes. */
528 if (t_str.f_len < 128) {
529 /* Extra size is one byte. */
530 cur_len += 2;
531 } else {
532 /* Extra size is two bytes. */
533 cur_len += 3;
534 }
535
536 /* Reserve one byte for the end marker of row_merge_block_t. */
537 if (buf->total_size + data_size[idx] + cur_len
538 >= srv_sort_buf_size - 1) {
539
540 buf_full = TRUE;
541 break;
542 }
543
544 /* Increment the number of tuples */
545 n_tuple[idx]++;
546 t_ctx->processed_len += inc;
547 data_size[idx] += cur_len;
548 }
549
550 /* Update the data length and the number of new word tuples
551 added in this round of tokenization */
552 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
553 /* The computation of total_size below assumes that no
554 delete-mark flags will be stored and that all fields
555 are NOT NULL and fixed-length. */
556
557 sort_buf[i]->total_size += data_size[i];
558
559 sort_buf[i]->n_tuples += n_tuple[i];
560
561 merge_file[i]->n_rec += n_tuple[i];
562 t_ctx->rows_added[i] += n_tuple[i];
563 }
564
565 if (!buf_full) {
566 /* we pad one byte between text accross two fields */
567 t_ctx->init_pos += doc->text.f_len + 1;
568 }
569
570 return(!buf_full);
571 }
572
573 /*********************************************************************//**
574 Get next doc item from fts_doc_list */
575 UNIV_INLINE
576 void
row_merge_fts_get_next_doc_item(fts_psort_t * psort_info,fts_doc_item_t ** doc_item)577 row_merge_fts_get_next_doc_item(
578 /*============================*/
579 fts_psort_t* psort_info, /*!< in: psort_info */
580 fts_doc_item_t** doc_item) /*!< in/out: doc item */
581 {
582 if (*doc_item != NULL) {
583 ut_free(*doc_item);
584 }
585
586 mutex_enter(&psort_info->mutex);
587
588 *doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
589 if (*doc_item != NULL) {
590 UT_LIST_REMOVE(doc_list, psort_info->fts_doc_list,
591 *doc_item);
592
593 ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
594 + (*doc_item)->field->len);
595 psort_info->memory_used -= sizeof(fts_doc_item_t)
596 + (*doc_item)->field->len;
597 }
598
599 mutex_exit(&psort_info->mutex);
600 }
601
602 /*********************************************************************//**
603 Function performs parallel tokenization of the incoming doc strings.
604 It also performs the initial in memory sort of the parsed records.
605 @return OS_THREAD_DUMMY_RETURN */
606 UNIV_INTERN
607 os_thread_ret_t
fts_parallel_tokenization(void * arg)608 fts_parallel_tokenization(
609 /*======================*/
610 void* arg) /*!< in: psort_info for the thread */
611 {
612 fts_psort_t* psort_info = (fts_psort_t*) arg;
613 ulint i;
614 fts_doc_item_t* doc_item = NULL;
615 row_merge_buf_t** buf;
616 ibool processed = FALSE;
617 merge_file_t** merge_file;
618 row_merge_block_t** block;
619 int tmpfd[FTS_NUM_AUX_INDEX];
620 ulint mycount[FTS_NUM_AUX_INDEX];
621 ib_uint64_t total_rec = 0;
622 ulint num_doc_processed = 0;
623 doc_id_t last_doc_id = 0;
624 ulint zip_size;
625 mem_heap_t* blob_heap = NULL;
626 fts_doc_t doc;
627 dict_table_t* table = psort_info->psort_common->new_table;
628 dtype_t word_dtype;
629 dict_field_t* idx_field;
630 fts_tokenize_ctx_t t_ctx;
631 ulint retried = 0;
632 dberr_t error = DB_SUCCESS;
633
634 ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);
635
636 const char* path = thd_innodb_tmpdir(
637 psort_info->psort_common->trx->mysql_thd);
638
639 ut_ad(psort_info);
640
641 buf = psort_info->merge_buf;
642 merge_file = psort_info->merge_file;
643 blob_heap = mem_heap_create(512);
644 memset(&doc, 0, sizeof(doc));
645 memset(&t_ctx, 0, sizeof(t_ctx));
646 memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
647
648 doc.charset = fts_index_get_charset(
649 psort_info->psort_common->dup->index);
650
651 idx_field = dict_index_get_nth_field(
652 psort_info->psort_common->dup->index, 0);
653 word_dtype.prtype = idx_field->col->prtype;
654 word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
655 word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
656 ? DATA_VARCHAR : DATA_VARMYSQL;
657
658 block = psort_info->merge_block;
659 zip_size = dict_table_zip_size(table);
660
661 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
662
663 t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
664 processed = TRUE;
665 loop:
666 while (doc_item) {
667 dfield_t* dfield = doc_item->field;
668
669 last_doc_id = doc_item->doc_id;
670
671 ut_ad (dfield->data != NULL
672 && dfield_get_len(dfield) != UNIV_SQL_NULL);
673
674 /* If finish processing the last item, update "doc" with
675 strings in the doc_item, otherwise continue processing last
676 item */
677 if (processed) {
678 byte* data;
679 ulint data_len;
680
681 dfield = doc_item->field;
682 data = static_cast<byte*>(dfield_get_data(dfield));
683 data_len = dfield_get_len(dfield);
684
685 if (dfield_is_ext(dfield)) {
686 doc.text.f_str =
687 btr_copy_externally_stored_field(
688 &doc.text.f_len, data,
689 zip_size, data_len, blob_heap);
690 } else {
691 doc.text.f_str = data;
692 doc.text.f_len = data_len;
693 }
694
695 doc.tokens = 0;
696 t_ctx.processed_len = 0;
697 } else {
698 /* Not yet finish processing the "doc" on hand,
699 continue processing it */
700 ut_ad(doc.text.f_str);
701 ut_ad(t_ctx.processed_len < doc.text.f_len);
702 }
703
704 processed = row_merge_fts_doc_tokenize(
705 buf, doc_item->doc_id, &doc,
706 &word_dtype,
707 merge_file, psort_info->psort_common->opt_doc_id_size,
708 &t_ctx);
709
710 /* Current sort buffer full, need to recycle */
711 if (!processed) {
712 ut_ad(t_ctx.processed_len < doc.text.f_len);
713 ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
714 break;
715 }
716
717 num_doc_processed++;
718
719 if (fts_enable_diag_print && num_doc_processed % 10000 == 1) {
720 ib_logf(IB_LOG_LEVEL_INFO,
721 "number of doc processed %d\n",
722 (int) num_doc_processed);
723 #ifdef FTS_INTERNAL_DIAG_PRINT
724 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
725 ib_logf(IB_LOG_LEVEL_INFO,
726 "ID %d, partition %d, word "
727 "%d\n",(int) psort_info->psort_id,
728 (int) i, (int) mycount[i]);
729 }
730 #endif
731 }
732
733 mem_heap_empty(blob_heap);
734
735 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
736
737 if (doc_item && last_doc_id != doc_item->doc_id) {
738 t_ctx.init_pos = 0;
739 }
740 }
741
742 /* If we run out of current sort buffer, need to sort
743 and flush the sort buffer to disk */
744 if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
745 row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
746 row_merge_buf_write(buf[t_ctx.buf_used],
747 merge_file[t_ctx.buf_used],
748 block[t_ctx.buf_used]);
749
750 if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
751 merge_file[t_ctx.buf_used]->offset++,
752 block[t_ctx.buf_used])) {
753 error = DB_TEMP_FILE_WRITE_FAILURE;
754 goto func_exit;
755 }
756
757 UNIV_MEM_INVALID(block[t_ctx.buf_used][0], srv_sort_buf_size);
758 buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
759 mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
760 t_ctx.rows_added[t_ctx.buf_used] = 0;
761
762 ut_a(doc_item);
763 goto loop;
764 }
765
766 /* Parent done scanning, and if finish processing all the docs, exit */
767 if (psort_info->state == FTS_PARENT_COMPLETE) {
768 if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
769 goto exit;
770 } else if (retried > 10000) {
771 ut_ad(!doc_item);
772 /* retied too many times and cannot get new record */
773 ib_logf(IB_LOG_LEVEL_ERROR,
774 "InnoDB: FTS parallel sort processed "
775 "%lu records, the sort queue has "
776 "%lu records. But sort cannot get "
777 "the next records", num_doc_processed,
778 UT_LIST_GET_LEN(
779 psort_info->fts_doc_list));
780 goto exit;
781 }
782 } else if (psort_info->state == FTS_PARENT_EXITING) {
783 /* Parent abort */
784 goto func_exit;
785 }
786
787 if (doc_item == NULL) {
788 os_thread_yield();
789 }
790
791 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
792
793 if (doc_item != NULL) {
794 if (last_doc_id != doc_item->doc_id) {
795 t_ctx.init_pos = 0;
796 }
797
798 retried = 0;
799 } else if (psort_info->state == FTS_PARENT_COMPLETE) {
800 retried++;
801 }
802
803 goto loop;
804
805 exit:
806 /* Do a final sort of the last (or latest) batch of records
807 in block memory. Flush them to temp file if records cannot
808 be hold in one block memory */
809 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
810 if (t_ctx.rows_added[i]) {
811 row_merge_buf_sort(buf[i], NULL);
812 row_merge_buf_write(
813 buf[i], merge_file[i], block[i]);
814
815 /* Write to temp file, only if records have
816 been flushed to temp file before (offset > 0):
817 The pseudo code for sort is following:
818
819 while (there are rows) {
820 tokenize rows, put result in block[]
821 if (block[] runs out) {
822 sort rows;
823 write to temp file with
824 row_merge_write();
825 offset++;
826 }
827 }
828
829 # write out the last batch
830 if (offset > 0) {
831 row_merge_write();
832 offset++;
833 } else {
834 # no need to write anything
835 offset stay as 0
836 }
837
838 so if merge_file[i]->offset is 0 when we come to
839 here as the last batch, this means rows have
840 never flush to temp file, it can be held all in
841 memory */
842 if (merge_file[i]->offset != 0) {
843 if (!row_merge_write(merge_file[i]->fd,
844 merge_file[i]->offset++,
845 block[i])) {
846 error = DB_TEMP_FILE_WRITE_FAILURE;
847 goto func_exit;
848 }
849
850 UNIV_MEM_INVALID(block[i][0],
851 srv_sort_buf_size);
852 }
853
854 buf[i] = row_merge_buf_empty(buf[i]);
855 t_ctx.rows_added[i] = 0;
856 }
857 }
858
859 if (fts_enable_diag_print) {
860 DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n");
861 }
862
863 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
864 if (!merge_file[i]->offset) {
865 continue;
866 }
867
868 tmpfd[i] = row_merge_file_create_low(path);
869 if (tmpfd[i] < 0) {
870 error = DB_OUT_OF_MEMORY;
871 goto func_exit;
872 }
873
874 error = row_merge_sort(psort_info->psort_common->trx,
875 psort_info->psort_common->dup,
876 merge_file[i], block[i], &tmpfd[i]);
877 if (error != DB_SUCCESS) {
878 close(tmpfd[i]);
879 goto func_exit;
880 }
881
882 total_rec += merge_file[i]->n_rec;
883 close(tmpfd[i]);
884 }
885
886 func_exit:
887 if (fts_enable_diag_print) {
888 DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
889 }
890
891 mem_heap_free(blob_heap);
892
893 mutex_enter(&psort_info->mutex);
894 psort_info->error = error;
895 mutex_exit(&psort_info->mutex);
896
897 if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
898 /* child can exit either with error or told by parent. */
899 ut_ad(error != DB_SUCCESS
900 || psort_info->state == FTS_PARENT_EXITING);
901 }
902
903 /* Free fts doc list in case of error. */
904 do {
905 row_merge_fts_get_next_doc_item(psort_info, &doc_item);
906 } while (doc_item != NULL);
907
908 psort_info->child_status = FTS_CHILD_COMPLETE;
909 os_event_set(psort_info->psort_common->sort_event);
910 psort_info->child_status = FTS_CHILD_EXITING;
911
912 #ifdef __WIN__
913 CloseHandle(psort_info->thread_hdl);
914 #endif /*__WIN__ */
915
916 os_thread_exit(NULL);
917
918 OS_THREAD_DUMMY_RETURN;
919 }
920
921 /*********************************************************************//**
922 Start the parallel tokenization and parallel merge sort */
923 UNIV_INTERN
924 void
row_fts_start_psort(fts_psort_t * psort_info)925 row_fts_start_psort(
926 /*================*/
927 fts_psort_t* psort_info) /*!< parallel sort structure */
928 {
929 ulint i = 0;
930 os_thread_id_t thd_id;
931
932 for (i = 0; i < fts_sort_pll_degree; i++) {
933 psort_info[i].psort_id = i;
934 psort_info[i].thread_hdl = os_thread_create(
935 fts_parallel_tokenization,
936 (void*) &psort_info[i], &thd_id);
937 }
938 }
939
940 /*********************************************************************//**
941 Function performs the merge and insertion of the sorted records.
942 @return OS_THREAD_DUMMY_RETURN */
943 UNIV_INTERN
944 os_thread_ret_t
fts_parallel_merge(void * arg)945 fts_parallel_merge(
946 /*===============*/
947 void* arg) /*!< in: parallel merge info */
948 {
949 fts_psort_t* psort_info = (fts_psort_t*) arg;
950 ulint id;
951
952 ut_ad(psort_info);
953
954 id = psort_info->psort_id;
955
956 row_fts_merge_insert(psort_info->psort_common->dup->index,
957 psort_info->psort_common->new_table,
958 psort_info->psort_common->all_info, id);
959
960 psort_info->child_status = FTS_CHILD_COMPLETE;
961 os_event_set(psort_info->psort_common->merge_event);
962 psort_info->child_status = FTS_CHILD_EXITING;
963
964 #ifdef __WIN__
965 CloseHandle(psort_info->thread_hdl);
966 #endif /*__WIN__ */
967
968 os_thread_exit(NULL, false);
969
970 OS_THREAD_DUMMY_RETURN;
971 }
972
973 /*********************************************************************//**
974 Kick off the parallel merge and insert thread */
975 UNIV_INTERN
976 void
row_fts_start_parallel_merge(fts_psort_t * merge_info)977 row_fts_start_parallel_merge(
978 /*=========================*/
979 fts_psort_t* merge_info) /*!< in: parallel sort info */
980 {
981 int i = 0;
982 os_thread_id_t thd_id;
983
984 /* Kick off merge/insert threads */
985 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
986 merge_info[i].psort_id = i;
987 merge_info[i].child_status = 0;
988
989 merge_info[i].thread_hdl = os_thread_create(
990 fts_parallel_merge, (void*) &merge_info[i], &thd_id);
991 }
992 }
993
994 /********************************************************************//**
995 Insert processed FTS data to auxillary index tables.
996 @return DB_SUCCESS if insertion runs fine */
997 static MY_ATTRIBUTE((nonnull))
998 dberr_t
row_merge_write_fts_word(trx_t * trx,que_t ** ins_graph,fts_tokenizer_word_t * word,fts_table_t * fts_table,CHARSET_INFO * charset)999 row_merge_write_fts_word(
1000 /*=====================*/
1001 trx_t* trx, /*!< in: transaction */
1002 que_t** ins_graph, /*!< in: Insert query graphs */
1003 fts_tokenizer_word_t* word, /*!< in: sorted and tokenized
1004 word */
1005 fts_table_t* fts_table, /*!< in: fts aux table instance */
1006 CHARSET_INFO* charset) /*!< in: charset */
1007 {
1008 ulint selected;
1009 dberr_t ret = DB_SUCCESS;
1010
1011 selected = fts_select_index(
1012 charset, word->text.f_str, word->text.f_len);
1013 fts_table->suffix = fts_get_suffix(selected);
1014
1015 /* Pop out each fts_node in word->nodes write them to auxiliary table */
1016 while (ib_vector_size(word->nodes) > 0) {
1017 dberr_t error;
1018 fts_node_t* fts_node;
1019
1020 fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes));
1021
1022 error = fts_write_node(
1023 trx, &ins_graph[selected], fts_table, &word->text,
1024 fts_node);
1025
1026 if (error != DB_SUCCESS) {
1027 fprintf(stderr, "InnoDB: failed to write"
1028 " word %s to FTS auxiliary index"
1029 " table, error (%s) \n",
1030 word->text.f_str, ut_strerr(error));
1031 ret = error;
1032 }
1033
1034 ut_free(fts_node->ilist);
1035 fts_node->ilist = NULL;
1036 }
1037
1038 return(ret);
1039 }
1040
/*********************************************************************//**
Read sorted FTS data files and insert data tuples to auxillary tables.
Consumes one sorted tuple (word, doc id, position) at a time, batching
positions per (word, doc id) into fts_node structures hung off "word".
When the incoming word differs from the last one, the completed word is
flushed to the auxiliary tables via row_merge_write_fts_word(). Passing
dtuple == NULL signals end of input and flushes the final word.
@return DB_SUCCESS or error number */
UNIV_INTERN
void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t* word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get fts_node for the FTS auxillary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	/* Start a fresh node if there is none yet, or if the current
	node's position list has grown past the size limit. */
	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		/* Flush any pending positions for the final word and
		write it out; note the write error is not propagated
		here (function returns void). */
		if (fts_node && ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx->trx,
						 ins_ctx->ins_graph, word,
						 &ins_ctx->fts_table,
						 ins_ctx->charset);

		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	/* Very first tuple seen: initialize the current word. */
	if (!word->text.f_str) {
		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* Getting a new word, flush the last position info
		for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx->trx, ins_ctx->ins_graph,
					 word, &ins_ctx->fts_table,
					 ins_ctx->charset);

		/* Copy the new word */
		fts_utf8_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	/* Doc ID is stored in full width normally, but only in 4 bytes
	when the optimized Doc ID size is in effect. */
	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush position info to the
	fts_node and initiate a new position vector */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* record the current Doc ID */
	*in_doc_id = doc_id;
}
1175
1176 /*********************************************************************//**
1177 Propagate a newly added record up one level in the selection tree
1178 @return parent where this value propagated to */
1179 static
1180 int
row_fts_sel_tree_propagate(int propogated,int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1181 row_fts_sel_tree_propagate(
1182 /*=======================*/
1183 int propogated, /*<! in: tree node propagated */
1184 int* sel_tree, /*<! in: selection tree */
1185 const mrec_t** mrec, /*<! in: sort record */
1186 ulint** offsets, /*<! in: record offsets */
1187 dict_index_t* index) /*<! in/out: FTS index */
1188 {
1189 ulint parent;
1190 int child_left;
1191 int child_right;
1192 int selected;
1193
1194 /* Find which parent this value will be propagated to */
1195 parent = (propogated - 1) / 2;
1196
1197 /* Find out which value is smaller, and to propagate */
1198 child_left = sel_tree[parent * 2 + 1];
1199 child_right = sel_tree[parent * 2 + 2];
1200
1201 if (child_left == -1 || mrec[child_left] == NULL) {
1202 if (child_right == -1
1203 || mrec[child_right] == NULL) {
1204 selected = -1;
1205 } else {
1206 selected = child_right ;
1207 }
1208 } else if (child_right == -1
1209 || mrec[child_right] == NULL) {
1210 selected = child_left;
1211 } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
1212 offsets[child_left],
1213 offsets[child_right],
1214 index, NULL) < 0) {
1215 selected = child_left;
1216 } else {
1217 selected = child_right;
1218 }
1219
1220 sel_tree[parent] = selected;
1221
1222 return(static_cast<int>(parent));
1223 }
1224
1225 /*********************************************************************//**
1226 Readjust selection tree after popping the root and read a new value
1227 @return the new root */
1228 static
1229 int
row_fts_sel_tree_update(int * sel_tree,ulint propagated,ulint height,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1230 row_fts_sel_tree_update(
1231 /*====================*/
1232 int* sel_tree, /*<! in/out: selection tree */
1233 ulint propagated, /*<! in: node to propagate up */
1234 ulint height, /*<! in: tree height */
1235 const mrec_t** mrec, /*<! in: sort record */
1236 ulint** offsets, /*<! in: record offsets */
1237 dict_index_t* index) /*<! in: index dictionary */
1238 {
1239 ulint i;
1240
1241 for (i = 1; i <= height; i++) {
1242 propagated = static_cast<ulint>(row_fts_sel_tree_propagate(
1243 static_cast<int>(propagated), sel_tree, mrec, offsets, index));
1244 }
1245
1246 return(sel_tree[0]);
1247 }
1248
1249 /*********************************************************************//**
1250 Build selection tree at a specified level */
1251 static
1252 void
row_fts_build_sel_tree_level(int * sel_tree,ulint level,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1253 row_fts_build_sel_tree_level(
1254 /*=========================*/
1255 int* sel_tree, /*<! in/out: selection tree */
1256 ulint level, /*<! in: selection tree level */
1257 const mrec_t** mrec, /*<! in: sort record */
1258 ulint** offsets, /*<! in: record offsets */
1259 dict_index_t* index) /*<! in: index dictionary */
1260 {
1261 ulint start;
1262 int child_left;
1263 int child_right;
1264 ulint i;
1265 ulint num_item;
1266
1267 start = static_cast<ulint>((1 << level) - 1);
1268 num_item = static_cast<ulint>(1 << level);
1269
1270 for (i = 0; i < num_item; i++) {
1271 child_left = sel_tree[(start + i) * 2 + 1];
1272 child_right = sel_tree[(start + i) * 2 + 2];
1273
1274 if (child_left == -1) {
1275 if (child_right == -1) {
1276 sel_tree[start + i] = -1;
1277 } else {
1278 sel_tree[start + i] = child_right;
1279 }
1280 continue;
1281 } else if (child_right == -1) {
1282 sel_tree[start + i] = child_left;
1283 continue;
1284 }
1285
1286 /* Deal with NULL child conditions */
1287 if (!mrec[child_left]) {
1288 if (!mrec[child_right]) {
1289 sel_tree[start + i] = -1;
1290 } else {
1291 sel_tree[start + i] = child_right;
1292 }
1293 continue;
1294 } else if (!mrec[child_right]) {
1295 sel_tree[start + i] = child_left;
1296 continue;
1297 }
1298
1299 /* Select the smaller one to set parent pointer */
1300 int cmp = cmp_rec_rec_simple(
1301 mrec[child_left], mrec[child_right],
1302 offsets[child_left], offsets[child_right],
1303 index, NULL);
1304
1305 sel_tree[start + i] = cmp < 0 ? child_left : child_right;
1306 }
1307 }
1308
1309 /*********************************************************************//**
1310 Build a selection tree for merge. The selection tree is a binary tree
1311 and should have fts_sort_pll_degree / 2 levels. With root as level 0
1312 @return number of tree levels */
1313 static
1314 ulint
row_fts_build_sel_tree(int * sel_tree,const mrec_t ** mrec,ulint ** offsets,dict_index_t * index)1315 row_fts_build_sel_tree(
1316 /*===================*/
1317 int* sel_tree, /*<! in/out: selection tree */
1318 const mrec_t** mrec, /*<! in: sort record */
1319 ulint** offsets, /*<! in: record offsets */
1320 dict_index_t* index) /*<! in: index dictionary */
1321 {
1322 ulint treelevel = 1;
1323 ulint num = 2;
1324 int i = 0;
1325 ulint start;
1326
1327 /* No need to build selection tree if we only have two merge threads */
1328 if (fts_sort_pll_degree <= 2) {
1329 return(0);
1330 }
1331
1332 while (num < fts_sort_pll_degree) {
1333 num = num << 1;
1334 treelevel++;
1335 }
1336
1337 start = (1 << treelevel) - 1;
1338
1339 for (i = 0; i < (int) fts_sort_pll_degree; i++) {
1340 sel_tree[i + start] = i;
1341 }
1342
1343 for (i = static_cast<int>(treelevel) - 1; i >= 0; i--) {
1344 row_fts_build_sel_tree_level(
1345 sel_tree, static_cast<ulint>(i), mrec, offsets, index);
1346 }
1347
1348 return(treelevel);
1349 }
1350
1351 /*********************************************************************//**
1352 Read sorted file containing index data tuples and insert these data
1353 tuples to the index
1354 @return DB_SUCCESS or error number */
1355 UNIV_INTERN
1356 dberr_t
row_fts_merge_insert(dict_index_t * index,dict_table_t * table,fts_psort_t * psort_info,ulint id)1357 row_fts_merge_insert(
1358 /*=================*/
1359 dict_index_t* index, /*!< in: index */
1360 dict_table_t* table, /*!< in: new table */
1361 fts_psort_t* psort_info, /*!< parallel sort info */
1362 ulint id) /* !< in: which auxiliary table's data
1363 to insert to */
1364 {
1365 const byte** b;
1366 mem_heap_t* tuple_heap;
1367 mem_heap_t* heap;
1368 dberr_t error = DB_SUCCESS;
1369 ulint* foffs;
1370 ulint** offsets;
1371 fts_tokenizer_word_t new_word;
1372 ib_vector_t* positions;
1373 doc_id_t last_doc_id;
1374 ib_alloc_t* heap_alloc;
1375 ulint n_bytes;
1376 ulint i;
1377 mrec_buf_t** buf;
1378 int* fd;
1379 byte** block;
1380 const mrec_t** mrec;
1381 ulint count = 0;
1382 int* sel_tree;
1383 ulint height;
1384 ulint start;
1385 fts_psort_insert_t ins_ctx;
1386 ulint count_diag = 0;
1387
1388 ut_ad(index);
1389 ut_ad(table);
1390
1391 /* We use the insert query graph as the dummy graph
1392 needed in the row module call */
1393
1394 ins_ctx.trx = trx_allocate_for_background();
1395
1396 ins_ctx.trx->op_info = "inserting index entries";
1397
1398 ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;
1399
1400 heap = mem_heap_create(500 + sizeof(mrec_buf_t));
1401
1402 b = (const byte**) mem_heap_alloc(
1403 heap, sizeof (*b) * fts_sort_pll_degree);
1404 foffs = (ulint*) mem_heap_alloc(
1405 heap, sizeof(*foffs) * fts_sort_pll_degree);
1406 offsets = (ulint**) mem_heap_alloc(
1407 heap, sizeof(*offsets) * fts_sort_pll_degree);
1408 buf = (mrec_buf_t**) mem_heap_alloc(
1409 heap, sizeof(*buf) * fts_sort_pll_degree);
1410 fd = (int*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
1411 block = (byte**) mem_heap_alloc(
1412 heap, sizeof(*block) * fts_sort_pll_degree);
1413 mrec = (const mrec_t**) mem_heap_alloc(
1414 heap, sizeof(*mrec) * fts_sort_pll_degree);
1415 sel_tree = (int*) mem_heap_alloc(
1416 heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));
1417
1418 tuple_heap = mem_heap_create(1000);
1419
1420 ins_ctx.charset = fts_index_get_charset(index);
1421 ins_ctx.heap = heap;
1422
1423 for (i = 0; i < fts_sort_pll_degree; i++) {
1424 ulint num;
1425
1426 num = 1 + REC_OFFS_HEADER_SIZE
1427 + dict_index_get_n_fields(index);
1428 offsets[i] = static_cast<ulint*>(mem_heap_zalloc(
1429 heap, num * sizeof *offsets[i]));
1430 offsets[i][0] = num;
1431 offsets[i][1] = dict_index_get_n_fields(index);
1432 block[i] = psort_info[i].merge_block[id];
1433 b[i] = psort_info[i].merge_block[id];
1434 fd[i] = psort_info[i].merge_file[id]->fd;
1435 foffs[i] = 0;
1436
1437 buf[i] = static_cast<unsigned char (*)[16384]>(
1438 mem_heap_alloc(heap, sizeof *buf[i]));
1439 count_diag += (int) psort_info[i].merge_file[id]->n_rec;
1440 }
1441
1442 if (fts_enable_diag_print) {
1443 ut_print_timestamp(stderr);
1444 fprintf(stderr, " InnoDB_FTS: to inserted %lu records\n",
1445 (ulong) count_diag);
1446 }
1447
1448 /* Initialize related variables if creating FTS indexes */
1449 heap_alloc = ib_heap_allocator_create(heap);
1450
1451 memset(&new_word, 0, sizeof(new_word));
1452
1453 new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
1454 positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
1455 last_doc_id = 0;
1456
1457 /* Allocate insert query graphs for FTS auxillary
1458 Index Table, note we have FTS_NUM_AUX_INDEX such index tables */
1459 n_bytes = sizeof(que_t*) * (FTS_NUM_AUX_INDEX + 1);
1460 ins_ctx.ins_graph = static_cast<que_t**>(mem_heap_alloc(heap, n_bytes));
1461 memset(ins_ctx.ins_graph, 0x0, n_bytes);
1462
1463 /* We should set the flags2 with aux_table_name here,
1464 in order to get the correct aux table names. */
1465 index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
1466 DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
1467 index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME;);
1468
1469 ins_ctx.fts_table.type = FTS_INDEX_TABLE;
1470 ins_ctx.fts_table.index_id = index->id;
1471 ins_ctx.fts_table.table_id = table->id;
1472 ins_ctx.fts_table.parent = index->table->name;
1473 ins_ctx.fts_table.table = index->table;
1474
1475 for (i = 0; i < fts_sort_pll_degree; i++) {
1476 if (psort_info[i].merge_file[id]->n_rec == 0) {
1477 /* No Rows to read */
1478 mrec[i] = b[i] = NULL;
1479 } else {
1480 /* Read from temp file only if it has been
1481 written to. Otherwise, block memory holds
1482 all the sorted records */
1483 if (psort_info[i].merge_file[id]->offset > 0
1484 && (!row_merge_read(
1485 fd[i], foffs[i],
1486 (row_merge_block_t*) block[i]))) {
1487 error = DB_CORRUPTION;
1488 goto exit;
1489 }
1490
1491 ROW_MERGE_READ_GET_NEXT(i);
1492 }
1493 }
1494
1495 height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
1496 offsets, index);
1497
1498 start = (1 << height) - 1;
1499
1500 /* Fetch sorted records from sort buffer and insert them into
1501 corresponding FTS index auxiliary tables */
1502 for (;;) {
1503 dtuple_t* dtuple;
1504 ulint n_ext;
1505 int min_rec = 0;
1506
1507 if (fts_sort_pll_degree <= 2) {
1508 while (!mrec[min_rec]) {
1509 min_rec++;
1510
1511 if (min_rec >= (int) fts_sort_pll_degree) {
1512 row_fts_insert_tuple(
1513 &ins_ctx, &new_word,
1514 positions, &last_doc_id,
1515 NULL);
1516
1517 goto exit;
1518 }
1519 }
1520
1521 for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
1522 if (!mrec[i]) {
1523 continue;
1524 }
1525
1526 if (cmp_rec_rec_simple(
1527 mrec[i], mrec[min_rec],
1528 offsets[i], offsets[min_rec],
1529 index, NULL) < 0) {
1530 min_rec = static_cast<int>(i);
1531 }
1532 }
1533 } else {
1534 min_rec = sel_tree[0];
1535
1536 if (min_rec == -1) {
1537 row_fts_insert_tuple(
1538 &ins_ctx, &new_word,
1539 positions, &last_doc_id,
1540 NULL);
1541
1542 goto exit;
1543 }
1544 }
1545
1546 dtuple = row_rec_to_index_entry_low(
1547 mrec[min_rec], index, offsets[min_rec], &n_ext,
1548 tuple_heap);
1549
1550 row_fts_insert_tuple(
1551 &ins_ctx, &new_word, positions,
1552 &last_doc_id, dtuple);
1553
1554
1555 ROW_MERGE_READ_GET_NEXT(min_rec);
1556
1557 if (fts_sort_pll_degree > 2) {
1558 if (!mrec[min_rec]) {
1559 sel_tree[start + min_rec] = -1;
1560 }
1561
1562 row_fts_sel_tree_update(sel_tree, start + min_rec,
1563 height, mrec,
1564 offsets, index);
1565 }
1566
1567 count++;
1568
1569 mem_heap_empty(tuple_heap);
1570 }
1571
1572 exit:
1573 fts_sql_commit(ins_ctx.trx);
1574
1575 ins_ctx.trx->op_info = "";
1576
1577 mem_heap_free(tuple_heap);
1578
1579 for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
1580 if (ins_ctx.ins_graph[i]) {
1581 fts_que_graph_free(ins_ctx.ins_graph[i]);
1582 }
1583 }
1584
1585 trx_free_for_background(ins_ctx.trx);
1586
1587 mem_heap_free(heap);
1588
1589 if (fts_enable_diag_print) {
1590 ut_print_timestamp(stderr);
1591 fprintf(stderr, " InnoDB_FTS: inserted %lu records\n",
1592 (ulong) count);
1593 }
1594
1595 return(error);
1596 }
1597