1 /*
2 Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License, version 2.0,
6 as published by the Free Software Foundation.
7
8 This program is also distributed with certain software (including
9 but not limited to OpenSSL) that is licensed under separate terms,
10 as designated in a particular file or component or in included license
11 documentation. The authors of MySQL hereby grant you an additional
12 permission to link the program and your derivative works with the
13 separately licensed software that they have included with MySQL.
14
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License, version 2.0, for more details.
19
20 You should have received a copy of the GNU General Public License
21 along with this program; if not, write to the Free Software
22 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23 */
24
25 #include <my_global.h>
26 #include "Suma.hpp"
27
28 #include <ndb_version.h>
29
30 #include <NdbTCP.h>
31 #include <Bitmask.hpp>
32 #include <SimpleProperties.hpp>
33
34 #include <signaldata/NodeFailRep.hpp>
35 #include <signaldata/ReadNodesConf.hpp>
36
37 #include <signaldata/ListTables.hpp>
38 #include <signaldata/GetTabInfo.hpp>
39 #include <signaldata/GetTableId.hpp>
40 #include <signaldata/DictTabInfo.hpp>
41 #include <signaldata/SumaImpl.hpp>
42 #include <signaldata/ScanFrag.hpp>
43 #include <signaldata/TransIdAI.hpp>
44 #include <signaldata/CreateTrigImpl.hpp>
45 #include <signaldata/DropTrigImpl.hpp>
46 #include <signaldata/FireTrigOrd.hpp>
47 #include <signaldata/TrigAttrInfo.hpp>
48 #include <signaldata/CheckNodeGroups.hpp>
49 #include <signaldata/CreateTab.hpp>
50 #include <signaldata/DropTab.hpp>
51 #include <signaldata/AlterTable.hpp>
52 #include <signaldata/AlterTab.hpp>
53 #include <signaldata/DihScanTab.hpp>
54 #include <signaldata/SystemError.hpp>
55 #include <signaldata/GCP.hpp>
56 #include <signaldata/StopMe.hpp>
57
58 #include <signaldata/DictLock.hpp>
59 #include <ndbapi/NdbDictionary.hpp>
60
61 #include <DebuggerNames.hpp>
62 #include "../dbtup/Dbtup.hpp"
63 #include "../dbdih/Dbdih.hpp"
64
65 #include <signaldata/CreateNodegroup.hpp>
66 #include <signaldata/CreateNodegroupImpl.hpp>
67
68 #include <signaldata/DropNodegroup.hpp>
69 #include <signaldata/DropNodegroupImpl.hpp>
70
71 #include <signaldata/DbinfoScan.hpp>
72 #include <signaldata/TransIdAI.hpp>
73
74 #include <EventLogger.hpp>
75 extern EventLogger * g_eventLogger;
76
77 //#define HANDOVER_DEBUG
78 //#define NODEFAIL_DEBUG
79 //#define NODEFAIL_DEBUG2
80 //#define DEBUG_SUMA_SEQUENCE
81 //#define EVENT_DEBUG
82 //#define EVENT_PH3_DEBUG
83 //#define EVENT_DEBUG2
/*
 * Local override of the DBUG_* tracing macros for this file.
 * The outer "#if 1" enables the override; the inner "#if 0" selects between
 * versions that print via ndbout/ndbout_c (disabled) and the currently
 * active versions, where ENTER/PRINT expand to nothing and the RETURN
 * macros expand to a plain return.
 */
#if 1
#undef DBUG_ENTER
#undef DBUG_PRINT
#undef DBUG_RETURN
#undef DBUG_VOID_RETURN

#if 0
#define DBUG_ENTER(a) {ndbout_c("%s:%d >%s", __FILE__, __LINE__, a);}
#define DBUG_PRINT(a,b) {ndbout << __FILE__ << ":" << __LINE__ << " " << a << ": "; ndbout_c b ;}
#define DBUG_RETURN(a) { ndbout_c("%s:%d <", __FILE__, __LINE__); return(a); }
#define DBUG_VOID_RETURN { ndbout_c("%s:%d <", __FILE__, __LINE__); return; }
#else
#define DBUG_ENTER(a)
#define DBUG_PRINT(a,b)
#define DBUG_RETURN(a) return a
#define DBUG_VOID_RETURN return
#endif

#endif

/* Set to non-zero to enable the extra node/bucket debug printouts
 * guarded by "if (DBG_3R)" further down in this file. */
#define DBG_3R 0
105
/**
 * @todo:
 * SUMA crashes if an index is created at the same time as
 * global replication is running. Easy to reproduce using testIndex.
 * Note: the crash does not happen on every run, but it is easy to
 * reproduce.
 */
112
/* File-scope global, initialised to RNIL; not referenced in the part of
 * the file shown here. */
Uint32 g_subPtrI = RNIL;
/* Sequence id passed to DBUTIL in Suma::createSequence(). */
static const Uint32 SUMA_SEQUENCE = 0xBABEBABE;

/* NOTE(review): presumably the number of GCPs handled concurrently;
 * not referenced in the part of the file shown here - confirm. */
static const Uint32 MAX_CONCURRENT_GCP = 2;
117
118 /**************************************************************
119 *
120 * Start of suma
121 *
122 */
123
124 #define PRINT_ONLY 0
125
126 void
execREAD_CONFIG_REQ(Signal * signal)127 Suma::execREAD_CONFIG_REQ(Signal* signal)
128 {
129 jamEntry();
130
131 const ReadConfigReq * req = (ReadConfigReq*)signal->getDataPtr();
132
133 Uint32 ref = req->senderRef;
134 Uint32 senderData = req->senderData;
135
136 const ndb_mgm_configuration_iterator * p =
137 m_ctx.m_config.getOwnConfigIterator();
138 ndbrequire(p != 0);
139
140 // SumaParticipant
141 Uint32 noTables, noAttrs, maxBufferedEpochs;
142 ndb_mgm_get_int_parameter(p, CFG_DICT_TABLE,
143 &noTables);
144 ndb_mgm_get_int_parameter(p, CFG_DICT_ATTRIBUTE,
145 &noAttrs);
146 ndb_mgm_get_int_parameter(p, CFG_DB_MAX_BUFFERED_EPOCHS,
147 &maxBufferedEpochs);
148
149 c_tablePool.setSize(noTables);
150 c_tables.setSize(noTables);
151
152 c_subscriptions.setSize(noTables);
153
154 Uint32 cnt = 0;
155 cnt = 0;
156 ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIPTIONS, &cnt);
157 if (cnt == 0)
158 {
159 jam();
160 cnt = noTables;
161 }
162 c_subscriptionPool.setSize(cnt);
163
164 cnt *= 2;
165 {
166 Uint32 val = 0;
167 ndb_mgm_get_int_parameter(p, CFG_DB_SUBSCRIBERS, &val);
168 if (val)
169 {
170 jam();
171 cnt = val;
172 }
173 }
174 c_subscriberPool.setSize(cnt);
175
176 cnt = 0;
177 ndb_mgm_get_int_parameter(p, CFG_DB_SUB_OPERATIONS, &cnt);
178 if (cnt)
179 c_subOpPool.setSize(cnt);
180 else
181 c_subOpPool.setSize(256);
182
183 c_syncPool.setSize(2);
184
185 // Trix: max 5 concurrent index stats ops with max 9 words bounds
186 Uint32 noOfBoundWords = 5 * 9;
187
188 // XXX multiplies number of words by 15 ???
189 c_dataBufferPool.setSize(noAttrs + noOfBoundWords);
190
191 c_maxBufferedEpochs = maxBufferedEpochs;
192
193 // Calculate needed gcp pool as 10 records + the ones needed
194 // during a possible api timeout
195 Uint32 dbApiHbInterval, gcpInterval, microGcpInterval = 0;
196 ndb_mgm_get_int_parameter(p, CFG_DB_API_HEARTBEAT_INTERVAL,
197 &dbApiHbInterval);
198 ndb_mgm_get_int_parameter(p, CFG_DB_GCP_INTERVAL,
199 &gcpInterval);
200 ndb_mgm_get_int_parameter(p, CFG_DB_MICRO_GCP_INTERVAL,
201 µGcpInterval);
202
203 if (microGcpInterval)
204 {
205 gcpInterval = microGcpInterval;
206 }
207 c_gcp_pool.setSize(10 + (4*dbApiHbInterval+gcpInterval-1)/gcpInterval);
208
209 c_page_chunk_pool.setSize(50);
210
211 {
212 SLList<SyncRecord> tmp(c_syncPool);
213 Ptr<SyncRecord> ptr;
214 while(tmp.seize(ptr))
215 new (ptr.p) SyncRecord(* this, c_dataBufferPool);
216 tmp.release();
217 }
218
219 // Suma
220 c_masterNodeId = getOwnNodeId();
221
222 c_nodeGroup = c_noNodesInGroup = 0;
223 for (int i = 0; i < MAX_REPLICAS; i++) {
224 c_nodesInGroup[i] = 0;
225 }
226
227 m_first_free_page= RNIL;
228
229 c_no_of_buckets = 0;
230 memset(c_buckets, 0, sizeof(c_buckets));
231 for(Uint32 i = 0; i<NO_OF_BUCKETS; i++)
232 {
233 Bucket* bucket= c_buckets+i;
234 bucket->m_buffer_tail = RNIL;
235 bucket->m_buffer_head.m_page_id = RNIL;
236 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS;
237 }
238
239 m_max_seen_gci = 0; // FIRE_TRIG_ORD
240 m_max_sent_gci = 0; // FIRE_TRIG_ORD -> send
241 m_last_complete_gci = 0; // SUB_GCP_COMPLETE_REP
242 m_gcp_complete_rep_count = 0;
243 m_out_of_buffer_gci = 0;
244 m_missing_data = false;
245
246 c_startup.m_wait_handover= false;
247 c_failedApiNodes.clear();
248
249 ReadConfigConf * conf = (ReadConfigConf*)signal->getDataPtrSend();
250 conf->senderRef = reference();
251 conf->senderData = senderData;
252 sendSignal(ref, GSN_READ_CONFIG_CONF, signal,
253 ReadConfigConf::SignalLength, JBB);
254 }
255
256 void
execSTTOR(Signal * signal)257 Suma::execSTTOR(Signal* signal) {
258 jamEntry();
259
260 DBUG_ENTER("Suma::execSTTOR");
261 m_startphase = signal->theData[1];
262 m_typeOfStart = signal->theData[7];
263
264 DBUG_PRINT("info",("startphase = %u, typeOfStart = %u",
265 m_startphase, m_typeOfStart));
266
267 if(m_startphase == 3)
268 {
269 jam();
270 void* ptr = m_ctx.m_mm.get_memroot();
271 c_page_pool.set((Buffer_page*)ptr, (Uint32)~0);
272 }
273
274 if(m_startphase == 5)
275 {
276 jam();
277
278 if (ERROR_INSERTED(13029)) /* Hold startphase 5 */
279 {
280 sendSignalWithDelay(SUMA_REF, GSN_STTOR, signal,
281 30, signal->getLength());
282 DBUG_VOID_RETURN;
283 }
284
285 signal->theData[0] = reference();
286 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
287 DBUG_VOID_RETURN;
288 }
289
290 if(m_startphase == 7)
291 {
292 if (m_typeOfStart != NodeState::ST_NODE_RESTART &&
293 m_typeOfStart != NodeState::ST_INITIAL_NODE_RESTART)
294 {
295 for( Uint32 i = 0; i < c_no_of_buckets; i++)
296 {
297 if (get_responsible_node(i) == getOwnNodeId())
298 {
299 // I'm running this bucket
300 DBUG_PRINT("info",("bucket %u set to true", i));
301 m_active_buckets.set(i);
302 ndbout_c("m_active_buckets.set(%d)", i);
303 }
304 }
305 }
306
307 if(!m_active_buckets.isclear())
308 {
309 NdbNodeBitmask tmp;
310 Uint32 bucket = 0;
311 while ((bucket = m_active_buckets.find(bucket)) != Bucket_mask::NotFound)
312 {
313 tmp.set(get_responsible_node(bucket, c_nodes_in_nodegroup_mask));
314 bucket++;
315 }
316
317 ndbassert(tmp.get(getOwnNodeId()));
318 m_gcp_complete_rep_count = m_active_buckets.count();
319 }
320 else
321 m_gcp_complete_rep_count = 0; // I contribute 1 gcp complete rep
322
323 if(m_typeOfStart == NodeState::ST_INITIAL_START &&
324 c_masterNodeId == getOwnNodeId())
325 {
326 jam();
327 createSequence(signal);
328 DBUG_VOID_RETURN;
329 }//if
330
331 if (ERROR_INSERTED(13030))
332 {
333 ndbout_c("Dont start handover");
334 DBUG_VOID_RETURN;
335 }
336 }//if
337
338 if(m_startphase == 100)
339 {
340 /**
341 * Allow API's to connect
342 */
343 sendSTTORRY(signal);
344 DBUG_VOID_RETURN;
345 }
346
347 if(m_startphase == 101)
348 {
349 if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
350 m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
351 {
352 /**
353 * Handover code here
354 */
355 c_startup.m_wait_handover= true;
356 check_start_handover(signal);
357 DBUG_VOID_RETURN;
358 }
359 }
360 sendSTTORRY(signal);
361
362 DBUG_VOID_RETURN;
363 }
364
365 #include <ndb_version.h>
366
367 void
send_dict_lock_req(Signal * signal,Uint32 state)368 Suma::send_dict_lock_req(Signal* signal, Uint32 state)
369 {
370 if (state == DictLockReq::SumaStartMe &&
371 !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
372 {
373 jam();
374 goto notsupported;
375 }
376 else if (state == DictLockReq::SumaHandOver &&
377 !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
378 {
379 jam();
380 goto notsupported;
381 }
382
383 {
384 jam();
385 DictLockReq* req = (DictLockReq*)signal->getDataPtrSend();
386 req->lockType = state;
387 req->userPtr = state;
388 req->userRef = reference();
389 sendSignal(calcDictBlockRef(c_masterNodeId),
390 GSN_DICT_LOCK_REQ, signal, DictLockReq::SignalLength, JBB);
391 }
392 return;
393
394 notsupported:
395 DictLockConf* conf = (DictLockConf*)signal->getDataPtrSend();
396 conf->userPtr = state;
397 execDICT_LOCK_CONF(signal);
398 }
399
400 void
execDICT_LOCK_CONF(Signal * signal)401 Suma::execDICT_LOCK_CONF(Signal* signal)
402 {
403 jamEntry();
404
405 DictLockConf* conf = (DictLockConf*)signal->getDataPtr();
406 Uint32 state = conf->userPtr;
407
408 switch(state){
409 case DictLockReq::SumaStartMe:
410 jam();
411 c_startup.m_restart_server_node_id = 0;
412 CRASH_INSERTION(13039);
413 send_start_me_req(signal);
414 return;
415 case DictLockReq::SumaHandOver:
416 jam();
417 send_handover_req(signal, SumaHandoverReq::RT_START_NODE);
418 return;
419 default:
420 jam();
421 jamLine(state);
422 ndbrequire(false);
423 }
424 }
425
426 void
execDICT_LOCK_REF(Signal * signal)427 Suma::execDICT_LOCK_REF(Signal* signal)
428 {
429 jamEntry();
430
431 DictLockRef* ref = (DictLockRef*)signal->getDataPtr();
432 Uint32 state = ref->userPtr;
433
434 ndbrequire(ref->errorCode == DictLockRef::TooManyRequests);
435 signal->theData[0] = SumaContinueB::RETRY_DICT_LOCK;
436 signal->theData[1] = state;
437 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 300, 2);
438 }
439
440 void
send_dict_unlock_ord(Signal * signal,Uint32 state)441 Suma::send_dict_unlock_ord(Signal* signal, Uint32 state)
442 {
443 if (state == DictLockReq::SumaStartMe &&
444 !ndbd_suma_dictlock_startme(getNodeInfo(c_masterNodeId).m_version))
445 {
446 jam();
447 return;
448 }
449 else if (state == DictLockReq::SumaHandOver &&
450 !ndbd_suma_dictlock_handover(getNodeInfo(c_masterNodeId).m_version))
451 {
452 jam();
453 return;
454 }
455
456 jam();
457 DictUnlockOrd* ord = (DictUnlockOrd*)signal->getDataPtrSend();
458 ord->lockPtr = 0;
459 ord->lockType = state;
460 ord->senderData = state;
461 ord->senderRef = reference();
462 sendSignal(calcDictBlockRef(c_masterNodeId),
463 GSN_DICT_UNLOCK_ORD, signal, DictUnlockOrd::SignalLength, JBB);
464 }
465
466 void
send_start_me_req(Signal * signal)467 Suma::send_start_me_req(Signal* signal)
468 {
469 Uint32 nodeId= c_startup.m_restart_server_node_id;
470 do {
471 nodeId = c_alive_nodes.find(nodeId + 1);
472
473 if(nodeId == getOwnNodeId())
474 continue;
475 if(nodeId == NdbNodeBitmask::NotFound)
476 {
477 nodeId = 0;
478 continue;
479 }
480 break;
481 } while(true);
482
483
484 infoEvent("Suma: asking node %d to recreate subscriptions on me", nodeId);
485 c_startup.m_restart_server_node_id= nodeId;
486 sendSignal(calcSumaBlockRef(nodeId),
487 GSN_SUMA_START_ME_REQ, signal, 1, JBB);
488 }
489
490 void
execSUMA_START_ME_REF(Signal * signal)491 Suma::execSUMA_START_ME_REF(Signal* signal)
492 {
493 const SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtr();
494
495 Uint32 error = ref->errorCode;
496 if (error != SumaStartMeRef::Busy && error != SumaStartMeRef::NotStarted)
497 {
498 jam();
499 // for some reason we did not manage to create a subscription
500 // on the starting node
501 SystemError * const sysErr = (SystemError*)&signal->theData[0];
502 sysErr->errorCode = SystemError::CopySubscriptionRef;
503 sysErr->errorRef = reference();
504 sysErr->data[0] = error;
505 sysErr->data[1] = 0;
506 sendSignal(NDBCNTR_REF, GSN_SYSTEM_ERROR, signal,
507 SystemError::SignalLength, JBB);
508 return;
509 }
510
511 infoEvent("Suma: node %d refused %d",
512 c_startup.m_restart_server_node_id, ref->errorCode);
513
514 send_start_me_req(signal);
515 }
516
517 void
execSUMA_START_ME_CONF(Signal * signal)518 Suma::execSUMA_START_ME_CONF(Signal* signal)
519 {
520 infoEvent("Suma: node %d has completed restoring me",
521 c_startup.m_restart_server_node_id);
522 sendSTTORRY(signal);
523 send_dict_unlock_ord(signal, DictLockReq::SumaStartMe);
524 c_startup.m_restart_server_node_id= 0;
525 }
526
527 void
createSequence(Signal * signal)528 Suma::createSequence(Signal* signal)
529 {
530 jam();
531 DBUG_ENTER("Suma::createSequence");
532
533 UtilSequenceReq * req = (UtilSequenceReq*)signal->getDataPtrSend();
534
535 req->senderData = RNIL;
536 req->sequenceId = SUMA_SEQUENCE;
537 req->requestType = UtilSequenceReq::Create;
538 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
539 signal, UtilSequenceReq::SignalLength, JBB);
540 // execUTIL_SEQUENCE_CONF will call createSequenceReply()
541 DBUG_VOID_RETURN;
542 }
543
544 void
createSequenceReply(Signal * signal,UtilSequenceConf * conf,UtilSequenceRef * ref)545 Suma::createSequenceReply(Signal* signal,
546 UtilSequenceConf * conf,
547 UtilSequenceRef * ref)
548 {
549 jam();
550
551 if (ref != NULL)
552 {
553 switch ((UtilSequenceRef::ErrorCode)ref->errorCode)
554 {
555 case UtilSequenceRef::NoSuchSequence:
556 ndbrequire(false);
557 case UtilSequenceRef::TCError:
558 {
559 char buf[128];
560 BaseString::snprintf(buf, sizeof(buf),
561 "Startup failed during sequence creation. TC error %d",
562 ref->TCErrorCode);
563 progError(__LINE__, NDBD_EXIT_RESOURCE_ALLOC_ERROR, buf);
564 }
565 }
566 ndbrequire(false);
567 }
568
569 sendSTTORRY(signal);
570 }
571
572 void
execREAD_NODESCONF(Signal * signal)573 Suma::execREAD_NODESCONF(Signal* signal){
574 jamEntry();
575 ReadNodesConf * const conf = (ReadNodesConf *)signal->getDataPtr();
576
577 if(getNodeState().getNodeRestartInProgress())
578 {
579 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startedNodes);
580 c_alive_nodes.set(getOwnNodeId());
581 }
582 else
583 {
584 c_alive_nodes.assign(NdbNodeBitmask::Size, conf->startingNodes);
585 NdbNodeBitmask tmp;
586 tmp.assign(NdbNodeBitmask::Size, conf->startedNodes);
587 ndbrequire(tmp.isclear()); // No nodes can be started during SR
588 }
589
590 if (DBG_3R)
591 {
592 for (Uint32 i = 0; i<MAX_NDB_NODES; i++)
593 {
594 if (c_alive_nodes.get(i))
595 ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, i);
596 }
597 }
598
599 c_masterNodeId = conf->masterNodeId;
600
601 getNodeGroupMembers(signal);
602 }
603
604 void
getNodeGroupMembers(Signal * signal)605 Suma::getNodeGroupMembers(Signal* signal)
606 {
607 jam();
608 DBUG_ENTER("Suma::getNodeGroupMembers");
609 /**
610 * Ask DIH for nodeGroupMembers
611 */
612 CheckNodeGroups * sd = (CheckNodeGroups*)signal->getDataPtrSend();
613 sd->blockRef = reference();
614 sd->requestType = CheckNodeGroups::GetNodeGroupMembers;
615 sd->nodeId = getOwnNodeId();
616 sd->senderData = RNIL;
617 sendSignal(DBDIH_REF, GSN_CHECKNODEGROUPSREQ, signal,
618 CheckNodeGroups::SignalLength, JBB);
619 DBUG_VOID_RETURN;
620 }
621
622 static
623 bool
valid_seq(Uint32 n,Uint32 r,Uint16 dst[])624 valid_seq(Uint32 n, Uint32 r, Uint16 dst[])
625 {
626 Uint16 tmp[MAX_REPLICAS];
627 for (Uint32 i = 0; i<r; i++)
628 {
629 tmp[i] = n % r;
630 for (Uint32 j = 0; j<i; j++)
631 if (tmp[j] == tmp[i])
632 return false;
633 n /= r;
634 }
635
636 /**
637 * reverse order for backward compatibility (with 2 replica)
638 */
639 for (Uint32 i = 0; i<r; i++)
640 dst[i] = tmp[r-i-1];
641
642 return true;
643 }
644
645 void
fix_nodegroup()646 Suma::fix_nodegroup()
647 {
648 Uint32 i, pos= 0;
649
650 for (i = 0; i < MAX_NDB_NODES; i++)
651 {
652 if (c_nodes_in_nodegroup_mask.get(i))
653 {
654 c_nodesInGroup[pos++] = i;
655 }
656 }
657
658 const Uint32 replicas= c_noNodesInGroup = pos;
659
660 if (replicas)
661 {
662 Uint32 buckets= 1;
663 for(i = 1; i <= replicas; i++)
664 buckets *= i;
665
666 Uint32 tot = 0;
667 switch(replicas){
668 case 1:
669 tot = 1;
670 break;
671 case 2:
672 tot = 4; // 2^2
673 break;
674 case 3:
675 tot = 27; // 3^3
676 break;
677 case 4:
678 tot = 256; // 4^4
679 break;
680 ndbrequire(false);
681 }
682 Uint32 cnt = 0;
683 for (i = 0; i<tot; i++)
684 {
685 Bucket* ptr= c_buckets + cnt;
686 if (valid_seq(i, replicas, ptr->m_nodes))
687 {
688 jam();
689 if (DBG_3R) printf("bucket %u : ", cnt);
690 for (Uint32 j = 0; j<replicas; j++)
691 {
692 ptr->m_nodes[j] = c_nodesInGroup[ptr->m_nodes[j]];
693 if (DBG_3R) printf("%u ", ptr->m_nodes[j]);
694 }
695 if (DBG_3R) printf("\n");
696 cnt++;
697 }
698 }
699 ndbrequire(cnt == buckets);
700 c_no_of_buckets= buckets;
701 }
702 else
703 {
704 jam();
705 c_no_of_buckets = 0;
706 }
707 }
708
709
710 void
execCHECKNODEGROUPSCONF(Signal * signal)711 Suma::execCHECKNODEGROUPSCONF(Signal *signal)
712 {
713 const CheckNodeGroups *sd = (const CheckNodeGroups *)signal->getDataPtrSend();
714 DBUG_ENTER("Suma::execCHECKNODEGROUPSCONF");
715 jamEntry();
716
717 c_nodeGroup = sd->output;
718 c_nodes_in_nodegroup_mask.assign(sd->mask);
719 c_noNodesInGroup = c_nodes_in_nodegroup_mask.count();
720
721 fix_nodegroup();
722
723 #ifndef DBUG_OFF
724 for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
725 DBUG_PRINT("exit",("Suma: NodeGroup %u, me %u, "
726 "member[%u] %u",
727 c_nodeGroup, getOwnNodeId(),
728 i, c_nodesInGroup[i]));
729 }
730 #endif
731
732 c_startup.m_restart_server_node_id = 0;
733 if (m_typeOfStart == NodeState::ST_NODE_RESTART ||
734 m_typeOfStart == NodeState::ST_INITIAL_NODE_RESTART)
735 {
736 jam();
737
738 send_dict_lock_req(signal, DictLockReq::SumaStartMe);
739
740 return;
741 }
742
743 c_startup.m_restart_server_node_id = 0;
744 sendSTTORRY(signal);
745
746 DBUG_VOID_RETURN;
747 }
748
749 void
execAPI_START_REP(Signal * signal)750 Suma::execAPI_START_REP(Signal* signal)
751 {
752 Uint32 nodeId = signal->theData[0];
753 c_connected_nodes.set(nodeId);
754
755 check_start_handover(signal);
756 }
757
758 void
check_start_handover(Signal * signal)759 Suma::check_start_handover(Signal* signal)
760 {
761 if(c_startup.m_wait_handover)
762 {
763 NodeBitmask tmp;
764 tmp.assign(c_connected_nodes);
765 tmp.bitAND(c_subscriber_nodes);
766 if(!c_subscriber_nodes.equal(tmp))
767 {
768 return;
769 }
770
771 c_startup.m_wait_handover= false;
772
773 if (c_no_of_buckets)
774 {
775 jam();
776 send_dict_lock_req(signal, DictLockReq::SumaHandOver);
777 }
778 else
779 {
780 jam();
781 sendSTTORRY(signal);
782 }
783 }
784 }
785
786 void
send_handover_req(Signal * signal,Uint32 type)787 Suma::send_handover_req(Signal* signal, Uint32 type)
788 {
789 jam();
790 c_startup.m_handover_nodes.assign(c_alive_nodes);
791 c_startup.m_handover_nodes.bitAND(c_nodes_in_nodegroup_mask);
792 c_startup.m_handover_nodes.clear(getOwnNodeId());
793 Uint32 gci= Uint32(m_last_complete_gci >> 32) + 3;
794
795 SumaHandoverReq* req= (SumaHandoverReq*)signal->getDataPtrSend();
796 char buf[255];
797 c_startup.m_handover_nodes.getText(buf);
798 infoEvent("Suma: initiate handover for %s with nodes %s GCI: %u",
799 (type == SumaHandoverReq::RT_START_NODE ? "startup" : "shutdown"),
800 buf,
801 gci);
802
803 req->gci = gci;
804 req->nodeId = getOwnNodeId();
805 req->requestType = type;
806
807 NodeReceiverGroup rg(SUMA, c_startup.m_handover_nodes);
808 sendSignal(rg, GSN_SUMA_HANDOVER_REQ, signal,
809 SumaHandoverReq::SignalLength, JBB);
810 }
811
812 void
sendSTTORRY(Signal * signal)813 Suma::sendSTTORRY(Signal* signal){
814 signal->theData[0] = 0;
815 signal->theData[3] = 1;
816 signal->theData[4] = 3;
817 signal->theData[5] = 5;
818 signal->theData[6] = 7;
819 signal->theData[7] = 100;
820 signal->theData[8] = 101;
821 signal->theData[9] = 255; // No more start phases from missra
822 sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 10, JBB);
823 }
824
825 void
execNDB_STTOR(Signal * signal)826 Suma::execNDB_STTOR(Signal* signal)
827 {
828 jamEntry();
829 }
830
831 void
execCONTINUEB(Signal * signal)832 Suma::execCONTINUEB(Signal* signal){
833 jamEntry();
834 Uint32 type= signal->theData[0];
835 switch(type){
836 case SumaContinueB::RELEASE_GCI:
837 {
838 Uint32 gci_hi = signal->theData[2];
839 Uint32 gci_lo = signal->theData[3];
840 Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
841 release_gci(signal, signal->theData[1], gci);
842 return;
843 }
844 case SumaContinueB::RESEND_BUCKET:
845 {
846 Uint32 min_gci_hi = signal->theData[2];
847 Uint32 min_gci_lo = signal->theData[5];
848 Uint32 last_gci_hi = signal->theData[4];
849 Uint32 last_gci_lo = signal->theData[6];
850 Uint64 min_gci = min_gci_lo | (Uint64(min_gci_hi) << 32);
851 Uint64 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
852 resend_bucket(signal,
853 signal->theData[1],
854 min_gci,
855 signal->theData[3],
856 last_gci);
857 return;
858 }
859 case SumaContinueB::OUT_OF_BUFFER_RELEASE:
860 out_of_buffer_release(signal, signal->theData[1]);
861 return;
862 case SumaContinueB::API_FAIL_GCI_LIST:
863 api_fail_gci_list(signal, signal->theData[1]);
864 return;
865 case SumaContinueB::API_FAIL_SUBSCRIBER_LIST:
866 api_fail_subscriber_list(signal,
867 signal->theData[1]);
868 return;
869 case SumaContinueB::API_FAIL_SUBSCRIPTION:
870 api_fail_subscription(signal);
871 return;
872 case SumaContinueB::SUB_STOP_REQ:
873 sub_stop_req(signal);
874 return;
875 case SumaContinueB::RETRY_DICT_LOCK:
876 jam();
877 send_dict_lock_req(signal, signal->theData[1]);
878 return;
879 }
880 }
881
882 /*****************************************************************************
883 *
884 * Node state handling
885 *
886 *****************************************************************************/
887
execAPI_FAILREQ(Signal * signal)888 void Suma::execAPI_FAILREQ(Signal* signal)
889 {
890 jamEntry();
891 DBUG_ENTER("Suma::execAPI_FAILREQ");
892 Uint32 failedApiNode = signal->theData[0];
893 ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
894
895 c_connected_nodes.clear(failedApiNode);
896
897 if (c_failedApiNodes.get(failedApiNode))
898 {
899 jam();
900 /* Being handled already, just conf */
901 goto CONF;
902 }
903
904 if (!c_subscriber_nodes.get(failedApiNode))
905 {
906 jam();
907 /* No Subscribers on that node, no SUMA
908 * specific work to do
909 */
910 goto BLOCK_CLEANUP;
911 }
912
913 c_failedApiNodes.set(failedApiNode);
914 c_subscriber_nodes.clear(failedApiNode);
915 c_subscriber_per_node[failedApiNode] = 0;
916 c_failedApiNodesState[failedApiNode] = __LINE__;
917
918 check_start_handover(signal);
919
920 signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
921 signal->theData[1] = failedApiNode;
922 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
923 return;
924
925 BLOCK_CLEANUP:
926 jam();
927 api_fail_block_cleanup(signal, failedApiNode);
928 DBUG_VOID_RETURN;
929
930 CONF:
931 jam();
932 signal->theData[0] = failedApiNode;
933 signal->theData[1] = reference();
934 sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
935
936 c_failedApiNodesState[failedApiNode] = 0;
937
938 DBUG_VOID_RETURN;
939 }//execAPI_FAILREQ()
940
941 void
api_fail_block_cleanup_callback(Signal * signal,Uint32 failedNodeId,Uint32 elementsCleaned)942 Suma::api_fail_block_cleanup_callback(Signal* signal,
943 Uint32 failedNodeId,
944 Uint32 elementsCleaned)
945 {
946 jamEntry();
947
948 /* Suma should not have any block level elements
949 * to be cleaned (Fragmented send/receive structures etc.)
950 * As it only uses Fragmented send/receive locally
951 */
952 ndbassert(elementsCleaned == 0);
953
954 /* Node failure handling is complete */
955 signal->theData[0] = failedNodeId;
956 signal->theData[1] = reference();
957 sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
958 c_failedApiNodes.clear(failedNodeId);
959 c_failedApiNodesState[failedNodeId] = 0;
960 }
961
962 void
api_fail_block_cleanup(Signal * signal,Uint32 failedNode)963 Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
964 {
965 jam();
966
967 c_failedApiNodesState[failedNode] = __LINE__;
968
969 Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
970 failedNode};
971
972 simBlockNodeFailure(signal, failedNode, cb);
973 }
974
975 void
api_fail_gci_list(Signal * signal,Uint32 nodeId)976 Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
977 {
978 jam();
979
980 Ptr<Gcp_record> gcp;
981 if (c_gcp_list.first(gcp))
982 {
983 jam();
984 gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
985
986 if (gcp.p->m_subscribers.isclear())
987 {
988 jam();
989
990 SubGcpCompleteAck* ack = (SubGcpCompleteAck*)signal->getDataPtrSend();
991 ack->rep.gci_hi = Uint32(gcp.p->m_gci >> 32);
992 ack->rep.gci_lo = Uint32(gcp.p->m_gci);
993 ack->rep.senderRef = reference();
994 NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
995 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
996 SubGcpCompleteAck::SignalLength, JBB);
997
998 c_gcp_list.release(gcp);
999
1000 c_failedApiNodesState[nodeId] = __LINE__;
1001 signal->theData[0] = SumaContinueB::API_FAIL_GCI_LIST;
1002 signal->theData[1] = nodeId;
1003 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
1004 return;
1005 }
1006 }
1007
1008 if (ERROR_INSERTED(13023))
1009 {
1010 CLEAR_ERROR_INSERT_VALUE;
1011 }
1012
1013 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1014 signal->theData[1] = nodeId;
1015 signal->theData[2] = RNIL; // SubOpPtr
1016 signal->theData[3] = RNIL; // c_subscribers bucket
1017 signal->theData[4] = RNIL; // subscriptionId
1018 signal->theData[5] = RNIL; // SubscriptionKey
1019
1020 Ptr<SubOpRecord> subOpPtr;
1021 if (c_subOpPool.seize(subOpPtr))
1022 {
1023 c_failedApiNodesState[nodeId] = __LINE__;
1024 signal->theData[2] = subOpPtr.i;
1025 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1026 }
1027 else
1028 {
1029 c_failedApiNodesState[nodeId] = __LINE__;
1030 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1031 }
1032
1033 return;
1034 }
1035
1036 void
api_fail_subscriber_list(Signal * signal,Uint32 nodeId)1037 Suma::api_fail_subscriber_list(Signal* signal, Uint32 nodeId)
1038 {
1039 jam();
1040 Ptr<SubOpRecord> subOpPtr;
1041
1042 if (c_outstanding_drop_trig_req > 9)
1043 {
1044 jam();
1045 /**
1046 * Make sure not to overflow DbtupProxy with too many GSN_DROP_TRIG_IMPL_REQ
1047 * 9 is arbitrary number...
1048 */
1049 sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
1050 signal->getLength());
1051 return;
1052 }
1053
1054 subOpPtr.i = signal->theData[2];
1055 if (subOpPtr.i == RNIL)
1056 {
1057 if (c_subOpPool.seize(subOpPtr))
1058 {
1059 signal->theData[3] = RNIL;
1060 }
1061 else
1062 {
1063 jam();
1064 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1065 c_failedApiNodesState[nodeId] = __LINE__;
1066 return;
1067 }
1068 }
1069 else
1070 {
1071 jam();
1072 c_subOpPool.getPtr(subOpPtr);
1073 }
1074
1075 Uint32 bucket = signal->theData[3];
1076 Uint32 subscriptionId = signal->theData[4];
1077 Uint32 subscriptionKey = signal->theData[5];
1078
1079 DLHashTable<Subscription>::Iterator iter;
1080 if (bucket == RNIL)
1081 {
1082 jam();
1083 c_subscriptions.first(iter);
1084 c_failedApiNodesState[nodeId] = __LINE__;
1085 }
1086 else
1087 {
1088 jam();
1089
1090 Subscription key;
1091 key.m_subscriptionId = subscriptionId;
1092 key.m_subscriptionKey = subscriptionKey;
1093 if (c_subscriptions.find(iter.curr, key) == false)
1094 {
1095 jam();
1096 /**
1097 * We restart from this bucket :-(
1098 */
1099 c_subscriptions.next(bucket, iter);
1100 c_failedApiNodesState[nodeId] = __LINE__;
1101 }
1102 else
1103 {
1104 iter.bucket = bucket;
1105 }
1106 }
1107
1108 if (iter.curr.isNull())
1109 {
1110 jam();
1111 api_fail_block_cleanup(signal, nodeId);
1112 c_failedApiNodesState[nodeId] = __LINE__;
1113 return;
1114 }
1115
1116 subOpPtr.p->m_opType = SubOpRecord::R_API_FAIL_REQ;
1117 subOpPtr.p->m_subPtrI = iter.curr.i;
1118 subOpPtr.p->m_senderRef = nodeId;
1119 subOpPtr.p->m_senderData = iter.bucket;
1120
1121 LocalDLFifoList<SubOpRecord> list(c_subOpPool, iter.curr.p->m_stop_req);
1122 bool empty = list.isEmpty();
1123 list.add(subOpPtr);
1124
1125 if (empty)
1126 {
1127 jam();
1128 c_failedApiNodesState[nodeId] = __LINE__;
1129 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1130 signal->theData[1] = subOpPtr.i;
1131 signal->theData[2] = RNIL;
1132 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1133 }
1134 else
1135 {
1136 jam();
1137 c_failedApiNodesState[nodeId] = __LINE__;
1138 }
1139 }
1140
1141 void
api_fail_subscription(Signal * signal)1142 Suma::api_fail_subscription(Signal* signal)
1143 {
1144 jam();
1145 Ptr<SubOpRecord> subOpPtr;
1146 c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
1147
1148 Uint32 nodeId = subOpPtr.p->m_senderRef;
1149
1150 Ptr<Subscription> subPtr;
1151 c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
1152
1153 Ptr<Subscriber> ptr;
1154 {
1155 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
1156 if (signal->theData[2] == RNIL)
1157 {
1158 jam();
1159 list.first(ptr);
1160 }
1161 else
1162 {
1163 jam();
1164 list.getPtr(ptr, signal->theData[2]);
1165 }
1166
1167 for (Uint32 i = 0; i<32 && !ptr.isNull(); i++)
1168 {
1169 jam();
1170 if (refToNode(ptr.p->m_senderRef) == nodeId)
1171 {
1172 jam();
1173
1174 Ptr<Subscriber> tmp = ptr;
1175 list.next(ptr);
1176 list.remove(tmp);
1177
1178 /**
1179 * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
1180 */
1181 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
1182
1183 send_sub_start_stop_event(signal, tmp, NdbDictionary::Event::_TE_STOP,
1184 report, list);
1185
1186 c_subscriberPool.release(tmp);
1187 }
1188 else
1189 {
1190 jam();
1191 list.next(ptr);
1192 }
1193 }
1194 }
1195
1196 if (!ptr.isNull())
1197 {
1198 jam();
1199 c_failedApiNodesState[nodeId] = __LINE__;
1200 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
1201 signal->theData[1] = subOpPtr.i;
1202 signal->theData[2] = ptr.i;
1203 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
1204 return;
1205 }
1206
1207 // Start potential waiter(s)
1208 check_remove_queue(signal, subPtr, subOpPtr, true, false);
1209 check_release_subscription(signal, subPtr);
1210
1211 // Continue iterating through subscriptions
1212 DLHashTable<Subscription>::Iterator iter;
1213 iter.bucket = subOpPtr.p->m_senderData;
1214 iter.curr = subPtr;
1215
1216 if (c_subscriptions.next(iter))
1217 {
1218 jam();
1219 c_failedApiNodesState[nodeId] = __LINE__;
1220 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIBER_LIST;
1221 signal->theData[1] = nodeId;
1222 signal->theData[2] = subOpPtr.i;
1223 signal->theData[3] = iter.bucket;
1224 signal->theData[4] = iter.curr.p->m_subscriptionId; // subscriptionId
1225 signal->theData[5] = iter.curr.p->m_subscriptionKey; // SubscriptionKey
1226 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 6, JBB);
1227 return;
1228 }
1229
1230 c_subOpPool.release(subOpPtr);
1231
1232 /* Now do block level cleanup */
1233 api_fail_block_cleanup(signal, nodeId);
1234 }
1235
1236 void
execNODE_FAILREP(Signal * signal)1237 Suma::execNODE_FAILREP(Signal* signal){
1238 jamEntry();
1239 DBUG_ENTER("Suma::execNODE_FAILREP");
1240 ndbassert(signal->getNoOfSections() == 0);
1241
1242 const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
1243 NdbNodeBitmask failed; failed.assign(NdbNodeBitmask::Size, rep->theNodes);
1244
1245 if(c_restart.m_ref && failed.get(refToNode(c_restart.m_ref)))
1246 {
1247 jam();
1248
1249 if (c_restart.m_waiting_on_self)
1250 {
1251 jam();
1252 c_restart.m_abort = 1;
1253 }
1254 else
1255 {
1256 jam();
1257 Ptr<Subscription> subPtr;
1258 c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
1259 abort_start_me(signal, subPtr, false);
1260 }
1261 }
1262
1263 if (ERROR_INSERTED(13032))
1264 {
1265 Uint32 node = c_subscriber_nodes.find(0);
1266 if (node != NodeBitmask::NotFound)
1267 {
1268 ndbout_c("Inserting API_FAILREQ node: %u", node);
1269 signal->theData[0] = node;
1270 sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
1271 }
1272 }
1273
1274 NdbNodeBitmask tmp;
1275 tmp.assign(c_alive_nodes);
1276 tmp.bitANDC(failed);
1277
1278 NdbNodeBitmask takeover_nodes;
1279
1280 if(c_nodes_in_nodegroup_mask.overlaps(failed))
1281 {
1282 for( Uint32 i = 0; i < c_no_of_buckets; i++)
1283 {
1284 if(m_active_buckets.get(i))
1285 continue;
1286 else if(m_switchover_buckets.get(i))
1287 {
1288 Uint32 state= c_buckets[i].m_state;
1289 if((state & Bucket::BUCKET_HANDOVER) &&
1290 failed.get(get_responsible_node(i)))
1291 {
1292 m_active_buckets.set(i);
1293 m_switchover_buckets.clear(i);
1294 ndbout_c("aborting handover");
1295 }
1296 else if(state & Bucket::BUCKET_STARTING)
1297 {
1298 progError(__LINE__, NDBD_EXIT_SYSTEM_ERROR,
1299 "Nodefailure during SUMA takeover");
1300 }
1301 else if (state & Bucket::BUCKET_SHUTDOWN_TO)
1302 {
1303 jam();
1304 c_buckets[i].m_state &= ~Uint32(Bucket::BUCKET_SHUTDOWN_TO);
1305 m_switchover_buckets.clear(i);
1306 ndbrequire(get_responsible_node(i, tmp) == getOwnNodeId());
1307 start_resend(signal, i);
1308 }
1309 }
1310 else if(get_responsible_node(i, tmp) == getOwnNodeId())
1311 {
1312 start_resend(signal, i);
1313 }
1314 }
1315 }
1316
1317 /* Block level cleanup */
1318 for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
1319 jam();
1320 if(failed.get(i)) {
1321 jam();
1322 Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
1323 ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals
1324 (void) elementsCleaned; // Avoid compiler error
1325 }//if
1326 }//for
1327
1328 c_alive_nodes.assign(tmp);
1329
1330 DBUG_VOID_RETURN;
1331 }
1332
1333 void
execINCL_NODEREQ(Signal * signal)1334 Suma::execINCL_NODEREQ(Signal* signal){
1335 jamEntry();
1336
1337 const Uint32 senderRef = signal->theData[0];
1338 const Uint32 nodeId = signal->theData[1];
1339
1340 ndbrequire(!c_alive_nodes.get(nodeId));
1341 if (c_nodes_in_nodegroup_mask.get(nodeId))
1342 {
1343 /**
1344 *
1345 * XXX TODO: This should be removed
1346 * But, other nodes are (incorrectly) reported as started
1347 * even if they're not "started", but only INCL_NODEREQ'ed
1348 */
1349 c_alive_nodes.set(nodeId);
1350
1351 /**
1352 *
1353 * Nodes in nodegroup will be "alive" when
1354 * sending SUMA_HANDOVER_REQ
1355 */
1356 }
1357 else
1358 {
1359 jam();
1360 c_alive_nodes.set(nodeId);
1361 }
1362
1363 signal->theData[0] = nodeId;
1364 signal->theData[1] = reference();
1365 sendSignal(senderRef, GSN_INCL_NODECONF, signal, 2, JBB);
1366 }
1367
1368 void
execSIGNAL_DROPPED_REP(Signal * signal)1369 Suma::execSIGNAL_DROPPED_REP(Signal* signal){
1370 jamEntry();
1371 ndbrequire(false);
1372 }
1373
1374 /********************************************************************
1375 *
1376 * Dump state
1377 *
1378 */
1379 static
1380 const char*
cstr(Suma::Subscription::State s)1381 cstr(Suma::Subscription::State s)
1382 {
1383 switch(s){
1384 case Suma::Subscription::UNDEFINED:
1385 return "undefined";
1386 case Suma::Subscription::DEFINED:
1387 return "defined";
1388 case Suma::Subscription::DEFINING:
1389 return "defining";
1390 }
1391 return "<unknown>";
1392 }
1393
1394 static
1395 const char*
cstr(Suma::Subscription::TriggerState s)1396 cstr(Suma::Subscription::TriggerState s)
1397 {
1398 switch(s){
1399 case Suma::Subscription::T_UNDEFINED:
1400 return "undefined";
1401 case Suma::Subscription::T_CREATING:
1402 return "creating";
1403 case Suma::Subscription::T_DEFINED:
1404 return "defined";
1405 case Suma::Subscription::T_DROPPING:
1406 return "dropping";
1407 case Suma::Subscription::T_ERROR:
1408 return "error";
1409 }
1410 return "<uknown>";
1411 }
1412
1413 static
1414 const char*
cstr(Suma::Subscription::Options s)1415 cstr(Suma::Subscription::Options s)
1416 {
1417 static char buf[256];
1418 buf[0] = 0;
1419 strcat(buf, "[");
1420 if (s & Suma::Subscription::REPORT_ALL)
1421 strcat(buf, " reportall");
1422 if (s & Suma::Subscription::REPORT_SUBSCRIBE)
1423 strcat(buf, " reportsubscribe");
1424 if (s & Suma::Subscription::MARKED_DROPPED)
1425 strcat(buf, " dropped");
1426 if (s & Suma::Subscription::NO_REPORT_DDL)
1427 strcat(buf, " noreportddl");
1428 strcat(buf, " ]");
1429 return buf;
1430 }
1431
1432 static
1433 const char*
cstr(Suma::Table::State s)1434 cstr(Suma::Table::State s)
1435 {
1436 switch(s){
1437 case Suma::Table::UNDEFINED:
1438 return "undefined";
1439 case Suma::Table::DEFINING:
1440 return "defining";
1441 case Suma::Table::DEFINED:
1442 return "defined";
1443 case Suma::Table::DROPPED:
1444 return "dropped";
1445 }
1446 return "<unknown>";
1447 }
1448
/**
 * DUMP_STATE_ORD: debugging/diagnostics entry point.
 *
 * signal->theData[0] selects what to do:
 *   8004  print size/free of the block's record pools
 *   8005  print the state of every bucket
 *   8006  set error insert 13029
 *   8007  set m_restart_server_node_id to MAX_NDB_NODES + 1 and
 *         set error insert 13029
 *   8008  clear the error insert value
 *   8009  toggle error insert 13030 (sends STTORRY when clearing it)
 *   8010  print c_subscriber_nodes and c_connected_nodes
 *   8011  dump tables with their subscriptions, subscribers and queued
 *         operations; continues via a delayed signal to self
 *   8012  dump subscriptions with their subscribers and queued
 *         operations; continues via a delayed signal to self
 *   7019  (signal length 2) print API-failure handling state for the
 *         node id given in signal->theData[1]
 *
 * Any other value is ignored.
 */
