1 /*
2    Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 /*
26   This file is used to build both the multithreaded and the singlethreaded
27   ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28   the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29   macro not defined).
30 */
31 
32 #include <ndb_global.h>
33 
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include <signaldata/EventReport.hpp>
47 #include <signaldata/ContinueFragmented.hpp>
48 #include <signaldata/NodeStateSignalData.hpp>
49 #include <signaldata/FsRef.hpp>
50 #include <signaldata/SignalDroppedRep.hpp>
51 #include <signaldata/LocalRouteOrd.hpp>
52 #include <signaldata/TransIdAI.hpp>
53 #include <signaldata/Sync.hpp>
54 #include <DebuggerNames.hpp>
55 #include "LongSignal.hpp"
56 
57 #include <Properties.hpp>
58 #include "Configuration.hpp"
59 #include <AttributeDescriptor.hpp>
60 #include <NdbSqlUtil.hpp>
61 
62 #include "../blocks/dbdih/Dbdih.hpp"
63 #include <signaldata/CallbackSignal.hpp>
64 #include "LongSignalImpl.hpp"
65 
66 #include <EventLogger.hpp>
67 extern EventLogger * g_eventLogger;
68 
69 #define ljamEntry() jamEntryLine(30000 + __LINE__)
70 #define ljam() jamLine(30000 + __LINE__)
71 
72 //
73 // Constructor, Destructor
74 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)75 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
76 			       struct Block_context & ctx,
77                                Uint32 instanceNumber)
78   : theNodeId(globalData.ownId),
79     theNumber(blockNumber),
80     theInstance(instanceNumber),
81     theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
82     theInstanceList(0),
83     theMainInstance(0),
84     m_ctx(ctx),
85     m_global_page_pool(globalData.m_global_page_pool),
86     m_shared_page_pool(globalData.m_shared_page_pool),
87     c_fragmentInfoHash(c_fragmentInfoPool),
88     c_linearFragmentSendList(c_fragmentSendPool),
89     c_segmentedFragmentSendList(c_fragmentSendPool),
90     c_mutexMgr(* this),
91     c_counterMgr(* this)
92 #ifdef VM_TRACE
93     ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
94 #endif
95 #ifdef VM_TRACE_TIME
96     ,m_currentGsn(0)
97 #endif
98 {
99   m_threadId = 0;
100   m_watchDogCounter = NULL;
101   m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
102   NewVarRef = 0;
103 
104   SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
105 
106   if (theInstance == 0) {
107     ndbrequire(mainBlock == 0);
108     mainBlock = this;
109     globalData.setBlock(blockNumber, mainBlock);
110   } else {
111     ndbrequire(mainBlock != 0);
112     mainBlock->addInstance(this, theInstance);
113   }
114   theMainInstance = mainBlock;
115 
116   c_fragmentIdCounter = 1;
117   c_fragSenderRunning = false;
118 
119 #ifdef VM_TRACE_TIME
120   clearTimes();
121 #endif
122 
123   for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
124     theExecArray[i] = 0;
125 
126   installSimulatedBlockFunctions();
127 
128   m_callbackTableAddr = 0;
129 
130   CLEAR_ERROR_INSERT_VALUE;
131 
132 #ifdef VM_TRACE
133   m_global_variables = new Ptr<void> * [1];
134   m_global_variables[0] = 0;
135   m_global_variables_save = 0;
136 #endif
137 }
138 
139 void
addInstance(SimulatedBlock * b,Uint32 theInstance)140 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
141 {
142   ndbrequire(theMainInstance == this);
143   ndbrequire(number() == b->number());
144   if (theInstanceList == 0)
145   {
146     theInstanceList = new SimulatedBlock* [MaxInstances];
147     ndbrequire(theInstanceList != 0);
148     for (Uint32 i = 0; i < MaxInstances; i++)
149       theInstanceList[i] = 0;
150   }
151   ndbrequire(theInstance < MaxInstances);
152   ndbrequire(theInstanceList[theInstance] == 0);
153   theInstanceList[theInstance] = b;
154 }
155 
156 void
initCommon()157 SimulatedBlock::initCommon()
158 {
159   Uint32 count = 10;
160   this->getParam("FragmentSendPool", &count);
161   c_fragmentSendPool.setSize(count);
162 
163   count = 10;
164   this->getParam("FragmentInfoPool", &count);
165   c_fragmentInfoPool.setSize(count);
166 
167   count = 10;
168   this->getParam("FragmentInfoHash", &count);
169   c_fragmentInfoHash.setSize(count);
170 
171   count = 5;
172   this->getParam("ActiveMutexes", &count);
173   c_mutexMgr.setSize(count);
174 
175   count = 5;
176   this->getParam("ActiveCounters", &count);
177   c_counterMgr.setSize(count);
178 
179   count = 5;
180   this->getParam("ActiveThreadSync", &count);
181   c_syncThreadPool.setSize(count);
182 }
183 
~SimulatedBlock()184 SimulatedBlock::~SimulatedBlock()
185 {
186   freeBat();
187 #ifdef VM_TRACE_TIME
188   printTimes(stdout);
189 #endif
190 
191 #ifdef VM_TRACE
192   delete [] m_global_variables;
193 #endif
194 
195   if (theInstanceList != 0) {
196     Uint32 i;
197     for (i = 0; i < MaxInstances; i++)
198       delete theInstanceList[i];
199     delete [] theInstanceList;
200   }
201   theInstanceList = 0;
202 }
203 
204 void
installSimulatedBlockFunctions()205 SimulatedBlock::installSimulatedBlockFunctions(){
206   ExecFunction * a = theExecArray;
207   a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
208   a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
209   a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
210   a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
211   a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
212   a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
213   a[GSN_UTIL_CREATE_LOCK_REF]   = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
214   a[GSN_UTIL_CREATE_LOCK_CONF]  = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
215   a[GSN_UTIL_DESTROY_LOCK_REF]  = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
216   a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
217   a[GSN_UTIL_LOCK_REF]    = &SimulatedBlock::execUTIL_LOCK_REF;
218   a[GSN_UTIL_LOCK_CONF]   = &SimulatedBlock::execUTIL_LOCK_CONF;
219   a[GSN_UTIL_UNLOCK_REF]  = &SimulatedBlock::execUTIL_UNLOCK_REF;
220   a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
221   a[GSN_FSOPENREF]    = &SimulatedBlock::execFSOPENREF;
222   a[GSN_FSCLOSEREF]   = &SimulatedBlock::execFSCLOSEREF;
223   a[GSN_FSWRITEREF]   = &SimulatedBlock::execFSWRITEREF;
224   a[GSN_FSREADREF]    = &SimulatedBlock::execFSREADREF;
225   a[GSN_FSREMOVEREF]  = &SimulatedBlock::execFSREMOVEREF;
226   a[GSN_FSSYNCREF]    = &SimulatedBlock::execFSSYNCREF;
227   a[GSN_FSAPPENDREF]  = &SimulatedBlock::execFSAPPENDREF;
228   a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
229   a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
230   a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
231   a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
232   a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
233   a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
234   a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
235   a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
236   a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
237   a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
238 }
239 
240 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)241 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
242 				 ExecFunction f, bool force){
243   if(gsn > MAX_GSN || (!force &&  theExecArray[gsn] != 0)){
244     char errorMsg[255];
245     BaseString::snprintf(errorMsg, 255,
246  	     "GSN %d(%d))", gsn, MAX_GSN);
247     ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
248   }
249   theExecArray[gsn] = f;
250 }
251 
252 void
assignToThread(ThreadContext ctx)253 SimulatedBlock::assignToThread(ThreadContext ctx)
254 {
255   m_threadId = ctx.threadId;
256   m_jamBuffer = ctx.jamBuffer;
257   m_watchDogCounter = ctx.watchDogCounter;
258   m_sectionPoolCache = ctx.sectionPoolCache;
259 }
260 
261 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)262 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
263 {
264   Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
265   Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
266   return instanceKey;
267 }
268 
269 Uint32
getInstanceFromKey(Uint32 instanceKey)270 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
271 {
272   Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
273   Uint32 instanceNo;
274   if (lqhWorkers == 0) {
275     instanceNo = 0;
276   } else {
277     assert(instanceKey != 0);
278     instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
279   }
280   return instanceNo;
281 }
282 
283 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const284 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
285 			     const char* filename, int lineno) const
286 {
287   char objRef[255];
288   BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
289   char probData[255];
290   BaseString::snprintf(probData, 255,
291 	   "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
292 	   gsn, len, recBlockNo);
293 
294   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
295 			     probData,
296 			     objRef);
297 }
298 
299 
extern class SectionSegmentPool g_sectionSegmentPool;

/*
 * check_sections(signal, cnt, cnt2)
 *
 * Sanity check used on entry to the sendSignal() variants:
 *  - a non-zero cnt (sections still attached to the signal) is reported
 *    through handle_invalid_sections_in_send_signal();
 *  - otherwise, when no sections are being sent (cnt2 == 0) and the
 *    header's m_fragmentInfo is neither 0 nor 3, the signal is reported
 *    through handle_invalid_fragmentInfo().
 *
 * The cnt2 and signal arguments are parenthesized so that expression
 * arguments are evaluated as a unit.
 */
#define check_sections(signal, cnt, cnt2) \
  do { \
    if (unlikely(cnt)) { \
      handle_invalid_sections_in_send_signal(signal); \
    } else if (unlikely((cnt2) == 0 && \
                        ((signal)->header.m_fragmentInfo != 0 && \
                         (signal)->header.m_fragmentInfo != 3))) { \
      handle_invalid_fragmentInfo(signal); \
    } \
  } while(0)
303 
304 void
handle_invalid_sections_in_send_signal(Signal * signal) const305 SimulatedBlock::handle_invalid_sections_in_send_signal(Signal* signal) const
306 {
307   //Uint32 cnt = signal->header.m_noOfSections;
308 #if defined VM_TRACE || defined ERROR_INSERT
309   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
310 			     "Unhandled sections in sendSignal",
311 			     "");
312 #else
313   infoEvent("Unhandled sections in sendSignal!!");
314 #endif
315 }
316 
317 void
handle_lingering_sections_after_execute(Signal * signal) const318 SimulatedBlock::handle_lingering_sections_after_execute(Signal* signal) const
319 {
320   //Uint32 cnt = signal->header.m_noOfSections;
321 #if defined VM_TRACE || defined ERROR_INSERT
322   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
323 			     "Unhandled sections after execute",
324 			     "");
325 #else
326   infoEvent("Unhandled sections after execute");
327 #endif
328 }
329 
330 void
handle_lingering_sections_after_execute(SectionHandle * handle) const331 SimulatedBlock::handle_lingering_sections_after_execute(SectionHandle* handle) const
332 {
333   //Uint32 cnt = signal->header.m_noOfSections;
334 #if defined VM_TRACE || defined ERROR_INSERT
335   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
336 			     "Unhandled sections(handle) after execute",
337 			     "");
338 #else
339   infoEvent("Unhandled sections(handle) after execute");
340 #endif
341 }
342 
343 void
handle_invalid_fragmentInfo(Signal * signal) const344 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
345 {
346 #if defined VM_TRACE || defined ERROR_INSERT
347   ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
348                              "Incorrect header->m_fragmentInfo in sendSignal()",
349                              "");
350 #else
351   signal->header.m_fragmentInfo = 0;
352   infoEvent("Incorrect header->m_fragmentInfo in sendSignal");
353 #endif
354 }
355 
356 void
handle_out_of_longsignal_memory(Signal * signal) const357 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
358 {
359   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
360 			     "Out of LongMessageBuffer in sendSignal",
361 			     "");
362 }
363 
364 void
handle_send_failed(SendStatus ss,Signal * signal) const365 SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const
366 {
367   switch(ss){
368   case SEND_BUFFER_FULL:
369     ErrorReporter::handleError(NDBD_EXIT_GENERIC,
370                                "Out of SendBufferMemory in sendSignal", "");
371     break;
372   case SEND_MESSAGE_TOO_BIG:
373     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
374                                "Message to big in sendSignal", "");
375     break;
376   case SEND_UNKNOWN_NODE:
377     ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
378                                "Unknown node in sendSignal", "");
379     break;
380   case SEND_OK:
381   case SEND_BLOCKED:
382   case SEND_DISCONNECTED:
383     break;
384   }
385   ndbrequire(false);
386 }
387 
388 static void
linkSegments(Uint32 head,Uint32 tail)389 linkSegments(Uint32 head, Uint32 tail){
390 
391   Ptr<SectionSegment> headPtr;
392   g_sectionSegmentPool.getPtr(headPtr, head);
393 
394   Ptr<SectionSegment> tailPtr;
395   g_sectionSegmentPool.getPtr(tailPtr, tail);
396 
397   Ptr<SectionSegment> oldTailPtr;
398   g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
399 
400   /* Can only efficiently link segments if linking to the end of a
401    * multiple-of-segment-size sized chunk
402    */
403   if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
404   {
405 #if defined VM_TRACE || defined ERROR_INSERT
406     ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
407                                "Bad head segment size",
408                                "");
409 #else
410     ndbout_c("linkSegments : Bad head segment size");
411 #endif
412   }
413 
414   headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
415   headPtr.p->m_sz += tailPtr.p->m_sz;
416 
417   oldTailPtr.p->m_nextSegment = tailPtr.i;
418 }
419 
420 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])421 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
422   Uint32 tSec0 = ptr[0].i;
423   Uint32 tSec1 = ptr[1].i;
424   Uint32 tSec2 = ptr[2].i;
425   SectionSegment * p;
426   switch(secCount){
427   case 3:
428     p = g_sectionSegmentPool.getPtr(tSec2);
429     ptr[2].p = p;
430     ptr[2].sz = p->m_sz;
431   case 2:
432     p = g_sectionSegmentPool.getPtr(tSec1);
433     ptr[1].p = p;
434     ptr[1].sz = p->m_sz;
435   case 1:
436     p = g_sectionSegmentPool.getPtr(tSec0);
437     ptr[0].p = p;
438     ptr[0].sz = p->m_sz;
439   case 0:
440     return;
441   }
442   char msg[40];
443   sprintf(msg, "secCount=%d", secCount);
444   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
445 }
446 
447 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)448 getSection(SegmentedSectionPtr & ptr, Uint32 i){
449   ptr.i = i;
450   SectionSegment * p = g_sectionSegmentPool.getPtr(i);
451   ptr.p = p;
452   ptr.sz = p->m_sz;
453 }
454 
getSectionSz(Uint32 id)455 Uint32 getSectionSz(Uint32 id)
456 {
457   return g_sectionSegmentPool.getPtr(id)->m_sz;
458 }
459 
getLastWordPtr(Uint32 id)460 Uint32* getLastWordPtr(Uint32 id)
461 {
462   SectionSegment* first= g_sectionSegmentPool.getPtr(id);
463   SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
464   Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
465   return &last->theData[offset];
466 }
467 
/*
 * Extra leading arguments for section pool calls.  In the multithreaded
 * build they pass the block's section pool cache (and, for release, the
 * section lock); in the single threaded build they expand to nothing.
 */
