/*
 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights reserved.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation.  The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 02110-1301  USA
 */

#include <my_config.h>
#include <stdio.h>
#include <ctype.h>
#include <sys/errno.h>
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

/* Memcache headers */
#include "memcached/types.h"
#include <memcached/extension_loggers.h>

#include "timing.h"
#include "debug.h"
#include "Configuration.h"
#include "thread_identifier.h"
#include "workitem.h"
#include "ndb_worker.h"
#include "ndb_engine_errors.h"
#include "ndb_error_logger.h"

#include "S_sched.h"

extern "C" {
  void * run_send_thread(void *v);
  void * run_poll_thread(void *v);
}
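
/* These C-linkage functions are the entry points passed to pthread_create();
   they are defined at the bottom of this file as thin wrappers that call
   S::Connection::run_ndb_send_thread() and run_ndb_poll_thread(). */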

extern EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Scheduler Global singleton */
static S::SchedulerGlobal * s_global;

/* SchedulerGlobal methods */
S::SchedulerGlobal::SchedulerGlobal(int _nthreads) :
  GlobalConfigManager(_nthreads)
{
}


void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
  DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");

  /* Set member variables */
  config_string = sched_opts->config_string;
  parse_config_string(nthreads, config_string);
  options.max_clients = sched_opts->max_clients;

  /* Fetch or initialize clusters */
  nclusters = conf->nclusters;
  clusters = new Cluster * [nclusters];
  for(int i = 0 ; i < nclusters ; i++) {
    ClusterConnectionPool *pool = conf->getConnectionPoolById(i);
    Cluster *c = (Cluster *) pool->getCustomData();
    if(c == 0) {
      c = new Cluster(this, i);
      pool->setCustomData(c);
    }
    clusters[i] = c;
    c->nreferences += 1;
  }

  /* Initialize the WorkerConnections */
  for(int t = 0 ; t < nthreads ; t++) {
    for(int c = 0 ; c < nclusters ; c++) {
      WorkerConnection **wc_cell = getWorkerConnectionPtr(t, c);
      * wc_cell = new WorkerConnection(this, t, c);
    }
  }

  /* Build Configurations for WorkerConnections */
  configureSchedulers();

  /* Start the send & poll threads for each connection */
  for(int i = 0 ; i < nclusters ; i++)
    clusters[i]->startThreads();

  /* Log message for startup */
  logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
              "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
              options.n_connections, options.force_send,
              options.auto_grow, options.send_timer);

  /* Now Running */
  running = true;
}


void S::SchedulerGlobal::shutdown() {
  if(running) {
    logger->log(LOG_INFO, 0, "Shutting down scheduler.");

    /* First shut down each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        S::WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
        wc->sendqueue->abort();
      }
    }

    /* Release each Cluster (and its Connections) */
    for(int i = 0; i < nclusters ; i++) {
      Cluster *c = clusters[i];
      if ( --(c->nreferences) == 0) {
        delete c;
        conf->getConnectionPoolById(i)->setCustomData(0);
      }
    }

    /* Then actually delete each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        delete * (getWorkerConnectionPtr(j, i));
        * (getWorkerConnectionPtr(j, i)) = 0;
      }
    }

    /* Shutdown now */
    logger->log(LOG_WARNING, 0, "Shutdown completed.");
    running = false;
  }
}


void S::SchedulerGlobal::parse_config_string(int nthreads, const char *str) {

  /* Initialize the configuration default values */
  options.n_worker_threads = nthreads;
  options.n_connections = 0;   // 0 = n_connections based on db-stored config
  options.force_send = 0;      // 0 = force send always off
  options.send_timer = 1;      // 1 = 1 ms. timer in send thread
  options.auto_grow = 1;       // 1 = allow NDB instance pool to grow on demand
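
  /* The config string is a sequence of single-letter options, each followed by
     an integer value -- for example "c2,f0,g1,t1".  A leading ':' and commas
     between options are tolerated; unrecognized letters are ignored. */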

  if(str) {
    const char *s = str;
    char letter;
    int value;

    /* tolerate a ':' at the start of the string */
    if( *s == ':') s++;

    while(*s != '\0' && sscanf(s, "%c%d", &letter, &value) == 2) {
      switch(letter) {
        case 'c':
          options.n_connections = value;
          break;
        case 'f':
          options.force_send = value;
          break;
        case 'g':
          options.auto_grow = value;
          break;
        case 't':
          options.send_timer = value;
          break;
      }
      /* Skip over the part just read */
      s += 1;                   // the letter
      while(isdigit(*s)) s++;   // the value

      /* Now tolerate a comma */
      if(*s == ',') s++;
    }
  }

  /* Test validity of configuration */
  if(options.force_send < 0 || options.force_send > 2) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.force_send >= 0 && options.force_send <= 2);
  }
  if(options.n_connections < 0 || options.n_connections > 4) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.n_connections >= 0 && options.n_connections <= 4);
  }
  if(options.send_timer < 1 || options.send_timer > 10) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.send_timer >= 1 && options.send_timer <= 10);
  }
  if(options.auto_grow < 0 || options.auto_grow > 1) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.auto_grow == 0 || options.auto_grow == 1);
  }
}


void S::SchedulerGlobal::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  if(strncasecmp(stat_key, "reconf", 6) == 0) {
    WorkerConnection ** wc = getWorkerConnectionPtr(0,0);
    (* wc)->add_stats(stat_key, add_stat, cookie);
  }
  else {
    DEBUG_PRINT(" scheduler");
    for(int c = 0 ; c < nclusters ; c++) {
      clusters[c]->add_stats(stat_key, add_stat, cookie);
    }
  }
}


/* SchedulerWorker methods */

void S::SchedulerWorker::init(int my_thread,
                              const scheduler_options * options) {
  /* On the first call in, initialize the SchedulerGlobal.
   * This will start the send & poll threads for each connection.
   */
  if(my_thread == 0) {
    s_global = new SchedulerGlobal(options->nthreads);
    s_global->init(options);
  }

  /* Initialize member variables */
  id = my_thread;
}


void S::SchedulerWorker::shutdown() {
  if(id == 0)
    s_global->shutdown();
}


S::SchedulerWorker::~SchedulerWorker() {
  if(id == 0)
    delete s_global;
}


ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
  int c = item->prefix_info.cluster_id;
  ENGINE_ERROR_CODE response_code;
  NdbInstance *inst = 0;
  const KeyPrefix *pfx;
  S::WorkerConnection *wc;

  wc = * (s_global->getWorkerConnectionPtr(id, c));
  if(wc == 0) return ENGINE_FAILED;

  if(wc->freelist) {                 /* Get the next NDB from the freelist. */
    inst = wc->freelist;
    wc->freelist = inst->next;
  }
  else {                             /* No free NDBs. */
    if(wc->sendqueue->is_aborted()) {
      return ENGINE_TMPFAIL;
    }
    else {                           /* Try to make an NdbInstance on the fly */
      inst = wc->newNdbInstance();
      if(inst) {
        log_app_error(& AppError29024_autogrow);
      }
      else {
        /* We have hit a hard maximum.  Eventually Scheduler::io_completed()
           will run _in this thread_ and return an NDB to the freelist.
           But no other thread can free one, so here we return an error.
         */
        log_app_error(& AppError29002_NoNDBs);
        return ENGINE_TMPFAIL;
      }
    }
  }

  assert(inst);
  inst->link_workitem(item);

  // Fetch the query plan for this prefix.
  pfx = wc->setQueryPlanInWorkitem(item);
  if(! item->plan) {
    DEBUG_PRINT("getPlanForPrefix() failure");
    return ENGINE_FAILED;
  }

  // Build the NDB transaction
  op_status_t op_status = worker_prepare_operation(item);

  // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
  if(op_status == op_prepared) {
    /* Put the prepared item onto a send queue */
    wc->sendqueue->produce(inst);
    DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);

    /* This locking is explained in run_ndb_send_thread() */
    if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) {  // try the lock
      wc->conn->sem.counter++;                               // increment
      pthread_cond_signal( & wc->conn->sem.not_zero);        // signal
      pthread_mutex_unlock( & wc->conn->sem.lock);           // release
    }

    response_code = ENGINE_EWOULDBLOCK;
  }
  else {
    /* Status is not op_prepared, but rather some error status */
    response_code = item->status->status;
  }

  return response_code;
}


void S::SchedulerWorker::prepare(NdbTransaction * tx,
                                 NdbTransaction::ExecType execType,
                                 NdbAsynchCallback callback,
                                 workitem * item, prepare_flags flags) {
  tx->executeAsynchPrepare(execType, callback, (void *) item);
  if(flags == RESCHEDULE) item->base.reschedule = 1;
}


void S::SchedulerWorker::close(NdbTransaction *tx, workitem *item) {
  Uint64 nwaits_pre, nwaits_post;
  Ndb * & ndb = item->ndb_instance->db;

  nwaits_pre  = ndb->getClientStat(Ndb::WaitExecCompleteCount);
  tx->close();
  nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);

  if(nwaits_post > nwaits_pre)
    log_app_error(& AppError29023_SyncClose);
}


/* Release the resources used by an operation.
   Unlink the NdbInstance from the workitem, and return it to the free list
   (or free it, if the scheduler is shutting down).
*/
void S::SchedulerWorker::release(workitem *item) {
  DEBUG_ENTER();
  NdbInstance *inst = item->ndb_instance;

  if(inst) {
    inst->unlink_workitem(item);
    int c = item->prefix_info.cluster_id;
    S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
    if(wc && ! wc->sendqueue->is_aborted()) {
      inst->next = wc->freelist;
      wc->freelist = inst;
      // DEBUG_PRINT("Returned NdbInstance to freelist.");
    }
    else {
      /* We are in the midst of shutting down (and possibly reconfiguring) */
      delete inst;
    }
  }
}


bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
  return s_global->reconfigure(new_cf);
}


void S::SchedulerWorker::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  s_global->add_stats(stat_key, add_stat, cookie);
}


/* Cluster methods */
S::Cluster::Cluster(SchedulerGlobal *global, int _id) :
  threads_started(false),
  cluster_id(_id),
  nreferences(0)
{
  DEBUG_PRINT("%d", cluster_id);

  /* How many cluster connections are wanted?
     If options.n_connections is zero (the default) we want one connection
     per 50,000 desired TPS.  (The default for TPS is 100,000 -- so, two
     connections).   But if a number is specified in the config, use that
     instead.
  */
  if(global->options.n_connections) {
    nconnections = global->options.n_connections;
  }
  else {
    const int connection_tps = 50000;
    nconnections = global->conf->max_tps / connection_tps;
    if(global->conf->max_tps % connection_tps) nconnections += 1;
  }
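  /* Example: with the default max_tps of 100,000 this gives
     100000 / 50000 = 2 connections; a max_tps of 120,000 would leave a
     remainder and round up to 3 connections. */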
  assert(nconnections > 0);

  /* Get our connection pool */
  ClusterConnectionPool *pool = global->conf->getConnectionPoolById(cluster_id);

  /* Some NDB Cluster Connections are already open;
     if we want more, try to add them now. */
  // TODO: If you reconfigure too many times you run out of connections ...
  DEBUG_PRINT("Cluster %d, have %d connection(s), want %d",
              cluster_id, pool->getPoolSize(), nconnections);
  for(int i = pool->getPoolSize(); i < nconnections ; i ++) {
    Ndb_cluster_connection * c = pool->addPooledConnection();
    if(c == 0) {
      /* unable to create any more connections */
      nconnections = i;
      break;
    }
  }

  logger->log(LOG_WARNING, 0, "Scheduler: using %d connection%s to cluster %d\n",
              nconnections, nconnections == 1 ? "" : "s", cluster_id);

  /* Instantiate the Connection objects */
  connections = new S::Connection * [nconnections];
  for(int i = 0; i < nconnections ; i++) {
    connections[i] = new S::Connection(*this, i);
  }
}


void S::Cluster::startThreads() {
  /* Threads are started only once and persist across reconfiguration.
     But, this method will be called again for each reconf. */
  if(threads_started == false) {
    for(int i = 0 ; i < nconnections; i++) {
      connections[i]->startThreads();
    }
    threads_started = true;
  }
}


S::Cluster::~Cluster() {
  DEBUG_PRINT("Shutting down cluster %d", cluster_id);
  for(int i = 0; i < nconnections ; i++) {
    delete connections[i];
  }
}


S::WorkerConnection ** S::Cluster::getWorkerConnectionPtr(int thd) const {
  return s_global->getWorkerConnectionPtr(thd, cluster_id);
}


void S::Cluster::add_stats(const char *stat_key,
                           ADD_STAT add_stat,
                           const void *cookie) {
  for(int c = 0 ; c < nconnections ; c++) {
    connections[c]->add_stats(stat_key, add_stat, cookie);
  }
}


/* WorkerConnection methods */


NdbInstance * S::WorkerConnection::newNdbInstance() {
  NdbInstance *inst = 0;
  if(instances.current < instances.max) {
    inst = new NdbInstance(conn->conn, 2);
    instances.current++;
    inst->id = ((id.thd + 1) * 10000) + instances.current;
  }
  return inst;
}


S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                      int thd_id, int cluster_id) :
  SchedulerConfigManager(thd_id, cluster_id)
{
  S::Cluster *cl = global->clusters[cluster_id];

  id.thd = thd_id;
  id.cluster = cluster_id;
  id.conn = thd_id % cl->nconnections;  // round-robin assignment
  conn = cl->connections[id.conn];
  id.node = conn->node_id;

  /* How many NDB instances to start initially */
  instances.initial = conn->instances.initial / conn->n_workers;

  /* Maximum size of send queue, and upper bound on NDB instances */
  instances.max = conn->instances.max / conn->n_workers;

  /* Build the freelist */
  freelist = 0;
  for(instances.current = 0; instances.current < instances.initial; ) {
    NdbInstance *inst = newNdbInstance();
    inst->next = freelist;
    freelist = inst;
  }

  DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
              id.cluster, id.conn, id.node, id.thd, instances.current);

  /* Initialize the sendqueue */
  sendqueue = new Queue<NdbInstance>(instances.max);

  /* Hoard a transaction (an API connect record) for each Ndb object.  This
   * first call to startTransaction() will send TC_SEIZEREQ and wait for a
   * reply, but later at runtime startTransaction() should return immediately.
   */
  NdbTransaction ** txlist = new NdbTransaction * [instances.current];
  int i = 0;

  // Open them all.
  for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) {
    NdbTransaction *tx;
    tx = inst->db->startTransaction();
    if(! tx) log_ndb_error(inst->db->getNdbError());
    txlist[i] = tx;
  }

  // Close them all.
  for(i = 0 ; i < instances.current ; i++) {
    if(txlist[i])
      txlist[i]->close();
  }

  // Free the list.
  delete[] txlist;
}


S::WorkerConnection::~WorkerConnection() {
  DEBUG_ENTER_METHOD("S::WorkerConnection::~WorkerConnection");

  /* Delete all of the Ndbs that are not currently in use */
  NdbInstance *inst = freelist;
  while(inst != 0) {
    NdbInstance *next = inst->next;
    delete inst;
    inst = next;
  }

  /* Delete the sendqueue */
  delete sendqueue;
}


/* Connection methods */

S::Connection::Connection(S::Cluster & _cl, int _id) :
  cluster(_cl), id(_id)
{
  S::SchedulerGlobal *global = s_global;
  Configuration *conf = global->conf;
  n_total_workers = global->options.n_worker_threads;

  /* Get the connection pool for my cluster */
  ClusterConnectionPool *pool = conf->getConnectionPoolById(cluster.cluster_id);

  /* Get my connection from the pool */
  conn = pool->getPooledConnection(id);
  node_id = conn->node_id();

  /* Set the timer on the adaptive send thread */
  conn->set_max_adaptive_send_time(global->options.send_timer);

  /* How many worker threads will use this connection? */
  n_workers = global->options.n_worker_threads / cluster.nconnections;
  if(n_total_workers % cluster.nconnections > id) n_workers += 1;

  /* How many NDB objects are needed for the desired performance? */
  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
  instances.initial = (int) (total_ndb_objects / cluster.nconnections);
  while(instances.initial % n_workers) instances.initial++; // round up

  /* The maximum number of NDB objects.
   * This is used to configure hard limits on the size of the waitgroup,
   * the sentqueue, and the reschedulequeue -- and it will not be
   * possible to increase those limits during online reconfig.
   */
  instances.max = instances.initial;
  // allow the pool to grow on demand?
  if(global->options.auto_grow)
    instances.max = (int) (instances.max * 1.6);
  // max_clients imposes a hard upper limit
  if(instances.max > (global->options.max_clients / cluster.nconnections))
    instances.max = global->options.max_clients / cluster.nconnections;
  // instances.initial might also be subject to the max_clients limit
  if(instances.initial > instances.max)
    instances.initial = instances.max;
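
  /* Sizing example (illustrative numbers): an initial pool of 100 NDB objects
     with auto_grow enabled may grow to 160 (100 * 1.6), unless
     max_clients / nconnections imposes a lower ceiling; in that case both
     the maximum and the initial count are clamped to that ceiling. */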

  /* Get a multi-wait Poll Group */
  pollgroup = conn->create_ndb_wait_group(instances.max);

  /* Initialize the statistics */
  stats.sent_operations = 0;
  stats.batches = 0;
  stats.timeout_races = 0;

  /* Initialize the semaphore */
  pthread_mutex_init(& sem.lock, NULL);
  init_condition_var(& sem.not_zero);
  sem.counter = 0;

  /* Initialize the queues for sent and rescheduled items */
  sentqueue = new Queue<NdbInstance>(instances.max);
  reschedulequeue = new Queue<NdbInstance>(instances.max);
}


void S::Connection::startThreads() {
  /* Start the poll thread */
  pthread_create( & poll_thread_id, NULL, run_poll_thread, (void *) this);

  /* Start the send thread */
  pthread_create( & send_thread_id, NULL, run_send_thread, (void *) this);
}


S::Connection::~Connection() {
  /* Shut down a connection.
     The send thread should send everything in its queue.
     The poll thread should wait for everything in its waitgroup.
     Then they should both shut down.
  */
  DEBUG_ENTER_METHOD("S::Connection::~Connection");
  pthread_join(send_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d send thread has quit.",
              cluster.cluster_id, id);

  pthread_join(poll_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d poll thread has quit.",
              cluster.cluster_id, id);

  /* Delete the queues */
  assert(sentqueue->is_aborted());
  delete sentqueue;
  delete reschedulequeue;

  /* Delete the semaphore */
  pthread_cond_destroy(& sem.not_zero);
  pthread_mutex_destroy(& sem.lock);

  /* Release the multiwait group */
  conn->release_ndb_wait_group(pollgroup);
}


void S::Connection::add_stats(const char *stat_key,
                              ADD_STAT add_stat,
                              const void *cookie) {
  char key[128];
  char val[128];
  int klen, vlen;
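
  /* Each statistic is emitted under a flat key of the form
     "cl<cluster>.conn<connection>.<counter>", e.g. "cl0.conn0.sent_operations". */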

  klen = sprintf(key, "cl%d.conn%d.sent_operations", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.sent_operations);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.batches", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.batches);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.timeout_races);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.initial);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.max);
  add_stat(key, klen, val, vlen, cookie);
}


/*
 Some design features of the send thread

 1:  When a worker thread has an item ready to send, it tries to acquire
 the mutex and post to the semaphore.  The send thread sleeps on the
 semaphore's condition variable waiting for a worker to post to it.
 But if a worker thread finds the mutex already locked, it simply
 skips posting the semaphore; some other thread must be posting anyway.
 This sets up a possible race where a worker may queue an item but the
 send thread misses it.  Therefore the send thread always sets a timeout
 when waiting, and always examines the queues after the timer expires.

 2: The occurrence of the race described above is recorded in the
 stats.timeout_races counter.

 3: How long is the timeout? It varies from a low value when the server is
 busy to a high one when idle.  Also, when busy, we try to reduce the number
 of calls to gettimeofday() or clock_gettime() to one per timeout_msec ms
 rather than one per iteration.
*/
void * S::Connection::run_ndb_send_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.send", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *readylist;     /* list of items fetched from queues */
  int nready = 0;             /* number of items on the readylist */
  int nsent = 0;              /* number sent in this iteration */
  int c_wait = 0;             /* return value from pthread_cond_timedwait() */
  struct timespec timer;
  const int timeout_min = 200;   /* "busy" server timeout */
  const int timeout_max = 3200;  /* "idle" server timeout */
  int timeout_msec = timeout_min;
  int shutting_down = 0;

  while(1) {
    if(nsent == 0) {  /* nothing sent last time through the loop */
      if(shutting_down) {
        sentqueue->abort();
        pollgroup->wakeup();
        return 0;
      }

      if(timeout_msec < timeout_max) {
        timeout_msec *= 2;  /* progress from "busy" towards "idle" */
      }
      timespec_get_time(& timer);
      timespec_add_msec(& timer, timeout_msec);
    }

    /* Acquire the semaphore */
    pthread_mutex_lock(& sem.lock);
    if(sem.counter == 0) {
      c_wait = pthread_cond_timedwait(& sem.not_zero, & sem.lock, & timer);
    }
    sem.counter = 0;
    pthread_mutex_unlock(& sem.lock);

    /* There are several queues that may have NDBs ready for sending.
       Examine all of them, and consolidate all of the ready NDBs into a
       single list. */
    nready = 0;
    readylist = 0;

    /* First check the reschedule queue */
    nready += get_operations_from_queue(& readylist, reschedulequeue);

    /* Then the worker thread queues */
    for(int w = id; w < n_total_workers; w += cluster.nconnections) {
      S::WorkerConnection *wc = * (cluster.getWorkerConnectionPtr(w));
      DEBUG_ASSERT(wc->id.conn == id);
      nready += get_operations_from_queue(& readylist, wc->sendqueue);
      if(wc->sendqueue->is_aborted()) {
        shutting_down = 1;
      }
    }

    /* Now walk the readylist.  Send pending operations from the NDBs there,
       then place them on the sent-items queue for the poll thread. */
    nsent = 0;
    if(nready) {
      for(NdbInstance *inst = readylist; inst != NULL ; inst = inst->next) {
        int force = 0;
        if(nready == 1 && s_global->options.force_send == 1) {
          force = 1; // force-send the last item in the list
        }

        /* Send the operations */
        inst->db->sendPreparedTransactions(force);
        DEBUG_PRINT("Sent %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);

        /* Give the instance to the poll thread */
        sentqueue->produce(inst);

        nsent += 1;
        nready -= 1;
      }

      stats.batches += 1;
      stats.sent_operations += nsent;
      if(c_wait == ETIMEDOUT) {
        stats.timeout_races += 1;
      }

      pollgroup->wakeup();

      timeout_msec = timeout_min;  /* we are now "busy" */
    }
  }
}


int S::Connection::get_operations_from_queue(NdbInstance **readylist,
                                             Queue<NdbInstance> *q) {
  int n = 0;
  NdbInstance *inst;
  while((inst = q->consume()) != NULL) {
    assert(inst->db);
    inst->next = *readylist;
    *readylist = inst;
    n++;
  }
  return n;
}


void * S::Connection::run_ndb_poll_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.poll", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *inst;
  Ndb ** ready_list;
  int wait_timeout_millisec = 5000;
  int min_ready;
  int in_flight = 0;

  while(1) {
    if(in_flight == 0 && sentqueue->is_aborted()) {
      return 0;
    }

    int n_added = 0;
    /* Add new NDBs to the poll group */
    while((inst = sentqueue->consume()) != 0) {
      assert(inst->db);
      inst->next = 0;
      DEBUG_PRINT(" ** adding %d.%d to wait group ** ",
                  inst->wqitem->pipeline->id, inst->wqitem->id);
      pollgroup->addNdb(inst->db);
      n_added++;
      in_flight++;
    }

    /* What's the minimum number of ready Ndb's to wake up for? */
    int n = n_added / 4;
    min_ready = n > 0 ? n : 1;
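    /* e.g. if 8 NDBs were just added, wake up once at least 2 are ready;
       if none were added on this pass, wake up for the first one. */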

    /* Wait until something is ready to poll */
    int nwaiting = pollgroup->wait(ready_list, wait_timeout_millisec, min_ready);

    /* Poll the ones that are ready */
    if(nwaiting > 0) {
      for(int i = 0; i < nwaiting ; i++) {
        in_flight--;
        assert(in_flight >= 0);
        Ndb *db = ready_list[i];
        inst = (NdbInstance *) db->getCustomData();
        DEBUG_PRINT("Polling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
        db->pollNdb(0, 1);

        if(inst->wqitem->base.reschedule) {
          DEBUG_PRINT("Rescheduling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
          inst->wqitem->base.reschedule = 0;
          reschedulequeue->produce(inst);  // Put it on the reschedule queue
          if(pthread_mutex_trylock( & sem.lock) == 0) {
            sem.counter++;
            pthread_cond_signal(& sem.not_zero);  // Ping the send thread
            pthread_mutex_unlock(& sem.lock);
          }
        }
        else {
          // Scheduler yielded. Notify memcached that the operation is complete.
          DEBUG_PRINT("item_io_complete for %d.%d",
                      inst->wqitem->pipeline->id, inst->wqitem->id);
          item_io_complete(inst->wqitem);
        }
      }
    }
  }
  return 0; /* not reached */
}


void * run_send_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_send_thread();
}

void * run_poll_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_poll_thread();
}