void
Suma::execDUMP_STATE_ORD(Signal* signal){
  jamEntry();

  Uint32 tCase = signal->theData[0];
#if 0
  // Disabled legacy test commands (8000-8003)
  if(tCase >= 8000 && tCase <= 8003){
    SubscriptionPtr subPtr;
    c_subscriptions.getPtr(subPtr, g_subPtrI);

    Ptr<SyncRecord> syncPtr;
    c_syncPool.getPtr(syncPtr, subPtr.p->m_syncPtrI);

    if(tCase == 8000){
      syncPtr.p->startMeta(signal);
    }

    if(tCase == 8001){
      syncPtr.p->startScan(signal);
    }

    if(tCase == 8002){
      syncPtr.p->startTrigger(signal);
    }

    if(tCase == 8003){
      subPtr.p->m_subscriptionType = SubCreateReq::SingleTableScan;
      LocalDataBuffer<15> attrs(c_dataBufferPool, syncPtr.p->m_attributeList);
      Uint32 tab = 0;
      Uint32 att[] = { 0, 1, 1 };
      syncPtr.p->m_tableList.append(&tab, 1);
      attrs.append(att, 3);
    }
  }
#endif
  // 8004: pool usage
  if(tCase == 8004){
    infoEvent("Suma: c_subscriberPool size: %d free: %d",
              c_subscriberPool.getSize(),
              c_subscriberPool.getNoOfFree());

    infoEvent("Suma: c_tablePool size: %d free: %d",
              c_tablePool.getSize(),
              c_tablePool.getNoOfFree());

    infoEvent("Suma: c_subscriptionPool size: %d free: %d",
              c_subscriptionPool.getSize(),
              c_subscriptionPool.getNoOfFree());

    infoEvent("Suma: c_syncPool size: %d free: %d",
              c_syncPool.getSize(),
              c_syncPool.getNoOfFree());

    infoEvent("Suma: c_dataBufferPool size: %d free: %d",
              c_dataBufferPool.getSize(),
              c_dataBufferPool.getNoOfFree());

    infoEvent("Suma: c_subOpPool size: %d free: %d",
              c_subOpPool.getSize(),
              c_subOpPool.getNoOfFree());

#if 0
    infoEvent("Suma: c_dataSubscribers count: %d",
              count_subscribers(c_dataSubscribers));
    infoEvent("Suma: c_prepDataSubscribers count: %d",
              count_subscribers(c_prepDataSubscribers));
#endif
  }

  // 8005: one line per bucket (active bit, switchover bit, state, gci's,
  // buffer tail/head)
  if(tCase == 8005)
  {
    for(Uint32 i = 0; i<c_no_of_buckets; i++)
    {
      Bucket* ptr= c_buckets + i;
      infoEvent("Bucket %d %d%d-%x switch gci: %llu max_acked_gci: %llu max_gci: %llu tail: %d head: %d",
                i,
                m_active_buckets.get(i),
                m_switchover_buckets.get(i),
                ptr->m_state,
                ptr->m_switchover_gci,
                ptr->m_max_acked_gci,
                ptr->m_buffer_head.m_max_gci,
                ptr->m_buffer_tail,
                ptr->m_buffer_head.m_page_id);
    }
  }

  // 8006-8008: error insert control
  if (tCase == 8006)
  {
    SET_ERROR_INSERT_VALUE(13029);
  }

  if (tCase == 8007)
  {
    c_startup.m_restart_server_node_id = MAX_NDB_NODES + 1;
    SET_ERROR_INSERT_VALUE(13029);
  }

  if (tCase == 8008)
  {
    CLEAR_ERROR_INSERT_VALUE;
  }

  // 8010: node bitmasks as text
  if (tCase == 8010)
  {
    char buf1[255], buf2[255];
    c_subscriber_nodes.getText(buf1);
    c_connected_nodes.getText(buf2);
    infoEvent("c_subscriber_nodes: %s", buf1);
    infoEvent("c_connected_nodes: %s", buf2);
  }

  // 8009: toggle error insert 13030; clearing it also sends STTORRY
  if (tCase == 8009)
  {
    if (ERROR_INSERTED(13030))
    {
      CLEAR_ERROR_INSERT_VALUE;
      sendSTTORRY(signal);
    }
    else
    {
      SET_ERROR_INSERT_VALUE(13030);
    }
    return;
  }

  // 8011: dump per table. theData[1] is the hash bucket to continue from;
  // a signal of length 1 means "start from the beginning".
  if (tCase == 8011)
  {
    jam();
    Uint32 bucket = signal->theData[1];
    KeyTable<Table>::Iterator it;
    if (signal->getLength() == 1)
    {
      jam();
      bucket = 0;
      infoEvent("-- Starting dump of subscribers --");
    }

    c_tables.next(bucket, it);
    const Uint32 RT_BREAK = 16;
    // Keep going until at least RT_BREAK items have been counted and the
    // iterator has left the starting bucket. The inner loops below also
    // increment i for every subscriber/operation they print.
    for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
    {
      jam();
      if(it.curr.i == RNIL)
      {
        // End of table hash, dump is complete
        jam();
        infoEvent("-- Ending dump of subscribers --");
        return;
      }

      infoEvent("Table %u ver %u",
                it.curr.p->m_tableId,
                it.curr.p->m_schemaVersion);

      Uint32 cnt = 0; // number of subscribers found for this table
      Ptr<Subscription> subPtr;
      LocalDLList<Subscription> subList(c_subscriptionPool,
                                        it.curr.p->m_subscriptions);
      for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
      {
        infoEvent(" Subcription %u", subPtr.i);
        {
          // Subscribers of this subscription
          Ptr<Subscriber> ptr;
          LocalDLList<Subscriber> list(c_subscriberPool,
                                       subPtr.p->m_subscribers);
          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
          {
            jam();
            cnt++;
            infoEvent(" Subscriber [ %x %u %u ]",
                      ptr.p->m_senderRef,
                      ptr.p->m_senderData,
                      subPtr.i);
          }
        }

        {
          // Queued create requests
          Ptr<SubOpRecord> ptr;
          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                            subPtr.p->m_create_req);

          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
          {
            jam();
            infoEvent(" create [ %x %u ]",
                      ptr.p->m_senderRef,
                      ptr.p->m_senderData);
          }
        }

        {
          // Queued start requests
          Ptr<SubOpRecord> ptr;
          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                            subPtr.p->m_start_req);

          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
          {
            jam();
            infoEvent(" start [ %x %u ]",
                      ptr.p->m_senderRef,
                      ptr.p->m_senderData);
          }
        }

        {
          // Queued stop requests (operation type is printed first)
          Ptr<SubOpRecord> ptr;
          LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                            subPtr.p->m_stop_req);

          for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
          {
            jam();
            infoEvent(" stop [ %u %x %u ]",
                      ptr.p->m_opType,
                      ptr.p->m_senderRef,
                      ptr.p->m_senderData);
          }
        }
      }
      infoEvent("Table %u #subscribers %u", it.curr.p->m_tableId, cnt);
      c_tables.next(it);
    }

    // Not finished: continue from the current bucket after a delay
    signal->theData[0] = tCase;
    signal->theData[1] = it.bucket;
    sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
    return;
  }

  // 8012: dump per subscription. Same continuation scheme as 8011, but
  // iterating the subscription hash instead of the table hash.
  if (tCase == 8012)
  {
    jam();
    Uint32 bucket = signal->theData[1];
    KeyTable<Subscription>::Iterator it;
    if (signal->getLength() == 1)
    {
      jam();
      bucket = 0;
      infoEvent("-- Starting dump of subscribers --");
    }

    c_subscriptions.next(bucket, it);
    const Uint32 RT_BREAK = 16;
    for(Uint32 i = 0; i<RT_BREAK || it.bucket == bucket; i++)
    {
      jam();
      if(it.curr.i == RNIL)
      {
        // End of subscription hash, dump is complete
        jam();
        infoEvent("-- Ending dump of subscribers --");
        return;
      }

      Ptr<Subscription> subPtr = it.curr;
      Ptr<Table> tabPtr;
      c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
      infoEvent("Subcription %u id: 0x%.8x key: 0x%.8x state: %s",
                subPtr.i,
                subPtr.p->m_subscriptionId,
                subPtr.p->m_subscriptionKey,
                cstr(subPtr.p->m_state));
      infoEvent(" trigger state: %s options: %s",
                cstr(subPtr.p->m_trigger_state),
                cstr((Suma::Subscription::Options)subPtr.p->m_options));
      infoEvent(" tablePtr: %u tableId: %u schemaVersion: 0x%.8x state: %s",
                tabPtr.i,
                subPtr.p->m_tableId,
                tabPtr.p->m_schemaVersion,
                cstr(tabPtr.p->m_state));
      {
        // Subscribers of this subscription
        Ptr<Subscriber> ptr;
        LocalDLList<Subscriber> list(c_subscriberPool,
                                     subPtr.p->m_subscribers);
        for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
        {
          jam();
          infoEvent(" Subscriber [ %x %u %u ]",
                    ptr.p->m_senderRef,
                    ptr.p->m_senderData,
                    subPtr.i);
        }
      }

      {
        // Queued create requests
        Ptr<SubOpRecord> ptr;
        LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                          subPtr.p->m_create_req);

        for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
        {
          jam();
          infoEvent(" create [ %x %u ]",
                    ptr.p->m_senderRef,
                    ptr.p->m_senderData);
        }
      }

      {
        // Queued start requests
        Ptr<SubOpRecord> ptr;
        LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                          subPtr.p->m_start_req);

        for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
        {
          jam();
          infoEvent(" start [ %x %u ]",
                    ptr.p->m_senderRef,
                    ptr.p->m_senderData);
        }
      }

      {
        // Queued stop requests (operation type is printed first)
        Ptr<SubOpRecord> ptr;
        LocalDLFifoList<SubOpRecord> list(c_subOpPool,
                                          subPtr.p->m_stop_req);

        for (list.first(ptr); !ptr.isNull(); list.next(ptr), i++)
        {
          jam();
          infoEvent(" stop [ %u %x %u ]",
                    ptr.p->m_opType,
                    ptr.p->m_senderRef,
                    ptr.p->m_senderData);
        }
      }
      c_subscriptions.next(it);
    }

    // Not finished: continue from the current bucket after a delay
    signal->theData[0] = tCase;
    signal->theData[1] = it.bucket;
    sendSignalWithDelay(reference(), GSN_DUMP_STATE_ORD, signal, 100, 2);
    return;
  }

  // 7019 <nodeId>: API node failure handling state for one node
  if (tCase == 7019 && signal->getLength() == 2)
  {
    jam();
    Uint32 nodeId = signal->theData[1];
    if (nodeId < MAX_NODES)
    {
      // c_failedApiNodesState holds the source line of the last recorded
      // step of the API failure handling for the node
      warningEvent(" Suma 7019 %u line: %u", nodeId,
                   c_failedApiNodesState[nodeId]);
      warningEvent(" c_connected_nodes.get(): %u",
                   c_connected_nodes.get(nodeId));
      warningEvent(" c_failedApiNodes.get(): %u",
                   c_failedApiNodes.get(nodeId));
      warningEvent(" c_subscriber_nodes.get(): %u",
                   c_subscriber_nodes.get(nodeId));
      warningEvent(" c_subscriber_per_node[%u]: %u",
                   nodeId, c_subscriber_per_node[nodeId]);
    }
    else
    {
      warningEvent(" SUMP: dump-7019 to unknown node: %u", nodeId);
    }
  }
}
1805
execDBINFO_SCANREQ(Signal * signal)1806 void Suma::execDBINFO_SCANREQ(Signal *signal)
1807 {
1808 DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
1809 const Ndbinfo::ScanCursor* cursor =
1810 CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req));
1811 Ndbinfo::Ratelimit rl;
1812
1813 jamEntry();
1814
1815 switch(req.tableId){
1816 case Ndbinfo::POOLS_TABLEID:
1817 {
1818 Ndbinfo::pool_entry pools[] =
1819 {
1820 { "Subscriber",
1821 c_subscriberPool.getUsed(),
1822 c_subscriberPool.getSize(),
1823 c_subscriberPool.getEntrySize(),
1824 c_subscriberPool.getUsedHi(),
1825 { CFG_DB_SUBSCRIBERS,
1826 CFG_DB_SUBSCRIPTIONS,
1827 CFG_DB_NO_TABLES,0 }},
1828 { "Table",
1829 c_tablePool.getUsed(),
1830 c_tablePool.getSize(),
1831 c_tablePool.getEntrySize(),
1832 c_tablePool.getUsedHi(),
1833 { CFG_DB_NO_TABLES,0,0,0 }},
1834 { "Subscription",
1835 c_subscriptionPool.getUsed(),
1836 c_subscriptionPool.getSize(),
1837 c_subscriptionPool.getEntrySize(),
1838 c_subscriptionPool.getUsedHi(),
1839 { CFG_DB_SUBSCRIPTIONS,
1840 CFG_DB_NO_TABLES,0,0 }},
1841 { "Sync",
1842 c_syncPool.getUsed(),
1843 c_syncPool.getSize(),
1844 c_syncPool.getEntrySize(),
1845 c_syncPool.getUsedHi(),
1846 { 0,0,0,0 }},
1847 { "Data Buffer",
1848 c_dataBufferPool.getUsed(),
1849 c_dataBufferPool.getSize(),
1850 c_dataBufferPool.getEntrySize(),
1851 c_dataBufferPool.getUsedHi(),
1852 { CFG_DB_NO_ATTRIBUTES,0,0,0 }},
1853 { "SubOp",
1854 c_subOpPool.getUsed(),
1855 c_subOpPool.getSize(),
1856 c_subOpPool.getEntrySize(),
1857 c_subOpPool.getUsedHi(),
1858 { CFG_DB_SUB_OPERATIONS,0,0,0 }},
1859 { "Page Chunk",
1860 c_page_chunk_pool.getUsed(),
1861 c_page_chunk_pool.getSize(),
1862 c_page_chunk_pool.getEntrySize(),
1863 c_page_chunk_pool.getUsedHi(),
1864 { 0,0,0,0 }},
1865 { "GCP",
1866 c_gcp_pool.getUsed(),
1867 c_gcp_pool.getSize(),
1868 c_gcp_pool.getEntrySize(),
1869 c_gcp_pool.getUsedHi(),
1870 { CFG_DB_API_HEARTBEAT_INTERVAL,
1871 CFG_DB_GCP_INTERVAL,0,0 }},
1872 { NULL, 0,0,0,0, { 0,0,0,0 }}
1873 };
1874
1875 const size_t num_config_params =
1876 sizeof(pools[0].config_params) / sizeof(pools[0].config_params[0]);
1877 Uint32 pool = cursor->data[0];
1878 BlockNumber bn = blockToMain(number());
1879 while(pools[pool].poolname)
1880 {
1881 jam();
1882 Ndbinfo::Row row(signal, req);
1883 row.write_uint32(getOwnNodeId());
1884 row.write_uint32(bn); // block number
1885 row.write_uint32(instance()); // block instance
1886 row.write_string(pools[pool].poolname);
1887 row.write_uint64(pools[pool].used);
1888 row.write_uint64(pools[pool].total);
1889 row.write_uint64(pools[pool].used_hi);
1890 row.write_uint64(pools[pool].entry_size);
1891 for (size_t i = 0; i < num_config_params; i++)
1892 row.write_uint32(pools[pool].config_params[i]);
1893 ndbinfo_send_row(signal, req, row, rl);
1894 pool++;
1895 if (rl.need_break(req))
1896 {
1897 jam();
1898 ndbinfo_send_scan_break(signal, req, rl, pool);
1899 return;
1900 }
1901 }
1902 break;
1903 }
1904 default:
1905 break;
1906 }
1907
1908 ndbinfo_send_scan_conf(signal, req, rl);
1909 }
1910
1911 /*************************************************************
1912 *
1913 * Creation of subscription id's
1914 *
1915 ************************************************************/
1916
1917 void
execCREATE_SUBID_REQ(Signal * signal)1918 Suma::execCREATE_SUBID_REQ(Signal* signal)
1919 {
1920 jamEntry();
1921 DBUG_ENTER("Suma::execCREATE_SUBID_REQ");
1922 ndbassert(signal->getNoOfSections() == 0);
1923 CRASH_INSERTION(13001);
1924
1925 CreateSubscriptionIdReq const * req =
1926 (CreateSubscriptionIdReq*)signal->getDataPtr();
1927 SubscriberPtr subbPtr;
1928 if(!c_subscriberPool.seize(subbPtr)){
1929 jam();
1930 sendSubIdRef(signal, req->senderRef, req->senderData, 1412);
1931 DBUG_VOID_RETURN;
1932 }
1933 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1934 c_subscriberPool.getSize(),
1935 c_subscriberPool.getNoOfFree()));
1936
1937 subbPtr.p->m_senderRef = req->senderRef;
1938 subbPtr.p->m_senderData = req->senderData;
1939
1940 UtilSequenceReq * utilReq = (UtilSequenceReq*)signal->getDataPtrSend();
1941 utilReq->senderData = subbPtr.i;
1942 utilReq->sequenceId = SUMA_SEQUENCE;
1943 utilReq->requestType = UtilSequenceReq::NextVal;
1944 sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
1945 signal, UtilSequenceReq::SignalLength, JBB);
1946
1947 DBUG_VOID_RETURN;
1948 }
1949
1950 void
execUTIL_SEQUENCE_CONF(Signal * signal)1951 Suma::execUTIL_SEQUENCE_CONF(Signal* signal)
1952 {
1953 jamEntry();
1954 DBUG_ENTER("Suma::execUTIL_SEQUENCE_CONF");
1955 ndbassert(signal->getNoOfSections() == 0);
1956 CRASH_INSERTION(13002);
1957
1958 UtilSequenceConf * conf = (UtilSequenceConf*)signal->getDataPtr();
1959 if(conf->requestType == UtilSequenceReq::Create) {
1960 jam();
1961 createSequenceReply(signal, conf, NULL);
1962 DBUG_VOID_RETURN;
1963 }
1964
1965 Uint64 subId;
1966 memcpy(&subId,conf->sequenceValue,8);
1967 SubscriberPtr subbPtr;
1968 c_subscriberPool.getPtr(subbPtr,conf->senderData);
1969
1970 CreateSubscriptionIdConf * subconf = (CreateSubscriptionIdConf*)conf;
1971 subconf->senderRef = reference();
1972 subconf->senderData = subbPtr.p->m_senderData;
1973 subconf->subscriptionId = (Uint32)subId;
1974 subconf->subscriptionKey =(getOwnNodeId() << 16) | (Uint32)(subId & 0xFFFF);
1975
1976 sendSignal(subbPtr.p->m_senderRef, GSN_CREATE_SUBID_CONF, signal,
1977 CreateSubscriptionIdConf::SignalLength, JBB);
1978
1979 c_subscriberPool.release(subbPtr);
1980 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
1981 c_subscriberPool.getSize(),
1982 c_subscriberPool.getNoOfFree()));
1983 DBUG_VOID_RETURN;
1984 }
1985
1986 void
execUTIL_SEQUENCE_REF(Signal * signal)1987 Suma::execUTIL_SEQUENCE_REF(Signal* signal)
1988 {
1989 jamEntry();
1990 DBUG_ENTER("Suma::execUTIL_SEQUENCE_REF");
1991 ndbassert(signal->getNoOfSections() == 0);
1992 UtilSequenceRef * ref = (UtilSequenceRef*)signal->getDataPtr();
1993 Uint32 err= ref->errorCode;
1994
1995 if(ref->requestType == UtilSequenceReq::Create) {
1996 jam();
1997 createSequenceReply(signal, NULL, ref);
1998 DBUG_VOID_RETURN;
1999 }
2000
2001 Uint32 subData = ref->senderData;
2002
2003 SubscriberPtr subbPtr;
2004 c_subscriberPool.getPtr(subbPtr,subData);
2005 if (err == UtilSequenceRef::TCError)
2006 {
2007 jam();
2008 err = ref->TCErrorCode;
2009 }
2010 sendSubIdRef(signal, subbPtr.p->m_senderRef, subbPtr.p->m_senderData, err);
2011 c_subscriberPool.release(subbPtr);
2012 DBUG_PRINT("info",("c_subscriberPool size: %d free: %d",
2013 c_subscriberPool.getSize(),
2014 c_subscriberPool.getNoOfFree()));
2015 DBUG_VOID_RETURN;
2016 }//execUTIL_SEQUENCE_REF()
2017
2018
2019 void
sendSubIdRef(Signal * signal,Uint32 senderRef,Uint32 senderData,Uint32 errCode)2020 Suma::sendSubIdRef(Signal* signal,
2021 Uint32 senderRef, Uint32 senderData, Uint32 errCode)
2022 {
2023 jam();
2024 DBUG_ENTER("Suma::sendSubIdRef");
2025 CreateSubscriptionIdRef * ref =
2026 (CreateSubscriptionIdRef *)signal->getDataPtrSend();
2027
2028 ref->senderRef = reference();
2029 ref->senderData = senderData;
2030 ref->errorCode = errCode;
2031 sendSignal(senderRef,
2032 GSN_CREATE_SUBID_REF,
2033 signal,
2034 CreateSubscriptionIdRef::SignalLength,
2035 JBB);
2036
2037 DBUG_VOID_RETURN;
2038 }
2039
2040 /**********************************************************
2041 * Suma participant interface
2042 *
2043 * Creation of subscriptions
2044 */
2045 void
execSUB_CREATE_REQ(Signal * signal)2046 Suma::execSUB_CREATE_REQ(Signal* signal)
2047 {
2048 jamEntry();
2049 DBUG_ENTER("Suma::execSUB_CREATE_REQ");
2050 ndbassert(signal->getNoOfSections() == 0);
2051 CRASH_INSERTION(13003);
2052
2053 const SubCreateReq req = *(SubCreateReq*)signal->getDataPtr();
2054
2055 const Uint32 senderRef = req.senderRef;
2056 const Uint32 senderData = req.senderData;
2057 const Uint32 subId = req.subscriptionId;
2058 const Uint32 subKey = req.subscriptionKey;
2059 const Uint32 type = req.subscriptionType & SubCreateReq::RemoveFlags;
2060 const Uint32 flags = req.subscriptionType & SubCreateReq::GetFlags;
2061 const Uint32 reportAll = (flags & SubCreateReq::ReportAll) ?
2062 Subscription::REPORT_ALL : 0;
2063 const Uint32 reportSubscribe = (flags & SubCreateReq::ReportSubscribe) ?
2064 Subscription::REPORT_SUBSCRIBE : 0;
2065 const Uint32 noReportDDL = (flags & SubCreateReq::NoReportDDL) ?
2066 Subscription::NO_REPORT_DDL : 0;
2067 const Uint32 tableId = req.tableId;
2068 const Uint32 schemaTransId = req.schemaTransId;
2069
2070 bool subDropped = req.subscriptionType & SubCreateReq::NR_Sub_Dropped;
2071
2072 /**
2073 * This 2 options are only allowed during NR
2074 */
2075 if (subDropped)
2076 {
2077 ndbrequire(refToNode(senderRef) == c_startup.m_restart_server_node_id);
2078 }
2079
2080 Subscription key;
2081 key.m_subscriptionId = subId;
2082 key.m_subscriptionKey = subKey;
2083
2084 DBUG_PRINT("enter",("key.m_subscriptionId: %u, key.m_subscriptionKey: %u",
2085 key.m_subscriptionId, key.m_subscriptionKey));
2086
2087 SubscriptionPtr subPtr;
2088
2089 bool found = c_subscriptions.find(subPtr, key);
2090
2091 if (c_startup.m_restart_server_node_id == RNIL)
2092 {
2093 jam();
2094
2095 /**
2096 * We havent started syncing yet
2097 */
2098 sendSubCreateRef(signal, senderRef, senderData,
2099 SubCreateRef::NotStarted);
2100 return;
2101 }
2102
2103 CRASH_INSERTION2(13040, c_startup.m_restart_server_node_id != RNIL);
2104 CRASH_INSERTION(13041);
2105
2106 bool allowDup = true; //c_startup.m_restart_server_node_id;
2107
2108 if (found && !allowDup)
2109 {
2110 jam();
2111 sendSubCreateRef(signal, senderRef, senderData,
2112 SubCreateRef::SubscriptionAlreadyExist);
2113 return;
2114 }
2115
2116 if (found == false)
2117 {
2118 jam();
2119 if(!c_subscriptions.seize(subPtr))
2120 {
2121 jam();
2122 sendSubCreateRef(signal, senderRef, senderData,
2123 SubCreateRef::OutOfSubscriptionRecords);
2124 return;
2125 }
2126
2127 new (subPtr.p) Subscription();
2128 subPtr.p->m_seq_no = c_current_seq;
2129 subPtr.p->m_subscriptionId = subId;
2130 subPtr.p->m_subscriptionKey = subKey;
2131 subPtr.p->m_subscriptionType = type;
2132 subPtr.p->m_tableId = tableId;
2133 subPtr.p->m_table_ptrI = RNIL;
2134 subPtr.p->m_state = Subscription::UNDEFINED;
2135 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
2136 subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
2137 subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
2138 subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
2139 subPtr.p->m_errorCode = 0;
2140 subPtr.p->m_options = reportSubscribe | reportAll | noReportDDL;
2141 subPtr.p->m_schemaTransId = schemaTransId;
2142 }
2143
2144 Ptr<SubOpRecord> subOpPtr;
2145 LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_create_req);
2146 if ((ERROR_INSERTED(13044) && found == false) ||
2147 subOpList.seize(subOpPtr) == false)
2148 {
2149 jam();
2150 if (found == false)
2151 {
2152 jam();
2153 if (ERROR_INSERTED(13044))
2154 {
2155 CLEAR_ERROR_INSERT_VALUE;
2156 }
2157 c_subscriptionPool.release(subPtr); // not yet in hash
2158 }
2159 sendSubCreateRef(signal, senderRef, senderData,
2160 SubCreateRef::OutOfTableRecords);
2161 return;
2162 }
2163
2164 subOpPtr.p->m_senderRef = senderRef;
2165 subOpPtr.p->m_senderData = senderData;
2166
2167 if (subDropped)
2168 {
2169 jam();
2170 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
2171 }
2172
2173 TablePtr tabPtr;
2174 if (found)
2175 {
2176 jam();
2177 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2178 }
2179 else if (c_tables.find(tabPtr, tableId))
2180 {
2181 jam();
2182 }
2183 else
2184 {
2185 jam();
2186 if (ERROR_INSERTED(13045) || c_tablePool.seize(tabPtr) == false)
2187 {
2188 jam();
2189 if (ERROR_INSERTED(13045))
2190 {
2191 CLEAR_ERROR_INSERT_VALUE;
2192 }
2193
2194 subOpList.release(subOpPtr);
2195 c_subscriptionPool.release(subPtr); // not yet in hash
2196 sendSubCreateRef(signal, senderRef, senderData,
2197 SubCreateRef::OutOfTableRecords);
2198 return;
2199 }
2200
2201 new (tabPtr.p) Table;
2202 tabPtr.p->m_tableId= tableId;
2203 tabPtr.p->m_ptrI= tabPtr.i;
2204 tabPtr.p->m_error = 0;
2205 tabPtr.p->m_schemaVersion = RNIL;
2206 tabPtr.p->m_state = Table::UNDEFINED;
2207 tabPtr.p->m_schemaTransId = schemaTransId;
2208 c_tables.add(tabPtr);
2209 }
2210
2211 if (found == false)
2212 {
2213 jam();
2214 c_subscriptions.add(subPtr);
2215 LocalDLList<Subscription> list(c_subscriptionPool,
2216 tabPtr.p->m_subscriptions);
2217 list.add(subPtr);
2218 subPtr.p->m_table_ptrI = tabPtr.i;
2219 }
2220
2221 switch(tabPtr.p->m_state){
2222 case Table::DEFINED:{
2223 jam();
2224 // Send conf
2225 subOpList.release(subOpPtr);
2226 subPtr.p->m_state = Subscription::DEFINED;
2227 SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2228 conf->senderRef = reference();
2229 conf->senderData = senderData;
2230 sendSignal(senderRef, GSN_SUB_CREATE_CONF, signal,
2231 SubCreateConf::SignalLength, JBB);
2232 return;
2233 }
2234 case Table::UNDEFINED:{
2235 jam();
2236 tabPtr.p->m_state = Table::DEFINING;
2237 subPtr.p->m_state = Subscription::DEFINING;
2238
2239 if (ERROR_INSERTED(13031))
2240 {
2241 jam();
2242 CLEAR_ERROR_INSERT_VALUE;
2243 GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtrSend();
2244 ref->tableId = tableId;
2245 ref->senderData = tabPtr.i;
2246 ref->errorCode = GetTabInfoRef::TableNotDefined;
2247 sendSignal(reference(), GSN_GET_TABINFOREF, signal,
2248 GetTabInfoRef::SignalLength, JBB);
2249 return;
2250 }
2251
2252 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2253 req->senderRef = reference();
2254 req->senderData = tabPtr.i;
2255 req->requestType =
2256 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2257 req->tableId = tableId;
2258 req->schemaTransId = schemaTransId;
2259
2260 sendSignal(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2261 GetTabInfoReq::SignalLength, JBB);
2262 return;
2263 }
2264 case Table::DEFINING:
2265 {
2266 jam();
2267 /**
2268 * just wait for completion
2269 */
2270 subPtr.p->m_state = Subscription::DEFINING;
2271 return;
2272 }
2273 case Table::DROPPED:
2274 {
2275 subOpList.release(subOpPtr);
2276
2277 {
2278 LocalDLList<Subscription> list(c_subscriptionPool,
2279 tabPtr.p->m_subscriptions);
2280 list.remove(subPtr);
2281 }
2282 c_subscriptions.release(subPtr);
2283
2284 sendSubCreateRef(signal, senderRef, senderData,
2285 SubCreateRef::TableDropped);
2286 return;
2287 }
2288 }
2289
2290 ndbrequire(false);
2291 }
2292
2293 void
sendSubCreateRef(Signal * signal,Uint32 retRef,Uint32 data,Uint32 errCode)2294 Suma::sendSubCreateRef(Signal* signal, Uint32 retRef, Uint32 data,
2295 Uint32 errCode)
2296 {
2297 jam();
2298 SubCreateRef * ref = (SubCreateRef *)signal->getDataPtrSend();
2299 ref->errorCode = errCode;
2300 ref->senderData = data;
2301 sendSignal(retRef, GSN_SUB_CREATE_REF, signal,
2302 SubCreateRef::SignalLength, JBB);
2303 return;
2304 }
2305
/**********************************************************
 *
 * Setting up trigger for subscription
 *
 */
