1 /*
2 Copyright (c) 2004, 2012, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "sql_priv.h"
26 #include "probes_mysql.h"
27 #include "sql_class.h" // SSV
28 #include "sql_table.h"
29 #include <myisam.h>
30
31 #include "ha_archive.h"
32 #include <my_dir.h>
33
34 #include <mysql/plugin.h>
35
36 /*
37 First, if you want to understand storage engines you should look at
38 ha_example.cc and ha_example.h.
39
40 This example was written as a test case for a customer who needed
41 a storage engine without indexes that could compress data very well.
42 So, welcome to a completely compressed storage engine. This storage
43 engine only does inserts. No replaces, deletes, or updates. All reads are
44 complete table scans. Compression is done through a combination of packing
45 and making use of the zlib library.
46
47 We keep a file pointer open for each instance of ha_archive for each read,
48 but for writes we keep a single open file handle just for that. We flush it
49 only if we have a read occur. azio handles compressing lots of records
50 at once much better than doing lots of little records between writes.
51 It is possible to not lock on writes, but this would then mean we couldn't
52 handle bulk inserts as well (that is, if someone was trying to read at
53 the same time, since we would want to flush).
54
55 A "meta" file is kept alongside the data file. This file serves two purpose.
56 The first purpose is to track the number of rows in the table. The second
57 purpose is to determine if the table was closed properly or not. When the
58 meta file is first opened it is marked as dirty. It is opened when the table
59 itself is opened for writing. When the table is closed the new count for rows
60 is written to the meta file and the file is marked as clean. If the meta file
61 is opened and it is marked as dirty, it is assumed that a crash occured. At
62 this point an error occurs and the user is told to rebuild the file.
63 A rebuild scans the rows and rewrites the meta file. If corruption is found
64 in the data file then the meta file is not repaired.
65
66 At some point a recovery method for such a drastic case needs to be devised.
67
68 Locks are row level, and you will get a consistent read.
69
70 As far as table scans go, performance is quite good. I don't have
71 good numbers but locally it has outperformed both InnoDB and MyISAM. For
72 InnoDB the question will be whether the table can fit into the buffer
73 pool. For MyISAM it's a question of how much of the MyISAM file the file
74 system caches. With enough free memory MyISAM is faster. It's only when the OS
75 doesn't have enough memory to cache the entire table that Archive turns out
76 to be any faster.
77
78 Example file sizes for MyISAM (packed) versus Archive:
79
80 Table with 76695844 identical rows:
81 29680807 a_archive.ARZ
82 920350317 a.MYD
83
84
85 Table with 8991478 rows (all of Slashdot's comments):
86 1922964506 comment_archive.ARZ
87 2944970297 comment_text.MYD
88
89
90 TODO:
91 Allow users to set the compression level.
92 Allow an adjustable block size.
93 Implement versioning; it should be easy.
94 Allow for errors; find a way to mark bad rows.
95 Add an optional feature so that rows can be flushed at an interval (which will cause less
96 compression but may speed up ordered searches).
97 Checkpoint the meta file to allow for faster rebuilds.
98 Add an option to allow for dirty reads; this would lower the number of sync calls, which
99 would make inserts a lot faster, but would mean highly arbitrary reads.
100
101 -Brian
102
103 Archive file format versions:
104 <5.1.5 - v.1
105 5.1.5-5.1.15 - v.2
106 >5.1.15 - v.3
107 */
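
/*
  For orientation: all persistent state lives in the single ARZ data file.
  Its azio header carries the format version, the recorded row count, the
  auto_increment high-water mark, a dirty flag and (in newer versions) an
  embedded copy of the table's .frm, followed by the compressed rows. These
  are the fields (version, rows, auto_increment, dirty, frm_length) that
  get_share() and archive_discover() below read back.
*/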
108
109 /* The file extensions */
110 #define ARZ ".ARZ" // The data file
111 #define ARN ".ARN" // Files used during an optimize call
112 #define ARM ".ARM" // Meta file (deprecated)
113
114 /* 5.0 compatibility */
115 #define META_V1_OFFSET_CHECK_HEADER 0
116 #define META_V1_OFFSET_VERSION 1
117 #define META_V1_OFFSET_ROWS_RECORDED 2
118 #define META_V1_OFFSET_CHECK_POINT 10
119 #define META_V1_OFFSET_CRASHED 18
120 #define META_V1_LENGTH 19
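
/*
  For reference, the offsets above describe the fixed 19-byte version 1 meta
  file read and written by Archive_share::read_v1_metafile() and
  Archive_share::write_v1_metafile():

    byte   0       check header (ARCHIVE_CHECK_HEADER)
    byte   1       format version (1)
    bytes  2..9    rows recorded (8-byte integer)
    bytes 10..17   check point (8-byte integer, always written as 0)
    byte  18       crashed flag
*/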
121
122 /*
123 uchar + uchar
124 */
125 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
126 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
127
128 #ifdef HAVE_PSI_INTERFACE
129 extern "C" PSI_file_key arch_key_file_data;
130 #endif
131
132 /* Static declarations for handlerton */
133 static handler *archive_create_handler(handlerton *hton,
134 TABLE_SHARE *table,
135 MEM_ROOT *mem_root);
136 int archive_discover(handlerton *hton, THD* thd, const char *db,
137 const char *name,
138 uchar **frmblob,
139 size_t *frmlen);
140
141 /*
142 Number of rows that will force a bulk insert.
143 */
144 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
145
146 /*
147 Size of header used for row
148 */
149 #define ARCHIVE_ROW_HEADER_SIZE 4
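
/*
  A sketch of the packed row that pack_row() writes and unpack_row() reads
  for current (version 3) files:

    bytes 0..3   length of the packed data that follows (the 4-byte header
                 itself is not counted)
    then         the record's null-bit bytes, copied verbatim
    then         each non-NULL field packed with Field::pack()

  Blob contents therefore travel inline inside the packed row. Version 1
  rows instead store the raw record followed by the blob data; see
  pack_row_v1().
*/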
150
151 static handler *archive_create_handler(handlerton *hton,
152 TABLE_SHARE *table,
153 MEM_ROOT *mem_root)
154 {
155 return new (mem_root) ha_archive(hton, table);
156 }
157
158
159 #ifdef HAVE_PSI_INTERFACE
160 PSI_mutex_key az_key_mutex_Archive_share_mutex;
161
162 static PSI_mutex_info all_archive_mutexes[]=
163 {
164 { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
165 };
166
167 PSI_file_key arch_key_file_metadata, arch_key_file_data, arch_key_file_frm;
168 static PSI_file_info all_archive_files[]=
169 {
170 { &arch_key_file_metadata, "metadata", 0},
171 { &arch_key_file_data, "data", 0},
172 { &arch_key_file_frm, "FRM", 0}
173 };
174
175 static void init_archive_psi_keys(void)
176 {
177 const char* category= "archive";
178 int count;
179
180 count= array_elements(all_archive_mutexes);
181 mysql_mutex_register(category, all_archive_mutexes, count);
182
183 count= array_elements(all_archive_files);
184 mysql_file_register(category, all_archive_files, count);
185 }
186
187
188 #endif /* HAVE_PSI_INTERFACE */
189
190 /*
191 Initialize the archive handler.
192
193 SYNOPSIS
194 archive_db_init()
195 void *
196
197 RETURN
198 FALSE OK
199 TRUE Error
200 */
201
202 int archive_db_init(void *p)
203 {
204 DBUG_ENTER("archive_db_init");
205 handlerton *archive_hton;
206
207 #ifdef HAVE_PSI_INTERFACE
208 init_archive_psi_keys();
209 #endif
210
211 archive_hton= (handlerton *)p;
212 archive_hton->state= SHOW_OPTION_YES;
213 archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
214 archive_hton->create= archive_create_handler;
215 archive_hton->flags= HTON_NO_FLAGS;
216 archive_hton->discover= archive_discover;
217
218 DBUG_RETURN(0);
219 }
220
221
222 Archive_share::Archive_share()
223 {
224 crashed= false;
225 in_optimize= false;
226 archive_write_open= false;
227 dirty= false;
228 DBUG_PRINT("ha_archive", ("Archive_share: %p",
229 this));
230 thr_lock_init(&lock);
231 /*
232 We will use this lock for rows.
233 */
234 mysql_mutex_init(az_key_mutex_Archive_share_mutex,
235 &mutex, MY_MUTEX_INIT_FAST);
236 }
237
238
239 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
240 :handler(hton, table_arg), share(NULL), delayed_insert(0), bulk_insert(0)
241 {
242 /* Set our original buffer from pre-allocated memory */
243 buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
244
245 /* The size of the offset value we will use for position() */
246 ref_length= sizeof(my_off_t);
247 archive_reader_open= FALSE;
248 }
249
250 int archive_discover(handlerton *hton, THD* thd, const char *db,
251 const char *name,
252 uchar **frmblob,
253 size_t *frmlen)
254 {
255 DBUG_ENTER("archive_discover");
256 DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
257 azio_stream frm_stream;
258 char az_file[FN_REFLEN];
259 char *frm_ptr;
260 MY_STAT file_stat;
261
262 build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
263
264 if (!(mysql_file_stat(arch_key_file_data, az_file, &file_stat, MYF(0))))
265 goto err;
266
267 if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
268 {
269 if (errno == EROFS || errno == EACCES)
270 DBUG_RETURN(my_errno= errno);
271 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
272 }
273
274 if (frm_stream.frm_length == 0)
275 goto err;
276
277 frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
278 azread_frm(&frm_stream, frm_ptr);
279 azclose(&frm_stream);
280
281 *frmlen= frm_stream.frm_length;
282 *frmblob= (uchar*) frm_ptr;
283
284 DBUG_RETURN(0);
285 err:
286 my_errno= 0;
287 DBUG_RETURN(1);
288 }
289
290 static void save_auto_increment(TABLE *table, ulonglong *value)
291 {
292 Field *field= table->found_next_number_field;
293 ulonglong auto_value=
294 (ulonglong) field->val_int(table->record[0] +
295 field->offset(table->record[0]));
296 if (*value <= auto_value)
297 *value= auto_value + 1;
298 }
299
300 /**
301 @brief Read version 1 meta file (5.0 compatibility routine).
302
303 @return Completion status
304 @retval 0 Success
305 @retval !0 Failure
306 */
307
308 int Archive_share::read_v1_metafile()
309 {
310 char file_name[FN_REFLEN];
311 uchar buf[META_V1_LENGTH];
312 File fd;
313 DBUG_ENTER("Archive_share::read_v1_metafile");
314
315 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
316 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
317 DBUG_RETURN(-1);
318
319 if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
320 {
321 mysql_file_close(fd, MYF(0));
322 DBUG_RETURN(-1);
323 }
324
325 rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
326 crashed= buf[META_V1_OFFSET_CRASHED];
327 mysql_file_close(fd, MYF(0));
328 DBUG_RETURN(0);
329 }
330
331
332 /**
333 @brief Write version 1 meta file (5.0 compatibility routine).
334
335 @return Completion status
336 @retval 0 Success
337 @retval !0 Failure
338 */
339
340 int Archive_share::write_v1_metafile()
341 {
342 char file_name[FN_REFLEN];
343 uchar buf[META_V1_LENGTH];
344 File fd;
345 DBUG_ENTER("Archive_share::write_v1_metafile");
346
347 buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
348 buf[META_V1_OFFSET_VERSION]= 1;
349 int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
350 int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
351 buf[META_V1_OFFSET_CRASHED]= crashed;
352
353 fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
354 if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
355 DBUG_RETURN(-1);
356
357 if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
358 {
359 mysql_file_close(fd, MYF(0));
360 DBUG_RETURN(-1);
361 }
362
363 mysql_file_close(fd, MYF(0));
364 DBUG_RETURN(0);
365 }
366
367
368 /**
369 @brief Pack version 1 row (5.0 compatibility routine).
370
371 @param[in] record the record to pack
372
373 @return Length of packed row
374 */
375
376 unsigned int ha_archive::pack_row_v1(uchar *record)
377 {
378 uint *blob, *end;
379 uchar *pos;
380 DBUG_ENTER("pack_row_v1");
381 memcpy(record_buffer->buffer, record, table->s->reclength);
382 pos= record_buffer->buffer + table->s->reclength;
383 for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
384 blob != end; blob++)
385 {
386 uint32 length= ((Field_blob *) table->field[*blob])->get_length();
387 if (length)
388 {
389 uchar *data_ptr;
390 ((Field_blob *) table->field[*blob])->get_ptr(&data_ptr);
391 memcpy(pos, data_ptr, length);
392 pos+= length;
393 }
394 }
395 DBUG_RETURN(pos - record_buffer->buffer);
396 }
397
398
399 /*
400 This method reads the header of a data file and returns zero on success or an error code on failure.
401 */
402 int ha_archive::read_data_header(azio_stream *file_to_read)
403 {
404 int error;
405 unsigned long ret;
406 uchar data_buffer[DATA_BUFFER_SIZE];
407 DBUG_ENTER("ha_archive::read_data_header");
408
409 if (azrewind(file_to_read) == -1)
410 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
411
412 if (file_to_read->version >= 3)
413 DBUG_RETURN(0);
414 /* Everything below this is legacy code for version 2 and earlier */
415
416 DBUG_PRINT("ha_archive", ("Reading legacy data header"));
417
418 ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
419
420 if (ret != DATA_BUFFER_SIZE)
421 {
422 DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
423 DATA_BUFFER_SIZE, ret));
424 DBUG_RETURN(1);
425 }
426
427 if (error)
428 {
429 DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
430 DBUG_RETURN(1);
431 }
432
433 DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
434 DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
435
436 if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
437 (data_buffer[1] != (uchar)ARCHIVE_VERSION))
438 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
439
440 DBUG_RETURN(0);
441 }
442
443
444 /*
445 We create the shared memory space that we will use for the open table.
446 No matter what, we try to get or create a share. This is so that a repair
447 table operation can occur.
448
449 See ha_example.cc for a longer description.
450 */
451 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
452 {
453 Archive_share *tmp_share;
454
455 DBUG_ENTER("ha_archive::get_share");
456
457 lock_shared_ha_data();
458 if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
459 {
460 azio_stream archive_tmp;
461
462 tmp_share= new Archive_share;
463
464 if (!tmp_share)
465 {
466 *rc= HA_ERR_OUT_OF_MEM;
467 goto err;
468 }
469 DBUG_PRINT("ha_archive", ("new Archive_share: %p",
470 tmp_share));
471
472 fn_format(tmp_share->data_file_name, table_name, "",
473 ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
474 strmov(tmp_share->table_name, table_name);
475 DBUG_PRINT("ha_archive", ("Data File %s",
476 tmp_share->data_file_name));
477
478 /*
479 We read the meta file, but do not mark it dirty. Since we are not
480 doing a write we won't mark it dirty (and we won't open it for
481 anything but reading... open it for write and we will generate null
482 compression writes).
483 */
484 if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
485 {
486 delete tmp_share;
487 *rc= my_errno ? my_errno : HA_ERR_CRASHED;
488 tmp_share= NULL;
489 goto err;
490 }
491 stats.auto_increment_value= archive_tmp.auto_increment + 1;
492 tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
493 tmp_share->crashed= archive_tmp.dirty;
494 share= tmp_share;
495 if (archive_tmp.version == 1)
496 share->read_v1_metafile();
497 azclose(&archive_tmp);
498
499 set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
500 }
501 if (tmp_share->crashed)
502 *rc= HA_ERR_CRASHED_ON_USAGE;
503 err:
504 unlock_shared_ha_data();
505
506 DBUG_ASSERT(tmp_share || *rc);
507
508 DBUG_RETURN(tmp_share);
509 }
510
511
512 int Archive_share::init_archive_writer()
513 {
514 DBUG_ENTER("Archive_share::init_archive_writer");
515 /*
516 It is expensive to open and close the data files, and since you can't have
517 a gzip file that can be both read and written, we keep a writer open
518 that is shared among all open tables.
519 */
520 if (!(azopen(&archive_write, data_file_name,
521 O_RDWR|O_BINARY)))
522 {
523 DBUG_PRINT("ha_archive", ("Could not open archive write file"));
524 crashed= true;
525 DBUG_RETURN(1);
526 }
527 archive_write_open= true;
528
529 DBUG_RETURN(0);
530 }
531
532
533 void Archive_share::close_archive_writer()
534 {
535 mysql_mutex_assert_owner(&mutex);
536 if (archive_write_open)
537 {
538 if (archive_write.version == 1)
539 (void) write_v1_metafile();
540 azclose(&archive_write);
541 archive_write_open= false;
542 dirty= false;
543 }
544 }
545
546
547 /*
548 No locks are required because it is associated with just one handler instance
549 */
550 int ha_archive::init_archive_reader()
551 {
552 DBUG_ENTER("ha_archive::init_archive_reader");
553 /*
554 It is expensive to open and close the data files, and since you can't have
555 a gzip file that can be both read and written, we keep a writer open
556 that is shared among all open tables, but have one reader open for
557 each handler instance.
558 */
559 if (!archive_reader_open)
560 {
561 if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
562 {
563 DBUG_PRINT("ha_archive", ("Could not open archive read file"));
564 share->crashed= TRUE;
565 DBUG_RETURN(1);
566 }
567 archive_reader_open= TRUE;
568 }
569
570 DBUG_RETURN(0);
571 }
572
573
574 /*
575 We just implement one additional file extension.
576 */
577 static const char *ha_archive_exts[] = {
578 ARZ,
579 NullS
580 };
581
582 const char **ha_archive::bas_ext() const
583 {
584 return ha_archive_exts;
585 }
586
587
588 /*
589 When opening a file we:
590 Create/get our shared structure.
591 Init our lock.
592 Open the file we will read from.
593 */
594 int ha_archive::open(const char *name, int mode, uint open_options)
595 {
596 int rc= 0;
597 DBUG_ENTER("ha_archive::open");
598
599 DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
600 (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
601 share= get_share(name, &rc);
602 if (!share)
603 DBUG_RETURN(rc);
604
605 /* Allow open on crashed table in repair mode only. */
606 switch (rc)
607 {
608 case 0:
609 break;
610 case HA_ERR_CRASHED_ON_USAGE:
611 if (open_options & HA_OPEN_FOR_REPAIR)
612 break;
613 /* fall through */
614 default:
615 DBUG_RETURN(rc);
616 }
617
618 record_buffer= create_record_buffer(table->s->reclength +
619 ARCHIVE_ROW_HEADER_SIZE);
620
621 if (!record_buffer)
622 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
623
624 thr_lock_data_init(&share->lock, &lock, NULL);
625
626 DBUG_PRINT("ha_archive", ("archive table was crashed %s",
627 rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
628 if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
629 {
630 DBUG_RETURN(0);
631 }
632
633 DBUG_RETURN(rc);
634 }
635
636
637 /*
638 Closes the file.
639
640 SYNOPSIS
641 close();
642
643 IMPLEMENTATION:
644
645 We first close this storage engine's file handle to the archive and
646 then remove our reference count to the table (and possibly free it
647 as well).
648
649 RETURN
650 0 ok
651 1 Error
652 */
653
654 int ha_archive::close(void)
655 {
656 int rc= 0;
657 DBUG_ENTER("ha_archive::close");
658
659 destroy_record_buffer(record_buffer);
660
661 if (archive_reader_open)
662 {
663 if (azclose(&archive))
664 rc= 1;
665 }
666
667 DBUG_RETURN(rc);
668 }
669
670
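/**
  @brief Read the table's .frm file from disk and embed it in the given
  azio stream.

  Used when creating a new data file and when the source stream of a copy
  has no embedded frm (see frm_copy()). Failures are silently ignored; the
  destination simply ends up without an embedded frm.

  @param[in] name  Path of the table (used to locate the .frm file).
  @param[in] dst   Destination stream that receives the frm blob.
*/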
671 void ha_archive::frm_load(const char *name, azio_stream *dst)
672 {
673 char name_buff[FN_REFLEN];
674 MY_STAT file_stat;
675 File frm_file;
676 uchar *frm_ptr;
677 DBUG_ENTER("ha_archive::frm_load");
678 fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME);
679
680 /* Here is where we open up the frm and pass it to archive to store */
681 if ((frm_file= mysql_file_open(arch_key_file_frm, name_buff, O_RDONLY, MYF(0))) >= 0)
682 {
683 if (!mysql_file_fstat(frm_file, &file_stat, MYF(MY_WME)))
684 {
685 frm_ptr= (uchar *) my_malloc(sizeof(uchar) * (size_t) file_stat.st_size, MYF(0));
686 if (frm_ptr)
687 {
688 if (mysql_file_read(frm_file, frm_ptr, (size_t) file_stat.st_size, MYF(0)) ==
689 (size_t) file_stat.st_size)
690 azwrite_frm(dst, (char *) frm_ptr, (size_t) file_stat.st_size);
691 my_free(frm_ptr);
692 }
693 }
694 mysql_file_close(frm_file, MYF(0));
695 }
696 DBUG_VOID_RETURN;
697 }
698
699
700 /**
701 Copy a frm blob between streams.
702
703 @param[in] src The source stream.
704 @param[in] dst The destination stream.
705
706 @return Zero on success, non-zero otherwise.
707 */
708
709 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
710 {
711 int rc= 0;
712 char *frm_ptr;
713
714 /* If there is no .frm in source stream, try to read .frm from file. */
715 if (!src->frm_length)
716 {
717 frm_load(table->s->normalized_path.str, dst);
718 return 0;
719 }
720
721 if (!(frm_ptr= (char *) my_malloc(src->frm_length, MYF(0))))
722 return HA_ERR_OUT_OF_MEM;
723
724 /* Write file offset is set to the end of the file. */
725 if (azread_frm(src, frm_ptr) ||
726 azwrite_frm(dst, frm_ptr, src->frm_length))
727 rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
728
729 my_free(frm_ptr);
730
731 return rc;
732 }
733
734
735 /*
736 We create our data file here. The format is pretty simple.
737 You can read about the format of the data file above.
738 Unlike other storage engines we do not "pack" our data. Since we
739 are about to do a general compression, packing would just be a waste of
740 CPU time. If the table has blobs they are written after the row in the order
741 of creation.
742 */
743
744 int ha_archive::create(const char *name, TABLE *table_arg,
745 HA_CREATE_INFO *create_info)
746 {
747 char name_buff[FN_REFLEN];
748 char linkname[FN_REFLEN];
749 int error;
750 azio_stream create_stream; /* Archive file we are working with */
751 MY_STAT file_stat; // Stat information for the data file
752
753 DBUG_ENTER("ha_archive::create");
754
755 stats.auto_increment_value= create_info->auto_increment_value;
756
757 for (uint key= 0; key < table_arg->s->keys; key++)
758 {
759 KEY *pos= table_arg->key_info+key;
760 KEY_PART_INFO *key_part= pos->key_part;
761 KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
762
763 for (; key_part != key_part_end; key_part++)
764 {
765 Field *field= key_part->field;
766
767 if (!(field->flags & AUTO_INCREMENT_FLAG))
768 {
769 error= -1;
770 DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
771 goto error;
772 }
773 }
774 }
775
776 /*
777 We reuse name_buff since it is available.
778 */
779 #ifdef HAVE_READLINK
780 if (my_use_symdir &&
781 create_info->data_file_name &&
782 create_info->data_file_name[0] != '#')
783 {
784 DBUG_PRINT("ha_archive", ("archive will create stream file %s",
785 create_info->data_file_name));
786
787 fn_format(name_buff, create_info->data_file_name, "", ARZ,
788 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
789 fn_format(linkname, name, "", ARZ,
790 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
791 }
792 else
793 #endif /* HAVE_READLINK */
794 {
795 if (create_info->data_file_name)
796 {
797 push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
798 WARN_OPTION_IGNORED,
799 ER_DEFAULT(WARN_OPTION_IGNORED),
800 "DATA DIRECTORY");
801 }
802 fn_format(name_buff, name, "", ARZ,
803 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
804 linkname[0]= 0;
805 }
806
807 /* Archive engine never uses INDEX DIRECTORY. */
808 if (create_info->index_file_name)
809 {
810 push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
811 WARN_OPTION_IGNORED,
812 ER_DEFAULT(WARN_OPTION_IGNORED),
813 "INDEX DIRECTORY");
814 }
815
816 /*
817 There is a chance that the file was "discovered". In this case
818 just use whatever file is there.
819 */
820 if (!(mysql_file_stat(arch_key_file_data, name_buff, &file_stat, MYF(0))))
821 {
822 my_errno= 0;
823 if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
824 {
825 error= errno;
826 goto error2;
827 }
828
829 if (linkname[0])
830 my_symlink(name_buff, linkname, MYF(0));
831
832 frm_load(name, &create_stream);
833
834 if (create_info->comment.str)
835 azwrite_comment(&create_stream, create_info->comment.str,
836 create_info->comment.length);
837
838 /*
839 Yes you need to do this, because the starting value
840 for the autoincrement may not be zero.
841 */
842 create_stream.auto_increment= stats.auto_increment_value ?
843 stats.auto_increment_value - 1 : 0;
844 if (azclose(&create_stream))
845 {
846 error= errno;
847 goto error2;
848 }
849 }
850 else
851 my_errno= 0;
852
853 DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
854 DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
855
856
857 DBUG_RETURN(0);
858
859 error2:
860 delete_table(name);
861 error:
862 /* Return error number, if we got one */
863 DBUG_RETURN(error ? error : -1);
864 }
865
866 /*
867 This is where the actual row is written out.
868 */
869 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
870 {
871 my_off_t written;
872 unsigned int r_pack_length;
873 DBUG_ENTER("ha_archive::real_write_row");
874
875 /* We pack the row for writing */
876 r_pack_length= pack_row(buf, writer);
877
878 written= azwrite(writer, record_buffer->buffer, r_pack_length);
879 if (written != r_pack_length)
880 {
881 DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
882 (uint32) written,
883 (uint32)r_pack_length));
884 DBUG_RETURN(-1);
885 }
886
887 if (!delayed_insert || !bulk_insert)
888 share->dirty= TRUE;
889
890 DBUG_RETURN(0);
891 }
892
893
894 /*
895 Calculate max length needed for row. This includes
896 the bytes required for the length in the header.
897 */
898
899 uint32 ha_archive::max_row_length(const uchar *buf)
900 {
901 uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
902 length+= ARCHIVE_ROW_HEADER_SIZE;
903
904 uint *ptr, *end;
905 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
906 ptr != end ;
907 ptr++)
908 {
909 if (!table->field[*ptr]->is_null())
910 length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
911 }
912
913 return length;
914 }
915
916
917 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
918 {
919 uchar *ptr;
920
921 DBUG_ENTER("ha_archive::pack_row");
922
923
924 if (fix_rec_buff(max_row_length(record)))
925 DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
926
927 if (writer->version == 1)
928 DBUG_RETURN(pack_row_v1(record));
929
930 /* Copy null bits */
931 memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
932 record, table->s->null_bytes);
933 ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
934
935 for (Field **field=table->field ; *field ; field++)
936 {
937 if (!((*field)->is_null()))
938 ptr= (*field)->pack(ptr, record + (*field)->offset(record));
939 }
940
941 int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
942 ARCHIVE_ROW_HEADER_SIZE));
943 DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
944 (ptr - record_buffer->buffer -
945 ARCHIVE_ROW_HEADER_SIZE)));
946
947 DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
948 }
949
950
951 /*
952 Look at ha_archive::open() for an explanation of the row format.
953 Here we just write out the row.
954
955 Wondering about start_bulk_insert()? It is implemented below, but the
956 only thing it buys archive is that we can skip
957 setting dirty to true for each individual row
958 during a bulk load (see ha_archive::start_bulk_insert()).
959 */
960 int ha_archive::write_row(uchar *buf)
961 {
962 int rc;
963 uchar *read_buf= NULL;
964 ulonglong temp_auto;
965 uchar *record= table->record[0];
966 DBUG_ENTER("ha_archive::write_row");
967
968 if (share->crashed)
969 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
970
971 ha_statistic_increment(&SSV::ha_write_count);
972 mysql_mutex_lock(&share->mutex);
973
974 if (!share->archive_write_open && share->init_archive_writer())
975 {
976 rc= HA_ERR_CRASHED_ON_USAGE;
977 goto error;
978 }
979
980 if (table->next_number_field && record == table->record[0])
981 {
982 KEY *mkey= &table->s->key_info[0]; // We only support one key right now
983 update_auto_increment();
984 temp_auto= (((Field_num*) table->next_number_field)->unsigned_flag ||
985 table->next_number_field->val_int() > 0 ?
986 table->next_number_field->val_int() : 0);
987
988 /*
989 We don't support decrementing auto_increment. It makes the performance
990 just cry.
991 */
992 if (temp_auto <= share->archive_write.auto_increment &&
993 mkey->flags & HA_NOSAME)
994 {
995 rc= HA_ERR_FOUND_DUPP_KEY;
996 goto error;
997 }
998 #ifdef DEAD_CODE
999 /*
1000 Bad news, this will cause a search for the unique value which is very
1001 expensive since we will have to do a table scan which will lock up
1002 all other writers during this period. This could perhaps be optimized
1003 in the future.
1004 */
1005 {
1006 /*
1007 First we create a buffer that we can use for reading rows, and can pass
1008 to get_row().
1009 */
1010 if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1011 {
1012 rc= HA_ERR_OUT_OF_MEM;
1013 goto error;
1014 }
1015 /*
1016 All of the buffer must be written out or we won't see all of the
1017 data
1018 */
1019 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1020 /*
1021 Set the position of the local read thread to the beginning position.
1022 */
1023 if (read_data_header(&archive))
1024 {
1025 rc= HA_ERR_CRASHED_ON_USAGE;
1026 goto error;
1027 }
1028
1029 Field *mfield= table->next_number_field;
1030
1031 while (!(get_row(&archive, read_buf)))
1032 {
1033 if (!memcmp(read_buf + mfield->offset(record),
1034 table->next_number_field->ptr,
1035 mfield->max_display_length()))
1036 {
1037 rc= HA_ERR_FOUND_DUPP_KEY;
1038 goto error;
1039 }
1040 }
1041 }
1042 #endif
1043 else
1044 {
1045 if (temp_auto > share->archive_write.auto_increment)
1046 stats.auto_increment_value=
1047 (share->archive_write.auto_increment= temp_auto) + 1;
1048 }
1049 }
1050
1051 /*
1052 Notice that the global auto_increment has been increased.
1053 In case of a failed row write, we will never try to reuse the value.
1054 */
1055 share->rows_recorded++;
1056 rc= real_write_row(buf, &(share->archive_write));
1057 error:
1058 mysql_mutex_unlock(&share->mutex);
1059 if (read_buf)
1060 my_free(read_buf);
1061 DBUG_RETURN(rc);
1062 }
1063
1064
1065 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1066 ulonglong nb_desired_values,
1067 ulonglong *first_value,
1068 ulonglong *nb_reserved_values)
1069 {
1070 *nb_reserved_values= ULONGLONG_MAX;
1071 *first_value= share->archive_write.auto_increment + 1;
1072 }
1073
1074 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1075 int ha_archive::index_init(uint keynr, bool sorted)
1076 {
1077 DBUG_ENTER("ha_archive::index_init");
1078 active_index= keynr;
1079 DBUG_RETURN(0);
1080 }
1081
1082
1083 /*
1084 No real indexes, so if we get a request for an index search (since we tell
1085 the optimizer that we have unique indexes), we scan.
1086 */
1087 int ha_archive::index_read(uchar *buf, const uchar *key,
1088 uint key_len, enum ha_rkey_function find_flag)
1089 {
1090 int rc;
1091 DBUG_ENTER("ha_archive::index_read");
1092 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1093 rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1094 MYSQL_INDEX_READ_ROW_DONE(rc);
1095 DBUG_RETURN(rc);
1096 }
1097
1098
1099 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1100 uint key_len, enum ha_rkey_function find_flag)
1101 {
1102 int rc;
1103 bool found= 0;
1104 KEY *mkey= &table->s->key_info[index];
1105 current_k_offset= mkey->key_part->offset;
1106 current_key= key;
1107 current_key_len= key_len;
1108
1109
1110 DBUG_ENTER("ha_archive::index_read_idx");
1111
1112 rc= rnd_init(TRUE);
1113
1114 if (rc)
1115 goto error;
1116
1117 while (!(get_row(&archive, buf)))
1118 {
1119 if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1120 {
1121 found= 1;
1122 break;
1123 }
1124 }
1125
1126 if (found)
1127 {
1128 /* notify handler that a record has been found */
1129 table->status= 0;
1130 DBUG_RETURN(0);
1131 }
1132
1133 error:
1134 DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1135 }
1136
1137
1138 int ha_archive::index_next(uchar * buf)
1139 {
1140 bool found= 0;
1141 int rc;
1142
1143 DBUG_ENTER("ha_archive::index_next");
1144 MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1145
1146 while (!(get_row(&archive, buf)))
1147 {
1148 if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1149 {
1150 found= 1;
1151 break;
1152 }
1153 }
1154
1155 rc= found ? 0 : HA_ERR_END_OF_FILE;
1156 MYSQL_INDEX_READ_ROW_DONE(rc);
1157 DBUG_RETURN(rc);
1158 }
1159
1160 /*
1161 All calls that need to scan the table start with this method. If we are told
1162 that it is a table scan we rewind the file to the beginning, otherwise
1163 we assume the position will be set.
1164 */
1165
1166 int ha_archive::rnd_init(bool scan)
1167 {
1168 DBUG_ENTER("ha_archive::rnd_init");
1169
1170 if (share->crashed)
1171 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1172
1173 init_archive_reader();
1174
1175 /* We rewind the file so that we can read from the beginning if scan */
1176 if (scan)
1177 {
1178 scan_rows= stats.records;
1179 DBUG_PRINT("info", ("archive will retrieve %llu rows",
1180 (unsigned long long) scan_rows));
1181
1182 if (read_data_header(&archive))
1183 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1184 }
1185
1186 DBUG_RETURN(0);
1187 }
1188
1189
1190 /*
1191 This is the method that is used to read a row. It assumes that the row is
1192 positioned where you want it.
1193 */
1194 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1195 {
1196 int rc;
1197 DBUG_ENTER("ha_archive::get_row");
1198 DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1199 (uchar)file_to_read->version,
1200 ARCHIVE_VERSION));
1201 if (file_to_read->version == ARCHIVE_VERSION)
1202 rc= get_row_version3(file_to_read, buf);
1203 else
1204 rc= get_row_version2(file_to_read, buf);
1205
1206 DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1207
1208 DBUG_RETURN(rc);
1209 }
1210
1211 /* Reallocate buffer if needed */
1212 bool ha_archive::fix_rec_buff(unsigned int length)
1213 {
1214 DBUG_ENTER("ha_archive::fix_rec_buff");
1215 DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1216 length, record_buffer->length));
1217 DBUG_ASSERT(record_buffer->buffer);
1218
1219 if (length > record_buffer->length)
1220 {
1221 uchar *newptr;
1222 if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
1223 length,
1224 MYF(MY_ALLOW_ZERO_PTR))))
1225 DBUG_RETURN(1);
1226 record_buffer->buffer= newptr;
1227 record_buffer->length= length;
1228 }
1229
1230 DBUG_ASSERT(length <= record_buffer->length);
1231
1232 DBUG_RETURN(0);
1233 }
1234
1235 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1236 {
1237 DBUG_ENTER("ha_archive::unpack_row");
1238
1239 unsigned int read;
1240 int error;
1241 uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE], *size_buffer_p= size_buffer;
1242 unsigned int row_len;
1243
1244 /* First we grab the length stored */
1245 read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1246
1247 if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE))
1248 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1249
1250 /* If we read nothing we are at the end of the file */
1251 if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1252 DBUG_RETURN(HA_ERR_END_OF_FILE);
1253
1254 row_len= uint4korr(size_buffer_p);
1255 DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1256 (unsigned int)table->s->reclength));
1257
1258 if (fix_rec_buff(row_len))
1259 {
1260 DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1261 }
1262 DBUG_ASSERT(row_len <= record_buffer->length);
1263
1264 read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1265
1266 if (read != row_len || error)
1267 {
1268 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1269 }
1270
1271 /* Copy null bits */
1272 const uchar *ptr= record_buffer->buffer;
1273 /*
1274 Field::unpack() is not called when field is NULL. For VARCHAR
1275 Field::unpack() only unpacks as much bytes as occupied by field
1276 value. In these cases respective memory area on record buffer is
1277 not initialized.
1278
1279 These uninitialized areas may be accessed by CHECKSUM TABLE or
1280 by optimizer using temporary table (BUG#12997905). We may remove
1281 this memset() when they're fixed.
1282 */
1283 memset(record, 0, table->s->reclength);
1284 memcpy(record, ptr, table->s->null_bytes);
1285 ptr+= table->s->null_bytes;
1286 for (Field **field=table->field ; *field ; field++)
1287 {
1288 if (!((*field)->is_null_in_record(record)))
1289 {
1290 ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1291 }
1292 }
1293 DBUG_RETURN(0);
1294 }
1295
1296
1297 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1298 {
1299 DBUG_ENTER("ha_archive::get_row_version3");
1300
1301 int returnable= unpack_row(file_to_read, buf);
1302
1303 DBUG_RETURN(returnable);
1304 }
1305
1306
1307 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1308 {
1309 unsigned int read;
1310 int error;
1311 uint *ptr, *end;
1312 char *last;
1313 size_t total_blob_length= 0;
1314 MY_BITMAP *read_set= table->read_set;
1315 DBUG_ENTER("ha_archive::get_row_version2");
1316
1317 read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1318
1319 /* If we read nothing we are at the end of the file */
1320 if (read == 0)
1321 DBUG_RETURN(HA_ERR_END_OF_FILE);
1322
1323 if (read != table->s->reclength)
1324 {
1325 DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1326 read,
1327 (unsigned int)table->s->reclength));
1328 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1329 }
1330
1331 if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1332 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1333
1334 /*
1335 If the record is the wrong size, the file is probably damaged, unless
1336 we are dealing with a delayed insert or a bulk insert.
1337 */
1338 if ((ulong) read != table->s->reclength)
1339 DBUG_RETURN(HA_ERR_END_OF_FILE);
1340
1341 /* Calculate blob length, we use this for our buffer */
1342 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1343 ptr != end ;
1344 ptr++)
1345 {
1346 if (bitmap_is_set(read_set,
1347 (((Field_blob*) table->field[*ptr])->field_index)))
1348 total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1349 }
1350
1351 /* Adjust our row buffer if we need be */
1352 buffer.alloc(total_blob_length);
1353 last= (char *)buffer.ptr();
1354
1355 /* Loop through our blobs and read them */
1356 for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1357 ptr != end ;
1358 ptr++)
1359 {
1360 size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1361 if (size)
1362 {
1363 if (bitmap_is_set(read_set,
1364 ((Field_blob*) table->field[*ptr])->field_index))
1365 {
1366 read= azread(file_to_read, last, size, &error);
1367
1368 if (error)
1369 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1370
1371 if ((size_t) read != size)
1372 DBUG_RETURN(HA_ERR_END_OF_FILE);
1373 ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
1374 last += size;
1375 }
1376 else
1377 {
1378 (void)azseek(file_to_read, size, SEEK_CUR);
1379 }
1380 }
1381 }
1382 DBUG_RETURN(0);
1383 }
1384
1385
1386 /*
1387 Called during ORDER BY. Its position comes either from being called
1388 sequentially or from ha_archive::rnd_pos() having been called before it.
1389 */
1390
1391 int ha_archive::rnd_next(uchar *buf)
1392 {
1393 int rc;
1394 DBUG_ENTER("ha_archive::rnd_next");
1395 MYSQL_READ_ROW_START(table_share->db.str,
1396 table_share->table_name.str, TRUE);
1397
1398 if (share->crashed)
1399 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1400
1401 if (!scan_rows)
1402 {
1403 rc= HA_ERR_END_OF_FILE;
1404 goto end;
1405 }
1406 scan_rows--;
1407
1408 ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1409 current_position= aztell(&archive);
1410 rc= get_row(&archive, buf);
1411
1412 table->status=rc ? STATUS_NOT_FOUND: 0;
1413
1414 end:
1415 MYSQL_READ_ROW_DONE(rc);
1416 DBUG_RETURN(rc);
1417 }
1418
1419
1420 /*
1421 Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1422 each call to ha_archive::rnd_next() if an ordering of the rows is
1423 needed.
1424 */
1425
1426 void ha_archive::position(const uchar *record)
1427 {
1428 DBUG_ENTER("ha_archive::position");
1429 my_store_ptr(ref, ref_length, current_position);
1430 DBUG_VOID_RETURN;
1431 }
1432
1433
1434 /*
1435 This is called after a table scan for each row if the results of the
1436 scan need to be ordered. It will take *pos and use it to move the
1437 cursor in the file so that the next row that is called is the
1438 correctly ordered row.
1439 */
1440
1441 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1442 {
1443 int rc;
1444 DBUG_ENTER("ha_archive::rnd_pos");
1445 MYSQL_READ_ROW_START(table_share->db.str,
1446 table_share->table_name.str, FALSE);
1447 ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1448 current_position= (my_off_t)my_get_ptr(pos, ref_length);
1449 if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1450 {
1451 rc= HA_ERR_CRASHED_ON_USAGE;
1452 goto end;
1453 }
1454 rc= get_row(&archive, buf);
1455 end:
1456 MYSQL_READ_ROW_DONE(rc);
1457 DBUG_RETURN(rc);
1458 }
1459
1460 /*
1461 This method repairs the meta file. It does this by walking the datafile and
1462 rewriting the meta file. If EXTENDED repair is requested, we attempt to
1463 recover as much data as possible.
1464 */
1465 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1466 {
1467 DBUG_ENTER("ha_archive::repair");
1468 int rc= optimize(thd, check_opt);
1469
1470 if (rc)
1471 DBUG_RETURN(HA_ADMIN_CORRUPT);
1472
1473 share->crashed= FALSE;
1474 DBUG_RETURN(0);
1475 }
1476
1477 /*
1478 The table can become fragmented if data was inserted, read, and then
1479 inserted again. What we do is open up the file and recompress it completely.
1480 */
1481 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1482 {
1483 int rc= 0;
1484 azio_stream writer;
1485 ha_rows count;
1486 my_bitmap_map *org_bitmap;
1487 char writer_filename[FN_REFLEN];
1488 DBUG_ENTER("ha_archive::optimize");
1489
1490 mysql_mutex_lock(&share->mutex);
1491 if (share->in_optimize)
1492 {
1493 mysql_mutex_unlock(&share->mutex);
1494 DBUG_RETURN(HA_ADMIN_FAILED);
1495 }
1496 share->in_optimize= true;
1497 /* remember the number of rows */
1498 count= share->rows_recorded;
1499 if (share->archive_write_open)
1500 azflush(&share->archive_write, Z_SYNC_FLUSH);
1501 mysql_mutex_unlock(&share->mutex);
1502
1503 init_archive_reader();
1504
1505 /* Lets create a file to contain the new data */
1506 fn_format(writer_filename, share->table_name, "", ARN,
1507 MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1508
1509 if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1510 {
1511 share->in_optimize= false;
1512 DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1513 }
1514
1515 /*
1516 Transfer the embedded FRM so that the file can be discoverable.
1517 Write file offset is set to the end of the file.
1518 */
1519 if ((rc= frm_copy(&archive, &writer)))
1520 {
1521 share->in_optimize= false;
1522 goto error;
1523 }
1524 /*
1525 An extended rebuild is a lot more effort. We open up each row and re-record it.
1526 Any dead rows are removed (aka rows that may have been partially recorded).
1527
1528 As of Archive format 3, this is the only type of rebuild that is performed;
1529 before this version it was only done on T_EXTEND.
1530 */
1531
1532 DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1533
1534 /*
1535 Now we will rewind the archive file so that we are positioned at the
1536 start of the file.
1537 */
1538 if ((rc= read_data_header(&archive)))
1539 {
1540 share->in_optimize= false;
1541 goto error;
1542 }
1543
1544 stats.auto_increment_value= 1;
1545 org_bitmap= tmp_use_all_columns(table, table->read_set);
1546 /* read rows up to the remembered row count */
1547 for (ha_rows cur_count= count; cur_count; cur_count--)
1548 {
1549 if ((rc= get_row(&archive, table->record[0])))
1550 break;
1551 real_write_row(table->record[0], &writer);
1552 if (table->found_next_number_field)
1553 save_auto_increment(table, &stats.auto_increment_value);
1554 }
1555
1556 mysql_mutex_lock(&share->mutex);
1557
1558 share->close_archive_writer();
1559 if (!rc)
1560 {
1561 /* read the remaining rows */
1562 for (count= share->rows_recorded - count; count; count--)
1563 {
1564 if ((rc= get_row(&archive, table->record[0])))
1565 break;
1566 real_write_row(table->record[0], &writer);
1567 if (table->found_next_number_field)
1568 save_auto_increment(table, &stats.auto_increment_value);
1569 }
1570 }
1571
1572 tmp_restore_column_map(table->read_set, org_bitmap);
1573 share->rows_recorded= (ha_rows) writer.rows;
1574 share->archive_write.auto_increment= stats.auto_increment_value - 1;
1575 DBUG_PRINT("info", ("recovered %llu archive rows",
1576 (unsigned long long)share->rows_recorded));
1577
1578 DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1579 (unsigned long long)share->rows_recorded));
1580
1581 /*
1582 If REPAIR ... EXTENDED is requested, try to recover as much data
1583 from data file as possible. In this case if we failed to read a
1584 record, we assume EOF. This allows massive data loss, but we can
1585 hardly do more with broken zlib stream. And this is the only way
1586 to restore at least what is still recoverable.
1587 */
1588 if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1589 {
1590 share->in_optimize= false;
1591 mysql_mutex_unlock(&share->mutex);
1592 goto error;
1593 }
1594
1595 azclose(&writer);
1596 share->dirty= FALSE;
1597 azclose(&archive);
1598 archive_reader_open= FALSE;
1599
1600 // make the file we just wrote be our data file
1601 rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1602 share->in_optimize= false;
1603 mysql_mutex_unlock(&share->mutex);
1604
1605 DBUG_RETURN(rc);
1606 error:
1607 DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1608 azclose(&writer);
1609
1610 DBUG_RETURN(rc);
1611 }
1612
1613 /*
1614 Below is an example of how to set up row level locking.
1615 */
1616 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1617 THR_LOCK_DATA **to,
1618 enum thr_lock_type lock_type)
1619 {
1620 if (lock_type == TL_WRITE_DELAYED)
1621 delayed_insert= TRUE;
1622 else
1623 delayed_insert= FALSE;
1624
1625 if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1626 {
1627 /*
1628 Here is where we get into the guts of a row level lock.
1629 If TL_UNLOCK is set
1630 If we are not doing a LOCK TABLE or DISCARD/IMPORT
1631 TABLESPACE, then allow multiple writers
1632 */
1633
1634 if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1635 lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1636 && !thd_tablespace_op(thd))
1637 lock_type = TL_WRITE_ALLOW_WRITE;
1638
1639 /*
1640 In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1641 MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1642 would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1643 to t2. Convert the lock to a normal read lock to allow
1644 concurrent inserts to t2.
1645 */
1646
1647 if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1648 lock_type = TL_READ;
1649
1650 lock.type=lock_type;
1651 }
1652
1653 *to++= &lock;
1654
1655 return to;
1656 }
1657
1658 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1659 {
1660 char tmp_real_path[FN_REFLEN];
1661 DBUG_ENTER("ha_archive::update_create_info");
1662
1663 ha_archive::info(HA_STATUS_AUTO);
1664 if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1665 {
1666 create_info->auto_increment_value= stats.auto_increment_value;
1667 }
1668
1669 if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1670 create_info->data_file_name= sql_strdup(tmp_real_path);
1671
1672 DBUG_VOID_RETURN;
1673 }
1674
1675
1676 /*
1677 Hints for optimizer, see ha_tina for more information
1678 */
1679 int ha_archive::info(uint flag)
1680 {
1681 DBUG_ENTER("ha_archive::info");
1682
1683 mysql_mutex_lock(&share->mutex);
1684 if (share->dirty)
1685 {
1686 DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1687 DBUG_ASSERT(share->archive_write_open);
1688 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1689 share->dirty= FALSE;
1690 }
1691
1692 /*
1693 This should be an accurate number now, though bulk and delayed inserts can
1694 cause the number to be inaccurate.
1695 */
1696 stats.records= share->rows_recorded;
1697 mysql_mutex_unlock(&share->mutex);
1698
1699 stats.deleted= 0;
1700
1701 DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1702 /* Costs quite a bit more to get all information */
1703 if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1704 {
1705 MY_STAT file_stat; // Stat information for the data file
1706
1707 (void) mysql_file_stat(arch_key_file_data, share->data_file_name, &file_stat, MYF(MY_WME));
1708
1709 if (flag & HA_STATUS_TIME)
1710 stats.update_time= (ulong) file_stat.st_mtime;
1711 if (flag & HA_STATUS_CONST)
1712 {
1713 stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1714 stats.max_data_file_length= MAX_FILE_SIZE;
1715 stats.create_time= (ulong) file_stat.st_ctime;
1716 }
1717 if (flag & HA_STATUS_VARIABLE)
1718 {
1719 stats.delete_length= 0;
1720 stats.data_file_length= file_stat.st_size;
1721 stats.index_file_length=0;
1722 stats.mean_rec_length= stats.records ?
1723 ulong(stats.data_file_length / stats.records) : table->s->reclength;
1724 }
1725 }
1726
1727 if (flag & HA_STATUS_AUTO)
1728 {
1729 /* TODO: Use the shared writer instead during the lock above. */
1730 init_archive_reader();
1731 mysql_mutex_lock(&share->mutex);
1732 azflush(&archive, Z_SYNC_FLUSH);
1733 mysql_mutex_unlock(&share->mutex);
1734 stats.auto_increment_value= archive.auto_increment + 1;
1735 }
1736
1737 DBUG_RETURN(0);
1738 }
1739
1740
1741 /**
1742 Handler hints.
1743
1744 @param operation Operation to prepare for.
1745
1746 @return Operation status
1747 @return 0 Success
1748 @return != 0 Error
1749 */
1750
1751 int ha_archive::extra(enum ha_extra_function operation)
1752 {
1753 int ret= 0;
1754 DBUG_ENTER("ha_archive::extra");
1755 /* On windows we need to close all files before rename/delete. */
1756 #ifdef __WIN__
1757 switch (operation)
1758 {
1759 case HA_EXTRA_PREPARE_FOR_RENAME:
1760 case HA_EXTRA_FORCE_REOPEN:
1761 /* Close both reader and writer so we don't have the file open. */
1762 if (archive_reader_open)
1763 {
1764 ret= azclose(&archive);
1765 archive_reader_open= false;
1766 }
1767 mysql_mutex_lock(&share->mutex);
1768 share->close_archive_writer();
1769 mysql_mutex_unlock(&share->mutex);
1770 break;
1771 default:
1772 /* Nothing to do. */
1773 ;
1774 }
1775 #endif
1776 DBUG_RETURN(ret);
1777 }
1778
1779
1780 /*
1781 This method tells us that a bulk insert operation is about to occur. We set
1782 a flag which will keep write_row from saying that its data is dirty. This in
1783 turn will keep selects from causing a sync to occur.
1784 Basically, yet another optimization to keep compression working well.
1785 */
1786 void ha_archive::start_bulk_insert(ha_rows rows)
1787 {
1788 DBUG_ENTER("ha_archive::start_bulk_insert");
1789 if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1790 bulk_insert= TRUE;
1791 DBUG_VOID_RETURN;
1792 }
1793
1794
1795 /*
1796 The other side of start_bulk_insert is end_bulk_insert. Here we turn off the bulk insert
1797 flag, and set the share dirty so that the next select will call sync for us.
1798 */
1799 int ha_archive::end_bulk_insert()
1800 {
1801 DBUG_ENTER("ha_archive::end_bulk_insert");
1802 bulk_insert= FALSE;
1803 mysql_mutex_lock(&share->mutex);
1804 if (share->archive_write_open)
1805 share->dirty= true;
1806 mysql_mutex_unlock(&share->mutex);
1807 DBUG_RETURN(0);
1808 }
1809
1810 /*
1811 We cancel a truncate command. The only way to delete an archive table is to drop it.
1812 This is done for security reasons. In a later version we will enable this by
1813 allowing the user to select a different row format.
1814 */
1815 int ha_archive::truncate()
1816 {
1817 DBUG_ENTER("ha_archive::truncate");
1818 DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1819 }
1820
1821 /*
1822 We just return state if asked.
1823 */
1824 bool ha_archive::is_crashed() const
1825 {
1826 DBUG_ENTER("ha_archive::is_crashed");
1827 DBUG_RETURN(share->crashed);
1828 }
1829
1830
1831 /**
1832 @brief Check for upgrade
1833
1834 @param[in] check_opt check options
1835
1836 @return Completion status
1837 @retval HA_ADMIN_OK No upgrade required
1838 @retval HA_ADMIN_CORRUPT Cannot read meta-data
1839 @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
1840 */
1841
1842 int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1843 {
1844 DBUG_ENTER("ha_archive::check_for_upgrade");
1845 if (init_archive_reader())
1846 DBUG_RETURN(HA_ADMIN_CORRUPT);
1847 if (archive.version < ARCHIVE_VERSION)
1848 DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1849 DBUG_RETURN(HA_ADMIN_OK);
1850 }
1851
1852
1853 /*
1854 Simple scan of the tables to make sure everything is ok.
1855 */
1856
1857 int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1858 {
1859 int rc= 0;
1860 const char *old_proc_info;
1861 ha_rows count;
1862 DBUG_ENTER("ha_archive::check");
1863
1864 old_proc_info= thd_proc_info(thd, "Checking table");
1865 mysql_mutex_lock(&share->mutex);
1866 count= share->rows_recorded;
1867 /* Flush any waiting data */
1868 if (share->archive_write_open)
1869 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1870 mysql_mutex_unlock(&share->mutex);
1871
1872 if (init_archive_reader())
1873 DBUG_RETURN(HA_ADMIN_CORRUPT);
1874 /*
1875 Now we will rewind the archive file so that we are positioned at the
1876 start of the file.
1877 */
1878 read_data_header(&archive);
1879 for (ha_rows cur_count= count; cur_count; cur_count--)
1880 {
1881 if ((rc= get_row(&archive, table->record[0])))
1882 goto error;
1883 }
1884 /*
1885 Now read records that may have been inserted concurrently.
1886 Acquire share->mutex so tail of the table is not modified by
1887 concurrent writers.
1888 */
1889 mysql_mutex_lock(&share->mutex);
1890 count= share->rows_recorded - count;
1891 if (share->archive_write_open)
1892 azflush(&(share->archive_write), Z_SYNC_FLUSH);
1893 while (!(rc= get_row(&archive, table->record[0])))
1894 count--;
1895 mysql_mutex_unlock(&share->mutex);
1896
1897 if ((rc && rc != HA_ERR_END_OF_FILE) || count)
1898 goto error;
1899
1900 thd_proc_info(thd, old_proc_info);
1901 DBUG_RETURN(HA_ADMIN_OK);
1902
1903 error:
1904 thd_proc_info(thd, old_proc_info);
1905 share->crashed= FALSE;
1906 DBUG_RETURN(HA_ADMIN_CORRUPT);
1907 }
1908
1909 /*
1910 Check and repair the table if needed.
1911 */
1912 bool ha_archive::check_and_repair(THD *thd)
1913 {
1914 HA_CHECK_OPT check_opt;
1915 DBUG_ENTER("ha_archive::check_and_repair");
1916
1917 check_opt.init();
1918
1919 DBUG_RETURN(repair(thd, &check_opt));
1920 }
1921
1922 archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1923 {
1924 DBUG_ENTER("ha_archive::create_record_buffer");
1925 archive_record_buffer *r;
1926 if (!(r=
1927 (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1928 MYF(MY_WME))))
1929 {
1930 DBUG_RETURN(NULL); /* purecov: inspected */
1931 }
1932 r->length= (int)length;
1933
1934 if (!(r->buffer= (uchar*) my_malloc(r->length,
1935 MYF(MY_WME))))
1936 {
1937 my_free(r);
1938 DBUG_RETURN(NULL); /* purecov: inspected */
1939 }
1940
1941 DBUG_RETURN(r);
1942 }
1943
1944 void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1945 {
1946 DBUG_ENTER("ha_archive::destroy_record_buffer");
1947 my_free(r->buffer);
1948 my_free(r);
1949 DBUG_VOID_RETURN;
1950 }
1951
1952 bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info,
1953 uint table_changes)
1954 {
1955 if (info->auto_increment_value != stats.auto_increment_value ||
1956 (info->used_fields & HA_CREATE_USED_DATADIR) ||
1957 info->data_file_name ||
1958 (info->used_fields & HA_CREATE_USED_COMMENT) ||
1959 table_changes != IS_EQUAL_YES)
1960 return COMPATIBLE_DATA_NO;
1961
1962 return COMPATIBLE_DATA_YES;
1963 }
1964
1965
1966 struct st_mysql_storage_engine archive_storage_engine=
1967 { MYSQL_HANDLERTON_INTERFACE_VERSION };
1968
1969 mysql_declare_plugin(archive)
1970 {
1971 MYSQL_STORAGE_ENGINE_PLUGIN,
1972 &archive_storage_engine,
1973 "ARCHIVE",
1974 "Brian Aker, MySQL AB",
1975 "Archive storage engine",
1976 PLUGIN_LICENSE_GPL,
1977 archive_db_init, /* Plugin Init */
1978 NULL, /* Plugin Deinit */
1979 0x0300 /* 3.0 */,
1980 NULL, /* status variables */
1981 NULL, /* system variables */
1982 NULL, /* config options */
1983 0, /* flags */
1984 }
1985 mysql_declare_plugin_end;
1986
1987