1 /*
2   Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3 
4   This program is free software; you can redistribute it and/or modify
5   it under the terms of the GNU General Public License, version 2.0,
6   as published by the Free Software Foundation.
7 
8   This program is also distributed with certain software (including
9   but not limited to OpenSSL) that is licensed under separate terms,
10   as designated in a particular file or component or in included license
11   documentation.  The authors of MySQL hereby grant you an additional
12   permission to link the program and your derivative works with the
13   separately licensed software that they have included with MySQL.
14 
15   This program is distributed in the hope that it will be useful,
16   but WITHOUT ANY WARRANTY; without even the implied warranty of
17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   GNU General Public License, version 2.0, for more details.
19 
20   You should have received a copy of the GNU General Public License
21   along with this program; if not, write to the Free Software
22   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 #include "trpman.hpp"
26 #include <TransporterRegistry.hpp>
27 #include <signaldata/CloseComReqConf.hpp>
28 #include <signaldata/DisconnectRep.hpp>
29 #include <signaldata/EnableCom.hpp>
30 #include <signaldata/RouteOrd.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32 
33 #include <mt.hpp>
34 #include <EventLogger.hpp>
35 extern EventLogger * g_eventLogger;
36 
37 #define JAM_FILE_ID 430
38 
39 
Trpman(Block_context & ctx,Uint32 instanceno)40 Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
41   SimulatedBlock(TRPMAN, ctx, instanceno)
42 {
43   BLOCK_CONSTRUCTOR(Trpman);
44 
45   addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
46   addRecSignal(GSN_CLOSE_COMCONF, &Trpman::execCLOSE_COMCONF);
47   addRecSignal(GSN_OPEN_COMORD, &Trpman::execOPEN_COMORD);
48   addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
49   addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
50   addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP);
51   addRecSignal(GSN_ROUTE_ORD, &Trpman::execROUTE_ORD);
52 
53   addRecSignal(GSN_NDB_TAMPER, &Trpman::execNDB_TAMPER, true);
54   addRecSignal(GSN_DUMP_STATE_ORD, &Trpman::execDUMP_STATE_ORD);
55   addRecSignal(GSN_DBINFO_SCANREQ, &Trpman::execDBINFO_SCANREQ);
56 }
57 
~Trpman()58 Trpman::~Trpman()
59 {
60 }
61 
62 BLOCK_FUNCTIONS(Trpman)
63 
#ifdef ERROR_INSERT
/* Defined elsewhere; adjusted by execNDB_TAMPER() on error insert 9003. */
extern Uint32 MAX_RECEIVED_SIGNALS;

/*
 * Nodes selected through DUMP_STATE_ORD for the 9000/9002/9004 error
 * inserts. While 9000 or 9002 is active, execOPEN_COMORD() skips
 * connecting to the nodes in this mask.
 */
