1 /*
2 Copyright (c) 2003, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /*
26 This file is used to build both the multithreaded and the singlethreaded
27 ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28 the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29 macro not defined).
30 */
31
32 #include <ndb_global.h>
33
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include <signaldata/EventReport.hpp>
47 #include <signaldata/ContinueFragmented.hpp>
48 #include <signaldata/NodeStateSignalData.hpp>
49 #include <signaldata/FsRef.hpp>
50 #include <signaldata/SignalDroppedRep.hpp>
51 #include <signaldata/LocalRouteOrd.hpp>
52 #include <signaldata/TransIdAI.hpp>
53 #include <signaldata/Sync.hpp>
54 #include <DebuggerNames.hpp>
55 #include "LongSignal.hpp"
56
57 #include <Properties.hpp>
58 #include "Configuration.hpp"
59 #include <AttributeDescriptor.hpp>
60 #include <NdbSqlUtil.hpp>
61
62 #include "../blocks/dbdih/Dbdih.hpp"
63 #include <signaldata/CallbackSignal.hpp>
64 #include "LongSignalImpl.hpp"
65
66 #include "KeyDescriptor.hpp"
67
68 #include <EventLogger.hpp>
69
70 #define JAM_FILE_ID 252
71
72 extern EventLogger * g_eventLogger;
73
74 //
75 // Constructor, Destructor
76 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)77 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
78 struct Block_context & ctx,
79 Uint32 instanceNumber)
80 : theNodeId(globalData.ownId),
81 theNumber(blockNumber),
82 theInstance(instanceNumber),
83 theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
84 theInstanceList(0),
85 theMainInstance(0),
86 m_ctx(ctx),
87 m_global_page_pool(globalData.m_global_page_pool),
88 m_shared_page_pool(globalData.m_shared_page_pool),
89 c_fragmentInfoHash(c_fragmentInfoPool),
90 c_linearFragmentSendList(c_fragmentSendPool),
91 c_segmentedFragmentSendList(c_fragmentSendPool),
92 c_mutexMgr(* this),
93 c_counterMgr(* this)
94 #ifdef VM_TRACE
95 ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
96 #endif
97 #ifdef VM_TRACE_TIME
98 ,m_currentGsn(0)
99 #endif
100 {
101 m_threadId = 0;
102 m_watchDogCounter = NULL;
103 m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
104 NewVarRef = 0;
105
106 SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
107
108 if (theInstance == 0) {
109 ndbrequire(mainBlock == 0);
110 mainBlock = this;
111 theMainInstance = mainBlock;
112 globalData.setBlock(blockNumber, mainBlock);
113 mainBlock->addInstance(this, theInstance);
114 } else {
115 ndbrequire(mainBlock != 0);
116 mainBlock->addInstance(this, theInstance);
117 theMainInstance = mainBlock;
118 }
119
120 c_fragmentIdCounter = 1;
121 c_fragSenderRunning = false;
122
123 #ifdef VM_TRACE_TIME
124 clearTimes();
125 #endif
126
127 for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
128 theExecArray[i] = 0;
129
130 installSimulatedBlockFunctions();
131
132 m_callbackTableAddr = 0;
133
134 CLEAR_ERROR_INSERT_VALUE;
135
136 #ifdef VM_TRACE
137 m_global_variables = new Ptr<void> * [1];
138 m_global_variables[0] = 0;
139 m_global_variables_save = 0;
140 #endif
141 }
142
143 void
addInstance(SimulatedBlock * b,Uint32 theInstance)144 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
145 {
146 ndbrequire(theMainInstance == this);
147 ndbrequire(number() == b->number());
148 if (theInstanceList == 0)
149 {
150 theInstanceList = new SimulatedBlock* [MaxInstances];
151 ndbrequire(theInstanceList != 0);
152 for (Uint32 i = 0; i < MaxInstances; i++)
153 theInstanceList[i] = 0;
154 }
155 ndbrequire(theInstance < MaxInstances);
156 ndbrequire(theInstanceList[theInstance] == 0);
157 theInstanceList[theInstance] = b;
158 }
159
160 void
initCommon()161 SimulatedBlock::initCommon()
162 {
163 Uint32 count = 10;
164 this->getParam("FragmentSendPool", &count);
165 c_fragmentSendPool.setSize(count);
166
167 count = 10;
168 this->getParam("FragmentInfoPool", &count);
169 c_fragmentInfoPool.setSize(count);
170
171 count = 10;
172 this->getParam("FragmentInfoHash", &count);
173 c_fragmentInfoHash.setSize(count);
174
175 Uint32 def = 5;
176 #ifdef NDBD_MULTITHREADED
177 def += globalData.getBlockThreads();
178 #endif
179
180 count = def;
181 this->getParam("ActiveMutexes", &count);
182 c_mutexMgr.setSize(count);
183
184 count = def;
185 this->getParam("ActiveCounters", &count);
186 c_counterMgr.setSize(count);
187
188 count = def;
189 this->getParam("ActiveThreadSync", &count);
190 c_syncThreadPool.setSize(count);
191 }
192
~SimulatedBlock()193 SimulatedBlock::~SimulatedBlock()
194 {
195 freeBat();
196 #ifdef VM_TRACE_TIME
197 printTimes(stdout);
198 #endif
199
200 #ifdef VM_TRACE
201 enable_global_variables();
202 delete [] m_global_variables;
203 m_global_variables = 0;
204 #endif
205
206 if (theInstanceList != 0) {
207 Uint32 i;
208 for (i = 0; i < MaxInstances; i++)
209 {
210 if (theInstanceList[i] != this)
211 {
212 delete theInstanceList[i];
213 }
214 }
215 delete [] theInstanceList;
216 }
217 theInstanceList = 0;
218 }
219
220 void
installSimulatedBlockFunctions()221 SimulatedBlock::installSimulatedBlockFunctions(){
222 ExecFunction * a = theExecArray;
223 a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
224 a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
225 a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
226 a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
227 a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
228 a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
229 a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
230 a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
231 a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
232 a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
233 a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
234 a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
235 a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
236 a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
237 a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
238 a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
239 a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
240 a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
241 a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
242 a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
243 a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
244 a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
245 a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
246 a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
247 a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
248 a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
249 a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
250 a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
251 a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
252 a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
253 a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
254 }
255
256 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)257 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
258 ExecFunction f, bool force){
259 if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
260 char errorMsg[255];
261 BaseString::snprintf(errorMsg, 255,
262 "GSN %d(%d))", gsn, MAX_GSN);
263 ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
264 }
265 theExecArray[gsn] = f;
266 }
267
268 void
assignToThread(ThreadContext ctx)269 SimulatedBlock::assignToThread(ThreadContext ctx)
270 {
271 m_threadId = ctx.threadId;
272 m_jamBuffer = ctx.jamBuffer;
273 m_watchDogCounter = ctx.watchDogCounter;
274 m_sectionPoolCache = ctx.sectionPoolCache;
275 }
276
277 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)278 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
279 {
280 Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
281 Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
282 return instanceKey;
283 }
284
285 Uint32
getInstanceFromKey(Uint32 instanceKey)286 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
287 {
288 Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
289 Uint32 instanceNo;
290 if (lqhWorkers == 0) {
291 instanceNo = 0;
292 } else {
293 assert(instanceKey != 0);
294 instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
295 }
296 return instanceNo;
297 }
298
299 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const300 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
301 const char* filename, int lineno) const
302 {
303 char objRef[255];
304 BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
305 char probData[255];
306 BaseString::snprintf(probData, 255,
307 "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
308 gsn, len, recBlockNo);
309
310 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
311 probData,
312 objRef);
313 }
314
315
316 extern class SectionSegmentPool g_sectionSegmentPool;
317
318 void
handle_invalid_sections_in_send_signal(const Signal * signal) const319 SimulatedBlock::handle_invalid_sections_in_send_signal(const Signal* signal)
320 const
321 {
322 char errMsg[160];
323 BaseString::snprintf(errMsg, sizeof errMsg,
324 "Unhandled sections in sendSignal for GSN %u (%s).",
325 signal->header.theVerId_signalNumber,
326 getSignalName(signal->header.theVerId_signalNumber));
327 // Print message and terminate.
328 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
329 errMsg,
330 "");
331 }
332
333 void
handle_lingering_sections_after_execute(const Signal * signal) const334 SimulatedBlock::handle_lingering_sections_after_execute(const Signal* signal)
335 const
336 {
337 char errMsg[160];
338 BaseString::snprintf(errMsg, sizeof errMsg,
339 "Unhandled sections after execute for GSN %u (%s).",
340 signal->header.theVerId_signalNumber,
341 getSignalName(signal->header.theVerId_signalNumber));
342 // Print message and terminate.
343 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
344 errMsg,
345 "");
346 }
347
348 void
handle_invalid_fragmentInfo(Signal * signal) const349 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
350 {
351 #if defined VM_TRACE || defined ERROR_INSERT
352 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
353 "Incorrect header->m_fragmentInfo in sendSignal()",
354 "");
355 #else
356 signal->header.m_fragmentInfo = 0;
357 infoEvent("Incorrect header->m_fragmentInfo in sendSignal");
358 #endif
359 }
360
361 void
handle_out_of_longsignal_memory(Signal * signal) const362 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
363 {
364 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
365 "Out of LongMessageBuffer in sendSignal",
366 "");
367 }
368
369 void
handle_send_failed(SendStatus ss,Signal * signal) const370 SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const
371 {
372 switch(ss){
373 case SEND_BUFFER_FULL:
374 ErrorReporter::handleError(NDBD_EXIT_GENERIC,
375 "Out of SendBufferMemory in sendSignal", "");
376 break;
377 case SEND_MESSAGE_TOO_BIG:
378 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
379 "Message to big in sendSignal", "");
380 break;
381 case SEND_UNKNOWN_NODE:
382 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
383 "Unknown node in sendSignal", "");
384 break;
385 case SEND_OK:
386 case SEND_BLOCKED:
387 case SEND_DISCONNECTED:
388 break;
389 }
390 ndbrequire(false);
391 }
392
393 static void
linkSegments(Uint32 head,Uint32 tail)394 linkSegments(Uint32 head, Uint32 tail){
395
396 Ptr<SectionSegment> headPtr;
397 g_sectionSegmentPool.getPtr(headPtr, head);
398
399 Ptr<SectionSegment> tailPtr;
400 g_sectionSegmentPool.getPtr(tailPtr, tail);
401
402 Ptr<SectionSegment> oldTailPtr;
403 g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
404
405 /* Can only efficiently link segments if linking to the end of a
406 * multiple-of-segment-size sized chunk
407 */
408 if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
409 {
410 #if defined VM_TRACE || defined ERROR_INSERT
411 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
412 "Bad head segment size",
413 "");
414 #else
415 ndbout_c("linkSegments : Bad head segment size");
416 #endif
417 }
418
419 headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
420 headPtr.p->m_sz += tailPtr.p->m_sz;
421
422 oldTailPtr.p->m_nextSegment = tailPtr.i;
423 }
424
425 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])426 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
427 Uint32 tSec0 = ptr[0].i;
428 Uint32 tSec1 = ptr[1].i;
429 Uint32 tSec2 = ptr[2].i;
430 SectionSegment * p;
431 switch(secCount){
432 case 3:
433 p = g_sectionSegmentPool.getPtr(tSec2);
434 ptr[2].p = p;
435 ptr[2].sz = p->m_sz;
436 case 2:
437 p = g_sectionSegmentPool.getPtr(tSec1);
438 ptr[1].p = p;
439 ptr[1].sz = p->m_sz;
440 case 1:
441 p = g_sectionSegmentPool.getPtr(tSec0);
442 ptr[0].p = p;
443 ptr[0].sz = p->m_sz;
444 case 0:
445 return;
446 }
447 char msg[40];
448 sprintf(msg, "secCount=%d", secCount);
449 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
450 }
451
452 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)453 getSection(SegmentedSectionPtr & ptr, Uint32 i){
454 ptr.i = i;
455 SectionSegment * p = g_sectionSegmentPool.getPtr(i);
456 ptr.p = p;
457 ptr.sz = p->m_sz;
458 }
459
getSectionSz(Uint32 id)460 Uint32 getSectionSz(Uint32 id)
461 {
462 return g_sectionSegmentPool.getPtr(id)->m_sz;
463 }
464
getLastWordPtr(Uint32 id)465 Uint32* getLastWordPtr(Uint32 id)
466 {
467 SectionSegment* first= g_sectionSegmentPool.getPtr(id);
468 SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
469 Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
470 return &last->theData[offset];
471 }
472
473 #ifdef NDBD_MULTITHREADED
474 #define SB_SP_ARG *m_sectionPoolCache,
475 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
476 #else
477 #define SB_SP_ARG
478 #define SB_SP_REL_ARG
479 #endif
480
481 static
482 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])483 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
484 Uint32 tSec0 = ptr[0].i;
485 Uint32 tSz0 = ptr[0].sz;
486 Uint32 tSec1 = ptr[1].i;
487 Uint32 tSz1 = ptr[1].sz;
488 Uint32 tSec2 = ptr[2].i;
489 Uint32 tSz2 = ptr[2].sz;
490 switch(secCount){
491 case 3:
492 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
493 relSz(tSz2), tSec2,
494 ptr[2].p->m_lastSegment);
495 case 2:
496 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
497 relSz(tSz1), tSec1,
498 ptr[1].p->m_lastSegment);
499 case 1:
500 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
501 relSz(tSz0), tSec0,
502 ptr[0].p->m_lastSegment);
503 case 0:
504 return;
505 }
506 char msg[40];
507 sprintf(msg, "secCount=%d", secCount);
508 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
509 }
510
511 void
getSendBufferLevel(NodeId node,SB_LevelType & level)512 SimulatedBlock::getSendBufferLevel(NodeId node, SB_LevelType &level)
513 {
514 #ifdef NDBD_MULTITHREADED
515 mt_getSendBufferLevel(m_threadId, node, level);
516 #else
517 globalTransporterRegistry.getSendBufferLevel(node, level);
518 #endif
519 }
520
521 Uint32
getSignalsInJBB()522 SimulatedBlock::getSignalsInJBB()
523 {
524 Uint32 num_signals;
525 #ifdef NDBD_MULTITHREADED
526 num_signals = mt_getSignalsInJBB(m_threadId);
527 #else
528 num_signals = globalScheduler.getBOccupancy();
529 #endif
530 return num_signals;
531 }
532
/**
 * Send a signal without sections to a single receiver.
 *
 * The receiver's block and node are derived from ref.  A receiver on this
 * node (node id equal to globalData.ownId, or 0) is dispatched locally;
 * any other node goes through the transporter layer.
 *
 * @param ref        receiver block reference
 * @param gsn        signal number
 * @param signal     signal object; its header is overwritten here
 * @param length     number of data words, must be in 1..25
 * @param jobBuffer  job buffer level to send on
 */
void
SimulatedBlock::sendSignal(BlockReference ref,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer) const {

  BlockReference sendBRef = reference();

  Uint32 recBlock = refToBlock(ref);
  Uint32 recNode   = refToNode(ref);
  Uint32 ourProcessor = globalData.ownId;

  // Validate against the section count currently in the header, before the
  // header is overwritten below; this variant sends 0 sections.
  check_sections(signal, signal->header.m_noOfSections, 0);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.m_noOfSections = 0;

  Uint32 tSignalId = signal->header.theSignalId;

  // Reject an empty signal, an over-long signal or receiver block 0.
  if ((length == 0) || length > 25 || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if
#ifdef VM_TRACE
  // Signal logging; node 0 is logged as the own node.
  if(globalData.testOn){
    Uint16 proc =
      (recNode == 0 ? globalData.ownId : recNode);
    signal->header.theSendersBlockRef = sendBRef;
    globalSignalLoggers.sendSignal(signal->header,
                                   jobBuffer,
                                   &signal->theData[0],
                                   proc);
  }
#endif

  if(recNode == ourProcessor || recNode == 0) {
    // Local delivery: the signal's own header is used as is.
    signal->header.theSendersSignalId = tSignalId;
    signal->header.theSendersBlockRef = sendBRef;
#ifdef NDBD_MULTITHREADED
    // JBB goes through sendlocal, every other level through sendprioa.
    if (jobBuffer == JBB)
      sendlocal(m_threadId, &signal->header, signal->theData, NULL);
    else
      sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
    globalScheduler.execute(signal, jobBuffer, recBlock,
                            gsn);
#endif
    return;
  } else {
    // send distributed Signal
    // A separate header is built for the wire; note that the sender is
    // stored as a block number (refToBlock), not a full reference.
    SignalHeader sh;

    Uint32 tTrace = signal->getTrace();

    sh.theVerId_signalNumber   = gsn;
    sh.theReceiversBlockNumber = recBlock;
    sh.theSendersBlockRef      = refToBlock(sendBRef);
    sh.theLength               = length;
    sh.theTrace                = tTrace;
    sh.theSignalId             = tSignalId;
    sh.m_noOfSections          = 0;
    sh.m_fragmentInfo          = 0;

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss;
#ifdef NDBD_MULTITHREADED
    ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
                        recNode, 0);
#else
    ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                               &signal->theData[0], recNode,
                                               (LinearSectionPtr*)0);
#endif

    // OK, BLOCKED and DISCONNECTED are accepted silently; anything else
    // is handed to handle_send_failed.
    if (unlikely(! (ss == SEND_OK ||
                    ss == SEND_BLOCKED ||
                    ss == SEND_DISCONNECTED)))
    {
      handle_send_failed(ss, signal);
    }
  }
  return;
}
624
/**
 * Send a signal without sections to a group of nodes.
 *
 * The signal goes to block rg.m_block on every node set in rg.m_nodes.
 * The own node (bit 0 or bit globalData.ownId) is served first by local
 * dispatch; the remaining nodes are sent to one by one through the
 * transporter layer.  rg is passed by value, so clearing its bits here
 * does not affect the caller.
 *
 * @param rg         receiver group (block number + node set)
 * @param gsn        signal number
 * @param signal     signal object; its header is overwritten here
 * @param length     number of data words, must be in 1..25
 * @param jobBuffer  job buffer level to send on
 */
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer) const {

  // Saved before the header is overwritten; checked further down.
  Uint32 noOfSections = signal->header.m_noOfSections;
  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tTrace = signal->getTrace();

  Uint32 ourProcessor = globalData.ownId;
  Uint32 recBlock = rg.m_block;

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.theSendersSignalId = tSignalId;
  signal->header.theSendersBlockRef = reference();
  signal->header.m_noOfSections = 0;

  // Validate the section count the header had on entry; this variant
  // sends 0 sections.
  check_sections(signal, noOfSections, 0);

  // Reject an empty signal, an over-long signal or receiver block 0.
  if ((length == 0) || (length > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

  // Header used for all remote sends; the sender is stored as a block
  // number (refToBlock), not a full reference.
  SignalHeader sh;

  sh.theVerId_signalNumber   = gsn;
  sh.theReceiversBlockNumber = recBlock;
  sh.theSendersBlockRef      = refToBlock(reference());
  sh.theLength               = length;
  sh.theTrace                = tTrace;
  sh.theSignalId             = tSignalId;
  sh.m_noOfSections          = 0;
  sh.m_fragmentInfo          = 0;

  /**
   * Check own node
   */
  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     ourProcessor);
    }
#endif

#ifdef NDBD_MULTITHREADED
    // JBB goes through sendlocal, every other level through sendprioa.
    if (jobBuffer == JBB)
      sendlocal(m_threadId, &signal->header, signal->theData, NULL);
    else
      sendprioa(m_threadId, &signal->header, signal->theData, NULL);
#else
    globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
#endif

    // Own node handled; drop both of its aliases from the remaining set.
    rg.m_nodes.clear((Uint32)0);
    rg.m_nodes.clear(ourProcessor);
  }

  /**
   * Do the big loop
   */
  // Walk the remaining node bits in increasing order, clearing each one
  // as it is processed, until the set is empty.
  Uint32 recNode = 0;
  while(!rg.m_nodes.isclear()){
    recNode = rg.m_nodes.find(recNode + 1);
    rg.m_nodes.clear(recNode);
#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     recNode);
    }
