1 /*
2 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "my_config.h"
26 #include <stdio.h>
27 #include <ctype.h>
28 #include <stdio.h>
29 #include <sys/errno.h>
30 #include <inttypes.h>
31
32 /* Memcache headers */
33 #include "memcached/types.h"
34 #include <memcached/extension_loggers.h>
35
36 #include "timing.h"
37 #include "debug.h"
38 #include "Configuration.h"
39 #include "thread_identifier.h"
40 #include "workitem.h"
41 #include "ndb_worker.h"
42 #include "ndb_engine_errors.h"
43 #include "ndb_error_logger.h"
44
45 #include "S_sched.h"
46
47 extern "C" {
48 void * run_send_thread(void *v);
49 void * run_poll_thread(void *v);
50 }
51
52 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
53
54 /* Scheduler Global singleton */
55 static S::SchedulerGlobal * s_global;
56
57 /* SchedulerGlobal methods */
SchedulerGlobal(int _nthreads)58 S::SchedulerGlobal::SchedulerGlobal(int _nthreads) :
59 GlobalConfigManager(_nthreads)
60 {
61 }
62
63
init(const scheduler_options * sched_opts)64 void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
65 DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");
66
67 /* Set member variables */
68 config_string = sched_opts->config_string;
69 parse_config_string(nthreads, config_string);
70 options.max_clients = sched_opts->max_clients;
71
72 /* Fetch or initialize clusters */
73 nclusters = conf->nclusters;
74 clusters = new Cluster * [nclusters];
75 for(int i = 0 ; i < nclusters ; i++) {
76 ClusterConnectionPool *pool = conf->getConnectionPoolById(i);
77 Cluster *c = (Cluster *) pool->getCustomData();
78 if(c == 0) {
79 c = new Cluster(this, i);
80 pool->setCustomData(c);
81 }
82 clusters[i] = c;
83 c->nreferences += 1;
84 }
85
86 /* Initialize the WorkerConnections */
87 for(int t = 0 ; t < nthreads ; t++) {
88 for(int c = 0 ; c < nclusters ; c++) {
89 WorkerConnection **wc_cell = getWorkerConnectionPtr(t, c);
90 * wc_cell = new WorkerConnection(this, t, c);
91 }
92 }
93
94 /* Build Configurations for WorkerConnections */
95 configureSchedulers();
96
97 /* Start the send & poll threads for each connection */
98 for(int i = 0 ; i < nclusters ; i++)
99 clusters[i]->startThreads();
100
101 /* Log message for startup */
102 logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
103 "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
104 options.n_connections, options.force_send,
105 options.auto_grow, options.send_timer);
106
107 /* Now Running */
108 running = true;
109 }
110
111
shutdown()112 void S::SchedulerGlobal::shutdown() {
113 if(running) {
114 logger->log(LOG_INFO, 0, "Shutting down scheduler.");
115
116 /* First shut down each WorkerConnection */
117 for(int i = 0; i < nclusters ; i++) {
118 for(int j = 0; j < options.n_worker_threads; j++) {
119 S::WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
120 wc->sendqueue->abort();
121 }
122 }
123
124 /* Release each Cluster (and its Connections) */
125 for(int i = 0; i < nclusters ; i++) {
126 Cluster *c = clusters[i];
127 if ( --(c->nreferences) == 0) {
128 delete c;
129 conf->getConnectionPoolById(i)->setCustomData(0);
130 }
131 }
132
133 /* Then actually delete each WorkerConnection */
134 for(int i = 0; i < nclusters ; i++) {
135 for(int j = 0; j < options.n_worker_threads; j++) {
136 delete * (getWorkerConnectionPtr(j, i));
137 * (getWorkerConnectionPtr(j, i)) = 0;
138 }
139 }
140
141 /* Shutdown now */
142 logger->log(LOG_WARNING, 0, "Shutdown completed.");
143 running = false;
144 }
145 }
146
147
parse_config_string(int nthreads,const char * str)148 void S::SchedulerGlobal::parse_config_string(int nthreads, const char *str) {
149
150 /* Initialize the configuration default values */
151 options.n_worker_threads = nthreads;
152 options.n_connections = 0; // 0 = n_connections based on db-stored config
153 options.force_send = 0; // 0 = force send always off
154 options.send_timer = 1; // 1 = 1 ms. timer in send thread
155 options.auto_grow = 1; // 1 = allow NDB instance pool to grow on demand
156
157 if(str) {
158 const char *s = str;
159 char letter;
160 int value;
161
162 /* tolerate a ':' at the start of the string */
163 if( *s == ':') s++;
164
165 while(*s != '\0' && sscanf(s, "%c%d", &letter, &value) == 2) {
166 switch(letter) {
167 case 'c':
168 options.n_connections = value;
169 break;
170 case 'f':
171 options.force_send = value;
172 break;
173 case 'g':
174 options.auto_grow = value;
175 break;
176 case 't':
177 options.send_timer = value;
178 break;
179 }
180 /* Skip over the part just read */
181 s += 1; // the letter
182 while(isdigit(*s)) s++; // the value
183
184 /* Now tolerate a comma */
185 if(*s == ',') s++;
186 }
187 }
188
189 /* Test validity of configuration */
190 if(options.force_send < 0 || options.force_send > 2) {
191 logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
192 assert(options.force_send >= 0 && options.force_send <= 2);
193 }
194 if(options.n_connections < 0 || options.n_connections > 4) {
195 logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
196 assert(options.n_connections >= 0 && options.n_connections <= 4);
197 }
198 if(options.send_timer < 1 || options.send_timer > 10) {
199 logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
200 assert(options.send_timer >= 1 && options.send_timer <= 10);
201 }
202 if(options.auto_grow < 0 || options.auto_grow > 1) {
203 logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
204 assert(options.auto_grow == 0 || options.auto_grow == 1);
205 }
206 }
207
208
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)209 void S::SchedulerGlobal::add_stats(const char *stat_key,
210 ADD_STAT add_stat,
211 const void *cookie) {
212 if(strncasecmp(stat_key, "reconf", 6) == 0) {
213 WorkerConnection ** wc = getWorkerConnectionPtr(0,0);
214 (* wc)->add_stats(stat_key, add_stat, cookie);
215 }
216 else {
217 DEBUG_PRINT(" scheduler");
218 for(int c = 0 ; c < nclusters ; c++) {
219 clusters[c]->add_stats(stat_key, add_stat, cookie);
220 }
221 }
222 }
223
224
225 /* SchedulerWorker methods */
226
init(int my_thread,const scheduler_options * options)227 void S::SchedulerWorker::init(int my_thread,
228 const scheduler_options * options) {
229 /* On the first call in, initialize the SchedulerGlobal.
230 * This will start the send & poll threads for each connection.
231 */
232 if(my_thread == 0) {
233 s_global = new SchedulerGlobal(options->nthreads);
234 s_global->init(options);
235 }
236
237 /* Initialize member variables */
238 id = my_thread;
239 }
240
241
shutdown()242 void S::SchedulerWorker::shutdown() {
243 if(id == 0)
244 s_global->shutdown();
245 }
246
247
~SchedulerWorker()248 S::SchedulerWorker::~SchedulerWorker() {
249 if(id == 0)
250 delete s_global;
251 }
252
253
schedule(workitem * item)254 ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
255 int c = item->prefix_info.cluster_id;
256 ENGINE_ERROR_CODE response_code;
257 NdbInstance *inst = 0;
258 const KeyPrefix *pfx;
259 S::WorkerConnection *wc;
260
261 wc = * (s_global->getWorkerConnectionPtr(id, c));
262 if(wc == 0) return ENGINE_FAILED;
263
264 if(wc->freelist) { /* Get the next NDB from the freelist. */
265 inst = wc->freelist;
266 wc->freelist = inst->next;
267 }
268 else { /* No free NDBs. */
269 if(wc->sendqueue->is_aborted()) {
270 return ENGINE_TMPFAIL;
271 }
272 else { /* Try to make an NdbInstance on the fly */
273 inst = wc->newNdbInstance();
274 if(inst) {
275 log_app_error(& AppError29024_autogrow);
276 }
277 else {
278 /* We have hit a hard maximum. Eventually Scheduler::io_completed()
279 will run _in this thread_ and return an NDB to the freelist.
280 But no other thread can free one, so here we return an error.
281 */
282 log_app_error(& AppError29002_NoNDBs);
283 return ENGINE_TMPFAIL;
284 }
285 }
286 }
287
288 assert(inst);
289 inst->link_workitem(item);
290
291 // Fetch the query plan for this prefix.
292 pfx = wc->setQueryPlanInWorkitem(item);
293 if(! item->plan) {
294 DEBUG_PRINT("getPlanForPrefix() failure");
295 return ENGINE_FAILED;
296 }
297
298 // Build the NDB transaction
299 op_status_t op_status = worker_prepare_operation(item);
300
301 // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
302 if(op_status == op_prepared) {
303 /* Put the prepared item onto a send queue */
304 wc->sendqueue->produce(inst);
305 DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);
306
307 /* This locking is explained in run_ndb_send_thread() */
308 if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) { // try the lock
309 wc->conn->sem.counter++; // increment
310 pthread_cond_signal( & wc->conn->sem.not_zero); // signal
311 pthread_mutex_unlock( & wc->conn->sem.lock); // release
312 }
313
314 response_code = ENGINE_EWOULDBLOCK;
315 }
316 else {
317 /* Status is not op_prepared, but rather some error status */
318 response_code = item->status->status;
319 }
320
321 return response_code;
322 }
323
324
prepare(NdbTransaction * tx,NdbTransaction::ExecType execType,NdbAsynchCallback callback,workitem * item,prepare_flags flags)325 void S::SchedulerWorker::prepare(NdbTransaction * tx,
326 NdbTransaction::ExecType execType,
327 NdbAsynchCallback callback,
328 workitem * item, prepare_flags flags) {
329 tx->executeAsynchPrepare(execType, callback, (void *) item);
330 if(flags == RESCHEDULE) item->base.reschedule = 1;
331 }
332
333
close(NdbTransaction * tx,workitem * item)334 void S::SchedulerWorker::close(NdbTransaction *tx, workitem *item) {
335 Uint64 nwaits_pre, nwaits_post;
336 Ndb * & ndb = item->ndb_instance->db;
337
338 nwaits_pre = ndb->getClientStat(Ndb::WaitExecCompleteCount);
339 tx->close();
340 nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);
341
342 if(nwaits_post > nwaits_pre)
343 log_app_error(& AppError29023_SyncClose);
344 }
345
346
347 /* Release the resources used by an operation.
348 Unlink the NdbInstance from the workitem, and return it to the free list
349 (or free it, if the scheduler is shutting down).
350 */
release(workitem * item)351 void S::SchedulerWorker::release(workitem *item) {
352 DEBUG_ENTER();
353 NdbInstance *inst = item->ndb_instance;
354
355 if(inst) {
356 inst->unlink_workitem(item);
357 int c = item->prefix_info.cluster_id;
358 S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
359 if(wc && ! wc->sendqueue->is_aborted()) {
360 inst->next = wc->freelist;
361 wc->freelist = inst;
362 // DEBUG_PRINT("Returned NdbInstance to freelist.");
363 }
364 else {
365 /* We are in the midst of shutting down (and possibly reconfiguring) */
366 delete inst;
367 }
368 }
369 }
370
371
global_reconfigure(Configuration * new_cf)372 bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
373 return s_global->reconfigure(new_cf);
374 }
375
376
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)377 void S::SchedulerWorker::add_stats(const char *stat_key,
378 ADD_STAT add_stat,
379 const void *cookie) {
380 s_global->add_stats(stat_key, add_stat, cookie);
381 }
382
383
384 /* Cluster methods */
Cluster(SchedulerGlobal * global,int _id)385 S::Cluster::Cluster(SchedulerGlobal *global, int _id) :
386 threads_started(false),
387 cluster_id(_id),
388 nreferences(0)
389 {
390 DEBUG_PRINT("%d", cluster_id);
391
392 /* How many cluster connections are wanted?
393 If options.n_connections is zero (the default) we want one connection
394 per 50,000 desired TPS. (The default for TPS is 100,000 -- so, two
395 connections). But if a number is specified in the config, use that
396 instead.
397 */
398 if(global->options.n_connections) {
399 nconnections = global->options.n_connections;
400 }
401 else {
402 const int connection_tps = 50000;
403 nconnections = global->conf->max_tps / connection_tps;
404 if(global->conf->max_tps % connection_tps) nconnections += 1;
405 }
406 assert(nconnections > 0);
407
408 /* Get our connection pool */
409 ClusterConnectionPool *pool = global->conf->getConnectionPoolById(cluster_id);
410
411 /* Some NDB Cluster Connections are already open;
412 if we want more, try to add them now. */
413 // TODO: If you reconfigure too many times you run out of connections ...
414 DEBUG_PRINT("Cluster %d, have %d connection(s), want %d",
415 cluster_id, pool->getPoolSize(), nconnections);
416 for(int i = pool->getPoolSize(); i < nconnections ; i ++) {
417 Ndb_cluster_connection * c = pool->addPooledConnection();
418 if(c == 0) {
419 /* unable to create any more connections */
420 nconnections = i;
421 break;
422 }
423 }
424
425 logger->log(LOG_WARNING, 0, "Scheduler: using %d connection%s to cluster %d\n",
426 nconnections, nconnections == 1 ? "" : "s", cluster_id);
427
428 /* Instantiate the Connection objects */
429 connections = new S::Connection * [nconnections];
430 for(int i = 0; i < nconnections ; i++) {
431 connections[i] = new S::Connection(*this, i);
432 }
433 }
434
435
startThreads()436 void S::Cluster::startThreads() {
437 /* Threads are started only once and persist across reconfiguration.
438 But, this method will be called again for each reconf. */
439 if(threads_started == false) {
440 for(int i = 0 ; i < nconnections; i++) {
441 connections[i]->startThreads();
442 }
443 threads_started = true;
444 }
445 }
446
447
~Cluster()448 S::Cluster::~Cluster() {
449 DEBUG_PRINT("Shutting down cluster %d", cluster_id);
450 for(int i = 0; i < nconnections ; i++) {
451 delete connections[i];
452 }
453 }
454
455
getWorkerConnectionPtr(int thd) const456 S::WorkerConnection ** S::Cluster::getWorkerConnectionPtr(int thd) const {
457 return s_global->getWorkerConnectionPtr(thd, cluster_id);
458 }
459
460
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)461 void S::Cluster::add_stats(const char *stat_key,
462 ADD_STAT add_stat,
463 const void *cookie) {
464 for(int c = 0 ; c < nconnections ; c++) {
465 connections[c]->add_stats(stat_key, add_stat, cookie);
466 }
467 }
468
469
470 /* WorkerConnection methods */
471
472
newNdbInstance()473 NdbInstance * S::WorkerConnection::newNdbInstance() {
474 NdbInstance *inst = 0;
475 if(instances.current < instances.max) {
476 inst = new NdbInstance(conn->conn, 2);
477 instances.current++;
478 inst->id = ((id.thd + 1) * 10000) + instances.current;
479 }
480 return inst;
481 }
482
483
WorkerConnection(SchedulerGlobal * global,int thd_id,int cluster_id)484 S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
485 int thd_id, int cluster_id) :
486 SchedulerConfigManager(thd_id, cluster_id)
487 {
488 S::Cluster *cl = global->clusters[cluster_id];
489
490 id.thd = thd_id;
491 id.cluster = cluster_id;
492 id.conn = thd_id % cl->nconnections; // round-robin assignment
493 conn = cl->connections[id.conn];
494 id.node = conn->node_id;
495
496 /* How many NDB instances to start initially */
497 instances.initial = conn->instances.initial / conn->n_workers;
498
499 /* Maximum size of send queue, and upper bound on NDB instances */
500 instances.max = conn->instances.max / conn->n_workers;
501
502 /* Build the freelist */
503 freelist = 0;
504 for(instances.current = 0; instances.current < instances.initial; ) {
505 NdbInstance *inst = newNdbInstance();
506 inst->next = freelist;
507 freelist = inst;
508 }
509
510 DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
511 id.cluster, id.conn, id.node, id.thd, instances.current);
512
513 /* Initialize the sendqueue */
514 sendqueue = new Queue<NdbInstance>(instances.max);
515
516 /* Hoard a transaction (an API connect record) for each Ndb object. This
517 * first call to startTransaction() will send TC_SEIZEREQ and wait for a
518 * reply, but later at runtime startTransaction() should return immediately.
519 */
520 NdbTransaction ** txlist = new NdbTransaction * [instances.current];
521 int i = 0;
522
523 // Open them all.
524 for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) {
525 NdbTransaction *tx;
526 tx = inst->db->startTransaction();
527 if(! tx) log_ndb_error(inst->db->getNdbError());
528 txlist[i] = tx;
529 }
530
531 // Close them all.
532 for(i = 0 ; i < instances.current ; i++) {
533 if(txlist[i])
534 txlist[i]->close();
535 }
536
537 // Free the list.
538 delete[] txlist;
539 }
540
541
~WorkerConnection()542 S::WorkerConnection::~WorkerConnection() {
543 DEBUG_ENTER_METHOD("S::WorkerConnection::~WorkerConnection");
544
545 /* Delete all of the Ndbs that are not currently in use */
546 NdbInstance *inst = freelist;
547 while(inst != 0) {
548 NdbInstance *next = inst->next;
549 delete inst;
550 inst = next;
551 }
552
553 /* Delete the sendqueue */
554 delete sendqueue;
555 }
556
557
558 /* Connection methods */
559
Connection(S::Cluster & _cl,int _id)560 S::Connection::Connection(S::Cluster & _cl, int _id) :
561 cluster(_cl), id(_id)
562 {
563 S::SchedulerGlobal *global = s_global;
564 Configuration *conf = global->conf;
565 n_total_workers = global->options.n_worker_threads;
566
567 /* Get the connection pool for my cluster */
568 ClusterConnectionPool *pool = conf->getConnectionPoolById(cluster.cluster_id);
569
570 /* Get my connection from the pool */
571 conn = pool->getPooledConnection(id);
572 node_id = conn->node_id();
573
574 /* Set the timer on the adaptive send thread */
575 conn->set_max_adaptive_send_time(global->options.send_timer);
576
577 /* How many worker threads will use this connection? */
578 n_workers = global->options.n_worker_threads / cluster.nconnections;
579 if(n_total_workers % cluster.nconnections > id) n_workers += 1;
580
581 /* How many NDB objects are needed for the desired performance? */
582 double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
583 instances.initial = (int) (total_ndb_objects / cluster.nconnections);
584 while(instances.initial % n_workers) instances.initial++; // round up
585
586 /* The maximum number of NDB objects.
587 * This is used to configure hard limits on the size of the waitgroup,
588 * the sentqueue, and the reschedulequeue -- and it will not be
589 * possible to increase those limits during online reconfig.
590 */
591 instances.max = instances.initial;
592 // allow the pool to grow on demand?
593 if(global->options.auto_grow)
594 instances.max = (int) (instances.max * 1.6);
595 // max_clients imposes a hard upper limit
596 if(instances.max > (global->options.max_clients / cluster.nconnections))
597 instances.max = global->options.max_clients / cluster.nconnections;
598 // instances.initial might also be subject to the max_clients limit
599 if(instances.initial > instances.max)
600 instances.initial = instances.max;
601
602 /* Get a multi-wait Poll Group */
603 pollgroup = conn->create_ndb_wait_group(instances.max);
604
605 /* Initialize the statistics */
606 stats.sent_operations = 0;
607 stats.batches = 0;
608 stats.timeout_races = 0;
609
610 /* Initialize the semaphore */
611 pthread_mutex_init(& sem.lock, NULL);
612 init_condition_var(& sem.not_zero);
613 sem.counter = 0;
614
615 /* Initialize the queues for sent and resceduled items */
616 sentqueue = new Queue<NdbInstance>(instances.max);
617 reschedulequeue = new Queue<NdbInstance>(instances.max);
618 }
619
620
startThreads()621 void S::Connection::startThreads() {
622 /* Start the poll thread */
623 pthread_create( & poll_thread_id, NULL, run_poll_thread, (void *) this);
624
625 /* Start the send thread */
626 pthread_create( & send_thread_id, NULL, run_send_thread, (void *) this);
627 }
628
629
~Connection()630 S::Connection::~Connection() {
631 /* Shut down a connection.
632 The send thread should send everything in its queue.
633 The poll thread should wait for everything in its waitgroup.
634 Then they should both shut down.
635 */
636 DEBUG_ENTER_METHOD("S::Connection::~Connection");
637 pthread_join(send_thread_id, NULL);
638 DEBUG_PRINT("Cluster %d connection %d send thread has quit.",
639 cluster.cluster_id, id);
640
641 pthread_join(poll_thread_id, NULL);
642 DEBUG_PRINT("Cluster %d connection %d poll thread has quit.",
643 cluster.cluster_id, id);
644
645 /* Delete the queues */
646 assert(sentqueue->is_aborted());
647 delete sentqueue;
648 delete reschedulequeue;
649
650 /* Delete the semaphore */
651 pthread_cond_destroy(& sem.not_zero);
652 pthread_mutex_destroy(& sem.lock);
653
654 /* Release the multiwait group */
655 conn->release_ndb_wait_group(pollgroup);
656 }
657
658
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)659 void S::Connection::add_stats(const char *stat_key,
660 ADD_STAT add_stat,
661 const void *cookie) {
662 char key[128];
663 char val[128];
664 int klen, vlen;
665
666 klen = sprintf(key, "cl%d.conn%d.sent_operations", cluster.cluster_id, id);
667 vlen = sprintf(val, "%llu", stats.sent_operations);
668 add_stat(key, klen, val, vlen, cookie);
669
670 klen = sprintf(key, "cl%d.conn%d.batches", cluster.cluster_id, id);
671 vlen = sprintf(val, "%llu", stats.batches);
672 add_stat(key, klen, val, vlen, cookie);
673
674 klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
675 vlen = sprintf(val, "%llu", stats.timeout_races);
676 add_stat(key, klen, val, vlen, cookie);
677
678 klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
679 vlen = sprintf(val, "%d", instances.initial);
680 add_stat(key, klen, val, vlen, cookie);
681
682 klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
683 vlen = sprintf(val, "%d", instances.max);
684 add_stat(key, klen, val, vlen, cookie);
685 }
686
687
688 /*
689 Some design features of the send thread
690
691 1: When a worker thread has an item ready to send, it tries to acquire
692 the mutex and post to the semaphore. The send thread sleeps on the
693 semaphore's condition variable waiting for a worker to post to it.
694 But if a worker thread finds the mutex already locked, it simply
695 skips posting the semaphore; some other thread must be posting anyway.
696 This sets up a possible race where a worker may queue an item but the
697 send thread misses it. Therefore the send thread always sets a timeout
698 when waiting, and always examines the queues after the timer expires.
699
700 2: The occurrence of the race described above is recorded in the
701 stats.timeout_races counter.
702
703 3: How long is the timeout? It varies from a low value when the server is
704 busy to a high one when idle. Also, when busy, we try to reduce the number
705 of calls to gettimeofday() or clock_gettime() to one per timeout_msec ms
706 rather than one per iteration.
707 */
run_ndb_send_thread()708 void * S::Connection::run_ndb_send_thread() {
709 /* Set thread identity */
710 thread_identifier tid;
711 tid.pipeline = 0;
712 snprintf(tid.name, THD_ID_NAME_LEN,
713 "cl%d.conn%d.send", cluster.cluster_id, id);
714 set_thread_id(&tid);
715
716 DEBUG_ENTER();
717
718 NdbInstance *readylist; /* list of items fetched from queues */
719 int nready = 0; /* number of items on the readylist */
720 int nsent = 0; /* number sent in this iteration */
721 int c_wait = 0; /* return value from pthread_cond_timedwait() */
722 struct timespec timer;
723 const int timeout_min = 200; /* "busy" server timeout */
724 const int timeout_max = 3200; /* "idle" server timeout */
725 int timeout_msec = timeout_min;
726 int shutting_down = 0;
727
728 while(1) {
729 if(nsent == 0) { /* nothing sent last time through the loop */
730 if(shutting_down) {
731 sentqueue->abort();
732 pollgroup->wakeup();
733 return 0;
734 }
735
736 if(timeout_msec < timeout_max) {
737 timeout_msec *= 2; /* progress from "busy" towards "idle" */
738 }
739 timespec_get_time(& timer);
740 timespec_add_msec(& timer, timeout_msec);
741 }
742
743 /* Acquire the semaphore */
744 pthread_mutex_lock(& sem.lock);
745 if(sem.counter == 0) {
746 c_wait = pthread_cond_timedwait(& sem.not_zero, & sem.lock, & timer);
747 }
748 sem.counter = 0;
749 pthread_mutex_unlock(& sem.lock);
750
751 /* There are several queues that may have NDBs ready for sending.
752 Examine all of them, and consolidate all of the ready NDBs into a
753 single list. */
754 nready = 0;
755 readylist = 0;
756
757 /* First check the reschedule queue */
758 nready += get_operations_from_queue(& readylist, reschedulequeue);
759
760 /* Then the worker thread queues */
761 for(int w = id; w < n_total_workers; w += cluster.nconnections) {
762 S::WorkerConnection *wc = * (cluster.getWorkerConnectionPtr(w));
763 DEBUG_ASSERT(wc->id.conn == id);
764 nready += get_operations_from_queue(& readylist, wc->sendqueue);
765 if(wc->sendqueue->is_aborted()) {
766 shutting_down = 1;
767 }
768 }
769
770 /* Now walk the readylist. Send pending operations from the NDBs there,
771 then place them on the sent-items queue for the poll thread. */
772 nsent = 0;
773 if(nready) {
774 for(NdbInstance *inst = readylist; inst != NULL ; inst = inst->next) {
775 int force = 0;
776 if(nready == 1 && s_global->options.force_send == 1) {
777 force = 1; // force-send the last item in the list
778 }
779
780 /* Send the operations */
781 inst->db->sendPreparedTransactions(force);
782 DEBUG_PRINT("Sent %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
783
784 /* Give the instance to the poll thread */
785 sentqueue->produce(inst);
786
787 nsent += 1;
788 nready -= 1;
789 }
790
791 stats.batches += 1;
792 stats.sent_operations += nsent;
793 if(c_wait == ETIMEDOUT) {
794 stats.timeout_races += 1;
795 }
796
797 pollgroup->wakeup();
798
799 timeout_msec = timeout_min; /* we are now "busy" */
800 }
801 }
802 }
803
804
get_operations_from_queue(NdbInstance ** readylist,Queue<NdbInstance> * q)805 int S::Connection::get_operations_from_queue(NdbInstance **readylist,
806 Queue<NdbInstance> *q) {
807 int n = 0;
808 NdbInstance *inst;
809 while((inst = q->consume()) != NULL) {
810 assert(inst->db);
811 inst->next = *readylist;
812 *readylist = inst;
813 n++;
814 }
815 return n;
816 }
817
818
run_ndb_poll_thread()819 void * S::Connection::run_ndb_poll_thread() {
820 /* Set thread identity */
821 thread_identifier tid;
822 tid.pipeline = 0;
823 snprintf(tid.name, THD_ID_NAME_LEN,
824 "cl%d.conn%d.poll", cluster.cluster_id, id);
825 set_thread_id(&tid);
826
827 DEBUG_ENTER();
828
829 NdbInstance *inst;
830 int wait_timeout_millisec = 5000;
831 int pct_ready;
832 int in_flight = 0;
833
834 while(1) {
835 if(in_flight == 0 && sentqueue->is_aborted()) {
836 return 0;
837 }
838
839 int n_added = 0;
840 /* Add new NDBs to the poll group */
841 while((inst = sentqueue->consume()) != 0) {
842 assert(inst->db);
843 inst->next = 0;
844 DEBUG_PRINT(" ** adding %d.%d to wait group ** ",
845 inst->wqitem->pipeline->id, inst->wqitem->id);
846 if(! pollgroup->push(inst->db)) {
847 n_added++;
848 in_flight++;
849 }
850 }
851
852 /* What's the minimum number of ready Ndb's to wake up for? */
853 pct_ready = (n_added > 4) ? 25 : 1;
854
855 /* Wait until something is ready to poll */
856 int nwaiting = pollgroup->wait(wait_timeout_millisec, pct_ready);
857
858 /* Poll the ones that are ready */
859 if(nwaiting > 0) {
860 for(int i = 0; i < nwaiting ; i++) {
861 in_flight--;
862 assert(in_flight >= 0);
863 Ndb *db = pollgroup->pop();
864 inst = (NdbInstance *) db->getCustomData();
865 DEBUG_PRINT("Polling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
866 db->pollNdb(0, 1);
867
868 if(inst->wqitem->base.reschedule) {
869 DEBUG_PRINT("Rescheduling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
870 inst->wqitem->base.reschedule = 0;
871 reschedulequeue->produce(inst); // Put it on the reschedule queue
872 if(pthread_mutex_trylock( & sem.lock) == 0) {
873 sem.counter++;
874 pthread_cond_signal(& sem.not_zero); // Ping the send thread
875 pthread_mutex_unlock(& sem.lock);
876 }
877 }
878 else {
879 // Scheduler yielded. Notify memcached that the operation is complete.
880 DEBUG_PRINT("item_io_complete for %d.%d",
881 inst->wqitem->pipeline->id, inst->wqitem->id);
882 item_io_complete(inst->wqitem);
883 }
884 }
885 }
886 }
887 return 0; /* not reached */
888 }
889
890
run_send_thread(void * v)891 void * run_send_thread(void *v) {
892 S::Connection *c = (S::Connection *) v;
893 return c->run_ndb_send_thread();
894 }
895
run_poll_thread(void * v)896 void * run_poll_thread(void *v) {
897 S::Connection *c = (S::Connection *) v;
898 return c->run_ndb_poll_thread();
899 }
900
901
902