1 /*
2    Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "Trix.hpp"
26 
27 #include <string.h>
28 #include <kernel_types.h>
29 #include <NdbOut.hpp>
30 
31 #include <signaldata/ReadNodesConf.hpp>
32 #include <signaldata/NodeFailRep.hpp>
33 #include <signaldata/DumpStateOrd.hpp>
34 #include <signaldata/GetTabInfo.hpp>
35 #include <signaldata/DictTabInfo.hpp>
36 #include <signaldata/CopyData.hpp>
37 #include <signaldata/BuildIndxImpl.hpp>
38 #include <signaldata/BuildFKImpl.hpp>
39 #include <signaldata/SumaImpl.hpp>
40 #include <signaldata/UtilPrepare.hpp>
41 #include <signaldata/UtilExecute.hpp>
42 #include <signaldata/UtilRelease.hpp>
43 #include <SectionReader.hpp>
44 #include <AttributeHeader.hpp>
45 #include <signaldata/TcKeyReq.hpp>
46 
47 #include <signaldata/DbinfoScan.hpp>
48 #include <signaldata/TransIdAI.hpp>
49 #include <signaldata/WaitGCP.hpp>
50 
51 #define JAM_FILE_ID 433
52 
53 
54 #define CONSTRAINT_VIOLATION 893
55 #define TUPLE_NOT_FOUND 626
56 #define FK_NO_PARENT_ROW_EXISTS 255
57 
58 static
59 bool
check_timeout(Uint32 errCode)60 check_timeout(Uint32 errCode)
61 {
62   switch(errCode){
63   case 266:
64     return true;
65   }
66   return false;
67 }
68 
69 #define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
70 
71 /**
72  *
73  */
Trix(Block_context & ctx)74 Trix::Trix(Block_context& ctx) :
75   SimulatedBlock(TRIX, ctx),
76   c_theNodes(c_theNodeRecPool),
77   c_masterNodeId(0),
78   c_masterTrixRef(0),
79   c_noNodesFailed(0),
80   c_noActiveNodes(0),
81   c_theSubscriptions(c_theSubscriptionRecPool)
82 {
83   BLOCK_CONSTRUCTOR(Trix);
84 
85   // Add received signals
86   addRecSignal(GSN_READ_CONFIG_REQ,  &Trix::execREAD_CONFIG_REQ);
87   addRecSignal(GSN_STTOR,  &Trix::execSTTOR);
88   addRecSignal(GSN_NDB_STTOR,  &Trix::execNDB_STTOR); // Forwarded from DICT
89   addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
90   addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
91   addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
92   addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
93   addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
94   addRecSignal(GSN_DBINFO_SCANREQ, &Trix::execDBINFO_SCANREQ);
95 
96   // Index build
97   addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Trix::execBUILD_INDX_IMPL_REQ);
98   // Dump testing
99   addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &Trix::execBUILD_INDX_IMPL_CONF);
100   addRecSignal(GSN_BUILD_INDX_IMPL_REF, &Trix::execBUILD_INDX_IMPL_REF);
101 
102   addRecSignal(GSN_COPY_DATA_IMPL_REQ, &Trix::execCOPY_DATA_IMPL_REQ);
103   addRecSignal(GSN_BUILD_FK_IMPL_REQ, &Trix::execBUILD_FK_IMPL_REQ);
104 
105   addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
106   addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
107   addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
108   addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
109   addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
110   addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
111 
112 
113   // Suma signals
114   addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
115   addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
116   addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
117   addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
118   addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
119   addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
120   addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
121   addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
122 
123   addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
124   addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);
125 
126   // index stats
127   addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
128   addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
129   addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);
130 
131   // index stats sys tables
132   c_statGetMetaDone = false;
133 }
134 
135 /**
136  *
137  */
~Trix()138 Trix::~Trix()
139 {
140 }
141 
142 void
execREAD_CONFIG_REQ(Signal * signal)143 Trix::execREAD_CONFIG_REQ(Signal* signal)
144 {
145   jamEntry();
146 
147   const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
148 
149   Uint32 ref = req->senderRef;
150   Uint32 senderData = req->senderData;
151 
152   const ndb_mgm_configuration_iterator * p =
153     m_ctx.m_config.getOwnConfigIterator();
154   ndbrequire(p != 0);
155 
156   // Allocate pool sizes
157   c_theAttrOrderBufferPool.setSize(100);
158   c_theSubscriptionRecPool.setSize(100);
159   c_statOpPool.setSize(5);
160 
161   DLList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
162   SubscriptionRecPtr subptr;
163   while (subscriptions.seizeFirst(subptr) == true) {
164     new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
165   }
166   while (subscriptions.releaseFirst());
167 
168   ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
169   conf->senderRef = reference();
170   conf->senderData = senderData;
171   sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
172 	     ReadConfigConf::SignalLength, JBB);
173 }
174 
175 /**
176  *
177  */
execSTTOR(Signal * signal)178 void Trix::execSTTOR(Signal* signal)
179 {
180   jamEntry();
181 
182   //const Uint32 startphase   = signal->theData[1];
183   const Uint32 theSignalKey = signal->theData[6];
184 
185   signal->theData[0] = theSignalKey;
186   signal->theData[3] = 1;
187   signal->theData[4] = 255; // No more start phases from missra
188   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
189   return;
190 }//Trix::execSTTOR()
191 
192 /**
193  *
194  */
execNDB_STTOR(Signal * signal)195 void Trix::execNDB_STTOR(Signal* signal)
196 {
197   jamEntry();
198   BlockReference ndbcntrRef = signal->theData[0];
199   Uint16 startphase = signal->theData[2];      /* RESTART PHASE           */
200   Uint16 mynode = signal->theData[1];
201   //Uint16 restarttype = signal->theData[3];
202   //UintR configInfo1 = signal->theData[6];     /* CONFIGRATION INFO PART 1 */
203   //UintR configInfo2 = signal->theData[7];     /* CONFIGRATION INFO PART 2 */
204   switch (startphase) {
205   case 3:
206     jam();
207     /* SYMBOLIC START PHASE 4             */
208     /* ABSOLUTE PHASE 5                   */
209     /* REQUEST NODE IDENTITIES FROM DBDIH */
210     signal->theData[0] = calcTrixBlockRef(mynode);
211     sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
212     return;
213     break;
214   case 6:
215     break;
216   default:
217     break;
218   }
219 }
220 
221 /**
222  *
223  */
execREAD_NODESCONF(Signal * signal)224 void Trix::execREAD_NODESCONF(Signal* signal)
225 {
226   jamEntry();
227 
228   ReadNodesConf * const  readNodes = (ReadNodesConf *)signal->getDataPtr();
229   //Uint32 noOfNodes   = readNodes->noOfNodes;
230   NodeRecPtr nodeRecPtr;
231 
232   c_masterNodeId = readNodes->masterNodeId;
233   c_masterTrixRef = RNIL;
234   c_noNodesFailed = 0;
235 
236   for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
237     jam();
238     if(NdbNodeBitmask::get(readNodes->allNodes, i)) {
239       // Node is defined
240       jam();
241       ndbrequire(c_theNodes.getPool().seizeId(nodeRecPtr, i));
242       c_theNodes.addFirst(nodeRecPtr);
243       nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
244       if (i == c_masterNodeId) {
245         c_masterTrixRef = nodeRecPtr.p->trixRef;
246       }
247       if(NdbNodeBitmask::get(readNodes->inactiveNodes, i)){
248         // Node is not active
249 	jam();
250 	/**-----------------------------------------------------------------
251 	 * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
252 	 * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
253 	 * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
254 	 * NOT ALL NODES ARE ALIVE.
255 	 *------------------------------------------------------------------*/
256 	arrGuard(c_noNodesFailed, MAX_NDB_NODES);
257 	nodeRecPtr.p->alive = false;
258 	c_noNodesFailed++;
259 	c_blockState = Trix::NODE_FAILURE;
260       }
261       else {
262         // Node is active
263         jam();
264         c_noActiveNodes++;
265         nodeRecPtr.p->alive = true;
266       }
267     }
268   }
269   if (c_noNodesFailed == 0) {
270     c_blockState = Trix::STARTED;
271   }
272 }
273 
274 /**
275  *
276  */
execREAD_NODESREF(Signal * signal)277 void Trix::execREAD_NODESREF(Signal* signal)
278 {
279   // NYI
280 }
281 
282 /**
283  *
284  */
execNODE_FAILREP(Signal * signal)285 void Trix::execNODE_FAILREP(Signal* signal)
286 {
287   jamEntry();
288   NodeFailRep * const  nodeFail = (NodeFailRep *) signal->getDataPtr();
289 
290   //Uint32 failureNr    = nodeFail->failNo;
291   //Uint32 numberNodes  = nodeFail->noOfNodes;
292   Uint32 masterNodeId = nodeFail->masterNodeId;
293 
294   NodeRecPtr nodeRecPtr;
295 
296   for(c_theNodes.first(nodeRecPtr);
297       nodeRecPtr.i != RNIL;
298       c_theNodes.next(nodeRecPtr)) {
299     if(NdbNodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
300       nodeRecPtr.p->alive = false;
301       c_noNodesFailed++;
302       c_noActiveNodes--;
303     }
304   }
305   if (c_masterNodeId != masterNodeId) {
306     c_masterNodeId = masterNodeId;
307     NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
308     c_masterTrixRef = nodeRec->trixRef;
309   }
310 }
311 
312 /**
313  *
314  */
execINCL_NODEREQ(Signal * signal)315 void Trix::execINCL_NODEREQ(Signal* signal)
316 {
317   jamEntry();
318   UintR node_id = signal->theData[1];
319   NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
320   nodeRec->alive = true;
321   c_noNodesFailed--;
322   c_noActiveNodes++;
323   nodeRec->trixRef = calcTrixBlockRef(node_id);
324   if (c_noNodesFailed == 0) {
325     c_blockState = Trix::STARTED;
326   }
327 }
328 
329 // Debugging
330 void
execDUMP_STATE_ORD(Signal * signal)331 Trix::execDUMP_STATE_ORD(Signal* signal)
332 {
333   jamEntry();
334 
335   DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
336 
337   switch(dumpStateOrd->args[0]) {
338   case(300): {// ok
339     // index2 -T; index2 -I -n10000; index2 -c
340     // all dump 300 0 0 0 0 0 4 2
341     // select_count INDEX0000
342     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
343 
344     MEMCOPY_NO_WORDS(buildIndxReq,
345 		     signal->theData + 1,
346 		     BuildIndxImplReq::SignalLength);
347     buildIndxReq->senderRef = reference(); // return to me
348     buildIndxReq->parallelism = 10;
349     Uint32 indexColumns[1] = {1};
350     Uint32 keyColumns[1] = {0};
351     struct LinearSectionPtr ls_ptr[3];
352     ls_ptr[0].p = indexColumns;
353     ls_ptr[0].sz = 1;
354     ls_ptr[1].p = keyColumns;
355     ls_ptr[1].sz = 1;
356     sendSignal(reference(),
357 	       GSN_BUILD_INDX_IMPL_REQ,
358 	       signal,
359 	       BuildIndxImplReq::SignalLength,
360 	       JBB, ls_ptr, 2);
361     break;
362   }
363   case(301): { // ok
364     // index2 -T; index2 -I -n10000; index2 -c -p
365     // all dump 301 0 0 0 0 0 4 2
366     // select_count INDEX0000
367     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
368 
369     MEMCOPY_NO_WORDS(buildIndxReq,
370 		     signal->theData + 1,
371 		     BuildIndxImplReq::SignalLength);
372     buildIndxReq->senderRef = reference(); // return to me
373     buildIndxReq->parallelism = 10;
374     Uint32 indexColumns[2] = {0, 1};
375     Uint32 keyColumns[1] = {0};
376     struct LinearSectionPtr ls_ptr[3];
377     ls_ptr[0].p = indexColumns;
378     ls_ptr[0].sz = 2;
379     ls_ptr[1].p = keyColumns;
380     ls_ptr[1].sz = 1;
381     sendSignal(reference(),
382 	       GSN_BUILD_INDX_IMPL_REQ,
383 	       signal,
384 	       BuildIndxImplReq::SignalLength,
385 	       JBB, ls_ptr, 2);
386     break;
387   }
388   case(302): { // ok
389     // index -T; index -I -n1000; index -c -p
390     // all dump 302 0 0 0 0 0 4 2
391     // select_count PNUMINDEX0000
392     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
393 
394     MEMCOPY_NO_WORDS(buildIndxReq,
395 		     signal->theData + 1,
396 		     BuildIndxImplReq::SignalLength);
397     buildIndxReq->senderRef = reference(); // return to me
398     buildIndxReq->parallelism = 10;
399     Uint32 indexColumns[3] = {0, 3, 5};
400     Uint32 keyColumns[1] = {0};
401     struct LinearSectionPtr ls_ptr[3];
402     ls_ptr[0].p = indexColumns;
403     ls_ptr[0].sz = 3;
404     ls_ptr[1].p = keyColumns;
405     ls_ptr[1].sz = 1;
406     sendSignal(reference(),
407 	       GSN_BUILD_INDX_IMPL_REQ,
408 	       signal,
409 	       BuildIndxImplReq::SignalLength,
410 	       JBB, ls_ptr, 2);
411     break;
412   }
413   case(303): { // ok
414     // index -T -2; index -I -2 -n1000; index -c -p
415     // all dump 303 0 0 0 0 0 4 2
416     // select_count PNUMINDEX0000
417     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
418 
419     MEMCOPY_NO_WORDS(buildIndxReq,
420 		     signal->theData + 1,
421 		     BuildIndxImplReq::SignalLength);
422     buildIndxReq->senderRef = reference(); // return to me
423     buildIndxReq->parallelism = 10;
424     Uint32 indexColumns[3] = {0, 3, 5};
425     Uint32 keyColumns[2] = {0, 1};
426     struct LinearSectionPtr ls_ptr[3];
427     ls_ptr[0].p = indexColumns;
428     ls_ptr[0].sz = 3;
429     ls_ptr[1].p = keyColumns;
430     ls_ptr[1].sz = 2;
431     sendSignal(reference(),
432 	       GSN_BUILD_INDX_IMPL_REQ,
433 	       signal,
434 	       BuildIndxImplReq::SignalLength,
435 	       JBB, ls_ptr, 2);
436     break;
437   }
438   case(304): { // ok
439     // index -T -L; index -I -L -n1000; index -c -p
440     // all dump 304 0 0 0 0 0 4 2
441     // select_count PNUMINDEX0000
442     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
443 
444     MEMCOPY_NO_WORDS(buildIndxReq,
445 		     signal->theData + 1,
446 		     BuildIndxImplReq::SignalLength);
447     buildIndxReq->senderRef = reference(); // return to me
448     buildIndxReq->parallelism = 10;
449     Uint32 indexColumns[3] = {0, 3, 5};
450     Uint32 keyColumns[1] = {0};
451     struct LinearSectionPtr ls_ptr[3];
452     ls_ptr[0].p = indexColumns;
453     ls_ptr[0].sz = 3;
454     ls_ptr[1].p = keyColumns;
455     ls_ptr[1].sz = 1;
456     sendSignal(reference(),
457 	       GSN_BUILD_INDX_IMPL_REQ,
458 	       signal,
459 	       BuildIndxImplReq::SignalLength,
460 	       JBB, ls_ptr, 2);
461     break;
462   }
463   case(305): { // ok
464     // index -T -2 -L; index -I -2 -L -n1000; index -c -p
465     // all dump 305 0 0 0 0 0 4 2
466     // select_count PNUMINDEX0000
467     BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
468 
469     MEMCOPY_NO_WORDS(buildIndxReq,
470 		     signal->theData + 1,
471 		     BuildIndxImplReq::SignalLength);
472     buildIndxReq->senderRef = reference(); // return to me
473     buildIndxReq->parallelism = 10;
474     Uint32 indexColumns[3] = {0, 3, 5};
475     Uint32 keyColumns[2] = {0, 1};
476     struct LinearSectionPtr ls_ptr[3];
477     ls_ptr[0].p = indexColumns;
478     ls_ptr[0].sz = 3;
479     ls_ptr[1].p = keyColumns;
480     ls_ptr[1].sz = 2;
481     sendSignal(reference(),
482 	       GSN_BUILD_INDX_IMPL_REQ,
483 	       signal,
484 	       BuildIndxImplReq::SignalLength,
485 	       JBB, ls_ptr, 2);
486     break;
487   }
488   default: {
489     // Ignore
490   }
491   }
492 
493   if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
494   {
495     RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
496     RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
497     return;
498   }
499 
500   if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
501   {
502     RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
503     RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
504     return;
505   }
506 
507   if (signal->theData[0] == 8004)
508   {
509     infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
510               c_theSubscriptionRecPool.getSize(),
511               c_theSubscriptionRecPool.getNoOfFree());
512     return;
513   }
514 
515 }
516 
execDBINFO_SCANREQ(Signal * signal)517 void Trix::execDBINFO_SCANREQ(Signal *signal)
518 {
519   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
520   const Ndbinfo::ScanCursor* cursor =
521     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
522   Ndbinfo::Ratelimit rl;
523 
524   jamEntry();
525 
526   switch(req.tableId){
527   case Ndbinfo::POOLS_TABLEID:
528   {
529     Ndbinfo::pool_entry pools[] =
530     {
531       { "Attribute Order Buffer",
532         c_theAttrOrderBufferPool.getUsed(),
533         c_theAttrOrderBufferPool.getSize(),
534         c_theAttrOrderBufferPool.getEntrySize(),
535         c_theAttrOrderBufferPool.getUsedHi(),
536         { 0,0,0,0 }},
537       { "Subscription Record",
538         c_theSubscriptionRecPool.getUsed(),
539         c_theSubscriptionRecPool.getSize(),
540         c_theSubscriptionRecPool.getEntrySize(),
541         c_theSubscriptionRecPool.getUsedHi(),
542         { 0,0,0,0 }},
543       { NULL, 0,0,0,0,{0,0,0,0}}
544     };
545 
546     const size_t num_config_params =
547       sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
548     Uint32 pool = cursor->data[0];
549     BlockNumber bn = blockToMain(number());
550     while(pools[pool].poolname)
551     {
552       jam();
553       Ndbinfo::Row row(signal, req);
554       row.write_uint32(getOwnNodeId());
555       row.write_uint32(bn);           // block number
556       row.write_uint32(instance());   // block instance
557       row.write_string(pools[pool].poolname);
558       row.write_uint64(pools[pool].used);
559       row.write_uint64(pools[pool].total);
560       row.write_uint64(pools[pool].used_hi);
561       row.write_uint64(pools[pool].entry_size);
562       for (size_t i = 0; i < num_config_params; i++)
563         row.write_uint32(pools[pool].config_params[i]);
564       ndbinfo_send_row(signal, req, row, rl);
565       pool++;
566       if (rl.need_break(req))
567       {
568         jam();
569         ndbinfo_send_scan_break(signal, req, rl, pool);
570         return;
571       }
572     }
573     break;
574   }
575   default:
576     break;
577   }
578 
579   ndbinfo_send_scan_conf(signal, req, rl);
580 }
581 
582 // Build index
execBUILD_INDX_IMPL_REQ(Signal * signal)583 void Trix:: execBUILD_INDX_IMPL_REQ(Signal* signal)
584 {
585   jamEntry();
586   DBUG_ENTER("Trix:: execBUILD_INDX_IMPL_REQ");
587 
588   const BuildIndxImplReq
589     buildIndxReqData = *(const BuildIndxImplReq*)signal->getDataPtr(),
590     *buildIndxReq = &buildIndxReqData;
591 
592   // Seize a subscription record
593   SubscriptionRecPtr subRecPtr;
594   SubscriptionRecord* subRec;
595   SectionHandle handle(this, signal);
596 
597   if (ERROR_INSERTED_CLEAR(18000))
598   {
599     sendSignalWithDelay(reference(), GSN_BUILD_INDX_IMPL_REQ, signal, 1000,
600                         signal->getLength(), &handle);
601     DBUG_VOID_RETURN;
602   }
603 
604   if (!c_theSubscriptions.getPool().seizeId(subRecPtr, buildIndxReq->buildId)) {
605     jam();
606     // Failed to allocate subscription record
607     BuildIndxRef* buildIndxRef = (BuildIndxRef*)signal->getDataPtrSend();
608 
609     buildIndxRef->errorCode = BuildIndxRef::AllocationFailure;
610     releaseSections(handle);
611     sendSignal(buildIndxReq->senderRef, GSN_BUILD_INDX_IMPL_REF, signal,
612                BuildIndxRef::SignalLength, JBB);
613     DBUG_VOID_RETURN;
614   }
615   c_theSubscriptions.addFirst(subRecPtr);
616 
617   subRec = subRecPtr.p;
618   subRec->errorCode = BuildIndxRef::NoError;
619   subRec->userReference = buildIndxReq->senderRef;
620   subRec->connectionPtr = buildIndxReq->senderData;
621   subRec->schemaTransId = buildIndxReq->transId;
622   subRec->subscriptionId = buildIndxReq->buildId;
623   subRec->subscriptionKey = buildIndxReq->buildKey;
624   subRec->indexType = buildIndxReq->indexType;
625   subRec->sourceTableId = buildIndxReq->tableId;
626   subRec->targetTableId = buildIndxReq->indexId;
627   subRec->parallelism = buildIndxReq->parallelism;
628   subRec->expectedConf = 0;
629   subRec->subscriptionCreated = false;
630   subRec->pendingSubSyncContinueConf = false;
631   subRec->prepareId = RNIL;
632   subRec->requestType = INDEX_BUILD;
633   subRec->fragCount = 0;
634   subRec->fragId = ZNIL;
635   subRec->m_rows_processed = 0;
636   subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
637   subRec->m_gci = 0;
638   if (buildIndxReq->requestType & BuildIndxImplReq::RF_NO_DISK)
639   {
640     subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
641   }
642 
643   // Get column order segments
644   Uint32 noOfSections = handle.m_cnt;
645   if (noOfSections > 0) {
646     jam();
647     SegmentedSectionPtr ptr;
648     handle.getSection(ptr, BuildIndxImplReq::INDEX_COLUMNS);
649     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
650     subRec->noOfIndexColumns = ptr.sz;
651   }
652   if (noOfSections > 1) {
653     jam();
654     SegmentedSectionPtr ptr;
655     handle.getSection(ptr, BuildIndxImplReq::KEY_COLUMNS);
656     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
657     subRec->noOfKeyColumns = ptr.sz;
658   }
659 
660 #if 0
661   // Debugging
662   printf("Trix:: execBUILD_INDX_IMPL_REQ: Attribute order:\n");
663   subRec->attributeOrder.print(stdout);
664 #endif
665 
666   releaseSections(handle);
667   prepareInsertTransactions(signal, subRecPtr);
668   DBUG_VOID_RETURN;
669 }
670 
execBUILD_INDX_IMPL_CONF(Signal * signal)671 void Trix:: execBUILD_INDX_IMPL_CONF(Signal* signal)
672 {
673   printf("Trix:: execBUILD_INDX_IMPL_CONF\n");
674 }
675 
execBUILD_INDX_IMPL_REF(Signal * signal)676 void Trix:: execBUILD_INDX_IMPL_REF(Signal* signal)
677 {
678   printf("Trix:: execBUILD_INDX_IMPL_REF\n");
679 }
680 
execUTIL_PREPARE_CONF(Signal * signal)681 void Trix::execUTIL_PREPARE_CONF(Signal* signal)
682 {
683   jamEntry();
684   UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
685   SubscriptionRecPtr subRecPtr;
686   SubscriptionRecord* subRec;
687 
688   subRecPtr.i = utilPrepareConf->senderData;
689   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
690     printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
691     return;
692   }
693   if (subRec->requestType == STAT_UTIL)
694   {
695     statUtilPrepareConf(signal, subRec->m_statPtrI);
696     return;
697   }
698   subRecPtr.p = subRec;
699   subRec->prepareId = utilPrepareConf->prepareId;
700   setupSubscription(signal, subRecPtr);
701 }
702 
execUTIL_PREPARE_REF(Signal * signal)703 void Trix::execUTIL_PREPARE_REF(Signal* signal)
704 {
705   jamEntry();
706   UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
707   SubscriptionRecPtr subRecPtr;
708   SubscriptionRecord* subRec;
709 
710   subRecPtr.i = utilPrepareRef->senderData;
711   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
712     printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
713     return;
714   }
715   if (subRec->requestType == STAT_UTIL)
716   {
717     statUtilPrepareRef(signal, subRec->m_statPtrI);
718     return;
719   }
720   subRecPtr.p = subRec;
721   subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
722 
723   UtilReleaseConf* conf = (UtilReleaseConf*)signal->getDataPtrSend();
724   conf->senderData = subRecPtr.i;
725   execUTIL_RELEASE_CONF(signal);
726 }
727 
execUTIL_EXECUTE_CONF(Signal * signal)728 void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
729 {
730   jamEntry();
731   UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
732   SubscriptionRecPtr subRecPtr;
733   SubscriptionRecord* subRec;
734 
735   const Uint32 gci_hi = utilExecuteConf->gci_hi;
736   const Uint32 gci_lo = utilExecuteConf->gci_lo;
737   const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
738 
739   subRecPtr.i = utilExecuteConf->senderData;
740   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
741     printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
742     return;
743   }
744   if (subRec->requestType == STAT_UTIL)
745   {
746     statUtilExecuteConf(signal, subRec->m_statPtrI);
747     return;
748   }
749   subRecPtr.p = subRec;
750   subRec->expectedConf--;
751 
752   if (gci > subRecPtr.p->m_gci)
753   {
754     jam();
755     subRecPtr.p->m_gci = gci;
756   }
757 
758   checkParallelism(signal, subRec);
759   if (subRec->expectedConf == 0)
760   {
761     if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
762     {
763       jam();
764       wait_gcp(signal, subRecPtr);
765       return;
766     }
767     buildComplete(signal, subRecPtr);
768   }
769 }
770 
execUTIL_EXECUTE_REF(Signal * signal)771 void Trix::execUTIL_EXECUTE_REF(Signal* signal)
772 {
773   jamEntry();
774   UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
775   SubscriptionRecPtr subRecPtr;
776   SubscriptionRecord* subRec;
777 
778   subRecPtr.i = utilExecuteRef->senderData;
779   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
780     printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
781     return;
782   }
783   if (subRec->requestType == STAT_UTIL)
784   {
785     statUtilExecuteRef(signal, subRec->m_statPtrI);
786     return;
787   }
788   subRecPtr.p = subRec;
789   ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
790   if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
791   {
792     jam();
793     buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
794   }
795   else if (check_timeout(utilExecuteRef->TCErrorCode))
796   {
797     jam();
798     buildFailed(signal, subRecPtr, BuildIndxRef::DeadlockError);
799   }
800   else if (subRec->requestType == FK_BUILD &&
801            utilExecuteRef->TCErrorCode == TUPLE_NOT_FOUND)
802   {
803     jam();
804     buildFailed(signal, subRecPtr,
805                 (BuildIndxRef::ErrorCode)FK_NO_PARENT_ROW_EXISTS);
806   }
807   else
808   {
809     jam();
810     buildFailed(signal, subRecPtr,
811                 (BuildIndxRef::ErrorCode)utilExecuteRef->TCErrorCode);
812   }
813 }
814 
execSUB_CREATE_CONF(Signal * signal)815 void Trix::execSUB_CREATE_CONF(Signal* signal)
816 {
817   jamEntry();
818   DBUG_ENTER("Trix::execSUB_CREATE_CONF");
819   SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
820   SubscriptionRecPtr subRecPtr;
821   SubscriptionRecord* subRec;
822 
823   subRecPtr.i = subCreateConf->senderData;
824   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
825     printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
826     DBUG_VOID_RETURN;
827   }
828   subRec->subscriptionCreated = true;
829   subRecPtr.p = subRec;
830 
831   DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
832 		     subRecPtr.i, subRecPtr.p->subscriptionId,
833 		     subRecPtr.p->subscriptionKey));
834 
835   startTableScan(signal, subRecPtr);
836   DBUG_VOID_RETURN;
837 }
838 
execSUB_CREATE_REF(Signal * signal)839 void Trix::execSUB_CREATE_REF(Signal* signal)
840 {
841   jamEntry();
842   DBUG_ENTER("Trix::execSUB_CREATE_REF");
843 
844   SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
845   SubscriptionRecPtr subRecPtr;
846   SubscriptionRecord* subRec;
847 
848   subRecPtr.i = subCreateRef->senderData;
849   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL)
850   {
851     printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
852     return;
853   }
854   subRecPtr.p = subRec;
855   subRecPtr.p->errorCode = (BuildIndxRef::ErrorCode)subCreateRef->errorCode;
856 
857   UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
858   req->prepareId = subRecPtr.p->prepareId;
859   req->senderData = subRecPtr.i;
860 
861   sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
862 	     UtilReleaseReq::SignalLength, JBB);
863 
864   DBUG_VOID_RETURN;
865 }
866 
execSUB_SYNC_CONF(Signal * signal)867 void Trix::execSUB_SYNC_CONF(Signal* signal)
868 {
869   jamEntry();
870   DBUG_ENTER("Trix::execSUB_SYNC_CONF");
871   SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
872   SubscriptionRecPtr subRecPtr;
873   SubscriptionRecord* subRec;
874 
875   subRecPtr.i = subSyncConf->senderData;
876   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
877     printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
878 	   subRecPtr.i);
879     DBUG_VOID_RETURN;
880   }
881 
882   subRecPtr.p = subRec;
883   subRec->expectedConf--;
884   checkParallelism(signal, subRec);
885   if (subRec->expectedConf == 0)
886   {
887     if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
888     {
889       jam();
890       wait_gcp(signal, subRecPtr);
891       DBUG_VOID_RETURN;
892     }
893     buildComplete(signal, subRecPtr);
894   }
895   DBUG_VOID_RETURN;
896 }
897 
execSUB_SYNC_REF(Signal * signal)898 void Trix::execSUB_SYNC_REF(Signal* signal)
899 {
900   jamEntry();
901   DBUG_ENTER("Trix::execSUB_SYNC_REF");
902   SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
903   SubscriptionRecPtr subRecPtr;
904   SubscriptionRecord* subRec;
905 
906   subRecPtr.i = subSyncRef->senderData;
907   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
908     printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
909     DBUG_VOID_RETURN;
910   }
911   subRecPtr.p = subRec;
912   buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
913   DBUG_VOID_RETURN;
914 }
915 
execSUB_SYNC_CONTINUE_REQ(Signal * signal)916 void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
917 {
918   SubSyncContinueReq  * subSyncContinueReq =
919     (SubSyncContinueReq *) signal->getDataPtr();
920 
921   SubscriptionRecPtr subRecPtr;
922   SubscriptionRecord* subRec;
923   subRecPtr.i = subSyncContinueReq->subscriberData;
924   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
925     printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
926     return;
927   }
928   subRecPtr.p = subRec;
929   subRec->pendingSubSyncContinueConf = true;
930   subRec->syncPtr = subSyncContinueReq->senderData;
931   checkParallelism(signal, subRec);
932 }
933 
execSUB_TABLE_DATA(Signal * signal)934 void Trix::execSUB_TABLE_DATA(Signal* signal)
935 {
936   jamEntry();
937   DBUG_ENTER("Trix::execSUB_TABLE_DATA");
938   SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
939   SubscriptionRecPtr subRecPtr;
940   SubscriptionRecord* subRec;
941   subRecPtr.i = subTableData->senderData;
942   if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
943     printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
944     DBUG_VOID_RETURN;
945   }
946   subRecPtr.p = subRec;
947   switch(subRecPtr.p->requestType){
948   case INDEX_BUILD:
949     executeBuildInsertTransaction(signal, subRecPtr);
950     break;
951   case REORG_COPY:
952   case REORG_DELETE:
953     executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
954     break;
955   case FK_BUILD:
956     executeBuildFKTransaction(signal, subRecPtr);
957     break;
958   case STAT_UTIL:
959     ndbrequire(false);
960     break;
961   case STAT_CLEAN:
962     {
963       StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
964       statCleanExecute(signal, stat);
965     }
966     break;
967   case STAT_SCAN:
968     {
969       StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
970       statScanExecute(signal, stat);
971     }
972     break;
973   }
974 
975   subRecPtr.p->m_rows_processed++;
976 
977   DBUG_VOID_RETURN;
978 }
979 
setupSubscription(Signal * signal,SubscriptionRecPtr subRecPtr)980 void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
981 {
982   jam();
983   DBUG_ENTER("Trix::setupSubscription");
984   SubscriptionRecord* subRec = subRecPtr.p;
985   SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
986 //  Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
987   subCreateReq->senderRef = reference();
988   subCreateReq->senderData = subRecPtr.i;
989   subCreateReq->subscriptionId = subRec->subscriptionId;
990   subCreateReq->subscriptionKey = subRec->subscriptionKey;
991   subCreateReq->tableId = subRec->sourceTableId;
992   subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
993   subCreateReq->schemaTransId = subRec->schemaTransId;
994 
995   DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
996 		     subRecPtr.i, subCreateReq->subscriptionId,
997 		     subCreateReq->subscriptionKey));
998 
999   sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
1000 	     signal, SubCreateReq::SignalLength, JBB);
1001 
1002   DBUG_VOID_RETURN;
1003 }
1004 
startTableScan(Signal * signal,SubscriptionRecPtr subRecPtr)1005 void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
1006 {
1007   jam();
1008 
1009   Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
1010   SubscriptionRecord* subRec = subRecPtr.p;
1011   AttrOrderBuffer::DataBufferIterator iter;
1012 
1013   Uint32 cnt = 0;
1014   bool moreAttributes = subRec->attributeOrder.first(iter);
1015   if (subRec->requestType == FK_BUILD)
1016   {
1017     jam();
1018     // skip over key columns
1019     ndbrequire(subRec->attributeOrder.position(iter, subRec->noOfKeyColumns));
1020   }
1021 
1022   while (moreAttributes) {
1023     attributeList[cnt++] = *iter.data;
1024     moreAttributes = subRec->attributeOrder.next(iter);
1025   }
1026 
1027   // Merge index and key column segments
1028   struct LinearSectionPtr orderPtr[3];
1029   Uint32 noOfSections;
1030   orderPtr[0].p = attributeList;
1031   orderPtr[0].sz = cnt;
1032   noOfSections = 1;
1033 
1034   SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
1035   subSyncReq->senderRef = reference();
1036   subSyncReq->senderData = subRecPtr.i;
1037   subSyncReq->subscriptionId = subRec->subscriptionId;
1038   subSyncReq->subscriptionKey = subRec->subscriptionKey;
1039   subSyncReq->part = SubscriptionData::TableData;
1040   subSyncReq->requestInfo = 0;
1041   subSyncReq->fragCount = subRec->fragCount;
1042   subSyncReq->fragId = subRec->fragId;
1043 
1044   if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
1045   {
1046     jam();
1047     subSyncReq->requestInfo |= SubSyncReq::NoDisk;
1048   }
1049 
1050   if (subRec->m_flags & SubscriptionRecord::RF_TUP_ORDER)
1051   {
1052     jam();
1053     subSyncReq->requestInfo |= SubSyncReq::TupOrder;
1054   }
1055 
1056   if (subRec->requestType == REORG_COPY)
1057   {
1058     jam();
1059     subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1060   }
1061   else if (subRec->requestType == REORG_DELETE)
1062   {
1063     jam();
1064     subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1065     subSyncReq->requestInfo |= SubSyncReq::Reorg;
1066   }
1067   else if (subRec->requestType == STAT_CLEAN)
1068   {
1069     jam();
1070     StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1071     StatOp::Clean clean = stat.m_clean;
1072     orderPtr[1].p = clean.m_bound;
1073     orderPtr[1].sz = clean.m_boundSize;
1074     noOfSections = 2;
1075     subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1076     subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1077   }
1078   else if (subRec->requestType == STAT_SCAN)
1079   {
1080     jam();
1081     orderPtr[1].p = 0;
1082     orderPtr[1].sz = 0;
1083     noOfSections = 2;
1084     subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1085     subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1086     subSyncReq->requestInfo |= SubSyncReq::StatScan;
1087   }
1088   subRecPtr.p->expectedConf = 1;
1089 
1090   DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
1091 		     subRecPtr.i, subSyncReq->subscriptionId,
1092 		     subSyncReq->subscriptionKey));
1093 
1094   sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
1095 	     signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
1096 }
1097 
prepareInsertTransactions(Signal * signal,SubscriptionRecPtr subRecPtr)1098 void Trix::prepareInsertTransactions(Signal* signal,
1099 				     SubscriptionRecPtr subRecPtr)
1100 {
1101   SubscriptionRecord* subRec = subRecPtr.p;
1102   UtilPrepareReq * utilPrepareReq =
1103     (UtilPrepareReq *)signal->getDataPtrSend();
1104 
1105   jam();
1106   utilPrepareReq->senderRef = reference();
1107   utilPrepareReq->senderData = subRecPtr.i;
1108   utilPrepareReq->schemaTransId = subRec->schemaTransId;
1109 
1110   const Uint32 pageSizeInWords = 128;
1111   Uint32 propPage[pageSizeInWords];
1112   LinearWriter w(&propPage[0],128);
1113   w.first();
1114   w.add(UtilPrepareReq::NoOfOperations, 1);
1115   w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1116   w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1117   // Add index attributes in increasing order and one PK attribute
1118   for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
1119     w.add(UtilPrepareReq::AttributeId, i);
1120 
1121 #if 0
1122   // Debugging
1123   SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
1124   printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
1125   reader.printAll(ndbout);
1126 #endif
1127 
1128   struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1129   sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1130   sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1131   sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1132 	     UtilPrepareReq::SignalLength, JBB,
1133 	     sectionsPtr, UtilPrepareReq::NoOfSections);
1134 }
1135 
executeBuildInsertTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1136 void Trix::executeBuildInsertTransaction(Signal* signal,
1137                                          SubscriptionRecPtr subRecPtr)
1138 {
1139   jam();
1140   SubscriptionRecord* subRec = subRecPtr.p;
1141   UtilExecuteReq * utilExecuteReq =
1142     (UtilExecuteReq *)signal->getDataPtrSend();
1143 
1144   utilExecuteReq->senderRef = reference();
1145   utilExecuteReq->senderData = subRecPtr.i;
1146   utilExecuteReq->prepareId = subRec->prepareId;
1147 #if 0
1148   printf("Header size %u\n", headerPtr.sz);
1149   for(int i = 0; i < headerPtr.sz; i++)
1150     printf("H'%.8x ", headerBuffer[i]);
1151   printf("\n");
1152 
1153   printf("Data size %u\n", dataPtr.sz);
1154   for(int i = 0; i < dataPtr.sz; i++)
1155     printf("H'%.8x ", dataBuffer[i]);
1156   printf("\n");
1157 #endif
1158   // Save scan result in linear buffers
1159   SectionHandle handle(this, signal);
1160   SegmentedSectionPtr headerPtr, dataPtr;
1161 
1162   handle.getSection(headerPtr, 0);
1163   handle.getSection(dataPtr, 1);
1164 
1165   Uint32* headerBuffer = signal->theData + 25;
1166   Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1167 
1168   copy(headerBuffer, headerPtr);
1169   copy(dataBuffer, dataPtr);
1170   releaseSections(handle);
1171 
1172   // Calculate packed key size
1173   Uint32 noOfKeyData = 0;
1174   for(Uint32 i = 0; i < headerPtr.sz; i++) {
1175     AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1176 
1177     // Filter out NULL attributes
1178     if (keyAttrHead->isNULL())
1179       return;
1180 
1181     if (i < subRec->noOfIndexColumns)
1182       // Renumber index attributes in consequtive order
1183       keyAttrHead->setAttributeId(i);
1184     else
1185       // Calculate total size of PK attribute
1186       noOfKeyData += keyAttrHead->getDataSize();
1187   }
1188   // Increase expected CONF count
1189   subRec->expectedConf++;
1190 
1191   // Pack key attributes
1192   AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
1193 			subRec->noOfIndexColumns,
1194 			noOfKeyData << 2);
1195 
1196   struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1197   sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1198   sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
1199     subRec->noOfIndexColumns + 1;
1200   sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1201   sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1202   sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1203 	     UtilExecuteReq::SignalLength, JBB,
1204 	     sectionsPtr, UtilExecuteReq::NoOfSections);
1205 }
1206 
executeReorgTransaction(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 takeOver)1207 void Trix::executeReorgTransaction(Signal* signal,
1208                                    SubscriptionRecPtr subRecPtr,
1209                                    Uint32 takeOver)
1210 {
1211   jam();
1212   SubscriptionRecord* subRec = subRecPtr.p;
1213   UtilExecuteReq * utilExecuteReq =
1214     (UtilExecuteReq *)signal->getDataPtrSend();
1215 
1216   const Uint32 tScanInfo = takeOver & 0x3FFFF;
1217   const Uint32 tTakeOverFragment = takeOver >> 20;
1218   {
1219     UintR scanInfo = 0;
1220     TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1221     TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1222     TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1223     utilExecuteReq->scanTakeOver = scanInfo;
1224   }
1225 
1226   utilExecuteReq->senderRef = reference();
1227   utilExecuteReq->senderData = subRecPtr.i;
1228   utilExecuteReq->prepareId = subRec->prepareId;
1229 #if 0
1230   printf("Header size %u\n", headerPtr.sz);
1231   for(int i = 0; i < headerPtr.sz; i++)
1232     printf("H'%.8x ", headerBuffer[i]);
1233   printf("\n");
1234 
1235   printf("Data size %u\n", dataPtr.sz);
1236   for(int i = 0; i < dataPtr.sz; i++)
1237     printf("H'%.8x ", dataBuffer[i]);
1238   printf("\n");
1239 #endif
1240   // Increase expected CONF count
1241   subRec->expectedConf++;
1242 
1243   SectionHandle handle(this, signal);
1244   sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1245 	     UtilExecuteReq::SignalLength, JBB,
1246 	     &handle);
1247 }
1248 
1249 void
wait_gcp(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 delay)1250 Trix::wait_gcp(Signal* signal, SubscriptionRecPtr subRecPtr, Uint32 delay)
1251 {
1252   WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
1253   req->senderRef = reference();
1254   req->senderData = subRecPtr.i;
1255   req->requestType = WaitGCPReq::CurrentGCI;
1256 
1257   if (delay == 0)
1258   {
1259     jam();
1260     sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1261                WaitGCPReq::SignalLength, JBB);
1262   }
1263   else
1264   {
1265     jam();
1266     sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1267                         delay, WaitGCPReq::SignalLength);
1268   }
1269 }
1270 
1271 void
execWAIT_GCP_REF(Signal * signal)1272 Trix::execWAIT_GCP_REF(Signal* signal)
1273 {
1274   WaitGCPRef ref = *(WaitGCPRef*)signal->getDataPtr();
1275 
1276   SubscriptionRecPtr subRecPtr;
1277   c_theSubscriptions.getPtr(subRecPtr, ref.senderData);
1278   wait_gcp(signal, subRecPtr, 100);
1279 }
1280 
1281 void
execWAIT_GCP_CONF(Signal * signal)1282 Trix::execWAIT_GCP_CONF(Signal* signal)
1283 {
1284   WaitGCPConf * conf = (WaitGCPConf*)signal->getDataPtr();
1285 
1286   SubscriptionRecPtr subRecPtr;
1287   c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1288 
1289   const Uint32 gci_hi = conf->gci_hi;
1290   const Uint32 gci_lo = conf->gci_lo;
1291   const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
1292 
1293   if (gci > subRecPtr.p->m_gci)
1294   {
1295     jam();
1296     buildComplete(signal, subRecPtr);
1297   }
1298   else
1299   {
1300     jam();
1301     wait_gcp(signal, subRecPtr, 100);
1302   }
1303 }
1304 
buildComplete(Signal * signal,SubscriptionRecPtr subRecPtr)1305 void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
1306 {
1307   SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
1308   req->senderRef       = reference();
1309   req->senderData      = subRecPtr.i;
1310   req->subscriptionId  = subRecPtr.p->subscriptionId;
1311   req->subscriptionKey = subRecPtr.p->subscriptionKey;
1312   sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
1313 	     SubRemoveReq::SignalLength, JBB);
1314 }
1315 
buildFailed(Signal * signal,SubscriptionRecPtr subRecPtr,BuildIndxRef::ErrorCode errorCode)1316 void Trix::buildFailed(Signal* signal,
1317 		       SubscriptionRecPtr subRecPtr,
1318 		       BuildIndxRef::ErrorCode errorCode)
1319 {
1320   SubscriptionRecord* subRec = subRecPtr.p;
1321 
1322   subRec->errorCode = errorCode;
1323   // Continue accumulating since we currently cannot stop SUMA
1324   subRec->expectedConf--;
1325   checkParallelism(signal, subRec);
1326   if (subRec->expectedConf == 0)
1327     buildComplete(signal, subRecPtr);
1328 }
1329 
1330 void
execSUB_REMOVE_REF(Signal * signal)1331 Trix::execSUB_REMOVE_REF(Signal* signal){
1332   jamEntry();
1333   //@todo
1334   ndbrequire(false);
1335 }
1336 
1337 void
execSUB_REMOVE_CONF(Signal * signal)1338 Trix::execSUB_REMOVE_CONF(Signal* signal){
1339   jamEntry();
1340 
1341   SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
1342 
1343   SubscriptionRecPtr subRecPtr;
1344   c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1345 
1346   if(subRecPtr.p->prepareId != RNIL){
1347     jam();
1348 
1349     UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
1350     req->prepareId = subRecPtr.p->prepareId;
1351     req->senderData = subRecPtr.i;
1352 
1353     sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
1354 	       UtilReleaseReq::SignalLength , JBB);
1355     return;
1356   }
1357 
1358   {
1359     UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1360     conf->senderData = subRecPtr.i;
1361     execUTIL_RELEASE_CONF(signal);
1362   }
1363 }
1364 
1365 void
execUTIL_RELEASE_REF(Signal * signal)1366 Trix::execUTIL_RELEASE_REF(Signal* signal){
1367   jamEntry();
1368   ndbrequire(false);
1369 }
1370 
1371 void
execUTIL_RELEASE_CONF(Signal * signal)1372 Trix::execUTIL_RELEASE_CONF(Signal* signal){
1373 
1374   UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1375 
1376   SubscriptionRecPtr subRecPtr;
1377   c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1378 
1379   switch(subRecPtr.p->requestType){
1380   case REORG_COPY:
1381   case REORG_DELETE:
1382     if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1383     {
1384       jam();
1385       // Build is complete, reply to original sender
1386       CopyDataImplConf* conf = (CopyDataImplConf*)signal->getDataPtrSend();
1387       conf->senderRef = reference(); //wl3600_todo ok?
1388       conf->senderData = subRecPtr.p->connectionPtr;
1389 
1390       sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_CONF, signal,
1391                  CopyDataImplConf::SignalLength , JBB);
1392 
1393       infoEvent("%s table %u processed %llu rows",
1394                 subRecPtr.p->requestType == REORG_COPY ?
1395                 "reorg-copy" : "reorg-delete",
1396                 subRecPtr.p->sourceTableId,
1397                 subRecPtr.p->m_rows_processed);
1398     } else {
1399       jam();
1400       // Build failed, reply to original sender
1401       CopyDataImplRef* ref = (CopyDataImplRef*)signal->getDataPtrSend();
1402       ref->senderRef = reference();
1403       ref->senderData = subRecPtr.p->connectionPtr;
1404       ref->errorCode = subRecPtr.p->errorCode;
1405 
1406       sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_REF, signal,
1407                  CopyDataImplRef::SignalLength , JBB);
1408     }
1409     break;
1410   case INDEX_BUILD:
1411     if (subRecPtr.p->errorCode == BuildIndxRef::NoError) {
1412       jam();
1413       // Build is complete, reply to original sender
1414       BuildIndxImplConf* buildIndxConf =
1415         (BuildIndxImplConf*)signal->getDataPtrSend();
1416       buildIndxConf->senderRef = reference(); //wl3600_todo ok?
1417       buildIndxConf->senderData = subRecPtr.p->connectionPtr;
1418 
1419       sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_CONF, signal,
1420                  BuildIndxConf::SignalLength , JBB);
1421 
1422       infoEvent("index-build table %u index: %u processed %llu rows",
1423                 subRecPtr.p->sourceTableId,
1424                 subRecPtr.p->targetTableId,
1425                 subRecPtr.p->m_rows_processed);
1426     } else {
1427       jam();
1428       // Build failed, reply to original sender
1429       BuildIndxImplRef* buildIndxRef =
1430         (BuildIndxImplRef*)signal->getDataPtrSend();
1431       buildIndxRef->senderRef = reference();
1432       buildIndxRef->senderData = subRecPtr.p->connectionPtr;
1433       buildIndxRef->errorCode = subRecPtr.p->errorCode;
1434 
1435       sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_REF, signal,
1436                  BuildIndxRef::SignalLength , JBB);
1437     }
1438     break;
1439   case FK_BUILD:
1440     if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1441     {
1442       jam();
1443       // Build is complete, reply to original sender
1444       BuildFKImplConf* buildFKConf =
1445         CAST_PTR(BuildFKImplConf, signal->getDataPtrSend());
1446       buildFKConf->senderRef = reference();
1447       buildFKConf->senderData = subRecPtr.p->connectionPtr;
1448 
1449       sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_CONF, signal,
1450                  BuildFKImplConf::SignalLength , JBB);
1451 
1452       infoEvent("fk-build parent table: %u child table: %u processed %llu rows",
1453                 subRecPtr.p->targetTableId,
1454                 subRecPtr.p->sourceTableId,
1455                 subRecPtr.p->m_rows_processed);
1456     }
1457     else
1458     {
1459       jam();
1460       // Build failed, reply to original sender
1461       BuildFKImplRef* buildFKRef =
1462         (BuildFKImplRef*)signal->getDataPtrSend();
1463       buildFKRef->senderRef = reference();
1464       buildFKRef->senderData = subRecPtr.p->connectionPtr;
1465       buildFKRef->errorCode = subRecPtr.p->errorCode;
1466 
1467       sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_REF, signal,
1468                  BuildFKImplRef::SignalLength , JBB);
1469     }
1470     break;
1471   case STAT_UTIL:
1472     ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
1473     statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
1474     return;
1475   case STAT_CLEAN:
1476     {
1477       subRecPtr.p->prepareId = RNIL;
1478       StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1479       statCleanRelease(signal, stat);
1480     }
1481     return;
1482   case STAT_SCAN:
1483     {
1484       subRecPtr.p->prepareId = RNIL;
1485       StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1486       statScanRelease(signal, stat);
1487     }
1488     return;
1489   }
1490 
1491   // Release subscription record
1492   subRecPtr.p->attributeOrder.release();
1493   c_theSubscriptions.release(subRecPtr.i);
1494 }
1495 
checkParallelism(Signal * signal,SubscriptionRecord * subRec)1496 void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
1497 {
1498   if ((subRec->pendingSubSyncContinueConf) &&
1499       (subRec->expectedConf == 1)) {
1500     jam();
1501     SubSyncContinueConf  * subSyncContinueConf =
1502       (SubSyncContinueConf *) signal->getDataPtrSend();
1503     subSyncContinueConf->subscriptionId = subRec->subscriptionId;
1504     subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
1505     subSyncContinueConf->senderData = subRec->syncPtr;
1506     sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
1507 	       SubSyncContinueConf::SignalLength , JBB);
1508     subRec->pendingSubSyncContinueConf = false;
1509     return;
1510   }
1511 }
1512 
1513 // CopyData
1514 void
execCOPY_DATA_IMPL_REQ(Signal * signal)1515 Trix::execCOPY_DATA_IMPL_REQ(Signal* signal)
1516 {
1517   jamEntry();
1518 
1519   const CopyDataImplReq reqData = *(const CopyDataImplReq*)signal->getDataPtr();
1520   const CopyDataImplReq *req = &reqData;
1521 
1522   // Seize a subscription record
1523   SubscriptionRecPtr subRecPtr;
1524   SectionHandle handle(this, signal);
1525 
1526   if (!c_theSubscriptions.seizeFirst(subRecPtr))
1527   {
1528     jam();
1529     // Failed to allocate subscription record
1530     releaseSections(handle);
1531 
1532     CopyDataImplRef* ref = (CopyDataRef*)signal->getDataPtrSend();
1533 
1534     ref->errorCode = -1; // XXX CopyDataImplRef::AllocationFailure;
1535     ref->senderData = req->senderData;
1536     ref->transId = req->transId;
1537     sendSignal(req->senderRef, GSN_COPY_DATA_IMPL_REF, signal,
1538                CopyDataImplRef::SignalLength, JBB);
1539     return;
1540   }
1541 
1542   SubscriptionRecord* subRec = subRecPtr.p;
1543   subRec->errorCode = BuildIndxRef::NoError;
1544   subRec->userReference = req->senderRef;
1545   subRec->connectionPtr = req->senderData;
1546   subRec->schemaTransId = req->transId;
1547   subRec->subscriptionId = rand();
1548   subRec->subscriptionKey = rand();
1549   subRec->indexType = RNIL;
1550   subRec->sourceTableId = req->srcTableId;
1551   subRec->targetTableId = req->dstTableId;
1552   subRec->parallelism = 16;
1553   subRec->expectedConf = 0;
1554   subRec->subscriptionCreated = false;
1555   subRec->pendingSubSyncContinueConf = false;
1556   subRec->prepareId = req->transId;
1557   subRec->fragCount = req->srcFragments;
1558   subRec->fragId = ZNIL;
1559   subRec->m_rows_processed = 0;
1560   subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
1561   subRec->m_gci = 0;
1562   switch(req->requestType){
1563   case CopyDataImplReq::ReorgCopy:
1564     jam();
1565     subRec->requestType = REORG_COPY;
1566     break;
1567   case CopyDataImplReq::ReorgDelete:
1568     subRec->requestType = REORG_DELETE;
1569     break;
1570   default:
1571     jamLine(req->requestType);
1572     ndbrequire(false);
1573   }
1574 
1575   if (req->requestInfo & CopyDataReq::TupOrder)
1576   {
1577     jam();
1578     subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1579   }
1580 
1581   // Get column order segments
1582   Uint32 noOfSections = handle.m_cnt;
1583   if (noOfSections > 0) {
1584     jam();
1585     SegmentedSectionPtr ptr;
1586     handle.getSection(ptr, 0);
1587     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1588     subRec->noOfIndexColumns = ptr.sz;
1589   }
1590 
1591   if (noOfSections > 1) {
1592     jam();
1593     SegmentedSectionPtr ptr;
1594     handle.getSection(ptr, 1);
1595     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1596     subRec->noOfKeyColumns = ptr.sz;
1597   }
1598 
1599   releaseSections(handle);
1600   {
1601     UtilPrepareReq * utilPrepareReq =
1602       (UtilPrepareReq *)signal->getDataPtrSend();
1603 
1604     utilPrepareReq->senderRef = reference();
1605     utilPrepareReq->senderData = subRecPtr.i;
1606     utilPrepareReq->schemaTransId = subRec->schemaTransId;
1607 
1608     const Uint32 pageSizeInWords = 128;
1609     Uint32 propPage[pageSizeInWords];
1610     LinearWriter w(&propPage[0],128);
1611     w.first();
1612     w.add(UtilPrepareReq::NoOfOperations, 1);
1613     if (subRec->requestType == REORG_COPY)
1614     {
1615       w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1616     }
1617     else
1618     {
1619       w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Delete);
1620     }
1621     w.add(UtilPrepareReq::ScanTakeOverInd, 1);
1622     w.add(UtilPrepareReq::ReorgInd, 1);
1623     w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1624 
1625     AttrOrderBuffer::DataBufferIterator iter;
1626     ndbrequire(subRec->attributeOrder.first(iter));
1627 
1628     for(Uint32 i = 0; i < subRec->noOfIndexColumns; i++)
1629     {
1630       w.add(UtilPrepareReq::AttributeId, * iter.data);
1631       subRec->attributeOrder.next(iter);
1632     }
1633 
1634     struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1635     sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1636     sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1637     sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1638                UtilPrepareReq::SignalLength, JBB,
1639                sectionsPtr, UtilPrepareReq::NoOfSections);
1640   }
1641 }
1642 
1643 // BuildFK
1644 void
execBUILD_FK_IMPL_REQ(Signal * signal)1645 Trix::execBUILD_FK_IMPL_REQ(Signal* signal)
1646 {
1647   jamEntry();
1648 
1649   const BuildFKImplReq reqData = *(const BuildFKImplReq*)signal->getDataPtr();
1650   const BuildFKImplReq *req = &reqData;
1651 
1652   // Seize a subscription record
1653   SubscriptionRecPtr subRecPtr;
1654   SectionHandle handle(this, signal);
1655 
1656   if (!c_theSubscriptions.seizeFirst(subRecPtr))
1657   {
1658     jam();
1659     // Failed to allocate subscription record
1660     releaseSections(handle);
1661 
1662     BuildFKImplRef* ref = (BuildFKImplRef*)signal->getDataPtrSend();
1663 
1664     ref->errorCode = -1; // XXX BuildFKImplRef::AllocationFailure;
1665     ref->senderData = req->senderData;
1666     sendSignal(req->senderRef, GSN_BUILD_FK_IMPL_REF, signal,
1667                BuildFKImplRef::SignalLength, JBB);
1668     return;
1669   }
1670 
1671   SubscriptionRecord* subRec = subRecPtr.p;
1672   subRec->errorCode = BuildIndxRef::NoError;
1673   subRec->userReference = req->senderRef;
1674   subRec->connectionPtr = req->senderData;
1675   subRec->schemaTransId = req->transId;
1676   subRec->subscriptionId = rand();
1677   subRec->subscriptionKey = rand();
1678   subRec->indexType = RNIL;
1679   subRec->sourceTableId = req->childTableId;
1680   subRec->targetTableId = req->parentTableId;
1681   subRec->parallelism = 16;
1682   subRec->expectedConf = 0;
1683   subRec->subscriptionCreated = false;
1684   subRec->pendingSubSyncContinueConf = false;
1685   subRec->prepareId = req->transId;
1686   subRec->fragCount = 0; // all
1687   subRec->fragId = ZNIL;
1688   subRec->m_rows_processed = 0;
1689   subRec->m_flags = 0;
1690   subRec->m_gci = 0;
1691   subRec->requestType = FK_BUILD;
1692 
1693   // TODO...check if there is a scenario where this is not optimal
1694   subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1695 
1696   // as we don't support index on disk...
1697   subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
1698 
1699   // Get parent columns...
1700   {
1701     SegmentedSectionPtr ptr;
1702     handle.getSection(ptr, 0);
1703     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1704     subRec->noOfKeyColumns = ptr.sz;
1705   }
1706 
1707   {
1708     // Get child columns...
1709     SegmentedSectionPtr ptr;
1710     handle.getSection(ptr, 1);
1711     append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1712     subRec->noOfIndexColumns = ptr.sz;
1713   }
1714 
1715   ndbrequire(subRec->noOfKeyColumns == subRec->noOfIndexColumns);
1716 
1717   releaseSections(handle);
1718 
1719   {
1720     UtilPrepareReq * utilPrepareReq =
1721       (UtilPrepareReq *)signal->getDataPtrSend();
1722 
1723     utilPrepareReq->senderRef = reference();
1724     utilPrepareReq->senderData = subRecPtr.i;
1725     utilPrepareReq->schemaTransId = subRec->schemaTransId;
1726 
1727     const Uint32 pageSizeInWords = 128;
1728     Uint32 propPage[pageSizeInWords];
1729     LinearWriter w(&propPage[0],128);
1730     w.first();
1731     w.add(UtilPrepareReq::NoOfOperations, 1);
1732     w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Probe);
1733     w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1734 
1735     // key is always in 0
1736     AttrOrderBuffer::DataBufferIterator iter;
1737     ndbrequire(subRec->attributeOrder.first(iter));
1738     for(Uint32 i = 0; i < subRec->noOfKeyColumns; i++)
1739     {
1740       w.add(UtilPrepareReq::AttributeId, * iter.data);
1741       subRec->attributeOrder.next(iter);
1742     }
1743 
1744     struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1745     sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1746     sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1747     sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1748                UtilPrepareReq::SignalLength, JBB,
1749                sectionsPtr, UtilPrepareReq::NoOfSections);
1750   }
1751 }
1752 
1753 void
executeBuildFKTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1754 Trix::executeBuildFKTransaction(Signal* signal,
1755                                 SubscriptionRecPtr subRecPtr)
1756 {
1757   jam();
1758   SubscriptionRecord* subRec = subRecPtr.p;
1759   UtilExecuteReq * utilExecuteReq =
1760     CAST_PTR(UtilExecuteReq, signal->getDataPtrSend());
1761 
1762   utilExecuteReq->senderRef = reference();
1763   utilExecuteReq->senderData = subRecPtr.i;
1764   utilExecuteReq->prepareId = subRec->prepareId;
1765 
1766   // Save scan result in linear buffers
1767   SectionHandle handle(this, signal);
1768   SegmentedSectionPtr headerPtr, dataPtr;
1769 
1770   handle.getSection(headerPtr, 0);
1771   handle.getSection(dataPtr, 1);
1772 
1773   Uint32* headerBuffer = signal->theData + 25;
1774   Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1775 
1776   copy(headerBuffer, headerPtr);
1777   copy(dataBuffer, dataPtr);
1778   releaseSections(handle);
1779 
1780   AttrOrderBuffer::ConstDataBufferIterator iter;
1781   ndbrequire(subRec->attributeOrder.first(iter));
1782   for(Uint32 i = 0; i < headerPtr.sz; i++)
1783   {
1784     AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1785 
1786     // Filter out NULL attributes
1787     if (keyAttrHead->isNULL())
1788       return;
1789 
1790     /**
1791      * UTIL_EXECUTE header section expects real attrid (same as passed in
1792      * UTIL_PREPARE).  SUMA sends child attrid, replace it by parent attrid.
1793      */
1794     keyAttrHead->setAttributeId(*iter.data);
1795     subRec->attributeOrder.next(iter);
1796   }
1797   // Increase expected CONF count
1798   subRec->expectedConf++;
1799 
1800   struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1801   sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1802   sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = subRec->noOfKeyColumns;
1803   sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1804   sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1805   sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1806 	     UtilExecuteReq::SignalLength, JBB,
1807 	     sectionsPtr, UtilExecuteReq::NoOfSections);
1808 }
1809 
1810 // index stats
1811 
1812 Trix::StatOp&
statOpGetPtr(Uint32 statPtrI)1813 Trix::statOpGetPtr(Uint32 statPtrI)
1814 {
1815   ndbrequire(statPtrI != RNIL);
1816   return *c_statOpPool.getPtr(statPtrI);
1817 }
1818 
1819 bool
statOpSeize(Uint32 & statPtrI)1820 Trix::statOpSeize(Uint32& statPtrI)
1821 {
1822   StatOpPtr statPtr;
1823   if (ERROR_INSERTED(18001) ||
1824       !c_statOpPool.seize(statPtr))
1825   {
1826     jam();
1827     CLEAR_ERROR_INSERT_VALUE;
1828     D("statOpSeize: seize statOp failed");
1829     return false;
1830   }
1831 #ifdef VM_TRACE
1832   memset(statPtr.p, 0xf3, sizeof(*statPtr.p));
1833 #endif
1834   new (statPtr.p) StatOp;
1835   statPtrI = statPtr.i;
1836   StatOp& stat = statOpGetPtr(statPtrI);
1837   stat.m_ownPtrI = statPtrI;
1838 
1839   SubscriptionRecPtr subRecPtr;
1840   if (ERROR_INSERTED(18002) ||
1841       !c_theSubscriptions.seizeFirst(subRecPtr))
1842   {
1843     jam();
1844     CLEAR_ERROR_INSERT_VALUE;
1845     c_statOpPool.release(statPtr);
1846     D("statOpSeize: seize subRec failed");
1847     return false;
1848   }
1849   SubscriptionRecord* subRec = subRecPtr.p;
1850   subRec->m_statPtrI = stat.m_ownPtrI;
1851   stat.m_subRecPtrI = subRecPtr.i;
1852 
1853   D("statOpSeize" << V(statPtrI) << V(subRecPtr.i));
1854   return true;
1855 }
1856 
1857 void
statOpRelease(StatOp & stat)1858 Trix::statOpRelease(StatOp& stat)
1859 {
1860   StatOp::Util& util = stat.m_util;
1861   D("statOpRelease" << V(stat));
1862 
1863   if (stat.m_subRecPtrI != RNIL)
1864   {
1865     jam();
1866     SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1867     ndbrequire(subRec->prepareId == RNIL);
1868     subRec->attributeOrder.release();
1869     c_theSubscriptions.release(stat.m_subRecPtrI);
1870     stat.m_subRecPtrI = RNIL;
1871   }
1872   ndbrequire(util.m_prepareId == RNIL);
1873   c_statOpPool.release(stat.m_ownPtrI);
1874 }
1875 
1876 void
execINDEX_STAT_IMPL_REQ(Signal * signal)1877 Trix::execINDEX_STAT_IMPL_REQ(Signal* signal)
1878 {
1879   jamEntry();
1880   const IndexStatImplReq* req = (const IndexStatImplReq*)signal->getDataPtr();
1881 
1882   Uint32 statPtrI = RNIL;
1883   if (!statOpSeize(statPtrI))
1884   {
1885     jam();
1886     const IndexStatImplReq reqCopy = *req;
1887     statOpRef(signal, &reqCopy, IndexStatRef::NoFreeStatOp, __LINE__);
1888     return;
1889   }
1890   StatOp& stat = statOpGetPtr(statPtrI);
1891   stat.m_req = *req;
1892   stat.m_requestType = req->requestType;
1893 
1894   // set request name for cluster log message
1895   switch (stat.m_requestType) {
1896   case IndexStatReq::RT_CLEAN_NEW:
1897     jam();
1898     stat.m_requestName = "clean new";
1899     break;
1900   case IndexStatReq::RT_CLEAN_OLD:
1901     jam();
1902     stat.m_requestName = "clean old";
1903     break;
1904   case IndexStatReq::RT_CLEAN_ALL:
1905     jam();
1906     stat.m_requestName = "clean all";
1907     break;
1908   case IndexStatReq::RT_SCAN_FRAG:
1909     jam();
1910     stat.m_requestName = "scan frag";
1911     break;
1912   case IndexStatReq::RT_DROP_HEAD:
1913     jam();
1914     stat.m_requestName = "drop head";
1915     break;
1916   default:
1917     ndbrequire(false);
1918     break;
1919   }
1920 
1921   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1922   subRec->prepareId = RNIL;
1923   subRec->errorCode = BuildIndxRef::NoError;
1924 
1925   // sys tables are not recreated so do this only once
1926   if (!c_statGetMetaDone)
1927   {
1928     jam();
1929     statMetaGetHead(signal, stat);
1930     return;
1931   }
1932   statGetMetaDone(signal, stat);
1933 }
1934 
1935 // sys tables metadata
1936 
1937 const Trix::SysColumn
1938 Trix::g_statMetaHead_column[] = {
1939   { 0, "index_id",
1940     true
1941   },
1942   { 1, "index_version",
1943     true
1944   },
1945   { 2, "table_id",
1946     false
1947   },
1948   { 3, "frag_count",
1949     false
1950   },
1951   { 4, "value_format",
1952     false
1953   },
1954   { 5, "sample_version",
1955     false
1956   },
1957   { 6, "load_time",
1958     false
1959   },
1960   { 7, "sample_count",
1961     false
1962   },
1963   { 8, "key_bytes",
1964     false
1965   }
1966 };
1967 
1968 const Trix::SysColumn
1969 Trix::g_statMetaSample_column[] = {
1970   { 0, "index_id",
1971     true
1972   },
1973   { 1, "index_version",
1974     true
1975   },
1976   { 2, "sample_version",
1977     true
1978   },
1979   { 3, "stat_key",
1980     true
1981   },
1982   { 4, "stat_value",
1983     false
1984   }
1985 };
1986 
1987 const Trix::SysTable
1988 Trix::g_statMetaHead = {
1989   NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_HEAD_TABLE,
1990   ~(Uint32)0,
1991   sizeof(g_statMetaHead_column)/sizeof(g_statMetaHead_column[0]),
1992   g_statMetaHead_column
1993 };
1994 
1995 const Trix::SysTable
1996 Trix::g_statMetaSample = {
1997   NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_SAMPLE_TABLE,
1998   ~(Uint32)0,
1999   sizeof(g_statMetaSample_column)/sizeof(g_statMetaSample_column[0]),
2000   g_statMetaSample_column
2001 };
2002 
2003 const Trix::SysIndex
2004 Trix::g_statMetaSampleX1 = {
2005   // indexes are always in "sys"
2006   "sys" "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
2007   ~(Uint32)0,
2008   ~(Uint32)0
2009 };
2010 
2011 void
statMetaGetHead(Signal * signal,StatOp & stat)2012 Trix::statMetaGetHead(Signal* signal, StatOp& stat)
2013 {
2014   D("statMetaGetHead" << V(stat));
2015   StatOp::Meta& meta = stat.m_meta;
2016   meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
2017   meta.m_cb.m_callbackData = stat.m_ownPtrI;
2018   const char* name = g_statMetaHead.name;
2019   sendGetTabInfoReq(signal, stat, name);
2020 }
2021 
2022 void
statMetaGetHeadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2023 Trix::statMetaGetHeadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2024 {
2025   StatOp& stat = statOpGetPtr(statPtrI);
2026   D("statMetaGetHeadCB" << V(stat) << V(ret));
2027   StatOp::Meta& meta = stat.m_meta;
2028   if (ret != 0)
2029   {
2030     jam();
2031     Uint32 supress[] = { GetTabInfoRef::TableNotDefined, 0 };
2032     statOpError(signal, stat, ret, __LINE__, supress);
2033     return;
2034   }
2035   g_statMetaHead.tableId = meta.m_conf.tableId;
2036   statMetaGetSample(signal, stat);
2037 }
2038 
2039 void
statMetaGetSample(Signal * signal,StatOp & stat)2040 Trix::statMetaGetSample(Signal* signal, StatOp& stat)
2041 {
2042   D("statMetaGetSample" << V(stat));
2043   StatOp::Meta& meta = stat.m_meta;
2044   meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
2045   meta.m_cb.m_callbackData = stat.m_ownPtrI;
2046   const char* name = g_statMetaSample.name;
2047   sendGetTabInfoReq(signal, stat, name);
2048 }
2049 
2050 void
statMetaGetSampleCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2051 Trix::statMetaGetSampleCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2052 {
2053   StatOp& stat = statOpGetPtr(statPtrI);
2054   D("statMetaGetSampleCB" << V(stat) << V(ret));
2055   StatOp::Meta& meta = stat.m_meta;
2056   if (ret != 0)
2057   {
2058     jam();
2059     statOpError(signal, stat, ret, __LINE__);
2060     return;
2061   }
2062   g_statMetaSample.tableId = meta.m_conf.tableId;
2063   statMetaGetSampleX1(signal, stat);
2064 }
2065 
2066 void
statMetaGetSampleX1(Signal * signal,StatOp & stat)2067 Trix::statMetaGetSampleX1(Signal* signal, StatOp& stat)
2068 {
2069   D("statMetaGetSampleX1" << V(stat));
2070   StatOp::Meta& meta = stat.m_meta;
2071   meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
2072   meta.m_cb.m_callbackData = stat.m_ownPtrI;
2073   const char* name_fmt = g_statMetaSampleX1.name;
2074   char name[MAX_TAB_NAME_SIZE];
2075   BaseString::snprintf(name, sizeof(name), name_fmt, g_statMetaSample.tableId);
2076   sendGetTabInfoReq(signal, stat, name);
2077 }
2078 
2079 void
statMetaGetSampleX1CB(Signal * signal,Uint32 statPtrI,Uint32 ret)2080 Trix::statMetaGetSampleX1CB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2081 {
2082   StatOp& stat = statOpGetPtr(statPtrI);
2083   D("statMetaGetSampleX1CB" << V(stat) << V(ret));
2084   StatOp::Meta& meta = stat.m_meta;
2085   if (ret != 0)
2086   {
2087     jam();
2088     statOpError(signal, stat, ret, __LINE__);
2089     return;
2090   }
2091   g_statMetaSampleX1.tableId = g_statMetaSample.tableId;
2092   g_statMetaSampleX1.indexId = meta.m_conf.tableId;
2093   statGetMetaDone(signal, stat);
2094 }
2095 
2096 void
sendGetTabInfoReq(Signal * signal,StatOp & stat,const char * name)2097 Trix::sendGetTabInfoReq(Signal* signal, StatOp& stat, const char* name)
2098 {
2099   D("sendGetTabInfoReq" << V(stat) << V(name));
2100   GetTabInfoReq* req = (GetTabInfoReq*)signal->getDataPtrSend();
2101 
2102   Uint32 name_len = (Uint32)strlen(name) + 1;
2103   Uint32 name_len_words = (name_len + 3 ) / 4;
2104   Uint32 name_buf[32];
2105   ndbrequire(name_len_words <= 32);
2106   memset(name_buf, 0, sizeof(name_buf));
2107   memcpy(name_buf, name, name_len);
2108 
2109   req->senderData = stat.m_ownPtrI;
2110   req->senderRef = reference();
2111   req->requestType = GetTabInfoReq::RequestByName |
2112                      GetTabInfoReq::LongSignalConf;
2113   req->tableNameLen = name_len;
2114   req->schemaTransId = 0;
2115   LinearSectionPtr ptr[3];
2116   ptr[0].p = name_buf;
2117   ptr[0].sz = name_len_words;
2118   sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
2119              signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
2120 }
2121 
2122 void
execGET_TABINFO_CONF(Signal * signal)2123 Trix::execGET_TABINFO_CONF(Signal* signal)
2124 {
2125   jamEntry();
2126   if (!assembleFragments(signal)) {
2127     jam();
2128     return;
2129   }
2130   const GetTabInfoConf* conf = (const GetTabInfoConf*)signal->getDataPtr();
2131   StatOp& stat = statOpGetPtr(conf->senderData);
2132   D("execGET_TABINFO_CONF" << V(stat));
2133   StatOp::Meta& meta = stat.m_meta;
2134   meta.m_conf = *conf;
2135 
2136   // do not need DICTTABINFO
2137   SectionHandle handle(this, signal);
2138   releaseSections(handle);
2139 
2140   execute(signal, meta.m_cb, 0);
2141 }
2142 
2143 void
execGET_TABINFO_REF(Signal * signal)2144 Trix::execGET_TABINFO_REF(Signal* signal)
2145 {
2146   jamEntry();
2147   const GetTabInfoRef* ref = (const GetTabInfoRef*)signal->getDataPtr();
2148   StatOp& stat = statOpGetPtr(ref->senderData);
2149   D("execGET_TABINFO_REF" << V(stat));
2150   StatOp::Meta& meta = stat.m_meta;
2151 
2152   ndbrequire(ref->errorCode != 0);
2153   execute(signal, meta.m_cb, ref->errorCode);
2154 }
2155 
2156 // continue after metadata retrieval
2157 
2158 void
statGetMetaDone(Signal * signal,StatOp & stat)2159 Trix::statGetMetaDone(Signal* signal, StatOp& stat)
2160 {
2161   const IndexStatImplReq* req = &stat.m_req;
2162   StatOp::Data& data = stat.m_data;
2163   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2164   D("statGetMetaDone" << V(stat));
2165 
2166   // c_statGetMetaDone = true;
2167 
2168   subRec->requestType = STAT_UTIL;
2169   // fill in constant part
2170   ndbrequire(req->fragCount != 0);
2171   data.m_indexId = req->indexId;
2172   data.m_indexVersion = req->indexVersion;
2173   data.m_fragCount = req->fragCount;
2174   statHeadRead(signal, stat);
2175 }
2176 
2177 // head table ops
2178 
2179 void
statHeadRead(Signal * signal,StatOp & stat)2180 Trix::statHeadRead(Signal* signal, StatOp& stat)
2181 {
2182   StatOp::Util& util = stat.m_util;
2183   StatOp::Send& send = stat.m_send;
2184   D("statHeadRead" << V(stat));
2185 
2186   util.m_not_found = false;
2187   util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
2188   util.m_cb.m_callbackData = stat.m_ownPtrI;
2189   send.m_sysTable = &g_statMetaHead;
2190   send.m_operationType = UtilPrepareReq::Read;
2191   statUtilPrepare(signal, stat);
2192 }
2193 
2194 void
statHeadReadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2195 Trix::statHeadReadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2196 {
2197   StatOp& stat = statOpGetPtr(statPtrI);
2198   StatOp::Data& data = stat.m_data;
2199   StatOp::Util& util = stat.m_util;
2200   D("statHeadReadCB" << V(stat) << V(ret));
2201 
2202   ndbrequire(ret == 0);
2203   data.m_head_found = !util.m_not_found;
2204   statReadHeadDone(signal, stat);
2205 }
2206 
2207 void
statHeadInsert(Signal * signal,StatOp & stat)2208 Trix::statHeadInsert(Signal* signal, StatOp& stat)
2209 {
2210   StatOp::Util& util = stat.m_util;
2211   StatOp::Send& send = stat.m_send;
2212   D("statHeadInsert" << V(stat));
2213 
2214   util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
2215   util.m_cb.m_callbackData = stat.m_ownPtrI;
2216   send.m_sysTable = &g_statMetaHead;
2217   send.m_operationType = UtilPrepareReq::Insert;
2218   statUtilPrepare(signal, stat);
2219 }
2220 
2221 void
statHeadInsertCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2222 Trix::statHeadInsertCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2223 {
2224   StatOp& stat = statOpGetPtr(statPtrI);
2225   D("statHeadInsertCB" << V(stat) << V(ret));
2226 
2227   ndbrequire(ret == 0);
2228   statInsertHeadDone(signal, stat);
2229 }
2230 
2231 void
statHeadUpdate(Signal * signal,StatOp & stat)2232 Trix::statHeadUpdate(Signal* signal, StatOp& stat)
2233 {
2234   StatOp::Util& util = stat.m_util;
2235   StatOp::Send& send = stat.m_send;
2236   D("statHeadUpdate" << V(stat));
2237 
2238   util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
2239   util.m_cb.m_callbackData = stat.m_ownPtrI;
2240   send.m_sysTable = &g_statMetaHead;
2241   send.m_operationType = UtilPrepareReq::Update;
2242   statUtilPrepare(signal, stat);
2243 }
2244 
2245 void
statHeadUpdateCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2246 Trix::statHeadUpdateCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2247 {
2248   StatOp& stat = statOpGetPtr(statPtrI);
2249   D("statHeadUpdateCB" << V(stat) << V(ret));
2250 
2251   ndbrequire(ret == 0);
2252   statUpdateHeadDone(signal, stat);
2253 }
2254 
2255 void
statHeadDelete(Signal * signal,StatOp & stat)2256 Trix::statHeadDelete(Signal* signal, StatOp& stat)
2257 {
2258   StatOp::Util& util = stat.m_util;
2259   StatOp::Send& send = stat.m_send;
2260   D("statHeadDelete" << V(stat));
2261 
2262   util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
2263   util.m_cb.m_callbackData = stat.m_ownPtrI;
2264   send.m_sysTable = &g_statMetaHead;
2265   send.m_operationType = UtilPrepareReq::Delete;
2266   statUtilPrepare(signal, stat);
2267 }
2268 
2269 void
statHeadDeleteCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2270 Trix::statHeadDeleteCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2271 {
2272   StatOp& stat = statOpGetPtr(statPtrI);
2273   D("statHeadDeleteCB" << V(stat) << V(ret));
2274 
2275   ndbrequire(ret == 0);
2276   statDeleteHeadDone(signal, stat);
2277 }
2278 
2279 // util (PK ops, only HEAD for now)
2280 
2281 void
statUtilPrepare(Signal * signal,StatOp & stat)2282 Trix::statUtilPrepare(Signal* signal, StatOp& stat)
2283 {
2284   StatOp::Util& util = stat.m_util;
2285   D("statUtilPrepare" << V(stat));
2286 
2287   util.m_prepareId = RNIL;
2288   statSendPrepare(signal, stat);
2289 }
2290 
2291 void
statUtilPrepareConf(Signal * signal,Uint32 statPtrI)2292 Trix::statUtilPrepareConf(Signal* signal, Uint32 statPtrI)
2293 {
2294   StatOp& stat = statOpGetPtr(statPtrI);
2295   StatOp::Util& util = stat.m_util;
2296   StatOp::Send& send = stat.m_send;
2297   D("statUtilPrepareConf" << V(stat));
2298 
2299   const UtilPrepareConf* utilConf =
2300     (const UtilPrepareConf*)signal->getDataPtr();
2301   util.m_prepareId = utilConf->prepareId;
2302 
2303   const Uint32 ot = send.m_operationType;
2304   if ((ERROR_INSERTED(18011) && ot == UtilPrepareReq::Read) ||
2305       (ERROR_INSERTED(18012) && ot != UtilPrepareReq::Read))
2306   {
2307     jam();
2308     CLEAR_ERROR_INSERT_VALUE;
2309     UtilExecuteRef* utilRef =
2310       (UtilExecuteRef*)signal->getDataPtrSend();
2311     utilRef->senderData = stat.m_ownPtrI;
2312     utilRef->errorCode = UtilExecuteRef::AllocationError;
2313     utilRef->TCErrorCode = 0;
2314     sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2315                signal, UtilExecuteRef::SignalLength, JBB);
2316     return;
2317   }
2318 
2319   statUtilExecute(signal, stat);
2320 }
2321 
2322 void
statUtilPrepareRef(Signal * signal,Uint32 statPtrI)2323 Trix::statUtilPrepareRef(Signal* signal, Uint32 statPtrI)
2324 {
2325   StatOp& stat = statOpGetPtr(statPtrI);
2326   D("statUtilPrepareRef" << V(stat));
2327 
2328   const UtilPrepareRef* utilRef =
2329     (const UtilPrepareRef*)signal->getDataPtr();
2330   Uint32 errorCode = utilRef->errorCode;
2331   ndbrequire(errorCode != 0);
2332 
2333   switch (errorCode) {
2334   case UtilPrepareRef::PREPARE_SEIZE_ERROR:
2335   case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
2336   case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
2337     errorCode = IndexStatRef::BusyUtilPrepare;
2338     break;
2339   case UtilPrepareRef::DICT_TAB_INFO_ERROR:
2340     errorCode = IndexStatRef::InvalidSysTable;
2341     break;
2342   case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
2343   default:
2344     ndbrequire(false);
2345     break;
2346   }
2347   statOpError(signal, stat, errorCode, __LINE__);
2348 }
2349 
2350 void
statUtilExecute(Signal * signal,StatOp & stat)2351 Trix::statUtilExecute(Signal* signal, StatOp& stat)
2352 {
2353   StatOp::Util& util = stat.m_util;
2354   StatOp::Send& send = stat.m_send;
2355   D("statUtilExecute" << V(stat));
2356 
2357   send.m_prepareId = util.m_prepareId;
2358   statSendExecute(signal, stat);
2359 }
2360 
2361 void
statUtilExecuteConf(Signal * signal,Uint32 statPtrI)2362 Trix::statUtilExecuteConf(Signal* signal, Uint32 statPtrI)
2363 {
2364   StatOp& stat = statOpGetPtr(statPtrI);
2365   StatOp::Attr& attr = stat.m_attr;
2366   StatOp::Send& send = stat.m_send;
2367   D("statUtilExecuteConf" << V(stat));
2368 
2369   if (send.m_operationType == UtilPrepareReq::Read)
2370   {
2371     jam();
2372     SectionHandle handle(this, signal);
2373     Uint32 rattr[20];
2374     Uint32 rdata[2048];
2375     attr.m_attr = rattr;
2376     attr.m_attrMax = 20;
2377     attr.m_attrSize = 0;
2378     attr.m_data = rdata;
2379     attr.m_dataMax = 2048;
2380     attr.m_dataSize = 0;
2381     {
2382       SegmentedSectionPtr ssPtr;
2383       handle.getSection(ssPtr, 0);
2384       ::copy(rattr, ssPtr);
2385     }
2386     {
2387       SegmentedSectionPtr ssPtr;
2388       handle.getSection(ssPtr, 1);
2389       ::copy(rdata, ssPtr);
2390     }
2391     releaseSections(handle);
2392 
2393     const SysTable& sysTable = *send.m_sysTable;
2394     for (Uint32 i = 0; i < sysTable.columnCount; i++)
2395     {
2396       jam();
2397       statDataIn(stat, i);
2398     }
2399   }
2400 
2401   statUtilRelease(signal, stat);
2402 }
2403 
2404 void
statUtilExecuteRef(Signal * signal,Uint32 statPtrI)2405 Trix::statUtilExecuteRef(Signal* signal, Uint32 statPtrI)
2406 {
2407   StatOp& stat = statOpGetPtr(statPtrI);
2408   StatOp::Util& util = stat.m_util;
2409   StatOp::Send& send = stat.m_send;
2410   D("statUtilExecuteRef" << V(stat));
2411 
2412   const UtilExecuteRef* utilRef =
2413     (const UtilExecuteRef*)signal->getDataPtr();
2414   Uint32 errorCode = utilRef->errorCode;
2415   ndbrequire(errorCode != 0);
2416 
2417   switch (errorCode) {
2418   case UtilExecuteRef::TCError:
2419     errorCode = utilRef->TCErrorCode;
2420     ndbrequire(errorCode != 0);
2421     if (send.m_operationType == UtilPrepareReq::Read &&
2422         errorCode == ZNOT_FOUND)
2423     {
2424       jam();
2425       util.m_not_found = true;
2426       errorCode = 0;
2427     }
2428     break;
2429   case UtilExecuteRef::AllocationError:
2430     errorCode = IndexStatRef::BusyUtilExecute;
2431     break;
2432   default:
2433     ndbrequire(false);
2434     break;
2435   }
2436 
2437   if (errorCode != 0)
2438   {
2439     jam();
2440     statOpError(signal, stat, errorCode, __LINE__);
2441     return;
2442   }
2443   statUtilRelease(signal, stat);
2444 }
2445 
2446 void
statUtilRelease(Signal * signal,StatOp & stat)2447 Trix::statUtilRelease(Signal* signal, StatOp& stat)
2448 {
2449   StatOp::Util& util = stat.m_util;
2450   StatOp::Send& send = stat.m_send;
2451   D("statUtilRelease" << V(stat));
2452 
2453   send.m_prepareId = util.m_prepareId;
2454   statSendRelease(signal, stat);
2455 }
2456 
2457 void
statUtilReleaseConf(Signal * signal,Uint32 statPtrI)2458 Trix::statUtilReleaseConf(Signal* signal, Uint32 statPtrI)
2459 {
2460   StatOp& stat = statOpGetPtr(statPtrI);
2461   StatOp::Util& util = stat.m_util;
2462   D("statUtilReleaseConf" << V(stat));
2463 
2464   util.m_prepareId = RNIL;
2465   execute(signal, util.m_cb, 0);
2466 }
2467 
2468 // continue after head table ops
2469 
2470 void
statReadHeadDone(Signal * signal,StatOp & stat)2471 Trix::statReadHeadDone(Signal* signal, StatOp& stat)
2472 {
2473   //UNUSED StatOp::Data& data = stat.m_data;
2474   D("statReadHeadDone" << V(stat));
2475 
2476   switch (stat.m_requestType) {
2477   case IndexStatReq::RT_CLEAN_NEW:
2478     jam();
2479   case IndexStatReq::RT_CLEAN_OLD:
2480     jam();
2481   case IndexStatReq::RT_CLEAN_ALL:
2482     jam();
2483     statCleanBegin(signal, stat);
2484     break;
2485 
2486   case IndexStatReq::RT_SCAN_FRAG:
2487     jam();
2488     statScanBegin(signal, stat);
2489     break;
2490 
2491   case IndexStatReq::RT_DROP_HEAD:
2492     jam();
2493     statDropBegin(signal, stat);
2494     break;
2495 
2496   default:
2497     ndbrequire(false);
2498     break;
2499   }
2500 }
2501 
2502 void
statInsertHeadDone(Signal * signal,StatOp & stat)2503 Trix::statInsertHeadDone(Signal* signal, StatOp& stat)
2504 {
2505   D("statInsertHeadDone" << V(stat));
2506 
2507   switch (stat.m_requestType) {
2508   case IndexStatReq::RT_SCAN_FRAG:
2509     jam();
2510     statScanEnd(signal, stat);
2511     break;
2512   default:
2513     ndbrequire(false);
2514     break;
2515   }
2516 }
2517 
2518 void
statUpdateHeadDone(Signal * signal,StatOp & stat)2519 Trix::statUpdateHeadDone(Signal* signal, StatOp& stat)
2520 {
2521   D("statUpdateHeadDone" << V(stat));
2522 
2523   switch (stat.m_requestType) {
2524   case IndexStatReq::RT_SCAN_FRAG:
2525     jam();
2526     statScanEnd(signal, stat);
2527     break;
2528   default:
2529     ndbrequire(false);
2530     break;
2531   }
2532 }
2533 
2534 void
statDeleteHeadDone(Signal * signal,StatOp & stat)2535 Trix::statDeleteHeadDone(Signal* signal, StatOp& stat)
2536 {
2537   D("statDeleteHeadDone" << V(stat));
2538 
2539   switch (stat.m_requestType) {
2540   case IndexStatReq::RT_DROP_HEAD:
2541     jam();
2542     statDropEnd(signal, stat);
2543     break;
2544   default:
2545     ndbrequire(false);
2546     break;
2547   }
2548 }
2549 
2550 // clean
2551 
2552 void
statCleanBegin(Signal * signal,StatOp & stat)2553 Trix::statCleanBegin(Signal* signal, StatOp& stat)
2554 {
2555   const IndexStatImplReq* req = &stat.m_req;
2556   StatOp::Data& data = stat.m_data;
2557   D("statCleanBegin" << V(stat));
2558 
2559   if (data.m_head_found == true)
2560   {
2561     jam();
2562     if (data.m_tableId != req->tableId &&
2563         stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2564     {
2565       jam();
2566       // must run ndb_index_stat --drop
2567       statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2568       return;
2569     }
2570   }
2571   else
2572   {
2573     if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2574     {
2575       jam();
2576       // happens normally on first stats scan
2577       stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
2578     }
2579   }
2580   statCleanPrepare(signal, stat);
2581 }
2582 
2583 void
statCleanPrepare(Signal * signal,StatOp & stat)2584 Trix::statCleanPrepare(Signal* signal, StatOp& stat)
2585 {
2586   const IndexStatImplReq* req = &stat.m_req;
2587   StatOp::Data& data = stat.m_data;
2588   StatOp::Clean& clean = stat.m_clean;
2589   StatOp::Send& send = stat.m_send;
2590   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2591   D("statCleanPrepare" << V(stat));
2592 
2593   // count of deleted samples is just for info
2594   clean.m_cleanCount = 0;
2595 
2596   const Uint32 ao_list[] = {
2597     0,  // INDEX_ID
2598     1,  // INDEX_VERSION
2599     2,  // SAMPLE_VERSION
2600     3   // STAT_KEY
2601   };
2602   const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2603 
2604   ndbrequire(req->fragId == ZNIL);
2605   subRec->m_flags = 0;
2606   subRec->requestType = STAT_CLEAN;
2607   subRec->schemaTransId = req->transId;
2608   subRec->userReference = 0; // not used
2609   subRec->connectionPtr = RNIL;
2610   subRec->subscriptionId = rand();
2611   subRec->subscriptionKey = rand();
2612   subRec->prepareId = RNIL;
2613   subRec->indexType = 0; // not used
2614   subRec->sourceTableId = g_statMetaSampleX1.indexId;
2615   subRec->targetTableId = RNIL;
2616   subRec->noOfIndexColumns = ao_size;
2617   subRec->noOfKeyColumns = 0;
2618   subRec->parallelism = 16;
2619   subRec->fragCount = 0;
2620   subRec->fragId = ZNIL;
2621   subRec->syncPtr = RNIL;
2622   subRec->errorCode = BuildIndxRef::NoError;
2623   subRec->subscriptionCreated = false;
2624   subRec->pendingSubSyncContinueConf = false;
2625   subRec->expectedConf = 0;
2626   subRec->m_rows_processed = 0;
2627   subRec->m_gci = 0;
2628 
2629   AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2630   ndbrequire(ao_buf.isEmpty());
2631   ao_buf.append(ao_list, ao_size);
2632 
2633   // create TUX bounds
2634   clean.m_bound[0] = TuxBoundInfo::BoundEQ;
2635   clean.m_bound[1] = AttributeHeader(0, 4).m_value;
2636   clean.m_bound[2] = data.m_indexId;
2637   clean.m_bound[3] = TuxBoundInfo::BoundEQ;
2638   clean.m_bound[4] = AttributeHeader(1, 4).m_value;
2639   clean.m_bound[5] = data.m_indexVersion;
2640   switch (stat.m_requestType) {
2641   case IndexStatReq::RT_CLEAN_NEW:
2642     D("statCleanPrepare delete sample versions > " << data.m_sampleVersion);
2643     clean.m_bound[6] = TuxBoundInfo::BoundLT;
2644     clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2645     clean.m_bound[8] = data.m_sampleVersion;
2646     clean.m_boundCount = 3;
2647     break;
2648   case IndexStatReq::RT_CLEAN_OLD:
2649     D("statCleanPrepare delete sample versions < " << data.m_sampleVersion);
2650     clean.m_bound[6] = TuxBoundInfo::BoundGT;
2651     clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2652     clean.m_bound[8] = data.m_sampleVersion;
2653     clean.m_boundCount = 3;
2654     break;
2655   case IndexStatReq::RT_CLEAN_ALL:
2656     D("statCleanPrepare delete all sample versions");
2657     clean.m_boundCount = 2;
2658     break;
2659   default:
2660     ndbrequire(false);
2661     break;
2662   }
2663   clean.m_boundSize = 3 * clean.m_boundCount;
2664 
2665   // TRIX traps the CONF
2666   send.m_sysTable = &g_statMetaSample;
2667   send.m_operationType = UtilPrepareReq::Delete;
2668   statSendPrepare(signal, stat);
2669 }
2670 
2671 void
statCleanExecute(Signal * signal,StatOp & stat)2672 Trix::statCleanExecute(Signal* signal, StatOp& stat)
2673 {
2674   StatOp::Data& data = stat.m_data;
2675   StatOp::Send& send = stat.m_send;
2676   StatOp::Clean& clean = stat.m_clean;
2677   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2678   D("statCleanExecute" << V(stat));
2679 
2680   CRASH_INSERTION(18025);
2681 
2682   SectionHandle handle(this, signal);
2683   ndbrequire(handle.m_cnt == 2);
2684 
2685   // ATTR_INFO
2686   AttributeHeader ah[4];
2687   SegmentedSectionPtr ptr0;
2688   handle.getSection(ptr0, SubTableData::ATTR_INFO);
2689   ndbrequire(ptr0.sz == 4);
2690   ::copy((Uint32*)ah, ptr0);
2691   ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
2692   ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
2693   ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
2694   // read via TUP rounds bytes to words
2695   const Uint32 kz = ah[3].getDataSize();
2696   ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
2697 
2698   // AFTER_VALUES
2699   const Uint32 avmax = 3 + MAX_INDEX_STAT_KEY_SIZE;
2700   Uint32 av[avmax];
2701   SegmentedSectionPtr ptr1;
2702   handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2703   ndbrequire(ptr1.sz <= avmax);
2704   ::copy(av, ptr1);
2705   ndbrequire(data.m_indexId == av[0]);
2706   ndbrequire(data.m_indexVersion == av[1]);
2707   data.m_sampleVersion = av[2];
2708   data.m_statKey = &av[3];
2709   const unsigned char* kp = (const unsigned char*)data.m_statKey;
2710   const Uint32 kb = kp[0] + (kp[1] << 8);
2711   // key is not empty
2712   ndbrequire(kb != 0);
2713   ndbrequire(kz == ((2 + kb) + 3) / 4);
2714 
2715   clean.m_cleanCount++;
2716   releaseSections(handle);
2717 
2718   const Uint32 rt = stat.m_requestType;
2719   if ((ERROR_INSERTED(18021) && rt == IndexStatReq::RT_CLEAN_NEW) ||
2720       (ERROR_INSERTED(18022) && rt == IndexStatReq::RT_CLEAN_OLD) ||
2721       (ERROR_INSERTED(18023) && rt == IndexStatReq::RT_CLEAN_ALL))
2722   {
2723     jam();
2724     CLEAR_ERROR_INSERT_VALUE;
2725     UtilExecuteRef* utilRef =
2726       (UtilExecuteRef*)signal->getDataPtrSend();
2727     utilRef->senderData = stat.m_ownPtrI;
2728     utilRef->errorCode = UtilExecuteRef::TCError;
2729     utilRef->TCErrorCode = 626;
2730     sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2731                signal, UtilExecuteRef::SignalLength, JBB);
2732     subRec->expectedConf++;
2733     return;
2734   }
2735 
2736   // TRIX traps the CONF
2737   send.m_sysTable = &g_statMetaSample;
2738   send.m_operationType = UtilPrepareReq::Delete;
2739   send.m_prepareId = subRec->prepareId;
2740   subRec->expectedConf++;
2741   statSendExecute(signal, stat);
2742 }
2743 
2744 void
statCleanRelease(Signal * signal,StatOp & stat)2745 Trix::statCleanRelease(Signal* signal, StatOp& stat)
2746 {
2747   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2748   D("statCleanRelease" << V(stat) << V(subRec->errorCode));
2749 
2750   if (subRec->errorCode != 0)
2751   {
2752     jam();
2753     statOpError(signal, stat, subRec->errorCode, __LINE__);
2754     return;
2755   }
2756   statCleanEnd(signal, stat);
2757 }
2758 
2759 void
statCleanEnd(Signal * signal,StatOp & stat)2760 Trix::statCleanEnd(Signal* signal, StatOp& stat)
2761 {
2762   D("statCleanEnd" << V(stat));
2763   statOpSuccess(signal, stat);
2764 }
2765 
2766 // scan
2767 
2768 void
statScanBegin(Signal * signal,StatOp & stat)2769 Trix::statScanBegin(Signal* signal, StatOp& stat)
2770 {
2771   const IndexStatImplReq* req = &stat.m_req;
2772   StatOp::Data& data = stat.m_data;
2773   D("statScanBegin" << V(stat));
2774 
2775   if (data.m_head_found == true &&
2776       data.m_tableId != req->tableId)
2777   {
2778     jam();
2779     statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2780     return;
2781   }
2782   data.m_tableId = req->tableId;
2783   statScanPrepare(signal, stat);
2784 }
2785 
2786 void
statScanPrepare(Signal * signal,StatOp & stat)2787 Trix::statScanPrepare(Signal* signal, StatOp& stat)
2788 {
2789   const IndexStatImplReq* req = &stat.m_req;
2790   StatOp::Data& data = stat.m_data;
2791   StatOp::Scan& scan = stat.m_scan;
2792   StatOp::Send& send = stat.m_send;
2793   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2794   D("statScanPrepare" << V(stat));
2795 
2796   // update sample version prior to scan
2797   if (data.m_head_found == false)
2798     data.m_sampleVersion = 0;
2799   data.m_sampleVersion += 1;
2800 
2801   // zero totals
2802   scan.m_sampleCount = 0;
2803   scan.m_keyBytes = 0;
2804 
2805   const Uint32 ao_list[] = {
2806     AttributeHeader::INDEX_STAT_KEY,
2807     AttributeHeader::INDEX_STAT_VALUE
2808   };
2809   const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2810 
2811   ndbrequire(req->fragId != ZNIL);
2812   subRec->m_flags = 0;
2813   subRec->requestType = STAT_SCAN;
2814   subRec->schemaTransId = req->transId;
2815   subRec->userReference = 0; // not used
2816   subRec->connectionPtr = RNIL;
2817   subRec->subscriptionId = rand();
2818   subRec->subscriptionKey = rand();
2819   subRec->prepareId = RNIL;
2820   subRec->indexType = 0; // not used
2821   subRec->sourceTableId = data.m_indexId;
2822   subRec->targetTableId = RNIL;
2823   subRec->noOfIndexColumns = ao_size;
2824   subRec->noOfKeyColumns = 0;
2825   subRec->parallelism = 16;
2826   subRec->fragCount = 0; // XXX Suma currently checks all frags
2827   subRec->fragId = req->fragId;
2828   subRec->syncPtr = RNIL;
2829   subRec->errorCode = BuildIndxRef::NoError;
2830   subRec->subscriptionCreated = false;
2831   subRec->pendingSubSyncContinueConf = false;
2832   subRec->expectedConf = 0;
2833   subRec->m_rows_processed = 0;
2834   subRec->m_gci = 0;
2835 
2836   AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2837   ndbrequire(ao_buf.isEmpty());
2838   ao_buf.append(ao_list, ao_size);
2839 
2840   // TRIX traps the CONF
2841   send.m_sysTable = &g_statMetaSample;
2842   send.m_operationType = UtilPrepareReq::Insert;
2843   statSendPrepare(signal, stat);
2844 }
2845 
2846 void
statScanExecute(Signal * signal,StatOp & stat)2847 Trix::statScanExecute(Signal* signal, StatOp& stat)
2848 {
2849   StatOp::Data& data = stat.m_data;
2850   StatOp::Scan& scan = stat.m_scan;
2851   StatOp::Send& send = stat.m_send;
2852   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2853   D("statScanExecute" << V(stat));
2854 
2855   CRASH_INSERTION(18026);
2856 
2857   SectionHandle handle(this, signal);
2858   ndbrequire(handle.m_cnt == 2);
2859 
2860   // ATTR_INFO
2861   AttributeHeader ah[2];
2862   SegmentedSectionPtr ptr0;
2863   handle.getSection(ptr0, SubTableData::ATTR_INFO);
2864   ndbrequire(ptr0.sz == 2);
2865   ::copy((Uint32*)ah, ptr0);
2866   ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
2867   ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
2868   // read via TUP rounds bytes to words
2869   const Uint32 kz = ah[0].getDataSize();
2870   const Uint32 vz = ah[1].getDataSize();
2871   ndbrequire(kz != 0 && vz != 0);
2872 
2873   // AFTER_VALUES
2874   const Uint32 avmax = MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
2875   Uint32 av[avmax];
2876   SegmentedSectionPtr ptr1;
2877   handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2878   ndbrequire(ptr1.sz <= avmax);
2879   ::copy(av, ptr1);
2880   data.m_statKey = &av[0];
2881   data.m_statValue = &av[kz];
2882   const unsigned char* kp = (const unsigned char*)data.m_statKey;
2883   const unsigned char* vp = (const unsigned char*)data.m_statValue;
2884   const Uint32 kb = kp[0] + (kp[1] << 8);
2885   const Uint32 vb = vp[0] + (vp[1] << 8);
2886   // key and value are not empty
2887   ndbrequire(kb != 0 && vb != 0);
2888   ndbrequire(kz == ((2 + kb) + 3) / 4);
2889   ndbrequire(vz == ((2 + vb) + 3) / 4);
2890 
2891   scan.m_sampleCount++;
2892   scan.m_keyBytes += kb;
2893   releaseSections(handle);
2894 
2895   if (ERROR_INSERTED(18024))
2896   {
2897     jam();
2898     CLEAR_ERROR_INSERT_VALUE;
2899     UtilExecuteRef* utilRef =
2900       (UtilExecuteRef*)signal->getDataPtrSend();
2901     utilRef->senderData = stat.m_ownPtrI;
2902     utilRef->errorCode = UtilExecuteRef::TCError;
2903     utilRef->TCErrorCode = 630;
2904     sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2905                signal, UtilExecuteRef::SignalLength, JBB);
2906     subRec->expectedConf++;
2907     return;
2908   }
2909 
2910   // TRIX traps the CONF
2911   send.m_sysTable = &g_statMetaSample;
2912   send.m_operationType = UtilPrepareReq::Insert;
2913   send.m_prepareId = subRec->prepareId;
2914   subRec->expectedConf++;
2915   statSendExecute(signal, stat);
2916 }
2917 
2918 void
statScanRelease(Signal * signal,StatOp & stat)2919 Trix::statScanRelease(Signal* signal, StatOp& stat)
2920 {
2921   StatOp::Data& data = stat.m_data;
2922   StatOp::Scan& scan = stat.m_scan;
2923   SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2924   D("statScanRelease" << V(stat) << V(subRec->errorCode));
2925 
2926   if (subRec->errorCode != 0)
2927   {
2928     jam();
2929     statOpError(signal, stat, subRec->errorCode, __LINE__);
2930     return;
2931   }
2932   subRec->requestType = STAT_UTIL;
2933 
2934   const Uint32 now = (Uint32)time(0);
2935   data.m_loadTime = now;
2936   data.m_sampleCount = scan.m_sampleCount;
2937   data.m_keyBytes = scan.m_keyBytes;
2938   data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
2939 
2940   if (data.m_head_found == false)
2941   {
2942     jam();
2943     statHeadInsert(signal, stat);
2944   }
2945   else
2946   {
2947     jam();
2948     statHeadUpdate(signal, stat);
2949   }
2950 }
2951 
2952 void
statScanEnd(Signal * signal,StatOp & stat)2953 Trix::statScanEnd(Signal* signal, StatOp& stat)
2954 {
2955   StatOp::Data& data = stat.m_data;
2956   const IndexStatImplReq* req = &stat.m_req;
2957   D("statScanEnd" << V(stat));
2958 
2959   /*
2960    * TRIX reports stats load time to TUX for proper stats monitoring.
2961    * Passing this via DBDICT RT_START_MON is not feasible.  For MT-LQH
2962    * we prefer DbtuxProxy to avoid introducing MT-LQH into TRIX.
2963    */
2964 
2965 #if trix_index_stat_rep_to_tux_instance
2966   Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
2967   BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
2968 #else
2969   BlockReference tuxRef = DBTUX_REF;
2970 #endif
2971 
2972   IndexStatRep* rep = (IndexStatRep*)signal->getDataPtrSend();
2973   rep->senderRef = reference();
2974   rep->senderData = 0;
2975   rep->requestType = IndexStatRep::RT_UPDATE_CONF;
2976   rep->requestFlag = 0;
2977   rep->indexId = req->indexId;
2978   rep->indexVersion = req->indexVersion;
2979   rep->tableId = req->tableId;
2980   rep->fragId = req->fragId;
2981   rep->loadTime = data.m_loadTime;
2982   sendSignal(tuxRef, GSN_INDEX_STAT_REP,
2983              signal, IndexStatRep::SignalLength, JBB);
2984 
2985   statOpSuccess(signal, stat);
2986 }
2987 
2988 // drop
2989 
2990 void
statDropBegin(Signal * signal,StatOp & stat)2991 Trix::statDropBegin(Signal* signal, StatOp& stat)
2992 {
2993   StatOp::Data& data = stat.m_data;
2994   D("statDropBegin" << V(stat));
2995 
2996   if (data.m_head_found == true)
2997   {
2998     jam();
2999     statHeadDelete(signal, stat);
3000     return;
3001   }
3002   statDropEnd(signal, stat);
3003 }
3004 
3005 void
statDropEnd(Signal * signal,StatOp & stat)3006 Trix::statDropEnd(Signal* signal, StatOp& stat)
3007 {
3008   D("statDropEnd");
3009   statOpSuccess(signal, stat);
3010 }
3011 
3012 // send
3013 
3014 void
statSendPrepare(Signal * signal,StatOp & stat)3015 Trix::statSendPrepare(Signal* signal, StatOp& stat)
3016 {
3017   StatOp::Send& send = stat.m_send;
3018   const IndexStatImplReq* req = &stat.m_req;
3019   const SysTable& sysTable = *send.m_sysTable;
3020   D("statSendPrepare" << V(stat));
3021 
3022   UtilPrepareReq* utilReq =
3023     (UtilPrepareReq*)signal->getDataPtrSend();
3024   utilReq->senderData = stat.m_ownPtrI;
3025   utilReq->senderRef = reference();
3026   utilReq->schemaTransId = req->transId;
3027 
3028   Uint32 wbuf[256];
3029   LinearWriter w(&wbuf[0], sizeof(wbuf) >> 2);
3030 
3031   w.first();
3032   w.add(UtilPrepareReq::NoOfOperations, 1);
3033   w.add(UtilPrepareReq::OperationType, send.m_operationType);
3034   w.add(UtilPrepareReq::TableId, sysTable.tableId);
3035 
3036   Uint32 i;
3037   for (i = 0; i < sysTable.columnCount; i++) {
3038     const SysColumn& c = sysTable.columnList[i];
3039     switch (send.m_operationType) {
3040     case UtilPrepareReq::Read:
3041     case UtilPrepareReq::Insert:
3042     case UtilPrepareReq::Update:
3043       jam();
3044       w.add(UtilPrepareReq::AttributeId, i);
3045       break;
3046     case UtilPrepareReq::Delete:
3047       jam();
3048       if (c.keyFlag)
3049         w.add(UtilPrepareReq::AttributeId, i);
3050       break;
3051     default:
3052       ndbrequire(false);
3053       break;
3054     }
3055   }
3056 
3057   LinearSectionPtr ptr[3];
3058   ptr[0].p = &wbuf[0];
3059   ptr[0].sz = w.getWordsUsed();
3060   sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
3061              signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
3062 }
3063 
3064 void
statSendExecute(Signal * signal,StatOp & stat)3065 Trix::statSendExecute(Signal* signal, StatOp& stat)
3066 {
3067   D("statSendExecute" << V(stat));
3068   StatOp::Send& send = stat.m_send;
3069   StatOp::Attr& attr = stat.m_attr;
3070   const SysTable& sysTable = *send.m_sysTable;
3071 
3072   UtilExecuteReq* utilReq =
3073     (UtilExecuteReq*)signal->getDataPtrSend();
3074   utilReq->senderData = stat.m_ownPtrI;
3075   utilReq->senderRef = reference();
3076   utilReq->prepareId = send.m_prepareId;
3077   utilReq->scanTakeOver = 0;
3078 
3079   Uint32 wattr[20];
3080   Uint32 wdata[2048];
3081   attr.m_attr = wattr;
3082   attr.m_attrMax = 20;
3083   attr.m_attrSize = 0;
3084   attr.m_data = wdata;
3085   attr.m_dataMax = 2048;
3086   attr.m_dataSize = 0;
3087 
3088   for (Uint32 i = 0; i < sysTable.columnCount; i++) {
3089     const SysColumn& c = sysTable.columnList[i];
3090     switch (send.m_operationType) {
3091     case UtilPrepareReq::Read:
3092     case UtilPrepareReq::Insert:
3093     case UtilPrepareReq::Update:
3094       jam();
3095       statDataOut(stat, i);
3096       break;
3097     case UtilPrepareReq::Delete:
3098       jam();
3099       if (c.keyFlag)
3100         statDataOut(stat, i);
3101       break;
3102     default:
3103       ndbrequire(false);
3104       break;
3105     }
3106   }
3107 
3108   LinearSectionPtr ptr[3];
3109   ptr[0].p = attr.m_attr;
3110   ptr[0].sz = attr.m_attrSize;
3111   ptr[1].p = attr.m_data;
3112   ptr[1].sz = attr.m_dataSize;
3113   sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
3114              signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
3115 }
3116 
3117 void
statSendRelease(Signal * signal,StatOp & stat)3118 Trix::statSendRelease(Signal* signal, StatOp& stat)
3119 {
3120   D("statSendRelease" << V(stat));
3121   StatOp::Send& send = stat.m_send;
3122   ndbrequire(send.m_prepareId != RNIL);
3123 
3124   UtilReleaseReq* utilReq =
3125     (UtilReleaseReq*)signal->getDataPtrSend();
3126   utilReq->senderData = stat.m_ownPtrI;
3127   utilReq->prepareId = send.m_prepareId;
3128   sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
3129              signal, UtilReleaseReq::SignalLength, JBB);
3130 }
3131 
3132 // data
3133 
3134 void
statDataPtr(StatOp & stat,Uint32 i,Uint32 * & dptr,Uint32 & bytes)3135 Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
3136 {
3137   StatOp::Data& data = stat.m_data;
3138   StatOp::Send& send = stat.m_send;
3139 
3140   const SysTable& sysTable = *send.m_sysTable;
3141   ndbrequire(i < sysTable.columnCount);
3142   //UNUSED const SysColumn& c = sysTable.columnList[i];
3143 
3144   if (&sysTable == &g_statMetaHead)
3145   {
3146     switch (i) {
3147     case 0:
3148       dptr = &data.m_indexId;
3149       bytes = 4;
3150       break;
3151     case 1:
3152       dptr = &data.m_indexVersion;
3153       bytes = 4;
3154       break;
3155     case 2:
3156       dptr = &data.m_tableId;
3157       bytes = 4;
3158       break;
3159     case 3:
3160       dptr = &data.m_fragCount;
3161       bytes = 4;
3162       break;
3163     case 4:
3164       dptr = &data.m_valueFormat;
3165       bytes = 4;
3166       break;
3167     case 5:
3168       dptr = &data.m_sampleVersion;
3169       bytes = 4;
3170       break;
3171     case 6:
3172       dptr = &data.m_loadTime;
3173       bytes = 4;
3174       break;
3175     case 7:
3176       dptr = &data.m_sampleCount;
3177       bytes = 4;
3178       break;
3179     case 8:
3180       dptr = &data.m_keyBytes;
3181       bytes = 4;
3182       break;
3183     default:
3184       ndbrequire(false);
3185       break;
3186     }
3187     return;
3188   }
3189 
3190   if (&sysTable == &g_statMetaSample)
3191   {
3192     switch (i) {
3193     case 0:
3194       dptr = &data.m_indexId;
3195       bytes = 4;
3196       break;
3197     case 1:
3198       dptr = &data.m_indexVersion;
3199       bytes = 4;
3200       break;
3201     case 2:
3202       dptr = &data.m_sampleVersion;
3203       bytes = 4;
3204       break;
3205     case 3:
3206       {
3207         dptr = data.m_statKey;
3208         const uchar* p = (uchar*)dptr;
3209         ndbrequire(p != 0);
3210         bytes = 2 + p[0] + (p[1] << 8);
3211       }
3212       break;
3213     case 4:
3214       {
3215         dptr = data.m_statValue;
3216         const uchar* p = (uchar*)dptr;
3217         ndbrequire(p != 0);
3218         bytes = 2 + p[0] + (p[1] << 8);
3219       }
3220       break;
3221     default:
3222       ndbrequire(false);
3223       break;
3224     }
3225     return;
3226   }
3227 
3228   ndbrequire(false);
3229 }
3230 
3231 void
statDataOut(StatOp & stat,Uint32 i)3232 Trix::statDataOut(StatOp& stat, Uint32 i)
3233 {
3234   StatOp::Attr& attr = stat.m_attr;
3235   Uint32* dptr = 0;
3236   Uint32 bytes = 0;
3237   statDataPtr(stat, i, dptr, bytes);
3238 
3239   ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3240   AttributeHeader::init(&attr.m_attr[attr.m_attrSize], i, bytes);
3241   attr.m_attrSize++;
3242 
3243   Uint32 words = (bytes + 3) / 4;
3244   ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3245   Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
3246   memcpy(dst, dptr, bytes);
3247   while (bytes < words * 4)
3248     dst[bytes++] = 0;
3249   attr.m_dataSize += words;
3250   D("statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
3251 }
3252 
3253 void
statDataIn(StatOp & stat,Uint32 i)3254 Trix::statDataIn(StatOp& stat, Uint32 i)
3255 {
3256   StatOp::Attr& attr = stat.m_attr;
3257   Uint32* dptr = 0;
3258   Uint32 bytes = 0;
3259   statDataPtr(stat, i, dptr, bytes);
3260 
3261   ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3262   const AttributeHeader& ah = attr.m_attr[attr.m_attrSize];
3263   attr.m_attrSize++;
3264 
3265   ndbrequire(ah.getByteSize() == bytes);
3266   Uint32 words = (bytes + 3) / 4;
3267   ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3268   const char* src = (const char*)&attr.m_data[attr.m_dataSize];
3269   memcpy(dptr, src, bytes);
3270   attr.m_dataSize += words;
3271   D("statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
3272 }
3273 
3274 // abort ongoing
3275 
3276 void
statAbortUtil(Signal * signal,StatOp & stat)3277 Trix::statAbortUtil(Signal* signal, StatOp& stat)
3278 {
3279   StatOp::Util& util = stat.m_util;
3280   D("statAbortUtil" << V(stat));
3281 
3282   ndbrequire(util.m_prepareId != RNIL);
3283   util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
3284   util.m_cb.m_callbackData = stat.m_ownPtrI;
3285   statUtilRelease(signal, stat);
3286 }
3287 
3288 void
statAbortUtilCB(Signal * signal,Uint32 statPtrI,Uint32 ret)3289 Trix::statAbortUtilCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
3290 {
3291   StatOp& stat = statOpGetPtr(statPtrI);
3292   D("statAbortUtilCB" << V(stat) << V(ret));
3293 
3294   ndbrequire(ret == 0);
3295   statOpAbort(signal, stat);
3296 }
3297 
3298 // conf and ref
3299 
3300 void
statOpSuccess(Signal * signal,StatOp & stat)3301 Trix::statOpSuccess(Signal* signal, StatOp& stat)
3302 {
3303   StatOp::Data& data = stat.m_data;
3304   D("statOpSuccess" << V(stat));
3305 
3306   if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
3307     statOpEvent(stat, "I", "created %u samples", data.m_sampleCount);
3308 
3309   statOpConf(signal, stat);
3310   statOpRelease(stat);
3311 }
3312 
3313 void
statOpConf(Signal * signal,StatOp & stat)3314 Trix::statOpConf(Signal* signal, StatOp& stat)
3315 {
3316   const IndexStatImplReq* req = &stat.m_req;
3317   D("statOpConf" << V(stat));
3318 
3319   IndexStatImplConf* conf = (IndexStatImplConf*)signal->getDataPtrSend();
3320   conf->senderRef = reference();
3321   conf->senderData = req->senderData;
3322   sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
3323              signal, IndexStatImplConf::SignalLength, JBB);
3324 }
3325 
3326 void
statOpError(Signal * signal,StatOp & stat,Uint32 errorCode,Uint32 errorLine,const Uint32 * supress)3327 Trix::statOpError(Signal* signal, StatOp& stat,
3328                   Uint32 errorCode, Uint32 errorLine,
3329                   const Uint32 * supress)
3330 {
3331   D("statOpError" << V(stat) << V(errorCode) << V(errorLine));
3332 
3333   if (supress)
3334   {
3335     for (Uint32 i = 0; supress[i] != 0; i++)
3336     {
3337       if (errorCode == supress[i])
3338       {
3339         goto do_supress;
3340       }
3341     }
3342   }
3343   statOpEvent(stat, "W", "error %u line %u", errorCode, errorLine);
3344 
3345 do_supress:
3346   ndbrequire(stat.m_errorCode == 0);
3347   stat.m_errorCode = errorCode;
3348   stat.m_errorLine = errorLine;
3349   statOpAbort(signal, stat);
3350 }
3351 
3352 void
statOpAbort(Signal * signal,StatOp & stat)3353 Trix::statOpAbort(Signal* signal, StatOp& stat)
3354 {
3355   StatOp::Util& util = stat.m_util;
3356   D("statOpAbort" << V(stat));
3357 
3358   if (util.m_prepareId != RNIL)
3359   {
3360     jam();
3361     // returns here when done
3362     statAbortUtil(signal, stat);
3363     return;
3364   }
3365   statOpRef(signal, stat);
3366   statOpRelease(stat);
3367 }
3368 
3369 void
statOpRef(Signal * signal,StatOp & stat)3370 Trix::statOpRef(Signal* signal, StatOp& stat)
3371 {
3372   const IndexStatImplReq* req = &stat.m_req;
3373   D("statOpRef" << V(stat));
3374 
3375   statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
3376 }
3377 
3378 void
statOpRef(Signal * signal,const IndexStatImplReq * req,Uint32 errorCode,Uint32 errorLine)3379 Trix::statOpRef(Signal* signal, const IndexStatImplReq* req,
3380                 Uint32 errorCode, Uint32 errorLine)
3381 {
3382   D("statOpRef" << V(errorCode) << V(errorLine));
3383 
3384   IndexStatImplRef* ref = (IndexStatImplRef*)signal->getDataPtrSend();
3385   ref->senderRef = reference();
3386   ref->senderData = req->senderData;
3387   ref->errorCode = errorCode;
3388   ref->errorLine = errorLine;
3389   sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
3390              signal, IndexStatImplRef::SignalLength, JBB);
3391 }
3392 
3393 void
statOpEvent(StatOp & stat,const char * level,const char * msg,...)3394 Trix::statOpEvent(StatOp& stat, const char* level, const char* msg, ...)
3395 {
3396   //UNUSED const IndexStatImplReq* req = &stat.m_req;
3397   StatOp::Data& data = stat.m_data;
3398 
3399   char tmp1[100];
3400   va_list ap;
3401   va_start(ap, msg);
3402   BaseString::vsnprintf(tmp1, sizeof(tmp1), msg, ap);
3403   va_end(ap);
3404 
3405   char tmp2[100];
3406   BaseString::snprintf(tmp2, sizeof(tmp2),
3407                        "index %u stats version %u: %s: %s",
3408                        data.m_indexId, data.m_sampleVersion,
3409                        stat.m_requestName, tmp1);
3410 
3411   D("statOpEvent" << V(level) << V(tmp2));
3412 
3413   if (level[0] == 'I')
3414     infoEvent("%s", tmp2);
3415   if (level[0] == 'W')
3416     warningEvent("%s", tmp2);
3417 }
3418 
3419 // debug
3420 
3421 class NdbOut&
operator <<(NdbOut & out,const Trix::StatOp & stat)3422 operator<<(NdbOut& out, const Trix::StatOp& stat)
3423 {
3424   out << "[";
3425   out << " i:" << stat.m_ownPtrI;
3426   out << " head_found:" << stat.m_data.m_head_found;
3427   out << " ]";
3428   return out;
3429 }
3430 
3431 
3432 BLOCK_FUNCTIONS(Trix)
3433 
3434 template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);
3435