1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 /*
26 This file is used to build both the multithreaded and the singlethreaded
27 ndbd. It is built twice, included from either SimulatedBlock_mt.cpp (with
28 the macro NDBD_MULTITHREADED defined) or SimulatedBlock_nonmt.cpp (with the
29 macro not defined).
30 */
31
32 #include <ndb_global.h>
33
34 #include "SimulatedBlock.hpp"
35 #include <NdbOut.hpp>
36 #include <OutputStream.hpp>
37 #include <GlobalData.hpp>
38 #include <Emulator.hpp>
39 #include <WatchDog.hpp>
40 #include <ErrorHandlingMacros.hpp>
41 #include <TimeQueue.hpp>
42 #include <TransporterRegistry.hpp>
43 #include <SignalLoggerManager.hpp>
44 #include <FastScheduler.hpp>
45 #include "ndbd_malloc.hpp"
46 #include <signaldata/EventReport.hpp>
47 #include <signaldata/ContinueFragmented.hpp>
48 #include <signaldata/NodeStateSignalData.hpp>
49 #include <signaldata/FsRef.hpp>
50 #include <signaldata/SignalDroppedRep.hpp>
51 #include <signaldata/LocalRouteOrd.hpp>
52 #include <signaldata/TransIdAI.hpp>
53 #include <signaldata/Sync.hpp>
54 #include <DebuggerNames.hpp>
55 #include "LongSignal.hpp"
56
57 #include <Properties.hpp>
58 #include "Configuration.hpp"
59 #include <AttributeDescriptor.hpp>
60 #include <NdbSqlUtil.hpp>
61
62 #include "../blocks/dbdih/Dbdih.hpp"
63 #include <signaldata/CallbackSignal.hpp>
64 #include "LongSignalImpl.hpp"
65
66 #include <EventLogger.hpp>
67 extern EventLogger * g_eventLogger;
68
69 #define ljamEntry() jamEntryLine(30000 + __LINE__)
70 #define ljam() jamLine(30000 + __LINE__)
71
72 //
73 // Constructor, Destructor
74 //
SimulatedBlock(BlockNumber blockNumber,struct Block_context & ctx,Uint32 instanceNumber)75 SimulatedBlock::SimulatedBlock(BlockNumber blockNumber,
76 struct Block_context & ctx,
77 Uint32 instanceNumber)
78 : theNodeId(globalData.ownId),
79 theNumber(blockNumber),
80 theInstance(instanceNumber),
81 theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
82 theInstanceList(0),
83 theMainInstance(0),
84 m_ctx(ctx),
85 m_global_page_pool(globalData.m_global_page_pool),
86 m_shared_page_pool(globalData.m_shared_page_pool),
87 c_fragmentInfoHash(c_fragmentInfoPool),
88 c_linearFragmentSendList(c_fragmentSendPool),
89 c_segmentedFragmentSendList(c_fragmentSendPool),
90 c_mutexMgr(* this),
91 c_counterMgr(* this)
92 #ifdef VM_TRACE
93 ,debugOut(*new NdbOut(*new FileOutputStream(globalSignalLoggers.getOutputStream())))
94 #endif
95 #ifdef VM_TRACE_TIME
96 ,m_currentGsn(0)
97 #endif
98 {
99 m_threadId = 0;
100 m_watchDogCounter = NULL;
101 m_jamBuffer = (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);
102 NewVarRef = 0;
103
104 SimulatedBlock* mainBlock = globalData.getBlock(blockNumber);
105
106 if (theInstance == 0) {
107 ndbrequire(mainBlock == 0);
108 mainBlock = this;
109 globalData.setBlock(blockNumber, mainBlock);
110 } else {
111 ndbrequire(mainBlock != 0);
112 mainBlock->addInstance(this, theInstance);
113 }
114 theMainInstance = mainBlock;
115
116 c_fragmentIdCounter = 1;
117 c_fragSenderRunning = false;
118
119 #ifdef VM_TRACE_TIME
120 clearTimes();
121 #endif
122
123 for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++)
124 theExecArray[i] = 0;
125
126 installSimulatedBlockFunctions();
127
128 m_callbackTableAddr = 0;
129
130 CLEAR_ERROR_INSERT_VALUE;
131
132 #ifdef VM_TRACE
133 m_global_variables = new Ptr<void> * [1];
134 m_global_variables[0] = 0;
135 m_global_variables_save = 0;
136 #endif
137 }
138
139 void
addInstance(SimulatedBlock * b,Uint32 theInstance)140 SimulatedBlock::addInstance(SimulatedBlock* b, Uint32 theInstance)
141 {
142 ndbrequire(theMainInstance == this);
143 ndbrequire(number() == b->number());
144 if (theInstanceList == 0)
145 {
146 theInstanceList = new SimulatedBlock* [MaxInstances];
147 ndbrequire(theInstanceList != 0);
148 for (Uint32 i = 0; i < MaxInstances; i++)
149 theInstanceList[i] = 0;
150 }
151 ndbrequire(theInstance < MaxInstances);
152 ndbrequire(theInstanceList[theInstance] == 0);
153 theInstanceList[theInstance] = b;
154 }
155
156 void
initCommon()157 SimulatedBlock::initCommon()
158 {
159 Uint32 count = 10;
160 this->getParam("FragmentSendPool", &count);
161 c_fragmentSendPool.setSize(count);
162
163 count = 10;
164 this->getParam("FragmentInfoPool", &count);
165 c_fragmentInfoPool.setSize(count);
166
167 count = 10;
168 this->getParam("FragmentInfoHash", &count);
169 c_fragmentInfoHash.setSize(count);
170
171 count = 5;
172 this->getParam("ActiveMutexes", &count);
173 c_mutexMgr.setSize(count);
174
175 count = 5;
176 this->getParam("ActiveCounters", &count);
177 c_counterMgr.setSize(count);
178
179 count = 5;
180 this->getParam("ActiveThreadSync", &count);
181 c_syncThreadPool.setSize(count);
182 }
183
~SimulatedBlock()184 SimulatedBlock::~SimulatedBlock()
185 {
186 freeBat();
187 #ifdef VM_TRACE_TIME
188 printTimes(stdout);
189 #endif
190
191 #ifdef VM_TRACE
192 delete [] m_global_variables;
193 #endif
194
195 if (theInstanceList != 0) {
196 Uint32 i;
197 for (i = 0; i < MaxInstances; i++)
198 delete theInstanceList[i];
199 delete [] theInstanceList;
200 }
201 theInstanceList = 0;
202 }
203
204 void
installSimulatedBlockFunctions()205 SimulatedBlock::installSimulatedBlockFunctions(){
206 ExecFunction * a = theExecArray;
207 a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
208 a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ;
209 a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
210 a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
211 a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
212 a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
213 a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
214 a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
215 a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
216 a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
217 a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
218 a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
219 a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
220 a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
221 a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
222 a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
223 a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
224 a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
225 a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
226 a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
227 a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
228 a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
229 a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
230 a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
231 a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
232 a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
233 a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
234 a[GSN_LOCAL_ROUTE_ORD] = &SimulatedBlock::execLOCAL_ROUTE_ORD;
235 a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
236 a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
237 a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
238 }
239
240 void
addRecSignalImpl(GlobalSignalNumber gsn,ExecFunction f,bool force)241 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
242 ExecFunction f, bool force){
243 if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
244 char errorMsg[255];
245 BaseString::snprintf(errorMsg, 255,
246 "GSN %d(%d))", gsn, MAX_GSN);
247 ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
248 }
249 theExecArray[gsn] = f;
250 }
251
252 void
assignToThread(ThreadContext ctx)253 SimulatedBlock::assignToThread(ThreadContext ctx)
254 {
255 m_threadId = ctx.threadId;
256 m_jamBuffer = ctx.jamBuffer;
257 m_watchDogCounter = ctx.watchDogCounter;
258 m_sectionPoolCache = ctx.sectionPoolCache;
259 }
260
261 Uint32
getInstanceKey(Uint32 tabId,Uint32 fragId)262 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
263 {
264 Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
265 Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
266 return instanceKey;
267 }
268
269 Uint32
getInstanceFromKey(Uint32 instanceKey)270 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
271 {
272 Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
273 Uint32 instanceNo;
274 if (lqhWorkers == 0) {
275 instanceNo = 0;
276 } else {
277 assert(instanceKey != 0);
278 instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
279 }
280 return instanceNo;
281 }
282
283 void
signal_error(Uint32 gsn,Uint32 len,Uint32 recBlockNo,const char * filename,int lineno) const284 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
285 const char* filename, int lineno) const
286 {
287 char objRef[255];
288 BaseString::snprintf(objRef, 255, "%s:%d", filename, lineno);
289 char probData[255];
290 BaseString::snprintf(probData, 255,
291 "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
292 gsn, len, recBlockNo);
293
294 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
295 probData,
296 objRef);
297 }
298
299
300 extern class SectionSegmentPool g_sectionSegmentPool;
301
302 #define check_sections(signal, cnt, cnt2) do { if (unlikely(cnt)) { handle_invalid_sections_in_send_signal(signal); } else if (unlikely(cnt2 == 0 && (signal->header.m_fragmentInfo != 0 && signal->header.m_fragmentInfo != 3))) { handle_invalid_fragmentInfo(signal); } } while(0)
303
304 void
handle_invalid_sections_in_send_signal(Signal * signal) const305 SimulatedBlock::handle_invalid_sections_in_send_signal(Signal* signal) const
306 {
307 //Uint32 cnt = signal->header.m_noOfSections;
308 #if defined VM_TRACE || defined ERROR_INSERT
309 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
310 "Unhandled sections in sendSignal",
311 "");
312 #else
313 infoEvent("Unhandled sections in sendSignal!!");
314 #endif
315 }
316
317 void
handle_lingering_sections_after_execute(Signal * signal) const318 SimulatedBlock::handle_lingering_sections_after_execute(Signal* signal) const
319 {
320 //Uint32 cnt = signal->header.m_noOfSections;
321 #if defined VM_TRACE || defined ERROR_INSERT
322 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
323 "Unhandled sections after execute",
324 "");
325 #else
326 infoEvent("Unhandled sections after execute");
327 #endif
328 }
329
330 void
handle_lingering_sections_after_execute(SectionHandle * handle) const331 SimulatedBlock::handle_lingering_sections_after_execute(SectionHandle* handle) const
332 {
333 //Uint32 cnt = signal->header.m_noOfSections;
334 #if defined VM_TRACE || defined ERROR_INSERT
335 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
336 "Unhandled sections(handle) after execute",
337 "");
338 #else
339 infoEvent("Unhandled sections(handle) after execute");
340 #endif
341 }
342
343 void
handle_invalid_fragmentInfo(Signal * signal) const344 SimulatedBlock::handle_invalid_fragmentInfo(Signal* signal) const
345 {
346 #if defined VM_TRACE || defined ERROR_INSERT
347 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
348 "Incorrect header->m_fragmentInfo in sendSignal()",
349 "");
350 #else
351 signal->header.m_fragmentInfo = 0;
352 infoEvent("Incorrect header->m_fragmentInfo in sendSignal");
353 #endif
354 }
355
356 void
handle_out_of_longsignal_memory(Signal * signal) const357 SimulatedBlock::handle_out_of_longsignal_memory(Signal * signal) const
358 {
359 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
360 "Out of LongMessageBuffer in sendSignal",
361 "");
362 }
363
364 void
handle_send_failed(SendStatus ss,Signal * signal) const365 SimulatedBlock::handle_send_failed(SendStatus ss, Signal * signal) const
366 {
367 switch(ss){
368 case SEND_BUFFER_FULL:
369 ErrorReporter::handleError(NDBD_EXIT_GENERIC,
370 "Out of SendBufferMemory in sendSignal", "");
371 break;
372 case SEND_MESSAGE_TOO_BIG:
373 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
374 "Message to big in sendSignal", "");
375 break;
376 case SEND_UNKNOWN_NODE:
377 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
378 "Unknown node in sendSignal", "");
379 break;
380 case SEND_OK:
381 case SEND_BLOCKED:
382 case SEND_DISCONNECTED:
383 break;
384 }
385 ndbrequire(false);
386 }
387
388 static void
linkSegments(Uint32 head,Uint32 tail)389 linkSegments(Uint32 head, Uint32 tail){
390
391 Ptr<SectionSegment> headPtr;
392 g_sectionSegmentPool.getPtr(headPtr, head);
393
394 Ptr<SectionSegment> tailPtr;
395 g_sectionSegmentPool.getPtr(tailPtr, tail);
396
397 Ptr<SectionSegment> oldTailPtr;
398 g_sectionSegmentPool.getPtr(oldTailPtr, headPtr.p->m_lastSegment);
399
400 /* Can only efficiently link segments if linking to the end of a
401 * multiple-of-segment-size sized chunk
402 */
403 if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
404 {
405 #if defined VM_TRACE || defined ERROR_INSERT
406 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
407 "Bad head segment size",
408 "");
409 #else
410 ndbout_c("linkSegments : Bad head segment size");
411 #endif
412 }
413
414 headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
415 headPtr.p->m_sz += tailPtr.p->m_sz;
416
417 oldTailPtr.p->m_nextSegment = tailPtr.i;
418 }
419
420 void
getSections(Uint32 secCount,SegmentedSectionPtr ptr[3])421 getSections(Uint32 secCount, SegmentedSectionPtr ptr[3]){
422 Uint32 tSec0 = ptr[0].i;
423 Uint32 tSec1 = ptr[1].i;
424 Uint32 tSec2 = ptr[2].i;
425 SectionSegment * p;
426 switch(secCount){
427 case 3:
428 p = g_sectionSegmentPool.getPtr(tSec2);
429 ptr[2].p = p;
430 ptr[2].sz = p->m_sz;
431 case 2:
432 p = g_sectionSegmentPool.getPtr(tSec1);
433 ptr[1].p = p;
434 ptr[1].sz = p->m_sz;
435 case 1:
436 p = g_sectionSegmentPool.getPtr(tSec0);
437 ptr[0].p = p;
438 ptr[0].sz = p->m_sz;
439 case 0:
440 return;
441 }
442 char msg[40];
443 sprintf(msg, "secCount=%d", secCount);
444 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
445 }
446
447 void
getSection(SegmentedSectionPtr & ptr,Uint32 i)448 getSection(SegmentedSectionPtr & ptr, Uint32 i){
449 ptr.i = i;
450 SectionSegment * p = g_sectionSegmentPool.getPtr(i);
451 ptr.p = p;
452 ptr.sz = p->m_sz;
453 }
454
getSectionSz(Uint32 id)455 Uint32 getSectionSz(Uint32 id)
456 {
457 return g_sectionSegmentPool.getPtr(id)->m_sz;
458 }
459
getLastWordPtr(Uint32 id)460 Uint32* getLastWordPtr(Uint32 id)
461 {
462 SectionSegment* first= g_sectionSegmentPool.getPtr(id);
463 SectionSegment* last= g_sectionSegmentPool.getPtr(first->m_lastSegment);
464 Uint32 offset= (first->m_sz -1) % SectionSegment::DataLength;
465 return &last->theData[offset];
466 }
467
468 #ifdef NDBD_MULTITHREADED
469 #define SB_SP_ARG *m_sectionPoolCache,
470 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
471 #else
472 #define SB_SP_ARG
473 #define SB_SP_REL_ARG
474 #endif
475
476 static
477 void
releaseSections(SPC_ARG Uint32 secCount,SegmentedSectionPtr ptr[3])478 releaseSections(SPC_ARG Uint32 secCount, SegmentedSectionPtr ptr[3]){
479 Uint32 tSec0 = ptr[0].i;
480 Uint32 tSz0 = ptr[0].sz;
481 Uint32 tSec1 = ptr[1].i;
482 Uint32 tSz1 = ptr[1].sz;
483 Uint32 tSec2 = ptr[2].i;
484 Uint32 tSz2 = ptr[2].sz;
485 switch(secCount){
486 case 3:
487 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
488 relSz(tSz2), tSec2,
489 ptr[2].p->m_lastSegment);
490 case 2:
491 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
492 relSz(tSz1), tSec1,
493 ptr[1].p->m_lastSegment);
494 case 1:
495 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
496 relSz(tSz0), tSec0,
497 ptr[0].p->m_lastSegment);
498 case 0:
499 return;
500 }
501 char msg[40];
502 sprintf(msg, "secCount=%d", secCount);
503 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
504 }
505
506 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const507 SimulatedBlock::sendSignal(BlockReference ref,
508 GlobalSignalNumber gsn,
509 Signal* signal,
510 Uint32 length,
511 JobBufferLevel jobBuffer) const {
512
513 BlockReference sendBRef = reference();
514
515 Uint32 noOfSections = signal->header.m_noOfSections;
516 Uint32 recBlock = refToBlock(ref);
517 Uint32 recNode = refToNode(ref);
518 Uint32 ourProcessor = globalData.ownId;
519
520 signal->header.theLength = length;
521 signal->header.theVerId_signalNumber = gsn;
522 signal->header.theReceiversBlockNumber = recBlock;
523 signal->header.m_noOfSections = 0;
524
525 check_sections(signal, noOfSections, 0);
526
527 Uint32 tSignalId = signal->header.theSignalId;
528
529 if ((length == 0) || length > 25 || (recBlock == 0)) {
530 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
531 return;
532 }//if
533 #ifdef VM_TRACE
534 if(globalData.testOn){
535 Uint16 proc =
536 (recNode == 0 ? globalData.ownId : recNode);
537 signal->header.theSendersBlockRef = sendBRef;
538 globalSignalLoggers.sendSignal(signal->header,
539 jobBuffer,
540 &signal->theData[0],
541 proc);
542 }
543 #endif
544
545 if(recNode == ourProcessor || recNode == 0) {
546 signal->header.theSendersSignalId = tSignalId;
547 signal->header.theSendersBlockRef = sendBRef;
548 #ifdef NDBD_MULTITHREADED
549 if (jobBuffer == JBB)
550 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
551 else
552 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
553 #else
554 globalScheduler.execute(signal, jobBuffer, recBlock,
555 gsn);
556 #endif
557 return;
558 } else {
559 // send distributed Signal
560 SignalHeader sh;
561
562 Uint32 tTrace = signal->getTrace();
563
564 sh.theVerId_signalNumber = gsn;
565 sh.theReceiversBlockNumber = recBlock;
566 sh.theSendersBlockRef = refToBlock(sendBRef);
567 sh.theLength = length;
568 sh.theTrace = tTrace;
569 sh.theSignalId = tSignalId;
570 sh.m_noOfSections = 0;
571 sh.m_fragmentInfo = 0;
572
573 #ifdef TRACE_DISTRIBUTED
574 ndbout_c("send: %s(%d) to (%s, %d)",
575 getSignalName(gsn), gsn, getBlockName(recBlock),
576 recNode);
577 #endif
578
579 SendStatus ss;
580 #ifdef NDBD_MULTITHREADED
581 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
582 recNode, 0);
583 #else
584 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
585 &signal->theData[0], recNode,
586 (LinearSectionPtr*)0);
587 #endif
588
589 if (unlikely(! (ss == SEND_OK ||
590 ss == SEND_BLOCKED ||
591 ss == SEND_DISCONNECTED)))
592 {
593 handle_send_failed(ss, signal);
594 }
595 }
596 return;
597 }
598
599 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer) const600 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
601 GlobalSignalNumber gsn,
602 Signal* signal,
603 Uint32 length,
604 JobBufferLevel jobBuffer) const {
605
606 Uint32 noOfSections = signal->header.m_noOfSections;
607 Uint32 tSignalId = signal->header.theSignalId;
608 Uint32 tTrace = signal->getTrace();
609
610 Uint32 ourProcessor = globalData.ownId;
611 Uint32 recBlock = rg.m_block;
612
613 signal->header.theLength = length;
614 signal->header.theVerId_signalNumber = gsn;
615 signal->header.theReceiversBlockNumber = recBlock;
616 signal->header.theSendersSignalId = tSignalId;
617 signal->header.theSendersBlockRef = reference();
618 signal->header.m_noOfSections = 0;
619
620 check_sections(signal, noOfSections, 0);
621
622 if ((length == 0) || (length > 25) || (recBlock == 0)) {
623 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
624 return;
625 }//if
626
627 SignalHeader sh;
628
629 sh.theVerId_signalNumber = gsn;
630 sh.theReceiversBlockNumber = recBlock;
631 sh.theSendersBlockRef = refToBlock(reference());
632 sh.theLength = length;
633 sh.theTrace = tTrace;
634 sh.theSignalId = tSignalId;
635 sh.m_noOfSections = 0;
636 sh.m_fragmentInfo = 0;
637
638 /**
639 * Check own node
640 */
641 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
642 #ifdef VM_TRACE
643 if(globalData.testOn){
644 globalSignalLoggers.sendSignal(signal->header,
645 jobBuffer,
646 &signal->theData[0],
647 ourProcessor);
648 }
649 #endif
650
651 #ifdef NDBD_MULTITHREADED
652 if (jobBuffer == JBB)
653 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
654 else
655 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
656 #else
657 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
658 #endif
659
660 rg.m_nodes.clear((Uint32)0);
661 rg.m_nodes.clear(ourProcessor);
662 }
663
664 /**
665 * Do the big loop
666 */
667 Uint32 recNode = 0;
668 while(!rg.m_nodes.isclear()){
669 recNode = rg.m_nodes.find(recNode + 1);
670 rg.m_nodes.clear(recNode);
671 #ifdef VM_TRACE
672 if(globalData.testOn){
673 globalSignalLoggers.sendSignal(signal->header,
674 jobBuffer,
675 &signal->theData[0],
676 recNode);
677 }
678 #endif
679
680 #ifdef TRACE_DISTRIBUTED
681 ndbout_c("send: %s(%d) to (%s, %d)",
682 getSignalName(gsn), gsn, getBlockName(recBlock),
683 recNode);
684 #endif
685
686 SendStatus ss;
687 #ifdef NDBD_MULTITHREADED
688 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
689 recNode, 0);
690 #else
691 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
692 &signal->theData[0], recNode,
693 (LinearSectionPtr*)0);
694 #endif
695
696 if (unlikely(! (ss == SEND_OK ||
697 ss == SEND_BLOCKED ||
698 ss == SEND_DISCONNECTED)))
699 {
700 handle_send_failed(ss, signal);
701 }
702 }
703
704 return;
705 }
706
707 bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len);
708
709 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const710 SimulatedBlock::sendSignal(BlockReference ref,
711 GlobalSignalNumber gsn,
712 Signal* signal,
713 Uint32 length,
714 JobBufferLevel jobBuffer,
715 LinearSectionPtr ptr[3],
716 Uint32 noOfSections) const {
717
718 BlockReference sendBRef = reference();
719
720 Uint32 recBlock = refToBlock(ref);
721 Uint32 recNode = refToNode(ref);
722 Uint32 ourProcessor = globalData.ownId;
723
724 check_sections(signal, signal->header.m_noOfSections, noOfSections);
725
726 signal->header.theLength = length;
727 signal->header.theVerId_signalNumber = gsn;
728 signal->header.theReceiversBlockNumber = recBlock;
729 signal->header.m_noOfSections = noOfSections;
730
731 Uint32 tSignalId = signal->header.theSignalId;
732 Uint32 tFragInfo = signal->header.m_fragmentInfo;
733
734 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
735 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
736 return;
737 }//if
738 #ifdef VM_TRACE
739 if(globalData.testOn){
740 Uint16 proc =
741 (recNode == 0 ? globalData.ownId : recNode);
742 signal->header.theSendersBlockRef = sendBRef;
743 globalSignalLoggers.sendSignal(signal->header,
744 jobBuffer,
745 &signal->theData[0],
746 proc,
747 ptr, noOfSections);
748 }
749 #endif
750
751 if(recNode == ourProcessor || recNode == 0) {
752 signal->header.theSendersSignalId = tSignalId;
753 signal->header.theSendersBlockRef = sendBRef;
754
755 /**
756 * We have to copy the data
757 */
758 bool ok = true;
759 Ptr<SectionSegment> segptr[3];
760 for(Uint32 i = 0; i<noOfSections; i++){
761 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
762 signal->theData[length+i] = segptr[i].i;
763 }
764
765 if (unlikely(! ok))
766 {
767 handle_out_of_longsignal_memory(signal);
768 }
769
770 #ifdef NDBD_MULTITHREADED
771 if (jobBuffer == JBB)
772 sendlocal(m_threadId, &signal->header, signal->theData,
773 signal->theData+length);
774 else
775 sendprioa(m_threadId, &signal->header, signal->theData,
776 signal->theData+length);
777 #else
778 globalScheduler.execute(signal, jobBuffer, recBlock,
779 gsn);
780 #endif
781 signal->header.m_noOfSections = 0;
782 return;
783 } else {
784 // send distributed Signal
785 SignalHeader sh;
786
787 Uint32 tTrace = signal->getTrace();
788 Uint32 noOfSections = signal->header.m_noOfSections;
789
790 sh.theVerId_signalNumber = gsn;
791 sh.theReceiversBlockNumber = recBlock;
792 sh.theSendersBlockRef = refToBlock(sendBRef);
793 sh.theLength = length;
794 sh.theTrace = tTrace;
795 sh.theSignalId = tSignalId;
796 sh.m_noOfSections = noOfSections;
797 sh.m_fragmentInfo = tFragInfo;
798
799 #ifdef TRACE_DISTRIBUTED
800 ndbout_c("send: %s(%d) to (%s, %d)",
801 getSignalName(gsn), gsn, getBlockName(recBlock),
802 recNode);
803 #endif
804
805 SendStatus ss;
806 #ifdef NDBD_MULTITHREADED
807 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
808 recNode, ptr);
809 #else
810 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
811 &signal->theData[0], recNode,
812 ptr);
813 #endif
814
815 if (unlikely(! (ss == SEND_OK ||
816 ss == SEND_BLOCKED ||
817 ss == SEND_DISCONNECTED)))
818 {
819 handle_send_failed(ss, signal);
820 }
821 }
822
823 signal->header.m_noOfSections = 0;
824 signal->header.m_fragmentInfo = 0;
825 return;
826 }
827
828 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,LinearSectionPtr ptr[3],Uint32 noOfSections) const829 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
830 GlobalSignalNumber gsn,
831 Signal* signal,
832 Uint32 length,
833 JobBufferLevel jobBuffer,
834 LinearSectionPtr ptr[3],
835 Uint32 noOfSections) const {
836
837 Uint32 tSignalId = signal->header.theSignalId;
838 Uint32 tTrace = signal->getTrace();
839 Uint32 tFragInfo = signal->header.m_fragmentInfo;
840
841 Uint32 ourProcessor = globalData.ownId;
842 Uint32 recBlock = rg.m_block;
843
844 check_sections(signal, signal->header.m_noOfSections, noOfSections);
845
846 signal->header.theLength = length;
847 signal->header.theVerId_signalNumber = gsn;
848 signal->header.theReceiversBlockNumber = recBlock;
849 signal->header.theSendersSignalId = tSignalId;
850 signal->header.theSendersBlockRef = reference();
851 signal->header.m_noOfSections = noOfSections;
852
853 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
854 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
855 return;
856 }//if
857
858 SignalHeader sh;
859 sh.theVerId_signalNumber = gsn;
860 sh.theReceiversBlockNumber = recBlock;
861 sh.theSendersBlockRef = refToBlock(reference());
862 sh.theLength = length;
863 sh.theTrace = tTrace;
864 sh.theSignalId = tSignalId;
865 sh.m_noOfSections = noOfSections;
866 sh.m_fragmentInfo = tFragInfo;
867
868 /**
869 * Check own node
870 */
871 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){
872 #ifdef VM_TRACE
873 if(globalData.testOn){
874 globalSignalLoggers.sendSignal(signal->header,
875 jobBuffer,
876 &signal->theData[0],
877 ourProcessor,
878 ptr, noOfSections);
879 }
880 #endif
881 /**
882 * We have to copy the data
883 */
884 bool ok = true;
885 Ptr<SectionSegment> segptr[3];
886 for(Uint32 i = 0; i<noOfSections; i++){
887 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
888 signal->theData[length+i] = segptr[i].i;
889 }
890
891 if (unlikely(! ok))
892 {
893 handle_out_of_longsignal_memory(signal);
894 }
895
896 #ifdef NDBD_MULTITHREADED
897 if (jobBuffer == JBB)
898 sendlocal(m_threadId, &signal->header, signal->theData,
899 signal->theData + length);
900 else
901 sendprioa(m_threadId, &signal->header, signal->theData,
902 signal->theData + length);
903 #else
904 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
905 #endif
906
907 rg.m_nodes.clear((Uint32)0);
908 rg.m_nodes.clear(ourProcessor);
909 }
910
911 /**
912 * Do the big loop
913 */
914 Uint32 recNode = 0;
915 while(!rg.m_nodes.isclear()){
916 recNode = rg.m_nodes.find(recNode + 1);
917 rg.m_nodes.clear(recNode);
918
919 #ifdef VM_TRACE
920 if(globalData.testOn){
921 globalSignalLoggers.sendSignal(signal->header,
922 jobBuffer,
923 &signal->theData[0],
924 recNode,
925 ptr, noOfSections);
926 }
927 #endif
928
929 #ifdef TRACE_DISTRIBUTED
930 ndbout_c("send: %s(%d) to (%s, %d)",
931 getSignalName(gsn), gsn, getBlockName(recBlock),
932 recNode);
933 #endif
934
935 SendStatus ss;
936 #ifdef NDBD_MULTITHREADED
937 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
938 recNode, ptr);
939 #else
940 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
941 &signal->theData[0], recNode,
942 ptr);
943 #endif
944
945 if (unlikely(! (ss == SEND_OK ||
946 ss == SEND_BLOCKED ||
947 ss == SEND_DISCONNECTED)))
948 {
949 handle_send_failed(ss, signal);
950 }
951 }
952
953 signal->header.m_noOfSections = 0;
954 signal->header.m_fragmentInfo = 0;
955
956 return;
957 }
958
959 void
sendSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const960 SimulatedBlock::sendSignal(BlockReference ref,
961 GlobalSignalNumber gsn,
962 Signal* signal,
963 Uint32 length,
964 JobBufferLevel jobBuffer,
965 SectionHandle* sections) const {
966
967 Uint32 noOfSections = sections->m_cnt;
968 BlockReference sendBRef = reference();
969
970 Uint32 recBlock = refToBlock(ref);
971 Uint32 recNode = refToNode(ref);
972 Uint32 ourProcessor = globalData.ownId;
973
974 check_sections(signal, signal->header.m_noOfSections, noOfSections);
975
976 signal->header.theLength = length;
977 signal->header.theVerId_signalNumber = gsn;
978 signal->header.theReceiversBlockNumber = recBlock;
979 signal->header.m_noOfSections = noOfSections;
980
981 Uint32 tSignalId = signal->header.theSignalId;
982 Uint32 tFragInfo = signal->header.m_fragmentInfo;
983
984 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
985 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
986 return;
987 }//if
988 #ifdef VM_TRACE
989 if(globalData.testOn){
990 Uint16 proc =
991 (recNode == 0 ? globalData.ownId : recNode);
992 signal->header.theSendersBlockRef = sendBRef;
993 globalSignalLoggers.sendSignal(signal->header,
994 jobBuffer,
995 &signal->theData[0],
996 proc,
997 sections->m_ptr, noOfSections);
998 }
999 #endif
1000
1001 if(recNode == ourProcessor || recNode == 0) {
1002 signal->header.theSendersSignalId = tSignalId;
1003 signal->header.theSendersBlockRef = sendBRef;
1004
1005 /**
1006 * We have to copy the data
1007 */
1008 Uint32 * dst = signal->theData + length;
1009 * dst ++ = sections->m_ptr[0].i;
1010 * dst ++ = sections->m_ptr[1].i;
1011 * dst ++ = sections->m_ptr[2].i;
1012
1013 #ifdef NDBD_MULTITHREADED
1014 if (jobBuffer == JBB)
1015 sendlocal(m_threadId, &signal->header, signal->theData,
1016 signal->theData + length);
1017 else
1018 sendprioa(m_threadId, &signal->header, signal->theData,
1019 signal->theData + length);
1020 #else
1021 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1022 #endif
1023 } else {
1024 // send distributed Signal
1025 SignalHeader sh;
1026
1027 Uint32 tTrace = signal->getTrace();
1028
1029 sh.theVerId_signalNumber = gsn;
1030 sh.theReceiversBlockNumber = recBlock;
1031 sh.theSendersBlockRef = refToBlock(sendBRef);
1032 sh.theLength = length;
1033 sh.theTrace = tTrace;
1034 sh.theSignalId = tSignalId;
1035 sh.m_noOfSections = noOfSections;
1036 sh.m_fragmentInfo = tFragInfo;
1037
1038 #ifdef TRACE_DISTRIBUTED
1039 ndbout_c("send: %s(%d) to (%s, %d)",
1040 getSignalName(gsn), gsn, getBlockName(recBlock),
1041 recNode);
1042 #endif
1043
1044 SendStatus ss;
1045 #ifdef NDBD_MULTITHREADED
1046 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1047 recNode, &g_sectionSegmentPool, sections->m_ptr);
1048 #else
1049 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1050 &signal->theData[0], recNode,
1051 g_sectionSegmentPool,
1052 sections->m_ptr);
1053 #endif
1054
1055 if (unlikely(! (ss == SEND_OK ||
1056 ss == SEND_BLOCKED ||
1057 ss == SEND_DISCONNECTED)))
1058 {
1059 handle_send_failed(ss, signal);
1060 }
1061
1062 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1063 }
1064
1065 signal->header.m_noOfSections = 0;
1066 signal->header.m_fragmentInfo = 0;
1067 sections->m_cnt = 0;
1068 return;
1069 }
1070
1071 void
sendSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1072 SimulatedBlock::sendSignal(NodeReceiverGroup rg,
1073 GlobalSignalNumber gsn,
1074 Signal* signal,
1075 Uint32 length,
1076 JobBufferLevel jobBuffer,
1077 SectionHandle * sections) const {
1078
1079 Uint32 noOfSections = sections->m_cnt;
1080 Uint32 tSignalId = signal->header.theSignalId;
1081 Uint32 tTrace = signal->getTrace();
1082 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1083
1084 Uint32 ourProcessor = globalData.ownId;
1085 Uint32 recBlock = rg.m_block;
1086
1087 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1088
1089 signal->header.theLength = length;
1090 signal->header.theVerId_signalNumber = gsn;
1091 signal->header.theReceiversBlockNumber = recBlock;
1092 signal->header.theSendersSignalId = tSignalId;
1093 signal->header.theSendersBlockRef = reference();
1094 signal->header.m_noOfSections = noOfSections;
1095
1096 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1097 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1098 return;
1099 }//if
1100
1101 SignalHeader sh;
1102 sh.theVerId_signalNumber = gsn;
1103 sh.theReceiversBlockNumber = recBlock;
1104 sh.theSendersBlockRef = refToBlock(reference());
1105 sh.theLength = length;
1106 sh.theTrace = tTrace;
1107 sh.theSignalId = tSignalId;
1108 sh.m_noOfSections = noOfSections;
1109 sh.m_fragmentInfo = tFragInfo;
1110
1111 /**
1112 * Check own node
1113 */
1114 bool release = true;
1115 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1116 {
1117 release = false;
1118 #ifdef VM_TRACE
1119 if(globalData.testOn){
1120 globalSignalLoggers.sendSignal(signal->header,
1121 jobBuffer,
1122 &signal->theData[0],
1123 ourProcessor,
1124 sections->m_ptr, noOfSections);
1125 }
1126 #endif
1127 /**
1128 * We have to copy the data
1129 */
1130 Uint32 * dst = signal->theData + length;
1131 * dst ++ = sections->m_ptr[0].i;
1132 * dst ++ = sections->m_ptr[1].i;
1133 * dst ++ = sections->m_ptr[2].i;
1134 #ifdef NDBD_MULTITHREADED
1135 if (jobBuffer == JBB)
1136 sendlocal(m_threadId, &signal->header, signal->theData,
1137 signal->theData + length);
1138 else
1139 sendprioa(m_threadId, &signal->header, signal->theData,
1140 signal->theData + length);
1141 #else
1142 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1143 #endif
1144
1145 rg.m_nodes.clear((Uint32)0);
1146 rg.m_nodes.clear(ourProcessor);
1147 }
1148
1149 /**
1150 * Do the big loop
1151 */
1152 Uint32 recNode = 0;
1153 while(!rg.m_nodes.isclear()){
1154 recNode = rg.m_nodes.find(recNode + 1);
1155 rg.m_nodes.clear(recNode);
1156
1157 #ifdef VM_TRACE
1158 if(globalData.testOn){
1159 globalSignalLoggers.sendSignal(signal->header,
1160 jobBuffer,
1161 &signal->theData[0],
1162 recNode,
1163 sections->m_ptr, noOfSections);
1164 }
1165 #endif
1166
1167 #ifdef TRACE_DISTRIBUTED
1168 ndbout_c("send: %s(%d) to (%s, %d)",
1169 getSignalName(gsn), gsn, getBlockName(recBlock),
1170 recNode);
1171 #endif
1172
1173 SendStatus ss;
1174 #ifdef NDBD_MULTITHREADED
1175 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1176 recNode, &g_sectionSegmentPool, sections->m_ptr);
1177 #else
1178 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1179 &signal->theData[0], recNode,
1180 g_sectionSegmentPool,
1181 sections->m_ptr);
1182 #endif
1183
1184 if (unlikely(! (ss == SEND_OK ||
1185 ss == SEND_BLOCKED ||
1186 ss == SEND_DISCONNECTED)))
1187 {
1188 handle_send_failed(ss, signal);
1189 }
1190 }
1191
1192 if (release)
1193 {
1194 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1195 }
1196
1197 sections->m_cnt = 0;
1198 signal->header.m_noOfSections = 0;
1199 signal->header.m_fragmentInfo = 0;
1200
1201 return;
1202 }
1203
1204 void
sendSignalNoRelease(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1205 SimulatedBlock::sendSignalNoRelease(BlockReference ref,
1206 GlobalSignalNumber gsn,
1207 Signal* signal,
1208 Uint32 length,
1209 JobBufferLevel jobBuffer,
1210 SectionHandle* sections) const {
1211
1212 /**
1213 * Implementation the same as sendSignal(), except that
1214 * the sections are duplicated when sending locally, and
1215 * not released
1216 */
1217
1218 Uint32 noOfSections = sections->m_cnt;
1219 BlockReference sendBRef = reference();
1220
1221 Uint32 recBlock = refToBlock(ref);
1222 Uint32 recNode = refToNode(ref);
1223 Uint32 ourProcessor = globalData.ownId;
1224
1225 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1226
1227 signal->header.theLength = length;
1228 signal->header.theVerId_signalNumber = gsn;
1229 signal->header.theReceiversBlockNumber = recBlock;
1230 signal->header.m_noOfSections = noOfSections;
1231
1232 Uint32 tSignalId = signal->header.theSignalId;
1233 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1234
1235 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1236 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1237 return;
1238 }//if
1239 #ifdef VM_TRACE
1240 if(globalData.testOn){
1241 Uint16 proc =
1242 (recNode == 0 ? globalData.ownId : recNode);
1243 signal->header.theSendersBlockRef = sendBRef;
1244 globalSignalLoggers.sendSignal(signal->header,
1245 jobBuffer,
1246 &signal->theData[0],
1247 proc,
1248 sections->m_ptr, noOfSections);
1249 }
1250 #endif
1251
1252 if(recNode == ourProcessor || recNode == 0) {
1253 signal->header.theSendersSignalId = tSignalId;
1254 signal->header.theSendersBlockRef = sendBRef;
1255
1256 Uint32 * dst = signal->theData + length;
1257
1258 /* We need to copy the segmented section data into separate
1259 * sections when sending locally and keeping a copy ourselves
1260 */
1261 for (Uint32 sec=0; sec < noOfSections; sec++)
1262 {
1263 Uint32 secCopy;
1264 bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1265 ndbrequire (ok);
1266 * dst ++ = secCopy;
1267 }
1268
1269 #ifdef NDBD_MULTITHREADED
1270 if (jobBuffer == JBB)
1271 sendlocal(m_threadId, &signal->header, signal->theData,
1272 signal->theData + length);
1273 else
1274 sendprioa(m_threadId, &signal->header, signal->theData,
1275 signal->theData + length);
1276 #else
1277 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1278 #endif
1279 } else {
1280 // send distributed Signal
1281 SignalHeader sh;
1282
1283 Uint32 tTrace = signal->getTrace();
1284
1285 sh.theVerId_signalNumber = gsn;
1286 sh.theReceiversBlockNumber = recBlock;
1287 sh.theSendersBlockRef = refToBlock(sendBRef);
1288 sh.theLength = length;
1289 sh.theTrace = tTrace;
1290 sh.theSignalId = tSignalId;
1291 sh.m_noOfSections = noOfSections;
1292 sh.m_fragmentInfo = tFragInfo;
1293
1294 #ifdef TRACE_DISTRIBUTED
1295 ndbout_c("send: %s(%d) to (%s, %d)",
1296 getSignalName(gsn), gsn, getBlockName(recBlock),
1297 recNode);
1298 #endif
1299
1300 SendStatus ss;
1301 #ifdef NDBD_MULTITHREADED
1302 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1303 recNode, &g_sectionSegmentPool, sections->m_ptr);
1304 #else
1305 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1306 &signal->theData[0], recNode,
1307 g_sectionSegmentPool,
1308 sections->m_ptr);
1309 #endif
1310
1311 ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1312 }
1313
1314 signal->header.m_noOfSections = 0;
1315 signal->header.m_fragmentInfo = 0;
1316 return;
1317 }
1318
1319 void
sendSignalNoRelease(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jobBuffer,SectionHandle * sections) const1320 SimulatedBlock::sendSignalNoRelease(NodeReceiverGroup rg,
1321 GlobalSignalNumber gsn,
1322 Signal* signal,
1323 Uint32 length,
1324 JobBufferLevel jobBuffer,
1325 SectionHandle * sections) const {
1326 /**
1327 * Implementation the same as sendSignal(), except that
1328 * the sections are duplicated when sending locally, and
1329 * not released
1330 */
1331
1332 Uint32 noOfSections = sections->m_cnt;
1333 Uint32 tSignalId = signal->header.theSignalId;
1334 Uint32 tTrace = signal->getTrace();
1335 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1336
1337 Uint32 ourProcessor = globalData.ownId;
1338 Uint32 recBlock = rg.m_block;
1339
1340 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1341
1342 signal->header.theLength = length;
1343 signal->header.theVerId_signalNumber = gsn;
1344 signal->header.theReceiversBlockNumber = recBlock;
1345 signal->header.theSendersSignalId = tSignalId;
1346 signal->header.theSendersBlockRef = reference();
1347 signal->header.m_noOfSections = noOfSections;
1348
1349 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1350 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1351 return;
1352 }//if
1353
1354 SignalHeader sh;
1355 sh.theVerId_signalNumber = gsn;
1356 sh.theReceiversBlockNumber = recBlock;
1357 sh.theSendersBlockRef = refToBlock(reference());
1358 sh.theLength = length;
1359 sh.theTrace = tTrace;
1360 sh.theSignalId = tSignalId;
1361 sh.m_noOfSections = noOfSections;
1362 sh.m_fragmentInfo = tFragInfo;
1363
1364 /**
1365 * Check own node
1366 */
1367 if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor))
1368 {
1369 #ifdef VM_TRACE
1370 if(globalData.testOn){
1371 globalSignalLoggers.sendSignal(signal->header,
1372 jobBuffer,
1373 &signal->theData[0],
1374 ourProcessor,
1375 sections->m_ptr, noOfSections);
1376 }
1377 #endif
1378
1379 Uint32 * dst = signal->theData + length;
1380
1381 /* We need to copy the segmented section data into separate
1382 * sections when sending locally and keeping a copy ourselves
1383 */
1384 for (Uint32 sec=0; sec < noOfSections; sec++)
1385 {
1386 Uint32 secCopy;
1387 bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1388 ndbrequire (ok);
1389 * dst ++ = secCopy;
1390 }
1391
1392 #ifdef NDBD_MULTITHREADED
1393 if (jobBuffer == JBB)
1394 sendlocal(m_threadId, &signal->header, signal->theData,
1395 signal->theData + length);
1396 else
1397 sendprioa(m_threadId, &signal->header, signal->theData,
1398 signal->theData + length);
1399 #else
1400 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1401 #endif
1402
1403 rg.m_nodes.clear((Uint32)0);
1404 rg.m_nodes.clear(ourProcessor);
1405 }
1406
1407 /**
1408 * Do the big loop
1409 */
1410 Uint32 recNode = 0;
1411 while(!rg.m_nodes.isclear()){
1412 recNode = rg.m_nodes.find(recNode + 1);
1413 rg.m_nodes.clear(recNode);
1414
1415 #ifdef VM_TRACE
1416 if(globalData.testOn){
1417 globalSignalLoggers.sendSignal(signal->header,
1418 jobBuffer,
1419 &signal->theData[0],
1420 recNode,
1421 sections->m_ptr, noOfSections);
1422 }
1423 #endif
1424
1425 #ifdef TRACE_DISTRIBUTED
1426 ndbout_c("send: %s(%d) to (%s, %d)",
1427 getSignalName(gsn), gsn, getBlockName(recBlock),
1428 recNode);
1429 #endif
1430
1431 SendStatus ss;
1432 #ifdef NDBD_MULTITHREADED
1433 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1434 recNode, &g_sectionSegmentPool, sections->m_ptr);
1435 #else
1436 ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer,
1437 &signal->theData[0], recNode,
1438 g_sectionSegmentPool,
1439 sections->m_ptr);
1440 #endif
1441
1442 ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1443 }
1444
1445 signal->header.m_noOfSections = 0;
1446 signal->header.m_fragmentInfo = 0;
1447
1448 return;
1449 }
1450
1451
1452 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length) const1453 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1454 GlobalSignalNumber gsn,
1455 Signal* signal,
1456 Uint32 delayInMilliSeconds,
1457 Uint32 length) const {
1458
1459 BlockNumber bnr = refToBlock(ref);
1460
1461 check_sections(signal, signal->header.m_noOfSections, 0);
1462
1463 signal->header.theLength = length;
1464 signal->header.theSendersSignalId = signal->header.theSignalId;
1465 signal->header.theVerId_signalNumber = gsn;
1466 signal->header.theReceiversBlockNumber = bnr;
1467 signal->header.theSendersBlockRef = reference();
1468
1469 #ifdef VM_TRACE
1470 {
1471 if(globalData.testOn){
1472 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1473 signal->header,
1474 0,
1475 &signal->theData[0],
1476 globalData.ownId);
1477 }
1478 }
1479 #endif
1480
1481 #ifdef NDBD_MULTITHREADED
1482 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1483 #else
1484 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1485 #endif
1486
1487 // befor 2nd parameter to globalTimeQueue.insert
1488 // (Priority)theSendSig[sigIndex].jobBuffer
1489 }
1490
1491 void
sendSignalWithDelay(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 delayInMilliSeconds,Uint32 length,SectionHandle * sections) const1492 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1493 GlobalSignalNumber gsn,
1494 Signal* signal,
1495 Uint32 delayInMilliSeconds,
1496 Uint32 length,
1497 SectionHandle * sections) const {
1498
1499 Uint32 noOfSections = sections->m_cnt;
1500 BlockNumber bnr = refToBlock(ref);
1501
1502 BlockReference sendBRef = reference();
1503
1504 if (bnr == 0) {
1505 bnr_error();
1506 }//if
1507
1508 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1509
1510 signal->header.theLength = length;
1511 signal->header.theSendersSignalId = signal->header.theSignalId;
1512 signal->header.theSendersBlockRef = sendBRef;
1513 signal->header.theVerId_signalNumber = gsn;
1514 signal->header.theReceiversBlockNumber = bnr;
1515 signal->header.m_noOfSections = noOfSections;
1516
1517 Uint32 * dst = signal->theData + length;
1518 * dst ++ = sections->m_ptr[0].i;
1519 * dst ++ = sections->m_ptr[1].i;
1520 * dst ++ = sections->m_ptr[2].i;
1521
1522 #ifdef VM_TRACE
1523 {
1524 if(globalData.testOn){
1525 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1526 signal->header,
1527 0,
1528 &signal->theData[0],
1529 globalData.ownId);
1530 }
1531 }
1532 #endif
1533
1534 #ifdef NDBD_MULTITHREADED
1535 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1536 #else
1537 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1538 #endif
1539
1540 signal->header.m_noOfSections = 0;
1541 signal->header.m_fragmentInfo = 0;
1542 sections->m_cnt = 0;
1543 }
1544
1545 void
release(SegmentedSectionPtr & ptr)1546 SimulatedBlock::release(SegmentedSectionPtr & ptr)
1547 {
1548 ::release(SB_SP_ARG ptr);
1549 }
1550
1551 void
releaseSection(Uint32 firstSegmentIVal)1552 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1553 {
1554 ::releaseSection(SB_SP_ARG firstSegmentIVal);
1555 }
1556
1557 void
releaseSections(SectionHandle & handle)1558 SimulatedBlock::releaseSections(SectionHandle& handle)
1559 {
1560 ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1561 handle.m_cnt = 0;
1562 }
1563
1564 bool
appendToSection(Uint32 & firstSegmentIVal,const Uint32 * src,Uint32 len)1565 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal, const Uint32* src, Uint32 len)
1566 {
1567 return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1568 }
1569
1570 bool
import(Ptr<SectionSegment> & first,const Uint32 * src,Uint32 len)1571 SimulatedBlock::import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len)
1572 {
1573 return ::import(SB_SP_ARG first, src, len);
1574 }
1575
1576 bool
import(SegmentedSectionPtr & ptr,const Uint32 * src,Uint32 len)1577 SimulatedBlock::import(SegmentedSectionPtr& ptr, const Uint32* src, Uint32 len)
1578 {
1579 Ptr<SectionSegment> tmp;
1580 if (::import(SB_SP_ARG tmp, src, len))
1581 {
1582 ptr.i = tmp.i;
1583 ptr.p = tmp.p;
1584 ptr.sz = len;
1585 return true;
1586 }
1587 return false;
1588 }
1589
1590 bool
dupSection(Uint32 & copyFirstIVal,Uint32 srcFirstIVal)1591 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1592 {
1593 return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1594 }
1595
1596 bool
writeToSection(Uint32 firstSegmentIVal,Uint32 offset,const Uint32 * src,Uint32 len)1597 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32 offset,
1598 const Uint32* src, Uint32 len)
1599 {
1600 return ::writeToSection(firstSegmentIVal, offset, src, len);
1601 }
1602
1603 class SectionSegmentPool&
getSectionSegmentPool()1604 SimulatedBlock::getSectionSegmentPool(){
1605 return g_sectionSegmentPool;
1606 }
1607
1608 NewVARIABLE *
allocateBat(int batSize)1609 SimulatedBlock::allocateBat(int batSize){
1610 NewVARIABLE* bat = NewVarRef;
1611 bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE));
1612 NewVarRef = bat;
1613 theBATSize = batSize;
1614 return bat;
1615 }
1616
1617 void
freeBat()1618 SimulatedBlock::freeBat(){
1619 if(NewVarRef != 0){
1620 free(NewVarRef);
1621 NewVarRef = 0;
1622 }
1623 }
1624
1625 const NewVARIABLE *
getBat(Uint16 blockNo,Uint32 instanceNo)1626 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1627 assert(blockNo == blockToMain(blockNo));
1628 SimulatedBlock * sb = globalData.getBlock(blockNo);
1629 if (sb != 0 && instanceNo != 0)
1630 sb = sb->getInstance(instanceNo);
1631 if(sb == 0)
1632 return 0;
1633 return sb->NewVarRef;
1634 }
1635
1636 Uint16
getBatSize(Uint16 blockNo,Uint32 instanceNo)1637 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1638 assert(blockNo == blockToMain(blockNo));
1639 SimulatedBlock * sb = globalData.getBlock(blockNo);
1640 if (sb != 0 && instanceNo != 0)
1641 sb = sb->getInstance(instanceNo);
1642 if(sb == 0)
1643 return 0;
1644 return sb->theBATSize;
1645 }
1646
allocRecord(const char * type,size_t s,size_t n,bool clear,Uint32 paramId)1647 void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
1648 {
1649 return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1650 }
1651
1652 void*
allocRecordAligned(const char * type,size_t s,size_t n,void ** unaligned_buffer,Uint32 align,bool clear,Uint32 paramId)1653 SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
1654 {
1655
1656 void * p = NULL;
1657 Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1658 size_t size = n*s + over_alloc;
1659 Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
1660 refresh_watch_dog(9);
1661 if (real_size > 0){
1662 #ifdef VM_TRACE_MEM
1663 ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
1664 getBlockName(number()),
1665 type,
1666 s,
1667 n,
1668 real_size);
1669 #endif
1670 if( real_size == (Uint64)size )
1671 p = ndbd_malloc(size);
1672 if (p == NULL){
1673 char buf1[255];
1674 char buf2[255];
1675 struct ndb_mgm_param_info param_info;
1676 size_t size = sizeof(ndb_mgm_param_info);
1677
1678 if(0 != paramId && 0 == ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size)) {
1679 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for parameter %s",
1680 getBlockName(number()), param_info.m_name);
1681 } else {
1682 BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
1683 getBlockName(number()), type);
1684 }
1685 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
1686 (Uint32)s, (Uint32)n, (Uint64)real_size);
1687 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1688 }
1689
1690 if(clear){
1691 char * ptr = (char*)p;
1692 const Uint32 chunk = 128 * 1024;
1693 while(size > chunk){
1694 refresh_watch_dog(9);
1695 memset(ptr, 0, chunk);
1696 ptr += chunk;
1697 size -= chunk;
1698 }
1699 refresh_watch_dog(9);
1700 memset(ptr, 0, size);
1701 }
1702 if (unaligned_buffer)
1703 {
1704 *unaligned_buffer = p;
1705 p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1706 #ifdef VM_TRACE
1707 g_eventLogger->info("'%s' (%u) %llu %llu, alignment correction %u bytes",
1708 type, align, (Uint64)p, (Uint64)p+n*s,
1709 (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1710 #endif
1711 }
1712 }
1713 return p;
1714 }
1715
1716 void
deallocRecord(void ** ptr,const char * type,size_t s,size_t n)1717 SimulatedBlock::deallocRecord(void ** ptr,
1718 const char * type, size_t s, size_t n){
1719 (void)type;
1720
1721 if(* ptr != 0){
1722 ndbd_free(* ptr, n*s);
1723 * ptr = 0;
1724 }
1725 }
1726
1727 int
sortchunks(const void * _e0,const void * _e1)1728 SimulatedBlock::sortchunks(const void * _e0, const void * _e1)
1729 {
1730 const AllocChunk *p0 = (const AllocChunk*)_e0;
1731 const AllocChunk *p1 = (const AllocChunk*)_e1;
1732
1733 if (p0->ptrI > p1->ptrI)
1734 return 1;
1735 if (p0->ptrI < p1->ptrI)
1736 return -1;
1737 return 0;
1738 }
1739
1740 Uint32
allocChunks(AllocChunk dst[],Uint32 arraysize,Uint32 rg,Uint32 pages,Uint32 paramId)1741 SimulatedBlock::allocChunks(AllocChunk dst[],
1742 Uint32 arraysize,
1743 Uint32 rg,
1744 Uint32 pages,
1745 Uint32 paramId)
1746 {
1747 const Uint32 save = pages; // For fail
1748 Uint32 i = 0;
1749 for (; i<arraysize && pages > 0; i++)
1750 {
1751 Uint32 cnt = pages;
1752 m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1753 if (unlikely(cnt == 0))
1754 goto fail;
1755 pages -= cnt;
1756 dst[i].cnt = cnt;
1757 }
1758 if (unlikely(pages != 0))
1759 goto fail;
1760
1761 qsort(dst, i, sizeof(dst[0]), sortchunks);
1762 return i;
1763
1764 fail:
1765 char buf1[255];
1766 char buf2[255];
1767 struct ndb_mgm_param_info param_info;
1768 size_t size = sizeof(ndb_mgm_param_info);
1769
1770 if (ndb_mgm_get_db_parameter_info(paramId, ¶m_info, &size) != 0)
1771 {
1772 ndbassert(false);
1773 param_info.m_name = "<unknown>";
1774 }
1775
1776 BaseString::snprintf(buf1, sizeof(buf1),
1777 "%s could not allocate memory for parameter %s",
1778 getBlockName(number()), param_info.m_name);
1779 BaseString::snprintf(buf2, sizeof(buf2), "Requested: %llu bytes",
1780 Uint64(save) * sizeof(GlobalPage));
1781 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1782 return 0;
1783 }
1784
1785 void
refresh_watch_dog(Uint32 place)1786 SimulatedBlock::refresh_watch_dog(Uint32 place)
1787 {
1788 #ifdef NDBD_MULTITHREADED
1789 (*m_watchDogCounter) = place;
1790 #else
1791 globalData.incrementWatchDogCounter(place);
1792 #endif
1793 }
1794
1795 void
update_watch_dog_timer(Uint32 interval)1796 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1797 {
1798 extern EmulatorData globalEmulatorData;
1799 globalEmulatorData.theWatchDog->setCheckInterval(interval);
1800 }
1801
1802 void
progError(int line,int err_code,const char * extra) const1803 SimulatedBlock::progError(int line, int err_code, const char* extra) const {
1804 jamLine(line);
1805
1806 const char *aBlockName = getBlockName(number(), "VM Kernel");
1807
1808 // Pack status of interesting config variables
1809 // so that we can print them in error.log
1810 int magicStatus =
1811 (m_ctx.m_config.stopOnError()<<1) +
1812 (m_ctx.m_config.getInitialStart()<<2);
1813
1814 /* Add line number to block name */
1815 char buf[100];
1816 BaseString::snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x",
1817 aBlockName, line, magicStatus);
1818
1819 ErrorReporter::handleError(err_code, extra, buf);
1820
1821 }
1822
1823 void
infoEvent(const char * msg,...) const1824 SimulatedBlock::infoEvent(const char * msg, ...) const {
1825 if(msg == 0)
1826 return;
1827
1828 SignalT<25> signalT;
1829 signalT.theData[0] = NDB_LE_InfoEvent;
1830 char * buf = (char *)(signalT.theData+1);
1831
1832 va_list ap;
1833 va_start(ap, msg);
1834 BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1835 va_end(ap);
1836
1837 int len = strlen(buf) + 1;
1838 if(len > 96){
1839 len = 96;
1840 buf[95] = 0;
1841 }
1842
1843 /**
1844 * Init and put it into the job buffer
1845 */
1846 memset(&signalT.header, 0, sizeof(SignalHeader));
1847
1848 const Signal * signal = globalScheduler.getVMSignals();
1849 Uint32 tTrace = signal->header.theTrace;
1850 Uint32 tSignalId = signal->header.theSignalId;
1851
1852 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1853 signalT.header.theReceiversBlockNumber = CMVMI;
1854 signalT.header.theSendersBlockRef = reference();
1855 signalT.header.theTrace = tTrace;
1856 signalT.header.theSignalId = tSignalId;
1857 signalT.header.theLength = ((len+3)/4)+1;
1858
1859 #ifdef NDBD_MULTITHREADED
1860 sendlocal(m_threadId,
1861 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1862 #else
1863 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1864 signalT.m_sectionPtrI);
1865 #endif
1866 }
1867
1868 void
warningEvent(const char * msg,...) const1869 SimulatedBlock::warningEvent(const char * msg, ...) const {
1870 if(msg == 0)
1871 return;
1872
1873 SignalT<25> signalT;
1874 signalT.theData[0] = NDB_LE_WarningEvent;
1875 char * buf = (char *)(signalT.theData+1);
1876
1877 va_list ap;
1878 va_start(ap, msg);
1879 BaseString::vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4
1880 va_end(ap);
1881
1882 int len = strlen(buf) + 1;
1883 if(len > 96){
1884 len = 96;
1885 buf[95] = 0;
1886 }
1887
1888 /**
1889 * Init and put it into the job buffer
1890 */
1891 memset(&signalT.header, 0, sizeof(SignalHeader));
1892
1893 const Signal * signal = globalScheduler.getVMSignals();
1894 Uint32 tTrace = signal->header.theTrace;
1895 Uint32 tSignalId = signal->header.theSignalId;
1896
1897 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1898 signalT.header.theReceiversBlockNumber = CMVMI;
1899 signalT.header.theSendersBlockRef = reference();
1900 signalT.header.theTrace = tTrace;
1901 signalT.header.theSignalId = tSignalId;
1902 signalT.header.theLength = ((len+3)/4)+1;
1903
1904 #ifdef NDBD_MULTITHREADED
1905 sendlocal(m_threadId,
1906 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1907 #else
1908 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1909 signalT.m_sectionPtrI);
1910 #endif
1911 }
1912
1913 void
execNODE_STATE_REP(Signal * signal)1914 SimulatedBlock::execNODE_STATE_REP(Signal* signal){
1915 const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0];
1916
1917 this->theNodeState = rep->nodeState;
1918 }
1919
1920 void
execCHANGE_NODE_STATE_REQ(Signal * signal)1921 SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){
1922 const ChangeNodeStateReq * const req =
1923 (ChangeNodeStateReq *)&signal->theData[0];
1924
1925 this->theNodeState = req->nodeState;
1926 const Uint32 senderData = req->senderData;
1927 const BlockReference senderRef = req->senderRef;
1928
1929 /**
1930 * Pack return signal
1931 */
1932 ChangeNodeStateConf * const conf =
1933 (ChangeNodeStateConf *)&signal->theData[0];
1934
1935 conf->senderData = senderData;
1936
1937 sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
1938 ChangeNodeStateConf::SignalLength, JBB);
1939 }
1940
1941 void
execNDB_TAMPER(Signal * signal)1942 SimulatedBlock::execNDB_TAMPER(Signal * signal){
1943 if (signal->getLength() == 1)
1944 {
1945 SET_ERROR_INSERT_VALUE(signal->theData[0]);
1946 }
1947 else
1948 {
1949 SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
1950 }
1951 }
1952
1953 void
execSIGNAL_DROPPED_REP(Signal * signal)1954 SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){
1955 /* Note no need for fragmented signal handling as we are
1956 * going to crash this node
1957 */
1958 char msg[64];
1959 const SignalDroppedRep * const rep = (SignalDroppedRep *)&signal->theData[0];
1960 BaseString::snprintf(msg, sizeof(msg), "%s GSN: %u (%u,%u)", getBlockName(number()),
1961 rep->originalGsn, rep->originalLength,rep->originalSectionCount);
1962 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
1963 msg,
1964 __FILE__,
1965 NST_ErrorHandler);
1966 }
1967
1968 void
execCONTINUE_FRAGMENTED(Signal * signal)1969 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
1970 ljamEntry();
1971
1972 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
1973 ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
1974
1975 switch (sig->type)
1976 {
1977 case ContinueFragmented::CONTINUE_SENDING :
1978 {
1979 ljam();
1980 Ptr<FragmentSendInfo> fragPtr;
1981
1982 c_segmentedFragmentSendList.first(fragPtr);
1983 for(; !fragPtr.isNull();){
1984 ljam();
1985 Ptr<FragmentSendInfo> copyPtr = fragPtr;
1986 c_segmentedFragmentSendList.next(fragPtr);
1987
1988 sendNextSegmentedFragment(signal, * copyPtr.p);
1989 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
1990 ljam();
1991 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
1992 ljam();
1993 execute(signal, copyPtr.p->m_callback, 0);
1994 }//if
1995 c_segmentedFragmentSendList.release(copyPtr);
1996 }
1997 }
1998
1999 c_linearFragmentSendList.first(fragPtr);
2000 for(; !fragPtr.isNull();){
2001 ljam();
2002 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2003 c_linearFragmentSendList.next(fragPtr);
2004
2005 sendNextLinearFragment(signal, * copyPtr.p);
2006 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2007 ljam();
2008 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2009 ljam();
2010 execute(signal, copyPtr.p->m_callback, 0);
2011 }//if
2012 c_linearFragmentSendList.release(copyPtr);
2013 }
2014 }
2015
2016 if(c_segmentedFragmentSendList.isEmpty() &&
2017 c_linearFragmentSendList.isEmpty()){
2018 ljam();
2019 c_fragSenderRunning = false;
2020 return;
2021 }
2022
2023 sig->type = ContinueFragmented::CONTINUE_SENDING;
2024 sig->line = __LINE__;
2025 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2026 break;
2027 }
2028 case ContinueFragmented::CONTINUE_CLEANUP:
2029 {
2030 ljam();
2031
2032 const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2033 /* Check length of signal */
2034 ndbassert(signal->getLength() ==
2035 ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2036 callbackWords);
2037
2038 Callback cb;
2039 memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2040
2041 doNodeFailureCleanup(signal,
2042 sig->cleanup.failedNodeId,
2043 sig->cleanup.resource,
2044 sig->cleanup.cursor,
2045 sig->cleanup.elementsCleaned,
2046 cb);
2047 break;
2048 }
2049 default:
2050 ndbrequire(false);
2051 }
2052 }
2053
2054 void
execSTOP_FOR_CRASH(Signal * signal)2055 SimulatedBlock::execSTOP_FOR_CRASH(Signal* signal)
2056 {
2057 #ifdef NDBD_MULTITHREADED
2058 mt_execSTOP_FOR_CRASH();
2059 #endif
2060 }
2061
2062 void
execNODE_START_REP(Signal * signal)2063 SimulatedBlock::execNODE_START_REP(Signal* signal)
2064 {
2065 }
2066
2067 void
execAPI_START_REP(Signal * signal)2068 SimulatedBlock::execAPI_START_REP(Signal* signal)
2069 {
2070 }
2071
2072 void
execSEND_PACKED(Signal * signal)2073 SimulatedBlock::execSEND_PACKED(Signal* signal)
2074 {
2075 }
2076
2077 // MT LQH callback CONF via signal
2078
2079 const SimulatedBlock::CallbackEntry&
getCallbackEntry(Uint32 ci)2080 SimulatedBlock::getCallbackEntry(Uint32 ci)
2081 {
2082 ndbrequire(m_callbackTableAddr != 0);
2083 const CallbackTable& ct = *m_callbackTableAddr;
2084 ndbrequire(ci < ct.m_count);
2085 return ct.m_entry[ci];
2086 }
2087
2088 void
sendCallbackConf(Signal * signal,Uint32 fullBlockNo,CallbackPtr & cptr,Uint32 returnCode)2089 SimulatedBlock::sendCallbackConf(Signal* signal, Uint32 fullBlockNo,
2090 CallbackPtr& cptr, Uint32 returnCode)
2091 {
2092 Uint32 blockNo = blockToMain(fullBlockNo);
2093 Uint32 instanceNo = blockToInstance(fullBlockNo);
2094 SimulatedBlock* b = globalData.getBlock(blockNo, instanceNo);
2095 ndbrequire(b != 0);
2096
2097 const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2098
2099 // wl4391_todo add as arg if this is not enough
2100 Uint32 senderData = returnCode;
2101
2102 if (!isNdbMtLqh()) {
2103 Callback c;
2104 c.m_callbackFunction = ce.m_function;
2105 c.m_callbackData = cptr.m_callbackData;
2106 b->execute(signal, c, returnCode);
2107
2108 if (ce.m_flags & CALLBACK_ACK) {
2109 jam();
2110 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2111 ack->senderData = senderData;
2112 EXECUTE_DIRECT(number(), GSN_CALLBACK_ACK,
2113 signal, CallbackAck::SignalLength);
2114 }
2115 } else {
2116 CallbackConf* conf = (CallbackConf*)signal->getDataPtrSend();
2117 conf->senderData = senderData;
2118 conf->senderRef = reference();
2119 conf->callbackIndex = cptr.m_callbackIndex;
2120 conf->callbackData = cptr.m_callbackData;
2121 conf->returnCode = returnCode;
2122
2123 if (ce.m_flags & CALLBACK_DIRECT) {
2124 jam();
2125 EXECUTE_DIRECT(blockNo, GSN_CALLBACK_CONF,
2126 signal, CallbackConf::SignalLength, instanceNo);
2127 } else {
2128 jam();
2129 BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2130 sendSignal(ref, GSN_CALLBACK_CONF,
2131 signal, CallbackConf::SignalLength, JBB);
2132 }
2133 }
2134 cptr.m_callbackIndex = ZNIL;
2135 }
2136
2137 void
execCALLBACK_CONF(Signal * signal)2138 SimulatedBlock::execCALLBACK_CONF(Signal* signal)
2139 {
2140 const CallbackConf* conf = (const CallbackConf*)signal->getDataPtr();
2141
2142 Uint32 senderData = conf->senderData;
2143 Uint32 senderRef = conf->senderRef;
2144
2145 ndbrequire(m_callbackTableAddr != 0);
2146 const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
2147 CallbackFunction function = ce.m_function;
2148
2149 Callback callback;
2150 callback.m_callbackFunction = function;
2151 callback.m_callbackData = conf->callbackData;
2152 execute(signal, callback, conf->returnCode);
2153
2154 if (ce.m_flags & CALLBACK_ACK) {
2155 jam();
2156 CallbackAck* ack = (CallbackAck*)signal->getDataPtrSend();
2157 ack->senderData = senderData;
2158 sendSignal(senderRef, GSN_CALLBACK_ACK,
2159 signal, CallbackAck::SignalLength, JBB);
2160 }
2161 }
2162
2163 #ifdef VM_TRACE_TIME
2164 void
clearTimes()2165 SimulatedBlock::clearTimes() {
2166 for(Uint32 i = 0; i <= MAX_GSN; i++){
2167 m_timeTrace[i].cnt = 0;
2168 m_timeTrace[i].sum = 0;
2169 m_timeTrace[i].sub = 0;
2170 }
2171 }
2172
2173 void
printTimes(FILE * output)2174 SimulatedBlock::printTimes(FILE * output){
2175 fprintf(output, "-- %s --\n", getBlockName(number()));
2176 Uint64 sum = 0;
2177 for(Uint32 i = 0; i <= MAX_GSN; i++){
2178 Uint32 n = m_timeTrace[i].cnt;
2179 if(n != 0){
2180 double dn = n;
2181
2182 double avg = m_timeTrace[i].sum;
2183 double avg2 = avg - m_timeTrace[i].sub;
2184
2185 avg /= dn;
2186 avg2 /= dn;
2187
2188 fprintf(output,
2189 //name ; cnt ; loc ; acc
2190 "%s ; #%d ; %dus ; %dus ; %dms\n",
2191 getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2192 (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2193
2194 sum += (m_timeTrace[i].sum - m_timeTrace[i].sub);
2195 }
2196 }
2197 sum = (sum + 500)/ 1000;
2198 fprintf(output, "-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2199 fprintf(output, "\n");
2200 fflush(output);
2201 }
2202
2203 #endif
2204
FragmentInfo(Uint32 fragId,Uint32 sender)2205 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2206 m_fragmentId = fragId;
2207 m_senderRef = sender;
2208 m_sectionPtrI[0] = RNIL;
2209 m_sectionPtrI[1] = RNIL;
2210 m_sectionPtrI[2] = RNIL;
2211 }
2212
FragmentSendInfo()2213 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2214 {
2215 }
2216
2217 bool
assembleFragments(Signal * signal)2218 SimulatedBlock::assembleFragments(Signal * signal){
2219 Uint32 sigLen = signal->length() - 1;
2220 Uint32 fragId = signal->theData[sigLen];
2221 Uint32 fragInfo = signal->header.m_fragmentInfo;
2222 Uint32 senderRef = signal->getSendersBlockRef();
2223
2224 Uint32 *sectionPtr = signal->m_sectionPtrI;
2225
2226 if(fragInfo == 0){
2227 return true;
2228 }
2229
2230 const Uint32 secs = signal->header.m_noOfSections;
2231 const Uint32 * const secNos = &signal->theData[sigLen - secs];
2232
2233 if(fragInfo == 1){
2234 /**
2235 * First in train
2236 */
2237 Ptr<FragmentInfo> fragPtr;
2238 if(!c_fragmentInfoHash.seize(fragPtr)){
2239 ndbrequire(false);
2240 return false;
2241 }
2242
2243 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2244 c_fragmentInfoHash.add(fragPtr);
2245
2246 for(Uint32 i = 0; i<secs; i++){
2247 Uint32 sectionNo = secNos[i];
2248 ndbassert(sectionNo < 3);
2249 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[i];
2250 }
2251
2252 ndbassert(! fragPtr.p->isDropped() );
2253
2254 /**
2255 * Don't release allocated segments
2256 */
2257 signal->header.m_fragmentInfo = 0;
2258 signal->header.m_noOfSections = 0;
2259 return false;
2260 }
2261
2262 FragmentInfo key(fragId, senderRef);
2263 Ptr<FragmentInfo> fragPtr;
2264 if(c_fragmentInfoHash.find(fragPtr, key)){
2265
2266 /**
2267 * FragInfo == 2 or 3
2268 */
2269 if ( likely(! fragPtr.p->isDropped()) )
2270 {
2271 Uint32 i;
2272 for(i = 0; i<secs; i++){
2273 Uint32 sectionNo = secNos[i];
2274 ndbassert(sectionNo < 3);
2275 Uint32 sectionPtrI = sectionPtr[i];
2276 if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2277 linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2278 } else {
2279 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2280 }
2281 }
2282
2283 /**
2284 * fragInfo = 2
2285 */
2286 if(fragInfo == 2){
2287 signal->header.m_fragmentInfo = 0;
2288 signal->header.m_noOfSections = 0;
2289 return false;
2290 }
2291
2292 /**
2293 * fragInfo = 3
2294 */
2295 for(i = 0; i<3; i++){
2296 Uint32 ptrI = fragPtr.p->m_sectionPtrI[i];
2297 if(ptrI != RNIL){
2298 signal->m_sectionPtrI[i] = ptrI;
2299 } else {
2300 break;
2301 }
2302 }
2303
2304 signal->setLength(sigLen - secs);
2305 signal->header.m_noOfSections = i;
2306 signal->header.m_fragmentInfo = 0;
2307
2308 c_fragmentInfoHash.release(fragPtr);
2309 return true;
2310 }
2311 else
2312 {
2313 /* This fragmented signal has already had at least 1 fragment
2314 * dropped. We must release the received segments.
2315 */
2316 for (Uint32 i=0; i < secs; i++)
2317 releaseSection( sectionPtr[i] );
2318
2319 signal->header.m_fragmentInfo = 0;
2320 signal->header.m_noOfSections = 0;
2321
2322 /* FragInfo == 2
2323 * More fragments to come, keep waiting
2324 */
2325 if (fragInfo == 2)
2326 return false;
2327
2328 /* FragInfo == 3
2329 * That was the last fragment.
2330 * We're now ready for handling the dropped signal.
2331 */
2332 SignalDroppedRep * rep = (SignalDroppedRep*)signal->theData;
2333 Uint32 gsn = signal->header.theVerId_signalNumber;
2334 Uint32 len = signal->header.theLength;
2335 Uint32 newLen= (len > 22 ? 22 : len);
2336 memmove(rep->originalData, signal->theData, (4 * newLen));
2337 rep->originalGsn = gsn;
2338 rep->originalLength = len;
2339 rep->originalSectionCount = 0;
2340 signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2341 signal->header.theLength = newLen + 3;
2342 signal->header.m_noOfSections = 0;
2343 signal->header.m_fragmentInfo = 3;
2344
2345
2346 /**
2347 * NOTE: Don't use EXECUTE_DIRECT as it
2348 * sets sendersBlockRef to reference()
2349 */
2350 /* Perform dropped signal handling, in this thread, now */
2351 executeFunction(GSN_SIGNAL_DROPPED_REP, signal);
2352
2353 /* return false to caller - they should not process the signal */
2354 return false;
2355 } // else (isDropped())
2356 }
2357
2358 /**
2359 * Unable to find fragment
2360 */
2361 ndbrequire(false);
2362 return false;
2363 }
2364
2365 bool
assembleDroppedFragments(Signal * signal)2366 SimulatedBlock::assembleDroppedFragments(Signal* signal)
2367 {
2368 /* This method is called at the start of a SIGNAL_DROPPED_REP
2369 * handler when there is a chance that the dropped signal could
2370 * be part of a fragmented signal.
2371 * If the dropped signal was a fragmented signal, this
2372 * needs to be handled specially to ensure that fragments
2373 * of the signal are correctly dropped to avoid segment
2374 * leaks etc.
2375 * There are a number of cases :
2376 * 1) First fragment dropped (FragInfo=1)
2377 * All remaining fragments must be dropped when they
2378 * arrive. The Signal dropped report handler must be
2379 * executed when the last fragment has arrived.
2380 * 2) Middle fragment dropped (FragInfo=2)
2381 * Any existing stored segments must be released.
2382 * All remaining fragments must be dropped when they
2383 * arrive.
2384 * 3) Last fragment dropped (FragInfo=3)
2385 * Any existing stored segments must be released.
2386 * Signal Dropped handling can occur, so return true.
2387 *
2388 * To indicate that a fragment has been dropped for a signal,
2389 * all the section I Values in the fragment's hash entry are
2390 * set to RNIL.
2391 * Signal Dropped Report handling is performed when the last
2392 * fragment arrives. If the last fragment is not dropped
2393 * by the transporter layer then normal fragment assembly
2394 * arranges for dropped signal handling to occur.
2395 */
2396 Uint32 sigLen = signal->length() - 1;
2397 Uint32 fragId = signal->theData[sigLen];
2398 Uint32 fragInfo = signal->header.m_fragmentInfo;
2399 Uint32 senderRef = signal->getSendersBlockRef();
2400
2401 if(fragInfo == 0){
2402 return true;
2403 }
2404
2405 /* This method is for handling SIGNAL_DROPPED_REP only */
2406 ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2407 ndbrequire(signal->header.m_noOfSections == 0);
2408
2409 if(fragInfo == 1){
2410 /**
2411 * First in train
2412 */
2413 Ptr<FragmentInfo> fragPtr;
2414 if(!c_fragmentInfoHash.seize(fragPtr)){
2415 ndbrequire(false);
2416 return false;
2417 }
2418
2419 new (fragPtr.p)FragmentInfo(fragId, senderRef);
2420 c_fragmentInfoHash.add(fragPtr);
2421
2422 /* Mark entry in hash as belonging to dropped signal so subsequent
2423 * fragments can also be dropped
2424 */
2425 fragPtr.p->m_sectionPtrI[0]= RNIL;
2426 fragPtr.p->m_sectionPtrI[1]= RNIL;
2427 fragPtr.p->m_sectionPtrI[2]= RNIL;
2428
2429 /* Wait for last fragment before SignalDroppedRep handling */
2430 signal->header.m_fragmentInfo = 0;
2431 return false;
2432 }
2433
2434 FragmentInfo key(fragId, senderRef);
2435 Ptr<FragmentInfo> fragPtr;
2436 if(c_fragmentInfoHash.find(fragPtr, key)){
2437
2438 /**
2439 * FragInfo == 2 or 3
2440 */
2441 if (! fragPtr.p->isDropped() )
2442 {
2443 /* Fragmented Signal not already marked as dropped
2444 * Need to free stored segments
2445 */
2446 releaseSection(fragPtr.p->m_sectionPtrI[0]);
2447 releaseSection(fragPtr.p->m_sectionPtrI[1]);
2448 releaseSection(fragPtr.p->m_sectionPtrI[2]);
2449
2450 /* Mark as dropped now */
2451 fragPtr.p->m_sectionPtrI[0]= RNIL;
2452 fragPtr.p->m_sectionPtrI[1]= RNIL;
2453 fragPtr.p->m_sectionPtrI[2]= RNIL;
2454
2455 ndbassert( fragPtr.p->isDropped() );
2456 }
2457
2458 /**
2459 * fragInfo = 2
2460 * Still waiting for final fragments.
2461 * Return false to caller.
2462 */
2463 if(fragInfo == 2){
2464 signal->header.m_fragmentInfo = 0;
2465 return false;
2466 }
2467
2468 /**
2469 * fragInfo = 3
2470 * All fragments received, remove entry
2471 * from hash and return to caller for
2472 * dropped signal handling.
2473 */
2474 signal->header.m_fragmentInfo = 0;
2475
2476 c_fragmentInfoHash.release(fragPtr);
2477 return true;
2478 }
2479
2480 /**
2481 * Unable to find fragment
2482 */
2483 ndbrequire(false);
2484 return false;
2485 }
2486
2487 /**
2488 * doCleanupFragInfo
2489 * Iterate over block's Fragment assembly hash, looking
2490 * for in-assembly fragments from the failed node
2491 * Release these
2492 * Returns after each scanned bucket to avoid consuming
2493 * too much time.
2494 *
2495 * Parameters
2496 * failedNodeId : Node id of failed node
2497 * cursor : Hash bucket to start iteration from
2498 * rtUnitsUsed : Total rt units used
2499 * elementsCleaned : Number of elements cleaned
2500 *
2501 * Updates
2502 * cursor : Hash bucket to continue iteration from
2503 * rtUnitsUsed : += units used
2504 * elementsCleaned : += elements cleaned
2505 *
2506 * Returns
2507 * true if all FragInfo structs cleaned up
2508 * false if more to do
2509 */
2510 bool
doCleanupFragInfo(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2511 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2512 Uint32& cursor,
2513 Uint32& rtUnitsUsed,
2514 Uint32& elementsCleaned)
2515 {
2516 ljam();
2517 DLHashTable<FragmentInfo>::Iterator iter;
2518
2519 c_fragmentInfoHash.next(cursor, iter);
2520
2521 const Uint32 startBucket = iter.bucket;
2522
2523 while (!iter.isNull() &&
2524 (iter.bucket == startBucket))
2525 {
2526 ljam();
2527
2528 Ptr<FragmentInfo> curr = iter.curr;
2529 c_fragmentInfoHash.next(iter);
2530
2531 FragmentInfo* fragInfo = curr.p;
2532
2533 if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2534 {
2535 ljam();
2536 /* We were assembling a fragmented signal from the
2537 * failed node, discard the partially assembled
2538 * sections and free the FragmentInfo hash entry
2539 */
2540 for(Uint32 s = 0; s<3; s++)
2541 {
2542 if (fragInfo->m_sectionPtrI[s] != RNIL)
2543 {
2544 ljam();
2545 SegmentedSectionPtr ssptr;
2546 getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2547 release(ssptr);
2548 }
2549 }
2550
2551 /* Release FragmentInfo hash element */
2552 c_fragmentInfoHash.release(curr);
2553
2554 elementsCleaned++;
2555 rtUnitsUsed+=3;
2556 }
2557
2558 rtUnitsUsed++;
2559 } // while
2560
2561 cursor = iter.bucket;
2562 return iter.isNull();
2563 }
2564
2565 bool
doCleanupFragSend(Uint32 failedNodeId,Uint32 & cursor,Uint32 & rtUnitsUsed,Uint32 & elementsCleaned)2566 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2567 Uint32& cursor,
2568 Uint32& rtUnitsUsed,
2569 Uint32& elementsCleaned)
2570 {
2571 ljam();
2572
2573 Ptr<FragmentSendInfo> fragPtr;
2574 const Uint32 NumSendLists = 2;
2575 ndbrequire(cursor < NumSendLists);
2576
2577 DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
2578 { &c_segmentedFragmentSendList,
2579 &c_linearFragmentSendList };
2580
2581 DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
2582
2583 list->first(fragPtr);
2584 for(; !fragPtr.isNull();){
2585 ljam();
2586 Ptr<FragmentSendInfo> copyPtr = fragPtr;
2587 list->next(fragPtr);
2588 rtUnitsUsed++;
2589
2590 NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
2591
2592 if (rg.m_nodes.get(failedNodeId))
2593 {
2594 ljam();
2595 /* Fragmented signal is being sent to node */
2596 rg.m_nodes.clear(failedNodeId);
2597
2598 if (rg.m_nodes.isclear())
2599 {
2600 ljam();
2601 /* No other nodes in receiver group - send
2602 * is cancelled
2603 * Will be cleaned up in the usual CONTINUE_FRAGMENTED
2604 * handling code.
2605 */
2606 copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2607 }
2608 elementsCleaned++;
2609 }
2610 }
2611
2612 /* Next time we'll do the next list */
2613 cursor++;
2614
2615 return (cursor == NumSendLists);
2616 }
2617
2618
2619 Uint32
doNodeFailureCleanup(Signal * signal,Uint32 failedNodeId,Uint32 resource,Uint32 cursor,Uint32 elementsCleaned,Callback & cb)2620 SimulatedBlock::doNodeFailureCleanup(Signal* signal,
2621 Uint32 failedNodeId,
2622 Uint32 resource,
2623 Uint32 cursor,
2624 Uint32 elementsCleaned,
2625 Callback& cb)
2626 {
2627 ljam();
2628 const bool userCallback = (cb.m_callbackFunction != 0);
2629 const Uint32 maxRtUnits = userCallback ?
2630 #ifdef VM_TRACE
2631 2 :
2632 #else
2633 16 :
2634 #endif
2635 ~0; /* Must complete all processing in this call */
2636
2637 Uint32 rtUnitsUsed = 0;
2638
2639 /* Loop over resources, cleaning them up */
2640 do
2641 {
2642 bool resourceDone= false;
2643 switch(resource) {
2644 case ContinueFragmented::RES_FRAGSEND:
2645 {
2646 ljam();
2647 resourceDone = doCleanupFragSend(failedNodeId, cursor,
2648 rtUnitsUsed, elementsCleaned);
2649 break;
2650 }
2651 case ContinueFragmented::RES_FRAGINFO:
2652 {
2653 ljam();
2654 resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2655 rtUnitsUsed, elementsCleaned);
2656 break;
2657 }
2658 case ContinueFragmented::RES_LAST:
2659 {
2660 ljam();
2661 /* Node failure processing complete, execute user callback if provided */
2662 if (userCallback)
2663 execute(signal, cb, elementsCleaned);
2664
2665 return elementsCleaned;
2666 }
2667 default:
2668 ndbrequire(false);
2669 }
2670
2671 /* Did we complete cleaning up this resource? */
2672 if (resourceDone)
2673 {
2674 resource++;
2675 cursor= 0;
2676 }
2677
2678 } while (rtUnitsUsed <= maxRtUnits);
2679
2680 ljam();
2681
2682 /* Not yet completed failure handling.
2683 * Must have exhausted RT units.
2684 * Update cursor and re-invoke
2685 */
2686 ndbassert(userCallback);
2687
2688 /* Send signal to continue processing */
2689
2690 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
2691 sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2692 sig->cleanup.failedNodeId = failedNodeId;
2693 sig->cleanup.resource = resource;
2694 sig->cleanup.cursor = cursor;
2695 sig->cleanup.elementsCleaned= elementsCleaned;
2696 Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
2697 Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2698 callbackWords;
2699 ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
2700 memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2701
2702 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2703
2704 return elementsCleaned;
2705 }
2706
2707 Uint32
simBlockNodeFailure(Signal * signal,Uint32 failedNodeId,Callback & cb)2708 SimulatedBlock::simBlockNodeFailure(Signal* signal,
2709 Uint32 failedNodeId,
2710 Callback& cb)
2711 {
2712 ljam();
2713 return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2714 }
2715
2716 Uint32
debugPrintFragmentCounts()2717 SimulatedBlock::debugPrintFragmentCounts()
2718 {
2719 const char* blockName = getBlockName(theNumber);
2720 DLHashTable<FragmentInfo>::Iterator iter;
2721 Uint32 fragmentInfoCount = 0;
2722 c_fragmentInfoHash.first(iter);
2723
2724 while(!iter.isNull())
2725 {
2726 fragmentInfoCount++;
2727 c_fragmentInfoHash.next(iter);
2728 }
2729
2730 Ptr<FragmentSendInfo> ptr;
2731 Uint32 linSendInfoCount = 0;
2732
2733 c_linearFragmentSendList.first(ptr);
2734
2735 while (!ptr.isNull())
2736 {
2737 linSendInfoCount++;
2738 c_linearFragmentSendList.next(ptr);
2739 }
2740
2741 Uint32 segSendInfoCount = 0;
2742 c_segmentedFragmentSendList.first(ptr);
2743
2744 while (!ptr.isNull())
2745 {
2746 segSendInfoCount++;
2747 c_segmentedFragmentSendList.next(ptr);
2748 }
2749
2750 ndbout_c("%s : Fragment assembly hash entry count : %d",
2751 blockName,
2752 fragmentInfoCount);
2753
2754 ndbout_c("%s : Linear fragment send list size : %d",
2755 blockName,
2756 linSendInfoCount);
2757
2758 ndbout_c("%s : Segmented fragment send list size : %d",
2759 blockName,
2760 segSendInfoCount);
2761
2762 return fragmentInfoCount +
2763 linSendInfoCount +
2764 segSendInfoCount;
2765 }
2766
2767
2768 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,bool noRelease,Uint32 messageSize)2769 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
2770 NodeReceiverGroup rg,
2771 GlobalSignalNumber gsn,
2772 Signal* signal,
2773 Uint32 length,
2774 JobBufferLevel jbuf,
2775 SectionHandle* sections,
2776 bool noRelease,
2777 Uint32 messageSize) {
2778
2779 Uint32 noSections = sections->m_cnt;
2780 SegmentedSectionPtr * ptr = sections->m_ptr;
2781
2782 info.m_sectionPtr[0].m_segmented.i = RNIL;
2783 info.m_sectionPtr[1].m_segmented.i = RNIL;
2784 info.m_sectionPtr[2].m_segmented.i = RNIL;
2785
2786 Uint32 totalSize = 0;
2787 switch(noSections){
2788 case 3:
2789 info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2790 info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2791 totalSize += ptr[2].sz;
2792 case 2:
2793 info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2794 info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2795 totalSize += ptr[1].sz;
2796 case 1:
2797 info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2798 info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2799 totalSize += ptr[0].sz;
2800 }
2801
2802 if(totalSize <= messageSize + SectionSegment::DataLength){
2803 /**
2804 * Send signal directly
2805 */
2806 if (noRelease)
2807 sendSignalNoRelease(rg, gsn, signal, length, jbuf, sections);
2808 else
2809 sendSignal(rg, gsn, signal, length, jbuf, sections);
2810
2811 info.m_status = FragmentSendInfo::SendComplete;
2812 return true;
2813 }
2814
2815 /**
2816 * Setup info object
2817 */
2818 info.m_status = FragmentSendInfo::SendNotComplete;
2819 info.m_prio = (Uint8)jbuf;
2820 info.m_gsn = gsn;
2821 info.m_fragInfo = 1;
2822 info.m_flags = 0;
2823 info.m_messageSize = messageSize;
2824 info.m_fragmentId = c_fragmentIdCounter++;
2825 info.m_nodeReceiverGroup = rg;
2826 info.m_callback.m_callbackFunction = 0;
2827
2828 if (noRelease)
2829 {
2830 /* Record info that we are not releasing segments */
2831 info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2832 }
2833 else
2834 {
2835 /**
2836 * Clear sections in caller's handle. Actual send
2837 * will consume them
2838 */
2839 sections->m_cnt = 0;
2840 }
2841
2842 /* Store main signal data in a segment for sending later */
2843 Ptr<SectionSegment> tmp;
2844 if(!import(tmp, &signal->theData[0], length))
2845 {
2846 handle_out_of_longsignal_memory(0);
2847 return false;
2848 }
2849 info.m_theDataSection.p = &tmp.p->theData[0];
2850 info.m_theDataSection.sz = length;
2851 tmp.p->theData[length] = tmp.i;
2852
2853 sendNextSegmentedFragment(signal, info);
2854
2855 if(c_fragmentIdCounter == 0){
2856 /**
2857 * Fragment id 0 is invalid
2858 */
2859 c_fragmentIdCounter = 1;
2860 }
2861
2862 return true;
2863 }
2864
2865 #if 0
2866 #define lsout(x) x
2867 #else
2868 #define lsout(x)
2869 #endif
2870
2871 void
sendNextSegmentedFragment(Signal * signal,FragmentSendInfo & info)2872 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
2873 FragmentSendInfo & info){
2874
2875 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2876 {
2877 /* Send was cancelled - all dest. nodes have failed
2878 * since send was started
2879 */
2880 if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2881 {
2882 /*
2883 * Free any sections still to be sent
2884 */
2885 SectionHandle handle(this);
2886 for (Uint32 s = 0; s < 3; s++)
2887 {
2888 Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2889 if (sectionI != RNIL)
2890 {
2891 getSection(handle.m_ptr[handle.m_cnt], sectionI);
2892 info.m_sectionPtr[s].m_segmented.i = RNIL;
2893 info.m_sectionPtr[s].m_segmented.p = NULL;
2894 handle.m_cnt++;
2895 }
2896 }
2897
2898 releaseSections(handle);
2899 }
2900
2901 /* Free inline signal data storage section */
2902 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2903 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
2904
2905 info.m_status = FragmentSendInfo::SendComplete;
2906 return;
2907 }
2908
2909 /**
2910 * Setup main signal data from stored copy
2911 */
2912 const Uint32 sigLen = info.m_theDataSection.sz;
2913 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
2914
2915 Uint32 sz = 0;
2916 Uint32 maxSz = info.m_messageSize;
2917
2918 Int32 secNo = 2;
2919 Uint32 secCount = 0;
2920 Uint32 * secNos = &signal->theData[sigLen];
2921
2922 SectionHandle sections(this);
2923 SegmentedSectionPtr *ptr = sections.m_ptr;
2924
2925 bool split= false;
2926 Uint32 splitSectionStartI= RNIL;
2927 SectionSegment* splitSectionStartP= NULL;
2928 Uint32 splitSectionLastSegment= RNIL;
2929 Uint32 splitSectionSz= 0;
2930
2931 enum { Unknown = 0, Full = 1 } loop = Unknown;
2932 for(; secNo >= 0 && secCount < 3; secNo--){
2933 Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
2934 if(ptrI == RNIL)
2935 continue;
2936
2937 info.m_sectionPtr[secNo].m_segmented.i = RNIL;
2938
2939 SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p;
2940 const Uint32 size = ptrP->m_sz;
2941
2942 ptr[secCount].i = ptrI;
2943 ptr[secCount].p = ptrP;
2944 ptr[secCount].sz = size;
2945 secNos[secCount] = secNo;
2946 secCount++;
2947
2948 const Uint32 sizeLeft = maxSz - sz;
2949 if(size <= sizeLeft){
2950 /**
2951 * The section fits
2952 */
2953 sz += size;
2954 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
2955 continue;
2956 }
2957
2958 const Uint32 overflow = size - sizeLeft; // > 0
2959 if(overflow <= SectionSegment::DataLength){
2960 /**
2961 * Only one segment left to send
2962 * send even if sizeLeft <= size
2963 */
2964 lsout(ndbout_c("section %d saved as %d but full over: %d",
2965 secNo, secCount-1, overflow));
2966 secNo--;
2967 break;
2968 }
2969
2970 // size >= 61
2971 if(sizeLeft < SectionSegment::DataLength){
2972 /**
2973 * Less than one segment left (space)
2974 * dont bother sending
2975 */
2976 secCount--;
2977 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
2978 loop = Full;
2979 lsout(ndbout_c("section %d not saved", secNo));
2980 break;
2981 }
2982
2983 /**
2984 * Split list
2985 * 1) Find place to split
2986 * 2) Rewrite header (the part that will be sent)
2987 * 3) Write new header (for remaining part)
2988 * 4) Store new header on FragmentSendInfo - record
2989 */
2990 // size >= 61 && sizeLeft >= 60
2991 Uint32 sum = SectionSegment::DataLength;
2992 Uint32 prevPtrI = ptrI;
2993 ptrI = ptrP->m_nextSegment;
2994 const Uint32 fill = sizeLeft - SectionSegment::DataLength;
2995 while(sum < fill){
2996 prevPtrI = ptrI;
2997 ptrP = g_sectionSegmentPool.getPtr(ptrI);
2998 ptrI = ptrP->m_nextSegment;
2999 sum += SectionSegment::DataLength;
3000 }
3001
3002 Uint32 prev = secCount - 1;
3003 /**
3004 * Record details of the section pre-split
3005 * This allows the split to be 'healed' afterwards in the
3006 * no release case.
3007 */
3008 split= true;
3009 splitSectionStartI= ptr[prev].i;
3010 splitSectionStartP= ptr[prev].p;
3011 splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3012 splitSectionSz= splitSectionStartP->m_sz;
3013
3014 /**
3015 * Rewrite header w.r.t size and last
3016 * This is what will be sent in this fragment.
3017 */
3018 splitSectionStartP->m_lastSegment = prevPtrI;
3019 splitSectionStartP->m_sz = sum;
3020 ptr[prev].sz = sum;
3021
3022 /**
3023 * Write "new" list header
3024 * This is what remains to be sent in this section
3025 */
3026 ptrP = g_sectionSegmentPool.getPtr(ptrI);
3027 ptrP->m_lastSegment = splitSectionLastSegment;
3028 ptrP->m_sz = size - sum;
3029
3030 /**
3031 * And store it on info-record
3032 */
3033 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3034 info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3035
3036 loop = Full;
3037 lsout(ndbout_c("section %d split into %d", secNo, prev));
3038 break;
3039 }
3040
3041 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3042 loop, secNo, secCount, sz));
3043
3044 /**
3045 * Store fragment id
3046 */
3047 secNos[secCount] = info.m_fragmentId;
3048
3049 Uint32 fragInfo = info.m_fragInfo;
3050 info.m_fragInfo = 2;
3051 switch(loop){
3052 case Unknown:
3053 if(secNo >= 0){
3054 lsout(ndbout_c("Unknown - Full"));
3055 /**
3056 * Not finished
3057 */
3058 break;
3059 }
3060 // Fall through
3061 lsout(ndbout_c("Unknown - Done"));
3062 info.m_status = FragmentSendInfo::SendComplete;
3063 ndbassert(fragInfo == 2);
3064 fragInfo = 3;
3065 case Full:
3066 break;
3067 }
3068
3069 signal->header.m_fragmentInfo = fragInfo;
3070 signal->header.m_noOfSections = 0;
3071 sections.m_cnt = secCount;
3072
3073 if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3074 {
3075 sendSignalNoRelease(info.m_nodeReceiverGroup,
3076 info.m_gsn,
3077 signal,
3078 sigLen + secCount + 1,
3079 (JobBufferLevel)info.m_prio,
3080 §ions);
3081 /* NoRelease leaves SectionHandle populated, we'll
3082 * clear it here. The actual sections themselves
3083 * remain allocated.
3084 */
3085 sections.m_cnt = 0;
3086
3087 if (split)
3088 {
3089 /* There was a split section, which required us to modify the
3090 * segment list.
3091 * Now restore the split section's segment list back to
3092 * its previous state
3093 * (Only really required for first segment, but we do
3094 * it for all of them, to be a good citizen)
3095 */
3096 ndbrequire( splitSectionStartI != RNIL );
3097 ndbrequire( splitSectionStartP != NULL );
3098 ndbrequire( splitSectionLastSegment != RNIL );
3099
3100 splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3101 splitSectionStartP->m_sz= splitSectionSz;
3102
3103 /* Check our handiwork */
3104 assert(verifySection(splitSectionStartI));
3105 }
3106 }
3107 else
3108 {
3109 /* Normal, release sections case */
3110 sendSignal(info.m_nodeReceiverGroup,
3111 info.m_gsn,
3112 signal,
3113 sigLen + secCount + 1,
3114 (JobBufferLevel)info.m_prio,
3115 §ions);
3116 }
3117
3118 if(fragInfo == 3){
3119 /**
3120 * This is the last signal
3121 * Release saved 'main signal' words segment
3122 */
3123 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3124 }
3125 }
3126
3127 bool
sendFirstFragment(FragmentSendInfo & info,NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Uint32 messageSize)3128 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
3129 NodeReceiverGroup rg,
3130 GlobalSignalNumber gsn,
3131 Signal* signal,
3132 Uint32 length,
3133 JobBufferLevel jbuf,
3134 LinearSectionPtr ptr[3],
3135 Uint32 noOfSections,
3136 Uint32 messageSize){
3137
3138 check_sections(signal, signal->header.m_noOfSections, noOfSections);
3139
3140 info.m_sectionPtr[0].m_linear.p = NULL;
3141 info.m_sectionPtr[1].m_linear.p = NULL;
3142 info.m_sectionPtr[2].m_linear.p = NULL;
3143
3144 Uint32 totalSize = 0;
3145 switch(noOfSections){
3146 case 3:
3147 info.m_sectionPtr[2].m_linear = ptr[2];
3148 totalSize += ptr[2].sz;
3149 case 2:
3150 info.m_sectionPtr[1].m_linear = ptr[1];
3151 totalSize += ptr[1].sz;
3152 case 1:
3153 info.m_sectionPtr[0].m_linear = ptr[0];
3154 totalSize += ptr[0].sz;
3155 }
3156
3157 if(totalSize <= messageSize + SectionSegment::DataLength){
3158 /**
3159 * Send signal directly
3160 */
3161 sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3162 info.m_status = FragmentSendInfo::SendComplete;
3163
3164 /**
3165 * Indicate to sendLinearSignalFragment
3166 * that we'r already done
3167 */
3168 return true;
3169 }
3170
3171 /**
3172 * Setup info object
3173 */
3174 info.m_status = FragmentSendInfo::SendNotComplete;
3175 info.m_prio = (Uint8)jbuf;
3176 info.m_gsn = gsn;
3177 info.m_messageSize = messageSize;
3178 info.m_fragInfo = 1;
3179 info.m_flags = 0;
3180 info.m_fragmentId = c_fragmentIdCounter++;
3181 info.m_nodeReceiverGroup = rg;
3182 info.m_callback.m_callbackFunction = 0;
3183
3184 Ptr<SectionSegment> tmp;
3185 if(unlikely(!import(tmp, &signal->theData[0], length)))
3186 {
3187 handle_out_of_longsignal_memory(0);
3188 return false;
3189 }
3190
3191 info.m_theDataSection.p = &tmp.p->theData[0];
3192 info.m_theDataSection.sz = length;
3193 tmp.p->theData[length] = tmp.i;
3194
3195 sendNextLinearFragment(signal, info);
3196
3197 if(c_fragmentIdCounter == 0){
3198 /**
3199 * Fragment id 0 is invalid
3200 */
3201 c_fragmentIdCounter = 1;
3202 }
3203
3204 return true;
3205 }
3206
3207 void
sendNextLinearFragment(Signal * signal,FragmentSendInfo & info)3208 SimulatedBlock::sendNextLinearFragment(Signal* signal,
3209 FragmentSendInfo & info){
3210
3211 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3212 {
3213 /* Send was cancelled - all dest. nodes have failed
3214 * since send was started
3215 */
3216 /* Free inline signal data storage section */
3217 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3218 g_sectionSegmentPool.release(SB_SP_REL_ARG inlineDataI);
3219
3220 info.m_status = FragmentSendInfo::SendComplete;
3221 return;
3222 }
3223
3224 /**
3225 * Store "theData"
3226 */
3227 const Uint32 sigLen = info.m_theDataSection.sz;
3228 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3229
3230 Uint32 sz = 0;
3231 Uint32 maxSz = info.m_messageSize;
3232
3233 Int32 secNo = 2;
3234 Uint32 secCount = 0;
3235 Uint32 * secNos = &signal->theData[sigLen];
3236 LinearSectionPtr signalPtr[3];
3237
3238 enum { Unknown = 0, Full = 2 } loop = Unknown;
3239 for(; secNo >= 0 && secCount < 3; secNo--){
3240 Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3241 if(ptrP == NULL)
3242 continue;
3243
3244 info.m_sectionPtr[secNo].m_linear.p = NULL;
3245 const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3246
3247 signalPtr[secCount].p = ptrP;
3248 signalPtr[secCount].sz = size;
3249 secNos[secCount] = secNo;
3250 secCount++;
3251
3252 const Uint32 sizeLeft = maxSz - sz;
3253 if(size <= sizeLeft){
3254 /**
3255 * The section fits
3256 */
3257 sz += size;
3258 lsout(ndbout_c("section %d saved as %d", secNo, secCount-1));
3259 continue;
3260 }
3261
3262 const Uint32 overflow = size - sizeLeft; // > 0
3263 if(overflow <= SectionSegment::DataLength){
3264 /**
3265 * Only one segment left to send
3266 * send even if sizeLeft <= size
3267 */
3268 lsout(ndbout_c("section %d saved as %d but full over: %d",
3269 secNo, secCount-1, overflow));
3270 secNo--;
3271 break;
3272 }
3273
3274 // size >= 61
3275 if(sizeLeft < SectionSegment::DataLength){
3276 /**
3277 * Less than one segment left (space)
3278 * dont bother sending
3279 */
3280 secCount--;
3281 info.m_sectionPtr[secNo].m_linear.p = ptrP;
3282 loop = Full;
3283 lsout(ndbout_c("section %d not saved", secNo));
3284 break;
3285 }
3286
3287 /**
3288 * Split list
3289 * 1) Find place to split
3290 * 2) Rewrite header (the part that will be sent)
3291 * 3) Write new header (for remaining part)
3292 * 4) Store new header on FragmentSendInfo - record
3293 */
3294 Uint32 sum = sizeLeft;
3295 sum /= SectionSegment::DataLength;
3296 sum *= SectionSegment::DataLength;
3297
3298 /**
3299 * Rewrite header w.r.t size
3300 */
3301 Uint32 prev = secCount - 1;
3302 signalPtr[prev].sz = sum;
3303
3304 /**
3305 * Write/store "new" header
3306 */
3307 info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3308 info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3309
3310 loop = Full;
3311 lsout(ndbout_c("section %d split into %d", secNo, prev));
3312 break;
3313 }
3314
3315 lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d",
3316 loop, secNo, secCount, sz));
3317
3318 /**
3319 * Store fragment id
3320 */
3321 secNos[secCount] = info.m_fragmentId;
3322
3323 Uint32 fragInfo = info.m_fragInfo;
3324 info.m_fragInfo = 2;
3325 switch(loop){
3326 case Unknown:
3327 if(secNo >= 0){
3328 lsout(ndbout_c("Unknown - Full"));
3329 /**
3330 * Not finished
3331 */
3332 break;
3333 }
3334 // Fall through
3335 lsout(ndbout_c("Unknown - Done"));
3336 info.m_status = FragmentSendInfo::SendComplete;
3337 ndbassert(fragInfo == 2);
3338 fragInfo = 3;
3339 case Full:
3340 break;
3341 }
3342
3343 signal->header.m_noOfSections = 0;
3344 signal->header.m_fragmentInfo = fragInfo;
3345
3346 sendSignal(info.m_nodeReceiverGroup,
3347 info.m_gsn,
3348 signal,
3349 sigLen + secCount + 1,
3350 (JobBufferLevel)info.m_prio,
3351 signalPtr,
3352 secCount);
3353
3354 if(fragInfo == 3){
3355 /**
3356 * This is the last signal
3357 */
3358 g_sectionSegmentPool.release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3359 }
3360 }
3361
3362 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3363 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3364 GlobalSignalNumber gsn,
3365 Signal* signal,
3366 Uint32 length,
3367 JobBufferLevel jbuf,
3368 SectionHandle* sections,
3369 Callback & c,
3370 Uint32 messageSize){
3371 bool res = true;
3372 Ptr<FragmentSendInfo> tmp;
3373 res = c_segmentedFragmentSendList.seize(tmp);
3374 ndbrequire(res);
3375
3376 res = sendFirstFragment(* tmp.p,
3377 NodeReceiverGroup(ref),
3378 gsn,
3379 signal,
3380 length,
3381 jbuf,
3382 sections,
3383 false, // Release sections on send
3384 messageSize);
3385 ndbrequire(res);
3386
3387 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3388 c_segmentedFragmentSendList.release(tmp);
3389 if(c.m_callbackFunction != 0)
3390 execute(signal, c, 0);
3391 return;
3392 }
3393 tmp.p->m_callback = c;
3394
3395 if(!c_fragSenderRunning)
3396 {
3397 SaveSignal<2> save(signal);
3398 c_fragSenderRunning = true;
3399 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3400 sig->type = ContinueFragmented::CONTINUE_SENDING;
3401 sig->line = __LINE__;
3402 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3403 }
3404 }
3405
3406 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,SectionHandle * sections,Callback & c,Uint32 messageSize)3407 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3408 GlobalSignalNumber gsn,
3409 Signal* signal,
3410 Uint32 length,
3411 JobBufferLevel jbuf,
3412 SectionHandle * sections,
3413 Callback & c,
3414 Uint32 messageSize){
3415 bool res = true;
3416 Ptr<FragmentSendInfo> tmp;
3417 res = c_segmentedFragmentSendList.seize(tmp);
3418 ndbrequire(res);
3419
3420 res = sendFirstFragment(* tmp.p,
3421 rg,
3422 gsn,
3423 signal,
3424 length,
3425 jbuf,
3426 sections,
3427 false, // Release sections on send
3428 messageSize);
3429 ndbrequire(res);
3430
3431 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3432 c_segmentedFragmentSendList.release(tmp);
3433 if(c.m_callbackFunction != 0)
3434 execute(signal, c, 0);
3435 return;
3436 }
3437 tmp.p->m_callback = c;
3438
3439 if(!c_fragSenderRunning)
3440 {
3441 SaveSignal<2> save(signal);
3442 c_fragSenderRunning = true;
3443 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3444 sig->type = ContinueFragmented::CONTINUE_SENDING;
3445 sig->line = __LINE__;
3446 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3447 }
3448 }
3449
3450 SimulatedBlock::Callback SimulatedBlock::TheEmptyCallback = {0, 0};
3451 void
TheNULLCallbackFunction(class Signal *,Uint32,Uint32)3452 SimulatedBlock::TheNULLCallbackFunction(class Signal*, Uint32, Uint32)
3453 { abort(); /* should never be called */ }
3454 SimulatedBlock::Callback SimulatedBlock::TheNULLCallback =
3455 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3456
3457 void
sendFragmentedSignal(BlockReference ref,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3458 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3459 GlobalSignalNumber gsn,
3460 Signal* signal,
3461 Uint32 length,
3462 JobBufferLevel jbuf,
3463 LinearSectionPtr ptr[3],
3464 Uint32 noOfSections,
3465 Callback & c,
3466 Uint32 messageSize){
3467 bool res = true;
3468 Ptr<FragmentSendInfo> tmp;
3469 res = c_linearFragmentSendList.seize(tmp);
3470 ndbrequire(res);
3471
3472 res = sendFirstFragment(* tmp.p,
3473 NodeReceiverGroup(ref),
3474 gsn,
3475 signal,
3476 length,
3477 jbuf,
3478 ptr,
3479 noOfSections,
3480 messageSize);
3481 ndbrequire(res);
3482
3483 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3484 c_linearFragmentSendList.release(tmp);
3485 if(c.m_callbackFunction != 0)
3486 execute(signal, c, 0);
3487 return;
3488 }
3489 tmp.p->m_callback = c;
3490
3491 if(!c_fragSenderRunning)
3492 {
3493 SaveSignal<2> save(signal);
3494 c_fragSenderRunning = true;
3495 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3496 sig->type = ContinueFragmented::CONTINUE_SENDING;
3497 sig->line = __LINE__;
3498 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3499 }
3500 }
3501
3502 void
sendFragmentedSignal(NodeReceiverGroup rg,GlobalSignalNumber gsn,Signal * signal,Uint32 length,JobBufferLevel jbuf,LinearSectionPtr ptr[3],Uint32 noOfSections,Callback & c,Uint32 messageSize)3503 SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg,
3504 GlobalSignalNumber gsn,
3505 Signal* signal,
3506 Uint32 length,
3507 JobBufferLevel jbuf,
3508 LinearSectionPtr ptr[3],
3509 Uint32 noOfSections,
3510 Callback & c,
3511 Uint32 messageSize){
3512 bool res = true;
3513 Ptr<FragmentSendInfo> tmp;
3514 res = c_linearFragmentSendList.seize(tmp);
3515 ndbrequire(res);
3516
3517 res = sendFirstFragment(* tmp.p,
3518 rg,
3519 gsn,
3520 signal,
3521 length,
3522 jbuf,
3523 ptr,
3524 noOfSections,
3525 messageSize);
3526 ndbrequire(res);
3527
3528 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3529 c_linearFragmentSendList.release(tmp);
3530 if(c.m_callbackFunction != 0)
3531 execute(signal, c, 0);
3532 return;
3533 }
3534 tmp.p->m_callback = c;
3535
3536 if(!c_fragSenderRunning)
3537 {
3538 SaveSignal<2> save(signal);
3539 c_fragSenderRunning = true;
3540 ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
3541 sig->type = ContinueFragmented::CONTINUE_SENDING;
3542 sig->line = __LINE__;
3543 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3544 }
3545 }
3546
3547 NodeInfo &
setNodeInfo(NodeId nodeId)3548 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3549 ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3550 return globalData.m_nodeInfo[nodeId];
3551 }
3552
3553 bool
isMultiThreaded()3554 SimulatedBlock::isMultiThreaded()
3555 {
3556 #ifdef NDBD_MULTITHREADED
3557 return true;
3558 #else
3559 return false;
3560 #endif
3561 }
3562
3563
3564 void
execUTIL_CREATE_LOCK_REF(Signal * signal)3565 SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){
3566 ljamEntry();
3567 c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3568 }
3569
execUTIL_CREATE_LOCK_CONF(Signal * signal)3570 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){
3571 ljamEntry();
3572 c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3573 }
3574
execUTIL_DESTORY_LOCK_REF(Signal * signal)3575 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){
3576 ljamEntry();
3577 c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3578 }
3579
execUTIL_DESTORY_LOCK_CONF(Signal * signal)3580 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){
3581 ljamEntry();
3582 c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3583 }
3584
execUTIL_LOCK_REF(Signal * signal)3585 void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){
3586 ljamEntry();
3587 c_mutexMgr.execUTIL_LOCK_REF(signal);
3588 }
3589
execUTIL_LOCK_CONF(Signal * signal)3590 void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){
3591 ljamEntry();
3592 c_mutexMgr.execUTIL_LOCK_CONF(signal);
3593 }
3594
execUTIL_UNLOCK_REF(Signal * signal)3595 void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){
3596 ljamEntry();
3597 c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3598 }
3599
execUTIL_UNLOCK_CONF(Signal * signal)3600 void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){
3601 ljamEntry();
3602 c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3603 }
3604
3605 void
ignoreMutexUnlockCallback(Signal * signal,Uint32 ptrI,Uint32 retVal)3606 SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal,
3607 Uint32 ptrI, Uint32 retVal){
3608 c_mutexMgr.release(ptrI);
3609 }
3610
3611 void
fsRefError(Signal * signal,Uint32 line,const char * msg)3612 SimulatedBlock::fsRefError(Signal* signal, Uint32 line, const char *msg)
3613 {
3614 const FsRef *fsRef = (FsRef*)signal->getDataPtr();
3615 Uint32 errorCode = fsRef->errorCode;
3616 Uint32 osErrorCode = fsRef->osErrorCode;
3617 char msg2[100];
3618
3619 sprintf(msg2, "%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3620
3621 progError(line, errorCode, msg2);
3622 }
3623
3624 void
execFSWRITEREF(Signal * signal)3625 SimulatedBlock::execFSWRITEREF(Signal* signal)
3626 {
3627 fsRefError(signal, __LINE__, "File system write failed");
3628 }
3629
3630 void
execFSREADREF(Signal * signal)3631 SimulatedBlock::execFSREADREF(Signal* signal)
3632 {
3633 fsRefError(signal, __LINE__, "File system read failed");
3634 }
3635
3636 void
execFSCLOSEREF(Signal * signal)3637 SimulatedBlock::execFSCLOSEREF(Signal* signal)
3638 {
3639 fsRefError(signal, __LINE__, "File system close failed");
3640 }
3641
3642 void
execFSOPENREF(Signal * signal)3643 SimulatedBlock::execFSOPENREF(Signal* signal)
3644 {
3645 fsRefError(signal, __LINE__, "File system open failed");
3646 }
3647
3648 void
execFSREMOVEREF(Signal * signal)3649 SimulatedBlock::execFSREMOVEREF(Signal* signal)
3650 {
3651 fsRefError(signal, __LINE__, "File system remove failed");
3652 }
3653
3654 void
execFSSYNCREF(Signal * signal)3655 SimulatedBlock::execFSSYNCREF(Signal* signal)
3656 {
3657 fsRefError(signal, __LINE__, "File system sync failed");
3658 }
3659
3660 void
execFSAPPENDREF(Signal * signal)3661 SimulatedBlock::execFSAPPENDREF(Signal* signal)
3662 {
3663 fsRefError(signal, __LINE__, "File system append failed");
3664 }
3665
3666 #ifdef VM_TRACE
3667 static Ptr<void> * m_empty_global_variables[] = { 0 };
3668 void
disable_global_variables()3669 SimulatedBlock::disable_global_variables()
3670 {
3671 m_global_variables_save = m_global_variables;
3672 m_global_variables = m_empty_global_variables;
3673 }
3674
3675 void
enable_global_variables()3676 SimulatedBlock::enable_global_variables()
3677 {
3678 if (m_global_variables == m_empty_global_variables)
3679 {
3680 m_global_variables = m_global_variables_save;
3681 }
3682 }
3683
3684 void
clear_global_variables()3685 SimulatedBlock::clear_global_variables(){
3686 Ptr<void> ** tmp = m_global_variables;
3687 while(* tmp != 0){
3688 (* tmp)->i = RNIL;
3689 (* tmp)->p = 0;
3690 tmp++;
3691 }
3692 }
3693
3694 void
init_globals_list(void ** tmp,size_t cnt)3695 SimulatedBlock::init_globals_list(void ** tmp, size_t cnt){
3696 m_global_variables = new Ptr<void> * [cnt+1];
3697 for(size_t i = 0; i<cnt; i++){
3698 m_global_variables[i] = (Ptr<void>*)tmp[i];
3699 }
3700 m_global_variables[cnt] = 0;
3701 }
3702
3703 #endif
3704
3705 #include "KeyDescriptor.hpp"
3706
3707 Uint32
xfrm_key(Uint32 tab,const Uint32 * src,Uint32 * dst,Uint32 dstSize,Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3708 SimulatedBlock::xfrm_key(Uint32 tab, const Uint32* src,
3709 Uint32 *dst, Uint32 dstSize,
3710 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3711 {
3712 const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3713 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3714
3715 Uint32 i = 0;
3716 Uint32 srcPos = 0;
3717 Uint32 dstPos = 0;
3718 while (i < noOfKeyAttr)
3719 {
3720 const KeyDescriptor::KeyAttr& keyAttr = desc->keyAttr[i];
3721 Uint32 dstWords =
3722 xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3723 src, srcPos, dst, dstPos, dstSize);
3724 keyPartLen[i++] = dstWords;
3725 if (unlikely(dstWords == 0))
3726 return 0;
3727 }
3728
3729 if (0)
3730 {
3731 for(Uint32 i = 0; i<dstPos; i++)
3732 {
3733 printf("%.8x ", dst[i]);
3734 }
3735 printf("\n");
3736 }
3737 return dstPos;
3738 }
3739
3740 Uint32
xfrm_attr(Uint32 attrDesc,CHARSET_INFO * cs,const Uint32 * src,Uint32 & srcPos,Uint32 * dst,Uint32 & dstPos,Uint32 dstSize) const3741 SimulatedBlock::xfrm_attr(Uint32 attrDesc, CHARSET_INFO* cs,
3742 const Uint32* src, Uint32 & srcPos,
3743 Uint32* dst, Uint32 & dstPos, Uint32 dstSize) const
3744 {
3745 Uint32 array =
3746 AttributeDescriptor::getArrayType(attrDesc);
3747 Uint32 srcBytes =
3748 AttributeDescriptor::getSizeInBytes(attrDesc);
3749
3750 Uint32 srcWords = ~0;
3751 Uint32 dstWords = ~0;
3752 uchar* dstPtr = (uchar*)&dst[dstPos];
3753 const uchar* srcPtr = (const uchar*)&src[srcPos];
3754
3755 if (cs == NULL)
3756 {
3757 jam();
3758 Uint32 len;
3759 LINT_INIT(len);
3760 switch(array){
3761 case NDB_ARRAYTYPE_SHORT_VAR:
3762 len = 1 + srcPtr[0];
3763 break;
3764 case NDB_ARRAYTYPE_MEDIUM_VAR:
3765 len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3766 break;
3767 #ifndef VM_TRACE
3768 default:
3769 abort();
3770 #endif
3771 case NDB_ARRAYTYPE_FIXED:
3772 len = srcBytes;
3773 }
3774 srcWords = (len + 3) >> 2;
3775 dstWords = srcWords;
3776 memcpy(dstPtr, srcPtr, dstWords << 2);
3777
3778 if (0)
3779 {
3780 ndbout_c("srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3781 srcPos, dstPos, len, srcWords, dstWords);
3782
3783 for(Uint32 i = 0; i<srcWords; i++)
3784 printf("%.8x ", src[srcPos + i]);
3785 printf("\n");
3786 }
3787 }
3788 else
3789 {
3790 jam();
3791 Uint32 typeId =
3792 AttributeDescriptor::getType(attrDesc);
3793 Uint32 lb, len;
3794 bool ok = NdbSqlUtil::get_var_length(typeId, srcPtr, srcBytes, lb, len);
3795 if (unlikely(!ok))
3796 return 0;
3797 Uint32 xmul = cs->strxfrm_multiply;
3798 if (xmul == 0)
3799 xmul = 1;
3800 /*
3801 * Varchar end-spaces are ignored in comparisons. To get same hash
3802 * we blank-pad to maximum length via strnxfrm.
3803 */
3804 Uint32 dstLen = xmul * (srcBytes - lb);
3805 ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3806 int n = NdbSqlUtil::strnxfrm_bug7284(cs, dstPtr, dstLen, srcPtr + lb, len);
3807 if (unlikely(n == -1))
3808 return 0;
3809 while ((n & 3) != 0)
3810 {
3811 dstPtr[n++] = 0;
3812 }
3813 dstWords = (n >> 2);
3814 srcWords = (lb + len + 3) >> 2;
3815 }
3816
3817 dstPos += dstWords;
3818 srcPos += srcWords;
3819 return dstWords;
3820 }
3821
3822 Uint32
create_distr_key(Uint32 tableId,const Uint32 * src,Uint32 * dst,const Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const3823 SimulatedBlock::create_distr_key(Uint32 tableId,
3824 const Uint32 *src,
3825 Uint32* dst,
3826 const Uint32
3827 keyPartLen[MAX_ATTRIBUTES_IN_INDEX]) const
3828 {
3829 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3830 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3831 Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3832
3833 Uint32 i = 0;
3834 Uint32 dstPos = 0;
3835
3836 /* --Note that src and dst may be the same location-- */
3837
3838 if(keyPartLen)
3839 {
3840 while (i < noOfKeyAttr && noOfDistrKeys)
3841 {
3842 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3843 Uint32 len = keyPartLen[i];
3844 if(AttributeDescriptor::getDKey(attr))
3845 {
3846 noOfDistrKeys--;
3847 memmove(dst+dstPos, src, len << 2);
3848 dstPos += len;
3849 }
3850 src += len;
3851 i++;
3852 }
3853 }
3854 else
3855 {
3856 while (i < noOfKeyAttr && noOfDistrKeys)
3857 {
3858 Uint32 attr = desc->keyAttr[i].attributeDescriptor;
3859 Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3860 ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3861 if(AttributeDescriptor::getDKey(attr))
3862 {
3863 noOfDistrKeys--;
3864 memmove(dst+dstPos, src, len << 2);
3865 dstPos += len;
3866 }
3867 src += len;
3868 i++;
3869 }
3870 }
3871 return dstPos;
3872 }
3873
3874 CArray<KeyDescriptor> g_key_descriptor_pool;
3875
3876 void
sendRoutedSignal(RoutePath path[],Uint32 pathcnt,Uint32 dst[],Uint32 dstcnt,Uint32 gsn,Signal * signal,Uint32 sigLen,JobBufferLevel prio,SectionHandle * userhandle)3877 SimulatedBlock::sendRoutedSignal(RoutePath path[], Uint32 pathcnt,
3878 Uint32 dst[],
3879 Uint32 dstcnt,
3880 Uint32 gsn,
3881 Signal * signal,
3882 Uint32 sigLen,
3883 JobBufferLevel prio,
3884 SectionHandle * userhandle)
3885 {
3886 ndbrequire(pathcnt > 0); // don't support (now) directly multi-cast
3887 pathcnt--; // first hop is made from here
3888
3889
3890 Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
3891 ndbrequire(len <= 25);
3892
3893 SectionHandle handle(this, signal);
3894 if (userhandle)
3895 {
3896 ljam();
3897 handle.m_cnt = userhandle->m_cnt;
3898 for (Uint32 i = 0; i<handle.m_cnt; i++)
3899 handle.m_ptr[i] = userhandle->m_ptr[i];
3900 userhandle->m_cnt = 0;
3901 }
3902
3903 if (len + sigLen > 25)
3904 {
3905 ljam();
3906
3907 /**
3908 * we need to store theData in a section
3909 */
3910 ndbrequire(handle.m_cnt < 3);
3911 handle.m_ptr[2] = handle.m_ptr[1];
3912 handle.m_ptr[1] = handle.m_ptr[0];
3913 Ptr<SectionSegment> tmp;
3914 if (unlikely(! import(tmp, signal->theData, sigLen)))
3915 {
3916 handle_out_of_longsignal_memory(0);
3917 }
3918 handle.m_ptr[0].p = tmp.p;
3919 handle.m_ptr[0].i = tmp.i;
3920 handle.m_ptr[0].sz = sigLen;
3921 handle.m_cnt ++;
3922 }
3923 else
3924 {
3925 ljam();
3926 memmove(signal->theData + len, signal->theData, 4 * sigLen);
3927 len += sigLen;
3928 }
3929
3930 LocalRouteOrd * ord = (LocalRouteOrd*)signal->getDataPtrSend();
3931 ord->cnt = (pathcnt << 16) | (dstcnt);
3932 ord->gsn = gsn;
3933 ord->prio = Uint32(prio);
3934
3935 Uint32 * dstptr = ord->path;
3936 for (Uint32 i = 1; i <= pathcnt; i++)
3937 {
3938 ndbrequire(refToNode(path[i].ref) == 0 ||
3939 refToNode(path[i].ref) == getOwnNodeId());
3940
3941 * dstptr++ = path[i].ref;
3942 * dstptr++ = Uint32(path[i].prio);
3943 }
3944
3945 for (Uint32 i = 0; i<dstcnt; i++)
3946 {
3947 ndbrequire(refToNode(dst[i]) == 0 ||
3948 refToNode(dst[i]) == getOwnNodeId());
3949
3950 * dstptr++ = dst[i];
3951 }
3952
3953 sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
3954 path[0].prio, &handle);
3955 }
3956
3957 void
execLOCAL_ROUTE_ORD(Signal * signal)3958 SimulatedBlock::execLOCAL_ROUTE_ORD(Signal* signal)
3959 {
3960 ljamEntry();
3961
3962 if (!assembleFragments(signal))
3963 {
3964 ljam();
3965 return;
3966 }
3967
3968 if (ERROR_INSERTED(1001))
3969 {
3970 /**
3971 * This NDBCNTR error code 1001
3972 */
3973 ljam();
3974 SectionHandle handle(this, signal);
3975 sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
3976 signal->getLength(), &handle);
3977 return;
3978 }
3979
3980 LocalRouteOrd* ord = (LocalRouteOrd*)signal->getDataPtr();
3981 Uint32 pathcnt = ord->cnt >> 16;
3982 Uint32 dstcnt = ord->cnt & 0xFFFF;
3983 Uint32 sigLen = signal->getLength();
3984
3985 if (pathcnt == 0)
3986 {
3987 /**
3988 * Send to final destination(s);
3989 */
3990 ljam();
3991 Uint32 gsn = ord->gsn;
3992 Uint32 prio = ord->prio;
3993 memcpy(signal->theData+25, ord->path, 4*dstcnt);
3994 SectionHandle handle(this, signal);
3995 if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
3996 {
3997 ljam();
3998 /**
3999 * Data is at end of this...
4000 */
4001 memmove(signal->theData,
4002 signal->theData + LocalRouteOrd::StaticLen + dstcnt,
4003 4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
4004 sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4005 }
4006 else
4007 {
4008 ljam();
4009 /**
4010 * Put section 0 in signal->theData
4011 */
4012 sigLen = handle.m_ptr[0].sz;
4013 ndbrequire(sigLen <= 25);
4014 copy(signal->theData, handle.m_ptr[0]);
4015 release(handle.m_ptr[0]);
4016
4017 for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4018 handle.m_ptr[i] = handle.m_ptr[i+1];
4019 handle.m_cnt--;
4020 }
4021
4022 /*
4023 * The extra if-statement is as sendSignalNoRelease will copy sections
4024 * which is not necessary is only sending to one destination
4025 */
4026 if (dstcnt > 1)
4027 {
4028 jam();
4029 for (Uint32 i = 0; i<dstcnt; i++)
4030 {
4031 ljam();
4032 sendSignalNoRelease(signal->theData[25+i], gsn, signal, sigLen,
4033 JobBufferLevel(prio), &handle);
4034 }
4035 releaseSections(handle);
4036 }
4037 else
4038 {
4039 jam();
4040 sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4041 JobBufferLevel(prio), &handle);
4042 }
4043 }
4044 else
4045 {
4046 /**
4047 * Reroute
4048 */
4049 ljam();
4050 SectionHandle handle(this, signal);
4051 Uint32 ref = ord->path[0];
4052 Uint32 prio = ord->path[1];
4053 Uint32 len = sigLen - 2;
4054 ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4055 memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4056 sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4057 JobBufferLevel(prio), &handle);
4058 }
4059 }
4060
4061
4062 #ifdef VM_TRACE
4063 bool
debugOutOn()4064 SimulatedBlock::debugOutOn()
4065 {
4066 SignalLoggerManager::LogMode mask = SignalLoggerManager::LogInOut;
4067 return
4068 globalData.testOn &&
4069 globalSignalLoggers.logMatch(number(), mask);
4070 }
4071
4072 const char*
debugOutTag(char * buf,int line)4073 SimulatedBlock::debugOutTag(char *buf, int line)
4074 {
4075 char blockbuf[40];
4076 char instancebuf[40];
4077 char linebuf[40];
4078 char timebuf[40];
4079 sprintf(blockbuf, "%s", getBlockName(number(), "UNKNOWN"));
4080 instancebuf[0] = 0;
4081 if (instance() != 0)
4082 sprintf(instancebuf, "/%u", instance());
4083 sprintf(linebuf, " %d", line);
4084 timebuf[0] = 0;
4085 #ifdef VM_TRACE_TIME
4086 {
4087 NDB_TICKS t = NdbTick_CurrentMillisecond();
4088 uint s = (t / 1000) % 3600;
4089 uint ms = t % 1000;
4090 sprintf(timebuf, " - %u.%03u -", s, ms);
4091 }
4092 #endif
4093 sprintf(buf, "%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4094 return buf;
4095 }
4096 #endif
4097
4098 void
synchronize_threads_for_blocks(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4099 SimulatedBlock::synchronize_threads_for_blocks(Signal * signal,
4100 const Uint32 blocks[],
4101 const Callback & cb,
4102 JobBufferLevel prio)
4103 {
4104 #ifndef NDBD_MULTITHREADED
4105 Callback copy = cb;
4106 execute(signal, copy, 0);
4107 #else
4108 ljam();
4109 Uint32 ref[32]; // max threads
4110 Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4111 ref, NDB_ARRAY_SIZE(ref));
4112 if (cnt == 0)
4113 {
4114 ljam();
4115 Callback copy = cb;
4116 execute(signal, copy, 0);
4117 return;
4118 }
4119
4120 Ptr<SyncThreadRecord> ptr;
4121 ndbrequire(c_syncThreadPool.seize(ptr));
4122 ptr.p->m_cnt = cnt;
4123 ptr.p->m_callback = cb;
4124
4125 signal->theData[0] = reference();
4126 signal->theData[1] = ptr.i;
4127 signal->theData[2] = Uint32(prio);
4128 for (Uint32 i = 0; i<cnt; i++)
4129 {
4130 sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4131 }
4132 #endif
4133 }
4134
4135 void
execSYNC_THREAD_REQ(Signal * signal)4136 SimulatedBlock::execSYNC_THREAD_REQ(Signal* signal)
4137 {
4138 ljamEntry();
4139 Uint32 ref = signal->theData[0];
4140 Uint32 prio = signal->theData[2];
4141 sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4142 JobBufferLevel(prio));
4143 }
4144
4145 void
execSYNC_THREAD_CONF(Signal * signal)4146 SimulatedBlock::execSYNC_THREAD_CONF(Signal* signal)
4147 {
4148 ljamEntry();
4149 Ptr<SyncThreadRecord> ptr;
4150 c_syncThreadPool.getPtr(ptr, signal->theData[1]);
4151 if (ptr.p->m_cnt == 1)
4152 {
4153 ljam();
4154 Callback copy = ptr.p->m_callback;
4155 c_syncThreadPool.release(ptr);
4156 execute(signal, copy, 0);
4157 return;
4158 }
4159 ptr.p->m_cnt --;
4160 }
4161
4162 void
execSYNC_REQ(Signal * signal)4163 SimulatedBlock::execSYNC_REQ(Signal* signal)
4164 {
4165 ljamEntry();
4166 Uint32 ref = signal->theData[0];
4167 Uint32 prio = signal->theData[2];
4168 sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4169 JobBufferLevel(prio));
4170 }
4171
4172 void
synchronize_path(Signal * signal,const Uint32 blocks[],const Callback & cb,JobBufferLevel prio)4173 SimulatedBlock::synchronize_path(Signal * signal,
4174 const Uint32 blocks[],
4175 const Callback & cb,
4176 JobBufferLevel prio)
4177 {
4178 ljam();
4179
4180 // reuse SyncThreadRecord
4181 Ptr<SyncThreadRecord> ptr;
4182 ndbrequire(c_syncThreadPool.seize(ptr));
4183 ptr.p->m_cnt = 0; // with count of 0
4184 ptr.p->m_callback = cb;
4185
4186 SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4187 req->senderData = ptr.i;
4188 req->prio = Uint32(prio);
4189 req->count = 1;
4190 if (blocks[0] == 0)
4191 {
4192 ljam();
4193 ndbrequire(false); // TODO
4194 }
4195 else
4196 {
4197 ljam();
4198 Uint32 len = 0;
4199 for (; blocks[len+1] != 0; len++)
4200 {
4201 req->path[len] = blocks[len+1];
4202 }
4203 req->pathlen = 1 + len;
4204 req->path[len] = reference();
4205 sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4206 GSN_SYNC_PATH_REQ, signal,
4207 SyncPathReq::SignalLength + (1 + len), prio);
4208 }
4209 }
4210
4211 void
execSYNC_PATH_REQ(Signal * signal)4212 SimulatedBlock::execSYNC_PATH_REQ(Signal* signal)
4213 {
4214 ljamEntry();
4215 SyncPathReq * req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
4216 if (req->pathlen == 1)
4217 {
4218 ljam();
4219 SyncPathReq copy = *req;
4220 SyncPathConf* conf = CAST_PTR(SyncPathConf, signal->getDataPtrSend());
4221 conf->senderData = copy.senderData;
4222 conf->count = copy.count;
4223 sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4224 SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4225 }
4226 else
4227 {
4228 ljam();
4229 Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4230 req->pathlen--;
4231 memmove(req->path, req->path + 1, 4 * req->pathlen);
4232 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4233 SyncPathReq::SignalLength + (1 + req->pathlen),
4234 JobBufferLevel(req->prio));
4235 }
4236 }
4237
4238 void
execSYNC_PATH_CONF(Signal * signal)4239 SimulatedBlock::execSYNC_PATH_CONF(Signal* signal)
4240 {
4241 ljamEntry();
4242 SyncPathConf conf = * CAST_CONSTPTR(SyncPathConf, signal->getDataPtr());
4243 Ptr<SyncThreadRecord> ptr;
4244
4245 c_syncThreadPool.getPtr(ptr, conf.senderData);
4246
4247 if (ptr.p->m_cnt == 0)
4248 {
4249 ljam();
4250 ptr.p->m_cnt = conf.count;
4251 }
4252
4253 if (ptr.p->m_cnt == 1)
4254 {
4255 ljam();
4256 Callback copy = ptr.p->m_callback;
4257 c_syncThreadPool.release(ptr);
4258 execute(signal, copy, 0);
4259 return;
4260 }
4261
4262 ptr.p->m_cnt --;
4263 }
4264
4265
4266 bool
checkNodeFailSequence(Signal * signal)4267 SimulatedBlock::checkNodeFailSequence(Signal* signal)
4268 {
4269 Uint32 ref = signal->getSendersBlockRef();
4270
4271 /**
4272 * Make sure that a signal being part of node-failure handling
4273 * from a remote node, does not get to us before we got the NODE_FAILREP
4274 * (this to avoid tricky state handling)
4275 *
4276 * To ensure this, we send the signal via QMGR (GSN_COMMIT_FAILREQ)
4277 * and NDBCNTR (which sends NODE_FAILREP)
4278 *
4279 * The extra time should be negilable
4280 *
4281 * Note, make an exception for signals sent by our self
4282 * as they are only sent as a consequence of NODE_FAILREP
4283 */
4284 if (ref == reference() ||
4285 (refToNode(ref) == getOwnNodeId() &&
4286 refToMain(ref) == NDBCNTR))
4287 {
4288 ljam();
4289 return true;
4290 }
4291
4292 RoutePath path[2];
4293 path[0].ref = QMGR_REF;
4294 path[0].prio = JBB;
4295 path[1].ref = NDBCNTR_REF;
4296 path[1].prio = JBB;
4297
4298 Uint32 dst[1];
4299 dst[0] = reference();
4300
4301 SectionHandle handle(this, signal);
4302 Uint32 gsn = signal->header.theVerId_signalNumber;
4303 Uint32 len = signal->getLength();
4304
4305 sendRoutedSignal(path, 2, dst, 1, gsn, signal, len, JBB, &handle);
4306 return false;
4307 }
4308
4309 void
setup_wakeup()4310 SimulatedBlock::setup_wakeup()
4311 {
4312 #ifdef NDBD_MULTITHREADED
4313 #else
4314 globalTransporterRegistry.setup_wakeup_socket();
4315 #endif
4316 }
4317
4318 void
wakeup()4319 SimulatedBlock::wakeup()
4320 {
4321 #ifdef NDBD_MULTITHREADED
4322 mt_wakeup(this);
4323 #else
4324 globalTransporterRegistry.wakeup();
4325 #endif
4326 }
4327
4328
4329 void
ndbinfo_send_row(Signal * signal,const DbinfoScanReq & req,const Ndbinfo::Row & row,Ndbinfo::Ratelimit & rl) const4330 SimulatedBlock::ndbinfo_send_row(Signal* signal,
4331 const DbinfoScanReq& req,
4332 const Ndbinfo::Row& row,
4333 Ndbinfo::Ratelimit& rl) const
4334 {
4335 // Check correct number of columns against table
4336 assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4337
4338 TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
4339 tidai->connectPtr= req.resultData;
4340 tidai->transId[0]= req.transId[0];
4341 tidai->transId[1]= req.transId[1];
4342
4343 LinearSectionPtr ptr[3];
4344 ptr[0].p = row.getDataPtr();
4345 ptr[0].sz = row.getLength();
4346
4347 rl.rows++;
4348 rl.bytes += row.getLength();
4349
4350 sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4351 signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4352 }
4353
4354
4355 void
ndbinfo_send_scan_break(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl,Uint32 data1,Uint32 data2,Uint32 data3,Uint32 data4) const4356 SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
4357 DbinfoScanReq& req,
4358 const Ndbinfo::Ratelimit& rl,
4359 Uint32 data1, Uint32 data2,
4360 Uint32 data3, Uint32 data4) const
4361 {
4362 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4363 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4364 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4365
4366 conf->returnedRows = rl.rows;
4367
4368 // Update the cursor with current item number
4369 Ndbinfo::ScanCursor* cursor =
4370 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4371
4372 cursor->data[0] = data1;
4373 cursor->data[1] = data2;
4374 cursor->data[2] = data3;
4375 cursor->data[3] = data4;
4376
4377 // Increase number of rows and bytes sent to far
4378 cursor->totalRows += rl.rows;
4379 cursor->totalBytes += rl.bytes;
4380
4381 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
4382
4383 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4384 signal_length, JBB);
4385 }
4386
4387 void
ndbinfo_send_scan_conf(Signal * signal,DbinfoScanReq & req,const Ndbinfo::Ratelimit & rl) const4388 SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
4389 DbinfoScanReq& req,
4390 const Ndbinfo::Ratelimit& rl) const
4391 {
4392 DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
4393 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4394 Uint32 sender_ref = req.resultRef;
4395 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4396
4397 conf->returnedRows = rl.rows;
4398
4399 if (req.cursor_sz)
4400 {
4401 jam();
4402 // Update the cursor with current item number
4403 Ndbinfo::ScanCursor* cursor =
4404 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
4405
4406 // Reset all data holders
4407 memset(cursor->data, 0, sizeof(cursor->data));
4408
4409 // Increase number of rows and bytes sent to far
4410 cursor->totalRows += rl.rows;
4411 cursor->totalBytes += rl.bytes;
4412
4413 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
4414
4415 sender_ref = cursor->senderRef;
4416
4417 }
4418 sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4419 signal_length, JBB);
4420 }
4421
4422 #ifdef VM_TRACE
4423 void
assertOwnThread()4424 SimulatedBlock::assertOwnThread()
4425 {
4426 #ifdef NDBD_MULTITHREADED
4427 mt_assert_own_thread(this);
4428 #endif
4429 }
4430
4431 #endif
4432