1 /* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License as published by
5 the Free Software Foundation; version 2 of the License.
6
7 This program is distributed in the hope that it will be useful,
8 but WITHOUT ANY WARRANTY; without even the implied warranty of
9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 GNU General Public License for more details.
11
12 You should have received a copy of the GNU General Public License
13 along with this program; if not, write to the Free Software
14 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15
16 /* Describe, check and repair of MARIA tables */
17
18 /*
19 About checksum calculation.
20
21 There are two types of checksums. Table checksum and row checksum.
22
23 Row checksum is an additional uchar at the end of dynamic length
24 records. It must be calculated if the table is configured for them.
25 Otherwise they must not be used. The variable
26 MYISAM_SHARE::calc_checksum determines if row checksums are used.
27 MI_INFO::checksum is used as temporary storage during row handling.
  For parallel repair we must ensure that only one thread can use this
  variable. There is no problem on the write side, as writing is done by
  one thread only. Checking a record after a read could go wrong, but
  since all threads read through a common read buffer, it is sufficient
  if only one thread checks it.
33
34 Table checksum is an eight uchar value in the header of the index file.
35 It can be calculated even if row checksums are not used. The variable
36 MI_CHECK::glob_crc is calculated over all records.
37 MI_SORT_PARAM::calc_checksum determines if this should be done. This
38 variable is not part of MI_CHECK because it must be set per thread for
39 parallel repair. The global glob_crc must be changed by one thread
40 only. And it is sufficient to calculate the checksum once only.
41 */
42
43 #include "ma_ftdefs.h"
44 #include "ma_rt_index.h"
45 #include "ma_blockrec.h"
46 #include "trnman.h"
47 #include "ma_key_recover.h"
48 #include <my_check_opt.h>
49 #include <my_stack_alloc.h>
50 #include <my_getopt.h>
51 #ifdef HAVE_SYS_VADVISE_H
52 #include <sys/vadvise.h>
53 #endif
54
/* Functions defined in this file */

/*
  Forward declarations, so that the functions below can be called before
  their definitions appear. All of them except retry_if_quick() are declared
  static.
*/

static int check_k_link(HA_CHECK *param, MARIA_HA *info, my_off_t next_link);
static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
                     MARIA_PAGE *page, ha_rows *keys,
                     ha_checksum *key_checksum, uint level);
static uint isam_key_length(MARIA_HA *info,MARIA_KEYDEF *keyinfo);
static ha_checksum calc_checksum(ha_rows count);
static int writekeys(MARIA_SORT_PARAM *sort_param);
static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
                          MARIA_KEYDEF *keyinfo,
                          my_off_t pagepos, File new_file);
static int sort_key_read(MARIA_SORT_PARAM *sort_param, uchar *key);
static int sort_maria_ft_key_read(MARIA_SORT_PARAM *sort_param, uchar *key);
static int sort_get_next_record(MARIA_SORT_PARAM *sort_param);
static int sort_key_cmp(MARIA_SORT_PARAM *sort_param, const void *a,
                        const void *b);
static int sort_maria_ft_key_write(MARIA_SORT_PARAM *sort_param,
                                   const uchar *a);
static int sort_key_write(MARIA_SORT_PARAM *sort_param, const uchar *a);
static my_off_t get_record_for_key(MARIA_KEYDEF *keyinfo, const uchar *key);
static int sort_insert_key(MARIA_SORT_PARAM *sort_param,
                           reg1 MA_SORT_KEY_BLOCKS *key_block,
                           const uchar *key, my_off_t prev_block);
static int sort_delete_record(MARIA_SORT_PARAM *sort_param);
/* The following prototype is intentionally left commented out */
/*static int _ma_flush_pending_blocks(HA_CHECK *param);*/
static MA_SORT_KEY_BLOCKS *alloc_key_blocks(HA_CHECK *param, uint blocks,
                                            uint buffer_length);
static ha_checksum maria_byte_checksum(const uchar *buf, uint length);
static void set_data_file_type(MARIA_SORT_INFO *sort_info, MARIA_SHARE *share);
static void restore_data_file_type(MARIA_SHARE *share);
static void change_data_file_descriptor(MARIA_HA *info, File new_file);
static void unuse_data_file_descriptor(MARIA_HA *info);
static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
                                      MARIA_HA *info, uchar *record);
static void copy_data_file_state(MARIA_STATE_INFO *to,
                                 MARIA_STATE_INFO *from);
static void report_keypage_fault(HA_CHECK *param, MARIA_HA *info,
                                 my_off_t position);
static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file);
static my_bool _ma_flush_table_files_before_swap(HA_CHECK *param,
                                                 MARIA_HA *info);
static TrID max_trid_in_system(void);
static void _ma_check_print_not_visible_error(HA_CHECK *param, TrID used_trid);
/*
  NOTE(review): the only prototype here without 'static'; presumably it is
  also used from another file - confirm before changing its linkage.
*/
void retry_if_quick(MARIA_SORT_PARAM *param, int error);
static void print_bitmap_description(MARIA_SHARE *share,
                                     pgcache_page_no_t page,
                                     uchar *buff);
103
104
105 /* Initialize check param with default values */
106
maria_chk_init(HA_CHECK * param)107 void maria_chk_init(HA_CHECK *param)
108 {
109 bzero((uchar*) param,sizeof(*param));
110 param->opt_follow_links=1;
111 param->keys_in_use= ~(ulonglong) 0;
112 param->search_after_block=HA_OFFSET_ERROR;
113 param->auto_increment_value= 0;
114 param->use_buffers= PAGE_BUFFER_INIT;
115 param->read_buffer_length=READ_BUFFER_INIT;
116 param->write_buffer_length=READ_BUFFER_INIT;
117 param->orig_sort_buffer_length=SORT_BUFFER_INIT;
118 param->sort_key_blocks=BUFFERS_WHEN_SORTING;
119 param->tmpfile_createflag=O_RDWR | O_TRUNC | O_EXCL;
120 param->myf_rw=MYF(MY_NABP | MY_WME | MY_WAIT_IF_FULL);
121 param->start_check_pos=0;
122 param->max_record_length= LONGLONG_MAX;
123 param->pagecache_block_size= KEY_CACHE_BLOCK_SIZE;
124 param->stats_method= MI_STATS_METHOD_NULLS_NOT_EQUAL;
125 param->max_stage= 1;
126 param->stack_end_ptr= &my_thread_var->stack_ends_here;
127 param->max_allowed_lsn= (LSN) ~0ULL;
128 }
129
130
131 /* Initialize check param and maria handler for check of table */
132
maria_chk_init_for_check(HA_CHECK * param,MARIA_HA * info)133 void maria_chk_init_for_check(HA_CHECK *param, MARIA_HA *info)
134 {
135 param->not_visible_rows_found= 0;
136 param->max_found_trid= 0;
137
138 /*
139 Set up transaction handler so that we can see all rows. When rows is read
140 we will check the found id against param->max_tried
141 */
142 if (!info->s->base.born_transactional)
143 {
144 /*
145 There are no trids. Howver we want to set max_trid to make test of
146 create_trid simpler.
147 */
148 param->max_trid= ~(TrID) 0;
149 }
150 else if (param->max_trid == 0 || param->max_trid == ~(TrID) 0)
151 {
152 if (!ma_control_file_inited())
153 param->max_trid= 0; /* Give warning for first trid found */
154 else
155 param->max_trid= max_trid_in_system();
156 }
157
158 maria_ignore_trids(info);
159 }
160
161
162 /* Check the status flags for the table */
163
maria_chk_status(HA_CHECK * param,MARIA_HA * info)164 int maria_chk_status(HA_CHECK *param, MARIA_HA *info)
165 {
166 MARIA_SHARE *share= info->s;
167
168 /* Protection for HA_EXTRA_FLUSH */
169 mysql_mutex_lock(&share->intern_lock);
170
171 if (maria_is_crashed_on_repair(info))
172 _ma_check_print_warning(param,
173 "Table is marked as crashed and last repair failed");
174 else if (maria_in_repair(info))
175 _ma_check_print_warning(param,
176 "Last repair was aborted before finishing");
177 else if (maria_is_crashed(info))
178 _ma_check_print_warning(param,
179 "Table is marked as crashed");
180 if (share->state.open_count != (uint) (share->global_changed ? 1 : 0))
181 {
182 /* Don't count this as a real warning, as check can correct this ! */
183 my_bool save=param->warning_printed;
184 _ma_check_print_warning(param,
185 share->state.open_count==1 ?
186 "%d client is using or hasn't closed the table properly" :
187 "%d clients are using or haven't closed the table properly",
188 share->state.open_count);
189 /* If this will be fixed by the check, forget the warning */
190 if (param->testflag & T_UPDATE_STATE)
191 param->warning_printed=save;
192 }
193
194 mysql_mutex_unlock(&share->intern_lock);
195
196 if (share->state.create_trid > param->max_trid)
197 {
198 param->wrong_trd_printed= 1; /* Force should run zerofill */
199 _ma_check_print_warning(param,
200 "Table create_trd (%llu) > current max_transaction id (%llu). Table needs to be repaired or zerofilled to be usable",
201 share->state.create_trid, param->max_trid);
202 return 1;
203 }
204 return 0;
205 }
206
/*
  Check delete links in row data

  SYNOPSIS
    maria_chk_del()
    param      Check parameters. record_checksum is reset to 0 and, for
               rows that are not HA_OPTION_PACK_RECORD, the positions of
               the deleted rows are added to it. T_RETRY_WITHOUT_QUICK is
               set in testflag if the chain is found to be corrupted.
    info       Table handler
    test_flag  T_SILENT / T_VERBOSE control what is printed to stdout

  DESCRIPTION
    Follows the chain of deleted rows that starts at share->state.dellink,
    for at most share->state.state.del links, and verifies that every link
    is inside the data file, is marked as deleted and (for packed records)
    points back at the previous link. The amount of deleted space found is
    compared to share->state.state.empty.

    Nothing is checked for BLOCK_RECORD tables.

  RETURN
    0  ok (or nothing to check)
    1  error, or the check was killed
*/