2311
2312 void
execSUB_SYNC_REQ(Signal * signal)2313 Suma::execSUB_SYNC_REQ(Signal* signal)
2314 {
2315 jamEntry();
2316
2317 CRASH_INSERTION(13004);
2318
2319 SubSyncReq * const req = (SubSyncReq*)signal->getDataPtr();
2320
2321 SubscriptionPtr subPtr;
2322 Subscription key;
2323 key.m_subscriptionId = req->subscriptionId;
2324 key.m_subscriptionKey = req->subscriptionKey;
2325
2326 SectionHandle handle(this, signal);
2327 if(!c_subscriptions.find(subPtr, key))
2328 {
2329 jam();
2330 releaseSections(handle);
2331 sendSubSyncRef(signal, 1407);
2332 return;
2333 }
2334
2335 Ptr<SyncRecord> syncPtr;
2336 LocalDLList<SyncRecord> list(c_syncPool, subPtr.p->m_syncRecords);
2337 if(!list.seize(syncPtr))
2338 {
2339 jam();
2340 releaseSections(handle);
2341 sendSubSyncRef(signal, 1416);
2342 return;
2343 }
2344
2345 new (syncPtr.p) Ptr<SyncRecord>;
2346 syncPtr.p->m_senderRef = req->senderRef;
2347 syncPtr.p->m_senderData = req->senderData;
2348 syncPtr.p->m_subscriptionPtrI = subPtr.i;
2349 syncPtr.p->ptrI = syncPtr.i;
2350 syncPtr.p->m_error = 0;
2351 syncPtr.p->m_requestInfo = req->requestInfo;
2352 syncPtr.p->m_frag_cnt = req->fragCount;
2353 syncPtr.p->m_frag_id = req->fragId;
2354 syncPtr.p->m_tableId = subPtr.p->m_tableId;
2355
2356 {
2357 jam();
2358 if(handle.m_cnt > 0)
2359 {
2360 SegmentedSectionPtr ptr;
2361 handle.getSection(ptr, SubSyncReq::ATTRIBUTE_LIST);
2362 LocalDataBuffer<15> attrBuf(c_dataBufferPool, syncPtr.p->m_attributeList);
2363 append(attrBuf, ptr, getSectionSegmentPool());
2364 }
2365 if (req->requestInfo & SubSyncReq::RangeScan)
2366 {
2367 jam();
2368 ndbrequire(handle.m_cnt > 1)
2369 SegmentedSectionPtr ptr;
2370 handle.getSection(ptr, SubSyncReq::TUX_BOUND_INFO);
2371 LocalDataBuffer<15> boundBuf(c_dataBufferPool, syncPtr.p->m_boundInfo);
2372 append(boundBuf, ptr, getSectionSegmentPool());
2373 }
2374 releaseSections(handle);
2375 }
2376
2377 /**
2378 * We need to gather fragment info
2379 */
2380 {
2381 jam();
2382 DihScanTabReq* req = (DihScanTabReq*)signal->getDataPtrSend();
2383 req->senderRef = reference();
2384 req->senderData = syncPtr.i;
2385 req->tableId = subPtr.p->m_tableId;
2386 req->schemaTransId = subPtr.p->m_schemaTransId;
2387 sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2388 DihScanTabReq::SignalLength, JBB);
2389 }
2390 }
2391
2392 void
sendSubSyncRef(Signal * signal,Uint32 errCode)2393 Suma::sendSubSyncRef(Signal* signal, Uint32 errCode){
2394 jam();
2395 SubSyncRef * ref= (SubSyncRef *)signal->getDataPtrSend();
2396 ref->errorCode = errCode;
2397 sendSignal(signal->getSendersBlockRef(),
2398 GSN_SUB_SYNC_REF,
2399 signal,
2400 SubSyncRef::SignalLength,
2401 JBB);
2402 return;
2403 }
2404
2405 void
execDIH_SCAN_TAB_REF(Signal * signal)2406 Suma::execDIH_SCAN_TAB_REF(Signal* signal)
2407 {
2408 jamEntry();
2409 DBUG_ENTER("Suma::execDI_FCOUNTREF");
2410 DihScanTabRef * ref = (DihScanTabRef*)signal->getDataPtr();
2411 switch ((DihScanTabRef::ErrorCode) ref->error)
2412 {
2413 case DihScanTabRef::ErroneousTableState:
2414 jam();
2415 if (ref->tableStatus == Dbdih::TabRecord::TS_CREATING)
2416 {
2417 const Uint32 tableId = ref->tableId;
2418 const Uint32 synPtrI = ref->senderData;
2419 const Uint32 schemaTransId = ref->schemaTransId;
2420 DihScanTabReq * req = (DihScanTabReq*)signal->getDataPtrSend();
2421
2422 req->senderData = synPtrI;
2423 req->senderRef = reference();
2424 req->tableId = tableId;
2425 req->schemaTransId = schemaTransId;
2426 sendSignalWithDelay(DBDIH_REF, GSN_DIH_SCAN_TAB_REQ, signal,
2427 DihScanTabReq::SignalLength,
2428 DihScanTabReq::RetryInterval);
2429 DBUG_VOID_RETURN;
2430 }
2431 ndbrequire(false);
2432 default:
2433 ndbrequire(false);
2434 }
2435
2436 DBUG_VOID_RETURN;
2437 }
2438
2439 void
execDIH_SCAN_TAB_CONF(Signal * signal)2440 Suma::execDIH_SCAN_TAB_CONF(Signal* signal)
2441 {
2442 jamEntry();
2443 DBUG_ENTER("Suma::execDI_FCOUNTCONF");
2444 ndbassert(signal->getNoOfSections() == 0);
2445 DihScanTabConf * conf = (DihScanTabConf*)signal->getDataPtr();
2446 const Uint32 tableId = conf->tableId;
2447 const Uint32 fragCount = conf->fragmentCount;
2448 const Uint32 scanCookie = conf->scanCookie;
2449
2450 Ptr<SyncRecord> ptr;
2451 c_syncPool.getPtr(ptr, conf->senderData);
2452
2453 LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2454 ndbrequire(fragBuf.getSize() == 0);
2455
2456 ndbassert(fragCount >= ptr.p->m_frag_cnt);
2457 if (ptr.p->m_frag_cnt == 0)
2458 {
2459 jam();
2460 ptr.p->m_frag_cnt = fragCount;
2461 }
2462 ptr.p->m_scan_cookie = scanCookie;
2463
2464 DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2465 req->senderRef = reference();
2466 req->senderData = ptr.i;
2467 req->tableId = tableId;
2468 req->fragId = 0;
2469 req->scanCookie = scanCookie;
2470 sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2471 DihScanGetNodesReq::SignalLength, JBB);
2472
2473 DBUG_VOID_RETURN;
2474 }
2475
2476 void
execDIH_SCAN_GET_NODES_CONF(Signal * signal)2477 Suma::execDIH_SCAN_GET_NODES_CONF(Signal* signal)
2478 {
2479 jamEntry();
2480 DBUG_ENTER("Suma::execDIGETPRIMCONF");
2481 ndbassert(signal->getNoOfSections() == 0);
2482
2483 DihScanGetNodesConf* conf = (DihScanGetNodesConf*)signal->getDataPtr();
2484 const Uint32 nodeCount = conf->count;
2485 const Uint32 tableId = conf->tableId;
2486 const Uint32 fragNo = conf->fragId;
2487
2488 ndbrequire(nodeCount > 0 && nodeCount <= MAX_REPLICAS);
2489
2490 Ptr<SyncRecord> ptr;
2491 c_syncPool.getPtr(ptr, conf->senderData);
2492
2493 {
2494 LocalDataBuffer<15> fragBuf(c_dataBufferPool, ptr.p->m_fragments);
2495
2496 /**
2497 * Add primary node for fragment to list
2498 */
2499 FragmentDescriptor fd;
2500 fd.m_fragDesc.m_nodeId = conf->nodes[0];
2501 fd.m_fragDesc.m_fragmentNo = fragNo;
2502 fd.m_fragDesc.m_lqhInstanceKey = conf->instanceKey;
2503 if (ptr.p->m_frag_id == ZNIL)
2504 {
2505 signal->theData[2] = fd.m_dummy;
2506 fragBuf.append(&signal->theData[2], 1);
2507 }
2508 else if (ptr.p->m_frag_id == fragNo)
2509 {
2510 /*
2511 * Given fragment must have a replica on this node.
2512 */
2513 const Uint32 ownNodeId = getOwnNodeId();
2514 Uint32 i = 0;
2515 for (i = 0; i < nodeCount; i++)
2516 if (conf->nodes[i] == ownNodeId)
2517 break;
2518 if (i == nodeCount)
2519 {
2520 sendSubSyncRef(signal, 1428);
2521 return;
2522 }
2523 fd.m_fragDesc.m_nodeId = ownNodeId;
2524 signal->theData[2] = fd.m_dummy;
2525 fragBuf.append(&signal->theData[2], 1);
2526 }
2527 }
2528
2529 const Uint32 nextFrag = fragNo + 1;
2530 if(nextFrag == ptr.p->m_frag_cnt)
2531 {
2532 jam();
2533
2534 ptr.p->startScan(signal);
2535 return;
2536 }
2537
2538 DihScanGetNodesReq* req = (DihScanGetNodesReq*)signal->getDataPtrSend();
2539 req->senderRef = reference();
2540 req->senderData = ptr.i;
2541 req->tableId = tableId;
2542 req->fragId = nextFrag;
2543 req->scanCookie = ptr.p->m_scan_cookie;
2544 sendSignal(DBDIH_REF, GSN_DIH_SCAN_GET_NODES_REQ, signal,
2545 DihScanGetNodesReq::SignalLength, JBB);
2546
2547 DBUG_VOID_RETURN;
2548 }
2549
2550 /**********************************************************
2551 * Dict interface
2552 */
2553
2554 /*************************************************************************
2555 *
2556 *
2557 */
2558 void
execGET_TABINFOREF(Signal * signal)2559 Suma::execGET_TABINFOREF(Signal* signal){
2560 jamEntry();
2561 GetTabInfoRef* ref = (GetTabInfoRef*)signal->getDataPtr();
2562 Uint32 tableId = ref->tableId;
2563 Uint32 senderData = ref->senderData;
2564 Uint32 schemaTransId = ref->schemaTransId;
2565 GetTabInfoRef::ErrorCode errorCode =
2566 (GetTabInfoRef::ErrorCode) ref->errorCode;
2567 int do_resend_request = 0;
2568 TablePtr tabPtr;
2569 c_tablePool.getPtr(tabPtr, senderData);
2570 switch (errorCode)
2571 {
2572 case GetTabInfoRef::TableNotDefined:
2573 // wrong state
2574 break;
2575 case GetTabInfoRef::InvalidTableId:
2576 // no such table
2577 break;
2578 case GetTabInfoRef::Busy:
2579 do_resend_request = 1;
2580 break;
2581 case GetTabInfoRef::NoFetchByName:
2582 jam();
2583 case GetTabInfoRef::TableNameTooLong:
2584 jam();
2585 ndbrequire(false);
2586 }
2587 if (tabPtr.p->m_state == Table::DROPPED)
2588 {
2589 jam();
2590 do_resend_request = 0;
2591 }
2592
2593 if (do_resend_request)
2594 {
2595 GetTabInfoReq * req = (GetTabInfoReq *)signal->getDataPtrSend();
2596 req->senderRef = reference();
2597 req->senderData = senderData;
2598 req->requestType =
2599 GetTabInfoReq::RequestById | GetTabInfoReq::LongSignalConf;
2600 req->tableId = tableId;
2601 req->schemaTransId = schemaTransId;
2602 sendSignalWithDelay(DBDICT_REF, GSN_GET_TABINFOREQ, signal,
2603 30, GetTabInfoReq::SignalLength);
2604 return;
2605 }
2606 get_tabinfo_ref_release(signal, tabPtr);
2607 }
2608
2609 void
get_tabinfo_ref_release(Signal * signal,Ptr<Table> tabPtr)2610 Suma::get_tabinfo_ref_release(Signal* signal, Ptr<Table> tabPtr)
2611 {
2612 LocalDLList<Subscription> subList(c_subscriptionPool,
2613 tabPtr.p->m_subscriptions);
2614 Ptr<Subscription> subPtr;
2615 bool empty = subList.isEmpty();
2616 for(subList.first(subPtr); !subPtr.isNull();)
2617 {
2618 jam();
2619 Ptr<SubOpRecord> ptr;
2620 ndbassert(subPtr.p->m_start_req.isEmpty());
2621 ndbassert(subPtr.p->m_stop_req.isEmpty());
2622 LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2623 for (list.first(ptr); !ptr.isNull(); )
2624 {
2625 jam();
2626 sendSubCreateRef(signal,
2627 ptr.p->m_senderRef,
2628 ptr.p->m_senderData,
2629 SubCreateRef::TableDropped);
2630
2631 Ptr<SubOpRecord> tmp0 = ptr;
2632 list.next(ptr);
2633 list.release(tmp0);
2634 }
2635 Ptr<Subscription> tmp1 = subPtr;
2636 subList.next(subPtr);
2637 c_subscriptions.remove(tmp1);
2638 subList.release(tmp1);
2639 }
2640
2641 c_tables.release(tabPtr);
2642 ndbassert(!empty);
2643 }
2644
2645 void
execGET_TABINFO_CONF(Signal * signal)2646 Suma::execGET_TABINFO_CONF(Signal* signal){
2647 jamEntry();
2648
2649 CRASH_INSERTION(13006);
2650
2651 if(!assembleFragments(signal)){
2652 return;
2653 }
2654
2655 SectionHandle handle(this, signal);
2656 GetTabInfoConf* conf = (GetTabInfoConf*)signal->getDataPtr();
2657 TablePtr tabPtr;
2658 c_tablePool.getPtr(tabPtr, conf->senderData);
2659 SegmentedSectionPtr ptr;
2660 handle.getSection(ptr, GetTabInfoConf::DICT_TAB_INFO);
2661 ndbrequire(tabPtr.p->parseTable(ptr, *this));
2662 releaseSections(handle);
2663
2664 if (tabPtr.p->m_state == Table::DROPPED)
2665 {
2666 jam();
2667 get_tabinfo_ref_release(signal, tabPtr);
2668 return;
2669 }
2670
2671 tabPtr.p->m_state = Table::DEFINED;
2672
2673 LocalDLList<Subscription> subList(c_subscriptionPool,
2674 tabPtr.p->m_subscriptions);
2675 Ptr<Subscription> subPtr;
2676 bool empty = subList.isEmpty();
2677 for(subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
2678 {
2679 jam();
2680 subPtr.p->m_state = Subscription::DEFINED;
2681
2682 Ptr<SubOpRecord> ptr;
2683 LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_create_req);
2684 for (list.first(ptr); !ptr.isNull();)
2685 {
2686 jam();
2687 SubCreateConf * const conf = (SubCreateConf*)signal->getDataPtrSend();
2688 conf->senderRef = reference();
2689 conf->senderData = ptr.p->m_senderData;
2690 sendSignal(ptr.p->m_senderRef, GSN_SUB_CREATE_CONF, signal,
2691 SubCreateConf::SignalLength, JBB);
2692
2693 Ptr<SubOpRecord> tmp = ptr;
2694 list.next(ptr);
2695 list.release(tmp);
2696 }
2697 }
2698
2699 ndbassert(!empty);
2700 }
2701
2702 bool
parseTable(SegmentedSectionPtr ptr,Suma & suma)2703 Suma::Table::parseTable(SegmentedSectionPtr ptr,
2704 Suma &suma)
2705 {
2706 DBUG_ENTER("Suma::Table::parseTable");
2707
2708 SimplePropertiesSectionReader it(ptr, suma.getSectionSegmentPool());
2709
2710 SimpleProperties::UnpackStatus s;
2711 DictTabInfo::Table tableDesc; tableDesc.init();
2712 s = SimpleProperties::unpack(it, &tableDesc,
2713 DictTabInfo::TableMapping,
2714 DictTabInfo::TableMappingSize,
2715 true, true);
2716
2717 jamBlock(&suma);
2718 suma.suma_ndbrequire(s == SimpleProperties::Break);
2719
2720 /**
2721 * Initialize table object
2722 */
2723 m_noOfAttributes = tableDesc.NoOfAttributes;
2724 m_schemaVersion = tableDesc.TableVersion;
2725
2726 DBUG_RETURN(true);
2727 }
2728
2729 /**********************************************************
2730 *
2731 * Scan interface
2732 *
2733 */
2734
2735 void
startScan(Signal * signal)2736 Suma::SyncRecord::startScan(Signal* signal)
2737 {
2738 jam();
2739 DBUG_ENTER("Suma::SyncRecord::startScan");
2740
2741 /**
2742 * Get fraginfo
2743 */
2744 m_currentFragment = 0;
2745 nextScan(signal);
2746 DBUG_VOID_RETURN;
2747 }
2748
2749 bool
getNextFragment(TablePtr * tab,FragmentDescriptor * fd)2750 Suma::SyncRecord::getNextFragment(TablePtr * tab,
2751 FragmentDescriptor * fd)
2752 {
2753 jam();
2754 SubscriptionPtr subPtr;
2755 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2756 DataBuffer<15>::DataBufferIterator fragIt;
2757
2758 TablePtr tabPtr;
2759 suma.c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
2760 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
2761
2762 fragBuf.position(fragIt, m_currentFragment);
2763 for(; !fragIt.curr.isNull(); fragBuf.next(fragIt), m_currentFragment++)
2764 {
2765 FragmentDescriptor tmp;
2766 tmp.m_dummy = * fragIt.data;
2767 if(tmp.m_fragDesc.m_nodeId == suma.getOwnNodeId()){
2768 * fd = tmp;
2769 * tab = tabPtr;
2770 return true;
2771 }
2772 }
2773 m_currentFragment = 0;
2774 return false;
2775 }
2776
2777 void
nextScan(Signal * signal)2778 Suma::SyncRecord::nextScan(Signal* signal)
2779 {
2780 jam();
2781 DBUG_ENTER("Suma::SyncRecord::nextScan");
2782 TablePtr tabPtr;
2783 FragmentDescriptor fd;
2784 SubscriptionPtr subPtr;
2785 if(!getNextFragment(&tabPtr, &fd)){
2786 jam();
2787 completeScan(signal);
2788 DBUG_VOID_RETURN;
2789 }
2790
2791 suma.c_subscriptions.getPtr(subPtr, m_subscriptionPtrI);
2792
2793 DataBuffer<15>::Head head = m_attributeList;
2794 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, head);
2795
2796 Uint32 instanceKey = fd.m_fragDesc.m_lqhInstanceKey;
2797 BlockReference lqhRef = numberToRef(DBLQH, instanceKey, suma.getOwnNodeId());
2798
2799 ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
2800 const Uint32 parallelism = 16;
2801 //const Uint32 attrLen = 5 + attrBuf.getSize();
2802
2803 req->senderData = ptrI;
2804 req->resultRef = suma.reference();
2805 req->tableId = tabPtr.p->m_tableId;
2806 req->requestInfo = 0;
2807 req->savePointId = 0;
2808 ScanFragReq::setLockMode(req->requestInfo, 0);
2809 ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2810 ScanFragReq::setKeyinfoFlag(req->requestInfo, 0);
2811 if (m_requestInfo & SubSyncReq::NoDisk)
2812 {
2813 ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
2814 }
2815
2816 if (m_requestInfo & SubSyncReq::LM_Exclusive)
2817 {
2818 ScanFragReq::setLockMode(req->requestInfo, 1);
2819 ScanFragReq::setHoldLockFlag(req->requestInfo, 1);
2820 ScanFragReq::setKeyinfoFlag(req->requestInfo, 1);
2821 }
2822
2823 if (m_requestInfo & SubSyncReq::Reorg)
2824 {
2825 ScanFragReq::setReorgFlag(req->requestInfo, ScanFragReq::REORG_MOVED);
2826 }
2827
2828 if (m_requestInfo & SubSyncReq::TupOrder)
2829 {
2830 ScanFragReq::setTupScanFlag(req->requestInfo, 1);
2831 }
2832
2833 if (m_requestInfo & SubSyncReq::LM_CommittedRead)
2834 {
2835 ScanFragReq::setReadCommittedFlag(req->requestInfo, 1);
2836 }
2837
2838 if (m_requestInfo & SubSyncReq::RangeScan)
2839 {
2840 ScanFragReq::setRangeScanFlag(req->requestInfo, 1);
2841 }
2842
2843 if (m_requestInfo & SubSyncReq::StatScan)
2844 {
2845 ScanFragReq::setStatScanFlag(req->requestInfo, 1);
2846 }
2847
2848 req->fragmentNoKeyLen = fd.m_fragDesc.m_fragmentNo;
2849 req->schemaVersion = tabPtr.p->m_schemaVersion;
2850 req->transId1 = 0;
2851 req->transId2 = (SUMA << 20) + (suma.getOwnNodeId() << 8);
2852 req->clientOpPtr = (ptrI << 16);
2853 req->batch_size_rows= parallelism;
2854
2855 req->batch_size_bytes= 0;
2856
2857 Uint32 * attrInfo = signal->theData + 25;
2858 attrInfo[0] = attrBuf.getSize();
2859 attrInfo[1] = 0;
2860 attrInfo[2] = 0;
2861 attrInfo[3] = 0;
2862 attrInfo[4] = 0;
2863
2864 Uint32 pos = 5;
2865 DataBuffer<15>::DataBufferIterator it;
2866 for(attrBuf.first(it); !it.curr.isNull(); attrBuf.next(it))
2867 {
2868 AttributeHeader::init(&attrInfo[pos++], * it.data, 0);
2869 }
2870 LinearSectionPtr ptr[3];
2871 Uint32 noOfSections;
2872 ptr[0].p = attrInfo;
2873 ptr[0].sz = pos;
2874 noOfSections = 1;
2875 if (m_requestInfo & SubSyncReq::RangeScan)
2876 {
2877 jam();
2878 Uint32 oldpos = pos; // after attrInfo
2879 LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
2880 for (boundBuf.first(it); !it.curr.isNull(); boundBuf.next(it))
2881 {
2882 attrInfo[pos++] = *it.data;
2883 }
2884 ptr[1].p = &attrInfo[oldpos];
2885 ptr[1].sz = pos - oldpos;
2886 noOfSections = 2;
2887 }
2888 suma.sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
2889 ScanFragReq::SignalLength, JBB, ptr, noOfSections);
2890
2891 m_currentNoOfAttributes = attrBuf.getSize();
2892
2893 DBUG_VOID_RETURN;
2894 }
2895
2896
2897 void
execSCAN_FRAGREF(Signal * signal)2898 Suma::execSCAN_FRAGREF(Signal* signal){
2899 jamEntry();
2900
2901 // ScanFragRef * const ref = (ScanFragRef*)signal->getDataPtr();
2902 ndbrequire(false);
2903 }
2904
2905 void
execSCAN_FRAGCONF(Signal * signal)2906 Suma::execSCAN_FRAGCONF(Signal* signal){
2907 jamEntry();
2908 DBUG_ENTER("Suma::execSCAN_FRAGCONF");
2909 ndbassert(signal->getNoOfSections() == 0);
2910 CRASH_INSERTION(13011);
2911
2912 ScanFragConf * const conf = (ScanFragConf*)signal->getDataPtr();
2913
2914 const Uint32 completed = conf->fragmentCompleted;
2915 const Uint32 senderData = conf->senderData;
2916 const Uint32 completedOps = conf->completedOps;
2917
2918 Ptr<SyncRecord> syncPtr;
2919 c_syncPool.getPtr(syncPtr, senderData);
2920
2921 if(completed != 2){ // 2==ZSCAN_FRAG_CLOSED
2922 jam();
2923
2924 #if PRINT_ONLY
2925 SubSyncContinueConf * const conf =
2926 (SubSyncContinueConf*)signal->getDataPtrSend();
2927 conf->subscriptionId = subPtr.p->m_subscriptionId;
2928 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
2929 execSUB_SYNC_CONTINUE_CONF(signal);
2930 #else
2931 SubSyncContinueReq * const req = (SubSyncContinueReq*)signal->getDataPtrSend();
2932 req->subscriberData = syncPtr.p->m_senderData;
2933 req->noOfRowsSent = completedOps;
2934 req->senderData = senderData;
2935 sendSignal(syncPtr.p->m_senderRef, GSN_SUB_SYNC_CONTINUE_REQ, signal,
2936 SubSyncContinueReq::SignalLength, JBB);
2937 #endif
2938 DBUG_VOID_RETURN;
2939 }
2940
2941 ndbrequire(completedOps == 0);
2942
2943 syncPtr.p->m_currentFragment++;
2944 syncPtr.p->nextScan(signal);
2945 DBUG_VOID_RETURN;
2946 }
2947
2948 void
execSUB_SYNC_CONTINUE_CONF(Signal * signal)2949 Suma::execSUB_SYNC_CONTINUE_CONF(Signal* signal){
2950 jamEntry();
2951 ndbassert(signal->getNoOfSections() == 0);
2952
2953 CRASH_INSERTION(13012);
2954
2955 SubSyncContinueConf * const conf =
2956 (SubSyncContinueConf*)signal->getDataPtr();
2957
2958 SubscriptionPtr subPtr;
2959 Subscription key;
2960 key.m_subscriptionId = conf->subscriptionId;
2961 key.m_subscriptionKey = conf->subscriptionKey;
2962 Uint32 syncPtrI = conf->senderData;
2963
2964 ndbrequire(c_subscriptions.find(subPtr, key));
2965
2966 Uint32 instanceKey;
2967 {
2968 Ptr<SyncRecord> syncPtr;
2969 c_syncPool.getPtr(syncPtr, syncPtrI);
2970 LocalDataBuffer<15> fragBuf(c_dataBufferPool, syncPtr.p->m_fragments);
2971 DataBuffer<15>::DataBufferIterator fragIt;
2972 bool ok = fragBuf.position(fragIt, syncPtr.p->m_currentFragment);
2973 ndbrequire(ok);
2974 FragmentDescriptor tmp;
2975 tmp.m_dummy = * fragIt.data;
2976 instanceKey = tmp.m_fragDesc.m_lqhInstanceKey;
2977 }
2978 BlockReference lqhRef = numberToRef(DBLQH, instanceKey, getOwnNodeId());
2979
2980 ScanFragNextReq * req = (ScanFragNextReq *)signal->getDataPtrSend();
2981 req->senderData = syncPtrI;
2982 req->requestInfo = 0;
2983 req->transId1 = 0;
2984 req->transId2 = (SUMA << 20) + (getOwnNodeId() << 8);
2985 req->batch_size_rows = 16;
2986 req->batch_size_bytes = 0;
2987 sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
2988 ScanFragNextReq::SignalLength, JBB);
2989 }
2990
2991 void
completeScan(Signal * signal,int error)2992 Suma::SyncRecord::completeScan(Signal* signal, int error)
2993 {
2994 jam();
2995 DBUG_ENTER("Suma::SyncRecord::completeScan");
2996
2997 SubscriptionPtr subPtr;
2998 suma.c_subscriptionPool.getPtr(subPtr, m_subscriptionPtrI);
2999
3000 DihScanTabCompleteRep* rep = (DihScanTabCompleteRep*)signal->getDataPtr();
3001 rep->tableId = subPtr.p->m_tableId;
3002 rep->scanCookie = m_scan_cookie;
3003 suma.sendSignal(DBDIH_REF, GSN_DIH_SCAN_TAB_COMPLETE_REP, signal,
3004 DihScanTabCompleteRep::SignalLength, JBB);
3005
3006 #if PRINT_ONLY
3007 ndbout_c("GSN_SUB_SYNC_CONF (data)");
3008 #else
3009 if (error == 0)
3010 {
3011 SubSyncConf * const conf = (SubSyncConf*)signal->getDataPtrSend();
3012 conf->senderRef = suma.reference();
3013 conf->senderData = m_senderData;
3014 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_CONF, signal,
3015 SubSyncConf::SignalLength, JBB);
3016 }
3017 else
3018 {
3019 SubSyncRef * const ref = (SubSyncRef*)signal->getDataPtrSend();
3020 ref->senderRef = suma.reference();
3021 ref->senderData = m_senderData;
3022 suma.sendSignal(m_senderRef, GSN_SUB_SYNC_REF, signal,
3023 SubSyncRef::SignalLength, JBB);
3024 }
3025 #endif
3026
3027 release();
3028 LocalDLList<SyncRecord> list(suma.c_syncPool, subPtr.p->m_syncRecords);
3029 Ptr<SyncRecord> tmp;
3030 tmp.i = ptrI;
3031 tmp.p = this;
3032 list.release(tmp);
3033
3034 DBUG_VOID_RETURN;
3035 }
3036
3037 void
execSCAN_HBREP(Signal * signal)3038 Suma::execSCAN_HBREP(Signal* signal){
3039 jamEntry();
3040 #if 0
3041 ndbout << "execSCAN_HBREP" << endl << hex;
3042 for(int i = 0; i<signal->length(); i++){
3043 ndbout << signal->theData[i] << " ";
3044 if(((i + 1) % 8) == 0)
3045 ndbout << endl << hex;
3046 }
3047 ndbout << endl;
3048 #endif
3049 }
3050
3051 /**********************************************************
3052 *
3053 * Suma participant interface
3054 *
3055 * Creation of subscriber
3056 *
3057 */
3058
3059 void
execSUB_START_REQ(Signal * signal)3060 Suma::execSUB_START_REQ(Signal* signal){
3061 jamEntry();
3062 ndbassert(signal->getNoOfSections() == 0);
3063 DBUG_ENTER("Suma::execSUB_START_REQ");
3064 SubStartReq * const req = (SubStartReq*)signal->getDataPtr();
3065
3066 CRASH_INSERTION(13013);
3067 Uint32 senderRef = req->senderRef;
3068 Uint32 senderData = req->senderData;
3069 Uint32 subscriberData = req->subscriberData;
3070 Uint32 subscriberRef = req->subscriberRef;
3071 SubscriptionData::Part part = (SubscriptionData::Part)req->part;
3072 (void)part; // TODO validate part
3073
3074 Subscription key;
3075 key.m_subscriptionId = req->subscriptionId;
3076 key.m_subscriptionKey = req->subscriptionKey;
3077
3078 SubscriptionPtr subPtr;
3079
3080 CRASH_INSERTION2(13042, getNodeState().startLevel == NodeState::SL_STARTING);
3081
3082 if (c_startup.m_restart_server_node_id == RNIL)
3083 {
3084 jam();
3085
3086 /**
3087 * We havent started syncing yet
3088 */
3089 sendSubStartRef(signal,
3090 senderRef, senderData, SubStartRef::NotStarted);
3091 return;
3092 }
3093
3094 bool found = c_subscriptions.find(subPtr, key);
3095 if (!found)
3096 {
3097 jam();
3098 sendSubStartRef(signal,
3099 senderRef, senderData, SubStartRef::NoSuchSubscription);
3100 return;
3101 }
3102
3103 if (ERROR_INSERTED(13046))
3104 {
3105 jam();
3106 CLEAR_ERROR_INSERT_VALUE;
3107 sendSubStartRef(signal,
3108 senderRef, senderData, SubStartRef::NoSuchSubscription);
3109 return;
3110 }
3111
3112 switch(subPtr.p->m_state){
3113 case Subscription::UNDEFINED:
3114 jam();
3115 ndbrequire(false);
3116 case Subscription::DEFINING:
3117 jam();
3118 sendSubStartRef(signal,
3119 senderRef, senderData, SubStartRef::Defining);
3120 return;
3121 case Subscription::DEFINED:
3122 break;
3123 }
3124
3125 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
3126 {
3127 jam();
3128 if (c_startup.m_restart_server_node_id == 0)
3129 {
3130 sendSubStartRef(signal,
3131 senderRef, senderData, SubStartRef::Dropped);
3132 return;
3133 }
3134 else
3135 {
3136 /**
3137 * Allow SUB_START_REQ from peer node
3138 */
3139 }
3140 }
3141
3142 if (subPtr.p->m_trigger_state == Subscription::T_ERROR)
3143 {
3144 jam();
3145 sendSubStartRef(signal,
3146 senderRef, senderData, subPtr.p->m_errorCode);
3147 return;
3148 }
3149
3150 SubscriberPtr subbPtr;
3151 if(!c_subscriberPool.seize(subbPtr))
3152 {
3153 jam();
3154 sendSubStartRef(signal,
3155 senderRef, senderData, SubStartRef::OutOfSubscriberRecords);
3156 return;
3157 }
3158
3159 Ptr<SubOpRecord> subOpPtr;
3160 if (!c_subOpPool.seize(subOpPtr))
3161 {
3162 jam();
3163 c_subscriberPool.release(subbPtr);
3164 sendSubStartRef(signal,
3165 senderRef, senderData, SubStartRef::OutOfSubOpRecords);
3166 return;
3167 }
3168
3169 if (! check_sub_start(subscriberRef))
3170 {
3171 jam();
3172 c_subscriberPool.release(subbPtr);
3173 c_subOpPool.release(subOpPtr);
3174 sendSubStartRef(signal,
3175 senderRef, senderData, SubStartRef::NodeDied);
3176 return;
3177 }
3178
3179 // setup subscriber record
3180 subbPtr.p->m_senderRef = subscriberRef;
3181 subbPtr.p->m_senderData = subscriberData;
3182
3183 subOpPtr.p->m_opType = SubOpRecord::R_SUB_START_REQ;
3184 subOpPtr.p->m_subPtrI = subPtr.i;
3185 subOpPtr.p->m_senderRef = senderRef;
3186 subOpPtr.p->m_senderData = senderData;
3187 subOpPtr.p->m_subscriberRef = subbPtr.i;
3188
3189 {
3190 LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3191 subOpList.add(subOpPtr);
3192 }
3193
3194 /**
3195 * Check triggers
3196 */
3197 switch(subPtr.p->m_trigger_state){
3198 case Subscription::T_UNDEFINED:
3199 jam();
3200 /**
3201 * create triggers
3202 */
3203 create_triggers(signal, subPtr);
3204 break;
3205 case Subscription::T_CREATING:
3206 jam();
3207 /**
3208 * Triggers are already being created...wait for completion
3209 */
3210 return;
3211 case Subscription::T_DROPPING:
3212 jam();
3213 /**
3214 * Trigger(s) are being dropped...wait for completion
3215 * (and recreate them when done)
3216 */
3217 break;
3218 case Subscription::T_DEFINED:{
3219 jam();
3220 report_sub_start_conf(signal, subPtr);
3221 return;
3222 }
3223 case Subscription::T_ERROR:
3224 jam();
3225 ndbrequire(false); // Checked above
3226 break;
3227 }
3228 }
3229
3230 void
sendSubStartRef(Signal * signal,Uint32 dstref,Uint32 data,Uint32 err)3231 Suma::sendSubStartRef(Signal* signal, Uint32 dstref, Uint32 data, Uint32 err)
3232 {
3233 jam();
3234 SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
3235 ref->senderRef = reference();
3236 ref->senderData = data;
3237 ref->errorCode = err;
3238 sendSignal(dstref, GSN_SUB_START_REF, signal,
3239 SubStartRef::SignalLength, JBB);
3240 }
3241
3242 void
create_triggers(Signal * signal,SubscriptionPtr subPtr)3243 Suma::create_triggers(Signal* signal, SubscriptionPtr subPtr)
3244 {
3245 jam();
3246
3247 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_UNDEFINED);
3248 subPtr.p->m_trigger_state = Subscription::T_CREATING;
3249
3250 TablePtr tabPtr;
3251 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3252
3253 AttributeMask attrMask;
3254 tabPtr.p->createAttributeMask(attrMask, *this);
3255
3256 subPtr.p->m_outstanding_trigger = 3;
3257 for(Uint32 j = 0; j<3; j++)
3258 {
3259 Uint32 triggerId = (tabPtr.p->m_schemaVersion << 18) | (j << 16) | subPtr.i;
3260 ndbrequire(subPtr.p->m_triggers[j] == ILLEGAL_TRIGGER_ID);
3261
3262 CreateTrigImplReq * const req =
3263 (CreateTrigImplReq*)signal->getDataPtrSend();
3264 req->senderRef = SUMA_REF;
3265 req->senderData = subPtr.i;
3266 req->requestType = 0;
3267
3268 Uint32 ti = 0;
3269 TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3270 TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3271 TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3272 TriggerInfo::setMonitorReplicas(ti, true);
3273 //TriggerInfo::setMonitorAllAttributes(ti, j == TriggerEvent::TE_DELETE);
3274 TriggerInfo::setMonitorAllAttributes(ti, true);
3275 TriggerInfo::setReportAllMonitoredAttributes(ti,
3276 subPtr.p->m_options & Subscription::REPORT_ALL);
3277 req->triggerInfo = ti;
3278
3279 req->receiverRef = SUMA_REF;
3280 req->triggerId = triggerId;
3281 req->tableId = subPtr.p->m_tableId;
3282 req->tableVersion = 0; // not used
3283 req->indexId = ~(Uint32)0;
3284 req->indexVersion = 0;
3285
3286 LinearSectionPtr ptr[3];
3287 ptr[0].p = attrMask.rep.data;
3288 ptr[0].sz = attrMask.getSizeInWords();
3289 sendSignal(DBTUP_REF, GSN_CREATE_TRIG_IMPL_REQ,
3290 signal, CreateTrigImplReq::SignalLength, JBB, ptr, 1);
3291 }
3292 }
3293
3294 void
execCREATE_TRIG_IMPL_CONF(Signal * signal)3295 Suma::execCREATE_TRIG_IMPL_CONF(Signal* signal)
3296 {
3297 jamEntry();
3298
3299 CreateTrigImplConf * conf = (CreateTrigImplConf*)signal->getDataPtr();
3300 const Uint32 triggerId = conf->triggerId;
3301 Uint32 type = (triggerId >> 16) & 0x3;
3302 Uint32 tableId = conf->tableId;
3303
3304 TablePtr tabPtr;
3305 SubscriptionPtr subPtr;
3306 c_subscriptions.getPtr(subPtr, conf->senderData);
3307 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3308
3309 ndbrequire(tabPtr.p->m_tableId == tableId);
3310 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3311
3312 ndbrequire(type < 3);
3313 ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3314 subPtr.p->m_triggers[type] = triggerId;
3315
3316 ndbrequire(subPtr.p->m_outstanding_trigger);
3317 subPtr.p->m_outstanding_trigger--;
3318
3319 if (subPtr.p->m_outstanding_trigger)
3320 {
3321 jam();
3322 /**
3323 * Wait for more
3324 */
3325 return;
3326 }
3327
3328 if (subPtr.p->m_errorCode == 0)
3329 {
3330 jam();
3331 subPtr.p->m_trigger_state = Subscription::T_DEFINED;
3332 report_sub_start_conf(signal, subPtr);
3333 }
3334 else
3335 {
3336 jam();
3337 subPtr.p->m_trigger_state = Subscription::T_ERROR;
3338 drop_triggers(signal, subPtr);
3339 }
3340 }
3341
3342 void
execCREATE_TRIG_IMPL_REF(Signal * signal)3343 Suma::execCREATE_TRIG_IMPL_REF(Signal* signal)
3344 {
3345 jamEntry();
3346
3347 CreateTrigImplRef * const ref = (CreateTrigImplRef*)signal->getDataPtr();
3348 const Uint32 triggerId = ref->triggerId;
3349 Uint32 type = (triggerId >> 16) & 0x3;
3350 Uint32 tableId = ref->tableId;
3351
3352 TablePtr tabPtr;
3353 SubscriptionPtr subPtr;
3354 c_subscriptions.getPtr(subPtr, ref->senderData);
3355 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3356
3357 ndbrequire(tabPtr.p->m_tableId == tableId);
3358 ndbrequire(subPtr.p->m_trigger_state == Subscription::T_CREATING);
3359
3360 ndbrequire(type < 3);
3361 ndbrequire(subPtr.p->m_triggers[type] == ILLEGAL_TRIGGER_ID);
3362
3363 subPtr.p->m_errorCode = ref->errorCode;
3364
3365 ndbrequire(subPtr.p->m_outstanding_trigger);
3366 subPtr.p->m_outstanding_trigger--;
3367
3368 if (subPtr.p->m_outstanding_trigger)
3369 {
3370 jam();
3371 /**
3372 * Wait for more
3373 */
3374 return;
3375 }
3376
3377 subPtr.p->m_trigger_state = Subscription::T_ERROR;
3378 drop_triggers(signal, subPtr);
3379 }
3380
3381 bool
check_sub_start(Uint32 subscriberRef)3382 Suma::check_sub_start(Uint32 subscriberRef)
3383 {
3384 Uint32 nodeId = refToNode(subscriberRef);
3385 bool startme = c_startup.m_restart_server_node_id;
3386 bool handover = c_startup.m_wait_handover;
3387 bool connected =
3388 c_failedApiNodes.get(nodeId) == false &&
3389 c_connected_nodes.get(nodeId);
3390
3391 return (startme || handover || connected);
3392 }
3393
3394 void
report_sub_start_conf(Signal * signal,Ptr<Subscription> subPtr)3395 Suma::report_sub_start_conf(Signal* signal, Ptr<Subscription> subPtr)
3396 {
3397 const Uint64 gci = get_current_gci(signal);
3398 {
3399 LocalDLList<Subscriber> list(c_subscriberPool,
3400 subPtr.p->m_subscribers);
3401 LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3402
3403 Ptr<Subscriber> ptr;
3404 Ptr<SubOpRecord> subOpPtr;
3405 for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3406 {
3407 jam();
3408
3409 Uint32 senderRef = subOpPtr.p->m_senderRef;
3410 Uint32 senderData = subOpPtr.p->m_senderData;
3411 c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3412
3413 if (check_sub_start(ptr.p->m_senderRef))
3414 {
3415 SubStartConf* conf = (SubStartConf*)signal->getDataPtrSend();
3416 conf->senderRef = reference();
3417 conf->senderData = senderData;
3418 conf->subscriptionId = subPtr.p->m_subscriptionId;
3419 conf->subscriptionKey = subPtr.p->m_subscriptionKey;
3420 conf->firstGCI = Uint32(gci >> 32);
3421 conf->part = SubscriptionData::TableData;
3422 conf->bucketCount = c_no_of_buckets;
3423 conf->nodegroup = c_nodeGroup;
3424 sendSignal(senderRef, GSN_SUB_START_CONF, signal,
3425 SubStartConf::SignalLength, JBB);
3426
3427 /**
3428 * Call before adding to list...
3429 * cause method will (maybe) iterate thought list
3430 */
3431 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3432 send_sub_start_stop_event(signal, ptr,NdbDictionary::Event::_TE_ACTIVE,
3433 report, list);
3434
3435 list.add(ptr);
3436 c_subscriber_nodes.set(refToNode(ptr.p->m_senderRef));
3437 c_subscriber_per_node[refToNode(ptr.p->m_senderRef)]++;
3438 }
3439 else
3440 {
3441 jam();
3442
3443 sendSubStartRef(signal,
3444 senderRef, senderData, SubStartRef::NodeDied);
3445
3446 c_subscriberPool.release(ptr);
3447 }
3448
3449 Ptr<SubOpRecord> tmp = subOpPtr;
3450 subOpList.next(subOpPtr);
3451 subOpList.release(tmp);
3452 }
3453 }
3454
3455 check_release_subscription(signal, subPtr);
3456 }
3457
3458 void
report_sub_start_ref(Signal * signal,Ptr<Subscription> subPtr,Uint32 errCode)3459 Suma::report_sub_start_ref(Signal* signal,
3460 Ptr<Subscription> subPtr,
3461 Uint32 errCode)
3462 {
3463 LocalDLList<Subscriber> list(c_subscriberPool,
3464 subPtr.p->m_subscribers);
3465 LocalDLFifoList<SubOpRecord> subOpList(c_subOpPool, subPtr.p->m_start_req);
3466
3467 Ptr<Subscriber> ptr;
3468 Ptr<SubOpRecord> subOpPtr;
3469 for (subOpList.first(subOpPtr); !subOpPtr.isNull(); )
3470 {
3471 jam();
3472
3473 Uint32 senderRef = subOpPtr.p->m_senderRef;
3474 Uint32 senderData = subOpPtr.p->m_senderData;
3475 c_subscriberPool.getPtr(ptr, subOpPtr.p->m_subscriberRef);
3476
3477 SubStartRef* ref = (SubStartRef*)signal->getDataPtrSend();
3478 ref->senderRef = reference();
3479 ref->senderData = senderData;
3480 ref->errorCode = errCode;
3481
3482 sendSignal(senderRef, GSN_SUB_START_REF, signal,
3483 SubStartConf::SignalLength, JBB);
3484
3485
3486 Ptr<SubOpRecord> tmp = subOpPtr;
3487 subOpList.next(subOpPtr);
3488 subOpList.release(tmp);
3489 c_subscriberPool.release(ptr);
3490 }
3491 }
3492
3493 void
drop_triggers(Signal * signal,SubscriptionPtr subPtr)3494 Suma::drop_triggers(Signal* signal, SubscriptionPtr subPtr)
3495 {
3496 jam();
3497
3498 subPtr.p->m_outstanding_trigger = 0;
3499
3500 Ptr<Table> tabPtr;
3501 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3502 if (tabPtr.p->m_state == Table::DROPPED)
3503 {
3504 jam();
3505 subPtr.p->m_triggers[0] = ILLEGAL_TRIGGER_ID;
3506 subPtr.p->m_triggers[1] = ILLEGAL_TRIGGER_ID;
3507 subPtr.p->m_triggers[2] = ILLEGAL_TRIGGER_ID;
3508 }
3509 else
3510 {
3511 for(Uint32 j = 0; j<3; j++)
3512 {
3513 jam();
3514 Uint32 triggerId = subPtr.p->m_triggers[j];
3515 if (triggerId != ILLEGAL_TRIGGER_ID)
3516 {
3517 subPtr.p->m_outstanding_trigger++;
3518
3519 DropTrigImplReq * const req =
3520 (DropTrigImplReq*)signal->getDataPtrSend();
3521 req->senderRef = SUMA_REF; // Sending to myself
3522 req->senderData = subPtr.i;
3523 req->requestType = 0;
3524
3525 // TUP needs some triggerInfo to find right list
3526 Uint32 ti = 0;
3527 TriggerInfo::setTriggerType(ti, TriggerType::SUBSCRIPTION_BEFORE);
3528 TriggerInfo::setTriggerActionTime(ti, TriggerActionTime::TA_DETACHED);
3529 TriggerInfo::setTriggerEvent(ti, (TriggerEvent::Value)j);
3530 TriggerInfo::setMonitorReplicas(ti, true);
3531 //TriggerInfo::setMonitorAllAttributes(ti, j ==TriggerEvent::TE_DELETE);
3532 TriggerInfo::setMonitorAllAttributes(ti, true);
3533 TriggerInfo::setReportAllMonitoredAttributes(ti,
3534 subPtr.p->m_options & Subscription::REPORT_ALL);
3535 req->triggerInfo = ti;
3536
3537 req->tableId = subPtr.p->m_tableId;
3538 req->tableVersion = 0; // not used
3539 req->indexId = RNIL;
3540 req->indexVersion = 0;
3541 req->triggerId = triggerId;
3542 req->receiverRef = SUMA_REF;
3543
3544 c_outstanding_drop_trig_req++;
3545 sendSignal(DBTUP_REF, GSN_DROP_TRIG_IMPL_REQ,
3546 signal, DropTrigImplReq::SignalLength, JBB);
3547 }
3548 }
3549 }
3550
3551 if (subPtr.p->m_outstanding_trigger == 0)
3552 {
3553 jam();
3554 drop_triggers_complete(signal, subPtr);
3555 }
3556 }
3557
3558 void
execDROP_TRIG_IMPL_REF(Signal * signal)3559 Suma::execDROP_TRIG_IMPL_REF(Signal* signal)
3560 {
3561 jamEntry();
3562 DropTrigImplRef * const ref = (DropTrigImplRef*)signal->getDataPtr();
3563 Ptr<Table> tabPtr;
3564 Ptr<Subscription> subPtr;
3565 const Uint32 triggerId = ref->triggerId;
3566 const Uint32 type = (triggerId >> 16) & 0x3;
3567
3568 c_subscriptionPool.getPtr(subPtr, ref->senderData);
3569 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3570 ndbrequire(tabPtr.p->m_tableId == ref->tableId);
3571
3572 ndbrequire(type < 3);
3573 ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3574 subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3575
3576 ndbrequire(subPtr.p->m_outstanding_trigger);
3577 subPtr.p->m_outstanding_trigger--;
3578
3579 ndbrequire(c_outstanding_drop_trig_req);
3580 c_outstanding_drop_trig_req--;
3581
3582 if (subPtr.p->m_outstanding_trigger)
3583 {
3584 jam();
3585 /**
3586 * Wait for more
3587 */
3588 return;
3589 }
3590
3591 drop_triggers_complete(signal, subPtr);
3592 }
3593
3594 void
execDROP_TRIG_IMPL_CONF(Signal * signal)3595 Suma::execDROP_TRIG_IMPL_CONF(Signal* signal)
3596 {
3597 jamEntry();
3598
3599 DropTrigImplConf * const conf = (DropTrigImplConf*)signal->getDataPtr();
3600
3601 Ptr<Table> tabPtr;
3602 Ptr<Subscription> subPtr;
3603 const Uint32 triggerId = conf->triggerId;
3604 const Uint32 type = (triggerId >> 16) & 0x3;
3605
3606 c_subscriptionPool.getPtr(subPtr, conf->senderData);
3607 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
3608 ndbrequire(tabPtr.p->m_tableId == conf->tableId);
3609
3610 ndbrequire(type < 3);
3611 ndbrequire(subPtr.p->m_triggers[type] != ILLEGAL_TRIGGER_ID);
3612 subPtr.p->m_triggers[type] = ILLEGAL_TRIGGER_ID;
3613
3614 ndbrequire(subPtr.p->m_outstanding_trigger);
3615 subPtr.p->m_outstanding_trigger--;
3616
3617 ndbrequire(c_outstanding_drop_trig_req);
3618 c_outstanding_drop_trig_req--;
3619
3620 if (subPtr.p->m_outstanding_trigger)
3621 {
3622 jam();
3623 /**
3624 * Wait for more
3625 */
3626 return;
3627 }
3628
3629 drop_triggers_complete(signal, subPtr);
3630 }
3631
3632 void
drop_triggers_complete(Signal * signal,Ptr<Subscription> subPtr)3633 Suma::drop_triggers_complete(Signal* signal, Ptr<Subscription> subPtr)
3634 {
3635 switch(subPtr.p->m_trigger_state){
3636 case Subscription::T_UNDEFINED:
3637 case Subscription::T_CREATING:
3638 case Subscription::T_DEFINED:
3639 jam();
3640 ndbrequire(false);
3641 break;
3642 case Subscription::T_DROPPING:
3643 jam();
3644 /**
3645 */
3646 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3647 if (!subPtr.p->m_start_req.isEmpty())
3648 {
3649 jam();
3650 create_triggers(signal, subPtr);
3651 return;
3652 }
3653 break;
3654 case Subscription::T_ERROR:
3655 jam();
3656 Uint32 err = subPtr.p->m_errorCode;
3657 subPtr.p->m_trigger_state = Subscription::T_UNDEFINED;
3658 subPtr.p->m_errorCode = 0;
3659 report_sub_start_ref(signal, subPtr, err);
3660 break;
3661 }
3662
3663 check_release_subscription(signal, subPtr);
3664 }
3665
3666 /**********************************************************
3667 * Suma participant interface
3668 *
3669 * Stopping and removing of subscriber
3670 *
3671 */
3672
3673 void
execSUB_STOP_REQ(Signal * signal)3674 Suma::execSUB_STOP_REQ(Signal* signal){
3675 jamEntry();
3676 ndbassert(signal->getNoOfSections() == 0);
3677 DBUG_ENTER("Suma::execSUB_STOP_REQ");
3678
3679 CRASH_INSERTION(13019);
3680
3681 SubStopReq * const req = (SubStopReq*)signal->getDataPtr();
3682 Uint32 senderRef = req->senderRef;
3683 Uint32 senderData = req->senderData;
3684 Uint32 subscriberRef = req->subscriberRef;
3685 Uint32 subscriberData = req->subscriberData;
3686 SubscriptionPtr subPtr;
3687 Subscription key;
3688 key.m_subscriptionId = req->subscriptionId;
3689 key.m_subscriptionKey = req->subscriptionKey;
3690 bool abortStart = (req->requestInfo & SubStopReq::RI_ABORT_START);
3691
3692 if (c_startup.m_restart_server_node_id == RNIL)
3693 {
3694 jam();
3695
3696 /**
3697 * We havent started syncing yet
3698 */
3699 sendSubStopRef(signal,
3700 senderRef, senderData, SubStopRef::NotStarted);
3701 return;
3702 }
3703
3704 bool found = c_subscriptions.find(subPtr, key);
3705 if (!found)
3706 {
3707 jam();
3708 sendSubStopRef(signal,
3709 senderRef, senderData, SubStopRef::NoSuchSubscription);
3710 return;
3711 }
3712
3713 switch(subPtr.p->m_state){
3714 case Subscription::UNDEFINED:
3715 jam();
3716 ndbrequire(false);
3717 case Subscription::DEFINING:
3718 jam();
3719 sendSubStopRef(signal,
3720 senderRef, senderData, SubStopRef::Defining);
3721 return;
3722 case Subscription::DEFINED:
3723 jam();
3724 break;
3725 }
3726
3727 Ptr<SubOpRecord> subOpPtr;
3728 LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3729 bool empty = list.isEmpty();
3730 if (list.seize(subOpPtr) == false)
3731 {
3732 jam();
3733 sendSubStopRef(signal,
3734 senderRef, senderData, SubStopRef::OutOfSubOpRecords);
3735 return;
3736 }
3737
3738 if (abortStart)
3739 {
3740 jam();
3741 subOpPtr.p->m_opType = SubOpRecord::R_SUB_ABORT_START_REQ;
3742 }
3743 else
3744 {
3745 jam();
3746 subOpPtr.p->m_opType = SubOpRecord::R_SUB_STOP_REQ;
3747 }
3748 subOpPtr.p->m_subPtrI = subPtr.i;
3749 subOpPtr.p->m_senderRef = senderRef;
3750 subOpPtr.p->m_senderData = senderData;
3751 subOpPtr.p->m_subscriberRef = subscriberRef;
3752 subOpPtr.p->m_subscriberData = subscriberData;
3753
3754
3755 if (empty)
3756 {
3757 jam();
3758 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3759 signal->theData[1] = subOpPtr.i;
3760 signal->theData[2] = RNIL;
3761 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3762 }
3763 }
3764
3765 void
sub_stop_req(Signal * signal)3766 Suma::sub_stop_req(Signal* signal)
3767 {
3768 jam();
3769
3770 Ptr<SubOpRecord> subOpPtr;
3771 c_subOpPool.getPtr(subOpPtr, signal->theData[1]);
3772
3773 Ptr<Subscription> subPtr;
3774 c_subscriptionPool.getPtr(subPtr, subOpPtr.p->m_subPtrI);
3775
3776 Ptr<Subscriber> ptr;
3777 {
3778 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3779 if (signal->theData[2] == RNIL)
3780 {
3781 jam();
3782 list.first(ptr);
3783 }
3784 else
3785 {
3786 jam();
3787 list.getPtr(ptr, signal->theData[2]);
3788 }
3789
3790 for (Uint32 i = 0; i<32 && !ptr.isNull(); i++, list.next(ptr))
3791 {
3792 if (ptr.p->m_senderRef == subOpPtr.p->m_subscriberRef &&
3793 ptr.p->m_senderData == subOpPtr.p->m_subscriberData)
3794 {
3795 jam();
3796 goto found;
3797 }
3798 }
3799 }
3800
3801 if (ptr.isNull())
3802 {
3803 jam();
3804 sendSubStopRef(signal,
3805 subOpPtr.p->m_senderRef,
3806 subOpPtr.p->m_senderData,
3807 SubStopRef::NoSuchSubscriber);
3808 check_remove_queue(signal, subPtr, subOpPtr, true, true);
3809 return;
3810 }
3811
3812 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3813 signal->theData[1] = subOpPtr.i;
3814 signal->theData[2] = ptr.i;
3815 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3816 return;
3817
3818 found:
3819 {
3820 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
3821 list.remove(ptr);
3822 /**
3823 * NOTE: remove before...so we done send UNSUBSCRIBE to self (yuck)
3824 */
3825 bool report = subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE;
3826 report_sub_stop_conf(signal, subOpPtr, ptr, report, list);
3827 c_subscriberPool.release(ptr);
3828 }
3829 check_remove_queue(signal, subPtr, subOpPtr, true, true);
3830 check_release_subscription(signal, subPtr);
3831 }
3832
3833 void
check_remove_queue(Signal * signal,Ptr<Subscription> subPtr,Ptr<SubOpRecord> subOpPtr,bool ishead,bool dorelease)3834 Suma::check_remove_queue(Signal* signal,
3835 Ptr<Subscription> subPtr,
3836 Ptr<SubOpRecord> subOpPtr,
3837 bool ishead,
3838 bool dorelease)
3839 {
3840 LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
3841
3842 {
3843 Ptr<SubOpRecord> tmp;
3844 list.first(tmp);
3845 if (ishead)
3846 {
3847 jam();
3848 ndbrequire(tmp.i == subOpPtr.i);
3849 }
3850 else
3851 {
3852 jam();
3853 ishead = (tmp.i == subOpPtr.i);
3854 }
3855 }
3856
3857 if (dorelease)
3858 {
3859 jam();
3860 list.release(subOpPtr);
3861 }
3862 else
3863 {
3864 jam();
3865 list.remove(subOpPtr);
3866 }
3867
3868 if (ishead)
3869 {
3870 jam();
3871 if (list.first(subOpPtr) == false)
3872 {
3873 jam();
3874 c_restart.m_waiting_on_self = 1;
3875 return;
3876 }
3877 // Fall through
3878 }
3879 else
3880 {
3881 jam();
3882 return;
3883 }
3884
3885 switch(subOpPtr.p->m_opType){
3886 case SubOpRecord::R_SUB_ABORT_START_REQ:
3887 case SubOpRecord::R_SUB_STOP_REQ:
3888 jam();
3889 signal->theData[0] = SumaContinueB::SUB_STOP_REQ;
3890 signal->theData[1] = subOpPtr.i;
3891 signal->theData[2] = RNIL;
3892 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3893 return;
3894 case SubOpRecord::R_API_FAIL_REQ:
3895 jam();
3896 signal->theData[0] = SumaContinueB::API_FAIL_SUBSCRIPTION;
3897 signal->theData[1] = subOpPtr.i;
3898 signal->theData[2] = RNIL;
3899 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 3, JBB);
3900 return;
3901 case SubOpRecord::R_START_ME_REQ:
3902 jam();
3903 sendSubCreateReq(signal, subPtr);
3904 return;
3905 }
3906 }
3907
3908 void
report_sub_stop_conf(Signal * signal,Ptr<SubOpRecord> subOpPtr,Ptr<Subscriber> ptr,bool report,LocalDLList<Subscriber> & list)3909 Suma::report_sub_stop_conf(Signal* signal,
3910 Ptr<SubOpRecord> subOpPtr,
3911 Ptr<Subscriber> ptr,
3912 bool report,
3913 LocalDLList<Subscriber>& list)
3914 {
3915 jam();
3916 CRASH_INSERTION(13020);
3917
3918 Uint32 senderRef = subOpPtr.p->m_senderRef;
3919 Uint32 senderData = subOpPtr.p->m_senderData;
3920 bool abortStart = subOpPtr.p->m_opType == SubOpRecord::R_SUB_ABORT_START_REQ;
3921
3922 // let subscriber know that subscrber is stopped
3923 if (!abortStart)
3924 {
3925 jam();
3926 send_sub_start_stop_event(signal, ptr, NdbDictionary::Event::_TE_STOP,
3927 report, list);
3928 }
3929
3930 SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
3931 const Uint64 gci = m_max_seen_gci;
3932 conf->senderRef= reference();
3933 conf->senderData= senderData;
3934 conf->gci_hi= Uint32(gci>>32);
3935 conf->gci_lo= Uint32(gci);
3936 sendSignal(senderRef, GSN_SUB_STOP_CONF, signal,
3937 SubStopConf::SignalLength, JBB);
3938
3939 Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3940 if (c_subscriber_per_node[nodeId])
3941 {
3942 c_subscriber_per_node[nodeId]--;
3943 if (c_subscriber_per_node[nodeId] == 0)
3944 {
3945 jam();
3946 c_subscriber_nodes.clear(nodeId);
3947 }
3948 }
3949 }
3950
3951 void
sendSubStopRef(Signal * signal,Uint32 retref,Uint32 data,Uint32 errCode)3952 Suma::sendSubStopRef(Signal* signal,
3953 Uint32 retref,
3954 Uint32 data,
3955 Uint32 errCode)
3956 {
3957 jam();
3958 SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
3959 ref->senderRef = reference();
3960 ref->errorCode = errCode;
3961 ref->senderData = data;
3962 sendSignal(retref, GSN_SUB_STOP_REF, signal, SubStopRef::SignalLength, JBB);
3963 }
3964
3965 // report new started subscriber to all other subscribers
3966 void
send_sub_start_stop_event(Signal * signal,Ptr<Subscriber> ptr,NdbDictionary::Event::_TableEvent event,bool report,LocalDLList<Subscriber> & list)3967 Suma::send_sub_start_stop_event(Signal *signal,
3968 Ptr<Subscriber> ptr,
3969 NdbDictionary::Event::_TableEvent event,
3970 bool report,
3971 LocalDLList<Subscriber>& list)
3972 {
3973 const Uint64 gci = get_current_gci(signal);
3974 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
3975 Uint32 nodeId = refToNode(ptr.p->m_senderRef);
3976
3977 NdbDictionary::Event::_TableEvent other;
3978 if (event == NdbDictionary::Event::_TE_STOP)
3979 {
3980 other = NdbDictionary::Event::_TE_UNSUBSCRIBE;
3981 }
3982 else if (event == NdbDictionary::Event::_TE_ACTIVE)
3983 {
3984 other = NdbDictionary::Event::_TE_SUBSCRIBE;
3985 }
3986 else
3987 {
3988 jamLine(event);
3989 ndbrequire(false);
3990 }
3991
3992 data->gci_hi = Uint32(gci >> 32);
3993 data->gci_lo = Uint32(gci);
3994 data->tableId = 0;
3995 data->requestInfo = 0;
3996 SubTableData::setOperation(data->requestInfo, event);
3997 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
3998 SubTableData::setReqNodeId(data->requestInfo, nodeId);
3999 data->changeMask = 0;
4000 data->totalLen = 0;
4001 data->senderData = ptr.p->m_senderData;
4002 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4003 SubTableData::SignalLength, JBB);
4004
4005 if (report == false)
4006 {
4007 return;
4008 }
4009
4010 data->requestInfo = 0;
4011 SubTableData::setOperation(data->requestInfo, other);
4012 SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
4013
4014 Ptr<Subscriber> tmp;
4015 for(list.first(tmp); !tmp.isNull(); list.next(tmp))
4016 {
4017 jam();
4018 SubTableData::setReqNodeId(data->requestInfo, nodeId);
4019 data->senderData = tmp.p->m_senderData;
4020 sendSignal(tmp.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4021 SubTableData::SignalLength, JBB);
4022
4023 ndbassert(tmp.i != ptr.i); // ptr should *NOT* be in list now
4024 if (other != NdbDictionary::Event::_TE_UNSUBSCRIBE)
4025 {
4026 jam();
4027 SubTableData::setReqNodeId(data->requestInfo,
4028 refToNode(tmp.p->m_senderRef));
4029
4030 data->senderData = ptr.p->m_senderData;
4031 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4032 SubTableData::SignalLength, JBB);
4033 }
4034 }
4035 }
4036
4037 void
createAttributeMask(AttributeMask & mask,Suma & suma)4038 Suma::Table::createAttributeMask(AttributeMask& mask,
4039 Suma &suma)
4040 {
4041 mask.clear();
4042 for(Uint32 i = 0; i<m_noOfAttributes; i++)
4043 mask.set(i);
4044 }
4045
suma_ndbrequire(bool v)4046 void Suma::suma_ndbrequire(bool v) { ndbrequire(v); }
4047
4048
4049 /**********************************************************
4050 * Scan data interface
4051 *
4052 * Assumption: one execTRANSID_AI contains all attr info
4053 *
4054 */
4055
4056 #define SUMA_BUF_SZ1 MAX_KEY_SIZE_IN_WORDS + MAX_TUPLE_SIZE_IN_WORDS
4057 #define SUMA_BUF_SZ MAX_ATTRIBUTES_IN_TABLE + SUMA_BUF_SZ1
4058
4059 static Uint32 f_bufferLock = 0;
4060 static Uint32 f_buffer[SUMA_BUF_SZ];
4061 static Uint32 f_trigBufferSize = 0;
4062 static Uint32 b_bufferLock = 0;
4063 static Uint32 b_buffer[SUMA_BUF_SZ];
4064 static Uint32 b_trigBufferSize = 0;
4065
4066 void
execTRANSID_AI(Signal * signal)4067 Suma::execTRANSID_AI(Signal* signal)
4068 {
4069 jamEntry();
4070 DBUG_ENTER("Suma::execTRANSID_AI");
4071
4072 CRASH_INSERTION(13015);
4073 TransIdAI * const data = (TransIdAI*)signal->getDataPtr();
4074 const Uint32 opPtrI = data->connectPtr;
4075 Uint32 length = signal->length() - 3;
4076
4077 if(f_bufferLock == 0){
4078 f_bufferLock = opPtrI;
4079 } else {
4080 ndbrequire(f_bufferLock == opPtrI);
4081 }
4082
4083 if (signal->getNoOfSections())
4084 {
4085 SectionHandle handle(this, signal);
4086 SegmentedSectionPtr dataPtr;
4087 handle.getSection(dataPtr, 0);
4088 length = dataPtr.sz;
4089 copy(data->attrData, dataPtr);
4090 releaseSections(handle);
4091 }
4092
4093 Ptr<SyncRecord> syncPtr;
4094 c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4095
4096 Uint32 sum = 0;
4097 Uint32 * dst = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4098 Uint32 * headers = f_buffer;
4099 const Uint32 * src = &data->attrData[0];
4100 const Uint32 * const end = &src[length];
4101
4102 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4103 for(Uint32 i = 0; i<attribs; i++){
4104 Uint32 tmp = * src++;
4105 * headers++ = tmp;
4106 Uint32 len = AttributeHeader::getDataSize(tmp);
4107
4108 memcpy(dst, src, 4 * len);
4109 dst += len;
4110 src += len;
4111 sum += len;
4112 }
4113 f_trigBufferSize = sum;
4114
4115 ndbrequire(src == end);
4116
4117 if ((syncPtr.p->m_requestInfo & SubSyncReq::LM_Exclusive) == 0)
4118 {
4119 sendScanSubTableData(signal, syncPtr, 0);
4120 }
4121
4122 DBUG_VOID_RETURN;
4123 }
4124
4125 void
execKEYINFO20(Signal * signal)4126 Suma::execKEYINFO20(Signal* signal)
4127 {
4128 jamEntry();
4129 KeyInfo20* data = (KeyInfo20*)signal->getDataPtr();
4130
4131 const Uint32 opPtrI = data->clientOpPtr;
4132 const Uint32 takeOver = data->scanInfo_Node;
4133
4134 ndbrequire(f_bufferLock == opPtrI);
4135
4136 Ptr<SyncRecord> syncPtr;
4137 c_syncPool.getPtr(syncPtr, (opPtrI >> 16));
4138 sendScanSubTableData(signal, syncPtr, takeOver);
4139 }
4140
4141 void
sendScanSubTableData(Signal * signal,Ptr<SyncRecord> syncPtr,Uint32 takeOver)4142 Suma::sendScanSubTableData(Signal* signal,
4143 Ptr<SyncRecord> syncPtr, Uint32 takeOver)
4144 {
4145 const Uint32 attribs = syncPtr.p->m_currentNoOfAttributes;
4146 const Uint32 sum = f_trigBufferSize;
4147
4148 /**
4149 * Send data to subscriber
4150 */
4151 LinearSectionPtr ptr[3];
4152 ptr[0].p = f_buffer;
4153 ptr[0].sz = attribs;
4154
4155 ptr[1].p = f_buffer + MAX_ATTRIBUTES_IN_TABLE;
4156 ptr[1].sz = sum;
4157
4158 SubscriptionPtr subPtr;
4159 c_subscriptions.getPtr(subPtr, syncPtr.p->m_subscriptionPtrI);
4160
4161
4162 /**
4163 * Initialize signal
4164 */
4165 SubTableData * sdata = (SubTableData*)signal->getDataPtrSend();
4166 Uint32 ref = syncPtr.p->m_senderRef;
4167 sdata->tableId = syncPtr.p->m_tableId;
4168 sdata->senderData = syncPtr.p->m_senderData;
4169 sdata->requestInfo = 0;
4170 SubTableData::setOperation(sdata->requestInfo,
4171 NdbDictionary::Event::_TE_SCAN); // Scan
4172 sdata->gci_hi = 0; // Undefined
4173 sdata->gci_lo = 0;
4174 sdata->takeOver = takeOver;
4175 #if PRINT_ONLY
4176 ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
4177 #else
4178 sendSignal(ref,
4179 GSN_SUB_TABLE_DATA,
4180 signal,
4181 SubTableData::SignalLength, JBB,
4182 ptr, 2);
4183 #endif
4184
4185 /**
4186 * Reset f_bufferLock
4187 */
4188 f_bufferLock = 0;
4189 }
4190
4191 /**********************************************************
4192 *
4193 * Trigger data interface
4194 *
4195 */
4196
4197 void
execTRIG_ATTRINFO(Signal * signal)4198 Suma::execTRIG_ATTRINFO(Signal* signal)
4199 {
4200 jamEntry();
4201 DBUG_ENTER("Suma::execTRIG_ATTRINFO");
4202
4203 CRASH_INSERTION(13016);
4204 TrigAttrInfo* const trg = (TrigAttrInfo*)signal->getDataPtr();
4205 const Uint32 trigId = trg->getTriggerId();
4206
4207 const Uint32 dataLen = signal->length() - TrigAttrInfo::StaticLength;
4208
4209 if(trg->getAttrInfoType() == TrigAttrInfo::BEFORE_VALUES){
4210 jam();
4211
4212 ndbrequire(b_bufferLock == trigId);
4213
4214 memcpy(b_buffer + b_trigBufferSize, trg->getData(), 4 * dataLen);
4215 b_trigBufferSize += dataLen;
4216
4217 // printf("before values %u %u %u\n",trigId, dataLen, b_trigBufferSize);
4218 } else {
4219 jam();
4220
4221 if(f_bufferLock == 0){
4222 f_bufferLock = trigId;
4223 f_trigBufferSize = 0;
4224 b_bufferLock = trigId;
4225 b_trigBufferSize = 0;
4226 } else {
4227 ndbrequire(f_bufferLock == trigId);
4228 }
4229
4230 memcpy(f_buffer + f_trigBufferSize, trg->getData(), 4 * dataLen);
4231 f_trigBufferSize += dataLen;
4232 }
4233
4234
4235 DBUG_VOID_RETURN;
4236 }
4237
4238 #ifdef NODEFAIL_DEBUG2
4239 static int theCounts[64] = {0};
4240 #endif
4241
4242 Uint32
get_responsible_node(Uint32 bucket) const4243 Suma::get_responsible_node(Uint32 bucket) const
4244 {
4245 // id will contain id to responsible suma or
4246 // RNIL if we don't have nodegroup info yet
4247
4248 jam();
4249 Uint32 node;
4250 const Bucket* ptr= c_buckets + bucket;
4251 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4252 {
4253 node= ptr->m_nodes[i];
4254 if(c_alive_nodes.get(node))
4255 {
4256 #ifdef NODEFAIL_DEBUG2
4257 theCounts[node]++;
4258 ndbout_c("Suma:responsible n=%u, D=%u, id = %u, count=%u",
4259 n,D, id, theCounts[node]);
4260 #endif
4261 return node;
4262 }
4263 }
4264
4265 return 0;
4266 }
4267
4268 Uint32
get_responsible_node(Uint32 bucket,const NdbNodeBitmask & mask) const4269 Suma::get_responsible_node(Uint32 bucket, const NdbNodeBitmask& mask) const
4270 {
4271 jam();
4272 Uint32 node;
4273 const Bucket* ptr= c_buckets + bucket;
4274 for(Uint32 i = 0; i<MAX_REPLICAS; i++)
4275 {
4276 node= ptr->m_nodes[i];
4277 if(mask.get(node))
4278 {
4279 return node;
4280 }
4281 }
4282
4283 return 0;
4284 }
4285
4286 bool
check_switchover(Uint32 bucket,Uint64 gci)4287 Suma::check_switchover(Uint32 bucket, Uint64 gci)
4288 {
4289 const Uint32 send_mask =
4290 Bucket::BUCKET_STARTING |
4291 Bucket::BUCKET_TAKEOVER |
4292 Bucket::BUCKET_SHUTDOWN_TO;
4293
4294 bool send = c_buckets[bucket].m_state & send_mask;
4295 ndbassert(m_switchover_buckets.get(bucket));
4296 if(unlikely(gci > c_buckets[bucket].m_switchover_gci))
4297 {
4298 return send;
4299 }
4300 return !send;
4301 }
4302
4303 static
4304 Uint32
reformat(Signal * signal,LinearSectionPtr ptr[3],Uint32 * src_1,Uint32 sz_1,Uint32 * src_2,Uint32 sz_2)4305 reformat(Signal* signal, LinearSectionPtr ptr[3],
4306 Uint32 * src_1, Uint32 sz_1,
4307 Uint32 * src_2, Uint32 sz_2)
4308 {
4309 Uint32 noOfAttrs = 0, dataLen = 0;
4310 Uint32 * headers = signal->theData + 25;
4311 Uint32 * dst = signal->theData + 25 + MAX_ATTRIBUTES_IN_TABLE;
4312
4313 ptr[0].p = headers;
4314 ptr[1].p = dst;
4315
4316 while(sz_1 > 0){
4317 Uint32 tmp = * src_1 ++;
4318 * headers ++ = tmp;
4319 Uint32 len = AttributeHeader::getDataSize(tmp);
4320 memcpy(dst, src_1, 4 * len);
4321 dst += len;
4322 src_1 += len;
4323
4324 noOfAttrs++;
4325 dataLen += len;
4326 sz_1 -= (1 + len);
4327 }
4328 assert(sz_1 == 0);
4329
4330 ptr[0].sz = noOfAttrs;
4331 ptr[1].sz = dataLen;
4332
4333 ptr[2].p = src_2;
4334 ptr[2].sz = sz_2;
4335
4336 return sz_2 > 0 ? 3 : 2;
4337 }
4338
4339 /**
4340 * Pass entire pages with SUMA-trigger-data from
4341 * TUP to SUMA to avoid extensive LongSignalMessage buffer contention
4342 */
4343 void
execFIRE_TRIG_ORD_L(Signal * signal)4344 Suma::execFIRE_TRIG_ORD_L(Signal* signal)
4345 {
4346 jamEntry();
4347
4348 ndbassert(signal->getNoOfSections() == 0);
4349 Uint32 pageId = signal->theData[0];
4350 Uint32 len = signal->theData[1];
4351
4352 if (pageId == RNIL && len == 0)
4353 {
4354 jam();
4355 /**
4356 * Out of memory
4357 */
4358 out_of_buffer(signal);
4359 return;
4360 }
4361
4362 Uint32 * ptr = reinterpret_cast<Uint32*>(c_page_pool.getPtr(pageId));
4363 while (len)
4364 {
4365 Uint32 * save = ptr;
4366 Uint32 msglen = * ptr++;
4367 Uint32 siglen = * ptr++;
4368 Uint32 sec0len = * ptr++;
4369 Uint32 sec1len = * ptr++;
4370 Uint32 sec2len = * ptr++;
4371
4372 /**
4373 * Copy value directly into local buffers
4374 */
4375 Uint32 trigId = ((FireTrigOrd*)ptr)->getTriggerId();
4376 memcpy(signal->theData, ptr, 4 * siglen); // signal
4377 ptr += siglen;
4378 memcpy(f_buffer, ptr, 4*sec0len);
4379 ptr += sec0len;
4380 memcpy(b_buffer, ptr, 4*sec1len);
4381 ptr += sec1len;
4382 memcpy(f_buffer + sec0len, ptr, 4*sec2len);
4383 ptr += sec2len;
4384
4385 f_trigBufferSize = sec0len + sec2len;
4386 b_trigBufferSize = sec1len;
4387 f_bufferLock = trigId;
4388 b_bufferLock = trigId;
4389
4390 execFIRE_TRIG_ORD(signal);
4391
4392 ndbrequire(ptr == save + msglen);
4393 ndbrequire(len >= msglen);
4394 len -= msglen;
4395 }
4396
4397 m_ctx.m_mm.release_page(RT_DBTUP_PAGE, pageId);
4398 }
4399
4400 void
execFIRE_TRIG_ORD(Signal * signal)4401 Suma::execFIRE_TRIG_ORD(Signal* signal)
4402 {
4403 jamEntry();
4404 DBUG_ENTER("Suma::execFIRE_TRIG_ORD");
4405
4406 CRASH_INSERTION(13016);
4407 FireTrigOrd* const trg = (FireTrigOrd*)signal->getDataPtr();
4408 const Uint32 trigId = trg->getTriggerId();
4409 const Uint32 hashValue = trg->getHashValue();
4410 const Uint32 gci_hi = trg->getGCI();
4411 const Uint32 gci_lo = trg->m_gci_lo;
4412 const Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
4413 const Uint32 event = trg->getTriggerEvent();
4414 const Uint32 any_value = trg->getAnyValue();
4415 const Uint32 transId1 = trg->m_transId1;
4416 const Uint32 transId2 = trg->m_transId2;
4417
4418 Ptr<Subscription> subPtr;
4419 c_subscriptionPool.getPtr(subPtr, trigId & 0xFFFF);
4420
4421 ndbassert(gci > m_last_complete_gci);
4422
4423 if (signal->getNoOfSections())
4424 {
4425 jam();
4426 ndbassert(isNdbMtLqh());
4427 SectionHandle handle(this, signal);
4428
4429 ndbrequire(b_bufferLock == 0);
4430 ndbrequire(f_bufferLock == 0);
4431 f_bufferLock = trigId;
4432 b_bufferLock = trigId;
4433
4434 SegmentedSectionPtr ptr;
4435 handle.getSection(ptr, 0); // Keys
4436 Uint32 sz = ptr.sz;
4437 copy(f_buffer, ptr);
4438
4439 handle.getSection(ptr, 2); // After values
4440 copy(f_buffer + sz, ptr);
4441 f_trigBufferSize = sz + ptr.sz;
4442
4443 handle.getSection(ptr, 1); // Before values
4444 copy(b_buffer, ptr);
4445 b_trigBufferSize = ptr.sz;
4446 releaseSections(handle);
4447 }
4448
4449 jam();
4450 ndbrequire(f_bufferLock == trigId);
4451 /**
4452 * Reset f_bufferLock
4453 */
4454 f_bufferLock = 0;
4455 b_bufferLock = 0;
4456
4457 Uint32 tableId = subPtr.p->m_tableId;
4458 Uint32 schemaVersion =
4459 c_tablePool.getPtr(subPtr.p->m_table_ptrI)->m_schemaVersion;
4460
4461 Uint32 bucket= hashValue % c_no_of_buckets;
4462 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
4463 if(m_active_buckets.get(bucket) ||
4464 (m_switchover_buckets.get(bucket) && (check_switchover(bucket, gci))))
4465 {
4466 m_max_sent_gci = (gci > m_max_sent_gci ? gci : m_max_sent_gci);
4467 Uint32 sz = trg->getNoOfPrimaryKeyWords()+trg->getNoOfAfterValueWords();
4468 ndbrequire(sz == f_trigBufferSize);
4469
4470 LinearSectionPtr ptr[3];
4471 const Uint32 nptr= reformat(signal, ptr,
4472 f_buffer, f_trigBufferSize,
4473 b_buffer, b_trigBufferSize);
4474 Uint32 ptrLen= 0;
4475 for(Uint32 i =0; i < nptr; i++)
4476 ptrLen+= ptr[i].sz;
4477 /**
4478 * Signal to subscriber(s)
4479 */
4480 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
4481 data->gci_hi = gci_hi;
4482 data->gci_lo = gci_lo;
4483 data->tableId = tableId;
4484 data->requestInfo = 0;
4485 SubTableData::setOperation(data->requestInfo, event);
4486 data->flags = 0;
4487 data->anyValue = any_value;
4488 data->totalLen = ptrLen;
4489 data->transId1 = transId1;
4490 data->transId2 = transId2;
4491
4492 {
4493 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4494 SubscriberPtr subbPtr;
4495 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
4496 {
4497 data->senderData = subbPtr.p->m_senderData;
4498 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4499 SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
4500 }
4501 }
4502 }
4503 else
4504 {
4505 const uint buffer_header_sz = 6;
4506 Uint32* dst;
4507 Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
4508 if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
4509 {
4510 * dst++ = subPtr.i;
4511 * dst++ = schemaVersion;
4512 * dst++ = (event << 16) | f_trigBufferSize;
4513 * dst++ = any_value;
4514 * dst++ = transId1;
4515 * dst++ = transId2;
4516 memcpy(dst, f_buffer, f_trigBufferSize << 2);
4517 dst += f_trigBufferSize;
4518 memcpy(dst, b_buffer, b_trigBufferSize << 2);
4519 }
4520 }
4521
4522 DBUG_VOID_RETURN;
4523 }
4524
4525 void
checkMaxBufferedEpochs(Signal * signal)4526 Suma::checkMaxBufferedEpochs(Signal *signal)
4527 {
4528 /*
4529 * Check if any subscribers are exceeding the MaxBufferedEpochs
4530 */
4531 Ptr<Gcp_record> gcp;
4532 jamEntry();
4533 if (c_gcp_list.isEmpty())
4534 {
4535 jam();
4536 return;
4537 }
4538 c_gcp_list.first(gcp);
4539 if (ERROR_INSERTED(13037))
4540 {
4541 jam();
4542 CLEAR_ERROR_INSERT_VALUE;
4543 ndbout_c("Simulating exceeding the MaxBufferedEpochs %u(%llu,%llu,%llu)",
4544 c_maxBufferedEpochs, m_max_seen_gci,
4545 m_last_complete_gci, gcp.p->m_gci);
4546 }
4547 else if (c_gcp_list.count() < c_maxBufferedEpochs)
4548 {
4549 return;
4550 }
4551 NodeBitmask subs = gcp.p->m_subscribers;
4552 jam();
4553 // Disconnect lagging subscribers waiting for oldest epoch
4554 ndbout_c("Found lagging epoch %llu", gcp.p->m_gci);
4555 for(Uint32 nodeId = 0; nodeId < MAX_NODES; nodeId++)
4556 {
4557 if (subs.get(nodeId))
4558 {
4559 jam();
4560 subs.clear(nodeId);
4561 // Disconnecting node
4562 signal->theData[0] = NDB_LE_SubscriptionStatus;
4563 signal->theData[1] = 1; // DISCONNECTED;
4564 signal->theData[2] = nodeId;
4565 signal->theData[3] = (Uint32) gcp.p->m_gci;
4566 signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
4567 signal->theData[5] = (Uint32) c_gcp_list.count();
4568 signal->theData[6] = c_maxBufferedEpochs;
4569 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 8, JBB);
4570
4571 /**
4572 * Force API_FAILREQ
4573 */
4574 signal->theData[0] = nodeId;
4575 sendSignal(QMGR_REF, GSN_API_FAILREQ, signal, 1, JBA);
4576 }
4577 }
4578 }
4579
/**
 * Handle SUB_GCP_COMPLETE_REP (an epoch/GCI has completed).
 *
 * Steps, in order:
 *  1. With multi-threaded LQH and m_gcp_rep_cnt > 1, count the reports
 *     received per GCI in the m_gcp_rep_counter ring buffer and continue
 *     only once m_gcp_rep_cnt reports have arrived for this GCI.
 *  2. Record the GCI as last completed and check MaxBufferedEpochs.
 *  3. Finish every pending bucket switchover whose switchover GCI has
 *     been passed, updating m_active_buckets and
 *     m_gcp_complete_rep_count according to the bucket state.
 *  4. Forward the report to all subscriber nodes and remember the epoch
 *     in c_gcp_list so that acks can be matched against it.
 *  5. Call get_buffer_ptr() with size 0 for every bucket that is not
 *     active on this node (see "Add GCP COMPLETE REP to buffer").
 *  6. Re-enable the event buffer after an out-of-buffer period, and
 *     reset node group state if this node's bucket was dropped.
 *
 * @param signal  Incoming signal; its data area is reused for the
 *                outgoing signals.
 */
