1 /*
2 Copyright (c) 2003, 2020, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /*
26 This file is used to build both the multithreaded and the singlethreaded
27 ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28 the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29 macro not defined).
30 */
31
32 #include <ndb_global.h>
33
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include "signaldata/DumpStateOrd.hpp"
47 #include <signaldata/EventReport.hpp>
48 #include <signaldata/ContinueFragmented.hpp>
49 #include <signaldata/NodeStateSignalData.hpp>
50 #include <signaldata/FsRef.hpp>
51 #include <signaldata/SignalDroppedRep.hpp>
52 #include <signaldata/LocalRouteOrd.hpp>
53 #include <signaldata/TransIdAI.hpp>
54 #include <signaldata/Sync.hpp>
55 #include <DebuggerNames.hpp>
56 #include "LongSignal.hpp"
57
58 #include <Properties.hpp>
59 #include "Configuration.hpp"
60 #include <AttributeDescriptor.hpp>
61 #include <NdbSqlUtil.hpp>
62
63 #include "../blocks/dbdih/Dbdih.hpp"
64 #include <signaldata/CallbackSignal.hpp>
65 #include "LongSignalImpl.hpp"
66
67 #include "KeyDescriptor.hpp"
68
69 #include <EventLogger.hpp>
70
71 #define JAM_FILE_ID 252
72
73 extern EventLogger * g_eventLogger;
74
75 //
76 // Constructor, Destructor
77 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)78 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
79 struct Block_context & ctx,
80 Uint32 instanceNumber)
81 : theNodeId(globalData.ownId),
82 theNumber(blockNumber),
83 theInstance(instanceNumber),
84 theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
85 theInstanceList(0),
86 theMainInstance(0),
87 m_pHighResTimer(0),
88 m_ctx(ctx),
89 m_global_page_pool(globalData.m_global_page_pool),
90 m_shared_page_pool(globalData.m_shared_page_pool),
91 c_fragmentInfoHash(c_fragmentInfoPool),
92 c_linearFragmentSendList(c_fragmentSendPool),
93 c_segmentedFragmentSendList(c_fragmentSendPool),
94 c_mutexMgr(* this),
95 c_counterMgr(* this)
96 #ifdef VM_TRACE_TIME
97 ,m_currentGsn(0)
98 #endif
99 #ifdef VM_TRACE
100 ,debugOutFile(globalSignalLoggers.getOutputStream())
101 ,debugOut(debugOutFile)
102 #endif
103 {
104 m_threadId = 0;
105 m_watchDogCounter = NULL;
106 m_jamBuffer = NDB_THREAD_TLS_JAM;
107 NewVarRef = 0;
108
109 SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
110
111 if (theInstance == 0) {
112 ndbrequire(mainBlock == 0);
113 mainBlock = this;
114 theMainInstance = mainBlock;
115 globalData.setBlock(blockNumber, mainBlock);
116 mainBlock->addInstance(this, theInstance);
117 } else {
118 ndbrequire(mainBlock != 0);
119 mainBlock->addInstance(this, theInstance);
120 theMainInstance = mainBlock;
121 }
122
123 c_fragmentIdCounter = 1;
124 c_fragSenderRunning = false;
125
126 #ifdef VM_TRACE_TIME
127 clearTimes();
128 #endif
129
130 for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
131 theExecArray[i] = 0;
132
133 installSimulatedBlockFunctions();
134
135 m_callbackTableAddr = 0;
136
137 CLEAR_ERROR_INSERT_VALUE;
138
139 #ifndef NDBD_MULTITHREADED
140 /* Ndbd, init from GlobalScheduler */
141 m_pHighResTimer = globalScheduler.getHighResTimerPtr();
142 #endif
143 }
144
145 void
addInstance(SimulatedBlock * b,Uint32 theInstance)146 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
147 {
148 ndbrequire(theMainInstance == this);
149 ndbrequire(number() == b->number());
150 if (theInstanceList == 0)
151 {
152 theInstanceList = new SimulatedBlock* [MaxInstances];
153 ndbrequire(theInstanceList != 0);
154 for (Uint32 i = 0; i < MaxInstances; i++)
155 theInstanceList[i] = 0;
156 }
157 ndbrequire(theInstance < MaxInstances);
158 ndbrequire(theInstanceList[theInstance] == 0);
159 theInstanceList[theInstance] = b;
160 }
161
162 void
initCommon()163 SimulatedBlock::initCommon()
164 {
165 NDB_STATIC_ASSERT(RG_COUNT == MM_RG_COUNT + 1);
166
167 Uint32 count = 10;
168 this->getParam("FragmentSendPool", &count);
169 c_fragmentSendPool.setSize(count);
170
171 count = 10;
172 this->getParam("FragmentInfoPool", &count);
173 c_fragmentInfoPool.setSize(count);
174
175 count = 10;
176 this->getParam("FragmentInfoHash", &count);
177 c_fragmentInfoHash.setSize(count);
178
179 Uint32 def = 5;
180 #ifdef NDBD_MULTITHREADED
181 def += globalData.getBlockThreads();
182 #endif
183
184 count = def;
185 this->getParam("ActiveMutexes", &count);
186 c_mutexMgr.setSize(count);
187
188 count = def;
189 this->getParam("ActiveCounters", &count);
190 c_counterMgr.setSize(count);
191
192 count = def;
193 this->getParam("ActiveThreadSync", &count);
194 c_syncThreadPool.setSize(count);
195 }
196
~SimulatedBlock()197 SimulatedBlock::~SimulatedBlock()
198 {
199 freeBat();
200 #ifdef VM_TRACE_TIME
201 printTimes(stdout);
202 #endif
203
204 if (theInstanceList != 0) {
205 Uint32 i;
206 for (i = 0; i < MaxInstances; i++)
207 {
208 if (theInstanceList[i] != this)
209 {
210 delete theInstanceList[i];
211 }
212 }
213 delete [] theInstanceList;
214 }
215 theInstanceList = 0;
216 }
217
218 void
installSimulatedBlockFunctions()219 SimulatedBlock::installSimulatedBlockFunctions(){
220 ExecFunction * a = theExecArray;
221 a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
222 a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
223 a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
224 a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
225 a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
226 a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
227 a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
228 a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
229 a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
230 a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
231 a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
232 a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
233 a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
234 a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
235 a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
236 a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
237 a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
238 a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
239 a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
240 a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
241 a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
242 a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
243 a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
244 a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
245 a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
246 a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
247 a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
248 a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
249 a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
250 a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
251 a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
252 }
253
254 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)255 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
256 ExecFunction f, bool force){
257 if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
258 char errorMsg[255];
259 BaseString::snprintf(errorMsg, 255,
260 "GSN %d(%d))", gsn, MAX_GSN);
261 ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
262 }
263 theExecArray[gsn] = f;
264 }
265
266 void
assignToThread(ThreadContext ctx)267 SimulatedBlock::assignToThread(ThreadContext ctx)
268 {
269 m_threadId = ctx.threadId;
270 m_jamBuffer = ctx.jamBuffer;
271 m_watchDogCounter = ctx.watchDogCounter;
272 m_sectionPoolCache = ctx.sectionPoolCache;
273 m_pHighResTimer = ctx.pHighResTimer;
274 }
275
276 Uint32
getInstanceKeyCanFail(Uint32 tabId,Uint32 fragId)277 SimulatedBlock::getInstanceKeyCanFail(Uint32 tabId, Uint32 fragId)
278 {
279 Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
280 Uint32 instanceKey = dbdih->dihGetInstanceKeyCanFail(tabId, fragId);
281 return instanceKey;
282 }
283
284 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)285 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
286 {
287 Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
288 Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
289 return instanceKey;
290 }
291
292 Uint32
getInstanceFromKey(Uint32 instanceKey)293 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
294 {
295 Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
296 Uint32 instanceNo;
297 if (lqhWorkers == 0) {
298 instanceNo = 0;
299 } else {
300 assert(instanceKey != 0);
301 instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
302 }
303 return instanceNo;
304 }
305
306 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const307 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
308 const char* filename, int lineno) const
309 {
310 char objRef[255];
311 BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
312 char probData[255];
313 BaseString::snprintf(probData, 255,
314 "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
315 gsn, len, recBlockNo);
316
317 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
318 probData,
319 objRef);
320 }
321
322
323 extern class SectionSegmentPool g_sectionSegmentPool;
324
325 void
handle_invalid_sections_in_send_signal(const Signal * signal) const326 SimulatedBlock::handle_invalid_sections_in_send_signal(const Signal* signal)
327 const
328 {
329 char errMsg[160];
330 BaseString::snprintf(errMsg, sizeof errMsg,
331 "Unhandled sections in sendSignal for GSN %u (%s).",
332 signal->header.theVerId_signalNumber,
333 getSignalName(signal->header.theVerId_signalNumber));
334 // Print message and terminate.
335 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
336 errMsg,
337 "");
338 }
339
340 void
handle_lingering_sections_after_execute(const Signal * signal) const341 SimulatedBlock::handle_lingering_sections_after_execute(const Signal* signal)
342 const
343 {
344 char errMsg[160];
345 BaseString::snprintf(errMsg, sizeof errMsg,
346 "Unhandled sections after execute for GSN %u (%s).",
347 signal->header.theVerId_signalNumber,
348 getSignalName(signal->header.theVerId_signalNumber));
349 // Print message and terminate.
350 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
351 errMsg,
352 "");
353 }
354
355 void
handle_invalid_fragmentInfo(Signal * signal) const356 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
357 {
358 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
359 "Incorrect header->m_fragmentInfo in sendSignal()",
360 "");
361 }
362
363 void
handle_out_of_longsignal_memory(Signal * signal) const364 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
365 {
366 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
367 "Out of LongMessageBuffer in sendSignal",
368 "");
369 }
370
371 template<typename SecPtr>
372 void
handle_send_failed(SendStatus ss,Signal * signal,Uint32 recNode,SecPtr ptr[]) const373 SimulatedBlock::handle_send_failed(SendStatus ss,
374 Signal * signal,
375 Uint32 recNode,
376 SecPtr ptr[]) const
377 {
378 switch(ss){
379 case SEND_BUFFER_FULL:
380 ErrorReporter::handleError(NDBD_EXIT_GENERIC,
381 "Out of SendBufferMemory in sendSignal", "");
382 break;
383 case SEND_MESSAGE_TOO_BIG:
384 /* If message is too big when sending CmvmiDummySignal log a convinient
385 * message about it to.
386 * Note that CmvmiDummySignal is not intended for production usage but for
387 * use by test cases.
388 */
389 if (signal->header.theVerId_signalNumber == GSN_DUMP_STATE_ORD &&
390 signal->theData[0] == DumpStateOrd::CmvmiDummySignal)
391 {
392 jam();
393 const Uint32 num_secs = signal->getNoOfSections();
394 char msg[24*4];
395 snprintf(msg,
396 sizeof(msg),
397 "Failed sending CmvmiDummySignal"
398 " (size %u+%u+%u+%u+%u) from %u to %u.",
399 signal->getLength(), num_secs,
400 (num_secs > 0) ? ptr[0].sz : 0,
401 (num_secs > 1) ? ptr[1].sz : 0,
402 (num_secs > 2) ? ptr[2].sz : 0,
403 signal->theData[2],
404 recNode);
405 g_eventLogger->info("%s", msg);
406 infoEvent("%s", msg);
407 return;
408 }
409 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
410 "Message too big in sendSignal", "");
411 break;
412 case SEND_UNKNOWN_NODE:
413 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
414 "Unknown node in sendSignal", "");
415 break;
416 case SEND_OK:
417 case SEND_BLOCKED:
418 case SEND_DISCONNECTED:
419 // Should never happen
420 ndbabort();
421 }
422 ndbabort();
423 }
424
425 static void
linkSegments(Uint32 head,Uint32 tail)426 linkSegments(Uint32 head, Uint32 tail){
427
428 Ptr<SectionSegment> headPtr;
429 g_sectionSegmentPool.getPtr(headPtr, head);
430
431 Ptr<SectionSegment> tailPtr;
432 g_sectionSegmentPool.getPtr(tailPtr, tail);
433
434 Ptr<SectionSegment> oldTailPtr;
435 g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
436
437 /* Can only efficiently link segments if linking to the end of a
438 * multiple-of-segment-size sized chunk
439 */
440 if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
441 {
442 #if defined VM_TRACE || defined ERROR_INSERT
443 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
444 "Bad head segment size",
445 "");
446 #else
447 ndbout_c("linkSegments : Bad head segment size");
448 #endif
449 }
450
451 headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
452 headPtr.p->m_sz += tailPtr.p->m_sz;
453
454 oldTailPtr.p->m_nextSegment = tailPtr.i;
455 }
456
457 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])458 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
459 Uint32 tSec0 = ptr[0].i;
460 Uint32 tSec1 = ptr[1].i;
461 Uint32 tSec2 = ptr[2].i;
462 SectionSegment * p;
463 switch(secCount){
464 case 3:
465 p = g_sectionSegmentPool.getPtr(tSec2);
466 ptr[2].p = p;
467 ptr[2].sz = p->m_sz;
468 // Fall through
469 case 2:
470 p = g_sectionSegmentPool.getPtr(tSec1);
471 ptr[1].p = p;
472 ptr[1].sz = p->m_sz;
473 // Fall through
474 case 1:
475 p = g_sectionSegmentPool.getPtr(tSec0);
476 ptr[0].p = p;
477 ptr[0].sz = p->m_sz;
478 // Fall through
479 case 0:
480 return;
481 }
482 char msg[40];
483 sprintf(msg, "secCount=%d", secCount);
484 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
485 }
486
487 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)488 getSection(SegmentedSectionPtr & ptr, Uint32 i){
489 ptr.i = i;
490 SectionSegment * p = g_sectionSegmentPool.getPtr(i);
491 ptr.p = p;
492 ptr.sz = p->m_sz;
493 }
494
getSectionSz(Uint32 id)495 Uint32 getSectionSz(Uint32 id)
496 {
497 return g_sectionSegmentPool.getPtr(id)->m_sz;
498 }
499
getLastWordPtr(Uint32 id)500 Uint32* getLastWordPtr(Uint32 id)
501 {
502 SectionSegment* first= g_sectionSegmentPool.getPtr(id);
503 SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
504 Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
505 return &last->theData[offset];
506 }
507
/*
 * Leading-argument macros for section pool calls made from block methods.
 * In the multithreaded build they expand to this block's section pool
 * cache (preceded by f_section_lock for SB_SP_REL_ARG) plus a trailing
 * comma; in the single threaded build they expand to nothing.
 */
