1 /* Copyright (c) 2008, 2019, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 #include <mt.hpp>
24 #include "LocalProxy.hpp"
25 #include <pgman.hpp>
26 
27 #include <signaldata/RouteOrd.hpp>
28 
29 //#define DBINFO_SCAN_TRACE
30 #ifdef DBINFO_SCAN_TRACE
31 #include <debugger/DebuggerNames.hpp>
32 #endif
33 #include <NdbGetRUsage.h>
34 #include <EventLogger.hpp>
35 
36 #define JAM_FILE_ID 437
37 
38 extern EventLogger *g_eventLogger;
39 
LocalProxy(BlockNumber blockNumber,Block_context & ctx)40 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
41   SimulatedBlock(blockNumber, ctx)
42 {
43   BLOCK_CONSTRUCTOR(LocalProxy);
44 
45   ndbrequire(instance() == 0); // this is main block
46   c_workers = 0;
47   Uint32 i;
48   for (i = 0; i < MaxWorkers; i++)
49     c_worker[i] = 0;
50 
51   c_anyWorkerCounter = 0;
52   c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
53   c_masterNodeId = ZNIL;
54 
55   // GSN_READ_CONFIG_REQ
56   addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
57   addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
58 
59   // GSN_STTOR
60   addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
61   addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
62 
63   // GSN_NDB_STTOR
64   addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
65   addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
66 
67   // GSN_READ_NODESREQ
68   addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
69   addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
70 
71   // GSN_NODE_FAILREP
72   addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
73   addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
74 
75   // GSN_INCL_NODEREQ
76   addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
77   addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
78 
79   // GSN_NODE_STATE_REP
80   addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP, true);
81 
82   // GSN_CHANGE_NODE_STATE_REQ
83   addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ, true);
84   addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
85 
86   // GSN_DUMP_STATE_ORD
87   addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
88 
89   // GSN_NDB_TAMPER
90   addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
91 
92   // GSN_TIME_SIGNAL
93   addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
94 
95   // GSN_CREATE_TRIG_IMPL_REQ
96   addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
97   addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
98   addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
99 
100   // GSN_DROP_TRIG_IMPL_REQ
101   addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
102   addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
103   addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
104 
105   // GSN_DBINFO_SCANREQ
106   addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
107   addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
108 
109   // GSN_SYNC_REQ
110   addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ, true);
111   addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
112   addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
113 
114   // GSN_SYNC_PATH_REQ
115   addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ, true);
116 
117   // GSN_API_FAILREQ
118   addRecSignal(GSN_API_FAILREQ, &LocalProxy::execAPI_FAILREQ);
119   addRecSignal(GSN_API_FAILCONF, &LocalProxy::execAPI_FAILCONF);
120 }
121 
~LocalProxy()122 LocalProxy::~LocalProxy()
123 {
124   // dtor of main block deletes workers
125 }
126 
127 // support routines
128 
129 void
sendREQ(Signal * signal,SsSequential & ss)130 LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
131 {
132   jam();
133   ss.m_worker = 0;
134   ndbrequire(ss.m_sendREQ != 0);
135   SectionHandle handle(this);
136   restoreHandle(handle, ss);
137   (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
138   saveSections(ss, handle);
139 }
140 
141 void
recvCONF(Signal * signal,SsSequential & ss)142 LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
143 {
144   jam();
145   ndbrequire(ss.m_sendCONF != 0);
146   (this->*ss.m_sendCONF)(signal, ss.m_ssId);
147 
148   ss.m_worker++;
149   if (ss.m_worker < c_workers) {
150     jam();
151     ndbrequire(ss.m_sendREQ != 0);
152     SectionHandle handle(this);
153     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
154     return;
155   }
156 }
157 
158 void
recvREF(Signal * signal,SsSequential & ss,Uint32 error)159 LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
160 {
161   jam();
162   ndbrequire(error != 0);
163   if (ss.m_error == 0)
164   {
165     jam();
166     ss.m_error = error;
167   }
168   recvCONF(signal, ss);
169 }
170 
171 void
skipReq(SsSequential & ss)172 LocalProxy::skipReq(SsSequential& ss)
173 {
174   jam();
175 }
176 
177 void
skipConf(SsSequential & ss)178 LocalProxy::skipConf(SsSequential& ss)
179 {
180   jam();
181 }
182 
183 void
saveSections(SsCommon & ss,SectionHandle & handle)184 LocalProxy::saveSections(SsCommon& ss, SectionHandle & handle)
185 {
186   jam();
187   ss.m_sec_cnt = handle.m_cnt;
188   for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
189   {
190     jam();
191     ss.m_sec_ptr[i] = handle.m_ptr[i].i;
192   }
193   handle.clear();
194 }
195 
196 void
restoreHandle(SectionHandle & handle,SsCommon & ss)197 LocalProxy::restoreHandle(SectionHandle & handle, SsCommon& ss)
198 {
199   handle.m_cnt = ss.m_sec_cnt;
200   for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
201   {
202     jam();
203     handle.m_ptr[i].i = ss.m_sec_ptr[i];
204   }
205 
206   getSections(handle.m_cnt, handle.m_ptr);
207   ss.m_sec_cnt = 0;
208 }
209 
210 bool
firstReply(const SsSequential & ss)211 LocalProxy::firstReply(const SsSequential& ss)
212 {
213   return ss.m_worker == 0;
214 }
215 
216 bool
lastReply(const SsSequential & ss)217 LocalProxy::lastReply(const SsSequential& ss)
218 {
219   return ss.m_worker + 1 == c_workers;
220 }
221 
222 void
sendREQ(Signal * signal,SsParallel & ss,bool skipLast)223 LocalProxy::sendREQ(Signal* signal, SsParallel& ss, bool skipLast)
224 {
225   jam();
226   ndbrequire(ss.m_sendREQ != 0);
227 
228   ss.m_workerMask.clear();
229   ss.m_worker = 0;
230   const Uint32 count = skipLast ? c_workers - 1 : c_workers;
231   SectionHandle handle(this);
232   restoreHandle(handle, ss);
233   while (ss.m_worker < count) {
234     jam();
235     ss.m_workerMask.set(ss.m_worker);
236     (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
237     ss.m_worker++;
238   }
239   releaseSections(handle);
240 }
241 
242 void
recvCONF(Signal * signal,SsParallel & ss)243 LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
244 {
245   jam();
246   ndbrequire(ss.m_sendCONF != 0);
247 
248   BlockReference ref = signal->getSendersBlockRef();
249   ndbrequire(refToMain(ref) == number());
250 
251   Uint32 ino = refToInstance(ref);
252   ss.m_worker = workerIndex(ino);
253   ndbrequire(ref == workerRef(ss.m_worker));
254   ndbrequire(ss.m_worker < c_workers);
255   ndbrequire(ss.m_workerMask.get(ss.m_worker));
256   ss.m_workerMask.clear(ss.m_worker);
257 
258   (this->*ss.m_sendCONF)(signal, ss.m_ssId);
259 }
260 
261 void
recvREF(Signal * signal,SsParallel & ss,Uint32 error)262 LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
263 {
264   jam();
265   ndbrequire(error != 0);
266   if (ss.m_error == 0)
267   {
268     jam();
269     ss.m_error = error;
270   }
271   recvCONF(signal, ss);
272 }
273 
274 void
skipReq(SsParallel & ss)275 LocalProxy::skipReq(SsParallel& ss)
276 {
277   jam();
278   ndbrequire(ss.m_workerMask.get(ss.m_worker));
279   ss.m_workerMask.clear(ss.m_worker);
280 }
281 
282 // more replies expected from this worker
283 void
skipConf(SsParallel & ss)284 LocalProxy::skipConf(SsParallel& ss)
285 {
286   jam();
287   ndbrequire(!ss.m_workerMask.get(ss.m_worker));
288   ss.m_workerMask.set(ss.m_worker);
289 }
290 
291 bool
firstReply(const SsParallel & ss)292 LocalProxy::firstReply(const SsParallel& ss)
293 {
294   const WorkerMask& mask = ss.m_workerMask;
295   const Uint32 count = mask.count();
296 
297   // recvCONF has cleared current worker
298   ndbrequire(ss.m_worker < c_workers);
299   ndbrequire(!mask.get(ss.m_worker));
300   ndbrequire(count < c_workers);
301   return count + 1 == c_workers;
302 }
303 
304 bool
lastReply(const SsParallel & ss)305 LocalProxy::lastReply(const SsParallel& ss)
306 {
307   return ss.m_workerMask.isclear();
308 }
309 
310 // used in "reverse" proxying (start with worker REQs)
311 void
setMask(SsParallel & ss)312 LocalProxy::setMask(SsParallel& ss)
313 {
314   Uint32 i;
315   jam();
316   for (i = 0; i < c_workers; i++)
317   {
318     jam();
319     ss.m_workerMask.set(i);
320   }
321 }
322 
323 void
setMask(SsParallel & ss,const WorkerMask & mask)324 LocalProxy::setMask(SsParallel& ss, const WorkerMask& mask)
325 {
326   ss.m_workerMask.assign(mask);
327 }
328 
329 // load workers (before first signal)
330 
331 void
loadWorkers()332 LocalProxy::loadWorkers()
333 {
334   c_workers = mt_get_instance_count(number());
335   for (Uint32 i = 0; i < c_workers; i++)
336   {
337     jamNoBlock();
338     Uint32 instanceNo = workerInstance(i);
339 
340     SimulatedBlock* worker = newWorker(instanceNo);
341     ndbrequire(worker->instance() == instanceNo);
342     ndbrequire(this->getInstance(instanceNo) == worker);
343     c_worker[i] = worker;
344 
345     if (number() == PGMAN && i == (c_workers - 1))
346     {
347       ((Pgman*)worker)->init_extra_pgman();
348     }
349     mt_add_thr_map(number(), instanceNo);
350   }
351 }
352 
353 void
forwardToWorkerIndex(Signal * signal,Uint32 index)354 LocalProxy::forwardToWorkerIndex(Signal* signal, Uint32 index)
355 {
356   jam();
357   /**
358    * We statelessly forward to one of our
359    * workers, including any sections that
360    * might be attached.
361    */
362   BlockReference destRef = workerRef(index);
363   SectionHandle sh(this, signal);
364 
365   sendSignal(destRef,
366              signal->header.theVerId_signalNumber,
367              signal,
368              signal->getLength(),
369              JBB,
370              &sh);
371 }
372 
373 void
forwardToAnyWorker(Signal * signal)374 LocalProxy::forwardToAnyWorker(Signal* signal)
375 {
376   jam();
377 
378   /* Won't work for fragmented signals */
379   ndbassert(signal->header.m_fragmentInfo == 0);
380 
381   forwardToWorkerIndex(signal, getAnyWorkerIndex());
382 }
383 
384 // GSN_READ_CONFIG_REQ
385 
386 void
execREAD_CONFIG_REQ(Signal * signal)387 LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
388 {
389   jam();
390   Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
391 
392   const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
393   ss.m_req = *req;
394   ndbrequire(ss.m_req.noOfParameters == 0);
395   callREAD_CONFIG_REQ(signal);
396 }
397 
398 void
callREAD_CONFIG_REQ(Signal * signal)399 LocalProxy::callREAD_CONFIG_REQ(Signal* signal)
400 {
401   jam();
402   backREAD_CONFIG_REQ(signal);
403 }
404 
405 void
backREAD_CONFIG_REQ(Signal * signal)406 LocalProxy::backREAD_CONFIG_REQ(Signal* signal)
407 {
408   jam();
409   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
410 
411   // run sequentially due to big mallocs and initializations
412   sendREQ(signal, ss);
413 }
414 
415 void
sendREAD_CONFIG_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)416 LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId,
417                                 SectionHandle* handle)
418 {
419   jam();
420   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
421 
422   ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
423   req->senderRef = reference();
424   req->senderData = ssId;
425   req->noOfParameters = 0;
426   sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
427                       signal, ReadConfigReq::SignalLength, JBB, handle);
428 }
429 
430 void
execREAD_CONFIG_CONF(Signal * signal)431 LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
432 {
433   jam();
434   const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
435   Uint32 ssId = conf->senderData;
436   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
437 
438 #ifdef DEBUG_RSS
439   {
440     ndb_rusage ru;
441     if (Ndb_GetRUsage(&ru, true) != 0)
442     {
443       g_eventLogger->error("LocalProxy : Failed to get rusage");
444     }
445     else
446     {
447       g_eventLogger->info("LocalProxy (conf from worker %u/%u) : RSS : %llu kB",
448                           ss.m_worker,
449                           c_workers,
450                           ru.ru_rss);
451     }
452   }
453 #endif
454   recvCONF(signal, ss);
455 }
456 
457 void
sendREAD_CONFIG_CONF(Signal * signal,Uint32 ssId)458 LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
459 {
460   jam();
461   Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
462 
463   if (!lastReply(ss))
464   {
465     jam();
466     return;
467   }
468 
469   SectionHandle handle(this);
470   restoreHandle(handle, ss);
471   releaseSections(handle);
472 
473   ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
474   conf->senderRef = reference();
475   conf->senderData = ss.m_req.senderData;
476   sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
477              signal, ReadConfigConf::SignalLength, JBB);
478 
479   ssRelease<Ss_READ_CONFIG_REQ>(ssId);
480 }
481 
482 // GSN_STTOR
483 
484 void
execSTTOR(Signal * signal)485 LocalProxy::execSTTOR(Signal* signal)
486 {
487   jam();
488   Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
489 
490   const Uint32 startphase  = signal->theData[1];
491   const Uint32 typeOfStart = signal->theData[7];
492 
493   if (startphase == 3)
494   {
495     jam();
496     c_typeOfStart = typeOfStart;
497   }
498 
499   ss.m_reqlength = signal->getLength();
500   memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
501 
502   callSTTOR(signal);
503 }
504 
505 void
callSTTOR(Signal * signal)506 LocalProxy::callSTTOR(Signal* signal)
507 {
508   jam();
509   backSTTOR(signal);
510 }
511 
512 void
backSTTOR(Signal * signal)513 LocalProxy::backSTTOR(Signal* signal)
514 {
515   jam();
516   Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
517   sendREQ(signal, ss);
518 }
519 
520 void
sendSTTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)521 LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
522 {
523   jam();
524   Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
525 
526   memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
527   sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
528                       signal, ss.m_reqlength, JBB, handle);
529 }
530 
531 void
execSTTORRY(Signal * signal)532 LocalProxy::execSTTORRY(Signal* signal)
533 {
534   jam();
535   Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
536   recvCONF(signal, ss);
537 }
538 
539 void
sendSTTORRY(Signal * signal,Uint32 ssId)540 LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
541 {
542   jam();
543   Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
544 
545   const Uint32 conflength = signal->getLength();
546   const Uint32* confdata = signal->getDataPtr();
547 
548   // the reply is identical from all
549   if (firstReply(ss))
550   {
551     jam();
552     ss.m_conflength = conflength;
553     memcpy(ss.m_confdata, confdata, conflength << 2);
554   }
555   else
556   {
557     jam();
558     ndbrequire(ss.m_conflength == conflength);
559     ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
560   }
561 
562   if (!lastReply(ss))
563   {
564     jam();
565     return;
566   }
567 
568   memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
569   sendSignal(NDBCNTR_REF, GSN_STTORRY,
570              signal, ss.m_conflength, JBB);
571 
572   ssRelease<Ss_STTOR>(ssId);
573 }
574 
575 // GSN_NDB_STTOR
576 
577 void
execNDB_STTOR(Signal * signal)578 LocalProxy::execNDB_STTOR(Signal* signal)
579 {
580   jam();
581   Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
582 
583   const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
584   ss.m_req = *req;
585 
586   callNDB_STTOR(signal);
587 }
588 
589 void
callNDB_STTOR(Signal * signal)590 LocalProxy::callNDB_STTOR(Signal* signal)
591 {
592   jam();
593   backNDB_STTOR(signal);
594 }
595 
596 void
backNDB_STTOR(Signal * signal)597 LocalProxy::backNDB_STTOR(Signal* signal)
598 {
599   jam();
600   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
601   sendREQ(signal, ss);
602 }
603 
604 void
sendNDB_STTOR(Signal * signal,Uint32 ssId,SectionHandle * handle)605 LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId, SectionHandle* handle)
606 {
607   jam();
608   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
609 
610   NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
611   *req = ss.m_req;
612   req->senderRef = reference();
613   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
614                       signal, ss.m_reqlength, JBB, handle);
615 }
616 
617 void
execNDB_STTORRY(Signal * signal)618 LocalProxy::execNDB_STTORRY(Signal* signal)
619 {
620   jam();
621   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
622 
623   // the reply contains only senderRef
624   const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
625   ndbrequire(conf->senderRef == signal->getSendersBlockRef());
626   recvCONF(signal, ss);
627 }
628 
629 void
sendNDB_STTORRY(Signal * signal,Uint32 ssId)630 LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
631 {
632   jam();
633   Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
634 
635   if (!lastReply(ss))
636   {
637     jam();
638     return;
639   }
640 
641   NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
642   conf->senderRef = reference();
643   sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
644              signal, NdbSttorry::SignalLength, JBB);
645 
646   ssRelease<Ss_NDB_STTOR>(ssId);
647 }
648 
649 // GSN_READ_NODESREQ
650 
651 void
sendREAD_NODESREQ(Signal * signal)652 LocalProxy::sendREAD_NODESREQ(Signal* signal)
653 {
654   jam();
655   signal->theData[0] = reference();
656   sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
657 }
658 
659 void
execREAD_NODESCONF(Signal * signal)660 LocalProxy::execREAD_NODESCONF(Signal* signal)
661 {
662   jam();
663   Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
664 
665   const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
666 
667   c_masterNodeId = conf->masterNodeId;
668 
669   /**
670    * READ_NODESCONF comes with a section containing a bitmap, since the
671    * the proxy block didn't have its own method to receive this, it isn't
672    * interested in the contents of this. So can simply release it.
673    */
674   SectionHandle handle(this, signal);
675   releaseSections(handle);
676 
677   switch (ss.m_gsn) {
678   case GSN_STTOR:
679     jam();
680     backSTTOR(signal);
681     break;
682   case GSN_NDB_STTOR:
683     jam();
684     backNDB_STTOR(signal);
685     break;
686   default:
687     ndbabort();
688   }
689 
690   ss.m_gsn = 0;
691 }
692 
693 void
execREAD_NODESREF(Signal * signal)694 LocalProxy::execREAD_NODESREF(Signal* signal)
695 {
696   jam();
697   Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
698   ndbrequire(ss.m_gsn != 0);
699   ndbabort();
700 }
701 
702 // GSN_NODE_FAILREP
703 
704 void
execNODE_FAILREP(Signal * signal)705 LocalProxy::execNODE_FAILREP(Signal* signal)
706 {
707   jam();
708   Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
709   NodeFailRep* req = (NodeFailRep*)signal->getDataPtr();
710   ndbrequire(signal->getLength() == NodeFailRep::SignalLength ||
711              signal->getLength() == NodeFailRep::SignalLength_v1);
712 
713   if(signal->getLength() == NodeFailRep::SignalLength)
714   {
715     ndbrequire(signal->getNoOfSections() == 1);
716     ndbrequire(getNodeInfo(refToNode(signal->getSendersBlockRef())).m_version);
717     SegmentedSectionPtr ptr;
718     SectionHandle handle(this, signal);
719     handle.getSection(ptr, 0);
720     memset(req->theNodes, 0 ,sizeof(req->theNodes));
721     copy(req->theNodes, ptr);
722     releaseSections(handle);
723   }
724   else
725   {
726     memset(req->theNodes + NdbNodeBitmask48::Size,
727            0,
728            _NDB_NBM_DIFF_BYTES);
729   }
730   ss.m_req = *req;
731   NdbNodeBitmask mask;
732   mask.assign(NdbNodeBitmask::Size, req->theNodes);
733 
734   // from each worker wait for ack for each failed node
735   for (Uint32 i = 0; i < c_workers; i++)
736   {
737     jam();
738     NdbNodeBitmask& waitFor = ss.m_waitFor[i];
739     waitFor.bitOR(mask);
740   }
741 
742   sendREQ(signal, ss);
743   if (ss.noReply(number()))
744   {
745     jam();
746     ssRelease<Ss_NODE_FAILREP>(ss);
747   }
748 }
749 
750 void
sendNODE_FAILREP(Signal * signal,Uint32 ssId,SectionHandle * handle)751 LocalProxy::sendNODE_FAILREP(Signal* signal, Uint32 ssId, SectionHandle* handle)
752 {
753   jam();
754   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
755 
756   NodeFailRep* req = (NodeFailRep*)signal->getDataPtrSend();
757   *req = ss.m_req;
758   handle->clear();
759   LinearSectionPtr lsptr[3];
760   lsptr[0].p = req->theNodes;
761   lsptr[0].sz = NdbNodeBitmask::getPackedLengthInWords(req->theNodes);
762   ndbrequire(import(handle->m_ptr[0], lsptr[0].p, lsptr[0].sz));
763   handle->m_cnt = 1;
764   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
765                       signal, NodeFailRep::SignalLength, JBB, handle);
766 }
767 
768 void
execNF_COMPLETEREP(Signal * signal)769 LocalProxy::execNF_COMPLETEREP(Signal* signal)
770 {
771   jam();
772   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
773   ndbrequire(!ss.noReply(number()));
774   ss.m_workerMask.set(ss.m_worker); // Avoid require in recvCONF
775   recvCONF(signal, ss);
776 }
777 
778 void
sendNF_COMPLETEREP(Signal * signal,Uint32 ssId)779 LocalProxy::sendNF_COMPLETEREP(Signal* signal, Uint32 ssId)
780 {
781   jam();
782   Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
783 
784   const NFCompleteRep* conf = (const NFCompleteRep*)signal->getDataPtr();
785   Uint32 node = conf->failedNodeId;
786 
787   {
788     NdbNodeBitmask& waitFor = ss.m_waitFor[ss.m_worker];
789     ndbrequire(waitFor.get(node));
790     waitFor.clear(node);
791   }
792 
793   for (Uint32 i = 0; i < c_workers; i++)
794   {
795     jam();
796     NdbNodeBitmask& waitFor = ss.m_waitFor[i];
797     if (waitFor.get(node))
798     {
799       jam();
800       /**
801        * Not all threads are done with this failed node
802        */
803       return;
804     }
805   }
806 
807   {
808     NFCompleteRep* conf = (NFCompleteRep*)signal->getDataPtrSend();
809     conf->blockNo = number();
810     conf->nodeId = getOwnNodeId();
811     conf->failedNodeId = node;
812     conf->unused = 0;
813     conf->from = __LINE__;
814 
815     sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
816                signal, NFCompleteRep::SignalLength, JBB);
817 
818     if (number() == DBTC)
819     {
820       /**
821        * DBTC send NF_COMPLETEREP "early" to QMGR
822        *   so that it can allow api to handle node-failure of
823        *   transactions eariler...
824        * See Qmgr::execNF_COMPLETEREP
825        */
826       jam();
827       sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
828                  NFCompleteRep::SignalLength, JBB);
829     }
830   }
831 }
832 
833 // GSN_INCL_NODEREQ
834 
835 void
execINCL_NODEREQ(Signal * signal)836 LocalProxy::execINCL_NODEREQ(Signal* signal)
837 {
838   jam();
839   Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
840 
841   ss.m_reqlength = signal->getLength();
842   ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
843   memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
844 
845   sendREQ(signal, ss);
846 }
847 
848 void
sendINCL_NODEREQ(Signal * signal,Uint32 ssId,SectionHandle * handle)849 LocalProxy::sendINCL_NODEREQ(Signal* signal, Uint32 ssId, SectionHandle* handle)
850 {
851   jam();
852   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
853 
854   Ss_INCL_NODEREQ::Req* req =
855     (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
856 
857   memcpy(req, &ss.m_req, ss.m_reqlength << 2);
858   req->senderRef = reference();
859   sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
860                       signal, ss.m_reqlength, JBB, handle);
861 }
862 
863 void
execINCL_NODECONF(Signal * signal)864 LocalProxy::execINCL_NODECONF(Signal* signal)
865 {
866   jam();
867   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
868   recvCONF(signal, ss);
869 }
870 
871 void
sendINCL_NODECONF(Signal * signal,Uint32 ssId)872 LocalProxy::sendINCL_NODECONF(Signal* signal, Uint32 ssId)
873 {
874   jam();
875   Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
876 
877   if (!lastReply(ss))
878   {
879     jam();
880     return;
881   }
882 
883   Ss_INCL_NODEREQ::Conf* conf =
884     (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
885 
886   conf->inclNodeId = ss.m_req.inclNodeId;
887   conf->senderRef = reference();
888   sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
889              signal, 2, JBB);
890 
891   ssRelease<Ss_INCL_NODEREQ>(ssId);
892 }
893 
894 // GSN_NODE_STATE_REP
895 
896 void
execNODE_STATE_REP(Signal * signal)897 LocalProxy::execNODE_STATE_REP(Signal* signal)
898 {
899   jam();
900   Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
901   sendREQ(signal, ss);
902   SimulatedBlock::execNODE_STATE_REP(signal);
903   ssRelease<Ss_NODE_STATE_REP>(ss);
904 }
905 
906 void
sendNODE_STATE_REP(Signal * signal,Uint32 ssId,SectionHandle * handle)907 LocalProxy::sendNODE_STATE_REP(Signal* signal, Uint32 ssId,
908                                SectionHandle* handle)
909 {
910   jam();
911   Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
912 
913   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
914                       signal,NodeStateRep::SignalLength, JBB, handle);
915 }
916 
917 // GSN_CHANGE_NODE_STATE_REQ
918 
919 void
execCHANGE_NODE_STATE_REQ(Signal * signal)920 LocalProxy::execCHANGE_NODE_STATE_REQ(Signal* signal)
921 {
922   jam();
923   Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
924 
925   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
926   ss.m_req = *req;
927 
928   sendREQ(signal, ss);
929 }
930 
931 void
sendCHANGE_NODE_STATE_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)932 LocalProxy::sendCHANGE_NODE_STATE_REQ(Signal* signal, Uint32 ssId,
933                                       SectionHandle* handle)
934 {
935   jam();
936   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
937 
938   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
939   req->senderRef = reference();
940 
941   sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
942                       signal, ChangeNodeStateReq::SignalLength, JBB, handle);
943 }
944 
945 void
execCHANGE_NODE_STATE_CONF(Signal * signal)946 LocalProxy::execCHANGE_NODE_STATE_CONF(Signal* signal)
947 {
948   jam();
949   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
950 
951   ChangeNodeStateConf * conf = (ChangeNodeStateConf*)signal->getDataPtrSend();
952   ndbrequire(conf->senderData == ss.m_req.senderData);
953   recvCONF(signal, ss);
954 }
955 
956 void
sendCHANGE_NODE_STATE_CONF(Signal * signal,Uint32 ssId)957 LocalProxy::sendCHANGE_NODE_STATE_CONF(Signal* signal, Uint32 ssId)
958 {
959   jam();
960   Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
961 
962   if (!lastReply(ss))
963   {
964     jam();
965     return;
966   }
967 
968   /**
969    * SimulatedBlock::execCHANGE_NODE_STATE_REQ will reply
970    */
971   ChangeNodeStateReq * req = (ChangeNodeStateReq*)signal->getDataPtrSend();
972   * req = ss.m_req;
973   SimulatedBlock::execCHANGE_NODE_STATE_REQ(signal);
974   ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
975 }
976 
977 // GSN_DUMP_STATE_ORD
978 
979 void
execDUMP_STATE_ORD(Signal * signal)980 LocalProxy::execDUMP_STATE_ORD(Signal* signal)
981 {
982   jam();
983   Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
984 
985   ss.m_reqlength = signal->getLength();
986   memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
987   sendREQ(signal, ss);
988   ssRelease<Ss_DUMP_STATE_ORD>(ss);
989 }
990 
991 void
sendDUMP_STATE_ORD(Signal * signal,Uint32 ssId,SectionHandle * handle)992 LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId,
993                                SectionHandle* handle)
994 {
995   jam();
996   Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
997 
998   memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
999   sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
1000                       signal, ss.m_reqlength, JBB, handle);
1001 }
1002 
1003 // GSN_NDB_TAMPER
1004 
1005 void
execNDB_TAMPER(Signal * signal)1006 LocalProxy::execNDB_TAMPER(Signal* signal)
1007 {
1008   jam();
1009   Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
1010 
1011   const Uint32 siglen = signal->getLength();
1012   if (siglen == 1)
1013   {
1014     jam();
1015     ss.m_errorInsert = signal->theData[0];
1016     ss.m_haveErrorInsertExtra = false;
1017   }
1018   else
1019   {
1020     jam();
1021     ndbrequire(siglen == 2);
1022     ss.m_errorInsert = signal->theData[0];
1023     ss.m_haveErrorInsertExtra = true;
1024     ss.m_errorInsertExtra = signal->theData[1];
1025   }
1026 
1027   SimulatedBlock::execNDB_TAMPER(signal);
1028   sendREQ(signal, ss);
1029   ssRelease<Ss_NDB_TAMPER>(ss);
1030 }
1031 
1032 void
sendNDB_TAMPER(Signal * signal,Uint32 ssId,SectionHandle * handle)1033 LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId, SectionHandle* handle)
1034 {
1035   jam();
1036   Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
1037 
1038   Uint32 siglen = 1;
1039   signal->theData[0] = ss.m_errorInsert;
1040   if (ss.m_haveErrorInsertExtra)
1041   {
1042     jam();
1043     signal->theData[1] = ss.m_errorInsertExtra;
1044     siglen ++;
1045   }
1046   sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
1047                       signal, siglen, JBB, handle);
1048 }
1049 
1050 // GSN_TIME_SIGNAL
1051 
1052 void
execTIME_SIGNAL(Signal * signal)1053 LocalProxy::execTIME_SIGNAL(Signal* signal)
1054 {
1055   jam();
1056   Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
1057 
1058   sendREQ(signal, ss);
1059   ssRelease<Ss_TIME_SIGNAL>(ss);
1060 }
1061 
1062 void
sendTIME_SIGNAL(Signal * signal,Uint32 ssId,SectionHandle * handle)1063 LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId, SectionHandle* handle)
1064 {
1065   jam();
1066   Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
1067   signal->theData[0] = 0;
1068   sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
1069                       signal, 1, JBB, handle);
1070 }
1071 
1072 // GSN_CREATE_TRIG_IMPL_REQ
1073 
1074 void
execCREATE_TRIG_IMPL_REQ(Signal * signal)1075 LocalProxy::execCREATE_TRIG_IMPL_REQ(Signal* signal)
1076 {
1077   if (!assembleFragments(signal))
1078     return;
1079 
1080   jam();
1081   if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
1082   {
1083     jam();
1084     return;
1085   }
1086   const CreateTrigImplReq* req = (const CreateTrigImplReq*)signal->getDataPtr();
1087   Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
1088   ss.m_req = *req;
1089   ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
1090 
1091   SectionHandle handle(this, signal);
1092   saveSections(ss, handle);
1093 
1094   sendREQ(signal, ss);
1095 }
1096 
1097 void
sendCREATE_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1098 LocalProxy::sendCREATE_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
1099                                      SectionHandle * handle)
1100 {
1101   jam();
1102   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1103 
1104   CreateTrigImplReq* req = (CreateTrigImplReq*)signal->getDataPtrSend();
1105   *req = ss.m_req;
1106   req->senderRef = reference();
1107   req->senderData = ssId;
1108   sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
1109                       signal, CreateTrigImplReq::SignalLength, JBB,
1110                       handle);
1111 }
1112 
1113 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)1114 LocalProxy::execCREATE_TRIG_IMPL_CONF(Signal* signal)
1115 {
1116   jam();
1117   const CreateTrigImplConf* conf = (const CreateTrigImplConf*)signal->getDataPtr();
1118   Uint32 ssId = conf->senderData;
1119   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1120   recvCONF(signal, ss);
1121 }
1122 
1123 void
execCREATE_TRIG_IMPL_REF(Signal * signal)1124 LocalProxy::execCREATE_TRIG_IMPL_REF(Signal* signal)
1125 {
1126   jam();
1127   const CreateTrigImplRef* ref = (const CreateTrigImplRef*)signal->getDataPtr();
1128   Uint32 ssId = ref->senderData;
1129   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1130   recvREF(signal, ss, ref->errorCode);
1131 }
1132 
1133 void
sendCREATE_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1134 LocalProxy::sendCREATE_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1135 {
1136   jam();
1137   Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1138   BlockReference dictRef = ss.m_req.senderRef;
1139 
1140   if (!lastReply(ss))
1141   {
1142     jam();
1143     return;
1144   }
1145 
1146   if (ss.m_error == 0)
1147   {
1148     jam();
1149     CreateTrigImplConf* conf = (CreateTrigImplConf*)signal->getDataPtrSend();
1150     conf->senderRef = reference();
1151     conf->senderData = ss.m_req.senderData;
1152     conf->tableId = ss.m_req.tableId;
1153     conf->triggerId = ss.m_req.triggerId;
1154     conf->triggerInfo = ss.m_req.triggerInfo;
1155     sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
1156                signal, CreateTrigImplConf::SignalLength, JBB);
1157   }
1158   else
1159   {
1160     jam();
1161     CreateTrigImplRef* ref = (CreateTrigImplRef*)signal->getDataPtrSend();
1162     ref->senderRef = reference();
1163     ref->senderData = ss.m_req.senderData;
1164     ref->tableId = ss.m_req.tableId;
1165     ref->triggerId = ss.m_req.triggerId;
1166     ref->triggerInfo = ss.m_req.triggerInfo;
1167     ref->errorCode = ss.m_error;
1168     sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
1169                signal, CreateTrigImplRef::SignalLength, JBB);
1170   }
1171 
1172   ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
1173 }
1174 
1175 // GSN_DROP_TRIG_IMPL_REQ
1176 
1177 void
execDROP_TRIG_IMPL_REQ(Signal * signal)1178 LocalProxy::execDROP_TRIG_IMPL_REQ(Signal* signal)
1179 {
1180   jam();
1181   if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
1182   {
1183     jam();
1184     return;
1185   }
1186   const DropTrigImplReq* req = (const DropTrigImplReq*)signal->getDataPtr();
1187   Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
1188   ss.m_req = *req;
1189   ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
1190   sendREQ(signal, ss);
1191 }
1192 
1193 void
sendDROP_TRIG_IMPL_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1194 LocalProxy::sendDROP_TRIG_IMPL_REQ(Signal* signal, Uint32 ssId,
1195                                    SectionHandle * handle)
1196 {
1197   jam();
1198   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1199 
1200   DropTrigImplReq* req = (DropTrigImplReq*)signal->getDataPtrSend();
1201   *req = ss.m_req;
1202   req->senderRef = reference();
1203   req->senderData = ssId;
1204   sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
1205                       signal, DropTrigImplReq::SignalLength, JBB, handle);
1206 }
1207 
1208 void
execDROP_TRIG_IMPL_CONF(Signal * signal)1209 LocalProxy::execDROP_TRIG_IMPL_CONF(Signal* signal)
1210 {
1211   jam();
1212   const DropTrigImplConf* conf = (const DropTrigImplConf*)signal->getDataPtr();
1213   Uint32 ssId = conf->senderData;
1214   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1215   recvCONF(signal, ss);
1216 }
1217 
1218 void
execDROP_TRIG_IMPL_REF(Signal * signal)1219 LocalProxy::execDROP_TRIG_IMPL_REF(Signal* signal)
1220 {
1221   jam();
1222   const DropTrigImplRef* ref = (const DropTrigImplRef*)signal->getDataPtr();
1223   Uint32 ssId = ref->senderData;
1224   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1225   recvREF(signal, ss, ref->errorCode);
1226 }
1227 
1228 void
sendDROP_TRIG_IMPL_CONF(Signal * signal,Uint32 ssId)1229 LocalProxy::sendDROP_TRIG_IMPL_CONF(Signal* signal, Uint32 ssId)
1230 {
1231   jam();
1232   Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1233   BlockReference dictRef = ss.m_req.senderRef;
1234 
1235   if (!lastReply(ss))
1236   {
1237     jam();
1238     return;
1239   }
1240 
1241   if (ss.m_error == 0)
1242   {
1243     jam();
1244     DropTrigImplConf* conf = (DropTrigImplConf*)signal->getDataPtrSend();
1245     conf->senderRef = reference();
1246     conf->senderData = ss.m_req.senderData;
1247     conf->tableId = ss.m_req.tableId;
1248     conf->triggerId = ss.m_req.triggerId;
1249     sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1250                signal, DropTrigImplConf::SignalLength, JBB);
1251   }
1252   else
1253   {
1254     jam();
1255     DropTrigImplRef* ref = (DropTrigImplRef*)signal->getDataPtrSend();
1256     ref->senderRef = reference();
1257     ref->senderData = ss.m_req.senderData;
1258     ref->tableId = ss.m_req.tableId;
1259     ref->triggerId = ss.m_req.triggerId;
1260     ref->errorCode = ss.m_error;
1261     sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1262                signal, DropTrigImplRef::SignalLength, JBB);
1263   }
1264 
1265   ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1266 }
1267 
1268 // GSN_DBINFO_SCANREQ
1269 
1270 static Uint32
switchRef(Uint32 block,Uint32 instance,Uint32 node)1271 switchRef(Uint32 block, Uint32 instance, Uint32 node)
1272 {
1273   const Uint32 ref = numberToRef(block, instance,  node);
1274 #ifdef DBINFO_SCAN_TRACE
1275   ndbout_c("Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1276            getBlockName(block, "<unknown>"), instance, node, ref);
1277 #endif
1278   return ref;
1279 }
1280 
1281 
1282 bool
find_next(Ndbinfo::ScanCursor * cursor) const1283 LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
1284 {
1285   const Uint32 node = refToNode(cursor->currRef);
1286   const Uint32 block = refToMain(cursor->currRef);
1287   Uint32 instance = refToInstance(cursor->currRef);
1288 
1289   ndbrequire(node == getOwnNodeId());
1290   ndbrequire(block == number());
1291 
1292 
1293   Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
1294 
1295   if (worker < c_workers)
1296   {
1297     jam();
1298     cursor->currRef = switchRef(block, workerInstance(worker), node);
1299     return true;
1300   }
1301 
1302   cursor->currRef = numberToRef(block, node);
1303   return false;
1304 }
1305 
1306 
1307 
1308 void
execDBINFO_SCANREQ(Signal * signal)1309 LocalProxy::execDBINFO_SCANREQ(Signal* signal)
1310 {
1311   jamEntry();
1312   const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
1313   Uint32 signal_length = signal->getLength();
1314   ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
1315 
1316   Ndbinfo::ScanCursor* cursor =
1317     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
1318 
1319   if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
1320       cursor->saveCurrRef)
1321   {
1322     jam();
1323     /* Continue in the saved block ref */
1324     cursor->currRef = cursor->saveCurrRef;
1325     cursor->saveCurrRef = 0;
1326 
1327     // Set this block as sender and remember original sender
1328     cursor->saveSenderRef = cursor->senderRef;
1329     cursor->senderRef = reference();
1330 
1331     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1332                signal, signal_length, JBB);
1333     return;
1334   }
1335 
1336   Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
1337 
1338   if (find_next(cursor))
1339   {
1340     jam();
1341     ndbrequire(cursor->currRef);
1342     ndbrequire(cursor->saveCurrRef == 0);
1343 
1344     // Set this block as sender and remember original sender
1345     cursor->saveSenderRef = cursor->senderRef;
1346     cursor->senderRef = reference();
1347 
1348     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1349                signal, signal_length, JBB);
1350     return;
1351   }
1352 
1353   /* Scan is done, send SCANCONF back to caller  */
1354   ndbrequire(cursor->saveSenderRef == 0);
1355 
1356   ndbrequire(cursor->currRef);
1357   ndbrequire(cursor->saveCurrRef == 0);
1358 
1359   ndbrequire(refToInstance(cursor->currRef) == 0);
1360   sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1361   return;
1362 }
1363 
1364 void
execDBINFO_SCANCONF(Signal * signal)1365 LocalProxy::execDBINFO_SCANCONF(Signal* signal)
1366 {
1367   jamEntry();
1368   const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
1369   Uint32 signal_length = signal->getLength();
1370   ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
1371 
1372   Ndbinfo::ScanCursor* cursor =
1373     (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
1374 
1375   if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
1376   {
1377     /* The instance has more data and want to continue */
1378     jam();
1379 
1380     /* Swap back saved senderRef */
1381     const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1382     cursor->saveSenderRef = 0;
1383 
1384     /* Save currRef to continue with same instance again */
1385     cursor->saveCurrRef = cursor->currRef;
1386     cursor->currRef = reference();
1387 
1388     sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1389     return;
1390   }
1391 
1392   if (conf->returnedRows)
1393   {
1394     jam();
1395     /*
1396        The instance has no more data, but it has sent rows
1397        to the API which need to be CONFed
1398     */
1399 
1400     /* Swap back saved senderRef */
1401     const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1402     cursor->saveSenderRef = 0;
1403 
1404     if (find_next(cursor))
1405     {
1406       /*
1407         There is another instance to continue in - signal 'more data'
1408         and setup saveCurrRef to continue in that instance
1409       */
1410       jam();
1411       Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
1412 
1413       cursor->saveCurrRef = cursor->currRef;
1414       cursor->currRef = reference();
1415     }
1416     else
1417     {
1418       /* There was no more instances to continue in */
1419       ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) == false);
1420 
1421       ndbrequire(cursor->currRef == reference());
1422       cursor->saveCurrRef = 0;
1423     }
1424 
1425     sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1426     return;
1427   }
1428 
1429 
1430   /* The underlying block reported completed, find next if any */
1431   if (find_next(cursor))
1432   {
1433     jam();
1434 
1435     ndbrequire(cursor->senderRef == reference());
1436     ndbrequire(cursor->saveSenderRef); // Should already be set
1437 
1438     ndbrequire(cursor->saveCurrRef == 0);
1439 
1440     sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1441                signal, signal_length, JBB);
1442     return;
1443   }
1444 
1445   /* Scan in this block and its instances are completed */
1446 
1447   /* Swap back saved senderRef */
1448   const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1449   cursor->saveSenderRef = 0;
1450 
1451   ndbrequire(cursor->currRef);
1452   ndbrequire(cursor->saveCurrRef == 0);
1453 
1454   sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
1455   return;
1456 }
1457 
1458 // GSN_SYNC_REQ
1459 
1460 void
execSYNC_REQ(Signal * signal)1461 LocalProxy::execSYNC_REQ(Signal* signal)
1462 {
1463   jam();
1464   Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1465 
1466   ss.m_req = * CAST_CONSTPTR(SyncReq, signal->getDataPtr());
1467 
1468   sendREQ(signal, ss);
1469 }
1470 
1471 void
sendSYNC_REQ(Signal * signal,Uint32 ssId,SectionHandle * handle)1472 LocalProxy::sendSYNC_REQ(Signal* signal, Uint32 ssId,
1473                          SectionHandle* handle)
1474 {
1475   jam();
1476   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1477 
1478   SyncReq * req = CAST_PTR(SyncReq, signal->getDataPtrSend());
1479   req->senderRef = reference();
1480   req->senderData = ssId;
1481   req->prio = ss.m_req.prio;
1482 
1483   sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1484                       signal, SyncReq::SignalLength,
1485                       JobBufferLevel(ss.m_req.prio), handle);
1486 }
1487 
1488 void
execSYNC_REF(Signal * signal)1489 LocalProxy::execSYNC_REF(Signal* signal)
1490 {
1491   jam();
1492   SyncRef ref = * CAST_CONSTPTR(SyncRef, signal->getDataPtr());
1493   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1494 
1495   recvREF(signal, ss, ref.errorCode);
1496 }
1497 
1498 void
execSYNC_CONF(Signal * signal)1499 LocalProxy::execSYNC_CONF(Signal* signal)
1500 {
1501   jam();
1502   SyncConf conf = * CAST_CONSTPTR(SyncConf, signal->getDataPtr());
1503   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1504 
1505   recvCONF(signal, ss);
1506 }
1507 
1508 void
sendSYNC_CONF(Signal * signal,Uint32 ssId)1509 LocalProxy::sendSYNC_CONF(Signal* signal, Uint32 ssId)
1510 {
1511   jam();
1512   Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1513 
1514   if (!lastReply(ss))
1515   {
1516     jam();
1517     return;
1518   }
1519 
1520   /**
1521    * SimulatedBlock::execSYNC_REQ will reply
1522    */
1523   if (ss.m_error == 0)
1524   {
1525     jam();
1526     SyncConf * conf = CAST_PTR(SyncConf, signal->getDataPtrSend());
1527     conf->senderRef = reference();
1528     conf->senderData = ss.m_req.senderData;
1529 
1530     Uint32 prio = ss.m_req.prio;
1531     sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1532                SyncConf::SignalLength,
1533                JobBufferLevel(prio));
1534   }
1535   else
1536   {
1537     jam();
1538     SyncRef * ref = CAST_PTR(SyncRef, signal->getDataPtrSend());
1539     ref->senderRef = reference();
1540     ref->senderData = ss.m_req.senderData;
1541     ref->errorCode = ss.m_error;
1542 
1543     Uint32 prio = ss.m_req.prio;
1544     sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1545                SyncRef::SignalLength,
1546                JobBufferLevel(prio));
1547   }
1548   ssRelease<Ss_SYNC_REQ>(ssId);
1549 }
1550 
1551 void
execSYNC_PATH_REQ(Signal * signal)1552 LocalProxy::execSYNC_PATH_REQ(Signal* signal)
1553 {
1554   jam();
1555   SyncPathReq* req = CAST_PTR(SyncPathReq, signal->getDataPtrSend());
1556   req->count *= c_workers;
1557 
1558   for (Uint32 i = 0; i < c_workers; i++)
1559   {
1560     jam();
1561     Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1562     sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1563                signal->getLength(),
1564                JobBufferLevel(req->prio));
1565   }
1566 }
1567 
1568 // GSN_API_FAILREQ
1569 
1570 void
execAPI_FAILREQ(Signal * signal)1571 LocalProxy::execAPI_FAILREQ(Signal* signal)
1572 {
1573   jam();
1574   Uint32 nodeId = signal->theData[0];
1575   Ss_API_FAILREQ& ss = ssSeize<Ss_API_FAILREQ>(nodeId);
1576 
1577   ss.m_ref = signal->theData[1];
1578   sendREQ(signal, ss);
1579 }
1580 
1581 void
sendAPI_FAILREQ(Signal * signal,Uint32 nodeId,SectionHandle *)1582 LocalProxy::sendAPI_FAILREQ(Signal* signal, Uint32 nodeId, SectionHandle*)
1583 {
1584   jam();
1585   Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(nodeId);
1586 
1587   /*
1588    * It is a requirement that the API_FAILREQ signal is
1589    * received as the last signal from the failed 'nodeId'.
1590    * It is produced by QMGR and ROUTEed via the TRPMAN(s)
1591    * (in the recv-thread) with the intention that it will
1592    * enter the job buffers after any signals received
1593    * from the failed API node. (Note that the transporter
1594    * *is* closed when API_FAILREQ is created, so any more
1595    * signals should not arrive from this API node).
1596    *
1597    * However, sending the API_FAILREQ via the proxy
1598    * will insert it in another job buffer queue than signals
1599    * received from the API-node:
1600    *
1601    *  API-request :       recv/TRPMAN-thread  ---> Block-instance
1602    *                                               ^
1603    *  Routed-API_FAILREQ: recv/TRPMAN-thread      /
1604    *                                     \       /
1605    *                                       Proxy
1606    *
1607    * Depening on the order the queues are processed, the
1608    * API_FAILREQ may then be processed by the Block-instance
1609    * before a API-request from the failed node - Which is
1610    * not what we want.
1611    *
1612    * Thus we let any proxies receiving a API_FAILREQ, route it
1613    * back to the recv/TRPMAN. There it will be inserted in
1614    * the same queue as the API-requests, and thus remove the
1615    * posibilities for being overtaken:
1616    *
1617    *  Routed-API_FAILREQ: recv/TRPMAN-thread
1618    *                                     \
1619    *  re-route to TRPMAN:  -----------<-Proxy (we are here)
1620    *                       |
1621    *                       v
1622    *  API-request+FAIL :   recv/TRPMAN-thread  ---> Block-instance
1623    */
1624 
1625   /* API_FAILREQ signal: */
1626   signal->theData[0] = nodeId;
1627   signal->theData[1] = reference();
1628 
1629   Uint32 routedSignalSectionI = RNIL;
1630   ndbrequire(appendToSection(routedSignalSectionI,
1631                              &signal->theData[0],
1632                              2));
1633   SectionHandle handle(this, routedSignalSectionI);
1634 
1635   /* RouteOrd data */
1636   RouteOrd* routeOrd = (RouteOrd*) signal->getDataPtrSend();
1637 
1638   routeOrd->srcRef = reference();
1639   routeOrd->gsn = GSN_API_FAILREQ;
1640   routeOrd->from = nodeId;
1641   routeOrd->dstRef = workerRef(ss.m_worker);
1642   /* ROUTE it through the TRPMAN */
1643   sendSignal(TRPMAN_REF, GSN_ROUTE_ORD, signal,
1644              RouteOrd::SignalLength,
1645              JBB, &handle);
1646 }
1647 
1648 void
execAPI_FAILCONF(Signal * signal)1649 LocalProxy::execAPI_FAILCONF(Signal* signal)
1650 {
1651   jam();
1652   Uint32 nodeId = signal->theData[0];
1653   Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(nodeId);
1654   recvCONF(signal, ss);
1655 }
1656 
1657 void
sendAPI_FAILCONF(Signal * signal,Uint32 ssId)1658 LocalProxy::sendAPI_FAILCONF(Signal* signal, Uint32 ssId)
1659 {
1660   jam();
1661   Ss_API_FAILREQ& ss = ssFind<Ss_API_FAILREQ>(ssId);
1662 
1663   if (!lastReply(ss))
1664   {
1665     jam();
1666     return;
1667   }
1668 
1669   signal->theData[0] = ssId;
1670   signal->theData[1] = reference();
1671   sendSignal(ss.m_ref, GSN_API_FAILCONF,
1672              signal, 2, JBB);
1673 
1674   ssRelease<Ss_API_FAILREQ>(ssId);
1675 }
1676 
1677 BLOCK_FUNCTIONS(LocalProxy)
1678