/*
   Copyright (c) 2000, 2011, Oracle and/or its affiliates
   Copyright (c) 2010, 2021, MariaDB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335  USA */

/*
  Caching of files that are only read or written sequentially, in fixed-
  length records. A read isn't allowed to go over file-length. A read is ok
  if it ends at file-length and the next read can try to read after
  file-length (and get an EOF error).
  Possible use of asynchronous io.
  Macros for reads and writes for faster io.
  Used instead of FILE when reading or writing whole files.
  One can change info->pos_in_file to a higher value to skip bytes in the
  file if info->read_pos is also set to info->read_end.
  If called through open_cached_file(), then the temporary file will
  only be created if a write exceeds the file buffer or if one calls
  my_b_flush_io_cache().

  If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
  reading and another for writing.  Reads are first done from disk and
  then done from the write buffer.  This is an efficient way to read
  from a log file when one is writing to it at the same time.
  For this to work, the file has to be opened in append mode!
  Note that when one uses SEQ_READ_APPEND, one MUST write using
  my_b_append !  This is needed because we need to lock the mutex
  every time we access the write buffer.

TODO:
  When one uses SEQ_READ_APPEND and we are reading and writing at the same
  time, each time the write buffer gets full and is written to disk, we
  will always do a disk read to read a part of the buffer from disk to the
  read buffer.
  This should be fixed so that when we do a my_b_flush_io_cache() and
  we have been reading the write buffer, we transfer the rest of the
  write buffer to the read buffer before we start to reuse it.
*/

#include "mysys_priv.h"
#include <m_string.h>
#include <errno.h>
#include "mysql/psi/mysql_file.h"

56 PSI_file_key key_file_io_cache;
57 
58 #define lock_append_buffer(info) \
59   mysql_mutex_lock(&(info)->append_buffer_lock)
60 #define unlock_append_buffer(info) \
61   mysql_mutex_unlock(&(info)->append_buffer_lock)
62 
63 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
64 #define IO_ROUND_DN(X) ( (X)            & ~(IO_SIZE-1))
65 
66 static int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count);
67 static int _my_b_cache_read_r(IO_CACHE *info, uchar *Buffer, size_t Count);
68 static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count);
69 static int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count);
70 static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count);
71 
72 int (*_my_b_encr_read)(IO_CACHE *info,uchar *Buffer,size_t Count)= 0;
73 int (*_my_b_encr_write)(IO_CACHE *info,const uchar *Buffer,size_t Count)= 0;
74 
75 
76 
77 static void
init_functions(IO_CACHE * info)78 init_functions(IO_CACHE* info)
79 {
80   enum cache_type type= info->type;
81   info->read_function = 0;                      /* Force a core if used */
82   info->write_function = 0;                     /* Force a core if used */
83   switch (type) {
84   case READ_NET:
85     /*
86       Must be initialized by the caller. The problem is that
87       _my_b_net_read has to be defined in sql directory because of
88       the dependency on THD, and therefore cannot be visible to
89       programs that link against mysys but know nothing about THD, such
90       as myisamchk
91     */
92     DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
93     break;
94   case SEQ_READ_APPEND:
95     info->read_function = _my_b_seq_read;
96     DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
97     break;
98   case READ_CACHE:
99     if (info->myflags & MY_ENCRYPT)
100     {
101       DBUG_ASSERT(info->share == 0);
102       info->read_function = _my_b_encr_read;
103       break;
104     }
105     /* fall through */
106   case WRITE_CACHE:
107     if (info->myflags & MY_ENCRYPT)
108     {
109       info->write_function = _my_b_encr_write;
110       break;
111     }
112     /* fall through */
113   case READ_FIFO:
114     DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
115     info->read_function = info->share ? _my_b_cache_read_r : _my_b_cache_read;
116     info->write_function = info->share ? _my_b_cache_write_r : _my_b_cache_write;
117     break;
118   case TYPE_NOT_SET:
119     DBUG_ASSERT(0);
120     break;
121   }
122   if (type == READ_CACHE || type == WRITE_CACHE || type == SEQ_READ_APPEND)
123     info->myflags|= MY_FULL_IO;
124   else
125     info->myflags&= ~MY_FULL_IO;
126 }
127 
128 
129 /*
130   Initialize an IO_CACHE object
131 
132   SYNOPSOS
133     init_io_cache_ext()
134     info		cache handler to initialize
135     file		File that should be associated to to the handler
136 			If == -1 then real_open_cached_file()
137 			will be called when it's time to open file.
138     cachesize		Size of buffer to allocate for read/write
139 			If == 0 then use my_default_record_cache_size
140     type		Type of cache
141     seek_offset		Where cache should start reading/writing
142     use_async_io	Set to 1 of we should use async_io (if available)
143     cache_myflags	Bitmap of different flags
144 			MY_WME | MY_FAE | MY_NABP | MY_FNABP |
145 			MY_DONT_CHECK_FILESIZE
146     file_key           Instrumented file key for temporary cache file
147 
148   RETURN
149     0  ok
150     #  error
151 */
152 
init_io_cache_ext(IO_CACHE * info,File file,size_t cachesize,enum cache_type type,my_off_t seek_offset,pbool use_async_io,myf cache_myflags,PSI_file_key file_key)153 int init_io_cache_ext(IO_CACHE *info, File file, size_t cachesize,
154                       enum cache_type type, my_off_t seek_offset,
155                       pbool use_async_io, myf cache_myflags,
156                       PSI_file_key file_key)
157 {
158   size_t min_cache;
159   my_off_t pos;
160   my_off_t end_of_file= ~(my_off_t) 0;
161   DBUG_ENTER("init_io_cache_ext");
162   DBUG_PRINT("enter",("cache:%p  type: %d  pos: %llu",
163 		      info, (int) type, (ulonglong) seek_offset));
164 
165   info->file= file;
166   info->type= TYPE_NOT_SET;	    /* Don't set it until mutex are created */
167   info->pos_in_file= seek_offset;
168   info->alloced_buffer = 0;
169   info->buffer=0;
170   info->seek_not_done= 0;
171   info->next_file_user= NULL;
172 
173   if (file >= 0)
174   {
175     DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
176     pos= mysql_file_tell(file, MYF(0));
177     if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
178     {
179       /*
180         This kind of object doesn't support seek() or tell(). Don't set a
181         seek_not_done that will make us again try to seek() later and fail.
182 
183         Additionally, if we're supposed to start somewhere other than the
184         the beginning of whatever this file is, then somebody made a bad
185         assumption.
186       */
187       DBUG_ASSERT(seek_offset == 0);
188     }
189     else
190       info->seek_not_done= MY_TEST(seek_offset != pos);
191   }
192   else
193     if (type == WRITE_CACHE && _my_b_encr_read)
194     {
195       cache_myflags|= MY_ENCRYPT;
196       DBUG_ASSERT(seek_offset == 0);
197     }
198 
199   info->disk_writes= 0;
200   info->share=0;
201 
202   if (!cachesize && !(cachesize= my_default_record_cache_size))
203     DBUG_RETURN(1);				/* No cache requested */
204   min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
205   if (type == READ_CACHE || type == SEQ_READ_APPEND)
206   {						/* Assume file isn't growing */
207     DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
208     if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
209     {
210       /* Calculate end of file to avoid allocating oversized buffers */
211       end_of_file= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
212       /* Need to reset seek_not_done now that we just did a seek. */
213       info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
214       if (end_of_file < seek_offset)
215 	end_of_file=seek_offset;
216       /* Trim cache size if the file is very small */
217       if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
218       {
219 	cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
220 	use_async_io=0;				/* No need to use async */
221       }
222     }
223   }
224   cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
225   if (type != READ_NET)
226   {
227     /* Retry allocating memory in smaller blocks until we get one */
228     cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
229     for (;;)
230     {
231       size_t buffer_block;
232       /*
233         Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
234         MY_ZEROFILL.
235       */
236       myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
237 
238       if (cachesize < min_cache)
239 	cachesize = min_cache;
240       buffer_block= cachesize;
241       if (type == SEQ_READ_APPEND)
242 	buffer_block *= 2;
243       else if (cache_myflags & MY_ENCRYPT)
244         buffer_block= 2*(buffer_block + MY_AES_BLOCK_SIZE) + sizeof(IO_CACHE_CRYPT);
245       if (cachesize == min_cache)
246         flags|= (myf) MY_WME;
247 
248       if ((info->buffer= (uchar*) my_malloc(key_memory_IO_CACHE, buffer_block, flags)) != 0)
249       {
250 	if (type == SEQ_READ_APPEND)
251 	  info->write_buffer= info->buffer + cachesize;
252         else
253           info->write_buffer= info->buffer;
254 	info->alloced_buffer= buffer_block;
255 	break;					/* Enough memory found */
256       }
257       if (cachesize == min_cache)
258 	DBUG_RETURN(2);				/* Can't alloc cache */
259       /* Try with less memory */
260       cachesize= (cachesize*3/4 & ~(min_cache-1));
261     }
262   }
263 
264   DBUG_PRINT("info",("init_io_cache_ext: cachesize = %lu", (ulong) cachesize));
265   info->read_length=info->buffer_length=cachesize;
266   info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
267   info->request_pos= info->read_pos= info->write_pos = info->buffer;
268   if (type == SEQ_READ_APPEND)
269   {
270     info->append_read_pos = info->write_pos = info->write_buffer;
271     info->write_end = info->write_buffer + info->buffer_length;
272     mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
273                      &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
274   }
275 #if defined(SAFE_MUTEX)
276   else
277   {
278     /* Clear mutex so that safe_mutex will notice that it's not initialized */
279     bzero((char*) &info->append_buffer_lock, sizeof(info->append_buffer_lock));
280   }
281 #endif
282 
283   if (type == WRITE_CACHE)
284     info->write_end=
285       info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
286   else
287     info->read_end=info->buffer;		/* Nothing in cache */
288 
289   /* End_of_file may be changed by user later */
290   info->end_of_file= end_of_file;
291   info->error=0;
292   info->type= type;
293   init_functions(info);
294   DBUG_RETURN(0);
295 }
296 
/*
  Initialize an IO_CACHE object with the default instrumentation key.

  Thin wrapper: forwards every argument unchanged to init_io_cache_ext()
  and adds key_file_io_cache as the file key. Returns whatever
  init_io_cache_ext() returns.
*/

