1 /*
2 Copyright (c) 2000, 2011, Oracle and/or its affiliates
3 Copyright (c) 2010, 2021, MariaDB
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; version 2 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
17
18 /*
  Caching of files which only do (sequential) reads or writes of fixed-
  length records. A read isn't allowed to go over file-length. A read is ok
  if it ends at file-length and the next read can try to read after
  file-length (and get an EOF-error).
  Possible use of asynchronous io.
  Macros for reads and writes for faster io.
  Used instead of FILE when reading or writing whole files.
  One can change info->pos_in_file to a higher value to skip bytes in file if
  also info->read_pos is set to info->read_end.
  If called through open_cached_file(), then the temporary file will
  only be created if a write exceeds the file buffer or if one calls
  my_b_flush_io_cache().
31
32 If one uses SEQ_READ_APPEND, then two buffers are allocated, one for
33 reading and another for writing. Reads are first done from disk and
34 then done from the write buffer. This is an efficient way to read
35 from a log file when one is writing to it at the same time.
36 For this to work, the file has to be opened in append mode!
37 Note that when one uses SEQ_READ_APPEND, one MUST write using
38 my_b_append ! This is needed because we need to lock the mutex
39 every time we access the write buffer.
40
41 TODO:
  When one uses SEQ_READ_APPEND and we are reading and writing at the same
  time, each time the write buffer gets full and is written to disk, we will
  always do a disk read to read a part of the buffer from disk to the
  read buffer.
  This should be fixed so that when we do a my_b_flush_io_cache() and
  we have been reading the write buffer, we transfer the rest of the
  write buffer to the read buffer before we start to reuse it.
49 */
50
51 #include "mysys_priv.h"
52 #include <m_string.h>
53 #include <errno.h>
54 #include "mysql/psi/mysql_file.h"
55
56 PSI_file_key key_file_io_cache;
57
58 #define lock_append_buffer(info) \
59 mysql_mutex_lock(&(info)->append_buffer_lock)
60 #define unlock_append_buffer(info) \
61 mysql_mutex_unlock(&(info)->append_buffer_lock)
62
63 #define IO_ROUND_UP(X) (((X)+IO_SIZE-1) & ~(IO_SIZE-1))
64 #define IO_ROUND_DN(X) ( (X) & ~(IO_SIZE-1))
65
66 static int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count);
67 static int _my_b_cache_read_r(IO_CACHE *info, uchar *Buffer, size_t Count);
68 static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count);
69 static int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count);
70 static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count);
71
72 int (*_my_b_encr_read)(IO_CACHE *info,uchar *Buffer,size_t Count)= 0;
73 int (*_my_b_encr_write)(IO_CACHE *info,const uchar *Buffer,size_t Count)= 0;
74
75
76
77 static void
init_functions(IO_CACHE * info)78 init_functions(IO_CACHE* info)
79 {
80 enum cache_type type= info->type;
81 info->read_function = 0; /* Force a core if used */
82 info->write_function = 0; /* Force a core if used */
83 switch (type) {
84 case READ_NET:
85 /*
86 Must be initialized by the caller. The problem is that
87 _my_b_net_read has to be defined in sql directory because of
88 the dependency on THD, and therefore cannot be visible to
89 programs that link against mysys but know nothing about THD, such
90 as myisamchk
91 */
92 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
93 break;
94 case SEQ_READ_APPEND:
95 info->read_function = _my_b_seq_read;
96 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
97 break;
98 case READ_CACHE:
99 if (info->myflags & MY_ENCRYPT)
100 {
101 DBUG_ASSERT(info->share == 0);
102 info->read_function = _my_b_encr_read;
103 break;
104 }
105 /* fall through */
106 case WRITE_CACHE:
107 if (info->myflags & MY_ENCRYPT)
108 {
109 info->write_function = _my_b_encr_write;
110 break;
111 }
112 /* fall through */
113 case READ_FIFO:
114 DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
115 info->read_function = info->share ? _my_b_cache_read_r : _my_b_cache_read;
116 info->write_function = info->share ? _my_b_cache_write_r : _my_b_cache_write;
117 break;
118 case TYPE_NOT_SET:
119 DBUG_ASSERT(0);
120 break;
121 }
122 if (type == READ_CACHE || type == WRITE_CACHE || type == SEQ_READ_APPEND)
123 info->myflags|= MY_FULL_IO;
124 else
125 info->myflags&= ~MY_FULL_IO;
126 }
127
128
129 /*
130 Initialize an IO_CACHE object
131
132 SYNOPSOS
133 init_io_cache_ext()
134 info cache handler to initialize
135 file File that should be associated to to the handler
136 If == -1 then real_open_cached_file()
137 will be called when it's time to open file.
138 cachesize Size of buffer to allocate for read/write
139 If == 0 then use my_default_record_cache_size
140 type Type of cache
141 seek_offset Where cache should start reading/writing
142 use_async_io Set to 1 of we should use async_io (if available)
143 cache_myflags Bitmap of different flags
144 MY_WME | MY_FAE | MY_NABP | MY_FNABP |
145 MY_DONT_CHECK_FILESIZE
146 file_key Instrumented file key for temporary cache file
147
148 RETURN
149 0 ok
150 # error
151 */
152
init_io_cache_ext(IO_CACHE * info,File file,size_t cachesize,enum cache_type type,my_off_t seek_offset,pbool use_async_io,myf cache_myflags,PSI_file_key file_key)153 int init_io_cache_ext(IO_CACHE *info, File file, size_t cachesize,
154 enum cache_type type, my_off_t seek_offset,
155 pbool use_async_io, myf cache_myflags,
156 PSI_file_key file_key)
157 {
158 size_t min_cache;
159 my_off_t pos;
160 my_off_t end_of_file= ~(my_off_t) 0;
161 DBUG_ENTER("init_io_cache_ext");
162 DBUG_PRINT("enter",("cache:%p type: %d pos: %llu",
163 info, (int) type, (ulonglong) seek_offset));
164
165 info->file= file;
166 info->type= TYPE_NOT_SET; /* Don't set it until mutex are created */
167 info->pos_in_file= seek_offset;
168 info->alloced_buffer = 0;
169 info->buffer=0;
170 info->seek_not_done= 0;
171 info->next_file_user= NULL;
172
173 if (file >= 0)
174 {
175 DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
176 pos= mysql_file_tell(file, MYF(0));
177 if ((pos == (my_off_t) -1) && (my_errno == ESPIPE))
178 {
179 /*
180 This kind of object doesn't support seek() or tell(). Don't set a
181 seek_not_done that will make us again try to seek() later and fail.
182
183 Additionally, if we're supposed to start somewhere other than the
184 the beginning of whatever this file is, then somebody made a bad
185 assumption.
186 */
187 DBUG_ASSERT(seek_offset == 0);
188 }
189 else
190 info->seek_not_done= MY_TEST(seek_offset != pos);
191 }
192 else
193 if (type == WRITE_CACHE && _my_b_encr_read)
194 {
195 cache_myflags|= MY_ENCRYPT;
196 DBUG_ASSERT(seek_offset == 0);
197 }
198
199 info->disk_writes= 0;
200 info->share=0;
201
202 if (!cachesize && !(cachesize= my_default_record_cache_size))
203 DBUG_RETURN(1); /* No cache requested */
204 min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
205 if (type == READ_CACHE || type == SEQ_READ_APPEND)
206 { /* Assume file isn't growing */
207 DBUG_ASSERT(!(cache_myflags & MY_ENCRYPT));
208 if (!(cache_myflags & MY_DONT_CHECK_FILESIZE))
209 {
210 /* Calculate end of file to avoid allocating oversized buffers */
211 end_of_file= mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
212 /* Need to reset seek_not_done now that we just did a seek. */
213 info->seek_not_done= end_of_file == seek_offset ? 0 : 1;
214 if (end_of_file < seek_offset)
215 end_of_file=seek_offset;
216 /* Trim cache size if the file is very small */
217 if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
218 {
219 cachesize= (size_t) (end_of_file-seek_offset)+IO_SIZE*2-1;
220 use_async_io=0; /* No need to use async */
221 }
222 }
223 }
224 cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
225 if (type != READ_NET)
226 {
227 /* Retry allocating memory in smaller blocks until we get one */
228 cachesize= ((cachesize + min_cache-1) & ~(min_cache-1));
229 for (;;)
230 {
231 size_t buffer_block;
232 /*
233 Unset MY_WAIT_IF_FULL bit if it is set, to prevent conflict with
234 MY_ZEROFILL.
235 */
236 myf flags= (myf) (cache_myflags & ~(MY_WME | MY_WAIT_IF_FULL));
237
238 if (cachesize < min_cache)
239 cachesize = min_cache;
240 buffer_block= cachesize;
241 if (type == SEQ_READ_APPEND)
242 buffer_block *= 2;
243 else if (cache_myflags & MY_ENCRYPT)
244 buffer_block= 2*(buffer_block + MY_AES_BLOCK_SIZE) + sizeof(IO_CACHE_CRYPT);
245 if (cachesize == min_cache)
246 flags|= (myf) MY_WME;
247
248 if ((info->buffer= (uchar*) my_malloc(key_memory_IO_CACHE, buffer_block, flags)) != 0)
249 {
250 if (type == SEQ_READ_APPEND)
251 info->write_buffer= info->buffer + cachesize;
252 else
253 info->write_buffer= info->buffer;
254 info->alloced_buffer= buffer_block;
255 break; /* Enough memory found */
256 }
257 if (cachesize == min_cache)
258 DBUG_RETURN(2); /* Can't alloc cache */
259 /* Try with less memory */
260 cachesize= (cachesize*3/4 & ~(min_cache-1));
261 }
262 }
263
264 DBUG_PRINT("info",("init_io_cache_ext: cachesize = %lu", (ulong) cachesize));
265 info->read_length=info->buffer_length=cachesize;
266 info->myflags=cache_myflags & ~(MY_NABP | MY_FNABP);
267 info->request_pos= info->read_pos= info->write_pos = info->buffer;
268 if (type == SEQ_READ_APPEND)
269 {
270 info->append_read_pos = info->write_pos = info->write_buffer;
271 info->write_end = info->write_buffer + info->buffer_length;
272 mysql_mutex_init(key_IO_CACHE_append_buffer_lock,
273 &info->append_buffer_lock, MY_MUTEX_INIT_FAST);
274 }
275 #if defined(SAFE_MUTEX)
276 else
277 {
278 /* Clear mutex so that safe_mutex will notice that it's not initialized */
279 bzero((char*) &info->append_buffer_lock, sizeof(info->append_buffer_lock));
280 }
281 #endif
282
283 if (type == WRITE_CACHE)
284 info->write_end=
285 info->buffer+info->buffer_length- (seek_offset & (IO_SIZE-1));
286 else
287 info->read_end=info->buffer; /* Nothing in cache */
288
289 /* End_of_file may be changed by user later */
290 info->end_of_file= end_of_file;
291 info->error=0;
292 info->type= type;
293 init_functions(info);
294 DBUG_RETURN(0);
295 }
296
/*
  Initialize an IO_CACHE object with the default file key.

  Forwards all arguments to init_io_cache_ext(), passing key_file_io_cache
  as file_key, and returns its result unchanged.
*/

