1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
3 reserved.
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License, version 2.0,
7 as published by the Free Software Foundation.
8
9 This program is also distributed with certain software (including
10 but not limited to OpenSSL) that is licensed under separate terms,
11 as designated in a particular file or component or in included license
12 documentation. The authors of MySQL hereby grant you an additional
13 permission to link the program and your derivative works with the
14 separately licensed software that they have included with MySQL.
15
16 This program is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License, version 2.0, for more details.
20
21 You should have received a copy of the GNU General Public License
22 along with this program; if not, write to the Free Software
23 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
24 02110-1301 USA
25 */
26
27 /* configure defines */
28 #include <my_config.h>
29
30 /* System headers */
31 #define __STDC_FORMAT_MACROS
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <assert.h>
36 #include <string.h>
37 #include <inttypes.h>
38 #include <ctype.h>
39 #include <arpa/inet.h>
40
41 /* Memcache headers */
42 #include "memcached/types.h"
43 #include <memcached/extension_loggers.h>
44
45 /* NDB headers */
46 #include "NdbApi.hpp"
47
48 /* NDB Memcache headers */
49 #include "atomics.h"
50 #include "ndbmemcache_global.h"
51 #include "hash_item_util.h"
52 #include "workitem.h"
53 #include "Configuration.h"
54 #include "DataTypeHandler.h"
55 #include "ExpireTime.h"
56 #include "ExternalValue.h"
57 #include "TabSeparatedValues.h"
58 #include "debug.h"
59 #include "Operation.h"
60 #include "NdbInstance.h"
61 #include "status_block.h"
62 #include "Operation.h"
63 #include "Scheduler.h"
64 #include "ndb_engine.h"
65 #include "hash_item_util.h"
66 #include "ndb_worker.h"
67 #include "ndb_error_logger.h"
68 #include "ndb_engine_errors.h"
69
/**********************************************************
  Scheduler::schedule()
    worker_prepare_operation(workitem *)
      WorkerStep1::do_op()
        NdbTransaction::executeAsynchPrepare() with ndb_async_callback
  ...
  ndb_async_callback
    * (workitem->next_step)()
************************************************************/
79
80 /* The first phase of any operation is implemented as a method which begins
81 an NDB async transaction and returns an op_status_t to the scheduler.
82 */
83
84 class WorkerStep1 {
85 public:
86 WorkerStep1(struct workitem *);
87 op_status_t do_append(); // begin an append/prepend operation
88 op_status_t do_read(); // begin a read operation
89 op_status_t do_write(); // begin a SET/ADD/REPLACE operation
90 op_status_t do_delete(); // begin a delete operation
91 op_status_t do_math(); // begin an INCR/DECR operation
92
93 private:
94 /* Private member variables */
95 workitem *wqitem;
96 NdbTransaction *tx;
97 QueryPlan * &plan;
98
99 /* Private methods*/
100 bool setKeyForReading(Operation &);
101 bool startTransaction(Operation &);
102 };
103
104
/* Whenever an NDB async operation completes, control returns to a
   callback function defined in executeAsynchPrepare().
   In case of common errors, the main callback closes the tx and yields.
   The incr callback has special-case error handling for increments.

   typedef void ndb_async_callback(int, NdbTransaction *, void *);
*/
112
/* Forward declarations of the completion callbacks handed to
   Scheduler::execute() by the functions in this file. */
ndb_async_callback callback_main;     // general purpose callback
ndb_async_callback callback_incr;     // special callback for incr/decr
ndb_async_callback callback_close;    // just call worker_close()
116
117
118 /*
119 The next step is a function that conforms to the worker_step signature.
120 It must either call yield() or reschedule(), and is also responsible for
121 closing the transaction. The signature is in ndb_worker.h:
122
123 FIXME: yield() and reschedule() no longer exist --- what must it do ?????
124
125 typedef void worker_step(NdbTransaction *, workitem *);
126 */
127
/* Forward declarations of the worker steps.  A step is stored in
   workitem->next_step and invoked by callback_main() after a successful
   execute. */
worker_step worker_close;             /* Close tx and yield scheduler */
worker_step worker_commit;            /* exec(Commit) if needed before close */
worker_step worker_append;            /* second half of APPEND/PREPEND: rewrite the value just read */
worker_step worker_check_read;        /* after a read on the external-value path */
worker_step worker_finalize_read;     /* next step after an ordinary read */
worker_step worker_finalize_write;    /* next step after a write or append */
134
135 /*****************************************************************************/
136
/* Misc utility functions */
/* Generate a new CAS value into the uint64_t supplied by the caller */
void worker_set_cas(ndb_pipeline *, uint64_t *);
/* Build the interpreted program used by the CAS update in do_write();
   defined outside this section of the file. */
int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
/* Defined outside this section of the file. */
void build_hash_item(workitem *, Operation &, ExpireTime &);

/* Extern pointers */
/* Logger used for warnings (see callback_incr) */
extern EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Prototype for ndb_allocate() from ndb_engine.c: */
ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
                               const void* cookie,
                               item **item,
                               const void* key,
                               const size_t nkey,
                               const size_t nbytes,
                               const int flags,
                               const rel_time_t exptime);
154
/* Return Status Descriptions.
   Each status_block pairs an engine return code (ENGINE_*) with a message.
   The code in this file points workitem->status at one of these blocks to
   report the outcome of an operation.
*/

/* Set when a transaction completes with no NDB error */
status_block status_block_generic_success =
  { ENGINE_SUCCESS , "Transaction succeeded" };

/* Set on NoDataFound for verbs other than REPLACE/APPEND/PREPEND */
status_block status_block_item_not_found =
  { ENGINE_KEY_ENOENT, "Item Not Found" };

/* Set for op_failed and for NDB errors with no more specific mapping */
status_block status_block_misc_error =
  { ENGINE_FAILED, "Transaction failed" };

/* Not referenced in this section of the file */
status_block status_block_memcache_error =
  { ENGINE_FAILED, "Cache level error" };

