/*
 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
 reserved.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation. The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 02110-1301 USA
*/

#include <my_config.h>
#include <stdio.h>
#include <ctype.h>
#include <sys/errno.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

/* Memcache headers */
#include "memcached/types.h"
#include <memcached/extension_loggers.h>

#include "timing.h"
#include "debug.h"
#include "Configuration.h"
#include "thread_identifier.h"
#include "workitem.h"
#include "ndb_worker.h"
#include "ndb_engine_errors.h"
#include "ndb_error_logger.h"

#include "S_sched.h"

extern "C" {
  void * run_send_thread(void *v);
  void * run_poll_thread(void *v);
}

extern EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Scheduler Global singleton */
static S::SchedulerGlobal * s_global;

/* SchedulerGlobal methods */
S::SchedulerGlobal::SchedulerGlobal(int _nthreads) :
  GlobalConfigManager(_nthreads)
{
}


void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
  DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");

  /* Set member variables */
  config_string = sched_opts->config_string;
  parse_config_string(nthreads, config_string);
  options.max_clients = sched_opts->max_clients;

  /* Fetch or initialize clusters */
  nclusters = conf->nclusters;
  clusters = new Cluster * [nclusters];
  for(int i = 0 ; i < nclusters ; i++) {
    ClusterConnectionPool *pool = conf->getConnectionPoolById(i);
    Cluster *c = (Cluster *) pool->getCustomData();
    if(c == 0) {
      c = new Cluster(this, i);
      pool->setCustomData(c);
    }
    clusters[i] = c;
    c->nreferences += 1;
  }

  /* Initialize the WorkerConnections */
  for(int t = 0 ; t < nthreads ; t++) {
    for(int c = 0 ; c < nclusters ; c++) {
      WorkerConnection **wc_cell = getWorkerConnectionPtr(t, c);
      * wc_cell = new WorkerConnection(this, t, c);
    }
  }

  /* Build Configurations for WorkerConnections */
  configureSchedulers();

  /* Start the send & poll threads for each connection */
  for(int i = 0 ; i < nclusters ; i++)
    clusters[i]->startThreads();

  /* Log message for startup */
  logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
              "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
              options.n_connections, options.force_send,
              options.auto_grow, options.send_timer);

  /* Now Running */
  running = true;
}


void S::SchedulerGlobal::shutdown() {
  if(running) {
    logger->log(LOG_INFO, 0, "Shutting down scheduler.");

    /* First shut down each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        S::WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
        wc->sendqueue->abort();
      }
    }

    /* Release each Cluster (and its Connections) */
    for(int i = 0; i < nclusters ; i++) {
      Cluster *c = clusters[i];
      if ( --(c->nreferences) == 0) {
        delete c;
        conf->getConnectionPoolById(i)->setCustomData(0);
      }
    }

    /* Then actually delete each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        delete * (getWorkerConnectionPtr(j, i));
        * (getWorkerConnectionPtr(j, i)) = 0;
      }
    }

    /* Shutdown now */
    logger->log(LOG_WARNING, 0, "Shutdown completed.");
    running = false;
  }
}

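/* Parse the scheduler configuration string.
   A sketch of the accepted format, as inferred from the parser below:
   an optional leading ':', then comma-separated letter/value pairs, e.g.
   "c2,f0,g1,t1" -- c = cluster connections, f = force-send policy,
   g = auto-grow, t = send-thread timer in msec.
*/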
void S::SchedulerGlobal::parse_config_string(int nthreads, const char *str) {

  /* Initialize the configuration default values */
  options.n_worker_threads = nthreads;
  options.n_connections = 0;   // 0 = n_connections based on db-stored config
  options.force_send = 0;      // 0 = force send always off
  options.send_timer = 1;      // 1 = 1 ms. timer in send thread
  options.auto_grow = 1;       // 1 = allow NDB instance pool to grow on demand

  if(str) {
    const char *s = str;
    char letter;
    int value;

    /* tolerate a ':' at the start of the string */
    if( *s == ':') s++;

    while(*s != '\0' && sscanf(s, "%c%d", &letter, &value) == 2) {
      switch(letter) {
        case 'c':
          options.n_connections = value;
          break;
        case 'f':
          options.force_send = value;
          break;
        case 'g':
          options.auto_grow = value;
          break;
        case 't':
          options.send_timer = value;
          break;
      }
      /* Skip over the part just read */
      s += 1;                  // the letter
      while(isdigit(*s)) s++;  // the value

      /* Now tolerate a comma */
      if(*s == ',') s++;
    }
  }

  /* Test validity of configuration */
  if(options.force_send < 0 || options.force_send > 2) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.force_send >= 0 && options.force_send <= 2);
  }
  if(options.n_connections < 0 || options.n_connections > 4) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.n_connections >= 0 && options.n_connections <= 4);
  }
  if(options.send_timer < 1 || options.send_timer > 10) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.send_timer >= 1 && options.send_timer <= 10);
  }
  if(options.auto_grow < 0 || options.auto_grow > 1) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.auto_grow == 0 || options.auto_grow == 1);
  }
}


void S::SchedulerGlobal::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  if(strncasecmp(stat_key, "reconf", 6) == 0) {
    WorkerConnection ** wc = getWorkerConnectionPtr(0,0);
    (* wc)->add_stats(stat_key, add_stat, cookie);
  }
  else {
    DEBUG_PRINT(" scheduler");
    for(int c = 0 ; c < nclusters ; c++) {
      clusters[c]->add_stats(stat_key, add_stat, cookie);
    }
  }
}


/* SchedulerWorker methods */

void S::SchedulerWorker::init(int my_thread,
                              const scheduler_options * options) {
  /* On the first call in, initialize the SchedulerGlobal.
   * This will start the send & poll threads for each connection.
   */
  if(my_thread == 0) {
    s_global = new SchedulerGlobal(options->nthreads);
    s_global->init(options);
  }

  /* Initialize member variables */
  id = my_thread;
}


void S::SchedulerWorker::shutdown() {
  if(id == 0)
    s_global->shutdown();
}


S::SchedulerWorker::~SchedulerWorker() {
  if(id == 0)
    delete s_global;
}


ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
  int c = item->prefix_info.cluster_id;
  ENGINE_ERROR_CODE response_code;
  NdbInstance *inst = 0;
  const KeyPrefix *pfx;
  S::WorkerConnection *wc;

  wc = * (s_global->getWorkerConnectionPtr(id, c));
  if(wc == 0) return ENGINE_FAILED;

  if(wc->freelist) {            /* Get the next NDB from the freelist. */
    inst = wc->freelist;
    wc->freelist = inst->next;
  }
  else {                        /* No free NDBs. */
    if(wc->sendqueue->is_aborted()) {
      return ENGINE_TMPFAIL;
    }
    else {                      /* Try to make an NdbInstance on the fly */
      inst = wc->newNdbInstance();
      if(inst) {
        log_app_error(& AppError29024_autogrow);
      }
      else {
        /* We have hit a hard maximum. Eventually Scheduler::io_completed()
           will run _in this thread_ and return an NDB to the freelist.
           But no other thread can free one, so here we return an error.
        */
        log_app_error(& AppError29002_NoNDBs);
        return ENGINE_TMPFAIL;
      }
    }
  }

  assert(inst);
  inst->link_workitem(item);

  // Fetch the query plan for this prefix.
  pfx = wc->setQueryPlanInWorkitem(item);
  if(! item->plan) {
    DEBUG_PRINT("getPlanForPrefix() failure");
    return ENGINE_FAILED;
  }

  // Build the NDB transaction
  op_status_t op_status = worker_prepare_operation(item);

  // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
  if(op_status == op_prepared) {
    /* Put the prepared item onto a send queue */
    wc->sendqueue->produce(inst);
    DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);

    /* This locking is explained in run_ndb_send_thread() */
    if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) {  // try the lock
      wc->conn->sem.counter++;                               // increment
      pthread_cond_signal( & wc->conn->sem.not_zero);        // signal
      pthread_mutex_unlock( & wc->conn->sem.lock);           // release
    }

    response_code = ENGINE_EWOULDBLOCK;
  }
  else {
    /* Status is not op_prepared, but rather some error status */
    response_code = item->status->status;
  }

  return response_code;
}


void S::SchedulerWorker::prepare(NdbTransaction * tx,
                                 NdbTransaction::ExecType execType,
                                 NdbAsynchCallback callback,
                                 workitem * item, prepare_flags flags) {
  tx->executeAsynchPrepare(execType, callback, (void *) item);
  if(flags == RESCHEDULE) item->base.reschedule = 1;
}


void S::SchedulerWorker::close(NdbTransaction *tx, workitem *item) {
  Uint64 nwaits_pre, nwaits_post;
  Ndb * & ndb = item->ndb_instance->db;

  nwaits_pre  = ndb->getClientStat(Ndb::WaitExecCompleteCount);
  tx->close();
  nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);

  if(nwaits_post > nwaits_pre)
    log_app_error(& AppError29023_SyncClose);
}


/* Release the resources used by an operation.
   Unlink the NdbInstance from the workitem, and return it to the free list
   (or free it, if the scheduler is shutting down).
*/
void S::SchedulerWorker::release(workitem *item) {
  DEBUG_ENTER();
  NdbInstance *inst = item->ndb_instance;

  if(inst) {
    inst->unlink_workitem(item);
    int c = item->prefix_info.cluster_id;
    S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
    if(wc && ! wc->sendqueue->is_aborted()) {
      inst->next = wc->freelist;
      wc->freelist = inst;
      // DEBUG_PRINT("Returned NdbInstance to freelist.");
    }
    else {
      /* We are in the midst of shutting down (and possibly reconfiguring) */
      delete inst;
    }
  }
}


bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
  return s_global->reconfigure(new_cf);
}


void S::SchedulerWorker::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  s_global->add_stats(stat_key, add_stat, cookie);
}


/* Cluster methods */
S::Cluster::Cluster(SchedulerGlobal *global, int _id) :
  threads_started(false),
  cluster_id(_id),
  nreferences(0)
{
  DEBUG_PRINT("%d", cluster_id);

  /* How many cluster connections are wanted?
     If options.n_connections is zero (the default) we want one connection
     per 50,000 desired TPS. (The default for TPS is 100,000 -- so, two
     connections). But if a number is specified in the config, use that
     instead.
  */
  if(global->options.n_connections) {
    nconnections = global->options.n_connections;
  }
  else {
    const int connection_tps = 50000;
    nconnections = global->conf->max_tps / connection_tps;
    if(global->conf->max_tps % connection_tps) nconnections += 1;
  }
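  /* Worked example of the default sizing: max_tps = 100,000 gives
     100000 / 50000 = 2 connections; max_tps = 120,000 would round up
     to 3, since 120000 % 50000 != 0. */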
  assert(nconnections > 0);

  /* Get our connection pool */
  ClusterConnectionPool *pool = global->conf->getConnectionPoolById(cluster_id);

  /* Some NDB Cluster Connections are already open;
     if we want more, try to add them now. */
  // TODO: If you reconfigure too many times you run out of connections ...
  DEBUG_PRINT("Cluster %d, have %d connection(s), want %d",
              cluster_id, pool->getPoolSize(), nconnections);
  for(int i = pool->getPoolSize(); i < nconnections ; i++) {
    Ndb_cluster_connection * c = pool->addPooledConnection();
    if(c == 0) {
      /* unable to create any more connections */
      nconnections = i;
      break;
    }
  }

  logger->log(LOG_WARNING, 0, "Scheduler: using %d connection%s to cluster %d\n",
              nconnections, nconnections == 1 ? "" : "s", cluster_id);

  /* Instantiate the Connection objects */
  connections = new S::Connection * [nconnections];
  for(int i = 0; i < nconnections ; i++) {
    connections[i] = new S::Connection(*this, i);
  }
}


void S::Cluster::startThreads() {
  /* Threads are started only once and persist across reconfiguration.
     But, this method will be called again for each reconf. */
  if(threads_started == false) {
    for(int i = 0 ; i < nconnections; i++) {
      connections[i]->startThreads();
    }
    threads_started = true;
  }
}


S::Cluster::~Cluster() {
  DEBUG_PRINT("Shutting down cluster %d", cluster_id);
  for(int i = 0; i < nconnections ; i++) {
    delete connections[i];
  }
}


S::WorkerConnection ** S::Cluster::getWorkerConnectionPtr(int thd) const {
  return s_global->getWorkerConnectionPtr(thd, cluster_id);
}


void S::Cluster::add_stats(const char *stat_key,
                           ADD_STAT add_stat,
                           const void *cookie) {
  for(int c = 0 ; c < nconnections ; c++) {
    connections[c]->add_stats(stat_key, add_stat, cookie);
  }
}


/* WorkerConnection methods */


NdbInstance * S::WorkerConnection::newNdbInstance() {
  NdbInstance *inst = 0;
  if(instances.current < instances.max) {
    inst = new NdbInstance(conn->conn, 2);
    instances.current++;
    inst->id = ((id.thd + 1) * 10000) + instances.current;
  }
  return inst;
}


S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                      int thd_id, int cluster_id) :
  SchedulerConfigManager(thd_id, cluster_id)
{
  S::Cluster *cl = global->clusters[cluster_id];

  id.thd = thd_id;
  id.cluster = cluster_id;
  id.conn = thd_id % cl->nconnections;   // round-robin assignment
  conn = cl->connections[id.conn];
  id.node = conn->node_id;

  /* How many NDB instances to start initially */
  instances.initial = conn->instances.initial / conn->n_workers;

  /* Maximum size of send queue, and upper bound on NDB instances */
  instances.max = conn->instances.max / conn->n_workers;

  /* Build the freelist */
  freelist = 0;
  for(instances.current = 0; instances.current < instances.initial; ) {
    NdbInstance *inst = newNdbInstance();
    inst->next = freelist;
    freelist = inst;
  }

  DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
              id.cluster, id.conn, id.node, id.thd, instances.current);

  /* Initialize the sendqueue */
  sendqueue = new Queue<NdbInstance>(instances.max);

  /* Hoard a transaction (an API connect record) for each Ndb object. This
   * first call to startTransaction() will send TC_SEIZEREQ and wait for a
   * reply, but later at runtime startTransaction() should return immediately.
   */
  NdbTransaction ** txlist = new NdbTransaction * [instances.current];
  int i = 0;

  // Open them all.
  for(NdbInstance *inst = freelist; inst != 0 ; inst = inst->next, i++) {
    NdbTransaction *tx;
    tx = inst->db->startTransaction();
    if(! tx) log_ndb_error(inst->db->getNdbError());
    txlist[i] = tx;
  }

  // Close them all.
  for(i = 0 ; i < instances.current ; i++) {
    if(txlist[i])
      txlist[i]->close();
  }

  // Free the list.
  delete[] txlist;
}


S::WorkerConnection::~WorkerConnection() {
  DEBUG_ENTER_METHOD("S::WorkerConnection::~WorkerConnection");

  /* Delete all of the Ndbs that are not currently in use */
  NdbInstance *inst = freelist;
  while(inst != 0) {
    NdbInstance *next = inst->next;
    delete inst;
    inst = next;
  }

  /* Delete the sendqueue */
  delete sendqueue;
}


/* Connection methods */

S::Connection::Connection(S::Cluster & _cl, int _id) :
  cluster(_cl), id(_id)
{
  S::SchedulerGlobal *global = s_global;
  Configuration *conf = global->conf;
  n_total_workers = global->options.n_worker_threads;

  /* Get the connection pool for my cluster */
  ClusterConnectionPool *pool = conf->getConnectionPoolById(cluster.cluster_id);

  /* Get my connection from the pool */
  conn = pool->getPooledConnection(id);
  node_id = conn->node_id();

  /* Set the timer on the adaptive send thread */
  conn->set_max_adaptive_send_time(global->options.send_timer);

  /* How many worker threads will use this connection? */
  n_workers = global->options.n_worker_threads / cluster.nconnections;
  if(n_total_workers % cluster.nconnections > id) n_workers += 1;
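  /* For example: 4 workers over 3 connections gives each connection one
     worker, and the remainder (4 % 3 = 1) assigns one extra worker to
     connection 0. */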

  /* How many NDB objects are needed for the desired performance? */
  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
  instances.initial = (int) (total_ndb_objects / cluster.nconnections);
  while(instances.initial % n_workers) instances.initial++;  // round up

  /* The maximum number of NDB objects.
   * This is used to configure hard limits on the size of the waitgroup,
   * the sentqueue, and the reschedulequeue -- and it will not be
   * possible to increase those limits during online reconfig.
   */
  instances.max = instances.initial;
  // allow the pool to grow on demand?
  if(global->options.auto_grow)
    instances.max = (int) (instances.max * 1.6);
  // max_clients imposes a hard upper limit
  if(instances.max > (global->options.max_clients / cluster.nconnections))
    instances.max = global->options.max_clients / cluster.nconnections;
  // instances.initial might also be subject to the max_clients limit
  if(instances.initial > instances.max)
    instances.initial = instances.max;
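  /* Worked example: with instances.initial = 100 and auto_grow on,
     instances.max becomes 160; if max_clients / nconnections were only 80,
     instances.max would be capped at 80 and instances.initial reduced
     to match. */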

  /* Get a multi-wait Poll Group */
  pollgroup = conn->create_ndb_wait_group(instances.max);

  /* Initialize the statistics */
  stats.sent_operations = 0;
  stats.batches = 0;
  stats.timeout_races = 0;

  /* Initialize the semaphore */
  pthread_mutex_init(& sem.lock, NULL);
  init_condition_var(& sem.not_zero);
  sem.counter = 0;

  /* Initialize the queues for sent and rescheduled items */
  sentqueue = new Queue<NdbInstance>(instances.max);
  reschedulequeue = new Queue<NdbInstance>(instances.max);
}


void S::Connection::startThreads() {
  /* Start the poll thread */
  pthread_create( & poll_thread_id, NULL, run_poll_thread, (void *) this);

  /* Start the send thread */
  pthread_create( & send_thread_id, NULL, run_send_thread, (void *) this);
}


S::Connection::~Connection() {
  /* Shut down a connection.
     The send thread should send everything in its queue.
     The poll thread should wait for everything in its waitgroup.
     Then they should both shut down.
  */
  DEBUG_ENTER_METHOD("S::Connection::~Connection");
  pthread_join(send_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d send thread has quit.",
              cluster.cluster_id, id);

  pthread_join(poll_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d poll thread has quit.",
              cluster.cluster_id, id);

  /* Delete the queues */
  assert(sentqueue->is_aborted());
  delete sentqueue;
  delete reschedulequeue;

  /* Delete the semaphore */
  pthread_cond_destroy(& sem.not_zero);
  pthread_mutex_destroy(& sem.lock);

  /* Release the multiwait group */
  conn->release_ndb_wait_group(pollgroup);
}


void S::Connection::add_stats(const char *stat_key,
                              ADD_STAT add_stat,
                              const void *cookie) {
  char key[128];
  char val[128];
  int klen, vlen;

  klen = sprintf(key, "cl%d.conn%d.sent_operations", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.sent_operations);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.batches", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.batches);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.timeout_races);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.initial);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.max);
  add_stat(key, klen, val, vlen, cookie);
}


/*
  Some design features of the send thread

  1: When a worker thread has an item ready to send, it tries to acquire
  the mutex and post to the semaphore. The send thread sleeps on the
  semaphore's condition variable waiting for a worker to post to it.
  But if a worker thread finds the mutex already locked, it simply
  skips posting the semaphore; some other thread must be posting anyway.
  This sets up a possible race where a worker may queue an item but the
  send thread misses it. Therefore the send thread always sets a timeout
  when waiting, and always examines the queues after the timer expires.

  2: The occurrence of the race described above is recorded in the
  stats.timeout_races counter.

  3: How long is the timeout? It varies from a low value when the server is
  busy to a high one when idle. Also, when busy, we try to reduce the number
  of calls to gettimeofday() or clock_gettime() to one per timeout_msec ms
  rather than one per iteration.
*/
void * S::Connection::run_ndb_send_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.send", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *readylist;        /* list of items fetched from queues */
  int nready = 0;                /* number of items on the readylist */
  int nsent = 0;                 /* number sent in this iteration */
  int c_wait = 0;                /* return value from pthread_cond_timedwait() */
  struct timespec timer;
  const int timeout_min = 200;   /* "busy" server timeout */
  const int timeout_max = 3200;  /* "idle" server timeout */
  int timeout_msec = timeout_min;
  int shutting_down = 0;

  while(1) {
    if(nsent == 0) {  /* nothing sent last time through the loop */
      if(shutting_down) {
        sentqueue->abort();
        pollgroup->wakeup();
        return 0;
      }

      if(timeout_msec < timeout_max) {
        timeout_msec *= 2;  /* progress from "busy" towards "idle" */
      }
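      /* The idle back-off doubles through 200, 400, 800, 1600, 3200 ms. */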
      timespec_get_time(& timer);
      timespec_add_msec(& timer, timeout_msec);
    }

    /* Acquire the semaphore */
    pthread_mutex_lock(& sem.lock);
    if(sem.counter == 0) {
      c_wait = pthread_cond_timedwait(& sem.not_zero, & sem.lock, & timer);
    }
    sem.counter = 0;
    pthread_mutex_unlock(& sem.lock);

    /* There are several queues that may have NDBs ready for sending.
       Examine all of them, and consolidate all of the ready NDBs into a
       single list. */
    nready = 0;
    readylist = 0;

    /* First check the reschedule queue */
    nready += get_operations_from_queue(& readylist, reschedulequeue);

    /* Then the worker thread queues */
    for(int w = id; w < n_total_workers; w += cluster.nconnections) {
      S::WorkerConnection *wc = * (cluster.getWorkerConnectionPtr(w));
      DEBUG_ASSERT(wc->id.conn == id);
      nready += get_operations_from_queue(& readylist, wc->sendqueue);
      if(wc->sendqueue->is_aborted()) {
        shutting_down = 1;
      }
    }

    /* Now walk the readylist. Send pending operations from the NDBs there,
       then place them on the sent-items queue for the poll thread. */
    nsent = 0;
    if(nready) {
      for(NdbInstance *inst = readylist; inst != NULL ; inst = inst->next) {
        int force = 0;
        if(nready == 1 && s_global->options.force_send == 1) {
          force = 1;  // force-send the last item in the list
        }

        /* Send the operations */
        inst->db->sendPreparedTransactions(force);
        DEBUG_PRINT("Sent %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);

        /* Give the instance to the poll thread */
        sentqueue->produce(inst);

        nsent += 1;
        nready -= 1;
      }

      stats.batches += 1;
      stats.sent_operations += nsent;
      if(c_wait == ETIMEDOUT) {
        stats.timeout_races += 1;
      }

      pollgroup->wakeup();

      timeout_msec = timeout_min;  /* we are now "busy" */
    }
  }
}


int S::Connection::get_operations_from_queue(NdbInstance **readylist,
                                             Queue<NdbInstance> *q) {
  int n = 0;
  NdbInstance *inst;
  while((inst = q->consume()) != NULL) {
    assert(inst->db);
    inst->next = *readylist;
    *readylist = inst;
    n++;
  }
  return n;
}


void * S::Connection::run_ndb_poll_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.poll", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *inst;
  Ndb ** ready_list;
  int wait_timeout_millisec = 5000;
  int min_ready;
  int in_flight = 0;

  while(1) {
    if(in_flight == 0 && sentqueue->is_aborted()) {
      return 0;
    }

    int n_added = 0;
    /* Add new NDBs to the poll group */
    while((inst = sentqueue->consume()) != 0) {
      assert(inst->db);
      inst->next = 0;
      DEBUG_PRINT(" ** adding %d.%d to wait group ** ",
                  inst->wqitem->pipeline->id, inst->wqitem->id);
      pollgroup->addNdb(inst->db);
      n_added++;
      in_flight++;
    }

    /* What's the minimum number of ready Ndb's to wake up for? */
    int n = n_added / 4;
    min_ready = n > 0 ? n : 1;
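    /* e.g. 8 newly added NDBs -> wake when at least 2 are ready;
       fewer than 4 added -> wake as soon as any one is ready. */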

    /* Wait until something is ready to poll */
    int nwaiting = pollgroup->wait(ready_list, wait_timeout_millisec, min_ready);

    /* Poll the ones that are ready */
    if(nwaiting > 0) {
      for(int i = 0; i < nwaiting ; i++) {
        in_flight--;
        assert(in_flight >= 0);
        Ndb *db = ready_list[i];
        inst = (NdbInstance *) db->getCustomData();
        DEBUG_PRINT("Polling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
        db->pollNdb(0, 1);

        if(inst->wqitem->base.reschedule) {
          DEBUG_PRINT("Rescheduling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
          inst->wqitem->base.reschedule = 0;
          reschedulequeue->produce(inst);  // Put it on the reschedule queue
          if(pthread_mutex_trylock( & sem.lock) == 0) {
            sem.counter++;
            pthread_cond_signal(& sem.not_zero);  // Ping the send thread
            pthread_mutex_unlock(& sem.lock);
          }
        }
        else {
          // Scheduler yielded. Notify memcached that the operation is complete.
          DEBUG_PRINT("item_io_complete for %d.%d",
                      inst->wqitem->pipeline->id, inst->wqitem->id);
          item_io_complete(inst->wqitem);
        }
      }
    }
  }
  return 0;  /* not reached */
}


void * run_send_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_send_thread();
}

void * run_poll_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_poll_thread();
}
