1 /*
2    Copyright (c) 2004, 2012, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "sql_priv.h"
26 #include "probes_mysql.h"
27 #include "sql_class.h"                          // SSV
28 #include "sql_table.h"
29 #include <myisam.h>
30 
31 #include "ha_archive.h"
32 #include <my_dir.h>
33 
34 #include <mysql/plugin.h>
35 
36 /*
37   First, if you want to understand storage engines you should look at
38   ha_example.cc and ha_example.h.
39 
40   This example was written as a test case for a customer who needed
41   a storage engine without indexes that could compress data very well.
42   So, welcome to a completely compressed storage engine. This storage
43   engine only does inserts. No replaces, deletes, or updates. All reads are
44   complete table scans. Compression is done through a combination of packing
45   and making use of the zlib library.
46 
47   We keep a file pointer open for each instance of ha_archive for each read
48   but for writes we keep one open file handle just for that. We flush it
49   only if a read occurs. azio handles compressing lots of records
50   at once much better than doing lots of little records between writes.
51   It is possible to not lock on writes, but that would mean we could not
52   handle bulk inserts as well (that is, if someone were trying to read at
53   the same time, since we would want to flush).
54 
55   A "meta" file is kept alongside the data file. This file serves two purposes.
56   The first purpose is to track the number of rows in the table. The second
57   purpose is to determine if the table was closed properly or not. When the
58   meta file is first opened it is marked as dirty. It is opened when the table
59   itself is opened for writing. When the table is closed the new count for rows
60   is written to the meta file and the file is marked as clean. If the meta file
61   is opened and it is marked as dirty, it is assumed that a crash occurred. At
62   this point an error occurs and the user is told to rebuild the file.
63   A rebuild scans the rows and rewrites the meta file. If corruption is found
64   in the data file then the meta file is not repaired.
65 
66   At some point a recovery method for such a drastic case needs to be devised.
67 
68   Locks are row level, and you will get a consistent read.
69 
70   As far as table scans go, performance is quite good. I don't have
71   good numbers, but locally it has outperformed both InnoDB and MyISAM. For
72   InnoDB the question will be whether the table can fit into the buffer
73   pool. For MyISAM it is a question of how much of the MyISAM file the file
74   system caches. With enough free memory MyISAM is faster. It is only when
75   the OS doesn't have enough memory to cache the entire table that Archive
76   turns out to be any faster.
77 
78   Example file sizes for MyISAM (packed) versus Archive:
79 
80   Table with 76695844 identical rows:
81   29680807 a_archive.ARZ
82   920350317 a.MYD
83 
84 
85   Table with 8991478 rows (all of Slashdot's comments):
86   1922964506 comment_archive.ARZ
87   2944970297 comment_text.MYD
88 
89 
90   TODO:
91    Allow users to set compression level.
92    Allow adjustable block size.
93    Implement versioning, should be easy.
94    Allow for errors, find a way to mark bad rows.
95    Add an optional feature so that rows can be flushed at an interval (which will cause less
96      compression but may speed up ordered searches).
97    Checkpoint the meta file to allow for faster rebuilds.
98    Option to allow for dirty reads; this would lower the number of sync calls, which
99      would make inserts a lot faster, but would mean highly arbitrary reads.
100 
101     -Brian
102 
103   Archive file format versions:
104   <5.1.5 - v.1
105   5.1.5-5.1.15 - v.2
106   >5.1.15 - v.3
107 */
108 
109 /* The file extensions */
110 #define ARZ ".ARZ"               // The data file
111 #define ARN ".ARN"               // Files used during an optimize call
112 #define ARM ".ARM"               // Meta file (deprecated)
113 
114 /* 5.0 compatibility */
115 #define META_V1_OFFSET_CHECK_HEADER  0
116 #define META_V1_OFFSET_VERSION       1
117 #define META_V1_OFFSET_ROWS_RECORDED 2
118 #define META_V1_OFFSET_CHECK_POINT   10
119 #define META_V1_OFFSET_CRASHED       18
120 #define META_V1_LENGTH               19
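/*
  Illustrative sketch only, kept out of the build with #if 0: how the
  META_V1_* offsets above map onto a version-1 meta-file image. It mirrors
  what Archive_share::read_v1_metafile() and write_v1_metafile() below do;
  the struct and function names here are hypothetical.
*/
#if 0
struct Meta_v1_image
{
  uchar     check_header;    /* must equal ARCHIVE_CHECK_HEADER */
  uchar     version;         /* always 1 for this file format */
  ulonglong rows_recorded;   /* row count, stored with int8store() */
  ulonglong check_point;     /* unused, always written as zero */
  uchar     crashed;         /* non-zero if the table was not closed cleanly */
};

static void decode_meta_v1(const uchar buf[META_V1_LENGTH], Meta_v1_image *out)
{
  out->check_header=  buf[META_V1_OFFSET_CHECK_HEADER];
  out->version=       buf[META_V1_OFFSET_VERSION];
  out->rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
  out->check_point=   uint8korr(buf + META_V1_OFFSET_CHECK_POINT);
  out->crashed=       buf[META_V1_OFFSET_CRASHED];
}
#endif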
121 
122 /*
123   uchar + uchar
124 */
125 #define DATA_BUFFER_SIZE 2       // Size of the legacy header in the data file
126 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
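/*
  Illustrative sketch only, kept out of the build with #if 0: the legacy
  (pre-version-3) data-file header that DATA_BUFFER_SIZE describes. It is two
  bytes, validated in ha_archive::read_data_header() below; the array name
  is hypothetical.
*/
#if 0
static const uchar legacy_data_header[DATA_BUFFER_SIZE]=
{
  ARCHIVE_CHECK_HEADER,   /* byte 0: corruption check value */
  ARCHIVE_VERSION         /* byte 1: archive format version */
};
#endif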
127 
128 #ifdef HAVE_PSI_INTERFACE
129 extern "C" PSI_file_key arch_key_file_data;
130 #endif
131 
132 /* Static declarations for handlerton */
133 static handler *archive_create_handler(handlerton *hton,
134                                        TABLE_SHARE *table,
135                                        MEM_ROOT *mem_root);
136 int archive_discover(handlerton *hton, THD* thd, const char *db,
137                      const char *name,
138                      uchar **frmblob,
139                      size_t *frmlen);
140 
141 /*
142   Number of rows that will force a bulk insert.
143 */
144 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
145 
146 /*
147   Size of header used for row
148 */
149 #define ARCHIVE_ROW_HEADER_SIZE 4
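/*
  Illustrative sketch only, kept out of the build with #if 0: the layout of a
  version-3 packed row as produced by ha_archive::pack_row() and decoded by
  ha_archive::unpack_row() further down. The helper name is hypothetical.

    [ARCHIVE_ROW_HEADER_SIZE bytes]  packed data length (int4store, excludes
                                     the header itself)
    [table->s->null_bytes bytes]     null bitmap copied from the record
    [variable]                       each non-NULL field, Field::pack() format
*/
#if 0
static unsigned int packed_row_length(const uchar *row_start)
{
  /* The first four bytes hold the length of the packed data that follows. */
  return uint4korr(row_start);
}
#endif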
150 
151 static handler *archive_create_handler(handlerton *hton,
152                                        TABLE_SHARE *table,
153                                        MEM_ROOT *mem_root)
154 {
155   return new (mem_root) ha_archive(hton, table);
156 }
157 
158 
159 #ifdef HAVE_PSI_INTERFACE
160 PSI_mutex_key az_key_mutex_Archive_share_mutex;
161 
162 static PSI_mutex_info all_archive_mutexes[]=
163 {
164   { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
165 };
166 
167 PSI_file_key arch_key_file_metadata, arch_key_file_data, arch_key_file_frm;
168 static PSI_file_info all_archive_files[]=
169 {
170     { &arch_key_file_metadata, "metadata", 0},
171     { &arch_key_file_data, "data", 0},
172     { &arch_key_file_frm, "FRM", 0}
173 };
174 
175 static void init_archive_psi_keys(void)
176 {
177   const char* category= "archive";
178   int count;
179 
180   count= array_elements(all_archive_mutexes);
181   mysql_mutex_register(category, all_archive_mutexes, count);
182 
183   count= array_elements(all_archive_files);
184   mysql_file_register(category, all_archive_files, count);
185 }
186 
187 
188 #endif /* HAVE_PSI_INTERFACE */
189 
190 /*
191   Initialize the archive handler.
192 
193   SYNOPSIS
194     archive_db_init()
195     void *
196 
197   RETURN
198     FALSE       OK
199     TRUE        Error
200 */
201 
202 int archive_db_init(void *p)
203 {
204   DBUG_ENTER("archive_db_init");
205   handlerton *archive_hton;
206 
207 #ifdef HAVE_PSI_INTERFACE
208   init_archive_psi_keys();
209 #endif
210 
211   archive_hton= (handlerton *)p;
212   archive_hton->state= SHOW_OPTION_YES;
213   archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
214   archive_hton->create= archive_create_handler;
215   archive_hton->flags= HTON_NO_FLAGS;
216   archive_hton->discover= archive_discover;
217 
218   DBUG_RETURN(0);
219 }
220 
221 
222 Archive_share::Archive_share()
223 {
224   crashed= false;
225   in_optimize= false;
226   archive_write_open= false;
227   dirty= false;
228   DBUG_PRINT("ha_archive", ("Archive_share: %p",
229                             this));
230   thr_lock_init(&lock);
231   /*
232     We will use this lock for rows.
233   */
234   mysql_mutex_init(az_key_mutex_Archive_share_mutex,
235                    &mutex, MY_MUTEX_INIT_FAST);
236 }
237 
238 
239 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
240   :handler(hton, table_arg), share(NULL), delayed_insert(0), bulk_insert(0)
241 {
242   /* Set our original buffer from pre-allocated memory */
243   buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
244 
245   /* The size of the offset value we will use for position() */
246   ref_length= sizeof(my_off_t);
247   archive_reader_open= FALSE;
248 }
249 
250 int archive_discover(handlerton *hton, THD* thd, const char *db,
251                      const char *name,
252                      uchar **frmblob,
253                      size_t *frmlen)
254 {
255   DBUG_ENTER("archive_discover");
256   DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
257   azio_stream frm_stream;
258   char az_file[FN_REFLEN];
259   char *frm_ptr;
260   MY_STAT file_stat;
261 
262   build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
263 
264   if (!(mysql_file_stat(arch_key_file_data, az_file, &file_stat, MYF(0))))
265     goto err;
266 
267   if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
268   {
269     if (errno == EROFS || errno == EACCES)
270       DBUG_RETURN(my_errno= errno);
271     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
272   }
273 
274   if (frm_stream.frm_length == 0)
275     goto err;
276 
277   frm_ptr= (char *)my_malloc(sizeof(char) * frm_stream.frm_length, MYF(0));
278   azread_frm(&frm_stream, frm_ptr);
279   azclose(&frm_stream);
280 
281   *frmlen= frm_stream.frm_length;
282   *frmblob= (uchar*) frm_ptr;
283 
284   DBUG_RETURN(0);
285 err:
286   my_errno= 0;
287   DBUG_RETURN(1);
288 }
289 
290 static void save_auto_increment(TABLE *table, ulonglong *value)
291 {
292   Field *field= table->found_next_number_field;
293   ulonglong auto_value=
294     (ulonglong) field->val_int(table->record[0] +
295                                field->offset(table->record[0]));
296   if (*value <= auto_value)
297     *value= auto_value + 1;
298 }
299 
300 /**
301   @brief Read version 1 meta file (5.0 compatibility routine).
302 
303   @return Completion status
304     @retval  0 Success
305     @retval !0 Failure
306 */
307 
308 int Archive_share::read_v1_metafile()
309 {
310   char file_name[FN_REFLEN];
311   uchar buf[META_V1_LENGTH];
312   File fd;
313   DBUG_ENTER("Archive_share::read_v1_metafile");
314 
315   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
316   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
317     DBUG_RETURN(-1);
318 
319   if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
320   {
321     mysql_file_close(fd, MYF(0));
322     DBUG_RETURN(-1);
323   }
324 
325   rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
326   crashed= buf[META_V1_OFFSET_CRASHED];
327   mysql_file_close(fd, MYF(0));
328   DBUG_RETURN(0);
329 }
330 
331 
332 /**
333   @brief Write version 1 meta file (5.0 compatibility routine).
334 
335   @return Completion status
336     @retval  0 Success
337     @retval !0 Failure
338 */
339 
340 int Archive_share::write_v1_metafile()
341 {
342   char file_name[FN_REFLEN];
343   uchar buf[META_V1_LENGTH];
344   File fd;
345   DBUG_ENTER("Archive_share::write_v1_metafile");
346 
347   buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
348   buf[META_V1_OFFSET_VERSION]= 1;
349   int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
350   int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
351   buf[META_V1_OFFSET_CRASHED]= crashed;
352 
353   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
354   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
355     DBUG_RETURN(-1);
356 
357   if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
358   {
359     mysql_file_close(fd, MYF(0));
360     DBUG_RETURN(-1);
361   }
362 
363   mysql_file_close(fd, MYF(0));
364   DBUG_RETURN(0);
365 }
366 
367 
368 /**
369   @brief Pack version 1 row (5.0 compatibility routine).
370 
371   @param[in]  record  the record to pack
372 
373   @return Length of packed row
374 */
375 
376 unsigned int ha_archive::pack_row_v1(uchar *record)
377 {
378   uint *blob, *end;
379   uchar *pos;
380   DBUG_ENTER("pack_row_v1");
381   memcpy(record_buffer->buffer, record, table->s->reclength);
382   pos= record_buffer->buffer + table->s->reclength;
383   for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
384        blob != end; blob++)
385   {
386     uint32 length= ((Field_blob *) table->field[*blob])->get_length();
387     if (length)
388     {
389       uchar *data_ptr;
390       ((Field_blob *) table->field[*blob])->get_ptr(&data_ptr);
391       memcpy(pos, data_ptr, length);
392       pos+= length;
393     }
394   }
395   DBUG_RETURN(pos - record_buffer->buffer);
396 }
397 
398 
399 /*
400   This method reads the header of a datafile and returns whether or not it was successful.
401 */
402 int ha_archive::read_data_header(azio_stream *file_to_read)
403 {
404   int error;
405   unsigned long ret;
406   uchar data_buffer[DATA_BUFFER_SIZE];
407   DBUG_ENTER("ha_archive::read_data_header");
408 
409   if (azrewind(file_to_read) == -1)
410     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
411 
412   if (file_to_read->version >= 3)
413     DBUG_RETURN(0);
414   /* Everything below this is legacy handling for version 2 and earlier */
415 
416   DBUG_PRINT("ha_archive", ("Reading legacy data header"));
417 
418   ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
419 
420   if (ret != DATA_BUFFER_SIZE)
421   {
422     DBUG_PRINT("ha_archive", ("Reading, expected %d got %lu",
423                               DATA_BUFFER_SIZE, ret));
424     DBUG_RETURN(1);
425   }
426 
427   if (error)
428   {
429     DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
430     DBUG_RETURN(1);
431   }
432 
433   DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
434   DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
435 
436   if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
437       (data_buffer[1] != (uchar)ARCHIVE_VERSION))
438     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
439 
440   DBUG_RETURN(0);
441 }
442 
443 
444 /*
445   We create the shared memory space that we will use for the open table.
446   No matter what, we try to get or create a share. This is so that a repair
447   table operation can occur.
448 
449   See ha_example.cc for a longer description.
450 */
451 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
452 {
453   Archive_share *tmp_share;
454 
455   DBUG_ENTER("ha_archive::get_share");
456 
457   lock_shared_ha_data();
458   if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
459   {
460     azio_stream archive_tmp;
461 
462     tmp_share= new Archive_share;
463 
464     if (!tmp_share)
465     {
466       *rc= HA_ERR_OUT_OF_MEM;
467       goto err;
468     }
469     DBUG_PRINT("ha_archive", ("new Archive_share: %p",
470                               tmp_share));
471 
472     fn_format(tmp_share->data_file_name, table_name, "",
473               ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
474     strmov(tmp_share->table_name, table_name);
475     DBUG_PRINT("ha_archive", ("Data File %s",
476                         tmp_share->data_file_name));
477 
478     /*
479       We read the meta file, but do not mark it dirty. Since we are not
480       doing a write we won't mark it dirty (and we won't open it for
481       anything but reading... open it for write and we will generate null
482       compression writes).
483     */
484     if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
485     {
486       delete tmp_share;
487       *rc= my_errno ? my_errno : HA_ERR_CRASHED;
488       tmp_share= NULL;
489       goto err;
490     }
491     stats.auto_increment_value= archive_tmp.auto_increment + 1;
492     tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
493     tmp_share->crashed= archive_tmp.dirty;
494     share= tmp_share;
495     if (archive_tmp.version == 1)
496       share->read_v1_metafile();
497     azclose(&archive_tmp);
498 
499     set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
500   }
501   if (tmp_share->crashed)
502     *rc= HA_ERR_CRASHED_ON_USAGE;
503 err:
504   unlock_shared_ha_data();
505 
506   DBUG_ASSERT(tmp_share || *rc);
507 
508   DBUG_RETURN(tmp_share);
509 }
510 
511 
512 int Archive_share::init_archive_writer()
513 {
514   DBUG_ENTER("Archive_share::init_archive_writer");
515   /*
516     It is expensive to open and close the data files and since you can't have
517     a gzip file that can be both read and written we keep a writer open
518     that is shared among all open tables.
519   */
520   if (!(azopen(&archive_write, data_file_name,
521                O_RDWR|O_BINARY)))
522   {
523     DBUG_PRINT("ha_archive", ("Could not open archive write file"));
524     crashed= true;
525     DBUG_RETURN(1);
526   }
527   archive_write_open= true;
528 
529   DBUG_RETURN(0);
530 }
531 
532 
533 void Archive_share::close_archive_writer()
534 {
535   mysql_mutex_assert_owner(&mutex);
536   if (archive_write_open)
537   {
538     if (archive_write.version == 1)
539       (void) write_v1_metafile();
540     azclose(&archive_write);
541     archive_write_open= false;
542     dirty= false;
543   }
544 }
545 
546 
547 /*
548   No locks are required because it is associated with just one handler instance
549 */
550 int ha_archive::init_archive_reader()
551 {
552   DBUG_ENTER("ha_archive::init_archive_reader");
553   /*
554     It is expensive to open and close the data files and since you can't have
555     a gzip file that can be both read and written we keep a writer open
556     that is shared among all open tables, but have one reader open for
557     each handler instance.
558   */
559   if (!archive_reader_open)
560   {
561     if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
562     {
563       DBUG_PRINT("ha_archive", ("Could not open archive read file"));
564       share->crashed= TRUE;
565       DBUG_RETURN(1);
566     }
567     archive_reader_open= TRUE;
568   }
569 
570   DBUG_RETURN(0);
571 }
572 
573 
574 /*
575   We just implement one additional file extension.
576 */
577 static const char *ha_archive_exts[] = {
578   ARZ,
579   NullS
580 };
581 
582 const char **ha_archive::bas_ext() const
583 {
584   return ha_archive_exts;
585 }
586 
587 
588 /*
589   When opening a file we:
590   Create/get our shared structure.
591   Init our lock.
592   Open the file we will read from.
593 */
594 int ha_archive::open(const char *name, int mode, uint open_options)
595 {
596   int rc= 0;
597   DBUG_ENTER("ha_archive::open");
598 
599   DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
600                       (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
601   share= get_share(name, &rc);
602   if (!share)
603     DBUG_RETURN(rc);
604 
605   /* Allow open on crashed table in repair mode only. */
606   switch (rc)
607   {
608   case 0:
609     break;
610   case HA_ERR_CRASHED_ON_USAGE:
611     if (open_options & HA_OPEN_FOR_REPAIR)
612       break;
613     /* fall through */
614   default:
615     DBUG_RETURN(rc);
616   }
617 
618   record_buffer= create_record_buffer(table->s->reclength +
619                                       ARCHIVE_ROW_HEADER_SIZE);
620 
621   if (!record_buffer)
622     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
623 
624   thr_lock_data_init(&share->lock, &lock, NULL);
625 
626   DBUG_PRINT("ha_archive", ("archive table was crashed %s",
627                       rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
628   if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
629   {
630     DBUG_RETURN(0);
631   }
632 
633   DBUG_RETURN(rc);
634 }
635 
636 
637 /*
638   Closes the file.
639 
640   SYNOPSIS
641     close();
642 
643   IMPLEMENTATION:
644 
645   We first close this storage engine's file handle to the archive and
646   then remove our reference count to the table (and possibly free it
647   as well).
648 
649   RETURN
650     0  ok
651     1  Error
652 */
653 
654 int ha_archive::close(void)
655 {
656   int rc= 0;
657   DBUG_ENTER("ha_archive::close");
658 
659   destroy_record_buffer(record_buffer);
660 
661   if (archive_reader_open)
662   {
663     if (azclose(&archive))
664       rc= 1;
665   }
666 
667   DBUG_RETURN(rc);
668 }
669 
670 
671 void ha_archive::frm_load(const char *name, azio_stream *dst)
672 {
673   char name_buff[FN_REFLEN];
674   MY_STAT file_stat;
675   File frm_file;
676   uchar *frm_ptr;
677   DBUG_ENTER("ha_archive::frm_load");
678   fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME);
679 
680   /* Here is where we open up the frm and pass it to archive to store */
681   if ((frm_file= mysql_file_open(arch_key_file_frm, name_buff, O_RDONLY, MYF(0))) >= 0)
682   {
683     if (!mysql_file_fstat(frm_file, &file_stat, MYF(MY_WME)))
684     {
685       frm_ptr= (uchar *) my_malloc(sizeof(uchar) * (size_t) file_stat.st_size, MYF(0));
686       if (frm_ptr)
687       {
688         if (mysql_file_read(frm_file, frm_ptr, (size_t) file_stat.st_size, MYF(0)) ==
689             (size_t) file_stat.st_size)
690           azwrite_frm(dst, (char *) frm_ptr, (size_t) file_stat.st_size);
691         my_free(frm_ptr);
692       }
693     }
694     mysql_file_close(frm_file, MYF(0));
695   }
696   DBUG_VOID_RETURN;
697 }
698 
699 
700 /**
701   Copy a frm blob between streams.
702 
703   @param[in]  src   The source stream.
704   @param[in]  dst   The destination stream.
705 
706   @return Zero on success, non-zero otherwise.
707 */
708 
709 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
710 {
711   int rc= 0;
712   char *frm_ptr;
713 
714   /* If there is no .frm in source stream, try to read .frm from file. */
715   if (!src->frm_length)
716   {
717     frm_load(table->s->normalized_path.str, dst);
718     return 0;
719   }
720 
721   if (!(frm_ptr= (char *) my_malloc(src->frm_length, MYF(0))))
722     return HA_ERR_OUT_OF_MEM;
723 
724   /* Write file offset is set to the end of the file. */
725   if (azread_frm(src, frm_ptr) ||
726       azwrite_frm(dst, frm_ptr, src->frm_length))
727     rc= my_errno ? my_errno : HA_ERR_INTERNAL_ERROR;
728 
729   my_free(frm_ptr);
730 
731   return rc;
732 }
733 
734 
735 /*
736   We create our data file here. The format is pretty simple.
737   You can read about the format of the data file above.
738   Unlike other storage engines we do not "pack" our data. Since we
739   are about to apply general compression, packing would just be a waste of
740   CPU time. If the table has blobs they are written after the row in the order
741   of creation.
742 */
743 
744 int ha_archive::create(const char *name, TABLE *table_arg,
745                        HA_CREATE_INFO *create_info)
746 {
747   char name_buff[FN_REFLEN];
748   char linkname[FN_REFLEN];
749   int error;
750   azio_stream create_stream;            /* Archive file we are working with */
751   MY_STAT file_stat;  // Stat information for the data file
752 
753   DBUG_ENTER("ha_archive::create");
754 
755   stats.auto_increment_value= create_info->auto_increment_value;
756 
757   for (uint key= 0; key < table_arg->s->keys; key++)
758   {
759     KEY *pos= table_arg->key_info+key;
760     KEY_PART_INFO *key_part=     pos->key_part;
761     KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
762 
763     for (; key_part != key_part_end; key_part++)
764     {
765       Field *field= key_part->field;
766 
767       if (!(field->flags & AUTO_INCREMENT_FLAG))
768       {
769         error= -1;
770         DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
771         goto error;
772       }
773     }
774   }
775 
776   /*
777     We reuse name_buff since it is available.
778   */
779 #ifdef HAVE_READLINK
780   if (my_use_symdir &&
781       create_info->data_file_name &&
782       create_info->data_file_name[0] != '#')
783   {
784     DBUG_PRINT("ha_archive", ("archive will create stream file %s",
785                         create_info->data_file_name));
786 
787     fn_format(name_buff, create_info->data_file_name, "", ARZ,
788               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
789     fn_format(linkname, name, "", ARZ,
790               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
791   }
792   else
793 #endif /* HAVE_READLINK */
794   {
795     if (create_info->data_file_name)
796     {
797       push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
798                           WARN_OPTION_IGNORED,
799                           ER_DEFAULT(WARN_OPTION_IGNORED),
800                           "DATA DIRECTORY");
801     }
802     fn_format(name_buff, name, "", ARZ,
803               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
804     linkname[0]= 0;
805   }
806 
807   /* Archive engine never uses INDEX DIRECTORY. */
808   if (create_info->index_file_name)
809   {
810     push_warning_printf(table_arg->in_use, Sql_condition::WARN_LEVEL_WARN,
811                         WARN_OPTION_IGNORED,
812                         ER_DEFAULT(WARN_OPTION_IGNORED),
813                         "INDEX DIRECTORY");
814   }
815 
816   /*
817     There is a chance that the file was "discovered". In this case
818     just use whatever file is there.
819   */
820   if (!(mysql_file_stat(arch_key_file_data, name_buff, &file_stat, MYF(0))))
821   {
822     my_errno= 0;
823     if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
824     {
825       error= errno;
826       goto error2;
827     }
828 
829     if (linkname[0])
830       my_symlink(name_buff, linkname, MYF(0));
831 
832     frm_load(name, &create_stream);
833 
834     if (create_info->comment.str)
835       azwrite_comment(&create_stream, create_info->comment.str,
836                       create_info->comment.length);
837 
838     /*
839       Yes you need to do this, because the starting value
840       for the autoincrement may not be zero.
841     */
842     create_stream.auto_increment= stats.auto_increment_value ?
843                                     stats.auto_increment_value - 1 : 0;
844     if (azclose(&create_stream))
845     {
846       error= errno;
847       goto error2;
848     }
849   }
850   else
851     my_errno= 0;
852 
853   DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
854   DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
855 
856 
857   DBUG_RETURN(0);
858 
859 error2:
860   delete_table(name);
861 error:
862   /* Return error number, if we got one */
863   DBUG_RETURN(error ? error : -1);
864 }
865 
866 /*
867   This is where the actual row is written out.
868 */
869 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
870 {
871   my_off_t written;
872   unsigned int r_pack_length;
873   DBUG_ENTER("ha_archive::real_write_row");
874 
875   /* We pack the row for writing */
876   r_pack_length= pack_row(buf, writer);
877 
878   written= azwrite(writer, record_buffer->buffer, r_pack_length);
879   if (written != r_pack_length)
880   {
881     DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
882                                               (uint32) written,
883                                               (uint32)r_pack_length));
884     DBUG_RETURN(-1);
885   }
886 
887   if (!delayed_insert || !bulk_insert)
888     share->dirty= TRUE;
889 
890   DBUG_RETURN(0);
891 }
892 
893 
894 /*
895   Calculate max length needed for row. This includes
896   the bytes required for the length in the header.
897 */
898 
899 uint32 ha_archive::max_row_length(const uchar *buf)
900 {
901   uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
902   length+= ARCHIVE_ROW_HEADER_SIZE;
903 
904   uint *ptr, *end;
905   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
906        ptr != end ;
907        ptr++)
908   {
909     if (!table->field[*ptr]->is_null())
910       length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
911   }
912 
913   return length;
914 }
915 
916 
917 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
918 {
919   uchar *ptr;
920 
921   DBUG_ENTER("ha_archive::pack_row");
922 
923 
924   if (fix_rec_buff(max_row_length(record)))
925     DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
926 
927   if (writer->version == 1)
928     DBUG_RETURN(pack_row_v1(record));
929 
930   /* Copy null bits */
931   memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
932          record, table->s->null_bytes);
933   ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
934 
935   for (Field **field=table->field ; *field ; field++)
936   {
937     if (!((*field)->is_null()))
938       ptr= (*field)->pack(ptr, record + (*field)->offset(record));
939   }
940 
941   int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
942                                          ARCHIVE_ROW_HEADER_SIZE));
943   DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
944                            (ptr - record_buffer->buffer -
945                              ARCHIVE_ROW_HEADER_SIZE)));
946 
947   DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
948 }
949 
950 
951 /*
952   Look at ha_archive::open() for an explanation of the row format.
953   Here we just write out the row.
954 
955   Wondering about start_bulk_insert()? Archive is already optimized for
956   lots of writes; the only saving from implementing it is that we can skip
957   setting dirty to true for each row, which is exactly what
958   start_bulk_insert()/end_bulk_insert() below do.
959 */
960 int ha_archive::write_row(uchar *buf)
961 {
962   int rc;
963   uchar *read_buf= NULL;
964   ulonglong temp_auto;
965   uchar *record=  table->record[0];
966   DBUG_ENTER("ha_archive::write_row");
967 
968   if (share->crashed)
969     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
970 
971   ha_statistic_increment(&SSV::ha_write_count);
972   mysql_mutex_lock(&share->mutex);
973 
974   if (!share->archive_write_open && share->init_archive_writer())
975   {
976     rc= HA_ERR_CRASHED_ON_USAGE;
977     goto error;
978   }
979 
980   if (table->next_number_field && record == table->record[0])
981   {
982     KEY *mkey= &table->s->key_info[0]; // We only support one key right now
983     update_auto_increment();
984     temp_auto= (((Field_num*) table->next_number_field)->unsigned_flag ||
985                 table->next_number_field->val_int() > 0 ?
986                 table->next_number_field->val_int() : 0);
987 
988     /*
989       We don't support decrementing auto_increment. They make the performance
990       just cry.
991     */
992     if (temp_auto <= share->archive_write.auto_increment &&
993         mkey->flags & HA_NOSAME)
994     {
995       rc= HA_ERR_FOUND_DUPP_KEY;
996       goto error;
997     }
998 #ifdef DEAD_CODE
999     /*
1000       Bad news, this will cause a search for the unique value which is very
1001       expensive since we will have to do a table scan which will lock up
1002       all other writers during this period. This could perhaps be optimized
1003       in the future.
1004     */
1005     {
1006       /*
1007         First we create a buffer that we can use for reading rows, and can pass
1008         to get_row().
1009       */
1010       if (!(read_buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1011       {
1012         rc= HA_ERR_OUT_OF_MEM;
1013         goto error;
1014       }
1015        /*
1016          All of the buffer must be written out or we won't see all of the
1017          data
1018        */
1019       azflush(&(share->archive_write), Z_SYNC_FLUSH);
1020       /*
1021         Set the position of the local read thread to the beginning position.
1022       */
1023       if (read_data_header(&archive))
1024       {
1025         rc= HA_ERR_CRASHED_ON_USAGE;
1026         goto error;
1027       }
1028 
1029       Field *mfield= table->next_number_field;
1030 
1031       while (!(get_row(&archive, read_buf)))
1032       {
1033         if (!memcmp(read_buf + mfield->offset(record),
1034                     table->next_number_field->ptr,
1035                     mfield->max_display_length()))
1036         {
1037           rc= HA_ERR_FOUND_DUPP_KEY;
1038           goto error;
1039         }
1040       }
1041     }
1042 #endif
1043     else
1044     {
1045       if (temp_auto > share->archive_write.auto_increment)
1046         stats.auto_increment_value=
1047           (share->archive_write.auto_increment= temp_auto) + 1;
1048     }
1049   }
1050 
1051   /*
1052     Notice that the global auto_increment has been increased.
1053     In case of a failed row write, we will never try to reuse the value.
1054   */
1055   share->rows_recorded++;
1056   rc= real_write_row(buf,  &(share->archive_write));
1057 error:
1058   mysql_mutex_unlock(&share->mutex);
1059   if (read_buf)
1060     my_free(read_buf);
1061   DBUG_RETURN(rc);
1062 }
1063 
1064 
1065 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1066                                     ulonglong nb_desired_values,
1067                                     ulonglong *first_value,
1068                                     ulonglong *nb_reserved_values)
1069 {
1070   *nb_reserved_values= ULONGLONG_MAX;
1071   *first_value= share->archive_write.auto_increment + 1;
1072 }
1073 
1074 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
1075 int ha_archive::index_init(uint keynr, bool sorted)
1076 {
1077   DBUG_ENTER("ha_archive::index_init");
1078   active_index= keynr;
1079   DBUG_RETURN(0);
1080 }
1081 
1082 
1083 /*
1084   There are no real indexes, so when we get a request for an index search
1085   (since we tell the optimizer that we have unique indexes), we do a scan.
1086 */
1087 int ha_archive::index_read(uchar *buf, const uchar *key,
1088                              uint key_len, enum ha_rkey_function find_flag)
1089 {
1090   int rc;
1091   DBUG_ENTER("ha_archive::index_read");
1092   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1093   rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1094   MYSQL_INDEX_READ_ROW_DONE(rc);
1095   DBUG_RETURN(rc);
1096 }
1097 
1098 
1099 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1100                                  uint key_len, enum ha_rkey_function find_flag)
1101 {
1102   int rc;
1103   bool found= 0;
1104   KEY *mkey= &table->s->key_info[index];
1105   current_k_offset= mkey->key_part->offset;
1106   current_key= key;
1107   current_key_len= key_len;
1108 
1109 
1110   DBUG_ENTER("ha_archive::index_read_idx");
1111 
1112   rc= rnd_init(TRUE);
1113 
1114   if (rc)
1115     goto error;
1116 
1117   while (!(get_row(&archive, buf)))
1118   {
1119     if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1120     {
1121       found= 1;
1122       break;
1123     }
1124   }
1125 
1126   if (found)
1127   {
1128     /* notify handler that a record has been found */
1129     table->status= 0;
1130     DBUG_RETURN(0);
1131   }
1132 
1133 error:
1134   DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1135 }
1136 
1137 
1138 int ha_archive::index_next(uchar * buf)
1139 {
1140   bool found= 0;
1141   int rc;
1142 
1143   DBUG_ENTER("ha_archive::index_next");
1144   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1145 
1146   while (!(get_row(&archive, buf)))
1147   {
1148     if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1149     {
1150       found= 1;
1151       break;
1152     }
1153   }
1154 
1155   rc= found ? 0 : HA_ERR_END_OF_FILE;
1156   MYSQL_INDEX_READ_ROW_DONE(rc);
1157   DBUG_RETURN(rc);
1158 }
1159 
1160 /*
1161   All calls that need to scan the table start with this method. If we are told
1162   that it is a table scan we rewind the file to the beginning, otherwise
1163   we assume the position will be set.
1164 */
1165 
1166 int ha_archive::rnd_init(bool scan)
1167 {
1168   DBUG_ENTER("ha_archive::rnd_init");
1169 
1170   if (share->crashed)
1171       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1172 
1173   init_archive_reader();
1174 
1175   /* We rewind the file so that we can read from the beginning if scan */
1176   if (scan)
1177   {
1178     scan_rows= stats.records;
1179     DBUG_PRINT("info", ("archive will retrieve %llu rows",
1180                         (unsigned long long) scan_rows));
1181 
1182     if (read_data_header(&archive))
1183       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1184   }
1185 
1186   DBUG_RETURN(0);
1187 }
1188 
1189 
1190 /*
1191   This is the method that is used to read a row. It assumes that the row is
1192   positioned where you want it.
1193 */
1194 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1195 {
1196   int rc;
1197   DBUG_ENTER("ha_archive::get_row");
1198   DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1199                             (uchar)file_to_read->version,
1200                             ARCHIVE_VERSION));
1201   if (file_to_read->version == ARCHIVE_VERSION)
1202     rc= get_row_version3(file_to_read, buf);
1203   else
1204     rc= get_row_version2(file_to_read, buf);
1205 
1206   DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1207 
1208   DBUG_RETURN(rc);
1209 }
1210 
1211 /* Reallocate buffer if needed */
1212 bool ha_archive::fix_rec_buff(unsigned int length)
1213 {
1214   DBUG_ENTER("ha_archive::fix_rec_buff");
1215   DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1216                             length, record_buffer->length));
1217   DBUG_ASSERT(record_buffer->buffer);
1218 
1219   if (length > record_buffer->length)
1220   {
1221     uchar *newptr;
1222     if (!(newptr=(uchar*) my_realloc((uchar*) record_buffer->buffer,
1223                                     length,
1224 				    MYF(MY_ALLOW_ZERO_PTR))))
1225       DBUG_RETURN(1);
1226     record_buffer->buffer= newptr;
1227     record_buffer->length= length;
1228   }
1229 
1230   DBUG_ASSERT(length <= record_buffer->length);
1231 
1232   DBUG_RETURN(0);
1233 }
1234 
1235 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1236 {
1237   DBUG_ENTER("ha_archive::unpack_row");
1238 
1239   unsigned int read;
1240   int error;
1241   uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE], *size_buffer_p= size_buffer;
1242   unsigned int row_len;
1243 
1244   /* First we grab the length stored */
1245   read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1246 
1247   if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
1248     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1249 
1250   /* If we read nothing we are at the end of the file */
1251   if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1252     DBUG_RETURN(HA_ERR_END_OF_FILE);
1253 
1254   row_len= uint4korr(size_buffer_p);
1255   DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1256                            (unsigned int)table->s->reclength));
1257 
1258   if (fix_rec_buff(row_len))
1259   {
1260     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1261   }
1262   DBUG_ASSERT(row_len <= record_buffer->length);
1263 
1264   read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1265 
1266   if (read != row_len || error)
1267   {
1268     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1269   }
1270 
1271   /* Copy null bits */
1272   const uchar *ptr= record_buffer->buffer;
1273   /*
1274     Field::unpack() is not called when field is NULL. For VARCHAR
1275     Field::unpack() only unpacks as much bytes as occupied by field
1276     value. In these cases respective memory area on record buffer is
1277     not initialized.
1278 
1279     These uninitialized areas may be accessed by CHECKSUM TABLE or
1280     by optimizer using temporary table (BUG#12997905). We may remove
1281     this memset() when they're fixed.
1282   */
1283   memset(record, 0, table->s->reclength);
1284   memcpy(record, ptr, table->s->null_bytes);
1285   ptr+= table->s->null_bytes;
1286   for (Field **field=table->field ; *field ; field++)
1287   {
1288     if (!((*field)->is_null_in_record(record)))
1289     {
1290       ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1291     }
1292   }
1293   DBUG_RETURN(0);
1294 }
1295 
1296 
1297 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1298 {
1299   DBUG_ENTER("ha_archive::get_row_version3");
1300 
1301   int returnable= unpack_row(file_to_read, buf);
1302 
1303   DBUG_RETURN(returnable);
1304 }
1305 
1306 
1307 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1308 {
1309   unsigned int read;
1310   int error;
1311   uint *ptr, *end;
1312   char *last;
1313   size_t total_blob_length= 0;
1314   MY_BITMAP *read_set= table->read_set;
1315   DBUG_ENTER("ha_archive::get_row_version2");
1316 
1317   read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1318 
1319   /* If we read nothing we are at the end of the file */
1320   if (read == 0)
1321     DBUG_RETURN(HA_ERR_END_OF_FILE);
1322 
1323   if (read != table->s->reclength)
1324   {
1325     DBUG_PRINT("ha_archive::get_row_version2", ("Read %u bytes expected %u",
1326                                                 read,
1327                                                 (unsigned int)table->s->reclength));
1328     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1329   }
1330 
1331   if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1332     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1333 
1334   /*
1335     If the record is the wrong size, the file is probably damaged, unless
1336     we are dealing with a delayed insert or a bulk insert.
1337   */
1338   if ((ulong) read != table->s->reclength)
1339     DBUG_RETURN(HA_ERR_END_OF_FILE);
1340 
1341   /* Calculate blob length, we use this for our buffer */
1342   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1343        ptr != end ;
1344        ptr++)
1345   {
1346     if (bitmap_is_set(read_set,
1347                       (((Field_blob*) table->field[*ptr])->field_index)))
1348         total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1349   }
1350 
1351   /* Adjust our row buffer if we need be */
1352   buffer.alloc(total_blob_length);
1353   last= (char *)buffer.ptr();
1354 
1355   /* Loop through our blobs and read them */
1356   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1357        ptr != end ;
1358        ptr++)
1359   {
1360     size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1361     if (size)
1362     {
1363       if (bitmap_is_set(read_set,
1364                         ((Field_blob*) table->field[*ptr])->field_index))
1365       {
1366         read= azread(file_to_read, last, size, &error);
1367 
1368         if (error)
1369           DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1370 
1371         if ((size_t) read != size)
1372           DBUG_RETURN(HA_ERR_END_OF_FILE);
1373         ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
1374         last += size;
1375       }
1376       else
1377       {
1378         (void)azseek(file_to_read, size, SEEK_CUR);
1379       }
1380     }
1381   }
1382   DBUG_RETURN(0);
1383 }
1384 
1385 
1386 /*
1387   Called during ORDER BY. Its position comes either from being called
1388   sequentially or from ha_archive::rnd_pos() having been called before it.
1389 */
1390 
1391 int ha_archive::rnd_next(uchar *buf)
1392 {
1393   int rc;
1394   DBUG_ENTER("ha_archive::rnd_next");
1395   MYSQL_READ_ROW_START(table_share->db.str,
1396                        table_share->table_name.str, TRUE);
1397 
1398   if (share->crashed)
1399       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1400 
1401   if (!scan_rows)
1402   {
1403     rc= HA_ERR_END_OF_FILE;
1404     goto end;
1405   }
1406   scan_rows--;
1407 
1408   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1409   current_position= aztell(&archive);
1410   rc= get_row(&archive, buf);
1411 
1412   table->status=rc ? STATUS_NOT_FOUND: 0;
1413 
1414 end:
1415   MYSQL_READ_ROW_DONE(rc);
1416   DBUG_RETURN(rc);
1417 }
1418 
1419 
1420 /*
1421   Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1422   each call to ha_archive::rnd_next() if an ordering of the rows is
1423   needed.
1424 */
1425 
1426 void ha_archive::position(const uchar *record)
1427 {
1428   DBUG_ENTER("ha_archive::position");
1429   my_store_ptr(ref, ref_length, current_position);
1430   DBUG_VOID_RETURN;
1431 }
1432 
1433 
1434 /*
1435   This is called after a table scan for each row if the results of the
1436   scan need to be ordered. It will take *pos and use it to move the
1437   cursor in the file so that the next row that is called is the
1438   correctly ordered row.
1439 */
1440 
1441 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1442 {
1443   int rc;
1444   DBUG_ENTER("ha_archive::rnd_pos");
1445   MYSQL_READ_ROW_START(table_share->db.str,
1446                        table_share->table_name.str, FALSE);
1447   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1448   current_position= (my_off_t)my_get_ptr(pos, ref_length);
1449   if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1450   {
1451     rc= HA_ERR_CRASHED_ON_USAGE;
1452     goto end;
1453   }
1454   rc= get_row(&archive, buf);
1455 end:
1456   MYSQL_READ_ROW_DONE(rc);
1457   DBUG_RETURN(rc);
1458 }
1459 
1460 /*
1461   This method repairs the meta file. It does this by walking the datafile and
1462   rewriting the meta file. If EXTENDED repair is requested, we attempt to
1463   recover as much data as possible.
1464 */
1465 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1466 {
1467   DBUG_ENTER("ha_archive::repair");
1468   int rc= optimize(thd, check_opt);
1469 
1470   if (rc)
1471     DBUG_RETURN(HA_ADMIN_CORRUPT);
1472 
1473   share->crashed= FALSE;
1474   DBUG_RETURN(0);
1475 }
1476 
1477 /*
1478   The table can become fragmented if data was inserted, read, and then
1479   inserted again. What we do is open up the file and recompress it completely.
1480 */
1481 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1482 {
1483   int rc= 0;
1484   azio_stream writer;
1485   ha_rows count;
1486   my_bitmap_map *org_bitmap;
1487   char writer_filename[FN_REFLEN];
1488   DBUG_ENTER("ha_archive::optimize");
1489 
1490   mysql_mutex_lock(&share->mutex);
1491   if (share->in_optimize)
1492   {
1493     mysql_mutex_unlock(&share->mutex);
1494     DBUG_RETURN(HA_ADMIN_FAILED);
1495   }
1496   share->in_optimize= true;
1497   /* remember the number of rows */
1498   count= share->rows_recorded;
1499   if (share->archive_write_open)
1500     azflush(&share->archive_write, Z_SYNC_FLUSH);
1501   mysql_mutex_unlock(&share->mutex);
1502 
1503   init_archive_reader();
1504 
1505   /* Lets create a file to contain the new data */
1506   fn_format(writer_filename, share->table_name, "", ARN,
1507             MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1508 
1509   if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1510   {
1511     share->in_optimize= false;
1512     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1513   }
1514 
1515   /*
1516     Transfer the embedded FRM so that the file can be discoverable.
1517     Write file offset is set to the end of the file.
1518   */
1519   if ((rc= frm_copy(&archive, &writer)))
1520   {
1521     share->in_optimize= false;
1522     goto error;
1523   }
1524   /*
1525     An extended rebuild is a lot more effort. We open up each row and re-record it.
1526     Any dead rows are removed (aka rows that may have been partially recorded).
1527 
1528     As of Archive format 3, this is the only type of rebuild that is performed;
1529     before this version it was only done for T_EXTEND.
1530   */
1531 
1532   DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1533 
1534   /*
1535     Now we will rewind the archive file so that we are positioned at the
1536     start of the file.
1537   */
1538   if ((rc= read_data_header(&archive)))
1539   {
1540     share->in_optimize= false;
1541     goto error;
1542   }
1543 
1544   stats.auto_increment_value= 1;
1545   org_bitmap= tmp_use_all_columns(table, table->read_set);
1546   /* read rows up to the remembered row count */
1547   for (ha_rows cur_count= count; cur_count; cur_count--)
1548   {
1549     if ((rc= get_row(&archive, table->record[0])))
1550       break;
1551     real_write_row(table->record[0], &writer);
1552     if (table->found_next_number_field)
1553       save_auto_increment(table, &stats.auto_increment_value);
1554   }
1555 
1556   mysql_mutex_lock(&share->mutex);
1557 
1558   share->close_archive_writer();
1559   if (!rc)
1560   {
1561     /* read the remaining rows */
1562     for (count= share->rows_recorded - count; count; count--)
1563     {
1564       if ((rc= get_row(&archive, table->record[0])))
1565         break;
1566       real_write_row(table->record[0], &writer);
1567       if (table->found_next_number_field)
1568         save_auto_increment(table, &stats.auto_increment_value);
1569     }
1570   }
1571 
1572   tmp_restore_column_map(table->read_set, org_bitmap);
1573   share->rows_recorded= (ha_rows) writer.rows;
1574   share->archive_write.auto_increment= stats.auto_increment_value - 1;
1575   DBUG_PRINT("info", ("recovered %llu archive rows",
1576                       (unsigned long long)share->rows_recorded));
1577 
1578   DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1579                       (unsigned long long)share->rows_recorded));
1580 
1581   /*
1582     If REPAIR ... EXTENDED is requested, try to recover as much data
1583     from data file as possible. In this case if we failed to read a
1584     record, we assume EOF. This allows massive data loss, but we can
1585     hardly do more with broken zlib stream. And this is the only way
1586     to restore at least what is still recoverable.
1587   */
1588   if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1589   {
1590     share->in_optimize= false;
1591     mysql_mutex_unlock(&share->mutex);
1592     goto error;
1593   }
1594 
1595   azclose(&writer);
1596   share->dirty= FALSE;
1597   azclose(&archive);
1598   archive_reader_open= FALSE;
1599 
1600   // make the file we just wrote be our data file
1601   rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1602   share->in_optimize= false;
1603   mysql_mutex_unlock(&share->mutex);
1604 
1605   DBUG_RETURN(rc);
1606 error:
1607   DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1608   azclose(&writer);
1609 
1610   DBUG_RETURN(rc);
1611 }
1612 
1613 /*
1614   Below is an example of how to set up row-level locking.
1615 */
1616 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1617                                        THR_LOCK_DATA **to,
1618                                        enum thr_lock_type lock_type)
1619 {
1620   if (lock_type == TL_WRITE_DELAYED)
1621     delayed_insert= TRUE;
1622   else
1623     delayed_insert= FALSE;
1624 
1625   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1626   {
1627     /*
1628       Here is where we get into the guts of a row-level lock.
1629       If TL_UNLOCK is set and we are not doing a LOCK TABLE or
1630       DISCARD/IMPORT TABLESPACE, then allow multiple writers.
1632     */
1633 
1634     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1635          lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1636         && !thd_tablespace_op(thd))
1637       lock_type = TL_WRITE_ALLOW_WRITE;
1638 
1639     /*
1640       In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1641       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1642       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1643       to t2. Convert the lock to a normal read lock to allow
1644       concurrent inserts to t2.
1645     */
1646 
1647     if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1648       lock_type = TL_READ;
1649 
1650     lock.type=lock_type;
1651   }
1652 
1653   *to++= &lock;
1654 
1655   return to;
1656 }
1657 
1658 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1659 {
1660   char tmp_real_path[FN_REFLEN];
1661   DBUG_ENTER("ha_archive::update_create_info");
1662 
1663   ha_archive::info(HA_STATUS_AUTO);
1664   if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1665   {
1666     create_info->auto_increment_value= stats.auto_increment_value;
1667   }
1668 
1669   if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1670     create_info->data_file_name= sql_strdup(tmp_real_path);
1671 
1672   DBUG_VOID_RETURN;
1673 }
1674 
1675 
1676 /*
1677   Hints for optimizer, see ha_tina for more information
1678 */
1679 int ha_archive::info(uint flag)
1680 {
1681   DBUG_ENTER("ha_archive::info");
1682 
1683   mysql_mutex_lock(&share->mutex);
1684   if (share->dirty)
1685   {
1686     DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1687     DBUG_ASSERT(share->archive_write_open);
1688     azflush(&(share->archive_write), Z_SYNC_FLUSH);
1689     share->dirty= FALSE;
1690   }
1691 
1692   /*
1693     This should be an accurate number now, though bulk and delayed inserts can
1694     cause it to be temporarily out of date.
1695   */
1696   stats.records= share->rows_recorded;
1697   mysql_mutex_unlock(&share->mutex);
1698 
1699   stats.deleted= 0;
1700 
1701   DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1702   /* Costs quite a bit more to get all information */
1703   if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1704   {
1705     MY_STAT file_stat;  // Stat information for the data file
1706 
1707     (void) mysql_file_stat(arch_key_file_data, share->data_file_name, &file_stat, MYF(MY_WME));
1708 
1709     if (flag & HA_STATUS_TIME)
1710       stats.update_time= (ulong) file_stat.st_mtime;
1711     if (flag & HA_STATUS_CONST)
1712     {
1713       stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1714       stats.max_data_file_length= MAX_FILE_SIZE;
1715       stats.create_time= (ulong) file_stat.st_ctime;
1716     }
    if (flag & HA_STATUS_VARIABLE)
    {
      stats.delete_length= 0;
      stats.data_file_length= file_stat.st_size;
      stats.index_file_length=0;
      stats.mean_rec_length= stats.records ?
        ulong(stats.data_file_length / stats.records) : table->s->reclength;
    }
  }

  if (flag & HA_STATUS_AUTO)
  {
    /* TODO: Use the shared writer instead during the lock above. */
    init_archive_reader();
    mysql_mutex_lock(&share->mutex);
    azflush(&archive, Z_SYNC_FLUSH);
    mysql_mutex_unlock(&share->mutex);
    stats.auto_increment_value= archive.auto_increment + 1;
  }

  DBUG_RETURN(0);
}


/**
  Handler hints.

  @param operation  Operation to prepare for.

  @return Operation status
    @retval 0    Success
    @retval != 0 Error
*/

int ha_archive::extra(enum ha_extra_function operation)
{
  int ret= 0;
  DBUG_ENTER("ha_archive::extra");
  /* On Windows we need to close all files before rename/delete. */
#ifdef __WIN__
  switch (operation)
  {
  case HA_EXTRA_PREPARE_FOR_RENAME:
  case HA_EXTRA_FORCE_REOPEN:
    /* Close both reader and writer so we don't have the file open. */
    if (archive_reader_open)
    {
      ret= azclose(&archive);
      archive_reader_open= false;
    }
    mysql_mutex_lock(&share->mutex);
    share->close_archive_writer();
    mysql_mutex_unlock(&share->mutex);
    break;
  default:
    /* Nothing to do. */
    ;
  }
#endif
  DBUG_RETURN(ret);
}


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which will keep write_row from saying that its data is dirty. This in
  turn will keep selects from causing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/*
  The other side of start_bulk_insert is end_bulk_insert. Here we turn off
  the bulk insert flag, and set the share dirty so that the next select will
  call sync for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  mysql_mutex_lock(&share->mutex);
  if (share->archive_write_open)
    share->dirty= true;
  mysql_mutex_unlock(&share->mutex);
  DBUG_RETURN(0);
}

/*
  We reject a truncate command. The only way to delete an archive table is
  to drop it. This is done for security reasons. In a later version we will
  enable this by allowing the user to select a different row format.
*/
int ha_archive::truncate()
{
  DBUG_ENTER("ha_archive::truncate");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}

/*
  We just return the crashed state if asked.
*/
bool ha_archive::is_crashed() const
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);
}


/**
  @brief Check for upgrade

  @param[in]  check_opt  check options

  @return Completion status
    @retval HA_ADMIN_OK            No upgrade required
    @retval HA_ADMIN_CORRUPT       Cannot read meta-data
    @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
*/

int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
{
  DBUG_ENTER("ha_archive::check_for_upgrade");
  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  if (archive.version < ARCHIVE_VERSION)
    DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
  DBUG_RETURN(HA_ADMIN_OK);
}


/*
  A simple scan of the table to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded;
  /* Flush any waiting data */
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  mysql_mutex_unlock(&share->mutex);

  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  read_data_header(&archive);
  for (ha_rows cur_count= count; cur_count; cur_count--)
  {
    if ((rc= get_row(&archive, table->record[0])))
      goto error;
  }
  /*
    Now read records that may have been inserted concurrently.
    Acquire share->mutex so tail of the table is not modified by
    concurrent writers.
  */
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded - count;
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;
  mysql_mutex_unlock(&share->mutex);

  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
    goto error;

  thd_proc_info(thd, old_proc_info);
  DBUG_RETURN(HA_ADMIN_OK);

error:
  thd_proc_info(thd, old_proc_info);
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_CORRUPT);
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}

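/*
  Allocate a record buffer of the given length. Returns NULL if either the
  descriptor or the buffer itself cannot be allocated.
*/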
archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r=
        (archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
                                           MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(r->length,
                                      MYF(MY_WME))))
  {
    my_free(r);
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

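/*
  Free a record buffer previously allocated by create_record_buffer().
*/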
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free(r->buffer);
  my_free(r);
  DBUG_VOID_RETURN;
}

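/*
  Tell the server whether an ALTER TABLE can reuse the existing data file.
  Any change to the auto_increment value, DATA DIRECTORY, table comment,
  or column layout forces the table to be rebuilt.
*/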
bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info,
                                            uint table_changes)
{
  if (info->auto_increment_value != stats.auto_increment_value ||
      (info->used_fields & HA_CREATE_USED_DATADIR) ||
      info->data_file_name ||
      (info->used_fields & HA_CREATE_USED_COMMENT) ||
      table_changes != IS_EQUAL_YES)
    return COMPATIBLE_DATA_NO;

  return COMPATIBLE_DATA_YES;
}


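/* Descriptor and declaration that register ARCHIVE as a storage engine plugin. */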
struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  NULL, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL,                       /* config options                  */
  0,                          /* flags                           */
}
mysql_declare_plugin_end;