static NodeBitmask c_error_9000_nodes_mask;
#endif
68 
69 bool
handles_this_node(Uint32 nodeId)70 Trpman::handles_this_node(Uint32 nodeId)
71 {
72   /* If there's only one receiver then no question */
73   if (globalData.ndbMtReceiveThreads <= (Uint32)1)
74     return true;
75 
76   /* There's a global receiver->thread index - look it up */
77   return (instance() == (get_recv_thread_idx(nodeId) + /* proxy */ 1));
78 }
79 
80 void
execOPEN_COMORD(Signal * signal)81 Trpman::execOPEN_COMORD(Signal* signal)
82 {
83   // Connect to the specifed NDB node, only QMGR allowed communication
84   // so far with the node
85 
86   const BlockReference userRef = signal->theData[0];
87   Uint32 tStartingNode = signal->theData[1];
88   Uint32 tData2 = signal->theData[2];
89   jamEntry();
90 
91   const Uint32 len = signal->getLength();
92   if (len == 2)
93   {
94 #ifdef ERROR_INSERT
95     if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
96 	   && c_error_9000_nodes_mask.get(tStartingNode)))
97 #endif
98     {
99       if (!handles_this_node(tStartingNode))
100       {
101         jam();
102         goto done;
103       }
104 
105       globalTransporterRegistry.do_connect(tStartingNode);
106       globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
107 
108       //-----------------------------------------------------
109       // Report that the connection to the node is opened
110       //-----------------------------------------------------
111       signal->theData[0] = NDB_LE_CommunicationOpened;
112       signal->theData[1] = tStartingNode;
113       sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
114       //-----------------------------------------------------
115     }
116   }
117   else
118   {
119     for(unsigned int i = 1; i < MAX_NODES; i++ )
120     {
121       jam();
122       if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2 &&
123           handles_this_node(i))
124       {
125 	jam();
126 
127 #ifdef ERROR_INSERT
128 	if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
129 	    && c_error_9000_nodes_mask.get(i))
130 	  continue;
131 #endif
132 	globalTransporterRegistry.do_connect(i);
133 	globalTransporterRegistry.setIOState(i, HaltIO);
134 
135 	signal->theData[0] = NDB_LE_CommunicationOpened;
136 	signal->theData[1] = i;
137 	sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
138       }
139     }
140   }
141 
142 done:
143   /**
144    * NO REPLY for now
145    */
146   (void)userRef;
147 }
148 
149 void
execCONNECT_REP(Signal * signal)150 Trpman::execCONNECT_REP(Signal *signal)
151 {
152   const Uint32 hostId = signal->theData[0];
153   jamEntry();
154 
155   const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type;
156   ndbrequire(type != NodeInfo::INVALID);
157 
158   /**
159    * Inform QMGR that client has connected
160    */
161   signal->theData[0] = hostId;
162   if (ERROR_INSERTED(9005))
163   {
164     sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1);
165   }
166   else
167   {
168     sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA);
169   }
170 
171   /* Automatically subscribe events for MGM nodes.
172    */
173   if (type == NodeInfo::MGM)
174   {
175     jam();
176     globalTransporterRegistry.setIOState(hostId, NoHalt);
177   }
178 
179   //------------------------------------------
180   // Also report this event to the Event handler
181   //------------------------------------------
182   signal->theData[0] = NDB_LE_Connected;
183   signal->theData[1] = hostId;
184   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
185 }
186 
187 void
execCLOSE_COMREQ(Signal * signal)188 Trpman::execCLOSE_COMREQ(Signal* signal)
189 {
190   // Close communication with the node and halt input/output from
191   // other blocks than QMGR
192 
193   CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0];
194 
195   const BlockReference userRef = closeCom->xxxBlockRef;
196   Uint32 requestType = closeCom->requestType;
197   Uint32 failNo = closeCom->failNo;
198   Uint32 noOfNodes = closeCom->noOfNodes;
199   Uint32 found_nodes = 0;
200 
201   jamEntry();
202   for (unsigned i = 1; i < MAX_NODES; i++)
203   {
204     if (NodeBitmask::get(closeCom->theNodes, i))
205     {
206       found_nodes++;
207       if (handles_this_node(i))
208       {
209         jam();
210 
211         //-----------------------------------------------------
212         // Report that the connection to the node is closed
213         //-----------------------------------------------------
214         signal->theData[0] = NDB_LE_CommunicationClosed;
215         signal->theData[1] = i;
216         sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
217 
218         globalTransporterRegistry.setIOState(i, HaltIO);
219         globalTransporterRegistry.do_disconnect(i);
220       }
221     }
222   }
223   ndbrequire(noOfNodes == found_nodes);
224 
225   if (requestType != CloseComReqConf::RT_NO_REPLY)
226   {
227     ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) ||
228               ((requestType == CloseComReqConf::RT_NODE_FAILURE) &&
229                (failNo != 0)));
230     jam();
231     CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend();
232     closeComConf->xxxBlockRef = userRef;
233     closeComConf->requestType = requestType;
234     closeComConf->failNo = failNo;
235 
236     /* Note assumption that noOfNodes and theNodes
237      * bitmap is not trampled above
238      * signals received from the remote node.
239      */
240     sendSignal(TRPMAN_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
241   }
242 }
243 
244 /*
245   We need to implement CLOSE_COMCONF signal for the non-multithreaded
246   case where message should go to QMGR, for multithreaded case it
247   needs to pass through TRPMAN proxy on its way back.
248 */
249 void
execCLOSE_COMCONF(Signal * signal)250 Trpman::execCLOSE_COMCONF(Signal *signal)
251 {
252   jamEntry();
253   sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
254 }
255 
256 void
execENABLE_COMREQ(Signal * signal)257 Trpman::execENABLE_COMREQ(Signal* signal)
258 {
259   jamEntry();
260   const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
261 
262   /* Need to copy out signal data to not clobber it with sendSignal(). */
263   BlockReference senderRef = enableComReq->m_senderRef;
264   Uint32 senderData = enableComReq->m_senderData;
265   Uint32 nodes[NodeBitmask::Size];
266   MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
267 
268   /* Enable communication with all our NDB blocks to these nodes. */
269   Uint32 search_from = 1;
270   for (;;)
271   {
272     Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
273     if (tStartingNode == NodeBitmask::NotFound)
274       break;
275     search_from = tStartingNode + 1;
276 
277     if (!handles_this_node(tStartingNode))
278       continue;
279     globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
280     setNodeInfo(tStartingNode).m_connected = true;
281 
282     //-----------------------------------------------------
283     // Report that the version of the node
284     //-----------------------------------------------------
285     signal->theData[0] = NDB_LE_ConnectedApiVersion;
286     signal->theData[1] = tStartingNode;
287     signal->theData[2] = getNodeInfo(tStartingNode).m_version;
288     signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version;
289 
290     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB);
291     //-----------------------------------------------------
292   }
293 
294   EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend();
295   enableComConf->m_senderRef = reference();
296   enableComConf->m_senderData = senderData;
297   MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size);
298   sendSignal(senderRef, GSN_ENABLE_COMCONF, signal,
299              EnableComConf::SignalLength, JBA);
300 }
301 
302 void
execDISCONNECT_REP(Signal * signal)303 Trpman::execDISCONNECT_REP(Signal *signal)
304 {
305   const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
306   const Uint32 hostId = rep->nodeId;
307   jamEntry();
308 
309   setNodeInfo(hostId).m_connected = false;
310   setNodeInfo(hostId).m_connectCount++;
311   const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
312   ndbrequire(type != NodeInfo::INVALID);
313 
314   sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
315              DisconnectRep::SignalLength, JBA);
316 
317   signal->theData[0] = hostId;
318   sendSignal(CMVMI_REF, GSN_CANCEL_SUBSCRIPTION_REQ, signal, 1, JBB);
319 
320   signal->theData[0] = NDB_LE_Disconnected;
321   signal->theData[1] = hostId;
322   sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
323 }
324 
325 /**
326  * execROUTE_ORD
327  * Allows other blocks to route signals as if they
328  * came from TRPMAN
329  * Useful in ndbmtd for synchronising signals w.r.t
330  * external signals received from other nodes which
331  * arrive from the same thread that runs TRPMAN
332  */
333 void
execROUTE_ORD(Signal * signal)334 Trpman::execROUTE_ORD(Signal* signal)
335 {
336   jamEntry();
337   if (!assembleFragments(signal))
338   {
339     jam();
340     return;
341   }
342 
343   SectionHandle handle(this, signal);
344 
345   RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
346   Uint32 dstRef = ord->dstRef;
347   Uint32 srcRef = ord->srcRef;
348   Uint32 gsn = ord->gsn;
349   /* ord->cnt ignored */
350 
351   Uint32 nodeId = refToNode(dstRef);
352 
353   if (likely((nodeId == 0) ||
354              getNodeInfo(nodeId).m_connected))
355   {
356     jam();
357     Uint32 secCount = handle.m_cnt;
358     ndbrequire(secCount >= 1 && secCount <= 3);
359 
360     jamLine(secCount);
361 
362     /**
363      * Put section 0 in signal->theData
364      */
365     Uint32 sigLen = handle.m_ptr[0].sz;
366     ndbrequire(sigLen <= 25);
367     copy(signal->theData, handle.m_ptr[0]);
368 
369     SegmentedSectionPtr save = handle.m_ptr[0];
370     for (Uint32 i = 0; i < secCount - 1; i++)
371       handle.m_ptr[i] = handle.m_ptr[i+1];
372     handle.m_cnt--;
373 
374     sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle);
375 
376     handle.m_cnt = 1;
377     handle.m_ptr[0] = save;
378     releaseSections(handle);
379     return ;
380   }
381 
382   releaseSections(handle);
383   warningEvent("Unable to route GSN: %d from %x to %x",
384 	       gsn, srcRef, dstRef);
385 }
386 
387 void
execDBINFO_SCANREQ(Signal * signal)388 Trpman::execDBINFO_SCANREQ(Signal *signal)
389 {
390   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
391   const Ndbinfo::ScanCursor* cursor =
392     CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
393   Ndbinfo::Ratelimit rl;
394   char addr_buf[NDB_ADDR_STRLEN];
395 
396   jamEntry();
397 
398   switch(req.tableId){
399   case Ndbinfo::TRANSPORTERS_TABLEID:
400   {
401     jam();
402     Uint32 rnode = cursor->data[0];
403     if (rnode == 0)
404       rnode++; // Skip node 0
405 
406     while (rnode < MAX_NODES)
407     {
408       if (!handles_this_node(rnode))
409       {
410         rnode++;
411         continue;
412       }
413 
414       switch(getNodeInfo(rnode).m_type)
415       {
416       default:
417       {
418         jam();
419         Ndbinfo::Row row(signal, req);
420         row.write_uint32(getOwnNodeId()); // Node id
421         row.write_uint32(rnode); // Remote node id
422         row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State
423 
424         if (globalTransporterRegistry.get_transporter(rnode) != NULL)
425         {
426           jam();
427           /* Connect address */
428           if (globalTransporterRegistry.get_connect_address(rnode).s_addr != 0)
429           {
430             jam();
431             struct in_addr conn_addr = globalTransporterRegistry.
432                                          get_connect_address(rnode);
433             char *addr_str = Ndb_inet_ntop(AF_INET,
434                                            static_cast<void*>(&conn_addr),
435                                            addr_buf,
436                                            (socklen_t)sizeof(addr_buf));
437             row.write_string(addr_str);
438           }
439           else
440           {
441             jam();
442             row.write_string("-");
443           }
444 
445           /* Bytes sent/received */
446           row.write_uint64(globalTransporterRegistry.get_bytes_sent(rnode));
447           row.write_uint64(globalTransporterRegistry.get_bytes_received(rnode));
448 
449           /* Connect count, overload and Slowdown states */
450           row.write_uint32(globalTransporterRegistry.get_connect_count(rnode));
451           row.write_uint32(globalTransporterRegistry.get_status_overloaded().get(rnode));
452           row.write_uint32(globalTransporterRegistry.get_overload_count(rnode));
453           row.write_uint32(globalTransporterRegistry.get_status_slowdown().get(rnode));
454           row.write_uint32(globalTransporterRegistry.get_slowdown_count(rnode));
455         }
456         else
457         {
458           /* Null transporter */
459           jam();
460           row.write_string("-");  /* Remote address */
461           row.write_uint64(0);    /* Bytes sent */
462           row.write_uint64(0);    /* Bytes received */
463           row.write_uint32(0);    /* Connect count */
464           row.write_uint32(0);    /* Overloaded */
465           row.write_uint32(0);    /* Overload_count */
466           row.write_uint32(0);    /* Slowdown */
467           row.write_uint32(0);    /* Slowdown_count */
468         }
469 
470         ndbinfo_send_row(signal, req, row, rl);
471         break;
472       }
473 
474       case NodeInfo::INVALID:
475         jam();
476        break;
477       }
478 
479       rnode++;
480       if (rl.need_break(req))
481       {
482         jam();
483         ndbinfo_send_scan_break(signal, req, rl, rnode);
484         return;
485       }
486     }
487     break;
488   }
489 
490   default:
491     break;
492   }
493 
494   ndbinfo_send_scan_conf(signal, req, rl);
495 }
496 
497 void
execNDB_TAMPER(Signal * signal)498 Trpman::execNDB_TAMPER(Signal* signal)
499 {
500   jamEntry();
501 #ifdef ERROR_INSERT
502   if (signal->theData[0] == 9003)
503   {
504     if (MAX_RECEIVED_SIGNALS < 1024)
505     {
506       MAX_RECEIVED_SIGNALS = 1024;
507     }
508     else
509     {
510       MAX_RECEIVED_SIGNALS = 1 + (rand() % 128);
511     }
512     ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS);
513     CLEAR_ERROR_INSERT_VALUE;
514   }
515 #endif
516 }//execNDB_TAMPER()
517 
518 void
execDUMP_STATE_ORD(Signal * signal)519 Trpman::execDUMP_STATE_ORD(Signal* signal)
520 {
521   DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
522   Uint32 arg = dumpState->args[0]; (void)arg;
523 
524 #ifdef ERROR_INSERT
525   if (arg == 9000 || arg == 9002)
526   {
527     SET_ERROR_INSERT_VALUE(arg);
528     for (Uint32 i = 1; i<signal->getLength(); i++)
529       c_error_9000_nodes_mask.set(signal->theData[i]);
530   }
531 
532   if (arg == 9001)
533   {
534     CLEAR_ERROR_INSERT_VALUE;
535     if (signal->getLength() == 1 || signal->theData[1])
536     {
537       signal->header.theLength = 2;
538       for (Uint32 i = 1; i<MAX_NODES; i++)
539       {
540         if (c_error_9000_nodes_mask.get(i) &&
541             handles_this_node(i))
542         {
543           signal->theData[0] = 0;
544           signal->theData[1] = i;
545           execOPEN_COMORD(signal);
546         }
547       }
548     }
549     c_error_9000_nodes_mask.clear();
550   }
551 
552   if (arg == 9004 && signal->getLength() == 2)
553   {
554     SET_ERROR_INSERT_VALUE(9004);
555     c_error_9000_nodes_mask.clear();
556     c_error_9000_nodes_mask.set(signal->theData[1]);
557   }
558 
559   if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
560   {
561     Uint32 db = signal->theData[1];
562     Uint32 i = c_error_9000_nodes_mask.find(1);
563     if (handles_this_node(i))
564     {
565       signal->theData[0] = i;
566       sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
567       ndbout_c("stopping %u using %u", i, db);
568     }
569     CLEAR_ERROR_INSERT_VALUE;
570   }
571 #endif
572 
573 #ifdef ERROR_INSERT
574   /* <Target NodeId> dump 9992 <NodeId list>
575    * On Target NodeId, block receiving signals from NodeId list
576    *
577    * <Target NodeId> dump 9993 <NodeId list>
578    * On Target NodeId, resume receiving signals from NodeId list
579    *
580    * <Target NodeId> dump 9991
581    * On Target NodeId, resume receiving signals from any blocked node
582    *
583    *
584    * See also code in QMGR for blocking receive from nodes based
585    * on HB roles.
586    *
587    */
588   if((arg == 9993) ||  /* Unblock recv from nodeid */
589      (arg == 9992))    /* Block recv from nodeid */
590   {
591     bool block = (arg == 9992);
592     TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
593     assert(recvdata != 0);
594     for (Uint32 n = 1; n < signal->getLength(); n++)
595     {
596       Uint32 nodeId = signal->theData[n];
597       if (!handles_this_node(nodeId))
598         continue;
599 
600       if ((nodeId > 0) &&
601           (nodeId < MAX_NODES))
602       {
603         if (block)
604         {
605           g_eventLogger->info("TRPMAN : Blocking receive from node %u", nodeId);
606           globalTransporterRegistry.blockReceive(*recvdata, nodeId);
607         }
608         else
609         {
610           g_eventLogger->info("TRPMAN : Unblocking receive from node %u",
611                               nodeId);
612 
613           globalTransporterRegistry.unblockReceive(*recvdata, nodeId);
614         }
615       }
616       else
617       {
618         ndbout_c("TRPMAN : Ignoring dump %u for node %u",
619                  arg, nodeId);
620       }
621     }
622   }
623   if (arg == 9990) /* Block recv from all ndbd matching pattern */
624   {
625     Uint32 pattern = 0;
626     if (signal->getLength() > 1)
627     {
628       pattern = signal->theData[1];
629       ndbout_c("TRPMAN : Blocking receive from all ndbds matching pattern -%s-",
630                ((pattern == 1)? "Other side":"Unknown"));
631     }
632 
633     TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
634     assert(recvdata != 0);
635     for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
636     {
637       if (!handles_this_node(node))
638         continue;
639       if (globalTransporterRegistry.is_connected(node))
640       {
641         if (getNodeInfo(node).m_type == NodeInfo::DB)
642         {
643           if (!globalTransporterRegistry.isBlocked(node))
644           {
645             switch (pattern)
646             {
647             case 1:
648             {
649               /* Match if given node is on 'other side' of
650                * 2-replica cluster
651                */
652               if ((getOwnNodeId() & 1) != (node & 1))
653               {
654                 /* Node is on the 'other side', match */
655                 break;
656               }
657               /* Node is on 'my side', don't match */
658               continue;
659             }
660             default:
661               break;
662             }
663             g_eventLogger->info("TRPMAN : Blocking receive from node %u", node);
664             globalTransporterRegistry.blockReceive(*recvdata, node);
665           }
666         }
667       }
668     }
669   }
670   if (arg == 9991) /* Unblock recv from all blocked */
671   {
672     TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
673     assert(recvdata != 0);
674     for (Uint32 node = 1; node < MAX_NODES; node++)
675     {
676       if (!handles_this_node(node))
677         continue;
678       if (globalTransporterRegistry.isBlocked(node))
679       {
680         g_eventLogger->info("TRPMAN : Unblocking receive from node %u", node);
681         globalTransporterRegistry.unblockReceive(*recvdata, node);
682       }
683     }
684   }
685 #endif
686 }
687 
TrpmanProxy(Block_context & ctx)688 TrpmanProxy::TrpmanProxy(Block_context & ctx) :
689   LocalProxy(TRPMAN, ctx)
690 {
691   addRecSignal(GSN_OPEN_COMORD, &TrpmanProxy::execOPEN_COMORD);
692   addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
693   addRecSignal(GSN_ENABLE_COMCONF, &TrpmanProxy::execENABLE_COMCONF);
694   addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
695   addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
696   addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
697 }
698 
~TrpmanProxy()699 TrpmanProxy::~TrpmanProxy()
700 {
701 }
702 
703 SimulatedBlock*
newWorker(Uint32 instanceNo)704 TrpmanProxy::newWorker(Uint32 instanceNo)
705 {
706   return new Trpman(m_ctx, instanceNo);
707 }
708 
709 BLOCK_FUNCTIONS(TrpmanProxy);
710 
711 // GSN_OPEN_COMORD
712 
713 void
execOPEN_COMORD(Signal * signal)714 TrpmanProxy::execOPEN_COMORD(Signal* signal)
715 {
716   jamEntry();
717 
718   for (Uint32 i = 0; i<c_workers; i++)
719   {
720     jam();
721     sendSignal(workerRef(i), GSN_OPEN_COMORD, signal,
722                signal->getLength(), JBB);
723   }
724 }
725 
726 // GSN_CLOSE_COMREQ
727 
728 void
execCLOSE_COMREQ(Signal * signal)729 TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
730 {
731   jamEntry();
732   Ss_CLOSE_COMREQ& ss = ssSeize<Ss_CLOSE_COMREQ>();
733   const CloseComReqConf* req = (const CloseComReqConf*)signal->getDataPtr();
734   ss.m_req = *req;
735   sendREQ(signal, ss);
736 }
737 
738 void
sendCLOSE_COMREQ(Signal * signal,Uint32 ssId,SectionHandle *)739 TrpmanProxy::sendCLOSE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
740 {
741   jam();
742   Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
743   CloseComReqConf* req = (CloseComReqConf*)signal->getDataPtrSend();
744 
745   *req = ss.m_req;
746   req->xxxBlockRef = reference();
747   req->failNo = ssId;
748   sendSignal(workerRef(ss.m_worker), GSN_CLOSE_COMREQ, signal,
749              CloseComReqConf::SignalLength, JBB);
750 }
751 
752 void
execCLOSE_COMCONF(Signal * signal)753 TrpmanProxy::execCLOSE_COMCONF(Signal* signal)
754 {
755   const CloseComReqConf* conf = (const CloseComReqConf*)signal->getDataPtr();
756   Uint32 ssId = conf->failNo;
757   jamEntry();
758   Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
759   recvCONF(signal, ss);
760 }
761 
762 void
sendCLOSE_COMCONF(Signal * signal,Uint32 ssId)763 TrpmanProxy::sendCLOSE_COMCONF(Signal *signal, Uint32 ssId)
764 {
765   jam();
766   Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
767 
768   if (!lastReply(ss))
769   {
770     jam();
771     return;
772   }
773 
774   CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
775   *conf = ss.m_req;
776   sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal,
777              CloseComReqConf::SignalLength, JBB);
778   ssRelease<Ss_CLOSE_COMREQ>(ssId);
779 }
780 
781 // GSN_ENABLE_COMREQ
782 
783 void
execENABLE_COMREQ(Signal * signal)784 TrpmanProxy::execENABLE_COMREQ(Signal* signal)
785 {
786   jamEntry();
787   Ss_ENABLE_COMREQ& ss = ssSeize<Ss_ENABLE_COMREQ>();
788   const EnableComReq* req = (const EnableComReq*)signal->getDataPtr();
789   ss.m_req = *req;
790   sendREQ(signal, ss);
791 }
792 
793 void
sendENABLE_COMREQ(Signal * signal,Uint32 ssId,SectionHandle *)794 TrpmanProxy::sendENABLE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
795 {
796   jam();
797   Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
798   EnableComReq* req = (EnableComReq*)signal->getDataPtrSend();
799 
800   *req = ss.m_req;
801   req->m_senderRef = reference();
802   req->m_senderData = ssId;
803   sendSignal(workerRef(ss.m_worker), GSN_ENABLE_COMREQ, signal,
804              EnableComReq::SignalLength, JBB);
805 }
806 
807 void
execENABLE_COMCONF(Signal * signal)808 TrpmanProxy::execENABLE_COMCONF(Signal* signal)
809 {
810   const EnableComConf* conf = (const EnableComConf*)signal->getDataPtr();
811   Uint32 ssId = conf->m_senderData;
812   jamEntry();
813   Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
814   recvCONF(signal, ss);
815 }
816 
817 void
sendENABLE_COMCONF(Signal * signal,Uint32 ssId)818 TrpmanProxy::sendENABLE_COMCONF(Signal *signal, Uint32 ssId)
819 {
820   jam();
821   Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
822 
823   if (!lastReply(ss))
824   {
825     jam();
826     return;
827   }
828 
829   EnableComReq* conf = (EnableComReq*)signal->getDataPtr();
830   *conf = ss.m_req;
831   sendSignal(conf->m_senderRef, GSN_ENABLE_COMCONF, signal,
832              EnableComReq::SignalLength, JBB);
833   ssRelease<Ss_ENABLE_COMREQ>(ssId);
834 }
835 
836 // GSN_ROUTE_ORD
837 
838 void
execROUTE_ORD(Signal * signal)839 TrpmanProxy::execROUTE_ORD(Signal* signal)
840 {
841   jamEntry();
842 
843   RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
844   Uint32 nodeId = ord->from;
845   ndbassert(nodeId != 0);
846 
847   Uint32 workerIndex = 0;
848 
849   if (globalData.ndbMtReceiveThreads > (Uint32) 1)
850   {
851     workerIndex = get_recv_thread_idx(nodeId);
852     ndbrequire(workerIndex < globalData.ndbMtReceiveThreads);
853   }
854 
855   SectionHandle handle(this, signal);
856   sendSignal(workerRef(workerIndex), GSN_ROUTE_ORD, signal,
857              signal->getLength(), JBB, &handle);
858 }
859