1 /*
2  Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3  reserved.
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License, version 2.0,
7  as published by the Free Software Foundation.
8 
9  This program is also distributed with certain software (including
10  but not limited to OpenSSL) that is licensed under separate terms,
11  as designated in a particular file or component or in included license
12  documentation.  The authors of MySQL hereby grant you an additional
13  permission to link the program and your derivative works with the
14  separately licensed software that they have included with MySQL.
15 
16  This program is distributed in the hope that it will be useful,
17  but WITHOUT ANY WARRANTY; without even the implied warranty of
18  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  GNU General Public License, version 2.0, for more details.
20 
21  You should have received a copy of the GNU General Public License
22  along with this program; if not, write to the Free Software
23  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24  02110-1301  USA
25  */
26 
27 /* configure defines */
28 #include <my_config.h>
29 
30 /* System headers */
31 /* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
32 #define __STDC_FORMAT_MACROS
33 #include <inttypes.h>
34 #include <stdio.h>
35 
36 /* Memcache headers */
37 #include "memcached/types.h"
38 #include <memcached/extension_loggers.h>
39 
40 /* NDB Memcache headers */
41 #include <NdbApi.hpp>
42 #include "Stockholm.h"
43 #include "ndb_worker.h"
44 
45 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
46 
47 class commit_thread_spec {
48 public:
commit_thread_spec(Scheduler_stockholm * s,int i)49   commit_thread_spec(Scheduler_stockholm *s, int i): sched(s), cluster_id(i) {};
50   Scheduler_stockholm *sched;
51   int cluster_id;
52 };
53 
54 extern "C" {
55   void * run_stockholm_commit_thread(void *);
56 }
57 
58 
init(int my_thread,const scheduler_options * options)59 void Scheduler_stockholm::init(int my_thread,
60                                const scheduler_options *options) {
61   const Configuration & conf = get_Configuration();
62 
63   /* How many NDB instances are needed per cluster? */
64   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
65     double total_ndb_objects = conf.figureInFlightTransactions(c);
66     cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
67 #ifdef DEBUG_OUTPUT
68     ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
69     DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
70                 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
71 #endif
72   }
73 
74   // Get the ConnQueryPlanSet and NDB instances for each cluster.
75   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
76     cluster[c].instances = (NdbInstance**)
77       calloc(cluster[c].nInst, sizeof(NdbInstance *));
78 
79     ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
80     Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
81 
82     cluster[c].plan_set = new ConnQueryPlanSet(conn, conf.nprefixes);
83     cluster[c].plan_set->buildSetForConfiguration(&conf, c);
84 
85     cluster[c].nextFree = NULL;
86     for(int i = 0; i < cluster[c].nInst ; i++) {
87       NdbInstance *inst = new NdbInstance(conn, 1);
88       cluster[c].instances[i] = inst;
89       inst->next = cluster[c].nextFree;
90       cluster[c].nextFree = inst;
91     }
92 
93     logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
94                 my_thread, cluster[c].nInst, c);
95   }
96 
97 
98   /* Hoard a transaction (an API connect record) for each Ndb object.  This
99      first call to startTransaction() will send TC_SEIZEREQ and wait for a
100      reply, but later at runtime startTransaction() should return immediately.
101      TODO? Start one tx on each data node.
102   */
103   QueryPlan *plan;
104 //  const KeyPrefix *default_prefix = conf.getDefaultPrefix();  // TODO: something
105   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
106     const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL);
107     if(prefix) {
108       NdbTransaction ** txlist;
109       txlist = ( NdbTransaction **) calloc(cluster[c].nInst, sizeof(NdbTransaction *));
110       // Open them all.
111       for(int i = 0 ; i < cluster[c].nInst ; i++) {
112         plan = cluster[c].plan_set->getPlanForPrefix(prefix);
113         txlist[i] = cluster[c].instances[i]->db->startTransaction();
114       }
115       // Close them all.
116       for(int i = 0 ; i < cluster[c].nInst ; i++) {
117         txlist[i]->close();
118       }
119       // Free the list.
120       free(txlist);
121     }
122   }
123 
124   /* Allocate and initialize a workqueue for each cluster.
125      The engine thread will add items to this queue, and the commit thread will
126      consume them.
127   */
128   for(unsigned int c = 0 ; c < conf.nclusters; c++) {
129     cluster[c].queue = (struct workqueue *) malloc(sizeof(struct workqueue));
130     workqueue_init(cluster[c].queue, 8192, 1);
131   }
132 }
133 
134 
attach_thread(thread_identifier * parent)135 void Scheduler_stockholm::attach_thread(thread_identifier * parent) {
136   pipeline = parent->pipeline;
137   const Configuration & conf = get_Configuration();
138 
139   logger->log(LOG_WARNING, 0, "Pipeline %d attached to Stockholm scheduler; "
140               "launching %d commit thread%s.\n", pipeline->id, conf.nclusters,
141               conf.nclusters == 1 ? "" : "s");
142 
143   for(unsigned int c = 0 ; c < conf.nclusters; c++) {
144     cluster[c].stats.cycles = 0;
145     cluster[c].stats.commit_thread_vtime = 0;
146 
147     // Launch the commit thread
148     commit_thread_spec * spec = new commit_thread_spec(this, c);
149     pthread_create(& cluster[c].commit_thread_id, NULL,
150                    run_stockholm_commit_thread, (void *) spec);
151   }
152 }
153 
154 
shutdown()155 void Scheduler_stockholm::shutdown() {
156   DEBUG_ENTER();
157   const Configuration & conf = get_Configuration();
158 
159   /* Shut down the workqueues */
160   for(unsigned int c = 0 ; c < conf.nclusters; c++)
161     workqueue_abort(cluster[c].queue);
162 
163   /* Close all of the Ndbs */
164   for(unsigned int c = 0 ; c < conf.nclusters; c++) {
165     for(int i = 0 ; i < cluster[c].nInst ; i++) {
166       delete cluster[c].instances[i];
167     }
168   }
169 }
170 
171 
~Scheduler_stockholm()172 Scheduler_stockholm::~Scheduler_stockholm() {
173   logger->log(LOG_WARNING, 0, "Shutdown completed.");
174 }
175 
176 
schedule(workitem * newitem)177 ENGINE_ERROR_CODE Scheduler_stockholm::schedule(workitem *newitem) {
178   NdbInstance *inst;
179   int c;
180   const Configuration & conf = get_Configuration();
181 
182   /* Fetch the config for its key prefix */
183   const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
184 
185   if(newitem->prefix_info.prefix_id) {
186     DEBUG_PRINT("prefix %d: \"%s\" Table: %s  Value Cols: %d",
187                 newitem->prefix_info.prefix_id, pfx->prefix,
188                 pfx->table->table_name, pfx->table->nvaluecols);
189   }
190 
191   /* From here on we will work mainly with the suffix part of the key. */
192   newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
193   if(newitem->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
194 
195   c = newitem->prefix_info.cluster_id;
196 
197   if (cluster[c].nextFree)
198   {
199     inst = cluster[c].nextFree;
200     cluster[c].nextFree = inst->next;
201   }
202   else
203   {
204     return ENGINE_TMPFAIL;
205   }
206 
207   inst->link_workitem(newitem);
208 
209   // Fetch the query plan for this prefix.
210   newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
211   if(! newitem->plan) return ENGINE_FAILED;
212 
213   // Build the NDB transaction
214   op_status_t op_status = worker_prepare_operation(newitem);
215   ENGINE_ERROR_CODE response_code;
216 
217   if(op_status == op_prepared) {
218      workqueue_add(cluster[c].queue, newitem); // place item on queue
219      response_code = ENGINE_EWOULDBLOCK;
220   } else {
221     /* Status is not op_prepared, but rather some error status */
222     response_code = newitem->status->status;
223   }
224 
225   return response_code;
226 }
227 
228 
close(NdbTransaction * tx,workitem * item)229 void Scheduler_stockholm::close(NdbTransaction *tx, workitem *item) {
230   tx->close();
231 }
232 
233 
release(workitem * item)234 void Scheduler_stockholm::release(workitem *item) {
235   DEBUG_ENTER();
236   NdbInstance* inst = item->ndb_instance;
237 
238   if(inst) {
239     inst->unlink_workitem(item);
240     int c = item->prefix_info.cluster_id;
241     inst->next = cluster[c].nextFree;
242     cluster[c].nextFree = inst;
243   }
244 }
245 
246 
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)247 void Scheduler_stockholm::add_stats(const char *stat_key,
248                                     ADD_STAT add_stat,
249                                     const void * cookie) {
250   char key[128];
251   char val[128];
252   int klen, vlen;
253   const Configuration & conf = get_Configuration();
254 
255   if(strncasecmp(stat_key, "reconf", 6) == 0) {
256     add_stat("Reconf", 6, "unsupported", 11, cookie);
257     return;
258   }
259 
260   for(unsigned int c = 0 ; c < conf.nclusters; c++) {
261     klen = sprintf(key, "pipeline_%d_cluster_%d_commit_cycles", pipeline->id, c);
262     vlen = sprintf(val, "%"PRIu64, cluster[c].stats.cycles);
263     add_stat(key, klen, val, vlen, cookie);
264 
265     klen = sprintf(key, "pipeline_%d_cluster_%d_commit_thread_time", pipeline->id, c);
266     vlen = sprintf(val, "%"PRIu64, cluster[c].stats.commit_thread_vtime);
267     add_stat(key, klen, val, vlen, cookie);
268   }
269 }
270 
271 
prepare(NdbTransaction * tx,NdbTransaction::ExecType execType,NdbAsynchCallback callback,workitem * item,prepare_flags flags)272 void Scheduler_stockholm::prepare(NdbTransaction * tx,
273                                   NdbTransaction::ExecType execType,
274                                   NdbAsynchCallback callback,
275                                   workitem * item, prepare_flags flags) {
276   tx->executeAsynchPrepare(execType, callback, (void *) item);
277   if(flags == RESCHEDULE) item->base.reschedule = 1;
278 }
279 
280 
281 #define STAT_INTERVAL 50
282 
run_stockholm_commit_thread(void * s)283 void * run_stockholm_commit_thread(void *s) {
284   commit_thread_spec *spec = (commit_thread_spec *) s;
285   spec->sched->run_ndb_commit_thread(spec->cluster_id);
286   delete spec;
287   return 0;
288 }
289 
290 /*
291   Stockholm version of the commit_thread.
292   Get an item off the workqueue, and call pollNdb() on that item.
293  */
run_ndb_commit_thread(int c)294 void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
295   workitem *item;
296   int polled;
297 
298   DEBUG_ENTER();
299 
300   while(1) {
301     /* Wait for something to appear on the queue */
302     item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
303 
304     if(item == NULL) break;  /* queue has been shut down and emptied */
305 
306     /* Send & poll for response; reschedule if needed */
307     do {
308       item->base.reschedule = 0;
309       polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
310     } while(item->base.reschedule || ! polled);
311 
312     DEBUG_ASSERT(polled == 1);  // i.e. not > 1
313 
314     /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
315        which will trigger the worker thread to release the Ndb instance. */
316     item_io_complete(item);
317 
318     if(! (cluster[c].stats.cycles++ % STAT_INTERVAL))
319       cluster[c].stats.commit_thread_vtime = get_thread_vtime();
320   }
321 
322   return NULL;
323 }
324 
325 
326