#endif

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss;
#ifdef NDBD_MULTITHREADED
    ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
                        recNode, 0);
#else
    ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                               &signal->theData[0], recNode,
                                               (LinearSectionPtr*)0);
#endif

    // OK, BLOCKED and DISCONNECTED are accepted silently; anything else
    // is handed to handle_send_failed.
    if (unlikely(! (ss == SEND_OK ||
                    ss == SEND_BLOCKED ||
                    ss == SEND_DISCONNECTED)))
    {
      handle_send_failed(ss, signal);
    }
  }

  return;
}
732
733 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
734
/**
 * Send a signal with linear sections to a single receiver.
 *
 * For a receiver on this node the linear sections are imported (copied)
 * into section segments and the segment indexes are placed in theData
 * directly after the signal words.  For a remote receiver the linear
 * sections are handed to the transporter layer as they are.
 *
 * @param ref           receiver block reference
 * @param gsn           signal number
 * @param signal        signal object; its header is overwritten here
 * @param length        number of data words; length + noOfSections must
 *                      not exceed 25 and length must be non-zero
 * @param jobBuffer     job buffer level to send on
 * @param ptr           the linear sections to send
 * @param noOfSections  number of valid entries in ptr
 */
void
SimulatedBlock::sendSignal(BlockReference ref,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer,
                           LinearSectionPtr ptr[3],
                           Uint32 noOfSections) const {

  BlockReference sendBRef = reference();

  Uint32 recBlock = refToBlock(ref);
  Uint32 recNode   = refToNode(ref);
  Uint32 ourProcessor = globalData.ownId;

  // Validate against the section count currently in the header, before the
  // header is overwritten below.
  check_sections(signal, signal->header.m_noOfSections, noOfSections);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.m_noOfSections = noOfSections;

  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tFragInfo = signal->header.m_fragmentInfo;

  // Reject an empty signal, receiver block 0, or signal words plus
  // section slots not fitting in 25 words.
  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if
#ifdef VM_TRACE
  // Signal logging; node 0 is logged as the own node.
  if(globalData.testOn){
    Uint16 proc =
      (recNode == 0 ? globalData.ownId : recNode);
    signal->header.theSendersBlockRef = sendBRef;
    globalSignalLoggers.sendSignal(signal->header,
                                   jobBuffer,
                                   &signal->theData[0],
                                   proc,
                                   ptr, noOfSections);
  }
#endif

  if(recNode == ourProcessor || recNode == 0) {
    signal->header.theSendersSignalId = tSignalId;
    signal->header.theSendersBlockRef = sendBRef;

    /**
     * We have to copy the data
     */
    // NOTE(review): segptr has 3 entries while the loop is bounded only by
    // noOfSections; presumably check_sections enforces noOfSections <= 3
    // - confirm.
    bool ok = true;
    Ptr<SectionSegment> segptr[3];
    for(Uint32 i = 0; i<noOfSections; i++){
      ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
      signal->theData[length+i] = segptr[i].i;
    }

    if (unlikely(! ok))
    {
      handle_out_of_longsignal_memory(signal);
    }

#ifdef NDBD_MULTITHREADED
    // JBB goes through sendlocal, every other level through sendprioa;
    // the section indexes are passed as theData+length.
    if (jobBuffer == JBB)
      sendlocal(m_threadId, &signal->header, signal->theData,
                signal->theData+length);
    else
      sendprioa(m_threadId, &signal->header, signal->theData,
                signal->theData+length);
#else
    globalScheduler.execute(signal, jobBuffer, recBlock,
                            gsn);
#endif
    // NOTE(review): only m_noOfSections is reset on this path; the remote
    // path below also resets m_fragmentInfo - confirm this is intended.
    signal->header.m_noOfSections = 0;
    return;
  } else {
    // send distributed Signal
    SignalHeader sh;

    Uint32 tTrace = signal->getTrace();
    // Shadows the parameter; the header field was set from the parameter
    // above, so the value is the same.
    Uint32 noOfSections = signal->header.m_noOfSections;

    // The sender is stored as a block number (refToBlock), not a full
    // reference.
    sh.theVerId_signalNumber   = gsn;
    sh.theReceiversBlockNumber = recBlock;
    sh.theSendersBlockRef      = refToBlock(sendBRef);
    sh.theLength               = length;
    sh.theTrace                = tTrace;
    sh.theSignalId             = tSignalId;
    sh.m_noOfSections          = noOfSections;
    sh.m_fragmentInfo          = tFragInfo;

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss;
#ifdef NDBD_MULTITHREADED
    ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
                        recNode, ptr);
#else
    ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                               &signal->theData[0], recNode,
                                               ptr);
#endif

    // OK, BLOCKED and DISCONNECTED are accepted silently; anything else
    // is handed to handle_send_failed.
    if (unlikely(! (ss == SEND_OK ||
                    ss == SEND_BLOCKED ||
                    ss == SEND_DISCONNECTED)))
    {
      handle_send_failed(ss, signal);
    }
  }

  // Leave the signal object without sections or fragment info.
  signal->header.m_noOfSections = 0;
  signal->header.m_fragmentInfo = 0;
  return;
}
853
/**
 * Send a signal with linear sections to a group of nodes.
 *
 * The signal goes to block rg.m_block on every node set in rg.m_nodes.
 * For the own node (bit 0 or bit globalData.ownId) the linear sections
 * are imported (copied) into section segments and dispatched locally;
 * the remaining nodes are sent to one by one through the transporter
 * layer with the linear sections as they are.  rg is passed by value, so
 * clearing its bits here does not affect the caller.
 *
 * @param rg            receiver group (block number + node set)
 * @param gsn           signal number
 * @param signal        signal object; its header is overwritten here
 * @param length        number of data words; length + noOfSections must
 *                      not exceed 25 and length must be non-zero
 * @param jobBuffer     job buffer level to send on
 * @param ptr           the linear sections to send
 * @param noOfSections  number of valid entries in ptr
 */
void
SimulatedBlock::sendSignal(NodeReceiverGroup rg,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer,
                           LinearSectionPtr ptr[3],
                           Uint32 noOfSections) const {

  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tTrace    = signal->getTrace();
  Uint32 tFragInfo = signal->header.m_fragmentInfo;

  Uint32 ourProcessor = globalData.ownId;
  Uint32 recBlock = rg.m_block;

  // Validate against the section count currently in the header, before the
  // header is overwritten below.
  check_sections(signal, signal->header.m_noOfSections, noOfSections);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.theSendersSignalId = tSignalId;
  signal->header.theSendersBlockRef = reference();
  signal->header.m_noOfSections = noOfSections;

  // Reject an empty signal, receiver block 0, or signal words plus
  // section slots not fitting in 25 words.
  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if

  // Header used for all remote sends; the sender is stored as a block
  // number (refToBlock), not a full reference.
  SignalHeader sh;
  sh.theVerId_signalNumber   = gsn;
  sh.theReceiversBlockNumber = recBlock;
  sh.theSendersBlockRef      = refToBlock(reference());
  sh.theLength               = length;
  sh.theTrace                = tTrace;
  sh.theSignalId             = tSignalId;
  sh.m_noOfSections          = noOfSections;
  sh.m_fragmentInfo          = tFragInfo;

  /**
   * Check own node
   */
  if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     ourProcessor,
                                     ptr, noOfSections);
    }
#endif
    /**
     * We have to copy the data
     */
    // NOTE(review): segptr has 3 entries while the loop is bounded only by
    // noOfSections; presumably check_sections enforces noOfSections <= 3
    // - confirm.
    bool ok = true;
    Ptr<SectionSegment> segptr[3];
    for(Uint32 i = 0; i<noOfSections; i++){
      ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
      signal->theData[length+i] = segptr[i].i;
    }

    if (unlikely(! ok))
    {
      handle_out_of_longsignal_memory(signal);
    }

#ifdef NDBD_MULTITHREADED
    // JBB goes through sendlocal, every other level through sendprioa;
    // the section indexes are passed as theData + length.
    if (jobBuffer == JBB)
      sendlocal(m_threadId, &signal->header, signal->theData,
                signal->theData + length);
    else
      sendprioa(m_threadId, &signal->header, signal->theData,
                signal->theData + length);
#else
    globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
#endif

    // Own node handled; drop both of its aliases from the remaining set.
    rg.m_nodes.clear((Uint32)0);
    rg.m_nodes.clear(ourProcessor);
  }

  /**
   * Do the big loop
   */
  // Walk the remaining node bits in increasing order, clearing each one
  // as it is processed, until the set is empty.
  Uint32 recNode = 0;
  while(!rg.m_nodes.isclear()){
    recNode = rg.m_nodes.find(recNode + 1);
    rg.m_nodes.clear(recNode);

#ifdef VM_TRACE
    if(globalData.testOn){
      globalSignalLoggers.sendSignal(signal->header,
                                     jobBuffer,
                                     &signal->theData[0],
                                     recNode,
                                     ptr, noOfSections);
    }
#endif

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss;
#ifdef NDBD_MULTITHREADED
    ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
                        recNode, ptr);
#else
    ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                               &signal->theData[0], recNode,
                                               ptr);
#endif

    // OK, BLOCKED and DISCONNECTED are accepted silently; anything else
    // is handed to handle_send_failed.
    if (unlikely(! (ss == SEND_OK ||
                    ss == SEND_BLOCKED ||
                    ss == SEND_DISCONNECTED)))
    {
      handle_send_failed(ss, signal);
    }
  }

  // Leave the signal object without sections or fragment info.
  signal->header.m_noOfSections = 0;
  signal->header.m_fragmentInfo = 0;

  return;
}
984
/**
 * Send a signal with segmented sections to a single receiver.
 *
 * For a receiver on this node the section indexes from the handle are
 * placed in theData directly after the signal words and the signal is
 * dispatched locally; the sections are not released here.  For a remote
 * receiver the sections are passed to the transporter layer together with
 * g_sectionSegmentPool and are released afterwards.  In both cases the
 * handle's section count is set to 0 on return.
 *
 * @param ref        receiver block reference
 * @param gsn        signal number
 * @param signal     signal object; its header is overwritten here
 * @param length     number of data words; length + section count must
 *                   not exceed 25 and length must be non-zero
 * @param jobBuffer  job buffer level to send on
 * @param sections   handle holding the sections to send
 */
void
SimulatedBlock::sendSignal(BlockReference ref,
                           GlobalSignalNumber gsn,
                           Signal* signal,
                           Uint32 length,
                           JobBufferLevel jobBuffer,
                           SectionHandle* sections) const {

  Uint32 noOfSections = sections->m_cnt;
  BlockReference sendBRef = reference();

  Uint32 recBlock = refToBlock(ref);
  Uint32 recNode   = refToNode(ref);
  Uint32 ourProcessor = globalData.ownId;

  // Validate against the section count currently in the header, before the
  // header is overwritten below.
  check_sections(signal, signal->header.m_noOfSections, noOfSections);

  signal->header.theLength = length;
  signal->header.theVerId_signalNumber = gsn;
  signal->header.theReceiversBlockNumber = recBlock;
  signal->header.m_noOfSections = noOfSections;

  Uint32 tSignalId = signal->header.theSignalId;
  Uint32 tFragInfo = signal->header.m_fragmentInfo;

  // Reject an empty signal, receiver block 0, or signal words plus
  // section slots not fitting in 25 words.
  // NOTE(review): this early return leaves sections->m_cnt untouched.
  if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
    signal_error(gsn, length, recBlock, __FILE__, __LINE__);
    return;
  }//if
#ifdef VM_TRACE
  // Signal logging; node 0 is logged as the own node.
  if(globalData.testOn){
    Uint16 proc =
      (recNode == 0 ? globalData.ownId : recNode);
    signal->header.theSendersBlockRef = sendBRef;
    globalSignalLoggers.sendSignal(signal->header,
                                   jobBuffer,
                                   &signal->theData[0],
                                   proc,
                                   sections->m_ptr, noOfSections);
  }
#endif

  if(recNode == ourProcessor || recNode == 0) {
    signal->header.theSendersSignalId = tSignalId;
    signal->header.theSendersBlockRef = sendBRef;

    /**
     * We have to copy the data
     */
    // All three index slots are copied regardless of noOfSections; the
    // header's m_noOfSections says how many of them are meaningful.
    Uint32 * dst = signal->theData + length;
    * dst ++ = sections->m_ptr[0].i;
    * dst ++ = sections->m_ptr[1].i;
    * dst ++ = sections->m_ptr[2].i;

#ifdef NDBD_MULTITHREADED
    // JBB goes through sendlocal, every other level through sendprioa;
    // the section indexes are passed as theData + length.
    if (jobBuffer == JBB)
      sendlocal(m_threadId, &signal->header, signal->theData,
                signal->theData + length);
    else
      sendprioa(m_threadId, &signal->header, signal->theData,
                signal->theData + length);
#else
    globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
#endif
  } else {
    // send distributed Signal
    SignalHeader sh;

    Uint32 tTrace = signal->getTrace();

    // The sender is stored as a block number (refToBlock), not a full
    // reference.
    sh.theVerId_signalNumber   = gsn;
    sh.theReceiversBlockNumber = recBlock;
    sh.theSendersBlockRef      = refToBlock(sendBRef);
    sh.theLength               = length;
    sh.theTrace                = tTrace;
    sh.theSignalId             = tSignalId;
    sh.m_noOfSections          = noOfSections;
    sh.m_fragmentInfo          = tFragInfo;

#ifdef TRACE_DISTRIBUTED
    ndbout_c("send: %s(%d) to (%s, %d)",
             getSignalName(gsn), gsn, getBlockName(recBlock),
             recNode);
#endif

    SendStatus ss;
#ifdef NDBD_MULTITHREADED
    ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
                        recNode, &g_sectionSegmentPool, sections->m_ptr);
#else
    ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
                                               &signal->theData[0], recNode,
                                               g_sectionSegmentPool,
                                               sections->m_ptr);
