1 /*
2    Copyright (c) 2004, 2014, Oracle and/or its affiliates
3    Copyright (c) 2010, 2014, SkySQL Ab.
4 
5    This program is free software; you can redistribute it and/or
6    modify it under the terms of the GNU General Public License
7    as published by the Free Software Foundation; version 2 of
8    the License.
9 
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, write to the Free Software
17    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA
18 */
19 
20 #ifdef USE_PRAGMA_IMPLEMENTATION
21 #pragma implementation        // gcc: Class implementation
22 #endif
23 
24 #include <my_global.h>
25 #include "sql_class.h"                          // SSV
26 #include "sql_table.h"                          // build_table_filename
27 #include <myisam.h>                             // T_EXTEND
28 
29 #include "ha_archive.h"
30 #include "discover.h"
31 #include <my_dir.h>
32 
33 #include <mysql/plugin.h>
34 
35 /*
36   First, if you want to understand storage engines you should look at
37   ha_example.cc and ha_example.h.
38 
39   This example was written as a test case for a customer who needed
40   a storage engine without indexes that could compress data very well.
41   So, welcome to a completely compressed storage engine. This storage
42   engine only does inserts. No replaces, deletes, or updates. All reads are
43   complete table scans. Compression is done through a combination of packing
44   and making use of the zlib library.
45 
46   We keep a file pointer open for each instance of ha_archive for reads,
47   but for writes we share a single open file handle. We flush it
48   only when a read occurs. azio handles compressing lots of records
49   at once much better than doing lots of little records between writes.
50   It is possible not to lock on writes, but this would mean we couldn't
51   handle bulk inserts as well (that is, if someone was trying to read at
52   the same time, since we would want to flush).
53 
54   A "meta" file is kept alongside the data file. This file serves two purposes.
55   The first purpose is to track the number of rows in the table. The second
56   purpose is to determine if the table was closed properly or not. When the
57   meta file is first opened it is marked as dirty. It is opened when the table
58   itself is opened for writing. When the table is closed the new count for rows
59   is written to the meta file and the file is marked as clean. If the meta file
60   is opened and it is marked as dirty, it is assumed that a crash occurred. At
61   this point an error occurs and the user is told to rebuild the file.
62   A rebuild scans the rows and rewrites the meta file. If corruption is found
63   in the data file then the meta file is not repaired.
64 
65   At some point a recovery method for such a drastic case needs to be devised.
66 
67   Locks are row level, and you will get a consistent read.
68 
69   As far as table scans go, performance is quite good. I don't have
70   good numbers, but locally it has outperformed both InnoDB and MyISAM. For
71   InnoDB the question is whether the table can fit into the buffer
72   pool. For MyISAM it's a question of how much of the MyISAM file the
73   file system caches. With enough free memory MyISAM is faster. It's only when
74   the OS doesn't have enough memory to cache the entire table that Archive
75   turns out to be any faster.
76 
77   Examples between MyISAM (packed) and Archive.
78 
79   Table with 76695844 identical rows:
80   29680807 a_archive.ARZ
81   920350317 a.MYD
82 
83 
84   Table with 8991478 rows (all of Slashdot's comments):
85   1922964506 comment_archive.ARZ
86   2944970297 comment_text.MYD
87 
88 
89   TODO:
90    Allow users to set compression level.
91    Allow adjustable block size.
92    Implement versioning, should be easy.
93    Allow for errors, find a way to mark bad rows.
94    Add an optional feature so that rows can be flushed at intervals (which will cause less
95      compression but may speed up ordered searches).
96    Checkpoint the meta file to allow for faster rebuilds.
97    Option to allow for dirty reads; this would lower the number of sync calls, which would make
98      inserts a lot faster, but would mean highly arbitrary reads.
99 
100     -Brian
101 
102   Archive file format versions:
103   <5.1.5 - v.1
104   5.1.5-5.1.15 - v.2
105   >5.1.15 - v.3
106 */
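/*
  A minimal sketch (illustration only, compiled out) of the shared-writer /
  per-reader pattern described in the comment above, using the azio calls
  this engine is built on. The file name, buffer contents and the function
  name archive_io_pattern_sketch are made up for the example; error
  handling is omitted.
*/
#if 0
static void archive_io_pattern_sketch()
{
  azio_stream writer, reader;
  uchar row[16];
  int error;

  /* One writer is kept open and shared; rows are only ever appended. */
  azopen(&writer, "t1.ARZ", O_CREAT|O_RDWR|O_BINARY);
  azwrite(&writer, row, sizeof(row));

  /* Before a read we flush, so a reader sees the rows appended so far. */
  azflush(&writer, Z_SYNC_FLUSH);

  /* Each handler instance opens its own read-only stream for scanning. */
  azopen(&reader, "t1.ARZ", O_RDONLY|O_BINARY);
  (void) azread(&reader, row, sizeof(row), &error);

  azclose(&reader);
  azclose(&writer);
}
#endif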
107 
108 /* The file extension */
109 #define ARZ ".ARZ"               // The data file
110 #define ARN ".ARN"               // Files used during an optimize call
111 #define ARM ".ARM"               // Meta file (deprecated)
112 
113 /* 5.0 compatibility */
114 #define META_V1_OFFSET_CHECK_HEADER  0
115 #define META_V1_OFFSET_VERSION       1
116 #define META_V1_OFFSET_ROWS_RECORDED 2
117 #define META_V1_OFFSET_CHECK_POINT   10
118 #define META_V1_OFFSET_CRASHED       18
119 #define META_V1_LENGTH               19
120 
121 /*
122   uchar + uchar
123 */
124 #define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
125 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
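/*
  Illustration (compiled out) of how the 19-byte version 1 meta file maps
  onto the META_V1_* offsets above. It mirrors what read_v1_metafile() and
  write_v1_metafile() below do; the function name and the buffer argument
  are placeholders for the example.
*/
#if 0
static void v1_meta_layout_sketch(const uchar *buf /* META_V1_LENGTH bytes */)
{
  uchar check=   buf[META_V1_OFFSET_CHECK_HEADER];  // must be ARCHIVE_CHECK_HEADER
  uchar version= buf[META_V1_OFFSET_VERSION];       // always 1 for this format
  ulonglong rows=        uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
  ulonglong check_point= uint8korr(buf + META_V1_OFFSET_CHECK_POINT);
  uchar crashed= buf[META_V1_OFFSET_CRASHED];       // non-zero: not closed cleanly
  (void) check; (void) version; (void) rows; (void) check_point; (void) crashed;
}
#endif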
126 
127 #ifdef HAVE_PSI_INTERFACE
128 extern "C" PSI_file_key arch_key_file_data;
129 #endif
130 
131 /* Static declarations for handlerton */
132 static handler *archive_create_handler(handlerton *hton,
133                                        TABLE_SHARE *table,
134                                        MEM_ROOT *mem_root);
135 int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share);
136 
137 /*
138   Number of rows that will force a bulk insert.
139 */
140 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
141 
142 /*
143   Size of header used for row
144 */
145 #define ARCHIVE_ROW_HEADER_SIZE 4
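/*
  Illustration (compiled out): a version 3 row starts with a 4-byte little
  endian length header, followed by the null bytes and the packed fields
  (see pack_row()/unpack_row() below). The function name and arguments are
  placeholders for the example.
*/
#if 0
static void row_header_sketch(uchar *buffer, unsigned int packed_length)
{
  /* Writer side: store the length of everything after the header. */
  int4store(buffer, packed_length);

  /* Reader side: fetch the length back before reading the row body. */
  unsigned int row_len= uint4korr(buffer);
  (void) row_len;
}
#endif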
146 
147 static handler *archive_create_handler(handlerton *hton,
148                                        TABLE_SHARE *table,
149                                        MEM_ROOT *mem_root)
150 {
151   return new (mem_root) ha_archive(hton, table);
152 }
153 
154 #ifdef HAVE_PSI_INTERFACE
155 PSI_mutex_key az_key_mutex_Archive_share_mutex;
156 
157 static PSI_mutex_info all_archive_mutexes[]=
158 {
159   { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
160 };
161 
162 PSI_file_key arch_key_file_metadata, arch_key_file_data;
163 static PSI_file_info all_archive_files[]=
164 {
165     { &arch_key_file_metadata, "metadata", 0},
166     { &arch_key_file_data, "data", 0}
167 };
168 
169 static void init_archive_psi_keys(void)
170 {
171   const char* category= "archive";
172   int count;
173 
174   if (!PSI_server)
175     return;
176 
177   count= array_elements(all_archive_mutexes);
178   mysql_mutex_register(category, all_archive_mutexes, count);
179 
180   count= array_elements(all_archive_files);
181   mysql_file_register(category, all_archive_files, count);
182 }
183 
184 #endif /* HAVE_PSI_INTERFACE */
185 
186 /*
187   Initialize the archive handler.
188 
189   SYNOPSIS
190     archive_db_init()
191     void *
192 
193   RETURN
194     FALSE       OK
195     TRUE        Error
196 */
197 
198 /*
199   We just implement one additional file extension.
200   ARM is here just to properly drop 5.0 tables.
201 */
202 static const char *ha_archive_exts[] = {
203   ARZ,
204   ARM,
205   NullS
206 };
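/*
  Illustration (compiled out) of how a table path is turned into the name of
  its data file, as done in get_share() below. The path "./test/t1" and the
  function name are made up for the example.
*/
#if 0
static void data_file_name_sketch()
{
  char data_file_name[FN_REFLEN];
  fn_format(data_file_name, "./test/t1", "", ARZ,
            MY_REPLACE_EXT | MY_UNPACK_FILENAME);
  /* data_file_name now ends in ".ARZ" */
}
#endif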
207 
208 int archive_db_init(void *p)
209 {
210   DBUG_ENTER("archive_db_init");
211   handlerton *archive_hton;
212 
213 #ifdef HAVE_PSI_INTERFACE
214   init_archive_psi_keys();
215 #endif
216 
217   archive_hton= (handlerton *)p;
218   archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
219   archive_hton->create= archive_create_handler;
220   archive_hton->flags= HTON_NO_FLAGS;
221   archive_hton->discover_table= archive_discover;
222   archive_hton->tablefile_extensions= ha_archive_exts;
223 
224   DBUG_RETURN(0);
225 }
226 
227 
228 Archive_share::Archive_share()
229 {
230   crashed= false;
231   in_optimize= false;
232   archive_write_open= false;
233   dirty= false;
234   DBUG_PRINT("ha_archive", ("Archive_share: %p",
235                             this));
236   thr_lock_init(&lock);
237   /*
238     We will use this lock for rows.
239   */
240   mysql_mutex_init(az_key_mutex_Archive_share_mutex,
241                    &mutex, MY_MUTEX_INIT_FAST);
242 }
243 
244 
245 Archive_share::~Archive_share()
246 {
247   DBUG_PRINT("ha_archive", ("~Archive_share: %p", this));
248   if (archive_write_open)
249   {
250     mysql_mutex_lock(&mutex);
251     (void) close_archive_writer();              // Will reset archive_write_open
252     mysql_mutex_unlock(&mutex);
253   }
254   thr_lock_delete(&lock);
255   mysql_mutex_destroy(&mutex);
256 }
257 
258 
259 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
260   :handler(hton, table_arg), delayed_insert(0), bulk_insert(0)
261 {
262   /* Set our original buffer from pre-allocated memory */
263   buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
264 
265   /* The size of the offset value we will use for position() */
266   ref_length= sizeof(my_off_t);
267   archive_reader_open= FALSE;
268 }
269 
270 int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
271 {
272   DBUG_ENTER("archive_discover");
273   DBUG_PRINT("archive_discover", ("db: '%s'  name: '%s'", share->db.str,
274                                   share->table_name.str));
275   azio_stream frm_stream;
276   char az_file[FN_REFLEN];
277   uchar *frm_ptr;
278   MY_STAT file_stat;
279 
280   strxmov(az_file, share->normalized_path.str, ARZ, NullS);
281 
282   if (!(mysql_file_stat(/* arch_key_file_data */ 0, az_file, &file_stat, MYF(0))))
283     DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
284 
285   if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
286   {
287     if (errno == EROFS || errno == EACCES)
288       DBUG_RETURN(my_errno= errno);
289     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
290   }
291 
292   if (frm_stream.frm_length == 0)
293     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
294 
295   frm_ptr= (uchar *)my_malloc(PSI_INSTRUMENT_ME, frm_stream.frm_length,
296                               MYF(MY_THREAD_SPECIFIC | MY_WME));
297   if (!frm_ptr)
298     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
299 
300   if (azread_frm(&frm_stream, frm_ptr))
301     goto ret;
302 
303   azclose(&frm_stream);
304 
305   my_errno= share->init_from_binary_frm_image(thd, 1,
306                                               frm_ptr, frm_stream.frm_length);
307 ret:
308   my_free(frm_ptr);
309   DBUG_RETURN(my_errno);
310 }
311 
312 /**
313   @brief Read version 1 meta file (5.0 compatibility routine).
314 
315   @return Completion status
316     @retval  0 Success
317     @retval !0 Failure
318 */
319 
320 int Archive_share::read_v1_metafile()
321 {
322   char file_name[FN_REFLEN];
323   uchar buf[META_V1_LENGTH];
324   File fd;
325   DBUG_ENTER("Archive_share::read_v1_metafile");
326 
327   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
328   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
329     DBUG_RETURN(-1);
330 
331   if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
332   {
333     mysql_file_close(fd, MYF(0));
334     DBUG_RETURN(-1);
335   }
336 
337   rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
338   crashed= buf[META_V1_OFFSET_CRASHED];
339   mysql_file_close(fd, MYF(0));
340   DBUG_RETURN(0);
341 }
342 
343 
344 /**
345   @brief Write version 1 meta file (5.0 compatibility routine).
346 
347   @return Completion status
348     @retval  0 Success
349     @retval !0 Failure
350 */
351 
352 int Archive_share::write_v1_metafile()
353 {
354   char file_name[FN_REFLEN];
355   uchar buf[META_V1_LENGTH];
356   File fd;
357   DBUG_ENTER("Archive_share::write_v1_metafile");
358 
359   buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
360   buf[META_V1_OFFSET_VERSION]= 1;
361   int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
362   int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
363   buf[META_V1_OFFSET_CRASHED]= crashed;
364 
365   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
366   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
367     DBUG_RETURN(-1);
368 
369   if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
370   {
371     mysql_file_close(fd, MYF(0));
372     DBUG_RETURN(-1);
373   }
374 
375   mysql_file_close(fd, MYF(0));
376   DBUG_RETURN(0);
377 }
378 
379 /**
380   @brief Pack version 1 row (5.0 compatibility routine).
381 
382   @param[in]  record  the record to pack
383 
384   @return Length of packed row
385 */
386 
387 unsigned int ha_archive::pack_row_v1(const uchar *record)
388 {
389   uint *blob, *end;
390   uchar *pos;
391   DBUG_ENTER("pack_row_v1");
392   memcpy(record_buffer->buffer, record, table->s->reclength);
393 
394   /*
395     The ends of VARCHAR fields are filled with garbage, so here
396     we explicitly zero out the unused tail of each VARCHAR field
397   */
398 
399   for (Field** field= table->field; (*field) ; field++)
400   {
401     Field *fld= *field;
402     if (fld->type() == MYSQL_TYPE_VARCHAR)
403     {
404       if (!(fld->is_real_null(record - table->record[0])))
405       {
406         ptrdiff_t  start= (fld->ptr - table->record[0]);
407         Field_varstring *const field_var= (Field_varstring *)fld;
408         uint offset= field_var->data_length() + field_var->length_size();
409         memset(record_buffer->buffer + start + offset, 0,
410                fld->field_length - offset + 1);
411       }
412     }
413   }
414   pos= record_buffer->buffer + table->s->reclength;
415   for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
416        blob != end; blob++)
417   {
418     uint32 length= ((Field_blob *) table->field[*blob])->get_length();
419     if (length)
420     {
421       uchar *data_ptr= ((Field_blob *) table->field[*blob])->get_ptr();
422       memcpy(pos, data_ptr, length);
423       pos+= length;
424     }
425   }
426   DBUG_RETURN((int)(pos - record_buffer->buffer));
427 }
428 
429 /*
430   This method reads the header of a datafile and returns whether or not it was successful.
431 */
432 int ha_archive::read_data_header(azio_stream *file_to_read)
433 {
434   int error;
435   unsigned long ret;
436   uchar data_buffer[DATA_BUFFER_SIZE];
437   DBUG_ENTER("ha_archive::read_data_header");
438 
439   if (azrewind(file_to_read) == -1)
440     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
441 
442   if (file_to_read->version >= 3)
443     DBUG_RETURN(0);
444   /* Everything below this is legacy for format versions 2 and below */
445 
446   DBUG_PRINT("ha_archive", ("Reading legacy data header"));
447 
448   ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
449 
450   if (ret != DATA_BUFFER_SIZE)
451   {
452     DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
453                               DATA_BUFFER_SIZE, ret));
454     DBUG_RETURN(1);
455   }
456 
457   if (error)
458   {
459     DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
460     DBUG_RETURN(1);
461   }
462 
463   DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
464   DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
465 
466   if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
467       (data_buffer[1] == 1 || data_buffer[1] == 2))
468     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
469 
470   DBUG_RETURN(0);
471 }
472 
473 
474 /*
475   We create the shared memory space that we will use for the open table.
476   No matter what, we try to get or create a share. This is so that a repair
477   table operation can occur.
478 
479   See ha_example.cc for a longer description.
480 */
481 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
482 {
483   Archive_share *tmp_share;
484 
485   DBUG_ENTER("ha_archive::get_share");
486 
487   lock_shared_ha_data();
488   if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
489   {
490     azio_stream archive_tmp;
491 
492     tmp_share= new Archive_share;
493 
494     if (!tmp_share)
495     {
496       *rc= HA_ERR_OUT_OF_MEM;
497       goto err;
498     }
499     DBUG_PRINT("ha_archive", ("new Archive_share: %p",
500                               tmp_share));
501 
502     fn_format(tmp_share->data_file_name, table_name, "",
503               ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
504     strmov(tmp_share->table_name, table_name);
505     DBUG_PRINT("ha_archive", ("Data File %s",
506                         tmp_share->data_file_name));
507 
508     /*
509       We read the meta file, but do not mark it dirty. Since we are not
510       doing a write we won't mark it dirty (and we won't open it for
511       anything but reading... open it for write and we will generate null
512       compression writes).
513     */
514     if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
515     {
516       delete tmp_share;
517       *rc= my_errno ? my_errno : HA_ERR_CRASHED;
518       tmp_share= NULL;
519       goto err;
520     }
521     stats.auto_increment_value= archive_tmp.auto_increment + 1;
522     tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
523     tmp_share->crashed= archive_tmp.dirty;
524     share= tmp_share;
525     if (archive_tmp.version == 1)
526       share->read_v1_metafile();
527     else if (frm_compare(&archive_tmp))
528       *rc= HA_ERR_TABLE_DEF_CHANGED;
529 
530     azclose(&archive_tmp);
531 
532     set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
533   }
534   if (tmp_share->crashed)
535     *rc= HA_ERR_CRASHED_ON_USAGE;
536 err:
537   unlock_shared_ha_data();
538 
539   DBUG_ASSERT(tmp_share || *rc);
540 
541   DBUG_RETURN(tmp_share);
542 }
543 
544 
545 int Archive_share::init_archive_writer()
546 {
547   DBUG_ENTER("Archive_share::init_archive_writer");
548   /*
549     It is expensive to open and close the data files and since you can't have
550     a gzip file that can be both read and written we keep a writer open
551     that is shared among all open tables.
552   */
553   if (!(azopen(&archive_write, data_file_name,
554                O_RDWR|O_BINARY)))
555   {
556     DBUG_PRINT("ha_archive", ("Could not open archive write file"));
557     crashed= true;
558     DBUG_RETURN(1);
559   }
560   archive_write_open= true;
561 
562   DBUG_RETURN(0);
563 }
564 
565 
566 void Archive_share::close_archive_writer()
567 {
568   mysql_mutex_assert_owner(&mutex);
569   if (archive_write_open)
570   {
571     if (archive_write.version == 1)
572       (void) write_v1_metafile();
573     azclose(&archive_write);
574     archive_write_open= false;
575     dirty= false;
576   }
577 }
578 
579 
580 /*
581   No locks are required because it is associated with just one handler instance
582 */
583 int ha_archive::init_archive_reader()
584 {
585   DBUG_ENTER("ha_archive::init_archive_reader");
586   /*
587     It is expensive to open and close the data files and since you can't have
588     a gzip file that can be both read and written we keep a writer open
589     that is shared among all open tables, but have one reader open for
590     each handler instance.
591   */
592   if (!archive_reader_open)
593   {
594     if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
595     {
596       DBUG_PRINT("ha_archive", ("Could not open archive read file"));
597       share->crashed= TRUE;
598       DBUG_RETURN(1);
599     }
600     archive_reader_open= TRUE;
601   }
602 
603   DBUG_RETURN(0);
604 }
605 
606 
607 /*
608   When opening a file we:
609   Create/get our shared structure.
610   Init our lock.
611   We open the file we will read from.
612 */
613 int ha_archive::open(const char *name, int mode, uint open_options)
614 {
615   int rc= 0;
616   DBUG_ENTER("ha_archive::open");
617 
618   DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
619                       (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
620   share= get_share(name, &rc);
621   if (!share)
622     DBUG_RETURN(rc);
623 
624   /* Allow open on crashed table in repair mode only. */
625   switch (rc)
626   {
627   case 0:
628     break;
629   case HA_ERR_TABLE_DEF_CHANGED:
630   case HA_ERR_CRASHED_ON_USAGE:
631     if (open_options & HA_OPEN_FOR_REPAIR)
632     {
633       rc= 0;
634       break;
635     }
636     /* fall through */
637   default:
638     DBUG_RETURN(rc);
639   }
640 
641   DBUG_ASSERT(share);
642 
643   record_buffer= create_record_buffer(table->s->reclength +
644                                       ARCHIVE_ROW_HEADER_SIZE);
645 
646   if (!record_buffer)
647     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
648 
649   thr_lock_data_init(&share->lock, &lock, NULL);
650 
651   DBUG_PRINT("ha_archive", ("archive table was crashed %s",
652                       rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
653   if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
654   {
655     DBUG_RETURN(0);
656   }
657 
658   DBUG_RETURN(rc);
659 }
660 
661 
662 /*
663   Closes the file.
664 
665   SYNOPSIS
666     close();
667 
668   IMPLEMENTATION:
669 
670   We first close this storage engines file handle to the archive and
671   then remove our reference count to the table (and possibly free it
672   as well).
673 
674   RETURN
675     0  ok
676     1  Error
677 */
678 
679 int ha_archive::close(void)
680 {
681   int rc= 0;
682   DBUG_ENTER("ha_archive::close");
683 
684   destroy_record_buffer(record_buffer);
685 
686   /* First close stream */
687   if (archive_reader_open)
688   {
689     if (azclose(&archive))
690       rc= 1;
691   }
692   DBUG_RETURN(rc);
693 }
694 
695 
696 /**
697   Copy a frm blob between streams.
698 
699   @param  src   The source stream.
700   @param  dst   The destination stream.
701 
702   @return Zero on success, non-zero otherwise.
703 */
704 
705 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
706 {
707   int rc= 0;
708   uchar *frm_ptr;
709 
710   if (!src->frm_length)
711   {
712     size_t frm_len;
713     if (!table_share->read_frm_image((const uchar**) &frm_ptr, &frm_len))
714     {
715       azwrite_frm(dst, frm_ptr, frm_len);
716       table_share->free_frm_image(frm_ptr);
717     }
718     return 0;
719   }
720 
721   if (!(frm_ptr= (uchar *) my_malloc(PSI_INSTRUMENT_ME, src->frm_length,
722                                      MYF(MY_THREAD_SPECIFIC | MY_WME))))
723     return HA_ERR_OUT_OF_MEM;
724 
725   /* Write file offset is set to the end of the file. */
726   if (azread_frm(src, frm_ptr) ||
727       azwrite_frm(dst, frm_ptr, src->frm_length))
728     rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
729 
730   my_free(frm_ptr);
731 
732   return rc;
733 }
734 
735 
736 /**
737   Compare frm blob with the on-disk frm file
738 
739   @param  s     The azio stream.
740 
741   @return Zero if equal, non-zero otherwise.
742 */
743 
744 int ha_archive::frm_compare(azio_stream *s)
745 {
746   if (!s->frmver_length)
747     return 0; // Old pre-10.0 archive table. Never rediscover.
748 
749   LEX_CUSTRING *ver= &table->s->tabledef_version;
750   return ver->length != s->frmver_length ||
751          memcmp(ver->str,  s->frmver, ver->length);
752 }
753 
754 
755 /*
756   We create our data file here. The format is pretty simple.
757   You can read about the format of the data file above.
758   Unlike other storage engines we do not "pack" our data. Since we
759   are about to do a general compression, packing would just be a waste of
760   CPU time. If the table has blobs they are written after the row in the order
761   of creation.
762 */
763 
764 int ha_archive::create(const char *name, TABLE *table_arg,
765                        HA_CREATE_INFO *create_info)
766 {
767   char name_buff[FN_REFLEN];
768   char linkname[FN_REFLEN];
769   int error;
770   azio_stream create_stream;            /* Archive file we are working with */
771   const uchar *frm_ptr;
772   size_t frm_len;
773 
774   DBUG_ENTER("ha_archive::create");
775 
776   stats.auto_increment_value= create_info->auto_increment_value;
777 
778   for (uint key= 0; key < table_arg->s->keys; key++)
779   {
780     KEY *pos= table_arg->key_info+key;
781     KEY_PART_INFO *key_part=     pos->key_part;
782     KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
783 
784     for (; key_part != key_part_end; key_part++)
785     {
786       Field *field= key_part->field;
787 
788       if (!(field->flags & AUTO_INCREMENT_FLAG))
789       {
790         error= HA_WRONG_CREATE_OPTION;
791         DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
792         goto error;
793       }
794     }
795   }
796 
797   /*
798     We reuse name_buff since it is available.
799   */
800 #ifdef HAVE_READLINK
801   if (my_use_symdir &&
802       create_info->data_file_name &&
803       create_info->data_file_name[0] != '#')
804   {
805     DBUG_PRINT("ha_archive", ("archive will create stream file %s",
806                         create_info->data_file_name));
807 
808     fn_format(name_buff, create_info->data_file_name, "", ARZ,
809               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
810     fn_format(linkname, name, "", ARZ,
811               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
812   }
813   else
814 #endif /* HAVE_READLINK */
815   {
816     if (create_info->data_file_name)
817       my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "DATA DIRECTORY");
818 
819     fn_format(name_buff, name, "", ARZ,
820               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
821     linkname[0]= 0;
822   }
823 
824   /* Archive engine never uses INDEX DIRECTORY. */
825   if (create_info->index_file_name)
826       my_error(WARN_OPTION_IGNORED, MYF(ME_WARNING), "INDEX DIRECTORY");
827 
828   /*
829     There is a chance that the file was "discovered". In this case
830     just use whatever file is there.
831   */
832   my_errno= 0;
833   if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
834   {
835     error= errno;
836     goto error2;
837   }
838 
839   if (linkname[0])
840     my_symlink(name_buff, linkname, MYF(0));
841 
842   /*
843     Here is where we open up the frm and pass it to archive to store
844   */
845   if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len))
846   {
847     azwrite_frm(&create_stream, frm_ptr, frm_len);
848     table_arg->s->free_frm_image(frm_ptr);
849   }
850 
851   if (create_info->comment.str)
852     azwrite_comment(&create_stream, create_info->comment.str,
853                     create_info->comment.length);
854 
855   /*
856     Yes you need to do this, because the starting value
857     for the autoincrement may not be zero.
858   */
859   create_stream.auto_increment= stats.auto_increment_value ?
860                                   stats.auto_increment_value - 1 : 0;
861   if (azclose(&create_stream))
862   {
863     error= errno;
864     goto error2;
865   }
866 
867   DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
868   DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
869 
870 
871   DBUG_RETURN(0);
872 
873 error2:
874   delete_table(name);
875 error:
876   /* Return error number, if we got one */
877   DBUG_RETURN(error ? error : -1);
878 }
879 
880 /*
881   This is where the actual row is written out.
882 */
883 int ha_archive::real_write_row(const uchar *buf, azio_stream *writer)
884 {
885   my_off_t written;
886   unsigned int r_pack_length;
887   DBUG_ENTER("ha_archive::real_write_row");
888 
889   /* We pack the row for writing */
890   r_pack_length= pack_row(buf, writer);
891 
892   written= azwrite(writer, record_buffer->buffer, r_pack_length);
893   if (written != r_pack_length)
894   {
895     DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
896                                               (uint32) written,
897                                               (uint32)r_pack_length));
898     DBUG_RETURN(-1);
899   }
900 
901   if (!delayed_insert || !bulk_insert)
902     share->dirty= TRUE;
903 
904   DBUG_RETURN(0);
905 }
906 
907 
908 /*
909   Calculate max length needed for row. This includes
910   the bytes required for the length in the header.
911 */
912 
913 uint32 ha_archive::max_row_length(const uchar *record)
914 {
915   uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
916   length+= ARCHIVE_ROW_HEADER_SIZE;
917   my_ptrdiff_t const rec_offset= record - table->record[0];
918 
919   uint *ptr, *end;
920   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
921        ptr != end ;
922        ptr++)
923   {
924     if (!table->field[*ptr]->is_null(rec_offset))
925       length += 2 + ((Field_blob*)table->field[*ptr])->get_length(rec_offset);
926   }
927 
928   return length;
929 }
930 
931 
932 unsigned int ha_archive::pack_row(const uchar *record, azio_stream *writer)
933 {
934   uchar *ptr;
935   my_ptrdiff_t const rec_offset= record - table->record[0];
936   DBUG_ENTER("ha_archive::pack_row");
937 
938   if (fix_rec_buff(max_row_length(record)))
939     DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
940 
941   if (writer->version == 1)
942     DBUG_RETURN(pack_row_v1(record));
943 
944   /* Copy null bits */
945   memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
946          record, table->s->null_bytes);
947   ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
948 
949   for (Field **field=table->field ; *field ; field++)
950   {
951     if (!((*field)->is_null(rec_offset)))
952       ptr= (*field)->pack(ptr, record + (*field)->offset(record));
953   }
954 
955   int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
956                                          ARCHIVE_ROW_HEADER_SIZE));
957   DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
958                            (ptr - record_buffer->buffer -
959                              ARCHIVE_ROW_HEADER_SIZE)));
960 
961   DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
962 }
963 
964 
965 /*
966   Look at ha_archive::open() for an explanation of the row format.
967   Here we just write out the row.
968 
969   Wondering about start_bulk_insert()? Archive already optimizes
970   for lots of writes; the only gain from implementing
971   start_bulk_insert() is that we could skip setting dirty to true
972   each time.
973 */
974 int ha_archive::write_row(const uchar *buf)
975 {
976   int rc;
977   uchar *read_buf= NULL;
978   ulonglong temp_auto;
979   uchar *record=  table->record[0];
980   DBUG_ENTER("ha_archive::write_row");
981 
982   if (share->crashed)
983     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
984 
985   mysql_mutex_lock(&share->mutex);
986 
987   if (!share->archive_write_open && share->init_archive_writer())
988   {
989     rc= errno;
990     goto error;
991   }
992 
993   if (table->next_number_field && record == table->record[0])
994   {
995     KEY *mkey= &table->key_info[0]; // We only support one key right now
996     update_auto_increment();
997     temp_auto= table->next_number_field->val_int();
998 
999     /*
1000       We don't support decrementing auto_increment. It makes the performance
1001       just cry.
1002     */
1003     if (temp_auto <= share->archive_write.auto_increment &&
1004         mkey->flags & HA_NOSAME)
1005     {
1006       rc= HA_ERR_FOUND_DUPP_KEY;
1007       goto error;
1008     }
1009 #ifdef DEAD_CODE
1010     /*
1011       Bad news, this will cause a search for the unique value which is very
1012       expensive since we will have to do a table scan which will lock up
1013       all other writers during this period. This could perhaps be optimized
1014       in the future.
1015     */
1016     {
1017       /*
1018         First we create a buffer that we can use for reading rows, and can pass
1019         to get_row().
1020       */
1021       if (!(read_buf= (uchar*) my_malloc(table->s->reclength,
1022                                          MYF(MY_THREAD_SPECIFIC | MY_WME))))
1023       {
1024         rc= HA_ERR_OUT_OF_MEM;
1025         goto error;
1026       }
1027        /*
1028          All of the buffer must be written out or we won't see all of the
1029          data
1030        */
1031       azflush(&(share->archive_write), Z_SYNC_FLUSH);
1032       /*
1033         Set the position of the local read thread to the beginning position.
1034       */
1035       if (read_data_header(&archive))
1036       {
1037         rc= HA_ERR_CRASHED_ON_USAGE;
1038         goto error;
1039       }
1040 
1041       Field *mfield= table->next_number_field;
1042 
1043       while (!(get_row(&archive, read_buf)))
1044       {
1045         if (!memcmp(read_buf + mfield->offset(record),
1046                     table->next_number_field->ptr,
1047                     mfield->max_display_length()))
1048         {
1049           rc= HA_ERR_FOUND_DUPP_KEY;
1050           goto error;
1051         }
1052       }
1053     }
1054 #endif
1055     else
1056     {
1057       if (temp_auto > share->archive_write.auto_increment)
1058         stats.auto_increment_value=
1059           (share->archive_write.auto_increment= temp_auto) + 1;
1060     }
1061   }
1062 
1063   /*
1064     Notice that the global auto_increment has been increased.
1065     In case of a failed row write, we will never try to reuse the value.
1066   */
1067   share->rows_recorded++;
1068   rc= real_write_row(buf,  &(share->archive_write));
1069 error:
1070   mysql_mutex_unlock(&share->mutex);
1071   my_free(read_buf);
1072   DBUG_RETURN(rc);
1073 }
1074 
1075 
1076 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1077                                     ulonglong nb_desired_values,
1078                                     ulonglong *first_value,
1079                                     ulonglong *nb_reserved_values)
1080 {
1081   *nb_reserved_values= ULONGLONG_MAX;
1082   *first_value= share->archive_write.auto_increment + 1;
1083 }
1084 
1085 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1086 int ha_archive::index_init(uint keynr, bool sorted)
1087 {
1088   DBUG_ENTER("ha_archive::index_init");
1089   active_index= keynr;
1090   DBUG_RETURN(0);
1091 }
1092 
1093 
1094 /*
1095   No real indexes, so if we get a request for an index search (which can happen
1096   since we tell the optimizer that we have unique indexes), we scan.
1097 */
1098 int ha_archive::index_read(uchar *buf, const uchar *key,
1099                              uint key_len, enum ha_rkey_function find_flag)
1100 {
1101   int rc;
1102   DBUG_ENTER("ha_archive::index_read");
1103   rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1104   DBUG_RETURN(rc);
1105 }
1106 
1107 
1108 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1109                                  uint key_len, enum ha_rkey_function find_flag)
1110 {
1111   int rc;
1112   bool found= 0;
1113   KEY *mkey= &table->key_info[index];
1114   current_k_offset= mkey->key_part->offset;
1115   current_key= key;
1116   current_key_len= key_len;
1117 
1118 
1119   DBUG_ENTER("ha_archive::index_read_idx");
1120 
1121   rc= rnd_init(TRUE);
1122 
1123   if (rc)
1124     goto error;
1125 
1126   while (!(get_row(&archive, buf)))
1127   {
1128     if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1129     {
1130       found= 1;
1131       break;
1132     }
1133   }
1134 
1135   if (found)
1136   {
1137     /* notify handler that a record has been found */
1138     table->status= 0;
1139     DBUG_RETURN(0);
1140   }
1141 
1142 error:
1143   DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1144 }
1145 
1146 
1147 int ha_archive::index_next(uchar * buf)
1148 {
1149   bool found= 0;
1150   int rc;
1151 
1152   DBUG_ENTER("ha_archive::index_next");
1153 
1154   while (!(get_row(&archive, buf)))
1155   {
1156     if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1157     {
1158       found= 1;
1159       break;
1160     }
1161   }
1162 
1163   rc= found ? 0 : HA_ERR_END_OF_FILE;
1164   DBUG_RETURN(rc);
1165 }
1166 
1167 /*
1168   All calls that need to scan the table start with this method. If we are told
1169   that it is a table scan we rewind the file to the beginning, otherwise
1170   we assume the position will be set.
1171 */
1172 
1173 int ha_archive::rnd_init(bool scan)
1174 {
1175   DBUG_ENTER("ha_archive::rnd_init");
1176 
1177   if (share->crashed)
1178       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1179 
1180   if (init_archive_reader())
1181       DBUG_RETURN(errno);
1182 
1183   /* We rewind the file so that we can read from the beginning if scan */
1184   if (scan)
1185   {
1186     scan_rows= stats.records;
1187     DBUG_PRINT("info", ("archive will retrieve %llu rows",
1188                         (unsigned long long) scan_rows));
1189 
1190     if (read_data_header(&archive))
1191       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1192   }
1193 
1194   DBUG_RETURN(0);
1195 }
1196 
1197 
1198 /*
1199   This is the method that is used to read a row. It assumes that the row is
1200   positioned where you want it.
1201 */
1202 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1203 {
1204   int rc;
1205   DBUG_ENTER("ha_archive::get_row");
1206   DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1207                             (uchar)file_to_read->version,
1208                             ARCHIVE_VERSION));
1209   if (file_to_read->version == ARCHIVE_VERSION)
1210     rc= get_row_version3(file_to_read, buf);
1211   else
1212     rc= get_row_version2(file_to_read, buf);
1213 
1214   DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1215 
1216   DBUG_RETURN(rc);
1217 }
1218 
1219 /* Reallocate buffer if needed */
1220 bool ha_archive::fix_rec_buff(unsigned int length)
1221 {
1222   DBUG_ENTER("ha_archive::fix_rec_buff");
1223   DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1224                             length, record_buffer->length));
1225   DBUG_ASSERT(record_buffer->buffer);
1226 
1227   if (length > record_buffer->length)
1228   {
1229     uchar *newptr;
1230     if (!(newptr=(uchar*) my_realloc(PSI_INSTRUMENT_ME,
1231                                      (uchar*) record_buffer->buffer, length,
1232 				    MYF(MY_ALLOW_ZERO_PTR))))
1233       DBUG_RETURN(1);
1234     record_buffer->buffer= newptr;
1235     record_buffer->length= length;
1236   }
1237 
1238   DBUG_ASSERT(length <= record_buffer->length);
1239 
1240   DBUG_RETURN(0);
1241 }
1242 
1243 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1244 {
1245   DBUG_ENTER("ha_archive::unpack_row");
1246 
1247   unsigned int read;
1248   int error;
1249   uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE];
1250   unsigned int row_len;
1251 
1252   /* First we grab the length stored */
1253   read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1254 
1255   if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
1256     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1257 
1258   /* If we read nothing we are at the end of the file */
1259   if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1260     DBUG_RETURN(HA_ERR_END_OF_FILE);
1261 
1262   row_len=  uint4korr(size_buffer);
1263   DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1264                            (unsigned int)table->s->reclength));
1265 
1266   if (fix_rec_buff(row_len))
1267   {
1268     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1269   }
1270   DBUG_ASSERT(row_len <= record_buffer->length);
1271 
1272   read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1273 
1274   if (read != row_len || error)
1275   {
1276     DBUG_RETURN(error ? HA_ERR_CRASHED_ON_USAGE : HA_ERR_WRONG_IN_RECORD);
1277   }
1278 
1279   /* Copy null bits */
1280   const uchar *ptr= record_buffer->buffer, *end= ptr+ row_len;
1281   memcpy(record, ptr, table->s->null_bytes);
1282   ptr+= table->s->null_bytes;
1283   if (ptr > end)
1284     DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1285   for (Field **field=table->field ; *field ; field++)
1286   {
1287     if (!((*field)->is_null_in_record(record)))
1288     {
1289       if (!(ptr= (*field)->unpack(record + (*field)->offset(table->record[0]),
1290                                   ptr, end)))
1291         DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1292     }
1293   }
1294   if (ptr != end)
1295     DBUG_RETURN(HA_ERR_WRONG_IN_RECORD);
1296   DBUG_RETURN(0);
1297 }
1298 
1299 
1300 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1301 {
1302   DBUG_ENTER("ha_archive::get_row_version3");
1303 
1304   int returnable= unpack_row(file_to_read, buf);
1305 
1306   DBUG_RETURN(returnable);
1307 }
1308 
1309 
1310 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1311 {
1312   unsigned int read;
1313   int error;
1314   uint *ptr, *end;
1315   char *last;
1316   size_t total_blob_length= 0;
1317   MY_BITMAP *read_set= table->read_set;
1318   DBUG_ENTER("ha_archive::get_row_version2");
1319 
1320   read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1321 
1322   /* If we read nothing we are at the end of the file */
1323   if (read == 0)
1324     DBUG_RETURN(HA_ERR_END_OF_FILE);
1325 
1326   if (read != table->s->reclength)
1327   {
1328     DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1329                                                 read,
1330                                                 (unsigned int)table->s->reclength));
1331     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1332   }
1333 
1334   if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1335     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1336 
1337   /*
1338     If the record is the wrong size, the file is probably damaged, unless
1339     we are dealing with a delayed insert or a bulk insert.
1340   */
1341   if ((ulong) read != table->s->reclength)
1342     DBUG_RETURN(HA_ERR_END_OF_FILE);
1343 
1344   /* Calculate blob length, we use this for our buffer */
1345   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1346        ptr != end ;
1347        ptr++)
1348   {
1349     if (bitmap_is_set(read_set,
1350                       (((Field_blob*) table->field[*ptr])->field_index)))
1351         total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1352   }
1353 
1354   /* Adjust our row buffer if we need be */
1355   buffer.alloc(total_blob_length);
1356   last= (char *)buffer.ptr();
1357 
1358   /* Loop through our blobs and read them */
1359   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1360        ptr != end ;
1361        ptr++)
1362   {
1363     size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1364     if (size)
1365     {
1366       if (bitmap_is_set(read_set,
1367                         ((Field_blob*) table->field[*ptr])->field_index))
1368       {
1369         read= azread(file_to_read, last, size, &error);
1370 
1371         if (error)
1372           DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1373 
1374         if ((size_t) read != size)
1375           DBUG_RETURN(HA_ERR_END_OF_FILE);
1376         ((Field_blob*) table->field[*ptr])->set_ptr(read, (uchar*) last);
1377         last += size;
1378       }
1379       else
1380       {
1381         (void)azseek(file_to_read, size, SEEK_CUR);
1382       }
1383     }
1384   }
1385   DBUG_RETURN(0);
1386 }
1387 
1388 
1389 /*
1390   Called during a table scan (for example for ORDER BY). The position is either
1391   the next sequential one or the one set by a prior call to ha_archive::rnd_pos().
1392 */
1393 
1394 int ha_archive::rnd_next(uchar *buf)
1395 {
1396   int rc;
1397   DBUG_ENTER("ha_archive::rnd_next");
1398 
1399   if (share->crashed)
1400       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1401 
1402   if (!scan_rows)
1403   {
1404     rc= HA_ERR_END_OF_FILE;
1405     goto end;
1406   }
1407   scan_rows--;
1408 
1409   current_position= aztell(&archive);
1410   rc= get_row(&archive, buf);
1411 
1412 end:
1413   DBUG_RETURN(rc);
1414 }
1415 
1416 
1417 /*
1418   Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1419   each call to ha_archive::rnd_next() if an ordering of the rows is
1420   needed.
1421 */
1422 
1423 void ha_archive::position(const uchar *record)
1424 {
1425   DBUG_ENTER("ha_archive::position");
1426   my_store_ptr(ref, ref_length, current_position);
1427   DBUG_VOID_RETURN;
1428 }
1429 
1430 
1431 /*
1432   This is called after a table scan for each row if the results of the
1433   scan need to be ordered. It will take *pos and use it to move the
1434   cursor in the file so that the next row that is called is the
1435   correctly ordered row.
1436 */
1437 
1438 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1439 {
1440   int rc;
1441   DBUG_ENTER("ha_archive::rnd_pos");
1442   current_position= (my_off_t)my_get_ptr(pos, ref_length);
1443   if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1444   {
1445     rc= HA_ERR_CRASHED_ON_USAGE;
1446     goto end;
1447   }
1448   rc= get_row(&archive, buf);
1449 end:
1450   DBUG_RETURN(rc);
1451 }
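/*
  Illustration (compiled out) of the scan/position protocol implemented by
  rnd_next(), position() and rnd_pos() above: the saved "position" is simply
  the compressed-stream offset returned by aztell(). The function name and
  arguments are placeholders for the example.
*/
#if 0
static void positioned_read_sketch(azio_stream *archive, uchar *ref, uint ref_length)
{
  /* After reading a row, remember where it started in the compressed stream. */
  my_off_t current_position= aztell(archive);
  my_store_ptr(ref, ref_length, current_position);

  /* ... later (e.g. after filesort), jump straight back to that row ... */
  my_off_t saved= (my_off_t) my_get_ptr(ref, ref_length);
  (void) azseek(archive, saved, SEEK_SET);
}
#endif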
1452 
1453 
1454 /**
1455   @brief Check for upgrade
1456 
1457   @param[in]  check_opt  check options
1458 
1459   @return Completion status
1460     @retval HA_ADMIN_OK            No upgrade required
1461     @retval HA_ADMIN_CORRUPT       Cannot read meta-data
1462     @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
1463 */
1464 
1465 int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
1466 {
1467   DBUG_ENTER("ha_archive::check_for_upgrade");
1468   if (init_archive_reader())
1469     DBUG_RETURN(HA_ADMIN_CORRUPT);
1470   if (archive.version < ARCHIVE_VERSION)
1471     DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
1472   DBUG_RETURN(HA_ADMIN_OK);
1473 }
1474 
1475 
1476 /*
1477   This method repairs the meta file. It does this by walking the datafile and
1478   rewriting the meta file. If EXTENDED repair is requested, we attempt to
1479   recover as much data as possible.
1480 */
1481 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1482 {
1483   DBUG_ENTER("ha_archive::repair");
1484   int rc= optimize(thd, check_opt);
1485 
1486   if (rc)
1487     DBUG_RETURN(HA_ADMIN_CORRUPT);
1488 
1489   share->crashed= FALSE;
1490   DBUG_RETURN(0);
1491 }
1492 
1493 /*
1494   The table can become fragmented if data was inserted, read, and then
1495   inserted again. What we do is open up the file and recompress it completely.
1496 */
1497 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1498 {
1499   int rc= 0;
1500   azio_stream writer;
1501   char writer_filename[FN_REFLEN];
1502   DBUG_ENTER("ha_archive::optimize");
1503 
1504   mysql_mutex_lock(&share->mutex);
1505 
1506   if (init_archive_reader())
1507   {
1508     mysql_mutex_unlock(&share->mutex);
1509     DBUG_RETURN(errno);
1510   }
1511 
1512   // now we close both our writer and our reader for the rename
1513   if (share->archive_write_open)
1514   {
1515     azclose(&(share->archive_write));
1516     share->archive_write_open= FALSE;
1517   }
1518 
1519   /* Lets create a file to contain the new data */
1520   fn_format(writer_filename, share->table_name, "", ARN,
1521             MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1522 
1523   if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1524   {
1525     mysql_mutex_unlock(&share->mutex);
1526     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1527   }
1528 
1529   /*
1530     Transfer the embedded FRM so that the file can be discoverable.
1531     Write file offset is set to the end of the file.
1532   */
1533   if ((rc= frm_copy(&archive, &writer)))
1534     goto error;
1535 
1536   /*
1537     An extended rebuild is a lot more effort. We open up each row and re-record it.
1538     Any dead rows are removed (aka rows that may have been partially recorded).
1539 
1540     As of Archive format 3, this is the only type of rebuild performed; before this
1541     version it was only done on T_EXTEND.
1542   */
1543   if (1)
1544   {
1545     DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1546 
1547     /*
1548       Now we will rewind the archive file so that we are positioned at the
1549       start of the file.
1550     */
1551     rc= read_data_header(&archive);
1552 
1553     /*
1554       On success of writing out the new header, we now fetch each row and
1555       insert it into the new archive file.
1556     */
1557     if (!rc)
1558     {
1559       share->rows_recorded= 0;
1560       stats.auto_increment_value= 1;
1561       share->archive_write.auto_increment= 0;
1562       MY_BITMAP *org_bitmap= tmp_use_all_columns(table, &table->read_set);
1563 
1564       while (!(rc= get_row(&archive, table->record[0])))
1565       {
1566         real_write_row(table->record[0], &writer);
1567         /*
1568           Long term it should be possible to optimize this so that
1569           it is not called on each row.
1570         */
1571         if (table->found_next_number_field)
1572         {
1573           Field *field= table->found_next_number_field;
1574           ulonglong auto_value=
1575             (ulonglong) field->val_int(table->record[0] +
1576                                        field->offset(table->record[0]));
1577           if (share->archive_write.auto_increment < auto_value)
1578             stats.auto_increment_value=
1579               (share->archive_write.auto_increment= auto_value) + 1;
1580         }
1581       }
1582 
1583       tmp_restore_column_map(&table->read_set, org_bitmap);
1584       share->rows_recorded= (ha_rows)writer.rows;
1585     }
1586 
1587     DBUG_PRINT("info", ("recovered %llu archive rows",
1588                         (unsigned long long)share->rows_recorded));
1589 
1590     DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1591                         (unsigned long long)share->rows_recorded));
1592 
1593     /*
1594       If REPAIR ... EXTENDED is requested, try to recover as much data
1595       from data file as possible. In this case if we failed to read a
1596       record, we assume EOF. This allows massive data loss, but we can
1597       hardly do more with broken zlib stream. And this is the only way
1598       to restore at least what is still recoverable.
1599     */
1600     if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1601       goto error;
1602   }
1603 
1604   azclose(&writer);
1605   share->dirty= FALSE;
1606 
1607   azclose(&archive);
1608 
1609   // make the file we just wrote be our data file
1610   rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1611 
1612 
1613   mysql_mutex_unlock(&share->mutex);
1614   DBUG_RETURN(rc);
1615 error:
1616   DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1617   azclose(&writer);
1618   mysql_mutex_unlock(&share->mutex);
1619 
1620   DBUG_RETURN(rc);
1621 }
1622 
1623 /*
1624   Below is an example of how to set up row level locking.
1625 */
1626 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1627                                        THR_LOCK_DATA **to,
1628                                        enum thr_lock_type lock_type)
1629 {
1630   if (lock_type == TL_WRITE_DELAYED)
1631     delayed_insert= TRUE;
1632   else
1633     delayed_insert= FALSE;
1634 
1635   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1636   {
1637     /*
1638       Here is where we get into the guts of a row level lock.
1639       If TL_UNLOCK is set and we are not doing a LOCK TABLE,
1640       DELAYED LOCK or DISCARD/IMPORT TABLESPACE, then allow
1641       multiple writers.
1642     */
1643 
1644     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1645          lock_type <= TL_WRITE) && delayed_insert == FALSE &&
1646         !thd_in_lock_tables(thd)
1647         && !thd_tablespace_op(thd))
1648       lock_type = TL_WRITE_ALLOW_WRITE;
1649 
1650     /*
1651       In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1652       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1653       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1654       to t2. Convert the lock to a normal read lock to allow
1655       concurrent inserts to t2.
1656     */
1657 
1658     if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1659       lock_type = TL_READ;
1660 
1661     lock.type=lock_type;
1662   }
1663 
1664   *to++= &lock;
1665 
1666   return to;
1667 }
1668 
1669 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1670 {
1671   char tmp_real_path[FN_REFLEN];
1672   DBUG_ENTER("ha_archive::update_create_info");
1673 
1674   ha_archive::info(HA_STATUS_AUTO);
1675   if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1676   {
1677     create_info->auto_increment_value= stats.auto_increment_value;
1678   }
1679 
1680   if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1681     create_info->data_file_name= thd_strdup(ha_thd(), tmp_real_path);
1682 
1683   DBUG_VOID_RETURN;
1684 }
1685 
1686 /*
1687   Hints for optimizer, see ha_tina for more information
1688 */
1689 int ha_archive::info(uint flag)
1690 {
1691   DBUG_ENTER("ha_archive::info");
1692 
1693   flush_and_clear_pending_writes();
1694   stats.deleted= 0;
1695 
1696   DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1697   /* Costs quite a bit more to get all information */
1698   if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1699   {
1700     MY_STAT file_stat;  // Stat information for the data file
1701 
1702     (void) mysql_file_stat(/* arch_key_file_data */ 0, share->data_file_name, &file_stat, MYF(MY_WME));
1703 
1704     if (flag & HA_STATUS_TIME)
1705       stats.update_time= (ulong) file_stat.st_mtime;
1706     if (flag & HA_STATUS_CONST)
1707     {
1708       stats.max_data_file_length= MAX_FILE_SIZE;
1709       stats.create_time= (ulong) file_stat.st_ctime;
1710     }
1711     if (flag & HA_STATUS_VARIABLE)
1712     {
1713       stats.delete_length= 0;
1714       stats.data_file_length= file_stat.st_size;
1715       stats.index_file_length=0;
1716       stats.mean_rec_length= stats.records ?
1717         ulong(stats.data_file_length / stats.records) : table->s->reclength;
1718     }
1719   }
1720 
1721   if (flag & HA_STATUS_AUTO)
1722   {
1723     if (init_archive_reader())
1724       DBUG_RETURN(errno);
1725 
1726     mysql_mutex_lock(&share->mutex);
1727     azflush(&archive, Z_SYNC_FLUSH);
1728     mysql_mutex_unlock(&share->mutex);
1729     stats.auto_increment_value= archive.auto_increment + 1;
1730   }
1731 
1732   DBUG_RETURN(0);
1733 }
1734 
1735 
1736 int ha_archive::external_lock(THD *thd, int lock_type)
1737 {
1738   if (lock_type == F_RDLCK)
1739   {
1740     // We are going to read from the table. Flush any pending writes that we
1741     // may have
1742     flush_and_clear_pending_writes();
1743   }
1744   return 0;
1745 }
1746 
1747 
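/*
  Sync the shared writer if it has unflushed rows and refresh
  stats.records from the shared row count. Both are done under
  share->mutex.
*/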
void ha_archive::flush_and_clear_pending_writes()
{
  mysql_mutex_lock(&share->mutex);
  if (share->dirty)
  {
    DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
    DBUG_ASSERT(share->archive_write_open);
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
    share->dirty= FALSE;
  }

  /*
    This should be an accurate number now, though a bulk or delayed insert
    that is still in progress can leave it temporarily inaccurate.
  */
  stats.records= share->rows_recorded;
  mysql_mutex_unlock(&share->mutex);
}

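/*
  Handler hints. On HA_EXTRA_FLUSH (sent, for example, when tables are
  flushed) we close the shared archive writer so that its file handle is
  released and all buffered rows reach the data file.
*/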
int ha_archive::extra(enum ha_extra_function operation)
{
  switch (operation) {
  case HA_EXTRA_FLUSH:
    mysql_mutex_lock(&share->mutex);
    share->close_archive_writer();
    mysql_mutex_unlock(&share->mutex);
    break;
  default:
    break;
  }
  return 0;
}

/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which keeps write_row from marking its data as dirty, which in turn
  keeps selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows, uint flags)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}

/*
  The other side of start_bulk_insert is end_bulk_insert. Here we turn off
  the bulk insert flag and mark the share dirty so that the next select
  will sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  mysql_mutex_lock(&share->mutex);
  if (share->archive_write_open)
    share->dirty= true;
  mysql_mutex_unlock(&share->mutex);
  DBUG_RETURN(0);
}

/*
  We reject the truncate command: the only way to delete the contents of an
  archive table is to drop the table. This is done for security reasons.
  In a later version we will enable this by allowing the user to select a
  different row format.
*/
int ha_archive::truncate()
{
  DBUG_ENTER("ha_archive::truncate");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}

/*
  We just return the crashed state if asked.
*/
bool ha_archive::is_crashed() const
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);
}

/*
  Simple scan of the table to make sure everything is ok.
*/

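/*
  The check is done in two passes: first the rows recorded at the start of
  the check are read back, then, under share->mutex, any rows inserted
  concurrently are read and counted. Any read error or row-count mismatch
  reports the table as corrupt.
*/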
int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded;
  /* Flush any waiting data */
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  mysql_mutex_unlock(&share->mutex);

  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  read_data_header(&archive);
  for (ha_rows cur_count= count; cur_count; cur_count--)
  {
    if ((rc= get_row(&archive, table->record[0])))
      goto error;
  }
  /*
    Now read records that may have been inserted concurrently.
    Acquire share->mutex so the tail of the table is not modified by
    concurrent writers.
  */
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded - count;
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;
  mysql_mutex_unlock(&share->mutex);

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
    goto error;

  thd_proc_info(thd, old_proc_info);
  DBUG_RETURN(HA_ADMIN_OK);

error:
  thd_proc_info(thd, old_proc_info);
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_CORRUPT);
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}

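/*
  Allocate a reusable record buffer of the given length. Returns NULL if
  either the descriptor or the buffer itself cannot be allocated.
*/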
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r= (archive_record_buffer*) my_malloc(PSI_INSTRUMENT_ME,
                                 sizeof(archive_record_buffer), MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(PSI_INSTRUMENT_ME, r->length, MYF(MY_WME))))
  {
    my_free(r);
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

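/*
  Free a record buffer allocated by create_record_buffer().
*/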
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free(r->buffer);
  my_free(r);
  DBUG_VOID_RETURN;
}

/*
  In Archive *any* ALTER must cause the table to be rebuilt; no ALTER can
  be frm-only. After any change to the frm file, Archive has to update the
  frm image stored in the ARZ file, and this cannot be done in place: the
  ARZ file must be recreated from scratch.
*/
bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info_arg,
                                            uint table_changes)
{
  return COMPATIBLE_DATA_NO;
}

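/*
  Plugin descriptor: registers the ARCHIVE storage engine, with
  archive_db_init as the init hook and no deinit hook.
*/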
struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

maria_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "gzip-compresses tables for a low storage footprint",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  NULL, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  "1.0",                      /* string version */
  MariaDB_PLUGIN_MATURITY_STABLE /* maturity */
}
maria_declare_plugin_end;