#if defined(NDBD_MULTITHREADED)
#  define SB_SP_ARG *m_sectionPoolCache,
#  define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
#else
#  define SB_SP_ARG
#  define SB_SP_REL_ARG
#endif
475 
476 static
477 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])478 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
479   Uint32 tSec0 = ptr[0].i;
480   Uint32 tSz0 = ptr[0].sz;
481   Uint32 tSec1 = ptr[1].i;
482   Uint32 tSz1 = ptr[1].sz;
483   Uint32 tSec2 = ptr[2].i;
484   Uint32 tSz2 = ptr[2].sz;
485   switch(secCount){
486   case 3:
487     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
488                                      relSz(tSz2), tSec2,
489 				     ptr[2].p->m_lastSegment);
490   case 2:
491     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
492                                      relSz(tSz1), tSec1,
493 				     ptr[1].p->m_lastSegment);
494   case 1:
495     g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
496                                      relSz(tSz0), tSec0,
497 				     ptr[0].p->m_lastSegment);
498   case 0:
499     return;
500   }
501   char msg[40];
502   sprintf(msg, "secCount=%d", secCount);
503   ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
504 }
505 
506 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const507 SimulatedBlock::sendSignal(BlockReference ref,
508 			   GlobalSignalNumber gsn,
509 			   Signal* signal,
510 			   Uint32 length,
511 			   JobBufferLevel jobBuffer) const {
512 
513   BlockReference sendBRef = reference();
514 
515   Uint32 noOfSections = signal->header.m_noOfSections;
516   Uint32 recBlock = refToBlock(ref);
517   Uint32 recNode   = refToNode(ref);
518   Uint32 ourProcessor         = globalData.ownId;
519 
520   signal->header.theLength = length;
521   signal->header.theVerId_signalNumber = gsn;
522   signal->header.theReceiversBlockNumber = recBlock;
523   signal->header.m_noOfSections = 0;
524 
525   check_sections(signal, noOfSections, 0);
526 
527   Uint32 tSignalId = signal->header.theSignalId;
528 
529   if ((length == 0) || length > 25 || (recBlock == 0)) {
530     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
531     return;
532   }//if
533 #ifdef VM_TRACE
534   if(globalData.testOn){
535     Uint16 proc =
536       (recNode == 0 ? globalData.ownId : recNode);
537     signal->header.theSendersBlockRef = sendBRef;
538     globalSignalLoggers.sendSignal(signal->header,
539 				   jobBuffer,
540 				   &signal->theData[0],
541 				   proc);
542   }
543 #endif
544 
545   if(recNode == ourProcessor || recNode == 0) {
546     signal->header.theSendersSignalId = tSignalId;
547     signal->header.theSendersBlockRef = sendBRef;
548 #ifdef NDBD_MULTITHREADED
549     if (jobBuffer == JBB)
550       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
551     else
552       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
553 #else
554     globalScheduler.execute(signal, jobBuffer, recBlock,
555 			    gsn);
556 #endif
557     return;
558   } else {
559     // send distributed Signal
560     SignalHeader sh;
561 
562     Uint32 tTrace = signal->getTrace();
563 
564     sh.theVerId_signalNumber   = gsn;
565     sh.theReceiversBlockNumber = recBlock;
566     sh.theSendersBlockRef      = refToBlock(sendBRef);
567     sh.theLength               = length;
568     sh.theTrace                = tTrace;
569     sh.theSignalId             = tSignalId;
570     sh.m_noOfSections          = 0;
571     sh.m_fragmentInfo          = 0;
572 
573 #ifdef TRACE_DISTRIBUTED
574     ndbout_c("send: %s(%d) to (%s, %d)",
575 	     getSignalName(gsn), gsn, getBlockName(recBlock),
576 	     recNode);
577 #endif
578 
579     SendStatus ss;
580 #ifdef NDBD_MULTITHREADED
581     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
582                         recNode, 0);
583 #else
584     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
585                                                &signal->theData[0], recNode,
586                                                (LinearSectionPtr*)0);
587 #endif
588 
589     if (unlikely(! (ss == SEND_OK ||
590                     ss == SEND_BLOCKED ||
591                     ss == SEND_DISCONNECTED)))
592     {
593       handle_send_failed(ss, signal);
594     }
595   }
596   return;
597 }
598 
599 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const600 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
601 			   GlobalSignalNumber gsn,
602 			   Signal* signal,
603 			   Uint32 length,
604 			   JobBufferLevel jobBuffer) const {
605 
606   Uint32 noOfSections = signal->header.m_noOfSections;
607   Uint32 tSignalId = signal->header.theSignalId;
608   Uint32 tTrace = signal->getTrace();
609 
610   Uint32 ourProcessor = globalData.ownId;
611   Uint32 recBlock = rg.m_block;
612 
613   signal->header.theLength = length;
614   signal->header.theVerId_signalNumber = gsn;
615   signal->header.theReceiversBlockNumber = recBlock;
616   signal->header.theSendersSignalId = tSignalId;
617   signal->header.theSendersBlockRef = reference();
618   signal->header.m_noOfSections = 0;
619 
620   check_sections(signal, noOfSections, 0);
621 
622   if ((length == 0) || (length > 25) || (recBlock == 0)) {
623     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
624     return;
625   }//if
626 
627   SignalHeader sh;
628 
629   sh.theVerId_signalNumber   = gsn;
630   sh.theReceiversBlockNumber = recBlock;
631   sh.theSendersBlockRef      = refToBlock(reference());
632   sh.theLength               = length;
633   sh.theTrace                = tTrace;
634   sh.theSignalId             = tSignalId;
635   sh.m_noOfSections          = 0;
636   sh.m_fragmentInfo          = 0;
637 
638   /**
639    * Check own node
640    */
641   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
642 #ifdef VM_TRACE
643     if(globalData.testOn){
644       globalSignalLoggers.sendSignal(signal->header,
645 				     jobBuffer,
646 				     &signal->theData[0],
647 				     ourProcessor);
648     }
649 #endif
650 
651 #ifdef NDBD_MULTITHREADED
652     if (jobBuffer == JBB)
653       sendlocal(m_threadId, &signal->header, signal->theData, NULL);
654     else
655       sendprioa(m_threadId, &signal->header, signal->theData, NULL);
656 #else
657     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
658 #endif
659 
660     rg.m_nodes.clear((Uint32)0);
661     rg.m_nodes.clear(ourProcessor);
662   }
663 
664   /**
665    * Do the big loop
666    */
667   Uint32 recNode = 0;
668   while(!rg.m_nodes.isclear()){
669     recNode = rg.m_nodes.find(recNode + 1);
670     rg.m_nodes.clear(recNode);
671 #ifdef VM_TRACE
672     if(globalData.testOn){
673       globalSignalLoggers.sendSignal(signal->header,
674 				     jobBuffer,
675 				     &signal->theData[0],
676 				     recNode);
677     }
678 #endif
679 
680 #ifdef TRACE_DISTRIBUTED
681     ndbout_c("send: %s(%d) to (%s, %d)",
682 	     getSignalName(gsn), gsn, getBlockName(recBlock),
683 	     recNode);
684 #endif
685 
686     SendStatus ss;
687 #ifdef NDBD_MULTITHREADED
688     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
689                         recNode, 0);
690 #else
691     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
692                                                &signal->theData[0], recNode,
693                                                (LinearSectionPtr*)0);
694 #endif
695 
696     if (unlikely(! (ss == SEND_OK ||
697                     ss == SEND_BLOCKED ||
698                     ss == SEND_DISCONNECTED)))
699     {
700       handle_send_failed(ss, signal);
701     }
702   }
703 
704   return;
705 }
706 
707 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
708 
709 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const710 SimulatedBlock::sendSignal(BlockReference ref,
711 			   GlobalSignalNumber gsn,
712 			   Signal* signal,
713 			   Uint32 length,
714 			   JobBufferLevel jobBuffer,
715 			   LinearSectionPtr ptr[3],
716 			   Uint32 noOfSections) const {
717 
718   BlockReference sendBRef = reference();
719 
720   Uint32 recBlock = refToBlock(ref);
721   Uint32 recNode   = refToNode(ref);
722   Uint32 ourProcessor         = globalData.ownId;
723 
724   check_sections(signal, signal->header.m_noOfSections, noOfSections);
725 
726   signal->header.theLength = length;
727   signal->header.theVerId_signalNumber = gsn;
728   signal->header.theReceiversBlockNumber = recBlock;
729   signal->header.m_noOfSections = noOfSections;
730 
731   Uint32 tSignalId = signal->header.theSignalId;
732   Uint32 tFragInfo = signal->header.m_fragmentInfo;
733 
734   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
735     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
736     return;
737   }//if
738 #ifdef VM_TRACE
739   if(globalData.testOn){
740     Uint16 proc =
741       (recNode == 0 ? globalData.ownId : recNode);
742     signal->header.theSendersBlockRef = sendBRef;
743     globalSignalLoggers.sendSignal(signal->header,
744 				   jobBuffer,
745 				   &signal->theData[0],
746 				   proc,
747                                    ptr, noOfSections);
748   }
749 #endif
750 
751   if(recNode == ourProcessor || recNode == 0) {
752     signal->header.theSendersSignalId = tSignalId;
753     signal->header.theSendersBlockRef = sendBRef;
754 
755     /**
756      * We have to copy the data
757      */
758     bool ok = true;
759     Ptr<SectionSegment> segptr[3];
760     for(Uint32 i = 0; i<noOfSections; i++){
761       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
762       signal->theData[length+i] = segptr[i].i;
763     }
764 
765     if (unlikely(! ok))
766     {
767       handle_out_of_longsignal_memory(signal);
768     }
769 
770 #ifdef NDBD_MULTITHREADED
771     if (jobBuffer == JBB)
772       sendlocal(m_threadId, &signal->header, signal->theData,
773                 signal->theData+length);
774     else
775       sendprioa(m_threadId, &signal->header, signal->theData,
776                 signal->theData+length);
777 #else
778     globalScheduler.execute(signal, jobBuffer, recBlock,
779 			    gsn);
780 #endif
781     signal->header.m_noOfSections = 0;
782     return;
783   } else {
784     // send distributed Signal
785     SignalHeader sh;
786 
787     Uint32 tTrace = signal->getTrace();
788     Uint32 noOfSections = signal->header.m_noOfSections;
789 
790     sh.theVerId_signalNumber   = gsn;
791     sh.theReceiversBlockNumber = recBlock;
792     sh.theSendersBlockRef      = refToBlock(sendBRef);
793     sh.theLength               = length;
794     sh.theTrace                = tTrace;
795     sh.theSignalId             = tSignalId;
796     sh.m_noOfSections          = noOfSections;
797     sh.m_fragmentInfo          = tFragInfo;
798 
799 #ifdef TRACE_DISTRIBUTED
800     ndbout_c("send: %s(%d) to (%s, %d)",
801 	     getSignalName(gsn), gsn, getBlockName(recBlock),
802 	     recNode);
803 #endif
804 
805     SendStatus ss;
806 #ifdef NDBD_MULTITHREADED
807     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
808                         recNode, ptr);
809 #else
810     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
811                                                &signal->theData[0], recNode,
812                                                ptr);
813 #endif
814 
815     if (unlikely(! (ss == SEND_OK ||
816                     ss == SEND_BLOCKED ||
817                     ss == SEND_DISCONNECTED)))
818     {
819       handle_send_failed(ss, signal);
820     }
821   }
822 
823   signal->header.m_noOfSections = 0;
824   signal->header.m_fragmentInfo = 0;
825   return;
826 }
827 
828 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const829 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
830 			   GlobalSignalNumber gsn,
831 			   Signal* signal,
832 			   Uint32 length,
833 			   JobBufferLevel jobBuffer,
834 			   LinearSectionPtr ptr[3],
835 			   Uint32 noOfSections) const {
836 
837   Uint32 tSignalId = signal->header.theSignalId;
838   Uint32 tTrace    = signal->getTrace();
839   Uint32 tFragInfo = signal->header.m_fragmentInfo;
840 
841   Uint32 ourProcessor = globalData.ownId;
842   Uint32 recBlock = rg.m_block;
843 
844   check_sections(signal, signal->header.m_noOfSections, noOfSections);
845 
846   signal->header.theLength = length;
847   signal->header.theVerId_signalNumber = gsn;
848   signal->header.theReceiversBlockNumber = recBlock;
849   signal->header.theSendersSignalId = tSignalId;
850   signal->header.theSendersBlockRef = reference();
851   signal->header.m_noOfSections = noOfSections;
852 
853   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
854     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
855     return;
856   }//if
857 
858   SignalHeader sh;
859   sh.theVerId_signalNumber   = gsn;
860   sh.theReceiversBlockNumber = recBlock;
861   sh.theSendersBlockRef      = refToBlock(reference());
862   sh.theLength               = length;
863   sh.theTrace                = tTrace;
864   sh.theSignalId             = tSignalId;
865   sh.m_noOfSections          = noOfSections;
866   sh.m_fragmentInfo          = tFragInfo;
867 
868   /**
869    * Check own node
870    */
871   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
872 #ifdef VM_TRACE
873     if(globalData.testOn){
874       globalSignalLoggers.sendSignal(signal->header,
875 				     jobBuffer,
876 				     &signal->theData[0],
877 				     ourProcessor,
878                                      ptr, noOfSections);
879     }
880 #endif
881     /**
882      * We have to copy the data
883      */
884     bool ok = true;
885     Ptr<SectionSegment> segptr[3];
886     for(Uint32 i = 0; i<noOfSections; i++){
887       ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
888       signal->theData[length+i] = segptr[i].i;
889     }
890 
891     if (unlikely(! ok))
892     {
893       handle_out_of_longsignal_memory(signal);
894     }
895 
896 #ifdef NDBD_MULTITHREADED
897     if (jobBuffer == JBB)
898       sendlocal(m_threadId, &signal->header, signal->theData,
899                 signal->theData + length);
900     else
901       sendprioa(m_threadId, &signal->header, signal->theData,
902                 signal->theData + length);
903 #else
904     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
905 #endif
906 
907     rg.m_nodes.clear((Uint32)0);
908     rg.m_nodes.clear(ourProcessor);
909   }
910 
911   /**
912    * Do the big loop
913    */
914   Uint32 recNode = 0;
915   while(!rg.m_nodes.isclear()){
916     recNode = rg.m_nodes.find(recNode + 1);
917     rg.m_nodes.clear(recNode);
918 
919 #ifdef VM_TRACE
920     if(globalData.testOn){
921       globalSignalLoggers.sendSignal(signal->header,
922 				     jobBuffer,
923 				     &signal->theData[0],
924 				     recNode,
925                                      ptr, noOfSections);
926     }
927 #endif
928 
929 #ifdef TRACE_DISTRIBUTED
930     ndbout_c("send: %s(%d) to (%s, %d)",
931 	     getSignalName(gsn), gsn, getBlockName(recBlock),
932 	     recNode);
933 #endif
934 
935     SendStatus ss;
936 #ifdef NDBD_MULTITHREADED
937     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
938                         recNode, ptr);
939 #else
940     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
941                                                &signal->theData[0], recNode,
942                                                ptr);
943 #endif
944 
945     if (unlikely(! (ss == SEND_OK ||
946                     ss == SEND_BLOCKED ||
947                     ss == SEND_DISCONNECTED)))
948     {
949       handle_send_failed(ss, signal);
950     }
951   }
952 
953   signal->header.m_noOfSections = 0;
954   signal->header.m_fragmentInfo = 0;
955 
956   return;
957 }
958 
959 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const960 SimulatedBlock::sendSignal(BlockReference ref,
961 			   GlobalSignalNumber gsn,
962 			   Signal* signal,
963 			   Uint32 length,
964 			   JobBufferLevel jobBuffer,
965 			   SectionHandle* sections) const {
966 
967   Uint32 noOfSections = sections->m_cnt;
968   BlockReference sendBRef = reference();
969 
970   Uint32 recBlock = refToBlock(ref);
971   Uint32 recNode   = refToNode(ref);
972   Uint32 ourProcessor         = globalData.ownId;
973 
974   check_sections(signal, signal->header.m_noOfSections, noOfSections);
975 
976   signal->header.theLength = length;
977   signal->header.theVerId_signalNumber = gsn;
978   signal->header.theReceiversBlockNumber = recBlock;
979   signal->header.m_noOfSections = noOfSections;
980 
981   Uint32 tSignalId = signal->header.theSignalId;
982   Uint32 tFragInfo = signal->header.m_fragmentInfo;
983 
984   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
985     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
986     return;
987   }//if
988 #ifdef VM_TRACE
989   if(globalData.testOn){
990     Uint16 proc =
991       (recNode == 0 ? globalData.ownId : recNode);
992     signal->header.theSendersBlockRef = sendBRef;
993     globalSignalLoggers.sendSignal(signal->header,
994 				   jobBuffer,
995 				   &signal->theData[0],
996 				   proc,
997                                    sections->m_ptr, noOfSections);
998   }
999 #endif
1000 
1001   if(recNode == ourProcessor || recNode == 0) {
1002     signal->header.theSendersSignalId = tSignalId;
1003     signal->header.theSendersBlockRef = sendBRef;
1004 
1005     /**
1006      * We have to copy the data
1007      */
1008     Uint32 * dst = signal->theData + length;
1009     * dst ++ = sections->m_ptr[0].i;
1010     * dst ++ = sections->m_ptr[1].i;
1011     * dst ++ = sections->m_ptr[2].i;
1012 
1013 #ifdef NDBD_MULTITHREADED
1014     if (jobBuffer == JBB)
1015       sendlocal(m_threadId, &signal->header, signal->theData,
1016                 signal->theData + length);
1017     else
1018       sendprioa(m_threadId, &signal->header, signal->theData,
1019                 signal->theData + length);
1020 #else
1021     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1022 #endif
1023   } else {
1024     // send distributed Signal
1025     SignalHeader sh;
1026 
1027     Uint32 tTrace = signal->getTrace();
1028 
1029     sh.theVerId_signalNumber   = gsn;
1030     sh.theReceiversBlockNumber = recBlock;
1031     sh.theSendersBlockRef      = refToBlock(sendBRef);
1032     sh.theLength               = length;
1033     sh.theTrace                = tTrace;
1034     sh.theSignalId             = tSignalId;
1035     sh.m_noOfSections          = noOfSections;
1036     sh.m_fragmentInfo          = tFragInfo;
1037 
1038 #ifdef TRACE_DISTRIBUTED
1039     ndbout_c("send: %s(%d) to (%s, %d)",
1040 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1041 	     recNode);
1042 #endif
1043 
1044     SendStatus ss;
1045 #ifdef NDBD_MULTITHREADED
1046     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1047                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1048 #else
1049     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1050                                                &signal->theData[0], recNode,
1051                                                g_sectionSegmentPool,
1052                                                sections->m_ptr);
1053 #endif
1054 
1055     if (unlikely(! (ss == SEND_OK ||
1056                     ss == SEND_BLOCKED ||
1057                     ss == SEND_DISCONNECTED)))
1058     {
1059       handle_send_failed(ss, signal);
1060     }
1061 
1062     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1063   }
1064 
1065   signal->header.m_noOfSections = 0;
1066   signal->header.m_fragmentInfo = 0;
1067   sections->m_cnt = 0;
1068   return;
1069 }
1070 
1071 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1072 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1073 			   GlobalSignalNumber gsn,
1074 			   Signal* signal,
1075 			   Uint32 length,
1076 			   JobBufferLevel jobBuffer,
1077 			   SectionHandle * sections) const {
1078 
1079   Uint32 noOfSections = sections->m_cnt;
1080   Uint32 tSignalId = signal->header.theSignalId;
1081   Uint32 tTrace    = signal->getTrace();
1082   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1083 
1084   Uint32 ourProcessor = globalData.ownId;
1085   Uint32 recBlock = rg.m_block;
1086 
1087   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1088 
1089   signal->header.theLength = length;
1090   signal->header.theVerId_signalNumber = gsn;
1091   signal->header.theReceiversBlockNumber = recBlock;
1092   signal->header.theSendersSignalId = tSignalId;
1093   signal->header.theSendersBlockRef = reference();
1094   signal->header.m_noOfSections = noOfSections;
1095 
1096   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1097     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1098     return;
1099   }//if
1100 
1101   SignalHeader sh;
1102   sh.theVerId_signalNumber   = gsn;
1103   sh.theReceiversBlockNumber = recBlock;
1104   sh.theSendersBlockRef      = refToBlock(reference());
1105   sh.theLength               = length;
1106   sh.theTrace                = tTrace;
1107   sh.theSignalId             = tSignalId;
1108   sh.m_noOfSections          = noOfSections;
1109   sh.m_fragmentInfo          = tFragInfo;
1110 
1111   /**
1112    * Check own node
1113    */
1114   bool release = true;
1115   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1116   {
1117     release = false;
1118 #ifdef VM_TRACE
1119     if(globalData.testOn){
1120       globalSignalLoggers.sendSignal(signal->header,
1121 				     jobBuffer,
1122 				     &signal->theData[0],
1123 				     ourProcessor,
1124                                      sections->m_ptr, noOfSections);
1125     }
1126 #endif
1127     /**
1128      * We have to copy the data
1129      */
1130     Uint32 * dst = signal->theData + length;
1131     * dst ++ = sections->m_ptr[0].i;
1132     * dst ++ = sections->m_ptr[1].i;
1133     * dst ++ = sections->m_ptr[2].i;
1134 #ifdef NDBD_MULTITHREADED
1135     if (jobBuffer == JBB)
1136       sendlocal(m_threadId, &signal->header, signal->theData,
1137                 signal->theData + length);
1138     else
1139       sendprioa(m_threadId, &signal->header, signal->theData,
1140                 signal->theData + length);
1141 #else
1142     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1143 #endif
1144 
1145     rg.m_nodes.clear((Uint32)0);
1146     rg.m_nodes.clear(ourProcessor);
1147   }
1148 
1149   /**
1150    * Do the big loop
1151    */
1152   Uint32 recNode = 0;
1153   while(!rg.m_nodes.isclear()){
1154     recNode = rg.m_nodes.find(recNode + 1);
1155     rg.m_nodes.clear(recNode);
1156 
1157 #ifdef VM_TRACE
1158     if(globalData.testOn){
1159       globalSignalLoggers.sendSignal(signal->header,
1160 				     jobBuffer,
1161 				     &signal->theData[0],
1162 				     recNode,
1163                                      sections->m_ptr, noOfSections);
1164     }
1165 #endif
1166 
1167 #ifdef TRACE_DISTRIBUTED
1168     ndbout_c("send: %s(%d) to (%s, %d)",
1169 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1170 	     recNode);
1171 #endif
1172 
1173     SendStatus ss;
1174 #ifdef NDBD_MULTITHREADED
1175     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1176                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1177 #else
1178     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1179                                                &signal->theData[0], recNode,
1180                                                g_sectionSegmentPool,
1181                                                sections->m_ptr);
1182 #endif
1183 
1184     if (unlikely(! (ss == SEND_OK ||
1185                     ss == SEND_BLOCKED ||
1186                     ss == SEND_DISCONNECTED)))
1187     {
1188       handle_send_failed(ss, signal);
1189     }
1190   }
1191 
1192   if (release)
1193   {
1194     ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1195   }
1196 
1197   sections->m_cnt = 0;
1198   signal->header.m_noOfSections = 0;
1199   signal->header.m_fragmentInfo = 0;
1200 
1201   return;
1202 }
1203 
1204 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1205 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1206                                     GlobalSignalNumber gsn,
1207                                     Signal* signal,
1208                                     Uint32 length,
1209                                     JobBufferLevel jobBuffer,
1210                                     SectionHandle* sections) const {
1211 
1212   /**
1213    * Implementation the same as sendSignal(), except that
1214    * the sections are duplicated when sending locally, and
1215    * not released
1216    */
1217 
1218   Uint32 noOfSections = sections->m_cnt;
1219   BlockReference sendBRef = reference();
1220 
1221   Uint32 recBlock = refToBlock(ref);
1222   Uint32 recNode   = refToNode(ref);
1223   Uint32 ourProcessor         = globalData.ownId;
1224 
1225   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1226 
1227   signal->header.theLength = length;
1228   signal->header.theVerId_signalNumber = gsn;
1229   signal->header.theReceiversBlockNumber = recBlock;
1230   signal->header.m_noOfSections = noOfSections;
1231 
1232   Uint32 tSignalId = signal->header.theSignalId;
1233   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1234 
1235   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1236     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1237     return;
1238   }//if
1239 #ifdef VM_TRACE
1240   if(globalData.testOn){
1241     Uint16 proc =
1242       (recNode == 0 ? globalData.ownId : recNode);
1243     signal->header.theSendersBlockRef = sendBRef;
1244     globalSignalLoggers.sendSignal(signal->header,
1245 				   jobBuffer,
1246 				   &signal->theData[0],
1247 				   proc,
1248                                    sections->m_ptr, noOfSections);
1249   }
1250 #endif
1251 
1252   if(recNode == ourProcessor || recNode == 0) {
1253     signal->header.theSendersSignalId = tSignalId;
1254     signal->header.theSendersBlockRef = sendBRef;
1255 
1256     Uint32 * dst = signal->theData + length;
1257 
1258     /* We need to copy the segmented section data into separate
1259      * sections when sending locally and keeping a copy ourselves
1260      */
1261     for (Uint32 sec=0; sec < noOfSections; sec++)
1262     {
1263       Uint32 secCopy;
1264       bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1265       ndbrequire (ok);
1266       * dst ++ = secCopy;
1267     }
1268 
1269 #ifdef NDBD_MULTITHREADED
1270     if (jobBuffer == JBB)
1271       sendlocal(m_threadId, &signal->header, signal->theData,
1272                 signal->theData + length);
1273     else
1274       sendprioa(m_threadId, &signal->header, signal->theData,
1275                 signal->theData + length);
1276 #else
1277     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1278 #endif
1279   } else {
1280     // send distributed Signal
1281     SignalHeader sh;
1282 
1283     Uint32 tTrace = signal->getTrace();
1284 
1285     sh.theVerId_signalNumber   = gsn;
1286     sh.theReceiversBlockNumber = recBlock;
1287     sh.theSendersBlockRef      = refToBlock(sendBRef);
1288     sh.theLength               = length;
1289     sh.theTrace                = tTrace;
1290     sh.theSignalId             = tSignalId;
1291     sh.m_noOfSections          = noOfSections;
1292     sh.m_fragmentInfo          = tFragInfo;
1293 
1294 #ifdef TRACE_DISTRIBUTED
1295     ndbout_c("send: %s(%d) to (%s, %d)",
1296 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1297 	     recNode);
1298 #endif
1299 
1300     SendStatus ss;
1301 #ifdef NDBD_MULTITHREADED
1302     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1303                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1304 #else
1305     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1306                                                &signal->theData[0], recNode,
1307                                                g_sectionSegmentPool,
1308                                                sections->m_ptr);
1309 #endif
1310 
1311     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1312   }
1313 
1314   signal->header.m_noOfSections = 0;
1315   signal->header.m_fragmentInfo = 0;
1316   return;
1317 }
1318 
1319 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1320 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1321                                     GlobalSignalNumber gsn,
1322                                     Signal* signal,
1323                                     Uint32 length,
1324                                     JobBufferLevel jobBuffer,
1325                                     SectionHandle * sections) const {
1326   /**
1327    * Implementation the same as sendSignal(), except that
1328    * the sections are duplicated when sending locally, and
1329    * not released
1330    */
1331 
1332   Uint32 noOfSections = sections->m_cnt;
1333   Uint32 tSignalId = signal->header.theSignalId;
1334   Uint32 tTrace    = signal->getTrace();
1335   Uint32 tFragInfo = signal->header.m_fragmentInfo;
1336 
1337   Uint32 ourProcessor = globalData.ownId;
1338   Uint32 recBlock = rg.m_block;
1339 
1340   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1341 
1342   signal->header.theLength = length;
1343   signal->header.theVerId_signalNumber = gsn;
1344   signal->header.theReceiversBlockNumber = recBlock;
1345   signal->header.theSendersSignalId = tSignalId;
1346   signal->header.theSendersBlockRef = reference();
1347   signal->header.m_noOfSections = noOfSections;
1348 
1349   if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1350     signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1351     return;
1352   }//if
1353 
1354   SignalHeader sh;
1355   sh.theVerId_signalNumber   = gsn;
1356   sh.theReceiversBlockNumber = recBlock;
1357   sh.theSendersBlockRef      = refToBlock(reference());
1358   sh.theLength               = length;
1359   sh.theTrace                = tTrace;
1360   sh.theSignalId             = tSignalId;
1361   sh.m_noOfSections          = noOfSections;
1362   sh.m_fragmentInfo          = tFragInfo;
1363 
1364   /**
1365    * Check own node
1366    */
1367   if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1368   {
1369 #ifdef VM_TRACE
1370     if(globalData.testOn){
1371       globalSignalLoggers.sendSignal(signal->header,
1372 				     jobBuffer,
1373 				     &signal->theData[0],
1374 				     ourProcessor,
1375                                      sections->m_ptr, noOfSections);
1376     }
1377 #endif
1378 
1379     Uint32 * dst = signal->theData + length;
1380 
1381     /* We need to copy the segmented section data into separate
1382      * sections when sending locally and keeping a copy ourselves
1383      */
1384     for (Uint32 sec=0; sec < noOfSections; sec++)
1385     {
1386       Uint32 secCopy;
1387       bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1388       ndbrequire (ok);
1389       * dst ++ = secCopy;
1390     }
1391 
1392 #ifdef NDBD_MULTITHREADED
1393     if (jobBuffer == JBB)
1394       sendlocal(m_threadId, &signal->header, signal->theData,
1395                 signal->theData + length);
1396     else
1397       sendprioa(m_threadId, &signal->header, signal->theData,
1398                 signal->theData + length);
1399 #else
1400     globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1401 #endif
1402 
1403     rg.m_nodes.clear((Uint32)0);
1404     rg.m_nodes.clear(ourProcessor);
1405   }
1406 
1407   /**
1408    * Do the big loop
1409    */
1410   Uint32 recNode = 0;
1411   while(!rg.m_nodes.isclear()){
1412     recNode = rg.m_nodes.find(recNode + 1);
1413     rg.m_nodes.clear(recNode);
1414 
1415 #ifdef VM_TRACE
1416     if(globalData.testOn){
1417       globalSignalLoggers.sendSignal(signal->header,
1418 				     jobBuffer,
1419 				     &signal->theData[0],
1420 				     recNode,
1421                                      sections->m_ptr, noOfSections);
1422     }
1423 #endif
1424 
1425 #ifdef TRACE_DISTRIBUTED
1426     ndbout_c("send: %s(%d) to (%s, %d)",
1427 	     getSignalName(gsn), gsn, getBlockName(recBlock),
1428 	     recNode);
1429 #endif
1430 
1431     SendStatus ss;
1432 #ifdef NDBD_MULTITHREADED
1433     ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1434                         recNode, &g_sectionSegmentPool, sections->m_ptr);
1435 #else
1436     ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1437                                                &signal->theData[0], recNode,
1438                                                g_sectionSegmentPool,
1439                                                sections->m_ptr);
1440 #endif
1441 
1442     ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1443   }
1444 
1445   signal->header.m_noOfSections = 0;
1446   signal->header.m_fragmentInfo = 0;
1447 
1448   return;
1449 }
1450 
1451 
1452 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1453 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1454 				    GlobalSignalNumber gsn,
1455 				    Signal* signal,
1456 				    Uint32 delayInMilliSeconds,
1457 				    Uint32 length) const {
1458 
1459   BlockNumber bnr = refToBlock(ref);
1460 
1461   check_sections(signal, signal->header.m_noOfSections, 0);
1462 
1463   signal->header.theLength = length;
1464   signal->header.theSendersSignalId = signal->header.theSignalId;
1465   signal->header.theVerId_signalNumber = gsn;
1466   signal->header.theReceiversBlockNumber = bnr;
1467   signal->header.theSendersBlockRef = reference();
1468 
1469 #ifdef VM_TRACE
1470   {
1471     if(globalData.testOn){
1472       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1473 					      signal->header,
1474 					      0,
1475 					      &signal->theData[0],
1476 					      globalData.ownId);
1477     }
1478   }
1479 #endif
1480 
1481 #ifdef NDBD_MULTITHREADED
1482   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1483 #else
1484   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1485 #endif
1486 
1487   // befor 2nd parameter to globalTimeQueue.insert
1488   // (Priority)theSendSig[sigIndex].jobBuffer
1489 }
1490 
1491 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1492 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1493 				    GlobalSignalNumber gsn,
1494 				    Signal* signal,
1495 				    Uint32 delayInMilliSeconds,
1496 				    Uint32 length,
1497 				    SectionHandle * sections) const {
1498 
1499   Uint32 noOfSections = sections->m_cnt;
1500   BlockNumber bnr = refToBlock(ref);
1501 
1502   BlockReference sendBRef = reference();
1503 
1504   if (bnr == 0) {
1505     bnr_error();
1506   }//if
1507 
1508   check_sections(signal, signal->header.m_noOfSections, noOfSections);
1509 
1510   signal->header.theLength = length;
1511   signal->header.theSendersSignalId = signal->header.theSignalId;
1512   signal->header.theSendersBlockRef = sendBRef;
1513   signal->header.theVerId_signalNumber = gsn;
1514   signal->header.theReceiversBlockNumber = bnr;
1515   signal->header.m_noOfSections = noOfSections;
1516 
1517   Uint32 * dst = signal->theData + length;
1518   * dst ++ = sections->m_ptr[0].i;
1519   * dst ++ = sections->m_ptr[1].i;
1520   * dst ++ = sections->m_ptr[2].i;
1521 
1522 #ifdef VM_TRACE
1523   {
1524     if(globalData.testOn){
1525       globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1526 					      signal->header,
1527 					      0,
1528 					      &signal->theData[0],
1529 					      globalData.ownId);
1530     }
1531   }
1532 #endif
1533 
1534 #ifdef NDBD_MULTITHREADED
1535   senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1536 #else
1537   globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1538 #endif
1539 
1540   signal->header.m_noOfSections = 0;
1541   signal->header.m_fragmentInfo = 0;
1542   sections->m_cnt = 0;
1543 }
1544 
1545 void
release(SegmentedSectionPtr & ptr)1546 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1547 {
1548   ::release(SB_SP_ARG ptr);
1549 }
1550 
1551 void
releaseSection(Uint32 firstSegmentIVal)1552 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1553 {
1554   ::releaseSection(SB_SP_ARG firstSegmentIVal);
1555 }
1556 
1557 void
releaseSections(SectionHandle & handle)1558 SimulatedBlock::releaseSections(SectionHandle& handle)
1559 {
1560   ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1561   handle.m_cnt = 0;
1562 }
1563 
1564 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1565 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1566 {
1567   return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1568 }
1569 
1570 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1571 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1572 {
1573   return ::import(SB_SP_ARG first, src, len);
1574 }
1575 
1576 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len)1577 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
1578 {
1579   Ptr<SectionSegment> tmp;
1580   if (::import(SB_SP_ARG tmp, src, len))
1581   {
1582     ptr.i = tmp.i;
1583     ptr.p = tmp.p;
1584     ptr.sz = len;
1585     return true;
1586   }
1587   return false;
1588 }
1589 
1590 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1591 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1592 {
1593   return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1594 }
1595 
1596 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1597 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1598                                const Uint32* src, Uint32 len)
1599 {
1600   return ::writeToSection(firstSegmentIVal, offset, src, len);
1601 }
1602 
1603 class SectionSegmentPool&
getSectionSegmentPool()1604 SimulatedBlock::getSectionSegmentPool(){
1605   return g_sectionSegmentPool;
1606 }
1607 
1608 NewVARIABLE *
allocateBat(int batSize)1609 SimulatedBlock::allocateBat(int batSize){
1610   NewVARIABLE* bat = NewVarRef;
1611   bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
1612   NewVarRef = bat;
1613   theBATSize = batSize;
1614   return bat;
1615 }
1616 
1617 void
freeBat()1618 SimulatedBlock::freeBat(){
1619   if(NewVarRef != 0){
1620     free(NewVarRef);
1621     NewVarRef = 0;
1622   }
1623 }
1624 
1625 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)1626 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1627   assert(blockNo == blockToMain(blockNo));
1628   SimulatedBlock * sb = globalData.getBlock(blockNo);
1629   if (sb != 0 && instanceNo != 0)
1630     sb = sb->getInstance(instanceNo);
1631   if(sb == 0)
1632     return 0;
1633   return sb->NewVarRef;
1634 }
1635 
1636 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)1637 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1638   assert(blockNo == blockToMain(blockNo));
1639   SimulatedBlock * sb = globalData.getBlock(blockNo);
1640   if (sb != 0 && instanceNo != 0)
1641     sb = sb->getInstance(instanceNo);
1642   if(sb == 0)
1643     return 0;
1644   return sb->theBATSize;
1645 }
1646 
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)1647 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
1648 {
1649   return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1650 }
1651 
1652 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)1653 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
1654 {
1655 
1656   void * p = NULL;
1657   Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1658   size_t size = n*s + over_alloc;
1659   Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
1660   refresh_watch_dog(9);
1661   if (real_size > 0){
1662 #ifdef VM_TRACE_MEM
1663     ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
1664 	     getBlockName(number()),
1665 	     type,
1666 	     s,
1667 	     n,
1668 	     real_size);
1669 #endif
1670     if( real_size == (Uint64)size )
1671       p = ndbd_malloc(size);
1672     if (p == NULL){
1673       char buf1[255];
1674       char buf2[255];
1675       struct ndb_mgm_param_info param_info;
1676       size_t size = sizeof(ndb_mgm_param_info);
1677 
1678       if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, &param_info, &size)) {
1679         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
1680 	         getBlockName(number()), param_info.m_name);
1681       } else {
1682         BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
1683 	         getBlockName(number()), type);
1684       }
1685       BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
1686 	       (Uint32)s, (Uint32)n, (Uint64)real_size);
1687       ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1688     }
1689 
1690     if(clear){
1691       char * ptr = (char*)p;
1692       const Uint32 chunk = 128 * 1024;
1693       while(size > chunk){
1694 	refresh_watch_dog(9);
1695 	memset(ptr, 0, chunk);
1696 	ptr += chunk;
1697 	size -= chunk;
1698       }
1699       refresh_watch_dog(9);
1700       memset(ptr, 0, size);
1701     }
1702     if (unaligned_buffer)
1703     {
1704       *unaligned_buffer = p;
1705       p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1706 #ifdef VM_TRACE
1707       g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
1708                           type, align, (Uint64)p, (Uint64)p+n*s,
1709                           (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1710 #endif
1711     }
1712   }
1713   return p;
1714 }
1715 
1716 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)1717 SimulatedBlock::deallocRecord(void ** ptr,
1718 			      const char * type, size_t s, size_t n){
1719   (void)type;
1720 
1721   if(* ptr != 0){
1722       ndbd_free(* ptr, n*s);
1723     * ptr = 0;
1724   }
1725 }
1726 
1727 int
sortchunks(const void * _e0,const void * _e1)1728 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
1729 {
1730   const AllocChunk *p0 = (const AllocChunk*)_e0;
1731   const AllocChunk *p1 = (const AllocChunk*)_e1;
1732 
1733   if (p0->ptrI > p1->ptrI)
1734     return 1;
1735   if (p0->ptrI < p1->ptrI)
1736     return -1;
1737   return 0;
1738 }
1739 
1740 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)1741 SimulatedBlock::allocChunks(AllocChunk dst[],
1742                             Uint32 arraysize,
1743                             Uint32 rg,
1744                             Uint32 pages,
1745                             Uint32 paramId)
1746 {
1747   const Uint32 save = pages; // For fail
1748   Uint32 i = 0;
1749   for (; i<arraysize && pages > 0; i++)
1750   {
1751     Uint32 cnt = pages;
1752     m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1753     if (unlikely(cnt == 0))
1754       goto fail;
1755     pages -= cnt;
1756     dst[i].cnt = cnt;
1757   }
1758   if (unlikely(pages != 0))
1759     goto fail;
1760 
1761   qsort(dst, i, sizeof(dst[0]), sortchunks);
1762   return i;
1763 
1764 fail:
1765   char buf1[255];
1766   char buf2[255];
1767   struct ndb_mgm_param_info param_info;
1768   size_t size = sizeof(ndb_mgm_param_info);
1769 
1770   if (ndb_mgm_get_db_parameter_info(paramId, &param_info, &size) != 0)
1771   {
1772     ndbassert(false);
1773     param_info.m_name = "<unknown>";
1774   }
1775 
1776   BaseString::snprintf(buf1, sizeof(buf1),
1777                        "%s could not allocate memory for parameter %s",
1778                        getBlockName(number()), param_info.m_name);
1779   BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
1780                        Uint64(save) * sizeof(GlobalPage));
1781   ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1782   return 0;
1783 }
1784 
1785 void
refresh_watch_dog(Uint32 place)1786 SimulatedBlock::refresh_watch_dog(Uint32 place)
1787 {
1788 #ifdef NDBD_MULTITHREADED
1789   (*m_watchDogCounter) = place;
1790 #else
1791   globalData.incrementWatchDogCounter(place);
1792 #endif
1793 }
1794 
1795 void
update_watch_dog_timer(Uint32 interval)1796 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1797 {
1798   extern EmulatorData globalEmulatorData;
1799   globalEmulatorData.theWatchDog->setCheckInterval(interval);
1800 }
1801 
1802 void
progError(int line,int err_code,const char * extra) const1803 SimulatedBlock::progError(int line, int err_code, const char* extra) const {
1804   jamLine(line);
1805 
1806   const char *aBlockName = getBlockName(number(), "VM Kernel");
1807 
1808   // Pack status of interesting config variables
1809   // so that we can print them in error.log
1810   int magicStatus =
1811     (m_ctx.m_config.stopOnError()<<1) +
1812     (m_ctx.m_config.getInitialStart()<<2);
1813 
1814   /* Add line number to block name */
1815   char buf[100];
1816   BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
1817 	   aBlockName, line, magicStatus);
1818 
1819   ErrorReporter::handleError(err_code, extra, buf);
1820 
1821 }
1822 
1823 void
infoEvent(const char * msg,...) const1824 SimulatedBlock::infoEvent(const char * msg, ...) const {
1825   if(msg == 0)
1826     return;
1827 
1828   SignalT<25> signalT;
1829   signalT.theData[0] = NDB_LE_InfoEvent;
1830   char * buf = (char *)(signalT.theData+1);
1831 
1832   va_list ap;
1833   va_start(ap, msg);
1834   BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1835   va_end(ap);
1836 
1837   int len = strlen(buf) + 1;
1838   if(len > 96){
1839     len = 96;
1840     buf[95] = 0;
1841   }
1842 
1843   /**
1844    * Init and put it into the job buffer
1845    */
1846   memset(&signalT.header, 0, sizeof(SignalHeader));
1847 
1848   const Signal * signal = globalScheduler.getVMSignals();
1849   Uint32 tTrace = signal->header.theTrace;
1850   Uint32 tSignalId = signal->header.theSignalId;
1851 
1852   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
1853   signalT.header.theReceiversBlockNumber = CMVMI;
1854   signalT.header.theSendersBlockRef      = reference();
1855   signalT.header.theTrace                = tTrace;
1856   signalT.header.theSignalId             = tSignalId;
1857   signalT.header.theLength               = ((len+3)/4)+1;
1858 
1859 #ifdef NDBD_MULTITHREADED
1860   sendlocal(m_threadId,
1861             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1862 #else
1863   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1864                           signalT.m_sectionPtrI);
1865 #endif
1866 }
1867 
1868 void
warningEvent(const char * msg,...) const1869 SimulatedBlock::warningEvent(const char * msg, ...) const {
1870   if(msg == 0)
1871     return;
1872 
1873   SignalT<25> signalT;
1874   signalT.theData[0] = NDB_LE_WarningEvent;
1875   char * buf = (char *)(signalT.theData+1);
1876 
1877   va_list ap;
1878   va_start(ap, msg);
1879   BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1880   va_end(ap);
1881 
1882   int len = strlen(buf) + 1;
1883   if(len > 96){
1884     len = 96;
1885     buf[95] = 0;
1886   }
1887 
1888   /**
1889    * Init and put it into the job buffer
1890    */
1891   memset(&signalT.header, 0, sizeof(SignalHeader));
1892 
1893   const Signal * signal = globalScheduler.getVMSignals();
1894   Uint32 tTrace = signal->header.theTrace;
1895   Uint32 tSignalId = signal->header.theSignalId;
1896 
1897   signalT.header.theVerId_signalNumber   = GSN_EVENT_REP;
1898   signalT.header.theReceiversBlockNumber = CMVMI;
1899   signalT.header.theSendersBlockRef      = reference();
1900   signalT.header.theTrace                = tTrace;
1901   signalT.header.theSignalId             = tSignalId;
1902   signalT.header.theLength               = ((len+3)/4)+1;
1903 
1904 #ifdef NDBD_MULTITHREADED
1905   sendlocal(m_threadId,
1906             &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1907 #else
1908   globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1909                           signalT.m_sectionPtrI);
1910 #endif
1911 }
1912 
1913 void
execNODE_STATE_REP(Signal * signal)1914 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
1915   const NodeStateRep * const  rep = (NodeStateRep *)&signal->theData[0];
1916 
1917   this->theNodeState = rep->nodeState;
1918 }
1919 
1920 void
execCHANGE_NODE_STATE_REQ(Signal * signal)1921 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
1922   const ChangeNodeStateReq * const  req =
1923     (ChangeNodeStateReq *)&signal->theData[0];
1924 
1925   this->theNodeState = req->nodeState;
1926   const Uint32 senderData = req->senderData;
1927   const BlockReference senderRef = req->senderRef;
1928 
1929   /**
1930    * Pack return signal
1931    */
1932   ChangeNodeStateConf * const  conf =
1933     (ChangeNodeStateConf *)&signal->theData[0];
1934 
1935   conf->senderData = senderData;
1936 
1937   sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
1938 	     ChangeNodeStateConf::SignalLength, JBB);
1939 }
1940 
1941 void
execNDB_TAMPER(Signal * signal)1942 SimulatedBlock::execNDB_TAMPER(Signal * signal){
1943   if (signal->getLength() == 1)
1944   {
1945     SET_ERROR_INSERT_VALUE(signal->theData[0]);
1946   }
1947   else
1948   {
1949     SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
1950   }
1951 }
1952 
1953 void
execSIGNAL_DROPPED_REP(Signal * signal)1954 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
1955   /* Note no need for fragmented signal handling as we are
1956    * going to crash this node
1957    */
1958   char msg[64];
1959   const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
1960   BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
1961 	   rep->originalGsn, rep->originalLength,rep->originalSectionCount);
1962   ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
1963 			     msg,
1964 			     __FILE__,
1965 			     NST_ErrorHandler);
1966 }
1967 
1968 void
execCONTINUE_FRAGMENTED(Signal * signal)1969 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
1970   ljamEntry();
1971 
1972   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
1973   ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
1974 
1975   switch (sig->type)
1976   {
1977   case ContinueFragmented::CONTINUE_SENDING :
1978   {
1979     ljam();
1980     Ptr<FragmentSendInfo> fragPtr;
1981 
1982     c_segmentedFragmentSendList.first(fragPtr);
1983     for(; !fragPtr.isNull();){
1984       ljam();
1985       Ptr<FragmentSendInfo> copyPtr = fragPtr;
1986       c_segmentedFragmentSendList.next(fragPtr);
1987 
1988       sendNextSegmentedFragment(signal, * copyPtr.p);
1989       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
1990         ljam();
1991         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
1992           ljam();
1993           execute(signal, copyPtr.p->m_callback, 0);
1994         }//if
1995         c_segmentedFragmentSendList.release(copyPtr);
1996       }
1997     }
1998 
1999     c_linearFragmentSendList.first(fragPtr);
2000     for(; !fragPtr.isNull();){
2001       ljam();
2002       Ptr<FragmentSendInfo> copyPtr = fragPtr;
2003       c_linearFragmentSendList.next(fragPtr);
2004 
2005       sendNextLinearFragment(signal, * copyPtr.p);
2006       if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2007         ljam();
2008         if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2009           ljam();
2010           execute(signal, copyPtr.p->m_callback, 0);
2011         }//if
2012         c_linearFragmentSendList.release(copyPtr);
2013       }
2014     }
2015 
2016     if(c_segmentedFragmentSendList.isEmpty() &&
2017        c_linearFragmentSendList.isEmpty()){
2018       ljam();
2019       c_fragSenderRunning = false;
2020       return;
2021     }
2022 
2023     sig->type = ContinueFragmented::CONTINUE_SENDING;
2024     sig->line = __LINE__;
2025     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2026     break;
2027   }
2028   case ContinueFragmented::CONTINUE_CLEANUP:
2029   {
2030     ljam();
2031 
2032     const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2033     /* Check length of signal */
2034     ndbassert(signal->getLength() ==
2035               ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2036               callbackWords);
2037 
2038     Callback cb;
2039     memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2040 
2041     doNodeFailureCleanup(signal,
2042                          sig->cleanup.failedNodeId,
2043                          sig->cleanup.resource,
2044                          sig->cleanup.cursor,
2045                          sig->cleanup.elementsCleaned,
2046                          cb);
2047     break;
2048   }
2049   default:
2050     ndbrequire(false);
2051   }
2052 }
2053 
2054 void
execSTOP_FOR_CRASH(Signal * signal)2055 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2056 {
2057 #ifdef NDBD_MULTITHREADED
2058   mt_execSTOP_FOR_CRASH();
2059 #endif
2060 }
2061 
2062 void
execNODE_START_REP(Signal * signal)2063 SimulatedBlock::execNODE_START_REP(Signal* signal)
2064 {
2065 }
2066 
2067 void
execAPI_START_REP(Signal * signal)2068 SimulatedBlock::execAPI_START_REP(Signal* signal)
2069 {
2070 }
2071 
2072 void
execSEND_PACKED(Signal * signal)2073 SimulatedBlock::execSEND_PACKED(Signal* signal)
2074 {
2075 }
2076 
2077 // MT LQH callback CONF via signal
2078 
2079 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2080 SimulatedBlock::getCallbackEntry(Uint32 ci)
2081 {
2082   ndbrequire(m_callbackTableAddr != 0);
2083   const CallbackTable& ct = *m_callbackTableAddr;
2084   ndbrequire(ci < ct.m_count);
2085   return ct.m_entry[ci];
2086 }
2087 
2088 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 returnCode)2089 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2090                                  CallbackPtr& cptr, Uint32 returnCode)
2091 {
2092   Uint32 blockNo = blockToMain(fullBlockNo);
2093   Uint32 instanceNo = blockToInstance(fullBlockNo);
2094   SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2095   ndbrequire(b != 0);
2096 
2097   const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2098 
2099   // wl4391_todo add as arg if this is not enough
2100   Uint32 senderData = returnCode;
2101 
2102   if (!isNdbMtLqh()) {
2103     Callback c;
2104     c.m_callbackFunction = ce.m_function;
2105     c.m_callbackData = cptr.m_callbackData;
2106     b->execute(signal, c, returnCode);
2107 
2108     if (ce.m_flags & CALLBACK_ACK) {
2109       jam();
2110       CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2111       ack->senderData = senderData;
2112       EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2113                      signal, CallbackAck::SignalLength);
2114     }
2115   } else {
2116     CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2117     conf->senderData = senderData;
2118     conf->senderRef = reference();
2119     conf->callbackIndex = cptr.m_callbackIndex;
2120     conf->callbackData = cptr.m_callbackData;
2121     conf->returnCode = returnCode;
2122 
2123     if (ce.m_flags & CALLBACK_DIRECT) {
2124       jam();
2125       EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
2126                      signal, CallbackConf::SignalLength, instanceNo);
2127     } else {
2128       jam();
2129       BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2130       sendSignal(ref, GSN_CALLBACK_CONF,
2131                  signal, CallbackConf::SignalLength, JBB);
2132     }
2133   }
2134   cptr.m_callbackIndex = ZNIL;
2135 }
2136 
2137 void
execCALLBACK_CONF(Signal * signal)2138 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2139 {
2140   const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2141 
2142   Uint32 senderData = conf->senderData;
2143   Uint32 senderRef = conf->senderRef;
2144 
2145   ndbrequire(m_callbackTableAddr != 0);
2146   const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
2147   CallbackFunction function = ce.m_function;
2148 
2149   Callback callback;
2150   callback.m_callbackFunction = function;
2151   callback.m_callbackData = conf->callbackData;
2152   execute(signal, callback, conf->returnCode);
2153 
2154   if (ce.m_flags & CALLBACK_ACK) {
2155     jam();
2156     CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2157     ack->senderData = senderData;
2158     sendSignal(senderRef, GSN_CALLBACK_ACK,
2159                signal, CallbackAck::SignalLength, JBB);
2160   }
2161 }
2162 
2163 #ifdef VM_TRACE_TIME
2164 void
clearTimes()2165 SimulatedBlock::clearTimes() {
2166   for(Uint32 i = 0; i <= MAX_GSN; i++){
2167     m_timeTrace[i].cnt = 0;
2168     m_timeTrace[i].sum = 0;
2169     m_timeTrace[i].sub = 0;
2170   }
2171 }
2172 
2173 void
printTimes(FILE * output)2174 SimulatedBlock::printTimes(FILE * output){
2175   fprintf(output, "-- %s --\n", getBlockName(number()));
2176   Uint64 sum = 0;
2177   for(Uint32 i = 0; i <= MAX_GSN; i++){
2178     Uint32 n = m_timeTrace[i].cnt;
2179     if(n != 0){
2180       double dn = n;
2181 
2182       double avg = m_timeTrace[i].sum;
2183       double avg2 = avg - m_timeTrace[i].sub;
2184 
2185       avg /= dn;
2186       avg2 /= dn;
2187 
2188       fprintf(output,
2189 	      //name ; cnt ; loc ; acc
2190 	      "%s ; #%d ; %dus ; %dus ; %dms\n",
2191 	      getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2192 	      (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2193 
2194       sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2195     }
2196   }
2197   sum = (sum + 500)/ 1000;
2198   fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2199   fprintf(output, "\n");
2200   fflush(output);
2201 }
2202 
2203 #endif
2204 
FragmentInfo(Uint32 fragId,Uint32 sender)2205 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2206   m_fragmentId = fragId;
2207   m_senderRef = sender;
2208   m_sectionPtrI[0] = RNIL;
2209   m_sectionPtrI[1] = RNIL;
2210   m_sectionPtrI[2] = RNIL;
2211 }
2212 
FragmentSendInfo()2213 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2214 {
2215 }
2216 
2217 bool
assembleFragments(Signal * signal)2218 SimulatedBlock::assembleFragments(Signal * signal){
2219   Uint32 sigLen = signal->length() - 1;
2220   Uint32 fragId = signal->theData[sigLen];
2221   Uint32 fragInfo = signal->header.m_fragmentInfo;
2222   Uint32 senderRef = signal->getSendersBlockRef();
2223 
2224   Uint32 *sectionPtr = signal->m_sectionPtrI;
2225 
2226   if(fragInfo == 0){
2227     return true;
2228   }
2229 
2230   const Uint32 secs = signal->header.m_noOfSections;
2231   const Uint32 * const secNos = &signal->theData[sigLen - secs];
2232 
2233   if(fragInfo == 1){
2234     /**
2235      * First in train
2236      */
2237     Ptr<FragmentInfo> fragPtr;
2238     if(!c_fragmentInfoHash.seize(fragPtr)){
2239       ndbrequire(false);
2240       return false;
2241     }
2242 
2243     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2244     c_fragmentInfoHash.add(fragPtr);
2245 
2246     for(Uint32 i = 0; i<secs; i++){
2247       Uint32 sectionNo = secNos[i];
2248       ndbassert(sectionNo < 3);
2249       fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2250     }
2251 
2252     ndbassert(! fragPtr.p->isDropped() );
2253 
2254     /**
2255      * Don't release allocated segments
2256      */
2257     signal->header.m_fragmentInfo = 0;
2258     signal->header.m_noOfSections = 0;
2259     return false;
2260   }
2261 
2262   FragmentInfo key(fragId, senderRef);
2263   Ptr<FragmentInfo> fragPtr;
2264   if(c_fragmentInfoHash.find(fragPtr, key)){
2265 
2266     /**
2267      * FragInfo == 2 or 3
2268      */
2269     if ( likely(! fragPtr.p->isDropped()) )
2270     {
2271       Uint32 i;
2272       for(i = 0; i<secs; i++){
2273         Uint32 sectionNo = secNos[i];
2274         ndbassert(sectionNo < 3);
2275         Uint32 sectionPtrI = sectionPtr[i];
2276         if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2277           linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2278         } else {
2279           fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2280         }
2281       }
2282 
2283       /**
2284        * fragInfo = 2
2285        */
2286       if(fragInfo == 2){
2287         signal->header.m_fragmentInfo = 0;
2288         signal->header.m_noOfSections = 0;
2289         return false;
2290       }
2291 
2292       /**
2293        * fragInfo = 3
2294        */
2295       for(i = 0; i<3; i++){
2296         Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2297         if(ptrI != RNIL){
2298           signal->m_sectionPtrI[i] = ptrI;
2299         } else {
2300           break;
2301         }
2302       }
2303 
2304       signal->setLength(sigLen - secs);
2305       signal->header.m_noOfSections = i;
2306       signal->header.m_fragmentInfo = 0;
2307 
2308       c_fragmentInfoHash.release(fragPtr);
2309       return true;
2310     }
2311     else
2312     {
2313       /* This fragmented signal has already had at least 1 fragment
2314        * dropped.  We must release the received segments.
2315        */
2316       for (Uint32 i=0; i < secs; i++)
2317         releaseSection( sectionPtr[i] );
2318 
2319       signal->header.m_fragmentInfo = 0;
2320       signal->header.m_noOfSections = 0;
2321 
2322       /* FragInfo == 2
2323        * More fragments to come, keep waiting
2324        */
2325       if (fragInfo == 2)
2326         return false;
2327 
2328       /* FragInfo == 3
2329        * That was the last fragment.
2330        * We're now ready for handling the dropped signal.
2331        */
2332       SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2333       Uint32 gsn = signal->header.theVerId_signalNumber;
2334       Uint32 len = signal->header.theLength;
2335       Uint32 newLen= (len > 22 ? 22 : len);
2336       memmove(rep->originalData, signal->theData, (4 * newLen));
2337       rep->originalGsn = gsn;
2338       rep->originalLength = len;
2339       rep->originalSectionCount = 0;
2340       signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2341       signal->header.theLength = newLen + 3;
2342       signal->header.m_noOfSections = 0;
2343       signal->header.m_fragmentInfo = 3;
2344 
2345 
2346       /**
2347        * NOTE: Don't use EXECUTE_DIRECT as it
2348        *       sets sendersBlockRef to reference()
2349        */
2350       /* Perform dropped signal handling, in this thread, now */
2351       executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2352 
2353       /* return false to caller - they should not process the signal */
2354       return false;
2355     } // else (isDropped())
2356   }
2357 
2358   /**
2359    * Unable to find fragment
2360    */
2361   ndbrequire(false);
2362   return false;
2363 }
2364 
2365 bool
assembleDroppedFragments(Signal * signal)2366 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2367 {
2368   /* This method is called at the start of a  SIGNAL_DROPPED_REP
2369    * handler when there is a chance that the dropped signal could
2370    * be part of a fragmented signal.
2371    * If the dropped signal was a fragmented signal, this
2372    * needs to be handled specially to ensure that fragments
2373    * of the signal are correctly dropped to avoid segment
2374    * leaks etc.
2375    * There are a number of cases :
2376    *   1) First fragment dropped  (FragInfo=1)
2377    *      All remaining fragments must be dropped when they
2378    *      arrive.  The Signal dropped report handler must be
2379    *      executed when the last fragment has arrived.
2380    *   2) Middle fragment dropped  (FragInfo=2)
2381    *      Any existing stored segments must be released.
2382    *      All remaining fragments must be dropped when they
2383    *      arrive.
2384    *   3) Last fragment dropped  (FragInfo=3)
2385    *      Any existing stored segments must be released.
2386    *      Signal Dropped handling can occur, so return true.
2387    *
2388    * To indicate that a fragment has been dropped for a signal,
2389    * all the section I Values in the fragment's hash entry are
2390    * set to RNIL.
2391    * Signal Dropped Report handling is performed when the last
2392    * fragment arrives.  If the last fragment is not dropped
2393    * by the transporter layer then normal fragment assembly
2394    * arranges for dropped signal handling to occur.
2395    */
2396   Uint32 sigLen = signal->length() - 1;
2397   Uint32 fragId = signal->theData[sigLen];
2398   Uint32 fragInfo = signal->header.m_fragmentInfo;
2399   Uint32 senderRef = signal->getSendersBlockRef();
2400 
2401   if(fragInfo == 0){
2402     return true;
2403   }
2404 
2405   /* This method is for handling SIGNAL_DROPPED_REP only */
2406   ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2407   ndbrequire(signal->header.m_noOfSections == 0);
2408 
2409   if(fragInfo == 1){
2410     /**
2411      * First in train
2412      */
2413     Ptr<FragmentInfo> fragPtr;
2414     if(!c_fragmentInfoHash.seize(fragPtr)){
2415       ndbrequire(false);
2416       return false;
2417     }
2418 
2419     new (fragPtr.p)FragmentInfo(fragId, senderRef);
2420     c_fragmentInfoHash.add(fragPtr);
2421 
2422     /* Mark entry in hash as belonging to dropped signal so subsequent
2423      * fragments can also be dropped
2424      */
2425     fragPtr.p->m_sectionPtrI[0]= RNIL;
2426     fragPtr.p->m_sectionPtrI[1]= RNIL;
2427     fragPtr.p->m_sectionPtrI[2]= RNIL;
2428 
2429     /* Wait for last fragment before SignalDroppedRep handling */
2430     signal->header.m_fragmentInfo = 0;
2431     return false;
2432   }
2433 
2434   FragmentInfo key(fragId, senderRef);
2435   Ptr<FragmentInfo> fragPtr;
2436   if(c_fragmentInfoHash.find(fragPtr, key)){
2437 
2438     /**
2439      * FragInfo == 2 or 3
2440      */
2441     if (! fragPtr.p->isDropped() )
2442     {
2443       /* Fragmented Signal not already marked as dropped
2444        * Need to free stored segments
2445        */
2446       releaseSection(fragPtr.p->m_sectionPtrI[0]);
2447       releaseSection(fragPtr.p->m_sectionPtrI[1]);
2448       releaseSection(fragPtr.p->m_sectionPtrI[2]);
2449 
2450       /* Mark as dropped now */
2451       fragPtr.p->m_sectionPtrI[0]= RNIL;
2452       fragPtr.p->m_sectionPtrI[1]= RNIL;
2453       fragPtr.p->m_sectionPtrI[2]= RNIL;
2454 
2455       ndbassert( fragPtr.p->isDropped() );
2456     }
2457 
2458     /**
2459      * fragInfo = 2
2460      *   Still waiting for final fragments.
2461      *   Return false to caller.
2462      */
2463     if(fragInfo == 2){
2464       signal->header.m_fragmentInfo = 0;
2465       return false;
2466     }
2467 
2468     /**
2469      * fragInfo = 3
2470      *   All fragments received, remove entry
2471      *   from hash and return to caller for
2472      *   dropped signal handling.
2473      */
2474     signal->header.m_fragmentInfo = 0;
2475 
2476     c_fragmentInfoHash.release(fragPtr);
2477     return true;
2478   }
2479 
2480   /**
2481    * Unable to find fragment
2482    */
2483   ndbrequire(false);
2484   return false;
2485 }
2486 
2487 /**
2488  * doCleanupFragInfo
2489  * Iterate over block's Fragment assembly hash, looking
2490  * for in-assembly fragments from the failed node
2491  * Release these
2492  * Returns after each scanned bucket to avoid consuming
2493  * too much time.
2494  *
2495  * Parameters
2496  *   failedNodeId    : Node id of failed node
2497  *   cursor          : Hash bucket to start iteration from
2498  *   rtUnitsUsed     : Total rt units used
2499  *   elementsCleaned : Number of elements cleaned
2500  *
2501  * Updates
2502  *   cursor          : Hash bucket to continue iteration from
2503  *   rtUnitsUsed     : += units used
2504  *   elementsCleaned : += elements cleaned
2505  *
2506  * Returns
2507  *   true  if all FragInfo structs cleaned up
2508  *   false if more to do
2509  */
2510 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2511 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2512                                   Uint32& cursor,
2513                                   Uint32& rtUnitsUsed,
2514                                   Uint32& elementsCleaned)
2515 {
2516   ljam();
2517   DLHashTable<FragmentInfo>::Iterator iter;
2518 
2519   c_fragmentInfoHash.next(cursor, iter);
2520 
2521   const Uint32 startBucket = iter.bucket;
2522 
2523   while (!iter.isNull() &&
2524          (iter.bucket == startBucket))
2525   {
2526     ljam();
2527 
2528     Ptr<FragmentInfo> curr = iter.curr;
2529     c_fragmentInfoHash.next(iter);
2530 
2531     FragmentInfo* fragInfo = curr.p;
2532 
2533     if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2534     {
2535       ljam();
2536       /* We were assembling a fragmented signal from the
2537        * failed node, discard the partially assembled
2538        * sections and free the FragmentInfo hash entry
2539        */
2540       for(Uint32 s = 0; s<3; s++)
2541       {
2542         if (fragInfo->m_sectionPtrI[s] != RNIL)
2543         {
2544           ljam();
2545           SegmentedSectionPtr ssptr;
2546           getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2547           release(ssptr);
2548         }
2549       }
2550 
2551       /* Release FragmentInfo hash element */
2552       c_fragmentInfoHash.release(curr);
2553 
2554       elementsCleaned++;
2555       rtUnitsUsed+=3;
2556     }
2557 
2558     rtUnitsUsed++;
2559   } // while
2560 
2561   cursor = iter.bucket;
2562   return iter.isNull();
2563 }
2564 
2565 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2566 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2567                                   Uint32& cursor,
2568                                   Uint32& rtUnitsUsed,
2569                                   Uint32& elementsCleaned)
2570 {
2571   ljam();
2572 
2573   Ptr<FragmentSendInfo> fragPtr;
2574   const Uint32 NumSendLists = 2;
2575   ndbrequire(cursor < NumSendLists);
2576 
2577   DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
2578     { &c_segmentedFragmentSendList,
2579       &c_linearFragmentSendList };
2580 
2581   DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
2582 
2583   list->first(fragPtr);
2584   for(; !fragPtr.isNull();){
2585     ljam();
2586     Ptr<FragmentSendInfo> copyPtr = fragPtr;
2587     list->next(fragPtr);
2588     rtUnitsUsed++;
2589 
2590     NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
2591 
2592     if (rg.m_nodes.get(failedNodeId))
2593     {
2594       ljam();
2595       /* Fragmented signal is being sent to node */
2596       rg.m_nodes.clear(failedNodeId);
2597 
2598       if (rg.m_nodes.isclear())
2599       {
2600         ljam();
2601         /* No other nodes in receiver group - send
2602          * is cancelled
2603          * Will be cleaned up in the usual CONTINUE_FRAGMENTED
2604          * handling code.
2605          */
2606         copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2607       }
2608       elementsCleaned++;
2609     }
2610   }
2611 
2612   /* Next time we'll do the next list */
2613   cursor++;
2614 
2615   return (cursor == NumSendLists);
2616 }
2617 
2618 
2619 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)2620 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
2621                                      Uint32 failedNodeId,
2622                                      Uint32 resource,
2623                                      Uint32 cursor,
2624                                      Uint32 elementsCleaned,
2625                                      Callback& cb)
2626 {
2627   ljam();
2628   const bool userCallback = (cb.m_callbackFunction != 0);
2629   const Uint32 maxRtUnits = userCallback ?
2630 #ifdef VM_TRACE
2631     2 :
2632 #else
2633     16 :
2634 #endif
2635     ~0; /* Must complete all processing in this call */
2636 
2637   Uint32 rtUnitsUsed = 0;
2638 
2639   /* Loop over resources, cleaning them up */
2640   do
2641   {
2642     bool resourceDone= false;
2643     switch(resource) {
2644     case ContinueFragmented::RES_FRAGSEND:
2645     {
2646       ljam();
2647       resourceDone = doCleanupFragSend(failedNodeId, cursor,
2648                                        rtUnitsUsed, elementsCleaned);
2649       break;
2650     }
2651     case ContinueFragmented::RES_FRAGINFO:
2652     {
2653       ljam();
2654       resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2655                                        rtUnitsUsed, elementsCleaned);
2656       break;
2657     }
2658     case ContinueFragmented::RES_LAST:
2659     {
2660       ljam();
2661       /* Node failure processing complete, execute user callback if provided */
2662       if (userCallback)
2663         execute(signal, cb, elementsCleaned);
2664 
2665       return elementsCleaned;
2666     }
2667     default:
2668       ndbrequire(false);
2669     }
2670 
2671     /* Did we complete cleaning up this resource? */
2672     if (resourceDone)
2673     {
2674       resource++;
2675       cursor= 0;
2676     }
2677 
2678   } while (rtUnitsUsed <= maxRtUnits);
2679 
2680   ljam();
2681 
2682   /* Not yet completed failure handling.
2683    * Must have exhausted RT units.
2684    * Update cursor and re-invoke
2685    */
2686   ndbassert(userCallback);
2687 
2688   /* Send signal to continue processing */
2689 
2690   ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2691   sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2692   sig->cleanup.failedNodeId = failedNodeId;
2693   sig->cleanup.resource = resource;
2694   sig->cleanup.cursor = cursor;
2695   sig->cleanup.elementsCleaned= elementsCleaned;
2696   Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2697   Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2698     callbackWords;
2699   ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
2700   memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2701 
2702   sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2703 
2704   return elementsCleaned;
2705 }
2706 
2707 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)2708 SimulatedBlock::simBlockNodeFailure(Signal* signal,
2709                                     Uint32 failedNodeId,
2710                                     Callback& cb)
2711 {
2712   ljam();
2713   return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2714 }
2715 
2716 Uint32
debugPrintFragmentCounts()2717 SimulatedBlock::debugPrintFragmentCounts()
2718 {
2719   const char* blockName = getBlockName(theNumber);
2720   DLHashTable<FragmentInfo>::Iterator iter;
2721   Uint32 fragmentInfoCount = 0;
2722   c_fragmentInfoHash.first(iter);
2723 
2724   while(!iter.isNull())
2725   {
2726     fragmentInfoCount++;
2727     c_fragmentInfoHash.next(iter);
2728   }
2729 
2730   Ptr<FragmentSendInfo> ptr;
2731   Uint32 linSendInfoCount = 0;
2732 
2733   c_linearFragmentSendList.first(ptr);
2734 
2735   while (!ptr.isNull())
2736   {
2737     linSendInfoCount++;
2738     c_linearFragmentSendList.next(ptr);
2739   }
2740 
2741   Uint32 segSendInfoCount = 0;
2742   c_segmentedFragmentSendList.first(ptr);
2743 
2744   while (!ptr.isNull())
2745   {
2746     segSendInfoCount++;
2747     c_segmentedFragmentSendList.next(ptr);
2748   }
2749 
2750   ndbout_c("%s : Fragment assembly hash entry count : %d",
2751            blockName,
2752            fragmentInfoCount);
2753 
2754   ndbout_c("%s : Linear fragment send list size : %d",
2755            blockName,
2756            linSendInfoCount);
2757 
2758   ndbout_c("%s : Segmented fragment send list size : %d",
2759            blockName,
2760            segSendInfoCount);
2761 
2762   return fragmentInfoCount +
2763     linSendInfoCount +
2764     segSendInfoCount;
2765 }
2766 
2767 
2768 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)2769 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
2770 				  NodeReceiverGroup rg,
2771 				  GlobalSignalNumber gsn,
2772 				  Signal* signal,
2773 				  Uint32 length,
2774 				  JobBufferLevel jbuf,
2775 				  SectionHandle* sections,
2776                                   bool noRelease,
2777                                   Uint32 messageSize) {
2778 
2779   Uint32 noSections = sections->m_cnt;
2780   SegmentedSectionPtr * ptr = sections->m_ptr;
2781 
2782   info.m_sectionPtr[0].m_segmented.i = RNIL;
2783   info.m_sectionPtr[1].m_segmented.i = RNIL;
2784   info.m_sectionPtr[2].m_segmented.i = RNIL;
2785 
2786   Uint32 totalSize = 0;
2787   switch(noSections){
2788   case 3:
2789     info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2790     info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2791     totalSize += ptr[2].sz;
2792   case 2:
2793     info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2794     info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2795     totalSize += ptr[1].sz;
2796   case 1:
2797     info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2798     info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2799     totalSize += ptr[0].sz;
2800   }
2801 
2802   if(totalSize <= messageSize + SectionSegment::DataLength){
2803     /**
2804      * Send signal directly
2805      */
2806     if (noRelease)
2807       sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
2808     else
2809       sendSignal(rg, gsn, signal, length, jbuf, sections);
2810 
2811     info.m_status = FragmentSendInfo::SendComplete;
2812     return true;
2813   }
2814 
2815   /**
2816    * Setup info object
2817    */
2818   info.m_status = FragmentSendInfo::SendNotComplete;
2819   info.m_prio = (Uint8)jbuf;
2820   info.m_gsn = gsn;
2821   info.m_fragInfo = 1;
2822   info.m_flags = 0;
2823   info.m_messageSize = messageSize;
2824   info.m_fragmentId = c_fragmentIdCounter++;
2825   info.m_nodeReceiverGroup = rg;
2826   info.m_callback.m_callbackFunction = 0;
2827 
2828   if (noRelease)
2829   {
2830     /* Record info that we are not releasing segments */
2831     info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2832   }
2833   else
2834   {
2835     /**
2836      * Clear sections in caller's handle.  Actual send
2837      * will consume them
2838      */
2839     sections->m_cnt = 0;
2840   }
2841 
2842   /* Store main signal data in a segment for sending later */
2843   Ptr<SectionSegment> tmp;
2844   if(!import(tmp, &signal->theData[0], length))
2845   {
2846     handle_out_of_longsignal_memory(0);
2847     return false;
2848   }
2849   info.m_theDataSection.p = &tmp.p->theData[0];
2850   info.m_theDataSection.sz = length;
2851   tmp.p->theData[length] = tmp.i;
2852 
2853   sendNextSegmentedFragment(signal, info);
2854 
2855   if(c_fragmentIdCounter == 0){
2856     /**
2857      * Fragment id 0 is invalid
2858      */
2859     c_fragmentIdCounter = 1;
2860   }
2861 
2862   return true;
2863 }
2864 
/*
 * lsout(x): trace hook used by the fragment sending code below.
 * With "#if 0" (as here) lsout(x) expands to nothing; switching the
 * condition to 1 makes it expand to x, compiling in the wrapped
 * ndbout_c() trace calls.
 */
