1 /*
2 Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "Trix.hpp"
26
27 #include <cstring>
28 #include <string.h>
29 #include <kernel_types.h>
30 #include <NdbOut.hpp>
31
32 #include <signaldata/ReadNodesConf.hpp>
33 #include <signaldata/NodeFailRep.hpp>
34 #include <signaldata/DumpStateOrd.hpp>
35 #include <signaldata/GetTabInfo.hpp>
36 #include <signaldata/DictTabInfo.hpp>
37 #include <signaldata/CopyData.hpp>
38 #include <signaldata/BuildIndxImpl.hpp>
39 #include <signaldata/BuildFKImpl.hpp>
40 #include <signaldata/SumaImpl.hpp>
41 #include <signaldata/UtilPrepare.hpp>
42 #include <signaldata/UtilExecute.hpp>
43 #include <signaldata/UtilRelease.hpp>
44 #include <SectionReader.hpp>
45 #include <AttributeHeader.hpp>
46 #include <signaldata/TcKeyReq.hpp>
47
48 #include <signaldata/DbinfoScan.hpp>
49 #include <signaldata/TransIdAI.hpp>
50 #include <signaldata/WaitGCP.hpp>
51
52 #define JAM_FILE_ID 433
53
54
55 #define CONSTRAINT_VIOLATION 893
56 #define TUPLE_NOT_FOUND 626
57 #define FK_NO_PARENT_ROW_EXISTS 21033
58
59 static
60 bool
check_timeout(Uint32 errCode)61 check_timeout(Uint32 errCode)
62 {
63 switch(errCode){
64 case 266:
65 return true;
66 }
67 return false;
68 }
69
70 #define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
71
72 /**
73 *
74 */
Trix(Block_context & ctx)75 Trix::Trix(Block_context& ctx) :
76 SimulatedBlock(TRIX, ctx),
77 c_theNodes(c_theNodeRecPool),
78 c_masterNodeId(0),
79 c_masterTrixRef(0),
80 c_noNodesFailed(0),
81 c_noActiveNodes(0),
82 c_theSubscriptions(c_theSubscriptionRecPool)
83 {
84 BLOCK_CONSTRUCTOR(Trix);
85
86 // Add received signals
87 addRecSignal(GSN_READ_CONFIG_REQ, &Trix::execREAD_CONFIG_REQ);
88 addRecSignal(GSN_STTOR, &Trix::execSTTOR);
89 addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT
90 addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
91 addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
92 addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
93 addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
94 addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
95 addRecSignal(GSN_DBINFO_SCANREQ, &Trix::execDBINFO_SCANREQ);
96
97 // Index build
98 addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Trix::execBUILD_INDX_IMPL_REQ);
99 // Dump testing
100 addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &Trix::execBUILD_INDX_IMPL_CONF);
101 addRecSignal(GSN_BUILD_INDX_IMPL_REF, &Trix::execBUILD_INDX_IMPL_REF);
102
103 addRecSignal(GSN_COPY_DATA_IMPL_REQ, &Trix::execCOPY_DATA_IMPL_REQ);
104 addRecSignal(GSN_BUILD_FK_IMPL_REQ, &Trix::execBUILD_FK_IMPL_REQ);
105
106 addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
107 addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
108 addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
109 addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
110 addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
111 addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);
112
113
114 // Suma signals
115 addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
116 addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
117 addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
118 addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
119 addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
120 addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
121 addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
122 addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);
123
124 addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
125 addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);
126
127 // index stats
128 addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
129 addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
130 addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);
131
132 // index stats sys tables
133 c_statGetMetaDone = false;
134 }
135
136 /**
137 *
138 */
~Trix()139 Trix::~Trix()
140 {
141 }
142
143 void
execREAD_CONFIG_REQ(Signal * signal)144 Trix::execREAD_CONFIG_REQ(Signal* signal)
145 {
146 jamEntry();
147
148 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
149
150 Uint32 ref = req->senderRef;
151 Uint32 senderData = req->senderData;
152
153 const ndb_mgm_configuration_iterator * p =
154 m_ctx.m_config.getOwnConfigIterator();
155 ndbrequire(p != 0);
156
157 c_maxUIBuildBatchSize = 64;
158 ndb_mgm_get_int_parameter(p, CFG_DB_UI_BUILD_MAX_BATCHSIZE,
159 &c_maxUIBuildBatchSize);
160
161 c_maxFKBuildBatchSize = 64;
162 ndb_mgm_get_int_parameter(p, CFG_DB_FK_BUILD_MAX_BATCHSIZE,
163 &c_maxFKBuildBatchSize);
164
165 c_maxReorgBuildBatchSize = 64;
166 ndb_mgm_get_int_parameter(p, CFG_DB_REORG_BUILD_MAX_BATCHSIZE,
167 &c_maxReorgBuildBatchSize);
168
169 // Allocate pool sizes
170 c_theAttrOrderBufferPool.setSize(100);
171 c_theSubscriptionRecPool.setSize(100);
172 c_statOpPool.setSize(5);
173
174 SubscriptionRecord_list subscriptions(c_theSubscriptionRecPool);
175 SubscriptionRecPtr subptr;
176 while (subscriptions.seizeFirst(subptr) == true) {
177 new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
178 }
179 while (subscriptions.releaseFirst());
180
181 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
182 conf->senderRef = reference();
183 conf->senderData = senderData;
184 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
185 ReadConfigConf::SignalLength, JBB);
186 }
187
188 /**
189 *
190 */
execSTTOR(Signal * signal)191 void Trix::execSTTOR(Signal* signal)
192 {
193 jamEntry();
194
195 //const Uint32 startphase = signal->theData[1];
196 const Uint32 theSignalKey = signal->theData[6];
197
198 signal->theData[0] = theSignalKey;
199 signal->theData[3] = 1;
200 signal->theData[4] = 255; // No more start phases from missra
201 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
202 return;
203 }//Trix::execSTTOR()
204
205 /**
206 *
207 */
execNDB_STTOR(Signal * signal)208 void Trix::execNDB_STTOR(Signal* signal)
209 {
210 jamEntry();
211 BlockReference ndbcntrRef = signal->theData[0];
212 Uint16 startphase = signal->theData[2]; /* RESTART PHASE */
213 Uint16 mynode = signal->theData[1];
214 //Uint16 restarttype = signal->theData[3];
215 //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */
216 //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */
217 switch (startphase) {
218 case 3:
219 jam();
220 /* SYMBOLIC START PHASE 4 */
221 /* ABSOLUTE PHASE 5 */
222 /* REQUEST NODE IDENTITIES FROM DBDIH */
223 signal->theData[0] = calcTrixBlockRef(mynode);
224 sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
225 return;
226 break;
227 case 6:
228 break;
229 default:
230 break;
231 }
232 }
233
234 /**
235 *
236 */
execREAD_NODESCONF(Signal * signal)237 void Trix::execREAD_NODESCONF(Signal* signal)
238 {
239 jamEntry();
240
241 ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr();
242 //Uint32 noOfNodes = readNodes->noOfNodes;
243 NodeRecPtr nodeRecPtr;
244
245 {
246 ndbrequire(signal->getNoOfSections() == 1);
247 SegmentedSectionPtr ptr;
248 SectionHandle handle(this, signal);
249 handle.getSection(ptr, 0);
250 ndbrequire(ptr.sz == 5 * NdbNodeBitmask::Size);
251 copy(readNodes->definedNodes.rep.data, ptr);
252 releaseSections(handle);
253 }
254
255 c_masterNodeId = readNodes->masterNodeId;
256 c_masterTrixRef = RNIL;
257 c_noNodesFailed = 0;
258
259 for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
260 jam();
261 if (readNodes->definedNodes.get(i))
262 {
263 // Node is defined
264 jam();
265 ndbrequire(c_theNodes.getPool().seizeId(nodeRecPtr, i));
266 c_theNodes.addFirst(nodeRecPtr);
267 nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
268 if (i == c_masterNodeId) {
269 c_masterTrixRef = nodeRecPtr.p->trixRef;
270 }
271 if (readNodes->inactiveNodes.get(i))
272 {
273 // Node is not active
274 jam();
275 /**-----------------------------------------------------------------
276 * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
277 * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
278 * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
279 * NOT ALL NODES ARE ALIVE.
280 *------------------------------------------------------------------*/
281 arrGuard(c_noNodesFailed, MAX_NDB_NODES);
282 nodeRecPtr.p->alive = false;
283 c_noNodesFailed++;
284 c_blockState = Trix::NODE_FAILURE;
285 }
286 else {
287 // Node is active
288 jam();
289 c_noActiveNodes++;
290 nodeRecPtr.p->alive = true;
291 }
292 }
293 }
294 if (c_noNodesFailed == 0) {
295 c_blockState = Trix::STARTED;
296 }
297 }
298
299 /**
300 *
301 */
execREAD_NODESREF(Signal * signal)302 void Trix::execREAD_NODESREF(Signal* signal)
303 {
304 // NYI
305 }
306
307 /**
308 *
309 */
execNODE_FAILREP(Signal * signal)310 void Trix::execNODE_FAILREP(Signal* signal)
311 {
312 jamEntry();
313 NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr();
314
315 if(signal->getNoOfSections() >= 1)
316 {
317 ndbrequire(ndbd_send_node_bitmask_in_section(
318 getNodeInfo(refToNode(signal->getSendersBlockRef())).m_version));
319 SegmentedSectionPtr ptr;
320 SectionHandle handle(this, signal);
321 handle.getSection(ptr, 0);
322 memset(nodeFail->theNodes, 0, sizeof(nodeFail->theNodes));
323 copy(nodeFail->theNodes, ptr);
324 releaseSections(handle);
325 }
326 else
327 {
328 memset(nodeFail->theNodes + NdbNodeBitmask48::Size,
329 0,
330 _NDB_NBM_DIFF_BYTES);
331 }
332
333 //Uint32 failureNr = nodeFail->failNo;
334 //Uint32 numberNodes = nodeFail->noOfNodes;
335 Uint32 masterNodeId = nodeFail->masterNodeId;
336
337 NodeRecPtr nodeRecPtr;
338
339 for(c_theNodes.first(nodeRecPtr);
340 nodeRecPtr.i != RNIL;
341 c_theNodes.next(nodeRecPtr)) {
342 if(NdbNodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
343 nodeRecPtr.p->alive = false;
344 c_noNodesFailed++;
345 c_noActiveNodes--;
346 }
347 }
348 if (c_masterNodeId != masterNodeId) {
349 c_masterNodeId = masterNodeId;
350 NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
351 c_masterTrixRef = nodeRec->trixRef;
352 }
353 }
354
355 /**
356 *
357 */
execINCL_NODEREQ(Signal * signal)358 void Trix::execINCL_NODEREQ(Signal* signal)
359 {
360 jamEntry();
361 UintR node_id = signal->theData[1];
362 NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
363 nodeRec->alive = true;
364 c_noNodesFailed--;
365 c_noActiveNodes++;
366 nodeRec->trixRef = calcTrixBlockRef(node_id);
367 if (c_noNodesFailed == 0) {
368 c_blockState = Trix::STARTED;
369 }
370 }
371
372 // Debugging
373 void
execDUMP_STATE_ORD(Signal * signal)374 Trix::execDUMP_STATE_ORD(Signal* signal)
375 {
376 jamEntry();
377
378 DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
379
380 switch(dumpStateOrd->args[0]) {
381 case(300): {// ok
382 // index2 -T; index2 -I -n10000; index2 -c
383 // all dump 300 0 0 0 0 0 4 2
384 // select_count INDEX0000
385 std::memmove(signal->theData,
386 signal->theData + 1,
387 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
388 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
389
390 buildIndxReq->senderRef = reference(); // return to me
391 buildIndxReq->parallelism = 10;
392 Uint32 indexColumns[1] = {1};
393 Uint32 keyColumns[1] = {0};
394 struct LinearSectionPtr ls_ptr[3];
395 ls_ptr[0].p = indexColumns;
396 ls_ptr[0].sz = 1;
397 ls_ptr[1].p = keyColumns;
398 ls_ptr[1].sz = 1;
399 sendSignal(reference(),
400 GSN_BUILD_INDX_IMPL_REQ,
401 signal,
402 BuildIndxImplReq::SignalLength,
403 JBB, ls_ptr, 2);
404 break;
405 }
406 case(301): { // ok
407 // index2 -T; index2 -I -n10000; index2 -c -p
408 // all dump 301 0 0 0 0 0 4 2
409 // select_count INDEX0000
410 std::memmove(signal->theData,
411 signal->theData + 1,
412 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
413 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
414
415 buildIndxReq->senderRef = reference(); // return to me
416 buildIndxReq->parallelism = 10;
417 Uint32 indexColumns[2] = {0, 1};
418 Uint32 keyColumns[1] = {0};
419 struct LinearSectionPtr ls_ptr[3];
420 ls_ptr[0].p = indexColumns;
421 ls_ptr[0].sz = 2;
422 ls_ptr[1].p = keyColumns;
423 ls_ptr[1].sz = 1;
424 sendSignal(reference(),
425 GSN_BUILD_INDX_IMPL_REQ,
426 signal,
427 BuildIndxImplReq::SignalLength,
428 JBB, ls_ptr, 2);
429 break;
430 }
431 case(302): { // ok
432 // index -T; index -I -n1000; index -c -p
433 // all dump 302 0 0 0 0 0 4 2
434 // select_count PNUMINDEX0000
435 std::memmove(signal->theData,
436 signal->theData + 1,
437 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
438 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
439
440 buildIndxReq->senderRef = reference(); // return to me
441 buildIndxReq->parallelism = 10;
442 Uint32 indexColumns[3] = {0, 3, 5};
443 Uint32 keyColumns[1] = {0};
444 struct LinearSectionPtr ls_ptr[3];
445 ls_ptr[0].p = indexColumns;
446 ls_ptr[0].sz = 3;
447 ls_ptr[1].p = keyColumns;
448 ls_ptr[1].sz = 1;
449 sendSignal(reference(),
450 GSN_BUILD_INDX_IMPL_REQ,
451 signal,
452 BuildIndxImplReq::SignalLength,
453 JBB, ls_ptr, 2);
454 break;
455 }
456 case(303): { // ok
457 // index -T -2; index -I -2 -n1000; index -c -p
458 // all dump 303 0 0 0 0 0 4 2
459 // select_count PNUMINDEX0000
460 std::memmove(signal->theData,
461 signal->theData + 1,
462 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
463 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
464
465 buildIndxReq->senderRef = reference(); // return to me
466 buildIndxReq->parallelism = 10;
467 Uint32 indexColumns[3] = {0, 3, 5};
468 Uint32 keyColumns[2] = {0, 1};
469 struct LinearSectionPtr ls_ptr[3];
470 ls_ptr[0].p = indexColumns;
471 ls_ptr[0].sz = 3;
472 ls_ptr[1].p = keyColumns;
473 ls_ptr[1].sz = 2;
474 sendSignal(reference(),
475 GSN_BUILD_INDX_IMPL_REQ,
476 signal,
477 BuildIndxImplReq::SignalLength,
478 JBB, ls_ptr, 2);
479 break;
480 }
481 case(304): { // ok
482 // index -T -L; index -I -L -n1000; index -c -p
483 // all dump 304 0 0 0 0 0 4 2
484 // select_count PNUMINDEX0000
485 std::memmove(signal->theData,
486 signal->theData + 1,
487 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
488 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
489
490 buildIndxReq->senderRef = reference(); // return to me
491 buildIndxReq->parallelism = 10;
492 Uint32 indexColumns[3] = {0, 3, 5};
493 Uint32 keyColumns[1] = {0};
494 struct LinearSectionPtr ls_ptr[3];
495 ls_ptr[0].p = indexColumns;
496 ls_ptr[0].sz = 3;
497 ls_ptr[1].p = keyColumns;
498 ls_ptr[1].sz = 1;
499 sendSignal(reference(),
500 GSN_BUILD_INDX_IMPL_REQ,
501 signal,
502 BuildIndxImplReq::SignalLength,
503 JBB, ls_ptr, 2);
504 break;
505 }
506 case(305): { // ok
507 // index -T -2 -L; index -I -2 -L -n1000; index -c -p
508 // all dump 305 0 0 0 0 0 4 2
509 // select_count PNUMINDEX0000
510 std::memmove(signal->theData,
511 signal->theData + 1,
512 BuildIndxImplReq::SignalLength * sizeof(signal->theData[0]));
513 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
514
515 buildIndxReq->senderRef = reference(); // return to me
516 buildIndxReq->parallelism = 10;
517 Uint32 indexColumns[3] = {0, 3, 5};
518 Uint32 keyColumns[2] = {0, 1};
519 struct LinearSectionPtr ls_ptr[3];
520 ls_ptr[0].p = indexColumns;
521 ls_ptr[0].sz = 3;
522 ls_ptr[1].p = keyColumns;
523 ls_ptr[1].sz = 2;
524 sendSignal(reference(),
525 GSN_BUILD_INDX_IMPL_REQ,
526 signal,
527 BuildIndxImplReq::SignalLength,
528 JBB, ls_ptr, 2);
529 break;
530 }
531 default: {
532 // Ignore
533 }
534 }
535
536 if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
537 {
538 RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
539 RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
540 return;
541 }
542
543 if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
544 {
545 RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
546 RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
547 return;
548 }
549
550 if (signal->theData[0] == 8004)
551 {
552 infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
553 c_theSubscriptionRecPool.getSize(),
554 c_theSubscriptionRecPool.getNoOfFree());
555 return;
556 }
557
558 }
559
execDBINFO_SCANREQ(Signal * signal)560 void Trix::execDBINFO_SCANREQ(Signal *signal)
561 {
562 DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
563 const Ndbinfo::ScanCursor* cursor =
564 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
565 Ndbinfo::Ratelimit rl;
566
567 jamEntry();
568
569 switch(req.tableId){
570 case Ndbinfo::POOLS_TABLEID:
571 {
572 Ndbinfo::pool_entry pools[] =
573 {
574 { "Attribute Order Buffer",
575 c_theAttrOrderBufferPool.getUsed(),
576 c_theAttrOrderBufferPool.getSize(),
577 c_theAttrOrderBufferPool.getEntrySize(),
578 c_theAttrOrderBufferPool.getUsedHi(),
579 { 0,0,0,0 },
580 0},
581 { "Subscription Record",
582 c_theSubscriptionRecPool.getUsed(),
583 c_theSubscriptionRecPool.getSize(),
584 c_theSubscriptionRecPool.getEntrySize(),
585 c_theSubscriptionRecPool.getUsedHi(),
586 { 0,0,0,0 },
587 0},
588 { NULL, 0,0,0,0,{0,0,0,0},0}
589 };
590
591 const size_t num_config_params =
592 sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
593 Uint32 pool = cursor->data[0];
594 BlockNumber bn = blockToMain(number());
595 while(pools[pool].poolname)
596 {
597 jam();
598 Ndbinfo::Row row(signal, req);
599 row.write_uint32(getOwnNodeId());
600 row.write_uint32(bn); // block number
601 row.write_uint32(instance()); // block instance
602 row.write_string(pools[pool].poolname);
603 row.write_uint64(pools[pool].used);
604 row.write_uint64(pools[pool].total);
605 row.write_uint64(pools[pool].used_hi);
606 row.write_uint64(pools[pool].entry_size);
607 for (size_t i = 0; i < num_config_params; i++)
608 row.write_uint32(pools[pool].config_params[i]);
609 row.write_uint32(GET_RG(pools[pool].record_type));
610 row.write_uint32(GET_TID(pools[pool].record_type));
611 ndbinfo_send_row(signal, req, row, rl);
612 pool++;
613 if (rl.need_break(req))
614 {
615 jam();
616 ndbinfo_send_scan_break(signal, req, rl, pool);
617 return;
618 }
619 }
620 break;
621 }
622 default:
623 break;
624 }
625
626 ndbinfo_send_scan_conf(signal, req, rl);
627 }
628
629 // Build index
execBUILD_INDX_IMPL_REQ(Signal * signal)630 void Trix:: execBUILD_INDX_IMPL_REQ(Signal* signal)
631 {
632 jamEntry();
633 DBUG_ENTER("Trix:: execBUILD_INDX_IMPL_REQ");
634
635 const BuildIndxImplReq
636 buildIndxReqData = *(const BuildIndxImplReq*)signal->getDataPtr(),
637 *buildIndxReq = &buildIndxReqData;
638
639 // Seize a subscription record
640 SubscriptionRecPtr subRecPtr;
641 SubscriptionRecord* subRec;
642 SectionHandle handle(this, signal);
643
644 if (ERROR_INSERTED(18000) ||
645 ERROR_INSERTED(18003))
646 {
647 const Uint32 delay = (ERROR_INSERTED(18003)? 20000 : 1000);
648 CLEAR_ERROR_INSERT_VALUE;
649 sendSignalWithDelay(reference(), GSN_BUILD_INDX_IMPL_REQ, signal, delay,
650 signal->getLength(), &handle);
651 DBUG_VOID_RETURN;
652 }
653
654 if (!c_theSubscriptions.getPool().seizeId(subRecPtr, buildIndxReq->buildId)) {
655 jam();
656 // Failed to allocate subscription record
657 BuildIndxRef* buildIndxRef = (BuildIndxRef*)signal->getDataPtrSend();
658
659 buildIndxRef->errorCode = BuildIndxRef::AllocationFailure;
660 releaseSections(handle);
661 sendSignal(buildIndxReq->senderRef, GSN_BUILD_INDX_IMPL_REF, signal,
662 BuildIndxRef::SignalLength, JBB);
663 DBUG_VOID_RETURN;
664 }
665 c_theSubscriptions.addFirst(subRecPtr);
666
667 subRec = subRecPtr.p;
668 subRec->errorCode = BuildIndxRef::NoError;
669 subRec->userReference = buildIndxReq->senderRef;
670 subRec->connectionPtr = buildIndxReq->senderData;
671 subRec->schemaTransId = buildIndxReq->transId;
672 subRec->subscriptionId = buildIndxReq->buildId;
673 subRec->subscriptionKey = buildIndxReq->buildKey;
674 subRec->indexType = buildIndxReq->indexType;
675 subRec->sourceTableId = buildIndxReq->tableId;
676 subRec->targetTableId = buildIndxReq->indexId;
677 subRec->parallelism = c_maxUIBuildBatchSize;
678 subRec->expectedConf = 0;
679 subRec->subscriptionCreated = false;
680 subRec->pendingSubSyncContinueConf = false;
681 subRec->prepareId = RNIL;
682 subRec->requestType = INDEX_BUILD;
683 subRec->fragCount = 0;
684 subRec->fragId = ZNIL;
685 subRec->m_rows_processed = 0;
686 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
687 subRec->m_gci = 0;
688 if (buildIndxReq->requestType & BuildIndxImplReq::RF_NO_DISK)
689 {
690 subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
691 }
692
693 // Get column order segments
694 Uint32 noOfSections = handle.m_cnt;
695 if (noOfSections > 0) {
696 jam();
697 SegmentedSectionPtr ptr;
698 handle.getSection(ptr, BuildIndxImplReq::INDEX_COLUMNS);
699 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
700 subRec->noOfIndexColumns = ptr.sz;
701 }
702 if (noOfSections > 1) {
703 jam();
704 SegmentedSectionPtr ptr;
705 handle.getSection(ptr, BuildIndxImplReq::KEY_COLUMNS);
706 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
707 subRec->noOfKeyColumns = ptr.sz;
708 }
709
710 #if 0
711 // Debugging
712 printf("Trix:: execBUILD_INDX_IMPL_REQ: Attribute order:\n");
713 subRec->attributeOrder.print(stdout);
714 #endif
715
716 releaseSections(handle);
717 prepareInsertTransactions(signal, subRecPtr);
718 DBUG_VOID_RETURN;
719 }
720
execBUILD_INDX_IMPL_CONF(Signal * signal)721 void Trix:: execBUILD_INDX_IMPL_CONF(Signal* signal)
722 {
723 printf("Trix:: execBUILD_INDX_IMPL_CONF\n");
724 }
725
execBUILD_INDX_IMPL_REF(Signal * signal)726 void Trix:: execBUILD_INDX_IMPL_REF(Signal* signal)
727 {
728 printf("Trix:: execBUILD_INDX_IMPL_REF\n");
729 }
730
execUTIL_PREPARE_CONF(Signal * signal)731 void Trix::execUTIL_PREPARE_CONF(Signal* signal)
732 {
733 jamEntry();
734 UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
735 SubscriptionRecPtr subRecPtr;
736 SubscriptionRecord* subRec;
737
738 subRecPtr.i = utilPrepareConf->senderData;
739 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
740 printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
741 return;
742 }
743 if (subRec->requestType == STAT_UTIL)
744 {
745 statUtilPrepareConf(signal, subRec->m_statPtrI);
746 return;
747 }
748 subRecPtr.p = subRec;
749 subRec->prepareId = utilPrepareConf->prepareId;
750 setupSubscription(signal, subRecPtr);
751 }
752
execUTIL_PREPARE_REF(Signal * signal)753 void Trix::execUTIL_PREPARE_REF(Signal* signal)
754 {
755 jamEntry();
756 UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
757 SubscriptionRecPtr subRecPtr;
758 SubscriptionRecord* subRec;
759
760 subRecPtr.i = utilPrepareRef->senderData;
761 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
762 printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
763 return;
764 }
765 if (subRec->requestType == STAT_UTIL)
766 {
767 statUtilPrepareRef(signal, subRec->m_statPtrI);
768 return;
769 }
770 subRecPtr.p = subRec;
771 subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
772 switch (utilPrepareRef->errorCode) {
773 case UtilPrepareRef::PREPARE_SEIZE_ERROR:
774 case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
775 case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
776 case UtilPrepareRef::DICT_TAB_INFO_ERROR:
777 subRec->errorCode = BuildIndxRef::UtilBusy;
778 break;
779 case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
780 subRec->errorCode = BuildIndxRef::BadRequestType;
781 break;
782 default:
783 ndbabort();
784 }
785
786 UtilReleaseConf* conf = (UtilReleaseConf*)signal->getDataPtrSend();
787 conf->senderData = subRecPtr.i;
788 execUTIL_RELEASE_CONF(signal);
789 }
790
execUTIL_EXECUTE_CONF(Signal * signal)791 void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
792 {
793 jamEntry();
794 UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
795 SubscriptionRecPtr subRecPtr;
796 SubscriptionRecord* subRec;
797
798 const Uint32 gci_hi = utilExecuteConf->gci_hi;
799 const Uint32 gci_lo = utilExecuteConf->gci_lo;
800 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
801
802 subRecPtr.i = utilExecuteConf->senderData;
803 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
804 printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
805 return;
806 }
807 if (subRec->requestType == STAT_UTIL)
808 {
809 statUtilExecuteConf(signal, subRec->m_statPtrI);
810 return;
811 }
812 subRecPtr.p = subRec;
813 subRec->expectedConf--;
814
815 if (gci > subRecPtr.p->m_gci)
816 {
817 jam();
818 subRecPtr.p->m_gci = gci;
819 }
820
821 checkParallelism(signal, subRec);
822 if (subRec->expectedConf == 0)
823 {
824 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
825 {
826 jam();
827 wait_gcp(signal, subRecPtr);
828 return;
829 }
830 buildComplete(signal, subRecPtr);
831 }
832 }
833
execUTIL_EXECUTE_REF(Signal * signal)834 void Trix::execUTIL_EXECUTE_REF(Signal* signal)
835 {
836 jamEntry();
837 UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
838 SubscriptionRecPtr subRecPtr;
839 SubscriptionRecord* subRec;
840
841 subRecPtr.i = utilExecuteRef->senderData;
842 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
843 printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
844 return;
845 }
846 if (subRec->requestType == STAT_UTIL)
847 {
848 statUtilExecuteRef(signal, subRec->m_statPtrI);
849 return;
850 }
851 subRecPtr.p = subRec;
852 ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
853 if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
854 {
855 jam();
856 buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
857 }
858 else if (check_timeout(utilExecuteRef->TCErrorCode))
859 {
860 jam();
861 buildFailed(signal, subRecPtr, BuildIndxRef::DeadlockError);
862 }
863 else if (subRec->requestType == FK_BUILD &&
864 utilExecuteRef->TCErrorCode == TUPLE_NOT_FOUND)
865 {
866 jam();
867 buildFailed(signal, subRecPtr,
868 (BuildIndxRef::ErrorCode)FK_NO_PARENT_ROW_EXISTS);
869 }
870 else
871 {
872 jam();
873 buildFailed(signal, subRecPtr,
874 (BuildIndxRef::ErrorCode)utilExecuteRef->TCErrorCode);
875 }
876 }
877
execSUB_CREATE_CONF(Signal * signal)878 void Trix::execSUB_CREATE_CONF(Signal* signal)
879 {
880 jamEntry();
881 DBUG_ENTER("Trix::execSUB_CREATE_CONF");
882 SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
883 SubscriptionRecPtr subRecPtr;
884 SubscriptionRecord* subRec;
885
886 subRecPtr.i = subCreateConf->senderData;
887 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
888 printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
889 DBUG_VOID_RETURN;
890 }
891 subRec->subscriptionCreated = true;
892 subRecPtr.p = subRec;
893
894 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
895 subRecPtr.i, subRecPtr.p->subscriptionId,
896 subRecPtr.p->subscriptionKey));
897
898 startTableScan(signal, subRecPtr);
899 DBUG_VOID_RETURN;
900 }
901
execSUB_CREATE_REF(Signal * signal)902 void Trix::execSUB_CREATE_REF(Signal* signal)
903 {
904 jamEntry();
905 DBUG_ENTER("Trix::execSUB_CREATE_REF");
906
907 SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
908 SubscriptionRecPtr subRecPtr;
909 SubscriptionRecord* subRec;
910
911 subRecPtr.i = subCreateRef->senderData;
912 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL)
913 {
914 printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
915 return;
916 }
917 subRecPtr.p = subRec;
918 subRecPtr.p->errorCode = (BuildIndxRef::ErrorCode)subCreateRef->errorCode;
919
920 UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
921 req->prepareId = subRecPtr.p->prepareId;
922 req->senderData = subRecPtr.i;
923
924 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
925 UtilReleaseReq::SignalLength, JBB);
926
927 DBUG_VOID_RETURN;
928 }
929
execSUB_SYNC_CONF(Signal * signal)930 void Trix::execSUB_SYNC_CONF(Signal* signal)
931 {
932 jamEntry();
933 DBUG_ENTER("Trix::execSUB_SYNC_CONF");
934 SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
935 SubscriptionRecPtr subRecPtr;
936 SubscriptionRecord* subRec;
937
938 subRecPtr.i = subSyncConf->senderData;
939 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
940 printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
941 subRecPtr.i);
942 DBUG_VOID_RETURN;
943 }
944
945 subRecPtr.p = subRec;
946 subRec->expectedConf--;
947 checkParallelism(signal, subRec);
948 if (subRec->expectedConf == 0)
949 {
950 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
951 {
952 jam();
953 wait_gcp(signal, subRecPtr);
954 DBUG_VOID_RETURN;
955 }
956 buildComplete(signal, subRecPtr);
957 }
958 DBUG_VOID_RETURN;
959 }
960
execSUB_SYNC_REF(Signal * signal)961 void Trix::execSUB_SYNC_REF(Signal* signal)
962 {
963 jamEntry();
964 DBUG_ENTER("Trix::execSUB_SYNC_REF");
965 SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
966 SubscriptionRecPtr subRecPtr;
967 SubscriptionRecord* subRec;
968
969 subRecPtr.i = subSyncRef->senderData;
970 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
971 printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
972 DBUG_VOID_RETURN;
973 }
974 subRecPtr.p = subRec;
975 buildFailed(signal, subRecPtr,
976 (BuildIndxRef::ErrorCode)subSyncRef->errorCode);
977 DBUG_VOID_RETURN;
978 }
979
execSUB_SYNC_CONTINUE_REQ(Signal * signal)980 void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
981 {
982 SubSyncContinueReq * subSyncContinueReq =
983 (SubSyncContinueReq *) signal->getDataPtr();
984
985 SubscriptionRecPtr subRecPtr;
986 SubscriptionRecord* subRec;
987 subRecPtr.i = subSyncContinueReq->subscriberData;
988 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
989 printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
990 return;
991 }
992 subRecPtr.p = subRec;
993 subRec->pendingSubSyncContinueConf = true;
994 subRec->syncPtr = subSyncContinueReq->senderData;
995 checkParallelism(signal, subRec);
996 }
997
execSUB_TABLE_DATA(Signal * signal)998 void Trix::execSUB_TABLE_DATA(Signal* signal)
999 {
1000 jamEntry();
1001 DBUG_ENTER("Trix::execSUB_TABLE_DATA");
1002 SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
1003 SubscriptionRecPtr subRecPtr;
1004 SubscriptionRecord* subRec;
1005 subRecPtr.i = subTableData->senderData;
1006 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
1007 printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
1008 DBUG_VOID_RETURN;
1009 }
1010 subRecPtr.p = subRec;
1011 switch(subRecPtr.p->requestType){
1012 case INDEX_BUILD:
1013 executeBuildInsertTransaction(signal, subRecPtr);
1014 break;
1015 case REORG_COPY:
1016 case REORG_DELETE:
1017 executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
1018 break;
1019 case FK_BUILD:
1020 executeBuildFKTransaction(signal, subRecPtr);
1021 break;
1022 case STAT_UTIL:
1023 ndbabort();
1024 case STAT_CLEAN:
1025 {
1026 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1027 statCleanExecute(signal, stat);
1028 }
1029 break;
1030 case STAT_SCAN:
1031 {
1032 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1033 statScanExecute(signal, stat);
1034 }
1035 break;
1036 }
1037
1038 subRecPtr.p->m_rows_processed++;
1039
1040 DBUG_VOID_RETURN;
1041 }
1042
setupSubscription(Signal * signal,SubscriptionRecPtr subRecPtr)1043 void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
1044 {
1045 jam();
1046 DBUG_ENTER("Trix::setupSubscription");
1047 SubscriptionRecord* subRec = subRecPtr.p;
1048 SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
1049 // Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
1050 subCreateReq->senderRef = reference();
1051 subCreateReq->senderData = subRecPtr.i;
1052 subCreateReq->subscriptionId = subRec->subscriptionId;
1053 subCreateReq->subscriptionKey = subRec->subscriptionKey;
1054 subCreateReq->tableId = subRec->sourceTableId;
1055 subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
1056 subCreateReq->schemaTransId = subRec->schemaTransId;
1057
1058 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
1059 subRecPtr.i, subCreateReq->subscriptionId,
1060 subCreateReq->subscriptionKey));
1061
1062 D("SUB_CREATE_REQ tableId: " << subRec->sourceTableId);
1063
1064 sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
1065 signal, SubCreateReq::SignalLength, JBB);
1066
1067 DBUG_VOID_RETURN;
1068 }
1069
startTableScan(Signal * signal,SubscriptionRecPtr subRecPtr)1070 void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
1071 {
1072 jam();
1073
1074 Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
1075 SubscriptionRecord* subRec = subRecPtr.p;
1076 AttrOrderBuffer::DataBufferIterator iter;
1077
1078 Uint32 cnt = 0;
1079 bool moreAttributes = subRec->attributeOrder.first(iter);
1080 if (subRec->requestType == FK_BUILD)
1081 {
1082 jam();
1083 // skip over key columns
1084 ndbrequire(subRec->attributeOrder.position(iter, subRec->noOfKeyColumns));
1085 }
1086
1087 while (moreAttributes) {
1088 attributeList[cnt++] = *iter.data;
1089 moreAttributes = subRec->attributeOrder.next(iter);
1090 }
1091
1092 // Merge index and key column segments
1093 LinearSectionPtr orderPtr[3];
1094 Uint32 noOfSections;
1095 orderPtr[0].p = attributeList;
1096 orderPtr[0].sz = cnt;
1097 noOfSections = 1;
1098
1099 SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
1100 subSyncReq->senderRef = reference();
1101 subSyncReq->senderData = subRecPtr.i;
1102 subSyncReq->subscriptionId = subRec->subscriptionId;
1103 subSyncReq->subscriptionKey = subRec->subscriptionKey;
1104 subSyncReq->part = SubscriptionData::TableData;
1105 subSyncReq->requestInfo = 0;
1106 subSyncReq->fragCount = subRec->fragCount;
1107 subSyncReq->fragId = subRec->fragId;
1108 subSyncReq->batchSize = subRec->parallelism;
1109
1110 if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
1111 {
1112 jam();
1113 subSyncReq->requestInfo |= SubSyncReq::NoDisk;
1114 }
1115
1116 if (subRec->m_flags & SubscriptionRecord::RF_TUP_ORDER)
1117 {
1118 jam();
1119 subSyncReq->requestInfo |= SubSyncReq::TupOrder;
1120 }
1121
1122 if (subRec->requestType == REORG_COPY)
1123 {
1124 jam();
1125 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1126 }
1127 else if (subRec->requestType == REORG_DELETE)
1128 {
1129 jam();
1130 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1131 subSyncReq->requestInfo |= SubSyncReq::ReorgDelete;
1132 }
1133 else if (subRec->requestType == STAT_CLEAN)
1134 {
1135 jam();
1136 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1137 StatOp::Clean& clean = stat.m_clean;
1138 orderPtr[1].p = clean.m_bound;
1139 orderPtr[1].sz = clean.m_boundSize;
1140 noOfSections = 2;
1141 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1142 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1143 }
1144 else if (subRec->requestType == STAT_SCAN)
1145 {
1146 jam();
1147 orderPtr[1].p = 0;
1148 orderPtr[1].sz = 0;
1149 noOfSections = 2;
1150 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1151 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1152 subSyncReq->requestInfo |= SubSyncReq::StatScan;
1153 }
1154 subRecPtr.p->expectedConf = 1;
1155
1156 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
1157 subRecPtr.i, subSyncReq->subscriptionId,
1158 subSyncReq->subscriptionKey));
1159
1160 D("SUB_SYNC_REQ fragId: " << subRec->fragId <<
1161 " fragCount: " << subRec->fragCount <<
1162 " requestInfo: " << hex << subSyncReq->requestInfo);
1163
1164 sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
1165 signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
1166 }
1167
prepareInsertTransactions(Signal * signal,SubscriptionRecPtr subRecPtr)1168 void Trix::prepareInsertTransactions(Signal* signal,
1169 SubscriptionRecPtr subRecPtr)
1170 {
1171 SubscriptionRecord* subRec = subRecPtr.p;
1172 UtilPrepareReq * utilPrepareReq =
1173 (UtilPrepareReq *)signal->getDataPtrSend();
1174
1175 jam();
1176 utilPrepareReq->senderRef = reference();
1177 utilPrepareReq->senderData = subRecPtr.i;
1178 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1179
1180 const Uint32 pageSizeInWords = 128;
1181 Uint32 propPage[pageSizeInWords];
1182 LinearWriter w(&propPage[0],128);
1183 w.first();
1184 w.add(UtilPrepareReq::NoOfOperations, 1);
1185 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1186 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1187 // Add index attributes in increasing order and one PK attribute
1188 for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
1189 w.add(UtilPrepareReq::AttributeId, i);
1190
1191 #if 0
1192 // Debugging
1193 SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
1194 printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
1195 reader.printAll(ndbout);
1196 #endif
1197
1198 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1199 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1200 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1201 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1202 UtilPrepareReq::SignalLength, JBB,
1203 sectionsPtr, UtilPrepareReq::NoOfSections);
1204 }
1205
executeBuildInsertTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1206 void Trix::executeBuildInsertTransaction(Signal* signal,
1207 SubscriptionRecPtr subRecPtr)
1208 {
1209 jam();
1210 SubscriptionRecord* subRec = subRecPtr.p;
1211 UtilExecuteReq * utilExecuteReq =
1212 (UtilExecuteReq *)signal->getDataPtrSend();
1213
1214 utilExecuteReq->senderRef = reference();
1215 utilExecuteReq->senderData = subRecPtr.i;
1216 utilExecuteReq->prepareId = subRec->prepareId;
1217 #if 0
1218 printf("Header size %u\n", headerPtr.sz);
1219 for(int i = 0; i < headerPtr.sz; i++)
1220 printf("H'%.8x ", headerBuffer[i]);
1221 printf("\n");
1222
1223 printf("Data size %u\n", dataPtr.sz);
1224 for(int i = 0; i < dataPtr.sz; i++)
1225 printf("H'%.8x ", dataBuffer[i]);
1226 printf("\n");
1227 #endif
1228 // Save scan result in linear buffers
1229 SectionHandle handle(this, signal);
1230 SegmentedSectionPtr headerPtr, dataPtr;
1231
1232 handle.getSection(headerPtr, 0);
1233 handle.getSection(dataPtr, 1);
1234
1235 Uint32* headerBuffer = signal->theData + 25;
1236 Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1237
1238 copy(headerBuffer, headerPtr);
1239 copy(dataBuffer, dataPtr);
1240 releaseSections(handle);
1241
1242 // Calculate packed key size
1243 Uint32 noOfKeyData = 0;
1244 for(Uint32 i = 0; i < headerPtr.sz; i++) {
1245 AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1246
1247 // Filter out NULL attributes
1248 if (keyAttrHead->isNULL())
1249 return;
1250
1251 if (i < subRec->noOfIndexColumns)
1252 // Renumber index attributes in consequtive order
1253 keyAttrHead->setAttributeId(i);
1254 else
1255 // Calculate total size of PK attribute
1256 noOfKeyData += keyAttrHead->getDataSize();
1257 }
1258 // Increase expected CONF count
1259 subRec->expectedConf++;
1260
1261 // Pack key attributes
1262 AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
1263 subRec->noOfIndexColumns,
1264 noOfKeyData << 2);
1265
1266 struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1267 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1268 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
1269 subRec->noOfIndexColumns + 1;
1270 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1271 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1272 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1273 UtilExecuteReq::SignalLength, JBB,
1274 sectionsPtr, UtilExecuteReq::NoOfSections);
1275 }
1276
executeReorgTransaction(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 takeOver)1277 void Trix::executeReorgTransaction(Signal* signal,
1278 SubscriptionRecPtr subRecPtr,
1279 Uint32 takeOver)
1280 {
1281 jam();
1282 SubscriptionRecord* subRec = subRecPtr.p;
1283 UtilExecuteReq * utilExecuteReq =
1284 (UtilExecuteReq *)signal->getDataPtrSend();
1285
1286 const Uint32 tScanInfo = takeOver & 0x3FFFF;
1287 const Uint32 tTakeOverFragment = takeOver >> 20;
1288 {
1289 UintR scanInfo = 0;
1290 TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1291 TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1292 TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1293 utilExecuteReq->scanTakeOver = scanInfo;
1294 }
1295
1296 utilExecuteReq->senderRef = reference();
1297 utilExecuteReq->senderData = subRecPtr.i;
1298 utilExecuteReq->prepareId = subRec->prepareId;
1299 #if 0
1300 printf("Header size %u\n", headerPtr.sz);
1301 for(int i = 0; i < headerPtr.sz; i++)
1302 printf("H'%.8x ", headerBuffer[i]);
1303 printf("\n");
1304
1305 printf("Data size %u\n", dataPtr.sz);
1306 for(int i = 0; i < dataPtr.sz; i++)
1307 printf("H'%.8x ", dataBuffer[i]);
1308 printf("\n");
1309 #endif
1310 // Increase expected CONF count
1311 subRec->expectedConf++;
1312
1313 SectionHandle handle(this, signal);
1314 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1315 UtilExecuteReq::SignalLength, JBB,
1316 &handle);
1317 }
1318
1319 void
wait_gcp(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 delay)1320 Trix::wait_gcp(Signal* signal, SubscriptionRecPtr subRecPtr, Uint32 delay)
1321 {
1322 WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
1323 req->senderRef = reference();
1324 req->senderData = subRecPtr.i;
1325 req->requestType = WaitGCPReq::CurrentGCI;
1326
1327 if (delay == 0)
1328 {
1329 jam();
1330 sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1331 WaitGCPReq::SignalLength, JBB);
1332 }
1333 else
1334 {
1335 jam();
1336 sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1337 delay, WaitGCPReq::SignalLength);
1338 }
1339 }
1340
1341 void
execWAIT_GCP_REF(Signal * signal)1342 Trix::execWAIT_GCP_REF(Signal* signal)
1343 {
1344 WaitGCPRef ref = *(WaitGCPRef*)signal->getDataPtr();
1345
1346 SubscriptionRecPtr subRecPtr;
1347 c_theSubscriptions.getPtr(subRecPtr, ref.senderData);
1348 wait_gcp(signal, subRecPtr, 100);
1349 }
1350
1351 void
execWAIT_GCP_CONF(Signal * signal)1352 Trix::execWAIT_GCP_CONF(Signal* signal)
1353 {
1354 WaitGCPConf * conf = (WaitGCPConf*)signal->getDataPtr();
1355
1356 SubscriptionRecPtr subRecPtr;
1357 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1358
1359 const Uint32 gci_hi = conf->gci_hi;
1360 const Uint32 gci_lo = conf->gci_lo;
1361 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
1362
1363 if (gci > subRecPtr.p->m_gci)
1364 {
1365 jam();
1366 buildComplete(signal, subRecPtr);
1367 }
1368 else
1369 {
1370 jam();
1371 wait_gcp(signal, subRecPtr, 100);
1372 }
1373 }
1374
buildComplete(Signal * signal,SubscriptionRecPtr subRecPtr)1375 void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
1376 {
1377 SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
1378 req->senderRef = reference();
1379 req->senderData = subRecPtr.i;
1380 req->subscriptionId = subRecPtr.p->subscriptionId;
1381 req->subscriptionKey = subRecPtr.p->subscriptionKey;
1382 sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
1383 SubRemoveReq::SignalLength, JBB);
1384 }
1385
buildFailed(Signal * signal,SubscriptionRecPtr subRecPtr,BuildIndxRef::ErrorCode errorCode)1386 void Trix::buildFailed(Signal* signal,
1387 SubscriptionRecPtr subRecPtr,
1388 BuildIndxRef::ErrorCode errorCode)
1389 {
1390 SubscriptionRecord* subRec = subRecPtr.p;
1391
1392 subRec->errorCode = errorCode;
1393 // Continue accumulating since we currently cannot stop SUMA
1394 subRec->expectedConf--;
1395 checkParallelism(signal, subRec);
1396 if (subRec->expectedConf == 0)
1397 buildComplete(signal, subRecPtr);
1398 }
1399
1400 void
execSUB_REMOVE_REF(Signal * signal)1401 Trix::execSUB_REMOVE_REF(Signal* signal){
1402 jamEntry();
1403 //@todo
1404 ndbabort();
1405 }
1406
1407 void
execSUB_REMOVE_CONF(Signal * signal)1408 Trix::execSUB_REMOVE_CONF(Signal* signal){
1409 jamEntry();
1410
1411 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
1412
1413 SubscriptionRecPtr subRecPtr;
1414 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1415
1416 if(subRecPtr.p->prepareId != RNIL){
1417 jam();
1418
1419 UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
1420 req->prepareId = subRecPtr.p->prepareId;
1421 req->senderData = subRecPtr.i;
1422
1423 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
1424 UtilReleaseReq::SignalLength , JBB);
1425 return;
1426 }
1427
1428 {
1429 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1430 conf->senderData = subRecPtr.i;
1431 execUTIL_RELEASE_CONF(signal);
1432 }
1433 }
1434
1435 void
execUTIL_RELEASE_REF(Signal * signal)1436 Trix::execUTIL_RELEASE_REF(Signal* signal){
1437 jamEntry();
1438 ndbabort();
1439 }
1440
1441 void
execUTIL_RELEASE_CONF(Signal * signal)1442 Trix::execUTIL_RELEASE_CONF(Signal* signal){
1443
1444 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1445
1446 SubscriptionRecPtr subRecPtr;
1447 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1448
1449 switch(subRecPtr.p->requestType){
1450 case REORG_COPY:
1451 case REORG_DELETE:
1452 if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1453 {
1454 jam();
1455 // Build is complete, reply to original sender
1456 CopyDataImplConf* conf = (CopyDataImplConf*)signal->getDataPtrSend();
1457 conf->senderRef = reference(); //wl3600_todo ok?
1458 conf->senderData = subRecPtr.p->connectionPtr;
1459
1460 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_CONF, signal,
1461 CopyDataImplConf::SignalLength , JBB);
1462
1463 infoEvent("%s table %u processed %llu rows",
1464 subRecPtr.p->requestType == REORG_COPY ?
1465 "reorg-copy" : "reorg-delete",
1466 subRecPtr.p->sourceTableId,
1467 subRecPtr.p->m_rows_processed);
1468 } else {
1469 jam();
1470 // Build failed, reply to original sender
1471 CopyDataImplRef* ref = (CopyDataImplRef*)signal->getDataPtrSend();
1472 ref->senderRef = reference();
1473 ref->senderData = subRecPtr.p->connectionPtr;
1474 ref->errorCode = subRecPtr.p->errorCode;
1475
1476 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_REF, signal,
1477 CopyDataImplRef::SignalLength , JBB);
1478 }
1479 break;
1480 case INDEX_BUILD:
1481 if (subRecPtr.p->errorCode == BuildIndxRef::NoError) {
1482 jam();
1483 // Build is complete, reply to original sender
1484 BuildIndxImplConf* buildIndxConf =
1485 (BuildIndxImplConf*)signal->getDataPtrSend();
1486 buildIndxConf->senderRef = reference(); //wl3600_todo ok?
1487 buildIndxConf->senderData = subRecPtr.p->connectionPtr;
1488
1489 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_CONF, signal,
1490 BuildIndxConf::SignalLength , JBB);
1491
1492 infoEvent("index-build table %u index: %u processed %llu rows",
1493 subRecPtr.p->sourceTableId,
1494 subRecPtr.p->targetTableId,
1495 subRecPtr.p->m_rows_processed);
1496 } else {
1497 jam();
1498 // Build failed, reply to original sender
1499 BuildIndxImplRef* buildIndxRef =
1500 (BuildIndxImplRef*)signal->getDataPtrSend();
1501 buildIndxRef->senderRef = reference();
1502 buildIndxRef->senderData = subRecPtr.p->connectionPtr;
1503 buildIndxRef->errorCode = subRecPtr.p->errorCode;
1504
1505 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_REF, signal,
1506 BuildIndxRef::SignalLength , JBB);
1507 }
1508 break;
1509 case FK_BUILD:
1510 if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1511 {
1512 jam();
1513 // Build is complete, reply to original sender
1514 BuildFKImplConf* buildFKConf =
1515 CAST_PTR(BuildFKImplConf, signal->getDataPtrSend());
1516 buildFKConf->senderRef = reference();
1517 buildFKConf->senderData = subRecPtr.p->connectionPtr;
1518
1519 sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_CONF, signal,
1520 BuildFKImplConf::SignalLength , JBB);
1521
1522 infoEvent("fk-build parent table: %u child table: %u processed %llu rows",
1523 subRecPtr.p->targetTableId,
1524 subRecPtr.p->sourceTableId,
1525 subRecPtr.p->m_rows_processed);
1526 }
1527 else
1528 {
1529 jam();
1530 // Build failed, reply to original sender
1531 BuildFKImplRef* buildFKRef =
1532 (BuildFKImplRef*)signal->getDataPtrSend();
1533 buildFKRef->senderRef = reference();
1534 buildFKRef->senderData = subRecPtr.p->connectionPtr;
1535 buildFKRef->errorCode = subRecPtr.p->errorCode;
1536
1537 sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_REF, signal,
1538 BuildFKImplRef::SignalLength , JBB);
1539 }
1540 break;
1541 case STAT_UTIL:
1542 ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
1543 statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
1544 return;
1545 case STAT_CLEAN:
1546 {
1547 subRecPtr.p->prepareId = RNIL;
1548 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1549 statCleanRelease(signal, stat);
1550 }
1551 return;
1552 case STAT_SCAN:
1553 {
1554 subRecPtr.p->prepareId = RNIL;
1555 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1556 statScanRelease(signal, stat);
1557 }
1558 return;
1559 }
1560
1561 // Release subscription record
1562 subRecPtr.p->attributeOrder.release();
1563 c_theSubscriptions.release(subRecPtr.i);
1564 }
1565
checkParallelism(Signal * signal,SubscriptionRecord * subRec)1566 void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
1567 {
1568 if ((subRec->pendingSubSyncContinueConf) &&
1569 (subRec->expectedConf == 1)) {
1570 jam();
1571 SubSyncContinueConf * subSyncContinueConf =
1572 (SubSyncContinueConf *) signal->getDataPtrSend();
1573 subSyncContinueConf->subscriptionId = subRec->subscriptionId;
1574 subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
1575 subSyncContinueConf->senderData = subRec->syncPtr;
1576 sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
1577 SubSyncContinueConf::SignalLength , JBB);
1578 subRec->pendingSubSyncContinueConf = false;
1579 return;
1580 }
1581 }
1582
1583 // CopyData
1584 void
execCOPY_DATA_IMPL_REQ(Signal * signal)1585 Trix::execCOPY_DATA_IMPL_REQ(Signal* signal)
1586 {
1587 jamEntry();
1588
1589 const CopyDataImplReq reqData = *(const CopyDataImplReq*)signal->getDataPtr();
1590 const CopyDataImplReq *req = &reqData;
1591
1592 // Seize a subscription record
1593 SubscriptionRecPtr subRecPtr;
1594 SectionHandle handle(this, signal);
1595
1596 if (!c_theSubscriptions.seizeFirst(subRecPtr))
1597 {
1598 jam();
1599 // Failed to allocate subscription record
1600 releaseSections(handle);
1601
1602 CopyDataImplRef* ref = (CopyDataRef*)signal->getDataPtrSend();
1603
1604 ref->errorCode = -1; // XXX CopyDataImplRef::AllocationFailure;
1605 ref->senderData = req->senderData;
1606 ref->transId = req->transId;
1607 sendSignal(req->senderRef, GSN_COPY_DATA_IMPL_REF, signal,
1608 CopyDataImplRef::SignalLength, JBB);
1609 return;
1610 }
1611
1612 SubscriptionRecord* subRec = subRecPtr.p;
1613 subRec->errorCode = BuildIndxRef::NoError;
1614 subRec->userReference = req->senderRef;
1615 subRec->connectionPtr = req->senderData;
1616 subRec->schemaTransId = req->transId;
1617 subRec->subscriptionId = rand();
1618 subRec->subscriptionKey = rand();
1619 subRec->indexType = RNIL;
1620 subRec->sourceTableId = req->srcTableId;
1621 subRec->targetTableId = req->dstTableId;
1622 subRec->parallelism = c_maxReorgBuildBatchSize;
1623 subRec->expectedConf = 0;
1624 subRec->subscriptionCreated = false;
1625 subRec->pendingSubSyncContinueConf = false;
1626 subRec->prepareId = req->transId;
1627 subRec->fragCount = req->srcFragments;
1628 subRec->fragId = ZNIL;
1629 subRec->m_rows_processed = 0;
1630 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
1631 subRec->m_gci = 0;
1632 switch(req->requestType){
1633 case CopyDataImplReq::ReorgCopy:
1634 jam();
1635 subRec->requestType = REORG_COPY;
1636 break;
1637 case CopyDataImplReq::ReorgDelete:
1638 subRec->requestType = REORG_DELETE;
1639 break;
1640 default:
1641 jamLine(req->requestType);
1642 ndbabort();
1643 }
1644
1645 if (req->requestInfo & CopyDataReq::TupOrder)
1646 {
1647 jam();
1648 subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1649 }
1650
1651 // Get column order segments
1652 Uint32 noOfSections = handle.m_cnt;
1653 if (noOfSections > 0) {
1654 jam();
1655 SegmentedSectionPtr ptr;
1656 handle.getSection(ptr, 0);
1657 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1658 subRec->noOfIndexColumns = ptr.sz;
1659 }
1660
1661 if (noOfSections > 1) {
1662 jam();
1663 SegmentedSectionPtr ptr;
1664 handle.getSection(ptr, 1);
1665 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1666 subRec->noOfKeyColumns = ptr.sz;
1667 }
1668
1669 D("COPY_DATA_IMPL_REQ srctableId: " << subRec->sourceTableId <<
1670 " targetTableId: " << subRec->targetTableId <<
1671 " fragCount: " << subRec->fragCount <<
1672 " requestType: " << subRec->requestType <<
1673 " flags: " << hex << subRec->m_flags);
1674
1675 releaseSections(handle);
1676 {
1677 UtilPrepareReq * utilPrepareReq =
1678 (UtilPrepareReq *)signal->getDataPtrSend();
1679
1680 utilPrepareReq->senderRef = reference();
1681 utilPrepareReq->senderData = subRecPtr.i;
1682 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1683
1684 const Uint32 pageSizeInWords = 128;
1685 Uint32 propPage[pageSizeInWords];
1686 LinearWriter w(&propPage[0],128);
1687 w.first();
1688 w.add(UtilPrepareReq::NoOfOperations, 1);
1689 if (subRec->requestType == REORG_COPY)
1690 {
1691 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1692 }
1693 else
1694 {
1695 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Delete);
1696 }
1697 if (!(req->requestInfo & CopyDataReq::NoScanTakeOver))
1698 {
1699 w.add(UtilPrepareReq::ScanTakeOverInd, 1);
1700 }
1701 w.add(UtilPrepareReq::ReorgInd, 1);
1702 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1703
1704 AttrOrderBuffer::DataBufferIterator iter;
1705 ndbrequire(subRec->attributeOrder.first(iter));
1706
1707 for(Uint32 i = 0; i < subRec->noOfIndexColumns; i++)
1708 {
1709 w.add(UtilPrepareReq::AttributeId, * iter.data);
1710 subRec->attributeOrder.next(iter);
1711 }
1712
1713 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1714 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1715 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1716 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1717 UtilPrepareReq::SignalLength, JBB,
1718 sectionsPtr, UtilPrepareReq::NoOfSections);
1719 }
1720 }
1721
1722 // BuildFK
1723 void
execBUILD_FK_IMPL_REQ(Signal * signal)1724 Trix::execBUILD_FK_IMPL_REQ(Signal* signal)
1725 {
1726 jamEntry();
1727
1728 const BuildFKImplReq reqData = *(const BuildFKImplReq*)signal->getDataPtr();
1729 const BuildFKImplReq *req = &reqData;
1730
1731 // Seize a subscription record
1732 SubscriptionRecPtr subRecPtr;
1733 SectionHandle handle(this, signal);
1734
1735 if (!c_theSubscriptions.seizeFirst(subRecPtr))
1736 {
1737 jam();
1738 // Failed to allocate subscription record
1739 releaseSections(handle);
1740
1741 BuildFKImplRef* ref = (BuildFKImplRef*)signal->getDataPtrSend();
1742
1743 ref->errorCode = -1; // XXX BuildFKImplRef::AllocationFailure;
1744 ref->senderData = req->senderData;
1745 sendSignal(req->senderRef, GSN_BUILD_FK_IMPL_REF, signal,
1746 BuildFKImplRef::SignalLength, JBB);
1747 return;
1748 }
1749
1750 SubscriptionRecord* subRec = subRecPtr.p;
1751 subRec->errorCode = BuildIndxRef::NoError;
1752 subRec->userReference = req->senderRef;
1753 subRec->connectionPtr = req->senderData;
1754 subRec->schemaTransId = req->transId;
1755 subRec->subscriptionId = rand();
1756 subRec->subscriptionKey = rand();
1757 subRec->indexType = RNIL;
1758 subRec->sourceTableId = req->childTableId;
1759 subRec->targetTableId = req->parentTableId;
1760 subRec->parallelism = c_maxFKBuildBatchSize;
1761 subRec->expectedConf = 0;
1762 subRec->subscriptionCreated = false;
1763 subRec->pendingSubSyncContinueConf = false;
1764 subRec->prepareId = req->transId;
1765 subRec->fragCount = 0; // all
1766 subRec->fragId = ZNIL;
1767 subRec->m_rows_processed = 0;
1768 subRec->m_flags = 0;
1769 subRec->m_gci = 0;
1770 subRec->requestType = FK_BUILD;
1771
1772 // TODO...check if there is a scenario where this is not optimal
1773 subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1774
1775 // as we don't support index on disk...
1776 subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
1777
1778 // Get parent columns...
1779 {
1780 SegmentedSectionPtr ptr;
1781 handle.getSection(ptr, 0);
1782 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1783 subRec->noOfKeyColumns = ptr.sz;
1784 }
1785
1786 {
1787 // Get child columns...
1788 SegmentedSectionPtr ptr;
1789 handle.getSection(ptr, 1);
1790 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1791 subRec->noOfIndexColumns = ptr.sz;
1792 }
1793
1794 ndbrequire(subRec->noOfKeyColumns == subRec->noOfIndexColumns);
1795
1796 releaseSections(handle);
1797
1798 {
1799 UtilPrepareReq * utilPrepareReq =
1800 (UtilPrepareReq *)signal->getDataPtrSend();
1801
1802 utilPrepareReq->senderRef = reference();
1803 utilPrepareReq->senderData = subRecPtr.i;
1804 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1805
1806 const Uint32 pageSizeInWords = 128;
1807 Uint32 propPage[pageSizeInWords];
1808 LinearWriter w(&propPage[0],128);
1809 w.first();
1810 w.add(UtilPrepareReq::NoOfOperations, 1);
1811 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Probe);
1812 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1813
1814 // key is always in 0
1815 AttrOrderBuffer::DataBufferIterator iter;
1816 ndbrequire(subRec->attributeOrder.first(iter));
1817 for(Uint32 i = 0; i < subRec->noOfKeyColumns; i++)
1818 {
1819 w.add(UtilPrepareReq::AttributeId, * iter.data);
1820 subRec->attributeOrder.next(iter);
1821 }
1822
1823 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1824 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1825 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1826 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1827 UtilPrepareReq::SignalLength, JBB,
1828 sectionsPtr, UtilPrepareReq::NoOfSections);
1829 }
1830 }
1831
1832 void
executeBuildFKTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1833 Trix::executeBuildFKTransaction(Signal* signal,
1834 SubscriptionRecPtr subRecPtr)
1835 {
1836 jam();
1837 SubscriptionRecord* subRec = subRecPtr.p;
1838 UtilExecuteReq * utilExecuteReq =
1839 CAST_PTR(UtilExecuteReq, signal->getDataPtrSend());
1840
1841 utilExecuteReq->senderRef = reference();
1842 utilExecuteReq->senderData = subRecPtr.i;
1843 utilExecuteReq->prepareId = subRec->prepareId;
1844
1845 // Save scan result in linear buffers
1846 SectionHandle handle(this, signal);
1847 SegmentedSectionPtr headerPtr, dataPtr;
1848
1849 handle.getSection(headerPtr, 0);
1850 handle.getSection(dataPtr, 1);
1851
1852 Uint32* headerBuffer = signal->theData + 25;
1853 Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1854
1855 copy(headerBuffer, headerPtr);
1856 copy(dataBuffer, dataPtr);
1857 releaseSections(handle);
1858
1859 AttrOrderBuffer::ConstDataBufferIterator iter;
1860 ndbrequire(subRec->attributeOrder.first(iter));
1861 for(Uint32 i = 0; i < headerPtr.sz; i++)
1862 {
1863 AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1864
1865 // Filter out NULL attributes
1866 if (keyAttrHead->isNULL())
1867 return;
1868
1869 /**
1870 * UTIL_EXECUTE header section expects real attrid (same as passed in
1871 * UTIL_PREPARE). SUMA sends child attrid, replace it by parent attrid.
1872 */
1873 keyAttrHead->setAttributeId(*iter.data);
1874 subRec->attributeOrder.next(iter);
1875 }
1876 // Increase expected CONF count
1877 subRec->expectedConf++;
1878
1879 struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1880 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1881 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = subRec->noOfKeyColumns;
1882 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1883 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1884 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1885 UtilExecuteReq::SignalLength, JBB,
1886 sectionsPtr, UtilExecuteReq::NoOfSections);
1887 }
1888
1889 // index stats
1890
1891 Trix::StatOp&
statOpGetPtr(Uint32 statPtrI)1892 Trix::statOpGetPtr(Uint32 statPtrI)
1893 {
1894 ndbrequire(statPtrI != RNIL);
1895 return *c_statOpPool.getPtr(statPtrI);
1896 }
1897
1898 bool
statOpSeize(Uint32 & statPtrI)1899 Trix::statOpSeize(Uint32& statPtrI)
1900 {
1901 StatOpPtr statPtr;
1902 if (ERROR_INSERTED(18001) ||
1903 !c_statOpPool.seize(statPtr))
1904 {
1905 jam();
1906 CLEAR_ERROR_INSERT_VALUE;
1907 D("statOpSeize: seize statOp failed");
1908 return false;
1909 }
1910 #ifdef VM_TRACE
1911 memset(statPtr.p, 0xf3, sizeof(*statPtr.p));
1912 #endif
1913 new (statPtr.p) StatOp;
1914 statPtrI = statPtr.i;
1915 StatOp& stat = statOpGetPtr(statPtrI);
1916 stat.m_ownPtrI = statPtrI;
1917
1918 SubscriptionRecPtr subRecPtr;
1919 if (ERROR_INSERTED(18002) ||
1920 !c_theSubscriptions.seizeFirst(subRecPtr))
1921 {
1922 jam();
1923 CLEAR_ERROR_INSERT_VALUE;
1924 c_statOpPool.release(statPtr);
1925 D("statOpSeize: seize subRec failed");
1926 return false;
1927 }
1928 SubscriptionRecord* subRec = subRecPtr.p;
1929 subRec->m_statPtrI = stat.m_ownPtrI;
1930 stat.m_subRecPtrI = subRecPtr.i;
1931
1932 D("statOpSeize" << V(statPtrI) << V(subRecPtr.i));
1933 return true;
1934 }
1935
1936 void
statOpRelease(StatOp & stat)1937 Trix::statOpRelease(StatOp& stat)
1938 {
1939 StatOp::Util& util = stat.m_util;
1940 D("statOpRelease" << V(stat));
1941
1942 if (stat.m_subRecPtrI != RNIL)
1943 {
1944 jam();
1945 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1946 ndbrequire(subRec->prepareId == RNIL);
1947 subRec->attributeOrder.release();
1948 c_theSubscriptions.release(stat.m_subRecPtrI);
1949 stat.m_subRecPtrI = RNIL;
1950 }
1951 ndbrequire(util.m_prepareId == RNIL);
1952 c_statOpPool.release(stat.m_ownPtrI);
1953 }
1954
1955 void
execINDEX_STAT_IMPL_REQ(Signal * signal)1956 Trix::execINDEX_STAT_IMPL_REQ(Signal* signal)
1957 {
1958 jamEntry();
1959 const IndexStatImplReq* req = (const IndexStatImplReq*)signal->getDataPtr();
1960
1961 Uint32 statPtrI = RNIL;
1962 if (!statOpSeize(statPtrI))
1963 {
1964 jam();
1965 const IndexStatImplReq reqCopy = *req;
1966 statOpRef(signal, &reqCopy, IndexStatRef::NoFreeStatOp, __LINE__);
1967 return;
1968 }
1969 StatOp& stat = statOpGetPtr(statPtrI);
1970 stat.m_req = *req;
1971 stat.m_requestType = req->requestType;
1972
1973 // set request name for cluster log message
1974 switch (stat.m_requestType) {
1975 case IndexStatReq::RT_CLEAN_NEW:
1976 jam();
1977 stat.m_requestName = "clean new";
1978 break;
1979 case IndexStatReq::RT_CLEAN_OLD:
1980 jam();
1981 stat.m_requestName = "clean old";
1982 break;
1983 case IndexStatReq::RT_CLEAN_ALL:
1984 jam();
1985 stat.m_requestName = "clean all";
1986 break;
1987 case IndexStatReq::RT_SCAN_FRAG:
1988 jam();
1989 stat.m_requestName = "scan frag";
1990 break;
1991 case IndexStatReq::RT_DROP_HEAD:
1992 jam();
1993 stat.m_requestName = "drop head";
1994 break;
1995 default:
1996 ndbabort();
1997 }
1998
1999 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2000 subRec->prepareId = RNIL;
2001 subRec->errorCode = BuildIndxRef::NoError;
2002
2003 // sys tables are not recreated so do this only once
2004 if (!c_statGetMetaDone)
2005 {
2006 jam();
2007 statMetaGetHead(signal, stat);
2008 return;
2009 }
2010 statGetMetaDone(signal, stat);
2011 }
2012
2013 // sys tables metadata
2014
2015 const Trix::SysColumn
2016 Trix::g_statMetaHead_column[] = {
2017 { 0, "index_id",
2018 true
2019 },
2020 { 1, "index_version",
2021 true
2022 },
2023 { 2, "table_id",
2024 false
2025 },
2026 { 3, "frag_count",
2027 false
2028 },
2029 { 4, "value_format",
2030 false
2031 },
2032 { 5, "sample_version",
2033 false
2034 },
2035 { 6, "load_time",
2036 false
2037 },
2038 { 7, "sample_count",
2039 false
2040 },
2041 { 8, "key_bytes",
2042 false
2043 }
2044 };
2045
2046 const Trix::SysColumn
2047 Trix::g_statMetaSample_column[] = {
2048 { 0, "index_id",
2049 true
2050 },
2051 { 1, "index_version",
2052 true
2053 },
2054 { 2, "sample_version",
2055 true
2056 },
2057 { 3, "stat_key",
2058 true
2059 },
2060 { 4, "stat_value",
2061 false
2062 }
2063 };
2064
2065 const Trix::SysTable
2066 Trix::g_statMetaHead = {
2067 NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_HEAD_TABLE,
2068 ~(Uint32)0,
2069 sizeof(g_statMetaHead_column)/sizeof(g_statMetaHead_column[0]),
2070 g_statMetaHead_column
2071 };
2072
2073 const Trix::SysTable
2074 Trix::g_statMetaSample = {
2075 NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_SAMPLE_TABLE,
2076 ~(Uint32)0,
2077 sizeof(g_statMetaSample_column)/sizeof(g_statMetaSample_column[0]),
2078 g_statMetaSample_column
2079 };
2080
2081 const Trix::SysIndex
2082 Trix::g_statMetaSampleX1 = {
2083 // indexes are always in "sys"
2084 "sys" "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
2085 ~(Uint32)0,
2086 ~(Uint32)0
2087 };
2088
2089 void
statMetaGetHead(Signal * signal,StatOp & stat)2090 Trix::statMetaGetHead(Signal* signal, StatOp& stat)
2091 {
2092 D("statMetaGetHead" << V(stat));
2093 StatOp::Meta& meta = stat.m_meta;
2094 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
2095 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2096 const char* name = g_statMetaHead.name;
2097 sendGetTabInfoReq(signal, stat, name);
2098 }
2099
2100 void
statMetaGetHeadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2101 Trix::statMetaGetHeadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2102 {
2103 StatOp& stat = statOpGetPtr(statPtrI);
2104 D("statMetaGetHeadCB" << V(stat) << V(ret));
2105 StatOp::Meta& meta = stat.m_meta;
2106 if (ret != 0)
2107 {
2108 jam();
2109 Uint32 supress[] = { GetTabInfoRef::TableNotDefined, 0 };
2110 statOpError(signal, stat, ret, __LINE__, supress);
2111 return;
2112 }
2113 g_statMetaHead.tableId = meta.m_conf.tableId;
2114 statMetaGetSample(signal, stat);
2115 }
2116
2117 void
statMetaGetSample(Signal * signal,StatOp & stat)2118 Trix::statMetaGetSample(Signal* signal, StatOp& stat)
2119 {
2120 D("statMetaGetSample" << V(stat));
2121 StatOp::Meta& meta = stat.m_meta;
2122 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
2123 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2124 const char* name = g_statMetaSample.name;
2125 sendGetTabInfoReq(signal, stat, name);
2126 }
2127
2128 void
statMetaGetSampleCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2129 Trix::statMetaGetSampleCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2130 {
2131 StatOp& stat = statOpGetPtr(statPtrI);
2132 D("statMetaGetSampleCB" << V(stat) << V(ret));
2133 StatOp::Meta& meta = stat.m_meta;
2134 if (ret != 0)
2135 {
2136 jam();
2137 statOpError(signal, stat, ret, __LINE__);
2138 return;
2139 }
2140 g_statMetaSample.tableId = meta.m_conf.tableId;
2141 statMetaGetSampleX1(signal, stat);
2142 }
2143
2144 void
statMetaGetSampleX1(Signal * signal,StatOp & stat)2145 Trix::statMetaGetSampleX1(Signal* signal, StatOp& stat)
2146 {
2147 D("statMetaGetSampleX1" << V(stat));
2148 StatOp::Meta& meta = stat.m_meta;
2149 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
2150 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2151 const char* name_fmt = g_statMetaSampleX1.name;
2152 char name[MAX_TAB_NAME_SIZE];
2153 BaseString::snprintf(name, sizeof(name), name_fmt, g_statMetaSample.tableId);
2154 sendGetTabInfoReq(signal, stat, name);
2155 }
2156
2157 void
statMetaGetSampleX1CB(Signal * signal,Uint32 statPtrI,Uint32 ret)2158 Trix::statMetaGetSampleX1CB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2159 {
2160 StatOp& stat = statOpGetPtr(statPtrI);
2161 D("statMetaGetSampleX1CB" << V(stat) << V(ret));
2162 StatOp::Meta& meta = stat.m_meta;
2163 if (ret != 0)
2164 {
2165 jam();
2166 statOpError(signal, stat, ret, __LINE__);
2167 return;
2168 }
2169 g_statMetaSampleX1.tableId = g_statMetaSample.tableId;
2170 g_statMetaSampleX1.indexId = meta.m_conf.tableId;
2171 statGetMetaDone(signal, stat);
2172 }
2173
2174 void
sendGetTabInfoReq(Signal * signal,StatOp & stat,const char * name)2175 Trix::sendGetTabInfoReq(Signal* signal, StatOp& stat, const char* name)
2176 {
2177 D("sendGetTabInfoReq" << V(stat) << V(name));
2178 GetTabInfoReq* req = (GetTabInfoReq*)signal->getDataPtrSend();
2179
2180 Uint32 name_len = (Uint32)strlen(name) + 1;
2181 Uint32 name_len_words = (name_len + 3 ) / 4;
2182 Uint32 name_buf[32];
2183 ndbrequire(name_len_words <= 32);
2184 memset(name_buf, 0, sizeof(name_buf));
2185 memcpy(name_buf, name, name_len);
2186
2187 req->senderData = stat.m_ownPtrI;
2188 req->senderRef = reference();
2189 req->requestType = GetTabInfoReq::RequestByName |
2190 GetTabInfoReq::LongSignalConf;
2191 req->tableNameLen = name_len;
2192 req->schemaTransId = 0;
2193 LinearSectionPtr ptr[3];
2194 ptr[0].p = name_buf;
2195 ptr[0].sz = name_len_words;
2196 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
2197 signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
2198 }
2199
2200 void
execGET_TABINFO_CONF(Signal * signal)2201 Trix::execGET_TABINFO_CONF(Signal* signal)
2202 {
2203 jamEntry();
2204 if (!assembleFragments(signal)) {
2205 jam();
2206 return;
2207 }
2208 const GetTabInfoConf* conf = (const GetTabInfoConf*)signal->getDataPtr();
2209 StatOp& stat = statOpGetPtr(conf->senderData);
2210 D("execGET_TABINFO_CONF" << V(stat));
2211 StatOp::Meta& meta = stat.m_meta;
2212 meta.m_conf = *conf;
2213
2214 // do not need DICTTABINFO
2215 SectionHandle handle(this, signal);
2216 releaseSections(handle);
2217
2218 execute(signal, meta.m_cb, 0);
2219 }
2220
2221 void
execGET_TABINFO_REF(Signal * signal)2222 Trix::execGET_TABINFO_REF(Signal* signal)
2223 {
2224 jamEntry();
2225 const GetTabInfoRef* ref = (const GetTabInfoRef*)signal->getDataPtr();
2226 StatOp& stat = statOpGetPtr(ref->senderData);
2227 D("execGET_TABINFO_REF" << V(stat));
2228 StatOp::Meta& meta = stat.m_meta;
2229
2230 ndbrequire(ref->errorCode != 0);
2231 execute(signal, meta.m_cb, ref->errorCode);
2232 }
2233
2234 // continue after metadata retrieval
2235
2236 void
statGetMetaDone(Signal * signal,StatOp & stat)2237 Trix::statGetMetaDone(Signal* signal, StatOp& stat)
2238 {
2239 const IndexStatImplReq* req = &stat.m_req;
2240 StatOp::Data& data = stat.m_data;
2241 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2242 D("statGetMetaDone" << V(stat));
2243
2244 // c_statGetMetaDone = true;
2245
2246 subRec->requestType = STAT_UTIL;
2247 // fill in constant part
2248 ndbrequire(req->fragCount != 0);
2249 data.m_indexId = req->indexId;
2250 data.m_indexVersion = req->indexVersion;
2251 data.m_fragCount = req->fragCount;
2252 statHeadRead(signal, stat);
2253 }
2254
2255 // head table ops
2256
2257 void
statHeadRead(Signal * signal,StatOp & stat)2258 Trix::statHeadRead(Signal* signal, StatOp& stat)
2259 {
2260 StatOp::Util& util = stat.m_util;
2261 StatOp::Send& send = stat.m_send;
2262 D("statHeadRead" << V(stat));
2263
2264 util.m_not_found = false;
2265 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
2266 util.m_cb.m_callbackData = stat.m_ownPtrI;
2267 send.m_sysTable = &g_statMetaHead;
2268 send.m_operationType = UtilPrepareReq::Read;
2269 statUtilPrepare(signal, stat);
2270 }
2271
2272 void
statHeadReadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2273 Trix::statHeadReadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2274 {
2275 StatOp& stat = statOpGetPtr(statPtrI);
2276 StatOp::Data& data = stat.m_data;
2277 StatOp::Util& util = stat.m_util;
2278 D("statHeadReadCB" << V(stat) << V(ret));
2279
2280 ndbrequire(ret == 0);
2281 data.m_head_found = !util.m_not_found;
2282 statReadHeadDone(signal, stat);
2283 }
2284
2285 void
statHeadInsert(Signal * signal,StatOp & stat)2286 Trix::statHeadInsert(Signal* signal, StatOp& stat)
2287 {
2288 StatOp::Util& util = stat.m_util;
2289 StatOp::Send& send = stat.m_send;
2290 D("statHeadInsert" << V(stat));
2291
2292 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
2293 util.m_cb.m_callbackData = stat.m_ownPtrI;
2294 send.m_sysTable = &g_statMetaHead;
2295 send.m_operationType = UtilPrepareReq::Insert;
2296 statUtilPrepare(signal, stat);
2297 }
2298
2299 void
statHeadInsertCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2300 Trix::statHeadInsertCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2301 {
2302 StatOp& stat = statOpGetPtr(statPtrI);
2303 D("statHeadInsertCB" << V(stat) << V(ret));
2304
2305 ndbrequire(ret == 0);
2306 statInsertHeadDone(signal, stat);
2307 }
2308
2309 void
statHeadUpdate(Signal * signal,StatOp & stat)2310 Trix::statHeadUpdate(Signal* signal, StatOp& stat)
2311 {
2312 StatOp::Util& util = stat.m_util;
2313 StatOp::Send& send = stat.m_send;
2314 D("statHeadUpdate" << V(stat));
2315
2316 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
2317 util.m_cb.m_callbackData = stat.m_ownPtrI;
2318 send.m_sysTable = &g_statMetaHead;
2319 send.m_operationType = UtilPrepareReq::Update;
2320 statUtilPrepare(signal, stat);
2321 }
2322
2323 void
statHeadUpdateCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2324 Trix::statHeadUpdateCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2325 {
2326 StatOp& stat = statOpGetPtr(statPtrI);
2327 D("statHeadUpdateCB" << V(stat) << V(ret));
2328
2329 ndbrequire(ret == 0);
2330 statUpdateHeadDone(signal, stat);
2331 }
2332
2333 void
statHeadDelete(Signal * signal,StatOp & stat)2334 Trix::statHeadDelete(Signal* signal, StatOp& stat)
2335 {
2336 StatOp::Util& util = stat.m_util;
2337 StatOp::Send& send = stat.m_send;
2338 D("statHeadDelete" << V(stat));
2339
2340 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
2341 util.m_cb.m_callbackData = stat.m_ownPtrI;
2342 send.m_sysTable = &g_statMetaHead;
2343 send.m_operationType = UtilPrepareReq::Delete;
2344 statUtilPrepare(signal, stat);
2345 }
2346
2347 void
statHeadDeleteCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2348 Trix::statHeadDeleteCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2349 {
2350 StatOp& stat = statOpGetPtr(statPtrI);
2351 D("statHeadDeleteCB" << V(stat) << V(ret));
2352
2353 ndbrequire(ret == 0);
2354 statDeleteHeadDone(signal, stat);
2355 }
2356
2357 // util (PK ops, only HEAD for now)
2358
2359 void
statUtilPrepare(Signal * signal,StatOp & stat)2360 Trix::statUtilPrepare(Signal* signal, StatOp& stat)
2361 {
2362 StatOp::Util& util = stat.m_util;
2363 D("statUtilPrepare" << V(stat));
2364
2365 util.m_prepareId = RNIL;
2366 statSendPrepare(signal, stat);
2367 }
2368
2369 void
statUtilPrepareConf(Signal * signal,Uint32 statPtrI)2370 Trix::statUtilPrepareConf(Signal* signal, Uint32 statPtrI)
2371 {
2372 StatOp& stat = statOpGetPtr(statPtrI);
2373 StatOp::Util& util = stat.m_util;
2374 StatOp::Send& send = stat.m_send;
2375 D("statUtilPrepareConf" << V(stat));
2376
2377 const UtilPrepareConf* utilConf =
2378 (const UtilPrepareConf*)signal->getDataPtr();
2379 util.m_prepareId = utilConf->prepareId;
2380
2381 const Uint32 ot = send.m_operationType;
2382 if ((ERROR_INSERTED(18011) && ot == UtilPrepareReq::Read) ||
2383 (ERROR_INSERTED(18012) && ot != UtilPrepareReq::Read))
2384 {
2385 jam();
2386 CLEAR_ERROR_INSERT_VALUE;
2387 UtilExecuteRef* utilRef =
2388 (UtilExecuteRef*)signal->getDataPtrSend();
2389 utilRef->senderData = stat.m_ownPtrI;
2390 utilRef->errorCode = UtilExecuteRef::AllocationError;
2391 utilRef->TCErrorCode = 0;
2392 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2393 signal, UtilExecuteRef::SignalLength, JBB);
2394 return;
2395 }
2396
2397 statUtilExecute(signal, stat);
2398 }
2399
2400 void
statUtilPrepareRef(Signal * signal,Uint32 statPtrI)2401 Trix::statUtilPrepareRef(Signal* signal, Uint32 statPtrI)
2402 {
2403 StatOp& stat = statOpGetPtr(statPtrI);
2404 D("statUtilPrepareRef" << V(stat));
2405
2406 const UtilPrepareRef* utilRef =
2407 (const UtilPrepareRef*)signal->getDataPtr();
2408 Uint32 errorCode = utilRef->errorCode;
2409 ndbrequire(errorCode != 0);
2410
2411 switch (errorCode) {
2412 case UtilPrepareRef::PREPARE_SEIZE_ERROR:
2413 case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
2414 case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
2415 errorCode = IndexStatRef::BusyUtilPrepare;
2416 break;
2417 case UtilPrepareRef::DICT_TAB_INFO_ERROR:
2418 errorCode = IndexStatRef::InvalidSysTable;
2419 break;
2420 case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
2421 default:
2422 ndbabort();
2423 }
2424 statOpError(signal, stat, errorCode, __LINE__);
2425 }
2426
2427 void
statUtilExecute(Signal * signal,StatOp & stat)2428 Trix::statUtilExecute(Signal* signal, StatOp& stat)
2429 {
2430 StatOp::Util& util = stat.m_util;
2431 StatOp::Send& send = stat.m_send;
2432 D("statUtilExecute" << V(stat));
2433
2434 send.m_prepareId = util.m_prepareId;
2435 statSendExecute(signal, stat);
2436 }
2437
2438 void
statUtilExecuteConf(Signal * signal,Uint32 statPtrI)2439 Trix::statUtilExecuteConf(Signal* signal, Uint32 statPtrI)
2440 {
2441 StatOp& stat = statOpGetPtr(statPtrI);
2442 StatOp::Attr& attr = stat.m_attr;
2443 StatOp::Send& send = stat.m_send;
2444 D("statUtilExecuteConf" << V(stat));
2445
2446 if (send.m_operationType == UtilPrepareReq::Read)
2447 {
2448 jam();
2449 SectionHandle handle(this, signal);
2450 Uint32 rattr[20];
2451 Uint32 rdata[2048];
2452 attr.m_attr = rattr;
2453 attr.m_attrMax = 20;
2454 attr.m_attrSize = 0;
2455 attr.m_data = rdata;
2456 attr.m_dataMax = 2048;
2457 attr.m_dataSize = 0;
2458 {
2459 SegmentedSectionPtr ssPtr;
2460 handle.getSection(ssPtr, 0);
2461 ::copy(rattr, ssPtr);
2462 }
2463 {
2464 SegmentedSectionPtr ssPtr;
2465 handle.getSection(ssPtr, 1);
2466 ::copy(rdata, ssPtr);
2467 }
2468 releaseSections(handle);
2469
2470 const SysTable& sysTable = *send.m_sysTable;
2471 for (Uint32 i = 0; i < sysTable.columnCount; i++)
2472 {
2473 jam();
2474 statDataIn(stat, i);
2475 }
2476 }
2477
2478 statUtilRelease(signal, stat);
2479 }
2480
2481 void
statUtilExecuteRef(Signal * signal,Uint32 statPtrI)2482 Trix::statUtilExecuteRef(Signal* signal, Uint32 statPtrI)
2483 {
2484 StatOp& stat = statOpGetPtr(statPtrI);
2485 StatOp::Util& util = stat.m_util;
2486 StatOp::Send& send = stat.m_send;
2487 D("statUtilExecuteRef" << V(stat));
2488
2489 const UtilExecuteRef* utilRef =
2490 (const UtilExecuteRef*)signal->getDataPtr();
2491 Uint32 errorCode = utilRef->errorCode;
2492 ndbrequire(errorCode != 0);
2493
2494 switch (errorCode) {
2495 case UtilExecuteRef::TCError:
2496 errorCode = utilRef->TCErrorCode;
2497 ndbrequire(errorCode != 0);
2498 if (send.m_operationType == UtilPrepareReq::Read &&
2499 errorCode == ZNOT_FOUND)
2500 {
2501 jam();
2502 util.m_not_found = true;
2503 errorCode = 0;
2504 }
2505 break;
2506 case UtilExecuteRef::AllocationError:
2507 errorCode = IndexStatRef::BusyUtilExecute;
2508 break;
2509 default:
2510 ndbabort();
2511 }
2512
2513 if (errorCode != 0)
2514 {
2515 jam();
2516 statOpError(signal, stat, errorCode, __LINE__);
2517 return;
2518 }
2519 statUtilRelease(signal, stat);
2520 }
2521
2522 void
statUtilRelease(Signal * signal,StatOp & stat)2523 Trix::statUtilRelease(Signal* signal, StatOp& stat)
2524 {
2525 StatOp::Util& util = stat.m_util;
2526 StatOp::Send& send = stat.m_send;
2527 D("statUtilRelease" << V(stat));
2528
2529 send.m_prepareId = util.m_prepareId;
2530 statSendRelease(signal, stat);
2531 }
2532
2533 void
statUtilReleaseConf(Signal * signal,Uint32 statPtrI)2534 Trix::statUtilReleaseConf(Signal* signal, Uint32 statPtrI)
2535 {
2536 StatOp& stat = statOpGetPtr(statPtrI);
2537 StatOp::Util& util = stat.m_util;
2538 D("statUtilReleaseConf" << V(stat));
2539
2540 util.m_prepareId = RNIL;
2541 execute(signal, util.m_cb, 0);
2542 }
2543
2544 // continue after head table ops
2545
2546 void
statReadHeadDone(Signal * signal,StatOp & stat)2547 Trix::statReadHeadDone(Signal* signal, StatOp& stat)
2548 {
2549 //UNUSED StatOp::Data& data = stat.m_data;
2550 D("statReadHeadDone" << V(stat));
2551
2552 switch (stat.m_requestType) {
2553 case IndexStatReq::RT_CLEAN_NEW:
2554 jam();
2555 // Fall through
2556 case IndexStatReq::RT_CLEAN_OLD:
2557 jam();
2558 // Fall through
2559 case IndexStatReq::RT_CLEAN_ALL:
2560 jam();
2561 statCleanBegin(signal, stat);
2562 break;
2563
2564 case IndexStatReq::RT_SCAN_FRAG:
2565 jam();
2566 statScanBegin(signal, stat);
2567 break;
2568
2569 case IndexStatReq::RT_DROP_HEAD:
2570 jam();
2571 statDropBegin(signal, stat);
2572 break;
2573
2574 default:
2575 ndbabort();
2576 }
2577 }
2578
2579 void
statInsertHeadDone(Signal * signal,StatOp & stat)2580 Trix::statInsertHeadDone(Signal* signal, StatOp& stat)
2581 {
2582 D("statInsertHeadDone" << V(stat));
2583
2584 switch (stat.m_requestType) {
2585 case IndexStatReq::RT_SCAN_FRAG:
2586 jam();
2587 statScanEnd(signal, stat);
2588 break;
2589 default:
2590 ndbabort();
2591 }
2592 }
2593
2594 void
statUpdateHeadDone(Signal * signal,StatOp & stat)2595 Trix::statUpdateHeadDone(Signal* signal, StatOp& stat)
2596 {
2597 D("statUpdateHeadDone" << V(stat));
2598
2599 switch (stat.m_requestType) {
2600 case IndexStatReq::RT_SCAN_FRAG:
2601 jam();
2602 statScanEnd(signal, stat);
2603 break;
2604 default:
2605 ndbabort();
2606 }
2607 }
2608
2609 void
statDeleteHeadDone(Signal * signal,StatOp & stat)2610 Trix::statDeleteHeadDone(Signal* signal, StatOp& stat)
2611 {
2612 D("statDeleteHeadDone" << V(stat));
2613
2614 switch (stat.m_requestType) {
2615 case IndexStatReq::RT_DROP_HEAD:
2616 jam();
2617 statDropEnd(signal, stat);
2618 break;
2619 default:
2620 ndbabort();
2621 }
2622 }
2623
2624 // clean
2625
2626 void
statCleanBegin(Signal * signal,StatOp & stat)2627 Trix::statCleanBegin(Signal* signal, StatOp& stat)
2628 {
2629 const IndexStatImplReq* req = &stat.m_req;
2630 StatOp::Data& data = stat.m_data;
2631 D("statCleanBegin" << V(stat));
2632
2633 if (data.m_head_found == true)
2634 {
2635 jam();
2636 if (data.m_tableId != req->tableId &&
2637 stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2638 {
2639 jam();
2640 // must run ndb_index_stat --drop
2641 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2642 return;
2643 }
2644 }
2645 else
2646 {
2647 if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2648 {
2649 jam();
2650 // happens normally on first stats scan
2651 stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
2652 }
2653 }
2654 statCleanPrepare(signal, stat);
2655 }
2656
2657 void
statCleanPrepare(Signal * signal,StatOp & stat)2658 Trix::statCleanPrepare(Signal* signal, StatOp& stat)
2659 {
2660 const IndexStatImplReq* req = &stat.m_req;
2661 StatOp::Data& data = stat.m_data;
2662 StatOp::Clean& clean = stat.m_clean;
2663 StatOp::Send& send = stat.m_send;
2664 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2665 D("statCleanPrepare" << V(stat));
2666
2667 // count of deleted samples is just for info
2668 clean.m_cleanCount = 0;
2669
2670 const Uint32 ao_list[] = {
2671 0, // INDEX_ID
2672 1, // INDEX_VERSION
2673 2, // SAMPLE_VERSION
2674 3 // STAT_KEY
2675 };
2676 const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2677
2678 ndbrequire(req->fragId == ZNIL);
2679 subRec->m_flags = 0;
2680 subRec->requestType = STAT_CLEAN;
2681 subRec->schemaTransId = req->transId;
2682 subRec->userReference = 0; // not used
2683 subRec->connectionPtr = RNIL;
2684 subRec->subscriptionId = rand();
2685 subRec->subscriptionKey = rand();
2686 subRec->prepareId = RNIL;
2687 subRec->indexType = 0; // not used
2688 subRec->sourceTableId = g_statMetaSampleX1.indexId;
2689 subRec->targetTableId = RNIL;
2690 subRec->noOfIndexColumns = ao_size;
2691 subRec->noOfKeyColumns = 0;
2692 subRec->parallelism = 16; // remains hardcoded for now
2693 subRec->fragCount = 0;
2694 subRec->fragId = ZNIL;
2695 subRec->syncPtr = RNIL;
2696 subRec->errorCode = BuildIndxRef::NoError;
2697 subRec->subscriptionCreated = false;
2698 subRec->pendingSubSyncContinueConf = false;
2699 subRec->expectedConf = 0;
2700 subRec->m_rows_processed = 0;
2701 subRec->m_gci = 0;
2702
2703 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2704 ndbrequire(ao_buf.isEmpty());
2705 ao_buf.append(ao_list, ao_size);
2706
2707 // create TUX bounds
2708 clean.m_bound[0] = TuxBoundInfo::BoundEQ;
2709 clean.m_bound[1] = AttributeHeader(0, 4).m_value;
2710 clean.m_bound[2] = data.m_indexId;
2711 clean.m_bound[3] = TuxBoundInfo::BoundEQ;
2712 clean.m_bound[4] = AttributeHeader(1, 4).m_value;
2713 clean.m_bound[5] = data.m_indexVersion;
2714 Uint32 boundCount;
2715 switch (stat.m_requestType) {
2716 case IndexStatReq::RT_CLEAN_NEW:
2717 D("statCleanPrepare delete sample versions > " << data.m_sampleVersion);
2718 clean.m_bound[6] = TuxBoundInfo::BoundLT;
2719 clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2720 clean.m_bound[8] = data.m_sampleVersion;
2721 boundCount = 3;
2722 break;
2723 case IndexStatReq::RT_CLEAN_OLD:
2724 D("statCleanPrepare delete sample versions < " << data.m_sampleVersion);
2725 clean.m_bound[6] = TuxBoundInfo::BoundGT;
2726 clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2727 clean.m_bound[8] = data.m_sampleVersion;
2728 boundCount = 3;
2729 break;
2730 case IndexStatReq::RT_CLEAN_ALL:
2731 D("statCleanPrepare delete all sample versions");
2732 boundCount = 2;
2733 break;
2734 default:
2735 boundCount = 0; /* Silence compiler warning */
2736 ndbabort();
2737 }
2738 clean.m_boundSize = 3 * boundCount;
2739
2740 // TRIX traps the CONF
2741 send.m_sysTable = &g_statMetaSample;
2742 send.m_operationType = UtilPrepareReq::Delete;
2743 statSendPrepare(signal, stat);
2744 }
2745
2746 void
statCleanExecute(Signal * signal,StatOp & stat)2747 Trix::statCleanExecute(Signal* signal, StatOp& stat)
2748 {
2749 StatOp::Data& data = stat.m_data;
2750 StatOp::Send& send = stat.m_send;
2751 StatOp::Clean& clean = stat.m_clean;
2752 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2753 D("statCleanExecute" << V(stat));
2754
2755 CRASH_INSERTION(18025);
2756
2757 SectionHandle handle(this, signal);
2758 ndbrequire(handle.m_cnt == 2);
2759
2760 // ATTR_INFO
2761 AttributeHeader ah[4];
2762 SegmentedSectionPtr ptr0;
2763 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2764 ndbrequire(ptr0.sz == 4);
2765 ::copy((Uint32*)ah, ptr0);
2766 ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
2767 ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
2768 ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
2769 // read via TUP rounds bytes to words
2770 const Uint32 kz = ah[3].getDataSize();
2771 ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
2772
2773 // AFTER_VALUES
2774 // avmax = other pk attributes + length + max index stat key size
2775 const Uint32 avmax = 3 + 1 + MAX_INDEX_STAT_KEY_SIZE;
2776 Uint32 av[avmax];
2777 SegmentedSectionPtr ptr1;
2778 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2779 ndbrequire(ptr1.sz <= avmax);
2780 ::copy(av, ptr1);
2781 ndbrequire(data.m_indexId == av[0]);
2782 ndbrequire(data.m_indexVersion == av[1]);
2783 data.m_sampleVersion = av[2];
2784 data.m_statKey = &av[3];
2785 const unsigned char* kp = (const unsigned char*)data.m_statKey;
2786 const Uint32 kb = kp[0] + (kp[1] << 8);
2787 // key is not empty
2788 ndbrequire(kb != 0);
2789 ndbrequire(kz == ((2 + kb) + 3) / 4);
2790
2791 clean.m_cleanCount++;
2792 releaseSections(handle);
2793
2794 const Uint32 rt = stat.m_requestType;
2795 if ((ERROR_INSERTED(18021) && rt == IndexStatReq::RT_CLEAN_NEW) ||
2796 (ERROR_INSERTED(18022) && rt == IndexStatReq::RT_CLEAN_OLD) ||
2797 (ERROR_INSERTED(18023) && rt == IndexStatReq::RT_CLEAN_ALL))
2798 {
2799 jam();
2800 CLEAR_ERROR_INSERT_VALUE;
2801 UtilExecuteRef* utilRef =
2802 (UtilExecuteRef*)signal->getDataPtrSend();
2803 utilRef->senderData = stat.m_ownPtrI;
2804 utilRef->errorCode = UtilExecuteRef::TCError;
2805 utilRef->TCErrorCode = 626;
2806 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2807 signal, UtilExecuteRef::SignalLength, JBB);
2808 subRec->expectedConf++;
2809 return;
2810 }
2811
2812 // TRIX traps the CONF
2813 send.m_sysTable = &g_statMetaSample;
2814 send.m_operationType = UtilPrepareReq::Delete;
2815 send.m_prepareId = subRec->prepareId;
2816 subRec->expectedConf++;
2817 statSendExecute(signal, stat);
2818 }
2819
2820 void
statCleanRelease(Signal * signal,StatOp & stat)2821 Trix::statCleanRelease(Signal* signal, StatOp& stat)
2822 {
2823 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2824 D("statCleanRelease" << V(stat) << V(subRec->errorCode));
2825
2826 if (subRec->errorCode != 0)
2827 {
2828 jam();
2829 statOpError(signal, stat, subRec->errorCode, __LINE__);
2830 return;
2831 }
2832 statCleanEnd(signal, stat);
2833 }
2834
2835 void
statCleanEnd(Signal * signal,StatOp & stat)2836 Trix::statCleanEnd(Signal* signal, StatOp& stat)
2837 {
2838 D("statCleanEnd" << V(stat));
2839 statOpSuccess(signal, stat);
2840 }
2841
2842 // scan
2843
2844 void
statScanBegin(Signal * signal,StatOp & stat)2845 Trix::statScanBegin(Signal* signal, StatOp& stat)
2846 {
2847 const IndexStatImplReq* req = &stat.m_req;
2848 StatOp::Data& data = stat.m_data;
2849 D("statScanBegin" << V(stat));
2850
2851 if (data.m_head_found == true &&
2852 data.m_tableId != req->tableId)
2853 {
2854 jam();
2855 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2856 return;
2857 }
2858 data.m_tableId = req->tableId;
2859 statScanPrepare(signal, stat);
2860 }
2861
2862 void
statScanPrepare(Signal * signal,StatOp & stat)2863 Trix::statScanPrepare(Signal* signal, StatOp& stat)
2864 {
2865 const IndexStatImplReq* req = &stat.m_req;
2866 StatOp::Data& data = stat.m_data;
2867 StatOp::Scan& scan = stat.m_scan;
2868 StatOp::Send& send = stat.m_send;
2869 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2870 D("statScanPrepare" << V(stat));
2871
2872 // update sample version prior to scan
2873 if (data.m_head_found == false)
2874 data.m_sampleVersion = 0;
2875 data.m_sampleVersion += 1;
2876
2877 // zero totals
2878 scan.m_sampleCount = 0;
2879 scan.m_keyBytes = 0;
2880
2881 const Uint32 ao_list[] = {
2882 AttributeHeader::INDEX_STAT_KEY,
2883 AttributeHeader::INDEX_STAT_VALUE
2884 };
2885 const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2886
2887 ndbrequire(req->fragId != ZNIL);
2888 subRec->m_flags = 0;
2889 subRec->requestType = STAT_SCAN;
2890 subRec->schemaTransId = req->transId;
2891 subRec->userReference = 0; // not used
2892 subRec->connectionPtr = RNIL;
2893 subRec->subscriptionId = rand();
2894 subRec->subscriptionKey = rand();
2895 subRec->prepareId = RNIL;
2896 subRec->indexType = 0; // not used
2897 subRec->sourceTableId = data.m_indexId;
2898 subRec->targetTableId = RNIL;
2899 subRec->noOfIndexColumns = ao_size;
2900 subRec->noOfKeyColumns = 0;
2901 subRec->parallelism = 16; // remains hardcoded for now
2902 subRec->fragCount = 0; // XXX Suma currently checks all frags
2903 subRec->fragId = req->fragId;
2904 subRec->syncPtr = RNIL;
2905 subRec->errorCode = BuildIndxRef::NoError;
2906 subRec->subscriptionCreated = false;
2907 subRec->pendingSubSyncContinueConf = false;
2908 subRec->expectedConf = 0;
2909 subRec->m_rows_processed = 0;
2910 subRec->m_gci = 0;
2911
2912 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2913 ndbrequire(ao_buf.isEmpty());
2914 ao_buf.append(ao_list, ao_size);
2915
2916 // TRIX traps the CONF
2917 send.m_sysTable = &g_statMetaSample;
2918 send.m_operationType = UtilPrepareReq::Insert;
2919 statSendPrepare(signal, stat);
2920 }
2921
2922 void
statScanExecute(Signal * signal,StatOp & stat)2923 Trix::statScanExecute(Signal* signal, StatOp& stat)
2924 {
2925 StatOp::Data& data = stat.m_data;
2926 StatOp::Scan& scan = stat.m_scan;
2927 StatOp::Send& send = stat.m_send;
2928 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2929 D("statScanExecute" << V(stat));
2930
2931 CRASH_INSERTION(18026);
2932
2933 SectionHandle handle(this, signal);
2934 ndbrequire(handle.m_cnt == 2);
2935
2936 // ATTR_INFO
2937 AttributeHeader ah[2];
2938 SegmentedSectionPtr ptr0;
2939 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2940 ndbrequire(ptr0.sz == 2);
2941 ::copy((Uint32*)ah, ptr0);
2942 ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
2943 ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
2944 // read via TUP rounds bytes to words
2945 const Uint32 kz = ah[0].getDataSize();
2946 const Uint32 vz = ah[1].getDataSize();
2947 ndbrequire(kz != 0 && vz != 0);
2948
2949 // AFTER_VALUES
2950 // avmax = length + max key size + length + max value size
2951 const Uint32 avmax = 2 + MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
2952 Uint32 av[avmax];
2953 SegmentedSectionPtr ptr1;
2954 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2955 ndbrequire(ptr1.sz <= avmax);
2956 ::copy(av, ptr1);
2957 data.m_statKey = &av[0];
2958 data.m_statValue = &av[kz];
2959 const unsigned char* kp = (const unsigned char*)data.m_statKey;
2960 const unsigned char* vp = (const unsigned char*)data.m_statValue;
2961 const Uint32 kb = kp[0] + (kp[1] << 8);
2962 const Uint32 vb = vp[0] + (vp[1] << 8);
2963 // key and value are not empty
2964 ndbrequire(kb != 0 && vb != 0);
2965 ndbrequire(kz == ((2 + kb) + 3) / 4);
2966 ndbrequire(vz == ((2 + vb) + 3) / 4);
2967
2968 scan.m_sampleCount++;
2969 scan.m_keyBytes += kb;
2970 releaseSections(handle);
2971
2972 if (ERROR_INSERTED(18024))
2973 {
2974 jam();
2975 CLEAR_ERROR_INSERT_VALUE;
2976 UtilExecuteRef* utilRef =
2977 (UtilExecuteRef*)signal->getDataPtrSend();
2978 utilRef->senderData = stat.m_ownPtrI;
2979 utilRef->errorCode = UtilExecuteRef::TCError;
2980 utilRef->TCErrorCode = 630;
2981 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2982 signal, UtilExecuteRef::SignalLength, JBB);
2983 subRec->expectedConf++;
2984 return;
2985 }
2986
2987 // TRIX traps the CONF
2988 send.m_sysTable = &g_statMetaSample;
2989 send.m_operationType = UtilPrepareReq::Insert;
2990 send.m_prepareId = subRec->prepareId;
2991 subRec->expectedConf++;
2992 statSendExecute(signal, stat);
2993 }
2994
2995 void
statScanRelease(Signal * signal,StatOp & stat)2996 Trix::statScanRelease(Signal* signal, StatOp& stat)
2997 {
2998 StatOp::Data& data = stat.m_data;
2999 StatOp::Scan& scan = stat.m_scan;
3000 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
3001 D("statScanRelease" << V(stat) << V(subRec->errorCode));
3002
3003 if (subRec->errorCode != 0)
3004 {
3005 jam();
3006 statOpError(signal, stat, subRec->errorCode, __LINE__);
3007 return;
3008 }
3009 subRec->requestType = STAT_UTIL;
3010
3011 const Uint32 now = (Uint32)time(0);
3012 data.m_loadTime = now;
3013 data.m_sampleCount = scan.m_sampleCount;
3014 data.m_keyBytes = scan.m_keyBytes;
3015 data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
3016
3017 if (data.m_head_found == false)
3018 {
3019 jam();
3020 statHeadInsert(signal, stat);
3021 }
3022 else
3023 {
3024 jam();
3025 statHeadUpdate(signal, stat);
3026 }
3027 }
3028
3029 void
statScanEnd(Signal * signal,StatOp & stat)3030 Trix::statScanEnd(Signal* signal, StatOp& stat)
3031 {
3032 StatOp::Data& data = stat.m_data;
3033 const IndexStatImplReq* req = &stat.m_req;
3034 D("statScanEnd" << V(stat));
3035
3036 /*
3037 * TRIX reports stats load time to TUX for proper stats monitoring.
3038 * Passing this via DBDICT RT_START_MON is not feasible. For MT-LQH
3039 * we prefer DbtuxProxy to avoid introducing MT-LQH into TRIX.
3040 */
3041
3042 #ifdef trix_index_stat_rep_to_tux_instance
3043 Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
3044 BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
3045 #else
3046 BlockReference tuxRef = DBTUX_REF;
3047 #endif
3048
3049 IndexStatRep* rep = (IndexStatRep*)signal->getDataPtrSend();
3050 rep->senderRef = reference();
3051 rep->senderData = 0;
3052 rep->requestType = IndexStatRep::RT_UPDATE_CONF;
3053 rep->requestFlag = 0;
3054 rep->indexId = req->indexId;
3055 rep->indexVersion = req->indexVersion;
3056 rep->tableId = req->tableId;
3057 rep->fragId = req->fragId;
3058 rep->loadTime = data.m_loadTime;
3059 sendSignal(tuxRef, GSN_INDEX_STAT_REP,
3060 signal, IndexStatRep::SignalLength, JBB);
3061
3062 statOpSuccess(signal, stat);
3063 }
3064
3065 // drop
3066
3067 void
statDropBegin(Signal * signal,StatOp & stat)3068 Trix::statDropBegin(Signal* signal, StatOp& stat)
3069 {
3070 StatOp::Data& data = stat.m_data;
3071 D("statDropBegin" << V(stat));
3072
3073 if (data.m_head_found == true)
3074 {
3075 jam();
3076 statHeadDelete(signal, stat);
3077 return;
3078 }
3079 statDropEnd(signal, stat);
3080 }
3081
3082 void
statDropEnd(Signal * signal,StatOp & stat)3083 Trix::statDropEnd(Signal* signal, StatOp& stat)
3084 {
3085 D("statDropEnd");
3086 statOpSuccess(signal, stat);
3087 }
3088
3089 // send
3090
3091 void
statSendPrepare(Signal * signal,StatOp & stat)3092 Trix::statSendPrepare(Signal* signal, StatOp& stat)
3093 {
3094 StatOp::Send& send = stat.m_send;
3095 const IndexStatImplReq* req = &stat.m_req;
3096 const SysTable& sysTable = *send.m_sysTable;
3097 D("statSendPrepare" << V(stat));
3098
3099 UtilPrepareReq* utilReq =
3100 (UtilPrepareReq*)signal->getDataPtrSend();
3101 utilReq->senderData = stat.m_ownPtrI;
3102 utilReq->senderRef = reference();
3103 utilReq->schemaTransId = req->transId;
3104
3105 Uint32 wbuf[256];
3106 LinearWriter w(&wbuf[0], sizeof(wbuf) >> 2);
3107
3108 w.first();
3109 w.add(UtilPrepareReq::NoOfOperations, 1);
3110 w.add(UtilPrepareReq::OperationType, send.m_operationType);
3111 w.add(UtilPrepareReq::TableId, sysTable.tableId);
3112
3113 Uint32 i;
3114 for (i = 0; i < sysTable.columnCount; i++) {
3115 const SysColumn& c = sysTable.columnList[i];
3116 switch (send.m_operationType) {
3117 case UtilPrepareReq::Read:
3118 case UtilPrepareReq::Insert:
3119 case UtilPrepareReq::Update:
3120 jam();
3121 w.add(UtilPrepareReq::AttributeId, i);
3122 break;
3123 case UtilPrepareReq::Delete:
3124 jam();
3125 if (c.keyFlag)
3126 w.add(UtilPrepareReq::AttributeId, i);
3127 break;
3128 default:
3129 ndbabort();
3130 }
3131 }
3132
3133 LinearSectionPtr ptr[3];
3134 ptr[0].p = &wbuf[0];
3135 ptr[0].sz = w.getWordsUsed();
3136 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
3137 signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
3138 }
3139
3140 void
statSendExecute(Signal * signal,StatOp & stat)3141 Trix::statSendExecute(Signal* signal, StatOp& stat)
3142 {
3143 D("statSendExecute" << V(stat));
3144 StatOp::Send& send = stat.m_send;
3145 StatOp::Attr& attr = stat.m_attr;
3146 const SysTable& sysTable = *send.m_sysTable;
3147
3148 UtilExecuteReq* utilReq =
3149 (UtilExecuteReq*)signal->getDataPtrSend();
3150 utilReq->senderData = stat.m_ownPtrI;
3151 utilReq->senderRef = reference();
3152 utilReq->prepareId = send.m_prepareId;
3153 utilReq->scanTakeOver = 0;
3154
3155 Uint32 wattr[20];
3156 Uint32 wdata[2048];
3157 attr.m_attr = wattr;
3158 attr.m_attrMax = 20;
3159 attr.m_attrSize = 0;
3160 attr.m_data = wdata;
3161 attr.m_dataMax = 2048;
3162 attr.m_dataSize = 0;
3163
3164 for (Uint32 i = 0; i < sysTable.columnCount; i++) {
3165 const SysColumn& c = sysTable.columnList[i];
3166 switch (send.m_operationType) {
3167 case UtilPrepareReq::Read:
3168 case UtilPrepareReq::Insert:
3169 case UtilPrepareReq::Update:
3170 jam();
3171 statDataOut(stat, i);
3172 break;
3173 case UtilPrepareReq::Delete:
3174 jam();
3175 if (c.keyFlag)
3176 statDataOut(stat, i);
3177 break;
3178 default:
3179 ndbabort();
3180 }
3181 }
3182
3183 LinearSectionPtr ptr[3];
3184 ptr[0].p = attr.m_attr;
3185 ptr[0].sz = attr.m_attrSize;
3186 ptr[1].p = attr.m_data;
3187 ptr[1].sz = attr.m_dataSize;
3188 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
3189 signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
3190 }
3191
3192 void
statSendRelease(Signal * signal,StatOp & stat)3193 Trix::statSendRelease(Signal* signal, StatOp& stat)
3194 {
3195 D("statSendRelease" << V(stat));
3196 StatOp::Send& send = stat.m_send;
3197 ndbrequire(send.m_prepareId != RNIL);
3198
3199 UtilReleaseReq* utilReq =
3200 (UtilReleaseReq*)signal->getDataPtrSend();
3201 utilReq->senderData = stat.m_ownPtrI;
3202 utilReq->prepareId = send.m_prepareId;
3203 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
3204 signal, UtilReleaseReq::SignalLength, JBB);
3205 }
3206
3207 // data
3208
3209 void
statDataPtr(StatOp & stat,Uint32 i,Uint32 * & dptr,Uint32 & bytes)3210 Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
3211 {
3212 StatOp::Data& data = stat.m_data;
3213 StatOp::Send& send = stat.m_send;
3214
3215 const SysTable& sysTable = *send.m_sysTable;
3216 ndbrequire(i < sysTable.columnCount);
3217 //UNUSED const SysColumn& c = sysTable.columnList[i];
3218
3219 if (&sysTable == &g_statMetaHead)
3220 {
3221 switch (i) {
3222 case 0:
3223 dptr = &data.m_indexId;
3224 bytes = 4;
3225 break;
3226 case 1:
3227 dptr = &data.m_indexVersion;
3228 bytes = 4;
3229 break;
3230 case 2:
3231 dptr = &data.m_tableId;
3232 bytes = 4;
3233 break;
3234 case 3:
3235 dptr = &data.m_fragCount;
3236 bytes = 4;
3237 break;
3238 case 4:
3239 dptr = &data.m_valueFormat;
3240 bytes = 4;
3241 break;
3242 case 5:
3243 dptr = &data.m_sampleVersion;
3244 bytes = 4;
3245 break;
3246 case 6:
3247 dptr = &data.m_loadTime;
3248 bytes = 4;
3249 break;
3250 case 7:
3251 dptr = &data.m_sampleCount;
3252 bytes = 4;
3253 break;
3254 case 8:
3255 dptr = &data.m_keyBytes;
3256 bytes = 4;
3257 break;
3258 default:
3259 ndbabort();
3260 }
3261 return;
3262 }
3263
3264 if (&sysTable == &g_statMetaSample)
3265 {
3266 switch (i) {
3267 case 0:
3268 dptr = &data.m_indexId;
3269 bytes = 4;
3270 break;
3271 case 1:
3272 dptr = &data.m_indexVersion;
3273 bytes = 4;
3274 break;
3275 case 2:
3276 dptr = &data.m_sampleVersion;
3277 bytes = 4;
3278 break;
3279 case 3:
3280 {
3281 dptr = data.m_statKey;
3282 const uchar* p = (uchar*)dptr;
3283 ndbrequire(p != 0);
3284 bytes = 2 + p[0] + (p[1] << 8);
3285 }
3286 break;
3287 case 4:
3288 {
3289 dptr = data.m_statValue;
3290 const uchar* p = (uchar*)dptr;
3291 ndbrequire(p != 0);
3292 bytes = 2 + p[0] + (p[1] << 8);
3293 }
3294 break;
3295 default:
3296 ndbabort();
3297 }
3298 return;
3299 }
3300
3301 ndbabort();
3302 }
3303
3304 void
statDataOut(StatOp & stat,Uint32 i)3305 Trix::statDataOut(StatOp& stat, Uint32 i)
3306 {
3307 StatOp::Attr& attr = stat.m_attr;
3308 Uint32* dptr = 0;
3309 Uint32 bytes = 0;
3310 statDataPtr(stat, i, dptr, bytes);
3311
3312 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3313 AttributeHeader::init(&attr.m_attr[attr.m_attrSize], i, bytes);
3314 attr.m_attrSize++;
3315
3316 Uint32 words = (bytes + 3) / 4;
3317 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3318 Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
3319 memcpy(dst, dptr, bytes);
3320 while (bytes < words * 4)
3321 dst[bytes++] = 0;
3322 attr.m_dataSize += words;
3323 D("statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
3324 }
3325
3326 void
statDataIn(StatOp & stat,Uint32 i)3327 Trix::statDataIn(StatOp& stat, Uint32 i)
3328 {
3329 StatOp::Attr& attr = stat.m_attr;
3330 Uint32* dptr = 0;
3331 Uint32 bytes = 0;
3332 statDataPtr(stat, i, dptr, bytes);
3333
3334 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3335 const AttributeHeader& ah = attr.m_attr[attr.m_attrSize];
3336 attr.m_attrSize++;
3337
3338 ndbrequire(ah.getByteSize() == bytes);
3339 Uint32 words = (bytes + 3) / 4;
3340 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3341 const char* src = (const char*)&attr.m_data[attr.m_dataSize];
3342 memcpy(dptr, src, bytes);
3343 attr.m_dataSize += words;
3344 D("statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
3345 }
3346
3347 // abort ongoing
3348
3349 void
statAbortUtil(Signal * signal,StatOp & stat)3350 Trix::statAbortUtil(Signal* signal, StatOp& stat)
3351 {
3352 StatOp::Util& util = stat.m_util;
3353 D("statAbortUtil" << V(stat));
3354
3355 ndbrequire(util.m_prepareId != RNIL);
3356 util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
3357 util.m_cb.m_callbackData = stat.m_ownPtrI;
3358 statUtilRelease(signal, stat);
3359 }
3360
3361 void
statAbortUtilCB(Signal * signal,Uint32 statPtrI,Uint32 ret)3362 Trix::statAbortUtilCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
3363 {
3364 StatOp& stat = statOpGetPtr(statPtrI);
3365 D("statAbortUtilCB" << V(stat) << V(ret));
3366
3367 ndbrequire(ret == 0);
3368 statOpAbort(signal, stat);
3369 }
3370
3371 // conf and ref
3372
3373 void
statOpSuccess(Signal * signal,StatOp & stat)3374 Trix::statOpSuccess(Signal* signal, StatOp& stat)
3375 {
3376 StatOp::Data& data = stat.m_data;
3377 D("statOpSuccess" << V(stat));
3378
3379 if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
3380 statOpEvent(stat, "I", "created %u samples", data.m_sampleCount);
3381
3382 statOpConf(signal, stat);
3383 statOpRelease(stat);
3384 }
3385
3386 void
statOpConf(Signal * signal,StatOp & stat)3387 Trix::statOpConf(Signal* signal, StatOp& stat)
3388 {
3389 const IndexStatImplReq* req = &stat.m_req;
3390 D("statOpConf" << V(stat));
3391
3392 IndexStatImplConf* conf = (IndexStatImplConf*)signal->getDataPtrSend();
3393 conf->senderRef = reference();
3394 conf->senderData = req->senderData;
3395 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
3396 signal, IndexStatImplConf::SignalLength, JBB);
3397 }
3398
3399 void
statOpError(Signal * signal,StatOp & stat,Uint32 errorCode,Uint32 errorLine,const Uint32 * supress)3400 Trix::statOpError(Signal* signal, StatOp& stat,
3401 Uint32 errorCode, Uint32 errorLine,
3402 const Uint32 * supress)
3403 {
3404 D("statOpError" << V(stat) << V(errorCode) << V(errorLine));
3405
3406 if (supress)
3407 {
3408 for (Uint32 i = 0; supress[i] != 0; i++)
3409 {
3410 if (errorCode == supress[i])
3411 {
3412 goto do_supress;
3413 }
3414 }
3415 }
3416 statOpEvent(stat, "W", "error %u line %u", errorCode, errorLine);
3417
3418 do_supress:
3419 ndbrequire(stat.m_errorCode == 0);
3420 stat.m_errorCode = errorCode;
3421 stat.m_errorLine = errorLine;
3422 statOpAbort(signal, stat);
3423 }
3424
3425 void
statOpAbort(Signal * signal,StatOp & stat)3426 Trix::statOpAbort(Signal* signal, StatOp& stat)
3427 {
3428 StatOp::Util& util = stat.m_util;
3429 D("statOpAbort" << V(stat));
3430
3431 if (util.m_prepareId != RNIL)
3432 {
3433 jam();
3434 // returns here when done
3435 statAbortUtil(signal, stat);
3436 return;
3437 }
3438 statOpRef(signal, stat);
3439 statOpRelease(stat);
3440 }
3441
3442 void
statOpRef(Signal * signal,StatOp & stat)3443 Trix::statOpRef(Signal* signal, StatOp& stat)
3444 {
3445 const IndexStatImplReq* req = &stat.m_req;
3446 D("statOpRef" << V(stat));
3447
3448 statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
3449 }
3450
3451 void
statOpRef(Signal * signal,const IndexStatImplReq * req,Uint32 errorCode,Uint32 errorLine)3452 Trix::statOpRef(Signal* signal, const IndexStatImplReq* req,
3453 Uint32 errorCode, Uint32 errorLine)
3454 {
3455 D("statOpRef" << V(errorCode) << V(errorLine));
3456
3457 IndexStatImplRef* ref = (IndexStatImplRef*)signal->getDataPtrSend();
3458 ref->senderRef = reference();
3459 ref->senderData = req->senderData;
3460 ref->errorCode = errorCode;
3461 ref->errorLine = errorLine;
3462 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
3463 signal, IndexStatImplRef::SignalLength, JBB);
3464 }
3465
3466 void
statOpEvent(StatOp & stat,const char * level,const char * msg,...)3467 Trix::statOpEvent(StatOp& stat, const char* level, const char* msg, ...)
3468 {
3469 //UNUSED const IndexStatImplReq* req = &stat.m_req;
3470 StatOp::Data& data = stat.m_data;
3471
3472 char tmp1[100];
3473 va_list ap;
3474 va_start(ap, msg);
3475 BaseString::vsnprintf(tmp1, sizeof(tmp1), msg, ap);
3476 va_end(ap);
3477
3478 char tmp2[100];
3479 BaseString::snprintf(tmp2, sizeof(tmp2),
3480 "index %u stats version %u: %s: %s",
3481 data.m_indexId, data.m_sampleVersion,
3482 stat.m_requestName, tmp1);
3483
3484 D("statOpEvent" << V(level) << V(tmp2));
3485
3486 if (level[0] == 'I')
3487 infoEvent("%s", tmp2);
3488 if (level[0] == 'W')
3489 warningEvent("%s", tmp2);
3490 }
3491
3492 // debug
3493
3494 class NdbOut&
operator <<(NdbOut & out,const Trix::StatOp & stat)3495 operator<<(NdbOut& out, const Trix::StatOp& stat)
3496 {
3497 out << "[";
3498 out << " i:" << stat.m_ownPtrI;
3499 out << " head_found:" << stat.m_data.m_head_found;
3500 out << " ]";
3501 return out;
3502 }
3503
3504
3505 BLOCK_FUNCTIONS(Trix)
3506