int init_io_cache(IO_CACHE *info, File file, size_t cachesize,
                  enum cache_type type, my_off_t seek_offset,
                  my_bool use_async_io, myf cache_myflags)
{
  int rc;

  rc= init_io_cache_ext(info, file, cachesize, type, seek_offset,
                        use_async_io, cache_myflags, key_file_io_cache);
  return rc;
}
304
305 /*
306 Initialize the slave IO_CACHE to read the same file (and data)
307 as master does.
308
309 One can create multiple slaves from a single master. Every slave and master
310 will have independent file positions.
311
312 The master must be a non-shared READ_CACHE.
313 It is assumed that no more reads are done after a master and/or a slave
314 has been freed (this limitation can be easily lifted).
315 */
316
init_slave_io_cache(IO_CACHE * master,IO_CACHE * slave)317 int init_slave_io_cache(IO_CACHE *master, IO_CACHE *slave)
318 {
319 uchar *slave_buf;
320 DBUG_ASSERT(master->type == READ_CACHE);
321 DBUG_ASSERT(!master->share);
322 DBUG_ASSERT(master->alloced_buffer);
323
324 if (!(slave_buf= (uchar*)my_malloc(PSI_INSTRUMENT_ME, master->alloced_buffer, MYF(0))))
325 {
326 return 1;
327 }
328 memcpy(slave, master, sizeof(IO_CACHE));
329 slave->buffer= slave_buf;
330
331 memcpy(slave->buffer, master->buffer, master->alloced_buffer);
332 slave->read_pos= slave->buffer + (master->read_pos - master->buffer);
333 slave->read_end= slave->buffer + (master->read_end - master->buffer);
334
335 if (master->next_file_user)
336 {
337 IO_CACHE *p;
338 for (p= master->next_file_user;
339 p->next_file_user !=master;
340 p= p->next_file_user)
341 {}
342
343 p->next_file_user= slave;
344 slave->next_file_user= master;
345 }
346 else
347 {
348 slave->next_file_user= master;
349 master->next_file_user= slave;
350 }
351 return 0;
352 }
353
354
/*
  Release a cache that was set up with init_slave_io_cache().

  Unlinks the cache from the circular next_file_user list (unless it is the
  only element) and frees its buffer. The IO_CACHE object itself is not
  freed.
*/

void end_slave_io_cache(IO_CACHE *cache)
{
  IO_CACHE *successor= cache->next_file_user;

  if (successor != cache)
  {
    /* Walk the ring to the element whose next pointer is 'cache'. */
    IO_CACHE *pred;
    for (pred= successor;
         pred->next_file_user != cache;
         pred= pred->next_file_user)
    {}
    pred->next_file_user= successor;
  }
  my_free(cache->buffer);
}
368
369 /*
370 Seek a read io cache to a given offset
371 */
seek_io_cache(IO_CACHE * cache,my_off_t needed_offset)372 void seek_io_cache(IO_CACHE *cache, my_off_t needed_offset)
373 {
374 my_off_t cached_data_start= cache->pos_in_file;
375 my_off_t cached_data_end= cache->pos_in_file + (cache->read_end -
376 cache->buffer);
377
378 if (needed_offset >= cached_data_start &&
379 needed_offset < cached_data_end)
380 {
381 /*
382 The offset we're seeking to is in the buffer.
383 Move buffer's read position accordingly
384 */
385 cache->read_pos= cache->buffer + (needed_offset - cached_data_start);
386 }
387 else
388 {
389 if (needed_offset > cache->end_of_file)
390 needed_offset= cache->end_of_file;
391 /*
392 The offset we're seeking to is not in the buffer.
393 - Set the buffer to be exhausted.
394 - Make the next read to a mysql_file_seek() call to the required
395 offset.
396 TODO(cvicentiu, spetrunia) properly implement aligned seeks for
397 efficiency.
398 */
399 cache->seek_not_done= 1;
400 cache->pos_in_file= needed_offset;
401 /* When reading it must appear as if we've started from the offset
402 that we've seeked here. We must let _my_b_cache_read assume that
403 by implying "no reading starting from pos_in_file" has happened. */
404 cache->read_pos= cache->buffer;
405 cache->read_end= cache->buffer;
406 }
407 }
408
409
410 /*
411 Use this to reset cache to re-start reading or to change the type
412 between READ_CACHE <-> WRITE_CACHE
413 If we are doing a reinit of a cache where we have the start of the file
414 in the cache, we are reusing this memory without flushing it to disk.
415 */
416
reinit_io_cache(IO_CACHE * info,enum cache_type type,my_off_t seek_offset,my_bool use_async_io,my_bool clear_cache)417 my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
418 my_off_t seek_offset,
419 my_bool use_async_io __attribute__((unused)),
420 my_bool clear_cache)
421 {
422 DBUG_ENTER("reinit_io_cache");
423 DBUG_PRINT("enter",("cache:%p type: %d seek_offset: %llu clear_cache: %d",
424 info, type, (ulonglong) seek_offset,
425 (int) clear_cache));
426
427 DBUG_ASSERT(type == READ_CACHE || type == WRITE_CACHE);
428 DBUG_ASSERT(info->type == READ_CACHE || info->type == WRITE_CACHE);
429
430 /* If the whole file is in memory, avoid flushing to disk */
431 if (! clear_cache &&
432 seek_offset >= info->pos_in_file &&
433 seek_offset <= my_b_tell(info))
434 {
435 /* Reuse current buffer without flushing it to disk */
436 uchar *pos;
437 if (info->type == WRITE_CACHE && type == READ_CACHE)
438 {
439 info->read_end=info->write_pos;
440 info->end_of_file=my_b_tell(info);
441 /*
442 Trigger a new seek only if we have a valid
443 file handle.
444 */
445 info->seek_not_done= (info->file != -1);
446 }
447 else if (type == WRITE_CACHE)
448 {
449 if (info->type == READ_CACHE)
450 {
451 info->write_end=info->write_buffer+info->buffer_length;
452 info->seek_not_done=1;
453 }
454 info->end_of_file = ~(my_off_t) 0;
455 }
456 pos=info->request_pos+(seek_offset-info->pos_in_file);
457 if (type == WRITE_CACHE)
458 info->write_pos=pos;
459 else
460 info->read_pos= pos;
461 }
462 else
463 {
464 /*
465 If we change from WRITE_CACHE to READ_CACHE, assume that everything
466 after the current positions should be ignored. In other cases we
467 update end_of_file as it may have changed since last init.
468 */
469 if (type == READ_CACHE)
470 {
471 if (info->type == WRITE_CACHE)
472 info->end_of_file= my_b_tell(info);
473 else
474 {
475 if (!(info->myflags & MY_ENCRYPT))
476 info->end_of_file= mysql_file_seek(info->file, 0L,
477 MY_SEEK_END, MYF(0));
478 }
479 }
480 /* flush cache if we want to reuse it */
481 if (!clear_cache && my_b_flush_io_cache(info,1))
482 DBUG_RETURN(1);
483 info->pos_in_file=seek_offset;
484 /* Better to do always do a seek */
485 info->seek_not_done=1;
486 info->request_pos=info->read_pos=info->write_pos=info->buffer;
487 if (type == READ_CACHE)
488 {
489 info->read_end=info->buffer; /* Nothing in cache */
490 }
491 else
492 {
493 if (info->myflags & MY_ENCRYPT)
494 {
495 info->write_end = info->write_buffer + info->buffer_length;
496 if (seek_offset && info->file != -1)
497 {
498 info->read_end= info->buffer;
499 _my_b_encr_read(info, 0, 0); /* prefill the buffer */
500 info->write_pos= info->read_pos;
501 info->seek_not_done=1;
502 }
503 }
504 else
505 {
506 info->write_end=(info->buffer + info->buffer_length -
507 (seek_offset & (IO_SIZE-1)));
508 }
509 info->end_of_file= ~(my_off_t) 0;
510 }
511 }
512 info->type=type;
513 info->error=0;
514 init_functions(info);
515 DBUG_RETURN(0);
516 } /* reinit_io_cache */
517
518
/*
  Read Count bytes when the buffer can't satisfy the whole request.

  SYNOPSIS
    _my_b_read()
      info              IO_CACHE pointer
      Buffer            Destination for the data
      Count             Number of bytes wanted; asserted to be larger than
                        what is left in the buffer

  DESCRIPTION
    Whatever is still in the buffer is copied first; the rest of the
    request is passed to info->read_function.

  RETURN
    The result of info->read_function. If that is non-zero and info->error
    is not negative, info->error is increased by the number of bytes that
    were copied from the buffer.
*/

