1 /* Copyright (c) 2004, 2016, Oracle and/or its affiliates. All rights reserved.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software
21   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /*
24   Make sure to look at ha_tina.h for more details.
25 
26   First off, this is a play thing for me, there are a number of things
27   wrong with it:
28     *) It was designed for csv and therefore its performance is highly
29        questionable.
30     *) Indexes have not been implemented. This is because the files can
31        be traded in and out of the table directory without having to worry
32        about rebuilding anything.
33     *) NULLs and "" are treated equally (like a spreadsheet).
34     *) There was in the beginning no point to anyone seeing this other
35        then me, so there is a good chance that I haven't quite documented
36        it well.
37     *) Less design, more "make it work"
38 
39   Now there are a few cool things with it:
40     *) Errors can result in corrupted data files.
41     *) Data files can be read by spreadsheets directly.
42 
43 TODO:
44  *) Move to a block system for larger files
45  *) Error recovery, its all there, just need to finish it
46  *) Document how the chains work.
47 
48  -Brian
49 */
50 
51 #include "my_global.h"
52 #include "sql_priv.h"
53 #include "sql_class.h"                          // SSV
54 #include <mysql/plugin.h>
55 #include <mysql/psi/mysql_file.h>
56 #include "ha_tina.h"
57 #include "probes_mysql.h"
58 
59 #include <algorithm>
60 
61 using std::min;
62 using std::max;
63 
64 /*
65   uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + uchar
66 */
67 #define META_BUFFER_SIZE sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
68   + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar)
69 #define TINA_CHECK_HEADER 254 // The number we use to determine corruption
70 #define BLOB_MEMROOT_ALLOC_SIZE 8192
71 
72 /* The file extension */
73 #define CSV_EXT ".CSV"               // The data file
74 #define CSN_EXT ".CSN"               // Files used during repair and update
75 #define CSM_EXT ".CSM"               // Meta file
76 
77 
78 static TINA_SHARE *get_share(const char *table_name, TABLE *table);
79 static int free_share(TINA_SHARE *share);
80 static int read_meta_file(File meta_file, ha_rows *rows);
81 static int write_meta_file(File meta_file, ha_rows rows, bool dirty);
82 
83 extern "C" void tina_get_status(void* param, int concurrent_insert);
84 extern "C" void tina_update_status(void* param);
85 extern "C" my_bool tina_check_status(void* param);
86 
87 /* Stuff for shares */
88 mysql_mutex_t tina_mutex;
89 static HASH tina_open_tables;
90 static handler *tina_create_handler(handlerton *hton,
91                                     TABLE_SHARE *table,
92                                     MEM_ROOT *mem_root);
93 
94 
95 /*****************************************************************************
96  ** TINA tables
97  *****************************************************************************/
98 
99 /*
100   Used for sorting chains with qsort().
101 */
sort_set(tina_set * a,tina_set * b)102 int sort_set (tina_set *a, tina_set *b)
103 {
104   /*
105     We assume that intervals do not intersect. So, it is enought to compare
106     any two points. Here we take start of intervals for comparison.
107   */
108   return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
109 }
110 
tina_get_key(TINA_SHARE * share,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))111 static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
112                           my_bool not_used MY_ATTRIBUTE((unused)))
113 {
114   *length=share->table_name_length;
115   return (uchar*) share->table_name;
116 }
117 
118 #ifdef HAVE_PSI_INTERFACE
119 
120 static PSI_mutex_key csv_key_mutex_tina, csv_key_mutex_TINA_SHARE_mutex;
121 
122 static PSI_mutex_info all_tina_mutexes[]=
123 {
124   { &csv_key_mutex_tina, "tina", PSI_FLAG_GLOBAL},
125   { &csv_key_mutex_TINA_SHARE_mutex, "TINA_SHARE::mutex", 0}
126 };
127 
128 static PSI_file_key csv_key_file_metadata, csv_key_file_data,
129   csv_key_file_update;
130 
131 static PSI_file_info all_tina_files[]=
132 {
133   { &csv_key_file_metadata, "metadata", 0},
134   { &csv_key_file_data, "data", 0},
135   { &csv_key_file_update, "update", 0}
136 };
137 
init_tina_psi_keys(void)138 static void init_tina_psi_keys(void)
139 {
140   const char* category= "csv";
141   int count;
142 
143   count= array_elements(all_tina_mutexes);
144   mysql_mutex_register(category, all_tina_mutexes, count);
145 
146   count= array_elements(all_tina_files);
147   mysql_file_register(category, all_tina_files, count);
148 }
149 #endif /* HAVE_PSI_INTERFACE */
150 
tina_init_func(void * p)151 static int tina_init_func(void *p)
152 {
153   handlerton *tina_hton;
154 
155 #ifdef HAVE_PSI_INTERFACE
156   init_tina_psi_keys();
157 #endif
158 
159   tina_hton= (handlerton *)p;
160   mysql_mutex_init(csv_key_mutex_tina, &tina_mutex, MY_MUTEX_INIT_FAST);
161   (void) my_hash_init(&tina_open_tables,system_charset_info,32,0,0,
162                       (my_hash_get_key) tina_get_key,0,0);
163   tina_hton->state= SHOW_OPTION_YES;
164   tina_hton->db_type= DB_TYPE_CSV_DB;
165   tina_hton->create= tina_create_handler;
166   tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
167                      HTON_NO_PARTITION);
168   return 0;
169 }
170 
tina_done_func(void * p)171 static int tina_done_func(void *p)
172 {
173   my_hash_free(&tina_open_tables);
174   mysql_mutex_destroy(&tina_mutex);
175 
176   return 0;
177 }
178 
179 
180 /*
181   Simple lock controls.
182 */
get_share(const char * table_name,TABLE * table)183 static TINA_SHARE *get_share(const char *table_name, TABLE *table)
184 {
185   TINA_SHARE *share;
186   char meta_file_name[FN_REFLEN];
187   MY_STAT file_stat;                /* Stat information for the data file */
188   char *tmp_name;
189   uint length;
190 
191   mysql_mutex_lock(&tina_mutex);
192   length=(uint) strlen(table_name);
193 
194   /*
195     If share is not present in the hash, create a new share and
196     initialize its members.
197   */
198   if (!(share=(TINA_SHARE*) my_hash_search(&tina_open_tables,
199                                            (uchar*) table_name,
200                                            length)))
201   {
202     if (!my_multi_malloc(MYF(MY_WME | MY_ZEROFILL),
203                          &share, sizeof(*share),
204                          &tmp_name, length+1,
205                          NullS))
206     {
207       mysql_mutex_unlock(&tina_mutex);
208       return NULL;
209     }
210 
211     share->use_count= 0;
212     share->is_log_table= FALSE;
213     share->table_name_length= length;
214     share->table_name= tmp_name;
215     share->crashed= FALSE;
216     share->rows_recorded= 0;
217     share->update_file_opened= FALSE;
218     share->tina_write_opened= FALSE;
219     share->data_file_version= 0;
220     strmov(share->table_name, table_name);
221     fn_format(share->data_file_name, table_name, "", CSV_EXT,
222               MY_REPLACE_EXT|MY_UNPACK_FILENAME);
223     fn_format(meta_file_name, table_name, "", CSM_EXT,
224               MY_REPLACE_EXT|MY_UNPACK_FILENAME);
225 
226     if (mysql_file_stat(csv_key_file_data,
227                         share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
228       goto error;
229     share->saved_data_file_length= file_stat.st_size;
230 
231     if (my_hash_insert(&tina_open_tables, (uchar*) share))
232       goto error;
233     thr_lock_init(&share->lock);
234     mysql_mutex_init(csv_key_mutex_TINA_SHARE_mutex,
235                      &share->mutex, MY_MUTEX_INIT_FAST);
236 
237     /*
238       Open or create the meta file. In the latter case, we'll get
239       an error during read_meta_file and mark the table as crashed.
240       Usually this will result in auto-repair, and we will get a good
241       meta-file in the end.
242     */
243     if (((share->meta_file= mysql_file_open(csv_key_file_metadata,
244                                             meta_file_name,
245                                             O_RDWR|O_CREAT,
246                                             MYF(MY_WME))) == -1) ||
247         read_meta_file(share->meta_file, &share->rows_recorded))
248       share->crashed= TRUE;
249   }
250 
251   share->use_count++;
252   mysql_mutex_unlock(&tina_mutex);
253 
254   return share;
255 
256 error:
257   mysql_mutex_unlock(&tina_mutex);
258   my_free(share);
259 
260   return NULL;
261 }
262 
263 
264 /*
265   Read CSV meta-file
266 
267   SYNOPSIS
268     read_meta_file()
269     meta_file   The meta-file filedes
270     ha_rows     Pointer to the var we use to store rows count.
271                 These are read from the meta-file.
272 
273   DESCRIPTION
274 
275     Read the meta-file info. For now we are only interested in
276     rows counf, crashed bit and magic number.
277 
278   RETURN
279     0 - OK
280     non-zero - error occurred
281 */
282 
read_meta_file(File meta_file,ha_rows * rows)283 static int read_meta_file(File meta_file, ha_rows *rows)
284 {
285   uchar meta_buffer[META_BUFFER_SIZE];
286   uchar *ptr= meta_buffer;
287 
288   DBUG_ENTER("ha_tina::read_meta_file");
289 
290   mysql_file_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
291   if (mysql_file_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
292       != META_BUFFER_SIZE)
293     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
294 
295   /*
296     Parse out the meta data, we ignore version at the moment
297   */
298 
299   ptr+= sizeof(uchar)*2; // Move past header
300   *rows= (ha_rows)uint8korr(ptr);
301   ptr+= sizeof(ulonglong); // Move past rows
302   /*
303     Move past check_point, auto_increment and forced_flushes fields.
304     They are present in the format, but we do not use them yet.
305   */
306   ptr+= 3*sizeof(ulonglong);
307 
308   /* check crashed bit and magic number */
309   if ((meta_buffer[0] != (uchar)TINA_CHECK_HEADER) ||
310       ((bool)(*ptr)== TRUE))
311     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
312 
313   mysql_file_sync(meta_file, MYF(MY_WME));
314 
315   DBUG_RETURN(0);
316 }
317 
318 
319 /*
320   Write CSV meta-file
321 
322   SYNOPSIS
323     write_meta_file()
324     meta_file   The meta-file filedes
325     ha_rows     The number of rows we have in the datafile.
326     dirty       A flag, which marks whether we have a corrupt table
327 
328   DESCRIPTION
329 
330     Write meta-info the the file. Only rows count, crashed bit and
331     magic number matter now.
332 
333   RETURN
334     0 - OK
335     non-zero - error occurred
336 */
337 
write_meta_file(File meta_file,ha_rows rows,bool dirty)338 static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
339 {
340   uchar meta_buffer[META_BUFFER_SIZE];
341   uchar *ptr= meta_buffer;
342 
343   DBUG_ENTER("ha_tina::write_meta_file");
344 
345   *ptr= (uchar)TINA_CHECK_HEADER;
346   ptr+= sizeof(uchar);
347   *ptr= (uchar)TINA_VERSION;
348   ptr+= sizeof(uchar);
349   int8store(ptr, (ulonglong)rows);
350   ptr+= sizeof(ulonglong);
351   memset(ptr, 0, 3*sizeof(ulonglong));
352   /*
353      Skip over checkpoint, autoincrement and forced_flushes fields.
354      We'll need them later.
355   */
356   ptr+= 3*sizeof(ulonglong);
357   *ptr= (uchar)dirty;
358 
359   mysql_file_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
360   if (mysql_file_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
361       != META_BUFFER_SIZE)
362     DBUG_RETURN(-1);
363 
364   mysql_file_sync(meta_file, MYF(MY_WME));
365 
366   DBUG_RETURN(0);
367 }
368 
check_and_repair(THD * thd)369 bool ha_tina::check_and_repair(THD *thd)
370 {
371   HA_CHECK_OPT check_opt;
372   DBUG_ENTER("ha_tina::check_and_repair");
373 
374   check_opt.init();
375 
376   DBUG_RETURN(repair(thd, &check_opt));
377 }
378 
379 
init_tina_writer()380 int ha_tina::init_tina_writer()
381 {
382   DBUG_ENTER("ha_tina::init_tina_writer");
383 
384   /*
385     Mark the file as crashed. We will set the flag back when we close
386     the file. In the case of the crash it will remain marked crashed,
387     which enforce recovery.
388   */
389   (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
390 
391   if ((share->tina_write_filedes=
392         mysql_file_open(csv_key_file_data,
393                         share->data_file_name, O_RDWR|O_APPEND,
394                         MYF(MY_WME))) == -1)
395   {
396     DBUG_PRINT("info", ("Could not open tina file writes"));
397     share->crashed= TRUE;
398     DBUG_RETURN(my_errno ? my_errno : -1);
399   }
400   share->tina_write_opened= TRUE;
401 
402   DBUG_RETURN(0);
403 }
404 
405 
is_crashed() const406 bool ha_tina::is_crashed() const
407 {
408   DBUG_ENTER("ha_tina::is_crashed");
409   DBUG_RETURN(share->crashed);
410 }
411 
412 /*
413   Free lock controls.
414 */
free_share(TINA_SHARE * share)415 static int free_share(TINA_SHARE *share)
416 {
417   DBUG_ENTER("ha_tina::free_share");
418   mysql_mutex_lock(&tina_mutex);
419   int result_code= 0;
420   if (!--share->use_count){
421     /* Write the meta file. Mark it as crashed if needed. */
422     (void)write_meta_file(share->meta_file, share->rows_recorded,
423                           share->crashed ? TRUE :FALSE);
424     if (mysql_file_close(share->meta_file, MYF(0)))
425       result_code= 1;
426     if (share->tina_write_opened)
427     {
428       if (mysql_file_close(share->tina_write_filedes, MYF(0)))
429         result_code= 1;
430       share->tina_write_opened= FALSE;
431     }
432 
433     my_hash_delete(&tina_open_tables, (uchar*) share);
434     thr_lock_delete(&share->lock);
435     mysql_mutex_destroy(&share->mutex);
436     my_free(share);
437   }
438   mysql_mutex_unlock(&tina_mutex);
439 
440   DBUG_RETURN(result_code);
441 }
442 
443 
444 /*
445   This function finds the end of a line and returns the length
446   of the line ending.
447 
448   We support three kinds of line endings:
449   '\r'     --  Old Mac OS line ending
450   '\n'     --  Traditional Unix and Mac OS X line ending
451   '\r''\n' --  DOS\Windows line ending
452 */
453 
find_eoln_buff(Transparent_file * data_buff,my_off_t begin,my_off_t end,int * eoln_len)454 my_off_t find_eoln_buff(Transparent_file *data_buff, my_off_t begin,
455                      my_off_t end, int *eoln_len)
456 {
457   *eoln_len= 0;
458 
459   for (my_off_t x= begin; x < end; x++)
460   {
461     /* Unix (includes Mac OS X) */
462     if (data_buff->get_value(x) == '\n')
463       *eoln_len= 1;
464     else
465       if (data_buff->get_value(x) == '\r') // Mac or Dos
466       {
467         /* old Mac line ending */
468         if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
469           *eoln_len= 1;
470         else // DOS style ending
471           *eoln_len= 2;
472       }
473 
474     if (*eoln_len)  // end of line was found
475       return x;
476   }
477 
478   return 0;
479 }
480 
481 
tina_create_handler(handlerton * hton,TABLE_SHARE * table,MEM_ROOT * mem_root)482 static handler *tina_create_handler(handlerton *hton,
483                                     TABLE_SHARE *table,
484                                     MEM_ROOT *mem_root)
485 {
486   return new (mem_root) ha_tina(hton, table);
487 }
488 
489 
ha_tina(handlerton * hton,TABLE_SHARE * table_arg)490 ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
491   :handler(hton, table_arg),
492   /*
493     These definitions are found in handler.h
494     They are not probably completely right.
495   */
496   current_position(0), next_position(0), local_saved_data_file_length(0),
497   file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
498   local_data_file_version(0), records_is_known(0)
499 {
500   /* Set our original buffers from pre-allocated memory */
501   buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
502   chain= chain_buffer;
503   file_buff= new Transparent_file();
504   init_alloc_root(&blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);;
505 }
506 
507 
508 /*
509   Encode a buffer into the quoted format.
510 */
511 
encode_quote(uchar * buf)512 int ha_tina::encode_quote(uchar *buf)
513 {
514   char attribute_buffer[1024];
515   String attribute(attribute_buffer, sizeof(attribute_buffer),
516                    &my_charset_bin);
517 
518   my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
519   buffer.length(0);
520 
521   for (Field **field=table->field ; *field ; field++)
522   {
523     const char *ptr;
524     const char *end_ptr;
525     const bool was_null= (*field)->is_null();
526 
527     /*
528       assistance for backwards compatibility in production builds.
529       note: this will not work for ENUM columns.
530     */
531     if (was_null)
532     {
533       (*field)->set_default();
534       (*field)->set_notnull();
535     }
536 
537     (*field)->val_str(&attribute,&attribute);
538 
539     if (was_null)
540       (*field)->set_null();
541 
542     if ((*field)->str_needs_quotes())
543     {
544       ptr= attribute.ptr();
545       end_ptr= attribute.length() + ptr;
546 
547       buffer.append('"');
548 
549       for (; ptr < end_ptr; ptr++)
550       {
551         if (*ptr == '"')
552         {
553           buffer.append('\\');
554           buffer.append('"');
555         }
556         else if (*ptr == '\r')
557         {
558           buffer.append('\\');
559           buffer.append('r');
560         }
561         else if (*ptr == '\\')
562         {
563           buffer.append('\\');
564           buffer.append('\\');
565         }
566         else if (*ptr == '\n')
567         {
568           buffer.append('\\');
569           buffer.append('n');
570         }
571         else
572           buffer.append(*ptr);
573       }
574       buffer.append('"');
575     }
576     else
577     {
578       buffer.append(attribute);
579     }
580 
581     buffer.append(',');
582   }
583   // Remove the comma, add a line feed
584   buffer.length(buffer.length() - 1);
585   buffer.append('\n');
586 
587   //buffer.replace(buffer.length(), 0, "\n", 1);
588 
589   dbug_tmp_restore_column_map(table->read_set, org_bitmap);
590   return (buffer.length());
591 }
592 
593 /*
594   chain_append() adds delete positions to the chain that we use to keep
595   track of space. Then the chain will be used to cleanup "holes", occurred
596   due to deletes and updates.
597 */
chain_append()598 int ha_tina::chain_append()
599 {
600   if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
601     (chain_ptr -1)->end= next_position;
602   else
603   {
604     /* We set up for the next position */
605     if ((off_t)(chain_ptr - chain) == (chain_size -1))
606     {
607       my_off_t location= chain_ptr - chain;
608       chain_size += DEFAULT_CHAIN_LENGTH;
609       if (chain_alloced)
610       {
611         /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
612         if ((chain= (tina_set *) my_realloc((uchar*)chain,
613                                             chain_size, MYF(MY_WME))) == NULL)
614           return -1;
615       }
616       else
617       {
618         tina_set *ptr= (tina_set *) my_malloc(chain_size * sizeof(tina_set),
619                                               MYF(MY_WME));
620         memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
621         chain= ptr;
622         chain_alloced++;
623       }
624       chain_ptr= chain + location;
625     }
626     chain_ptr->begin= current_position;
627     chain_ptr->end= next_position;
628     chain_ptr++;
629   }
630 
631   return 0;
632 }
633 
634 
635 /*
636   Scans for a row.
637 */
find_current_row(uchar * buf)638 int ha_tina::find_current_row(uchar *buf)
639 {
640   my_off_t end_offset, curr_offset= current_position;
641   int eoln_len;
642   my_bitmap_map *org_bitmap;
643   int error;
644   bool read_all;
645   DBUG_ENTER("ha_tina::find_current_row");
646 
647   free_root(&blobroot, MYF(0));
648 
649   /*
650     We do not read further then local_saved_data_file_length in order
651     not to conflict with undergoing concurrent insert.
652   */
653   if ((end_offset=
654         find_eoln_buff(file_buff, current_position,
655                        local_saved_data_file_length, &eoln_len)) == 0)
656     DBUG_RETURN(HA_ERR_END_OF_FILE);
657 
658   /* We must read all columns in case a table is opened for update */
659   read_all= !bitmap_is_clear_all(table->write_set);
660   /* Avoid asserts in ::store() for columns that are not going to be updated */
661   org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
662   error= HA_ERR_CRASHED_ON_USAGE;
663 
664   memset(buf, 0, table->s->null_bytes);
665 
666   /*
667     Parse the line obtained using the following algorithm
668 
669     BEGIN
670       1) Store the EOL (end of line) for the current row
671       2) Until all the fields in the current query have not been
672          filled
673          2.1) If the current character is a quote
674               2.1.1) Until EOL has not been reached
675                      a) If end of current field is reached, move
676                         to next field and jump to step 2.3
677                      b) If current character is a \\ handle
678                         \\n, \\r, \\, \\"
679                      c) else append the current character into the buffer
680                         before checking that EOL has not been reached.
681           2.2) If the current character does not begin with a quote
682                2.2.1) Until EOL has not been reached
683                       a) If the end of field has been reached move to the
684                          next field and jump to step 2.3
685                       b) If current character begins with \\ handle
686                         \\n, \\r, \\, \\"
687                       c) else append the current character into the buffer
688                          before checking that EOL has not been reached.
689           2.3) Store the current field value and jump to 2)
690     TERMINATE
691   */
692 
693   for (Field **field=table->field ; *field ; field++)
694   {
695     char curr_char;
696 
697     buffer.length(0);
698     if (curr_offset >= end_offset)
699       goto err;
700     curr_char= file_buff->get_value(curr_offset);
701     /* Handle the case where the first character is a quote */
702     if (curr_char == '"')
703     {
704       /* Increment past the first quote */
705       curr_offset++;
706 
707       /* Loop through the row to extract the values for the current field */
708       for ( ; curr_offset < end_offset; curr_offset++)
709       {
710         curr_char= file_buff->get_value(curr_offset);
711         /* check for end of the current field */
712         if (curr_char == '"' &&
713             (curr_offset == end_offset - 1 ||
714              file_buff->get_value(curr_offset + 1) == ','))
715         {
716           /* Move past the , and the " */
717           curr_offset+= 2;
718           break;
719         }
720         if (curr_char == '\\' && curr_offset != (end_offset - 1))
721         {
722           curr_offset++;
723           curr_char= file_buff->get_value(curr_offset);
724           if (curr_char == 'r')
725             buffer.append('\r');
726           else if (curr_char == 'n' )
727             buffer.append('\n');
728           else if (curr_char == '\\' || curr_char == '"')
729             buffer.append(curr_char);
730           else  /* This could only happed with an externally created file */
731           {
732             buffer.append('\\');
733             buffer.append(curr_char);
734           }
735         }
736         else // ordinary symbol
737         {
738           /*
739             If we are at final symbol and no last quote was found =>
740             we are working with a damaged file.
741           */
742           if (curr_offset == end_offset - 1)
743             goto err;
744           buffer.append(curr_char);
745         }
746       }
747     }
748     else
749     {
750       for ( ; curr_offset < end_offset; curr_offset++)
751       {
752         curr_char= file_buff->get_value(curr_offset);
753         /* Move past the ,*/
754         if (curr_char == ',')
755         {
756           curr_offset++;
757           break;
758         }
759         if (curr_char == '\\' && curr_offset != (end_offset - 1))
760         {
761           curr_offset++;
762           curr_char= file_buff->get_value(curr_offset);
763           if (curr_char == 'r')
764             buffer.append('\r');
765           else if (curr_char == 'n' )
766             buffer.append('\n');
767           else if (curr_char == '\\' || curr_char == '"')
768             buffer.append(curr_char);
769           else  /* This could only happed with an externally created file */
770           {
771             buffer.append('\\');
772             buffer.append(curr_char);
773           }
774         }
775         else
776         {
777           /*
778              We are at the final symbol and a quote was found for the
779              unquoted field => We are working with a damaged field.
780           */
781           if (curr_offset == end_offset - 1 && curr_char == '"')
782             goto err;
783           buffer.append(curr_char);
784         }
785       }
786     }
787 
788     if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
789     {
790       bool is_enum= ((*field)->real_type() ==  MYSQL_TYPE_ENUM);
791       /*
792         Here CHECK_FIELD_WARN checks that all values in the csv file are valid
793         which is normally the case, if they were written  by
794         INSERT -> ha_tina::write_row. '0' values on ENUM fields are considered
795         invalid by Field_enum::store() but it can store them on INSERT anyway.
796         Thus, for enums we silence the warning, as it doesn't really mean
797         an invalid value.
798       */
799       if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
800                           is_enum ? CHECK_FIELD_IGNORE : CHECK_FIELD_WARN))
801       {
802         if (!is_enum)
803           goto err;
804       }
805       if ((*field)->flags & BLOB_FLAG)
806       {
807         Field_blob *blob= *(Field_blob**) field;
808         uchar *src, *tgt;
809         uint length, packlength;
810 
811         packlength= blob->pack_length_no_ptr();
812         length= blob->get_length(blob->ptr);
813         memcpy(&src, blob->ptr + packlength, sizeof(char*));
814         if (src)
815         {
816           tgt= (uchar*) alloc_root(&blobroot, length);
817           bmove(tgt, src, length);
818           memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
819         }
820       }
821     }
822   }
823   next_position= end_offset + eoln_len;
824   error= 0;
825 
826 err:
827   dbug_tmp_restore_column_map(table->write_set, org_bitmap);
828 
829   DBUG_RETURN(error);
830 }
831 
832 /*
833   If frm_error() is called in table.cc this is called to find out what file
834   extensions exist for this handler.
835 */
836 static const char *ha_tina_exts[] = {
837   CSV_EXT,
838   CSM_EXT,
839   NullS
840 };
841 
bas_ext() const842 const char **ha_tina::bas_ext() const
843 {
844   return ha_tina_exts;
845 }
846 
847 /*
848   Three functions below are needed to enable concurrent insert functionality
849   for CSV engine. For more details see mysys/thr_lock.c
850 */
851 
tina_get_status(void * param,int concurrent_insert)852 void tina_get_status(void* param, int concurrent_insert)
853 {
854   ha_tina *tina= (ha_tina*) param;
855   tina->get_status();
856 }
857 
tina_update_status(void * param)858 void tina_update_status(void* param)
859 {
860   ha_tina *tina= (ha_tina*) param;
861   tina->update_status();
862 }
863 
864 /* this should exist and return 0 for concurrent insert to work */
tina_check_status(void * param)865 my_bool tina_check_status(void* param)
866 {
867   return 0;
868 }
869 
870 /*
871   Save the state of the table
872 
873   SYNOPSIS
874     get_status()
875 
876   DESCRIPTION
877     This function is used to retrieve the file length. During the lock
878     phase of concurrent insert. For more details see comment to
879     ha_tina::update_status below.
880 */
881 
get_status()882 void ha_tina::get_status()
883 {
884   if (share->is_log_table)
885   {
886     /*
887       We have to use mutex to follow pthreads memory visibility
888       rules for share->saved_data_file_length
889     */
890     mysql_mutex_lock(&share->mutex);
891     local_saved_data_file_length= share->saved_data_file_length;
892     mysql_mutex_unlock(&share->mutex);
893     return;
894   }
895   local_saved_data_file_length= share->saved_data_file_length;
896 }
897 
898 
899 /*
900   Correct the state of the table. Called by unlock routines
901   before the write lock is released.
902 
903   SYNOPSIS
904     update_status()
905 
906   DESCRIPTION
907     When we employ concurrent insert lock, we save current length of the file
908     during the lock phase. We do not read further saved value, as we don't
909     want to interfere with undergoing concurrent insert. Writers update file
910     length info during unlock with update_status().
911 
912   NOTE
913     For log tables concurrent insert works different. The reason is that
914     log tables are always opened and locked. And as they do not unlock
915     tables, the file length after writes should be updated in a different
916     way. For this purpose we need is_log_table flag. When this flag is set
917     we call update_status() explicitly after each row write.
918 */
919 
update_status()920 void ha_tina::update_status()
921 {
922   /* correct local_saved_data_file_length for writers */
923   share->saved_data_file_length= local_saved_data_file_length;
924 }
925 
926 
927 /*
928   Open a database file. Keep in mind that tables are caches, so
929   this will not be called for every request. Any sort of positions
930   that need to be reset should be kept in the ::extra() call.
931 */
open(const char * name,int mode,uint open_options)932 int ha_tina::open(const char *name, int mode, uint open_options)
933 {
934   DBUG_ENTER("ha_tina::open");
935 
936   if (!(share= get_share(name, table)))
937     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
938 
939   if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
940   {
941     free_share(share);
942     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
943   }
944 
945   local_data_file_version= share->data_file_version;
946   if ((data_file= mysql_file_open(csv_key_file_data,
947                                   share->data_file_name,
948                                   O_RDONLY, MYF(MY_WME))) == -1)
949   {
950     free_share(share);
951     DBUG_RETURN(my_errno ? my_errno : -1);
952   }
953 
954   /*
955     Init locking. Pass handler object to the locking routines,
956     so that they could save/update local_saved_data_file_length value
957     during locking. This is needed to enable concurrent inserts.
958   */
959   thr_lock_data_init(&share->lock, &lock, (void*) this);
960   ref_length= sizeof(my_off_t);
961 
962   share->lock.get_status= tina_get_status;
963   share->lock.update_status= tina_update_status;
964   share->lock.check_status= tina_check_status;
965 
966   DBUG_RETURN(0);
967 }
968 
969 
970 /*
971   Close a database file. We remove ourselves from the shared strucutre.
972   If it is empty we destroy it.
973 */
close(void)974 int ha_tina::close(void)
975 {
976   int rc= 0;
977   DBUG_ENTER("ha_tina::close");
978   rc= mysql_file_close(data_file, MYF(0));
979   DBUG_RETURN(free_share(share) || rc);
980 }
981 
982 /*
983   This is an INSERT. At the moment this handler just seeks to the end
984   of the file and appends the data. In an error case it really should
985   just truncate to the original position (this is not done yet).
986 */
write_row(uchar * buf)987 int ha_tina::write_row(uchar * buf)
988 {
989   int size;
990   DBUG_ENTER("ha_tina::write_row");
991 
992   if (share->crashed)
993       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
994 
995   ha_statistic_increment(&SSV::ha_write_count);
996 
997   size= encode_quote(buf);
998 
999   if (!share->tina_write_opened)
1000     if (init_tina_writer())
1001       DBUG_RETURN(-1);
1002 
1003    /* use pwrite, as concurrent reader could have changed the position */
1004   if (mysql_file_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
1005                        MYF(MY_WME | MY_NABP)))
1006     DBUG_RETURN(-1);
1007 
1008   /* update local copy of the max position to see our own changes */
1009   local_saved_data_file_length+= size;
1010 
1011   /* update shared info */
1012   mysql_mutex_lock(&share->mutex);
1013   share->rows_recorded++;
1014   /* update status for the log tables */
1015   if (share->is_log_table)
1016     update_status();
1017   mysql_mutex_unlock(&share->mutex);
1018 
1019   stats.records++;
1020   DBUG_RETURN(0);
1021 }
1022 
1023 
open_update_temp_file_if_needed()1024 int ha_tina::open_update_temp_file_if_needed()
1025 {
1026   char updated_fname[FN_REFLEN];
1027 
1028   if (!share->update_file_opened)
1029   {
1030     if ((update_temp_file=
1031            mysql_file_create(csv_key_file_update,
1032                              fn_format(updated_fname, share->table_name,
1033                                        "", CSN_EXT,
1034                                        MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1035                              0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1036       return 1;
1037     share->update_file_opened= TRUE;
1038     temp_file_length= 0;
1039   }
1040   return 0;
1041 }
1042 
1043 /*
1044   This is called for an update.
1045   Make sure you put in code to increment the auto increment.
1046   Currently auto increment is not being
1047   fixed since autoincrements have yet to be added to this table handler.
1048   This will be called in a table scan right before the previous ::rnd_next()
1049   call.
1050 */
update_row(const uchar * old_data,uchar * new_data)1051 int ha_tina::update_row(const uchar * old_data, uchar * new_data)
1052 {
1053   int size;
1054   int rc= -1;
1055   DBUG_ENTER("ha_tina::update_row");
1056 
1057   ha_statistic_increment(&SSV::ha_update_count);
1058 
1059   size= encode_quote(new_data);
1060 
1061   /*
1062     During update we mark each updating record as deleted
1063     (see the chain_append()) then write new one to the temporary data file.
1064     At the end of the sequence in the rnd_end() we append all non-marked
1065     records from the data file to the temporary data file then rename it.
1066     The temp_file_length is used to calculate new data file length.
1067   */
1068   if (chain_append())
1069     goto err;
1070 
1071   if (open_update_temp_file_if_needed())
1072     goto err;
1073 
1074   if (mysql_file_write(update_temp_file, (uchar*)buffer.ptr(), size,
1075                        MYF(MY_WME | MY_NABP)))
1076     goto err;
1077   temp_file_length+= size;
1078   rc= 0;
1079 
1080   /* UPDATE should never happen on the log tables */
1081   DBUG_ASSERT(!share->is_log_table);
1082 
1083 err:
1084   DBUG_PRINT("info",("rc = %d", rc));
1085   DBUG_RETURN(rc);
1086 }
1087 
1088 
1089 /*
1090   Deletes a row. First the database will find the row, and then call this
1091   method. In the case of a table scan, the previous call to this will be
1092   the ::rnd_next() that found this row.
1093   The exception to this is an ORDER BY. This will cause the table handler
1094   to walk the table noting the positions of all rows that match a query.
1095   The table will then be deleted/positioned based on the ORDER (so RANDOM,
1096   DESC, ASC).
1097 */
delete_row(const uchar * buf)1098 int ha_tina::delete_row(const uchar * buf)
1099 {
1100   DBUG_ENTER("ha_tina::delete_row");
1101   ha_statistic_increment(&SSV::ha_delete_count);
1102 
1103   if (chain_append())
1104     DBUG_RETURN(-1);
1105 
1106   stats.records--;
1107   /* Update shared info */
1108   DBUG_ASSERT(share->rows_recorded);
1109   mysql_mutex_lock(&share->mutex);
1110   share->rows_recorded--;
1111   mysql_mutex_unlock(&share->mutex);
1112 
1113   /* DELETE should never happen on the log table */
1114   DBUG_ASSERT(!share->is_log_table);
1115 
1116   DBUG_RETURN(0);
1117 }
1118 
1119 
1120 /**
1121   @brief Initialize the data file.
1122 
1123   @details Compare the local version of the data file with the shared one.
1124   If they differ, there are some changes behind and we have to reopen
1125   the data file to make the changes visible.
1126   Call @c file_buff->init_buff() at the end to read the beginning of the
1127   data file into buffer.
1128 
1129   @retval  0  OK.
1130   @retval  1  There was an error.
1131 */
1132 
init_data_file()1133 int ha_tina::init_data_file()
1134 {
1135   if (local_data_file_version != share->data_file_version)
1136   {
1137     local_data_file_version= share->data_file_version;
1138     if (mysql_file_close(data_file, MYF(0)) ||
1139         (data_file= mysql_file_open(csv_key_file_data,
1140                                     share->data_file_name, O_RDONLY,
1141                                     MYF(MY_WME))) == -1)
1142       return my_errno ? my_errno : -1;
1143   }
1144   file_buff->init_buff(data_file);
1145   return 0;
1146 }
1147 
1148 
1149 /*
1150   All table scans call this first.
1151   The order of a table scan is:
1152 
1153   ha_tina::store_lock
1154   ha_tina::external_lock
1155   ha_tina::info
1156   ha_tina::rnd_init
1157   ha_tina::extra
1158   ENUM HA_EXTRA_CACHE   Cash record in HA_rrnd()
1159   ha_tina::rnd_next
1160   ha_tina::rnd_next
1161   ha_tina::rnd_next
1162   ha_tina::rnd_next
1163   ha_tina::rnd_next
1164   ha_tina::rnd_next
1165   ha_tina::rnd_next
1166   ha_tina::rnd_next
1167   ha_tina::rnd_next
1168   ha_tina::extra
1169   ENUM HA_EXTRA_NO_CACHE   End cacheing of records (def)
1170   ha_tina::external_lock
1171   ha_tina::extra
1172   ENUM HA_EXTRA_RESET   Reset database to after open
1173 
1174   Each call to ::rnd_next() represents a row returned in the can. When no more
1175   rows can be returned, rnd_next() returns a value of HA_ERR_END_OF_FILE.
1176   The ::info() call is just for the optimizer.
1177 
1178 */
1179 
rnd_init(bool scan)1180 int ha_tina::rnd_init(bool scan)
1181 {
1182   DBUG_ENTER("ha_tina::rnd_init");
1183 
1184   /* set buffer to the beginning of the file */
1185   if (share->crashed || init_data_file())
1186     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1187 
1188   current_position= next_position= 0;
1189   stats.records= 0;
1190   records_is_known= 0;
1191   chain_ptr= chain;
1192 
1193   DBUG_RETURN(0);
1194 }
1195 
1196 /*
1197   ::rnd_next() does all the heavy lifting for a table scan. You will need to
1198   populate *buf with the correct field data. You can walk the field to
1199   determine at what position you should store the data (take a look at how
1200   ::find_current_row() works). The structure is something like:
1201   0Foo  Dog  Friend
1202   The first offset is for the first attribute. All space before that is
1203   reserved for null count.
1204   Basically this works as a mask for which rows are nulled (compared to just
1205   empty).
1206   This table handler doesn't do nulls and does not know the difference between
1207   NULL and "". This is ok since this table handler is for spreadsheets and
1208   they don't know about them either :)
1209 */
rnd_next(uchar * buf)1210 int ha_tina::rnd_next(uchar *buf)
1211 {
1212   int rc;
1213   DBUG_ENTER("ha_tina::rnd_next");
1214   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
1215                        TRUE);
1216 
1217   if (share->crashed)
1218   {
1219     rc= HA_ERR_CRASHED_ON_USAGE;
1220     goto end;
1221   }
1222 
1223   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1224 
1225   current_position= next_position;
1226 
1227   /* don't scan an empty file */
1228   if (!local_saved_data_file_length)
1229   {
1230     rc= HA_ERR_END_OF_FILE;
1231     goto end;
1232   }
1233 
1234   if ((rc= find_current_row(buf)))
1235     goto end;
1236 
1237   stats.records++;
1238   rc= 0;
1239 end:
1240   MYSQL_READ_ROW_DONE(rc);
1241   DBUG_RETURN(rc);
1242 }
1243 
1244 /*
1245   In the case of an order by rows will need to be sorted.
1246   ::position() is called after each call to ::rnd_next(),
1247   the data it stores is to a byte array. You can store this
1248   data via my_store_ptr(). ref_length is a variable defined to the
1249   class that is the sizeof() of position being stored. In our case
1250   its just a position. Look at the bdb code if you want to see a case
1251   where something other then a number is stored.
1252 */
position(const uchar * record)1253 void ha_tina::position(const uchar *record)
1254 {
1255   DBUG_ENTER("ha_tina::position");
1256   my_store_ptr(ref, ref_length, current_position);
1257   DBUG_VOID_RETURN;
1258 }
1259 
1260 
1261 /*
1262   Used to fetch a row from a posiion stored with ::position().
1263   my_get_ptr() retrieves the data for you.
1264 */
1265 
rnd_pos(uchar * buf,uchar * pos)1266 int ha_tina::rnd_pos(uchar * buf, uchar *pos)
1267 {
1268   int rc;
1269   DBUG_ENTER("ha_tina::rnd_pos");
1270   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
1271                        FALSE);
1272   ha_statistic_increment(&SSV::ha_read_rnd_count);
1273   current_position= my_get_ptr(pos,ref_length);
1274   rc= find_current_row(buf);
1275   MYSQL_READ_ROW_DONE(rc);
1276   DBUG_RETURN(rc);
1277 }
1278 
1279 /*
1280   ::info() is used to return information to the optimizer.
1281   Currently this table handler doesn't implement most of the fields
1282   really needed. SHOW also makes use of this data
1283 */
info(uint flag)1284 int ha_tina::info(uint flag)
1285 {
1286   DBUG_ENTER("ha_tina::info");
1287   /* This is a lie, but you don't want the optimizer to see zero or 1 */
1288   if (!records_is_known && stats.records < 2)
1289     stats.records= 2;
1290   DBUG_RETURN(0);
1291 }
1292 
1293 /*
1294   Grab bag of flags that are sent to the able handler every so often.
1295   HA_EXTRA_RESET and HA_EXTRA_RESET_STATE are the most frequently called.
1296   You are not required to implement any of these.
1297 */
extra(enum ha_extra_function operation)1298 int ha_tina::extra(enum ha_extra_function operation)
1299 {
1300   DBUG_ENTER("ha_tina::extra");
1301  if (operation == HA_EXTRA_MARK_AS_LOG_TABLE)
1302  {
1303    mysql_mutex_lock(&share->mutex);
1304    share->is_log_table= TRUE;
1305    mysql_mutex_unlock(&share->mutex);
1306  }
1307   DBUG_RETURN(0);
1308 }
1309 
1310 
1311 /*
1312   Set end_pos to the last valid byte of continuous area, closest
1313   to the given "hole", stored in the buffer. "Valid" here means,
1314   not listed in the chain of deleted records ("holes").
1315 */
get_write_pos(my_off_t * end_pos,tina_set * closest_hole)1316 bool ha_tina::get_write_pos(my_off_t *end_pos, tina_set *closest_hole)
1317 {
1318   if (closest_hole == chain_ptr) /* no more chains */
1319     *end_pos= file_buff->end();
1320   else
1321     *end_pos= min(file_buff->end(), closest_hole->begin);
1322   return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1323 }
1324 
1325 
1326 /*
1327   Called after each table scan. In particular after deletes,
1328   and updates. In the last case we employ chain of deleted
1329   slots to clean up all of the dead space we have collected while
1330   performing deletes/updates.
1331 */
rnd_end()1332 int ha_tina::rnd_end()
1333 {
1334   char updated_fname[FN_REFLEN];
1335   my_off_t file_buffer_start= 0;
1336   DBUG_ENTER("ha_tina::rnd_end");
1337 
1338   free_root(&blobroot, MYF(0));
1339   records_is_known= 1;
1340 
1341   if ((chain_ptr - chain)  > 0)
1342   {
1343     tina_set *ptr= chain;
1344 
1345     /*
1346       Re-read the beginning of a file (as the buffer should point to the
1347       end of file after the scan).
1348     */
1349     file_buff->init_buff(data_file);
1350 
1351     /*
1352       The sort is needed when there were updates/deletes with random orders.
1353       It sorts so that we move the firts blocks to the beginning.
1354     */
1355     my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
1356              (qsort_cmp)sort_set);
1357 
1358     my_off_t write_begin= 0, write_end;
1359 
1360     /* create the file to write updated table if it wasn't yet created */
1361     if (open_update_temp_file_if_needed())
1362       DBUG_RETURN(-1);
1363 
1364     /* write the file with updated info */
1365     while ((file_buffer_start != (my_off_t)-1))     // while not end of file
1366     {
1367       bool in_hole= get_write_pos(&write_end, ptr);
1368       my_off_t write_length= write_end - write_begin;
1369 
1370       /* if there is something to write, write it */
1371       if (write_length)
1372       {
1373         if (mysql_file_write(update_temp_file,
1374                              (uchar*) (file_buff->ptr() +
1375                                        (write_begin - file_buff->start())),
1376                              (size_t)write_length, MYF_RW))
1377           goto error;
1378         temp_file_length+= write_length;
1379       }
1380       if (in_hole)
1381       {
1382         /* skip hole */
1383         while (file_buff->end() <= ptr->end &&
1384                file_buffer_start != (my_off_t)-1)
1385           file_buffer_start= file_buff->read_next();
1386         write_begin= ptr->end;
1387         ptr++;
1388       }
1389       else
1390         write_begin= write_end;
1391 
1392       if (write_end == file_buff->end())
1393         file_buffer_start= file_buff->read_next(); /* shift the buffer */
1394 
1395     }
1396 
1397     if (mysql_file_sync(update_temp_file, MYF(MY_WME)) ||
1398         mysql_file_close(update_temp_file, MYF(0)))
1399       DBUG_RETURN(-1);
1400 
1401     share->update_file_opened= FALSE;
1402 
1403     if (share->tina_write_opened)
1404     {
1405       if (mysql_file_close(share->tina_write_filedes, MYF(0)))
1406         DBUG_RETURN(-1);
1407       /*
1408         Mark that the writer fd is closed, so that init_tina_writer()
1409         will reopen it later.
1410       */
1411       share->tina_write_opened= FALSE;
1412     }
1413 
1414     /*
1415       Close opened fildes's. Then move updated file in place
1416       of the old datafile.
1417     */
1418     if (mysql_file_close(data_file, MYF(0)) ||
1419         mysql_file_rename(csv_key_file_data,
1420                           fn_format(updated_fname, share->table_name,
1421                                     "", CSN_EXT,
1422                                     MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1423                           share->data_file_name, MYF(0)))
1424       DBUG_RETURN(-1);
1425 
1426     /* Open the file again */
1427     if ((data_file= mysql_file_open(csv_key_file_data,
1428                                     share->data_file_name,
1429                                     O_RDONLY, MYF(MY_WME))) == -1)
1430       DBUG_RETURN(my_errno ? my_errno : -1);
1431     /*
1432       As we reopened the data file, increase share->data_file_version
1433       in order to force other threads waiting on a table lock and
1434       have already opened the table to reopen the data file.
1435       That makes the latest changes become visible to them.
1436       Update local_data_file_version as no need to reopen it in the
1437       current thread.
1438     */
1439     share->data_file_version++;
1440     local_data_file_version= share->data_file_version;
1441     /*
1442       The datafile is consistent at this point and the write filedes is
1443       closed, so nothing worrying will happen to it in case of a crash.
1444       Here we record this fact to the meta-file.
1445     */
1446     (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
1447     /*
1448       Update local_saved_data_file_length with the real length of the
1449       data file.
1450     */
1451     local_saved_data_file_length= temp_file_length;
1452   }
1453 
1454   DBUG_RETURN(0);
1455 error:
1456   mysql_file_close(update_temp_file, MYF(0));
1457   share->update_file_opened= FALSE;
1458   DBUG_RETURN(-1);
1459 }
1460 
1461 
1462 /*
1463   Repair CSV table in the case, it is crashed.
1464 
1465   SYNOPSIS
1466     repair()
1467     thd         The thread, performing repair
1468     check_opt   The options for repair. We do not use it currently.
1469 
1470   DESCRIPTION
1471     If the file is empty, change # of rows in the file and complete recovery.
1472     Otherwise, scan the table looking for bad rows. If none were found,
1473     we mark file as a good one and return. If a bad row was encountered,
1474     we truncate the datafile up to the last good row.
1475 
1476    TODO: Make repair more clever - it should try to recover subsequent
1477          rows (after the first bad one) as well.
1478 */
1479 
repair(THD * thd,HA_CHECK_OPT * check_opt)1480 int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
1481 {
1482   char repaired_fname[FN_REFLEN];
1483   uchar *buf;
1484   File repair_file;
1485   int rc;
1486   ha_rows rows_repaired= 0;
1487   my_off_t write_begin= 0, write_end;
1488   DBUG_ENTER("ha_tina::repair");
1489 
1490   /* empty file */
1491   if (!share->saved_data_file_length)
1492   {
1493     share->rows_recorded= 0;
1494     goto end;
1495   }
1496 
1497   /* Don't assert in field::val() functions */
1498   table->use_all_columns();
1499   if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1500     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1501 
1502   /* position buffer to the start of the file */
1503   if (init_data_file())
1504     DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1505 
1506   /*
1507     Local_saved_data_file_length is initialized during the lock phase.
1508     Sometimes this is not getting executed before ::repair (e.g. for
1509     the log tables). We set it manually here.
1510   */
1511   local_saved_data_file_length= share->saved_data_file_length;
1512   /* set current position to the beginning of the file */
1513   current_position= next_position= 0;
1514 
1515   /* Read the file row-by-row. If everything is ok, repair is not needed. */
1516   while (!(rc= find_current_row(buf)))
1517   {
1518     thd_inc_row_count(thd);
1519     rows_repaired++;
1520     current_position= next_position;
1521   }
1522 
1523   free_root(&blobroot, MYF(0));
1524 
1525   my_free(buf);
1526 
1527   if (rc == HA_ERR_END_OF_FILE)
1528   {
1529     /*
1530       All rows were read ok until end of file, the file does not need repair.
1531       If rows_recorded != rows_repaired, we should update rows_recorded value
1532       to the current amount of rows.
1533     */
1534     share->rows_recorded= rows_repaired;
1535     goto end;
1536   }
1537 
1538   /*
1539     Otherwise we've encountered a bad row => repair is needed.
1540     Let us create a temporary file.
1541   */
1542   if ((repair_file= mysql_file_create(csv_key_file_update,
1543                                       fn_format(repaired_fname,
1544                                                 share->table_name,
1545                                                 "", CSN_EXT,
1546                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1547                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1548     DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1549 
1550   file_buff->init_buff(data_file);
1551 
1552 
1553   /* we just truncated the file up to the first bad row. update rows count. */
1554   share->rows_recorded= rows_repaired;
1555 
1556   /* write repaired file */
1557   while (1)
1558   {
1559     write_end= min(file_buff->end(), current_position);
1560     if ((write_end - write_begin) &&
1561         (mysql_file_write(repair_file, (uchar*)file_buff->ptr(),
1562                           (size_t) (write_end - write_begin), MYF_RW)))
1563       DBUG_RETURN(-1);
1564 
1565     write_begin= write_end;
1566     if (write_end== current_position)
1567       break;
1568     else
1569       file_buff->read_next(); /* shift the buffer */
1570   }
1571 
1572   /*
1573     Close the files and rename repaired file to the datafile.
1574     We have to close the files, as on Windows one cannot rename
1575     a file, which descriptor is still open. EACCES will be returned
1576     when trying to delete the "to"-file in mysql_file_rename().
1577   */
1578   if (share->tina_write_opened)
1579   {
1580     /*
1581       Data file might be opened twice, on table opening stage and
1582       during write_row execution. We need to close both instances
1583       to satisfy Win.
1584     */
1585     if (mysql_file_close(share->tina_write_filedes, MYF(0)))
1586       DBUG_RETURN(my_errno ? my_errno : -1);
1587     share->tina_write_opened= FALSE;
1588   }
1589   if (mysql_file_close(data_file, MYF(0)) ||
1590       mysql_file_close(repair_file, MYF(0)) ||
1591       mysql_file_rename(csv_key_file_data,
1592                         repaired_fname, share->data_file_name, MYF(0)))
1593     DBUG_RETURN(-1);
1594 
1595   /* Open the file again, it should now be repaired */
1596   if ((data_file= mysql_file_open(csv_key_file_data,
1597                                   share->data_file_name, O_RDWR|O_APPEND,
1598                                   MYF(MY_WME))) == -1)
1599      DBUG_RETURN(my_errno ? my_errno : -1);
1600 
1601   /* Set new file size. The file size will be updated by ::update_status() */
1602   local_saved_data_file_length= (size_t) current_position;
1603 
1604 end:
1605   share->crashed= FALSE;
1606   DBUG_RETURN(HA_ADMIN_OK);
1607 }
1608 
1609 /*
1610   DELETE without WHERE calls this
1611 */
1612 
delete_all_rows()1613 int ha_tina::delete_all_rows()
1614 {
1615   int rc;
1616   DBUG_ENTER("ha_tina::delete_all_rows");
1617 
1618   if (!records_is_known)
1619     DBUG_RETURN(my_errno=HA_ERR_WRONG_COMMAND);
1620 
1621   if (!share->tina_write_opened)
1622     if (init_tina_writer())
1623       DBUG_RETURN(-1);
1624 
1625   /* Truncate the file to zero size */
1626   rc= mysql_file_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
1627 
1628   stats.records=0;
1629   /* Update shared info */
1630   mysql_mutex_lock(&share->mutex);
1631   share->rows_recorded= 0;
1632   mysql_mutex_unlock(&share->mutex);
1633   local_saved_data_file_length= 0;
1634   DBUG_RETURN(rc);
1635 }
1636 
1637 /*
1638   Called by the database to lock the table. Keep in mind that this
1639   is an internal lock.
1640 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)1641 THR_LOCK_DATA **ha_tina::store_lock(THD *thd,
1642                                     THR_LOCK_DATA **to,
1643                                     enum thr_lock_type lock_type)
1644 {
1645   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1646     lock.type=lock_type;
1647   *to++= &lock;
1648   return to;
1649 }
1650 
1651 /*
1652   Create a table. You do not want to leave the table open after a call to
1653   this (the database will call ::open() if it needs to).
1654 */
1655 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)1656 int ha_tina::create(const char *name, TABLE *table_arg,
1657                     HA_CREATE_INFO *create_info)
1658 {
1659   char name_buff[FN_REFLEN];
1660   File create_file;
1661   DBUG_ENTER("ha_tina::create");
1662 
1663   /*
1664     check columns
1665   */
1666   for (Field **field= table_arg->s->field; *field; field++)
1667   {
1668     if ((*field)->real_maybe_null())
1669     {
1670       my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1671       DBUG_RETURN(HA_ERR_UNSUPPORTED);
1672     }
1673   }
1674 
1675 
1676   if ((create_file= mysql_file_create(csv_key_file_metadata,
1677                                       fn_format(name_buff, name, "", CSM_EXT,
1678                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1679                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1680     DBUG_RETURN(-1);
1681 
1682   write_meta_file(create_file, 0, FALSE);
1683   mysql_file_close(create_file, MYF(0));
1684 
1685   if ((create_file= mysql_file_create(csv_key_file_data,
1686                                       fn_format(name_buff, name, "", CSV_EXT,
1687                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1688                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1689     DBUG_RETURN(-1);
1690 
1691   mysql_file_close(create_file, MYF(0));
1692 
1693   DBUG_RETURN(0);
1694 }
1695 
check(THD * thd,HA_CHECK_OPT * check_opt)1696 int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
1697 {
1698   int rc= 0;
1699   uchar *buf;
1700   const char *old_proc_info;
1701   ha_rows count= share->rows_recorded;
1702   DBUG_ENTER("ha_tina::check");
1703 
1704   old_proc_info= thd_proc_info(thd, "Checking table");
1705   if (!(buf= (uchar*) my_malloc(table->s->reclength, MYF(MY_WME))))
1706     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1707 
1708   /* position buffer to the start of the file */
1709    if (init_data_file())
1710      DBUG_RETURN(HA_ERR_CRASHED);
1711 
1712   /*
1713     Local_saved_data_file_length is initialized during the lock phase.
1714     Check does not use store_lock in certain cases. So, we set it
1715     manually here.
1716   */
1717   local_saved_data_file_length= share->saved_data_file_length;
1718   /* set current position to the beginning of the file */
1719   current_position= next_position= 0;
1720 
1721   /* Read the file row-by-row. If everything is ok, repair is not needed. */
1722   while (!(rc= find_current_row(buf)))
1723   {
1724     thd_inc_row_count(thd);
1725     count--;
1726     current_position= next_position;
1727   }
1728 
1729   free_root(&blobroot, MYF(0));
1730 
1731   my_free(buf);
1732   thd_proc_info(thd, old_proc_info);
1733 
1734   if ((rc != HA_ERR_END_OF_FILE) || count)
1735   {
1736     share->crashed= TRUE;
1737     DBUG_RETURN(HA_ADMIN_CORRUPT);
1738   }
1739 
1740   DBUG_RETURN(HA_ADMIN_OK);
1741 }
1742 
1743 
check_if_incompatible_data(HA_CREATE_INFO * info,uint table_changes)1744 bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info,
1745 					   uint table_changes)
1746 {
1747   return COMPATIBLE_DATA_YES;
1748 }
1749 
1750 struct st_mysql_storage_engine csv_storage_engine=
1751 { MYSQL_HANDLERTON_INTERFACE_VERSION };
1752 
mysql_declare_plugin(csv)1753 mysql_declare_plugin(csv)
1754 {
1755   MYSQL_STORAGE_ENGINE_PLUGIN,
1756   &csv_storage_engine,
1757   "CSV",
1758   "Brian Aker, MySQL AB",
1759   "CSV storage engine",
1760   PLUGIN_LICENSE_GPL,
1761   tina_init_func, /* Plugin Init */
1762   tina_done_func, /* Plugin Deinit */
1763   0x0100 /* 1.0 */,
1764   NULL,                       /* status variables                */
1765   NULL,                       /* system variables                */
1766   NULL,                       /* config options                  */
1767   0,                          /* flags                           */
1768 }
1769 mysql_declare_plugin_end;
1770 
1771