int maria_chk_del(HA_CHECK *param, register MARIA_HA *info,
                  ulonglong test_flag)
{
  MARIA_SHARE *share= info->s;
  reg2 ha_rows i;
  uint delete_link_length;
  my_off_t empty,next_link,UNINIT_VAR(old_link);
  /*
    buff is used both as read buffer for the delete link and as output
    buffer for llstr(); buff2 is used when the content of buff is still
    needed.
  */
  char buff[22],buff2[22];
  DBUG_ENTER("maria_chk_del");

  param->record_checksum=0;

  if (share->data_file_type == BLOCK_RECORD)
    DBUG_RETURN(0);                             /* No delete links here */

  /* Number of bytes to read for each link: 20 for packed records */
  delete_link_length=((share->options & HA_OPTION_PACK_RECORD) ? 20 :
                      share->rec_reflength+1);

  if (!(test_flag & T_SILENT))
    puts("- check record delete-chain");

  next_link=share->state.dellink;
  if (share->state.state.del == 0)
  {
    if (test_flag & T_VERBOSE)
    {
      puts("No recordlinks");
    }
  }
  else
  {
    if (test_flag & T_VERBOSE)
      printf("Recordlinks: ");
    empty=0;
    /* i counts down the number of deleted rows that are still expected */
    for (i= share->state.state.del ; i > 0L && next_link != HA_OFFSET_ERROR ; i--)
    {
      if (_ma_killed_ptr(param))
        DBUG_RETURN(1);
      /* buff is used for the printed position here, before it is read into */
      if (test_flag & T_VERBOSE)
        printf(" %9s",llstr(next_link,buff));
      if (next_link >= share->state.state.data_file_length)
        goto wrong;
      if (mysql_file_pread(info->dfile.file, (uchar*) buff, delete_link_length,
                           next_link,MYF(MY_NABP)))
      {
        if (test_flag & T_VERBOSE) puts("");
        _ma_check_print_error(param,"Can't read delete-link at filepos: %s",
                              llstr(next_link,buff));
        DBUG_RETURN(1);
      }
      /* The first byte of a deleted row must be 0 */
      if (*buff != '\0')
      {
        if (test_flag & T_VERBOSE) puts("");
        _ma_check_print_error(param,"Record at pos: %s is not remove-marked",
                              llstr(next_link,buff));
        goto wrong;
      }
      if (share->options & HA_OPTION_PACK_RECORD)
      {
        /*
          Layout used here: 3 bytes length at offset 1, next link at
          offset 4 and previous link at offset 12.
        */
        my_off_t prev_link=mi_sizekorr(buff+12);
        /* 'empty' is still 0 for the first link, which has no previous one */
        if (empty && prev_link != old_link)
        {
          if (test_flag & T_VERBOSE) puts("");
          /* buff2 is used as buff still holds the link that was read */
          _ma_check_print_error(param,
                                "Deleted block at %s doesn't point back at previous delete link",
                                llstr(next_link,buff2));
          goto wrong;
        }
        old_link=next_link;
        next_link=mi_sizekorr(buff+4);
        empty+=mi_uint3korr(buff+1);
      }
      else
      {
        /* Sum up the positions of the deleted rows */
        param->record_checksum+=(ha_checksum) next_link;
        next_link= _ma_rec_pos(share, (uchar *) buff + 1);
        empty+=share->base.pack_reclength;
      }
    }
    if (share->state.state.del && (test_flag & T_VERBOSE))
      puts("\n");
    /* Wrong amount of deleted space only gives a warning */
    if (empty != share->state.state.empty)
    {
      _ma_check_print_warning(param,
                              "Found %s deleted space in delete link chain. Should be %s",
                              llstr(empty,buff2),
                              llstr(share->state.state.empty,buff));
    }
    /* The chain continues after the expected number of links */
    if (next_link != HA_OFFSET_ERROR)
    {
      _ma_check_print_error(param,
                            "Found more than the expected %s deleted rows in delete link chain",
                            llstr(share->state.state.del, buff));
      goto wrong;
    }
    /* The chain ended before the expected number of links was found */
    if (i != 0)
    {
      _ma_check_print_error(param,
                            "Found %s deleted rows in delete link chain. Should be %s",
                            llstr(share->state.state.del - i, buff2),
                            llstr(share->state.state.del, buff));
      goto wrong;
    }
  }
  DBUG_RETURN(0);

wrong:
  /* Common exit for a corrupted chain */
  param->testflag|=T_RETRY_WITHOUT_QUICK;
  if (test_flag & T_VERBOSE)
    puts("");
  _ma_check_print_error(param,"record delete-link-chain corrupted");
  DBUG_RETURN(1);
} /* maria_chk_del */
324
325
326 /* Check delete links in index file */
327
check_k_link(HA_CHECK * param,register MARIA_HA * info,my_off_t next_link)328 static int check_k_link(HA_CHECK *param, register MARIA_HA *info,
329 my_off_t next_link)
330 {
331 MARIA_SHARE *share= info->s;
332 uint block_size= share->block_size;
333 ha_rows records;
334 char llbuff[21], llbuff2[21];
335 uchar *buff;
336 DBUG_ENTER("check_k_link");
337
338 if (next_link == HA_OFFSET_ERROR)
339 DBUG_RETURN(0); /* Avoid printing empty line */
340
341 records= (ha_rows) (share->state.state.key_file_length / block_size);
342 while (next_link != HA_OFFSET_ERROR && records > 0)
343 {
344 if (_ma_killed_ptr(param))
345 DBUG_RETURN(1);
346 if (param->testflag & T_VERBOSE)
347 printf("%16s",llstr(next_link,llbuff));
348
349 /* Key blocks must lay within the key file length entirely. */
350 if (next_link + block_size > share->state.state.key_file_length)
351 {
352 /* purecov: begin tested */
353 _ma_check_print_error(param, "Invalid key block position: %s "
354 "key block size: %u file_length: %s",
355 llstr(next_link, llbuff), block_size,
356 llstr(share->state.state.key_file_length, llbuff2));
357 DBUG_RETURN(1);
358 /* purecov: end */
359 }
360
361 /* Key blocks must be aligned at block_size */
362 if (next_link & (block_size -1))
363 {
364 /* purecov: begin tested */
365 _ma_check_print_error(param, "Mis-aligned key block: %s "
366 "minimum key block length: %u",
367 llstr(next_link, llbuff),
368 block_size);
369 DBUG_RETURN(1);
370 /* purecov: end */
371 }
372
373 DBUG_ASSERT(share->pagecache->block_size == block_size);
374 if (!(buff= pagecache_read(share->pagecache,
375 &share->kfile,
376 (pgcache_page_no_t) (next_link / block_size),
377 DFLT_INIT_HITS,
378 info->buff, PAGECACHE_READ_UNKNOWN_PAGE,
379 PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
380 {
381 /* purecov: begin tested */
382 _ma_check_print_error(param, "key cache read error for block: %s",
383 llstr(next_link,llbuff));
384 DBUG_RETURN(1);
385 /* purecov: end */
386 }
387 if (_ma_get_keynr(info->s, buff) != MARIA_DELETE_KEY_NR)
388 _ma_check_print_error(param, "Page at %s is not delete marked",
389 llstr(next_link, llbuff));
390
391 next_link= mi_sizekorr(buff + share->keypage_header);
392 records--;
393 param->key_file_blocks+=block_size;
394 }
395 if (param->testflag & T_VERBOSE)
396 {
397 if (next_link != HA_OFFSET_ERROR)
398 printf("%16s\n",llstr(next_link,llbuff));
399 else
400 puts("");
401 }
402 DBUG_RETURN (next_link != HA_OFFSET_ERROR);
403 } /* check_k_link */
404
405
406 /* Check sizes of files */
407
maria_chk_size(HA_CHECK * param,register MARIA_HA * info)408 int maria_chk_size(HA_CHECK *param, register MARIA_HA *info)
409 {
410 MARIA_SHARE *share= info->s;
411 int error;
412 register my_off_t skr,size;
413 char buff[22],buff2[22];
414 DBUG_ENTER("maria_chk_size");
415
416 if (info->s3)
417 {
418 /* We cannot check file sizes for S3 */
419 DBUG_RETURN(0);
420 }
421
422 if (!(param->testflag & T_SILENT))
423 puts("- check file-size");
424
425 /*
426 The following is needed if called externally (not from maria_chk).
427 To get a correct physical size we need to flush them.
428 */
429 if ((error= _ma_flush_table_files(info,
430 MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
431 FLUSH_FORCE_WRITE, FLUSH_FORCE_WRITE)))
432 _ma_check_print_error(param, "Failed to flush data or index file");
433
434 size= mysql_file_seek(share->kfile.file, 0L, MY_SEEK_END, MYF(MY_THREADSAFE));
435 if ((skr=(my_off_t) share->state.state.key_file_length) != size)
436 {
437 /* Don't give error if file generated by maria_pack */
438 if (skr > size && maria_is_any_key_active(share->state.key_map))
439 {
440 error=1;
441 _ma_check_print_error(param,
442 "Size of indexfile is: %-8s Expected: %s",
443 llstr(size,buff), llstr(skr,buff2));
444 share->state.state.key_file_length= size;
445 }
446 else if (!(param->testflag & T_VERY_SILENT))
447 _ma_check_print_warning(param,
448 "Size of indexfile is: %-8s Expected: %s",
449 llstr(size,buff), llstr(skr,buff2));
450 }
451 if (size > share->base.max_key_file_length)
452 {
453 _ma_check_print_warning(param,
454 "Size of indexfile is: %-8s which is bigger than max indexfile size: %s",
455 ullstr(size,buff),
456 ullstr(share->base.max_key_file_length, buff2));
457 }
458 else if (!(param->testflag & T_VERY_SILENT) &&
459 ! (share->options & HA_OPTION_COMPRESS_RECORD) &&
460 ulonglong2double(share->state.state.key_file_length) >
461 ulonglong2double(share->base.margin_key_file_length)*0.9)
462 _ma_check_print_warning(param,"Keyfile is almost full, %10s of %10s used",
463 llstr(share->state.state.key_file_length,buff),
464 llstr(share->base.max_key_file_length,buff));
465
466 size= mysql_file_seek(info->dfile.file, 0L, MY_SEEK_END, MYF(0));
467 skr=(my_off_t) share->state.state.data_file_length;
468 if (share->options & HA_OPTION_COMPRESS_RECORD)
469 skr+= MEMMAP_EXTRA_MARGIN;
470 #ifdef USE_RELOC
471 if (share->data_file_type == STATIC_RECORD &&
472 skr < (my_off_t) share->base.reloc*share->base.min_pack_length)
473 skr=(my_off_t) share->base.reloc*share->base.min_pack_length;
474 #endif
475 if (skr != size)
476 {
477 share->state.state.data_file_length=size; /* Skip other errors */
478 if (skr > size && skr != size + MEMMAP_EXTRA_MARGIN)
479 {
480 error=1;
481 _ma_check_print_error(param,"Size of datafile is: %-9s Expected: %s",
482 llstr(size,buff), llstr(skr,buff2));
483 param->testflag|=T_RETRY_WITHOUT_QUICK;
484 }
485 else
486 {
487 _ma_check_print_warning(param,
488 "Size of datafile is: %-9s Expected: %s",
489 llstr(size,buff), llstr(skr,buff2));
490 }
491 }
492 if (size > share->base.max_data_file_length)
493 {
494 _ma_check_print_warning(param,
495 "Size of datafile is: %-8s which is bigger than max datafile size: %s",
496 ullstr(size,buff),
497 ullstr(share->base.max_data_file_length, buff2));
498 } else if (!(param->testflag & T_VERY_SILENT) &&
499 !(share->options & HA_OPTION_COMPRESS_RECORD) &&
500 ulonglong2double(share->state.state.data_file_length) >
501 (ulonglong2double(share->base.max_data_file_length)*0.9))
502 _ma_check_print_warning(param, "Datafile is almost full, %10s of %10s used",
503 llstr(share->state.state.data_file_length,buff),
504 llstr(share->base.max_data_file_length,buff2));
505 DBUG_RETURN(error);
506 } /* maria_chk_size */
507
508
509 /* Check keys */
510
maria_chk_key(HA_CHECK * param,register MARIA_HA * info)511 int maria_chk_key(HA_CHECK *param, register MARIA_HA *info)
512 {
513 uint key,found_keys=0,full_text_keys=0,result=0;
514 ha_rows keys;
515 ha_checksum old_record_checksum,init_checksum;
516 my_off_t all_keydata,all_totaldata,key_totlength,length;
517 double *rec_per_key_part;
518 MARIA_SHARE *share= info->s;
519 MARIA_KEYDEF *keyinfo;
520 char buff[22],buff2[22];
521 MARIA_PAGE page;
522 DBUG_ENTER("maria_chk_key");
523
524 if (!(param->testflag & T_SILENT))
525 puts("- check key delete-chain");
526
527 param->key_file_blocks=share->base.keystart;
528 if (check_k_link(param, info, share->state.key_del))
529 {
530 if (param->testflag & T_VERBOSE) puts("");
531 _ma_check_print_error(param,"key delete-link-chain corrupted");
532 DBUG_RETURN(-1);
533 }
534
535 if (!(param->testflag & T_SILENT))
536 puts("- check index reference");
537
538 all_keydata=all_totaldata=key_totlength=0;
539 init_checksum=param->record_checksum;
540 old_record_checksum=0;
541 if (share->data_file_type == STATIC_RECORD)
542 old_record_checksum= (calc_checksum(share->state.state.records +
543 share->state.state.del-1) *
544 share->base.pack_reclength);
545 rec_per_key_part= param->new_rec_per_key_part;
546 for (key= 0,keyinfo= &share->keyinfo[0]; key < share->base.keys ;
547 rec_per_key_part+=keyinfo->keysegs, key++, keyinfo++)
548 {
549 param->key_crc[key]=0;
550 if (! maria_is_key_active(share->state.key_map, key))
551 {
552 /* Remember old statistics for key */
553 memcpy((char*) rec_per_key_part,
554 (char*) (share->state.rec_per_key_part +
555 (uint) (rec_per_key_part - param->new_rec_per_key_part)),
556 keyinfo->keysegs*sizeof(*rec_per_key_part));
557 continue;
558 }
559 found_keys++;
560 _ma_report_progress(param, key, share->base.keys);
561
562 param->record_checksum=init_checksum;
563
564 bzero((char*) ¶m->unique_count,sizeof(param->unique_count));
565 bzero((char*) ¶m->notnull_count,sizeof(param->notnull_count));
566
567 if ((!(param->testflag & T_SILENT)))
568 printf ("- check data record references index: %d\n",key+1);
569 if (keyinfo->flag & (HA_FULLTEXT | HA_SPATIAL))
570 full_text_keys++;
571 if (share->state.key_root[key] == HA_OFFSET_ERROR)
572 {
573 if (share->state.state.records != 0 && !(keyinfo->flag & HA_FULLTEXT))
574 _ma_check_print_error(param, "Key tree %u is empty", key + 1);
575 goto do_stat;
576 }
577 if (_ma_fetch_keypage(&page, info, keyinfo, share->state.key_root[key],
578 PAGECACHE_LOCK_LEFT_UNLOCKED, DFLT_INIT_HITS,
579 info->buff, 0))
580 {
581 report_keypage_fault(param, info, share->state.key_root[key]);
582 if (!(param->testflag & T_INFO))
583 DBUG_RETURN(-1);
584 result= -1;
585 continue;
586 }
587 param->key_file_blocks+=keyinfo->block_length;
588 keys=0;
589 param->keydata=param->totaldata=0;
590 param->key_blocks=0;
591 param->max_level=0;
592 if (chk_index(param, info,keyinfo, &page, &keys, param->key_crc+key,1))
593 DBUG_RETURN(-1);
594 if (!(keyinfo->flag & (HA_FULLTEXT | HA_SPATIAL | HA_RTREE_INDEX)))
595 {
596 if (keys != share->state.state.records)
597 {
598 _ma_check_print_error(param,"Found %s keys of %s",llstr(keys,buff),
599 llstr(share->state.state.records,buff2));
600 if (!(param->testflag & (T_INFO | T_EXTEND)))
601 DBUG_RETURN(-1);
602 result= -1;
603 continue;
604 }
605 if ((found_keys - full_text_keys == 1 &&
606 !(share->data_file_type == STATIC_RECORD)) ||
607 (param->testflag & T_DONT_CHECK_CHECKSUM))
608 old_record_checksum= param->record_checksum;
609 else if (old_record_checksum != param->record_checksum)
610 {
611 if (key)
612 _ma_check_print_error(param,
613 "Key %u doesn't point at same records as "
614 "key 1",
615 key+1);
616 else
617 _ma_check_print_error(param,"Key 1 doesn't point at all records");
618 if (!(param->testflag & T_INFO))
619 DBUG_RETURN(-1);
620 result= -1;
621 continue;
622 }
623 }
624 if ((uint) share->base.auto_key -1 == key)
625 {
626 /* Check that auto_increment key is bigger than max key value */
627 ulonglong auto_increment;
628 const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
629 info->lastinx=key;
630 _ma_read_key_record(info, info->rec_buff, 0);
631 auto_increment=
632 ma_retrieve_auto_increment(info->rec_buff + keyseg->start,
633 keyseg->type);
634 if (auto_increment > share->state.auto_increment)
635 {
636 _ma_check_print_warning(param, "Auto-increment value: %s is smaller "
637 "than max used value: %s",
638 llstr(share->state.auto_increment,buff2),
639 llstr(auto_increment, buff));
640 }
641 if (param->testflag & T_AUTO_INC)
642 {
643 set_if_bigger(share->state.auto_increment,
644 auto_increment);
645 set_if_bigger(share->state.auto_increment,
646 param->auto_increment_value);
647 }
648
649 /* Check that there isn't a row with auto_increment = 0 in the table */
650 maria_extra(info,HA_EXTRA_KEYREAD,0);
651 bzero(info->lastkey_buff, keyinfo->seg->length);
652 if (!maria_rkey(info, info->rec_buff, key,
653 info->lastkey_buff,
654 (key_part_map) 1, HA_READ_KEY_EXACT))
655 {
656 /* Don't count this as a real warning, as maria_chk can't correct it */
657 my_bool save=param->warning_printed;
658 _ma_check_print_warning(param, "Found row where the auto_increment "
659 "column has the value 0");
660 param->warning_printed=save;
661 }
662 maria_extra(info,HA_EXTRA_NO_KEYREAD,0);
663 }
664
665 length=(my_off_t) isam_key_length(info,keyinfo)*keys + param->key_blocks*2;
666 if (param->testflag & T_INFO && param->totaldata != 0L && keys != 0L)
667 printf("Key: %2d: Keyblocks used: %3d%% Packed: %4d%% Max levels: %2d\n",
668 key+1,
669 (int) (my_off_t2double(param->keydata)*100.0/my_off_t2double(param->totaldata)),
670 (int) ((my_off_t2double(length) - my_off_t2double(param->keydata))*100.0/
671 my_off_t2double(length)),
672 param->max_level);
673 all_keydata+=param->keydata; all_totaldata+=param->totaldata; key_totlength+=length;
674
675 do_stat:
676 if (param->testflag & T_STATISTICS)
677 maria_update_key_parts(keyinfo, rec_per_key_part, param->unique_count,
678 param->stats_method == MI_STATS_METHOD_IGNORE_NULLS?
679 param->notnull_count: NULL,
680 (ulonglong)share->state.state.records);
681 }
682 if (param->testflag & T_INFO)
683 {
684 if (all_totaldata != 0L && found_keys > 0)
685 printf("Total: Keyblocks used: %3d%% Packed: %4d%%\n\n",
686 (int) (my_off_t2double(all_keydata)*100.0/
687 my_off_t2double(all_totaldata)),
688 (int) ((my_off_t2double(key_totlength) -
689 my_off_t2double(all_keydata))*100.0/
690 my_off_t2double(key_totlength)));
691 else if (all_totaldata != 0L && maria_is_any_key_active(share->state.key_map))
692 puts("");
693 }
694 if (param->key_file_blocks != share->state.state.key_file_length &&
695 share->state.key_map == ~(ulonglong) 0)
696 _ma_check_print_warning(param, "Some data are unreferenced in keyfile");
697 if (found_keys != full_text_keys)
698 param->record_checksum=old_record_checksum-init_checksum; /* Remove delete links */
699 else
700 param->record_checksum=0;
701 DBUG_RETURN(result);
702 } /* maria_chk_key */
703
704
705
/*
  Check the key page at position 'page' and everything below it

  SYNOPSIS
    chk_index_down()
    param         Check parameters; key_file_blocks is increased by the
                  block length when the page could be read
    info          Table handler
    keyinfo       Key definition
    page          Position of the key page in the index file
    buff          Buffer to read the page into
    keys          Passed on to chk_index()
    key_checksum  Passed on to chk_index()
    level         Passed on to chk_index()

  DESCRIPTION
    Verifies that the page position is inside the index file and aligned
    on the block size, reads the page and calls chk_index() for it.
    If the position is outside the stored key file length, but inside the
    physical file, an error is printed, the stored length is corrected and
    the check continues.

  RETURN
    0  ok
    1  error
*/

static int chk_index_down(HA_CHECK *param, MARIA_HA *info,
                          MARIA_KEYDEF *keyinfo,
                          my_off_t page, uchar *buff, ha_rows *keys,
                          ha_checksum *key_checksum, uint level)
{
  MARIA_SHARE *share= info->s;
  MARIA_PAGE child_page;
  char pos_str[22], length_str[22];
  DBUG_ENTER("chk_index_down");

  /* Key blocks must lay within the key file length entirely. */
  if (page + keyinfo->block_length > share->state.state.key_file_length)
  {
    /* purecov: begin tested */
    /* Give it a chance to fit in the real file size. */
    my_off_t real_length= mysql_file_seek(share->kfile.file, 0L, MY_SEEK_END,
                                          MYF(MY_THREADSAFE));

    _ma_check_print_error(param, "Invalid key block position: %s  "
                          "key block size: %u  file_length: %s",
                          llstr(page, pos_str), keyinfo->block_length,
                          llstr(share->state.state.key_file_length,
                                length_str));
    if (page + keyinfo->block_length > real_length)
      DBUG_RETURN(1);

    /* Fix the remembered key file length (rounded down to whole blocks) */
    share->state.state.key_file_length=
      real_length & ~(my_off_t) (keyinfo->block_length - 1);
    /* purecov: end */
  }

  /* Key blocks must be aligned at block length */
  if (page & (share->block_size - 1))
  {
    /* purecov: begin tested */
    _ma_check_print_error(param, "Mis-aligned key block: %s  "
                          "key block length: %u",
                          llstr(page, pos_str), share->block_size);
    DBUG_RETURN(1);
    /* purecov: end */
  }

  if (_ma_fetch_keypage(&child_page, info, keyinfo, page,
                        PAGECACHE_LOCK_LEFT_UNLOCKED,
                        DFLT_INIT_HITS, buff, 0))
  {
    report_keypage_fault(param, info, page);
    DBUG_RETURN(1);
  }
  param->key_file_blocks+= keyinfo->block_length;

  /* Any error from chk_index() is returned as 1 */
  if (chk_index(param, info, keyinfo, &child_page, keys, key_checksum, level))
    DBUG_RETURN(1);

  DBUG_RETURN(0);
}
765
766
767 /*
768 "Ignore NULLs" statistics collection method: process first index tuple.
769
770 SYNOPSIS
771 maria_collect_stats_nonulls_first()
772 keyseg IN Array of key part descriptions
773 notnull INOUT Array, notnull[i] = (number of {keypart1...keypart_i}
774 tuples that don't contain NULLs)
775 key IN Key values tuple
776
777 DESCRIPTION
778 Process the first index tuple - find out which prefix tuples don't
779 contain NULLs, and update the array of notnull counters accordingly.
780 */
781
782 static
maria_collect_stats_nonulls_first(HA_KEYSEG * keyseg,ulonglong * notnull,const uchar * key)783 void maria_collect_stats_nonulls_first(HA_KEYSEG *keyseg, ulonglong *notnull,
784 const uchar *key)
785 {
786 size_t first_null, kp;
787 first_null= ha_find_null(keyseg, key) - keyseg;
788 /*
789 All prefix tuples that don't include keypart_{first_null} are not-null
790 tuples (and all others aren't), increment counters for them.
791 */
792 for (kp= 0; kp < first_null; kp++)
793 notnull[kp]++;
794 }
795
796
797 /*
798 "Ignore NULLs" statistics collection method: process next index tuple.
799
800 SYNOPSIS
801 maria_collect_stats_nonulls_next()
802 keyseg IN Array of key part descriptions
803 notnull INOUT Array, notnull[i] = (number of {keypart1...keypart_i}
804 tuples that don't contain NULLs)
805 prev_key IN Previous key values tuple
806 last_key IN Next key values tuple
807
808 DESCRIPTION
809 Process the next index tuple:
810 1. Find out which prefix tuples of last_key don't contain NULLs, and
811 update the array of notnull counters accordingly.
812 2. Find the first keypart number where the prev_key and last_key tuples
813 are different(A), or last_key has NULL value(B), and return it, so the
814 caller can count number of unique tuples for each key prefix. We don't
815 need (B) to be counted, and that is compensated back in
816 maria_update_key_parts().
817
818 RETURN
819 1 + number of first keypart where values differ or last_key tuple has NULL
820 */
821
822 static
maria_collect_stats_nonulls_next(HA_KEYSEG * keyseg,ulonglong * notnull,const uchar * prev_key,const uchar * last_key)823 int maria_collect_stats_nonulls_next(HA_KEYSEG *keyseg, ulonglong *notnull,
824 const uchar *prev_key,
825 const uchar *last_key)
826 {
827 uint diffs[2];
828 size_t first_null_seg, kp;
829 HA_KEYSEG *seg;
830
831 /*
832 Find the first keypart where values are different or either of them is
833 NULL. We get results in diffs array:
834 diffs[0]= 1 + number of first different keypart
835 diffs[1]=offset: (last_key + diffs[1]) points to first value in
836 last_key that is NULL or different from corresponding
837 value in prev_key.
838 */
839 ha_key_cmp(keyseg, prev_key, last_key, USE_WHOLE_KEY,
840 SEARCH_FIND | SEARCH_NULL_ARE_NOT_EQUAL, diffs);
841 seg= keyseg + diffs[0] - 1;
842
843 /* Find first NULL in last_key */
844 first_null_seg= ha_find_null(seg, last_key + diffs[1]) - keyseg;
845 for (kp= 0; kp < first_null_seg; kp++)
846 notnull[kp]++;
847
848 /*
849 Return 1+ number of first key part where values differ. Don't care if
850 these were NULLs and not .... We compensate for that in
851 maria_update_key_parts.
852 */
853 return diffs[0];
854 }
855
856
857 /* Check if index is ok */
858
chk_index(HA_CHECK * param,MARIA_HA * info,MARIA_KEYDEF * keyinfo,MARIA_PAGE * anc_page,ha_rows * keys,ha_checksum * key_checksum,uint level)859 static int chk_index(HA_CHECK *param, MARIA_HA *info, MARIA_KEYDEF *keyinfo,
860 MARIA_PAGE *anc_page, ha_rows *keys,
861 ha_checksum *key_checksum, uint level)
862 {
863 int flag;
864 uint comp_flag, page_flag, nod_flag;
865 uchar *temp_buff, *keypos, *old_keypos, *endpos;
866 my_off_t next_page,record;
867 MARIA_SHARE *share= info->s;
868 char llbuff[22];
869 uint diff_pos[2];
870 uchar *tmp_key_buff;
871 my_bool temp_buff_alloced;
872 MARIA_KEY tmp_key;
873 DBUG_ENTER("chk_index");
874 DBUG_DUMP("buff", anc_page->buff, anc_page->size);
875
876 /* TODO: implement appropriate check for RTree keys */
877 if (keyinfo->flag & (HA_SPATIAL | HA_RTREE_INDEX))
878 DBUG_RETURN(0);
879
880 alloc_on_stack(*param->stack_end_ptr, temp_buff, temp_buff_alloced,
881 (keyinfo->block_length + keyinfo->max_store_length));
882 if (!temp_buff)
883 {
884 _ma_check_print_error(param,"Not enough memory for keyblock");
885 DBUG_RETURN(-1);
886 }
887 tmp_key_buff= temp_buff+ keyinfo->block_length;
888
889 if (keyinfo->flag & HA_NOSAME)
890 {
891 /* Not real duplicates */
892 comp_flag=SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT;
893 }
894 else
895 comp_flag=SEARCH_SAME; /* Keys in positionorder */
896
897 page_flag= anc_page->flag;
898 nod_flag= anc_page->node;
899 old_keypos= anc_page->buff + share->keypage_header;
900 keypos= old_keypos + nod_flag;
901 endpos= anc_page->buff + anc_page->size;
902
903 param->keydata+= anc_page->size;
904 param->totaldata+= keyinfo->block_length; /* INFO */
905 param->key_blocks++;
906 if (level > param->max_level)
907 param->max_level=level;
908
909 if (_ma_get_keynr(share, anc_page->buff) != keyinfo->key_nr)
910 _ma_check_print_error(param, "Page at %s is not marked for index %u",
911 llstr(anc_page->pos, llbuff),
912 (uint) keyinfo->key_nr);
913 if (page_flag & KEYPAGE_FLAG_HAS_TRANSID)
914 {
915 if (!share->base.born_transactional)
916 {
917 _ma_check_print_error(param,
918 "Page at %s is marked with HAS_TRANSID even if "
919 "table is not transactional",
920 llstr(anc_page->pos, llbuff));
921 }
922 }
923 if (share->base.born_transactional)
924 {
925 LSN lsn= lsn_korr(anc_page->buff);
926 if ((ulonglong) lsn > param->max_allowed_lsn)
927 {
928 /* Avoid flooding of errors */
929 if (param->skip_lsn_error_count++ < MAX_LSN_ERRORS)
930 {
931 _ma_check_print_error(param,
932 "Page at %s as wrong LSN " LSN_FMT ". Current "
933 "LSN is " LSN_FMT,
934 llstr(anc_page->pos, llbuff),
935 LSN_IN_PARTS(lsn),
936 LSN_IN_PARTS(param->max_allowed_lsn));
937 }
938 }
939 }
940 if (anc_page->size > share->max_index_block_size)
941 {
942 _ma_check_print_error(param,
943 "Page at %s has impossible (too big) pagelength",
944 llstr(anc_page->pos, llbuff));
945 goto err;
946 }
947
948 info->last_key.keyinfo= tmp_key.keyinfo= keyinfo;
949 info->lastinx= ~0; /* Safety */
950 tmp_key.data= tmp_key_buff;
951 for ( ;; _ma_copy_key(&info->last_key, &tmp_key))
952 {
953 if (nod_flag)
954 {
955 if (_ma_killed_ptr(param))
956 goto err;
957 next_page= _ma_kpos(nod_flag,keypos);
958 if (chk_index_down(param,info,keyinfo,next_page,
959 temp_buff,keys,key_checksum,level+1))
960 {
961 DBUG_DUMP("page_data", old_keypos, (uint) (keypos - old_keypos));
962 goto err;
963 }
964 }
965 old_keypos=keypos;
966 if (keypos >= endpos ||
967 !(*keyinfo->get_key)(&tmp_key, page_flag, nod_flag, &keypos))
968 break;
969 if (keypos > endpos)
970 {
971 _ma_check_print_error(param,
972 "Page length and length of keys don't match at "
973 "page: %s",
974 llstr(anc_page->pos,llbuff));
975 goto err;
976 }
977 if (share->data_file_type == BLOCK_RECORD &&
978 !(page_flag & KEYPAGE_FLAG_HAS_TRANSID) &&
979 key_has_transid(tmp_key.data + tmp_key.data_length +
980 share->rec_reflength-1))
981 {
982 _ma_check_print_error(param,
983 "Found key marked for transid on page that is not "
984 "marked for transid at: %s",
985 llstr(anc_page->pos,llbuff));
986 goto err;
987 }
988
989 if ((*keys)++ &&
990 (flag=ha_key_cmp(keyinfo->seg, info->last_key.data, tmp_key.data,
991 tmp_key.data_length + tmp_key.ref_length,
992 (comp_flag | SEARCH_INSERT | (tmp_key.flag >> 1) |
993 info->last_key.flag), diff_pos)) >=0)
994 {
995 DBUG_DUMP_KEY("old", &info->last_key);
996 DBUG_DUMP_KEY("new", &tmp_key);
997 DBUG_DUMP("new_in_page", old_keypos, (uint) (keypos-old_keypos));
998
999 if ((comp_flag & SEARCH_FIND) && flag == 0)
1000 _ma_check_print_error(param,"Found duplicated key at page %s",
1001 llstr(anc_page->pos,llbuff));
1002 else
1003 _ma_check_print_error(param,"Key in wrong position at page %s",
1004 llstr(anc_page->pos,llbuff));
1005 goto err;
1006 }
1007
1008 if (param->testflag & T_STATISTICS)
1009 {
1010 if (*keys != 1L) /* not first_key */
1011 {
1012 if (param->stats_method == MI_STATS_METHOD_NULLS_NOT_EQUAL)
1013 ha_key_cmp(keyinfo->seg, info->last_key.data,
1014 tmp_key.data, tmp_key.data_length,
1015 SEARCH_FIND | SEARCH_NULL_ARE_NOT_EQUAL,
1016 diff_pos);
1017 else if (param->stats_method == MI_STATS_METHOD_IGNORE_NULLS)
1018 {
1019 diff_pos[0]= maria_collect_stats_nonulls_next(keyinfo->seg,
1020 param->notnull_count,
1021 info->last_key.data,
1022 tmp_key.data);
1023 }
1024 param->unique_count[diff_pos[0]-1]++;
1025 }
1026 else
1027 {
1028 if (param->stats_method == MI_STATS_METHOD_IGNORE_NULLS)
1029 maria_collect_stats_nonulls_first(keyinfo->seg, param->notnull_count,
1030 tmp_key.data);
1031 }
1032 }
1033 (*key_checksum)+= maria_byte_checksum(tmp_key.data, tmp_key.data_length);
1034 record= _ma_row_pos_from_key(&tmp_key);
1035
1036 if (keyinfo->flag & HA_FULLTEXT) /* special handling for ft2 */
1037 {
1038 uint off;
1039 int subkeys;
1040 get_key_full_length_rdonly(off, tmp_key.data);
1041 subkeys= ft_sintXkorr(tmp_key.data + off);
1042 if (subkeys < 0)
1043 {
1044 ha_rows tmp_keys=0;
1045 share->ft2_keyinfo.key_nr= keyinfo->key_nr;
1046 if (chk_index_down(param,info,&share->ft2_keyinfo,record,
1047 temp_buff,&tmp_keys,key_checksum,1))
1048 goto err;
1049 if (tmp_keys + subkeys)
1050 {
1051 _ma_check_print_error(param,
1052 "Number of words in the 2nd level tree "
1053 "does not match the number in the header. "
1054 "Parent word in on the page %s, offset %u",
1055 llstr(anc_page->pos,llbuff),
1056 (uint) (old_keypos - anc_page->buff));
1057 goto err;
1058 }
1059 (*keys)+=tmp_keys-1;
1060 continue;
1061 }
1062 /* fall through */
1063 }
1064 if ((share->data_file_type != BLOCK_RECORD &&
1065 share->data_file_type != NO_RECORD &&
1066 record >= share->state.state.data_file_length) ||
1067 (share->data_file_type == BLOCK_RECORD &&
1068 ma_recordpos_to_page(record) * share->base.min_block_length >=
1069 share->state.state.data_file_length) ||
1070 (share->data_file_type == NO_RECORD && record != 0))
1071 {
1072 #ifndef DBUG_OFF
1073 char llbuff2[22], llbuff3[22];
1074 #endif
1075 _ma_check_print_error(param,
1076 "Found key at page %s that points to record "
1077 "outside datafile",
1078 llstr(anc_page->pos,llbuff));
1079 DBUG_PRINT("test",("page: %s record: %s filelength: %s",
1080 llstr(anc_page->pos,llbuff),llstr(record,llbuff2),
1081 llstr(share->state.state.data_file_length,llbuff3)));
1082 DBUG_DUMP_KEY("key", &tmp_key);
1083 DBUG_DUMP("new_in_page", old_keypos, (uint) (keypos-old_keypos));
1084 goto err;
1085 }
1086 param->record_checksum+= (ha_checksum) record;
1087 }
1088 if (keypos != endpos)
1089 {
1090 _ma_check_print_error(param,
1091 "Keyblock size at page %s is not correct. "
1092 "Block length: %u key length: %u",
1093 llstr(anc_page->pos, llbuff), anc_page->size,
1094 (uint) (keypos - anc_page->buff));
1095 goto err;
1096 }
1097 stack_alloc_free(temp_buff, temp_buff_alloced);
1098 DBUG_RETURN(0);
1099 err:
1100 stack_alloc_free(temp_buff, temp_buff_alloced);
1101 DBUG_RETURN(1);
1102 } /* chk_index */
1103
1104
1105 /* Calculate a checksum of 1+2+3+4...N = N*(N+1)/2 without overflow */
1106
calc_checksum(ha_rows count)1107 static ha_checksum calc_checksum(ha_rows count)
1108 {
1109 ulonglong sum,a,b;
1110 DBUG_ENTER("calc_checksum");
1111
1112 sum=0;
1113 a=count; b=count+1;
1114 if (a & 1)
1115 b>>=1;
1116 else
1117 a>>=1;
1118 while (b)
1119 {
1120 if (b & 1)
1121 sum+=a;
1122 a<<=1; b>>=1;
1123 }
1124 DBUG_PRINT("exit",("sum: %lx",(ulong) sum));
1125 DBUG_RETURN((ha_checksum) sum);
1126 } /* calc_checksum */
1127
1128
1129 /* Calc length of key in normal isam */
1130
isam_key_length(MARIA_HA * info,register MARIA_KEYDEF * keyinfo)1131 static uint isam_key_length(MARIA_HA *info, register MARIA_KEYDEF *keyinfo)
1132 {
1133 uint length;
1134 HA_KEYSEG *keyseg;
1135 DBUG_ENTER("isam_key_length");
1136
1137 length= info->s->rec_reflength;
1138 for (keyseg=keyinfo->seg ; keyseg->type ; keyseg++)
1139 length+= keyseg->length;
1140
1141 DBUG_PRINT("exit",("length: %d",length));
1142 DBUG_RETURN(length);
1143 } /* key_length */
1144
1145
1146
/*
  Convert a record position to text

  For BLOCK_RECORD tables the result is "page:row", for all other formats
  it is the position as a number. The text is stored in buff.

  RETURN
    buff
*/

static char * record_pos_to_txt(MARIA_HA *info, my_off_t recpos,
                                char *buff)
{
  my_off_t page;
  uint dir_entry;
  char *pos;

  if (info->s->data_file_type != BLOCK_RECORD)
  {
    llstr(recpos, buff);
    return buff;
  }

  page=      ma_recordpos_to_page(recpos);
  dir_entry= ma_recordpos_to_dir_entry(recpos);
  pos= longlong10_to_str(page, buff, 10);
  *pos++= ':';
  longlong10_to_str(dir_entry, pos, 10);
  return buff;
}
1162
1163
1164 /*
1165 Check that keys in records exist in index tree
1166
1167 SYNOPSIS
1168 check_keys_in_record()
1169 param Check paramenter
1170 info Maria handler
1171 extend Type of check (extended or normal)
1172 start_recpos Position to row
1173 record Record buffer
1174
1175 NOTES
1176 This function also calculates record checksum & number of rows
1177 */
1178
check_keys_in_record(HA_CHECK * param,MARIA_HA * info,int extend,my_off_t start_recpos,uchar * record)1179 static int check_keys_in_record(HA_CHECK *param, MARIA_HA *info, int extend,
1180 my_off_t start_recpos, uchar *record)
1181 {
1182 MARIA_SHARE *share= info->s;
1183 MARIA_KEYDEF *keyinfo;
1184 char llbuff[22+4];
1185 uint keynr;
1186
1187 param->tmp_record_checksum+= (ha_checksum) start_recpos;
1188 param->records++;
1189 if (param->records % WRITE_COUNT == 0)
1190 {
1191 if (param->testflag & T_WRITE_LOOP)
1192 {
1193 printf("%s\r", llstr(param->records, llbuff));
1194 fflush(stdout);
1195 }
1196 _ma_report_progress(param, param->records, share->state.state.records);
1197 }
1198
1199 /* Check if keys match the record */
1200 for (keynr=0, keyinfo= share->keyinfo; keynr < share->base.keys;
1201 keynr++, keyinfo++)
1202 {
1203 if (maria_is_key_active(share->state.key_map, keynr))
1204 {
1205 MARIA_KEY key;
1206 if (!(keyinfo->flag & HA_FULLTEXT))
1207 {
1208 (*keyinfo->make_key)(info, &key, keynr, info->lastkey_buff, record,
1209 start_recpos, 0);
1210 info->last_key.keyinfo= key.keyinfo;
1211 if (extend)
1212 {
1213 /* We don't need to lock the key tree here as we don't allow
1214 concurrent threads when running maria_chk
1215 */
1216 int search_result=
1217 #ifdef HAVE_RTREE_KEYS
1218 (keyinfo->flag & (HA_SPATIAL | HA_RTREE_INDEX)) ?
1219 maria_rtree_find_first(info, &key, MBR_EQUAL | MBR_DATA) :
1220 #endif
1221 _ma_search(info, &key, SEARCH_SAME, share->state.key_root[keynr]);
1222 if (search_result)
1223 {
1224 _ma_check_print_error(param,
1225 "Record at: %14s "
1226 "Can't find key for index: %2d",
1227 record_pos_to_txt(info, start_recpos,
1228 llbuff),
1229 keynr+1);
1230 if (param->testflag & T_VERBOSE)
1231 _ma_print_key(stdout, &key);
1232 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
1233 return -1;
1234 }
1235 }
1236 else
1237 param->tmp_key_crc[keynr]+=
1238 maria_byte_checksum(key.data, key.data_length);
1239 }
1240 }
1241 }
1242 return 0;
1243 }
1244
1245
1246 /*
1247 Functions to loop through all rows and check if they are ok
1248
1249 NOTES
1250 One function for each record format
1251
1252 RESULT
1253 0 ok
1254 -1 Interrupted by user
1255 1 Error
1256 */
1257
/*
  Check all rows in a data file with fixed length records

  Rows are read one by one through param->read_cache. A row starting with
  a zero byte is counted as deleted; all other rows are added to the global
  checksum and handed to check_keys_in_record().
*/

static int check_static_record(HA_CHECK *param, MARIA_HA *info, int extend,
                               uchar *record)
{
  MARIA_SHARE *share= info->s;
  my_off_t row_start, pos= 0;
  char llbuff[22];

  while (pos < share->state.state.data_file_length)
  {
    if (_ma_killed_ptr(param))
      return -1;

    if (my_b_read(&param->read_cache, record,
                  share->base.pack_reclength))
    {
      _ma_check_print_error(param,
                            "got error: %d when reading datafile at position: "
                            "%s",
                            my_errno, llstr(pos, llbuff));
      return 1;
    }
    row_start= pos;
    pos+= share->base.pack_reclength;
    param->splits++;

    if (*record != '\0')
    {
      param->glob_crc+= _ma_static_checksum(info,record);
      param->used+= share->base.pack_reclength;
      if (check_keys_in_record(param, info, extend, row_start, record))
        return 1;
    }
    else
    {
      /* Record removed */
      param->del_blocks++;
      param->del_length+= share->base.pack_reclength;
    }
  }
  return 0;
}
1295
1296
/*
  Check all rows in a data file with dynamic length records

  The file is walked block by block from position 0. Deleted blocks are
  counted. The blocks that make up a row are collected in info->rec_buff,
  unpacked into 'record' and then handed to check_keys_in_record().

  RETURN
    0   ok
    -1  Interrupted by user
    1   Error
*/

static int check_dynamic_record(HA_CHECK *param, MARIA_HA *info, int extend,
                                uchar *record)
{
  MARIA_SHARE *share= info->s;
  MARIA_BLOCK_INFO block_info;
  my_off_t UNINIT_VAR(start_recpos), start_block, pos;
  uchar *UNINIT_VAR(dest);
  ulong UNINIT_VAR(left_length);
  uint b_type;
  char llbuff[22],llbuff2[22],llbuff3[22];
  myf myflag= MY_WME | (share->temporary ? MY_THREAD_SPECIFIC : 0);
  DBUG_ENTER("check_dynamic_record");

  for (pos= 0 ; pos < share->state.state.data_file_length ; )
  {
    my_bool row_error= 0;
    int block_nr= 0;                    /* Blocks accepted for this row */

    if (_ma_killed_ptr(param))
      DBUG_RETURN(-1);

    block_info.second_read= 0;
    block_info.next_filepos= pos;

    /* Follow the chain of blocks until the whole row is read */
    do
    {
      start_block= block_info.next_filepos;
      if (_ma_read_cache(info, &param->read_cache, block_info.header,
                         start_block,
                         sizeof(block_info.header),
                         (block_nr ? 0 : READING_NEXT) | READING_HEADER))
      {
        _ma_check_print_error(param,
                              "got error: %d when reading datafile at "
                              "position: %s",
                              my_errno, llstr(start_block, llbuff));
        DBUG_RETURN(1);
      }

      if (start_block & (MARIA_DYN_ALIGN_SIZE-1))
      {
        _ma_check_print_error(param,"Wrong aligned block at %s",
                              llstr(start_block,llbuff));
        DBUG_RETURN(1);
      }

      b_type= _ma_get_block_info(info, &block_info,-1,start_block);

      if (b_type & BLOCK_SYNC_ERROR)
      {
        if (block_nr)
        {
          _ma_check_print_error(param,"Unexpected byte: %d at link: %s",
                                (int) block_info.header[0],
                                llstr(start_block,llbuff));
          DBUG_RETURN(1);
        }
        pos= block_info.filepos+block_info.block_len;
        goto next;
      }
      if (b_type & BLOCK_DELETED)
      {
        if (block_info.block_len < share->base.min_block_length)
        {
          _ma_check_print_error(param,
                                "Deleted block with impossible length %lu "
                                "at %s",
                                block_info.block_len,llstr(pos,llbuff));
          DBUG_RETURN(1);
        }
        if ((block_info.next_filepos != HA_OFFSET_ERROR &&
             block_info.next_filepos >=
             share->state.state.data_file_length) ||
            (block_info.prev_filepos != HA_OFFSET_ERROR &&
             block_info.prev_filepos >=
             share->state.state.data_file_length))
        {
          _ma_check_print_error(param,"Delete link points outside datafile "
                                "at %s",
                                llstr(pos,llbuff));
          DBUG_RETURN(1);
        }
        param->del_blocks++;
        param->del_length+= block_info.block_len;
        param->splits++;
        pos= block_info.filepos+block_info.block_len;
        goto next;
      }
      if (b_type & (BLOCK_ERROR | BLOCK_FATAL_ERROR))
      {
        _ma_check_print_error(param,"Wrong bytesec: %d-%d-%d at linkstart: %s",
                              block_info.header[0],block_info.header[1],
                              block_info.header[2],
                              llstr(start_block,llbuff));
        DBUG_RETURN(1);
      }

      if (share->state.state.data_file_length <
          block_info.filepos + block_info.block_len)
      {
        _ma_check_print_error(param,
                              "Recordlink that points outside datafile at %s",
                              llstr(pos,llbuff));
        row_error= 1;
        break;
      }
      param->splits++;

      if (block_nr++ == 0)
      {
        /* First block of a row: set up for collecting the row */
        start_recpos= pos;
        pos= block_info.filepos+block_info.block_len;
        if (block_info.rec_len > (uint) share->base.max_pack_length)
        {
          my_errno= HA_ERR_WRONG_IN_RECORD;
          _ma_check_print_error(param,"Found too long record (%lu) at %s",
                                (ulong) block_info.rec_len,
                                llstr(start_recpos,llbuff));
          row_error= 1;
          break;
        }
        if (share->base.blobs &&
            _ma_alloc_buffer(&info->rec_buff, &info->rec_buff_size,
                             block_info.rec_len +
                             share->base.extra_rec_buff_size, myflag))
        {
          _ma_check_print_error(param,
                                "Not enough memory (%lu) for blob at %s",
                                (ulong) block_info.rec_len,
                                llstr(start_recpos,llbuff));
          row_error= 1;
          break;
        }
        dest= info->rec_buff;
        left_length= block_info.rec_len;
      }

      if (left_length < block_info.data_len)
      {
        _ma_check_print_error(param,"Found too long record (%lu) at %s",
                              (ulong) block_info.data_len,
                              llstr(start_recpos,llbuff));
        row_error= 1;
        break;
      }
      if (_ma_read_cache(info, &param->read_cache, dest, block_info.filepos,
                         (uint) block_info.data_len,
                         block_nr == 1 ? READING_NEXT : 0))
      {
        _ma_check_print_error(param,
                              "got error: %d when reading datafile at "
                              "position: %s", my_errno,
                              llstr(block_info.filepos, llbuff));

        DBUG_RETURN(1);
      }
      dest+= block_info.data_len;
      param->link_used+= block_info.filepos-start_block;
      param->used+= block_info.filepos - start_block + block_info.data_len;
      param->empty+= block_info.block_len-block_info.data_len;
      left_length-= block_info.data_len;

      if (left_length)
      {
        /* Row continues in another block; check the link to it */
        if (b_type & BLOCK_LAST)
        {
          _ma_check_print_error(param,
                                "Wrong record length %s of %s at %s",
                                llstr(block_info.rec_len-left_length,llbuff),
                                llstr(block_info.rec_len, llbuff2),
                                llstr(start_recpos,llbuff3));
          row_error= 1;
          break;
        }
        if (share->state.state.data_file_length < block_info.next_filepos)
        {
          _ma_check_print_error(param,
                                "Found next-recordlink that points outside "
                                "datafile at %s",
                                llstr(block_info.filepos,llbuff));
          row_error= 1;
          break;
        }
      }
    } while (left_length);

    if (row_error)
    {
      /* Position is only moved here if not even the first block was ok */
      if (!block_nr)
        pos= block_info.filepos+block_info.block_len;
    }
    else
    {
      if (_ma_rec_unpack(info,record,info->rec_buff,block_info.rec_len) ==
          MY_FILE_ERROR)
      {
        _ma_check_print_error(param,"Found wrong record at %s",
                              llstr(start_recpos,llbuff));
        row_error= 1;
      }
      else
      {
        ha_checksum checksum= 0;
        if (share->calc_checksum)
          checksum= (*share->calc_checksum)(info, record);

        if ((param->testflag & (T_EXTEND | T_MEDIUM | T_VERBOSE)) &&
            _ma_rec_check(info,record, info->rec_buff,block_info.rec_len,
                          MY_TEST(share->calc_checksum), checksum))
        {
          _ma_check_print_error(param,"Found wrong packed record at %s",
                                llstr(start_recpos,llbuff));
          row_error= 1;
        }
        param->glob_crc+= checksum;
      }

      if (row_error)
      {
        if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
          DBUG_RETURN(1);
      }
      else if (check_keys_in_record(param, info, extend, start_recpos,
                                    record))
        DBUG_RETURN(1);
    }
next:;
  }
  DBUG_RETURN(0);
}
1522
1523
/*
  Check all rows in a compressed data file

  Rows are read one after the other, starting after the pack header.
  Each row is unpacked into 'record', added to the global checksum and
  handed to check_keys_in_record().

  NOTE(review): share->calc_checksum is called here without a NULL test,
  while check_dynamic_record() tests it first. Presumably it is always set
  for compressed tables - confirm against where the share is set up.

  RETURN
    0   ok
    -1  Interrupted by user
    1   Error
*/

static int check_compressed_record(HA_CHECK *param, MARIA_HA *info, int extend,
                                   uchar *record)
{
  MARIA_SHARE *share= info->s;
  MARIA_BLOCK_INFO block_info;
  my_off_t row_start, pos;
  char llbuff[22];
  DBUG_ENTER("check_compressed_record");

  for (pos= share->pack.header_length ;         /* Skip header */
       pos < share->state.state.data_file_length ; )
  {
    my_bool row_error= 0;

    if (_ma_killed_ptr(param))
      DBUG_RETURN(-1);

    if (_ma_read_cache(info, &param->read_cache, block_info.header, pos,
                       share->pack.ref_length, READING_NEXT))
    {
      _ma_check_print_error(param,
                            "got error: %d when reading datafile at position: "
                            "%s",
                            my_errno, llstr(pos, llbuff));
      DBUG_RETURN(1);
    }

    row_start= pos;
    param->splits++;
    _ma_pack_get_block_info(info, &info->bit_buff, &block_info,
                            &info->rec_buff, &info->rec_buff_size, -1,
                            row_start);
    pos= block_info.filepos+block_info.rec_len;

    if (block_info.rec_len < (uint) share->min_pack_length ||
        block_info.rec_len > (uint) share->max_pack_length)
    {
      _ma_check_print_error(param,
                            "Found block with wrong recordlength: %lu at %s",
                            block_info.rec_len, llstr(row_start,llbuff));
      row_error= 1;
    }
    else
    {
      if (_ma_read_cache(info, &param->read_cache, info->rec_buff,
                         block_info.filepos, block_info.rec_len, READING_NEXT))
      {
        _ma_check_print_error(param,
                              "got error: %d when reading datafile at position: "
                              "%s",
                              my_errno, llstr(block_info.filepos, llbuff));
        DBUG_RETURN(1);
      }
      info->rec_buff[block_info.rec_len]= 0;    /* Keep valgrind happy */
      if (_ma_pack_rec_unpack(info, &info->bit_buff, record,
                              info->rec_buff, block_info.rec_len))
      {
        _ma_check_print_error(param,"Found wrong record at %s",
                              llstr(row_start,llbuff));
        row_error= 1;
      }
      else
      {
        param->glob_crc+= (*share->calc_checksum)(info,record);
        param->link_used+= (block_info.filepos - row_start);
        param->used+= (pos-row_start);
      }
    }

    if (row_error)
    {
      if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
        DBUG_RETURN(1);
    }
    else if (check_keys_in_record(param, info, extend, row_start, record))
      DBUG_RETURN(1);
  }
  DBUG_RETURN(0);
}
1602
1603
1604 /*
1605 Check if layout on head or tail page is ok
1606
1607 NOTES
1608 This is for rows-in-block format.
1609 */
1610
check_page_layout(HA_CHECK * param,MARIA_HA * info,my_off_t page_pos,uchar * page,uint row_count,uint head_empty,uint * real_rows_found,uint * free_slots_found)1611 static int check_page_layout(HA_CHECK *param, MARIA_HA *info,
1612 my_off_t page_pos, uchar *page,
1613 uint row_count, uint head_empty,
1614 uint *real_rows_found, uint *free_slots_found)
1615 {
1616 uint empty, last_row_end, row, first_dir_entry, free_entry, block_size;
1617 uint free_entries, prev_free_entry;
1618 uchar *dir_entry;
1619 char llbuff[22];
1620 my_bool error_in_free_list= 0;
1621 DBUG_ENTER("check_page_layout");
1622
1623 block_size= info->s->block_size;
1624 empty= 0;
1625 last_row_end= PAGE_HEADER_SIZE(info->s);
1626 *real_rows_found= 0;
1627
1628 /* Check free directory list */
1629 free_entry= (uint) page[DIR_FREE_OFFSET];
1630 free_entries= 0;
1631 prev_free_entry= END_OF_DIR_FREE_LIST;
1632 while (free_entry != END_OF_DIR_FREE_LIST)
1633 {
1634 uchar *dir;
1635 if (free_entry > row_count)
1636 {
1637 _ma_check_print_error(param,
1638 "Page %9s: Directory free entry points outside "
1639 "directory",
1640 llstr(page_pos, llbuff));
1641 error_in_free_list= 1;
1642 break;
1643 }
1644 dir= dir_entry_pos(page, block_size, free_entry);
1645 if (uint2korr(dir) != 0)
1646 {
1647 _ma_check_print_error(param,
1648 "Page %9s: Directory free entry points to "
1649 "not deleted entry",
1650 llstr(page_pos, llbuff));
1651 error_in_free_list= 1;
1652 break;
1653 }
1654 if (dir[2] != prev_free_entry)
1655 {
1656 _ma_check_print_error(param,
1657 "Page %9s: Directory free list back pointer "
1658 "points to wrong entry",
1659 llstr(page_pos, llbuff));
1660 error_in_free_list= 1;
1661 break;
1662 }
1663 prev_free_entry= free_entry;
1664 free_entry= dir[3];
1665 free_entries++;
1666 }
1667 *free_slots_found= free_entries;
1668
1669 /* Check directry */
1670 dir_entry= page+ block_size - PAGE_SUFFIX_SIZE;
1671 first_dir_entry= (block_size - row_count * DIR_ENTRY_SIZE -
1672 PAGE_SUFFIX_SIZE);
1673 for (row= 0 ; row < row_count ; row++)
1674 {
1675 uint pos, length;
1676 dir_entry-= DIR_ENTRY_SIZE;
1677 pos= uint2korr(dir_entry);
1678 if (!pos)
1679 {
1680 free_entries--;
1681 if (row == row_count -1)
1682 {
1683 _ma_check_print_error(param,
1684 "Page %9s: First entry in directory is 0",
1685 llstr(page_pos, llbuff));
1686 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
1687 DBUG_RETURN(1);
1688 }
1689 continue; /* Deleted row */
1690 }
1691 (*real_rows_found)++;
1692 length= uint2korr(dir_entry+2);
1693 param->used+= length;
1694 if (pos < last_row_end)
1695 {
1696 _ma_check_print_error(param,
1697 "Page %9s: Row %3u overlapps with previous row",
1698 llstr(page_pos, llbuff), row);
1699 DBUG_RETURN(1);
1700 }
1701 empty+= (pos - last_row_end);
1702 last_row_end= pos + length;
1703 if (last_row_end > first_dir_entry)
1704 {
1705 _ma_check_print_error(param,
1706 "Page %9s: Row %3u overlapps with directory",
1707 llstr(page_pos, llbuff), row);
1708 DBUG_RETURN(1);
1709 }
1710 }
1711 empty+= (first_dir_entry - last_row_end);
1712
1713 if (empty != head_empty)
1714 {
1715 _ma_check_print_error(param,
1716 "Page %9s: Wrong empty size. Stored: %5u "
1717 "Actual: %5u",
1718 llstr(page_pos, llbuff), head_empty, empty);
1719 param->err_count++;
1720 }
1721 if (free_entries != 0 && !error_in_free_list)
1722 {
1723 _ma_check_print_error(param,
1724 "Page %9s: Directory free link don't include "
1725 "all free entries",
1726 llstr(page_pos, llbuff));
1727 param->err_count++;
1728 }
1729 DBUG_RETURN(param->err_count &&
1730 (param->err_count >= MAXERR || !(param->testflag & T_VERBOSE)));
1731 }
1732
1733
1734 /*
1735 Check all rows on head page
1736
1737 NOTES
1738 This is for rows-in-block format.
1739
1740 Before this, we have already called check_page_layout(), so
1741 we know the block is logicaly correct (even if the rows may not be that)
1742
1743 RETURN
1744 0 ok
1745 1 error
1746 */
1747
1748
check_head_page(HA_CHECK * param,MARIA_HA * info,uchar * record,int extend,my_off_t page_pos,uchar * page_buff,uint row_count)1749 static my_bool check_head_page(HA_CHECK *param, MARIA_HA *info, uchar *record,
1750 int extend, my_off_t page_pos, uchar *page_buff,
1751 uint row_count)
1752 {
1753 MARIA_SHARE *share= info->s;
1754 uchar *dir_entry;
1755 uint row;
1756 char llbuff[22], llbuff2[22];
1757 ulonglong page= page_pos / share->block_size;
1758 DBUG_ENTER("check_head_page");
1759
1760 dir_entry= page_buff+ share->block_size - PAGE_SUFFIX_SIZE;
1761 for (row= 0 ; row < row_count ; row++)
1762 {
1763 uint pos, length, flag;
1764 dir_entry-= DIR_ENTRY_SIZE;
1765 pos= uint2korr(dir_entry);
1766 if (!pos)
1767 continue;
1768 length= uint2korr(dir_entry+2);
1769 if (length < share->base.min_block_length)
1770 {
1771 _ma_check_print_error(param,
1772 "Page %9s: Row %3u is too short "
1773 "(%d of min %d bytes)",
1774 llstr(page, llbuff), row, length,
1775 (uint) share->base.min_block_length);
1776 DBUG_RETURN(1);
1777 }
1778 flag= (uint) (uchar) page_buff[pos];
1779 if (flag & ~(ROW_FLAG_ALL))
1780 _ma_check_print_error(param,
1781 "Page %9s: Row %3u has wrong flag: %u",
1782 llstr(page, llbuff), row, flag);
1783
1784 DBUG_PRINT("info", ("rowid: %s page: %lu row: %u",
1785 llstr(ma_recordpos(page, row), llbuff),
1786 (ulong) page, row));
1787 info->cur_row.trid= 0;
1788 if (_ma_read_block_record2(info, record, page_buff+pos,
1789 page_buff+pos+length))
1790 {
1791 _ma_check_print_error(param,
1792 "Page %9s: Row %3d is crashed",
1793 llstr(page, llbuff), row);
1794 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
1795 DBUG_RETURN(1);
1796 continue;
1797 }
1798 set_if_bigger(param->max_found_trid, info->cur_row.trid);
1799 if (info->cur_row.trid > param->max_trid)
1800 _ma_check_print_not_visible_error(param, info->cur_row.trid);
1801
1802 if (share->calc_checksum)
1803 {
1804 ha_checksum checksum= (*share->calc_checksum)(info, record);
1805 if (info->cur_row.checksum != (checksum & 255))
1806 _ma_check_print_error(param, "Page %9s: Row %3d has wrong checksum",
1807 llstr(page, llbuff), row);
1808 param->glob_crc+= checksum;
1809 }
1810 if (info->cur_row.extents_count)
1811 {
1812 uchar *extents= info->cur_row.extents;
1813 uint i;
1814 /* Check that bitmap has the right marker for the found extents */
1815 for (i= 0 ; i < info->cur_row.extents_count ; i++)
1816 {
1817 pgcache_page_no_t extent_page;
1818 uint page_count, page_type;
1819 extent_page= uint5korr(extents);
1820 page_count= uint2korr(extents+5) & ~START_EXTENT_BIT;
1821 extents+= ROW_EXTENT_SIZE;
1822 page_type= BLOB_PAGE;
1823 if (page_count & TAIL_BIT)
1824 {
1825 page_count= 1;
1826 page_type= TAIL_PAGE;
1827 }
1828 /*
1829 TODO OPTIMIZE:
1830 Check the whole extent with one test and only do the loop if
1831 something is wrong (for exact error reporting)
1832 */
1833 for ( ; page_count--; extent_page++)
1834 {
1835 uint bitmap_pattern;
1836 if (_ma_check_if_right_bitmap_type(info, page_type, extent_page,
1837 &bitmap_pattern))
1838 {
1839 _ma_check_print_error(param,
1840 "Page %9s: Row: %3d has an extent with "
1841 "wrong information in bitmap: "
1842 "Page: %9s Page_type: %d Bitmap: %d",
1843 llstr(page, llbuff), row,
1844 llstr(extent_page, llbuff2),
1845 page_type, bitmap_pattern);
1846 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
1847 DBUG_RETURN(1);
1848 }
1849 }
1850 }
1851 }
1852 param->full_page_count+= info->cur_row.full_page_count;
1853 param->tail_count+= info->cur_row.tail_count;
1854 if (check_keys_in_record(param, info, extend,
1855 ma_recordpos(page, row), record))
1856 DBUG_RETURN(1);
1857 }
1858 DBUG_RETURN(0);
1859 }
1860
1861
1862 /*
1863 Check if rows-in-block data file is consistent
1864 */
1865
check_block_record(HA_CHECK * param,MARIA_HA * info,int extend,uchar * record)1866 static int check_block_record(HA_CHECK *param, MARIA_HA *info, int extend,
1867 uchar *record)
1868 {
1869 MARIA_SHARE *share= info->s;
1870 my_off_t pos;
1871 pgcache_page_no_t page;
1872 uchar *page_buff, *bitmap_buff, *data;
1873 char llbuff[22], llbuff2[22];
1874 uint block_size= share->block_size;
1875 ha_rows full_page_count, tail_count;
1876 my_bool UNINIT_VAR(full_dir), now_transactional;
1877 uint offset_page, offset, free_count;
1878 LSN lsn;
1879
1880 if (_ma_scan_init_block_record(info))
1881 {
1882 _ma_check_print_error(param, "got error %d when initializing scan",
1883 my_errno);
1884 return 1;
1885 }
1886
1887 now_transactional= info->s->now_transactional;
1888 info->s->now_transactional= 0; /* Don't log changes */
1889
1890 bitmap_buff= info->scan.bitmap_buff;
1891 page_buff= info->scan.page_buff;
1892 full_page_count= tail_count= 0;
1893 param->full_page_count= param->tail_count= 0;
1894 param->used= param->link_used= 0;
1895 param->splits= share->state.state.data_file_length / block_size;
1896
1897 for (pos= 0, page= 0;
1898 pos < share->state.state.data_file_length;
1899 pos+= block_size, page++)
1900 {
1901 uint UNINIT_VAR(row_count), real_row_count, UNINIT_VAR(empty_space),
1902 page_type, bitmap_pattern;
1903 uint bitmap_for_page;
1904
1905 if (_ma_killed_ptr(param))
1906 {
1907 _ma_scan_end_block_record(info);
1908 info->s->now_transactional= now_transactional;
1909 return -1; /* Interrupted */
1910 }
1911 if ((page % share->bitmap.pages_covered) == 0)
1912 {
1913 /* Bitmap page */
1914 if (pagecache_read(share->pagecache,
1915 &info->s->bitmap.file,
1916 page, 1,
1917 bitmap_buff,
1918 PAGECACHE_PLAIN_PAGE,
1919 PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == 0)
1920 {
1921 _ma_check_print_error(param,
1922 "Page %9s: Got error: %d when reading datafile",
1923 llstr(page, llbuff), my_errno);
1924 goto err;
1925 }
1926 param->used+= block_size;
1927 param->link_used+= block_size;
1928 if (param->verbose > 2)
1929 print_bitmap_description(share, page, bitmap_buff);
1930 continue;
1931 }
1932 /* Skip pages marked as empty in bitmap */
1933 offset_page= (uint) ((page % share->bitmap.pages_covered) -1) * 3;
1934 offset= offset_page & 7;
1935 data= bitmap_buff + offset_page / 8;
1936 bitmap_pattern= uint2korr(data);
1937 if (!(bitmap_for_page= ((bitmap_pattern >> offset) & 7)))
1938 {
1939 param->empty+= block_size;
1940 param->del_blocks++;
1941 continue;
1942 }
1943
1944 if (pagecache_read(share->pagecache,
1945 &info->dfile,
1946 page, 1,
1947 page_buff,
1948 share->page_type,
1949 PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == 0)
1950 {
1951 _ma_check_print_error(param,
1952 "Page %9s: Got error: %d when reading datafile",
1953 llstr(page, llbuff), my_errno);
1954 goto err;
1955 }
1956 page_type= page_buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK;
1957 if (page_type == UNALLOCATED_PAGE || page_type >= MAX_PAGE_TYPE)
1958 {
1959 _ma_check_print_error(param,
1960 "Page: %9s Found wrong page type %d. Bitmap: %d '%s'",
1961 llstr(page, llbuff), page_type,
1962 bitmap_for_page, bits_to_txt[bitmap_for_page]);
1963 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
1964 goto err;
1965 continue;
1966 }
1967 switch ((enum en_page_type) page_type) {
1968 case UNALLOCATED_PAGE:
1969 case MAX_PAGE_TYPE:
1970 default:
1971 DBUG_ASSERT(0); /* Impossible */
1972 break;
1973 case HEAD_PAGE:
1974 row_count= page_buff[DIR_COUNT_OFFSET];
1975 empty_space= uint2korr(page_buff + EMPTY_SPACE_OFFSET);
1976 param->used+= block_size - empty_space;
1977 param->link_used+= (PAGE_HEADER_SIZE(info->s) + PAGE_SUFFIX_SIZE +
1978 row_count * DIR_ENTRY_SIZE);
1979 if (empty_space < share->bitmap.sizes[3])
1980 param->lost+= empty_space;
1981 if (check_page_layout(param, info, pos, page_buff, row_count,
1982 empty_space, &real_row_count, &free_count))
1983 goto err;
1984 full_dir= (row_count == MAX_ROWS_PER_PAGE &&
1985 page_buff[DIR_FREE_OFFSET] == END_OF_DIR_FREE_LIST);
1986 break;
1987 case TAIL_PAGE:
1988 row_count= page_buff[DIR_COUNT_OFFSET];
1989 empty_space= uint2korr(page_buff + EMPTY_SPACE_OFFSET);
1990 param->used+= block_size - empty_space;
1991 param->link_used+= (PAGE_HEADER_SIZE(info->s) + PAGE_SUFFIX_SIZE +
1992 row_count * DIR_ENTRY_SIZE);
1993 if (empty_space < share->bitmap.sizes[6])
1994 param->lost+= empty_space;
1995 if (check_page_layout(param, info, pos, page_buff, row_count,
1996 empty_space, &real_row_count, &free_count))
1997 goto err;
1998 full_dir= (row_count - free_count >= MAX_ROWS_PER_PAGE -
1999 share->base.blobs);
2000 break;
2001 case BLOB_PAGE:
2002 full_page_count++;
2003 full_dir= 0;
2004 empty_space= block_size; /* for error reporting */
2005 param->link_used+= FULL_PAGE_HEADER_SIZE(info->s);
2006 param->used+= block_size;
2007 break;
2008 }
2009 if (_ma_check_bitmap_data(info, page_type,
2010 full_dir ? 0 : empty_space,
2011 bitmap_for_page))
2012 {
2013 _ma_check_print_error(param,
2014 "Page %9s: Wrong data in bitmap. Page_type: "
2015 "%d full: %d empty_space: %u Bitmap-bits: %d "
2016 "'%s'",
2017 llstr(page, llbuff), page_type, full_dir,
2018 empty_space, bitmap_for_page,
2019 bits_to_txt[bitmap_for_page]);
2020 if (param->err_count++ > MAXERR || !(param->testflag & T_VERBOSE))
2021 goto err;
2022 }
2023 if (share->base.born_transactional)
2024 {
2025 lsn= lsn_korr(page_buff);
2026 if ((ulonglong) lsn > param->max_allowed_lsn)
2027 {
2028 /* Avoid flooding of errors */
2029 if (param->skip_lsn_error_count++ < MAX_LSN_ERRORS)
2030 {
2031 _ma_check_print_error(param,
2032 "Page %9s: Wrong LSN " LSN_FMT ". Current "
2033 "LSN is " LSN_FMT,
2034 llstr(page, llbuff),
2035 LSN_IN_PARTS(lsn),
2036 LSN_IN_PARTS(param->max_allowed_lsn));
2037 }
2038 }
2039 }
2040 if ((enum en_page_type) page_type == BLOB_PAGE)
2041 continue;
2042 param->empty+= empty_space;
2043 if ((enum en_page_type) page_type == TAIL_PAGE)
2044 {
2045 tail_count+= real_row_count;
2046 continue;
2047 }
2048 if (check_head_page(param, info, record, extend, pos, page_buff,
2049 row_count))
2050 goto err;
2051 }
2052
2053 /* Verify that rest of bitmap is zero */
2054
2055 if (page % share->bitmap.pages_covered)
2056 {
2057 /* Not at end of bitmap */
2058 uint bitmap_pattern;
2059 uint byte_offset;
2060
2061 offset_page= (uint) ((page % share->bitmap.pages_covered) -1) * 3;
2062 offset= offset_page & 7;
2063 byte_offset= offset_page / 8;
2064 data= bitmap_buff + byte_offset;
2065 bitmap_pattern= uint2korr(data);
2066 if (byte_offset + 1 == share->bitmap.max_total_size)
2067 {
2068 /* On last byte of bitmap; Remove possible checksum */
2069 bitmap_pattern&= 0xff;
2070 }
2071 if (((bitmap_pattern >> offset)) ||
2072 (byte_offset + 2 < share->bitmap.max_total_size &&
2073 _ma_check_if_zero(data+2, share->bitmap.max_total_size -
2074 byte_offset - 2)))
2075 {
2076 ulonglong bitmap_page;
2077 bitmap_page= page / share->bitmap.pages_covered;
2078 bitmap_page*= share->bitmap.pages_covered;
2079
2080 _ma_check_print_error(param,
2081 "Bitmap at page %s has pages reserved outside of "
2082 "data file length",
2083 llstr(bitmap_page, llbuff));
2084 DBUG_EXECUTE("bitmap", _ma_print_bitmap(&share->bitmap, bitmap_buff,
2085 bitmap_page););
2086 }
2087 }
2088
2089 _ma_scan_end_block_record(info);
2090
2091 if (full_page_count != param->full_page_count)
2092 _ma_check_print_error(param, "Full page count read through records was %s "
2093 "but we found %s pages while scanning table",
2094 llstr(param->full_page_count, llbuff),
2095 llstr(full_page_count, llbuff2));
2096 if (tail_count != param->tail_count)
2097 _ma_check_print_error(param, "Tail count read through records was %s but "
2098 "we found %s tails while scanning table",
2099 llstr(param->tail_count, llbuff),
2100 llstr(tail_count, llbuff2));
2101
2102 info->s->now_transactional= now_transactional;
2103 return param->error_printed != 0;
2104
2105 err:
2106 _ma_scan_end_block_record(info);
2107 info->s->now_transactional= now_transactional;
2108 return 1;
2109 }
2110
2111
2112 /* Check that record-link is ok */
2113
maria_chk_data_link(HA_CHECK * param,MARIA_HA * info,my_bool extend)2114 int maria_chk_data_link(HA_CHECK *param, MARIA_HA *info, my_bool extend)
2115 {
2116 MARIA_SHARE *share= info->s;
2117 int error;
2118 uchar *record;
2119 char llbuff[22],llbuff2[22],llbuff3[22];
2120 DBUG_ENTER("maria_chk_data_link");
2121
2122 if (!(param->testflag & T_SILENT))
2123 {
2124 if (extend)
2125 puts("- check records and index references");
2126 else
2127 puts("- check record links");
2128 }
2129
2130 if (!(record= (uchar*) my_malloc(PSI_INSTRUMENT_ME,
2131 share->base.default_rec_buff_size, MYF(0))))
2132 {
2133 _ma_check_print_error(param,"Not enough memory for record");
2134 DBUG_RETURN(-1);
2135 }
2136 param->records= param->del_blocks= 0;
2137 param->used= param->link_used= param->splits= param->del_length= 0;
2138 param->lost= 0;
2139 param->tmp_record_checksum= param->glob_crc= 0;
2140 param->err_count= 0;
2141
2142 error= 0;
2143 param->empty= share->pack.header_length;
2144
2145 bzero((char*) param->tmp_key_crc,
2146 share->base.keys * sizeof(param->tmp_key_crc[0]));
2147
2148 info->in_check_table= 1; /* Don't assert on checksum errors */
2149
2150 switch (share->data_file_type) {
2151 case BLOCK_RECORD:
2152 error= check_block_record(param, info, extend, record);
2153 break;
2154 case STATIC_RECORD:
2155 error= check_static_record(param, info, extend, record);
2156 break;
2157 case DYNAMIC_RECORD:
2158 error= check_dynamic_record(param, info, extend, record);
2159 break;
2160 case COMPRESSED_RECORD:
2161 error= check_compressed_record(param, info, extend, record);
2162 break;
2163 case NO_RECORD:
2164 param->records= share->state.state.records;
2165 param->record_checksum= 0;
2166 extend= 1; /* No row checksums */
2167 /* no data, nothing to do */
2168 break;
2169 } /* switch */
2170
2171 info->in_check_table= 0;
2172
2173 if (error)
2174 goto err;
2175
2176 if (param->testflag & T_WRITE_LOOP)
2177 {
2178 fputs(" \r",stdout);
2179 fflush(stdout);
2180 }
2181 if (param->records != share->state.state.records)
2182 {
2183 _ma_check_print_error(param,
2184 "Record-count is not ok; found %-10s Should be: %s",
2185 llstr(param->records,llbuff),
2186 llstr(share->state.state.records,llbuff2));
2187 error=1;
2188 }
2189 if (param->record_checksum &&
2190 param->record_checksum != param->tmp_record_checksum)
2191 {
2192 _ma_check_print_error(param,
2193 "Key pointers and record positions doesn't match");
2194 error=1;
2195 }
2196 if (param->glob_crc != share->state.state.checksum &&
2197 (share->options &
2198 (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)))
2199 {
2200 _ma_check_print_warning(param,
2201 "Record checksum is not the same as checksum "
2202 "stored in the index file");
2203 error=1;
2204 }
2205 if (!extend)
2206 {
2207 uint key;
2208 for (key=0 ; key < share->base.keys; key++)
2209 {
2210 if (param->tmp_key_crc[key] != param->key_crc[key] &&
2211 !(share->keyinfo[key].flag &
2212 (HA_FULLTEXT | HA_SPATIAL | HA_RTREE_INDEX)))
2213 {
2214 _ma_check_print_error(param,"Checksum for key: %2d doesn't match "
2215 "checksum for records",
2216 key+1);
2217 error=1;
2218 }
2219 }
2220 }
2221
2222 if (param->del_length != share->state.state.empty)
2223 {
2224 _ma_check_print_warning(param,
2225 "Found %s deleted space. Should be %s",
2226 llstr(param->del_length,llbuff2),
2227 llstr(share->state.state.empty,llbuff));
2228 }
2229 /* Skip following checks for BLOCK RECORD as they don't make any sence */
2230 if (share->data_file_type != BLOCK_RECORD)
2231 {
2232 if (param->used + param->empty + param->del_length !=
2233 share->state.state.data_file_length)
2234 {
2235 _ma_check_print_warning(param,
2236 "Found %s record data and %s unused data and %s "
2237 "deleted data",
2238 llstr(param->used, llbuff),
2239 llstr(param->empty,llbuff2),
2240 llstr(param->del_length,llbuff3));
2241 _ma_check_print_warning(param,
2242 "Total %s Should be: %s",
2243 llstr((param->used+param->empty +
2244 param->del_length), llbuff),
2245 llstr(share->state.state.data_file_length,
2246 llbuff2));
2247 }
2248 if (param->del_blocks != share->state.state.del)
2249 {
2250 _ma_check_print_warning(param,
2251 "Found %10s deleted blocks. Should be: %s",
2252 llstr(param->del_blocks,llbuff),
2253 llstr(share->state.state.del,llbuff2));
2254 }
2255 if (param->splits != share->state.split)
2256 {
2257 _ma_check_print_warning(param,
2258 "Found %10s parts. Should be: %s",
2259 llstr(param->splits, llbuff),
2260 llstr(share->state.split,llbuff2));
2261 }
2262 }
2263 if (param->testflag & T_INFO)
2264 {
2265 if (param->warning_printed || param->error_printed)
2266 puts("");
2267 if (param->used != 0 && ! param->error_printed)
2268 {
2269 if (param->records)
2270 {
2271 printf("Records:%18s M.recordlength:%9lu Packed:%14.0f%%\n",
2272 llstr(param->records,llbuff),
2273 (long)((param->used - param->link_used)/param->records),
2274 (share->base.blobs ? 0.0 :
2275 (ulonglong2double((ulonglong) share->base.reclength *
2276 param->records)-
2277 my_off_t2double(param->used))/
2278 ulonglong2double((ulonglong) share->base.reclength *
2279 param->records)*100.0));
2280 printf("Recordspace used:%9.0f%% Empty space:%12d%% "
2281 "Blocks/Record: %6.2f\n",
2282 (ulonglong2double(param->used - param->link_used)/
2283 ulonglong2double(param->used-param->link_used+param->empty) *
2284 100.0),
2285 (!param->records ? 100 :
2286 (int) (ulonglong2double(param->del_length+param->empty)/
2287 my_off_t2double(param->used)*100.0)),
2288 ulonglong2double(param->splits - param->del_blocks) /
2289 param->records);
2290 }
2291 else
2292 printf("Records:%18s\n", "0");
2293 }
2294 printf("Record blocks:%12s Delete blocks:%10s\n",
2295 llstr(param->splits - param->del_blocks, llbuff),
2296 llstr(param->del_blocks, llbuff2));
2297 printf("Record data: %12s Deleted data: %10s\n",
2298 llstr(param->used - param->link_used,llbuff),
2299 llstr(param->del_length, llbuff2));
2300 printf("Empty space: %12s Linkdata: %10s\n",
2301 llstr(param->empty, llbuff),llstr(param->link_used, llbuff2));
2302 if (share->data_file_type == BLOCK_RECORD)
2303 {
2304 printf("Full pages: %12s Tail count: %12s\n",
2305 llstr(param->full_page_count, llbuff),
2306 llstr(param->tail_count, llbuff2));
2307 printf("Lost space: %12s\n", llstr(param->lost, llbuff));
2308 if (param->max_found_trid)
2309 {
2310 printf("Max trans. id: %11s\n",
2311 llstr(param->max_found_trid, llbuff));
2312 }
2313 }
2314 }
2315 my_free(record);
2316 DBUG_RETURN (error);
2317
2318 err:
2319 my_free(record);
2320 param->testflag|=T_RETRY_WITHOUT_QUICK;
2321 DBUG_RETURN(1);
2322 } /* maria_chk_data_link */
2323
2324
2325 /**
2326 Prepares a table for a repair or index sort: flushes pages, records durably
2327 in the table that it is undergoing the operation (if that op crashes, that
2328 info will serve for Recovery and the user).
2329
2330 If we start overwriting the index file, and crash then, old REDOs will
2331 be tried and fail. To prevent that, we bump skip_redo_lsn, and thus we have
2332 to flush and sync pages so that old REDOs can be skipped.
2333 If this is not a bulk insert, which Recovery can handle gracefully (by
2334 truncating files, see UNDO_BULK_INSERT) we also mark the table
2335 crashed-on-repair, so that user knows it has to re-repair. If bulk insert we
2336 shouldn't mark it crashed-on-repair, because if we did this, the UNDO phase
2337 would skip the table (UNDO_BULK_INSERT would not be applied),
2338 and maria_chk would not improve that.
2339 If this is an OPTIMIZE which merely sorts index, we need to do the same
2340 too: old REDOs should not apply to the new index file.
2341 Only the flush is needed when in maria_chk which is not crash-safe.
2342
2343 @param info table
2344 @param param repair parameters
2345 @param discard_index if index pages can be thrown away
2346 */
2347
protect_against_repair_crash(MARIA_HA * info,const HA_CHECK * param,my_bool discard_index)2348 static my_bool protect_against_repair_crash(MARIA_HA *info,
2349 const HA_CHECK *param,
2350 my_bool discard_index)
2351 {
2352 MARIA_SHARE *share= info->s;
2353
2354 /*
2355 There are other than recovery-related reasons to do the writes below:
2356 - the physical size of the data file is sometimes used during repair: we
2357 need to flush to have it exact
2358 - we flush the state because maria_open(HA_OPEN_COPY) will want to read
2359 it from disk.
2360 */
2361 if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
2362 FLUSH_FORCE_WRITE,
2363 discard_index ? FLUSH_IGNORE_CHANGED :
2364 FLUSH_FORCE_WRITE) ||
2365 (share->changed &&
2366 _ma_state_info_write(share,
2367 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
2368 MA_STATE_INFO_WRITE_FULL_INFO |
2369 MA_STATE_INFO_WRITE_LOCK)))
2370 return TRUE;
2371 /* In maria_chk this is not needed: */
2372 if (maria_multi_threaded && share->base.born_transactional)
2373 {
2374 if ((param->testflag & T_NO_CREATE_RENAME_LSN) == 0)
2375 {
2376 /* this can be true only for a transactional table */
2377 maria_mark_in_repair(info);
2378 if (_ma_state_info_write(share,
2379 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
2380 MA_STATE_INFO_WRITE_LOCK))
2381 return TRUE;
2382 }
2383 if (translog_status == TRANSLOG_OK &&
2384 _ma_update_state_lsns(share, translog_get_horizon(),
2385 share->state.create_trid, FALSE, FALSE))
2386 return TRUE;
2387 if (_ma_sync_table_files(info))
2388 return TRUE;
2389 }
2390 return FALSE;
2391 }
2392
2393
2394 /**
2395 @brief Initialize variables for repair
2396 */
2397
initialize_variables_for_repair(HA_CHECK * param,MARIA_SORT_INFO * sort_info,MARIA_SORT_PARAM * sort_param,MARIA_HA * info,my_bool rep_quick,MARIA_SHARE * org_share)2398 static int initialize_variables_for_repair(HA_CHECK *param,
2399 MARIA_SORT_INFO *sort_info,
2400 MARIA_SORT_PARAM *sort_param,
2401 MARIA_HA *info,
2402 my_bool rep_quick,
2403 MARIA_SHARE *org_share)
2404 {
2405 MARIA_SHARE *share= info->s;
2406 size_t tmp;
2407 uint threads;
2408
2409 /*
2410 We have to clear these variables first, as the cleanup-in-case-of-error
2411 handling may touch these.
2412 */
2413 bzero((char*) sort_info, sizeof(*sort_info));
2414 bzero((char*) sort_param, sizeof(*sort_param));
2415 bzero(&info->rec_cache, sizeof(info->rec_cache));
2416
2417 if (share->data_file_type == NO_RECORD)
2418 {
2419 _ma_check_print_error(param,
2420 "Can't repair tables with record type NO_DATA");
2421 return 1;
2422 }
2423
2424 /* Make a copy to allow us to restore state and check how state changed */
2425 memcpy(org_share, share, sizeof(*share));
2426
2427 /* Repair code relies on share->state.state so we have to update it here */
2428 if (share->lock.update_status)
2429 (*share->lock.update_status)(info->lock.status_param);
2430
2431 param->testflag|= T_REP; /* for easy checking */
2432 if (share->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
2433 param->testflag|= T_CALC_CHECKSUM;
2434 param->glob_crc= 0;
2435 if (rep_quick)
2436 param->testflag|= T_QUICK;
2437 else
2438 param->testflag&= ~T_QUICK;
2439 param->org_key_map= share->state.key_map;
2440
2441 /*
2442 Clear check variables set by repair. This is needed to allow one to run
2443 several repair's in a row with same param
2444 */
2445 param->retry_repair= 0;
2446 param->warning_printed= 0;
2447 param->error_printed= 0;
2448 param->wrong_trd_printed= 0;
2449
2450 sort_param->sort_info= sort_info;
2451 sort_param->fix_datafile= ! rep_quick;
2452 sort_param->calc_checksum= MY_TEST(param->testflag & T_CALC_CHECKSUM);
2453 sort_info->info= sort_info->new_info= info;
2454 sort_info->param= param;
2455 set_data_file_type(sort_info, info->s);
2456 sort_info->org_data_file_type= share->data_file_type;
2457
2458 info->rec_cache.file= info->dfile.file;
2459 info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
2460
2461 if (protect_against_repair_crash(info, param,
2462 !MY_TEST(param->testflag &
2463 T_CREATE_MISSING_KEYS)))
2464 return 1;
2465
2466 /* calculate max_records */
2467 sort_info->filelength= my_seek(info->dfile.file, 0L, MY_SEEK_END, MYF(0));
2468
2469 param->max_progress= sort_info->filelength;
2470 if ((param->testflag & T_CREATE_MISSING_KEYS) ||
2471 sort_info->org_data_file_type == COMPRESSED_RECORD)
2472 sort_info->max_records= share->state.state.records;
2473 else
2474 {
2475 ulong rec_length;
2476 rec_length= MY_MAX(share->base.min_pack_length,
2477 share->base.min_block_length);
2478 sort_info->max_records= (ha_rows) (sort_info->filelength / rec_length);
2479 }
2480
2481 /* We don't need a bigger sort buffer than file_length * 8 */
2482 threads= (param->testflag & T_REP_PARALLEL) ? (uint) share->base.keys : 1;
2483 tmp= (size_t) MY_MIN(sort_info->filelength,
2484 (my_off_t) (SIZE_T_MAX/10/threads));
2485 tmp= MY_MAX(tmp * 8 * threads, (size_t) 65536); /* Some margin */
2486 param->sort_buffer_length= MY_MIN(param->orig_sort_buffer_length,
2487 tmp);
2488 set_if_smaller(param->sort_buffer_length, tmp);
2489 /* Protect against too big sort buffer length */
2490 #if SIZEOF_SIZE_T >= 8
2491 set_if_smaller(param->sort_buffer_length, 16LL*1024LL*1024LL*1024LL);
2492 #else
2493 set_if_smaller(param->sort_buffer_length, 1L*1024L*1024L*1024L);
2494 #endif
2495
2496 /* Set up transaction handler so that we can see all rows */
2497 if (param->max_trid == 0)
2498 {
2499 if (!ma_control_file_inited())
2500 param->max_trid= 0; /* Give warning for first trid found */
2501 else
2502 param->max_trid= max_trid_in_system();
2503 }
2504 maria_ignore_trids(info);
2505 /* Don't write transid's during repair */
2506 maria_versioning(info, 0);
2507 /* remember original number of rows */
2508 *info->state= info->s->state.state;
2509 return 0;
2510 }
2511
2512
2513 /*
2514 During initialize_variables_for_repair and related functions we set some
2515 variables to values that makes sence during repair.
2516 This function restores these values to their original values so that we can
2517 use the handler in MariaDB without having to close and open the table.
2518 */
2519
restore_table_state_after_repair(MARIA_HA * info,MARIA_SHARE * org_share)2520 static void restore_table_state_after_repair(MARIA_HA *info,
2521 MARIA_SHARE *org_share)
2522 {
2523 maria_versioning(info, info->s->have_versioning);
2524 info->s->lock_key_trees= org_share->lock_key_trees;
2525 DBUG_ASSERT(!info->s->have_versioning || info->s->lock_key_trees);
2526 }
2527
2528
2529 /**
2530 @brief Drop all indexes
2531
2532 @param[in] param check parameters
2533 @param[in] info MARIA_HA handle
2534 @param[in] force if to force drop all indexes
2535
2536 @return status
2537 @retval 0 OK
2538 @retval != 0 Error
2539
2540 @note
2541 Once allocated, index blocks remain part of the key file forever.
2542 When indexes are disabled, no block is freed. When enabling indexes,
2543 no block is freed either. The new indexes are create from new
2544 blocks. (Bug #4692)
2545
2546 Before recreating formerly disabled indexes, the unused blocks
2547 must be freed. There are two options to do this:
2548 - Follow the tree of disabled indexes, add all blocks to the
2549 deleted blocks chain. Would require a lot of random I/O.
2550 - Drop all blocks by clearing all index root pointers and all
2551 delete chain pointers and resetting key_file_length to the end
2552 of the index file header. This requires to recreate all indexes,
2553 even those that may still be intact.
2554 The second method is probably faster in most cases.
2555
2556 When disabling indexes, MySQL disables either all indexes or all
2557 non-unique indexes. When MySQL [re-]enables disabled indexes
2558 (T_CREATE_MISSING_KEYS), then we either have "lost" blocks in the
2559 index file, or there are no non-unique indexes. In the latter case,
2560 maria_repair*() would not be called as there would be no disabled
2561 indexes.
2562
2563 If there would be more unique indexes than disabled (non-unique)
2564 indexes, we could do the first method. But this is not implemented
2565 yet. By now we drop and recreate all indexes when repair is called.
2566
2567 However, there is an exception. Sometimes MySQL disables non-unique
2568 indexes when the table is empty (e.g. when copying a table in
2569 mysql_alter_table()). When enabling the non-unique indexes, they
2570 are still empty. So there is no index block that can be lost. This
2571 optimization is implemented in this function.
2572
2573 Note that in normal repair (T_CREATE_MISSING_KEYS not set) we
2574 recreate all enabled indexes unconditonally. We do not change the
2575 key_map. Otherwise we invert the key map temporarily (outside of
2576 this function) and recreate the then "seemingly" enabled indexes.
2577 When we cannot use the optimization, and drop all indexes, we
2578 pretend that all indexes were disabled. By the inversion, we will
2579 then recrate all indexes.
2580 */
2581
maria_drop_all_indexes(HA_CHECK * param,MARIA_HA * info,my_bool force)2582 static int maria_drop_all_indexes(HA_CHECK *param, MARIA_HA *info,
2583 my_bool force)
2584 {
2585 MARIA_SHARE *share= info->s;
2586 MARIA_STATE_INFO *state= &share->state;
2587 uint i;
2588 DBUG_ENTER("maria_drop_all_indexes");
2589
2590 /*
2591 If any of the disabled indexes has a key block assigned, we must
2592 drop and recreate all indexes to avoid losing index blocks.
2593
2594 If we want to recreate disabled indexes only _and_ all of these
2595 indexes are empty, we don't need to recreate the existing indexes.
2596 */
2597 if (!force && (param->testflag & T_CREATE_MISSING_KEYS))
2598 {
2599 DBUG_PRINT("repair", ("creating missing indexes"));
2600 for (i= 0; i < share->base.keys; i++)
2601 {
2602 DBUG_PRINT("repair", ("index #: %u key_root:%lld active: %d",
2603 i, state->key_root[i],
2604 maria_is_key_active(state->key_map, i)));
2605 if ((state->key_root[i] != HA_OFFSET_ERROR) &&
2606 !maria_is_key_active(state->key_map, i))
2607 {
2608 /*
2609 This index has at least one key block and it is disabled.
2610 We would lose its block(s) if would just recreate it.
2611 So we need to drop and recreate all indexes.
2612 */
2613 DBUG_PRINT("repair", ("nonempty and disabled: recreate all"));
2614 break;
2615 }
2616 }
2617 if (i >= share->base.keys)
2618 goto end;
2619
2620 /*
2621 We do now drop all indexes and declare them disabled. With the
2622 T_CREATE_MISSING_KEYS flag, maria_repair*() will recreate all
2623 disabled indexes and enable them.
2624 */
2625 maria_clear_all_keys_active(state->key_map);
2626 DBUG_PRINT("repair", ("declared all indexes disabled"));
2627 }
2628
2629 /* Flush obsolete index data from key cache */
2630 _ma_flush_table_files(info, MARIA_FLUSH_INDEX,
2631 FLUSH_IGNORE_CHANGED, FLUSH_IGNORE_CHANGED);
2632 /* Clear index root block pointers. */
2633 for (i= 0; i < share->base.keys; i++)
2634 state->key_root[i]= HA_OFFSET_ERROR;
2635
2636 /* Drop the delete chain. */
2637 share->state.key_del= HA_OFFSET_ERROR;
2638
2639 /* Reset index file length to end of index file header. */
2640 share->state.state.key_file_length= share->base.keystart;
2641
2642 end:
2643 DBUG_RETURN(0);
2644 }
2645
2646
2647 /*
2648 Recover old table by reading each record and writing all keys
2649
2650 NOTES
2651 Save new datafile-name in temp_filename.
2652 We overwrite the index file as we go (writekeys() for example), so if we
2653 crash during this the table is unusable and user (or Recovery in the
2654 future) must repeat the REPAIR/OPTIMIZE operation. We could use a
2655 temporary index file in the future (drawback: more disk space).
2656
2657 IMPLEMENTATION (for hard repair with block format)
2658 - Create new, unrelated MARIA_HA of the table
2659 - Create new datafile and associate it with new handler
2660 - Reset all statistic information in new handler
2661 - Copy all data to new handler with normal write operations
2662 - Move state of new handler to old handler
2663 - Close new handler
2664 - Close data file in old handler
2665 - Rename old data file to new data file.
2666 - Reopen data file in old handler
2667 */
2668
maria_repair(HA_CHECK * param,register MARIA_HA * info,char * name,my_bool rep_quick)2669 int maria_repair(HA_CHECK *param, register MARIA_HA *info,
2670 char *name, my_bool rep_quick)
2671 {
2672 int error, got_error;
2673 ha_rows start_records,new_header_length;
2674 my_off_t del;
2675 File new_file;
2676 MARIA_SHARE *share= info->s;
2677 char llbuff[22],llbuff2[22];
2678 MARIA_SORT_INFO sort_info;
2679 MARIA_SORT_PARAM sort_param;
2680 my_bool block_record, scan_inited= 0, reenable_logging= 0;
2681 enum data_file_type org_data_file_type= share->data_file_type;
2682 myf sync_dir= ((share->now_transactional && !share->temporary) ?
2683 MY_SYNC_DIR : 0);
2684 MARIA_SHARE backup_share;
2685 DBUG_ENTER("maria_repair");
2686
2687 got_error= 1;
2688 new_file= -1;
2689 start_records= share->state.state.records;
2690 if (!(param->testflag & T_SILENT))
2691 {
2692 printf("- recovering (with keycache) Aria-table '%s'\n",name);
2693 printf("Data records: %s\n", llstr(start_records, llbuff));
2694 }
2695
2696 if (initialize_variables_for_repair(param, &sort_info, &sort_param, info,
2697 rep_quick, &backup_share))
2698 goto err;
2699
2700 if ((reenable_logging= share->now_transactional))
2701 _ma_tmp_disable_logging_for_table(info, 0);
2702
2703 sort_param.current_filepos= sort_param.filepos= new_header_length=
2704 ((param->testflag & T_UNPACK) ? 0L : share->pack.header_length);
2705
2706 if (!rep_quick)
2707 {
2708 /* Get real path for data file */
2709 if ((new_file= mysql_file_create(key_file_tmp,
2710 fn_format(param->temp_filename,
2711 share->data_file_name.str, "",
2712 DATA_TMP_EXT, 2+4),
2713 0,param->tmpfile_createflag,
2714 MYF(0))) < 0)
2715 {
2716 _ma_check_print_error(param,"Can't create new tempfile: '%s'",
2717 param->temp_filename);
2718 goto err;
2719 }
2720 if (new_header_length &&
2721 maria_filecopy(param, new_file, info->dfile.file, 0L,
2722 new_header_length, "datafile-header"))
2723 goto err;
2724 share->state.dellink= HA_OFFSET_ERROR;
2725 info->rec_cache.file= new_file; /* For sort_delete_record */
2726 if (share->data_file_type == BLOCK_RECORD ||
2727 (param->testflag & T_UNPACK))
2728 {
2729 if (create_new_data_handle(&sort_param, new_file))
2730 goto err;
2731 sort_info.new_info->rec_cache.file= new_file;
2732 }
2733 }
2734
2735 block_record= sort_info.new_info->s->data_file_type == BLOCK_RECORD;
2736
2737 if (org_data_file_type != BLOCK_RECORD)
2738 {
2739 /* We need a read buffer to read rows in big blocks */
2740 if (init_io_cache(¶m->read_cache, info->dfile.file,
2741 (uint) param->read_buffer_length,
2742 READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)))
2743 goto err;
2744 }
2745 if (sort_info.new_info->s->data_file_type != BLOCK_RECORD)
2746 {
2747 /* When writing to not block records, we need a write buffer */
2748 if (!rep_quick)
2749 {
2750 if (init_io_cache(&sort_info.new_info->rec_cache, new_file,
2751 (uint) param->write_buffer_length,
2752 WRITE_CACHE, new_header_length, 1,
2753 MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))
2754 goto err;
2755 sort_info.new_info->opt_flag|=WRITE_CACHE_USED;
2756 }
2757 }
2758 else if (block_record)
2759 {
2760 scan_inited= 1;
2761 if (maria_scan_init(sort_info.info))
2762 goto err;
2763 }
2764
2765 if (!(sort_param.record=
2766 (uchar *) my_malloc(PSI_INSTRUMENT_ME, (uint)
2767 share->base.default_rec_buff_size, MYF(0))) ||
2768 _ma_alloc_buffer(&sort_param.rec_buff, &sort_param.rec_buff_size,
2769 share->base.default_rec_buff_size, MYF(0)))
2770 {
2771 _ma_check_print_error(param, "Not enough memory for extra record");
2772 goto err;
2773 }
2774
2775 sort_param.read_cache=param->read_cache;
2776 sort_param.pos=sort_param.max_pos=share->pack.header_length;
2777 param->read_cache.end_of_file= sort_info.filelength;
2778 sort_param.master=1;
2779 sort_info.max_records= ~(ha_rows) 0;
2780
2781 del= share->state.state.del;
2782 share->state.state.records= share->state.state.del= share->state.split= 0;
2783 share->state.state.empty= 0;
2784
2785 if (param->testflag & T_CREATE_MISSING_KEYS)
2786 maria_set_all_keys_active(share->state.key_map, share->base.keys);
2787 maria_drop_all_indexes(param, info, TRUE);
2788
2789 maria_lock_memory(param); /* Everything is alloced */
2790
2791 sort_param.sort_info->info->in_check_table= 1;
2792 /* Re-create all keys, which are set in key_map. */
2793 while (!(error=sort_get_next_record(&sort_param)))
2794 {
2795 if (block_record && _ma_sort_write_record(&sort_param))
2796 goto err;
2797
2798 if (writekeys(&sort_param))
2799 {
2800 if (my_errno != HA_ERR_FOUND_DUPP_KEY)
2801 goto err;
2802 DBUG_DUMP("record", sort_param.record,
2803 share->base.default_rec_buff_size);
2804 _ma_check_print_warning(param,
2805 "Duplicate key %2d for record at %10s against "
2806 "new record at %10s",
2807 info->errkey+1,
2808 record_pos_to_txt(info,
2809 sort_param.current_filepos,
2810 llbuff),
2811 record_pos_to_txt(info,
2812 info->dup_key_pos, llbuff2));
2813 if (param->testflag & T_VERBOSE)
2814 {
2815 MARIA_KEY tmp_key;
2816 MARIA_KEYDEF *keyinfo= share->keyinfo + info->errkey;
2817 (*keyinfo->make_key)(info, &tmp_key, (uint) info->errkey,
2818 info->lastkey_buff,
2819 sort_param.record, 0L, 0);
2820 _ma_print_key(stdout, &tmp_key);
2821 }
2822 sort_info.dupp++;
2823 if ((param->testflag & (T_FORCE_UNIQUENESS|T_QUICK)) == T_QUICK)
2824 {
2825 param->testflag|=T_RETRY_WITHOUT_QUICK;
2826 param->error_printed++;
2827 goto err;
2828 }
2829 /* purecov: begin tested */
2830 if (block_record)
2831 {
2832 sort_info.new_info->s->state.state.records--;
2833 if ((*sort_info.new_info->s->write_record_abort)(sort_info.new_info))
2834 {
2835 _ma_check_print_error(param,"Couldn't delete duplicate row");
2836 goto err;
2837 }
2838 }
2839 /* purecov: end */
2840 continue;
2841 }
2842 if (!block_record)
2843 {
2844 if (_ma_sort_write_record(&sort_param))
2845 goto err;
2846 /* Filepos is pointer to where next row will be stored */
2847 sort_param.current_filepos= sort_param.filepos;
2848 }
2849 }
2850 if (error > 0 || maria_write_data_suffix(&sort_info, !rep_quick) ||
2851 flush_io_cache(&sort_info.new_info->rec_cache) ||
2852 param->read_cache.error < 0)
2853 goto err;
2854
2855 if (param->testflag & T_WRITE_LOOP)
2856 {
2857 fputs(" \r",stdout); fflush(stdout);
2858 }
2859 if (mysql_file_chsize(share->kfile.file,
2860 share->state.state.key_file_length, 0, MYF(0)))
2861 {
2862 _ma_check_print_warning(param,
2863 "Can't change size of indexfile, error: %d",
2864 my_errno);
2865 goto err;
2866 }
2867
2868 if (rep_quick && del+sort_info.dupp != share->state.state.del)
2869 {
2870 _ma_check_print_error(param,"Couldn't fix table with quick recovery: "
2871 "Found wrong number of deleted records");
2872 _ma_check_print_error(param,"Run recovery again without -q");
2873 param->retry_repair=1;
2874 param->testflag|=T_RETRY_WITHOUT_QUICK;
2875 goto err;
2876 }
2877
2878 if (param->testflag & T_SAFE_REPAIR)
2879 {
2880 /* Don't repair if we loosed more than one row */
2881 if (sort_info.new_info->s->state.state.records+1 < start_records)
2882 {
2883 share->state.state.records= start_records;
2884 goto err;
2885 }
2886 }
2887
2888 end_io_cache(&sort_info.new_info->rec_cache);
2889 info->opt_flag&= ~WRITE_CACHE_USED;
2890
2891 /*
2892 As we have read the data file (sort_get_next_record()) we may have
2893 cached, non-changed blocks of it in the page cache. We must throw them
2894 away as we are going to close their descriptor ('new_file'). We also want
2895 to flush any index block, so that it is ready for the upcoming sync.
2896 */
2897 if (_ma_flush_table_files_before_swap(param, info))
2898 goto err;
2899
2900 if (!rep_quick)
2901 {
2902 sort_info.new_info->s->state.state.data_file_length= sort_param.filepos;
2903 if (sort_info.new_info != sort_info.info)
2904 {
2905 MARIA_STATE_INFO save_state= sort_info.new_info->s->state;
2906 if (maria_close(sort_info.new_info))
2907 {
2908 _ma_check_print_error(param, "Got error %d on close", my_errno);
2909 goto err;
2910 }
2911 copy_data_file_state(&share->state, &save_state);
2912 new_file= -1;
2913 sort_info.new_info= info;
2914 }
2915 share->state.version=(ulong) time((time_t*) 0); /* Force reopen */
2916
2917 /* Replace the actual file with the temporary file */
2918 if (new_file >= 0)
2919 mysql_file_close(new_file, MYF(MY_WME));
2920 new_file= -1;
2921 change_data_file_descriptor(info, -1);
2922 if (maria_change_to_newfile(share->data_file_name.str, MARIA_NAME_DEXT,
2923 DATA_TMP_EXT, param->backup_time,
2924 (param->testflag & T_BACKUP_DATA ?
2925 MYF(MY_REDEL_MAKE_BACKUP): MYF(0)) |
2926 sync_dir) ||
2927 _ma_open_datafile(info, share))
2928 {
2929 goto err;
2930 }
2931 }
2932 else
2933 {
2934 share->state.state.data_file_length= sort_param.max_pos;
2935 }
2936 if (param->testflag & T_CALC_CHECKSUM)
2937 share->state.state.checksum= param->glob_crc;
2938
2939 if (!(param->testflag & T_SILENT))
2940 {
2941 if (start_records != share->state.state.records)
2942 printf("Data records: %s\n", llstr(share->state.state.records,llbuff));
2943 }
2944 if (sort_info.dupp)
2945 _ma_check_print_warning(param,
2946 "%s records have been removed",
2947 llstr(sort_info.dupp,llbuff));
2948
2949 got_error= 0;
2950 /* If invoked by external program that uses thr_lock */
2951 if (&share->state.state != info->state)
2952 *info->state= *info->state_start= share->state.state;
2953
2954 err:
2955 if (scan_inited)
2956 maria_scan_end(sort_info.info);
2957 _ma_reset_state(info);
2958
2959 end_io_cache(¶m->read_cache);
2960 if (sort_info.new_info)
2961 {
2962 end_io_cache(&sort_info.new_info->rec_cache);
2963 sort_info.new_info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2964 }
2965 info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
2966
2967 sort_param.sort_info->info->in_check_table= 0;
2968 /* this below could fail, shouldn't we detect error? */
2969 if (got_error)
2970 {
2971 if (! param->error_printed)
2972 _ma_check_print_error(param,"%d for record at pos %s",my_errno,
2973 llstr(sort_param.start_recpos,llbuff));
2974 (void)_ma_flush_table_files_before_swap(param, info);
2975 if (sort_info.new_info && sort_info.new_info != sort_info.info)
2976 {
2977 unuse_data_file_descriptor(sort_info.new_info);
2978 maria_close(sort_info.new_info);
2979 }
2980 if (new_file >= 0)
2981 {
2982 mysql_file_close(new_file,MYF(0));
2983 mysql_file_delete(key_file_tmp, param->temp_filename, MYF(MY_WME));
2984 }
2985 maria_mark_crashed_on_repair(info);
2986 }
2987 /* If caller had disabled logging it's not up to us to re-enable it */
2988 if (reenable_logging)
2989 _ma_reenable_logging_for_table(info, FALSE);
2990 restore_table_state_after_repair(info, &backup_share);
2991
2992 my_free(sort_param.rec_buff);
2993 my_free(sort_param.record);
2994 my_free(sort_info.buff);
2995 if (!got_error && (param->testflag & T_UNPACK))
2996 restore_data_file_type(share);
2997 share->state.changed|= (STATE_NOT_OPTIMIZED_KEYS | STATE_NOT_SORTED_PAGES |
2998 STATE_NOT_ANALYZED | STATE_NOT_ZEROFILLED);
2999 if (!rep_quick)
3000 share->state.changed&= ~(STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_MOVABLE);
3001 DBUG_RETURN(got_error);
3002 }
3003
3004
3005 /* Uppdate keyfile when doing repair */
3006
writekeys(MARIA_SORT_PARAM * sort_param)3007 static int writekeys(MARIA_SORT_PARAM *sort_param)
3008 {
3009 uint i;
3010 MARIA_HA *info= sort_param->sort_info->info;
3011 MARIA_SHARE *share= info->s;
3012 uchar *record= sort_param->record;
3013 uchar *key_buff;
3014 my_off_t filepos= sort_param->current_filepos;
3015 MARIA_KEY key;
3016 DBUG_ENTER("writekeys");
3017
3018 key_buff= info->lastkey_buff+share->base.max_key_length;
3019
3020 for (i=0 ; i < share->base.keys ; i++)
3021 {
3022 if (maria_is_key_active(share->state.key_map, i))
3023 {
3024 if (share->keyinfo[i].flag & HA_FULLTEXT )
3025 {
3026 if (_ma_ft_add(info, i, key_buff, record, filepos))
3027 goto err;
3028 }
3029 else
3030 {
3031 if (!(*share->keyinfo[i].make_key)(info, &key, i, key_buff, record,
3032 filepos, 0))
3033 goto err;
3034 if ((*share->keyinfo[i].ck_insert)(info, &key))
3035 goto err;
3036 }
3037 }
3038 }
3039 DBUG_RETURN(0);
3040
3041 err:
3042 if (my_errno == HA_ERR_FOUND_DUPP_KEY)
3043 {
3044 info->errkey=(int) i; /* This key was found */
3045 while ( i-- > 0 )
3046 {
3047 if (maria_is_key_active(share->state.key_map, i))
3048 {
3049 if (share->keyinfo[i].flag & HA_FULLTEXT)
3050 {
3051 if (_ma_ft_del(info,i,key_buff,record,filepos))
3052 break;
3053 }
3054 else
3055 {
3056 (*share->keyinfo[i].make_key)(info, &key, i, key_buff, record,
3057 filepos, 0);
3058 if (_ma_ck_delete(info, &key))
3059 break;
3060 }
3061 }
3062 }
3063 }
3064 /* Remove checksum that was added to glob_crc in sort_get_next_record */
3065 if (sort_param->calc_checksum)
3066 sort_param->sort_info->param->glob_crc-= info->cur_row.checksum;
3067 DBUG_PRINT("error",("errno: %d",my_errno));
3068 DBUG_RETURN(-1);
3069 } /* writekeys */
3070
3071
3072 /* Change all key-pointers that points to a records */
3073
maria_movepoint(register MARIA_HA * info,uchar * record,MARIA_RECORD_POS oldpos,MARIA_RECORD_POS newpos,uint prot_key)3074 int maria_movepoint(register MARIA_HA *info, uchar *record,
3075 MARIA_RECORD_POS oldpos, MARIA_RECORD_POS newpos,
3076 uint prot_key)
3077 {
3078 uint i;
3079 uchar *key_buff;
3080 MARIA_SHARE *share= info->s;
3081 MARIA_PAGE page;
3082 DBUG_ENTER("maria_movepoint");
3083
3084 key_buff= info->lastkey_buff + share->base.max_key_length;
3085 for (i=0 ; i < share->base.keys; i++)
3086 {
3087 if (i != prot_key && maria_is_key_active(share->state.key_map, i))
3088 {
3089 MARIA_KEY key;
3090 (*share->keyinfo[i].make_key)(info, &key, i, key_buff, record, oldpos,
3091 0);
3092 if (key.keyinfo->flag & HA_NOSAME)
3093 { /* Change pointer direct */
3094 MARIA_KEYDEF *keyinfo;
3095 keyinfo=share->keyinfo+i;
3096 if (_ma_search(info, &key, (uint32) (SEARCH_SAME | SEARCH_SAVE_BUFF),
3097 share->state.key_root[i]))
3098 DBUG_RETURN(-1);
3099 _ma_page_setup(&page, info, keyinfo, info->last_keypage,
3100 info->keyread_buff);
3101
3102 _ma_dpointer(share, info->int_keypos - page.node -
3103 share->rec_reflength,newpos);
3104
3105 if (_ma_write_keypage(&page, PAGECACHE_LOCK_LEFT_UNLOCKED,
3106 DFLT_INIT_HITS))
3107 DBUG_RETURN(-1);
3108 }
3109 else
3110 { /* Change old key to new */
3111 if (_ma_ck_delete(info, &key))
3112 DBUG_RETURN(-1);
3113 (*share->keyinfo[i].make_key)(info, &key, i, key_buff, record, newpos,
3114 0);
3115 if (_ma_ck_write(info, &key))
3116 DBUG_RETURN(-1);
3117 }
3118 }
3119 }
3120 DBUG_RETURN(0);
3121 } /* maria_movepoint */
3122
3123
3124 /* Tell system that we want all memory for our cache */
3125
maria_lock_memory(HA_CHECK * param)3126 void maria_lock_memory(HA_CHECK *param __attribute__((unused)))
3127 {
3128 #ifdef SUN_OS /* Key-cacheing thrases on sun 4.1 */
3129 if (param->opt_maria_lock_memory)
3130 {
3131 int success = mlockall(MCL_CURRENT); /* or plock(DATLOCK); */
3132 if (geteuid() == 0 && success != 0)
3133 _ma_check_print_warning(param,
3134 "Failed to lock memory. errno %d",my_errno);
3135 }
3136 #endif
3137 } /* maria_lock_memory */
3138
3139
3140 /**
3141 Flush all changed blocks to disk.
3142
3143 We release blocks as it's unlikely that they would all be needed soon.
3144 This function needs to be called before swapping data or index files or
3145 syncing them.
3146
3147 @param param description of the repair operation
3148 @param info table
3149 */
3150
_ma_flush_table_files_before_swap(HA_CHECK * param,MARIA_HA * info)3151 static my_bool _ma_flush_table_files_before_swap(HA_CHECK *param,
3152 MARIA_HA *info)
3153 {
3154 DBUG_ENTER("_ma_flush_table_files_before_swap");
3155 if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
3156 FLUSH_RELEASE, FLUSH_RELEASE))
3157 {
3158 _ma_check_print_error(param, "%d when trying to write buffers", my_errno);
3159 DBUG_RETURN(TRUE);
3160 }
3161 DBUG_RETURN(FALSE);
3162 }
3163
3164
3165 /* Sort index for more efficent reads */
3166
maria_sort_index(HA_CHECK * param,register MARIA_HA * info,char * name)3167 int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name)
3168 {
3169 reg2 uint key;
3170 reg1 MARIA_KEYDEF *keyinfo;
3171 File new_file;
3172 my_off_t index_pos[HA_MAX_POSSIBLE_KEY];
3173 uint r_locks,w_locks;
3174 int old_lock;
3175 MARIA_SHARE *share= info->s;
3176 MARIA_STATE_INFO old_state;
3177 myf sync_dir= ((share->now_transactional && !share->temporary) ?
3178 MY_SYNC_DIR : 0);
3179 DBUG_ENTER("maria_sort_index");
3180
3181 /* cannot sort index files with R-tree indexes */
3182 for (key= 0,keyinfo= &share->keyinfo[0]; key < share->base.keys ;
3183 key++,keyinfo++)
3184 if (keyinfo->key_alg == HA_KEY_ALG_RTREE)
3185 DBUG_RETURN(0);
3186
3187 if (!(param->testflag & T_SILENT))
3188 printf("- Sorting index for Aria-table '%s'\n",name);
3189
3190 if (protect_against_repair_crash(info, param, FALSE))
3191 DBUG_RETURN(1);
3192
3193 /* Get real path for index file */
3194 fn_format(param->temp_filename,name,"", MARIA_NAME_IEXT,2+4+32);
3195 if ((new_file=mysql_file_create(key_file_kfile, fn_format(param->temp_filename,param->temp_filename,
3196 "", INDEX_TMP_EXT,2+4),
3197 0, param->tmpfile_createflag, MYF(0))) < 0)
3198 {
3199 _ma_check_print_error(param,"Can't create new tempfile: '%s'",
3200 param->temp_filename);
3201 DBUG_RETURN(-1);
3202 }
3203 if (maria_filecopy(param, new_file, share->kfile.file, 0L,
3204 (ulong) share->base.keystart, "headerblock"))
3205 goto err;
3206
3207 param->new_file_pos=share->base.keystart;
3208 for (key= 0,keyinfo= &share->keyinfo[0]; key < share->base.keys ;
3209 key++,keyinfo++)
3210 {
3211 if (maria_is_key_active(share->state.key_map, key) &&
3212 share->state.key_root[key] != HA_OFFSET_ERROR)
3213 {
3214 index_pos[key]=param->new_file_pos; /* Write first block here */
3215 if (sort_one_index(param,info,keyinfo,share->state.key_root[key],
3216 new_file))
3217 goto err;
3218 }
3219 else
3220 index_pos[key]= HA_OFFSET_ERROR; /* No blocks */
3221 }
3222
3223 /* Flush key cache for this file if we are calling this outside maria_chk */
3224 flush_pagecache_blocks(share->pagecache, &share->kfile,
3225 FLUSH_IGNORE_CHANGED);
3226
3227 share->state.version=(ulong) time((time_t*) 0);
3228 old_state= share->state; /* save state if not stored */
3229 r_locks= share->r_locks;
3230 w_locks= share->w_locks;
3231 old_lock= info->lock_type;
3232
3233 /* Put same locks as old file */
3234 share->r_locks= share->w_locks= share->tot_locks= 0;
3235 (void) _ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE);
3236 mysql_mutex_lock(&share->intern_lock);
3237 mysql_file_close(share->kfile.file, MYF(MY_WME));
3238 share->kfile.file = -1;
3239 mysql_mutex_unlock(&share->intern_lock);
3240 mysql_file_close(new_file, MYF(MY_WME));
3241 if (maria_change_to_newfile(share->index_file_name.str, MARIA_NAME_IEXT,
3242 INDEX_TMP_EXT, 0, sync_dir) ||
3243 _ma_open_keyfile(share))
3244 goto err2;
3245 info->lock_type= F_UNLCK; /* Force maria_readinfo to lock */
3246 _ma_readinfo(info,F_WRLCK,0); /* Will lock the table */
3247 info->lock_type= old_lock;
3248 share->r_locks= r_locks;
3249 share->w_locks= w_locks;
3250 share->tot_locks= r_locks+w_locks;
3251 share->state= old_state; /* Restore old state */
3252
3253 share->state.state.key_file_length=param->new_file_pos;
3254 info->update= (short) (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
3255 for (key=0 ; key < share->base.keys ; key++)
3256 share->state.key_root[key]=index_pos[key];
3257 share->state.key_del= HA_OFFSET_ERROR;
3258
3259 share->state.changed&= ~STATE_NOT_SORTED_PAGES;
3260 DBUG_EXECUTE_IF("maria_flush_whole_log",
3261 {
3262 DBUG_PRINT("maria_flush_whole_log", ("now"));
3263 translog_flush(translog_get_horizon());
3264 });
3265 DBUG_EXECUTE_IF("maria_crash_sort_index",
3266 {
3267 DBUG_PRINT("maria_crash_sort_index", ("now"));
3268 DBUG_SUICIDE();
3269 });
3270 DBUG_RETURN(0);
3271
3272 err:
3273 mysql_file_close(new_file, MYF(MY_WME));
3274 err2:
3275 mysql_file_delete(key_file_tmp, param->temp_filename,MYF(MY_WME));
3276 DBUG_RETURN(-1);
3277 } /* maria_sort_index */
3278
3279
3280 /**
3281 @brief write a page directly to index file
3282
3283 */
3284
write_page(MARIA_SHARE * share,File file,uchar * buff,uint block_size,my_off_t pos,int myf_rw)3285 static int write_page(MARIA_SHARE *share, File file,
3286 uchar *buff, uint block_size,
3287 my_off_t pos, int myf_rw)
3288 {
3289 int res;
3290 PAGECACHE_IO_HOOK_ARGS args;
3291 args.page= buff;
3292 args.pageno= (pgcache_page_no_t) (pos / share->block_size);
3293 args.data= (uchar*) share;
3294 args.crypt_buf= NULL;
3295 (* share->kfile.pre_write_hook)(&args);
3296 res= (int)my_pwrite(file, args.page, block_size, pos, myf_rw);
3297 (* share->kfile.post_write_hook)(res, &args);
3298 return res;
3299 }
3300
3301
3302 /* Sort index blocks recursive using one index */
3303
sort_one_index(HA_CHECK * param,MARIA_HA * info,MARIA_KEYDEF * keyinfo,my_off_t pagepos,File new_file)3304 static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
3305 MARIA_KEYDEF *keyinfo,
3306 my_off_t pagepos, File new_file)
3307 {
3308 uint length,nod_flag;
3309 uchar *buff,*keypos,*endpos;
3310 my_off_t new_page_pos,next_page;
3311 MARIA_SHARE *share= info->s;
3312 MARIA_KEY key;
3313 MARIA_PAGE page;
3314 my_bool buff_alloced;
3315 DBUG_ENTER("sort_one_index");
3316
3317 /* cannot walk over R-tree indices */
3318 DBUG_ASSERT(keyinfo->key_alg != HA_KEY_ALG_RTREE);
3319 new_page_pos=param->new_file_pos;
3320 param->new_file_pos+=keyinfo->block_length;
3321 key.keyinfo= keyinfo;
3322
3323 alloc_on_stack(*param->stack_end_ptr, buff, buff_alloced,
3324 keyinfo->block_length + keyinfo->max_store_length);
3325 if (!buff)
3326 {
3327 _ma_check_print_error(param,"Not enough memory for keyblock");
3328 DBUG_RETURN(-1);
3329 }
3330 key.data= buff + keyinfo->block_length;
3331
3332 if (_ma_fetch_keypage(&page, info, keyinfo, pagepos,
3333 PAGECACHE_LOCK_LEFT_UNLOCKED,
3334 DFLT_INIT_HITS, buff, 0))
3335 {
3336 report_keypage_fault(param, info, pagepos);
3337 goto err;
3338 }
3339
3340 if ((nod_flag= page.node) || keyinfo->flag & HA_FULLTEXT)
3341 {
3342 keypos= page.buff + share->keypage_header + nod_flag;
3343 endpos= page.buff + page.size;
3344
3345 for ( ;; )
3346 {
3347 if (nod_flag)
3348 {
3349 next_page= _ma_kpos(nod_flag,keypos);
3350 /* Save new pos */
3351 _ma_kpointer(info,keypos-nod_flag,param->new_file_pos);
3352 if (sort_one_index(param,info,keyinfo,next_page,new_file))
3353 {
3354 DBUG_PRINT("error",
3355 ("From page: %ld, keyoffset: %lu used_length: %d",
3356 (ulong) pagepos, (ulong) (keypos - buff),
3357 (int) page.size));
3358 DBUG_DUMP("buff", page.buff, page.size);
3359 goto err;
3360 }
3361 }
3362 if (keypos >= endpos ||
3363 !(*keyinfo->get_key)(&key, page.flag, nod_flag, &keypos))
3364 break;
3365 DBUG_ASSERT(keypos <= endpos);
3366 if (keyinfo->flag & HA_FULLTEXT)
3367 {
3368 uint off;
3369 int subkeys;
3370 get_key_full_length_rdonly(off, key.data);
3371 subkeys= ft_sintXkorr(key.data + off);
3372 if (subkeys < 0)
3373 {
3374 next_page= _ma_row_pos_from_key(&key);
3375 _ma_dpointer(share, keypos - nod_flag - share->rec_reflength,
3376 param->new_file_pos); /* Save new pos */
3377 if (sort_one_index(param,info,&share->ft2_keyinfo,
3378 next_page,new_file))
3379 goto err;
3380 }
3381 }
3382 }
3383 }
3384
3385 /* Fill block with zero and write it to the new index file */
3386 length= page.size;
3387 bzero(buff+length,keyinfo->block_length-length);
3388 if (write_page(share, new_file, buff, keyinfo->block_length,
3389 new_page_pos, MYF(MY_NABP | MY_WAIT_IF_FULL)))
3390 {
3391 _ma_check_print_error(param,"Can't write indexblock, error: %d",my_errno);
3392 goto err;
3393 }
3394 stack_alloc_free(buff, buff_alloced);
3395 DBUG_RETURN(0);
3396 err:
3397 stack_alloc_free(buff, buff_alloced);
3398 DBUG_RETURN(1);
3399 } /* sort_one_index */
3400
3401
3402 /**
3403 @brief Fill empty space in index file with zeroes
3404
3405 @return
3406 @retval 0 Ok
3407 @retval 1 Error
3408 */
3409
maria_zerofill_index(HA_CHECK * param,MARIA_HA * info,const char * name)3410 static my_bool maria_zerofill_index(HA_CHECK *param, MARIA_HA *info,
3411 const char *name)
3412 {
3413 MARIA_SHARE *share= info->s;
3414 MARIA_PINNED_PAGE page_link;
3415 char llbuff[21];
3416 uchar *buff;
3417 pgcache_page_no_t page;
3418 my_off_t pos;
3419 my_off_t key_file_length= share->state.state.key_file_length;
3420 uint block_size= share->block_size;
3421 my_bool zero_lsn= (share->base.born_transactional &&
3422 !(param->testflag & T_ZEROFILL_KEEP_LSN));
3423 int error= 1;
3424 enum pagecache_page_type page_type= (share->base.born_transactional ?
3425 PAGECACHE_LSN_PAGE :
3426 PAGECACHE_PLAIN_PAGE);
3427 DBUG_ENTER("maria_zerofill_index");
3428
3429 if (!(param->testflag & T_SILENT))
3430 printf("- Zerofilling index for Aria-table '%s'\n",name);
3431
3432 /* Go through the index file */
3433 for (pos= share->base.keystart, page= (ulonglong) (pos / block_size);
3434 pos < key_file_length;
3435 pos+= block_size, page++)
3436 {
3437 uint length;
3438 if (!(buff= pagecache_read(share->pagecache,
3439 &share->kfile, page,
3440 DFLT_INIT_HITS, 0,
3441 page_type, PAGECACHE_LOCK_WRITE,
3442 &page_link.link)))
3443 {
3444 pagecache_unlock_by_link(share->pagecache, page_link.link,
3445 PAGECACHE_LOCK_WRITE_UNLOCK,
3446 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
3447 LSN_IMPOSSIBLE, 0, FALSE);
3448 _ma_check_print_error(param,
3449 "Page %9s: Got error %d when reading index file",
3450 llstr(pos, llbuff), my_errno);
3451 goto end;
3452 }
3453 if (zero_lsn)
3454 bzero(buff, LSN_SIZE);
3455
3456 if (share->base.born_transactional)
3457 {
3458 uint keynr= _ma_get_keynr(share, buff);
3459 if (keynr < share->base.keys)
3460 {
3461 MARIA_PAGE page;
3462 DBUG_ASSERT(keynr < share->base.keys);
3463
3464 _ma_page_setup(&page, info, share->keyinfo + keynr, pos, buff);
3465 if (_ma_compact_keypage(&page, ~(TrID) 0))
3466 {
3467 _ma_check_print_error(param,
3468 "Page %9s: Got error %d when reading index "
3469 "file",
3470 llstr(pos, llbuff), my_errno);
3471 goto end;
3472 }
3473 }
3474 }
3475
3476 length= _ma_get_page_used(share, buff);
3477 DBUG_ASSERT(length <= block_size);
3478 if (length < block_size)
3479 bzero(buff + length, block_size - length);
3480 pagecache_unlock_by_link(share->pagecache, page_link.link,
3481 PAGECACHE_LOCK_WRITE_UNLOCK,
3482 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
3483 LSN_IMPOSSIBLE, 1, FALSE);
3484 }
3485 error= 0; /* ok */
3486
3487 end:
3488 if (flush_pagecache_blocks(share->pagecache, &share->kfile,
3489 FLUSH_FORCE_WRITE))
3490 DBUG_RETURN(1);
3491 DBUG_RETURN(error);
3492 }
3493
3494
3495 /**
3496 @brief Fill empty space in data file with zeroes
3497
3498 @todo
3499 Zerofill all pages marked in bitmap as empty and change them to
3500 be of type UNALLOCATED_PAGE
3501
3502 @return
3503 @retval 0 Ok
3504 @retval 1 Error
3505 */
3506
maria_zerofill_data(HA_CHECK * param,MARIA_HA * info,const char * name)3507 static my_bool maria_zerofill_data(HA_CHECK *param, MARIA_HA *info,
3508 const char *name)
3509 {
3510 MARIA_SHARE *share= info->s;
3511 MARIA_PINNED_PAGE page_link;
3512 char llbuff[21];
3513 my_off_t pos;
3514 pgcache_page_no_t page;
3515 uint block_size= share->block_size;
3516 MARIA_FILE_BITMAP *bitmap= &share->bitmap;
3517 my_bool zero_lsn= !(param->testflag & T_ZEROFILL_KEEP_LSN), error;
3518 enum pagecache_page_type read_page_type= (share->base.born_transactional ?
3519 PAGECACHE_LSN_PAGE :
3520 PAGECACHE_PLAIN_PAGE);
3521 DBUG_ENTER("maria_zerofill_data");
3522
3523 /* This works only with BLOCK_RECORD files */
3524 if (share->data_file_type != BLOCK_RECORD)
3525 DBUG_RETURN(0);
3526
3527 if (!(param->testflag & T_SILENT))
3528 printf("- Zerofilling data for Aria-table '%s'\n",name);
3529
3530 /* Go through the record file */
3531 for (page= 1, pos= block_size;
3532 pos < share->state.state.data_file_length;
3533 pos+= block_size, page++)
3534 {
3535 uchar *buff;
3536 enum en_page_type page_type;
3537
3538 /* Ignore bitmap pages */
3539 if ((page % share->bitmap.pages_covered) == 0)
3540 continue;
3541 if (!(buff= pagecache_read(share->pagecache,
3542 &info->dfile,
3543 page, 1, 0,
3544 read_page_type, PAGECACHE_LOCK_WRITE,
3545 &page_link.link)))
3546 {
3547 _ma_check_print_error(param,
3548 "Page %9s: Got error: %d when reading datafile",
3549 llstr(pos, llbuff), my_errno);
3550 goto err;
3551 }
3552 page_type= (enum en_page_type) (buff[PAGE_TYPE_OFFSET] & PAGE_TYPE_MASK);
3553 switch (page_type) {
3554 case UNALLOCATED_PAGE:
3555 if (zero_lsn)
3556 bzero(buff, block_size);
3557 else
3558 bzero(buff + LSN_SIZE, block_size - LSN_SIZE);
3559 break;
3560 case BLOB_PAGE:
3561 if (_ma_bitmap_get_page_bits(info, bitmap, page) == 0)
3562 {
3563 /* Unallocated page */
3564 if (zero_lsn)
3565 bzero(buff, block_size);
3566 else
3567 bzero(buff + LSN_SIZE, block_size - LSN_SIZE);
3568 }
3569 else
3570 if (zero_lsn)
3571 bzero(buff, LSN_SIZE);
3572 break;
3573 case HEAD_PAGE:
3574 case TAIL_PAGE:
3575 {
3576 uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
3577 uint offset, dir_start, empty_space;
3578 uchar *dir;
3579
3580 if (zero_lsn)
3581 bzero(buff, LSN_SIZE);
3582 if (max_entry != 0)
3583 {
3584 my_bool is_head_page= (page_type == HEAD_PAGE);
3585 dir= dir_entry_pos(buff, block_size, max_entry - 1);
3586 _ma_compact_block_page(share,
3587 buff, max_entry -1, 0,
3588 is_head_page ? ~(TrID) 0 : 0,
3589 is_head_page ?
3590 share->base.min_block_length : 0);
3591
3592 /* compactation may have increased free space */
3593 empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
3594 if (!enough_free_entries_on_page(share, buff))
3595 empty_space= 0; /* Page is full */
3596 if (_ma_bitmap_set(info, page, is_head_page,
3597 empty_space))
3598 goto err;
3599
3600 /* Zerofill the not used part */
3601 offset= uint2korr(dir) + uint2korr(dir+2);
3602 dir_start= (uint) (dir - buff);
3603 DBUG_ASSERT(dir_start >= offset);
3604 if (dir_start > offset)
3605 bzero(buff + offset, dir_start - offset);
3606 }
3607 break;
3608 }
3609 default:
3610 _ma_check_print_error(param,
3611 "Page %9s: Found unrecognizable block of type %d",
3612 llstr(pos, llbuff), page_type);
3613 goto err;
3614 }
3615 pagecache_unlock_by_link(share->pagecache, page_link.link,
3616 PAGECACHE_LOCK_WRITE_UNLOCK,
3617 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
3618 LSN_IMPOSSIBLE, 1, FALSE);
3619 }
3620 error= _ma_bitmap_flush(share);
3621 if (flush_pagecache_blocks(share->pagecache, &info->dfile,
3622 FLUSH_FORCE_WRITE))
3623 error= 1;
3624 DBUG_RETURN(error);
3625
3626 err:
3627 pagecache_unlock_by_link(share->pagecache, page_link.link,
3628 PAGECACHE_LOCK_WRITE_UNLOCK,
3629 PAGECACHE_UNPIN, LSN_IMPOSSIBLE,
3630 LSN_IMPOSSIBLE, 0, FALSE);
3631 /* flush what was changed so far */
3632 (void) _ma_bitmap_flush(share);
3633 (void) flush_pagecache_blocks(share->pagecache, &info->dfile,
3634 FLUSH_FORCE_WRITE);
3635
3636 DBUG_RETURN(1);
3637 }
3638
3639
3640 /**
3641 @brief Fill empty space in index and data files with zeroes
3642
3643 @return
3644 @retval 0 Ok
3645 @retval 1 Error
3646 */
3647
maria_zerofill(HA_CHECK * param,MARIA_HA * info,const char * name)3648 int maria_zerofill(HA_CHECK *param, MARIA_HA *info, const char *name)
3649 {
3650 my_bool error, reenable_logging,
3651 zero_lsn= !(param->testflag & T_ZEROFILL_KEEP_LSN);
3652 MARIA_SHARE *share= info->s;
3653 DBUG_ENTER("maria_zerofill");
3654 if ((reenable_logging= share->now_transactional))
3655 _ma_tmp_disable_logging_for_table(info, 0);
3656 if (!(error= (maria_zerofill_index(param, info, name) ||
3657 maria_zerofill_data(param, info, name) ||
3658 _ma_set_uuid(info->s, 0))))
3659 {
3660 /*
3661 Mark that we have done zerofill of data and index. If we zeroed pages'
3662 LSN, table is movable.
3663 */
3664 share->state.changed&= ~STATE_NOT_ZEROFILLED;
3665 if (zero_lsn)
3666 {
3667 share->state.changed&= ~(STATE_NOT_MOVABLE | STATE_MOVED);
3668 /* Table should get new LSNs */
3669 share->state.create_rename_lsn= share->state.is_of_horizon=
3670 share->state.skip_redo_lsn= LSN_NEEDS_NEW_STATE_LSNS;
3671 }
3672 /* Ensure state is later flushed to disk, if within maria_chk */
3673 info->update= (HA_STATE_CHANGED | HA_STATE_ROW_CHANGED);
3674
3675 /*
3676 Reset create_trid to make file comparable and to ensure that new
3677 trid's in the file starts from 0.
3678 */
3679 share->state.create_trid= 0;
3680 }
3681 if (reenable_logging)
3682 _ma_reenable_logging_for_table(info, FALSE);
3683 DBUG_RETURN(error);
3684 }
3685
3686
3687 /*
3688 Let temporary file replace old file.
3689 This assumes that the new file was created in the same
3690 directory as given by realpath(filename).
3691 This will ensure that any symlinks that are used will still work.
3692 Copy stats from old file to new file, deletes orignal and
3693 changes new file name to old file name
3694 */
3695
maria_change_to_newfile(const char * filename,const char * old_ext,const char * new_ext,time_t backup_time,myf MyFlags)3696 int maria_change_to_newfile(const char * filename, const char * old_ext,
3697 const char * new_ext, time_t backup_time,
3698 myf MyFlags)
3699 {
3700 char old_filename[FN_REFLEN],new_filename[FN_REFLEN];
3701 /* Get real path to filename */
3702 (void) fn_format(old_filename,filename,"",old_ext,2+4+32);
3703 return my_redel(old_filename,
3704 fn_format(new_filename,old_filename,"",new_ext,2+4),
3705 backup_time,
3706 MYF(MY_WME | MY_LINK_WARNING | MyFlags));
3707 } /* maria_change_to_newfile */
3708
3709
3710 /* Copy a block between two files */
3711
maria_filecopy(HA_CHECK * param,File to,File from,my_off_t start,my_off_t length,const char * type)3712 int maria_filecopy(HA_CHECK *param, File to,File from,my_off_t start,
3713 my_off_t length, const char *type)
3714 {
3715 uchar tmp_buff[IO_SIZE], *buff;
3716 ulong buff_length;
3717 DBUG_ENTER("maria_filecopy");
3718
3719 buff_length=(ulong) MY_MIN(param->write_buffer_length,length);
3720 if (!(buff=my_malloc(PSI_INSTRUMENT_ME, buff_length, MYF(0))))
3721 {
3722 buff=tmp_buff; buff_length=IO_SIZE;
3723 }
3724
3725 mysql_file_seek(from, start, MY_SEEK_SET,MYF(0));
3726 while (length > buff_length)
3727 {
3728 if (mysql_file_read(from, buff, buff_length, MYF(MY_NABP)) ||
3729 mysql_file_write(to, buff, buff_length, param->myf_rw))
3730 goto err;
3731 length-= buff_length;
3732 }
3733 if (mysql_file_read(from, buff, (size_t) length,MYF(MY_NABP)) ||
3734 mysql_file_write(to, buff, (size_t) length,param->myf_rw))
3735 goto err;
3736 if (buff != tmp_buff)
3737 my_free(buff);
3738 DBUG_RETURN(0);
3739 err:
3740 if (buff != tmp_buff)
3741 my_free(buff);
3742 _ma_check_print_error(param,"Can't copy %s to tempfile, error %d",
3743 type,my_errno);
3744 DBUG_RETURN(1);
3745 }
3746
3747
3748 /*
3749 Repair table or given index using sorting
3750
3751 SYNOPSIS
3752 maria_repair_by_sort()
3753 param Repair parameters
3754 info MARIA handler to repair
3755 name Name of table (for warnings)
3756 rep_quick set to <> 0 if we should not change data file
3757
3758 RESULT
3759 0 ok
3760 <>0 Error
3761 */
3762
maria_repair_by_sort(HA_CHECK * param,register MARIA_HA * info,const char * name,my_bool rep_quick)3763 int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
3764 const char * name, my_bool rep_quick)
3765 {
3766 int got_error;
3767 uint i, keys_to_repair;
3768 ha_rows start_records;
3769 my_off_t new_header_length, org_header_length, del;
3770 File new_file;
3771 MARIA_SORT_PARAM sort_param;
3772 MARIA_SHARE *share= info->s;
3773 HA_KEYSEG *keyseg;
3774 double *rec_per_key_part;
3775 char llbuff[22];
3776 MARIA_SORT_INFO sort_info;
3777 ulonglong UNINIT_VAR(key_map);
3778 myf sync_dir= ((share->now_transactional && !share->temporary) ?
3779 MY_SYNC_DIR : 0);
3780 my_bool scan_inited= 0, reenable_logging= 0;
3781 MARIA_SHARE backup_share;
3782 DBUG_ENTER("maria_repair_by_sort");
3783
3784 got_error= 1;
3785 new_file= -1;
3786 start_records= share->state.state.records;
3787 if (!(param->testflag & T_SILENT))
3788 {
3789 printf("- recovering (with sort) Aria-table '%s'\n",name);
3790 printf("Data records: %s\n", llstr(start_records,llbuff));
3791 }
3792
3793 if (initialize_variables_for_repair(param, &sort_info, &sort_param, info,
3794 rep_quick, &backup_share))
3795 goto err;
3796
3797 if ((reenable_logging= share->now_transactional))
3798 _ma_tmp_disable_logging_for_table(info, 0);
3799
3800 org_header_length= share->pack.header_length;
3801 new_header_length= (param->testflag & T_UNPACK) ? 0 : org_header_length;
3802 sort_param.filepos= new_header_length;
3803
3804 if (!rep_quick)
3805 {
3806 /* Get real path for data file */
3807 if ((new_file=mysql_file_create(key_file_tmp,
3808 fn_format(param->temp_filename,
3809 share->data_file_name.str, "",
3810 DATA_TMP_EXT, 2+4),
3811 0,param->tmpfile_createflag,
3812 MYF(0))) < 0)
3813 {
3814 _ma_check_print_error(param,"Can't create new tempfile: '%s'",
3815 param->temp_filename);
3816 goto err;
3817 }
3818 if (new_header_length &&
3819 maria_filecopy(param, new_file, info->dfile.file, 0L,
3820 new_header_length, "datafile-header"))
3821 goto err;
3822
3823 share->state.dellink= HA_OFFSET_ERROR;
3824 info->rec_cache.file= new_file; /* For sort_delete_record */
3825 if (share->data_file_type == BLOCK_RECORD ||
3826 (param->testflag & T_UNPACK))
3827 {
3828 if (create_new_data_handle(&sort_param, new_file))
3829 goto err;
3830 sort_info.new_info->rec_cache.file= new_file;
3831 }
3832 }
3833
3834 if (!(sort_info.key_block=
3835 alloc_key_blocks(param,
3836 (uint) param->sort_key_blocks,
3837 share->base.max_key_block_length)))
3838 goto err;
3839 sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
3840
3841 if (share->data_file_type != BLOCK_RECORD)
3842 {
3843 /* We need a read buffer to read rows in big blocks */
3844 if (init_io_cache(¶m->read_cache, info->dfile.file,
3845 (uint) param->read_buffer_length,
3846 READ_CACHE, org_header_length, 1, MYF(MY_WME)))
3847 goto err;
3848 }
3849 if (sort_info.new_info->s->data_file_type != BLOCK_RECORD)
3850 {
3851 /* When writing to not block records, we need a write buffer */
3852 if (!rep_quick)
3853 {
3854 if (init_io_cache(&sort_info.new_info->rec_cache, new_file,
3855 (uint) param->write_buffer_length,
3856 WRITE_CACHE, new_header_length, 1,
3857 MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))
3858 goto err;
3859 sort_info.new_info->opt_flag|= WRITE_CACHE_USED;
3860 }
3861 }
3862
3863 if (!(sort_param.record=
3864 (uchar*) my_malloc(PSI_INSTRUMENT_ME,
3865 (size_t) share->base.default_rec_buff_size,
3866 MYF(0))) ||
3867 _ma_alloc_buffer(&sort_param.rec_buff, &sort_param.rec_buff_size,
3868 share->base.default_rec_buff_size, MYF(0)))
3869 {
3870 _ma_check_print_error(param, "Not enough memory for extra record");
3871 goto err;
3872 }
3873
3874 /* Optionally drop indexes and optionally modify the key_map */
3875 maria_drop_all_indexes(param, info, FALSE);
3876 key_map= share->state.key_map;
3877 if (param->testflag & T_CREATE_MISSING_KEYS)
3878 {
3879 /* Invert the copied key_map to recreate all disabled indexes. */
3880 key_map= ~key_map;
3881 }
3882
3883 param->read_cache.end_of_file= sort_info.filelength;
3884 sort_param.wordlist=NULL;
3885 init_alloc_root(PSI_INSTRUMENT_ME, &sort_param.wordroot, FTPARSER_MEMROOT_ALLOC_SIZE, 0,
3886 MYF(param->malloc_flags));
3887
3888 sort_param.key_cmp=sort_key_cmp;
3889 sort_param.lock_in_memory=maria_lock_memory;
3890 sort_param.tmpdir=param->tmpdir;
3891 sort_param.master =1;
3892
3893 del=share->state.state.del;
3894
3895 /* Calculate number of keys to repair */
3896 keys_to_repair= 0;
3897 for (sort_param.key=0 ; sort_param.key < share->base.keys ;
3898 sort_param.key++)
3899 {
3900 if (maria_is_key_active(key_map, sort_param.key))
3901 keys_to_repair++;
3902 }
3903 /* For each key we scan and merge sort the keys */
3904 param->max_stage= keys_to_repair*2;
3905
3906 rec_per_key_part= param->new_rec_per_key_part;
3907 for (sort_param.key=0 ; sort_param.key < share->base.keys ;
3908 rec_per_key_part+=sort_param.keyinfo->keysegs, sort_param.key++)
3909 {
3910 sort_param.keyinfo=share->keyinfo+sort_param.key;
3911 /*
3912 Skip this index if it is marked disabled in the copied
3913 (and possibly inverted) key_map.
3914 */
3915 if (! maria_is_key_active(key_map, sort_param.key))
3916 {
3917 /* Remember old statistics for key */
3918 memcpy((char*) rec_per_key_part,
3919 (char*) (share->state.rec_per_key_part +
3920 (uint) (rec_per_key_part - param->new_rec_per_key_part)),
3921 sort_param.keyinfo->keysegs*sizeof(*rec_per_key_part));
3922 DBUG_PRINT("repair", ("skipping seemingly disabled index #: %u",
3923 sort_param.key));
3924 continue;
3925 }
3926
3927 if ((!(param->testflag & T_SILENT)))
3928 printf ("- Fixing index %d\n",sort_param.key+1);
3929
3930 sort_param.read_cache=param->read_cache;
3931 sort_param.seg=sort_param.keyinfo->seg;
3932 sort_param.max_pos= sort_param.pos= org_header_length;
3933 keyseg=sort_param.seg;
3934 bzero((char*) sort_param.unique,sizeof(sort_param.unique));
3935 sort_param.key_length=share->rec_reflength;
3936 for (i=0 ; keyseg[i].type != HA_KEYTYPE_END; i++)
3937 {
3938 sort_param.key_length+=keyseg[i].length;
3939 if (keyseg[i].flag & HA_SPACE_PACK)
3940 sort_param.key_length+=get_pack_length(keyseg[i].length);
3941 if (keyseg[i].flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
3942 sort_param.key_length+= 2 + MY_TEST(keyseg[i].length >= 127);
3943 if (keyseg[i].flag & HA_NULL_PART)
3944 sort_param.key_length++;
3945 }
3946 share->state.state.records=share->state.state.del=share->state.split=0;
3947 share->state.state.empty=0;
3948
3949 if (sort_param.keyinfo->flag & HA_FULLTEXT)
3950 {
3951 uint ft_max_word_len_for_sort=FT_MAX_WORD_LEN_FOR_SORT*
3952 sort_param.keyinfo->seg->charset->mbmaxlen;
3953 sort_param.key_length+=ft_max_word_len_for_sort-HA_FT_MAXBYTELEN;
3954 /*
3955 fulltext indexes may have much more entries than the
3956 number of rows in the table. We estimate the number here.
3957
3958 Note, built-in parser is always nr. 0 - see ftparser_call_initializer()
3959 */
3960 if (sort_param.keyinfo->ftkey_nr == 0)
3961 {
3962 /*
3963 for built-in parser the number of generated index entries
3964 cannot be larger than the size of the data file divided
3965 by the minimal word's length
3966 */
3967 sort_info.max_records=
3968 (ha_rows) (sort_info.filelength/ft_min_word_len+1);
3969 }
3970 else
3971 {
3972 /*
3973 for external plugin parser we cannot tell anything at all :(
3974 so, we'll use all the sort memory and start from ~10 buffpeks.
3975 (see _ma_create_index_by_sort)
3976 */
3977 sort_info.max_records=
3978 10*param->sort_buffer_length/sort_param.key_length;
3979 }
3980
3981 sort_param.key_read= sort_maria_ft_key_read;
3982 sort_param.key_write= sort_maria_ft_key_write;
3983 }
3984 else
3985 {
3986 sort_param.key_read= sort_key_read;
3987 sort_param.key_write= sort_key_write;
3988 }
3989
3990 if (sort_info.new_info->s->data_file_type == BLOCK_RECORD)
3991 {
3992 scan_inited= 1;
3993 if (maria_scan_init(sort_info.info))
3994 goto err;
3995 }
3996 if (_ma_create_index_by_sort(&sort_param,
3997 (my_bool) (!(param->testflag & T_VERBOSE)),
3998 (size_t) param->sort_buffer_length))
3999 {
4000 if ((param->testflag & T_CREATE_UNIQUE_BY_SORT) && sort_param.sort_info->dupp)
4001 share->state.dupp_key= sort_param.key;
4002 else
4003 param->retry_repair= 1;
4004 _ma_check_print_error(param, "Create index by sort failed");
4005 goto err;
4006 }
4007 DBUG_EXECUTE_IF("maria_flush_whole_log",
4008 {
4009 DBUG_PRINT("maria_flush_whole_log", ("now"));
4010 translog_flush(translog_get_horizon());
4011 });
4012 DBUG_EXECUTE_IF("maria_crash_create_index_by_sort",
4013 {
4014 DBUG_PRINT("maria_crash_create_index_by_sort", ("now"));
4015 DBUG_SUICIDE();
4016 });
4017 if (scan_inited)
4018 {
4019 scan_inited= 0;
4020 maria_scan_end(sort_info.info);
4021 }
4022
4023 /* No need to calculate checksum again. */
4024 sort_param.calc_checksum= 0;
4025 free_root(&sort_param.wordroot, MYF(0));
4026
4027 /* Set for next loop */
4028 sort_info.max_records= (ha_rows) sort_info.new_info->s->state.state.records;
4029 param->stage++; /* Next stage */
4030 param->progress= 0;
4031
4032 if (param->testflag & T_STATISTICS)
4033 maria_update_key_parts(sort_param.keyinfo, rec_per_key_part,
4034 sort_param.unique,
4035 (param->stats_method ==
4036 MI_STATS_METHOD_IGNORE_NULLS ?
4037 sort_param.notnull : NULL),
4038 (ulonglong) share->state.state.records);
4039 maria_set_key_active(share->state.key_map, sort_param.key);
4040 DBUG_PRINT("repair", ("set enabled index #: %u", sort_param.key));
4041
4042 if (_ma_flush_table_files_before_swap(param, info))
4043 goto err;
4044
4045 if (sort_param.fix_datafile)
4046 {
4047 param->read_cache.end_of_file=sort_param.filepos;
4048 if (maria_write_data_suffix(&sort_info,1) ||
4049 end_io_cache(&sort_info.new_info->rec_cache))
4050 {
4051 _ma_check_print_error(param, "Got error when flushing row cache");
4052 goto err;
4053 }
4054 sort_info.new_info->opt_flag&= ~WRITE_CACHE_USED;
4055
4056 if (param->testflag & T_SAFE_REPAIR)
4057 {
4058 /* Don't repair if we loosed more than one row */
4059 if (sort_info.new_info->s->state.state.records+1 < start_records)
4060 {
4061 _ma_check_print_error(param,
4062 "Rows lost (Found %lu of %lu); Aborting "
4063 "because safe repair was requested",
4064 (ulong) sort_info.new_info->s->
4065 state.state.records,
4066 (ulong) start_records);
4067 share->state.state.records=start_records;
4068 goto err;
4069 }
4070 }
4071
4072 sort_info.new_info->s->state.state.data_file_length= sort_param.filepos;
4073 if (sort_info.new_info != sort_info.info)
4074 {
4075 MARIA_STATE_INFO save_state= sort_info.new_info->s->state;
4076 if (maria_close(sort_info.new_info))
4077 {
4078 _ma_check_print_error(param, "Got error %d on close", my_errno);
4079 goto err;
4080 }
4081 copy_data_file_state(&share->state, &save_state);
4082 new_file= -1;
4083 sort_info.new_info= info;
4084 info->rec_cache.file= info->dfile.file;
4085 }
4086
4087 share->state.version=(ulong) time((time_t*) 0); /* Force reopen */
4088
4089 /* Replace the actual file with the temporary file */
4090 if (new_file >= 0)
4091 {
4092 mysql_file_close(new_file, MYF(MY_WME));
4093 new_file= -1;
4094 }
4095 change_data_file_descriptor(info, -1);
4096 if (maria_change_to_newfile(share->data_file_name.str, MARIA_NAME_DEXT,
4097 DATA_TMP_EXT, param->backup_time,
4098 (param->testflag & T_BACKUP_DATA ?
4099 MYF(MY_REDEL_MAKE_BACKUP): MYF(0)) |
4100 sync_dir) ||
4101 _ma_open_datafile(info, share))
4102 {
4103 _ma_check_print_error(param, "Couldn't change to new data file");
4104 goto err;
4105 }
4106 if (param->testflag & T_UNPACK)
4107 restore_data_file_type(share);
4108
4109 org_header_length= share->pack.header_length;
4110 sort_info.org_data_file_type= share->data_file_type;
4111 sort_info.filelength= share->state.state.data_file_length;
4112 sort_param.fix_datafile=0;
4113
4114 /* Offsets are now in proportion to the new file length */
4115 param->max_progress= sort_info.filelength;
4116
4117 }
4118 else
4119 share->state.state.data_file_length=sort_param.max_pos;
4120
4121 param->read_cache.file= info->dfile.file; /* re-init read cache */
4122 if (share->data_file_type != BLOCK_RECORD)
4123 reinit_io_cache(¶m->read_cache, READ_CACHE,
4124 share->pack.header_length, 1, 1);
4125 }
4126
4127 if (param->testflag & T_WRITE_LOOP)
4128 {
4129 fputs(" \r",stdout);
4130 fflush(stdout);
4131 }
4132
4133 if (rep_quick && del+sort_info.dupp != share->state.state.del)
4134 {
4135 _ma_check_print_error(param,"Couldn't fix table with quick recovery: "
4136 "Found wrong number of deleted records");
4137 _ma_check_print_error(param,"Run recovery again without -q");
4138 got_error=1;
4139 param->retry_repair=1;
4140 param->testflag|=T_RETRY_WITHOUT_QUICK;
4141 goto err;
4142 }
4143
4144 if (rep_quick && (param->testflag & T_FORCE_UNIQUENESS))
4145 {
4146 my_off_t skr= share->state.state.data_file_length +
4147 ((sort_info.org_data_file_type == COMPRESSED_RECORD) ?
4148 MEMMAP_EXTRA_MARGIN : 0);
4149 #ifdef USE_RELOC
4150 if (sort_info.org_data_file_type == STATIC_RECORD &&
4151 skr < share->base.reloc*share->base.min_pack_length)
4152 skr=share->base.reloc*share->base.min_pack_length;
4153 #endif
4154 if (skr != sort_info.filelength)
4155 if (mysql_file_chsize(info->dfile.file, skr, 0, MYF(0)))
4156 _ma_check_print_warning(param,
4157 "Can't change size of datafile, error: %d",
4158 my_errno);
4159 }
4160
4161 if (param->testflag & T_CALC_CHECKSUM)
4162 share->state.state.checksum=param->glob_crc;
4163
4164 if (mysql_file_chsize(share->kfile.file,
4165 share->state.state.key_file_length, 0, MYF(0)))
4166 _ma_check_print_warning(param,
4167 "Can't change size of indexfile, error: %d",
4168 my_errno);
4169
4170 if (!(param->testflag & T_SILENT))
4171 {
4172 if (start_records != share->state.state.records)
4173 printf("Data records: %s\n", llstr(share->state.state.records,llbuff));
4174 }
4175 if (sort_info.dupp)
4176 _ma_check_print_warning(param,
4177 "%s records have been removed",
4178 llstr(sort_info.dupp,llbuff));
4179 got_error=0;
4180 /* If invoked by external program that uses thr_lock */
4181 if (&share->state.state != info->state)
4182 *info->state= *info->state_start= share->state.state;
4183
4184 err:
4185 if (scan_inited)
4186 maria_scan_end(sort_info.info);
4187 _ma_reset_state(info);
4188
4189 if (sort_info.new_info)
4190 {
4191 end_io_cache(&sort_info.new_info->rec_cache);
4192 sort_info.new_info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
4193 }
4194 end_io_cache(¶m->read_cache);
4195 info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
4196 if (got_error)
4197 {
4198 if (! param->error_printed)
4199 _ma_check_print_error(param,"%d when fixing table",my_errno);
4200 (void)_ma_flush_table_files_before_swap(param, info);
4201 if (sort_info.new_info && sort_info.new_info != sort_info.info)
4202 {
4203 unuse_data_file_descriptor(sort_info.new_info);
4204 maria_close(sort_info.new_info);
4205 }
4206 if (new_file >= 0)
4207 {
4208 mysql_file_close(new_file, MYF(0));
4209 mysql_file_delete(key_file_tmp, param->temp_filename, MYF(MY_WME));
4210 }
4211 maria_mark_crashed_on_repair(info);
4212 }
4213 else
4214 {
4215 if (key_map == share->state.key_map)
4216 share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
4217 /*
4218 Now that we have flushed and forced everything, we can bump
4219 create_rename_lsn:
4220 */
4221 DBUG_EXECUTE_IF("maria_flush_whole_log",
4222 {
4223 DBUG_PRINT("maria_flush_whole_log", ("now"));
4224 translog_flush(translog_get_horizon());
4225 });
4226 DBUG_EXECUTE_IF("maria_crash_repair",
4227 {
4228 DBUG_PRINT("maria_crash_repair", ("now"));
4229 DBUG_SUICIDE();
4230 });
4231 }
4232 share->state.changed|= STATE_NOT_SORTED_PAGES;
4233 if (!rep_quick)
4234 share->state.changed&= ~(STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
4235 STATE_NOT_MOVABLE);
4236
4237 /* If caller had disabled logging it's not up to us to re-enable it */
4238 if (reenable_logging)
4239 _ma_reenable_logging_for_table(info, FALSE);
4240 restore_table_state_after_repair(info, &backup_share);
4241
4242 my_free(sort_param.rec_buff);
4243 my_free(sort_param.record);
4244 my_free(sort_info.key_block);
4245 my_free(sort_info.ft_buf);
4246 my_free(sort_info.buff);
4247 DBUG_RETURN(got_error);
4248 }
4249
4250
4251 /*
4252 Threaded repair of table using sorting
4253
4254 SYNOPSIS
4255 maria_repair_parallel()
4256 param Repair parameters
4257 info MARIA handler to repair
4258 name Name of table (for warnings)
4259 rep_quick set to <> 0 if we should not change data file
4260
4261 DESCRIPTION
4262 Same as maria_repair_by_sort but do it multithreaded
4263 Each key is handled by a separate thread.
4264 TODO: make a number of threads a parameter
4265
4266 In parallel repair we use one thread per index. There are two modes:
4267
4268 Quick
4269
4270 Only the indexes are rebuilt. All threads share a read buffer.
4271 Every thread that needs fresh data in the buffer enters the shared
4272 cache lock. The last thread joining the lock reads the buffer from
4273 the data file and wakes all other threads.
4274
4275 Non-quick
4276
4277 The data file is rebuilt and all indexes are rebuilt to point to
4278 the new record positions. One thread is the master thread. It
4279 reads from the old data file and writes to the new data file. It
4280 also creates one of the indexes. The other threads read from a
4281 buffer which is filled by the master. If they need fresh data,
4282 they enter the shared cache lock. If the masters write buffer is
4283 full, it flushes it to the new data file and enters the shared
4284 cache lock too. When all threads joined in the lock, the master
4285 copies its write buffer to the read buffer for the other threads
4286 and wakes them.
4287
4288 RESULT
4289 0 ok
4290 <>0 Error
4291 */
4292
maria_repair_parallel(HA_CHECK * param,register MARIA_HA * info,const char * name,my_bool rep_quick)4293 int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
4294 const char * name, my_bool rep_quick)
4295 {
4296 int got_error;
4297 uint i,key, istep;
4298 ha_rows start_records;
4299 my_off_t new_header_length,del;
4300 File new_file;
4301 MARIA_SORT_PARAM *sort_param=0, tmp_sort_param;
4302 MARIA_SHARE *share= info->s;
4303 double *rec_per_key_part;
4304 HA_KEYSEG *keyseg;
4305 char llbuff[22];
4306 IO_CACHE new_data_cache; /* For non-quick repair. */
4307 IO_CACHE_SHARE io_share;
4308 MARIA_SORT_INFO sort_info;
4309 MARIA_SHARE backup_share;
4310 ulonglong UNINIT_VAR(key_map);
4311 pthread_attr_t thr_attr;
4312 myf sync_dir= ((share->now_transactional && !share->temporary) ?
4313 MY_SYNC_DIR : 0);
4314 my_bool reenable_logging= 0;
4315 DBUG_ENTER("maria_repair_parallel");
4316
4317 got_error= 1;
4318 new_file= -1;
4319 start_records= share->state.state.records;
4320 if (!(param->testflag & T_SILENT))
4321 {
4322 printf("- parallel recovering (with sort) Aria-table '%s'\n",name);
4323 printf("Data records: %s\n", llstr(start_records, llbuff));
4324 }
4325
4326 bzero(&new_data_cache, sizeof(new_data_cache));
4327 if (initialize_variables_for_repair(param, &sort_info, &tmp_sort_param, info,
4328 rep_quick, &backup_share))
4329 goto err;
4330
4331 if ((reenable_logging= share->now_transactional))
4332 _ma_tmp_disable_logging_for_table(info, 0);
4333
4334 new_header_length= ((param->testflag & T_UNPACK) ? 0 :
4335 share->pack.header_length);
4336
4337 /*
4338 Quick repair (not touching data file, rebuilding indexes):
4339 {
4340 Read cache is (HA_CHECK *param)->read_cache using info->dfile.file.
4341 }
4342
4343 Non-quick repair (rebuilding data file and indexes):
4344 {
4345 Master thread:
4346
4347 Read cache is (HA_CHECK *param)->read_cache using info->dfile.file.
4348 Write cache is (MARIA_INFO *info)->rec_cache using new_file.
4349
4350 Slave threads:
4351
4352 Read cache is new_data_cache synced to master rec_cache.
4353
4354 The final assignment of the filedescriptor for rec_cache is done
4355 after the cache creation.
4356
4357 Don't check file size on new_data_cache, as the resulting file size
4358 is not known yet.
4359
4360 As rec_cache and new_data_cache are synced, write_buffer_length is
4361 used for the read cache 'new_data_cache'. Both start at the same
4362 position 'new_header_length'.
4363 }
4364 */
4365 DBUG_PRINT("info", ("is quick repair: %d", (int) rep_quick));
4366 if (!rep_quick)
4367 my_b_clear(&new_data_cache);
4368
4369 /* Initialize pthread structures before goto err. */
4370 mysql_mutex_init(key_SORT_INFO_mutex, &sort_info.mutex, MY_MUTEX_INIT_FAST);
4371 mysql_cond_init(key_SORT_INFO_cond, &sort_info.cond, 0);
4372
4373 if (!(sort_info.key_block=
4374 alloc_key_blocks(param, (uint) param->sort_key_blocks,
4375 share->base.max_key_block_length)))
4376 goto err;
4377
4378 if (init_io_cache(¶m->read_cache, info->dfile.file,
4379 (uint) param->read_buffer_length,
4380 READ_CACHE, share->pack.header_length, 1, MYF(MY_WME)))
4381 goto err;
4382
4383 sort_info.key_block_end=sort_info.key_block+param->sort_key_blocks;
4384 info->opt_flag|=WRITE_CACHE_USED;
4385 info->rec_cache.file= info->dfile.file; /* for sort_delete_record */
4386
4387 if (!rep_quick)
4388 {
4389 /* Get real path for data file */
4390 if ((new_file= mysql_file_create(key_file_tmp,
4391 fn_format(param->temp_filename,
4392 share->data_file_name.str, "",
4393 DATA_TMP_EXT,
4394 2+4),
4395 0,param->tmpfile_createflag,
4396 MYF(0))) < 0)
4397 {
4398 _ma_check_print_error(param,"Can't create new tempfile: '%s'",
4399 param->temp_filename);
4400 goto err;
4401 }
4402 if (new_header_length &&
4403 maria_filecopy(param, new_file, info->dfile.file,0L,new_header_length,
4404 "datafile-header"))
4405 goto err;
4406 if (param->testflag & T_UNPACK)
4407 restore_data_file_type(share);
4408 share->state.dellink= HA_OFFSET_ERROR;
4409
4410 if (init_io_cache(&new_data_cache, -1,
4411 (uint) param->write_buffer_length,
4412 READ_CACHE, new_header_length, 1,
4413 MYF(MY_WME | MY_DONT_CHECK_FILESIZE)))
4414 goto err;
4415
4416 if (init_io_cache(&info->rec_cache, new_file,
4417 (uint) param->write_buffer_length,
4418 WRITE_CACHE, new_header_length, 1,
4419 MYF(MY_WME | MY_WAIT_IF_FULL) & param->myf_rw))
4420 goto err;
4421
4422 }
4423
4424 /* Optionally drop indexes and optionally modify the key_map. */
4425 maria_drop_all_indexes(param, info, FALSE);
4426 key_map= share->state.key_map;
4427 if (param->testflag & T_CREATE_MISSING_KEYS)
4428 {
4429 /* Invert the copied key_map to recreate all disabled indexes. */
4430 key_map= ~key_map;
4431 }
4432
4433 param->read_cache.end_of_file= sort_info.filelength;
4434
4435 /*
4436 +1 below is required hack for parallel repair mode.
4437 The share->state.state.records value, that is compared later
4438 to sort_info.max_records and cannot exceed it, is
4439 increased in sort_key_write. In maria_repair_by_sort, sort_key_write
4440 is called after sort_key_read, where the comparison is performed,
4441 but in parallel mode master thread can call sort_key_write
4442 before some other repair thread calls sort_key_read.
4443 Furthermore I'm not even sure +1 would be enough.
4444 May be sort_info.max_records shold be always set to max value in
4445 parallel mode.
4446 */
4447 sort_info.max_records++;
4448
4449 del=share->state.state.del;
4450
4451 if (!(sort_param=(MARIA_SORT_PARAM *)
4452 my_malloc(PSI_INSTRUMENT_ME, (uint) share->base.keys *
4453 (sizeof(MARIA_SORT_PARAM) + share->base.pack_reclength),
4454 MYF(MY_ZEROFILL))))
4455 {
4456 _ma_check_print_error(param,"Not enough memory for key!");
4457 goto err;
4458 }
4459 #ifdef USING_SECOND_APPROACH
4460 uint total_key_length=0;
4461 #endif
4462 rec_per_key_part= param->new_rec_per_key_part;
4463 share->state.state.records=share->state.state.del=share->state.split=0;
4464 share->state.state.empty=0;
4465
4466 for (i=key=0, istep=1 ; key < share->base.keys ;
4467 rec_per_key_part+=sort_param[i].keyinfo->keysegs, i+=istep, key++)
4468 {
4469 sort_param[i].key=key;
4470 sort_param[i].keyinfo=share->keyinfo+key;
4471 sort_param[i].seg=sort_param[i].keyinfo->seg;
4472 /*
4473 Skip this index if it is marked disabled in the copied
4474 (and possibly inverted) key_map.
4475 */
4476 if (! maria_is_key_active(key_map, key))
4477 {
4478 /* Remember old statistics for key */
4479 memcpy((char*) rec_per_key_part,
4480 (char*) (share->state.rec_per_key_part+
4481 (uint) (rec_per_key_part - param->new_rec_per_key_part)),
4482 sort_param[i].keyinfo->keysegs*sizeof(*rec_per_key_part));
4483 istep=0;
4484 continue;
4485 }
4486 istep=1;
4487 if ((!(param->testflag & T_SILENT)))
4488 printf ("- Fixing index %d\n",key+1);
4489 if (sort_param[i].keyinfo->flag & HA_FULLTEXT)
4490 {
4491 sort_param[i].key_read=sort_maria_ft_key_read;
4492 sort_param[i].key_write=sort_maria_ft_key_write;
4493 }
4494 else
4495 {
4496 sort_param[i].key_read=sort_key_read;
4497 sort_param[i].key_write=sort_key_write;
4498 }
4499 sort_param[i].key_cmp=sort_key_cmp;
4500 sort_param[i].lock_in_memory=maria_lock_memory;
4501 sort_param[i].tmpdir=param->tmpdir;
4502 sort_param[i].sort_info=&sort_info;
4503 sort_param[i].master=0;
4504 sort_param[i].fix_datafile=0;
4505 sort_param[i].calc_checksum= 0;
4506
4507 sort_param[i].filepos=new_header_length;
4508 sort_param[i].max_pos=sort_param[i].pos=share->pack.header_length;
4509
4510 sort_param[i].record= (((uchar *)(sort_param+share->base.keys))+
4511 (share->base.pack_reclength * i));
4512 if (_ma_alloc_buffer(&sort_param[i].rec_buff, &sort_param[i].rec_buff_size,
4513 share->base.default_rec_buff_size, MYF(0)))
4514 {
4515 _ma_check_print_error(param,"Not enough memory!");
4516 goto err;
4517 }
4518 sort_param[i].key_length=share->rec_reflength;
4519 for (keyseg=sort_param[i].seg; keyseg->type != HA_KEYTYPE_END;
4520 keyseg++)
4521 {
4522 sort_param[i].key_length+=keyseg->length;
4523 if (keyseg->flag & HA_SPACE_PACK)
4524 sort_param[i].key_length+=get_pack_length(keyseg->length);
4525 if (keyseg->flag & (HA_BLOB_PART | HA_VAR_LENGTH_PART))
4526 sort_param[i].key_length+= 2 + MY_TEST(keyseg->length >= 127);
4527 if (keyseg->flag & HA_NULL_PART)
4528 sort_param[i].key_length++;
4529 }
4530 #ifdef USING_SECOND_APPROACH
4531 total_key_length+=sort_param[i].key_length;
4532 #endif
4533
4534 if (sort_param[i].keyinfo->flag & HA_FULLTEXT)
4535 {
4536 uint ft_max_word_len_for_sort=
4537 (FT_MAX_WORD_LEN_FOR_SORT *
4538 sort_param[i].keyinfo->seg->charset->mbmaxlen);
4539 sort_param[i].key_length+=ft_max_word_len_for_sort-HA_FT_MAXBYTELEN;
4540 init_alloc_root(PSI_INSTRUMENT_ME, &sort_param[i].wordroot, FTPARSER_MEMROOT_ALLOC_SIZE, 0,
4541 MYF(param->malloc_flags));
4542 }
4543 }
4544 sort_info.total_keys=i;
4545 sort_param[0].master= 1;
4546 sort_param[0].fix_datafile= ! rep_quick;
4547 sort_param[0].calc_checksum= MY_TEST(param->testflag & T_CALC_CHECKSUM);
4548
4549 if (!maria_ftparser_alloc_param(info))
4550 goto err;
4551
4552 sort_info.got_error=0;
4553 mysql_mutex_lock(&sort_info.mutex);
4554
4555 /*
4556 Initialize the I/O cache share for use with the read caches and, in
4557 case of non-quick repair, the write cache. When all threads join on
4558 the cache lock, the writer copies the write cache contents to the
4559 read caches.
4560 */
4561 if (i > 1)
4562 {
4563 if (rep_quick)
4564 init_io_cache_share(¶m->read_cache, &io_share, NULL, i);
4565 else
4566 init_io_cache_share(&new_data_cache, &io_share, &info->rec_cache, i);
4567 }
4568 else
4569 io_share.total_threads= 0; /* share not used */
4570
4571 (void) pthread_attr_init(&thr_attr);
4572 (void) pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
4573 (void) my_setstacksize(&thr_attr, (size_t)my_thread_stack_size);
4574
4575 for (i=0 ; i < sort_info.total_keys ; i++)
4576 {
4577 /*
4578 Copy the properly initialized IO_CACHE structure so that every
4579 thread has its own copy. In quick mode param->read_cache is shared
4580 for use by all threads. In non-quick mode all threads but the
4581 first copy the shared new_data_cache, which is synchronized to the
4582 write cache of the first thread. The first thread copies
4583 param->read_cache, which is not shared.
4584 */
4585 sort_param[i].read_cache= ((rep_quick || !i) ? param->read_cache :
4586 new_data_cache);
4587 DBUG_PRINT("io_cache_share", ("thread: %u read_cache: %p",
4588 i, &sort_param[i].read_cache));
4589
4590 /*
4591 two approaches: the same amount of memory for each thread
4592 or the memory for the same number of keys for each thread...
4593 In the second one all the threads will fill their sort_buffers
4594 (and call write_keys) at the same time, putting more stress on i/o.
4595 */
4596 sort_param[i].sortbuff_size=
4597 #ifndef USING_SECOND_APPROACH
4598 param->sort_buffer_length/sort_info.total_keys;
4599 #else
4600 param->sort_buffer_length*sort_param[i].key_length/total_key_length;
4601 #endif
4602 if (mysql_thread_create(key_thread_find_all_keys,
4603 &sort_param[i].thr, &thr_attr,
4604 _ma_thr_find_all_keys, (void *) (sort_param+i)))
4605 {
4606 _ma_check_print_error(param,"Cannot start a repair thread");
4607 /* Cleanup: Detach from the share. Avoid others to be blocked. */
4608 if (io_share.total_threads)
4609 remove_io_thread(&sort_param[i].read_cache);
4610 DBUG_PRINT("error", ("Cannot start a repair thread"));
4611 sort_info.got_error=1;
4612 }
4613 else
4614 sort_info.threads_running++;
4615 }
4616 (void) pthread_attr_destroy(&thr_attr);
4617
4618 /* waiting for all threads to finish */
4619 while (sort_info.threads_running)
4620 mysql_cond_wait(&sort_info.cond, &sort_info.mutex);
4621 mysql_mutex_unlock(&sort_info.mutex);
4622
4623 if ((got_error= _ma_thr_write_keys(sort_param)))
4624 {
4625 param->retry_repair=1;
4626 goto err;
4627 }
4628 got_error=1; /* Assume the following may go wrong */
4629
4630 if (_ma_flush_table_files_before_swap(param, info))
4631 goto err;
4632
4633 if (sort_param[0].fix_datafile)
4634 {
4635 /*
4636 Append some nulls to the end of a memory mapped file. Destroy the
4637 write cache. The master thread did already detach from the share
4638 by remove_io_thread() in sort.c:thr_find_all_keys().
4639 */
4640 if (maria_write_data_suffix(&sort_info,1) ||
4641 end_io_cache(&info->rec_cache))
4642 goto err;
4643 if (param->testflag & T_SAFE_REPAIR)
4644 {
4645 /* Don't repair if we loosed more than one row */
4646 if (sort_info.new_info->s->state.state.records+1 < start_records)
4647 {
4648 _ma_check_print_error(param,
4649 "Rows lost (Found %lu of %lu); Aborting "
4650 "because safe repair was requested",
4651 (ulong) share->state.state.records,
4652 (ulong) start_records);
4653 share->state.state.records=start_records;
4654 goto err;
4655 }
4656 }
4657 share->state.state.data_file_length= sort_param->filepos;
4658 /* Only whole records */
4659 share->state.version= (ulong) time((time_t*) 0);
4660 /*
4661 Exchange the data file descriptor of the table, so that we use the
4662 new file from now on.
4663 */
4664 mysql_file_close(info->dfile.file, MYF(0));
4665 info->dfile.file= new_file;
4666 share->pack.header_length=(ulong) new_header_length;
4667 }
4668 else
4669 share->state.state.data_file_length=sort_param->max_pos;
4670
4671 if (rep_quick && del+sort_info.dupp != share->state.state.del)
4672 {
4673 _ma_check_print_error(param,"Couldn't fix table with quick recovery: "
4674 "Found wrong number of deleted records");
4675 _ma_check_print_error(param,"Run recovery again without -q");
4676 param->retry_repair=1;
4677 param->testflag|=T_RETRY_WITHOUT_QUICK;
4678 goto err;
4679 }
4680
4681 if (rep_quick && (param->testflag & T_FORCE_UNIQUENESS))
4682 {
4683 my_off_t skr= share->state.state.data_file_length +
4684 ((sort_info.org_data_file_type == COMPRESSED_RECORD) ?
4685 MEMMAP_EXTRA_MARGIN : 0);
4686 #ifdef USE_RELOC
4687 if (sort_info.org_data_file_type == STATIC_RECORD &&
4688 skr < share->base.reloc*share->base.min_pack_length)
4689 skr=share->base.reloc*share->base.min_pack_length;
4690 #endif
4691 if (skr != sort_info.filelength)
4692 if (mysql_file_chsize(info->dfile.file, skr, 0, MYF(0)))
4693 _ma_check_print_warning(param,
4694 "Can't change size of datafile, error: %d",
4695 my_errno);
4696 }
4697 if (param->testflag & T_CALC_CHECKSUM)
4698 share->state.state.checksum=param->glob_crc;
4699
4700 if (mysql_file_chsize(share->kfile.file,
4701 share->state.state.key_file_length, 0, MYF(0)))
4702 _ma_check_print_warning(param,
4703 "Can't change size of indexfile, error: %d",
4704 my_errno);
4705
4706 if (!(param->testflag & T_SILENT))
4707 {
4708 if (start_records != share->state.state.records)
4709 printf("Data records: %s\n", llstr(share->state.state.records,llbuff));
4710 }
4711 if (sort_info.dupp)
4712 _ma_check_print_warning(param,
4713 "%s records have been removed",
4714 llstr(sort_info.dupp,llbuff));
4715 got_error=0;
4716 /* If invoked by external program that uses thr_lock */
4717 if (&share->state.state != info->state)
4718 *info->state= *info->state_start= share->state.state;
4719
4720 err:
4721 _ma_reset_state(info);
4722
4723 /*
4724 Destroy the write cache. The master thread did already detach from
4725 the share by remove_io_thread() or it was not yet started (if the
4726 error happend before creating the thread).
4727 */
4728 if (sort_info.new_info)
4729 {
4730 end_io_cache(&sort_info.new_info->rec_cache);
4731 sort_info.new_info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
4732 }
4733 end_io_cache(¶m->read_cache);
4734 info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
4735 /*
4736 Destroy the new data cache in case of non-quick repair. All slave
4737 threads did either detach from the share by remove_io_thread()
4738 already or they were not yet started (if the error happend before
4739 creating the threads).
4740 */
4741 if (!rep_quick && my_b_inited(&new_data_cache))
4742 end_io_cache(&new_data_cache);
4743 if (!got_error)
4744 {
4745 /* Replace the actual file with the temporary file */
4746 if (new_file >= 0)
4747 {
4748 mysql_file_close(new_file,MYF(0));
4749 info->dfile.file= new_file= -1;
4750 if (maria_change_to_newfile(share->data_file_name.str, MARIA_NAME_DEXT,
4751 DATA_TMP_EXT, param->backup_time,
4752 MYF((param->testflag & T_BACKUP_DATA ?
4753 MY_REDEL_MAKE_BACKUP : 0) |
4754 sync_dir)) ||
4755 _ma_open_datafile(info,share))
4756 got_error=1;
4757 }
4758 }
4759 if (got_error)
4760 {
4761 if (! param->error_printed)
4762 _ma_check_print_error(param,"%d when fixing table",my_errno);
4763 (void)_ma_flush_table_files_before_swap(param, info);
4764 if (new_file >= 0)
4765 {
4766 mysql_file_close(new_file,MYF(0));
4767 mysql_file_delete(key_file_tmp, param->temp_filename, MYF(MY_WME));
4768 if (info->dfile.file == new_file)
4769 info->dfile.file= -1;
4770 }
4771 maria_mark_crashed_on_repair(info);
4772 }
4773 else if (key_map == share->state.key_map)
4774 share->state.changed&= ~STATE_NOT_OPTIMIZED_KEYS;
4775 share->state.changed|= STATE_NOT_SORTED_PAGES;
4776 if (!rep_quick)
4777 share->state.changed&= ~(STATE_NOT_OPTIMIZED_ROWS | STATE_NOT_ZEROFILLED |
4778 STATE_NOT_MOVABLE);
4779
4780 mysql_cond_destroy (&sort_info.cond);
4781 mysql_mutex_destroy(&sort_info.mutex);
4782
4783 /* If caller had disabled logging it's not up to us to re-enable it */
4784 if (reenable_logging)
4785 _ma_reenable_logging_for_table(info, FALSE);
4786 restore_table_state_after_repair(info, &backup_share);
4787
4788 my_free(sort_info.ft_buf);
4789 my_free(sort_info.key_block);
4790 my_free(sort_param);
4791 my_free(sort_info.buff);
4792 if (!got_error && (param->testflag & T_UNPACK))
4793 restore_data_file_type(share);
4794 DBUG_RETURN(got_error);
4795 }
4796
4797 /* Read next record and return next key */
4798
sort_key_read(MARIA_SORT_PARAM * sort_param,uchar * key)4799 static int sort_key_read(MARIA_SORT_PARAM *sort_param, uchar *key)
4800 {
4801 int error;
4802 MARIA_SORT_INFO *sort_info= sort_param->sort_info;
4803 MARIA_HA *info= sort_info->info;
4804 MARIA_KEY int_key;
4805 DBUG_ENTER("sort_key_read");
4806
4807 if ((error=sort_get_next_record(sort_param)))
4808 DBUG_RETURN(error);
4809 if (info->s->state.state.records == sort_info->max_records)
4810 {
4811 _ma_check_print_error(sort_info->param,
4812 "Key %d - Found too many records; Can't continue",
4813 sort_param->key+1);
4814 DBUG_RETURN(1);
4815 }
4816 if (_ma_sort_write_record(sort_param))
4817 DBUG_RETURN(1);
4818
4819 (*info->s->keyinfo[sort_param->key].make_key)(info, &int_key,
4820 sort_param->key, key,
4821 sort_param->record,
4822 sort_param->current_filepos,
4823 0);
4824 sort_param->real_key_length= int_key.data_length + int_key.ref_length;
4825 #ifdef HAVE_valgrind
4826 bzero(key+sort_param->real_key_length,
4827 (sort_param->key_length-sort_param->real_key_length));
4828 #endif
4829 DBUG_RETURN(0);
4830 } /* sort_key_read */
4831
4832
/*
  Return the next fulltext key for the sort.

  Each call produces one key from the word list of the current row. When
  no word list is pending, rows are read (and written to the new data
  file) until one is found whose parsed word list is not empty.

  RETURN
    0           ok; sort_param->real_key_length holds the length of the key
    != 0        value returned by sort_get_next_record() or
                _ma_sort_write_record(), or 1 if the row could not be parsed
*/

