1 /*
2 Copyright (c) 2000, 2011, Oracle and/or its affiliates
3 Copyright (c) 2010, 2020, MariaDB
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
17
18 /*
19 Caching of files which only do (sequential) reads or writes of fixed-
20 length records. A read isn't allowed to go over file-length. A read is ok
21 if it ends at file-length and next read can try to read after file-length
22 (and get a EOF-error).
23 Possible use of asynchronous io.
24 macros for read and writes for faster io.
25 Used instead of FILE when reading or writing whole files.
26 One can change info->pos_in_file to a higher value to skip bytes in file if
27 also info->read_pos is set to info->read_end.
28 If called through open_cached_file(), then the temporary file will
29 only be created if a write exceeds the file buffer or if one calls
30 my_b_flush_io_cache().
31
32 If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
33 reading and another for writing. Reads are first done from disk and
34 then done from the write buffer. This is an efficient way to read
35 from a log file when one is writing to it at the same time.
36 For this to work, the file has to be opened in append mode!
37 Note that when one uses SEQ_READ_APPEND, one MUST write using
38 my_b_append ! This is needed because we need to lock the mutex
39 every time we access the write buffer.
40
41 TODO:
42 When one uses SEQ_READ_APPEND and we are reading and writing at the same time,
43 each time the write buffer gets full and it's written to disk, we will
44 always do a disk read to read a part of the buffer from disk to the
45 read buffer.
46 This should be fixed so that when we do a my_b_flush_io_cache() and
47 we have been reading the write buffer, we should transfer the rest of the
48 write buffer to the read buffer before we start to reuse it.
49 */
50
51 #include "mysys_priv.h"
52 #include <m_string.h>
53 #ifdef HAVE_AIOWAIT
54 #include "mysys_err.h"
55 static void my_aiowait(my_aio_result *result);
56 #endif
57 #include <errno.h>
58
/* Take / release the mutex guarding the append (write) buffer of a cache. */
#define lock_append_buffer(info) \
  mysql_mutex_lock(&(info)->append_buffer_lock)
#define unlock_append_buffer(info) \
  mysql_mutex_unlock(&(info)->append_buffer_lock)

/*
  Round X down / up with the mask ~(IO_SIZE-1). The result is a multiple
  of IO_SIZE only when IO_SIZE is a power of two.
*/
#define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
#define IO_ROUND_UP(X) IO_ROUND_DN((X)+IO_SIZE-1)
66
67 static int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count);
68 static int _my_b_cache_read_r(IO_CACHE *info, uchar *Buffer, size_t Count);
69 static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count);
70 static int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count);
71 static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count);
72
73 int (*_my_b_encr_read)(IO_CACHE *info,uchar *Buffer,size_t Count)= 0;
74 int (*_my_b_encr_write)(IO_CACHE *info,const uchar *Buffer,size_t Count)= 0;
75
76
77
78 static void
init_functions(IO_CACHE * info)79 init_functions(IO_CACHE* info)
80 {
81 enum cache_type type= info->type;
82 info->read_function = 0; /* Force a core if used */
83 info->write_function = 0; /* Force a core if used */
84 switch (type) {
85 case READ_NET:
86 /*
87 Must be initialized by the caller. The problem is that
88 _my_b_net_read has to be defined in sql directory because of
89 the dependency on THD, and therefore cannot be visible to
90 programs that link against mysys but know nothing about THD, such
91 as myisamchk
92 */
93 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
94 break;
95 case SEQ_READ_APPEND:
96 info->read_function = _my_b_seq_read;
97 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
98 break;
99 case READ_CACHE:
100 if (info->myflags & MY_ENCRYPT)
101 {
102 DBUG_ASSERT(info->share == 0);
103 info->read_function = _my_b_encr_read;
104 break;
105 }
106 /* fall through */
107 case WRITE_CACHE:
108 if (info->myflags & MY_ENCRYPT)
109 {
110 info->write_function = _my_b_encr_write;
111 break;
112 }
113 /* fall through */
114 case READ_FIFO:
115 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
116 info->read_function = info->share ? _my_b_cache_read_r : _my_b_cache_read;
117 info->write_function = info->share ? _my_b_cache_write_r : _my_b_cache_write;
118 break;
119 case TYPE_NOT_SET:
120 DBUG_ASSERT(0);
121 break;
122 }
123 if (type == READ_CACHE || type == WRITE_CACHE || type == SEQ_READ_APPEND)
124 info->myflags|= MY_FULL_IO;
125 else
126 info->myflags&= ~MY_FULL_IO;
127 }
128
129
130 /*
131 Initialize an IO_CACHE object
132
133 SYNOPSIS
134 init_io_cache()
135 info cache handler to initialize
136 file File that should be associated with the handler
137 If == -1 then real_open_cached_file()
138 will be called when it's time to open file.
139 cachesize Size of buffer to allocate for read/write
140 If == 0 then use my_default_record_cache_size
141 type Type of cache
142 seek_offset Where cache should start reading/writing
143 use_async_io Set to 1 if we should use async_io (if available)
144 cache_myflags Bitmap of different flags
145 MY_WME | MY_FAE | MY_NABP | MY_FNABP |
146 MY_DONT_CHECK_FILESIZE
147
148 RETURN
149 0 ok
150 # error
151 */
152
int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
                  enum cache_type type, my_off_t seek_offset,
                  my_bool use_async_io, myf cache_myflags)
{
  size_t smallest_cache;
  my_off_t current_pos;
  my_off_t file_end= ~(my_off_t) 0;
  DBUG_ENTER("init_io_cache");
  DBUG_PRINT("enter",("cache:%p type: %d pos: %llu",
                      info, (int) type, (ulonglong) seek_offset));

  info->file= file;
  /* The real type is stored only at the end, once the mutex exists */
  info->type= TYPE_NOT_SET;
  info->pos_in_file= seek_offset;
  info->alloced_buffer= 0;
  info->buffer= 0;
  info->seek_not_done= 0;
  info->next_file_user= NULL;

  if (file < 0)
  {
    /*
      No file yet. A write cache becomes encrypted when an encryption
      read hook has been installed.
    */
    if (type == WRITE_CACHE && _my_b_encr_read)
    {
      cache_myflags|= MY_ENCRYPT;
      DBUG_ASSERT(seek_offset == 0);
    }
  }
  else
  {
    DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
    current_pos= mysql_file_tell(file, MYF(0));
    if (current_pos != (my_off_t) -1 || my_errno != ESPIPE)
      info->seek_not_done= MY_TEST(seek_offset != current_pos);
    else
    {
      /*
        The object supports neither seek() nor tell(). Leave seek_not_done
        cleared so that no later seek is attempted (it would fail).
        Starting anywhere but the beginning of such an object would be a
        wrong assumption by the caller.
      */
      DBUG_ASSERT(seek_offset == 0);
    }
  }

  info->disk_writes= 0;
  info->share= 0;

  if (cachesize == 0)
  {
    cachesize= my_default_record_cache_size;
    if (cachesize == 0)
      DBUG_RETURN(1);                           /* No cache requested */
  }

  smallest_cache= use_async_io ? IO_SIZE*4 : IO_SIZE*2;

  if (type == READ_CACHE || type == SEQ_READ_APPEND)
  {
    /* The file is assumed not to grow */
    DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
    if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
    {
      /* Find the end of file so that no oversized buffer is allocated */
      file_end= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
      /* The seek above moved the file position; recompute the flag */
      info->seek_not_done= (file_end != seek_offset);
      if (file_end < seek_offset)
        file_end= seek_offset;
      /* Shrink the cache when the remaining file is very small */
      if ((my_off_t) cachesize > file_end-seek_offset+IO_SIZE*2-1)
      {
        cachesize= (size_t) (file_end-seek_offset)+IO_SIZE*2-1;
        use_async_io= 0;                        /* No need to use async */
      }
    }
  }
  cache_myflags&= ~MY_DONT_CHECK_FILESIZE;

  if (type != READ_NET)
  {
    /* Keep retrying with smaller sizes until an allocation succeeds */
    cachesize= ((cachesize + smallest_cache-1) & ~(smallest_cache-1));
    for (;;)
    {
      size_t alloc_size;
      int last_attempt;
      /*
        MY_WAIT_IF_FULL is cleared so that it cannot conflict with
        MY_ZEROFILL; MY_WME is only re-added for the final attempt.
      */
      myf alloc_flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));

      if (cachesize < smallest_cache)
        cachesize= smallest_cache;
      last_attempt= (cachesize == smallest_cache);

      alloc_size= cachesize;
      if (type == SEQ_READ_APPEND)
        alloc_size*= 2;                         /* read + write buffer */
      else if (cache_myflags & MY_ENCRYPT)
        alloc_size= 2*(alloc_size + MY_AES_BLOCK_SIZE) +
                    sizeof(IO_CACHE_CRYPT);
      if (last_attempt)
        alloc_flags|= (myf) MY_WME;

      if ((info->buffer= (uchar*) my_malloc(alloc_size, alloc_flags)) != 0)
      {
        info->write_buffer= (type == SEQ_READ_APPEND ?
                             info->buffer + cachesize : info->buffer);
        info->alloced_buffer= alloc_size;
        break;                                  /* Enough memory found */
      }
      if (last_attempt)
        DBUG_RETURN(2);                         /* Can't alloc cache */
      /* Try with less memory */
      cachesize= (cachesize*3/4 & ~(smallest_cache-1));
    }
  }

  DBUG_PRINT("info",("init_io_cache: cachesize = %lu", (ulong) cachesize));
  info->read_length= info->buffer_length= cachesize;
  info->myflags= cache_myflags & ~(MY_NABP | MY_FNABP);
  info->request_pos= info->read_pos= info->write_pos= info->buffer;
  if (type == SEQ_READ_APPEND)
  {
    info->append_read_pos= info->write_pos= info->write_buffer;
    info->write_end= info->write_buffer + info->buffer_length;
    mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
                     &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
  }