int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
                  enum cache_type type, my_off_t seek_offset,
                  my_bool use_async_io, myf cache_myflags)
{
  return init_io_cache_ext(info, file, cachesize, type, seek_offset,
                           use_async_io, cache_myflags, key_file_io_cache);
}

305 /*
306   Initialize the slave IO_CACHE to read the same file (and data)
307   as master does.
308 
309   One can create multiple slaves from a single master. Every slave and master
310   will have independent file positions.
311 
312   The master must be a non-shared READ_CACHE.
313   It is assumed that no more reads are done after a master and/or a slave
314   has been freed (this limitation can be easily lifted).
315 */
316 
init_slave_io_cache(IO_CACHE * master,IO_CACHE * slave)317 int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
318 {
319   uchar *slave_buf;
320   DBUG_ASSERT(master->type == READ_CACHE);
321   DBUG_ASSERT(!master->share);
322   DBUG_ASSERT(master->alloced_buffer);
323 
324   if (!(slave_buf= (uchar*)my_malloc(PSI_INSTRUMENT_ME, master->alloced_buffer, MYF(0))))
325   {
326     return 1;
327   }
328   memcpy(slave, master, sizeof(IO_CACHE));
329   slave->buffer= slave_buf;
330 
331   memcpy(slave->buffer, master->buffer, master->alloced_buffer);
332   slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
333   slave->read_end= slave->buffer + (master->read_end - master->buffer);
334 
335   if (master->next_file_user)
336   {
337     IO_CACHE *p;
338     for (p= master->next_file_user;
339          p->next_file_user !=master;
340          p= p->next_file_user)
341     {}
342 
343     p->next_file_user= slave;
344     slave->next_file_user= master;
345   }
346   else
347   {
348     slave->next_file_user= master;
349     master->next_file_user= slave;
350   }
351   return 0;
352 }
353 
354 
/*
  Release a cache that was set up with init_slave_io_cache().

  The cache is unlinked from the circular next_file_user list (if it is on
  one) and its private buffer is freed. The buffer and link pointers are
  cleared afterwards so that stale pointers are not left behind.
*/

void end_slave_io_cache(IO_CACHE *cache)
{
  IO_CACHE *next= cache->next_file_user;

  /* Remove the cache from the next_file_user circular linked list. */
  if (next && next != cache)
  {
    IO_CACHE *p= next;
    /* Walk the ring until we find the element that points at 'cache' */
    while (p->next_file_user != cache)
      p= p->next_file_user;
    p->next_file_user= next;
  }
  cache->next_file_user= NULL;

  my_free(cache->buffer);
  cache->buffer= NULL;
}

369 /*
370   Seek a read io cache to a given offset
371 */
seek_io_cache(IO_CACHE * cache,my_off_t needed_offset)372 void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
373 {
374   my_off_t cached_data_start= cache->pos_in_file;
375   my_off_t cached_data_end= cache->pos_in_file + (cache->read_end -
376                                                   cache->buffer);
377 
378   if (needed_offset >= cached_data_start &&
379       needed_offset < cached_data_end)
380   {
381     /*
382       The offset we're seeking to is in the buffer.
383       Move buffer's read position accordingly
384     */
385     cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
386   }
387   else
388   {
389     if (needed_offset > cache->end_of_file)
390       needed_offset= cache->end_of_file;
391     /*
392       The offset we're seeking to is not in the buffer.
393       - Set the buffer to be exhausted.
394       - Make the next read to a mysql_file_seek() call to the required
395         offset.
396       TODO(cvicentiu, spetrunia) properly implement aligned seeks for
397       efficiency.
398     */
399     cache->seek_not_done= 1;
400     cache->pos_in_file= needed_offset;
401     /* When reading it must appear as if we've started from  the offset
402        that we've seeked here. We must let _my_b_cache_read assume that
403        by implying "no reading starting from pos_in_file" has happened. */
404     cache->read_pos= cache->buffer;
405     cache->read_end= cache->buffer;
406   }
407 }
408 
409 
410 /*
411   Use this to reset cache to re-start reading or to change the type
412   between READ_CACHE <-> WRITE_CACHE
413   If we are doing a reinit of a cache where we have the start of the file
414   in the cache, we are reusing this memory without flushing it to disk.
415 */
416 
reinit_io_cache(IO_CACHE * info,enum cache_type type,my_off_t seek_offset,my_bool use_async_io,my_bool clear_cache)417 my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
418 			my_off_t seek_offset,
419 			my_bool use_async_io __attribute__((unused)),
420 			my_bool clear_cache)
421 {
422   DBUG_ENTER("reinit_io_cache");
423   DBUG_PRINT("enter",("cache:%p type: %d  seek_offset: %llu  clear_cache: %d",
424 		      info, type, (ulonglong) seek_offset,
425 		      (int) clear_cache));
426 
427   DBUG_ASSERT(type == READ_CACHE || type == WRITE_CACHE);
428   DBUG_ASSERT(info->type == READ_CACHE || info->type == WRITE_CACHE);
429 
430   /* If the whole file is in memory, avoid flushing to disk */
431   if (! clear_cache &&
432       seek_offset >= info->pos_in_file &&
433       seek_offset <= my_b_tell(info))
434   {
435     /* Reuse current buffer without flushing it to disk */
436     uchar *pos;
437     if (info->type == WRITE_CACHE && type == READ_CACHE)
438     {
439       info->read_end=info->write_pos;
440       info->end_of_file=my_b_tell(info);
441       /*
442         Trigger a new seek only if we have a valid
443         file handle.
444       */
445       info->seek_not_done= (info->file != -1);
446     }
447     else if (type == WRITE_CACHE)
448     {
449       if (info->type == READ_CACHE)
450       {
451 	info->write_end=info->write_buffer+info->buffer_length;
452 	info->seek_not_done=1;
453       }
454       info->end_of_file = ~(my_off_t) 0;
455     }
456     pos=info->request_pos+(seek_offset-info->pos_in_file);
457     if (type == WRITE_CACHE)
458       info->write_pos=pos;
459     else
460       info->read_pos= pos;
461   }
462   else
463   {
464     /*
465       If we change from WRITE_CACHE to READ_CACHE, assume that everything
466       after the current positions should be ignored. In other cases we
467       update end_of_file as it may have changed since last init.
468     */
469     if (type == READ_CACHE)
470     {
471       if (info->type == WRITE_CACHE)
472 	info->end_of_file= my_b_tell(info);
473       else
474       {
475         if (!(info->myflags & MY_ENCRYPT))
476           info->end_of_file= mysql_file_seek(info->file, 0L,
477                                              MY_SEEK_END, MYF(0));
478       }
479     }
480     /* flush cache if we want to reuse it */
481     if (!clear_cache && my_b_flush_io_cache(info,1))
482       DBUG_RETURN(1);
483     info->pos_in_file=seek_offset;
484     /* Better to do always do a seek */
485     info->seek_not_done=1;
486     info->request_pos=info->read_pos=info->write_pos=info->buffer;
487     if (type == READ_CACHE)
488     {
489       info->read_end=info->buffer;		/* Nothing in cache */
490     }
491     else
492     {
493       if (info->myflags & MY_ENCRYPT)
494       {
495         info->write_end = info->write_buffer + info->buffer_length;
496         if (seek_offset && info->file != -1)
497         {
498           info->read_end= info->buffer;
499           _my_b_encr_read(info, 0, 0); /* prefill the buffer */
500           info->write_pos= info->read_pos;
501           info->seek_not_done=1;
502         }
503       }
504       else
505       {
506         info->write_end=(info->buffer + info->buffer_length -
507                          (seek_offset & (IO_SIZE-1)));
508       }
509       info->end_of_file= ~(my_off_t) 0;
510     }
511   }
512   info->type=type;
513   info->error=0;
514   init_functions(info);
515   DBUG_RETURN(0);
516 } /* reinit_io_cache */
517 
518 
/*
  Read Count bytes when the buffer alone cannot satisfy the request.

  SYNOPSIS
    _my_b_read()
      info              IO_CACHE pointer
      Buffer            Where to store the data
      Count             Number of bytes wanted; must be larger than what
                        is left in the buffer (asserted)

  DESCRIPTION
    Whatever is left between read_pos and read_end is copied to Buffer
    first. The rest is requested from info->read_function.

  RETURN
    The return value of info->read_function. If it is non-zero and
    info->error is not negative, the number of bytes copied from the buffer
    here is added to info->error.
*/