/* Set on NDB error 2010 (interpreted CAS check failed) */
status_block status_block_cas_mismatch =
  { ENGINE_KEY_EEXISTS, "CAS mismatch" };

/* Set on NDB error 630 */
status_block status_block_bad_add =
  { ENGINE_NOT_STORED, "Duplicate key on insert" };

/* Set on NoDataFound for REPLACE, APPEND and PREPEND */
status_block status_block_bad_replace =
  { ENGINE_NOT_STORED, "Tuple not found" };

/* Set on NDB error 897 */
status_block status_block_idx_insert =
  { ENGINE_NOT_STORED, "Cannot insert via unique index" };

/* Set for op_overflow */
status_block status_block_too_big =
  { ENGINE_E2BIG, "Value too large" };

/* Set on NDB error 827 */
status_block status_block_no_mem =
  { ENGINE_ENOMEM, "NDB out of data memory" };

/* Set on errors classified as NdbError::OverloadError */
status_block status_block_temp_failure =
  { ENGINE_TMPFAIL, "NDB Temporary Error" };

/* Set for op_not_supported */
status_block status_block_op_not_supported =
  { ENGINE_ENOTSUP, "Operation not supported" };

/* Set for op_bad_key */
status_block status_block_op_bad_key =
  { ENGINE_EINVAL, "Invalid Key" };