#if defined(NDBD_MULTITHREADED)
#define SB_SP_ARG *m_sectionPoolCache,
#define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
#else
#define SB_SP_ARG
#define SB_SP_REL_ARG
#endif
515
516 static
517 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])518 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
519 Uint32 tSec0 = ptr[0].i;
520 Uint32 tSz0 = ptr[0].sz;
521 Uint32 tSec1 = ptr[1].i;
522 Uint32 tSz1 = ptr[1].sz;
523 Uint32 tSec2 = ptr[2].i;
524 Uint32 tSz2 = ptr[2].sz;
525 switch(secCount){
526 case 3:
527 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
528 relSz(tSz2), tSec2,
529 ptr[2].p->m_lastSegment);
530 // Fall through
531 case 2:
532 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
533 relSz(tSz1), tSec1,
534 ptr[1].p->m_lastSegment);
535 // Fall through
536 case 1:
537 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
538 relSz(tSz0), tSec0,
539 ptr[0].p->m_lastSegment);
540 // Fall through
541 case 0:
542 return;
543 }
544 char msg[40];
545 sprintf(msg, "secCount=%d", secCount);
546 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
547 }
548
549 void
getSendBufferLevel(NodeId node,SB_LevelType & level)550 SimulatedBlock::getSendBufferLevel(NodeId node, SB_LevelType &level)
551 {
552 #ifdef NDBD_MULTITHREADED
553 mt_getSendBufferLevel(m_threadId, node, level);
554 #else
555 getNonMTTransporterSendHandle()->getSendBufferLevel(node, level);
556 #endif
557 }
558
559 Uint32
getSignalsInJBB()560 SimulatedBlock::getSignalsInJBB()
561 {
562 Uint32 num_signals;
563 #ifdef NDBD_MULTITHREADED
564 num_signals = mt_getSignalsInJBB(m_threadId);
565 #else
566 num_signals = globalScheduler.getBOccupancy();
567 #endif
568 return num_signals;
569 }
570
571 void
startChangeNeighbourNode()572 SimulatedBlock::startChangeNeighbourNode()
573 {
574 /* We only treat neighbour nodes in a special manner in ndbmtd. */
575 #ifdef NDBD_MULTITHREADED
576 mt_startChangeNeighbourNode();
577 #endif
578 }
579
580 void
setNeighbourNode(NodeId node)581 SimulatedBlock::setNeighbourNode(NodeId node)
582 {
583 /* We only treat neighbour nodes in a special manner in ndbmtd. */
584 #ifdef NDBD_MULTITHREADED
585 mt_setNeighbourNode(node);
586 #endif
587 }
588
589 void
setNoSend()590 SimulatedBlock::setNoSend()
591 {
592 #ifdef NDBD_MULTITHREADED
593 mt_setNoSend(m_threadId);
594 #endif
595 }
596
597 void
endChangeNeighbourNode()598 SimulatedBlock::endChangeNeighbourNode()
599 {
600 /* We only treat neighbour nodes in a special manner in ndbmtd. */
601 #ifdef NDBD_MULTITHREADED
602 mt_endChangeNeighbourNode();
603 #endif
604 }
605
606 void
setWakeupThread(Uint32 wakeup_instance)607 SimulatedBlock::setWakeupThread(Uint32 wakeup_instance)
608 {
609 #ifdef NDBD_MULTITHREADED
610 mt_setWakeupThread(m_threadId, wakeup_instance);
611 #endif
612 }
613
614 void
setOverloadStatus(OverloadStatus new_status)615 SimulatedBlock::setOverloadStatus(OverloadStatus new_status)
616 {
617 #ifdef NDBD_MULTITHREADED
618 mt_setOverloadStatus(m_threadId, new_status);
619 #endif
620 }
621
622 void
setNodeOverloadStatus(OverloadStatus new_status)623 SimulatedBlock::setNodeOverloadStatus(OverloadStatus new_status)
624 {
625 #ifdef NDBD_MULTITHREADED
626 mt_setNodeOverloadStatus(m_threadId, new_status);
627 #endif
628 }
629
630 void
setSendNodeOverloadStatus(OverloadStatus new_status)631 SimulatedBlock::setSendNodeOverloadStatus(OverloadStatus new_status)
632 {
633 #ifdef NDBD_MULTITHREADED
634 mt_setSendNodeOverloadStatus(new_status);
635 #endif
636 }
637
638 Uint32
getConfiguredSpintime()639 SimulatedBlock::getConfiguredSpintime()
640 {
641 #ifdef NDBD_MULTITHREADED
642 return mt_getConfiguredSpintime(m_threadId);
643 #else
644 return 0;
645 #endif
646 }
647
648 void
setSpintime(Uint32 new_spintime)649 SimulatedBlock::setSpintime(Uint32 new_spintime)
650 {
651 #ifdef NDBD_MULTITHREADED
652 mt_setSpintime(m_threadId, new_spintime);
653 #endif
654 }
655
656 Uint32
getWakeupLatency()657 SimulatedBlock::getWakeupLatency()
658 {
659 #ifdef NDBD_MULTITHREADED
660 return mt_getWakeupLatency();
661 #else
662 return 25;
663 #endif
664 }
665
666 void
setWakeupLatency(Uint32 latency)667 SimulatedBlock::setWakeupLatency(Uint32 latency)
668 {
669 #ifdef NDBD_MULTITHREADED
670 mt_setWakeupLatency(latency);
671 #endif
672 }
673
674 void
getPerformanceTimers(Uint64 & micros_sleep,Uint64 & spin_time,Uint64 & buffer_full_micros_sleep,Uint64 & micros_send)675 SimulatedBlock::getPerformanceTimers(Uint64 & micros_sleep,
676 Uint64 & spin_time,
677 Uint64 & buffer_full_micros_sleep,
678 Uint64 & micros_send)
679 {
680 #ifdef NDBD_MULTITHREADED
681 mt_getPerformanceTimers(m_threadId,
682 micros_sleep,
683 spin_time,
684 buffer_full_micros_sleep,
685 micros_send);
686 #else
687 micros_sleep = globalData.theMicrosSleep;
688 spin_time = globalData.theMicrosSpin;
689 buffer_full_micros_sleep = globalData.theBufferFullMicrosSleep;
690 micros_send = globalData.theMicrosSend;
691 #endif
692 }
693
694 const char *
getThreadDescription()695 SimulatedBlock::getThreadDescription()
696 {
697 const char *desc;
698 #ifdef NDBD_MULTITHREADED
699 desc = mt_getThreadDescription(m_threadId);
700 #else
701 desc = "ndbd single thread";
702 #endif
703 return desc;
704 }
705
706 const char *
getThreadName()707 SimulatedBlock::getThreadName()
708 {
709 const char *name;
710 #ifdef NDBD_MULTITHREADED
711 name = mt_getThreadName(m_threadId);
712 #else
713 name = "main";
714 #endif
715 return name;
716 }
717
718 void
getSendPerformanceTimers(Uint32 send_instance,Uint64 & exec_time,Uint64 & sleep_time,Uint64 & spin_time,Uint64 & user_time_os,Uint64 & kernel_time_os,Uint64 & elapsed_time_os)719 SimulatedBlock::getSendPerformanceTimers(Uint32 send_instance,
720 Uint64 & exec_time,
721 Uint64 & sleep_time,
722 Uint64 & spin_time,
723 Uint64 & user_time_os,
724 Uint64 & kernel_time_os,
725 Uint64 & elapsed_time_os)
726 {
727 /* No send thread in ndbd */
728 #ifdef NDBD_MULTITHREADED
729 mt_getSendPerformanceTimers(send_instance,
730 exec_time,
731 sleep_time,
732 spin_time,
733 user_time_os,
734 kernel_time_os,
735 elapsed_time_os);
736 #else
737 exec_time = 0;
738 sleep_time = 0;
739 spin_time = 0;
740 user_time_os = 0;
741 kernel_time_os = 0;
742 elapsed_time_os = 0;
743 #endif
744 }
745
746 Uint32
getNumSendThreads()747 SimulatedBlock::getNumSendThreads()
748 {
749 #ifdef NDBD_MULTITHREADED
750 return mt_getNumSendThreads();
751 #else
752 return 0;
753 #endif
754 }
755
756 Uint32
getNumThreads()757 SimulatedBlock::getNumThreads()
758 {
759 #ifdef NDBD_MULTITHREADED
760 return mt_getNumThreads();
761 #else
762 return 1;
763 #endif
764 }
765
766 void
flush_send_buffers()767 SimulatedBlock::flush_send_buffers()
768 {
769 #ifdef NDBD_MULTITHREADED
770 mt_flush_send_buffers(m_threadId);
771 #endif
772 }
773
774 void
set_watchdog_counter()775 SimulatedBlock::set_watchdog_counter()
776 {
777 #ifdef NDBD_MULTITHREADED
778 mt_set_watchdog_counter(m_threadId);
779 #endif
780 }
781
782 void
assign_recv_thread_new_trp(Uint32 trp_id)783 SimulatedBlock::assign_recv_thread_new_trp(Uint32 trp_id)
784 {
785 #ifdef NDBD_MULTITHREADED
786 mt_assign_recv_thread_new_trp(trp_id);
787 #endif
788 }
789
790 void
assign_multi_trps_to_send_threads()791 SimulatedBlock::assign_multi_trps_to_send_threads()
792 {
793 #ifdef NDBD_MULTITHREADED
794 mt_assign_multi_trps_to_send_threads();
795 #endif
796 }
797
798 bool
epoll_add_trp(NodeId node_id,TrpId trp_id)799 SimulatedBlock::epoll_add_trp(NodeId node_id, TrpId trp_id)
800 {
801 #ifdef NDBD_MULTITHREADED
802 return mt_epoll_add_trp(m_threadId, node_id, trp_id);
803 #else
804 require(false);
805 return false;
806 #endif
807 }
808
809 bool
is_recv_thread_for_new_trp(NodeId node_id,TrpId trp_id)810 SimulatedBlock::is_recv_thread_for_new_trp(NodeId node_id, TrpId trp_id)
811 {
812 #ifdef NDBD_MULTITHREADED
813 return mt_is_recv_thread_for_new_trp(m_threadId, node_id, trp_id);
814 #else
815 require(false);
816 return false;
817 #endif
818 }
819
820 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const821 SimulatedBlock::sendSignal(BlockReference ref,
822 GlobalSignalNumber gsn,
823 Signal* signal,
824 Uint32 length,
825 JobBufferLevel jobBuffer) const {
826
827 BlockReference sendBRef = reference();
828
829 Uint32 recBlock = refToBlock(ref);
830 Uint32 recNode = refToNode(ref);
831 Uint32 ourProcessor = globalData.ownId;
832
833 ndbrequire(signal->header.m_noOfSections == 0);
834 check_sections(signal, signal->header.m_noOfSections, 0);
835
836 signal->header.theLength = length;
837 signal->header.theVerId_signalNumber = gsn;
838 signal->header.theReceiversBlockNumber = recBlock;
839 signal->header.m_noOfSections = 0;
840
841 Uint32 tSignalId = signal->header.theSignalId;
842
843 if (unlikely((length == 0) || length > 25 || (recBlock == 0)))
844 {
845 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
846 return;
847 }//if
848 #ifdef VM_TRACE
849 if(globalData.testOn){
850 Uint16 proc =
851 (recNode == 0 ? globalData.ownId : recNode);
852 signal->header.theSendersBlockRef = sendBRef;
853 globalSignalLoggers.sendSignal(signal->header,
854 jobBuffer,
855 &signal->theData[0],
856 proc);
857 }
858 #endif
859
860 if(recNode == ourProcessor || recNode == 0) {
861 signal->header.theSendersSignalId = tSignalId;
862 signal->header.theSendersBlockRef = sendBRef;
863 #ifdef NDBD_MULTITHREADED
864 if (jobBuffer == JBB)
865 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
866 else
867 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
868 #else
869 globalScheduler.execute(signal, jobBuffer, recBlock,
870 gsn);
871 #endif
872 return;
873 } else {
874 // send distributed Signal
875 SignalHeader sh;
876
877 Uint32 tTrace = signal->getTrace();
878
879 sh.theVerId_signalNumber = gsn;
880 sh.theReceiversBlockNumber = recBlock;
881 sh.theSendersBlockRef = refToBlock(sendBRef);
882 sh.theLength = length;
883 sh.theTrace = tTrace;
884 sh.theSignalId = tSignalId;
885 sh.m_noOfSections = 0;
886 sh.m_fragmentInfo = 0;
887
888 #ifdef TRACE_DISTRIBUTED
889 ndbout_c("send: %s(%d) to (%s, %d)",
890 getSignalName(gsn), gsn, getBlockName(recBlock),
891 recNode);
892 #endif
893
894 SendStatus ss;
895 #ifdef NDBD_MULTITHREADED
896 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
897 recNode, 0);
898 #else
899 TrpId trp_id = 0;
900 ss = globalTransporterRegistry.
901 prepareSend(getNonMTTransporterSendHandle(),
902 &sh, jobBuffer,
903 &signal->theData[0], recNode, trp_id,
904 (LinearSectionPtr*)0);
905 #endif
906
907 if (unlikely(! (ss == SEND_OK ||
908 ss == SEND_BLOCKED ||
909 ss == SEND_DISCONNECTED)))
910 {
911 handle_send_failed(ss, signal, recNode, (LinearSectionPtr*)NULL);
912 }
913 }
914 return;
915 }
916
917 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const918 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
919 GlobalSignalNumber gsn,
920 Signal* signal,
921 Uint32 length,
922 JobBufferLevel jobBuffer) const {
923
924 Uint32 noOfSections = signal->header.m_noOfSections;
925 Uint32 tSignalId = signal->header.theSignalId;
926 Uint32 tTrace = signal->getTrace();
927
928 Uint32 ourProcessor = globalData.ownId;
929 Uint32 recBlock = rg.m_block;
930
931 signal->header.theLength = length;
932 signal->header.theVerId_signalNumber = gsn;
933 signal->header.theReceiversBlockNumber = recBlock;
934 signal->header.theSendersSignalId = tSignalId;
935 signal->header.theSendersBlockRef = reference();
936 signal->header.m_noOfSections = 0;
937
938 ndbrequire(noOfSections == 0);
939 check_sections(signal, noOfSections, 0);
940
941 if ((length == 0) || (length > 25) || (recBlock == 0)) {
942 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
943 return;
944 }//if
945
946 SignalHeader sh;
947
948 sh.theVerId_signalNumber = gsn;
949 sh.theReceiversBlockNumber = recBlock;
950 sh.theSendersBlockRef = refToBlock(reference());
951 sh.theLength = length;
952 sh.theTrace = tTrace;
953 sh.theSignalId = tSignalId;
954 sh.m_noOfSections = 0;
955 sh.m_fragmentInfo = 0;
956
957 /**
958 * Check own node
959 */
960 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
961 #ifdef VM_TRACE
962 if(globalData.testOn){
963 globalSignalLoggers.sendSignal(signal->header,
964 jobBuffer,
965 &signal->theData[0],
966 ourProcessor);
967 }
968 #endif
969
970 #ifdef NDBD_MULTITHREADED
971 if (jobBuffer == JBB)
972 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
973 else
974 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
975 #else
976 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
977 #endif
978
979 rg.m_nodes.clear((Uint32)0);
980 rg.m_nodes.clear(ourProcessor);
981 }
982
983 /**
984 * Do the big loop
985 */
986 Uint32 recNode = 0;
987 while(!rg.m_nodes.isclear()){
988 recNode = rg.m_nodes.find(recNode + 1);
989 rg.m_nodes.clear(recNode);
990 #ifdef VM_TRACE
991 if(globalData.testOn){
992 globalSignalLoggers.sendSignal(signal->header,
993 jobBuffer,
994 &signal->theData[0],
995 recNode);
996 }
997 #endif
998
999 #ifdef TRACE_DISTRIBUTED
1000 ndbout_c("send: %s(%d) to (%s, %d)",
1001 getSignalName(gsn), gsn, getBlockName(recBlock),
1002 recNode);
1003 #endif
1004
1005 SendStatus ss;
1006 #ifdef NDBD_MULTITHREADED
1007 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1008 recNode, 0);
1009 #else
1010 TrpId trp_id = 0;
1011 ss = globalTransporterRegistry.
1012 prepareSend(getNonMTTransporterSendHandle(),
1013 &sh, jobBuffer,
1014 &signal->theData[0], recNode, trp_id,
1015 (LinearSectionPtr*)0);
1016 #endif
1017
1018 if (unlikely(! (ss == SEND_OK ||
1019 ss == SEND_BLOCKED ||
1020 ss == SEND_DISCONNECTED)))
1021 {
1022 handle_send_failed(ss, signal, recNode, (LinearSectionPtr*)NULL);
1023 }
1024 }
1025
1026 return;
1027 }
1028
1029 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
1030
1031 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const1032 SimulatedBlock::sendSignal(BlockReference ref,
1033 GlobalSignalNumber gsn,
1034 Signal* signal,
1035 Uint32 length,
1036 JobBufferLevel jobBuffer,
1037 LinearSectionPtr ptr[3],
1038 Uint32 noOfSections) const {
1039
1040 BlockReference sendBRef = reference();
1041
1042 Uint32 recBlock = refToBlock(ref);
1043 Uint32 recNode = refToNode(ref);
1044 Uint32 ourProcessor = globalData.ownId;
1045
1046 ndbrequire(signal->header.m_noOfSections == 0);
1047 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1048
1049 signal->header.theLength = length;
1050 signal->header.theVerId_signalNumber = gsn;
1051 signal->header.theReceiversBlockNumber = recBlock;
1052 signal->header.m_noOfSections = noOfSections;
1053
1054 Uint32 tSignalId = signal->header.theSignalId;
1055 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1056
1057 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1058 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1059 return;
1060 }//if
1061 #ifdef VM_TRACE
1062 if(globalData.testOn){
1063 Uint16 proc =
1064 (recNode == 0 ? globalData.ownId : recNode);
1065 signal->header.theSendersBlockRef = sendBRef;
1066 globalSignalLoggers.sendSignal(signal->header,
1067 jobBuffer,
1068 &signal->theData[0],
1069 proc,
1070 ptr, noOfSections);
1071 }
1072 #endif
1073
1074 if(recNode == ourProcessor || recNode == 0) {
1075 signal->header.theSendersSignalId = tSignalId;
1076 signal->header.theSendersBlockRef = sendBRef;
1077
1078 /**
1079 * We have to copy the data
1080 */
1081 bool ok = true;
1082 Ptr<SectionSegment> segptr[3];
1083 for(Uint32 i = 0; i<noOfSections; i++){
1084 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
1085 signal->theData[length+i] = segptr[i].i;
1086 }
1087
1088 if (unlikely(! ok))
1089 {
1090 handle_out_of_longsignal_memory(signal);
1091 }
1092
1093 #ifdef NDBD_MULTITHREADED
1094 if (jobBuffer == JBB)
1095 sendlocal(m_threadId, &signal->header, signal->theData,
1096 signal->theData+length);
1097 else
1098 sendprioa(m_threadId, &signal->header, signal->theData,
1099 signal->theData+length);
1100 #else
1101 globalScheduler.execute(signal, jobBuffer, recBlock,
1102 gsn);
1103 #endif
1104 signal->header.m_noOfSections = 0;
1105 return;
1106 } else {
1107 // send distributed Signal
1108 SignalHeader sh;
1109
1110 Uint32 tTrace = signal->getTrace();
1111 Uint32 noOfSections = signal->header.m_noOfSections;
1112
1113 sh.theVerId_signalNumber = gsn;
1114 sh.theReceiversBlockNumber = recBlock;
1115 sh.theSendersBlockRef = refToBlock(sendBRef);
1116 sh.theLength = length;
1117 sh.theTrace = tTrace;
1118 sh.theSignalId = tSignalId;
1119 sh.m_noOfSections = noOfSections;
1120 sh.m_fragmentInfo = tFragInfo;
1121
1122 #ifdef TRACE_DISTRIBUTED
1123 ndbout_c("send: %s(%d) to (%s, %d)",
1124 getSignalName(gsn), gsn, getBlockName(recBlock),
1125 recNode);
1126 #endif
1127
1128 SendStatus ss;
1129 #ifdef NDBD_MULTITHREADED
1130 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1131 recNode, ptr);
1132 #else
1133 TrpId trp_id = 0;
1134 ss = globalTransporterRegistry.
1135 prepareSend(getNonMTTransporterSendHandle(),
1136 &sh, jobBuffer,
1137 &signal->theData[0], recNode, trp_id,
1138 ptr);
1139 #endif
1140
1141 if (unlikely(! (ss == SEND_OK ||
1142 ss == SEND_BLOCKED ||
1143 ss == SEND_DISCONNECTED)))
1144 {
1145 handle_send_failed(ss, signal, recNode, ptr);
1146 }
1147 }
1148
1149 signal->header.m_noOfSections = 0;
1150 signal->header.m_fragmentInfo = 0;
1151 return;
1152 }
1153
1154 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const1155 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1156 GlobalSignalNumber gsn,
1157 Signal* signal,
1158 Uint32 length,
1159 JobBufferLevel jobBuffer,
1160 LinearSectionPtr ptr[3],
1161 Uint32 noOfSections) const {
1162
1163 Uint32 tSignalId = signal->header.theSignalId;
1164 Uint32 tTrace = signal->getTrace();
1165 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1166
1167 Uint32 ourProcessor = globalData.ownId;
1168 Uint32 recBlock = rg.m_block;
1169
1170 ndbrequire(signal->header.m_noOfSections == 0);
1171 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1172
1173 signal->header.theLength = length;
1174 signal->header.theVerId_signalNumber = gsn;
1175 signal->header.theReceiversBlockNumber = recBlock;
1176 signal->header.theSendersSignalId = tSignalId;
1177 signal->header.theSendersBlockRef = reference();
1178 signal->header.m_noOfSections = noOfSections;
1179
1180 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1181 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1182 return;
1183 }//if
1184
1185 SignalHeader sh;
1186 sh.theVerId_signalNumber = gsn;
1187 sh.theReceiversBlockNumber = recBlock;
1188 sh.theSendersBlockRef = refToBlock(reference());
1189 sh.theLength = length;
1190 sh.theTrace = tTrace;
1191 sh.theSignalId = tSignalId;
1192 sh.m_noOfSections = noOfSections;
1193 sh.m_fragmentInfo = tFragInfo;
1194
1195 /**
1196 * Check own node
1197 */
1198 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
1199 #ifdef VM_TRACE
1200 if(globalData.testOn){
1201 globalSignalLoggers.sendSignal(signal->header,
1202 jobBuffer,
1203 &signal->theData[0],
1204 ourProcessor,
1205 ptr, noOfSections);
1206 }
1207 #endif
1208 /**
1209 * We have to copy the data
1210 */
1211 bool ok = true;
1212 Ptr<SectionSegment> segptr[3];
1213 for(Uint32 i = 0; i<noOfSections; i++){
1214 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
1215 signal->theData[length+i] = segptr[i].i;
1216 }
1217
1218 if (unlikely(! ok))
1219 {
1220 handle_out_of_longsignal_memory(signal);
1221 }
1222
1223 #ifdef NDBD_MULTITHREADED
1224 if (jobBuffer == JBB)
1225 sendlocal(m_threadId, &signal->header, signal->theData,
1226 signal->theData + length);
1227 else
1228 sendprioa(m_threadId, &signal->header, signal->theData,
1229 signal->theData + length);
1230 #else
1231 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1232 #endif
1233
1234 rg.m_nodes.clear((Uint32)0);
1235 rg.m_nodes.clear(ourProcessor);
1236 }
1237
1238 /**
1239 * Do the big loop
1240 */
1241 Uint32 recNode = 0;
1242 while(!rg.m_nodes.isclear()){
1243 recNode = rg.m_nodes.find(recNode + 1);
1244 rg.m_nodes.clear(recNode);
1245
1246 #ifdef VM_TRACE
1247 if(globalData.testOn){
1248 globalSignalLoggers.sendSignal(signal->header,
1249 jobBuffer,
1250 &signal->theData[0],
1251 recNode,
1252 ptr, noOfSections);
1253 }
1254 #endif
1255
1256 #ifdef TRACE_DISTRIBUTED
1257 ndbout_c("send: %s(%d) to (%s, %d)",
1258 getSignalName(gsn), gsn, getBlockName(recBlock),
1259 recNode);
1260 #endif
1261
1262 SendStatus ss;
1263 #ifdef NDBD_MULTITHREADED
1264 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1265 recNode, ptr);
1266 #else
1267 TrpId trp_id = 0;
1268 ss = globalTransporterRegistry.
1269 prepareSend(getNonMTTransporterSendHandle(),
1270 &sh, jobBuffer,
1271 &signal->theData[0], recNode, trp_id,
1272 ptr);
1273 #endif
1274
1275 if (unlikely(! (ss == SEND_OK ||
1276 ss == SEND_BLOCKED ||
1277 ss == SEND_DISCONNECTED)))
1278 {
1279 handle_send_failed(ss, signal, recNode, ptr);
1280 }
1281 }
1282
1283 signal->header.m_noOfSections = 0;
1284 signal->header.m_fragmentInfo = 0;
1285
1286 return;
1287 }
1288
1289 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1290 SimulatedBlock::sendSignal(BlockReference ref,
1291 GlobalSignalNumber gsn,
1292 Signal* signal,
1293 Uint32 length,
1294 JobBufferLevel jobBuffer,
1295 SectionHandle* sections) const {
1296
1297 Uint32 noOfSections = sections->m_cnt;
1298 BlockReference sendBRef = reference();
1299
1300 Uint32 recBlock = refToBlock(ref);
1301 Uint32 recNode = refToNode(ref);
1302 Uint32 ourProcessor = globalData.ownId;
1303
1304 ndbrequire(signal->header.m_noOfSections == 0);
1305 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1306
1307 signal->header.theLength = length;
1308 signal->header.theVerId_signalNumber = gsn;
1309 signal->header.theReceiversBlockNumber = recBlock;
1310 signal->header.m_noOfSections = noOfSections;
1311
1312 Uint32 tSignalId = signal->header.theSignalId;
1313 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1314
1315 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1316 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1317 return;
1318 }//if
1319 #ifdef VM_TRACE
1320 if(globalData.testOn){
1321 Uint16 proc =
1322 (recNode == 0 ? globalData.ownId : recNode);
1323 signal->header.theSendersBlockRef = sendBRef;
1324 globalSignalLoggers.sendSignal(signal->header,
1325 jobBuffer,
1326 &signal->theData[0],
1327 proc,
1328 sections->m_ptr, noOfSections);
1329 }
1330 #endif
1331
1332 if(recNode == ourProcessor || recNode == 0) {
1333 signal->header.theSendersSignalId = tSignalId;
1334 signal->header.theSendersBlockRef = sendBRef;
1335
1336 /**
1337 * We have to copy the data
1338 */
1339 Uint32 * dst = signal->theData + length;
1340 * dst ++ = sections->m_ptr[0].i;
1341 * dst ++ = sections->m_ptr[1].i;
1342 * dst ++ = sections->m_ptr[2].i;
1343
1344 #ifdef NDBD_MULTITHREADED
1345 if (jobBuffer == JBB)
1346 sendlocal(m_threadId, &signal->header, signal->theData,
1347 signal->theData + length);
1348 else
1349 sendprioa(m_threadId, &signal->header, signal->theData,
1350 signal->theData + length);
1351 #else
1352 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1353 #endif
1354 } else {
1355 // send distributed Signal
1356 SignalHeader sh;
1357
1358 Uint32 tTrace = signal->getTrace();
1359
1360 sh.theVerId_signalNumber = gsn;
1361 sh.theReceiversBlockNumber = recBlock;
1362 sh.theSendersBlockRef = refToBlock(sendBRef);
1363 sh.theLength = length;
1364 sh.theTrace = tTrace;
1365 sh.theSignalId = tSignalId;
1366 sh.m_noOfSections = noOfSections;
1367 sh.m_fragmentInfo = tFragInfo;
1368
1369 #ifdef TRACE_DISTRIBUTED
1370 ndbout_c("send: %s(%d) to (%s, %d)",
1371 getSignalName(gsn), gsn, getBlockName(recBlock),
1372 recNode);
1373 #endif
1374
1375 SendStatus ss;
1376 #ifdef NDBD_MULTITHREADED
1377 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1378 recNode, &g_sectionSegmentPool, sections->m_ptr);
1379 #else
1380 TrpId trp_id = 0;
1381 ss = globalTransporterRegistry.
1382 prepareSend(getNonMTTransporterSendHandle(),
1383 &sh, jobBuffer,
1384 &signal->theData[0], recNode, trp_id,
1385 g_sectionSegmentPool, sections->m_ptr);
1386 #endif
1387
1388 if (unlikely(! (ss == SEND_OK ||
1389 ss == SEND_BLOCKED ||
1390 ss == SEND_DISCONNECTED)))
1391 {
1392 handle_send_failed(ss, signal, recNode, sections->m_ptr);
1393 }
1394
1395 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1396 }
1397
1398 signal->header.m_noOfSections = 0;
1399 signal->header.m_fragmentInfo = 0;
1400 sections->m_cnt = 0;
1401 return;
1402 }
1403
1404 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1405 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1406 GlobalSignalNumber gsn,
1407 Signal* signal,
1408 Uint32 length,
1409 JobBufferLevel jobBuffer,
1410 SectionHandle * sections) const {
1411
1412 Uint32 noOfSections = sections->m_cnt;
1413 Uint32 tSignalId = signal->header.theSignalId;
1414 Uint32 tTrace = signal->getTrace();
1415 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1416
1417 Uint32 ourProcessor = globalData.ownId;
1418 Uint32 recBlock = rg.m_block;
1419
1420 ndbrequire(signal->header.m_noOfSections == 0);
1421 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1422
1423 signal->header.theLength = length;
1424 signal->header.theVerId_signalNumber = gsn;
1425 signal->header.theReceiversBlockNumber = recBlock;
1426 signal->header.theSendersSignalId = tSignalId;
1427 signal->header.theSendersBlockRef = reference();
1428 signal->header.m_noOfSections = noOfSections;
1429
1430 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1431 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1432 return;
1433 }//if
1434
1435 SignalHeader sh;
1436 sh.theVerId_signalNumber = gsn;
1437 sh.theReceiversBlockNumber = recBlock;
1438 sh.theSendersBlockRef = refToBlock(reference());
1439 sh.theLength = length;
1440 sh.theTrace = tTrace;
1441 sh.theSignalId = tSignalId;
1442 sh.m_noOfSections = noOfSections;
1443 sh.m_fragmentInfo = tFragInfo;
1444
1445 /**
1446 * Check own node
1447 */
1448 bool release = true;
1449 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1450 {
1451 release = false;
1452 #ifdef VM_TRACE
1453 if(globalData.testOn){
1454 globalSignalLoggers.sendSignal(signal->header,
1455 jobBuffer,
1456 &signal->theData[0],
1457 ourProcessor,
1458 sections->m_ptr, noOfSections);
1459 }
1460 #endif
1461 /**
1462 * We have to copy the data
1463 */
1464 Uint32 * dst = signal->theData + length;
1465 * dst ++ = sections->m_ptr[0].i;
1466 * dst ++ = sections->m_ptr[1].i;
1467 * dst ++ = sections->m_ptr[2].i;
1468 #ifdef NDBD_MULTITHREADED
1469 if (jobBuffer == JBB)
1470 sendlocal(m_threadId, &signal->header, signal->theData,
1471 signal->theData + length);
1472 else
1473 sendprioa(m_threadId, &signal->header, signal->theData,
1474 signal->theData + length);
1475 #else
1476 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1477 #endif
1478
1479 rg.m_nodes.clear((Uint32)0);
1480 rg.m_nodes.clear(ourProcessor);
1481 }
1482
1483 /**
1484 * Do the big loop
1485 */
1486 Uint32 recNode = 0;
1487 while(!rg.m_nodes.isclear()){
1488 recNode = rg.m_nodes.find(recNode + 1);
1489 rg.m_nodes.clear(recNode);
1490
1491 #ifdef VM_TRACE
1492 if(globalData.testOn){
1493 globalSignalLoggers.sendSignal(signal->header,
1494 jobBuffer,
1495 &signal->theData[0],
1496 recNode,
1497 sections->m_ptr, noOfSections);
1498 }
1499 #endif
1500
1501 #ifdef TRACE_DISTRIBUTED
1502 ndbout_c("send: %s(%d) to (%s, %d)",
1503 getSignalName(gsn), gsn, getBlockName(recBlock),
1504 recNode);
1505 #endif
1506
1507 SendStatus ss;
1508 #ifdef NDBD_MULTITHREADED
1509 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1510 recNode, &g_sectionSegmentPool, sections->m_ptr);
1511 #else
1512 TrpId trp_id = 0;
1513 ss = globalTransporterRegistry.
1514 prepareSend(getNonMTTransporterSendHandle(),
1515 &sh, jobBuffer,
1516 &signal->theData[0], recNode, trp_id,
1517 g_sectionSegmentPool, sections->m_ptr);
1518 #endif
1519
1520 if (unlikely(! (ss == SEND_OK ||
1521 ss == SEND_BLOCKED ||
1522 ss == SEND_DISCONNECTED)))
1523 {
1524 handle_send_failed(ss, signal, recNode, sections->m_ptr);
1525 }
1526 }
1527
1528 if (release)
1529 {
1530 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1531 }
1532
1533 sections->m_cnt = 0;
1534 signal->header.m_noOfSections = 0;
1535 signal->header.m_fragmentInfo = 0;
1536
1537 return;
1538 }
1539
1540 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1541 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1542 GlobalSignalNumber gsn,
1543 Signal* signal,
1544 Uint32 length,
1545 JobBufferLevel jobBuffer,
1546 SectionHandle* sections) const {
1547
1548 /**
1549 * Implementation the same as sendSignal(), except that
1550 * the sections are duplicated when sending locally, and
1551 * not released
1552 */
1553
1554 Uint32 noOfSections = sections->m_cnt;
1555 BlockReference sendBRef = reference();
1556
1557 Uint32 recBlock = refToBlock(ref);
1558 Uint32 recNode = refToNode(ref);
1559 Uint32 ourProcessor = globalData.ownId;
1560
1561 ndbrequire(signal->header.m_noOfSections == 0);
1562 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1563
1564 signal->header.theLength = length;
1565 signal->header.theVerId_signalNumber = gsn;
1566 signal->header.theReceiversBlockNumber = recBlock;
1567 signal->header.m_noOfSections = noOfSections;
1568
1569 Uint32 tSignalId = signal->header.theSignalId;
1570 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1571
1572 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1573 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1574 return;
1575 }//if
1576 #ifdef VM_TRACE
1577 if(globalData.testOn){
1578 Uint16 proc =
1579 (recNode == 0 ? globalData.ownId : recNode);
1580 signal->header.theSendersBlockRef = sendBRef;
1581 globalSignalLoggers.sendSignal(signal->header,
1582 jobBuffer,
1583 &signal->theData[0],
1584 proc,
1585 sections->m_ptr, noOfSections);
1586 }
1587 #endif
1588
1589 if(recNode == ourProcessor || recNode == 0) {
1590 signal->header.theSendersSignalId = tSignalId;
1591 signal->header.theSendersBlockRef = sendBRef;
1592
1593 Uint32 * dst = signal->theData + length;
1594
1595 /* We need to copy the segmented section data into separate
1596 * sections when sending locally and keeping a copy ourselves
1597 */
1598 for (Uint32 sec=0; sec < noOfSections; sec++)
1599 {
1600 Uint32 secCopy;
1601 if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1602 {
1603 handle_out_of_longsignal_memory(signal);
1604 return;
1605 }
1606 * dst ++ = secCopy;
1607 }
1608
1609 #ifdef NDBD_MULTITHREADED
1610 if (jobBuffer == JBB)
1611 sendlocal(m_threadId, &signal->header, signal->theData,
1612 signal->theData + length);
1613 else
1614 sendprioa(m_threadId, &signal->header, signal->theData,
1615 signal->theData + length);
1616 #else
1617 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1618 #endif
1619 } else {
1620 // send distributed Signal
1621 SignalHeader sh;
1622
1623 Uint32 tTrace = signal->getTrace();
1624
1625 sh.theVerId_signalNumber = gsn;
1626 sh.theReceiversBlockNumber = recBlock;
1627 sh.theSendersBlockRef = refToBlock(sendBRef);
1628 sh.theLength = length;
1629 sh.theTrace = tTrace;
1630 sh.theSignalId = tSignalId;
1631 sh.m_noOfSections = noOfSections;
1632 sh.m_fragmentInfo = tFragInfo;
1633
1634 #ifdef TRACE_DISTRIBUTED
1635 ndbout_c("send: %s(%d) to (%s, %d)",
1636 getSignalName(gsn), gsn, getBlockName(recBlock),
1637 recNode);
1638 #endif
1639
1640 SendStatus ss;
1641 #ifdef NDBD_MULTITHREADED
1642 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1643 recNode, &g_sectionSegmentPool, sections->m_ptr);
1644 #else
1645 TrpId trp_id = 0;
1646 ss = globalTransporterRegistry.
1647 prepareSend(getNonMTTransporterSendHandle(),
1648 &sh, jobBuffer,
1649 &signal->theData[0], recNode, trp_id,
1650 g_sectionSegmentPool, sections->m_ptr);
1651 #endif
1652
1653 if (unlikely(! (ss == SEND_OK ||
1654 ss == SEND_BLOCKED ||
1655 ss == SEND_DISCONNECTED)))
1656 {
1657 handle_send_failed(ss, signal, recNode, sections->m_ptr);
1658 }
1659 }
1660
1661 signal->header.m_noOfSections = 0;
1662 signal->header.m_fragmentInfo = 0;
1663 return;
1664 }
1665
1666 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1667 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1668 GlobalSignalNumber gsn,
1669 Signal* signal,
1670 Uint32 length,
1671 JobBufferLevel jobBuffer,
1672 SectionHandle * sections) const {
1673 /**
1674 * Implementation the same as sendSignal(), except that
1675 * the sections are duplicated when sending locally, and
1676 * not released
1677 */
1678
1679 Uint32 noOfSections = sections->m_cnt;
1680 Uint32 tSignalId = signal->header.theSignalId;
1681 Uint32 tTrace = signal->getTrace();
1682 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1683
1684 Uint32 ourProcessor = globalData.ownId;
1685 Uint32 recBlock = rg.m_block;
1686
1687 ndbrequire(signal->header.m_noOfSections == 0);
1688 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1689
1690 signal->header.theLength = length;
1691 signal->header.theVerId_signalNumber = gsn;
1692 signal->header.theReceiversBlockNumber = recBlock;
1693 signal->header.theSendersSignalId = tSignalId;
1694 signal->header.theSendersBlockRef = reference();
1695 signal->header.m_noOfSections = noOfSections;
1696
1697 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1698 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1699 return;
1700 }//if
1701
1702 SignalHeader sh;
1703 sh.theVerId_signalNumber = gsn;
1704 sh.theReceiversBlockNumber = recBlock;
1705 sh.theSendersBlockRef = refToBlock(reference());
1706 sh.theLength = length;
1707 sh.theTrace = tTrace;
1708 sh.theSignalId = tSignalId;
1709 sh.m_noOfSections = noOfSections;
1710 sh.m_fragmentInfo = tFragInfo;
1711
1712 /**
1713 * Check own node
1714 */
1715 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1716 {
1717 #ifdef VM_TRACE
1718 if(globalData.testOn){
1719 globalSignalLoggers.sendSignal(signal->header,
1720 jobBuffer,
1721 &signal->theData[0],
1722 ourProcessor,
1723 sections->m_ptr, noOfSections);
1724 }
1725 #endif
1726
1727 Uint32 * dst = signal->theData + length;
1728
1729 /* We need to copy the segmented section data into separate
1730 * sections when sending locally and keeping a copy ourselves
1731 */
1732 for (Uint32 sec=0; sec < noOfSections; sec++)
1733 {
1734 Uint32 secCopy;
1735 if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1736 {
1737 handle_out_of_longsignal_memory(signal);
1738 return;
1739 }
1740 * dst ++ = secCopy;
1741 }
1742
1743 #ifdef NDBD_MULTITHREADED
1744 if (jobBuffer == JBB)
1745 sendlocal(m_threadId, &signal->header, signal->theData,
1746 signal->theData + length);
1747 else
1748 sendprioa(m_threadId, &signal->header, signal->theData,
1749 signal->theData + length);
1750 #else
1751 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1752 #endif
1753
1754 rg.m_nodes.clear((Uint32)0);
1755 rg.m_nodes.clear(ourProcessor);
1756 }
1757
1758 /**
1759 * Do the big loop
1760 */
1761 Uint32 recNode = 0;
1762 while(!rg.m_nodes.isclear()){
1763 recNode = rg.m_nodes.find(recNode + 1);
1764 rg.m_nodes.clear(recNode);
1765
1766 #ifdef VM_TRACE
1767 if(globalData.testOn){
1768 globalSignalLoggers.sendSignal(signal->header,
1769 jobBuffer,
1770 &signal->theData[0],
1771 recNode,
1772 sections->m_ptr, noOfSections);
1773 }
1774 #endif
1775
1776 #ifdef TRACE_DISTRIBUTED
1777 ndbout_c("send: %s(%d) to (%s, %d)",
1778 getSignalName(gsn), gsn, getBlockName(recBlock),
1779 recNode);
1780 #endif
1781
1782 SendStatus ss;
1783 #ifdef NDBD_MULTITHREADED
1784 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1785 recNode, &g_sectionSegmentPool, sections->m_ptr);
1786 #else
1787 TrpId trp_id = 0;
1788 ss = globalTransporterRegistry.
1789 prepareSend(getNonMTTransporterSendHandle(),
1790 &sh, jobBuffer,
1791 &signal->theData[0], recNode, trp_id,
1792 g_sectionSegmentPool, sections->m_ptr);
1793 #endif
1794
1795 if (unlikely(! (ss == SEND_OK ||
1796 ss == SEND_BLOCKED ||
1797 ss == SEND_DISCONNECTED)))
1798 {
1799 handle_send_failed(ss, signal, recNode, sections->m_ptr);
1800 }
1801 }
1802
1803 signal->header.m_noOfSections = 0;
1804 signal->header.m_fragmentInfo = 0;
1805
1806 return;
1807 }
1808
1809
1810 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1811 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1812 GlobalSignalNumber gsn,
1813 Signal* signal,
1814 Uint32 delayInMilliSeconds,
1815 Uint32 length) const {
1816
1817 BlockNumber bnr = refToBlock(ref);
1818
1819 ndbrequire(signal->header.m_noOfSections == 0);
1820 check_sections(signal, signal->header.m_noOfSections, 0);
1821
1822 signal->header.theLength = length;
1823 signal->header.theSendersSignalId = signal->header.theSignalId;
1824 signal->header.theVerId_signalNumber = gsn;
1825 signal->header.theReceiversBlockNumber = bnr;
1826 signal->header.theSendersBlockRef = reference();
1827
1828 assert(length <= 25);
1829 #ifdef VM_TRACE
1830 {
1831 if(globalData.testOn){
1832 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1833 signal->header,
1834 0,
1835 &signal->theData[0],
1836 globalData.ownId);
1837 }
1838 }
1839 #endif
1840
1841 #ifdef NDBD_MULTITHREADED
1842 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1843 #else
1844 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1845 #endif
1846
1847 // befor 2nd parameter to globalTimeQueue.insert
1848 // (Priority)theSendSig[sigIndex].jobBuffer
1849 }
1850
1851 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1852 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1853 GlobalSignalNumber gsn,
1854 Signal* signal,
1855 Uint32 delayInMilliSeconds,
1856 Uint32 length,
1857 SectionHandle * sections) const {
1858
1859 Uint32 noOfSections = sections->m_cnt;
1860 BlockNumber bnr = refToBlock(ref);
1861
1862 BlockReference sendBRef = reference();
1863
1864 if (bnr == 0) {
1865 bnr_error();
1866 }//if
1867
1868 ndbrequire(signal->header.m_noOfSections == 0);
1869 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1870
1871 signal->header.theLength = length;
1872 signal->header.theSendersSignalId = signal->header.theSignalId;
1873 signal->header.theSendersBlockRef = sendBRef;
1874 signal->header.theVerId_signalNumber = gsn;
1875 signal->header.theReceiversBlockNumber = bnr;
1876 signal->header.m_noOfSections = noOfSections;
1877
1878 assert(length + noOfSections <= 25);
1879 Uint32 * dst = signal->theData + length;
1880 * dst ++ = sections->m_ptr[0].i;
1881 * dst ++ = sections->m_ptr[1].i;
1882 * dst ++ = sections->m_ptr[2].i;
1883
1884 #ifdef VM_TRACE
1885 {
1886 if(globalData.testOn){
1887 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1888 signal->header,
1889 0,
1890 &signal->theData[0],
1891 globalData.ownId);
1892 }
1893 }
1894 #endif
1895
1896 #ifdef NDBD_MULTITHREADED
1897 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1898 #else
1899 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1900 #endif
1901
1902 signal->header.m_noOfSections = 0;
1903 signal->header.m_fragmentInfo = 0;
1904 sections->m_cnt = 0;
1905 }
1906
1907 void
release(SegmentedSectionPtr & ptr)1908 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1909 {
1910 ::release(SB_SP_ARG ptr);
1911 }
1912
1913 void
releaseSection(Uint32 firstSegmentIVal)1914 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1915 {
1916 ::releaseSection(SB_SP_ARG firstSegmentIVal);
1917 }
1918
1919 void
releaseSections(SectionHandle & handle)1920 SimulatedBlock::releaseSections(SectionHandle& handle)
1921 {
1922 ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1923 handle.m_cnt = 0;
1924 }
1925
1926 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1927 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1928 {
1929 return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1930 }
1931
1932 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1933 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1934 {
1935 return ::import(SB_SP_ARG first, src, len);
1936 }
1937
1938 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len) const1939 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len) const
1940 {
1941 Ptr<SectionSegment> tmp;
1942 if (::import(SB_SP_ARG tmp, src, len))
1943 {
1944 ptr.i = tmp.i;
1945 ptr.p = tmp.p;
1946 ptr.sz = len;
1947 return true;
1948 }
1949 return false;
1950 }
1951
1952 bool
import(SectionHandle * dst,LinearSectionPtr src[3],Uint32 cnt)1953 SimulatedBlock::import(SectionHandle * dst,
1954 LinearSectionPtr src[3],
1955 Uint32 cnt)
1956 {
1957 ndbassert(dst->m_cnt == 0);
1958 if (dst->m_cnt)
1959 {
1960 releaseSections(* dst);
1961 }
1962
1963 for (Uint32 i = 0; i < cnt; i++)
1964 {
1965 if (unlikely(!import(dst->m_ptr[i], src[i].p, src[i].sz)))
1966 {
1967 if (i)
1968 {
1969 dst->m_cnt = i - 1;
1970 releaseSections(* dst);
1971 return false;
1972 }
1973 }
1974 }
1975 dst->m_cnt = cnt;
1976 return true;
1977 }
1978
1979
1980 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1981 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1982 {
1983 return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1984 }
1985
1986 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1987 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1988 const Uint32* src, Uint32 len)
1989 {
1990 return ::writeToSection(firstSegmentIVal, offset, src, len);
1991 }
1992
1993 class SectionSegmentPool&
getSectionSegmentPool()1994 SimulatedBlock::getSectionSegmentPool(){
1995 return g_sectionSegmentPool;
1996 }
1997
1998 NewVARIABLE *
allocateBat(int batSize)1999 SimulatedBlock::allocateBat(int batSize){
2000 NewVARIABLE* bat = NewVarRef;
2001 bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
2002 NewVarRef = bat;
2003 theBATSize = batSize;
2004 return bat;
2005 }
2006
2007 void
freeBat()2008 SimulatedBlock::freeBat(){
2009 if(NewVarRef != 0){
2010 free(NewVarRef);
2011 NewVarRef = 0;
2012 }
2013 }
2014
2015 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)2016 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
2017 assert(blockNo == blockToMain(blockNo));
2018 SimulatedBlock * sb = globalData.getBlock(blockNo);
2019 if (sb != 0 && instanceNo != 0)
2020 sb = sb->getInstance(instanceNo);
2021 if(sb == 0)
2022 return 0;
2023 return sb->NewVarRef;
2024 }
2025
2026 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)2027 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
2028 assert(blockNo == blockToMain(blockNo));
2029 SimulatedBlock * sb = globalData.getBlock(blockNo);
2030 if (sb != 0 && instanceNo != 0)
2031 sb = sb->getInstance(instanceNo);
2032 if(sb == 0)
2033 return 0;
2034 return sb->theBATSize;
2035 }
2036
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)2037 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
2038 {
2039 return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
2040 }
2041
2042 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)2043 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
2044 {
2045
2046 void * p = NULL;
2047 Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
2048 size_t size = n*s + over_alloc;
2049 Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
2050 refresh_watch_dog(9);
2051 if (real_size > 0){
2052 #if defined(VM_TRACE_MEM) || defined(VM_TRACE) || defined(ERROR_INSERT)
2053 g_eventLogger->info("%s::allocRecord(%s, %zu, %zu) = %llu bytes",
2054 getBlockName(number()),
2055 type,
2056 s,
2057 n,
2058 real_size);
2059 #endif
2060 if( real_size == (Uint64)size )
2061 p = ndbd_malloc_watched(size, get_watch_dog());
2062 if (p == NULL){
2063 char buf1[255];
2064 char buf2[255];
2065 struct ndb_mgm_param_info param_info;
2066 size_t size = sizeof(ndb_mgm_param_info);
2067
2068 if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size)) {
2069 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
2070 getBlockName(number()), param_info.m_name);
2071 } else {
2072 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
2073 getBlockName(number()), type);
2074 }
2075 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
2076 (Uint32)s, (Uint32)n, (Uint64)real_size);
2077 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
2078 }
2079
2080 if(clear){
2081 char * ptr = (char*)p;
2082 const Uint32 chunk = 128 * 1024;
2083 while(size > chunk){
2084 refresh_watch_dog(9);
2085 memset(ptr, 0, chunk);
2086 ptr += chunk;
2087 size -= chunk;
2088 }
2089 refresh_watch_dog(9);
2090 memset(ptr, 0, size);
2091 }
2092 if (unaligned_buffer)
2093 {
2094 *unaligned_buffer = p;
2095 p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
2096 #ifdef VM_TRACE
2097 g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
2098 type, align, (Uint64)p, (Uint64)p+n*s,
2099 (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
2100 #endif
2101 }
2102 }
2103 return p;
2104 }
2105
2106 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)2107 SimulatedBlock::deallocRecord(void ** ptr,
2108 const char * type, size_t s, size_t n){
2109 (void)type;
2110
2111 if(* ptr != 0){
2112 ndbd_free(* ptr, n*s);
2113 * ptr = 0;
2114 }
2115 }
2116
2117 int
sortchunks(const void * _e0,const void * _e1)2118 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
2119 {
2120 const AllocChunk *p0 = (const AllocChunk*)_e0;
2121 const AllocChunk *p1 = (const AllocChunk*)_e1;
2122
2123 if (p0->ptrI > p1->ptrI)
2124 return 1;
2125 if (p0->ptrI < p1->ptrI)
2126 return -1;
2127 return 0;
2128 }
2129
2130 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)2131 SimulatedBlock::allocChunks(AllocChunk dst[],
2132 Uint32 arraysize,
2133 Uint32 rg,
2134 Uint32 pages,
2135 Uint32 paramId)
2136 {
2137 const Uint32 save = pages; // For fail
2138 Uint32 i = 0;
2139 for (; i<arraysize && pages > 0; i++)
2140 {
2141 Uint32 cnt = pages;
2142 m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
2143 if (unlikely(cnt == 0))
2144 goto fail;
2145 pages -= cnt;
2146 dst[i].cnt = cnt;
2147 }
2148 if (unlikely(pages != 0))
2149 goto fail;
2150
2151 qsort(dst, i, sizeof(dst[0]), sortchunks);
2152 return i;
2153
2154 fail:
2155 char buf1[255];
2156 char buf2[255];
2157 struct ndb_mgm_param_info param_info;
2158 size_t size = sizeof(ndb_mgm_param_info);
2159
2160 if (ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size) != 0)
2161 {
2162 ndbassert(false);
2163 param_info.m_name = "<unknown>";
2164 }
2165
2166 BaseString::snprintf(buf1, sizeof(buf1),
2167 "%s could not allocate memory for parameter %s",
2168 getBlockName(number()), param_info.m_name);
2169 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
2170 Uint64(save) * sizeof(GlobalPage));
2171 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
2172 return 0;
2173 }
2174
2175 void
refresh_watch_dog(Uint32 place)2176 SimulatedBlock::refresh_watch_dog(Uint32 place)
2177 {
2178 #ifdef NDBD_MULTITHREADED
2179 (*m_watchDogCounter) = place;
2180 #else
2181 globalData.incrementWatchDogCounter(place);
2182 #endif
2183 }
2184
2185 volatile Uint32*
get_watch_dog()2186 SimulatedBlock::get_watch_dog()
2187 {
2188 #ifdef NDBD_MULTITHREADED
2189 return m_watchDogCounter;
2190 #else
2191 return globalData.getWatchDogPtr();
2192 #endif
2193 }
2194
2195 void
update_watch_dog_timer(Uint32 interval)2196 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
2197 {
2198 extern EmulatorData globalEmulatorData;
2199 globalEmulatorData.theWatchDog->setCheckInterval(interval);
2200 }
2201
2202 void
progError(int line,int err_code,const char * extra,const char * check) const2203 SimulatedBlock::progError(int line, int err_code, const char* extra,
2204 const char* check) const {
2205 jamNoBlock();
2206
2207 const char *aBlockName = getBlockName(number(), "VM Kernel");
2208
2209 // Pack status of interesting config variables
2210 // so that we can print them in error.log
2211 int magicStatus =
2212 (m_ctx.m_config.stopOnError()<<1) +
2213 (m_ctx.m_config.getInitialStart()<<2);
2214
2215 /* Add line number and failed expression to block name */
2216 char buf[500];
2217 /*Add the check to the log message only if default value of ""
2218 is over-written. */
2219 if(native_strcasecmp(check,"") == 0)
2220 BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
2221 aBlockName, line, magicStatus);
2222 else
2223 BaseString::snprintf(&buf[0], sizeof(buf),
2224 "%s (Line: %d) 0x%.8x Check %.400s failed", aBlockName,
2225 line, magicStatus, check);
2226
2227 ErrorReporter::handleError(err_code, extra, buf);
2228
2229 }
2230
2231 #define MAX_EVENT_REP_SIZE_BYTES (MAX_EVENT_REP_SIZE_WORDS * 4)
2232 void
infoEvent(const char * msg,...) const2233 SimulatedBlock::infoEvent(const char * msg, ...) const
2234 {
2235 if(msg == 0)
2236 return;
2237
2238 SignalT<25> signalT;
2239 signalT.theData[0] = NDB_LE_InfoEvent;
2240 Uint32 buf_str[MAX_EVENT_REP_SIZE_WORDS];
2241 char * buf = (char *)&buf_str[1];
2242
2243 buf_str[0] = signalT.theData[0];
2244 va_list ap;
2245 va_start(ap, msg);
2246 BaseString::vsnprintf(buf, MAX_EVENT_REP_SIZE_BYTES - 5, msg, ap);
2247 va_end(ap);
2248
2249 size_t len = strlen(buf) + 1;
2250 if(len >= (MAX_EVENT_REP_SIZE_BYTES - 5))
2251 {
2252 len = MAX_EVENT_REP_SIZE_BYTES - 4;
2253 buf[MAX_EVENT_REP_SIZE_BYTES - 5] = 0;
2254 }
2255
2256 SegmentedSectionPtr segptr;
2257 Uint32 len_words = 1 + ((len + 3) / 4);
2258 bool ok = import(segptr,
2259 &buf_str[0],
2260 len_words);
2261 signalT.theData[1] = segptr.i;
2262 if (!ok)
2263 {
2264 return;
2265 }
2266 /**
2267 * Init and put it into the job buffer
2268 */
2269 memset(&signalT.header, 0, sizeof(SignalHeader));
2270
2271 const Signal * signal = globalScheduler.getVMSignals();
2272 Uint32 tTrace = signal->header.theTrace;
2273 Uint32 tSignalId = signal->header.theSignalId;
2274
2275 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
2276 signalT.header.theReceiversBlockNumber = CMVMI;
2277 signalT.header.theSendersBlockRef = reference();
2278 signalT.header.theTrace = tTrace;
2279 signalT.header.theSignalId = tSignalId;
2280 signalT.header.theLength = 1;
2281 signalT.header.m_noOfSections = 1;
2282 #ifdef NDBD_MULTITHREADED
2283 sendlocal(m_threadId,
2284 &signalT.header, signalT.theData, signalT.theData + 1);
2285 #else
2286 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
2287 signalT.theData + 1);
2288 #endif
2289 }
2290
2291 void
warningEvent(const char * msg,...)2292 SimulatedBlock::warningEvent(const char * msg, ...)
2293 {
2294 if(msg == 0)
2295 return;
2296
2297 SignalT<25> signalT;
2298 signalT.theData[0] = NDB_LE_WarningEvent;
2299 Uint32 buf_str[MAX_EVENT_REP_SIZE_WORDS];
2300 char * buf = (char *)&buf_str[1];
2301 memset(&buf_str[0], 0, 4);
2302
2303 va_list ap;
2304 va_start(ap, msg);
2305 BaseString::vsnprintf(buf, MAX_EVENT_REP_SIZE_BYTES - 5, msg, ap);
2306 va_end(ap);
2307
2308 size_t len = strlen(buf) + 1;
2309 if(len >= (MAX_EVENT_REP_SIZE_BYTES - 5))
2310 {
2311 len = MAX_EVENT_REP_SIZE_BYTES - 4;
2312 buf[MAX_EVENT_REP_SIZE_BYTES - 5] = 0;
2313 }
2314 SegmentedSectionPtr segptr;
2315 Uint32 len_words = 1 + ((len + 3) / 4);
2316 bool ok = import(segptr,
2317 &buf_str[0],
2318 len_words);
2319 signalT.theData[1] = segptr.i;
2320 if (!ok)
2321 {
2322 return;
2323 }
2324
2325 /**
2326 * Init and put it into the job buffer
2327 */
2328 memset(&signalT.header, 0, sizeof(SignalHeader));
2329
2330 const Signal * signal = globalScheduler.getVMSignals();
2331 Uint32 tTrace = signal->header.theTrace;
2332 Uint32 tSignalId = signal->header.theSignalId;
2333
2334 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
2335 signalT.header.theReceiversBlockNumber = CMVMI;
2336 signalT.header.theSendersBlockRef = reference();
2337 signalT.header.theTrace = tTrace;
2338 signalT.header.theSignalId = tSignalId;
2339 signalT.header.theLength = 1;
2340 signalT.header.m_noOfSections = 1;
2341
2342 #ifdef NDBD_MULTITHREADED
2343 sendlocal(m_threadId,
2344 &signalT.header, signalT.theData, signalT.theData + 1);
2345 #else
2346 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
2347 signalT.theData + 1);
2348 #endif
2349 }
2350
2351 void
execNODE_STATE_REP(Signal * signal)2352 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
2353 const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0];
2354
2355 this->theNodeState = rep->nodeState;
2356 }
2357
2358 void
execCHANGE_NODE_STATE_REQ(Signal * signal)2359 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
2360 const ChangeNodeStateReq * const req =
2361 (ChangeNodeStateReq *)&signal->theData[0];
2362
2363 this->theNodeState = req->nodeState;
2364 const Uint32 senderData = req->senderData;
2365 const BlockReference senderRef = req->senderRef;
2366
2367 /**
2368 * Pack return signal
2369 */
2370 ChangeNodeStateConf * const conf =
2371 (ChangeNodeStateConf *)&signal->theData[0];
2372
2373 conf->senderData = senderData;
2374
2375 sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
2376 ChangeNodeStateConf::SignalLength, JBB);
2377 }
2378
2379 void
execNDB_TAMPER(Signal * signal)2380 SimulatedBlock::execNDB_TAMPER(Signal * signal){
2381 if (signal->getLength() == 1)
2382 {
2383 SET_ERROR_INSERT_VALUE(signal->theData[0]);
2384 }
2385 else
2386 {
2387 SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
2388 }
2389 }
2390
2391 void
execSIGNAL_DROPPED_REP(Signal * signal)2392 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
2393 /* Note no need for fragmented signal handling as we are
2394 * going to crash this node
2395 */
2396 char msg[64];
2397 const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
2398 BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
2399 rep->originalGsn, rep->originalLength,rep->originalSectionCount);
2400 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
2401 msg,
2402 __FILE__,
2403 NST_ErrorHandler);
2404 }
2405
2406 void
execCONTINUE_FRAGMENTED(Signal * signal)2407 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
2408 jamEntry();
2409
2410 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2411 ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
2412
2413 switch (sig->type)
2414 {
2415 case ContinueFragmented::CONTINUE_SENDING :
2416 {
2417 jam();
2418 Ptr<FragmentSendInfo> fragPtr;
2419
2420 c_segmentedFragmentSendList.first(fragPtr);
2421 for(; !fragPtr.isNull();){
2422 jam();
2423 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2424 c_segmentedFragmentSendList.next(fragPtr);
2425
2426 sendNextSegmentedFragment(signal, * copyPtr.p);
2427 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2428 jam();
2429 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2430 jam();
2431 execute(signal, copyPtr.p->m_callback, 0);
2432 }//if
2433 c_segmentedFragmentSendList.release(copyPtr);
2434 }
2435 }
2436
2437 c_linearFragmentSendList.first(fragPtr);
2438 for(; !fragPtr.isNull();){
2439 jam();
2440 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2441 c_linearFragmentSendList.next(fragPtr);
2442
2443 sendNextLinearFragment(signal, * copyPtr.p);
2444 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2445 jam();
2446 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2447 jam();
2448 execute(signal, copyPtr.p->m_callback, 0);
2449 }//if
2450 c_linearFragmentSendList.release(copyPtr);
2451 }
2452 }
2453
2454 if(c_segmentedFragmentSendList.isEmpty() &&
2455 c_linearFragmentSendList.isEmpty()){
2456 jam();
2457 c_fragSenderRunning = false;
2458 return;
2459 }
2460
2461 sig->type = ContinueFragmented::CONTINUE_SENDING;
2462 sig->line = __LINE__;
2463 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2464 break;
2465 }
2466 case ContinueFragmented::CONTINUE_CLEANUP:
2467 {
2468 jam();
2469
2470 const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2471 /* Check length of signal */
2472 ndbassert(signal->getLength() ==
2473 ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2474 callbackWords);
2475
2476 Callback cb;
2477 memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2478
2479 doNodeFailureCleanup(signal,
2480 sig->cleanup.failedNodeId,
2481 sig->cleanup.resource,
2482 sig->cleanup.cursor,
2483 sig->cleanup.elementsCleaned,
2484 cb);
2485 break;
2486 }
2487 default:
2488 ndbabort();
2489 }
2490 }
2491
2492 void
execSTOP_FOR_CRASH(Signal * signal)2493 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2494 {
2495 #ifdef NDBD_MULTITHREADED
2496 mt_execSTOP_FOR_CRASH();
2497 #endif
2498 }
2499
2500 void
execNODE_START_REP(Signal * signal)2501 SimulatedBlock::execNODE_START_REP(Signal* signal)
2502 {
2503 }
2504
2505 void
execAPI_START_REP(Signal * signal)2506 SimulatedBlock::execAPI_START_REP(Signal* signal)
2507 {
2508 }
2509
2510 void
execSEND_PACKED(Signal * signal)2511 SimulatedBlock::execSEND_PACKED(Signal* signal)
2512 {
2513 }
2514
2515 ATTRIBUTE_NOINLINE
2516 void
handle_execute_error(GlobalSignalNumber gsn)2517 SimulatedBlock::handle_execute_error(GlobalSignalNumber gsn)
2518 {
2519 /**
2520 * This method only called if an error has occurred
2521 */
2522 char errorMsg[255];
2523 if (!(gsn <= MAX_GSN)) {
2524 BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d too high)", gsn);
2525 ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2526 }
2527 if (!(theExecArray[gsn] != 0)) {
2528 BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d not added)", gsn);
2529 ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2530 }
2531 ndbabort();
2532 }
2533
2534 // MT LQH callback CONF via signal
2535
2536 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2537 SimulatedBlock::getCallbackEntry(Uint32 ci)
2538 {
2539 ndbrequire(m_callbackTableAddr != 0);
2540 const CallbackTable& ct = *m_callbackTableAddr;
2541 ndbrequire(ci < ct.m_count);
2542 return ct.m_entry[ci];
2543 }
2544
2545 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 senderData,Uint32 callbackInfo,Uint32 returnCode)2546 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2547 CallbackPtr& cptr,
2548 Uint32 senderData, Uint32 callbackInfo,
2549 Uint32 returnCode)
2550 {
2551 Uint32 blockNo = blockToMain(fullBlockNo);
2552 Uint32 instanceNo = blockToInstance(fullBlockNo);
2553 SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2554 ndbrequire(b != 0);
2555
2556 const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2557
2558 if (!isNdbMtLqh()) {
2559 Callback c;
2560 c.m_callbackFunction = ce.m_function;
2561 c.m_callbackData = cptr.m_callbackData;
2562 b->execute(signal, c, returnCode);
2563
2564 if (ce.m_flags & CALLBACK_ACK) {
2565 jam();
2566 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2567 ack->senderData = senderData;
2568 ack->callbackInfo = callbackInfo;
2569 EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2570 signal, CallbackAck::SignalLength);
2571 }
2572 } else {
2573 CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2574 conf->senderData = senderData;
2575 conf->senderRef = reference();
2576 conf->callbackIndex = cptr.m_callbackIndex;
2577 conf->callbackData = cptr.m_callbackData;
2578 conf->callbackInfo = callbackInfo;
2579 conf->returnCode = returnCode;
2580
2581 if (ce.m_flags & CALLBACK_DIRECT) {
2582 jam();
2583 EXECUTE_DIRECT_MT(blockNo, GSN_CALLBACK_CONF,
2584 signal, CallbackConf::SignalLength, instanceNo);
2585 } else {
2586 jam();
2587 BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2588 sendSignal(ref, GSN_CALLBACK_CONF,
2589 signal, CallbackConf::SignalLength, JBB);
2590 }
2591 }
2592 cptr.m_callbackIndex = ZNIL;
2593 }
2594
2595 void
execCALLBACK_CONF(Signal * signal)2596 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2597 {
2598 const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2599
2600 Uint32 senderData = conf->senderData;
2601 Uint32 senderRef = conf->senderRef;
2602 Uint32 callbackIndex = conf->callbackIndex;
2603 Uint32 callbackData = conf->callbackData;
2604 Uint32 callbackInfo = conf->callbackInfo;
2605 Uint32 returnCode = conf->returnCode;
2606
2607 ndbrequire(m_callbackTableAddr != 0);
2608 const CallbackEntry& ce = getCallbackEntry(callbackIndex);
2609 CallbackFunction function = ce.m_function;
2610
2611 Callback callback;
2612 callback.m_callbackFunction = function;
2613 callback.m_callbackData = callbackData;
2614 execute(signal, callback, returnCode);
2615
2616 if (ce.m_flags & CALLBACK_ACK) {
2617 jam();
2618 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2619 ack->senderData = senderData;
2620 ack->callbackInfo = callbackInfo;
2621 sendSignal(senderRef, GSN_CALLBACK_ACK,
2622 signal, CallbackAck::SignalLength, JBB);
2623 }
2624 }
2625
2626 #ifdef VM_TRACE_TIME
2627 void
clearTimes()2628 SimulatedBlock::clearTimes() {
2629 for(Uint32 i = 0; i <= MAX_GSN; i++){
2630 m_timeTrace[i].cnt = 0;
2631 m_timeTrace[i].sum = 0;
2632 m_timeTrace[i].sub = 0;
2633 }
2634 }
2635
2636 void
printTimes(FILE * output)2637 SimulatedBlock::printTimes(FILE * output){
2638 fprintf(output, "-- %s --\n", getBlockName(number()));
2639 Uint64 sum = 0;
2640 for(Uint32 i = 0; i <= MAX_GSN; i++){
2641 Uint32 n = m_timeTrace[i].cnt;
2642 if(n != 0){
2643 double dn = n;
2644
2645 double avg = m_timeTrace[i].sum;
2646 double avg2 = avg - m_timeTrace[i].sub;
2647
2648 avg /= dn;
2649 avg2 /= dn;
2650
2651 fprintf(output,
2652 //name ; cnt ; loc ; acc
2653 "%s ; #%d ; %dus ; %dus ; %dms\n",
2654 getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2655 (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2656
2657 sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2658 }
2659 }
2660 sum = (sum + 500)/ 1000;
2661 fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2662 fprintf(output, "\n");
2663 fflush(output);
2664 }
2665
2666 #endif
2667
FragmentInfo(Uint32 fragId,Uint32 sender)2668 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2669 m_fragmentId = fragId;
2670 m_senderRef = sender;
2671 m_sectionPtrI[0] = RNIL;
2672 m_sectionPtrI[1] = RNIL;
2673 m_sectionPtrI[2] = RNIL;
2674 }
2675
FragmentSendInfo()2676 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2677 {
2678 }
2679
2680 bool
assembleFragments(Signal * signal)2681 SimulatedBlock::assembleFragments(Signal * signal){
2682 Uint32 sigLen = signal->length() - 1;
2683 Uint32 fragId = signal->theData[sigLen];
2684 Uint32 fragInfo = signal->header.m_fragmentInfo;
2685 Uint32 senderRef = signal->getSendersBlockRef();
2686
2687 Uint32 *sectionPtr = signal->m_sectionPtrI;
2688
2689 if(fragInfo == 0){
2690 return true;
2691 }
2692
2693 const Uint32 secs = signal->header.m_noOfSections;
2694 const Uint32 * const secNos = &signal->theData[sigLen - secs];
2695
2696 if(fragInfo == 1){
2697 /**
2698 * First in train
2699 */
2700 Ptr<FragmentInfo> fragPtr;
2701 if(!c_fragmentInfoHash.seize(fragPtr)){
2702 ndbabort();
2703 return false;
2704 }
2705
2706 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2707 c_fragmentInfoHash.add(fragPtr);
2708
2709 for(Uint32 i = 0; i<secs; i++){
2710 Uint32 sectionNo = secNos[i];
2711 ndbassert(sectionNo < 3);
2712 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2713 }
2714
2715 ndbassert(! fragPtr.p->isDropped() );
2716
2717 /**
2718 * Don't release allocated segments
2719 */
2720 signal->header.m_fragmentInfo = 0;
2721 signal->header.m_noOfSections = 0;
2722 return false;
2723 }
2724
2725 FragmentInfo key(fragId, senderRef);
2726 Ptr<FragmentInfo> fragPtr;
2727 if(c_fragmentInfoHash.find(fragPtr, key)){
2728
2729 /**
2730 * FragInfo == 2 or 3
2731 */
2732 if ( likely(! fragPtr.p->isDropped()) )
2733 {
2734 Uint32 i;
2735 for(i = 0; i<secs; i++){
2736 Uint32 sectionNo = secNos[i];
2737 ndbassert(sectionNo < 3);
2738 Uint32 sectionPtrI = sectionPtr[i];
2739 if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2740 linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2741 } else {
2742 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2743 }
2744 }
2745
2746 /**
2747 * fragInfo = 2
2748 */
2749 if(fragInfo == 2){
2750 signal->header.m_fragmentInfo = 0;
2751 signal->header.m_noOfSections = 0;
2752 return false;
2753 }
2754
2755 /**
2756 * fragInfo = 3
2757 */
2758 for(i = 0; i<3; i++){
2759 Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2760 if(ptrI != RNIL){
2761 signal->m_sectionPtrI[i] = ptrI;
2762 } else {
2763 break;
2764 }
2765 }
2766
2767 signal->setLength(sigLen - secs);
2768 signal->header.m_noOfSections = i;
2769 signal->header.m_fragmentInfo = 0;
2770
2771 c_fragmentInfoHash.release(fragPtr);
2772 return true;
2773 }
2774 else
2775 {
2776 /* This fragmented signal has already had at least 1 fragment
2777 * dropped. We must release the received segments.
2778 */
2779 for (Uint32 i=0; i < secs; i++)
2780 releaseSection( sectionPtr[i] );
2781
2782 signal->header.m_fragmentInfo = 0;
2783 signal->header.m_noOfSections = 0;
2784
2785 /* FragInfo == 2
2786 * More fragments to come, keep waiting
2787 */
2788 if (fragInfo == 2)
2789 return false;
2790
2791 /* FragInfo == 3
2792 * That was the last fragment.
2793 * We're now ready for handling the dropped signal.
2794 */
2795 SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2796 Uint32 gsn = signal->header.theVerId_signalNumber;
2797 Uint32 len = signal->header.theLength;
2798 Uint32 newLen= (len > 22 ? 22 : len);
2799 memmove(rep->originalData, signal->theData, (4 * newLen));
2800 rep->originalGsn = gsn;
2801 rep->originalLength = len;
2802 rep->originalSectionCount = 0;
2803 signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2804 signal->header.theLength = newLen + 3;
2805 signal->header.m_noOfSections = 0;
2806 signal->header.m_fragmentInfo = 3;
2807
2808
2809 /**
2810 * NOTE: Don't use EXECUTE_DIRECT as it
2811 * sets sendersBlockRef to reference()
2812 */
2813 /* Perform dropped signal handling, in this thread, now */
2814 jamBuffer()->markEndOfSigExec();
2815 executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2816
2817 /* return false to caller - they should not process the signal */
2818 return false;
2819 } // else (isDropped())
2820 }
2821
2822 /**
2823 * Unable to find fragment
2824 */
2825 ndbabort();
2826 return false;
2827 }
2828
2829 bool
assembleDroppedFragments(Signal * signal)2830 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2831 {
2832 /* This method is called at the start of a SIGNAL_DROPPED_REP
2833 * handler when there is a chance that the dropped signal could
2834 * be part of a fragmented signal.
2835 * If the dropped signal was a fragmented signal, this
2836 * needs to be handled specially to ensure that fragments
2837 * of the signal are correctly dropped to avoid segment
2838 * leaks etc.
2839 * There are a number of cases :
2840 * 1) First fragment dropped (FragInfo=1)
2841 * All remaining fragments must be dropped when they
2842 * arrive. The Signal dropped report handler must be
2843 * executed when the last fragment has arrived.
2844 * 2) Middle fragment dropped (FragInfo=2)
2845 * Any existing stored segments must be released.
2846 * All remaining fragments must be dropped when they
2847 * arrive.
2848 * 3) Last fragment dropped (FragInfo=3)
2849 * Any existing stored segments must be released.
2850 * Signal Dropped handling can occur, so return true.
2851 *
2852 * To indicate that a fragment has been dropped for a signal,
2853 * all the section I Values in the fragment's hash entry are
2854 * set to RNIL.
2855 * Signal Dropped Report handling is performed when the last
2856 * fragment arrives. If the last fragment is not dropped
2857 * by the transporter layer then normal fragment assembly
2858 * arranges for dropped signal handling to occur.
2859 */
2860 Uint32 sigLen = signal->length() - 1;
2861 Uint32 fragId = signal->theData[sigLen];
2862 Uint32 fragInfo = signal->header.m_fragmentInfo;
2863 Uint32 senderRef = signal->getSendersBlockRef();
2864
2865 if(fragInfo == 0){
2866 return true;
2867 }
2868
2869 /* This method is for handling SIGNAL_DROPPED_REP only */
2870 ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2871 ndbrequire(signal->header.m_noOfSections == 0);
2872
2873 if(fragInfo == 1){
2874 /**
2875 * First in train
2876 */
2877 Ptr<FragmentInfo> fragPtr;
2878 if(!c_fragmentInfoHash.seize(fragPtr)){
2879 ndbabort();
2880 return false;
2881 }
2882
2883 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2884 c_fragmentInfoHash.add(fragPtr);
2885
2886 /* Mark entry in hash as belonging to dropped signal so subsequent
2887 * fragments can also be dropped
2888 */
2889 fragPtr.p->m_sectionPtrI[0]= RNIL;
2890 fragPtr.p->m_sectionPtrI[1]= RNIL;
2891 fragPtr.p->m_sectionPtrI[2]= RNIL;
2892
2893 /* Wait for last fragment before SignalDroppedRep handling */
2894 signal->header.m_fragmentInfo = 0;
2895 return false;
2896 }
2897
2898 FragmentInfo key(fragId, senderRef);
2899 Ptr<FragmentInfo> fragPtr;
2900 if(c_fragmentInfoHash.find(fragPtr, key)){
2901
2902 /**
2903 * FragInfo == 2 or 3
2904 */
2905 if (! fragPtr.p->isDropped() )
2906 {
2907 /* Fragmented Signal not already marked as dropped
2908 * Need to free stored segments
2909 */
2910 releaseSection(fragPtr.p->m_sectionPtrI[0]);
2911 releaseSection(fragPtr.p->m_sectionPtrI[1]);
2912 releaseSection(fragPtr.p->m_sectionPtrI[2]);
2913
2914 /* Mark as dropped now */
2915 fragPtr.p->m_sectionPtrI[0]= RNIL;
2916 fragPtr.p->m_sectionPtrI[1]= RNIL;
2917 fragPtr.p->m_sectionPtrI[2]= RNIL;
2918
2919 ndbassert( fragPtr.p->isDropped() );
2920 }
2921
2922 /**
2923 * fragInfo = 2
2924 * Still waiting for final fragments.
2925 * Return false to caller.
2926 */
2927 if(fragInfo == 2){
2928 signal->header.m_fragmentInfo = 0;
2929 return false;
2930 }
2931
2932 /**
2933 * fragInfo = 3
2934 * All fragments received, remove entry
2935 * from hash and return to caller for
2936 * dropped signal handling.
2937 */
2938 signal->header.m_fragmentInfo = 0;
2939
2940 c_fragmentInfoHash.release(fragPtr);
2941 return true;
2942 }
2943
2944 /**
2945 * Unable to find fragment
2946 */
2947 ndbabort();
2948 return false;
2949 }
2950
2951 /**
2952 * doCleanupFragInfo
2953 * Iterate over block's Fragment assembly hash, looking
2954 * for in-assembly fragments from the failed node
2955 * Release these
2956 * Returns after each scanned bucket to avoid consuming
2957 * too much time.
2958 *
2959 * Parameters
2960 * failedNodeId : Node id of failed node
2961 * cursor : Hash bucket to start iteration from
2962 * rtUnitsUsed : Total rt units used
2963 * elementsCleaned : Number of elements cleaned
2964 *
2965 * Updates
2966 * cursor : Hash bucket to continue iteration from
2967 * rtUnitsUsed : += units used
2968 * elementsCleaned : += elements cleaned
2969 *
2970 * Returns
2971 * true if all FragInfo structs cleaned up
2972 * false if more to do
2973 */
2974 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2975 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2976 Uint32& cursor,
2977 Uint32& rtUnitsUsed,
2978 Uint32& elementsCleaned)
2979 {
2980 jam();
2981 FragmentInfo_hash::Iterator iter;
2982
2983 c_fragmentInfoHash.next(cursor, iter);
2984
2985 const Uint32 startBucket = iter.bucket;
2986
2987 while (!iter.isNull() &&
2988 (iter.bucket == startBucket))
2989 {
2990 jam();
2991
2992 Ptr<FragmentInfo> curr = iter.curr;
2993 c_fragmentInfoHash.next(iter);
2994
2995 FragmentInfo* fragInfo = curr.p;
2996
2997 if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2998 {
2999 jam();
3000 /* We were assembling a fragmented signal from the
3001 * failed node, discard the partially assembled
3002 * sections and free the FragmentInfo hash entry
3003 */
3004 for(Uint32 s = 0; s<3; s++)
3005 {
3006 if (fragInfo->m_sectionPtrI[s] != RNIL)
3007 {
3008 jam();
3009 SegmentedSectionPtr ssptr;
3010 getSection(ssptr, fragInfo->m_sectionPtrI[s]);
3011 release(ssptr);
3012 }
3013 }
3014
3015 /* Release FragmentInfo hash element */
3016 c_fragmentInfoHash.release(curr);
3017
3018 elementsCleaned++;
3019 rtUnitsUsed+=3;
3020 }
3021
3022 rtUnitsUsed++;
3023 } // while
3024
3025 cursor = iter.bucket;
3026 return iter.isNull();
3027 }
3028
3029 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)3030 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
3031 Uint32& cursor,
3032 Uint32& rtUnitsUsed,
3033 Uint32& elementsCleaned)
3034 {
3035 jam();
3036
3037 Ptr<FragmentSendInfo> fragPtr;
3038 const Uint32 NumSendLists = 2;
3039 ndbrequire(cursor < NumSendLists);
3040
3041 FragmentSendInfo_list* fragSendLists[ NumSendLists ] =
3042 { &c_segmentedFragmentSendList,
3043 &c_linearFragmentSendList };
3044
3045 FragmentSendInfo_list* list = fragSendLists[ cursor ];
3046
3047 list->first(fragPtr);
3048 for(; !fragPtr.isNull();){
3049 jam();
3050 Ptr<FragmentSendInfo> copyPtr = fragPtr;
3051 list->next(fragPtr);
3052 rtUnitsUsed++;
3053
3054 NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
3055
3056 if (rg.m_nodes.get(failedNodeId))
3057 {
3058 jam();
3059 /* Fragmented signal is being sent to node */
3060 rg.m_nodes.clear(failedNodeId);
3061
3062 if (rg.m_nodes.isclear())
3063 {
3064 jam();
3065 /* No other nodes in receiver group - send
3066 * is cancelled
3067 * Will be cleaned up in the usual CONTINUE_FRAGMENTED
3068 * handling code.
3069 */
3070 copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
3071 }
3072 elementsCleaned++;
3073 }
3074 }
3075
3076 /* Next time we'll do the next list */
3077 cursor++;
3078
3079 return (cursor == NumSendLists);
3080 }
3081
3082
3083 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)3084 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
3085 Uint32 failedNodeId,
3086 Uint32 resource,
3087 Uint32 cursor,
3088 Uint32 elementsCleaned,
3089 Callback& cb)
3090 {
3091 jam();
3092 const bool userCallback = (cb.m_callbackFunction != 0);
3093 const Uint32 maxRtUnits = userCallback ?
3094 #ifdef VM_TRACE
3095 2 :
3096 #else
3097 16 :
3098 #endif
3099 ~0; /* Must complete all processing in this call */
3100
3101 Uint32 rtUnitsUsed = 0;
3102
3103 /* Loop over resources, cleaning them up */
3104 do
3105 {
3106 bool resourceDone= false;
3107 switch(resource) {
3108 case ContinueFragmented::RES_FRAGSEND:
3109 {
3110 jam();
3111 resourceDone = doCleanupFragSend(failedNodeId, cursor,
3112 rtUnitsUsed, elementsCleaned);
3113 break;
3114 }
3115 case ContinueFragmented::RES_FRAGINFO:
3116 {
3117 jam();
3118 resourceDone = doCleanupFragInfo(failedNodeId, cursor,
3119 rtUnitsUsed, elementsCleaned);
3120 break;
3121 }
3122 case ContinueFragmented::RES_LAST:
3123 {
3124 jam();
3125 /* Node failure processing complete, execute user callback if provided */
3126 if (userCallback)
3127 execute(signal, cb, elementsCleaned);
3128
3129 return elementsCleaned;
3130 }
3131 default:
3132 ndbabort();
3133 }
3134
3135 /* Did we complete cleaning up this resource? */
3136 if (resourceDone)
3137 {
3138 resource++;
3139 cursor= 0;
3140 }
3141
3142 } while (rtUnitsUsed <= maxRtUnits);
3143
3144 jam();
3145
3146 /* Not yet completed failure handling.
3147 * Must have exhausted RT units.
3148 * Update cursor and re-invoke
3149 */
3150 ndbassert(userCallback);
3151
3152 /* Send signal to continue processing */
3153
3154 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3155 sig->type = ContinueFragmented::CONTINUE_CLEANUP;
3156 sig->cleanup.failedNodeId = failedNodeId;
3157 sig->cleanup.resource = resource;
3158 sig->cleanup.cursor = cursor;
3159 sig->cleanup.elementsCleaned= elementsCleaned;
3160 Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
3161 Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
3162 callbackWords;
3163 ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
3164 memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
3165
3166 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
3167
3168 return elementsCleaned;
3169 }
3170
3171 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)3172 SimulatedBlock::simBlockNodeFailure(Signal* signal,
3173 Uint32 failedNodeId,
3174 Callback& cb)
3175 {
3176 jam();
3177 return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
3178 }
3179
3180 Uint32
debugPrintFragmentCounts()3181 SimulatedBlock::debugPrintFragmentCounts()
3182 {
3183 const char* blockName = getBlockName(theNumber);
3184 FragmentInfo_hash::Iterator iter;
3185 Uint32 fragmentInfoCount = 0;
3186 c_fragmentInfoHash.first(iter);
3187
3188 while(!iter.isNull())
3189 {
3190 fragmentInfoCount++;
3191 c_fragmentInfoHash.next(iter);
3192 }
3193
3194 Ptr<FragmentSendInfo> ptr;
3195 Uint32 linSendInfoCount = 0;
3196
3197 c_linearFragmentSendList.first(ptr);
3198
3199 while (!ptr.isNull())
3200 {
3201 linSendInfoCount++;
3202 c_linearFragmentSendList.next(ptr);
3203 }
3204
3205 Uint32 segSendInfoCount = 0;
3206 c_segmentedFragmentSendList.first(ptr);
3207
3208 while (!ptr.isNull())
3209 {
3210 segSendInfoCount++;
3211 c_segmentedFragmentSendList.next(ptr);
3212 }
3213
3214 ndbout_c("%s : Fragment assembly hash entry count : %d",
3215 blockName,
3216 fragmentInfoCount);
3217
3218 ndbout_c("%s : Linear fragment send list size : %d",
3219 blockName,
3220 linSendInfoCount);
3221
3222 ndbout_c("%s : Segmented fragment send list size : %d",
3223 blockName,
3224 segSendInfoCount);
3225
3226 return fragmentInfoCount +
3227 linSendInfoCount +
3228 segSendInfoCount;
3229 }
3230
3231
3232 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)3233 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3234 NodeReceiverGroup rg,
3235 GlobalSignalNumber gsn,
3236 Signal* signal,
3237 Uint32 length,
3238 JobBufferLevel jbuf,
3239 SectionHandle* sections,
3240 bool noRelease,
3241 Uint32 messageSize) {
3242
3243 Uint32 noSections = sections->m_cnt;
3244 SegmentedSectionPtr * ptr = sections->m_ptr;
3245
3246 info.m_sectionPtr[0].m_segmented.i = RNIL;
3247 info.m_sectionPtr[1].m_segmented.i = RNIL;
3248 info.m_sectionPtr[2].m_segmented.i = RNIL;
3249
3250 Uint32 totalSize = 0;
3251 switch(noSections){
3252 case 3:
3253 info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
3254 info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
3255 totalSize += ptr[2].sz;
3256 // Fall through
3257 case 2:
3258 info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
3259 info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
3260 totalSize += ptr[1].sz;
3261 // Fall through
3262 case 1:
3263 info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
3264 info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
3265 totalSize += ptr[0].sz;
3266 }
3267
3268 if(totalSize <= messageSize + SectionSegment::DataLength){
3269 /**
3270 * Send signal directly
3271 */
3272 if (noRelease)
3273 sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
3274 else
3275 sendSignal(rg, gsn, signal, length, jbuf, sections);
3276
3277 info.m_status = FragmentSendInfo::SendComplete;
3278 return true;
3279 }
3280
3281 /**
3282 * Setup info object
3283 */
3284 info.m_status = FragmentSendInfo::SendNotComplete;
3285 info.m_prio = (Uint8)jbuf;
3286 info.m_gsn = gsn;
3287 info.m_fragInfo = 1;
3288 info.m_flags = 0;
3289 info.m_messageSize = messageSize;
3290 info.m_fragmentId = c_fragmentIdCounter++;
3291 info.m_nodeReceiverGroup = rg;
3292 info.m_callback.m_callbackFunction = 0;
3293
3294 if (noRelease)
3295 {
3296 /* Record info that we are not releasing segments */
3297 info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
3298 }
3299 else
3300 {
3301 /**
3302 * Clear sections in caller's handle. Actual send
3303 * will consume them
3304 */
3305 sections->m_cnt = 0;
3306 }
3307
3308 /* Store main signal data in a segment for sending later */
3309 Ptr<SectionSegment> tmp;
3310 if(!import(tmp, &signal->theData[0], length))
3311 {
3312 handle_out_of_longsignal_memory(0);
3313 return false;
3314 }
3315 info.m_theDataSection.p = &tmp.p->theData[0];
3316 info.m_theDataSection.sz = length;
3317 tmp.p->theData[length] = tmp.i;
3318
3319 sendNextSegmentedFragment(signal, info);
3320
3321 if(c_fragmentIdCounter == 0){
3322 /**
3323 * Fragment id 0 is invalid
3324 */
3325 c_fragmentIdCounter = 1;
3326 }
3327
3328 return true;
3329 }
3330
/*
 * lsout(x): trace hook used by the fragment send functions below.
 * With '#if 0' it expands to nothing; change the condition to '#if 1'
 * to make it expand to its argument (the trace statement).
 */