#endif

    // OK, BLOCKED and DISCONNECTED are accepted silently; anything else
    // is handed to handle_send_failed.
    if (unlikely(! (ss == SEND_OK ||
                    ss == SEND_BLOCKED ||
                    ss == SEND_DISCONNECTED)))
    {
      handle_send_failed(ss, signal);
    }

    // The segments are released locally once the send has been prepared.
    ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
  }

  // Leave the signal object and the handle without sections.
  signal->header.m_noOfSections = 0;
  signal->header.m_fragmentInfo = 0;
  sections->m_cnt = 0;
  return;
}
1096
1097 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1098 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1099 GlobalSignalNumber gsn,
1100 Signal* signal,
1101 Uint32 length,
1102 JobBufferLevel jobBuffer,
1103 SectionHandle * sections) const {
1104
1105 Uint32 noOfSections = sections->m_cnt;
1106 Uint32 tSignalId = signal->header.theSignalId;
1107 Uint32 tTrace = signal->getTrace();
1108 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1109
1110 Uint32 ourProcessor = globalData.ownId;
1111 Uint32 recBlock = rg.m_block;
1112
1113 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1114
1115 signal->header.theLength = length;
1116 signal->header.theVerId_signalNumber = gsn;
1117 signal->header.theReceiversBlockNumber = recBlock;
1118 signal->header.theSendersSignalId = tSignalId;
1119 signal->header.theSendersBlockRef = reference();
1120 signal->header.m_noOfSections = noOfSections;
1121
1122 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1123 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1124 return;
1125 }//if
1126
1127 SignalHeader sh;
1128 sh.theVerId_signalNumber = gsn;
1129 sh.theReceiversBlockNumber = recBlock;
1130 sh.theSendersBlockRef = refToBlock(reference());
1131 sh.theLength = length;
1132 sh.theTrace = tTrace;
1133 sh.theSignalId = tSignalId;
1134 sh.m_noOfSections = noOfSections;
1135 sh.m_fragmentInfo = tFragInfo;
1136
1137 /**
1138 * Check own node
1139 */
1140 bool release = true;
1141 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1142 {
1143 release = false;
1144 #ifdef VM_TRACE
1145 if(globalData.testOn){
1146 globalSignalLoggers.sendSignal(signal->header,
1147 jobBuffer,
1148 &signal->theData[0],
1149 ourProcessor,
1150 sections->m_ptr, noOfSections);
1151 }
1152 #endif
1153 /**
1154 * We have to copy the data
1155 */
1156 Uint32 * dst = signal->theData + length;
1157 * dst ++ = sections->m_ptr[0].i;
1158 * dst ++ = sections->m_ptr[1].i;
1159 * dst ++ = sections->m_ptr[2].i;
1160 #ifdef NDBD_MULTITHREADED
1161 if (jobBuffer == JBB)
1162 sendlocal(m_threadId, &signal->header, signal->theData,
1163 signal->theData + length);
1164 else
1165 sendprioa(m_threadId, &signal->header, signal->theData,
1166 signal->theData + length);
1167 #else
1168 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1169 #endif
1170
1171 rg.m_nodes.clear((Uint32)0);
1172 rg.m_nodes.clear(ourProcessor);
1173 }
1174
1175 /**
1176 * Do the big loop
1177 */
1178 Uint32 recNode = 0;
1179 while(!rg.m_nodes.isclear()){
1180 recNode = rg.m_nodes.find(recNode + 1);
1181 rg.m_nodes.clear(recNode);
1182
1183 #ifdef VM_TRACE
1184 if(globalData.testOn){
1185 globalSignalLoggers.sendSignal(signal->header,
1186 jobBuffer,
1187 &signal->theData[0],
1188 recNode,
1189 sections->m_ptr, noOfSections);
1190 }
1191 #endif
1192
1193 #ifdef TRACE_DISTRIBUTED
1194 ndbout_c("send: %s(%d) to (%s, %d)",
1195 getSignalName(gsn), gsn, getBlockName(recBlock),
1196 recNode);
1197 #endif
1198
1199 SendStatus ss;
1200 #ifdef NDBD_MULTITHREADED
1201 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1202 recNode, &g_sectionSegmentPool, sections->m_ptr);
1203 #else
1204 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1205 &signal->theData[0], recNode,
1206 g_sectionSegmentPool,
1207 sections->m_ptr);
1208 #endif
1209
1210 if (unlikely(! (ss == SEND_OK ||
1211 ss == SEND_BLOCKED ||
1212 ss == SEND_DISCONNECTED)))
1213 {
1214 handle_send_failed(ss, signal);
1215 }
1216 }
1217
1218 if (release)
1219 {
1220 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1221 }
1222
1223 sections->m_cnt = 0;
1224 signal->header.m_noOfSections = 0;
1225 signal->header.m_fragmentInfo = 0;
1226
1227 return;
1228 }
1229
1230 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1231 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1232 GlobalSignalNumber gsn,
1233 Signal* signal,
1234 Uint32 length,
1235 JobBufferLevel jobBuffer,
1236 SectionHandle* sections) const {
1237
1238 /**
1239 * Implementation the same as sendSignal(), except that
1240 * the sections are duplicated when sending locally, and
1241 * not released
1242 */
1243
1244 Uint32 noOfSections = sections->m_cnt;
1245 BlockReference sendBRef = reference();
1246
1247 Uint32 recBlock = refToBlock(ref);
1248 Uint32 recNode = refToNode(ref);
1249 Uint32 ourProcessor = globalData.ownId;
1250
1251 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1252
1253 signal->header.theLength = length;
1254 signal->header.theVerId_signalNumber = gsn;
1255 signal->header.theReceiversBlockNumber = recBlock;
1256 signal->header.m_noOfSections = noOfSections;
1257
1258 Uint32 tSignalId = signal->header.theSignalId;
1259 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1260
1261 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1262 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1263 return;
1264 }//if
1265 #ifdef VM_TRACE
1266 if(globalData.testOn){
1267 Uint16 proc =
1268 (recNode == 0 ? globalData.ownId : recNode);
1269 signal->header.theSendersBlockRef = sendBRef;
1270 globalSignalLoggers.sendSignal(signal->header,
1271 jobBuffer,
1272 &signal->theData[0],
1273 proc,
1274 sections->m_ptr, noOfSections);
1275 }
1276 #endif
1277
1278 if(recNode == ourProcessor || recNode == 0) {
1279 signal->header.theSendersSignalId = tSignalId;
1280 signal->header.theSendersBlockRef = sendBRef;
1281
1282 Uint32 * dst = signal->theData + length;
1283
1284 /* We need to copy the segmented section data into separate
1285 * sections when sending locally and keeping a copy ourselves
1286 */
1287 for (Uint32 sec=0; sec < noOfSections; sec++)
1288 {
1289 Uint32 secCopy;
1290 if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1291 {
1292 handle_out_of_longsignal_memory(signal);
1293 return;
1294 }
1295 * dst ++ = secCopy;
1296 }
1297
1298 #ifdef NDBD_MULTITHREADED
1299 if (jobBuffer == JBB)
1300 sendlocal(m_threadId, &signal->header, signal->theData,
1301 signal->theData + length);
1302 else
1303 sendprioa(m_threadId, &signal->header, signal->theData,
1304 signal->theData + length);
1305 #else
1306 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1307 #endif
1308 } else {
1309 // send distributed Signal
1310 SignalHeader sh;
1311
1312 Uint32 tTrace = signal->getTrace();
1313
1314 sh.theVerId_signalNumber = gsn;
1315 sh.theReceiversBlockNumber = recBlock;
1316 sh.theSendersBlockRef = refToBlock(sendBRef);
1317 sh.theLength = length;
1318 sh.theTrace = tTrace;
1319 sh.theSignalId = tSignalId;
1320 sh.m_noOfSections = noOfSections;
1321 sh.m_fragmentInfo = tFragInfo;
1322
1323 #ifdef TRACE_DISTRIBUTED
1324 ndbout_c("send: %s(%d) to (%s, %d)",
1325 getSignalName(gsn), gsn, getBlockName(recBlock),
1326 recNode);
1327 #endif
1328
1329 SendStatus ss;
1330 #ifdef NDBD_MULTITHREADED
1331 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1332 recNode, &g_sectionSegmentPool, sections->m_ptr);
1333 #else
1334 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1335 &signal->theData[0], recNode,
1336 g_sectionSegmentPool,
1337 sections->m_ptr);
1338 #endif
1339
1340 if (unlikely(! (ss == SEND_OK ||
1341 ss == SEND_BLOCKED ||
1342 ss == SEND_DISCONNECTED)))
1343 {
1344 handle_send_failed(ss, signal);
1345 }
1346 }
1347
1348 signal->header.m_noOfSections = 0;
1349 signal->header.m_fragmentInfo = 0;
1350 return;
1351 }
1352
1353 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1354 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1355 GlobalSignalNumber gsn,
1356 Signal* signal,
1357 Uint32 length,
1358 JobBufferLevel jobBuffer,
1359 SectionHandle * sections) const {
1360 /**
1361 * Implementation the same as sendSignal(), except that
1362 * the sections are duplicated when sending locally, and
1363 * not released
1364 */
1365
1366 Uint32 noOfSections = sections->m_cnt;
1367 Uint32 tSignalId = signal->header.theSignalId;
1368 Uint32 tTrace = signal->getTrace();
1369 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1370
1371 Uint32 ourProcessor = globalData.ownId;
1372 Uint32 recBlock = rg.m_block;
1373
1374 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1375
1376 signal->header.theLength = length;
1377 signal->header.theVerId_signalNumber = gsn;
1378 signal->header.theReceiversBlockNumber = recBlock;
1379 signal->header.theSendersSignalId = tSignalId;
1380 signal->header.theSendersBlockRef = reference();
1381 signal->header.m_noOfSections = noOfSections;
1382
1383 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1384 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1385 return;
1386 }//if
1387
1388 SignalHeader sh;
1389 sh.theVerId_signalNumber = gsn;
1390 sh.theReceiversBlockNumber = recBlock;
1391 sh.theSendersBlockRef = refToBlock(reference());
1392 sh.theLength = length;
1393 sh.theTrace = tTrace;
1394 sh.theSignalId = tSignalId;
1395 sh.m_noOfSections = noOfSections;
1396 sh.m_fragmentInfo = tFragInfo;
1397
1398 /**
1399 * Check own node
1400 */
1401 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1402 {
1403 #ifdef VM_TRACE
1404 if(globalData.testOn){
1405 globalSignalLoggers.sendSignal(signal->header,
1406 jobBuffer,
1407 &signal->theData[0],
1408 ourProcessor,
1409 sections->m_ptr, noOfSections);
1410 }
1411 #endif
1412
1413 Uint32 * dst = signal->theData + length;
1414
1415 /* We need to copy the segmented section data into separate
1416 * sections when sending locally and keeping a copy ourselves
1417 */
1418 for (Uint32 sec=0; sec < noOfSections; sec++)
1419 {
1420 Uint32 secCopy;
1421 if (unlikely(! ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i)))
1422 {
1423 handle_out_of_longsignal_memory(signal);
1424 return;
1425 }
1426 * dst ++ = secCopy;
1427 }
1428
1429 #ifdef NDBD_MULTITHREADED
1430 if (jobBuffer == JBB)
1431 sendlocal(m_threadId, &signal->header, signal->theData,
1432 signal->theData + length);
1433 else
1434 sendprioa(m_threadId, &signal->header, signal->theData,
1435 signal->theData + length);
1436 #else
1437 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1438 #endif
1439
1440 rg.m_nodes.clear((Uint32)0);
1441 rg.m_nodes.clear(ourProcessor);
1442 }
1443
1444 /**
1445 * Do the big loop
1446 */
1447 Uint32 recNode = 0;
1448 while(!rg.m_nodes.isclear()){
1449 recNode = rg.m_nodes.find(recNode + 1);
1450 rg.m_nodes.clear(recNode);
1451
1452 #ifdef VM_TRACE
1453 if(globalData.testOn){
1454 globalSignalLoggers.sendSignal(signal->header,
1455 jobBuffer,
1456 &signal->theData[0],
1457 recNode,
1458 sections->m_ptr, noOfSections);
1459 }
1460 #endif
1461
1462 #ifdef TRACE_DISTRIBUTED
1463 ndbout_c("send: %s(%d) to (%s, %d)",
1464 getSignalName(gsn), gsn, getBlockName(recBlock),
1465 recNode);
1466 #endif
1467
1468 SendStatus ss;
1469 #ifdef NDBD_MULTITHREADED
1470 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1471 recNode, &g_sectionSegmentPool, sections->m_ptr);
1472 #else
1473 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1474 &signal->theData[0], recNode,
1475 g_sectionSegmentPool,
1476 sections->m_ptr);
1477 #endif
1478
1479 if (unlikely(! (ss == SEND_OK ||
1480 ss == SEND_BLOCKED ||
1481 ss == SEND_DISCONNECTED)))
1482 {
1483 handle_send_failed(ss, signal);
1484 }
1485 }
1486
1487 signal->header.m_noOfSections = 0;
1488 signal->header.m_fragmentInfo = 0;
1489
1490 return;
1491 }
1492
1493
1494 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1495 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1496 GlobalSignalNumber gsn,
1497 Signal* signal,
1498 Uint32 delayInMilliSeconds,
1499 Uint32 length) const {
1500
1501 BlockNumber bnr = refToBlock(ref);
1502
1503 check_sections(signal, signal->header.m_noOfSections, 0);
1504
1505 signal->header.theLength = length;
1506 signal->header.theSendersSignalId = signal->header.theSignalId;
1507 signal->header.theVerId_signalNumber = gsn;
1508 signal->header.theReceiversBlockNumber = bnr;
1509 signal->header.theSendersBlockRef = reference();
1510
1511 #ifdef VM_TRACE
1512 {
1513 if(globalData.testOn){
1514 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1515 signal->header,
1516 0,
1517 &signal->theData[0],
1518 globalData.ownId);
1519 }
1520 }
1521 #endif
1522
1523 #ifdef NDBD_MULTITHREADED
1524 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1525 #else
1526 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1527 #endif
1528
1529 // befor 2nd parameter to globalTimeQueue.insert
1530 // (Priority)theSendSig[sigIndex].jobBuffer
1531 }
1532
1533 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1534 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1535 GlobalSignalNumber gsn,
1536 Signal* signal,
1537 Uint32 delayInMilliSeconds,
1538 Uint32 length,
1539 SectionHandle * sections) const {
1540
1541 Uint32 noOfSections = sections->m_cnt;
1542 BlockNumber bnr = refToBlock(ref);
1543
1544 BlockReference sendBRef = reference();
1545
1546 if (bnr == 0) {
1547 bnr_error();
1548 }//if
1549
1550 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1551
1552 signal->header.theLength = length;
1553 signal->header.theSendersSignalId = signal->header.theSignalId;
1554 signal->header.theSendersBlockRef = sendBRef;
1555 signal->header.theVerId_signalNumber = gsn;
1556 signal->header.theReceiversBlockNumber = bnr;
1557 signal->header.m_noOfSections = noOfSections;
1558
1559 Uint32 * dst = signal->theData + length;
1560 * dst ++ = sections->m_ptr[0].i;
1561 * dst ++ = sections->m_ptr[1].i;
1562 * dst ++ = sections->m_ptr[2].i;
1563
1564 #ifdef VM_TRACE
1565 {
1566 if(globalData.testOn){
1567 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1568 signal->header,
1569 0,
1570 &signal->theData[0],
1571 globalData.ownId);
1572 }
1573 }
1574 #endif
1575
1576 #ifdef NDBD_MULTITHREADED
1577 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1578 #else
1579 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1580 #endif
1581
1582 signal->header.m_noOfSections = 0;
1583 signal->header.m_fragmentInfo = 0;
1584 sections->m_cnt = 0;
1585 }
1586
1587 void
release(SegmentedSectionPtr & ptr)1588 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1589 {
1590 ::release(SB_SP_ARG ptr);
1591 }
1592
1593 void
releaseSection(Uint32 firstSegmentIVal)1594 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1595 {
1596 ::releaseSection(SB_SP_ARG firstSegmentIVal);
1597 }
1598
1599 void
releaseSections(SectionHandle & handle)1600 SimulatedBlock::releaseSections(SectionHandle& handle)
1601 {
1602 ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1603 handle.m_cnt = 0;
1604 }
1605
1606 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1607 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1608 {
1609 return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1610 }
1611
1612 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1613 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1614 {
1615 return ::import(SB_SP_ARG first, src, len);
1616 }
1617
1618 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len)1619 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
1620 {
1621 Ptr<SectionSegment> tmp;
1622 if (::import(SB_SP_ARG tmp, src, len))
1623 {
1624 ptr.i = tmp.i;
1625 ptr.p = tmp.p;
1626 ptr.sz = len;
1627 return true;
1628 }
1629 return false;
1630 }
1631
1632 bool
import(SectionHandle * dst,LinearSectionPtr src[3],Uint32 cnt)1633 SimulatedBlock::import(SectionHandle * dst,
1634 LinearSectionPtr src[3],
1635 Uint32 cnt)
1636 {
1637 ndbassert(dst->m_cnt == 0);
1638 if (dst->m_cnt)
1639 {
1640 releaseSections(* dst);
1641 }
1642
1643 for (Uint32 i = 0; i < cnt; i++)
1644 {
1645 if (unlikely(!import(dst->m_ptr[i], src[i].p, src[i].sz)))
1646 {
1647 if (i)
1648 {
1649 dst->m_cnt = i - 1;
1650 releaseSections(* dst);
1651 return false;
1652 }
1653 }
1654 }
1655 dst->m_cnt = cnt;
1656 return true;
1657 }
1658
1659
1660 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1661 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1662 {
1663 return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1664 }
1665
1666 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1667 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1668 const Uint32* src, Uint32 len)
1669 {
1670 return ::writeToSection(firstSegmentIVal, offset, src, len);
1671 }
1672
1673 class SectionSegmentPool&
getSectionSegmentPool()1674 SimulatedBlock::getSectionSegmentPool(){
1675 return g_sectionSegmentPool;
1676 }
1677
1678 NewVARIABLE *
allocateBat(int batSize)1679 SimulatedBlock::allocateBat(int batSize){
1680 NewVARIABLE* bat = NewVarRef;
1681 bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
1682 NewVarRef = bat;
1683 theBATSize = batSize;
1684 return bat;
1685 }
1686
1687 void
freeBat()1688 SimulatedBlock::freeBat(){
1689 if(NewVarRef != 0){
1690 free(NewVarRef);
1691 NewVarRef = 0;
1692 }
1693 }
1694
1695 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)1696 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1697 assert(blockNo == blockToMain(blockNo));
1698 SimulatedBlock * sb = globalData.getBlock(blockNo);
1699 if (sb != 0 && instanceNo != 0)
1700 sb = sb->getInstance(instanceNo);
1701 if(sb == 0)
1702 return 0;
1703 return sb->NewVarRef;
1704 }
1705
1706 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)1707 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1708 assert(blockNo == blockToMain(blockNo));
1709 SimulatedBlock * sb = globalData.getBlock(blockNo);
1710 if (sb != 0 && instanceNo != 0)
1711 sb = sb->getInstance(instanceNo);
1712 if(sb == 0)
1713 return 0;
1714 return sb->theBATSize;
1715 }
1716
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)1717 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
1718 {
1719 return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1720 }
1721
1722 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)1723 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
1724 {
1725
1726 void * p = NULL;
1727 Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1728 size_t size = n*s + over_alloc;
1729 Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
1730 refresh_watch_dog(9);
1731 if (real_size > 0){
1732 #ifdef VM_TRACE_MEM
1733 ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
1734 getBlockName(number()),
1735 type,
1736 s,
1737 n,
1738 real_size);
1739 #endif
1740 if( real_size == (Uint64)size )
1741 p = ndbd_malloc(size);
1742 if (p == NULL){
1743 char buf1[255];
1744 char buf2[255];
1745 struct ndb_mgm_param_info param_info;
1746 size_t size = sizeof(ndb_mgm_param_info);
1747
1748 if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size)) {
1749 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
1750 getBlockName(number()), param_info.m_name);
1751 } else {
1752 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
1753 getBlockName(number()), type);
1754 }
1755 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
1756 (Uint32)s, (Uint32)n, (Uint64)real_size);
1757 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1758 }
1759
1760 if(clear){
1761 char * ptr = (char*)p;
1762 const Uint32 chunk = 128 * 1024;
1763 while(size > chunk){
1764 refresh_watch_dog(9);
1765 memset(ptr, 0, chunk);
1766 ptr += chunk;
1767 size -= chunk;
1768 }
1769 refresh_watch_dog(9);
1770 memset(ptr, 0, size);
1771 }
1772 if (unaligned_buffer)
1773 {
1774 *unaligned_buffer = p;
1775 p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1776 #ifdef VM_TRACE
1777 g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
1778 type, align, (Uint64)p, (Uint64)p+n*s,
1779 (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1780 #endif
1781 }
1782 }
1783 return p;
1784 }
1785
1786 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)1787 SimulatedBlock::deallocRecord(void ** ptr,
1788 const char * type, size_t s, size_t n){
1789 (void)type;
1790
1791 if(* ptr != 0){
1792 ndbd_free(* ptr, n*s);
1793 * ptr = 0;
1794 }
1795 }
1796
1797 int
sortchunks(const void * _e0,const void * _e1)1798 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
1799 {
1800 const AllocChunk *p0 = (const AllocChunk*)_e0;
1801 const AllocChunk *p1 = (const AllocChunk*)_e1;
1802
1803 if (p0->ptrI > p1->ptrI)
1804 return 1;
1805 if (p0->ptrI < p1->ptrI)
1806 return -1;
1807 return 0;
1808 }
1809
1810 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)1811 SimulatedBlock::allocChunks(AllocChunk dst[],
1812 Uint32 arraysize,
1813 Uint32 rg,
1814 Uint32 pages,
1815 Uint32 paramId)
1816 {
1817 const Uint32 save = pages; // For fail
1818 Uint32 i = 0;
1819 for (; i<arraysize && pages > 0; i++)
1820 {
1821 Uint32 cnt = pages;
1822 m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1823 if (unlikely(cnt == 0))
1824 goto fail;
1825 pages -= cnt;
1826 dst[i].cnt = cnt;
1827 }
1828 if (unlikely(pages != 0))
1829 goto fail;
1830
1831 qsort(dst, i, sizeof(dst[0]), sortchunks);
1832 return i;
1833
1834 fail:
1835 char buf1[255];
1836 char buf2[255];
1837 struct ndb_mgm_param_info param_info;
1838 size_t size = sizeof(ndb_mgm_param_info);
1839
1840 if (ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size) != 0)
1841 {
1842 ndbassert(false);
1843 param_info.m_name = "<unknown>";
1844 }
1845
1846 BaseString::snprintf(buf1, sizeof(buf1),
1847 "%s could not allocate memory for parameter %s",
1848 getBlockName(number()), param_info.m_name);
1849 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
1850 Uint64(save) * sizeof(GlobalPage));
1851 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1852 return 0;
1853 }
1854
1855 void
refresh_watch_dog(Uint32 place)1856 SimulatedBlock::refresh_watch_dog(Uint32 place)
1857 {
1858 #ifdef NDBD_MULTITHREADED
1859 (*m_watchDogCounter) = place;
1860 #else
1861 globalData.incrementWatchDogCounter(place);
1862 #endif
1863 }
1864
1865 void
update_watch_dog_timer(Uint32 interval)1866 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1867 {
1868 extern EmulatorData globalEmulatorData;
1869 globalEmulatorData.theWatchDog->setCheckInterval(interval);
1870 }
1871
1872 void
progError(int line,int err_code,const char * extra) const1873 SimulatedBlock::progError(int line, int err_code, const char* extra) const {
1874 jamNoBlock();
1875
1876 const char *aBlockName = getBlockName(number(), "VM Kernel");
1877
1878 // Pack status of interesting config variables
1879 // so that we can print them in error.log
1880 int magicStatus =
1881 (m_ctx.m_config.stopOnError()<<1) +
1882 (m_ctx.m_config.getInitialStart()<<2);
1883
1884 /* Add line number to block name */
1885 char buf[100];
1886 BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
1887 aBlockName, line, magicStatus);
1888
1889 ErrorReporter::handleError(err_code, extra, buf);
1890
1891 }
1892
1893 void
infoEvent(const char * msg,...) const1894 SimulatedBlock::infoEvent(const char * msg, ...) const {
1895 if(msg == 0)
1896 return;
1897
1898 SignalT<25> signalT;
1899 signalT.theData[0] = NDB_LE_InfoEvent;
1900 char * buf = (char *)(signalT.theData+1);
1901
1902 va_list ap;
1903 va_start(ap, msg);
1904 BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1905 va_end(ap);
1906
1907 size_t len = strlen(buf) + 1;
1908 if(len > 96){
1909 len = 96;
1910 buf[95] = 0;
1911 }
1912
1913 /**
1914 * Init and put it into the job buffer
1915 */
1916 memset(&signalT.header, 0, sizeof(SignalHeader));
1917
1918 const Signal * signal = globalScheduler.getVMSignals();
1919 Uint32 tTrace = signal->header.theTrace;
1920 Uint32 tSignalId = signal->header.theSignalId;
1921
1922 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1923 signalT.header.theReceiversBlockNumber = CMVMI;
1924 signalT.header.theSendersBlockRef = reference();
1925 signalT.header.theTrace = tTrace;
1926 signalT.header.theSignalId = tSignalId;
1927 signalT.header.theLength = (Uint32)((len+3)/4)+1;
1928
1929 #ifdef NDBD_MULTITHREADED
1930 sendlocal(m_threadId,
1931 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1932 #else
1933 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1934 signalT.m_sectionPtrI);
1935 #endif
1936 }
1937
1938 void
warningEvent(const char * msg,...) const1939 SimulatedBlock::warningEvent(const char * msg, ...) const {
1940 if(msg == 0)
1941 return;
1942
1943 SignalT<25> signalT;
1944 signalT.theData[0] = NDB_LE_WarningEvent;
1945 char * buf = (char *)(signalT.theData+1);
1946
1947 va_list ap;
1948 va_start(ap, msg);
1949 BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1950 va_end(ap);
1951
1952 size_t len = strlen(buf) + 1;
1953 if(len > 96){
1954 len = 96;
1955 buf[95] = 0;
1956 }
1957
1958 /**
1959 * Init and put it into the job buffer
1960 */
1961 memset(&signalT.header, 0, sizeof(SignalHeader));
1962
1963 const Signal * signal = globalScheduler.getVMSignals();
1964 Uint32 tTrace = signal->header.theTrace;
1965 Uint32 tSignalId = signal->header.theSignalId;
1966
1967 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1968 signalT.header.theReceiversBlockNumber = CMVMI;
1969 signalT.header.theSendersBlockRef = reference();
1970 signalT.header.theTrace = tTrace;
1971 signalT.header.theSignalId = tSignalId;
1972 signalT.header.theLength = (Uint32)((len+3)/4)+1;
1973
1974 #ifdef NDBD_MULTITHREADED
1975 sendlocal(m_threadId,
1976 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1977 #else
1978 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1979 signalT.m_sectionPtrI);
1980 #endif
1981 }
1982
1983 void
execNODE_STATE_REP(Signal * signal)1984 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
1985 const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0];
1986
1987 this->theNodeState = rep->nodeState;
1988 }
1989
1990 void
execCHANGE_NODE_STATE_REQ(Signal * signal)1991 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
1992 const ChangeNodeStateReq * const req =
1993 (ChangeNodeStateReq *)&signal->theData[0];
1994
1995 this->theNodeState = req->nodeState;
1996 const Uint32 senderData = req->senderData;
1997 const BlockReference senderRef = req->senderRef;
1998
1999 /**
2000 * Pack return signal
2001 */
2002 ChangeNodeStateConf * const conf =
2003 (ChangeNodeStateConf *)&signal->theData[0];
2004
2005 conf->senderData = senderData;
2006
2007 sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
2008 ChangeNodeStateConf::SignalLength, JBB);
2009 }
2010
2011 void
execNDB_TAMPER(Signal * signal)2012 SimulatedBlock::execNDB_TAMPER(Signal * signal){
2013 if (signal->getLength() == 1)
2014 {
2015 SET_ERROR_INSERT_VALUE(signal->theData[0]);
2016 }
2017 else
2018 {
2019 SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
2020 }
2021 }
2022
2023 void
execSIGNAL_DROPPED_REP(Signal * signal)2024 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
2025 /* Note no need for fragmented signal handling as we are
2026 * going to crash this node
2027 */
2028 char msg[64];
2029 const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
2030 BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
2031 rep->originalGsn, rep->originalLength,rep->originalSectionCount);
2032 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
2033 msg,
2034 __FILE__,
2035 NST_ErrorHandler);
2036 }
2037
2038 void
execCONTINUE_FRAGMENTED(Signal * signal)2039 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
2040 jamEntry();
2041
2042 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2043 ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
2044
2045 switch (sig->type)
2046 {
2047 case ContinueFragmented::CONTINUE_SENDING :
2048 {
2049 jam();
2050 Ptr<FragmentSendInfo> fragPtr;
2051
2052 c_segmentedFragmentSendList.first(fragPtr);
2053 for(; !fragPtr.isNull();){
2054 jam();
2055 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2056 c_segmentedFragmentSendList.next(fragPtr);
2057
2058 sendNextSegmentedFragment(signal, * copyPtr.p);
2059 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2060 jam();
2061 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2062 jam();
2063 execute(signal, copyPtr.p->m_callback, 0);
2064 }//if
2065 c_segmentedFragmentSendList.release(copyPtr);
2066 }
2067 }
2068
2069 c_linearFragmentSendList.first(fragPtr);
2070 for(; !fragPtr.isNull();){
2071 jam();
2072 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2073 c_linearFragmentSendList.next(fragPtr);
2074
2075 sendNextLinearFragment(signal, * copyPtr.p);
2076 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2077 jam();
2078 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2079 jam();
2080 execute(signal, copyPtr.p->m_callback, 0);
2081 }//if
2082 c_linearFragmentSendList.release(copyPtr);
2083 }
2084 }
2085
2086 if(c_segmentedFragmentSendList.isEmpty() &&
2087 c_linearFragmentSendList.isEmpty()){
2088 jam();
2089 c_fragSenderRunning = false;
2090 return;
2091 }
2092
2093 sig->type = ContinueFragmented::CONTINUE_SENDING;
2094 sig->line = __LINE__;
2095 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2096 break;
2097 }
2098 case ContinueFragmented::CONTINUE_CLEANUP:
2099 {
2100 jam();
2101
2102 const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2103 /* Check length of signal */
2104 ndbassert(signal->getLength() ==
2105 ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2106 callbackWords);
2107
2108 Callback cb;
2109 memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2110
2111 doNodeFailureCleanup(signal,
2112 sig->cleanup.failedNodeId,
2113 sig->cleanup.resource,
2114 sig->cleanup.cursor,
2115 sig->cleanup.elementsCleaned,
2116 cb);
2117 break;
2118 }
2119 default:
2120 ndbrequire(false);
2121 }
2122 }
2123
2124 void
execSTOP_FOR_CRASH(Signal * signal)2125 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2126 {
2127 #ifdef NDBD_MULTITHREADED
2128 mt_execSTOP_FOR_CRASH();
2129 #endif
2130 }
2131
2132 void
execNODE_START_REP(Signal * signal)2133 SimulatedBlock::execNODE_START_REP(Signal* signal)
2134 {
2135 }
2136
2137 void
execAPI_START_REP(Signal * signal)2138 SimulatedBlock::execAPI_START_REP(Signal* signal)
2139 {
2140 }
2141
2142 void
execSEND_PACKED(Signal * signal)2143 SimulatedBlock::execSEND_PACKED(Signal* signal)
2144 {
2145 }
2146
2147 ATTRIBUTE_NOINLINE
2148 void
handle_execute_error(GlobalSignalNumber gsn)2149 SimulatedBlock::handle_execute_error(GlobalSignalNumber gsn)
2150 {
2151 /**
2152 * This method only called if an error has occurred
2153 */
2154 char errorMsg[255];
2155 if (!(gsn <= MAX_GSN)) {
2156 BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d too high)", gsn);
2157 ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2158 }
2159 if (!(theExecArray[gsn] != 0)) {
2160 BaseString::snprintf(errorMsg, 255, "Illegal signal received (GSN %d not added)", gsn);
2161 ERROR_SET(fatal, NDBD_EXIT_PRGERR, errorMsg, errorMsg);
2162 }
2163 ndbrequire(false);
2164 }
2165
2166 // MT LQH callback CONF via signal
2167
2168 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2169 SimulatedBlock::getCallbackEntry(Uint32 ci)
2170 {
2171 ndbrequire(m_callbackTableAddr != 0);
2172 const CallbackTable& ct = *m_callbackTableAddr;
2173 ndbrequire(ci < ct.m_count);
2174 return ct.m_entry[ci];
2175 }
2176
2177 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 senderData,Uint32 callbackInfo,Uint32 returnCode)2178 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2179 CallbackPtr& cptr,
2180 Uint32 senderData, Uint32 callbackInfo,
2181 Uint32 returnCode)
2182 {
2183 Uint32 blockNo = blockToMain(fullBlockNo);
2184 Uint32 instanceNo = blockToInstance(fullBlockNo);
2185 SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2186 ndbrequire(b != 0);
2187
2188 const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2189
2190 if (!isNdbMtLqh()) {
2191 Callback c;
2192 c.m_callbackFunction = ce.m_function;
2193 c.m_callbackData = cptr.m_callbackData;
2194 b->execute(signal, c, returnCode);
2195
2196 if (ce.m_flags & CALLBACK_ACK) {
2197 jam();
2198 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2199 ack->senderData = senderData;
2200 ack->callbackInfo = callbackInfo;
2201 EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2202 signal, CallbackAck::SignalLength);
2203 }
2204 } else {
2205 CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2206 conf->senderData = senderData;
2207 conf->senderRef = reference();
2208 conf->callbackIndex = cptr.m_callbackIndex;
2209 conf->callbackData = cptr.m_callbackData;
2210 conf->callbackInfo = callbackInfo;
2211 conf->returnCode = returnCode;
2212
2213 if (ce.m_flags & CALLBACK_DIRECT) {
2214 jam();
2215 EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
2216 signal, CallbackConf::SignalLength, instanceNo);
2217 } else {
2218 jam();
2219 BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2220 sendSignal(ref, GSN_CALLBACK_CONF,
2221 signal, CallbackConf::SignalLength, JBB);
2222 }
2223 }
2224 cptr.m_callbackIndex = ZNIL;
2225 }
2226
2227 void
execCALLBACK_CONF(Signal * signal)2228 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2229 {
2230 const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2231
2232 Uint32 senderData = conf->senderData;
2233 Uint32 senderRef = conf->senderRef;
2234 Uint32 callbackIndex = conf->callbackIndex;
2235 Uint32 callbackData = conf->callbackData;
2236 Uint32 callbackInfo = conf->callbackInfo;
2237 Uint32 returnCode = conf->returnCode;
2238
2239 ndbrequire(m_callbackTableAddr != 0);
2240 const CallbackEntry& ce = getCallbackEntry(callbackIndex);
2241 CallbackFunction function = ce.m_function;
2242
2243 Callback callback;
2244 callback.m_callbackFunction = function;
2245 callback.m_callbackData = callbackData;
2246 execute(signal, callback, returnCode);
2247
2248 if (ce.m_flags & CALLBACK_ACK) {
2249 jam();
2250 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2251 ack->senderData = senderData;
2252 ack->callbackInfo = callbackInfo;
2253 sendSignal(senderRef, GSN_CALLBACK_ACK,
2254 signal, CallbackAck::SignalLength, JBB);
2255 }
2256 }
2257
2258 #ifdef VM_TRACE_TIME
2259 void
clearTimes()2260 SimulatedBlock::clearTimes() {
2261 for(Uint32 i = 0; i <= MAX_GSN; i++){
2262 m_timeTrace[i].cnt = 0;
2263 m_timeTrace[i].sum = 0;
2264 m_timeTrace[i].sub = 0;
2265 }
2266 }
2267
2268 void
printTimes(FILE * output)2269 SimulatedBlock::printTimes(FILE * output){
2270 fprintf(output, "-- %s --\n", getBlockName(number()));
2271 Uint64 sum = 0;
2272 for(Uint32 i = 0; i <= MAX_GSN; i++){
2273 Uint32 n = m_timeTrace[i].cnt;
2274 if(n != 0){
2275 double dn = n;
2276
2277 double avg = m_timeTrace[i].sum;
2278 double avg2 = avg - m_timeTrace[i].sub;
2279
2280 avg /= dn;
2281 avg2 /= dn;
2282
2283 fprintf(output,
2284 //name ; cnt ; loc ; acc
2285 "%s ; #%d ; %dus ; %dus ; %dms\n",
2286 getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2287 (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2288
2289 sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2290 }
2291 }
2292 sum = (sum + 500)/ 1000;
2293 fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2294 fprintf(output, "\n");
2295 fflush(output);
2296 }
2297
2298 #endif
2299
FragmentInfo(Uint32 fragId,Uint32 sender)2300 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2301 m_fragmentId = fragId;
2302 m_senderRef = sender;
2303 m_sectionPtrI[0] = RNIL;
2304 m_sectionPtrI[1] = RNIL;
2305 m_sectionPtrI[2] = RNIL;
2306 }
2307
FragmentSendInfo()2308 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2309 {
2310 }
2311
2312 bool
assembleFragments(Signal * signal)2313 SimulatedBlock::assembleFragments(Signal * signal){
2314 Uint32 sigLen = signal->length() - 1;
2315 Uint32 fragId = signal->theData[sigLen];
2316 Uint32 fragInfo = signal->header.m_fragmentInfo;
2317 Uint32 senderRef = signal->getSendersBlockRef();
2318
2319 Uint32 *sectionPtr = signal->m_sectionPtrI;
2320
2321 if(fragInfo == 0){
2322 return true;
2323 }
2324
2325 const Uint32 secs = signal->header.m_noOfSections;
2326 const Uint32 * const secNos = &signal->theData[sigLen - secs];
2327
2328 if(fragInfo == 1){
2329 /**
2330 * First in train
2331 */
2332 Ptr<FragmentInfo> fragPtr;
2333 if(!c_fragmentInfoHash.seize(fragPtr)){
2334 ndbrequire(false);
2335 return false;
2336 }
2337
2338 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2339 c_fragmentInfoHash.add(fragPtr);
2340
2341 for(Uint32 i = 0; i<secs; i++){
2342 Uint32 sectionNo = secNos[i];
2343 ndbassert(sectionNo < 3);
2344 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2345 }
2346
2347 ndbassert(! fragPtr.p->isDropped() );
2348
2349 /**
2350 * Don't release allocated segments
2351 */
2352 signal->header.m_fragmentInfo = 0;
2353 signal->header.m_noOfSections = 0;
2354 return false;
2355 }
2356
2357 FragmentInfo key(fragId, senderRef);
2358 Ptr<FragmentInfo> fragPtr;
2359 if(c_fragmentInfoHash.find(fragPtr, key)){
2360
2361 /**
2362 * FragInfo == 2 or 3
2363 */
2364 if ( likely(! fragPtr.p->isDropped()) )
2365 {
2366 Uint32 i;
2367 for(i = 0; i<secs; i++){
2368 Uint32 sectionNo = secNos[i];
2369 ndbassert(sectionNo < 3);
2370 Uint32 sectionPtrI = sectionPtr[i];
2371 if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2372 linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2373 } else {
2374 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2375 }
2376 }
2377
2378 /**
2379 * fragInfo = 2
2380 */
2381 if(fragInfo == 2){
2382 signal->header.m_fragmentInfo = 0;
2383 signal->header.m_noOfSections = 0;
2384 return false;
2385 }
2386
2387 /**
2388 * fragInfo = 3
2389 */
2390 for(i = 0; i<3; i++){
2391 Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2392 if(ptrI != RNIL){
2393 signal->m_sectionPtrI[i] = ptrI;
2394 } else {
2395 break;
2396 }
2397 }
2398
2399 signal->setLength(sigLen - secs);
2400 signal->header.m_noOfSections = i;
2401 signal->header.m_fragmentInfo = 0;
2402
2403 c_fragmentInfoHash.release(fragPtr);
2404 return true;
2405 }
2406 else
2407 {
2408 /* This fragmented signal has already had at least 1 fragment
2409 * dropped. We must release the received segments.
2410 */
2411 for (Uint32 i=0; i < secs; i++)
2412 releaseSection( sectionPtr[i] );
2413
2414 signal->header.m_fragmentInfo = 0;
2415 signal->header.m_noOfSections = 0;
2416
2417 /* FragInfo == 2
2418 * More fragments to come, keep waiting
2419 */
2420 if (fragInfo == 2)
2421 return false;
2422
2423 /* FragInfo == 3
2424 * That was the last fragment.
2425 * We're now ready for handling the dropped signal.
2426 */
2427 SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2428 Uint32 gsn = signal->header.theVerId_signalNumber;
2429 Uint32 len = signal->header.theLength;
2430 Uint32 newLen= (len > 22 ? 22 : len);
2431 memmove(rep->originalData, signal->theData, (4 * newLen));
2432 rep->originalGsn = gsn;
2433 rep->originalLength = len;
2434 rep->originalSectionCount = 0;
2435 signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2436 signal->header.theLength = newLen + 3;
2437 signal->header.m_noOfSections = 0;
2438 signal->header.m_fragmentInfo = 3;
2439
2440
2441 /**
2442 * NOTE: Don't use EXECUTE_DIRECT as it
2443 * sets sendersBlockRef to reference()
2444 */
2445 /* Perform dropped signal handling, in this thread, now */
2446 jamBuffer()->markEndOfSigExec();
2447 executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2448
2449 /* return false to caller - they should not process the signal */
2450 return false;
2451 } // else (isDropped())
2452 }
2453
2454 /**
2455 * Unable to find fragment
2456 */
2457 ndbrequire(false);
2458 return false;
2459 }
2460
2461 bool
assembleDroppedFragments(Signal * signal)2462 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2463 {
2464 /* This method is called at the start of a SIGNAL_DROPPED_REP
2465 * handler when there is a chance that the dropped signal could
2466 * be part of a fragmented signal.
2467 * If the dropped signal was a fragmented signal, this
2468 * needs to be handled specially to ensure that fragments
2469 * of the signal are correctly dropped to avoid segment
2470 * leaks etc.
2471 * There are a number of cases :
2472 * 1) First fragment dropped (FragInfo=1)
2473 * All remaining fragments must be dropped when they
2474 * arrive. The Signal dropped report handler must be
2475 * executed when the last fragment has arrived.
2476 * 2) Middle fragment dropped (FragInfo=2)
2477 * Any existing stored segments must be released.
2478 * All remaining fragments must be dropped when they
2479 * arrive.
2480 * 3) Last fragment dropped (FragInfo=3)
2481 * Any existing stored segments must be released.
2482 * Signal Dropped handling can occur, so return true.
2483 *
2484 * To indicate that a fragment has been dropped for a signal,
2485 * all the section I Values in the fragment's hash entry are
2486 * set to RNIL.
2487 * Signal Dropped Report handling is performed when the last
2488 * fragment arrives. If the last fragment is not dropped
2489 * by the transporter layer then normal fragment assembly
2490 * arranges for dropped signal handling to occur.
2491 */
2492 Uint32 sigLen = signal->length() - 1;
2493 Uint32 fragId = signal->theData[sigLen];
2494 Uint32 fragInfo = signal->header.m_fragmentInfo;
2495 Uint32 senderRef = signal->getSendersBlockRef();
2496
2497 if(fragInfo == 0){
2498 return true;
2499 }
2500
2501 /* This method is for handling SIGNAL_DROPPED_REP only */
2502 ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2503 ndbrequire(signal->header.m_noOfSections == 0);
2504
2505 if(fragInfo == 1){
2506 /**
2507 * First in train
2508 */
2509 Ptr<FragmentInfo> fragPtr;
2510 if(!c_fragmentInfoHash.seize(fragPtr)){
2511 ndbrequire(false);
2512 return false;
2513 }
2514
2515 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2516 c_fragmentInfoHash.add(fragPtr);
2517
2518 /* Mark entry in hash as belonging to dropped signal so subsequent
2519 * fragments can also be dropped
2520 */
2521 fragPtr.p->m_sectionPtrI[0]= RNIL;
2522 fragPtr.p->m_sectionPtrI[1]= RNIL;
2523 fragPtr.p->m_sectionPtrI[2]= RNIL;
2524
2525 /* Wait for last fragment before SignalDroppedRep handling */
2526 signal->header.m_fragmentInfo = 0;
2527 return false;
2528 }
2529
2530 FragmentInfo key(fragId, senderRef);
2531 Ptr<FragmentInfo> fragPtr;
2532 if(c_fragmentInfoHash.find(fragPtr, key)){
2533
2534 /**
2535 * FragInfo == 2 or 3
2536 */
2537 if (! fragPtr.p->isDropped() )
2538 {
2539 /* Fragmented Signal not already marked as dropped
2540 * Need to free stored segments
2541 */
2542 releaseSection(fragPtr.p->m_sectionPtrI[0]);
2543 releaseSection(fragPtr.p->m_sectionPtrI[1]);
2544 releaseSection(fragPtr.p->m_sectionPtrI[2]);
2545
2546 /* Mark as dropped now */
2547 fragPtr.p->m_sectionPtrI[0]= RNIL;
2548 fragPtr.p->m_sectionPtrI[1]= RNIL;
2549 fragPtr.p->m_sectionPtrI[2]= RNIL;
2550
2551 ndbassert( fragPtr.p->isDropped() );
2552 }
2553
2554 /**
2555 * fragInfo = 2
2556 * Still waiting for final fragments.
2557 * Return false to caller.
2558 */
2559 if(fragInfo == 2){
2560 signal->header.m_fragmentInfo = 0;
2561 return false;
2562 }
2563
2564 /**
2565 * fragInfo = 3
2566 * All fragments received, remove entry
2567 * from hash and return to caller for
2568 * dropped signal handling.
2569 */
2570 signal->header.m_fragmentInfo = 0;
2571
2572 c_fragmentInfoHash.release(fragPtr);
2573 return true;
2574 }
2575
2576 /**
2577 * Unable to find fragment
2578 */
2579 ndbrequire(false);
2580 return false;
2581 }
2582
2583 /**
2584 * doCleanupFragInfo
2585 * Iterate over block's Fragment assembly hash, looking
2586 * for in-assembly fragments from the failed node
2587 * Release these
2588 * Returns after each scanned bucket to avoid consuming
2589 * too much time.
2590 *
2591 * Parameters
2592 * failedNodeId : Node id of failed node
2593 * cursor : Hash bucket to start iteration from
2594 * rtUnitsUsed : Total rt units used
2595 * elementsCleaned : Number of elements cleaned
2596 *
2597 * Updates
2598 * cursor : Hash bucket to continue iteration from
2599 * rtUnitsUsed : += units used
2600 * elementsCleaned : += elements cleaned
2601 *
2602 * Returns
2603 * true if all FragInfo structs cleaned up
2604 * false if more to do
2605 */
2606 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2607 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2608 Uint32& cursor,
2609 Uint32& rtUnitsUsed,
2610 Uint32& elementsCleaned)
2611 {
2612 jam();
2613 DLHashTable<FragmentInfo>::Iterator iter;
2614
2615 c_fragmentInfoHash.next(cursor, iter);
2616
2617 const Uint32 startBucket = iter.bucket;
2618
2619 while (!iter.isNull() &&
2620 (iter.bucket == startBucket))
2621 {
2622 jam();
2623
2624 Ptr<FragmentInfo> curr = iter.curr;
2625 c_fragmentInfoHash.next(iter);
2626
2627 FragmentInfo* fragInfo = curr.p;
2628
2629 if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2630 {
2631 jam();
2632 /* We were assembling a fragmented signal from the
2633 * failed node, discard the partially assembled
2634 * sections and free the FragmentInfo hash entry
2635 */
2636 for(Uint32 s = 0; s<3; s++)
2637 {
2638 if (fragInfo->m_sectionPtrI[s] != RNIL)
2639 {
2640 jam();
2641 SegmentedSectionPtr ssptr;
2642 getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2643 release(ssptr);
2644 }
2645 }
2646
2647 /* Release FragmentInfo hash element */
2648 c_fragmentInfoHash.release(curr);
2649
2650 elementsCleaned++;
2651 rtUnitsUsed+=3;
2652 }
2653
2654 rtUnitsUsed++;
2655 } // while
2656
2657 cursor = iter.bucket;
2658 return iter.isNull();
2659 }
2660
2661 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2662 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2663 Uint32& cursor,
2664 Uint32& rtUnitsUsed,
2665 Uint32& elementsCleaned)
2666 {
2667 jam();
2668
2669 Ptr<FragmentSendInfo> fragPtr;
2670 const Uint32 NumSendLists = 2;
2671 ndbrequire(cursor < NumSendLists);
2672
2673 DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
2674 { &c_segmentedFragmentSendList,
2675 &c_linearFragmentSendList };
2676
2677 DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
2678
2679 list->first(fragPtr);
2680 for(; !fragPtr.isNull();){
2681 jam();
2682 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2683 list->next(fragPtr);
2684 rtUnitsUsed++;
2685
2686 NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
2687
2688 if (rg.m_nodes.get(failedNodeId))
2689 {
2690 jam();
2691 /* Fragmented signal is being sent to node */
2692 rg.m_nodes.clear(failedNodeId);
2693
2694 if (rg.m_nodes.isclear())
2695 {
2696 jam();
2697 /* No other nodes in receiver group - send
2698 * is cancelled
2699 * Will be cleaned up in the usual CONTINUE_FRAGMENTED
2700 * handling code.
2701 */
2702 copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2703 }
2704 elementsCleaned++;
2705 }
2706 }
2707
2708 /* Next time we'll do the next list */
2709 cursor++;
2710
2711 return (cursor == NumSendLists);
2712 }
2713
2714
2715 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)2716 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
2717 Uint32 failedNodeId,
2718 Uint32 resource,
2719 Uint32 cursor,
2720 Uint32 elementsCleaned,
2721 Callback& cb)
2722 {
2723 jam();
2724 const bool userCallback = (cb.m_callbackFunction != 0);
2725 const Uint32 maxRtUnits = userCallback ?
2726 #ifdef VM_TRACE
2727 2 :
2728 #else
2729 16 :
2730 #endif
2731 ~0; /* Must complete all processing in this call */
2732
2733 Uint32 rtUnitsUsed = 0;
2734
2735 /* Loop over resources, cleaning them up */
2736 do
2737 {
2738 bool resourceDone= false;
2739 switch(resource) {
2740 case ContinueFragmented::RES_FRAGSEND:
2741 {
2742 jam();
2743 resourceDone = doCleanupFragSend(failedNodeId, cursor,
2744 rtUnitsUsed, elementsCleaned);
2745 break;
2746 }
2747 case ContinueFragmented::RES_FRAGINFO:
2748 {
2749 jam();
2750 resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2751 rtUnitsUsed, elementsCleaned);
2752 break;
2753 }
2754 case ContinueFragmented::RES_LAST:
2755 {
2756 jam();
2757 /* Node failure processing complete, execute user callback if provided */
2758 if (userCallback)
2759 execute(signal, cb, elementsCleaned);
2760
2761 return elementsCleaned;
2762 }
2763 default:
2764 ndbrequire(false);
2765 }
2766
2767 /* Did we complete cleaning up this resource? */
2768 if (resourceDone)
2769 {
2770 resource++;
2771 cursor= 0;
2772 }
2773
2774 } while (rtUnitsUsed <= maxRtUnits);
2775
2776 jam();
2777
2778 /* Not yet completed failure handling.
2779 * Must have exhausted RT units.
2780 * Update cursor and re-invoke
2781 */
2782 ndbassert(userCallback);
2783
2784 /* Send signal to continue processing */
2785
2786 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2787 sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2788 sig->cleanup.failedNodeId = failedNodeId;
2789 sig->cleanup.resource = resource;
2790 sig->cleanup.cursor = cursor;
2791 sig->cleanup.elementsCleaned= elementsCleaned;
2792 Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2793 Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2794 callbackWords;
2795 ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
2796 memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2797
2798 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2799
2800 return elementsCleaned;
2801 }
2802
2803 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)2804 SimulatedBlock::simBlockNodeFailure(Signal* signal,
2805 Uint32 failedNodeId,
2806 Callback& cb)
2807 {
2808 jam();
2809 return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2810 }
2811
2812 Uint32
debugPrintFragmentCounts()2813 SimulatedBlock::debugPrintFragmentCounts()
2814 {
2815 const char* blockName = getBlockName(theNumber);
2816 DLHashTable<FragmentInfo>::Iterator iter;
2817 Uint32 fragmentInfoCount = 0;
2818 c_fragmentInfoHash.first(iter);
2819
2820 while(!iter.isNull())
2821 {
2822 fragmentInfoCount++;
2823 c_fragmentInfoHash.next(iter);
2824 }
2825
2826 Ptr<FragmentSendInfo> ptr;
2827 Uint32 linSendInfoCount = 0;
2828
2829 c_linearFragmentSendList.first(ptr);
2830
2831 while (!ptr.isNull())
2832 {
2833 linSendInfoCount++;
2834 c_linearFragmentSendList.next(ptr);
2835 }
2836
2837 Uint32 segSendInfoCount = 0;
2838 c_segmentedFragmentSendList.first(ptr);
2839
2840 while (!ptr.isNull())
2841 {
2842 segSendInfoCount++;
2843 c_segmentedFragmentSendList.next(ptr);
2844 }
2845
2846 ndbout_c("%s : Fragment assembly hash entry count : %d",
2847 blockName,
2848 fragmentInfoCount);
2849
2850 ndbout_c("%s : Linear fragment send list size : %d",
2851 blockName,
2852 linSendInfoCount);
2853
2854 ndbout_c("%s : Segmented fragment send list size : %d",
2855 blockName,
2856 segSendInfoCount);
2857
2858 return fragmentInfoCount +
2859 linSendInfoCount +
2860 segSendInfoCount;
2861 }
2862
2863
2864 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)2865 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
2866 NodeReceiverGroup rg,
2867 GlobalSignalNumber gsn,
2868 Signal* signal,
2869 Uint32 length,
2870 JobBufferLevel jbuf,
2871 SectionHandle* sections,
2872 bool noRelease,
2873 Uint32 messageSize) {
2874
2875 Uint32 noSections = sections->m_cnt;
2876 SegmentedSectionPtr * ptr = sections->m_ptr;
2877
2878 info.m_sectionPtr[0].m_segmented.i = RNIL;
2879 info.m_sectionPtr[1].m_segmented.i = RNIL;
2880 info.m_sectionPtr[2].m_segmented.i = RNIL;
2881
2882 Uint32 totalSize = 0;
2883 switch(noSections){
2884 case 3:
2885 info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2886 info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2887 totalSize += ptr[2].sz;
2888 case 2:
2889 info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2890 info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2891 totalSize += ptr[1].sz;
2892 case 1:
2893 info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2894 info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2895 totalSize += ptr[0].sz;
2896 }
2897
2898 if(totalSize <= messageSize + SectionSegment::DataLength){
2899 /**
2900 * Send signal directly
2901 */
2902 if (noRelease)
2903 sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
2904 else
2905 sendSignal(rg, gsn, signal, length, jbuf, sections);
2906
2907 info.m_status = FragmentSendInfo::SendComplete;
2908 return true;
2909 }
2910
2911 /**
2912 * Setup info object
2913 */
2914 info.m_status = FragmentSendInfo::SendNotComplete;
2915 info.m_prio = (Uint8)jbuf;
2916 info.m_gsn = gsn;
2917 info.m_fragInfo = 1;
2918 info.m_flags = 0;
2919 info.m_messageSize = messageSize;
2920 info.m_fragmentId = c_fragmentIdCounter++;
2921 info.m_nodeReceiverGroup = rg;
2922 info.m_callback.m_callbackFunction = 0;
2923
2924 if (noRelease)
2925 {
2926 /* Record info that we are not releasing segments */
2927 info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2928 }
2929 else
2930 {
2931 /**
2932 * Clear sections in caller's handle. Actual send
2933 * will consume them
2934 */
2935 sections->m_cnt = 0;
2936 }
2937
2938 /* Store main signal data in a segment for sending later */
2939 Ptr<SectionSegment> tmp;
2940 if(!import(tmp, &signal->theData[0], length))
2941 {
2942 handle_out_of_longsignal_memory(0);
2943 return false;
2944 }
2945 info.m_theDataSection.p = &tmp.p->theData[0];
2946 info.m_theDataSection.sz = length;
2947 tmp.p->theData[length] = tmp.i;
2948
2949 sendNextSegmentedFragment(signal, info);
2950
2951 if(c_fragmentIdCounter == 0){
2952 /**
2953 * Fragment id 0 is invalid
2954 */
2955 c_fragmentIdCounter = 1;
2956 }
2957
2958 return true;
2959 }
2960
/* lsout(x): wrapper for the debug printouts in the fragment send code.
 * Change the '#if 0' to '#if 1' to make lsout(x) expand to x; as it
 * stands it expands to nothing.
 */