static int sort_maria_ft_key_read(MARIA_SORT_PARAM *sort_param, uchar *key)
{
  MARIA_SORT_INFO *sinfo= sort_param->sort_info;
  MARIA_HA *handler= sinfo->info;
  MARIA_KEY built_key;
  FT_WORD *word= 0;
  int status= 0;
  DBUG_ENTER("sort_maria_ft_key_read");

  if (sort_param->wordlist)
  {
    /* Continue with the words left over from the previous call */
    word= (FT_WORD*) (sort_param->wordptr);
  }
  else
  {
    /* Read rows until we get one that yields at least one word */
    do
    {
      free_root(&sort_param->wordroot, MYF(MY_MARK_BLOCKS_FREE));
      status= sort_get_next_record(sort_param);
      if (status)
        DBUG_RETURN(status);
      status= _ma_sort_write_record(sort_param);
      if (status)
        DBUG_RETURN(status);
      word= _ma_ft_parserecord(handler, sort_param->key, sort_param->record,
                               &sort_param->wordroot);
      if (!word)
        DBUG_RETURN(1);
    } while (!word->pos);
    sort_param->wordptr= sort_param->wordlist= word;
  }

  _ma_ft_make_key(handler, &built_key, sort_param->key, key, word,
                  sort_param->current_filepos);
  word++;                                       /* Step to next word */
  sort_param->real_key_length= built_key.data_length + built_key.ref_length;

#ifdef HAVE_valgrind
  /* Clear the unused tail of the key buffer, if there is one */
  if (sort_param->real_key_length < sort_param->key_length)
    bzero(key + sort_param->real_key_length,
          (sort_param->key_length - sort_param->real_key_length));
#endif

  if (word->pos)
    sort_param->wordptr= (void*) word;          /* More words in this row */
  else
  {
    /* Word list exhausted; next call has to read a new row */
    free_root(&sort_param->wordroot, MYF(MY_MARK_BLOCKS_FREE));
    sort_param->wordlist= 0;
  }

  DBUG_RETURN(status);
} /* sort_maria_ft_key_read */
4885
4886
/*
  Read next record from file using parameters in sort_info.

  SYNOPSIS
    sort_get_next_record()
      sort_param                Information about and for the sort process

  NOTES
    The way the row is read depends on sort_info->org_data_file_type;
    each row format has its own branch in the switch below. Every branch
    is a loop that only terminates through a return.

    Dynamic Records With Non-Quick Parallel Repair

    For non-quick parallel repair we use a synchronized read/write
    cache. This means that one thread is the master who fixes the data
    file by reading each record from the old data file and writing it
    to the new data file. By doing this the records in the new data
    file are written contiguously. Whenever the write buffer is full,
    it is copied to the read buffer. The slaves read from the read
    buffer, which is not associated with a file. Thus read_cache.file
    is -1. When using _ma_read_cache(), the slaves must always set
    flag to READING_NEXT so that the function never tries to read from
    file. This is safe because the records are contiguous. There is no
    need to read outside the cache. This condition is evaluated in the
    variable 'parallel_flag' for quick reference. read_cache.file must
    be >= 0 in every other case.

  RETURN
    -1          end of file
     0          ok
                sort_param->current_filepos points to record position.
                sort_param->record contains record
                sort_param->max_pos contains position to last byte read
    > 0         error
*/