#if 0
#define lsout(x) x
#else
#define lsout(x)
#endif
2870 
2871 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)2872 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
2873 					  FragmentSendInfo & info){
2874 
2875   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2876   {
2877     /* Send was cancelled - all dest. nodes have failed
2878      * since send was started
2879      */
2880     if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2881     {
2882       /*
2883        * Free any sections still to be sent
2884        */
2885       SectionHandle handle(this);
2886       for (Uint32 s = 0; s < 3; s++)
2887       {
2888         Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2889         if (sectionI != RNIL)
2890         {
2891           getSection(handle.m_ptr[handle.m_cnt], sectionI);
2892           info.m_sectionPtr[s].m_segmented.i = RNIL;
2893           info.m_sectionPtr[s].m_segmented.p = NULL;
2894           handle.m_cnt++;
2895         }
2896       }
2897 
2898       releaseSections(handle);
2899     }
2900 
2901     /* Free inline signal data storage section */
2902     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2903     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
2904 
2905     info.m_status = FragmentSendInfo::SendComplete;
2906     return;
2907   }
2908 
2909   /**
2910    * Setup main signal data from stored copy
2911    */
2912   const Uint32 sigLen = info.m_theDataSection.sz;
2913   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
2914 
2915   Uint32 sz = 0;
2916   Uint32 maxSz = info.m_messageSize;
2917 
2918   Int32 secNo = 2;
2919   Uint32 secCount = 0;
2920   Uint32 * secNos = &signal->theData[sigLen];
2921 
2922   SectionHandle sections(this);
2923   SegmentedSectionPtr *ptr = sections.m_ptr;
2924 
2925   bool split= false;
2926   Uint32 splitSectionStartI= RNIL;
2927   SectionSegment* splitSectionStartP= NULL;
2928   Uint32 splitSectionLastSegment= RNIL;
2929   Uint32 splitSectionSz= 0;
2930 
2931   enum { Unknown = 0, Full = 1 } loop = Unknown;
2932   for(; secNo >= 0 && secCount < 3; secNo--){
2933     Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
2934     if(ptrI == RNIL)
2935       continue;
2936 
2937     info.m_sectionPtr[secNo].m_segmented.i = RNIL;
2938 
2939     SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
2940     const Uint32 size = ptrP->m_sz;
2941 
2942     ptr[secCount].i = ptrI;
2943     ptr[secCount].p = ptrP;
2944     ptr[secCount].sz = size;
2945     secNos[secCount] = secNo;
2946     secCount++;
2947 
2948     const Uint32 sizeLeft = maxSz - sz;
2949     if(size <= sizeLeft){
2950       /**
2951        * The section fits
2952        */
2953       sz += size;
2954       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
2955       continue;
2956     }
2957 
2958     const Uint32 overflow = size - sizeLeft; // > 0
2959     if(overflow <= SectionSegment::DataLength){
2960       /**
2961        * Only one segment left to send
2962        *   send even if sizeLeft <= size
2963        */
2964       lsout(ndbout_c("section %d saved as %d but full over: %d",
2965 		     secNo, secCount-1, overflow));
2966       secNo--;
2967       break;
2968     }
2969 
2970     // size >= 61
2971     if(sizeLeft < SectionSegment::DataLength){
2972       /**
2973        * Less than one segment left (space)
2974        *   dont bother sending
2975        */
2976       secCount--;
2977       info.m_sectionPtr[secNo].m_segmented.i = ptrI;
2978       loop = Full;
2979       lsout(ndbout_c("section %d not saved", secNo));
2980       break;
2981     }
2982 
2983     /**
2984      * Split list
2985      * 1) Find place to split
2986      * 2) Rewrite header (the part that will be sent)
2987      * 3) Write new header (for remaining part)
2988      * 4) Store new header on FragmentSendInfo - record
2989      */
2990     // size >= 61 && sizeLeft >= 60
2991     Uint32 sum = SectionSegment::DataLength;
2992     Uint32 prevPtrI = ptrI;
2993     ptrI = ptrP->m_nextSegment;
2994     const Uint32 fill = sizeLeft - SectionSegment::DataLength;
2995     while(sum < fill){
2996       prevPtrI = ptrI;
2997       ptrP = g_sectionSegmentPool.getPtr(ptrI);
2998       ptrI = ptrP->m_nextSegment;
2999       sum += SectionSegment::DataLength;
3000     }
3001 
3002     Uint32 prev = secCount - 1;
3003     /**
3004      * Record details of the section pre-split
3005      * This allows the split to be 'healed' afterwards in the
3006      * no release case.
3007      */
3008     split= true;
3009     splitSectionStartI= ptr[prev].i;
3010     splitSectionStartP= ptr[prev].p;
3011     splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3012     splitSectionSz= splitSectionStartP->m_sz;
3013 
3014     /**
3015      * Rewrite header w.r.t size and last
3016      * This is what will be sent in this fragment.
3017      */
3018     splitSectionStartP->m_lastSegment = prevPtrI;
3019     splitSectionStartP->m_sz = sum;
3020     ptr[prev].sz = sum;
3021 
3022     /**
3023      * Write "new" list header
3024      * This is what remains to be sent in this section
3025      */
3026     ptrP = g_sectionSegmentPool.getPtr(ptrI);
3027     ptrP->m_lastSegment = splitSectionLastSegment;
3028     ptrP->m_sz = size - sum;
3029 
3030     /**
3031      * And store it on info-record
3032      */
3033     info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3034     info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3035 
3036     loop = Full;
3037     lsout(ndbout_c("section %d split into %d", secNo, prev));
3038     break;
3039   }
3040 
3041   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3042 		 loop, secNo, secCount, sz));
3043 
3044   /**
3045    * Store fragment id
3046    */
3047   secNos[secCount] = info.m_fragmentId;
3048 
3049   Uint32 fragInfo = info.m_fragInfo;
3050   info.m_fragInfo = 2;
3051   switch(loop){
3052   case Unknown:
3053     if(secNo >= 0){
3054       lsout(ndbout_c("Unknown - Full"));
3055       /**
3056        * Not finished
3057        */
3058       break;
3059     }
3060     // Fall through
3061     lsout(ndbout_c("Unknown - Done"));
3062     info.m_status = FragmentSendInfo::SendComplete;
3063     ndbassert(fragInfo == 2);
3064     fragInfo = 3;
3065   case Full:
3066     break;
3067   }
3068 
3069   signal->header.m_fragmentInfo = fragInfo;
3070   signal->header.m_noOfSections = 0;
3071   sections.m_cnt = secCount;
3072 
3073   if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3074   {
3075     sendSignalNoRelease(info.m_nodeReceiverGroup,
3076                         info.m_gsn,
3077                         signal,
3078                         sigLen + secCount + 1,
3079                         (JobBufferLevel)info.m_prio,
3080                         &sections);
3081     /* NoRelease leaves SectionHandle populated, we'll
3082      * clear it here.  The actual sections themselves
3083      * remain allocated.
3084      */
3085     sections.m_cnt = 0;
3086 
3087     if (split)
3088     {
3089       /* There was a split section, which required us to modify the
3090        * segment list.
3091        * Now restore the split section's segment list back to
3092        * its previous state
3093        * (Only really required for first segment, but we do
3094        *  it for all of them, to be a good citizen)
3095        */
3096       ndbrequire( splitSectionStartI != RNIL );
3097       ndbrequire( splitSectionStartP != NULL );
3098       ndbrequire( splitSectionLastSegment != RNIL );
3099 
3100       splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3101       splitSectionStartP->m_sz= splitSectionSz;
3102 
3103       /* Check our handiwork */
3104       assert(verifySection(splitSectionStartI));
3105     }
3106   }
3107   else
3108   {
3109     /* Normal, release sections case */
3110     sendSignal(info.m_nodeReceiverGroup,
3111                info.m_gsn,
3112                signal,
3113                sigLen + secCount + 1,
3114                (JobBufferLevel)info.m_prio,
3115                &sections);
3116   }
3117 
3118   if(fragInfo == 3){
3119     /**
3120      * This is the last signal
3121      * Release saved 'main signal' words segment
3122      */
3123     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3124   }
3125 }
3126 
3127 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3128 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3129 				  NodeReceiverGroup rg,
3130 				  GlobalSignalNumber gsn,
3131 				  Signal* signal,
3132 				  Uint32 length,
3133 				  JobBufferLevel jbuf,
3134 				  LinearSectionPtr ptr[3],
3135 				  Uint32 noOfSections,
3136 				  Uint32 messageSize){
3137 
3138   check_sections(signal, signal->header.m_noOfSections, noOfSections);
3139 
3140   info.m_sectionPtr[0].m_linear.p = NULL;
3141   info.m_sectionPtr[1].m_linear.p = NULL;
3142   info.m_sectionPtr[2].m_linear.p = NULL;
3143 
3144   Uint32 totalSize = 0;
3145   switch(noOfSections){
3146   case 3:
3147     info.m_sectionPtr[2].m_linear = ptr[2];
3148     totalSize += ptr[2].sz;
3149   case 2:
3150     info.m_sectionPtr[1].m_linear = ptr[1];
3151     totalSize += ptr[1].sz;
3152   case 1:
3153     info.m_sectionPtr[0].m_linear = ptr[0];
3154     totalSize += ptr[0].sz;
3155   }
3156 
3157   if(totalSize <= messageSize + SectionSegment::DataLength){
3158     /**
3159      * Send signal directly
3160      */
3161     sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3162     info.m_status = FragmentSendInfo::SendComplete;
3163 
3164     /**
3165      * Indicate to sendLinearSignalFragment
3166      *   that we'r already done
3167      */
3168     return true;
3169   }
3170 
3171   /**
3172    * Setup info object
3173    */
3174   info.m_status = FragmentSendInfo::SendNotComplete;
3175   info.m_prio = (Uint8)jbuf;
3176   info.m_gsn = gsn;
3177   info.m_messageSize = messageSize;
3178   info.m_fragInfo = 1;
3179   info.m_flags = 0;
3180   info.m_fragmentId = c_fragmentIdCounter++;
3181   info.m_nodeReceiverGroup = rg;
3182   info.m_callback.m_callbackFunction = 0;
3183 
3184   Ptr<SectionSegment> tmp;
3185   if(unlikely(!import(tmp, &signal->theData[0], length)))
3186   {
3187     handle_out_of_longsignal_memory(0);
3188     return false;
3189   }
3190 
3191   info.m_theDataSection.p = &tmp.p->theData[0];
3192   info.m_theDataSection.sz = length;
3193   tmp.p->theData[length] = tmp.i;
3194 
3195   sendNextLinearFragment(signal, info);
3196 
3197   if(c_fragmentIdCounter == 0){
3198     /**
3199      * Fragment id 0 is invalid
3200      */
3201     c_fragmentIdCounter = 1;
3202   }
3203 
3204   return true;
3205 }
3206 
3207 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3208 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3209 				       FragmentSendInfo & info){
3210 
3211   if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3212   {
3213     /* Send was cancelled - all dest. nodes have failed
3214      * since send was started
3215      */
3216     /* Free inline signal data storage section */
3217     Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3218     g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3219 
3220     info.m_status = FragmentSendInfo::SendComplete;
3221     return;
3222   }
3223 
3224   /**
3225    * Store "theData"
3226    */
3227   const Uint32 sigLen = info.m_theDataSection.sz;
3228   memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3229 
3230   Uint32 sz = 0;
3231   Uint32 maxSz = info.m_messageSize;
3232 
3233   Int32 secNo = 2;
3234   Uint32 secCount = 0;
3235   Uint32 * secNos = &signal->theData[sigLen];
3236   LinearSectionPtr signalPtr[3];
3237 
3238   enum { Unknown = 0, Full = 2 } loop = Unknown;
3239   for(; secNo >= 0 && secCount < 3; secNo--){
3240     Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3241     if(ptrP == NULL)
3242       continue;
3243 
3244     info.m_sectionPtr[secNo].m_linear.p = NULL;
3245     const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3246 
3247     signalPtr[secCount].p = ptrP;
3248     signalPtr[secCount].sz = size;
3249     secNos[secCount] = secNo;
3250     secCount++;
3251 
3252     const Uint32 sizeLeft = maxSz - sz;
3253     if(size <= sizeLeft){
3254       /**
3255        * The section fits
3256        */
3257       sz += size;
3258       lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3259       continue;
3260     }
3261 
3262     const Uint32 overflow = size - sizeLeft; // > 0
3263     if(overflow <= SectionSegment::DataLength){
3264       /**
3265        * Only one segment left to send
3266        *   send even if sizeLeft <= size
3267        */
3268       lsout(ndbout_c("section %d saved as %d but full over: %d",
3269 		     secNo, secCount-1, overflow));
3270       secNo--;
3271       break;
3272     }
3273 
3274     // size >= 61
3275     if(sizeLeft < SectionSegment::DataLength){
3276       /**
3277        * Less than one segment left (space)
3278        *   dont bother sending
3279        */
3280       secCount--;
3281       info.m_sectionPtr[secNo].m_linear.p = ptrP;
3282       loop = Full;
3283       lsout(ndbout_c("section %d not saved", secNo));
3284       break;
3285     }
3286 
3287     /**
3288      * Split list
3289      * 1) Find place to split
3290      * 2) Rewrite header (the part that will be sent)
3291      * 3) Write new header (for remaining part)
3292      * 4) Store new header on FragmentSendInfo - record
3293      */
3294     Uint32 sum = sizeLeft;
3295     sum /= SectionSegment::DataLength;
3296     sum *= SectionSegment::DataLength;
3297 
3298     /**
3299      * Rewrite header w.r.t size
3300      */
3301     Uint32 prev = secCount - 1;
3302     signalPtr[prev].sz = sum;
3303 
3304     /**
3305      * Write/store "new" header
3306      */
3307     info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3308     info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3309 
3310     loop = Full;
3311     lsout(ndbout_c("section %d split into %d", secNo, prev));
3312     break;
3313   }
3314 
3315   lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3316 		 loop, secNo, secCount, sz));
3317 
3318   /**
3319    * Store fragment id
3320    */
3321   secNos[secCount] = info.m_fragmentId;
3322 
3323   Uint32 fragInfo = info.m_fragInfo;
3324   info.m_fragInfo = 2;
3325   switch(loop){
3326   case Unknown:
3327     if(secNo >= 0){
3328       lsout(ndbout_c("Unknown - Full"));
3329       /**
3330        * Not finished
3331        */
3332       break;
3333     }
3334     // Fall through
3335     lsout(ndbout_c("Unknown - Done"));
3336     info.m_status = FragmentSendInfo::SendComplete;
3337     ndbassert(fragInfo == 2);
3338     fragInfo = 3;
3339   case Full:
3340     break;
3341   }
3342 
3343   signal->header.m_noOfSections = 0;
3344   signal->header.m_fragmentInfo = fragInfo;
3345 
3346   sendSignal(info.m_nodeReceiverGroup,
3347 	     info.m_gsn,
3348 	     signal,
3349 	     sigLen + secCount + 1,
3350 	     (JobBufferLevel)info.m_prio,
3351 	     signalPtr,
3352 	     secCount);
3353 
3354   if(fragInfo == 3){
3355     /**
3356      * This is the last signal
3357      */
3358     g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3359   }
3360 }
3361 
3362 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3363 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3364 				     GlobalSignalNumber gsn,
3365 				     Signal* signal,
3366 				     Uint32 length,
3367 				     JobBufferLevel jbuf,
3368 				     SectionHandle* sections,
3369 				     Callback & c,
3370 				     Uint32 messageSize){
3371   bool res = true;
3372   Ptr<FragmentSendInfo> tmp;
3373   res = c_segmentedFragmentSendList.seize(tmp);
3374   ndbrequire(res);
3375 
3376   res = sendFirstFragment(* tmp.p,
3377 			  NodeReceiverGroup(ref),
3378 			  gsn,
3379 			  signal,
3380 			  length,
3381 			  jbuf,
3382 			  sections,
3383                           false, // Release sections on send
3384 			  messageSize);
3385   ndbrequire(res);
3386 
3387   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3388     c_segmentedFragmentSendList.release(tmp);
3389     if(c.m_callbackFunction != 0)
3390       execute(signal, c, 0);
3391     return;
3392   }
3393   tmp.p->m_callback = c;
3394 
3395   if(!c_fragSenderRunning)
3396   {
3397     SaveSignal<2> save(signal);
3398     c_fragSenderRunning = true;
3399     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3400     sig->type = ContinueFragmented::CONTINUE_SENDING;
3401     sig->line = __LINE__;
3402     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3403   }
3404 }
3405 
3406 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3407 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3408 				     GlobalSignalNumber gsn,
3409 				     Signal* signal,
3410 				     Uint32 length,
3411 				     JobBufferLevel jbuf,
3412 				     SectionHandle * sections,
3413 				     Callback & c,
3414 				     Uint32 messageSize){
3415   bool res = true;
3416   Ptr<FragmentSendInfo> tmp;
3417   res = c_segmentedFragmentSendList.seize(tmp);
3418   ndbrequire(res);
3419 
3420   res = sendFirstFragment(* tmp.p,
3421 			  rg,
3422 			  gsn,
3423 			  signal,
3424 			  length,
3425 			  jbuf,
3426 			  sections,
3427 			  false, // Release sections on send
3428                           messageSize);
3429   ndbrequire(res);
3430 
3431   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3432     c_segmentedFragmentSendList.release(tmp);
3433     if(c.m_callbackFunction != 0)
3434       execute(signal, c, 0);
3435     return;
3436   }
3437   tmp.p->m_callback = c;
3438 
3439   if(!c_fragSenderRunning)
3440   {
3441     SaveSignal<2> save(signal);
3442     c_fragSenderRunning = true;
3443     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3444     sig->type = ContinueFragmented::CONTINUE_SENDING;
3445     sig->line = __LINE__;
3446     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3447   }
3448 }
3449 
3450 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3451 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3452 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3453 { abort(); /* should never be called */ }
3454 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3455 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3456 
3457 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3458 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3459 				     GlobalSignalNumber gsn,
3460 				     Signal* signal,
3461 				     Uint32 length,
3462 				     JobBufferLevel jbuf,
3463 				     LinearSectionPtr ptr[3],
3464 				     Uint32 noOfSections,
3465 				     Callback & c,
3466 				     Uint32 messageSize){
3467   bool res = true;
3468   Ptr<FragmentSendInfo> tmp;
3469   res = c_linearFragmentSendList.seize(tmp);
3470   ndbrequire(res);
3471 
3472   res = sendFirstFragment(* tmp.p,
3473 			  NodeReceiverGroup(ref),
3474 			  gsn,
3475 			  signal,
3476 			  length,
3477 			  jbuf,
3478 			  ptr,
3479 			  noOfSections,
3480 			  messageSize);
3481   ndbrequire(res);
3482 
3483   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3484     c_linearFragmentSendList.release(tmp);
3485     if(c.m_callbackFunction != 0)
3486       execute(signal, c, 0);
3487     return;
3488   }
3489   tmp.p->m_callback = c;
3490 
3491   if(!c_fragSenderRunning)
3492   {
3493     SaveSignal<2> save(signal);
3494     c_fragSenderRunning = true;
3495     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3496     sig->type = ContinueFragmented::CONTINUE_SENDING;
3497     sig->line = __LINE__;
3498     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3499   }
3500 }
3501 
3502 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3503 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3504 				     GlobalSignalNumber gsn,
3505 				     Signal* signal,
3506 				     Uint32 length,
3507 				     JobBufferLevel jbuf,
3508 				     LinearSectionPtr ptr[3],
3509 				     Uint32 noOfSections,
3510 				     Callback & c,
3511 				     Uint32 messageSize){
3512   bool res = true;
3513   Ptr<FragmentSendInfo> tmp;
3514   res = c_linearFragmentSendList.seize(tmp);
3515   ndbrequire(res);
3516 
3517   res = sendFirstFragment(* tmp.p,
3518 			  rg,
3519 			  gsn,
3520 			  signal,
3521 			  length,
3522 			  jbuf,
3523 			  ptr,
3524 			  noOfSections,
3525 			  messageSize);
3526   ndbrequire(res);
3527 
3528   if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3529     c_linearFragmentSendList.release(tmp);
3530     if(c.m_callbackFunction != 0)
3531       execute(signal, c, 0);
3532     return;
3533   }
3534   tmp.p->m_callback = c;
3535 
3536   if(!c_fragSenderRunning)
3537   {
3538     SaveSignal<2> save(signal);
3539     c_fragSenderRunning = true;
3540     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3541     sig->type = ContinueFragmented::CONTINUE_SENDING;
3542     sig->line = __LINE__;
3543     sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3544   }
3545 }
3546 
3547 NodeInfo &
setNodeInfo(NodeId nodeId)3548 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3549   ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3550   return globalData.m_nodeInfo[nodeId];
3551 }
3552 
3553 bool
isMultiThreaded()3554 SimulatedBlock::isMultiThreaded()
3555 {
3556 #ifdef NDBD_MULTITHREADED
3557   return true;
3558 #else
3559   return false;
3560 #endif
3561 }
3562 
3563 
3564 void
execUTIL_CREATE_LOCK_REF(Signal * signal)3565 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
3566   ljamEntry();
3567   c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3568 }
3569 
execUTIL_CREATE_LOCK_CONF(Signal * signal)3570 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
3571   ljamEntry();
3572   c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3573 }
3574 
execUTIL_DESTORY_LOCK_REF(Signal * signal)3575 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
3576   ljamEntry();
3577   c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3578 }
3579 
execUTIL_DESTORY_LOCK_CONF(Signal * signal)3580 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
3581   ljamEntry();
3582   c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3583 }
3584 
execUTIL_LOCK_REF(Signal * signal)3585 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
3586   ljamEntry();
3587   c_mutexMgr.execUTIL_LOCK_REF(signal);
3588 }
3589 
execUTIL_LOCK_CONF(Signal * signal)3590 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
3591   ljamEntry();
3592   c_mutexMgr.execUTIL_LOCK_CONF(signal);
3593 }
3594 
execUTIL_UNLOCK_REF(Signal * signal)3595 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
3596   ljamEntry();
3597   c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3598 }
3599 
execUTIL_UNLOCK_CONF(Signal * signal)3600 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
3601   ljamEntry();
3602   c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3603 }
3604 
3605 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)3606 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
3607 					  Uint32 ptrI, Uint32 retVal){
3608   c_mutexMgr.release(ptrI);
3609 }
3610 
3611 void
fsRefError(Signal * signal,Uint32 line,const char * msg)3612 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
3613 {
3614   const FsRef *fsRef = (FsRef*)signal->getDataPtr();
3615   Uint32 errorCode = fsRef->errorCode;
3616   Uint32 osErrorCode = fsRef->osErrorCode;
3617   char msg2[100];
3618 
3619   sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3620 
3621   progError(line, errorCode, msg2);
3622 }
3623 
3624 void
execFSWRITEREF(Signal * signal)3625 SimulatedBlock::execFSWRITEREF(Signal* signal)
3626 {
3627   fsRefError(signal, __LINE__, "File system write failed");
3628 }
3629 
3630 void
execFSREADREF(Signal * signal)3631 SimulatedBlock::execFSREADREF(Signal* signal)
3632 {
3633   fsRefError(signal, __LINE__, "File system read failed");
3634 }
3635 
3636 void
execFSCLOSEREF(Signal * signal)3637 SimulatedBlock::execFSCLOSEREF(Signal* signal)
3638 {
3639   fsRefError(signal, __LINE__, "File system close failed");
3640 }
3641 
3642 void
execFSOPENREF(Signal * signal)3643 SimulatedBlock::execFSOPENREF(Signal* signal)
3644 {
3645   fsRefError(signal, __LINE__, "File system open failed");
3646 }
3647 
3648 void
execFSREMOVEREF(Signal * signal)3649 SimulatedBlock::execFSREMOVEREF(Signal* signal)
3650 {
3651   fsRefError(signal, __LINE__, "File system remove failed");
3652 }
3653 
3654 void
execFSSYNCREF(Signal * signal)3655 SimulatedBlock::execFSSYNCREF(Signal* signal)
3656 {
3657   fsRefError(signal, __LINE__, "File system sync failed");
3658 }
3659 
3660 void
execFSAPPENDREF(Signal * signal)3661 SimulatedBlock::execFSAPPENDREF(Signal* signal)
3662 {
3663   fsRefError(signal, __LINE__, "File system append failed");
3664 }
3665 
3666 #ifdef VM_TRACE
3667 static Ptr<void> * m_empty_global_variables[] = { 0 };
3668 void
disable_global_variables()3669 SimulatedBlock::disable_global_variables()
3670 {
3671   m_global_variables_save = m_global_variables;
3672   m_global_variables = m_empty_global_variables;
3673 }
3674 
3675 void
enable_global_variables()3676 SimulatedBlock::enable_global_variables()
3677 {
3678   if (m_global_variables == m_empty_global_variables)
3679   {
3680     m_global_variables = m_global_variables_save;
3681   }
3682 }
3683 
3684 void
clear_global_variables()3685 SimulatedBlock::clear_global_variables(){
3686   Ptr<void> ** tmp = m_global_variables;
3687   while(* tmp != 0){
3688     (* tmp)->i = RNIL;
3689     (* tmp)->p = 0;
3690     tmp++;
3691   }
3692 }
3693 
3694 void
init_globals_list(void ** tmp,size_t cnt)3695 SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
3696   m_global_variables = new Ptr<void> * [cnt+1];
3697   for(size_t i = 0; i<cnt; i++){
3698     m_global_variables[i] = (Ptr<void>*)tmp[i];
3699   }
3700   m_global_variables[cnt] = 0;
3701 }
3702 
3703 #endif
3704 
3705 #include "KeyDescriptor.hpp"
3706 
3707 Uint32
xfrm_key(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3708 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
3709 			 Uint32 *dst, Uint32 dstSize,
3710 			 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3711 {
3712   const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3713   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3714 
3715   Uint32 i = 0;
3716   Uint32 srcPos = 0;
3717   Uint32 dstPos = 0;
3718   while (i < noOfKeyAttr)
3719   {
3720     const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3721     Uint32 dstWords =
3722       xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3723                 src, srcPos, dst, dstPos, dstSize);
3724     keyPartLen[i++] = dstWords;
3725     if (unlikely(dstWords == 0))
3726       return 0;
3727   }
3728 
3729   if (0)
3730   {
3731     for(Uint32 i = 0; i<dstPos; i++)
3732     {
3733       printf("%.8x ", dst[i]);
3734     }
3735     printf("\n");
3736   }
3737   return dstPos;
3738 }
3739 
3740 Uint32
xfrm_attr(Uint32 attrDesc,CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const3741 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
3742                           const Uint32* src, Uint32 & srcPos,
3743                           Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
3744 {
3745   Uint32 array =
3746     AttributeDescriptor::getArrayType(attrDesc);
3747   Uint32 srcBytes =
3748     AttributeDescriptor::getSizeInBytes(attrDesc);
3749 
3750   Uint32 srcWords = ~0;
3751   Uint32 dstWords = ~0;
3752   uchar* dstPtr = (uchar*)&dst[dstPos];
3753   const uchar* srcPtr = (const uchar*)&src[srcPos];
3754 
3755   if (cs == NULL)
3756   {
3757     jam();
3758     Uint32 len;
3759     LINT_INIT(len);
3760     switch(array){
3761     case NDB_ARRAYTYPE_SHORT_VAR:
3762       len = 1 + srcPtr[0];
3763       break;
3764     case NDB_ARRAYTYPE_MEDIUM_VAR:
3765       len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3766       break;
3767 #ifndef VM_TRACE
3768     default:
3769       abort();
3770 #endif
3771     case NDB_ARRAYTYPE_FIXED:
3772       len = srcBytes;
3773     }
3774     srcWords = (len + 3) >> 2;
3775     dstWords = srcWords;
3776     memcpy(dstPtr, srcPtr, dstWords << 2);
3777 
3778     if (0)
3779     {
3780       ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3781                srcPos, dstPos, len, srcWords, dstWords);
3782 
3783       for(Uint32 i = 0; i<srcWords; i++)
3784         printf("%.8x ", src[srcPos + i]);
3785       printf("\n");
3786     }
3787   }
3788   else
3789   {
3790     jam();
3791     Uint32 typeId =
3792       AttributeDescriptor::getType(attrDesc);
3793     Uint32 lb, len;
3794     bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
3795     if (unlikely(!ok))
3796       return 0;
3797     Uint32 xmul = cs->strxfrm_multiply;
3798     if (xmul == 0)
3799       xmul = 1;
3800     /*
3801      * Varchar end-spaces are ignored in comparisons.  To get same hash
3802      * we blank-pad to maximum length via strnxfrm.
3803      */
3804     Uint32 dstLen = xmul * (srcBytes - lb);
3805     ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3806     int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
3807     if (unlikely(n == -1))
3808       return 0;
3809     while ((n & 3) != 0)
3810     {
3811       dstPtr[n++] = 0;
3812     }
3813     dstWords = (n >> 2);
3814     srcWords = (lb + len + 3) >> 2;
3815   }
3816 
3817   dstPos += dstWords;
3818   srcPos += srcWords;
3819   return dstWords;
3820 }
3821 
3822 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3823 SimulatedBlock::create_distr_key(Uint32 tableId,
3824 				 const Uint32 *src,
3825                                  Uint32* dst,
3826 				 const Uint32
3827 				 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3828 {
3829   const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3830   const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3831   Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3832 
3833   Uint32 i = 0;
3834   Uint32 dstPos = 0;
3835 
3836   /* --Note that src and dst may be the same location-- */
3837 
3838   if(keyPartLen)
3839   {
3840     while (i < noOfKeyAttr && noOfDistrKeys)
3841     {
3842       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3843       Uint32 len = keyPartLen[i];
3844       if(AttributeDescriptor::getDKey(attr))
3845       {
3846 	noOfDistrKeys--;
3847 	memmove(dst+dstPos, src, len << 2);
3848 	dstPos += len;
3849       }
3850       src += len;
3851       i++;
3852     }
3853   }
3854   else
3855   {
3856     while (i < noOfKeyAttr && noOfDistrKeys)
3857     {
3858       Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3859       Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3860       ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3861       if(AttributeDescriptor::getDKey(attr))
3862       {
3863 	noOfDistrKeys--;
3864 	memmove(dst+dstPos, src, len << 2);
3865 	dstPos += len;
3866       }
3867       src += len;
3868       i++;
3869     }
3870   }
3871   return dstPos;
3872 }
3873 
/*
 * Definition of the global pool of key descriptors.
 * create_distr_key() fetches a table's KeyDescriptor from it using the
 * table id as index.
 */
CArray<KeyDescriptor> g_key_descriptor_pool;
3875 
/**
 * Send a signal to its destination(s) by way of a chain of intermediate
 * blocks.
 *
 * The payload currently in signal->theData is wrapped in a
 * GSN_LOCAL_ROUTE_ORD, which is sent to path[0].  The remaining hops and
 * the final destinations travel inside the LocalRouteOrd.  All hops after
 * the first and all destinations are required to have node id 0 or our
 * own node id.
 *
 * @param path       hops to pass; path[0] receives the order from here
 * @param pathcnt    number of entries in path, must be > 0
 * @param dst        references of the final destination(s)
 * @param dstcnt     number of entries in dst
 * @param gsn        signal number, stored in the order
 * @param signal     signal object holding the payload in theData
 * @param sigLen     length of the payload in words
 * @param prio       priority, stored in the order
 * @param userhandle optional sections to send along; they are taken over
 *                   by this call and userhandle->m_cnt is set to 0
 */
void
SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
                                 Uint32 dst[],
                                 Uint32 dstcnt,
                                 Uint32 gsn,
                                 Signal * signal,
                                 Uint32 sigLen,
                                 JobBufferLevel prio,
                                 SectionHandle * userhandle)
{
  ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
  pathcnt--; // first hop is made from here


  // Static part + (ref, prio) for every remaining hop + one word per
  // destination
  Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
  ndbrequire(len <= 25);

  SectionHandle handle(this, signal);
  if (userhandle)
  {
    ljam();
    // Take over the caller's sections
    handle.m_cnt = userhandle->m_cnt;
    for (Uint32 i = 0; i<handle.m_cnt; i++)
      handle.m_ptr[i] = userhandle->m_ptr[i];
    userhandle->m_cnt = 0;
  }

  if (len + sigLen > 25)
  {
    ljam();

    /**
     * we need to store theData in a section
     *
     * The existing sections are shifted up one slot and the payload
     * becomes the new section 0, hence at most 2 sections may be in use.
     */
    ndbrequire(handle.m_cnt < 3);
    handle.m_ptr[2] = handle.m_ptr[1];
    handle.m_ptr[1] = handle.m_ptr[0];
    Ptr<SectionSegment> tmp;
    if (unlikely(! import(tmp, signal->theData, sigLen)))
    {
      handle_out_of_longsignal_memory(0);
    }
    handle.m_ptr[0].p = tmp.p;
    handle.m_ptr[0].i = tmp.i;
    handle.m_ptr[0].sz = sigLen;
    handle.m_cnt ++;
  }
  else
  {
    ljam();
    // Order and payload fit together: move the payload up so that the
    // order can be written in front of it
    memmove(signal->theData + len, signal->theData, 4 * sigLen);
    len += sigLen;
  }

  LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
  // Remaining hop count in the upper 16 bits, destination count in the
  // lower 16 bits
  ord->cnt = (pathcnt << 16) | (dstcnt);
  ord->gsn = gsn;
  ord->prio = Uint32(prio);

  // The hops after path[0] come first, two words each...
  Uint32 * dstptr = ord->path;
  for (Uint32 i = 1; i <= pathcnt; i++)
  {
    ndbrequire(refToNode(path[i].ref) == 0 ||
               refToNode(path[i].ref) == getOwnNodeId());

    * dstptr++ = path[i].ref;
    * dstptr++ = Uint32(path[i].prio);
  }

  // ...followed by the final destinations, one word each
  for (Uint32 i = 0; i<dstcnt; i++)
  {
    ndbrequire(refToNode(dst[i]) == 0 ||
               refToNode(dst[i]) == getOwnNodeId());

    * dstptr++ = dst[i];
  }

  sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
             path[0].prio, &handle);
}
3956 
/**
 * Handle one hop of a routed signal, see sendRoutedSignal().
 *
 * If the order still contains hops (pathcnt > 0), the first (ref, prio)
 * pair is removed from the order and the order is forwarded to that ref.
 * Otherwise the wrapped signal is rebuilt in signal->theData and sent,
 * using the gsn and prio stored in the order, to each listed destination.
 */
