1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "Trix.hpp"
26
27 #include <string.h>
28 #include <kernel_types.h>
29 #include <NdbOut.hpp>
30
31 #include <signaldata/ReadNodesConf.hpp>
32 #include <signaldata/NodeFailRep.hpp>
33 #include <signaldata/DumpStateOrd.hpp>
34 #include <signaldata/GetTabInfo.hpp>
35 #include <signaldata/DictTabInfo.hpp>
36 #include <signaldata/CopyData.hpp>
37 #include <signaldata/BuildIndxImpl.hpp>
38 #include <signaldata/BuildFKImpl.hpp>
39 #include <signaldata/SumaImpl.hpp>
40 #include <signaldata/UtilPrepare.hpp>
41 #include <signaldata/UtilExecute.hpp>
42 #include <signaldata/UtilRelease.hpp>
43 #include <SectionReader.hpp>
44 #include <AttributeHeader.hpp>
45 #include <signaldata/TcKeyReq.hpp>
46
47 #include <signaldata/DbinfoScan.hpp>
48 #include <signaldata/TransIdAI.hpp>
49 #include <signaldata/WaitGCP.hpp>
50
// NOTE(review): presumably the id the jam() trace macros use for this file - confirm.
#define JAM_FILE_ID 433


// Error codes used by execUTIL_EXECUTE_REF() when a UTIL_EXECUTE_REF arrives.
// TC error code that is reported as BuildIndxRef::IndexNotUnique.
#define CONSTRAINT_VIOLATION 893
// TC error code that, for an FK_BUILD request, is reported as
// FK_NO_PARENT_ROW_EXISTS.
#define TUPLE_NOT_FOUND 626
// Error code handed to buildFailed() in the FK_BUILD / TUPLE_NOT_FOUND case.
#define FK_NO_PARENT_ROW_EXISTS 255
57
58 static
59 bool
check_timeout(Uint32 errCode)60 check_timeout(Uint32 errCode)
61 {
62 switch(errCode){
63 case 266:
64 return true;
65 }
66 return false;
67 }
68
// Writes x to ndbout, prefixed with "TRIX::" and terminated by endl.
#define DEBUG(x) { ndbout << "TRIX::" << x << endl; }
70
/**
 * Construct the TRIX block.
 *
 * Binds the node list and the subscription list to their record pools,
 * zeroes the master/node bookkeeping counters and registers the handler
 * for every signal this block receives.
 *
 * @param ctx  block context passed on to SimulatedBlock
 */
Trix::Trix(Block_context& ctx) :
  SimulatedBlock(TRIX, ctx),
  c_theNodes(c_theNodeRecPool),
  c_masterNodeId(0),
  c_masterTrixRef(0),
  c_noNodesFailed(0),
  c_noActiveNodes(0),
  c_theSubscriptions(c_theSubscriptionRecPool)
{
  BLOCK_CONSTRUCTOR(Trix);

  // Add received signals: configuration, start phases and node handling
  addRecSignal(GSN_READ_CONFIG_REQ, &Trix::execREAD_CONFIG_REQ);
  addRecSignal(GSN_STTOR, &Trix::execSTTOR);
  addRecSignal(GSN_NDB_STTOR, &Trix::execNDB_STTOR); // Forwarded from DICT
  addRecSignal(GSN_READ_NODESCONF, &Trix::execREAD_NODESCONF);
  addRecSignal(GSN_READ_NODESREF, &Trix::execREAD_NODESREF);
  addRecSignal(GSN_NODE_FAILREP, &Trix::execNODE_FAILREP);
  addRecSignal(GSN_INCL_NODEREQ, &Trix::execINCL_NODEREQ);
  addRecSignal(GSN_DUMP_STATE_ORD, &Trix::execDUMP_STATE_ORD);
  addRecSignal(GSN_DBINFO_SCANREQ, &Trix::execDBINFO_SCANREQ);

  // Index build
  addRecSignal(GSN_BUILD_INDX_IMPL_REQ, &Trix::execBUILD_INDX_IMPL_REQ);
  // Dump testing: execDUMP_STATE_ORD sends BUILD_INDX_IMPL_REQ with this
  // block as the sender, so the replies are received here as well
  addRecSignal(GSN_BUILD_INDX_IMPL_CONF, &Trix::execBUILD_INDX_IMPL_CONF);
  addRecSignal(GSN_BUILD_INDX_IMPL_REF, &Trix::execBUILD_INDX_IMPL_REF);

  // Copy data and foreign key build
  addRecSignal(GSN_COPY_DATA_IMPL_REQ, &Trix::execCOPY_DATA_IMPL_REQ);
  addRecSignal(GSN_BUILD_FK_IMPL_REQ, &Trix::execBUILD_FK_IMPL_REQ);

  // Replies to UTIL prepare / execute / release requests
  addRecSignal(GSN_UTIL_PREPARE_CONF, &Trix::execUTIL_PREPARE_CONF);
  addRecSignal(GSN_UTIL_PREPARE_REF, &Trix::execUTIL_PREPARE_REF);
  addRecSignal(GSN_UTIL_EXECUTE_CONF, &Trix::execUTIL_EXECUTE_CONF);
  addRecSignal(GSN_UTIL_EXECUTE_REF, &Trix::execUTIL_EXECUTE_REF);
  addRecSignal(GSN_UTIL_RELEASE_CONF, &Trix::execUTIL_RELEASE_CONF);
  addRecSignal(GSN_UTIL_RELEASE_REF, &Trix::execUTIL_RELEASE_REF);


  // Suma signals
  addRecSignal(GSN_SUB_CREATE_CONF, &Trix::execSUB_CREATE_CONF);
  addRecSignal(GSN_SUB_CREATE_REF, &Trix::execSUB_CREATE_REF);
  addRecSignal(GSN_SUB_REMOVE_CONF, &Trix::execSUB_REMOVE_CONF);
  addRecSignal(GSN_SUB_REMOVE_REF, &Trix::execSUB_REMOVE_REF);
  addRecSignal(GSN_SUB_SYNC_CONF, &Trix::execSUB_SYNC_CONF);
  addRecSignal(GSN_SUB_SYNC_REF, &Trix::execSUB_SYNC_REF);
  addRecSignal(GSN_SUB_SYNC_CONTINUE_REQ, &Trix::execSUB_SYNC_CONTINUE_REQ);
  addRecSignal(GSN_SUB_TABLE_DATA, &Trix::execSUB_TABLE_DATA);

  // Replies to WAIT_GCP requests
  addRecSignal(GSN_WAIT_GCP_REF, &Trix::execWAIT_GCP_REF);
  addRecSignal(GSN_WAIT_GCP_CONF, &Trix::execWAIT_GCP_CONF);

  // index stats
  addRecSignal(GSN_INDEX_STAT_IMPL_REQ, &Trix::execINDEX_STAT_IMPL_REQ);
  addRecSignal(GSN_GET_TABINFO_CONF, &Trix::execGET_TABINFO_CONF);
  addRecSignal(GSN_GET_TABINFOREF, &Trix::execGET_TABINFO_REF);

  // index stats sys tables
  c_statGetMetaDone = false;
}
134
/**
 * Destructor. Has nothing to do explicitly.
 */
Trix::~Trix()
{
}
141
142 void
execREAD_CONFIG_REQ(Signal * signal)143 Trix::execREAD_CONFIG_REQ(Signal* signal)
144 {
145 jamEntry();
146
147 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
148
149 Uint32 ref = req->senderRef;
150 Uint32 senderData = req->senderData;
151
152 const ndb_mgm_configuration_iterator * p =
153 m_ctx.m_config.getOwnConfigIterator();
154 ndbrequire(p != 0);
155
156 // Allocate pool sizes
157 c_theAttrOrderBufferPool.setSize(100);
158 c_theSubscriptionRecPool.setSize(100);
159 c_statOpPool.setSize(5);
160
161 DLList<SubscriptionRecord> subscriptions(c_theSubscriptionRecPool);
162 SubscriptionRecPtr subptr;
163 while (subscriptions.seizeFirst(subptr) == true) {
164 new (subptr.p) SubscriptionRecord(c_theAttrOrderBufferPool);
165 }
166 while (subscriptions.releaseFirst());
167
168 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
169 conf->senderRef = reference();
170 conf->senderData = senderData;
171 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
172 ReadConfigConf::SignalLength, JBB);
173 }
174
175 /**
176 *
177 */
execSTTOR(Signal * signal)178 void Trix::execSTTOR(Signal* signal)
179 {
180 jamEntry();
181
182 //const Uint32 startphase = signal->theData[1];
183 const Uint32 theSignalKey = signal->theData[6];
184
185 signal->theData[0] = theSignalKey;
186 signal->theData[3] = 1;
187 signal->theData[4] = 255; // No more start phases from missra
188 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
189 return;
190 }//Trix::execSTTOR()
191
192 /**
193 *
194 */
execNDB_STTOR(Signal * signal)195 void Trix::execNDB_STTOR(Signal* signal)
196 {
197 jamEntry();
198 BlockReference ndbcntrRef = signal->theData[0];
199 Uint16 startphase = signal->theData[2]; /* RESTART PHASE */
200 Uint16 mynode = signal->theData[1];
201 //Uint16 restarttype = signal->theData[3];
202 //UintR configInfo1 = signal->theData[6]; /* CONFIGRATION INFO PART 1 */
203 //UintR configInfo2 = signal->theData[7]; /* CONFIGRATION INFO PART 2 */
204 switch (startphase) {
205 case 3:
206 jam();
207 /* SYMBOLIC START PHASE 4 */
208 /* ABSOLUTE PHASE 5 */
209 /* REQUEST NODE IDENTITIES FROM DBDIH */
210 signal->theData[0] = calcTrixBlockRef(mynode);
211 sendSignal(ndbcntrRef, GSN_READ_NODESREQ, signal, 1, JBB);
212 return;
213 break;
214 case 6:
215 break;
216 default:
217 break;
218 }
219 }
220
221 /**
222 *
223 */
execREAD_NODESCONF(Signal * signal)224 void Trix::execREAD_NODESCONF(Signal* signal)
225 {
226 jamEntry();
227
228 ReadNodesConf * const readNodes = (ReadNodesConf *)signal->getDataPtr();
229 //Uint32 noOfNodes = readNodes->noOfNodes;
230 NodeRecPtr nodeRecPtr;
231
232 c_masterNodeId = readNodes->masterNodeId;
233 c_masterTrixRef = RNIL;
234 c_noNodesFailed = 0;
235
236 for(unsigned i = 0; i < MAX_NDB_NODES; i++) {
237 jam();
238 if(NdbNodeBitmask::get(readNodes->allNodes, i)) {
239 // Node is defined
240 jam();
241 ndbrequire(c_theNodes.getPool().seizeId(nodeRecPtr, i));
242 c_theNodes.addFirst(nodeRecPtr);
243 nodeRecPtr.p->trixRef = calcTrixBlockRef(i);
244 if (i == c_masterNodeId) {
245 c_masterTrixRef = nodeRecPtr.p->trixRef;
246 }
247 if(NdbNodeBitmask::get(readNodes->inactiveNodes, i)){
248 // Node is not active
249 jam();
250 /**-----------------------------------------------------------------
251 * THIS NODE IS DEFINED IN THE CLUSTER BUT IS NOT ALIVE CURRENTLY.
252 * WE ADD THE NODE TO THE SET OF FAILED NODES AND ALSO SET THE
253 * BLOCKSTATE TO BUSY TO AVOID ADDING TRIGGERS OR INDEXES WHILE
254 * NOT ALL NODES ARE ALIVE.
255 *------------------------------------------------------------------*/
256 arrGuard(c_noNodesFailed, MAX_NDB_NODES);
257 nodeRecPtr.p->alive = false;
258 c_noNodesFailed++;
259 c_blockState = Trix::NODE_FAILURE;
260 }
261 else {
262 // Node is active
263 jam();
264 c_noActiveNodes++;
265 nodeRecPtr.p->alive = true;
266 }
267 }
268 }
269 if (c_noNodesFailed == 0) {
270 c_blockState = Trix::STARTED;
271 }
272 }
273
/**
 * Handle READ_NODESREF. Not yet implemented: the signal is ignored.
 */