static int sort_get_next_record(MARIA_SORT_PARAM *sort_param)
{
  int searching;
  int parallel_flag;
  uint found_record,b_type,left_length;
  my_off_t pos;
  MARIA_BLOCK_INFO block_info;
  MARIA_SORT_INFO *sort_info=sort_param->sort_info;
  HA_CHECK *param=sort_info->param;
  MARIA_HA *info=sort_info->info;
  MARIA_SHARE *share= info->s;
  char llbuff[22],llbuff2[22];
  DBUG_ENTER("sort_get_next_record");

  /* Abort if the operation was killed */
  if (_ma_killed_ptr(param))
    DBUG_RETURN(1);
  /* Report progress once per WRITE_COUNT calls */
  if (param->progress_counter++ >= WRITE_COUNT)
  {
    param->progress_counter= 0;
    _ma_report_progress(param, param->progress, param->max_progress);
  }

  switch (sort_info->org_data_file_type) {
  case BLOCK_RECORD:
  {
    for (;;)
    {
      int flag;
      /*
        Assume table is transactional and it had LSN pages in the
        cache. Repair has flushed them, left data pages stay in
        cache, and disabled transactionality (so share's current page
        type is PLAIN); page cache would assert if it finds a cached LSN page
        while _ma_scan_block_record() requested a PLAIN page. So we use
        UNKNOWN.
      */
      enum pagecache_page_type save_page_type= share->page_type;
      share->page_type= PAGECACHE_READ_UNKNOWN_PAGE;
      if (info != sort_info->new_info)
      {
        /* Safe scanning */
        flag= _ma_safe_scan_block_record(sort_info, info,
                                         sort_param->record);
      }
      else
      {
        /*
          Scan on clean table.
          It requires a reliable data_file_length so we set it.
        */
        share->state.state.data_file_length= sort_info->filelength;
        info->cur_row.trid= 0;
        flag= _ma_scan_block_record(info, sort_param->record,
                                    info->cur_row.nextpos, 1);
        set_if_bigger(param->max_found_trid, info->cur_row.trid);
        /* A row with a transaction id above max_trid is reported as error */
        if (info->cur_row.trid > param->max_trid)
        {
          _ma_check_print_not_visible_error(param, info->cur_row.trid);
          flag= HA_ERR_ROW_NOT_VISIBLE;
        }
      }
      param->progress= (ma_recordpos_to_page(info->cur_row.lastpos)*
                        share->block_size);

      /* Restore the page type that was replaced before the scan */
      share->page_type= save_page_type;
      if (!flag)
      {
        if (sort_param->calc_checksum)
        {
          ha_checksum checksum;
          checksum= (*share->calc_check_checksum)(info, sort_param->record);
          /*
            If the table has row checksums, compare the low byte of the
            computed checksum with the stored one; a row that does not
            match is skipped and the scan continues with the next row.
          */
          if (share->calc_checksum &&
              info->cur_row.checksum != (checksum & 255))
          {
            if (param->testflag & T_VERBOSE)
            {
              _ma_check_print_info(param,
                                   "Found record with wrong checksum at %s",
                                   record_pos_to_txt(info,
                                                     info->cur_row.lastpos,
                                                     llbuff));

            }
            continue;
          }
          info->cur_row.checksum= checksum;
          param->glob_crc+= checksum;
        }
        sort_param->start_recpos= sort_param->current_filepos=
          info->cur_row.lastpos;
        DBUG_RETURN(0);
      }
      if (flag == HA_ERR_END_OF_FILE)
      {
        sort_param->max_pos= share->state.state.data_file_length;
        DBUG_RETURN(-1);
      }
      /* Retry only if wrong record, not if disk error */
      if (flag != HA_ERR_WRONG_IN_RECORD && flag != HA_ERR_WRONG_CRC &&
          flag != HA_ERR_DECRYPTION_FAILED)
      {
        retry_if_quick(sort_param, flag);
        DBUG_RETURN(flag);
      }
    }
    break;                                      /* Impossible */
  }
  case STATIC_RECORD:
    /* Fixed length rows: read pack_reclength bytes at a time */
    for (;;)
    {
      if (my_b_read(&sort_param->read_cache,sort_param->record,
                    share->base.pack_reclength))
      {
        if (sort_param->read_cache.error)
          param->out_flag |= O_DATA_LOST;
        retry_if_quick(sort_param, my_errno);
        DBUG_RETURN(-1);
      }
      sort_param->start_recpos=sort_param->pos;
      param->progress= sort_param->pos;
      if (!sort_param->fix_datafile)
      {
        sort_param->current_filepos= sort_param->pos;
        if (sort_param->master)
          share->state.split++;
      }
      sort_param->max_pos=(sort_param->pos+=share->base.pack_reclength);
      /* A non-zero first byte means the slot holds a row; return it */
      if (*sort_param->record)
      {
        if (sort_param->calc_checksum)
          param->glob_crc+= (info->cur_row.checksum=
                             _ma_static_checksum(info,sort_param->record));
        DBUG_RETURN(0);
      }
      /*
        First byte is zero: the slot is counted as deleted (when the data
        file is not rewritten) and the next slot is read.
      */
      if (!sort_param->fix_datafile && sort_param->master)
      {
        share->state.state.del++;
        share->state.state.empty+=share->base.pack_reclength;
      }
    }
    /* Not reached: the loop above is only left through a return */
  case DYNAMIC_RECORD:
  {
    uchar *UNINIT_VAR(to);
    ha_checksum checksum= 0;

    pos=sort_param->pos;
    param->progress= pos;
    /*
      'searching' is non-zero while we probe for a usable block header in
      steps of MARIA_DYN_ALIGN_SIZE instead of following block lengths.
      It starts out set when the data file is rewritten with T_EXTEND and
      is always set after a record had to be given up (see try_next).
    */
    searching=(sort_param->fix_datafile && (param->testflag & T_EXTEND));
    /* See the function comment about parallel repair */
    parallel_flag= (sort_param->read_cache.file < 0) ? READING_NEXT : 0;
    for (;;)
    {
      /*
        found_record counts the blocks accepted for the current record;
        left_length is the number of record bytes still to be collected
        (set to the real record length when the first block is found).
      */
      found_record=block_info.second_read= 0;
      left_length=1;
      if (searching)
      {
        pos=MY_ALIGN(pos,MARIA_DYN_ALIGN_SIZE);
        param->testflag|=T_RETRY_WITHOUT_QUICK;
        sort_param->start_recpos=pos;
      }
      /* Collect the blocks of one record until left_length is zero */
      do
      {
        if (pos > sort_param->max_pos)
          sort_param->max_pos=pos;
        if (pos & (MARIA_DYN_ALIGN_SIZE-1))
        {
          if ((param->testflag & T_VERBOSE) || searching == 0)
            _ma_check_print_info(param,"Wrong aligned block at %s",
                                 llstr(pos,llbuff));
          if (searching)
            goto try_next;
        }
        if (found_record && pos == param->search_after_block)
          _ma_check_print_info(param,"Block: %s used by record at %s",
                               llstr(param->search_after_block,llbuff),
                               llstr(sort_param->start_recpos,llbuff2));
        /* Read the block header */
        if (_ma_read_cache(info, &sort_param->read_cache,
                           block_info.header, pos,
                           MARIA_BLOCK_INFO_HEADER_LENGTH,
                           (! found_record ? READING_NEXT : 0) |
                           parallel_flag | READING_HEADER))
        {
          if (found_record)
          {
            _ma_check_print_info(param,
                                 "Can't read whole record at %s (errno: %d)",
                                 llstr(sort_param->start_recpos,llbuff),errno);
            goto try_next;
          }
          /* Failed to read the first block of a record: end of file */
          DBUG_RETURN(-1);
        }
        /* Searching is only allowed when the data file is rewritten */
        if (searching && ! sort_param->fix_datafile)
        {
          param->error_printed++;
          param->retry_repair=1;
          param->testflag|=T_RETRY_WITHOUT_QUICK;
          my_errno= HA_ERR_WRONG_IN_RECORD;
          DBUG_RETURN(1);                       /* Something wrong with data */
        }
        b_type= _ma_get_block_info(info, &block_info,-1,pos);
        if ((b_type & (BLOCK_ERROR | BLOCK_FATAL_ERROR)) ||
            ((b_type & BLOCK_FIRST) &&
             (block_info.rec_len < (uint) share->base.min_pack_length ||
              block_info.rec_len > (uint) share->base.max_pack_length)))
        {
          uint i;
          if (param->testflag & T_VERBOSE || searching == 0)
            _ma_check_print_info(param,
                                 "Wrong bytesec: %3d-%3d-%3d at %10s; Skipped",
                                 block_info.header[0],block_info.header[1],
                                 block_info.header[2],llstr(pos,llbuff));
          if (found_record)
            goto try_next;
          block_info.second_read=0;
          searching=1;
          /* Search after block in read header string */
          for (i=MARIA_DYN_ALIGN_SIZE ;
               i < MARIA_BLOCK_INFO_HEADER_LENGTH ;
               i+= MARIA_DYN_ALIGN_SIZE)
            if (block_info.header[i] >= 1 &&
                block_info.header[i] <= MARIA_MAX_DYN_HEADER_BYTE)
              break;
          pos+=(ulong) i;
          sort_param->start_recpos=pos;
          continue;
        }
        if (b_type & BLOCK_DELETED)
        {
          /* Sanity check length and delete links of a deleted block */
          my_bool error=0;
          if (block_info.block_len+ (uint) (block_info.filepos-pos) <
              share->base.min_block_length)
          {
            if (!searching)
              _ma_check_print_info(param,
                                   "Deleted block with impossible length %lu "
                                   "at %s",
                                   block_info.block_len,llstr(pos,llbuff));
            error=1;
          }
          else
          {
            if ((block_info.next_filepos != HA_OFFSET_ERROR &&
                 block_info.next_filepos >=
                 share->state.state.data_file_length) ||
                (block_info.prev_filepos != HA_OFFSET_ERROR &&
                 block_info.prev_filepos >=
                 share->state.state.data_file_length))
            {
              if (!searching)
                _ma_check_print_info(param,
                                     "Delete link points outside datafile at "
                                     "%s",
                                     llstr(pos,llbuff));
              error=1;
            }
          }
          if (error)
          {
            if (found_record)
              goto try_next;
            searching=1;
            pos+= MARIA_DYN_ALIGN_SIZE;
            sort_param->start_recpos=pos;
            block_info.second_read=0;
            continue;
          }
        }
        else
        {
          /* Sanity check the length of a block that is in use */
          if (block_info.block_len+ (uint) (block_info.filepos-pos) <
              share->base.min_block_length ||
              block_info.block_len > (uint) share->base.max_pack_length+
              MARIA_SPLIT_LENGTH)
          {
            if (!searching)
              _ma_check_print_info(param,
                                   "Found block with impossible length %lu "
                                   "at %s; Skipped",
                                   block_info.block_len+
                                   (uint) (block_info.filepos-pos),
                                   llstr(pos,llbuff));
            if (found_record)
              goto try_next;
            searching=1;
            pos+= MARIA_DYN_ALIGN_SIZE;
            sort_param->start_recpos=pos;
            block_info.second_read=0;
            continue;
          }
        }
        if (b_type & (BLOCK_DELETED | BLOCK_SYNC_ERROR))
        {
          /* Count deleted blocks when the data file is not rewritten */
          if (!sort_param->fix_datafile && sort_param->master &&
              (b_type & BLOCK_DELETED))
          {
            share->state.state.empty+=block_info.block_len;
            share->state.state.del++;
            share->state.split++;
          }
          if (found_record)
            goto try_next;
          if (searching)
          {
            pos+=MARIA_DYN_ALIGN_SIZE;
            sort_param->start_recpos=pos;
          }
          else
            pos=block_info.filepos+block_info.block_len;
          block_info.second_read=0;
          continue;
        }

        if (!sort_param->fix_datafile && sort_param->master)
          share->state.split++;
        /* First block of a record: remember where it starts and its size */
        if (! found_record++)
        {
          sort_param->find_length=left_length=block_info.rec_len;
          sort_param->start_recpos=pos;
          if (!sort_param->fix_datafile)
            sort_param->current_filepos= sort_param->start_recpos;
          if (sort_param->fix_datafile && (param->testflag & T_EXTEND))
            sort_param->pos=block_info.filepos+1;
          else
            sort_param->pos=block_info.filepos+block_info.block_len;
          if (share->base.blobs)
          {
            /* Rows with blobs may need a bigger record buffer */
            if (_ma_alloc_buffer(&sort_param->rec_buff,
                                 &sort_param->rec_buff_size,
                                 block_info.rec_len +
                                 share->base.extra_rec_buff_size, MYF(0)))

            {
              if (param->max_record_length >= block_info.rec_len)
              {
                _ma_check_print_error(param,"Not enough memory for blob at %s "
                                      "(need %lu)",
                                      llstr(sort_param->start_recpos,llbuff),
                                      (ulong) block_info.rec_len);
                DBUG_RETURN(1);
              }
              else
              {
                _ma_check_print_info(param,"Not enough memory for blob at %s "
                                     "(need %lu); Row skipped",
                                     llstr(sort_param->start_recpos,llbuff),
                                     (ulong) block_info.rec_len);
                goto try_next;
              }
            }
          }
          to= sort_param->rec_buff;
        }
        if (left_length < block_info.data_len || ! block_info.data_len)
        {
          _ma_check_print_info(param,
                               "Found block with too small length at %s; "
                               "Skipped",
                               llstr(sort_param->start_recpos,llbuff));
          goto try_next;
        }
        if (block_info.filepos + block_info.data_len >
            sort_param->read_cache.end_of_file)
        {
          _ma_check_print_info(param,
                               "Found block that points outside data file "
                               "at %s",
                               llstr(sort_param->start_recpos,llbuff));
          goto try_next;
        }
        /*
          Copy information that is already read. Avoid accessing data
          below the cache start. This could happen if the header
          stretched over the end of the previous buffer contents.
        */
        {
          uint header_len= (uint) (block_info.filepos - pos);
          uint prefetch_len= (MARIA_BLOCK_INFO_HEADER_LENGTH - header_len);

          if (prefetch_len > block_info.data_len)
            prefetch_len= block_info.data_len;
          if (prefetch_len)
          {
            memcpy(to, block_info.header + header_len, prefetch_len);
            block_info.filepos+= prefetch_len;
            block_info.data_len-= prefetch_len;
            left_length-= prefetch_len;
            to+= prefetch_len;
          }
        }
        /* Read the part of the block data that was not in the header read */
        if (block_info.data_len &&
            _ma_read_cache(info, &sort_param->read_cache,to,block_info.filepos,
                           block_info.data_len,
                           (found_record == 1 ? READING_NEXT : 0) |
                           parallel_flag))
        {
          _ma_check_print_info(param,
                               "Read error for block at: %s (error: %d); "
                               "Skipped",
                               llstr(block_info.filepos,llbuff),my_errno);
          goto try_next;
        }
        left_length-=block_info.data_len;
        to+=block_info.data_len;
        /* Continue with the next block of the record, if any */
        pos=block_info.next_filepos;
        if (pos == HA_OFFSET_ERROR && left_length)
        {
          _ma_check_print_info(param,
                               "Wrong block with wrong total length "
                               "starting at %s",
                               llstr(sort_param->start_recpos,llbuff));
          goto try_next;
        }
        if (pos + MARIA_BLOCK_INFO_HEADER_LENGTH >
            sort_param->read_cache.end_of_file)
        {
          _ma_check_print_info(param,
                               "Found link that points at %s (outside data "
                               "file) at %s",
                               llstr(pos,llbuff2),
                               llstr(sort_param->start_recpos,llbuff));
          goto try_next;
        }
      } while (left_length);

      /* All blocks collected; unpack the row into sort_param->record */
      if (_ma_rec_unpack(info,sort_param->record,sort_param->rec_buff,
                         sort_param->find_length) != MY_FILE_ERROR)
      {
        if (sort_param->read_cache.error < 0)
          DBUG_RETURN(1);
        if (sort_param->calc_checksum)
          checksum= (share->calc_check_checksum)(info, sort_param->record);
        /* Extra verification of the row in extended/repair/search mode */
        if ((param->testflag & (T_EXTEND | T_REP)) || searching)
        {
          if (_ma_rec_check(info, sort_param->record, sort_param->rec_buff,
                            sort_param->find_length,
                            (param->testflag & T_QUICK) &&
                            sort_param->calc_checksum &&
                            MY_TEST(share->calc_checksum), checksum))
          {
            _ma_check_print_info(param,"Found wrong packed record at %s",
                                 llstr(sort_param->start_recpos,llbuff));
            goto try_next;
          }
        }
        if (sort_param->calc_checksum)
          param->glob_crc+= checksum;
        DBUG_RETURN(0);
      }
      if (!searching)
        _ma_check_print_info(param,"Key %d - Found wrong stored record at %s",
                             sort_param->key+1,
                             llstr(sort_param->start_recpos,llbuff));
      /*
        Give up the current record: restart one alignment unit after the
        position where it started, in searching mode.
      */
    try_next:
      pos=(sort_param->start_recpos+=MARIA_DYN_ALIGN_SIZE);
      searching=1;
    }
  }
  /* Not reached: the loop above is only left through a return */
  case COMPRESSED_RECORD:
    param->progress= sort_param->pos;
    /*
      Each 'continue' below moves one byte forward and turns on
      'searching' (see the loop header).
    */
    for (searching=0 ;; searching=1, sort_param->pos++)
    {
      if (_ma_read_cache(info, &sort_param->read_cache, block_info.header,
                         sort_param->pos,
                         share->pack.ref_length,READING_NEXT))
        DBUG_RETURN(-1);
      /* Searching is only allowed when the data file is rewritten */
      if (searching && ! sort_param->fix_datafile)
      {
        param->error_printed++;
        param->retry_repair=1;
        param->testflag|=T_RETRY_WITHOUT_QUICK;
        my_errno= HA_ERR_WRONG_IN_RECORD;
        DBUG_RETURN(1);                         /* Something wrong with data */
      }
      sort_param->start_recpos=sort_param->pos;
      if (_ma_pack_get_block_info(info, &sort_param->bit_buff, &block_info,
                                  &sort_param->rec_buff,
                                  &sort_param->rec_buff_size, -1,
                                  sort_param->pos))
        DBUG_RETURN(-1);
      /* A zero length exactly MEMMAP_EXTRA_MARGIN before end of file: EOF */
      if (!block_info.rec_len &&
          sort_param->pos + MEMMAP_EXTRA_MARGIN ==
          sort_param->read_cache.end_of_file)
        DBUG_RETURN(-1);
      if (block_info.rec_len < (uint) share->min_pack_length ||
          block_info.rec_len > (uint) share->max_pack_length)
      {
        if (! searching)
          _ma_check_print_info(param,
                               "Found block with wrong recordlength: %lu "
                               "at %s\n",
                               block_info.rec_len,
                               llstr(sort_param->pos,llbuff));
        continue;
      }
      if (_ma_read_cache(info, &sort_param->read_cache, sort_param->rec_buff,
                         block_info.filepos, block_info.rec_len,
                         READING_NEXT))
      {
        if (! searching)
          _ma_check_print_info(param,"Couldn't read whole record from %s",
                               llstr(sort_param->pos,llbuff));
        continue;
      }
      sort_param->rec_buff[block_info.rec_len]= 0; /* Keep valgrind happy */
      if (_ma_pack_rec_unpack(info, &sort_param->bit_buff, sort_param->record,
                              sort_param->rec_buff, block_info.rec_len))
      {
        if (! searching)
          _ma_check_print_info(param,"Found wrong record at %s",
                               llstr(sort_param->pos,llbuff));
        continue;
      }
      if (!sort_param->fix_datafile)
      {
        sort_param->current_filepos= sort_param->pos;
        if (sort_param->master)
          share->state.split++;
      }
      sort_param->max_pos= (sort_param->pos=block_info.filepos+
                            block_info.rec_len);
      /* Remember the packed length; _ma_sort_write_record() reads it */
      info->packed_length=block_info.rec_len;

      if (sort_param->calc_checksum)
      {
        info->cur_row.checksum= (*share->calc_check_checksum)(info,
                                                              sort_param->
                                                              record);
        param->glob_crc+= info->cur_row.checksum;
      }
      DBUG_RETURN(0);
    }
    /* Not reached: the loop above is only left through a return */
  case NO_RECORD:
    DBUG_RETURN(1);                             /* Impossible */
  }
  DBUG_RETURN(1);                               /* Impossible */
}
5455
5456
5457 /**
5458 @brief Write record to new file.
5459
5460 @fn _ma_sort_write_record()
5461 @param sort_param Sort parameters.
5462
5463 @note
5464 This is only called by a master thread if parallel repair is used.
5465
5466 @return
5467 @retval 0 OK
5468 sort_param->current_filepos points to inserted record for
5469 block_records and to the place for the next record for
5470 other row types.
5471 sort_param->filepos points to end of file
5472 @retval 1 Error
5473 */
5474
_ma_sort_write_record(MARIA_SORT_PARAM * sort_param)5475 int _ma_sort_write_record(MARIA_SORT_PARAM *sort_param)
5476 {
5477 int flag;
5478 uint length;
5479 ulong block_length,reclength;
5480 uchar *from;
5481 uchar block_buff[8];
5482 MARIA_SORT_INFO *sort_info=sort_param->sort_info;
5483 HA_CHECK *param= sort_info->param;
5484 MARIA_HA *info= sort_info->new_info;
5485 MARIA_SHARE *share= info->s;
5486 DBUG_ENTER("_ma_sort_write_record");
5487
5488 if (sort_param->fix_datafile)
5489 {
5490 sort_param->current_filepos= sort_param->filepos;
5491 switch (sort_info->new_data_file_type) {
5492 case BLOCK_RECORD:
5493 if ((sort_param->current_filepos=
5494 (*share->write_record_init)(info, sort_param->record)) ==
5495 HA_OFFSET_ERROR)
5496 {
5497 _ma_check_print_error(param, "%d when writing to datafile", my_errno);
5498 DBUG_RETURN(1);
5499 }
5500 /* Pointer to end of file */
5501 sort_param->filepos= share->state.state.data_file_length;
5502 break;
5503 case STATIC_RECORD:
5504 if (my_b_write(&info->rec_cache,sort_param->record,
5505 share->base.pack_reclength))
5506 {
5507 _ma_check_print_error(param,"%d when writing to datafile",my_errno);
5508 DBUG_RETURN(1);
5509 }
5510 sort_param->filepos+=share->base.pack_reclength;
5511 share->state.split++;
5512 break;
5513 case DYNAMIC_RECORD:
5514 if (! info->blobs)
5515 from=sort_param->rec_buff;
5516 else
5517 {
5518 /* must be sure that local buffer is big enough */
5519 reclength=share->base.pack_reclength+
5520 _ma_calc_total_blob_length(info,sort_param->record)+
5521 ALIGN_SIZE(MARIA_MAX_DYN_BLOCK_HEADER)+MARIA_SPLIT_LENGTH+
5522 MARIA_DYN_DELETE_BLOCK_HEADER;
5523 if (sort_info->buff_length < reclength)
5524 {
5525 if (!(sort_info->buff=my_realloc(PSI_INSTRUMENT_ME, sort_info->buff, (uint) reclength,
5526 MYF(MY_FREE_ON_ERROR |
5527 MY_ALLOW_ZERO_PTR))))
5528 DBUG_RETURN(1);
5529 sort_info->buff_length=reclength;
5530 }
5531 from= (uchar *) sort_info->buff+ALIGN_SIZE(MARIA_MAX_DYN_BLOCK_HEADER);
5532 }
5533 /* We can use info->checksum here as only one thread calls this */
5534 info->cur_row.checksum= (*share->calc_check_checksum)(info,
5535 sort_param->
5536 record);
5537 if (!(reclength= _ma_rec_pack(info,from,sort_param->record)))
5538 {
5539 _ma_check_print_error(param,"Got error %d when packing record",
5540 my_errno);
5541 DBUG_RETURN(1);
5542 }
5543 flag=0;
5544
5545 do
5546 {
5547 block_length= reclength + 3 + MY_TEST(reclength >= (65520 - 3));
5548 if (block_length < share->base.min_block_length)
5549 block_length=share->base.min_block_length;
5550 info->update|=HA_STATE_WRITE_AT_END;
5551 block_length=MY_ALIGN(block_length,MARIA_DYN_ALIGN_SIZE);
5552 if (block_length > MARIA_MAX_BLOCK_LENGTH)
5553 block_length=MARIA_MAX_BLOCK_LENGTH;
5554 if (_ma_write_part_record(info,0L,block_length,
5555 sort_param->filepos+block_length,
5556 &from,&reclength,&flag))
5557 {
5558 _ma_check_print_error(param,"%d when writing to datafile",my_errno);
5559 DBUG_RETURN(1);
5560 }
5561 sort_param->filepos+=block_length;
5562 share->state.split++;
5563 } while (reclength);
5564 break;
5565 case COMPRESSED_RECORD:
5566 reclength=info->packed_length;
5567 length= _ma_save_pack_length((uint) share->pack.version, block_buff,
5568 reclength);
5569 if (share->base.blobs)
5570 length+= _ma_save_pack_length((uint) share->pack.version,
5571 block_buff + length, info->blob_length);
5572 if (my_b_write(&info->rec_cache,block_buff,length) ||
5573 my_b_write(&info->rec_cache, sort_param->rec_buff, reclength))
5574 {
5575 _ma_check_print_error(param,"%d when writing to datafile",my_errno);
5576 DBUG_RETURN(1);
5577 }
5578 sort_param->filepos+=reclength+length;
5579 share->state.split++;
5580 break;
5581 case NO_RECORD:
5582 DBUG_RETURN(1); /* Impossible */
5583 }
5584 }
5585 if (sort_param->master)
5586 {
5587 share->state.state.records++;
5588 if ((param->testflag & T_WRITE_LOOP) &&
5589 (share->state.state.records % WRITE_COUNT) == 0)
5590 {
5591 char llbuff[22];
5592 printf("%s\r", llstr(share->state.state.records,llbuff));
5593 fflush(stdout);
5594 }
5595 }
5596 DBUG_RETURN(0);
5597 } /* _ma_sort_write_record */
5598
5599
5600 /* Compare two keys from _ma_create_index_by_sort */
5601
sort_key_cmp(MARIA_SORT_PARAM * sort_param,const void * a,const void * b)5602 static int sort_key_cmp(MARIA_SORT_PARAM *sort_param, const void *a,
5603 const void *b)
5604 {
5605 uint not_used[2];
5606 return (ha_key_cmp(sort_param->seg, *((uchar* const *) a),
5607 *((uchar* const *) b),
5608 USE_WHOLE_KEY, SEARCH_SAME, not_used));
5609 } /* sort_key_cmp */
5610
5611
/*
  Take the next key delivered by the sort and add it to the index.

  The key is compared with the last key stored in the first key block to
  maintain the per key part counters in sort_param->unique (according to
  param->stats_method). A key that is equal to the previous one in a
  HA_NOSAME index is reported as duplicate and handed to
  sort_delete_record() instead of being inserted.

  RETURN
    Result of sort_delete_record() for a duplicate, otherwise result of
    sort_insert_key(); 1 if a debug build finds keys out of order.
*/

