1 /* Copyright (C) 2000-2008 MySQL AB, 2008-2011 Monty Program Ab
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License as published by
5    the Free Software Foundation; version 2 of the License.
6 
7    This program is distributed in the hope that it will be useful,
8    but WITHOUT ANY WARRANTY; without even the implied warranty of
9    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10    GNU General Public License for more details.
11 
12    You should have received a copy of the GNU General Public License
13    along with this program; if not, write to the Free Software
14    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA */
15 
16 /*
17   These functions handle page caching for Maria tables.
18 
19   One cache can handle many files.
20   It must contain buffers of the same blocksize.
21   init_pagecache() should be used to init cache handler.
22 
23   The free list (free_block_list) is a stack like structure.
24   When a block is freed by free_block(), it is pushed onto the stack.
  When a new block is required, we first try to pop one from the stack.
  If the stack is empty, we try to take a never-used block from the pool.
27   If this is empty too, then a block is taken from the LRU ring, flushing it
28   to disk, if necessary. This is handled in find_block().
29   With the new free list, the blocks can have three temperatures:
30   hot, warm and cold (which is free). This is remembered in the block header
31   by the enum PCBLOCK_TEMPERATURE temperature variable. Remembering the
32   temperature is necessary to correctly count the number of warm blocks,
33   which is required to decide when blocks are allowed to become hot. Whenever
34   a block is inserted to another (sub-)chain, we take the old and new
35   temperature into account to decide if we got one more or less warm block.
36   blocks_unused is the sum of never used blocks in the pool and of currently
37   free blocks. blocks_used is the number of blocks fetched from the pool and
38   as such gives the maximum number of in-use blocks at any time.
39 
40   TODO: Write operation locks whole cache till the end of the operation.
41     Should be fixed.
42 */
43 
44 #include "maria_def.h"
45 #include <m_string.h>
46 #include "ma_pagecache.h"
47 #include "ma_blockrec.h"
48 #include <my_bit.h>
49 #include <errno.h>
50 
51 /*
52   Some compilation flags have been added specifically for this module
53   to control the following:
  - to prevent a thread from yielding control when reading directly
    from the page cache, which might improve performance in many cases;
56     to enable this add:
57     #define SERIALIZED_READ_FROM_CACHE
58   - to set an upper bound for number of threads simultaneously
59     using the page cache; this setting helps to determine an optimal
60     size for hash table and improve performance when the number of
61     blocks in the page cache much less than the number of threads
62     accessing it;
63     to set this number equal to <N> add
64       #define MAX_THREADS <N>
65   - to substitute calls of mysql_cond_wait for calls of
66     mysql_cond_timedwait (wait with timeout set up);
67     this setting should be used only when you want to trap a deadlock
68     situation, which theoretically should not happen;
69     to set timeout equal to <T> seconds add
70       #define PAGECACHE_TIMEOUT <T>
71   - to enable the module traps and to send debug information from
72     page cache module to a special debug log add:
73       #define PAGECACHE_DEBUG
74     the name of this debug log file <LOG NAME> can be set through:
75       #define PAGECACHE_DEBUG_LOG  <LOG NAME>
76     if the name is not defined, it's set by default;
77     if the PAGECACHE_DEBUG flag is not set up and we are in a debug
78     mode, i.e. when ! defined(DBUG_OFF), the debug information from the
79     module is sent to the regular debug log.
80 
81   Example of the settings:
82     #define SERIALIZED_READ_FROM_CACHE
83     #define MAX_THREADS   100
84     #define PAGECACHE_TIMEOUT  1
85     #define PAGECACHE_DEBUG
86     #define PAGECACHE_DEBUG_LOG  "my_pagecache_debug.log"
87 */
88 
/*
  The key cache relies on external raw locking; the page cache does not.
  (The keycache functions key_cache_read(), key_cache_insert() and
  key_cache_write() rely on the external MyISAM lock, we don't.)
  We therefore define SERIALIZED_READ_FROM_CACHE here to avoid the problem
  of reading inconsistent data from a page.
*/
#define SERIALIZED_READ_FROM_CACHE yes
97 
/*
  Print the state of page cache block B through DBUG_PRINT (keyword "info").

  B must be a non-NULL pointer to a block. Its hash_link may be NULL; in
  that case fd, page and the hash link request counter are printed as 0.
  B is evaluated several times, so it must not have side effects.
  Pointers are cast to void* because this is the type %p expects, and all
  counters are cast to uint to match their %u conversions.
*/
#define PCBLOCK_INFO(B) \
  DBUG_PRINT("info", \
             ("block: %p  fd: %lu  page: %lu  status: 0x%x  " \
              "hshL: %p  requests: %u/%u  wrlocks: %u  rdlocks: %u  " \
              "rdlocks_q: %u  pins: %u  type: %s", \
              (void *) (B), \
              (ulong)((B)->hash_link ? \
                      (B)->hash_link->file.file : \
                      0), \
              (ulong)((B)->hash_link ? \
                      (B)->hash_link->pageno : \
                      0), \
              (uint) (B)->status,    \
              (void *) (B)->hash_link, \
              (uint) (B)->requests, \
              (uint)((B)->hash_link ? \
                     (B)->hash_link->requests : \
                       0), \
              (uint) (B)->wlocks, (uint) (B)->rlocks, \
              (uint) (B)->rlocks_queue, \
              (uint) (B)->pins, \
              page_cache_page_type_str[(B)->type]))
119 
/* TODO: put it to my_static.c */
/*
  Global flag, initially 0. Going by its name it disables flushing of page
  cache blocks -- NOTE(review): confirm against the code that reads it.
*/
my_bool my_disable_flush_pagecache_blocks= 0;
122 
/*
  Given a pointer 'a' to the member MEMBER embedded in a structure of type
  TYPE, return a pointer to the enclosing structure.

  The whole expansion is parenthesized so that the result can be used
  directly inside a larger expression, e.g.
    STRUCT_PTR(TYPE, MEMBER, ptr)->other_member
  Without the outer parentheses '->' would bind to the char* expression
  before the cast to TYPE* is applied.
*/
#define STRUCT_PTR(TYPE, MEMBER, a)                                           \
          ((TYPE *) ((char *) (a) - offsetof(TYPE, MEMBER)))
125 
/*
  Types of condition variables.
  COND_SIZE is the number of COND_* queues; it is used as the dimension of
  the wqueue[] array in struct st_pagecache_block_link.
*/
#define  COND_FOR_REQUESTED 0  /* queue of threads waiting for read operation */
#define  COND_FOR_SAVED     1  /* queue of threads waiting for flush */
#define  COND_FOR_WRLOCK    2  /* queue of write lock */
#define  COND_SIZE          3  /* number of COND_* queues */

/* Condition variable type used by the page cache (alias of mysql_cond_t) */
typedef mysql_cond_t KEYCACHE_CONDVAR;
133 
/* descriptor of the page in the page cache block buffer */
struct st_pagecache_page
{
  PAGECACHE_FILE file;      /* file to which the page belongs   */
  pgcache_page_no_t pageno; /* number of the page in the file   */
};
140 
141 /* element in the chain of a hash table bucket */
142 struct st_pagecache_hash_link
143 {
144   struct st_pagecache_hash_link
145     *next, **prev;                   /* to connect links in the same bucket  */
146   struct st_pagecache_block_link
147     *block;                          /* reference to the block for the page: */
148   PAGECACHE_FILE file;               /* from such a file                     */
149   pgcache_page_no_t pageno;            /* this page                            */
150   uint requests;                     /* number of requests for the page      */
151 };
152 
/* Simple states of a block; each state is a separate bit */
#define PCBLOCK_ERROR       0x01 /* an error occurred when performing disk i/o */
#define PCBLOCK_READ        0x02 /* the page is in the block buffer            */

/*
  A thread is reading the data to the page.
  If the page contained old changed data, it will be written out with
  this state set on the block.
  The page is not yet ready to be used for reading.
*/
#define PCBLOCK_IN_SWITCH   0x04
/*
  Block does not accept new requests for the old page that would cause
  the page to be pinned or written to.
  (Reads that copy the block can still continue.)
  This state happens when another thread is waiting for readers to finish
  so that it can read data into the block (after the block, if it was
  changed, has been flushed out to disk).
*/
#define PCBLOCK_REASSIGNED  0x08
#define PCBLOCK_IN_FLUSH    0x10 /* block is in flush operation                */
#define PCBLOCK_CHANGED     0x20 /* block buffer contains a dirty page         */
#define PCBLOCK_DIRECT_W    0x40 /* possible direct write to the block         */
#define PCBLOCK_DEL_WRITE   0x80 /* should be written on delete                */
177 
/* Page status values returned by find_block */
#define PAGE_READ               0
#define PAGE_TO_BE_READ         1
#define PAGE_WAIT_TO_BE_READ    2

/* The temperature tells in which (sub-)chain the block currently is */
enum PCBLOCK_TEMPERATURE
{
  PCBLOCK_COLD,   /* free */
  PCBLOCK_WARM,
  PCBLOCK_HOT
};
185 
186 /* debug info */
187 #ifndef DBUG_OFF
/*
  Printable names used in debug traces. Each table is indexed by the
  corresponding enum value, so the order of the entries matters.
*/

/* page types; used only to control page type changes during debugging */
static const char *page_cache_page_type_str[]=
{ "EMPTY", "PLAIN", "LSN", "READ_UNKNOWN" };

/* write modes */
static const char *page_cache_page_write_mode_str[]=
{ "DELAY", "DONE" };

/* lock transitions, printed as "state before -> state after" */
static const char *page_cache_page_lock_str[]=
{
  "free -> free",   "read -> read",  "write -> write",
  "free -> read",   "free -> write",
  "read -> free",   "write -> free",
  "write -> read"
};

/* pin transitions, printed as "state before -> state after" */
static const char *page_cache_page_pin_str[]=
{
  "pinned -> pinned",   "unpinned -> unpinned",
  "unpinned -> pinned", "pinned -> unpinned"
};
222 
223 
/* List node describing one thread that pinned a block (debug builds only) */
typedef struct st_pagecache_pin_info
{
  /* following node of the list, 0 for the last node */
  struct st_pagecache_pin_info *next;
  /* address of the pointer which refers to this node */
  struct st_pagecache_pin_info **prev;
  /* the thread this node was created for */
  struct st_my_thread_var *thread;
} PAGECACHE_PIN_INFO;
229 
230 /*
231   st_pagecache_lock_info structure should be kept in next, prev, thread part
232   compatible with st_pagecache_pin_info to be compatible in functions.
233 */
234 
235 typedef struct st_pagecache_lock_info
236 {
237   struct st_pagecache_lock_info *next, **prev;
238   struct st_my_thread_var *thread;
239   my_bool write_lock;
240 } PAGECACHE_LOCK_INFO;
241 
242 
243 /* service functions maintain debugging info about pin & lock */
244 
245 
246 /*
247   Links information about thread pinned/locked the block to the list
248 
249   SYNOPSIS
250     info_link()
251     list                 the list to link in
252     node                 the node which should be linked
253 */
254 
255 static void info_link(PAGECACHE_PIN_INFO **list, PAGECACHE_PIN_INFO *node)
256 {
257   if ((node->next= *list))
258     node->next->prev= &(node->next);
259   *list= node;
260   node->prev= list;
261 }
262 
263 
264 /*
265   Unlinks information about thread pinned/locked the block from the list
266 
267   SYNOPSIS
268     info_unlink()
269     node                 the node which should be unlinked
270 */
271 
272 static void info_unlink(PAGECACHE_PIN_INFO *node)
273 {
274   if ((*node->prev= node->next))
275    node->next->prev= node->prev;
276 }
277 
278 
279 /*
280   Finds information about given thread in the list of threads which
281   pinned/locked this block.
282 
283   SYNOPSIS
284     info_find()
285     list                 the list where to find the thread
286     thread               thread ID (reference to the st_my_thread_var
287                          of the thread)
288     any                  return any thread of the list
289 
290   RETURN
291     0 - the thread was not found
292     pointer to the information node of the thread in the list, or, if 'any',
293     to any thread of the list.
294 */
295 
296 static PAGECACHE_PIN_INFO *info_find(PAGECACHE_PIN_INFO *list,
297                                      struct st_my_thread_var *thread,
298                                      my_bool any)
299 {
300   register PAGECACHE_PIN_INFO *i= list;
301   if (any)
302     return i;
303   for(; i != 0; i= i->next)
304     if (i->thread == thread)
305       return i;
306   return 0;
307 }
308 
309 #endif /* !DBUG_OFF */
310 
311 /* page cache block */
312 struct st_pagecache_block_link
313 {
314   struct st_pagecache_block_link
315     *next_used, **prev_used;   /* to connect links in the LRU chain (ring)   */
316   struct st_pagecache_block_link
317     *next_changed, **prev_changed; /* for lists of file dirty/clean blocks   */
318   struct st_pagecache_hash_link
319     *hash_link;           /* backward ptr to referring hash_link             */
320 #ifndef DBUG_OFF
321   PAGECACHE_PIN_INFO *pin_list;
322   PAGECACHE_LOCK_INFO *lock_list;
323 #endif
324   KEYCACHE_CONDVAR *condvar; /* condition variable for 'no readers' event    */
325   uchar *buffer;           /* buffer for the block page                      */
326   pthread_t write_locker;
327 
328   ulonglong last_hit_time; /* timestamp of the last hit                      */
329   WQUEUE
330     wqueue[COND_SIZE];    /* queues on waiting requests for new/old pages    */
331   uint32 requests;        /* number of requests for the block                */
332   uint32 pins;            /* pin counter                                     */
333   uint32 wlocks;          /* write locks counter                             */
334   uint32 rlocks;          /* read locks counter                              */
335   uint32 rlocks_queue;    /* rd. locks waiting wr. lock of this thread       */
336   uint16 status;          /* state of the block                              */
337   int16  error;           /* error code for block in case of error */
338   enum PCBLOCK_TEMPERATURE temperature; /* block temperature: cold, warm, hot*/
339   enum pagecache_page_type type; /* type of the block                        */
340   uint hits_left;         /* number of hits left until promotion             */
341   /** @brief LSN when first became dirty; LSN_MAX means "not yet set"        */
342   LSN rec_lsn;
343 };
344 
/** @brief information describing a run of flush_pagecache_blocks_int() */
struct st_file_in_flush
{
  /**
     @brief the file being flushed; init_pagecache() registers this member
     as the key of the pagecache->files_in_flush hash
  */
  File file;
  /**
     @brief threads waiting for the thread currently flushing this file to be
     done
  */
  WQUEUE flush_queue;
  /**
     @brief if the thread currently flushing the file has a non-empty
     first_in_switch list.
  */
  my_bool first_in_switch;
};
360 
361 #ifndef DBUG_OFF
362 /* debug checks */
363 
364 #ifdef NOT_USED
365 static my_bool info_check_pin(PAGECACHE_BLOCK_LINK *block,
366                               enum pagecache_page_pin mode
367                               __attribute__((unused)))
368 {
369   struct st_my_thread_var *thread= my_thread_var;
370   PAGECACHE_PIN_INFO *info= info_find(block->pin_list, thread);
371   DBUG_ENTER("info_check_pin");
372   DBUG_PRINT("enter", ("thread: 0x%lx  pin: %s",
373                        (ulong) thread, page_cache_page_pin_str[mode]));
374   if (info)
375   {
376     if (mode == PAGECACHE_PIN_LEFT_UNPINNED)
377     {
378       DBUG_PRINT("info",
379                  ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; LEFT_UNPINNED!!!",
380                   (ulong)thread, (ulong)block));
381       DBUG_RETURN(1);
382     }
383     else if (mode == PAGECACHE_PIN)
384     {
385       DBUG_PRINT("info",
386                  ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; PIN!!!",
387                   (ulong)thread, (ulong)block));
388       DBUG_RETURN(1);
389     }
390   }
391   else
392   {
393     if (mode == PAGECACHE_PIN_LEFT_PINNED)
394     {
395       DBUG_PRINT("info",
396                  ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; LEFT_PINNED!!!",
397                   (ulong)thread, (ulong)block));
398       DBUG_RETURN(1);
399     }
400     else if (mode == PAGECACHE_UNPIN)
401     {
402       DBUG_PRINT("info",
403                  ("info_check_pin: thread: 0x%lx  block: 0x%lx  ; UNPIN!!!",
404                   (ulong)thread, (ulong)block));
405       DBUG_RETURN(1);
406     }
407   }
408   DBUG_RETURN(0);
409 }
410 
411 
412 /*
413   Debug function which checks current lock/pin state and requested changes
414 
415   SYNOPSIS
416     info_check_lock()
417     lock                 requested lock changes
418     pin                  requested pin changes
419 
420   RETURN
421     0 - OK
422     1 - Error
423 */
424 
425 static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
426                                enum pagecache_page_lock lock,
427                                enum pagecache_page_pin pin)
428 {
429   struct st_my_thread_var *thread= my_thread_var;
430   PAGECACHE_LOCK_INFO *info=
431     (PAGECACHE_LOCK_INFO *) info_find((PAGECACHE_PIN_INFO *) block->lock_list,
432                                       thread);
433   DBUG_ENTER("info_check_lock");
434   switch(lock) {
435   case PAGECACHE_LOCK_LEFT_UNLOCKED:
436     if (pin != PAGECACHE_PIN_LEFT_UNPINNED ||
437         info)
438       goto error;
439     break;
440   case PAGECACHE_LOCK_LEFT_READLOCKED:
441     if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
442          pin != PAGECACHE_PIN_LEFT_PINNED) ||
443         info == 0 || info->write_lock)
444       goto error;
445     break;
446   case PAGECACHE_LOCK_LEFT_WRITELOCKED:
447     if (pin != PAGECACHE_PIN_LEFT_PINNED ||
448         info == 0 || !info->write_lock)
449       goto error;
450     break;
451   case PAGECACHE_LOCK_READ:
452     if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
453          pin != PAGECACHE_PIN) ||
454         info != 0)
455       goto error;
456     break;
457   case PAGECACHE_LOCK_WRITE:
458     if (pin != PAGECACHE_PIN ||
459         info != 0)
460       goto error;
461     break;
462   case PAGECACHE_LOCK_READ_UNLOCK:
463     if ((pin != PAGECACHE_PIN_LEFT_UNPINNED &&
464          pin != PAGECACHE_UNPIN) ||
465         info == 0 || info->write_lock)
466       goto error;
467     break;
468   case PAGECACHE_LOCK_WRITE_UNLOCK:
469     if (pin != PAGECACHE_UNPIN ||
470         info == 0 || !info->write_lock)
471       goto error;
472     break;
473   case PAGECACHE_LOCK_WRITE_TO_READ:
474     if ((pin != PAGECACHE_PIN_LEFT_PINNED &&
475          pin != PAGECACHE_UNPIN) ||
476         info == 0 || !info->write_lock)
477       goto error;
478     break;
479   }
480   DBUG_RETURN(0);
481 error:
482   DBUG_PRINT("info",
483              ("info_check_lock: thread: 0x%lx block 0x%lx: info: %d wrt: %d,"
484               "to lock: %s, to pin: %s",
485               (ulong) thread, (ulong) block, MY_TEST(info),
486               (info ? info->write_lock : 0),
487               page_cache_page_lock_str[lock],
488               page_cache_page_pin_str[pin]));
489   DBUG_RETURN(1);
490 }
491 #endif /* NOT_USED */
492 #endif /* !DBUG_OFF */
493 
#define FLUSH_CACHE         2000            /* sort this many blocks at once */

/*
  Forward declarations of file-local helpers. They are static, so their
  definitions have to be provided by this file.
*/
static my_bool free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
                          my_bool abort_if_pinned);
static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link);
#ifndef DBUG_OFF
/* only declared in debug builds */
static void test_key_cache(PAGECACHE *pagecache,
                           const char *where, my_bool lock);
#endif
503 
/*
  PAGECACHE_HASH(p, f, pos): bucket number for position 'pos' of file 'f' in
  page cache 'p'. The sum of the position and the file descriptor is masked
  with hash_entries-1, so the result is only a valid index when hash_entries
  is a power of 2.

  FILE_HASH(f, cache): same idea for the per-file changed-blocks hash, using
  the file descriptor only and changed_blocks_hash_size-1 as the mask.

  All macro parameters are parenthesized so that arbitrary expressions
  (e.g. &cache_object) can be passed.
*/
#define PAGECACHE_HASH(p, f, pos) (((size_t) (pos) +                          \
                                    (size_t) (f).file) &                      \
                                   ((p)->hash_entries - 1))
#define FILE_HASH(f,cache) ((uint) (f).file &                                 \
                            ((cache)->changed_blocks_hash_size - 1))
507 
/* Name of the debug log used when PAGECACHE_DEBUG_LOG is not given */
#define DEFAULT_PAGECACHE_DEBUG_LOG  "pagecache_debug.log"

#if defined(PAGECACHE_DEBUG) && ! defined(PAGECACHE_DEBUG_LOG)
#define PAGECACHE_DEBUG_LOG  DEFAULT_PAGECACHE_DEBUG_LOG
#endif

#if defined(PAGECACHE_DEBUG_LOG)
static FILE *pagecache_debug_log= NULL;
static void pagecache_debug_print _VARARGS((const char *fmt, ...));
/*
  Open the debug log unless it is open already and make it line buffered.
  fopen() returns NULL on failure; in that case setvbuf() is not called and
  pagecache_debug_log stays NULL, which every other user of the log checks.
*/
#define PAGECACHE_DEBUG_OPEN                                                  \
          if (!pagecache_debug_log)                                           \
          {                                                                   \
            pagecache_debug_log= fopen(PAGECACHE_DEBUG_LOG, "w");             \
            if (pagecache_debug_log)                                          \
              (void) setvbuf(pagecache_debug_log, NULL, _IOLBF, BUFSIZ);      \
          }

/* Close the debug log if it is open and forget the stream */
#define PAGECACHE_DEBUG_CLOSE                                                 \
          if (pagecache_debug_log)                                            \
          {                                                                   \
            fclose(pagecache_debug_log);                                      \
            pagecache_debug_log= 0;                                           \
          }
#else
/* no debug log: both macros expand to nothing */
#define PAGECACHE_DEBUG_OPEN
#define PAGECACHE_DEBUG_CLOSE
#endif /* defined(PAGECACHE_DEBUG_LOG) */
534 
/*
  Debug print/assert wrappers.

  When both PAGECACHE_DEBUG_LOG and PAGECACHE_DEBUG are defined:
  - KEYCACHE_PRINT and KEYCACHE_DBUG_PRINT write the keyword 'l' followed by
    ": " to the debug log (if it is open) and then pass the parenthesized
    argument list 'm' to pagecache_debug_print();
  - KEYCACHE_DBUG_ASSERT closes the debug log (if it is open) when the
    condition is false and then runs DBUG_ASSERT; note that 'a' is evaluated
    twice.
  Otherwise KEYCACHE_PRINT expands to nothing and the other two macros map
  to DBUG_PRINT and DBUG_ASSERT.
*/
#if defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG)
#define KEYCACHE_PRINT(l, m) KEYCACHE_DBUG_PRINT(l,m)
#define KEYCACHE_DBUG_PRINT(l, m)                                             \
            { if (pagecache_debug_log)                                        \
                fprintf(pagecache_debug_log, "%s: ", l);                      \
              pagecache_debug_print m; }

#define KEYCACHE_DBUG_ASSERT(a)                                               \
            { if (! (a) && pagecache_debug_log)                               \
                fclose(pagecache_debug_log);                                  \
              DBUG_ASSERT(a); }
#else
#define KEYCACHE_PRINT(l, m)
#define KEYCACHE_DBUG_PRINT(l, m)  DBUG_PRINT(l, m)
#define KEYCACHE_DBUG_ASSERT(a)    DBUG_ASSERT(a)
#endif /* defined(PAGECACHE_DEBUG_LOG) && defined(PAGECACHE_DEBUG) */
551 
/*
  Thread trace macros; they expand to nothing unless PAGECACHE_DEBUG is
  defined or DBUG_OFF is not defined.

  pagecache_thread_id is a single file-scope variable:
  KEYCACHE_THREAD_TRACE_BEGIN stores the id of the calling thread in it and
  the other macros print whatever value it currently holds.
  NOTE(review): the variable is written without any visible synchronization
  here -- confirm this is acceptable for a debug-only trace.
*/
#if defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF)
static long pagecache_thread_id;
#define KEYCACHE_THREAD_TRACE(l)                                              \
             KEYCACHE_DBUG_PRINT(l,("|thread %ld",pagecache_thread_id))

#define KEYCACHE_THREAD_TRACE_BEGIN(l)                                        \
            { struct st_my_thread_var *thread_var= my_thread_var;             \
              pagecache_thread_id= thread_var->id;                            \
              KEYCACHE_DBUG_PRINT(l,("[thread %ld",pagecache_thread_id)) }

#define KEYCACHE_THREAD_TRACE_END(l)                                          \
            KEYCACHE_DBUG_PRINT(l,("]thread %ld",pagecache_thread_id))
#else
#define KEYCACHE_PRINT(l,m)
#define KEYCACHE_THREAD_TRACE_BEGIN(l)
#define KEYCACHE_THREAD_TRACE_END(l)
#define KEYCACHE_THREAD_TRACE(l)
#endif /* defined(PAGECACHE_DEBUG) || !defined(DBUG_OFF) */
570 
/*
  PCBLOCK_NUMBER(p, b): index of block 'b' in the block array starting at
  p->block_root.
  PAGECACHE_HASH_LINK_NUMBER(p, h): index of hash link 'h' in the array
  starting at p->hash_link_root.

  Both are computed as the byte distance from the array start divided by
  the element size. The parameter 'p' is parenthesized so that any pointer
  expression can be passed.
*/
#define PCBLOCK_NUMBER(p, b)                                                    \
  ((uint) (((char*)(b)-(char *) (p)->block_root)/sizeof(PAGECACHE_BLOCK_LINK)))
#define PAGECACHE_HASH_LINK_NUMBER(p, h)                                      \
  ((uint) (((char*)(h)-(char *) (p)->hash_link_root)/                         \
           sizeof(PAGECACHE_HASH_LINK)))
576 
/*
  Wrappers around the mutex / condition variable primitives.

  pagecache_pthread_cond_wait is a function of this file when
  PAGECACHE_TIMEOUT (not on __WIN__) or PAGECACHE_DEBUG is defined, and a
  plain alias of mysql_cond_wait otherwise.
*/
#if (defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)) || defined(PAGECACHE_DEBUG)
static int pagecache_pthread_cond_wait(mysql_cond_t *cond,
                                      mysql_mutex_t *mutex);
#else
#define  pagecache_pthread_cond_wait mysql_cond_wait
#endif

