1 /*
2 Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24 #include "my_config.h"
25 #include <stdlib.h>
26 #include <pthread.h>
27 #include <stdio.h>
28
29 #include <inttypes.h>
30
31 #include "config.h"
32 #include "ndb_configuration.h"
33 #include "Configuration.h"
34 #include "Scheduler.h"
35 #include "workitem.h"
36 #include "ndb_engine.h"
37 #include "debug.h"
38 #include "thread_identifier.h"
39 #include "ndb_worker.h"
40
41 #include "schedulers/Stockholm.h"
42 #include "schedulers/S_sched.h"
43 #include "schedulers/Scheduler73.h"
44 #include "schedulers/Trondheim.h"
45
46 #include "ndb_error_logger.h"
47
48 #define DEFAULT_SCHEDULER Scheduler73::Worker
49
/* globals (exported; also used by workitem.c) */
int workitem_class_id;                   /* slab class that holds a struct workitem */
int workitem_actual_inline_buffer_size;  /* inline buffer bytes actually available in that class */

/* file-scope private variables */
static int pool_slab_class_id;   /* slab class for memory_pool headers; 0 until the first pool is created */

/* Handle to the memcache server API */
static SERVER_COOKIE_API * mc_server_handle;

/* The private internal structure of an allocation_reference.
   A memory pool tracks its allocations in a chain of arrays of these cells.
   Cell 0 of each array is a header (is_header == 1) whose `pointer` links to
   the previous (older) array in the chain; cells 1 .. cells_idx-1 each record
   one allocation so that memory_pool_free() can release them all. */
struct allocation_reference {
  void * pointer;              /*! allocated region (or, in a header cell, the next array) */
  struct {
    unsigned is_header : 1;    /*! is this cell an array header? */
    unsigned sys_malloc : 1;   /*! set for malloc() allocations (too big for any slab class) */
    unsigned slab_class : 6;   /*! slab class of the allocation (or of the array, in a header) */
    unsigned cells_total : 10; /*! total cells in this array */
    unsigned cells_idx : 10;   /*! index of next free cell; starts at 1 (cell 0 is the header) */
    unsigned _reserved : 4;    /*! for future use */
  } d;
};

