1 /* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22
23 #include <mt.hpp>
24 #include "LocalProxy.hpp"
25
LocalProxy(BlockNumber blockNumber,Block_context & ctx)26 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
27 SimulatedBlock(blockNumber, ctx)
28 {
29 BLOCK_CONSTRUCTOR(LocalProxy);
30
31 ndbrequire(instance() == 0); // this is main block
32 c_lqhWorkers = 0;
33 c_extraWorkers = 0; // sub-class constructor can set
34 c_workers = 0;
35 Uint32 i;
36 for (i = 0; i < MaxWorkers; i++)
37 c_worker[i] = 0;
38
39 c_ssIdSeq = 0;
40
41 c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
42 c_masterNodeId = ZNIL;
43
44 // GSN_READ_CONFIG_REQ
45 addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
46 addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
47
48 // GSN_STTOR
49 addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
50 addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
51
52 // GSN_NDB_STTOR
53 addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
54 addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
55
56 // GSN_READ_NODESREQ
57 addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
58 addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
59
60 // GSN_NODE_FAILREP
61 addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
62 addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
63
64 // GSN_INCL_NODEREQ
65 addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
66 addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
67
68 // GSN_NODE_STATE_REP
69 addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP, true);
70
71 // GSN_CHANGE_NODE_STATE_REQ
72 addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ, true);
73 addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
74
75 // GSN_DUMP_STATE_ORD
76 addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
77
78 // GSN_NDB_TAMPER
79 addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
80
81 // GSN_TIME_SIGNAL
82 addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
83
84 // GSN_CREATE_TRIG_IMPL_REQ
85 addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
86 addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
87 addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
88
89 // GSN_DROP_TRIG_IMPL_REQ
90 addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
91 addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
92 addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
93
94 // GSN_DBINFO_SCANREQ
95 addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
96 addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
97
98 // GSN_SYNC_REQ
99 addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ, true);
100 addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
101 addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
102
103 // GSN_SYNC_PATH_REQ
104 addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ, true);
105 }
106
~LocalProxy()107 LocalProxy::~LocalProxy()
108 {
109 // dtor of main block deletes workers
110 }
111
112 // support routines
113
114 void
sendREQ(Signal * signal,SsSequential & ss)115 LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
116 {
117 ss.m_worker = 0;
118 ndbrequire(ss.m_sendREQ != 0);
119 SectionHandle handle(this);
120 restoreHandle(handle, ss);
121 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
122 saveSections(ss, handle);
123 }
124
125 void
recvCONF(Signal * signal,SsSequential & ss)126 LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
127 {
128 ndbrequire(ss.m_sendCONF != 0);
129 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
130
131 ss.m_worker++;
132 if (ss.m_worker < c_workers) {
133 jam();
134 ndbrequire(ss.m_sendREQ != 0);
135 SectionHandle handle(this);
136 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
137 return;
138 }
139 }
140
141 void
recvREF(Signal * signal,SsSequential & ss,Uint32 error)142 LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
143 {
144 ndbrequire(error != 0);
145 if (ss.m_error == 0)
146 ss.m_error = error;
147 recvCONF(signal, ss);
148 }
149
150 void
skipReq(SsSequential & ss)151 LocalProxy::skipReq(SsSequential& ss)
152 {
153 }
154
155 void
skipConf(SsSequential & ss)156 LocalProxy::skipConf(SsSequential& ss)
157 {
158 }
159
160 void
saveSections(SsCommon & ss,SectionHandle & handle)161 LocalProxy::saveSections(SsCommon& ss, SectionHandle & handle)
162 {
163 ss.m_sec_cnt = handle.m_cnt;
164 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
165 ss.m_sec_ptr[i] = handle.m_ptr[i].i;
166 handle.clear();
167 }
168
169 void
restoreHandle(SectionHandle & handle,SsCommon & ss)170 LocalProxy::restoreHandle(SectionHandle & handle, SsCommon& ss)
171 {
172 handle.m_cnt = ss.m_sec_cnt;
173 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
174 handle.m_ptr[i].i = ss.m_sec_ptr[i];
175
176 getSections(handle.m_cnt, handle.m_ptr);
177 ss.m_sec_cnt = 0;
178 }
179
180 bool
firstReply(const SsSequential & ss)181 LocalProxy::firstReply(const SsSequential& ss)
182 {
183 return ss.m_worker == 0;
184 }
185
186 bool
lastReply(const SsSequential & ss)187 LocalProxy::lastReply(const SsSequential& ss)
188 {
189 return ss.m_worker + 1 == c_workers;
190 }
191
192 void
sendREQ(Signal * signal,SsParallel & ss)193 LocalProxy::sendREQ(Signal* signal, SsParallel& ss)
194 {
195 ndbrequire(ss.m_sendREQ != 0);
196
197 ss.m_workerMask.clear();
198 ss.m_worker = 0;
199 const Uint32 count = ss.m_extraLast ? c_lqhWorkers : c_workers;
200 SectionHandle handle(this);
201 restoreHandle(handle, ss);
202 while (ss.m_worker < count) {
203 jam();
204 ss.m_workerMask.set(ss.m_worker);
205 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
206 ss.m_worker++;
207 }
208 releaseSections(handle);
209 }
210
211 void
recvCONF(Signal * signal,SsParallel & ss)212 LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
213 {
214 ndbrequire(ss.m_sendCONF != 0);
215
216 BlockReference ref = signal->getSendersBlockRef();
217 ndbrequire(refToMain(ref) == number());
218
219 Uint32 ino = refToInstance(ref);
220 ss.m_worker = workerIndex(ino);
221 ndbrequire(ref == workerRef(ss.m_worker));
222 ndbrequire(ss.m_worker < c_workers);
223 ndbrequire(ss.m_workerMask.get(ss.m_worker));
224 ss.m_workerMask.clear(ss.m_worker);
225
226 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
227 }
228
229 void
recvREF(Signal * signal,SsParallel & ss,Uint32 error)230 LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
231 {
232 ndbrequire(error != 0);
233 if (ss.m_error == 0)
234 ss.m_error = error;
235 recvCONF(signal, ss);
236 }
237
238 void
skipReq(SsParallel & ss)239 LocalProxy::skipReq(SsParallel& ss)
240 {
241 ndbrequire(ss.m_workerMask.get(ss.m_worker));
242 ss.m_workerMask.clear(ss.m_worker);
243 }
244
245 // more replies expected from this worker
246 void
skipConf(SsParallel & ss)247 LocalProxy::skipConf(SsParallel& ss)
248 {
249 ndbrequire(!ss.m_workerMask.get(ss.m_worker));
250 ss.m_workerMask.set(ss.m_worker);
251 }
252
253 bool
firstReply(const SsParallel & ss)254 LocalProxy::firstReply(const SsParallel& ss)
255 {
256 const WorkerMask& mask = ss.m_workerMask;
257 const Uint32 count = mask.count();
258
259 // recvCONF has cleared current worker
260 ndbrequire(ss.m_worker < c_workers);
261 ndbrequire(!mask.get(ss.m_worker));
262 ndbrequire(count < c_workers);
263 return count + 1 == c_workers;
264 }
265
266 bool
lastReply(const SsParallel & ss)267 LocalProxy::lastReply(const SsParallel& ss)
268 {
269 return ss.m_workerMask.isclear();
270 }
271
272 bool
lastExtra(Signal * signal,SsParallel & ss)273 LocalProxy::lastExtra(Signal* signal, SsParallel& ss)
274 {
275 SectionHandle handle(this);
276 if (c_lqhWorkers + ss.m_extraSent < c_workers) {
277 jam();
278 ss.m_worker = c_lqhWorkers + ss.m_extraSent;
279 ss.m_workerMask.set(ss.m_worker);
280 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
281 ss.m_extraSent++;
282 return false;
283 }
284 return true;
285 }
286
287 // used in "reverse" proxying (start with worker REQs)
288 void
setMask(SsParallel & ss)289 LocalProxy::setMask(SsParallel& ss)
290 {
291 Uint32 i;
292 for (i = 0; i < c_workers; i++)
293 ss.m_workerMask.set(i);
294 }
295
296 void
setMask(SsParallel & ss,const WorkerMask & mask)297 LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
298 {
299 ss.m_workerMask.assign(mask);
300 }
301
302 // load workers (before first signal)
303
304 void
loadWorkers()305 LocalProxy::loadWorkers()
306 {
307 c_lqhWorkers = getLqhWorkers();
308 c_workers = c_lqhWorkers + c_extraWorkers;
309
310 Uint32 i;
311 for (i = 0; i < c_workers; i++) {
312 jam();
313 Uint32 instanceNo = workerInstance(i);
314
315 SimulatedBlock* worker = newWorker(instanceNo);
316 ndbrequire(worker->instance() == instanceNo);
317 ndbrequire(this->getInstance(instanceNo) == worker);
318 c_worker[i] = worker;
319
320 if (i < c_lqhWorkers) {
321 add_lqh_worker_thr_map(number(), instanceNo);
322 } else {
323 add_extra_worker_thr_map(number(), instanceNo);
324 }
325 }
326 }
327
328 // GSN_READ_CONFIG_REQ
329
330 void
execREAD_CONFIG_REQ(Signal * signal)331 LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
332 {
333 Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
334
335 const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
336 ss.m_req = *req;
337 ndbrequire(ss.m_req.noOfParameters == 0);
338 callREAD_CONFIG_REQ(signal);
339 }
340
341 void
callREAD_CONFIG_REQ(Signal * signal)342 LocalProxy::callREAD_CONFIG_REQ(Signal* signal)
343 {
344 backREAD_CONFIG_REQ(signal);
345 }
346
347 void
backREAD_CONFIG_REQ(Signal * signal)348 LocalProxy::backREAD_CONFIG_REQ(Signal* signal)
349 {
350 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
351
352 // run sequentially due to big mallocs and initializations
353 sendREQ(signal, ss);
354 }
355
356 void
sendREAD_CONFIG_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)357 LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId,
358 SectionHandle* handle)
359 {
360 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
361
362 ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
363 req->senderRef = reference();
364 req->senderData = ssId;
365 req->noOfParameters = 0;
366 sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
367 signal, ReadConfigReq::SignalLength, JBB, handle);
368 }
369
370 void
execREAD_CONFIG_CONF(Signal * signal)371 LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
372 {
373 const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
374 Uint32 ssId = conf->senderData;
375 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
376 recvCONF(signal, ss);
377 }
378
379 void
sendREAD_CONFIG_CONF(Signal * signal,Uint32 ssId)380 LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
381 {
382 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
383
384 if (!lastReply(ss))
385 return;
386
387 SectionHandle handle(this);
388 restoreHandle(handle, ss);
389 releaseSections(handle);
390
391 ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
392 conf->senderRef = reference();
393 conf->senderData = ss.m_req.senderData;
394 sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
395 signal, ReadConfigConf::SignalLength, JBB);
396
397 ssRelease<Ss_READ_CONFIG_REQ>(ssId);
398 }
399
400 // GSN_STTOR
401
402 void
execSTTOR(Signal * signal)403 LocalProxy::execSTTOR(Signal* signal)
404 {
405 Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
406
407 const Uint32 startphase = signal->theData[1];
408 const Uint32 typeOfStart = signal->theData[7];
409
410 if (startphase == 3) {
411 jam();
412 c_typeOfStart = typeOfStart;
413 }
414
415 ss.m_reqlength = signal->getLength();
416 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
417
418 callSTTOR(signal);
419 }
420
421 void
callSTTOR(Signal * signal)422 LocalProxy::callSTTOR(Signal* signal)
423 {
424 backSTTOR(signal);
425 }
426
427 void
backSTTOR(Signal * signal)428 LocalProxy::backSTTOR(Signal* signal)
429 {
430 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
431 sendREQ(signal, ss);
432 }
433
434 void
sendSTTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)435 LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
436 {
437 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
438
439 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
440 sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
441 signal, ss.m_reqlength, JBB, handle);
442 }
443
444 void
execSTTORRY(Signal * signal)445 LocalProxy::execSTTORRY(Signal* signal)
446 {
447 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
448 recvCONF(signal, ss);
449 }
450
451 void
sendSTTORRY(Signal * signal,Uint32 ssId)452 LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
453 {
454 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
455
456 const Uint32 conflength = signal->getLength();
457 const Uint32* confdata = signal->getDataPtr();
458
459 // the reply is identical from all
460 if (firstReply(ss)) {
461 ss.m_conflength = conflength;
462 memcpy(ss.m_confdata, confdata, conflength << 2);
463 } else {
464 ndbrequire(ss.m_conflength == conflength);
465 ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
466 }
467
468 if (!lastReply(ss))
469 return;
470
471 memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
472 sendSignal(NDBCNTR_REF, GSN_STTORRY,
473 signal, ss.m_conflength, JBB);
474
475 ssRelease<Ss_STTOR>(ssId);
476 }
477
478 // GSN_NDB_STTOR
479
480 void
execNDB_STTOR(Signal * signal)481 LocalProxy::execNDB_STTOR(Signal* signal)
482 {
483 Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
484
485 const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
486 ss.m_req = *req;
487
488 callNDB_STTOR(signal);
489 }
490
491 void
callNDB_STTOR(Signal * signal)492 LocalProxy::callNDB_STTOR(Signal* signal)
493 {
494 backNDB_STTOR(signal);
495 }
496
497 void
backNDB_STTOR(Signal * signal)498 LocalProxy::backNDB_STTOR(Signal* signal)
499 {
500 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
501 sendREQ(signal, ss);
502 }
503
504 void
sendNDB_STTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)505 LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
506 {
507 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
508
509 NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
510 *req = ss.m_req;
511 req->senderRef = reference();
512 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
513 signal, ss.m_reqlength, JBB, handle);
514 }
515
516 void
execNDB_STTORRY(Signal * signal)517 LocalProxy::execNDB_STTORRY(Signal* signal)
518 {
519 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
520
521 // the reply contains only senderRef
522 const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
523 ndbrequire(conf->senderRef == signal->getSendersBlockRef());
524 recvCONF(signal, ss);
525 }
526
527 void
sendNDB_STTORRY(Signal * signal,Uint32 ssId)528 LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
529 {
530 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
531
532 if (!lastReply(ss))
533 return;
534
535 NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
536 conf->senderRef = reference();
537 sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
538 signal, NdbSttorry::SignalLength, JBB);
539
540 ssRelease<Ss_NDB_STTOR>(ssId);
541 }
542
543 // GSN_READ_NODESREQ
544
545 void
sendREAD_NODESREQ(Signal * signal)546 LocalProxy::sendREAD_NODESREQ(Signal* signal)
547 {
548 signal->theData[0] = reference();
549 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
550 }
551
552 void
execREAD_NODESCONF(Signal * signal)553 LocalProxy::execREAD_NODESCONF(Signal* signal)
554 {
555 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
556
557 const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
558
559 c_masterNodeId = conf->masterNodeId;
560
561 switch (ss.m_gsn) {
562 case GSN_STTOR:
563 backSTTOR(signal);
564 break;
565 case GSN_NDB_STTOR:
566 backNDB_STTOR(signal);
567 break;
568 default:
569 ndbrequire(false);
570 break;
571 }
572
573 ss.m_gsn = 0;
574 }
575
576 void
execREAD_NODESREF(Signal * signal)577 LocalProxy::execREAD_NODESREF(Signal* signal)
578 {
579 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
580 ndbrequire(ss.m_gsn != 0);
581 ndbrequire(false);
582 }
583
584 // GSN_NODE_FAILREP
585
586 void
execNODE_FAILREP(Signal * signal)587 LocalProxy::execNODE_FAILREP(Signal* signal)
588 {
589 Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
590 const NodeFailRep* req = (const NodeFailRep*)signal->getDataPtr();
591 ss.m_req = *req;
592 ndbrequire(signal->getLength() == NodeFailRep::SignalLength);
593
594 NdbNodeBitmask mask;
595 mask.assign(NdbNodeBitmask::Size, req->theNodes);
596
597 // from each worker wait for ack for each failed node
598 for (Uint32 i = 0; i < c_workers; i++)
599 {
600 jam();
601 NdbNodeBitmask& waitFor = ss.m_waitFor[i];
602 waitFor.bitOR(mask);
603 }
604
605 sendREQ(signal, ss);
606 if (ss.noReply(number()))
607 {
608 jam();
609 ssRelease<Ss_NODE_FAILREP>(ss);
610 }
611 }
612
613 void
sendNODE_FAILREP(Signal * signal,Uint32 ssId,SectionHandle * handle)614 LocalProxy::sendNODE_FAILREP(Signal* signal, Uint32 ssId, SectionHandle* handle)
615 {
616 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
617
618 NodeFailRep* req = (NodeFailRep*)signal->getDataPtrSend();
619 *req = ss.m_req;
620 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
621 signal, NodeFailRep::SignalLength, JBB, handle);
622 }
623
624 void
execNF_COMPLETEREP(Signal * signal)625 LocalProxy::execNF_COMPLETEREP(Signal* signal)
626 {
627 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
628 ndbrequire(!ss.noReply(number()));
629 ss.m_workerMask.set(ss.m_worker); // Avoid require in recvCONF
630 recvCONF(signal, ss);
631 }
632
633 void
sendNF_COMPLETEREP(Signal * signal,Uint32 ssId)634 LocalProxy::sendNF_COMPLETEREP(Signal* signal, Uint32 ssId)
635 {
636 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
637
638 const NFCompleteRep* conf = (const NFCompleteRep*)signal->getDataPtr();
639 Uint32 node = conf->failedNodeId;
640
641 {
642 NdbNodeBitmask& waitFor = ss.m_waitFor[ss.m_worker];
643 ndbrequire(waitFor.get(node));
644 waitFor.clear(node);
645 }
646
647 for (Uint32 i = 0; i < c_workers; i++)
648 {
649 jam();
650 NdbNodeBitmask& waitFor = ss.m_waitFor[i];
651 if (waitFor.get(node))
652 {
653 jam();
654 /**
655 * Not all threads are done with this failed node
656 */
657 return;
658 }
659 }
660
661 {
662 NFCompleteRep* conf = (NFCompleteRep*)signal->getDataPtrSend();
663 conf->blockNo = number();
664 conf->nodeId = getOwnNodeId();
665 conf->failedNodeId = node;
666 conf->unused = 0;
667 conf->from = __LINE__;
668
669 sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
670 signal, NFCompleteRep::SignalLength, JBB);
671
672 if (number() == DBTC)
673 {
674 /**
675 * DBTC send NF_COMPLETEREP "early" to QMGR
676 * so that it can allow api to handle node-failure of
677 * transactions eariler...
678 * See Qmgr::execNF_COMPLETEREP
679 */
680 jam();
681 sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
682 NFCompleteRep::SignalLength, JBB);
683 }
684 }
685 }
686
687 // GSN_INCL_NODEREQ
688
689 void
execINCL_NODEREQ(Signal * signal)690 LocalProxy::execINCL_NODEREQ(Signal* signal)
691 {
692 Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
693
694 ss.m_reqlength = signal->getLength();
695 ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
696 memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
697
698 sendREQ(signal, ss);
699 }
700
701 void
sendINCL_NODEREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)702 LocalProxy::sendINCL_NODEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
703 {
704 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
705
706 Ss_INCL_NODEREQ::Req* req =
707 (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
708
709 memcpy(req, &ss.m_req, ss.m_reqlength << 2);
710 req->senderRef = reference();
711 sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
712 signal, ss.m_reqlength, JBB, handle);
713 }
714
715 void
execINCL_NODECONF(Signal * signal)716 LocalProxy::execINCL_NODECONF(Signal* signal)
717 {
718 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
719 recvCONF(signal, ss);
720 }
721
722 void
sendINCL_NODECONF(Signal * signal,Uint32 ssId)723 LocalProxy::sendINCL_NODECONF(Signal* signal, Uint32 ssId)
724 {
725 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
726
727 if (!lastReply(ss))
728 return;
729
730 Ss_INCL_NODEREQ::Conf* conf =
731 (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
732
733 conf->inclNodeId = ss.m_req.inclNodeId;
734 conf->senderRef = reference();
735 sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
736 signal, 2, JBB);
737
738 ssRelease<Ss_INCL_NODEREQ>(ssId);
739 }
740
741 // GSN_NODE_STATE_REP
742
743 void
execNODE_STATE_REP(Signal * signal)744 LocalProxy::execNODE_STATE_REP(Signal* signal)
745 {
746 Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
747 sendREQ(signal, ss);
748 SimulatedBlock::execNODE_STATE_REP(signal);
749 ssRelease<Ss_NODE_STATE_REP>(ss);
750 }
751
752 void
sendNODE_STATE_REP(Signal * signal,Uint32 ssId,SectionHandle * handle)753 LocalProxy::sendNODE_STATE_REP(Signal* signal, Uint32 ssId,
754 SectionHandle* handle)
755 {
756 Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
757
758 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
759 signal,NodeStateRep::SignalLength, JBB, handle);
760 }
761
762 // GSN_CHANGE_NODE_STATE_REQ
763
764 void
execCHANGE_NODE_STATE_REQ(Signal * signal)765 LocalProxy::execCHANGE_NODE_STATE_REQ(Signal* signal)
766 {
767 Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
768
769 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
770 ss.m_req = *req;
771
772 sendREQ(signal, ss);
773 }
774
775 void
sendCHANGE_NODE_STATE_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)776 LocalProxy::sendCHANGE_NODE_STATE_REQ(Signal* signal, Uint32 ssId,
777 SectionHandle* handle)
778 {
779 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
780
781 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
782 req->senderRef = reference();
783
784 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
785 signal, ChangeNodeStateReq::SignalLength, JBB, handle);
786 }
787
788 void
execCHANGE_NODE_STATE_CONF(Signal * signal)789 LocalProxy::execCHANGE_NODE_STATE_CONF(Signal* signal)
790 {
791 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
792
793 ChangeNodeStateConf * conf = (ChangeNodeStateConf*)signal->getDataPtrSend();
794 ndbrequire(conf->senderData == ss.m_req.senderData);
795 recvCONF(signal, ss);
796 }
797
798 void
sendCHANGE_NODE_STATE_CONF(Signal * signal,Uint32 ssId)799 LocalProxy::sendCHANGE_NODE_STATE_CONF(Signal* signal, Uint32 ssId)
800 {
801 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
802
803 if (!lastReply(ss))
804 return;
805
806 /**
807 * SimulatedBlock::execCHANGE_NODE_STATE_REQ will reply
808 */
809 ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
810 * req = ss.m_req;
811 SimulatedBlock::execCHANGE_NODE_STATE_REQ(signal);
812 ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
813 }
814
815 // GSN_DUMP_STATE_ORD
816
817 void
execDUMP_STATE_ORD(Signal * signal)818 LocalProxy::execDUMP_STATE_ORD(Signal* signal)
819 {
820 Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
821
822 ss.m_reqlength = signal->getLength();
823 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
824 sendREQ(signal, ss);
825 ssRelease<Ss_DUMP_STATE_ORD>(ss);
826 }
827
828 void
sendDUMP_STATE_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)829 LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId,
830 SectionHandle* handle)
831 {
832 Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
833
834 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
835 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
836 signal, ss.m_reqlength, JBB, handle);
837 }
838
839 // GSN_NDB_TAMPER
840
841 void
execNDB_TAMPER(Signal * signal)842 LocalProxy::execNDB_TAMPER(Signal* signal)
843 {
844 Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
845
846 ndbrequire(signal->getLength() == 1);
847 ss.m_errorInsert = signal->theData[0];
848
849 SimulatedBlock::execNDB_TAMPER(signal);
850 sendREQ(signal, ss);
851 ssRelease<Ss_NDB_TAMPER>(ss);
852 }
853
854 void
sendNDB_TAMPER(Signal * signal,Uint32 ssId,SectionHandle * handle)855 LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId, SectionHandle* handle)
856 {
857 Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
858
859 signal->theData[0] = ss.m_errorInsert;
860 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
861 signal, 1, JBB, handle);
862 }
863
864 // GSN_TIME_SIGNAL
865
866 void
execTIME_SIGNAL(Signal * signal)867 LocalProxy::execTIME_SIGNAL(Signal* signal)
868 {
869 Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
870
871 sendREQ(signal, ss);
872 ssRelease<Ss_TIME_SIGNAL>(ss);
873 }
874
875 void
sendTIME_SIGNAL(Signal * signal,Uint32 ssId,SectionHandle * handle)876 LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId, SectionHandle* handle)
877 {
878 Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
879 signal->theData[0] = 0;
880 sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
881 signal, 1, JBB, handle);
882 }
883
884 // GSN_CREATE_TRIG_IMPL_REQ
885
886 void
execCREATE_TRIG_IMPL_REQ(Signal * signal)887 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
888 {
889 if (!assembleFragments(signal))
890 return;
891
892 if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
893 return;
894 const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
895 Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
896 ss.m_req = *req;
897 ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
898
899 SectionHandle handle(this, signal);
900 saveSections(ss, handle);
901
902 sendREQ(signal, ss);
903 }
904
905 void
sendCREATE_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)906 LocalProxy::sendCREATE_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
907 SectionHandle * handle)
908 {
909 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
910
911 CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
912 *req = ss.m_req;
913 req->senderRef = reference();
914 req->senderData = ssId;
915 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
916 signal, CreateTrigImplReq::SignalLength, JBB,
917 handle);
918 }
919
920 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)921 LocalProxy::execCREATE_TRIG_IMPL_CONF(Signal* signal)
922 {
923 const CreateTrigImplConf* conf = (const CreateTrigImplConf*)signal->getDataPtr();
924 Uint32 ssId = conf->senderData;
925 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
926 recvCONF(signal, ss);
927 }
928
929 void
execCREATE_TRIG_IMPL_REF(Signal * signal)930 LocalProxy::execCREATE_TRIG_IMPL_REF(Signal* signal)
931 {
932 const CreateTrigImplRef* ref = (const CreateTrigImplRef*)signal->getDataPtr();
933 Uint32 ssId = ref->senderData;
934 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
935 recvREF(signal, ss, ref->errorCode);
936 }
937
938 void
sendCREATE_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)939 LocalProxy::sendCREATE_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
940 {
941 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
942 BlockReference dictRef = ss.m_req.senderRef;
943
944 if (!lastReply(ss))
945 return;
946
947 if (ss.m_error == 0) {
948 jam();
949 CreateTrigImplConf* conf = (CreateTrigImplConf*)signal->getDataPtrSend();
950 conf->senderRef = reference();
951 conf->senderData = ss.m_req.senderData;
952 conf->tableId = ss.m_req.tableId;
953 conf->triggerId = ss.m_req.triggerId;
954 conf->triggerInfo = ss.m_req.triggerInfo;
955 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
956 signal, CreateTrigImplConf::SignalLength, JBB);
957 } else {
958 CreateTrigImplRef* ref = (CreateTrigImplRef*)signal->getDataPtrSend();
959 ref->senderRef = reference();
960 ref->senderData = ss.m_req.senderData;
961 ref->tableId = ss.m_req.tableId;
962 ref->triggerId = ss.m_req.triggerId;
963 ref->triggerInfo = ss.m_req.triggerInfo;
964 ref->errorCode = ss.m_error;
965 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
966 signal, CreateTrigImplRef::SignalLength, JBB);
967 }
968
969 ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
970 }
971
972 // GSN_DROP_TRIG_IMPL_REQ
973
974 void
execDROP_TRIG_IMPL_REQ(Signal * signal)975 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
976 {
977 if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
978 return;
979 const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
980 Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
981 ss.m_req = *req;
982 ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
983 sendREQ(signal, ss);
984 }
985
986 void
sendDROP_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)987 LocalProxy::sendDROP_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
988 SectionHandle * handle)
989 {
990 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
991
992 DropTrigImplReq* req = (DropTrigImplReq*)signal->getDataPtrSend();
993 *req = ss.m_req;
994 req->senderRef = reference();
995 req->senderData = ssId;
996 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
997 signal, DropTrigImplReq::SignalLength, JBB, handle);
998 }
999
1000 void
execDROP_TRIG_IMPL_CONF(Signal * signal)1001 LocalProxy::execDROP_TRIG_IMPL_CONF(Signal* signal)
1002 {
1003 const DropTrigImplConf* conf = (const DropTrigImplConf*)signal->getDataPtr();
1004 Uint32 ssId = conf->senderData;
1005 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1006 recvCONF(signal, ss);
1007 }
1008
1009 void
execDROP_TRIG_IMPL_REF(Signal * signal)1010 LocalProxy::execDROP_TRIG_IMPL_REF(Signal* signal)
1011 {
1012 const DropTrigImplRef* ref = (const DropTrigImplRef*)signal->getDataPtr();
1013 Uint32 ssId = ref->senderData;
1014 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1015 recvREF(signal, ss, ref->errorCode);
1016 }
1017
1018 void
sendDROP_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1019 LocalProxy::sendDROP_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1020 {
1021 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1022 BlockReference dictRef = ss.m_req.senderRef;
1023
1024 if (!lastReply(ss))
1025 return;
1026
1027 if (ss.m_error == 0) {
1028 jam();
1029 DropTrigImplConf* conf = (DropTrigImplConf*)signal->getDataPtrSend();
1030 conf->senderRef = reference();
1031 conf->senderData = ss.m_req.senderData;
1032 conf->tableId = ss.m_req.tableId;
1033 conf->triggerId = ss.m_req.triggerId;
1034 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1035 signal, DropTrigImplConf::SignalLength, JBB);
1036 } else {
1037 DropTrigImplRef* ref = (DropTrigImplRef*)signal->getDataPtrSend();
1038 ref->senderRef = reference();
1039 ref->senderData = ss.m_req.senderData;
1040 ref->tableId = ss.m_req.tableId;
1041 ref->triggerId = ss.m_req.triggerId;
1042 ref->errorCode = ss.m_error;
1043 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1044 signal, DropTrigImplRef::SignalLength, JBB);
1045 }
1046
1047 ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1048 }
1049
1050 // GSN_DBINFO_SCANREQ
1051
1052 //#define DBINFO_SCAN_TRACE
1053 #ifdef DBINFO_SCAN_TRACE
1054 #include <debugger/DebuggerNames.hpp>
1055 #endif
1056
1057 static Uint32
switchRef(Uint32 block,Uint32 instance,Uint32 node)1058 switchRef(Uint32 block, Uint32 instance, Uint32 node)
1059 {
1060 const Uint32 ref = numberToRef(block, instance, node);
1061 #ifdef DBINFO_SCAN_TRACE
1062 ndbout_c("Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1063 getBlockName(block, "<unknown>"), instance, node, ref);
1064 #endif
1065 return ref;
1066 }
1067
1068
1069 bool
find_next(Ndbinfo::ScanCursor * cursor) const1070 LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
1071 {
1072 const Uint32 node = refToNode(cursor->currRef);
1073 const Uint32 block = refToMain(cursor->currRef);
1074 Uint32 instance = refToInstance(cursor->currRef);
1075
1076 ndbrequire(node == getOwnNodeId());
1077 ndbrequire(block == number());
1078
1079
1080 Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
1081
1082 if (worker < c_workers)
1083 {
1084 jam();
1085 cursor->currRef = switchRef(block, workerInstance(worker), node);
1086 return true;
1087 }
1088
1089 cursor->currRef = numberToRef(block, node);
1090 return false;
1091 }
1092
1093
1094
1095 void
execDBINFO_SCANREQ(Signal * signal)1096 LocalProxy::execDBINFO_SCANREQ(Signal* signal)
1097 {
1098 jamEntry();
1099 const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
1100 Uint32 signal_length = signal->getLength();
1101 ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
1102
1103 Ndbinfo::ScanCursor* cursor =
1104 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
1105
1106 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
1107 cursor->saveCurrRef)
1108 {
1109 jam();
1110 /* Continue in the saved block ref */
1111 cursor->currRef = cursor->saveCurrRef;
1112 cursor->saveCurrRef = 0;
1113
1114 // Set this block as sender and remember original sender
1115 cursor->saveSenderRef = cursor->senderRef;
1116 cursor->senderRef = reference();
1117
1118 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1119 signal, signal_length, JBB);
1120 return;
1121 }
1122
1123 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
1124
1125 if (find_next(cursor))
1126 {
1127 jam();
1128 ndbrequire(cursor->currRef);
1129 ndbrequire(cursor->saveCurrRef == 0);
1130
1131 // Set this block as sender and remember original sender
1132 cursor->saveSenderRef = cursor->senderRef;
1133 cursor->senderRef = reference();
1134
1135 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1136 signal, signal_length, JBB);
1137 return;
1138 }
1139
1140 /* Scan is done, send SCANCONF back to caller */
1141 ndbrequire(cursor->saveSenderRef == 0);
1142
1143 ndbrequire(cursor->currRef);
1144 ndbrequire(cursor->saveCurrRef == 0);
1145
1146 ndbrequire(refToInstance(cursor->currRef) == 0);
1147 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1148 return;
1149 }
1150
1151 void
execDBINFO_SCANCONF(Signal * signal)1152 LocalProxy::execDBINFO_SCANCONF(Signal* signal)
1153 {
1154 jamEntry();
1155 const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
1156 Uint32 signal_length = signal->getLength();
1157 ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
1158
1159 Ndbinfo::ScanCursor* cursor =
1160 (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
1161
1162 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
1163 {
1164 /* The instance has more data and want to continue */
1165 jam();
1166
1167 /* Swap back saved senderRef */
1168 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1169 cursor->saveSenderRef = 0;
1170
1171 /* Save currRef to continue with same instance again */
1172 cursor->saveCurrRef = cursor->currRef;
1173 cursor->currRef = reference();
1174
1175 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1176 return;
1177 }
1178
1179 if (conf->returnedRows)
1180 {
1181 jam();
1182 /*
1183 The instance has no more data, but it has sent rows
1184 to the API which need to be CONFed
1185 */
1186
1187 /* Swap back saved senderRef */
1188 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1189 cursor->saveSenderRef = 0;
1190
1191 if (find_next(cursor))
1192 {
1193 /*
1194 There is another instance to continue in - signal 'more data'
1195 and setup saveCurrRef to continue in that instance
1196 */
1197 jam();
1198 Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
1199
1200 cursor->saveCurrRef = cursor->currRef;
1201 cursor->currRef = reference();
1202 }
1203 else
1204 {
1205 /* There was no more instances to continue in */
1206 ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) == false);
1207
1208 ndbrequire(cursor->currRef == reference());
1209 cursor->saveCurrRef = 0;
1210 }
1211
1212 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1213 return;
1214 }
1215
1216
1217 /* The underlying block reported completed, find next if any */
1218 if (find_next(cursor))
1219 {
1220 jam();
1221
1222 ndbrequire(cursor->senderRef == reference());
1223 ndbrequire(cursor->saveSenderRef); // Should already be set
1224
1225 ndbrequire(cursor->saveCurrRef == 0);
1226
1227 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1228 signal, signal_length, JBB);
1229 return;
1230 }
1231
1232 /* Scan in this block and its instances are completed */
1233
1234 /* Swap back saved senderRef */
1235 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1236 cursor->saveSenderRef = 0;
1237
1238 ndbrequire(cursor->currRef);
1239 ndbrequire(cursor->saveCurrRef == 0);
1240
1241 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1242 return;
1243 }
1244
1245 // GSN_SYNC_REQ
1246
1247 void
execSYNC_REQ(Signal * signal)1248 LocalProxy::execSYNC_REQ(Signal* signal)
1249 {
1250 Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1251
1252 ss.m_req = * CAST_CONSTPTR(SyncReq, signal->getDataPtr());
1253
1254 sendREQ(signal, ss);
1255 }
1256
1257 void
sendSYNC_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1258 LocalProxy::sendSYNC_REQ(Signal* signal, Uint32 ssId,
1259 SectionHandle* handle)
1260 {
1261 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1262
1263 SyncReq * req = CAST_PTR(SyncReq, signal->getDataPtrSend());
1264 req->senderRef = reference();
1265 req->senderData = ssId;
1266 req->prio = ss.m_req.prio;
1267
1268 sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1269 signal, SyncReq::SignalLength,
1270 JobBufferLevel(ss.m_req.prio), handle);
1271 }
1272
1273 void
execSYNC_REF(Signal * signal)1274 LocalProxy::execSYNC_REF(Signal* signal)
1275 {
1276 SyncRef ref = * CAST_CONSTPTR(SyncRef, signal->getDataPtr());
1277 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1278
1279 recvREF(signal, ss, ref.errorCode);
1280 }
1281
1282 void
execSYNC_CONF(Signal * signal)1283 LocalProxy::execSYNC_CONF(Signal* signal)
1284 {
1285 SyncConf conf = * CAST_CONSTPTR(SyncConf, signal->getDataPtr());
1286 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1287
1288 recvCONF(signal, ss);
1289 }
1290
1291 void
sendSYNC_CONF(Signal * signal,Uint32 ssId)1292 LocalProxy::sendSYNC_CONF(Signal* signal, Uint32 ssId)
1293 {
1294 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1295
1296 if (!lastReply(ss))
1297 return;
1298
1299 /**
1300 * SimulatedBlock::execSYNC_REQ will reply
1301 */
1302 if (ss.m_error == 0)
1303 {
1304 jam();
1305 SyncConf * conf = CAST_PTR(SyncConf, signal->getDataPtrSend());
1306 conf->senderRef = reference();
1307 conf->senderData = ss.m_req.senderData;
1308
1309 Uint32 prio = ss.m_req.prio;
1310 sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1311 SyncConf::SignalLength,
1312 JobBufferLevel(prio));
1313 }
1314 else
1315 {
1316 jam();
1317 SyncRef * ref = CAST_PTR(SyncRef, signal->getDataPtrSend());
1318 ref->senderRef = reference();
1319 ref->senderData = ss.m_req.senderData;
1320 ref->errorCode = ss.m_error;
1321
1322 Uint32 prio = ss.m_req.prio;
1323 sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1324 SyncRef::SignalLength,
1325 JobBufferLevel(prio));
1326 }
1327 ssRelease<Ss_SYNC_REQ>(ssId);
1328 }
1329
1330 void
execSYNC_PATH_REQ(Signal * signal)1331 LocalProxy::execSYNC_PATH_REQ(Signal* signal)
1332 {
1333 SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
1334 req->count *= c_workers;
1335
1336 for (Uint32 i = 0; i < c_workers; i++)
1337 {
1338 jam();
1339 Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1340 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1341 signal->getLength(),
1342 JobBufferLevel(req->prio));
1343 }
1344 }
1345
1346 BLOCK_FUNCTIONS(LocalProxy)
1347