/*
  With PAGECACHE_DEBUG every lock, unlock and signal is traced (keyword
  "lock", with the source line of the caller) before the ___pagecache_*
  function is called. The debug variants are wrapped in do { } while (0)
  so that they behave as one statement, e.g. as the body of an if/else
  without braces. Without PAGECACHE_DEBUG the names are aliases of the
  mysql_* primitives.
*/
#if defined(PAGECACHE_DEBUG)
static int ___pagecache_pthread_mutex_lock(mysql_mutex_t *mutex);
static void ___pagecache_pthread_mutex_unlock(mysql_mutex_t *mutex);
static int ___pagecache_pthread_cond_signal(mysql_cond_t *cond);
#define pagecache_pthread_mutex_lock(M) \
do { DBUG_PRINT("lock", ("mutex lock 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_mutex_lock(M); } while (0)
#define pagecache_pthread_mutex_unlock(M) \
do { DBUG_PRINT("lock", ("mutex unlock 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_mutex_unlock(M); } while (0)
#define pagecache_pthread_cond_signal(M) \
do { DBUG_PRINT("lock", ("signal 0x%lx %u", (ulong)(M), __LINE__)); \
  ___pagecache_pthread_cond_signal(M); } while (0)
#else
#define pagecache_pthread_mutex_lock mysql_mutex_lock
#define pagecache_pthread_mutex_unlock mysql_mutex_unlock
#define pagecache_pthread_cond_signal mysql_cond_signal
#endif /* defined(PAGECACHE_DEBUG) */
602 
/*
  Declared by hand here instead of through a header.
  NOTE(review): keep this prototype in sync with the definition of
  translog_flush(); a mismatch would not be caught by the compiler.
*/
extern my_bool translog_flush(TRANSLOG_ADDRESS lsn);
604 
605 /*
606   Write page to the disk
607 
608   SYNOPSIS
609     pagecache_fwrite()
610     pagecache - page cache pointer
611     filedesc  - pagecache file descriptor structure
612     buffer    - buffer which we will write
613     type      - page type (plain or with LSN)
614     flags     - MYF() flags
615 
616   RETURN
617     0   - OK
618     1   - Error
619 */
620 
621 static my_bool pagecache_fwrite(PAGECACHE *pagecache,
622                                 PAGECACHE_FILE *filedesc,
623                                 uchar *buffer,
624                                 pgcache_page_no_t pageno,
625                                 enum pagecache_page_type type
626                                 __attribute__((unused)),
627                                 myf flags)
628 {
629   int res;
630   PAGECACHE_IO_HOOK_ARGS args;
631   DBUG_ENTER("pagecache_fwrite");
632   DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
633 
634 #ifdef EXTRA_DEBUG_BITMAP
635   /*
636     This code is very good when debugging changes in bitmaps or dirty lists
637     The above define should be defined for all Aria files if you want to
638     debug either of the above issues.
639   */
640 
641   if (pagecache->extra_debug)
642   {
643     char buff[80];
644     uint len= my_sprintf(buff,
645                          (buff, "fwrite: fd: %d  id: %u  page: %llu",
646                           filedesc->file,
647                           _ma_file_callback_to_id(filedesc->callback_data),
648                           pageno));
649     (void) translog_log_debug_info(0, LOGREC_DEBUG_INFO_QUERY,
650                                    (uchar*) buff, len);
651   }
652 #endif
653 
654   /* initialize hooks args */
655   args.page= buffer;
656   args.pageno= pageno;
657   args.data= filedesc->callback_data;
658 
659   /* Todo: Integrate this with write_callback so we have only one callback */
660   if ((*filedesc->flush_log_callback)(&args))
661     DBUG_RETURN(1);
662   DBUG_PRINT("info", ("pre_write_hook:%p  data: %p",
663                       filedesc->pre_write_hook,
664                       filedesc->callback_data));
665   if ((*filedesc->pre_write_hook)(&args))
666   {
667     DBUG_PRINT("error", ("write callback problem"));
668     DBUG_RETURN(1);
669   }
670   res= (int)my_pwrite(filedesc->file, args.page, pagecache->block_size,
671                  ((my_off_t) pageno << pagecache->shift), flags);
672   (*filedesc->post_write_hook)(res, &args);
673   DBUG_RETURN(res);
674 }
675 
676 
/*
  Read page from the disk

  SYNOPSIS
    pagecache_fread()
    pagecache - page cache pointer (gives block_size and shift)
    filedesc  - pagecache file descriptor structure
    buffer    - buffer in which we will read
    pageno    - page number; the file offset is pageno << shift
    flags     - MYF() flags

  NOTES
    Expands to a call of mysql_file_pread() and yields its result.
    Every macro parameter is parenthesized, so expressions such as
    'first | extra' can be passed as pageno without changing the offset
    computation. 'pagecache' is evaluated twice.
*/
#define pagecache_fread(pagecache, filedesc, buffer, pageno, flags) \
  mysql_file_pread((filedesc)->file, (buffer), (pagecache)->block_size,     \
           ((my_off_t) (pageno) << (pagecache)->shift), (flags))
691 
692 
693 /**
694   @brief set rec_lsn of pagecache block (if it is needed)
695 
696   @param block                   block where to set rec_lsn
697   @param first_REDO_LSN_for_page the LSN to set
698 */
699 
700 static inline void pagecache_set_block_rec_lsn(PAGECACHE_BLOCK_LINK *block,
701                                                LSN first_REDO_LSN_for_page)
702 {
703   if (block->rec_lsn == LSN_MAX)
704     block->rec_lsn= first_REDO_LSN_for_page;
705   else
706     DBUG_ASSERT(cmp_translog_addr(block->rec_lsn,
707                                   first_REDO_LSN_for_page) <= 0);
708 }
709 
710 
711 /*
712   next_power(value) is 2 at the power of (1+floor(log2(value)));
713   e.g. next_power(2)=4, next_power(3)=4.
714 */
715 static inline uint next_power(uint value)
716 {
717   return (uint) my_round_up_to_next_power((uint32) value) << 1;
718 }
719 
720 
721 /*
722   Initialize a page cache
723 
724   SYNOPSIS
725     init_pagecache()
726     pagecache			pointer to a page cache data structure
727     key_cache_block_size	size of blocks to keep cached data
728     use_mem                     total memory to use for the key cache
729     division_limit		division limit (may be zero)
730     age_threshold		age threshold (may be zero)
731     block_size                  size of block (should be power of 2)
732     my_read_flags		Flags used for all pread/pwrite calls
733 			        Usually MY_WME in case of recovery
734 
735   RETURN VALUE
736     number of blocks in the key cache, if successful,
737     0 - otherwise.
738 
739   NOTES.
740     if pagecache->inited != 0 we assume that the key cache
741     is already initialized.  This is for now used by myisamchk, but shouldn't
742     be something that a program should rely on!
743 
744     It's assumed that no two threads call this function simultaneously
745     referring to the same key cache handle.
746 
747 */
748 
749 size_t init_pagecache(PAGECACHE *pagecache, size_t use_mem,
750                      uint division_limit, uint age_threshold,
751                      uint block_size, uint changed_blocks_hash_size,
752                      myf my_readwrite_flags)
753 {
754   size_t blocks, hash_links, length;
755   int error;
756   DBUG_ENTER("init_pagecache");
757   DBUG_ASSERT(block_size >= 512);
758 
759   PAGECACHE_DEBUG_OPEN;
760   if (pagecache->inited && pagecache->disk_blocks > 0)
761   {
762     DBUG_PRINT("warning",("key cache already in use"));
763     DBUG_RETURN(0);
764   }
765 
766   pagecache->global_cache_w_requests= pagecache->global_cache_r_requests= 0;
767   pagecache->global_cache_read= pagecache->global_cache_write= 0;
768   pagecache->disk_blocks= -1;
769   if (! pagecache->inited)
770   {
771     if (mysql_mutex_init(key_PAGECACHE_cache_lock,
772                          &pagecache->cache_lock, MY_MUTEX_INIT_FAST) ||
773         my_hash_init(&pagecache->files_in_flush, &my_charset_bin, 32,
774                      offsetof(struct st_file_in_flush, file),
775                      sizeof(((struct st_file_in_flush *)NULL)->file),
776                      NULL, NULL, 0))
777       goto err;
778     pagecache->inited= 1;
779     pagecache->in_init= 0;
780     pagecache->resize_queue.last_thread= NULL;
781   }
782 
783   pagecache->mem_size= use_mem;
784   pagecache->block_size= block_size;
785   pagecache->shift= my_bit_log2(block_size);
786   pagecache->readwrite_flags= my_readwrite_flags | MY_NABP | MY_WAIT_IF_FULL;
787   pagecache->org_readwrite_flags= pagecache->readwrite_flags;
788   DBUG_PRINT("info", ("block_size: %u", block_size));
789   DBUG_ASSERT(((uint)(1 << pagecache->shift)) == block_size);
790 
791   blocks= use_mem / (sizeof(PAGECACHE_BLOCK_LINK) +
792                               2 * sizeof(PAGECACHE_HASH_LINK) +
793                               sizeof(PAGECACHE_HASH_LINK*) *
794                               5/4 + block_size);
795   /* Changed blocks hash needs to be a power of 2 */
796   changed_blocks_hash_size= my_round_up_to_next_power(MY_MAX(changed_blocks_hash_size,
797                                                              MIN_PAGECACHE_CHANGED_BLOCKS_HASH_SIZE));
798 
799   /*
800     We need to support page cache with just one block to be able to do
801     scanning of rows-in-block files
802   */
803   for ( ; ; )
804   {
805     if (blocks < 8)
806     {
807       my_message(ENOMEM, "Not enough memory to allocate 8 pagecache pages",
808                  MYF(0));
809       my_errno= ENOMEM;
810       goto err;
811     }
812     /* Set my_hash_entries to the next bigger 2 power */
813     if ((pagecache->hash_entries= next_power((uint)blocks)) <
814         (blocks) * 5/4)
815       pagecache->hash_entries<<= 1;
816     hash_links= 2 * blocks;
817 #if defined(MAX_THREADS)
818     if (hash_links < MAX_THREADS + blocks - 1)
819       hash_links= MAX_THREADS + blocks - 1;
820 #endif
821     while ((length= (ALIGN_SIZE(blocks * sizeof(PAGECACHE_BLOCK_LINK)) +
822                      ALIGN_SIZE(sizeof(PAGECACHE_HASH_LINK*) *
823                                 pagecache->hash_entries) +
824                      ALIGN_SIZE(hash_links * sizeof(PAGECACHE_HASH_LINK)) +
825                      sizeof(PAGECACHE_BLOCK_LINK*)* (changed_blocks_hash_size*2))) +
826            (blocks << pagecache->shift) > use_mem && blocks > 8)
827       blocks--;
828     /* Allocate memory for cache page buffers */
829     if ((pagecache->block_mem=
830       my_large_malloc(blocks * pagecache->block_size,
831                          MYF(MY_WME))))
832     {
833       /*
834         Allocate memory for blocks, hash_links and hash entries;
835         For each block 2 hash links are allocated
836       */
837       if (my_multi_malloc_large(MYF(MY_ZEROFILL),
838                                 &pagecache->block_root,
839                                 (ulonglong) (blocks *
840                                              sizeof(PAGECACHE_BLOCK_LINK)),
841                                 &pagecache->hash_root,
842                                 (ulonglong) (sizeof(PAGECACHE_HASH_LINK*) *
843                                              pagecache->hash_entries),
844                                 &pagecache->hash_link_root,
845                                 (ulonglong) (hash_links *
846                                              sizeof(PAGECACHE_HASH_LINK)),
847                                 &pagecache->changed_blocks,
848                                 (ulonglong) (sizeof(PAGECACHE_BLOCK_LINK*) *
849                                              changed_blocks_hash_size),
850                                 &pagecache->file_blocks,
851                                 (ulonglong) (sizeof(PAGECACHE_BLOCK_LINK*) *
852                                              changed_blocks_hash_size),
853                                 NullS))
854         break;
855       my_large_free(pagecache->block_mem);
856       pagecache->block_mem= 0;
857     }
858     blocks= blocks / 4*3;
859   }
860   pagecache->blocks_unused= blocks;
861   pagecache->disk_blocks= blocks;
862   pagecache->hash_links= hash_links;
863   pagecache->hash_links_used= 0;
864   pagecache->free_hash_list= NULL;
865   pagecache->blocks_used= pagecache->blocks_changed= 0;
866 
867   pagecache->global_blocks_changed= 0;
868   pagecache->blocks_available=0;		/* For debugging */
869 
870   /* The LRU chain is empty after initialization */
871   pagecache->used_last= NULL;
872   pagecache->used_ins= NULL;
873   pagecache->free_block_list= NULL;
874   pagecache->time= 0;
875   pagecache->warm_blocks= 0;
876   pagecache->min_warm_blocks= (division_limit ?
877                                blocks * division_limit / 100 + 1 :
878                                blocks);
879   pagecache->age_threshold= (age_threshold ?
880                              blocks * age_threshold / 100 :
881                              blocks);
882   pagecache->changed_blocks_hash_size= changed_blocks_hash_size;
883 
884   pagecache->cnt_for_resize_op= 0;
885   pagecache->resize_in_flush= 0;
886   pagecache->can_be_used= 1;
887 
888   pagecache->waiting_for_hash_link.last_thread= NULL;
889   pagecache->waiting_for_block.last_thread= NULL;
890   DBUG_PRINT("exit",
891              ("disk_blocks: %zu  block_root: %p  hash_entries: %zu\
892  hash_root: %p  hash_links: %zu  hash_link_root: %p",
893               (size_t)pagecache->disk_blocks, pagecache->block_root,
894               pagecache->hash_entries, pagecache->hash_root,
895               (size_t)pagecache->hash_links, pagecache->hash_link_root));
896 
897   pagecache->blocks= pagecache->disk_blocks > 0 ? pagecache->disk_blocks : 0;
898   DBUG_RETURN((size_t)pagecache->disk_blocks);
899 
900 err:
901   error= my_errno;
902   pagecache->disk_blocks= 0;
903   pagecache->blocks=  0;
904   if (pagecache->block_mem)
905   {
906     my_large_free(pagecache->block_mem);
907     pagecache->block_mem= NULL;
908   }
909   if (pagecache->block_root)
910   {
911     my_free(pagecache->block_root);
912     pagecache->block_root= NULL;
913   }
914   my_errno= error;
915   pagecache->can_be_used= 0;
916   DBUG_RETURN(0);
917 }
918 
919 
920 /*
921   Flush all blocks in the key cache to disk
922 */
923 
924 #ifdef NOT_USED
925 static int flush_all_key_blocks(PAGECACHE *pagecache)
926 {
927 #if defined(PAGECACHE_DEBUG)
928   uint cnt=0;
929 #endif
930   while (pagecache->blocks_changed > 0)
931   {
932     PAGECACHE_BLOCK_LINK *block;
933     for (block= pagecache->used_last->next_used ; ; block=block->next_used)
934     {
935       if (block->hash_link)
936       {
937 #if defined(PAGECACHE_DEBUG)
938         cnt++;
939         KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
940 #endif
941         if (flush_pagecache_blocks_int(pagecache, &block->hash_link->file,
942                                        FLUSH_RELEASE, NULL, NULL))
943           return 1;
944         break;
945       }
946       if (block == pagecache->used_last)
947         break;
948     }
949   }
950   return 0;
951 }
952 #endif /* NOT_USED */
953 
954 /*
955   Resize a key cache
956 
957   SYNOPSIS
958     resize_pagecache()
959     pagecache                   pointer to a page cache data structure
960     use_mem			total memory to use for the new key cache
961     division_limit		new division limit (if not zero)
962     age_threshold		new age threshold (if not zero)
963 
964   RETURN VALUE
965     number of blocks in the key cache, if successful,
966     0 - otherwise.
967 
968   NOTES.
969     The function first compares the memory size parameter
970     with the key cache value.
971 
    If they differ the function frees the memory allocated for the
    old key cache blocks by calling the end_pagecache function and
    then rebuilds the key cache with new blocks by calling
    init_pagecache.
976 
    The function starts the operation only when all other threads
    performing operations with the key cache allow it to proceed
    (when cnt_for_resize_op is 0).
980 
981      Before being usable, this function needs:
982      - to receive fixes for BUG#17332 "changing key_buffer_size on a running
983      server can crash under load" similar to those done to the key cache
984      - to have us (Sanja) look at the additional constraints placed on
985      resizing, due to the page locking specific to this page cache.
986      So we disable it for now.
987 */
988 #ifdef NOT_USED /* keep disabled until code is fixed see above !! */
989 size_t resize_pagecache(PAGECACHE *pagecache,
990                        size_t use_mem, uint division_limit,
991                        uint age_threshold, uint changed_blocks_hash_size)
992 {
993   size_t blocks;
994   struct st_my_thread_var *thread;
995   WQUEUE *wqueue;
996   DBUG_ENTER("resize_pagecache");
997 
998   if (!pagecache->inited)
999     DBUG_RETURN(pagecache->disk_blocks);
1000 
1001   if(use_mem == pagecache->mem_size)
1002   {
1003     change_pagecache_param(pagecache, division_limit, age_threshold);
1004     DBUG_RETURN(pagecache->disk_blocks);
1005   }
1006 
1007   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
1008 
1009   wqueue= &pagecache->resize_queue;
1010   thread= my_thread_var;
1011   wqueue_link_into_queue(wqueue, thread);
1012 
1013   while (wqueue->last_thread->next != thread)
1014   {
1015     pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
1016   }
1017 
1018   pagecache->resize_in_flush= 1;
1019   if (flush_all_key_blocks(pagecache))
1020   {
1021     /* TODO: if this happens, we should write a warning in the log file ! */
1022     pagecache->resize_in_flush= 0;
1023     blocks= 0;
1024     pagecache->can_be_used= 0;
1025     goto finish;
1026   }
1027   pagecache->resize_in_flush= 0;
1028   pagecache->can_be_used= 0;
1029   while (pagecache->cnt_for_resize_op)
1030   {
1031     DBUG_PRINT("wait", ("suspend thread %s %ld", thread->name, thread->id));
1032     pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
1033   }
1034 
1035   end_pagecache(pagecache, 0);			/* Don't free mutex */
1036   /* The following will work even if use_mem is 0 */
1037   blocks= init_pagecache(pagecache, pagecache->block_size, use_mem,
1038 			 division_limit, age_threshold, changed_blocks_hash_size,
1039                          pagecache->readwrite_flags);
1040 
1041 finish:
1042   wqueue_unlink_from_queue(wqueue, thread);
1043   /* Signal for the next resize request to proceeed if any */
1044   if (wqueue->last_thread)
1045   {
1046     DBUG_PRINT("signal",
1047                ("thread %s %ld", wqueue->last_thread->next->name,
1048                 wqueue->last_thread->next->id));
1049     pagecache_pthread_cond_signal(&wqueue->last_thread->next->suspend);
1050   }
1051   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
1052   DBUG_RETURN(blocks);
1053 }
1054 #endif /* 0 */
1055 
1056 
1057 /*
1058   Increment counter blocking resize key cache operation
1059 */
1060 static inline void inc_counter_for_resize_op(PAGECACHE *pagecache)
1061 {
1062   mysql_mutex_assert_owner(&pagecache->cache_lock);
1063   pagecache->cnt_for_resize_op++;
1064 }
1065 
1066 
1067 /*
1068   Decrement counter blocking resize key cache operation;
1069   Signal the operation to proceed when counter becomes equal zero
1070 */
1071 
1072 static inline void dec_counter_for_resize_op(PAGECACHE *pagecache)
1073 {
1074   struct st_my_thread_var *last_thread;
1075   mysql_mutex_assert_owner(&pagecache->cache_lock);
1076   if (!--pagecache->cnt_for_resize_op &&
1077       (last_thread= pagecache->resize_queue.last_thread))
1078   {
1079     DBUG_PRINT("signal",
1080                ("thread %s %ld", last_thread->next->name,
1081                 (ulong) last_thread->next->id));
1082     pagecache_pthread_cond_signal(&last_thread->next->suspend);
1083   }
1084 }
1085 
1086 /*
1087   Change the page cache parameters
1088 
1089   SYNOPSIS
1090     change_pagecache_param()
1091     pagecache			pointer to a page cache data structure
1092     division_limit		new division limit (if not zero)
1093     age_threshold		new age threshold (if not zero)
1094 
1095   RETURN VALUE
1096     none
1097 
1098   NOTES.
1099     Presently the function resets the key cache parameters
1100     concerning midpoint insertion strategy - division_limit and
1101     age_threshold.
1102 */
1103 
1104 void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
1105 			    uint age_threshold)
1106 {
1107   DBUG_ENTER("change_pagecache_param");
1108 
1109   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
1110   if (division_limit)
1111     pagecache->min_warm_blocks= (pagecache->disk_blocks *
1112 				division_limit / 100 + 1);
1113   if (age_threshold)
1114     pagecache->age_threshold=   (pagecache->disk_blocks *
1115 				age_threshold / 100);
1116   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
1117   DBUG_VOID_RETURN;
1118 }
1119 
1120 
1121 /*
1122   Check that pagecache was used and cleaned up properly.
1123 */
1124 
1125 #ifndef DBUG_OFF
1126 void check_pagecache_is_cleaned_up(PAGECACHE *pagecache)
1127 {
1128   DBUG_ENTER("check_pagecache_is_cleaned_up");
1129   /*
1130     Ensure we called inc_counter_for_resize_op and dec_counter_for_resize_op
1131     the same number of times. (If not, a resize() could never happen.
1132   */
1133   DBUG_ASSERT(pagecache->cnt_for_resize_op == 0);
1134 
1135   if (pagecache->disk_blocks > 0)
1136   {
1137     if (pagecache->block_mem)
1138     {
1139       uint i;
1140       for (i=0 ; i < pagecache->blocks_used ; i++)
1141       {
1142         DBUG_ASSERT(pagecache->block_root[i].status == 0);
1143         DBUG_ASSERT(pagecache->block_root[i].type == PAGECACHE_EMPTY_PAGE);
1144       }
1145     }
1146   }
1147   DBUG_VOID_RETURN;
1148 }
1149 #endif
1150 
1151 
1152 /*
1153   Removes page cache from memory. Does NOT flush pages to disk.
1154 
1155   SYNOPSIS
1156     end_pagecache()
1157     pagecache		page cache handle
1158     cleanup		Complete free (Free also mutex for key cache)
1159 
1160   RETURN VALUE
1161     none
1162 */
1163 
1164 void end_pagecache(PAGECACHE *pagecache, my_bool cleanup)
1165 {
1166   DBUG_ENTER("end_pagecache");
1167   DBUG_PRINT("enter", ("key_cache: %p", pagecache));
1168 
1169   if (!pagecache->inited)
1170     DBUG_VOID_RETURN;
1171 
1172   if (pagecache->disk_blocks > 0)
1173   {
1174 #ifndef DBUG_OFF
1175     check_pagecache_is_cleaned_up(pagecache);
1176 #endif
1177 
1178     if (pagecache->block_mem)
1179     {
1180       my_large_free(pagecache->block_mem);
1181       pagecache->block_mem= NULL;
1182       my_free(pagecache->block_root);
1183       pagecache->block_root= NULL;
1184     }
1185     pagecache->disk_blocks= -1;
1186     /* Reset blocks_changed to be safe if flush_all_key_blocks is called */
1187     pagecache->blocks_changed= 0;
1188   }
1189 
1190   DBUG_PRINT("status", ("used: %zu  changed: %zu  w_requests: %llu  "
1191                         "writes: %llu  r_requests: %llu  reads: %llu",
1192 			pagecache->blocks_used,
1193 			pagecache->global_blocks_changed,
1194 			pagecache->global_cache_w_requests,
1195 			pagecache->global_cache_write,
1196 			pagecache->global_cache_r_requests,
1197 			pagecache->global_cache_read));
1198 
1199   if (cleanup)
1200   {
1201     my_hash_free(&pagecache->files_in_flush);
1202     mysql_mutex_destroy(&pagecache->cache_lock);
1203     pagecache->inited= pagecache->can_be_used= 0;
1204     PAGECACHE_DEBUG_CLOSE;
1205   }
1206   DBUG_VOID_RETURN;
1207 } /* end_pagecache */
1208 
1209 
1210 /*
1211   Unlink a block from the chain of dirty/clean blocks
1212 */
1213 
1214 static inline void unlink_changed(PAGECACHE_BLOCK_LINK *block)
1215 {
1216   if (block->next_changed)
1217     block->next_changed->prev_changed= block->prev_changed;
1218   *block->prev_changed= block->next_changed;
1219 }
1220 
1221 
1222 /*
1223   Link a block into the chain of dirty/clean blocks
1224 */
1225 
1226 static inline void link_changed(PAGECACHE_BLOCK_LINK *block,
1227                                 PAGECACHE_BLOCK_LINK **phead)
1228 {
1229   block->prev_changed= phead;
1230   if ((block->next_changed= *phead))
1231     (*phead)->prev_changed= &block->next_changed;
1232   *phead= block;
1233 }
1234 
1235 
1236 /*
1237   Unlink a block from the chain of dirty/clean blocks, if it's asked for,
1238   and link it to the chain of clean blocks for the specified file
1239 */
1240 
1241 static void link_to_file_list(PAGECACHE *pagecache,
1242                               PAGECACHE_BLOCK_LINK *block,
1243                               PAGECACHE_FILE *file, my_bool unlink_flag)
1244 {
1245   if (unlink_flag)
1246     unlink_changed(block);
1247   link_changed(block, &pagecache->file_blocks[FILE_HASH(*file, pagecache)]);
1248   if (block->status & PCBLOCK_CHANGED)
1249   {
1250     block->status&= ~(PCBLOCK_CHANGED | PCBLOCK_DEL_WRITE);
1251     block->rec_lsn= LSN_MAX;
1252     pagecache->blocks_changed--;
1253     pagecache->global_blocks_changed--;
1254   }
1255 }
1256 
1257 
1258 /*
1259   Unlink a block from the chain of clean blocks for the specified
1260   file and link it to the chain of dirty blocks for this file
1261 */
1262 
1263 static inline void link_to_changed_list(PAGECACHE *pagecache,
1264                                         PAGECACHE_BLOCK_LINK *block)
1265 {
1266   unlink_changed(block);
1267   link_changed(block,
1268                &pagecache->changed_blocks[FILE_HASH(block->hash_link->file, pagecache)]);
1269   block->status|=PCBLOCK_CHANGED;
1270   pagecache->blocks_changed++;
1271   pagecache->global_blocks_changed++;
1272 }
1273 
1274 
1275 /*
1276   Link a block to the LRU chain at the beginning or at the end of
1277   one of two parts.
1278 
1279   SYNOPSIS
1280     link_block()
1281       pagecache            pointer to a page cache data structure
1282       block               pointer to the block to link to the LRU chain
1283       hot                 <-> to link the block into the hot subchain
1284       at_end              <-> to link the block at the end of the subchain
1285 
1286   RETURN VALUE
1287     none
1288 
1289   NOTES.
1290     The LRU chain is represented by a circular list of block structures.
1291     The list is double-linked of the type (**prev,*next) type.
1292     The LRU chain is divided into two parts - hot and warm.
1293     There are two pointers to access the last blocks of these two
1294     parts. The beginning of the warm part follows right after the
1295     end of the hot part.
    Only blocks of the warm part can be used for replacement.
    The first block from the beginning of this subchain is always
    taken for eviction (pagecache->used_last->next_used).
1299 
1300     LRU chain:       +------+   H O T    +------+
1301                 +----| end  |----...<----| beg  |----+
1302                 |    +------+last        +------+    |
1303                 v<-link in latest hot (new end)      |
1304                 |     link in latest warm (new end)->^
1305                 |    +------+  W A R M   +------+    |
1306                 +----| beg  |---->...----| end  |----+
1307                      +------+            +------+ins
1308                   first for eviction
1309 */
1310 
static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
                       my_bool hot, my_bool at_end)
{
  PAGECACHE_BLOCK_LINK *ins;
  PAGECACHE_BLOCK_LINK **ptr_ins;
  DBUG_ENTER("link_block");

  PCBLOCK_INFO(block);
  KEYCACHE_DBUG_ASSERT(! (block->hash_link && block->hash_link->requests));
  /*
    A block that is not going to the hot sub-chain is handed directly to
    threads queued in waiting_for_block instead of being linked into the
    LRU chain; this branch returns before any linking is done.
  */
  if (!hot && pagecache->waiting_for_block.last_thread)
  {
    /* Signal that in the LRU warm sub-chain an available block has appeared */
    struct st_my_thread_var *last_thread=
                               pagecache->waiting_for_block.last_thread;
    /* The wait queue is circular: last_thread->next is its first element */
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
    /* Hash link stored in keycache_link by the first waiting thread */
    PAGECACHE_HASH_LINK *hash_link=
      (PAGECACHE_HASH_LINK *) first_thread->keycache_link;
    struct st_my_thread_var *thread;

    DBUG_ASSERT(block->requests + block->wlocks  + block->rlocks +
                block->pins == 0);
    DBUG_ASSERT(block->next_used == NULL);

    do
    {
      thread= next_thread;
      /* Remember the successor before thread may be unlinked below */
      next_thread= thread->next;
      /*
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if ((PAGECACHE_HASH_LINK *) thread->keycache_link == hash_link)
      {
        DBUG_PRINT("signal", ("thread: %s %ld", thread->name,
                              (ulong) thread->id));
        pagecache_pthread_cond_signal(&thread->suspend);
        wqueue_unlink_from_queue(&pagecache->waiting_for_block, thread);
        /* One request is registered on the block per signalled thread */
        block->requests++;
      }
    }
    while (thread != last_thread);
    /* Attach the block to the hash link the signalled threads wait for */
    hash_link->block= block;
    /* Ensure that no other thread tries to use this block */
    block->status|= PCBLOCK_REASSIGNED;

    DBUG_PRINT("signal", ("after signal"));
#if defined(PAGECACHE_DEBUG)
    KEYCACHE_DBUG_PRINT("link_block",
        ("linked,unlinked block: %u  status: %x  #requests: %u  #available: %u",
         PCBLOCK_NUMBER(pagecache, block), block->status,
         block->requests, pagecache->blocks_available));
#endif
    DBUG_VOID_RETURN;
  }
  /* Insertion point: used_ins for a hot block, used_last otherwise */
  ptr_ins= hot ? &pagecache->used_ins : &pagecache->used_last;
  ins= *ptr_ins;
  if (ins)
  {
    /* Splice the block into the ring right after ins */
    ins->next_used->prev_used= &block->next_used;
    block->next_used= ins->next_used;
    block->prev_used= &ins->next_used;
    ins->next_used= block;
    /* With at_end set the block becomes the new insertion point */
    if (at_end)
      *ptr_ins= block;
  }
  else
  {
    /* The LRU chain is empty */
    pagecache->used_last= pagecache->used_ins= block->next_used= block;
    block->prev_used= &block->next_used;
  }
  KEYCACHE_THREAD_TRACE("link_block");
#if defined(PAGECACHE_DEBUG)
  pagecache->blocks_available++;
  KEYCACHE_DBUG_PRINT("link_block",
                      ("linked block: %u:%1u  status: %x  #requests: %u  #available: %u",
                       PCBLOCK_NUMBER(pagecache, block), at_end, block->status,
                       block->requests, pagecache->blocks_available));
  KEYCACHE_DBUG_ASSERT(pagecache->blocks_available <=
                       pagecache->blocks_used);
#endif
  DBUG_VOID_RETURN;
}
1395 
1396 
1397 /*
1398   Unlink a block from the LRU chain
1399 
1400   SYNOPSIS
1401     unlink_block()
1402       pagecache           pointer to a page cache data structure
1403       block               pointer to the block to unlink from the LRU chain
1404 
1405   RETURN VALUE
1406     none
1407 
1408   NOTES.
1409     See NOTES for link_block
1410 */
1411 
1412 static void unlink_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block)
1413 {
1414   DBUG_ENTER("unlink_block");
1415   DBUG_PRINT("pagecache", ("unlink %p", block));
1416   DBUG_ASSERT(block->next_used != NULL);
1417   if (block->next_used == block)
1418   {
1419     /* The list contains only one member */
1420     pagecache->used_last= pagecache->used_ins= NULL;
1421   }
1422   else
1423   {
1424     block->next_used->prev_used= block->prev_used;
1425     *block->prev_used= block->next_used;
1426     if (pagecache->used_last == block)
1427       pagecache->used_last= STRUCT_PTR(PAGECACHE_BLOCK_LINK,
1428                                        next_used, block->prev_used);
1429     if (pagecache->used_ins == block)
1430       pagecache->used_ins= STRUCT_PTR(PAGECACHE_BLOCK_LINK,
1431                                       next_used, block->prev_used);
1432   }
1433   block->next_used= NULL;
1434 
1435   KEYCACHE_THREAD_TRACE("unlink_block");
1436 #if defined(PAGECACHE_DEBUG)
1437   KEYCACHE_DBUG_ASSERT(pagecache->blocks_available != 0);
1438   pagecache->blocks_available--;
1439   KEYCACHE_DBUG_PRINT("pagecache",
1440                       ("unlinked block: %p (%u)  status: %x   #requests: %u  #available: %u",
1441                        block, PCBLOCK_NUMBER(pagecache, block),
1442                        block->status,
1443                        block->requests, pagecache->blocks_available));
1444   PCBLOCK_INFO(block);
1445 #endif
1446   DBUG_VOID_RETURN;
1447 }
1448 
1449 
1450 /*
1451   Register requests for a block
1452 
1453   SYNOPSIS
1454     reg_requests()
1455     pagecache            this page cache reference
1456     block                the block we request reference
1457     count                how many requests we register (it is 1 everywhere)
1458 
1459   NOTE
1460   Registration of request means we are going to use this block so we exclude
1461   it from the LRU if it is first request
1462 */
1463 static void reg_requests(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
1464                          int count)
1465 {
1466   DBUG_ENTER("reg_requests");
1467   PCBLOCK_INFO(block);
1468   if (! block->requests)
1469     /* First request for the block unlinks it */
1470     unlink_block(pagecache, block);
1471   block->requests+= count;
1472   DBUG_VOID_RETURN;
1473 }
1474 
1475 
1476 /*
1477   Unregister request for a block
1478   linking it to the LRU chain if it's the last request
1479 
1480   SYNOPSIS
1481     unreg_request()
1482     pagecache            pointer to a page cache data structure
1483     block               pointer to the block to link to the LRU chain
1484     at_end              <-> to link the block at the end of the LRU chain
1485 
1486   RETURN VALUE
1487     none
1488 
1489   NOTES.
1490     Every linking to the LRU chain decrements by one a special block
1491     counter (if it's positive). If the at_end parameter is TRUE the block is
1492     added either at the end of warm sub-chain or at the end of hot sub-chain.
1493     It is added to the hot subchain if its counter is zero and number of
1494     blocks in warm sub-chain is not less than some low limit (determined by
1495     the division_limit parameter). Otherwise the block is added to the warm
1496     sub-chain. If the at_end parameter is FALSE the block is always added
1497     at beginning of the warm sub-chain.
1498     Thus a warm block can be promoted to the hot sub-chain when its counter
1499     becomes zero for the first time.
1500     At the same time  the block at the very beginning of the hot subchain
1501     might be moved to the beginning of the warm subchain if it stays untouched
1502     for a too long time (this time is determined by parameter age_threshold).
1503 */
1504 
1505 static void unreg_request(PAGECACHE *pagecache,
1506                           PAGECACHE_BLOCK_LINK *block, int at_end)
1507 {
1508   DBUG_ENTER("unreg_request");
1509   DBUG_PRINT("enter", ("block %p (%u)  status: %x  requests: %u",
1510 		       block, PCBLOCK_NUMBER(pagecache, block),
1511                        block->status, block->requests));
1512   PCBLOCK_INFO(block);
1513   DBUG_ASSERT(block->requests > 0);
1514   if (! --block->requests)
1515   {
1516     my_bool hot;
1517     if (block->hits_left)
1518       block->hits_left--;
1519     hot= !block->hits_left && at_end &&
1520       pagecache->warm_blocks > pagecache->min_warm_blocks;
1521     if (hot)
1522     {
1523       if (block->temperature == PCBLOCK_WARM)
1524         pagecache->warm_blocks--;
1525       block->temperature= PCBLOCK_HOT;
1526       KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %zu",
1527                                             pagecache->warm_blocks));
1528     }
1529     link_block(pagecache, block, hot, (my_bool)at_end);
1530     block->last_hit_time= pagecache->time;
1531     pagecache->time++;
1532 
1533     block= pagecache->used_ins;
1534     /* Check if we should link a hot block to the warm block */
1535     if (block && pagecache->time - block->last_hit_time >
1536 	pagecache->age_threshold)
1537     {
1538       unlink_block(pagecache, block);
1539       link_block(pagecache, block, 0, 0);
1540       if (block->temperature != PCBLOCK_WARM)
1541       {
1542         pagecache->warm_blocks++;
1543         block->temperature= PCBLOCK_WARM;
1544       }
1545       KEYCACHE_DBUG_PRINT("unreg_request", ("#warm_blocks: %zu",
1546                                             pagecache->warm_blocks));
1547     }
1548   }
1549   DBUG_VOID_RETURN;
1550 }
1551 
1552 /*
1553   Remove a reader of the page in block
1554 */
1555 
1556 static inline void remove_reader(PAGECACHE_BLOCK_LINK *block)
1557 {
1558   DBUG_ENTER("remove_reader");
1559   PCBLOCK_INFO(block);
1560   DBUG_ASSERT(block->hash_link->requests > 0);
1561   if (! --block->hash_link->requests && block->condvar)
1562     pagecache_pthread_cond_signal(block->condvar);
1563   DBUG_VOID_RETURN;
1564 }
1565 
1566 
1567 /*
1568   Wait until the last reader of the page in block
1569   signals on its termination
1570 */
1571 
1572 static inline void wait_for_readers(PAGECACHE *pagecache
1573                                     __attribute__((unused)),
1574                                     PAGECACHE_BLOCK_LINK *block
1575                                     __attribute__((unused)))
1576 {
1577   struct st_my_thread_var *thread= my_thread_var;
1578   DBUG_ASSERT(block->condvar == NULL);
1579   while (block->hash_link->requests)
1580   {
1581     DBUG_ENTER("wait_for_readers");
1582     DBUG_PRINT("wait",
1583                ("suspend thread: %s %ld  block: %u",
1584                 thread->name, (ulong) thread->id,
1585                 PCBLOCK_NUMBER(pagecache, block)));
1586     block->condvar= &thread->suspend;
1587     pagecache_pthread_cond_wait(&thread->suspend, &pagecache->cache_lock);
1588     block->condvar= NULL;
1589     DBUG_VOID_RETURN;
1590   }
1591 }
1592 
1593 
1594 /*
1595   Wait until the flush of the page is done.
1596 */
1597 
1598 static void wait_for_flush(PAGECACHE *pagecache
1599                            __attribute__((unused)),
1600                            PAGECACHE_BLOCK_LINK *block
1601                            __attribute__((unused)))
1602 {
1603   struct st_my_thread_var *thread= my_thread_var;
1604   DBUG_ENTER("wait_for_flush");
1605   wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
1606   do
1607   {
1608     DBUG_PRINT("wait",
1609                ("suspend thread %s %ld", thread->name, (ulong) thread->id));
1610     pagecache_pthread_cond_wait(&thread->suspend,
1611                                 &pagecache->cache_lock);
1612   }
1613   while(thread->next);
1614   DBUG_VOID_RETURN;
1615 }
1616 
1617 
1618 /*
1619   Add a hash link to a bucket in the hash_table
1620 */
1621 
1622 static inline void link_hash(PAGECACHE_HASH_LINK **start,
1623                              PAGECACHE_HASH_LINK *hash_link)
1624 {
1625   if (*start)
1626     (*start)->prev= &hash_link->next;
1627   hash_link->next= *start;
1628   hash_link->prev= start;
1629   *start= hash_link;
1630 }
1631 
1632 
1633 /*
1634   Remove a hash link from the hash table
1635 */
1636 
static void unlink_hash(PAGECACHE *pagecache, PAGECACHE_HASH_LINK *hash_link)
{
  DBUG_ENTER("unlink_hash");
  DBUG_PRINT("enter", ("hash_link: %p  fd: %u  pos: %lu  requests: %u",
                       hash_link, (uint) hash_link->file.file,
                       (ulong) hash_link->pageno,
                       hash_link->requests));
  /* The link must not be requested and its block, if any, not pinned */
  DBUG_ASSERT(hash_link->requests == 0);
  DBUG_ASSERT(!hash_link->block || hash_link->block->pins == 0);

  /* Take the link out of its bucket chain and detach it from the block */
  if ((*hash_link->prev= hash_link->next))
    hash_link->next->prev= hash_link->prev;
  hash_link->block= NULL;
  /*
    If threads are queued for a hash link, the link is not put on the free
    list; it is re-used at once for the page the first waiter asked for.
  */
  if (pagecache->waiting_for_hash_link.last_thread)
  {
    /* Signal that a free hash link has appeared */
    struct st_my_thread_var *last_thread=
                               pagecache->waiting_for_hash_link.last_thread;
    /* The wait queue is circular: last_thread->next is its first element */
    struct st_my_thread_var *first_thread= last_thread->next;
    struct st_my_thread_var *next_thread= first_thread;
    /* Page description stored in keycache_link by the first waiter */
    PAGECACHE_PAGE *first_page= (PAGECACHE_PAGE *) (first_thread->keycache_link);
    struct st_my_thread_var *thread;

    /* Assign the link to the file and page the first waiter asked for */
    hash_link->file= first_page->file;
    DBUG_ASSERT(first_page->pageno < ((1ULL) << 40));
    hash_link->pageno= first_page->pageno;
    do
    {
      PAGECACHE_PAGE *page;
      thread= next_thread;
      page= (PAGECACHE_PAGE *) thread->keycache_link;
      /* Remember the successor before thread may be unlinked below */
      next_thread= thread->next;
      /*
         We notify about the event all threads that ask
         for the same page as the first thread in the queue
      */
      if (page->file.file == hash_link->file.file &&
          page->pageno == hash_link->pageno)
      {
        DBUG_PRINT("signal", ("thread %s %ld", thread->name,
                              (ulong) thread->id));
        pagecache_pthread_cond_signal(&thread->suspend);
        wqueue_unlink_from_queue(&pagecache->waiting_for_hash_link, thread);
      }
    }
    while (thread != last_thread);

    /*
      Add this to the hash, so that the waiting threads can find it
      when they retry the call to get_hash_link().  This entry is special
      in that it has no associated block.
    */
    link_hash(&pagecache->hash_root[PAGECACHE_HASH(pagecache,
                                                   hash_link->file,
                                                   hash_link->pageno)],
              hash_link);
    DBUG_VOID_RETURN;
  }

  /* Add hash to free hash list */
  hash_link->next= pagecache->free_hash_list;
  pagecache->free_hash_list= hash_link;
  DBUG_VOID_RETURN;
}
1701 
1702 
1703 /*
1704   Get the hash link for the page if it is in the cache (do not put the
1705   page in the cache if it is absent there)
1706 
1707   SYNOPSIS
1708     get_present_hash_link()
1709     pagecache            Pagecache reference
1710     file                 file ID
1711     pageno               page number in the file
1712     start                where to put pointer to found hash bucket (for
1713                          direct referring it)
1714 
1715   RETURN
1716     found hashlink pointer
1717 */
1718 
1719 static PAGECACHE_HASH_LINK *get_present_hash_link(PAGECACHE *pagecache,
1720                                                   PAGECACHE_FILE *file,
1721                                                   pgcache_page_no_t pageno,
1722                                                   PAGECACHE_HASH_LINK ***start)
1723 {
1724   reg1 PAGECACHE_HASH_LINK *hash_link;
1725 #if defined(PAGECACHE_DEBUG)
1726   int cnt;
1727 #endif
1728   DBUG_ENTER("get_present_hash_link");
1729   DBUG_PRINT("enter", ("fd: %u  pos: %lu", (uint) file->file, (ulong) pageno));
1730 
1731   /*
1732      Find the bucket in the hash table for the pair (file, pageno);
1733      start contains the head of the bucket list,
1734      hash_link points to the first member of the list
1735   */
1736   hash_link= *(*start= &pagecache->hash_root[PAGECACHE_HASH(pagecache,
1737                                                             *file, pageno)]);
1738 #if defined(PAGECACHE_DEBUG)
1739   cnt= 0;
1740 #endif
1741   /* Look for an element for the pair (file, pageno) in the bucket chain */
1742   while (hash_link &&
1743          (hash_link->pageno != pageno ||
1744           hash_link->file.file != file->file))
1745   {
1746     hash_link= hash_link->next;
1747 #if defined(PAGECACHE_DEBUG)
1748     cnt++;
1749     if (! (cnt <= pagecache->hash_links_used))
1750     {
1751       int i;
1752       for (i=0, hash_link= **start ;
1753            i < cnt ; i++, hash_link= hash_link->next)
1754       {
1755         KEYCACHE_DBUG_PRINT("get_present_hash_link", ("fd: %u  pos: %lu",
1756             (uint) hash_link->file.file, (ulong) hash_link->pageno));
1757       }
1758     }
1759     KEYCACHE_DBUG_ASSERT(cnt <= pagecache->hash_links_used);
1760 #endif
1761   }
1762   if (hash_link)
1763   {
1764     DBUG_PRINT("exit", ("hash_link: %p", hash_link));
1765     /* Register the request for the page */
1766     hash_link->requests++;
1767   }
1768   /*
1769     As soon as the caller will release the page cache's lock, "hash_link"
1770     will be potentially obsolete (unusable) information.
1771   */
1772   DBUG_RETURN(hash_link);
1773 }
1774 
1775 
1776 /*
1777   Get the hash link for a page
1778 */
1779 
1780 static PAGECACHE_HASH_LINK *get_hash_link(PAGECACHE *pagecache,
1781                                           PAGECACHE_FILE *file,
1782                                           pgcache_page_no_t pageno)
1783 {
1784   reg1 PAGECACHE_HASH_LINK *hash_link;
1785   PAGECACHE_HASH_LINK **start;
1786   DBUG_ENTER("get_hash_link");
1787 
1788 restart:
1789   /* try to find the page in the cache */
1790   hash_link= get_present_hash_link(pagecache, file, pageno,
1791                                    &start);
1792   if (!hash_link)
1793   {
1794     /* There is no hash link in the hash table for the pair (file, pageno) */
1795     if (pagecache->free_hash_list)
1796     {
1797       DBUG_PRINT("info", ("free_hash_list: %p  free_hash_list->next: %p",
1798                           pagecache->free_hash_list,
1799                           pagecache->free_hash_list->next));
1800       hash_link= pagecache->free_hash_list;
1801       pagecache->free_hash_list= hash_link->next;
1802     }
1803     else if (pagecache->hash_links_used < pagecache->hash_links)
1804     {
1805       hash_link= &pagecache->hash_link_root[pagecache->hash_links_used++];
1806     }
1807     else
1808     {
1809       /* Wait for a free hash link */
1810       struct st_my_thread_var *thread= my_thread_var;
1811       PAGECACHE_PAGE page;
1812       page.file= *file;
1813       page.pageno= pageno;
1814       thread->keycache_link= (void *) &page;
1815       wqueue_link_into_queue(&pagecache->waiting_for_hash_link, thread);
1816       DBUG_PRINT("wait",
1817                  ("suspend thread %s %ld", thread->name, (ulong) thread->id));
1818       pagecache_pthread_cond_wait(&thread->suspend,
1819                                  &pagecache->cache_lock);
1820       thread->keycache_link= NULL;
1821       DBUG_PRINT("thread", ("restarting..."));
1822       goto restart;
1823     }
1824     hash_link->file= *file;
1825     DBUG_ASSERT(pageno < ((1ULL) << 40));
1826     hash_link->pageno= pageno;
1827     link_hash(start, hash_link);
1828     /* Register the request for the page */
1829     hash_link->requests++;
1830     DBUG_ASSERT(hash_link->block == 0);
1831     DBUG_ASSERT(hash_link->requests == 1);
1832   }
1833   else
1834   {
1835     /*
1836       We have to copy the flush_log callback, as it may change if the table
1837       goes from non_transactional to transactional during recovery
1838     */
1839     hash_link->file.flush_log_callback= file->flush_log_callback;
1840   }
1841   DBUG_PRINT("exit", ("hash_link: %p  block: %p", hash_link,
1842                       hash_link->block));
1843   DBUG_RETURN(hash_link);
1844 }
1845 
1846 
1847 /*
1848   Get a block for the file page requested by a pagecache read/write operation;
1849   If the page is not in the cache return a free block, if there is none
1850   return the lru block after saving its buffer if the page is dirty.
1851 
1852   SYNOPSIS
1853 
1854     find_block()
1855       pagecache            pointer to a page cache data structure
1856       file                handler for the file to read page from
1857       pageno              number of the page in the file
1858       init_hits_left      how initialize the block counter for the page
1859       wrmode              <-> get for writing
1860       block_is_copied     1 if block will be copied from page cache under
1861                           the pagelock mutex.
1862       reg_req             Register request to the page. Normally all pages
1863                           should be registered; The only time it's ok to
1864                           not register a page is when the page is already
1865                           pinned (and thus registered) by the same thread.
1866       page_st        out  {PAGE_READ,PAGE_TO_BE_READ,PAGE_WAIT_TO_BE_READ}
1867 
1868   RETURN VALUE
1869     Pointer to the found block if successful, 0 - otherwise
1870 
1871   NOTES.
1872     For the page from file positioned at pageno the function checks whether
1873     the page is in the key cache specified by the first parameter.
1874     If this is the case it immediately returns the block.
1875     If not, the function first chooses  a block for this page. If there is
1876     no not used blocks in the key cache yet, the function takes the block
1877     at the very beginning of the warm sub-chain. It saves the page in that
1878     block if it's dirty before returning the pointer to it.
1879     The function returns in the page_st parameter the following values:
1880       PAGE_READ         - if page already in the block,
1881       PAGE_TO_BE_READ   - if it is to be read yet by the current thread
1882       WAIT_TO_BE_READ   - if it is to be read by another thread
1883     If an error occurs THE PCBLOCK_ERROR bit is set in the block status.
1884     It might happen that there are no blocks in LRU chain (in warm part) -
1885     all blocks  are unlinked for some read/write operations. Then the function
1886     waits until first of this operations links any block back.
1887 */
1888 
1889 static PAGECACHE_BLOCK_LINK *find_block(PAGECACHE *pagecache,
1890                                         PAGECACHE_FILE *file,
1891                                         pgcache_page_no_t pageno,
1892                                         int init_hits_left,
1893                                         my_bool wrmode,
1894                                         my_bool block_is_copied,
1895                                         my_bool reg_req,
1896                                         int *page_st)
1897 {
1898   PAGECACHE_HASH_LINK *hash_link;
1899   PAGECACHE_BLOCK_LINK *block;
1900   int error= 0;
1901   int page_status;
1902   DBUG_ENTER("find_block");
1903   DBUG_PRINT("enter", ("fd: %d  pos: %lu  wrmode: %d  block_is_copied: %d",
1904                        file->file, (ulong) pageno, wrmode, block_is_copied));
1905   KEYCACHE_PRINT("find_block", ("fd: %d  pos: %lu  wrmode: %d",
1906                                 file->file, (ulong) pageno,
1907                                 wrmode));
1908 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
1909   DBUG_EXECUTE("check_pagecache",
1910                test_key_cache(pagecache, "start of find_block", 0););
1911 #endif
1912 
1913 restart:
1914   /* Find the hash link for the requested page (file, pageno) */
1915   hash_link= get_hash_link(pagecache, file, pageno);
1916 
1917   page_status= -1;
1918   if ((block= hash_link->block) &&
1919       block->hash_link == hash_link && (block->status & PCBLOCK_READ))
1920     page_status= PAGE_READ;
1921 
1922   if (wrmode && pagecache->resize_in_flush)
1923   {
1924     /* This is a write request during the flush phase of a resize operation */
1925 
1926     if (page_status != PAGE_READ)
1927     {
1928       /* We don't need the page in the cache: we are going to write on disk */
1929       DBUG_ASSERT(hash_link->requests > 0);
1930       hash_link->requests--;
1931       unlink_hash(pagecache, hash_link);
1932       return 0;
1933     }
1934     if (!(block->status & PCBLOCK_IN_FLUSH))
1935     {
1936       DBUG_ASSERT(hash_link->requests > 0);
1937       hash_link->requests--;
1938       /*
1939         Remove block to invalidate the page in the block buffer
1940         as we are going to write directly on disk.
1941         Although we have an exclusive lock for the updated key part
1942         the control can be yielded by the current thread as we might
1943         have unfinished readers of other key parts in the block
1944         buffer. Still we are guaranteed not to have any readers
1945         of the key part we are writing into until the block is
1946         removed from the cache as we set the PCBLOCK_REASSIGNED
1947         flag (see the code below that handles reading requests).
1948       */
1949       free_block(pagecache, block, 0);
1950       return 0;
1951     }
1952     /* Wait until the page is flushed on disk */
1953     DBUG_ASSERT(hash_link->requests > 0);
1954     hash_link->requests--;
1955     wait_for_flush(pagecache, block);
1956 
1957     /* Invalidate page in the block if it has not been done yet */
1958     DBUG_ASSERT(block->status);                 /* Should always be true */
1959     if (block->status)
1960       free_block(pagecache, block, 0);
1961     return 0;
1962   }
1963 
1964   if (page_status == PAGE_READ &&
1965       (block->status & (PCBLOCK_IN_SWITCH | PCBLOCK_REASSIGNED)))
1966   {
1967     /* This is a request for a page to be removed from cache */
1968 
1969     KEYCACHE_DBUG_PRINT("find_block",
1970                         ("request for old page in block: %u  "
1971                          "wrmode: %d  block->status: %d",
1972                          PCBLOCK_NUMBER(pagecache, block), wrmode,
1973                          block->status));
1974     /*
1975        Only reading requests can proceed until the old dirty page is flushed,
1976        all others are to be suspended, then resubmitted
1977     */
1978     if (!wrmode && block_is_copied && !(block->status & PCBLOCK_REASSIGNED))
1979     {
1980       if (reg_req)
1981         reg_requests(pagecache, block, 1);
1982     }
1983     else
1984     {
1985       /*
1986         When we come here either PCBLOCK_REASSIGNED or PCBLOCK_IN_SWITCH are
1987         active. In both cases wqueue_release_queue() is called when the
1988         state changes.
1989       */
1990       DBUG_ASSERT(block->hash_link == hash_link);
1991       remove_reader(block);
1992       KEYCACHE_DBUG_PRINT("find_block",
1993                           ("request waiting for old page to be saved"));
1994       {
1995         struct st_my_thread_var *thread= my_thread_var;
1996         /* Put the request into the queue of those waiting for the old page */
1997         wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
1998         /* Wait until the request can be resubmitted */
1999         do
2000         {
2001           DBUG_PRINT("wait",
2002                      ("suspend thread %s %ld", thread->name,
2003                       (ulong) thread->id));
2004           pagecache_pthread_cond_wait(&thread->suspend,
2005                                      &pagecache->cache_lock);
2006         }
2007         while(thread->next);
2008       }
2009       KEYCACHE_DBUG_PRINT("find_block",
2010                           ("request for old page resubmitted"));
2011       DBUG_PRINT("info", ("restarting..."));
2012       /* Resubmit the request */
2013       goto restart;
2014     }
2015   }
2016   else
2017   {
2018     /* This is a request for a new page or for a page not to be removed */
2019     if (! block)
2020     {
2021       /* No block is assigned for the page yet */
2022       if (pagecache->blocks_unused)
2023       {
2024         if (pagecache->free_block_list)
2025         {
2026           /* There is a block in the free list. */
2027           block= pagecache->free_block_list;
2028           pagecache->free_block_list= block->next_used;
2029           block->next_used= NULL;
2030         }
2031         else
2032         {
2033           /* There are some never used blocks, take first of them */
2034           block= &pagecache->block_root[pagecache->blocks_used];
2035           block->buffer= ADD_TO_PTR(pagecache->block_mem,
2036                                     (pagecache->blocks_used*
2037                                      pagecache->block_size),
2038                                     uchar*);
2039           pagecache->blocks_used++;
2040         }
2041         pagecache->blocks_unused--;
2042         DBUG_ASSERT(block->wlocks == 0);
2043         DBUG_ASSERT(block->rlocks == 0);
2044         DBUG_ASSERT(block->rlocks_queue == 0);
2045         DBUG_ASSERT(block->pins == 0);
2046         block->status= 0;
2047 #ifdef DBUG_ASSERT_EXISTS
2048         block->type= PAGECACHE_EMPTY_PAGE;
2049 #endif
2050         DBUG_ASSERT(reg_req);
2051         block->requests= 1;
2052         block->temperature= PCBLOCK_COLD;
2053         block->hits_left= init_hits_left;
2054         block->last_hit_time= 0;
2055         block->rec_lsn= LSN_MAX;
2056         link_to_file_list(pagecache, block, file, 0);
2057         block->hash_link= hash_link;
2058         hash_link->block= block;
2059         page_status= PAGE_TO_BE_READ;
2060         DBUG_PRINT("info", ("page to be read set for page %p (%u)",
2061                             block, PCBLOCK_NUMBER(pagecache, block)));
2062         KEYCACHE_PRINT("find_block",
2063                        ("got free or never used block %u",
2064                         PCBLOCK_NUMBER(pagecache, block)));
2065       }
2066       else
2067       {
2068 	/* There are no never used blocks, use a block from the LRU chain */
2069 
2070         /*
2071           Ensure that we are going to register the block.
2072           (This should be true as a new block could not have been
2073           pinned by caller).
2074         */
2075         DBUG_ASSERT(reg_req);
2076 
2077         if (! pagecache->used_last)
2078         {
2079           /*
2080             Wait until a new block is added to the LRU chain;
2081             several threads might wait here for the same page,
2082             all of them must get the same block.
2083 
2084             The block is given to us by the next thread executing
2085             link_block().
2086           */
2087 
2088           struct st_my_thread_var *thread= my_thread_var;
2089           thread->keycache_link= (void *) hash_link;
2090           wqueue_link_into_queue(&pagecache->waiting_for_block, thread);
2091           do
2092           {
2093             DBUG_PRINT("wait",
2094                        ("suspend thread %s %ld", thread->name,
2095                         (ulong) thread->id));
2096             pagecache_pthread_cond_wait(&thread->suspend,
2097                                        &pagecache->cache_lock);
2098           }
2099           while (thread->next);
2100           thread->keycache_link= NULL;
2101           block= hash_link->block;
2102           /* Ensure that the block is registered */
2103           DBUG_ASSERT(block->requests >= 1);
2104         }
2105         else
2106         {
2107           /*
2108              Take the first block from the LRU chain
2109              unlinking it from the chain
2110           */
2111           block= pagecache->used_last->next_used;
2112 	  if (reg_req)
2113             reg_requests(pagecache, block, 1);
2114           hash_link->block= block;
2115           DBUG_ASSERT(block->requests == 1);
2116         }
2117 
2118         PCBLOCK_INFO(block);
2119 
2120         DBUG_ASSERT(block->hash_link == hash_link ||
2121                     !(block->status & PCBLOCK_IN_SWITCH));
2122 
2123         if (block->hash_link != hash_link &&
2124 	    ! (block->status & PCBLOCK_IN_SWITCH) )
2125         {
2126           /* If another thread is flushing the block, wait for it. */
2127           if (block->status & PCBLOCK_IN_FLUSH)
2128             wait_for_flush(pagecache, block);
2129 
2130 	  /* this is a primary request for a new page */
2131           DBUG_ASSERT(block->wlocks == 0);
2132           DBUG_ASSERT(block->rlocks == 0);
2133           DBUG_ASSERT(block->rlocks_queue == 0);
2134           DBUG_ASSERT(block->pins == 0);
2135           block->status|= PCBLOCK_IN_SWITCH;
2136 
2137           KEYCACHE_DBUG_PRINT("find_block",
2138                               ("got block %u for new page",
2139                                PCBLOCK_NUMBER(pagecache, block)));
2140 
2141           if (block->status & PCBLOCK_CHANGED)
2142           {
2143 	    /* The block contains a dirty page - push it out of the cache */
2144 
2145             KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));
2146 
2147             /*
2148 	      The call is thread safe because only the current
2149 	      thread might change the block->hash_link value
2150             */
2151             DBUG_ASSERT(block->pins == 0);
2152             pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
2153             error= pagecache_fwrite(pagecache,
2154                                     &block->hash_link->file,
2155                                     block->buffer,
2156                                     block->hash_link->pageno,
2157                                     block->type,
2158                                     pagecache->readwrite_flags);
2159             pagecache_pthread_mutex_lock(&pagecache->cache_lock);
2160 	    pagecache->global_cache_write++;
2161           }
2162 
2163           block->status|= PCBLOCK_REASSIGNED;
2164           if (block->hash_link)
2165           {
2166             /*
2167 	      Wait until all pending read requests
2168 	      for this page are executed
2169 	      (we could have avoided this waiting, if we had read
2170 	      a page in the cache in a sweep, without yielding control)
2171             */
2172             wait_for_readers(pagecache, block);
2173 
2174             /* Remove the hash link for this page from the hash table */
2175             unlink_hash(pagecache, block->hash_link);
2176 
2177             /* All pending requests for this page must be resubmitted */
2178             if (block->wqueue[COND_FOR_SAVED].last_thread)
2179               wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
2180           }
2181           link_to_file_list(pagecache, block, file,
2182                             (my_bool)(block->hash_link ? 1 : 0));
2183 
2184           block->hash_link= hash_link;
2185           PCBLOCK_INFO(block);
2186           block->hits_left= init_hits_left;
2187           block->last_hit_time= 0;
2188           block->status= error ? PCBLOCK_ERROR : 0;
2189           block->error=  error ? (int16) my_errno : 0;
2190 #ifdef DBUG_ASSERT_EXISTS
2191           block->type= PAGECACHE_EMPTY_PAGE;
2192           if (error)
2193             my_debug_put_break_here();
2194 #endif
2195           page_status= PAGE_TO_BE_READ;
2196           DBUG_PRINT("info", ("page to be read set for page %p", block));
2197 
2198           KEYCACHE_DBUG_ASSERT(block->hash_link->block == block);
2199           KEYCACHE_DBUG_ASSERT(hash_link->block->hash_link == hash_link);
2200         }
2201         else
2202         {
2203           /* This is for secondary requests for a new page only */
2204           KEYCACHE_DBUG_PRINT("find_block",
2205                               ("block->hash_link: %p  hash_link: %p  "
2206                                "block->status: %u", block->hash_link,
2207                                hash_link, block->status ));
2208           page_status= (((block->hash_link == hash_link) &&
2209                          (block->status & PCBLOCK_READ)) ?
2210                         PAGE_READ : PAGE_WAIT_TO_BE_READ);
2211         }
2212       }
2213     }
2214     else
2215     {
2216       /*
2217         The block was found in the cache. It's either a already read
2218         block or a block waiting to be read by another thread.
2219       */
2220       if (reg_req)
2221 	reg_requests(pagecache, block, 1);
2222       KEYCACHE_DBUG_PRINT("find_block",
2223                           ("block->hash_link: %p  hash_link: %p  "
2224                            "block->status: %u", block->hash_link,
2225                            hash_link, block->status ));
2226       /*
2227         block->hash_link != hash_link can only happen when
2228         the block is in PCBLOCK_IN_SWITCH above (is flushed out
2229         to be replaced by another block). The SWITCH code will change
2230         block->hash_link to point to hash_link.
2231       */
2232       KEYCACHE_DBUG_ASSERT(block->hash_link == hash_link ||
2233                            block->status & PCBLOCK_IN_SWITCH);
2234       page_status= (((block->hash_link == hash_link) &&
2235                      (block->status & PCBLOCK_READ)) ?
2236                     PAGE_READ : PAGE_WAIT_TO_BE_READ);
2237     }
2238   }
2239 
2240   KEYCACHE_DBUG_ASSERT(page_status != -1);
2241   *page_st= page_status;
2242   DBUG_PRINT("info",
2243              ("block: %p  fd: %u  pos: %lu  block->status: %u  page_status: %u",
2244               block, (uint) file->file,
2245               (ulong) pageno, block->status, (uint) page_status));
2246   KEYCACHE_PRINT("find_block",
2247                  ("block: %p  fd: %d  pos: %lu  block->status: %u  page_status: %d",
2248                   block, file->file, (ulong) pageno, block->status,
2249                   page_status));
2250 
2251 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
2252   DBUG_EXECUTE("check_pagecache",
2253                test_key_cache(pagecache, "end of find_block",0););
2254 #endif
2255   KEYCACHE_THREAD_TRACE("find_block:end");
2256   DBUG_RETURN(block);
2257 }
2258 
2259 
2260 static void add_pin(PAGECACHE_BLOCK_LINK *block)
2261 {
2262   DBUG_ENTER("add_pin");
2263   DBUG_PRINT("enter", ("block: %p  pins: %u", block, block->pins));
2264   PCBLOCK_INFO(block);
2265   block->pins++;
2266 #ifndef DBUG_OFF
2267   {
2268     PAGECACHE_PIN_INFO *info=
2269       (PAGECACHE_PIN_INFO *)my_malloc(sizeof(PAGECACHE_PIN_INFO), MYF(0));
2270     info->thread= my_thread_var;
2271     info_link(&block->pin_list, info);
2272   }
2273 #endif
2274   DBUG_VOID_RETURN;
2275 }
2276 
2277 static void remove_pin(PAGECACHE_BLOCK_LINK *block, my_bool any
2278 #ifdef DBUG_OFF
2279                        __attribute__((unused))
2280 #endif
2281                        )
2282 {
2283   DBUG_ENTER("remove_pin");
2284   DBUG_PRINT("enter", ("block: %p  pins: %u  any: %d", block, block->pins,
2285                        (int)any));
2286   PCBLOCK_INFO(block);
2287   DBUG_ASSERT(block->pins > 0);
2288   block->pins--;
2289 #ifndef DBUG_OFF
2290   {
2291     PAGECACHE_PIN_INFO *info= info_find(block->pin_list, my_thread_var, any);
2292     DBUG_ASSERT(info != 0);
2293     info_unlink(info);
2294     my_free(info);
2295   }
2296 #endif
2297   DBUG_VOID_RETURN;
2298 }
2299 #ifndef DBUG_OFF
2300 static void info_add_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
2301 {
2302   PAGECACHE_LOCK_INFO *info=
2303     (PAGECACHE_LOCK_INFO *)my_malloc(sizeof(PAGECACHE_LOCK_INFO), MYF(0));
2304   info->thread= my_thread_var;
2305   info->write_lock= wl;
2306   info_link((PAGECACHE_PIN_INFO **)&block->lock_list,
2307 	    (PAGECACHE_PIN_INFO *)info);
2308 }
2309 static void info_remove_lock(PAGECACHE_BLOCK_LINK *block)
2310 {
2311   PAGECACHE_LOCK_INFO *info=
2312     (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
2313                                      my_thread_var, FALSE);
2314   DBUG_ASSERT(info != 0);
2315   info_unlink((PAGECACHE_PIN_INFO *)info);
2316   my_free(info);
2317 }
2318 static void info_change_lock(PAGECACHE_BLOCK_LINK *block, my_bool wl)
2319 {
2320   PAGECACHE_LOCK_INFO *info=
2321     (PAGECACHE_LOCK_INFO *)info_find((PAGECACHE_PIN_INFO *)block->lock_list,
2322                                      my_thread_var, FALSE);
2323   DBUG_ASSERT(info != 0);
2324   DBUG_ASSERT(info->write_lock != wl);
2325   info->write_lock= wl;
2326 }
2327 #else
/* DBUG_OFF builds: the lock bookkeeping helpers expand to nothing */
#define info_add_lock(B,W)
#define info_remove_lock(B)
#define info_change_lock(B,W)
2331 #endif
2332 
2333 
2334 /**
2335   @brief waiting for lock for read and write lock
2336 
2337   @parem pagecache       pointer to a page cache data structure
2338   @parem block           the block to work with
2339   @param file            file of the block when it was locked
2340   @param pageno          page number of the block when it was locked
2341   @param lock_type       MY_PTHREAD_LOCK_READ or MY_PTHREAD_LOCK_WRITE
2342 
2343   @retval 0 OK
2344   @retval 1 Can't lock this block, need retry
2345 */
2346 
2347 static my_bool pagecache_wait_lock(PAGECACHE *pagecache,
2348                                   PAGECACHE_BLOCK_LINK *block,
2349                                   PAGECACHE_FILE file,
2350                                   pgcache_page_no_t pageno,
2351                                   uint lock_type)
2352 {
2353   /* Lock failed we will wait */
2354   struct st_my_thread_var *thread= my_thread_var;
2355   DBUG_ENTER("pagecache_wait_lock");
2356   DBUG_PRINT("info", ("fail to lock, waiting... %p", block));
2357   thread->lock_type= lock_type;
2358   wqueue_add_to_queue(&block->wqueue[COND_FOR_WRLOCK], thread);
2359   dec_counter_for_resize_op(pagecache);
2360   do
2361   {
2362     DBUG_PRINT("wait",
2363                ("suspend thread %s %ld", thread->name, (ulong) thread->id));
2364     pagecache_pthread_cond_wait(&thread->suspend,
2365                                 &pagecache->cache_lock);
2366   }
2367   while(thread->next);
2368   inc_counter_for_resize_op(pagecache);
2369   PCBLOCK_INFO(block);
2370   if ((block->status & (PCBLOCK_REASSIGNED | PCBLOCK_IN_SWITCH)) ||
2371       !block->hash_link ||
2372       file.file != block->hash_link->file.file ||
2373       pageno != block->hash_link->pageno)
2374   {
2375     DBUG_PRINT("info", ("the block %p changed => need retry "
2376                         "status: %x  files %d != %d or pages %lu != %lu",
2377                         block, block->status, file.file,
2378                         block->hash_link ? block->hash_link->file.file : -1,
2379                         (ulong) pageno,
2380                         (ulong) (block->hash_link ? block->hash_link->pageno : 0)));
2381     DBUG_RETURN(1);
2382   }
2383   DBUG_RETURN(0);
2384 }
2385 
2386 /**
2387   @brief Put on the block write lock
2388 
2389   @parem pagecache       pointer to a page cache data structure
2390   @parem block           the block to work with
2391 
2392   @note We have loose scheme for locking by the same thread:
2393     * Downgrade to read lock if no other locks are taken
2394     * Our scheme of locking allow for the same thread
2395       - the same kind of lock
2396       - taking read lock if write lock present
2397       - downgrading to read lock if still other place the same
2398         thread keep write lock
2399     * But unlock operation number should be the same to lock operation.
2400     * If we try to get read lock having active write locks we put read
2401       locks to queue, and as soon as write lock(s) gone the read locks
2402       from queue came in force.
2403     * If read lock is unlocked earlier then it came to force it
2404       just removed from the queue
2405 
2406   @retval 0 OK
2407   @retval 1 Can't lock this block, need retry
2408 */
2409 
2410 static my_bool get_wrlock(PAGECACHE *pagecache,
2411                           PAGECACHE_BLOCK_LINK *block)
2412 {
2413   PAGECACHE_FILE file= block->hash_link->file;
2414   pgcache_page_no_t pageno= block->hash_link->pageno;
2415   pthread_t locker= pthread_self();
2416   DBUG_ENTER("get_wrlock");
2417   DBUG_PRINT("info", ("the block %p "
2418                       "files %d(%d)  pages %lu(%lu)",
2419                       block, file.file, block->hash_link->file.file,
2420                       (ulong) pageno, (ulong) block->hash_link->pageno));
2421   PCBLOCK_INFO(block);
2422   /*
2423     We assume that the same thread will try write lock on block on which it
2424     has already read lock.
2425   */
2426   while ((block->wlocks && !pthread_equal(block->write_locker, locker)) ||
2427          block->rlocks)
2428   {
2429     /* Lock failed we will wait */
2430     if (pagecache_wait_lock(pagecache, block, file, pageno,
2431                            MY_PTHREAD_LOCK_WRITE))
2432       DBUG_RETURN(1);
2433   }
2434   /* we are doing it by global cache mutex protection, so it is OK */
2435   block->wlocks++;
2436   block->write_locker= locker;
2437   DBUG_PRINT("info", ("WR lock set, block %p", block));
2438   DBUG_RETURN(0);
2439 }
2440 
2441 
2442 /*
2443   @brief Put on the block read lock
2444 
2445   @param pagecache       pointer to a page cache data structure
2446   @param block           the block to work with
2447   @param user_file	 Unique handler per handler file. Used to check if
2448 			 we request many write locks withing the same
2449                          statement
2450 
2451   @note see note for get_wrlock().
2452 
2453   @retvalue 0 OK
2454   @retvalue 1 Can't lock this block, need retry
2455 */
2456 
2457 static my_bool get_rdlock(PAGECACHE *pagecache,
2458                           PAGECACHE_BLOCK_LINK *block)
2459 {
2460   PAGECACHE_FILE file= block->hash_link->file;
2461   pgcache_page_no_t pageno= block->hash_link->pageno;
2462   pthread_t locker= pthread_self();
2463   DBUG_ENTER("get_rdlock");
2464   DBUG_PRINT("info", ("the block %p "
2465                       "files %d(%d)  pages %lu(%lu)",
2466                       block, file.file, block->hash_link->file.file,
2467                       (ulong) pageno, (ulong) block->hash_link->pageno));
2468   PCBLOCK_INFO(block);
2469   while (block->wlocks && !pthread_equal(block->write_locker, locker))
2470   {
2471     /* Lock failed we will wait */
2472     if (pagecache_wait_lock(pagecache, block, file, pageno,
2473                            MY_PTHREAD_LOCK_READ))
2474       DBUG_RETURN(1);
2475   }
2476   /* we are doing it by global cache mutex protection, so it is OK */
2477   if (block->wlocks)
2478   {
2479     DBUG_ASSERT(pthread_equal(block->write_locker, locker));
2480     block->rlocks_queue++;
2481     DBUG_PRINT("info", ("RD lock put into queue, block %p", block));
2482   }
2483   else
2484   {
2485     block->rlocks++;
2486     DBUG_PRINT("info", ("RD lock set, block %p", block));
2487   }
2488   DBUG_RETURN(0);
2489 }
2490 
2491 
2492 /*
2493   @brief Remove write lock from the block
2494 
2495   @param pagecache       pointer to a page cache data structure
2496   @param block           the block to work with
2497   @param read_lock       downgrade to read lock
2498 
2499   @note see note for get_wrlock().
2500 */
2501 
2502 static void release_wrlock(PAGECACHE_BLOCK_LINK *block, my_bool read_lock)
2503 {
2504   DBUG_ENTER("release_wrlock");
2505   PCBLOCK_INFO(block);
2506   DBUG_ASSERT(block->wlocks > 0);
2507   DBUG_ASSERT(block->rlocks == 0);
2508   DBUG_ASSERT(block->pins > 0);
2509   if (read_lock)
2510     block->rlocks_queue++;
2511   if (block->wlocks == 1)
2512   {
2513     block->rlocks= block->rlocks_queue;
2514     block->rlocks_queue= 0;
2515   }
2516   block->wlocks--;
2517   if (block->wlocks > 0)
2518     DBUG_VOID_RETURN;                      /* Multiple write locked */
2519   DBUG_PRINT("info", ("WR lock reset, block %p", block));
2520   /* release all threads waiting for read lock or one waiting for write */
2521   if (block->wqueue[COND_FOR_WRLOCK].last_thread)
2522     wqueue_release_one_locktype_from_queue(&block->wqueue[COND_FOR_WRLOCK]);
2523   PCBLOCK_INFO(block);
2524   DBUG_VOID_RETURN;
2525 }
2526 
2527 /*
2528   @brief Remove read lock from the block
2529 
2530   @param pagecache       pointer to a page cache data structure
2531   @param block           the block to work with
2532 
2533   @note see note for get_wrlock().
2534 */
2535 
2536 static void release_rdlock(PAGECACHE_BLOCK_LINK *block)
2537 {
2538   DBUG_ENTER("release_wrlock");
2539   PCBLOCK_INFO(block);
2540   if (block->wlocks)
2541   {
2542     DBUG_ASSERT(pthread_equal(block->write_locker, pthread_self()));
2543     DBUG_ASSERT(block->rlocks == 0);
2544     DBUG_ASSERT(block->rlocks_queue > 0);
2545     block->rlocks_queue--;
2546     DBUG_PRINT("info", ("RD lock queue decreased, block %p", block));
2547     DBUG_VOID_RETURN;
2548   }
2549   DBUG_ASSERT(block->rlocks > 0);
2550   DBUG_ASSERT(block->rlocks_queue == 0);
2551   block->rlocks--;
2552   DBUG_PRINT("info", ("RD lock decreased, block %p", block));
2553   if (block->rlocks > 0)
2554     DBUG_VOID_RETURN;                      /* Multiple write locked */
2555   DBUG_PRINT("info", ("RD lock reset, block %p", block));
2556   /* release all threads waiting for read lock or one waiting for write */
2557   if (block->wqueue[COND_FOR_WRLOCK].last_thread)
2558     wqueue_release_one_locktype_from_queue(&block->wqueue[COND_FOR_WRLOCK]);
2559   PCBLOCK_INFO(block);
2560   DBUG_VOID_RETURN;
2561 }
2562 
2563 /**
2564   @brief Try to lock/unlock and pin/unpin the block
2565 
2566   @param pagecache       pointer to a page cache data structure
2567   @param block           the block to work with
2568   @param lock            lock change mode
2569   @param pin             pinchange mode
2570   @param file            File handler requesting pin
2571   @param any             allow unpinning block pinned by any thread; possible
2572                          only if not locked, see pagecache_unlock_by_link()
2573 
2574   @retval 0 OK
2575   @retval 1 Try to lock the block failed
2576 */
2577 
2578 static my_bool make_lock_and_pin(PAGECACHE *pagecache,
2579                                  PAGECACHE_BLOCK_LINK *block,
2580                                  enum pagecache_page_lock lock,
2581                                  enum pagecache_page_pin pin,
2582                                  my_bool any)
2583 {
2584   DBUG_ENTER("make_lock_and_pin");
2585   DBUG_PRINT("enter", ("block: %p (%u)  lock: %s  pin: %s any %d",
2586                        block, PCBLOCK_NUMBER(pagecache, block),
2587                        page_cache_page_lock_str[lock],
2588                        page_cache_page_pin_str[pin], (int)any));
2589   PCBLOCK_INFO(block);
2590 
2591   DBUG_ASSERT(block);
2592   DBUG_ASSERT(!any ||
2593               ((lock == PAGECACHE_LOCK_LEFT_UNLOCKED) &&
2594                (pin == PAGECACHE_UNPIN)));
2595   DBUG_ASSERT(block->hash_link->block == block);
2596 
2597   switch (lock) {
2598   case PAGECACHE_LOCK_WRITE:               /* free  -> write */
2599     /* Writelock and pin the buffer */
2600     if (get_wrlock(pagecache, block))
2601     {
2602       /* Couldn't lock because block changed status => need retry */
2603       goto retry;
2604     }
2605 
2606     /* The cache is locked so nothing afraid of */
2607     add_pin(block);
2608     info_add_lock(block, 1);
2609     break;
2610   case PAGECACHE_LOCK_WRITE_TO_READ:       /* write -> read  */
2611   case PAGECACHE_LOCK_WRITE_UNLOCK:        /* write -> free  */
2612     /* Removes write lock and puts read lock */
2613     release_wrlock(block, lock == PAGECACHE_LOCK_WRITE_TO_READ);
2614     /* fall through */
2615   case PAGECACHE_LOCK_READ_UNLOCK:         /* read  -> free  */
2616     if (lock == PAGECACHE_LOCK_READ_UNLOCK)
2617       release_rdlock(block);
2618     /* fall through */
2619   case PAGECACHE_LOCK_LEFT_READLOCKED:     /* read  -> read  */
2620     if (pin == PAGECACHE_UNPIN)
2621     {
2622       remove_pin(block, FALSE);
2623     }
2624     if (lock == PAGECACHE_LOCK_WRITE_TO_READ)
2625     {
2626       info_change_lock(block, 0);
2627     }
2628     else if (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
2629              lock == PAGECACHE_LOCK_READ_UNLOCK)
2630     {
2631       info_remove_lock(block);
2632     }
2633     break;
2634   case PAGECACHE_LOCK_READ:                /* free  -> read  */
2635     if (get_rdlock(pagecache, block))
2636     {
2637       /* Couldn't lock because block changed status => need retry */
2638       goto retry;
2639     }
2640 
2641     if (pin == PAGECACHE_PIN)
2642     {
2643       /* The cache is locked so nothing afraid off */
2644       add_pin(block);
2645     }
2646     info_add_lock(block, 0);
2647     break;
2648   case PAGECACHE_LOCK_LEFT_UNLOCKED:       /* free  -> free  */
2649     if (pin == PAGECACHE_UNPIN)
2650     {
2651       remove_pin(block, any);
2652     }
2653     /* fall through */
2654   case PAGECACHE_LOCK_LEFT_WRITELOCKED:    /* write -> write */
2655     break; /* do nothing */
2656   default:
2657     DBUG_ASSERT(0); /* Never should happened */
2658   }
2659 
2660   PCBLOCK_INFO(block);
2661   DBUG_RETURN(0);
2662 retry:
2663   DBUG_PRINT("INFO", ("Retry block %p", block));
2664   PCBLOCK_INFO(block);
2665   DBUG_ASSERT(block->hash_link->requests > 0);
2666   block->hash_link->requests--;
2667   DBUG_RETURN(1);
2668 
2669 }
2670 
2671 
2672 /*
2673   Read into a key cache block buffer from disk.
2674 
2675   SYNOPSIS
2676 
2677     read_block()
2678       pagecache           pointer to a page cache data structure
2679       block               block to which buffer the data is to be read
2680       primary             <-> the current thread will read the data
2681 
2682   RETURN VALUE
2683     None
2684 
2685   NOTES.
2686     The function either reads a page data from file to the block buffer,
2687     or waits until another thread reads it. What page to read is determined
2688     by a block parameter - reference to a hash link for this page.
2689     If an error occurs THE PCBLOCK_ERROR bit is set in the block status.
2690 
2691     On entry cache_lock is locked
2692 */
2693 
2694 static void read_block(PAGECACHE *pagecache,
2695                        PAGECACHE_BLOCK_LINK *block,
2696                        my_bool primary)
2697 {
2698   DBUG_ENTER("read_block");
2699   DBUG_PRINT("enter", ("read block: %p  primary: %d", block, primary));
2700   if (primary)
2701   {
2702     size_t error;
2703     PAGECACHE_IO_HOOK_ARGS args;
2704     /*
2705       This code is executed only by threads
2706       that submitted primary requests
2707     */
2708 
2709     pagecache->global_cache_read++;
2710     /*
2711       Page is not in buffer yet, is to be read from disk
2712       Here other threads may step in and register as secondary readers.
2713       They will register in block->wqueue[COND_FOR_REQUESTED].
2714     */
2715     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
2716     args.page= block->buffer;
2717     args.pageno= block->hash_link->pageno;
2718     args.data= block->hash_link->file.callback_data;
2719     error= (*block->hash_link->file.pre_read_hook)(&args);
2720     if (!error)
2721     {
2722       error= pagecache_fread(pagecache, &block->hash_link->file,
2723                              args.page,
2724                              block->hash_link->pageno,
2725                              pagecache->readwrite_flags);
2726     }
2727     error= (*block->hash_link->file.post_read_hook)(error != 0, &args);
2728     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
2729     if (error)
2730     {
2731       DBUG_ASSERT(maria_in_recovery || !maria_assert_if_crashed_table);
2732       block->status|= PCBLOCK_ERROR;
2733       block->error=   (int16) my_errno;
2734       my_debug_put_break_here();
2735     }
2736     else
2737     {
2738       block->status|= PCBLOCK_READ;
2739     }
2740     DBUG_PRINT("read_block",
2741                ("primary request: new page in cache"));
2742     /* Signal that all pending requests for this page now can be processed */
2743     if (block->wqueue[COND_FOR_REQUESTED].last_thread)
2744       wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
2745   }
2746   else
2747   {
2748     /*
2749       This code is executed only by threads
2750       that submitted secondary requests
2751     */
2752 
2753       struct st_my_thread_var *thread= my_thread_var;
2754       /* Put the request into a queue and wait until it can be processed */
2755       wqueue_add_to_queue(&block->wqueue[COND_FOR_REQUESTED], thread);
2756       do
2757       {
2758         DBUG_PRINT("wait",
2759                    ("suspend thread %s %ld", thread->name,
2760                     (ulong) thread->id));
2761         pagecache_pthread_cond_wait(&thread->suspend,
2762                                    &pagecache->cache_lock);
2763       }
2764       while (thread->next);
2765     DBUG_PRINT("read_block",
2766                ("secondary request: new page in cache"));
2767   }
2768   DBUG_VOID_RETURN;
2769 }
2770 
2771 
2772 /**
2773    @brief Set LSN on the page to the given one if the given LSN is bigger
2774 
2775    @param  pagecache        pointer to a page cache data structure
2776    @param  lsn              LSN to set
2777    @param  block            block to check and set
2778 */
2779 
2780 static void check_and_set_lsn(PAGECACHE *pagecache,
2781                               LSN lsn, PAGECACHE_BLOCK_LINK *block)
2782 {
2783   LSN old;
2784   DBUG_ENTER("check_and_set_lsn");
2785   /*
2786     In recovery, we can _ma_unpin_all_pages() to put a LSN on page, though
2787     page would be PAGECACHE_PLAIN_PAGE (transactionality temporarily disabled
2788     to not log REDOs).
2789   */
2790   DBUG_ASSERT((block->type == PAGECACHE_LSN_PAGE) || maria_in_recovery);
2791   old= lsn_korr(block->buffer);
2792   DBUG_PRINT("info", ("old lsn: " LSN_FMT "  new lsn: " LSN_FMT,
2793                       LSN_IN_PARTS(old), LSN_IN_PARTS(lsn)));
2794   if (cmp_translog_addr(lsn, old) > 0)
2795   {
2796 
2797     DBUG_ASSERT(block->type != PAGECACHE_READ_UNKNOWN_PAGE);
2798     lsn_store(block->buffer, lsn);
2799     /* we stored LSN in page so we dirtied it */
2800     if (!(block->status & PCBLOCK_CHANGED))
2801       link_to_changed_list(pagecache, block);
2802   }
2803   DBUG_VOID_RETURN;
2804 }
2805 
2806 
2807 /**
2808   @brief Unlock/unpin page and put LSN stamp if it need
2809 
2810   @param pagecache      pointer to a page cache data structure
2811   @pagam file           handler for the file for the block of data to be read
2812   @param pageno         number of the block of data in the file
2813   @param lock           lock change
2814   @param pin            pin page
2815   @param first_REDO_LSN_for_page do not set it if it is zero
2816   @param lsn            if it is not LSN_IMPOSSIBLE (0) and it
2817                         is bigger then LSN on the page it will be written on
2818                         the page
2819   @param was_changed    should be true if the page was write locked with
2820                         direct link giving and the page was changed
2821 
2822   @note
2823     Pininig uses requests registration mechanism it works following way:
2824                                 | beginnig    | ending        |
2825                                 | of func.    | of func.      |
2826     ----------------------------+-------------+---------------+
2827     PAGECACHE_PIN_LEFT_PINNED   |      -      |       -       |
2828     PAGECACHE_PIN_LEFT_UNPINNED | reg request | unreg request |
2829     PAGECACHE_PIN               | reg request |       -       |
2830     PAGECACHE_UNPIN             |      -      | unreg request |
2831 
2832 
2833 */
2834 
2835 void pagecache_unlock(PAGECACHE *pagecache,
2836                       PAGECACHE_FILE *file,
2837                       pgcache_page_no_t pageno,
2838                       enum pagecache_page_lock lock,
2839                       enum pagecache_page_pin pin,
2840                       LSN first_REDO_LSN_for_page,
2841                       LSN lsn, my_bool was_changed)
2842 {
2843   PAGECACHE_BLOCK_LINK *block;
2844   int page_st;
2845   DBUG_ENTER("pagecache_unlock");
2846   DBUG_PRINT("enter", ("fd: %u  page: %lu  %s  %s",
2847                        (uint) file->file, (ulong) pageno,
2848                        page_cache_page_lock_str[lock],
2849                        page_cache_page_pin_str[pin]));
2850   /* we do not allow any lock/pin increasing here */
2851   DBUG_ASSERT(pin != PAGECACHE_PIN);
2852   DBUG_ASSERT(lock != PAGECACHE_LOCK_READ && lock != PAGECACHE_LOCK_WRITE);
2853 
2854   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
2855   /*
2856     As soon as we keep lock cache can be used, and we have lock because want
2857     to unlock.
2858   */
2859   DBUG_ASSERT(pagecache->can_be_used);
2860 
2861   inc_counter_for_resize_op(pagecache);
2862   /* See NOTE for pagecache_unlock about registering requests */
2863   block= find_block(pagecache, file, pageno, 0, 0, 0,
2864                     pin == PAGECACHE_PIN_LEFT_UNPINNED, &page_st);
2865   PCBLOCK_INFO(block);
2866   DBUG_ASSERT(block != 0 && page_st == PAGE_READ);
2867   if (first_REDO_LSN_for_page)
2868   {
2869     DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK);
2870     DBUG_ASSERT(pin == PAGECACHE_UNPIN);
2871     pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
2872   }
2873   if (lsn != LSN_IMPOSSIBLE)
2874     check_and_set_lsn(pagecache, lsn, block);
2875 
2876   /* if we lock for write we must link the block to changed blocks */
2877   DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0 ||
2878               (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
2879                lock == PAGECACHE_LOCK_WRITE_TO_READ ||
2880                lock == PAGECACHE_LOCK_LEFT_WRITELOCKED));
2881   /*
2882     if was_changed then status should be PCBLOCK_DIRECT_W or marked
2883     as dirty
2884   */
2885   DBUG_ASSERT(!was_changed || (block->status & PCBLOCK_DIRECT_W) ||
2886               (block->status & PCBLOCK_CHANGED));
2887   if ((block->status & PCBLOCK_DIRECT_W) &&
2888       (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
2889        lock == PAGECACHE_LOCK_WRITE_TO_READ))
2890   {
2891     if (!(block->status & PCBLOCK_CHANGED) && was_changed)
2892       link_to_changed_list(pagecache, block);
2893     block->status&= ~PCBLOCK_DIRECT_W;
2894     DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: %p", block));
2895   }
2896 
2897   if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
2898   {
2899     DBUG_ASSERT(0); /* should not happend */
2900   }
2901 
2902   remove_reader(block);
2903   /*
2904     Link the block into the LRU chain if it's the last submitted request
2905     for the block and block will not be pinned.
2906     See NOTE for pagecache_unlock about registering requests.
2907   */
2908   if (pin != PAGECACHE_PIN_LEFT_PINNED)
2909     unreg_request(pagecache, block, 1);
2910 
2911   dec_counter_for_resize_op(pagecache);
2912 
2913   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
2914 
2915   DBUG_VOID_RETURN;
2916 }
2917 
2918 
2919 /*
2920   Unpin page
2921 
2922   SYNOPSIS
2923     pagecache_unpin()
2924     pagecache           pointer to a page cache data structure
2925     file                handler for the file for the block of data to be read
2926     pageno              number of the block of data in the file
2927     lsn                 if it is not LSN_IMPOSSIBLE (0) and it
2928                         is bigger then LSN on the page it will be written on
2929                         the page
2930 */
2931 
2932 void pagecache_unpin(PAGECACHE *pagecache,
2933                      PAGECACHE_FILE *file,
2934                      pgcache_page_no_t pageno,
2935                      LSN lsn)
2936 {
2937   PAGECACHE_BLOCK_LINK *block;
2938   int page_st;
2939   DBUG_ENTER("pagecache_unpin");
2940   DBUG_PRINT("enter", ("fd: %u  page: %lu",
2941                        (uint) file->file, (ulong) pageno));
2942   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
2943   /*
2944     As soon as we keep lock cache can be used, and we have lock bacause want
2945     aunlock.
2946   */
2947   DBUG_ASSERT(pagecache->can_be_used);
2948 
2949   inc_counter_for_resize_op(pagecache);
2950   /* See NOTE for pagecache_unlock about registering requests */
2951   block= find_block(pagecache, file, pageno, 0, 0, 0, 0, &page_st);
2952   DBUG_ASSERT(block != 0);
2953   DBUG_ASSERT(page_st == PAGE_READ);
2954   /* we can't unpin such page without unlock */
2955   DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);
2956 
2957   if (lsn != LSN_IMPOSSIBLE)
2958     check_and_set_lsn(pagecache, lsn, block);
2959 
2960   /*
2961     we can just unpin only with keeping read lock because:
2962     a) we can't pin without any lock
2963     b) we can't unpin keeping write lock
2964   */
2965   if (make_lock_and_pin(pagecache, block,
2966                         PAGECACHE_LOCK_LEFT_READLOCKED,
2967                         PAGECACHE_UNPIN, FALSE))
2968     DBUG_ASSERT(0);                           /* should not happend */
2969 
2970   remove_reader(block);
2971   /*
2972     Link the block into the LRU chain if it's the last submitted request
2973     for the block and block will not be pinned.
2974     See NOTE for pagecache_unlock about registering requests
2975   */
2976   unreg_request(pagecache, block, 1);
2977 
2978   dec_counter_for_resize_op(pagecache);
2979 
2980   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
2981 
2982   DBUG_VOID_RETURN;
2983 }
2984 
2985 
2986 /**
2987   @brief Unlock/unpin page and put LSN stamp if it need
2988   (uses direct block/page pointer)
2989 
2990   @param pagecache       pointer to a page cache data structure
2991   @param link            direct link to page (returned by read or write)
2992   @param lock            lock change
2993   @param pin             pin page
2994   @param first_REDO_LSN_for_page do not set it if it is LSN_IMPOSSIBLE (0)
2995   @param lsn             if it is not LSN_IMPOSSIBLE and it is bigger then
2996                          LSN on the page it will be written on the page
2997   @param was_changed     should be true if the page was write locked with
2998                          direct link giving and the page was changed
2999   @param any             allow unpinning block pinned by any thread; possible
3000                          only if not locked
3001 
3002   @note 'any' is a hack so that _ma_bitmap_unpin_all() is allowed to unpin
3003   non-locked bitmap pages pinned by other threads. Because it always uses
3004   PAGECACHE_LOCK_LEFT_UNLOCKED and PAGECACHE_UNPIN
3005   (see write_changed_bitmap()), the hack is limited to these conditions.
3006 */
3007 
3008 void pagecache_unlock_by_link(PAGECACHE *pagecache,
3009                               PAGECACHE_BLOCK_LINK *block,
3010                               enum pagecache_page_lock lock,
3011                               enum pagecache_page_pin pin,
3012                               LSN first_REDO_LSN_for_page,
3013                               LSN lsn, my_bool was_changed,
3014                               my_bool any)
3015 {
3016   DBUG_ENTER("pagecache_unlock_by_link");
3017   DBUG_PRINT("enter", ("block: %p  fd: %u  page: %lu  changed: %d  %s  %s",
3018                        block, (uint) block->hash_link->file.file,
3019                        (ulong) block->hash_link->pageno, was_changed,
3020                        page_cache_page_lock_str[lock],
3021                        page_cache_page_pin_str[pin]));
3022   /*
3023     We do not allow any lock/pin increasing here and page can't be
3024     unpinned because we use direct link.
3025   */
3026   DBUG_ASSERT(pin != PAGECACHE_PIN);
3027   DBUG_ASSERT(pin != PAGECACHE_PIN_LEFT_UNPINNED);
3028   DBUG_ASSERT(lock != PAGECACHE_LOCK_READ);
3029   DBUG_ASSERT(lock != PAGECACHE_LOCK_WRITE);
3030   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3031   if (pin == PAGECACHE_PIN_LEFT_UNPINNED &&
3032       lock == PAGECACHE_LOCK_READ_UNLOCK)
3033   {
3034     if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
3035       DBUG_ASSERT(0);                         /* should not happend */
3036     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3037     DBUG_VOID_RETURN;
3038   }
3039 
3040   /*
3041     As soon as we keep lock cache can be used, and we have lock because want
3042     unlock.
3043   */
3044   DBUG_ASSERT(pagecache->can_be_used);
3045 
3046   inc_counter_for_resize_op(pagecache);
3047   if (was_changed)
3048   {
3049     if (first_REDO_LSN_for_page != LSN_IMPOSSIBLE)
3050     {
3051       /*
3052         LOCK_READ_UNLOCK is ok here as the page may have first locked
3053         with WRITE lock that was temporarly converted to READ lock before
3054         it's unpinned
3055       */
3056       DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
3057                   lock == PAGECACHE_LOCK_READ_UNLOCK);
3058       DBUG_ASSERT(pin == PAGECACHE_UNPIN);
3059       pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
3060     }
3061     if (lsn != LSN_IMPOSSIBLE)
3062       check_and_set_lsn(pagecache, lsn, block);
3063     /*
3064       Reset error flag. Mark also that page is active; This may not have
3065       been the case if there was an error reading the page
3066     */
3067     block->status= (block->status & ~PCBLOCK_ERROR) | PCBLOCK_READ;
3068   }
3069 
3070   /* if we lock for write we must link the block to changed blocks */
3071   DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0 ||
3072               (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
3073                lock == PAGECACHE_LOCK_WRITE_TO_READ ||
3074                lock == PAGECACHE_LOCK_LEFT_WRITELOCKED));
3075   /*
3076     If was_changed then status should be PCBLOCK_DIRECT_W or marked
3077     as dirty
3078   */
3079   DBUG_ASSERT(!was_changed || (block->status & PCBLOCK_DIRECT_W) ||
3080               (block->status & PCBLOCK_CHANGED));
3081   if ((block->status & PCBLOCK_DIRECT_W) &&
3082       (lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
3083        lock == PAGECACHE_LOCK_WRITE_TO_READ))
3084   {
3085     if (!(block->status & PCBLOCK_CHANGED) && was_changed)
3086       link_to_changed_list(pagecache, block);
3087     block->status&= ~PCBLOCK_DIRECT_W;
3088     DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: %p", block));
3089   }
3090 
3091   if (make_lock_and_pin(pagecache, block, lock, pin, any))
3092     DBUG_ASSERT(0);                           /* should not happend */
3093 
3094   /*
3095     Link the block into the LRU chain if it's the last submitted request
3096     for the block and block will not be pinned.
3097     See NOTE for pagecache_unlock about registering requests.
3098   */
3099   if (pin != PAGECACHE_PIN_LEFT_PINNED)
3100     unreg_request(pagecache, block, 1);
3101 
3102   dec_counter_for_resize_op(pagecache);
3103 
3104   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3105 
3106   DBUG_VOID_RETURN;
3107 }
3108 
3109 
3110 /*
3111   Unpin page
3112   (uses direct block/page pointer)
3113 
3114   SYNOPSIS
3115     pagecache_unpin_by_link()
3116     pagecache           pointer to a page cache data structure
3117     link                direct link to page (returned by read or write)
3118     lsn                 if it is not LSN_IMPOSSIBLE (0) and it
3119                         is bigger then LSN on the page it will be written on
3120                         the page
3121 */
3122 
3123 void pagecache_unpin_by_link(PAGECACHE *pagecache,
3124                              PAGECACHE_BLOCK_LINK *block,
3125                              LSN lsn)
3126 {
3127   DBUG_ENTER("pagecache_unpin_by_link");
3128   DBUG_PRINT("enter", ("block: %p  fd: %u page: %lu",
3129                        block, (uint) block->hash_link->file.file,
3130                        (ulong) block->hash_link->pageno));
3131 
3132   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3133   /*
3134     As soon as we keep lock cache can be used, and we have lock because want
3135     unlock.
3136   */
3137   DBUG_ASSERT(pagecache->can_be_used);
3138   /* we can't unpin such page without unlock */
3139   DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);
3140 
3141   inc_counter_for_resize_op(pagecache);
3142 
3143   if (lsn != LSN_IMPOSSIBLE)
3144     check_and_set_lsn(pagecache, lsn, block);
3145 
3146   /*
3147     We can just unpin only with keeping read lock because:
3148     a) we can't pin without any lock
3149     b) we can't unpin keeping write lock
3150   */
3151   if (make_lock_and_pin(pagecache, block,
3152                         PAGECACHE_LOCK_LEFT_READLOCKED,
3153                         PAGECACHE_UNPIN, FALSE))
3154     DBUG_ASSERT(0); /* should not happend */
3155 
3156   /*
3157     Link the block into the LRU chain if it's the last submitted request
3158     for the block and block will not be pinned.
3159     See NOTE for pagecache_unlock about registering requests.
3160   */
3161   unreg_request(pagecache, block, 1);
3162 
3163   dec_counter_for_resize_op(pagecache);
3164 
3165   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3166 
3167   DBUG_VOID_RETURN;
3168 }
3169 
/*
  Description of how to change the lock before and after a read/write.
  The field order matters: lock_to_read[] initializes these structures
  positionally.
*/
struct rw_lock_change
{
  my_bool need_lock_change; /* need changing of lock at the end */
  enum pagecache_page_lock new_lock; /* lock at the beginning */
  enum pagecache_page_lock unlock_lock; /* lock at the end */
};
3177 
/*
  Description of how to change the pin before and after a read/write.
  The field order matters: lock_to_pin[][] initializes these structures
  positionally.
*/
struct rw_pin_change
{
  enum pagecache_page_pin new_pin; /* pin status at the beginning */
  enum pagecache_page_pin unlock_pin; /* pin status at the end */
};
3184 
3185 /**
3186   Depending on the lock which the user wants in pagecache_read(), we
3187   need to acquire a first type of lock at start of pagecache_read(), and
3188   downgrade it to a second type of lock at end. For example, if user
3189   asked for no lock (PAGECACHE_LOCK_LEFT_UNLOCKED) this translates into
3190   taking first a read lock PAGECACHE_LOCK_READ (to rightfully block on
3191   existing write locks) then read then unlock the lock i.e. change lock
3192   to PAGECACHE_LOCK_READ_UNLOCK (the "1" below tells that a change is
3193   needed).
3194 */
3195 
3196 static struct rw_lock_change lock_to_read[8]=
3197 {
3198   { /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
3199     1,
3200     PAGECACHE_LOCK_READ, PAGECACHE_LOCK_READ_UNLOCK
3201   },
3202   { /*PAGECACHE_LOCK_LEFT_READLOCKED*/
3203     0,
3204     PAGECACHE_LOCK_LEFT_READLOCKED, PAGECACHE_LOCK_LEFT_READLOCKED
3205   },
3206   { /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
3207     0,
3208     PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_LEFT_WRITELOCKED
3209   },
3210   { /*PAGECACHE_LOCK_READ*/
3211     1,
3212     PAGECACHE_LOCK_READ, PAGECACHE_LOCK_LEFT_READLOCKED
3213   },
3214   { /*PAGECACHE_LOCK_WRITE*/
3215     1,
3216     PAGECACHE_LOCK_WRITE, PAGECACHE_LOCK_LEFT_WRITELOCKED
3217   },
3218   { /*PAGECACHE_LOCK_READ_UNLOCK*/
3219     1,
3220     PAGECACHE_LOCK_LEFT_READLOCKED, PAGECACHE_LOCK_READ_UNLOCK
3221   },
3222   { /*PAGECACHE_LOCK_WRITE_UNLOCK*/
3223     1,
3224     PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_WRITE_UNLOCK
3225   },
3226   { /*PAGECACHE_LOCK_WRITE_TO_READ*/
3227     1,
3228     PAGECACHE_LOCK_LEFT_WRITELOCKED, PAGECACHE_LOCK_WRITE_TO_READ
3229   }
3230 };
3231 
3232 /**
3233   Two sets of pin modes (every as for lock upper but for pinning). The
3234   difference between sets if whether we are going to provide caller with
3235   reference on the block or not
3236 */
3237 
3238 static struct rw_pin_change lock_to_pin[2][8]=
3239 {
3240   {
3241     { /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
3242       PAGECACHE_PIN_LEFT_UNPINNED,
3243       PAGECACHE_PIN_LEFT_UNPINNED
3244     },
3245     { /*PAGECACHE_LOCK_LEFT_READLOCKED*/
3246       PAGECACHE_PIN_LEFT_UNPINNED,
3247       PAGECACHE_PIN_LEFT_UNPINNED,
3248     },
3249     { /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
3250       PAGECACHE_PIN_LEFT_PINNED,
3251       PAGECACHE_PIN_LEFT_PINNED
3252     },
3253     { /*PAGECACHE_LOCK_READ*/
3254       PAGECACHE_PIN_LEFT_UNPINNED,
3255       PAGECACHE_PIN_LEFT_UNPINNED
3256     },
3257     { /*PAGECACHE_LOCK_WRITE*/
3258       PAGECACHE_PIN,
3259       PAGECACHE_PIN_LEFT_PINNED
3260     },
3261     { /*PAGECACHE_LOCK_READ_UNLOCK*/
3262       PAGECACHE_PIN_LEFT_UNPINNED,
3263       PAGECACHE_PIN_LEFT_UNPINNED
3264     },
3265     { /*PAGECACHE_LOCK_WRITE_UNLOCK*/
3266       PAGECACHE_PIN_LEFT_PINNED,
3267       PAGECACHE_UNPIN
3268     },
3269     { /*PAGECACHE_LOCK_WRITE_TO_READ*/
3270       PAGECACHE_PIN_LEFT_PINNED,
3271       PAGECACHE_UNPIN
3272     }
3273   },
3274   {
3275     { /*PAGECACHE_LOCK_LEFT_UNLOCKED*/
3276       PAGECACHE_PIN_LEFT_UNPINNED,
3277       PAGECACHE_PIN_LEFT_UNPINNED
3278     },
3279     { /*PAGECACHE_LOCK_LEFT_READLOCKED*/
3280       PAGECACHE_PIN_LEFT_UNPINNED,
3281       PAGECACHE_PIN_LEFT_UNPINNED,
3282     },
3283     { /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/
3284       PAGECACHE_PIN_LEFT_PINNED,
3285       PAGECACHE_PIN_LEFT_PINNED
3286     },
3287     { /*PAGECACHE_LOCK_READ*/
3288       PAGECACHE_PIN,
3289       PAGECACHE_PIN_LEFT_PINNED
3290     },
3291     { /*PAGECACHE_LOCK_WRITE*/
3292       PAGECACHE_PIN,
3293       PAGECACHE_PIN_LEFT_PINNED
3294     },
3295     { /*PAGECACHE_LOCK_READ_UNLOCK*/
3296       PAGECACHE_PIN_LEFT_UNPINNED,
3297       PAGECACHE_PIN_LEFT_UNPINNED
3298     },
3299     { /*PAGECACHE_LOCK_WRITE_UNLOCK*/
3300       PAGECACHE_PIN_LEFT_PINNED,
3301       PAGECACHE_UNPIN
3302     },
3303     { /*PAGECACHE_LOCK_WRITE_TO_READ*/
3304       PAGECACHE_PIN_LEFT_PINNED,
3305       PAGECACHE_PIN_LEFT_PINNED,
3306     }
3307   }
3308 };
3309 
3310 
3311 /*
3312   @brief Read a block of data from a cached file into a buffer;
3313 
3314   @param pagecache      pointer to a page cache data structure
3315   @param file           handler for the file for the block of data to be read
3316   @param pageno         number of the block of data in the file
3317   @param level          determines the weight of the data
3318   @param buff           buffer to where the data must be placed
3319   @param type           type of the page
3320   @param lock           lock change
3321   @param link           link to the page if we pin it
3322 
3323   @return address from where the data is placed if successful, 0 - otherwise.
3324 
3325   @note Pin will be chosen according to lock parameter (see lock_to_pin)
3326 
3327   @note 'buff', if not NULL, must be long-aligned.
3328 
3329   @note  If buff==0 then we provide reference on the page so should keep the
3330   page pinned.
3331 */
3332 
3333 uchar *pagecache_read(PAGECACHE *pagecache,
3334                       PAGECACHE_FILE *file,
3335                       pgcache_page_no_t pageno,
3336                       uint level,
3337                       uchar *buff,
3338                       enum pagecache_page_type type,
3339                       enum pagecache_page_lock lock,
3340                       PAGECACHE_BLOCK_LINK **page_link)
3341 {
3342   my_bool error= 0;
3343   enum pagecache_page_pin
3344     new_pin= lock_to_pin[buff==0][lock].new_pin,
3345     unlock_pin= lock_to_pin[buff==0][lock].unlock_pin;
3346   PAGECACHE_BLOCK_LINK *fake_link;
3347   my_bool reg_request;
3348 #ifndef DBUG_OFF
3349   char llbuf[22];
3350   DBUG_ENTER("pagecache_read");
3351   DBUG_PRINT("enter", ("fd: %u  page: %s  buffer: %p  level: %u  "
3352                        "t:%s  (%d)%s->%s  %s->%s",
3353                        (uint) file->file, ullstr(pageno, llbuf),
3354                        buff, level,
3355                        page_cache_page_type_str[type],
3356                        lock_to_read[lock].need_lock_change,
3357                        page_cache_page_lock_str[lock_to_read[lock].new_lock],
3358                        page_cache_page_lock_str[lock_to_read[lock].unlock_lock],
3359                        page_cache_page_pin_str[new_pin],
3360                        page_cache_page_pin_str[unlock_pin]));
3361   DBUG_ASSERT(buff != 0 || (buff == 0 && (unlock_pin == PAGECACHE_PIN ||
3362                                           unlock_pin == PAGECACHE_PIN_LEFT_PINNED)));
3363   DBUG_ASSERT(pageno < ((1ULL) << 40));
3364 #endif
3365 
3366   if (!page_link)
3367     page_link= &fake_link;
3368   *page_link= 0;                                 /* Catch errors */
3369 
3370 restart:
3371 
3372   if (pagecache->can_be_used)
3373   {
3374     /* Key cache is used */
3375     PAGECACHE_BLOCK_LINK *block;
3376     uint status;
3377     int UNINIT_VAR(page_st);
3378 
3379     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3380     if (!pagecache->can_be_used)
3381     {
3382       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3383       goto no_key_cache;
3384     }
3385 
3386     inc_counter_for_resize_op(pagecache);
3387     pagecache->global_cache_r_requests++;
3388     /* See NOTE for pagecache_unlock about registering requests. */
3389     reg_request= ((new_pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
3390                   (new_pin == PAGECACHE_PIN));
3391     block= find_block(pagecache, file, pageno, level,
3392                       lock == PAGECACHE_LOCK_WRITE, buff != 0,
3393                       reg_request, &page_st);
3394     DBUG_PRINT("info", ("Block type: %s current type %s",
3395                         page_cache_page_type_str[block->type],
3396                         page_cache_page_type_str[type]));
3397     if (((block->status & PCBLOCK_ERROR) == 0) && (page_st != PAGE_READ))
3398     {
3399       /* The requested page is to be read into the block buffer */
3400       read_block(pagecache, block,
3401                  (my_bool)(page_st == PAGE_TO_BE_READ));
3402       DBUG_PRINT("info", ("read is done"));
3403     }
3404     /*
3405       Assert after block is read. Imagine two concurrent SELECTs on same
3406       table (thread1 and 2), which want to pagecache_read() the same
3407       pageno/fileno. Thread1 calls find_block(), decides to evict a dirty
3408       page from LRU; while it's writing this dirty page to disk, it is
3409       pre-empted and thread2 runs its find_block(), gets the block (in
3410       PAGE_TO_BE_READ state). This block is still containing the in-eviction
3411       dirty page so has an its type, which cannot be tested.
3412       So thread2 has to wait for read_block() to finish (when it wakes up in
3413       read_block(), it's woken up by read_block() of thread1, which implies
3414       that block's type was set to EMPTY by thread1 as part of find_block()).
3415     */
3416     DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
3417                 block->type == type ||
3418                 type == PAGECACHE_LSN_PAGE ||
3419                 type == PAGECACHE_READ_UNKNOWN_PAGE ||
3420                 block->type == PAGECACHE_READ_UNKNOWN_PAGE);
3421     if (type != PAGECACHE_READ_UNKNOWN_PAGE ||
3422         block->type == PAGECACHE_EMPTY_PAGE)
3423       block->type= type;
3424 
3425     if (make_lock_and_pin(pagecache, block, lock_to_read[lock].new_lock,
3426                           new_pin, FALSE))
3427     {
3428       /*
3429         We failed to write lock the block, cache is unlocked,
3430         we will try to get the block again.
3431       */
3432       if (reg_request)
3433         unreg_request(pagecache, block, 1);
3434       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3435       DBUG_PRINT("info", ("restarting..."));
3436       goto restart;
3437     }
3438 
3439     status= block->status;
3440     if (!buff)
3441     {
3442       buff=  block->buffer;
3443       /* possibly we will write here (resolved on unlock) */
3444       if ((lock == PAGECACHE_LOCK_WRITE ||
3445            lock == PAGECACHE_LOCK_LEFT_WRITELOCKED))
3446       {
3447         block->status|= PCBLOCK_DIRECT_W;
3448         DBUG_PRINT("info", ("Set PCBLOCK_DIRECT_W for block: %p", block));
3449       }
3450     }
3451     else
3452     {
3453       if (status & PCBLOCK_READ)
3454       {
3455 #if !defined(SERIALIZED_READ_FROM_CACHE)
3456         pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3457 #endif
3458 
3459         DBUG_ASSERT((pagecache->block_size & 511) == 0);
3460         /* Copy data from the cache buffer */
3461         memcpy(buff, block->buffer, pagecache->block_size);
3462 
3463 #if !defined(SERIALIZED_READ_FROM_CACHE)
3464         pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3465 #endif
3466       }
3467     }
3468 
3469     remove_reader(block);
3470     if (lock_to_read[lock].need_lock_change)
3471     {
3472       if (make_lock_and_pin(pagecache, block,
3473                             lock_to_read[lock].unlock_lock,
3474                             unlock_pin, FALSE))
3475       {
3476         pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3477         DBUG_ASSERT(0);
3478         return (uchar*) 0;
3479       }
3480     }
3481     /*
3482       Link the block into the LRU chain if it's the last submitted request
3483       for the block and block will not be pinned.
3484       See NOTE for pagecache_unlock about registering requests.
3485     */
3486     if (unlock_pin == PAGECACHE_PIN_LEFT_UNPINNED ||
3487         unlock_pin == PAGECACHE_UNPIN)
3488       unreg_request(pagecache, block, 1);
3489     else
3490       *page_link= block;
3491 
3492     dec_counter_for_resize_op(pagecache);
3493 
3494     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3495 
3496     if (status & PCBLOCK_ERROR)
3497     {
3498       my_errno= block->error;
3499       DBUG_ASSERT(my_errno != 0);
3500       DBUG_PRINT("error", ("Got error %d when doing page read", my_errno));
3501       DBUG_RETURN((uchar *) 0);
3502     }
3503 
3504     DBUG_RETURN(buff);
3505   }
3506 
3507 no_key_cache:					/* Key cache is not used */
3508 
3509   /* We can't use mutex here as the key cache may not be initialized */
3510   pagecache->global_cache_r_requests++;
3511   pagecache->global_cache_read++;
3512 
3513   {
3514     PAGECACHE_IO_HOOK_ARGS args;
3515     args.page= buff;
3516     args.pageno= pageno;
3517     args.data= file->callback_data;
3518     error= (* file->pre_read_hook)(&args);
3519     if (!error)
3520     {
3521       error= pagecache_fread(pagecache, file, args.page, pageno,
3522                              pagecache->readwrite_flags) != 0;
3523     }
3524     error= (* file->post_read_hook)(error, &args);
3525   }
3526 
3527   DBUG_RETURN(error ? (uchar*) 0 : buff);
3528 }
3529 
3530 
3531 /*
3532   @brief Set/reset flag that page always should be flushed on delete
3533 
3534   @param pagecache      pointer to a page cache data structure
3535   @param link           direct link to page (returned by read or write)
3536   @param write          write on delete flag value
3537 
3538 */
3539 
3540 void pagecache_set_write_on_delete_by_link(PAGECACHE_BLOCK_LINK *block)
3541 {
3542   DBUG_ENTER("pagecache_set_write_on_delete_by_link");
3543   DBUG_PRINT("enter", ("fd: %d block %p  %d -> TRUE",
3544                        block->hash_link->file.file,
3545                        block, (int) block->status & PCBLOCK_DEL_WRITE));
3546   DBUG_ASSERT(block->pins); /* should be pinned */
3547   DBUG_ASSERT(block->wlocks); /* should be write locked */
3548 
3549   block->status|= PCBLOCK_DEL_WRITE;
3550 
3551   DBUG_VOID_RETURN;
3552 }
3553 
3554 
/*
  @brief Delete page from the buffer (common part for link and file/page)

  @param pagecache      pointer to a page cache data structure
  @param block          direct link to page (returned by read or write)
  @param page_link      hash link of the block
  @param flush          flush page if it is dirty

  @retval 0 deleted or was not present at all
  @retval 1 error

  @note The callers in this file invoke it with pagecache->cache_lock held.
  The lock is released only around the pagecache_fwrite() call and is held
  again when the function returns.

  @note On every path the function releases the write lock and the pin of
  the block, decrements page_link->requests and calls
  dec_counter_for_resize_op(). On success the block is given to
  free_block(); on the 'out' path the request is only unregistered and the
  block stays in the cache.

  @note If the block is being flushed (PCBLOCK_IN_FLUSH) nothing is deleted
  but 0 is still returned.
*/