#if defined(SAFE_MUTEX)
  else
  {
    /* Zero the mutex so that safe_mutex can tell it was never initialized */
    bzero((char*) &info->append_buffer_lock, sizeof(info->append_buffer_lock));
  }
#endif

  if (type == WRITE_CACHE)
    info->write_end=
      info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
  else
    info->read_end= info->buffer;               /* Nothing in cache */

  /* The user may change end_of_file later */
  info->end_of_file= file_end;
  info->error= 0;
  info->type= type;
  init_functions(info);
#ifdef HAVE_AIOWAIT
  if (use_async_io && ! my_disable_async_io)
  {
    DBUG_PRINT("info",("Using async io"));
    DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
    info->read_length/=2;
    info->read_function=_my_b_async_read;
  }
  info->inited=info->aio_result.pending=0;
#endif
  DBUG_RETURN(0);
}                                               /* init_io_cache */
305
306
307
308 /*
309 Initialize the slave IO_CACHE to read the same file (and data)
310 as master does.
311
312 One can create multiple slaves from a single master. Every slave and master
313 will have independent file positions.
314
315 The master must be a non-shared READ_CACHE.
316 It is assumed that no more reads are done after a master and/or a slave
317 has been freed (this limitation can be easily lifted).
318 */
319
int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
{
  /*
    Make 'slave' a copy of 'master' with a private buffer and link it into
    the master's circular next_file_user list.

    Returns 0 on success, 1 if the slave buffer could not be allocated
    (in which case 'slave' and the list are left untouched).
  */
  uchar *slave_buf;
  DBUG_ASSERT(master->type == READ_CACHE);
  DBUG_ASSERT(!master->share);
  DBUG_ASSERT(master->alloced_buffer);

  if (!(slave_buf= (uchar*) my_malloc(master->alloced_buffer, MYF(0))))
    return 1;

  /* Start from a bitwise copy of the master, including buffered data. */
  memcpy(slave, master, sizeof(IO_CACHE));
  memcpy(slave_buf, master->buffer, master->alloced_buffer);
  slave->buffer= slave_buf;

  /*
    Every pointer copied from the master still refers to the master's
    buffer. Rebase them into the private buffer, keeping the same offsets,
    so that position calculations and a later reinit of the slave never
    touch the master's memory.
  */
  slave->read_pos=     slave_buf + (master->read_pos - master->buffer);
  slave->read_end=     slave_buf + (master->read_end - master->buffer);
  slave->request_pos=  slave_buf + (master->request_pos - master->buffer);
  slave->write_buffer= slave_buf + (master->write_buffer - master->buffer);
  slave->write_pos=    slave_buf + (master->write_pos - master->buffer);

  if (master->next_file_user)
  {
    /* Find the list element that points back to the master ... */
    IO_CACHE *p;
    for (p= master->next_file_user;
         p->next_file_user != master;
         p= p->next_file_user)
    {}

    /* ... and insert the slave between it and the master. */
    p->next_file_user= slave;
    slave->next_file_user= master;
  }
  else
  {
    /* First slave: master and slave form a two-element ring. */
    slave->next_file_user= master;
    master->next_file_user= slave;
  }
  return 0;
}
356
357
/*
  Unlink a cache from the circular next_file_user list and free its buffer.
*/
void end_slave_io_cache(IO_CACHE *cache)
{
  IO_CACHE *successor= cache->next_file_user;

  if (successor != cache)
  {
    /* Walk the ring until we reach the element pointing at 'cache'. */
    IO_CACHE *predecessor;
    for (predecessor= successor;
         predecessor->next_file_user != cache;
         predecessor= predecessor->next_file_user)
    {}
    predecessor->next_file_user= successor;
  }
  my_free(cache->buffer);
}
371
372 /*
373 Seek a read io cache to a given offset
374 */
void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
{
  /* File range [window_start, window_end) currently held in the buffer */
  const my_off_t window_start= cache->pos_in_file;
  const my_off_t window_end= window_start +
                             (cache->read_end - cache->buffer);

  if (needed_offset >= window_start && needed_offset < window_end)
  {
    /* Target is buffered: only the read position has to move. */
    cache->read_pos= cache->buffer + (needed_offset - window_start);
    return;
  }

  /* Target is outside the buffer; never seek beyond end_of_file. */
  if (needed_offset > cache->end_of_file)
    needed_offset= cache->end_of_file;

  /*
    Declare the buffer exhausted and request a mysql_file_seek() to the
    new offset on the next read.
    TODO(cvicentiu, spetrunia) properly implement aligned seeks for
    efficiency.
  */
  cache->seek_not_done= 1;
  cache->pos_in_file= needed_offset;
  /*
    With read_pos == read_end == buffer, _my_b_cache_read sees that nothing
    has been read from pos_in_file yet, i.e. reading starts exactly at the
    offset we seeked to.
  */
  cache->read_pos= cache->buffer;
  cache->read_end= cache->buffer;
}
411
412 /* Wait until current request is ready */
413
414 #ifdef HAVE_AIOWAIT
/*
  Block until the asynchronous request described by 'result' has completed.

  Every request reported by aiowait() while waiting gets its 'pending'
  flag cleared. If aiowait() fails for a reason other than EINTR, the
  request is assumed to be finished and 'pending' is cleared as well.
*/
static void my_aiowait(my_aio_result *result)
{
  if (!result->pending)
    return;

  for (;;)
  {
    struct aio_result_t *finished= aiowait((struct timeval *) 0);

    /*
      Failure is signalled by the pointer value -1. Compare as a pointer:
      narrowing the pointer to int first can make a valid address look
      like -1 on platforms where pointers are wider than int.
    */
    if (finished == (struct aio_result_t *) -1)
    {
      if (errno == EINTR)
        continue;
      DBUG_PRINT("error",("No aio request, error: %d",errno));
      result->pending=0;                        /* Assume everything is ok */
      break;
    }
    ((my_aio_result*) finished)->pending=0;
    if ((my_aio_result*) finished == result)
      break;
  }
}
437 #endif
438
439
440 /*
441 Use this to reset cache to re-start reading or to change the type
442 between READ_CACHE <-> WRITE_CACHE
443 If we are doing a reinit of a cache where we have the start of the file
444 in the cache, we are reusing this memory without flushing it to disk.
445 */
446
my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
                        my_off_t seek_offset,
                        my_bool use_async_io __attribute__((unused)),
                        my_bool clear_cache)
{
  /*
    Reposition the cache at seek_offset and/or switch it between
    READ_CACHE and WRITE_CACHE.

    Returns 0 on success and 1 if the old contents had to be flushed
    (clear_cache == 0) and the flush failed.
  */
  DBUG_ENTER("reinit_io_cache");
  DBUG_PRINT("enter",("cache:%p type: %d seek_offset: %llu clear_cache: %d",
                      info, type, (ulonglong) seek_offset,
                      (int) clear_cache));

  DBUG_ASSERT(type == READ_CACHE || type == WRITE_CACHE);
  DBUG_ASSERT(info->type == READ_CACHE || info->type == WRITE_CACHE);

  /* If the whole file is in memory, avoid flushing to disk */
  if (! clear_cache &&
      seek_offset >= info->pos_in_file &&
      seek_offset <= my_b_tell(info))
  {
    /* Reuse current buffer without flushing it to disk */
    uchar *pos;
    if (info->type == WRITE_CACHE && type == READ_CACHE)
    {
      /* Everything written so far becomes the readable data. */
      info->read_end=info->write_pos;
      info->end_of_file=my_b_tell(info);
      /*
        Trigger a new seek only if we have a valid
        file handle.
      */
      info->seek_not_done= (info->file != -1);
    }
    else if (type == WRITE_CACHE)
    {
      if (info->type == READ_CACHE)
      {
        info->write_end=info->write_buffer+info->buffer_length;
        info->seek_not_done=1;
      }
      info->end_of_file = ~(my_off_t) 0;
    }
    /* Position inside the buffer that corresponds to seek_offset. */
    pos=info->request_pos+(seek_offset-info->pos_in_file);
    if (type == WRITE_CACHE)
      info->write_pos=pos;
    else
      info->read_pos= pos;
#ifdef HAVE_AIOWAIT
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
#endif
  }
  else
  {
    /*
      If we change from WRITE_CACHE to READ_CACHE, assume that everything
      after the current positions should be ignored. In other cases we
      update end_of_file as it may have changed since last init.
    */
    if (type == READ_CACHE)
    {
      if (info->type == WRITE_CACHE)
        info->end_of_file= my_b_tell(info);
      else
      {
        if (!(info->myflags & MY_ENCRYPT))
          info->end_of_file= mysql_file_seek(info->file, 0L,
                                             MY_SEEK_END, MYF(0));
      }
    }
    /* flush cache if we want to reuse it */
    if (!clear_cache && my_b_flush_io_cache(info,1))
      DBUG_RETURN(1);
    info->pos_in_file=seek_offset;
    /* Better to always do a seek */
    info->seek_not_done=1;
    info->request_pos=info->read_pos=info->write_pos=info->buffer;
    if (type == READ_CACHE)
    {
      info->read_end=info->buffer;              /* Nothing in cache */
    }
    else
    {
      if (info->myflags & MY_ENCRYPT)
      {
        info->write_end = info->write_buffer + info->buffer_length;
        if (seek_offset && info->file != -1)
        {
          info->read_end= info->buffer;
          _my_b_encr_read(info, 0, 0);          /* prefill the buffer */
          info->write_pos= info->read_pos;
          info->seek_not_done=1;
        }
      }
      else
      {
        /* Shorten the first buffer by seek_offset's offset within IO_SIZE. */
        info->write_end=(info->buffer + info->buffer_length -
                         (seek_offset & (IO_SIZE-1)));
      }
      info->end_of_file= ~(my_off_t) 0;
    }
  }
  info->type=type;
  info->error=0;
  init_functions(info);

#ifdef HAVE_AIOWAIT
  if (use_async_io && ! my_disable_async_io &&
      ((ulong) info->buffer_length <
       (ulong) (info->end_of_file - seek_offset)))
  {
    /*
      The flags of an existing cache live in info->myflags; there is no
      'cache_myflags' variable in this function.
    */
    DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
    info->read_length=info->buffer_length/2;
    info->read_function=_my_b_async_read;
  }
  info->inited=0;
#endif
  DBUG_RETURN(0);
}                                               /* reinit_io_cache */
562
563
/*
  Read Count bytes into Buffer: first hand out whatever is still in the
  read buffer, then let the cache's read_function fetch the remainder.

  Returns the result of read_function. When it reports a failure and
  info->error is non-negative, the bytes copied from the buffer here are
  added to info->error.
*/
int _my_b_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  const size_t buffered= (size_t) (info->read_end - info->read_pos);
  int rc;

  if (buffered != 0)
  {
    /* The caller asks for more than the buffer currently holds. */
    DBUG_ASSERT(Count > buffered);
    memcpy(Buffer, info->read_pos, buffered);
    Buffer+= buffered;
    Count-= buffered;
  }

  rc= info->read_function(info, Buffer, Count);
  if (rc != 0 && info->error >= 0)
    info->error+= (int) buffered;               /* update number of read bytes */
  return rc;
}
582
/*
  Write Count bytes from Buffer through the cache: fill up the write buffer,
  flush it, let write_function handle what it wants to write directly, and
  keep the remaining tail in the now empty write buffer.

  Returns 0 on success, -1 (with info->error == -1 and errno == EFBIG) if
  the write would pass end_of_file, 1 if the flush failed, otherwise the
  result of write_function.
*/
int _my_b_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t room;
  int rc= 0;

  /* Always use my_b_flush_io_cache() to flush write_buffer! */
  DBUG_ASSERT(Buffer != info->write_buffer);

  if (info->pos_in_file + info->buffer_length > info->end_of_file)
  {
    my_errno=errno=EFBIG;
    return info->error = -1;
  }

  /* Top up the buffer with the first part of the data ... */
  room= (size_t) (info->write_end - info->write_pos);
  DBUG_ASSERT(Count >= room);
  memcpy(info->write_pos, Buffer, room);
  Buffer+= room;
  Count-= room;
  info->write_pos+= room;

  /* ... and push it out. */
  if (my_b_flush_io_cache(info, 1))
    return 1;

  if (Count)
  {
    /* The advance of pos_in_file tells how much write_function consumed. */
    const my_off_t pos_before= info->pos_in_file;
    my_off_t consumed;

    rc= info->write_function(info, Buffer, Count);
    consumed= info->pos_in_file - pos_before;
    Count-= (size_t) consumed;
    Buffer+= consumed;
  }

  if (!rc && Count)
  {
    memcpy(info->write_pos, Buffer, Count);
    info->write_pos+= Count;
  }
  return rc;
}
624
625 /*
626 Read buffered.
627
628 SYNOPSIS
629 _my_b_cache_read()
630 info IO_CACHE pointer
631 Buffer Buffer to retrieve count bytes from file
632 Count Number of bytes to read into Buffer
633
634 NOTE
635 This function is only called from the my_b_read() macro when there
636 isn't enough characters in the buffer to satisfy the request.
637
638 WARNING
639
640 When changing this function, be careful with handling file offsets
641 (end-of_file, pos_in_file). Do not cast them to possibly smaller
642 types than my_off_t unless you can be sure that their value fits.
643 Same applies to differences of file offsets.
644
645 When changing this function, check _my_b_cache_read_r(). It might need the
646 same change.
647
648 RETURN
649 0 we succeeded in reading all data
650 1 Error: couldn't read requested characters. In this case:
651 If info->error == -1, we got a read error.
652 Otherwise info->error contains the number of bytes in Buffer.
653 */
654
int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t got= 0;             /* bytes of the last file read */
  size_t block_offset;       /* offset of file_pos within its IO_SIZE block */
  size_t delivered= 0;       /* bytes already stored in Buffer by this call */
  size_t fill_size;          /* how much to read into the cache buffer */
  my_off_t file_pos;
  DBUG_ENTER("_my_b_cache_read");

  /* File position just after the data currently held in info->buffer */
  file_pos= info->pos_in_file + (size_t) (info->read_end - info->buffer);

  /*
    seek_not_done is raised by any IO_CACHE function that moved the file
    position behind our back; in that case reposition the file first.
  */
  if (info->seek_not_done)
  {
    IO_CACHE *peer;

    if (mysql_file_seek(info->file, file_pos, MY_SEEK_SET, MYF(0))
        == MY_FILEPOS_ERROR)
    {
      /*
        ESPIPE would mean info->file is a pipe, socket or FIFO, on which
        we never should have tried to seek. See Bugs#25807 and #22828.
      */
      DBUG_ASSERT(my_errno != ESPIPE);
      info->error= -1;
      DBUG_RETURN(1);
    }

    /* Seek succeeded. */
    info->seek_not_done= 0;

    /* All other caches on the same file must now seek before reading. */
    peer= info->next_file_user;
    if (peer)
    {
      for (; peer != info; peer= peer->next_file_user)
        peer->seek_not_done= 1;
    }
  }

  /* How far into an IO_SIZE block we are; ideally zero. */
  block_offset= (size_t) (file_pos & (IO_SIZE-1));

  /*
    A request of at least one block plus the rest of the current block is
    read straight into the caller's buffer, bypassing the cache.
  */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-block_offset)))
  {
    size_t read_length;

    if (info->end_of_file <= file_pos)
    {
      /* Already at end of file: report what was copied so far. */
      info->error= (int) delivered;
      info->seek_not_done=1;
      DBUG_RETURN(1);
    }

    /*
      Round the request down to a multiple of IO_SIZE and subtract the
      part of the block that was consumed earlier, so that the read ends
      on a block boundary.
    */
    got= IO_ROUND_DN(Count) - block_offset;
    if ((read_length= mysql_file_read(info->file,Buffer, got, info->myflags))
        != got)
    {
      /* Read error gives -1; a short read gives the total byte count. */
      info->error= (read_length == (size_t) -1 ? -1 :
                    (int) (read_length+delivered));
      info->seek_not_done=1;
      DBUG_RETURN(1);
    }
    Count-= got;
    Buffer+= got;
    file_pos+= got;
    delivered+= got;
    block_offset= 0;
  }

  /*
    Less than one block plus a partial block remains. Fill the cache with
    a full read_length minus the offset within the block, so that the next
    read is aligned - but never read past end of file.
  */
  fill_size= info->read_length-block_offset;
  if (info->type != READ_FIFO &&
      fill_size > (info->end_of_file - file_pos))
    fill_size= (size_t) (info->end_of_file - file_pos);

  if (!fill_size)
  {
    /* Nothing left in the file. */
    if (Count)
    {
      /* The request could not be fulfilled; report what we got. */
      info->error= (int) delivered;
      DBUG_RETURN(1);
    }
    info->error= 0;
    if (got == 0)                               /* nothing was read */
      DBUG_RETURN(0);                           /* EOF */

    got= 0;                                     /* non-zero size read was done */
  }
  else
  {
    IO_CACHE *peer= info->next_file_user;

    /* The read below moves the file position for every other user. */
    if (peer)
    {
      for (; peer != info; peer= peer->next_file_user)
        peer->seek_not_done= 1;
    }

    got= mysql_file_read(info->file,info->buffer, fill_size, info->myflags);
    if (got < Count || got == (size_t) -1)
    {
      /*
        Read error, or fewer bytes than requested (end of file).
        Unless it was an error, hand over what was read.
      */
      if (got != (size_t) -1)
        memcpy(Buffer, info->buffer, got);
      info->pos_in_file= file_pos;
      /* -1 for a read error, otherwise the total number of bytes. */
      info->error= got == (size_t) -1 ? -1 : (int) (got+delivered);
      info->read_pos=info->read_end=info->buffer;
      info->seek_not_done=1;
      DBUG_RETURN(1);
    }
  }

  /*
    Count bytes of the request remain and the cache holds 'got' bytes:
    serve the remainder from the start of the cache.
  */
  info->read_pos= info->buffer+Count;
  info->read_end= info->buffer+got;
  info->pos_in_file= file_pos;
  if (Count)
    memcpy(Buffer, info->buffer, Count);
  DBUG_RETURN(0);
}
821
822
823 /*
824 Prepare IO_CACHE for shared use.
825
826 SYNOPSIS
827 init_io_cache_share()
828 read_cache A read cache. This will be copied for
829 every thread after setup.
830 cshare The share.
831 write_cache If non-NULL a write cache that is to be
832 synchronized with the read caches.
833 num_threads Number of threads sharing the cache
834 including the write thread if any.
835
836 DESCRIPTION
837
838 The shared cache is used so: One IO_CACHE is initialized with
839 init_io_cache(). This includes the allocation of a buffer. Then a
840 share is allocated and init_io_cache_share() is called with the io
841 cache and the share. Then the io cache is copied for each thread. So
842 every thread has its own copy of IO_CACHE. But the allocated buffer
843 is shared because cache->buffer is the same for all caches.
844
845 One thread reads data from the file into the buffer. All threads
846 read from the buffer, but every thread maintains its own set of
847 pointers into the buffer. When all threads have used up the buffer
848 contents, one of the threads reads the next block of data into the
849 buffer. To accomplish this, each thread enters the cache lock before
850 accessing the buffer. They wait in lock_io_cache() until all threads
851 joined the lock. The last thread entering the lock is in charge of
852 reading from file to buffer. It wakes all threads when done.
853
854 Synchronizing a write cache to the read caches works so: Whenever
855 the write buffer needs a flush, the write thread enters the lock and
856 waits for all other threads to enter the lock too. They do this when
857 they have used up the read buffer. When all threads are in the lock,
858 the write thread copies the write buffer to the read buffer and
859 wakes all threads.
860
861 share->running_threads is the number of threads not being in the
862 cache lock. When entering lock_io_cache() the number is decreased.
863 When the thread that fills the buffer enters unlock_io_cache() the
864 number is reset to the number of threads. The condition
865 running_threads == 0 means that all threads are in the lock. Bumping
866 up the number to the full count is non-intuitive. But increasing the
867 number by one for each thread that leaves the lock could lead to a
868 solo run of one thread. The last thread to join a lock reads from
869 file to buffer, wakes the other threads, processes the data in the
870 cache and enters the lock again. If no other thread left the lock
871 meanwhile, it would think it's the last one again and read the next
872 block...
873
874 The share has copies of 'error', 'buffer', 'read_end', and
875 'pos_in_file' from the thread that filled the buffer. We may not be
876 able to access this information directly from its cache because the
877 thread may be removed from the share before the variables could be
878 copied by all other threads. Or, if a write buffer is synchronized,
879 it would change its 'pos_in_file' after waking the other threads,
880 possibly before they could copy its value.
881
882 However, the 'buffer' variable in the share is for a synchronized
883 write cache. It needs to know where to put the data. Otherwise it
884 would need access to the read cache of one of the threads that is
885 not yet removed from the share.
886
887 RETURN
888 void
889 */
890
void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
                         IO_CACHE *write_cache, uint num_threads)
{
  DBUG_ENTER("init_io_cache_share");
  DBUG_PRINT("io_cache_share", ("read_cache: %p share: %p "
                                "write_cache: %p threads: %u",
                                read_cache, cshare,
                                write_cache, num_threads));

  DBUG_ASSERT(num_threads > 1);
  DBUG_ASSERT(read_cache->type == READ_CACHE);
  DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));

  /* Synchronization objects of the share */
  mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
                   &cshare->mutex, MY_MUTEX_INIT_FAST);
  mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
  mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);

  /* Initially all threads are running, none is inside the lock */
  cshare->total_threads= num_threads;
  cshare->running_threads= num_threads;

  /* Buffer state; read_end stays NULL until a block has been loaded */
  cshare->buffer= read_cache->buffer;
  cshare->read_end= NULL;
  cshare->pos_in_file= 0;
  cshare->error= 0;
  cshare->source_cache= write_cache;            /* Can be NULL. */

  /* Switch the read cache to the shared read function */
  read_cache->share= cshare;
  read_cache->read_function= _my_b_cache_read_r;

  /* Same for the optional writer */
  if (write_cache != NULL)
  {
    write_cache->share= cshare;
    write_cache->write_function= _my_b_cache_write_r;
  }

  DBUG_VOID_RETURN;
}
928
929
930 /*
931 Remove a thread from shared access to IO_CACHE.
932
933 SYNOPSIS
934 remove_io_thread()
935 cache The IO_CACHE to be removed from the share.
936
937 NOTE
938
939 Every thread must do that on exit for not to deadlock other threads.
940
941 The last thread destroys the pthread resources.
942
943 A writer flushes its cache first.
944
945 RETURN
946 void
947 */
948
void remove_io_thread(IO_CACHE *cache)
{
  IO_CACHE_SHARE *cshare= cache->share;
  /* Only the writer's cache is ever stored in source_cache. */
  const int is_writer= (cache == cshare->source_cache);
  uint remaining;
  DBUG_ENTER("remove_io_thread");

  /* A leaving writer flushes its write cache before taking the mutex. */
  if (is_writer)
    flush_io_cache(cache);

  mysql_mutex_lock(&cshare->mutex);
  DBUG_PRINT("io_cache_share", ("%s: %p",
                                is_writer ? "writer" : "reader", cache));

  /* One thread less takes part in the share. */
  remaining= --cshare->total_threads;
  DBUG_PRINT("io_cache_share", ("remaining threads: %u", remaining));

  /* The cache itself no longer belongs to the share. */
  cache->share= NULL;

  /* Tell the readers that the writer is gone. */
  if (is_writer)
  {
    DBUG_PRINT("io_cache_share", ("writer leaves"));
    cshare->source_cache= NULL;
  }

  /* If everybody else is waiting in the lock for us, wake them up. */
  cshare->running_threads--;
  if (cshare->running_threads == 0)
  {
    DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
    mysql_cond_signal(&cshare->cond_writer);
    mysql_cond_broadcast(&cshare->cond);
  }

  mysql_mutex_unlock(&cshare->mutex);

  /* The last thread to leave destroys the synchronization objects. */
  if (remaining == 0)
  {
    DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
    mysql_cond_destroy (&cshare->cond_writer);
    mysql_cond_destroy (&cshare->cond);
    mysql_mutex_destroy(&cshare->mutex);
  }

  DBUG_VOID_RETURN;
}
998
999
1000 /*
1001 Lock IO cache and wait for all other threads to join.
1002
1003 SYNOPSIS
1004 lock_io_cache()
1005 cache The cache of the thread entering the lock.
1006 pos File position of the block to read.
1007 Unused for the write thread.
1008
1009 DESCRIPTION
1010
1011 Wait for all threads to finish with the current buffer. We want
1012 all threads to proceed in concert. The last thread to join
1013 lock_io_cache() will read the block from file and all threads start
1014 to use it. Then they will join again for reading the next block.
1015
1016 The waiting threads detect a fresh buffer by comparing
1017 cshare->pos_in_file with the position they want to process next.
1018 Since the first block may start at position 0, we take
1019 cshare->read_end as an additional condition. This variable is
1020 initialized to NULL and will be set after a block of data is written
1021 to the buffer.
1022
1023 RETURN
1024 1 OK, lock in place, go ahead and read.
1025 0 OK, unlocked, another thread did the read.
1026 */
1027
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("lock_io_cache");

  /*
    Enter the lock. Every caller counts itself out of the running
    threads; unlock_io_cache() sets the counter back to total_threads.
  */
  mysql_mutex_lock(&cshare->mutex);
  cshare->running_threads--;
  DBUG_PRINT("io_cache_share", ("%s: %p pos: %lu running: %u",
                                (cache == cshare->source_cache) ?
                                "writer" : "reader", cache, (ulong) pos,
                                cshare->running_threads));

  if (cshare->source_cache)
  {
    /* A write cache is synchronized to the read caches. */

    if (cache == cshare->source_cache)
    {
      /* The writer waits until all readers are here. */
      while (cshare->running_threads)
      {
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
        mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
      }
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));

      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* The last reader to arrive wakes the writer. */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("waking writer"));
      mysql_cond_signal(&cshare->cond_writer);
    }

    /*
      Readers wait until the data is copied from the writer. Another
      reason to stop waiting is the removal of the write thread
      (source_cache becomes NULL). If this happens, we leave the lock
      with old data in the buffer.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->source_cache)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. If we wake up here with the requested block
      still missing, it is because the last joining thread signalled
      us. If the writer is not the last, it will not signal. So it is
      safe to clear the buffer here.
    */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
      cshare->read_end= cshare->buffer; /* Empty buffer. */
      cshare->error= 0; /* EOF is not an error. */
    }
  }
  else
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock, we have to continue also. The first of the awakened threads
      will then do the read.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* Another thread did read the block already. */
  }
  /*
    Only readers that found the block already in the shared buffer
    (or got a simulated EOF) reach this point.
  */
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
                                (uint) (cshare->read_end ? (size_t)
                                        (cshare->read_end - cshare->buffer) :
                                        0)));

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  mysql_mutex_unlock(&cshare->mutex);
  DBUG_RETURN(0);
}
1142
1143
1144 /*
1145 Unlock IO cache.
1146
1147 SYNOPSIS
1148 unlock_io_cache()
1149 cache The cache of the thread leaving the lock.
1150
1151 NOTE
1152 This is called by the thread that filled the buffer. It marks all
1153 threads as running and awakes them. This must not be done by any
1154 other thread.
1155
1156 Do not signal cond_writer. Either there is no writer or the writer
1157 is the only one who can call this function.
1158
1159 The reason for resetting running_threads to total_threads before
1160 waking all other threads is that it could be possible that this
1161 thread is so fast with processing the buffer that it enters the lock
1162 before even one other thread has left it. If every awoken thread
1163 would increase running_threads by one, this thread could think that
1164 he is again the last to join and would not wait for the other
1165 threads to process the data.
1166
1167 RETURN
1168 void
1169 */
1170
static void unlock_io_cache(IO_CACHE *cache)
{
  IO_CACHE_SHARE *share= cache->share;
  DBUG_ENTER("unlock_io_cache");
  DBUG_PRINT("io_cache_share", ("%s: %p pos: %lu running: %u",
                                (cache != share->source_cache) ?
                                "reader" : "writer",
                                cache, (ulong) share->pos_in_file,
                                share->total_threads));

  /*
    Mark every thread as running *before* waking anybody, so that a fast
    thread re-entering the lock cannot mistake itself for the last one.
  */
  share->running_threads= share->total_threads;

  /* Wake the waiting readers, then release the mutex taken in the lock. */
  mysql_cond_broadcast(&share->cond);
  mysql_mutex_unlock(&share->mutex);

  DBUG_VOID_RETURN;
}
1186
1187
1188 /*
1189 Read from IO_CACHE when it is shared between several threads.
1190
1191 SYNOPSIS
1192 _my_b_cache_read_r()
1193 cache IO_CACHE pointer
1194 Buffer Buffer to retrieve count bytes from file
1195 Count Number of bytes to read into Buffer
1196
1197 NOTE
1198 This function is only called from the my_b_read() macro when there
1199 isn't enough characters in the buffer to satisfy the request.
1200
1201 IMPLEMENTATION
1202
1203 It works as follows: when a thread tries to read from a file (that
1204 is, after using all the data from the (shared) buffer), it just
1205 hangs on lock_io_cache(), waiting for other threads. When the very
1206 last thread attempts a read, lock_io_cache() returns 1, the thread
1207 does actual IO and unlock_io_cache(), which signals all the waiting
1208 threads that data is in the buffer.
1209
1210 WARNING
1211
1212 When changing this function, be careful with handling file offsets
1213 (end-of_file, pos_in_file). Do not cast them to possibly smaller
1214 types than my_off_t unless you can be sure that their value fits.
1215 Same applies to differences of file offsets. (Bug #11527)
1216
1217 When changing this function, check _my_b_cache_read(). It might need the
1218 same change.
1219
1220 RETURN
1221 0 we succeeded in reading all data
1222 1 Error: can't read requested characters
1223 */
1224
static int _my_b_cache_read_r(IO_CACHE *cache, uchar *Buffer, size_t Count)
{
  my_off_t pos_in_file;
  size_t length, diff_length, left_length= 0; /* left_length: bytes delivered */
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("_my_b_cache_read_r");
  DBUG_ASSERT(!(cache->myflags & MY_ENCRYPT));

  /* Each iteration obtains one buffer-full and hands out what is needed. */
  while (Count)
  {
    size_t cnt, len;

    /* File position following the data currently held in the buffer. */
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
    /* Offset of that position inside its IO_SIZE block. */
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
    /*
      Size of the next read: cover Count bytes, end on an IO_SIZE
      boundary, and then adjust towards cache->read_length.
    */
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
    length= ((length <= cache->read_length) ?
             length + IO_ROUND_DN(cache->read_length - length) :
             length - IO_ROUND_UP(length - cache->read_length));
    /* Except for READ_FIFO, never read beyond end_of_file. */
    if (cache->type != READ_FIFO &&
        (length > (cache->end_of_file - pos_in_file)))
      length= (size_t) (cache->end_of_file - pos_in_file);
    if (length == 0)
    {
      /* Nothing more to read: report how many bytes were delivered. */
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    if (lock_io_cache(cache, pos_in_file))
    {
      /* With a synchronized write/read cache we won't come here... */
      DBUG_ASSERT(!cshare->source_cache);
      /*
        ... unless the writer has gone before this thread entered the
        lock. Simulate EOF in this case. It can be distinguished by
        cache->file.
      */
      if (cache->file < 0)
        len= 0;
      else
      {
        /*
          Whenever a function which operates on IO_CACHE flushes/writes
          some part of the IO_CACHE to disk it will set the property
          "seek_not_done" to indicate this to other functions operating
          on the IO_CACHE.
        */
        if (cache->seek_not_done)
        {
          if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
              == MY_FILEPOS_ERROR)
          {
            cache->error= -1;
            /* We hold the lock; release it before bailing out. */
            unlock_io_cache(cache);
            DBUG_RETURN(1);
          }
        }
        len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
      }
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));

      /* A failed read ((size_t) -1) leaves the buffer empty. */
      cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
      /* error is 0 for a complete read, otherwise the (short/failed) length. */
      cache->error= (len == length ? 0 : (int) len);
      cache->pos_in_file= pos_in_file;

      /* Copy important values to the share. */
      cshare->error= cache->error;
      cshare->read_end= cache->read_end;
      cshare->pos_in_file= pos_in_file;

      /* Mark all threads as running and wake them. */
      unlock_io_cache(cache);
    }
    else
    {
      /*
        With a synchronized write/read cache readers always come here.
        Copy important values from the share.
      */
      cache->error= cshare->error;
      cache->read_end= cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;

      /* Reconstruct the result of the read done by the other thread. */
      len= ((cache->error == -1) ? (size_t) -1 :
            (size_t) (cache->read_end - cache->buffer));
    }
    cache->read_pos= cache->buffer;
    cache->seek_not_done= 0;
    if (len == 0 || len == (size_t) -1)
    {
      /* EOF or read error: report the bytes delivered so far. */
      DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
                                    (ulong) len, (ulong) left_length));
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    /* Hand out as much of the buffer as the caller still needs. */
    cnt= (len > Count) ? Count : len;
    if (cnt)
      memcpy(Buffer, cache->read_pos, cnt);
    Count -= cnt;
    Buffer+= cnt;
    left_length+= cnt;
    cache->read_pos+= cnt;
  }
  DBUG_RETURN(0);
}
1328
1329
1330 /*
1331 Copy data from write cache to read cache.
1332
  SYNOPSIS
    copy_to_read_buffer()
      write_cache               The write cache.
      write_buffer              The source of data, mostly the cache buffer.
      pos_in_file               File position at which the data to copy
                                starts. The number of bytes to copy is
                                write_cache->pos_in_file - pos_in_file.
1338
1339 NOTE
1340 The write thread will wait for all read threads to join the cache
1341 lock. Then it copies the data over and wakes the read threads.
1342
1343 RETURN
1344 void
1345 */
1346
static void copy_to_read_buffer(IO_CACHE *write_cache,
                                const uchar *write_buffer, my_off_t pos_in_file)
{
  /*
    The caller has already advanced write_cache->pos_in_file past the
    data just written; pos_in_file is where that data starts in the file.
  */
  size_t write_length= (size_t) (write_cache->pos_in_file - pos_in_file);
  IO_CACHE_SHARE *cshare= write_cache->share;

  DBUG_ASSERT(cshare->source_cache == write_cache);
  /*
    write_length is usually less or equal to buffer_length.
    It can be bigger if _my_b_cache_write_r() is called with a big length.
    In that case the data is handed to the readers in several chunks.
  */
  while (write_length)
  {
    size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
    int __attribute__((unused)) rc;

    /* Wait until all readers are done with the previous chunk. */
    rc= lock_io_cache(write_cache, pos_in_file);
    /* The writing thread does always have the lock when it awakes. */
    DBUG_ASSERT(rc);

    memcpy(cshare->buffer, write_buffer, copy_length);

    cshare->error= 0;
    cshare->read_end= cshare->buffer + copy_length;
    cshare->pos_in_file= pos_in_file;

    /* Mark all threads as running and wake them. */
    unlock_io_cache(write_cache);

    write_buffer+= copy_length;
    write_length-= copy_length;
    /*
      Each chunk must be published at its own file position. Readers ask
      for the position following the chunk they just consumed; if the
      position stayed the same, they would keep waiting for a block that
      never appears while the writer waits for them.
    */
    pos_in_file+= copy_length;
  }
}
1380
1381
1382 /*
1383 Do sequential read from the SEQ_READ_APPEND cache.
1384
1385 We do this in three stages:
1386 - first read from info->buffer
1387 - then if there are still data to read, try the file descriptor
1388 - afterwards, if there are still data to read, try append buffer
1389
1390 RETURNS
1391 0 Success
1392 1 Failed to read
1393 */
1394
static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length, diff_length, save_count, max_length;
  my_off_t pos_in_file;
  save_count=Count;                     /* Original request size */

  /* Held for the whole read; every exit path below releases it. */
  lock_append_buffer(info);

  /* pos_in_file always point on where info->buffer was read */
  if ((pos_in_file=info->pos_in_file +
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
    goto read_append_buffer;            /* Nothing left in the file part */

  /*
    With read-append cache we must always do a seek before we read,
    because the write could have moved the file pointer astray
  */
  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
  {
    info->error= -1;
    unlock_append_buffer(info);
    return (1);
  }
  info->seek_not_done=0;

  /* Offset of pos_in_file inside its IO_SIZE block */
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));

  /* now the second stage begins - read from file descriptor */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {
    /* Fill first intern buffer */
    size_t read_length;

    /* Large request: read straight into the caller's buffer. */
    length= IO_ROUND_DN(Count) - diff_length;
    if ((read_length= mysql_file_read(info->file,Buffer, length,
                                      info->myflags)) == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    Count-=read_length;
    Buffer+=read_length;
    pos_in_file+=read_length;

    if (read_length != length)
    {
      /*
        We only got part of data; Read the rest of the data from the
        write buffer
      */
      goto read_append_buffer;
    }
    diff_length=0;
  }

  /* Read into the cache buffer, but not beyond end_of_file. */
  max_length= info->read_length-diff_length;
  if (max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
      goto read_append_buffer;
    length=0; /* Didn't read any more chars */
  }
  else
  {
    length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
    if (length == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    if (length < Count)
    {
      /* Short read: deliver what we got, the rest comes from the append buffer. */
      memcpy(Buffer, info->buffer, length);
      Count -= length;
      Buffer += length;

      /*
        added the line below to make
        DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
        otherwise this does not appear to be needed
      */
      pos_in_file += length;
      goto read_append_buffer;
    }
  }
  /* The request is fully satisfied from the freshly filled cache buffer. */
  unlock_append_buffer(info);
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer,info->buffer,(size_t) Count);
  return 0;

read_append_buffer:

  /*
    Read data from the current write buffer.
    Count should never be == 0 here (The code will work even if count is 0)
  */

  {
    /* First copy the data to Count */
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
    size_t copy_len;
    size_t transfer_len;

    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
    copy_len=MY_MIN(Count, len_in_buff);
    memcpy(Buffer, info->append_read_pos, copy_len);
    info->append_read_pos += copy_len;
    Count -= copy_len;
    /* Request not fully satisfied: error holds the number of bytes read. */
    if (Count)
      info->error= (int) (save_count - Count);

    /* Fill read buffer with data from write buffer */
    memcpy(info->buffer, info->append_read_pos,
           (size_t) (transfer_len=len_in_buff - copy_len));
    info->read_pos= info->buffer;
    info->read_end= info->buffer+transfer_len;
    /* Everything in the write buffer has now been consumed by the reader. */
    info->append_read_pos=info->write_pos;
    info->pos_in_file=pos_in_file+copy_len;
    info->end_of_file+=len_in_buff;
  }
  unlock_append_buffer(info);
  return Count ? 1 : 0;
}
1524
1525
1526 #ifdef HAVE_AIOWAIT
1527
1528 /*
1529 Read from the IO_CACHE into a buffer and feed asynchronously
1530 from disk when needed.
1531
1532 SYNOPSIS
1533 _my_b_async_read()
1534 info IO_CACHE pointer
1535 Buffer Buffer to retrieve count bytes from file
1536 Count Number of bytes to read into Buffer
1537
1538 RETURN VALUE
1539 -1 An error has occurred; my_errno is set.
1540 0 Success
1541 1 An error has occurred; IO_CACHE to error state.
1542 */
1543
int _my_b_async_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  /*
    read_length starts at 0: when no asynchronous request is pending
    (info->inited == 0) it is used in the EOF check below before any
    read has assigned it.
  */
  size_t length, read_length= 0, diff_length, left_length=0, use_length;
  size_t org_Count;
  size_t max_length;
  my_off_t next_pos_in_file;
  uchar *read_buffer;

  org_Count=Count;

  if (info->inited)
  {                                     /* wait for read block */
    info->inited=0;                     /* No more block to read */
    my_aiowait(&info->aio_result);      /* Wait for outstanding req */
    if (info->aio_result.result.aio_errno)
    {
      if (info->myflags & MY_WME)
        my_error(EE_READ, MYF(ME_BELL), my_filename(info->file),
                 info->aio_result.result.aio_errno);
      my_errno=info->aio_result.result.aio_errno;
      info->error= -1;
      return(1);
    }
    if (! (read_length= (size_t) info->aio_result.result.aio_return) ||
        read_length == (size_t) -1)
    {
      my_errno=0;                       /* For testing */
      info->error= (read_length == (size_t) -1 ? -1 :
                    (int) (read_length+left_length));
      return(1);
    }
    info->pos_in_file+= (size_t) (info->read_end - info->request_pos);

    /* Switch to the other half of the buffer, where the request landed. */
    if (info->request_pos != info->buffer)
      info->request_pos=info->buffer;
    else
      info->request_pos=info->buffer+info->read_length;
    info->read_pos=info->request_pos;
    next_pos_in_file=info->aio_read_pos+read_length;

    /* Check if pos_in_file is changed
       (_ni_read_cache may have skipped some bytes) */

    if (info->aio_read_pos < info->pos_in_file)
    {                                   /* Fix if skipped bytes */
      if (info->aio_read_pos + read_length < info->pos_in_file)
      {
        read_length=0;                  /* Skip block */
        next_pos_in_file=info->pos_in_file;
      }
      else
      {
        my_off_t offset= (info->pos_in_file - info->aio_read_pos);
        info->pos_in_file=info->aio_read_pos; /* We are here */
        info->read_pos=info->request_pos+offset;
        read_length-=offset;            /* Bytes left from read_pos */
      }
    }
#ifndef DBUG_OFF
    if (info->aio_read_pos > info->pos_in_file)
    {
      my_errno=EINVAL;
      return(info->read_length= (size_t) -1);
    }
#endif
    /* Copy found bytes to buffer */
    length=MY_MIN(Count,read_length);
    memcpy(Buffer,info->read_pos,(size_t) length);
    Buffer+=length;
    Count-=length;
    left_length+=length;
    info->read_end=info->read_pos+read_length;
    info->read_pos+=length;
  }
  else
    next_pos_in_file=(info->pos_in_file+ (size_t)
                      (info->read_end - info->request_pos));

  /* If reading large blocks, or first read or read with skip */
  if (Count)
  {
    if (next_pos_in_file == info->end_of_file)
    {
      /*
        NOTE(review): on the inited path left_length already counts the
        bytes copied out of read_length, so this sum looks like it double
        counts; kept as is, confirm the intended value.
      */
      info->error=(int) (read_length+left_length);
      return 1;
    }

    if (mysql_file_seek(info->file, next_pos_in_file, MY_SEEK_SET, MYF(0))
        == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return (1);
    }

    read_length=IO_SIZE*2- (size_t) (next_pos_in_file & (IO_SIZE-1));
    if (Count < read_length)
    {                                   /* Small block, read to cache */
      if ((read_length=mysql_file_read(info->file,info->request_pos,
                                       read_length, info->myflags)) == (size_t) -1)
        return info->error= -1;
      use_length=MY_MIN(Count,read_length);
      memcpy(Buffer,info->request_pos,(size_t) use_length);
      info->read_pos=info->request_pos+Count;
      info->read_end=info->request_pos+read_length;
      info->pos_in_file=next_pos_in_file; /* Start of block in cache */
      next_pos_in_file+=read_length;

      if (Count != use_length)
      {                                 /* Didn't find whole block */
        if (info->myflags & (MY_WME | MY_FAE | MY_FNABP) && Count != org_Count)
          my_error(EE_EOFERR, MYF(ME_BELL), my_filename(info->file), my_errno);
        info->error=(int) (read_length+left_length);
        return 1;
      }
    }
    else
    {                                   /* Big block, don't cache it */
      if ((read_length= mysql_file_read(info->file, Buffer, Count,info->myflags))
          != Count)
      {
        info->error= read_length == (size_t) -1 ? -1 : read_length+left_length;
        return 1;
      }
      info->read_pos=info->read_end=info->request_pos;
      info->pos_in_file=(next_pos_in_file+=Count);
    }
  }

  /* Read next block with asynchronous io */
  diff_length=(next_pos_in_file & (IO_SIZE-1));
  max_length= info->read_length - diff_length;
  if (max_length > info->end_of_file - next_pos_in_file)
    max_length= (size_t) (info->end_of_file - next_pos_in_file);

  /* The request goes into the half of the buffer not currently in use. */
  if (info->request_pos != info->buffer)
    read_buffer=info->buffer;
  else
    read_buffer=info->buffer+info->read_length;
  info->aio_read_pos=next_pos_in_file;
  if (max_length)
  {
    info->aio_result.result.aio_errno=AIO_INPROGRESS; /* Marker for test */
    DBUG_PRINT("aioread",("filepos: %ld length: %lu",
                          (ulong) next_pos_in_file, (ulong) max_length));
    if (aioread(info->file,read_buffer, max_length,
                (my_off_t) next_pos_in_file,MY_SEEK_SET,
                &info->aio_result.result))
    {                                   /* Skip async io */
      my_errno=errno;
      DBUG_PRINT("error",("got error: %d, aio_result: %d from aioread, async skipped",
                          errno, info->aio_result.result.aio_errno));
      if (info->request_pos != info->buffer)
      {
        bmove(info->buffer,info->request_pos,
              (size_t) (info->read_end - info->read_pos));
        info->request_pos=info->buffer;
        info->read_pos-=info->read_length;
        info->read_end-=info->read_length;
      }
      info->read_length=info->buffer_length; /* Use whole buffer */
      info->read_function=_my_b_cache_read; /* Use normal IO_READ next */
    }
    else
      info->inited=info->aio_result.pending=1;
  }
  return 0;                             /* Block read, async in use */
} /* _my_b_async_read */
1710 #endif
1711
1712
1713 /* Read one byte when buffer is empty */
1714
int _my_b_get(IO_CACHE *info)
{
  uchar byte;
  /* Ask the cache's read function for exactly one byte. */
  int failed= (*info->read_function)(info, &byte, 1);

  if (failed)
    return my_b_EOF;
  return (int) byte;
}
1722
1723 /*
1724 Write a byte buffer to IO_CACHE and flush to disk
1725 if IO_CACHE is full.
1726
1727 RETURN VALUE
1728 1 On error on write
1729 0 On success
1730 -1 On error; my_errno contains error code.
1731 */
1732
int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Data that does not come from the cache's own write buffer is rounded
    down with IO_ROUND_DN(); if nothing remains there is nothing to write.
  */
  if (Buffer != info->write_buffer)
  {
    Count= IO_ROUND_DN(Count);
    if (Count == 0)
      return 0;
  }

  /*
    Whenever a function which operates on IO_CACHE flushes/writes
    some part of the IO_CACHE to disk it will set the property
    "seek_not_done" to indicate this to other functions operating
    on the IO_CACHE. Reposition the file before writing in that case.
  */
  if (info->seek_not_done)
  {
    my_off_t seek_result= mysql_file_seek(info->file, info->pos_in_file,
                                          MY_SEEK_SET,
                                          MYF(info->myflags & MY_WME));
    if (seek_result == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return 1;
    }
    info->seek_not_done= 0;
  }

  /* MY_NABP: the write call reports failure unless every byte is written. */
  if (mysql_file_write(info->file, Buffer, Count, info->myflags | MY_NABP))
  {
    info->error= -1;
    return -1;
  }

  info->pos_in_file+= Count;
  return 0;
}
1764
1765
1766 /*
1767 In case of a shared I/O cache with a writer we normally do direct
1768 write cache to read cache copy. Simulate this here by direct
1769 caller buffer to read cache copy. Do it after the write so that
1770 the cache readers actions on the flushed part can go in parallel
1771 with the write of the extra stuff. copy_to_read_buffer()
1772 synchronizes writer and readers so that after this call the
1773 readers can act on the extra stuff while the writer can go ahead
1774 and prepare the next output. copy_to_read_buffer() relies on
1775 info->pos_in_file.
1776 */
static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /* Remember where this write starts; the plain write advances pos_in_file. */
  const my_off_t start_pos= info->pos_in_file;
  int rc= _my_b_cache_write(info, Buffer, Count);

  if (rc == 0)
  {
    DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
    DBUG_ASSERT(info->share);
    /* Hand the bytes just written over to the readers of the share. */
    copy_to_read_buffer(info, Buffer, start_pos);
  }

  return rc;
}
1790
1791
1792 /*
1793 Append a block to the write buffer.
1794 This is done with the buffer locked to ensure that we don't read from
1795 the write buffer before we are ready with it.
1796 */
1797
int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t room, direct_len;

  /*
    A shared cache must never end up here; supporting it would probably
    require a call to copy_to_read_buffer().
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  lock_append_buffer(info);
  room= (size_t) (info->write_end - info->write_pos);

  if (Count > room)
  {
    /* Top up the write buffer and flush it to make space. */
    memcpy(info->write_pos, Buffer, room);
    Buffer+= room;
    Count-= room;
    info->write_pos+= room;
    if (my_b_flush_io_cache(info, 0))
    {
      unlock_append_buffer(info);
      return 1;
    }

    /* Whole IO_SIZE multiples bypass the buffer and go directly to the file. */
    if (Count >= IO_SIZE)
    {
      direct_len= IO_ROUND_DN(Count);
      if (mysql_file_write(info->file, Buffer, direct_len,
                           info->myflags | MY_NABP))
      {
        unlock_append_buffer(info);
        info->error= -1;
        return -1;
      }
      Count-= direct_len;
      Buffer+= direct_len;
      info->end_of_file+= direct_len;
    }
  }

  /* Whatever is left now goes into the write buffer. */
  memcpy(info->write_pos, Buffer, (size_t) Count);
  info->write_pos+= Count;
  unlock_append_buffer(info);
  return 0;
}
1841
1842
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Deliberately written with plain if statements instead of the ?
    operator: at least gcc 2.95 could not cope with several layers of
    ternary operators whose operands contain comma expressions.
  */
  if (info->type != SEQ_READ_APPEND)
    return my_b_write(info, Buffer, Count);

  /* SEQ_READ_APPEND caches are written through my_b_append(). */
  return my_b_append(info, Buffer, Count);
}
1855
1856
1857 /*
1858 Write a block to disk where part of the data may be inside the record
1859 buffer. As all write calls to the data goes through the cache,
1860 we will never get a seek over the end of the buffer
1861 */
1862
int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
                   my_off_t pos)
{
  size_t length;
  int error=0;

  /*
    Assert that we cannot come here with a shared cache. If we do one
    day, we might need to add a call to copy_to_read_buffer().
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  if (pos < info->pos_in_file)
  {
    /* If no overlap, write everything without buffering */
    if (pos + Count <= info->pos_in_file)
      return (int)mysql_file_pwrite(info->file, Buffer, Count, pos,
                                    info->myflags | MY_NABP);
    /*
      Write the part of the block that is before buffer. The difference
      is kept as size_t; it must not be narrowed to a smaller type.
    */
    length= (size_t) (info->pos_in_file - pos);
    if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
      info->error= error= -1;
    Buffer+=length;
    pos+= length;
    Count-= length;
  }

  /* Check if we want to write inside the used part of the buffer.*/
  length= (size_t) (info->write_end - info->buffer);
  if (pos < info->pos_in_file + length)
  {
    size_t offset= (size_t) (pos - info->pos_in_file);
    length-=offset;
    if (length > Count)
      length=Count;
    memcpy(info->buffer+offset, Buffer, length);
    Buffer+=length;
    Count-= length;
    /*
      Fix length of buffer if the new data was larger. The data was
      copied to buffer+offset, so it ends at buffer+offset+length.
    */
    if (info->buffer+offset+length > info->write_pos)
      info->write_pos=info->buffer+offset+length;
    if (!Count)
      return (error);
  }
  /* Write at the end of the current buffer; This is the normal case */
  if (_my_b_write(info, Buffer, Count))
    error= -1;
  return error;
}
1913
1914
1915 /* Flush write cache */
1916
1917 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1918 lock_append_buffer(info);
1919 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1920 unlock_append_buffer(info);
1921
/*
  Write out whatever is pending in the write buffer of an IO_CACHE.

  SYNOPSIS
    my_b_flush_io_cache()
    info                     IO_CACHE handle to flush
    need_append_buffer_lock  Non-zero if the append buffer lock should be
                             taken while flushing. Forced to 0 unless the
                             cache type is SEQ_READ_APPEND.

  NOTES
    Only caches of type WRITE_CACHE or SEQ_READ_APPEND are flushed.
    If the cache has no file yet (info->file == -1) the file is created
    with real_open_cached_file() before anything is written.

    The append buffer lock is released on every return path that is taken
    after it was acquired, including the write-error path.

  RETURN
    0   ok, or nothing to flush
    #   Error. This is -1 (with info->error set to -1) if the file could
        not be created or the append write failed, the non-zero result of
        info->write_function() if that failed, or info->error after a
        completed flush.
*/