static int sort_key_write(MARIA_SORT_PARAM *sort_param, const uchar *a)
{
  MARIA_SORT_INFO *sort_info= sort_param->sort_info;
  HA_CHECK *param= sort_info->param;
  MARIA_HA *info= sort_info->info;
  char new_pos_txt[22], prev_pos_txt[22];
  uint diff_pos[2];
  int cmp= -1;                                  /* Used if no previous key */

  if (!sort_info->key_block->inited)
  {
    /* Very first key: nothing to compare with */
    if (param->stats_method == MI_STATS_METHOD_IGNORE_NULLS)
      maria_collect_stats_nonulls_first(sort_param->seg, sort_param->notnull,
                                        a);
  }
  else
  {
    MA_SORT_KEY_BLOCKS *first_block= sort_info->key_block;

    cmp= ha_key_cmp(sort_param->seg, first_block->lastkey,
                    a, USE_WHOLE_KEY,
                    SEARCH_FIND | SEARCH_UPDATE | SEARCH_INSERT,
                    diff_pos);

    /* Some statistics methods need their own idea of where keys differ */
    switch (param->stats_method) {
    case MI_STATS_METHOD_NULLS_NOT_EQUAL:
      ha_key_cmp(sort_param->seg, first_block->lastkey,
                 a, USE_WHOLE_KEY,
                 SEARCH_FIND | SEARCH_NULL_ARE_NOT_EQUAL, diff_pos);
      break;
    case MI_STATS_METHOD_IGNORE_NULLS:
      diff_pos[0]= maria_collect_stats_nonulls_next(sort_param->seg,
                                                    sort_param->notnull,
                                                    first_block->lastkey,
                                                    a);
      break;
    default:
      break;
    }
    sort_param->unique[diff_pos[0]-1]++;
  }

  if (cmp == 0 && (sort_param->keyinfo->flag & HA_NOSAME))
  {
    char *new_txt, *prev_txt;

    DBUG_EXECUTE("key", _ma_print_keydata(DBUG_FILE, sort_param->seg, a,
                                          USE_WHOLE_KEY););
    sort_info->dupp++;
    info->cur_row.lastpos= get_record_for_key(sort_param->keyinfo, a);

    if ((param->testflag &
         (T_CREATE_UNIQUE_BY_SORT | T_SUPPRESS_ERR_HANDLING)) ==
        T_CREATE_UNIQUE_BY_SORT)
      param->testflag|= T_SUPPRESS_ERR_HANDLING;

    new_txt= record_pos_to_txt(info, info->cur_row.lastpos, new_pos_txt);
    prev_txt= record_pos_to_txt(info,
                                get_record_for_key(sort_param->keyinfo,
                                                   sort_info->key_block->
                                                   lastkey),
                                prev_pos_txt);
    _ma_check_print_warning(param,
                            "Duplicate key %2u for record at %10s against "
                            "record at %10s",
                            sort_param->key + 1, new_txt, prev_txt);
    param->testflag|= T_RETRY_WITHOUT_QUICK;
    if (param->testflag & T_VERBOSE)
      _ma_print_keydata(stdout, sort_param->seg, a, USE_WHOLE_KEY);
    return sort_delete_record(sort_param);
  }

#ifndef DBUG_OFF
  /* Previous key bigger than this one: the sort delivered keys unordered */
  if (cmp > 0)
  {
    _ma_check_print_error(param,
                          "Internal error: Keys are not in order from sort");
    return 1;
  }
#endif

  return sort_insert_key(sort_param, sort_info->key_block, a,
                         HA_OFFSET_ERROR);
} /* sort_key_write */
5685
5686
/*
  Flush what has been collected in sort_info->ft_buf for the current word.

  If ft_buf->buf is set, the values are still buffered: one key per
  buffered value is inserted into the first-level tree. Otherwise the
  values were already put into a second-level tree: its pending key blocks
  are flushed and one key referring to that tree is inserted into the
  first-level tree.

  RETURN
    0           ok
    != 0        error from _ma_flush_pending_blocks() or sort_insert_key()
*/