static my_bool pagecache_delete_internal(PAGECACHE *pagecache,
                                         PAGECACHE_BLOCK_LINK *block,
                                         PAGECACHE_HASH_LINK *page_link,
                                         my_bool flush)
{
  my_bool error= 0;
  if (block->status & PCBLOCK_IN_FLUSH)
  {
    /*
      This call is just a 'hint' for the cache to free the page, so we will
      not interfere with the flushing process but must return success.
    */
    goto out;
  }
  if (block->status & PCBLOCK_CHANGED)
  {
    /* A block marked PCBLOCK_DEL_WRITE is written even if flush is false */
    flush= (flush || (block->status & PCBLOCK_DEL_WRITE));
    if (flush)
    {
      /* The block contains a dirty page - push it out of the cache */

      KEYCACHE_DBUG_PRINT("find_block", ("block is dirty"));

      /*
        The call is thread safe because only the current
        thread might change the block->hash_link value
      */
      DBUG_ASSERT(block->pins == 1);
      /* Do the disk write without holding the cache mutex */
      pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
      error= pagecache_fwrite(pagecache,
                              &block->hash_link->file,
                              block->buffer,
                              block->hash_link->pageno,
                              block->type,
                              pagecache->readwrite_flags);
      pagecache_pthread_mutex_lock(&pagecache->cache_lock);
      pagecache->global_cache_write++;

      if (error)
      {
        /* Remember the failure in the block; the page is not freed */
        block->status|= PCBLOCK_ERROR;
        block->error=   (int16) my_errno;
        my_debug_put_break_here();
        goto out;
      }
    }
    else
    {
      PAGECACHE_IO_HOOK_ARGS args;
      PAGECACHE_FILE *filedesc= &block->hash_link->file;
      args.page= block->buffer;
      args.pageno= block->hash_link->pageno;
      args.data= filedesc->callback_data;
      /* We are not going to write the page but have to call callbacks */
      DBUG_PRINT("info", ("flush_callback: %p  data: %p",
                          filedesc->flush_log_callback,
                          filedesc->callback_data));
      if ((*filedesc->flush_log_callback)(&args))
      {
        DBUG_PRINT("error", ("flush or write callback problem"));
        error= 1;
        goto out;
      }
    }
    /* The page no longer counts as changed */
    pagecache->blocks_changed--;
    pagecache->global_blocks_changed--;
    /*
      free_block() will change the status and rec_lsn of the block so no
      need to change them here.
    */
  }
  /* Cache is locked, so we can release the page before freeing it */
  if (make_lock_and_pin(pagecache, block,
                        PAGECACHE_LOCK_WRITE_UNLOCK,
                        PAGECACHE_UNPIN, FALSE))
    DBUG_ASSERT(0);
  DBUG_ASSERT(block->hash_link->requests > 0);
  page_link->requests--;
  /* See NOTE for pagecache_unlock() about registering requests. */
  free_block(pagecache, block, 0);
  dec_counter_for_resize_op(pagecache);
  return 0;

out:
  /*
    The page is not freed on this path: release the write lock and the pin
    (cache is locked), then drop the requests taken for this delete.
  */
  if (make_lock_and_pin(pagecache, block,
                        PAGECACHE_LOCK_WRITE_UNLOCK,
                        PAGECACHE_UNPIN, FALSE))
    DBUG_ASSERT(0);
  page_link->requests--;
  unreg_request(pagecache, block, 1);
  dec_counter_for_resize_op(pagecache);
  return error;
}
3662 
3663 
3664 /*
3665   @brief Delete page from the buffer by link
3666 
3667   @param pagecache      pointer to a page cache data structure
3668   @param link           direct link to page (returned by read or write)
3669   @param lock           lock change
3670   @param flush          flush page if it is dirty
3671 
3672   @retval 0 deleted or was not present at all
3673   @retval 1 error
3674 
3675   @note lock  can be only PAGECACHE_LOCK_LEFT_WRITELOCKED (page was
3676   write locked before) or PAGECACHE_LOCK_WRITE (delete will write
3677   lock page before delete)
3678 */
3679 
3680 my_bool pagecache_delete_by_link(PAGECACHE *pagecache,
3681                                  PAGECACHE_BLOCK_LINK *block,
3682                                  enum pagecache_page_lock lock,
3683                                  my_bool flush)
3684 {
3685   my_bool error= 0;
3686   enum pagecache_page_pin pin= PAGECACHE_PIN_LEFT_PINNED;
3687   DBUG_ENTER("pagecache_delete_by_link");
3688   DBUG_PRINT("enter", ("fd: %d block %p  %s  %s",
3689                        block->hash_link->file.file,
3690                        block,
3691                        page_cache_page_lock_str[lock],
3692                        page_cache_page_pin_str[pin]));
3693   DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE ||
3694               lock == PAGECACHE_LOCK_LEFT_WRITELOCKED);
3695   DBUG_ASSERT(block->pins != 0); /* should be pinned */
3696 
3697   if (pagecache->can_be_used)
3698   {
3699     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3700     if (!pagecache->can_be_used)
3701       goto end;
3702 
3703     /*
3704       This block should be pinned (i.e. has not zero request counter) =>
3705       Such block can't be chosen for eviction.
3706     */
3707     DBUG_ASSERT((block->status &
3708                  (PCBLOCK_IN_SWITCH | PCBLOCK_REASSIGNED)) == 0);
3709 
3710     inc_counter_for_resize_op(pagecache);
3711     /*
3712       make_lock_and_pin() can't fail here, because we are keeping pin on the
3713       block and it can't be evicted (which is cause of lock fail and retry)
3714     */
3715     if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
3716       DBUG_ASSERT(0);
3717 
3718     /*
3719       get_present_hash_link() side effect emulation before call
3720       pagecache_delete_internal()
3721     */
3722     block->hash_link->requests++;
3723 
3724     error= pagecache_delete_internal(pagecache, block, block->hash_link,
3725                                      flush);
3726 end:
3727     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3728   }
3729 
3730   DBUG_RETURN(error);
3731 }
3732 
3733 
3734 /**
3735   @brief Returns "hits" for promotion
3736 
3737   @return "hits" for promotion
3738 */
3739 
3740 uint pagecache_pagelevel(PAGECACHE_BLOCK_LINK *block)
3741 {
3742   return block->hits_left;
3743 }
3744 
3745 /*
3746   @brief Adds "hits" to the page
3747 
3748   @param link           direct link to page (returned by read or write)
3749   @param level          number of "hits" which we add to the page
3750 */
3751 
3752 void pagecache_add_level_by_link(PAGECACHE_BLOCK_LINK *block,
3753                                  uint level)
3754 {
3755   DBUG_ASSERT(block->pins != 0); /* should be pinned */
3756   /*
3757     Operation is just for statistics so it is not really important
3758     if it interfere with other hit increasing => we are doing it without
3759     locking the pagecache.
3760   */
3761   block->hits_left+= level;
3762 }
3763 
3764 /*
3765   @brief Delete page from the buffer
3766 
3767   @param pagecache      pointer to a page cache data structure
3768   @param file           handler for the file for the block of data to be read
3769   @param pageno         number of the block of data in the file
3770   @param lock           lock change
3771   @param flush          flush page if it is dirty
3772 
3773   @retval 0 deleted or was not present at all
3774   @retval 1 error
3775 
3776   @note lock  can be only PAGECACHE_LOCK_LEFT_WRITELOCKED (page was
3777   write locked before) or PAGECACHE_LOCK_WRITE (delete will write
3778   lock page before delete)
3779 */
3780 static enum pagecache_page_pin lock_to_pin_one_phase[8]=
3781 {
3782   PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
3783   PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
3784   PAGECACHE_PIN_LEFT_PINNED   /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
3785   PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ*/,
3786   PAGECACHE_PIN               /*PAGECACHE_LOCK_WRITE*/,
3787   PAGECACHE_PIN_LEFT_UNPINNED /*PAGECACHE_LOCK_READ_UNLOCK*/,
3788   PAGECACHE_UNPIN             /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
3789   PAGECACHE_UNPIN             /*PAGECACHE_LOCK_WRITE_TO_READ*/
3790 };
3791 
3792 my_bool pagecache_delete(PAGECACHE *pagecache,
3793                          PAGECACHE_FILE *file,
3794                          pgcache_page_no_t pageno,
3795                          enum pagecache_page_lock lock,
3796                          my_bool flush)
3797 {
3798   my_bool error= 0;
3799   enum pagecache_page_pin pin= lock_to_pin_one_phase[lock];
3800   DBUG_ENTER("pagecache_delete");
3801   DBUG_PRINT("enter", ("fd: %u  page: %lu  %s  %s",
3802                        (uint) file->file, (ulong) pageno,
3803                        page_cache_page_lock_str[lock],
3804                        page_cache_page_pin_str[pin]));
3805   DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE ||
3806               lock == PAGECACHE_LOCK_LEFT_WRITELOCKED);
3807   DBUG_ASSERT(pin == PAGECACHE_PIN ||
3808               pin == PAGECACHE_PIN_LEFT_PINNED);
3809 restart:
3810 
3811   DBUG_ASSERT(pageno < ((1ULL) << 40));
3812   if (pagecache->can_be_used)
3813   {
3814     /* Key cache is used */
3815     reg1 PAGECACHE_BLOCK_LINK *block;
3816     PAGECACHE_HASH_LINK **unused_start, *page_link;
3817 
3818     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
3819     if (!pagecache->can_be_used)
3820       goto end;
3821 
3822     inc_counter_for_resize_op(pagecache);
3823     page_link= get_present_hash_link(pagecache, file, pageno, &unused_start);
3824     if (!page_link)
3825     {
3826       DBUG_PRINT("info", ("There is no such page in the cache"));
3827       dec_counter_for_resize_op(pagecache);
3828       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3829       DBUG_RETURN(0);
3830     }
3831     block= page_link->block;
3832     if (block->status & (PCBLOCK_REASSIGNED | PCBLOCK_IN_SWITCH))
3833     {
3834       DBUG_PRINT("info", ("Block %p already is %s",
3835                           block,
3836                           ((block->status & PCBLOCK_REASSIGNED) ?
3837                            "reassigned" : "in switch")));
3838       PCBLOCK_INFO(block);
3839       page_link->requests--;
3840       dec_counter_for_resize_op(pagecache);
3841       goto end;
3842     }
3843     /* See NOTE for pagecache_unlock about registering requests. */
3844     if (pin == PAGECACHE_PIN)
3845       reg_requests(pagecache, block, 1);
3846     if (make_lock_and_pin(pagecache, block, lock, pin, FALSE))
3847     {
3848       /*
3849         We failed to writelock the block, cache is unlocked, and last write
3850         lock is released, we will try to get the block again.
3851       */
3852       if (pin == PAGECACHE_PIN)
3853         unreg_request(pagecache, block, 1);
3854       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3855       DBUG_PRINT("info", ("restarting..."));
3856       goto restart;
3857     }
3858 
3859     /* we can't delete with opened direct link for write */
3860     DBUG_ASSERT((block->status & PCBLOCK_DIRECT_W) == 0);
3861 
3862     error= pagecache_delete_internal(pagecache, block, page_link, flush);
3863 end:
3864     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
3865   }
3866 
3867   DBUG_RETURN(error);
3868 }
3869 
3870 
3871 my_bool pagecache_delete_pages(PAGECACHE *pagecache,
3872                                PAGECACHE_FILE *file,
3873                                pgcache_page_no_t pageno,
3874                                uint page_count,
3875                                enum pagecache_page_lock lock,
3876                                my_bool flush)
3877 {
3878   pgcache_page_no_t page_end;
3879   DBUG_ENTER("pagecache_delete_pages");
3880   DBUG_ASSERT(page_count > 0);
3881 
3882   page_end= pageno + page_count;
3883   do
3884   {
3885     if (pagecache_delete(pagecache, file, pageno,
3886                          lock, flush))
3887       DBUG_RETURN(1);
3888   } while (++pageno != page_end);
3889   DBUG_RETURN(0);
3890 }
3891 
3892 
3893 /**
3894   @brief Writes a buffer into a cached file.
3895 
3896   @param pagecache       pointer to a page cache data structure
3897   @param file            handler for the file to write data to
3898   @param pageno          number of the block of data in the file
3899   @param level           determines the weight of the data
3900   @param buff            buffer with the data
3901   @param type            type of the page
3902   @param lock            lock change
3903   @param pin             pin page
3904   @param write_mode      how to write page
3905   @param link            link to the page if we pin it
3906   @param first_REDO_LSN_for_page the lsn to set rec_lsn
3907   @param offset          offset in the page
3908   @param size            size of data
3909   @param validator       read page validator
3910   @param validator_data  the validator data
3911 
3912   @retval 0 if a success.
3913   @retval 1 Error.
3914 */
3915 
3916 static struct rw_lock_change write_lock_change_table[]=
3917 {
3918   {1,
3919    PAGECACHE_LOCK_WRITE,
3920    PAGECACHE_LOCK_WRITE_UNLOCK} /*PAGECACHE_LOCK_LEFT_UNLOCKED*/,
3921   {0, /*unsupported (we can't write having the block read locked) */
3922    PAGECACHE_LOCK_LEFT_UNLOCKED,
3923    PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_LEFT_READLOCKED*/,
3924   {0, PAGECACHE_LOCK_LEFT_WRITELOCKED, 0} /*PAGECACHE_LOCK_LEFT_WRITELOCKED*/,
3925   {1,
3926    PAGECACHE_LOCK_WRITE,
3927    PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_READ*/,
3928   {0, PAGECACHE_LOCK_WRITE, 0} /*PAGECACHE_LOCK_WRITE*/,
3929   {0, /*unsupported (we can't write having the block read locked) */
3930    PAGECACHE_LOCK_LEFT_UNLOCKED,
3931    PAGECACHE_LOCK_LEFT_UNLOCKED} /*PAGECACHE_LOCK_READ_UNLOCK*/,
3932   {1,
3933    PAGECACHE_LOCK_LEFT_WRITELOCKED,
3934    PAGECACHE_LOCK_WRITE_UNLOCK } /*PAGECACHE_LOCK_WRITE_UNLOCK*/,
3935   {1,
3936    PAGECACHE_LOCK_LEFT_WRITELOCKED,
3937    PAGECACHE_LOCK_WRITE_TO_READ} /*PAGECACHE_LOCK_WRITE_TO_READ*/
3938 };
3939 
3940 
3941 static struct rw_pin_change write_pin_change_table[]=
3942 {
3943   {PAGECACHE_PIN_LEFT_PINNED,
3944    PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN_LEFT_PINNED*/,
3945   {PAGECACHE_PIN,
3946    PAGECACHE_UNPIN} /*PAGECACHE_PIN_LEFT_UNPINNED*/,
3947   {PAGECACHE_PIN,
3948    PAGECACHE_PIN_LEFT_PINNED} /*PAGECACHE_PIN*/,
3949   {PAGECACHE_PIN_LEFT_PINNED,
3950    PAGECACHE_UNPIN} /*PAGECACHE_UNPIN*/
3951 };
3952 
3953 
3954 /**
3955   @note 'buff', if not NULL, must be long-aligned.
3956 */
3957 
3958 my_bool pagecache_write_part(PAGECACHE *pagecache,
3959                              PAGECACHE_FILE *file,
3960                              pgcache_page_no_t pageno,
3961                              uint level,
3962                              uchar *buff,
3963                              enum pagecache_page_type type,
3964                              enum pagecache_page_lock lock,
3965                              enum pagecache_page_pin pin,
3966                              enum pagecache_write_mode write_mode,
3967                              PAGECACHE_BLOCK_LINK **page_link,
3968                              LSN first_REDO_LSN_for_page,
3969                              uint offset, uint size)
3970 {
3971   PAGECACHE_BLOCK_LINK *block= NULL;
3972   PAGECACHE_BLOCK_LINK *fake_link;
3973   my_bool error= 0;
3974   int need_lock_change= write_lock_change_table[lock].need_lock_change;
3975   my_bool reg_request;
3976 #ifndef DBUG_OFF
3977   char llbuf[22];
3978   DBUG_ENTER("pagecache_write_part");
3979   DBUG_PRINT("enter", ("fd: %u  page: %s  level: %u  type: %s  lock: %s  "
3980                        "pin: %s   mode: %s  offset: %u  size %u",
3981                        (uint) file->file, ullstr(pageno, llbuf), level,
3982                        page_cache_page_type_str[type],
3983                        page_cache_page_lock_str[lock],
3984                        page_cache_page_pin_str[pin],
3985                        page_cache_page_write_mode_str[write_mode],
3986                        offset, size));
3987   DBUG_ASSERT(type != PAGECACHE_READ_UNKNOWN_PAGE);
3988   DBUG_ASSERT(lock != PAGECACHE_LOCK_LEFT_READLOCKED);
3989   DBUG_ASSERT(lock != PAGECACHE_LOCK_READ_UNLOCK);
3990   DBUG_ASSERT(offset + size <= pagecache->block_size);
3991   DBUG_ASSERT(pageno < ((1ULL) << 40));
3992 #endif
3993 
3994   if (!page_link)
3995     page_link= &fake_link;
3996   *page_link= 0;
3997 
3998 restart:
3999 
4000 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
4001   DBUG_EXECUTE("check_pagecache",
4002                test_key_cache(pagecache, "start of key_cache_write", 1););
4003 #endif
4004 
4005   if (pagecache->can_be_used)
4006   {
4007     /* Key cache is used */
4008     int page_st;
4009     my_bool need_page_ready_signal= FALSE;
4010 
4011     pagecache_pthread_mutex_lock(&pagecache->cache_lock);
4012     if (!pagecache->can_be_used)
4013     {
4014       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4015       goto no_key_cache;
4016     }
4017 
4018     inc_counter_for_resize_op(pagecache);
4019     pagecache->global_cache_w_requests++;
4020     /*
4021       Here we register a request if the page was not already pinned.
4022       See NOTE for pagecache_unlock about registering requests.
4023     */
4024     reg_request= ((pin == PAGECACHE_PIN_LEFT_UNPINNED) ||
4025                   (pin == PAGECACHE_PIN));
4026     block= find_block(pagecache, file, pageno, level,
4027                       TRUE, FALSE,
4028                       reg_request, &page_st);
4029     if (!block)
4030     {
4031       DBUG_ASSERT(write_mode != PAGECACHE_WRITE_DONE);
4032       /* It happens only for requests submitted during resize operation */
4033       dec_counter_for_resize_op(pagecache);
4034       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4035       /* Write to the disk key cache is in resize at the moment*/
4036       goto no_key_cache;
4037     }
4038     DBUG_PRINT("info", ("page status: %d", page_st));
4039     if (!(block->status & PCBLOCK_ERROR) &&
4040         ((page_st == PAGE_TO_BE_READ &&
4041           (offset || size < pagecache->block_size)) ||
4042          (page_st == PAGE_WAIT_TO_BE_READ)))
4043     {
4044       /* The requested page is to be read into the block buffer */
4045       read_block(pagecache, block,
4046                  (my_bool)(page_st == PAGE_TO_BE_READ));
4047       DBUG_PRINT("info", ("read is done"));
4048     }
4049     else if (page_st == PAGE_TO_BE_READ)
4050     {
4051       need_page_ready_signal= TRUE;
4052     }
4053 
4054     DBUG_ASSERT(block->type == PAGECACHE_EMPTY_PAGE ||
4055                 block->type == PAGECACHE_READ_UNKNOWN_PAGE ||
4056                 block->type == type ||
4057                 /* this is for when going to non-trans to trans */
4058                 (block->type == PAGECACHE_PLAIN_PAGE &&
4059                  type == PAGECACHE_LSN_PAGE));
4060     block->type= type;
4061     /* we write to the page so it has no sense to keep the flag */
4062     block->status&= ~PCBLOCK_DIRECT_W;
4063     DBUG_PRINT("info", ("Drop PCBLOCK_DIRECT_W for block: %p", block));
4064 
4065     if (make_lock_and_pin(pagecache, block,
4066                           write_lock_change_table[lock].new_lock,
4067                           (need_lock_change ?
4068                            write_pin_change_table[pin].new_pin :
4069                            pin), FALSE))
4070     {
4071       /*
4072         We failed to writelock the block, cache is unlocked, and last write
4073         lock is released, we will try to get the block again.
4074       */
4075       if (reg_request)
4076         unreg_request(pagecache, block, 1);
4077       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4078       DBUG_PRINT("info", ("restarting..."));
4079       goto restart;
4080     }
4081 
4082     if (write_mode == PAGECACHE_WRITE_DONE)
4083     {
4084       if (block->status & PCBLOCK_ERROR)
4085       {
4086         my_debug_put_break_here();
4087         DBUG_PRINT("warning", ("Writing on page with error"));
4088       }
4089       else
4090       {
4091         /* Copy data from buff */
4092         memcpy(block->buffer + offset, buff, size);
4093         block->status= PCBLOCK_READ;
4094         KEYCACHE_DBUG_PRINT("key_cache_insert",
4095                             ("Page injection"));
4096         /* Signal that all pending requests for this now can be processed. */
4097         if (block->wqueue[COND_FOR_REQUESTED].last_thread)
4098           wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
4099       }
4100     }
4101     else
4102     {
4103       if (! (block->status & PCBLOCK_CHANGED))
4104           link_to_changed_list(pagecache, block);
4105 
4106       memcpy(block->buffer + offset, buff, size);
4107       block->status|= PCBLOCK_READ;
4108       /* Page is correct again if we made a full write in it */
4109       if (size == pagecache->block_size)
4110         block->status&= ~PCBLOCK_ERROR;
4111     }
4112 
4113     if (need_page_ready_signal &&
4114         block->wqueue[COND_FOR_REQUESTED].last_thread)
4115       wqueue_release_queue(&block->wqueue[COND_FOR_REQUESTED]);
4116 
4117     if (first_REDO_LSN_for_page)
4118     {
4119       /* single write action of the last write action */
4120       DBUG_ASSERT(lock == PAGECACHE_LOCK_WRITE_UNLOCK ||
4121                   lock == PAGECACHE_LOCK_LEFT_UNLOCKED);
4122       DBUG_ASSERT(pin == PAGECACHE_UNPIN ||
4123                   pin == PAGECACHE_PIN_LEFT_UNPINNED);
4124       pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
4125     }
4126 
4127     if (need_lock_change)
4128     {
4129       /*
4130         We don't set rec_lsn of the block; this is ok as for the
4131         Maria-block-record's pages, we always keep pages pinned here.
4132       */
4133       if (make_lock_and_pin(pagecache, block,
4134                             write_lock_change_table[lock].unlock_lock,
4135                             write_pin_change_table[pin].unlock_pin, FALSE))
4136         DBUG_ASSERT(0);
4137     }
4138 
4139     /* Unregister the request */
4140     DBUG_ASSERT(block->hash_link->requests > 0);
4141     block->hash_link->requests--;
4142     /* See NOTE for pagecache_unlock about registering requests. */
4143     if (pin == PAGECACHE_PIN_LEFT_UNPINNED || pin == PAGECACHE_UNPIN)
4144     {
4145       unreg_request(pagecache, block, 1);
4146       DBUG_ASSERT(page_link == &fake_link);
4147     }
4148     else
4149       *page_link= block;
4150 
4151     if (block->status & PCBLOCK_ERROR)
4152     {
4153       error= 1;
4154       my_debug_put_break_here();
4155     }
4156 
4157     dec_counter_for_resize_op(pagecache);
4158 
4159     pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4160 
4161     goto end;
4162   }
4163 
4164 no_key_cache:
4165   /*
4166     We can't by pass the normal page cache operations because need
4167     whole page for calling callbacks & so on.
4168     This branch should not be used for now (but it is fixed as it
4169     should be just to avoid confusing)
4170   */
4171   DBUG_ASSERT(0);
4172   /* Key cache is not used */
4173   if (write_mode == PAGECACHE_WRITE_DELAY)
4174   {
4175     /* We can't use mutex here as the key cache may not be initialized */
4176     pagecache->global_cache_w_requests++;
4177     pagecache->global_cache_write++;
4178     if (offset != 0 || size != pagecache->block_size)
4179     {
4180       uchar *page_buffer= (uchar *) alloca(pagecache->block_size);
4181       PAGECACHE_IO_HOOK_ARGS args;
4182       args.page= page_buffer;
4183       args.pageno= pageno;
4184       args.data= file->callback_data;
4185 
4186       pagecache->global_cache_read++;
4187       error= (*file->pre_read_hook)(&args);
4188       if (!error)
4189       {
4190         error= pagecache_fread(pagecache, file,
4191                                page_buffer,
4192                                pageno,
4193                                pagecache->readwrite_flags) != 0;
4194       }
4195       if ((*file->post_read_hook)(error, &args))
4196       {
4197         DBUG_PRINT("error", ("read callback problem"));
4198         error= 1;
4199         goto end;
4200       }
4201       memcpy((char *)page_buffer + offset, buff, size);
4202       buff= page_buffer;
4203     }
4204     if (pagecache_fwrite(pagecache, file, buff, pageno, type,
4205                          pagecache->readwrite_flags))
4206       error= 1;
4207   }
4208 
4209 end:
4210 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
4211   DBUG_EXECUTE("exec",
4212                test_key_cache(pagecache, "end of key_cache_write", 1););
4213 #endif
4214   if (block)
4215     PCBLOCK_INFO(block);
4216   else
4217     DBUG_PRINT("info", ("No block"));
4218   DBUG_RETURN(error);
4219 }
4220 
4221 
/**
  @brief Free a block: remove the reference to it from the hash table,
  remove it from the per-file chain of dirty/clean blocks and add it to
  the free list.

  @param pagecache        Page cache the block belongs to. The caller must
                          own pagecache->cache_lock (asserted below).
  @param block            Block to free. It must carry at least one
                          registered request, no locks and no pins
                          (asserted below); one request is unregistered
                          by this function.
  @param abort_if_pinned  If true and the block is found pinned after
                          waiting for readers, give up instead of freeing.

  @return Operation status
  @retval 0  The block was freed: either pushed onto the free list or,
             if its request counter is non-zero after unreg_request(),
             left to whoever holds that request.
  @retval 1  abort_if_pinned was set and the block was pinned; the block
             stays hashed, its PCBLOCK_REASSIGNED mark is removed, one
             request is unregistered and waiters on COND_FOR_SAVED are
             released.
*/

static my_bool free_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
                          my_bool abort_if_pinned)
{
  /* Status as it was on entry; consulted at the end of the function */
  uint status= block->status;
  KEYCACHE_THREAD_TRACE("free block");
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block: %u  hash_link %p",
                       PCBLOCK_NUMBER(pagecache, block),
                       block->hash_link));
  mysql_mutex_assert_owner(&pagecache->cache_lock);
  if (block->hash_link)
  {
    /*
      While waiting for readers to finish, new readers might request the
      block. But since we set block->status|= PCBLOCK_REASSIGNED, they
      will wait on block->wqueue[COND_FOR_SAVED]. They must be signaled
      later.
    */
    block->status|= PCBLOCK_REASSIGNED;
    wait_for_readers(pagecache, block);
    if (unlikely(abort_if_pinned) && unlikely(block->pins))
    {
      /*
        Block got pinned while waiting for readers.
        This can only happen when called from flush_pagecache_blocks_int()
        when flushing blocks as part of prepare for maria_close() or from
        flush_cached_blocks()
      */
      /* Undo the mark set above: the block remains a normal cached page */
      block->status&= ~PCBLOCK_REASSIGNED;
      unreg_request(pagecache, block, 0);

      /* All pending requests for this page must be resubmitted. */
      if (block->wqueue[COND_FOR_SAVED].last_thread)
        wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
      return 1;
    }
    unlink_hash(pagecache, block->hash_link);
  }

  unlink_changed(block);
  /* Sanity checks: nobody may still lock, pin or queue on the block */
  DBUG_ASSERT(block->wlocks == 0);
  DBUG_ASSERT(block->rlocks == 0);
  DBUG_ASSERT(block->rlocks_queue == 0);
  DBUG_ASSERT(block->pins == 0);
  DBUG_ASSERT((block->status & ~(PCBLOCK_ERROR | PCBLOCK_READ | PCBLOCK_IN_FLUSH | PCBLOCK_CHANGED | PCBLOCK_REASSIGNED | PCBLOCK_DEL_WRITE)) == 0);
  DBUG_ASSERT(block->requests >= 1);
  DBUG_ASSERT(block->next_used == NULL);
  /* Reset the block to the "free" state */
  block->status= 0;
#ifdef DBUG_ASSERT_EXISTS
  block->type= PAGECACHE_EMPTY_PAGE;
#endif
  block->rec_lsn= LSN_MAX;
  block->hash_link= NULL;
  /* A freed block is cold; keep the count of warm blocks accurate */
  if (block->temperature == PCBLOCK_WARM)
    pagecache->warm_blocks--;
  block->temperature= PCBLOCK_COLD;
  KEYCACHE_THREAD_TRACE("free block");
  KEYCACHE_DBUG_PRINT("free_block",
                      ("block is freed"));
  unreg_request(pagecache, block, 0);

  /*
    Block->requests is != 0 if unreg_requests()/link_block() gave the block
    to a waiting thread
  */
  if (!block->requests)
  {
    DBUG_ASSERT(block->next_used != 0);

    /* Remove the free block from the LRU ring. */
    unlink_block(pagecache, block);
    /* Insert the free block in the free list. */
    block->next_used= pagecache->free_block_list;
    pagecache->free_block_list= block;
    /* Keep track of the number of currently unused blocks. */
    pagecache->blocks_unused++;
  }
  else
  {
    /*
      keep flag set by link_block()
      (only the PCBLOCK_REASSIGNED bit of the status saved at entry is
      carried over; all other status bits stay cleared)
    */
    block->status= status & PCBLOCK_REASSIGNED;
  }

  /* All pending requests for this page must be resubmitted. */
  if (block->wqueue[COND_FOR_SAVED].last_thread)
    wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);

  return 0;
}
4317 
4318 
4319 static int cmp_sec_link(PAGECACHE_BLOCK_LINK **a, PAGECACHE_BLOCK_LINK **b)
4320 {
4321   return (((*a)->hash_link->pageno < (*b)->hash_link->pageno) ? -1 :
4322       ((*a)->hash_link->pageno > (*b)->hash_link->pageno) ? 1 : 0);
4323 }
4324 
4325 
/**
  @brief Flush a portion of changed blocks to disk, free used blocks
  if requested

  The function is entered with pagecache->cache_lock held and returns with
  it held, but releases it while sorting the array and around every page
  write.

  @param pagecache       This page cache reference.
  @param file            File which should be flushed. Only used when
                         re-linking blocks with link_to_file_list(); the
                         write itself goes to block->hash_link->file.
  @param cache           Beginning of array of the block.
  @param end             Reference to the block after last in the array.
  @param type            Type of the flush.
  @param first_errno     Where to store first errno of the flush. Reset to
                         0 on entry; set to HA_ERR_INTERNAL_ERROR for a
                         skipped (pinned/write-locked) block, or to
                         my_errno (-1 if my_errno is 0) for a failed write.


  @return Operation status
  @retval PCFLUSH_OK OK
  @retval PCFLUSH_ERROR There was errors during the flush process.
  @retval PCFLUSH_PINNED Pinned blocks was met and skipped.
  @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
*/

