/* Copyright (c) 2014, 2021, Oracle and/or its affiliates.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License, version 2.0,
  as published by the Free Software Foundation.

  This program is also distributed with certain software (including
  but not limited to OpenSSL) that is licensed under separate terms,
  as designated in a particular file or component or in included license
  documentation.  The authors of MySQL hereby grant you an additional
  permission to link the program and your derivative works with the
  separately licensed software that they have included with MySQL.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License, version 2.0, for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software Foundation,
  51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */

#ifndef PFS_BUFFER_CONTAINER_H
#define PFS_BUFFER_CONTAINER_H

#include "my_global.h"
#include "pfs.h" // PSI_COUNT_VOLATILITY
#include "pfs_lock.h"
#include "pfs_instr.h"
#include "pfs_setup_actor.h"
#include "pfs_setup_object.h"
#include "pfs_program.h"
#include "pfs_prepared_stmt.h"
#include "pfs_builtin_memory.h"

#define USE_SCALABLE

class PFS_opaque_container_page;
class PFS_opaque_container;

struct PFS_builtin_memory_class;

template <class T>
class PFS_buffer_const_iterator;

template <class T>
class PFS_buffer_processor;

template <class T, class U, class V>
class PFS_buffer_iterator;

template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
class PFS_buffer_scalable_iterator;

template <class T>
class PFS_buffer_default_array;

template <class T>
class PFS_buffer_default_allocator;

template <class T, class U, class V>
class PFS_buffer_container;

template <class T, int PFS_PAGE_SIZE, int PFS_PAGE_COUNT, class U, class V>
class PFS_buffer_scalable_container;

template <class B, int COUNT>
class PFS_partitioned_buffer_scalable_iterator;

template <class B, int COUNT>
class PFS_partitioned_buffer_scalable_container;


template <class T>
class PFS_buffer_default_array
{
public:
  typedef T value_type;

  value_type *allocate(pfs_dirty_state *dirty_state)
  {
    uint index;
    uint monotonic;
    uint monotonic_max;
    value_type *pfs;

    if (m_full)
      return NULL;

    monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
    monotonic_max= monotonic + static_cast<uint>(m_max);

    while (monotonic < monotonic_max)
    {
      index= monotonic % m_max;
      pfs= m_ptr + index;

      if (pfs->m_lock.free_to_dirty(dirty_state))
      {
        return pfs;
      }
      monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
    }

    m_full= true;
    return NULL;
  }

  void deallocate(value_type *pfs)
  {
    pfs->m_lock.allocated_to_free();
    m_full= false;
  }

  T* get_first()
  {
    return m_ptr;
  }

  T* get_last()
  {
    return m_ptr + m_max;
  }

  bool m_full;
  PFS_cacheline_uint32 m_monotonic;
  T * m_ptr;
  size_t m_max;
  /** Container. */
  PFS_opaque_container *m_container;
};
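
/*
  Illustrative sketch (not part of the server sources): the array above hands
  out rows by advancing a shared monotonic counter and probing slots modulo
  m_max, claiming the first row whose pfs_lock moves from "free" to "dirty".
  A minimal standalone approximation of that scan, using std::atomic<bool>
  in place of pfs_lock (all names here are hypothetical):

    #include <atomic>
    #include <cstddef>

    struct demo_slot { std::atomic<bool> busy; };

    struct demo_array
    {
      demo_slot *m_ptr;
      std::size_t m_max;
      std::atomic<unsigned> m_monotonic;

      demo_slot *allocate()
      {
        unsigned monotonic= m_monotonic.fetch_add(1);
        unsigned monotonic_max= monotonic + static_cast<unsigned>(m_max);
        while (monotonic < monotonic_max)
        {
          demo_slot *slot= m_ptr + (monotonic % m_max);
          bool expected= false;
          // Equivalent of free_to_dirty(): claim the slot atomically.
          if (slot->busy.compare_exchange_strong(expected, true))
            return slot;
          monotonic= m_monotonic.fetch_add(1);
        }
        return nullptr;  // every probe failed: treat the array as full
      }
    };
*/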

template <class T>
class PFS_buffer_default_allocator
{
public:
  typedef PFS_buffer_default_array<T> array_type;

  PFS_buffer_default_allocator(PFS_builtin_memory_class *klass)
    : m_builtin_class(klass)
  {}

  int alloc_array(array_type *array)
  {
    array->m_ptr= NULL;
    array->m_full= true;
    array->m_monotonic.m_u32= 0;

    if (array->m_max > 0)
    {
      array->m_ptr= PFS_MALLOC_ARRAY(m_builtin_class,
                                     array->m_max, sizeof(T), T, MYF(MY_ZEROFILL));
      if (array->m_ptr == NULL)
        return 1;
      array->m_full= false;
    }
    return 0;
  }

  void free_array(array_type *array)
  {
    assert(array->m_max > 0);

    PFS_FREE_ARRAY(m_builtin_class,
                   array->m_max, sizeof(T), array->m_ptr);
    array->m_ptr= NULL;
  }

private:
  PFS_builtin_memory_class *m_builtin_class;
};

template <class T,
          class U = PFS_buffer_default_array<T>,
          class V = PFS_buffer_default_allocator<T> >
class PFS_buffer_container
{
public:
  friend class PFS_buffer_iterator<T, U, V>;

  typedef T value_type;
  typedef U array_type;
  typedef V allocator_type;
  typedef PFS_buffer_const_iterator<T> const_iterator_type;
  typedef PFS_buffer_iterator<T, U, V> iterator_type;
  typedef PFS_buffer_processor<T> processor_type;
  typedef void (*function_type)(value_type *);

  PFS_buffer_container(allocator_type *allocator)
  {
    m_array.m_full= true;
    m_array.m_ptr= NULL;
    m_array.m_max= 0;
    m_array.m_monotonic.m_u32= 0;
    m_lost= 0;
    m_max= 0;
    m_allocator= allocator;
  }

  int init(ulong max_size)
  {
    if (max_size > 0)
    {
      m_array.m_max= max_size;
      int rc= m_allocator->alloc_array(& m_array);
      if (rc != 0)
      {
        m_allocator->free_array(& m_array);
        return 1;
      }
      m_max= max_size;
      m_array.m_full= false;
    }
    return 0;
  }

  void cleanup()
  {
    m_allocator->free_array(& m_array);
  }

  ulong get_row_count() const
  {
    return m_max;
  }

  ulong get_row_size() const
  {
    return sizeof(value_type);
  }

  ulong get_memory() const
  {
    return get_row_count() * get_row_size();
  }

  value_type *allocate(pfs_dirty_state *dirty_state)
  {
    value_type *pfs;

    pfs= m_array.allocate(dirty_state);
    if (pfs == NULL)
    {
      m_lost++;
    }

    return pfs;
  }

  void deallocate(value_type *pfs)
  {
    m_array.deallocate(pfs);
  }

  iterator_type iterate()
  {
    return PFS_buffer_iterator<T, U, V>(this, 0);
  }

  iterator_type iterate(uint index)
  {
    assert(index <= m_max);
    return PFS_buffer_iterator<T, U, V>(this, index);
  }

  void apply(function_type fct)
  {
    value_type *pfs= m_array.get_first();
    value_type *pfs_last= m_array.get_last();

    while (pfs < pfs_last)
    {
      if (pfs->m_lock.is_populated())
      {
        fct(pfs);
      }
      pfs++;
    }
  }

  void apply_all(function_type fct)
  {
    value_type *pfs= m_array.get_first();
    value_type *pfs_last= m_array.get_last();

    while (pfs < pfs_last)
    {
      fct(pfs);
      pfs++;
    }
  }

  void apply(processor_type & proc)
  {
    value_type *pfs= m_array.get_first();
    value_type *pfs_last= m_array.get_last();

    while (pfs < pfs_last)
    {
      if (pfs->m_lock.is_populated())
      {
        proc(pfs);
      }
      pfs++;
    }
  }

  void apply_all(processor_type & proc)
  {
    value_type *pfs= m_array.get_first();
    value_type *pfs_last= m_array.get_last();

    while (pfs < pfs_last)
    {
      proc(pfs);
      pfs++;
    }
  }

  inline value_type* get(uint index)
  {
    assert(index < m_max);

    value_type *pfs= m_array.m_ptr + index;
    if (pfs->m_lock.is_populated())
    {
      return pfs;
    }

    return NULL;
  }

  value_type* get(uint index, bool *has_more)
  {
    if (index >= m_max)
    {
      *has_more= false;
      return NULL;
    }

    *has_more= true;
    return get(index);
  }

  value_type *sanitize(value_type *unsafe)
  {
    intptr offset;
    value_type *pfs= m_array.get_first();
    value_type *pfs_last= m_array.get_last();

    if ((pfs <= unsafe) &&
        (unsafe < pfs_last))
    {
      offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
      if (offset == 0)
        return unsafe;
    }

    return NULL;
  }

  ulong m_lost;

private:
  value_type* scan_next(uint & index, uint * found_index)
  {
    assert(index <= m_max);

    value_type *pfs_first= m_array.get_first();
    value_type *pfs= pfs_first + index;
    value_type *pfs_last= m_array.get_last();

    while (pfs < pfs_last)
    {
      if (pfs->m_lock.is_populated())
      {
        uint found= pfs - pfs_first;
        *found_index= found;
        index= found + 1;
        return pfs;
      }
      pfs++;
    }

    index= m_max;
    return NULL;
  }

  ulong m_max;
  array_type m_array;
  allocator_type *m_allocator;
};
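
/*
  Illustrative usage sketch for PFS_buffer_container (the row type
  "demo_record" and the memory class are hypothetical; a real row type must
  expose a pfs_lock m_lock member; the container, allocator and iterator
  calls are the ones declared above):

    PFS_buffer_default_allocator<demo_record> alloc(& some_builtin_memory_class);
    PFS_buffer_container<demo_record> container(& alloc);

    container.init(1000);                     // bounded to 1000 rows

    pfs_dirty_state dirty_state;
    demo_record *row= container.allocate(& dirty_state);
    if (row != NULL)
    {
      // ... fill the row, publish it (pfs_lock dirty -> allocated), use it ...
      container.deallocate(row);
    }

    PFS_buffer_container<demo_record>::iterator_type it= container.iterate();
    demo_record *current;
    while ((current= it.scan_next()) != NULL)
    {
      // only rows whose lock is populated are returned here
    }

    container.cleanup();
*/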

template <class T,
          int PFS_PAGE_SIZE,
          int PFS_PAGE_COUNT,
          class U = PFS_buffer_default_array<T>,
          class V = PFS_buffer_default_allocator<T> >
class PFS_buffer_scalable_container
{
public:
  friend class PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>;

  /**
    Type of elements in the buffer.
    The following attributes are required:
    - pfs_lock m_lock
    - PFS_opaque_container_page *m_page
  */
  typedef T value_type;
  /**
    Type of pages in the buffer.
    The following attributes are required:
    - PFS_opaque_container *m_container
  */
  typedef U array_type;
  typedef V allocator_type;
  /** This container type */
  typedef PFS_buffer_scalable_container<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> container_type;
  typedef PFS_buffer_const_iterator<T> const_iterator_type;
  typedef PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V> iterator_type;
  typedef PFS_buffer_processor<T> processor_type;
  typedef void (*function_type)(value_type *);

  static const size_t MAX_SIZE= PFS_PAGE_SIZE*PFS_PAGE_COUNT;

  PFS_buffer_scalable_container(allocator_type *allocator)
  {
    m_allocator= allocator;
    m_initialized= false;
    m_lost= 0;
  }

  int init(long max_size)
  {
    int i;

    m_initialized= true;
    m_full= true;
    m_max= PFS_PAGE_COUNT * PFS_PAGE_SIZE;
    m_max_page_count= PFS_PAGE_COUNT;
    m_last_page_size= PFS_PAGE_SIZE;
    m_lost= 0;
    m_monotonic.m_u32= 0;
    m_max_page_index.m_u32= 0;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      m_pages[i]= NULL;
    }

    if (max_size == 0)
    {
      /* No allocation. */
      m_max_page_count= 0;
    }
    else if (max_size > 0)
    {
      if (max_size % PFS_PAGE_SIZE == 0)
      {
        m_max_page_count= max_size / PFS_PAGE_SIZE;
      }
      else
      {
        m_max_page_count= max_size / PFS_PAGE_SIZE + 1;
        m_last_page_size= max_size % PFS_PAGE_SIZE;
      }
      /* Bounded allocation. */
      m_full= false;

      if (m_max_page_count > PFS_PAGE_COUNT)
      {
        m_max_page_count= PFS_PAGE_COUNT;
        m_last_page_size= PFS_PAGE_SIZE;
      }
    }
    else
    {
      /* max_size = -1 means unbounded allocation */
      m_full= false;
    }

    assert(m_max_page_count <= PFS_PAGE_COUNT);
    assert(0 < m_last_page_size);
    assert(m_last_page_size <= PFS_PAGE_SIZE);

    pthread_mutex_init(& m_critical_section, NULL);
    return 0;
  }
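
  /*
    Sizing example (illustrative): with PFS_PAGE_SIZE = 1024 and
    max_size = 2500, init() above computes
    m_max_page_count = 2500 / 1024 + 1 = 3 and
    m_last_page_size = 2500 % 1024 = 452,
    i.e. two full pages of 1024 rows plus a final page of 452 rows.
    With max_size = -1 the container may grow on demand,
    up to PFS_PAGE_COUNT pages of PFS_PAGE_SIZE rows each.
  */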

  void cleanup()
  {
    int i;
    array_type *page;

    if (! m_initialized)
      return;

    pthread_mutex_lock(& m_critical_section);

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        m_allocator->free_array(page);
        delete page;
        m_pages[i]= NULL;
      }
    }
    pthread_mutex_unlock(& m_critical_section);

    pthread_mutex_destroy(& m_critical_section);

    m_initialized= false;
  }

  ulong get_row_count()
  {
    ulong page_count= PFS_atomic::load_u32(& m_max_page_index.m_u32);

    return page_count * PFS_PAGE_SIZE;
  }

  ulong get_row_size() const
  {
    return sizeof(value_type);
  }

  ulong get_memory()
  {
    return get_row_count() * get_row_size();
  }

  value_type *allocate(pfs_dirty_state *dirty_state)
  {
    if (m_full)
    {
      m_lost++;
      return NULL;
    }

    uint index;
    uint monotonic;
    uint monotonic_max;
    uint current_page_count;
    value_type *pfs;
    array_type *array;

    void *addr;
    void * volatile * typed_addr;
    void *ptr;

    /*
      1: Try to find an available record within the existing pages
    */
    current_page_count= PFS_atomic::load_u32(& m_max_page_index.m_u32);

    if (current_page_count != 0)
    {
      monotonic= PFS_atomic::load_u32(& m_monotonic.m_u32);
      monotonic_max= monotonic + current_page_count;

      while (monotonic < monotonic_max)
      {
        /*
          Scan in the [0 .. current_page_count - 1] range,
          in parallel with m_monotonic (see below)
        */
        index= monotonic % current_page_count;

        /* Atomic Load, array= m_pages[index] */
        addr= & m_pages[index];
        typed_addr= static_cast<void * volatile *>(addr);
        ptr= my_atomic_loadptr(typed_addr);
        array= static_cast<array_type *>(ptr);

        if (array != NULL)
        {
          pfs= array->allocate(dirty_state);
          if (pfs != NULL)
          {
            /* Keep a pointer to the parent page, for deallocate(). */
            pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
            return pfs;
          }
        }

        /*
          Parallel scans collaborate to increase
          the common monotonic scan counter.

          Note that when all the existing pages are full,
          one thread will eventually add a new page,
          and cause m_max_page_index to increase,
          which fools all the modulo logic for scans already in progress,
          because the monotonic counter is not folded to the same place
          (sometimes modulo N, sometimes modulo N+1).

          This is actually ok: since all the pages are full anyway,
          there is nothing to miss, so it is better to increase the monotonic
          counter faster and then move on to the detection of new pages,
          in part 2: below.
        */
        monotonic= PFS_atomic::add_u32(& m_monotonic.m_u32, 1);
      };
    }

    /*
      2: Try to add a new page, beyond the m_max_page_index limit
    */
    while (current_page_count < m_max_page_count)
    {
      /* Peek for pages added by collaborating threads */

      /* (2-a) Atomic Load, array= m_pages[current_page_count] */
      addr= & m_pages[current_page_count];
      typed_addr= static_cast<void * volatile *>(addr);
      ptr= my_atomic_loadptr(typed_addr);
      array= static_cast<array_type *>(ptr);

      if (array == NULL)
      {
        // ==================================================================
        // BEGIN CRITICAL SECTION -- buffer expand
        // ==================================================================

        /*
          On a freshly started server, buffers are typically empty.
          When a sudden load spike is seen by the server,
          multiple threads may want to expand the buffer at the same time.

          Using a compare and swap to allow multiple pages to be added,
          possibly freeing duplicate pages on collisions,
          does not work well because the amount of code involved
          when creating a new page can be significant (PFS_thread),
          causing MANY collisions between (2-b) and (2-d).

          A huge number of collisions (which can happen when thousands
          of new connections hit the server after a restart)
          leads to a huge memory consumption, and to OOM.

          To mitigate this, we use here a mutex,
          to enforce that only ONE page is added at a time,
          so that scaling the buffer happens in a predictable
          and controlled manner.
          (See the standalone sketch after this method for an illustration
          of this load / lock / re-check / publish sequence.)
        */
        pthread_mutex_lock(& m_critical_section);

        /*
          Peek again for pages added by collaborating threads,
          this time as the only thread allowed to expand the buffer
        */

        /* (2-b) Atomic Load, array= m_pages[current_page_count] */

        ptr= my_atomic_loadptr(typed_addr);
        array= static_cast<array_type *>(ptr);

        if (array == NULL)
        {
          /* (2-c) Found no page, allocate a new one */
          array= new array_type();
          builtin_memory_scalable_buffer.count_alloc(sizeof (array_type));

          array->m_max= get_page_logical_size(current_page_count);
          int rc= m_allocator->alloc_array(array);
          if (rc != 0)
          {
            m_allocator->free_array(array);
            delete array;
            builtin_memory_scalable_buffer.count_free(sizeof (array_type));
            m_lost++;
            pthread_mutex_unlock(& m_critical_section);
            return NULL;
          }

          /* Keep a pointer to this container, for static_deallocate(). */
          array->m_container= reinterpret_cast<PFS_opaque_container *> (this);

          /* (2-d) Atomic STORE, m_pages[current_page_count] = array  */
          ptr= array;
          my_atomic_storeptr(typed_addr, ptr);

          /* Advertise the new page */
          PFS_atomic::add_u32(& m_max_page_index.m_u32, 1);
        }

        pthread_mutex_unlock(& m_critical_section);

        // ==================================================================
        // END CRITICAL SECTION -- buffer expand
        // ==================================================================
      }

      assert(array != NULL);
      pfs= array->allocate(dirty_state);
      if (pfs != NULL)
      {
        /* Keep a pointer to the parent page, for deallocate(). */
        pfs->m_page= reinterpret_cast<PFS_opaque_container_page *> (array);
        return pfs;
      }

      current_page_count++;
    }

    m_lost++;
    m_full= true;
    return NULL;
  }
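
  /*
    Illustrative standalone sketch of the expansion protocol used by
    allocate() above (standard library only, hypothetical names; not server
    code).  Readers locate pages with a plain atomic load, and a single
    mutex serializes page creation, so competing threads re-check for a page
    published by someone else before allocating their own:

      #include <atomic>
      #include <mutex>

      struct demo_page { int rows[1024]; };

      std::atomic<demo_page *> demo_pages[16];
      std::mutex demo_expand_mutex;

      demo_page *get_or_create_page(unsigned index)
      {
        demo_page *page= demo_pages[index].load();        // (2-a) cheap peek
        if (page == nullptr)
        {
          std::lock_guard<std::mutex> guard(demo_expand_mutex);
          page= demo_pages[index].load();                 // (2-b) re-check under the lock
          if (page == nullptr)
          {
            page= new demo_page();                        // (2-c) only one thread builds the page
            demo_pages[index].store(page);                // (2-d) publish to concurrent readers
          }
        }
        return page;
      }
  */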

  void deallocate(value_type *safe_pfs)
  {
    /* Find the containing page */
    PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
    array_type *page= reinterpret_cast<array_type *> (opaque_page);

    /* Mark the object free */
    safe_pfs->m_lock.allocated_to_free();

    /* Flag the containing page as not full. */
    page->m_full= false;

    /* Flag the overall container as not full. */
    m_full= false;
  }

  static void static_deallocate(value_type *safe_pfs)
  {
    /* Find the containing page */
    PFS_opaque_container_page *opaque_page= safe_pfs->m_page;
    array_type *page= reinterpret_cast<array_type *> (opaque_page);

    /* Mark the object free */
    safe_pfs->m_lock.allocated_to_free();

    /* Flag the containing page as not full. */
    page->m_full= false;

    /* Find the containing buffer */
    PFS_opaque_container *opaque_container= page->m_container;
    PFS_buffer_scalable_container *container;
    container= reinterpret_cast<container_type *> (opaque_container);

    /* Flag the overall container as not full. */
    container->m_full= false;
  }

  iterator_type iterate()
  {
    return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, 0);
  }

  iterator_type iterate(uint index)
  {
    assert(index <= m_max);
    return PFS_buffer_scalable_iterator<T, PFS_PAGE_SIZE, PFS_PAGE_COUNT, U, V>(this, index);
  }

  void apply(function_type fct)
  {
    uint i;
    array_type *page;
    value_type *pfs;
    value_type *pfs_last;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        pfs= page->get_first();
        pfs_last= page->get_last();

        while (pfs < pfs_last)
        {
          if (pfs->m_lock.is_populated())
          {
            fct(pfs);
          }
          pfs++;
        }
      }
    }
  }

  void apply_all(function_type fct)
  {
    uint i;
    array_type *page;
    value_type *pfs;
    value_type *pfs_last;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        pfs= page->get_first();
        pfs_last= page->get_last();

        while (pfs < pfs_last)
        {
          fct(pfs);
          pfs++;
        }
      }
    }
  }

  void apply(processor_type & proc)
  {
    uint i;
    array_type *page;
    value_type *pfs;
    value_type *pfs_last;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        pfs= page->get_first();
        pfs_last= page->get_last();

        while (pfs < pfs_last)
        {
          if (pfs->m_lock.is_populated())
          {
            proc(pfs);
          }
          pfs++;
        }
      }
    }
  }

  void apply_all(processor_type & proc)
  {
    uint i;
    array_type *page;
    value_type *pfs;
    value_type *pfs_last;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        pfs= page->get_first();
        pfs_last= page->get_last();

        while (pfs < pfs_last)
        {
          proc(pfs);
          pfs++;
        }
      }
    }
  }

  value_type* get(uint index)
  {
    assert(index < m_max);

    uint index_1= index / PFS_PAGE_SIZE;
    array_type *page= m_pages[index_1];
    if (page != NULL)
    {
      uint index_2= index % PFS_PAGE_SIZE;

      if (index_2 >= page->m_max)
      {
        return NULL;
      }

      value_type *pfs= page->m_ptr + index_2;

      if (pfs->m_lock.is_populated())
      {
        return pfs;
      }
    }

    return NULL;
  }

  value_type* get(uint index, bool *has_more)
  {
    if (index >= m_max)
    {
      *has_more= false;
      return NULL;
    }

    uint index_1= index / PFS_PAGE_SIZE;
    array_type *page= m_pages[index_1];

    if (page == NULL)
    {
      *has_more= false;
      return NULL;
    }

    uint index_2= index % PFS_PAGE_SIZE;

    if (index_2 >= page->m_max)
    {
      *has_more= false;
      return NULL;
    }

    *has_more= true;
    value_type *pfs= page->m_ptr + index_2;

    if (pfs->m_lock.is_populated())
    {
      return pfs;
    }

    return NULL;
  }

  value_type *sanitize(value_type *unsafe)
  {
    intptr offset;
    uint i;
    array_type *page;
    value_type *pfs;
    value_type *pfs_last;

    for (i=0 ; i < PFS_PAGE_COUNT; i++)
    {
      page= m_pages[i];
      if (page != NULL)
      {
        pfs= page->get_first();
        pfs_last= page->get_last();

        if ((pfs <= unsafe) &&
            (unsafe < pfs_last))
        {
          offset= ((intptr) unsafe - (intptr) pfs) % sizeof(value_type);
          if (offset == 0)
            return unsafe;
        }
      }
    }

    return NULL;
  }

  ulong m_lost;

private:

  uint get_page_logical_size(uint page_index)
  {
    if (page_index + 1 < m_max_page_count)
      return PFS_PAGE_SIZE;
    assert(page_index + 1 == m_max_page_count);
    return m_last_page_size;
  }

  value_type* scan_next(uint & index, uint * found_index)
  {
    assert(index <= m_max);

    uint index_1= index / PFS_PAGE_SIZE;
    uint index_2= index % PFS_PAGE_SIZE;
    array_type *page;
    value_type *pfs_first;
    value_type *pfs;
    value_type *pfs_last;

    while (index_1 < PFS_PAGE_COUNT)
    {
      page= m_pages[index_1];

      if (page == NULL)
      {
        index= static_cast<uint>(m_max);
        return NULL;
      }

      pfs_first= page->get_first();
      pfs= pfs_first + index_2;
      pfs_last= page->get_last();

      while (pfs < pfs_last)
      {
        if (pfs->m_lock.is_populated())
        {
          uint found= index_1 * PFS_PAGE_SIZE + static_cast<uint>(pfs - pfs_first);
          *found_index= found;
          index= found + 1;
          return pfs;
        }
        pfs++;
      }

      index_1++;
      index_2= 0;
    }

    index= static_cast<uint>(m_max);
    return NULL;
  }

  bool m_initialized;
  bool m_full;
  size_t m_max;
  PFS_cacheline_uint32 m_monotonic;
  PFS_cacheline_uint32 m_max_page_index;
  ulong m_max_page_count;
  ulong m_last_page_size;
  array_type * m_pages[PFS_PAGE_COUNT];
  allocator_type *m_allocator;
  pthread_mutex_t m_critical_section;
};
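
/*
  Row addressing in the scalable container (illustrative): a flat row index
  maps to a page and a slot within that page.  For example, with
  PFS_PAGE_SIZE = 1024, index 2600 resolves to page 2600 / 1024 = 2 and
  slot 2600 % 1024 = 552; get() then checks that the page exists, that the
  slot is below the page's logical size (m_max), and that the row's lock is
  populated before returning it.
*/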

template <class T, class U, class V>
class PFS_buffer_iterator
{
  friend class PFS_buffer_container<T, U, V>;

  typedef T value_type;
  typedef PFS_buffer_container<T, U, V> container_type;

public:
  value_type* scan_next()
  {
    uint unused;
    return m_container->scan_next(m_index, & unused);
  }

  value_type* scan_next(uint * found_index)
  {
    return m_container->scan_next(m_index, found_index);
  }

private:
  PFS_buffer_iterator(container_type *container, uint index)
    : m_container(container),
      m_index(index)
  {}

  container_type *m_container;
  uint m_index;
};

template <class T, int page_size, int page_count, class U, class V>
class PFS_buffer_scalable_iterator
{
  friend class PFS_buffer_scalable_container<T, page_size, page_count, U, V>;

  typedef T value_type;
  typedef PFS_buffer_scalable_container<T, page_size, page_count, U, V> container_type;

public:
  value_type* scan_next()
  {
    uint unused;
    return m_container->scan_next(m_index, & unused);
  }

  value_type* scan_next(uint * found_index)
  {
    return m_container->scan_next(m_index, found_index);
  }

private:
  PFS_buffer_scalable_iterator(container_type *container, uint index)
    : m_container(container),
      m_index(index)
  {}

  container_type *m_container;
  uint m_index;
};

template <class T>
class PFS_buffer_processor
{
public:
  virtual ~PFS_buffer_processor<T> ()
  {}
  virtual void operator()(T *element) = 0;
};
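
/*
  Illustrative processor (hypothetical, not part of the server): a functor
  derived from PFS_buffer_processor<T> can be passed to apply() or
  apply_all() to visit rows without managing iterator state.  For example,
  counting populated mutex rows could look like:

    class demo_mutex_counter : public PFS_buffer_processor<PFS_mutex>
    {
    public:
      demo_mutex_counter() : m_count(0) {}
      virtual void operator()(PFS_mutex *) { m_count++; }
      ulong m_count;
    };

    demo_mutex_counter counter;
    global_mutex_container.apply(counter);  // visits populated rows only
*/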

template <class B, int PFS_PARTITION_COUNT>
class PFS_partitioned_buffer_scalable_container
{
public:
  friend class PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT>;

  typedef typename B::value_type value_type;
  typedef typename B::allocator_type allocator_type;
  typedef PFS_partitioned_buffer_scalable_iterator<B, PFS_PARTITION_COUNT> iterator_type;
  typedef typename B::iterator_type sub_iterator_type;
  typedef typename B::processor_type processor_type;
  typedef typename B::function_type function_type;

  PFS_partitioned_buffer_scalable_container(allocator_type *allocator)
  {
    for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]= new B(allocator);
    }
  }

  ~PFS_partitioned_buffer_scalable_container()
  {
    for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
    {
      delete m_partitions[i];
    }
  }

  int init(long max_size)
  {
    int rc= 0;
    // FIXME: we have max_size * PFS_PARTITION_COUNT here
    for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
    {
      rc|= m_partitions[i]->init(max_size);
    }
    return rc;
  }

  void cleanup()
  {
    for (int i=0 ; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]->cleanup();
    }
  }

  ulong get_row_count() const
  {
    ulong sum= 0;

    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      sum += m_partitions[i]->get_row_count();
    }

    return sum;
  }

  ulong get_row_size() const
  {
    return sizeof(value_type);
  }

  ulong get_memory() const
  {
    ulong sum= 0;

    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      sum += m_partitions[i]->get_memory();
    }

    return sum;
  }

  long get_lost_counter()
  {
    long sum= 0;

    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      sum += m_partitions[i]->m_lost;
    }

    return sum;
  }

  value_type *allocate(pfs_dirty_state *dirty_state, uint partition)
  {
    assert(partition < PFS_PARTITION_COUNT);

    return m_partitions[partition]->allocate(dirty_state);
  }

  void deallocate(value_type *safe_pfs)
  {
    /*
      One issue here is that we do not know which partition
      the record belongs to.
      Each record points to the parent page,
      and each page points to the parent buffer,
      so we use static_deallocate() here,
      which finds the correct partition by itself.
    */
    B::static_deallocate(safe_pfs);
  }

  iterator_type iterate()
  {
    return iterator_type(this, 0, 0);
  }

  iterator_type iterate(uint user_index)
  {
    uint partition_index;
    uint sub_index;
    unpack_index(user_index, &partition_index, &sub_index);
    return iterator_type(this, partition_index, sub_index);
  }

  void apply(function_type fct)
  {
    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]->apply(fct);
    }
  }

  void apply_all(function_type fct)
  {
    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]->apply_all(fct);
    }
  }

  void apply(processor_type & proc)
  {
    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]->apply(proc);
    }
  }

  void apply_all(processor_type & proc)
  {
    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      m_partitions[i]->apply_all(proc);
    }
  }

  value_type* get(uint user_index)
  {
    uint partition_index;
    uint sub_index;
    unpack_index(user_index, &partition_index, &sub_index);

    if (partition_index >= PFS_PARTITION_COUNT)
    {
      return NULL;
    }

    return m_partitions[partition_index]->get(sub_index);
  }

  value_type* get(uint user_index, bool *has_more)
  {
    uint partition_index;
    uint sub_index;
    unpack_index(user_index, &partition_index, &sub_index);

    if (partition_index >= PFS_PARTITION_COUNT)
    {
      *has_more= false;
      return NULL;
    }

    *has_more= true;
    return m_partitions[partition_index]->get(sub_index);
  }

  value_type *sanitize(value_type *unsafe)
  {
    value_type *safe= NULL;

    for (int i=0; i < PFS_PARTITION_COUNT; i++)
    {
      safe= m_partitions[i]->sanitize(unsafe);
      if (safe != NULL)
      {
        return safe;
      }
    }

    return safe;
  }

private:
  static void pack_index(uint partition_index, uint sub_index, uint *user_index)
  {
    /* 2^8 = 256 partitions max */
    compile_time_assert(PFS_PARTITION_COUNT <= (1 << 8));
    /* 2^24 = 16777216 max per partitioned buffer. */
    compile_time_assert((B::MAX_SIZE) <= (1 << 24));

    *user_index= (partition_index << 24) + sub_index;
  }

  static void unpack_index(uint user_index, uint *partition_index, uint *sub_index)
  {
    *partition_index= user_index >> 24;
    *sub_index= user_index & 0x00FFFFFF;
  }
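
  /*
    Packing example (illustrative): with partition_index = 3 and
    sub_index = 70000, pack_index() produces
    (3 << 24) + 70000 = 50331648 + 70000 = 50401648,
    and unpack_index() recovers 50401648 >> 24 = 3 and
    50401648 & 0x00FFFFFF = 70000.  The compile time asserts above guarantee
    that 8 bits are enough for the partition and 24 bits for the row index.
  */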

  value_type* scan_next(uint & partition_index, uint & sub_index, uint * found_partition, uint * found_sub_index)
  {
    value_type *record= NULL;
    assert(partition_index < PFS_PARTITION_COUNT);

    while (partition_index < PFS_PARTITION_COUNT)
    {
      sub_iterator_type sub_iterator= m_partitions[partition_index]->iterate(sub_index);
      record= sub_iterator.scan_next(found_sub_index);
      if (record != NULL)
      {
        *found_partition= partition_index;
        sub_index= *found_sub_index + 1;
        return record;
      }

      partition_index++;
      sub_index= 0;
    }

    *found_partition= PFS_PARTITION_COUNT;
    *found_sub_index= 0;
    sub_index= 0;
    return NULL;
  }

  B *m_partitions[PFS_PARTITION_COUNT];
};
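
/*
  Illustrative use of the partitioned container (the "partition" value is
  hypothetical; the caller chooses it): allocation picks a partition
  explicitly, while deallocation, get() and iteration hide the partitioning
  behind the packed user index.

    pfs_dirty_state dirty_state;
    PFS_mutex *pfs= global_mutex_container.allocate(& dirty_state, partition);
    if (pfs != NULL)
    {
      // ... use the instrument ...
      global_mutex_container.deallocate(pfs);  // partition recovered via m_page / m_container
    }
*/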

template <class B, int PFS_PARTITION_COUNT>
class PFS_partitioned_buffer_scalable_iterator
{
public:
  friend class PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT>;

  typedef typename B::value_type value_type;
  typedef PFS_partitioned_buffer_scalable_container<B, PFS_PARTITION_COUNT> container_type;

  value_type* scan_next()
  {
    uint unused_partition;
    uint unused_sub_index;
    return m_container->scan_next(m_partition, m_sub_index, & unused_partition, & unused_sub_index);
  }

  value_type* scan_next(uint *found_user_index)
  {
    uint found_partition;
    uint found_sub_index;
    value_type *record;
    record=  m_container->scan_next(m_partition, m_sub_index, &found_partition, &found_sub_index);
    container_type::pack_index(found_partition, found_sub_index, found_user_index);
    return record;
  }

private:
  PFS_partitioned_buffer_scalable_iterator(container_type *container, uint partition, uint sub_index)
    : m_container(container),
      m_partition(partition),
      m_sub_index(sub_index)
  {}

  container_type *m_container;
  uint m_partition;
  uint m_sub_index;
};

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_mutex, 1024, 1024> PFS_mutex_basic_container;
typedef PFS_partitioned_buffer_scalable_container<PFS_mutex_basic_container, PSI_COUNT_VOLATILITY> PFS_mutex_container;
#else
typedef PFS_buffer_container<PFS_mutex> PFS_mutex_container;
#endif
typedef PFS_mutex_container::iterator_type PFS_mutex_iterator;
extern PFS_mutex_container global_mutex_container;
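
/*
  Sizing note (illustrative): each scalable container caps at
  PFS_PAGE_COUNT pages of PFS_PAGE_SIZE rows, so PFS_mutex_basic_container
  above can hold at most 1024 * 1024 = 1048576 mutex rows per partition,
  and PFS_mutex_container multiplies that by its PSI_COUNT_VOLATILITY
  partitions.
*/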

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_rwlock, 1024, 1024> PFS_rwlock_container;
#else
typedef PFS_buffer_container<PFS_rwlock> PFS_rwlock_container;
#endif
typedef PFS_rwlock_container::iterator_type PFS_rwlock_iterator;
extern PFS_rwlock_container global_rwlock_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_cond, 256, 256> PFS_cond_container;
#else
typedef PFS_buffer_container<PFS_cond> PFS_cond_container;
#endif
typedef PFS_cond_container::iterator_type PFS_cond_iterator;
extern PFS_cond_container global_cond_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_file, 4 * 1024, 4 * 1024> PFS_file_container;
#else
typedef PFS_buffer_container<PFS_file> PFS_file_container;
#endif
typedef PFS_file_container::iterator_type PFS_file_iterator;
extern PFS_file_container global_file_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_socket, 256, 256> PFS_socket_container;
#else
typedef PFS_buffer_container<PFS_socket> PFS_socket_container;
#endif
typedef PFS_socket_container::iterator_type PFS_socket_iterator;
extern PFS_socket_container global_socket_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_metadata_lock, 1024, 1024> PFS_mdl_container;
#else
typedef PFS_buffer_container<PFS_metadata_lock> PFS_mdl_container;
#endif
typedef PFS_mdl_container::iterator_type PFS_mdl_iterator;
extern PFS_mdl_container global_mdl_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_setup_actor, 128, 1024> PFS_setup_actor_container;
#else
typedef PFS_buffer_container<PFS_setup_actor> PFS_setup_actor_container;
#endif
typedef PFS_setup_actor_container::iterator_type PFS_setup_actor_iterator;
extern PFS_setup_actor_container global_setup_actor_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_setup_object, 128, 1024> PFS_setup_object_container;
#else
typedef PFS_buffer_container<PFS_setup_object> PFS_setup_object_container;
#endif
typedef PFS_setup_object_container::iterator_type PFS_setup_object_iterator;
extern PFS_setup_object_container global_setup_object_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_table, 1024, 1024> PFS_table_container;
#else
typedef PFS_buffer_container<PFS_table> PFS_table_container;
#endif
typedef PFS_table_container::iterator_type PFS_table_iterator;
extern PFS_table_container global_table_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_table_share, 4 * 1024, 4 * 1024> PFS_table_share_container;
#else
typedef PFS_buffer_container<PFS_table_share> PFS_table_share_container;
#endif
typedef PFS_table_share_container::iterator_type PFS_table_share_iterator;
extern PFS_table_share_container global_table_share_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_table_share_index, 8 * 1024, 8 * 1024> PFS_table_share_index_container;
#else
typedef PFS_buffer_container<PFS_table_share_index> PFS_table_share_index_container;
#endif
typedef PFS_table_share_index_container::iterator_type PFS_table_share_index_iterator;
extern PFS_table_share_index_container global_table_share_index_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_table_share_lock, 4 * 1024, 4 * 1024> PFS_table_share_lock_container;
#else
typedef PFS_buffer_container<PFS_table_share_lock> PFS_table_share_lock_container;
#endif
typedef PFS_table_share_lock_container::iterator_type PFS_table_share_lock_iterator;
extern PFS_table_share_lock_container global_table_share_lock_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_program, 1024, 1024> PFS_program_container;
#else
typedef PFS_buffer_container<PFS_program> PFS_program_container;
#endif
typedef PFS_program_container::iterator_type PFS_program_iterator;
extern PFS_program_container global_program_container;

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_prepared_stmt, 1024, 1024> PFS_prepared_stmt_container;
#else
typedef PFS_buffer_container<PFS_prepared_stmt> PFS_prepared_stmt_container;
#endif
typedef PFS_prepared_stmt_container::iterator_type PFS_prepared_stmt_iterator;
extern PFS_prepared_stmt_container global_prepared_stmt_container;

class PFS_account_array : public PFS_buffer_default_array<PFS_account>
{
public:
  PFS_single_stat *m_instr_class_waits_array;
  PFS_stage_stat *m_instr_class_stages_array;
  PFS_statement_stat *m_instr_class_statements_array;
  PFS_transaction_stat *m_instr_class_transactions_array;
  PFS_memory_stat *m_instr_class_memory_array;
};

class PFS_account_allocator
{
public:
  int alloc_array(PFS_account_array *array);
  void free_array(PFS_account_array *array);
};

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_account,
                                      128,
                                      128,
                                      PFS_account_array,
                                      PFS_account_allocator> PFS_account_container;
#else
typedef PFS_buffer_container<PFS_account,
                             PFS_account_array,
                             PFS_account_allocator> PFS_account_container;
#endif
typedef PFS_account_container::iterator_type PFS_account_iterator;
extern PFS_account_container global_account_container;

class PFS_host_array : public PFS_buffer_default_array<PFS_host>
{
public:
  PFS_single_stat *m_instr_class_waits_array;
  PFS_stage_stat *m_instr_class_stages_array;
  PFS_statement_stat *m_instr_class_statements_array;
  PFS_transaction_stat *m_instr_class_transactions_array;
  PFS_memory_stat *m_instr_class_memory_array;
};

class PFS_host_allocator
{
public:
  int alloc_array(PFS_host_array *array);
  void free_array(PFS_host_array *array);
};

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_host,
                                      128,
                                      128,
                                      PFS_host_array,
                                      PFS_host_allocator> PFS_host_container;
#else
typedef PFS_buffer_container<PFS_host,
                             PFS_host_array,
                             PFS_host_allocator> PFS_host_container;
#endif
typedef PFS_host_container::iterator_type PFS_host_iterator;
extern PFS_host_container global_host_container;

class PFS_thread_array : public PFS_buffer_default_array<PFS_thread>
{
public:
  PFS_single_stat *m_instr_class_waits_array;
  PFS_stage_stat *m_instr_class_stages_array;
  PFS_statement_stat *m_instr_class_statements_array;
  PFS_transaction_stat *m_instr_class_transactions_array;
  PFS_memory_stat *m_instr_class_memory_array;

  PFS_events_waits *m_waits_history_array;
  PFS_events_stages *m_stages_history_array;
  PFS_events_statements *m_statements_history_array;
  PFS_events_statements *m_statements_stack_array;
  PFS_events_transactions *m_transactions_history_array;
  char *m_session_connect_attrs_array;

  char *m_current_stmts_text_array;
  char *m_history_stmts_text_array;
  unsigned char *m_current_stmts_digest_token_array;
  unsigned char *m_history_stmts_digest_token_array;
};

class PFS_thread_allocator
{
public:
  int alloc_array(PFS_thread_array *array);
  void free_array(PFS_thread_array *array);
};

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_thread,
                                      256,
                                      256,
                                      PFS_thread_array,
                                      PFS_thread_allocator> PFS_thread_container;
#else
typedef PFS_buffer_container<PFS_thread,
                             PFS_thread_array,
                             PFS_thread_allocator> PFS_thread_container;
#endif
typedef PFS_thread_container::iterator_type PFS_thread_iterator;
extern PFS_thread_container global_thread_container;

class PFS_user_array : public PFS_buffer_default_array<PFS_user>
{
public:
  PFS_single_stat *m_instr_class_waits_array;
  PFS_stage_stat *m_instr_class_stages_array;
  PFS_statement_stat *m_instr_class_statements_array;
  PFS_transaction_stat *m_instr_class_transactions_array;
  PFS_memory_stat *m_instr_class_memory_array;
};

class PFS_user_allocator
{
public:
  int alloc_array(PFS_user_array *array);
  void free_array(PFS_user_array *array);
};

#ifdef USE_SCALABLE
typedef PFS_buffer_scalable_container<PFS_user,
                                      128,
                                      128,
                                      PFS_user_array,
                                      PFS_user_allocator> PFS_user_container;
#else
typedef PFS_buffer_container<PFS_user,
                             PFS_user_array,
                             PFS_user_allocator> PFS_user_container;
#endif
typedef PFS_user_container::iterator_type PFS_user_iterator;
extern PFS_user_container global_user_container;

#endif