/*
 Copyright (c) 2011, 2021, Oracle and/or its affiliates. All rights
 reserved.

 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License, version 2.0,
 as published by the Free Software Foundation.

 This program is also distributed with certain software (including
 but not limited to OpenSSL) that is licensed under separate terms,
 as designated in a particular file or component or in included license
 documentation.  The authors of MySQL hereby grant you an additional
 permission to link the program and your derivative works with the
 separately licensed software that they have included with MySQL.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 GNU General Public License, version 2.0, for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
 02110-1301  USA
 */

/* configure defines */
#include <my_config.h>

/* System headers */
#define __STDC_FORMAT_MACROS
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <inttypes.h>
#include <ctype.h>
#include <arpa/inet.h>

/* Memcache headers */
#include "memcached/types.h"
#include <memcached/extension_loggers.h>

/* NDB headers */
#include "NdbApi.hpp"

/* NDB Memcache headers */
#include "atomics.h"
#include "ndbmemcache_global.h"
#include "hash_item_util.h"
#include "workitem.h"
#include "Configuration.h"
#include "DataTypeHandler.h"
#include "ExpireTime.h"
#include "ExternalValue.h"
#include "TabSeparatedValues.h"
#include "debug.h"
#include "Operation.h"
#include "NdbInstance.h"
#include "status_block.h"
#include "Scheduler.h"
#include "ndb_engine.h"
#include "ndb_worker.h"
#include "ndb_error_logger.h"
#include "ndb_engine_errors.h"

/**********************************************************
  Scheduler::schedule()
    worker_prepare_operation(workitem *)
      WorkerStep1::do_op()
        NdbTransaction::executeAsynchPrepare() with ndb_async_callback
                ...
   ndb_async_callback
     * (workitem->next_step)()
************************************************************/

/*  The first phase of any operation is implemented as a method which begins
    an NDB async transaction and returns an op_status_t to the scheduler.
*/

class WorkerStep1 {
public:
  WorkerStep1(struct workitem *);
  op_status_t do_append();           // begin an append/prepend operation
  op_status_t do_read();             // begin a read operation
  op_status_t do_write();            // begin a SET/ADD/REPLACE operation
  op_status_t do_delete();           // begin a delete operation
  op_status_t do_math();             // begin an INCR/DECR operation

private:
  /* Private member variables */
  workitem *wqitem;
  NdbTransaction *tx;
  QueryPlan * &plan;

  /* Private methods */
  bool setKeyForReading(Operation &);
  bool startTransaction(Operation &);
};


/* Whenever an NDB async operation completes, control returns to a
   callback function defined in executeAsynchPrepare().
   In case of common errors, the main callback closes the tx and yields.
   The incr callback has special-case error handling for increments.

   typedef void ndb_async_callback(int, NdbTransaction *, void *);
*/
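
/* Illustrative only: a minimal callback of this shape, assuming it follows
   the same pattern as callback_main further down in this file, would record
   a status block and either hand the still-open transaction to the next
   worker_step or commit and close it:

      void example_callback(int, NdbTransaction *tx, void *itemptr) {
        workitem *item = (workitem *) itemptr;     // cookie given to
                                                   //  executeAsynchPrepare()
        if(tx->getNdbError().classification == NdbError::NoError)
          item->status = & status_block_generic_success;
        else
          item->status = & status_block_misc_error;
        worker_commit(tx, item);   // commit if still needed, then close
      }
*/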

ndb_async_callback callback_main;     // general purpose callback
ndb_async_callback callback_incr;     // special callback for incr/decr
ndb_async_callback callback_close;    // just call worker_close()


/*
   The next step is a function that conforms to the worker_step signature.
   It must either call yield() or reschedule(), and is also responsible for
   closing the transaction.  The signature is in ndb_worker.h:

   FIXME: yield() and reschedule() no longer exist --- what must it do ?????

   typedef void worker_step(NdbTransaction *, workitem *);
*/
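
/* Illustrative only (a hedged sketch, not part of the engine): whatever the
   exact yield mechanism now is, a worker_step must dispose of the open
   transaction, either by passing it to Scheduler::execute() with a further
   next_step, or by closing it through the pipeline's scheduler the way
   worker_close() below does:

      void example_step(NdbTransaction *tx, workitem *item) {
        item->status = & status_block_generic_success;  // record the outcome
        item->pipeline->scheduler->close(tx, item);     // close tx and yield
      }
*/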

worker_step worker_close;             /* Close tx and yield scheduler */
worker_step worker_commit;            /* exec(Commit) if needed before close */
worker_step worker_append;
worker_step worker_check_read;
worker_step worker_finalize_read;
worker_step worker_finalize_write;

/*****************************************************************************/

/* Misc utility functions */
void worker_set_cas(ndb_pipeline *, uint64_t *);
int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val);
void build_hash_item(workitem *, Operation &, ExpireTime &);

/* Extern pointers */
extern EXTENSION_LOGGER_DESCRIPTOR *logger;

/* Prototype for ndb_allocate() from ndb_engine.c: */
ENGINE_ERROR_CODE ndb_allocate(ENGINE_HANDLE* handle,
                               const void* cookie,
                               item **item,
                               const void* key,
                               const size_t nkey,
                               const size_t nbytes,
                               const int flags,
                               const rel_time_t exptime);

/* Return Status Descriptions */
status_block status_block_generic_success =
  { ENGINE_SUCCESS , "Transaction succeeded"          };

status_block status_block_item_not_found =
  { ENGINE_KEY_ENOENT,  "Item Not Found"              };

status_block status_block_misc_error =
  { ENGINE_FAILED, "Transaction failed"               };

status_block status_block_memcache_error =
  { ENGINE_FAILED, "Cache level error"                };

status_block status_block_cas_mismatch =
  { ENGINE_KEY_EEXISTS, "CAS mismatch"                };

status_block status_block_bad_add =
  { ENGINE_NOT_STORED, "Duplicate key on insert"      };

status_block status_block_bad_replace =
  { ENGINE_NOT_STORED, "Tuple not found"              };

status_block status_block_idx_insert =
  { ENGINE_NOT_STORED, "Cannot insert via unique index" };

status_block status_block_too_big =
  { ENGINE_E2BIG, "Value too large"                   };

status_block status_block_no_mem =
  { ENGINE_ENOMEM, "NDB out of data memory"           };

status_block status_block_temp_failure =
  { ENGINE_TMPFAIL, "NDB Temporary Error"             };

status_block status_block_op_not_supported =
  { ENGINE_ENOTSUP, "Operation not supported"         };

status_block status_block_op_bad_key =
  { ENGINE_EINVAL,  "Invalid Key"                     };


/*  valgrind will complain that setting "* wqitem->cas = x" is an invalid
    write of 8 bytes.  But this is not a bug, just an artifact of the unorthodox
    way memcached allocates the (optional) 8 byte CAS ID past the end of a
    defined structure.
*/
void worker_set_cas(ndb_pipeline *p, uint64_t *cas) {
  /* Be careful here --  atomic_int32_t might be a signed type.
     Shifting of signed types behaves differently. */
  bool did_inc;
  uint32_t cas_lo;
  uint32_t & cas_hi = p->engine->cas_hi;
  do {
    cas_lo = p->engine->cas_lo;
    did_inc = atomic_cmp_swap_int(& p->engine->cas_lo, cas_lo, cas_lo + 1);
  } while(! did_inc);
  *cas = uint64_t(cas_lo) | (uint64_t(cas_hi) << 32);
  DEBUG_PRINT_DETAIL("hi:%lx lo:%lx cas:%llx (%llu)", cas_hi, cas_lo, *cas, *cas);
}
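
/* Worked example of the composition above (illustrative numbers): with
   engine->cas_hi == 0x1 and the counter read into cas_lo as 0x2, the
   generated CAS is (0x1 << 32) | 0x2 == 0x100000002 == 4294967298.  The low
   word is the per-engine atomic counter; the high word comes from
   engine->cas_hi. */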


/* worker_set_ext_flag():
   Determine whether a workitem should take the special "external values" path.
   Sets item->base.use_ext_val
*/
void worker_set_ext_flag(workitem *item) {
  bool result = false;

  if(item->plan->canHaveExternalValue()) {
    switch(item->base.verb) {
      /* INSERTS only need the special path if the value is large */
      case OPERATION_ADD:
        result = item->plan->shouldExternalizeValue(item->cache_item->nbytes);
        break;

      case OP_ARITHMETIC:
        result = false;
        break;

      default:
        result = true;
    }
  }
  item->base.use_ext_val = result;
  DEBUG_PRINT_DETAIL(" %d.%d: %s", item->pipeline->id, item->id, result ? "T" : "F");
}


/* worker_prepare_operation():
   Called from the scheduler.
   Returns op_prepared if Scheduler::execute() has been called on the item.
*/
op_status_t worker_prepare_operation(workitem *newitem) {
  WorkerStep1 worker(newitem);
  op_status_t r;

  worker_set_ext_flag(newitem);

  /* Jump table */
  switch(newitem->base.verb) {
    case OP_READ:
      r = worker.do_read();
      break;

    case OPERATION_APPEND:
    case OPERATION_PREPEND:
      r = worker.do_append();
      break;

    case OP_DELETE:
      r = worker.do_delete();
      break;

    case OPERATION_SET:
    case OPERATION_ADD:
    case OPERATION_REPLACE:
    case OPERATION_CAS:
      r = worker.do_write();
      break;

    case OP_ARITHMETIC:
      r = worker.do_math();
      break;

    default:
      r = op_not_supported;
  }

  switch(r) {
    case op_not_supported:
      newitem->status = & status_block_op_not_supported;
      break;

    case op_failed:
      newitem->status = & status_block_misc_error;
      break;

    case op_bad_key:
      newitem->status = & status_block_op_bad_key;
      break;

    case op_overflow:
      newitem->status = & status_block_too_big;
      break;

    case op_prepared:
      break;
  }

  return r;
}


/***************** STEP ONE OPERATIONS ***************************************/

bool WorkerStep1::startTransaction(Operation & op) {
  tx = op.startTransaction(wqitem->ndb_instance->db);
  if(tx) {
    return true;
  }
  log_ndb_error(wqitem->ndb_instance->db->getNdbError());
  return false;
}


WorkerStep1::WorkerStep1(workitem *newitem) :
  wqitem(newitem),
  tx(0),
  plan(newitem->plan)
{
  /* Set cas_owner in workitem.
     (Further refine the semantics of this.  Does it depend on do_mc_read?)
  */
  newitem->base.cas_owner = (newitem->prefix_info.has_cas_col);
}


op_status_t WorkerStep1::do_delete() {
  DEBUG_ENTER_DETAIL();

  if(wqitem->base.use_ext_val) {
    return ExternalValue::do_delete(wqitem);
  }

  const NdbOperation *ndb_op = 0;
  Operation op(plan, OP_DELETE);

  op.key_buffer = wqitem->ndb_key_buffer;
  const char *dbkey = workitem_get_key_suffix(wqitem);
  if(! op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix)) {
    return op_overflow;
  }

  if(! startTransaction(op))
    return op_failed;

  /* Here we could also support op.deleteTupleCAS(tx, & options)
     but the protocol is ambiguous about whether this is allowed.
  */
  ndb_op = op.deleteTuple(tx);

  /* Check for errors */
  if(ndb_op == 0) {
    const NdbError & err = tx->getNdbError();
    if(err.status != NdbError::Success) {
      log_ndb_error(err);
      tx->close();
      return op_failed;
    }
  }

  /* Prepare for execution */
  Scheduler::execute(tx, NdbTransaction::Commit, callback_main, wqitem, YIELD);
  return op_prepared;
}


op_status_t WorkerStep1::do_write() {
  DEBUG_PRINT_DETAIL("%s", workitem_get_operation(wqitem));

  if(wqitem->base.use_ext_val) {
    return ExternalValue::do_write(wqitem);
  }

  uint64_t cas_in = *wqitem->cas;                  // read old value
  if(wqitem->base.cas_owner) {
    worker_set_cas(wqitem->pipeline, wqitem->cas);    // generate a new value
    hash_item_set_cas(wqitem->cache_item, * wqitem->cas); // store it
  }

  const NdbOperation *ndb_op = 0;
  Operation op(wqitem);
  const char *dbkey = workitem_get_key_suffix(wqitem);
  bool op_ok;

  /* Set the key */
  op_ok = op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);
  if(! op_ok) {
    return op_overflow;
  }

  /* Allocate and encode the buffer for the row */
  workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer());
  op.buffer = wqitem->row_buffer_1;

  /* Set the row */
  op.setNullBits();
  op.setKeyFieldsInRow(plan->spec->nkeycols,  dbkey, wqitem->base.nsuffix);

  if(plan->spec->nvaluecols > 1) {
    /* Multiple Value Columns */
    TabSeparatedValues tsv(hash_item_get_data(wqitem->cache_item),
                           plan->spec->nvaluecols, wqitem->cache_item->nbytes);
    int idx = 0;
    do {
      if(tsv.getLength()) {
        op_ok = op.setColumn(COL_STORE_VALUE+idx, tsv.getPointer(), tsv.getLength());
        if(! op_ok) {
          return op_overflow;
        }
      }
      else {
        op.setColumnNull(COL_STORE_VALUE+idx);
      }
      idx++;
    } while (tsv.advance());
  }
  else {
    /* Just one value column */
    op_ok = op.setColumn(COL_STORE_VALUE, hash_item_get_data(wqitem->cache_item),
                         wqitem->cache_item->nbytes);
    if(! op_ok) {
      return op_overflow;
    }
  }

  if(wqitem->base.cas_owner) {
    op.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);   // the cas
  }

  if(wqitem->plan->dup_numbers) {
    if(isdigit(* hash_item_get_data(wqitem->cache_item)) &&
       wqitem->cache_item->nbytes < 32) {      // Copy string representation
      uint64_t number;
      const int len = wqitem->cache_item->nbytes;
      char value[32];
      for(int i = 0 ; i  < len ; i++)
        value[i] = * (hash_item_get_data(wqitem->cache_item) + i);
      value[len] = 0;
      if(safe_strtoull(value, &number)) { // numeric: set the math column
        DEBUG_PRINT_DETAIL(" dup_numbers -- %d", (int) number );
        op.setColumnBigUnsigned(COL_STORE_MATH, number);
      }
      else {  // non-numeric
        DEBUG_PRINT_DETAIL(" dup_numbers but non-numeric: %.*s *** ", len, value);
        op.setColumnNull(COL_STORE_MATH);
      }
    }
    else op.setColumnNull(COL_STORE_MATH);
  }

  /* Set expire time */
  rel_time_t exptime = hash_item_get_exptime(wqitem->cache_item);
  if(exptime && wqitem->prefix_info.has_expire_col) {
    time_t abs_expires =
      wqitem->pipeline->engine->server.core->abstime(exptime);
    op.setColumnInt(COL_STORE_EXPIRES, abs_expires);
  }

  /* Set flags */
  if(wqitem->prefix_info.has_flags_col) {
    uint32_t flags = hash_item_get_flags(wqitem->cache_item);
    op.setColumnInt(COL_STORE_FLAGS, ntohl(flags));
  }

  /* Start the transaction */
  if(! startTransaction(op))
    return op_failed;

  if(wqitem->base.verb == OPERATION_REPLACE) {
    DEBUG_PRINT(" [REPLACE] \"%.*s\"", wqitem->base.nkey, wqitem->key);
    ndb_op = op.updateTuple(tx);
  }
  else if(wqitem->base.verb == OPERATION_ADD) {
    DEBUG_PRINT(" [ADD]     \"%.*s\"", wqitem->base.nkey, wqitem->key);
    ndb_op = op.insertTuple(tx);
  }
  else if(wqitem->base.verb == OPERATION_CAS) {
    if(wqitem->base.cas_owner) {
      /* NdbOperation.hpp says: "All data is copied out of the OperationOptions
       structure (and any subtended structures) at operation definition time."
       */
      DEBUG_PRINT(" [CAS UPDATE:%llu]     \"%.*s\"", cas_in, wqitem->base.nkey, wqitem->key);
      const Uint32 program_size = 25;
      Uint32 program[program_size];
      NdbInterpretedCode cas_code(plan->table, program, program_size);
      NdbOperation::OperationOptions options;
      build_cas_routine(& cas_code, plan->cas_column_id, cas_in);
      options.optionsPresent = NdbOperation::OperationOptions::OO_INTERPRETED;
      options.interpretedCode = & cas_code;
      ndb_op = op.updateTuple(tx, & options);
    }
  }
  else if(wqitem->base.verb == OPERATION_SET) {
    DEBUG_PRINT(" [SET]     \"%.*s\"", wqitem->base.nkey, wqitem->key);
    ndb_op = op.writeTuple(tx);
  }

  /* Error case; operation has not been built */
  if(! ndb_op) {
    log_ndb_error(tx->getNdbError());
    DEBUG_PRINT("NDB operation failed.  workitem %d.%d", wqitem->pipeline->id,
                wqitem->id);
    tx->close();
    return op_failed;
  }

  wqitem->next_step = (void *) worker_finalize_write;
  Scheduler::execute(tx, NdbTransaction::Commit, callback_main, wqitem, YIELD);
  return op_prepared;
}


op_status_t WorkerStep1::do_read() {
  DEBUG_ENTER_DETAIL();

  Operation op(plan, OP_READ);
  if(! setKeyForReading(op)) {
    return op_overflow;
  }

  NdbOperation::LockMode lockmode;
  NdbTransaction::ExecType commitflag;
  if(plan->canUseCommittedRead()) {
    lockmode = NdbOperation::LM_CommittedRead;
    commitflag = NdbTransaction::Commit;
  }
  else {
    lockmode = NdbOperation::LM_Read;
    commitflag = NdbTransaction::NoCommit;
  }

  if(! op.readTuple(tx, lockmode)) {
    log_ndb_error(tx->getNdbError());
    tx->close();
    return op_failed;
  }

  /* Save the workitem in the transaction and prepare for async execution */
  wqitem->next_step = (void *)
    (wqitem->base.use_ext_val ? worker_check_read : worker_finalize_read);
  Scheduler::execute(tx, commitflag, callback_main, wqitem, YIELD);
  return op_prepared;
}


op_status_t WorkerStep1::do_append() {
  DEBUG_ENTER_DETAIL();

  /* APPEND/PREPEND is currently not supported for tsv */
  if(wqitem->plan->spec->nvaluecols > 1) {
    return op_not_supported;
  }
  Operation op(plan, OP_READ);
  if(! setKeyForReading(op)) {
    return op_overflow;
  }

  /* Read with an exclusive lock */
  if(! op.readTuple(tx, NdbOperation::LM_Exclusive)) {
    log_ndb_error(tx->getNdbError());
    tx->close();
    return op_failed;
  }

  /* Prepare for async execution */
  wqitem->next_step = (void *) worker_append;
  Scheduler::execute(tx, NdbTransaction::NoCommit, callback_main, wqitem, YIELD);
  return op_prepared;
}


bool WorkerStep1::setKeyForReading(Operation &op) {

  /* Use the workitem's inline key buffer */
  op.key_buffer = wqitem->ndb_key_buffer;

  /*  Allocate a new result buffer large enough for the result.
   Add 2 bytes to hold potential \r\n in a no-copy result. */
  workitem_allocate_rowbuffer_1(wqitem, op.requiredBuffer() + 2);
  op.buffer = wqitem->row_buffer_1;

  /* set the key */
  op.clearKeyNullBits();
  const char *dbkey = workitem_get_key_suffix(wqitem);
  if(! op.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix))
    return false;

  /* Start a transaction */
  return startTransaction(op);
}


op_status_t WorkerStep1::do_math() {
  DEBUG_PRINT_DETAIL("create: %d   retries: %d",
                     wqitem->base.math_create, wqitem->base.retries);
  worker_set_cas(wqitem->pipeline, wqitem->cas);

  /*
   Begin transaction
   1. readTuple  (LM_Exclusive)
   2. if(create_flag)
      insertTuple, setting value to initial_value - delta (AO_IgnoreError)
   3. updateTuple (interpreted: add delta to value)
   Execute (Commit)

   Then look at the error codes from all 3 operations to see what happened:

   read  insert  update  response
   --------------------------------------------------------------------------
   626   0       0       row was created.  return initial_value.
   0     630     0       row existed.  return fetched_value + delta.
   x     x       626     failure due to race with concurrent delete
   */
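
  /* Worked example (illustrative numbers): for an incr of delta = 5 with the
     create flag set and an initial_value of 10, the insert below writes
     initial_value - delta = 10 - 5 = 5, and the interpreted update then adds
     delta back, so a newly created row ends up holding exactly the requested
     initial value, 10.  If the row already existed, the insert fails with 630
     (which is ignored) and only the update's add_val(delta) takes effect. */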

  const char *dbkey = workitem_get_key_suffix(wqitem);

  /* This transaction involves up to three NdbOperations. */
  const NdbOperation *ndbop1 = 0;
  const NdbOperation *ndbop2 = 0;
  const NdbOperation *ndbop3 = 0;

  /* "Operation" is really just a header-only library for convenience and
   safety.  We use 2 of them here -- one for the read, the other for the
   update and insert.  This is only because they will make slightly different
   use of records and buffers.   Both will use the inline key buffer.
   */
  Operation op1(plan, OP_READ, wqitem->ndb_key_buffer);
  Operation op2(wqitem);    // insert
  Operation op3(wqitem);    // update

  op1.readSelectedColumns();
  op1.readColumn(COL_STORE_MATH);

  if(! wqitem->base.retries) {
    /* Allocate & populate row buffers for these operations:
     We need one for the read and one for the insert.  */
    size_t needed = op1.requiredBuffer();
    workitem_allocate_rowbuffer_1(wqitem, needed);
    workitem_allocate_rowbuffer_2(wqitem, needed);
    op1.buffer = wqitem->row_buffer_1;
    op2.buffer = wqitem->row_buffer_2;
    op3.buffer = wqitem->row_buffer_2;

    /* The two items share a key buffer, so we encode the key just once */
    op1.setKey(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);

    /* The insert operation also needs the key written into the row */
    op2.clearNullBits();
    op2.setKeyFieldsInRow(plan->spec->nkeycols, dbkey, wqitem->base.nsuffix);

    /* CAS */
    if(wqitem->base.cas_owner) {
      op1.readColumn(COL_STORE_CAS);
      op2.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);
      op3.setColumnBigUnsigned(COL_STORE_CAS, * wqitem->cas);
    }
    /* In "dup_numbers" mode, also null out the text version of the value */
    if(wqitem->plan->dup_numbers) {
      op2.setColumnNull(COL_STORE_VALUE);
      op3.setColumnNull(COL_STORE_VALUE);
    }
  }

  /* Use an op (either one) to start the transaction */
  if(! startTransaction(op1))
    return op_failed;

  /* NdbOperation #1: READ */
  {
    ndbop1 = op1.readTuple(tx, NdbOperation::LM_Exclusive);
    if(! ndbop1) {
      log_ndb_error(tx->getNdbError());
      tx->close();
      return op_failed;
    }
  }

  /* NdbOperation #2: INSERT (AO_IgnoreError) */
  if(wqitem->base.math_create) {
    /* Offset the initial value to compensate for the update */
    uint64_t initial_value;
    if(wqitem->base.math_incr)
      initial_value = wqitem->math_value - wqitem->math_flags;  // incr
    else
      initial_value = wqitem->math_value + wqitem->math_flags;  // decr
    op2.setColumnBigUnsigned(COL_STORE_MATH, initial_value);

    /* If this insert gets an error, the transaction should continue. */
    NdbOperation::OperationOptions options;
    options.optionsPresent = NdbOperation::OperationOptions::OO_ABORTOPTION;
    options.abortOption = NdbOperation::AO_IgnoreError;

    ndbop2 = op2.insertTuple(tx, & options);
    if(! ndbop2) {
      log_ndb_error(tx->getNdbError());
      tx->close();
      return op_failed;
    }
  }

  /* NdbOperation #3: Interpreted Update */
  {
    NdbOperation::OperationOptions options;
    const Uint32 program_size = 32;
    Uint32 program[program_size];
    NdbInterpretedCode code(plan->table, program, program_size);

    if(wqitem->base.math_incr) {                                  // incr
      code.add_val(plan->math_column_id, wqitem->math_flags);
      code.interpret_exit_ok();
    }
    else {                                                        // decr
      const Uint32 Rdel = 1, Rcol = 2, Rres = 3;       // registers 1,2,3
      const Uint32 SUB_ZERO = 0;                       // a label

      code.load_const_u64(Rdel, wqitem->math_flags);   // load R1, delta
      code.read_attr     (Rcol, plan->math_column_id); // load R2, math_col
      code.branch_gt     (Rdel, Rcol, SUB_ZERO);       // if R1 > R2 goto SUB_ZERO
      code.sub_reg       (Rres, Rcol, Rdel);           // let R3 = R2 - R1
      code.write_attr    (plan->math_column_id, Rres); // Store into column
      code.interpret_exit_ok();
      code.def_label     (SUB_ZERO);
      code.load_const_u64(Rres, 0);                    // Set to zero
      code.write_attr    (plan->math_column_id, Rres); // Store into column
      code.interpret_exit_ok();
    }

    code.finalise();

    options.optionsPresent = NdbOperation::OperationOptions::OO_INTERPRETED;
    options.interpretedCode = & code;

    ndbop3 = op3.updateTuple(tx, & options);
    if(! ndbop3) {
      log_ndb_error(tx->getNdbError());
      tx->close();
      return op_failed;
    }
  }

  Scheduler::execute(tx, NdbTransaction::Commit, callback_incr, wqitem, YIELD);
  return op_prepared;
}


/***************** NDB CALLBACKS *********************************************/
void debug_workitem(workitem *item) {
  DEBUG_PRINT("%d.%d %s %s %s",
    item->pipeline->id,
    item->id,
    item->plan->table->getName(),
    workitem_get_operation(item),
    workitem_get_key_suffix(item));
}

void callback_main(int, NdbTransaction *tx, void *itemptr) {
  workitem *wqitem = (workitem *) itemptr;

  /************** Error handling ***********/
  /* No Error */
  if(tx->getNdbError().classification == NdbError::NoError) {
    DEBUG_PRINT("Success.");
    wqitem->status = & status_block_generic_success;
    if(wqitem->next_step) {
      /* Control moves forward to the next step of the operation */
      worker_step * next_step = (worker_step *) wqitem->next_step;
      wqitem->next_step = 0;
      next_step(tx, wqitem);
      return;
    }
  }
  /* CAS mismatch; interpreted code aborted with interpret_exit_nok() */
  else if(tx->getNdbError().code == 2010) {
    DEBUG_PRINT("CAS mismatch.");
    * wqitem->cas = 0ULL;  // set cas=0 in the response. see note re. valgrind.
    wqitem->status = & status_block_cas_mismatch;
  }
  /* No Data Found */
  else if(tx->getNdbError().classification == NdbError::NoDataFound) {
    /* Error code should be 626 */
    DEBUG_PRINT("NoDataFound [%d].", tx->getNdbError().code);
    if(wqitem->cas) * wqitem->cas = 0ULL;   // see note re. valgrind
    switch(wqitem->base.verb) {
      case OPERATION_REPLACE:
      case OPERATION_APPEND:
      case OPERATION_PREPEND:
        wqitem->status = & status_block_bad_replace;
        break;
      default:
        wqitem->status = & status_block_item_not_found;
        break;
    }
  }
  /* Duplicate key on insert */
  else if(tx->getNdbError().code == 630) {
    DEBUG_PRINT("Duplicate key on insert.");
    if(wqitem->cas) * wqitem->cas = 0ULL;   // see note re. valgrind
    wqitem->status = & status_block_bad_add;
  }
  /* Overload Error, e.g. 410 "REDO log files overloaded" */
  else if(tx->getNdbError().classification == NdbError::OverloadError) {
    log_ndb_error(tx->getNdbError());
    wqitem->status = & status_block_temp_failure;
  }
  /* Attempt to insert via unique index access */
  else if(tx->getNdbError().code == 897) {
    wqitem->status = & status_block_idx_insert;
  }
  /* Out of memory */
  else if(tx->getNdbError().code == 827) {
    log_ndb_error(tx->getNdbError());
    wqitem->status = & status_block_no_mem;
  }
  /* Some other error.
     The get("dummy") in mtr's memcached_wait_for_ready.inc script will often
     get a 241 or 284 error here.
  */
  else  {
    log_ndb_error(tx->getNdbError());
    debug_workitem(wqitem);
    wqitem->status = & status_block_misc_error;
  }

  worker_commit(tx, wqitem);
}


void callback_incr(int result, NdbTransaction *tx, void *itemptr) {
  workitem *wqitem = (workitem *) itemptr;
  // ndb_pipeline * & pipeline = wqitem->pipeline;

  /*  read  insert  update cr_flag response
   ------------------------------------------------------------------------
   626   0       0        0     return NOT_FOUND.
   626   0       0        1     row was created.  return initial_value.
   0     x       0        x     row existed.  return fetched_value + delta.
   x     x       626      x     failure due to race with concurrent delete.
   */

  const NdbOperation *ndbop1, *ndbop2, *ndbop3;
  int r_read = -1;
  int r_insert = -1;
  int r_update = -1;

  ndbop1 = tx->getNextCompletedOperation(NULL);

  if(ndbop1) {  /* ndbop1 is the read operation */
    r_read = ndbop1->getNdbError().code;
    if(wqitem->base.math_create) {
      ndbop2 = tx->getNextCompletedOperation(ndbop1);  /* the insert */
      r_insert = ndbop2->getNdbError().code;
    }
    else {
      ndbop2 = ndbop1;  /* no insert (create flag was not set) */
      r_insert = 0;
    }
    if(ndbop2) {
      ndbop3 = tx->getNextCompletedOperation(ndbop2);  /* the update */
      r_update = ndbop3->getNdbError().code;
    }
  }
  DEBUG_PRINT_DETAIL("r_read: %d   r_insert: %d   r_update: %d   create: %d",
              r_read, r_insert, r_update, wqitem->base.math_create);

  if(r_read == 626 && ! wqitem->base.math_create) {
    /* row did not exist, and create flag was not set */
    wqitem->status = & status_block_item_not_found;
  }
  else if(r_read == 0 && r_update == 0) {
    /* row existed.  return fetched_value +/- delta. */
    Operation op(wqitem->plan, OP_READ);
    op.buffer = wqitem->row_buffer_1;
    uint64_t stored = op.getBigUnsignedValue(COL_STORE_MATH);
    if(wqitem->base.math_incr) {              // incr
      wqitem->math_value = stored + wqitem->math_flags;
    }
    else {                                    // decr
      if(wqitem->math_flags > stored)
        wqitem->math_value = 0; // underflow < 0 is not allowed
      else
        wqitem->math_value = stored - wqitem->math_flags;
    }

    wqitem->status = & status_block_generic_success;
  }
  else if(r_read == 626 && r_insert == 0 && r_update == 0) {
    /* row was created.   Return initial_value.
     wqitem->math_value is already set to the initial_value :)  */
    wqitem->status = & status_block_generic_success;
  }
  else if(r_read == -1 || r_insert == -1 || r_update == -1) {
    /* Total failure */
    logger->log(LOG_WARNING, 0, "incr/decr: total failure.\n");
    wqitem->status = & status_block_misc_error;
  }
  else if(r_update == 626) {
    /*  failure due to race with concurrent delete */
    // TODO: design a test for this code.  Does it require reschedule()?
    if(wqitem->base.retries++ < 3) {       // try again:
      tx->close();
      op_status_t r = worker_prepare_operation(wqitem);
      if(r == op_prepared)
        return;  /* retry is in progress */
      else
        wqitem->status = & status_block_misc_error;
    }
    else {
      logger->log(LOG_WARNING, 0, "incr/decr: giving up, too many retries.\n");
      wqitem->status = & status_block_misc_error;
    }
  }

  worker_close(tx, wqitem);
}


void callback_close(int result, NdbTransaction *tx, void *itemptr) {
  if(result) log_ndb_error(tx->getNdbError());
  workitem *wqitem = (workitem *) itemptr;
  worker_close(tx, wqitem);
}


/***************** WORKER STEPS **********************************************/

void worker_commit(NdbTransaction *tx, workitem *item) {
  /* If the transaction has not been committed, we need to send an empty
     execute call and commit it.  Otherwise close() will block. */
  if(tx->commitStatus() == NdbTransaction::Started) {
    Scheduler::execute(tx, NdbTransaction::Commit, callback_close, item, RESCHEDULE);
  }
  else
    worker_close(tx, item);
}


void worker_close(NdbTransaction *tx, workitem *wqitem) {
  DEBUG_PRINT_DETAIL("%d.%d", wqitem->pipeline->id, wqitem->id);
  ndb_pipeline * & pipeline = wqitem->pipeline;

  if(wqitem->ext_val)
    delete wqitem->ext_val;

  pipeline->scheduler->close(tx, wqitem);
}


void worker_append(NdbTransaction *tx, workitem *item) {
  if(item->base.use_ext_val) {
    ExternalValue::append_after_read(tx, item);
    return;
  }

  DEBUG_PRINT("%d.%d", item->pipeline->id, item->id);

  /* Strings and lengths: */
  char * current_val = 0;
  size_t current_len = 0;
  const char * affix_val = hash_item_get_data(item->cache_item);
  const size_t affix_len = item->cache_item->nbytes;

  /* WorkerStep1::do_read() has already written the key into
     item->ndb_key_buffer.  The result is sitting in item->row_buffer_1.
     Read the value.
  */
  Operation readop(item->plan, OP_READ);
  readop.buffer = item->row_buffer_1;
  if(readop.nValues() != 1) {
    return worker_close(tx, item);
  }
  readop.getStringValueNoCopy(COL_STORE_VALUE + 0, & current_val, & current_len);

  /* Generate a new CAS */
  worker_set_cas(item->pipeline, item->cas);
  hash_item_set_cas(item->cache_item, * item->cas);

  /* Prepare a write operation */
  Operation op(item->plan, item->base.verb, item->ndb_key_buffer);
  const NdbOperation *ndb_op = 0;

  /* Allocate a buffer for the new value */
  size_t max_len = op.requiredBuffer();
  workitem_allocate_rowbuffer_2(item, max_len);
  op.buffer = item->row_buffer_2;

  /* Rewrite the value */
  size_t total_len = affix_len + current_len;
  if(total_len > max_len) total_len = max_len;
  if(item->base.verb == OPERATION_APPEND) {
    memcpy(current_val + current_len, affix_val, total_len - current_len);
  }
  else {
    assert(item->base.verb == OPERATION_PREPEND);
    memmove(current_val + affix_len, current_val, current_len);
    memcpy(current_val, affix_val, affix_len);
  }
  * (current_val + total_len) = 0;
  DEBUG_PRINT_DETAIL("New value: %.*s%s", total_len < 100 ? total_len : 100,
                     current_val, total_len > 100 ? " ..." : "");

  /* Set the row */
  op.setNullBits();
  op.setKeyFieldsInRow(item->plan->spec->nkeycols,
                       workitem_get_key_suffix(item), item->base.nsuffix);
  op.setColumn(COL_STORE_VALUE, current_val, total_len);
  if(item->prefix_info.has_cas_col)
    op.setColumnBigUnsigned(COL_STORE_CAS, * item->cas);
  ndb_op = op.updateTuple(tx);

  if(ndb_op) {
    // Inform the scheduler that this item must be re-polled
    item->next_step = (void *) worker_finalize_write;
    Scheduler::execute(tx, NdbTransaction::Commit, callback_main, item, RESCHEDULE);
  }
  else {
    /* Error case; operation has not been built */
    DEBUG_PRINT("NDB operation failed.  workitem %d.%d", item->pipeline->id,
                item->id);
    worker_close(tx, item);
  }
}


void worker_check_read(NdbTransaction *tx, workitem *wqitem) {
  Operation op(wqitem->plan, OP_READ);
  op.buffer = wqitem->row_buffer_1;

  if(op.isNull(COL_STORE_EXT_SIZE)) {
    worker_finalize_read(tx, wqitem);
  }
  else {
    ExternalValue *ext_val = new ExternalValue(wqitem);
    ext_val->worker_read_external(op, tx);
  }
}


void delete_expired_item(workitem *wqitem, NdbTransaction *tx) {
  DEBUG_PRINT(" Deleting [%d.%d]", wqitem->pipeline->id, wqitem->id);
  Operation op(wqitem);
  op.deleteTuple(tx);
  wqitem->status = & status_block_item_not_found;
  Scheduler::execute(tx, NdbTransaction::Commit, callback_close, wqitem, RESCHEDULE);
}


void worker_finalize_read(NdbTransaction *tx, workitem *wqitem) {
  ExpireTime exp_time(wqitem);
  Operation op(wqitem->plan, OP_READ);
  op.buffer = wqitem->row_buffer_1;

  if(exp_time.stored_item_has_expired(op)) {
    delete_expired_item(wqitem, tx);
    return;
  }

  if(wqitem->prefix_info.has_flags_col && ! op.isNull(COL_STORE_FLAGS))
    wqitem->math_flags = htonl(op.getIntValue(COL_STORE_FLAGS));
  else if(wqitem->plan->static_flags)
    wqitem->math_flags = htonl(wqitem->plan->static_flags);
  else
    wqitem->math_flags = 0;

  if(wqitem->prefix_info.has_cas_col) {
    wqitem->cas = (uint64_t *) op.getPointer(COL_STORE_CAS);
  }

  /* Try to send the value from the row_buffer without copying it. */
  if(    (! wqitem->prefix_info.do_mc_read)
     && op.nValues() == 1
     && ! (op.isNull(COL_STORE_VALUE) && wqitem->plan->dup_numbers)
     && op.getStringValueNoCopy(COL_STORE_VALUE, & wqitem->value_ptr, & wqitem->value_size)
     && op.appendCRLF(COL_STORE_VALUE, wqitem->value_size))
  {
    /* The workitem's value_ptr and value_size were set above. */
    DEBUG_PRINT("%d.%d using no-copy buffer.", wqitem->pipeline->id, wqitem->id);
    wqitem->base.has_value = true;
    /* "cache_item == workitem" is a sort of code, required because memcached
        expects us to return a non-zero item.  In ndb_release() we will look
        for this and use it to prevent double-freeing of the workitem.  */
    wqitem->cache_item = (hash_item *) wqitem;
  }
  else {
    /* Copy the value into a new buffer */
    DEBUG_PRINT("%d.%d copying value.", wqitem->pipeline->id, wqitem->id);
    build_hash_item(wqitem, op, exp_time);
  }

  worker_commit(tx, wqitem);
}


void worker_finalize_write(NdbTransaction *tx, workitem *wqitem) {
  if(wqitem->prefix_info.do_mc_write) {
    /* If the write was successful, update the local cache */
    /* Possible bugs here:
     (1) store_item will store nbytes as length, which is wrong.
     (2) The CAS may be incorrect.
     Status as of Feb. 2013:
        Memcapable INCR/DECR/APPEND/PREPEND tests fail when
        local caching is enabled.
    */
    ndb_pipeline * & pipeline = wqitem->pipeline;
    struct default_engine * se;
    se = (struct default_engine *) pipeline->engine->m_default_engine;
    ENGINE_ERROR_CODE status;

    status = store_item(se, wqitem->cache_item,
                   hash_item_get_cas_ptr(wqitem->cache_item),
                   OPERATION_SET, wqitem->cookie);
    if (status != ENGINE_SUCCESS) {
      wqitem->status = & status_block_memcache_error;
    }
  }

  worker_close(tx, wqitem);
}


/*****************************************************************************/



/*  Allocate a hash table item, populate it with the original key
    and the results from the read, then store it.
 */
void build_hash_item(workitem *wqitem, Operation &op, ExpireTime & exp_time) {
  DEBUG_ENTER();
  ndb_pipeline * & pipeline = wqitem->pipeline;
  struct default_engine *se;
  se = (struct default_engine *) pipeline->engine->m_default_engine;

  size_t nbytes = op.getStringifiedLength() + 2;  /* 2 bytes for \r\n */

  /* Allocate a hash item */
  /* item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie) */
  hash_item * item = item_alloc(se, wqitem->key, wqitem->base.nkey,
                                wqitem->math_flags,
                                exp_time.local_cache_expire_time,
                                nbytes, wqitem->cookie);

  if(item) {
    /* Now populate the item with the result */
    size_t ncopied = 0;
    memcpy(hash_item_get_key(item), wqitem->key, wqitem->base.nkey); // the key
    char * data_ptr = hash_item_get_data(item);

    /* Maybe use the math column as the value */
    if(    wqitem->plan->hasMathColumn()
        && (! op.isNull(COL_STORE_MATH))
        && ( (op.nValues() == 0)
             || (wqitem->plan->dup_numbers && op.isNull(COL_STORE_VALUE))
           )
       ) {
      ncopied = op.copyValue(COL_STORE_MATH, data_ptr);
    }
    else {
      /* Build a result containing each column */
      for(int i = 0 ; i < op.nValues() ; i++) {
        if(i) * (data_ptr + ncopied++) = '\t';
        ncopied += op.copyValue(COL_STORE_VALUE + i, data_ptr + ncopied);
      }
    }

    /* pad the value with \r\n -- memcached expects it there. */
    * (data_ptr + ncopied)     = '\r';
    * (data_ptr + ncopied + 1) = '\n';
    * (data_ptr + ncopied + 2) = '\0';
    DEBUG_PRINT("nbytes: %d   ncopied: %d", nbytes, ncopied + 2);

    /* Point to it in the workitem */
    wqitem->cache_item = item;
    wqitem->value_size = ncopied;

    /* store it in the local cache? */
    // fixme: probably nbytes is wrong
    if(wqitem->prefix_info.do_mc_read) {
      uint64_t *cas = hash_item_get_cas_ptr(item);
      ENGINE_ERROR_CODE status;
      status = store_item(se, item, cas, OPERATION_SET, wqitem->cookie);
      if(status != ENGINE_SUCCESS)
        wqitem->status = & status_block_memcache_error;
    }
  }
  else {
    DEBUG_PRINT("Failure.  Item: %p", item);
    wqitem->status = & status_block_memcache_error;
  }
}


int build_cas_routine(NdbInterpretedCode *r, int cas_col, uint64_t cas_val) {
  const Uint32 R1 = 1;  // a register
  const Uint32 R2 = 2;  // a register
  const Uint32 MISMATCH = 0;  // a branch label

  DEBUG_PRINT("cas_col: %d,  cas_val: %llu", cas_col, cas_val);

  /* Branch on cas_value != cas_column */
  r->load_const_u64(R1, cas_val);            // load the CAS into R1
  r->read_attr(R2, cas_col);                 // read the cas column into R2
  r->branch_ne(R1, R2, MISMATCH);            // if(R1 != R2) goto MISMATCH

  /* Here is the cas_value == cas_column branch: */
  r->interpret_exit_ok();                    // allow operation to succeed

  /* Here is the cas_value != cas_column branch: */
  r->def_label(MISMATCH);
  r->interpret_exit_nok(2010);               // abort the operation

  return r->finalise();                      // resolve the label/branch
}