static int flush_cached_blocks(PAGECACHE *pagecache,
                               PAGECACHE_FILE *file,
                               PAGECACHE_BLOCK_LINK **cache,
                               PAGECACHE_BLOCK_LINK **end,
                               enum flush_type type,
                               int *first_errno)
{
  int rc= PCFLUSH_OK;
  my_bool error;
  uint count= (uint) (end-cache);
  DBUG_ENTER("flush_cached_blocks");
  *first_errno= 0;

  /* Don't lock the cache during the flush */
  pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
  /*
     As all blocks referred in 'cache' are marked by PCBLOCK_IN_FLUSH
     we are guaranteed that no thread will change them
  */
  /* Sort by page number so that pages are written in ascending order */
  qsort((uchar*) cache, count, sizeof(*cache), (qsort_cmp) cmp_sec_link);

  pagecache_pthread_mutex_lock(&pagecache->cache_lock);
  for (; cache != end; cache++)
  {
    PAGECACHE_BLOCK_LINK *block= *cache;

    /*
      In the case of non_transactional tables we want to flush also
      block pinned with reads. This is because we may have other
      threads reading the block during flush, as non transactional
      tables can have many readers while the one writer is doing the
      flush.
      We don't want to do flush pinned blocks during checkpoint.
      We detect the checkpoint case by checking if type is LAZY.
    */
    if ((type == FLUSH_KEEP_LAZY && block->pins) || block->wlocks)
    {
      KEYCACHE_DBUG_PRINT("flush_cached_blocks",
                          ("block: %u (%p)  pinned",
                           PCBLOCK_NUMBER(pagecache, block), block));
      DBUG_PRINT("info", ("block: %u (%p)  pinned",
                          PCBLOCK_NUMBER(pagecache, block), block));
      PCBLOCK_INFO(block);
      /* undo the mark put by flush_pagecache_blocks_int(): */
      block->status&= ~PCBLOCK_IN_FLUSH;
      rc|= PCFLUSH_PINNED;
      DBUG_PRINT("warning", ("Page pinned"));
      unreg_request(pagecache, block, 1);
      if (!*first_errno)
        *first_errno= HA_ERR_INTERNAL_ERROR;
      continue;
    }
    /* Read-lock and pin the block for the duration of the write */
    if (make_lock_and_pin(pagecache, block,
                          PAGECACHE_LOCK_READ, PAGECACHE_PIN, FALSE))
      DBUG_ASSERT(0);

    KEYCACHE_PRINT("flush_cached_blocks",
                   ("block: %u (%p)  to be flushed",
                    PCBLOCK_NUMBER(pagecache, block), block));
    DBUG_PRINT("info", ("block: %u (%p) to be flushed",
                        PCBLOCK_NUMBER(pagecache, block), block));
    PCBLOCK_INFO(block);

    /**
       @todo IO If page is contiguous with next page to flush, group flushes
       in one single my_pwrite().
    */
    /**
      It is important to use block->hash_link->file below and not 'file', as
      the first one is right and the second may have different out-of-date
      content (see StaleFilePointersInFlush in ma_checkpoint.c).
      @todo change argument of functions to be File.
    */
    /* The cache lock is released while the page is written */
    pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
    error= pagecache_fwrite(pagecache, &block->hash_link->file,
                            block->buffer,
                            block->hash_link->pageno,
                            block->type,
                            pagecache->readwrite_flags);
    pagecache_pthread_mutex_lock(&pagecache->cache_lock);

    /* Drop the read lock and pin taken before the write */
    if (make_lock_and_pin(pagecache, block,
                          PAGECACHE_LOCK_READ_UNLOCK,
                          PAGECACHE_UNPIN, FALSE))
      DBUG_ASSERT(0);

    pagecache->global_cache_write++;
    if (error)
    {
      /* Remember the failure on the block and report the first errno */
      block->status|= PCBLOCK_ERROR;
      block->error=   (int16) my_errno;
      my_debug_put_break_here();
      if (!*first_errno)
        *first_errno= my_errno ? my_errno : -1;
      rc|= PCFLUSH_ERROR;
    }
    /*
      Let to proceed for possible waiting requests to write to the block page.
      It might happen only during an operation to resize the key cache.
    */
    if (block->wqueue[COND_FOR_SAVED].last_thread)
      wqueue_release_queue(&block->wqueue[COND_FOR_SAVED]);
    /* type will never be FLUSH_IGNORE_CHANGED here */
    if (! (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
           type == FLUSH_FORCE_WRITE))
    {
      /* Releasing flush: try to free the block after writing it */
      if (!free_block(pagecache, block, 1))
      {
        pagecache->blocks_changed--;
        pagecache->global_blocks_changed--;
      }
      else
      {
        /*
          free_block() refused because the block got pinned: keep it in
          the cache and move it to the file's block list instead
        */
        block->status&= ~PCBLOCK_IN_FLUSH;
        link_to_file_list(pagecache, block, file, 1);
      }
    }
    else
    {
      /* Keeping flush: the block stays cached, only the marks are removed */
      block->status&= ~PCBLOCK_IN_FLUSH;
      link_to_file_list(pagecache, block, file, 1);
      unreg_request(pagecache, block, 1);
    }
  }
  DBUG_RETURN(rc);
}
4471 
4472 
4473 /**
4474    @brief flush all blocks for a file to disk but don't do any mutex locks
4475 
4476    @param  pagecache       pointer to a pagecache data structure
4477    @param  file            handler for the file to flush to
4478    @param  flush_type      type of the flush
4479    @param  filter          optional function which tells what blocks to flush;
4480                            can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
4481                            or FLUSH_FORCE_WRITE.
4482    @param  filter_arg      an argument to pass to 'filter'. Information about
4483                            the block will be passed too.
4484 
4485    @note
4486      Flushes all blocks having the same OS file descriptor as 'file->file', so
4487      can flush blocks having '*block->hash_link->file' != '*file'.
4488 
4489    @note
4490      This function doesn't do any mutex locks because it needs to be called
4491      both from flush_pagecache_blocks and flush_all_key_blocks (the later one
4492      does the mutex lock in the resize_pagecache() function).
4493 
4494    @note
4495      This function can cause problems if two threads call it
4496      concurrently on the same file (look for "PageCacheFlushConcurrencyBugs"
4497      in ma_checkpoint.c); to avoid them, it has internal logic to serialize in
4498      this situation.
4499 
4500    @return Operation status
4501    @retval PCFLUSH_OK OK
4502    @retval PCFLUSH_ERROR There was errors during the flush process.
4503    @retval PCFLUSH_PINNED Pinned blocks was met and skipped.
4504    @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
4505 */
4506 
4507 static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
4508                                       PAGECACHE_FILE *file,
4509                                       enum flush_type type,
4510                                       PAGECACHE_FLUSH_FILTER filter,
4511                                       void *filter_arg)
4512 {
4513   PAGECACHE_BLOCK_LINK *cache_buff[FLUSH_CACHE],**cache;
4514   int last_errno= 0;
4515   int rc= PCFLUSH_OK;
4516   DBUG_ENTER("flush_pagecache_blocks_int");
4517   DBUG_PRINT("enter",
4518              ("fd: %d  blocks_used: %zu  blocks_changed: %zu  type: %d",
4519               file->file, pagecache->blocks_used, pagecache->blocks_changed,
4520               type));
4521 
4522 #if !defined(DBUG_OFF) && defined(EXTRA_DEBUG)
4523     DBUG_EXECUTE("check_pagecache",
4524                  test_key_cache(pagecache,
4525                                 "start of flush_pagecache_blocks", 0););
4526 #endif
4527 
4528   cache= cache_buff;
4529   if (pagecache->disk_blocks > 0 &&
4530       (!my_disable_flush_pagecache_blocks ||
4531        (type != FLUSH_KEEP && type != FLUSH_KEEP_LAZY)))
4532   {
4533     /*
4534       Key cache exists. If my_disable_flush_pagecache_blocks is true it
4535       disables the operation but only FLUSH_KEEP[_LAZY]: other flushes still
4536       need to be allowed: FLUSH_RELEASE has to free blocks, and
4537       FLUSH_FORCE_WRITE is to overrule my_disable_flush_pagecache_blocks.
4538     */
4539     int error= 0;
4540     uint count= 0;
4541     PAGECACHE_BLOCK_LINK **pos, **end;
4542     PAGECACHE_BLOCK_LINK *first_in_switch= NULL;
4543     PAGECACHE_BLOCK_LINK *block, *next;
4544 #if defined(PAGECACHE_DEBUG)
4545     uint cnt= 0;
4546 #endif
4547 
4548     struct st_file_in_flush us_flusher, *other_flusher;
4549     us_flusher.file= file->file;
4550     us_flusher.flush_queue.last_thread= NULL;
4551     us_flusher.first_in_switch= FALSE;
4552     while ((other_flusher= (struct st_file_in_flush *)
4553             my_hash_search(&pagecache->files_in_flush, (uchar *)&file->file,
4554                            sizeof(file->file))))
4555     {
4556       /*
4557         File is in flush already: wait, unless FLUSH_KEEP_LAZY. "Flusher"
4558         means "who can mark PCBLOCK_IN_FLUSH", i.e. caller of
4559         flush_pagecache_blocks_int().
4560       */
4561       struct st_my_thread_var *thread;
4562       if (type == FLUSH_KEEP_LAZY)
4563       {
4564         DBUG_PRINT("info",("FLUSH_KEEP_LAZY skips"));
4565         DBUG_RETURN(0);
4566       }
4567       thread= my_thread_var;
4568       wqueue_add_to_queue(&other_flusher->flush_queue, thread);
4569       do
4570       {
4571         DBUG_PRINT("wait",
4572                    ("(1) suspend thread %s %ld",
4573                     thread->name, (ulong) thread->id));
4574         pagecache_pthread_cond_wait(&thread->suspend,
4575                                     &pagecache->cache_lock);
4576       }
4577       while (thread->next);
4578     }
4579     /* we are the only flusher of this file now */
4580     while (my_hash_insert(&pagecache->files_in_flush, (uchar *)&us_flusher))
4581     {
4582       /*
4583         Out of memory, wait for flushers to empty the hash and retry; should
4584         rarely happen. Other threads are flushing the file; when done, they
4585         are going to remove themselves from the hash, and thus memory will
4586         appear again. However, this memory may be stolen by yet another thread
4587         (for a purpose unrelated to page cache), before we retry
4588         my_hash_insert(). So the loop may run for long. Only if the thread was
4589         killed do we abort the loop, returning 1 (error) which can cause the
4590         table to be marked as corrupted (cf maria_chk_size(), maria_close())
4591         and thus require a table check.
4592       */
4593       DBUG_ASSERT(0);
4594       pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4595       if (my_thread_var->abort)
4596         DBUG_RETURN(1);		/* End if aborted by user */
4597       sleep(10);
4598       pagecache_pthread_mutex_lock(&pagecache->cache_lock);
4599     }
4600 
4601     if (type != FLUSH_IGNORE_CHANGED)
4602     {
4603       /*
4604         Count how many key blocks we have to cache to be able
4605         to flush all dirty pages with minimum seek moves.
4606       */
4607       for (block= pagecache->changed_blocks[FILE_HASH(*file, pagecache)] ;
4608            block;
4609            block= block->next_changed)
4610       {
4611         if (block->hash_link->file.file == file->file)
4612         {
4613           count++;
4614           KEYCACHE_DBUG_ASSERT(count<= pagecache->blocks_used);
4615         }
4616       }
4617       count++;    /* Allocate one extra for easy end-of-buffer test */
4618       /* Allocate a new buffer only if its bigger than the one we have */
4619       if (count > FLUSH_CACHE &&
4620           !(cache=
4621             (PAGECACHE_BLOCK_LINK**)
4622             my_malloc(sizeof(PAGECACHE_BLOCK_LINK*)*count, MYF(0))))
4623       {
4624         cache= cache_buff;
4625         count= FLUSH_CACHE;
4626       }
4627     }
4628 
4629     /* Retrieve the blocks and write them to a buffer to be flushed */
4630 restart:
4631     end= (pos= cache)+count;
4632     for (block= pagecache->changed_blocks[FILE_HASH(*file, pagecache)] ;
4633          block;
4634          block= next)
4635     {
4636 #if defined(PAGECACHE_DEBUG)
4637       cnt++;
4638       KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
4639 #endif
4640       next= block->next_changed;
4641       if (block->hash_link->file.file != file->file)
4642         continue;
4643       if (filter != NULL)
4644       {
4645         int filter_res= (*filter)(block->type, block->hash_link->pageno,
4646                                   block->rec_lsn, filter_arg);
4647         DBUG_PRINT("info",("filter returned %d", filter_res));
4648         if (filter_res == FLUSH_FILTER_SKIP_TRY_NEXT)
4649           continue;
4650         if (filter_res == FLUSH_FILTER_SKIP_ALL)
4651           break;
4652         DBUG_ASSERT(filter_res == FLUSH_FILTER_OK);
4653       }
4654       {
4655         DBUG_ASSERT(!(block->status & PCBLOCK_IN_FLUSH));
4656         /*
4657           We care only for the blocks for which flushing was not
4658           initiated by other threads as a result of page swapping
4659         */
4660         if (! (block->status & PCBLOCK_IN_SWITCH))
4661         {
4662           /*
4663             Mark the block with BLOCK_IN_FLUSH in order not to let
4664             other threads to use it for new pages and interfere with
4665             our sequence of flushing dirty file pages
4666           */
4667           block->status|= PCBLOCK_IN_FLUSH;
4668 
4669           reg_requests(pagecache, block, 1);
4670           if (type != FLUSH_IGNORE_CHANGED)
4671           {
4672             *pos++= block;
4673 	    /* It's not a temporary file */
4674             if (pos == end)
4675             {
4676 	      /*
4677 		This happens only if there is not enough
4678 		memory for the big block
4679               */
4680               if ((rc|= flush_cached_blocks(pagecache, file, cache,
4681                                             end, type, &error)) &
4682                   (PCFLUSH_ERROR | PCFLUSH_PINNED))
4683                 last_errno=error;
4684               DBUG_PRINT("info", ("restarting..."));
4685               /*
4686 		Restart the scan as some other thread might have changed
4687 		the changed blocks chain: the blocks that were in switch
4688 		state before the flush started have to be excluded
4689               */
4690               goto restart;
4691             }
4692           }
4693           else
4694           {
4695             /* It's a temporary file */
4696             pagecache->blocks_changed--;
4697 	    pagecache->global_blocks_changed--;
4698             free_block(pagecache, block, 0);
4699           }
4700         }
4701         else if (type != FLUSH_KEEP_LAZY)
4702         {
4703           /*
4704             Link the block into a list of blocks 'in switch', and then we will
4705             wait for this list to be empty, which means they have been flushed
4706           */
4707           unlink_changed(block);
4708           link_changed(block, &first_in_switch);
4709           us_flusher.first_in_switch= TRUE;
4710         }
4711       }
4712     }
4713     if (pos != cache)
4714     {
4715       if ((rc|= flush_cached_blocks(pagecache, file, cache, pos, type,
4716                                     &error)) &
4717           (PCFLUSH_ERROR | PCFLUSH_PINNED))
4718         last_errno= error;
4719     }
4720     /* Wait until list of blocks in switch is empty */
4721     while (first_in_switch)
4722     {
4723 #if defined(PAGECACHE_DEBUG)
4724       cnt= 0;
4725 #endif
4726       block= first_in_switch;
4727       {
4728         struct st_my_thread_var *thread= my_thread_var;
4729         wqueue_add_to_queue(&block->wqueue[COND_FOR_SAVED], thread);
4730         do
4731         {
4732           DBUG_PRINT("wait",
4733                      ("(2) suspend thread %s %ld",
4734                               thread->name, (ulong) thread->id));
4735           pagecache_pthread_cond_wait(&thread->suspend,
4736                                      &pagecache->cache_lock);
4737         }
4738         while (thread->next);
4739       }
4740 #if defined(PAGECACHE_DEBUG)
4741       cnt++;
4742       KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
4743 #endif
4744     }
4745     us_flusher.first_in_switch= FALSE;
4746     /* The following happens very seldom */
4747     if (! (type == FLUSH_KEEP || type == FLUSH_KEEP_LAZY ||
4748            type == FLUSH_FORCE_WRITE))
4749     {
4750       /*
4751         this code would free all blocks while filter maybe handled only a
4752         few, that is not possible.
4753       */
4754       DBUG_ASSERT(filter == NULL);
4755 #if defined(PAGECACHE_DEBUG)
4756       cnt=0;
4757 #endif
4758       for (block= pagecache->file_blocks[FILE_HASH(*file, pagecache)] ;
4759            block;
4760            block= next)
4761       {
4762 #if defined(PAGECACHE_DEBUG)
4763         cnt++;
4764         KEYCACHE_DBUG_ASSERT(cnt <= pagecache->blocks_used);
4765 #endif
4766         next= block->next_changed;
4767         if (block->hash_link->file.file == file->file &&
4768             !block->pins &&
4769             (! (block->status & PCBLOCK_CHANGED)
4770              || type == FLUSH_IGNORE_CHANGED))
4771         {
4772           reg_requests(pagecache, block, 1);
4773           free_block(pagecache, block, 1);
4774         }
4775       }
4776     }
4777     /* wake up others waiting to flush this file */
4778     my_hash_delete(&pagecache->files_in_flush, (uchar *)&us_flusher);
4779     if (us_flusher.flush_queue.last_thread)
4780       wqueue_release_queue(&us_flusher.flush_queue);
4781   }
4782 
4783   DBUG_EXECUTE("check_pagecache",
4784                test_key_cache(pagecache, "end of flush_pagecache_blocks", 0););
4785   if (cache != cache_buff)
4786     my_free(cache);
4787   if (rc != 0)
4788   {
4789     if (last_errno)
4790       my_errno= last_errno;                /* Return first error */
4791     DBUG_PRINT("error", ("Got error: %d", my_errno));
4792   }
4793   DBUG_RETURN(rc);
4794 }
4795 
4796 
4797 /**
4798    @brief flush all blocks for a file to disk
4799 
4800    @param  pagecache       pointer to a pagecache data structure
4801    @param  file            handler for the file to flush to
4802    @param  flush_type      type of the flush
4803    @param  filter          optional function which tells what blocks to flush;
4804                            can be non-NULL only if FLUSH_KEEP, FLUSH_KEEP_LAZY
4805                            or FLUSH_FORCE_WRITE.
4806    @param  filter_arg      an argument to pass to 'filter'. Information about
4807                            the block will be passed too.
4808 
4809    @return Operation status
4810    @retval PCFLUSH_OK OK
4811    @retval PCFLUSH_ERROR There was errors during the flush process.
4812    @retval PCFLUSH_PINNED Pinned blocks was met and skipped.
4813    @retval PCFLUSH_PINNED_AND_ERROR PCFLUSH_ERROR and PCFLUSH_PINNED.
4814 */
4815 
4816 int flush_pagecache_blocks_with_filter(PAGECACHE *pagecache,
4817                                        PAGECACHE_FILE *file,
4818                                        enum flush_type type,
4819                                        PAGECACHE_FLUSH_FILTER filter,
4820                                        void *filter_arg)
4821 {
4822   int res;
4823   DBUG_ENTER("flush_pagecache_blocks_with_filter");
4824   DBUG_PRINT("enter", ("pagecache: %p", pagecache));
4825 
4826   if (pagecache->disk_blocks <= 0)
4827     DBUG_RETURN(0);
4828   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
4829   inc_counter_for_resize_op(pagecache);
4830   res= flush_pagecache_blocks_int(pagecache, file, type, filter, filter_arg);
4831   dec_counter_for_resize_op(pagecache);
4832   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
4833   DBUG_RETURN(res);
4834 }
4835 
4836 
4837 /*
4838   Reset the counters of a key cache.
4839 
4840   SYNOPSIS
4841     reset_pagecache_counters()
4842     name       the name of a key cache
4843     pagecache  pointer to the pagecache to be reset
4844 
4845   DESCRIPTION
4846     This procedure is used to reset the counters of all currently used key
4847     caches, both the default one and the named ones.
4848 
4849   RETURN
4850     0 on success (always because it can't fail)
4851 */
4852 
4853 int reset_pagecache_counters(const char *name __attribute__((unused)),
4854                              PAGECACHE *pagecache)
4855 {
4856   DBUG_ENTER("reset_pagecache_counters");
4857   if (!pagecache->inited)
4858   {
4859     DBUG_PRINT("info", ("Key cache %s not initialized.", name));
4860     DBUG_RETURN(0);
4861   }
4862   DBUG_PRINT("info", ("Resetting counters for key cache %s.", name));
4863 
4864   pagecache->global_blocks_changed= 0;   /* Key_blocks_not_flushed */
4865   pagecache->global_cache_r_requests= 0; /* Key_read_requests */
4866   pagecache->global_cache_read= 0;       /* Key_reads */
4867   pagecache->global_cache_w_requests= 0; /* Key_write_requests */
4868   pagecache->global_cache_write= 0;      /* Key_writes */
4869   DBUG_RETURN(0);
4870 }
4871 
4872 
4873 /**
4874    @brief Allocates a buffer and stores in it some info about all dirty pages
4875 
4876    Does the allocation because the caller cannot know the size itself.
4877    Memory freeing is to be done by the caller (if the "str" member of the
4878    LEX_STRING is not NULL).
4879    Ignores all pages of another type than PAGECACHE_LSN_PAGE, because they
4880    are not interesting for a checkpoint record.
4881    The caller has the intention of doing checkpoints.
4882 
4883    @param       pagecache   pointer to the page cache
4884    @param[out]  str         pointer to where the allocated buffer, and
4885                             its size, will be put
4886    @param[out]  min_rec_lsn pointer to where the minimum rec_lsn of all
4887                             relevant dirty pages will be put
4888    @return Operation status
4889      @retval 0      OK
4890      @retval 1      Error
4891 */
4892 
4893 my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
4894                                                   LEX_STRING *str,
4895                                                   LSN *min_rec_lsn)
4896 {
4897   my_bool error= 0;
4898   size_t stored_list_size= 0;
4899   uint file_hash;
4900   char *ptr;
4901   LSN minimum_rec_lsn= LSN_MAX;
4902   DBUG_ENTER("pagecache_collect_changed_blocks_with_LSN");
4903 
4904   DBUG_ASSERT(NULL == str->str);
4905   /*
4906     We lock the entire cache but will be quick, just reading/writing a few MBs
4907     of memory at most.
4908   */
4909   pagecache_pthread_mutex_lock(&pagecache->cache_lock);
4910   for (;;)
4911   {
4912     struct st_file_in_flush *other_flusher;
4913     for (file_hash= 0;
4914          (other_flusher= (struct st_file_in_flush *)
4915           my_hash_element(&pagecache->files_in_flush, file_hash)) != NULL &&
4916            !other_flusher->first_in_switch;
4917          file_hash++)
4918     {}
4919     if (other_flusher == NULL)
4920       break;
4921     /*
4922       other_flusher.first_in_switch is true: some thread is flushing a file
4923       and has removed dirty blocks from changed_blocks[] while they were still
4924       dirty (they were being evicted (=>flushed) by yet another thread, which
4925       may not have flushed the block yet so it may still be dirty).
4926       If Checkpoint proceeds now, it will not see the page. If there is a
4927       crash right after writing the checkpoint record, before the page is
4928       flushed, at recovery the page will be wrongly ignored because it won't
4929       be in the dirty pages list in the checkpoint record. So wait.
4930     */
4931     {
4932       struct st_my_thread_var *thread= my_thread_var;
4933       wqueue_add_to_queue(&other_flusher->flush_queue, thread);
4934       do
4935       {
4936         DBUG_PRINT("wait",
4937                    ("suspend thread %s %ld", thread->name,
4938                     (ulong) thread->id));
4939         pagecache_pthread_cond_wait(&thread->suspend,
4940                                     &pagecache->cache_lock);
4941       }
4942       while (thread->next);
4943     }
4944   }
4945 
4946   /* Count how many dirty pages are interesting */
4947   for (file_hash= 0; file_hash < pagecache->changed_blocks_hash_size; file_hash++)
4948   {
4949     PAGECACHE_BLOCK_LINK *block;
4950     for (block= pagecache->changed_blocks[file_hash] ;
4951          block;
4952          block= block->next_changed)
4953     {
4954       /*
4955         Q: is there something subtle with block->hash_link: can it be NULL?
4956         does it have to be == hash_link->block... ?
4957       */
4958       DBUG_ASSERT(block->hash_link != NULL);
4959       DBUG_ASSERT(block->status & PCBLOCK_CHANGED);
4960       /*
4961         Note that we don't store bitmap pages, or pages from non-transactional
4962         (like temporary) tables. Don't checkpoint during Recovery which uses
4963         PAGECACHE_PLAIN_PAGE.
4964       */
4965       if (block->type != PAGECACHE_LSN_PAGE)
4966         continue; /* no need to store it */
4967       stored_list_size++;
4968     }
4969   }
4970 
4971   compile_time_assert(sizeof(pagecache->blocks) <= 8);
4972   str->length= 8 + /* number of dirty pages */
4973     (2 + /* table id */
4974      1 + /* data or index file */
4975      5 + /* pageno */
4976      LSN_STORE_SIZE /* rec_lsn */
4977      ) * stored_list_size;
4978   if (NULL == (str->str= my_malloc(str->length, MYF(MY_WME))))
4979     goto err;
4980   ptr= str->str;
4981   int8store(ptr, (ulonglong)stored_list_size);
4982   ptr+= 8;
4983   DBUG_PRINT("info", ("found %zu dirty pages", stored_list_size));
4984   if (stored_list_size == 0)
4985     goto end;
4986   for (file_hash= 0; file_hash < pagecache->changed_blocks_hash_size; file_hash++)
4987   {
4988     PAGECACHE_BLOCK_LINK *block;
4989     for (block= pagecache->changed_blocks[file_hash] ;
4990          block;
4991          block= block->next_changed)
4992     {
4993       uint16 table_id;
4994       MARIA_SHARE *share;
4995       if (block->type != PAGECACHE_LSN_PAGE)
4996         continue; /* no need to store it in the checkpoint record */
4997       share= (MARIA_SHARE *)(block->hash_link->file.callback_data);
4998       table_id= share->id;
4999       int2store(ptr, table_id);
5000       ptr+= 2;
5001       ptr[0]= (share->kfile.file == block->hash_link->file.file);
5002       ptr++;
5003       DBUG_ASSERT(block->hash_link->pageno < ((1ULL) << 40));
5004       page_store(ptr, block->hash_link->pageno);
5005       ptr+= PAGE_STORE_SIZE;
5006       lsn_store(ptr, block->rec_lsn);
5007       ptr+= LSN_STORE_SIZE;
5008       if (block->rec_lsn != LSN_MAX)
5009       {
5010         DBUG_ASSERT(LSN_VALID(block->rec_lsn));
5011         if (cmp_translog_addr(block->rec_lsn, minimum_rec_lsn) < 0)
5012           minimum_rec_lsn= block->rec_lsn;
5013       } /* otherwise, some trn->rec_lsn should hold the correct info */
5014     }
5015   }
5016 end:
5017   pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
5018   *min_rec_lsn= minimum_rec_lsn;
5019   DBUG_RETURN(error);
5020 
5021 err:
5022   error= 1;
5023   goto end;
5024 }
5025 
5026 
5027 #ifndef DBUG_OFF
5028 
5029 /**
5030   Verifies that a file has no dirty pages.
5031 */
5032 
5033 void pagecache_file_no_dirty_page(PAGECACHE *pagecache, PAGECACHE_FILE *file)
5034 {
5035   File fd= file->file;
5036   PAGECACHE_BLOCK_LINK *block;
5037   for (block= pagecache->changed_blocks[FILE_HASH(*file, pagecache)];
5038        block != NULL;
5039        block= block->next_changed)
5040     if (block->hash_link->file.file == fd)
5041     {
5042       DBUG_PRINT("info", ("pagecache_file_not_in error"));
5043       PCBLOCK_INFO(block);
5044       DBUG_ASSERT(0);
5045     }
5046 }
5047 
5048 
5049 /*
5050   Test if disk-cache is ok
5051 */
5052 static void test_key_cache(PAGECACHE *pagecache __attribute__((unused)),
5053                            const char *where __attribute__((unused)),
5054                            my_bool lock __attribute__((unused)))
5055 {
5056   /* TODO */
5057 }
5058 #endif
5059 
5060 uchar *pagecache_block_link_to_buffer(PAGECACHE_BLOCK_LINK *block)
5061 {
5062   return block->buffer;
5063 }
5064 
5065 #if defined(PAGECACHE_TIMEOUT)
5066 
5067 #define KEYCACHE_DUMP_FILE  "pagecache_dump.txt"
5068 #define MAX_QUEUE_LEN  100
5069 
5070 
5071 static void pagecache_dump(PAGECACHE *pagecache)
5072 {
5073   FILE *pagecache_dump_file=fopen(KEYCACHE_DUMP_FILE, "w");
5074   struct st_my_thread_var *last;
5075   struct st_my_thread_var *thread;
5076   PAGECACHE_BLOCK_LINK *block;
5077   PAGECACHE_HASH_LINK *hash_link;
5078   PAGECACHE_PAGE *page;
5079   uint i;
5080 
5081   fprintf(pagecache_dump_file, "thread: %s %ld\n", thread->name,
5082           (ulong) thread->id);
5083 
5084   i=0;
5085   thread=last=waiting_for_hash_link.last_thread;
5086   fprintf(pagecache_dump_file, "queue of threads waiting for hash link\n");
5087   if (thread)
5088     do
5089     {
5090       thread= thread->next;
5091       page= (PAGECACHE_PAGE *) thread->keycache_link;
5092       fprintf(pagecache_dump_file,
5093               "thread: %s %ld, (file,pageno)=(%u,%lu)\n",
5094               thread->name, (ulong) thread->id,
5095               (uint) page->file.file,(ulong) page->pageno);
5096       if (++i == MAX_QUEUE_LEN)
5097         break;
5098     }
5099     while (thread != last);
5100 
5101   i=0;
5102   thread=last=waiting_for_block.last_thread;
5103   fprintf(pagecache_dump_file, "queue of threads waiting for block\n");
5104   if (thread)
5105     do
5106     {
5107       thread=thread->next;
5108       hash_link= (PAGECACHE_HASH_LINK *) thread->keycache_link;
5109       fprintf(pagecache_dump_file,
5110               "thread: %s %u hash_link:%u (file,pageno)=(%u,%lu)\n",
5111               thread->name, (ulong) thread->id,
5112               (uint) PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link),
5113         (uint) hash_link->file.file,(ulong) hash_link->pageno);
5114       if (++i == MAX_QUEUE_LEN)
5115         break;
5116     }
5117     while (thread != last);
5118 
5119   for (i=0 ; i < pagecache->blocks_used ; i++)
5120   {
5121     int j;
5122     block= &pagecache->block_root[i];
5123     hash_link= block->hash_link;
5124     fprintf(pagecache_dump_file,
5125             "block:%u hash_link:%d status:%x #requests=%u waiting_for_readers:%d\n",
5126             i, (int) (hash_link ?
5127                       PAGECACHE_HASH_LINK_NUMBER(pagecache, hash_link) :
5128                       -1),
5129             block->status, block->requests, block->condvar ? 1 : 0);
5130     for (j=0 ; j < COND_SIZE; j++)
5131     {
5132       PAGECACHE_WQUEUE *wqueue=&block->wqueue[j];
5133       thread= last= wqueue->last_thread;
5134       fprintf(pagecache_dump_file, "queue #%d\n", j);
5135       if (thread)
5136       {
5137         do
5138         {
5139           thread=thread->next;
5140           fprintf(pagecache_dump_file,
5141                   "thread: %s %ld\n", thread->name, (ulong) thread->id);
5142           if (++i == MAX_QUEUE_LEN)
5143             break;
5144         }
5145         while (thread != last);
5146       }
5147     }
5148   }
5149   fprintf(pagecache_dump_file, "LRU chain:");
5150   block= pagecache= used_last;
5151   if (block)
5152   {
5153     do
5154     {
5155       block= block->next_used;
5156       fprintf(pagecache_dump_file,
5157               "block:%u, ", PCBLOCK_NUMBER(pagecache, block));
5158     }
5159     while (block != pagecache->used_last);
5160   }
5161   fprintf(pagecache_dump_file, "\n");
5162 
5163   fclose(pagecache_dump_file);
5164 }
5165 
5166 #endif /* defined(PAGECACHE_TIMEOUT) */
5167 
5168 #if defined(PAGECACHE_TIMEOUT) && !defined(__WIN__)
5169 
5170 
5171 static int pagecache_pthread_cond_wait(mysql_cond_t *cond,
5172                                       mysql_mutex_t *mutex)
5173 {
5174   int rc;
5175   struct timeval  now;            /* time when we started waiting        */
5176   struct timespec timeout;        /* timeout value for the wait function */
5177   struct timezone tz;
5178 #if defined(PAGECACHE_DEBUG)
5179   int cnt=0;
5180 #endif
5181 
5182   /* Get current time */
5183   gettimeofday(&now, &tz);
5184   /* Prepare timeout value */
5185   timeout.tv_sec= now.tv_sec + PAGECACHE_TIMEOUT;
5186  /*
5187    timeval uses microseconds.
5188    timespec uses nanoseconds.
5189    1 nanosecond = 1000 micro seconds
5190  */
5191   timeout.tv_nsec= now.tv_usec * 1000;
5192   KEYCACHE_THREAD_TRACE_END("started waiting");
5193 #if defined(PAGECACHE_DEBUG)
5194   cnt++;
5195   if (cnt % 100 == 0)
5196     fprintf(pagecache_debug_log, "waiting...\n");
5197     fflush(pagecache_debug_log);
5198 #endif
5199   rc= mysql_cond_timedwait(cond, mutex, &timeout);
5200   KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
5201   if (rc == ETIMEDOUT || rc == ETIME)
5202   {
5203 #if defined(PAGECACHE_DEBUG)
5204     fprintf(pagecache_debug_log,"aborted by pagecache timeout\n");
5205     fclose(pagecache_debug_log);
5206     abort();
5207 #endif
5208     pagecache_dump();
5209   }
5210 
5211 #if defined(PAGECACHE_DEBUG)
5212   KEYCACHE_DBUG_ASSERT(rc != ETIMEDOUT);
5213 #else
5214   assert(rc != ETIMEDOUT);
5215 #endif
5216   return rc;
5217 }
5218 #else
5219 #if defined(PAGECACHE_DEBUG)
5220 static int pagecache_pthread_cond_wait(mysql_cond_t *cond,
5221                                       mysql_mutex_t *mutex)
5222 {
5223   int rc;
5224   KEYCACHE_THREAD_TRACE_END("started waiting");
5225   rc= mysql_cond_wait(cond, mutex);
5226   KEYCACHE_THREAD_TRACE_BEGIN("finished waiting");
5227   return rc;
5228 }
5229 #endif
5230 #endif /* defined(PAGECACHE_TIMEOUT) && !defined(__WIN__) */
5231 
5232 #if defined(PAGECACHE_DEBUG)
5233 static int ___pagecache_pthread_mutex_lock(mysql_mutex_t *mutex)
5234 {
5235   int rc;
5236   rc= mysql_mutex_lock(mutex);
5237   KEYCACHE_THREAD_TRACE_BEGIN("");
5238   return rc;
5239 }
5240 
5241 
5242 static void ___pagecache_pthread_mutex_unlock(mysql_mutex_t *mutex)
5243 {
5244   KEYCACHE_THREAD_TRACE_END("");
5245   mysql_mutex_unlock(mutex);
5246 }
5247 
5248 
5249 static int ___pagecache_pthread_cond_signal(mysql_cond_t *cond)
5250 {
5251   int rc;
5252   KEYCACHE_THREAD_TRACE("signal");
5253   rc= mysql_cond_signal(cond);
5254   return rc;
5255 }
5256 
5257 
5258 #if defined(PAGECACHE_DEBUG_LOG)
5259 
5260 
5261 static void pagecache_debug_print(const char * fmt, ...)
5262 {
5263   va_list args;
5264   va_start(args,fmt);
5265   if (pagecache_debug_log)
5266   {
5267     VOID(vfprintf(pagecache_debug_log, fmt, args));
5268     VOID(fputc('\n',pagecache_debug_log));
5269   }
5270   va_end(args);
5271 }
5272 #endif /* defined(PAGECACHE_DEBUG_LOG) */
5273 
5274 #if defined(PAGECACHE_DEBUG_LOG)
5275 
5276 
5277 void pagecache_debug_log_close(void)
5278 {
5279   if (pagecache_debug_log)
5280     fclose(pagecache_debug_log);
5281 }
5282 #endif /* defined(PAGECACHE_DEBUG_LOG) */
5283 
5284 #endif /* defined(PAGECACHE_DEBUG) */
5285 
5286 /**
5287   @brief null hooks
5288 */
5289 
5290 static my_bool null_pre_hook(PAGECACHE_IO_HOOK_ARGS *args
5291                              __attribute__((unused)))
5292 {
5293   return 0;
5294 }
5295 
5296 static my_bool null_post_read_hook(int res, PAGECACHE_IO_HOOK_ARGS *args
5297                                    __attribute__((unused)))
5298 {
5299   return res != 0;
5300 }
5301 
5302 static void null_post_write_hook(int res __attribute__((unused)),
5303                                  PAGECACHE_IO_HOOK_ARGS *args
5304                                  __attribute__((unused)))
5305 {
5306   return;
5307 }
5308 
5309 void
5310 pagecache_file_set_null_hooks(PAGECACHE_FILE *file)
5311 {
5312   file->pre_read_hook= null_pre_hook;
5313   file->post_read_hook= null_post_read_hook;
5314   file->pre_write_hook= null_pre_hook;
5315   file->post_write_hook= null_post_write_hook;
5316   file->flush_log_callback= null_pre_hook;
5317   file->callback_data= NULL;
5318 }
5319