/*
 Copyright (c) 2011, 2017, Oracle and/or its affiliates. All rights reserved.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation.  The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 */

#include "my_config.h"
#include <stdio.h>
#include <ctype.h>
#include <sys/errno.h>
#include <inttypes.h>

/* Memcache headers */
#include "memcached/types.h"
#include <memcached/extension_loggers.h>

#include "timing.h"
#include "debug.h"
#include "Configuration.h"
#include "thread_identifier.h"
#include "workitem.h"
#include "ndb_worker.h"
#include "ndb_engine_errors.h"
#include "ndb_error_logger.h"

#include "S_sched.h"

extern "C" {
  void * run_send_thread(void *v);
  void * run_poll_thread(void *v);
}

extern EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Scheduler Global singleton */
static S::SchedulerGlobal * s_global;

/* SchedulerGlobal methods */
S::SchedulerGlobal::SchedulerGlobal(int _nthreads) :
  GlobalConfigManager(_nthreads)
{
}


void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
  DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");

  /* Set member variables */
  config_string = sched_opts->config_string;
  parse_config_string(nthreads, config_string);
  options.max_clients = sched_opts->max_clients;

  /* Fetch or initialize clusters */
  nclusters = conf->nclusters;
  clusters = new Cluster * [nclusters];
  for(int i = 0 ; i < nclusters ; i++) {
    ClusterConnectionPool *pool = conf->getConnectionPoolById(i);
    Cluster *c = (Cluster *) pool->getCustomData();
    if(c == 0) {
      c = new Cluster(this, i);
      pool->setCustomData(c);
    }
    clusters[i] = c;
    c->nreferences += 1;
  }

  /* Initialize the WorkerConnections */
  for(int t = 0 ; t < nthreads ; t++) {
    for(int c = 0 ; c < nclusters ; c++) {
      WorkerConnection **wc_cell = getWorkerConnectionPtr(t, c);
      * wc_cell = new WorkerConnection(this, t, c);
    }
  }

  /* Build Configurations for WorkerConnections */
  configureSchedulers();

  /* Start the send & poll threads for each connection */
  for(int i = 0 ; i < nclusters ; i++)
    clusters[i]->startThreads();

  /* Log message for startup */
  logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
              "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
              options.n_connections, options.force_send,
              options.auto_grow, options.send_timer);

  /* Now Running */
  running = true;
}


void S::SchedulerGlobal::shutdown() {
  if(running) {
    logger->log(LOG_INFO, 0, "Shutting down scheduler.");

    /* First shut down each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        S::WorkerConnection *wc = * (getWorkerConnectionPtr(j, i));
        wc->sendqueue->abort();
      }
    }

    /* Release each Cluster (and its Connections) */
    for(int i = 0; i < nclusters ; i++) {
      Cluster *c = clusters[i];
      if ( --(c->nreferences) == 0) {
        delete c;
        conf->getConnectionPoolById(i)->setCustomData(0);
      }
    }

    /* Then actually delete each WorkerConnection */
    for(int i = 0; i < nclusters ; i++) {
      for(int j = 0; j < options.n_worker_threads; j++) {
        delete * (getWorkerConnectionPtr(j, i));
        * (getWorkerConnectionPtr(j, i)) = 0;
      }
    }

    /* Shutdown now */
    logger->log(LOG_WARNING, 0, "Shutdown completed.");
    running = false;
  }
}

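/* Parse the scheduler configuration string.
   The string is a sequence of letter+number pairs, optionally separated by
   commas and optionally preceded by a ':'.  For example (illustrative only),
   "c2,f1,g1,t2" requests 2 cluster connections, force_send mode 1, auto_grow
   enabled, and a 2 ms send timer.  The recognized letters are c, f, g, and t,
   as handled in the switch below.
*/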
void S::SchedulerGlobal::parse_config_string(int nthreads, const char *str) {

  /* Initialize the configuration default values */
  options.n_worker_threads = nthreads;
  options.n_connections = 0;   // 0 = n_connections based on db-stored config
  options.force_send = 0;      // 0 = force send always off
  options.send_timer = 1;      // 1 = 1 ms. timer in send thread
  options.auto_grow = 1;       // 1 = allow NDB instance pool to grow on demand

  if(str) {
    const char *s = str;
    char letter;
    int value;

    /* tolerate a ':' at the start of the string */
    if( *s == ':') s++;

    while(*s != '\0' && sscanf(s, "%c%d", &letter, &value) == 2) {
      switch(letter) {
        case 'c':
          options.n_connections = value;
          break;
        case 'f':
          options.force_send = value;
          break;
        case 'g':
          options.auto_grow = value;
          break;
        case 't':
          options.send_timer = value;
          break;
      }
      /* Skip over the part just read */
      s += 1;                   // the letter
      while(isdigit(*s)) s++;   // the value

      /* Now tolerate a comma */
      if(*s == ',') s++;
    }
  }

  /* Test validity of configuration */
  if(options.force_send < 0 || options.force_send > 2) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.force_send >= 0 && options.force_send <= 2);
  }
  if(options.n_connections < 0 || options.n_connections > 4) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.n_connections >= 0 && options.n_connections <= 4);
  }
  if(options.send_timer < 1 || options.send_timer > 10) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.send_timer >= 1 && options.send_timer <= 10);
  }
  if(options.auto_grow < 0 || options.auto_grow > 1) {
    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
    assert(options.auto_grow == 0 || options.auto_grow == 1);
  }
}


void S::SchedulerGlobal::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  if(strncasecmp(stat_key, "reconf", 6) == 0) {
    WorkerConnection ** wc = getWorkerConnectionPtr(0,0);
    (* wc)->add_stats(stat_key, add_stat, cookie);
  }
  else {
    DEBUG_PRINT(" scheduler");
    for(int c = 0 ; c < nclusters ; c++) {
      clusters[c]->add_stats(stat_key, add_stat, cookie);
    }
  }
}


/* SchedulerWorker methods */

void S::SchedulerWorker::init(int my_thread,
                              const scheduler_options * options) {
  /* On the first call in, initialize the SchedulerGlobal.
   * This will start the send & poll threads for each connection.
   */
  if(my_thread == 0) {
    s_global = new SchedulerGlobal(options->nthreads);
    s_global->init(options);
  }

  /* Initialize member variables */
  id = my_thread;
}


void S::SchedulerWorker::shutdown() {
  if(id == 0)
    s_global->shutdown();
}


S::SchedulerWorker::~SchedulerWorker() {
  if(id == 0)
    delete s_global;
}


ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
  int c = item->prefix_info.cluster_id;
  ENGINE_ERROR_CODE response_code;
  NdbInstance *inst = 0;
  const KeyPrefix *pfx;
  S::WorkerConnection *wc;

  wc = * (s_global->getWorkerConnectionPtr(id, c));
  if(wc == 0) return ENGINE_FAILED;

  if(wc->freelist) {                 /* Get the next NDB from the freelist. */
    inst = wc->freelist;
    wc->freelist = inst->next;
  }
  else {                             /* No free NDBs. */
    if(wc->sendqueue->is_aborted()) {
      return ENGINE_TMPFAIL;
    }
    else {                           /* Try to make an NdbInstance on the fly */
      inst = wc->newNdbInstance();
      if(inst) {
        log_app_error(& AppError29024_autogrow);
      }
      else {
        /* We have hit a hard maximum.  Eventually Scheduler::io_completed()
           will run _in this thread_ and return an NDB to the freelist.
           But no other thread can free one, so here we return an error.
         */
        log_app_error(& AppError29002_NoNDBs);
        return ENGINE_TMPFAIL;
      }
    }
  }

  assert(inst);
  inst->link_workitem(item);

  // Fetch the query plan for this prefix.
  pfx = wc->setQueryPlanInWorkitem(item);
  if(! item->plan) {
    DEBUG_PRINT("getPlanForPrefix() failure");
    return ENGINE_FAILED;
  }

  // Build the NDB transaction
  op_status_t op_status = worker_prepare_operation(item);

  // Success; put the workitem on the send queue and return ENGINE_EWOULDBLOCK.
  if(op_status == op_prepared) {
    /* Put the prepared item onto a send queue */
    wc->sendqueue->produce(inst);
    DEBUG_PRINT("%d.%d placed on send queue.", id, inst->wqitem->id);

    /* This locking is explained in run_ndb_send_thread() */
    if(pthread_mutex_trylock( & wc->conn->sem.lock) == 0) {  // try the lock
      wc->conn->sem.counter++;                               // increment
      pthread_cond_signal( & wc->conn->sem.not_zero);        // signal
      pthread_mutex_unlock( & wc->conn->sem.lock);           // release
    }

    response_code = ENGINE_EWOULDBLOCK;
  }
  else {
    /* Status is not op_prepared, but rather some error status */
    response_code = item->status->status;
  }

  return response_code;
}


void S::SchedulerWorker::prepare(NdbTransaction * tx,
                                 NdbTransaction::ExecType execType,
                                 NdbAsynchCallback callback,
                                 workitem * item, prepare_flags flags) {
  tx->executeAsynchPrepare(execType, callback, (void *) item);
  if(flags == RESCHEDULE) item->base.reschedule = 1;
}


void S::SchedulerWorker::close(NdbTransaction *tx, workitem *item) {
  Uint64 nwaits_pre, nwaits_post;
  Ndb * & ndb = item->ndb_instance->db;

  nwaits_pre  = ndb->getClientStat(Ndb::WaitExecCompleteCount);
  tx->close();
  nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);

  if(nwaits_post > nwaits_pre)
    log_app_error(& AppError29023_SyncClose);
}


/* Release the resources used by an operation.
   Unlink the NdbInstance from the workitem, and return it to the free list
   (or free it, if the scheduler is shutting down).
*/
void S::SchedulerWorker::release(workitem *item) {
  DEBUG_ENTER();
  NdbInstance *inst = item->ndb_instance;

  if(inst) {
    inst->unlink_workitem(item);
    int c = item->prefix_info.cluster_id;
    S::WorkerConnection * wc = * (s_global->getWorkerConnectionPtr(id, c));
    if(wc && ! wc->sendqueue->is_aborted()) {
      inst->next = wc->freelist;
      wc->freelist = inst;
      // DEBUG_PRINT("Returned NdbInstance to freelist.");
    }
    else {
      /* We are in the midst of shutting down (and possibly reconfiguring) */
      delete inst;
    }
  }
}


bool S::SchedulerWorker::global_reconfigure(Configuration *new_cf) {
  return s_global->reconfigure(new_cf);
}


void S::SchedulerWorker::add_stats(const char *stat_key,
                                   ADD_STAT add_stat,
                                   const void *cookie) {
  s_global->add_stats(stat_key, add_stat, cookie);
}


/* Cluster methods */
S::Cluster::Cluster(SchedulerGlobal *global, int _id) :
  threads_started(false),
  cluster_id(_id),
  nreferences(0)
{
  DEBUG_PRINT("%d", cluster_id);

  /* How many cluster connections are wanted?
     If options.n_connections is zero (the default) we want one connection
     per 50,000 desired TPS.  (The default for TPS is 100,000 -- so, two
     connections).   But if a number is specified in the config, use that
     instead.
  */
  if(global->options.n_connections) {
    nconnections = global->options.n_connections;
  }
  else {
    const int connection_tps = 50000;
    nconnections = global->conf->max_tps / connection_tps;
    if(global->conf->max_tps % connection_tps) nconnections += 1;
  }
  assert(nconnections > 0);

  /* Get our connection pool */
  ClusterConnectionPool *pool = global->conf->getConnectionPoolById(cluster_id);

  /* Some NDB Cluster Connections are already open;
     if we want more, try to add them now. */
  // TODO: If you reconfigure too many times you run out of connections ...
  DEBUG_PRINT("Cluster %d, have %d connection(s), want %d",
              cluster_id, pool->getPoolSize(), nconnections);
  for(int i = pool->getPoolSize(); i < nconnections ; i ++) {
    Ndb_cluster_connection * c = pool->addPooledConnection();
    if(c == 0) {
      /* unable to create any more connections */
      nconnections = i;
      break;
    }
  }

  logger->log(LOG_WARNING, 0, "Scheduler: using %d connection%s to cluster %d\n",
              nconnections, nconnections == 1 ? "" : "s", cluster_id);

  /* Instantiate the Connection objects */
  connections = new S::Connection * [nconnections];
  for(int i = 0; i < nconnections ; i++) {
    connections[i] = new S::Connection(*this, i);
  }
}


void S::Cluster::startThreads() {
  /* Threads are started only once and persist across reconfiguration.
     But, this method will be called again for each reconf. */
  if(threads_started == false) {
    for(int i = 0 ; i < nconnections; i++) {
      connections[i]->startThreads();
    }
    threads_started = true;
  }
}


S::Cluster::~Cluster() {
  DEBUG_PRINT("Shutting down cluster %d", cluster_id);
  for(int i = 0; i < nconnections ; i++) {
    delete connections[i];
  }
}


S::WorkerConnection ** S::Cluster::getWorkerConnectionPtr(int thd) const {
  return s_global->getWorkerConnectionPtr(thd, cluster_id);
}


void S::Cluster::add_stats(const char *stat_key,
                           ADD_STAT add_stat,
                           const void *cookie) {
  for(int c = 0 ; c < nconnections ; c++) {
    connections[c]->add_stats(stat_key, add_stat, cookie);
  }
}


/* WorkerConnection methods */


NdbInstance * S::WorkerConnection::newNdbInstance() {
  NdbInstance *inst = 0;
  if(instances.current < instances.max) {
    inst = new NdbInstance(conn->conn, 2);
    instances.current++;
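    /* Encode the worker thread and instance number in the id; for example,
       worker thread 3's fifth instance would get id 40005. */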
    inst->id = ((id.thd + 1) * 10000) + instances.current;
  }
  return inst;
}


S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                      int thd_id, int cluster_id) :
  SchedulerConfigManager(thd_id, cluster_id)
{
  S::Cluster *cl = global->clusters[cluster_id];

  id.thd = thd_id;
  id.cluster = cluster_id;
  id.conn = thd_id % cl->nconnections;  // round-robin assignment
  conn = cl->connections[id.conn];
  id.node = conn->node_id;

  /* How many NDB instances to start initially */
  instances.initial = conn->instances.initial / conn->n_workers;

  /* Maximum size of send queue, and upper bound on NDB instances */
  instances.max = conn->instances.max / conn->n_workers;

  /* Build the freelist */
  freelist = 0;
  for(instances.current = 0; instances.current < instances.initial; ) {
    NdbInstance *inst = newNdbInstance();
    inst->next = freelist;
    freelist = inst;
  }

  DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
              id.cluster, id.conn, id.node, id.thd, instances.current);

  /* Initialize the sendqueue */
  sendqueue = new Queue<NdbInstance>(instances.max);

  /* Hoard a transaction (an API connect record) for each Ndb object.  This
   * first call to startTransaction() will send TC_SEIZEREQ and wait for a
   * reply, but later at runtime startTransaction() should return immediately.
   */
  NdbTransaction ** txlist = new NdbTransaction * [instances.current];
  int i = 0;

  // Open them all.
  for(NdbInstance *inst = freelist; inst != 0 ;inst=inst->next, i++) {
    NdbTransaction *tx;
    tx = inst->db->startTransaction();
    if(! tx) log_ndb_error(inst->db->getNdbError());
    txlist[i] = tx;
  }

  // Close them all.
  for(i = 0 ; i < instances.current ; i++) {
    if(txlist[i])
      txlist[i]->close();
  }

  // Free the list.
  delete[] txlist;
}


S::WorkerConnection::~WorkerConnection() {
  DEBUG_ENTER_METHOD("S::WorkerConnection::~WorkerConnection");

  /* Delete all of the Ndbs that are not currently in use */
  NdbInstance *inst = freelist;
  while(inst != 0) {
    NdbInstance *next = inst->next;
    delete inst;
    inst = next;
  }

  /* Delete the sendqueue */
  delete sendqueue;
}


/* Connection methods */

S::Connection::Connection(S::Cluster & _cl, int _id) :
  cluster(_cl), id(_id)
{
  S::SchedulerGlobal *global = s_global;
  Configuration *conf = global->conf;
  n_total_workers = global->options.n_worker_threads;

  /* Get the connection pool for my cluster */
  ClusterConnectionPool *pool = conf->getConnectionPoolById(cluster.cluster_id);

  /* Get my connection from the pool */
  conn = pool->getPooledConnection(id);
  node_id = conn->node_id();

  /* Set the timer on the adaptive send thread */
  conn->set_max_adaptive_send_time(global->options.send_timer);

  /* How many worker threads will use this connection? */
  n_workers = global->options.n_worker_threads / cluster.nconnections;
  if(n_total_workers % cluster.nconnections > id) n_workers += 1;
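  /* Example of the distribution: with 5 worker threads and 2 connections,
     connection 0 serves workers 0, 2, 4 (n_workers = 3) and connection 1
     serves workers 1, 3 (n_workers = 2). */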

  /* How many NDB objects are needed for the desired performance? */
  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
  instances.initial = (int) (total_ndb_objects / cluster.nconnections);
  while(instances.initial % n_workers) instances.initial++; // round up

  /* The maximum number of NDB objects.
   * This is used to configure hard limits on the size of the waitgroup,
   * the sentqueue, and the reschedulequeue -- and it will not be
   * possible to increase those limits during online reconfig.
   */
  instances.max = instances.initial;
  // allow the pool to grow on demand?
  if(global->options.auto_grow)
    instances.max = (int) (instances.max * 1.6);
  // max_clients imposes a hard upper limit
  if(instances.max > (global->options.max_clients / cluster.nconnections))
    instances.max = global->options.max_clients / cluster.nconnections;
  // instances.initial might also be subject to the max_clients limit
  if(instances.initial > instances.max)
    instances.initial = instances.max;
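  /* Illustration (assuming figureInFlightTransactions() returned 200 for a
     2-connection cluster with 4 workers per connection): initial = 100,
     max = 160 with auto_grow, both subject to the max_clients cap above. */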

  /* Get a multi-wait Poll Group */
  pollgroup = conn->create_ndb_wait_group(instances.max);

  /* Initialize the statistics */
  stats.sent_operations = 0;
  stats.batches = 0;
  stats.timeout_races = 0;

  /* Initialize the semaphore */
  pthread_mutex_init(& sem.lock, NULL);
  init_condition_var(& sem.not_zero);
  sem.counter = 0;

  /* Initialize the queues for sent and rescheduled items */
  sentqueue = new Queue<NdbInstance>(instances.max);
  reschedulequeue = new Queue<NdbInstance>(instances.max);
}


void S::Connection::startThreads() {
  /* Start the poll thread */
  pthread_create( & poll_thread_id, NULL, run_poll_thread, (void *) this);

  /* Start the send thread */
  pthread_create( & send_thread_id, NULL, run_send_thread, (void *) this);
}


S::Connection::~Connection() {
  /* Shut down a connection.
     The send thread should send everything in its queue.
     The poll thread should wait for everything in its waitgroup.
     Then they should both shut down.
  */
  DEBUG_ENTER_METHOD("S::Connection::~Connection");
  pthread_join(send_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d send thread has quit.",
              cluster.cluster_id, id);

  pthread_join(poll_thread_id, NULL);
  DEBUG_PRINT("Cluster %d connection %d poll thread has quit.",
              cluster.cluster_id, id);

  /* Delete the queues */
  assert(sentqueue->is_aborted());
  delete sentqueue;
  delete reschedulequeue;

  /* Delete the semaphore */
  pthread_cond_destroy(& sem.not_zero);
  pthread_mutex_destroy(& sem.lock);

  /* Release the multiwait group */
  conn->release_ndb_wait_group(pollgroup);
}


void S::Connection::add_stats(const char *stat_key,
                              ADD_STAT add_stat,
                              const void *cookie) {
  char key[128];
  char val[128];
  int klen, vlen;

  klen = sprintf(key, "cl%d.conn%d.sent_operations", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.sent_operations);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.batches", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.batches);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
  vlen = sprintf(val, "%llu", stats.timeout_races);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.initial);
  add_stat(key, klen, val, vlen, cookie);

  klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
  vlen = sprintf(val, "%d", instances.max);
  add_stat(key, klen, val, vlen, cookie);
}


/*
 Some design features of the send thread

 1:  When a worker thread has an item ready to send, it tries to acquire
 the mutex and post to the semaphore.  The send thread sleeps on the
 semaphore's condition variable waiting for a worker to post to it.
 But if a worker thread finds the mutex already locked, it simply
 skips posting the semaphore; some other thread must be posting anyway.
 This sets up a possible race where a worker may queue an item but the
 send thread misses it.  Therefore the send thread always sets a timeout
 when waiting, and always examines the queues after the timer expires.

 2: The occurrence of the race described above is recorded in the
 stats.timeout_races counter.

 3: How long is the timeout? It varies from a low value when the server is
 busy to a high one when idle.  Also, when busy, we try to reduce the number
 of calls to gettimeofday() or clock_gettime() to one per timeout_msec ms
 rather than one per iteration.
*/
void * S::Connection::run_ndb_send_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.send", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *readylist;     /* list of items fetched from queues */
  int nready = 0;             /* number of items on the readylist */
  int nsent = 0;              /* number sent in this iteration */
  int c_wait = 0;             /* return value from pthread_cond_timedwait() */
  struct timespec timer;
  const int timeout_min = 200;   /* "busy" server timeout */
  const int timeout_max = 3200;  /* "idle" server timeout */
  int timeout_msec = timeout_min;
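  /* The wait timeout starts at 200 msec and doubles after each pass in which
     nothing was sent, up to 3200 msec; a successful send resets it to 200. */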
  int shutting_down = 0;

  while(1) {
    if(nsent == 0) {  /* nothing sent last time through the loop */
      if(shutting_down) {
        sentqueue->abort();
        pollgroup->wakeup();
        return 0;
      }

      if(timeout_msec < timeout_max) {
        timeout_msec *= 2;  /* progress from "busy" towards "idle" */
      }
      timespec_get_time(& timer);
      timespec_add_msec(& timer, timeout_msec);
    }

    /* Acquire the semaphore */
    pthread_mutex_lock(& sem.lock);
    if(sem.counter == 0) {
      c_wait = pthread_cond_timedwait(& sem.not_zero, & sem.lock, & timer);
    }
    sem.counter = 0;
    pthread_mutex_unlock(& sem.lock);

    /* There are several queues that may have NDBs ready for sending.
       Examine all of them, and consolidate all of the ready NDBs into a
       single list. */
    nready = 0;
    readylist = 0;

    /* First check the reschedule queue */
    nready += get_operations_from_queue(& readylist, reschedulequeue);

    /* Then the worker thread queues */
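    /* Workers are assigned to connections round-robin (id.conn = thd_id %
       nconnections in the WorkerConnection constructor), so this connection
       owns workers id, id + nconnections, id + 2*nconnections, and so on. */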
    for(int w = id; w < n_total_workers; w += cluster.nconnections) {
      S::WorkerConnection *wc = * (cluster.getWorkerConnectionPtr(w));
      DEBUG_ASSERT(wc->id.conn == id);
      nready += get_operations_from_queue(& readylist, wc->sendqueue);
      if(wc->sendqueue->is_aborted()) {
        shutting_down = 1;
      }
    }

    /* Now walk the readylist.  Send pending operations from the NDBs there,
       then place them on the sent-items queue for the poll thread. */
    nsent = 0;
    if(nready) {
      for(NdbInstance *inst = readylist; inst != NULL ; inst = inst->next) {
        int force = 0;
        if(nready == 1 && s_global->options.force_send == 1) {
          force = 1; // force-send the last item in the list
        }

        /* Send the operations */
        inst->db->sendPreparedTransactions(force);
        DEBUG_PRINT("Sent %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);

        /* Give the instance to the poll thread */
        sentqueue->produce(inst);

        nsent += 1;
        nready -= 1;
      }

      stats.batches += 1;
      stats.sent_operations += nsent;
      if(c_wait == ETIMEDOUT) {
        stats.timeout_races += 1;
      }

      pollgroup->wakeup();

      timeout_msec = timeout_min;  /* we are now "busy" */
    }
  }
}


int S::Connection::get_operations_from_queue(NdbInstance **readylist,
                                             Queue<NdbInstance> *q) {
  int n = 0;
  NdbInstance *inst;
  while((inst = q->consume()) != NULL) {
    assert(inst->db);
    inst->next = *readylist;
    *readylist = inst;
    n++;
  }
  return n;
}


void * S::Connection::run_ndb_poll_thread() {
  /* Set thread identity */
  thread_identifier tid;
  tid.pipeline = 0;
  snprintf(tid.name, THD_ID_NAME_LEN,
           "cl%d.conn%d.poll", cluster.cluster_id, id);
  set_thread_id(&tid);

  DEBUG_ENTER();

  NdbInstance *inst;
  int wait_timeout_millisec = 5000;
  int pct_ready;
  int in_flight = 0;

  while(1) {
    if(in_flight == 0 && sentqueue->is_aborted()) {
      return 0;
    }

    int n_added = 0;
    /* Add new NDBs to the poll group */
    while((inst = sentqueue->consume()) != 0) {
      assert(inst->db);
      inst->next = 0;
      DEBUG_PRINT(" ** adding %d.%d to wait group ** ",
                  inst->wqitem->pipeline->id, inst->wqitem->id);
      if(! pollgroup->push(inst->db)) {
        n_added++;
        in_flight++;
      }
    }

    /* What's the minimum number of ready Ndb's to wake up for? */
    pct_ready = (n_added > 4) ? 25 : 1;
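    /* A larger pct_ready (25 vs. 1) makes the wait group hold out for more
       of a big batch to complete before waking, rather than waking on the
       first ready Ndb. */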

    /* Wait until something is ready to poll */
    int nwaiting = pollgroup->wait(wait_timeout_millisec, pct_ready);

    /* Poll the ones that are ready */
    if(nwaiting > 0) {
      for(int i = 0; i < nwaiting ; i++) {
        in_flight--;
        assert(in_flight >= 0);
        Ndb *db = pollgroup->pop();
        inst = (NdbInstance *) db->getCustomData();
        DEBUG_PRINT("Polling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
        db->pollNdb(0, 1);

        if(inst->wqitem->base.reschedule) {
          DEBUG_PRINT("Rescheduling %d.%d", inst->wqitem->pipeline->id, inst->wqitem->id);
          inst->wqitem->base.reschedule = 0;
          reschedulequeue->produce(inst);  // Put it on the reschedule queue
          if(pthread_mutex_trylock( & sem.lock) == 0) {
            sem.counter++;
            pthread_cond_signal(& sem.not_zero);  // Ping the send thread
            pthread_mutex_unlock(& sem.lock);
          }
        }
        else {
          // Scheduler yielded. Notify memcached that the operation is complete.
          DEBUG_PRINT("item_io_complete for %d.%d",
                      inst->wqitem->pipeline->id, inst->wqitem->id);
          item_io_complete(inst->wqitem);
        }
      }
    }
  }
  return 0; /* not reached */
}


void * run_send_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_send_thread();
}

void * run_poll_thread(void *v) {
  S::Connection *c = (S::Connection *) v;
  return c->run_ndb_poll_thread();
}