void
SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
{
  ljamEntry();

  if (!assembleFragments(signal))
  {
    ljam();
    return;
  }

  if (ERROR_INSERTED(1001))
  {
    /**
     * This NDBCNTR error code 1001
     *
     * Resend the order to ourselves with a delay of 200
     */
    ljam();
    SectionHandle handle(this, signal);
    sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
                        signal->getLength(), &handle);
    return;
  }

  LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
  // cnt packs the hop count (upper 16 bits) and the destination count
  // (lower 16 bits)
  Uint32 pathcnt = ord->cnt >> 16;
  Uint32 dstcnt = ord->cnt & 0xFFFF;
  Uint32 sigLen = signal->getLength();

  if (pathcnt == 0)
  {
    /**
     * Send to final destination(s);
     */
    ljam();
    Uint32 gsn = ord->gsn;
    Uint32 prio = ord->prio;
    // Save the destination list from word 25 onwards, the start of
    // theData is about to be overwritten with the payload
    memcpy(signal->theData+25, ord->path, 4*dstcnt);
    SectionHandle handle(this, signal);
    if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
    {
      ljam();
      /**
       * Data is at end of this...
       *
       * Move it down to the start of theData
       */
      memmove(signal->theData,
              signal->theData + LocalRouteOrd::StaticLen + dstcnt,
              4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
      sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
    }
    else
    {
      ljam();
      /**
       * Put section 0 in signal->theData
       *
       * NOTE(review): this branch relies on the sender having stored the
       * payload in section 0; handle.m_cnt is not checked to be > 0
       * before section 0 is used and before "handle.m_cnt - 1" is
       * computed - confirm that an order without inline payload and
       * without sections cannot arrive here.
       */
      sigLen = handle.m_ptr[0].sz;
      ndbrequire(sigLen <= 25);
      copy(signal->theData, handle.m_ptr[0]);
      release(handle.m_ptr[0]);

      // Shift the remaining sections down one slot
      for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
        handle.m_ptr[i] = handle.m_ptr[i+1];
      handle.m_cnt--;
    }

    /*
     * The extra if-statement is as sendSignalNoRelease will copy sections
     *   which is not necessary if only sending to one destination
     */
    if (dstcnt > 1)
    {
      jam();
      for (Uint32 i = 0; i<dstcnt; i++)
      {
        ljam();
        sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
                            JobBufferLevel(prio), &handle);
      }
      releaseSections(handle);
    }
    else
    {
      jam();
      sendSignal(signal->theData[25+0], gsn, signal, sigLen,
                 JobBufferLevel(prio), &handle);
    }
  }
  else
  {
    /**
     * Reroute
     *
     * The first (ref, prio) pair of the path names the next hop; drop
     * these two words from the order and pass the rest on.
     */
    ljam();
    SectionHandle handle(this, signal);
    Uint32 ref = ord->path[0];
    Uint32 prio = ord->path[1];
    Uint32 len = sigLen - 2;
    ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
    memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
    sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
               JobBufferLevel(prio), &handle);
  }
}
4060 
4061 
4062 #ifdef VM_TRACE
4063 bool
debugOutOn()4064 SimulatedBlock::debugOutOn()
4065 {
4066   SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4067   return
4068     globalData.testOn &&
4069     globalSignalLoggers.logMatch(number(), mask);
4070 }
4071 
4072 const char*
debugOutTag(char * buf,int line)4073 SimulatedBlock::debugOutTag(char *buf, int line)
4074 {
4075   char blockbuf[40];
4076   char instancebuf[40];
4077   char linebuf[40];
4078   char timebuf[40];
4079   sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4080   instancebuf[0] = 0;
4081   if (instance() != 0)
4082     sprintf(instancebuf, "/%u", instance());
4083   sprintf(linebuf, " %d", line);
4084   timebuf[0] = 0;
4085 #ifdef VM_TRACE_TIME
4086   {
4087     NDB_TICKS t = NdbTick_CurrentMillisecond();
4088     uint s = (t / 1000) % 3600;
4089     uint ms = t % 1000;
4090     sprintf(timebuf, " - %u.%03u -", s, ms);
4091   }
4092 #endif
4093   sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4094   return buf;
4095 }
4096 #endif
4097 
4098 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4099 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4100                                                const Uint32 blocks[],
4101                                                const Callback & cb,
4102                                                JobBufferLevel prio)
4103 {
4104 #ifndef NDBD_MULTITHREADED
4105   Callback copy = cb;
4106   execute(signal, copy, 0);
4107 #else
4108   ljam();
4109   Uint32 ref[32]; // max threads
4110   Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4111                                                    ref, NDB_ARRAY_SIZE(ref));
4112   if (cnt == 0)
4113   {
4114     ljam();
4115     Callback copy = cb;
4116     execute(signal, copy, 0);
4117     return;
4118   }
4119 
4120   Ptr<SyncThreadRecord> ptr;
4121   ndbrequire(c_syncThreadPool.seize(ptr));
4122   ptr.p->m_cnt = cnt;
4123   ptr.p->m_callback = cb;
4124 
4125   signal->theData[0] = reference();
4126   signal->theData[1] = ptr.i;
4127   signal->theData[2] = Uint32(prio);
4128   for (Uint32 i = 0; i<cnt; i++)
4129   {
4130     sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4131   }
4132 #endif
4133 }
4134 
4135 void
execSYNC_THREAD_REQ(Signal * signal)4136 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4137 {
4138   ljamEntry();
4139   Uint32 ref = signal->theData[0];
4140   Uint32 prio = signal->theData[2];
4141   sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4142              JobBufferLevel(prio));
4143 }
4144 
4145 void
execSYNC_THREAD_CONF(Signal * signal)4146 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4147 {
4148   ljamEntry();
4149   Ptr<SyncThreadRecord> ptr;
4150   c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4151   if (ptr.p->m_cnt == 1)
4152   {
4153     ljam();
4154     Callback copy = ptr.p->m_callback;
4155     c_syncThreadPool.release(ptr);
4156     execute(signal, copy, 0);
4157     return;
4158   }
4159   ptr.p->m_cnt --;
4160 }
4161 
4162 void
execSYNC_REQ(Signal * signal)4163 SimulatedBlock::execSYNC_REQ(Signal* signal)
4164 {
4165   ljamEntry();
4166   Uint32 ref = signal->theData[0];
4167   Uint32 prio = signal->theData[2];
4168   sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4169              JobBufferLevel(prio));
4170 }
4171 
4172 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4173 SimulatedBlock::synchronize_path(Signal * signal,
4174                                  const Uint32 blocks[],
4175                                  const Callback & cb,
4176                                  JobBufferLevel prio)
4177 {
4178   ljam();
4179 
4180   // reuse SyncThreadRecord
4181   Ptr<SyncThreadRecord> ptr;
4182   ndbrequire(c_syncThreadPool.seize(ptr));
4183   ptr.p->m_cnt = 0; // with count of 0
4184   ptr.p->m_callback = cb;
4185 
4186   SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4187   req->senderData = ptr.i;
4188   req->prio = Uint32(prio);
4189   req->count = 1;
4190   if (blocks[0] == 0)
4191   {
4192     ljam();
4193     ndbrequire(false); // TODO
4194   }
4195   else
4196   {
4197     ljam();
4198     Uint32 len = 0;
4199     for (; blocks[len+1] != 0; len++)
4200     {
4201       req->path[len] = blocks[len+1];
4202     }
4203     req->pathlen = 1 + len;
4204     req->path[len] = reference();
4205     sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4206                GSN_SYNC_PATH_REQ, signal,
4207                SyncPathReq::SignalLength + (1 + len), prio);
4208   }
4209 }
4210 
4211 void
execSYNC_PATH_REQ(Signal * signal)4212 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
4213 {
4214   ljamEntry();
4215   SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4216   if (req->pathlen == 1)
4217   {
4218     ljam();
4219     SyncPathReq copy = *req;
4220     SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
4221     conf->senderData = copy.senderData;
4222     conf->count = copy.count;
4223     sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4224                SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4225   }
4226   else
4227   {
4228     ljam();
4229     Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4230     req->pathlen--;
4231     memmove(req->path, req->path + 1, 4 * req->pathlen);
4232     sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4233                SyncPathReq::SignalLength + (1 + req->pathlen),
4234                JobBufferLevel(req->prio));
4235   }
4236 }
4237 
4238 void
execSYNC_PATH_CONF(Signal * signal)4239 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
4240 {
4241   ljamEntry();
4242   SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
4243   Ptr<SyncThreadRecord> ptr;
4244 
4245   c_syncThreadPool.getPtr(ptr, conf.senderData);
4246 
4247   if (ptr.p->m_cnt == 0)
4248   {
4249     ljam();
4250     ptr.p->m_cnt = conf.count;
4251   }
4252 
4253   if (ptr.p->m_cnt == 1)
4254   {
4255     ljam();
4256     Callback copy = ptr.p->m_callback;
4257     c_syncThreadPool.release(ptr);
4258     execute(signal, copy, 0);
4259     return;
4260   }
4261 
4262   ptr.p->m_cnt --;
4263 }
4264 
4265 
4266 bool
checkNodeFailSequence(Signal * signal)4267 SimulatedBlock::checkNodeFailSequence(Signal* signal)
4268 {
4269   Uint32 ref = signal->getSendersBlockRef();
4270 
4271   /**
4272    * Make sure that a signal being part of node-failure handling
4273    *   from a remote node, does not get to us before we got the NODE_FAILREP
4274    *   (this to avoid tricky state handling)
4275    *
4276    * To ensure this, we send the signal via QMGR (GSN_COMMIT_FAILREQ)
4277    *   and NDBCNTR (which sends NODE_FAILREP)
4278    *
4279    * The extra time should be negilable
4280    *
4281    * Note, make an exception for signals sent by our self
4282    *       as they are only sent as a consequence of NODE_FAILREP
4283    */
4284   if (ref == reference() ||
4285       (refToNode(ref) == getOwnNodeId() &&
4286        refToMain(ref) == NDBCNTR))
4287   {
4288     ljam();
4289     return true;
4290   }
4291 
4292   RoutePath path[2];
4293   path[0].ref = QMGR_REF;
4294   path[0].prio = JBB;
4295   path[1].ref = NDBCNTR_REF;
4296   path[1].prio = JBB;
4297 
4298   Uint32 dst[1];
4299   dst[0] = reference();
4300 
4301   SectionHandle handle(this, signal);
4302   Uint32 gsn = signal->header.theVerId_signalNumber;
4303   Uint32 len = signal->getLength();
4304 
4305   sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle);
4306   return false;
4307 }
4308 
4309 void
setup_wakeup()4310 SimulatedBlock::setup_wakeup()
4311 {
4312 #ifdef NDBD_MULTITHREADED
4313 #else
4314   globalTransporterRegistry.setup_wakeup_socket();
4315 #endif
4316 }
4317 
4318 void
wakeup()4319 SimulatedBlock::wakeup()
4320 {
4321 #ifdef NDBD_MULTITHREADED
4322   mt_wakeup(this);
4323 #else
4324   globalTransporterRegistry.wakeup();
4325 #endif
4326 }
4327 
4328 
4329 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const4330 SimulatedBlock::ndbinfo_send_row(Signal* signal,
4331                                  const DbinfoScanReq& req,
4332                                  const Ndbinfo::Row& row,
4333                                  Ndbinfo::Ratelimit& rl) const
4334 {
4335   // Check correct number of columns against table
4336   assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4337 
4338   TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
4339   tidai->connectPtr= req.resultData;
4340   tidai->transId[0]= req.transId[0];
4341   tidai->transId[1]= req.transId[1];
4342 
4343   LinearSectionPtr ptr[3];
4344   ptr[0].p = row.getDataPtr();
4345   ptr[0].sz = row.getLength();
4346 
4347   rl.rows++;
4348   rl.bytes += row.getLength();
4349 
4350   sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4351              signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4352 }
4353 
4354 
4355 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const4356 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
4357                                        DbinfoScanReq& req,
4358                                        const Ndbinfo::Ratelimit& rl,
4359                                        Uint32 data1, Uint32 data2,
4360                                        Uint32 data3, Uint32 data4) const
4361 {
4362   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4363   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4364   MEMCOPY_NO_WORDS(conf, &req, signal_length);
4365 
4366   conf->returnedRows = rl.rows;
4367 
4368   // Update the cursor with current item number
4369   Ndbinfo::ScanCursor* cursor =
4370     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4371 
4372   cursor->data[0] = data1;
4373   cursor->data[1] = data2;
4374   cursor->data[2] = data3;
4375   cursor->data[3] = data4;
4376 
4377   // Increase number of rows and bytes sent to far
4378   cursor->totalRows += rl.rows;
4379   cursor->totalBytes += rl.bytes;
4380 
4381   Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
4382 
4383   sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4384              signal_length, JBB);
4385 }
4386 
4387 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const4388 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
4389                                        DbinfoScanReq& req,
4390                                        const Ndbinfo::Ratelimit& rl) const
4391 {
4392   DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4393   const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4394   Uint32 sender_ref = req.resultRef;
4395   MEMCOPY_NO_WORDS(conf, &req, signal_length);
4396 
4397   conf->returnedRows = rl.rows;
4398 
4399   if (req.cursor_sz)
4400   {
4401     jam();
4402     // Update the cursor with current item number
4403     Ndbinfo::ScanCursor* cursor =
4404       (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4405 
4406     // Reset all data holders
4407     memset(cursor->data, 0, sizeof(cursor->data));
4408 
4409     // Increase number of rows and bytes sent to far
4410     cursor->totalRows += rl.rows;
4411     cursor->totalBytes += rl.bytes;
4412 
4413     Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
4414 
4415     sender_ref = cursor->senderRef;
4416 
4417   }
4418   sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4419              signal_length, JBB);
4420 }
4421 
4422 #ifdef VM_TRACE
4423 void
assertOwnThread()4424 SimulatedBlock::assertOwnThread()
4425 {
4426 #ifdef NDBD_MULTITHREADED
4427   mt_assert_own_thread(this);
4428 #endif
4429 }
4430 
4431 #endif
4432