int _ma_sort_ft_buf_flush(MARIA_SORT_PARAM *sort_param)
{
  MARIA_SORT_INFO *sort_info= sort_param->sort_info;
  MA_SORT_FT_BUF *ft_buf= sort_info->ft_buf;
  MA_SORT_KEY_BLOCKS *first_block= sort_info->key_block;
  MARIA_SHARE *share= sort_info->info->s;
  uchar *value_pos;
  uint val_off, val_len;
  int rc;

  val_len= share->ft2_keyinfo.keylength;
  get_key_full_length_rdonly(val_off, ft_buf->lastkey);
  value_pos= ft_buf->lastkey + val_off;         /* Value part of lastkey */

  if (ft_buf->buf)
  {
    /* flushing first-level tree */
    uchar *next_value;

    rc= sort_insert_key(sort_param, first_block, ft_buf->lastkey,
                        HA_OFFSET_ERROR);
    next_value= value_pos + val_len;
    while (!rc && next_value < ft_buf->buf)
    {
      /* Reuse the word in lastkey with the next buffered value */
      memcpy(value_pos, next_value, val_len);
      rc= sort_insert_key(sort_param, first_block, ft_buf->lastkey,
                          HA_OFFSET_ERROR);
      next_value+= val_len;
    }
    return rc;
  }

  /* flushing second-level tree keyblocks */
  rc= _ma_flush_pending_blocks(sort_param);

  /* updating lastkey with second-level tree info */
  ft_intXstore(ft_buf->lastkey+val_off, -ft_buf->count);
  _ma_dpointer(share, ft_buf->lastkey + val_off + HA_FT_WLEN,
               share->state.key_root[sort_param->key]);

  /* restoring first level tree data in sort_info/sort_param */
  sort_info->key_block= (sort_info->key_block_end -
                         sort_info->param->sort_key_blocks);
  sort_param->keyinfo= share->keyinfo + sort_param->key;
  share->state.key_root[sort_param->key]= HA_OFFSET_ERROR;

  if (rc)
    return rc;

  /* writing lastkey in first-level tree */
  return sort_insert_key(sort_param, sort_info->key_block,
                         ft_buf->lastkey, HA_OFFSET_ERROR);
}
5731
5732
/*
  Take the next fulltext key delivered by the sort.

  Values for the same word are collected in sort_info->ft_buf. When the
  buffer fills up, the values are moved to a second-level tree. When a new
  word arrives, the previous word is flushed with _ma_sort_ft_buf_flush().
  If no buffer can be used, the function installs sort_key_write() as
  key_write callback and passes the key on to it.

  RETURN
    0           ok
    != 0        error from sort_insert_key(), _ma_sort_ft_buf_flush() or
                sort_key_write()
*/