int _my_b_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  const size_t in_buffer= (size_t) (info->read_end - info->read_pos);
  int rc;

  /* If the buffer is not empty yet, copy what is available. */
  if (in_buffer != 0)
  {
    DBUG_ASSERT(Count > in_buffer);
    memcpy(Buffer, info->read_pos, in_buffer);
    Count-= in_buffer;
    Buffer+= in_buffer;
  }

  rc= info->read_function(info, Buffer, Count);
  if (rc != 0 && info->error >= 0)
    info->error+= (int) in_buffer;      /* update number of read bytes */
  return rc;
}
537
/*
  Write Count bytes when they don't fit into the rest of the write buffer.

  SYNOPSIS
    _my_b_write()
      info              IO_CACHE pointer
      Buffer            Data to write; must not be info->write_buffer
      Count             Number of bytes to write; asserted to be at least
                        the free space left in the write buffer

  DESCRIPTION
    The write buffer is filled up and flushed with my_b_flush_io_cache().
    The remaining bytes are offered to info->write_function; what it did
    not consume (measured by the advance of info->pos_in_file) is copied
    into the now empty write buffer.

  RETURN
    0                   ok
    -1                  pos_in_file + buffer_length exceeds end_of_file;
                        errno and my_errno are EFBIG, info->error is -1
    1                   my_b_flush_io_cache() failed
    other               Non-zero result of info->write_function
*/