/* declarations of private utility functions: */
Scheduler * get_scheduler_instance(ndb_engine *);
void init_allocator(ndb_pipeline *);
int init_slab_class(allocator_slab_class *c, int size);
int malloc_new_slab(allocator_slab_class *c);
void init_pool_header(allocation_reference *head, int slab_class);
79
80
81 /* The public API */
82
/* Attach a new pipeline to an NDB worker thread.
   Some initialization has already occurred when the main single-thread startup
   called get_request_pipeline().  But this is the first call into a pipeline
   from its worker thread.  It will initialize the thread's identifier, and
   attach the pipeline to its scheduler.
*/
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
  bool did_inc;
  unsigned int id;
  thread_identifier * tid;

  /* Get my pipeline id: atomically claim the next slot in engine->npipelines,
     retrying if another worker thread raced us on the compare-and-swap. */
  do {
    id = engine->npipelines;
    did_inc = atomic_cmp_swap_int(& engine->npipelines, id, id + 1);
  } while(did_inc == false);

  /* Fetch the partially initialized pipeline (built by get_request_pipeline) */
  ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];

  /* Sanity checks */
  assert(self->id == id);
  assert(self->engine == engine);

  /* Set the pthread id */
  self->worker_thread_id = pthread_self();

  /* Create and set a thread identity.
     NOTE(review): the memory_pool_alloc() result is not checked, and sprintf
     assumes tid->name can hold "worker." plus the decimal id — confirm
     against the thread_identifier declaration. */
  tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
  tid->pipeline = self;
  sprintf(tid->name, "worker.%d", self->id);
  set_thread_id(tid);

  /* Attach the scheduler */
  self->scheduler->attach_thread(tid);

  return self;
}
121
122
123 /* Allocate and initialize a generic request pipeline.
124 In unit test code, this can be called with a NULL engine pointer --
125 it will still initialize a usable slab allocator and memory pool
126 which can be tested.
127 */
get_request_pipeline(int thd_id,struct ndb_engine * engine)128 ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) {
129 /* Allocate the pipeline */
130 ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline));
131
132 /* Initialize */
133 self->engine = engine;
134 self->id = thd_id;
135 self->nworkitems = 0;
136 mc_server_handle = engine->server.cookie;
137
138 /* Say hi to the alligator */
139 init_allocator(self);
140
141 /* Create a memory pool */
142 self->pool = pipeline_create_memory_pool(self);
143
144 return self;
145 }
146
147
148 /* Free all the internal resources of a pipeline.
149 */
ndb_pipeline_free(ndb_pipeline * self)150 void ndb_pipeline_free(ndb_pipeline *self) {
151 delete self->scheduler;
152 memory_pool_free(self->pool); // frees all items created from pool
153 memory_pool_destroy(self->pool); // frees the pool itself
154 // TODO: free() all slabs
155 free(self);
156 }
157
158
pipeline_add_stats(ndb_pipeline * self,const char * stat_key,ADD_STAT add_stat,const void * cookie)159 void pipeline_add_stats(ndb_pipeline *self,
160 const char *stat_key,
161 ADD_STAT add_stat,
162 const void *cookie) {
163 char key[128];
164
165 const Configuration & conf = get_Configuration();
166
167 if(strncasecmp(stat_key,"ndb",3) == 0) {
168 for(unsigned int i = 0 ; i < conf.nclusters ; i ++) {
169 sprintf(key, "cl%d", i);
170 conf.getConnectionPoolById(i)->add_stats(key, add_stat, cookie);
171 }
172 }
173 else if(strncasecmp(stat_key,"errors",6) == 0) {
174 ndb_error_logger_stats(add_stat, cookie);
175 ndbmc_debug_flush();
176 add_stat("log", 3, "flushed", 7, cookie);
177 }
178 else if((strncasecmp(stat_key,"scheduler",9) == 0)
179 || (strncasecmp(stat_key,"reconf",6) == 0)) {
180 self->scheduler->add_stats(stat_key, add_stat, cookie);
181 }
182 }
183
184
pipeline_flush_all(ndb_pipeline * self)185 ENGINE_ERROR_CODE pipeline_flush_all(ndb_pipeline *self) {
186 return ndb_flush_all(self);
187 }
188
189
190 /* The scheduler API */
191
scheduler_initialize(ndb_pipeline * self,scheduler_options * options)192 bool scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
193 Scheduler *s = 0;
194 const char *cf = self->engine->startup_options.scheduler;
195 options->config_string = 0;
196
197 if(cf == 0 || *cf == 0) {
198 s = new DEFAULT_SCHEDULER;
199 }
200 else if(!strncasecmp(cf,"stockholm", 9)) {
201 s = new Scheduler_stockholm;
202 options->config_string = & cf[9];
203 }
204 else if(!strncasecmp(cf,"S", 1)) {
205 s = new S::SchedulerWorker;
206 options->config_string = & cf[1];
207 }
208 else if(!strncasecmp(cf,"73", 2)) {
209 s = new Scheduler73::Worker;
210 options->config_string = & cf[2];
211 }
212 else if(!strncasecmp(cf,"trondheim", 9)) {
213 s = new Trondheim::Worker;
214 options->config_string = & cf[9];
215 }
216 else {
217 return false;
218 }
219
220 s->init(self->id, options);
221 self->scheduler = s;
222
223 return true;
224 }
225
226
scheduler_shutdown(ndb_pipeline * self)227 void scheduler_shutdown(ndb_pipeline *self) {
228 self->scheduler->shutdown();
229 }
230
231
scheduler_schedule(ndb_pipeline * self,struct workitem * item)232 ENGINE_ERROR_CODE scheduler_schedule(ndb_pipeline *self, struct workitem *item) {
233 mc_server_handle->store_engine_specific(item->cookie, item);
234 ENGINE_ERROR_CODE status = self->scheduler->schedule(item);
235 DEBUG_PRINT_DETAIL(" returning %d for workitem %d.%d", (int) status, self->id, item->id);
236 return status;
237 }
238
239
scheduler_release(ndb_pipeline * self,struct workitem * item)240 void scheduler_release(ndb_pipeline *self, struct workitem *item) {
241 self->scheduler->release(item);
242 }
243
244
item_io_complete(struct workitem * item)245 void item_io_complete(struct workitem *item) {
246 mc_server_handle->notify_io_complete(item->cookie, ENGINE_SUCCESS);
247 }
248
249
250 /* The slab allocator API */
251
pipeline_get_size_class_id(size_t object_size)252 int pipeline_get_size_class_id(size_t object_size) {
253 int cls = 1;
254
255 if(object_size) {
256 object_size--;
257 while(object_size >>= 1) /* keep shifting */
258 cls++;
259
260 if (cls < ALLIGATOR_POWER_SMALLEST) cls = ALLIGATOR_POWER_SMALLEST;
261 if (cls > ALLIGATOR_POWER_LARGEST) cls = -1;
262 }
263 else
264 cls = 0;
265
266 return cls;
267 }
268
269
pipeline_alloc(ndb_pipeline * self,int class_id)270 void * pipeline_alloc(ndb_pipeline *self, int class_id) {
271 allocator_slab_class *c;
272 void * ptr = 0;
273
274 if(class_id < ALLIGATOR_POWER_SMALLEST) return 0;
275 if(class_id > ALLIGATOR_POWER_LARGEST) return 0;
276
277 c = & self->alligator[class_id];
278
279 // common case alloc() is to pop a pointer from the list
280 if(! pthread_mutex_lock(& c->lock)) {
281 if(c->free_idx || malloc_new_slab(c))
282 ptr = c->list[-- c->free_idx]; // pop
283 pthread_mutex_unlock(& c->lock);
284 }
285
286 return ptr;
287 }
288
289
pipeline_free(ndb_pipeline * self,void * ptr,int class_id)290 void pipeline_free(ndb_pipeline *self, void * ptr, int class_id ) {
291 if(class_id < ALLIGATOR_POWER_SMALLEST) return;
292 if(class_id > ALLIGATOR_POWER_LARGEST) return;
293
294 allocator_slab_class *c = & self->alligator[class_id];
295
296 /* begin critical section */
297 if(! pthread_mutex_lock(& c->lock)) {
298 if(c->free_idx == c->list_size) { /* list is full; must grow */
299 void **new_list;
300 new_list = (void **) realloc(c->list, c->list_size * 2 * sizeof(void *));
301 if(new_list) {
302 c->list = new_list;
303 c->list_size *= 2;
304 c->list[c->free_idx++] = ptr; // push
305 }
306 }
307 else {
308 // common case free() is simply to push the freed pointer onto the list
309 c->list[c->free_idx++] = ptr; // push
310 }
311 pthread_mutex_unlock(& c->lock);
312 }
313 /* end critical section */
314 }
315
316
317 /*** The high-level (pool) API */
318
pipeline_create_memory_pool(ndb_pipeline * self)319 memory_pool * pipeline_create_memory_pool(ndb_pipeline *self) {
320 memory_pool *p;
321
322 /* Use slab class 6 (64 bytes) for the first array in a new pool. */
323 const int initial_slab_class = 6;
324
325 /* Initialize the global static class id */
326 if(pool_slab_class_id == 0) {
327 pool_slab_class_id = pipeline_get_size_class_id(sizeof(memory_pool));
328 }
329
330 /* Get a pool header */
331 p = (memory_pool *) pipeline_alloc(self, pool_slab_class_id);
332 p->pipeline = self;
333
334 /* Get an array. */
335 p->head = (allocation_reference *) pipeline_alloc(self, initial_slab_class);
336
337 /* Count it in the stats */
338 p->total = (1 << initial_slab_class); /* just the root array */
339 p->size = 0;
340
341 /* Initialize the array header */
342 init_pool_header(& p->head[0], initial_slab_class);
343
344 return p;
345 }
346
347
memory_pool_alloc(memory_pool * p,size_t sz)348 void * memory_pool_alloc(memory_pool *p, size_t sz) {
349 if(p->head[0].d.cells_idx == p->head[0].d.cells_total) {
350 /* We must add a new list. Make it twice as big as the previous one. */
351 allocation_reference *old_head = p->head;
352 int slab_class = old_head->d.slab_class;
353 if(slab_class < ALLIGATOR_POWER_LARGEST) slab_class++;
354
355 p->head = (allocation_reference *) pipeline_alloc(p->pipeline, slab_class);
356 init_pool_header(p->head, slab_class);
357 p->head->pointer = old_head;
358 p->size += (1 << slab_class);
359 }
360
361 allocation_reference &r = p->head[p->head->d.cells_idx++];
362
363 int slab_class = pipeline_get_size_class_id(sz);
364 if(slab_class == -1) { // large areas use system malloc
365 r.d.sys_malloc = 1;
366 r.pointer = malloc(sz);
367 p->size += sz;
368 }
369 else { // small areas use slab allocator
370 r.d.sys_malloc = 0;
371 r.d.slab_class = slab_class;
372 r.pointer = pipeline_alloc(p->pipeline, r.d.slab_class);
373 p->size += (1 << r.d.slab_class);
374 }
375
376 return r.pointer;
377 }
378
379
memory_pool_free(memory_pool * pool)380 void memory_pool_free(memory_pool *pool) {
381 allocation_reference *next = pool->head;;
382 allocation_reference *array;
383
384 pool->total += pool->size; pool->size = 0; // reset the size counter
385 do {
386 array = next;
387 next = (allocation_reference *) array->pointer;
388 for(unsigned int i = 1; i < array->d.cells_idx ; i++) { // free each block
389 allocation_reference &r = array[i];
390 if(r.d.sys_malloc)
391 free(r.pointer);
392 else
393 pipeline_free(pool->pipeline, r.pointer, r.d.slab_class);
394 }
395 if(next) { // if this isn't the last array, free it
396 pipeline_free(pool->pipeline, array, array->d.slab_class);
397 }
398 else { // reset the slot counter
399 array->d.cells_idx = 1;
400 }
401 } while(next);
402
403 /* Reset the head */
404 pool->head = array;
405 }
406
407
memory_pool_destroy(memory_pool * pool)408 void memory_pool_destroy(memory_pool *pool) {
409 assert(pool_slab_class_id > 0);
410 pipeline_free(pool->pipeline, pool->head, pool->head[0].d.slab_class);
411 pipeline_free(pool->pipeline, pool, pool_slab_class_id);
412 }
413
414
415 /* private utility functions follow */
416
init_allocator(ndb_pipeline * self)417 void init_allocator(ndb_pipeline *self) {
418 for(int i = 0, size = 1 ; i <= ALLIGATOR_POWER_LARGEST ; i++) {
419 init_slab_class(& self->alligator[i], size);
420 size *= 2;
421 }
422
423 /* Set the static global workitem information, but only once */
424 if(self->id == 0) {
425 workitem_class_id = pipeline_get_size_class_id(sizeof(struct workitem));
426 size_t sz = self->alligator[workitem_class_id].size;
427 workitem_actual_inline_buffer_size =
428 WORKITEM_MIN_INLINE_BUF + (sz - sizeof(struct workitem));
429 DEBUG_PRINT_DETAIL("workitem slab class: %d, inline buffer: %d",
430 workitem_class_id, workitem_actual_inline_buffer_size);
431 }
432
433 /* Pre-allocate a new slab for certain special classes. */
434 malloc_new_slab(& self->alligator[5]); /* for key buffers */
435 malloc_new_slab(& self->alligator[6]); /* for key buffers and memory pools*/
436 malloc_new_slab(& self->alligator[7]); /* for key buffers */
437 malloc_new_slab(& self->alligator[8]); /* for key buffers */
438 if(workitem_class_id > 8)
439 malloc_new_slab(& self->alligator[workitem_class_id]); /* for workitems */
440 malloc_new_slab(& self->alligator[13]); /* The 8KB class, for row buffers */
441 malloc_new_slab(& self->alligator[14]); /* The 16KB class for 13K rows */
442 }
443
444
init_slab_class(allocator_slab_class * c,int size)445 int init_slab_class(allocator_slab_class *c, int size) {
446 c->size = size;
447 c->perslab = ALLIGATOR_SLAB_SIZE / size;
448 c->list = 0;
449 c->list_size = 0;
450 c->free_idx = 0;
451 c->total = 0;
452 return pthread_mutex_init(& c->lock, NULL);
453 }
454
455
456 /* malloc_new_slab:
457 get a slab from malloc() and add it to a class.
458 once the scheduler has been started, you must hold p->lock to call this.
459 */
malloc_new_slab(allocator_slab_class * c)460 int malloc_new_slab(allocator_slab_class *c) {
461 unsigned int num = c->perslab;
462 void **new_list;
463 char *ptr;
464
465 if (c->list_size < num) {
466 new_list = (void **) realloc(c->list, num * sizeof(void *));
467 if (new_list == 0)
468 return 0;
469 c->list = new_list;
470 c->list_size = num;
471 }
472
473 void **cur = c->list;
474 ptr = (char *) malloc(ALLIGATOR_SLAB_SIZE);
475 if (ptr == 0) return 0;
476 for (unsigned int i = 0; i < num; i++) {
477 *cur = ptr; /* push the pointer onto the list */
478 cur++; /* bump the list forward one position */
479 ptr += c->size; /* bump the pointer to the next block */
480 }
481 c->free_idx += num;
482 c->total += ALLIGATOR_SLAB_SIZE;
483
484 return 1;
485 }
486
487 /* init_pool_header()
488 */
init_pool_header(allocation_reference * head,int slab_class)489 void init_pool_header(allocation_reference *head, int slab_class) {
490 head->pointer = 0;
491 head->d.is_header = 1;
492 head->d.slab_class = slab_class;
493 head->d.cells_total = (1 << slab_class) / sizeof(allocation_reference);
494 head->d.cells_idx = 1;
495 }
496