1 /*
2 Copyright (c) 2004, 2015, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "probes_mysql.h"
26 #include "sql_class.h" // SSV
27 #include "sql_table.h"
28 #include <myisam.h>
29
30 #include "ha_archive.h"
31 #include <my_dir.h>
32
33 #include <mysql/plugin.h>
34 #include "mysql/psi/mysql_file.h"
35
36 /*
37 First, if you want to understand storage engines you should look at
38 ha_example.cc and ha_example.h.
39
40 This example was written as a test case for a customer who needed
41 a storage engine without indexes that could compress data very well.
42 So, welcome to a completely compressed storage engine. This storage
43 engine only does inserts. No replaces, deletes, or updates. All reads are
44 complete table scans. Compression is done through a combination of packing
45 and making use of the zlib library.
46
47 We keep a file pointer open for each instance of ha_archive for each read
48 but for writes we keep one open file handle just for that. We flush it
49 only if we have a read occur. azip handles compressing lots of records
50 at once much better than doing lots of little records between writes.
51 It is possible not to lock on writes, but this would then mean we couldn't
52 handle bulk inserts as well (that is, if someone were trying to read at
53 the same time, since we would want to flush).
54
55 A "meta" file is kept alongside the data file. This file serves two purposes.
56 The first purpose is to track the number of rows in the table. The second
57 purpose is to determine if the table was closed properly or not. When the
58 meta file is first opened it is marked as dirty. It is opened when the table
59 itself is opened for writing. When the table is closed the new count for rows
60 is written to the meta file and the file is marked as clean. If the meta file
61 is opened and it is marked as dirty, it is assumed that a crash occurred. At
62 this point an error occurs and the user is told to rebuild the file.
63 A rebuild scans the rows and rewrites the meta file. If corruption is found
64 in the data file then the meta file is not repaired.
65
66 At some point a recovery method for such a drastic case needs to be devised.
67
68 Locks are row level, and you will get a consistent read.
69
70 For performance, as far as table scans go it is quite fast. I don't have
71 good numbers but locally it has outperformed both InnoDB and MyISAM. For
72 InnoDB the question will be whether the table can fit into the buffer
73 pool. For MyISAM it's a question of how much the file system caches the
74 MyISAM file. With enough free memory MyISAM is faster. It's only when the OS
75 doesn't have enough memory to cache the entire table that Archive turns out
76 to be any faster.
77
78 Examples between MyISAM (packed) and Archive.
79
80 Table with 76695844 identical rows:
81 29680807 a_archive.ARZ
82 920350317 a.MYD
83
84
85 Table with 8991478 rows (all of Slashdot's comments):
86 1922964506 comment_archive.ARZ
87 2944970297 comment_text.MYD
88
89
90 TODO:
91 Allow users to set compression level.
92 Allow adjustable block size.
93 Implement versioning, should be easy.
94 Allow for errors, find a way to mark bad rows.
95 Add optional feature so that rows can be flushed at intervals (which will cause less
96 compression but may speed up ordered searches).
97 Checkpoint the meta file to allow for faster rebuilds.
98 Option to allow for dirty reads; this would reduce the sync calls, which would make
99 inserts a lot faster, but would mean highly arbitrary reads.
100
101 -Brian
102
103 Archive file format versions:
104 <5.1.5 - v.1
105 5.1.5-5.1.15 - v.2
106 >5.1.15 - v.3
107 */
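/*
  For orientation, a single .ARZ data file as written by this code contains,
  roughly: the azio stream header, the embedded .frm blob and optional table
  comment stored by ha_archive::create() (azwrite_frm()/azwrite_comment()),
  followed by the compressed stream of packed rows appended by
  ha_archive::write_row(). This is a descriptive sketch, not a byte-exact
  specification.
*/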
108
109 /* The file extension */
110 #define ARZ ".ARZ" // The data file
111 #define ARN ".ARN" // Files used during an optimize call
112 #define ARM ".ARM" // Meta file (deprecated)
113
114 /* 5.0 compatibility */
115 #define META_V1_OFFSET_CHECK_HEADER 0
116 #define META_V1_OFFSET_VERSION 1
117 #define META_V1_OFFSET_ROWS_RECORDED 2
118 #define META_V1_OFFSET_CHECK_POINT 10
119 #define META_V1_OFFSET_CRASHED 18
120 #define META_V1_LENGTH 19
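/*
  For reference, the 19-byte version 1 meta file implied by the offsets above
  (see Archive_share::write_v1_metafile()):

    byte  0       ARCHIVE_CHECK_HEADER
    byte  1       format version (1)
    bytes 2..9    rows recorded (8 bytes, int8store/uint8korr)
    bytes 10..17  check point (written as 0)
    byte  18      crashed flag
*/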
121
122 /*
123 uchar + uchar
124 */
125 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
126 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
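/*
  Legacy (pre-version 3) data files begin with this two-byte header:
  byte 0 is ARCHIVE_CHECK_HEADER and byte 1 is the format version.
  ha_archive::read_data_header() validates it for old files; version 3
  files skip the check since azio handles their header itself.
*/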
127
128 #ifdef HAVE_PSI_INTERFACE
129 extern "C" PSI_file_key arch_key_file_data;
130 #endif
131
132 /* Static declarations for handlerton */
133 static handler *archive_create_handler(handlerton *hton,
134 TABLE_SHARE *table,
135 MEM_ROOT *mem_root);
136 int archive_discover(handlerton *hton, THD* thd, const char *db,
137 const char *name,
138 uchar **frmblob,
139 size_t *frmlen);
140
141 /*
142 Number of rows that will force a bulk insert.
143 */
144 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
145
146 /*
147 Size of header used for row
148 */
149 #define ARCHIVE_ROW_HEADER_SIZE 4
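/*
  Per pack_row()/unpack_row() below, a version 3 row on disk is:

    [4-byte packed length][null-bitmap bytes][packed non-NULL fields]

  Minimal sketch of how the 4-byte header is produced and consumed, using
  the same byte-order helpers as the code below (packed_length and row_len
  are illustrative names):

    uchar header[ARCHIVE_ROW_HEADER_SIZE];
    int4store(header, packed_length);          // write path, see pack_row()
    unsigned int row_len= uint4korr(header);   // read path, see unpack_row()
*/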
150
151 static handler *archive_create_handler(handlerton *hton,
152 TABLE_SHARE *table,
153 MEM_ROOT *mem_root)
154 {
155 return new (mem_root) ha_archive(hton, table);
156 }
157
158 PSI_memory_key az_key_memory_frm;
159 PSI_memory_key az_key_memory_record_buffer;
160
161 #ifdef HAVE_PSI_INTERFACE
162 PSI_mutex_key az_key_mutex_Archive_share_mutex;
163
164 static PSI_mutex_info all_archive_mutexes[]=
165 {
166 { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
167 };
168
169 PSI_file_key arch_key_file_metadata, arch_key_file_data, arch_key_file_frm;
170 static PSI_file_info all_archive_files[]=
171 {
172 { &arch_key_file_metadata, "metadata", 0},
173 { &arch_key_file_data, "data", 0},
174 { &arch_key_file_frm, "FRM", 0}
175 };
176
177 static PSI_memory_info all_archive_memory[]=
178 {
179 { &az_key_memory_frm, "FRM", 0},
180 { &az_key_memory_record_buffer, "record_buffer", 0},
181 };
182
183
184 static void init_archive_psi_keys(void)
185 {
186 const char* category= "archive";
187 int count;
188
189 count= array_elements(all_archive_mutexes);
190 mysql_mutex_register(category, all_archive_mutexes, count);
191
192 count= array_elements(all_archive_files);
193 mysql_file_register(category, all_archive_files, count);
194
195 count= array_elements(all_archive_memory);
196 mysql_memory_register(category, all_archive_memory, count);
197 }
198
199
200 #endif /* HAVE_PSI_INTERFACE */
201
202 /*
203 Initialize the archive handler.
204
205 SYNOPSIS
206 archive_db_init()
207 void *
208
209 RETURN
210 FALSE OK
211 TRUE Error
212 */
213
214 int archive_db_init(void *p)
215 {
216 DBUG_ENTER("archive_db_init");
217 handlerton *archive_hton;
218
219 #ifdef HAVE_PSI_INTERFACE
220 init_archive_psi_keys();
221 #endif
222
223 archive_hton= (handlerton *)p;
224 archive_hton->state= SHOW_OPTION_YES;
225 archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
226 archive_hton->create= archive_create_handler;
227 archive_hton->flags= HTON_NO_FLAGS;
228 archive_hton->discover= archive_discover;
229
230 DBUG_RETURN(0);
231 }
232
233
234 Archive_share::Archive_share()
235 {
236 crashed= false;
237 in_optimize= false;
238 archive_write_open= false;
239 dirty= false;
240 DBUG_PRINT("ha_archive", ("Archive_share: %p",
241 this));
242 thr_lock_init(&lock);
243 /*
244 We will use this lock for rows.
245 */
246 mysql_mutex_init(az_key_mutex_Archive_share_mutex,
247 &mutex, MY_MUTEX_INIT_FAST);
248 }
249
250
251 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
252 :handler(hton, table_arg), share(NULL), bulk_insert(0)
253 {
254 /* Set our original buffer from pre-allocated memory */
255 buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
256
257 /* The size of the offset value we will use for position() */
258 ref_length= sizeof(my_off_t);
259 archive_reader_open= FALSE;
260 }
261
262 int archive_discover(handlerton *hton, THD* thd, const char *db,
263 const char *name,
264 uchar **frmblob,
265 size_t *frmlen)
266 {
267 DBUG_ENTER("archive_discover");
268 DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
269 azio_stream frm_stream;
270 char az_file[FN_REFLEN];
271 char *frm_ptr;
272 MY_STAT file_stat;
273
274 build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
275
276 if (!(mysql_file_stat(arch_key_file_data, az_file, &file_stat, MYF(0))))
277 goto err;
278
279 if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
280 {
281 if (errno == EROFS || errno == EACCES)
282 {
283 set_my_errno(errno);
284 DBUG_RETURN(errno);
285 }
286 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
287 }
288
289 if (frm_stream.frm_length == 0)
290 goto err;
291
292 frm_ptr= (char *)my_malloc(az_key_memory_frm,
293 sizeof(char) * frm_stream.frm_length, MYF(0));
294 azread_frm(&frm_stream, frm_ptr);
295 azclose(&frm_stream);
296
297 *frmlen= frm_stream.frm_length;
298 *frmblob= (uchar*) frm_ptr;
299
300 DBUG_RETURN(0);
301 err:
302 set_my_errno(0);
303 DBUG_RETURN(1);
304 }
305
306 static void save_auto_increment(TABLE *table, ulonglong *value)
307 {
308 Field *field= table->found_next_number_field;
309 ulonglong auto_value=
310 (ulonglong) field->val_int(table->record[0] +
311 field->offset(table->record[0]));
312 if (*value <= auto_value)
313 *value= auto_value + 1;
314 }
315
316 /**
317 @brief Read version 1 meta file (5.0 compatibility routine).
318
319 @return Completion status
320 @retval 0 Success
321 @retval !0 Failure
322 */
323
324 int Archive_share::read_v1_metafile()
325 {
326 char file_name[FN_REFLEN];
327 uchar buf[META_V1_LENGTH];
328 File fd;
329 DBUG_ENTER("Archive_share::read_v1_metafile");
330
331 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
332 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
333 DBUG_RETURN(-1);
334
335 if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
336 {
337 mysql_file_close(fd, MYF(0));
338 DBUG_RETURN(-1);
339 }
340
341 rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
342 crashed= buf[META_V1_OFFSET_CRASHED];
343 mysql_file_close(fd, MYF(0));
344 DBUG_RETURN(0);
345 }
346
347
348 /**
349 @brief Write version 1 meta file (5.0 compatibility routine).
350
351 @return Completion status
352 @retval 0 Success
353 @retval !0 Failure
354 */
355
356 int Archive_share::write_v1_metafile()
357 {
358 char file_name[FN_REFLEN];
359 uchar buf[META_V1_LENGTH];
360 File fd;
361 DBUG_ENTER("Archive_share::write_v1_metafile");
362
363 buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
364 buf[META_V1_OFFSET_VERSION]= 1;
365 int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
366 int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
367 buf[META_V1_OFFSET_CRASHED]= crashed;
368
369 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
370 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
371 DBUG_RETURN(-1);
372
373 if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
374 {
375 mysql_file_close(fd, MYF(0));
376 DBUG_RETURN(-1);
377 }
378
379 mysql_file_close(fd, MYF(0));
380 DBUG_RETURN(0);
381 }
382
383
384 /**
385 @brief Pack version 1 row (5.0 compatibility routine).
386
387 @param[in] record the record to pack
388
389 @return Length of packed row
390 */
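/*
  For reference, a version 1 packed row is simply the raw record image
  (table->s->reclength bytes, null bytes included) followed by the data of
  each non-empty blob in blob_field order, as built below.
*/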
391
392 unsigned int ha_archive::pack_row_v1(uchar *record)
393 {
394 uint *blob, *end;
395 uchar *pos;
396 DBUG_ENTER("pack_row_v1");
397 memcpy(record_buffer->buffer, record, table->s->reclength);
398 pos= record_buffer->buffer + table->s->reclength;
399 for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
400 blob != end; blob++)
401 {
402 uint32 length= ((Field_blob *) table->field[*blob])->get_length();
403 if (length)
404 {
405 uchar *data_ptr;
406 ((Field_blob *) table->field[*blob])->get_ptr(&data_ptr);
407 memcpy(pos, data_ptr, length);
408 pos+= length;
409 }
410 }
411 DBUG_RETURN(pos - record_buffer->buffer);
412 }
413
414
415 /*
416 This method reads the header of a datafile and returns whether or not it was successful.
417 */
418 int ha_archive::read_data_header(azio_stream *file_to_read)
419 {
420 int error;
421 size_t ret;
422 uchar data_buffer[DATA_BUFFER_SIZE];
423 DBUG_ENTER("ha_archive::read_data_header");
424
425 if (azrewind(file_to_read) == -1)
426 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
427
428 if (file_to_read->version >= 3)
429 DBUG_RETURN(0);
430 /* Everything below this is legacy handling for version 2 and earlier */
431
432 DBUG_PRINT("ha_archive", ("Reading legacy data header"));
433
434 ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
435
436 if (ret != DATA_BUFFER_SIZE)
437 {
438 DBUG_PRINT("ha_archive", ("Reading, expected %d got %zu",
439 DATA_BUFFER_SIZE, ret));
440 DBUG_RETURN(1);
441 }
442
443 if (error)
444 {
445 DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
446 DBUG_RETURN(1);
447 }
448
449 DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
450 DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
451
452 if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
453 (data_buffer[1] != (uchar)ARCHIVE_VERSION))
454 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
455
456 DBUG_RETURN(0);
457 }
458
459
460 /*
461 We create the shared memory space that we will use for the open table.
462 No matter what, we try to get or create a share. This is so that a repair
463 table operation can occur.
464
465 See ha_example.cc for a longer description.
466 */
467 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
468 {
469 Archive_share *tmp_share;
470
471 DBUG_ENTER("ha_archive::get_share");
472
473 lock_shared_ha_data();
474 if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
475 {
476 azio_stream archive_tmp;
477
478 tmp_share= new Archive_share;
479
480 if (!tmp_share)
481 {
482 *rc= HA_ERR_OUT_OF_MEM;
483 goto err;
484 }
485 DBUG_PRINT("ha_archive", ("new Archive_share: %p",
486 tmp_share));
487
488 fn_format(tmp_share->data_file_name, table_name, "",
489 ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
490 my_stpcpy(tmp_share->table_name, table_name);
491 DBUG_PRINT("ha_archive", ("Data File %s",
492 tmp_share->data_file_name));
493
494 /*
495 We read the meta file, but do not mark it dirty. Since we are not
496 doing a write we won't mark it dirty (and we won't open it for
497 anything but reading... open it for write and we will generate null
498 compression writes).
499 */
500 if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
501 {
502 delete tmp_share;
503 *rc= my_errno() ? my_errno() : HA_ERR_CRASHED;
504 tmp_share= NULL;
505 goto err;
506 }
507 stats.auto_increment_value= archive_tmp.auto_increment + 1;
508 tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
509 tmp_share->crashed= archive_tmp.dirty;
510 share= tmp_share;
511 if (archive_tmp.version == 1)
512 share->read_v1_metafile();
513 azclose(&archive_tmp);
514
515 set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
516 }
517 if (tmp_share->crashed)
518 *rc= HA_ERR_CRASHED_ON_USAGE;
519 err:
520 unlock_shared_ha_data();
521
522 DBUG_ASSERT(tmp_share || *rc);
523
524 DBUG_RETURN(tmp_share);
525 }
526
527
528 int Archive_share::init_archive_writer()
529 {
530 DBUG_ENTER("Archive_share::init_archive_writer");
531 /*
532 It is expensive to open and close the data files and since you can't have
533 a gzip file that can be both read and written, we keep a writer open
534 that is shared among all open tables.
535 */
536 if (!(azopen(&archive_write, data_file_name,
537 O_RDWR|O_BINARY)))
538 {
539 DBUG_PRINT("ha_archive", ("Could not open archive write file"));
540 crashed= true;
541 DBUG_RETURN(1);
542 }
543 archive_write_open= true;
544
545 DBUG_RETURN(0);
546 }
547
548
549 void Archive_share::close_archive_writer()
550 {
551 mysql_mutex_assert_owner(&mutex);
552 if (archive_write_open)
553 {
554 if (archive_write.version == 1)
555 (void) write_v1_metafile();
556 azclose(&archive_write);
557 archive_write_open= false;
558 dirty= false;
559 }
560 }
561
562
563 /*
564 No locks are required because it is associated with just one handler instance
565 */
566 int ha_archive::init_archive_reader()
567 {
568 DBUG_ENTER("ha_archive::init_archive_reader");
569 /*
570 It is expensive to open and close the data files and since you can't have
571 a gzip file that can be both read and written, we keep a writer open
572 that is shared among all open tables, but have one reader open for
573 each handler instance.
574 */
575 if (!archive_reader_open)
576 {
577 if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
578 {
579 DBUG_PRINT("ha_archive", ("Could not open archive read file"));
580 share->crashed= TRUE;
581 DBUG_RETURN(1);
582 }
583 archive_reader_open= TRUE;
584 }
585
586 DBUG_RETURN(0);
587 }
588
589
590 /*
591 We just implement one additional file extension.
592 */
593 static const char *ha_archive_exts[] = {
594 ARZ,
595 NullS
596 };
597
598 const char **ha_archive::bas_ext() const
599 {
600 return ha_archive_exts;
601 }
602
603
604 /*
605 When opening a file we:
606 Create/get our shared structure.
607 Init our lock.
608 We open the file we will read from.
609 */
610 int ha_archive::open(const char *name, int mode, uint open_options)
611 {
612 int rc= 0;
613 DBUG_ENTER("ha_archive::open");
614
615 DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
616 (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
617 share= get_share(name, &rc);
618 if (!share)
619 DBUG_RETURN(rc);
620
621 /* Allow open on crashed table in repair mode only. */
622 switch (rc)
623 {
624 case 0:
625 break;
626 case HA_ERR_CRASHED_ON_USAGE:
627 if (open_options & HA_OPEN_FOR_REPAIR)
628 break;
629 /* fall through */
630 default:
631 DBUG_RETURN(rc);
632 }
633
634 record_buffer= create_record_buffer(table->s->reclength +
635 ARCHIVE_ROW_HEADER_SIZE);
636
637 if (!record_buffer)
638 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
639
640 thr_lock_data_init(&share->lock, &lock, NULL);
641
642 DBUG_PRINT("ha_archive", ("archive table was crashed %s",
643 rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
644 if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
645 {
646 DBUG_RETURN(0);
647 }
648
649 DBUG_RETURN(rc);
650 }
651
652
653 /*
654 Closes the file.
655
656 SYNOPSIS
657 close();
658
659 IMPLEMENTATION:
660
661 We first close this storage engine's file handle to the archive and
662 then remove our reference count to the table (and possibly free it
663 as well).
664
665 RETURN
666 0 ok
667 1 Error
668 */
669
670 int ha_archive::close(void)
671 {
672 int rc= 0;
673 DBUG_ENTER("ha_archive::close");
674
675 destroy_record_buffer(record_buffer);
676
677 if (archive_reader_open)
678 {
679 if (azclose(&archive))
680 rc= 1;
681 }
682
683 DBUG_RETURN(rc);
684 }
685
686
687 void ha_archive::frm_load(const char *name, azio_stream *dst)
688 {
689 char name_buff[FN_REFLEN];
690 MY_STAT file_stat;
691 File frm_file;
692 uchar *frm_ptr;
693 DBUG_ENTER("ha_archive::frm_load");
694 fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME);
695
696 /* Here is where we open up the frm and pass it to archive to store */
697 if ((frm_file= mysql_file_open(arch_key_file_frm, name_buff, O_RDONLY, MYF(0))) >= 0)
698 {
699 if (!mysql_file_fstat(frm_file, &file_stat, MYF(MY_WME)))
700 {
701 frm_ptr= (uchar *) my_malloc(az_key_memory_frm,
702 sizeof(uchar) * (size_t) file_stat.st_size, MYF(0));
703 if (frm_ptr)
704 {
705 if (mysql_file_read(frm_file, frm_ptr, (size_t) file_stat.st_size, MYF(0)) ==
706 (size_t) file_stat.st_size)
707 azwrite_frm(dst, (char *) frm_ptr, (size_t) file_stat.st_size);
708 my_free(frm_ptr);
709 }
710 }
711 mysql_file_close(frm_file, MYF(0));
712 }
713 DBUG_VOID_RETURN;
714 }
715
716
717 /**
718 Copy a frm blob between streams.
719
720 @param[in] src The source stream.
721 @param[in] dst The destination stream.
722
723 @return Zero on success, non-zero otherwise.
724 */
725
726 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
727 {
728 int rc= 0;
729 char *frm_ptr;
730
731 /* If there is no .frm in source stream, try to read .frm from file. */
732 if (!src->frm_length)
733 {
734 frm_load(table->s->normalized_path.str, dst);
735 return 0;
736 }
737
738 if (!(frm_ptr= (char *) my_malloc(az_key_memory_frm,
739 src->frm_length, MYF(0))))
740 return HA_ERR_OUT_OF_MEM;
741
742 /* Write file offset is set to the end of the file. */
743 if (azread_frm(src, frm_ptr) ||
744 azwrite_frm(dst, frm_ptr, src->frm_length))
745 rc= my_errno() ? my_errno() : HA_ERR_INTERNAL_ERROR;
746
747 my_free(frm_ptr);
748
749 return rc;
750 }
751
752
753 /*
754 We create our data file here. The format is pretty simple.
755 You can read about the format of the data file above.
756 Unlike other storage engines we do not "pack" our data. Since we
757 are about to do a general compression, packing would just be a waste of
758 CPU time. If the table has blobs they are written after the row in the order
759 of creation.
760 */
761
762 int ha_archive::create(const char *name, TABLE *table_arg,
763 HA_CREATE_INFO *create_info)
764 {
765 char name_buff[FN_REFLEN];
766 char linkname[FN_REFLEN];
767 int error;
768 azio_stream create_stream; /* Archive file we are working with */
769 MY_STAT file_stat; // Stat information for the data file
770
771 DBUG_ENTER("ha_archive::create");
772
773 stats.auto_increment_value= create_info->auto_increment_value;
774
775 for (uint key= 0; key < table_arg->s->keys; key++)
776 {
777 KEY *pos= table_arg->key_info+key;
778 KEY_PART_INFO *key_part= pos->key_part;
779 KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
780
781 for (; key_part != key_part_end; key_part++)
782 {
783 Field *field= key_part->field;
784
785 if (!(field->flags & AUTO_INCREMENT_FLAG))
786 {
787 error= -1;
788 DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
789 goto error;
790 }
791 }
792 }
793
794 /*
795 We reuse name_buff since it is available.
796 */
797 #ifdef HAVE_READLINK
798 if (my_enable_symlinks &&
799 create_info->data_file_name &&
800 create_info->data_file_name[0] != '#')
801 {
802 DBUG_PRINT("ha_archive", ("archive will create stream file %s",
803 create_info->data_file_name));
804
805 fn_format(name_buff, create_info->data_file_name, "", ARZ,
806 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
807 fn_format(linkname, name, "", ARZ,
808 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
809 }
810 else
811 #endif /* HAVE_READLINK */
812 {
813 if (create_info->data_file_name)
814 {
815 push_warning_printf(table_arg->in_use, Sql_condition::SL_WARNING,
816 WARN_OPTION_IGNORED,
817 ER_DEFAULT(WARN_OPTION_IGNORED),
818 "DATA DIRECTORY");
819 }
820 fn_format(name_buff, name, "", ARZ,
821 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
822 linkname[0]= 0;
823 }
824
825 /* Archive engine never uses INDEX DIRECTORY. */
826 if (create_info->index_file_name)
827 {
828 push_warning_printf(table_arg->in_use, Sql_condition::SL_WARNING,
829 WARN_OPTION_IGNORED,
830 ER_DEFAULT(WARN_OPTION_IGNORED),
831 "INDEX DIRECTORY");
832 }
833
834 /*
835 There is a chance that the file was "discovered". In this case
836 just use whatever file is there.
837 */
838 if (!(mysql_file_stat(arch_key_file_data, name_buff, &file_stat, MYF(0))))
839 {
840 set_my_errno(0);
841 if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
842 {
843 error= errno;
844 goto error2;
845 }
846
847 if (linkname[0])
848 my_symlink(name_buff, linkname, MYF(0));
849
850 frm_load(name, &create_stream);
851
852 if (create_info->comment.str)
853 azwrite_comment(&create_stream, create_info->comment.str,
854 create_info->comment.length);
855
856 /*
857 Yes you need to do this, because the starting value
858 for the autoincrement may not be zero.
859 */
860 create_stream.auto_increment= stats.auto_increment_value ?
861 stats.auto_increment_value - 1 : 0;
862 if (azclose(&create_stream))
863 {
864 error= errno;
865 goto error2;
866 }
867 }
868 else
869 set_my_errno(0);
870
871 DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
872 DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
873
874
875 DBUG_RETURN(0);
876
877 error2:
878 delete_table(name);
879 error:
880 /* Return error number, if we got one */
881 DBUG_RETURN(error ? error : -1);
882 }
883
884 /*
885 This is where the actual row is written out.
886 */
887 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
888 {
889 my_off_t written;
890 unsigned int r_pack_length;
891 DBUG_ENTER("ha_archive::real_write_row");
892
893 /* We pack the row for writing */
894 r_pack_length= pack_row(buf, writer);
895
896 written= azwrite(writer, record_buffer->buffer, r_pack_length);
897 if (written != r_pack_length)
898 {
899 DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
900 (uint32) written,
901 (uint32)r_pack_length));
902 DBUG_RETURN(-1);
903 }
904
905 if (!bulk_insert)
906 share->dirty= TRUE;
907
908 DBUG_RETURN(0);
909 }
910
911
912 /*
913 Calculate max length needed for row. This includes
914 the bytes required for the length in the header.
915 */
916
917 uint32 ha_archive::max_row_length(const uchar *buf)
918 {
919 uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
920 length+= ARCHIVE_ROW_HEADER_SIZE;
921
922 uint *ptr, *end;
923 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
924 ptr != end ;
925 ptr++)
926 {
927 if (!table->field[*ptr]->is_null())
928 length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
929 }
930
931 return length;
932 }
933
934
935 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
936 {
937 uchar *ptr;
938
939 DBUG_ENTER("ha_archive::pack_row");
940
941
942 if (fix_rec_buff(max_row_length(record)))
943 DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
944
945 if (writer->version == 1)
946 DBUG_RETURN(pack_row_v1(record));
947
948 /* Copy null bits */
949 memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
950 record, table->s->null_bytes);
951 ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
952
953 for (Field **field=table->field ; *field ; field++)
954 {
955 if (!((*field)->is_null()))
956 ptr= (*field)->pack(ptr, record + (*field)->offset(record));
957 }
958
959 int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
960 ARCHIVE_ROW_HEADER_SIZE));
961 DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
962 (ptr - record_buffer->buffer -
963 ARCHIVE_ROW_HEADER_SIZE)));
964
965 DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
966 }
967
968
969 /*
970 Look at ha_archive::open() for an explanation of the row format.
971 Here we just write out the row.
972
973 Wondering about start_bulk_insert()? We don't implement it for
974 archive since it optimizes for lots of writes. The only gain
975 from implementing start_bulk_insert() is that we could skip
976 setting dirty to true each time.
977 */
978 int ha_archive::write_row(uchar *buf)
979 {
980 int rc;
981 uchar *read_buf= NULL;
982 ulonglong temp_auto;
983 uchar *record= table->record[0];
984 DBUG_ENTER("ha_archive::write_row");
985
986 if (share->crashed)
987 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
988
989 ha_statistic_increment(&SSV::ha_write_count);
990 mysql_mutex_lock(&share->mutex);
991
992 if (!share->archive_write_open && share->init_archive_writer())
993 {
994 rc= HA_ERR_CRASHED_ON_USAGE;
995 goto error;
996 }
997
998 if (table->next_number_field && record == table->record[0])
999 {
1000 KEY *mkey= &table->s->key_info[0]; // We only support one key right now
1001 update_auto_increment();
1002 temp_auto= (((Field_num*) table->next_number_field)->unsigned_flag ||
1003 table->next_number_field->val_int() > 0 ?
1004 table->next_number_field->val_int() : 0);
1005
1006 /*
1007 We don't support decrementing auto_increment. It makes the performance
1008 just cry.
1009 */
1010 if (temp_auto <= share->archive_write.auto_increment &&
1011 mkey->flags & HA_NOSAME)
1012 {
1013 rc= HA_ERR_FOUND_DUPP_KEY;
1014 goto error;
1015 }
1016 else
1017 {
1018 if (temp_auto > share->archive_write.auto_increment)
1019 stats.auto_increment_value=
1020 (share->archive_write.auto_increment= temp_auto) + 1;
1021 }
1022 }
1023
1024 /*
1025 Notice that the global auto_increment has been increased.
1026 In case of a failed row write, we will never try to reuse the value.
1027 */
1028 share->rows_recorded++;
1029 rc= real_write_row(buf, &(share->archive_write));
1030 error:
1031 mysql_mutex_unlock(&share->mutex);
1032 if (read_buf)
1033 my_free(read_buf);
1034 DBUG_RETURN(rc);
1035 }
1036
1037
1038 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1039 ulonglong nb_desired_values,
1040 ulonglong *first_value,
1041 ulonglong *nb_reserved_values)
1042 {
1043 *nb_reserved_values= ULLONG_MAX;
1044 *first_value= share->archive_write.auto_increment + 1;
1045 }
1046
1047 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1048 int ha_archive::index_init(uint keynr, bool sorted)
1049 {
1050 DBUG_ENTER("ha_archive::index_init");
1051 active_index= keynr;
1052 DBUG_RETURN(0);
1053 }
1054
1055
1056 /*
1057 No indexes, so if we get a request for an index search (since we tell
1058 the optimizer that we have unique indexes), we scan the table.
1059 */
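/*
  Note that the only "index" Archive accepts is one on an AUTO_INCREMENT
  column (enforced in ha_archive::create()), so index_read_idx() below simply
  walks the data file and memcmp()s the key bytes at the key part's offset in
  each unpacked row.
*/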
1060 int ha_archive::index_read(uchar *buf, const uchar *key,
1061 uint key_len, enum ha_rkey_function find_flag)
1062 {
1063 int rc;
1064 DBUG_ENTER("ha_archive::index_read");
1065 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1066 rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1067 MYSQL_INDEX_READ_ROW_DONE(rc);
1068 DBUG_RETURN(rc);
1069 }
1070
1071
1072 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1073 uint key_len, enum ha_rkey_function find_flag)
1074 {
1075 int rc;
1076 bool found= 0;
1077 KEY *mkey= &table->s->key_info[index];
1078 current_k_offset= mkey->key_part->offset;
1079 current_key= key;
1080 current_key_len= key_len;
1081
1082
1083 DBUG_ENTER("ha_archive::index_read_idx");
1084
1085 rc= rnd_init(TRUE);
1086
1087 if (rc)
1088 goto error;
1089
1090 while (!(get_row(&archive, buf)))
1091 {
1092 if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1093 {
1094 found= 1;
1095 break;
1096 }
1097 }
1098
1099 if (found)
1100 {
1101 /* notify handler that a record has been found */
1102 table->status= 0;
1103 DBUG_RETURN(0);
1104 }
1105
1106 error:
1107 DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1108 }
1109
1110
1111 int ha_archive::index_next(uchar * buf)
1112 {
1113 bool found= 0;
1114 int rc;
1115
1116 DBUG_ENTER("ha_archive::index_next");
1117 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1118
1119 while (!(get_row(&archive, buf)))
1120 {
1121 if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1122 {
1123 found= 1;
1124 break;
1125 }
1126 }
1127
1128 rc= found ? 0 : HA_ERR_END_OF_FILE;
1129 MYSQL_INDEX_READ_ROW_DONE(rc);
1130 DBUG_RETURN(rc);
1131 }
1132
1133 /*
1134 All calls that need to scan the table start with this method. If we are told
1135 that it is a table scan we rewind the file to the beginning, otherwise
1136 we assume the position will be set.
1137 */
1138
1139 int ha_archive::rnd_init(bool scan)
1140 {
1141 DBUG_ENTER("ha_archive::rnd_init");
1142
1143 if (share->crashed)
1144 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1145
1146 init_archive_reader();
1147
1148 /* We rewind the file so that we can read from the beginning if scan */
1149 if (scan)
1150 {
1151 scan_rows= stats.records;
1152 DBUG_PRINT("info", ("archive will retrieve %llu rows",
1153 (unsigned long long) scan_rows));
1154
1155 if (read_data_header(&archive))
1156 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1157 }
1158
1159 DBUG_RETURN(0);
1160 }
1161
1162
1163 /*
1164 This is the method that is used to read a row. It assumes that the row is
1165 positioned where you want it.
1166 */
1167 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1168 {
1169 int rc;
1170 DBUG_ENTER("ha_archive::get_row");
1171 DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1172 (uchar)file_to_read->version,
1173 ARCHIVE_VERSION));
1174 if (file_to_read->version == ARCHIVE_VERSION)
1175 rc= get_row_version3(file_to_read, buf);
1176 else
1177 rc= get_row_version2(file_to_read, buf);
1178
1179 DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1180
1181 DBUG_RETURN(rc);
1182 }
1183
1184 /* Reallocate buffer if needed */
1185 bool ha_archive::fix_rec_buff(unsigned int length)
1186 {
1187 DBUG_ENTER("ha_archive::fix_rec_buff");
1188 DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1189 length, record_buffer->length));
1190 DBUG_ASSERT(record_buffer->buffer);
1191
1192 if (length > record_buffer->length)
1193 {
1194 uchar *newptr;
1195 if (!(newptr=(uchar*) my_realloc(az_key_memory_record_buffer,
1196 (uchar*) record_buffer->buffer,
1197 length,
1198 MYF(MY_ALLOW_ZERO_PTR))))
1199 DBUG_RETURN(1);
1200 record_buffer->buffer= newptr;
1201 record_buffer->length= length;
1202 }
1203
1204 DBUG_ASSERT(length <= record_buffer->length);
1205
1206 DBUG_RETURN(0);
1207 }
1208
1209 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1210 {
1211 DBUG_ENTER("ha_archive::unpack_row");
1212
1213 size_t read;
1214 int error;
1215 uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE], *size_buffer_p= size_buffer;
1216 unsigned int row_len;
1217
1218 /* First we grab the length stored */
1219 read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1220
1221 if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE))
1222 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1223
1224 /* If we read nothing we are at the end of the file */
1225 if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1226 DBUG_RETURN(HA_ERR_END_OF_FILE);
1227
1228 row_len= uint4korr(size_buffer_p);
1229 DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1230 (unsigned int)table->s->reclength));
1231
1232 if (fix_rec_buff(row_len))
1233 {
1234 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1235 }
1236 DBUG_ASSERT(row_len <= record_buffer->length);
1237
1238 read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1239
1240 if (read != row_len || error)
1241 {
1242 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1243 }
1244
1245 /* Copy null bits */
1246 const uchar *ptr= record_buffer->buffer;
1247 /*
1248 Field::unpack() is not called when field is NULL. For VARCHAR
1249 Field::unpack() only unpacks as much bytes as occupied by field
1250 value. In these cases respective memory area on record buffer is
1251 not initialized.
1252
1253 These uninitialized areas may be accessed by CHECKSUM TABLE or
1254 by optimizer using temporary table (BUG#12997905). We may remove
1255 this memset() when they're fixed.
1256 */
1257 memset(record, 0, table->s->reclength);
1258 memcpy(record, ptr, table->s->null_bytes);
1259 ptr+= table->s->null_bytes;
1260 for (Field **field=table->field ; *field ; field++)
1261 {
1262 if (!((*field)->is_null_in_record(record)))
1263 {
1264 ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1265 }
1266 }
1267 DBUG_RETURN(0);
1268 }
1269
1270
1271 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1272 {
1273 DBUG_ENTER("ha_archive::get_row_version3");
1274
1275 int returnable= unpack_row(file_to_read, buf);
1276
1277 DBUG_RETURN(returnable);
1278 }
1279
1280
1281 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1282 {
1283 size_t read;
1284 int error;
1285 uint *ptr, *end;
1286 char *last;
1287 size_t total_blob_length= 0;
1288 MY_BITMAP *read_set= table->read_set;
1289 DBUG_ENTER("ha_archive::get_row_version2");
1290
1291 read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1292
1293 /* If we read nothing we are at the end of the file */
1294 if (read == 0)
1295 DBUG_RETURN(HA_ERR_END_OF_FILE);
1296
1297 if (read != table->s->reclength)
1298 {
1299 DBUG_PRINT("ha_archive::get_row_version2", ("Read %zu bytes expected %u",
1300 read,
1301 (unsigned int)table->s->reclength));
1302 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1303 }
1304
1305 if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1306 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1307
1308 /* If the record is the wrong size, the file is probably damaged. */
1309 if ((ulong) read != table->s->reclength)
1310 DBUG_RETURN(HA_ERR_END_OF_FILE);
1311
1312 /* Calculate blob length, we use this for our buffer */
1313 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1314 ptr != end ;
1315 ptr++)
1316 {
1317 if (bitmap_is_set(read_set,
1318 (((Field_blob*) table->field[*ptr])->field_index)))
1319 total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1320 }
1321
1322 /* Adjust our row buffer if we need be */
1323 buffer.alloc(total_blob_length);
1324 last= (char *)buffer.ptr();
1325
1326 /* Loop through our blobs and read them */
1327 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1328 ptr != end ;
1329 ptr++)
1330 {
1331 size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1332 if (size)
1333 {
1334 if (bitmap_is_set(read_set,
1335 ((Field_blob*) table->field[*ptr])->field_index))
1336 {
1337 read= azread(file_to_read, last, size, &error);
1338
1339 if (error)
1340 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1341
1342 if ((size_t) read != size)
1343 DBUG_RETURN(HA_ERR_END_OF_FILE);
1344 ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
1345 last += size;
1346 }
1347 else
1348 {
1349 (void)azseek(file_to_read, size, SEEK_CUR);
1350 }
1351 }
1352 }
1353 DBUG_RETURN(0);
1354 }
1355
1356
1357 /*
1358 Called during ORDER BY. The position comes either from sequential calls
1359 or from a prior call to ha_archive::rnd_pos().
1360 */
1361
1362 int ha_archive::rnd_next(uchar *buf)
1363 {
1364 int rc;
1365 DBUG_ENTER("ha_archive::rnd_next");
1366 MYSQL_READ_ROW_START(table_share->db.str,
1367 table_share->table_name.str, TRUE);
1368
1369 if (share->crashed)
1370 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1371
1372 if (!scan_rows)
1373 {
1374 rc= HA_ERR_END_OF_FILE;
1375 goto end;
1376 }
1377 scan_rows--;
1378
1379 ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1380 current_position= aztell(&archive);
1381 rc= get_row(&archive, buf);
1382
1383 table->status=rc ? STATUS_NOT_FOUND: 0;
1384
1385 end:
1386 MYSQL_READ_ROW_DONE(rc);
1387 DBUG_RETURN(rc);
1388 }
1389
1390
1391 /*
1392 Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1393 each call to ha_archive::rnd_next() if an ordering of the rows is
1394 needed.
1395 */
1396
1397 void ha_archive::position(const uchar *record)
1398 {
1399 DBUG_ENTER("ha_archive::position");
1400 my_store_ptr(ref, ref_length, current_position);
1401 DBUG_VOID_RETURN;
1402 }
1403
1404
1405 /*
1406 This is called after a table scan for each row if the results of the
1407 scan need to be ordered. It will take *pos and use it to move the
1408 cursor in the file so that the next row that is called is the
1409 correctly ordered row.
1410 */
1411
1412 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1413 {
1414 int rc;
1415 DBUG_ENTER("ha_archive::rnd_pos");
1416 MYSQL_READ_ROW_START(table_share->db.str,
1417 table_share->table_name.str, FALSE);
1418 ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1419 current_position= (my_off_t)my_get_ptr(pos, ref_length);
1420 if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1421 {
1422 rc= HA_ERR_CRASHED_ON_USAGE;
1423 goto end;
1424 }
1425 rc= get_row(&archive, buf);
1426 end:
1427 MYSQL_READ_ROW_DONE(rc);
1428 DBUG_RETURN(rc);
1429 }
1430
1431 /*
1432 This method repairs the meta file. It does this by walking the datafile and
1433 rewriting the meta file. If EXTENDED repair is requested, we attempt to
1434 recover as much data as possible.
1435 */
1436 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1437 {
1438 DBUG_ENTER("ha_archive::repair");
1439 int rc= optimize(thd, check_opt);
1440
1441 if (rc)
1442 DBUG_RETURN(HA_ADMIN_CORRUPT);
1443
1444 share->crashed= FALSE;
1445 DBUG_RETURN(0);
1446 }
1447
1448 /*
1449 The table can become fragmented if data was inserted, read, and then
1450 inserted again. What we do is open up the file and recompress it completely.
1451 */
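/*
  Roughly, the rebuild below does: flush the shared writer, open a new .ARN
  stream, copy the embedded frm into it, re-read every row from the old .ARZ
  and re-write it (re-deriving the auto_increment high-water mark), then
  rename the .ARN file over the .ARZ file.
*/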
1452 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1453 {
1454 int rc= 0;
1455 azio_stream writer;
1456 ha_rows count;
1457 my_bitmap_map *org_bitmap;
1458 char writer_filename[FN_REFLEN];
1459 DBUG_ENTER("ha_archive::optimize");
1460
1461 mysql_mutex_lock(&share->mutex);
1462 if (share->in_optimize)
1463 {
1464 mysql_mutex_unlock(&share->mutex);
1465 DBUG_RETURN(HA_ADMIN_FAILED);
1466 }
1467 share->in_optimize= true;
1468 /* remember the number of rows */
1469 count= share->rows_recorded;
1470 if (share->archive_write_open)
1471 azflush(&share->archive_write, Z_SYNC_FLUSH);
1472 mysql_mutex_unlock(&share->mutex);
1473
1474 init_archive_reader();
1475
1476 /* Lets create a file to contain the new data */
1477 fn_format(writer_filename, share->table_name, "", ARN,
1478 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1479
1480 if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1481 {
1482 share->in_optimize= false;
1483 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1484 }
1485
1486 /*
1487 Transfer the embedded FRM so that the file can be discoverable.
1488 Write file offset is set to the end of the file.
1489 */
1490 if ((rc= frm_copy(&archive, &writer)))
1491 {
1492 share->in_optimize= false;
1493 goto error;
1494 }
1495 /*
1496 An extended rebuild is a lot more effort. We open up each row and re-record it.
1497 Any dead rows are removed (aka rows that may have been partially recorded).
1498
1499 As of Archive format 3, this is the only type of rebuild that is performed;
1500 before this version it was only done on T_EXTEND.
1501 */
1502
1503 DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1504
1505 /*
1506 Now we will rewind the archive file so that we are positioned at the
1507 start of the file.
1508 */
1509 if ((rc= read_data_header(&archive)))
1510 {
1511 share->in_optimize= false;
1512 goto error;
1513 }
1514
1515 stats.auto_increment_value= 1;
1516 org_bitmap= tmp_use_all_columns(table, table->read_set);
1517 /* read rows up to the remembered row count */
1518 for (ha_rows cur_count= count; cur_count; cur_count--)
1519 {
1520 if ((rc= get_row(&archive, table->record[0])))
1521 break;
1522 real_write_row(table->record[0], &writer);
1523 if (table->found_next_number_field)
1524 save_auto_increment(table, &stats.auto_increment_value);
1525 }
1526
1527 mysql_mutex_lock(&share->mutex);
1528
1529 share->close_archive_writer();
1530 if (!rc)
1531 {
1532 /* read the remaining rows */
1533 for (count= share->rows_recorded - count; count; count--)
1534 {
1535 if ((rc= get_row(&archive, table->record[0])))
1536 break;
1537 real_write_row(table->record[0], &writer);
1538 if (table->found_next_number_field)
1539 save_auto_increment(table, &stats.auto_increment_value);
1540 }
1541 }
1542
1543 tmp_restore_column_map(table->read_set, org_bitmap);
1544 share->rows_recorded= (ha_rows) writer.rows;
1545 share->archive_write.auto_increment= stats.auto_increment_value - 1;
1546 DBUG_PRINT("info", ("recovered %llu archive rows",
1547 (unsigned long long)share->rows_recorded));
1548
1549 DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1550 (unsigned long long)share->rows_recorded));
1551
1552 /*
1553 If REPAIR ... EXTENDED is requested, try to recover as much data
1554 from data file as possible. In this case if we failed to read a
1555 record, we assume EOF. This allows massive data loss, but we can
1556 hardly do more with broken zlib stream. And this is the only way
1557 to restore at least what is still recoverable.
1558 */
1559 if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1560 {
1561 share->in_optimize= false;
1562 mysql_mutex_unlock(&share->mutex);
1563 goto error;
1564 }
1565
1566 azclose(&writer);
1567 share->dirty= FALSE;
1568 azclose(&archive);
1569 archive_reader_open= FALSE;
1570
1571 // make the file we just wrote be our data file
1572 rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1573 share->in_optimize= false;
1574 mysql_mutex_unlock(&share->mutex);
1575
1576 DBUG_RETURN(rc);
1577 error:
1578 DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1579 azclose(&writer);
1580
1581 DBUG_RETURN(rc);
1582 }
1583
1584 /*
1585 Below is an example of how to set up row-level locking.
1586 */
1587 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1588 THR_LOCK_DATA **to,
1589 enum thr_lock_type lock_type)
1590 {
1591 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1592 {
1593 /*
1594 Here is where we get into the guts of a row level lock.
1595 If TL_UNLOCK is set
1596 If we are not doing a LOCK TABLE or DISCARD/IMPORT
1597 TABLESPACE, then allow multiple writers
1598 */
1599
1600 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1601 lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1602 && !thd_tablespace_op(thd))
1603 lock_type = TL_WRITE_ALLOW_WRITE;
1604
1605 /*
1606 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1607 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1608 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1609 to t2. Convert the lock to a normal read lock to allow
1610 concurrent inserts to t2.
1611 */
1612
1613 if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1614 lock_type = TL_READ;
1615
1616 lock.type=lock_type;
1617 }
1618
1619 *to++= &lock;
1620
1621 return to;
1622 }
1623
1624 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1625 {
1626 char tmp_real_path[FN_REFLEN];
1627 DBUG_ENTER("ha_archive::update_create_info");
1628
1629 ha_archive::info(HA_STATUS_AUTO);
1630 if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1631 {
1632 create_info->auto_increment_value= stats.auto_increment_value;
1633 }
1634
1635 if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1636 create_info->data_file_name= sql_strdup(tmp_real_path);
1637
1638 DBUG_VOID_RETURN;
1639 }
1640
1641
1642 /*
1643 Hints for optimizer, see ha_tina for more information
1644 */
1645 int ha_archive::info(uint flag)
1646 {
1647 DBUG_ENTER("ha_archive::info");
1648
1649 mysql_mutex_lock(&share->mutex);
1650 if (share->dirty)
1651 {
1652 DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1653 DBUG_ASSERT(share->archive_write_open);
1654 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1655 share->dirty= FALSE;
1656 }
1657
1658 /*
1659 This should be an accurate number now, though bulk inserts can
1660 cause the number to be inaccurate.
1661 */
1662 stats.records= share->rows_recorded;
1663 mysql_mutex_unlock(&share->mutex);
1664
1665 stats.deleted= 0;
1666
1667 DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1668 /* Costs quite a bit more to get all information */
1669 if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1670 {
1671 MY_STAT file_stat; // Stat information for the data file
1672
1673 (void) mysql_file_stat(arch_key_file_data, share->data_file_name, &file_stat, MYF(MY_WME));
1674
1675 if (flag & HA_STATUS_TIME)
1676 stats.update_time= (ulong) file_stat.st_mtime;
1677 if (flag & HA_STATUS_CONST)
1678 {
1679 stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1680 stats.max_data_file_length= MAX_FILE_SIZE;
1681 stats.create_time= (ulong) file_stat.st_ctime;
1682 }
1683 if (flag & HA_STATUS_VARIABLE)
1684 {
1685 stats.delete_length= 0;
1686 stats.data_file_length= file_stat.st_size;
1687 stats.index_file_length=0;
1688 stats.mean_rec_length= stats.records ?
1689 ulong(stats.data_file_length / stats.records) : table->s->reclength;
1690 }
1691 }
1692
1693 if (flag & HA_STATUS_AUTO)
1694 {
1695 /* TODO: Use the shared writer instead during the lock above. */
1696 init_archive_reader();
1697 mysql_mutex_lock(&share->mutex);
1698 azflush(&archive, Z_SYNC_FLUSH);
1699 mysql_mutex_unlock(&share->mutex);
1700 stats.auto_increment_value= archive.auto_increment + 1;
1701 }
1702
1703 DBUG_RETURN(0);
1704 }
1705
1706
1707 /**
1708 Handler hints.
1709
1710 @param operation Operation to prepare for.
1711
1712 @return Operation status
1713 @return 0 Success
1714 @return != 0 Error
1715 */
1716
1717 int ha_archive::extra(enum ha_extra_function operation)
1718 {
1719 int ret= 0;
1720 DBUG_ENTER("ha_archive::extra");
1721 /* On Windows we need to close all files before rename/delete. */
1722 #ifdef _WIN32
1723 switch (operation)
1724 {
1725 case HA_EXTRA_PREPARE_FOR_RENAME:
1726 case HA_EXTRA_FORCE_REOPEN:
1727 /* Close both reader and writer so we don't have the file open. */
1728 if (archive_reader_open)
1729 {
1730 ret= azclose(&archive);
1731 archive_reader_open= false;
1732 }
1733 mysql_mutex_lock(&share->mutex);
1734 share->close_archive_writer();
1735 mysql_mutex_unlock(&share->mutex);
1736 break;
1737 default:
1738 /* Nothing to do. */
1739 ;
1740 }
1741 #endif
1742 DBUG_RETURN(ret);
1743 }
1744
1745
1746 /*
1747 This method tells us that a bulk insert operation is about to occur. We set
1748 a flag which will keep write_row from saying that its data is dirty. This in
1749 turn will keep selects from causing a sync to occur.
1750 Basically, yet another optimization to keep compression working well.
1751 */
1752 void ha_archive::start_bulk_insert(ha_rows rows)
1753 {
1754 DBUG_ENTER("ha_archive::start_bulk_insert");
1755 if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1756 bulk_insert= TRUE;
1757 DBUG_VOID_RETURN;
1758 }
1759
1760
1761 /*
1762 Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
1763 flag, and set the share dirty so that the next select will call sync for us.
1764 */
1765 int ha_archive::end_bulk_insert()
1766 {
1767 DBUG_ENTER("ha_archive::end_bulk_insert");
1768 bulk_insert= FALSE;
1769 mysql_mutex_lock(&share->mutex);
1770 if (share->archive_write_open)
1771 share->dirty= true;
1772 mysql_mutex_unlock(&share->mutex);
1773 DBUG_RETURN(0);
1774 }
1775
1776 /*
1777 We cancel a truncate command. The only way to delete an archive table is to drop it.
1778 This is done for security reasons. In a later version we will enable this by
1779 allowing the user to select a different row format.
1780 */
1781 int ha_archive::truncate()
1782 {
1783 DBUG_ENTER("ha_archive::truncate");
1784 DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1785 }
1786
1787 /*
1788 We just return state if asked.
1789 */
1790 bool ha_archive::is_crashed() const
1791 {
1792 DBUG_ENTER("ha_archive::is_crashed");
1793 DBUG_RETURN(share->crashed);
1794 }
1795
1796
1797 /**
1798 @brief Check for upgrade
1799
1800 @param[in] check_opt check options
1801
1802 @return Completion status
1803 @retval HA_ADMIN_OK No upgrade required
1804 @retval HA_ADMIN_CORRUPT Cannot read meta-data
1805 @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
1806 */
1807
1808 int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1809 {
1810 DBUG_ENTER("ha_archive::check_for_upgrade");
1811 if (init_archive_reader())
1812 DBUG_RETURN(HA_ADMIN_CORRUPT);
1813 if (archive.version < ARCHIVE_VERSION)
1814 DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1815 DBUG_RETURN(HA_ADMIN_OK);
1816 }
1817
1818
1819 /*
1820 Simple scan of the tables to make sure everything is ok.
1821 */
1822
1823 int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1824 {
1825 int rc= 0;
1826 const char *old_proc_info;
1827 ha_rows count;
1828 DBUG_ENTER("ha_archive::check");
1829
1830 old_proc_info= thd_proc_info(thd, "Checking table");
1831 mysql_mutex_lock(&share->mutex);
1832 count= share->rows_recorded;
1833 /* Flush any waiting data */
1834 if (share->archive_write_open)
1835 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1836 mysql_mutex_unlock(&share->mutex);
1837
1838 if (init_archive_reader())
1839 DBUG_RETURN(HA_ADMIN_CORRUPT);
1840 /*
1841 Now we will rewind the archive file so that we are positioned at the
1842 start of the file.
1843 */
1844 read_data_header(&archive);
1845 for (ha_rows cur_count= count; cur_count; cur_count--)
1846 {
1847 if ((rc= get_row(&archive, table->record[0])))
1848 goto error;
1849 }
1850 /*
1851 Now read records that may have been inserted concurrently.
1852 Acquire share->mutex so tail of the table is not modified by
1853 concurrent writers.
1854 */
1855 mysql_mutex_lock(&share->mutex);
1856 count= share->rows_recorded - count;
1857 if (share->archive_write_open)
1858 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1859 while (!(rc= get_row(&archive, table->record[0])))
1860 count--;
1861 mysql_mutex_unlock(&share->mutex);
1862
1863 if ((rc && rc != HA_ERR_END_OF_FILE) || count)
1864 goto error;
1865
1866 thd_proc_info(thd, old_proc_info);
1867 DBUG_RETURN(HA_ADMIN_OK);
1868
1869 error:
1870 thd_proc_info(thd, old_proc_info);
1871 share->crashed= FALSE;
1872 DBUG_RETURN(HA_ADMIN_CORRUPT);
1873 }
1874
1875 /*
1876 Check and repair the table if needed.
1877 */
1878 bool ha_archive::check_and_repair(THD *thd)
1879 {
1880 HA_CHECK_OPT check_opt;
1881 DBUG_ENTER("ha_archive::check_and_repair");
1882
1883 check_opt.init();
1884
1885 DBUG_RETURN(repair(thd, &check_opt));
1886 }
1887
1888 archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1889 {
1890 DBUG_ENTER("ha_archive::create_record_buffer");
1891 archive_record_buffer *r;
1892 if (!(r=
1893 (archive_record_buffer*) my_malloc(az_key_memory_record_buffer,
1894 sizeof(archive_record_buffer),
1895 MYF(MY_WME))))
1896 {
1897 DBUG_RETURN(NULL); /* purecov: inspected */
1898 }
1899 r->length= (int)length;
1900
1901 if (!(r->buffer= (uchar*) my_malloc(az_key_memory_record_buffer,
1902 r->length,
1903 MYF(MY_WME))))
1904 {
1905 my_free(r);
1906 DBUG_RETURN(NULL); /* purecov: inspected */
1907 }
1908
1909 DBUG_RETURN(r);
1910 }
1911
1912 void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1913 {
1914 DBUG_ENTER("ha_archive::destroy_record_buffer");
1915 my_free(r->buffer);
1916 my_free(r);
1917 DBUG_VOID_RETURN;
1918 }
1919
1920 bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info,
1921 uint table_changes)
1922 {
1923 if (info->auto_increment_value != stats.auto_increment_value ||
1924 (info->used_fields & HA_CREATE_USED_DATADIR) ||
1925 info->data_file_name ||
1926 (info->used_fields & HA_CREATE_USED_COMMENT) ||
1927 table_changes != IS_EQUAL_YES)
1928 return COMPATIBLE_DATA_NO;
1929
1930 return COMPATIBLE_DATA_YES;
1931 }
1932
1933
1934 struct st_mysql_storage_engine archive_storage_engine=
1935 { MYSQL_HANDLERTON_INTERFACE_VERSION };
1936
1937 mysql_declare_plugin(archive)
1938 {
1939 MYSQL_STORAGE_ENGINE_PLUGIN,
1940 &archive_storage_engine,
1941 "ARCHIVE",
1942 "Brian Aker, MySQL AB",
1943 "Archive storage engine",
1944 PLUGIN_LICENSE_GPL,
1945 archive_db_init, /* Plugin Init */
1946 NULL, /* Plugin Deinit */
1947 0x0300 /* 3.0 */,
1948 NULL, /* status variables */
1949 NULL, /* system variables */
1950 NULL, /* config options */
1951 0, /* flags */
1952 }
1953 mysql_declare_plugin_end;
1954
1955