int _my_b_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t room;
  int rc= 0;

  /* Always use my_b_flush_io_cache() to flush write_buffer! */
  DBUG_ASSERT(Buffer != info->write_buffer);

  if (info->pos_in_file + info->buffer_length > info->end_of_file)
  {
    my_errno= errno= EFBIG;
    info->error= -1;
    return -1;
  }

  /* Top up the write buffer, then push it out. */
  room= (size_t) (info->write_end - info->write_pos);
  DBUG_ASSERT(Count >= room);
  memcpy(info->write_pos, Buffer, room);
  info->write_pos+= room;
  Buffer+= room;
  Count-= room;

  if (my_b_flush_io_cache(info, 1))
    return 1;

  if (Count)
  {
    const my_off_t pos_before= info->pos_in_file;
    my_off_t consumed;

    rc= info->write_function(info, Buffer, Count);
    consumed= info->pos_in_file - pos_before;
    Count-= (size_t) consumed;
    Buffer+= consumed;
  }

  if (rc == 0 && Count)
  {
    /* Keep the tail in the buffer for a later flush. */
    memcpy(info->write_pos, Buffer, Count);
    info->write_pos+= Count;
  }
  return rc;
}
579
580 /*
581 Read buffered.
582
583 SYNOPSIS
584 _my_b_cache_read()
585 info IO_CACHE pointer
586 Buffer Buffer to retrieve count bytes from file
587 Count Number of bytes to read into Buffer
588
589 NOTE
590 This function is only called from the my_b_read() macro when there
591 isn't enough characters in the buffer to satisfy the request.
592
593 WARNING
594
595 When changing this function, be careful with handling file offsets
596 (end-of_file, pos_in_file). Do not cast them to possibly smaller
597 types than my_off_t unless you can be sure that their value fits.
598 Same applies to differences of file offsets.
599
600 When changing this function, check _my_b_cache_read_r(). It might need the
601 same change.
602
603 RETURN
604 0 we succeeded in reading all data
605 1 Error: couldn't read requested characters. In this case:
606 If info->error == -1, we got a read error.
607 Otherwise info->error contains the number of bytes in Buffer.
608 */
609
_my_b_cache_read(IO_CACHE * info,uchar * Buffer,size_t Count)610 int _my_b_cache_read(IO_CACHE *info, uchar *Buffer, size_t Count)
611 {
612 size_t length= 0, diff_length, left_length= 0, max_length;
613 my_off_t pos_in_file;
614 DBUG_ENTER("_my_b_cache_read");
615
616 /* pos_in_file always point on where info->buffer was read */
617 pos_in_file=info->pos_in_file+ (size_t) (info->read_end - info->buffer);
618
619 /*
620 Whenever a function which operates on IO_CACHE flushes/writes
621 some part of the IO_CACHE to disk it will set the property
622 "seek_not_done" to indicate this to other functions operating
623 on the IO_CACHE.
624 */
625 if (info->seek_not_done)
626 {
627 if ((mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0))
628 != MY_FILEPOS_ERROR))
629 {
630 /* No error, reset seek_not_done flag. */
631 info->seek_not_done= 0;
632
633 if (info->next_file_user)
634 {
635 IO_CACHE *c;
636 for (c= info->next_file_user;
637 c!= info;
638 c= c->next_file_user)
639 {
640 c->seek_not_done= 1;
641 }
642 }
643 }
644 else
645 {
646 /*
647 If the seek failed and the error number is ESPIPE, it is because
648 info->file is a pipe or socket or FIFO. We never should have tried
649 to seek on that. See Bugs#25807 and #22828 for more info.
650 */
651 DBUG_ASSERT(my_errno != ESPIPE);
652 info->error= -1;
653 DBUG_RETURN(1);
654 }
655 }
656
657 /*
658 Calculate, how much we are within a IO_SIZE block. Ideally this
659 should be zero.
660 */
661 diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
662
663 /*
664 If more than a block plus the rest of the current block is wanted,
665 we do read directly, without filling the buffer.
666 */
667 if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
668 { /* Fill first intern buffer */
669 size_t read_length;
670 if (info->end_of_file <= pos_in_file)
671 {
672 /* End of file. Return, what we did copy from the buffer. */
673 info->error= (int) left_length;
674 info->seek_not_done=1;
675 DBUG_RETURN(1);
676 }
677 /*
678 Crop the wanted count to a multiple of IO_SIZE and subtract,
679 what we did already read from a block. That way, the read will
680 end aligned with a block.
681 */
682 length= IO_ROUND_DN(Count) - diff_length;
683 if ((read_length= mysql_file_read(info->file,Buffer, length, info->myflags))
684 != length)
685 {
686 /*
687 If we didn't get, what we wanted, we either return -1 for a read
688 error, or (it's end of file), how much we got in total.
689 */
690 info->error= (read_length == (size_t) -1 ? -1 :
691 (int) (read_length+left_length));
692 info->seek_not_done=1;
693 DBUG_RETURN(1);
694 }
695 Count-=length;
696 Buffer+=length;
697 pos_in_file+=length;
698 left_length+=length;
699 diff_length=0;
700 }
701
702 /*
703 At this point, we want less than one and a partial block.
704 We will read a full cache, minus the number of bytes, we are
705 within a block already. So we will reach new alignment.
706 */
707 max_length= info->read_length-diff_length;
708 /* We will not read past end of file. */
709 if (info->type != READ_FIFO &&
710 max_length > (info->end_of_file - pos_in_file))
711 max_length= (size_t) (info->end_of_file - pos_in_file);
712 /*
713 If there is nothing left to read,
714 we either are done, or we failed to fulfill the request.
715 Otherwise, we read max_length into the cache.
716 */
717 if (!max_length)
718 {
719 if (Count)
720 {
721 /* We couldn't fulfil the request. Return, how much we got. */
722 info->error= (int) left_length;
723 DBUG_RETURN(1);
724 }
725 else
726 {
727 info->error= 0;
728 if (length == 0) /* nothing was read */
729 DBUG_RETURN(0); /* EOF */
730
731 length= 0; /* non-zero size read was done */
732 }
733 }
734 else
735 {
736 if (info->next_file_user)
737 {
738 IO_CACHE *c;
739 for (c= info->next_file_user;
740 c!= info;
741 c= c->next_file_user)
742 {
743 c->seek_not_done= 1;
744 }
745 }
746 if ((length= mysql_file_read(info->file,info->buffer, max_length,
747 info->myflags)) < Count ||
748 length == (size_t) -1)
749 {
750 /*
751 We got an read error, or less than requested (end of file).
752 If not a read error, copy, what we got.
753 */
754 if (length != (size_t) -1)
755 memcpy(Buffer, info->buffer, length);
756 info->pos_in_file= pos_in_file;
757 /* For a read error, return -1, otherwise, what we got in total. */
758 info->error= length == (size_t) -1 ? -1 : (int) (length+left_length);
759 info->read_pos=info->read_end=info->buffer;
760 info->seek_not_done=1;
761 DBUG_RETURN(1);
762 }
763 }
764 /*
765 Count is the remaining number of bytes requested.
766 length is the amount of data in the cache.
767 Read Count bytes from the cache.
768 */
769 info->read_pos=info->buffer+Count;
770 info->read_end=info->buffer+length;
771 info->pos_in_file=pos_in_file;
772 if (Count)
773 memcpy(Buffer, info->buffer, Count);
774 DBUG_RETURN(0);
775 }
776
777
778 /*
779 Prepare IO_CACHE for shared use.
780
781 SYNOPSIS
782 init_io_cache_share()
783 read_cache A read cache. This will be copied for
784 every thread after setup.
785 cshare The share.
786 write_cache If non-NULL a write cache that is to be
787 synchronized with the read caches.
788 num_threads Number of threads sharing the cache
789 including the write thread if any.
790
791 DESCRIPTION
792
793 The shared cache is used so: One IO_CACHE is initialized with
794 init_io_cache(). This includes the allocation of a buffer. Then a
795 share is allocated and init_io_cache_share() is called with the io
796 cache and the share. Then the io cache is copied for each thread. So
797 every thread has its own copy of IO_CACHE. But the allocated buffer
798 is shared because cache->buffer is the same for all caches.
799
800 One thread reads data from the file into the buffer. All threads
801 read from the buffer, but every thread maintains its own set of
802 pointers into the buffer. When all threads have used up the buffer
803 contents, one of the threads reads the next block of data into the
804 buffer. To accomplish this, each thread enters the cache lock before
805 accessing the buffer. They wait in lock_io_cache() until all threads
806 joined the lock. The last thread entering the lock is in charge of
807 reading from file to buffer. It wakes all threads when done.
808
809 Synchronizing a write cache to the read caches works so: Whenever
810 the write buffer needs a flush, the write thread enters the lock and
811 waits for all other threads to enter the lock too. They do this when
812 they have used up the read buffer. When all threads are in the lock,
813 the write thread copies the write buffer to the read buffer and
814 wakes all threads.
815
816 share->running_threads is the number of threads not being in the
817 cache lock. When entering lock_io_cache() the number is decreased.
818 When the thread that fills the buffer enters unlock_io_cache() the
819 number is reset to the number of threads. The condition
820 running_threads == 0 means that all threads are in the lock. Bumping
821 up the number to the full count is non-intuitive. But increasing the
822 number by one for each thread that leaves the lock could lead to a
823 solo run of one thread. The last thread to join a lock reads from
824 file to buffer, wakes the other threads, processes the data in the
825 cache and enters the lock again. If no other thread left the lock
826 meanwhile, it would think it's the last one again and read the next
827 block...
828
829 The share has copies of 'error', 'buffer', 'read_end', and
830 'pos_in_file' from the thread that filled the buffer. We may not be
831 able to access this information directly from its cache because the
832 thread may be removed from the share before the variables could be
833 copied by all other threads. Or, if a write buffer is synchronized,
834 it would change its 'pos_in_file' after waking the other threads,
835 possibly before they could copy its value.
836
837 However, the 'buffer' variable in the share is for a synchronized
838 write cache. It needs to know where to put the data. Otherwise it
839 would need access to the read cache of one of the threads that is
840 not yet removed from the share.
841
842 RETURN
843 void
844 */
845
init_io_cache_share(IO_CACHE * read_cache,IO_CACHE_SHARE * cshare,IO_CACHE * write_cache,uint num_threads)846 void init_io_cache_share(IO_CACHE *read_cache, IO_CACHE_SHARE *cshare,
847 IO_CACHE *write_cache, uint num_threads)
848 {
849 DBUG_ENTER("init_io_cache_share");
850 DBUG_PRINT("io_cache_share", ("read_cache: %p share: %p "
851 "write_cache: %p threads: %u",
852 read_cache, cshare,
853 write_cache, num_threads));
854
855 DBUG_ASSERT(num_threads > 1);
856 DBUG_ASSERT(read_cache->type == READ_CACHE);
857 DBUG_ASSERT(!write_cache || (write_cache->type == WRITE_CACHE));
858
859 mysql_mutex_init(key_IO_CACHE_SHARE_mutex,
860 &cshare->mutex, MY_MUTEX_INIT_FAST);
861 mysql_cond_init(key_IO_CACHE_SHARE_cond, &cshare->cond, 0);
862 mysql_cond_init(key_IO_CACHE_SHARE_cond_writer, &cshare->cond_writer, 0);
863
864 cshare->running_threads= num_threads;
865 cshare->total_threads= num_threads;
866 cshare->error= 0; /* Initialize. */
867 cshare->buffer= read_cache->buffer;
868 cshare->read_end= NULL; /* See function comment of lock_io_cache(). */
869 cshare->pos_in_file= 0; /* See function comment of lock_io_cache(). */
870 cshare->source_cache= write_cache; /* Can be NULL. */
871
872 read_cache->share= cshare;
873 read_cache->read_function= _my_b_cache_read_r;
874
875 if (write_cache)
876 {
877 write_cache->share= cshare;
878 write_cache->write_function= _my_b_cache_write_r;
879 }
880
881 DBUG_VOID_RETURN;
882 }
883
884
885 /*
886 Remove a thread from shared access to IO_CACHE.
887
888 SYNOPSIS
889 remove_io_thread()
890 cache The IO_CACHE to be removed from the share.
891
892 NOTE
893
894 Every thread must do that on exit for not to deadlock other threads.
895
896 The last thread destroys the pthread resources.
897
898 A writer flushes its cache first.
899
900 RETURN
901 void
902 */
903
remove_io_thread(IO_CACHE * cache)904 void remove_io_thread(IO_CACHE *cache)
905 {
906 IO_CACHE_SHARE *cshare= cache->share;
907 uint total;
908 DBUG_ENTER("remove_io_thread");
909
910 /* If the writer goes, it needs to flush the write cache. */
911 if (cache == cshare->source_cache)
912 flush_io_cache(cache);
913
914 mysql_mutex_lock(&cshare->mutex);
915 DBUG_PRINT("io_cache_share", ("%s: %p",
916 (cache == cshare->source_cache) ?
917 "writer" : "reader", cache));
918
919 /* Remove from share. */
920 total= --cshare->total_threads;
921 DBUG_PRINT("io_cache_share", ("remaining threads: %u", total));
922
923 /* Detach from share. */
924 cache->share= NULL;
925
926 /* If the writer goes, let the readers know. */
927 if (cache == cshare->source_cache)
928 {
929 DBUG_PRINT("io_cache_share", ("writer leaves"));
930 cshare->source_cache= NULL;
931 }
932
933 /* If all threads are waiting for me to join the lock, wake them. */
934 if (!--cshare->running_threads)
935 {
936 DBUG_PRINT("io_cache_share", ("the last running thread leaves, wake all"));
937 mysql_cond_signal(&cshare->cond_writer);
938 mysql_cond_broadcast(&cshare->cond);
939 }
940
941 mysql_mutex_unlock(&cshare->mutex);
942
943 if (!total)
944 {
945 DBUG_PRINT("io_cache_share", ("last thread removed, destroy share"));
946 mysql_cond_destroy (&cshare->cond_writer);
947 mysql_cond_destroy (&cshare->cond);
948 mysql_mutex_destroy(&cshare->mutex);
949 }
950
951 DBUG_VOID_RETURN;
952 }
953
954
955 /*
956 Lock IO cache and wait for all other threads to join.
957
958 SYNOPSIS
959 lock_io_cache()
960 cache The cache of the thread entering the lock.
961 pos File position of the block to read.
962 Unused for the write thread.
963
964 DESCRIPTION
965
966 Wait for all threads to finish with the current buffer. We want
967 all threads to proceed in concert. The last thread to join
968 lock_io_cache() will read the block from file and all threads start
969 to use it. Then they will join again for reading the next block.
970
971 The waiting threads detect a fresh buffer by comparing
972 cshare->pos_in_file with the position they want to process next.
973 Since the first block may start at position 0, we take
974 cshare->read_end as an additional condition. This variable is
975 initialized to NULL and will be set after a block of data is written
976 to the buffer.
977
978 RETURN
979 1 OK, lock in place, go ahead and read.
980 0 OK, unlocked, another thread did the read.
981 */
982
static int lock_io_cache(IO_CACHE *cache, my_off_t pos)
{
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("lock_io_cache");

  /*
    Enter the lock. A thread that is inside the lock is no longer counted
    as running; running_threads dropping to zero means that no thread of
    the share is left outside the lock.
  */
  mysql_mutex_lock(&cshare->mutex);
  cshare->running_threads--;
  DBUG_PRINT("io_cache_share", ("%s: %p pos: %lu running: %u",
                                (cache == cshare->source_cache) ?
                                "writer" : "reader", cache, (ulong) pos,
                                cshare->running_threads));

  if (cshare->source_cache)
  {
    /* A write cache is synchronized to the read caches. */

    if (cache == cshare->source_cache)
    {
      /*
        The writer waits until all readers are here. The condition is
        re-checked in a loop after every wakeup of cond_writer.
      */
      while (cshare->running_threads)
      {
        DBUG_PRINT("io_cache_share", ("writer waits in lock"));
        mysql_cond_wait(&cshare->cond_writer, &cshare->mutex);
      }
      DBUG_PRINT("io_cache_share", ("writer awoke, going to copy"));

      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* The last reader to arrive wakes the writer. */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("waking writer"));
      mysql_cond_signal(&cshare->cond_writer);
    }

    /*
      Readers wait until the data is copied from the writer, that is
      until the share holds a buffer (read_end is set) that starts at or
      after the requested position. Another reason to stop waiting is the
      removal of the write thread (source_cache becomes NULL). If this
      happens, we leave the lock with old data in the buffer.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->source_cache)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /*
      If the writer was removed from the share while this thread was
      asleep, we need to simulate an EOF condition. The writer cannot
      reset the share variables as they might still be in use by readers
      of the last block. When we wake up here with the requested block
      still missing, it is because the last joining thread signalled us.
      If the writer is not the last, it will not signal. So it is safe
      to clear the buffer here.
    */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader found writer removed. EOF"));
      cshare->read_end= cshare->buffer; /* Empty buffer. */
      cshare->error= 0; /* EOF is not an error. */
    }
  }
  else
  {
    /*
      There are read caches only. The last thread arriving in
      lock_io_cache() continues with a locked cache and reads the block.
    */
    if (!cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("last thread joined, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /*
      All other threads wait until the requested block is read by the
      last thread arriving. Another reason to stop waiting is the
      removal of a thread. If this leads to all threads being in the
      lock (running_threads is zero), we have to continue also. The
      first of the awakened threads will then do the read.
    */
    while ((!cshare->read_end || (cshare->pos_in_file < pos)) &&
           cshare->running_threads)
    {
      DBUG_PRINT("io_cache_share", ("reader waits in lock"));
      mysql_cond_wait(&cshare->cond, &cshare->mutex);
    }

    /* If the block is not yet read, continue with a locked cache and read. */
    if (!cshare->read_end || (cshare->pos_in_file < pos))
    {
      DBUG_PRINT("io_cache_share", ("reader awoke, going to read"));
      /* Stay locked. Leave the lock later by unlock_io_cache(). */
      DBUG_RETURN(1);
    }

    /* Another thread did read the block already. */
  }
  DBUG_PRINT("io_cache_share", ("reader awoke, going to process %u bytes",
                                (uint) (cshare->read_end ? (size_t)
                                        (cshare->read_end - cshare->buffer) :
                                        0)));

  /*
    Leave the lock. Do not call unlock_io_cache() later. The thread that
    filled the buffer did this and marked all threads as running.
  */
  mysql_mutex_unlock(&cshare->mutex);
  DBUG_RETURN(0);
}
1097
1098
1099 /*
1100 Unlock IO cache.
1101
1102 SYNOPSIS
1103 unlock_io_cache()
1104 cache The cache of the thread leaving the lock.
1105
1106 NOTE
1107 This is called by the thread that filled the buffer. It marks all
1108 threads as running and awakes them. This must not be done by any
1109 other thread.
1110
1111 Do not signal cond_writer. Either there is no writer or the writer
1112 is the only one who can call this function.
1113
1114 The reason for resetting running_threads to total_threads before
1115 waking all other threads is that it could be possible that this
1116 thread is so fast with processing the buffer that it enters the lock
1117 before even one other thread has left it. If every awoken thread
1118 would increase running_threads by one, this thread could think that
1119 he is again the last to join and would not wait for the other
1120 threads to process the data.
1121
1122 RETURN
1123 void
1124 */
1125
static void unlock_io_cache(IO_CACHE *cache)
{
  IO_CACHE_SHARE *share= cache->share;
  DBUG_ENTER("unlock_io_cache");
  DBUG_PRINT("io_cache_share", ("%s: %p pos: %lu running: %u",
                                (cache == share->source_cache) ?
                                "writer" : "reader",
                                cache, (ulong) share->pos_in_file,
                                share->total_threads));

  /*
    Declare every thread running again before any waiter can wake up,
    then release the waiters and finally give up the mutex that
    lock_io_cache() left locked.
  */
  share->running_threads= share->total_threads;
  mysql_cond_broadcast(&share->cond);
  mysql_mutex_unlock(&share->mutex);
  DBUG_VOID_RETURN;
}
1141
1142
1143 /*
1144 Read from IO_CACHE when it is shared between several threads.
1145
1146 SYNOPSIS
1147 _my_b_cache_read_r()
1148 cache IO_CACHE pointer
1149 Buffer Buffer to retrieve count bytes from file
1150 Count Number of bytes to read into Buffer
1151
1152 NOTE
1153 This function is only called from the my_b_read() macro when there
1154 isn't enough characters in the buffer to satisfy the request.
1155
1156 IMPLEMENTATION
1157
1158 It works as follows: when a thread tries to read from a file (that
1159 is, after using all the data from the (shared) buffer), it just
1160 hangs on lock_io_cache(), waiting for other threads. When the very
1161 last thread attempts a read, lock_io_cache() returns 1, the thread
1162 does actual IO and unlock_io_cache(), which signals all the waiting
1163 threads that data is in the buffer.
1164
1165 WARNING
1166
1167 When changing this function, be careful with handling file offsets
1168 (end-of_file, pos_in_file). Do not cast them to possibly smaller
1169 types than my_off_t unless you can be sure that their value fits.
1170 Same applies to differences of file offsets. (Bug #11527)
1171
1172 When changing this function, check _my_b_cache_read(). It might need the
1173 same change.
1174
1175 RETURN
1176 0 we succeeded in reading all data
1177 1 Error: can't read requested characters
1178 */
1179
static int _my_b_cache_read_r(IO_CACHE *cache, uchar *Buffer, size_t Count)
{
  my_off_t pos_in_file;
  size_t length, diff_length, left_length= 0;
  IO_CACHE_SHARE *cshare= cache->share;
  DBUG_ENTER("_my_b_cache_read_r");
  DBUG_ASSERT(!(cache->myflags & MY_ENCRYPT));

  /* One iteration per shared buffer fill, until the request is satisfied. */
  while (Count)
  {
    size_t cnt, len;

    /* File position right behind the data currently in the buffer. */
    pos_in_file= cache->pos_in_file + (cache->read_end - cache->buffer);
    /* Offset of that position inside its IO_SIZE block. */
    diff_length= (size_t) (pos_in_file & (IO_SIZE-1));
    /*
      Size the read relative to cache->read_length.
      NOTE(review): presumably IO_ROUND_UP()/IO_ROUND_DN() round to
      multiples of IO_SIZE, so that the read ends on a block boundary and
      fills as much of the buffer as fits - confirm against the macros.
    */
    length=IO_ROUND_UP(Count+diff_length)-diff_length;
    length= ((length <= cache->read_length) ?
             length + IO_ROUND_DN(cache->read_length - length) :
             length - IO_ROUND_UP(length - cache->read_length));
    /* Except for READ_FIFO, never read beyond end_of_file. */
    if (cache->type != READ_FIFO &&
        (length > (cache->end_of_file - pos_in_file)))
      length= (size_t) (cache->end_of_file - pos_in_file);
    if (length == 0)
    {
      /* Nothing left to read: report the number of bytes delivered so far. */
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    if (lock_io_cache(cache, pos_in_file))
    {
      /* With a synchronized write/read cache we won't come here... */
      DBUG_ASSERT(!cshare->source_cache);
      /*
        ... unless the writer has gone before this thread entered the
        lock. Simulate EOF in this case. It can be distinguished by
        cache->file.
      */
      if (cache->file < 0)
        len= 0;
      else
      {
        /*
          Whenever a function which operates on IO_CACHE flushes/writes
          some part of the IO_CACHE to disk it will set the property
          "seek_not_done" to indicate this to other functions operating
          on the IO_CACHE.
        */
        if (cache->seek_not_done)
        {
          if (mysql_file_seek(cache->file, pos_in_file, MY_SEEK_SET, MYF(0))
              == MY_FILEPOS_ERROR)
          {
            /* Release the lock taken by lock_io_cache() before failing. */
            cache->error= -1;
            unlock_io_cache(cache);
            DBUG_RETURN(1);
          }
        }
        len= mysql_file_read(cache->file, cache->buffer, length, cache->myflags);
      }
      DBUG_PRINT("io_cache_share", ("read %lu bytes", (ulong) len));

      /* A failed read ((size_t) -1) leaves the buffer empty. */
      cache->read_end= cache->buffer + (len == (size_t) -1 ? 0 : len);
      /* 0 for a complete read, otherwise the (short or failed) read result. */
      cache->error= (len == length ? 0 : (int) len);
      cache->pos_in_file= pos_in_file;

      /* Copy important values to the share. */
      cshare->error= cache->error;
      cshare->read_end= cache->read_end;
      cshare->pos_in_file= pos_in_file;

      /* Mark all threads as running and wake them. */
      unlock_io_cache(cache);
    }
    else
    {
      /*
        With a synchronized write/read cache readers always come here.
        Copy important values from the share.
      */
      cache->error= cshare->error;
      cache->read_end= cshare->read_end;
      cache->pos_in_file= cshare->pos_in_file;

      /* Rebuild the read result that the filling thread saw. */
      len= ((cache->error == -1) ? (size_t) -1 :
            (size_t) (cache->read_end - cache->buffer));
    }
    cache->read_pos= cache->buffer;
    cache->seek_not_done= 0;
    if (len == 0 || len == (size_t) -1)
    {
      /* EOF or read error: report the number of bytes delivered so far. */
      DBUG_PRINT("io_cache_share", ("reader error. len %lu left %lu",
                                    (ulong) len, (ulong) left_length));
      cache->error= (int) left_length;
      DBUG_RETURN(1);
    }
    /* Hand out as much of the fresh buffer as the caller still wants. */
    cnt= (len > Count) ? Count : len;
    if (cnt)
      memcpy(Buffer, cache->read_pos, cnt);
    Count -= cnt;
    Buffer+= cnt;
    left_length+= cnt;
    cache->read_pos+= cnt;
  }
  DBUG_RETURN(0);
}
1283
1284
/*
  Copy data from write cache to read cache.

  SYNOPSIS
    copy_to_read_buffer()
      write_cache               The write cache.
      write_buffer              The source of data, mostly the cache buffer.
      pos_in_file               File position at which write_buffer was
                                written. The number of bytes to copy is
                                write_cache->pos_in_file - pos_in_file.

  NOTE
    The write thread will wait for all read threads to join the cache
    lock. Then it copies the data over and wakes the read threads.

  RETURN
    void
*/
1301
static void copy_to_read_buffer(IO_CACHE *write_cache,
                                const uchar *write_buffer, my_off_t pos_in_file)
{
  /*
    The caller has already advanced write_cache->pos_in_file past the data
    it wrote, so the distance to the old position is the amount to copy.
  */
  size_t write_length= (size_t) (write_cache->pos_in_file - pos_in_file);
  IO_CACHE_SHARE *cshare= write_cache->share;

  DBUG_ASSERT(cshare->source_cache == write_cache);
  /*
    write_length is usually less or equal to buffer_length.
    It can be bigger if _my_b_cache_write_r() is called with a big length.
    In that case the data is handed to the readers in several chunks.
  */
  while (write_length)
  {
    size_t copy_length= MY_MIN(write_length, write_cache->buffer_length);
    int  __attribute__((unused)) rc;

    rc= lock_io_cache(write_cache, pos_in_file);
    /* The writing thread does always have the lock when it awakes. */
    DBUG_ASSERT(rc);

    memcpy(cshare->buffer, write_buffer, copy_length);

    cshare->error= 0;
    cshare->read_end= cshare->buffer + copy_length;
    cshare->pos_in_file= pos_in_file;

    /* Mark all threads as running and wake them. */
    unlock_io_cache(write_cache);

    write_buffer+= copy_length;
    write_length-= copy_length;
    /*
      The next chunk starts right behind this one in the file. Readers
      wait in lock_io_cache() until cshare->pos_in_file reaches the
      position following their current buffer, so every chunk must be
      published with its own file position.
    */
    pos_in_file+= copy_length;
  }
}
1335
1336
1337 /*
1338 Do sequential read from the SEQ_READ_APPEND cache.
1339
1340 We do this in three stages:
1341 - first read from info->buffer
1342 - then if there are still data to read, try the file descriptor
1343 - afterwards, if there are still data to read, try append buffer
1344
1345 RETURNS
1346 0 Success
1347 1 Failed to read
1348 */
1349
static int _my_b_seq_read(IO_CACHE *info, uchar *Buffer, size_t Count)
{
  size_t length, diff_length, save_count, max_length;
  my_off_t pos_in_file;
  /* Remember the size of the request to be able to report a short read. */
  save_count=Count;

  /* The append buffer lock is held during the whole read. */
  lock_append_buffer(info);

  /*
    pos_in_file always point on where info->buffer was read.
    If the position behind the buffered data is at or beyond end_of_file,
    there is nothing to get from the file: go to the append buffer.
  */
  if ((pos_in_file=info->pos_in_file +
       (size_t) (info->read_end - info->buffer)) >= info->end_of_file)
    goto read_append_buffer;

  /*
    With read-append cache we must always do a seek before we read,
    because the write could have moved the file pointer astray
  */
  if (mysql_file_seek(info->file, pos_in_file, MY_SEEK_SET, MYF(0)) == MY_FILEPOS_ERROR)
  {
    info->error= -1;
    unlock_append_buffer(info);
    return (1);
  }
  info->seek_not_done=0;

  /* Offset of the read position inside its IO_SIZE block. */
  diff_length= (size_t) (pos_in_file & (IO_SIZE-1));

  /* now the second stage begins - read from file descriptor */
  if (Count >= (size_t) (IO_SIZE+(IO_SIZE-diff_length)))
  {
    /*
      Big request: read the bulk of it from the file directly into the
      caller's buffer, bypassing info->buffer.
    */
    size_t read_length;

    length= IO_ROUND_DN(Count) - diff_length;
    if ((read_length= mysql_file_read(info->file,Buffer, length,
                                      info->myflags)) == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    Count-=read_length;
    Buffer+=read_length;
    pos_in_file+=read_length;

    if (read_length != length)
    {
      /*
        We only got part of data; Read the rest of the data from the
        write buffer
      */
      goto read_append_buffer;
    }
    diff_length=0;
  }

  /* Refill info->buffer, but do not read beyond end_of_file. */
  max_length= info->read_length-diff_length;
  if (max_length > (info->end_of_file - pos_in_file))
    max_length= (size_t) (info->end_of_file - pos_in_file);
  if (!max_length)
  {
    if (Count)
      goto read_append_buffer;
    length=0; /* Didn't read any more chars */
  }
  else
  {
    length= mysql_file_read(info->file,info->buffer, max_length, info->myflags);
    if (length == (size_t) -1)
    {
      info->error= -1;
      unlock_append_buffer(info);
      return 1;
    }
    if (length < Count)
    {
      /*
        The file had less than requested: hand out what was read and
        take the remainder from the append buffer.
      */
      memcpy(Buffer, info->buffer, length);
      Count -= length;
      Buffer += length;

      /*
        added the line below to make
        DBUG_ASSERT(pos_in_file==info->end_of_file) pass.
        otherwise this does not appear to be needed
      */
      pos_in_file += length;
      goto read_append_buffer;
    }
  }
  /*
    The rest of the request is served from the freshly filled
    info->buffer; the first Count bytes of it go to the caller.
  */
  unlock_append_buffer(info);
  info->read_pos=info->buffer+Count;
  info->read_end=info->buffer+length;
  info->pos_in_file=pos_in_file;
  memcpy(Buffer,info->buffer,(size_t) Count);
  return 0;

read_append_buffer:

  /*
    Read data from the current write buffer.
    Count should never be == 0 here (The code will work even if count is 0)
  */

  {
    /* First copy the data to Count */
    size_t len_in_buff = (size_t) (info->write_pos - info->append_read_pos);
    size_t copy_len;
    size_t transfer_len;

    DBUG_ASSERT(info->append_read_pos <= info->write_pos);
    copy_len=MY_MIN(Count, len_in_buff);
    memcpy(Buffer, info->append_read_pos, copy_len);
    info->append_read_pos += copy_len;
    Count -= copy_len;
    /* Short read: report the number of bytes that were delivered. */
    if (Count)
      info->error= (int) (save_count - Count);

    /* Fill read buffer with data from write buffer */
    memcpy(info->buffer, info->append_read_pos,
           (size_t) (transfer_len=len_in_buff - copy_len));
    info->read_pos= info->buffer;
    info->read_end= info->buffer+transfer_len;
    /* Everything up to write_pos has now been taken from the append buffer. */
    info->append_read_pos=info->write_pos;
    info->pos_in_file=pos_in_file+copy_len;
    info->end_of_file+=len_in_buff;
  }
  unlock_append_buffer(info);
  return Count ? 1 : 0;
}
1479
1480
1481 /* Read one byte when buffer is empty */
1482
int _my_b_get(IO_CACHE *info)
{
  uchar byte;
  /* Ask the cache's read function for exactly one byte. */
  int failed= (*info->read_function)(info, &byte, 1);

  /* A failed read is reported as my_b_EOF, otherwise the byte value. */
  return failed ? my_b_EOF : (int) byte;
}
1490
/*
  Write a byte buffer to IO_CACHE and flush to disk
  if IO_CACHE is full.

  RETURN VALUE
    1 On error: seeking to info->pos_in_file failed.
    0 On success
    -1 On error: the write failed; my_errno contains error code.

  In both error cases info->error is set to -1.
*/
1500
int _my_b_cache_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /*
    Data that does not come from the cache's own write buffer is first
    trimmed with IO_ROUND_DN(); if nothing remains there is nothing to do.
  */
  if (Buffer != info->write_buffer && !(Count= IO_ROUND_DN(Count)))
    return 0;

  /*
    Whenever a function which operates on IO_CACHE flushes/writes
    some part of the IO_CACHE to disk it will set the property
    "seek_not_done" to indicate this to other functions operating
    on the IO_CACHE.
  */
  if (info->seek_not_done)
  {
    if (mysql_file_seek(info->file, info->pos_in_file, MY_SEEK_SET,
                        MYF(info->myflags & MY_WME)) == MY_FILEPOS_ERROR)
    {
      info->error= -1;
      return 1;
    }
    info->seek_not_done= 0;
  }

  if (mysql_file_write(info->file, Buffer, Count, info->myflags | MY_NABP))
  {
    info->error= -1;
    return -1;
  }

  /* The data is on disk now; move the cache's file position past it. */
  info->pos_in_file+= Count;
  return 0;
}
1532
1533
1534 /*
1535 In case of a shared I/O cache with a writer we normally do direct
1536 write cache to read cache copy. Simulate this here by direct
1537 caller buffer to read cache copy. Do it after the write so that
1538 the cache readers actions on the flushed part can go in parallel
1539 with the write of the extra stuff. copy_to_read_buffer()
1540 synchronizes writer and readers so that after this call the
1541 readers can act on the extra stuff while the writer can go ahead
1542 and prepare the next output. copy_to_read_buffer() relies on
1543 info->pos_in_file.
1544 */
static int _my_b_cache_write_r(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  /* Position before the write; copy_to_read_buffer() needs it afterwards. */
  const my_off_t start_pos= info->pos_in_file;
  int rc= _my_b_cache_write(info, Buffer, Count);

  if (!rc)
  {
    /* Only a successful write is passed on to the readers. */
    DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));
    DBUG_ASSERT(info->share);
    copy_to_read_buffer(info, Buffer, start_pos);
  }
  return rc;
}
1558
1559
1560 /*
1561 Append a block to the write buffer.
1562 This is done with the buffer locked to ensure that we don't read from
1563 the write buffer before we are ready with it.
1564 */
1565
int my_b_append(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  size_t room, direct_len;

  MEM_CHECK_DEFINED(Buffer, Count);

  /*
    A shared cache must never get here. Should that change one day,
    a call to copy_to_read_buffer() may have to be added.
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  lock_append_buffer(info);
  room= (size_t) (info->write_end - info->write_pos);
  if (Count > room)
  {
    /* Top up the write buffer and flush it; the lock is already held. */
    memcpy(info->write_pos, Buffer, room);
    Buffer+= room;
    Count-= room;
    info->write_pos+= room;
    if (my_b_flush_io_cache(info, 0))
    {
      unlock_append_buffer(info);
      return 1;
    }
    if (Count >= IO_SIZE)
    {
      /* Write the IO_ROUND_DN() part of the rest straight to the file. */
      direct_len= IO_ROUND_DN(Count);
      if (mysql_file_write(info->file, Buffer, direct_len,
                           info->myflags | MY_NABP))
      {
        unlock_append_buffer(info);
        return info->error= -1;
      }
      Count-= direct_len;
      Buffer+= direct_len;
      info->end_of_file+= direct_len;
    }
  }

  /* Whatever is left is kept in the write buffer. */
  memcpy(info->write_pos, Buffer, (size_t) Count);
  info->write_pos+= Count;
  unlock_append_buffer(info);
  return 0;
}
1611
1612
int my_b_safe_write(IO_CACHE *info, const uchar *Buffer, size_t Count)
{
  int rc;

  /*
    Deliberately spelled out with if/else instead of the ?: operator:
    at least gcc 2.95 could not deal with several layers of ternary
    operators that evaluate comma (,) operator expressions inside.
  */
  if (info->type != SEQ_READ_APPEND)
    rc= my_b_write(info, Buffer, Count);
  else
    rc= my_b_append(info, Buffer, Count);
  return rc;
}
1625
1626
1627 /*
1628 Write a block to disk where part of the data may be inside the record
1629 buffer. As all write calls to the data goes through the cache,
1630 we will never get a seek over the end of the buffer
1631 */
1632
int my_block_write(IO_CACHE *info, const uchar *Buffer, size_t Count,
                   my_off_t pos)
{
  size_t length;
  int error= 0;

  /*
    Assert that we cannot come here with a shared cache. If we do one
    day, we might need to add a call to copy_to_read_buffer().
  */
  DBUG_ASSERT(!info->share);
  DBUG_ASSERT(!(info->myflags & MY_ENCRYPT));

  if (pos < info->pos_in_file)
  {
    /* If no overlap, write everything without buffering */
    if (pos + Count <= info->pos_in_file)
      return (int) mysql_file_pwrite(info->file, Buffer, Count, pos,
                                     info->myflags | MY_NABP);
    /*
      Write the part of the block that is before buffer. The difference
      is smaller than Count here, so it fits into size_t; it must not be
      cut down to a narrower type.
    */
    length= (size_t) (info->pos_in_file - pos);
    if (mysql_file_pwrite(info->file, Buffer, length, pos, info->myflags | MY_NABP))
      info->error= error= -1;
    Buffer+= length;
    pos+= length;
    Count-= length;
  }

  /* Check if we want to write inside the used part of the buffer.*/
  length= (size_t) (info->write_end - info->buffer);
  if (pos < info->pos_in_file + length)
  {
    size_t offset= (size_t) (pos - info->pos_in_file);
    length-= offset;
    if (length > Count)
      length= Count;
    memcpy(info->buffer + offset, Buffer, length);
    Buffer+= length;
    Count-= length;
    /*
      Fix length of buffer if the new data was larger. The data was
      stored at 'offset', so it ends at buffer + offset + length.
    */
    if (info->buffer + offset + length > info->write_pos)
      info->write_pos= info->buffer + offset + length;
    if (!Count)
      return (error);
  }
  /* Write at the end of the current buffer; This is the normal case */
  if (_my_b_write(info, Buffer, Count))
    error= -1;
  return error;
}
1683
1684
1685 /* Flush write cache */
1686
1687 #define LOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1688 lock_append_buffer(info);
1689 #define UNLOCK_APPEND_BUFFER if (need_append_buffer_lock) \
1690 unlock_append_buffer(info);
1691
/*
  Write the pending contents of the write buffer to the file.

  info                     The cache to flush. Only WRITE_CACHE and
                           SEQ_READ_APPEND caches have anything to flush.
  need_append_buffer_lock  Non-zero if this function has to take the append
                           buffer lock itself; zero if the caller already
                           holds it. Ignored unless the cache is
                           SEQ_READ_APPEND.

  Returns 0 if there was nothing to flush, info->error after a flush,
  -1 if the temporary file could not be opened or the append write failed,
  or the non-zero result of info->write_function.
*/
int my_b_flush_io_cache(IO_CACHE *info, int need_append_buffer_lock)
{
  size_t length;
  my_bool append_cache= (info->type == SEQ_READ_APPEND);
  DBUG_ENTER("my_b_flush_io_cache");
  DBUG_PRINT("enter", ("cache: %p", info));

  /* Only an append cache has an append buffer lock. */
  if (!append_cache)
    need_append_buffer_lock= 0;

  if (info->type == WRITE_CACHE || append_cache)
  {
    if (info->file == -1)
    {
      /* The file behind the cache does not exist yet; create it now. */
      if (real_open_cached_file(info))
        DBUG_RETURN((info->error= -1));
    }
    LOCK_APPEND_BUFFER;

    if ((length=(size_t) (info->write_pos - info->write_buffer)))
    {
      if (append_cache)
      {
        if (mysql_file_write(info->file, info->write_buffer, length,
                             info->myflags | MY_NABP))
        {
          info->error= -1;
          /* Do not leave with the lock taken above still held. */
          UNLOCK_APPEND_BUFFER;
          DBUG_RETURN(-1);
        }
        info->end_of_file+= info->write_pos - info->append_read_pos;
        info->append_read_pos= info->write_buffer;
        DBUG_ASSERT(info->end_of_file == mysql_file_tell(info->file, MYF(0)));
      }
      else
      {
        /* need_append_buffer_lock is 0 here, so no lock is held. */
        int res= info->write_function(info, info->write_buffer, length);
        if (res)
          DBUG_RETURN(res);

        set_if_bigger(info->end_of_file, info->pos_in_file);
      }
      /* Start over with an empty write buffer. */
      info->write_end= (info->write_buffer + info->buffer_length -
                        ((info->pos_in_file + length) & (IO_SIZE - 1)));
      info->write_pos= info->write_buffer;
      ++info->disk_writes;
      UNLOCK_APPEND_BUFFER;
      DBUG_RETURN(info->error);
    }
  }
  UNLOCK_APPEND_BUFFER;
  DBUG_RETURN(0);
}
1744
/*
  Free an IO_CACHE object

  SYNOPSIS
    end_io_cache()
    info                IO_CACHE Handle to free

  NOTES
    It's currently safe to call this if one has called init_io_cache()
    on the 'info' object, even if init_io_cache() failed.
    This function is also safe to call twice with the same handle.
    Note that info->file is not reset as the caller may still use it for
    my_close()

  RETURN
    0  ok
    #  Error
*/
1762
int end_io_cache(IO_CACHE *info)
{
  int flush_error= 0;
  DBUG_ENTER("end_io_cache");
  DBUG_PRINT("enter",("cache: %p", info));

  /*
    All threads of a shared cache must have gone through
    remove_io_thread() by now; the last of them destroyed the share
    elements.
  */
  DBUG_ASSERT(!info->share || !info->share->total_threads);

  if (info->alloced_buffer)
  {
    info->alloced_buffer= 0;
    /* A file descriptor of -1 means no file exists: nothing to flush. */
    if (info->file != -1)
      flush_error= my_b_flush_io_cache(info, 1);
    my_free(info->buffer);
    info->read_pos= (uchar*) 0;
    info->buffer= (uchar*) 0;
  }

  /* Only a SEQ_READ_APPEND cache owns an append buffer mutex. */
  if (info->type == SEQ_READ_APPEND)
    mysql_mutex_destroy(&info->append_buffer_lock);

  /* Leave the handle in a state in which further use is harmless or loud. */
  info->share= 0;
  info->type= TYPE_NOT_SET;   /* Ensure that flush_io_cache() does nothing */
  info->write_end= 0;         /* Ensure that my_b_write() fails */
  info->write_function= 0;    /* my_b_write will crash if used */
  DBUG_RETURN(flush_error);
} /* end_io_cache */
1794
1795
1796 /**********************************************************************
1797 Testing of MF_IOCACHE
1798 **********************************************************************/
1799
1800 #ifdef MAIN
1801
1802 #include <my_dir.h>
1803
/*
  Print "Error:" followed by the printf-style message and the errno value
  to stderr, then terminate the program with exit status 1.
*/
void die(const char *fmt, ...)
{
  /* Capture errno first: the stdio calls below are allowed to change it. */
  int saved_errno= errno;
  va_list va_args;

  va_start(va_args, fmt);
  fprintf(stderr, "Error:");
  vfprintf(stderr, fmt, va_args);
  va_end(va_args);
  fprintf(stderr, ", errno=%d\n", saved_errno);
  exit(1);
}
1814
/*
  Open (creating it if needed) fname for reading and writing and attach
  a SEQ_READ_APPEND cache of cache_size bytes to it. Any failure ends
  the program through die(). Returns the file descriptor.
*/
int open_file(const char *fname, IO_CACHE *info, int cache_size)
{
  int fd= my_open(fname, O_CREAT | O_RDWR, MYF(MY_WME));

  if (fd < 0)
    die("Could not open %s", fname);
  if (init_io_cache(info, fd, cache_size, SEQ_READ_APPEND, 0, 0, MYF(MY_WME)))
    die("failed in init_io_cache()");
  return fd;
}
1824
/* Shut down the cache, then close the file it was attached to. */
void close_file(IO_CACHE *info)
{
  int fd;

  end_io_cache(info);
  /* end_io_cache() leaves info->file in place so it can be closed here. */
  fd= info->file;
  my_close(fd, MYF(MY_WME));
}
1830
main(int argc,char ** argv)1831 int main(int argc, char** argv)
1832 {
1833 IO_CACHE sra_cache; /* SEQ_READ_APPEND */
1834 MY_STAT status;
1835 const char* fname="/tmp/iocache.test";
1836 int cache_size=16384;
1837 char llstr_buf[22];
1838 int max_block,total_bytes=0;
1839 int i,num_loops=100,error=0;
1840 char *p;
1841 char* block, *block_end;
1842 MY_INIT(argv[0]);
1843 max_block = cache_size*3;
1844 if (!(block=(char*)my_malloc(max_block,MYF(MY_WME))))
1845 die("Not enough memory to allocate test block");
1846 block_end = block + max_block;
1847 for (p = block,i=0; p < block_end;i++)
1848 {
1849 *p++ = (char)i;
1850 }
1851 if (my_stat(fname,&status, MYF(0)) &&
1852 my_delete(fname,MYF(MY_WME)))
1853 {
1854 die("Delete of %s failed, aborting", fname);
1855 }
1856 open_file(fname,&sra_cache, cache_size);
1857 for (i = 0; i < num_loops; i++)
1858 {
1859 char buf[4];
1860 int block_size = abs(rand() % max_block);
1861 int4store(buf, block_size);
1862 if (my_b_append(&sra_cache,buf,4) ||
1863 my_b_append(&sra_cache, block, block_size))
1864 die("write failed");
1865 total_bytes += 4+block_size;
1866 }
1867 close_file(&sra_cache);
1868 my_free(block);
1869 if (!my_stat(fname,&status,MYF(MY_WME)))
1870 die("%s failed to stat, but I had just closed it,\
1871 wonder how that happened");
1872 printf("Final size of %s is %s, wrote %d bytes\n",fname,
1873 llstr(status.st_size,llstr_buf),
1874 total_bytes);
1875 my_delete(fname, MYF(MY_WME));
1876 /* check correctness of tests */
1877 if (total_bytes != status.st_size)
1878 {
1879 fprintf(stderr,"Not the same number of bytes actually in file as bytes \
1880 supposedly written\n");
1881 error=1;
1882 }
1883 exit(error);
1884 return 0;
1885 }
1886 #endif
1887