int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  size_t length;
  my_bool append_cache= (info->type == SEQ_READ_APPEND);
  DBUG_ENTER("my_b_flush_io_cache");
  DBUG_PRINT("enter", ("cache: %p", info));

  /* Only SEQ_READ_APPEND caches are flushed under the append buffer lock */
  if (!append_cache)
    need_append_buffer_lock= 0;

  if (info->type == WRITE_CACHE || append_cache)
  {
    if (info->file == -1)
    {
      /* No file yet; create it now. The lock has not been taken here. */
      if (real_open_cached_file(info))
        DBUG_RETURN((info->error= -1));
    }
    LOCK_APPEND_BUFFER;

    if ((length=(size_t) (info->write_pos - info->write_buffer)))
    {
      if (append_cache)
      {
        if (mysql_file_write(info->file, info->write_buffer, length,
                             info->myflags | MY_NABP))
        {
          info->error= -1;
          /* Do not leave the append buffer locked on failure */
          UNLOCK_APPEND_BUFFER;
          DBUG_RETURN(-1);
        }
        /* Bytes before append_read_pos are not added to end_of_file here */
        info->end_of_file+= info->write_pos - info->append_read_pos;
        info->append_read_pos= info->write_buffer;
        DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
      }
      else
      {
        /*
          need_append_buffer_lock is always 0 in this branch, so no lock
          is held and there is nothing to release on failure.
        */
        int res= info->write_function(info, info->write_buffer, length);
        if (res)
          DBUG_RETURN(res);

        set_if_bigger(info->end_of_file, info->pos_in_file);
      }
      /*
        Shorten the usable buffer by (pos_in_file + length) modulo IO_SIZE.
        NOTE(review): presumably this keeps later writes IO_SIZE aligned.
      */
      info->write_end= (info->write_buffer + info->buffer_length -
                        ((info->pos_in_file + length) & (IO_SIZE - 1)));
      info->write_pos= info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      DBUG_RETURN(info->error);
    }
  }
