/*
  Copyright (c) 2004, 2014, Oracle and/or its affiliates
  Copyright (c) 2010, 2014, SkySQL Ab.

  This program is free software; you can redistribute it and/or
  modify it under the terms of the GNU General Public License
  as published by the Free Software Foundation; version 2 of
  the License.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
*/

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation        // gcc: Class implementation
#endif

#include <my_global.h>
#include "sql_class.h"                          // SSV
#include "sql_table.h"                          // build_table_filename
#include <myisam.h>                             // T_EXTEND

#include "ha_archive.h"
#include "discover.h"
#include <my_dir.h>

#include <mysql/plugin.h>
/*
  First, if you want to understand storage engines you should look at
  ha_example.cc and ha_example.h.

  This example was written as a test case for a customer who needed
  a storage engine without indexes that could compress data very well.
  So, welcome to a completely compressed storage engine. This storage
  engine only does inserts. No replaces, deletes, or updates. All reads
  are complete table scans. Compression is done through a combination of
  packing and making use of the zlib library.

  We keep a file pointer open for each instance of ha_archive for each read
  but for writes we keep one open file handle just for that. We flush it
  only if we have a read occur. azio handles compressing lots of records
  at once much better than doing lots of little records between writes.
  It would be possible to not lock on writes, but this would then mean we
  couldn't handle bulk inserts as well (that is, if someone was trying to
  read at the same time, since we would want to flush).

  A "meta" file is kept alongside the data file. This file serves two
  purposes. The first purpose is to track the number of rows in the table.
  The second purpose is to determine if the table was closed properly or
  not. When the meta file is first opened it is marked as dirty. It is
  opened when the table itself is opened for writing. When the table is
  closed the new count for rows is written to the meta file and the file is
  marked as clean. If the meta file is opened and it is marked as dirty, it
  is assumed that a crash occurred. At this point an error occurs and the
  user is told to rebuild the file. A rebuild scans the rows and rewrites
  the meta file. If corruption is found in the data file then the meta file
  is not repaired.

  At some point a recovery method for such a drastic case needs to be
  devised.

  Locks are row level, and you will get a consistent read.

  As far as table scans go, it is quite fast. I don't have good numbers,
  but locally it has outperformed both InnoDB and MyISAM. For InnoDB the
  question will be whether the table can fit into the buffer pool. For
  MyISAM it's a question of how much of the file the file system caches.
  With enough free memory MyISAM is faster. It's only when the OS doesn't
  have enough memory to cache the entire table that archive turns out to
  be any faster.

  Examples between MyISAM (packed) and Archive.

  Table with 76695844 identical rows:
  29680807 a_archive.ARZ
  920350317 a.MYD


  Table with 8991478 rows (all of Slashdot's comments):
  1922964506 comment_archive.ARZ
  2944970297 comment_text.MYD


  TODO:
   Allow users to set compression level.
   Allow adjustable block size.
   Implement versioning, should be easy.
   Allow for errors, find a way to mark bad rows.
   Add an optional feature so that rows can be flushed at intervals (which
   will cause less compression but may speed up ordered searches).
   Checkpoint the meta file to allow for faster rebuilds.
   Option to allow for dirty reads; this would lower the sync calls, which
   would make inserts a lot faster, but would mean highly arbitrary reads.

    -Brian

  Archive file format versions:
  <5.1.5 - v.1
  5.1.5-5.1.15 - v.2
  >5.1.15 - v.3
*/
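
/*
  Illustrative usage (a sketch, not part of the engine itself): an ARCHIVE
  table accepts INSERT and SELECT like any other table, but UPDATE, DELETE
  and TRUNCATE are rejected, and the only allowed index is one on an
  AUTO_INCREMENT column (see ha_archive::create() below).

    CREATE TABLE t1 (a INT, b VARCHAR(32)) ENGINE=ARCHIVE;
    INSERT INTO t1 VALUES (1, 'one'), (2, 'two');
    SELECT * FROM t1;      -- always a full table scan
    OPTIMIZE TABLE t1;     -- rewrites and recompresses the data file
*/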

/* The file extension */
#define ARZ ".ARZ"               // The data file
#define ARN ".ARN"               // Files used during an optimize call
#define ARM ".ARM"               // Meta file (deprecated)

/* 5.0 compatibility */
#define META_V1_OFFSET_CHECK_HEADER  0
#define META_V1_OFFSET_VERSION       1
#define META_V1_OFFSET_ROWS_RECORDED 2
#define META_V1_OFFSET_CHECK_POINT   10
#define META_V1_OFFSET_CRASHED       18
#define META_V1_LENGTH               19
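
/*
  A sketch of the v1 meta file layout implied by the offsets above (see
  Archive_share::read_v1_metafile() and write_v1_metafile() below):

    byte  0       check header (ARCHIVE_CHECK_HEADER)
    byte  1       version (always 1)
    bytes 2..9    rows recorded (64-bit integer)
    bytes 10..17  check point (64-bit, always written as 0 here)
    byte  18      crashed flag
                  -- 19 bytes total (META_V1_LENGTH)
*/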
120
121 /*
122 uchar + uchar
123 */
124 #define DATA_BUFFER_SIZE 2 // Size of the data used in the data file
125 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
126
127 #ifdef HAVE_PSI_INTERFACE
128 extern "C" PSI_file_key arch_key_file_data;
129 #endif
130
131 /* Static declarations for handerton */
132 static handler *archive_create_handler(handlerton *hton,
133 TABLE_SHARE *table,
134 MEM_ROOT *mem_root);
135 int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share);
136
137 /*
138 Number of rows that will force a bulk insert.
139 */
140 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
141
142 /*
143 Size of header used for row
144 */
145 #define ARCHIVE_ROW_HEADER_SIZE 4
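
/*
  A sketch of the v3 row format produced by pack_row() below: a 4-byte
  little-endian length (the packed size, excluding this header), the
  null-bitmap bytes copied from the record, then every non-NULL field in
  Field::pack() format. unpack_row() reverses this.
*/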

static handler *archive_create_handler(handlerton *hton,
                                       TABLE_SHARE *table,
                                       MEM_ROOT *mem_root)
{
  return new (mem_root) ha_archive(hton, table);
}

#ifdef HAVE_PSI_INTERFACE
PSI_mutex_key az_key_mutex_Archive_share_mutex;

static PSI_mutex_info all_archive_mutexes[]=
{
  { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
};

PSI_file_key arch_key_file_metadata, arch_key_file_data;
static PSI_file_info all_archive_files[]=
{
  { &arch_key_file_metadata, "metadata", 0},
  { &arch_key_file_data, "data", 0}
};

static void init_archive_psi_keys(void)
{
  const char* category= "archive";
  int count;

  if (!PSI_server)
    return;

  count= array_elements(all_archive_mutexes);
  mysql_mutex_register(category, all_archive_mutexes, count);

  count= array_elements(all_archive_files);
  mysql_file_register(category, all_archive_files, count);
}

#endif /* HAVE_PSI_INTERFACE */

/*
  Initialize the archive handler.

  SYNOPSIS
    archive_db_init()
    void *

  RETURN
    FALSE       OK
    TRUE        Error
*/

/*
  We just implement one additional file extension.
  ARM is here just to properly drop 5.0 tables.
*/
static const char *ha_archive_exts[] = {
  ARZ,
  ARM,
  NullS
};

int archive_db_init(void *p)
{
  DBUG_ENTER("archive_db_init");
  handlerton *archive_hton;

#ifdef HAVE_PSI_INTERFACE
  init_archive_psi_keys();
#endif

  archive_hton= (handlerton *)p;
  archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
  archive_hton->create= archive_create_handler;
  archive_hton->flags= HTON_NO_FLAGS;
  archive_hton->discover_table= archive_discover;
  archive_hton->tablefile_extensions= ha_archive_exts;

  DBUG_RETURN(0);
}


Archive_share::Archive_share()
{
  crashed= false;
  in_optimize= false;
  archive_write_open= false;
  dirty= false;
  DBUG_PRINT("ha_archive", ("Archive_share: %p", this));
  thr_lock_init(&lock);
  /*
    We will use this lock for rows.
  */
  mysql_mutex_init(az_key_mutex_Archive_share_mutex,
                   &mutex, MY_MUTEX_INIT_FAST);
}


Archive_share::~Archive_share()
{
  DBUG_PRINT("ha_archive", ("~Archive_share: %p", this));
  if (archive_write_open)
  {
    mysql_mutex_lock(&mutex);
    (void) close_archive_writer();      // Will reset archive_write_open
    mysql_mutex_unlock(&mutex);
  }
  thr_lock_delete(&lock);
  mysql_mutex_destroy(&mutex);
}


ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
{
  /* Set our original buffer from pre-allocated memory */
  buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);

  /* The size of the offset value we will use for position() */
  ref_length= sizeof(my_off_t);
  archive_reader_open= FALSE;
}

int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
{
  DBUG_ENTER("archive_discover");
  DBUG_PRINT("archive_discover", ("db: '%s'  name: '%s'", share->db.str,
                                  share->table_name.str));
  azio_stream frm_stream;
  char az_file[FN_REFLEN];
  uchar *frm_ptr;
  MY_STAT file_stat;

  strxmov(az_file, share->normalized_path.str, ARZ, NullS);

  if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0))))
    DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);

  if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
  {
    if (errno == EROFS || errno == EACCES)
      DBUG_RETURN(my_errno= errno);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (frm_stream.frm_length == 0)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  frm_ptr= (uchar *)my_malloc(PSI_INSTRUMENT_ME, frm_stream.frm_length,
                              MYF(MY_THREAD_SPECIFIC | MY_WME));
  if (!frm_ptr)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  if (azread_frm(&frm_stream, frm_ptr))
    goto ret;

  azclose(&frm_stream);

  my_errno= share->init_from_binary_frm_image(thd, 1,
                                              frm_ptr, frm_stream.frm_length);
ret:
  my_free(frm_ptr);
  DBUG_RETURN(my_errno);
}

/**
  @brief Read version 1 meta file (5.0 compatibility routine).

  @return Completion status
    @retval  0 Success
    @retval !0 Failure
*/

int Archive_share::read_v1_metafile()
{
  char file_name[FN_REFLEN];
  uchar buf[META_V1_LENGTH];
  File fd;
  DBUG_ENTER("Archive_share::read_v1_metafile");

  fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
  if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
    DBUG_RETURN(-1);

  if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
  {
    mysql_file_close(fd, MYF(0));
    DBUG_RETURN(-1);
  }

  rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
  crashed= buf[META_V1_OFFSET_CRASHED];
  mysql_file_close(fd, MYF(0));
  DBUG_RETURN(0);
}


/**
  @brief Write version 1 meta file (5.0 compatibility routine).

  @return Completion status
    @retval  0 Success
    @retval !0 Failure
*/

int Archive_share::write_v1_metafile()
{
  char file_name[FN_REFLEN];
  uchar buf[META_V1_LENGTH];
  File fd;
  DBUG_ENTER("Archive_share::write_v1_metafile");

  buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
  buf[META_V1_OFFSET_VERSION]= 1;
  int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
  int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
  buf[META_V1_OFFSET_CRASHED]= crashed;

  fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
  if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
    DBUG_RETURN(-1);

  if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
  {
    mysql_file_close(fd, MYF(0));
    DBUG_RETURN(-1);
  }

  mysql_file_close(fd, MYF(0));
  DBUG_RETURN(0);
}

/**
  @brief Pack version 1 row (5.0 compatibility routine).

  @param[in]  record  the record to pack

  @return Length of packed row
*/

unsigned int ha_archive::pack_row_v1(const uchar *record)
{
  uint *blob, *end;
  uchar *pos;
  DBUG_ENTER("pack_row_v1");
  memcpy(record_buffer->buffer, record, table->s->reclength);

  /*
    The end of VARCHAR fields is filled with garbage, so here
    we explicitly set the end of the VARCHAR fields with zeroes
  */

  for (Field** field= table->field; (*field) ; field++)
  {
    Field *fld= *field;
    if (fld->type() == MYSQL_TYPE_VARCHAR)
    {
      if (!(fld->is_real_null(record - table->record[0])))
      {
        ptrdiff_t start= (fld->ptr - table->record[0]);
        Field_varstring *const field_var= (Field_varstring *)fld;
        uint offset= field_var->data_length() + field_var->length_size();
        memset(record_buffer->buffer + start + offset, 0,
               fld->field_length - offset + 1);
      }
    }
  }
  pos= record_buffer->buffer + table->s->reclength;
  for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
       blob != end; blob++)
  {
    uint32 length= ((Field_blob *) table->field[*blob])->get_length();
    if (length)
    {
      uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr();
      memcpy(pos, data_ptr, length);
      pos+= length;
    }
  }
  DBUG_RETURN((int)(pos - record_buffer->buffer));
}

/*
  This method reads the header of a datafile and returns whether or not it
  was successful.
*/
int ha_archive::read_data_header(azio_stream *file_to_read)
{
  int error;
  unsigned long ret;
  uchar data_buffer[DATA_BUFFER_SIZE];
  DBUG_ENTER("ha_archive::read_data_header");

  if (azrewind(file_to_read) == -1)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (file_to_read->version >= 3)
    DBUG_RETURN(0);
  /* Everything below this is legacy for version 2 and earlier */

  DBUG_PRINT("ha_archive", ("Reading legacy data header"));

  ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);

  if (ret != DATA_BUFFER_SIZE)
  {
    DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
                              DATA_BUFFER_SIZE, ret));
    DBUG_RETURN(1);
  }

  if (error)
  {
    DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
    DBUG_RETURN(1);
  }

  DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
  DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));

  if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
      (data_buffer[1] == 1 || data_buffer[1] == 2))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  DBUG_RETURN(0);
}


/*
  We create the shared memory space that we will use for the open table.
  No matter what, we try to get or create a share. This is so that a
  repair table operation can occur.

  See ha_example.cc for a longer description.
*/
Archive_share *ha_archive::get_share(const char *table_name, int *rc)
{
  Archive_share *tmp_share;

  DBUG_ENTER("ha_archive::get_share");

  lock_shared_ha_data();
  if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
  {
    azio_stream archive_tmp;

    tmp_share= new Archive_share;

    if (!tmp_share)
    {
      *rc= HA_ERR_OUT_OF_MEM;
      goto err;
    }
    DBUG_PRINT("ha_archive", ("new Archive_share: %p", tmp_share));

    fn_format(tmp_share->data_file_name, table_name, "",
              ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    strmov(tmp_share->table_name, table_name);
    DBUG_PRINT("ha_archive", ("Data File %s", tmp_share->data_file_name));

    /*
      We read the meta file, but do not mark it dirty. Since we are not
      doing a write we won't mark it dirty (and we won't open it for
      anything but reading... open it for write and we will generate null
      compression writes).
    */
    if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
    {
      delete tmp_share;
      *rc= my_errno ? my_errno : HA_ERR_CRASHED;
      tmp_share= NULL;
      goto err;
    }
    stats.auto_increment_value= archive_tmp.auto_increment + 1;
    tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
    tmp_share->crashed= archive_tmp.dirty;
    share= tmp_share;
    if (archive_tmp.version == 1)
      share->read_v1_metafile();
    else if (frm_compare(&archive_tmp))
      *rc= HA_ERR_TABLE_DEF_CHANGED;

    azclose(&archive_tmp);

    set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
  }
  if (tmp_share->crashed)
    *rc= HA_ERR_CRASHED_ON_USAGE;
err:
  unlock_shared_ha_data();

  DBUG_ASSERT(tmp_share || *rc);

  DBUG_RETURN(tmp_share);
}


int Archive_share::init_archive_writer()
{
  DBUG_ENTER("Archive_share::init_archive_writer");
  /*
    It is expensive to open and close the data files, and since you can't
    have a gzip file that can be both read from and written to, we keep a
    writer open that is shared among all open tables.
  */
  if (!(azopen(&archive_write, data_file_name,
               O_RDWR|O_BINARY)))
  {
    DBUG_PRINT("ha_archive", ("Could not open archive write file"));
    crashed= true;
    DBUG_RETURN(1);
  }
  archive_write_open= true;

  DBUG_RETURN(0);
}


void Archive_share::close_archive_writer()
{
  mysql_mutex_assert_owner(&mutex);
  if (archive_write_open)
  {
    if (archive_write.version == 1)
      (void) write_v1_metafile();
    azclose(&archive_write);
    archive_write_open= false;
    dirty= false;
  }
}


/*
  No locks are required because it is associated with just one handler
  instance
*/
int ha_archive::init_archive_reader()
{
  DBUG_ENTER("ha_archive::init_archive_reader");
  /*
    It is expensive to open and close the data files, and since you can't
    have a gzip file that can be both read from and written to, we keep a
    writer open that is shared among all open tables, but have one reader
    open for each handler instance.
  */
  if (!archive_reader_open)
  {
    if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
    {
      DBUG_PRINT("ha_archive", ("Could not open archive read file"));
      share->crashed= TRUE;
      DBUG_RETURN(1);
    }
    archive_reader_open= TRUE;
  }

  DBUG_RETURN(0);
}


/*
  When opening a file we:
  Create/get our shared structure.
  Init our lock.
  We open the file we will read from.
*/
int ha_archive::open(const char *name, int mode, uint open_options)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::open");

  DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
                            (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
  share= get_share(name, &rc);
  if (!share)
    DBUG_RETURN(rc);

  /* Allow open on crashed table in repair mode only. */
  switch (rc)
  {
  case 0:
    break;
  case HA_ERR_TABLE_DEF_CHANGED:
  case HA_ERR_CRASHED_ON_USAGE:
    if (open_options & HA_OPEN_FOR_REPAIR)
    {
      rc= 0;
      break;
    }
    /* fall through */
  default:
    DBUG_RETURN(rc);
  }

  DBUG_ASSERT(share);

  record_buffer= create_record_buffer(table->s->reclength +
                                      ARCHIVE_ROW_HEADER_SIZE);

  if (!record_buffer)
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);

  thr_lock_data_init(&share->lock, &lock, NULL);

  DBUG_PRINT("ha_archive", ("archive table was crashed %s",
                            rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
  if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
  {
    DBUG_RETURN(0);
  }

  DBUG_RETURN(rc);
}


/*
  Closes the file.

  SYNOPSIS
    close();

  IMPLEMENTATION:

  We first close this storage engine's file handle to the archive and
  then remove our reference count to the table (and possibly free it
  as well).

  RETURN
    0  ok
    1  Error
*/

int ha_archive::close(void)
{
  int rc= 0;
  DBUG_ENTER("ha_archive::close");

  destroy_record_buffer(record_buffer);

  /* First close stream */
  if (archive_reader_open)
  {
    if (azclose(&archive))
      rc= 1;
  }
  DBUG_RETURN(rc);
}


/**
  Copy a frm blob between streams.

  @param  src  The source stream.
  @param  dst  The destination stream.

  @return Zero on success, non-zero otherwise.
*/

int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
{
  int rc= 0;
  uchar *frm_ptr;

  if (!src->frm_length)
  {
    size_t frm_len;
    if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len))
    {
      azwrite_frm(dst, frm_ptr, frm_len);
      table_share->free_frm_image(frm_ptr);
    }
    return 0;
  }

  if (!(frm_ptr= (uchar *) my_malloc(PSI_INSTRUMENT_ME, src->frm_length,
                                     MYF(MY_THREAD_SPECIFIC | MY_WME))))
    return HA_ERR_OUT_OF_MEM;

  /* Write file offset is set to the end of the file. */
  if (azread_frm(src, frm_ptr) ||
      azwrite_frm(dst, frm_ptr, src->frm_length))
    rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;

  my_free(frm_ptr);

  return rc;
}


/**
  Compare frm blob with the on-disk frm file

  @param  s  The azio stream.

  @return Zero if equal, non-zero otherwise.
*/

int ha_archive::frm_compare(azio_stream *s)
{
  if (!s->frmver_length)
    return 0; // Old pre-10.0 archive table. Never rediscover.

  LEX_CUSTRING *ver= &table->s->tabledef_version;
  return ver->length != s->frmver_length ||
         memcmp(ver->str, s->frmver, ver->length);
}


/*
  We create our data file here. The format is pretty simple.
  You can read about the format of the data file above.
  Unlike other storage engines we do not "pack" our data. Since we
  are about to do a general compression, packing would just be a waste of
  CPU time. If the table has blobs they are written after the row in the
  order of creation.
*/

int ha_archive::create(const char *name, TABLE *table_arg,
                       HA_CREATE_INFO *create_info)
{
  char name_buff[FN_REFLEN];
  char linkname[FN_REFLEN];
  int error;
  azio_stream create_stream;            /* Archive file we are working with */
  const uchar *frm_ptr;
  size_t frm_len;

  DBUG_ENTER("ha_archive::create");

  stats.auto_increment_value= create_info->auto_increment_value;

  for (uint key= 0; key < table_arg->s->keys; key++)
  {
    KEY *pos= table_arg->key_info+key;
    KEY_PART_INFO *key_part= pos->key_part;
    KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;

    for (; key_part != key_part_end; key_part++)
    {
      Field *field= key_part->field;

      if (!(field->flags & AUTO_INCREMENT_FLAG))
      {
        error= HA_WRONG_CREATE_OPTION;
        DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
        goto error;
      }
    }
  }

  /*
    We reuse name_buff since it is available.
  */
#ifdef HAVE_READLINK
  if (my_use_symdir &&
      create_info->data_file_name &&
      create_info->data_file_name[0] != '#')
  {
    DBUG_PRINT("ha_archive", ("archive will create stream file %s",
                              create_info->data_file_name));

    fn_format(name_buff, create_info->data_file_name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    fn_format(linkname, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  }
  else
#endif /* HAVE_READLINK */
  {
    if (create_info->data_file_name)
      my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "DATA DIRECTORY");

    fn_format(name_buff, name, "", ARZ,
              MY_REPLACE_EXT | MY_UNPACK_FILENAME);
    linkname[0]= 0;
  }

  /* Archive engine never uses INDEX DIRECTORY. */
  if (create_info->index_file_name)
    my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "INDEX DIRECTORY");

  /*
    There is a chance that the file was "discovered". In this case
    just use whatever file is there.
  */
  my_errno= 0;
  if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
  {
    error= errno;
    goto error2;
  }

  if (linkname[0])
    my_symlink(name_buff, linkname, MYF(0));

  /*
    Here is where we open up the frm and pass it to archive to store
  */
  if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len))
  {
    azwrite_frm(&create_stream, frm_ptr, frm_len);
    table_arg->s->free_frm_image(frm_ptr);
  }

  if (create_info->comment.str)
    azwrite_comment(&create_stream, create_info->comment.str,
                    create_info->comment.length);

  /*
    Yes, you need to do this, because the starting value
    for the autoincrement may not be zero.
  */
  create_stream.auto_increment= stats.auto_increment_value ?
                                stats.auto_increment_value - 1 : 0;
  if (azclose(&create_stream))
  {
    error= errno;
    goto error2;
  }

  DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
  DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));


  DBUG_RETURN(0);

error2:
  delete_table(name);
error:
  /* Return error number, if we got one */
  DBUG_RETURN(error ? error : -1);
}

/*
  This is where the actual row is written out.
*/
int ha_archive::real_write_row(const uchar *buf, azio_stream *writer)
{
  my_off_t written;
  unsigned int r_pack_length;
  DBUG_ENTER("ha_archive::real_write_row");

  /* We pack the row for writing */
  r_pack_length= pack_row(buf, writer);

  written= azwrite(writer, record_buffer->buffer, r_pack_length);
  if (written != r_pack_length)
  {
    DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
                              (uint32) written,
                              (uint32)r_pack_length));
    DBUG_RETURN(-1);
  }

  if (!delayed_insert || !bulk_insert)
    share->dirty= TRUE;

  DBUG_RETURN(0);
}


/*
  Calculate max length needed for row. This includes
  the bytes required for the length in the header.
*/

uint32 ha_archive::max_row_length(const uchar *record)
{
  uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
  length+= ARCHIVE_ROW_HEADER_SIZE;
  my_ptrdiff_t const rec_offset= record - table->record[0];

  uint *ptr, *end;
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (!table->field[*ptr]->is_null(rec_offset))
      length += 2 + ((Field_blob*)table->field[*ptr])->get_length(rec_offset);
  }

  return length;
}


unsigned int ha_archive::pack_row(const uchar *record, azio_stream *writer)
{
  uchar *ptr;
  my_ptrdiff_t const rec_offset= record - table->record[0];
  DBUG_ENTER("ha_archive::pack_row");

  if (fix_rec_buff(max_row_length(record)))
    DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */

  if (writer->version == 1)
    DBUG_RETURN(pack_row_v1(record));

  /* Copy null bits */
  memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
         record, table->s->null_bytes);
  ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;

  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null(rec_offset)))
      ptr= (*field)->pack(ptr, record + (*field)->offset(record));
  }

  int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
                                         ARCHIVE_ROW_HEADER_SIZE));
  DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
                           (ptr - record_buffer->buffer -
                            ARCHIVE_ROW_HEADER_SIZE)));

  DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
}


/*
  Look at ha_archive::open() for an explanation of the row format.
  Here we just write out the row.

  Wondering about start_bulk_insert()? We don't implement it for
  archive since it optimizes for lots of writes. The only gain from
  implementing start_bulk_insert() is that we could skip
  setting dirty to true each time.
*/
int ha_archive::write_row(const uchar *buf)
{
  int rc;
  uchar *read_buf= NULL;
  ulonglong temp_auto;
  uchar *record= table->record[0];
  DBUG_ENTER("ha_archive::write_row");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  mysql_mutex_lock(&share->mutex);

  if (!share->archive_write_open && share->init_archive_writer())
  {
    rc= errno;
    goto error;
  }

  if (table->next_number_field && record == table->record[0])
  {
    KEY *mkey= &table->key_info[0]; // We only support one key right now
    update_auto_increment();
    temp_auto= table->next_number_field->val_int();

    /*
      We don't support decrementing auto_increment. It makes the
      performance just cry.
    */
    if (temp_auto <= share->archive_write.auto_increment &&
        mkey->flags & HA_NOSAME)
    {
      rc= HA_ERR_FOUND_DUPP_KEY;
      goto error;
    }
#ifdef DEAD_CODE
    /*
      Bad news, this will cause a search for the unique value which is very
      expensive since we will have to do a table scan which will lock up
      all other writers during this period. This could perhaps be optimized
      in the future.
    */
    {
      /*
        First we create a buffer that we can use for reading rows, and can
        pass to get_row().
      */
      if (!(read_buf= (uchar*) my_malloc(table->s->reclength,
                                         MYF(MY_THREAD_SPECIFIC | MY_WME))))
      {
        rc= HA_ERR_OUT_OF_MEM;
        goto error;
      }
      /*
        All of the buffer must be written out or we won't see all of the
        data
      */
      azflush(&(share->archive_write), Z_SYNC_FLUSH);
      /*
        Set the position of the local read thread to the beginning position.
      */
      if (read_data_header(&archive))
      {
        rc= HA_ERR_CRASHED_ON_USAGE;
        goto error;
      }

      Field *mfield= table->next_number_field;

      while (!(get_row(&archive, read_buf)))
      {
        if (!memcmp(read_buf + mfield->offset(record),
                    table->next_number_field->ptr,
                    mfield->max_display_length()))
        {
          rc= HA_ERR_FOUND_DUPP_KEY;
          goto error;
        }
      }
    }
#endif
    else
    {
      if (temp_auto > share->archive_write.auto_increment)
        stats.auto_increment_value=
          (share->archive_write.auto_increment= temp_auto) + 1;
    }
  }

  /*
    Notice that the global auto_increment has been increased.
    In case of a failed row write, we will never try to reuse the value.
  */
  share->rows_recorded++;
  rc= real_write_row(buf, &(share->archive_write));
error:
  mysql_mutex_unlock(&share->mutex);
  my_free(read_buf);
  DBUG_RETURN(rc);
}


void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
                                    ulonglong nb_desired_values,
                                    ulonglong *first_value,
                                    ulonglong *nb_reserved_values)
{
  *nb_reserved_values= ULONGLONG_MAX;
  *first_value= share->archive_write.auto_increment + 1;
}

/* Initialized at each key walk (called multiple times unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
  DBUG_ENTER("ha_archive::index_init");
  active_index= keynr;
  DBUG_RETURN(0);
}


/*
  No indexes, so if we get a request for an index search (we tell the
  optimizer that we have unique indexes), we do a scan.
*/
int ha_archive::index_read(uchar *buf, const uchar *key,
                           uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  DBUG_ENTER("ha_archive::index_read");
  rc= index_read_idx(buf, active_index, key, key_len, find_flag);
  DBUG_RETURN(rc);
}


int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
                               uint key_len, enum ha_rkey_function find_flag)
{
  int rc;
  bool found= 0;
  KEY *mkey= &table->key_info[index];
  current_k_offset= mkey->key_part->offset;
  current_key= key;
  current_key_len= key_len;


  DBUG_ENTER("ha_archive::index_read_idx");

  rc= rnd_init(TRUE);

  if (rc)
    goto error;

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf + current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  if (found)
  {
    /* notify handler that a record has been found */
    table->status= 0;
    DBUG_RETURN(0);
  }

error:
  DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}


int ha_archive::index_next(uchar * buf)
{
  bool found= 0;
  int rc;

  DBUG_ENTER("ha_archive::index_next");

  while (!(get_row(&archive, buf)))
  {
    if (!memcmp(current_key, buf+current_k_offset, current_key_len))
    {
      found= 1;
      break;
    }
  }

  rc= found ? 0 : HA_ERR_END_OF_FILE;
  DBUG_RETURN(rc);
}

/*
  All calls that need to scan the table start with this method. If we are
  told that it is a table scan we rewind the file to the beginning, otherwise
  we assume the position will be set.
*/

int ha_archive::rnd_init(bool scan)
{
  DBUG_ENTER("ha_archive::rnd_init");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (init_archive_reader())
    DBUG_RETURN(errno);

  /* We rewind the file so that we can read from the beginning if scan */
  if (scan)
  {
    scan_rows= stats.records;
    DBUG_PRINT("info", ("archive will retrieve %llu rows",
                        (unsigned long long) scan_rows));

    if (read_data_header(&archive))
      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  DBUG_RETURN(0);
}


/*
  This is the method that is used to read a row. It assumes that the row is
  positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::get_row");
  DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
                            (uchar)file_to_read->version,
                            ARCHIVE_VERSION));
  if (file_to_read->version == ARCHIVE_VERSION)
    rc= get_row_version3(file_to_read, buf);
  else
    rc= get_row_version2(file_to_read, buf);

  DBUG_PRINT("ha_archive", ("Return %d\n", rc));

  DBUG_RETURN(rc);
}

/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(unsigned int length)
{
  DBUG_ENTER("ha_archive::fix_rec_buff");
  DBUG_PRINT("ha_archive", ("Fixing %u for %u",
                            length, record_buffer->length));
  DBUG_ASSERT(record_buffer->buffer);

  if (length > record_buffer->length)
  {
    uchar *newptr;
    if (!(newptr=(uchar*) my_realloc(PSI_INSTRUMENT_ME,
                                     (uchar*) record_buffer->buffer, length,
                                     MYF(MY_ALLOW_ZERO_PTR))))
      DBUG_RETURN(1);
    record_buffer->buffer= newptr;
    record_buffer->length= length;
  }

  DBUG_ASSERT(length <= record_buffer->length);

  DBUG_RETURN(0);
}

int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
{
  DBUG_ENTER("ha_archive::unpack_row");

  unsigned int read;
  int error;
  uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
  unsigned int row_len;

  /* First we grab the length stored */
  read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);

  if (error == Z_STREAM_ERROR || (read && read < ARCHIVE_ROW_HEADER_SIZE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* If we read nothing we are at the end of the file */
  if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  row_len= uint4korr(size_buffer);
  DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
                           (unsigned int)table->s->reclength));

  if (fix_rec_buff(row_len))
  {
    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
  }
  DBUG_ASSERT(row_len <= record_buffer->length);

  read= azread(file_to_read, record_buffer->buffer, row_len, &error);

  if (read != row_len || error)
  {
    DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD);
  }

  /* Copy null bits */
  const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len;
  memcpy(record, ptr, table->s->null_bytes);
  ptr+= table->s->null_bytes;
  if (ptr > end)
    DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
  for (Field **field=table->field ; *field ; field++)
  {
    if (!((*field)->is_null_in_record(record)))
    {
      if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]),
                                  ptr, end)))
        DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
    }
  }
  if (ptr != end)
    DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
  DBUG_RETURN(0);
}


int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
{
  DBUG_ENTER("ha_archive::get_row_version3");

  int returnable= unpack_row(file_to_read, buf);

  DBUG_RETURN(returnable);
}


int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
{
  unsigned int read;
  int error;
  uint *ptr, *end;
  char *last;
  size_t total_blob_length= 0;
  MY_BITMAP *read_set= table->read_set;
  DBUG_ENTER("ha_archive::get_row_version2");

  read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);

  /* If we read nothing we are at the end of the file */
  if (read == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  if (read != table->s->reclength)
  {
    DBUG_PRINT("ha_archive::get_row_version2",
               ("Read %u bytes expected %u", read,
                (unsigned int)table->s->reclength));
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /*
    If the record is the wrong size, the file is probably damaged, unless
    we are dealing with a delayed insert or a bulk insert.
  */
  if ((ulong) read != table->s->reclength)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* Calculate blob length, we use this for our buffer */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    if (bitmap_is_set(read_set,
                      (((Field_blob*) table->field[*ptr])->field_index)))
      total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
  }

  /* Adjust our row buffer if we need be */
  buffer.alloc(total_blob_length);
  last= (char *)buffer.ptr();

  /* Loop through our blobs and read them */
  for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
       ptr != end ;
       ptr++)
  {
    size_t size= ((Field_blob*) table->field[*ptr])->get_length();
    if (size)
    {
      if (bitmap_is_set(read_set,
                        ((Field_blob*) table->field[*ptr])->field_index))
      {
        read= azread(file_to_read, last, size, &error);

        if (error)
          DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

        if ((size_t) read != size)
          DBUG_RETURN(HA_ERR_END_OF_FILE);
        ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last);
        last += size;
      }
      else
      {
        (void)azseek(file_to_read, size, SEEK_CUR);
      }
    }
  }
  DBUG_RETURN(0);
}


/*
  Called during ORDER BY. Its position is set either by having been called
  sequentially or by a prior call to ha_archive::rnd_pos().
*/

int ha_archive::rnd_next(uchar *buf)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_next");

  if (share->crashed)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  if (!scan_rows)
  {
    rc= HA_ERR_END_OF_FILE;
    goto end;
  }
  scan_rows--;

  current_position= aztell(&archive);
  rc= get_row(&archive, buf);

end:
  DBUG_RETURN(rc);
}


/*
  Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
  each call to ha_archive::rnd_next() if an ordering of the rows is
  needed.
*/

void ha_archive::position(const uchar *record)
{
  DBUG_ENTER("ha_archive::position");
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}


/*
  This is called after a table scan for each row if the results of the
  scan need to be ordered. It will take *pos and use it to move the
  cursor in the file so that the next row that is called is the
  correctly ordered row.
*/

int ha_archive::rnd_pos(uchar * buf, uchar *pos)
{
  int rc;
  DBUG_ENTER("ha_archive::rnd_pos");
  current_position= (my_off_t)my_get_ptr(pos, ref_length);
  if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
  {
    rc= HA_ERR_CRASHED_ON_USAGE;
    goto end;
  }
  rc= get_row(&archive, buf);
end:
  DBUG_RETURN(rc);
}


/**
  @brief Check for upgrade

  @param[in]  check_opt  check options

  @return Completion status
    @retval HA_ADMIN_OK            No upgrade required
    @retval HA_ADMIN_CORRUPT       Cannot read meta-data
    @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
*/

int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
{
  DBUG_ENTER("ha_archive::check_for_upgrade");
  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  if (archive.version < ARCHIVE_VERSION)
    DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
  DBUG_RETURN(HA_ADMIN_OK);
}


/*
  This method repairs the meta file. It does this by walking the datafile
  and rewriting the meta file. If EXTENDED repair is requested, we attempt
  to recover as much data as possible.
*/
int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
{
  DBUG_ENTER("ha_archive::repair");
  int rc= optimize(thd, check_opt);

  if (rc)
    DBUG_RETURN(HA_ADMIN_CORRUPT);

  share->crashed= FALSE;
  DBUG_RETURN(0);
}

/*
  The table can become fragmented if data was inserted, read, and then
  inserted again. What we do is open up the file and recompress it
  completely.
*/
int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  azio_stream writer;
  char writer_filename[FN_REFLEN];
  DBUG_ENTER("ha_archive::optimize");

  mysql_mutex_lock(&share->mutex);

  if (init_archive_reader())
  {
    mysql_mutex_unlock(&share->mutex);
    DBUG_RETURN(errno);
  }

  // now we close both our writer and our reader for the rename
  if (share->archive_write_open)
  {
    azclose(&(share->archive_write));
    share->archive_write_open= FALSE;
  }

  /* Lets create a file to contain the new data */
  fn_format(writer_filename, share->table_name, "", ARN,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);

  if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
  {
    mysql_mutex_unlock(&share->mutex);
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
  }

  /*
    Transfer the embedded FRM so that the file can be discoverable.
    Write file offset is set to the end of the file.
  */
  if ((rc= frm_copy(&archive, &writer)))
    goto error;

  /*
    An extended rebuild is a lot more effort. We open up each row and
    re-record it. Any dead rows are removed (aka rows that may have been
    partially recorded).

    As of Archive format 3, this is the only type that is performed; before
    this version it was just done on T_EXTEND.
  */
  if (1)
  {
    DBUG_PRINT("ha_archive", ("archive extended rebuild"));

    /*
      Now we will rewind the archive file so that we are positioned at the
      start of the file.
    */
    rc= read_data_header(&archive);

    /*
      On success of writing out the new header, we now fetch each row and
      insert it into the new archive file.
    */
    if (!rc)
    {
      share->rows_recorded= 0;
      stats.auto_increment_value= 1;
      share->archive_write.auto_increment= 0;
      MY_BITMAP *org_bitmap= tmp_use_all_columns(table, &table->read_set);

      while (!(rc= get_row(&archive, table->record[0])))
      {
        real_write_row(table->record[0], &writer);
        /*
          Long term it should be possible to optimize this so that
          it is not called on each row.
        */
        if (table->found_next_number_field)
        {
          Field *field= table->found_next_number_field;
          ulonglong auto_value=
            (ulonglong) field->val_int(table->record[0] +
                                       field->offset(table->record[0]));
          if (share->archive_write.auto_increment < auto_value)
            stats.auto_increment_value=
              (share->archive_write.auto_increment= auto_value) + 1;
        }
      }

      tmp_restore_column_map(&table->read_set, org_bitmap);
      share->rows_recorded= (ha_rows)writer.rows;
    }

    DBUG_PRINT("info", ("recovered %llu archive rows",
                        (unsigned long long)share->rows_recorded));

    DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
                              (unsigned long long)share->rows_recorded));

    /*
      If REPAIR ... EXTENDED is requested, try to recover as much data
      from data file as possible. In this case if we failed to read a
      record, we assume EOF. This allows massive data loss, but we can
      hardly do more with broken zlib stream. And this is the only way
      to restore at least what is still recoverable.
    */
    if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
      goto error;
  }

  azclose(&writer);
  share->dirty= FALSE;

  azclose(&archive);

  // make the file we just wrote be our data file
  rc= my_rename(writer_filename, share->data_file_name, MYF(0));


  mysql_mutex_unlock(&share->mutex);
  DBUG_RETURN(rc);
error:
  DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
  azclose(&writer);
  mysql_mutex_unlock(&share->mutex);

  DBUG_RETURN(rc);
}

/*
  Below is an example of how to set up row level locking.
*/
THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                       THR_LOCK_DATA **to,
                                       enum thr_lock_type lock_type)
{
  if (lock_type == TL_WRITE_DELAYED)
    delayed_insert= TRUE;
  else
    delayed_insert= FALSE;

  if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
  {
    /*
      Here is where we get into the guts of a row level lock.
      If TL_UNLOCK is set
      If we are not doing a LOCK TABLE, DELAYED LOCK or DISCARD/IMPORT
      TABLESPACE, then allow multiple writers
    */

    if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
         lock_type <= TL_WRITE) && delayed_insert == FALSE &&
        !thd_in_lock_tables(thd)
        && !thd_tablespace_op(thd))
      lock_type = TL_WRITE_ALLOW_WRITE;

    /*
      In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
      MySQL would use the lock TL_READ_NO_INSERT on t2, and that
      would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
      to t2. Convert the lock to a normal read lock to allow
      concurrent inserts to t2.
    */

    if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
      lock_type = TL_READ;

    lock.type=lock_type;
  }

  *to++= &lock;

  return to;
}

void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
{
  char tmp_real_path[FN_REFLEN];
  DBUG_ENTER("ha_archive::update_create_info");

  ha_archive::info(HA_STATUS_AUTO);
  if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
  {
    create_info->auto_increment_value= stats.auto_increment_value;
  }

  if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
    create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path);

  DBUG_VOID_RETURN;
}

/*
  Hints for optimizer, see ha_tina for more information
*/
int ha_archive::info(uint flag)
{
  DBUG_ENTER("ha_archive::info");

  flush_and_clear_pending_writes();
  stats.deleted= 0;

  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
  /* Costs quite a bit more to get all information */
  if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
  {
    MY_STAT file_stat;  // Stat information for the data file

    (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME));

    if (flag & HA_STATUS_TIME)
      stats.update_time= (ulong) file_stat.st_mtime;
    if (flag & HA_STATUS_CONST)
    {
      stats.max_data_file_length= MAX_FILE_SIZE;
      stats.create_time= (ulong) file_stat.st_ctime;
    }
    if (flag & HA_STATUS_VARIABLE)
    {
      stats.delete_length= 0;
      stats.data_file_length= file_stat.st_size;
      stats.index_file_length=0;
      stats.mean_rec_length= stats.records ?
        ulong(stats.data_file_length / stats.records) : table->s->reclength;
    }
  }

  if (flag & HA_STATUS_AUTO)
  {
    if (init_archive_reader())
      DBUG_RETURN(errno);

    mysql_mutex_lock(&share->mutex);
    azflush(&archive, Z_SYNC_FLUSH);
    mysql_mutex_unlock(&share->mutex);
    stats.auto_increment_value= archive.auto_increment + 1;
  }

  DBUG_RETURN(0);
}


int ha_archive::external_lock(THD *thd, int lock_type)
{
  if (lock_type == F_RDLCK)
  {
    // We are going to read from the table. Flush any pending writes that we
    // may have
    flush_and_clear_pending_writes();
  }
  return 0;
}


void ha_archive::flush_and_clear_pending_writes()
{
  mysql_mutex_lock(&share->mutex);
  if (share->dirty)
  {
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
    DBUG_ASSERT(share->archive_write_open);
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    share->dirty= FALSE;
  }

  /*
    This should be an accurate number now, though bulk and delayed inserts
    can cause the number to be inaccurate.
  */
  stats.records= share->rows_recorded;
  mysql_mutex_unlock(&share->mutex);
}


int ha_archive::extra(enum ha_extra_function operation)
{
  switch (operation) {
  case HA_EXTRA_FLUSH:
    mysql_mutex_lock(&share->mutex);
    share->close_archive_writer();
    mysql_mutex_unlock(&share->mutex);
    break;
  default:
    break;
  }
  return 0;
}

/*
  This method tells us that a bulk insert operation is about to occur. We
  set a flag which will keep write_row from saying that its data is dirty.
  This in turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows, uint flags)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/*
  The other side of start_bulk_insert is end_bulk_insert. Here we turn off
  the bulk insert flag, and set the share dirty so that the next select
  will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  mysql_mutex_lock(&share->mutex);
  if (share->archive_write_open)
    share->dirty= true;
  mysql_mutex_unlock(&share->mutex);
  DBUG_RETURN(0);
}

/*
  We cancel a truncate command. The only way to delete an archive table is
  to drop it. This is done for security reasons. In a later version we will
  enable this by allowing the user to select a different row format.
*/
int ha_archive::truncate()
{
  DBUG_ENTER("ha_archive::truncate");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);
}

/*
  Simple scan of the tables to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded;
  /* Flush any waiting data */
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  mysql_mutex_unlock(&share->mutex);

  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  read_data_header(&archive);
  for (ha_rows cur_count= count; cur_count; cur_count--)
  {
    if ((rc= get_row(&archive, table->record[0])))
      goto error;
  }
  /*
    Now read records that may have been inserted concurrently.
    Acquire share->mutex so tail of the table is not modified by
    concurrent writers.
  */
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded - count;
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;
  mysql_mutex_unlock(&share->mutex);

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
    goto error;

  thd_proc_info(thd, old_proc_info);
  DBUG_RETURN(HA_ADMIN_OK);

error:
  thd_proc_info(thd, old_proc_info);
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_CORRUPT);
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r= (archive_record_buffer*) my_malloc(PSI_INSTRUMENT_ME,
                                              sizeof(archive_record_buffer),
                                              MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, r->length, MYF(MY_WME))))
  {
    my_free(r);
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free(r->buffer);
  my_free(r);
  DBUG_VOID_RETURN;
}

/*
  In archive *any* ALTER should cause the table to be rebuilt;
  no ALTER can be frm-only. After any change to the frm file, archive must
  update the frm image in the ARZ file, and this cannot be done in place;
  it requires the ARZ file to be recreated from scratch.
*/
bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg,
                                            uint table_changes)
{
  return COMPATIBLE_DATA_NO;
}


struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

maria_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "gzip-compresses tables for a low storage footprint",
  PLUGIN_LICENSE_GPL,
  archive_db_init,            /* Plugin Init */
  NULL,                       /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables */
  NULL,                       /* system variables */
  "1.0",                      /* string version */
  MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;