void
Suma::execSUB_GCP_COMPLETE_REP(Signal* signal)
{
  jamEntry();
  ndbassert(signal->getNoOfSections() == 0);

  SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
  Uint32 gci_hi = rep->gci_hi;
  Uint32 gci_lo = rep->gci_lo;
  // 64-bit GCI: high word in the upper 32 bits, low word in the lower
  Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);

  if (isNdbMtLqh() && m_gcp_rep_cnt > 1)
  {

// Compile-time switch for the debug printouts in this section
#define SSPP 0

    if (SSPP)
      printf("execSUB_GCP_COMPLETE_REP(%u/%u)", gci_hi, gci_lo);
    jam();
    Uint32 min = m_min_gcp_rep_counter_index;
    Uint32 sz = NDB_ARRAY_SIZE(m_gcp_rep_counter);
    // Scan the used part of the ring buffer, [min, max), for this GCI
    for (Uint32 i = min; i != m_max_gcp_rep_counter_index; i = (i + 1) % sz)
    {
      jam();
      if (m_gcp_rep_counter[i].m_gci == gci)
      {
        jam();
        m_gcp_rep_counter[i].m_cnt ++;
        if (m_gcp_rep_counter[i].m_cnt == m_gcp_rep_cnt)
        {
          jam();
          /**
           * Release this entry...
           * The entry at 'min' is moved into slot i (unless i is 'min')
           * and 'min' is advanced, so the used range stays contiguous.
           */
          if (i != min)
          {
            jam();
            m_gcp_rep_counter[i] = m_gcp_rep_counter[min];
          }
          m_min_gcp_rep_counter_index = (min + 1) % sz;
          if (SSPP)
            ndbout_c(" found - complete after: (min: %u max: %u)",
                     m_min_gcp_rep_counter_index,
                     m_max_gcp_rep_counter_index);
          goto found;
        }
        else
        {
          jam();
          if (SSPP)
            ndbout_c(" found - wait unchanged: (min: %u max: %u)",
                     m_min_gcp_rep_counter_index,
                     m_max_gcp_rep_counter_index);
          return; // Wait for more...
        }
      }
    }
    /**
     * Not found...
     * First report for this GCI: add an entry at 'max' with count 1
     */
    Uint32 next = (m_max_gcp_rep_counter_index + 1) % sz;
    ndbrequire(next != min); // ring buffer full
    m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_gci = gci;
    m_gcp_rep_counter[m_max_gcp_rep_counter_index].m_cnt = 1;
    m_max_gcp_rep_counter_index = next;
    if (SSPP)
      ndbout_c(" new - after: (min: %u max: %u)",
               m_min_gcp_rep_counter_index,
               m_max_gcp_rep_counter_index);
    return;
  }