static int sort_maria_ft_key_write(MARIA_SORT_PARAM *sort_param,
                                   const uchar *a)
{
  MARIA_SORT_INFO *sort_info= sort_param->sort_info;
  MARIA_SHARE *share= sort_info->info->s;
  MA_SORT_KEY_BLOCKS *key_block= sort_info->key_block;
  MA_SORT_FT_BUF *ft_buf= sort_info->ft_buf;
  uint word_len, last_word_len, val_len, rc;

  val_len= HA_FT_WLEN + share->rec_reflength;
  get_key_full_length_rdonly(word_len, a);

  if (!ft_buf)
  {
    /*
      use two-level tree only if key_reflength fits in rec_reflength place
      and row format is NOT static - for _ma_dpointer not to garble offsets
    */
    if ((share->options &
         (HA_OPTION_PACK_RECORD | HA_OPTION_COMPRESS_RECORD)) &&
        share->base.key_reflength <= share->rec_reflength)
      ft_buf= (MA_SORT_FT_BUF *) my_malloc(PSI_INSTRUMENT_ME,
                                           sort_param->keyinfo->block_length +
                                           sizeof(MA_SORT_FT_BUF),
                                           MYF(MY_WME));
    if (!ft_buf)
    {
      /* No buffer: handle this and all later keys as ordinary keys */
      sort_param->key_write= sort_key_write;
      return sort_key_write(sort_param, a);
    }
    sort_info->ft_buf= ft_buf;
  }
  else
  {
    get_key_full_length_rdonly(last_word_len, ft_buf->lastkey);

    if (ha_compare_text(sort_param->seg->charset,
                        a + 1, word_len - 1,
                        ft_buf->lastkey + 1, last_word_len - 1, 0) == 0)
    {
      /* Same word as in the previous key */
      uchar *value;

      if (!ft_buf->buf)                         /* store in second-level tree */
      {
        ft_buf->count++;
        return sort_insert_key(sort_param, key_block,
                               a + word_len, HA_OFFSET_ERROR);
      }

      /* storing the key in the buffer. */
      memcpy(ft_buf->buf, a + word_len, val_len);
      ft_buf->buf+= val_len;
      if (ft_buf->buf < ft_buf->end)
        return 0;

      /* converting to two-level tree */
      value= ft_buf->lastkey + last_word_len;

      /* The second-level tree uses the first key block not yet in use */
      while (key_block->inited)
        key_block++;
      sort_info->key_block= key_block;
      sort_param->keyinfo= &share->ft2_keyinfo;
      ft_buf->count= ((uint) (ft_buf->buf - value)) / val_len;

      /* flushing buffer to second-level tree */
      rc= 0;
      while (!rc && value < ft_buf->buf)
      {
        rc= sort_insert_key(sort_param, key_block, value, HA_OFFSET_ERROR);
        value+= val_len;
      }
      ft_buf->buf= 0;
      return rc;
    }

    /* New word: flushing buffer */
    rc= _ma_sort_ft_buf_flush(sort_param);
    if (rc)
      return rc;
  }

  /* Start collecting values for the word in 'a' */
  word_len+= val_len;
  memcpy(ft_buf->lastkey, a, word_len);
  ft_buf->buf= ft_buf->lastkey + word_len;
  /*
    32 is just a safety margin here
    (at least MY_MAX(val_len, sizeof(nod_flag)) should be there).
    May be better performance could be achieved if we'd put
      (sort_info->keyinfo->block_length-32)/XXX
    instead.
    TODO: benchmark the best value for XXX.
  */
  ft_buf->end= ft_buf->lastkey + (sort_param->keyinfo->block_length - 32);
  return 0;
} /* sort_maria_ft_key_write */
5821
5822
5823 /* get pointer to record from a key */
5824
get_record_for_key(MARIA_KEYDEF * keyinfo,const uchar * key_data)5825 static my_off_t get_record_for_key(MARIA_KEYDEF *keyinfo,
5826 const uchar *key_data)
5827 {
5828 MARIA_KEY key;
5829 key.keyinfo= keyinfo;
5830 key.data= (uchar*) key_data;
5831 key.data_length= (_ma_keylength(keyinfo, key_data) -
5832 keyinfo->share->rec_reflength);
5833 return _ma_row_pos_from_key(&key);
5834 } /* get_record_for_key */
5835
5836
5837 /* Insert a key in sort-key-blocks */
5838
sort_insert_key(MARIA_SORT_PARAM * sort_param,register MA_SORT_KEY_BLOCKS * key_block,const uchar * key,my_off_t prev_block)5839 static int sort_insert_key(MARIA_SORT_PARAM *sort_param,
5840 register MA_SORT_KEY_BLOCKS *key_block,
5841 const uchar *key,
5842 my_off_t prev_block)
5843 {
5844 uint a_length,t_length,nod_flag;
5845 my_off_t filepos;
5846 uchar *anc_buff,*lastkey;
5847 MARIA_KEY_PARAM s_temp;
5848 MARIA_KEYDEF *keyinfo=sort_param->keyinfo;
5849 MARIA_SORT_INFO *sort_info= sort_param->sort_info;
5850 HA_CHECK *param=sort_info->param;
5851 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
5852 MARIA_KEY tmp_key;
5853 MARIA_HA *info= sort_info->info;
5854 MARIA_SHARE *share= info->s;
5855 DBUG_ENTER("sort_insert_key");
5856
5857 anc_buff= key_block->buff;
5858 lastkey=key_block->lastkey;
5859 nod_flag= (key_block == sort_info->key_block ? 0 :
5860 share->base.key_reflength);
5861
5862 if (!key_block->inited)
5863 {
5864 key_block->inited=1;
5865 if (key_block == sort_info->key_block_end)
5866 {
5867 _ma_check_print_error(param,
5868 "To many key-block-levels; "
5869 "Try increasing sort_key_blocks");
5870 DBUG_RETURN(1);
5871 }
5872 a_length= share->keypage_header + nod_flag;
5873 key_block->end_pos= anc_buff + share->keypage_header;
5874 bzero(anc_buff, share->keypage_header);
5875 _ma_store_keynr(share, anc_buff, sort_param->keyinfo->key_nr);
5876 lastkey=0; /* No previous key in block */
5877 }
5878 else
5879 a_length= _ma_get_page_used(share, anc_buff);
5880
5881 /* Save pointer to previous block */
5882 if (nod_flag)
5883 {
5884 _ma_store_keypage_flag(share, anc_buff, KEYPAGE_FLAG_ISNOD);
5885 _ma_kpointer(info,key_block->end_pos,prev_block);
5886 }
5887
5888 tmp_key.keyinfo= keyinfo;
5889 tmp_key.data= (uchar*) key;
5890 tmp_key.data_length= _ma_keylength(keyinfo, key) - share->rec_reflength;
5891 tmp_key.ref_length= share->rec_reflength;
5892
5893 t_length= (*keyinfo->pack_key)(&tmp_key, nod_flag,
5894 (uchar*) 0, lastkey, lastkey, &s_temp);
5895 (*keyinfo->store_key)(keyinfo, key_block->end_pos+nod_flag,&s_temp);
5896 a_length+=t_length;
5897 _ma_store_page_used(share, anc_buff, a_length);
5898 key_block->end_pos+=t_length;
5899 if (a_length <= share->max_index_block_size)
5900 {
5901 MARIA_KEY tmp_key2;
5902 tmp_key2.data= key_block->lastkey;
5903 _ma_copy_key(&tmp_key2, &tmp_key);
5904 key_block->last_length=a_length-t_length;
5905 DBUG_RETURN(0);
5906 }
5907
5908 /* Fill block with end-zero and write filled block */
5909 _ma_store_page_used(share, anc_buff, key_block->last_length);
5910 bzero(anc_buff+key_block->last_length,
5911 keyinfo->block_length- key_block->last_length);
5912 if ((filepos= _ma_new(info, DFLT_INIT_HITS, &page_link)) == HA_OFFSET_ERROR)
5913 DBUG_RETURN(1);
5914 _ma_fast_unlock_key_del(info);
5915
5916 /* If we read the page from the key cache, we have to write it back to it */
5917 if (page_link->changed)
5918 {
5919 MARIA_PAGE page;
5920 pop_dynamic(&info->pinned_pages);
5921 _ma_page_setup(&page, info, keyinfo, filepos, anc_buff);
5922 if (_ma_write_keypage(&page, PAGECACHE_LOCK_WRITE_UNLOCK, DFLT_INIT_HITS))
5923 DBUG_RETURN(1);
5924 }
5925 else
5926 {
5927 if (write_page(share, share->kfile.file, anc_buff,
5928 keyinfo->block_length, filepos, param->myf_rw))
5929 DBUG_RETURN(1);
5930 }
5931 DBUG_DUMP("buff", anc_buff, _ma_get_page_used(share, anc_buff));
5932
5933 /* Write separator-key to block in next level */
5934 if (sort_insert_key(sort_param,key_block+1,key_block->lastkey,filepos))
5935 DBUG_RETURN(1);
5936
5937 /* clear old block and write new key in it */
5938 key_block->inited=0;
5939 DBUG_RETURN(sort_insert_key(sort_param, key_block,key,prev_block));
5940 } /* sort_insert_key */
5941
5942
5943 /* Delete record when we found a duplicated key */
5944
sort_delete_record(MARIA_SORT_PARAM * sort_param)5945 static int sort_delete_record(MARIA_SORT_PARAM *sort_param)
5946 {
5947 uint i;
5948 int old_file,error;
5949 uchar *key;
5950 MARIA_SORT_INFO *sort_info=sort_param->sort_info;
5951 HA_CHECK *param=sort_info->param;
5952 MARIA_HA *row_info= sort_info->new_info, *key_info= sort_info->info;
5953 DBUG_ENTER("sort_delete_record");
5954
5955 if ((param->testflag & (T_FORCE_UNIQUENESS|T_QUICK)) == T_QUICK)
5956 {
5957 _ma_check_print_error(param,
5958 "Quick-recover aborted; Run recovery without switch "
5959 "-q or with switch -qq");
5960 DBUG_RETURN(1);
5961 }
5962 if (key_info->s->options & HA_OPTION_COMPRESS_RECORD)
5963 {
5964 _ma_check_print_error(param,
5965 "Recover aborted; Can't run standard recovery on "
5966 "compressed tables with errors in data-file. "
5967 "Use 'aria_chk --safe-recover' to fix it");
5968 DBUG_RETURN(1);
5969 }
5970
5971 old_file= row_info->dfile.file;
5972 /* This only affects static and dynamic row formats */
5973 row_info->dfile.file= row_info->rec_cache.file;
5974 if (flush_io_cache(&row_info->rec_cache))
5975 DBUG_RETURN(1);
5976
5977 key= key_info->lastkey_buff + key_info->s->base.max_key_length;
5978 if ((error=(*row_info->s->read_record)(row_info, sort_param->record,
5979 key_info->cur_row.lastpos)) &&
5980 error != HA_ERR_RECORD_DELETED)
5981 {
5982 _ma_check_print_error(param,"Can't read record to be removed");
5983 row_info->dfile.file= old_file;
5984 DBUG_RETURN(1);
5985 }
5986 row_info->cur_row.lastpos= key_info->cur_row.lastpos;
5987
5988 for (i=0 ; i < sort_info->current_key ; i++)
5989 {
5990 MARIA_KEY tmp_key;
5991 (*key_info->s->keyinfo[i].make_key)(key_info, &tmp_key, i, key,
5992 sort_param->record,
5993 key_info->cur_row.lastpos, 0);
5994 if (_ma_ck_delete(key_info, &tmp_key))
5995 {
5996 _ma_check_print_error(param,
5997 "Can't delete key %d from record to be removed",
5998 i+1);
5999 row_info->dfile.file= old_file;
6000 DBUG_RETURN(1);
6001 }
6002 }
6003 if (sort_param->calc_checksum)
6004 param->glob_crc-=(*key_info->s->calc_check_checksum)(key_info,
6005 sort_param->record);
6006 error= (*row_info->s->delete_record)(row_info, sort_param->record);
6007 if (error)
6008 _ma_check_print_error(param,"Got error %d when deleting record",
6009 my_errno);
6010 row_info->dfile.file= old_file; /* restore actual value */
6011 row_info->s->state.state.records--;
6012 DBUG_RETURN(error);
6013 } /* sort_delete_record */
6014
6015
6016 /* Fix all pending blocks and flush everything to disk */
6017
_ma_flush_pending_blocks(MARIA_SORT_PARAM * sort_param)6018 int _ma_flush_pending_blocks(MARIA_SORT_PARAM *sort_param)
6019 {
6020 uint nod_flag,length;
6021 my_off_t filepos;
6022 MA_SORT_KEY_BLOCKS *key_block;
6023 MARIA_SORT_INFO *sort_info= sort_param->sort_info;
6024 myf myf_rw=sort_info->param->myf_rw;
6025 MARIA_HA *info=sort_info->info;
6026 MARIA_KEYDEF *keyinfo=sort_param->keyinfo;
6027 MARIA_PINNED_PAGE tmp_page_link, *page_link= &tmp_page_link;
6028 DBUG_ENTER("_ma_flush_pending_blocks");
6029
6030 filepos= HA_OFFSET_ERROR; /* if empty file */
6031 nod_flag=0;
6032 for (key_block=sort_info->key_block ; key_block->inited ; key_block++)
6033 {
6034 key_block->inited=0;
6035 length= _ma_get_page_used(info->s, key_block->buff);
6036 if (nod_flag)
6037 _ma_kpointer(info,key_block->end_pos,filepos);
6038 bzero(key_block->buff+length, keyinfo->block_length-length);
6039 if ((filepos= _ma_new(info, DFLT_INIT_HITS, &page_link)) ==
6040 HA_OFFSET_ERROR)
6041 goto err;
6042
6043 /* If we read the page from the key cache, we have to write it back */
6044 if (page_link->changed)
6045 {
6046 MARIA_PAGE page;
6047 pop_dynamic(&info->pinned_pages);
6048
6049 _ma_page_setup(&page, info, keyinfo, filepos, key_block->buff);
6050 if (_ma_write_keypage(&page, PAGECACHE_LOCK_WRITE_UNLOCK,
6051 DFLT_INIT_HITS))
6052 goto err;
6053 }
6054 else
6055 {
6056 if (write_page(info->s, info->s->kfile.file, key_block->buff,
6057 keyinfo->block_length, filepos, myf_rw))
6058 goto err;
6059 }
6060 DBUG_DUMP("buff",key_block->buff,length);
6061 nod_flag=1;
6062 }
6063 info->s->state.key_root[sort_param->key]=filepos; /* Last is root for tree */
6064 _ma_fast_unlock_key_del(info);
6065 DBUG_RETURN(0);
6066
6067 err:
6068 _ma_fast_unlock_key_del(info);
6069 DBUG_RETURN(1);
6070 } /* _ma_flush_pending_blocks */
6071
6072 /* alloc space and pointers for key_blocks */
6073
alloc_key_blocks(HA_CHECK * param,uint blocks,uint buffer_length)6074 static MA_SORT_KEY_BLOCKS *alloc_key_blocks(HA_CHECK *param, uint blocks,
6075 uint buffer_length)
6076 {
6077 reg1 uint i;
6078 MA_SORT_KEY_BLOCKS *block;
6079 DBUG_ENTER("alloc_key_blocks");
6080
6081 if (!(block= (MA_SORT_KEY_BLOCKS*) my_malloc(PSI_INSTRUMENT_ME,
6082 (sizeof(MA_SORT_KEY_BLOCKS)+buffer_length+IO_SIZE)*blocks, MYF(0))))
6083 {
6084 _ma_check_print_error(param,"Not enough memory for sort-key-blocks");
6085 return(0);
6086 }
6087 for (i=0 ; i < blocks ; i++)
6088 {
6089 block[i].inited=0;
6090 block[i].buff= (uchar*) (block+blocks)+(buffer_length+IO_SIZE)*i;
6091 }
6092 DBUG_RETURN(block);
6093 } /* alloc_key_blocks */
6094
6095
6096 /* Check if file is almost full */
6097
maria_test_if_almost_full(MARIA_HA * info)6098 int maria_test_if_almost_full(MARIA_HA *info)
6099 {
6100 MARIA_SHARE *share= info->s;
6101
6102 if (share->options & HA_OPTION_COMPRESS_RECORD)
6103 return 0;
6104 return mysql_file_seek(share->kfile.file, 0L, MY_SEEK_END,
6105 MYF(MY_THREADSAFE))/10*9 >
6106 (my_off_t) share->base.max_key_file_length ||
6107 mysql_file_seek(info->dfile.file, 0L, MY_SEEK_END, MYF(0)) / 10 * 9 >
6108 (my_off_t) share->base.max_data_file_length;
6109 }
6110
6111
6112 /* Recreate table with bigger more alloced record-data */
6113
maria_recreate_table(HA_CHECK * param,MARIA_HA ** org_info,char * filename)6114 int maria_recreate_table(HA_CHECK *param, MARIA_HA **org_info, char *filename)
6115 {
6116 int error;
6117 MARIA_HA info;
6118 MARIA_SHARE share;
6119 MARIA_KEYDEF *keyinfo,*key,*key_end;
6120 HA_KEYSEG *keysegs,*keyseg;
6121 MARIA_COLUMNDEF *columndef,*column,*end;
6122 MARIA_UNIQUEDEF *uniquedef,*u_ptr,*u_end;
6123 MARIA_STATUS_INFO status_info;
6124 uint unpack,key_parts;
6125 ha_rows max_records;
6126 ulonglong file_length,tmp_length;
6127 MARIA_CREATE_INFO create_info;
6128 DBUG_ENTER("maria_recreate_table");
6129
6130 if ((!(param->testflag & T_SILENT)))
6131 printf("Recreating table '%s'\n", param->isam_file_name);
6132
6133 error=1; /* Default error */
6134 info= **org_info;
6135 status_info= (*org_info)->state[0];
6136 info.state= &status_info;
6137 share= *(*org_info)->s;
6138 unpack= ((share.data_file_type == COMPRESSED_RECORD) &&
6139 (param->testflag & T_UNPACK));
6140 if (!(keyinfo=(MARIA_KEYDEF*) my_alloca(sizeof(MARIA_KEYDEF) *
6141 share.base.keys)))
6142 DBUG_RETURN(0);
6143 memcpy((uchar*) keyinfo,(uchar*) share.keyinfo,
6144 (size_t) (sizeof(MARIA_KEYDEF)*share.base.keys));
6145
6146 key_parts= share.base.all_key_parts;
6147 if (!(keysegs=(HA_KEYSEG*) my_alloca(sizeof(HA_KEYSEG)*
6148 (key_parts+share.base.keys))))
6149 {
6150 my_afree(keyinfo);
6151 DBUG_RETURN(1);
6152 }
6153 if (!(columndef=(MARIA_COLUMNDEF*)
6154 my_alloca(sizeof(MARIA_COLUMNDEF)*(share.base.fields+1))))
6155 {
6156 my_afree(keyinfo);
6157 my_afree(keysegs);
6158 DBUG_RETURN(1);
6159 }
6160 if (!(uniquedef=(MARIA_UNIQUEDEF*)
6161 my_alloca(sizeof(MARIA_UNIQUEDEF)*(share.state.header.uniques+1))))
6162 {
6163 my_afree(columndef);
6164 my_afree(keyinfo);
6165 my_afree(keysegs);
6166 DBUG_RETURN(1);
6167 }
6168
6169 /* Copy the column definitions in their original order */
6170 for (column= share.columndef, end= share.columndef+share.base.fields;
6171 column != end ;
6172 column++)
6173 columndef[column->column_nr]= *column;
6174
6175 /* Change the new key to point at the saved key segments */
6176 memcpy((uchar*) keysegs,(uchar*) share.keyparts,
6177 (size_t) (sizeof(HA_KEYSEG)*(key_parts+share.base.keys+
6178 share.state.header.uniques)));
6179 keyseg=keysegs;
6180 for (key=keyinfo,key_end=keyinfo+share.base.keys; key != key_end ; key++)
6181 {
6182 key->seg=keyseg;
6183 for (; keyseg->type ; keyseg++)
6184 {
6185 if (param->language)
6186 keyseg->language=param->language; /* change language */
6187 }
6188 keyseg++; /* Skip end pointer */
6189 }
6190
6191 /*
6192 Copy the unique definitions and change them to point at the new key
6193 segments
6194 */
6195 memcpy((uchar*) uniquedef,(uchar*) share.uniqueinfo,
6196 (size_t) (sizeof(MARIA_UNIQUEDEF)*(share.state.header.uniques)));
6197 for (u_ptr=uniquedef,u_end=uniquedef+share.state.header.uniques;
6198 u_ptr != u_end ; u_ptr++)
6199 {
6200 u_ptr->seg=keyseg;
6201 keyseg+=u_ptr->keysegs+1;
6202 }
6203
6204 file_length=(ulonglong) mysql_file_seek(info.dfile.file, 0L, MY_SEEK_END, MYF(0));
6205 if (share.options & HA_OPTION_COMPRESS_RECORD)
6206 share.base.records=max_records=info.state->records;
6207 else if (share.base.min_pack_length)
6208 max_records=(ha_rows) (file_length / share.base.min_pack_length);
6209 else
6210 max_records=0;
6211 share.options&= ~HA_OPTION_TEMP_COMPRESS_RECORD;
6212
6213 tmp_length= file_length+file_length/10;
6214 set_if_bigger(file_length,param->max_data_file_length);
6215 set_if_bigger(file_length,tmp_length);
6216 set_if_bigger(file_length,(ulonglong) share.base.max_data_file_length);
6217
6218 maria_close(*org_info);
6219
6220 bzero((char*) &create_info,sizeof(create_info));
6221 create_info.max_rows=MY_MAX(max_records,share.base.records);
6222 create_info.reloc_rows=share.base.reloc;
6223 create_info.old_options=(share.options |
6224 (unpack ? HA_OPTION_TEMP_COMPRESS_RECORD : 0));
6225
6226 create_info.data_file_length=file_length;
6227 create_info.auto_increment=share.state.auto_increment;
6228 create_info.language = (param->language ? param->language :
6229 share.base.language);
6230 create_info.key_file_length= status_info.key_file_length;
6231 create_info.org_data_file_type= ((enum data_file_type)
6232 share.state.header.org_data_file_type);
6233
6234 /*
6235 Allow for creating an auto_increment key. This has an effect only if
6236 an auto_increment key exists in the original table.
6237 */
6238 create_info.with_auto_increment= TRUE;
6239 create_info.null_bytes= share.base.null_bytes;
6240 create_info.transactional= share.base.born_transactional;
6241
6242 /*
6243 We don't have to handle symlinks here because we are using
6244 HA_DONT_TOUCH_DATA
6245 */
6246 if (maria_create(filename, share.data_file_type,
6247 share.base.keys - share.state.header.uniques,
6248 keyinfo, share.base.fields, columndef,
6249 share.state.header.uniques, uniquedef,
6250 &create_info,
6251 HA_DONT_TOUCH_DATA))
6252 {
6253 _ma_check_print_error(param,
6254 "Got error %d when trying to recreate indexfile",
6255 my_errno);
6256 goto end;
6257 }
6258 *org_info= maria_open(filename,O_RDWR,
6259 (HA_OPEN_FOR_REPAIR |
6260 ((param->testflag & T_WAIT_FOREVER) ?
6261 HA_OPEN_WAIT_IF_LOCKED :
6262 (param->testflag & T_DESCRIPT) ?
6263 HA_OPEN_IGNORE_IF_LOCKED :
6264 HA_OPEN_ABORT_IF_LOCKED)), 0);
6265 if (!*org_info)
6266 {
6267 _ma_check_print_error(param,
6268 "Got error %d when trying to open re-created "
6269 "indexfile", my_errno);
6270 goto end;
6271 }
6272 /* We are modifing */
6273 (*org_info)->s->options&= ~HA_OPTION_READ_ONLY_DATA;
6274 _ma_readinfo(*org_info,F_WRLCK,0);
6275 (*org_info)->s->state.state.records= info.state->records;
6276 if (share.state.create_time)
6277 (*org_info)->s->state.create_time=share.state.create_time;
6278 #ifdef MARIA_EXTERNAL_LOCKING
6279 (*org_info)->s->state.unique= (*org_info)->this_unique= share.state.unique;
6280 #endif
6281 (*org_info)->s->state.state.checksum= info.state->checksum;
6282 (*org_info)->s->state.state.del= info.state->del;
6283 (*org_info)->s->state.dellink= share.state.dellink;
6284 (*org_info)->s->state.state.empty= info.state->empty;
6285 (*org_info)->s->state.state.data_file_length= info.state->data_file_length;
6286 *(*org_info)->state= (*org_info)->s->state.state;
6287 if (maria_update_state_info(param,*org_info,UPDATE_TIME | UPDATE_STAT |
6288 UPDATE_OPEN_COUNT))
6289 goto end;
6290 error=0;
6291 end:
6292 my_afree(uniquedef);
6293 my_afree(keyinfo);
6294 my_afree(columndef);
6295 my_afree(keysegs);
6296 DBUG_RETURN(error);
6297 }
6298
6299
6300 /* Write suffix to data file if needed */
6301
maria_write_data_suffix(MARIA_SORT_INFO * sort_info,my_bool fix_datafile)6302 int maria_write_data_suffix(MARIA_SORT_INFO *sort_info, my_bool fix_datafile)
6303 {
6304 MARIA_HA *info=sort_info->new_info;
6305
6306 if (info->s->data_file_type == COMPRESSED_RECORD && fix_datafile)
6307 {
6308 uchar buff[MEMMAP_EXTRA_MARGIN];
6309 bzero(buff,sizeof(buff));
6310 if (my_b_write(&info->rec_cache,buff,sizeof(buff)))
6311 {
6312 _ma_check_print_error(sort_info->param,
6313 "%d when writing to datafile",my_errno);
6314 return 1;
6315 }
6316 sort_info->param->read_cache.end_of_file+=sizeof(buff);
6317 }
6318 return 0;
6319 }
6320
6321
6322 /* Update state and maria_chk time of indexfile */
6323
maria_update_state_info(HA_CHECK * param,MARIA_HA * info,uint update)6324 int maria_update_state_info(HA_CHECK *param, MARIA_HA *info,uint update)
6325 {
6326 MARIA_SHARE *share= info->s;
6327 DBUG_ENTER("maria_update_state_info");
6328
6329 if (update & UPDATE_OPEN_COUNT)
6330 {
6331 share->state.open_count=0;
6332 share->global_changed=0;
6333 share->changed= 1;
6334 }
6335 if (update & UPDATE_STAT)
6336 {
6337 uint i, key_parts= mi_uint2korr(share->state.header.key_parts);
6338 share->state.records_at_analyze= share->state.state.records;
6339 share->state.changed&= ~STATE_NOT_ANALYZED;
6340 if (share->state.state.records)
6341 {
6342 for (i=0; i<key_parts; i++)
6343 {
6344 if (!(share->state.rec_per_key_part[i]=param->new_rec_per_key_part[i]))
6345 share->state.changed|= STATE_NOT_ANALYZED;
6346 }
6347 }
6348 }
6349 if (update & (UPDATE_STAT | UPDATE_SORT | UPDATE_TIME | UPDATE_AUTO_INC))
6350 {
6351 if (update & UPDATE_TIME)
6352 {
6353 share->state.check_time= time((time_t*) 0);
6354 if (!share->state.create_time)
6355 share->state.create_time= share->state.check_time;
6356 }
6357 if (_ma_state_info_write(share,
6358 MA_STATE_INFO_WRITE_DONT_MOVE_OFFSET |
6359 MA_STATE_INFO_WRITE_FULL_INFO))
6360 goto err;
6361 }
6362 { /* Force update of status */
6363 int error;
6364 uint r_locks=share->r_locks,w_locks=share->w_locks;
6365 share->r_locks= share->w_locks= share->tot_locks= 0;
6366 error= _ma_writeinfo(info,WRITEINFO_NO_UNLOCK);
6367 share->r_locks=r_locks;
6368 share->w_locks=w_locks;
6369 share->tot_locks=r_locks+w_locks;
6370 if (!error)
6371 DBUG_RETURN(0);
6372 }
6373 err:
6374 _ma_check_print_error(param,"%d when updating keyfile",my_errno);
6375 DBUG_RETURN(1);
6376 }
6377
6378 /*
6379 Update auto increment value for a table
6380 When setting the 'repair_only' flag we only want to change the
6381 old auto_increment value if its wrong (smaller than some given key).
6382 The reason is that we shouldn't change the auto_increment value
6383 for a table without good reason when only doing a repair; If the
6384 user have inserted and deleted rows, the auto_increment value
6385 may be bigger than the biggest current row and this is ok.
6386
6387 If repair_only is not set, we will update the flag to the value in
6388 param->auto_increment is bigger than the biggest key.
6389 */
6390
_ma_update_auto_increment_key(HA_CHECK * param,MARIA_HA * info,my_bool repair_only)6391 void _ma_update_auto_increment_key(HA_CHECK *param, MARIA_HA *info,
6392 my_bool repair_only)
6393 {
6394 MARIA_SHARE *share= info->s;
6395 uchar *record;
6396 DBUG_ENTER("update_auto_increment_key");
6397
6398 if (!share->base.auto_key ||
6399 ! maria_is_key_active(share->state.key_map, share->base.auto_key - 1))
6400 {
6401 if (!(param->testflag & T_VERY_SILENT))
6402 _ma_check_print_info(param,
6403 "Table: %s doesn't have an auto increment key\n",
6404 param->isam_file_name);
6405 DBUG_VOID_RETURN;
6406 }
6407 if (!(param->testflag & T_SILENT) &&
6408 !(param->testflag & T_REP))
6409 printf("Updating Aria file: %s\n", param->isam_file_name);
6410 /*
6411 We have to use an allocated buffer instead of info->rec_buff as
6412 _ma_put_key_in_record() may use info->rec_buff
6413 */
6414 if (!(record= (uchar*) my_malloc(PSI_INSTRUMENT_ME, (size_t) share->base.default_rec_buff_size,
6415 MYF(0))))
6416 {
6417 _ma_check_print_error(param,"Not enough memory for extra record");
6418 DBUG_VOID_RETURN;
6419 }
6420
6421 maria_extra(info,HA_EXTRA_KEYREAD,0);
6422 if (maria_rlast(info, record, share->base.auto_key-1))
6423 {
6424 if (my_errno != HA_ERR_END_OF_FILE)
6425 {
6426 maria_extra(info,HA_EXTRA_NO_KEYREAD,0);
6427 my_free(record);
6428 _ma_check_print_error(param,"%d when reading last record",my_errno);
6429 DBUG_VOID_RETURN;
6430 }
6431 if (!repair_only)
6432 share->state.auto_increment=param->auto_increment_value;
6433 }
6434 else
6435 {
6436 const HA_KEYSEG *keyseg= share->keyinfo[share->base.auto_key-1].seg;
6437 ulonglong auto_increment=
6438 ma_retrieve_auto_increment(record + keyseg->start, keyseg->type);
6439 set_if_bigger(share->state.auto_increment,auto_increment);
6440 if (!repair_only)
6441 set_if_bigger(share->state.auto_increment, param->auto_increment_value);
6442 }
6443 maria_extra(info,HA_EXTRA_NO_KEYREAD,0);
6444 my_free(record);
6445 maria_update_state_info(param, info, UPDATE_AUTO_INC);
6446 DBUG_VOID_RETURN;
6447 }
6448
6449
6450 /*
6451 Update statistics for each part of an index
6452
6453 SYNOPSIS
6454 maria_update_key_parts()
6455 keyinfo IN Index information (only key->keysegs used)
6456 rec_per_key_part OUT Store statistics here
6457 unique IN Array of (#distinct tuples)
6458 notnull_tuples IN Array of (#tuples), or NULL
6459 records Number of records in the table
6460
6461 DESCRIPTION
6462 This function is called produce index statistics values from unique and
6463 notnull_tuples arrays after these arrays were produced with sequential
6464 index scan (the scan is done in two places: chk_index() and
6465 sort_key_write()).
6466
6467 This function handles all 3 index statistics collection methods.
6468
6469 Unique is an array:
6470 unique[0]= (#different values of {keypart1}) - 1
6471 unique[1]= (#different values of {keypart1,keypart2} tuple)-unique[0]-1
6472 ...
6473
6474 For MI_STATS_METHOD_IGNORE_NULLS method, notnull_tuples is an array too:
6475 notnull_tuples[0]= (#of {keypart1} tuples such that keypart1 is not NULL)
6476 notnull_tuples[1]= (#of {keypart1,keypart2} tuples such that all
6477 keypart{i} are not NULL)
6478 ...
6479 For all other statistics collection methods notnull_tuples==NULL.
6480
6481 Output is an array:
6482 rec_per_key_part[k] =
6483 = E(#records in the table such that keypart_1=c_1 AND ... AND
6484 keypart_k=c_k for arbitrary constants c_1 ... c_k)
6485
6486 = {assuming that values have uniform distribution and index contains all
6487 tuples from the domain (or that {c_1, ..., c_k} tuple is chosen from
6488 index tuples}
6489
6490 = #tuples-in-the-index / #distinct-tuples-in-the-index.
6491
6492 The #tuples-in-the-index and #distinct-tuples-in-the-index have different
6493 meaning depending on which statistics collection method is used:
6494
6495 MI_STATS_METHOD_* how are nulls compared? which tuples are counted?
6496 NULLS_EQUAL NULL == NULL all tuples in table
6497 NULLS_NOT_EQUAL NULL != NULL all tuples in table
6498 IGNORE_NULLS n/a tuples that don't have NULLs
6499 */
6500
maria_update_key_parts(MARIA_KEYDEF * keyinfo,double * rec_per_key_part,ulonglong * unique,ulonglong * notnull,ulonglong records)6501 void maria_update_key_parts(MARIA_KEYDEF *keyinfo, double *rec_per_key_part,
6502 ulonglong *unique, ulonglong *notnull,
6503 ulonglong records)
6504 {
6505 ulonglong count=0, unique_tuples;
6506 ulonglong tuples= records;
6507 uint parts;
6508 double tmp;
6509 for (parts=0 ; parts < keyinfo->keysegs ; parts++)
6510 {
6511 count+=unique[parts];
6512 unique_tuples= count + 1;
6513 if (notnull)
6514 {
6515 tuples= notnull[parts];
6516 /*
6517 #(unique_tuples not counting tuples with NULLs) =
6518 #(unique_tuples counting tuples with NULLs as different) -
6519 #(tuples with NULLs)
6520 */
6521 unique_tuples -= (records - notnull[parts]);
6522 }
6523
6524 if (unique_tuples == 0)
6525 tmp= 1;
6526 else if (count == 0)
6527 tmp= ulonglong2double(tuples); /* 1 unique tuple */
6528 else
6529 tmp= ulonglong2double(tuples) / ulonglong2double(unique_tuples);
6530
6531 /*
6532 for some weird keys (e.g. FULLTEXT) tmp can be <1 here.
6533 let's ensure it is not
6534 */
6535 set_if_bigger(tmp,1);
6536
6537 *rec_per_key_part++= tmp;
6538 }
6539 }
6540
6541
maria_byte_checksum(const uchar * buf,uint length)6542 static ha_checksum maria_byte_checksum(const uchar *buf, uint length)
6543 {
6544 ha_checksum crc;
6545 const uchar *end=buf+length;
6546 for (crc=0; buf != end; buf++)
6547 crc=((crc << 1) + *buf) +
6548 MY_TEST(crc & (((ha_checksum) 1) << (8 * sizeof(ha_checksum) - 1)));
6549 return crc;
6550 }
6551
maria_too_big_key_for_sort(MARIA_KEYDEF * key,ha_rows rows)6552 my_bool maria_too_big_key_for_sort(MARIA_KEYDEF *key, ha_rows rows)
6553 {
6554 uint key_maxlength=key->maxlength;
6555 if (key->flag & HA_FULLTEXT)
6556 {
6557 uint ft_max_word_len_for_sort=FT_MAX_WORD_LEN_FOR_SORT*
6558 key->seg->charset->mbmaxlen;
6559 key_maxlength+=ft_max_word_len_for_sort-HA_FT_MAXBYTELEN;
6560 }
6561 return (key->flag & HA_SPATIAL) ||
6562 (key->flag & (HA_BINARY_PACK_KEY | HA_VAR_LENGTH_KEY | HA_FULLTEXT) &&
6563 ((ulonglong) rows * key_maxlength >
6564 (ulonglong) maria_max_temp_length));
6565 }
6566
6567 /*
6568 Return TRUE if we can use repair by sorting
6569 One can set the force argument to force to use sorting
6570 even if the temporary file would be quite big!
6571 */
6572
maria_test_if_sort_rep(MARIA_HA * info,ha_rows rows,ulonglong key_map,my_bool force)6573 my_bool maria_test_if_sort_rep(MARIA_HA *info, ha_rows rows,
6574 ulonglong key_map, my_bool force)
6575 {
6576 MARIA_SHARE *share= info->s;
6577 MARIA_KEYDEF *key=share->keyinfo;
6578 uint i;
6579
6580 /*
6581 maria_repair_by_sort only works if we have at least one key. If we don't
6582 have any keys, we should use the normal repair.
6583 */
6584 if (! maria_is_any_key_active(key_map))
6585 return FALSE; /* Can't use sort */
6586 for (i=0 ; i < share->base.keys ; i++,key++)
6587 {
6588 if (!force && maria_too_big_key_for_sort(key,rows))
6589 return FALSE;
6590 }
6591 return TRUE;
6592 }
6593
6594
6595 /**
6596 @brief Create a new handle for manipulation the new record file
6597
6598 @note
6599 It's ok for Recovery to have two MARIA_SHARE on the same index file
6600 because the one we create here is not transactional
6601 */
6602
create_new_data_handle(MARIA_SORT_PARAM * param,File new_file)6603 static my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
6604 {
6605
6606 MARIA_SORT_INFO *sort_info= param->sort_info;
6607 MARIA_HA *info= sort_info->info;
6608 MARIA_HA *new_info;
6609 DBUG_ENTER("create_new_data_handle");
6610
6611 if (!(sort_info->new_info= maria_open(info->s->open_file_name.str, O_RDWR,
6612 HA_OPEN_COPY | HA_OPEN_FOR_REPAIR |
6613 HA_OPEN_INTERNAL_TABLE, 0)))
6614 DBUG_RETURN(1);
6615
6616 new_info= sort_info->new_info;
6617 _ma_bitmap_set_pagecache_callbacks(&new_info->s->bitmap.file,
6618 new_info->s);
6619 _ma_set_data_pagecache_callbacks(&new_info->dfile, new_info->s);
6620 change_data_file_descriptor(new_info, new_file);
6621 maria_lock_database(new_info, F_EXTRA_LCK);
6622 if ((sort_info->param->testflag & T_UNPACK) &&
6623 info->s->data_file_type == COMPRESSED_RECORD)
6624 {
6625 (*new_info->s->once_end)(new_info->s);
6626 (*new_info->s->end)(new_info);
6627 restore_data_file_type(new_info->s);
6628 _ma_setup_functions(new_info->s);
6629 if ((*new_info->s->once_init)(new_info->s, new_file) ||
6630 (*new_info->s->init)(new_info))
6631 DBUG_RETURN(1);
6632 }
6633 _ma_reset_status(new_info);
6634 if (_ma_initialize_data_file(new_info->s, new_file))
6635 DBUG_RETURN(1);
6636
6637 /* Take into account any bitmap page created above: */
6638 param->filepos= new_info->s->state.state.data_file_length;
6639
6640 /* Use new virtual functions for key generation */
6641 info->s->keypos_to_recpos= new_info->s->keypos_to_recpos;
6642 info->s->recpos_to_keypos= new_info->s->recpos_to_keypos;
6643 DBUG_RETURN(0);
6644 }
6645
6646
6647 static void
set_data_file_type(MARIA_SORT_INFO * sort_info,MARIA_SHARE * share)6648 set_data_file_type(MARIA_SORT_INFO *sort_info, MARIA_SHARE *share)
6649 {
6650 if ((sort_info->new_data_file_type=share->data_file_type) ==
6651 COMPRESSED_RECORD && sort_info->param->testflag & T_UNPACK)
6652 {
6653 MARIA_SHARE tmp;
6654 sort_info->new_data_file_type= share->state.header.org_data_file_type;
6655 /* Set delete_function for sort_delete_record() */
6656 tmp= *share;
6657 tmp.state.header.data_file_type= tmp.state.header.org_data_file_type;
6658 tmp.options= ~HA_OPTION_COMPRESS_RECORD;
6659 _ma_setup_functions(&tmp);
6660 share->delete_record=tmp.delete_record;
6661 }
6662 }
6663
/*
  Change a share from compressed rows back to its original row format

  SYNOPSIS
    restore_data_file_type()
    share       Share to change

  DESCRIPTION
    Clears HA_OPTION_COMPRESS_RECORD (also in the state header), sets the
    data file type to the original data file type and resets the pack
    header length. The key position <-> record position functions are
    then changed to those that _ma_setup_functions() selects for the
    changed share; no other function pointer of the share is touched.
*/

