1 /*
2    Copyright (c) 2004, 2014, Oracle and/or its affiliates
3    Copyright (c) 2010, 2014, SkySQL Ab.
4 
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License
7    as published by the Free Software Foundation; version 2 of
8    the License.
9 
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
18 */
19 
20 #ifdef USE_PRAGMA_IMPLEMENTATION
21 #pragma implementation        // gcc: Class implementation
22 #endif
23 
24 #include <my_global.h>
25 #include "sql_class.h"                          // SSV
26 #include "sql_table.h"                          // build_table_filename
27 #include <myisam.h>                             // T_EXTEND
28 
29 #include "ha_archive.h"
30 #include "discover.h"
31 #include <my_dir.h>
32 
33 #include <mysql/plugin.h>
34 
35 /*
36   First, if you want to understand storage engines you should look at
37   ha_example.cc and ha_example.h.
38 
39   This example was written as a test case for a customer who needed
40   a storage engine without indexes that could compress data very well.
41   So, welcome to a completely compressed storage engine. This storage
42   engine only does inserts. No replaces, deletes, or updates. All reads are
43   complete table scans. Compression is done through a combination of packing
44   and making use of the zlib library.
45 
46   We keep a file pointer open for each instance of ha_archive for each read
47   but for writes we keep one open file handle just for that. We flush it
48   only when a read occurs. azio handles compressing lots of records at
49   once much better than compressing lots of little records between writes.
50   It would be possible not to lock on writes, but that would mean we could
51   not handle bulk inserts as well (that is, if someone were trying to read
52   at the same time, since we would want to flush).
53 
54   A "meta" file is kept alongside the data file. This file serves two purposes.
55   The first purpose is to track the number of rows in the table. The second
56   purpose is to determine if the table was closed properly or not. When the
57   meta file is first opened it is marked as dirty. It is opened when the table
58   itself is opened for writing. When the table is closed the new count for rows
59   is written to the meta file and the file is marked as clean. If the meta file
60   is opened and it is marked as dirty, it is assumed that a crash occurred. At
61   this point an error occurs and the user is told to rebuild the file.
62   A rebuild scans the rows and rewrites the meta file. If corruption is found
63   in the data file then the meta file is not repaired.
64 
65   At some point a recovery method for such a drastic case needs to be devised.
66 
67   Locks are row level, and you will get a consistent read.
68 
69   For performance as far as table scans go it is quite fast. I don't have
70   good numbers but locally it has outperformed both InnoDB and MyISAM. For
71   InnoDB the question will be whether the table can fit into the buffer
72   pool. For MyISAM it is a question of how much of the MyISAM file the
73   filesystem caches. With enough free memory MyISAM is faster. It is only
74   when the OS doesn't have enough memory to cache the entire table that
75   archive turns out to be any faster.
76 
77   Examples between MyISAM (packed) and Archive.
78 
79   Table with 76695844 identical rows:
80   29680807 a_archive.ARZ
81   920350317 a.MYD
82 
83 
84   Table with 8991478 rows (all of Slashdot's comments):
85   1922964506 comment_archive.ARZ
86   2944970297 comment_text.MYD
87 
88 
89   TODO:
90    Allow users to set compression level.
91    Allow adjustable block size.
92    Implement versioning, should be easy.
93    Allow for errors, find a way to mark bad rows.
94    Add an optional feature so that rows can be flushed at intervals (which will cause
95      less compression but may speed up ordered searches).
96    Checkpoint the meta file to allow for faster rebuilds.
97    Option to allow for dirty reads; this would lower the sync calls, which would make
98      inserts a lot faster, but would mean highly arbitrary reads.
99 
100     -Brian
101 
102   Archive file format versions:
103   <5.1.5 - v.1
104   5.1.5-5.1.15 - v.2
105   >5.1.15 - v.3
106 */
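
/*
  For orientation (a summary drawn from the code in this file, not a
  normative format description): besides the compressed row data, the azio
  header of the .ARZ file carries the state this engine reads and writes
  elsewhere below, roughly:

    version         - archive format version (1, 2 or 3)
    rows            - number of rows recorded
    auto_increment  - highest auto_increment value written so far
    dirty           - whether the file was closed cleanly
    frm_length/frm  - embedded copy of the table's frm image (v3)
    comment         - the table comment, if any

  See the uses of azopen(), azwrite_frm() and azwrite_comment() below.
*/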
107 
108 /* The file extension */
109 #define ARZ ".ARZ"               // The data file
110 #define ARN ".ARN"               // Files used during an optimize call
111 #define ARM ".ARM"               // Meta file (deprecated)
112 
113 /* 5.0 compatibility */
114 #define META_V1_OFFSET_CHECK_HEADER  0
115 #define META_V1_OFFSET_VERSION       1
116 #define META_V1_OFFSET_ROWS_RECORDED 2
117 #define META_V1_OFFSET_CHECK_POINT   10
118 #define META_V1_OFFSET_CRASHED       18
119 #define META_V1_LENGTH               19
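
/*
  Illustrative layout of the 19-byte version 1 ("5.0") meta file, as implied
  by the offsets above and by read_v1_metafile()/write_v1_metafile() below
  (a sketch for exposition only):

    byte   0       ARCHIVE_CHECK_HEADER
    byte   1       meta file version (1)
    bytes  2..9    rows recorded   (int8store/uint8korr)
    bytes 10..17   check point     (always written as 0)
    byte  18       crashed flag
*/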
120 
121 /*
122   uchar + uchar
123 */
124 #define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
125 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
126 
127 #ifdef HAVE_PSI_INTERFACE
128 extern "C" PSI_file_key arch_key_file_data;
129 #endif
130 
131 /* Static declarations for handlerton */
132 static handler *archive_create_handler(handlerton *hton,
133                                        TABLE_SHARE *table,
134                                        MEM_ROOT *mem_root);
135 int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share);
136 
137 /*
138   Number of rows that will force a bulk insert.
139 */
140 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
141 
142 /*
143   Size of header used for row
144 */
145 #define ARCHIVE_ROW_HEADER_SIZE 4
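
/*
  Illustration only (not engine code): the 4-byte row header written by
  pack_row() simply holds the length of the packed row body that follows,
  stored and read with the usual integer-store macros:

    uchar header[ARCHIVE_ROW_HEADER_SIZE];
    int4store(header, packed_length);       // writer side, see pack_row()
    unsigned int len= uint4korr(header);    // reader side, see unpack_row()
*/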
146 
147 static handler *archive_create_handler(handlerton *hton,
148                                        TABLE_SHARE *table,
149                                        MEM_ROOT *mem_root)
150 {
151   return new (mem_root) ha_archive(hton, table);
152 }
153 
154 #ifdef HAVE_PSI_INTERFACE
155 PSI_mutex_key az_key_mutex_Archive_share_mutex;
156 
157 static PSI_mutex_info all_archive_mutexes[]=
158 {
159   { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
160 };
161 
162 PSI_file_key arch_key_file_metadata, arch_key_file_data;
163 static PSI_file_info all_archive_files[]=
164 {
165     { &arch_key_file_metadata, "metadata", 0},
166     { &arch_key_file_data, "data", 0}
167 };
168 
169 static void init_archive_psi_keys(void)
170 {
171   const char* category= "archive";
172   int count;
173 
174   if (!PSI_server)
175     return;
176 
177   count= array_elements(all_archive_mutexes);
178   mysql_mutex_register(category, all_archive_mutexes, count);
179 
180   count= array_elements(all_archive_files);
181   mysql_file_register(category, all_archive_files, count);
182 }
183 
184 #endif /* HAVE_PSI_INTERFACE */
185 
186 /*
187   Initialize the archive handler.
188 
189   SYNOPSIS
190     archive_db_init()
191     void *
192 
193   RETURN
194     FALSE       OK
195     TRUE        Error
196 */
197 
198 /*
199   We just implement one additional file extension.
200   ARM is here just to properly drop 5.0 tables.
201 */
202 static const char *ha_archive_exts[] = {
203   ARZ,
204   ARM,
205   NullS
206 };
207 
208 int archive_db_init(void *p)
209 {
210   DBUG_ENTER("archive_db_init");
211   handlerton *archive_hton;
212 
213 #ifdef HAVE_PSI_INTERFACE
214   init_archive_psi_keys();
215 #endif
216 
217   archive_hton= (handlerton *)p;
218   archive_hton->state= SHOW_OPTION_YES;
219   archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
220   archive_hton->create= archive_create_handler;
221   archive_hton->flags= HTON_NO_FLAGS;
222   archive_hton->discover_table= archive_discover;
223   archive_hton->tablefile_extensions= ha_archive_exts;
224 
225   DBUG_RETURN(0);
226 }
227 
228 
229 Archive_share::Archive_share()
230 {
231   crashed= false;
232   in_optimize= false;
233   archive_write_open= false;
234   dirty= false;
235   DBUG_PRINT("ha_archive", ("Archive_share: %p",
236                             this));
237   thr_lock_init(&lock);
238   /*
239     We will use this lock for rows.
240   */
241   mysql_mutex_init(az_key_mutex_Archive_share_mutex,
242                    &mutex, MY_MUTEX_INIT_FAST);
243 }
244 
245 
246 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
247   :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
248 {
249   /* Set our original buffer from pre-allocated memory */
250   buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
251 
252   /* The size of the offset value we will use for position() */
253   ref_length= sizeof(my_off_t);
254   archive_reader_open= FALSE;
255 }
256 
257 int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
258 {
259   DBUG_ENTER("archive_discover");
260   DBUG_PRINT("archive_discover", ("db: '%s'  name: '%s'", share->db.str,
261                                   share->table_name.str));
262   azio_stream frm_stream;
263   char az_file[FN_REFLEN];
264   uchar *frm_ptr;
265   MY_STAT file_stat;
266 
267   strxmov(az_file, share->normalized_path.str, ARZ, NullS);
268 
269   if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0))))
270     DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
271 
272   if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
273   {
274     if (errno == EROFS || errno == EACCES)
275       DBUG_RETURN(my_errno= errno);
276     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
277   }
278 
279   if (frm_stream.frm_length == 0)
280     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
281 
282   frm_ptr= (uchar *)my_malloc(sizeof(char) * frm_stream.frm_length,
283                               MYF(MY_THREAD_SPECIFIC | MY_WME));
284   if (!frm_ptr)
285     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
286 
287   if (azread_frm(&frm_stream, frm_ptr))
288     goto ret;
289 
290   azclose(&frm_stream);
291 
292   my_errno= share->init_from_binary_frm_image(thd, 1,
293                                               frm_ptr, frm_stream.frm_length);
294 ret:
295   my_free(frm_ptr);
296   DBUG_RETURN(my_errno);
297 }
298 
299 /**
300   @brief Read version 1 meta file (5.0 compatibility routine).
301 
302   @return Completion status
303     @retval  0 Success
304     @retval !0 Failure
305 */
306 
307 int Archive_share::read_v1_metafile()
308 {
309   char file_name[FN_REFLEN];
310   uchar buf[META_V1_LENGTH];
311   File fd;
312   DBUG_ENTER("Archive_share::read_v1_metafile");
313 
314   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
315   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
316     DBUG_RETURN(-1);
317 
318   if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
319   {
320     mysql_file_close(fd, MYF(0));
321     DBUG_RETURN(-1);
322   }
323 
324   rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
325   crashed= buf[META_V1_OFFSET_CRASHED];
326   mysql_file_close(fd, MYF(0));
327   DBUG_RETURN(0);
328 }
329 
330 
331 /**
332   @brief Write version 1 meta file (5.0 compatibility routine).
333 
334   @return Completion status
335     @retval  0 Success
336     @retval !0 Failure
337 */
338 
339 int Archive_share::write_v1_metafile()
340 {
341   char file_name[FN_REFLEN];
342   uchar buf[META_V1_LENGTH];
343   File fd;
344   DBUG_ENTER("Archive_share::write_v1_metafile");
345 
346   buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
347   buf[META_V1_OFFSET_VERSION]= 1;
348   int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
349   int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
350   buf[META_V1_OFFSET_CRASHED]= crashed;
351 
352   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
353   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
354     DBUG_RETURN(-1);
355 
356   if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
357   {
358     mysql_file_close(fd, MYF(0));
359     DBUG_RETURN(-1);
360   }
361 
362   mysql_file_close(fd, MYF(0));
363   DBUG_RETURN(0);
364 }
365 
366 /**
367   @brief Pack version 1 row (5.0 compatibility routine).
368 
369   @param[in]  record  the record to pack
370 
371   @return Length of packed row
372 */
373 
374 unsigned int ha_archive::pack_row_v1(uchar *record)
375 {
376   uint *blob, *end;
377   uchar *pos;
378   DBUG_ENTER("pack_row_v1");
379   memcpy(record_buffer->buffer, record, table->s->reclength);
380 
381   /*
382     The ends of VARCHAR fields are filled with garbage, so here
383     we explicitly set the ends of the VARCHAR fields to zeroes.
384   */
385 
386   for (Field** field= table->field; (*field) ; field++)
387   {
388     Field *fld= *field;
389     if (fld->type() == MYSQL_TYPE_VARCHAR)
390     {
391       if (!(fld->is_real_null(record - table->record[0])))
392       {
393         ptrdiff_t  start= (fld->ptr - table->record[0]);
394         Field_varstring *const field_var= (Field_varstring *)fld;
395         uint offset= field_var->data_length() + field_var->length_size();
396         memset(record_buffer->buffer + start + offset, 0,
397                fld->field_length - offset + 1);
398       }
399     }
400   }
401   pos= record_buffer->buffer + table->s->reclength;
402   for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
403        blob != end; blob++)
404   {
405     uint32 length= ((Field_blob *) table->field[*blob])->get_length();
406     if (length)
407     {
408       uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr();
409       memcpy(pos, data_ptr, length);
410       pos+= length;
411     }
412   }
413   DBUG_RETURN((int)(pos - record_buffer->buffer));
414 }
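
/*
  A sketch of the version 1 row image produced above (derived from the code,
  not from a separate specification):

    [ table->s->reclength bytes: the record image, VARCHAR tails zeroed ]
    [ blob data for each non-empty blob, in table order                 ]

  Zeroing the VARCHAR tails is presumably there so that the garbage bytes
  after the used portion do not end up in the compressed stream.
*/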
415 
416 /*
417   This method reads the header of a datafile and returns whether or not it was successful.
418 */
419 int ha_archive::read_data_header(azio_stream *file_to_read)
420 {
421   int error;
422   unsigned long ret;
423   uchar data_buffer[DATA_BUFFER_SIZE];
424   DBUG_ENTER("ha_archive::read_data_header");
425 
426   if (azrewind(file_to_read) == -1)
427     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
428 
429   if (file_to_read->version >= 3)
430     DBUG_RETURN(0);
431   /* Everything below this is just legacy for format versions 2 and below */
432 
433   DBUG_PRINT("ha_archive", ("Reading legacy data header"));
434 
435   ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
436 
437   if (ret != DATA_BUFFER_SIZE)
438   {
439     DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
440                               DATA_BUFFER_SIZE, ret));
441     DBUG_RETURN(1);
442   }
443 
444   if (error)
445   {
446     DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
447     DBUG_RETURN(1);
448   }
449 
450   DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
451   DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
452 
453   if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
454       (data_buffer[1] == 1 || data_buffer[1] == 2))
455     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
456 
457   DBUG_RETURN(0);
458 }
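
/*
  For reference, a sketch of the legacy (version 1 and 2) per-file data
  header consumed above; the layout is inferred from the checks on
  data_buffer[0] and data_buffer[1]:

    byte 0   ARCHIVE_CHECK_HEADER    - corruption check byte
    byte 1   format version (1 or 2)

  Version 3 files carry no such data header, which is why the function
  returns early for them.
*/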
459 
460 
461 /*
462   We create the shared memory space that we will use for the open table.
463   No matter what, we try to get or create a share. This is so that a repair
464   table operation can occur.
465 
466   See ha_example.cc for a longer description.
467 */
468 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
469 {
470   Archive_share *tmp_share;
471 
472   DBUG_ENTER("ha_archive::get_share");
473 
474   lock_shared_ha_data();
475   if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
476   {
477     azio_stream archive_tmp;
478 
479     tmp_share= new Archive_share;
480 
481     if (!tmp_share)
482     {
483       *rc= HA_ERR_OUT_OF_MEM;
484       goto err;
485     }
486     DBUG_PRINT("ha_archive", ("new Archive_share: %p",
487                               tmp_share));
488 
489     fn_format(tmp_share->data_file_name, table_name, "",
490               ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
491     strmov(tmp_share->table_name, table_name);
492     DBUG_PRINT("ha_archive", ("Data File %s",
493                         tmp_share->data_file_name));
494 
495     /*
496       We read the meta file, but do not mark it dirty. Since we are not
497       doing a write we won't mark it dirty (and we won't open it for
498       anything but reading... open it for write and we will generate null
499       compression writes).
500     */
501     if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
502     {
503       delete tmp_share;
504       *rc= my_errno ? my_errno : HA_ERR_CRASHED;
505       tmp_share= NULL;
506       goto err;
507     }
508     stats.auto_increment_value= archive_tmp.auto_increment + 1;
509     tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
510     tmp_share->crashed= archive_tmp.dirty;
511     share= tmp_share;
512     if (archive_tmp.version == 1)
513       share->read_v1_metafile();
514     else if (frm_compare(&archive_tmp))
515       *rc= HA_ERR_TABLE_DEF_CHANGED;
516 
517     azclose(&archive_tmp);
518 
519     set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
520   }
521   if (tmp_share->crashed)
522     *rc= HA_ERR_CRASHED_ON_USAGE;
523 err:
524   unlock_shared_ha_data();
525 
526   DBUG_ASSERT(tmp_share || *rc);
527 
528   DBUG_RETURN(tmp_share);
529 }
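
/*
  get_share() above follows the usual lazily-created Handler_share pattern;
  a minimal sketch of that pattern (illustrative, error handling elided):

    lock_shared_ha_data();
    share= static_cast<Archive_share*>(get_ha_share_ptr());
    if (!share)
    {
      share= new Archive_share;                // first opener creates it
      ... read row count / crashed flag from the .ARZ header ...
      set_ha_share_ptr(share);                 // publish for later openers
    }
    unlock_shared_ha_data();
*/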
530 
531 
532 int Archive_share::init_archive_writer()
533 {
534   DBUG_ENTER("Archive_share::init_archive_writer");
535   /*
536     It is expensive to open and close the data files, and since you can't have
537     a gzip file that can be both read and written, we keep a writer open
538     that is shared among all open tables.
539   */
540   if (!(azopen(&archive_write, data_file_name,
541                O_RDWR|O_BINARY)))
542   {
543     DBUG_PRINT("ha_archive", ("Could not open archive write file"));
544     crashed= true;
545     DBUG_RETURN(1);
546   }
547   archive_write_open= true;
548 
549   DBUG_RETURN(0);
550 }
551 
552 
553 void Archive_share::close_archive_writer()
554 {
555   mysql_mutex_assert_owner(&mutex);
556   if (archive_write_open)
557   {
558     if (archive_write.version == 1)
559       (void) write_v1_metafile();
560     azclose(&archive_write);
561     archive_write_open= false;
562     dirty= false;
563   }
564 }
565 
566 
567 /*
568   No locks are required because it is associated with just one handler instance
569 */
570 int ha_archive::init_archive_reader()
571 {
572   DBUG_ENTER("ha_archive::init_archive_reader");
573   /*
574     It is expensive to open and close the data files and since you can't have
575     a gzip file that can be both read and written we keep a writer open
576     that is shared amoung all open tables, but have one reader open for
577     each handler instance.
578   */
579   if (!archive_reader_open)
580   {
581     if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
582     {
583       DBUG_PRINT("ha_archive", ("Could not open archive read file"));
584       share->crashed= TRUE;
585       DBUG_RETURN(1);
586     }
587     archive_reader_open= TRUE;
588   }
589 
590   DBUG_RETURN(0);
591 }
592 
593 
594 /*
595   When opening a file we:
596   Create/get our shared structure.
597   Init our lock.
598   Open the file we will read from.
599 */
600 int ha_archive::open(const char *name, int mode, uint open_options)
601 {
602   int rc= 0;
603   DBUG_ENTER("ha_archive::open");
604 
605   DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
606                       (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
607   share= get_share(name, &rc);
608   if (!share)
609     DBUG_RETURN(rc);
610 
611   /* Allow open on crashed table in repair mode only. */
612   switch (rc)
613   {
614   case 0:
615     break;
616   case HA_ERR_TABLE_DEF_CHANGED:
617   case HA_ERR_CRASHED_ON_USAGE:
618     if (open_options & HA_OPEN_FOR_REPAIR)
619     {
620       rc= 0;
621       break;
622     }
623     /* fall through */
624   default:
625     DBUG_RETURN(rc);
626   }
627 
628   DBUG_ASSERT(share);
629 
630   record_buffer= create_record_buffer(table->s->reclength +
631                                       ARCHIVE_ROW_HEADER_SIZE);
632 
633   if (!record_buffer)
634     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
635 
636   thr_lock_data_init(&share->lock, &lock, NULL);
637 
638   DBUG_PRINT("ha_archive", ("archive table was crashed %s",
639                       rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
640   if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
641   {
642     DBUG_RETURN(0);
643   }
644 
645   DBUG_RETURN(rc);
646 }
647 
648 
649 /*
650   Closes the file.
651 
652   SYNOPSIS
653     close();
654 
655   IMPLEMENTATION:
656 
657   We first close this storage engines file handle to the archive and
658   then remove our reference count to the table (and possibly free it
659   as well).
660 
661   RETURN
662     0  ok
663     1  Error
664 */
665 
666 int ha_archive::close(void)
667 {
668   int rc= 0;
669   DBUG_ENTER("ha_archive::close");
670 
671   destroy_record_buffer(record_buffer);
672 
673   /* First close stream */
674   if (archive_reader_open)
675   {
676     if (azclose(&archive))
677       rc= 1;
678   }
679 
680   DBUG_RETURN(rc);
681 }
682 
683 
684 /**
685   Copy a frm blob between streams.
686 
687   @param  src   The source stream.
688   @param  dst   The destination stream.
689 
690   @return Zero on success, non-zero otherwise.
691 */
692 
693 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
694 {
695   int rc= 0;
696   uchar *frm_ptr;
697 
698   if (!src->frm_length)
699   {
700     size_t frm_len;
701     if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len))
702     {
703       azwrite_frm(dst, frm_ptr, frm_len);
704       table_share->free_frm_image(frm_ptr);
705     }
706     return 0;
707   }
708 
709   if (!(frm_ptr= (uchar *) my_malloc(src->frm_length,
710                                      MYF(MY_THREAD_SPECIFIC | MY_WME))))
711     return HA_ERR_OUT_OF_MEM;
712 
713   /* Write file offset is set to the end of the file. */
714   if (azread_frm(src, frm_ptr) ||
715       azwrite_frm(dst, frm_ptr, src->frm_length))
716     rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
717 
718   my_free(frm_ptr);
719 
720   return rc;
721 }
722 
723 
724 /**
725   Compare frm blob with the on-disk frm file
726 
727   @param  s     The azio stream.
728 
729   @return Zero if equal, non-zero otherwise.
730 */
731 
732 int ha_archive::frm_compare(azio_stream *s)
733 {
734   if (!s->frmver_length)
735     return 0; // Old pre-10.0 archive table. Never rediscover.
736 
737   LEX_CUSTRING *ver= &table->s->tabledef_version;
738   return ver->length != s->frmver_length ||
739          memcmp(ver->str,  s->frmver, ver->length);
740 }
741 
742 
743 /*
744   We create our data file here. The format is pretty simple.
745   You can read about the format of the data file above.
746   Unlike other storage engines we do not "pack" our data. Since we
747   are about to do a general compression, packing would just be a waste of
748   CPU time. If the table has blobs they are written after the row in the order
749   of creation.
750 */
751 
752 int ha_archive::create(const char *name, TABLE *table_arg,
753                        HA_CREATE_INFO *create_info)
754 {
755   char name_buff[FN_REFLEN];
756   char linkname[FN_REFLEN];
757   int error;
758   azio_stream create_stream;            /* Archive file we are working with */
759   const uchar *frm_ptr;
760   size_t frm_len;
761 
762   DBUG_ENTER("ha_archive::create");
763 
764   stats.auto_increment_value= create_info->auto_increment_value;
765 
766   for (uint key= 0; key < table_arg->s->keys; key++)
767   {
768     KEY *pos= table_arg->key_info+key;
769     KEY_PART_INFO *key_part=     pos->key_part;
770     KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
771 
772     for (; key_part != key_part_end; key_part++)
773     {
774       Field *field= key_part->field;
775 
776       if (!(field->flags & AUTO_INCREMENT_FLAG))
777       {
778         error= HA_WRONG_CREATE_OPTION;
779         DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
780         goto error;
781       }
782     }
783   }
784 
785   /*
786     We reuse name_buff since it is available.
787   */
788 #ifdef HAVE_READLINK
789   if (my_use_symdir &&
790       create_info->data_file_name &&
791       create_info->data_file_name[0] != '#')
792   {
793     DBUG_PRINT("ha_archive", ("archive will create stream file %s",
794                         create_info->data_file_name));
795 
796     fn_format(name_buff, create_info->data_file_name, "", ARZ,
797               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
798     fn_format(linkname, name, "", ARZ,
799               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
800   }
801   else
802 #endif /* HAVE_READLINK */
803   {
804     if (create_info->data_file_name)
805       my_error(WARN_OPTION_IGNORED, MYF(ME_JUST_WARNING), "DATA DIRECTORY");
806 
807     fn_format(name_buff, name, "", ARZ,
808               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
809     linkname[0]= 0;
810   }
811 
812   /* Archive engine never uses INDEX DIRECTORY. */
813   if (create_info->index_file_name)
814       my_error(WARN_OPTION_IGNORED, MYF(ME_JUST_WARNING), "INDEX DIRECTORY");
815 
816   /*
817     There is a chance that the file was "discovered". In this case
818     just use whatever file is there.
819   */
820   my_errno= 0;
821   if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
822   {
823     error= errno;
824     goto error2;
825   }
826 
827   if (linkname[0])
828     my_symlink(name_buff, linkname, MYF(0));
829 
830   /*
831     Here is where we open up the frm and pass it to archive to store
832   */
833   if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len))
834   {
835     azwrite_frm(&create_stream, frm_ptr, frm_len);
836     table_arg->s->free_frm_image(frm_ptr);
837   }
838 
839   if (create_info->comment.str)
840     azwrite_comment(&create_stream, create_info->comment.str,
841                     create_info->comment.length);
842 
843   /*
844     Yes you need to do this, because the starting value
845     for the autoincrement may not be zero.
846   */
847   create_stream.auto_increment= stats.auto_increment_value ?
848                                   stats.auto_increment_value - 1 : 0;
849   if (azclose(&create_stream))
850   {
851     error= errno;
852     goto error2;
853   }
854 
855   DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
856   DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
857 
858 
859   DBUG_RETURN(0);
860 
861 error2:
862   delete_table(name);
863 error:
864   /* Return error number, if we got one */
865   DBUG_RETURN(error ? error : -1);
866 }
867 
868 /*
869   This is where the actual row is written out.
870 */
871 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
872 {
873   my_off_t written;
874   unsigned int r_pack_length;
875   DBUG_ENTER("ha_archive::real_write_row");
876 
877   /* We pack the row for writing */
878   r_pack_length= pack_row(buf, writer);
879 
880   written= azwrite(writer, record_buffer->buffer, r_pack_length);
881   if (written != r_pack_length)
882   {
883     DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
884                                               (uint32) written,
885                                               (uint32)r_pack_length));
886     DBUG_RETURN(-1);
887   }
888 
889   if (!delayed_insert || !bulk_insert)
890     share->dirty= TRUE;
891 
892   DBUG_RETURN(0);
893 }
894 
895 
896 /*
897   Calculate max length needed for row. This includes
898   the bytes required for the length in the header.
899 */
900 
901 uint32 ha_archive::max_row_length(const uchar *record)
902 {
903   uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
904   length+= ARCHIVE_ROW_HEADER_SIZE;
905   my_ptrdiff_t const rec_offset= record - table->record[0];
906 
907   uint *ptr, *end;
908   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
909        ptr != end ;
910        ptr++)
911   {
912     if (!table->field[*ptr]->is_null(rec_offset))
913       length += 2 + ((Field_blob*)table->field[*ptr])->get_length(rec_offset);
914   }
915 
916   return length;
917 }
918 
919 
920 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
921 {
922   uchar *ptr;
923   my_ptrdiff_t const rec_offset= record - table->record[0];
924   DBUG_ENTER("ha_archive::pack_row");
925 
926   if (fix_rec_buff(max_row_length(record)))
927     DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
928 
929   if (writer->version == 1)
930     DBUG_RETURN(pack_row_v1(record));
931 
932   /* Copy null bits */
933   memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
934          record, table->s->null_bytes);
935   ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
936 
937   for (Field **field=table->field ; *field ; field++)
938   {
939     if (!((*field)->is_null(rec_offset)))
940       ptr= (*field)->pack(ptr, record + (*field)->offset(record));
941   }
942 
943   int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
944                                          ARCHIVE_ROW_HEADER_SIZE));
945   DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
946                            (ptr - record_buffer->buffer -
947                              ARCHIVE_ROW_HEADER_SIZE)));
948 
949   DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
950 }
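
/*
  A sketch of the version 3 packed row image produced above (the blob detail
  is an assumption about Field_blob::pack(), which stores a length prefix
  followed by the blob data):

    [ 4-byte packed length | null bytes | packed non-null fields, in order ]

  The stored length excludes the header itself; unpack_row() reads it back
  with uint4korr() and unpacks each non-null field in the same order.
*/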
951 
952 
953 /*
954   Look at ha_archive::open() for an explanation of the row format.
955   Here we just write out the row.
956 
957   Wondering about start_bulk_insert()? We don't implement it for
958   archive since it optimizes for lots of writes. The only saving
959   from implementing start_bulk_insert() is that we could skip
960   setting dirty to true each time.
961 */
962 int ha_archive::write_row(uchar *buf)
963 {
964   int rc;
965   uchar *read_buf= NULL;
966   ulonglong temp_auto;
967   uchar *record=  table->record[0];
968   DBUG_ENTER("ha_archive::write_row");
969 
970   if (share->crashed)
971     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
972 
973   mysql_mutex_lock(&share->mutex);
974 
975   if (!share->archive_write_open && share->init_archive_writer())
976   {
977     rc= errno;
978     goto error;
979   }
980 
981   if (table->next_number_field && record == table->record[0])
982   {
983     KEY *mkey= &table->s->key_info[0]; // We only support one key right now
984     update_auto_increment();
985     temp_auto= table->next_number_field->val_int();
986 
987     /*
988       We don't support decrementing auto_increment. It makes the performance
989       just cry.
990     */
991     if (temp_auto <= share->archive_write.auto_increment &&
992         mkey->flags & HA_NOSAME)
993     {
994       rc= HA_ERR_FOUND_DUPP_KEY;
995       goto error;
996     }
997 #ifdef DEAD_CODE
998     /*
999       Bad news, this will cause a search for the unique value which is very
1000       expensive since we will have to do a table scan which will lock up
1001       all other writers during this period. This could perhaps be optimized
1002       in the future.
1003     */
1004     {
1005       /*
1006         First we create a buffer that we can use for reading rows, and can pass
1007         to get_row().
1008       */
1009       if (!(read_buf= (uchar*) my_malloc(table->s->reclength,
1010                                          MYF(MY_THREAD_SPECIFIC | MY_WME))))
1011       {
1012         rc= HA_ERR_OUT_OF_MEM;
1013         goto error;
1014       }
1015        /*
1016          All of the buffer must be written out or we won't see all of the
1017          data
1018        */
1019       azflush(&(share->archive_write), Z_SYNC_FLUSH);
1020       /*
1021         Set the position of the local read thread to the beginning position.
1022       */
1023       if (read_data_header(&archive))
1024       {
1025         rc= HA_ERR_CRASHED_ON_USAGE;
1026         goto error;
1027       }
1028 
1029       Field *mfield= table->next_number_field;
1030 
1031       while (!(get_row(&archive, read_buf)))
1032       {
1033         if (!memcmp(read_buf + mfield->offset(record),
1034                     table->next_number_field->ptr,
1035                     mfield->max_display_length()))
1036         {
1037           rc= HA_ERR_FOUND_DUPP_KEY;
1038           goto error;
1039         }
1040       }
1041     }
1042 #endif
1043     else
1044     {
1045       if (temp_auto > share->archive_write.auto_increment)
1046         stats.auto_increment_value=
1047           (share->archive_write.auto_increment= temp_auto) + 1;
1048     }
1049   }
1050 
1051   /*
1052     Notice that the global auto_increment has been increased.
1053     In case of a failed row write, we will never try to reuse the value.
1054   */
1055   share->rows_recorded++;
1056   rc= real_write_row(buf,  &(share->archive_write));
1057 error:
1058   mysql_mutex_unlock(&share->mutex);
1059   my_free(read_buf);
1060   DBUG_RETURN(rc);
1061 }
1062 
1063 
1064 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1065                                     ulonglong nb_desired_values,
1066                                     ulonglong *first_value,
1067                                     ulonglong *nb_reserved_values)
1068 {
1069   *nb_reserved_values= ULONGLONG_MAX;
1070   *first_value= share->archive_write.auto_increment + 1;
1071 }
1072 
1073 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1074 int ha_archive::index_init(uint keynr, bool sorted)
1075 {
1076   DBUG_ENTER("ha_archive::index_init");
1077   active_index= keynr;
1078   DBUG_RETURN(0);
1079 }
1080 
1081 
1082 /*
1083   No indexes, so if we get a request for an index search (since we tell
1084   the optimizer that we have unique indexes), we scan.
1085 */
1086 int ha_archive::index_read(uchar *buf, const uchar *key,
1087                              uint key_len, enum ha_rkey_function find_flag)
1088 {
1089   int rc;
1090   DBUG_ENTER("ha_archive::index_read");
1091   rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1092   DBUG_RETURN(rc);
1093 }
1094 
1095 
1096 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1097                                  uint key_len, enum ha_rkey_function find_flag)
1098 {
1099   int rc;
1100   bool found= 0;
1101   KEY *mkey= &table->s->key_info[index];
1102   current_k_offset= mkey->key_part->offset;
1103   current_key= key;
1104   current_key_len= key_len;
1105 
1106 
1107   DBUG_ENTER("ha_archive::index_read_idx");
1108 
1109   rc= rnd_init(TRUE);
1110 
1111   if (rc)
1112     goto error;
1113 
1114   while (!(get_row(&archive, buf)))
1115   {
1116     if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1117     {
1118       found= 1;
1119       break;
1120     }
1121   }
1122 
1123   if (found)
1124   {
1125     /* notify handler that a record has been found */
1126     table->status= 0;
1127     DBUG_RETURN(0);
1128   }
1129 
1130 error:
1131   DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1132 }
1133 
1134 
1135 int ha_archive::index_next(uchar * buf)
1136 {
1137   bool found= 0;
1138   int rc;
1139 
1140   DBUG_ENTER("ha_archive::index_next");
1141 
1142   while (!(get_row(&archive, buf)))
1143   {
1144     if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1145     {
1146       found= 1;
1147       break;
1148     }
1149   }
1150 
1151   rc= found ? 0 : HA_ERR_END_OF_FILE;
1152   DBUG_RETURN(rc);
1153 }
1154 
1155 /*
1156   All calls that need to scan the table start with this method. If we are told
1157   that it is a table scan we rewind the file to the beginning, otherwise
1158   we assume the position will be set.
1159 */
1160 
1161 int ha_archive::rnd_init(bool scan)
1162 {
1163   DBUG_ENTER("ha_archive::rnd_init");
1164 
1165   if (share->crashed)
1166       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1167 
1168   if (init_archive_reader())
1169       DBUG_RETURN(errno);
1170 
1171   /* We rewind the file so that we can read from the beginning if scan */
1172   if (scan)
1173   {
1174     scan_rows= stats.records;
1175     DBUG_PRINT("info", ("archive will retrieve %llu rows",
1176                         (unsigned long long) scan_rows));
1177 
1178     if (read_data_header(&archive))
1179       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1180   }
1181 
1182   DBUG_RETURN(0);
1183 }
1184 
1185 
1186 /*
1187   This is the method that is used to read a row. It assumes that the row is
1188   positioned where you want it.
1189 */
1190 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1191 {
1192   int rc;
1193   DBUG_ENTER("ha_archive::get_row");
1194   DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1195                             (uchar)file_to_read->version,
1196                             ARCHIVE_VERSION));
1197   if (file_to_read->version == ARCHIVE_VERSION)
1198     rc= get_row_version3(file_to_read, buf);
1199   else
1200     rc= get_row_version2(file_to_read, buf);
1201 
1202   DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1203 
1204   DBUG_RETURN(rc);
1205 }
1206 
1207 /* Reallocate buffer if needed */
1208 bool ha_archive::fix_rec_buff(unsigned int length)
1209 {
1210   DBUG_ENTER("ha_archive::fix_rec_buff");
1211   DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1212                             length, record_buffer->length));
1213   DBUG_ASSERT(record_buffer->buffer);
1214 
1215   if (length > record_buffer->length)
1216   {
1217     uchar *newptr;
1218     if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
1219                                     length,
1220 				    MYF(MY_ALLOW_ZERO_PTR))))
1221       DBUG_RETURN(1);
1222     record_buffer->buffer= newptr;
1223     record_buffer->length= length;
1224   }
1225 
1226   DBUG_ASSERT(length <= record_buffer->length);
1227 
1228   DBUG_RETURN(0);
1229 }
1230 
1231 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1232 {
1233   DBUG_ENTER("ha_archive::unpack_row");
1234 
1235   unsigned int read;
1236   int error;
1237   uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
1238   unsigned int row_len;
1239 
1240   /* First we grab the length stored */
1241   read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1242 
1243   if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
1244     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1245 
1246   /* If we read nothing we are at the end of the file */
1247   if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1248     DBUG_RETURN(HA_ERR_END_OF_FILE);
1249 
1250   row_len=  uint4korr(size_buffer);
1251   DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1252                            (unsigned int)table->s->reclength));
1253 
1254   if (fix_rec_buff(row_len))
1255   {
1256     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1257   }
1258   DBUG_ASSERT(row_len <= record_buffer->length);
1259 
1260   read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1261 
1262   if (read != row_len || error)
1263   {
1264     DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD);
1265   }
1266 
1267   /* Copy null bits */
1268   const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len;
1269   memcpy(record, ptr, table->s->null_bytes);
1270   ptr+= table->s->null_bytes;
1271   if (ptr > end)
1272     DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1273   for (Field **field=table->field ; *field ; field++)
1274   {
1275     if (!((*field)->is_null_in_record(record)))
1276     {
1277       if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]),
1278                                   ptr, end)))
1279         DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1280     }
1281   }
1282   if (ptr != end)
1283     DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1284   DBUG_RETURN(0);
1285 }
1286 
1287 
1288 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1289 {
1290   DBUG_ENTER("ha_archive::get_row_version3");
1291 
1292   int returnable= unpack_row(file_to_read, buf);
1293 
1294   DBUG_RETURN(returnable);
1295 }
1296 
1297 
1298 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1299 {
1300   unsigned int read;
1301   int error;
1302   uint *ptr, *end;
1303   char *last;
1304   size_t total_blob_length= 0;
1305   MY_BITMAP *read_set= table->read_set;
1306   DBUG_ENTER("ha_archive::get_row_version2");
1307 
1308   read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1309 
1310   /* If we read nothing we are at the end of the file */
1311   if (read == 0)
1312     DBUG_RETURN(HA_ERR_END_OF_FILE);
1313 
1314   if (read != table->s->reclength)
1315   {
1316     DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1317                                                 read,
1318                                                 (unsigned int)table->s->reclength));
1319     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1320   }
1321 
1322   if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1323     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1324 
1325   /*
1326     If the record is the wrong size, the file is probably damaged, unless
1327     we are dealing with a delayed insert or a bulk insert.
1328   */
1329   if ((ulong) read != table->s->reclength)
1330     DBUG_RETURN(HA_ERR_END_OF_FILE);
1331 
1332   /* Calculate blob length, we use this for our buffer */
1333   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1334        ptr != end ;
1335        ptr++)
1336   {
1337     if (bitmap_is_set(read_set,
1338                       (((Field_blob*) table->field[*ptr])->field_index)))
1339         total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1340   }
1341 
1342   /* Adjust our row buffer if we need be */
1343   buffer.alloc(total_blob_length);
1344   last= (char *)buffer.ptr();
1345 
1346   /* Loop through our blobs and read them */
1347   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1348        ptr != end ;
1349        ptr++)
1350   {
1351     size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1352     if (size)
1353     {
1354       if (bitmap_is_set(read_set,
1355                         ((Field_blob*) table->field[*ptr])->field_index))
1356       {
1357         read= azread(file_to_read, last, size, &error);
1358 
1359         if (error)
1360           DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1361 
1362         if ((size_t) read != size)
1363           DBUG_RETURN(HA_ERR_END_OF_FILE);
1364         ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last);
1365         last += size;
1366       }
1367       else
1368       {
1369         (void)azseek(file_to_read, size, SEEK_CUR);
1370       }
1371     }
1372   }
1373   DBUG_RETURN(0);
1374 }
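
/*
  For reference, the version 2 on-disk row image read above (a sketch
  derived from this function):

    [ table->s->reclength bytes: the fixed-size record image       ]
    [ blob data for each blob with non-zero length, in field order ]

  Blobs that are not in the read_set are skipped with azseek() instead of
  being read into the row buffer.
*/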
1375 
1376 
1377 /*
1378   Called during ORDER BY. The position is either the next row in sequence
1379   or the one set by a preceding call to ha_archive::rnd_pos().
1380 */
1381 
1382 int ha_archive::rnd_next(uchar *buf)
1383 {
1384   int rc;
1385   DBUG_ENTER("ha_archive::rnd_next");
1386 
1387   if (share->crashed)
1388       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1389 
1390   if (!scan_rows)
1391   {
1392     rc= HA_ERR_END_OF_FILE;
1393     goto end;
1394   }
1395   scan_rows--;
1396 
1397   current_position= aztell(&archive);
1398   rc= get_row(&archive, buf);
1399 
1400 end:
1401   DBUG_RETURN(rc);
1402 }
1403 
1404 
1405 /*
1406   Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1407   each call to ha_archive::rnd_next() if an ordering of the rows is
1408   needed.
1409 */
1410 
1411 void ha_archive::position(const uchar *record)
1412 {
1413   DBUG_ENTER("ha_archive::position");
1414   my_store_ptr(ref, ref_length, current_position);
1415   DBUG_VOID_RETURN;
1416 }
1417 
1418 
1419 /*
1420   This is called after a table scan for each row if the results of the
1421   scan need to be ordered. It will take *pos and use it to move the
1422   cursor in the file so that the next row that is called is the
1423   correctly ordered row.
1424 */
1425 
1426 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1427 {
1428   int rc;
1429   DBUG_ENTER("ha_archive::rnd_pos");
1430   current_position= (my_off_t)my_get_ptr(pos, ref_length);
1431   if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1432   {
1433     rc= HA_ERR_CRASHED_ON_USAGE;
1434     goto end;
1435   }
1436   rc= get_row(&archive, buf);
1437 end:
1438   DBUG_RETURN(rc);
1439 }
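
/*
  The position()/rnd_pos() round trip, spelled out (illustrative only):

    current_position= aztell(&archive);              // rnd_next() remembers
    my_store_ptr(ref, ref_length, current_position); // position() saves it
    ...
    my_off_t pos= (my_off_t) my_get_ptr(saved_ref, ref_length);
    azseek(&archive, pos, SEEK_SET);                 // rnd_pos() seeks back
    get_row(&archive, buf);                          // and re-reads the row

  ref_length was set to sizeof(my_off_t) in the constructor for this reason.
*/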
1440 
1441 
1442 /**
1443   @brief Check for upgrade
1444 
1445   @param[in]  check_opt  check options
1446 
1447   @return Completion status
1448     @retval HA_ADMIN_OK            No upgrade required
1449     @retval HA_ADMIN_CORRUPT       Cannot read meta-data
1450     @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
1451 */
1452 
1453 int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1454 {
1455   DBUG_ENTER("ha_archive::check_for_upgrade");
1456   if (init_archive_reader())
1457     DBUG_RETURN(HA_ADMIN_CORRUPT);
1458   if (archive.version < ARCHIVE_VERSION)
1459     DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1460   DBUG_RETURN(HA_ADMIN_OK);
1461 }
1462 
1463 
1464 /*
1465   This method repairs the meta file. It does this by walking the datafile and
1466   rewriting the meta file. If EXTENDED repair is requested, we attempt to
1467   recover as much data as possible.
1468 */
1469 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1470 {
1471   DBUG_ENTER("ha_archive::repair");
1472   int rc= optimize(thd, check_opt);
1473 
1474   if (rc)
1475     DBUG_RETURN(HA_ADMIN_CORRUPT);
1476 
1477   share->crashed= FALSE;
1478   DBUG_RETURN(0);
1479 }
1480 
1481 /*
1482   The table can become fragmented if data was inserted, read, and then
1483   inserted again. What we do is open up the file and recompress it completely.
1484 */
1485 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1486 {
1487   int rc= 0;
1488   azio_stream writer;
1489   char writer_filename[FN_REFLEN];
1490   DBUG_ENTER("ha_archive::optimize");
1491 
1492   mysql_mutex_lock(&share->mutex);
1493 
1494   if (init_archive_reader())
1495   {
1496     mysql_mutex_unlock(&share->mutex);
1497     DBUG_RETURN(errno);
1498   }
1499 
1500   // now we close both our writer and our reader for the rename
1501   if (share->archive_write_open)
1502   {
1503     azclose(&(share->archive_write));
1504     share->archive_write_open= FALSE;
1505   }
1506 
1507   /* Lets create a file to contain the new data */
1508   fn_format(writer_filename, share->table_name, "", ARN,
1509             MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1510 
1511   if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1512   {
1513     mysql_mutex_unlock(&share->mutex);
1514     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1515   }
1516 
1517   /*
1518     Transfer the embedded FRM so that the file can be discoverable.
1519     Write file offset is set to the end of the file.
1520   */
1521   if ((rc= frm_copy(&archive, &writer)))
1522     goto error;
1523 
1524   /*
1525     An extended rebuild is a lot more effort. We open up each row and re-record it.
1526     Any dead rows are removed (aka rows that may have been partially recorded).
1527 
1528     As of Archive format 3, this is the only type that is performed, before this
1529     version it was just done on T_EXTEND
1530   */
1531   if (1)
1532   {
1533     DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1534 
1535     /*
1536       Now we will rewind the archive file so that we are positioned at the
1537       start of the file.
1538     */
1539     rc= read_data_header(&archive);
1540 
1541     /*
1542       On success of writing out the new header, we now fetch each row and
1543       insert it into the new archive file.
1544     */
1545     if (!rc)
1546     {
1547       share->rows_recorded= 0;
1548       stats.auto_increment_value= 1;
1549       share->archive_write.auto_increment= 0;
1550       MY_BITMAP *org_bitmap= tmp_use_all_columns(table, &table->read_set);
1551 
1552       while (!(rc= get_row(&archive, table->record[0])))
1553       {
1554         real_write_row(table->record[0], &writer);
1555         /*
1556           Long term it should be possible to optimize this so that
1557           it is not called on each row.
1558         */
1559         if (table->found_next_number_field)
1560         {
1561           Field *field= table->found_next_number_field;
1562           ulonglong auto_value=
1563             (ulonglong) field->val_int(table->record[0] +
1564                                        field->offset(table->record[0]));
1565           if (share->archive_write.auto_increment < auto_value)
1566             stats.auto_increment_value=
1567               (share->archive_write.auto_increment= auto_value) + 1;
1568         }
1569       }
1570 
1571       tmp_restore_column_map(&table->read_set, org_bitmap);
1572       share->rows_recorded= (ha_rows)writer.rows;
1573     }
1574 
1575     DBUG_PRINT("info", ("recovered %llu archive rows",
1576                         (unsigned long long)share->rows_recorded));
1577 
1578     DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1579                         (unsigned long long)share->rows_recorded));
1580 
1581     /*
1582       If REPAIR ... EXTENDED is requested, try to recover as much data
1583       from data file as possible. In this case if we failed to read a
1584       record, we assume EOF. This allows massive data loss, but we can
1585       hardly do more with broken zlib stream. And this is the only way
1586       to restore at least what is still recoverable.
1587     */
1588     if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1589       goto error;
1590   }
1591 
1592   azclose(&writer);
1593   share->dirty= FALSE;
1594 
1595   azclose(&archive);
1596 
1597   // make the file we just wrote be our data file
1598   rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1599 
1600 
1601   mysql_mutex_unlock(&share->mutex);
1602   DBUG_RETURN(rc);
1603 error:
1604   DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1605   azclose(&writer);
1606   mysql_mutex_unlock(&share->mutex);
1607 
1608   DBUG_RETURN(rc);
1609 }
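
/*
  The rebuild performed above, condensed (a descriptive recap, not extra
  behaviour):

    1. close the shared writer so the data file is stable
    2. create <table>.ARN and copy the embedded frm into it (frm_copy)
    3. rewind the old file (read_data_header) and re-write every readable
       row, re-deriving rows_recorded and the auto_increment high-water mark
    4. for REPAIR ... EXTENDED, stop at the first unreadable row and keep
       what was recovered; otherwise a read error aborts the rebuild
    5. rename the .ARN file over the .ARZ file
*/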
1610 
1611 /*
1612   Below is an example of how to set up row-level locking.
1613 */
1614 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1615                                        THR_LOCK_DATA **to,
1616                                        enum thr_lock_type lock_type)
1617 {
1618   if (lock_type == TL_WRITE_DELAYED)
1619     delayed_insert= TRUE;
1620   else
1621     delayed_insert= FALSE;
1622 
1623   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1624   {
1625     /*
1626       Here is where we get into the guts of a row-level lock.
1627       If TL_UNLOCK is set and we are not doing a LOCK TABLE,
1628       DELAYED LOCK or DISCARD/IMPORT TABLESPACE, then allow
1629       multiple writers.
1630     */
1631 
1632     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1633          lock_type <= TL_WRITE) && delayed_insert == FALSE &&
1634         !thd_in_lock_tables(thd)
1635         && !thd_tablespace_op(thd))
1636       lock_type = TL_WRITE_ALLOW_WRITE;
1637 
1638     /*
1639       In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1640       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1641       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1642       to t2. Convert the lock to a normal read lock to allow
1643       concurrent inserts to t2.
1644     */
1645 
1646     if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1647       lock_type = TL_READ;
1648 
1649     lock.type=lock_type;
1650   }
1651 
1652   *to++= &lock;
1653 
1654   return to;
1655 }
1656 
1657 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1658 {
1659   char tmp_real_path[FN_REFLEN];
1660   DBUG_ENTER("ha_archive::update_create_info");
1661 
1662   ha_archive::info(HA_STATUS_AUTO);
1663   if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1664   {
1665     create_info->auto_increment_value= stats.auto_increment_value;
1666   }
1667 
1668   if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1669     create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path);
1670 
1671   DBUG_VOID_RETURN;
1672 }
1673 
1674 /*
1675   Hints for optimizer, see ha_tina for more information
1676 */
1677 int ha_archive::info(uint flag)
1678 {
1679   DBUG_ENTER("ha_archive::info");
1680 
1681   flush_and_clear_pending_writes();
1682   stats.deleted= 0;
1683 
1684   DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1685   /* Costs quite a bit more to get all information */
1686   if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1687   {
1688     MY_STAT file_stat;  // Stat information for the data file
1689 
1690     (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME));
1691 
1692     if (flag & HA_STATUS_TIME)
1693       stats.update_time= (ulong) file_stat.st_mtime;
1694     if (flag & HA_STATUS_CONST)
1695     {
1696       stats.max_data_file_length= MAX_FILE_SIZE;
1697       stats.create_time= (ulong) file_stat.st_ctime;
1698     }
1699     if (flag & HA_STATUS_VARIABLE)
1700     {
1701       stats.delete_length= 0;
1702       stats.data_file_length= file_stat.st_size;
1703       stats.index_file_length=0;
1704       stats.mean_rec_length= stats.records ?
1705         ulong(stats.data_file_length / stats.records) : table->s->reclength;
1706     }
1707   }
1708 
1709   if (flag & HA_STATUS_AUTO)
1710   {
1711     if (init_archive_reader())
1712       DBUG_RETURN(errno);
1713 
1714     mysql_mutex_lock(&share->mutex);
1715     azflush(&archive, Z_SYNC_FLUSH);
1716     mysql_mutex_unlock(&share->mutex);
1717     stats.auto_increment_value= archive.auto_increment + 1;
1718   }
1719 
1720   DBUG_RETURN(0);
1721 }
1722 
1723 
1724 int ha_archive::external_lock(THD *thd, int lock_type)
1725 {
1726   if (lock_type == F_RDLCK)
1727   {
1728     // We are going to read from the table. Flush any pending writes that we
1729     // may have
1730     flush_and_clear_pending_writes();
1731   }
1732   return 0;
1733 }
1734 
1735 
1736 void ha_archive::flush_and_clear_pending_writes()
1737 {
1738   mysql_mutex_lock(&share->mutex);
1739   if (share->dirty)
1740   {
1741     DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1742     DBUG_ASSERT(share->archive_write_open);
1743     azflush(&(share->archive_write), Z_SYNC_FLUSH);
1744     share->dirty= FALSE;
1745   }
1746 
1747   /*
1748     This should be an accurate number now, though bulk and delayed inserts
1749     that have not yet been flushed can still make it inaccurate.
1750   */
1751   stats.records= share->rows_recorded;
1752   mysql_mutex_unlock(&share->mutex);
1753 }
1754 
1755 
1756 /*
1757   This method tells us that a bulk insert operation is about to occur. We set
1758   a flag which will keep write_row from saying that its data is dirty. This in
1759   turn will keep selects from causing a sync to occur.
1760   Basically, yet another optimization to keep compression working well.
1761 */
1762 void ha_archive::start_bulk_insert(ha_rows rows, uint flags)
1763 {
1764   DBUG_ENTER("ha_archive::start_bulk_insert");
1765   if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
1766     bulk_insert= TRUE;
1767   DBUG_VOID_RETURN;
1768 }
1769 
1770 
1771 /*
1772   The other side of start_bulk_insert() is end_bulk_insert(). Here we turn off the bulk insert
1773   flag and set the share dirty so that the next select will call sync for us.
1774 */
1775 int ha_archive::end_bulk_insert()
1776 {
1777   DBUG_ENTER("ha_archive::end_bulk_insert");
1778   bulk_insert= FALSE;
1779   mysql_mutex_lock(&share->mutex);
1780   if (share->archive_write_open)
1781     share->dirty= true;
1782   mysql_mutex_unlock(&share->mutex);
1783   DBUG_RETURN(0);
1784 }
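
/*
  Putting the pieces together, the bulk insert protocol looks roughly like
  this (a recap of start_bulk_insert(), write_row(), end_bulk_insert() and
  flush_and_clear_pending_writes(), not additional behaviour):

    start_bulk_insert()  -> bulk_insert= TRUE, which is intended to keep
                            write_row() from marking the share dirty per row
    write_row() x N      -> rows are appended through the shared writer
    end_bulk_insert()    -> bulk_insert= FALSE, share->dirty= true
    next read lock       -> external_lock(F_RDLCK) flushes the writer so the
                            new rows become visible to scans
*/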
1785 
1786 /*
1787   We cancel a truncate command. The only way to delete an archive table is to drop it.
1788   This is done for security reasons. In a later version we will enable this by
1789   allowing the user to select a different row format.
1790 */
1791 int ha_archive::truncate()
1792 {
1793   DBUG_ENTER("ha_archive::truncate");
1794   DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1795 }
1796 
1797 /*
1798   We just return state if asked.
1799 */
1800 bool ha_archive::is_crashed() const
1801 {
1802   DBUG_ENTER("ha_archive::is_crashed");
1803   DBUG_RETURN(share->crashed);
1804 }
1805 
1806 /*
1807   Simple scan of the tables to make sure everything is ok.
1808 */
1809 
1810 int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
1811 {
1812   int rc= 0;
1813   const char *old_proc_info;
1814   ha_rows count;
1815   DBUG_ENTER("ha_archive::check");
1816 
1817   old_proc_info= thd_proc_info(thd, "Checking table");
1818   mysql_mutex_lock(&share->mutex);
1819   count= share->rows_recorded;
1820   /* Flush any waiting data */
1821   if (share->archive_write_open)
1822     azflush(&(share->archive_write), Z_SYNC_FLUSH);
1823   mysql_mutex_unlock(&share->mutex);
1824 
1825   if (init_archive_reader())
1826     DBUG_RETURN(HA_ADMIN_CORRUPT);
1827   /*
1828     Now we will rewind the archive file so that we are positioned at the
1829     start of the file.
1830   */
1831   read_data_header(&archive);
1832   for (ha_rows cur_count= count; cur_count; cur_count--)
1833   {
1834     if ((rc= get_row(&archive, table->record[0])))
1835       goto error;
1836   }
1837   /*
1838     Now read records that may have been inserted concurrently.
1839     Acquire share->mutex so tail of the table is not modified by
1840     concurrent writers.
1841   */
1842   mysql_mutex_lock(&share->mutex);
1843   count= share->rows_recorded - count;
1844   if (share->archive_write_open)
1845     azflush(&(share->archive_write), Z_SYNC_FLUSH);
1846   while (!(rc= get_row(&archive, table->record[0])))
1847     count--;
1848   mysql_mutex_unlock(&share->mutex);
1849 
1850   if ((rc && rc != HA_ERR_END_OF_FILE) || count)
1851     goto error;
1852 
1853   thd_proc_info(thd, old_proc_info);
1854   DBUG_RETURN(HA_ADMIN_OK);
1855 
1856 error:
1857   thd_proc_info(thd, old_proc_info);
1858   share->crashed= FALSE;
1859   DBUG_RETURN(HA_ADMIN_CORRUPT);
1860 }
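
/*
  The consistency check above, in outline (descriptive recap):

    1. snapshot rows_recorded and flush the shared writer
    2. read that many rows from the start of the data file
    3. under share->mutex, flush again and read any rows that were inserted
       concurrently, decrementing the expected count
    4. a read error other than end-of-file, or a non-zero remaining count,
       makes the table be reported as corrupt
*/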
1861 
1862 /*
1863   Check and repair the table if needed.
1864 */
1865 bool ha_archive::check_and_repair(THD *thd)
1866 {
1867   HA_CHECK_OPT check_opt;
1868   DBUG_ENTER("ha_archive::check_and_repair");
1869 
1870   check_opt.init();
1871 
1872   DBUG_RETURN(repair(thd, &check_opt));
1873 }
1874 
1875 archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
1876 {
1877   DBUG_ENTER("ha_archive::create_record_buffer");
1878   archive_record_buffer *r;
1879   if (!(r=
1880         (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
1881                                            MYF(MY_WME))))
1882   {
1883     DBUG_RETURN(NULL); /* purecov: inspected */
1884   }
1885   r->length= (int)length;
1886 
1887   if (!(r->buffer= (uchar*) my_malloc(r->length,
1888                                     MYF(MY_WME))))
1889   {
1890     my_free(r);
1891     DBUG_RETURN(NULL); /* purecov: inspected */
1892   }
1893 
1894   DBUG_RETURN(r);
1895 }
1896 
1897 void ha_archive::destroy_record_buffer(archive_record_buffer *r)
1898 {
1899   DBUG_ENTER("ha_archive::destroy_record_buffer");
1900   my_free(r->buffer);
1901   my_free(r);
1902   DBUG_VOID_RETURN;
1903 }
1904 
1905 /*
1906   In archive *any* ALTER should cause a table to be rebuilt,
1907   no ALTER can be frm-only.
1908   Because after any change to the frm file, archive must update the
1909   frm image in the ARZ file. This cannot be done in-place; it
1910   requires the ARZ file to be recreated from scratch.
1911 */
1912 bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg,
1913                                             uint table_changes)
1914 {
1915   return COMPATIBLE_DATA_NO;
1916 }
1917 
1918 
1919 struct st_mysql_storage_engine archive_storage_engine=
1920 { MYSQL_HANDLERTON_INTERFACE_VERSION };
1921 
1922 maria_declare_plugin(archive)
1923 {
1924   MYSQL_STORAGE_ENGINE_PLUGIN,
1925   &archive_storage_engine,
1926   "ARCHIVE",
1927   "Brian Aker, MySQL AB",
1928   "gzip-compresses tables for a low storage footprint",
1929   PLUGIN_LICENSE_GPL,
1930   archive_db_init, /* Plugin Init */
1931   NULL, /* Plugin Deinit */
1932   0x0300 /* 3.0 */,
1933   NULL,                       /* status variables                */
1934   NULL,                       /* system variables                */
1935   "1.0",                      /* string version */
1936   MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
1937 }
1938 maria_declare_plugin_end;
1939 
1940