int _my_b_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t left_length;
  int res;

  /* If the buffer is not empty yet, copy what is available. */
  if ((left_length= (size_t) (info->read_end - info->read_pos)))
  {
    DBUG_ASSERT(Count > left_length);
    memcpy(Buffer, info->read_pos, left_length);
    Buffer+=left_length;
    Count-=left_length;
  }
  res= info->read_function(info, Buffer, Count);
  if (res && info->error >= 0)
    info->error+= (int)left_length; /* update number of read bytes */
  return res;
}

/*
  Write Count bytes when they do not fit into the rest of the write buffer.

  SYNOPSIS
    _my_b_write()
      info              IO_CACHE pointer
      Buffer            Data to write; must not be info->write_buffer
      Count             Number of bytes; must be at least the free space
                        left in the write buffer (asserted)

  DESCRIPTION
    The write buffer is filled up from Buffer and flushed with
    my_b_flush_io_cache(). What remains is handed to info->write_function.
    Bytes it did not consume (measured by how far info->pos_in_file
    advanced) are copied into the now empty write buffer.

  RETURN
    0      ok
    -1     pos_in_file + buffer_length would exceed end_of_file; errno and
           my_errno are set to EFBIG and info->error to -1
    1      my_b_flush_io_cache() failed
    other  non-zero return value of info->write_function
*/

int _my_b_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t rest_length;
  int res;

  /* Always use my_b_flush_io_cache() to flush write_buffer! */
  DBUG_ASSERT(Buffer != info->write_buffer);

  if (info->pos_in_file + info->buffer_length > info->end_of_file)
  {
    my_errno=errno=EFBIG;
    return info->error = -1;
  }

  /* Fill up the free space of the write buffer, then flush it */
  rest_length= (size_t) (info->write_end - info->write_pos);
  DBUG_ASSERT(Count >= rest_length);
  memcpy(info->write_pos, Buffer, (size_t) rest_length);
  Buffer+=rest_length;
  Count-=rest_length;
  info->write_pos+=rest_length;

  if (my_b_flush_io_cache(info, 1))
    return 1;

  if (Count)
  {
    /* The advance of pos_in_file tells how much write_function consumed */
    my_off_t old_pos_in_file= info->pos_in_file;
    res= info->write_function(info, Buffer, Count);
    Count-= (size_t) (info->pos_in_file - old_pos_in_file);
    Buffer+= info->pos_in_file - old_pos_in_file;
  }
  else
    res= 0;

  if (!res && Count)
  {
    /* Keep the tail in the write buffer */
    memcpy(info->write_pos, Buffer, Count);
    info->write_pos+= Count;
  }
  return res;
}