#if 0
#define lsout(x) x
#else
#define lsout(x)
#endif
3336
3337 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)3338 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
3339 FragmentSendInfo & info){
3340
3341 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3342 {
3343 /* Send was cancelled - all dest. nodes have failed
3344 * since send was started
3345 */
3346 if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
3347 {
3348 /*
3349 * Free any sections still to be sent
3350 */
3351 SectionHandle handle(this);
3352 for (Uint32 s = 0; s < 3; s++)
3353 {
3354 Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
3355 if (sectionI != RNIL)
3356 {
3357 getSection(handle.m_ptr[handle.m_cnt], sectionI);
3358 info.m_sectionPtr[s].m_segmented.i = RNIL;
3359 info.m_sectionPtr[s].m_segmented.p = NULL;
3360 handle.m_cnt++;
3361 }
3362 }
3363
3364 releaseSections(handle);
3365 }
3366
3367 /* Free inline signal data storage section */
3368 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3369 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3370
3371 info.m_status = FragmentSendInfo::SendComplete;
3372 return;
3373 }
3374
3375 /**
3376 * Setup main signal data from stored copy
3377 */
3378 const Uint32 sigLen = info.m_theDataSection.sz;
3379 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3380
3381 Uint32 sz = 0;
3382 Uint32 maxSz = info.m_messageSize;
3383
3384 Int32 secNo = 2;
3385 Uint32 secCount = 0;
3386 Uint32 * secNos = &signal->theData[sigLen];
3387
3388 SectionHandle sections(this);
3389 SegmentedSectionPtr *ptr = sections.m_ptr;
3390
3391 bool split= false;
3392 Uint32 splitSectionStartI= RNIL;
3393 SectionSegment* splitSectionStartP= NULL;
3394 Uint32 splitSectionLastSegment= RNIL;
3395 Uint32 splitSectionSz= 0;
3396
3397 enum { Unknown = 0, Full = 1 } loop = Unknown;
3398 for(; secNo >= 0 && secCount < 3; secNo--){
3399 Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
3400 if(ptrI == RNIL)
3401 continue;
3402
3403 info.m_sectionPtr[secNo].m_segmented.i = RNIL;
3404
3405 SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
3406 const Uint32 size = ptrP->m_sz;
3407
3408 ptr[secCount].i = ptrI;
3409 ptr[secCount].p = ptrP;
3410 ptr[secCount].sz = size;
3411 secNos[secCount] = secNo;
3412 secCount++;
3413
3414 const Uint32 sizeLeft = maxSz - sz;
3415 if(size <= sizeLeft){
3416 /**
3417 * The section fits
3418 */
3419 sz += size;
3420 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3421 continue;
3422 }
3423
3424 const Uint32 overflow = size - sizeLeft; // > 0
3425 if(overflow <= SectionSegment::DataLength){
3426 /**
3427 * Only one segment left to send
3428 * send even if sizeLeft <= size
3429 */
3430 lsout(ndbout_c("section %d saved as %d but full over: %d",
3431 secNo, secCount-1, overflow));
3432 secNo--;
3433 break;
3434 }
3435
3436 // size >= 61
3437 if(sizeLeft < SectionSegment::DataLength){
3438 /**
3439 * Less than one segment left (space)
3440 * dont bother sending
3441 */
3442 secCount--;
3443 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3444 loop = Full;
3445 lsout(ndbout_c("section %d not saved", secNo));
3446 break;
3447 }
3448
3449 /**
3450 * Split list
3451 * 1) Find place to split
3452 * 2) Rewrite header (the part that will be sent)
3453 * 3) Write new header (for remaining part)
3454 * 4) Store new header on FragmentSendInfo - record
3455 */
3456 // size >= 61 && sizeLeft >= 60
3457 Uint32 sum = SectionSegment::DataLength;
3458 Uint32 prevPtrI = ptrI;
3459 ptrI = ptrP->m_nextSegment;
3460 const Uint32 fill = sizeLeft - SectionSegment::DataLength;
3461 while (sum <= fill)
3462 {
3463 prevPtrI = ptrI;
3464 ptrP = g_sectionSegmentPool.getPtr(ptrI);
3465 ptrI = ptrP->m_nextSegment;
3466 sum += SectionSegment::DataLength;
3467 }
3468
3469 Uint32 prev = secCount - 1;
3470 /**
3471 * Record details of the section pre-split
3472 * This allows the split to be 'healed' afterwards in the
3473 * no release case.
3474 */
3475 split= true;
3476 splitSectionStartI= ptr[prev].i;
3477 splitSectionStartP= ptr[prev].p;
3478 splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3479 splitSectionSz= splitSectionStartP->m_sz;
3480
3481 /**
3482 * Rewrite header w.r.t size and last
3483 * This is what will be sent in this fragment.
3484 */
3485 splitSectionStartP->m_lastSegment = prevPtrI;
3486 splitSectionStartP->m_sz = sum;
3487 ptr[prev].sz = sum;
3488
3489 /**
3490 * Write "new" list header
3491 * This is what remains to be sent in this section
3492 */
3493 ptrP = g_sectionSegmentPool.getPtr(ptrI);
3494 ptrP->m_lastSegment = splitSectionLastSegment;
3495 ptrP->m_sz = size - sum;
3496
3497 /**
3498 * And store it on info-record
3499 */
3500 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3501 info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3502
3503 loop = Full;
3504 lsout(ndbout_c("section %d split into %d", secNo, prev));
3505 break;
3506 }
3507
3508 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3509 loop, secNo, secCount, sz));
3510
3511 /**
3512 * Store fragment id
3513 */
3514 secNos[secCount] = info.m_fragmentId;
3515
3516 Uint32 fragInfo = info.m_fragInfo;
3517 info.m_fragInfo = 2;
3518 switch(loop){
3519 case Unknown:
3520 if(secNo >= 0){
3521 lsout(ndbout_c("Unknown - Full"));
3522 /**
3523 * Not finished
3524 */
3525 break;
3526 }
3527 // Fall through
3528 lsout(ndbout_c("Unknown - Done"));
3529 info.m_status = FragmentSendInfo::SendComplete;
3530 ndbassert(fragInfo == 2);
3531 fragInfo = 3;
3532 case Full:
3533 break;
3534 }
3535
3536 signal->header.m_fragmentInfo = fragInfo;
3537 signal->header.m_noOfSections = 0;
3538 sections.m_cnt = secCount;
3539
3540 if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3541 {
3542 sendSignalNoRelease(info.m_nodeReceiverGroup,
3543 info.m_gsn,
3544 signal,
3545 sigLen + secCount + 1,
3546 (JobBufferLevel)info.m_prio,
3547 §ions);
3548 /* NoRelease leaves SectionHandle populated, we'll
3549 * clear it here. The actual sections themselves
3550 * remain allocated.
3551 */
3552 sections.m_cnt = 0;
3553
3554 if (split)
3555 {
3556 /* There was a split section, which required us to modify the
3557 * segment list.
3558 * Now restore the split section's segment list back to
3559 * its previous state
3560 * (Only really required for first segment, but we do
3561 * it for all of them, to be a good citizen)
3562 */
3563 ndbrequire( splitSectionStartI != RNIL );
3564 ndbrequire( splitSectionStartP != NULL );
3565 ndbrequire( splitSectionLastSegment != RNIL );
3566
3567 splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3568 splitSectionStartP->m_sz= splitSectionSz;
3569
3570 /* Check our handiwork */
3571 assert(verifySection(splitSectionStartI));
3572 }
3573 }
3574 else
3575 {
3576 /* Normal, release sections case */
3577 sendSignal(info.m_nodeReceiverGroup,
3578 info.m_gsn,
3579 signal,
3580 sigLen + secCount + 1,
3581 (JobBufferLevel)info.m_prio,
3582 §ions);
3583 }
3584
3585 if(fragInfo == 3){
3586 /**
3587 * This is the last signal
3588 * Release saved 'main signal' words segment
3589 */
3590 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3591 }
3592 }
3593
3594 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3595 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3596 NodeReceiverGroup rg,
3597 GlobalSignalNumber gsn,
3598 Signal* signal,
3599 Uint32 length,
3600 JobBufferLevel jbuf,
3601 LinearSectionPtr ptr[3],
3602 Uint32 noOfSections,
3603 Uint32 messageSize){
3604
3605 ndbrequire(signal->header.m_noOfSections == 0);
3606 check_sections(signal, signal->header.m_noOfSections, noOfSections);
3607
3608 info.m_sectionPtr[0].m_linear.p = NULL;
3609 info.m_sectionPtr[1].m_linear.p = NULL;
3610 info.m_sectionPtr[2].m_linear.p = NULL;
3611
3612 Uint32 totalSize = 0;
3613 switch(noOfSections){
3614 case 3:
3615 info.m_sectionPtr[2].m_linear = ptr[2];
3616 totalSize += ptr[2].sz;
3617 // Fall through
3618 case 2:
3619 info.m_sectionPtr[1].m_linear = ptr[1];
3620 totalSize += ptr[1].sz;
3621 // Fall through
3622 case 1:
3623 info.m_sectionPtr[0].m_linear = ptr[0];
3624 totalSize += ptr[0].sz;
3625 }
3626
3627 if(totalSize <= messageSize + SectionSegment::DataLength){
3628 /**
3629 * Send signal directly
3630 */
3631 sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3632 info.m_status = FragmentSendInfo::SendComplete;
3633
3634 /**
3635 * Indicate to sendLinearSignalFragment
3636 * that we'r already done
3637 */
3638 return true;
3639 }
3640
3641 /**
3642 * Setup info object
3643 */
3644 info.m_status = FragmentSendInfo::SendNotComplete;
3645 info.m_prio = (Uint8)jbuf;
3646 info.m_gsn = gsn;
3647 info.m_messageSize = messageSize;
3648 info.m_fragInfo = 1;
3649 info.m_flags = 0;
3650 info.m_fragmentId = c_fragmentIdCounter++;
3651 info.m_nodeReceiverGroup = rg;
3652 info.m_callback.m_callbackFunction = 0;
3653
3654 Ptr<SectionSegment> tmp;
3655 if(unlikely(!import(tmp, &signal->theData[0], length)))
3656 {
3657 handle_out_of_longsignal_memory(0);
3658 return false;
3659 }
3660
3661 info.m_theDataSection.p = &tmp.p->theData[0];
3662 info.m_theDataSection.sz = length;
3663 tmp.p->theData[length] = tmp.i;
3664
3665 sendNextLinearFragment(signal, info);
3666
3667 if(c_fragmentIdCounter == 0){
3668 /**
3669 * Fragment id 0 is invalid
3670 */
3671 c_fragmentIdCounter = 1;
3672 }
3673
3674 return true;
3675 }
3676
3677 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3678 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3679 FragmentSendInfo & info){
3680
3681 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3682 {
3683 /* Send was cancelled - all dest. nodes have failed
3684 * since send was started
3685 */
3686 /* Free inline signal data storage section */
3687 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3688 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3689
3690 info.m_status = FragmentSendInfo::SendComplete;
3691 return;
3692 }
3693
3694 /**
3695 * Store "theData"
3696 */
3697 const Uint32 sigLen = info.m_theDataSection.sz;
3698 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3699
3700 Uint32 sz = 0;
3701 Uint32 maxSz = info.m_messageSize;
3702
3703 Int32 secNo = 2;
3704 Uint32 secCount = 0;
3705 Uint32 * secNos = &signal->theData[sigLen];
3706 LinearSectionPtr signalPtr[3];
3707
3708 enum { Unknown = 0, Full = 2 } loop = Unknown;
3709 for(; secNo >= 0 && secCount < 3; secNo--){
3710 Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3711 if(ptrP == NULL)
3712 continue;
3713
3714 info.m_sectionPtr[secNo].m_linear.p = NULL;
3715 const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3716
3717 signalPtr[secCount].p = ptrP;
3718 signalPtr[secCount].sz = size;
3719 secNos[secCount] = secNo;
3720 secCount++;
3721
3722 const Uint32 sizeLeft = maxSz - sz;
3723 if(size <= sizeLeft){
3724 /**
3725 * The section fits
3726 */
3727 sz += size;
3728 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3729 continue;
3730 }
3731
3732 const Uint32 overflow = size - sizeLeft; // > 0
3733 if(overflow <= SectionSegment::DataLength){
3734 /**
3735 * Only one segment left to send
3736 * send even if sizeLeft <= size
3737 */
3738 lsout(ndbout_c("section %d saved as %d but full over: %d",
3739 secNo, secCount-1, overflow));
3740 secNo--;
3741 break;
3742 }
3743
3744 // size >= 61
3745 if(sizeLeft < SectionSegment::DataLength){
3746 /**
3747 * Less than one segment left (space)
3748 * dont bother sending
3749 */
3750 secCount--;
3751 info.m_sectionPtr[secNo].m_linear.p = ptrP;
3752 loop = Full;
3753 lsout(ndbout_c("section %d not saved", secNo));
3754 break;
3755 }
3756
3757 /**
3758 * Split list
3759 * 1) Find place to split
3760 * 2) Rewrite header (the part that will be sent)
3761 * 3) Write new header (for remaining part)
3762 * 4) Store new header on FragmentSendInfo - record
3763 */
3764 Uint32 sum = sizeLeft;
3765 sum /= SectionSegment::DataLength;
3766 sum *= SectionSegment::DataLength;
3767
3768 /**
3769 * Rewrite header w.r.t size
3770 */
3771 Uint32 prev = secCount - 1;
3772 signalPtr[prev].sz = sum;
3773
3774 /**
3775 * Write/store "new" header
3776 */
3777 info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3778 info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3779
3780 loop = Full;
3781 lsout(ndbout_c("section %d split into %d", secNo, prev));
3782 break;
3783 }
3784
3785 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3786 loop, secNo, secCount, sz));
3787
3788 /**
3789 * Store fragment id
3790 */
3791 secNos[secCount] = info.m_fragmentId;
3792
3793 Uint32 fragInfo = info.m_fragInfo;
3794 info.m_fragInfo = 2;
3795 switch(loop){
3796 case Unknown:
3797 if(secNo >= 0){
3798 lsout(ndbout_c("Unknown - Full"));
3799 /**
3800 * Not finished
3801 */
3802 break;
3803 }
3804 // Fall through
3805 lsout(ndbout_c("Unknown - Done"));
3806 info.m_status = FragmentSendInfo::SendComplete;
3807 ndbassert(fragInfo == 2);
3808 fragInfo = 3;
3809 case Full:
3810 break;
3811 }
3812
3813 signal->header.m_noOfSections = 0;
3814 signal->header.m_fragmentInfo = fragInfo;
3815
3816 sendSignal(info.m_nodeReceiverGroup,
3817 info.m_gsn,
3818 signal,
3819 sigLen + secCount + 1,
3820 (JobBufferLevel)info.m_prio,
3821 signalPtr,
3822 secCount);
3823
3824 if(fragInfo == 3){
3825 /**
3826 * This is the last signal
3827 */
3828 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3829 }
3830 }
3831
3832 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3833 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3834 GlobalSignalNumber gsn,
3835 Signal* signal,
3836 Uint32 length,
3837 JobBufferLevel jbuf,
3838 SectionHandle* sections,
3839 Callback & c,
3840 Uint32 messageSize){
3841 bool res = true;
3842 Ptr<FragmentSendInfo> tmp;
3843 res = c_segmentedFragmentSendList.seizeFirst(tmp);
3844 ndbrequire(res);
3845
3846 res = sendFirstFragment(* tmp.p,
3847 NodeReceiverGroup(ref),
3848 gsn,
3849 signal,
3850 length,
3851 jbuf,
3852 sections,
3853 false, // Release sections on send
3854 messageSize);
3855 ndbrequire(res);
3856
3857 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3858 c_segmentedFragmentSendList.release(tmp);
3859 if(c.m_callbackFunction != 0)
3860 execute(signal, c, 0);
3861 return;
3862 }
3863 tmp.p->m_callback = c;
3864
3865 if(!c_fragSenderRunning)
3866 {
3867 SaveSignal<2> save(signal);
3868 c_fragSenderRunning = true;
3869 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3870 sig->type = ContinueFragmented::CONTINUE_SENDING;
3871 sig->line = __LINE__;
3872 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3873 }
3874 }
3875
3876 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3877 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3878 GlobalSignalNumber gsn,
3879 Signal* signal,
3880 Uint32 length,
3881 JobBufferLevel jbuf,
3882 SectionHandle * sections,
3883 Callback & c,
3884 Uint32 messageSize){
3885 bool res = true;
3886 Ptr<FragmentSendInfo> tmp;
3887 res = c_segmentedFragmentSendList.seizeFirst(tmp);
3888 ndbrequire(res);
3889
3890 res = sendFirstFragment(* tmp.p,
3891 rg,
3892 gsn,
3893 signal,
3894 length,
3895 jbuf,
3896 sections,
3897 false, // Release sections on send
3898 messageSize);
3899 ndbrequire(res);
3900
3901 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3902 c_segmentedFragmentSendList.release(tmp);
3903 if(c.m_callbackFunction != 0)
3904 execute(signal, c, 0);
3905 return;
3906 }
3907 tmp.p->m_callback = c;
3908
3909 if(!c_fragSenderRunning)
3910 {
3911 SaveSignal<2> save(signal);
3912 c_fragSenderRunning = true;
3913 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3914 sig->type = ContinueFragmented::CONTINUE_SENDING;
3915 sig->line = __LINE__;
3916 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3917 }
3918 }
3919
3920 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3921 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3922 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3923 { abort(); /* should never be called */ }
3924 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3925 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3926
3927 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3928 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3929 GlobalSignalNumber gsn,
3930 Signal* signal,
3931 Uint32 length,
3932 JobBufferLevel jbuf,
3933 LinearSectionPtr ptr[3],
3934 Uint32 noOfSections,
3935 Callback & c,
3936 Uint32 messageSize){
3937 bool res = true;
3938 Ptr<FragmentSendInfo> tmp;
3939 res = c_linearFragmentSendList.seizeFirst(tmp);
3940 ndbrequire(res);
3941
3942 res = sendFirstFragment(* tmp.p,
3943 NodeReceiverGroup(ref),
3944 gsn,
3945 signal,
3946 length,
3947 jbuf,
3948 ptr,
3949 noOfSections,
3950 messageSize);
3951 ndbrequire(res);
3952
3953 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3954 c_linearFragmentSendList.release(tmp);
3955 if(c.m_callbackFunction != 0)
3956 execute(signal, c, 0);
3957 return;
3958 }
3959 tmp.p->m_callback = c;
3960
3961 if(!c_fragSenderRunning)
3962 {
3963 SaveSignal<2> save(signal);
3964 c_fragSenderRunning = true;
3965 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3966 sig->type = ContinueFragmented::CONTINUE_SENDING;
3967 sig->line = __LINE__;
3968 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3969 }
3970 }
3971
3972 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3973 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3974 GlobalSignalNumber gsn,
3975 Signal* signal,
3976 Uint32 length,
3977 JobBufferLevel jbuf,
3978 LinearSectionPtr ptr[3],
3979 Uint32 noOfSections,
3980 Callback & c,
3981 Uint32 messageSize){
3982 bool res = true;
3983 Ptr<FragmentSendInfo> tmp;
3984 res = c_linearFragmentSendList.seizeFirst(tmp);
3985 ndbrequire(res);
3986
3987 res = sendFirstFragment(* tmp.p,
3988 rg,
3989 gsn,
3990 signal,
3991 length,
3992 jbuf,
3993 ptr,
3994 noOfSections,
3995 messageSize);
3996 ndbrequire(res);
3997
3998 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3999 c_linearFragmentSendList.release(tmp);
4000 if(c.m_callbackFunction != 0)
4001 execute(signal, c, 0);
4002 return;
4003 }
4004 tmp.p->m_callback = c;
4005
4006 if(!c_fragSenderRunning)
4007 {
4008 SaveSignal<2> save(signal);
4009 c_fragSenderRunning = true;
4010 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
4011 sig->type = ContinueFragmented::CONTINUE_SENDING;
4012 sig->line = __LINE__;
4013 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
4014 }
4015 }
4016
4017 void
sendBatchedFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Callback & c,Uint32 messageSize)4018 SimulatedBlock::sendBatchedFragmentedSignal(BlockReference ref,
4019 GlobalSignalNumber gsn,
4020 Signal* signal,
4021 Uint32 length,
4022 JobBufferLevel jbuf,
4023 SectionHandle* sections,
4024 bool noRelease,
4025 Callback & c,
4026 Uint32 messageSize)
4027 {
4028 jam();
4029 bool res = true;
4030 FragmentSendInfo fragSendInfo;
4031
4032 const Uint32 noOfSections = sections->m_cnt;
4033 SegmentedSectionPtr * const ptr = sections->m_ptr;
4034 const Uint32 totalSize =
4035 (noOfSections >= 1 ? ptr[0].sz : 0) +
4036 (noOfSections >= 2 ? ptr[1].sz : 0) +
4037 (noOfSections >= 3 ? ptr[2].sz : 0);
4038
4039 res = sendFirstFragment(fragSendInfo,
4040 NodeReceiverGroup(ref),
4041 gsn,
4042 signal,
4043 length,
4044 jbuf,
4045 sections,
4046 noRelease,
4047 messageSize);
4048 ndbrequire(res);
4049
4050 Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4051
4052 while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4053 {
4054 jam();
4055 guard--;
4056 // Send remaining fragments
4057 sendNextSegmentedFragment(signal, fragSendInfo);
4058 }
4059
4060 ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4061
4062 if (c.m_callbackFunction != nullptr)
4063 {
4064 jam();
4065 execute(signal, c, 0);
4066 }
4067 return;
4068 }
4069
4070 void
sendBatchedFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Callback & c,Uint32 messageSize)4071 SimulatedBlock::sendBatchedFragmentedSignal(NodeReceiverGroup rg,
4072 GlobalSignalNumber gsn,
4073 Signal* signal,
4074 Uint32 length,
4075 JobBufferLevel jbuf,
4076 SectionHandle * sections,
4077 bool noRelease,
4078 Callback & c,
4079 Uint32 messageSize)
4080 {
4081 jam();
4082 bool res = true;
4083 FragmentSendInfo fragSendInfo;
4084
4085 const Uint32 noOfSections = sections->m_cnt;
4086 SegmentedSectionPtr * const ptr = sections->m_ptr;
4087 const Uint32 totalSize =
4088 (noOfSections >= 1 ? ptr[0].sz : 0) +
4089 (noOfSections >= 2 ? ptr[1].sz : 0) +
4090 (noOfSections >= 3 ? ptr[2].sz : 0);
4091
4092 res = sendFirstFragment(fragSendInfo,
4093 rg,
4094 gsn,
4095 signal,
4096 length,
4097 jbuf,
4098 sections,
4099 noRelease,
4100 messageSize);
4101 ndbrequire(res);
4102
4103 Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4104
4105 while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4106 {
4107 jam();
4108 guard--;
4109 // Send remaining fragments
4110 sendNextSegmentedFragment(signal, fragSendInfo);
4111 }
4112
4113 ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4114
4115 if (c.m_callbackFunction != nullptr)
4116 {
4117 jam();
4118 execute(signal, c, 0);
4119 }
4120 return;
4121 }
4122
4123 void
sendBatchedFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)4124 SimulatedBlock::sendBatchedFragmentedSignal(BlockReference ref,
4125 GlobalSignalNumber gsn,
4126 Signal* signal,
4127 Uint32 length,
4128 JobBufferLevel jbuf,
4129 LinearSectionPtr ptr[3],
4130 Uint32 noOfSections,
4131 Callback & c,
4132 Uint32 messageSize)
4133 {
4134 jam();
4135 bool res = true;
4136 FragmentSendInfo fragSendInfo;
4137
4138 res = sendFirstFragment(fragSendInfo,
4139 NodeReceiverGroup(ref),
4140 gsn,
4141 signal,
4142 length,
4143 jbuf,
4144 ptr,
4145 noOfSections,
4146 messageSize);
4147 ndbrequire(res);
4148
4149 const Uint32 totalSize =
4150 (noOfSections >= 1 ? ptr[0].sz : 0) +
4151 (noOfSections >= 2 ? ptr[1].sz : 0) +
4152 (noOfSections >= 3 ? ptr[2].sz : 0);
4153
4154 Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4155
4156 while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4157 {
4158 jam();
4159 guard--;
4160 // Send remaining fragments
4161 sendNextLinearFragment(signal, fragSendInfo);
4162 }
4163
4164 ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4165
4166 if (c.m_callbackFunction != nullptr)
4167 {
4168 execute(signal, c, 0);
4169 }
4170 return;
4171 }
4172
4173 void
sendBatchedFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)4174 SimulatedBlock::sendBatchedFragmentedSignal(NodeReceiverGroup rg,
4175 GlobalSignalNumber gsn,
4176 Signal* signal,
4177 Uint32 length,
4178 JobBufferLevel jbuf,
4179 LinearSectionPtr ptr[3],
4180 Uint32 noOfSections,
4181 Callback & c,
4182 Uint32 messageSize)
4183 {
4184 jam();
4185 bool res = true;
4186 FragmentSendInfo fragSendInfo;
4187
4188 res = sendFirstFragment(fragSendInfo,
4189 rg,
4190 gsn,
4191 signal,
4192 length,
4193 jbuf,
4194 ptr,
4195 noOfSections,
4196 messageSize);
4197 ndbrequire(res);
4198
4199 const Uint32 totalSize =
4200 (noOfSections >= 1 ? ptr[0].sz : 0) +
4201 (noOfSections >= 2 ? ptr[1].sz : 0) +
4202 (noOfSections >= 3 ? ptr[2].sz : 0);
4203
4204 Uint32 guard = totalSize / messageSize + 1 + noOfSections;
4205
4206 while (guard > 0 && fragSendInfo.m_status != FragmentSendInfo::SendComplete)
4207 {
4208 jam();
4209 guard--;
4210 // Send remaining fragments
4211 sendNextLinearFragment(signal, fragSendInfo);
4212 }
4213
4214 ndbrequire(fragSendInfo.m_status == FragmentSendInfo::SendComplete);
4215
4216 if (c.m_callbackFunction != nullptr)
4217 {
4218 execute(signal, c, 0);
4219 }
4220 return;
4221 }
4222
4223
4224 NodeInfo &
setNodeInfo(NodeId nodeId)4225 SimulatedBlock::setNodeInfo(NodeId nodeId) {
4226 ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
4227 return globalData.m_nodeInfo[nodeId];
4228 }
4229
4230 bool
isMultiThreaded()4231 SimulatedBlock::isMultiThreaded()
4232 {
4233 #ifdef NDBD_MULTITHREADED
4234 return true;
4235 #else
4236 return false;
4237 #endif
4238 }
4239
4240
4241 void
execUTIL_CREATE_LOCK_REF(Signal * signal)4242 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
4243 jamEntry();
4244 c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
4245 }
4246
execUTIL_CREATE_LOCK_CONF(Signal * signal)4247 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
4248 jamEntry();
4249 c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
4250 }
4251
execUTIL_DESTORY_LOCK_REF(Signal * signal)4252 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
4253 jamEntry();
4254 c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
4255 }
4256
execUTIL_DESTORY_LOCK_CONF(Signal * signal)4257 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
4258 jamEntry();
4259 c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
4260 }
4261
execUTIL_LOCK_REF(Signal * signal)4262 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
4263 jamEntry();
4264 c_mutexMgr.execUTIL_LOCK_REF(signal);
4265 }
4266
execUTIL_LOCK_CONF(Signal * signal)4267 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
4268 jamEntry();
4269 c_mutexMgr.execUTIL_LOCK_CONF(signal);
4270 }
4271
execUTIL_UNLOCK_REF(Signal * signal)4272 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
4273 jamEntry();
4274 c_mutexMgr.execUTIL_UNLOCK_REF(signal);
4275 }
4276
execUTIL_UNLOCK_CONF(Signal * signal)4277 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
4278 jamEntry();
4279 c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
4280 }
4281
4282 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)4283 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
4284 Uint32 ptrI, Uint32 retVal){
4285 c_mutexMgr.release(ptrI);
4286 }
4287
4288 void
fsRefError(Signal * signal,Uint32 line,const char * msg)4289 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
4290 {
4291 const FsRef *fsRef = (FsRef*)signal->getDataPtr();
4292 Uint32 errorCode = fsRef->errorCode;
4293 Uint32 osErrorCode = fsRef->osErrorCode;
4294 char msg2[100];
4295
4296 sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
4297
4298 progError(line, errorCode, msg2);
4299 }
4300
4301 void
execFSWRITEREF(Signal * signal)4302 SimulatedBlock::execFSWRITEREF(Signal* signal)
4303 {
4304 fsRefError(signal, __LINE__, "File system write failed");
4305 }
4306
4307 void
execFSREADREF(Signal * signal)4308 SimulatedBlock::execFSREADREF(Signal* signal)
4309 {
4310 fsRefError(signal, __LINE__, "File system read failed");
4311 }
4312
4313 void
execFSCLOSEREF(Signal * signal)4314 SimulatedBlock::execFSCLOSEREF(Signal* signal)
4315 {
4316 fsRefError(signal, __LINE__, "File system close failed");
4317 }
4318
4319 void
execFSOPENREF(Signal * signal)4320 SimulatedBlock::execFSOPENREF(Signal* signal)
4321 {
4322 fsRefError(signal, __LINE__, "File system open failed");
4323 }
4324
4325 void
execFSREMOVEREF(Signal * signal)4326 SimulatedBlock::execFSREMOVEREF(Signal* signal)
4327 {
4328 fsRefError(signal, __LINE__, "File system remove failed");
4329 }
4330
4331 void
execFSSYNCREF(Signal * signal)4332 SimulatedBlock::execFSSYNCREF(Signal* signal)
4333 {
4334 fsRefError(signal, __LINE__, "File system sync failed");
4335 }
4336
4337 void
execFSAPPENDREF(Signal * signal)4338 SimulatedBlock::execFSAPPENDREF(Signal* signal)
4339 {
4340 fsRefError(signal, __LINE__, "File system append failed");
4341 }
4342
4343 #if defined(USE_INIT_GLOBAL_VARIABLES)
4344 void
disable_global_variables()4345 SimulatedBlock::disable_global_variables()
4346 {
4347 #ifdef NDBD_MULTITHREADED
4348 mt_disable_global_variables(m_threadId);
4349 #endif
4350 }
4351
4352 void
enable_global_variables()4353 SimulatedBlock::enable_global_variables()
4354 {
4355 #ifdef NDBD_MULTITHREADED
4356 mt_enable_global_variables(m_threadId);
4357 #endif
4358 }
4359
4360 void
init_global_ptrs(void ** tmp,size_t cnt)4361 SimulatedBlock::init_global_ptrs(void ** tmp, size_t cnt)
4362 {
4363 #ifdef NDBD_MULTITHREADED
4364 mt_init_global_variables_ptr_instances(m_threadId, tmp, cnt);
4365 #endif
4366 }
4367
4368 void
init_global_uint32_ptrs(void ** tmp,size_t cnt)4369 SimulatedBlock::init_global_uint32_ptrs(void ** tmp, size_t cnt)
4370 {
4371 #ifdef NDBD_MULTITHREADED
4372 mt_init_global_variables_uint32_ptr_instances(m_threadId, tmp, cnt);
4373 #endif
4374 }
4375
4376 void
init_global_uint32(void ** tmp,size_t cnt)4377 SimulatedBlock::init_global_uint32(void ** tmp, size_t cnt)
4378 {
4379 #ifdef NDBD_MULTITHREADED
4380 mt_init_global_variables_uint32_instances(m_threadId, tmp, cnt);
4381 #endif
4382 }
4383 #endif
4384
4385 int
cmp_key(Uint32 tab,const Uint32 * s1,const Uint32 * s2) const4386 SimulatedBlock::cmp_key(Uint32 tab, const Uint32 *s1, const Uint32 *s2) const
4387 {
4388 const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
4389 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4390
4391 for (Uint32 i = 0; i < noOfKeyAttr; i++)
4392 {
4393 const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
4394 const Uint32 attrDesc = keyAttr.attributeDescriptor;
4395 const Uint32 srcBytes = AttributeDescriptor::getSizeInBytes(attrDesc);
4396
4397 const int res = cmp_attr(attrDesc, keyAttr.charsetInfo,
4398 s1, srcBytes, s2, srcBytes);
4399 if (res != 0)
4400 return res;
4401
4402 if (i+1 < noOfKeyAttr) //Optimization; skip if last keyAttr
4403 {
4404 const Uint32 typeId = AttributeDescriptor::getType(attrDesc);
4405 Uint32 lb, len;
4406 ndbrequire(NdbSqlUtil::get_var_length(typeId, s1, srcBytes, lb, len));
4407 s1 += ((len+lb+3) >> 2);
4408
4409 ndbrequire(NdbSqlUtil::get_var_length(typeId, s2, srcBytes, lb, len));
4410 s2 += ((len+lb+3) >> 2);
4411 }
4412 }
4413 // Fall through: Compared equal
4414 return 0;
4415 }
4416
4417 int
cmp_attr(Uint32 attrDesc,const CHARSET_INFO * cs,const Uint32 * s1,Uint32 s1Len,const Uint32 * s2,Uint32 s2Len) const4418 SimulatedBlock::cmp_attr(Uint32 attrDesc, const CHARSET_INFO* cs,
4419 const Uint32 *s1, Uint32 s1Len,
4420 const Uint32 *s2, Uint32 s2Len) const
4421 {
4422 const Uint32 typeId = AttributeDescriptor::getType(attrDesc);
4423 NdbSqlUtil::Cmp *cmp = NdbSqlUtil::getType(typeId).m_cmp;
4424 return (*cmp)(cs, s1, s1Len, s2, s1Len);
4425 }
4426
4427
4428 Uint32
xfrm_key_hash(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const4429 SimulatedBlock::xfrm_key_hash(
4430 Uint32 tab, const Uint32* src,
4431 Uint32 *dst, Uint32 dstSize,
4432 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
4433 {
4434 const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
4435 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4436
4437 Uint32 i = 0;
4438 Uint32 srcPos = 0;
4439 Uint32 dstPos = 0;
4440 while (i < noOfKeyAttr)
4441 {
4442 const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
4443 Uint32 dstWords =
4444 xfrm_attr_hash(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
4445 src, srcPos, dst, dstPos, dstSize);
4446 keyPartLen[i++] = dstWords;
4447 if (unlikely(dstWords == 0))
4448 return 0;
4449 }
4450
4451 if (0)
4452 {
4453 for(Uint32 i = 0; i<dstPos; i++)
4454 {
4455 printf("%.8x ", dst[i]);
4456 }
4457 printf("\n");
4458 }
4459 return dstPos;
4460 }
4461
4462 Uint32
xfrm_attr_hash(Uint32 attrDesc,const CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const4463 SimulatedBlock::xfrm_attr_hash(
4464 Uint32 attrDesc, const CHARSET_INFO* cs,
4465 const Uint32* src, Uint32 & srcPos,
4466 Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
4467 {
4468 Uint32 array =
4469 AttributeDescriptor::getArrayType(attrDesc);
4470 Uint32 srcBytes =
4471 AttributeDescriptor::getSizeInBytes(attrDesc);
4472
4473 Uint32 srcWords = ~0;
4474 Uint32 dstWords = ~0;
4475 uchar* dstPtr = (uchar*)&dst[dstPos];
4476 const uchar* srcPtr = (const uchar*)&src[srcPos];
4477
4478 if (cs == NULL)
4479 {
4480 jam();
4481 Uint32 len = 0;
4482 switch(array){
4483 case NDB_ARRAYTYPE_SHORT_VAR:
4484 len = 1 + srcPtr[0];
4485 break;
4486 case NDB_ARRAYTYPE_MEDIUM_VAR:
4487 len = 2 + srcPtr[0] + (srcPtr[1] << 8);
4488 break;
4489 #ifndef VM_TRACE
4490 default:
4491 abort();
4492 #endif
4493 case NDB_ARRAYTYPE_FIXED:
4494 len = srcBytes;
4495 }
4496 srcWords = (len + 3) >> 2;
4497 dstWords = srcWords;
4498 memcpy(dstPtr, srcPtr, dstWords << 2);
4499
4500 if (0)
4501 {
4502 ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
4503 srcPos, dstPos, len, srcWords, dstWords);
4504
4505 for(Uint32 i = 0; i<srcWords; i++)
4506 printf("%.8x ", src[srcPos + i]);
4507 printf("\n");
4508 }
4509 }
4510 else
4511 {
4512 jam();
4513 Uint32 typeId =
4514 AttributeDescriptor::getType(attrDesc);
4515 Uint32 lb, len;
4516 bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
4517 if (unlikely(!ok))
4518 return 0;
4519
4520 // remLen: Remaining dst-buffer length
4521 // len: Actual length of 'src'
4522 // defLen: Max defined length of src data
4523 const unsigned remLen = ((dstSize - dstPos) << 2);
4524 const unsigned defLen = srcBytes - lb;
4525 int n = NdbSqlUtil::strnxfrm_hash(cs, typeId,
4526 dstPtr, remLen,
4527 srcPtr + lb, len, defLen);
4528
4529 if (unlikely(n == -1))
4530 return 0;
4531 while ((n & 3) != 0)
4532 {
4533 dstPtr[n++] = 0;
4534 }
4535 dstWords = (n >> 2);
4536 srcWords = (lb + len + 3) >> 2;
4537 }
4538
4539 dstPos += dstWords;
4540 srcPos += srcWords;
4541 return dstWords;
4542 }
4543
4544 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const4545 SimulatedBlock::create_distr_key(Uint32 tableId,
4546 const Uint32 *src,
4547 Uint32* dst,
4548 const Uint32
4549 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
4550 {
4551 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
4552 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
4553 Uint32 noOfDistrKeys = desc->noOfDistrKeys;
4554
4555 Uint32 i = 0;
4556 Uint32 dstPos = 0;
4557
4558 /* --Note that src and dst may be the same location-- */
4559
4560 if(keyPartLen)
4561 {
4562 while (i < noOfKeyAttr && noOfDistrKeys)
4563 {
4564 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
4565 Uint32 len = keyPartLen[i];
4566 if(AttributeDescriptor::getDKey(attr))
4567 {
4568 noOfDistrKeys--;
4569 memmove(dst+dstPos, src, len << 2);
4570 dstPos += len;
4571 }
4572 src += len;
4573 i++;
4574 }
4575 }
4576 else
4577 {
4578 while (i < noOfKeyAttr && noOfDistrKeys)
4579 {
4580 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
4581 Uint32 len = AttributeDescriptor::getSizeInWords(attr);
4582 ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
4583 if(AttributeDescriptor::getDKey(attr))
4584 {
4585 noOfDistrKeys--;
4586 memmove(dst+dstPos, src, len << 2);
4587 dstPos += len;
4588 }
4589 src += len;
4590 i++;
4591 }
4592 }
4593 return dstPos;
4594 }
4595
// Global array of KeyDescriptor records.  The key helpers in this file
// (cmp_key, xfrm_key_hash, create_distr_key) look up a table's descriptor
// with getPtr(<table id>).
CArray<KeyDescriptor> g_key_descriptor_pool;
4597
4598 void
sendRoutedSignal(RoutePath path[],Uint32 pathcnt,Uint32 dst[],Uint32 dstcnt,Uint32 gsn,Signal * signal,Uint32 sigLen,JobBufferLevel prio,SectionHandle * userhandle)4599 SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
4600 Uint32 dst[],
4601 Uint32 dstcnt,
4602 Uint32 gsn,
4603 Signal * signal,
4604 Uint32 sigLen,
4605 JobBufferLevel prio,
4606 SectionHandle * userhandle)
4607 {
4608 ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
4609 pathcnt--; // first hop is made from here
4610
4611
4612 Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
4613 ndbrequire(len <= 25);
4614
4615 SectionHandle handle(this, signal);
4616 if (userhandle)
4617 {
4618 jam();
4619 handle.m_cnt = userhandle->m_cnt;
4620 for (Uint32 i = 0; i<handle.m_cnt; i++)
4621 handle.m_ptr[i] = userhandle->m_ptr[i];
4622 userhandle->m_cnt = 0;
4623 }
4624
4625 if (len + sigLen > 25)
4626 {
4627 jam();
4628
4629 /**
4630 * we need to store theData in a section
4631 */
4632 ndbrequire(handle.m_cnt < 3);
4633 handle.m_ptr[2] = handle.m_ptr[1];
4634 handle.m_ptr[1] = handle.m_ptr[0];
4635 Ptr<SectionSegment> tmp;
4636 if (unlikely(! import(tmp, signal->theData, sigLen)))
4637 {
4638 handle_out_of_longsignal_memory(0);
4639 }
4640 handle.m_ptr[0].p = tmp.p;
4641 handle.m_ptr[0].i = tmp.i;
4642 handle.m_ptr[0].sz = sigLen;
4643 handle.m_cnt ++;
4644 }
4645 else
4646 {
4647 jam();
4648 memmove(signal->theData + len, signal->theData, 4 * sigLen);
4649 len += sigLen;
4650 }
4651
4652 LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
4653 ord->cnt = (pathcnt << 16) | (dstcnt);
4654 ord->gsn = gsn;
4655 ord->prio = Uint32(prio);
4656
4657 Uint32 * dstptr = ord->path;
4658 for (Uint32 i = 1; i <= pathcnt; i++)
4659 {
4660 ndbrequire(refToNode(path[i].ref) == 0 ||
4661 refToNode(path[i].ref) == getOwnNodeId());
4662
4663 * dstptr++ = path[i].ref;
4664 * dstptr++ = Uint32(path[i].prio);
4665 }
4666
4667 for (Uint32 i = 0; i<dstcnt; i++)
4668 {
4669 ndbrequire(refToNode(dst[i]) == 0 ||
4670 refToNode(dst[i]) == getOwnNodeId());
4671
4672 * dstptr++ = dst[i];
4673 }
4674
4675 sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4676 path[0].prio, &handle);
4677 }
4678
4679 void
execLOCAL_ROUTE_ORD(Signal * signal)4680 SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
4681 {
4682 jamEntry();
4683
4684 if (!assembleFragments(signal))
4685 {
4686 jam();
4687 return;
4688 }
4689
4690 if (ERROR_INSERTED(1001))
4691 {
4692 /**
4693 * This NDBCNTR error code 1001
4694 */
4695 jam();
4696 SectionHandle handle(this, signal);
4697 sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
4698 signal->getLength(), &handle);
4699 return;
4700 }
4701
4702 LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
4703 Uint32 pathcnt = ord->cnt >> 16;
4704 Uint32 dstcnt = ord->cnt & 0xFFFF;
4705 Uint32 sigLen = signal->getLength();
4706
4707 if (pathcnt == 0)
4708 {
4709 /**
4710 * Send to final destination(s);
4711 */
4712 jam();
4713 Uint32 gsn = ord->gsn;
4714 Uint32 prio = ord->prio;
4715 memcpy(signal->theData+25, ord->path, 4*dstcnt);
4716 SectionHandle handle(this, signal);
4717 if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
4718 {
4719 jam();
4720 /**
4721 * Data is at end of this...
4722 */
4723 memmove(signal->theData,
4724 signal->theData + LocalRouteOrd::StaticLen + dstcnt,
4725 4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
4726 sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4727 }
4728 else
4729 {
4730 jam();
4731 /**
4732 * Put section 0 in signal->theData
4733 */
4734 sigLen = handle.m_ptr[0].sz;
4735 ndbrequire(sigLen <= 25);
4736 copy(signal->theData, handle.m_ptr[0]);
4737 release(handle.m_ptr[0]);
4738
4739 for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4740 handle.m_ptr[i] = handle.m_ptr[i+1];
4741 handle.m_cnt--;
4742 }
4743
4744 /*
4745 * The extra if-statement is as sendSignalNoRelease will copy sections
4746 * which is not necessary is only sending to one destination
4747 */
4748 if (dstcnt > 1)
4749 {
4750 jam();
4751 for (Uint32 i = 0; i<dstcnt; i++)
4752 {
4753 jam();
4754 sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4755 JobBufferLevel(prio), &handle);
4756 }
4757 releaseSections(handle);
4758 }
4759 else
4760 {
4761 jam();
4762 sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4763 JobBufferLevel(prio), &handle);
4764 }
4765 }
4766 else
4767 {
4768 /**
4769 * Reroute
4770 */
4771 jam();
4772 SectionHandle handle(this, signal);
4773 Uint32 ref = ord->path[0];
4774 Uint32 prio = ord->path[1];
4775 Uint32 len = sigLen - 2;
4776 ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4777 memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4778 sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4779 JobBufferLevel(prio), &handle);
4780 }
4781 }
4782
4783
4784 #ifdef VM_TRACE
4785 bool
debugOutOn()4786 SimulatedBlock::debugOutOn()
4787 {
4788 return true;
4789 SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4790 return
4791 globalData.testOn &&
4792 globalSignalLoggers.logMatch(number(), mask);
4793 }
4794
4795 const char*
debugOutTag(char * buf,int line)4796 SimulatedBlock::debugOutTag(char *buf, int line)
4797 {
4798 char blockbuf[40];
4799 char instancebuf[40];
4800 char linebuf[40];
4801 char timebuf[40];
4802 sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4803 instancebuf[0] = 0;
4804 if (instance() != 0)
4805 sprintf(instancebuf, "/%u", instance());
4806 sprintf(linebuf, " %d", line);
4807 timebuf[0] = 0;
4808 #ifdef VM_TRACE_TIME
4809 {
4810 Uint64 t = NdbTick_CurrentMillisecond();
4811 uint s = (t / 1000) % 3600;
4812 uint ms = t % 1000;
4813 sprintf(timebuf, " - %u.%03u -", s, ms);
4814 }
4815 #endif
4816 sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4817 return buf;
4818 }
4819 #endif
4820
4821 #ifdef NDBD_MULTITHREADED
4822 // Leave synchronize_threads() undefined for ndbd where it should not be used.
4823 void
synchronize_threads(Signal * signal,const BlockThreadBitmask & threads,const Callback & cb,JobBufferLevel req_prio,JobBufferLevel conf_prio)4824 SimulatedBlock::synchronize_threads(Signal * signal,
4825 const BlockThreadBitmask& threads,
4826 const Callback & cb,
4827 JobBufferLevel req_prio,
4828 JobBufferLevel conf_prio)
4829 {
4830 jam();
4831
4832 Ptr<SyncThreadRecord> ptr;
4833 ndbrequire(c_syncThreadPool.seize(ptr));
4834 ptr.p->m_threads = threads;
4835 ptr.p->m_cnt = 0;
4836 ptr.p->m_next = 0;
4837 ptr.p->m_callback = cb;
4838
4839 const Uint32 cnt = threads.count();
4840 if (cnt == 0)
4841 {
4842 jam();
4843 Callback copy = cb;
4844 c_syncThreadPool.release(ptr);
4845 execute(signal, copy, 0);
4846 return;
4847 }
4848
4849 signal->theData[0] = reference();
4850 signal->theData[1] = ptr.i;
4851 signal->theData[2] = Uint32(req_prio);
4852 signal->theData[3] = Uint32(conf_prio);
4853 ptr.p->m_next = ptr.p->m_threads.find_first();
4854 sendSYNC_THREAD_REQ(signal, ptr);
4855 }
4856 #endif
4857
4858 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel req_prio,JobBufferLevel conf_prio)4859 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4860 const Uint32 blocks[],
4861 const Callback & cb,
4862 JobBufferLevel req_prio,
4863 JobBufferLevel conf_prio)
4864 {
4865 #ifndef NDBD_MULTITHREADED
4866 Callback copy = cb;
4867 execute(signal, copy, 0);
4868 #else
4869 jam();
4870
4871 BlockThreadBitmask threads;
4872
4873 mt_get_threads_for_blocks_no_proxy(blocks, threads);
4874
4875 if (conf_prio == ILLEGAL_JB_LEVEL)
4876 {
4877 conf_prio = req_prio;
4878 }
4879 synchronize_threads(signal, threads, cb, req_prio, conf_prio);
4880 #endif
4881 }
4882
4883 void
sendSYNC_THREAD_REQ(Signal * signal,Ptr<SyncThreadRecord> ptr)4884 SimulatedBlock::sendSYNC_THREAD_REQ(Signal* signal, Ptr<SyncThreadRecord> ptr)
4885 {
4886 JobBufferLevel req_prio = JobBufferLevel(signal->theData[2]);
4887 Uint32 instance = ptr.p->m_next;
4888 constexpr Uint32 MAX_FAN_OUT = 4;
4889 constexpr Uint32 MAX_INFLIGHT = 50;
4890 for (Uint32 fan_out = 0;
4891 ptr.p->m_cnt < MAX_INFLIGHT &&
4892 fan_out < MAX_FAN_OUT &&
4893 instance != BlockThreadBitmask::NotFound;
4894 fan_out++, instance = ptr.p->m_threads.find_next(instance + 1))
4895 {
4896 Uint32 ref = numberToRef(THRMAN, instance, 0);
4897 sendSignal(ref, GSN_SYNC_THREAD_REQ, signal, 4, req_prio);
4898 ptr.p->m_cnt++;
4899 }
4900 ptr.p->m_next = instance;
4901 }
4902
4903 void
execSYNC_THREAD_REQ(Signal * signal)4904 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4905 {
4906 jamEntry();
4907 Uint32 ref = signal->theData[0];
4908 Uint32 conf_prio = signal->theData[3];
4909 sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4910 JobBufferLevel(conf_prio));
4911 }
4912
4913 void
execSYNC_THREAD_CONF(Signal * signal)4914 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4915 {
4916 jamEntry();
4917 Ptr<SyncThreadRecord> ptr;
4918 c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4919
4920 ndbrequire(ptr.p->m_cnt > 0);
4921 ptr.p->m_cnt--;
4922
4923 sendSYNC_THREAD_REQ(signal, ptr);
4924
4925 if (ptr.p->m_cnt > 0)
4926 {
4927 jam();
4928 return;
4929 }
4930
4931 Callback copy = ptr.p->m_callback;
4932 c_syncThreadPool.release(ptr);
4933 execute(signal, copy, 0);
4934 return;
4935 }
4936
4937 void
execSYNC_REQ(Signal * signal)4938 SimulatedBlock::execSYNC_REQ(Signal* signal)
4939 {
4940 jamEntry();
4941 Uint32 ref = signal->theData[0];
4942 Uint32 prio = signal->theData[2];
4943 sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4944 JobBufferLevel(prio));
4945 }
4946
4947 void
synchronize_external_signals(Signal * signal,const Callback & cb)4948 SimulatedBlock::synchronize_external_signals(Signal* signal, const Callback& cb)
4949 {
4950 #ifndef NDBD_MULTITHREADED
4951 Callback copy = cb;
4952 execute(signal, copy, 0);
4953 #else
4954 jam();
4955
4956 BlockThreadBitmask threads;
4957
4958 const Uint32 my_thr_no = getThreadId();
4959 Uint32 cnt = mt_get_addressable_threads(my_thr_no, threads);
4960
4961 // Assume current thread does not need synchronization
4962 if (threads.get(my_thr_no))
4963 {
4964 jam();
4965 threads.clear(my_thr_no);
4966 cnt--;
4967 }
4968
4969 synchronize_threads(signal, threads, cb, JBB, JBA);
4970 #endif
4971 }
4972
4973 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4974 SimulatedBlock::synchronize_path(Signal * signal,
4975 const Uint32 blocks[],
4976 const Callback & cb,
4977 JobBufferLevel prio)
4978 {
4979 jam();
4980
4981 // reuse SyncThreadRecord
4982 Ptr<SyncThreadRecord> ptr;
4983 ndbrequire(c_syncThreadPool.seize(ptr));
4984 ptr.p->m_cnt = 0; // with count of 0
4985 ptr.p->m_callback = cb;
4986
4987 SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4988 req->senderData = ptr.i;
4989 req->prio = Uint32(prio);
4990 req->count = 1;
4991 if (blocks[0] == 0)
4992 {
4993 jam();
4994 ndbabort(); // TODO
4995 }
4996 else
4997 {
4998 jam();
4999 Uint32 len = 0;
5000 for (; blocks[len+1] != 0; len++)
5001 {
5002 req->path[len] = blocks[len+1];
5003 }
5004 req->pathlen = 1 + len;
5005 req->path[len] = reference();
5006 sendSignal(numberToRef(blocks[0], getOwnNodeId()),
5007 GSN_SYNC_PATH_REQ, signal,
5008 SyncPathReq::SignalLength + (1 + len), prio);
5009 }
5010 }
5011
5012 void
execSYNC_PATH_REQ(Signal * signal)5013 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
5014 {
5015 jamEntry();
5016 SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
5017 if (req->pathlen == 1)
5018 {
5019 jam();
5020 SyncPathReq copy = *req;
5021 SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
5022 conf->senderData = copy.senderData;
5023 conf->count = copy.count;
5024 sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
5025 SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
5026 }
5027 else
5028 {
5029 jam();
5030 Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
5031 req->pathlen--;
5032 memmove(req->path, req->path + 1, 4 * req->pathlen);
5033 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
5034 SyncPathReq::SignalLength + (1 + req->pathlen),
5035 JobBufferLevel(req->prio));
5036 }
5037 }
5038
5039 void
execSYNC_PATH_CONF(Signal * signal)5040 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
5041 {
5042 jamEntry();
5043 SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
5044 Ptr<SyncThreadRecord> ptr;
5045
5046 c_syncThreadPool.getPtr(ptr, conf.senderData);
5047
5048 if (ptr.p->m_cnt == 0)
5049 {
5050 jam();
5051 ptr.p->m_cnt = conf.count;
5052 }
5053
5054 if (ptr.p->m_cnt == 1)
5055 {
5056 jam();
5057 Callback copy = ptr.p->m_callback;
5058 c_syncThreadPool.release(ptr);
5059 execute(signal, copy, 0);
5060 return;
5061 }
5062
5063 ptr.p->m_cnt --;
5064 }
5065
5066
5067 bool
checkNodeFailSequence(Signal * signal)5068 SimulatedBlock::checkNodeFailSequence(Signal* signal)
5069 {
5070 Uint32 ref = signal->getSendersBlockRef();
5071
5072 /**
5073 * Make sure that a signal being part of node-failure handling
5074 * from a remote node, does not get to us before we got the NODE_FAILREP
5075 * (this to avoid tricky state handling to some extent when receving
5076 * signals from old nodes)
5077 *
5078 * To ensure this, we send the signal via the transporter for the remote
5079 * sender node via QMGR and NDBCNTR to DBDIH. Although approximating
5080 * synchronization between all threads and transporters with a single hop to
5081 * DBLQH_REF to at least send via another thread for multi threaded data
5082 * node.
5083 *
5084 * The extra time should be negilable
5085 *
5086 * Note, make an exception for signals sent by our self
5087 * as they are only sent as a consequence of NODE_FAILREP
5088 *
5089 * Also note that this function no longer guarantee that signal arrives to
5090 * its destination after corresponding NODE_FAILREP, as a complement caller
5091 * need some further logic delaying the processing of the signal until
5092 * NODE_FAILREP have been seen.
5093 */
5094 if (ref == reference() ||
5095 (refToNode(ref) == getOwnNodeId() &&
5096 refToMain(ref) == DBDIH))
5097 {
5098 jam();
5099 return true;
5100 }
5101
5102 Uint32 trpman_ref;
5103 if (globalData.ndbMtReceiveThreads == 0)
5104 {
5105 jam();
5106 ndbrequire(!isNdbMt());
5107 trpman_ref = TRPMAN_REF;
5108 }
5109 else
5110 {
5111 jam();
5112 ndbrequire(isNdbMt());
5113 Uint32 sender_node = refToNode(ref);
5114 Uint32 inst = (get_recv_thread_idx(sender_node) + /* proxy */ 1);
5115 if (inst > NDBMT_MAX_BLOCK_INSTANCES)
5116 {
5117 jam();
5118 trpman_ref = TRPMAN_REF;
5119 }
5120 else
5121 {
5122 jam();
5123 trpman_ref = numberToRef(TRPMAN, inst, getOwnNodeId());
5124 }
5125 }
5126
5127 RoutePath path[5];
5128 Uint32 path_idx = 0;
5129
5130 /* Start at TRPMAN for sending node */
5131 path[path_idx].ref = trpman_ref;
5132 path[path_idx].prio = JBA;
5133 path_idx++;
5134
5135 /* Follow COMMIT_FAILREQ to QMGR */
5136 path[path_idx].ref = QMGR_REF;
5137 path[path_idx].prio = JBB;
5138 path_idx++;
5139
5140 /*
5141 * Should be sync_threads, but sends only to DBLQH_REF to at least send
5142 * to another thread than main thread (if using a multi threaded data node)
5143 */
5144 path[path_idx].ref = DBLQH_REF;
5145 path[path_idx].prio = JBB;
5146 path_idx++;
5147
5148 /* Follow NODE_FAILREP to NDBCNT */
5149 path[path_idx].ref = NDBCNTR_REF;
5150 path[path_idx].prio = JBB;
5151 path_idx++;
5152
5153 /* Follow NODE_FAILREP to DBDIH */
5154 path[path_idx].ref = DBDIH_REF;
5155 path[path_idx].prio = JBB;
5156 path_idx++;
5157
5158 ndbrequire(path_idx <= NDB_ARRAY_SIZE(path));
5159
5160 Uint32 dst[1];
5161 dst[0] = reference();
5162
5163 SectionHandle handle(this, signal);
5164 Uint32 gsn = signal->header.theVerId_signalNumber;
5165 Uint32 len = signal->getLength();
5166
5167 sendRoutedSignal(path, path_idx, dst, 1, gsn, signal, len, JBB, &handle);
5168 return false;
5169 }
5170
5171 #ifdef ERROR_INSERT
5172 void
setDelayedPrepare()5173 SimulatedBlock::setDelayedPrepare()
5174 {
5175 #ifdef NDBD_MULTITHREADED
5176 mt_set_delayed_prepare(m_threadId);
5177 #else
5178 // ndbd todo
5179 #endif
5180 }
5181 #endif
5182
5183 void
setup_wakeup()5184 SimulatedBlock::setup_wakeup()
5185 {
5186 #ifdef NDBD_MULTITHREADED
5187 #else
5188 globalTransporterRegistry.setup_wakeup_socket();
5189 #endif
5190 }
5191
5192 void
wakeup()5193 SimulatedBlock::wakeup()
5194 {
5195 #ifdef NDBD_MULTITHREADED
5196 mt_wakeup(this);
5197 #else
5198 globalTransporterRegistry.wakeup();
5199 #endif
5200 }
5201
5202
5203 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const5204 SimulatedBlock::ndbinfo_send_row(Signal* signal,
5205 const DbinfoScanReq& req,
5206 const Ndbinfo::Row& row,
5207 Ndbinfo::Ratelimit& rl) const
5208 {
5209 // Check correct number of columns against table
5210 assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
5211
5212 TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
5213 tidai->connectPtr= req.resultData;
5214 tidai->transId[0]= req.transId[0];
5215 tidai->transId[1]= req.transId[1];
5216
5217 LinearSectionPtr ptr[3];
5218 ptr[0].p = row.getDataPtr();
5219 ptr[0].sz = row.getLength();
5220
5221 rl.rows++;
5222 rl.bytes += row.getLength();
5223
5224 sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
5225 signal, TransIdAI::HeaderLength, JBB, ptr, 1);
5226 }
5227
5228
5229 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const5230 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
5231 DbinfoScanReq& req,
5232 const Ndbinfo::Ratelimit& rl,
5233 Uint32 data1, Uint32 data2,
5234 Uint32 data3, Uint32 data4) const
5235 {
5236 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
5237 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
5238 MEMCOPY_NO_WORDS(conf, &req, signal_length);
5239
5240 conf->returnedRows = rl.rows;
5241
5242 // Update the cursor with current item number
5243 Ndbinfo::ScanCursor* cursor =
5244 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
5245
5246 cursor->data[0] = data1;
5247 cursor->data[1] = data2;
5248 cursor->data[2] = data3;
5249 cursor->data[3] = data4;
5250
5251 // Increase number of rows and bytes sent to far
5252 cursor->totalRows += rl.rows;
5253 cursor->totalBytes += rl.bytes;
5254
5255 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
5256
5257 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
5258 signal_length, JBB);
5259 }
5260
5261 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const5262 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
5263 DbinfoScanReq& req,
5264 const Ndbinfo::Ratelimit& rl) const
5265 {
5266 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
5267 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
5268 Uint32 sender_ref = req.resultRef;
5269 MEMCOPY_NO_WORDS(conf, &req, signal_length);
5270
5271 conf->returnedRows = rl.rows;
5272
5273 if (req.cursor_sz)
5274 {
5275 jam();
5276 // Update the cursor with current item number
5277 Ndbinfo::ScanCursor* cursor =
5278 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
5279
5280 // Reset all data holders
5281 memset(cursor->data, 0, sizeof(cursor->data));
5282
5283 // Increase number of rows and bytes sent to far
5284 cursor->totalRows += rl.rows;
5285 cursor->totalBytes += rl.bytes;
5286
5287 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
5288
5289 sender_ref = cursor->senderRef;
5290
5291 }
5292 sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
5293 signal_length, JBB);
5294 }
5295
init_elapsed_time(Signal * signal,NDB_TICKS & latestTIME_SIGNAL)5296 void SimulatedBlock::init_elapsed_time(Signal *signal,
5297 NDB_TICKS &latestTIME_SIGNAL)
5298 {
5299 const NDB_TICKS currentTime = NdbTick_getCurrentTicks();
5300 signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
5301 signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
5302 latestTIME_SIGNAL = currentTime;
5303 sendSignal(reference(), GSN_TIME_SIGNAL, signal, 2, JBB);
5304 }
5305
sendTIME_SIGNAL(Signal * signal,const NDB_TICKS currentTime,Uint32 delay)5306 void SimulatedBlock::sendTIME_SIGNAL(Signal *signal,
5307 const NDB_TICKS currentTime,
5308 Uint32 delay)
5309 {
5310 signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
5311 signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
5312 sendSignalWithDelay(reference(), GSN_TIME_SIGNAL, signal, delay, 2);
5313 }
5314
5315 /*
5316 This function is used to handle TIME_SIGNAL. This signal is intended to
5317 be used sort of like a drum beat. We should execute some timer calls
5318 every so often. However the OS can easily make the delayed signals to
5319 be delayed if the OS is occupied with other things. We will never report
5320 sleeps for longer than twice the expected delay. We rely on the delayed
5321 signal scheduler to ensure that we run time a bit faster for a while
5322 after long sleeps.
5323
5324 This function will return the elapsed time since last time we called it.
5325 */
5326 Uint64
elapsed_time(Signal * signal,const NDB_TICKS currentTime,NDB_TICKS & latestTIME_SIGNAL,Uint32 expected_delay)5327 SimulatedBlock::elapsed_time(Signal *signal,
5328 const NDB_TICKS currentTime,
5329 NDB_TICKS &latestTIME_SIGNAL,
5330 Uint32 expected_delay)
5331 {
5332 const Uint64 elapsed_time =
5333 NdbTick_Elapsed(latestTIME_SIGNAL, currentTime).milliSec();
5334 latestTIME_SIGNAL = currentTime;
5335
5336 if (elapsed_time > Uint64(2 * expected_delay))
5337 {
5338 return Uint64(2 * expected_delay);
5339 }
5340 return elapsed_time;
5341 }
5342
5343 #ifdef VM_TRACE
5344 void
assertOwnThread()5345 SimulatedBlock::assertOwnThread()
5346 {
5347 #ifdef NDBD_MULTITHREADED
5348 mt_assert_own_thread(this);
5349 #endif
5350 }
5351
5352 #endif
5353
5354 Uint32
get_recv_thread_idx(TrpId trp_id)5355 SimulatedBlock::get_recv_thread_idx(TrpId trp_id)
5356 {
5357 #ifdef NDBD_MULTITHREADED
5358 return mt_get_recv_thread_idx(trp_id);
5359 #else
5360 return 0;
5361 #endif
5362 }
5363
5364 #ifndef NDBD_MULTITHREADED
5365 /**
5366 * Add a stub for this function since we have some code in ErrorReporter.cpp
5367 * that needs this function, it's only really needed for ndbmtd, so need an
5368 * empty function in ndbd.
5369 */
5370 void
prepare_to_crash(bool first_phase,bool error_insert_crash)5371 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
5372 {
5373 (void)first_phase;
5374 (void)error_insert_crash;
5375 }
5376 #endif
5377
5378 /**
5379 * Implementation of SegmentUtils
5380 * Here we forward the calls, but
5381 * in ndbmtd, add our thread cache
5382 * + lock function arguments.
5383 */
5384
5385 SectionSegment*
getSegmentPtr(Uint32 iVal)5386 SimulatedBlock::getSegmentPtr(Uint32 iVal)
5387 {
5388 return g_sectionSegmentPool.getPtr(iVal);
5389 }
5390
5391 bool
seizeSegment(Ptr<SectionSegment> & p)5392 SimulatedBlock::seizeSegment(Ptr<SectionSegment>& p)
5393 {
5394 return g_sectionSegmentPool.seize(SB_SP_REL_ARG p);
5395 }
5396
5397 void
releaseSegment(Uint32 iVal)5398 SimulatedBlock::releaseSegment(Uint32 iVal)
5399 {
5400 g_sectionSegmentPool.release(SB_SP_REL_ARG iVal);
5401 }
5402
5403 void
releaseSegmentList(Uint32 firstSegmentIVal)5404 SimulatedBlock::releaseSegmentList(Uint32 firstSegmentIVal)
5405 {
5406 ::releaseSection(SB_SP_ARG firstSegmentIVal);
5407 }
5408
5409 #ifdef NDB_DEBUG_RES_OWNERSHIP
5410 void
lock_global_ssp()5411 SimulatedBlock::lock_global_ssp()
5412 {
5413 #ifdef NDBD_MULTITHREADED
5414 f_section_lock.lock();
5415 #endif
5416 }
5417
5418 void
unlock_global_ssp()5419 SimulatedBlock::unlock_global_ssp()
5420 {
5421 #ifdef NDBD_MULTITHREADED
5422 f_section_lock.unlock();
5423 #endif
5424 }
5425
5426 #endif
5427
5428
5429
5430 /**
5431 * #undef is needed since this file is included by SimulatedBlock_nonmt.cpp
5432 * and SimulatedBlock_mt.cpp
5433 */
5434 #undef JAM_FILE_ID
5435