1 /* Copyright (c) 2004, 2021, Oracle and/or its affiliates.
2 
3   This program is free software; you can redistribute it and/or modify
4   it under the terms of the GNU General Public License, version 2.0,
5   as published by the Free Software Foundation.
6 
7   This program is also distributed with certain software (including
8   but not limited to OpenSSL) that is licensed under separate terms,
9   as designated in a particular file or component or in included license
10   documentation.  The authors of MySQL hereby grant you an additional
11   permission to link the program and your derivative works with the
12   separately licensed software that they have included with MySQL.
13 
14   This program is distributed in the hope that it will be useful,
15   but WITHOUT ANY WARRANTY; without even the implied warranty of
16   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17   GNU General Public License, version 2.0, for more details.
18 
19   You should have received a copy of the GNU General Public License
20   along with this program; if not, write to the Free Software
21   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /*
24   Make sure to look at ha_tina.h for more details.
25 
26   First off, this is a play thing for me, there are a number of things
27   wrong with it:
28     *) It was designed for csv and therefore its performance is highly
29        questionable.
30     *) Indexes have not been implemented. This is because the files can
31        be traded in and out of the table directory without having to worry
32        about rebuilding anything.
33     *) NULLs and "" are treated equally (like a spreadsheet).
    *) There was in the beginning no point to anyone seeing this other
       than me, so there is a good chance that I haven't quite documented
       it well.
37     *) Less design, more "make it work"
38 
39   Now there are a few cool things with it:
40     *) Errors can result in corrupted data files.
41     *) Data files can be read by spreadsheets directly.
42 
43 TODO:
44  *) Move to a block system for larger files
 *) Error recovery, it's all there, just need to finish it
46  *) Document how the chains work.
47 
48  -Brian
49 */
50 
51 #include "my_global.h"
52 #include "sql_class.h"                          // SSV
53 #include <mysql/plugin.h>
54 #include <mysql/psi/mysql_file.h>
55 #include "ha_tina.h"
56 #include "probes_mysql.h"
57 
58 #include <algorithm>
59 
60 using std::min;
61 using std::max;
62 
/*
  Size of the on-disk meta header:
  uchar + uchar + ulonglong + ulonglong + ulonglong + ulonglong + uchar
  (magic, version, rows, checkpoint, autoincrement, forced_flushes, dirty)

  The expansion is parenthesized so the macro stays a single value in any
  surrounding expression (e.g. META_BUFFER_SIZE * 2 would otherwise misparse).
*/
#define META_BUFFER_SIZE (sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
  + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar))
#define TINA_CHECK_HEADER 254 // The number we use to determine corruption
#define BLOB_MEMROOT_ALLOC_SIZE 8192

/* The file extension */
#define CSV_EXT ".CSV"               // The data file
#define CSN_EXT ".CSN"               // Files used during repair and update
#define CSM_EXT ".CSM"               // Meta file
75 
76 
/* Share management helpers (definitions below) */
static TINA_SHARE *get_share(const char *table_name, TABLE *table);
static int free_share(TINA_SHARE *share);
static int read_meta_file(File meta_file, ha_rows *rows);
static int write_meta_file(File meta_file, ha_rows rows, bool dirty);

/* Concurrent-insert callbacks installed into share->lock (see ha_tina::open) */
extern "C" void tina_get_status(void* param, int concurrent_insert);
extern "C" void tina_update_status(void* param);
extern "C" my_bool tina_check_status(void* param);

/* Stuff for shares */
mysql_mutex_t tina_mutex;
static HASH tina_open_tables;
static handler *tina_create_handler(handlerton *hton,
                                    TABLE_SHARE *table,
                                    MEM_ROOT *mem_root);
92 
93 
94 /*****************************************************************************
95  ** TINA tables
96  *****************************************************************************/
97 
98 /*
99   Used for sorting chains with qsort().
100 */
sort_set(tina_set * a,tina_set * b)101 int sort_set (tina_set *a, tina_set *b)
102 {
103   /*
104     We assume that intervals do not intersect. So, it is enought to compare
105     any two points. Here we take start of intervals for comparison.
106   */
107   return ( a->begin > b->begin ? 1 : ( a->begin < b->begin ? -1 : 0 ) );
108 }
109 
tina_get_key(TINA_SHARE * share,size_t * length,my_bool not_used MY_ATTRIBUTE ((unused)))110 static uchar* tina_get_key(TINA_SHARE *share, size_t *length,
111                           my_bool not_used MY_ATTRIBUTE((unused)))
112 {
113   *length=share->table_name_length;
114   return (uchar*) share->table_name;
115 }
116 
/* Performance-schema instrumentation keys for this engine's allocations */
static PSI_memory_key csv_key_memory_tina_share;
static PSI_memory_key csv_key_memory_blobroot;
static PSI_memory_key csv_key_memory_tina_set;
static PSI_memory_key csv_key_memory_row;

#ifdef HAVE_PSI_INTERFACE

static PSI_mutex_key csv_key_mutex_tina, csv_key_mutex_TINA_SHARE_mutex;

/* Mutexes registered with the performance schema under category "csv" */
static PSI_mutex_info all_tina_mutexes[]=
{
  { &csv_key_mutex_tina, "tina", PSI_FLAG_GLOBAL},
  { &csv_key_mutex_TINA_SHARE_mutex, "TINA_SHARE::mutex", 0}
};

static PSI_file_key csv_key_file_metadata, csv_key_file_data,
  csv_key_file_update;

/* File I/O instrumentation: .CSM (metadata), .CSV (data), .CSN (update) */
static PSI_file_info all_tina_files[]=
{
  { &csv_key_file_metadata, "metadata", 0},
  { &csv_key_file_data, "data", 0},
  { &csv_key_file_update, "update", 0}
};