580 /*
581   Read buffered.
582 
583   SYNOPSIS
584     _my_b_cache_read()
585       info                      IO_CACHE pointer
586       Buffer                    Buffer to retrieve count bytes from file
587       Count                     Number of bytes to read into Buffer
588 
589   NOTE
590     This function is only called from the my_b_read() macro when there
591     isn't enough characters in the buffer to satisfy the request.
592 
593   WARNING
594 
595     When changing this function, be careful with handling file offsets
596     (end-of_file, pos_in_file). Do not cast them to possibly smaller
597     types than my_off_t unless you can be sure that their value fits.
598     Same applies to differences of file offsets.
599 
600     When changing this function, check _my_b_cache_read_r(). It might need the
601     same change.
602 
603   RETURN
604     0      we succeeded in reading all data
605     1      Error: couldn't read requested characters. In this case:
606              If info->error == -1, we got a read error.
607              Otherwise info->error contains the number of bytes in Buffer.
608 */
609 
_my_b_cache_read(IO_CACHE * info,uchar * Buffer,size_t Count)610 int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
611 {
612   size_t length= 0, diff_length, left_length= 0, max_length;
613   my_off_t pos_in_file;
614   DBUG_ENTER("_my_b_cache_read");
615 
616   /* pos_in_file always point on where info->buffer was read */
617   pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
618 
619   /*
620     Whenever a function which operates on IO_CACHE flushes/writes
621     some part of the IO_CACHE to disk it will set the property
622     "seek_not_done" to indicate this to other functions operating
623     on the IO_CACHE.
624   */
625   if (info->seek_not_done)
626   {
627     if ((mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0))
628         != MY_FILEPOS_ERROR))
629     {
630       /* No error, reset seek_not_done flag. */
631       info->seek_not_done= 0;
632 
633       if (info->next_file_user)
634       {
635         IO_CACHE *c;
636         for (c= info->next_file_user;
637              c!= info;
638              c= c->next_file_user)
639         {
640           c->seek_not_done= 1;
641         }
642       }
643     }
644     else
645     {
646       /*
647         If the seek failed and the error number is ESPIPE, it is because
648         info->file is a pipe or socket or FIFO.  We never should have tried
649         to seek on that.  See Bugs#25807 and #22828 for more info.
650       */
651       DBUG_ASSERT(my_errno != ESPIPE);
652       info->error= -1;
653       DBUG_RETURN(1);
654     }
655   }
656 
657   /*
658     Calculate, how much we are within a IO_SIZE block. Ideally this
659     should be zero.
660   */
661   diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
662 
663   /*
664     If more than a block plus the rest of the current block is wanted,
665     we do read directly, without filling the buffer.
666   */
667   if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
668   {					/* Fill first intern buffer */
669     size_t read_length;
670     if (info->end_of_file <= pos_in_file)
671     {
672       /* End of file. Return, what we did copy from the buffer. */
673       info->error= (int) left_length;
674       info->seek_not_done=1;
675       DBUG_RETURN(1);
676     }
677     /*
678       Crop the wanted count to a multiple of IO_SIZE and subtract,
679       what we did already read from a block. That way, the read will
680       end aligned with a block.
681     */
682     length= IO_ROUND_DN(Count) - diff_length;
683     if ((read_length= mysql_file_read(info->file,Buffer, length, info->myflags))
684 	!= length)
685     {
686       /*
687         If we didn't get, what we wanted, we either return -1 for a read
688         error, or (it's end of file), how much we got in total.
689       */
690       info->error= (read_length == (size_t) -1 ? -1 :
691 		    (int) (read_length+left_length));
692       info->seek_not_done=1;
693       DBUG_RETURN(1);
694     }
695     Count-=length;
696     Buffer+=length;
697     pos_in_file+=length;
698     left_length+=length;
699     diff_length=0;
700   }
701 
702   /*
703     At this point, we want less than one and a partial block.
704     We will read a full cache, minus the number of bytes, we are
705     within a block already. So we will reach new alignment.
706   */
707   max_length= info->read_length-diff_length;
708   /* We will not read past end of file. */
709   if (info->type != READ_FIFO &&
710       max_length > (info->end_of_file - pos_in_file))
711     max_length= (size_t) (info->end_of_file - pos_in_file);
712   /*
713     If there is nothing left to read,
714       we either are done, or we failed to fulfill the request.
715     Otherwise, we read max_length into the cache.
716   */
717   if (!max_length)
718   {
719     if (Count)
720     {
721       /* We couldn't fulfil the request. Return, how much we got. */
722       info->error= (int) left_length;
723       DBUG_RETURN(1);
724     }
725     else
726     {
727       info->error= 0;
728       if (length == 0)                            /* nothing was read */
729         DBUG_RETURN(0);                           /* EOF */
730 
731       length= 0;                       /* non-zero size read was done */
732     }
733   }
734   else
735   {
736     if (info->next_file_user)
737     {
738       IO_CACHE *c;
739       for (c= info->next_file_user;
740            c!= info;
741            c= c->next_file_user)
742       {
743         c->seek_not_done= 1;
744       }
745     }
746     if ((length= mysql_file_read(info->file,info->buffer, max_length,
747                             info->myflags)) < Count ||
748 	   length == (size_t) -1)
749     {
750       /*
751         We got an read error, or less than requested (end of file).
752         If not a read error, copy, what we got.
753       */
754       if (length != (size_t) -1)
755         memcpy(Buffer, info->buffer, length);
756       info->pos_in_file= pos_in_file;
757       /* For a read error, return -1, otherwise, what we got in total. */
758       info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
759       info->read_pos=info->read_end=info->buffer;
760       info->seek_not_done=1;
761       DBUG_RETURN(1);
762     }
763   }
764   /*
765     Count is the remaining number of bytes requested.
766     length is the amount of data in the cache.
767     Read Count bytes from the cache.
768   */
769   info->read_pos=info->buffer+Count;
770   info->read_end=info->buffer+length;
771   info->pos_in_file=pos_in_file;
772   if (Count)
773     memcpy(Buffer, info->buffer, Count);
774   DBUG_RETURN(0);
775 }
776 
777 
778 /*
779   Prepare IO_CACHE for shared use.
780 
781   SYNOPSIS
782     init_io_cache_share()
783       read_cache                A read cache. This will be copied for
784                                 every thread after setup.
785       cshare                    The share.
786       write_cache               If non-NULL a write cache that is to be
787                                 synchronized with the read caches.
788       num_threads               Number of threads sharing the cache
789                                 including the write thread if any.
790 
791   DESCRIPTION
792 
793     The shared cache is used so: One IO_CACHE is initialized with
794     init_io_cache(). This includes the allocation of a buffer. Then a
795     share is allocated and init_io_cache_share() is called with the io
796     cache and the share. Then the io cache is copied for each thread. So
797     every thread has its own copy of IO_CACHE. But the allocated buffer
798     is shared because cache->buffer is the same for all caches.
799 
800     One thread reads data from the file into the buffer. All threads
801     read from the buffer, but every thread maintains its own set of
802     pointers into the buffer. When all threads have used up the buffer
803     contents, one of the threads reads the next block of data into the
804     buffer. To accomplish this, each thread enters the cache lock before
805     accessing the buffer. They wait in lock_io_cache() until all threads
806     joined the lock. The last thread entering the lock is in charge of
807     reading from file to buffer. It wakes all threads when done.
808 
809     Synchronizing a write cache to the read caches works so: Whenever
810     the write buffer needs a flush, the write thread enters the lock and
811     waits for all other threads to enter the lock too. They do this when
812     they have used up the read buffer. When all threads are in the lock,
813     the write thread copies the write buffer to the read buffer and
814     wakes all threads.
815 
816     share->running_threads is the number of threads not being in the
817     cache lock. When entering lock_io_cache() the number is decreased.
818     When the thread that fills the buffer enters unlock_io_cache() the
819     number is reset to the number of threads. The condition
820     running_threads == 0 means that all threads are in the lock. Bumping
821     up the number to the full count is non-intuitive. But increasing the
822     number by one for each thread that leaves the lock could lead to a
823     solo run of one thread. The last thread to join a lock reads from
824     file to buffer, wakes the other threads, processes the data in the
825     cache and enters the lock again. If no other thread left the lock
826     meanwhile, it would think it's the last one again and read the next
827     block...
828 
829     The share has copies of 'error', 'buffer', 'read_end', and
830     'pos_in_file' from the thread that filled the buffer. We may not be
831     able to access this information directly from its cache because the
832     thread may be removed from the share before the variables could be
833     copied by all other threads. Or, if a write buffer is synchronized,
834     it would change its 'pos_in_file' after waking the other threads,
835     possibly before they could copy its value.
836 
837     However, the 'buffer' variable in the share is for a synchronized
838     write cache. It needs to know where to put the data. Otherwise it
839     would need access to the read cache of one of the threads that is
840     not yet removed from the share.
841 
842   RETURN
843     void
844 */
845 
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                         IO_CACHE *write_cache, uint num_threads)
{
  DBUG_ENTER("init_io_cache_share");
  DBUG_PRINT("io_cache_share", ("read_cache: %p  share: %p "
                                "write_cache: %p  threads: %u",
                                 read_cache,  cshare,
                                 write_cache, num_threads));

  DBUG_ASSERT(num_threads > 1);
  DBUG_ASSERT(read_cache->type == READ_CACHE);
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));

  /* Synchronization primitives: one mutex, one condition per role. */
  mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
                   &cshare->mutex, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
  mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);

  /* Thread accounting: everybody starts outside of the lock. */
  cshare->total_threads=   num_threads;
  cshare->running_threads= num_threads;

  /*
    Shared view of the buffer. read_end == NULL together with
    pos_in_file == 0 marks "no block published yet"; lock_io_cache()
    tests exactly these two fields.
  */
  cshare->buffer=          read_cache->buffer;
  cshare->read_end=        NULL;
  cshare->pos_in_file=     0;
  cshare->error=           0;
  cshare->source_cache=    write_cache; /* NULL when there are readers only. */

  /* Attach the reader and route its reads through the shared variant. */
  read_cache->share=         cshare;
  read_cache->read_function= _my_b_cache_read_r;

  /* Attach the optional writer the same way. */
  if (write_cache != NULL)
  {
    write_cache->share=          cshare;
    write_cache->write_function= _my_b_cache_write_r;
  }

  DBUG_VOID_RETURN;
}
883 
884 
885 /*
886   Remove a thread from shared access to IO_CACHE.
887 
888   SYNOPSIS
889     remove_io_thread()
890       cache                     The IO_CACHE to be removed from the share.
891 
892   NOTE
893 
    Every thread must call this on exit so that the other threads do not
    deadlock.
895 
896     The last thread destroys the pthread resources.
897 
898     A writer flushes its cache first.
899 
900   RETURN
901     void
902 */
903 
void remove_io_thread(IO_CACHE *cache)
{
  IO_CACHE_SHARE *share= cache->share;
  /* Only the writer itself ever clears source_cache, so test it once. */
  const int is_writer= (cache == share->source_cache);
  uint threads_left;
  DBUG_ENTER("remove_io_thread");

  /* A departing writer pushes out its buffered data before detaching. */
  if (is_writer)
    flush_io_cache(cache);

  mysql_mutex_lock(&share->mutex);
  DBUG_PRINT("io_cache_share", ("%s: %p",
                                is_writer ? "writer" : "reader", cache));

  /* One participant less in the share. */
  share->total_threads--;
  threads_left= share->total_threads;
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", threads_left));

  /* The cache no longer belongs to the share. */
  cache->share= NULL;

  /* Readers detect a vanished writer through source_cache == NULL. */
  if (is_writer)
  {
    DBUG_PRINT("io_cache_share", ("writer leaves"));
    share->source_cache= NULL;
  }

  /*
    This thread stops counting as running. If everybody else already
    sits in the lock, nobody is left to wake them except us.
  */
  share->running_threads--;
  if (share->running_threads == 0)
  {
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
    mysql_cond_signal(&share->cond_writer);
    mysql_cond_broadcast(&share->cond);
  }

  mysql_mutex_unlock(&share->mutex);

  /* The very last thread tears down the synchronization objects. */
  if (threads_left == 0)
  {
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
    mysql_cond_destroy (&share->cond_writer);
    mysql_cond_destroy (&share->cond);
    mysql_mutex_destroy(&share->mutex);
  }

  DBUG_VOID_RETURN;
}
953 
954 
955 /*
956   Lock IO cache and wait for all other threads to join.
957 
958   SYNOPSIS
959     lock_io_cache()
960       cache                     The cache of the thread entering the lock.
961       pos                       File position of the block to read.
962                                 Unused for the write thread.
963 
964   DESCRIPTION
965 
966     Wait for all threads to finish with the current buffer. We want
967     all threads to proceed in concert. The last thread to join
968     lock_io_cache() will read the block from file and all threads start
969     to use it. Then they will join again for reading the next block.
970 
971     The waiting threads detect a fresh buffer by comparing
972     cshare->pos_in_file with the position they want to process next.
973     Since the first block may start at position 0, we take
974     cshare->read_end as an additional condition. This variable is
975     initialized to NULL and will be set after a block of data is written
976     to the buffer.
977 
978   RETURN
979     1           OK, lock in place, go ahead and read.
980     0           OK, unlocked, another thread did the read.
981 */
982 
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("lock_io_cache");

  /*
    Enter the lock: take the share mutex and count this thread as no
    longer running. running_threads == 0 then means "everybody is here".
  */
  mysql_mutex_lock(&cshare->mutex);
  cshare->running_threads--;
  DBUG_PRINT("io_cache_share", ("%s: %p  pos: %lu  running: %u",
                                (cache == cshare->source_cache) ?
                                "writer" : "reader", cache, (ulong) pos,
                                cshare->running_threads));

  if (cshare->source_cache)
  {
    /* A write cache is synchronized to the read caches. */

    if (cache == cshare->source_cache)
    {
      /*
        The writer waits until all readers are here. It is woken through
        cond_writer by the last reader to arrive (below) or by
        remove_io_thread(). 'pos' is not consulted on this path.
      */
      while (cshare->running_threads)
      {
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
        mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
      }
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));

      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* The last thread wakes the writer. */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("waking writer"));
      mysql_cond_signal(&cshare->cond_writer);
    }

    /*
      Readers wait until the data is copied from the writer. Another
      reason to stop waiting is the removal of the write thread. If this
      happens, we leave the lock with old data in the buffer.

      "Data is there" means: a block has been published at all
      (read_end != NULL) and it starts at or after the requested
      position (pos_in_file >= pos).
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->source_cache)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. When we awake here then because the last
      joining thread signalled us. If the writer is not the last, it
      will not signal. So it is safe to clear the buffer here.

      The same test as in the wait loop is repeated: if it still holds,
      the loop ended only because source_cache became NULL.
    */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
      cshare->read_end= cshare->buffer; /* Empty buffer. */
      cshare->error= 0; /* EOF is not an error. */
    }
  }
  else
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock, we have to continue also. The first of the awaken threads
      will then do the read.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* Another thread did read the block already. */
  }
  /* Only readers that found a ready (or simulated-EOF) buffer get here. */
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
                                (uint) (cshare->read_end ? (size_t)
                                        (cshare->read_end - cshare->buffer) :
                                        0)));

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  mysql_mutex_unlock(&cshare->mutex);
  DBUG_RETURN(0);
}
1097 
1098 
1099 /*
1100   Unlock IO cache.
1101 
1102   SYNOPSIS
1103     unlock_io_cache()
1104       cache                     The cache of the thread leaving the lock.
1105 
1106   NOTE
1107     This is called by the thread that filled the buffer. It marks all
1108     threads as running and awakes them. This must not be done by any
1109     other thread.
1110 
1111     Do not signal cond_writer. Either there is no writer or the writer
1112     is the only one who can call this function.
1113 
1114     The reason for resetting running_threads to total_threads before
1115     waking all other threads is that it could be possible that this
1116     thread is so fast with processing the buffer that it enters the lock
1117     before even one other thread has left it. If every awoken thread
1118     would increase running_threads by one, this thread could think that
1119     he is again the last to join and would not wait for the other
1120     threads to process the data.
1121 
1122   RETURN
1123     void
1124 */
1125 
static void unlock_io_cache(IO_CACHE *cache)
{
  IO_CACHE_SHARE *share= cache->share;
  DBUG_ENTER("unlock_io_cache");
  /* The "running" figure traced here is the value about to be restored. */
  DBUG_PRINT("io_cache_share", ("%s: %p  pos: %lu  running: %u",
                                (share->source_cache == cache) ?
                                "writer" : "reader",
                                cache, (ulong) share->pos_in_file,
                                share->total_threads));

  /*
    Declare every thread running again before anybody is woken, then
    wake all waiting readers and finally give up the mutex that
    lock_io_cache() left held for the caller.
  */
  share->running_threads= share->total_threads;
  mysql_cond_broadcast(&share->cond);
  mysql_mutex_unlock(&share->mutex);
  DBUG_VOID_RETURN;
}
1141 
1142 
1143 /*
1144   Read from IO_CACHE when it is shared between several threads.
1145 
1146   SYNOPSIS
1147     _my_b_cache_read_r()
1148       cache                     IO_CACHE pointer
1149       Buffer                    Buffer to retrieve count bytes from file
1150       Count                     Number of bytes to read into Buffer
1151 
1152   NOTE
1153     This function is only called from the my_b_read() macro when there
1154     isn't enough characters in the buffer to satisfy the request.
1155 
1156   IMPLEMENTATION
1157 
1158     It works as follows: when a thread tries to read from a file (that
1159     is, after using all the data from the (shared) buffer), it just
1160     hangs on lock_io_cache(), waiting for other threads. When the very
1161     last thread attempts a read, lock_io_cache() returns 1, the thread
1162     does actual IO and unlock_io_cache(), which signals all the waiting
1163     threads that data is in the buffer.
1164 
1165   WARNING
1166 
1167     When changing this function, be careful with handling file offsets
1168     (end-of_file, pos_in_file). Do not cast them to possibly smaller
1169     types than my_off_t unless you can be sure that their value fits.
1170     Same applies to differences of file offsets. (Bug #11527)
1171 
1172     When changing this function, check _my_b_cache_read(). It might need the
1173     same change.
1174 
1175   RETURN
1176     0      we succeeded in reading all data
1177     1      Error: can't read requested characters
1178 */
1179 
static int _my_b_cache_read_r(IO_CACHE *cache, uchar *Buffer, size_t Count)
{
  my_off_t pos_in_file;
  /* left_length counts the bytes already delivered to the caller. */
  size_t length, diff_length, left_length= 0;
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("_my_b_cache_read_r");
  DBUG_ASSERT(!(cache->myflags & MY_ENCRYPT));

  /* One iteration per shared block until the request is satisfied. */
  while (Count)
  {
    size_t cnt, len;

    /* File position directly behind the block currently in the buffer. */
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
    /* Offset of that position inside its IO_SIZE block. */
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
    /*
      Size of the next read: the request rounded up so the read ends on
      an IO_SIZE boundary, then adjusted by a multiple of IO_SIZE
      relative to cache->read_length.
    */
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
    length= ((length <= cache->read_length) ?
             length + IO_ROUND_DN(cache->read_length - length) :
             length - IO_ROUND_UP(length - cache->read_length));
    /* Except for READ_FIFO, never read beyond end_of_file. */
    if (cache->type != READ_FIFO &&
	(length > (cache->end_of_file - pos_in_file)))
      length= (size_t) (cache->end_of_file - pos_in_file);
    if (length == 0)
    {
      /* Nothing left to read: report how many bytes were delivered. */
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    if (lock_io_cache(cache, pos_in_file))
    {
      /*
        lock_io_cache() returned 1: the share mutex is still held and
        this thread has to fill the buffer and call unlock_io_cache().
      */
      /* With a synchronized write/read cache we won't come here... */
      DBUG_ASSERT(!cshare->source_cache);
      /*
        ... unless the writer has gone before this thread entered the
        lock. Simulate EOF in this case. It can be distinguished by
        cache->file.
      */
      if (cache->file < 0)
        len= 0;
      else
      {
        /*
          Whenever a function which operates on IO_CACHE flushes/writes
          some part of the IO_CACHE to disk it will set the property
          "seek_not_done" to indicate this to other functions operating
          on the IO_CACHE.
        */
        if (cache->seek_not_done)
        {
          if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
              == MY_FILEPOS_ERROR)
          {
            /* The share variables are left untouched on a seek failure. */
            cache->error= -1;
            unlock_io_cache(cache);
            DBUG_RETURN(1);
          }
        }
        len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
      }
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));

      /* A failed read ((size_t) -1) leaves an empty buffer. */
      cache->read_end=    cache->buffer + (len == (size_t) -1 ? 0 : len);
      /* 0 for a complete read, otherwise the short count or -1. */
      cache->error=       (len == length ? 0 : (int) len);
      cache->pos_in_file= pos_in_file;

      /* Copy important values to the share. */
      cshare->error=       cache->error;
      cshare->read_end=    cache->read_end;
      cshare->pos_in_file= pos_in_file;

      /* Mark all threads as running and wake them. */
      unlock_io_cache(cache);
    }
    else
    {
      /*
        With a synchronized write/read cache readers always come here.
        Copy important values from the share.
      */
      cache->error=       cshare->error;
      cache->read_end=    cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;

      /* Rebuild the read result from the values the filling thread stored. */
      len= ((cache->error == -1) ? (size_t) -1 :
            (size_t) (cache->read_end - cache->buffer));
    }
    cache->read_pos=      cache->buffer;
    cache->seek_not_done= 0;
    if (len == 0 || len == (size_t) -1)
    {
      /* EOF or read error: cache->error becomes the delivered byte count. */
      DBUG_PRINT("io_cache_share", ("reader error. len %lu  left %lu",
                                    (ulong) len, (ulong) left_length));
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    /* Hand over as much of the fresh block as the caller still wants. */
    cnt= (len > Count) ? Count : len;
    if (cnt)
      memcpy(Buffer, cache->read_pos, cnt);
    Count -= cnt;
    Buffer+= cnt;
    left_length+= cnt;
    cache->read_pos+= cnt;
  }
  DBUG_RETURN(0);
}
1283 
1284 
1285 /*
1286   Copy data from write cache to read cache.
1287 
1288   SYNOPSIS
1289     copy_to_read_buffer()
1290       write_cache               The write cache.
1291       write_buffer              The source of data, mostly the cache buffer.
      pos_in_file               File position at which write_buffer starts.
                                The number of bytes to copy is
                                write_cache->pos_in_file - pos_in_file.
1293 
1294   NOTE
1295     The write thread will wait for all read threads to join the cache
1296     lock. Then it copies the data over and wakes the read threads.
1297 
1298   RETURN
1299     void
1300 */
1301 
static void copy_to_read_buffer(IO_CACHE *write_cache,
                                const uchar *write_buffer, my_off_t pos_in_file)
{
  /*
    Number of bytes to hand to the readers: the distance between the
    writer's current file position and the position at which
    write_buffer starts.
  */
  size_t write_length= (size_t) (write_cache->pos_in_file - pos_in_file);
  IO_CACHE_SHARE *cshare= write_cache->share;

  DBUG_ASSERT(cshare->source_cache == write_cache);
  /*
    write_length is usually less or equal to buffer_length.
    It can be bigger if _my_b_cache_write_r() is called with a big length.
    In that case the data is published in several chunks.
  */
  while (write_length)
  {
    size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
    int  __attribute__((unused)) rc;

    /* Wait until every reader has finished with the previous chunk. */
    rc= lock_io_cache(write_cache, pos_in_file);
    /* The writing thread does always have the lock when it awakes. */
    DBUG_ASSERT(rc);

    memcpy(cshare->buffer, write_buffer, copy_length);

    cshare->error=       0;
    cshare->read_end=    cshare->buffer + copy_length;
    cshare->pos_in_file= pos_in_file;

    /* Mark all threads as running and wake them. */
    unlock_io_cache(write_cache);

    write_buffer+= copy_length;
    write_length-= copy_length;
    /*
      Advance the published file position together with the source
      pointer. After consuming a chunk the readers ask lock_io_cache()
      for position "previous pos_in_file + copy_length". Publishing the
      next chunk at the unchanged position would make them keep waiting
      (cshare->pos_in_file < pos) although their data is in the buffer.
    */
    pos_in_file+=  copy_length;
  }
}
1335 
1336 
1337 /*
1338   Do sequential read from the SEQ_READ_APPEND cache.
1339 
1340   We do this in three stages:
1341    - first read from info->buffer
1342    - then if there are still data to read, try the file descriptor
1343    - afterwards, if there are still data to read, try append buffer
1344 
1345   RETURNS
1346     0  Success
1347     1  Failed to read
1348 */
1349 
static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length, diff_length, save_count, max_length;
  my_off_t pos_in_file;
  /* Remember the full request so a short read can report the delivered part. */
  save_count=Count;

  /* Held until just before every return below. */
  lock_append_buffer(info);

  /* pos_in_file always point on where info->buffer was read */
  if ((pos_in_file=info->pos_in_file +
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
    goto read_append_buffer;   /* Nothing more on disk: use the write buffer. */

  /*
    With read-append cache we must always do a seek before we read,
    because the write could have moved the file pointer astray
  */
  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
  {
   info->error= -1;
   unlock_append_buffer(info);
   return (1);
  }
  info->seek_not_done=0;

  /* Offset of the read position inside its IO_SIZE block. */
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));

  /* now the second stage begins - read from file descriptor */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {
    /*
      Large request: read the bulk straight into the caller's buffer,
      bypassing info->buffer. The length is chosen so that the read
      ends on an IO_SIZE boundary.
    */
    size_t read_length;

    length= IO_ROUND_DN(Count) - diff_length;
    if ((read_length= mysql_file_read(info->file,Buffer, length,
                                      info->myflags)) == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    Count-=read_length;
    Buffer+=read_length;
    pos_in_file+=read_length;

    if (read_length != length)
    {
      /*
	We only got part of data;  Read the rest of the data from the
	write buffer
      */
      goto read_append_buffer;
    }
    diff_length=0;
  }

  /* Amount to read into info->buffer, capped at what the file still holds. */
  max_length= info->read_length-diff_length;
  if (max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
      goto read_append_buffer;
    length=0;				/* Didn't read any more chars */
  }
  else
  {
    length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
    if (length == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    if (length < Count)
    {
      /*
        The file could not satisfy the request: deliver what was read
        and take the remainder from the write buffer.
      */
      memcpy(Buffer, info->buffer, length);
      Count -= length;
      Buffer += length;

      /*
	 added the line below to make
	 DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
	 otherwise this does not appear to be needed
      */
      pos_in_file += length;
      goto read_append_buffer;
    }
  }
  /*
    Request fully served from the file: info->buffer now holds 'length'
    bytes starting at pos_in_file, the first 'Count' of which go to the
    caller.
  */
  unlock_append_buffer(info);
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer,info->buffer,(size_t) Count);
  return 0;

read_append_buffer:

  /*
     Read data from the current write buffer.
     Count should never be == 0 here (The code will work even if count is 0)
  */

  {
    /* First copy the data to Count */
    /* Bytes in the write buffer that have not been read yet. */
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
    size_t copy_len;
    size_t transfer_len;

    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
    copy_len=MY_MIN(Count, len_in_buff);
    memcpy(Buffer, info->append_read_pos, copy_len);
    info->append_read_pos += copy_len;
    Count -= copy_len;
    /* Short read: info->error holds the number of bytes delivered. */
    if (Count)
      info->error= (int) (save_count - Count);

    /* Fill read buffer with data from write buffer */
    memcpy(info->buffer, info->append_read_pos,
	   (size_t) (transfer_len=len_in_buff - copy_len));
    info->read_pos= info->buffer;
    info->read_end= info->buffer+transfer_len;
    /* Everything currently in the write buffer counts as read now. */
    info->append_read_pos=info->write_pos;
    info->pos_in_file=pos_in_file+copy_len;
    info->end_of_file+=len_in_buff;
  }
  unlock_append_buffer(info);
  return Count ? 1 : 0;
}
1479 
1480 
1481 /* Read one byte when buffer is empty */
1482 
int _my_b_get(IO_CACHE *info)
{
  uchar byte;
  /* Ask the cache's read function for exactly one byte. */
  int failed= (*info->read_function)(info, &byte, 1);

  /* A non-zero result means the byte could not be read. */
  return failed ? my_b_EOF : (int) byte;
}
1490 
1491 /*
1492    Write a byte buffer to IO_CACHE and flush to disk
1493    if IO_CACHE is full.
1494 
1495    RETURN VALUE
    1 On seek error; info->error is set to -1
    0 On success
   -1 On write error; info->error is set to -1 and my_errno contains error code.
1499 */
1500 
int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Data that does not come from the cache's own write buffer is only
    written in whole IO_SIZE units; a request smaller than that is a
    successful no-op.
  */
  if (Buffer != info->write_buffer)
  {
    Count= IO_ROUND_DN(Count);
    if (Count == 0)
      return 0;
  }

  /*
    Whenever a function which operates on IO_CACHE flushes/writes
    some part of the IO_CACHE to disk it will set the property
    "seek_not_done" to indicate this to other functions operating
    on the IO_CACHE. Position the file before writing in that case.
  */
  if (info->seek_not_done)
  {
    my_off_t seek_result= mysql_file_seek(info->file, info->pos_in_file,
                                          MY_SEEK_SET,
                                          MYF(info->myflags & MY_WME));
    if (seek_result == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return 1;
    }
    info->seek_not_done= 0;
  }

  /* MY_NABP: a non-zero result means not all bytes were written. */
  if (mysql_file_write(info->file, Buffer, Count, info->myflags | MY_NABP))
  {
    info->error= -1;
    return -1;
  }

  info->pos_in_file+= Count;
  return 0;
}
1532 
1533 
1534 /*
1535   In case of a shared I/O cache with a writer we normally do direct
1536   write cache to read cache copy. Simulate this here by direct
1537   caller buffer to read cache copy. Do it after the write so that
1538   the cache readers actions on the flushed part can go in parallel
1539   with the write of the extra stuff. copy_to_read_buffer()
1540   synchronizes writer and readers so that after this call the
1541   readers can act on the extra stuff while the writer can go ahead
1542   and prepare the next output. copy_to_read_buffer() relies on
1543   info->pos_in_file.
1544 */
static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /* File position before the write; copy_to_read_buffer() needs it. */
  const my_off_t start_pos= info->pos_in_file;
  int rc;

  /* Do the plain write first; on failure nothing is shown to the readers. */
  if ((rc= _my_b_cache_write(info, Buffer, Count)) != 0)
    return rc;

  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
  DBUG_ASSERT(info->share);

  /* Hand the bytes between start_pos and the new pos_in_file to the readers. */
  copy_to_read_buffer(info, Buffer, start_pos);
  return 0;
}
1558 
1559 
1560 /*
1561   Append a block to the write buffer.
1562   This is done with the buffer locked to ensure that we don't read from
1563   the write buffer before we are ready with it.
1564 */
1565 
int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t free_space, direct_length;

  MEM_CHECK_DEFINED(Buffer, Count);

  /*
    Assert that we cannot come here with a shared cache. If we do one
    day, we might need to add a call to copy_to_read_buffer().
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  lock_append_buffer(info);
  free_space= (size_t) (info->write_end - info->write_pos);

  if (Count > free_space)
  {
    /* Top up the write buffer and flush it to make room. */
    memcpy(info->write_pos, Buffer, free_space);
    info->write_pos+= free_space;
    Buffer+= free_space;
    Count-= free_space;

    /* The lock is already held, so the flush must not take it again. */
    if (my_b_flush_io_cache(info, 0))
    {
      unlock_append_buffer(info);
      return 1;
    }

    /* Write whole IO_SIZE units of what is left directly to the file. */
    if (Count >= IO_SIZE)
    {
      direct_length= IO_ROUND_DN(Count);
      if (mysql_file_write(info->file, Buffer, direct_length,
                           info->myflags | MY_NABP))
      {
        unlock_append_buffer(info);
        return info->error= -1;
      }
      Buffer+= direct_length;
      Count-= direct_length;
      info->end_of_file+= direct_length;
    }
  }

  /* Whatever remains goes into the write buffer. */
  memcpy(info->write_pos, Buffer, (size_t) Count);
  info->write_pos+= Count;
  unlock_append_buffer(info);
  return 0;
}
1611 
1612 
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Route the write by cache type: SEQ_READ_APPEND caches go through
    my_b_append(), every other type through my_b_write().

    Deliberately written with 'if' rather than the ?: operator: at least
    gcc 2.95 could not deal with several layers of ternary operators
    that evaluate comma(,) operator expressions inside.
  */
  if (info->type != SEQ_READ_APPEND)
    return my_b_write(info, Buffer, Count);
  return my_b_append(info, Buffer, Count);
}
1625 
1626 
1627 /*
1628   Write a block to disk where part of the data may be inside the record
1629   buffer.  As all write calls to the data goes through the cache,
1630   we will never get a seek over the end of the buffer
1631 */
1632 
int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
		   my_off_t pos)
{
  size_t length;
  int error=0;

  /*
    Assert that we cannot come here with a shared cache. If we do one
    day, we might need to add a call to copy_to_read_buffer().
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  if (pos < info->pos_in_file)
  {
    /* If no overlap, write everything without buffering */
    if (pos + Count <= info->pos_in_file)
      return (int)mysql_file_pwrite(info->file, Buffer, Count, pos,
		                    info->myflags | MY_NABP);
    /*
      Write the part of the block that is before buffer. The length is
      smaller than Count here, so it fits in size_t; a narrower cast
      could truncate it.
    */
    length= (size_t) (info->pos_in_file - pos);
    if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
      info->error= error= -1;
    Buffer+=length;
    pos+=  length;
    Count-= length;
  }

  /* Check if we want to write inside the used part of the buffer.*/
  length= (size_t) (info->write_end - info->buffer);
  if (pos < info->pos_in_file + length)
  {
    /* Where inside the buffer the new data starts. */
    size_t offset= (size_t) (pos - info->pos_in_file);
    uchar *data_end;

    /* Room between that point and the end of the buffer, capped by Count. */
    length-=offset;
    if (length > Count)
      length=Count;
    memcpy(info->buffer+offset, Buffer, length);
    Buffer+=length;
    Count-= length;
    /*
      Fix length of buffer if the new data was larger. The data ends at
      buffer + offset + length; leaving out the offset would place
      write_pos before the end of the bytes just copied.
    */
    data_end= info->buffer + offset + length;
    if (data_end > info->write_pos)
      info->write_pos= data_end;
    if (!Count)
      return (error);
  }
  /* Write at the end of the current buffer; This is the normal case */
  if (_my_b_write(info, Buffer, Count))
    error= -1;
  return error;
}
1683 
1684 
1685 	/* Flush write cache */
1686 
1687 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1688   lock_append_buffer(info);
1689 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1690   unlock_append_buffer(info);
1691 
/*
  Write the buffered data of a write cache to its file

  SYNOPSIS
    my_b_flush_io_cache()
    info                      IO_CACHE handle
    need_append_buffer_lock   Non-zero if this function should take the
                              append buffer lock itself.  Forced to 0 unless
                              the cache type is SEQ_READ_APPEND.

  NOTES
    Only caches of type WRITE_CACHE or SEQ_READ_APPEND are flushed; for any
    other type the function returns 0 without doing anything.
    If info->file is -1, real_open_cached_file() is called before flushing.
    The append buffer lock (if taken here) is released on every return path,
    including a failed write.

  RETURN
    0   ok, or nothing to flush
    -1  real_open_cached_file() or the append write failed; info->error is
        set to -1
    #   Non-zero result of info->write_function() (WRITE_CACHE), or
        info->error after a flush
*/

