/*****************************************************************************

Copyright (c) 2010, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2015, 2021, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA

*****************************************************************************/

/**************************************************//**
@file row/row0ftsort.cc
Create Full Text Index with (parallel) merge sort

Created 10/13/2010 Jimmy Yang
*******************************************************/

#include "row0ftsort.h"
#include "dict0dict.h"
#include "row0merge.h"
#include "row0row.h"
#include "btr0cur.h"
#include "fts0plugin.h"
#include "log0crypt.h"

/** Read the next record to buffer N.
@param N index into array of merge info structure */
#define ROW_MERGE_READ_GET_NEXT(N)					\
	do {								\
		b[N] = row_merge_read_rec(				\
			block[N], buf[N], b[N], index,			\
			fd[N], &foffs[N], &mrec[N], offsets[N],		\
			crypt_block[N], space);				\
		if (UNIV_UNLIKELY(!b[N])) {				\
			if (mrec[N]) {					\
				goto exit;				\
			}						\
		}							\
	} while (0)

/** Parallel sort degree */
ulong	fts_sort_pll_degree	= 2;

/*********************************************************************//**
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":

1) Tokenized word,
2) Doc ID (depending on the number of records to sort, it can be a 4-byte
or an 8-byte integer value)
3) Word's position in the original doc.

@see fts_create_one_index_table()

@return dict_index_t structure for the fts sort index */
dict_index_t*
row_merge_create_fts_sort_index(
/*============================*/
	dict_index_t*	index,	/*!< in: Original FTS index
				based on which this sort index
				is created */
	dict_table_t*	table,	/*!< in,out: table that FTS index
				is being created on */
	ibool*		opt_doc_id_size)
				/*!< out: whether to use 4 bytes
				instead of 8 bytes integer to
				store Doc ID during sort */
{
	dict_index_t*	new_index;
	dict_field_t*	field;
	dict_field_t*	idx_field;
	CHARSET_INFO*	charset;

	// FIXME: This name shouldn't be hard coded here.
	new_index = dict_mem_index_create(table, "tmp_fts_idx", DICT_FTS, 3);

	new_index->id = index->id;
	new_index->n_uniq = FTS_NUM_FIELDS_SORT;
	new_index->n_def = FTS_NUM_FIELDS_SORT;
	new_index->cached = TRUE;
	new_index->parser = index->parser;

	idx_field = dict_index_get_nth_field(index, 0);
	charset = fts_index_get_charset(index);

	/* The first field is on the Tokenized Word */
	field = dict_index_get_nth_field(new_index, 0);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->prtype = idx_field->col->prtype | DATA_NOT_NULL;
	field->col->mtype = charset == &my_charset_latin1
		? DATA_VARCHAR : DATA_VARMYSQL;
	field->col->mbminlen = idx_field->col->mbminlen;
	field->col->mbmaxlen = idx_field->col->mbmaxlen;
	field->col->len = static_cast<uint16_t>(
		HA_FT_MAXCHARLEN * field->col->mbmaxlen);

	field->fixed_len = 0;

	/* Doc ID */
	field = dict_index_get_nth_field(new_index, 1);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	*opt_doc_id_size = FALSE;

	/* Check whether we can use a 4-byte instead of an 8-byte integer
	field to hold the Doc ID, and thus reduce the overall sort size */
	if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
		/* If the Doc ID column is being added by this create
		index, then just check the number of rows in the table */
		if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	} else {
		doc_id_t	max_doc_id;

		/* If the Doc ID column is supplied by the user, then
		check the maximum Doc ID in the table */
		max_doc_id = fts_get_max_doc_id((dict_table_t*) table);

		if (max_doc_id && max_doc_id < MAX_DOC_ID_OPT_VAL) {
			*opt_doc_id_size = TRUE;
		}
	}
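
	/* Worked illustration (editor's example, assuming the table's
	row count stays below MAX_DOC_ID_OPT_VAL): if the ALTER adds the
	Doc ID column to a table of about a million rows, every Doc ID
	fits in 32 bits, so each (word, Doc ID, position) sort tuple
	carries a 4-byte Doc ID instead of an 8-byte one, saving four
	bytes per token occurrence written to the sort buffers. */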

	if (*opt_doc_id_size) {
		field->col->len = sizeof(ib_uint32_t);
		field->fixed_len = sizeof(ib_uint32_t);
	} else {
		field->col->len = FTS_DOC_ID_LEN;
		field->fixed_len = FTS_DOC_ID_LEN;
	}

	field->col->prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;

	/* The third field is on the word's position in the original doc */
	field = dict_index_get_nth_field(new_index, 2);
	field->name = NULL;
	field->prefix_len = 0;
	field->col = static_cast<dict_col_t*>(
		mem_heap_zalloc(new_index->heap, sizeof(dict_col_t)));
	field->col->mtype = DATA_INT;
	field->col->len = 4;
	field->fixed_len = 4;
	field->col->prtype = DATA_NOT_NULL;

	return(new_index);
}
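
/* Usage sketch (illustrative only; `index` and `table` stand for an
existing FTS index and the table it is being built on):

	ibool		use_32bit_doc_id;
	dict_index_t*	sort_index = row_merge_create_fts_sort_index(
		index, table, &use_32bit_doc_id);

The returned index describes the (word, Doc ID, position) tuples the
tokenizers feed to the parallel merge sort; use_32bit_doc_id tells them
to emit 4-byte Doc IDs when all Doc IDs are known to fit. */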
163
164 /** Initialize FTS parallel sort structures.
165 @param[in] trx transaction
166 @param[in,out] dup descriptor of FTS index being created
167 @param[in,out] new_table table where indexes are created
168 @param[in] opt_doc_id_size whether to use 4 bytes instead of 8 bytes
169 integer to store Doc ID during sort
170 @param[in] old_zip_size page size of the old table during alter
171 @param[out] psort parallel sort info to be instantiated
172 @param[out] merge parallel merge info to be instantiated
173 @return true if all successful */
174 bool
row_fts_psort_info_init(trx_t * trx,row_merge_dup_t * dup,dict_table_t * new_table,bool opt_doc_id_size,ulint old_zip_size,fts_psort_t ** psort,fts_psort_t ** merge)175 row_fts_psort_info_init(
176 trx_t* trx,
177 row_merge_dup_t*dup,
178 dict_table_t* new_table,
179 bool opt_doc_id_size,
180 ulint old_zip_size,
181 fts_psort_t** psort,
182 fts_psort_t** merge)
183 {
184 ulint i;
185 ulint j;
186 fts_psort_common_t* common_info = NULL;
187 fts_psort_t* psort_info = NULL;
188 fts_psort_t* merge_info = NULL;
189 ulint block_size;
190 ibool ret = TRUE;
191 bool encrypted = false;
192 ut_ad(ut_is_2pow(old_zip_size));
193
194 block_size = 3 * srv_sort_buf_size;
195
196 *psort = psort_info = static_cast<fts_psort_t*>(ut_zalloc_nokey(
197 fts_sort_pll_degree * sizeof *psort_info));
198
199 if (!psort_info) {
200 ut_free(dup);
201 return(FALSE);
202 }
203
204 /* Common Info for all sort threads */
205 common_info = static_cast<fts_psort_common_t*>(
206 ut_malloc_nokey(sizeof *common_info));
207
208 if (!common_info) {
209 ut_free(dup);
210 ut_free(psort_info);
211 return(FALSE);
212 }
213
214 common_info->dup = dup;
215 common_info->new_table = new_table;
216 common_info->old_zip_size = old_zip_size;
217 common_info->trx = trx;
218 common_info->all_info = psort_info;
219 common_info->sort_event = os_event_create(0);
220 common_info->opt_doc_id_size = opt_doc_id_size;
221
222 if (log_tmp_is_encrypted()) {
223 encrypted = true;
224 }
225
226 ut_ad(trx->mysql_thd != NULL);
227 const char* path = thd_innodb_tmpdir(trx->mysql_thd);
228 /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
229 each parallel sort thread. Each "sort bucket" holds records for
230 a particular "FTS index partition" */
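	/* For example, with the default fts_sort_pll_degree of 2
	(declared above), the loop below sets up
	2 * FTS_NUM_AUX_INDEX merge files and sort buffers: each
	tokenizer thread keeps one bucket per auxiliary index
	partition, so a token is always routed to the bucket of the
	partition that will eventually store it. */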
	for (j = 0; j < fts_sort_pll_degree; j++) {

		UT_LIST_INIT(
			psort_info[j].fts_doc_list, &fts_doc_item_t::doc_list);

		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {

			psort_info[j].merge_file[i] =
				static_cast<merge_file_t*>(
				ut_zalloc_nokey(sizeof(merge_file_t)));

			if (!psort_info[j].merge_file[i]) {
				ret = FALSE;
				goto func_exit;
			}

			psort_info[j].merge_buf[i] = row_merge_buf_create(
				dup->index);

			if (row_merge_file_create(psort_info[j].merge_file[i],
						  path) == OS_FILE_CLOSED) {
				goto func_exit;
			}

			/* Need to align memory for O_DIRECT write */
			psort_info[j].merge_block[i] =
				static_cast<row_merge_block_t*>(
					aligned_malloc(block_size, 1024));

			if (!psort_info[j].merge_block[i]) {
				ret = FALSE;
				goto func_exit;
			}

			/* If the tablespace is encrypted, allocate an
			additional buffer for encryption/decryption. */
			if (encrypted) {
				/* Need to align memory for O_DIRECT write */
				psort_info[j].crypt_block[i] =
					static_cast<row_merge_block_t*>(
						aligned_malloc(block_size,
							       1024));

				if (!psort_info[j].crypt_block[i]) {
					ret = FALSE;
					goto func_exit;
				}
			} else {
				psort_info[j].crypt_block[i] = NULL;
			}
		}

		psort_info[j].child_status = 0;
		psort_info[j].state = 0;
		psort_info[j].psort_common = common_info;
		psort_info[j].error = DB_SUCCESS;
		psort_info[j].memory_used = 0;
		mutex_create(LATCH_ID_FTS_PLL_TOKENIZE, &psort_info[j].mutex);
	}

	/* Initialize the merge_info structures for the parallel merge and
	insert into the auxiliary FTS tables (FTS_INDEX_TABLE) */
	*merge = merge_info = static_cast<fts_psort_t*>(
		ut_malloc_nokey(FTS_NUM_AUX_INDEX * sizeof *merge_info));

	for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {

		merge_info[j].child_status = 0;
		merge_info[j].state = 0;
		merge_info[j].psort_common = common_info;
	}

func_exit:
	if (!ret) {
		row_fts_psort_info_destroy(psort_info, merge_info);
	}

	return(ret);
}
/*********************************************************************//**
Clean up and deallocate FTS parallel sort structures, and close the
merge sort files */
void
row_fts_psort_info_destroy(
/*=======================*/
	fts_psort_t*	psort_info,	/*!< parallel sort info */
	fts_psort_t*	merge_info)	/*!< parallel merge info */
{
	ulint	i;
	ulint	j;

	if (psort_info) {
		for (j = 0; j < fts_sort_pll_degree; j++) {
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				if (psort_info[j].merge_file[i]) {
					row_merge_file_destroy(
						psort_info[j].merge_file[i]);
				}

				aligned_free(psort_info[j].merge_block[i]);
				ut_free(psort_info[j].merge_file[i]);
				aligned_free(psort_info[j].crypt_block[i]);
			}

			mutex_free(&psort_info[j].mutex);
		}

		os_event_destroy(merge_info[0].psort_common->sort_event);
		ut_free(merge_info[0].psort_common->dup);
		ut_free(merge_info[0].psort_common);
		ut_free(psort_info);
	}

	ut_free(merge_info);
}
/*********************************************************************//**
Free up merge buffers when merge sort is done */
void
row_fts_free_pll_merge_buf(
/*=======================*/
	fts_psort_t*	psort_info)	/*!< in: parallel sort info */
{
	ulint	j;
	ulint	i;

	if (!psort_info) {
		return;
	}

	for (j = 0; j < fts_sort_pll_degree; j++) {
		for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
			row_merge_buf_free(psort_info[j].merge_buf[i]);
		}
	}

	return;
}

/*********************************************************************//**
FTS plugin parser 'mysql_add_word' callback function for row merge.
Refer to 'st_mysql_ftparser_param' for more detail.
@return always returns 0 */
static
int
row_merge_fts_doc_add_word_for_parser(
/*==================================*/
	MYSQL_FTPARSER_PARAM*	param,		/* in: parser parameter */
	const char*		word,		/* in: token word */
	int			word_len,	/* in: word len */
	MYSQL_FTPARSER_BOOLEAN_INFO*	boolean_info)	/* in: boolean info */
{
	fts_string_t		str;
	fts_tokenize_ctx_t*	t_ctx;
	row_fts_token_t*	fts_token;
	byte*			ptr;

	ut_ad(param);
	ut_ad(param->mysql_ftparam);
	ut_ad(word);
	ut_ad(boolean_info);

	t_ctx = static_cast<fts_tokenize_ctx_t*>(param->mysql_ftparam);
	ut_ad(t_ctx);

	str.f_str = (byte*)(word);
	str.f_len = ulint(word_len);
	str.f_n_char = fts_get_token_size(
		(CHARSET_INFO*)param->cs, word, ulint(word_len));

	/* JAN: TODO: MySQL 5.7 FTS
	ut_ad(boolean_info->position >= 0);
	*/

	ptr = static_cast<byte*>(ut_malloc_nokey(sizeof(row_fts_token_t)
			+ sizeof(fts_string_t) + str.f_len));
	fts_token = reinterpret_cast<row_fts_token_t*>(ptr);
	fts_token->text = reinterpret_cast<fts_string_t*>(
		ptr + sizeof(row_fts_token_t));
	fts_token->text->f_str = static_cast<byte*>(
		ptr + sizeof(row_fts_token_t) + sizeof(fts_string_t));

	fts_token->text->f_len = str.f_len;
	fts_token->text->f_n_char = str.f_n_char;
	memcpy(fts_token->text->f_str, str.f_str, str.f_len);

	/* JAN: TODO: MySQL 5.7 FTS
	fts_token->position = boolean_info->position;
	*/

	/* Add the token to the list */
	UT_LIST_ADD_LAST(t_ctx->fts_token_list, fts_token);

	return(0);
}

/*********************************************************************//**
Tokenize by fts plugin parser */
static
void
row_merge_fts_doc_tokenize_by_parser(
/*=================================*/
	fts_doc_t*		doc,	/* in: doc to tokenize */
	st_mysql_ftparser*	parser,	/* in: plugin parser instance */
	fts_tokenize_ctx_t*	t_ctx)	/* in/out: tokenize ctx instance */
{
	MYSQL_FTPARSER_PARAM	param;

	ut_a(parser);

	/* Set the parameters for param */
	param.mysql_parse = fts_tokenize_document_internal;
	param.mysql_add_word = row_merge_fts_doc_add_word_for_parser;
	param.mysql_ftparam = t_ctx;
	param.cs = doc->charset;
	param.doc = reinterpret_cast<char*>(doc->text.f_str);
	param.length = static_cast<int>(doc->text.f_len);
	param.mode = MYSQL_FTPARSER_SIMPLE_MODE;

	PARSER_INIT(parser, &param);
	/* We assume parse returns successfully here. */
	parser->parse(&param);
	PARSER_DEINIT(parser, &param);
}

/*********************************************************************//**
Tokenize incoming text data and add to the sort buffer.
@see row_merge_buf_encode()
@return TRUE if the record passed, FALSE if out of space */
static
ibool
row_merge_fts_doc_tokenize(
/*=======================*/
	row_merge_buf_t**	sort_buf,	/*!< in/out: sort buffer */
	doc_id_t		doc_id,		/*!< in: Doc ID */
	fts_doc_t*		doc,		/*!< in: Doc to be tokenized */
	merge_file_t**		merge_file,	/*!< in/out: merge file */
	ibool			opt_doc_id_size,/*!< in: whether to use 4 bytes
						instead of 8 bytes integer to
						store Doc ID during sort*/
	fts_tokenize_ctx_t*	t_ctx)		/*!< in/out: tokenize context */
{
	ulint		inc = 0;
	fts_string_t	str;
	ulint		len;
	row_merge_buf_t* buf;
	dfield_t*	field;
	fts_string_t	t_str;
	ibool		buf_full = FALSE;
	byte		str_buf[FTS_MAX_WORD_LEN + 1];
	ulint		data_size[FTS_NUM_AUX_INDEX];
	ulint		n_tuple[FTS_NUM_AUX_INDEX];
	st_mysql_ftparser*	parser;

	t_str.f_n_char = 0;
	t_ctx->buf_used = 0;

	memset(n_tuple, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));
	memset(data_size, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	parser = sort_buf[0]->index->parser;

	/* Tokenize the data and add each word string, its corresponding
	doc id and position to the sort buffer */
	while (t_ctx->processed_len < doc->text.f_len) {
		ulint		idx = 0;
		ulint		cur_len;
		doc_id_t	write_doc_id;
		row_fts_token_t* fts_token = NULL;

		if (parser != NULL) {
			if (t_ctx->processed_len == 0) {
				UT_LIST_INIT(t_ctx->fts_token_list, &row_fts_token_t::token_list);

				/* Parse the whole doc and cache the tokens */
				row_merge_fts_doc_tokenize_by_parser(doc,
					parser, t_ctx);

				/* Just indicate that we have parsed all
				the words */
				t_ctx->processed_len += 1;
			}

			/* Then get a token */
			fts_token = UT_LIST_GET_FIRST(t_ctx->fts_token_list);
			if (fts_token) {
				str.f_len = fts_token->text->f_len;
				str.f_n_char = fts_token->text->f_n_char;
				str.f_str = fts_token->text->f_str;
			} else {
				ut_ad(UT_LIST_GET_LEN(t_ctx->fts_token_list) == 0);
				/* Reached the end of the list */
				t_ctx->processed_len = doc->text.f_len;
				break;
			}
		} else {
			inc = innobase_mysql_fts_get_token(
				doc->charset,
				doc->text.f_str + t_ctx->processed_len,
				doc->text.f_str + doc->text.f_len, &str);

			ut_a(inc > 0);
		}

		/* Ignore a token whose character count is less than
		"fts_min_token_size" or more than "fts_max_token_size" */
		if (!fts_check_token(&str, NULL, NULL)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		t_str.f_len = innobase_fts_casedn_str(
			doc->charset, (char*) str.f_str, str.f_len,
			(char*) &str_buf, FTS_MAX_WORD_LEN + 1);

		t_str.f_str = (byte*) &str_buf;

		/* if "cached_stopword" is defined, ignore words in the
		stopword list */
		if (!fts_check_token(&str, t_ctx->cached_stopword,
				     doc->charset)) {
			if (parser != NULL) {
				UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
				ut_free(fts_token);
			} else {
				t_ctx->processed_len += inc;
			}

			continue;
		}

		/* There are FTS_NUM_AUX_INDEX auxiliary tables; find
		out which sort buffer to put this word record in */
		t_ctx->buf_used = fts_select_index(
			doc->charset, t_str.f_str, t_str.f_len);

		buf = sort_buf[t_ctx->buf_used];

		ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
		idx = t_ctx->buf_used;

		mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];

		field = mtuple->fields = static_cast<dfield_t*>(
			mem_heap_alloc(buf->heap,
				       FTS_NUM_FIELDS_SORT * sizeof *field));

		/* The first field is the tokenized word */
		dfield_set_data(field, t_str.f_str, t_str.f_len);
		len = dfield_get_len(field);

		dict_col_copy_type(dict_index_get_nth_col(buf->index, 0), &field->type);
		field->type.prtype |= DATA_NOT_NULL;
		ut_ad(len <= field->type.len);

		/* For the temporary file, row_merge_buf_encode() uses
		1 byte for representing the number of extra_size bytes.
		This number will always be 1, because for this 3-field index
		consisting of one variable-size column, extra_size will always
		be 1 or 2, which can be encoded in one byte.

		The extra_size is 1 byte if the length of the
		variable-length column is less than 128 bytes or the
		maximum length is less than 256 bytes. */

		/* One variable-length column: a word whose length is less
		than fts_max_token_size. Add the extra_size byte(s) plus the
		byte that counts them.

		Since the maximum FTS token length can now exceed 255 bytes,
		the length itself may need a second byte: lengths of 1 to
		128 bytes fit in one byte; anything larger needs two. */
		if (len < 128 || field->type.len < 256) {
			/* Extra size is one byte. */
			cur_len = 2 + len;
		} else {
			/* Extra size is two bytes. */
			cur_len = 3 + len;
		}
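
		/* Worked example: for a 5-byte token such as "hello",
		len = 5 < 128, so cur_len = 2 + 5 = 7 bytes are
		budgeted: one byte counting the extra_size bytes, one
		byte of extra_size, and the 5 data bytes. A 200-byte
		token on an index whose maximum field length is >= 256
		takes the 3 + len branch instead. */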

		dfield_dup(field, buf->heap);
		field++;

		/* The second field is the Doc ID */

		ib_uint32_t	doc_id_32_bit;

		if (!opt_doc_id_size) {
			fts_write_doc_id((byte*) &write_doc_id, doc_id);

			dfield_set_data(
				field, &write_doc_id, sizeof(write_doc_id));
		} else {
			mach_write_to_4(
				(byte*) &doc_id_32_bit, (ib_uint32_t) doc_id);

			dfield_set_data(
				field, &doc_id_32_bit, sizeof(doc_id_32_bit));
		}

		len = field->len;
		ut_ad(len == FTS_DOC_ID_LEN || len == sizeof(ib_uint32_t));

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL | DATA_BINARY_TYPE;
		field->type.len = static_cast<uint16_t>(field->len);
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;

		cur_len += len;
		dfield_dup(field, buf->heap);

		++field;

		/* The third field is the position.
		MySQL 5.7 changed the fulltext parser plugin interface
		by adding MYSQL_FTPARSER_BOOLEAN_INFO::position.
		Below we assume that the field is always 0. */
		ulint	pos = t_ctx->init_pos;
		byte	position[4];
		if (parser == NULL) {
			pos += t_ctx->processed_len + inc - str.f_len;
		}
		len = 4;
		mach_write_to_4(position, pos);
		dfield_set_data(field, &position, len);

		field->type.mtype = DATA_INT;
		field->type.prtype = DATA_NOT_NULL;
		field->type.len = 4;
		field->type.mbminlen = 0;
		field->type.mbmaxlen = 0;
		cur_len += len;
		dfield_dup(field, buf->heap);

		/* Reserve one byte for the end marker of row_merge_block_t */
		if (buf->total_size + data_size[idx] + cur_len
		    >= srv_sort_buf_size - 1) {

			buf_full = TRUE;
			break;
		}

		/* Increment the number of tuples */
		n_tuple[idx]++;
		if (parser != NULL) {
			UT_LIST_REMOVE(t_ctx->fts_token_list, fts_token);
			ut_free(fts_token);
		} else {
			t_ctx->processed_len += inc;
		}
		data_size[idx] += cur_len;
	}

	/* Update the data length and the number of new word tuples
	added in this round of tokenization */
	for (ulint i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		/* The computation of total_size below assumes that no
		delete-mark flags will be stored and that all fields
		are NOT NULL and fixed-length. */

		sort_buf[i]->total_size += data_size[i];

		sort_buf[i]->n_tuples += n_tuple[i];

		merge_file[i]->n_rec += n_tuple[i];
		t_ctx->rows_added[i] += n_tuple[i];
	}

	if (!buf_full) {
		/* we pad one byte between the text across two fields */
		t_ctx->init_pos += doc->text.f_len + 1;
	}

	return(!buf_full);
}

/*********************************************************************//**
Get the next doc item from fts_doc_list */
UNIV_INLINE
void
row_merge_fts_get_next_doc_item(
/*============================*/
	fts_psort_t*		psort_info,	/*!< in: psort_info */
	fts_doc_item_t**	doc_item)	/*!< in/out: doc item */
{
	if (*doc_item != NULL) {
		ut_free(*doc_item);
	}

	mutex_enter(&psort_info->mutex);

	*doc_item = UT_LIST_GET_FIRST(psort_info->fts_doc_list);
	if (*doc_item != NULL) {
		UT_LIST_REMOVE(psort_info->fts_doc_list, *doc_item);

		ut_ad(psort_info->memory_used >= sizeof(fts_doc_item_t)
		      + (*doc_item)->field->len);
		psort_info->memory_used -= sizeof(fts_doc_item_t)
			+ (*doc_item)->field->len;
	}

	mutex_exit(&psort_info->mutex);
}

/*********************************************************************//**
Function performs parallel tokenization of the incoming doc strings.
It also performs the initial in-memory sort of the parsed records.
*/
static
void fts_parallel_tokenization(
/*======================*/
	void*		arg)	/*!< in: psort_info for the thread */
{
	fts_psort_t*		psort_info = (fts_psort_t*) arg;
	ulint			i;
	fts_doc_item_t*		doc_item = NULL;
	row_merge_buf_t**	buf;
	ibool			processed = FALSE;
	merge_file_t**		merge_file;
	row_merge_block_t**	block;
	row_merge_block_t**	crypt_block;
	pfs_os_file_t		tmpfd[FTS_NUM_AUX_INDEX];
	ulint			mycount[FTS_NUM_AUX_INDEX];
	ulint			num_doc_processed = 0;
	doc_id_t		last_doc_id = 0;
	mem_heap_t*		blob_heap = NULL;
	fts_doc_t		doc;
	dict_table_t*		table = psort_info->psort_common->new_table;
	fts_tokenize_ctx_t	t_ctx;
	ulint			retried = 0;
	dberr_t			error = DB_SUCCESS;

	ut_ad(psort_info->psort_common->trx->mysql_thd != NULL);

	const char*		path = thd_innodb_tmpdir(
		psort_info->psort_common->trx->mysql_thd);

	ut_ad(psort_info);

	buf = psort_info->merge_buf;
	merge_file = psort_info->merge_file;
	blob_heap = mem_heap_create(512);
	memset(&doc, 0, sizeof(doc));
	memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(ulint));

	doc.charset = fts_index_get_charset(
		psort_info->psort_common->dup->index);

	block = psort_info->merge_block;
	crypt_block = psort_info->crypt_block;

	const ulint zip_size = psort_info->psort_common->old_zip_size;

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	t_ctx.cached_stopword = table->fts->cache->stopword_info.cached_stopword;
	processed = TRUE;
loop:
	while (doc_item) {
		dfield_t*	dfield = doc_item->field;

		last_doc_id = doc_item->doc_id;

		ut_ad (dfield->data != NULL
		       && dfield_get_len(dfield) != UNIV_SQL_NULL);

		/* If we finished processing the last item, update "doc"
		with the strings in the doc_item; otherwise continue
		processing the last item */
		if (processed) {
			byte*	data;
			ulint	data_len;

			dfield = doc_item->field;
			data = static_cast<byte*>(dfield_get_data(dfield));
			data_len = dfield_get_len(dfield);

			if (dfield_is_ext(dfield)) {
				doc.text.f_str =
					btr_copy_externally_stored_field(
						&doc.text.f_len, data,
						zip_size, data_len, blob_heap);
			} else {
				doc.text.f_str = data;
				doc.text.f_len = data_len;
			}

			doc.tokens = 0;
			t_ctx.processed_len = 0;
		} else {
			/* Not yet finished processing the "doc" on hand,
			continue processing it */
			ut_ad(doc.text.f_str);
			ut_ad(t_ctx.processed_len < doc.text.f_len);
		}

		processed = row_merge_fts_doc_tokenize(
			buf, doc_item->doc_id, &doc,
			merge_file, psort_info->psort_common->opt_doc_id_size,
			&t_ctx);

		/* Current sort buffer full, need to recycle */
		if (!processed) {
			ut_ad(t_ctx.processed_len < doc.text.f_len);
			ut_ad(t_ctx.rows_added[t_ctx.buf_used]);
			break;
		}

		num_doc_processed++;

		if (UNIV_UNLIKELY(fts_enable_diag_print)
		    && num_doc_processed % 10000 == 1) {
			ib::info() << "Number of documents processed: "
				<< num_doc_processed;
#ifdef FTS_INTERNAL_DIAG_PRINT
			for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
				ib::info() << "ID " << psort_info->psort_id
					<< ", partition " << i << ", word "
					<< mycount[i];
			}
#endif
		}

		mem_heap_empty(blob_heap);

		row_merge_fts_get_next_doc_item(psort_info, &doc_item);

		if (doc_item && last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}
	}

	/* If we have run out of the current sort buffer, we need to sort
	and flush the sort buffer to disk */
	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
		row_merge_buf_write(buf[t_ctx.buf_used],
				    merge_file[t_ctx.buf_used],
				    block[t_ctx.buf_used]);

		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
				     merge_file[t_ctx.buf_used]->offset++,
				     block[t_ctx.buf_used],
				     crypt_block[t_ctx.buf_used],
				     table->space_id)) {
			error = DB_TEMP_FILE_WRITE_FAIL;
			goto func_exit;
		}

		MEM_UNDEFINED(block[t_ctx.buf_used], srv_sort_buf_size);
		buf[t_ctx.buf_used] = row_merge_buf_empty(buf[t_ctx.buf_used]);
		mycount[t_ctx.buf_used] += t_ctx.rows_added[t_ctx.buf_used];
		t_ctx.rows_added[t_ctx.buf_used] = 0;

		ut_a(doc_item);
		goto loop;
	}

	/* The parent is done scanning; if we have finished processing
	all the docs, exit */
	if (psort_info->state == FTS_PARENT_COMPLETE) {
		if (UT_LIST_GET_LEN(psort_info->fts_doc_list) == 0) {
			goto exit;
		} else if (retried > 10000) {
			ut_ad(!doc_item);
			/* retried too many times and cannot get new record */
			ib::error() << "FTS parallel sort processed "
				<< num_doc_processed
				<< " records, the sort queue has "
				<< UT_LIST_GET_LEN(psort_info->fts_doc_list)
				<< " records. But sort cannot get the next"
				" records during alter table " << table->name;
			goto exit;
		}
	} else if (psort_info->state == FTS_PARENT_EXITING) {
		/* Parent abort */
		goto func_exit;
	}

	if (doc_item == NULL) {
		os_thread_yield();
	}

	row_merge_fts_get_next_doc_item(psort_info, &doc_item);

	if (doc_item != NULL) {
		if (last_doc_id != doc_item->doc_id) {
			t_ctx.init_pos = 0;
		}

		retried = 0;
	} else if (psort_info->state == FTS_PARENT_COMPLETE) {
		retried++;
	}

	goto loop;

exit:
	/* Do a final sort of the last (or latest) batch of records
	in block memory. Flush them to the temp file if the records
	cannot be held in one block of memory */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (t_ctx.rows_added[i]) {
			row_merge_buf_sort(buf[i], NULL);
			row_merge_buf_write(
				buf[i], merge_file[i], block[i]);

			/* Write to the temp file only if records have
			been flushed to the temp file before
			(offset > 0). The pseudo code for the sort is
			as follows:

			while (there are rows) {
				tokenize rows, put result in block[]
				if (block[] runs out) {
					sort rows;
					write to temp file with
					row_merge_write();
					offset++;
				}
			}

			# write out the last batch
			if (offset > 0) {
				row_merge_write();
				offset++;
			} else {
				# no need to write anything
				offset stays at 0
			}

			So if merge_file[i]->offset is 0 when we get
			here for the last batch, the rows have never
			been flushed to the temp file and can all be
			held in memory */
			if (merge_file[i]->offset != 0) {
				if (!row_merge_write(merge_file[i]->fd,
						merge_file[i]->offset++,
						block[i],
						crypt_block[i],
						table->space_id)) {
					error = DB_TEMP_FILE_WRITE_FAIL;
					goto func_exit;
				}

#ifdef HAVE_valgrind
				MEM_UNDEFINED(block[i], srv_sort_buf_size);

				if (crypt_block[i]) {
					MEM_UNDEFINED(crypt_block[i],
						      srv_sort_buf_size);
				}
#endif /* HAVE_valgrind */
			}

			buf[i] = row_merge_buf_empty(buf[i]);
			t_ctx.rows_added[i] = 0;
		}
	}

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: start merge sort\n");
	}

	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		if (!merge_file[i]->offset) {
			continue;
		}

		tmpfd[i] = row_merge_file_create_low(path);
		if (tmpfd[i] == OS_FILE_CLOSED) {
			error = DB_OUT_OF_MEMORY;
			goto func_exit;
		}

		error = row_merge_sort(psort_info->psort_common->trx,
				       psort_info->psort_common->dup,
				       merge_file[i], block[i], &tmpfd[i],
				       false, 0.0/* pct_progress */, 0.0/* pct_cost */,
				       crypt_block[i], table->space_id);

		if (error != DB_SUCCESS) {
			row_merge_file_destroy_low(tmpfd[i]);
			goto func_exit;
		}

		row_merge_file_destroy_low(tmpfd[i]);
	}

func_exit:
	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
	}

	mem_heap_free(blob_heap);

	mutex_enter(&psort_info->mutex);
	psort_info->error = error;
	mutex_exit(&psort_info->mutex);

	if (UT_LIST_GET_LEN(psort_info->fts_doc_list) > 0) {
		/* The child can exit either on error or when told to
		by the parent. */
		ut_ad(error != DB_SUCCESS
		      || psort_info->state == FTS_PARENT_EXITING);
	}

	/* Free the fts doc list in case of error. */
	do {
		row_merge_fts_get_next_doc_item(psort_info, &doc_item);
	} while (doc_item != NULL);

	psort_info->child_status = FTS_CHILD_COMPLETE;
	os_event_set(psort_info->psort_common->sort_event);
	psort_info->child_status = FTS_CHILD_EXITING;
}

/*********************************************************************//**
Start the parallel tokenization and parallel merge sort */
void
row_fts_start_psort(
/*================*/
	fts_psort_t*	psort_info)	/*!< parallel sort structure */
{
	ulint	i = 0;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		psort_info[i].psort_id = i;
		psort_info[i].task =
			new tpool::waitable_task(fts_parallel_tokenization,
						 &psort_info[i]);
		srv_thread_pool->submit_task(psort_info[i].task);
	}
}

/*********************************************************************//**
Function performs the merge and insertion of the sorted records. */
static
void
fts_parallel_merge(
/*===============*/
	void*		arg)	/*!< in: parallel merge info */
{
	fts_psort_t*	psort_info = (fts_psort_t*) arg;
	ulint		id;

	ut_ad(psort_info);

	id = psort_info->psort_id;

	row_fts_merge_insert(psort_info->psort_common->dup->index,
			     psort_info->psort_common->new_table,
			     psort_info->psort_common->all_info, id);
}

/*********************************************************************//**
Kick off the parallel merge and insert threads */
void
row_fts_start_parallel_merge(
/*=========================*/
	fts_psort_t*	merge_info)	/*!< in: parallel merge info */
{
	ulint		i = 0;

	/* Kick off merge/insert tasks */
	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
		merge_info[i].psort_id = i;
		merge_info[i].child_status = 0;

		merge_info[i].task = new tpool::waitable_task(
			fts_parallel_merge,
			(void*) &merge_info[i]);
		srv_thread_pool->submit_task(merge_info[i].task);
	}
}

/**
Write out a single word's data as new entry/entries in the INDEX table.
@param[in]	ins_ctx	insert context
@param[in]	word	word string
@param[in]	node	node columns
@return DB_SUCCESS if insertion runs fine, otherwise error code */
static
dberr_t
row_merge_write_fts_node(
	const fts_psort_insert_t*	ins_ctx,
	const fts_string_t*		word,
	const fts_node_t*		node)
{
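	/* Field layout of the auxiliary INDEX table tuple built below:
	0: word, 1: first_doc_id, 2: trx_id, 3: roll_ptr (2 and 3 are
	pre-filled by the caller), 4: last_doc_id, 5: doc_count,
	6: ilist (the encoded Doc ID/position list). */
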
	dtuple_t*	tuple;
	dfield_t*	field;
	dberr_t		ret = DB_SUCCESS;
	doc_id_t	write_first_doc_id[8];
	doc_id_t	write_last_doc_id[8];
	ib_uint32_t	write_doc_count;

	tuple = ins_ctx->tuple;

	/* The first field is the tokenized word */
	field = dtuple_get_nth_field(tuple, 0);
	dfield_set_data(field, word->f_str, word->f_len);

	/* The second field is first_doc_id */
	field = dtuple_get_nth_field(tuple, 1);
	fts_write_doc_id((byte*)&write_first_doc_id, node->first_doc_id);
	dfield_set_data(field, &write_first_doc_id, sizeof(doc_id_t));

	/* The third and fourth fields (TRX_ID, ROLL_PTR) are filled already. */
	/* The fifth field is last_doc_id */
	field = dtuple_get_nth_field(tuple, 4);
	fts_write_doc_id((byte*)&write_last_doc_id, node->last_doc_id);
	dfield_set_data(field, &write_last_doc_id, sizeof(doc_id_t));

	/* The sixth field is doc_count */
	field = dtuple_get_nth_field(tuple, 5);
	mach_write_to_4((byte*)&write_doc_count, (ib_uint32_t)node->doc_count);
	dfield_set_data(field, &write_doc_count, sizeof(ib_uint32_t));

	/* The seventh field is ilist */
	field = dtuple_get_nth_field(tuple, 6);
	dfield_set_data(field, node->ilist, node->ilist_size);

	ret = ins_ctx->btr_bulk->insert(tuple);

	return(ret);
}

/********************************************************************//**
Insert processed FTS data into the auxiliary index tables.
@return DB_SUCCESS if insertion runs fine */
static MY_ATTRIBUTE((nonnull))
dberr_t
row_merge_write_fts_word(
/*=====================*/
	fts_psort_insert_t*	ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t*	word)		/*!< in: sorted and tokenized
						word */
{
	dberr_t	ret = DB_SUCCESS;

	ut_ad(ins_ctx->aux_index_id == fts_select_index(
		ins_ctx->charset, word->text.f_str, word->text.f_len));

	/* Pop out each fts_node in word->nodes and write them to the
	auxiliary table */
	for (ulint i = 0; i < ib_vector_size(word->nodes); i++) {
		dberr_t		error;
		fts_node_t*	fts_node;

		fts_node = static_cast<fts_node_t*>(ib_vector_get(word->nodes, i));

		error = row_merge_write_fts_node(ins_ctx, &word->text, fts_node);

		if (UNIV_UNLIKELY(error != DB_SUCCESS)) {
			ib::error() << "Failed to write word to FTS auxiliary"
				" index table "
				<< ins_ctx->btr_bulk->table_name()
				<< ", error " << error;
			ret = error;
		}

		ut_free(fts_node->ilist);
		fts_node->ilist = NULL;
	}

	ib_vector_reset(word->nodes);

	return(ret);
}

/*********************************************************************//**
Read sorted FTS data and add it, word by word, to the in-memory nodes
that will be inserted into the auxiliary tables. */
static
void
row_fts_insert_tuple(
/*=================*/
	fts_psort_insert_t*
			ins_ctx,	/*!< in: insert context */
	fts_tokenizer_word_t* word,	/*!< in: last processed
					tokenized word */
	ib_vector_t*	positions,	/*!< in: word position */
	doc_id_t*	in_doc_id,	/*!< in: last item doc id */
	dtuple_t*	dtuple)		/*!< in: entry to insert */
{
	fts_node_t*	fts_node = NULL;
	dfield_t*	dfield;
	doc_id_t	doc_id;
	ulint		position;
	fts_string_t	token_word;
	ulint		i;

	/* Get the fts_node for the FTS auxiliary INDEX table */
	if (ib_vector_size(word->nodes) > 0) {
		fts_node = static_cast<fts_node_t*>(
			ib_vector_last(word->nodes));
	}

	if (fts_node == NULL
	    || fts_node->ilist_size > FTS_ILIST_MAX_SIZE) {

		fts_node = static_cast<fts_node_t*>(
			ib_vector_push(word->nodes, NULL));

		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* If dtuple == NULL, this is the last word to be processed */
	if (!dtuple) {
		if (fts_node && ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id,
				positions);

			/* Write out the current word */
			row_merge_write_fts_word(ins_ctx, word);
		}

		return;
	}

	/* Get the first field for the tokenized word */
	dfield = dtuple_get_nth_field(dtuple, 0);

	token_word.f_n_char = 0;
	token_word.f_len = dfield->len;
	token_word.f_str = static_cast<byte*>(dfield_get_data(dfield));

	if (!word->text.f_str) {
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);
	}

	/* Compare to the last word, to see if they are the same
	word */
	if (innobase_fts_text_cmp(ins_ctx->charset,
				  &word->text, &token_word) != 0) {
		ulint	num_item;

		/* We are getting a new word; flush the last position
		info for the current word in fts_node */
		if (ib_vector_size(positions) > 0) {
			fts_cache_node_add_positions(
				NULL, fts_node, *in_doc_id, positions);
		}

		/* Write out the current word */
		row_merge_write_fts_word(ins_ctx, word);

		/* Copy the new word */
		fts_string_dup(&word->text, &token_word, ins_ctx->heap);

		num_item = ib_vector_size(positions);

		/* Clean up the position queue */
		for (i = 0; i < num_item; i++) {
			ib_vector_pop(positions);
		}

		/* Reset the Doc ID */
		*in_doc_id = 0;
		memset(fts_node, 0x0, sizeof(*fts_node));
	}

	/* Get the word's Doc ID */
	dfield = dtuple_get_nth_field(dtuple, 1);

	if (!ins_ctx->opt_doc_id_size) {
		doc_id = fts_read_doc_id(
			static_cast<byte*>(dfield_get_data(dfield)));
	} else {
		doc_id = (doc_id_t) mach_read_from_4(
			static_cast<byte*>(dfield_get_data(dfield)));
	}

	/* Get the word's position info */
	dfield = dtuple_get_nth_field(dtuple, 2);
	position = mach_read_from_4(static_cast<byte*>(dfield_get_data(dfield)));

	/* If this is the same word as the last word, and they
	have the same Doc ID, we just need to add its position
	info. Otherwise, we will flush the position info to the
	fts_node and initiate a new position vector */
	if (!(*in_doc_id) || *in_doc_id == doc_id) {
		ib_vector_push(positions, &position);
	} else {
		ulint	num_pos = ib_vector_size(positions);

		fts_cache_node_add_positions(NULL, fts_node,
					     *in_doc_id, positions);
		for (i = 0; i < num_pos; i++) {
			ib_vector_pop(positions);
		}
		ib_vector_push(positions, &position);
	}

	/* Record the current Doc ID */
	*in_doc_id = doc_id;
}

/*********************************************************************//**
Propagate a newly added record up one level in the selection tree
@return parent where this value propagated to */
static
ulint
row_fts_sel_tree_propagate(
/*=======================*/
	ulint		propagated,	/*!< in: tree node propagated */
	int*		sel_tree,	/*!< in: selection tree */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in/out: FTS index */
{
	ulint	parent;
	int	child_left;
	int	child_right;
	int	selected;

	/* Find which parent this value will be propagated to */
	parent = (propagated - 1) / 2;

	/* Find out which value is smaller, and propagate it */
	child_left = sel_tree[parent * 2 + 1];
	child_right = sel_tree[parent * 2 + 2];

	if (child_left == -1 || mrec[child_left] == NULL) {
		if (child_right == -1
		    || mrec[child_right] == NULL) {
			selected = -1;
		} else {
			selected = child_right;
		}
	} else if (child_right == -1
		   || mrec[child_right] == NULL) {
		selected = child_left;
	} else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
				      offsets[child_left],
				      offsets[child_right],
				      index, NULL) < 0) {
		selected = child_left;
	} else {
		selected = child_right;
	}

	sel_tree[parent] = selected;

	return parent;
}
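
/* Illustration (editor's example, derived from the arithmetic above):
the selection tree is stored as a flat array, so node i has its parent
at (i - 1) / 2 and its children at 2 * i + 1 and 2 * i + 2. With four
merge streams the leaves occupy sel_tree[3..6]; after popping the winner
at sel_tree[0] and reading the next record from that stream, one
propagate call per tree level restores the "smallest record wins"
invariant along the path back to the root. */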

/*********************************************************************//**
Readjust the selection tree after popping the root and reading a new value
@return the new root */
static
int
row_fts_sel_tree_update(
/*====================*/
	int*		sel_tree,	/*!< in/out: selection tree */
	ulint		propagated,	/*!< in: node to propagate up */
	ulint		height,		/*!< in: tree height */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
{
	ulint	i;

	for (i = 1; i <= height; i++) {
		propagated = row_fts_sel_tree_propagate(
			propagated, sel_tree, mrec, offsets, index);
	}

	return(sel_tree[0]);
}

/*********************************************************************//**
Build the selection tree at a specified level */
static
void
row_fts_build_sel_tree_level(
/*=========================*/
	int*		sel_tree,	/*!< in/out: selection tree */
	ulint		level,		/*!< in: selection tree level */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
{
	ulint	start;
	int	child_left;
	int	child_right;
	ulint	i;
	ulint	num_item = ulint(1) << level;

	start = num_item - 1;

	for (i = 0; i < num_item; i++) {
		child_left = sel_tree[(start + i) * 2 + 1];
		child_right = sel_tree[(start + i) * 2 + 2];

		if (child_left == -1) {
			if (child_right == -1) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (child_right == -1) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Deal with NULL child conditions */
		if (!mrec[child_left]) {
			if (!mrec[child_right]) {
				sel_tree[start + i] = -1;
			} else {
				sel_tree[start + i] = child_right;
			}
			continue;
		} else if (!mrec[child_right]) {
			sel_tree[start + i] = child_left;
			continue;
		}

		/* Select the smaller one to set the parent pointer */
		int cmp = cmp_rec_rec_simple(
			mrec[child_left], mrec[child_right],
			offsets[child_left], offsets[child_right],
			index, NULL);

		sel_tree[start + i] = cmp < 0 ? child_left : child_right;
	}
}

/*********************************************************************//**
Build a selection tree for the merge. The selection tree is a binary
tree and has ceil(log2(fts_sort_pll_degree)) levels, with the root
as level 0.
@return number of tree levels */
static
ulint
row_fts_build_sel_tree(
/*===================*/
	int*		sel_tree,	/*!< in/out: selection tree */
	const mrec_t**	mrec,		/*!< in: sort record */
	rec_offs**	offsets,	/*!< in: record offsets */
	dict_index_t*	index)		/*!< in: index dictionary */
{
	ulint	treelevel = 1;
	ulint	num = 2;
	ulint	i = 0;
	ulint	start;

	/* No need to build a selection tree if we only have two merge
	threads */
	if (fts_sort_pll_degree <= 2) {
		return(0);
	}

	while (num < fts_sort_pll_degree) {
		num = num << 1;
		treelevel++;
	}

	start = (ulint(1) << treelevel) - 1;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		sel_tree[i + start] = int(i);
	}

	i = treelevel;
	do {
		row_fts_build_sel_tree_level(
			sel_tree, --i, mrec, offsets, index);
	} while (i > 0);

	return(treelevel);
}
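
/* Worked example: with fts_sort_pll_degree = 4 the loop above leaves
treelevel = 2, so start = (1 << 2) - 1 = 3 and the leaf slots
sel_tree[3..6] are seeded with stream ids 0..3. The two passes of
row_fts_build_sel_tree_level() then fill level 1 (sel_tree[1..2]) and
level 0 (sel_tree[0]), after which sel_tree[0] names the stream holding
the smallest record. */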

/*********************************************************************//**
Read the sorted files containing index data tuples and insert these data
tuples into the index
@return DB_SUCCESS or error number */
dberr_t
row_fts_merge_insert(
/*=================*/
	dict_index_t*	index,	/*!< in: index */
	dict_table_t*	table,	/*!< in: new table */
	fts_psort_t*	psort_info, /*!< parallel sort info */
	ulint		id)	/*!< in: which auxiliary table's data
				to insert to */
{
	const byte**		b;
	mem_heap_t*		tuple_heap;
	mem_heap_t*		heap;
	dberr_t			error = DB_SUCCESS;
	ulint*			foffs;
	rec_offs**		offsets;
	fts_tokenizer_word_t	new_word;
	ib_vector_t*		positions;
	doc_id_t		last_doc_id;
	ib_alloc_t*		heap_alloc;
	ulint			i;
	mrec_buf_t**		buf;
	pfs_os_file_t*		fd;
	byte**			block;
	byte**			crypt_block;
	const mrec_t**		mrec;
	ulint			count = 0;
	int*			sel_tree;
	ulint			height;
	ulint			start;
	fts_psort_insert_t	ins_ctx;
	uint64_t		count_diag = 0;
	fts_table_t		fts_table;
	char			aux_table_name[MAX_FULL_NAME_LEN];
	dict_table_t*		aux_table;
	dict_index_t*		aux_index;
	trx_t*			trx;

	/* We use the insert query graph as the dummy graph
	needed in the row module call */

	trx = trx_create();
	trx_start_if_not_started(trx, true);

	trx->op_info = "inserting index entries";

	ins_ctx.opt_doc_id_size = psort_info[0].psort_common->opt_doc_id_size;

	heap = mem_heap_create(500 + sizeof(mrec_buf_t));

	b = (const byte**) mem_heap_alloc(
		heap, sizeof (*b) * fts_sort_pll_degree);
	foffs = (ulint*) mem_heap_alloc(
		heap, sizeof(*foffs) * fts_sort_pll_degree);
	offsets = (rec_offs**) mem_heap_alloc(
		heap, sizeof(*offsets) * fts_sort_pll_degree);
	buf = (mrec_buf_t**) mem_heap_alloc(
		heap, sizeof(*buf) * fts_sort_pll_degree);
	fd = (pfs_os_file_t*) mem_heap_alloc(heap, sizeof(*fd) * fts_sort_pll_degree);
	block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	crypt_block = (byte**) mem_heap_alloc(
		heap, sizeof(*block) * fts_sort_pll_degree);
	mrec = (const mrec_t**) mem_heap_alloc(
		heap, sizeof(*mrec) * fts_sort_pll_degree);
	sel_tree = (int*) mem_heap_alloc(
		heap, sizeof(*sel_tree) * (fts_sort_pll_degree * 2));

	tuple_heap = mem_heap_create(1000);

	ins_ctx.charset = fts_index_get_charset(index);
	ins_ctx.heap = heap;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		ulint	num;

		num = 1 + REC_OFFS_HEADER_SIZE
			+ dict_index_get_n_fields(index);
		offsets[i] = static_cast<rec_offs*>(mem_heap_zalloc(
			heap, num * sizeof *offsets[i]));
		rec_offs_set_n_alloc(offsets[i], num);
		rec_offs_set_n_fields(offsets[i], dict_index_get_n_fields(index));
		block[i] = psort_info[i].merge_block[id];
		crypt_block[i] = psort_info[i].crypt_block[id];
		b[i] = psort_info[i].merge_block[id];
		fd[i] = psort_info[i].merge_file[id]->fd;
		foffs[i] = 0;

		buf[i] = static_cast<mrec_buf_t*>(
			mem_heap_alloc(heap, sizeof *buf[i]));

		count_diag += psort_info[i].merge_file[id]->n_rec;
	}

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: to insert " << count_diag
			<< " records";
	}

	/* Initialize related variables if creating FTS indexes */
	heap_alloc = ib_heap_allocator_create(heap);

	memset(&new_word, 0, sizeof(new_word));

	new_word.nodes = ib_vector_create(heap_alloc, sizeof(fts_node_t), 4);
	positions = ib_vector_create(heap_alloc, sizeof(ulint), 32);
	last_doc_id = 0;

	/* We should set the flags2 with aux_table_name here,
	in order to get the correct aux table names. */
	index->table->flags2 |= DICT_TF2_FTS_AUX_HEX_NAME;
	DBUG_EXECUTE_IF("innodb_test_wrong_fts_aux_table_name",
			index->table->flags2 &= ~DICT_TF2_FTS_AUX_HEX_NAME
			& ((1U << DICT_TF2_BITS) - 1););
	fts_table.type = FTS_INDEX_TABLE;
	fts_table.index_id = index->id;
	fts_table.table_id = table->id;
	fts_table.table = index->table;
	fts_table.suffix = fts_get_suffix(id);

	/* Get the aux index */
	fts_get_table_name(&fts_table, aux_table_name);
	aux_table = dict_table_open_on_name(aux_table_name, FALSE, FALSE,
					    DICT_ERR_IGNORE_NONE);
	ut_ad(aux_table != NULL);
	aux_index = dict_table_get_first_index(aux_table);

	ut_ad(!aux_index->is_instant());
	/* row_merge_write_fts_node() depends on the correct value */
	ut_ad(aux_index->n_core_null_bytes
	      == UT_BITS_IN_BYTES(aux_index->n_nullable));

	/* Create the bulk load instance */
	ins_ctx.btr_bulk = UT_NEW_NOKEY(BtrBulk(aux_index, trx));

	/* Create the tuple for insert */
	ins_ctx.tuple = dtuple_create(heap, dict_index_get_n_fields(aux_index));
	dict_index_copy_types(ins_ctx.tuple, aux_index,
			      dict_index_get_n_fields(aux_index));

	/* Set TRX_ID and ROLL_PTR */
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 2),
			&reset_trx_id, DATA_TRX_ID_LEN);
	dfield_set_data(dtuple_get_nth_field(ins_ctx.tuple, 3),
			&reset_trx_id[DATA_TRX_ID_LEN], DATA_ROLL_PTR_LEN);

	ut_d(ins_ctx.aux_index_id = id);

	const ulint space = table->space_id;

	for (i = 0; i < fts_sort_pll_degree; i++) {
		if (psort_info[i].merge_file[id]->n_rec == 0) {
			/* No rows to read */
			mrec[i] = b[i] = NULL;
		} else {
			/* Read from the temp file only if it has been
			written to. Otherwise, the block memory holds
			all the sorted records */
			if (psort_info[i].merge_file[id]->offset > 0
			    && (!row_merge_read(
					fd[i], foffs[i],
					(row_merge_block_t*) block[i],
					(row_merge_block_t*) crypt_block[i],
					space))) {
				error = DB_CORRUPTION;
				goto exit;
			}

			ROW_MERGE_READ_GET_NEXT(i);
		}
	}

	height = row_fts_build_sel_tree(sel_tree, (const mrec_t **) mrec,
					offsets, index);

	start = (1U << height) - 1;
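	/* start indexes the first leaf slot of the selection tree: the
	leaves of a tree of height h begin at (1 << h) - 1, e.g. at
	slot 3 when height == 2. When fts_sort_pll_degree <= 2, height
	is 0 and the tree is bypassed below. */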

	/* Fetch the sorted records from the sort buffers and insert them
	into the corresponding FTS index auxiliary tables */
	for (;;) {
		dtuple_t*	dtuple;
		int		min_rec = 0;

		if (fts_sort_pll_degree <= 2) {
			while (!mrec[min_rec]) {
				min_rec++;

				if (min_rec >= (int) fts_sort_pll_degree) {
					row_fts_insert_tuple(
						&ins_ctx, &new_word,
						positions, &last_doc_id,
						NULL);

					goto exit;
				}
			}

			for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
				if (!mrec[i]) {
					continue;
				}

				if (cmp_rec_rec_simple(
					    mrec[i], mrec[min_rec],
					    offsets[i], offsets[min_rec],
					    index, NULL) < 0) {
					min_rec = static_cast<int>(i);
				}
			}
		} else {
			min_rec = sel_tree[0];

			if (min_rec == -1) {
				row_fts_insert_tuple(
					&ins_ctx, &new_word,
					positions, &last_doc_id,
					NULL);

				goto exit;
			}
		}

		dtuple = row_rec_to_index_entry_low(
			mrec[min_rec], index, offsets[min_rec],
			tuple_heap);

		row_fts_insert_tuple(
			&ins_ctx, &new_word, positions,
			&last_doc_id, dtuple);

		ROW_MERGE_READ_GET_NEXT(min_rec);

		if (fts_sort_pll_degree > 2) {
			if (!mrec[min_rec]) {
				sel_tree[start + min_rec] = -1;
			}

			row_fts_sel_tree_update(sel_tree, start + min_rec,
						height, mrec,
						offsets, index);
		}

		count++;

		mem_heap_empty(tuple_heap);
	}

exit:
	fts_sql_commit(trx);

	trx->op_info = "";

	mem_heap_free(tuple_heap);

	error = ins_ctx.btr_bulk->finish(error);
	UT_DELETE(ins_ctx.btr_bulk);

	dict_table_close(aux_table, FALSE, FALSE);

	trx->free();

	mem_heap_free(heap);

	if (UNIV_UNLIKELY(fts_enable_diag_print)) {
		ib::info() << "InnoDB_FTS: inserted " << count << " records";
	}

	return(error);
}