194
195
196 /* valgrind will complain that setting "* wqitem->cas = x" is an invalid
197 write of 8 bytes. But this is not a bug, just an artifact of the unorthodox
198 way memcached allocates the (optional) 8 byte CAS ID past the end of a
199 defined structure.
200 */
worker_set_cas(ndb_pipeline * p,uint64_t * cas)201 void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {
202 /* Be careful here -- atomic_int32_t might be a signed type.
203 Shitfting of signed types behaves differently. */
204 bool did_inc;
205 uint32_t cas_lo;
206 uint32_t & cas_hi = p->engine->cas_hi;
207 do {
208 cas_lo = p->engine->cas_lo;
209 did_inc = atomic_cmp_swap_int(& p->engine->cas_lo, cas_lo, cas_lo + 1);
210 } while(! did_inc);
211 *cas = uint64_t(cas_lo) | (uint64_t(cas_hi) << 32);
212 DEBUG_PRINT_DETAIL("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas);
213 }
214
215
216 /* worker_set_ext_flag():
217 Determine whether a workitem should take the special "external values" path.
218 Sets item->base.use_ext_val
219 */
worker_set_ext_flag(workitem * item)220 void worker_set_ext_flag(workitem *item) {
221 bool result = false;
222
223 if(item->plan->canHaveExternalValue()) {
224 switch(item->base.verb) {
225 /* INSERTS only need the special path if the value is large */
226 case OPERATION_ADD:
227 result = item->plan->shouldExternalizeValue(item->cache_item->nbytes);
228 break;
229
230 case OP_ARITHMETIC:
231 result = false;
232 break;
233
234 default:
235 result = true;
236 }
237 }
238 item->base.use_ext_val = result;
239 DEBUG_PRINT_DETAIL(" %d.%d: %s", item->pipeline->id, item->id, result ? "T" : "F");
240 }
241
242
243 /* worker_prepare_operation():
244 Called from the scheduler.
245 Returns op_prepared if Scheduler::execute() has been called on the item.
246 */
worker_prepare_operation(workitem * newitem)247 op_status_t worker_prepare_operation(workitem *newitem) {
248 WorkerStep1 worker(newitem);
249 op_status_t r;
250
251 worker_set_ext_flag(newitem);
252
253 /* Jump table */
254 switch(newitem->base.verb) {
255 case OP_READ:
256 r = worker.do_read();
257 break;
258
259 case OPERATION_APPEND:
260 case OPERATION_PREPEND:
261 r = worker.do_append();
262 break;
263
264 case OP_DELETE:
265 r = worker.do_delete();
266 break;
267
268 case OPERATION_SET:
269 case OPERATION_ADD:
270 case OPERATION_REPLACE:
271 case OPERATION_CAS:
272 r = worker.do_write();
273 break;
274
275 case OP_ARITHMETIC:
276 r = worker.do_math();
277 break;
278
279 default:
280 r = op_not_supported;
281 }
282
283 switch(r) {
284 case op_not_supported:
285 newitem->status = & status_block_op_not_supported;
286 break;
287
288 case op_failed:
289 newitem->status = & status_block_misc_error;
290 break;
291
292 case op_bad_key:
293 newitem->status = & status_block_op_bad_key;
294 break;
295
296 case op_overflow:
297 newitem->status = & status_block_too_big;
298 break;
299
300 case op_prepared:
301 break;
302 }
303
304 return r;
305 }
306
307
308 /***************** STEP ONE OPERATIONS ***************************************/
309
startTransaction(Operation & op)310 bool WorkerStep1::startTransaction(Operation & op) {
311 tx = op.startTransaction(wqitem->ndb_instance->db);
312 if(tx) {
313 return true;
314 }
315 log_ndb_error(wqitem->ndb_instance->db->getNdbError());
316 return false;
317 }
318
319
WorkerStep1(workitem * newitem)320 WorkerStep1::WorkerStep1(workitem *newitem) :
321 wqitem(newitem),
322 tx(0),
323 plan(newitem->plan)
324 {
325 /* Set cas_owner in workitem.
326 (Further refine the semantics of this. Does it depend on do_mc_read?)
327 */
328 newitem->base.cas_owner = (newitem->prefix_info.has_cas_col);
329 };
330
331
do_delete()332 op_status_t WorkerStep1::do_delete() {
333 DEBUG_ENTER_DETAIL();
334
335 if(wqitem->base.use_ext_val) {
336 return ExternalValue::do_delete(wqitem);
337 }
338
339 const NdbOperation *ndb_op = 0;
340 Operation op(plan, OP_DELETE);
341
342 op.key_buffer = wqitem->ndb_key_buffer;
343 const char *dbkey = workitem_get_key_suffix(wqitem);
344 if(! op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix)) {
345 return op_overflow;
346 }
347
348 if(! startTransaction(op))
349 return op_failed;
350
351 /* Here we could also support op.deleteTupleCAS(tx, & options)
352 but the protocol is ambiguous about whether this is allowed.
353 */
354 ndb_op = op.deleteTuple(tx);
355
356 /* Check for errors */
357 if(ndb_op == 0) {
358 const NdbError & err = tx->getNdbError();
359 if(err.status != NdbError::Success) {
360 log_ndb_error(err);
361 tx->close();
362 return op_failed;
363 }
364 }
365
366 /* Prepare for execution */
367 Scheduler::execute(tx, NdbTransaction::Commit, callback_main, wqitem, YIELD);
368 return op_prepared;
369 }
370
371
do_write()372 op_status_t WorkerStep1::do_write() {
373 DEBUG_PRINT_DETAIL("%s", workitem_get_operation(wqitem));
374
375 if(wqitem->base.use_ext_val) {
376 return ExternalValue::do_write(wqitem);
377 }
378
379 uint64_t cas_in = *wqitem->cas; // read old value
380 if(wqitem->base.cas_owner) {
381 worker_set_cas(wqitem->pipeline, wqitem->cas); // generate a new value
382 hash_item_set_cas(wqitem->cache_item, * wqitem->cas); // store it
383 }
384
385 const NdbOperation *ndb_op = 0;
386 Operation op(wqitem);
387 const char *dbkey = workitem_get_key_suffix(wqitem);
388 bool op_ok;
389
390 /* Set the key */
391 op_ok = op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);
392 if(! op_ok) {
393 return op_overflow;
394 }
395
396 /* Allocate and encode the buffer for the row */
397 workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer());
398 op.buffer = wqitem->row_buffer_1;
399
400 /* Set the row */
401 op.setNullBits();
402 op.setKeyFieldsInRow(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);
403
404 if(plan->spec->nvaluecols > 1) {
405 /* Multiple Value Columns */
406 TabSeparatedValues tsv(hash_item_get_data(wqitem->cache_item),
407 plan->spec->nvaluecols, wqitem->cache_item->nbytes);
408 int idx = 0;
409 do {
410 if(tsv.getLength()) {
411 op_ok = op.setColumn(COL_STORE_VALUE+idx, tsv.getPointer(), tsv.getLength());
412 if(! op_ok) {
413 return op_overflow;
414 }
415 }
416 else {
417 op.setColumnNull(COL_STORE_VALUE+idx);
418 }
419 idx++;
420 } while (tsv.advance());
421 }
422 else {
423 /* Just one value column */
424 op_ok = op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
425 wqitem->cache_item->nbytes);
426 if(! op_ok) {
427 return op_overflow;
428 }
429 }
430
431 if(wqitem->base.cas_owner) {
432 op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas); // the cas
433 }
434
435 if(wqitem->plan->dup_numbers) {
436 if(isdigit(* hash_item_get_data(wqitem->cache_item)) &&
437 wqitem->cache_item->nbytes < 32) { // Copy string representation
438 uint64_t number;
439 const int len = wqitem->cache_item->nbytes;
440 char value[32];
441 for(int i = 0 ; i < len ; i++)
442 value[i] = * (hash_item_get_data(wqitem->cache_item) + i);
443 value[len] = 0;
444 if(safe_strtoull(value, &number)) { // numeric: set the math column
445 DEBUG_PRINT_DETAIL(" dup_numbers -- %d", (int) number );
446 op.setColumnBigUnsigned(COL_STORE_MATH, number);
447 }
448 else { // non-numeric
449 DEBUG_PRINT_DETAIL(" dup_numbers but non-numeric: %.*s *** ", len, value);
450 op.setColumnNull(COL_STORE_MATH);
451 }
452 }
453 else op.setColumnNull(COL_STORE_MATH);
454 }
455
456 /* Set expire time */
457 rel_time_t exptime = hash_item_get_exptime(wqitem->cache_item);
458 if(exptime && wqitem->prefix_info.has_expire_col) {
459 time_t abs_expires =
460 wqitem->pipeline->engine->server.core->abstime(exptime);
461 op.setColumnInt(COL_STORE_EXPIRES, abs_expires);
462 }
463
464 /* Set flags */
465 if(wqitem->prefix_info.has_flags_col) {
466 uint32_t flags = hash_item_get_flags(wqitem->cache_item);
467 op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
468 }
469
470 /* Start the transaction */
471 if(! startTransaction(op))
472 return op_failed;
473
474 if(wqitem->base.verb == OPERATION_REPLACE) {
475 DEBUG_PRINT(" [REPLACE] \"%.*s\"", wqitem->base.nkey, wqitem->key);
476 ndb_op = op.updateTuple(tx);
477 }
478 else if(wqitem->base.verb == OPERATION_ADD) {
479 DEBUG_PRINT(" [ADD] \"%.*s\"", wqitem->base.nkey, wqitem->key);
480 ndb_op = op.insertTuple(tx);
481 }
482 else if(wqitem->base.verb == OPERATION_CAS) {
483 if(wqitem->base.cas_owner) {
484 /* NdbOperation.hpp says: "All data is copied out of the OperationOptions
485 structure (and any subtended structures) at operation definition time."
486 */
487 DEBUG_PRINT(" [CAS UPDATE:%llu] \"%.*s\"", cas_in, wqitem->base.nkey, wqitem->key);
488 const Uint32 program_size = 25;
489 Uint32 program[program_size];
490 NdbInterpretedCode cas_code(plan->table, program, program_size);
491 NdbOperation::OperationOptions options;
492 build_cas_routine(& cas_code, plan->cas_column_id, cas_in);
493 options.optionsPresent = NdbOperation::OperationOptions::OO_INTERPRETED;
494 options.interpretedCode = & cas_code;
495 ndb_op = op.updateTuple(tx, & options);
496 }
497 }
498 else if(wqitem->base.verb == OPERATION_SET) {
499 DEBUG_PRINT(" [SET] \"%.*s\"", wqitem->base.nkey, wqitem->key);
500 ndb_op = op.writeTuple(tx);
501 }
502
503 /* Error case; operation has not been built */
504 if(! ndb_op) {
505 log_ndb_error(tx->getNdbError());
506 DEBUG_PRINT("NDB operation failed. workitem %d.%d", wqitem->pipeline->id,
507 wqitem->id);
508 tx->close();
509 return op_failed;
510 }
511
512 wqitem->next_step = (void *) worker_finalize_write;
513 Scheduler::execute(tx, NdbTransaction::Commit, callback_main, wqitem, YIELD);
514 return op_prepared;
515 }
516
517
do_read()518 op_status_t WorkerStep1::do_read() {
519 DEBUG_ENTER_DETAIL();
520
521 Operation op(plan, OP_READ);
522 if(! setKeyForReading(op)) {
523 return op_overflow;
524 }
525
526 NdbOperation::LockMode lockmode;
527 NdbTransaction::ExecType commitflag;
528 if(plan->canUseCommittedRead()) {
529 lockmode = NdbOperation::LM_CommittedRead;
530 commitflag = NdbTransaction::Commit;
531 }
532 else {
533 lockmode = NdbOperation::LM_Read;
534 commitflag = NdbTransaction::NoCommit;
535 }
536
537 if(! op.readTuple(tx, lockmode)) {
538 log_ndb_error(tx->getNdbError());
539 tx->close();
540 return op_failed;
541 }
542
543 /* Save the workitem in the transaction and prepare for async execution */
544 wqitem->next_step = (void *)
545 (wqitem->base.use_ext_val ? worker_check_read : worker_finalize_read);
546 Scheduler::execute(tx, commitflag, callback_main, wqitem, YIELD);
547 return op_prepared;
548 }
549
550
do_append()551 op_status_t WorkerStep1::do_append() {
552 DEBUG_ENTER_DETAIL();
553
554 /* APPEND/PREPEND is currently not supported for tsv */
555 if(wqitem->plan->spec->nvaluecols > 1) {
556 return op_not_supported;
557 }
558 Operation op(plan, OP_READ);
559 if(! setKeyForReading(op)) {
560 return op_overflow;
561 }
562
563 /* Read with an exculsive lock */
564 if(! op.readTuple(tx, NdbOperation::LM_Exclusive)) {
565 log_ndb_error(tx->getNdbError());
566 tx->close();
567 return op_failed;
568 }
569
570 /* Prepare for async execution */
571 wqitem->next_step = (void *) worker_append;
572 Scheduler::execute(tx, NdbTransaction::NoCommit, callback_main, wqitem, YIELD);
573 return op_prepared;
574 }
575
576
setKeyForReading(Operation & op)577 bool WorkerStep1::setKeyForReading(Operation &op) {
578
579 /* Use the workitem's inline key buffer */
580 op.key_buffer = wqitem->ndb_key_buffer;
581
582 /* Allocate a new result buffer large enough for the result.
583 Add 2 bytes to hold potential \r\n in a no-copy result. */
584 workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer() + 2);
585 op.buffer = wqitem->row_buffer_1;
586
587 /* set the key */
588 op.clearKeyNullBits();
589 const char *dbkey = workitem_get_key_suffix(wqitem);
590 if(! op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix))
591 return false;
592
593 /* Start a transaction */
594 return startTransaction(op);
595 }
596
597
598
do_math()599 op_status_t WorkerStep1::do_math() {
600 DEBUG_PRINT_DETAIL("create: %d retries: %d",
601 wqitem->base.math_create, wqitem->base.retries);
602 worker_set_cas(wqitem->pipeline, wqitem->cas);
603
604 /*
605 Begin transaction
606 1. readTuple (LM_Exclusive)
607 2. if(create_flag)
608 insertTuple, setting value to initial_value - delta (AO_IgnoreError)
609 3. updateTuple (interpreted: add delta to value)
610 Execute (Commit)
611
612 Then look at the error codes from all 3 operations to see what happened:
613
614 read insert update response
615 --------------------------------------------------------------------------
616 626 0 0 row was created. return initial_value.
617 0 630 0 row existed. return fetched_value + delta.
618 x x 626 failure due to race with concurrent delete
619 */
620
621 const char *dbkey = workitem_get_key_suffix(wqitem);
622
623 /* This transaction involves up to three NdbOperations. */
624 const NdbOperation *ndbop1 = 0;
625 const NdbOperation *ndbop2 = 0;
626 const NdbOperation *ndbop3 = 0;
627
628 /* "Operation" is really just a header-only library for convenience and
629 safety. We use 2 of them here -- one for the read, the other for the
630 update and insert. This is only because they will make slightly different
631 use of records and buffers. Both will use the inline key buffer.
632 */
633 Operation op1(plan, OP_READ, wqitem->ndb_key_buffer);
634 Operation op2(wqitem); // insert
635 Operation op3(wqitem); // update
636
637 op1.readSelectedColumns();
638 op1.readColumn(COL_STORE_MATH);
639
640 if(! wqitem->base.retries) {
641 /* Allocate & populate row buffers for these operations:
642 We need one for the read and one for the insert. */
643 size_t needed = op1.requiredBuffer();
644 workitem_allocate_rowbuffer_1(wqitem, needed);
645 workitem_allocate_rowbuffer_2(wqitem, needed);
646 op1.buffer = wqitem->row_buffer_1;
647 op2.buffer = wqitem->row_buffer_2;
648 op3.buffer = wqitem->row_buffer_2;
649
650 /* The two items share a key buffer, so we encode the key just once */
651 op1.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);
652
653 /* The insert operation also needs the key written into the row */
654 op2.clearNullBits();
655 op2.setKeyFieldsInRow(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);
656
657 /* CAS */
658 if(wqitem->base.cas_owner) {
659 op1.readColumn(COL_STORE_CAS);
660 op2.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);
661 op3.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);
662 }
663 /* In "dup_numbers" mode, also null out the text version of the value */
664 if(wqitem->plan->dup_numbers) {
665 op2.setColumnNull(COL_STORE_VALUE);
666 op3.setColumnNull(COL_STORE_VALUE);
667 }
668 }
669
670 /* Use an op (either one) to start the transaction */
671 if(! startTransaction(op1))
672 return op_failed;
673
674 /* NdbOperation #1: READ */
675 {
676 ndbop1 = op1.readTuple(tx, NdbOperation::LM_Exclusive);
677 if(! ndbop1) {
678 log_ndb_error(tx->getNdbError());
679 tx->close();
680 return op_failed;
681 }
682 }
683
684 /* NdbOperation #2: INSERT (AO_IgnoreError) */
685 if(wqitem->base.math_create) {
686 /* Offset the initial value to compensate for the update */
687 uint64_t initial_value;
688 if(wqitem->base.math_incr)
689 initial_value = wqitem->math_value - wqitem->math_flags; // incr
690 else
691 initial_value = wqitem->math_value + wqitem->math_flags; // decr
692 op2.setColumnBigUnsigned(COL_STORE_MATH, initial_value);
693
694 /* If this insert gets an error, the transaction should continue. */
695 NdbOperation::OperationOptions options;
696 options.optionsPresent = NdbOperation::OperationOptions::OO_ABORTOPTION;
697 options.abortOption = NdbOperation::AO_IgnoreError;
698
699 ndbop2 = op2.insertTuple(tx, & options);
700 if(! ndbop2) {
701 log_ndb_error(tx->getNdbError());
702 tx->close();
703 return op_failed;
704 }
705 }
706
707 /* NdbOperation #3: Interpreted Update */
708 {
709 NdbOperation::OperationOptions options;
710 const Uint32 program_size = 32;
711 Uint32 program[program_size];
712 NdbInterpretedCode code(plan->table, program, program_size);
713
714 if(wqitem->base.math_incr) { // incr
715 code.add_val(plan->math_column_id, wqitem->math_flags);
716 code.interpret_exit_ok();
717 }
718 else { // decr
719 const Uint32 Rdel = 1, Rcol = 2, Rres = 3; // registers 1,2,3
720 const Uint32 SUB_ZERO = 0; // a label
721
722 code.load_const_u64(Rdel, wqitem->math_flags); // load R1, delta
723 code.read_attr (Rcol, plan->math_column_id); // load R2, math_col
724 code.branch_gt (Rdel, Rcol, SUB_ZERO); // if R1 > R2 goto SUB_ZERO
725 code.sub_reg (Rres, Rcol, Rdel); // let R3 = R2 - R1
726 code.write_attr (plan->math_column_id, Rres); // Store into column
727 code.interpret_exit_ok();
728 code.def_label (SUB_ZERO);
729 code.load_const_u64(Rres, 0); // Set to zero
730 code.write_attr (plan->math_column_id, Rres); // Store into column
731 code.interpret_exit_ok();
732 }
733
734 code.finalise();
735
736 options.optionsPresent = NdbOperation::OperationOptions::OO_INTERPRETED;
737 options.interpretedCode = & code;
738
739 ndbop3 = op3.updateTuple(tx, & options);
740 if(! ndbop3) {
741 log_ndb_error(tx->getNdbError());
742 tx->close();
743 return op_failed;
744 }
745 }
746
747 Scheduler::execute(tx,NdbTransaction::Commit, callback_incr, wqitem, YIELD);
748 return op_prepared;
749 }
750
751
752 /***************** NDB CALLBACKS *********************************************/
/* debug_workitem():
   Emit one DEBUG_PRINT line identifying a workitem: pipeline id, item id,
   table name, operation name, and key suffix.
*/
void debug_workitem(workitem *item) {
  DEBUG_PRINT("%d.%d %s %s %s",
              item->pipeline->id,
              item->id,
              item->plan->table->getName(),
              workitem_get_operation(item),
              workitem_get_key_suffix(item));
}
761
callback_main(int,NdbTransaction * tx,void * itemptr)762 void callback_main(int, NdbTransaction *tx, void *itemptr) {
763 workitem *wqitem = (workitem *) itemptr;
764
765 /************** Error handling ***********/
766 /* No Error */
767 if(tx->getNdbError().classification == NdbError::NoError) {
768 DEBUG_PRINT("Success.");
769 wqitem->status = & status_block_generic_success;
770 if(wqitem->next_step) {
771 /* Control moves forward to the next step of the operation */
772 worker_step * next_step = (worker_step *) wqitem->next_step;
773 wqitem->next_step = 0;
774 next_step(tx, wqitem);
775 return;
776 }
777 }
778 /* CAS mismatch; interpreted code aborted with interpret_exit_nok() */
779 else if(tx->getNdbError().code == 2010) {
780 DEBUG_PRINT("CAS mismatch.");
781 * wqitem->cas = 0ULL; // set cas=0 in the response. see note re. valgrind.
782 wqitem->status = & status_block_cas_mismatch;
783 }
784 /* No Data Found */
785 else if(tx->getNdbError().classification == NdbError::NoDataFound) {
786 /* Error code should be 626 */
787 DEBUG_PRINT("NoDataFound [%d].", tx->getNdbError().code);
788 if(wqitem->cas) * wqitem->cas = 0ULL; // see note re. valgrind
789 switch(wqitem->base.verb) {
790 case OPERATION_REPLACE:
791 case OPERATION_APPEND:
792 case OPERATION_PREPEND:
793 wqitem->status = & status_block_bad_replace;
794 break;
795 default:
796 wqitem->status = & status_block_item_not_found;
797 break;
798 }
799 }
800 /* Duplicate key on insert */
801 else if(tx->getNdbError().code == 630) {
802 DEBUG_PRINT("Duplicate key on insert.");
803 if(wqitem->cas) * wqitem->cas = 0ULL; // see note re. valgrind
804 wqitem->status = & status_block_bad_add;
805 }
806 /* Overload Error, e.g. 410 "REDO log files overloaded" */
807 else if(tx->getNdbError().classification == NdbError::OverloadError) {
808 log_ndb_error(tx->getNdbError());
809 wqitem->status = & status_block_temp_failure;
810 }
811 /* Attempt to insert via unique index access */
812 else if(tx->getNdbError().code == 897) {
813 wqitem->status = & status_block_idx_insert;
814 }
815 /* Out of memory */
816 else if(tx->getNdbError().code == 827) {
817 log_ndb_error(tx->getNdbError());
818 wqitem->status = & status_block_no_mem;
819 }
820 /* Some other error.
821 The get("dummy") in mtr's memcached_wait_for_ready.inc script will often
822 get a 241 or 284 error here.
823 */
824 else {
825 log_ndb_error(tx->getNdbError());
826 debug_workitem(wqitem);
827 wqitem->status = & status_block_misc_error;
828 }
829
830 worker_commit(tx, wqitem);
831 }
832
833
callback_incr(int result,NdbTransaction * tx,void * itemptr)834 void callback_incr(int result, NdbTransaction *tx, void *itemptr) {
835 workitem *wqitem = (workitem *) itemptr;
836 // ndb_pipeline * & pipeline = wqitem->pipeline;
837
838 /* read insert update cr_flag response
839 ------------------------------------------------------------------------
840 626 0 0 0 return NOT_FOUND.
841 626 0 0 1 row was created. return initial_value.
842 0 x 0 x row existed. return fetched_value + delta.
843 x x 626 x failure due to race with concurrent delete.
844 */
845
846 const NdbOperation *ndbop1, *ndbop2, *ndbop3;
847 int r_read = -1;
848 int r_insert = -1;
849 int r_update = -1;
850
851 ndbop1 = tx->getNextCompletedOperation(NULL);
852 r_read = ndbop1->getNdbError().code;
853
854 if(ndbop1) { /* ndbop1 is the read operation */
855 if(wqitem->base.math_create) {
856 ndbop2 = tx->getNextCompletedOperation(ndbop1); /* the insert */
857 r_insert = ndbop2->getNdbError().code;
858 }
859 else {
860 ndbop2 = ndbop1; /* no insert (create flag was not set) */
861 r_insert = 0;
862 }
863 if(ndbop2) {
864 ndbop3 = tx->getNextCompletedOperation(ndbop2); /* the update */
865 r_update = ndbop3->getNdbError().code;
866 }
867 }
868 DEBUG_PRINT_DETAIL("r_read: %d r_insert: %d r_update: %d create: %d",
869 r_read, r_insert, r_update, wqitem->base.math_create);
870
871 if(r_read == 626 && ! wqitem->base.math_create) {
872 /* row did not exist, and create flag was not set */
873 wqitem->status = & status_block_item_not_found;
874 }
875 else if(r_read == 0 && r_update == 0) {
876 /* row existed. return fetched_value +/- delta. */
877 Operation op(wqitem->plan, OP_READ);
878 op.buffer = wqitem->row_buffer_1;
879 uint64_t stored = op.getBigUnsignedValue(COL_STORE_MATH);
880 if(wqitem->base.math_incr) { // incr
881 wqitem->math_value = stored + wqitem->math_flags;
882 }
883 else { // decr
884 if(wqitem->math_flags > stored)
885 wqitem->math_value = 0; // underflow < 0 is not allowed
886 else
887 wqitem->math_value = stored - wqitem->math_flags;
888 }
889
890 wqitem->status = & status_block_generic_success;
891 }
892 else if(r_read == 626 && r_insert == 0 && r_update == 0) {
893 /* row was created. Return initial_value.
894 wqitem->math_value is already set to the initial_value :) */
895 wqitem->status = & status_block_generic_success;
896 }
897 else if(r_read == -1 || r_insert == -1 || r_update == -1) {
898 /* Total failure */
899 logger->log(LOG_WARNING, 0, "incr/decr: total failure.\n");
900 wqitem->status = & status_block_misc_error;
901 }
902 else if(r_update == 626) {
903 /* failure due to race with concurrent delete */
904 // TODO: design a test for this code. Does it require reschedule()?
905 if(wqitem->base.retries++ < 3) { // try again:
906 tx->close();
907 op_status_t r = worker_prepare_operation(wqitem);
908 if(r == op_prepared)
909 return; /* retry is in progress */
910 else
911 wqitem->status = & status_block_misc_error;
912 }
913 else {
914 logger->log(LOG_WARNING, 0, "incr/decr: giving up, too many retries.\n");
915 wqitem->status = & status_block_misc_error;
916 }
917 }
918
919 worker_close(tx, wqitem);
920 }
921
922
callback_close(int result,NdbTransaction * tx,void * itemptr)923 void callback_close(int result, NdbTransaction *tx, void *itemptr) {
924 if(result) log_ndb_error(tx->getNdbError());
925 workitem *wqitem = (workitem *) itemptr;
926 worker_close(tx, wqitem);
927 }
928
929
930 /***************** WORKER STEPS **********************************************/
931
worker_commit(NdbTransaction * tx,workitem * item)932 void worker_commit(NdbTransaction *tx, workitem *item) {
933 /* If the transaction has not been committed, we need to send an empty
934 execute call and commit it. Otherwise close() will block. */
935 if(tx->commitStatus() == NdbTransaction::Started) {
936 Scheduler::execute(tx, NdbTransaction::Commit, callback_close, item, RESCHEDULE);
937 }
938 else
939 worker_close(tx, item);
940 }
941
942
worker_close(NdbTransaction * tx,workitem * wqitem)943 void worker_close(NdbTransaction *tx, workitem *wqitem) {
944 DEBUG_PRINT_DETAIL("%d.%d", wqitem->pipeline->id, wqitem->id);
945 ndb_pipeline * & pipeline = wqitem->pipeline;
946
947 if(wqitem->ext_val)
948 delete wqitem->ext_val;
949
950 pipeline->scheduler->close(tx, wqitem);
951 }
952
953
worker_append(NdbTransaction * tx,workitem * item)954 void worker_append(NdbTransaction *tx, workitem *item) {
955 if(item->base.use_ext_val) {
956 ExternalValue::append_after_read(tx, item);
957 return;
958 }
959
960 DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);
961
962 /* Strings and lengths: */
963 char * current_val = 0;
964 size_t current_len = 0;
965 const char * affix_val = hash_item_get_data(item->cache_item);
966 const size_t affix_len = item->cache_item->nbytes;
967
968 /* worker_do_read() has already written the key into item->ndb_key_buffer.
969 The result is sitting in wqitem->row_buffer_1.
970 Read the value.
971 */
972 Operation readop(item->plan, OP_READ);
973 readop.buffer = item->row_buffer_1;
974 if(readop.nValues() != 1) {
975 return worker_close(tx, item);
976 }
977 readop.getStringValueNoCopy(COL_STORE_VALUE + 0, & current_val, & current_len);
978
979 /* Generate a new CAS */
980 worker_set_cas(item->pipeline, item->cas);
981 hash_item_set_cas(item->cache_item, * item->cas);
982
983 /* Prepare a write operation */
984 Operation op(item->plan, item->base.verb, item->ndb_key_buffer);
985 const NdbOperation *ndb_op = 0;
986
987 /* Allocate a buffer for the new value */
988 size_t max_len = op.requiredBuffer();
989 workitem_allocate_rowbuffer_2(item, max_len);
990 op.buffer = item->row_buffer_2;
991
992 /* Rewrite the value */
993 size_t total_len = affix_len + current_len;
994 if(total_len > max_len) total_len = max_len;
995 if(item->base.verb == OPERATION_APPEND) {
996 memcpy(current_val + current_len, affix_val, total_len - current_len);
997 }
998 else {
999 assert(item->base.verb == OPERATION_PREPEND);
1000 memmove(current_val + affix_len, current_val, current_len);
1001 memcpy(current_val, affix_val, affix_len);
1002 }
1003 * (current_val + total_len) = 0;
1004 DEBUG_PRINT_DETAIL("New value: %.*s%s", total_len < 100 ? total_len : 100,
1005 current_val, total_len > 100 ? " ..." : "");
1006
1007 /* Set the row */
1008 op.setNullBits();
1009 op.setKeyFieldsInRow(item->plan->spec->nkeycols,
1010 workitem_get_key_suffix(item), item->base.nsuffix);
1011 op.setColumn(COL_STORE_VALUE, current_val, total_len);
1012 if(item->prefix_info.has_cas_col)
1013 op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
1014 ndb_op = op.updateTuple(tx);
1015
1016 if(ndb_op) {
1017 // Inform the scheduler that this item must be re-polled
1018 item->next_step = (void *) worker_finalize_write;
1019 Scheduler::execute(tx, NdbTransaction::Commit, callback_main, item, RESCHEDULE);
1020 }
1021 else {
1022 /* Error case; operation has not been built */
1023 DEBUG_PRINT("NDB operation failed. workitem %d.%d", item->pipeline->id,
1024 item->id);
1025 worker_close(tx, item);
1026 }
1027 }
1028
1029
worker_check_read(NdbTransaction * tx,workitem * wqitem)1030 void worker_check_read(NdbTransaction *tx, workitem *wqitem) {
1031 Operation op(wqitem->plan, OP_READ);
1032 op.buffer = wqitem->row_buffer_1;
1033
1034 if(op.isNull(COL_STORE_EXT_SIZE)) {
1035 worker_finalize_read(tx, wqitem);
1036 }
1037 else {
1038 ExternalValue *ext_val = new ExternalValue(wqitem);
1039 ext_val->worker_read_external(op, tx);
1040 }
1041 }
1042
1043
delete_expired_item(workitem * wqitem,NdbTransaction * tx)1044 void delete_expired_item(workitem *wqitem, NdbTransaction *tx) {
1045 DEBUG_PRINT(" Deleting [%d.%d]", wqitem->pipeline->id, wqitem->id);
1046 Operation op(wqitem);
1047 op.deleteTuple(tx);
1048 wqitem->status = & status_block_item_not_found;
1049 Scheduler::execute(tx, NdbTransaction::Commit, callback_close, wqitem, RESCHEDULE);
1050 }
1051
1052
worker_finalize_read(NdbTransaction * tx,workitem * wqitem)1053 void worker_finalize_read(NdbTransaction *tx, workitem *wqitem) {
1054 ExpireTime exp_time(wqitem);
1055 Operation op(wqitem->plan, OP_READ);
1056 op.buffer = wqitem->row_buffer_1;
1057
1058 if(exp_time.stored_item_has_expired(op)) {
1059 delete_expired_item(wqitem, tx);
1060 return;
1061 }
1062
1063 if(wqitem->prefix_info.has_flags_col && ! op.isNull(COL_STORE_FLAGS))
1064 wqitem->math_flags = htonl(op.getIntValue(COL_STORE_FLAGS));
1065 else if(wqitem->plan->static_flags)
1066 wqitem->math_flags = htonl(wqitem->plan->static_flags);
1067 else
1068 wqitem->math_flags = 0;
1069
1070 if(wqitem->prefix_info.has_cas_col) {
1071 wqitem->cas = (uint64_t *) op.getPointer(COL_STORE_CAS);
1072 }
1073
1074 /* Try to send the value from the row_buffer without copying it. */
1075 if( (! wqitem->prefix_info.do_mc_read)
1076 && op.nValues() == 1
1077 && ! (op.isNull(COL_STORE_VALUE) && wqitem->plan->dup_numbers)
1078 && op.getStringValueNoCopy(COL_STORE_VALUE, & wqitem->value_ptr, & wqitem->value_size)
1079 && op.appendCRLF(COL_STORE_VALUE, wqitem->value_size))
1080 {
1081 /* The workitem's value_ptr and value_size were set above. */
1082 DEBUG_PRINT("%d.%d using no-copy buffer.", wqitem->pipeline->id, wqitem->id);
1083 wqitem->base.has_value = true;
1084 /* "cache_item == workitem" is a sort of code, required because memcached
1085 expects us to return a non-zero item. In ndb_release() we will look
1086 for this and use it to prevent double-freeing of the workitem. */
1087 wqitem->cache_item = (hash_item *) wqitem;
1088 }
1089 else {
1090 /* Copy the value into a new buffer */
1091 DEBUG_PRINT("%d.%d copying value.", wqitem->pipeline->id, wqitem->id);
1092 build_hash_item(wqitem, op, exp_time);
1093 }
1094
1095 worker_commit(tx, wqitem);
1096 }
1097
1098
worker_finalize_write(NdbTransaction * tx,workitem * wqitem)1099 void worker_finalize_write(NdbTransaction *tx, workitem *wqitem) {
1100 if(wqitem->prefix_info.do_mc_write) {
1101 /* If the write was successful, update the local cache */
1102 /* Possible bugs here:
1103 (1) store_item will store nbytes as length, which is wrong.
1104 (2) The CAS may be incorrect.
1105 Status as of Feb. 2013:
1106 Memcapable INCR/DECR/APPEND/PREPEND tests fail when
1107 local caching is enabled.
1108 */
1109 ndb_pipeline * & pipeline = wqitem->pipeline;
1110 struct default_engine * se;
1111 se = (struct default_engine *) pipeline->engine->m_default_engine;
1112 ENGINE_ERROR_CODE status;
1113
1114 status = store_item(se, wqitem->cache_item,
1115 hash_item_get_cas_ptr(wqitem->cache_item),
1116 OPERATION_SET, wqitem->cookie);
1117 if (status != ENGINE_SUCCESS) {
1118 wqitem->status = & status_block_memcache_error;
1119 }
1120 }
1121
1122 worker_close(tx, wqitem);
1123 }
1124
1125
1126 /*****************************************************************************/
1127
1128
1129
1130 /* Allocate a hash table item, populate it with the original key
1131 and the results from the read, then store it.
1132 */
build_hash_item(workitem * wqitem,Operation & op,ExpireTime & exp_time)1133 void build_hash_item(workitem *wqitem, Operation &op, ExpireTime & exp_time) {
1134 DEBUG_ENTER();
1135 ndb_pipeline * & pipeline = wqitem->pipeline;
1136 struct default_engine *se;
1137 se = (struct default_engine *) pipeline->engine->m_default_engine;
1138
1139 size_t nbytes = op.getStringifiedLength() + 2; /* 2 bytes for \r\n */
1140
1141 /* Allocate a hash item */
1142 /* item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie) */
1143 hash_item * item = item_alloc(se, wqitem->key, wqitem->base.nkey,
1144 wqitem->math_flags,
1145 exp_time.local_cache_expire_time,
1146 nbytes, wqitem->cookie);
1147
1148 if(item) {
1149 /* Now populate the item with the result */
1150 size_t ncopied = 0;
1151 memcpy(hash_item_get_key(item), wqitem->key, wqitem->base.nkey); // the key
1152 char * data_ptr = hash_item_get_data(item);
1153
1154 /* Maybe use the math column as the value */
1155 if( wqitem->plan->hasMathColumn()
1156 && (! op.isNull(COL_STORE_MATH))
1157 && ( (op.nValues() == 0)
1158 || (wqitem->plan->dup_numbers && op.isNull(COL_STORE_VALUE))
1159 )
1160 ) {
1161 ncopied = op.copyValue(COL_STORE_MATH, data_ptr);
1162 }
1163 else {
1164 /* Build a result containing each column */
1165 for(int i = 0 ; i < op.nValues() ; i++) {
1166 if(i) * (data_ptr + ncopied++) = '\t';
1167 ncopied += op.copyValue(COL_STORE_VALUE + i, data_ptr + ncopied);
1168 }
1169 }
1170
1171 /* pad the value with \r\n -- memcached expects it there. */
1172 * (data_ptr + ncopied) = '\r';
1173 * (data_ptr + ncopied + 1) = '\n';
1174 * (data_ptr + ncopied + 2) = '\0';
1175 DEBUG_PRINT("nbytes: %d ncopied: %d", nbytes, ncopied + 2);
1176
1177 /* Point to it in the workitem */
1178 wqitem->cache_item = item;
1179 wqitem->value_size = ncopied;
1180
1181 /* store it in the local cache? */
1182 // fixme: probably nbytes is wrong
1183 if(wqitem->prefix_info.do_mc_read) {
1184 uint64_t *cas = hash_item_get_cas_ptr(item);
1185 ENGINE_ERROR_CODE status;
1186 status = store_item(se, item, cas, OPERATION_SET, wqitem->cookie);
1187 if(status != ENGINE_SUCCESS)
1188 wqitem->status = & status_block_memcache_error;
1189 }
1190 }
1191 else {
1192 DEBUG_PRINT("Failure. Item: %p", item);
1193 wqitem->status = & status_block_memcache_error;
1194 }
1195 }
1196
1197
build_cas_routine(NdbInterpretedCode * r,int cas_col,uint64_t cas_val)1198 int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val) {
1199 const Uint32 R1 = 1; // a register
1200 const Uint32 R2 = 2; // a register
1201 const Uint32 MISMATCH = 0; // a branch label
1202
1203 DEBUG_PRINT("cas_col: %d, cas_val: %llu", cas_col, cas_val);
1204
1205 /* Branch on cas_value != cas_column */
1206 r->load_const_u64(R1, cas_val); // load the CAS into R1
1207 r->read_attr(R2, cas_col); // read the cas column into R2
1208 r->branch_ne(R1, R2, MISMATCH); // if(R1 != R2) goto MISMATCH
1209
1210 /* Here is the cas_value == cas_column branch: */
1211 r->interpret_exit_ok(); // allow operation to succeed
1212
1213 /* Here is the cas_value != cas_column branch: */
1214 r->def_label(MISMATCH);
1215 r->interpret_exit_nok(2010); // abort the operation
1216
1217 return r->finalise(); // resolve the label/branch
1218 }
1219
1220