#ifdef HAVE_AIOWAIT
  else if (info->type != READ_NET)
  {
    my_aiowait(&info->aio_result);              /* Wait for outstanding req */
    info->inited=0;
  }
#endif
  /* Nothing was pending; release the lock if it was taken above */
  UNLOCK_APPEND_BUFFER;
  DBUG_RETURN(0);
}
1981
1982 /*
1983 Free an IO_CACHE object
1984
1985 SYNOPSOS
1986 end_io_cache()
1987 info IO_CACHE Handle to free
1988
1989 NOTES
1990 It's currently safe to call this if one has called init_io_cache()
1991 on the 'info' object, even if init_io_cache() failed.
1992 This function is also safe to call twice with the same handle.
1993 Note that info->file is not reset as the caller may still use ut for my_close()
1994
1995 RETURN
1996 0 ok
1997 # Error
1998 */
1999
end_io_cache(IO_CACHE * info)2000 int end_io_cache(IO_CACHE *info)
2001 {
2002 int error=0;
2003 DBUG_ENTER("end_io_cache");
2004 DBUG_PRINT("enter",("cache: %p", info));
2005
2006 /*
2007 Every thread must call remove_io_thread(). The last one destroys
2008 the share elements.
2009 */
2010 DBUG_ASSERT(!info->share || !info->share->total_threads);
2011
2012 if (info->alloced_buffer)
2013 {
2014 info->alloced_buffer=0;
2015 if (info->file != -1) /* File doesn't exist */
2016 error= my_b_flush_io_cache(info,1);
2017 my_free(info->buffer);
2018 info->buffer=info->read_pos=(uchar*) 0;
2019 }
2020 if (info->type == SEQ_READ_APPEND)
2021 {
2022 /* Destroy allocated mutex */
2023 mysql_mutex_destroy(&info->append_buffer_lock);
2024 }
2025 info->share= 0;
2026 info->type= TYPE_NOT_SET; /* Ensure that flush_io_cache() does nothing */
2027 info->write_end= 0; /* Ensure that my_b_write() fails */
2028 info->write_function= 0; /* my_b_write will crash if used */
2029 DBUG_RETURN(error);
2030 } /* end_io_cache */
2031
2032
2033 /**********************************************************************
2034 Testing of MF_IOCACHE
2035 **********************************************************************/
2036
2037 #ifdef MAIN
2038
2039 #include <my_dir.h>
2040
/*
  Print "Error:" followed by a printf-style message and the current errno
  to stderr, then terminate the program with exit status 1.

  errno is saved on entry because the stdio calls that print the message
  may modify it before it gets printed.
*/
void die(const char* fmt, ...)
{
  int saved_errno= errno;
  va_list va_args;
  va_start(va_args,fmt);
  fprintf(stderr,"Error:");
  vfprintf(stderr, fmt,va_args);
  fprintf(stderr,", errno=%d\n", saved_errno);
  va_end(va_args);
  exit(1);
}
2051
/*
  Open (creating it if needed) the file 'fname' for reading and writing
  and attach 'info' to it as a SEQ_READ_APPEND cache of 'cache_size' bytes.

  Terminates the program through die() if the file cannot be opened or
  the cache cannot be initialized.

  Returns the descriptor of the opened file.
*/
int open_file(const char* fname, IO_CACHE* info, int cache_size)
{
  int fd= my_open(fname, O_CREAT | O_RDWR, MYF(MY_WME));

  if (fd < 0)
    die("Could not open %s", fname);

  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0, 0,
                    MYF(MY_WME)) != 0)
    die("failed in init_io_cache()");

  return fd;
}
2061
/*
  Tear down the cache 'info' and close the file it was attached to.

  end_io_cache() is called first and my_close() second, on info->file.
  Both calls are always made; if either reports an error the program is
  terminated through die(), so a failed final flush or close cannot go
  unnoticed.
*/
void close_file(IO_CACHE* info)
{
  int flush_error= end_io_cache(info);
  int close_error= my_close(info->file, MYF(MY_WME));

  if (flush_error || close_error)
    die("failed to flush or close the cache file");
}
2067
main(int argc,char ** argv)2068 int main(int argc, char** argv)
2069 {
2070 IO_CACHE sra_cache; /* SEQ_READ_APPEND */
2071 MY_STAT status;
2072 const char* fname="/tmp/iocache.test";
2073 int cache_size=16384;
2074 char llstr_buf[22];
2075 int max_block,total_bytes=0;
2076 int i,num_loops=100,error=0;
2077 char *p;
2078 char* block, *block_end;
2079 MY_INIT(argv[0]);
2080 max_block = cache_size*3;
2081 if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
2082 die("Not enough memory to allocate test block");
2083 block_end = block + max_block;
2084 for (p = block,i=0; p < block_end;i++)
2085 {
2086 *p++ = (char)i;
2087 }
2088 if (my_stat(fname,&status, MYF(0)) &&
2089 my_delete(fname,MYF(MY_WME)))
2090 {
2091 die("Delete of %s failed, aborting", fname);
2092 }
2093 open_file(fname,&sra_cache, cache_size);
2094 for (i = 0; i < num_loops; i++)
2095 {
2096 char buf[4];
2097 int block_size = abs(rand() % max_block);
2098 int4store(buf, block_size);
2099 if (my_b_append(&sra_cache,buf,4) ||
2100 my_b_append(&sra_cache, block, block_size))
2101 die("write failed");
2102 total_bytes += 4+block_size;
2103 }
2104 close_file(&sra_cache);
2105 my_free(block);
2106 if (!my_stat(fname,&status,MYF(MY_WME)))
2107 die("%s failed to stat, but I had just closed it,\
2108 wonder how that happened");
2109 printf("Final size of %s is %s, wrote %d bytes\n",fname,
2110 llstr(status.st_size,llstr_buf),
2111 total_bytes);
2112 my_delete(fname, MYF(MY_WME));
2113 /* check correctness of tests */
2114 if (total_bytes != status.st_size)
2115 {
2116 fprintf(stderr,"Not the same number of bytes actually in file as bytes \
2117 supposedly written\n");
2118 error=1;
2119 }
2120 exit(error);
2121 return 0;
2122 }
2123 #endif
2124