static PSI_memory_info all_tina_memory[]=
{
  { &csv_key_memory_tina_share, "TINA_SHARE", PSI_FLAG_GLOBAL},
  { &csv_key_memory_blobroot, "blobroot", 0},
  { &csv_key_memory_tina_set, "tina_set", 0},
  { &csv_key_memory_row, "row", 0},
  /* csv_key_memory_Transparent_file is declared elsewhere -- presumably
     alongside the Transparent_file implementation; TODO confirm */
  { &csv_key_memory_Transparent_file, "Transparent_file", 0}
};
150 
init_tina_psi_keys(void)151 static void init_tina_psi_keys(void)
152 {
153   const char* category= "csv";
154   int count;
155 
156   count= array_elements(all_tina_mutexes);
157   mysql_mutex_register(category, all_tina_mutexes, count);
158 
159   count= array_elements(all_tina_files);
160   mysql_file_register(category, all_tina_files, count);
161 
162   count= array_elements(all_tina_memory);
163   mysql_memory_register(category, all_tina_memory, count);
164 }
165 #endif /* HAVE_PSI_INTERFACE */
166 
tina_init_func(void * p)167 static int tina_init_func(void *p)
168 {
169   handlerton *tina_hton;
170 
171 #ifdef HAVE_PSI_INTERFACE
172   init_tina_psi_keys();
173 #endif
174 
175   tina_hton= (handlerton *)p;
176   mysql_mutex_init(csv_key_mutex_tina, &tina_mutex, MY_MUTEX_INIT_FAST);
177   (void) my_hash_init(&tina_open_tables,system_charset_info,32,0,0,
178                       (my_hash_get_key) tina_get_key,0,0,
179                       csv_key_memory_tina_share);
180   tina_hton->state= SHOW_OPTION_YES;
181   tina_hton->db_type= DB_TYPE_CSV_DB;
182   tina_hton->create= tina_create_handler;
183   tina_hton->flags= (HTON_CAN_RECREATE | HTON_SUPPORT_LOG_TABLES |
184                      HTON_NO_PARTITION);
185   return 0;
186 }
187 
tina_done_func(void * p)188 static int tina_done_func(void *p)
189 {
190   my_hash_free(&tina_open_tables);
191   mysql_mutex_destroy(&tina_mutex);
192 
193   return 0;
194 }
195 
196 
/*
  Simple lock controls.

  Look up (or create) the TINA_SHARE for table_name under tina_mutex and
  bump its reference count.  Returns NULL on allocation failure or when
  the data file cannot be stat'ed.  NOTE: the table argument is unused here.
*/
static TINA_SHARE *get_share(const char *table_name, TABLE *table)
{
  TINA_SHARE *share;
  char meta_file_name[FN_REFLEN];
  MY_STAT file_stat;                /* Stat information for the data file */
  char *tmp_name;
  uint length;

  /* tina_mutex serializes share lookup/creation and use_count updates */
  mysql_mutex_lock(&tina_mutex);
  length=(uint) strlen(table_name);

  /*
    If share is not present in the hash, create a new share and
    initialize its members.
  */
  if (!(share=(TINA_SHARE*) my_hash_search(&tina_open_tables,
                                           (uchar*) table_name,
                                           length)))
  {
    /* Share struct and its table-name copy come from one zero-filled block */
    if (!my_multi_malloc(csv_key_memory_tina_share,
                         MYF(MY_WME | MY_ZEROFILL),
                         &share, sizeof(*share),
                         &tmp_name, length+1,
                         NullS))
    {
      mysql_mutex_unlock(&tina_mutex);
      return NULL;
    }

    share->use_count= 0;
    share->is_log_table= FALSE;
    share->table_name_length= length;
    share->table_name= tmp_name;
    share->crashed= FALSE;
    share->rows_recorded= 0;
    share->update_file_opened= FALSE;
    share->tina_write_opened= FALSE;
    share->data_file_version= 0;
    my_stpcpy(share->table_name, table_name);
    /* Derive the <table>.CSV and <table>.CSM paths from the table name */
    fn_format(share->data_file_name, table_name, "", CSV_EXT,
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);
    fn_format(meta_file_name, table_name, "", CSM_EXT,
              MY_REPLACE_EXT|MY_UNPACK_FILENAME);

    /* The data file must already exist; its size seeds the length snapshot */
    if (mysql_file_stat(csv_key_file_data,
                        share->data_file_name, &file_stat, MYF(MY_WME)) == NULL)
      goto error;
    share->saved_data_file_length= file_stat.st_size;

    if (my_hash_insert(&tina_open_tables, (uchar*) share))
      goto error;
    thr_lock_init(&share->lock);
    mysql_mutex_init(csv_key_mutex_TINA_SHARE_mutex,
                     &share->mutex, MY_MUTEX_INIT_FAST);

    /*
      Open or create the meta file. In the latter case, we'll get
      an error during read_meta_file and mark the table as crashed.
      Usually this will result in auto-repair, and we will get a good
      meta-file in the end.
    */
    if (((share->meta_file= mysql_file_open(csv_key_file_metadata,
                                            meta_file_name,
                                            O_RDWR|O_CREAT,
                                            MYF(MY_WME))) == -1) ||
        read_meta_file(share->meta_file, &share->rows_recorded))
      share->crashed= TRUE;
  }

  share->use_count++;
  mysql_mutex_unlock(&tina_mutex);

  return share;

error:
  /* share and tmp_name were one allocation, so a single free releases both */
  mysql_mutex_unlock(&tina_mutex);
  my_free(share);

  return NULL;
}
280 
281 
282 /*
283   Read CSV meta-file
284 
285   SYNOPSIS
286     read_meta_file()
287     meta_file   The meta-file filedes
288     ha_rows     Pointer to the var we use to store rows count.
289                 These are read from the meta-file.
290 
291   DESCRIPTION
292 
    Read the meta-file info. For now we are only interested in
    rows count, crashed bit and magic number.
295 
296   RETURN
297     0 - OK
298     non-zero - error occurred
299 */
300 
/*
  Read the meta header from the start of the .CSM file into *rows and
  validate the magic number and dirty flag.
  Returns 0 on success, HA_ERR_CRASHED_ON_USAGE on short read, bad magic
  or a set dirty bit.
*/
static int read_meta_file(File meta_file, ha_rows *rows)
{
  uchar meta_buffer[META_BUFFER_SIZE];
  uchar *ptr= meta_buffer;

  DBUG_ENTER("ha_tina::read_meta_file");

  /* The header always lives at offset 0 */
  mysql_file_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
  if (mysql_file_read(meta_file, (uchar*)meta_buffer, META_BUFFER_SIZE, 0)
      != META_BUFFER_SIZE)
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /*
    Parse out the meta data, we ignore version at the moment
  */

  ptr+= sizeof(uchar)*2; // Move past header
  *rows= (ha_rows)uint8korr(ptr);
  ptr+= sizeof(ulonglong); // Move past rows
  /*
    Move past check_point, auto_increment and forced_flushes fields.
    They are present in the format, but we do not use them yet.
  */
  ptr+= 3*sizeof(ulonglong);

  /* check crashed bit and magic number */
  if ((meta_buffer[0] != (uchar)TINA_CHECK_HEADER) ||
      ((bool)(*ptr)== TRUE))
    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);

  /* NOTE(review): syncing after a pure read looks unnecessary -- confirm */
  mysql_file_sync(meta_file, MYF(MY_WME));

  DBUG_RETURN(0);
}
335 
336 
337 /*
338   Write CSV meta-file
339 
340   SYNOPSIS
341     write_meta_file()
342     meta_file   The meta-file filedes
343     ha_rows     The number of rows we have in the datafile.
344     dirty       A flag, which marks whether we have a corrupt table
345 
346   DESCRIPTION
347 
    Write meta-info to the file. Only rows count, crashed bit and
349     magic number matter now.
350 
351   RETURN
352     0 - OK
353     non-zero - error occurred
354 */
355 
write_meta_file(File meta_file,ha_rows rows,bool dirty)356 static int write_meta_file(File meta_file, ha_rows rows, bool dirty)
357 {
358   uchar meta_buffer[META_BUFFER_SIZE];
359   uchar *ptr= meta_buffer;
360 
361   DBUG_ENTER("ha_tina::write_meta_file");
362 
363   *ptr= (uchar)TINA_CHECK_HEADER;
364   ptr+= sizeof(uchar);
365   *ptr= (uchar)TINA_VERSION;
366   ptr+= sizeof(uchar);
367   int8store(ptr, (ulonglong)rows);
368   ptr+= sizeof(ulonglong);
369   memset(ptr, 0, 3*sizeof(ulonglong));
370   /*
371      Skip over checkpoint, autoincrement and forced_flushes fields.
372      We'll need them later.
373   */
374   ptr+= 3*sizeof(ulonglong);
375   *ptr= (uchar)dirty;
376 
377   mysql_file_seek(meta_file, 0, MY_SEEK_SET, MYF(0));
378   if (mysql_file_write(meta_file, (uchar *)meta_buffer, META_BUFFER_SIZE, 0)
379       != META_BUFFER_SIZE)
380     DBUG_RETURN(-1);
381 
382   mysql_file_sync(meta_file, MYF(MY_WME));
383 
384   DBUG_RETURN(0);
385 }
386 
check_and_repair(THD * thd)387 bool ha_tina::check_and_repair(THD *thd)
388 {
389   HA_CHECK_OPT check_opt;
390   DBUG_ENTER("ha_tina::check_and_repair");
391 
392   check_opt.init();
393 
394   DBUG_RETURN(repair(thd, &check_opt));
395 }
396 
397 
/*
  Open the data file for appending and remember the descriptor in the share.
  Returns 0 on success; on failure marks the share crashed and returns errno
  (or -1 when errno is unset).
*/
int ha_tina::init_tina_writer()
{
  DBUG_ENTER("ha_tina::init_tina_writer");

  /*
    Mark the file as crashed. We will set the flag back when we close
    the file. In the case of the crash it will remain marked crashed,
    which enforce recovery.
  */
  (void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);

  if ((share->tina_write_filedes=
        mysql_file_open(csv_key_file_data,
                        share->data_file_name, O_RDWR|O_APPEND,
                        MYF(MY_WME))) == -1)
  {
    DBUG_PRINT("info", ("Could not open tina file writes"));
    share->crashed= TRUE;
    DBUG_RETURN(my_errno() ? my_errno() : -1);
  }
  share->tina_write_opened= TRUE;  /* free_share() will close this fd */

  DBUG_RETURN(0);
}
422 
423 
/* Report whether the shared state has been flagged as crashed. */
bool ha_tina::is_crashed() const
{
  DBUG_ENTER("ha_tina::is_crashed");
  DBUG_RETURN(share->crashed);
}
429 
430 /*
431   Free lock controls.
432 */
/*
  Free lock controls.

  Drop one reference to the share under tina_mutex; the last user persists
  the meta file, closes descriptors and destroys the share.  Returns non-zero
  if any file close failed.
*/
static int free_share(TINA_SHARE *share)
{
  DBUG_ENTER("ha_tina::free_share");
  mysql_mutex_lock(&tina_mutex);
  int result_code= 0;
  if (!--share->use_count){
    /* Write the meta file. Mark it as crashed if needed. */
    (void)write_meta_file(share->meta_file, share->rows_recorded,
                          share->crashed ? TRUE :FALSE);
    if (mysql_file_close(share->meta_file, MYF(0)))
      result_code= 1;
    if (share->tina_write_opened)
    {
      if (mysql_file_close(share->tina_write_filedes, MYF(0)))
        result_code= 1;
      share->tina_write_opened= FALSE;
    }

    /* Remove from the hash before freeing so no one can find the share */
    my_hash_delete(&tina_open_tables, (uchar*) share);
    thr_lock_delete(&share->lock);
    mysql_mutex_destroy(&share->mutex);
    my_free(share);
  }
  mysql_mutex_unlock(&tina_mutex);

  DBUG_RETURN(result_code);
}
460 
461 
462 /*
463   This function finds the end of a line and returns the length
464   of the line ending.
465 
466   We support three kinds of line endings:
467   '\r'     --  Old Mac OS line ending
468   '\n'     --  Traditional Unix and Mac OS X line ending
469   '\r''\n' --  DOS\Windows line ending
470 */
471 
find_eoln_buff(Transparent_file * data_buff,my_off_t begin,my_off_t end,int * eoln_len)472 my_off_t find_eoln_buff(Transparent_file *data_buff, my_off_t begin,
473                      my_off_t end, int *eoln_len)
474 {
475   *eoln_len= 0;
476 
477   for (my_off_t x= begin; x < end; x++)
478   {
479     /* Unix (includes Mac OS X) */
480     if (data_buff->get_value(x) == '\n')
481       *eoln_len= 1;
482     else
483       if (data_buff->get_value(x) == '\r') // Mac or Dos
484       {
485         /* old Mac line ending */
486         if (x + 1 == end || (data_buff->get_value(x + 1) != '\n'))
487           *eoln_len= 1;
488         else // DOS style ending
489           *eoln_len= 2;
490       }
491 
492     if (*eoln_len)  // end of line was found
493       return x;
494   }
495 
496   return 0;
497 }
498 
499 
/* handlerton::create callback: allocate a ha_tina on the given mem_root. */
static handler *tina_create_handler(handlerton *hton,
                                    TABLE_SHARE *table,
                                    MEM_ROOT *mem_root)
{
  return new (mem_root) ha_tina(hton, table);
}
506 
507 
/*
  Handler constructor: zero the scan/chain state, point `buffer` at the
  pre-allocated byte_buffer, and set up the file buffer and BLOB mem-root.
*/
ha_tina::ha_tina(handlerton *hton, TABLE_SHARE *table_arg)
  :handler(hton, table_arg),
  /*
    These definitions are found in handler.h
    They are not probably completely right.
  */
  current_position(0), next_position(0), local_saved_data_file_length(0),
  file_buff(0), chain_alloced(0), chain_size(DEFAULT_CHAIN_LENGTH),
  local_data_file_version(0), records_is_known(0)
{
  /* Set our original buffers from pre-allocated memory */
  buffer.set((char*)byte_buffer, IO_SIZE, &my_charset_bin);
  chain= chain_buffer;  /* starts on the stack-embedded buffer; see chain_append() */
  file_buff= new Transparent_file();  // presumably freed on close -- TODO confirm
  init_alloc_root(csv_key_memory_blobroot,
                  &blobroot, BLOB_MEMROOT_ALLOC_SIZE, 0);
}
525 
526 
527 /*
528   Encode a buffer into the quoted format.
529 */
530 
encode_quote(uchar * buf)531 int ha_tina::encode_quote(uchar *buf)
532 {
533   char attribute_buffer[1024];
534   String attribute(attribute_buffer, sizeof(attribute_buffer),
535                    &my_charset_bin);
536 
537   my_bitmap_map *org_bitmap= dbug_tmp_use_all_columns(table, table->read_set);
538   buffer.length(0);
539 
540   for (Field **field=table->field ; *field ; field++)
541   {
542     const char *ptr;
543     const char *end_ptr;
544     const bool was_null= (*field)->is_null();
545 
546     /*
547       assistance for backwards compatibility in production builds.
548       note: this will not work for ENUM columns.
549     */
550     if (was_null)
551     {
552       (*field)->set_default();
553       (*field)->set_notnull();
554     }
555 
556     (*field)->val_str(&attribute,&attribute);
557 
558     if (was_null)
559       (*field)->set_null();
560 
561     if ((*field)->str_needs_quotes())
562     {
563       ptr= attribute.ptr();
564       end_ptr= attribute.length() + ptr;
565 
566       buffer.append('"');
567 
568       for (; ptr < end_ptr; ptr++)
569       {
570         if (*ptr == '"')
571         {
572           buffer.append('\\');
573           buffer.append('"');
574         }
575         else if (*ptr == '\r')
576         {
577           buffer.append('\\');
578           buffer.append('r');
579         }
580         else if (*ptr == '\\')
581         {
582           buffer.append('\\');
583           buffer.append('\\');
584         }
585         else if (*ptr == '\n')
586         {
587           buffer.append('\\');
588           buffer.append('n');
589         }
590         else
591           buffer.append(*ptr);
592       }
593       buffer.append('"');
594     }
595     else
596     {
597       buffer.append(attribute);
598     }
599 
600     buffer.append(',');
601   }
602   // Remove the comma, add a line feed
603   buffer.length(buffer.length() - 1);
604   buffer.append('\n');
605 
606   //buffer.replace(buffer.length(), 0, "\n", 1);
607 
608   dbug_tmp_restore_column_map(table->read_set, org_bitmap);
609   return (buffer.length());
610 }
611 
612 /*
613   chain_append() adds delete positions to the chain that we use to keep
614   track of space. Then the chain will be used to cleanup "holes", occurred
615   due to deletes and updates.
616 */
chain_append()617 int ha_tina::chain_append()
618 {
619   if ( chain_ptr != chain && (chain_ptr -1)->end == current_position)
620     (chain_ptr -1)->end= next_position;
621   else
622   {
623     /* We set up for the next position */
624     if ((size_t)(chain_ptr - chain) == (chain_size -1))
625     {
626       my_off_t location= chain_ptr - chain;
627       chain_size += DEFAULT_CHAIN_LENGTH;
628       if (chain_alloced)
629       {
630         /* Must cast since my_malloc unlike malloc doesn't have a void ptr */
631         if ((chain= (tina_set *) my_realloc(csv_key_memory_tina_set,
632                                             (uchar*)chain,
633                                             chain_size*sizeof(tina_set), MYF(MY_WME))) == NULL)
634           return -1;
635       }
636       else
637       {
638         tina_set *ptr= (tina_set *) my_malloc(csv_key_memory_tina_set,
639                                               chain_size * sizeof(tina_set),
640                                               MYF(MY_WME));
641         memcpy(ptr, chain, DEFAULT_CHAIN_LENGTH * sizeof(tina_set));
642         chain= ptr;
643         chain_alloced++;
644       }
645       chain_ptr= chain + location;
646     }
647     chain_ptr->begin= current_position;
648     chain_ptr->end= next_position;
649     chain_ptr++;
650   }
651 
652   return 0;
653 }
654 
655 
656 /*
657   Scans for a row.
658 */
/*
  Parse the CSV line starting at current_position into the row buffer,
  storing each parsed value into the corresponding Field.  On success
  sets next_position past the line's terminator and returns 0; returns
  HA_ERR_END_OF_FILE when no complete line remains before
  local_saved_data_file_length, or HA_ERR_CRASHED_ON_USAGE on a
  malformed line.
*/
int ha_tina::find_current_row(uchar *buf)
{
  my_off_t end_offset, curr_offset= current_position;
  int eoln_len;
  my_bitmap_map *org_bitmap;
  int error;
  bool read_all;
  DBUG_ENTER("ha_tina::find_current_row");

  /* Release BLOB copies made for the previously parsed row */
  free_root(&blobroot, MYF(0));

  /*
    We do not read further than local_saved_data_file_length in order
    not to conflict with undergoing concurrent insert.
  */
  if ((end_offset=
        find_eoln_buff(file_buff, current_position,
                       local_saved_data_file_length, &eoln_len)) == 0)
    DBUG_RETURN(HA_ERR_END_OF_FILE);

  /* We must read all columns in case a table is opened for update */
  read_all= !bitmap_is_clear_all(table->write_set);
  /* Avoid asserts in ::store() for columns that are not going to be updated */
  org_bitmap= dbug_tmp_use_all_columns(table, table->write_set);
  error= HA_ERR_CRASHED_ON_USAGE;  /* default outcome until the full row parses */

  memset(buf, 0, table->s->null_bytes);

  /*
    Parse the line obtained using the following algorithm

    BEGIN
      1) Store the EOL (end of line) for the current row
      2) Until all the fields in the current query have not been
         filled
         2.1) If the current character is a quote
              2.1.1) Until EOL has not been reached
                     a) If end of current field is reached, move
                        to next field and jump to step 2.3
                     b) If current character is a \\ handle
                        \\n, \\r, \\, \\"
                     c) else append the current character into the buffer
                        before checking that EOL has not been reached.
          2.2) If the current character does not begin with a quote
               2.2.1) Until EOL has not been reached
                      a) If the end of field has been reached move to the
                         next field and jump to step 2.3
                      b) If current character begins with \\ handle
                        \\n, \\r, \\, \\"
                      c) else append the current character into the buffer
                         before checking that EOL has not been reached.
          2.3) Store the current field value and jump to 2)
    TERMINATE
  */

  for (Field **field=table->field ; *field ; field++)
  {
    char curr_char;

    buffer.length(0);
    /* Running out of line before running out of fields => damaged row */
    if (curr_offset >= end_offset)
      goto err;
    curr_char= file_buff->get_value(curr_offset);
    /* Handle the case where the first character is a quote */
    if (curr_char == '"')
    {
      /* Increment past the first quote */
      curr_offset++;

      /* Loop through the row to extract the values for the current field */
      for ( ; curr_offset < end_offset; curr_offset++)
      {
        curr_char= file_buff->get_value(curr_offset);
        /* check for end of the current field */
        if (curr_char == '"' &&
            (curr_offset == end_offset - 1 ||
             file_buff->get_value(curr_offset + 1) == ','))
        {
          /* Move past the , and the " */
          curr_offset+= 2;
          break;
        }
        if (curr_char == '\\' && curr_offset != (end_offset - 1))
        {
          curr_offset++;
          curr_char= file_buff->get_value(curr_offset);
          if (curr_char == 'r')
            buffer.append('\r');
          else if (curr_char == 'n' )
            buffer.append('\n');
          else if (curr_char == '\\' || curr_char == '"')
            buffer.append(curr_char);
          else  /* This could only happed with an externally created file */
          {
            buffer.append('\\');
            buffer.append(curr_char);
          }
        }
        else // ordinary symbol
        {
          /*
            If we are at final symbol and no last quote was found =>
            we are working with a damaged file.
          */
          if (curr_offset == end_offset - 1)
            goto err;
          buffer.append(curr_char);
        }
      }
    }
    else
    {
      /* Unquoted field: read up to the next comma or end of line */
      for ( ; curr_offset < end_offset; curr_offset++)
      {
        curr_char= file_buff->get_value(curr_offset);
        /* Move past the ,*/
        if (curr_char == ',')
        {
          curr_offset++;
          break;
        }
        if (curr_char == '\\' && curr_offset != (end_offset - 1))
        {
          curr_offset++;
          curr_char= file_buff->get_value(curr_offset);
          if (curr_char == 'r')
            buffer.append('\r');
          else if (curr_char == 'n' )
            buffer.append('\n');
          else if (curr_char == '\\' || curr_char == '"')
            buffer.append(curr_char);
          else  /* This could only happed with an externally created file */
          {
            buffer.append('\\');
            buffer.append(curr_char);
          }
        }
        else
        {
          /*
             We are at the final symbol and a quote was found for the
             unquoted field => We are working with a damaged field.
          */
          if (curr_offset == end_offset - 1 && curr_char == '"')
            goto err;
          buffer.append(curr_char);
        }
      }
    }

    if (read_all || bitmap_is_set(table->read_set, (*field)->field_index))
    {
      bool is_enum= ((*field)->real_type() ==  MYSQL_TYPE_ENUM);
      /*
        Here CHECK_FIELD_WARN checks that all values in the csv file are valid
        which is normally the case, if they were written  by
        INSERT -> ha_tina::write_row. '0' values on ENUM fields are considered
        invalid by Field_enum::store() but it can store them on INSERT anyway.
        Thus, for enums we silence the warning, as it doesn't really mean
        an invalid value.
      */
      if ((*field)->store(buffer.ptr(), buffer.length(), buffer.charset(),
                          is_enum ? CHECK_FIELD_IGNORE : CHECK_FIELD_WARN))
      {
        if (!is_enum)
          goto err;
      }
      if ((*field)->flags & BLOB_FLAG)
      {
        /*
          BLOB fields only store a pointer; copy the data into blobroot so
          it stays valid after `buffer` is reused for the next field.
        */
        Field_blob *blob= *(Field_blob**) field;
        uchar *src, *tgt;
        uint length, packlength;

        packlength= blob->pack_length_no_ptr();
        length= blob->get_length(blob->ptr);
        memcpy(&src, blob->ptr + packlength, sizeof(char*));
        if (src)
        {
          tgt= (uchar*) alloc_root(&blobroot, length);
          memmove(tgt, src, length);
          memcpy(blob->ptr + packlength, &tgt, sizeof(char*));
        }
      }
    }
  }
  next_position= end_offset + eoln_len;
  error= 0;

err:
  dbug_tmp_restore_column_map(table->write_set, org_bitmap);

  DBUG_RETURN(error);
}
852 
853 /*
854   If frm_error() is called in table.cc this is called to find out what file
855   extensions exist for this handler.
856 */
static const char *ha_tina_exts[] = {
  CSV_EXT,   /* data file */
  CSM_EXT,   /* meta file */
  NullS      /* terminator expected by bas_ext() callers */
};
862 
/* Return the NullS-terminated list of file extensions this engine owns. */
const char **ha_tina::bas_ext() const
{
  return ha_tina_exts;
}
867 
868 /*
869   Three functions below are needed to enable concurrent insert functionality
870   for CSV engine. For more details see mysys/thr_lock.c
871 */
872 
tina_get_status(void * param,int concurrent_insert)873 void tina_get_status(void* param, int concurrent_insert)
874 {
875   ha_tina *tina= (ha_tina*) param;
876   tina->get_status();
877 }
878 
tina_update_status(void * param)879 void tina_update_status(void* param)
880 {
881   ha_tina *tina= (ha_tina*) param;
882   tina->update_status();
883 }
884 
885 /* this should exist and return 0 for concurrent insert to work */
my_bool tina_check_status(void* param)
{
  /* Always permit concurrent insert; param (the handler) is unused. */
  return 0;
}
890 
891 /*
892   Save the state of the table
893 
894   SYNOPSIS
895     get_status()
896 
897   DESCRIPTION
898     This function is used to retrieve the file length. During the lock
899     phase of concurrent insert. For more details see comment to
900     ha_tina::update_status below.
901 */
902 
get_status()903 void ha_tina::get_status()
904 {
905   if (share->is_log_table)
906   {
907     /*
908       We have to use mutex to follow pthreads memory visibility
909       rules for share->saved_data_file_length
910     */
911     mysql_mutex_lock(&share->mutex);
912     local_saved_data_file_length= share->saved_data_file_length;
913     mysql_mutex_unlock(&share->mutex);
914     return;
915   }
916   local_saved_data_file_length= share->saved_data_file_length;
917 }
918 
919 
920 /*
921   Correct the state of the table. Called by unlock routines
922   before the write lock is released.
923 
924   SYNOPSIS
925     update_status()
926 
927   DESCRIPTION
928     When we employ concurrent insert lock, we save current length of the file
929     during the lock phase. We do not read further saved value, as we don't
930     want to interfere with undergoing concurrent insert. Writers update file
931     length info during unlock with update_status().
932 
933   NOTE
934     For log tables concurrent insert works different. The reason is that
935     log tables are always opened and locked. And as they do not unlock
936     tables, the file length after writes should be updated in a different
937     way. For this purpose we need is_log_table flag. When this flag is set
938     we call update_status() explicitly after each row write.
939 */
940 
void ha_tina::update_status()
{
  /* correct local_saved_data_file_length for writers */
  share->saved_data_file_length= local_saved_data_file_length;
}
946 
947 
948 /*
949   Open a database file. Keep in mind that tables are caches, so
950   this will not be called for every request. Any sort of positions
951   that need to be reset should be kept in the ::extra() call.
952 */
open(const char * name,int mode,uint open_options)953 int ha_tina::open(const char *name, int mode, uint open_options)
954 {
955   DBUG_ENTER("ha_tina::open");
956 
957   if (!(share= get_share(name, table)))
958     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
959 
960   if (share->crashed && !(open_options & HA_OPEN_FOR_REPAIR))
961   {
962     free_share(share);
963     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
964   }
965 
966   local_data_file_version= share->data_file_version;
967   if ((data_file= mysql_file_open(csv_key_file_data,
968                                   share->data_file_name,
969                                   O_RDONLY, MYF(MY_WME))) == -1)
970   {
971     free_share(share);
972     DBUG_RETURN(my_errno() ? my_errno() : -1);
973   }
974 
975   /*
976     Init locking. Pass handler object to the locking routines,
977     so that they could save/update local_saved_data_file_length value
978     during locking. This is needed to enable concurrent inserts.
979   */
980   thr_lock_data_init(&share->lock, &lock, (void*) this);
981   ref_length= sizeof(my_off_t);
982 
983   share->lock.get_status= tina_get_status;
984   share->lock.update_status= tina_update_status;
985   share->lock.check_status= tina_check_status;
986 
987   DBUG_RETURN(0);
988 }
989 
990 
/*
  Close a database file. We remove ourselves from the shared structure.
  If it is empty we destroy it.
*/
close(void)995 int ha_tina::close(void)
996 {
997   int rc= 0;
998   DBUG_ENTER("ha_tina::close");
999   rc= mysql_file_close(data_file, MYF(0));
1000   DBUG_RETURN(free_share(share) || rc);
1001 }
1002 
1003 /*
1004   This is an INSERT. At the moment this handler just seeks to the end
1005   of the file and appends the data. In an error case it really should
1006   just truncate to the original position (this is not done yet).
1007 */
write_row(uchar * buf)1008 int ha_tina::write_row(uchar * buf)
1009 {
1010   int size;
1011   DBUG_ENTER("ha_tina::write_row");
1012 
1013   if (share->crashed)
1014       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1015 
1016   ha_statistic_increment(&SSV::ha_write_count);
1017 
1018   size= encode_quote(buf);
1019 
1020   if (!share->tina_write_opened)
1021     if (init_tina_writer())
1022       DBUG_RETURN(-1);
1023 
1024    /* use pwrite, as concurrent reader could have changed the position */
1025   if (mysql_file_write(share->tina_write_filedes, (uchar*)buffer.ptr(), size,
1026                        MYF(MY_WME | MY_NABP)))
1027     DBUG_RETURN(-1);
1028 
1029   /* update local copy of the max position to see our own changes */
1030   local_saved_data_file_length+= size;
1031 
1032   /* update shared info */
1033   mysql_mutex_lock(&share->mutex);
1034   share->rows_recorded++;
1035   /* update status for the log tables */
1036   if (share->is_log_table)
1037     update_status();
1038   mysql_mutex_unlock(&share->mutex);
1039 
1040   stats.records++;
1041   DBUG_RETURN(0);
1042 }
1043 
1044 
open_update_temp_file_if_needed()1045 int ha_tina::open_update_temp_file_if_needed()
1046 {
1047   char updated_fname[FN_REFLEN];
1048 
1049   if (!share->update_file_opened)
1050   {
1051     if ((update_temp_file=
1052            mysql_file_create(csv_key_file_update,
1053                              fn_format(updated_fname, share->table_name,
1054                                        "", CSN_EXT,
1055                                        MY_REPLACE_EXT | MY_UNPACK_FILENAME),
1056                              0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1057       return 1;
1058     share->update_file_opened= TRUE;
1059     temp_file_length= 0;
1060   }
1061   return 0;
1062 }
1063 
1064 /*
1065   This is called for an update.
1066   Make sure you put in code to increment the auto increment.
1067   Currently auto increment is not being
1068   fixed since autoincrements have yet to be added to this table handler.
1069   This will be called in a table scan right before the previous ::rnd_next()
1070   call.
1071 */
update_row(const uchar * old_data,uchar * new_data)1072 int ha_tina::update_row(const uchar * old_data, uchar * new_data)
1073 {
1074   int size;
1075   int rc= -1;
1076   DBUG_ENTER("ha_tina::update_row");
1077 
1078   ha_statistic_increment(&SSV::ha_update_count);
1079 
1080   size= encode_quote(new_data);
1081 
1082   /*
1083     During update we mark each updating record as deleted
1084     (see the chain_append()) then write new one to the temporary data file.
1085     At the end of the sequence in the rnd_end() we append all non-marked
1086     records from the data file to the temporary data file then rename it.
1087     The temp_file_length is used to calculate new data file length.
1088   */
1089   if (chain_append())
1090     goto err;
1091 
1092   if (open_update_temp_file_if_needed())
1093     goto err;
1094 
1095   if (mysql_file_write(update_temp_file, (uchar*)buffer.ptr(), size,
1096                        MYF(MY_WME | MY_NABP)))
1097     goto err;
1098   temp_file_length+= size;
1099   rc= 0;
1100 
1101   /* UPDATE should never happen on the log tables */
1102   assert(!share->is_log_table);
1103 
1104 err:
1105   DBUG_PRINT("info",("rc = %d", rc));
1106   DBUG_RETURN(rc);
1107 }
1108 
1109 
1110 /*
1111   Deletes a row. First the database will find the row, and then call this
1112   method. In the case of a table scan, the previous call to this will be
1113   the ::rnd_next() that found this row.
1114   The exception to this is an ORDER BY. This will cause the table handler
1115   to walk the table noting the positions of all rows that match a query.
1116   The table will then be deleted/positioned based on the ORDER (so RANDOM,
1117   DESC, ASC).
1118 */
delete_row(const uchar * buf)1119 int ha_tina::delete_row(const uchar * buf)
1120 {
1121   DBUG_ENTER("ha_tina::delete_row");
1122   ha_statistic_increment(&SSV::ha_delete_count);
1123 
1124   if (chain_append())
1125     DBUG_RETURN(-1);
1126 
1127   stats.records--;
1128   /* Update shared info */
1129   assert(share->rows_recorded);
1130   mysql_mutex_lock(&share->mutex);
1131   share->rows_recorded--;
1132   mysql_mutex_unlock(&share->mutex);
1133 
1134   /* DELETE should never happen on the log table */
1135   assert(!share->is_log_table);
1136 
1137   DBUG_RETURN(0);
1138 }
1139 
1140 
1141 /**
1142   @brief Initialize the data file.
1143 
1144   @details Compare the local version of the data file with the shared one.
1145   If they differ, there are some changes behind and we have to reopen
1146   the data file to make the changes visible.
1147   Call @c file_buff->init_buff() at the end to read the beginning of the
1148   data file into buffer.
1149 
1150   @retval  0  OK.
1151   @retval  1  There was an error.
1152 */
1153 
init_data_file()1154 int ha_tina::init_data_file()
1155 {
1156   if (local_data_file_version != share->data_file_version)
1157   {
1158     local_data_file_version= share->data_file_version;
1159     if (mysql_file_close(data_file, MYF(0)) ||
1160         (data_file= mysql_file_open(csv_key_file_data,
1161                                     share->data_file_name, O_RDONLY,
1162                                     MYF(MY_WME))) == -1)
1163       return my_errno() ? my_errno() : -1;
1164   }
1165   file_buff->init_buff(data_file);
1166   return 0;
1167 }
1168 
1169 
1170 /*
1171   All table scans call this first.
1172   The order of a table scan is:
1173 
1174   ha_tina::store_lock
1175   ha_tina::external_lock
1176   ha_tina::info
1177   ha_tina::rnd_init
1178   ha_tina::extra
  ENUM HA_EXTRA_CACHE   Cache record in HA_rrnd()
1180   ha_tina::rnd_next
1181   ha_tina::rnd_next
1182   ha_tina::rnd_next
1183   ha_tina::rnd_next
1184   ha_tina::rnd_next
1185   ha_tina::rnd_next
1186   ha_tina::rnd_next
1187   ha_tina::rnd_next
1188   ha_tina::rnd_next
1189   ha_tina::extra
  ENUM HA_EXTRA_NO_CACHE   End caching of records (def)
1191   ha_tina::external_lock
1192   ha_tina::extra
1193   ENUM HA_EXTRA_RESET   Reset database to after open
1194 
  Each call to ::rnd_next() represents a row returned in the scan. When no
  more rows can be returned, rnd_next() returns a value of HA_ERR_END_OF_FILE.
1197   The ::info() call is just for the optimizer.
1198 
1199 */
1200 
rnd_init(bool scan)1201 int ha_tina::rnd_init(bool scan)
1202 {
1203   DBUG_ENTER("ha_tina::rnd_init");
1204 
1205   /* set buffer to the beginning of the file */
1206   if (share->crashed || init_data_file())
1207     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
1208 
1209   current_position= next_position= 0;
1210   stats.records= 0;
1211   records_is_known= 0;
1212   chain_ptr= chain;
1213 
1214   DBUG_RETURN(0);
1215 }
1216 
1217 /*
1218   ::rnd_next() does all the heavy lifting for a table scan. You will need to
1219   populate *buf with the correct field data. You can walk the field to
1220   determine at what position you should store the data (take a look at how
1221   ::find_current_row() works). The structure is something like:
1222   0Foo  Dog  Friend
1223   The first offset is for the first attribute. All space before that is
1224   reserved for null count.
1225   Basically this works as a mask for which rows are nulled (compared to just
1226   empty).
1227   This table handler doesn't do nulls and does not know the difference between
1228   NULL and "". This is ok since this table handler is for spreadsheets and
1229   they don't know about them either :)
1230 */
rnd_next(uchar * buf)1231 int ha_tina::rnd_next(uchar *buf)
1232 {
1233   int rc;
1234   DBUG_ENTER("ha_tina::rnd_next");
1235   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
1236                        TRUE);
1237 
1238   if (share->crashed)
1239   {
1240     rc= HA_ERR_CRASHED_ON_USAGE;
1241     goto end;
1242   }
1243 
1244   ha_statistic_increment(&SSV::ha_read_rnd_next_count);
1245 
1246   current_position= next_position;
1247 
1248   /* don't scan an empty file */
1249   if (!local_saved_data_file_length)
1250   {
1251     rc= HA_ERR_END_OF_FILE;
1252     goto end;
1253   }
1254 
1255   if ((rc= find_current_row(buf)))
1256     goto end;
1257 
1258   stats.records++;
1259   rc= 0;
1260 end:
1261   MYSQL_READ_ROW_DONE(rc);
1262   DBUG_RETURN(rc);
1263 }
1264 
1265 /*
1266   In the case of an order by rows will need to be sorted.
1267   ::position() is called after each call to ::rnd_next(),
1268   the data it stores is to a byte array. You can store this
1269   data via my_store_ptr(). ref_length is a variable defined to the
1270   class that is the sizeof() of position being stored. In our case
1271   its just a position. Look at the bdb code if you want to see a case
1272   where something other then a number is stored.
1273 */
void ha_tina::position(const uchar *record)
{
  DBUG_ENTER("ha_tina::position");
  /*
    Save the byte offset of the current row into "ref" (ref_length ==
    sizeof(my_off_t), set in ::open) so rnd_pos() can later seek
    straight back to this row.
  */
  my_store_ptr(ref, ref_length, current_position);
  DBUG_VOID_RETURN;
}
1280 
1281 
/*
  Used to fetch a row from a position stored with ::position().
  my_get_ptr() retrieves the data for you.
*/
1286 
rnd_pos(uchar * buf,uchar * pos)1287 int ha_tina::rnd_pos(uchar * buf, uchar *pos)
1288 {
1289   int rc;
1290   DBUG_ENTER("ha_tina::rnd_pos");
1291   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
1292                        FALSE);
1293   ha_statistic_increment(&SSV::ha_read_rnd_count);
1294   current_position= my_get_ptr(pos,ref_length);
1295   rc= find_current_row(buf);
1296   MYSQL_READ_ROW_DONE(rc);
1297   DBUG_RETURN(rc);
1298 }
1299 
1300 /*
1301   ::info() is used to return information to the optimizer.
1302   Currently this table handler doesn't implement most of the fields
1303   really needed. SHOW also makes use of this data
1304 */
info(uint flag)1305 int ha_tina::info(uint flag)
1306 {
1307   DBUG_ENTER("ha_tina::info");
1308   /* This is a lie, but you don't want the optimizer to see zero or 1 */
1309   if (!records_is_known && stats.records < 2)
1310     stats.records= 2;
1311   DBUG_RETURN(0);
1312 }
1313 
/*
  Grab bag of flags that are sent to the table handler every so often.
  HA_EXTRA_RESET and HA_EXTRA_RESET_STATE are the most frequently called.
  You are not required to implement any of these.
*/
extra(enum ha_extra_function operation)1319 int ha_tina::extra(enum ha_extra_function operation)
1320 {
1321   DBUG_ENTER("ha_tina::extra");
1322  if (operation == HA_EXTRA_MARK_AS_LOG_TABLE)
1323  {
1324    mysql_mutex_lock(&share->mutex);
1325    share->is_log_table= TRUE;
1326    mysql_mutex_unlock(&share->mutex);
1327  }
1328   DBUG_RETURN(0);
1329 }
1330 
1331 
1332 /*
1333   Set end_pos to the last valid byte of continuous area, closest
1334   to the given "hole", stored in the buffer. "Valid" here means,
1335   not listed in the chain of deleted records ("holes").
1336 */
get_write_pos(my_off_t * end_pos,tina_set * closest_hole)1337 bool ha_tina::get_write_pos(my_off_t *end_pos, tina_set *closest_hole)
1338 {
1339   if (closest_hole == chain_ptr) /* no more chains */
1340     *end_pos= file_buff->end();
1341   else
1342     *end_pos= min(file_buff->end(), closest_hole->begin);
1343   return (closest_hole != chain_ptr) && (*end_pos == closest_hole->begin);
1344 }
1345 
1346 
1347 /*
1348   Called after each table scan. In particular after deletes,
1349   and updates. In the last case we employ chain of deleted
1350   slots to clean up all of the dead space we have collected while
1351   performing deletes/updates.
1352 */
int ha_tina::rnd_end()
{
  char updated_fname[FN_REFLEN];
  my_off_t file_buffer_start= 0;
  DBUG_ENTER("ha_tina::rnd_end");

  free_root(&blobroot, MYF(0));
  records_is_known= 1;

  /* Only rewrite the data file if deletes/updates chained any holes. */
  if ((chain_ptr - chain)  > 0)
  {
    tina_set *ptr= chain;

    /*
      Re-read the beginning of a file (as the buffer should point to the
      end of file after the scan).
    */
    file_buff->init_buff(data_file);

    /*
      The sort is needed when there were updates/deletes with random orders.
      It sorts so that we move the first blocks to the beginning.
    */
    my_qsort(chain, (size_t)(chain_ptr - chain), sizeof(tina_set),
             (qsort_cmp)sort_set);

    my_off_t write_begin= 0, write_end;

    /* create the file to write updated table if it wasn't yet created */
    if (open_update_temp_file_if_needed())
      DBUG_RETURN(-1);

    /*
      Copy the data file to the temp file, skipping every hole in the
      sorted chain. read_next() returns (my_off_t)-1 at end of file.
    */
    while ((file_buffer_start != (my_off_t)-1))     // while not end of file
    {
      bool in_hole= get_write_pos(&write_end, ptr);
      my_off_t write_length= write_end - write_begin;

      /* if there is something to write, write it */
      if (write_length)
      {
        if (mysql_file_write(update_temp_file,
                             (uchar*) (file_buff->ptr() +
                                       (write_begin - file_buff->start())),
                             (size_t)write_length, MYF_RW))
          goto error;
        temp_file_length+= write_length;
      }
      if (in_hole)
      {
        /* skip hole: advance the buffer until it covers the hole's end */
        while (file_buff->end() <= ptr->end &&
               file_buffer_start != (my_off_t)-1)
          file_buffer_start= file_buff->read_next();
        write_begin= ptr->end;
        ptr++;
      }
      else
        write_begin= write_end;

      if (write_end == file_buff->end())
        file_buffer_start= file_buff->read_next(); /* shift the buffer */

    }

    if (mysql_file_sync(update_temp_file, MYF(MY_WME)) ||
        mysql_file_close(update_temp_file, MYF(0)))
      DBUG_RETURN(-1);

    share->update_file_opened= FALSE;

    if (share->tina_write_opened)
    {
      if (mysql_file_close(share->tina_write_filedes, MYF(0)))
        DBUG_RETURN(-1);
      /*
        Mark that the writer fd is closed, so that init_tina_writer()
        will reopen it later.
      */
      share->tina_write_opened= FALSE;
    }

    /*
      Close opened fildes's. Then move updated file in place
      of the old datafile.
    */
    if (mysql_file_close(data_file, MYF(0)) ||
        mysql_file_rename(csv_key_file_data,
                          fn_format(updated_fname, share->table_name,
                                    "", CSN_EXT,
                                    MY_REPLACE_EXT | MY_UNPACK_FILENAME),
                          share->data_file_name, MYF(0)))
      DBUG_RETURN(-1);

    /* Open the file again */
    if ((data_file= mysql_file_open(csv_key_file_data,
                                    share->data_file_name,
                                    O_RDONLY, MYF(MY_WME))) == -1)
      DBUG_RETURN(my_errno() ? my_errno() : -1);
    /*
      As we reopened the data file, increase share->data_file_version
      in order to force other threads waiting on a table lock and
      have already opened the table to reopen the data file.
      That makes the latest changes become visible to them.
      Update local_data_file_version as no need to reopen it in the
      current thread.
    */
    share->data_file_version++;
    local_data_file_version= share->data_file_version;
    /*
      The datafile is consistent at this point and the write filedes is
      closed, so nothing worrying will happen to it in case of a crash.
      Here we record this fact to the meta-file.
    */
    (void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
    /*
      Update local_saved_data_file_length with the real length of the
      data file.
    */
    local_saved_data_file_length= temp_file_length;
  }

  DBUG_RETURN(0);
error:
  /* Write failed: drop the temp file fd and let the caller see -1. */
  mysql_file_close(update_temp_file, MYF(0));
  share->update_file_opened= FALSE;
  DBUG_RETURN(-1);
}
1481 
1482 
1483 /*
1484   Repair CSV table in the case, it is crashed.
1485 
1486   SYNOPSIS
1487     repair()
1488     thd         The thread, performing repair
1489     check_opt   The options for repair. We do not use it currently.
1490 
1491   DESCRIPTION
1492     If the file is empty, change # of rows in the file and complete recovery.
1493     Otherwise, scan the table looking for bad rows. If none were found,
1494     we mark file as a good one and return. If a bad row was encountered,
1495     we truncate the datafile up to the last good row.
1496 
1497    TODO: Make repair more clever - it should try to recover subsequent
1498          rows (after the first bad one) as well.
1499 */
1500 
repair(THD * thd,HA_CHECK_OPT * check_opt)1501 int ha_tina::repair(THD* thd, HA_CHECK_OPT* check_opt)
1502 {
1503   char repaired_fname[FN_REFLEN];
1504   uchar *buf;
1505   File repair_file;
1506   int rc;
1507   ha_rows rows_repaired= 0;
1508   my_off_t write_begin= 0, write_end;
1509   DBUG_ENTER("ha_tina::repair");
1510 
1511   /* empty file */
1512   if (!share->saved_data_file_length)
1513   {
1514     share->rows_recorded= 0;
1515     goto end;
1516   }
1517 
1518   /* Don't assert in field::val() functions */
1519   table->use_all_columns();
1520   if (!(buf= (uchar*) my_malloc(csv_key_memory_row,
1521                                 table->s->reclength, MYF(MY_WME))))
1522     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1523 
1524   /* position buffer to the start of the file */
1525   if (init_data_file())
1526     DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1527 
1528   /*
1529     Local_saved_data_file_length is initialized during the lock phase.
1530     Sometimes this is not getting executed before ::repair (e.g. for
1531     the log tables). We set it manually here.
1532   */
1533   local_saved_data_file_length= share->saved_data_file_length;
1534   /* set current position to the beginning of the file */
1535   current_position= next_position= 0;
1536 
1537   /* Read the file row-by-row. If everything is ok, repair is not needed. */
1538   while (!(rc= find_current_row(buf)))
1539   {
1540     thd_inc_row_count(thd);
1541     rows_repaired++;
1542     current_position= next_position;
1543   }
1544 
1545   free_root(&blobroot, MYF(0));
1546 
1547   my_free(buf);
1548 
1549   if (rc == HA_ERR_END_OF_FILE)
1550   {
1551     /*
1552       All rows were read ok until end of file, the file does not need repair.
1553       If rows_recorded != rows_repaired, we should update rows_recorded value
1554       to the current amount of rows.
1555     */
1556     share->rows_recorded= rows_repaired;
1557     goto end;
1558   }
1559 
1560   /*
1561     Otherwise we've encountered a bad row => repair is needed.
1562     Let us create a temporary file.
1563   */
1564   if ((repair_file= mysql_file_create(csv_key_file_update,
1565                                       fn_format(repaired_fname,
1566                                                 share->table_name,
1567                                                 "", CSN_EXT,
1568                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1569                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1570     DBUG_RETURN(HA_ERR_CRASHED_ON_REPAIR);
1571 
1572   file_buff->init_buff(data_file);
1573 
1574 
1575   /* we just truncated the file up to the first bad row. update rows count. */
1576   share->rows_recorded= rows_repaired;
1577 
1578   /* write repaired file */
1579   while (1)
1580   {
1581     write_end= min(file_buff->end(), current_position);
1582     if ((write_end - write_begin) &&
1583         (mysql_file_write(repair_file, (uchar*)file_buff->ptr(),
1584                           (size_t) (write_end - write_begin), MYF_RW)))
1585       DBUG_RETURN(-1);
1586 
1587     write_begin= write_end;
1588     if (write_end== current_position)
1589       break;
1590     else
1591       file_buff->read_next(); /* shift the buffer */
1592   }
1593 
1594   /*
1595     Close the files and rename repaired file to the datafile.
1596     We have to close the files, as on Windows one cannot rename
1597     a file, which descriptor is still open. EACCES will be returned
1598     when trying to delete the "to"-file in mysql_file_rename().
1599   */
1600   if (share->tina_write_opened)
1601   {
1602     /*
1603       Data file might be opened twice, on table opening stage and
1604       during write_row execution. We need to close both instances
1605       to satisfy Win.
1606     */
1607     if (mysql_file_close(share->tina_write_filedes, MYF(0)))
1608       DBUG_RETURN(my_errno() ? my_errno() : -1);
1609     share->tina_write_opened= FALSE;
1610   }
1611   if (mysql_file_close(data_file, MYF(0)) ||
1612       mysql_file_close(repair_file, MYF(0)) ||
1613       mysql_file_rename(csv_key_file_data,
1614                         repaired_fname, share->data_file_name, MYF(0)))
1615     DBUG_RETURN(-1);
1616 
1617   /* Open the file again, it should now be repaired */
1618   if ((data_file= mysql_file_open(csv_key_file_data,
1619                                   share->data_file_name, O_RDWR|O_APPEND,
1620                                   MYF(MY_WME))) == -1)
1621     DBUG_RETURN(my_errno() ? my_errno() : -1);
1622 
1623   /* Set new file size. The file size will be updated by ::update_status() */
1624   local_saved_data_file_length= (size_t) current_position;
1625 
1626 end:
1627   share->crashed= FALSE;
1628   DBUG_RETURN(HA_ADMIN_OK);
1629 }
1630 
1631 /*
1632   DELETE without WHERE calls this
1633 */
1634 
delete_all_rows()1635 int ha_tina::delete_all_rows()
1636 {
1637   int rc;
1638   DBUG_ENTER("ha_tina::delete_all_rows");
1639 
1640   if (!records_is_known)
1641   {
1642     set_my_errno(HA_ERR_WRONG_COMMAND);
1643     DBUG_RETURN(HA_ERR_WRONG_COMMAND);
1644   }
1645 
1646   if (!share->tina_write_opened)
1647     if (init_tina_writer())
1648       DBUG_RETURN(-1);
1649 
1650   /* Truncate the file to zero size */
1651   rc= mysql_file_chsize(share->tina_write_filedes, 0, 0, MYF(MY_WME));
1652 
1653   stats.records=0;
1654   /* Update shared info */
1655   mysql_mutex_lock(&share->mutex);
1656   share->rows_recorded= 0;
1657   mysql_mutex_unlock(&share->mutex);
1658   local_saved_data_file_length= 0;
1659   DBUG_RETURN(rc);
1660 }
1661 
1662 /*
1663   Called by the database to lock the table. Keep in mind that this
1664   is an internal lock.
1665 */
store_lock(THD * thd,THR_LOCK_DATA ** to,enum thr_lock_type lock_type)1666 THR_LOCK_DATA **ha_tina::store_lock(THD *thd,
1667                                     THR_LOCK_DATA **to,
1668                                     enum thr_lock_type lock_type)
1669 {
1670   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
1671     lock.type=lock_type;
1672   *to++= &lock;
1673   return to;
1674 }
1675 
1676 /*
1677   Create a table. You do not want to leave the table open after a call to
1678   this (the database will call ::open() if it needs to).
1679 */
1680 
create(const char * name,TABLE * table_arg,HA_CREATE_INFO * create_info)1681 int ha_tina::create(const char *name, TABLE *table_arg,
1682                     HA_CREATE_INFO *create_info)
1683 {
1684   char name_buff[FN_REFLEN];
1685   File create_file;
1686   DBUG_ENTER("ha_tina::create");
1687 
1688   /*
1689     check columns
1690   */
1691   for (Field **field= table_arg->s->field; *field; field++)
1692   {
1693     if ((*field)->real_maybe_null())
1694     {
1695       my_error(ER_CHECK_NOT_IMPLEMENTED, MYF(0), "nullable columns");
1696       DBUG_RETURN(HA_ERR_UNSUPPORTED);
1697     }
1698   }
1699 
1700 
1701   if ((create_file= mysql_file_create(csv_key_file_metadata,
1702                                       fn_format(name_buff, name, "", CSM_EXT,
1703                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1704                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1705     DBUG_RETURN(-1);
1706 
1707   write_meta_file(create_file, 0, FALSE);
1708   mysql_file_close(create_file, MYF(0));
1709 
1710   if ((create_file= mysql_file_create(csv_key_file_data,
1711                                       fn_format(name_buff, name, "", CSV_EXT,
1712                                                 MY_REPLACE_EXT|MY_UNPACK_FILENAME),
1713                                       0, O_RDWR | O_TRUNC, MYF(MY_WME))) < 0)
1714     DBUG_RETURN(-1);
1715 
1716   mysql_file_close(create_file, MYF(0));
1717 
1718   DBUG_RETURN(0);
1719 }
1720 
check(THD * thd,HA_CHECK_OPT * check_opt)1721 int ha_tina::check(THD* thd, HA_CHECK_OPT* check_opt)
1722 {
1723   int rc= 0;
1724   uchar *buf;
1725   const char *old_proc_info;
1726   ha_rows count= share->rows_recorded;
1727   DBUG_ENTER("ha_tina::check");
1728 
1729   old_proc_info= thd_proc_info(thd, "Checking table");
1730   if (!(buf= (uchar*) my_malloc(csv_key_memory_row,
1731                                 table->s->reclength, MYF(MY_WME))))
1732     DBUG_RETURN(HA_ERR_OUT_OF_MEM);
1733 
1734   /* position buffer to the start of the file */
1735    if (init_data_file())
1736      DBUG_RETURN(HA_ERR_CRASHED);
1737 
1738   /*
1739     Local_saved_data_file_length is initialized during the lock phase.
1740     Check does not use store_lock in certain cases. So, we set it
1741     manually here.
1742   */
1743   local_saved_data_file_length= share->saved_data_file_length;
1744   /* set current position to the beginning of the file */
1745   current_position= next_position= 0;
1746 
1747   /* Read the file row-by-row. If everything is ok, repair is not needed. */
1748   while (!(rc= find_current_row(buf)))
1749   {
1750     thd_inc_row_count(thd);
1751     count--;
1752     current_position= next_position;
1753   }
1754 
1755   free_root(&blobroot, MYF(0));
1756 
1757   my_free(buf);
1758   thd_proc_info(thd, old_proc_info);
1759 
1760   if ((rc != HA_ERR_END_OF_FILE) || count)
1761   {
1762     share->crashed= TRUE;
1763     DBUG_RETURN(HA_ADMIN_CORRUPT);
1764   }
1765 
1766   DBUG_RETURN(HA_ADMIN_OK);
1767 }
1768 
1769 
check_if_incompatible_data(HA_CREATE_INFO * info,uint table_changes)1770 bool ha_tina::check_if_incompatible_data(HA_CREATE_INFO *info,
1771 					   uint table_changes)
1772 {
1773   return COMPATIBLE_DATA_YES;
1774 }
1775 
/* Storage-engine descriptor handed to the server via the plugin below. */
struct st_mysql_storage_engine csv_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
1778 
/* Plugin registration for the CSV storage engine. */
mysql_declare_plugin(csv)
{
  MYSQL_STORAGE_ENGINE_PLUGIN,
  &csv_storage_engine,        /* descriptor (above)              */
  "CSV",                      /* engine name shown to users      */
  "Brian Aker, MySQL AB",     /* author                          */
  "CSV storage engine",       /* description                     */
  PLUGIN_LICENSE_GPL,
  tina_init_func, /* Plugin Init */
  tina_done_func, /* Plugin Deinit */
  0x0100 /* 1.0 */,
  NULL,                       /* status variables                */
  NULL,                       /* system variables                */
  NULL,                       /* config options                  */
  0,                          /* flags                           */
}
mysql_declare_plugin_end;
1796 
1797