1 /*
2    Copyright (c) 2004, 2015, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "probes_mysql.h"
26 #include "sql_class.h"                          // SSV
27 #include "sql_table.h"
28 #include <myisam.h>
29 
30 #include "ha_archive.h"
31 #include <my_dir.h>
32 
33 #include <mysql/plugin.h>
34 #include "mysql/psi/mysql_file.h"
35 
36 /*
37   First, if you want to understand storage engines you should look at
38   ha_example.cc and ha_example.h.
39 
40   This example was written as a test case for a customer who needed
41   a storage engine without indexes that could compress data very well.
42   So, welcome to a completely compressed storage engine. This storage
43   engine only does inserts. No replace, deletes, or updates. All reads are
44   complete table scans. Compression is done through a combination of packing
45   and making use of the zlib library
46 
47   We keep a file pointer open for each instance of ha_archive for each read
48   but for writes we keep one open file handle just for that. We flush it
49   only if we have a read occur. azip handles compressing lots of records
50   at once much better then doing lots of little records between writes.
51   It is possible to not lock on writes but this would then mean we couldn't
52   handle bulk inserts as well (that is if someone was trying to read at
53   the same time since we would want to flush).
54 
55   A "meta" file is kept alongside the data file. This file serves two purpose.
56   The first purpose is to track the number of rows in the table. The second
57   purpose is to determine if the table was closed properly or not. When the
58   meta file is first opened it is marked as dirty. It is opened when the table
59   itself is opened for writing. When the table is closed the new count for rows
60   is written to the meta file and the file is marked as clean. If the meta file
61   is opened and it is marked as dirty, it is assumed that a crash occured. At
62   this point an error occurs and the user is told to rebuild the file.
63   A rebuild scans the rows and rewrites the meta file. If corruption is found
64   in the data file then the meta file is not repaired.
65 
66   At some point a recovery method for such a drastic case needs to be divised.
67 
68   Locks are row level, and you will get a consistant read.
69 
70   For performance as far as table scans go it is quite fast. I don't have
71   good numbers but locally it has out performed both Innodb and MyISAM. For
72   Innodb the question will be if the table can be fit into the buffer
73   pool. For MyISAM its a question of how much the file system caches the
74   MyISAM file. With enough free memory MyISAM is faster. Its only when the OS
75   doesn't have enough memory to cache entire table that archive turns out
76   to be any faster.
77 
78   Examples between MyISAM (packed) and Archive.
79 
80   Table with 76695844 identical rows:
81   29680807 a_archive.ARZ
82   920350317 a.MYD
83 
84 
85   Table with 8991478 rows (all of Slashdot's comments):
86   1922964506 comment_archive.ARZ
87   2944970297 comment_text.MYD
88 
89 
90   TODO:
91    Allow users to set compression level.
92    Allow adjustable block size.
93    Implement versioning, should be easy.
94    Allow for errors, find a way to mark bad rows.
95    Add optional feature so that rows can be flushed at interval (which will cause less
96      compression but may speed up ordered searches).
97    Checkpoint the meta file to allow for faster rebuilds.
98    Option to allow for dirty reads, this would lower the sync calls, which would make
99      inserts a lot faster, but would mean highly arbitrary reads.
100 
101     -Brian
102 
103   Archive file format versions:
104   <5.1.5 - v.1
105   5.1.5-5.1.15 - v.2
106   >5.1.15 - v.3
107 */
108 
109 /* The file extension */
110 #define ARZ ".ARZ"               // The data file
111 #define ARN ".ARN"               // Files used during an optimize call
112 #define ARM ".ARM"               // Meta file (deprecated)
113 
114 /* 5.0 compatibility */
115 #define META_V1_OFFSET_CHECK_HEADER  0
116 #define META_V1_OFFSET_VERSION       1
117 #define META_V1_OFFSET_ROWS_RECORDED 2
118 #define META_V1_OFFSET_CHECK_POINT   10
119 #define META_V1_OFFSET_CRASHED       18
120 #define META_V1_LENGTH               19
121 
122 /*
123   uchar + uchar
124 */
125 #define DATA_BUFFER_SIZE 2       // Size of the data used in the data file
126 #define ARCHIVE_CHECK_HEADER 254 // The number we use to determine corruption
127 
128 #ifdef HAVE_PSI_INTERFACE
129 extern "C" PSI_file_key arch_key_file_data;
130 #endif
131 
132 /* Static declarations for handerton */
133 static handler *archive_create_handler(handlerton *hton,
134                                        TABLE_SHARE *table,
135                                        MEM_ROOT *mem_root);
136 int archive_discover(handlerton *hton, THD* thd, const char *db,
137                      const char *name,
138                      uchar **frmblob,
139                      size_t *frmlen);
140 
141 /*
142   Number of rows that will force a bulk insert.
143 */
144 #define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
145 
146 /*
147   Size of header used for row
148 */
149 #define ARCHIVE_ROW_HEADER_SIZE 4
150 
archive_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)151 static handler *archive_create_handler(handlerton *hton,
152                                        TABLE_SHARE *table,
153                                        MEM_ROOT *mem_root)
154 {
155   return new (mem_root) ha_archive(hton, table);
156 }
157 
158 PSI_memory_key az_key_memory_frm;
159 PSI_memory_key az_key_memory_record_buffer;
160 
161 #ifdef HAVE_PSI_INTERFACE
162 PSI_mutex_key az_key_mutex_Archive_share_mutex;
163 
164 static PSI_mutex_info all_archive_mutexes[]=
165 {
166   { &az_key_mutex_Archive_share_mutex, "Archive_share::mutex", 0}
167 };
168 
169 PSI_file_key arch_key_file_metadata, arch_key_file_data, arch_key_file_frm;
170 static PSI_file_info all_archive_files[]=
171 {
172   { &arch_key_file_metadata, "metadata", 0},
173   { &arch_key_file_data, "data", 0},
174   { &arch_key_file_frm, "FRM", 0}
175 };
176 
177 static PSI_memory_info all_archive_memory[]=
178 {
179   { &az_key_memory_frm, "FRM", 0},
180   { &az_key_memory_record_buffer, "record_buffer", 0},
181 };
182 
183 
init_archive_psi_keys(void)184 static void init_archive_psi_keys(void)
185 {
186   const char* category= "archive";
187   int count;
188 
189   count= array_elements(all_archive_mutexes);
190   mysql_mutex_register(category, all_archive_mutexes, count);
191 
192   count= array_elements(all_archive_files);
193   mysql_file_register(category, all_archive_files, count);
194 
195   count= array_elements(all_archive_memory);
196   mysql_memory_register(category, all_archive_memory, count);
197 }
198 
199 
200 #endif /* HAVE_PSI_INTERFACE */
201 
202 /*
203   Initialize the archive handler.
204 
205   SYNOPSIS
206     archive_db_init()
207     void *
208 
209   RETURN
210     FALSE       OK
211     TRUE        Error
212 */
213 
archive_db_init(void * p)214 int archive_db_init(void *p)
215 {
216   DBUG_ENTER("archive_db_init");
217   handlerton *archive_hton;
218 
219 #ifdef HAVE_PSI_INTERFACE
220   init_archive_psi_keys();
221 #endif
222 
223   archive_hton= (handlerton *)p;
224   archive_hton->state= SHOW_OPTION_YES;
225   archive_hton->db_type= DB_TYPE_ARCHIVE_DB;
226   archive_hton->create= archive_create_handler;
227   archive_hton->flags= HTON_NO_FLAGS;
228   archive_hton->discover= archive_discover;
229 
230   DBUG_RETURN(0);
231 }
232 
233 
Archive_share()234 Archive_share::Archive_share()
235 {
236   crashed= false;
237   in_optimize= false;
238   archive_write_open= false;
239   dirty= false;
240   DBUG_PRINT("ha_archive", ("Archive_share: %p",
241                             this));
242   thr_lock_init(&lock);
243   /*
244     We will use this lock for rows.
245   */
246   mysql_mutex_init(az_key_mutex_Archive_share_mutex,
247                    &mutex, MY_MUTEX_INIT_FAST);
248 }
249 
250 
ha_archive(handlerton * hton,TABLE_SHARE * table_arg)251 ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
252   :handler(hton, table_arg), share(NULL), bulk_insert(0)
253 {
254   /* Set our original buffer from pre-allocated memory */
255   buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
256 
257   /* The size of the offset value we will use for position() */
258   ref_length= sizeof(my_off_t);
259   archive_reader_open= FALSE;
260 }
261 
archive_discover(handlerton * hton,THD * thd,const char * db,const char * name,uchar ** frmblob,size_t * frmlen)262 int archive_discover(handlerton *hton, THD* thd, const char *db,
263                      const char *name,
264                      uchar **frmblob,
265                      size_t *frmlen)
266 {
267   DBUG_ENTER("archive_discover");
268   DBUG_PRINT("archive_discover", ("db: %s, name: %s", db, name));
269   azio_stream frm_stream;
270   char az_file[FN_REFLEN];
271   char *frm_ptr;
272   MY_STAT file_stat;
273 
274   build_table_filename(az_file, sizeof(az_file) - 1, db, name, ARZ, 0);
275 
276   if (!(mysql_file_stat(arch_key_file_data, az_file, &file_stat, MYF(0))))
277     goto err;
278 
279   if (!(azopen(&frm_stream, az_file, O_RDONLY|O_BINARY)))
280   {
281     if (errno == EROFS || errno == EACCES)
282     {
283       set_my_errno(errno);
284       DBUG_RETURN(errno);
285     }
286     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
287   }
288 
289   if (frm_stream.frm_length == 0)
290     goto err;
291 
292   frm_ptr= (char *)my_malloc(az_key_memory_frm,
293                              sizeof(char) * frm_stream.frm_length, MYF(0));
294   azread_frm(&frm_stream, frm_ptr);
295   azclose(&frm_stream);
296 
297   *frmlen= frm_stream.frm_length;
298   *frmblob= (uchar*) frm_ptr;
299 
300   DBUG_RETURN(0);
301 err:
302   set_my_errno(0);
303   DBUG_RETURN(1);
304 }
305 
save_auto_increment(TABLE * table,ulonglong * value)306 static void save_auto_increment(TABLE *table, ulonglong *value)
307 {
308   Field *field= table->found_next_number_field;
309   ulonglong auto_value=
310     (ulonglong) field->val_int(table->record[0] +
311                                field->offset(table->record[0]));
312   if (*value <= auto_value)
313     *value= auto_value + 1;
314 }
315 
316 /**
317   @brief Read version 1 meta file (5.0 compatibility routine).
318 
319   @return Completion status
320     @retval  0 Success
321     @retval !0 Failure
322 */
323 
read_v1_metafile()324 int Archive_share::read_v1_metafile()
325 {
326   char file_name[FN_REFLEN];
327   uchar buf[META_V1_LENGTH];
328   File fd;
329   DBUG_ENTER("Archive_share::read_v1_metafile");
330 
331   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
332   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_RDONLY, MYF(0))) == -1)
333     DBUG_RETURN(-1);
334 
335   if (mysql_file_read(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
336   {
337     mysql_file_close(fd, MYF(0));
338     DBUG_RETURN(-1);
339   }
340 
341   rows_recorded= uint8korr(buf + META_V1_OFFSET_ROWS_RECORDED);
342   crashed= buf[META_V1_OFFSET_CRASHED];
343   mysql_file_close(fd, MYF(0));
344   DBUG_RETURN(0);
345 }
346 
347 
348 /**
349   @brief Write version 1 meta file (5.0 compatibility routine).
350 
351   @return Completion status
352     @retval  0 Success
353     @retval !0 Failure
354 */
355 
write_v1_metafile()356 int Archive_share::write_v1_metafile()
357 {
358   char file_name[FN_REFLEN];
359   uchar buf[META_V1_LENGTH];
360   File fd;
361   DBUG_ENTER("Archive_share::write_v1_metafile");
362 
363   buf[META_V1_OFFSET_CHECK_HEADER]= ARCHIVE_CHECK_HEADER;
364   buf[META_V1_OFFSET_VERSION]= 1;
365   int8store(buf + META_V1_OFFSET_ROWS_RECORDED, rows_recorded);
366   int8store(buf + META_V1_OFFSET_CHECK_POINT, (ulonglong) 0);
367   buf[META_V1_OFFSET_CRASHED]= crashed;
368 
369   fn_format(file_name, data_file_name, "", ARM, MY_REPLACE_EXT);
370   if ((fd= mysql_file_open(arch_key_file_metadata, file_name, O_WRONLY, MYF(0))) == -1)
371     DBUG_RETURN(-1);
372 
373   if (mysql_file_write(fd, buf, sizeof(buf), MYF(0)) != sizeof(buf))
374   {
375     mysql_file_close(fd, MYF(0));
376     DBUG_RETURN(-1);
377   }
378 
379   mysql_file_close(fd, MYF(0));
380   DBUG_RETURN(0);
381 }
382 
383 
384 /**
385   @brief Pack version 1 row (5.0 compatibility routine).
386 
387   @param[in]  record  the record to pack
388 
389   @return Length of packed row
390 */
391 
pack_row_v1(uchar * record)392 unsigned int ha_archive::pack_row_v1(uchar *record)
393 {
394   uint *blob, *end;
395   uchar *pos;
396   DBUG_ENTER("pack_row_v1");
397   memcpy(record_buffer->buffer, record, table->s->reclength);
398   pos= record_buffer->buffer + table->s->reclength;
399   for (blob= table->s->blob_field, end= blob + table->s->blob_fields;
400        blob != end; blob++)
401   {
402     uint32 length= ((Field_blob *) table->field[*blob])->get_length();
403     if (length)
404     {
405       uchar *data_ptr;
406       ((Field_blob *) table->field[*blob])->get_ptr(&data_ptr);
407       memcpy(pos, data_ptr, length);
408       pos+= length;
409     }
410   }
411   DBUG_RETURN(pos - record_buffer->buffer);
412 }
413 
414 
415 /*
416   This method reads the header of a datafile and returns whether or not it was successful.
417 */
read_data_header(azio_stream * file_to_read)418 int ha_archive::read_data_header(azio_stream *file_to_read)
419 {
420   int error;
421   size_t ret;
422   uchar data_buffer[DATA_BUFFER_SIZE];
423   DBUG_ENTER("ha_archive::read_data_header");
424 
425   if (azrewind(file_to_read) == -1)
426     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
427 
428   if (file_to_read->version >= 3)
429     DBUG_RETURN(0);
430   /* Everything below this is just legacy to version 2< */
431 
432   DBUG_PRINT("ha_archive", ("Reading legacy data header"));
433 
434   ret= azread(file_to_read, data_buffer, DATA_BUFFER_SIZE, &error);
435 
436   if (ret != DATA_BUFFER_SIZE)
437   {
438     DBUG_PRINT("ha_archive", ("Reading, expected %d got %zu",
439                               DATA_BUFFER_SIZE, ret));
440     DBUG_RETURN(1);
441   }
442 
443   if (error)
444   {
445     DBUG_PRINT("ha_archive", ("Compression error (%d)", error));
446     DBUG_RETURN(1);
447   }
448 
449   DBUG_PRINT("ha_archive", ("Check %u", data_buffer[0]));
450   DBUG_PRINT("ha_archive", ("Version %u", data_buffer[1]));
451 
452   if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
453       (data_buffer[1] != (uchar)ARCHIVE_VERSION))
454     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
455 
456   DBUG_RETURN(0);
457 }
458 
459 
460 /*
461   We create the shared memory space that we will use for the open table.
462   No matter what we try to get or create a share. This is so that a repair
463   table operation can occur.
464 
465   See ha_example.cc for a longer description.
466 */
get_share(const char * table_name,int * rc)467 Archive_share *ha_archive::get_share(const char *table_name, int *rc)
468 {
469   Archive_share *tmp_share;
470 
471   DBUG_ENTER("ha_archive::get_share");
472 
473   lock_shared_ha_data();
474   if (!(tmp_share= static_cast<Archive_share*>(get_ha_share_ptr())))
475   {
476     azio_stream archive_tmp;
477 
478     tmp_share= new Archive_share;
479 
480     if (!tmp_share)
481     {
482       *rc= HA_ERR_OUT_OF_MEM;
483       goto err;
484     }
485     DBUG_PRINT("ha_archive", ("new Archive_share: %p",
486                               tmp_share));
487 
488     fn_format(tmp_share->data_file_name, table_name, "",
489               ARZ, MY_REPLACE_EXT | MY_UNPACK_FILENAME);
490     my_stpcpy(tmp_share->table_name, table_name);
491     DBUG_PRINT("ha_archive", ("Data File %s",
492                         tmp_share->data_file_name));
493 
494     /*
495       We read the meta file, but do not mark it dirty. Since we are not
496       doing a write we won't mark it dirty (and we won't open it for
497       anything but reading... open it for write and we will generate null
498       compression writes).
499     */
500     if (!(azopen(&archive_tmp, tmp_share->data_file_name, O_RDONLY|O_BINARY)))
501     {
502       delete tmp_share;
503       *rc= my_errno() ? my_errno() : HA_ERR_CRASHED;
504       tmp_share= NULL;
505       goto err;
506     }
507     stats.auto_increment_value= archive_tmp.auto_increment + 1;
508     tmp_share->rows_recorded= (ha_rows)archive_tmp.rows;
509     tmp_share->crashed= archive_tmp.dirty;
510     share= tmp_share;
511     if (archive_tmp.version == 1)
512       share->read_v1_metafile();
513     azclose(&archive_tmp);
514 
515     set_ha_share_ptr(static_cast<Handler_share*>(tmp_share));
516   }
517   if (tmp_share->crashed)
518     *rc= HA_ERR_CRASHED_ON_USAGE;
519 err:
520   unlock_shared_ha_data();
521 
522   DBUG_ASSERT(tmp_share || *rc);
523 
524   DBUG_RETURN(tmp_share);
525 }
526 
527 
init_archive_writer()528 int Archive_share::init_archive_writer()
529 {
530   DBUG_ENTER("Archive_share::init_archive_writer");
531   /*
532     It is expensive to open and close the data files and since you can't have
533     a gzip file that can be both read and written we keep a writer open
534     that is shared amoung all open tables.
535   */
536   if (!(azopen(&archive_write, data_file_name,
537                O_RDWR|O_BINARY)))
538   {
539     DBUG_PRINT("ha_archive", ("Could not open archive write file"));
540     crashed= true;
541     DBUG_RETURN(1);
542   }
543   archive_write_open= true;
544 
545   DBUG_RETURN(0);
546 }
547 
548 
close_archive_writer()549 void Archive_share::close_archive_writer()
550 {
551   mysql_mutex_assert_owner(&mutex);
552   if (archive_write_open)
553   {
554     if (archive_write.version == 1)
555       (void) write_v1_metafile();
556     azclose(&archive_write);
557     archive_write_open= false;
558     dirty= false;
559   }
560 }
561 
562 
563 /*
564   No locks are required because it is associated with just one handler instance
565 */
init_archive_reader()566 int ha_archive::init_archive_reader()
567 {
568   DBUG_ENTER("ha_archive::init_archive_reader");
569   /*
570     It is expensive to open and close the data files and since you can't have
571     a gzip file that can be both read and written we keep a writer open
572     that is shared amoung all open tables, but have one reader open for
573     each handler instance.
574   */
575   if (!archive_reader_open)
576   {
577     if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
578     {
579       DBUG_PRINT("ha_archive", ("Could not open archive read file"));
580       share->crashed= TRUE;
581       DBUG_RETURN(1);
582     }
583     archive_reader_open= TRUE;
584   }
585 
586   DBUG_RETURN(0);
587 }
588 
589 
590 /*
591   We just implement one additional file extension.
592 */
593 static const char *ha_archive_exts[] = {
594   ARZ,
595   NullS
596 };
597 
bas_ext() const598 const char **ha_archive::bas_ext() const
599 {
600   return ha_archive_exts;
601 }
602 
603 
604 /*
605   When opening a file we:
606   Create/get our shared structure.
607   Init out lock.
608   We open the file we will read from.
609 */
open(const char * name,int mode,uint open_options)610 int ha_archive::open(const char *name, int mode, uint open_options)
611 {
612   int rc= 0;
613   DBUG_ENTER("ha_archive::open");
614 
615   DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
616                       (open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
617   share= get_share(name, &rc);
618   if (!share)
619     DBUG_RETURN(rc);
620 
621   /* Allow open on crashed table in repair mode only. */
622   switch (rc)
623   {
624   case 0:
625     break;
626   case HA_ERR_CRASHED_ON_USAGE:
627     if (open_options & HA_OPEN_FOR_REPAIR)
628       break;
629     /* fall through */
630   default:
631     DBUG_RETURN(rc);
632   }
633 
634   record_buffer= create_record_buffer(table->s->reclength +
635                                       ARCHIVE_ROW_HEADER_SIZE);
636 
637   if (!record_buffer)
638     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
639 
640   thr_lock_data_init(&share->lock, &lock, NULL);
641 
642   DBUG_PRINT("ha_archive", ("archive table was crashed %s",
643                       rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
644   if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
645   {
646     DBUG_RETURN(0);
647   }
648 
649   DBUG_RETURN(rc);
650 }
651 
652 
653 /*
654   Closes the file.
655 
656   SYNOPSIS
657     close();
658 
659   IMPLEMENTATION:
660 
661   We first close this storage engines file handle to the archive and
662   then remove our reference count to the table (and possibly free it
663   as well).
664 
665   RETURN
666     0  ok
667     1  Error
668 */
669 
close(void)670 int ha_archive::close(void)
671 {
672   int rc= 0;
673   DBUG_ENTER("ha_archive::close");
674 
675   destroy_record_buffer(record_buffer);
676 
677   if (archive_reader_open)
678   {
679     if (azclose(&archive))
680       rc= 1;
681   }
682 
683   DBUG_RETURN(rc);
684 }
685 
686 
frm_load(const char * name,azio_stream * dst)687 void ha_archive::frm_load(const char *name, azio_stream *dst)
688 {
689   char name_buff[FN_REFLEN];
690   MY_STAT file_stat;
691   File frm_file;
692   uchar *frm_ptr;
693   DBUG_ENTER("ha_archive::frm_load");
694   fn_format(name_buff, name, "", ".frm", MY_REPLACE_EXT | MY_UNPACK_FILENAME);
695 
696   /* Here is where we open up the frm and pass it to archive to store */
697   if ((frm_file= mysql_file_open(arch_key_file_frm, name_buff, O_RDONLY, MYF(0))) >= 0)
698   {
699     if (!mysql_file_fstat(frm_file, &file_stat, MYF(MY_WME)))
700     {
701       frm_ptr= (uchar *) my_malloc(az_key_memory_frm,
702                                    sizeof(uchar) * (size_t) file_stat.st_size, MYF(0));
703       if (frm_ptr)
704       {
705         if (mysql_file_read(frm_file, frm_ptr, (size_t) file_stat.st_size, MYF(0)) ==
706             (size_t) file_stat.st_size)
707           azwrite_frm(dst, (char *) frm_ptr, (size_t) file_stat.st_size);
708         my_free(frm_ptr);
709       }
710     }
711     mysql_file_close(frm_file, MYF(0));
712   }
713   DBUG_VOID_RETURN;
714 }
715 
716 
717 /**
718   Copy a frm blob between streams.
719 
720   @param[in]  src   The source stream.
721   @param[in]  dst   The destination stream.
722 
723   @return Zero on success, non-zero otherwise.
724 */
725 
frm_copy(azio_stream * src,azio_stream * dst)726 int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
727 {
728   int rc= 0;
729   char *frm_ptr;
730 
731   /* If there is no .frm in source stream, try to read .frm from file. */
732   if (!src->frm_length)
733   {
734     frm_load(table->s->normalized_path.str, dst);
735     return 0;
736   }
737 
738   if (!(frm_ptr= (char *) my_malloc(az_key_memory_frm,
739                                     src->frm_length, MYF(0))))
740     return HA_ERR_OUT_OF_MEM;
741 
742   /* Write file offset is set to the end of the file. */
743   if (azread_frm(src, frm_ptr) ||
744       azwrite_frm(dst, frm_ptr, src->frm_length))
745     rc= my_errno() ? my_errno() : HA_ERR_INTERNAL_ERROR;
746 
747   my_free(frm_ptr);
748 
749   return rc;
750 }
751 
752 
753 /*
754   We create our data file here. The format is pretty simple.
755   You can read about the format of the data file above.
756   Unlike other storage engines we do not "pack" our data. Since we
757   are about to do a general compression, packing would just be a waste of
758   CPU time. If the table has blobs they are written after the row in the order
759   of creation.
760 */
761 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)762 int ha_archive::create(const char *name, TABLE *table_arg,
763                        HA_CREATE_INFO *create_info)
764 {
765   char name_buff[FN_REFLEN];
766   char linkname[FN_REFLEN];
767   int error;
768   azio_stream create_stream;            /* Archive file we are working with */
769   MY_STAT file_stat;  // Stat information for the data file
770 
771   DBUG_ENTER("ha_archive::create");
772 
773   stats.auto_increment_value= create_info->auto_increment_value;
774 
775   for (uint key= 0; key < table_arg->s->keys; key++)
776   {
777     KEY *pos= table_arg->key_info+key;
778     KEY_PART_INFO *key_part=     pos->key_part;
779     KEY_PART_INFO *key_part_end= key_part + pos->user_defined_key_parts;
780 
781     for (; key_part != key_part_end; key_part++)
782     {
783       Field *field= key_part->field;
784 
785       if (!(field->flags & AUTO_INCREMENT_FLAG))
786       {
787         error= -1;
788         DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
789         goto error;
790       }
791     }
792   }
793 
794   /*
795     We reuse name_buff since it is available.
796   */
797 #ifdef HAVE_READLINK
798   if (my_enable_symlinks &&
799       create_info->data_file_name &&
800       create_info->data_file_name[0] != '#')
801   {
802     DBUG_PRINT("ha_archive", ("archive will create stream file %s",
803                         create_info->data_file_name));
804 
805     fn_format(name_buff, create_info->data_file_name, "", ARZ,
806               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
807     fn_format(linkname, name, "", ARZ,
808               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
809   }
810   else
811 #endif /* HAVE_READLINK */
812   {
813     if (create_info->data_file_name)
814     {
815       push_warning_printf(table_arg->in_use, Sql_condition::SL_WARNING,
816                           WARN_OPTION_IGNORED,
817                           ER_DEFAULT(WARN_OPTION_IGNORED),
818                           "DATA DIRECTORY");
819     }
820     fn_format(name_buff, name, "", ARZ,
821               MY_REPLACE_EXT | MY_UNPACK_FILENAME);
822     linkname[0]= 0;
823   }
824 
825   /* Archive engine never uses INDEX DIRECTORY. */
826   if (create_info->index_file_name)
827   {
828     push_warning_printf(table_arg->in_use, Sql_condition::SL_WARNING,
829                         WARN_OPTION_IGNORED,
830                         ER_DEFAULT(WARN_OPTION_IGNORED),
831                         "INDEX DIRECTORY");
832   }
833 
834   /*
835     There is a chance that the file was "discovered". In this case
836     just use whatever file is there.
837   */
838   if (!(mysql_file_stat(arch_key_file_data, name_buff, &file_stat, MYF(0))))
839   {
840     set_my_errno(0);
841     if (!(azopen(&create_stream, name_buff, O_CREAT|O_RDWR|O_BINARY)))
842     {
843       error= errno;
844       goto error2;
845     }
846 
847     if (linkname[0])
848       my_symlink(name_buff, linkname, MYF(0));
849 
850     frm_load(name, &create_stream);
851 
852     if (create_info->comment.str)
853       azwrite_comment(&create_stream, create_info->comment.str,
854                       create_info->comment.length);
855 
856     /*
857       Yes you need to do this, because the starting value
858       for the autoincrement may not be zero.
859     */
860     create_stream.auto_increment= stats.auto_increment_value ?
861                                     stats.auto_increment_value - 1 : 0;
862     if (azclose(&create_stream))
863     {
864       error= errno;
865       goto error2;
866     }
867   }
868   else
869     set_my_errno(0);
870 
871   DBUG_PRINT("ha_archive", ("Creating File %s", name_buff));
872   DBUG_PRINT("ha_archive", ("Creating Link %s", linkname));
873 
874 
875   DBUG_RETURN(0);
876 
877 error2:
878   delete_table(name);
879 error:
880   /* Return error number, if we got one */
881   DBUG_RETURN(error ? error : -1);
882 }
883 
884 /*
885   This is where the actual row is written out.
886 */
real_write_row(uchar * buf,azio_stream * writer)887 int ha_archive::real_write_row(uchar *buf, azio_stream *writer)
888 {
889   my_off_t written;
890   unsigned int r_pack_length;
891   DBUG_ENTER("ha_archive::real_write_row");
892 
893   /* We pack the row for writing */
894   r_pack_length= pack_row(buf, writer);
895 
896   written= azwrite(writer, record_buffer->buffer, r_pack_length);
897   if (written != r_pack_length)
898   {
899     DBUG_PRINT("ha_archive", ("Wrote %d bytes expected %d",
900                                               (uint32) written,
901                                               (uint32)r_pack_length));
902     DBUG_RETURN(-1);
903   }
904 
905   if (!bulk_insert)
906     share->dirty= TRUE;
907 
908   DBUG_RETURN(0);
909 }
910 
911 
912 /*
913   Calculate max length needed for row. This includes
914   the bytes required for the length in the header.
915 */
916 
max_row_length(const uchar * buf)917 uint32 ha_archive::max_row_length(const uchar *buf)
918 {
919   uint32 length= (uint32)(table->s->reclength + table->s->fields*2);
920   length+= ARCHIVE_ROW_HEADER_SIZE;
921 
922   uint *ptr, *end;
923   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
924        ptr != end ;
925        ptr++)
926   {
927     if (!table->field[*ptr]->is_null())
928       length += 2 + ((Field_blob*)table->field[*ptr])->get_length();
929   }
930 
931   return length;
932 }
933 
934 
pack_row(uchar * record,azio_stream * writer)935 unsigned int ha_archive::pack_row(uchar *record, azio_stream *writer)
936 {
937   uchar *ptr;
938 
939   DBUG_ENTER("ha_archive::pack_row");
940 
941 
942   if (fix_rec_buff(max_row_length(record)))
943     DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
944 
945   if (writer->version == 1)
946     DBUG_RETURN(pack_row_v1(record));
947 
948   /* Copy null bits */
949   memcpy(record_buffer->buffer+ARCHIVE_ROW_HEADER_SIZE,
950          record, table->s->null_bytes);
951   ptr= record_buffer->buffer + table->s->null_bytes + ARCHIVE_ROW_HEADER_SIZE;
952 
953   for (Field **field=table->field ; *field ; field++)
954   {
955     if (!((*field)->is_null()))
956       ptr= (*field)->pack(ptr, record + (*field)->offset(record));
957   }
958 
959   int4store(record_buffer->buffer, (int)(ptr - record_buffer->buffer -
960                                          ARCHIVE_ROW_HEADER_SIZE));
961   DBUG_PRINT("ha_archive",("Pack row length %u", (unsigned int)
962                            (ptr - record_buffer->buffer -
963                              ARCHIVE_ROW_HEADER_SIZE)));
964 
965   DBUG_RETURN((unsigned int) (ptr - record_buffer->buffer));
966 }
967 
968 
969 /*
970   Look at ha_archive::open() for an explanation of the row format.
971   Here we just write out the row.
972 
973   Wondering about start_bulk_insert()? We don't implement it for
974   archive since it optimizes for lots of writes. The only save
975   for implementing start_bulk_insert() is that we could skip
976   setting dirty to true each time.
977 */
write_row(uchar * buf)978 int ha_archive::write_row(uchar *buf)
979 {
980   int rc;
981   uchar *read_buf= NULL;
982   ulonglong temp_auto;
983   uchar *record=  table->record[0];
984   DBUG_ENTER("ha_archive::write_row");
985 
986   if (share->crashed)
987     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
988 
989   ha_statistic_increment(&SSV::ha_write_count);
990   mysql_mutex_lock(&share->mutex);
991 
992   if (!share->archive_write_open && share->init_archive_writer())
993   {
994     rc= HA_ERR_CRASHED_ON_USAGE;
995     goto error;
996   }
997 
998   if (table->next_number_field && record == table->record[0])
999   {
1000     KEY *mkey= &table->s->key_info[0]; // We only support one key right now
1001     update_auto_increment();
1002     temp_auto= (((Field_num*) table->next_number_field)->unsigned_flag ||
1003                 table->next_number_field->val_int() > 0 ?
1004                 table->next_number_field->val_int() : 0);
1005 
1006     /*
1007       We don't support decremening auto_increment. They make the performance
1008       just cry.
1009     */
1010     if (temp_auto <= share->archive_write.auto_increment &&
1011         mkey->flags & HA_NOSAME)
1012     {
1013       rc= HA_ERR_FOUND_DUPP_KEY;
1014       goto error;
1015     }
1016     else
1017     {
1018       if (temp_auto > share->archive_write.auto_increment)
1019         stats.auto_increment_value=
1020           (share->archive_write.auto_increment= temp_auto) + 1;
1021     }
1022   }
1023 
1024   /*
1025     Notice that the global auto_increment has been increased.
1026     In case of a failed row write, we will never try to reuse the value.
1027   */
1028   share->rows_recorded++;
1029   rc= real_write_row(buf,  &(share->archive_write));
1030 error:
1031   mysql_mutex_unlock(&share->mutex);
1032   if (read_buf)
1033     my_free(read_buf);
1034   DBUG_RETURN(rc);
1035 }
1036 
1037 
get_auto_increment(ulonglong offset,ulonglong increment,ulonglong nb_desired_values,ulonglong * first_value,ulonglong * nb_reserved_values)1038 void ha_archive::get_auto_increment(ulonglong offset, ulonglong increment,
1039                                     ulonglong nb_desired_values,
1040                                     ulonglong *first_value,
1041                                     ulonglong *nb_reserved_values)
1042 {
1043   *nb_reserved_values= ULLONG_MAX;
1044   *first_value= share->archive_write.auto_increment + 1;
1045 }
1046 
1047 /* Initialized at each key walk (called multiple times unlike rnd_init()) */
index_init(uint keynr,bool sorted)1048 int ha_archive::index_init(uint keynr, bool sorted)
1049 {
1050   DBUG_ENTER("ha_archive::index_init");
1051   active_index= keynr;
1052   DBUG_RETURN(0);
1053 }
1054 
1055 
1056 /*
1057   No indexes, so if we get a request for an index search since we tell
1058   the optimizer that we have unique indexes, we scan
1059 */
index_read(uchar * buf,const uchar * key,uint key_len,enum ha_rkey_function find_flag)1060 int ha_archive::index_read(uchar *buf, const uchar *key,
1061                              uint key_len, enum ha_rkey_function find_flag)
1062 {
1063   int rc;
1064   DBUG_ENTER("ha_archive::index_read");
1065   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1066   rc= index_read_idx(buf, active_index, key, key_len, find_flag);
1067   MYSQL_INDEX_READ_ROW_DONE(rc);
1068   DBUG_RETURN(rc);
1069 }
1070 
1071 
index_read_idx(uchar * buf,uint index,const uchar * key,uint key_len,enum ha_rkey_function find_flag)1072 int ha_archive::index_read_idx(uchar *buf, uint index, const uchar *key,
1073                                  uint key_len, enum ha_rkey_function find_flag)
1074 {
1075   int rc;
1076   bool found= 0;
1077   KEY *mkey= &table->s->key_info[index];
1078   current_k_offset= mkey->key_part->offset;
1079   current_key= key;
1080   current_key_len= key_len;
1081 
1082 
1083   DBUG_ENTER("ha_archive::index_read_idx");
1084 
1085   rc= rnd_init(TRUE);
1086 
1087   if (rc)
1088     goto error;
1089 
1090   while (!(get_row(&archive, buf)))
1091   {
1092     if (!memcmp(current_key, buf + current_k_offset, current_key_len))
1093     {
1094       found= 1;
1095       break;
1096     }
1097   }
1098 
1099   if (found)
1100   {
1101     /* notify handler that a record has been found */
1102     table->status= 0;
1103     DBUG_RETURN(0);
1104   }
1105 
1106 error:
1107   DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
1108 }
1109 
1110 
index_next(uchar * buf)1111 int ha_archive::index_next(uchar * buf)
1112 {
1113   bool found= 0;
1114   int rc;
1115 
1116   DBUG_ENTER("ha_archive::index_next");
1117   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
1118 
1119   while (!(get_row(&archive, buf)))
1120   {
1121     if (!memcmp(current_key, buf+current_k_offset, current_key_len))
1122     {
1123       found= 1;
1124       break;
1125     }
1126   }
1127 
1128   rc= found ? 0 : HA_ERR_END_OF_FILE;
1129   MYSQL_INDEX_READ_ROW_DONE(rc);
1130   DBUG_RETURN(rc);
1131 }
1132 
1133 /*
1134   All calls that need to scan the table start with this method. If we are told
1135   that it is a table scan we rewind the file to the beginning, otherwise
1136   we assume the position will be set.
1137 */
1138 
rnd_init(bool scan)1139 int ha_archive::rnd_init(bool scan)
1140 {
1141   DBUG_ENTER("ha_archive::rnd_init");
1142 
1143   if (share->crashed)
1144       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1145 
1146   init_archive_reader();
1147 
1148   /* We rewind the file so that we can read from the beginning if scan */
1149   if (scan)
1150   {
1151     scan_rows= stats.records;
1152     DBUG_PRINT("info", ("archive will retrieve %llu rows",
1153                         (unsigned long long) scan_rows));
1154 
1155     if (read_data_header(&archive))
1156       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1157   }
1158 
1159   DBUG_RETURN(0);
1160 }
1161 
1162 
1163 /*
1164   This is the method that is used to read a row. It assumes that the row is
1165   positioned where you want it.
1166 */
get_row(azio_stream * file_to_read,uchar * buf)1167 int ha_archive::get_row(azio_stream *file_to_read, uchar *buf)
1168 {
1169   int rc;
1170   DBUG_ENTER("ha_archive::get_row");
1171   DBUG_PRINT("ha_archive", ("Picking version for get_row() %d -> %d",
1172                             (uchar)file_to_read->version,
1173                             ARCHIVE_VERSION));
1174   if (file_to_read->version == ARCHIVE_VERSION)
1175     rc= get_row_version3(file_to_read, buf);
1176   else
1177     rc= get_row_version2(file_to_read, buf);
1178 
1179   DBUG_PRINT("ha_archive", ("Return %d\n", rc));
1180 
1181   DBUG_RETURN(rc);
1182 }
1183 
1184 /* Reallocate buffer if needed */
fix_rec_buff(unsigned int length)1185 bool ha_archive::fix_rec_buff(unsigned int length)
1186 {
1187   DBUG_ENTER("ha_archive::fix_rec_buff");
1188   DBUG_PRINT("ha_archive", ("Fixing %u for %u",
1189                             length, record_buffer->length));
1190   DBUG_ASSERT(record_buffer->buffer);
1191 
1192   if (length > record_buffer->length)
1193   {
1194     uchar *newptr;
1195     if (!(newptr=(uchar*) my_realloc(az_key_memory_record_buffer,
1196                                      (uchar*) record_buffer->buffer,
1197                                     length,
1198 				    MYF(MY_ALLOW_ZERO_PTR))))
1199       DBUG_RETURN(1);
1200     record_buffer->buffer= newptr;
1201     record_buffer->length= length;
1202   }
1203 
1204   DBUG_ASSERT(length <= record_buffer->length);
1205 
1206   DBUG_RETURN(0);
1207 }
1208 
unpack_row(azio_stream * file_to_read,uchar * record)1209 int ha_archive::unpack_row(azio_stream *file_to_read, uchar *record)
1210 {
1211   DBUG_ENTER("ha_archive::unpack_row");
1212 
1213   size_t read;
1214   int error;
1215   uchar size_buffer[ARCHIVE_ROW_HEADER_SIZE], *size_buffer_p= size_buffer;
1216   unsigned int row_len;
1217 
1218   /* First we grab the length stored */
1219   read= azread(file_to_read, size_buffer, ARCHIVE_ROW_HEADER_SIZE, &error);
1220 
1221   if (error == Z_STREAM_ERROR ||  (read && read < ARCHIVE_ROW_HEADER_SIZE))
1222     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1223 
1224   /* If we read nothing we are at the end of the file */
1225   if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
1226     DBUG_RETURN(HA_ERR_END_OF_FILE);
1227 
1228   row_len= uint4korr(size_buffer_p);
1229   DBUG_PRINT("ha_archive",("Unpack row length %u -> %u", row_len,
1230                            (unsigned int)table->s->reclength));
1231 
1232   if (fix_rec_buff(row_len))
1233   {
1234     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1235   }
1236   DBUG_ASSERT(row_len <= record_buffer->length);
1237 
1238   read= azread(file_to_read, record_buffer->buffer, row_len, &error);
1239 
1240   if (read != row_len || error)
1241   {
1242     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1243   }
1244 
1245   /* Copy null bits */
1246   const uchar *ptr= record_buffer->buffer;
1247   /*
1248     Field::unpack() is not called when field is NULL. For VARCHAR
1249     Field::unpack() only unpacks as much bytes as occupied by field
1250     value. In these cases respective memory area on record buffer is
1251     not initialized.
1252 
1253     These uninitialized areas may be accessed by CHECKSUM TABLE or
1254     by optimizer using temporary table (BUG#12997905). We may remove
1255     this memset() when they're fixed.
1256   */
1257   memset(record, 0, table->s->reclength);
1258   memcpy(record, ptr, table->s->null_bytes);
1259   ptr+= table->s->null_bytes;
1260   for (Field **field=table->field ; *field ; field++)
1261   {
1262     if (!((*field)->is_null_in_record(record)))
1263     {
1264       ptr= (*field)->unpack(record + (*field)->offset(table->record[0]), ptr);
1265     }
1266   }
1267   DBUG_RETURN(0);
1268 }
1269 
1270 
get_row_version3(azio_stream * file_to_read,uchar * buf)1271 int ha_archive::get_row_version3(azio_stream *file_to_read, uchar *buf)
1272 {
1273   DBUG_ENTER("ha_archive::get_row_version3");
1274 
1275   int returnable= unpack_row(file_to_read, buf);
1276 
1277   DBUG_RETURN(returnable);
1278 }
1279 
1280 
get_row_version2(azio_stream * file_to_read,uchar * buf)1281 int ha_archive::get_row_version2(azio_stream *file_to_read, uchar *buf)
1282 {
1283   size_t read;
1284   int error;
1285   uint *ptr, *end;
1286   char *last;
1287   size_t total_blob_length= 0;
1288   MY_BITMAP *read_set= table->read_set;
1289   DBUG_ENTER("ha_archive::get_row_version2");
1290 
1291   read= azread(file_to_read, (voidp)buf, table->s->reclength, &error);
1292 
1293   /* If we read nothing we are at the end of the file */
1294   if (read == 0)
1295     DBUG_RETURN(HA_ERR_END_OF_FILE);
1296 
1297   if (read != table->s->reclength)
1298   {
1299     DBUG_PRINT("ha_archive::get_row_version2", ("Read %zu bytes expected %u",
1300                                                 read,
1301                                                 (unsigned int)table->s->reclength));
1302     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1303   }
1304 
1305   if (error == Z_STREAM_ERROR || error == Z_DATA_ERROR )
1306     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1307 
1308   /* If the record is the wrong size, the file is probably damaged. */
1309   if ((ulong) read != table->s->reclength)
1310     DBUG_RETURN(HA_ERR_END_OF_FILE);
1311 
1312   /* Calculate blob length, we use this for our buffer */
1313   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1314        ptr != end ;
1315        ptr++)
1316   {
1317     if (bitmap_is_set(read_set,
1318                       (((Field_blob*) table->field[*ptr])->field_index)))
1319         total_blob_length += ((Field_blob*) table->field[*ptr])->get_length();
1320   }
1321 
1322   /* Adjust our row buffer if we need be */
1323   buffer.alloc(total_blob_length);
1324   last= (char *)buffer.ptr();
1325 
1326   /* Loop through our blobs and read them */
1327   for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
1328        ptr != end ;
1329        ptr++)
1330   {
1331     size_t size= ((Field_blob*) table->field[*ptr])->get_length();
1332     if (size)
1333     {
1334       if (bitmap_is_set(read_set,
1335                         ((Field_blob*) table->field[*ptr])->field_index))
1336       {
1337         read= azread(file_to_read, last, size, &error);
1338 
1339         if (error)
1340           DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1341 
1342         if ((size_t) read != size)
1343           DBUG_RETURN(HA_ERR_END_OF_FILE);
1344         ((Field_blob*) table->field[*ptr])->set_ptr(size, (uchar*) last);
1345         last += size;
1346       }
1347       else
1348       {
1349         (void)azseek(file_to_read, size, SEEK_CUR);
1350       }
1351     }
1352   }
1353   DBUG_RETURN(0);
1354 }
1355 
1356 
1357 /*
1358   Called during ORDER BY. Its position is either from being called sequentially
1359   or by having had ha_archive::rnd_pos() called before it is called.
1360 */
1361 
rnd_next(uchar * buf)1362 int ha_archive::rnd_next(uchar *buf)
1363 {
1364   int rc;
1365   DBUG_ENTER("ha_archive::rnd_next");
1366   MYSQL_READ_ROW_START(table_share->db.str,
1367                        table_share->table_name.str, TRUE);
1368 
1369   if (share->crashed)
1370       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1371 
1372   if (!scan_rows)
1373   {
1374     rc= HA_ERR_END_OF_FILE;
1375     goto end;
1376   }
1377   scan_rows--;
1378 
1379   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1380   current_position= aztell(&archive);
1381   rc= get_row(&archive, buf);
1382 
1383   table->status=rc ? STATUS_NOT_FOUND: 0;
1384 
1385 end:
1386   MYSQL_READ_ROW_DONE(rc);
1387   DBUG_RETURN(rc);
1388 }
1389 
1390 
1391 /*
1392   Thanks to the table flag HA_REC_NOT_IN_SEQ this will be called after
1393   each call to ha_archive::rnd_next() if an ordering of the rows is
1394   needed.
1395 */
1396 
position(const uchar * record)1397 void ha_archive::position(const uchar *record)
1398 {
1399   DBUG_ENTER("ha_archive::position");
1400   my_store_ptr(ref, ref_length, current_position);
1401   DBUG_VOID_RETURN;
1402 }
1403 
1404 
1405 /*
1406   This is called after a table scan for each row if the results of the
1407   scan need to be ordered. It will take *pos and use it to move the
1408   cursor in the file so that the next row that is called is the
1409   correctly ordered row.
1410 */
1411 
rnd_pos(uchar * buf,uchar * pos)1412 int ha_archive::rnd_pos(uchar * buf, uchar *pos)
1413 {
1414   int rc;
1415   DBUG_ENTER("ha_archive::rnd_pos");
1416   MYSQL_READ_ROW_START(table_share->db.str,
1417                        table_share->table_name.str, FALSE);
1418   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1419   current_position= (my_off_t)my_get_ptr(pos, ref_length);
1420   if (azseek(&archive, current_position, SEEK_SET) == (my_off_t)(-1L))
1421   {
1422     rc= HA_ERR_CRASHED_ON_USAGE;
1423     goto end;
1424   }
1425   rc= get_row(&archive, buf);
1426 end:
1427   MYSQL_READ_ROW_DONE(rc);
1428   DBUG_RETURN(rc);
1429 }
1430 
1431 /*
1432   This method repairs the meta file. It does this by walking the datafile and
1433   rewriting the meta file. If EXTENDED repair is requested, we attempt to
1434   recover as much data as possible.
1435 */
repair(THD * thd,HA_CHECK_OPT * check_opt)1436 int ha_archive::repair(THD* thd, HA_CHECK_OPT* check_opt)
1437 {
1438   DBUG_ENTER("ha_archive::repair");
1439   int rc= optimize(thd, check_opt);
1440 
1441   if (rc)
1442     DBUG_RETURN(HA_ADMIN_CORRUPT);
1443 
1444   share->crashed= FALSE;
1445   DBUG_RETURN(0);
1446 }
1447 
1448 /*
1449   The table can become fragmented if data was inserted, read, and then
1450   inserted again. What we do is open up the file and recompress it completely.
1451 */
optimize(THD * thd,HA_CHECK_OPT * check_opt)1452 int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
1453 {
1454   int rc= 0;
1455   azio_stream writer;
1456   ha_rows count;
1457   my_bitmap_map *org_bitmap;
1458   char writer_filename[FN_REFLEN];
1459   DBUG_ENTER("ha_archive::optimize");
1460 
1461   mysql_mutex_lock(&share->mutex);
1462   if (share->in_optimize)
1463   {
1464     mysql_mutex_unlock(&share->mutex);
1465     DBUG_RETURN(HA_ADMIN_FAILED);
1466   }
1467   share->in_optimize= true;
1468   /* remember the number of rows */
1469   count= share->rows_recorded;
1470   if (share->archive_write_open)
1471     azflush(&share->archive_write, Z_SYNC_FLUSH);
1472   mysql_mutex_unlock(&share->mutex);
1473 
1474   init_archive_reader();
1475 
1476   /* Lets create a file to contain the new data */
1477   fn_format(writer_filename, share->table_name, "", ARN,
1478             MY_REPLACE_EXT | MY_UNPACK_FILENAME);
1479 
1480   if (!(azopen(&writer, writer_filename, O_CREAT|O_RDWR|O_BINARY)))
1481   {
1482     share->in_optimize= false;
1483     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1484   }
1485 
1486   /*
1487     Transfer the embedded FRM so that the file can be discoverable.
1488     Write file offset is set to the end of the file.
1489   */
1490   if ((rc= frm_copy(&archive, &writer)))
1491   {
1492     share->in_optimize= false;
1493     goto error;
1494   }
1495   /*
1496     An extended rebuild is a lot more effort. We open up each row and re-record it.
1497     Any dead rows are removed (aka rows that may have been partially recorded).
1498 
1499     As of Archive format 3, this is the only type that is performed, before this
1500     version it was just done on T_EXTEND
1501   */
1502 
1503   DBUG_PRINT("ha_archive", ("archive extended rebuild"));
1504 
1505   /*
1506     Now we will rewind the archive file so that we are positioned at the
1507     start of the file.
1508   */
1509   if ((rc= read_data_header(&archive)))
1510   {
1511     share->in_optimize= false;
1512     goto error;
1513   }
1514 
1515   stats.auto_increment_value= 1;
1516   org_bitmap= tmp_use_all_columns(table, table->read_set);
1517   /* read rows upto the remembered rows */
1518   for (ha_rows cur_count= count; cur_count; cur_count--)
1519   {
1520     if ((rc= get_row(&archive, table->record[0])))
1521       break;
1522     real_write_row(table->record[0], &writer);
1523     if (table->found_next_number_field)
1524       save_auto_increment(table, &stats.auto_increment_value);
1525   }
1526 
1527   mysql_mutex_lock(&share->mutex);
1528 
1529   share->close_archive_writer();
1530   if (!rc)
1531   {
1532     /* read the remaining rows */
1533     for (count= share->rows_recorded - count; count; count--)
1534     {
1535       if ((rc= get_row(&archive, table->record[0])))
1536         break;
1537       real_write_row(table->record[0], &writer);
1538       if (table->found_next_number_field)
1539         save_auto_increment(table, &stats.auto_increment_value);
1540     }
1541   }
1542 
1543   tmp_restore_column_map(table->read_set, org_bitmap);
1544   share->rows_recorded= (ha_rows) writer.rows;
1545   share->archive_write.auto_increment= stats.auto_increment_value - 1;
1546   DBUG_PRINT("info", ("recovered %llu archive rows",
1547                       (unsigned long long)share->rows_recorded));
1548 
1549   DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
1550                       (unsigned long long)share->rows_recorded));
1551 
1552   /*
1553     If REPAIR ... EXTENDED is requested, try to recover as much data
1554     from data file as possible. In this case if we failed to read a
1555     record, we assume EOF. This allows massive data loss, but we can
1556     hardly do more with broken zlib stream. And this is the only way
1557     to restore at least what is still recoverable.
1558   */
1559   if (rc && rc != HA_ERR_END_OF_FILE && !(check_opt->flags & T_EXTEND))
1560   {
1561     share->in_optimize= false;
1562     mysql_mutex_unlock(&share->mutex);
1563     goto error;
1564   }
1565 
1566   azclose(&writer);
1567   share->dirty= FALSE;
1568   azclose(&archive);
1569   archive_reader_open= FALSE;
1570 
1571   // make the file we just wrote be our data file
1572   rc= my_rename(writer_filename, share->data_file_name, MYF(0));
1573   share->in_optimize= false;
1574   mysql_mutex_unlock(&share->mutex);
1575 
1576   DBUG_RETURN(rc);
1577 error:
1578   DBUG_PRINT("ha_archive", ("Failed to recover, error was %d", rc));
1579   azclose(&writer);
1580 
1581   DBUG_RETURN(rc);
1582 }
1583 
1584 /*
1585   Below is an example of how to setup row level locking.
1586 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)1587 THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
1588                                        THR_LOCK_DATA **to,
1589                                        enum thr_lock_type lock_type)
1590 {
1591   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1592   {
1593     /*
1594       Here is where we get into the guts of a row level lock.
1595       If TL_UNLOCK is set
1596       If we are not doing a LOCK TABLE or DISCARD/IMPORT
1597       TABLESPACE, then allow multiple writers
1598     */
1599 
1600     if ((lock_type >= TL_WRITE_CONCURRENT_INSERT &&
1601          lock_type <= TL_WRITE) && !thd_in_lock_tables(thd)
1602         && !thd_tablespace_op(thd))
1603       lock_type = TL_WRITE_ALLOW_WRITE;
1604 
1605     /*
1606       In queries of type INSERT INTO t1 SELECT ... FROM t2 ...
1607       MySQL would use the lock TL_READ_NO_INSERT on t2, and that
1608       would conflict with TL_WRITE_ALLOW_WRITE, blocking all inserts
1609       to t2. Convert the lock to a normal read lock to allow
1610       concurrent inserts to t2.
1611     */
1612 
1613     if (lock_type == TL_READ_NO_INSERT && !thd_in_lock_tables(thd))
1614       lock_type = TL_READ;
1615 
1616     lock.type=lock_type;
1617   }
1618 
1619   *to++= &lock;
1620 
1621   return to;
1622 }
1623 
update_create_info(HA_CREATE_INFO * create_info)1624 void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
1625 {
1626   char tmp_real_path[FN_REFLEN];
1627   DBUG_ENTER("ha_archive::update_create_info");
1628 
1629   ha_archive::info(HA_STATUS_AUTO);
1630   if (!(create_info->used_fields & HA_CREATE_USED_AUTO))
1631   {
1632     create_info->auto_increment_value= stats.auto_increment_value;
1633   }
1634 
1635   if (!(my_readlink(tmp_real_path, share->data_file_name, MYF(0))))
1636     create_info->data_file_name= sql_strdup(tmp_real_path);
1637 
1638   DBUG_VOID_RETURN;
1639 }
1640 
1641 
1642 /*
1643   Hints for optimizer, see ha_tina for more information
1644 */
info(uint flag)1645 int ha_archive::info(uint flag)
1646 {
1647   DBUG_ENTER("ha_archive::info");
1648 
1649   mysql_mutex_lock(&share->mutex);
1650   if (share->dirty)
1651   {
1652     DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
1653     DBUG_ASSERT(share->archive_write_open);
1654     azflush(&(share->archive_write), Z_SYNC_FLUSH);
1655     share->dirty= FALSE;
1656   }
1657 
1658   /*
1659     This should be an accurate number now, though bulk inserts can
1660     cause the number to be inaccurate.
1661   */
1662   stats.records= share->rows_recorded;
1663   mysql_mutex_unlock(&share->mutex);
1664 
1665   stats.deleted= 0;
1666 
1667   DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
1668   /* Costs quite a bit more to get all information */
1669   if (flag & (HA_STATUS_TIME | HA_STATUS_CONST | HA_STATUS_VARIABLE))
1670   {
1671     MY_STAT file_stat;  // Stat information for the data file
1672 
1673     (void) mysql_file_stat(arch_key_file_data, share->data_file_name, &file_stat, MYF(MY_WME));
1674 
1675     if (flag & HA_STATUS_TIME)
1676       stats.update_time= (ulong) file_stat.st_mtime;
1677     if (flag & HA_STATUS_CONST)
1678     {
1679       stats.max_data_file_length= share->rows_recorded * stats.mean_rec_length;
1680       stats.max_data_file_length= MAX_FILE_SIZE;
1681       stats.create_time= (ulong) file_stat.st_ctime;
1682     }
1683     if (flag & HA_STATUS_VARIABLE)
1684     {
1685       stats.delete_length= 0;
1686       stats.data_file_length= file_stat.st_size;
1687       stats.index_file_length=0;
1688       stats.mean_rec_length= stats.records ?
1689         ulong(stats.data_file_length / stats.records) : table->s->reclength;
1690     }
1691   }
1692 
1693   if (flag & HA_STATUS_AUTO)
1694   {
1695     /* TODO: Use the shared writer instead during the lock above. */
1696     init_archive_reader();
1697     mysql_mutex_lock(&share->mutex);
1698     azflush(&archive, Z_SYNC_FLUSH);
1699     mysql_mutex_unlock(&share->mutex);
1700     stats.auto_increment_value= archive.auto_increment + 1;
1701   }
1702 
1703   DBUG_RETURN(0);
1704 }


/**
  Handler hints.

  @param operation  Operation to prepare for.

  @return Operation status
    @retval 0    Success
    @retval != 0 Error
*/

int ha_archive::extra(enum ha_extra_function operation)
{
  int ret= 0;
  DBUG_ENTER("ha_archive::extra");
  /* On Windows we need to close all files before a rename or delete. */
#ifdef _WIN32
  switch (operation)
  {
  case HA_EXTRA_PREPARE_FOR_RENAME:
  case HA_EXTRA_FORCE_REOPEN:
    /* Close both reader and writer so we don't have the file open. */
    if (archive_reader_open)
    {
      ret= azclose(&archive);
      archive_reader_open= false;
    }
    mysql_mutex_lock(&share->mutex);
    share->close_archive_writer();
    mysql_mutex_unlock(&share->mutex);
    break;
  default:
    /* Nothing to do. */
    ;
  }
#endif
  DBUG_RETURN(ret);
}
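
/*
  For example, before the server renames the table's underlying files (as
  part of ALTER TABLE, which shuffles temporary tables into place) it is
  expected to issue HA_EXTRA_PREPARE_FOR_RENAME; on Windows, renaming or
  deleting a file that is still open fails, which is why both the reader and
  the shared writer are closed above.
*/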


/*
  This method tells us that a bulk insert operation is about to occur. We set
  a flag which keeps write_row() from marking the share as dirty. This in
  turn keeps selects from forcing a sync to occur.
  Basically, yet another optimization to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
  DBUG_ENTER("ha_archive::start_bulk_insert");
  if (!rows || rows >= ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT)
    bulk_insert= TRUE;
  DBUG_VOID_RETURN;
}


/*
  The other side of start_bulk_insert() is end_bulk_insert(). Here we turn
  off the bulk insert flag and mark the share dirty so that the next select
  will sync the data for us.
*/
int ha_archive::end_bulk_insert()
{
  DBUG_ENTER("ha_archive::end_bulk_insert");
  bulk_insert= FALSE;
  mysql_mutex_lock(&share->mutex);
  if (share->archive_write_open)
    share->dirty= true;
  mysql_mutex_unlock(&share->mutex);
  DBUG_RETURN(0);
}
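
/*
  A rough sketch of the call pattern this pair optimizes for (a multi-row
  INSERT or LOAD DATA; `h` stands for an open ha_archive instance, the
  estimate and the loop body are illustrative only):

    h->start_bulk_insert(estimated_rows);
    for (...)                        // one call per row being loaded
      h->write_row(record);
    h->end_bulk_insert();            // marks the share dirty once, at the end

  While bulk_insert is TRUE, write_row() skips marking the share dirty, so a
  concurrent SELECT does not force a flush after every single row; the first
  SELECT after end_bulk_insert() triggers exactly one sync instead.
*/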

/*
  We reject a TRUNCATE command. The only way to delete an archive table is
  to drop it. This is done for security reasons. In a later version we will
  enable this by allowing the user to select a different row format.
*/
int ha_archive::truncate()
{
  DBUG_ENTER("ha_archive::truncate");
  DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
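
/*
  In practice this means a statement such as TRUNCATE TABLE t1 on an ARCHIVE
  table fails with a "storage engine doesn't support this" style error;
  emptying the table requires DROP TABLE followed by CREATE TABLE.
*/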

/*
  We just return state if asked.
*/
bool ha_archive::is_crashed() const
{
  DBUG_ENTER("ha_archive::is_crashed");
  DBUG_RETURN(share->crashed);
}


/**
  @brief Check for upgrade

  @param[in]  check_opt  check options

  @return Completion status
    @retval HA_ADMIN_OK            No upgrade required
    @retval HA_ADMIN_CORRUPT       Cannot read meta-data
    @retval HA_ADMIN_NEEDS_UPGRADE Upgrade required
*/

int ha_archive::check_for_upgrade(HA_CHECK_OPT *check_opt)
{
  DBUG_ENTER("ha_archive::check_for_upgrade");
  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  if (archive.version < ARCHIVE_VERSION)
    DBUG_RETURN(HA_ADMIN_NEEDS_UPGRADE);
  DBUG_RETURN(HA_ADMIN_OK);
}
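
/*
  This is the path exercised by CHECK TABLE ... FOR UPGRADE: if the header of
  the data file carries a version older than ARCHIVE_VERSION, the server is
  told the table needs to be rebuilt (typically via REPAIR TABLE) before it
  can be used with the newer format.
*/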


/*
  Simple scan of the table to make sure everything is ok.
*/

int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
{
  int rc= 0;
  const char *old_proc_info;
  ha_rows count;
  DBUG_ENTER("ha_archive::check");

  old_proc_info= thd_proc_info(thd, "Checking table");
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded;
  /* Flush any waiting data */
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  mysql_mutex_unlock(&share->mutex);

  if (init_archive_reader())
    DBUG_RETURN(HA_ADMIN_CORRUPT);
  /*
    Now we will rewind the archive file so that we are positioned at the
    start of the file.
  */
  read_data_header(&archive);
  for (ha_rows cur_count= count; cur_count; cur_count--)
  {
    if ((rc= get_row(&archive, table->record[0])))
      goto error;
  }
  /*
    Now read records that may have been inserted concurrently.
    Acquire share->mutex so the tail of the table is not modified by
    concurrent writers.
  */
  mysql_mutex_lock(&share->mutex);
  count= share->rows_recorded - count;
  if (share->archive_write_open)
    azflush(&(share->archive_write), Z_SYNC_FLUSH);
  while (!(rc= get_row(&archive, table->record[0])))
    count--;
  mysql_mutex_unlock(&share->mutex);

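  /*
    At this point count holds the number of rows appended while the first
    pass ran, minus the rows just read in the second pass; the loop should
    stop with rc == HA_ERR_END_OF_FILE and count == 0. Any other combination
    means the recorded row count and the data file disagree.
  */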
  if ((rc && rc != HA_ERR_END_OF_FILE) || count)
    goto error;

  thd_proc_info(thd, old_proc_info);
  DBUG_RETURN(HA_ADMIN_OK);

error:
  thd_proc_info(thd, old_proc_info);
  share->crashed= FALSE;
  DBUG_RETURN(HA_ADMIN_CORRUPT);
}

/*
  Check and repair the table if needed.
*/
bool ha_archive::check_and_repair(THD *thd)
{
  HA_CHECK_OPT check_opt;
  DBUG_ENTER("ha_archive::check_and_repair");

  check_opt.init();

  DBUG_RETURN(repair(thd, &check_opt));
}

archive_record_buffer *ha_archive::create_record_buffer(unsigned int length)
{
  DBUG_ENTER("ha_archive::create_record_buffer");
  archive_record_buffer *r;
  if (!(r=
        (archive_record_buffer*) my_malloc(az_key_memory_record_buffer,
                                           sizeof(archive_record_buffer),
                                           MYF(MY_WME))))
  {
    DBUG_RETURN(NULL); /* purecov: inspected */
  }
  r->length= (int)length;

  if (!(r->buffer= (uchar*) my_malloc(az_key_memory_record_buffer,
                                      r->length,
                                      MYF(MY_WME))))
  {
    my_free(r);
    DBUG_RETURN(NULL); /* purecov: inspected */
  }

  DBUG_RETURN(r);
}

void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
  DBUG_ENTER("ha_archive::destroy_record_buffer");
  my_free(r->buffer);
  my_free(r);
  DBUG_VOID_RETURN;
}
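
/*
  A minimal usage sketch for the pair above, as seen from another ha_archive
  member function (the size expression is illustrative; the buffer only needs
  to hold one packed row plus its row header):

    archive_record_buffer *buf= create_record_buffer(table->s->reclength + 4);
    if (buf == NULL)
      return HA_ERR_OUT_OF_MEM;    // my_malloc already warned via MY_WME
    ... unpack rows through buf->buffer ...
    destroy_record_buffer(buf);    // frees both the buffer and the struct
*/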

bool ha_archive::check_if_incompatible_data(HA_CREATE_INFO *info,
                                            uint table_changes)
{
  if (info->auto_increment_value != stats.auto_increment_value ||
      (info->used_fields & HA_CREATE_USED_DATADIR) ||
      info->data_file_name ||
      (info->used_fields & HA_CREATE_USED_COMMENT) ||
      table_changes != IS_EQUAL_YES)
    return COMPATIBLE_DATA_NO;

  return COMPATIBLE_DATA_YES;
}
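
/*
  This answer is what decides whether ALTER TABLE may keep the existing data
  files. ARCHIVE only reports COMPATIBLE_DATA_YES when nothing it cares about
  has changed; for example, an ALTER TABLE that changes the AUTO_INCREMENT
  value, the COMMENT, or the DATA DIRECTORY forces a full copy of the
  compressed data rather than an in-place change.
*/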


struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };

mysql_declare_plugin(archive)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &archive_storage_engine,
  "ARCHIVE",
  "Brian Aker, MySQL AB",
  "Archive storage engine",
  PLUGIN_LICENSE_GPL,
  archive_db_init, /* Plugin Init */
  NULL, /* Plugin Deinit */
  0x0300 /* 3.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL,                       /* config options                  */
  0,                          /* flags                           */
}
mysql_declare_plugin_end;