void Trix::execREAD_NODESREF(Signal* signal)
{
  // NYI
}
281
282 /**
283 *
284 */
execNODE_FAILREP(Signal * signal)285 void Trix::execNODE_FAILREP(Signal* signal)
286 {
287 jamEntry();
288 NodeFailRep * const nodeFail = (NodeFailRep *) signal->getDataPtr();
289
290 //Uint32 failureNr = nodeFail->failNo;
291 //Uint32 numberNodes = nodeFail->noOfNodes;
292 Uint32 masterNodeId = nodeFail->masterNodeId;
293
294 NodeRecPtr nodeRecPtr;
295
296 for(c_theNodes.first(nodeRecPtr);
297 nodeRecPtr.i != RNIL;
298 c_theNodes.next(nodeRecPtr)) {
299 if(NdbNodeBitmask::get(nodeFail->theNodes, nodeRecPtr.i)) {
300 nodeRecPtr.p->alive = false;
301 c_noNodesFailed++;
302 c_noActiveNodes--;
303 }
304 }
305 if (c_masterNodeId != masterNodeId) {
306 c_masterNodeId = masterNodeId;
307 NodeRecord* nodeRec = c_theNodes.getPtr(masterNodeId);
308 c_masterTrixRef = nodeRec->trixRef;
309 }
310 }
311
312 /**
313 *
314 */
execINCL_NODEREQ(Signal * signal)315 void Trix::execINCL_NODEREQ(Signal* signal)
316 {
317 jamEntry();
318 UintR node_id = signal->theData[1];
319 NodeRecord* nodeRec = c_theNodes.getPtr(node_id);
320 nodeRec->alive = true;
321 c_noNodesFailed--;
322 c_noActiveNodes++;
323 nodeRec->trixRef = calcTrixBlockRef(node_id);
324 if (c_noNodesFailed == 0) {
325 c_blockState = Trix::STARTED;
326 }
327 }
328
329 // Debugging
330 void
execDUMP_STATE_ORD(Signal * signal)331 Trix::execDUMP_STATE_ORD(Signal* signal)
332 {
333 jamEntry();
334
335 DumpStateOrd * dumpStateOrd = (DumpStateOrd *)signal->getDataPtr();
336
337 switch(dumpStateOrd->args[0]) {
338 case(300): {// ok
339 // index2 -T; index2 -I -n10000; index2 -c
340 // all dump 300 0 0 0 0 0 4 2
341 // select_count INDEX0000
342 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
343
344 MEMCOPY_NO_WORDS(buildIndxReq,
345 signal->theData + 1,
346 BuildIndxImplReq::SignalLength);
347 buildIndxReq->senderRef = reference(); // return to me
348 buildIndxReq->parallelism = 10;
349 Uint32 indexColumns[1] = {1};
350 Uint32 keyColumns[1] = {0};
351 struct LinearSectionPtr ls_ptr[3];
352 ls_ptr[0].p = indexColumns;
353 ls_ptr[0].sz = 1;
354 ls_ptr[1].p = keyColumns;
355 ls_ptr[1].sz = 1;
356 sendSignal(reference(),
357 GSN_BUILD_INDX_IMPL_REQ,
358 signal,
359 BuildIndxImplReq::SignalLength,
360 JBB, ls_ptr, 2);
361 break;
362 }
363 case(301): { // ok
364 // index2 -T; index2 -I -n10000; index2 -c -p
365 // all dump 301 0 0 0 0 0 4 2
366 // select_count INDEX0000
367 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
368
369 MEMCOPY_NO_WORDS(buildIndxReq,
370 signal->theData + 1,
371 BuildIndxImplReq::SignalLength);
372 buildIndxReq->senderRef = reference(); // return to me
373 buildIndxReq->parallelism = 10;
374 Uint32 indexColumns[2] = {0, 1};
375 Uint32 keyColumns[1] = {0};
376 struct LinearSectionPtr ls_ptr[3];
377 ls_ptr[0].p = indexColumns;
378 ls_ptr[0].sz = 2;
379 ls_ptr[1].p = keyColumns;
380 ls_ptr[1].sz = 1;
381 sendSignal(reference(),
382 GSN_BUILD_INDX_IMPL_REQ,
383 signal,
384 BuildIndxImplReq::SignalLength,
385 JBB, ls_ptr, 2);
386 break;
387 }
388 case(302): { // ok
389 // index -T; index -I -n1000; index -c -p
390 // all dump 302 0 0 0 0 0 4 2
391 // select_count PNUMINDEX0000
392 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
393
394 MEMCOPY_NO_WORDS(buildIndxReq,
395 signal->theData + 1,
396 BuildIndxImplReq::SignalLength);
397 buildIndxReq->senderRef = reference(); // return to me
398 buildIndxReq->parallelism = 10;
399 Uint32 indexColumns[3] = {0, 3, 5};
400 Uint32 keyColumns[1] = {0};
401 struct LinearSectionPtr ls_ptr[3];
402 ls_ptr[0].p = indexColumns;
403 ls_ptr[0].sz = 3;
404 ls_ptr[1].p = keyColumns;
405 ls_ptr[1].sz = 1;
406 sendSignal(reference(),
407 GSN_BUILD_INDX_IMPL_REQ,
408 signal,
409 BuildIndxImplReq::SignalLength,
410 JBB, ls_ptr, 2);
411 break;
412 }
413 case(303): { // ok
414 // index -T -2; index -I -2 -n1000; index -c -p
415 // all dump 303 0 0 0 0 0 4 2
416 // select_count PNUMINDEX0000
417 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
418
419 MEMCOPY_NO_WORDS(buildIndxReq,
420 signal->theData + 1,
421 BuildIndxImplReq::SignalLength);
422 buildIndxReq->senderRef = reference(); // return to me
423 buildIndxReq->parallelism = 10;
424 Uint32 indexColumns[3] = {0, 3, 5};
425 Uint32 keyColumns[2] = {0, 1};
426 struct LinearSectionPtr ls_ptr[3];
427 ls_ptr[0].p = indexColumns;
428 ls_ptr[0].sz = 3;
429 ls_ptr[1].p = keyColumns;
430 ls_ptr[1].sz = 2;
431 sendSignal(reference(),
432 GSN_BUILD_INDX_IMPL_REQ,
433 signal,
434 BuildIndxImplReq::SignalLength,
435 JBB, ls_ptr, 2);
436 break;
437 }
438 case(304): { // ok
439 // index -T -L; index -I -L -n1000; index -c -p
440 // all dump 304 0 0 0 0 0 4 2
441 // select_count PNUMINDEX0000
442 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
443
444 MEMCOPY_NO_WORDS(buildIndxReq,
445 signal->theData + 1,
446 BuildIndxImplReq::SignalLength);
447 buildIndxReq->senderRef = reference(); // return to me
448 buildIndxReq->parallelism = 10;
449 Uint32 indexColumns[3] = {0, 3, 5};
450 Uint32 keyColumns[1] = {0};
451 struct LinearSectionPtr ls_ptr[3];
452 ls_ptr[0].p = indexColumns;
453 ls_ptr[0].sz = 3;
454 ls_ptr[1].p = keyColumns;
455 ls_ptr[1].sz = 1;
456 sendSignal(reference(),
457 GSN_BUILD_INDX_IMPL_REQ,
458 signal,
459 BuildIndxImplReq::SignalLength,
460 JBB, ls_ptr, 2);
461 break;
462 }
463 case(305): { // ok
464 // index -T -2 -L; index -I -2 -L -n1000; index -c -p
465 // all dump 305 0 0 0 0 0 4 2
466 // select_count PNUMINDEX0000
467 BuildIndxImplReq * buildIndxReq = (BuildIndxImplReq *)signal->getDataPtrSend();
468
469 MEMCOPY_NO_WORDS(buildIndxReq,
470 signal->theData + 1,
471 BuildIndxImplReq::SignalLength);
472 buildIndxReq->senderRef = reference(); // return to me
473 buildIndxReq->parallelism = 10;
474 Uint32 indexColumns[3] = {0, 3, 5};
475 Uint32 keyColumns[2] = {0, 1};
476 struct LinearSectionPtr ls_ptr[3];
477 ls_ptr[0].p = indexColumns;
478 ls_ptr[0].sz = 3;
479 ls_ptr[1].p = keyColumns;
480 ls_ptr[1].sz = 2;
481 sendSignal(reference(),
482 GSN_BUILD_INDX_IMPL_REQ,
483 signal,
484 BuildIndxImplReq::SignalLength,
485 JBB, ls_ptr, 2);
486 break;
487 }
488 default: {
489 // Ignore
490 }
491 }
492
493 if (signal->theData[0] == DumpStateOrd::SchemaResourceSnapshot)
494 {
495 RSS_AP_SNAPSHOT_SAVE(c_theSubscriptionRecPool);
496 RSS_AP_SNAPSHOT_SAVE(c_statOpPool);
497 return;
498 }
499
500 if (signal->theData[0] == DumpStateOrd::SchemaResourceCheckLeak)
501 {
502 RSS_AP_SNAPSHOT_CHECK(c_theSubscriptionRecPool);
503 RSS_AP_SNAPSHOT_CHECK(c_statOpPool);
504 return;
505 }
506
507 if (signal->theData[0] == 8004)
508 {
509 infoEvent("TRIX: c_theSubscriptionRecPool size: %u free: %u",
510 c_theSubscriptionRecPool.getSize(),
511 c_theSubscriptionRecPool.getNoOfFree());
512 return;
513 }
514
515 }
516
execDBINFO_SCANREQ(Signal * signal)517 void Trix::execDBINFO_SCANREQ(Signal *signal)
518 {
519 DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
520 const Ndbinfo::ScanCursor* cursor =
521 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
522 Ndbinfo::Ratelimit rl;
523
524 jamEntry();
525
526 switch(req.tableId){
527 case Ndbinfo::POOLS_TABLEID:
528 {
529 Ndbinfo::pool_entry pools[] =
530 {
531 { "Attribute Order Buffer",
532 c_theAttrOrderBufferPool.getUsed(),
533 c_theAttrOrderBufferPool.getSize(),
534 c_theAttrOrderBufferPool.getEntrySize(),
535 c_theAttrOrderBufferPool.getUsedHi(),
536 { 0,0,0,0 }},
537 { "Subscription Record",
538 c_theSubscriptionRecPool.getUsed(),
539 c_theSubscriptionRecPool.getSize(),
540 c_theSubscriptionRecPool.getEntrySize(),
541 c_theSubscriptionRecPool.getUsedHi(),
542 { 0,0,0,0 }},
543 { NULL, 0,0,0,0,{0,0,0,0}}
544 };
545
546 const size_t num_config_params =
547 sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
548 Uint32 pool = cursor->data[0];
549 BlockNumber bn = blockToMain(number());
550 while(pools[pool].poolname)
551 {
552 jam();
553 Ndbinfo::Row row(signal, req);
554 row.write_uint32(getOwnNodeId());
555 row.write_uint32(bn); // block number
556 row.write_uint32(instance()); // block instance
557 row.write_string(pools[pool].poolname);
558 row.write_uint64(pools[pool].used);
559 row.write_uint64(pools[pool].total);
560 row.write_uint64(pools[pool].used_hi);
561 row.write_uint64(pools[pool].entry_size);
562 for (size_t i = 0; i < num_config_params; i++)
563 row.write_uint32(pools[pool].config_params[i]);
564 ndbinfo_send_row(signal, req, row, rl);
565 pool++;
566 if (rl.need_break(req))
567 {
568 jam();
569 ndbinfo_send_scan_break(signal, req, rl, pool);
570 return;
571 }
572 }
573 break;
574 }
575 default:
576 break;
577 }
578
579 ndbinfo_send_scan_conf(signal, req, rl);
580 }
581
582 // Build index
execBUILD_INDX_IMPL_REQ(Signal * signal)583 void Trix:: execBUILD_INDX_IMPL_REQ(Signal* signal)
584 {
585 jamEntry();
586 DBUG_ENTER("Trix:: execBUILD_INDX_IMPL_REQ");
587
588 const BuildIndxImplReq
589 buildIndxReqData = *(const BuildIndxImplReq*)signal->getDataPtr(),
590 *buildIndxReq = &buildIndxReqData;
591
592 // Seize a subscription record
593 SubscriptionRecPtr subRecPtr;
594 SubscriptionRecord* subRec;
595 SectionHandle handle(this, signal);
596
597 if (ERROR_INSERTED_CLEAR(18000))
598 {
599 sendSignalWithDelay(reference(), GSN_BUILD_INDX_IMPL_REQ, signal, 1000,
600 signal->getLength(), &handle);
601 DBUG_VOID_RETURN;
602 }
603
604 if (!c_theSubscriptions.getPool().seizeId(subRecPtr, buildIndxReq->buildId)) {
605 jam();
606 // Failed to allocate subscription record
607 BuildIndxRef* buildIndxRef = (BuildIndxRef*)signal->getDataPtrSend();
608
609 buildIndxRef->errorCode = BuildIndxRef::AllocationFailure;
610 releaseSections(handle);
611 sendSignal(buildIndxReq->senderRef, GSN_BUILD_INDX_IMPL_REF, signal,
612 BuildIndxRef::SignalLength, JBB);
613 DBUG_VOID_RETURN;
614 }
615 c_theSubscriptions.addFirst(subRecPtr);
616
617 subRec = subRecPtr.p;
618 subRec->errorCode = BuildIndxRef::NoError;
619 subRec->userReference = buildIndxReq->senderRef;
620 subRec->connectionPtr = buildIndxReq->senderData;
621 subRec->schemaTransId = buildIndxReq->transId;
622 subRec->subscriptionId = buildIndxReq->buildId;
623 subRec->subscriptionKey = buildIndxReq->buildKey;
624 subRec->indexType = buildIndxReq->indexType;
625 subRec->sourceTableId = buildIndxReq->tableId;
626 subRec->targetTableId = buildIndxReq->indexId;
627 subRec->parallelism = buildIndxReq->parallelism;
628 subRec->expectedConf = 0;
629 subRec->subscriptionCreated = false;
630 subRec->pendingSubSyncContinueConf = false;
631 subRec->prepareId = RNIL;
632 subRec->requestType = INDEX_BUILD;
633 subRec->fragCount = 0;
634 subRec->fragId = ZNIL;
635 subRec->m_rows_processed = 0;
636 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
637 subRec->m_gci = 0;
638 if (buildIndxReq->requestType & BuildIndxImplReq::RF_NO_DISK)
639 {
640 subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
641 }
642
643 // Get column order segments
644 Uint32 noOfSections = handle.m_cnt;
645 if (noOfSections > 0) {
646 jam();
647 SegmentedSectionPtr ptr;
648 handle.getSection(ptr, BuildIndxImplReq::INDEX_COLUMNS);
649 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
650 subRec->noOfIndexColumns = ptr.sz;
651 }
652 if (noOfSections > 1) {
653 jam();
654 SegmentedSectionPtr ptr;
655 handle.getSection(ptr, BuildIndxImplReq::KEY_COLUMNS);
656 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
657 subRec->noOfKeyColumns = ptr.sz;
658 }
659
660 #if 0
661 // Debugging
662 printf("Trix:: execBUILD_INDX_IMPL_REQ: Attribute order:\n");
663 subRec->attributeOrder.print(stdout);
664 #endif
665
666 releaseSections(handle);
667 prepareInsertTransactions(signal, subRecPtr);
668 DBUG_VOID_RETURN;
669 }
670
/**
 * Handle BUILD_INDX_IMPL_CONF. Only prints a trace line; the
 * constructor registers this handler under "Dump testing".
 */
void Trix:: execBUILD_INDX_IMPL_CONF(Signal* signal)
{
  printf("Trix:: execBUILD_INDX_IMPL_CONF\n");
}
675
/**
 * Handle BUILD_INDX_IMPL_REF. Only prints a trace line; the
 * constructor registers this handler under "Dump testing".
 */