found:
  // Set when this node's own bucket is dropped; handled at the very end
  bool drop = false;
  // Propagate MISSING_DATA to subscribers while m_missing_data is set
  Uint32 flags = (m_missing_data)
    ? rep->flags | SubGcpCompleteRep::MISSING_DATA
    : rep->flags;

  if (ERROR_INSERTED(13036))
  {
    jam();
    CLEAR_ERROR_INSERT_VALUE;
    ndbout_c("Simulating out of event buffer at node failure");
    flags |= SubGcpCompleteRep::MISSING_DATA;
  }

#ifdef VM_TRACE
  /**
   * Debug-build check that completed GCIs arrive strictly in sequence:
   * either gci_lo is one more than last time (same gci_hi), or gci_hi
   * is one more than last time and gci_lo restarts at 0.
   */
  if (m_gcp_monitor == 0)
  {
  }
  else if (gci_hi == Uint32(m_gcp_monitor >> 32))
  {
    ndbrequire(gci_lo == Uint32(m_gcp_monitor) + 1);
  }
  else
  {
    ndbrequire(gci_hi == Uint32(m_gcp_monitor >> 32) + 1);
    ndbrequire(gci_lo == 0);
  }
  m_gcp_monitor = gci;
#endif

  m_last_complete_gci = gci;
  checkMaxBufferedEpochs(signal);
  m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);

  /**
   * Complete pending bucket switchovers: every bucket in
   * m_switchover_buckets whose m_switchover_gci is lower than this GCI
   */
  if(!m_switchover_buckets.isclear())
  {
    // Set if a starting bucket was activated; triggers the DICT unlock below
    bool unlock = false;
    Uint32 i = m_switchover_buckets.find(0);
    for(; i != Bucket_mask::NotFound; i = m_switchover_buckets.find(i + 1))
    {
      if(gci > c_buckets[i].m_switchover_gci)
      {
        Uint32 state = c_buckets[i].m_state;
        m_switchover_buckets.clear(i);
        // No newline here: each branch below prints its own ndbout_c line
        printf("%u/%u (%u/%u) switchover complete bucket %d state: %x",
               Uint32(gci >> 32),
               Uint32(gci),
               Uint32(c_buckets[i].m_switchover_gci >> 32),
               Uint32(c_buckets[i].m_switchover_gci),
               i, state);

        if(state & Bucket::BUCKET_STARTING)
        {
          /**
           * NR case
           */
          jam();
          m_active_buckets.set(i);
          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_STARTING;
          ndbout_c("starting");
          m_gcp_complete_rep_count++;
          unlock = true;
        }
        else if(state & Bucket::BUCKET_TAKEOVER)
        {
          /**
           * NF case
           * Close the current buffer page using the values kept in the
           * buffer head, then reset the head to "no page"
           */
          jam();
          Bucket* bucket= c_buckets + i;
          Page_pos pos= bucket->m_buffer_head;
          ndbrequire(pos.m_max_gci < gci);

          Buffer_page* page= c_page_pool.getPtr(pos.m_page_id);
          ndbout_c("takeover %d", pos.m_page_id);
          page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
          page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
          ndbassert(pos.m_max_gci != 0);
          page->m_words_used = pos.m_page_pos;
          page->m_next_page = RNIL;
          memset(&bucket->m_buffer_head, 0, sizeof(bucket->m_buffer_head));
          bucket->m_buffer_head.m_page_id = RNIL;
          bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;

          m_active_buckets.set(i);
          m_gcp_complete_rep_count++;
          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_TAKEOVER;
        }
        else if (state & Bucket::BUCKET_HANDOVER)
        {
          /**
           * NR, living node
           */
          jam();
          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_HANDOVER;
          m_gcp_complete_rep_count--;
          ndbout_c("handover");
        }
        else if (state & Bucket::BUCKET_CREATED_MASK)
        {
          jam();
          // A count is kept in the state bits from bit 8 and up; it is
          // cleared from the state and passed on in flags from bit 16
          Uint32 cnt = state >> 8;
          Uint32 mask = Uint32(Bucket::BUCKET_CREATED_MASK) | (cnt << 8);
          c_buckets[i].m_state &= ~mask;
          flags |= SubGcpCompleteRep::ADD_CNT;
          flags |= (cnt << 16);
          ndbout_c("add %u %s", cnt,
                   state & Bucket::BUCKET_CREATED_SELF ? "self" : "other");
          if (state & Bucket::BUCKET_CREATED_SELF &&
              get_responsible_node(i) == getOwnNodeId())
          {
            jam();
            m_active_buckets.set(i);
            m_gcp_complete_rep_count++;
          }
        }
        else if (state & Bucket::BUCKET_DROPPED_MASK)
        {
          jam();
          // Same count encoding as in the BUCKET_CREATED_MASK branch
          Uint32 cnt = state >> 8;
          Uint32 mask = Uint32(Bucket::BUCKET_DROPPED_MASK) | (cnt << 8);
          c_buckets[i].m_state &= ~mask;
          flags |= SubGcpCompleteRep::SUB_CNT;
          flags |= (cnt << 16);
          ndbout_c("sub %u %s", cnt,
                   state & Bucket::BUCKET_DROPPED_SELF ? "self" : "other");
          if (state & Bucket::BUCKET_DROPPED_SELF)
          {
            m_active_buckets.clear(i);
            drop = true;
          }
        }
        else if (state & Bucket::BUCKET_SHUTDOWN)
        {
          // This node gives the bucket away (m_switchover_node is self)
          jam();
          Uint32 nodeId = c_buckets[i].m_switchover_node;
          ndbrequire(nodeId == getOwnNodeId());
          m_active_buckets.clear(i);
          m_gcp_complete_rep_count--;
          ndbout_c("shutdown handover");
          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN;
        }
        else if (state & Bucket::BUCKET_SHUTDOWN_TO)
        {
          // This node takes the bucket over from m_switchover_node: that
          // node must be responsible now, and this node must be
          // responsible once it is removed from the node group mask
          jam();
          Uint32 nodeId = c_buckets[i].m_switchover_node;
          NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
          nodegroup.clear(nodeId);
          ndbrequire(get_responsible_node(i) == nodeId &&
                     get_responsible_node(i, nodegroup) == getOwnNodeId());
          m_active_buckets.set(i);
          m_gcp_complete_rep_count++;
          ndbout_c("shutdown takover");
          c_buckets[i].m_state &= ~(Uint32)Bucket::BUCKET_SHUTDOWN_TO;
        }
      }
    }

    // All switchovers done: finish node start, or confirm a pending stop
    if (m_switchover_buckets.isclear())
    {
      jam();
      if(getNodeState().startLevel == NodeState::SL_STARTING &&
         c_startup.m_handover_nodes.isclear())
      {
        jam();
        sendSTTORRY(signal);
      }
      else if (getNodeState().startLevel >= NodeState::SL_STOPPING_1)
      {
        jam();
        ndbrequire(c_shutdown.m_wait_handover);
        StopMeConf * conf = CAST_PTR(StopMeConf, signal->getDataPtrSend());
        conf->senderData = c_shutdown.m_senderData;
        conf->senderRef = reference();
        sendSignal(c_shutdown.m_senderRef, GSN_STOP_ME_CONF, signal,
                   StopMeConf::SignalLength, JBB);
        c_shutdown.m_wait_handover = false;
        infoEvent("Suma: handover complete");
      }
    }

    if (unlock)
    {
      jam();
      send_dict_unlock_ord(signal, DictLockReq::SumaHandOver);
    }
  }

  if(ERROR_INSERTED(13010))
  {
    CLEAR_ERROR_INSERT_VALUE;
    ndbout_c("Don't send GCP_COMPLETE_REP(%llu)", gci);
    return;
  }

  /**
   * Signal to subscribers
   * (the signal data area may have been overwritten above, so every
   * field is filled in again)
   */
  rep->gci_hi = gci_hi;
  rep->gci_lo = gci_lo;
  rep->flags = flags;
  rep->senderRef = reference();
  rep->gcp_complete_rep_count = m_gcp_complete_rep_count;

  if(m_gcp_complete_rep_count && !c_subscriber_nodes.isclear())
  {
    CRASH_INSERTION(13033);

    NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
    sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
               SubGcpCompleteRep::SignalLength, JBB);

    // Remember which subscriber nodes were sent this epoch
    Ptr<Gcp_record> gcp;
    if(c_gcp_list.seize(gcp))
    {
      gcp.p->m_gci = gci;
      gcp.p->m_subscribers = c_subscriber_nodes;
    }
    else
    {
      // No free Gcp_record: only logged, the epoch is not tracked
      char buf[100];
      c_subscriber_nodes.getText(buf);
      g_eventLogger->error("c_gcp_list.seize() failed: gci: %llu nodes: %s",
                           gci, buf);
    }
  }

  /**
   * Add GCP COMPLETE REP to buffer
   * Only for buckets that are not active on this node, and only when
   * there are subscribers or the bucket is flagged BUCKET_RESEND
   */
  bool subscribers = !c_subscriber_nodes.isclear();
  for(Uint32 i = 0; i<c_no_of_buckets; i++)
  {
    if(m_active_buckets.get(i))
      continue;

    if (subscribers || (c_buckets[i].m_state & Bucket::BUCKET_RESEND))
    {
      //Uint32* dst;
      get_buffer_ptr(signal, i, gci, 0);
    }
  }

  // Leave out-of-buffer mode once an epoch after m_out_of_buffer_gci completes
  if(m_out_of_buffer_gci && gci > m_out_of_buffer_gci)
  {
    jam();
    infoEvent("Reenable event buffer");
    m_out_of_buffer_gci = 0;
    m_missing_data = false;
  }

  // Own bucket was dropped above: forget the node group and recompute it
  if (unlikely(drop))
  {
    jam();
    m_gcp_complete_rep_count = 0;
    c_nodeGroup = RNIL;
    c_nodes_in_nodegroup_mask.clear();
    fix_nodegroup();
  }
}
4914
/**
 * Handle CREATE_TAB_CONF.
 *
 * Nothing is done with the signal here; the handler only records
 * entry and exit in the jam/DBUG traces.
 */
