1 /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 #include <mt.hpp>
24 #include "LocalProxy.hpp"
25 #include <pgman.hpp>
26
27 #include <signaldata/RouteOrd.hpp>
28
29 //#define DBINFO_SCAN_TRACE
30 #ifdef DBINFO_SCAN_TRACE
31 #include <debugger/DebuggerNames.hpp>
32 #endif
33 #include <NdbGetRUsage.h>
34 #include <EventLogger.hpp>
35
36 #define JAM_FILE_ID 437
37
38 extern EventLogger *g_eventLogger;
39
LocalProxy(BlockNumber blockNumber,Block_context & ctx)40 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
41 SimulatedBlock(blockNumber, ctx)
42 {
43 BLOCK_CONSTRUCTOR(LocalProxy);
44
45 ndbrequire(instance() == 0); // this is main block
46 c_workers = 0;
47 Uint32 i;
48 for (i = 0; i < MaxWorkers; i++)
49 c_worker[i] = 0;
50
51 c_anyWorkerCounter = 0;
52 c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
53 c_masterNodeId = ZNIL;
54
55 // GSN_READ_CONFIG_REQ
56 addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
57 addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
58
59 // GSN_STTOR
60 addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
61 addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
62
63 // GSN_NDB_STTOR
64 addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
65 addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
66
67 // GSN_READ_NODESREQ
68 addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
69 addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
70
71 // GSN_NODE_FAILREP
72 addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
73 addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
74
75 // GSN_INCL_NODEREQ
76 addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
77 addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
78
79 // GSN_NODE_STATE_REP
80 addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP, true);
81
82 // GSN_CHANGE_NODE_STATE_REQ
83 addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ, true);
84 addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
85
86 // GSN_DUMP_STATE_ORD
87 addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
88
89 // GSN_NDB_TAMPER
90 addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
91
92 // GSN_TIME_SIGNAL
93 addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
94
95 // GSN_CREATE_TRIG_IMPL_REQ
96 addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
97 addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
98 addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
99
100 // GSN_DROP_TRIG_IMPL_REQ
101 addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
102 addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
103 addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
104
105 // GSN_DBINFO_SCANREQ
106 addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
107 addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
108
109 // GSN_SYNC_REQ
110 addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ, true);
111 addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
112 addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
113
114 // GSN_SYNC_PATH_REQ
115 addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ, true);
116
117 // GSN_API_FAILREQ
118 addRecSignal(GSN_API_FAILREQ, &LocalProxy::execAPI_FAILREQ);
119 addRecSignal(GSN_API_FAILCONF, &LocalProxy::execAPI_FAILCONF);
120 }
121
~LocalProxy()122 LocalProxy::~LocalProxy()
123 {
124 // dtor of main block deletes workers
125 }
126
127 // support routines
128
129 void
sendREQ(Signal * signal,SsSequential & ss)130 LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
131 {
132 jam();
133 ss.m_worker = 0;
134 ndbrequire(ss.m_sendREQ != 0);
135 SectionHandle handle(this);
136 restoreHandle(handle, ss);
137 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
138 saveSections(ss, handle);
139 }
140
141 void
recvCONF(Signal * signal,SsSequential & ss)142 LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
143 {
144 jam();
145 ndbrequire(ss.m_sendCONF != 0);
146 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
147
148 ss.m_worker++;
149 if (ss.m_worker < c_workers) {
150 jam();
151 ndbrequire(ss.m_sendREQ != 0);
152 SectionHandle handle(this);
153 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
154 return;
155 }
156 }
157
158 void
recvREF(Signal * signal,SsSequential & ss,Uint32 error)159 LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
160 {
161 jam();
162 ndbrequire(error != 0);
163 if (ss.m_error == 0)
164 {
165 jam();
166 ss.m_error = error;
167 }
168 recvCONF(signal, ss);
169 }
170
171 void
skipReq(SsSequential & ss)172 LocalProxy::skipReq(SsSequential& ss)
173 {
174 jam();
175 }
176
177 void
skipConf(SsSequential & ss)178 LocalProxy::skipConf(SsSequential& ss)
179 {
180 jam();
181 }
182
183 void
saveSections(SsCommon & ss,SectionHandle & handle)184 LocalProxy::saveSections(SsCommon& ss, SectionHandle & handle)
185 {
186 jam();
187 ss.m_sec_cnt = handle.m_cnt;
188 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
189 {
190 jam();
191 ss.m_sec_ptr[i] = handle.m_ptr[i].i;
192 }
193 handle.clear();
194 }
195
196 void
restoreHandle(SectionHandle & handle,SsCommon & ss)197 LocalProxy::restoreHandle(SectionHandle & handle, SsCommon& ss)
198 {
199 handle.m_cnt = ss.m_sec_cnt;
200 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
201 {
202 jam();
203 handle.m_ptr[i].i = ss.m_sec_ptr[i];
204 }
205
206 getSections(handle.m_cnt, handle.m_ptr);
207 ss.m_sec_cnt = 0;
208 }
209
210 bool
firstReply(const SsSequential & ss)211 LocalProxy::firstReply(const SsSequential& ss)
212 {
213 return ss.m_worker == 0;
214 }
215
216 bool
lastReply(const SsSequential & ss)217 LocalProxy::lastReply(const SsSequential& ss)
218 {
219 return ss.m_worker + 1 == c_workers;
220 }
221
222 void
sendREQ(Signal * signal,SsParallel & ss,bool skipLast)223 LocalProxy::sendREQ(Signal* signal, SsParallel& ss, bool skipLast)
224 {
225 jam();
226 ndbrequire(ss.m_sendREQ != 0);
227
228 ss.m_workerMask.clear();
229 ss.m_worker = 0;
230 const Uint32 count = skipLast ? c_workers - 1 : c_workers;
231 SectionHandle handle(this);
232 restoreHandle(handle, ss);
233 while (ss.m_worker < count) {
234 jam();
235 ss.m_workerMask.set(ss.m_worker);
236 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
237 ss.m_worker++;
238 }
239 releaseSections(handle);
240 }
241
242 void
recvCONF(Signal * signal,SsParallel & ss)243 LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
244 {
245 jam();
246 ndbrequire(ss.m_sendCONF != 0);
247
248 BlockReference ref = signal->getSendersBlockRef();
249 ndbrequire(refToMain(ref) == number());
250
251 Uint32 ino = refToInstance(ref);
252 ss.m_worker = workerIndex(ino);
253 ndbrequire(ref == workerRef(ss.m_worker));
254 ndbrequire(ss.m_worker < c_workers);
255 ndbrequire(ss.m_workerMask.get(ss.m_worker));
256 ss.m_workerMask.clear(ss.m_worker);
257
258 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
259 }
260
261 void
recvREF(Signal * signal,SsParallel & ss,Uint32 error)262 LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
263 {
264 jam();
265 ndbrequire(error != 0);
266 if (ss.m_error == 0)
267 {
268 jam();
269 ss.m_error = error;
270 }
271 recvCONF(signal, ss);
272 }
273
274 void
skipReq(SsParallel & ss)275 LocalProxy::skipReq(SsParallel& ss)
276 {
277 jam();
278 ndbrequire(ss.m_workerMask.get(ss.m_worker));
279 ss.m_workerMask.clear(ss.m_worker);
280 }
281
282 // more replies expected from this worker
283 void
skipConf(SsParallel & ss)284 LocalProxy::skipConf(SsParallel& ss)
285 {
286 jam();
287 ndbrequire(!ss.m_workerMask.get(ss.m_worker));
288 ss.m_workerMask.set(ss.m_worker);
289 }
290
291 bool
firstReply(const SsParallel & ss)292 LocalProxy::firstReply(const SsParallel& ss)
293 {
294 const WorkerMask& mask = ss.m_workerMask;
295 const Uint32 count = mask.count();
296
297 // recvCONF has cleared current worker
298 ndbrequire(ss.m_worker < c_workers);
299 ndbrequire(!mask.get(ss.m_worker));
300 ndbrequire(count < c_workers);
301 return count + 1 == c_workers;
302 }
303
304 bool
lastReply(const SsParallel & ss)305 LocalProxy::lastReply(const SsParallel& ss)
306 {
307 return ss.m_workerMask.isclear();
308 }
309
310 // used in "reverse" proxying (start with worker REQs)
311 void
setMask(SsParallel & ss)312 LocalProxy::setMask(SsParallel& ss)
313 {
314 Uint32 i;
315 jam();
316 for (i = 0; i < c_workers; i++)
317 {
318 jam();
319 ss.m_workerMask.set(i);
320 }
321 }
322
323 void
setMask(SsParallel & ss,const WorkerMask & mask)324 LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
325 {
326 ss.m_workerMask.assign(mask);
327 }
328
329 // load workers (before first signal)
330
331 void
loadWorkers()332 LocalProxy::loadWorkers()
333 {
334 c_workers = mt_get_instance_count(number());
335 for (Uint32 i = 0; i < c_workers; i++)
336 {
337 jamNoBlock();
338 Uint32 instanceNo = workerInstance(i);
339
340 SimulatedBlock* worker = newWorker(instanceNo);
341 ndbrequire(worker->instance() == instanceNo);
342 ndbrequire(this->getInstance(instanceNo) == worker);
343 c_worker[i] = worker;
344
345 if (number() == PGMAN && i == (c_workers - 1))
346 {
347 ((Pgman*)worker)->init_extra_pgman();
348 }
349 mt_add_thr_map(number(), instanceNo);
350 }
351 }
352
353 void
forwardToWorkerIndex(Signal * signal,Uint32 index)354 LocalProxy::forwardToWorkerIndex(Signal* signal, Uint32 index)
355 {
356 jam();
357 /**
358 * We statelessly forward to one of our
359 * workers, including any sections that
360 * might be attached.
361 */
362 BlockReference destRef = workerRef(index);
363 SectionHandle sh(this, signal);
364
365 sendSignal(destRef,
366 signal->header.theVerId_signalNumber,
367 signal,
368 signal->getLength(),
369 JBB,
370 &sh);
371 }
372
373 void
forwardToAnyWorker(Signal * signal)374 LocalProxy::forwardToAnyWorker(Signal* signal)
375 {
376 jam();
377
378 /* Won't work for fragmented signals */
379 ndbassert(signal->header.m_fragmentInfo == 0);
380
381 forwardToWorkerIndex(signal, getAnyWorkerIndex());
382 }
383
384 // GSN_READ_CONFIG_REQ
385
386 void
execREAD_CONFIG_REQ(Signal * signal)387 LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
388 {
389 jam();
390 Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
391
392 const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
393 ss.m_req = *req;
394 ndbrequire(ss.m_req.noOfParameters == 0);
395 callREAD_CONFIG_REQ(signal);
396 }
397
398 void
callREAD_CONFIG_REQ(Signal * signal)399 LocalProxy::callREAD_CONFIG_REQ(Signal* signal)
400 {
401 jam();
402 backREAD_CONFIG_REQ(signal);
403 }
404
405 void
backREAD_CONFIG_REQ(Signal * signal)406 LocalProxy::backREAD_CONFIG_REQ(Signal* signal)
407 {
408 jam();
409 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
410
411 // run sequentially due to big mallocs and initializations
412 sendREQ(signal, ss);
413 }
414
415 void
sendREAD_CONFIG_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)416 LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId,
417 SectionHandle* handle)
418 {
419 jam();
420 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
421
422 ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
423 req->senderRef = reference();
424 req->senderData = ssId;
425 req->noOfParameters = 0;
426 sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
427 signal, ReadConfigReq::SignalLength, JBB, handle);
428 }
429
430 void
execREAD_CONFIG_CONF(Signal * signal)431 LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
432 {
433 jam();
434 const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
435 Uint32 ssId = conf->senderData;
436 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
437
438 #ifdef DEBUG_RSS
439 {
440 ndb_rusage ru;
441 if (Ndb_GetRUsage(&ru, true) != 0)
442 {
443 g_eventLogger->error("LocalProxy : Failed to get rusage");
444 }
445 else
446 {
447 g_eventLogger->info("LocalProxy (conf from worker %u/%u) : RSS : %llu kB",
448 ss.m_worker,
449 c_workers,
450 ru.ru_rss);
451 }
452 }
453 #endif
454 recvCONF(signal, ss);
455 }
456
457 void
sendREAD_CONFIG_CONF(Signal * signal,Uint32 ssId)458 LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
459 {
460 jam();
461 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
462
463 if (!lastReply(ss))
464 {
465 jam();
466 return;
467 }
468
469 SectionHandle handle(this);
470 restoreHandle(handle, ss);
471 releaseSections(handle);
472
473 ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
474 conf->senderRef = reference();
475 conf->senderData = ss.m_req.senderData;
476 sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
477 signal, ReadConfigConf::SignalLength, JBB);
478
479 ssRelease<Ss_READ_CONFIG_REQ>(ssId);
480 }
481
482 // GSN_STTOR
483
484 void
execSTTOR(Signal * signal)485 LocalProxy::execSTTOR(Signal* signal)
486 {
487 jam();
488 Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
489
490 const Uint32 startphase = signal->theData[1];
491 const Uint32 typeOfStart = signal->theData[7];
492
493 if (startphase == 3)
494 {
495 jam();
496 c_typeOfStart = typeOfStart;
497 }
498
499 ss.m_reqlength = signal->getLength();
500 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
501
502 callSTTOR(signal);
503 }
504
505 void
callSTTOR(Signal * signal)506 LocalProxy::callSTTOR(Signal* signal)
507 {
508 jam();
509 backSTTOR(signal);
510 }
511
512 void
backSTTOR(Signal * signal)513 LocalProxy::backSTTOR(Signal* signal)
514 {
515 jam();
516 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
517 sendREQ(signal, ss);
518 }
519
520 void
sendSTTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)521 LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
522 {
523 jam();
524 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
525
526 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
527 sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
528 signal, ss.m_reqlength, JBB, handle);
529 }
530
531 void
execSTTORRY(Signal * signal)532 LocalProxy::execSTTORRY(Signal* signal)
533 {
534 jam();
535 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
536 recvCONF(signal, ss);
537 }
538
539 void
sendSTTORRY(Signal * signal,Uint32 ssId)540 LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
541 {
542 jam();
543 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
544
545 const Uint32 conflength = signal->getLength();
546 const Uint32* confdata = signal->getDataPtr();
547
548 // the reply is identical from all
549 if (firstReply(ss))
550 {
551 jam();
552 ss.m_conflength = conflength;
553 memcpy(ss.m_confdata, confdata, conflength << 2);
554 }
555 else
556 {
557 jam();
558 ndbrequire(ss.m_conflength == conflength);
559 ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
560 }
561
562 if (!lastReply(ss))
563 {
564 jam();
565 return;
566 }
567
568 memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
569 sendSignal(NDBCNTR_REF, GSN_STTORRY,
570 signal, ss.m_conflength, JBB);
571
572 ssRelease<Ss_STTOR>(ssId);
573 }
574
575 // GSN_NDB_STTOR
576
577 void
execNDB_STTOR(Signal * signal)578 LocalProxy::execNDB_STTOR(Signal* signal)
579 {
580 jam();
581 Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
582
583 const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
584 ss.m_req = *req;
585
586 callNDB_STTOR(signal);
587 }
588
589 void
callNDB_STTOR(Signal * signal)590 LocalProxy::callNDB_STTOR(Signal* signal)
591 {
592 jam();
593 backNDB_STTOR(signal);
594 }
595
596 void
backNDB_STTOR(Signal * signal)597 LocalProxy::backNDB_STTOR(Signal* signal)
598 {
599 jam();
600 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
601 sendREQ(signal, ss);
602 }
603
604 void
sendNDB_STTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)605 LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
606 {
607 jam();
608 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
609
610 NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
611 *req = ss.m_req;
612 req->senderRef = reference();
613 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
614 signal, ss.m_reqlength, JBB, handle);
615 }
616
617 void
execNDB_STTORRY(Signal * signal)618 LocalProxy::execNDB_STTORRY(Signal* signal)
619 {
620 jam();
621 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
622
623 // the reply contains only senderRef
624 const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
625 ndbrequire(conf->senderRef == signal->getSendersBlockRef());
626 recvCONF(signal, ss);
627 }
628
629 void
sendNDB_STTORRY(Signal * signal,Uint32 ssId)630 LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
631 {
632 jam();
633 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
634
635 if (!lastReply(ss))
636 {
637 jam();
638 return;
639 }
640
641 NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
642 conf->senderRef = reference();
643 sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
644 signal, NdbSttorry::SignalLength, JBB);
645
646 ssRelease<Ss_NDB_STTOR>(ssId);
647 }
648
649 // GSN_READ_NODESREQ
650
651 void
sendREAD_NODESREQ(Signal * signal)652 LocalProxy::sendREAD_NODESREQ(Signal* signal)
653 {
654 jam();
655 signal->theData[0] = reference();
656 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
657 }
658
659 void
execREAD_NODESCONF(Signal * signal)660 LocalProxy::execREAD_NODESCONF(Signal* signal)
661 {
662 jam();
663 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
664
665 const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
666
667 c_masterNodeId = conf->masterNodeId;
668
669 /**
670 * READ_NODESCONF comes with a section containing a bitmap, since the
671 * the proxy block didn't have its own method to receive this, it isn't
672 * interested in the contents of this. So can simply release it.
673 */
674 SectionHandle handle(this, signal);
675 releaseSections(handle);
676
677 switch (ss.m_gsn) {
678 case GSN_STTOR:
679 jam();
680 backSTTOR(signal);
681 break;
682 case GSN_NDB_STTOR:
683 jam();
684 backNDB_STTOR(signal);
685 break;
686 default:
687 ndbabort();
688 }
689
690 ss.m_gsn = 0;
691 }
692
693 void
execREAD_NODESREF(Signal * signal)694 LocalProxy::execREAD_NODESREF(Signal* signal)
695 {
696 jam();
697 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
698 ndbrequire(ss.m_gsn != 0);
699 ndbabort();
700 }
701
702 // GSN_NODE_FAILREP
703
704 void
execNODE_FAILREP(Signal * signal)705 LocalProxy::execNODE_FAILREP(Signal* signal)
706 {
707 jam();
708 Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
709 NodeFailRep* req = (NodeFailRep*)signal->getDataPtr();
710 ndbrequire(signal->getLength() == NodeFailRep::SignalLength ||
711 signal->getLength() == NodeFailRep::SignalLength_v1);
712
713 if(signal->getLength() == NodeFailRep::SignalLength)
714 {
715 ndbrequire(signal->getNoOfSections() == 1);
716 ndbrequire(getNodeInfo(refToNode(signal->getSendersBlockRef())).m_version);
717 SegmentedSectionPtr ptr;
718 SectionHandle handle(this, signal);
719 handle.getSection(ptr, 0);
720 memset(req->theNodes, 0 ,sizeof(req->theNodes));
721 copy(req->theNodes, ptr);
722 releaseSections(handle);
723 }
724 else
725 {
726 memset(req->theNodes + NdbNodeBitmask48::Size,
727 0,
728 _NDB_NBM_DIFF_BYTES);
729 }
730 ss.m_req = *req;
731 NdbNodeBitmask mask;
732 mask.assign(NdbNodeBitmask::Size, req->theNodes);
733
734 // from each worker wait for ack for each failed node
735 for (Uint32 i = 0; i < c_workers; i++)
736 {
737 jam();
738 NdbNodeBitmask& waitFor = ss.m_waitFor[i];
739 waitFor.bitOR(mask);
740 }
741
742 sendREQ(signal, ss);
743 if (ss.noReply(number()))
744 {
745 jam();
746 ssRelease<Ss_NODE_FAILREP>(ss);
747 }
748 }
749
750 void
sendNODE_FAILREP(Signal * signal,Uint32 ssId,SectionHandle * handle)751 LocalProxy::sendNODE_FAILREP(Signal* signal, Uint32 ssId, SectionHandle* handle)
752 {
753 jam();
754 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
755
756 NodeFailRep* req = (NodeFailRep*)signal->getDataPtrSend();
757 *req = ss.m_req;
758 handle->clear();
759 LinearSectionPtr lsptr[3];
760 lsptr[0].p = req->theNodes;
761 lsptr[0].sz = NdbNodeBitmask::getPackedLengthInWords(req->theNodes);
762 ndbrequire(import(handle->m_ptr[0], lsptr[0].p, lsptr[0].sz));
763 handle->m_cnt = 1;
764 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
765 signal, NodeFailRep::SignalLength, JBB, handle);
766 }
767
768 void
execNF_COMPLETEREP(Signal * signal)769 LocalProxy::execNF_COMPLETEREP(Signal* signal)
770 {
771 jam();
772 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
773 ndbrequire(!ss.noReply(number()));
774 ss.m_workerMask.set(ss.m_worker); // Avoid require in recvCONF
775 recvCONF(signal, ss);
776 }
777
778 void
sendNF_COMPLETEREP(Signal * signal,Uint32 ssId)779 LocalProxy::sendNF_COMPLETEREP(Signal* signal, Uint32 ssId)
780 {
781 jam();
782 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
783
784 const NFCompleteRep* conf = (const NFCompleteRep*)signal->getDataPtr();
785 Uint32 node = conf->failedNodeId;
786
787 {
788 NdbNodeBitmask& waitFor = ss.m_waitFor[ss.m_worker];
789 ndbrequire(waitFor.get(node));
790 waitFor.clear(node);
791 }
792
793 for (Uint32 i = 0; i < c_workers; i++)
794 {
795 jam();
796 NdbNodeBitmask& waitFor = ss.m_waitFor[i];
797 if (waitFor.get(node))
798 {
799 jam();
800 /**
801 * Not all threads are done with this failed node
802 */
803 return;
804 }
805 }
806
807 {
808 NFCompleteRep* conf = (NFCompleteRep*)signal->getDataPtrSend();
809 conf->blockNo = number();
810 conf->nodeId = getOwnNodeId();
811 conf->failedNodeId = node;
812 conf->unused = 0;
813 conf->from = __LINE__;
814
815 sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
816 signal, NFCompleteRep::SignalLength, JBB);
817
818 if (number() == DBTC)
819 {
820 /**
821 * DBTC send NF_COMPLETEREP "early" to QMGR
822 * so that it can allow api to handle node-failure of
823 * transactions eariler...
824 * See Qmgr::execNF_COMPLETEREP
825 */
826 jam();
827 sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
828 NFCompleteRep::SignalLength, JBB);
829 }
830 }
831 }
832
833 // GSN_INCL_NODEREQ
834
835 void
execINCL_NODEREQ(Signal * signal)836 LocalProxy::execINCL_NODEREQ(Signal* signal)
837 {
838 jam();
839 Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
840
841 ss.m_reqlength = signal->getLength();
842 ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
843 memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
844
845 sendREQ(signal, ss);
846 }
847
848 void
sendINCL_NODEREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)849 LocalProxy::sendINCL_NODEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
850 {
851 jam();
852 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
853
854 Ss_INCL_NODEREQ::Req* req =
855 (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
856
857 memcpy(req, &ss.m_req, ss.m_reqlength << 2);
858 req->senderRef = reference();
859 sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
860 signal, ss.m_reqlength, JBB, handle);
861 }
862
863 void
execINCL_NODECONF(Signal * signal)864 LocalProxy::execINCL_NODECONF(Signal* signal)
865 {
866 jam();
867 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
868 recvCONF(signal, ss);
869 }
870
871 void
sendINCL_NODECONF(Signal * signal,Uint32 ssId)872 LocalProxy::sendINCL_NODECONF(Signal* signal, Uint32 ssId)
873 {
874 jam();
875 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
876
877 if (!lastReply(ss))
878 {
879 jam();
880 return;
881 }
882
883 Ss_INCL_NODEREQ::Conf* conf =
884 (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
885
886 conf->inclNodeId = ss.m_req.inclNodeId;
887 conf->senderRef = reference();
888 sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
889 signal, 2, JBB);
890
891 ssRelease<Ss_INCL_NODEREQ>(ssId);
892 }
893
894 // GSN_NODE_STATE_REP
895
896 void
execNODE_STATE_REP(Signal * signal)897 LocalProxy::execNODE_STATE_REP(Signal* signal)
898 {
899 jam();
900 Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
901 sendREQ(signal, ss);
902 SimulatedBlock::execNODE_STATE_REP(signal);
903 ssRelease<Ss_NODE_STATE_REP>(ss);
904 }
905
906 void
sendNODE_STATE_REP(Signal * signal,Uint32 ssId,SectionHandle * handle)907 LocalProxy::sendNODE_STATE_REP(Signal* signal, Uint32 ssId,
908 SectionHandle* handle)
909 {
910 jam();
911 Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
912
913 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
914 signal,NodeStateRep::SignalLength, JBB, handle);
915 }
916
917 // GSN_CHANGE_NODE_STATE_REQ
918
919 void
execCHANGE_NODE_STATE_REQ(Signal * signal)920 LocalProxy::execCHANGE_NODE_STATE_REQ(Signal* signal)
921 {
922 jam();
923 Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
924
925 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
926 ss.m_req = *req;
927
928 sendREQ(signal, ss);
929 }
930
931 void
sendCHANGE_NODE_STATE_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)932 LocalProxy::sendCHANGE_NODE_STATE_REQ(Signal* signal, Uint32 ssId,
933 SectionHandle* handle)
934 {
935 jam();
936 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
937
938 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
939 req->senderRef = reference();
940
941 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
942 signal, ChangeNodeStateReq::SignalLength, JBB, handle);
943 }
944
945 void
execCHANGE_NODE_STATE_CONF(Signal * signal)946 LocalProxy::execCHANGE_NODE_STATE_CONF(Signal* signal)
947 {
948 jam();
949 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
950
951 ChangeNodeStateConf * conf = (ChangeNodeStateConf*)signal->getDataPtrSend();
952 ndbrequire(conf->senderData == ss.m_req.senderData);
953 recvCONF(signal, ss);
954 }
955
956 void
sendCHANGE_NODE_STATE_CONF(Signal * signal,Uint32 ssId)957 LocalProxy::sendCHANGE_NODE_STATE_CONF(Signal* signal, Uint32 ssId)
958 {
959 jam();
960 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
961
962 if (!lastReply(ss))
963 {
964 jam();
965 return;
966 }
967
968 /**
969 * SimulatedBlock::execCHANGE_NODE_STATE_REQ will reply
970 */
971 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
972 * req = ss.m_req;
973 SimulatedBlock::execCHANGE_NODE_STATE_REQ(signal);
974 ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
975 }
976
977 // GSN_DUMP_STATE_ORD
978
979 void
execDUMP_STATE_ORD(Signal * signal)980 LocalProxy::execDUMP_STATE_ORD(Signal* signal)
981 {
982 jam();
983 Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
984
985 ss.m_reqlength = signal->getLength();
986 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
987 sendREQ(signal, ss);
988 ssRelease<Ss_DUMP_STATE_ORD>(ss);
989 }
990
991 void
sendDUMP_STATE_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)992 LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId,
993 SectionHandle* handle)
994 {
995 jam();
996 Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
997
998 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
999 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
1000 signal, ss.m_reqlength, JBB, handle);
1001 }
1002
1003 // GSN_NDB_TAMPER
1004
1005 void
execNDB_TAMPER(Signal * signal)1006 LocalProxy::execNDB_TAMPER(Signal* signal)
1007 {
1008 jam();
1009 Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
1010
1011 const Uint32 siglen = signal->getLength();
1012 if (siglen == 1)
1013 {
1014 jam();
1015 ss.m_errorInsert = signal->theData[0];
1016 ss.m_haveErrorInsertExtra = false;
1017 }
1018 else
1019 {
1020 jam();
1021 ndbrequire(siglen == 2);
1022 ss.m_errorInsert = signal->theData[0];
1023 ss.m_haveErrorInsertExtra = true;
1024 ss.m_errorInsertExtra = signal->theData[1];
1025 }
1026
1027 SimulatedBlock::execNDB_TAMPER(signal);
1028 sendREQ(signal, ss);
1029 ssRelease<Ss_NDB_TAMPER>(ss);
1030 }
1031
1032 void
sendNDB_TAMPER(Signal * signal,Uint32 ssId,SectionHandle * handle)1033 LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId, SectionHandle* handle)
1034 {
1035 jam();
1036 Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
1037
1038 Uint32 siglen = 1;
1039 signal->theData[0] = ss.m_errorInsert;
1040 if (ss.m_haveErrorInsertExtra)
1041 {
1042 jam();
1043 signal->theData[1] = ss.m_errorInsertExtra;
1044 siglen ++;
1045 }
1046 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
1047 signal, siglen, JBB, handle);
1048 }
1049
1050 // GSN_TIME_SIGNAL
1051
1052 void
execTIME_SIGNAL(Signal * signal)1053 LocalProxy::execTIME_SIGNAL(Signal* signal)
1054 {
1055 jam();
1056 Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
1057
1058 sendREQ(signal, ss);
1059 ssRelease<Ss_TIME_SIGNAL>(ss);
1060 }
1061
1062 void
sendTIME_SIGNAL(Signal * signal,Uint32 ssId,SectionHandle * handle)1063 LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId, SectionHandle* handle)
1064 {
1065 jam();
1066 Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
1067 signal->theData[0] = 0;
1068 sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
1069 signal, 1, JBB, handle);
1070 }
1071
1072 // GSN_CREATE_TRIG_IMPL_REQ
1073
1074 void
execCREATE_TRIG_IMPL_REQ(Signal * signal)1075 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
1076 {
1077 if (!assembleFragments(signal))
1078 return;
1079
1080 jam();
1081 if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
1082 {
1083 jam();
1084 return;
1085 }
1086 const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
1087 Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
1088 ss.m_req = *req;
1089 ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
1090
1091 SectionHandle handle(this, signal);
1092 saveSections(ss, handle);
1093
1094 sendREQ(signal, ss);
1095 }
1096
1097 void
sendCREATE_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1098 LocalProxy::sendCREATE_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
1099 SectionHandle * handle)
1100 {
1101 jam();
1102 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1103
1104 CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
1105 *req = ss.m_req;
1106 req->senderRef = reference();
1107 req->senderData = ssId;
1108 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
1109 signal, CreateTrigImplReq::SignalLength, JBB,
1110 handle);
1111 }
1112
1113 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)1114 LocalProxy::execCREATE_TRIG_IMPL_CONF(Signal* signal)
1115 {
1116 jam();
1117 const CreateTrigImplConf* conf = (const CreateTrigImplConf*)signal->getDataPtr();
1118 Uint32 ssId = conf->senderData;
1119 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1120 recvCONF(signal, ss);
1121 }
1122
1123 void
execCREATE_TRIG_IMPL_REF(Signal * signal)1124 LocalProxy::execCREATE_TRIG_IMPL_REF(Signal* signal)
1125 {
1126 jam();
1127 const CreateTrigImplRef* ref = (const CreateTrigImplRef*)signal->getDataPtr();
1128 Uint32 ssId = ref->senderData;
1129 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1130 recvREF(signal, ss, ref->errorCode);
1131 }
1132
1133 void
sendCREATE_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1134 LocalProxy::sendCREATE_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1135 {
1136 jam();
1137 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1138 BlockReference dictRef = ss.m_req.senderRef;
1139
1140 if (!lastReply(ss))
1141 {
1142 jam();
1143 return;
1144 }
1145
1146 if (ss.m_error == 0)
1147 {
1148 jam();
1149 CreateTrigImplConf* conf = (CreateTrigImplConf*)signal->getDataPtrSend();
1150 conf->senderRef = reference();
1151 conf->senderData = ss.m_req.senderData;
1152 conf->tableId = ss.m_req.tableId;
1153 conf->triggerId = ss.m_req.triggerId;
1154 conf->triggerInfo = ss.m_req.triggerInfo;
1155 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
1156 signal, CreateTrigImplConf::SignalLength, JBB);
1157 }
1158 else
1159 {
1160 jam();
1161 CreateTrigImplRef* ref = (CreateTrigImplRef*)signal->getDataPtrSend();
1162 ref->senderRef = reference();
1163 ref->senderData = ss.m_req.senderData;
1164 ref->tableId = ss.m_req.tableId;
1165 ref->triggerId = ss.m_req.triggerId;
1166 ref->triggerInfo = ss.m_req.triggerInfo;
1167 ref->errorCode = ss.m_error;
1168 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
1169 signal, CreateTrigImplRef::SignalLength, JBB);
1170 }
1171
1172 ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1173 }
1174
1175 // GSN_DROP_TRIG_IMPL_REQ
1176
1177 void
execDROP_TRIG_IMPL_REQ(Signal * signal)1178 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
1179 {
1180 jam();
1181 if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
1182 {
1183 jam();
1184 return;
1185 }
1186 const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
1187 Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
1188 ss.m_req = *req;
1189 ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
1190 sendREQ(signal, ss);
1191 }
1192
1193 void
sendDROP_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1194 LocalProxy::sendDROP_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
1195 SectionHandle * handle)
1196 {
1197 jam();
1198 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1199
1200 DropTrigImplReq* req = (DropTrigImplReq*)signal->getDataPtrSend();
1201 *req = ss.m_req;
1202 req->senderRef = reference();
1203 req->senderData = ssId;
1204 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
1205 signal, DropTrigImplReq::SignalLength, JBB, handle);
1206 }
1207
1208 void
execDROP_TRIG_IMPL_CONF(Signal * signal)1209 LocalProxy::execDROP_TRIG_IMPL_CONF(Signal* signal)
1210 {
1211 jam();
1212 const DropTrigImplConf* conf = (const DropTrigImplConf*)signal->getDataPtr();
1213 Uint32 ssId = conf->senderData;
1214 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1215 recvCONF(signal, ss);
1216 }
1217
1218 void
execDROP_TRIG_IMPL_REF(Signal * signal)1219 LocalProxy::execDROP_TRIG_IMPL_REF(Signal* signal)
1220 {
1221 jam();
1222 const DropTrigImplRef* ref = (const DropTrigImplRef*)signal->getDataPtr();
1223 Uint32 ssId = ref->senderData;
1224 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1225 recvREF(signal, ss, ref->errorCode);
1226 }
1227
1228 void
sendDROP_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1229 LocalProxy::sendDROP_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1230 {
1231 jam();
1232 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1233 BlockReference dictRef = ss.m_req.senderRef;
1234
1235 if (!lastReply(ss))
1236 {
1237 jam();
1238 return;
1239 }
1240
1241 if (ss.m_error == 0)
1242 {
1243 jam();
1244 DropTrigImplConf* conf = (DropTrigImplConf*)signal->getDataPtrSend();
1245 conf->senderRef = reference();
1246 conf->senderData = ss.m_req.senderData;
1247 conf->tableId = ss.m_req.tableId;
1248 conf->triggerId = ss.m_req.triggerId;
1249 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1250 signal, DropTrigImplConf::SignalLength, JBB);
1251 }
1252 else
1253 {
1254 jam();
1255 DropTrigImplRef* ref = (DropTrigImplRef*)signal->getDataPtrSend();
1256 ref->senderRef = reference();
1257 ref->senderData = ss.m_req.senderData;
1258 ref->tableId = ss.m_req.tableId;
1259 ref->triggerId = ss.m_req.triggerId;
1260 ref->errorCode = ss.m_error;
1261 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1262 signal, DropTrigImplRef::SignalLength, JBB);
1263 }
1264
1265 ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1266 }
1267
1268 // GSN_DBINFO_SCANREQ
1269
1270 static Uint32
switchRef(Uint32 block,Uint32 instance,Uint32 node)1271 switchRef(Uint32 block, Uint32 instance, Uint32 node)
1272 {
1273 const Uint32 ref = numberToRef(block, instance, node);
1274 #ifdef DBINFO_SCAN_TRACE
1275 ndbout_c("Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1276 getBlockName(block, "<unknown>"), instance, node, ref);
1277 #endif
1278 return ref;
1279 }
1280
1281
1282 bool
find_next(Ndbinfo::ScanCursor * cursor) const1283 LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
1284 {
1285 const Uint32 node = refToNode(cursor->currRef);
1286 const Uint32 block = refToMain(cursor->currRef);
1287 Uint32 instance = refToInstance(cursor->currRef);
1288
1289 ndbrequire(node == getOwnNodeId());
1290 ndbrequire(block == number());
1291
1292
1293 Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
1294
1295 if (worker < c_workers)
1296 {
1297 jam();
1298 cursor->currRef = switchRef(block, workerInstance(worker), node);
1299 return true;
1300 }
1301
1302 cursor->currRef = numberToRef(block, node);
1303 return false;
1304 }
1305
1306
1307
1308 void
execDBINFO_SCANREQ(Signal * signal)1309 LocalProxy::execDBINFO_SCANREQ(Signal* signal)
1310 {
1311 jamEntry();
1312 const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
1313 Uint32 signal_length = signal->getLength();
1314 ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
1315
1316 Ndbinfo::ScanCursor* cursor =
1317 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
1318
1319 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
1320 cursor->saveCurrRef)
1321 {
1322 jam();
1323 /* Continue in the saved block ref */
1324 cursor->currRef = cursor->saveCurrRef;
1325 cursor->saveCurrRef = 0;
1326
1327 // Set this block as sender and remember original sender
1328 cursor->saveSenderRef = cursor->senderRef;
1329 cursor->senderRef = reference();
1330
1331 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1332 signal, signal_length, JBB);
1333 return;
1334 }
1335
1336 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
1337
1338 if (find_next(cursor))
1339 {
1340 jam();
1341 ndbrequire(cursor->currRef);
1342 ndbrequire(cursor->saveCurrRef == 0);
1343
1344 // Set this block as sender and remember original sender
1345 cursor->saveSenderRef = cursor->senderRef;
1346 cursor->senderRef = reference();
1347
1348 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1349 signal, signal_length, JBB);
1350 return;
1351 }
1352
1353 /* Scan is done, send SCANCONF back to caller */
1354 ndbrequire(cursor->saveSenderRef == 0);
1355
1356 ndbrequire(cursor->currRef);
1357 ndbrequire(cursor->saveCurrRef == 0);
1358
1359 ndbrequire(refToInstance(cursor->currRef) == 0);
1360 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1361 return;
1362 }
1363
1364 void
execDBINFO_SCANCONF(Signal * signal)1365 LocalProxy::execDBINFO_SCANCONF(Signal* signal)
1366 {
1367 jamEntry();
1368 const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
1369 Uint32 signal_length = signal->getLength();
1370 ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
1371
1372 Ndbinfo::ScanCursor* cursor =
1373 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
1374
1375 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
1376 {
1377 /* The instance has more data and want to continue */
1378 jam();
1379
1380 /* Swap back saved senderRef */
1381 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1382 cursor->saveSenderRef = 0;
1383
1384 /* Save currRef to continue with same instance again */
1385 cursor->saveCurrRef = cursor->currRef;
1386 cursor->currRef = reference();
1387
1388 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1389 return;
1390 }
1391
1392 if (conf->returnedRows)
1393 {
1394 jam();
1395 /*
1396 The instance has no more data, but it has sent rows
1397 to the API which need to be CONFed
1398 */
1399
1400 /* Swap back saved senderRef */
1401 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1402 cursor->saveSenderRef = 0;
1403
1404 if (find_next(cursor))
1405 {
1406 /*
1407 There is another instance to continue in - signal 'more data'
1408 and setup saveCurrRef to continue in that instance
1409 */
1410 jam();
1411 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
1412
1413 cursor->saveCurrRef = cursor->currRef;
1414 cursor->currRef = reference();
1415 }
1416 else
1417 {
1418 /* There was no more instances to continue in */
1419 ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) == false);
1420
1421 ndbrequire(cursor->currRef == reference());
1422 cursor->saveCurrRef = 0;
1423 }
1424
1425 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1426 return;
1427 }
1428
1429
1430 /* The underlying block reported completed, find next if any */
1431 if (find_next(cursor))
1432 {
1433 jam();
1434
1435 ndbrequire(cursor->senderRef == reference());
1436 ndbrequire(cursor->saveSenderRef); // Should already be set
1437
1438 ndbrequire(cursor->saveCurrRef == 0);
1439
1440 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1441 signal, signal_length, JBB);
1442 return;
1443 }
1444
1445 /* Scan in this block and its instances are completed */
1446
1447 /* Swap back saved senderRef */
1448 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1449 cursor->saveSenderRef = 0;
1450
1451 ndbrequire(cursor->currRef);
1452 ndbrequire(cursor->saveCurrRef == 0);
1453
1454 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1455 return;
1456 }
1457
1458 // GSN_SYNC_REQ
1459
1460 void
execSYNC_REQ(Signal * signal)1461 LocalProxy::execSYNC_REQ(Signal* signal)
1462 {
1463 jam();
1464 Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1465
1466 ss.m_req = * CAST_CONSTPTR(SyncReq, signal->getDataPtr());
1467
1468 sendREQ(signal, ss);
1469 }
1470
1471 void
sendSYNC_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1472 LocalProxy::sendSYNC_REQ(Signal* signal, Uint32 ssId,
1473 SectionHandle* handle)
1474 {
1475 jam();
1476 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1477
1478 SyncReq * req = CAST_PTR(SyncReq, signal->getDataPtrSend());
1479 req->senderRef = reference();
1480 req->senderData = ssId;
1481 req->prio = ss.m_req.prio;
1482
1483 sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1484 signal, SyncReq::SignalLength,
1485 JobBufferLevel(ss.m_req.prio), handle);
1486 }
1487
1488 void
execSYNC_REF(Signal * signal)1489 LocalProxy::execSYNC_REF(Signal* signal)
1490 {
1491 jam();
1492 SyncRef ref = * CAST_CONSTPTR(SyncRef, signal->getDataPtr());
1493 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1494
1495 recvREF(signal, ss, ref.errorCode);
1496 }
1497
1498 void
execSYNC_CONF(Signal * signal)1499 LocalProxy::execSYNC_CONF(Signal* signal)
1500 {
1501 jam();
1502 SyncConf conf = * CAST_CONSTPTR(SyncConf, signal->getDataPtr());
1503 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1504
1505 recvCONF(signal, ss);
1506 }
1507
1508 void
sendSYNC_CONF(Signal * signal,Uint32 ssId)1509 LocalProxy::sendSYNC_CONF(Signal* signal, Uint32 ssId)
1510 {
1511 jam();
1512 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1513
1514 if (!lastReply(ss))
1515 {
1516 jam();
1517 return;
1518 }
1519
1520 /**
1521 * SimulatedBlock::execSYNC_REQ will reply
1522 */
1523 if (ss.m_error == 0)
1524 {
1525 jam();
1526 SyncConf * conf = CAST_PTR(SyncConf, signal->getDataPtrSend());
1527 conf->senderRef = reference();
1528 conf->senderData = ss.m_req.senderData;
1529
1530 Uint32 prio = ss.m_req.prio;
1531 sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1532 SyncConf::SignalLength,
1533 JobBufferLevel(prio));
1534 }
1535 else
1536 {
1537 jam();
1538 SyncRef * ref = CAST_PTR(SyncRef, signal->getDataPtrSend());
1539 ref->senderRef = reference();
1540 ref->senderData = ss.m_req.senderData;
1541 ref->errorCode = ss.m_error;
1542
1543 Uint32 prio = ss.m_req.prio;
1544 sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1545 SyncRef::SignalLength,
1546 JobBufferLevel(prio));
1547 }
1548 ssRelease<Ss_SYNC_REQ>(ssId);
1549 }
1550
1551 void
execSYNC_PATH_REQ(Signal * signal)1552 LocalProxy::execSYNC_PATH_REQ(Signal* signal)
1553 {
1554 jam();
1555 SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
1556 req->count *= c_workers;
1557
1558 for (Uint32 i = 0; i < c_workers; i++)
1559 {
1560 jam();
1561 Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1562 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1563 signal->getLength(),
1564 JobBufferLevel(req->prio));
1565 }
1566 }
1567
1568 // GSN_API_FAILREQ
1569
1570 void
execAPI_FAILREQ(Signal * signal)1571 LocalProxy::execAPI_FAILREQ(Signal* signal)
1572 {
1573 jam();
1574 Uint32 nodeId = signal->theData[0];
1575 Ss_API_FAILREQ& ss = ssSeize<Ss_API_FAILREQ>(nodeId);
1576
1577 ss.m_ref = signal->theData[1];
1578 sendREQ(signal, ss);
1579 }
1580
1581 void
sendAPI_FAILREQ(Signal * signal,Uint32 nodeId,SectionHandle *)1582 LocalProxy::sendAPI_FAILREQ(Signal* signal, Uint32 nodeId, SectionHandle*)
1583 {
1584 jam();
1585 Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(nodeId);
1586
1587 /*
1588 * It is a requirement that the API_FAILREQ signal is
1589 * received as the last signal from the failed 'nodeId'.
1590 * It is produced by QMGR and ROUTEed via the TRPMAN(s)
1591 * (in the recv-thread) with the intention that it will
1592 * enter the job buffers after any signals received
1593 * from the failed API node. (Note that the transporter
1594 * *is* closed when API_FAILREQ is created, so any more
1595 * signals should not arrive from this API node).
1596 *
1597 * However, sending the API_FAILREQ via the proxy
1598 * will insert it in another job buffer queue than signals
1599 * received from the API-node:
1600 *
1601 * API-request : recv/TRPMAN-thread ---> Block-instance
1602 * ^
1603 * Routed-API_FAILREQ: recv/TRPMAN-thread /
1604 * \ /
1605 * Proxy
1606 *
1607 * Depening on the order the queues are processed, the
1608 * API_FAILREQ may then be processed by the Block-instance
1609 * before a API-request from the failed node - Which is
1610 * not what we want.
1611 *
1612 * Thus we let any proxies receiving a API_FAILREQ, route it
1613 * back to the recv/TRPMAN. There it will be inserted in
1614 * the same queue as the API-requests, and thus remove the
1615 * posibilities for being overtaken:
1616 *
1617 * Routed-API_FAILREQ: recv/TRPMAN-thread
1618 * \
1619 * re-route to TRPMAN: -----------<-Proxy (we are here)
1620 * |
1621 * v
1622 * API-request+FAIL : recv/TRPMAN-thread ---> Block-instance
1623 */
1624
1625 /* API_FAILREQ signal: */
1626 signal->theData[0] = nodeId;
1627 signal->theData[1] = reference();
1628
1629 Uint32 routedSignalSectionI = RNIL;
1630 ndbrequire(appendToSection(routedSignalSectionI,
1631 &signal->theData[0],
1632 2));
1633 SectionHandle handle(this, routedSignalSectionI);
1634
1635 /* RouteOrd data */
1636 RouteOrd* routeOrd = (RouteOrd*) signal->getDataPtrSend();
1637
1638 routeOrd->srcRef = reference();
1639 routeOrd->gsn = GSN_API_FAILREQ;
1640 routeOrd->from = nodeId;
1641 routeOrd->dstRef = workerRef(ss.m_worker);
1642 /* ROUTE it through the TRPMAN */
1643 sendSignal(TRPMAN_REF, GSN_ROUTE_ORD, signal,
1644 RouteOrd::SignalLength,
1645 JBB, &handle);
1646 }
1647
1648 void
execAPI_FAILCONF(Signal * signal)1649 LocalProxy::execAPI_FAILCONF(Signal* signal)
1650 {
1651 jam();
1652 Uint32 nodeId = signal->theData[0];
1653 Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(nodeId);
1654 recvCONF(signal, ss);
1655 }
1656
1657 void
sendAPI_FAILCONF(Signal * signal,Uint32 ssId)1658 LocalProxy::sendAPI_FAILCONF(Signal* signal, Uint32 ssId)
1659 {
1660 jam();
1661 Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(ssId);
1662
1663 if (!lastReply(ss))
1664 {
1665 jam();
1666 return;
1667 }
1668
1669 signal->theData[0] = ssId;
1670 signal->theData[1] = reference();
1671 sendSignal(ss.m_ref, GSN_API_FAILCONF,
1672 signal, 2, JBB);
1673
1674 ssRelease<Ss_API_FAILREQ>(ssId);
1675 }
1676
1677 BLOCK_FUNCTIONS(LocalProxy)
1678