void Trix:: execBUILD_INDX_IMPL_REF(Signal* signal)
{
  printf("Trix:: execBUILD_INDX_IMPL_REF\n");
}
680
execUTIL_PREPARE_CONF(Signal * signal)681 void Trix::execUTIL_PREPARE_CONF(Signal* signal)
682 {
683 jamEntry();
684 UtilPrepareConf * utilPrepareConf = (UtilPrepareConf *)signal->getDataPtr();
685 SubscriptionRecPtr subRecPtr;
686 SubscriptionRecord* subRec;
687
688 subRecPtr.i = utilPrepareConf->senderData;
689 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
690 printf("Trix::execUTIL_PREPARE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
691 return;
692 }
693 if (subRec->requestType == STAT_UTIL)
694 {
695 statUtilPrepareConf(signal, subRec->m_statPtrI);
696 return;
697 }
698 subRecPtr.p = subRec;
699 subRec->prepareId = utilPrepareConf->prepareId;
700 setupSubscription(signal, subRecPtr);
701 }
702
execUTIL_PREPARE_REF(Signal * signal)703 void Trix::execUTIL_PREPARE_REF(Signal* signal)
704 {
705 jamEntry();
706 UtilPrepareRef * utilPrepareRef = (UtilPrepareRef *)signal->getDataPtr();
707 SubscriptionRecPtr subRecPtr;
708 SubscriptionRecord* subRec;
709
710 subRecPtr.i = utilPrepareRef->senderData;
711 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
712 printf("Trix::execUTIL_PREPARE_REF: Failed to find subscription data %u\n", subRecPtr.i);
713 return;
714 }
715 if (subRec->requestType == STAT_UTIL)
716 {
717 statUtilPrepareRef(signal, subRec->m_statPtrI);
718 return;
719 }
720 subRecPtr.p = subRec;
721 subRec->errorCode = (BuildIndxRef::ErrorCode)utilPrepareRef->errorCode;
722
723 UtilReleaseConf* conf = (UtilReleaseConf*)signal->getDataPtrSend();
724 conf->senderData = subRecPtr.i;
725 execUTIL_RELEASE_CONF(signal);
726 }
727
execUTIL_EXECUTE_CONF(Signal * signal)728 void Trix::execUTIL_EXECUTE_CONF(Signal* signal)
729 {
730 jamEntry();
731 UtilExecuteConf * utilExecuteConf = (UtilExecuteConf *)signal->getDataPtr();
732 SubscriptionRecPtr subRecPtr;
733 SubscriptionRecord* subRec;
734
735 const Uint32 gci_hi = utilExecuteConf->gci_hi;
736 const Uint32 gci_lo = utilExecuteConf->gci_lo;
737 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
738
739 subRecPtr.i = utilExecuteConf->senderData;
740 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
741 printf("rix::execUTIL_EXECUTE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
742 return;
743 }
744 if (subRec->requestType == STAT_UTIL)
745 {
746 statUtilExecuteConf(signal, subRec->m_statPtrI);
747 return;
748 }
749 subRecPtr.p = subRec;
750 subRec->expectedConf--;
751
752 if (gci > subRecPtr.p->m_gci)
753 {
754 jam();
755 subRecPtr.p->m_gci = gci;
756 }
757
758 checkParallelism(signal, subRec);
759 if (subRec->expectedConf == 0)
760 {
761 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
762 {
763 jam();
764 wait_gcp(signal, subRecPtr);
765 return;
766 }
767 buildComplete(signal, subRecPtr);
768 }
769 }
770
execUTIL_EXECUTE_REF(Signal * signal)771 void Trix::execUTIL_EXECUTE_REF(Signal* signal)
772 {
773 jamEntry();
774 UtilExecuteRef * utilExecuteRef = (UtilExecuteRef *)signal->getDataPtr();
775 SubscriptionRecPtr subRecPtr;
776 SubscriptionRecord* subRec;
777
778 subRecPtr.i = utilExecuteRef->senderData;
779 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
780 printf("Trix::execUTIL_EXECUTE_REF: Failed to find subscription data %u\n", subRecPtr.i);
781 return;
782 }
783 if (subRec->requestType == STAT_UTIL)
784 {
785 statUtilExecuteRef(signal, subRec->m_statPtrI);
786 return;
787 }
788 subRecPtr.p = subRec;
789 ndbrequire(utilExecuteRef->errorCode == UtilExecuteRef::TCError);
790 if(utilExecuteRef->TCErrorCode == CONSTRAINT_VIOLATION)
791 {
792 jam();
793 buildFailed(signal, subRecPtr, BuildIndxRef::IndexNotUnique);
794 }
795 else if (check_timeout(utilExecuteRef->TCErrorCode))
796 {
797 jam();
798 buildFailed(signal, subRecPtr, BuildIndxRef::DeadlockError);
799 }
800 else if (subRec->requestType == FK_BUILD &&
801 utilExecuteRef->TCErrorCode == TUPLE_NOT_FOUND)
802 {
803 jam();
804 buildFailed(signal, subRecPtr,
805 (BuildIndxRef::ErrorCode)FK_NO_PARENT_ROW_EXISTS);
806 }
807 else
808 {
809 jam();
810 buildFailed(signal, subRecPtr,
811 (BuildIndxRef::ErrorCode)utilExecuteRef->TCErrorCode);
812 }
813 }
814
execSUB_CREATE_CONF(Signal * signal)815 void Trix::execSUB_CREATE_CONF(Signal* signal)
816 {
817 jamEntry();
818 DBUG_ENTER("Trix::execSUB_CREATE_CONF");
819 SubCreateConf * subCreateConf = (SubCreateConf *)signal->getDataPtr();
820 SubscriptionRecPtr subRecPtr;
821 SubscriptionRecord* subRec;
822
823 subRecPtr.i = subCreateConf->senderData;
824 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
825 printf("Trix::execSUB_CREATE_CONF: Failed to find subscription data %u\n", subRecPtr.i);
826 DBUG_VOID_RETURN;
827 }
828 subRec->subscriptionCreated = true;
829 subRecPtr.p = subRec;
830
831 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
832 subRecPtr.i, subRecPtr.p->subscriptionId,
833 subRecPtr.p->subscriptionKey));
834
835 startTableScan(signal, subRecPtr);
836 DBUG_VOID_RETURN;
837 }
838
execSUB_CREATE_REF(Signal * signal)839 void Trix::execSUB_CREATE_REF(Signal* signal)
840 {
841 jamEntry();
842 DBUG_ENTER("Trix::execSUB_CREATE_REF");
843
844 SubCreateRef * subCreateRef = (SubCreateRef *)signal->getDataPtr();
845 SubscriptionRecPtr subRecPtr;
846 SubscriptionRecord* subRec;
847
848 subRecPtr.i = subCreateRef->senderData;
849 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL)
850 {
851 printf("Trix::execSUB_CREATE_REF: Failed to find subscription data %u\n", subRecPtr.i);
852 return;
853 }
854 subRecPtr.p = subRec;
855 subRecPtr.p->errorCode = (BuildIndxRef::ErrorCode)subCreateRef->errorCode;
856
857 UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
858 req->prepareId = subRecPtr.p->prepareId;
859 req->senderData = subRecPtr.i;
860
861 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
862 UtilReleaseReq::SignalLength, JBB);
863
864 DBUG_VOID_RETURN;
865 }
866
execSUB_SYNC_CONF(Signal * signal)867 void Trix::execSUB_SYNC_CONF(Signal* signal)
868 {
869 jamEntry();
870 DBUG_ENTER("Trix::execSUB_SYNC_CONF");
871 SubSyncConf * subSyncConf = (SubSyncConf *)signal->getDataPtr();
872 SubscriptionRecPtr subRecPtr;
873 SubscriptionRecord* subRec;
874
875 subRecPtr.i = subSyncConf->senderData;
876 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
877 printf("Trix::execSUB_SYNC_CONF: Failed to find subscription data %u\n",
878 subRecPtr.i);
879 DBUG_VOID_RETURN;
880 }
881
882 subRecPtr.p = subRec;
883 subRec->expectedConf--;
884 checkParallelism(signal, subRec);
885 if (subRec->expectedConf == 0)
886 {
887 if (subRec->m_flags & SubscriptionRecord::RF_WAIT_GCP)
888 {
889 jam();
890 wait_gcp(signal, subRecPtr);
891 DBUG_VOID_RETURN;
892 }
893 buildComplete(signal, subRecPtr);
894 }
895 DBUG_VOID_RETURN;
896 }
897
execSUB_SYNC_REF(Signal * signal)898 void Trix::execSUB_SYNC_REF(Signal* signal)
899 {
900 jamEntry();
901 DBUG_ENTER("Trix::execSUB_SYNC_REF");
902 SubSyncRef * subSyncRef = (SubSyncRef *)signal->getDataPtr();
903 SubscriptionRecPtr subRecPtr;
904 SubscriptionRecord* subRec;
905
906 subRecPtr.i = subSyncRef->senderData;
907 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
908 printf("Trix::execSUB_SYNC_REF: Failed to find subscription data %u\n", subRecPtr.i);
909 DBUG_VOID_RETURN;
910 }
911 subRecPtr.p = subRec;
912 buildFailed(signal, subRecPtr, BuildIndxRef::InternalError);
913 DBUG_VOID_RETURN;
914 }
915
execSUB_SYNC_CONTINUE_REQ(Signal * signal)916 void Trix::execSUB_SYNC_CONTINUE_REQ(Signal* signal)
917 {
918 SubSyncContinueReq * subSyncContinueReq =
919 (SubSyncContinueReq *) signal->getDataPtr();
920
921 SubscriptionRecPtr subRecPtr;
922 SubscriptionRecord* subRec;
923 subRecPtr.i = subSyncContinueReq->subscriberData;
924 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
925 printf("Trix::execSUB_SYNC_CONTINUE_REQ: Failed to find subscription data %u\n", subRecPtr.i);
926 return;
927 }
928 subRecPtr.p = subRec;
929 subRec->pendingSubSyncContinueConf = true;
930 subRec->syncPtr = subSyncContinueReq->senderData;
931 checkParallelism(signal, subRec);
932 }
933
execSUB_TABLE_DATA(Signal * signal)934 void Trix::execSUB_TABLE_DATA(Signal* signal)
935 {
936 jamEntry();
937 DBUG_ENTER("Trix::execSUB_TABLE_DATA");
938 SubTableData * subTableData = (SubTableData *)signal->getDataPtr();
939 SubscriptionRecPtr subRecPtr;
940 SubscriptionRecord* subRec;
941 subRecPtr.i = subTableData->senderData;
942 if ((subRec = c_theSubscriptions.getPtr(subRecPtr.i)) == NULL) {
943 printf("Trix::execSUB_TABLE_DATA: Failed to find subscription data %u\n", subRecPtr.i);
944 DBUG_VOID_RETURN;
945 }
946 subRecPtr.p = subRec;
947 switch(subRecPtr.p->requestType){
948 case INDEX_BUILD:
949 executeBuildInsertTransaction(signal, subRecPtr);
950 break;
951 case REORG_COPY:
952 case REORG_DELETE:
953 executeReorgTransaction(signal, subRecPtr, subTableData->takeOver);
954 break;
955 case FK_BUILD:
956 executeBuildFKTransaction(signal, subRecPtr);
957 break;
958 case STAT_UTIL:
959 ndbrequire(false);
960 break;
961 case STAT_CLEAN:
962 {
963 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
964 statCleanExecute(signal, stat);
965 }
966 break;
967 case STAT_SCAN:
968 {
969 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
970 statScanExecute(signal, stat);
971 }
972 break;
973 }
974
975 subRecPtr.p->m_rows_processed++;
976
977 DBUG_VOID_RETURN;
978 }
979
setupSubscription(Signal * signal,SubscriptionRecPtr subRecPtr)980 void Trix::setupSubscription(Signal* signal, SubscriptionRecPtr subRecPtr)
981 {
982 jam();
983 DBUG_ENTER("Trix::setupSubscription");
984 SubscriptionRecord* subRec = subRecPtr.p;
985 SubCreateReq * subCreateReq = (SubCreateReq *)signal->getDataPtrSend();
986 // Uint32 listLen = subRec->noOfIndexColumns + subRec->noOfKeyColumns;
987 subCreateReq->senderRef = reference();
988 subCreateReq->senderData = subRecPtr.i;
989 subCreateReq->subscriptionId = subRec->subscriptionId;
990 subCreateReq->subscriptionKey = subRec->subscriptionKey;
991 subCreateReq->tableId = subRec->sourceTableId;
992 subCreateReq->subscriptionType = SubCreateReq::SingleTableScan;
993 subCreateReq->schemaTransId = subRec->schemaTransId;
994
995 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
996 subRecPtr.i, subCreateReq->subscriptionId,
997 subCreateReq->subscriptionKey));
998
999 sendSignal(SUMA_REF, GSN_SUB_CREATE_REQ,
1000 signal, SubCreateReq::SignalLength, JBB);
1001
1002 DBUG_VOID_RETURN;
1003 }
1004
startTableScan(Signal * signal,SubscriptionRecPtr subRecPtr)1005 void Trix::startTableScan(Signal* signal, SubscriptionRecPtr subRecPtr)
1006 {
1007 jam();
1008
1009 Uint32 attributeList[MAX_ATTRIBUTES_IN_TABLE * 2];
1010 SubscriptionRecord* subRec = subRecPtr.p;
1011 AttrOrderBuffer::DataBufferIterator iter;
1012
1013 Uint32 cnt = 0;
1014 bool moreAttributes = subRec->attributeOrder.first(iter);
1015 if (subRec->requestType == FK_BUILD)
1016 {
1017 jam();
1018 // skip over key columns
1019 ndbrequire(subRec->attributeOrder.position(iter, subRec->noOfKeyColumns));
1020 }
1021
1022 while (moreAttributes) {
1023 attributeList[cnt++] = *iter.data;
1024 moreAttributes = subRec->attributeOrder.next(iter);
1025 }
1026
1027 // Merge index and key column segments
1028 struct LinearSectionPtr orderPtr[3];
1029 Uint32 noOfSections;
1030 orderPtr[0].p = attributeList;
1031 orderPtr[0].sz = cnt;
1032 noOfSections = 1;
1033
1034 SubSyncReq * subSyncReq = (SubSyncReq *)signal->getDataPtrSend();
1035 subSyncReq->senderRef = reference();
1036 subSyncReq->senderData = subRecPtr.i;
1037 subSyncReq->subscriptionId = subRec->subscriptionId;
1038 subSyncReq->subscriptionKey = subRec->subscriptionKey;
1039 subSyncReq->part = SubscriptionData::TableData;
1040 subSyncReq->requestInfo = 0;
1041 subSyncReq->fragCount = subRec->fragCount;
1042 subSyncReq->fragId = subRec->fragId;
1043
1044 if (subRec->m_flags & SubscriptionRecord::RF_NO_DISK)
1045 {
1046 jam();
1047 subSyncReq->requestInfo |= SubSyncReq::NoDisk;
1048 }
1049
1050 if (subRec->m_flags & SubscriptionRecord::RF_TUP_ORDER)
1051 {
1052 jam();
1053 subSyncReq->requestInfo |= SubSyncReq::TupOrder;
1054 }
1055
1056 if (subRec->requestType == REORG_COPY)
1057 {
1058 jam();
1059 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1060 }
1061 else if (subRec->requestType == REORG_DELETE)
1062 {
1063 jam();
1064 subSyncReq->requestInfo |= SubSyncReq::LM_Exclusive;
1065 subSyncReq->requestInfo |= SubSyncReq::Reorg;
1066 }
1067 else if (subRec->requestType == STAT_CLEAN)
1068 {
1069 jam();
1070 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1071 StatOp::Clean clean = stat.m_clean;
1072 orderPtr[1].p = clean.m_bound;
1073 orderPtr[1].sz = clean.m_boundSize;
1074 noOfSections = 2;
1075 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1076 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1077 }
1078 else if (subRec->requestType == STAT_SCAN)
1079 {
1080 jam();
1081 orderPtr[1].p = 0;
1082 orderPtr[1].sz = 0;
1083 noOfSections = 2;
1084 subSyncReq->requestInfo |= SubSyncReq::LM_CommittedRead;
1085 subSyncReq->requestInfo |= SubSyncReq::RangeScan;
1086 subSyncReq->requestInfo |= SubSyncReq::StatScan;
1087 }
1088 subRecPtr.p->expectedConf = 1;
1089
1090 DBUG_PRINT("info",("i: %u subscriptionId: %u, subscriptionKey: %u",
1091 subRecPtr.i, subSyncReq->subscriptionId,
1092 subSyncReq->subscriptionKey));
1093
1094 sendSignal(SUMA_REF, GSN_SUB_SYNC_REQ,
1095 signal, SubSyncReq::SignalLength, JBB, orderPtr, noOfSections);
1096 }
1097
prepareInsertTransactions(Signal * signal,SubscriptionRecPtr subRecPtr)1098 void Trix::prepareInsertTransactions(Signal* signal,
1099 SubscriptionRecPtr subRecPtr)
1100 {
1101 SubscriptionRecord* subRec = subRecPtr.p;
1102 UtilPrepareReq * utilPrepareReq =
1103 (UtilPrepareReq *)signal->getDataPtrSend();
1104
1105 jam();
1106 utilPrepareReq->senderRef = reference();
1107 utilPrepareReq->senderData = subRecPtr.i;
1108 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1109
1110 const Uint32 pageSizeInWords = 128;
1111 Uint32 propPage[pageSizeInWords];
1112 LinearWriter w(&propPage[0],128);
1113 w.first();
1114 w.add(UtilPrepareReq::NoOfOperations, 1);
1115 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1116 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1117 // Add index attributes in increasing order and one PK attribute
1118 for(Uint32 i = 0; i < subRec->noOfIndexColumns + 1; i++)
1119 w.add(UtilPrepareReq::AttributeId, i);
1120
1121 #if 0
1122 // Debugging
1123 SimplePropertiesLinearReader reader(propPage, w.getWordsUsed());
1124 printf("Trix::prepareInsertTransactions: Sent SimpleProperties:\n");
1125 reader.printAll(ndbout);
1126 #endif
1127
1128 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1129 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1130 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1131 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1132 UtilPrepareReq::SignalLength, JBB,
1133 sectionsPtr, UtilPrepareReq::NoOfSections);
1134 }
1135
executeBuildInsertTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1136 void Trix::executeBuildInsertTransaction(Signal* signal,
1137 SubscriptionRecPtr subRecPtr)
1138 {
1139 jam();
1140 SubscriptionRecord* subRec = subRecPtr.p;
1141 UtilExecuteReq * utilExecuteReq =
1142 (UtilExecuteReq *)signal->getDataPtrSend();
1143
1144 utilExecuteReq->senderRef = reference();
1145 utilExecuteReq->senderData = subRecPtr.i;
1146 utilExecuteReq->prepareId = subRec->prepareId;
1147 #if 0
1148 printf("Header size %u\n", headerPtr.sz);
1149 for(int i = 0; i < headerPtr.sz; i++)
1150 printf("H'%.8x ", headerBuffer[i]);
1151 printf("\n");
1152
1153 printf("Data size %u\n", dataPtr.sz);
1154 for(int i = 0; i < dataPtr.sz; i++)
1155 printf("H'%.8x ", dataBuffer[i]);
1156 printf("\n");
1157 #endif
1158 // Save scan result in linear buffers
1159 SectionHandle handle(this, signal);
1160 SegmentedSectionPtr headerPtr, dataPtr;
1161
1162 handle.getSection(headerPtr, 0);
1163 handle.getSection(dataPtr, 1);
1164
1165 Uint32* headerBuffer = signal->theData + 25;
1166 Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1167
1168 copy(headerBuffer, headerPtr);
1169 copy(dataBuffer, dataPtr);
1170 releaseSections(handle);
1171
1172 // Calculate packed key size
1173 Uint32 noOfKeyData = 0;
1174 for(Uint32 i = 0; i < headerPtr.sz; i++) {
1175 AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1176
1177 // Filter out NULL attributes
1178 if (keyAttrHead->isNULL())
1179 return;
1180
1181 if (i < subRec->noOfIndexColumns)
1182 // Renumber index attributes in consequtive order
1183 keyAttrHead->setAttributeId(i);
1184 else
1185 // Calculate total size of PK attribute
1186 noOfKeyData += keyAttrHead->getDataSize();
1187 }
1188 // Increase expected CONF count
1189 subRec->expectedConf++;
1190
1191 // Pack key attributes
1192 AttributeHeader::init(headerBuffer + subRec->noOfIndexColumns,
1193 subRec->noOfIndexColumns,
1194 noOfKeyData << 2);
1195
1196 struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1197 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1198 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz =
1199 subRec->noOfIndexColumns + 1;
1200 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1201 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1202 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1203 UtilExecuteReq::SignalLength, JBB,
1204 sectionsPtr, UtilExecuteReq::NoOfSections);
1205 }
1206
executeReorgTransaction(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 takeOver)1207 void Trix::executeReorgTransaction(Signal* signal,
1208 SubscriptionRecPtr subRecPtr,
1209 Uint32 takeOver)
1210 {
1211 jam();
1212 SubscriptionRecord* subRec = subRecPtr.p;
1213 UtilExecuteReq * utilExecuteReq =
1214 (UtilExecuteReq *)signal->getDataPtrSend();
1215
1216 const Uint32 tScanInfo = takeOver & 0x3FFFF;
1217 const Uint32 tTakeOverFragment = takeOver >> 20;
1218 {
1219 UintR scanInfo = 0;
1220 TcKeyReq::setTakeOverScanFlag(scanInfo, 1);
1221 TcKeyReq::setTakeOverScanFragment(scanInfo, tTakeOverFragment);
1222 TcKeyReq::setTakeOverScanInfo(scanInfo, tScanInfo);
1223 utilExecuteReq->scanTakeOver = scanInfo;
1224 }
1225
1226 utilExecuteReq->senderRef = reference();
1227 utilExecuteReq->senderData = subRecPtr.i;
1228 utilExecuteReq->prepareId = subRec->prepareId;
1229 #if 0
1230 printf("Header size %u\n", headerPtr.sz);
1231 for(int i = 0; i < headerPtr.sz; i++)
1232 printf("H'%.8x ", headerBuffer[i]);
1233 printf("\n");
1234
1235 printf("Data size %u\n", dataPtr.sz);
1236 for(int i = 0; i < dataPtr.sz; i++)
1237 printf("H'%.8x ", dataBuffer[i]);
1238 printf("\n");
1239 #endif
1240 // Increase expected CONF count
1241 subRec->expectedConf++;
1242
1243 SectionHandle handle(this, signal);
1244 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1245 UtilExecuteReq::SignalLength, JBB,
1246 &handle);
1247 }
1248
1249 void
wait_gcp(Signal * signal,SubscriptionRecPtr subRecPtr,Uint32 delay)1250 Trix::wait_gcp(Signal* signal, SubscriptionRecPtr subRecPtr, Uint32 delay)
1251 {
1252 WaitGCPReq * req = (WaitGCPReq*)signal->getDataPtrSend();
1253 req->senderRef = reference();
1254 req->senderData = subRecPtr.i;
1255 req->requestType = WaitGCPReq::CurrentGCI;
1256
1257 if (delay == 0)
1258 {
1259 jam();
1260 sendSignal(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1261 WaitGCPReq::SignalLength, JBB);
1262 }
1263 else
1264 {
1265 jam();
1266 sendSignalWithDelay(DBDIH_REF, GSN_WAIT_GCP_REQ, signal,
1267 delay, WaitGCPReq::SignalLength);
1268 }
1269 }
1270
1271 void
execWAIT_GCP_REF(Signal * signal)1272 Trix::execWAIT_GCP_REF(Signal* signal)
1273 {
1274 WaitGCPRef ref = *(WaitGCPRef*)signal->getDataPtr();
1275
1276 SubscriptionRecPtr subRecPtr;
1277 c_theSubscriptions.getPtr(subRecPtr, ref.senderData);
1278 wait_gcp(signal, subRecPtr, 100);
1279 }
1280
1281 void
execWAIT_GCP_CONF(Signal * signal)1282 Trix::execWAIT_GCP_CONF(Signal* signal)
1283 {
1284 WaitGCPConf * conf = (WaitGCPConf*)signal->getDataPtr();
1285
1286 SubscriptionRecPtr subRecPtr;
1287 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1288
1289 const Uint32 gci_hi = conf->gci_hi;
1290 const Uint32 gci_lo = conf->gci_lo;
1291 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
1292
1293 if (gci > subRecPtr.p->m_gci)
1294 {
1295 jam();
1296 buildComplete(signal, subRecPtr);
1297 }
1298 else
1299 {
1300 jam();
1301 wait_gcp(signal, subRecPtr, 100);
1302 }
1303 }
1304
buildComplete(Signal * signal,SubscriptionRecPtr subRecPtr)1305 void Trix::buildComplete(Signal* signal, SubscriptionRecPtr subRecPtr)
1306 {
1307 SubRemoveReq * const req = (SubRemoveReq*)signal->getDataPtrSend();
1308 req->senderRef = reference();
1309 req->senderData = subRecPtr.i;
1310 req->subscriptionId = subRecPtr.p->subscriptionId;
1311 req->subscriptionKey = subRecPtr.p->subscriptionKey;
1312 sendSignal(SUMA_REF, GSN_SUB_REMOVE_REQ, signal,
1313 SubRemoveReq::SignalLength, JBB);
1314 }
1315
buildFailed(Signal * signal,SubscriptionRecPtr subRecPtr,BuildIndxRef::ErrorCode errorCode)1316 void Trix::buildFailed(Signal* signal,
1317 SubscriptionRecPtr subRecPtr,
1318 BuildIndxRef::ErrorCode errorCode)
1319 {
1320 SubscriptionRecord* subRec = subRecPtr.p;
1321
1322 subRec->errorCode = errorCode;
1323 // Continue accumulating since we currently cannot stop SUMA
1324 subRec->expectedConf--;
1325 checkParallelism(signal, subRec);
1326 if (subRec->expectedConf == 0)
1327 buildComplete(signal, subRecPtr);
1328 }
1329
1330 void
execSUB_REMOVE_REF(Signal * signal)1331 Trix::execSUB_REMOVE_REF(Signal* signal){
1332 jamEntry();
1333 //@todo
1334 ndbrequire(false);
1335 }
1336
1337 void
execSUB_REMOVE_CONF(Signal * signal)1338 Trix::execSUB_REMOVE_CONF(Signal* signal){
1339 jamEntry();
1340
1341 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
1342
1343 SubscriptionRecPtr subRecPtr;
1344 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1345
1346 if(subRecPtr.p->prepareId != RNIL){
1347 jam();
1348
1349 UtilReleaseReq * const req = (UtilReleaseReq*)signal->getDataPtrSend();
1350 req->prepareId = subRecPtr.p->prepareId;
1351 req->senderData = subRecPtr.i;
1352
1353 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ, signal,
1354 UtilReleaseReq::SignalLength , JBB);
1355 return;
1356 }
1357
1358 {
1359 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1360 conf->senderData = subRecPtr.i;
1361 execUTIL_RELEASE_CONF(signal);
1362 }
1363 }
1364
1365 void
execUTIL_RELEASE_REF(Signal * signal)1366 Trix::execUTIL_RELEASE_REF(Signal* signal){
1367 jamEntry();
1368 ndbrequire(false);
1369 }
1370
1371 void
execUTIL_RELEASE_CONF(Signal * signal)1372 Trix::execUTIL_RELEASE_CONF(Signal* signal){
1373
1374 UtilReleaseConf * const conf = (UtilReleaseConf*)signal->getDataPtrSend();
1375
1376 SubscriptionRecPtr subRecPtr;
1377 c_theSubscriptions.getPtr(subRecPtr, conf->senderData);
1378
1379 switch(subRecPtr.p->requestType){
1380 case REORG_COPY:
1381 case REORG_DELETE:
1382 if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1383 {
1384 jam();
1385 // Build is complete, reply to original sender
1386 CopyDataImplConf* conf = (CopyDataImplConf*)signal->getDataPtrSend();
1387 conf->senderRef = reference(); //wl3600_todo ok?
1388 conf->senderData = subRecPtr.p->connectionPtr;
1389
1390 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_CONF, signal,
1391 CopyDataImplConf::SignalLength , JBB);
1392
1393 infoEvent("%s table %u processed %llu rows",
1394 subRecPtr.p->requestType == REORG_COPY ?
1395 "reorg-copy" : "reorg-delete",
1396 subRecPtr.p->sourceTableId,
1397 subRecPtr.p->m_rows_processed);
1398 } else {
1399 jam();
1400 // Build failed, reply to original sender
1401 CopyDataImplRef* ref = (CopyDataImplRef*)signal->getDataPtrSend();
1402 ref->senderRef = reference();
1403 ref->senderData = subRecPtr.p->connectionPtr;
1404 ref->errorCode = subRecPtr.p->errorCode;
1405
1406 sendSignal(subRecPtr.p->userReference, GSN_COPY_DATA_IMPL_REF, signal,
1407 CopyDataImplRef::SignalLength , JBB);
1408 }
1409 break;
1410 case INDEX_BUILD:
1411 if (subRecPtr.p->errorCode == BuildIndxRef::NoError) {
1412 jam();
1413 // Build is complete, reply to original sender
1414 BuildIndxImplConf* buildIndxConf =
1415 (BuildIndxImplConf*)signal->getDataPtrSend();
1416 buildIndxConf->senderRef = reference(); //wl3600_todo ok?
1417 buildIndxConf->senderData = subRecPtr.p->connectionPtr;
1418
1419 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_CONF, signal,
1420 BuildIndxConf::SignalLength , JBB);
1421
1422 infoEvent("index-build table %u index: %u processed %llu rows",
1423 subRecPtr.p->sourceTableId,
1424 subRecPtr.p->targetTableId,
1425 subRecPtr.p->m_rows_processed);
1426 } else {
1427 jam();
1428 // Build failed, reply to original sender
1429 BuildIndxImplRef* buildIndxRef =
1430 (BuildIndxImplRef*)signal->getDataPtrSend();
1431 buildIndxRef->senderRef = reference();
1432 buildIndxRef->senderData = subRecPtr.p->connectionPtr;
1433 buildIndxRef->errorCode = subRecPtr.p->errorCode;
1434
1435 sendSignal(subRecPtr.p->userReference, GSN_BUILD_INDX_IMPL_REF, signal,
1436 BuildIndxRef::SignalLength , JBB);
1437 }
1438 break;
1439 case FK_BUILD:
1440 if (subRecPtr.p->errorCode == BuildIndxRef::NoError)
1441 {
1442 jam();
1443 // Build is complete, reply to original sender
1444 BuildFKImplConf* buildFKConf =
1445 CAST_PTR(BuildFKImplConf, signal->getDataPtrSend());
1446 buildFKConf->senderRef = reference();
1447 buildFKConf->senderData = subRecPtr.p->connectionPtr;
1448
1449 sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_CONF, signal,
1450 BuildFKImplConf::SignalLength , JBB);
1451
1452 infoEvent("fk-build parent table: %u child table: %u processed %llu rows",
1453 subRecPtr.p->targetTableId,
1454 subRecPtr.p->sourceTableId,
1455 subRecPtr.p->m_rows_processed);
1456 }
1457 else
1458 {
1459 jam();
1460 // Build failed, reply to original sender
1461 BuildFKImplRef* buildFKRef =
1462 (BuildFKImplRef*)signal->getDataPtrSend();
1463 buildFKRef->senderRef = reference();
1464 buildFKRef->senderData = subRecPtr.p->connectionPtr;
1465 buildFKRef->errorCode = subRecPtr.p->errorCode;
1466
1467 sendSignal(subRecPtr.p->userReference, GSN_BUILD_FK_IMPL_REF, signal,
1468 BuildFKImplRef::SignalLength , JBB);
1469 }
1470 break;
1471 case STAT_UTIL:
1472 ndbrequire(subRecPtr.p->errorCode == BuildIndxRef::NoError);
1473 statUtilReleaseConf(signal, subRecPtr.p->m_statPtrI);
1474 return;
1475 case STAT_CLEAN:
1476 {
1477 subRecPtr.p->prepareId = RNIL;
1478 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1479 statCleanRelease(signal, stat);
1480 }
1481 return;
1482 case STAT_SCAN:
1483 {
1484 subRecPtr.p->prepareId = RNIL;
1485 StatOp& stat = statOpGetPtr(subRecPtr.p->m_statPtrI);
1486 statScanRelease(signal, stat);
1487 }
1488 return;
1489 }
1490
1491 // Release subscription record
1492 subRecPtr.p->attributeOrder.release();
1493 c_theSubscriptions.release(subRecPtr.i);
1494 }
1495
checkParallelism(Signal * signal,SubscriptionRecord * subRec)1496 void Trix::checkParallelism(Signal* signal, SubscriptionRecord* subRec)
1497 {
1498 if ((subRec->pendingSubSyncContinueConf) &&
1499 (subRec->expectedConf == 1)) {
1500 jam();
1501 SubSyncContinueConf * subSyncContinueConf =
1502 (SubSyncContinueConf *) signal->getDataPtrSend();
1503 subSyncContinueConf->subscriptionId = subRec->subscriptionId;
1504 subSyncContinueConf->subscriptionKey = subRec->subscriptionKey;
1505 subSyncContinueConf->senderData = subRec->syncPtr;
1506 sendSignal(SUMA_REF, GSN_SUB_SYNC_CONTINUE_CONF, signal,
1507 SubSyncContinueConf::SignalLength , JBB);
1508 subRec->pendingSubSyncContinueConf = false;
1509 return;
1510 }
1511 }
1512
1513 // CopyData
1514 void
execCOPY_DATA_IMPL_REQ(Signal * signal)1515 Trix::execCOPY_DATA_IMPL_REQ(Signal* signal)
1516 {
1517 jamEntry();
1518
1519 const CopyDataImplReq reqData = *(const CopyDataImplReq*)signal->getDataPtr();
1520 const CopyDataImplReq *req = &reqData;
1521
1522 // Seize a subscription record
1523 SubscriptionRecPtr subRecPtr;
1524 SectionHandle handle(this, signal);
1525
1526 if (!c_theSubscriptions.seizeFirst(subRecPtr))
1527 {
1528 jam();
1529 // Failed to allocate subscription record
1530 releaseSections(handle);
1531
1532 CopyDataImplRef* ref = (CopyDataRef*)signal->getDataPtrSend();
1533
1534 ref->errorCode = -1; // XXX CopyDataImplRef::AllocationFailure;
1535 ref->senderData = req->senderData;
1536 ref->transId = req->transId;
1537 sendSignal(req->senderRef, GSN_COPY_DATA_IMPL_REF, signal,
1538 CopyDataImplRef::SignalLength, JBB);
1539 return;
1540 }
1541
1542 SubscriptionRecord* subRec = subRecPtr.p;
1543 subRec->errorCode = BuildIndxRef::NoError;
1544 subRec->userReference = req->senderRef;
1545 subRec->connectionPtr = req->senderData;
1546 subRec->schemaTransId = req->transId;
1547 subRec->subscriptionId = rand();
1548 subRec->subscriptionKey = rand();
1549 subRec->indexType = RNIL;
1550 subRec->sourceTableId = req->srcTableId;
1551 subRec->targetTableId = req->dstTableId;
1552 subRec->parallelism = 16;
1553 subRec->expectedConf = 0;
1554 subRec->subscriptionCreated = false;
1555 subRec->pendingSubSyncContinueConf = false;
1556 subRec->prepareId = req->transId;
1557 subRec->fragCount = req->srcFragments;
1558 subRec->fragId = ZNIL;
1559 subRec->m_rows_processed = 0;
1560 subRec->m_flags = SubscriptionRecord::RF_WAIT_GCP; // Todo make configurable
1561 subRec->m_gci = 0;
1562 switch(req->requestType){
1563 case CopyDataImplReq::ReorgCopy:
1564 jam();
1565 subRec->requestType = REORG_COPY;
1566 break;
1567 case CopyDataImplReq::ReorgDelete:
1568 subRec->requestType = REORG_DELETE;
1569 break;
1570 default:
1571 jamLine(req->requestType);
1572 ndbrequire(false);
1573 }
1574
1575 if (req->requestInfo & CopyDataReq::TupOrder)
1576 {
1577 jam();
1578 subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1579 }
1580
1581 // Get column order segments
1582 Uint32 noOfSections = handle.m_cnt;
1583 if (noOfSections > 0) {
1584 jam();
1585 SegmentedSectionPtr ptr;
1586 handle.getSection(ptr, 0);
1587 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1588 subRec->noOfIndexColumns = ptr.sz;
1589 }
1590
1591 if (noOfSections > 1) {
1592 jam();
1593 SegmentedSectionPtr ptr;
1594 handle.getSection(ptr, 1);
1595 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1596 subRec->noOfKeyColumns = ptr.sz;
1597 }
1598
1599 releaseSections(handle);
1600 {
1601 UtilPrepareReq * utilPrepareReq =
1602 (UtilPrepareReq *)signal->getDataPtrSend();
1603
1604 utilPrepareReq->senderRef = reference();
1605 utilPrepareReq->senderData = subRecPtr.i;
1606 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1607
1608 const Uint32 pageSizeInWords = 128;
1609 Uint32 propPage[pageSizeInWords];
1610 LinearWriter w(&propPage[0],128);
1611 w.first();
1612 w.add(UtilPrepareReq::NoOfOperations, 1);
1613 if (subRec->requestType == REORG_COPY)
1614 {
1615 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Write);
1616 }
1617 else
1618 {
1619 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Delete);
1620 }
1621 w.add(UtilPrepareReq::ScanTakeOverInd, 1);
1622 w.add(UtilPrepareReq::ReorgInd, 1);
1623 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1624
1625 AttrOrderBuffer::DataBufferIterator iter;
1626 ndbrequire(subRec->attributeOrder.first(iter));
1627
1628 for(Uint32 i = 0; i < subRec->noOfIndexColumns; i++)
1629 {
1630 w.add(UtilPrepareReq::AttributeId, * iter.data);
1631 subRec->attributeOrder.next(iter);
1632 }
1633
1634 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1635 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1636 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1637 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1638 UtilPrepareReq::SignalLength, JBB,
1639 sectionsPtr, UtilPrepareReq::NoOfSections);
1640 }
1641 }
1642
1643 // BuildFK
1644 void
execBUILD_FK_IMPL_REQ(Signal * signal)1645 Trix::execBUILD_FK_IMPL_REQ(Signal* signal)
1646 {
1647 jamEntry();
1648
1649 const BuildFKImplReq reqData = *(const BuildFKImplReq*)signal->getDataPtr();
1650 const BuildFKImplReq *req = &reqData;
1651
1652 // Seize a subscription record
1653 SubscriptionRecPtr subRecPtr;
1654 SectionHandle handle(this, signal);
1655
1656 if (!c_theSubscriptions.seizeFirst(subRecPtr))
1657 {
1658 jam();
1659 // Failed to allocate subscription record
1660 releaseSections(handle);
1661
1662 BuildFKImplRef* ref = (BuildFKImplRef*)signal->getDataPtrSend();
1663
1664 ref->errorCode = -1; // XXX BuildFKImplRef::AllocationFailure;
1665 ref->senderData = req->senderData;
1666 sendSignal(req->senderRef, GSN_BUILD_FK_IMPL_REF, signal,
1667 BuildFKImplRef::SignalLength, JBB);
1668 return;
1669 }
1670
1671 SubscriptionRecord* subRec = subRecPtr.p;
1672 subRec->errorCode = BuildIndxRef::NoError;
1673 subRec->userReference = req->senderRef;
1674 subRec->connectionPtr = req->senderData;
1675 subRec->schemaTransId = req->transId;
1676 subRec->subscriptionId = rand();
1677 subRec->subscriptionKey = rand();
1678 subRec->indexType = RNIL;
1679 subRec->sourceTableId = req->childTableId;
1680 subRec->targetTableId = req->parentTableId;
1681 subRec->parallelism = 16;
1682 subRec->expectedConf = 0;
1683 subRec->subscriptionCreated = false;
1684 subRec->pendingSubSyncContinueConf = false;
1685 subRec->prepareId = req->transId;
1686 subRec->fragCount = 0; // all
1687 subRec->fragId = ZNIL;
1688 subRec->m_rows_processed = 0;
1689 subRec->m_flags = 0;
1690 subRec->m_gci = 0;
1691 subRec->requestType = FK_BUILD;
1692
1693 // TODO...check if there is a scenario where this is not optimal
1694 subRec->m_flags |= SubscriptionRecord::RF_TUP_ORDER;
1695
1696 // as we don't support index on disk...
1697 subRec->m_flags |= SubscriptionRecord::RF_NO_DISK;
1698
1699 // Get parent columns...
1700 {
1701 SegmentedSectionPtr ptr;
1702 handle.getSection(ptr, 0);
1703 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1704 subRec->noOfKeyColumns = ptr.sz;
1705 }
1706
1707 {
1708 // Get child columns...
1709 SegmentedSectionPtr ptr;
1710 handle.getSection(ptr, 1);
1711 append(subRec->attributeOrder, ptr, getSectionSegmentPool());
1712 subRec->noOfIndexColumns = ptr.sz;
1713 }
1714
1715 ndbrequire(subRec->noOfKeyColumns == subRec->noOfIndexColumns);
1716
1717 releaseSections(handle);
1718
1719 {
1720 UtilPrepareReq * utilPrepareReq =
1721 (UtilPrepareReq *)signal->getDataPtrSend();
1722
1723 utilPrepareReq->senderRef = reference();
1724 utilPrepareReq->senderData = subRecPtr.i;
1725 utilPrepareReq->schemaTransId = subRec->schemaTransId;
1726
1727 const Uint32 pageSizeInWords = 128;
1728 Uint32 propPage[pageSizeInWords];
1729 LinearWriter w(&propPage[0],128);
1730 w.first();
1731 w.add(UtilPrepareReq::NoOfOperations, 1);
1732 w.add(UtilPrepareReq::OperationType, UtilPrepareReq::Probe);
1733 w.add(UtilPrepareReq::TableId, subRec->targetTableId);
1734
1735 // key is always in 0
1736 AttrOrderBuffer::DataBufferIterator iter;
1737 ndbrequire(subRec->attributeOrder.first(iter));
1738 for(Uint32 i = 0; i < subRec->noOfKeyColumns; i++)
1739 {
1740 w.add(UtilPrepareReq::AttributeId, * iter.data);
1741 subRec->attributeOrder.next(iter);
1742 }
1743
1744 struct LinearSectionPtr sectionsPtr[UtilPrepareReq::NoOfSections];
1745 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].p = propPage;
1746 sectionsPtr[UtilPrepareReq::PROPERTIES_SECTION].sz = w.getWordsUsed();
1747 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ, signal,
1748 UtilPrepareReq::SignalLength, JBB,
1749 sectionsPtr, UtilPrepareReq::NoOfSections);
1750 }
1751 }
1752
1753 void
executeBuildFKTransaction(Signal * signal,SubscriptionRecPtr subRecPtr)1754 Trix::executeBuildFKTransaction(Signal* signal,
1755 SubscriptionRecPtr subRecPtr)
1756 {
1757 jam();
1758 SubscriptionRecord* subRec = subRecPtr.p;
1759 UtilExecuteReq * utilExecuteReq =
1760 CAST_PTR(UtilExecuteReq, signal->getDataPtrSend());
1761
1762 utilExecuteReq->senderRef = reference();
1763 utilExecuteReq->senderData = subRecPtr.i;
1764 utilExecuteReq->prepareId = subRec->prepareId;
1765
1766 // Save scan result in linear buffers
1767 SectionHandle handle(this, signal);
1768 SegmentedSectionPtr headerPtr, dataPtr;
1769
1770 handle.getSection(headerPtr, 0);
1771 handle.getSection(dataPtr, 1);
1772
1773 Uint32* headerBuffer = signal->theData + 25;
1774 Uint32* dataBuffer = headerBuffer + headerPtr.sz;
1775
1776 copy(headerBuffer, headerPtr);
1777 copy(dataBuffer, dataPtr);
1778 releaseSections(handle);
1779
1780 AttrOrderBuffer::ConstDataBufferIterator iter;
1781 ndbrequire(subRec->attributeOrder.first(iter));
1782 for(Uint32 i = 0; i < headerPtr.sz; i++)
1783 {
1784 AttributeHeader* keyAttrHead = (AttributeHeader *) headerBuffer + i;
1785
1786 // Filter out NULL attributes
1787 if (keyAttrHead->isNULL())
1788 return;
1789
1790 /**
1791 * UTIL_EXECUTE header section expects real attrid (same as passed in
1792 * UTIL_PREPARE). SUMA sends child attrid, replace it by parent attrid.
1793 */
1794 keyAttrHead->setAttributeId(*iter.data);
1795 subRec->attributeOrder.next(iter);
1796 }
1797 // Increase expected CONF count
1798 subRec->expectedConf++;
1799
1800 struct LinearSectionPtr sectionsPtr[UtilExecuteReq::NoOfSections];
1801 sectionsPtr[UtilExecuteReq::HEADER_SECTION].p = headerBuffer;
1802 sectionsPtr[UtilExecuteReq::HEADER_SECTION].sz = subRec->noOfKeyColumns;
1803 sectionsPtr[UtilExecuteReq::DATA_SECTION].p = dataBuffer;
1804 sectionsPtr[UtilExecuteReq::DATA_SECTION].sz = dataPtr.sz;
1805 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ, signal,
1806 UtilExecuteReq::SignalLength, JBB,
1807 sectionsPtr, UtilExecuteReq::NoOfSections);
1808 }
1809
1810 // index stats
1811
1812 Trix::StatOp&
statOpGetPtr(Uint32 statPtrI)1813 Trix::statOpGetPtr(Uint32 statPtrI)
1814 {
1815 ndbrequire(statPtrI != RNIL);
1816 return *c_statOpPool.getPtr(statPtrI);
1817 }
1818
1819 bool
statOpSeize(Uint32 & statPtrI)1820 Trix::statOpSeize(Uint32& statPtrI)
1821 {
1822 StatOpPtr statPtr;
1823 if (ERROR_INSERTED(18001) ||
1824 !c_statOpPool.seize(statPtr))
1825 {
1826 jam();
1827 CLEAR_ERROR_INSERT_VALUE;
1828 D("statOpSeize: seize statOp failed");
1829 return false;
1830 }
1831 #ifdef VM_TRACE
1832 memset(statPtr.p, 0xf3, sizeof(*statPtr.p));
1833 #endif
1834 new (statPtr.p) StatOp;
1835 statPtrI = statPtr.i;
1836 StatOp& stat = statOpGetPtr(statPtrI);
1837 stat.m_ownPtrI = statPtrI;
1838
1839 SubscriptionRecPtr subRecPtr;
1840 if (ERROR_INSERTED(18002) ||
1841 !c_theSubscriptions.seizeFirst(subRecPtr))
1842 {
1843 jam();
1844 CLEAR_ERROR_INSERT_VALUE;
1845 c_statOpPool.release(statPtr);
1846 D("statOpSeize: seize subRec failed");
1847 return false;
1848 }
1849 SubscriptionRecord* subRec = subRecPtr.p;
1850 subRec->m_statPtrI = stat.m_ownPtrI;
1851 stat.m_subRecPtrI = subRecPtr.i;
1852
1853 D("statOpSeize" << V(statPtrI) << V(subRecPtr.i));
1854 return true;
1855 }
1856
1857 void
statOpRelease(StatOp & stat)1858 Trix::statOpRelease(StatOp& stat)
1859 {
1860 StatOp::Util& util = stat.m_util;
1861 D("statOpRelease" << V(stat));
1862
1863 if (stat.m_subRecPtrI != RNIL)
1864 {
1865 jam();
1866 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1867 ndbrequire(subRec->prepareId == RNIL);
1868 subRec->attributeOrder.release();
1869 c_theSubscriptions.release(stat.m_subRecPtrI);
1870 stat.m_subRecPtrI = RNIL;
1871 }
1872 ndbrequire(util.m_prepareId == RNIL);
1873 c_statOpPool.release(stat.m_ownPtrI);
1874 }
1875
1876 void
execINDEX_STAT_IMPL_REQ(Signal * signal)1877 Trix::execINDEX_STAT_IMPL_REQ(Signal* signal)
1878 {
1879 jamEntry();
1880 const IndexStatImplReq* req = (const IndexStatImplReq*)signal->getDataPtr();
1881
1882 Uint32 statPtrI = RNIL;
1883 if (!statOpSeize(statPtrI))
1884 {
1885 jam();
1886 const IndexStatImplReq reqCopy = *req;
1887 statOpRef(signal, &reqCopy, IndexStatRef::NoFreeStatOp, __LINE__);
1888 return;
1889 }
1890 StatOp& stat = statOpGetPtr(statPtrI);
1891 stat.m_req = *req;
1892 stat.m_requestType = req->requestType;
1893
1894 // set request name for cluster log message
1895 switch (stat.m_requestType) {
1896 case IndexStatReq::RT_CLEAN_NEW:
1897 jam();
1898 stat.m_requestName = "clean new";
1899 break;
1900 case IndexStatReq::RT_CLEAN_OLD:
1901 jam();
1902 stat.m_requestName = "clean old";
1903 break;
1904 case IndexStatReq::RT_CLEAN_ALL:
1905 jam();
1906 stat.m_requestName = "clean all";
1907 break;
1908 case IndexStatReq::RT_SCAN_FRAG:
1909 jam();
1910 stat.m_requestName = "scan frag";
1911 break;
1912 case IndexStatReq::RT_DROP_HEAD:
1913 jam();
1914 stat.m_requestName = "drop head";
1915 break;
1916 default:
1917 ndbrequire(false);
1918 break;
1919 }
1920
1921 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
1922 subRec->prepareId = RNIL;
1923 subRec->errorCode = BuildIndxRef::NoError;
1924
1925 // sys tables are not recreated so do this only once
1926 if (!c_statGetMetaDone)
1927 {
1928 jam();
1929 statMetaGetHead(signal, stat);
1930 return;
1931 }
1932 statGetMetaDone(signal, stat);
1933 }
1934
1935 // sys tables metadata
1936
1937 const Trix::SysColumn
1938 Trix::g_statMetaHead_column[] = {
1939 { 0, "index_id",
1940 true
1941 },
1942 { 1, "index_version",
1943 true
1944 },
1945 { 2, "table_id",
1946 false
1947 },
1948 { 3, "frag_count",
1949 false
1950 },
1951 { 4, "value_format",
1952 false
1953 },
1954 { 5, "sample_version",
1955 false
1956 },
1957 { 6, "load_time",
1958 false
1959 },
1960 { 7, "sample_count",
1961 false
1962 },
1963 { 8, "key_bytes",
1964 false
1965 }
1966 };
1967
1968 const Trix::SysColumn
1969 Trix::g_statMetaSample_column[] = {
1970 { 0, "index_id",
1971 true
1972 },
1973 { 1, "index_version",
1974 true
1975 },
1976 { 2, "sample_version",
1977 true
1978 },
1979 { 3, "stat_key",
1980 true
1981 },
1982 { 4, "stat_value",
1983 false
1984 }
1985 };
1986
1987 const Trix::SysTable
1988 Trix::g_statMetaHead = {
1989 NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_HEAD_TABLE,
1990 ~(Uint32)0,
1991 sizeof(g_statMetaHead_column)/sizeof(g_statMetaHead_column[0]),
1992 g_statMetaHead_column
1993 };
1994
1995 const Trix::SysTable
1996 Trix::g_statMetaSample = {
1997 NDB_INDEX_STAT_DB "/" NDB_INDEX_STAT_SCHEMA "/" NDB_INDEX_STAT_SAMPLE_TABLE,
1998 ~(Uint32)0,
1999 sizeof(g_statMetaSample_column)/sizeof(g_statMetaSample_column[0]),
2000 g_statMetaSample_column
2001 };
2002
2003 const Trix::SysIndex
2004 Trix::g_statMetaSampleX1 = {
2005 // indexes are always in "sys"
2006 "sys" "/" NDB_INDEX_STAT_SCHEMA "/%u/" NDB_INDEX_STAT_SAMPLE_INDEX1,
2007 ~(Uint32)0,
2008 ~(Uint32)0
2009 };
2010
2011 void
statMetaGetHead(Signal * signal,StatOp & stat)2012 Trix::statMetaGetHead(Signal* signal, StatOp& stat)
2013 {
2014 D("statMetaGetHead" << V(stat));
2015 StatOp::Meta& meta = stat.m_meta;
2016 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetHeadCB);
2017 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2018 const char* name = g_statMetaHead.name;
2019 sendGetTabInfoReq(signal, stat, name);
2020 }
2021
2022 void
statMetaGetHeadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2023 Trix::statMetaGetHeadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2024 {
2025 StatOp& stat = statOpGetPtr(statPtrI);
2026 D("statMetaGetHeadCB" << V(stat) << V(ret));
2027 StatOp::Meta& meta = stat.m_meta;
2028 if (ret != 0)
2029 {
2030 jam();
2031 Uint32 supress[] = { GetTabInfoRef::TableNotDefined, 0 };
2032 statOpError(signal, stat, ret, __LINE__, supress);
2033 return;
2034 }
2035 g_statMetaHead.tableId = meta.m_conf.tableId;
2036 statMetaGetSample(signal, stat);
2037 }
2038
2039 void
statMetaGetSample(Signal * signal,StatOp & stat)2040 Trix::statMetaGetSample(Signal* signal, StatOp& stat)
2041 {
2042 D("statMetaGetSample" << V(stat));
2043 StatOp::Meta& meta = stat.m_meta;
2044 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleCB);
2045 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2046 const char* name = g_statMetaSample.name;
2047 sendGetTabInfoReq(signal, stat, name);
2048 }
2049
2050 void
statMetaGetSampleCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2051 Trix::statMetaGetSampleCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2052 {
2053 StatOp& stat = statOpGetPtr(statPtrI);
2054 D("statMetaGetSampleCB" << V(stat) << V(ret));
2055 StatOp::Meta& meta = stat.m_meta;
2056 if (ret != 0)
2057 {
2058 jam();
2059 statOpError(signal, stat, ret, __LINE__);
2060 return;
2061 }
2062 g_statMetaSample.tableId = meta.m_conf.tableId;
2063 statMetaGetSampleX1(signal, stat);
2064 }
2065
2066 void
statMetaGetSampleX1(Signal * signal,StatOp & stat)2067 Trix::statMetaGetSampleX1(Signal* signal, StatOp& stat)
2068 {
2069 D("statMetaGetSampleX1" << V(stat));
2070 StatOp::Meta& meta = stat.m_meta;
2071 meta.m_cb.m_callbackFunction = safe_cast(&Trix::statMetaGetSampleX1CB);
2072 meta.m_cb.m_callbackData = stat.m_ownPtrI;
2073 const char* name_fmt = g_statMetaSampleX1.name;
2074 char name[MAX_TAB_NAME_SIZE];
2075 BaseString::snprintf(name, sizeof(name), name_fmt, g_statMetaSample.tableId);
2076 sendGetTabInfoReq(signal, stat, name);
2077 }
2078
2079 void
statMetaGetSampleX1CB(Signal * signal,Uint32 statPtrI,Uint32 ret)2080 Trix::statMetaGetSampleX1CB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2081 {
2082 StatOp& stat = statOpGetPtr(statPtrI);
2083 D("statMetaGetSampleX1CB" << V(stat) << V(ret));
2084 StatOp::Meta& meta = stat.m_meta;
2085 if (ret != 0)
2086 {
2087 jam();
2088 statOpError(signal, stat, ret, __LINE__);
2089 return;
2090 }
2091 g_statMetaSampleX1.tableId = g_statMetaSample.tableId;
2092 g_statMetaSampleX1.indexId = meta.m_conf.tableId;
2093 statGetMetaDone(signal, stat);
2094 }
2095
2096 void
sendGetTabInfoReq(Signal * signal,StatOp & stat,const char * name)2097 Trix::sendGetTabInfoReq(Signal* signal, StatOp& stat, const char* name)
2098 {
2099 D("sendGetTabInfoReq" << V(stat) << V(name));
2100 GetTabInfoReq* req = (GetTabInfoReq*)signal->getDataPtrSend();
2101
2102 Uint32 name_len = (Uint32)strlen(name) + 1;
2103 Uint32 name_len_words = (name_len + 3 ) / 4;
2104 Uint32 name_buf[32];
2105 ndbrequire(name_len_words <= 32);
2106 memset(name_buf, 0, sizeof(name_buf));
2107 memcpy(name_buf, name, name_len);
2108
2109 req->senderData = stat.m_ownPtrI;
2110 req->senderRef = reference();
2111 req->requestType = GetTabInfoReq::RequestByName |
2112 GetTabInfoReq::LongSignalConf;
2113 req->tableNameLen = name_len;
2114 req->schemaTransId = 0;
2115 LinearSectionPtr ptr[3];
2116 ptr[0].p = name_buf;
2117 ptr[0].sz = name_len_words;
2118 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ,
2119 signal, GetTabInfoReq::SignalLength, JBB, ptr, 1);
2120 }
2121
2122 void
execGET_TABINFO_CONF(Signal * signal)2123 Trix::execGET_TABINFO_CONF(Signal* signal)
2124 {
2125 jamEntry();
2126 if (!assembleFragments(signal)) {
2127 jam();
2128 return;
2129 }
2130 const GetTabInfoConf* conf = (const GetTabInfoConf*)signal->getDataPtr();
2131 StatOp& stat = statOpGetPtr(conf->senderData);
2132 D("execGET_TABINFO_CONF" << V(stat));
2133 StatOp::Meta& meta = stat.m_meta;
2134 meta.m_conf = *conf;
2135
2136 // do not need DICTTABINFO
2137 SectionHandle handle(this, signal);
2138 releaseSections(handle);
2139
2140 execute(signal, meta.m_cb, 0);
2141 }
2142
2143 void
execGET_TABINFO_REF(Signal * signal)2144 Trix::execGET_TABINFO_REF(Signal* signal)
2145 {
2146 jamEntry();
2147 const GetTabInfoRef* ref = (const GetTabInfoRef*)signal->getDataPtr();
2148 StatOp& stat = statOpGetPtr(ref->senderData);
2149 D("execGET_TABINFO_REF" << V(stat));
2150 StatOp::Meta& meta = stat.m_meta;
2151
2152 ndbrequire(ref->errorCode != 0);
2153 execute(signal, meta.m_cb, ref->errorCode);
2154 }
2155
2156 // continue after metadata retrieval
2157
2158 void
statGetMetaDone(Signal * signal,StatOp & stat)2159 Trix::statGetMetaDone(Signal* signal, StatOp& stat)
2160 {
2161 const IndexStatImplReq* req = &stat.m_req;
2162 StatOp::Data& data = stat.m_data;
2163 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2164 D("statGetMetaDone" << V(stat));
2165
2166 // c_statGetMetaDone = true;
2167
2168 subRec->requestType = STAT_UTIL;
2169 // fill in constant part
2170 ndbrequire(req->fragCount != 0);
2171 data.m_indexId = req->indexId;
2172 data.m_indexVersion = req->indexVersion;
2173 data.m_fragCount = req->fragCount;
2174 statHeadRead(signal, stat);
2175 }
2176
2177 // head table ops
2178
2179 void
statHeadRead(Signal * signal,StatOp & stat)2180 Trix::statHeadRead(Signal* signal, StatOp& stat)
2181 {
2182 StatOp::Util& util = stat.m_util;
2183 StatOp::Send& send = stat.m_send;
2184 D("statHeadRead" << V(stat));
2185
2186 util.m_not_found = false;
2187 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadReadCB);
2188 util.m_cb.m_callbackData = stat.m_ownPtrI;
2189 send.m_sysTable = &g_statMetaHead;
2190 send.m_operationType = UtilPrepareReq::Read;
2191 statUtilPrepare(signal, stat);
2192 }
2193
2194 void
statHeadReadCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2195 Trix::statHeadReadCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2196 {
2197 StatOp& stat = statOpGetPtr(statPtrI);
2198 StatOp::Data& data = stat.m_data;
2199 StatOp::Util& util = stat.m_util;
2200 D("statHeadReadCB" << V(stat) << V(ret));
2201
2202 ndbrequire(ret == 0);
2203 data.m_head_found = !util.m_not_found;
2204 statReadHeadDone(signal, stat);
2205 }
2206
2207 void
statHeadInsert(Signal * signal,StatOp & stat)2208 Trix::statHeadInsert(Signal* signal, StatOp& stat)
2209 {
2210 StatOp::Util& util = stat.m_util;
2211 StatOp::Send& send = stat.m_send;
2212 D("statHeadInsert" << V(stat));
2213
2214 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadInsertCB);
2215 util.m_cb.m_callbackData = stat.m_ownPtrI;
2216 send.m_sysTable = &g_statMetaHead;
2217 send.m_operationType = UtilPrepareReq::Insert;
2218 statUtilPrepare(signal, stat);
2219 }
2220
2221 void
statHeadInsertCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2222 Trix::statHeadInsertCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2223 {
2224 StatOp& stat = statOpGetPtr(statPtrI);
2225 D("statHeadInsertCB" << V(stat) << V(ret));
2226
2227 ndbrequire(ret == 0);
2228 statInsertHeadDone(signal, stat);
2229 }
2230
2231 void
statHeadUpdate(Signal * signal,StatOp & stat)2232 Trix::statHeadUpdate(Signal* signal, StatOp& stat)
2233 {
2234 StatOp::Util& util = stat.m_util;
2235 StatOp::Send& send = stat.m_send;
2236 D("statHeadUpdate" << V(stat));
2237
2238 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadUpdateCB);
2239 util.m_cb.m_callbackData = stat.m_ownPtrI;
2240 send.m_sysTable = &g_statMetaHead;
2241 send.m_operationType = UtilPrepareReq::Update;
2242 statUtilPrepare(signal, stat);
2243 }
2244
2245 void
statHeadUpdateCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2246 Trix::statHeadUpdateCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2247 {
2248 StatOp& stat = statOpGetPtr(statPtrI);
2249 D("statHeadUpdateCB" << V(stat) << V(ret));
2250
2251 ndbrequire(ret == 0);
2252 statUpdateHeadDone(signal, stat);
2253 }
2254
2255 void
statHeadDelete(Signal * signal,StatOp & stat)2256 Trix::statHeadDelete(Signal* signal, StatOp& stat)
2257 {
2258 StatOp::Util& util = stat.m_util;
2259 StatOp::Send& send = stat.m_send;
2260 D("statHeadDelete" << V(stat));
2261
2262 util.m_cb.m_callbackFunction = safe_cast(&Trix::statHeadDeleteCB);
2263 util.m_cb.m_callbackData = stat.m_ownPtrI;
2264 send.m_sysTable = &g_statMetaHead;
2265 send.m_operationType = UtilPrepareReq::Delete;
2266 statUtilPrepare(signal, stat);
2267 }
2268
2269 void
statHeadDeleteCB(Signal * signal,Uint32 statPtrI,Uint32 ret)2270 Trix::statHeadDeleteCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
2271 {
2272 StatOp& stat = statOpGetPtr(statPtrI);
2273 D("statHeadDeleteCB" << V(stat) << V(ret));
2274
2275 ndbrequire(ret == 0);
2276 statDeleteHeadDone(signal, stat);
2277 }
2278
2279 // util (PK ops, only HEAD for now)
2280
2281 void
statUtilPrepare(Signal * signal,StatOp & stat)2282 Trix::statUtilPrepare(Signal* signal, StatOp& stat)
2283 {
2284 StatOp::Util& util = stat.m_util;
2285 D("statUtilPrepare" << V(stat));
2286
2287 util.m_prepareId = RNIL;
2288 statSendPrepare(signal, stat);
2289 }
2290
2291 void
statUtilPrepareConf(Signal * signal,Uint32 statPtrI)2292 Trix::statUtilPrepareConf(Signal* signal, Uint32 statPtrI)
2293 {
2294 StatOp& stat = statOpGetPtr(statPtrI);
2295 StatOp::Util& util = stat.m_util;
2296 StatOp::Send& send = stat.m_send;
2297 D("statUtilPrepareConf" << V(stat));
2298
2299 const UtilPrepareConf* utilConf =
2300 (const UtilPrepareConf*)signal->getDataPtr();
2301 util.m_prepareId = utilConf->prepareId;
2302
2303 const Uint32 ot = send.m_operationType;
2304 if ((ERROR_INSERTED(18011) && ot == UtilPrepareReq::Read) ||
2305 (ERROR_INSERTED(18012) && ot != UtilPrepareReq::Read))
2306 {
2307 jam();
2308 CLEAR_ERROR_INSERT_VALUE;
2309 UtilExecuteRef* utilRef =
2310 (UtilExecuteRef*)signal->getDataPtrSend();
2311 utilRef->senderData = stat.m_ownPtrI;
2312 utilRef->errorCode = UtilExecuteRef::AllocationError;
2313 utilRef->TCErrorCode = 0;
2314 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2315 signal, UtilExecuteRef::SignalLength, JBB);
2316 return;
2317 }
2318
2319 statUtilExecute(signal, stat);
2320 }
2321
2322 void
statUtilPrepareRef(Signal * signal,Uint32 statPtrI)2323 Trix::statUtilPrepareRef(Signal* signal, Uint32 statPtrI)
2324 {
2325 StatOp& stat = statOpGetPtr(statPtrI);
2326 D("statUtilPrepareRef" << V(stat));
2327
2328 const UtilPrepareRef* utilRef =
2329 (const UtilPrepareRef*)signal->getDataPtr();
2330 Uint32 errorCode = utilRef->errorCode;
2331 ndbrequire(errorCode != 0);
2332
2333 switch (errorCode) {
2334 case UtilPrepareRef::PREPARE_SEIZE_ERROR:
2335 case UtilPrepareRef::PREPARE_PAGES_SEIZE_ERROR:
2336 case UtilPrepareRef::PREPARED_OPERATION_SEIZE_ERROR:
2337 errorCode = IndexStatRef::BusyUtilPrepare;
2338 break;
2339 case UtilPrepareRef::DICT_TAB_INFO_ERROR:
2340 errorCode = IndexStatRef::InvalidSysTable;
2341 break;
2342 case UtilPrepareRef::MISSING_PROPERTIES_SECTION:
2343 default:
2344 ndbrequire(false);
2345 break;
2346 }
2347 statOpError(signal, stat, errorCode, __LINE__);
2348 }
2349
2350 void
statUtilExecute(Signal * signal,StatOp & stat)2351 Trix::statUtilExecute(Signal* signal, StatOp& stat)
2352 {
2353 StatOp::Util& util = stat.m_util;
2354 StatOp::Send& send = stat.m_send;
2355 D("statUtilExecute" << V(stat));
2356
2357 send.m_prepareId = util.m_prepareId;
2358 statSendExecute(signal, stat);
2359 }
2360
2361 void
statUtilExecuteConf(Signal * signal,Uint32 statPtrI)2362 Trix::statUtilExecuteConf(Signal* signal, Uint32 statPtrI)
2363 {
2364 StatOp& stat = statOpGetPtr(statPtrI);
2365 StatOp::Attr& attr = stat.m_attr;
2366 StatOp::Send& send = stat.m_send;
2367 D("statUtilExecuteConf" << V(stat));
2368
2369 if (send.m_operationType == UtilPrepareReq::Read)
2370 {
2371 jam();
2372 SectionHandle handle(this, signal);
2373 Uint32 rattr[20];
2374 Uint32 rdata[2048];
2375 attr.m_attr = rattr;
2376 attr.m_attrMax = 20;
2377 attr.m_attrSize = 0;
2378 attr.m_data = rdata;
2379 attr.m_dataMax = 2048;
2380 attr.m_dataSize = 0;
2381 {
2382 SegmentedSectionPtr ssPtr;
2383 handle.getSection(ssPtr, 0);
2384 ::copy(rattr, ssPtr);
2385 }
2386 {
2387 SegmentedSectionPtr ssPtr;
2388 handle.getSection(ssPtr, 1);
2389 ::copy(rdata, ssPtr);
2390 }
2391 releaseSections(handle);
2392
2393 const SysTable& sysTable = *send.m_sysTable;
2394 for (Uint32 i = 0; i < sysTable.columnCount; i++)
2395 {
2396 jam();
2397 statDataIn(stat, i);
2398 }
2399 }
2400
2401 statUtilRelease(signal, stat);
2402 }
2403
2404 void
statUtilExecuteRef(Signal * signal,Uint32 statPtrI)2405 Trix::statUtilExecuteRef(Signal* signal, Uint32 statPtrI)
2406 {
2407 StatOp& stat = statOpGetPtr(statPtrI);
2408 StatOp::Util& util = stat.m_util;
2409 StatOp::Send& send = stat.m_send;
2410 D("statUtilExecuteRef" << V(stat));
2411
2412 const UtilExecuteRef* utilRef =
2413 (const UtilExecuteRef*)signal->getDataPtr();
2414 Uint32 errorCode = utilRef->errorCode;
2415 ndbrequire(errorCode != 0);
2416
2417 switch (errorCode) {
2418 case UtilExecuteRef::TCError:
2419 errorCode = utilRef->TCErrorCode;
2420 ndbrequire(errorCode != 0);
2421 if (send.m_operationType == UtilPrepareReq::Read &&
2422 errorCode == ZNOT_FOUND)
2423 {
2424 jam();
2425 util.m_not_found = true;
2426 errorCode = 0;
2427 }
2428 break;
2429 case UtilExecuteRef::AllocationError:
2430 errorCode = IndexStatRef::BusyUtilExecute;
2431 break;
2432 default:
2433 ndbrequire(false);
2434 break;
2435 }
2436
2437 if (errorCode != 0)
2438 {
2439 jam();
2440 statOpError(signal, stat, errorCode, __LINE__);
2441 return;
2442 }
2443 statUtilRelease(signal, stat);
2444 }
2445
2446 void
statUtilRelease(Signal * signal,StatOp & stat)2447 Trix::statUtilRelease(Signal* signal, StatOp& stat)
2448 {
2449 StatOp::Util& util = stat.m_util;
2450 StatOp::Send& send = stat.m_send;
2451 D("statUtilRelease" << V(stat));
2452
2453 send.m_prepareId = util.m_prepareId;
2454 statSendRelease(signal, stat);
2455 }
2456
2457 void
statUtilReleaseConf(Signal * signal,Uint32 statPtrI)2458 Trix::statUtilReleaseConf(Signal* signal, Uint32 statPtrI)
2459 {
2460 StatOp& stat = statOpGetPtr(statPtrI);
2461 StatOp::Util& util = stat.m_util;
2462 D("statUtilReleaseConf" << V(stat));
2463
2464 util.m_prepareId = RNIL;
2465 execute(signal, util.m_cb, 0);
2466 }
2467
2468 // continue after head table ops
2469
2470 void
statReadHeadDone(Signal * signal,StatOp & stat)2471 Trix::statReadHeadDone(Signal* signal, StatOp& stat)
2472 {
2473 //UNUSED StatOp::Data& data = stat.m_data;
2474 D("statReadHeadDone" << V(stat));
2475
2476 switch (stat.m_requestType) {
2477 case IndexStatReq::RT_CLEAN_NEW:
2478 jam();
2479 case IndexStatReq::RT_CLEAN_OLD:
2480 jam();
2481 case IndexStatReq::RT_CLEAN_ALL:
2482 jam();
2483 statCleanBegin(signal, stat);
2484 break;
2485
2486 case IndexStatReq::RT_SCAN_FRAG:
2487 jam();
2488 statScanBegin(signal, stat);
2489 break;
2490
2491 case IndexStatReq::RT_DROP_HEAD:
2492 jam();
2493 statDropBegin(signal, stat);
2494 break;
2495
2496 default:
2497 ndbrequire(false);
2498 break;
2499 }
2500 }
2501
2502 void
statInsertHeadDone(Signal * signal,StatOp & stat)2503 Trix::statInsertHeadDone(Signal* signal, StatOp& stat)
2504 {
2505 D("statInsertHeadDone" << V(stat));
2506
2507 switch (stat.m_requestType) {
2508 case IndexStatReq::RT_SCAN_FRAG:
2509 jam();
2510 statScanEnd(signal, stat);
2511 break;
2512 default:
2513 ndbrequire(false);
2514 break;
2515 }
2516 }
2517
2518 void
statUpdateHeadDone(Signal * signal,StatOp & stat)2519 Trix::statUpdateHeadDone(Signal* signal, StatOp& stat)
2520 {
2521 D("statUpdateHeadDone" << V(stat));
2522
2523 switch (stat.m_requestType) {
2524 case IndexStatReq::RT_SCAN_FRAG:
2525 jam();
2526 statScanEnd(signal, stat);
2527 break;
2528 default:
2529 ndbrequire(false);
2530 break;
2531 }
2532 }
2533
2534 void
statDeleteHeadDone(Signal * signal,StatOp & stat)2535 Trix::statDeleteHeadDone(Signal* signal, StatOp& stat)
2536 {
2537 D("statDeleteHeadDone" << V(stat));
2538
2539 switch (stat.m_requestType) {
2540 case IndexStatReq::RT_DROP_HEAD:
2541 jam();
2542 statDropEnd(signal, stat);
2543 break;
2544 default:
2545 ndbrequire(false);
2546 break;
2547 }
2548 }
2549
2550 // clean
2551
2552 void
statCleanBegin(Signal * signal,StatOp & stat)2553 Trix::statCleanBegin(Signal* signal, StatOp& stat)
2554 {
2555 const IndexStatImplReq* req = &stat.m_req;
2556 StatOp::Data& data = stat.m_data;
2557 D("statCleanBegin" << V(stat));
2558
2559 if (data.m_head_found == true)
2560 {
2561 jam();
2562 if (data.m_tableId != req->tableId &&
2563 stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2564 {
2565 jam();
2566 // must run ndb_index_stat --drop
2567 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2568 return;
2569 }
2570 }
2571 else
2572 {
2573 if (stat.m_requestType != IndexStatReq::RT_CLEAN_ALL)
2574 {
2575 jam();
2576 // happens normally on first stats scan
2577 stat.m_requestType = IndexStatReq::RT_CLEAN_ALL;
2578 }
2579 }
2580 statCleanPrepare(signal, stat);
2581 }
2582
2583 void
statCleanPrepare(Signal * signal,StatOp & stat)2584 Trix::statCleanPrepare(Signal* signal, StatOp& stat)
2585 {
2586 const IndexStatImplReq* req = &stat.m_req;
2587 StatOp::Data& data = stat.m_data;
2588 StatOp::Clean& clean = stat.m_clean;
2589 StatOp::Send& send = stat.m_send;
2590 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2591 D("statCleanPrepare" << V(stat));
2592
2593 // count of deleted samples is just for info
2594 clean.m_cleanCount = 0;
2595
2596 const Uint32 ao_list[] = {
2597 0, // INDEX_ID
2598 1, // INDEX_VERSION
2599 2, // SAMPLE_VERSION
2600 3 // STAT_KEY
2601 };
2602 const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2603
2604 ndbrequire(req->fragId == ZNIL);
2605 subRec->m_flags = 0;
2606 subRec->requestType = STAT_CLEAN;
2607 subRec->schemaTransId = req->transId;
2608 subRec->userReference = 0; // not used
2609 subRec->connectionPtr = RNIL;
2610 subRec->subscriptionId = rand();
2611 subRec->subscriptionKey = rand();
2612 subRec->prepareId = RNIL;
2613 subRec->indexType = 0; // not used
2614 subRec->sourceTableId = g_statMetaSampleX1.indexId;
2615 subRec->targetTableId = RNIL;
2616 subRec->noOfIndexColumns = ao_size;
2617 subRec->noOfKeyColumns = 0;
2618 subRec->parallelism = 16;
2619 subRec->fragCount = 0;
2620 subRec->fragId = ZNIL;
2621 subRec->syncPtr = RNIL;
2622 subRec->errorCode = BuildIndxRef::NoError;
2623 subRec->subscriptionCreated = false;
2624 subRec->pendingSubSyncContinueConf = false;
2625 subRec->expectedConf = 0;
2626 subRec->m_rows_processed = 0;
2627 subRec->m_gci = 0;
2628
2629 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2630 ndbrequire(ao_buf.isEmpty());
2631 ao_buf.append(ao_list, ao_size);
2632
2633 // create TUX bounds
2634 clean.m_bound[0] = TuxBoundInfo::BoundEQ;
2635 clean.m_bound[1] = AttributeHeader(0, 4).m_value;
2636 clean.m_bound[2] = data.m_indexId;
2637 clean.m_bound[3] = TuxBoundInfo::BoundEQ;
2638 clean.m_bound[4] = AttributeHeader(1, 4).m_value;
2639 clean.m_bound[5] = data.m_indexVersion;
2640 switch (stat.m_requestType) {
2641 case IndexStatReq::RT_CLEAN_NEW:
2642 D("statCleanPrepare delete sample versions > " << data.m_sampleVersion);
2643 clean.m_bound[6] = TuxBoundInfo::BoundLT;
2644 clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2645 clean.m_bound[8] = data.m_sampleVersion;
2646 clean.m_boundCount = 3;
2647 break;
2648 case IndexStatReq::RT_CLEAN_OLD:
2649 D("statCleanPrepare delete sample versions < " << data.m_sampleVersion);
2650 clean.m_bound[6] = TuxBoundInfo::BoundGT;
2651 clean.m_bound[7] = AttributeHeader(2, 4).m_value;
2652 clean.m_bound[8] = data.m_sampleVersion;
2653 clean.m_boundCount = 3;
2654 break;
2655 case IndexStatReq::RT_CLEAN_ALL:
2656 D("statCleanPrepare delete all sample versions");
2657 clean.m_boundCount = 2;
2658 break;
2659 default:
2660 ndbrequire(false);
2661 break;
2662 }
2663 clean.m_boundSize = 3 * clean.m_boundCount;
2664
2665 // TRIX traps the CONF
2666 send.m_sysTable = &g_statMetaSample;
2667 send.m_operationType = UtilPrepareReq::Delete;
2668 statSendPrepare(signal, stat);
2669 }
2670
2671 void
statCleanExecute(Signal * signal,StatOp & stat)2672 Trix::statCleanExecute(Signal* signal, StatOp& stat)
2673 {
2674 StatOp::Data& data = stat.m_data;
2675 StatOp::Send& send = stat.m_send;
2676 StatOp::Clean& clean = stat.m_clean;
2677 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2678 D("statCleanExecute" << V(stat));
2679
2680 CRASH_INSERTION(18025);
2681
2682 SectionHandle handle(this, signal);
2683 ndbrequire(handle.m_cnt == 2);
2684
2685 // ATTR_INFO
2686 AttributeHeader ah[4];
2687 SegmentedSectionPtr ptr0;
2688 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2689 ndbrequire(ptr0.sz == 4);
2690 ::copy((Uint32*)ah, ptr0);
2691 ndbrequire(ah[0].getAttributeId() == 0 && ah[0].getDataSize() == 1);
2692 ndbrequire(ah[1].getAttributeId() == 1 && ah[1].getDataSize() == 1);
2693 ndbrequire(ah[2].getAttributeId() == 2 && ah[2].getDataSize() == 1);
2694 // read via TUP rounds bytes to words
2695 const Uint32 kz = ah[3].getDataSize();
2696 ndbrequire(ah[3].getAttributeId() == 3 && kz != 0);
2697
2698 // AFTER_VALUES
2699 const Uint32 avmax = 3 + MAX_INDEX_STAT_KEY_SIZE;
2700 Uint32 av[avmax];
2701 SegmentedSectionPtr ptr1;
2702 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2703 ndbrequire(ptr1.sz <= avmax);
2704 ::copy(av, ptr1);
2705 ndbrequire(data.m_indexId == av[0]);
2706 ndbrequire(data.m_indexVersion == av[1]);
2707 data.m_sampleVersion = av[2];
2708 data.m_statKey = &av[3];
2709 const unsigned char* kp = (const unsigned char*)data.m_statKey;
2710 const Uint32 kb = kp[0] + (kp[1] << 8);
2711 // key is not empty
2712 ndbrequire(kb != 0);
2713 ndbrequire(kz == ((2 + kb) + 3) / 4);
2714
2715 clean.m_cleanCount++;
2716 releaseSections(handle);
2717
2718 const Uint32 rt = stat.m_requestType;
2719 if ((ERROR_INSERTED(18021) && rt == IndexStatReq::RT_CLEAN_NEW) ||
2720 (ERROR_INSERTED(18022) && rt == IndexStatReq::RT_CLEAN_OLD) ||
2721 (ERROR_INSERTED(18023) && rt == IndexStatReq::RT_CLEAN_ALL))
2722 {
2723 jam();
2724 CLEAR_ERROR_INSERT_VALUE;
2725 UtilExecuteRef* utilRef =
2726 (UtilExecuteRef*)signal->getDataPtrSend();
2727 utilRef->senderData = stat.m_ownPtrI;
2728 utilRef->errorCode = UtilExecuteRef::TCError;
2729 utilRef->TCErrorCode = 626;
2730 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2731 signal, UtilExecuteRef::SignalLength, JBB);
2732 subRec->expectedConf++;
2733 return;
2734 }
2735
2736 // TRIX traps the CONF
2737 send.m_sysTable = &g_statMetaSample;
2738 send.m_operationType = UtilPrepareReq::Delete;
2739 send.m_prepareId = subRec->prepareId;
2740 subRec->expectedConf++;
2741 statSendExecute(signal, stat);
2742 }
2743
2744 void
statCleanRelease(Signal * signal,StatOp & stat)2745 Trix::statCleanRelease(Signal* signal, StatOp& stat)
2746 {
2747 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2748 D("statCleanRelease" << V(stat) << V(subRec->errorCode));
2749
2750 if (subRec->errorCode != 0)
2751 {
2752 jam();
2753 statOpError(signal, stat, subRec->errorCode, __LINE__);
2754 return;
2755 }
2756 statCleanEnd(signal, stat);
2757 }
2758
2759 void
statCleanEnd(Signal * signal,StatOp & stat)2760 Trix::statCleanEnd(Signal* signal, StatOp& stat)
2761 {
2762 D("statCleanEnd" << V(stat));
2763 statOpSuccess(signal, stat);
2764 }
2765
2766 // scan
2767
2768 void
statScanBegin(Signal * signal,StatOp & stat)2769 Trix::statScanBegin(Signal* signal, StatOp& stat)
2770 {
2771 const IndexStatImplReq* req = &stat.m_req;
2772 StatOp::Data& data = stat.m_data;
2773 D("statScanBegin" << V(stat));
2774
2775 if (data.m_head_found == true &&
2776 data.m_tableId != req->tableId)
2777 {
2778 jam();
2779 statOpError(signal, stat, IndexStatRef::InvalidSysTableData, __LINE__);
2780 return;
2781 }
2782 data.m_tableId = req->tableId;
2783 statScanPrepare(signal, stat);
2784 }
2785
2786 void
statScanPrepare(Signal * signal,StatOp & stat)2787 Trix::statScanPrepare(Signal* signal, StatOp& stat)
2788 {
2789 const IndexStatImplReq* req = &stat.m_req;
2790 StatOp::Data& data = stat.m_data;
2791 StatOp::Scan& scan = stat.m_scan;
2792 StatOp::Send& send = stat.m_send;
2793 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2794 D("statScanPrepare" << V(stat));
2795
2796 // update sample version prior to scan
2797 if (data.m_head_found == false)
2798 data.m_sampleVersion = 0;
2799 data.m_sampleVersion += 1;
2800
2801 // zero totals
2802 scan.m_sampleCount = 0;
2803 scan.m_keyBytes = 0;
2804
2805 const Uint32 ao_list[] = {
2806 AttributeHeader::INDEX_STAT_KEY,
2807 AttributeHeader::INDEX_STAT_VALUE
2808 };
2809 const Uint32 ao_size = sizeof(ao_list)/sizeof(ao_list[0]);
2810
2811 ndbrequire(req->fragId != ZNIL);
2812 subRec->m_flags = 0;
2813 subRec->requestType = STAT_SCAN;
2814 subRec->schemaTransId = req->transId;
2815 subRec->userReference = 0; // not used
2816 subRec->connectionPtr = RNIL;
2817 subRec->subscriptionId = rand();
2818 subRec->subscriptionKey = rand();
2819 subRec->prepareId = RNIL;
2820 subRec->indexType = 0; // not used
2821 subRec->sourceTableId = data.m_indexId;
2822 subRec->targetTableId = RNIL;
2823 subRec->noOfIndexColumns = ao_size;
2824 subRec->noOfKeyColumns = 0;
2825 subRec->parallelism = 16;
2826 subRec->fragCount = 0; // XXX Suma currently checks all frags
2827 subRec->fragId = req->fragId;
2828 subRec->syncPtr = RNIL;
2829 subRec->errorCode = BuildIndxRef::NoError;
2830 subRec->subscriptionCreated = false;
2831 subRec->pendingSubSyncContinueConf = false;
2832 subRec->expectedConf = 0;
2833 subRec->m_rows_processed = 0;
2834 subRec->m_gci = 0;
2835
2836 AttrOrderBuffer& ao_buf = subRec->attributeOrder;
2837 ndbrequire(ao_buf.isEmpty());
2838 ao_buf.append(ao_list, ao_size);
2839
2840 // TRIX traps the CONF
2841 send.m_sysTable = &g_statMetaSample;
2842 send.m_operationType = UtilPrepareReq::Insert;
2843 statSendPrepare(signal, stat);
2844 }
2845
2846 void
statScanExecute(Signal * signal,StatOp & stat)2847 Trix::statScanExecute(Signal* signal, StatOp& stat)
2848 {
2849 StatOp::Data& data = stat.m_data;
2850 StatOp::Scan& scan = stat.m_scan;
2851 StatOp::Send& send = stat.m_send;
2852 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2853 D("statScanExecute" << V(stat));
2854
2855 CRASH_INSERTION(18026);
2856
2857 SectionHandle handle(this, signal);
2858 ndbrequire(handle.m_cnt == 2);
2859
2860 // ATTR_INFO
2861 AttributeHeader ah[2];
2862 SegmentedSectionPtr ptr0;
2863 handle.getSection(ptr0, SubTableData::ATTR_INFO);
2864 ndbrequire(ptr0.sz == 2);
2865 ::copy((Uint32*)ah, ptr0);
2866 ndbrequire(ah[0].getAttributeId() == AttributeHeader::INDEX_STAT_KEY);
2867 ndbrequire(ah[1].getAttributeId() == AttributeHeader::INDEX_STAT_VALUE);
2868 // read via TUP rounds bytes to words
2869 const Uint32 kz = ah[0].getDataSize();
2870 const Uint32 vz = ah[1].getDataSize();
2871 ndbrequire(kz != 0 && vz != 0);
2872
2873 // AFTER_VALUES
2874 const Uint32 avmax = MAX_INDEX_STAT_KEY_SIZE + MAX_INDEX_STAT_VALUE_SIZE;
2875 Uint32 av[avmax];
2876 SegmentedSectionPtr ptr1;
2877 handle.getSection(ptr1, SubTableData::AFTER_VALUES);
2878 ndbrequire(ptr1.sz <= avmax);
2879 ::copy(av, ptr1);
2880 data.m_statKey = &av[0];
2881 data.m_statValue = &av[kz];
2882 const unsigned char* kp = (const unsigned char*)data.m_statKey;
2883 const unsigned char* vp = (const unsigned char*)data.m_statValue;
2884 const Uint32 kb = kp[0] + (kp[1] << 8);
2885 const Uint32 vb = vp[0] + (vp[1] << 8);
2886 // key and value are not empty
2887 ndbrequire(kb != 0 && vb != 0);
2888 ndbrequire(kz == ((2 + kb) + 3) / 4);
2889 ndbrequire(vz == ((2 + vb) + 3) / 4);
2890
2891 scan.m_sampleCount++;
2892 scan.m_keyBytes += kb;
2893 releaseSections(handle);
2894
2895 if (ERROR_INSERTED(18024))
2896 {
2897 jam();
2898 CLEAR_ERROR_INSERT_VALUE;
2899 UtilExecuteRef* utilRef =
2900 (UtilExecuteRef*)signal->getDataPtrSend();
2901 utilRef->senderData = stat.m_ownPtrI;
2902 utilRef->errorCode = UtilExecuteRef::TCError;
2903 utilRef->TCErrorCode = 630;
2904 sendSignal(reference(), GSN_UTIL_EXECUTE_REF,
2905 signal, UtilExecuteRef::SignalLength, JBB);
2906 subRec->expectedConf++;
2907 return;
2908 }
2909
2910 // TRIX traps the CONF
2911 send.m_sysTable = &g_statMetaSample;
2912 send.m_operationType = UtilPrepareReq::Insert;
2913 send.m_prepareId = subRec->prepareId;
2914 subRec->expectedConf++;
2915 statSendExecute(signal, stat);
2916 }
2917
2918 void
statScanRelease(Signal * signal,StatOp & stat)2919 Trix::statScanRelease(Signal* signal, StatOp& stat)
2920 {
2921 StatOp::Data& data = stat.m_data;
2922 StatOp::Scan& scan = stat.m_scan;
2923 SubscriptionRecord* subRec = c_theSubscriptions.getPtr(stat.m_subRecPtrI);
2924 D("statScanRelease" << V(stat) << V(subRec->errorCode));
2925
2926 if (subRec->errorCode != 0)
2927 {
2928 jam();
2929 statOpError(signal, stat, subRec->errorCode, __LINE__);
2930 return;
2931 }
2932 subRec->requestType = STAT_UTIL;
2933
2934 const Uint32 now = (Uint32)time(0);
2935 data.m_loadTime = now;
2936 data.m_sampleCount = scan.m_sampleCount;
2937 data.m_keyBytes = scan.m_keyBytes;
2938 data.m_valueFormat = MAX_INDEX_STAT_VALUE_FORMAT;
2939
2940 if (data.m_head_found == false)
2941 {
2942 jam();
2943 statHeadInsert(signal, stat);
2944 }
2945 else
2946 {
2947 jam();
2948 statHeadUpdate(signal, stat);
2949 }
2950 }
2951
2952 void
statScanEnd(Signal * signal,StatOp & stat)2953 Trix::statScanEnd(Signal* signal, StatOp& stat)
2954 {
2955 StatOp::Data& data = stat.m_data;
2956 const IndexStatImplReq* req = &stat.m_req;
2957 D("statScanEnd" << V(stat));
2958
2959 /*
2960 * TRIX reports stats load time to TUX for proper stats monitoring.
2961 * Passing this via DBDICT RT_START_MON is not feasible. For MT-LQH
2962 * we prefer DbtuxProxy to avoid introducing MT-LQH into TRIX.
2963 */
2964
2965 #if trix_index_stat_rep_to_tux_instance
2966 Uint32 instanceKey = getInstanceKey(req->indexId, req->fragId);
2967 BlockReference tuxRef = numberToRef(DBTUX, instanceKey, getOwnNodeId());
2968 #else
2969 BlockReference tuxRef = DBTUX_REF;
2970 #endif
2971
2972 IndexStatRep* rep = (IndexStatRep*)signal->getDataPtrSend();
2973 rep->senderRef = reference();
2974 rep->senderData = 0;
2975 rep->requestType = IndexStatRep::RT_UPDATE_CONF;
2976 rep->requestFlag = 0;
2977 rep->indexId = req->indexId;
2978 rep->indexVersion = req->indexVersion;
2979 rep->tableId = req->tableId;
2980 rep->fragId = req->fragId;
2981 rep->loadTime = data.m_loadTime;
2982 sendSignal(tuxRef, GSN_INDEX_STAT_REP,
2983 signal, IndexStatRep::SignalLength, JBB);
2984
2985 statOpSuccess(signal, stat);
2986 }
2987
2988 // drop
2989
2990 void
statDropBegin(Signal * signal,StatOp & stat)2991 Trix::statDropBegin(Signal* signal, StatOp& stat)
2992 {
2993 StatOp::Data& data = stat.m_data;
2994 D("statDropBegin" << V(stat));
2995
2996 if (data.m_head_found == true)
2997 {
2998 jam();
2999 statHeadDelete(signal, stat);
3000 return;
3001 }
3002 statDropEnd(signal, stat);
3003 }
3004
3005 void
statDropEnd(Signal * signal,StatOp & stat)3006 Trix::statDropEnd(Signal* signal, StatOp& stat)
3007 {
3008 D("statDropEnd");
3009 statOpSuccess(signal, stat);
3010 }
3011
3012 // send
3013
3014 void
statSendPrepare(Signal * signal,StatOp & stat)3015 Trix::statSendPrepare(Signal* signal, StatOp& stat)
3016 {
3017 StatOp::Send& send = stat.m_send;
3018 const IndexStatImplReq* req = &stat.m_req;
3019 const SysTable& sysTable = *send.m_sysTable;
3020 D("statSendPrepare" << V(stat));
3021
3022 UtilPrepareReq* utilReq =
3023 (UtilPrepareReq*)signal->getDataPtrSend();
3024 utilReq->senderData = stat.m_ownPtrI;
3025 utilReq->senderRef = reference();
3026 utilReq->schemaTransId = req->transId;
3027
3028 Uint32 wbuf[256];
3029 LinearWriter w(&wbuf[0], sizeof(wbuf) >> 2);
3030
3031 w.first();
3032 w.add(UtilPrepareReq::NoOfOperations, 1);
3033 w.add(UtilPrepareReq::OperationType, send.m_operationType);
3034 w.add(UtilPrepareReq::TableId, sysTable.tableId);
3035
3036 Uint32 i;
3037 for (i = 0; i < sysTable.columnCount; i++) {
3038 const SysColumn& c = sysTable.columnList[i];
3039 switch (send.m_operationType) {
3040 case UtilPrepareReq::Read:
3041 case UtilPrepareReq::Insert:
3042 case UtilPrepareReq::Update:
3043 jam();
3044 w.add(UtilPrepareReq::AttributeId, i);
3045 break;
3046 case UtilPrepareReq::Delete:
3047 jam();
3048 if (c.keyFlag)
3049 w.add(UtilPrepareReq::AttributeId, i);
3050 break;
3051 default:
3052 ndbrequire(false);
3053 break;
3054 }
3055 }
3056
3057 LinearSectionPtr ptr[3];
3058 ptr[0].p = &wbuf[0];
3059 ptr[0].sz = w.getWordsUsed();
3060 sendSignal(DBUTIL_REF, GSN_UTIL_PREPARE_REQ,
3061 signal, UtilPrepareReq::SignalLength, JBB, ptr, 1);
3062 }
3063
3064 void
statSendExecute(Signal * signal,StatOp & stat)3065 Trix::statSendExecute(Signal* signal, StatOp& stat)
3066 {
3067 D("statSendExecute" << V(stat));
3068 StatOp::Send& send = stat.m_send;
3069 StatOp::Attr& attr = stat.m_attr;
3070 const SysTable& sysTable = *send.m_sysTable;
3071
3072 UtilExecuteReq* utilReq =
3073 (UtilExecuteReq*)signal->getDataPtrSend();
3074 utilReq->senderData = stat.m_ownPtrI;
3075 utilReq->senderRef = reference();
3076 utilReq->prepareId = send.m_prepareId;
3077 utilReq->scanTakeOver = 0;
3078
3079 Uint32 wattr[20];
3080 Uint32 wdata[2048];
3081 attr.m_attr = wattr;
3082 attr.m_attrMax = 20;
3083 attr.m_attrSize = 0;
3084 attr.m_data = wdata;
3085 attr.m_dataMax = 2048;
3086 attr.m_dataSize = 0;
3087
3088 for (Uint32 i = 0; i < sysTable.columnCount; i++) {
3089 const SysColumn& c = sysTable.columnList[i];
3090 switch (send.m_operationType) {
3091 case UtilPrepareReq::Read:
3092 case UtilPrepareReq::Insert:
3093 case UtilPrepareReq::Update:
3094 jam();
3095 statDataOut(stat, i);
3096 break;
3097 case UtilPrepareReq::Delete:
3098 jam();
3099 if (c.keyFlag)
3100 statDataOut(stat, i);
3101 break;
3102 default:
3103 ndbrequire(false);
3104 break;
3105 }
3106 }
3107
3108 LinearSectionPtr ptr[3];
3109 ptr[0].p = attr.m_attr;
3110 ptr[0].sz = attr.m_attrSize;
3111 ptr[1].p = attr.m_data;
3112 ptr[1].sz = attr.m_dataSize;
3113 sendSignal(DBUTIL_REF, GSN_UTIL_EXECUTE_REQ,
3114 signal, UtilExecuteReq::SignalLength, JBB, ptr, 2);
3115 }
3116
3117 void
statSendRelease(Signal * signal,StatOp & stat)3118 Trix::statSendRelease(Signal* signal, StatOp& stat)
3119 {
3120 D("statSendRelease" << V(stat));
3121 StatOp::Send& send = stat.m_send;
3122 ndbrequire(send.m_prepareId != RNIL);
3123
3124 UtilReleaseReq* utilReq =
3125 (UtilReleaseReq*)signal->getDataPtrSend();
3126 utilReq->senderData = stat.m_ownPtrI;
3127 utilReq->prepareId = send.m_prepareId;
3128 sendSignal(DBUTIL_REF, GSN_UTIL_RELEASE_REQ,
3129 signal, UtilReleaseReq::SignalLength, JBB);
3130 }
3131
3132 // data
3133
3134 void
statDataPtr(StatOp & stat,Uint32 i,Uint32 * & dptr,Uint32 & bytes)3135 Trix::statDataPtr(StatOp& stat, Uint32 i, Uint32*& dptr, Uint32& bytes)
3136 {
3137 StatOp::Data& data = stat.m_data;
3138 StatOp::Send& send = stat.m_send;
3139
3140 const SysTable& sysTable = *send.m_sysTable;
3141 ndbrequire(i < sysTable.columnCount);
3142 //UNUSED const SysColumn& c = sysTable.columnList[i];
3143
3144 if (&sysTable == &g_statMetaHead)
3145 {
3146 switch (i) {
3147 case 0:
3148 dptr = &data.m_indexId;
3149 bytes = 4;
3150 break;
3151 case 1:
3152 dptr = &data.m_indexVersion;
3153 bytes = 4;
3154 break;
3155 case 2:
3156 dptr = &data.m_tableId;
3157 bytes = 4;
3158 break;
3159 case 3:
3160 dptr = &data.m_fragCount;
3161 bytes = 4;
3162 break;
3163 case 4:
3164 dptr = &data.m_valueFormat;
3165 bytes = 4;
3166 break;
3167 case 5:
3168 dptr = &data.m_sampleVersion;
3169 bytes = 4;
3170 break;
3171 case 6:
3172 dptr = &data.m_loadTime;
3173 bytes = 4;
3174 break;
3175 case 7:
3176 dptr = &data.m_sampleCount;
3177 bytes = 4;
3178 break;
3179 case 8:
3180 dptr = &data.m_keyBytes;
3181 bytes = 4;
3182 break;
3183 default:
3184 ndbrequire(false);
3185 break;
3186 }
3187 return;
3188 }
3189
3190 if (&sysTable == &g_statMetaSample)
3191 {
3192 switch (i) {
3193 case 0:
3194 dptr = &data.m_indexId;
3195 bytes = 4;
3196 break;
3197 case 1:
3198 dptr = &data.m_indexVersion;
3199 bytes = 4;
3200 break;
3201 case 2:
3202 dptr = &data.m_sampleVersion;
3203 bytes = 4;
3204 break;
3205 case 3:
3206 {
3207 dptr = data.m_statKey;
3208 const uchar* p = (uchar*)dptr;
3209 ndbrequire(p != 0);
3210 bytes = 2 + p[0] + (p[1] << 8);
3211 }
3212 break;
3213 case 4:
3214 {
3215 dptr = data.m_statValue;
3216 const uchar* p = (uchar*)dptr;
3217 ndbrequire(p != 0);
3218 bytes = 2 + p[0] + (p[1] << 8);
3219 }
3220 break;
3221 default:
3222 ndbrequire(false);
3223 break;
3224 }
3225 return;
3226 }
3227
3228 ndbrequire(false);
3229 }
3230
3231 void
statDataOut(StatOp & stat,Uint32 i)3232 Trix::statDataOut(StatOp& stat, Uint32 i)
3233 {
3234 StatOp::Attr& attr = stat.m_attr;
3235 Uint32* dptr = 0;
3236 Uint32 bytes = 0;
3237 statDataPtr(stat, i, dptr, bytes);
3238
3239 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3240 AttributeHeader::init(&attr.m_attr[attr.m_attrSize], i, bytes);
3241 attr.m_attrSize++;
3242
3243 Uint32 words = (bytes + 3) / 4;
3244 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3245 Uint8* dst = (Uint8*)&attr.m_data[attr.m_dataSize];
3246 memcpy(dst, dptr, bytes);
3247 while (bytes < words * 4)
3248 dst[bytes++] = 0;
3249 attr.m_dataSize += words;
3250 D("statDataOut" << V(i) << V(bytes) << hex << V(dptr[0]));
3251 }
3252
3253 void
statDataIn(StatOp & stat,Uint32 i)3254 Trix::statDataIn(StatOp& stat, Uint32 i)
3255 {
3256 StatOp::Attr& attr = stat.m_attr;
3257 Uint32* dptr = 0;
3258 Uint32 bytes = 0;
3259 statDataPtr(stat, i, dptr, bytes);
3260
3261 ndbrequire(attr.m_attrSize + 1 <= attr.m_attrMax);
3262 const AttributeHeader& ah = attr.m_attr[attr.m_attrSize];
3263 attr.m_attrSize++;
3264
3265 ndbrequire(ah.getByteSize() == bytes);
3266 Uint32 words = (bytes + 3) / 4;
3267 ndbrequire(attr.m_dataSize + words <= attr.m_dataMax);
3268 const char* src = (const char*)&attr.m_data[attr.m_dataSize];
3269 memcpy(dptr, src, bytes);
3270 attr.m_dataSize += words;
3271 D("statDataIn" << V(i) << V(bytes) << hex << V(dptr[0]));
3272 }
3273
3274 // abort ongoing
3275
3276 void
statAbortUtil(Signal * signal,StatOp & stat)3277 Trix::statAbortUtil(Signal* signal, StatOp& stat)
3278 {
3279 StatOp::Util& util = stat.m_util;
3280 D("statAbortUtil" << V(stat));
3281
3282 ndbrequire(util.m_prepareId != RNIL);
3283 util.m_cb.m_callbackFunction = safe_cast(&Trix::statAbortUtilCB);
3284 util.m_cb.m_callbackData = stat.m_ownPtrI;
3285 statUtilRelease(signal, stat);
3286 }
3287
3288 void
statAbortUtilCB(Signal * signal,Uint32 statPtrI,Uint32 ret)3289 Trix::statAbortUtilCB(Signal* signal, Uint32 statPtrI, Uint32 ret)
3290 {
3291 StatOp& stat = statOpGetPtr(statPtrI);
3292 D("statAbortUtilCB" << V(stat) << V(ret));
3293
3294 ndbrequire(ret == 0);
3295 statOpAbort(signal, stat);
3296 }
3297
3298 // conf and ref
3299
3300 void
statOpSuccess(Signal * signal,StatOp & stat)3301 Trix::statOpSuccess(Signal* signal, StatOp& stat)
3302 {
3303 StatOp::Data& data = stat.m_data;
3304 D("statOpSuccess" << V(stat));
3305
3306 if (stat.m_requestType == IndexStatReq::RT_SCAN_FRAG)
3307 statOpEvent(stat, "I", "created %u samples", data.m_sampleCount);
3308
3309 statOpConf(signal, stat);
3310 statOpRelease(stat);
3311 }
3312
3313 void
statOpConf(Signal * signal,StatOp & stat)3314 Trix::statOpConf(Signal* signal, StatOp& stat)
3315 {
3316 const IndexStatImplReq* req = &stat.m_req;
3317 D("statOpConf" << V(stat));
3318
3319 IndexStatImplConf* conf = (IndexStatImplConf*)signal->getDataPtrSend();
3320 conf->senderRef = reference();
3321 conf->senderData = req->senderData;
3322 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_CONF,
3323 signal, IndexStatImplConf::SignalLength, JBB);
3324 }
3325
3326 void
statOpError(Signal * signal,StatOp & stat,Uint32 errorCode,Uint32 errorLine,const Uint32 * supress)3327 Trix::statOpError(Signal* signal, StatOp& stat,
3328 Uint32 errorCode, Uint32 errorLine,
3329 const Uint32 * supress)
3330 {
3331 D("statOpError" << V(stat) << V(errorCode) << V(errorLine));
3332
3333 if (supress)
3334 {
3335 for (Uint32 i = 0; supress[i] != 0; i++)
3336 {
3337 if (errorCode == supress[i])
3338 {
3339 goto do_supress;
3340 }
3341 }
3342 }
3343 statOpEvent(stat, "W", "error %u line %u", errorCode, errorLine);
3344
3345 do_supress:
3346 ndbrequire(stat.m_errorCode == 0);
3347 stat.m_errorCode = errorCode;
3348 stat.m_errorLine = errorLine;
3349 statOpAbort(signal, stat);
3350 }
3351
3352 void
statOpAbort(Signal * signal,StatOp & stat)3353 Trix::statOpAbort(Signal* signal, StatOp& stat)
3354 {
3355 StatOp::Util& util = stat.m_util;
3356 D("statOpAbort" << V(stat));
3357
3358 if (util.m_prepareId != RNIL)
3359 {
3360 jam();
3361 // returns here when done
3362 statAbortUtil(signal, stat);
3363 return;
3364 }
3365 statOpRef(signal, stat);
3366 statOpRelease(stat);
3367 }
3368
3369 void
statOpRef(Signal * signal,StatOp & stat)3370 Trix::statOpRef(Signal* signal, StatOp& stat)
3371 {
3372 const IndexStatImplReq* req = &stat.m_req;
3373 D("statOpRef" << V(stat));
3374
3375 statOpRef(signal, req, stat.m_errorCode, stat.m_errorLine);
3376 }
3377
3378 void
statOpRef(Signal * signal,const IndexStatImplReq * req,Uint32 errorCode,Uint32 errorLine)3379 Trix::statOpRef(Signal* signal, const IndexStatImplReq* req,
3380 Uint32 errorCode, Uint32 errorLine)
3381 {
3382 D("statOpRef" << V(errorCode) << V(errorLine));
3383
3384 IndexStatImplRef* ref = (IndexStatImplRef*)signal->getDataPtrSend();
3385 ref->senderRef = reference();
3386 ref->senderData = req->senderData;
3387 ref->errorCode = errorCode;
3388 ref->errorLine = errorLine;
3389 sendSignal(req->senderRef, GSN_INDEX_STAT_IMPL_REF,
3390 signal, IndexStatImplRef::SignalLength, JBB);
3391 }
3392
3393 void
statOpEvent(StatOp & stat,const char * level,const char * msg,...)3394 Trix::statOpEvent(StatOp& stat, const char* level, const char* msg, ...)
3395 {
3396 //UNUSED const IndexStatImplReq* req = &stat.m_req;
3397 StatOp::Data& data = stat.m_data;
3398
3399 char tmp1[100];
3400 va_list ap;
3401 va_start(ap, msg);
3402 BaseString::vsnprintf(tmp1, sizeof(tmp1), msg, ap);
3403 va_end(ap);
3404
3405 char tmp2[100];
3406 BaseString::snprintf(tmp2, sizeof(tmp2),
3407 "index %u stats version %u: %s: %s",
3408 data.m_indexId, data.m_sampleVersion,
3409 stat.m_requestName, tmp1);
3410
3411 D("statOpEvent" << V(level) << V(tmp2));
3412
3413 if (level[0] == 'I')
3414 infoEvent("%s", tmp2);
3415 if (level[0] == 'W')
3416 warningEvent("%s", tmp2);
3417 }
3418
3419 // debug
3420
3421 class NdbOut&
operator <<(NdbOut & out,const Trix::StatOp & stat)3422 operator<<(NdbOut& out, const Trix::StatOp& stat)
3423 {
3424 out << "[";
3425 out << " i:" << stat.m_ownPtrI;
3426 out << " head_found:" << stat.m_data.m_head_found;
3427 out << " ]";
3428 return out;
3429 }
3430
3431
3432 BLOCK_FUNCTIONS(Trix)
3433
3434 template void append(DataBuffer<15>&,SegmentedSectionPtr,SectionSegmentPool&);
3435