void
Suma::execCREATE_TAB_CONF(Signal *signal)
{
  jamEntry();
  DBUG_ENTER("Suma::execCREATE_TAB_CONF");

  DBUG_VOID_RETURN;
}
4923
4924 void
execDROP_TAB_CONF(Signal * signal)4925 Suma::execDROP_TAB_CONF(Signal *signal)
4926 {
4927 jamEntry();
4928 ndbassert(signal->getNoOfSections() == 0);
4929
4930 DropTabConf * const conf = (DropTabConf*)signal->getDataPtr();
4931 Uint32 senderRef= conf->senderRef;
4932 Uint32 tableId= conf->tableId;
4933
4934 TablePtr tabPtr;
4935 if (!c_tables.find(tabPtr, tableId))
4936 {
4937 jam();
4938 return;
4939 }
4940
4941 DBUG_PRINT("info",("drop table id: %d[i=%u]", tableId, tabPtr.i));
4942 const Table::State old_state = tabPtr.p->m_state;
4943 tabPtr.p->m_state = Table::DROPPED;
4944 c_tables.remove(tabPtr);
4945
4946 if (senderRef != 0)
4947 {
4948 jam();
4949
4950 // dict coordinator sends info to API
4951
4952 const Uint64 gci = get_current_gci(signal);
4953 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
4954 data->gci_hi = Uint32(gci >> 32);
4955 data->gci_lo = Uint32(gci);
4956 data->tableId = tableId;
4957 data->requestInfo = 0;
4958 SubTableData::setOperation(data->requestInfo,
4959 NdbDictionary::Event::_TE_DROP);
4960 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
4961
4962 Ptr<Subscription> subPtr;
4963 LocalDLList<Subscription> subList(c_subscriptionPool,
4964 tabPtr.p->m_subscriptions);
4965
4966 for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
4967 {
4968 jam();
4969 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
4970 {
4971 jam();
4972 continue;
4973 //continue in for-loop if the table is not part of
4974 //the subscription. Otherwise, send data to subscriber.
4975 }
4976
4977 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
4978 {
4979 jam();
4980 continue;
4981 }
4982
4983 Ptr<Subscriber> ptr;
4984 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
4985 for(list.first(ptr); !ptr.isNull(); list.next(ptr))
4986 {
4987 jam();
4988 data->senderData= ptr.p->m_senderData;
4989 sendSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
4990 SubTableData::SignalLength, JBB);
4991 }
4992 }
4993 }
4994
4995 if (old_state == Table::DEFINING)
4996 {
4997 jam();
4998 return;
4999 }
5000
5001 if (tabPtr.p->m_subscriptions.isEmpty())
5002 {
5003 jam();
5004 tabPtr.p->release(* this);
5005 c_tablePool.release(tabPtr);
5006 return;
5007 }
5008 else
5009 {
5010 /**
5011 * check_release_subscription create a subList...
5012 * weirdness below is to make sure that it's not created twice
5013 */
5014 Ptr<Subscription> subPtr;
5015 {
5016 LocalDLList<Subscription> subList(c_subscriptionPool,
5017 tabPtr.p->m_subscriptions);
5018 subList.first(subPtr);
5019 }
5020 while (!subPtr.isNull())
5021 {
5022 Ptr<Subscription> tmp = subPtr;
5023 {
5024 LocalDLList<Subscription> subList(c_subscriptionPool,
5025 tabPtr.p->m_subscriptions);
5026 subList.next(subPtr);
5027 }
5028 check_release_subscription(signal, tmp);
5029 }
5030 }
5031 }
5032
5033 /**
5034 * This receives DICT_TAB_INFO in long signal section 1, and releases the data
5035 * after use.
5036 */
5037 void
execALTER_TAB_REQ(Signal * signal)5038 Suma::execALTER_TAB_REQ(Signal *signal)
5039 {
5040 jamEntry();
5041
5042 AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
5043 Uint32 senderRef= req->senderRef;
5044 Uint32 tableId= req->tableId;
5045 Uint32 changeMask= req->changeMask;
5046 TablePtr tabPtr;
5047
5048 // Copy DICT_TAB_INFO to local linear buffer
5049 SectionHandle handle(this, signal);
5050 SegmentedSectionPtr tabInfoPtr;
5051 handle.getSection(tabInfoPtr, 0);
5052
5053 if (!c_tables.find(tabPtr, tableId))
5054 {
5055 jam();
5056 releaseSections(handle);
5057 return;
5058 }
5059
5060 if (senderRef == 0)
5061 {
5062 jam();
5063 releaseSections(handle);
5064 return;
5065 }
5066 // dict coordinator sends info to API
5067
5068 #ifndef DBUG_OFF
5069 ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
5070 SimplePropertiesSectionReader reader(handle.m_ptr[0],
5071 getSectionSegmentPool());
5072 reader.printAll(ndbout);
5073 #endif
5074 copy(b_dti_buf, tabInfoPtr);
5075 releaseSections(handle);
5076
5077 LinearSectionPtr lptr[3];
5078 lptr[0].p = b_dti_buf;
5079 lptr[0].sz = tabInfoPtr.sz;
5080
5081 const Uint64 gci = get_current_gci(signal);
5082 SubTableData * data = (SubTableData*)signal->getDataPtrSend();
5083 data->gci_hi = Uint32(gci >> 32);
5084 data->gci_lo = Uint32(gci);
5085 data->tableId = tableId;
5086 data->requestInfo = 0;
5087 SubTableData::setOperation(data->requestInfo,
5088 NdbDictionary::Event::_TE_ALTER);
5089 SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
5090 data->flags = 0;
5091 data->changeMask = changeMask;
5092 data->totalLen = tabInfoPtr.sz;
5093 Ptr<Subscription> subPtr;
5094 LocalDLList<Subscription> subList(c_subscriptionPool,
5095 tabPtr.p->m_subscriptions);
5096
5097 for (subList.first(subPtr); !subPtr.isNull(); subList.next(subPtr))
5098 {
5099 if(subPtr.p->m_subscriptionType != SubCreateReq::TableEvent)
5100 {
5101 jam();
5102 continue;
5103 //continue in for-loop if the table is not part of
5104 //the subscription. Otherwise, send data to subscriber.
5105 }
5106
5107 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5108 {
5109 jam();
5110 continue;
5111 }
5112
5113 Ptr<Subscriber> ptr;
5114 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5115 for(list.first(ptr); !ptr.isNull(); list.next(ptr))
5116 {
5117 jam();
5118 data->senderData= ptr.p->m_senderData;
5119 Callback c = { 0, 0 };
5120 sendFragmentedSignal(ptr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
5121 SubTableData::SignalLength, JBB, lptr, 1, c);
5122 }
5123 }
5124 }
5125
5126 void
execSUB_GCP_COMPLETE_ACK(Signal * signal)5127 Suma::execSUB_GCP_COMPLETE_ACK(Signal* signal)
5128 {
5129 jamEntry();
5130 ndbassert(signal->getNoOfSections() == 0);
5131
5132 SubGcpCompleteAck * const ack = (SubGcpCompleteAck*)signal->getDataPtr();
5133 Uint32 gci_hi = ack->rep.gci_hi;
5134 Uint32 gci_lo = ack->rep.gci_lo;
5135 Uint32 senderRef = ack->rep.senderRef;
5136 if (unlikely(signal->getLength() < SubGcpCompleteAck::SignalLength))
5137 {
5138 jam();
5139 ndbassert(!ndb_check_micro_gcp(getNodeInfo(refToNode(senderRef)).m_version));
5140 gci_lo = 0;
5141 }
5142
5143 Uint64 gci = gci_lo | (Uint64(gci_hi) << 32);
5144 m_max_seen_gci = (gci > m_max_seen_gci ? gci : m_max_seen_gci);
5145
5146 if (ERROR_INSERTED(13037))
5147 {
5148 jam();
5149 ndbout_c("Simulating exceeding the MaxBufferedEpochs, ignoring ack");
5150 return;
5151 }
5152
5153 if (refToBlock(senderRef) == SUMA)
5154 {
5155 jam();
5156
5157 // Ack from other SUMA
5158 Uint32 nodeId= refToNode(senderRef);
5159 for(Uint32 i = 0; i<c_no_of_buckets; i++)
5160 {
5161 if(m_active_buckets.get(i) ||
5162 (m_switchover_buckets.get(i) && (check_switchover(i, gci))) ||
5163 (!m_switchover_buckets.get(i) && get_responsible_node(i) == nodeId))
5164 {
5165 release_gci(signal, i, gci);
5166 }
5167 }
5168 return;
5169 }
5170
5171 // Ack from User and not an ack from other SUMA, redistribute in nodegroup
5172
5173 Uint32 nodeId = refToNode(senderRef);
5174 if (ERROR_INSERTED(13023))
5175 {
5176 ndbout_c("Throwing SUB_GCP_COMPLETE_ACK gci: %u/%u from %u",
5177 Uint32(gci>>32), Uint32(gci), nodeId);
5178 return;
5179 }
5180
5181
5182 jam();
5183 Ptr<Gcp_record> gcp;
5184 for(c_gcp_list.first(gcp); !gcp.isNull(); c_gcp_list.next(gcp))
5185 {
5186 if(gcp.p->m_gci == gci)
5187 {
5188 gcp.p->m_subscribers.clear(nodeId);
5189 gcp.p->m_subscribers.bitAND(c_subscriber_nodes);
5190 if(!gcp.p->m_subscribers.isclear())
5191 {
5192 jam();
5193 return;
5194 }
5195 break;
5196 }
5197 }
5198
5199 if(gcp.isNull())
5200 {
5201 g_eventLogger->warning("ACK wo/ gcp record (gci: %u/%u) ref: %.8x from: %.8x",
5202 Uint32(gci >> 32), Uint32(gci),
5203 senderRef, signal->getSendersBlockRef());
5204 }
5205 else
5206 {
5207 c_gcp_list.release(gcp);
5208 }
5209
5210 CRASH_INSERTION(13011);
5211 if(ERROR_INSERTED(13012))
5212 {
5213 CLEAR_ERROR_INSERT_VALUE;
5214 ndbout_c("Don't redistribute SUB_GCP_COMPLETE_ACK");
5215 return;
5216 }
5217
5218 ack->rep.senderRef = reference();
5219 NodeReceiverGroup rg(SUMA, c_nodes_in_nodegroup_mask);
5220 sendSignal(rg, GSN_SUB_GCP_COMPLETE_ACK, signal,
5221 SubGcpCompleteAck::SignalLength, JBB);
5222 }
5223
5224 /**************************************************************
5225 *
5226 * Removing subscription
5227 *
5228 */
5229
5230 void
execSUB_REMOVE_REQ(Signal * signal)5231 Suma::execSUB_REMOVE_REQ(Signal* signal)
5232 {
5233 jamEntry();
5234 DBUG_ENTER("Suma::execSUB_REMOVE_REQ");
5235
5236 CRASH_INSERTION(13021);
5237
5238 const SubRemoveReq req = *(SubRemoveReq*)signal->getDataPtr();
5239 SubscriptionPtr subPtr;
5240 Subscription key;
5241 key.m_subscriptionId = req.subscriptionId;
5242 key.m_subscriptionKey = req.subscriptionKey;
5243
5244 if (c_startup.m_restart_server_node_id == RNIL)
5245 {
5246 jam();
5247
5248 /**
5249 * We havent started syncing yet
5250 */
5251 sendSubRemoveRef(signal, req, SubRemoveRef::NotStarted);
5252 return;
5253 }
5254
5255 bool found = c_subscriptions.find(subPtr, key);
5256
5257 if(!found)
5258 {
5259 jam();
5260 sendSubRemoveRef(signal, req, SubRemoveRef::NoSuchSubscription);
5261 return;
5262 }
5263
5264 switch(subPtr.p->m_state){
5265 case Subscription::UNDEFINED:
5266 jam();
5267 ndbrequire(false);
5268 case Subscription::DEFINING:
5269 jam();
5270 sendSubRemoveRef(signal, req, SubRemoveRef::Defining);
5271 return;
5272 case Subscription::DEFINED:
5273 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5274 {
5275 /**
5276 * already dropped
5277 */
5278 jam();
5279 sendSubRemoveRef(signal, req, SubRemoveRef::AlreadyDropped);
5280 return;
5281 }
5282 break;
5283 }
5284
5285 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5286 check_release_subscription(signal, subPtr);
5287
5288 SubRemoveConf * const conf = (SubRemoveConf*)signal->getDataPtrSend();
5289 conf->senderRef = reference();
5290 conf->senderData = req.senderData;
5291 conf->subscriptionId = req.subscriptionId;
5292 conf->subscriptionKey = req.subscriptionKey;
5293
5294 sendSignal(req.senderRef, GSN_SUB_REMOVE_CONF, signal,
5295 SubRemoveConf::SignalLength, JBB);
5296 return;
5297 }
5298
5299 void
check_release_subscription(Signal * signal,Ptr<Subscription> subPtr)5300 Suma::check_release_subscription(Signal* signal, Ptr<Subscription> subPtr)
5301 {
5302 if (!subPtr.p->m_subscribers.isEmpty())
5303 {
5304 jam();
5305 return;
5306 }
5307
5308 if (!subPtr.p->m_start_req.isEmpty())
5309 {
5310 jam();
5311 return;
5312 }
5313
5314 if (!subPtr.p->m_stop_req.isEmpty())
5315 {
5316 jam();
5317 return;
5318 }
5319
5320 switch(subPtr.p->m_trigger_state){
5321 case Subscription::T_UNDEFINED:
5322 jam();
5323 goto do_release;
5324 case Subscription::T_CREATING:
5325 jam();
5326 /**
5327 * Wait for completion
5328 */
5329 return;
5330 case Subscription::T_DEFINED:
5331 jam();
5332 subPtr.p->m_trigger_state = Subscription::T_DROPPING;
5333 drop_triggers(signal, subPtr);
5334 return;
5335 case Subscription::T_DROPPING:
5336 jam();
5337 /**
5338 * Wait for completion
5339 */
5340 return;
5341 case Subscription::T_ERROR:
5342 jam();
5343 /**
5344 * Wait for completion
5345 */
5346 return;
5347 }
5348 ndbrequire(false);
5349
5350 do_release:
5351 TablePtr tabPtr;
5352 c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5353
5354 if (tabPtr.p->m_state == Table::DROPPED)
5355 {
5356 jam();
5357 subPtr.p->m_options |= Subscription::MARKED_DROPPED;
5358 }
5359
5360 if ((subPtr.p->m_options & Subscription::MARKED_DROPPED) == 0)
5361 {
5362 jam();
5363 return;
5364 }
5365
5366 {
5367 LocalDLList<Subscription> list(c_subscriptionPool,
5368 tabPtr.p->m_subscriptions);
5369 list.remove(subPtr);
5370 }
5371
5372 if (tabPtr.p->m_subscriptions.isEmpty())
5373 {
5374 jam();
5375 switch(tabPtr.p->m_state){
5376 case Table::UNDEFINED:
5377 ndbrequire(false);
5378 case Table::DEFINING:
5379 break;
5380 case Table::DEFINED:
5381 jam();
5382 c_tables.remove(tabPtr);
5383 // Fall through
5384 case Table::DROPPED:
5385 jam();
5386 tabPtr.p->release(* this);
5387 c_tablePool.release(tabPtr);
5388 };
5389 }
5390
5391 c_subscriptions.release(subPtr);
5392 }
5393
5394 void
sendSubRemoveRef(Signal * signal,const SubRemoveReq & req,Uint32 errCode)5395 Suma::sendSubRemoveRef(Signal* signal,
5396 const SubRemoveReq& req,
5397 Uint32 errCode)
5398 {
5399 jam();
5400 DBUG_ENTER("Suma::sendSubRemoveRef");
5401 SubRemoveRef * ref = (SubRemoveRef *)signal->getDataPtrSend();
5402 ref->senderRef = reference();
5403 ref->senderData = req.senderData;
5404 ref->subscriptionId = req.subscriptionId;
5405 ref->subscriptionKey = req.subscriptionKey;
5406 ref->errorCode = errCode;
5407 sendSignal(signal->getSendersBlockRef(), GSN_SUB_REMOVE_REF,
5408 signal, SubRemoveRef::SignalLength, JBB);
5409 DBUG_VOID_RETURN;
5410 }
5411
5412 void
release(Suma & suma)5413 Suma::Table::release(Suma & suma){
5414 jamBlock(&suma);
5415
5416 m_state = UNDEFINED;
5417 }
5418
5419 void
release()5420 Suma::SyncRecord::release(){
5421 jam();
5422
5423 LocalDataBuffer<15> fragBuf(suma.c_dataBufferPool, m_fragments);
5424 fragBuf.release();
5425
5426 LocalDataBuffer<15> attrBuf(suma.c_dataBufferPool, m_attributeList);
5427 attrBuf.release();
5428
5429 LocalDataBuffer<15> boundBuf(suma.c_dataBufferPool, m_boundInfo);
5430 boundBuf.release();
5431 }
5432
5433
5434 /**************************************************************
5435 *
5436 * Restarting remote node functions, master functionality
5437 * (slave does nothing special)
5438 * - triggered on INCL_NODEREQ calling startNode
5439 * - included node will issue START_ME when it's ready to start
5440 * the subscribers
5441 *
5442 */
5443
5444 void
execSUMA_START_ME_REQ(Signal * signal)5445 Suma::execSUMA_START_ME_REQ(Signal* signal) {
5446 jamEntry();
5447
5448 Uint32 retref = signal->getSendersBlockRef();
5449 if (c_restart.m_ref)
5450 {
5451 jam();
5452 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5453 ref->errorCode = SumaStartMeRef::Busy;
5454 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5455 SumaStartMeRef::SignalLength, JBB);
5456 return;
5457 }
5458
5459 if (getNodeState().getStarted() == false)
5460 {
5461 jam();
5462 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5463 ref->errorCode = SumaStartMeRef::NotStarted;
5464 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5465 SumaStartMeRef::SignalLength, JBB);
5466 return;
5467 }
5468
5469 Ptr<SubOpRecord> subOpPtr;
5470 if (c_subOpPool.seize(subOpPtr) == false)
5471 {
5472 jam();
5473 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5474 ref->errorCode = SumaStartMeRef::Busy;
5475 sendSignal(retref, GSN_SUMA_START_ME_REF, signal,
5476 SumaStartMeRef::SignalLength, JBB);
5477 return;
5478 }
5479
5480 subOpPtr.p->m_opType = SubOpRecord::R_START_ME_REQ;
5481
5482 c_restart.m_abort = 0;
5483 c_restart.m_waiting_on_self = 0;
5484 c_restart.m_ref = retref;
5485 c_restart.m_max_seq = c_current_seq;
5486 c_restart.m_subOpPtrI = subOpPtr.i;
5487
5488 DLHashTable<Subscription>::Iterator it;
5489 if (c_subscriptions.first(it))
5490 {
5491 jam();
5492
5493 /**
5494 * We only need to handle subscriptions with seq <= c_current_seq
5495 * all subscriptions(s) created after this, will be handled by
5496 * starting suma directly
5497 */
5498 c_current_seq++;
5499 }
5500
5501 copySubscription(signal, it);
5502 }
5503
5504 void
copySubscription(Signal * signal,DLHashTable<Subscription>::Iterator it)5505 Suma::copySubscription(Signal* signal, DLHashTable<Subscription>::Iterator it)
5506 {
5507 jam();
5508
5509 Ptr<SubOpRecord> subOpPtr;
5510 c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5511
5512 Ptr<Subscription> subPtr = it.curr;
5513 if (!subPtr.isNull())
5514 {
5515 jam();
5516 c_restart.m_subPtrI = subPtr.i;
5517 c_restart.m_bucket = it.bucket;
5518
5519 LocalDLFifoList<SubOpRecord> list(c_subOpPool, subPtr.p->m_stop_req);
5520 bool empty = list.isEmpty();
5521 list.add(subOpPtr);
5522
5523 if (!empty)
5524 {
5525 /**
5526 * Wait for lock
5527 */
5528 jam();
5529 c_restart.m_waiting_on_self = 1;
5530 return;
5531 }
5532
5533 sendSubCreateReq(signal, subPtr);
5534 }
5535 else
5536 {
5537 jam();
5538 SumaStartMeConf* conf = (SumaStartMeConf*)signal->getDataPtrSend();
5539 conf->unused = 0;
5540 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_CONF, signal,
5541 SumaStartMeConf::SignalLength, JBB);
5542
5543 c_subOpPool.release(subOpPtr);
5544 c_restart.m_ref = 0;
5545 return;
5546 }
5547 }
5548
5549 void
sendSubCreateReq(Signal * signal,Ptr<Subscription> subPtr)5550 Suma::sendSubCreateReq(Signal* signal, Ptr<Subscription> subPtr)
5551 {
5552 jam();
5553
5554 if (c_restart.m_abort)
5555 {
5556 jam();
5557 abort_start_me(signal, subPtr, true);
5558 return;
5559 }
5560
5561 c_restart.m_waiting_on_self = 0;
5562 SubCreateReq * req = (SubCreateReq *)signal->getDataPtrSend();
5563 req->senderRef = reference();
5564 req->senderData = subPtr.i;
5565 req->subscriptionId = subPtr.p->m_subscriptionId;
5566 req->subscriptionKey = subPtr.p->m_subscriptionKey;
5567 req->subscriptionType = subPtr.p->m_subscriptionType;
5568 req->tableId = subPtr.p->m_tableId;
5569 req->schemaTransId = 0;
5570
5571 if (subPtr.p->m_options & Subscription::REPORT_ALL)
5572 {
5573 req->subscriptionType |= SubCreateReq::ReportAll;
5574 }
5575
5576 if (subPtr.p->m_options & Subscription::REPORT_SUBSCRIBE)
5577 {
5578 req->subscriptionType |= SubCreateReq::ReportSubscribe;
5579 }
5580
5581 if (subPtr.p->m_options & Subscription::NO_REPORT_DDL)
5582 {
5583 req->subscriptionType |= SubCreateReq::NoReportDDL;
5584 }
5585
5586 if (subPtr.p->m_options & Subscription::MARKED_DROPPED)
5587 {
5588 req->subscriptionType |= SubCreateReq::NR_Sub_Dropped;
5589 ndbout_c("copying dropped sub: %u", subPtr.i);
5590 }
5591
5592 Ptr<Table> tabPtr;
5593 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5594 if (tabPtr.p->m_state != Table::DROPPED)
5595 {
5596 jam();
5597 c_restart.m_waiting_on_self = 0;
5598 if (!ndbd_suma_dictlock_startme(getNodeInfo(refToNode(c_restart.m_ref)).m_version))
5599 {
5600 jam();
5601 /**
5602 * Downgrade
5603 *
5604 * In pre suma v2, SUB_CREATE_REQ::SignalLength is one greater
5605 * but code checks length and set a default value...
5606 * so we dont need to do anything...
5607 * Thank you Ms. Fortuna
5608 */
5609 }
5610
5611 sendSignal(c_restart.m_ref, GSN_SUB_CREATE_REQ, signal,
5612 SubCreateReq::SignalLength, JBB);
5613 }
5614 else
5615 {
5616 jam();
5617 ndbout_c("not copying sub %u with dropped table: %u/%u",
5618 subPtr.i,
5619 tabPtr.p->m_tableId, tabPtr.i);
5620
5621 c_restart.m_waiting_on_self = 1;
5622 SubCreateConf * conf = (SubCreateConf *)signal->getDataPtrSend();
5623 conf->senderRef = reference();
5624 conf->senderData = subPtr.i;
5625 sendSignal(reference(), GSN_SUB_CREATE_CONF, signal,
5626 SubCreateConf::SignalLength, JBB);
5627 }
5628 }
5629
5630 void
execSUB_CREATE_REF(Signal * signal)5631 Suma::execSUB_CREATE_REF(Signal* signal)
5632 {
5633 jamEntry();
5634
5635 SubCreateRef *const ref= (SubCreateRef *)signal->getDataPtr();
5636 Uint32 error= ref->errorCode;
5637
5638 {
5639 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5640 ref->errorCode = error;
5641 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5642 SumaStartMeRef::SignalLength, JBB);
5643 }
5644
5645 Ptr<Subscription> subPtr;
5646 c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5647 abort_start_me(signal, subPtr, true);
5648 }
5649
5650 void
execSUB_CREATE_CONF(Signal * signal)5651 Suma::execSUB_CREATE_CONF(Signal* signal)
5652 {
5653 jamEntry();
5654
5655 /**
5656 * We have lock...start all subscriber(s)
5657 */
5658 Ptr<Subscription> subPtr;
5659 c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5660
5661 c_restart.m_waiting_on_self = 0;
5662
5663 /**
5664 * Check if we were aborted...
5665 * this signal is sent to self in case of DROPPED subscription...
5666 */
5667 if (c_restart.m_abort)
5668 {
5669 jam();
5670 abort_start_me(signal, subPtr, true);
5671 return;
5672 }
5673
5674 Ptr<Table> tabPtr;
5675 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
5676
5677 Ptr<Subscriber> ptr;
5678 if (tabPtr.p->m_state != Table::DROPPED)
5679 {
5680 jam();
5681 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5682 list.first(ptr);
5683 }
5684 else
5685 {
5686 jam();
5687 ptr.setNull();
5688 ndbout_c("not copying subscribers on sub: %u with dropped table %u/%u",
5689 subPtr.i, tabPtr.p->m_tableId, tabPtr.i);
5690 }
5691
5692 copySubscriber(signal, subPtr, ptr);
5693 }
5694
5695 void
copySubscriber(Signal * signal,Ptr<Subscription> subPtr,Ptr<Subscriber> ptr)5696 Suma::copySubscriber(Signal* signal,
5697 Ptr<Subscription> subPtr,
5698 Ptr<Subscriber> ptr)
5699 {
5700 if (!ptr.isNull())
5701 {
5702 jam();
5703
5704 SubStartReq* req = (SubStartReq*)signal->getDataPtrSend();
5705 req->senderRef = reference();
5706 req->senderData = ptr.i;
5707 req->subscriptionId = subPtr.p->m_subscriptionId;
5708 req->subscriptionKey = subPtr.p->m_subscriptionKey;
5709 req->part = SubscriptionData::TableData;
5710 req->subscriberData = ptr.p->m_senderData;
5711 req->subscriberRef = ptr.p->m_senderRef;
5712
5713 sendSignal(c_restart.m_ref, GSN_SUB_START_REQ,
5714 signal, SubStartReq::SignalLength, JBB);
5715 return;
5716 }
5717 else
5718 {
5719 // remove lock from this subscription
5720 Ptr<SubOpRecord> subOpPtr;
5721 c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5722 check_remove_queue(signal, subPtr, subOpPtr, true, false);
5723 check_release_subscription(signal, subPtr);
5724
5725 DLHashTable<Subscription>::Iterator it;
5726 it.curr = subPtr;
5727 it.bucket = c_restart.m_bucket;
5728 c_subscriptions.next(it);
5729 copySubscription(signal, it);
5730 }
5731 }
5732
5733 void
execSUB_START_CONF(Signal * signal)5734 Suma::execSUB_START_CONF(Signal* signal)
5735 {
5736 jamEntry();
5737
5738 SubStartConf * const conf = (SubStartConf*)signal->getDataPtr();
5739
5740 Ptr<Subscription> subPtr;
5741 c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5742
5743 Ptr<Subscriber> ptr;
5744 c_subscriberPool.getPtr(ptr, conf->senderData);
5745
5746 LocalDLList<Subscriber> list(c_subscriberPool, subPtr.p->m_subscribers);
5747 list.next(ptr);
5748 copySubscriber(signal, subPtr, ptr);
5749 }
5750
5751 void
execSUB_START_REF(Signal * signal)5752 Suma::execSUB_START_REF(Signal* signal)
5753 {
5754 jamEntry();
5755
5756 SubStartRef * sig = (SubStartRef*)signal->getDataPtr();
5757 Uint32 errorCode = sig->errorCode;
5758
5759 {
5760 SumaStartMeRef* ref= (SumaStartMeRef*)signal->getDataPtrSend();
5761 ref->errorCode = errorCode;
5762 sendSignal(c_restart.m_ref, GSN_SUMA_START_ME_REF, signal,
5763 SumaStartMeRef::SignalLength, JBB);
5764 }
5765
5766 Ptr<Subscription> subPtr;
5767 c_subscriptionPool.getPtr(subPtr, c_restart.m_subPtrI);
5768
5769 abort_start_me(signal, subPtr, true);
5770 }
5771
5772 void
abort_start_me(Signal * signal,Ptr<Subscription> subPtr,bool lockowner)5773 Suma::abort_start_me(Signal* signal, Ptr<Subscription> subPtr,
5774 bool lockowner)
5775 {
5776 Ptr<SubOpRecord> subOpPtr;
5777 c_subOpPool.getPtr(subOpPtr, c_restart.m_subOpPtrI);
5778 check_remove_queue(signal, subPtr, subOpPtr, lockowner, true);
5779 check_release_subscription(signal, subPtr);
5780
5781 c_restart.m_ref = 0;
5782 }
5783
5784 void
execSUMA_HANDOVER_REQ(Signal * signal)5785 Suma::execSUMA_HANDOVER_REQ(Signal* signal)
5786 {
5787 jamEntry();
5788 DBUG_ENTER("Suma::execSUMA_HANDOVER_REQ");
5789 // Uint32 sumaRef = signal->getSendersBlockRef();
5790 const SumaHandoverReq * req = CAST_CONSTPTR(SumaHandoverReq,
5791 signal->getDataPtr());
5792
5793 Uint32 gci = req->gci;
5794 Uint32 nodeId = req->nodeId;
5795 Uint32 new_gci = Uint32(m_last_complete_gci >> 32) + MAX_CONCURRENT_GCP + 1;
5796 Uint32 requestType = req->requestType;
5797 if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5798 {
5799 jam();
5800 requestType = SumaHandoverReq::RT_START_NODE;
5801 }
5802
5803 Uint32 start_gci = (gci > new_gci ? gci : new_gci);
5804 // mark all active buckets really belonging to restarting SUMA
5805
5806 Bucket_mask tmp;
5807 if (requestType == SumaHandoverReq::RT_START_NODE)
5808 {
5809 jam();
5810 c_alive_nodes.set(nodeId);
5811 if (DBG_3R)
5812 ndbout_c("%u c_alive_nodes.set(%u)", __LINE__, nodeId);
5813
5814 for( Uint32 i = 0; i < c_no_of_buckets; i++)
5815 {
5816 if(get_responsible_node(i) == nodeId)
5817 {
5818 if (m_active_buckets.get(i))
5819 {
5820 // I'm running this bucket but it should really be the restarted node
5821 tmp.set(i);
5822 m_active_buckets.clear(i);
5823 m_switchover_buckets.set(i);
5824 c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5825 c_buckets[i].m_state |= Bucket::BUCKET_HANDOVER;
5826 c_buckets[i].m_switchover_node = nodeId;
5827 ndbout_c("prepare to handover bucket: %d", i);
5828 }
5829 else if(m_switchover_buckets.get(i))
5830 {
5831 ndbout_c("dont handover bucket: %d %d", i, nodeId);
5832 }
5833 }
5834 }
5835 }
5836 else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5837 {
5838 jam();
5839
5840 for( Uint32 i = 0; i < c_no_of_buckets; i++)
5841 {
5842 NdbNodeBitmask nodegroup = c_nodes_in_nodegroup_mask;
5843 nodegroup.clear(nodeId);
5844 if(get_responsible_node(i) == nodeId &&
5845 get_responsible_node(i, nodegroup) == getOwnNodeId())
5846 {
5847 // I'm will be running this bucket when nodeId shutdown
5848 jam();
5849 tmp.set(i);
5850 m_switchover_buckets.set(i);
5851 c_buckets[i].m_switchover_gci = (Uint64(start_gci) << 32) - 1;
5852 c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN_TO;
5853 c_buckets[i].m_switchover_node = nodeId;
5854 ndbout_c("prepare to takeover bucket: %d", i);
5855 }
5856 }
5857 }
5858 else
5859 {
5860 jam();
5861 goto ref;
5862 }
5863
5864 {
5865 SumaHandoverConf *conf= CAST_PTR(SumaHandoverConf,signal->getDataPtrSend());
5866 tmp.copyto(BUCKET_MASK_SIZE, conf->theBucketMask);
5867 conf->gci = start_gci;
5868 conf->nodeId = getOwnNodeId();
5869 conf->requestType = requestType;
5870 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_CONF, signal,
5871 SumaHandoverConf::SignalLength, JBB);
5872 }
5873
5874 DBUG_VOID_RETURN;
5875
5876 ref:
5877 signal->theData[0] = 111;
5878 signal->theData[1] = getOwnNodeId();
5879 signal->theData[2] = nodeId;
5880 sendSignal(calcSumaBlockRef(nodeId), GSN_SUMA_HANDOVER_REF, signal, 3, JBB);
5881 DBUG_VOID_RETURN;
5882 }
5883
5884 // only run on all but restarting suma
5885 void
execSUMA_HANDOVER_REF(Signal * signal)5886 Suma::execSUMA_HANDOVER_REF(Signal* signal)
5887 {
5888 ndbrequire(false);
5889 }
5890
5891 void
execSUMA_HANDOVER_CONF(Signal * signal)5892 Suma::execSUMA_HANDOVER_CONF(Signal* signal) {
5893 jamEntry();
5894 DBUG_ENTER("Suma::execSUMA_HANDOVER_CONF");
5895
5896 const SumaHandoverConf * conf = CAST_CONSTPTR(SumaHandoverConf,
5897 signal->getDataPtr());
5898
5899 CRASH_INSERTION(13043);
5900
5901 Uint32 gci = conf->gci;
5902 Uint32 nodeId = conf->nodeId;
5903 Uint32 requestType = conf->requestType;
5904 Bucket_mask tmp;
5905 tmp.assign(BUCKET_MASK_SIZE, conf->theBucketMask);
5906 #ifdef HANDOVER_DEBUG
5907 ndbout_c("Suma::execSUMA_HANDOVER_CONF, gci = %u", gci);
5908 #endif
5909
5910 if (!ndbd_suma_stop_me(getNodeInfo(nodeId).m_version))
5911 {
5912 jam();
5913 requestType = SumaHandoverReq::RT_START_NODE;
5914 }
5915
5916 if (requestType == SumaHandoverReq::RT_START_NODE)
5917 {
5918 jam();
5919 for (Uint32 i = 0; i < c_no_of_buckets; i++)
5920 {
5921 if (tmp.get(i))
5922 {
5923 if (DBG_3R)
5924 ndbout_c("%u : %u %u", i, get_responsible_node(i), getOwnNodeId());
5925 ndbrequire(get_responsible_node(i) == getOwnNodeId());
5926 // We should run this bucket, but _nodeId_ is
5927 c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5928 c_buckets[i].m_state |= Bucket::BUCKET_STARTING;
5929 }
5930 }
5931
5932 char buf[255];
5933 tmp.getText(buf);
5934 infoEvent("Suma: handover from node %u gci: %u buckets: %s (%u)",
5935 nodeId, gci, buf, c_no_of_buckets);
5936 g_eventLogger->info("Suma: handover from node %u gci: %u buckets: %s (%u)",
5937 nodeId, gci, buf, c_no_of_buckets);
5938 m_switchover_buckets.bitOR(tmp);
5939 c_startup.m_handover_nodes.clear(nodeId);
5940 DBUG_VOID_RETURN;
5941 }
5942 else if (requestType == SumaHandoverReq::RT_STOP_NODE)
5943 {
5944 jam();
5945 for (Uint32 i = 0; i < c_no_of_buckets; i++)
5946 {
5947 if (tmp.get(i))
5948 {
5949 ndbrequire(get_responsible_node(i) == getOwnNodeId());
5950 // We should run this bucket, but _nodeId_ is
5951 c_buckets[i].m_switchover_node = getOwnNodeId();
5952 c_buckets[i].m_switchover_gci = (Uint64(gci) << 32) - 1;
5953 c_buckets[i].m_state |= Bucket::BUCKET_SHUTDOWN;
5954 }
5955 }
5956
5957 char buf[255];
5958 tmp.getText(buf);
5959 infoEvent("Suma: handover to node %u gci: %u buckets: %s (%u)",
5960 nodeId, gci, buf, c_no_of_buckets);
5961 g_eventLogger->info("Suma: handover to node %u gci: %u buckets: %s (%u)",
5962 nodeId, gci, buf, c_no_of_buckets);
5963 m_switchover_buckets.bitOR(tmp);
5964 c_startup.m_handover_nodes.clear(nodeId);
5965 DBUG_VOID_RETURN;
5966 }
5967 }
5968
5969 void
execSTOP_ME_REQ(Signal * signal)5970 Suma::execSTOP_ME_REQ(Signal* signal)
5971 {
5972 jam();
5973 StopMeReq req = * CAST_CONSTPTR(StopMeReq, signal->getDataPtr());
5974
5975 ndbrequire(refToNode(req.senderRef) == getOwnNodeId());
5976 ndbrequire(c_shutdown.m_wait_handover == false);
5977 c_shutdown.m_wait_handover = true;
5978 c_shutdown.m_senderRef = req.senderRef;
5979 c_shutdown.m_senderData = req.senderData;
5980
5981 for (Uint32 i = c_nodes_in_nodegroup_mask.find(0);
5982 i != c_nodes_in_nodegroup_mask.NotFound ;
5983 i = c_nodes_in_nodegroup_mask.find(i + 1))
5984 {
5985 /**
5986 * Check that all SUMA nodes support graceful shutdown...
5987 * and it's too late to stop it...
5988 * Shutdown instead...
5989 */
5990 if (!ndbd_suma_stop_me(getNodeInfo(i).m_version))
5991 {
5992 jam();
5993 char buf[255];
5994 BaseString::snprintf(buf, sizeof(buf),
5995 "Not all versions support graceful shutdown (suma)."
5996 " Shutdown directly instead");
5997 progError(__LINE__,
5998 NDBD_EXIT_GRACEFUL_SHUTDOWN_ERROR,
5999 buf);
6000 ndbrequire(false);
6001 }
6002 }
6003 send_handover_req(signal, SumaHandoverReq::RT_STOP_NODE);
6004 }
6005
#ifdef NOT_USED
/**
 * Debug printer for Suma::Page_pos: writes page id, position within the
 * page and max gci in a bracketed, single-line form.
 */
