1 /* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
22 
23 #include <mt.hpp>
24 #include "LocalProxy.hpp"
25 
LocalProxy(BlockNumber blockNumber,Block_context & ctx)26 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
27   SimulatedBlock(blockNumber, ctx)
28 {
29   BLOCK_CONSTRUCTOR(LocalProxy);
30 
31   ndbrequire(instance() == 0); // this is main block
32   c_lqhWorkers = 0;
33   c_extraWorkers = 0; // sub-class constructor can set
34   c_workers = 0;
35   Uint32 i;
36   for (i = 0; i < MaxWorkers; i++)
37     c_worker[i] = 0;
38 
39   c_ssIdSeq = 0;
40 
41   c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
42   c_masterNodeId = ZNIL;
43 
44   // GSN_READ_CONFIG_REQ
45   addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
46   addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
47 
48   // GSN_STTOR
49   addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
50   addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
51 
52   // GSN_NDB_STTOR
53   addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
54   addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
55 
56   // GSN_READ_NODESREQ
57   addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
58   addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
59 
60   // GSN_NODE_FAILREP
61   addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
62   addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
63 
64   // GSN_INCL_NODEREQ
65   addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
66   addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
67 
68   // GSN_NODE_STATE_REP
69   addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP, true);
70 
71   // GSN_CHANGE_NODE_STATE_REQ
72   addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ, true);
73   addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
74 
75   // GSN_DUMP_STATE_ORD
76   addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
77 
78   // GSN_NDB_TAMPER
79   addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
80 
81   // GSN_TIME_SIGNAL
82   addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
83 
84   // GSN_CREATE_TRIG_IMPL_REQ
85   addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
86   addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
87   addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
88 
89   // GSN_DROP_TRIG_IMPL_REQ
90   addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
91   addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
92   addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
93 
94   // GSN_DBINFO_SCANREQ
95   addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
96   addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
97 
98   // GSN_SYNC_REQ
99   addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ, true);
100   addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
101   addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
102 
103   // GSN_SYNC_PATH_REQ
104   addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ, true);
105 }
106 
~LocalProxy()107 LocalProxy::~LocalProxy()
108 {
109   // dtor of main block deletes workers
110 }
111 
112 // support routines
113 
114 void
sendREQ(Signal * signal,SsSequential & ss)115 LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
116 {
117   ss.m_worker = 0;
118   ndbrequire(ss.m_sendREQ != 0);
119   SectionHandle handle(this);
120   restoreHandle(handle, ss);
121   (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
122   saveSections(ss, handle);
123 }
124 
125 void
recvCONF(Signal * signal,SsSequential & ss)126 LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
127 {
128   ndbrequire(ss.m_sendCONF != 0);
129   (this->*ss.m_sendCONF)(signal, ss.m_ssId);
130 
131   ss.m_worker++;
132   if (ss.m_worker < c_workers) {
133     jam();
134     ndbrequire(ss.m_sendREQ != 0);
135     SectionHandle handle(this);
136     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
137     return;
138   }
139 }
140 
141 void
recvREF(Signal * signal,SsSequential & ss,Uint32 error)142 LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
143 {
144   ndbrequire(error != 0);
145   if (ss.m_error == 0)
146     ss.m_error = error;
147   recvCONF(signal, ss);
148 }
149 
150 void
skipReq(SsSequential & ss)151 LocalProxy::skipReq(SsSequential& ss)
152 {
153 }
154 
155 void
skipConf(SsSequential & ss)156 LocalProxy::skipConf(SsSequential& ss)
157 {
158 }
159 
160 void
saveSections(SsCommon & ss,SectionHandle & handle)161 LocalProxy::saveSections(SsCommon& ss, SectionHandle & handle)
162 {
163   ss.m_sec_cnt = handle.m_cnt;
164   for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
165     ss.m_sec_ptr[i] = handle.m_ptr[i].i;
166   handle.clear();
167 }
168 
169 void
restoreHandle(SectionHandle & handle,SsCommon & ss)170 LocalProxy::restoreHandle(SectionHandle & handle, SsCommon& ss)
171 {
172   handle.m_cnt = ss.m_sec_cnt;
173   for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
174     handle.m_ptr[i].i = ss.m_sec_ptr[i];
175 
176   getSections(handle.m_cnt, handle.m_ptr);
177   ss.m_sec_cnt = 0;
178 }
179 
180 bool
firstReply(const SsSequential & ss)181 LocalProxy::firstReply(const SsSequential& ss)
182 {
183   return ss.m_worker == 0;
184 }
185 
186 bool
lastReply(const SsSequential & ss)187 LocalProxy::lastReply(const SsSequential& ss)
188 {
189   return ss.m_worker + 1 == c_workers;
190 }
191 
192 void
sendREQ(Signal * signal,SsParallel & ss)193 LocalProxy::sendREQ(Signal* signal, SsParallel& ss)
194 {
195   ndbrequire(ss.m_sendREQ != 0);
196 
197   ss.m_workerMask.clear();
198   ss.m_worker = 0;
199   const Uint32 count = ss.m_extraLast ? c_lqhWorkers : c_workers;
200   SectionHandle handle(this);
201   restoreHandle(handle, ss);
202   while (ss.m_worker < count) {
203     jam();
204     ss.m_workerMask.set(ss.m_worker);
205     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
206     ss.m_worker++;
207   }
208   releaseSections(handle);
209 }
210 
211 void
recvCONF(Signal * signal,SsParallel & ss)212 LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
213 {
214   ndbrequire(ss.m_sendCONF != 0);
215 
216   BlockReference ref = signal->getSendersBlockRef();
217   ndbrequire(refToMain(ref) == number());
218 
219   Uint32 ino = refToInstance(ref);
220   ss.m_worker = workerIndex(ino);
221   ndbrequire(ref == workerRef(ss.m_worker));
222   ndbrequire(ss.m_worker < c_workers);
223   ndbrequire(ss.m_workerMask.get(ss.m_worker));
224   ss.m_workerMask.clear(ss.m_worker);
225 
226   (this->*ss.m_sendCONF)(signal, ss.m_ssId);
227 }
228 
229 void
recvREF(Signal * signal,SsParallel & ss,Uint32 error)230 LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
231 {
232   ndbrequire(error != 0);
233   if (ss.m_error == 0)
234     ss.m_error = error;
235   recvCONF(signal, ss);
236 }
237 
238 void
skipReq(SsParallel & ss)239 LocalProxy::skipReq(SsParallel& ss)
240 {
241   ndbrequire(ss.m_workerMask.get(ss.m_worker));
242   ss.m_workerMask.clear(ss.m_worker);
243 }
244 
245 // more replies expected from this worker
246 void
skipConf(SsParallel & ss)247 LocalProxy::skipConf(SsParallel& ss)
248 {
249   ndbrequire(!ss.m_workerMask.get(ss.m_worker));
250   ss.m_workerMask.set(ss.m_worker);
251 }
252 
253 bool
firstReply(const SsParallel & ss)254 LocalProxy::firstReply(const SsParallel& ss)
255 {
256   const WorkerMask& mask = ss.m_workerMask;
257   const Uint32 count = mask.count();
258 
259   // recvCONF has cleared current worker
260   ndbrequire(ss.m_worker < c_workers);
261   ndbrequire(!mask.get(ss.m_worker));
262   ndbrequire(count < c_workers);
263   return count + 1 == c_workers;
264 }
265 
266 bool
lastReply(const SsParallel & ss)267 LocalProxy::lastReply(const SsParallel& ss)
268 {
269   return ss.m_workerMask.isclear();
270 }
271 
272 bool
lastExtra(Signal * signal,SsParallel & ss)273 LocalProxy::lastExtra(Signal* signal, SsParallel& ss)
274 {
275   SectionHandle handle(this);
276   if (c_lqhWorkers + ss.m_extraSent < c_workers) {
277     jam();
278     ss.m_worker = c_lqhWorkers + ss.m_extraSent;
279     ss.m_workerMask.set(ss.m_worker);
280     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
281     ss.m_extraSent++;
282     return false;
283   }
284   return true;
285 }
286 
287 // used in "reverse" proxying (start with worker REQs)
288 void
setMask(SsParallel & ss)289 LocalProxy::setMask(SsParallel& ss)
290 {
291   Uint32 i;
292   for (i = 0; i < c_workers; i++)
293     ss.m_workerMask.set(i);
294 }
295 
296 void
setMask(SsParallel & ss,const WorkerMask & mask)297 LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
298 {
299   ss.m_workerMask.assign(mask);
300 }
301 
302 // load workers (before first signal)
303 
304 void
loadWorkers()305 LocalProxy::loadWorkers()
306 {
307   c_lqhWorkers = getLqhWorkers();
308   c_workers = c_lqhWorkers + c_extraWorkers;
309 
310   Uint32 i;
311   for (i = 0; i < c_workers; i++) {
312     jam();
313     Uint32 instanceNo = workerInstance(i);
314 
315     SimulatedBlock* worker = newWorker(instanceNo);
316     ndbrequire(worker->instance() == instanceNo);
317     ndbrequire(this->getInstance(instanceNo) == worker);
318     c_worker[i] = worker;
319 
320     if (i < c_lqhWorkers) {
321       add_lqh_worker_thr_map(number(), instanceNo);
322     } else {
323       add_extra_worker_thr_map(number(), instanceNo);
324     }
325   }
326 }
327 
328 // GSN_READ_CONFIG_REQ
329 
330 void
execREAD_CONFIG_REQ(Signal * signal)331 LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
332 {
333   Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
334 
335   const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
336   ss.m_req = *req;
337   ndbrequire(ss.m_req.noOfParameters == 0);
338   callREAD_CONFIG_REQ(signal);
339 }
340 
341 void
callREAD_CONFIG_REQ(Signal * signal)342 LocalProxy::callREAD_CONFIG_REQ(Signal* signal)
343 {
344   backREAD_CONFIG_REQ(signal);
345 }
346 
347 void
backREAD_CONFIG_REQ(Signal * signal)348 LocalProxy::backREAD_CONFIG_REQ(Signal* signal)
349 {
350   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
351 
352   // run sequentially due to big mallocs and initializations
353   sendREQ(signal, ss);
354 }
355 
356 void
sendREAD_CONFIG_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)357 LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId,
358                                 SectionHandle* handle)
359 {
360   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
361 
362   ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
363   req->senderRef = reference();
364   req->senderData = ssId;
365   req->noOfParameters = 0;
366   sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
367                       signal, ReadConfigReq::SignalLength, JBB, handle);
368 }
369 
370 void
execREAD_CONFIG_CONF(Signal * signal)371 LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
372 {
373   const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
374   Uint32 ssId = conf->senderData;
375   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
376   recvCONF(signal, ss);
377 }
378 
379 void
sendREAD_CONFIG_CONF(Signal * signal,Uint32 ssId)380 LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
381 {
382   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
383 
384   if (!lastReply(ss))
385     return;
386 
387   SectionHandle handle(this);
388   restoreHandle(handle, ss);
389   releaseSections(handle);
390 
391   ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
392   conf->senderRef = reference();
393   conf->senderData = ss.m_req.senderData;
394   sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
395              signal, ReadConfigConf::SignalLength, JBB);
396 
397   ssRelease<Ss_READ_CONFIG_REQ>(ssId);
398 }
399 
400 // GSN_STTOR
401 
402 void
execSTTOR(Signal * signal)403 LocalProxy::execSTTOR(Signal* signal)
404 {
405   Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
406 
407   const Uint32 startphase  = signal->theData[1];
408   const Uint32 typeOfStart = signal->theData[7];
409 
410   if (startphase == 3) {
411     jam();
412     c_typeOfStart = typeOfStart;
413   }
414 
415   ss.m_reqlength = signal->getLength();
416   memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
417 
418   callSTTOR(signal);
419 }
420 
421 void
callSTTOR(Signal * signal)422 LocalProxy::callSTTOR(Signal* signal)
423 {
424   backSTTOR(signal);
425 }
426 
427 void
backSTTOR(Signal * signal)428 LocalProxy::backSTTOR(Signal* signal)
429 {
430   Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
431   sendREQ(signal, ss);
432 }
433 
434 void
sendSTTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)435 LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
436 {
437   Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
438 
439   memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
440   sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
441                       signal, ss.m_reqlength, JBB, handle);
442 }
443 
444 void
execSTTORRY(Signal * signal)445 LocalProxy::execSTTORRY(Signal* signal)
446 {
447   Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
448   recvCONF(signal, ss);
449 }
450 
451 void
sendSTTORRY(Signal * signal,Uint32 ssId)452 LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
453 {
454   Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
455 
456   const Uint32 conflength = signal->getLength();
457   const Uint32* confdata = signal->getDataPtr();
458 
459   // the reply is identical from all
460   if (firstReply(ss)) {
461     ss.m_conflength = conflength;
462     memcpy(ss.m_confdata, confdata, conflength << 2);
463   } else {
464     ndbrequire(ss.m_conflength == conflength);
465     ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
466   }
467 
468   if (!lastReply(ss))
469     return;
470 
471   memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
472   sendSignal(NDBCNTR_REF, GSN_STTORRY,
473              signal, ss.m_conflength, JBB);
474 
475   ssRelease<Ss_STTOR>(ssId);
476 }
477 
478 // GSN_NDB_STTOR
479 
480 void
execNDB_STTOR(Signal * signal)481 LocalProxy::execNDB_STTOR(Signal* signal)
482 {
483   Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
484 
485   const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
486   ss.m_req = *req;
487 
488   callNDB_STTOR(signal);
489 }
490 
491 void
callNDB_STTOR(Signal * signal)492 LocalProxy::callNDB_STTOR(Signal* signal)
493 {
494   backNDB_STTOR(signal);
495 }
496 
497 void
backNDB_STTOR(Signal * signal)498 LocalProxy::backNDB_STTOR(Signal* signal)
499 {
500   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
501   sendREQ(signal, ss);
502 }
503 
504 void
sendNDB_STTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)505 LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
506 {
507   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
508 
509   NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
510   *req = ss.m_req;
511   req->senderRef = reference();
512   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
513                       signal, ss.m_reqlength, JBB, handle);
514 }
515 
516 void
execNDB_STTORRY(Signal * signal)517 LocalProxy::execNDB_STTORRY(Signal* signal)
518 {
519   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
520 
521   // the reply contains only senderRef
522   const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
523   ndbrequire(conf->senderRef == signal->getSendersBlockRef());
524   recvCONF(signal, ss);
525 }
526 
527 void
sendNDB_STTORRY(Signal * signal,Uint32 ssId)528 LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
529 {
530   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
531 
532   if (!lastReply(ss))
533     return;
534 
535   NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
536   conf->senderRef = reference();
537   sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
538              signal, NdbSttorry::SignalLength, JBB);
539 
540   ssRelease<Ss_NDB_STTOR>(ssId);
541 }
542 
543 // GSN_READ_NODESREQ
544 
545 void
sendREAD_NODESREQ(Signal * signal)546 LocalProxy::sendREAD_NODESREQ(Signal* signal)
547 {
548   signal->theData[0] = reference();
549   sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
550 }
551 
552 void
execREAD_NODESCONF(Signal * signal)553 LocalProxy::execREAD_NODESCONF(Signal* signal)
554 {
555   Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
556 
557   const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
558 
559   c_masterNodeId = conf->masterNodeId;
560 
561   switch (ss.m_gsn) {
562   case GSN_STTOR:
563     backSTTOR(signal);
564     break;
565   case GSN_NDB_STTOR:
566     backNDB_STTOR(signal);
567     break;
568   default:
569     ndbrequire(false);
570     break;
571   }
572 
573   ss.m_gsn = 0;
574 }
575 
576 void
execREAD_NODESREF(Signal * signal)577 LocalProxy::execREAD_NODESREF(Signal* signal)
578 {
579   Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
580   ndbrequire(ss.m_gsn != 0);
581   ndbrequire(false);
582 }
583 
584 // GSN_NODE_FAILREP
585 
586 void
execNODE_FAILREP(Signal * signal)587 LocalProxy::execNODE_FAILREP(Signal* signal)
588 {
589   Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
590   const NodeFailRep* req = (const NodeFailRep*)signal->getDataPtr();
591   ss.m_req = *req;
592   ndbrequire(signal->getLength() == NodeFailRep::SignalLength);
593 
594   NdbNodeBitmask mask;
595   mask.assign(NdbNodeBitmask::Size, req->theNodes);
596 
597   // from each worker wait for ack for each failed node
598   for (Uint32 i = 0; i < c_workers; i++)
599   {
600     jam();
601     NdbNodeBitmask& waitFor = ss.m_waitFor[i];
602     waitFor.bitOR(mask);
603   }
604 
605   sendREQ(signal, ss);
606   if (ss.noReply(number()))
607   {
608     jam();
609     ssRelease<Ss_NODE_FAILREP>(ss);
610   }
611 }
612 
613 void
sendNODE_FAILREP(Signal * signal,Uint32 ssId,SectionHandle * handle)614 LocalProxy::sendNODE_FAILREP(Signal* signal, Uint32 ssId, SectionHandle* handle)
615 {
616   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
617 
618   NodeFailRep* req = (NodeFailRep*)signal->getDataPtrSend();
619   *req = ss.m_req;
620   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
621                       signal, NodeFailRep::SignalLength, JBB, handle);
622 }
623 
624 void
execNF_COMPLETEREP(Signal * signal)625 LocalProxy::execNF_COMPLETEREP(Signal* signal)
626 {
627   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
628   ndbrequire(!ss.noReply(number()));
629   ss.m_workerMask.set(ss.m_worker); // Avoid require in recvCONF
630   recvCONF(signal, ss);
631 }
632 
633 void
sendNF_COMPLETEREP(Signal * signal,Uint32 ssId)634 LocalProxy::sendNF_COMPLETEREP(Signal* signal, Uint32 ssId)
635 {
636   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
637 
638   const NFCompleteRep* conf = (const NFCompleteRep*)signal->getDataPtr();
639   Uint32 node = conf->failedNodeId;
640 
641   {
642     NdbNodeBitmask& waitFor = ss.m_waitFor[ss.m_worker];
643     ndbrequire(waitFor.get(node));
644     waitFor.clear(node);
645   }
646 
647   for (Uint32 i = 0; i < c_workers; i++)
648   {
649     jam();
650     NdbNodeBitmask& waitFor = ss.m_waitFor[i];
651     if (waitFor.get(node))
652     {
653       jam();
654       /**
655        * Not all threads are done with this failed node
656        */
657       return;
658     }
659   }
660 
661   {
662     NFCompleteRep* conf = (NFCompleteRep*)signal->getDataPtrSend();
663     conf->blockNo = number();
664     conf->nodeId = getOwnNodeId();
665     conf->failedNodeId = node;
666     conf->unused = 0;
667     conf->from = __LINE__;
668 
669     sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
670                signal, NFCompleteRep::SignalLength, JBB);
671 
672     if (number() == DBTC)
673     {
674       /**
675        * DBTC send NF_COMPLETEREP "early" to QMGR
676        *   so that it can allow api to handle node-failure of
677        *   transactions eariler...
678        * See Qmgr::execNF_COMPLETEREP
679        */
680       jam();
681       sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
682                  NFCompleteRep::SignalLength, JBB);
683     }
684   }
685 }
686 
687 // GSN_INCL_NODEREQ
688 
689 void
execINCL_NODEREQ(Signal * signal)690 LocalProxy::execINCL_NODEREQ(Signal* signal)
691 {
692   Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
693 
694   ss.m_reqlength = signal->getLength();
695   ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
696   memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
697 
698   sendREQ(signal, ss);
699 }
700 
701 void
sendINCL_NODEREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)702 LocalProxy::sendINCL_NODEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
703 {
704   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
705 
706   Ss_INCL_NODEREQ::Req* req =
707     (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
708 
709   memcpy(req, &ss.m_req, ss.m_reqlength << 2);
710   req->senderRef = reference();
711   sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
712                       signal, ss.m_reqlength, JBB, handle);
713 }
714 
715 void
execINCL_NODECONF(Signal * signal)716 LocalProxy::execINCL_NODECONF(Signal* signal)
717 {
718   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
719   recvCONF(signal, ss);
720 }
721 
722 void
sendINCL_NODECONF(Signal * signal,Uint32 ssId)723 LocalProxy::sendINCL_NODECONF(Signal* signal, Uint32 ssId)
724 {
725   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
726 
727   if (!lastReply(ss))
728     return;
729 
730   Ss_INCL_NODEREQ::Conf* conf =
731     (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
732 
733   conf->inclNodeId = ss.m_req.inclNodeId;
734   conf->senderRef = reference();
735   sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
736              signal, 2, JBB);
737 
738   ssRelease<Ss_INCL_NODEREQ>(ssId);
739 }
740 
741 // GSN_NODE_STATE_REP
742 
743 void
execNODE_STATE_REP(Signal * signal)744 LocalProxy::execNODE_STATE_REP(Signal* signal)
745 {
746   Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
747   sendREQ(signal, ss);
748   SimulatedBlock::execNODE_STATE_REP(signal);
749   ssRelease<Ss_NODE_STATE_REP>(ss);
750 }
751 
752 void
sendNODE_STATE_REP(Signal * signal,Uint32 ssId,SectionHandle * handle)753 LocalProxy::sendNODE_STATE_REP(Signal* signal, Uint32 ssId,
754                                SectionHandle* handle)
755 {
756   Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
757 
758   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
759                       signal,NodeStateRep::SignalLength, JBB, handle);
760 }
761 
762 // GSN_CHANGE_NODE_STATE_REQ
763 
764 void
execCHANGE_NODE_STATE_REQ(Signal * signal)765 LocalProxy::execCHANGE_NODE_STATE_REQ(Signal* signal)
766 {
767   Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
768 
769   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
770   ss.m_req = *req;
771 
772   sendREQ(signal, ss);
773 }
774 
775 void
sendCHANGE_NODE_STATE_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)776 LocalProxy::sendCHANGE_NODE_STATE_REQ(Signal* signal, Uint32 ssId,
777                                       SectionHandle* handle)
778 {
779   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
780 
781   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
782   req->senderRef = reference();
783 
784   sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
785                       signal, ChangeNodeStateReq::SignalLength, JBB, handle);
786 }
787 
788 void
execCHANGE_NODE_STATE_CONF(Signal * signal)789 LocalProxy::execCHANGE_NODE_STATE_CONF(Signal* signal)
790 {
791   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
792 
793   ChangeNodeStateConf * conf = (ChangeNodeStateConf*)signal->getDataPtrSend();
794   ndbrequire(conf->senderData == ss.m_req.senderData);
795   recvCONF(signal, ss);
796 }
797 
798 void
sendCHANGE_NODE_STATE_CONF(Signal * signal,Uint32 ssId)799 LocalProxy::sendCHANGE_NODE_STATE_CONF(Signal* signal, Uint32 ssId)
800 {
801   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
802 
803   if (!lastReply(ss))
804     return;
805 
806   /**
807    * SimulatedBlock::execCHANGE_NODE_STATE_REQ will reply
808    */
809   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
810   * req = ss.m_req;
811   SimulatedBlock::execCHANGE_NODE_STATE_REQ(signal);
812   ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
813 }
814 
815 // GSN_DUMP_STATE_ORD
816 
817 void
execDUMP_STATE_ORD(Signal * signal)818 LocalProxy::execDUMP_STATE_ORD(Signal* signal)
819 {
820   Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
821 
822   ss.m_reqlength = signal->getLength();
823   memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
824   sendREQ(signal, ss);
825   ssRelease<Ss_DUMP_STATE_ORD>(ss);
826 }
827 
828 void
sendDUMP_STATE_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)829 LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId,
830                                SectionHandle* handle)
831 {
832   Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
833 
834   memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
835   sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
836                       signal, ss.m_reqlength, JBB, handle);
837 }
838 
839 // GSN_NDB_TAMPER
840 
841 void
execNDB_TAMPER(Signal * signal)842 LocalProxy::execNDB_TAMPER(Signal* signal)
843 {
844   Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
845 
846   ndbrequire(signal->getLength() == 1);
847   ss.m_errorInsert = signal->theData[0];
848 
849   SimulatedBlock::execNDB_TAMPER(signal);
850   sendREQ(signal, ss);
851   ssRelease<Ss_NDB_TAMPER>(ss);
852 }
853 
854 void
sendNDB_TAMPER(Signal * signal,Uint32 ssId,SectionHandle * handle)855 LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId, SectionHandle* handle)
856 {
857   Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
858 
859   signal->theData[0] = ss.m_errorInsert;
860   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
861                       signal, 1, JBB, handle);
862 }
863 
864 // GSN_TIME_SIGNAL
865 
866 void
execTIME_SIGNAL(Signal * signal)867 LocalProxy::execTIME_SIGNAL(Signal* signal)
868 {
869   Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
870 
871   sendREQ(signal, ss);
872   ssRelease<Ss_TIME_SIGNAL>(ss);
873 }
874 
875 void
sendTIME_SIGNAL(Signal * signal,Uint32 ssId,SectionHandle * handle)876 LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId, SectionHandle* handle)
877 {
878   Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
879   signal->theData[0] = 0;
880   sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
881                       signal, 1, JBB, handle);
882 }
883 
884 // GSN_CREATE_TRIG_IMPL_REQ
885 
886 void
execCREATE_TRIG_IMPL_REQ(Signal * signal)887 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
888 {
889   if (!assembleFragments(signal))
890     return;
891 
892   if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
893     return;
894   const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
895   Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
896   ss.m_req = *req;
897   ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
898 
899   SectionHandle handle(this, signal);
900   saveSections(ss, handle);
901 
902   sendREQ(signal, ss);
903 }
904 
905 void
sendCREATE_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)906 LocalProxy::sendCREATE_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
907                                      SectionHandle * handle)
908 {
909   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
910 
911   CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
912   *req = ss.m_req;
913   req->senderRef = reference();
914   req->senderData = ssId;
915   sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
916                       signal, CreateTrigImplReq::SignalLength, JBB,
917                       handle);
918 }
919 
920 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)921 LocalProxy::execCREATE_TRIG_IMPL_CONF(Signal* signal)
922 {
923   const CreateTrigImplConf* conf = (const CreateTrigImplConf*)signal->getDataPtr();
924   Uint32 ssId = conf->senderData;
925   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
926   recvCONF(signal, ss);
927 }
928 
929 void
execCREATE_TRIG_IMPL_REF(Signal * signal)930 LocalProxy::execCREATE_TRIG_IMPL_REF(Signal* signal)
931 {
932   const CreateTrigImplRef* ref = (const CreateTrigImplRef*)signal->getDataPtr();
933   Uint32 ssId = ref->senderData;
934   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
935   recvREF(signal, ss, ref->errorCode);
936 }
937 
938 void
sendCREATE_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)939 LocalProxy::sendCREATE_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
940 {
941   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
942   BlockReference dictRef = ss.m_req.senderRef;
943 
944   if (!lastReply(ss))
945     return;
946 
947   if (ss.m_error == 0) {
948     jam();
949     CreateTrigImplConf* conf = (CreateTrigImplConf*)signal->getDataPtrSend();
950     conf->senderRef = reference();
951     conf->senderData = ss.m_req.senderData;
952     conf->tableId = ss.m_req.tableId;
953     conf->triggerId = ss.m_req.triggerId;
954     conf->triggerInfo = ss.m_req.triggerInfo;
955     sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
956                signal, CreateTrigImplConf::SignalLength, JBB);
957   } else {
958     CreateTrigImplRef* ref = (CreateTrigImplRef*)signal->getDataPtrSend();
959     ref->senderRef = reference();
960     ref->senderData = ss.m_req.senderData;
961     ref->tableId = ss.m_req.tableId;
962     ref->triggerId = ss.m_req.triggerId;
963     ref->triggerInfo = ss.m_req.triggerInfo;
964     ref->errorCode = ss.m_error;
965     sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
966                signal, CreateTrigImplRef::SignalLength, JBB);
967   }
968 
969   ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
970 }
971 
972 // GSN_DROP_TRIG_IMPL_REQ
973 
974 void
execDROP_TRIG_IMPL_REQ(Signal * signal)975 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
976 {
977   if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
978     return;
979   const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
980   Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
981   ss.m_req = *req;
982   ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
983   sendREQ(signal, ss);
984 }
985 
986 void
sendDROP_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)987 LocalProxy::sendDROP_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
988                                    SectionHandle * handle)
989 {
990   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
991 
992   DropTrigImplReq* req = (DropTrigImplReq*)signal->getDataPtrSend();
993   *req = ss.m_req;
994   req->senderRef = reference();
995   req->senderData = ssId;
996   sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
997                       signal, DropTrigImplReq::SignalLength, JBB, handle);
998 }
999 
1000 void
execDROP_TRIG_IMPL_CONF(Signal * signal)1001 LocalProxy::execDROP_TRIG_IMPL_CONF(Signal* signal)
1002 {
1003   const DropTrigImplConf* conf = (const DropTrigImplConf*)signal->getDataPtr();
1004   Uint32 ssId = conf->senderData;
1005   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1006   recvCONF(signal, ss);
1007 }
1008 
1009 void
execDROP_TRIG_IMPL_REF(Signal * signal)1010 LocalProxy::execDROP_TRIG_IMPL_REF(Signal* signal)
1011 {
1012   const DropTrigImplRef* ref = (const DropTrigImplRef*)signal->getDataPtr();
1013   Uint32 ssId = ref->senderData;
1014   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1015   recvREF(signal, ss, ref->errorCode);
1016 }
1017 
1018 void
sendDROP_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1019 LocalProxy::sendDROP_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1020 {
1021   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1022   BlockReference dictRef = ss.m_req.senderRef;
1023 
1024   if (!lastReply(ss))
1025     return;
1026 
1027   if (ss.m_error == 0) {
1028     jam();
1029     DropTrigImplConf* conf = (DropTrigImplConf*)signal->getDataPtrSend();
1030     conf->senderRef = reference();
1031     conf->senderData = ss.m_req.senderData;
1032     conf->tableId = ss.m_req.tableId;
1033     conf->triggerId = ss.m_req.triggerId;
1034     sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1035                signal, DropTrigImplConf::SignalLength, JBB);
1036   } else {
1037     DropTrigImplRef* ref = (DropTrigImplRef*)signal->getDataPtrSend();
1038     ref->senderRef = reference();
1039     ref->senderData = ss.m_req.senderData;
1040     ref->tableId = ss.m_req.tableId;
1041     ref->triggerId = ss.m_req.triggerId;
1042     ref->errorCode = ss.m_error;
1043     sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1044                signal, DropTrigImplRef::SignalLength, JBB);
1045   }
1046 
1047   ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1048 }
1049 
1050 // GSN_DBINFO_SCANREQ
1051 
1052 //#define DBINFO_SCAN_TRACE
1053 #ifdef DBINFO_SCAN_TRACE
1054 #include <debugger/DebuggerNames.hpp>
1055 #endif
1056 
1057 static Uint32
switchRef(Uint32 block,Uint32 instance,Uint32 node)1058 switchRef(Uint32 block, Uint32 instance, Uint32 node)
1059 {
1060   const Uint32 ref = numberToRef(block, instance,  node);
1061 #ifdef DBINFO_SCAN_TRACE
1062   ndbout_c("Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1063            getBlockName(block, "<unknown>"), instance, node, ref);
1064 #endif
1065   return ref;
1066 }
1067 
1068 
1069 bool
find_next(Ndbinfo::ScanCursor * cursor) const1070 LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
1071 {
1072   const Uint32 node = refToNode(cursor->currRef);
1073   const Uint32 block = refToMain(cursor->currRef);
1074   Uint32 instance = refToInstance(cursor->currRef);
1075 
1076   ndbrequire(node == getOwnNodeId());
1077   ndbrequire(block == number());
1078 
1079 
1080   Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
1081 
1082   if (worker < c_workers)
1083   {
1084     jam();
1085     cursor->currRef = switchRef(block, workerInstance(worker), node);
1086     return true;
1087   }
1088 
1089   cursor->currRef = numberToRef(block, node);
1090   return false;
1091 }
1092 
1093 
1094 
1095 void
execDBINFO_SCANREQ(Signal * signal)1096 LocalProxy::execDBINFO_SCANREQ(Signal* signal)
1097 {
1098   jamEntry();
1099   const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
1100   Uint32 signal_length = signal->getLength();
1101   ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
1102 
1103   Ndbinfo::ScanCursor* cursor =
1104     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
1105 
1106   if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
1107       cursor->saveCurrRef)
1108   {
1109     jam();
1110     /* Continue in the saved block ref */
1111     cursor->currRef = cursor->saveCurrRef;
1112     cursor->saveCurrRef = 0;
1113 
1114     // Set this block as sender and remember original sender
1115     cursor->saveSenderRef = cursor->senderRef;
1116     cursor->senderRef = reference();
1117 
1118     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1119                signal, signal_length, JBB);
1120     return;
1121   }
1122 
1123   Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
1124 
1125   if (find_next(cursor))
1126   {
1127     jam();
1128     ndbrequire(cursor->currRef);
1129     ndbrequire(cursor->saveCurrRef == 0);
1130 
1131     // Set this block as sender and remember original sender
1132     cursor->saveSenderRef = cursor->senderRef;
1133     cursor->senderRef = reference();
1134 
1135     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1136                signal, signal_length, JBB);
1137     return;
1138   }
1139 
1140   /* Scan is done, send SCANCONF back to caller  */
1141   ndbrequire(cursor->saveSenderRef == 0);
1142 
1143   ndbrequire(cursor->currRef);
1144   ndbrequire(cursor->saveCurrRef == 0);
1145 
1146   ndbrequire(refToInstance(cursor->currRef) == 0);
1147   sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1148   return;
1149 }
1150 
1151 void
execDBINFO_SCANCONF(Signal * signal)1152 LocalProxy::execDBINFO_SCANCONF(Signal* signal)
1153 {
1154   jamEntry();
1155   const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
1156   Uint32 signal_length = signal->getLength();
1157   ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
1158 
1159   Ndbinfo::ScanCursor* cursor =
1160     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
1161 
1162   if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
1163   {
1164     /* The instance has more data and want to continue */
1165     jam();
1166 
1167     /* Swap back saved senderRef */
1168     const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1169     cursor->saveSenderRef = 0;
1170 
1171     /* Save currRef to continue with same instance again */
1172     cursor->saveCurrRef = cursor->currRef;
1173     cursor->currRef = reference();
1174 
1175     sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1176     return;
1177   }
1178 
1179   if (conf->returnedRows)
1180   {
1181     jam();
1182     /*
1183        The instance has no more data, but it has sent rows
1184        to the API which need to be CONFed
1185     */
1186 
1187     /* Swap back saved senderRef */
1188     const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1189     cursor->saveSenderRef = 0;
1190 
1191     if (find_next(cursor))
1192     {
1193       /*
1194         There is another instance to continue in - signal 'more data'
1195         and setup saveCurrRef to continue in that instance
1196       */
1197       jam();
1198       Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
1199 
1200       cursor->saveCurrRef = cursor->currRef;
1201       cursor->currRef = reference();
1202     }
1203     else
1204     {
1205       /* There was no more instances to continue in */
1206       ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) == false);
1207 
1208       ndbrequire(cursor->currRef == reference());
1209       cursor->saveCurrRef = 0;
1210     }
1211 
1212     sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1213     return;
1214   }
1215 
1216 
1217   /* The underlying block reported completed, find next if any */
1218   if (find_next(cursor))
1219   {
1220     jam();
1221 
1222     ndbrequire(cursor->senderRef == reference());
1223     ndbrequire(cursor->saveSenderRef); // Should already be set
1224 
1225     ndbrequire(cursor->saveCurrRef == 0);
1226 
1227     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1228                signal, signal_length, JBB);
1229     return;
1230   }
1231 
1232   /* Scan in this block and its instances are completed */
1233 
1234   /* Swap back saved senderRef */
1235   const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1236   cursor->saveSenderRef = 0;
1237 
1238   ndbrequire(cursor->currRef);
1239   ndbrequire(cursor->saveCurrRef == 0);
1240 
1241   sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1242   return;
1243 }
1244 
1245 // GSN_SYNC_REQ
1246 
1247 void
execSYNC_REQ(Signal * signal)1248 LocalProxy::execSYNC_REQ(Signal* signal)
1249 {
1250   Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1251 
1252   ss.m_req = * CAST_CONSTPTR(SyncReq, signal->getDataPtr());
1253 
1254   sendREQ(signal, ss);
1255 }
1256 
1257 void
sendSYNC_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1258 LocalProxy::sendSYNC_REQ(Signal* signal, Uint32 ssId,
1259                          SectionHandle* handle)
1260 {
1261   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1262 
1263   SyncReq * req = CAST_PTR(SyncReq, signal->getDataPtrSend());
1264   req->senderRef = reference();
1265   req->senderData = ssId;
1266   req->prio = ss.m_req.prio;
1267 
1268   sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1269                       signal, SyncReq::SignalLength,
1270                       JobBufferLevel(ss.m_req.prio), handle);
1271 }
1272 
1273 void
execSYNC_REF(Signal * signal)1274 LocalProxy::execSYNC_REF(Signal* signal)
1275 {
1276   SyncRef ref = * CAST_CONSTPTR(SyncRef, signal->getDataPtr());
1277   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1278 
1279   recvREF(signal, ss, ref.errorCode);
1280 }
1281 
1282 void
execSYNC_CONF(Signal * signal)1283 LocalProxy::execSYNC_CONF(Signal* signal)
1284 {
1285   SyncConf conf = * CAST_CONSTPTR(SyncConf, signal->getDataPtr());
1286   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1287 
1288   recvCONF(signal, ss);
1289 }
1290 
1291 void
sendSYNC_CONF(Signal * signal,Uint32 ssId)1292 LocalProxy::sendSYNC_CONF(Signal* signal, Uint32 ssId)
1293 {
1294   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1295 
1296   if (!lastReply(ss))
1297     return;
1298 
1299   /**
1300    * SimulatedBlock::execSYNC_REQ will reply
1301    */
1302   if (ss.m_error == 0)
1303   {
1304     jam();
1305     SyncConf * conf = CAST_PTR(SyncConf, signal->getDataPtrSend());
1306     conf->senderRef = reference();
1307     conf->senderData = ss.m_req.senderData;
1308 
1309     Uint32 prio = ss.m_req.prio;
1310     sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1311                SyncConf::SignalLength,
1312                JobBufferLevel(prio));
1313   }
1314   else
1315   {
1316     jam();
1317     SyncRef * ref = CAST_PTR(SyncRef, signal->getDataPtrSend());
1318     ref->senderRef = reference();
1319     ref->senderData = ss.m_req.senderData;
1320     ref->errorCode = ss.m_error;
1321 
1322     Uint32 prio = ss.m_req.prio;
1323     sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1324                SyncRef::SignalLength,
1325                JobBufferLevel(prio));
1326   }
1327   ssRelease<Ss_SYNC_REQ>(ssId);
1328 }
1329 
1330 void
execSYNC_PATH_REQ(Signal * signal)1331 LocalProxy::execSYNC_PATH_REQ(Signal* signal)
1332 {
1333   SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
1334   req->count *= c_workers;
1335 
1336   for (Uint32 i = 0; i < c_workers; i++)
1337   {
1338     jam();
1339     Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1340     sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1341                signal->getLength(),
1342                JobBufferLevel(req->prio));
1343   }
1344 }
1345 
1346 BLOCK_FUNCTIONS(LocalProxy)
1347