int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  size_t length;
  my_bool append_cache= (info->type == SEQ_READ_APPEND);
  DBUG_ENTER("my_b_flush_io_cache");
  DBUG_PRINT("enter", ("cache: %p",  info));

  /* The append buffer lock is only used for SEQ_READ_APPEND caches */
  if (!append_cache)
    need_append_buffer_lock= 0;

  if (info->type == WRITE_CACHE || append_cache)
  {
    if (info->file == -1)
    {
      /* No file yet; nothing is locked at this point, so a plain return is ok */
      if (real_open_cached_file(info))
        DBUG_RETURN((info->error= -1));
    }
    LOCK_APPEND_BUFFER;

    /* Number of bytes waiting in the write buffer */
    if ((length=(size_t) (info->write_pos - info->write_buffer)))
    {
      if (append_cache)
      {
        if (mysql_file_write(info->file, info->write_buffer, length,
                             info->myflags | MY_NABP))
        {
          info->error= -1;
          /* Don't leave the append buffer locked when the write fails */
          UNLOCK_APPEND_BUFFER;
          DBUG_RETURN(-1);
        }
        /* Account for the bytes that readers had not yet seen in the buffer */
        info->end_of_file+= info->write_pos - info->append_read_pos;
        info->append_read_pos= info->write_buffer;
        DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
      }
      else
      {
        /*
          need_append_buffer_lock is always 0 on this branch (see above),
          so there is no lock to release before the early return.
        */
        int res= info->write_function(info, info->write_buffer, length);
        if (res)
          DBUG_RETURN(res);

        set_if_bigger(info->end_of_file, info->pos_in_file);
      }
      /*
        Shorten the usable buffer by the offset of the new file position
        within an IO_SIZE block (presumably so that later flushes end on an
        IO_SIZE boundary), then mark the buffer as empty.
      */
      info->write_end= (info->write_buffer + info->buffer_length -
                        ((info->pos_in_file + length) & (IO_SIZE - 1)));
      info->write_pos= info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      DBUG_RETURN(info->error);
    }
  }
  /* Nothing was flushed; release the lock if it was taken above */
  UNLOCK_APPEND_BUFFER;
  DBUG_RETURN(0);
}
1744 
1745 /*
1746   Free an IO_CACHE object
1747 
1748   SYNOPSOS
1749     end_io_cache()
1750     info		IO_CACHE Handle to free
1751 
1752   NOTES
1753     It's currently safe to call this if one has called init_io_cache()
1754     on the 'info' object, even if init_io_cache() failed.
1755     This function is also safe to call twice with the same handle.
1756     Note that info->file is not reset as the caller may still use ut for my_close()
1757 
1758   RETURN
1759    0  ok
1760    #  Error
1761 */
1762 
end_io_cache(IO_CACHE * info)1763 int end_io_cache(IO_CACHE *info)
1764 {
1765   int error=0;
1766   DBUG_ENTER("end_io_cache");
1767   DBUG_PRINT("enter",("cache: %p", info));
1768 
1769   /*
1770     Every thread must call remove_io_thread(). The last one destroys
1771     the share elements.
1772   */
1773   DBUG_ASSERT(!info->share || !info->share->total_threads);
1774 
1775   if (info->alloced_buffer)
1776   {
1777     info->alloced_buffer=0;
1778     if (info->file != -1)			/* File doesn't exist */
1779       error= my_b_flush_io_cache(info,1);
1780     my_free(info->buffer);
1781     info->buffer=info->read_pos=(uchar*) 0;
1782   }
1783   if (info->type == SEQ_READ_APPEND)
1784   {
1785     /* Destroy allocated mutex */
1786     mysql_mutex_destroy(&info->append_buffer_lock);
1787   }
1788   info->share= 0;
1789   info->type= TYPE_NOT_SET;                  /* Ensure that flush_io_cache() does nothing */
1790   info->write_end= 0;                        /* Ensure that my_b_write() fails */
1791   info->write_function= 0;                   /* my_b_write will crash if used */
1792   DBUG_RETURN(error);
1793 } /* end_io_cache */
1794 
1795 
1796 /**********************************************************************
1797  Testing of MF_IOCACHE
1798 **********************************************************************/
1799 
1800 #ifdef MAIN
1801 
1802 #include <my_dir.h>
1803 
/*
  Report a fatal error of the test program and terminate

  SYNOPSIS
    die()
    fmt		printf() style format string
    ...		Arguments for the format string

  NOTES
    Prints "Error:" + the formatted message + the value errno had when
    die() was called to stderr, then calls exit(1).  Does not return.
*/