static
NdbOut&
operator<<(NdbOut & out, const Suma::Page_pos & pos)
{
  out << "[ Page_pos:";
  out << " m_page_id: " << pos.m_page_id;
  out << " m_page_pos: " << pos.m_page_pos;
  out << " m_max_gci: " << pos.m_max_gci;
  out << " ]";
  return out;
}
#endif
6019
6020 Uint32*
get_buffer_ptr(Signal * signal,Uint32 buck,Uint64 gci,Uint32 sz)6021 Suma::get_buffer_ptr(Signal* signal, Uint32 buck, Uint64 gci, Uint32 sz)
6022 {
6023 sz += 1; // len
6024 Bucket* bucket= c_buckets+buck;
6025 Page_pos pos= bucket->m_buffer_head;
6026
6027 Buffer_page* page = 0;
6028 Uint32 *ptr = 0;
6029
6030 if (likely(pos.m_page_id != RNIL))
6031 {
6032 page= c_page_pool.getPtr(pos.m_page_id);
6033 ptr= page->m_data + pos.m_page_pos;
6034 }
6035
6036 const bool same_gci = (gci == pos.m_last_gci) && (!ERROR_INSERTED(13022));
6037
6038 pos.m_page_pos += sz;
6039 pos.m_last_gci = gci;
6040 Uint64 max = pos.m_max_gci > gci ? pos.m_max_gci : gci;
6041
6042 if(likely(same_gci && pos.m_page_pos <= Buffer_page::DATA_WORDS))
6043 {
6044 pos.m_max_gci = max;
6045 bucket->m_buffer_head = pos;
6046 * ptr++ = (0x8000 << 16) | sz; // Same gci
6047 return ptr;
6048 }
6049 else if(pos.m_page_pos + Buffer_page::GCI_SZ32 <= Buffer_page::DATA_WORDS)
6050 {
6051 loop:
6052 pos.m_max_gci = max;
6053 pos.m_page_pos += Buffer_page::GCI_SZ32;
6054 bucket->m_buffer_head = pos;
6055 * ptr++ = (sz + Buffer_page::GCI_SZ32);
6056 * ptr++ = (Uint32)(gci >> 32);
6057 * ptr++ = (Uint32)(gci & 0xFFFFFFFF);
6058 return ptr;
6059 }
6060 else
6061 {
6062 /**
6063 * new page
6064 * 1) save header on last page
6065 * 2) seize new page
6066 */
6067 Uint32 next;
6068 if(unlikely((next= seize_page()) == RNIL))
6069 {
6070 /**
6071 * Out of buffer
6072 */
6073 out_of_buffer(signal);
6074 return 0;
6075 }
6076
6077 if(likely(pos.m_page_id != RNIL))
6078 {
6079 page->m_max_gci_hi = (Uint32)(pos.m_max_gci >> 32);
6080 page->m_max_gci_lo = (Uint32)(pos.m_max_gci & 0xFFFFFFFF);
6081 page->m_words_used = pos.m_page_pos - sz;
6082 page->m_next_page= next;
6083 ndbassert(pos.m_max_gci != 0);
6084 }
6085 else
6086 {
6087 bucket->m_buffer_tail = next;
6088 }
6089
6090 memset(&pos, 0, sizeof(pos));
6091 pos.m_page_id = next;
6092 pos.m_page_pos = sz;
6093 pos.m_last_gci = gci;
6094
6095 page= c_page_pool.getPtr(pos.m_page_id);
6096 page->m_next_page= RNIL;
6097 ptr= page->m_data;
6098 goto loop; //
6099 }
6100 }
6101
6102 void
out_of_buffer(Signal * signal)6103 Suma::out_of_buffer(Signal* signal)
6104 {
6105 if(m_out_of_buffer_gci)
6106 {
6107 return;
6108 }
6109
6110 m_out_of_buffer_gci = m_last_complete_gci - 1;
6111 infoEvent("Out of event buffer: nodefailure will cause event failures");
6112 m_missing_data = false;
6113 out_of_buffer_release(signal, 0);
6114 }
6115
6116 void
out_of_buffer_release(Signal * signal,Uint32 buck)6117 Suma::out_of_buffer_release(Signal* signal, Uint32 buck)
6118 {
6119 Bucket* bucket= c_buckets+buck;
6120 Uint32 tail= bucket->m_buffer_tail;
6121
6122 if(tail != RNIL)
6123 {
6124 Buffer_page* page= c_page_pool.getPtr(tail);
6125 bucket->m_buffer_tail = page->m_next_page;
6126 free_page(tail, page);
6127 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6128 signal->theData[1] = buck;
6129 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6130 return;
6131 }
6132
6133 /**
6134 * Clear head
6135 */
6136 bucket->m_buffer_head.m_page_id = RNIL;
6137 bucket->m_buffer_head.m_page_pos = Buffer_page::DATA_WORDS + 1;
6138
6139 buck++;
6140 if(buck != c_no_of_buckets)
6141 {
6142 signal->theData[0] = SumaContinueB::OUT_OF_BUFFER_RELEASE;
6143 signal->theData[1] = buck;
6144 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
6145 return;
6146 }
6147
6148 /**
6149 * Finished will all release
6150 * prepare for inclusion
6151 */
6152 m_out_of_buffer_gci = m_max_seen_gci > m_last_complete_gci
6153 ? m_max_seen_gci : m_last_complete_gci;
6154 m_missing_data = false;
6155 }
6156
6157 Uint32
seize_page()6158 Suma::seize_page()
6159 {
6160 if (ERROR_INSERTED(13038))
6161 {
6162 jam();
6163 CLEAR_ERROR_INSERT_VALUE;
6164 ndbout_c("Simulating out of event buffer");
6165 m_out_of_buffer_gci = m_max_seen_gci;
6166 }
6167 if(unlikely(m_out_of_buffer_gci))
6168 {
6169 return RNIL;
6170 }
6171 loop:
6172 Ptr<Page_chunk> ptr;
6173 Uint32 ref= m_first_free_page;
6174 if(likely(ref != RNIL))
6175 {
6176 m_first_free_page = (c_page_pool.getPtr(ref))->m_next_page;
6177 Uint32 chunk = (c_page_pool.getPtr(ref))->m_page_chunk_ptr_i;
6178 c_page_chunk_pool.getPtr(ptr, chunk);
6179 ndbassert(ptr.p->m_free);
6180 ptr.p->m_free--;
6181 return ref;
6182 }
6183
6184 if(!c_page_chunk_pool.seize(ptr))
6185 return RNIL;
6186
6187 Uint32 count = 16;
6188 m_ctx.m_mm.alloc_pages(RT_DBTUP_PAGE, &ref, &count, 1);
6189 if (count == 0)
6190 return RNIL;
6191
6192 ndbout_c("alloc_chunk(%d %d) - ", ref, count);
6193
6194 m_first_free_page = ptr.p->m_page_id = ref;
6195 ptr.p->m_size = count;
6196 ptr.p->m_free = count;
6197
6198 Buffer_page* page;
6199 LINT_INIT(page);
6200 for(Uint32 i = 0; i<count; i++)
6201 {
6202 page = c_page_pool.getPtr(ref);
6203 page->m_page_state= SUMA_SEQUENCE;
6204 page->m_page_chunk_ptr_i = ptr.i;
6205 page->m_next_page = ++ref;
6206 }
6207 page->m_next_page = RNIL;
6208
6209 goto loop;
6210 }
6211
6212 void
free_page(Uint32 page_id,Buffer_page * page)6213 Suma::free_page(Uint32 page_id, Buffer_page* page)
6214 {
6215 Ptr<Page_chunk> ptr;
6216 ndbrequire(page->m_page_state == SUMA_SEQUENCE);
6217
6218 Uint32 chunk= page->m_page_chunk_ptr_i;
6219
6220 c_page_chunk_pool.getPtr(ptr, chunk);
6221
6222 ptr.p->m_free ++;
6223 page->m_next_page = m_first_free_page;
6224 ndbrequire(ptr.p->m_free <= ptr.p->m_size);
6225
6226 m_first_free_page = page_id;
6227 }
6228
6229 void
release_gci(Signal * signal,Uint32 buck,Uint64 gci)6230 Suma::release_gci(Signal* signal, Uint32 buck, Uint64 gci)
6231 {
6232 Bucket* bucket= c_buckets+buck;
6233 Uint32 tail= bucket->m_buffer_tail;
6234 Page_pos head= bucket->m_buffer_head;
6235 Uint64 max_acked = bucket->m_max_acked_gci;
6236
6237 const Uint32 mask = Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND;
6238 if(unlikely(bucket->m_state & mask))
6239 {
6240 jam();
6241 ndbout_c("release_gci(%d, %u/%u) 0x%x-> node failure -> abort",
6242 buck, Uint32(gci >> 32), Uint32(gci), bucket->m_state);
6243 return;
6244 }
6245
6246 bucket->m_max_acked_gci = (max_acked > gci ? max_acked : gci);
6247 if(unlikely(tail == RNIL))
6248 {
6249 return;
6250 }
6251
6252 if(tail == head.m_page_id)
6253 {
6254 if(gci >= head.m_max_gci)
6255 {
6256 jam();
6257 if (ERROR_INSERTED(13034))
6258 {
6259 jam();
6260 SET_ERROR_INSERT_VALUE(13035);
6261 return;
6262 }
6263 if (ERROR_INSERTED(13035))
6264 {
6265 CLEAR_ERROR_INSERT_VALUE;
6266 NodeReceiverGroup rg(CMVMI, c_nodes_in_nodegroup_mask);
6267 rg.m_nodes.clear(getOwnNodeId());
6268 signal->theData[0] = 9999;
6269 sendSignal(rg, GSN_NDB_TAMPER, signal, 1, JBA);
6270 return;
6271 }
6272 head.m_page_pos = 0;
6273 head.m_max_gci = gci;
6274 head.m_last_gci = 0;
6275 bucket->m_buffer_head = head;
6276 }
6277 return;
6278 }
6279 else
6280 {
6281 jam();
6282 Buffer_page* page= c_page_pool.getPtr(tail);
6283 Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6284 Uint32 next_page = page->m_next_page;
6285
6286 ndbassert(max_gci != 0);
6287
6288 if(gci >= max_gci)
6289 {
6290 jam();
6291 free_page(tail, page);
6292
6293 bucket->m_buffer_tail = next_page;
6294 signal->theData[0] = SumaContinueB::RELEASE_GCI;
6295 signal->theData[1] = buck;
6296 signal->theData[2] = (Uint32)(gci >> 32);
6297 signal->theData[3] = (Uint32)(gci & 0xFFFFFFFF);
6298 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 4, JBB);
6299 return;
6300 }
6301 else
6302 {
6303 //ndbout_c("do nothing...");
6304 }
6305 }
6306 }
6307
6308 static Uint32 g_cnt = 0;
6309
6310 void
start_resend(Signal * signal,Uint32 buck)6311 Suma::start_resend(Signal* signal, Uint32 buck)
6312 {
6313 printf("start_resend(%d, ", buck);
6314
6315 /**
6316 * Resend from m_max_acked_gci + 1 until max_gci + 1
6317 */
6318 Bucket* bucket= c_buckets + buck;
6319 Page_pos pos= bucket->m_buffer_head;
6320
6321 if(m_out_of_buffer_gci)
6322 {
6323 Ptr<Gcp_record> gcp;
6324 c_gcp_list.last(gcp);
6325 signal->theData[0] = NDB_LE_SubscriptionStatus;
6326 signal->theData[1] = 2; // INCONSISTENT;
6327 signal->theData[2] = 0; // Not used
6328 signal->theData[3] = (Uint32) pos.m_max_gci;
6329 signal->theData[4] = (Uint32) (gcp.p->m_gci >> 32);
6330 sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 5, JBB);
6331 m_missing_data = true;
6332 return;
6333 }
6334
6335 if(pos.m_page_id == RNIL)
6336 {
6337 jam();
6338 m_active_buckets.set(buck);
6339 m_gcp_complete_rep_count ++;
6340 ndbout_c("empty bucket(RNIL) -> active max_acked: %u/%u max_gci: %u/%u",
6341 Uint32(bucket->m_max_acked_gci >> 32),
6342 Uint32(bucket->m_max_acked_gci),
6343 Uint32(pos.m_max_gci >> 32),
6344 Uint32(pos.m_max_gci));
6345 return;
6346 }
6347
6348 Uint64 min= bucket->m_max_acked_gci + 1;
6349 Uint64 max = m_max_seen_gci;
6350
6351 ndbrequire(max <= m_max_seen_gci);
6352
6353 if(min > max)
6354 {
6355 ndbrequire(pos.m_page_id == bucket->m_buffer_tail);
6356 m_active_buckets.set(buck);
6357 m_gcp_complete_rep_count ++;
6358 ndbout_c("empty bucket (%u/%u %u/%u) -> active",
6359 Uint32(min >> 32), Uint32(min),
6360 Uint32(max >> 32), Uint32(max));
6361 return;
6362 }
6363
6364 g_cnt = 0;
6365 bucket->m_state |= (Bucket::BUCKET_TAKEOVER | Bucket::BUCKET_RESEND);
6366 bucket->m_switchover_node = get_responsible_node(buck);
6367 bucket->m_switchover_gci = max;
6368
6369 m_switchover_buckets.set(buck);
6370
6371 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6372 signal->theData[1] = buck;
6373 signal->theData[2] = (Uint32)(min >> 32);
6374 signal->theData[3] = 0;
6375 signal->theData[4] = 0;
6376 signal->theData[5] = (Uint32)(min & 0xFFFFFFFF);
6377 signal->theData[6] = 0;
6378 sendSignal(reference(), GSN_CONTINUEB, signal, 7, JBB);
6379
6380 ndbout_c("min: %u/%u - max: %u/%u) page: %d",
6381 Uint32(min >> 32), Uint32(min), Uint32(max >> 32), Uint32(max),
6382 bucket->m_buffer_tail);
6383 ndbrequire(max >= min);
6384 }
6385
6386 void
resend_bucket(Signal * signal,Uint32 buck,Uint64 min_gci,Uint32 pos,Uint64 last_gci)6387 Suma::resend_bucket(Signal* signal, Uint32 buck, Uint64 min_gci,
6388 Uint32 pos, Uint64 last_gci)
6389 {
6390 Bucket* bucket= c_buckets+buck;
6391 Uint32 tail= bucket->m_buffer_tail;
6392
6393 Buffer_page* page= c_page_pool.getPtr(tail);
6394 Uint64 max_gci = page->m_max_gci_lo | (Uint64(page->m_max_gci_hi) << 32);
6395 Uint32 next_page = page->m_next_page;
6396 Uint32 *ptr = page->m_data + pos;
6397 Uint32 *end = page->m_data + page->m_words_used;
6398 bool delay = false;
6399
6400 ndbrequire(tail != RNIL);
6401
6402 if(tail == bucket->m_buffer_head.m_page_id)
6403 {
6404 max_gci= bucket->m_buffer_head.m_max_gci;
6405 end= page->m_data + bucket->m_buffer_head.m_page_pos;
6406 next_page= RNIL;
6407
6408 if(ptr == end)
6409 {
6410 delay = true;
6411 goto next;
6412 }
6413 }
6414 else if(pos == 0 && min_gci > max_gci)
6415 {
6416 free_page(tail, page);
6417 tail = bucket->m_buffer_tail = next_page;
6418 goto next;
6419 }
6420
6421 #if 0
6422 for(Uint32 i = 0; i<page->m_words_used; i++)
6423 {
6424 printf("%.8x ", page->m_data[i]);
6425 if(((i + 1) % 8) == 0)
6426 printf("\n");
6427 }
6428 printf("\n");
6429 #endif
6430
6431 while(ptr < end)
6432 {
6433 Uint32 *src = ptr;
6434 Uint32 tmp = * src++;
6435 Uint32 sz = tmp & 0xFFFF;
6436
6437 ptr += sz;
6438
6439 if(! (tmp & (0x8000 << 16)))
6440 {
6441 ndbrequire(sz >= Buffer_page::GCI_SZ32);
6442 sz -= Buffer_page::GCI_SZ32;
6443 Uint32 last_gci_hi = * src++;
6444 Uint32 last_gci_lo = * src++;
6445 last_gci = last_gci_lo | (Uint64(last_gci_hi) << 32);
6446 }
6447 else
6448 {
6449 ndbrequire(ptr - sz > page->m_data);
6450 }
6451
6452 if(last_gci < min_gci)
6453 {
6454 continue;
6455 }
6456
6457 ndbrequire(sz);
6458 sz --; // remove *len* part of sz
6459
6460 if(sz == 0)
6461 {
6462 SubGcpCompleteRep * rep = (SubGcpCompleteRep*)signal->getDataPtrSend();
6463 rep->gci_hi = (Uint32)(last_gci >> 32);
6464 rep->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6465 rep->flags = (m_missing_data)
6466 ? SubGcpCompleteRep::MISSING_DATA
6467 : 0;
6468 rep->senderRef = reference();
6469 rep->gcp_complete_rep_count = 1;
6470
6471 if (ERROR_INSERTED(13036))
6472 {
6473 jam();
6474 CLEAR_ERROR_INSERT_VALUE;
6475 ndbout_c("Simulating out of event buffer at node failure");
6476 rep->flags |= SubGcpCompleteRep::MISSING_DATA;
6477 }
6478
6479 char buf[255];
6480 c_subscriber_nodes.getText(buf);
6481 if (g_cnt)
6482 {
6483 ndbout_c("resending GCI: %u/%u rows: %d -> %s",
6484 Uint32(last_gci >> 32), Uint32(last_gci), g_cnt, buf);
6485 }
6486 g_cnt = 0;
6487
6488 NodeReceiverGroup rg(API_CLUSTERMGR, c_subscriber_nodes);
6489 sendSignal(rg, GSN_SUB_GCP_COMPLETE_REP, signal,
6490 SubGcpCompleteRep::SignalLength, JBB);
6491 }
6492 else
6493 {
6494 const uint buffer_header_sz = 6;
6495 g_cnt++;
6496 Uint32 subPtrI = * src++ ;
6497 Uint32 schemaVersion = * src++;
6498 Uint32 event = * src >> 16;
6499 Uint32 sz_1 = (* src ++) & 0xFFFF;
6500 Uint32 any_value = * src++;
6501 Uint32 transId1 = * src++;
6502 Uint32 transId2 = * src++;
6503
6504 ndbassert(sz - buffer_header_sz >= sz_1);
6505
6506 LinearSectionPtr ptr[3];
6507 const Uint32 nptr= reformat(signal, ptr,
6508 src, sz_1,
6509 src + sz_1, sz - buffer_header_sz - sz_1);
6510 Uint32 ptrLen= 0;
6511 for(Uint32 i =0; i < nptr; i++)
6512 ptrLen+= ptr[i].sz;
6513
6514 /**
6515 * Signal to subscriber(s)
6516 */
6517 Ptr<Subscription> subPtr;
6518 c_subscriptionPool.getPtr(subPtr, subPtrI);
6519 Ptr<Table> tabPtr;
6520 c_tablePool.getPtr(tabPtr, subPtr.p->m_table_ptrI);
6521 Uint32 table = subPtr.p->m_tableId;
6522 if (table_version_major(tabPtr.p->m_schemaVersion) ==
6523 table_version_major(schemaVersion))
6524 {
6525 SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
6526 data->gci_hi = (Uint32)(last_gci >> 32);
6527 data->gci_lo = (Uint32)(last_gci & 0xFFFFFFFF);
6528 data->tableId = table;
6529 data->requestInfo = 0;
6530 SubTableData::setOperation(data->requestInfo, event);
6531 data->flags = 0;
6532 data->anyValue = any_value;
6533 data->totalLen = ptrLen;
6534 data->transId1 = transId1;
6535 data->transId2 = transId2;
6536
6537 {
6538 LocalDLList<Subscriber> list(c_subscriberPool,
6539 subPtr.p->m_subscribers);
6540 SubscriberPtr subbPtr;
6541 for(list.first(subbPtr); !subbPtr.isNull(); list.next(subbPtr))
6542 {
6543 data->senderData = subbPtr.p->m_senderData;
6544 sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
6545 SubTableData::SignalLengthWithTransId, JBB, ptr, nptr);
6546 }
6547 }
6548 }
6549 }
6550
6551 break;
6552 }
6553
6554 if(ptr == end && (tail != bucket->m_buffer_head.m_page_id))
6555 {
6556 /**
6557 * release...
6558 */
6559 free_page(tail, page);
6560 tail = bucket->m_buffer_tail = next_page;
6561 pos = 0;
6562 last_gci = 0;
6563 }
6564 else
6565 {
6566 pos = Uint32(ptr - page->m_data);
6567 }
6568
6569 next:
6570 if(tail == RNIL)
6571 {
6572 bucket->m_state &= ~(Uint32)Bucket::BUCKET_RESEND;
6573 ndbassert(! (bucket->m_state & Bucket::BUCKET_TAKEOVER));
6574 ndbout_c("resend done...");
6575 return;
6576 }
6577
6578 signal->theData[0] = SumaContinueB::RESEND_BUCKET;
6579 signal->theData[1] = buck;
6580 signal->theData[2] = (Uint32)(min_gci >> 32);
6581 signal->theData[3] = pos;
6582 signal->theData[4] = (Uint32)(last_gci >> 32);
6583 signal->theData[5] = (Uint32)(min_gci & 0xFFFFFFFF);
6584 signal->theData[6] = (Uint32)(last_gci & 0xFFFFFFFF);
6585 if(!delay)
6586 sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 7, JBB);
6587 else
6588 sendSignalWithDelay(SUMA_REF, GSN_CONTINUEB, signal, 10, 7);
6589 }
6590
6591 void
execGCP_PREPARE(Signal * signal)6592 Suma::execGCP_PREPARE(Signal *signal)
6593 {
6594 jamEntry();
6595 const GCPPrepare *prep = (const GCPPrepare *)signal->getDataPtr();
6596 m_current_gci = prep->gci_lo | (Uint64(prep->gci_hi) << 32);
6597 }
6598
6599 Uint64
get_current_gci(Signal *)6600 Suma::get_current_gci(Signal*)
6601 {
6602 return m_current_gci;
6603 }
6604
6605 void
execCREATE_NODEGROUP_IMPL_REQ(Signal * signal)6606 Suma::execCREATE_NODEGROUP_IMPL_REQ(Signal* signal)
6607 {
6608 CreateNodegroupImplReq reqCopy = *(CreateNodegroupImplReq*)
6609 signal->getDataPtr();
6610 CreateNodegroupImplReq *req = &reqCopy;
6611
6612 Uint32 err = 0;
6613 Uint32 rt = req->requestType;
6614
6615 NdbNodeBitmask tmp;
6616 for (Uint32 i = 0; i<NDB_ARRAY_SIZE(req->nodes) && req->nodes[i]; i++)
6617 {
6618 tmp.set(req->nodes[i]);
6619 }
6620 Uint32 cnt = tmp.count();
6621 Uint32 group = req->nodegroupId;
6622
6623 switch(rt){
6624 case CreateNodegroupImplReq::RT_ABORT:
6625 jam();
6626 break;
6627 case CreateNodegroupImplReq::RT_PARSE:
6628 jam();
6629 break;
6630 case CreateNodegroupImplReq::RT_PREPARE:
6631 jam();
6632 break;
6633 case CreateNodegroupImplReq::RT_COMMIT:
6634 jam();
6635 break;
6636 case CreateNodegroupImplReq::RT_COMPLETE:
6637 jam();
6638 CRASH_INSERTION(13043);
6639
6640 Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6641 ndbrequire(gci > m_last_complete_gci);
6642
6643 Uint32 state = 0;
6644 if (c_nodeGroup != RNIL)
6645 {
6646 jam();
6647 NdbNodeBitmask check = tmp;
6648 check.bitAND(c_nodes_in_nodegroup_mask);
6649 ndbrequire(check.isclear());
6650 ndbrequire(c_nodeGroup != group);
6651 ndbrequire(cnt == c_nodes_in_nodegroup_mask.count());
6652 state = Bucket::BUCKET_CREATED_OTHER;
6653 }
6654 else if (tmp.get(getOwnNodeId()))
6655 {
6656 jam();
6657 c_nodeGroup = group;
6658 c_nodes_in_nodegroup_mask.assign(tmp);
6659 fix_nodegroup();
6660 state = Bucket::BUCKET_CREATED_SELF;
6661 }
6662 if (state != 0)
6663 {
6664 for (Uint32 i = 0; i<c_no_of_buckets; i++)
6665 {
6666 jam();
6667 m_switchover_buckets.set(i);
6668 c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6669 c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6670 }
6671 }
6672 }
6673
6674 {
6675 CreateNodegroupImplConf* conf =
6676 (CreateNodegroupImplConf*)signal->getDataPtrSend();
6677 conf->senderRef = reference();
6678 conf->senderData = req->senderData;
6679 sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_CONF, signal,
6680 CreateNodegroupImplConf::SignalLength, JBB);
6681 }
6682 return;
6683
6684 //error:
6685 CreateNodegroupImplRef *ref =
6686 (CreateNodegroupImplRef*)signal->getDataPtrSend();
6687 ref->senderRef = reference();
6688 ref->senderData = req->senderData;
6689 ref->errorCode = err;
6690 sendSignal(req->senderRef, GSN_CREATE_NODEGROUP_IMPL_REF, signal,
6691 CreateNodegroupImplRef::SignalLength, JBB);
6692 return;
6693 }
6694
6695 void
execDROP_NODEGROUP_IMPL_REQ(Signal * signal)6696 Suma::execDROP_NODEGROUP_IMPL_REQ(Signal* signal)
6697 {
6698 DropNodegroupImplReq reqCopy = *(DropNodegroupImplReq*)
6699 signal->getDataPtr();
6700 DropNodegroupImplReq *req = &reqCopy;
6701
6702 Uint32 err = 0;
6703 Uint32 rt = req->requestType;
6704 Uint32 group = req->nodegroupId;
6705
6706 switch(rt){
6707 case DropNodegroupImplReq::RT_ABORT:
6708 jam();
6709 break;
6710 case DropNodegroupImplReq::RT_PARSE:
6711 jam();
6712 break;
6713 case DropNodegroupImplReq::RT_PREPARE:
6714 jam();
6715 break;
6716 case DropNodegroupImplReq::RT_COMMIT:
6717 jam();
6718 break;
6719 case DropNodegroupImplReq::RT_COMPLETE:
6720 jam();
6721 CRASH_INSERTION(13043);
6722
6723 Uint64 gci = (Uint64(req->gci_hi) << 32) | req->gci_lo;
6724 ndbrequire(gci > m_last_complete_gci);
6725
6726 Uint32 state;
6727 if (c_nodeGroup != group)
6728 {
6729 jam();
6730 state = Bucket::BUCKET_DROPPED_OTHER;
6731 break;
6732 }
6733 else
6734 {
6735 jam();
6736 state = Bucket::BUCKET_DROPPED_SELF;
6737 }
6738
6739 for (Uint32 i = 0; i<c_no_of_buckets; i++)
6740 {
6741 jam();
6742 m_switchover_buckets.set(i);
6743 if (c_buckets[i].m_state != 0)
6744 {
6745 jamLine(c_buckets[i].m_state);
6746 ndbout_c("c_buckets[%u].m_state: %u", i, c_buckets[i].m_state);
6747 }
6748 ndbrequire(c_buckets[i].m_state == 0); // XXX todo
6749 c_buckets[i].m_switchover_gci = gci - 1; // start from gci
6750 c_buckets[i].m_state = state | (c_no_of_buckets << 8);
6751 }
6752 break;
6753 }
6754
6755 {
6756 DropNodegroupImplConf* conf =
6757 (DropNodegroupImplConf*)signal->getDataPtrSend();
6758 conf->senderRef = reference();
6759 conf->senderData = req->senderData;
6760 sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_CONF, signal,
6761 DropNodegroupImplConf::SignalLength, JBB);
6762 }
6763 return;
6764
6765 //error:
6766 DropNodegroupImplRef *ref =
6767 (DropNodegroupImplRef*)signal->getDataPtrSend();
6768 ref->senderRef = reference();
6769 ref->senderData = req->senderData;
6770 ref->errorCode = err;
6771 sendSignal(req->senderRef, GSN_DROP_NODEGROUP_IMPL_REF, signal,
6772 DropNodegroupImplRef::SignalLength, JBB);
6773 return;
6774 }
6775
6776 template void append(DataBuffer<11>&,SegmentedSectionPtr,SectionSegmentPool&);
6777
6778