static void restore_data_file_type(MARIA_SHARE *share)
{
  MARIA_SHARE scratch;

  share->options&= ~HA_OPTION_COMPRESS_RECORD;
  mi_int2store(share->state.header.options,share->options);

  share->state.header.data_file_type=
    share->state.header.org_data_file_type;
  share->data_file_type= share->state.header.data_file_type;
  share->pack.header_length= 0;

  /*
    Use new virtual functions for key generation. They are set up in a
    copy of the share, as we only want to take over two of them.
  */
  scratch= *share;
  _ma_setup_functions(&scratch);
  share->recpos_to_keypos= scratch.recpos_to_keypos;
  share->keypos_to_recpos= scratch.keypos_to_recpos;
}
6680
6681
/*
  Let a handler use another file as data file

  SYNOPSIS
    change_data_file_descriptor()
    info        Maria handler
    new_file    File descriptor to use from now on

  DESCRIPTION
    The current data file descriptor is closed. Both the data file and
    the bitmap file of the share are set to new_file and the bitmap
    cache is reset.
*/

static void change_data_file_descriptor(MARIA_HA *info, File new_file)
{
  mysql_file_close(info->dfile.file, MYF(MY_WME));

  info->s->bitmap.file.file= new_file;
  info->dfile.file= new_file;
  _ma_bitmap_reset_cache(info->s);
}
6688
6689
6690 /**
6691 @brief Mark the data file to not be used
6692
6693 @note
6694 This is used in repair when we want to ensure the handler will not
6695 write anything to the data file anymore
6696 */
6697
unuse_data_file_descriptor(MARIA_HA * info)6698 static void unuse_data_file_descriptor(MARIA_HA *info)
6699 {
6700 (void) flush_pagecache_blocks(info->s->pagecache,
6701 &info->s->bitmap.file,
6702 FLUSH_IGNORE_CHANGED);
6703 info->dfile.file= info->s->bitmap.file.file= -1;
6704 _ma_bitmap_reset_cache(info->s);
6705 }
6706
6707
6708 /*
6709 Copy all states that has to do with the data file
6710
6711 NOTES
6712 This is done to copy the state from the data file generated from
6713 repair to the original handler
6714 */
6715
copy_data_file_state(MARIA_STATE_INFO * to,MARIA_STATE_INFO * from)6716 static void copy_data_file_state(MARIA_STATE_INFO *to,
6717 MARIA_STATE_INFO *from)
6718 {
6719 to->state.records= from->state.records;
6720 to->state.del= from->state.del;
6721 to->state.empty= from->state.empty;
6722 to->state.data_file_length= from->state.data_file_length;
6723 to->split= from->split;
6724 to->dellink= from->dellink;
6725 to->first_bitmap_with_space= from->first_bitmap_with_space;
6726 }
6727
6728
6729 /*
6730 Read 'safely' next record while scanning table.
6731
6732 SYNOPSIS
6733 _ma_safe_scan_block_record()
6734 info Maria handler
6735 record Store found here
6736
6737 NOTES
6738 - One must have called mi_scan() before this
6739
6740 Differences compared to _ma_scan_block_records() are:
6741 - We read all blocks, not only blocks marked by the bitmap to be safe
6742 - In case of errors, next read will read next record.
6743 - More sanity checks
6744
6745 RETURN
6746 0 ok
6747 HA_ERR_END_OF_FILE End of file
6748 # error number
6749 */
6750
6751
_ma_safe_scan_block_record(MARIA_SORT_INFO * sort_info,MARIA_HA * info,uchar * record)6752 static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
6753 MARIA_HA *info, uchar *record)
6754 {
6755 MARIA_SHARE *share= info->s;
6756 MARIA_RECORD_POS record_pos= info->cur_row.nextpos;
6757 pgcache_page_no_t page= sort_info->page;
6758 DBUG_ENTER("_ma_safe_scan_block_record");
6759
6760 for (;;)
6761 {
6762 /* Find next row in current page */
6763 if (likely(record_pos < info->scan.number_of_rows))
6764 {
6765 uint length, offset;
6766 uchar *data, *end_of_data;
6767 char llbuff[22];
6768
6769 while (!(offset= uint2korr(info->scan.dir)))
6770 {
6771 info->scan.dir-= DIR_ENTRY_SIZE;
6772 record_pos++;
6773 if (info->scan.dir < info->scan.dir_end)
6774 {
6775 _ma_check_print_info(sort_info->param,
6776 "Wrong directory on page %s",
6777 llstr(page, llbuff));
6778 goto read_next_page;
6779 }
6780 }
6781 /* found row */
6782 info->cur_row.lastpos= info->scan.row_base_page + record_pos;
6783 info->cur_row.nextpos= record_pos + 1;
6784 data= info->scan.page_buff + offset;
6785 length= uint2korr(info->scan.dir + 2);
6786 end_of_data= data + length;
6787 info->scan.dir-= DIR_ENTRY_SIZE; /* Point to previous row */
6788
6789 if (end_of_data > info->scan.dir_end ||
6790 offset < PAGE_HEADER_SIZE(info->s) ||
6791 length < share->base.min_block_length)
6792 {
6793 _ma_check_print_info(sort_info->param,
6794 "Wrong directory entry %3u at page %s",
6795 (uint) record_pos, llstr(page, llbuff));
6796 record_pos++;
6797 continue;
6798 }
6799 else
6800 {
6801 DBUG_PRINT("info", ("rowid: %lu", (ulong) info->cur_row.lastpos));
6802 DBUG_RETURN(_ma_read_block_record2(info, record, data, end_of_data));
6803 }
6804 }
6805
6806 read_next_page:
6807 /* Read until we find next head page */
6808 for (;;)
6809 {
6810 uint page_type;
6811 char llbuff[22];
6812
6813 sort_info->page++; /* In case of errors */
6814 page++;
6815 if (!(page % share->bitmap.pages_covered))
6816 {
6817 /* Skip bitmap */
6818 page++;
6819 sort_info->page++;
6820 }
6821 if ((my_off_t) (page + 1) * share->block_size > sort_info->filelength)
6822 DBUG_RETURN(HA_ERR_END_OF_FILE);
6823 if (!(pagecache_read(share->pagecache,
6824 &info->dfile,
6825 page, 0, info->scan.page_buff,
6826 PAGECACHE_READ_UNKNOWN_PAGE,
6827 PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
6828 {
6829 if (my_errno == HA_ERR_WRONG_CRC ||
6830 my_errno == HA_ERR_DECRYPTION_FAILED)
6831 {
6832 /*
6833 Don't give errors for zero filled blocks. These can
6834 sometimes be found at end of a bitmap when we wrote a big
6835 record last that was moved to the next bitmap.
6836 */
6837 if (_ma_check_bitmap_data(info, UNALLOCATED_PAGE, 0,
6838 _ma_bitmap_get_page_bits(info,
6839 &share->bitmap,
6840 page)))
6841 {
6842 _ma_check_print_info(sort_info->param,
6843 "Wrong CRC on datapage at %s",
6844 llstr(page, llbuff));
6845 }
6846 continue;
6847 }
6848 DBUG_RETURN(my_errno);
6849 }
6850 page_type= (info->scan.page_buff[PAGE_TYPE_OFFSET] &
6851 PAGE_TYPE_MASK);
6852 if (page_type == HEAD_PAGE)
6853 {
6854 if ((info->scan.number_of_rows=
6855 (uint) (uchar) info->scan.page_buff[DIR_COUNT_OFFSET]) != 0)
6856 break;
6857 _ma_check_print_info(sort_info->param,
6858 "Wrong head page at page %s",
6859 llstr(page, llbuff));
6860 }
6861 else if (page_type >= MAX_PAGE_TYPE)
6862 {
6863 _ma_check_print_info(sort_info->param,
6864 "Found wrong page type: %d at page %s",
6865 page_type, llstr(page, llbuff));
6866 }
6867 }
6868
6869 /* New head page */
6870 info->scan.dir= (info->scan.page_buff + share->block_size -
6871 PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE);
6872 info->scan.dir_end= (info->scan.dir -
6873 (info->scan.number_of_rows - 1) *
6874 DIR_ENTRY_SIZE);
6875 info->scan.row_base_page= ma_recordpos(page, 0);
6876 record_pos= 0;
6877 }
6878 }
6879
6880
6881 /**
6882 @brief Writes a LOGREC_REPAIR_TABLE record and updates create_rename_lsn
6883 if needed (so that maria_read_log does not redo the repair).
6884
6885 @param param description of the REPAIR operation
6886 @param info table
6887
6888 @return Operation status
6889 @retval 0 ok
6890 @retval 1 error (disk problem)
6891 */
6892
write_log_record_for_repair(const HA_CHECK * param,MARIA_HA * info)6893 my_bool write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
6894 {
6895 MARIA_SHARE *share= info->s;
6896 /* in case this is maria_chk or recovery... */
6897 if (translog_status == TRANSLOG_OK && !maria_in_recovery &&
6898 share->base.born_transactional)
6899 {
6900 my_bool save_now_transactional= share->now_transactional;
6901
6902 /*
6903 For now this record is only informative. It could serve when applying
6904 logs to a backup, but that needs more thought. Assume table became
6905 corrupted. It is repaired, then some writes happen to it.
6906 Later we restore an old backup, and want to apply this REDO_REPAIR_TABLE
6907 record. For it to give the same result as originally, the table should
6908 be corrupted the same way, so applying previous REDOs should produce the
6909 same corruption; that's really not guaranteed (different execution paths
6910 in execution of REDOs vs runtime code so not same bugs hit, temporary
6911 hardware issues not repeatable etc). Corruption may not be repeatable.
6912 A reasonable solution is to execute the REDO_REPAIR_TABLE record and
6913 check if the checksum of the resulting table matches what it was at the
6914 end of the original repair (should be stored in log record); or execute
6915 the REDO_REPAIR_TABLE if the checksum of the table-before-repair matches
6916 was it was at the start of the original repair (should be stored in log
6917 record).
6918 */
6919 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
6920 uchar log_data[FILEID_STORE_SIZE + 8 + 8];
6921 LSN lsn;
6922
6923 /*
6924 testflag gives an idea of what REPAIR did (in particular T_QUICK
6925 or not: did it touch the data file or not?).
6926 */
6927 int8store(log_data + FILEID_STORE_SIZE, param->testflag);
6928 /* org_key_map is used when recreating index after a load data infile */
6929 int8store(log_data + FILEID_STORE_SIZE + 8, param->org_key_map);
6930
6931 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
6932 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
6933
6934 share->now_transactional= 1;
6935 if (unlikely(translog_write_record(&lsn, LOGREC_REDO_REPAIR_TABLE,
6936 &dummy_transaction_object, info,
6937 (translog_size_t) sizeof(log_data),
6938 sizeof(log_array)/sizeof(log_array[0]),
6939 log_array, log_data, NULL) ||
6940 translog_flush(lsn)))
6941 return TRUE;
6942 /*
6943 The table's existence was made durable earlier (MY_SYNC_DIR passed to
6944 maria_change_to_newfile()). All pages have been flushed, state too, we
6945 need to force it to disk. Old REDOs should not be applied to the table,
6946 which is already enforced as skip_redos_lsn was increased in
6947 protect_against_repair_crash(). But if this is an explicit repair,
6948 even UNDO phase should ignore this table: create_rename_lsn should be
6949 increased, and this also serves for the REDO_REPAIR to be ignored by
6950 maria_read_log.
6951 The fully correct order would be: sync data and index file, remove crash
6952 mark and update LSNs then write state and sync index file. But at this
6953 point state (without crash mark) is already written.
6954 */
6955 if ((!(param->testflag & T_NO_CREATE_RENAME_LSN) &&
6956 _ma_update_state_lsns(share, lsn, share->state.create_trid, FALSE,
6957 FALSE)) ||
6958 _ma_sync_table_files(info))
6959 return TRUE;
6960 share->now_transactional= save_now_transactional;
6961 }
6962 return FALSE;
6963 }
6964
6965
6966 /**
6967 Writes an UNDO record which if executed in UNDO phase, will empty the
6968 table. Such record is thus logged only in certain cases of bulk insert
6969 (table needs to be empty etc).
6970 */
write_log_record_for_bulk_insert(MARIA_HA * info)6971 my_bool write_log_record_for_bulk_insert(MARIA_HA *info)
6972 {
6973 LEX_CUSTRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
6974 uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE];
6975 LSN lsn;
6976 lsn_store(log_data, info->trn->undo_lsn);
6977 log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data;
6978 log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
6979 return translog_write_record(&lsn, LOGREC_UNDO_BULK_INSERT,
6980 info->trn, info,
6981 (translog_size_t)
6982 log_array[TRANSLOG_INTERNAL_PARTS +
6983 0].length,
6984 TRANSLOG_INTERNAL_PARTS + 1, log_array,
6985 log_data + LSN_STORE_SIZE, NULL) ||
6986 translog_flush(lsn); /* WAL */
6987 }
6988
6989
6990 /* Give error message why reading of key page failed */
6991
report_keypage_fault(HA_CHECK * param,MARIA_HA * info,my_off_t position)6992 static void report_keypage_fault(HA_CHECK *param, MARIA_HA *info,
6993 my_off_t position)
6994 {
6995 char buff[11];
6996 uint32 block_size= info->s->block_size;
6997
6998 if (my_errno == HA_ERR_CRASHED)
6999 _ma_check_print_error(param,
7000 "Wrong base information on indexpage at page: %s",
7001 llstr(position / block_size, buff));
7002 else
7003 _ma_check_print_error(param,
7004 "Can't read indexpage from page: %s, "
7005 "error: %d",
7006 llstr(position / block_size, buff), my_errno);
7007 }
7008
7009
7010 /**
7011 When we want to check a table, we verify that the transaction ids of rows
7012 and keys are not bigger than the biggest id generated by Maria so far, which
7013 is returned by the function below.
7014
7015 @note If control file is not open, 0 may be returned; to not confuse
7016 this with a valid max trid of 0, the caller should notice that it failed to
7017 open the control file (ma_control_file_inited() can serve for that).
7018 */
7019
max_trid_in_system(void)7020 static TrID max_trid_in_system(void)
7021 {
7022 TrID id= trnman_get_max_trid(); /* 0 if transac manager not initialized */
7023 /* 'id' may be far bigger, if last shutdown is old */
7024 return MY_MAX(id, max_trid_in_control_file);
7025 }
7026
7027
/*
  Report a row whose transaction id is not visible.

  Every call increments param->not_visible_rows_found, but a message is
  printed for the first occurrence only. It is a warning if no control file
  is in use, otherwise an error that also shows param->max_trid.
*/
static void _ma_check_print_not_visible_error(HA_CHECK *param, TrID used_trid)
{
  char trid_buff[22], max_trid_buff[22];

  /* Count the row; stay silent for everything after the first one */
  if (param->not_visible_rows_found++)
    return;

  if (ma_control_file_inited())
  {
    _ma_check_print_error(param,
                          "Found row with transaction id %s when max "
                          "transaction id according to aria_control_file "
                          "is %s",
                          llstr(used_trid, trid_buff),
                          llstr(param->max_trid, max_trid_buff));
    return;
  }

  _ma_check_print_warning(param,
                          "Found row with transaction id %s but no "
                          "aria_control_file was used or specified.  "
                          "The table may be corrupted",
                          llstr(used_trid, trid_buff));
}
7052
7053
7054 /**
7055 Mark that we can retry normal repair if we used quick repair
7056
7057 We shouldn't do this in case of disk error as in this case we are likely
7058 to loose much more than expected.
7059 */
7060
retry_if_quick(MARIA_SORT_PARAM * sort_param,int error)7061 void retry_if_quick(MARIA_SORT_PARAM *sort_param, int error)
7062 {
7063 HA_CHECK *param=sort_param->sort_info->param;
7064
7065 if (!sort_param->fix_datafile && error >= HA_ERR_FIRST)
7066 {
7067 param->retry_repair=1;
7068 param->testflag|=T_RETRY_WITHOUT_QUICK;
7069 }
7070 }
7071
7072 /* Print information about bitmap page */
7073
print_bitmap_description(MARIA_SHARE * share,pgcache_page_no_t page,uchar * bitmap_data)7074 static void print_bitmap_description(MARIA_SHARE *share,
7075 pgcache_page_no_t page,
7076 uchar *bitmap_data)
7077 {
7078 char *tmp= my_malloc(PSI_INSTRUMENT_ME, MAX_BITMAP_INFO_LENGTH, MYF(MY_WME));
7079 if (!tmp)
7080 return;
7081 _ma_get_bitmap_description(&share->bitmap, bitmap_data, page, tmp);
7082 printf("Bitmap page %lu\n%s", (ulong) page, tmp);
7083 my_free(tmp);
7084 }
7085