#if 0
#define lsout(x) x
#else
#define lsout(x)
#endif
2966
2967 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)2968 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
2969 FragmentSendInfo & info){
2970
2971 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2972 {
2973 /* Send was cancelled - all dest. nodes have failed
2974 * since send was started
2975 */
2976 if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2977 {
2978 /*
2979 * Free any sections still to be sent
2980 */
2981 SectionHandle handle(this);
2982 for (Uint32 s = 0; s < 3; s++)
2983 {
2984 Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2985 if (sectionI != RNIL)
2986 {
2987 getSection(handle.m_ptr[handle.m_cnt], sectionI);
2988 info.m_sectionPtr[s].m_segmented.i = RNIL;
2989 info.m_sectionPtr[s].m_segmented.p = NULL;
2990 handle.m_cnt++;
2991 }
2992 }
2993
2994 releaseSections(handle);
2995 }
2996
2997 /* Free inline signal data storage section */
2998 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2999 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3000
3001 info.m_status = FragmentSendInfo::SendComplete;
3002 return;
3003 }
3004
3005 /**
3006 * Setup main signal data from stored copy
3007 */
3008 const Uint32 sigLen = info.m_theDataSection.sz;
3009 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3010
3011 Uint32 sz = 0;
3012 Uint32 maxSz = info.m_messageSize;
3013
3014 Int32 secNo = 2;
3015 Uint32 secCount = 0;
3016 Uint32 * secNos = &signal->theData[sigLen];
3017
3018 SectionHandle sections(this);
3019 SegmentedSectionPtr *ptr = sections.m_ptr;
3020
3021 bool split= false;
3022 Uint32 splitSectionStartI= RNIL;
3023 SectionSegment* splitSectionStartP= NULL;
3024 Uint32 splitSectionLastSegment= RNIL;
3025 Uint32 splitSectionSz= 0;
3026
3027 enum { Unknown = 0, Full = 1 } loop = Unknown;
3028 for(; secNo >= 0 && secCount < 3; secNo--){
3029 Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
3030 if(ptrI == RNIL)
3031 continue;
3032
3033 info.m_sectionPtr[secNo].m_segmented.i = RNIL;
3034
3035 SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
3036 const Uint32 size = ptrP->m_sz;
3037
3038 ptr[secCount].i = ptrI;
3039 ptr[secCount].p = ptrP;
3040 ptr[secCount].sz = size;
3041 secNos[secCount] = secNo;
3042 secCount++;
3043
3044 const Uint32 sizeLeft = maxSz - sz;
3045 if(size <= sizeLeft){
3046 /**
3047 * The section fits
3048 */
3049 sz += size;
3050 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3051 continue;
3052 }
3053
3054 const Uint32 overflow = size - sizeLeft; // > 0
3055 if(overflow <= SectionSegment::DataLength){
3056 /**
3057 * Only one segment left to send
3058 * send even if sizeLeft <= size
3059 */
3060 lsout(ndbout_c("section %d saved as %d but full over: %d",
3061 secNo, secCount-1, overflow));
3062 secNo--;
3063 break;
3064 }
3065
3066 // size >= 61
3067 if(sizeLeft < SectionSegment::DataLength){
3068 /**
3069 * Less than one segment left (space)
3070 * dont bother sending
3071 */
3072 secCount--;
3073 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3074 loop = Full;
3075 lsout(ndbout_c("section %d not saved", secNo));
3076 break;
3077 }
3078
3079 /**
3080 * Split list
3081 * 1) Find place to split
3082 * 2) Rewrite header (the part that will be sent)
3083 * 3) Write new header (for remaining part)
3084 * 4) Store new header on FragmentSendInfo - record
3085 */
3086 // size >= 61 && sizeLeft >= 60
3087 Uint32 sum = SectionSegment::DataLength;
3088 Uint32 prevPtrI = ptrI;
3089 ptrI = ptrP->m_nextSegment;
3090 const Uint32 fill = sizeLeft - SectionSegment::DataLength;
3091 while(sum < fill){
3092 prevPtrI = ptrI;
3093 ptrP = g_sectionSegmentPool.getPtr(ptrI);
3094 ptrI = ptrP->m_nextSegment;
3095 sum += SectionSegment::DataLength;
3096 }
3097
3098 Uint32 prev = secCount - 1;
3099 /**
3100 * Record details of the section pre-split
3101 * This allows the split to be 'healed' afterwards in the
3102 * no release case.
3103 */
3104 split= true;
3105 splitSectionStartI= ptr[prev].i;
3106 splitSectionStartP= ptr[prev].p;
3107 splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3108 splitSectionSz= splitSectionStartP->m_sz;
3109
3110 /**
3111 * Rewrite header w.r.t size and last
3112 * This is what will be sent in this fragment.
3113 */
3114 splitSectionStartP->m_lastSegment = prevPtrI;
3115 splitSectionStartP->m_sz = sum;
3116 ptr[prev].sz = sum;
3117
3118 /**
3119 * Write "new" list header
3120 * This is what remains to be sent in this section
3121 */
3122 ptrP = g_sectionSegmentPool.getPtr(ptrI);
3123 ptrP->m_lastSegment = splitSectionLastSegment;
3124 ptrP->m_sz = size - sum;
3125
3126 /**
3127 * And store it on info-record
3128 */
3129 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3130 info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3131
3132 loop = Full;
3133 lsout(ndbout_c("section %d split into %d", secNo, prev));
3134 break;
3135 }
3136
3137 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3138 loop, secNo, secCount, sz));
3139
3140 /**
3141 * Store fragment id
3142 */
3143 secNos[secCount] = info.m_fragmentId;
3144
3145 Uint32 fragInfo = info.m_fragInfo;
3146 info.m_fragInfo = 2;
3147 switch(loop){
3148 case Unknown:
3149 if(secNo >= 0){
3150 lsout(ndbout_c("Unknown - Full"));
3151 /**
3152 * Not finished
3153 */
3154 break;
3155 }
3156 // Fall through
3157 lsout(ndbout_c("Unknown - Done"));
3158 info.m_status = FragmentSendInfo::SendComplete;
3159 ndbassert(fragInfo == 2);
3160 fragInfo = 3;
3161 case Full:
3162 break;
3163 }
3164
3165 signal->header.m_fragmentInfo = fragInfo;
3166 signal->header.m_noOfSections = 0;
3167 sections.m_cnt = secCount;
3168
3169 if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3170 {
3171 sendSignalNoRelease(info.m_nodeReceiverGroup,
3172 info.m_gsn,
3173 signal,
3174 sigLen + secCount + 1,
3175 (JobBufferLevel)info.m_prio,
3176 §ions);
3177 /* NoRelease leaves SectionHandle populated, we'll
3178 * clear it here. The actual sections themselves
3179 * remain allocated.
3180 */
3181 sections.m_cnt = 0;
3182
3183 if (split)
3184 {
3185 /* There was a split section, which required us to modify the
3186 * segment list.
3187 * Now restore the split section's segment list back to
3188 * its previous state
3189 * (Only really required for first segment, but we do
3190 * it for all of them, to be a good citizen)
3191 */
3192 ndbrequire( splitSectionStartI != RNIL );
3193 ndbrequire( splitSectionStartP != NULL );
3194 ndbrequire( splitSectionLastSegment != RNIL );
3195
3196 splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3197 splitSectionStartP->m_sz= splitSectionSz;
3198
3199 /* Check our handiwork */
3200 assert(verifySection(splitSectionStartI));
3201 }
3202 }
3203 else
3204 {
3205 /* Normal, release sections case */
3206 sendSignal(info.m_nodeReceiverGroup,
3207 info.m_gsn,
3208 signal,
3209 sigLen + secCount + 1,
3210 (JobBufferLevel)info.m_prio,
3211 §ions);
3212 }
3213
3214 if(fragInfo == 3){
3215 /**
3216 * This is the last signal
3217 * Release saved 'main signal' words segment
3218 */
3219 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3220 }
3221 }
3222
3223 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3224 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3225 NodeReceiverGroup rg,
3226 GlobalSignalNumber gsn,
3227 Signal* signal,
3228 Uint32 length,
3229 JobBufferLevel jbuf,
3230 LinearSectionPtr ptr[3],
3231 Uint32 noOfSections,
3232 Uint32 messageSize){
3233
3234 check_sections(signal, signal->header.m_noOfSections, noOfSections);
3235
3236 info.m_sectionPtr[0].m_linear.p = NULL;
3237 info.m_sectionPtr[1].m_linear.p = NULL;
3238 info.m_sectionPtr[2].m_linear.p = NULL;
3239
3240 Uint32 totalSize = 0;
3241 switch(noOfSections){
3242 case 3:
3243 info.m_sectionPtr[2].m_linear = ptr[2];
3244 totalSize += ptr[2].sz;
3245 case 2:
3246 info.m_sectionPtr[1].m_linear = ptr[1];
3247 totalSize += ptr[1].sz;
3248 case 1:
3249 info.m_sectionPtr[0].m_linear = ptr[0];
3250 totalSize += ptr[0].sz;
3251 }
3252
3253 if(totalSize <= messageSize + SectionSegment::DataLength){
3254 /**
3255 * Send signal directly
3256 */
3257 sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3258 info.m_status = FragmentSendInfo::SendComplete;
3259
3260 /**
3261 * Indicate to sendLinearSignalFragment
3262 * that we'r already done
3263 */
3264 return true;
3265 }
3266
3267 /**
3268 * Setup info object
3269 */
3270 info.m_status = FragmentSendInfo::SendNotComplete;
3271 info.m_prio = (Uint8)jbuf;
3272 info.m_gsn = gsn;
3273 info.m_messageSize = messageSize;
3274 info.m_fragInfo = 1;
3275 info.m_flags = 0;
3276 info.m_fragmentId = c_fragmentIdCounter++;
3277 info.m_nodeReceiverGroup = rg;
3278 info.m_callback.m_callbackFunction = 0;
3279
3280 Ptr<SectionSegment> tmp;
3281 if(unlikely(!import(tmp, &signal->theData[0], length)))
3282 {
3283 handle_out_of_longsignal_memory(0);
3284 return false;
3285 }
3286
3287 info.m_theDataSection.p = &tmp.p->theData[0];
3288 info.m_theDataSection.sz = length;
3289 tmp.p->theData[length] = tmp.i;
3290
3291 sendNextLinearFragment(signal, info);
3292
3293 if(c_fragmentIdCounter == 0){
3294 /**
3295 * Fragment id 0 is invalid
3296 */
3297 c_fragmentIdCounter = 1;
3298 }
3299
3300 return true;
3301 }
3302
3303 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3304 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3305 FragmentSendInfo & info){
3306
3307 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3308 {
3309 /* Send was cancelled - all dest. nodes have failed
3310 * since send was started
3311 */
3312 /* Free inline signal data storage section */
3313 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3314 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3315
3316 info.m_status = FragmentSendInfo::SendComplete;
3317 return;
3318 }
3319
3320 /**
3321 * Store "theData"
3322 */
3323 const Uint32 sigLen = info.m_theDataSection.sz;
3324 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3325
3326 Uint32 sz = 0;
3327 Uint32 maxSz = info.m_messageSize;
3328
3329 Int32 secNo = 2;
3330 Uint32 secCount = 0;
3331 Uint32 * secNos = &signal->theData[sigLen];
3332 LinearSectionPtr signalPtr[3];
3333
3334 enum { Unknown = 0, Full = 2 } loop = Unknown;
3335 for(; secNo >= 0 && secCount < 3; secNo--){
3336 Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3337 if(ptrP == NULL)
3338 continue;
3339
3340 info.m_sectionPtr[secNo].m_linear.p = NULL;
3341 const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3342
3343 signalPtr[secCount].p = ptrP;
3344 signalPtr[secCount].sz = size;
3345 secNos[secCount] = secNo;
3346 secCount++;
3347
3348 const Uint32 sizeLeft = maxSz - sz;
3349 if(size <= sizeLeft){
3350 /**
3351 * The section fits
3352 */
3353 sz += size;
3354 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3355 continue;
3356 }
3357
3358 const Uint32 overflow = size - sizeLeft; // > 0
3359 if(overflow <= SectionSegment::DataLength){
3360 /**
3361 * Only one segment left to send
3362 * send even if sizeLeft <= size
3363 */
3364 lsout(ndbout_c("section %d saved as %d but full over: %d",
3365 secNo, secCount-1, overflow));
3366 secNo--;
3367 break;
3368 }
3369
3370 // size >= 61
3371 if(sizeLeft < SectionSegment::DataLength){
3372 /**
3373 * Less than one segment left (space)
3374 * dont bother sending
3375 */
3376 secCount--;
3377 info.m_sectionPtr[secNo].m_linear.p = ptrP;
3378 loop = Full;
3379 lsout(ndbout_c("section %d not saved", secNo));
3380 break;
3381 }
3382
3383 /**
3384 * Split list
3385 * 1) Find place to split
3386 * 2) Rewrite header (the part that will be sent)
3387 * 3) Write new header (for remaining part)
3388 * 4) Store new header on FragmentSendInfo - record
3389 */
3390 Uint32 sum = sizeLeft;
3391 sum /= SectionSegment::DataLength;
3392 sum *= SectionSegment::DataLength;
3393
3394 /**
3395 * Rewrite header w.r.t size
3396 */
3397 Uint32 prev = secCount - 1;
3398 signalPtr[prev].sz = sum;
3399
3400 /**
3401 * Write/store "new" header
3402 */
3403 info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3404 info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3405
3406 loop = Full;
3407 lsout(ndbout_c("section %d split into %d", secNo, prev));
3408 break;
3409 }
3410
3411 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3412 loop, secNo, secCount, sz));
3413
3414 /**
3415 * Store fragment id
3416 */
3417 secNos[secCount] = info.m_fragmentId;
3418
3419 Uint32 fragInfo = info.m_fragInfo;
3420 info.m_fragInfo = 2;
3421 switch(loop){
3422 case Unknown:
3423 if(secNo >= 0){
3424 lsout(ndbout_c("Unknown - Full"));
3425 /**
3426 * Not finished
3427 */
3428 break;
3429 }
3430 // Fall through
3431 lsout(ndbout_c("Unknown - Done"));
3432 info.m_status = FragmentSendInfo::SendComplete;
3433 ndbassert(fragInfo == 2);
3434 fragInfo = 3;
3435 case Full:
3436 break;
3437 }
3438
3439 signal->header.m_noOfSections = 0;
3440 signal->header.m_fragmentInfo = fragInfo;
3441
3442 sendSignal(info.m_nodeReceiverGroup,
3443 info.m_gsn,
3444 signal,
3445 sigLen + secCount + 1,
3446 (JobBufferLevel)info.m_prio,
3447 signalPtr,
3448 secCount);
3449
3450 if(fragInfo == 3){
3451 /**
3452 * This is the last signal
3453 */
3454 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3455 }
3456 }
3457
3458 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3459 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3460 GlobalSignalNumber gsn,
3461 Signal* signal,
3462 Uint32 length,
3463 JobBufferLevel jbuf,
3464 SectionHandle* sections,
3465 Callback & c,
3466 Uint32 messageSize){
3467 bool res = true;
3468 Ptr<FragmentSendInfo> tmp;
3469 res = c_segmentedFragmentSendList.seizeFirst(tmp);
3470 ndbrequire(res);
3471
3472 res = sendFirstFragment(* tmp.p,
3473 NodeReceiverGroup(ref),
3474 gsn,
3475 signal,
3476 length,
3477 jbuf,
3478 sections,
3479 false, // Release sections on send
3480 messageSize);
3481 ndbrequire(res);
3482
3483 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3484 c_segmentedFragmentSendList.release(tmp);
3485 if(c.m_callbackFunction != 0)
3486 execute(signal, c, 0);
3487 return;
3488 }
3489 tmp.p->m_callback = c;
3490
3491 if(!c_fragSenderRunning)
3492 {
3493 SaveSignal<2> save(signal);
3494 c_fragSenderRunning = true;
3495 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3496 sig->type = ContinueFragmented::CONTINUE_SENDING;
3497 sig->line = __LINE__;
3498 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3499 }
3500 }
3501
3502 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3503 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3504 GlobalSignalNumber gsn,
3505 Signal* signal,
3506 Uint32 length,
3507 JobBufferLevel jbuf,
3508 SectionHandle * sections,
3509 Callback & c,
3510 Uint32 messageSize){
3511 bool res = true;
3512 Ptr<FragmentSendInfo> tmp;
3513 res = c_segmentedFragmentSendList.seizeFirst(tmp);
3514 ndbrequire(res);
3515
3516 res = sendFirstFragment(* tmp.p,
3517 rg,
3518 gsn,
3519 signal,
3520 length,
3521 jbuf,
3522 sections,
3523 false, // Release sections on send
3524 messageSize);
3525 ndbrequire(res);
3526
3527 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3528 c_segmentedFragmentSendList.release(tmp);
3529 if(c.m_callbackFunction != 0)
3530 execute(signal, c, 0);
3531 return;
3532 }
3533 tmp.p->m_callback = c;
3534
3535 if(!c_fragSenderRunning)
3536 {
3537 SaveSignal<2> save(signal);
3538 c_fragSenderRunning = true;
3539 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3540 sig->type = ContinueFragmented::CONTINUE_SENDING;
3541 sig->line = __LINE__;
3542 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3543 }
3544 }
3545
3546 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3547 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3548 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3549 { abort(); /* should never be called */ }
3550 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3551 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3552
3553 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3554 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3555 GlobalSignalNumber gsn,
3556 Signal* signal,
3557 Uint32 length,
3558 JobBufferLevel jbuf,
3559 LinearSectionPtr ptr[3],
3560 Uint32 noOfSections,
3561 Callback & c,
3562 Uint32 messageSize){
3563 bool res = true;
3564 Ptr<FragmentSendInfo> tmp;
3565 res = c_linearFragmentSendList.seizeFirst(tmp);
3566 ndbrequire(res);
3567
3568 res = sendFirstFragment(* tmp.p,
3569 NodeReceiverGroup(ref),
3570 gsn,
3571 signal,
3572 length,
3573 jbuf,
3574 ptr,
3575 noOfSections,
3576 messageSize);
3577 ndbrequire(res);
3578
3579 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3580 c_linearFragmentSendList.release(tmp);
3581 if(c.m_callbackFunction != 0)
3582 execute(signal, c, 0);
3583 return;
3584 }
3585 tmp.p->m_callback = c;
3586
3587 if(!c_fragSenderRunning)
3588 {
3589 SaveSignal<2> save(signal);
3590 c_fragSenderRunning = true;
3591 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3592 sig->type = ContinueFragmented::CONTINUE_SENDING;
3593 sig->line = __LINE__;
3594 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3595 }
3596 }
3597
3598 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3599 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3600 GlobalSignalNumber gsn,
3601 Signal* signal,
3602 Uint32 length,
3603 JobBufferLevel jbuf,
3604 LinearSectionPtr ptr[3],
3605 Uint32 noOfSections,
3606 Callback & c,
3607 Uint32 messageSize){
3608 bool res = true;
3609 Ptr<FragmentSendInfo> tmp;
3610 res = c_linearFragmentSendList.seizeFirst(tmp);
3611 ndbrequire(res);
3612
3613 res = sendFirstFragment(* tmp.p,
3614 rg,
3615 gsn,
3616 signal,
3617 length,
3618 jbuf,
3619 ptr,
3620 noOfSections,
3621 messageSize);
3622 ndbrequire(res);
3623
3624 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3625 c_linearFragmentSendList.release(tmp);
3626 if(c.m_callbackFunction != 0)
3627 execute(signal, c, 0);
3628 return;
3629 }
3630 tmp.p->m_callback = c;
3631
3632 if(!c_fragSenderRunning)
3633 {
3634 SaveSignal<2> save(signal);
3635 c_fragSenderRunning = true;
3636 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3637 sig->type = ContinueFragmented::CONTINUE_SENDING;
3638 sig->line = __LINE__;
3639 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3640 }
3641 }
3642
3643 NodeInfo &
setNodeInfo(NodeId nodeId)3644 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3645 ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3646 return globalData.m_nodeInfo[nodeId];
3647 }
3648
3649 bool
isMultiThreaded()3650 SimulatedBlock::isMultiThreaded()
3651 {
3652 #ifdef NDBD_MULTITHREADED
3653 return true;
3654 #else
3655 return false;
3656 #endif
3657 }
3658
3659
3660 void
execUTIL_CREATE_LOCK_REF(Signal * signal)3661 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
3662 jamEntry();
3663 c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3664 }
3665
execUTIL_CREATE_LOCK_CONF(Signal * signal)3666 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
3667 jamEntry();
3668 c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3669 }
3670
execUTIL_DESTORY_LOCK_REF(Signal * signal)3671 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
3672 jamEntry();
3673 c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3674 }
3675
execUTIL_DESTORY_LOCK_CONF(Signal * signal)3676 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
3677 jamEntry();
3678 c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3679 }
3680
execUTIL_LOCK_REF(Signal * signal)3681 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
3682 jamEntry();
3683 c_mutexMgr.execUTIL_LOCK_REF(signal);
3684 }
3685
execUTIL_LOCK_CONF(Signal * signal)3686 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
3687 jamEntry();
3688 c_mutexMgr.execUTIL_LOCK_CONF(signal);
3689 }
3690
execUTIL_UNLOCK_REF(Signal * signal)3691 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
3692 jamEntry();
3693 c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3694 }
3695
execUTIL_UNLOCK_CONF(Signal * signal)3696 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
3697 jamEntry();
3698 c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3699 }
3700
3701 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)3702 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
3703 Uint32 ptrI, Uint32 retVal){
3704 c_mutexMgr.release(ptrI);
3705 }
3706
3707 void
fsRefError(Signal * signal,Uint32 line,const char * msg)3708 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
3709 {
3710 const FsRef *fsRef = (FsRef*)signal->getDataPtr();
3711 Uint32 errorCode = fsRef->errorCode;
3712 Uint32 osErrorCode = fsRef->osErrorCode;
3713 char msg2[100];
3714
3715 sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3716
3717 progError(line, errorCode, msg2);
3718 }
3719
3720 void
execFSWRITEREF(Signal * signal)3721 SimulatedBlock::execFSWRITEREF(Signal* signal)
3722 {
3723 fsRefError(signal, __LINE__, "File system write failed");
3724 }
3725
3726 void
execFSREADREF(Signal * signal)3727 SimulatedBlock::execFSREADREF(Signal* signal)
3728 {
3729 fsRefError(signal, __LINE__, "File system read failed");
3730 }
3731
3732 void
execFSCLOSEREF(Signal * signal)3733 SimulatedBlock::execFSCLOSEREF(Signal* signal)
3734 {
3735 fsRefError(signal, __LINE__, "File system close failed");
3736 }
3737
3738 void
execFSOPENREF(Signal * signal)3739 SimulatedBlock::execFSOPENREF(Signal* signal)
3740 {
3741 fsRefError(signal, __LINE__, "File system open failed");
3742 }
3743
3744 void
execFSREMOVEREF(Signal * signal)3745 SimulatedBlock::execFSREMOVEREF(Signal* signal)
3746 {
3747 fsRefError(signal, __LINE__, "File system remove failed");
3748 }
3749
3750 void
execFSSYNCREF(Signal * signal)3751 SimulatedBlock::execFSSYNCREF(Signal* signal)
3752 {
3753 fsRefError(signal, __LINE__, "File system sync failed");
3754 }
3755
3756 void
execFSAPPENDREF(Signal * signal)3757 SimulatedBlock::execFSAPPENDREF(Signal* signal)
3758 {
3759 fsRefError(signal, __LINE__, "File system append failed");
3760 }
3761
3762 #ifdef VM_TRACE
3763 static Ptr<void> * m_empty_global_variables[] = { 0 };
3764 void
disable_global_variables()3765 SimulatedBlock::disable_global_variables()
3766 {
3767 m_global_variables_save = m_global_variables;
3768 m_global_variables = m_empty_global_variables;
3769 }
3770
3771 void
enable_global_variables()3772 SimulatedBlock::enable_global_variables()
3773 {
3774 if (m_global_variables == m_empty_global_variables)
3775 {
3776 m_global_variables = m_global_variables_save;
3777 }
3778 }
3779
3780 void
clear_global_variables()3781 SimulatedBlock::clear_global_variables(){
3782 Ptr<void> ** tmp = m_global_variables;
3783 while(* tmp != 0){
3784 (* tmp)->i = RNIL;
3785 (* tmp)->p = 0;
3786 tmp++;
3787 }
3788 }
3789
3790 void
init_globals_list(void ** tmp,size_t cnt)3791 SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
3792 m_global_variables = new Ptr<void> * [cnt+1];
3793 for(size_t i = 0; i<cnt; i++){
3794 m_global_variables[i] = (Ptr<void>*)tmp[i];
3795 }
3796 m_global_variables[cnt] = 0;
3797 }
3798
3799 #endif
3800
3801 Uint32
xfrm_key(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3802 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
3803 Uint32 *dst, Uint32 dstSize,
3804 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3805 {
3806 const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3807 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3808
3809 Uint32 i = 0;
3810 Uint32 srcPos = 0;
3811 Uint32 dstPos = 0;
3812 while (i < noOfKeyAttr)
3813 {
3814 const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3815 Uint32 dstWords =
3816 xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3817 src, srcPos, dst, dstPos, dstSize);
3818 keyPartLen[i++] = dstWords;
3819 if (unlikely(dstWords == 0))
3820 return 0;
3821 }
3822
3823 if (0)
3824 {
3825 for(Uint32 i = 0; i<dstPos; i++)
3826 {
3827 printf("%.8x ", dst[i]);
3828 }
3829 printf("\n");
3830 }
3831 return dstPos;
3832 }
3833
3834 Uint32
xfrm_attr(Uint32 attrDesc,CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const3835 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
3836 const Uint32* src, Uint32 & srcPos,
3837 Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
3838 {
3839 Uint32 array =
3840 AttributeDescriptor::getArrayType(attrDesc);
3841 Uint32 srcBytes =
3842 AttributeDescriptor::getSizeInBytes(attrDesc);
3843
3844 Uint32 srcWords = ~0;
3845 Uint32 dstWords = ~0;
3846 uchar* dstPtr = (uchar*)&dst[dstPos];
3847 const uchar* srcPtr = (const uchar*)&src[srcPos];
3848
3849 if (cs == NULL)
3850 {
3851 jam();
3852 Uint32 len;
3853 switch(array){
3854 case NDB_ARRAYTYPE_SHORT_VAR:
3855 len = 1 + srcPtr[0];
3856 break;
3857 case NDB_ARRAYTYPE_MEDIUM_VAR:
3858 len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3859 break;
3860 #ifndef VM_TRACE
3861 default:
3862 abort();
3863 #endif
3864 case NDB_ARRAYTYPE_FIXED:
3865 len = srcBytes;
3866 }
3867 srcWords = (len + 3) >> 2;
3868 dstWords = srcWords;
3869 memcpy(dstPtr, srcPtr, dstWords << 2);
3870
3871 if (0)
3872 {
3873 ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3874 srcPos, dstPos, len, srcWords, dstWords);
3875
3876 for(Uint32 i = 0; i<srcWords; i++)
3877 printf("%.8x ", src[srcPos + i]);
3878 printf("\n");
3879 }
3880 }
3881 else
3882 {
3883 jam();
3884 Uint32 typeId =
3885 AttributeDescriptor::getType(attrDesc);
3886 Uint32 lb, len;
3887 bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
3888 if (unlikely(!ok))
3889 return 0;
3890 Uint32 xmul = cs->strxfrm_multiply;
3891 if (xmul == 0)
3892 xmul = 1;
3893 /*
3894 * Varchar end-spaces are ignored in comparisons. To get same hash
3895 * we blank-pad to maximum length via strnxfrm.
3896 */
3897 Uint32 dstLen = xmul * (srcBytes - lb);
3898 ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3899 int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
3900 if (unlikely(n == -1))
3901 return 0;
3902 while ((n & 3) != 0)
3903 {
3904 dstPtr[n++] = 0;
3905 }
3906 dstWords = (n >> 2);
3907 srcWords = (lb + len + 3) >> 2;
3908 }
3909
3910 dstPos += dstWords;
3911 srcPos += srcWords;
3912 return dstWords;
3913 }
3914
3915 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3916 SimulatedBlock::create_distr_key(Uint32 tableId,
3917 const Uint32 *src,
3918 Uint32* dst,
3919 const Uint32
3920 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3921 {
3922 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3923 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3924 Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3925
3926 Uint32 i = 0;
3927 Uint32 dstPos = 0;
3928
3929 /* --Note that src and dst may be the same location-- */
3930
3931 if(keyPartLen)
3932 {
3933 while (i < noOfKeyAttr && noOfDistrKeys)
3934 {
3935 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3936 Uint32 len = keyPartLen[i];
3937 if(AttributeDescriptor::getDKey(attr))
3938 {
3939 noOfDistrKeys--;
3940 memmove(dst+dstPos, src, len << 2);
3941 dstPos += len;
3942 }
3943 src += len;
3944 i++;
3945 }
3946 }
3947 else
3948 {
3949 while (i < noOfKeyAttr && noOfDistrKeys)
3950 {
3951 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3952 Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3953 ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3954 if(AttributeDescriptor::getDKey(attr))
3955 {
3956 noOfDistrKeys--;
3957 memmove(dst+dstPos, src, len << 2);
3958 dstPos += len;
3959 }
3960 src += len;
3961 i++;
3962 }
3963 }
3964 return dstPos;
3965 }
3966
3967 CArray<KeyDescriptor> g_key_descriptor_pool;
3968
3969 void
sendRoutedSignal(RoutePath path[],Uint32 pathcnt,Uint32 dst[],Uint32 dstcnt,Uint32 gsn,Signal * signal,Uint32 sigLen,JobBufferLevel prio,SectionHandle * userhandle)3970 SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
3971 Uint32 dst[],
3972 Uint32 dstcnt,
3973 Uint32 gsn,
3974 Signal * signal,
3975 Uint32 sigLen,
3976 JobBufferLevel prio,
3977 SectionHandle * userhandle)
3978 {
3979 ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
3980 pathcnt--; // first hop is made from here
3981
3982
3983 Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
3984 ndbrequire(len <= 25);
3985
3986 SectionHandle handle(this, signal);
3987 if (userhandle)
3988 {
3989 jam();
3990 handle.m_cnt = userhandle->m_cnt;
3991 for (Uint32 i = 0; i<handle.m_cnt; i++)
3992 handle.m_ptr[i] = userhandle->m_ptr[i];
3993 userhandle->m_cnt = 0;
3994 }
3995
3996 if (len + sigLen > 25)
3997 {
3998 jam();
3999
4000 /**
4001 * we need to store theData in a section
4002 */
4003 ndbrequire(handle.m_cnt < 3);
4004 handle.m_ptr[2] = handle.m_ptr[1];
4005 handle.m_ptr[1] = handle.m_ptr[0];
4006 Ptr<SectionSegment> tmp;
4007 if (unlikely(! import(tmp, signal->theData, sigLen)))
4008 {
4009 handle_out_of_longsignal_memory(0);
4010 }
4011 handle.m_ptr[0].p = tmp.p;
4012 handle.m_ptr[0].i = tmp.i;
4013 handle.m_ptr[0].sz = sigLen;
4014 handle.m_cnt ++;
4015 }
4016 else
4017 {
4018 jam();
4019 memmove(signal->theData + len, signal->theData, 4 * sigLen);
4020 len += sigLen;
4021 }
4022
4023 LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
4024 ord->cnt = (pathcnt << 16) | (dstcnt);
4025 ord->gsn = gsn;
4026 ord->prio = Uint32(prio);
4027
4028 Uint32 * dstptr = ord->path;
4029 for (Uint32 i = 1; i <= pathcnt; i++)
4030 {
4031 ndbrequire(refToNode(path[i].ref) == 0 ||
4032 refToNode(path[i].ref) == getOwnNodeId());
4033
4034 * dstptr++ = path[i].ref;
4035 * dstptr++ = Uint32(path[i].prio);
4036 }
4037
4038 for (Uint32 i = 0; i<dstcnt; i++)
4039 {
4040 ndbrequire(refToNode(dst[i]) == 0 ||
4041 refToNode(dst[i]) == getOwnNodeId());
4042
4043 * dstptr++ = dst[i];
4044 }
4045
4046 sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4047 path[0].prio, &handle);
4048 }
4049
4050 void
execLOCAL_ROUTE_ORD(Signal * signal)4051 SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
4052 {
4053 jamEntry();
4054
4055 if (!assembleFragments(signal))
4056 {
4057 jam();
4058 return;
4059 }
4060
4061 if (ERROR_INSERTED(1001))
4062 {
4063 /**
4064 * This NDBCNTR error code 1001
4065 */
4066 jam();
4067 SectionHandle handle(this, signal);
4068 sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
4069 signal->getLength(), &handle);
4070 return;
4071 }
4072
4073 LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
4074 Uint32 pathcnt = ord->cnt >> 16;
4075 Uint32 dstcnt = ord->cnt & 0xFFFF;
4076 Uint32 sigLen = signal->getLength();
4077
4078 if (pathcnt == 0)
4079 {
4080 /**
4081 * Send to final destination(s);
4082 */
4083 jam();
4084 Uint32 gsn = ord->gsn;
4085 Uint32 prio = ord->prio;
4086 memcpy(signal->theData+25, ord->path, 4*dstcnt);
4087 SectionHandle handle(this, signal);
4088 if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
4089 {
4090 jam();
4091 /**
4092 * Data is at end of this...
4093 */
4094 memmove(signal->theData,
4095 signal->theData + LocalRouteOrd::StaticLen + dstcnt,
4096 4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
4097 sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4098 }
4099 else
4100 {
4101 jam();
4102 /**
4103 * Put section 0 in signal->theData
4104 */
4105 sigLen = handle.m_ptr[0].sz;
4106 ndbrequire(sigLen <= 25);
4107 copy(signal->theData, handle.m_ptr[0]);
4108 release(handle.m_ptr[0]);
4109
4110 for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4111 handle.m_ptr[i] = handle.m_ptr[i+1];
4112 handle.m_cnt--;
4113 }
4114
4115 /*
4116 * The extra if-statement is as sendSignalNoRelease will copy sections
4117 * which is not necessary is only sending to one destination
4118 */
4119 if (dstcnt > 1)
4120 {
4121 jam();
4122 for (Uint32 i = 0; i<dstcnt; i++)
4123 {
4124 jam();
4125 sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4126 JobBufferLevel(prio), &handle);
4127 }
4128 releaseSections(handle);
4129 }
4130 else
4131 {
4132 jam();
4133 sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4134 JobBufferLevel(prio), &handle);
4135 }
4136 }
4137 else
4138 {
4139 /**
4140 * Reroute
4141 */
4142 jam();
4143 SectionHandle handle(this, signal);
4144 Uint32 ref = ord->path[0];
4145 Uint32 prio = ord->path[1];
4146 Uint32 len = sigLen - 2;
4147 ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4148 memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4149 sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4150 JobBufferLevel(prio), &handle);
4151 }
4152 }
4153
4154
4155 #ifdef VM_TRACE
4156 bool
debugOutOn()4157 SimulatedBlock::debugOutOn()
4158 {
4159 SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4160 return
4161 globalData.testOn &&
4162 globalSignalLoggers.logMatch(number(), mask);
4163 }
4164
4165 const char*
debugOutTag(char * buf,int line)4166 SimulatedBlock::debugOutTag(char *buf, int line)
4167 {
4168 char blockbuf[40];
4169 char instancebuf[40];
4170 char linebuf[40];
4171 char timebuf[40];
4172 sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4173 instancebuf[0] = 0;
4174 if (instance() != 0)
4175 sprintf(instancebuf, "/%u", instance());
4176 sprintf(linebuf, " %d", line);
4177 timebuf[0] = 0;
4178 #ifdef VM_TRACE_TIME
4179 {
4180 Uint64 t = NdbTick_CurrentMillisecond();
4181 uint s = (t / 1000) % 3600;
4182 uint ms = t % 1000;
4183 sprintf(timebuf, " - %u.%03u -", s, ms);
4184 }
4185 #endif
4186 sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4187 return buf;
4188 }
4189 #endif
4190
4191 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4192 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4193 const Uint32 blocks[],
4194 const Callback & cb,
4195 JobBufferLevel prio)
4196 {
4197 #ifndef NDBD_MULTITHREADED
4198 Callback copy = cb;
4199 execute(signal, copy, 0);
4200 #else
4201 jam();
4202 Uint32 ref[32]; // max threads
4203 Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4204 ref, NDB_ARRAY_SIZE(ref));
4205 if (cnt == 0)
4206 {
4207 jam();
4208 Callback copy = cb;
4209 execute(signal, copy, 0);
4210 return;
4211 }
4212
4213 Ptr<SyncThreadRecord> ptr;
4214 ndbrequire(c_syncThreadPool.seize(ptr));
4215 ptr.p->m_cnt = cnt;
4216 ptr.p->m_callback = cb;
4217
4218 signal->theData[0] = reference();
4219 signal->theData[1] = ptr.i;
4220 signal->theData[2] = Uint32(prio);
4221 for (Uint32 i = 0; i<cnt; i++)
4222 {
4223 sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4224 }
4225 #endif
4226 }
4227
4228 void
execSYNC_THREAD_REQ(Signal * signal)4229 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4230 {
4231 jamEntry();
4232 Uint32 ref = signal->theData[0];
4233 Uint32 prio = signal->theData[2];
4234 sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4235 JobBufferLevel(prio));
4236 }
4237
4238 void
execSYNC_THREAD_CONF(Signal * signal)4239 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4240 {
4241 jamEntry();
4242 Ptr<SyncThreadRecord> ptr;
4243 c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4244 if (ptr.p->m_cnt == 1)
4245 {
4246 jam();
4247 Callback copy = ptr.p->m_callback;
4248 c_syncThreadPool.release(ptr);
4249 execute(signal, copy, 0);
4250 return;
4251 }
4252 ptr.p->m_cnt --;
4253 }
4254
4255 void
execSYNC_REQ(Signal * signal)4256 SimulatedBlock::execSYNC_REQ(Signal* signal)
4257 {
4258 jamEntry();
4259 Uint32 ref = signal->theData[0];
4260 Uint32 prio = signal->theData[2];
4261 sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4262 JobBufferLevel(prio));
4263 }
4264
4265 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4266 SimulatedBlock::synchronize_path(Signal * signal,
4267 const Uint32 blocks[],
4268 const Callback & cb,
4269 JobBufferLevel prio)
4270 {
4271 jam();
4272
4273 // reuse SyncThreadRecord
4274 Ptr<SyncThreadRecord> ptr;
4275 ndbrequire(c_syncThreadPool.seize(ptr));
4276 ptr.p->m_cnt = 0; // with count of 0
4277 ptr.p->m_callback = cb;
4278
4279 SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4280 req->senderData = ptr.i;
4281 req->prio = Uint32(prio);
4282 req->count = 1;
4283 if (blocks[0] == 0)
4284 {
4285 jam();
4286 ndbrequire(false); // TODO
4287 }
4288 else
4289 {
4290 jam();
4291 Uint32 len = 0;
4292 for (; blocks[len+1] != 0; len++)
4293 {
4294 req->path[len] = blocks[len+1];
4295 }
4296 req->pathlen = 1 + len;
4297 req->path[len] = reference();
4298 sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4299 GSN_SYNC_PATH_REQ, signal,
4300 SyncPathReq::SignalLength + (1 + len), prio);
4301 }
4302 }
4303
4304 void
execSYNC_PATH_REQ(Signal * signal)4305 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
4306 {
4307 jamEntry();
4308 SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4309 if (req->pathlen == 1)
4310 {
4311 jam();
4312 SyncPathReq copy = *req;
4313 SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
4314 conf->senderData = copy.senderData;
4315 conf->count = copy.count;
4316 sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4317 SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4318 }
4319 else
4320 {
4321 jam();
4322 Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4323 req->pathlen--;
4324 memmove(req->path, req->path + 1, 4 * req->pathlen);
4325 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4326 SyncPathReq::SignalLength + (1 + req->pathlen),
4327 JobBufferLevel(req->prio));
4328 }
4329 }
4330
4331 void
execSYNC_PATH_CONF(Signal * signal)4332 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
4333 {
4334 jamEntry();
4335 SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
4336 Ptr<SyncThreadRecord> ptr;
4337
4338 c_syncThreadPool.getPtr(ptr, conf.senderData);
4339
4340 if (ptr.p->m_cnt == 0)
4341 {
4342 jam();
4343 ptr.p->m_cnt = conf.count;
4344 }
4345
4346 if (ptr.p->m_cnt == 1)
4347 {
4348 jam();
4349 Callback copy = ptr.p->m_callback;
4350 c_syncThreadPool.release(ptr);
4351 execute(signal, copy, 0);
4352 return;
4353 }
4354
4355 ptr.p->m_cnt --;
4356 }
4357
4358
4359 bool
checkNodeFailSequence(Signal * signal)4360 SimulatedBlock::checkNodeFailSequence(Signal* signal)
4361 {
4362 Uint32 ref = signal->getSendersBlockRef();
4363
4364 /**
4365 * Make sure that a signal being part of node-failure handling
4366 * from a remote node, does not get to us before we got the NODE_FAILREP
4367 * (this to avoid tricky state handling)
4368 *
4369 * To ensure this, we send the signal via QMGR (GSN_COMMIT_FAILREQ)
4370 * and NDBCNTR (which sends NODE_FAILREP)
4371 *
4372 * The extra time should be negilable
4373 *
4374 * Note, make an exception for signals sent by our self
4375 * as they are only sent as a consequence of NODE_FAILREP
4376 */
4377 if (ref == reference() ||
4378 (refToNode(ref) == getOwnNodeId() &&
4379 refToMain(ref) == NDBCNTR))
4380 {
4381 jam();
4382 return true;
4383 }
4384
4385 RoutePath path[2];
4386 path[0].ref = QMGR_REF;
4387 path[0].prio = JBB;
4388 path[1].ref = NDBCNTR_REF;
4389 path[1].prio = JBB;
4390
4391 Uint32 dst[1];
4392 dst[0] = reference();
4393
4394 SectionHandle handle(this, signal);
4395 Uint32 gsn = signal->header.theVerId_signalNumber;
4396 Uint32 len = signal->getLength();
4397
4398 sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle);
4399 return false;
4400 }
4401
4402 void
setup_wakeup()4403 SimulatedBlock::setup_wakeup()
4404 {
4405 #ifdef NDBD_MULTITHREADED
4406 #else
4407 globalTransporterRegistry.setup_wakeup_socket();
4408 #endif
4409 }
4410
4411 void
wakeup()4412 SimulatedBlock::wakeup()
4413 {
4414 #ifdef NDBD_MULTITHREADED
4415 mt_wakeup(this);
4416 #else
4417 globalTransporterRegistry.wakeup();
4418 #endif
4419 }
4420
4421
4422 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const4423 SimulatedBlock::ndbinfo_send_row(Signal* signal,
4424 const DbinfoScanReq& req,
4425 const Ndbinfo::Row& row,
4426 Ndbinfo::Ratelimit& rl) const
4427 {
4428 // Check correct number of columns against table
4429 assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4430
4431 TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
4432 tidai->connectPtr= req.resultData;
4433 tidai->transId[0]= req.transId[0];
4434 tidai->transId[1]= req.transId[1];
4435
4436 LinearSectionPtr ptr[3];
4437 ptr[0].p = row.getDataPtr();
4438 ptr[0].sz = row.getLength();
4439
4440 rl.rows++;
4441 rl.bytes += row.getLength();
4442
4443 sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4444 signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4445 }
4446
4447
4448 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const4449 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
4450 DbinfoScanReq& req,
4451 const Ndbinfo::Ratelimit& rl,
4452 Uint32 data1, Uint32 data2,
4453 Uint32 data3, Uint32 data4) const
4454 {
4455 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4456 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4457 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4458
4459 conf->returnedRows = rl.rows;
4460
4461 // Update the cursor with current item number
4462 Ndbinfo::ScanCursor* cursor =
4463 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4464
4465 cursor->data[0] = data1;
4466 cursor->data[1] = data2;
4467 cursor->data[2] = data3;
4468 cursor->data[3] = data4;
4469
4470 // Increase number of rows and bytes sent to far
4471 cursor->totalRows += rl.rows;
4472 cursor->totalBytes += rl.bytes;
4473
4474 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
4475
4476 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4477 signal_length, JBB);
4478 }
4479
4480 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const4481 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
4482 DbinfoScanReq& req,
4483 const Ndbinfo::Ratelimit& rl) const
4484 {
4485 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4486 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4487 Uint32 sender_ref = req.resultRef;
4488 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4489
4490 conf->returnedRows = rl.rows;
4491
4492 if (req.cursor_sz)
4493 {
4494 jam();
4495 // Update the cursor with current item number
4496 Ndbinfo::ScanCursor* cursor =
4497 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4498
4499 // Reset all data holders
4500 memset(cursor->data, 0, sizeof(cursor->data));
4501
4502 // Increase number of rows and bytes sent to far
4503 cursor->totalRows += rl.rows;
4504 cursor->totalBytes += rl.bytes;
4505
4506 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
4507
4508 sender_ref = cursor->senderRef;
4509
4510 }
4511 sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4512 signal_length, JBB);
4513 }
4514
init_elapsed_time(Signal * signal,NDB_TICKS & latestTIME_SIGNAL)4515 void SimulatedBlock::init_elapsed_time(Signal *signal,
4516 NDB_TICKS &latestTIME_SIGNAL)
4517 {
4518 const NDB_TICKS currentTime = NdbTick_getCurrentTicks();
4519 signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
4520 signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
4521 latestTIME_SIGNAL = currentTime;
4522 sendSignal(reference(), GSN_TIME_SIGNAL, signal, 2, JBB);
4523 }
4524
sendTIME_SIGNAL(Signal * signal,const NDB_TICKS currentTime,Uint32 delay)4525 void SimulatedBlock::sendTIME_SIGNAL(Signal *signal,
4526 const NDB_TICKS currentTime,
4527 Uint32 delay)
4528 {
4529 signal->theData[0] = Uint32(currentTime.getUint64() >> 32);
4530 signal->theData[1] = Uint32(currentTime.getUint64() & 0xFFFFFFFF);
4531 sendSignalWithDelay(reference(), GSN_TIME_SIGNAL, signal, delay, 2);
4532 }
4533
4534 /*
4535 This function is used to handle TIME_SIGNAL. This signal is intended to
4536 be used sort of like a drum beat. We should execute some timer calls
4537 every so often. However the OS can easily make the delayed signals to
4538 be delayed if the OS is occupied with other things. We will never report
4539 sleeps for longer than twice the expected delay. We rely on the delayed
4540 signal scheduler to ensure that we run time a bit faster for a while
4541 after long sleeps.
4542
4543 This function will return the elapsed time since last time we called it.
4544 */
4545 Uint64
elapsed_time(Signal * signal,const NDB_TICKS currentTime,NDB_TICKS & latestTIME_SIGNAL,Uint32 expected_delay)4546 SimulatedBlock::elapsed_time(Signal *signal,
4547 const NDB_TICKS currentTime,
4548 NDB_TICKS &latestTIME_SIGNAL,
4549 Uint32 expected_delay)
4550 {
4551 const Uint64 elapsed_time =
4552 NdbTick_Elapsed(latestTIME_SIGNAL, currentTime).milliSec();
4553 latestTIME_SIGNAL = currentTime;
4554
4555 if (elapsed_time > Uint64(2 * expected_delay))
4556 {
4557 return Uint64(2 * expected_delay);
4558 }
4559 return elapsed_time;
4560 }
4561
#if defined(VM_TRACE)
/**
 * Only compiled when VM_TRACE is defined.
 *
 * In the multithreaded build this forwards to mt_assert_own_thread()
 * for this block; in the single-threaded build it does nothing.
 */
