/*
  Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.

  This program is free software; you can redistribute it and/or modify
  it under the terms of the GNU General Public License, version 2.0,
  as published by the Free Software Foundation.

  This program is also distributed with certain software (including
  but not limited to OpenSSL) that is licensed under separate terms,
  as designated in a particular file or component or in included license
  documentation.  The authors of MySQL hereby grant you an additional
  permission to link the program and your derivative works with the
  separately licensed software that they have included with MySQL.

  This program is distributed in the hope that it will be useful,
  but WITHOUT ANY WARRANTY; without even the implied warranty of
  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  GNU General Public License, version 2.0, for more details.

  You should have received a copy of the GNU General Public License
  along with this program; if not, write to the Free Software
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
#include "my_config.h"
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

#include <inttypes.h>

#include "config.h"
#include "ndb_configuration.h"
#include "Configuration.h"
#include "Scheduler.h"
#include "workitem.h"
#include "ndb_engine.h"
#include "debug.h"
#include "thread_identifier.h"
#include "ndb_worker.h"

#include "schedulers/Stockholm.h"
#include "schedulers/S_sched.h"
#include "schedulers/Scheduler73.h"
#include "schedulers/Trondheim.h"

#include "ndb_error_logger.h"

#define DEFAULT_SCHEDULER Scheduler73::Worker

/* globals (exported; also used by workitem.c) */
int workitem_class_id;
int workitem_actual_inline_buffer_size;

/* file-scope private variables */
static int pool_slab_class_id;

/* Handle to the memcache server API */
static SERVER_COOKIE_API * mc_server_handle;

/* The private internal structure of an allocation_reference */
struct allocation_reference {
  void * pointer;               /*! allocated region (or next array) */
  struct {
    unsigned  is_header   :  1;   /*! is this cell an array header? */
    unsigned  sys_malloc  :  1;   /*! set for malloc() allocations */
    unsigned  slab_class  :  6;   /*! slab class of the allocation */
    unsigned  cells_total : 10;   /*! total cells in this array */
    unsigned  cells_idx   : 10;   /*! index of next free cell */
    unsigned  _reserved   :  4;   /*! for future use */
  } d;
};
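/* Layout sketch (illustrative, not from the original source): on a typical
   LP64 platform an allocation_reference is 16 bytes (an 8-byte pointer plus
   a 32-bit bitfield, padded), so a 64-byte array from slab class 6 holds 4
   cells.  Cell 0 is always the header (is_header = 1, cells_idx starts at 1),
   and each remaining cell records one allocation.  When an array fills up,
   a larger array is chained to it through the header's pointer field. */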

/* declarations of private utility functions: */
Scheduler * get_scheduler_instance(ndb_engine *);
void init_allocator(ndb_pipeline *);
int init_slab_class(allocator_slab_class *c, int size);
int malloc_new_slab(allocator_slab_class *c);
void init_pool_header(allocation_reference *head, int slab_class);


/* The public API */

/* Attach a new pipeline to an NDB worker thread.
   Some initialization has already occurred when the main single-thread startup
   called get_request_pipeline().  But this is the first call into a pipeline
   from its worker thread.  It will initialize the thread's identifier, and
   attach the pipeline to its scheduler.
*/
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
  bool did_inc;
  unsigned int id;
  thread_identifier * tid;

  /* Get my pipeline id */
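  /* Descriptive note (added): the compare-and-swap loop below atomically
     claims the next slot.  If another worker thread bumps engine->npipelines
     between the read and the swap, the CAS fails and we retry against the
     new value, so every worker ends up with a unique id. */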
  do {
    id = engine->npipelines;
    did_inc = atomic_cmp_swap_int(& engine->npipelines, id, id + 1);
  } while(did_inc == false);

  /* Fetch the partially initialized pipeline */
  ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];

  /* Sanity checks */
  assert(self->id == id);
  assert(self->engine == engine);

  /* Set the pthread id */
  self->worker_thread_id = pthread_self();

  /* Create and set a thread identity */
  tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
  tid->pipeline = self;
  sprintf(tid->name, "worker.%d", self->id);
  set_thread_id(tid);

  /* Attach the scheduler */
  self->scheduler->attach_thread(tid);

  return self;
}


/* Allocate and initialize a generic request pipeline.
   In unit test code, this can be called with a NULL engine pointer --
   it will still initialize a usable slab allocator and memory pool
   which can be tested.
*/
ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) {
  /* Allocate the pipeline */
  ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline));

  /* Initialize */
  self->engine = engine;
  self->id = thd_id;
  self->nworkitems = 0;
  if(engine)   /* may be NULL in unit test code; see comment above */
    mc_server_handle = engine->server.cookie;

  /* Say hi to the alligator */
  init_allocator(self);

  /* Create a memory pool */
  self->pool = pipeline_create_memory_pool(self);

  return self;
}


/* Free all the internal resources of a pipeline.
*/
void ndb_pipeline_free(ndb_pipeline *self) {
  delete self->scheduler;
  memory_pool_free(self->pool);     // frees all items created from pool
  memory_pool_destroy(self->pool);  // frees the pool itself
  // TODO: free() all slabs
  free(self);
}


void pipeline_add_stats(ndb_pipeline *self,
                        const char *stat_key,
                        ADD_STAT add_stat,
                        const void *cookie) {
  char key[128];

  const Configuration & conf = get_Configuration();

  if(strncasecmp(stat_key,"ndb",3) == 0) {
    for(unsigned int i = 0 ; i < conf.nclusters ; i++) {
      sprintf(key, "cl%d", i);
      conf.getConnectionPoolById(i)->add_stats(key, add_stat, cookie);
    }
  }
  else if(strncasecmp(stat_key,"errors",6) == 0) {
    ndb_error_logger_stats(add_stat, cookie);
    ndbmc_debug_flush();
    add_stat("log", 3, "flushed", 7, cookie);
  }
  else if((strncasecmp(stat_key,"scheduler",9) == 0)
          || (strncasecmp(stat_key,"reconf",6) == 0)) {
    self->scheduler->add_stats(stat_key, add_stat, cookie);
  }
}


ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
  return ndb_flush_all(self);
}


/* The scheduler API */

bool scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
  Scheduler *s = 0;
  const char *cf = self->engine->startup_options.scheduler;
  options->config_string = 0;

  if(cf == 0 || *cf == 0) {
    s = new DEFAULT_SCHEDULER;
  }
  else if(!strncasecmp(cf,"stockholm", 9)) {
    s = new Scheduler_stockholm;
    options->config_string = & cf[9];
  }
  else if(!strncasecmp(cf,"S", 1)) {
    s = new S::SchedulerWorker;
    options->config_string = & cf[1];
  }
  else if(!strncasecmp(cf,"73", 2)) {
    s = new Scheduler73::Worker;
    options->config_string = & cf[2];
  }
  else if(!strncasecmp(cf,"trondheim", 9)) {
    s = new Trondheim::Worker;
    options->config_string = & cf[9];
  }
  else {
    return false;
  }

  s->init(self->id, options);
  self->scheduler = s;

  return true;
}
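/* Selection example (illustrative): the scheduler startup option is matched
   by prefix, and the remainder of the string is handed to the scheduler's
   init() as options->config_string.  So "73" selects Scheduler73::Worker
   with an empty config string, while a hypothetical "stockholm:a,b" would
   select Scheduler_stockholm with ":a,b" as its (scheduler-specific) config.
   Note the ordering: "stockholm" must be tested before the one-letter "S"
   prefix, because the case-insensitive match on "S" would also accept it. */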


void scheduler_shutdown(ndb_pipeline *self) {
  self->scheduler->shutdown();
}


ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *self, struct workitem *item) {
  mc_server_handle->store_engine_specific(item->cookie, item);
  ENGINE_ERROR_CODE status = self->scheduler->schedule(item);
  DEBUG_PRINT_DETAIL(" returning %d for workitem %d.%d", (int) status, self->id, item->id);
  return status;
}


void scheduler_release(ndb_pipeline *self, struct workitem *item) {
  self->scheduler->release(item);
}


void item_io_complete(struct workitem *item) {
  mc_server_handle->notify_io_complete(item->cookie, ENGINE_SUCCESS);
}


/* The slab allocator API */

int pipeline_get_size_class_id(size_t object_size) {
  int cls = 1;

  if(object_size) {
    object_size--;
    while(object_size >>= 1)    /* keep shifting */
      cls++;

    if (cls < ALLIGATOR_POWER_SMALLEST)  cls = ALLIGATOR_POWER_SMALLEST;
    if (cls > ALLIGATOR_POWER_LARGEST)   cls = -1;
  }
  else
    cls = 0;

  return cls;
}
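/* Worked examples (illustrative; these assume ALLIGATOR_POWER_SMALLEST == 5
   and ALLIGATOR_POWER_LARGEST == 14, consistent with the 32-byte and 16KB
   classes used in init_allocator() below -- check ndb_pipeline.h for the
   real values):
     pipeline_get_size_class_id(0)     ==  0   empty allocation
     pipeline_get_size_class_id(10)    ==  5   clamped up to the 32-byte class
     pipeline_get_size_class_id(100)   ==  7   rounded up to 128 bytes
     pipeline_get_size_class_id(8192)  == 13   exact power of two: the 8KB class
     pipeline_get_size_class_id(20000) == -1   too big; callers fall back to malloc()
*/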


void * pipeline_alloc(ndb_pipeline *self, int class_id) {
  allocator_slab_class *c;
  void * ptr = 0;

  if(class_id < ALLIGATOR_POWER_SMALLEST) return 0;
  if(class_id > ALLIGATOR_POWER_LARGEST)  return 0;

  c = & self->alligator[class_id];

  // common case alloc() is to pop a pointer from the list
  if(! pthread_mutex_lock(& c->lock)) {
    if(c->free_idx || malloc_new_slab(c))
      ptr = c->list[-- c->free_idx];   // pop
    pthread_mutex_unlock(& c->lock);
  }

  return ptr;
}


void pipeline_free(ndb_pipeline *self, void * ptr, int class_id) {
  if(class_id < ALLIGATOR_POWER_SMALLEST) return;
  if(class_id > ALLIGATOR_POWER_LARGEST)  return;

  allocator_slab_class *c = & self->alligator[class_id];

  /* begin critical section */
  if(! pthread_mutex_lock(& c->lock)) {
    if(c->free_idx == c->list_size) {   /* list is full; must grow */
      void **new_list;
      new_list = (void **) realloc(c->list, c->list_size * 2 * sizeof(void *));
      if(new_list) {
        c->list = new_list;
        c->list_size *= 2;
        c->list[c->free_idx++] = ptr;  // push
      }
    }
    else {
      // common case free() is simply to push the freed pointer onto the list
      c->list[c->free_idx++] = ptr;  // push
    }
    pthread_mutex_unlock(& c->lock);
  }
  /* end critical section */
}
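/* Usage sketch (illustrative, not from the original source): a caller that
   needs a 500-byte scratch buffer outside of any memory pool pairs the two
   calls, remembering the class id for the matching free:

     int cls = pipeline_get_size_class_id(500);          // the 512-byte class
     char *buf = (char *) pipeline_alloc(pipeline, cls);
     if(buf != NULL) {
       ...                                               // use the buffer
       pipeline_free(pipeline, buf, cls);
     }
*/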


/*** The high-level (pool) API */

memory_pool * pipeline_create_memory_pool(ndb_pipeline *self) {
  memory_pool *p;

  /* Use slab class 6 (64 bytes) for the first array in a new pool. */
  const int initial_slab_class = 6;

  /* Initialize the global static class id */
  if(pool_slab_class_id == 0) {
    pool_slab_class_id = pipeline_get_size_class_id(sizeof(memory_pool));
  }

  /* Get a pool header */
  p = (memory_pool *) pipeline_alloc(self, pool_slab_class_id);
  p->pipeline = self;

  /* Get an array. */
  p->head = (allocation_reference *) pipeline_alloc(self, initial_slab_class);

  /* Count it in the stats */
  p->total = (1 << initial_slab_class);  /* just the root array */
  p->size = 0;

  /* Initialize the array header */
  init_pool_header(& p->head[0], initial_slab_class);

  return p;
}


void * memory_pool_alloc(memory_pool *p, size_t sz) {
  if(p->head[0].d.cells_idx == p->head[0].d.cells_total) {
    /* We must add a new list.  Make it twice as big as the previous one. */
    allocation_reference *old_head = p->head;
    int slab_class = old_head->d.slab_class;
    if(slab_class < ALLIGATOR_POWER_LARGEST) slab_class++;

    p->head = (allocation_reference *) pipeline_alloc(p->pipeline, slab_class);
    init_pool_header(p->head, slab_class);
    p->head->pointer = old_head;
    p->size += (1 << slab_class);
  }

  allocation_reference &r = p->head[p->head->d.cells_idx++];

  int slab_class = pipeline_get_size_class_id(sz);
  if(slab_class == -1) {  // large areas use system malloc
    r.d.sys_malloc = 1;
    r.pointer = malloc(sz);
    p->size += sz;
  }
  else {  // small areas use slab allocator
    r.d.sys_malloc = 0;
    r.d.slab_class = slab_class;
    r.pointer = pipeline_alloc(p->pipeline, r.d.slab_class);
    p->size += (1 << r.d.slab_class);
  }

  return r.pointer;
}


void memory_pool_free(memory_pool *pool) {
  allocation_reference *next = pool->head;
  allocation_reference *array;

  pool->total += pool->size; pool->size = 0;  // reset the size counter
  do {
    array = next;
    next = (allocation_reference *) array->pointer;
    for(unsigned int i = 1; i < array->d.cells_idx ; i++) {  // free each block
      allocation_reference &r = array[i];
      if(r.d.sys_malloc)
        free(r.pointer);
      else
        pipeline_free(pool->pipeline, r.pointer, r.d.slab_class);
    }
    if(next) {  // if this isn't the last array, free it
      pipeline_free(pool->pipeline, array, array->d.slab_class);
    }
    else {  // reset the slot counter
      array->d.cells_idx = 1;
    }
  } while(next);

  /* Reset the head */
  pool->head = array;
}


void memory_pool_destroy(memory_pool *pool) {
  assert(pool_slab_class_id > 0);
  pipeline_free(pool->pipeline, pool->head, pool->head[0].d.slab_class);
  pipeline_free(pool->pipeline, pool, pool_slab_class_id);
}
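/* Lifecycle sketch (illustrative, not from the original source): a pool
   amortizes many small allocations and releases them all in one call, so
   callers never free individual blocks:

     memory_pool *p = pipeline_create_memory_pool(pipeline);
     thread_identifier *tid =
       (thread_identifier *) memory_pool_alloc(p, sizeof(thread_identifier));
     char *buf = (char *) memory_pool_alloc(p, 300);
     ...
     memory_pool_free(p);      // releases tid, buf, and every chained array
     memory_pool_destroy(p);   // then frees the root array and the pool itself
*/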


/* private utility functions follow */

void init_allocator(ndb_pipeline *self) {
  for(int i = 0, size = 1 ; i <= ALLIGATOR_POWER_LARGEST ; i++) {
    init_slab_class(& self->alligator[i], size);
    size *= 2;
  }

  /* Set the static global workitem information, but only once */
  if(self->id == 0) {
    workitem_class_id = pipeline_get_size_class_id(sizeof(struct workitem));
    size_t sz = self->alligator[workitem_class_id].size;
    workitem_actual_inline_buffer_size =
      WORKITEM_MIN_INLINE_BUF + (sz - sizeof(struct workitem));
    DEBUG_PRINT_DETAIL("workitem slab class: %d, inline buffer: %d",
                workitem_class_id, workitem_actual_inline_buffer_size);
  }

  /* Pre-allocate a new slab for certain special classes. */
  malloc_new_slab(& self->alligator[5]);   /* for key buffers */
  malloc_new_slab(& self->alligator[6]);   /* for key buffers and memory pools */
  malloc_new_slab(& self->alligator[7]);   /* for key buffers */
  malloc_new_slab(& self->alligator[8]);   /* for key buffers */
  if(workitem_class_id > 8)
    malloc_new_slab(& self->alligator[workitem_class_id]);   /* for workitems */
  malloc_new_slab(& self->alligator[13]);  /* The 8KB class, for row buffers */
  malloc_new_slab(& self->alligator[14]);  /* The 16KB class, for 13K rows */
}


int init_slab_class(allocator_slab_class *c, int size) {
  c->size = size;
  c->perslab = ALLIGATOR_SLAB_SIZE / size;
  c->list = 0;
  c->list_size = 0;
  c->free_idx = 0;
  c->total = 0;
  return pthread_mutex_init(& c->lock, NULL);
}


/* malloc_new_slab:
   get a slab from malloc() and add it to a class.
   Once the scheduler has been started, you must hold c->lock to call this.
*/
int malloc_new_slab(allocator_slab_class *c) {
  unsigned int num = c->perslab;
  void **new_list;
  char *ptr;

  if (c->list_size < num) {
    new_list = (void **) realloc(c->list, num * sizeof(void *));
    if (new_list == 0)
      return 0;
    c->list = new_list;
    c->list_size = num;
  }

  void **cur = c->list;
  ptr = (char *) malloc(ALLIGATOR_SLAB_SIZE);
  if (ptr == 0) return 0;
  for (unsigned int i = 0; i < num; i++) {
    *cur = ptr;       /* push the pointer onto the list */
    cur++;            /* bump the list forward one position */
    ptr += c->size;   /* bump the pointer to the next block */
  }
  c->free_idx += num;
  c->total += ALLIGATOR_SLAB_SIZE;

  return 1;
}
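/* Arithmetic sketch (illustrative; the real ALLIGATOR_SLAB_SIZE lives in
   ndb_pipeline.h): if ALLIGATOR_SLAB_SIZE were 1MB, one call for the 64-byte
   class would carve the slab into 16384 blocks, push each block's address
   onto c->list, and advance free_idx by c->perslab -- after which
   pipeline_alloc() can pop blocks without calling malloc() again until the
   free list runs dry. */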

/* init_pool_header()
*/
void init_pool_header(allocation_reference *head, int slab_class) {
  head->pointer = 0;
  head->d.is_header = 1;
  head->d.slab_class = slab_class;
  head->d.cells_total = (1 << slab_class) / sizeof(allocation_reference);
  head->d.cells_idx = 1;
}