1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3 reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24 02110-1301 USA
25 */
26
27 /* configure defines */
28 #include <my_config.h>
29
30 /* System headers */
31 /* C++ files must define __STDC_FORMAT_MACROS in order to get PRIu64 */
32 #define __STDC_FORMAT_MACROS
33 #include <inttypes.h>
34 #include <stdio.h>
35
36 /* Memcache headers */
37 #include "memcached/types.h"
38 #include <memcached/extension_loggers.h>
39
40 /* NDB Memcache headers */
41 #include <NdbApi.hpp>
42 #include "Stockholm.h"
43 #include "ndb_worker.h"
44
45 extern EXTENSION_LOGGER_DESCRIPTOR *logger;
46
47 class commit_thread_spec {
48 public:
commit_thread_spec(Scheduler_stockholm * s,int i)49 commit_thread_spec(Scheduler_stockholm *s, int i): sched(s), cluster_id(i) {};
50 Scheduler_stockholm *sched;
51 int cluster_id;
52 };
53
/* Start routine given to pthread_create(); declared with C linkage. */
extern "C" void * run_stockholm_commit_thread(void *);
57
58
init(int my_thread,const scheduler_options * options)59 void Scheduler_stockholm::init(int my_thread,
60 const scheduler_options *options) {
61 const Configuration & conf = get_Configuration();
62
63 /* How many NDB instances are needed per cluster? */
64 for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
65 double total_ndb_objects = conf.figureInFlightTransactions(c);
66 cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
67 #ifdef DEBUG_OUTPUT
68 ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
69 DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
70 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
71 #endif
72 }
73
74 // Get the ConnQueryPlanSet and NDB instances for each cluster.
75 for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
76 cluster[c].instances = (NdbInstance**)
77 calloc(cluster[c].nInst, sizeof(NdbInstance *));
78
79 ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
80 Ndb_cluster_connection *conn = pool->getPooledConnection(my_thread);
81
82 cluster[c].plan_set = new ConnQueryPlanSet(conn, conf.nprefixes);
83 cluster[c].plan_set->buildSetForConfiguration(&conf, c);
84
85 cluster[c].nextFree = NULL;
86 for(int i = 0; i < cluster[c].nInst ; i++) {
87 NdbInstance *inst = new NdbInstance(conn, 1);
88 cluster[c].instances[i] = inst;
89 inst->next = cluster[c].nextFree;
90 cluster[c].nextFree = inst;
91 }
92
93 logger->log(LOG_WARNING, 0, "Pipeline %d using %u Ndb instances for Cluster %u.\n",
94 my_thread, cluster[c].nInst, c);
95 }
96
97
98 /* Hoard a transaction (an API connect record) for each Ndb object. This
99 first call to startTransaction() will send TC_SEIZEREQ and wait for a
100 reply, but later at runtime startTransaction() should return immediately.
101 TODO? Start one tx on each data node.
102 */
103 QueryPlan *plan;
104 // const KeyPrefix *default_prefix = conf.getDefaultPrefix(); // TODO: something
105 for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
106 const KeyPrefix *prefix = conf.getNextPrefixForCluster(c, NULL);
107 if(prefix) {
108 NdbTransaction ** txlist;
109 txlist = ( NdbTransaction **) calloc(cluster[c].nInst, sizeof(NdbTransaction *));
110 // Open them all.
111 for(int i = 0 ; i < cluster[c].nInst ; i++) {
112 plan = cluster[c].plan_set->getPlanForPrefix(prefix);
113 txlist[i] = cluster[c].instances[i]->db->startTransaction();
114 }
115 // Close them all.
116 for(int i = 0 ; i < cluster[c].nInst ; i++) {
117 txlist[i]->close();
118 }
119 // Free the list.
120 free(txlist);
121 }
122 }
123
124 /* Allocate and initialize a workqueue for each cluster.
125 The engine thread will add items to this queue, and the commit thread will
126 consume them.
127 */
128 for(unsigned int c = 0 ; c < conf.nclusters; c++) {
129 cluster[c].queue = (struct workqueue *) malloc(sizeof(struct workqueue));
130 workqueue_init(cluster[c].queue, 8192, 1);
131 }
132 }
133
134
attach_thread(thread_identifier * parent)135 void Scheduler_stockholm::attach_thread(thread_identifier * parent) {
136 pipeline = parent->pipeline;
137 const Configuration & conf = get_Configuration();
138
139 logger->log(LOG_WARNING, 0, "Pipeline %d attached to Stockholm scheduler; "
140 "launching %d commit thread%s.\n", pipeline->id, conf.nclusters,
141 conf.nclusters == 1 ? "" : "s");
142
143 for(unsigned int c = 0 ; c < conf.nclusters; c++) {
144 cluster[c].stats.cycles = 0;
145 cluster[c].stats.commit_thread_vtime = 0;
146
147 // Launch the commit thread
148 commit_thread_spec * spec = new commit_thread_spec(this, c);
149 pthread_create(& cluster[c].commit_thread_id, NULL,
150 run_stockholm_commit_thread, (void *) spec);
151 }
152 }
153
154
shutdown()155 void Scheduler_stockholm::shutdown() {
156 DEBUG_ENTER();
157 const Configuration & conf = get_Configuration();
158
159 /* Shut down the workqueues */
160 for(unsigned int c = 0 ; c < conf.nclusters; c++)
161 workqueue_abort(cluster[c].queue);
162
163 /* Close all of the Ndbs */
164 for(unsigned int c = 0 ; c < conf.nclusters; c++) {
165 for(int i = 0 ; i < cluster[c].nInst ; i++) {
166 delete cluster[c].instances[i];
167 }
168 }
169 }
170
171
~Scheduler_stockholm()172 Scheduler_stockholm::~Scheduler_stockholm() {
173 logger->log(LOG_WARNING, 0, "Shutdown completed.");
174 }
175
176
schedule(workitem * newitem)177 ENGINE_ERROR_CODE Scheduler_stockholm::schedule(workitem *newitem) {
178 NdbInstance *inst;
179 int c;
180 const Configuration & conf = get_Configuration();
181
182 /* Fetch the config for its key prefix */
183 const KeyPrefix *pfx = conf.getPrefixByInfo(newitem->prefix_info);
184
185 if(newitem->prefix_info.prefix_id) {
186 DEBUG_PRINT("prefix %d: \"%s\" Table: %s Value Cols: %d",
187 newitem->prefix_info.prefix_id, pfx->prefix,
188 pfx->table->table_name, pfx->table->nvaluecols);
189 }
190
191 /* From here on we will work mainly with the suffix part of the key. */
192 newitem->base.nsuffix = newitem->base.nkey - pfx->prefix_len;
193 if(newitem->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
194
195 c = newitem->prefix_info.cluster_id;
196
197 if (cluster[c].nextFree)
198 {
199 inst = cluster[c].nextFree;
200 cluster[c].nextFree = inst->next;
201 }
202 else
203 {
204 return ENGINE_TMPFAIL;
205 }
206
207 inst->link_workitem(newitem);
208
209 // Fetch the query plan for this prefix.
210 newitem->plan = cluster[c].plan_set->getPlanForPrefix(pfx);
211 if(! newitem->plan) return ENGINE_FAILED;
212
213 // Build the NDB transaction
214 op_status_t op_status = worker_prepare_operation(newitem);
215 ENGINE_ERROR_CODE response_code;
216
217 if(op_status == op_prepared) {
218 workqueue_add(cluster[c].queue, newitem); // place item on queue
219 response_code = ENGINE_EWOULDBLOCK;
220 } else {
221 /* Status is not op_prepared, but rather some error status */
222 response_code = newitem->status->status;
223 }
224
225 return response_code;
226 }
227
228
close(NdbTransaction * tx,workitem * item)229 void Scheduler_stockholm::close(NdbTransaction *tx, workitem *item) {
230 tx->close();
231 }
232
233
release(workitem * item)234 void Scheduler_stockholm::release(workitem *item) {
235 DEBUG_ENTER();
236 NdbInstance* inst = item->ndb_instance;
237
238 if(inst) {
239 inst->unlink_workitem(item);
240 int c = item->prefix_info.cluster_id;
241 inst->next = cluster[c].nextFree;
242 cluster[c].nextFree = inst;
243 }
244 }
245
246
add_stats(const char * stat_key,ADD_STAT add_stat,const void * cookie)247 void Scheduler_stockholm::add_stats(const char *stat_key,
248 ADD_STAT add_stat,
249 const void * cookie) {
250 char key[128];
251 char val[128];
252 int klen, vlen;
253 const Configuration & conf = get_Configuration();
254
255 if(strncasecmp(stat_key, "reconf", 6) == 0) {
256 add_stat("Reconf", 6, "unsupported", 11, cookie);
257 return;
258 }
259
260 for(unsigned int c = 0 ; c < conf.nclusters; c++) {
261 klen = sprintf(key, "pipeline_%d_cluster_%d_commit_cycles", pipeline->id, c);
262 vlen = sprintf(val, "%"PRIu64, cluster[c].stats.cycles);
263 add_stat(key, klen, val, vlen, cookie);
264
265 klen = sprintf(key, "pipeline_%d_cluster_%d_commit_thread_time", pipeline->id, c);
266 vlen = sprintf(val, "%"PRIu64, cluster[c].stats.commit_thread_vtime);
267 add_stat(key, klen, val, vlen, cookie);
268 }
269 }
270
271
prepare(NdbTransaction * tx,NdbTransaction::ExecType execType,NdbAsynchCallback callback,workitem * item,prepare_flags flags)272 void Scheduler_stockholm::prepare(NdbTransaction * tx,
273 NdbTransaction::ExecType execType,
274 NdbAsynchCallback callback,
275 workitem * item, prepare_flags flags) {
276 tx->executeAsynchPrepare(execType, callback, (void *) item);
277 if(flags == RESCHEDULE) item->base.reschedule = 1;
278 }
279
280
281 #define STAT_INTERVAL 50
282
run_stockholm_commit_thread(void * s)283 void * run_stockholm_commit_thread(void *s) {
284 commit_thread_spec *spec = (commit_thread_spec *) s;
285 spec->sched->run_ndb_commit_thread(spec->cluster_id);
286 delete spec;
287 return 0;
288 }
289
290 /*
291 Stockholm version of the commit_thread.
292 Get an item off the workqueue, and call pollNdb() on that item.
293 */
run_ndb_commit_thread(int c)294 void * Scheduler_stockholm::run_ndb_commit_thread(int c) {
295 workitem *item;
296 int polled;
297
298 DEBUG_ENTER();
299
300 while(1) {
301 /* Wait for something to appear on the queue */
302 item = (workitem *) workqueue_consumer_wait(cluster[c].queue);
303
304 if(item == NULL) break; /* queue has been shut down and emptied */
305
306 /* Send & poll for response; reschedule if needed */
307 do {
308 item->base.reschedule = 0;
309 polled = item->ndb_instance->db->sendPollNdb(10, 1, 1);
310 } while(item->base.reschedule || ! polled);
311
312 DEBUG_ASSERT(polled == 1); // i.e. not > 1
313
314 /* Now that sendPollNdb() has returned, it is OK to notify_io_complete(),
315 which will trigger the worker thread to release the Ndb instance. */
316 item_io_complete(item);
317
318 if(! (cluster[c].stats.cycles++ % STAT_INTERVAL))
319 cluster[c].stats.commit_thread_vtime = get_thread_vtime();
320 }
321
322 return NULL;
323 }
324
325
326