1 /*
2 Copyright (c) 2011, 2021, Oracle and/or its affiliates.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include "trpman.hpp"
26 #include <TransporterRegistry.hpp>
27 #include <signaldata/CloseComReqConf.hpp>
28 #include <signaldata/DisconnectRep.hpp>
29 #include <signaldata/EnableCom.hpp>
30 #include <signaldata/RouteOrd.hpp>
31 #include <signaldata/DumpStateOrd.hpp>
32
33 #include <mt.hpp>
34 #include <EventLogger.hpp>
35 extern EventLogger * g_eventLogger;
36
37 #define JAM_FILE_ID 430
38
39
Trpman(Block_context & ctx,Uint32 instanceno)40 Trpman::Trpman(Block_context & ctx, Uint32 instanceno) :
41 SimulatedBlock(TRPMAN, ctx, instanceno)
42 {
43 BLOCK_CONSTRUCTOR(Trpman);
44
45 addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
46 addRecSignal(GSN_CLOSE_COMCONF, &Trpman::execCLOSE_COMCONF);
47 addRecSignal(GSN_OPEN_COMORD, &Trpman::execOPEN_COMORD);
48 addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
49 addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
50 addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP);
51 addRecSignal(GSN_ROUTE_ORD, &Trpman::execROUTE_ORD);
52
53 addRecSignal(GSN_NDB_TAMPER, &Trpman::execNDB_TAMPER, true);
54 addRecSignal(GSN_DUMP_STATE_ORD, &Trpman::execDUMP_STATE_ORD);
55 addRecSignal(GSN_DBINFO_SCANREQ, &Trpman::execDBINFO_SCANREQ);
56 }
57
~Trpman()58 Trpman::~Trpman()
59 {
60 }
61
62 BLOCK_FUNCTIONS(Trpman)
63
64 #ifdef ERROR_INSERT
65 static NodeBitmask c_error_9000_nodes_mask;
66 extern Uint32 MAX_RECEIVED_SIGNALS;
67 #endif
68
69 bool
handles_this_node(Uint32 nodeId)70 Trpman::handles_this_node(Uint32 nodeId)
71 {
72 /* If there's only one receiver then no question */
73 if (globalData.ndbMtReceiveThreads <= (Uint32)1)
74 return true;
75
76 /* There's a global receiver->thread index - look it up */
77 return (instance() == (get_recv_thread_idx(nodeId) + /* proxy */ 1));
78 }
79
80 void
execOPEN_COMORD(Signal * signal)81 Trpman::execOPEN_COMORD(Signal* signal)
82 {
83 // Connect to the specifed NDB node, only QMGR allowed communication
84 // so far with the node
85
86 const BlockReference userRef = signal->theData[0];
87 Uint32 tStartingNode = signal->theData[1];
88 Uint32 tData2 = signal->theData[2];
89 jamEntry();
90
91 const Uint32 len = signal->getLength();
92 if (len == 2)
93 {
94 #ifdef ERROR_INSERT
95 if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
96 && c_error_9000_nodes_mask.get(tStartingNode)))
97 #endif
98 {
99 if (!handles_this_node(tStartingNode))
100 {
101 jam();
102 goto done;
103 }
104
105 globalTransporterRegistry.do_connect(tStartingNode);
106 globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
107
108 //-----------------------------------------------------
109 // Report that the connection to the node is opened
110 //-----------------------------------------------------
111 signal->theData[0] = NDB_LE_CommunicationOpened;
112 signal->theData[1] = tStartingNode;
113 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
114 //-----------------------------------------------------
115 }
116 }
117 else
118 {
119 for(unsigned int i = 1; i < MAX_NODES; i++ )
120 {
121 jam();
122 if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2 &&
123 handles_this_node(i))
124 {
125 jam();
126
127 #ifdef ERROR_INSERT
128 if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
129 && c_error_9000_nodes_mask.get(i))
130 continue;
131 #endif
132 globalTransporterRegistry.do_connect(i);
133 globalTransporterRegistry.setIOState(i, HaltIO);
134
135 signal->theData[0] = NDB_LE_CommunicationOpened;
136 signal->theData[1] = i;
137 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
138 }
139 }
140 }
141
142 done:
143 /**
144 * NO REPLY for now
145 */
146 (void)userRef;
147 }
148
149 void
execCONNECT_REP(Signal * signal)150 Trpman::execCONNECT_REP(Signal *signal)
151 {
152 const Uint32 hostId = signal->theData[0];
153 jamEntry();
154
155 const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type;
156 ndbrequire(type != NodeInfo::INVALID);
157
158 /**
159 * Inform QMGR that client has connected
160 */
161 signal->theData[0] = hostId;
162 if (ERROR_INSERTED(9005))
163 {
164 sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1);
165 }
166 else
167 {
168 sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA);
169 }
170
171 /* Automatically subscribe events for MGM nodes.
172 */
173 if (type == NodeInfo::MGM)
174 {
175 jam();
176 globalTransporterRegistry.setIOState(hostId, NoHalt);
177 }
178
179 //------------------------------------------
180 // Also report this event to the Event handler
181 //------------------------------------------
182 signal->theData[0] = NDB_LE_Connected;
183 signal->theData[1] = hostId;
184 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
185 }
186
187 void
execCLOSE_COMREQ(Signal * signal)188 Trpman::execCLOSE_COMREQ(Signal* signal)
189 {
190 // Close communication with the node and halt input/output from
191 // other blocks than QMGR
192
193 CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0];
194
195 const BlockReference userRef = closeCom->xxxBlockRef;
196 Uint32 requestType = closeCom->requestType;
197 Uint32 failNo = closeCom->failNo;
198 Uint32 noOfNodes = closeCom->noOfNodes;
199 Uint32 found_nodes = 0;
200
201 jamEntry();
202 for (unsigned i = 1; i < MAX_NODES; i++)
203 {
204 if (NodeBitmask::get(closeCom->theNodes, i))
205 {
206 found_nodes++;
207 if (handles_this_node(i))
208 {
209 jam();
210
211 //-----------------------------------------------------
212 // Report that the connection to the node is closed
213 //-----------------------------------------------------
214 signal->theData[0] = NDB_LE_CommunicationClosed;
215 signal->theData[1] = i;
216 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
217
218 globalTransporterRegistry.setIOState(i, HaltIO);
219 globalTransporterRegistry.do_disconnect(i);
220 }
221 }
222 }
223 ndbrequire(noOfNodes == found_nodes);
224
225 if (requestType != CloseComReqConf::RT_NO_REPLY)
226 {
227 ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) ||
228 ((requestType == CloseComReqConf::RT_NODE_FAILURE) &&
229 (failNo != 0)));
230 jam();
231 CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend();
232 closeComConf->xxxBlockRef = userRef;
233 closeComConf->requestType = requestType;
234 closeComConf->failNo = failNo;
235
236 /* Note assumption that noOfNodes and theNodes
237 * bitmap is not trampled above
238 * signals received from the remote node.
239 */
240 sendSignal(TRPMAN_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
241 }
242 }
243
244 /*
245 We need to implement CLOSE_COMCONF signal for the non-multithreaded
246 case where message should go to QMGR, for multithreaded case it
247 needs to pass through TRPMAN proxy on its way back.
248 */
249 void
execCLOSE_COMCONF(Signal * signal)250 Trpman::execCLOSE_COMCONF(Signal *signal)
251 {
252 jamEntry();
253 sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
254 }
255
256 void
execENABLE_COMREQ(Signal * signal)257 Trpman::execENABLE_COMREQ(Signal* signal)
258 {
259 jamEntry();
260 const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
261
262 /* Need to copy out signal data to not clobber it with sendSignal(). */
263 BlockReference senderRef = enableComReq->m_senderRef;
264 Uint32 senderData = enableComReq->m_senderData;
265 Uint32 nodes[NodeBitmask::Size];
266 MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
267
268 /* Enable communication with all our NDB blocks to these nodes. */
269 Uint32 search_from = 1;
270 for (;;)
271 {
272 Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
273 if (tStartingNode == NodeBitmask::NotFound)
274 break;
275 search_from = tStartingNode + 1;
276
277 if (!handles_this_node(tStartingNode))
278 continue;
279 globalTransporterRegistry.setIOState(tStartingNode, NoHalt);
280 setNodeInfo(tStartingNode).m_connected = true;
281
282 //-----------------------------------------------------
283 // Report that the version of the node
284 //-----------------------------------------------------
285 signal->theData[0] = NDB_LE_ConnectedApiVersion;
286 signal->theData[1] = tStartingNode;
287 signal->theData[2] = getNodeInfo(tStartingNode).m_version;
288 signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version;
289
290 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB);
291 //-----------------------------------------------------
292 }
293
294 EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend();
295 enableComConf->m_senderRef = reference();
296 enableComConf->m_senderData = senderData;
297 MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size);
298 sendSignal(senderRef, GSN_ENABLE_COMCONF, signal,
299 EnableComConf::SignalLength, JBA);
300 }
301
302 void
execDISCONNECT_REP(Signal * signal)303 Trpman::execDISCONNECT_REP(Signal *signal)
304 {
305 const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
306 const Uint32 hostId = rep->nodeId;
307 jamEntry();
308
309 setNodeInfo(hostId).m_connected = false;
310 setNodeInfo(hostId).m_connectCount++;
311 const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
312 ndbrequire(type != NodeInfo::INVALID);
313
314 sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
315 DisconnectRep::SignalLength, JBA);
316
317 signal->theData[0] = hostId;
318 sendSignal(CMVMI_REF, GSN_CANCEL_SUBSCRIPTION_REQ, signal, 1, JBB);
319
320 signal->theData[0] = NDB_LE_Disconnected;
321 signal->theData[1] = hostId;
322 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
323 }
324
325 /**
326 * execROUTE_ORD
327 * Allows other blocks to route signals as if they
328 * came from TRPMAN
329 * Useful in ndbmtd for synchronising signals w.r.t
330 * external signals received from other nodes which
331 * arrive from the same thread that runs TRPMAN
332 */
333 void
execROUTE_ORD(Signal * signal)334 Trpman::execROUTE_ORD(Signal* signal)
335 {
336 jamEntry();
337 if (!assembleFragments(signal))
338 {
339 jam();
340 return;
341 }
342
343 SectionHandle handle(this, signal);
344
345 RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
346 Uint32 dstRef = ord->dstRef;
347 Uint32 srcRef = ord->srcRef;
348 Uint32 gsn = ord->gsn;
349 /* ord->cnt ignored */
350
351 Uint32 nodeId = refToNode(dstRef);
352
353 if (likely((nodeId == 0) ||
354 getNodeInfo(nodeId).m_connected))
355 {
356 jam();
357 Uint32 secCount = handle.m_cnt;
358 ndbrequire(secCount >= 1 && secCount <= 3);
359
360 jamLine(secCount);
361
362 /**
363 * Put section 0 in signal->theData
364 */
365 Uint32 sigLen = handle.m_ptr[0].sz;
366 ndbrequire(sigLen <= 25);
367 copy(signal->theData, handle.m_ptr[0]);
368
369 SegmentedSectionPtr save = handle.m_ptr[0];
370 for (Uint32 i = 0; i < secCount - 1; i++)
371 handle.m_ptr[i] = handle.m_ptr[i+1];
372 handle.m_cnt--;
373
374 sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle);
375
376 handle.m_cnt = 1;
377 handle.m_ptr[0] = save;
378 releaseSections(handle);
379 return ;
380 }
381
382 releaseSections(handle);
383 warningEvent("Unable to route GSN: %d from %x to %x",
384 gsn, srcRef, dstRef);
385 }
386
387 void
execDBINFO_SCANREQ(Signal * signal)388 Trpman::execDBINFO_SCANREQ(Signal *signal)
389 {
390 DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
391 const Ndbinfo::ScanCursor* cursor =
392 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
393 Ndbinfo::Ratelimit rl;
394 char addr_buf[NDB_ADDR_STRLEN];
395
396 jamEntry();
397
398 switch(req.tableId){
399 case Ndbinfo::TRANSPORTERS_TABLEID:
400 {
401 jam();
402 Uint32 rnode = cursor->data[0];
403 if (rnode == 0)
404 rnode++; // Skip node 0
405
406 while (rnode < MAX_NODES)
407 {
408 if (!handles_this_node(rnode))
409 {
410 rnode++;
411 continue;
412 }
413
414 switch(getNodeInfo(rnode).m_type)
415 {
416 default:
417 {
418 jam();
419 Ndbinfo::Row row(signal, req);
420 row.write_uint32(getOwnNodeId()); // Node id
421 row.write_uint32(rnode); // Remote node id
422 row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State
423
424 if (globalTransporterRegistry.get_transporter(rnode) != NULL)
425 {
426 jam();
427 /* Connect address */
428 if (globalTransporterRegistry.get_connect_address(rnode).s_addr != 0)
429 {
430 jam();
431 struct in_addr conn_addr = globalTransporterRegistry.
432 get_connect_address(rnode);
433 char *addr_str = Ndb_inet_ntop(AF_INET,
434 static_cast<void*>(&conn_addr),
435 addr_buf,
436 (socklen_t)sizeof(addr_buf));
437 row.write_string(addr_str);
438 }
439 else
440 {
441 jam();
442 row.write_string("-");
443 }
444
445 /* Bytes sent/received */
446 row.write_uint64(globalTransporterRegistry.get_bytes_sent(rnode));
447 row.write_uint64(globalTransporterRegistry.get_bytes_received(rnode));
448
449 /* Connect count, overload and Slowdown states */
450 row.write_uint32(globalTransporterRegistry.get_connect_count(rnode));
451 row.write_uint32(globalTransporterRegistry.get_status_overloaded().get(rnode));
452 row.write_uint32(globalTransporterRegistry.get_overload_count(rnode));
453 row.write_uint32(globalTransporterRegistry.get_status_slowdown().get(rnode));
454 row.write_uint32(globalTransporterRegistry.get_slowdown_count(rnode));
455 }
456 else
457 {
458 /* Null transporter */
459 jam();
460 row.write_string("-"); /* Remote address */
461 row.write_uint64(0); /* Bytes sent */
462 row.write_uint64(0); /* Bytes received */
463 row.write_uint32(0); /* Connect count */
464 row.write_uint32(0); /* Overloaded */
465 row.write_uint32(0); /* Overload_count */
466 row.write_uint32(0); /* Slowdown */
467 row.write_uint32(0); /* Slowdown_count */
468 }
469
470 ndbinfo_send_row(signal, req, row, rl);
471 break;
472 }
473
474 case NodeInfo::INVALID:
475 jam();
476 break;
477 }
478
479 rnode++;
480 if (rl.need_break(req))
481 {
482 jam();
483 ndbinfo_send_scan_break(signal, req, rl, rnode);
484 return;
485 }
486 }
487 break;
488 }
489
490 default:
491 break;
492 }
493
494 ndbinfo_send_scan_conf(signal, req, rl);
495 }
496
497 void
execNDB_TAMPER(Signal * signal)498 Trpman::execNDB_TAMPER(Signal* signal)
499 {
500 jamEntry();
501 #ifdef ERROR_INSERT
502 if (signal->theData[0] == 9003)
503 {
504 if (MAX_RECEIVED_SIGNALS < 1024)
505 {
506 MAX_RECEIVED_SIGNALS = 1024;
507 }
508 else
509 {
510 MAX_RECEIVED_SIGNALS = 1 + (rand() % 128);
511 }
512 ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS);
513 CLEAR_ERROR_INSERT_VALUE;
514 }
515 #endif
516 }//execNDB_TAMPER()
517
518 void
execDUMP_STATE_ORD(Signal * signal)519 Trpman::execDUMP_STATE_ORD(Signal* signal)
520 {
521 DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0];
522 Uint32 arg = dumpState->args[0]; (void)arg;
523
524 #ifdef ERROR_INSERT
525 if (arg == 9000 || arg == 9002)
526 {
527 SET_ERROR_INSERT_VALUE(arg);
528 for (Uint32 i = 1; i<signal->getLength(); i++)
529 c_error_9000_nodes_mask.set(signal->theData[i]);
530 }
531
532 if (arg == 9001)
533 {
534 CLEAR_ERROR_INSERT_VALUE;
535 if (signal->getLength() == 1 || signal->theData[1])
536 {
537 signal->header.theLength = 2;
538 for (Uint32 i = 1; i<MAX_NODES; i++)
539 {
540 if (c_error_9000_nodes_mask.get(i) &&
541 handles_this_node(i))
542 {
543 signal->theData[0] = 0;
544 signal->theData[1] = i;
545 execOPEN_COMORD(signal);
546 }
547 }
548 }
549 c_error_9000_nodes_mask.clear();
550 }
551
552 if (arg == 9004 && signal->getLength() == 2)
553 {
554 SET_ERROR_INSERT_VALUE(9004);
555 c_error_9000_nodes_mask.clear();
556 c_error_9000_nodes_mask.set(signal->theData[1]);
557 }
558
559 if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004))
560 {
561 Uint32 db = signal->theData[1];
562 Uint32 i = c_error_9000_nodes_mask.find(1);
563 if (handles_this_node(i))
564 {
565 signal->theData[0] = i;
566 sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA);
567 ndbout_c("stopping %u using %u", i, db);
568 }
569 CLEAR_ERROR_INSERT_VALUE;
570 }
571 #endif
572
573 #ifdef ERROR_INSERT
574 /* <Target NodeId> dump 9992 <NodeId list>
575 * On Target NodeId, block receiving signals from NodeId list
576 *
577 * <Target NodeId> dump 9993 <NodeId list>
578 * On Target NodeId, resume receiving signals from NodeId list
579 *
580 * <Target NodeId> dump 9991
581 * On Target NodeId, resume receiving signals from any blocked node
582 *
583 *
584 * See also code in QMGR for blocking receive from nodes based
585 * on HB roles.
586 *
587 */
588 if((arg == 9993) || /* Unblock recv from nodeid */
589 (arg == 9992)) /* Block recv from nodeid */
590 {
591 bool block = (arg == 9992);
592 TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
593 assert(recvdata != 0);
594 for (Uint32 n = 1; n < signal->getLength(); n++)
595 {
596 Uint32 nodeId = signal->theData[n];
597 if (!handles_this_node(nodeId))
598 continue;
599
600 if ((nodeId > 0) &&
601 (nodeId < MAX_NODES))
602 {
603 if (block)
604 {
605 g_eventLogger->info("TRPMAN : Blocking receive from node %u", nodeId);
606 globalTransporterRegistry.blockReceive(*recvdata, nodeId);
607 }
608 else
609 {
610 g_eventLogger->info("TRPMAN : Unblocking receive from node %u",
611 nodeId);
612
613 globalTransporterRegistry.unblockReceive(*recvdata, nodeId);
614 }
615 }
616 else
617 {
618 ndbout_c("TRPMAN : Ignoring dump %u for node %u",
619 arg, nodeId);
620 }
621 }
622 }
623 if (arg == 9990) /* Block recv from all ndbd matching pattern */
624 {
625 Uint32 pattern = 0;
626 if (signal->getLength() > 1)
627 {
628 pattern = signal->theData[1];
629 ndbout_c("TRPMAN : Blocking receive from all ndbds matching pattern -%s-",
630 ((pattern == 1)? "Other side":"Unknown"));
631 }
632
633 TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
634 assert(recvdata != 0);
635 for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
636 {
637 if (!handles_this_node(node))
638 continue;
639 if (globalTransporterRegistry.is_connected(node))
640 {
641 if (getNodeInfo(node).m_type == NodeInfo::DB)
642 {
643 if (!globalTransporterRegistry.isBlocked(node))
644 {
645 switch (pattern)
646 {
647 case 1:
648 {
649 /* Match if given node is on 'other side' of
650 * 2-replica cluster
651 */
652 if ((getOwnNodeId() & 1) != (node & 1))
653 {
654 /* Node is on the 'other side', match */
655 break;
656 }
657 /* Node is on 'my side', don't match */
658 continue;
659 }
660 default:
661 break;
662 }
663 g_eventLogger->info("TRPMAN : Blocking receive from node %u", node);
664 globalTransporterRegistry.blockReceive(*recvdata, node);
665 }
666 }
667 }
668 }
669 }
670 if (arg == 9991) /* Unblock recv from all blocked */
671 {
672 TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
673 assert(recvdata != 0);
674 for (Uint32 node = 1; node < MAX_NODES; node++)
675 {
676 if (!handles_this_node(node))
677 continue;
678 if (globalTransporterRegistry.isBlocked(node))
679 {
680 g_eventLogger->info("TRPMAN : Unblocking receive from node %u", node);
681 globalTransporterRegistry.unblockReceive(*recvdata, node);
682 }
683 }
684 }
685 #endif
686 }
687
TrpmanProxy(Block_context & ctx)688 TrpmanProxy::TrpmanProxy(Block_context & ctx) :
689 LocalProxy(TRPMAN, ctx)
690 {
691 addRecSignal(GSN_OPEN_COMORD, &TrpmanProxy::execOPEN_COMORD);
692 addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
693 addRecSignal(GSN_ENABLE_COMCONF, &TrpmanProxy::execENABLE_COMCONF);
694 addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
695 addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
696 addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
697 }
698
~TrpmanProxy()699 TrpmanProxy::~TrpmanProxy()
700 {
701 }
702
703 SimulatedBlock*
newWorker(Uint32 instanceNo)704 TrpmanProxy::newWorker(Uint32 instanceNo)
705 {
706 return new Trpman(m_ctx, instanceNo);
707 }
708
709 BLOCK_FUNCTIONS(TrpmanProxy);
710
711 // GSN_OPEN_COMORD
712
713 void
execOPEN_COMORD(Signal * signal)714 TrpmanProxy::execOPEN_COMORD(Signal* signal)
715 {
716 jamEntry();
717
718 for (Uint32 i = 0; i<c_workers; i++)
719 {
720 jam();
721 sendSignal(workerRef(i), GSN_OPEN_COMORD, signal,
722 signal->getLength(), JBB);
723 }
724 }
725
726 // GSN_CLOSE_COMREQ
727
728 void
execCLOSE_COMREQ(Signal * signal)729 TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
730 {
731 jamEntry();
732 Ss_CLOSE_COMREQ& ss = ssSeize<Ss_CLOSE_COMREQ>();
733 const CloseComReqConf* req = (const CloseComReqConf*)signal->getDataPtr();
734 ss.m_req = *req;
735 sendREQ(signal, ss);
736 }
737
738 void
sendCLOSE_COMREQ(Signal * signal,Uint32 ssId,SectionHandle *)739 TrpmanProxy::sendCLOSE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
740 {
741 jam();
742 Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
743 CloseComReqConf* req = (CloseComReqConf*)signal->getDataPtrSend();
744
745 *req = ss.m_req;
746 req->xxxBlockRef = reference();
747 req->failNo = ssId;
748 sendSignal(workerRef(ss.m_worker), GSN_CLOSE_COMREQ, signal,
749 CloseComReqConf::SignalLength, JBB);
750 }
751
752 void
execCLOSE_COMCONF(Signal * signal)753 TrpmanProxy::execCLOSE_COMCONF(Signal* signal)
754 {
755 const CloseComReqConf* conf = (const CloseComReqConf*)signal->getDataPtr();
756 Uint32 ssId = conf->failNo;
757 jamEntry();
758 Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
759 recvCONF(signal, ss);
760 }
761
762 void
sendCLOSE_COMCONF(Signal * signal,Uint32 ssId)763 TrpmanProxy::sendCLOSE_COMCONF(Signal *signal, Uint32 ssId)
764 {
765 jam();
766 Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
767
768 if (!lastReply(ss))
769 {
770 jam();
771 return;
772 }
773
774 CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
775 *conf = ss.m_req;
776 sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal,
777 CloseComReqConf::SignalLength, JBB);
778 ssRelease<Ss_CLOSE_COMREQ>(ssId);
779 }
780
781 // GSN_ENABLE_COMREQ
782
783 void
execENABLE_COMREQ(Signal * signal)784 TrpmanProxy::execENABLE_COMREQ(Signal* signal)
785 {
786 jamEntry();
787 Ss_ENABLE_COMREQ& ss = ssSeize<Ss_ENABLE_COMREQ>();
788 const EnableComReq* req = (const EnableComReq*)signal->getDataPtr();
789 ss.m_req = *req;
790 sendREQ(signal, ss);
791 }
792
793 void
sendENABLE_COMREQ(Signal * signal,Uint32 ssId,SectionHandle *)794 TrpmanProxy::sendENABLE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
795 {
796 jam();
797 Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
798 EnableComReq* req = (EnableComReq*)signal->getDataPtrSend();
799
800 *req = ss.m_req;
801 req->m_senderRef = reference();
802 req->m_senderData = ssId;
803 sendSignal(workerRef(ss.m_worker), GSN_ENABLE_COMREQ, signal,
804 EnableComReq::SignalLength, JBB);
805 }
806
807 void
execENABLE_COMCONF(Signal * signal)808 TrpmanProxy::execENABLE_COMCONF(Signal* signal)
809 {
810 const EnableComConf* conf = (const EnableComConf*)signal->getDataPtr();
811 Uint32 ssId = conf->m_senderData;
812 jamEntry();
813 Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
814 recvCONF(signal, ss);
815 }
816
817 void
sendENABLE_COMCONF(Signal * signal,Uint32 ssId)818 TrpmanProxy::sendENABLE_COMCONF(Signal *signal, Uint32 ssId)
819 {
820 jam();
821 Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
822
823 if (!lastReply(ss))
824 {
825 jam();
826 return;
827 }
828
829 EnableComReq* conf = (EnableComReq*)signal->getDataPtr();
830 *conf = ss.m_req;
831 sendSignal(conf->m_senderRef, GSN_ENABLE_COMCONF, signal,
832 EnableComReq::SignalLength, JBB);
833 ssRelease<Ss_ENABLE_COMREQ>(ssId);
834 }
835
836 // GSN_ROUTE_ORD
837
838 void
execROUTE_ORD(Signal * signal)839 TrpmanProxy::execROUTE_ORD(Signal* signal)
840 {
841 jamEntry();
842
843 RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
844 Uint32 nodeId = ord->from;
845 ndbassert(nodeId != 0);
846
847 Uint32 workerIndex = 0;
848
849 if (globalData.ndbMtReceiveThreads > (Uint32) 1)
850 {
851 workerIndex = get_recv_thread_idx(nodeId);
852 ndbrequire(workerIndex < globalData.ndbMtReceiveThreads);
853 }
854
855 SectionHandle handle(this, signal);
856 sendSignal(workerRef(workerIndex), GSN_ROUTE_ORD, signal,
857 signal->getLength(), JBB, &handle);
858 }
859