void
SimulatedBlock::assertOwnThread()
{
#if defined(NDBD_MULTITHREADED)
  mt_assert_own_thread(this);
#endif
}

#endif
4572
4573 Uint32
get_recv_thread_idx(NodeId nodeId)4574 SimulatedBlock::get_recv_thread_idx(NodeId nodeId)
4575 {
4576 #ifdef NDBD_MULTITHREADED
4577 return mt_get_recv_thread_idx(nodeId);
4578 #else
4579 return 0;
4580 #endif
4581 }
4582
4583 #ifndef NDBD_MULTITHREADED
4584 /**
4585 * Add a stub for this function since we have some code in ErrorReporter.cpp
4586 * that needs this function, it's only really needed for ndbmtd, so need an
4587 * empty function in ndbd.
4588 */
4589 void
prepare_to_crash(bool first_phase,bool error_insert_crash)4590 ErrorReporter::prepare_to_crash(bool first_phase, bool error_insert_crash)
4591 {
4592 (void)first_phase;
4593 (void)error_insert_crash;
4594 }
4595 #endif
4596
4597 /**
4598 * Implementation of SegmentUtils
4599 * Here we forward the calls, but
4600 * in ndbmtd, add our thread cache
4601 * + lock function arguments.
4602 */
4603
4604 SectionSegment*
getSegmentPtr(Uint32 iVal)4605 SimulatedBlock::getSegmentPtr(Uint32 iVal)
4606 {
4607 return g_sectionSegmentPool.getPtr(iVal);
4608 }
4609
4610 bool
seizeSegment(Ptr<SectionSegment> & p)4611 SimulatedBlock::seizeSegment(Ptr<SectionSegment>& p)
4612 {
4613 return g_sectionSegmentPool.seize(SB_SP_REL_ARG p);
4614 }
4615
4616 void
releaseSegment(Uint32 iVal)4617 SimulatedBlock::releaseSegment(Uint32 iVal)
4618 {
4619 g_sectionSegmentPool.release(SB_SP_REL_ARG iVal);
4620 }
4621
4622 void
releaseSegmentList(Uint32 firstSegmentIVal)4623 SimulatedBlock::releaseSegmentList(Uint32 firstSegmentIVal)
4624 {
4625 ::releaseSection(SB_SP_ARG firstSegmentIVal);
4626 }
4627
4628
4629 /**
4630 * #undef is needed since this file is included by SimulatedBlock_nonmt.cpp
4631 * and SimulatedBlock_mt.cpp
4632 */
4633 #undef JAM_FILE_ID
4634