void die(const char* fmt, ...)
{
  /* Save errno at once; the stdio calls below are allowed to change it */
  int saved_errno= errno;
  va_list va_args;
  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  fprintf(stderr,", errno=%d\n", saved_errno);
  va_end(va_args);
  exit(1);
}
1814 
/*
  Open (creating if needed) the test file and attach a SEQ_READ_APPEND cache

  SYNOPSIS
    open_file()
    fname		Name of file to open
    info		IO_CACHE to initialize on top of the file
    cache_size		Cache size passed on to init_io_cache()

  RETURN
    File descriptor of the opened file.  On any failure the program is
    terminated through die().
*/

int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
  int fd= my_open(fname, O_CREAT | O_RDWR, MYF(MY_WME));

  if (fd < 0)
    die("Could not open %s", fname);

  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0, 0,
                    MYF(MY_WME)) != 0)
    die("failed in init_io_cache()");

  return fd;
}
1824 
/*
  Free the cache and close the file it was working on

  SYNOPSIS
    close_file()
    info		IO_CACHE set up by open_file()

  NOTES
    end_io_cache() leaves info->file untouched, so the descriptor picked up
    before the call is the one that gets closed.
*/

void close_file(IO_CACHE* info)
{
  int fd= info->file;

  end_io_cache(info);
  my_close(fd, MYF(MY_WME));
}
1830 
main(int argc,char ** argv)1831 int main(int argc, char** argv)
1832 {
1833   IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1834   MY_STAT status;
1835   const char* fname="/tmp/iocache.test";
1836   int cache_size=16384;
1837   char llstr_buf[22];
1838   int max_block,total_bytes=0;
1839   int i,num_loops=100,error=0;
1840   char *p;
1841   char* block, *block_end;
1842   MY_INIT(argv[0]);
1843   max_block = cache_size*3;
1844   if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1845     die("Not enough memory to allocate test block");
1846   block_end = block + max_block;
1847   for (p = block,i=0; p < block_end;i++)
1848   {
1849     *p++ = (char)i;
1850   }
1851   if (my_stat(fname,&status, MYF(0)) &&
1852       my_delete(fname,MYF(MY_WME)))
1853     {
1854       die("Delete of %s failed, aborting", fname);
1855     }
1856   open_file(fname,&sra_cache, cache_size);
1857   for (i = 0; i < num_loops; i++)
1858   {
1859     char buf[4];
1860     int block_size = abs(rand() % max_block);
1861     int4store(buf, block_size);
1862     if (my_b_append(&sra_cache,buf,4) ||
1863 	my_b_append(&sra_cache, block, block_size))
1864       die("write failed");
1865     total_bytes += 4+block_size;
1866   }
1867   close_file(&sra_cache);
1868   my_free(block);
1869   if (!my_stat(fname,&status,MYF(MY_WME)))
1870     die("%s failed to stat, but I had just closed it,\
1871  wonder how that happened");
1872   printf("Final size of %s is %s, wrote %d bytes\n",fname,
1873 	 llstr(status.st_size,llstr_buf),
1874 	 total_bytes);
1875   my_delete(fname, MYF(MY_WME));
1876   /* check correctness of tests */
1877   if (total_bytes != status.st_size)
1878   {
1879     fprintf(stderr,"Not the same number of bytes actually  in file as bytes \
1880 supposedly written\n");
1881     error=1;
1882   }
1883   exit(error);
1884   return 0;
1885 }
1886 #endif
1887